Skip to main content

snarkos_node_bft/
bft.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31
32use snarkos_account::Account;
33use snarkos_node_bft_ledger_service::LedgerService;
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_utilities::NodeDataDir;
36
37use snarkvm::{
38    console::account::Address,
39    ledger::{
40        block::Transaction,
41        committee::Committee,
42        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
43        puzzle::{Solution, SolutionID},
44    },
45    prelude::{Field, Network, Result, bail, ensure},
46    utilities::flatten_error,
47};
48
49use anyhow::Context;
50use colored::Colorize;
51use indexmap::{IndexMap, IndexSet};
52#[cfg(feature = "locktick")]
53use locktick::{
54    parking_lot::{Mutex, RwLock},
55    tokio::Mutex as TMutex,
56};
57#[cfg(not(feature = "locktick"))]
58use parking_lot::{Mutex, RwLock};
59use std::{
60    collections::{BTreeMap, HashSet},
61    future::Future,
62    net::SocketAddr,
63    sync::{
64        Arc,
65        atomic::{AtomicI64, Ordering},
66    },
67};
68#[cfg(not(feature = "locktick"))]
69use tokio::sync::Mutex as TMutex;
70use tokio::{
71    sync::{OnceCell, oneshot},
72    task::JoinHandle,
73};
74
75#[derive(Clone)]
76pub struct BFT<N: Network> {
77    /// The primary for this node.
78    primary: Primary<N>,
79    /// The DAG of batches from which we build the blockchain.
80    dag: Arc<RwLock<DAG<N>>>,
81    /// The batch certificate of the leader from the current even round, if one was present.
82    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
83    /// The timer for the leader certificate to be received.
84    leader_certificate_timer: Arc<AtomicI64>,
85    /// The consensus sender.
86    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
87    /// Handles for all spawned tasks.
88    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
89    /// The BFT lock.
90    lock: Arc<TMutex<()>>,
91}
92
93impl<N: Network> BFT<N> {
94    /// Initializes a new instance of the BFT.
95    #[allow(clippy::too_many_arguments)]
96    pub fn new(
97        account: Account<N>,
98        storage: Storage<N>,
99        ledger: Arc<dyn LedgerService<N>>,
100        block_sync: Arc<BlockSync<N>>,
101        ip: Option<SocketAddr>,
102        trusted_validators: &[SocketAddr],
103        trusted_peers_only: bool,
104        node_data_dir: NodeDataDir,
105        dev: Option<u16>,
106    ) -> Result<Self> {
107        Ok(Self {
108            primary: Primary::new(
109                account,
110                storage,
111                ledger,
112                block_sync,
113                ip,
114                trusted_validators,
115                trusted_peers_only,
116                node_data_dir,
117                dev,
118            )?,
119            dag: Default::default(),
120            leader_certificate: Default::default(),
121            leader_certificate_timer: Default::default(),
122            consensus_sender: Default::default(),
123            handles: Default::default(),
124            lock: Default::default(),
125        })
126    }
127
128    /// Run the BFT instance.
129    ///
130    /// This will return as soon as all required tasks are spawned.
131    /// The function must not be called more than once per instance.
132    pub async fn run(
133        &mut self,
134        ping: Option<Arc<Ping<N>>>,
135        consensus_sender: Option<ConsensusSender<N>>,
136        primary_sender: PrimarySender<N>,
137        primary_receiver: PrimaryReceiver<N>,
138    ) -> Result<()> {
139        info!("Starting the BFT instance...");
140        // Initialize the BFT channels.
141        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
142        // First, start the BFT handlers.
143        self.start_handlers(bft_receiver);
144        // Next, run the primary instance.
145        self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
146        // Lastly, set the consensus sender.
147        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
148        if let Some(consensus_sender) = consensus_sender {
149            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
150        }
151        Ok(())
152    }
153
154    /// Returns `true` if the primary is synced.
155    pub fn is_synced(&self) -> bool {
156        self.primary.is_synced()
157    }
158
159    /// Returns the primary.
160    pub const fn primary(&self) -> &Primary<N> {
161        &self.primary
162    }
163
164    /// Returns the storage.
165    pub const fn storage(&self) -> &Storage<N> {
166        self.primary.storage()
167    }
168
169    /// Returns the ledger.
170    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
171        self.primary.ledger()
172    }
173
174    /// Returns the leader of the current even round, if one was present.
175    pub fn leader(&self) -> Option<Address<N>> {
176        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
177    }
178
179    /// Returns the certificate of the leader from the current even round, if one was present.
180    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
181        &self.leader_certificate
182    }
183}
184
185impl<N: Network> BFT<N> {
186    /// Returns the number of unconfirmed transmissions.
187    pub fn num_unconfirmed_transmissions(&self) -> usize {
188        self.primary.num_unconfirmed_transmissions()
189    }
190
191    /// Returns the number of unconfirmed ratifications.
192    pub fn num_unconfirmed_ratifications(&self) -> usize {
193        self.primary.num_unconfirmed_ratifications()
194    }
195
196    /// Returns the number of solutions.
197    pub fn num_unconfirmed_solutions(&self) -> usize {
198        self.primary.num_unconfirmed_solutions()
199    }
200
201    /// Returns the number of unconfirmed transactions.
202    pub fn num_unconfirmed_transactions(&self) -> usize {
203        self.primary.num_unconfirmed_transactions()
204    }
205}
206
207impl<N: Network> BFT<N> {
208    /// Returns the worker transmission IDs.
209    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
210        self.primary.worker_transmission_ids()
211    }
212
213    /// Returns the worker transmissions.
214    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
215        self.primary.worker_transmissions()
216    }
217
218    /// Returns the worker solutions.
219    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
220        self.primary.worker_solutions()
221    }
222
223    /// Returns the worker transactions.
224    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
225        self.primary.worker_transactions()
226    }
227}
228
229impl<N: Network> BFT<N> {
230    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
231    fn update_to_next_round(&self, current_round: u64) -> bool {
232        // Ensure the current round is at least the storage round (this is a sanity check).
233        let storage_round = self.storage().current_round();
234        if current_round < storage_round {
235            debug!(
236                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
237            );
238            return false;
239        }
240
241        // Determine if the BFT is ready to update to the next round.
242        let is_ready = match current_round % 2 == 0 {
243            true => self.update_leader_certificate_to_even_round(current_round),
244            false => self.is_leader_quorum_or_nonleaders_available(current_round),
245        };
246
247        #[cfg(feature = "metrics")]
248        {
249            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
250            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
251            if start > 0 {
252                let end = now();
253                let elapsed = std::time::Duration::from_secs((end - start) as u64);
254                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
255            }
256        }
257
258        // Log whether the round is going to update.
259        if current_round % 2 == 0 {
260            // Determine if there is a leader certificate.
261            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
262                // Ensure the state of the leader certificate is consistent with the BFT being ready.
263                if !is_ready {
264                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
265                }
266                // Log the leader election.
267                let leader_round = leader_certificate.round();
268                match leader_round == current_round {
269                    true => {
270                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
271                        #[cfg(feature = "metrics")]
272                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
273                    }
274                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
275                }
276            } else {
277                match is_ready {
278                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
279                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
280                }
281            }
282        }
283
284        // If the BFT is ready, then update to the next round.
285        if is_ready {
286            // Update to the next round in storage.
287            if let Err(err) = self
288                .storage()
289                .increment_to_next_round(current_round)
290                .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
291            {
292                warn!("{}", &flatten_error(err));
293                return false;
294            }
295            // Update the timer for the leader certificate.
296            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
297        }
298
299        is_ready
300    }
301
302    /// Updates the leader certificate to the current even round,
303    /// returning `true` if the BFT is ready to update to the next round.
304    ///
305    /// This method runs on every even round, by determining the leader of the current even round,
306    /// and setting the leader certificate to their certificate in the round, if they were present.
307    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
308        // Retrieve the current round.
309        let current_round = self.storage().current_round();
310        // Ensure the current round matches the given round.
311        if current_round != even_round {
312            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
313            return false;
314        }
315
316        // If the current round is odd, return false.
317        if current_round % 2 != 0 || current_round < 2 {
318            error!("BFT cannot update the leader certificate in an odd round");
319            return false;
320        }
321
322        // Retrieve the certificates for the current round.
323        let current_certificates = self.storage().get_certificates_for_round(current_round);
324        // If there are no current certificates, set the leader certificate to 'None', and return early.
325        if current_certificates.is_empty() {
326            // Set the leader certificate to 'None'.
327            *self.leader_certificate.write() = None;
328            return false;
329        }
330
331        // Retrieve the committee lookback of the current round.
332        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
333            Ok(committee) => committee,
334            Err(err) => {
335                let err = err.context(format!(
336                    "BFT failed to retrieve the committee lookback for the even round {current_round}"
337                ));
338                warn!("{}", &flatten_error(err));
339                return false;
340            }
341        };
342        // Determine the leader of the current round.
343        let leader = match self.ledger().latest_leader() {
344            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
345            _ => {
346                // Compute the leader for the current round.
347                let computed_leader = match committee_lookback.get_leader(current_round) {
348                    Ok(leader) => leader,
349                    Err(err) => {
350                        let err =
351                            err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
352                        error!("{}", &flatten_error(err));
353                        return false;
354                    }
355                };
356
357                // Cache the computed leader.
358                self.ledger().update_latest_leader(current_round, computed_leader);
359
360                computed_leader
361            }
362        };
363        // Find and set the leader certificate, if the leader was present in the current even round.
364        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
365        *self.leader_certificate.write() = leader_certificate.cloned();
366
367        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
368    }
369
370    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
371    ///  - If the leader certificate is set for the current even round.
372    ///  - The timer for the leader certificate has expired.
373    fn is_even_round_ready_for_next_round(
374        &self,
375        certificates: IndexSet<BatchCertificate<N>>,
376        committee: Committee<N>,
377        current_round: u64,
378    ) -> bool {
379        // Retrieve the authors for the current round.
380        let authors = certificates.into_iter().map(|c| c.author()).collect();
381        // Check if quorum threshold is reached.
382        if !committee.is_quorum_threshold_reached(&authors) {
383            trace!("BFT failed to reach quorum threshold in even round {current_round}");
384            return false;
385        }
386        // If the leader certificate is set for the current even round, return 'true'.
387        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
388            if leader_certificate.round() == current_round {
389                return true;
390            }
391        }
392        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
393        if self.is_timer_expired() {
394            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
395            return true;
396        }
397        // Otherwise, return 'false'.
398        false
399    }
400
401    /// Returns `true` if the timer for the leader certificate has expired.
402    ///
403    /// This is always true for a new BFT instance.
404    fn is_timer_expired(&self) -> bool {
405        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
406    }
407
408    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
409    ///  - The leader certificate is `None`.
410    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
411    ///  - The leader certificate timer has expired.
412    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
413        // Retrieve the current round.
414        let current_round = self.storage().current_round();
415        // Ensure the current round matches the given round.
416        if current_round != odd_round {
417            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
418            return false;
419        }
420        // If the current round is even, return false.
421        if current_round % 2 != 1 {
422            error!("BFT does not compute stakes for the leader certificate in an even round");
423            return false;
424        }
425        // Retrieve the certificates for the current round.
426        let current_certificates = self.storage().get_certificates_for_round(current_round);
427        // Retrieve the committee lookback for the current round.
428        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
429            Ok(committee) => committee,
430            Err(err) => {
431                let err = err.context(format!(
432                    "BFT failed to retrieve the committee lookback for the odd round {current_round}"
433                ));
434                error!("{}", &flatten_error(err));
435                return false;
436            }
437        };
438        // Retrieve the authors of the current certificates.
439        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
440        // Check if quorum threshold is reached.
441        if !committee_lookback.is_quorum_threshold_reached(&authors) {
442            trace!("BFT failed reach quorum threshold in odd round {current_round}.");
443            return false;
444        }
445        // Retrieve the leader certificate.
446        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
447            // If there is no leader certificate for the previous round, return 'true'.
448            return true;
449        };
450        // Compute the stake for the leader certificate.
451        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
452            leader_certificate.id(),
453            current_certificates,
454            &committee_lookback,
455        );
456        // Return 'true' if any of the following conditions hold:
457        stake_with_leader >= committee_lookback.availability_threshold()
458            || stake_without_leader >= committee_lookback.quorum_threshold()
459            || self.is_timer_expired()
460    }
461
462    /// Computes the amount of stake that has & has not signed for the leader certificate.
463    fn compute_stake_for_leader_certificate(
464        &self,
465        leader_certificate_id: Field<N>,
466        current_certificates: IndexSet<BatchCertificate<N>>,
467        current_committee: &Committee<N>,
468    ) -> (u64, u64) {
469        // If there are no current certificates, return early.
470        if current_certificates.is_empty() {
471            return (0, 0);
472        }
473
474        // Initialize a tracker for the stake with the leader.
475        let mut stake_with_leader = 0u64;
476        // Initialize a tracker for the stake without the leader.
477        let mut stake_without_leader = 0u64;
478        // Iterate over the current certificates.
479        for certificate in current_certificates {
480            // Retrieve the stake for the author of the certificate.
481            let stake = current_committee.get_stake(certificate.author());
482            // Determine if the certificate includes the leader.
483            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
484                // If the certificate includes the leader, add the stake to the stake with the leader.
485                true => stake_with_leader = stake_with_leader.saturating_add(stake),
486                // If the certificate does not include the leader, add the stake to the stake without the leader.
487                false => stake_without_leader = stake_without_leader.saturating_add(stake),
488            }
489        }
490        // Return the stake with the leader, and the stake without the leader.
491        (stake_with_leader, stake_without_leader)
492    }
493}
494
495impl<N: Network> BFT<N> {
496    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
497    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
498        &self,
499        certificate: BatchCertificate<N>,
500    ) -> Result<()> {
501        // Acquire the BFT lock.
502        let _lock = self.lock.lock().await;
503
504        // ### First, insert the certificate into the DAG. ###
505        // Retrieve the round of the new certificate to add to the DAG.
506        let certificate_round = certificate.round();
507
508        // Insert the certificate into the DAG.
509        self.dag.write().insert(certificate);
510
511        // ### Second, determine if a new leader certificate can be committed. ###
512        let commit_round = certificate_round.saturating_sub(1);
513
514        // Leaders are elected in even rounds.
515        // If the previous round is odd, the current round cannot commit any leader certs.
516        // Similarly, no leader certificate can be committed for round zero.
517        if !commit_round.is_multiple_of(2) || commit_round < 2 {
518            return Ok(());
519        }
520        // If the commit round is at or below the last committed round, return early.
521        if commit_round <= self.dag.read().last_committed_round() {
522            return Ok(());
523        }
524
525        /* Proceeding to check if the leader is ready to be committed. */
526        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
527
528        // Retrieve the committee lookback for the commit round.
529        let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
530            format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
531        })?;
532
533        // Either retrieve the cached leader or compute it.
534        let leader = match self.ledger().latest_leader() {
535            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
536            _ => {
537                // Compute the leader for the commit round.
538                let computed_leader = committee_lookback
539                    .get_leader(commit_round)
540                    .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;
541
542                // Cache the computed leader.
543                self.ledger().update_latest_leader(commit_round, computed_leader);
544
545                computed_leader
546            }
547        };
548
549        // Retrieve the leader certificate for the commit round.
550        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
551        else {
552            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
553            return Ok(());
554        };
555        // Retrieve all of the certificates for the **certificate** round.
556        let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
557            format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
558        })?;
559
560        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
561        let certificate_committee_lookback =
562            self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
563                format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
564            })?;
565
566        // Construct a set over the authors who included the leader's certificate in the certificate round.
567        let authors = certificates
568            .values()
569            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
570                true => Some(c.author()),
571                false => None,
572            })
573            .collect();
574        // Check if the leader is ready to be committed.
575        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
576            // If the leader is not ready to be committed, return early.
577            trace!("BFT is not ready to commit {commit_round}. Availability threshold has not been reached yet.");
578            return Ok(());
579        }
580
581        if IS_SYNCING {
582            info!("Proceeding to commit round {commit_round} with leader '{}' from block sync", fmt_id(leader));
583        } else {
584            info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
585        }
586
587        // Commit the leader certificate, and all previous leader certificates since the last committed round.
588        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
589    }
590
591    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
592    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
593        &self,
594        leader_certificate: BatchCertificate<N>,
595    ) -> Result<()> {
596        #[cfg(debug_assertions)]
597        trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());
598
599        // Fetch the leader round.
600        let latest_leader_round = leader_certificate.round();
601        // Determine the list of all previous leader certificates since the last committed round.
602        // The order of the leader certificates is from **newest** to **oldest**.
603        let mut leader_certificates = vec![leader_certificate.clone()];
604        {
605            // Retrieve the leader round.
606            let leader_round = leader_certificate.round();
607
608            let mut current_certificate = leader_certificate;
609            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
610            {
611                // Retrieve the previous committee for the leader round.
612                let previous_committee_lookback =
613                    self.ledger().get_committee_lookback_for_round(round).with_context(|| {
614                        format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
615                    })?;
616
617                // Either retrieve the cached leader or compute it.
618                let leader = match self.ledger().latest_leader() {
619                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
620                    _ => {
621                        // Compute the leader for the commit round.
622                        let computed_leader = previous_committee_lookback
623                            .get_leader(round)
624                            .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;
625
626                        // Cache the computed leader.
627                        self.ledger().update_latest_leader(round, computed_leader);
628
629                        computed_leader
630                    }
631                };
632                // Retrieve the previous leader certificate.
633                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
634                else {
635                    continue;
636                };
637                // Determine if there is a path between the previous certificate and the current certificate.
638                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
639                    // Add the previous leader certificate to the list of certificates to commit.
640                    leader_certificates.push(previous_certificate.clone());
641                    // Update the current certificate to the previous leader certificate.
642                    current_certificate = previous_certificate;
643                } else {
644                    #[cfg(debug_assertions)]
645                    trace!(
646                        "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
647                    );
648                }
649            }
650        }
651
652        // Iterate over the leader certificates to commit.
653        for leader_certificate in leader_certificates.into_iter().rev() {
654            // Retrieve the leader certificate round.
655            let leader_round = leader_certificate.round();
656            // Compute the commit subdag.
657            let commit_subdag = self
658                .order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate)
659                .with_context(|| "BFT failed to order the DAG with DFS")?;
660            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
661            if !IS_SYNCING {
662                // Initialize a map for the deduped transmissions.
663                let mut transmissions = IndexMap::new();
664                // Initialize a map for the deduped transaction ids.
665                let mut seen_transaction_ids = IndexSet::new();
666                // Initialize a map for the deduped solution ids.
667                let mut seen_solution_ids = IndexSet::new();
668                // Start from the oldest leader certificate.
669                for certificate in commit_subdag.values().flatten() {
670                    // Retrieve the transmissions.
671                    for transmission_id in certificate.transmission_ids() {
672                        // If the transaction ID or solution ID already exists in the map, skip it.
673                        // Note: This additional check is done to ensure that we do not include duplicate
674                        // transaction IDs or solution IDs that may have a different transmission ID.
675                        match transmission_id {
676                            TransmissionID::Solution(solution_id, _) => {
677                                // If the solution already exists, skip it.
678                                if seen_solution_ids.contains(&solution_id) {
679                                    continue;
680                                }
681                            }
682                            TransmissionID::Transaction(transaction_id, _) => {
683                                // If the transaction already exists, skip it.
684                                if seen_transaction_ids.contains(transaction_id) {
685                                    continue;
686                                }
687                            }
688                            TransmissionID::Ratification => {
689                                bail!("Ratifications are currently not supported in the BFT.")
690                            }
691                        }
692                        // If the transmission already exists in the map, skip it.
693                        if transmissions.contains_key(transmission_id) {
694                            continue;
695                        }
696                        // If the transmission already exists in the ledger, skip it.
697                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
698                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
699                            continue;
700                        }
701                        // Retrieve the transmission.
702                        let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
703                            format!(
704                                "BFT failed to retrieve transmission '{}.{}' from round {}",
705                                fmt_id(transmission_id),
706                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
707                                certificate.round()
708                            )
709                        })?;
710                        // Insert the transaction ID or solution ID into the map.
711                        match transmission_id {
712                            TransmissionID::Solution(id, _) => {
713                                seen_solution_ids.insert(id);
714                            }
715                            TransmissionID::Transaction(id, _) => {
716                                seen_transaction_ids.insert(id);
717                            }
718                            TransmissionID::Ratification => {}
719                        }
720                        // Add the transmission to the set.
721                        transmissions.insert(*transmission_id, transmission);
722                    }
723                }
724                // Trigger consensus, as this will build a new block for the ledger.
725                // Construct the subdag.
726                let subdag = Subdag::from(commit_subdag.clone())?;
727                // Retrieve the anchor round.
728                let anchor_round = subdag.anchor_round();
729                // Retrieve the number of transmissions.
730                let num_transmissions = transmissions.len();
731                // Retrieve metadata about the subdag.
732                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
733
734                // Ensure the subdag anchor round matches the leader round.
735                ensure!(
736                    anchor_round == leader_round,
737                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
738                );
739
740                // Trigger consensus.
741                if let Some(consensus_sender) = self.consensus_sender.get() {
742                    // Initialize a callback sender and receiver.
743                    let (callback_sender, callback_receiver) = oneshot::channel();
744                    // Send the subdag and transmissions to consensus.
745                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
746                    // Await the callback to continue.
747                    match callback_receiver.await {
748                        Ok(Ok(())) => (), // continue
749                        Ok(Err(err)) => {
750                            let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
751                            error!("{}", &flatten_error(err));
752                            return Ok(());
753                        }
754                        Err(err) => {
755                            let err: anyhow::Error = err.into();
756                            let err =
757                                err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
758                            error!("{}", flatten_error(err));
759                            return Ok(());
760                        }
761                    }
762                }
763
764                info!(
765                    "\n\nCommitting a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?} (syncing={IS_SYNCING})\n",
766                );
767            }
768
769            // Update the DAG, as the subdag was successfully included into a block.
770            {
771                let mut dag_write = self.dag.write();
772                let mut count = 0;
773                for certificate in commit_subdag.values().flatten() {
774                    dag_write.commit(certificate, self.storage().max_gc_rounds());
775                    count += 1;
776                }
777
778                trace!("Committed {count} certificates to the DAG");
779            }
780
781            // Update the validator telemetry.
782            #[cfg(feature = "telemetry")]
783            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
784        }
785
786        // Perform garbage collection based on the latest committed leader round.
787        // The protocol guarantees that validators commit the same anchors in the same order,
788        // but they may do so in different chunks of anchors,
789        // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
790        // Doing garbage collection at the end of each chunk (as we do here),
791        // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
792        // may give rise to a discrepancy between the DAGs of different validators who commit different chunks:
793        // one validator may have more certificates than the other, not yet garbage collected.
794        // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
795        // it excludes certificates that are below the GC round,
796        // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
797        // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
798        // so long as garbage collection is done after each chunk.
799        // If garbage collection were done after each committed certificate,
800        // that exclusion in `order_dag_with_dfs()` should be unnecessary.
801        self.storage().garbage_collect_certificates(latest_leader_round);
802
803        Ok(())
804    }
805
806    /// Returns the subdag of batch certificates to commit.
807    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
808        &self,
809        leader_certificate: BatchCertificate<N>,
810    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
811        // Initialize a map for the certificates to commit.
812        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
813        // Initialize a set for the already ordered certificates.
814        let mut already_ordered = HashSet::new();
815        // Initialize a buffer for the certificates to order.
816        let mut buffer = vec![leader_certificate];
817        // Iterate over the certificates to order.
818        while let Some(certificate) = buffer.pop() {
819            // Insert the certificate into the map.
820            commit.entry(certificate.round()).or_default().insert(certificate.clone());
821
822            // Check if the previous certificate is below the GC round.
823            // This is currently a critical check to prevent forking,
824            // as explained in the comment at the end of `commit_leader_certificate()`,
825            // just before the call to garbage collection.
826            let previous_round = certificate.round().saturating_sub(1);
827            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
828                continue;
829            }
830            // Iterate over the previous certificate IDs.
831            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
832            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
833            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
834                // If the previous certificate is already ordered, continue.
835                if already_ordered.contains(previous_certificate_id) {
836                    continue;
837                }
838                // If the previous certificate was recently committed, continue.
839                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
840                    continue;
841                }
842                // If the previous certificate already exists in the ledger, continue.
843                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
844                    continue;
845                }
846
847                // Retrieve the previous certificate.
848                let previous_certificate = {
849                    // Start by retrieving the previous certificate from the DAG.
850                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
851                        // If the previous certificate is found, return it.
852                        Some(previous_certificate) => previous_certificate,
853                        // If the previous certificate is not found, retrieve it from the storage.
854                        None => match self.storage().get_certificate(*previous_certificate_id) {
855                            // If the previous certificate is found, return it.
856                            Some(previous_certificate) => previous_certificate,
857                            // Otherwise, the previous certificate is missing, and throw an error.
858                            None => bail!(
859                                "Missing previous certificate {} for round {previous_round}",
860                                fmt_id(previous_certificate_id)
861                            ),
862                        },
863                    }
864                };
865                // Insert the previous certificate into the set of already ordered certificates.
866                already_ordered.insert(previous_certificate.id());
867                // Insert the previous certificate into the buffer.
868                buffer.push(previous_certificate);
869            }
870        }
871        // Ensure we only retain certificates that are above the GC round.
872        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
873        // Return the certificates to commit.
874        Ok(commit)
875    }
876
877    /// Returns `true` if there is a path from the previous certificate to the current certificate.
878    fn is_linked(
879        &self,
880        previous_certificate: BatchCertificate<N>,
881        current_certificate: BatchCertificate<N>,
882    ) -> Result<bool> {
883        // Initialize the list containing the traversal.
884        let mut traversal = vec![current_certificate.clone()];
885        // Iterate over the rounds from the current certificate to the previous certificate.
886        for round in (previous_certificate.round()..current_certificate.round()).rev() {
887            // Retrieve all of the certificates for this past round.
888            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
889                // This is a critical error, as the traversal should have these certificates.
890                // If this error is hit, it is likely that the maximum GC rounds should be increased.
891                bail!("BFT failed to retrieve the certificates for past round {round}");
892            };
893            // Filter the certificates to only include those that are in the traversal.
894            traversal = certificates
895                .into_values()
896                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
897                .collect();
898        }
899        Ok(traversal.contains(&previous_certificate))
900    }
901}
902
903impl<N: Network> BFT<N> {
904    /// Starts the BFT handlers.
905    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
906        let BFTReceiver {
907            mut rx_primary_round,
908            mut rx_primary_certificate,
909            mut rx_sync_bft_dag_at_bootup,
910            mut rx_sync_bft,
911            mut rx_sync_block_committed,
912        } = bft_receiver;
913
914        // Process the current round from the primary.
915        let self_ = self.clone();
916        self.spawn(async move {
917            while let Some((current_round, callback)) = rx_primary_round.recv().await {
918                callback.send(self_.update_to_next_round(current_round)).ok();
919            }
920        });
921
922        // Process the certificate from the primary.
923        let self_ = self.clone();
924        self.spawn(async move {
925            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
926                // Update the DAG with the certificate.
927                let result = self_.update_dag::<true, false>(certificate).await;
928                // Send the callback **after** updating the DAG.
929                // Note: We must await the DAG update before proceeding.
930                callback.send(result).ok();
931            }
932        });
933
934        // Process the request to sync the BFT DAG at bootup.
935        let self_ = self.clone();
936        self.spawn(async move {
937            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
938                self_.sync_bft_dag_at_bootup(certificates).await;
939            }
940        });
941
942        // Handler for new certificates that were fetched by the sync module.
943        let self_ = self.clone();
944        self.spawn(async move {
945            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
946                // Update the DAG with the certificate.
947                let result = self_.update_dag::<true, true>(certificate).await;
948                // Send the callback **after** updating the DAG.
949                // Note: We must await the DAG update before proceeding.
950                callback.send(result).ok();
951            }
952        });
953
954        // Handler for new blocks that were synced wit BFT.
955        //
956        // Note: This is just a workaround until other sync changes are merged.
957        // BFT sync logic should always use DAG commits within GC, but uses pending blocks in rare cases.
958        // This sender ensures that the DAG is still updated accordingly if that happens.
959        let self_ = self.clone();
960        self.spawn(async move {
961            while let Some((leader_certificate, callback)) = rx_sync_block_committed.recv().await {
962                self_.dag.write().commit(&leader_certificate, self_.storage().max_gc_rounds());
963                callback.send(Ok(())).ok();
964            }
965        });
966    }
967
968    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
969    /// already exist in the ledger.
970    ///
971    /// This method commits all the certificates into the DAG.
972    /// Note that there is no need to insert the certificates into the DAG, because these certificates
973    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
974    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
975        // Acquire the BFT write lock.
976        let mut dag = self.dag.write();
977
978        // Commit all the certificates.
979        for certificate in certificates {
980            dag.commit(&certificate, self.storage().max_gc_rounds());
981        }
982    }
983
984    /// Spawns a task with the given future; it should only be used for long-running tasks.
985    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
986        self.handles.lock().push(tokio::spawn(future));
987    }
988
989    /// Shuts down the BFT.
990    pub async fn shut_down(&self) {
991        info!("Shutting down the BFT...");
992        // Acquire the lock.
993        let _lock = self.lock.lock().await;
994        // Shut down the primary.
995        self.primary.shut_down().await;
996        // Abort the tasks.
997        self.handles.lock().iter().for_each(|handle| handle.abort());
998    }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003    use crate::{
1004        BFT,
1005        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
1006        helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
1007    };
1008
1009    use snarkos_account::Account;
1010    use snarkos_node_bft_ledger_service::{LedgerService, MockLedgerService};
1011    use snarkos_node_bft_storage_service::BFTMemoryService;
1012    use snarkos_node_sync::BlockSync;
1013    use snarkos_utilities::NodeDataDir;
1014
1015    use snarkvm::{
1016        console::account::{Address, PrivateKey},
1017        ledger::{
1018            committee::{
1019                Committee,
1020                test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
1021            },
1022            narwhal::{
1023                BatchCertificate,
1024                batch_certificate::test_helpers::{
1025                    sample_batch_certificate,
1026                    sample_batch_certificate_for_round,
1027                    sample_batch_certificate_for_round_with_committee,
1028                },
1029            },
1030        },
1031        utilities::TestRng,
1032    };
1033
1034    use anyhow::Result;
1035    use indexmap::{IndexMap, IndexSet};
1036    use std::sync::Arc;
1037
1038    type CurrentNetwork = snarkvm::console::network::MainnetV0;
1039
1040    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
1041    fn sample_test_instance(
1042        committee_round: Option<u64>,
1043        max_gc_rounds: u64,
1044        rng: &mut TestRng,
1045    ) -> (
1046        Committee<CurrentNetwork>,
1047        Account<CurrentNetwork>,
1048        Arc<MockLedgerService<CurrentNetwork>>,
1049        Storage<CurrentNetwork>,
1050    ) {
1051        let committee = match committee_round {
1052            Some(round) => sample_committee_for_round(round, rng),
1053            None => sample_committee(rng),
1054        };
1055        let account = Account::new(rng).unwrap();
1056        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1057        let transmissions = Arc::new(BFTMemoryService::new());
1058        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1059
1060        (committee, account, ledger, storage)
1061    }
1062
1063    // Helper function to set up BFT for testing.
1064    fn initialize_bft(
1065        account: Account<CurrentNetwork>,
1066        storage: Storage<CurrentNetwork>,
1067        ledger: Arc<MockLedgerService<CurrentNetwork>>,
1068    ) -> anyhow::Result<BFT<CurrentNetwork>> {
1069        // Create the block synchronization logic.
1070        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1071        // Initialize the BFT.
1072        BFT::new(
1073            account.clone(),
1074            storage.clone(),
1075            ledger.clone(),
1076            block_sync,
1077            None,
1078            &[],
1079            false,
1080            NodeDataDir::new_test(None),
1081            None,
1082        )
1083    }
1084
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample four round-1 batch certificates, none of which has previous certificates.
    let mut certificates = IndexSet::new();
    for _ in 0..4 {
        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
    }

    // The committee for round 1 is made of the authors of the sampled certificates.
    let members = certificates.iter().map(|certificate| certificate.author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(1, members, rng);

    // Set up the ledger, the storage (with 10 GC rounds), the account, and the BFT.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    let account = Account::new(rng)?;
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
    assert!(bft.is_timer_expired());

    // The timer has expired, but the storage holds no certificates yet: expect 'false'.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(1));

    // Insert the certificates into the storage.
    for certificate in &certificates {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    // With the certificates stored and no previous leader certificate, expect 'true'.
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    // Set a leader certificate; the check should now fall through to the end of the function.
    *bft.leader_certificate.write() = Some(sample_batch_certificate(rng));
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    Ok(())
}
1138
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample a test instance whose committee starts at round 1, with 10 GC rounds.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, &mut rng);
    assert_eq!(1, committee.starting_round());
    assert_eq!(1, storage.current_round());
    assert_eq!(10, storage.max_gc_rounds());

    // Initialize the BFT; its timer is expected to be expired from the start.
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
    assert!(bft.is_timer_expired());

    // The storage is at round 1, while the check is made for round 2: expect 'false'.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(2));
    Ok(())
}
1160
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample a test instance whose committee starts at round 2, with 10 GC rounds.
    let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, &mut rng);
    assert_eq!(2, committee.starting_round());
    assert_eq!(2, storage.current_round());
    assert_eq!(10, storage.max_gc_rounds());

    // Initialize the BFT; its timer is expected to be expired from the start.
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
    assert!(bft.is_timer_expired());

    // The check is made for the even round 2, and is expected to return 'false'.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(2));
    Ok(())
}
1181
#[test]
#[tracing_test::traced_test]
fn test_is_even_round_ready() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample four batch certificates for round 2.
    let mut certificates = IndexSet::new();
    for _ in 0..4 {
        certificates.insert(sample_batch_certificate_for_round(2, rng));
    }

    // The committee for round 2 is made of the authors of the sampled certificates.
    let members = certificates.iter().map(|certificate| certificate.author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(2, members, rng);

    // Set up the ledger, the storage (with 10 GC rounds), and the account.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    let account = Account::new(rng)?;

    // Initialize the BFT; its timer is expected to be expired from the start.
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
    assert!(bft.is_timer_expired());

    // Set a leader certificate for round 2.
    *bft.leader_certificate.write() = Some(sample_batch_certificate_for_round(2, rng));
    // With a leader certificate but no certificates at all, we are not ready for the next round.
    assert!(!bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2));
    // With the four sampled certificates, we are ready for the next round.
    assert!(bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2));

    // Initialize a second BFT, which has no leader certificate set.
    let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
    // Without a leader certificate, we are not ready for the next round while the timer is still running.
    let ready_before_wait = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if !bft_timer.is_timer_expired() {
        assert!(!ready_before_wait);
    }

    // Sleep for the maximum leader certificate delay, to let the timer expire.
    std::thread::sleep(std::time::Duration::from_secs(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64));

    // After the wait, the readiness is expected to agree with the state of the timer.
    let ready_after_wait = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if bft_timer.is_timer_expired() {
        assert!(ready_after_wait);
    } else {
        assert!(!ready_after_wait);
    }

    Ok(())
}
1248
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_odd() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample a test instance with the default committee and 10 GC rounds.
    let (_, account, ledger, storage) = sample_test_instance(None, 10, &mut rng);
    assert_eq!(10, storage.max_gc_rounds());

    // Initialize the BFT; its timer is expected to be expired from the start.
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
    assert!(bft.is_timer_expired());

    // Updating the leader certificate for the odd round 1 is expected to return 'false'.
    assert!(!bft.update_leader_certificate_to_even_round(1));
    Ok(())
}
1267
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_bad_round() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample a test instance with the default committee and 10 GC rounds.
    let (_, account, ledger, storage) = sample_test_instance(None, 10, &mut rng);
    assert_eq!(10, storage.max_gc_rounds());

    // Initialize the BFT.
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

    // Round 6 is even, but this test inserted no certificates for it: the update is expected to return 'false'.
    assert!(!bft.update_leader_certificate_to_even_round(6));
    Ok(())
}
1285
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample a certificate for round 3 together with its previous certificates; only the latter are used.
    let current_round = 3;
    let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
        current_round,
        rng,
    );

    // The committee for round 2 is made of the authors of the first four previous certificates.
    let members = certificates.iter().take(4).map(|certificate| certificate.author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(2, members, rng);

    // Set up the ledger and the storage (with 10 GC rounds).
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    // Insert the first four previous certificates; the storage is then expected to be at round 2.
    for certificate in certificates.iter().take(4) {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    assert_eq!(2, storage.current_round());

    // Retrieve the certificate that the round-2 leader authored.
    let leader = committee.get_leader(2).unwrap();
    let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();

    // Initialize the BFT, and set its leader certificate.
    let account = Account::new(rng)?;
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
    *bft.leader_certificate.write() = Some(leader_certificate);

    // Updating the leader certificate for the even round 2 is expected to return 'true'.
    assert!(bft.update_leader_certificate_to_even_round(2));

    Ok(())
}
1342
1343    #[tokio::test]
1344    #[tracing_test::traced_test]
1345    async fn test_order_dag_with_dfs() -> Result<()> {
1346        let rng = &mut TestRng::default();
1347
1348        // Sample the test instance.
1349        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1350
1351        // Initialize the round parameters.
1352        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1353        let current_round = previous_round + 1;
1354
1355        // Sample the current certificate and previous certificates.
1356        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1357            current_round,
1358            rng,
1359        );
1360
1361        /* Test GC */
1362
1363        // Ensure the function succeeds in returning only certificates above GC.
1364        {
1365            // Initialize the storage.
1366            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1367            // Initialize the BFT.
1368            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1369
1370            // Insert a mock DAG in the BFT.
1371            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1372
1373            // Insert the previous certificates into the BFT.
1374            for certificate in previous_certificates.clone() {
1375                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1376            }
1377
1378            // Ensure this call succeeds and returns all given certificates.
1379            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1380            assert!(result.is_ok());
1381            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1382            assert_eq!(candidate_certificates.len(), 1);
1383            let expected_certificates = vec![certificate.clone()];
1384            assert_eq!(
1385                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1386                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1387            );
1388            assert_eq!(candidate_certificates, expected_certificates);
1389        }
1390
1391        /* Test normal case */
1392
1393        // Ensure the function succeeds in returning all given certificates.
1394        {
1395            // Initialize the storage.
1396            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1397            // Initialize the BFT.
1398            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1399
1400            // Insert a mock DAG in the BFT.
1401            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1402
1403            // Insert the previous certificates into the BFT.
1404            for certificate in previous_certificates.clone() {
1405                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1406            }
1407
1408            // Ensure this call succeeds and returns all given certificates.
1409            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1410            assert!(result.is_ok());
1411            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1412            assert_eq!(candidate_certificates.len(), 5);
1413            let expected_certificates = vec![
1414                previous_certificates[0].clone(),
1415                previous_certificates[1].clone(),
1416                previous_certificates[2].clone(),
1417                previous_certificates[3].clone(),
1418                certificate,
1419            ];
1420            assert_eq!(
1421                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1422                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1423            );
1424            assert_eq!(candidate_certificates, expected_certificates);
1425        }
1426
1427        Ok(())
1428    }
1429
1430    #[test]
1431    #[tracing_test::traced_test]
1432    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1433        let rng = &mut TestRng::default();
1434
1435        // Sample the test instance.
1436        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1437        assert_eq!(committee.starting_round(), 1);
1438        assert_eq!(storage.current_round(), 1);
1439        assert_eq!(storage.max_gc_rounds(), 1);
1440
1441        // Initialize the round parameters.
1442        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1443        let current_round = previous_round + 1;
1444
1445        // Sample the current certificate and previous certificates.
1446        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1447            current_round,
1448            rng,
1449        );
1450        // Construct the previous certificate IDs.
1451        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1452
1453        /* Test missing previous certificate. */
1454
1455        // Initialize the BFT.
1456        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1457
1458        // The expected error message.
1459        let error_msg = format!(
1460            "Missing previous certificate {} for round {previous_round}",
1461            crate::helpers::fmt_id(previous_certificate_ids[3]),
1462        );
1463
1464        // Ensure this call fails on a missing previous certificate.
1465        let result = bft.order_dag_with_dfs::<false>(certificate);
1466        assert!(result.is_err());
1467        assert_eq!(result.unwrap_err().to_string(), error_msg);
1468        Ok(())
1469    }
1470
1471    #[tokio::test]
1472    async fn test_bft_gc_on_commit() -> Result<()> {
1473        let rng = &mut TestRng::default();
1474
1475        // Initialize the round parameters.
1476        let max_gc_rounds = 1;
1477        let committee_round = 0;
1478        let commit_round = 2;
1479        let current_round = commit_round + 1;
1480
1481        // Sample the certificates.
1482        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1483            current_round,
1484            rng,
1485        );
1486
1487        // Initialize the committee.
1488        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1489            committee_round,
1490            vec![
1491                certificates[0].author(),
1492                certificates[1].author(),
1493                certificates[2].author(),
1494                certificates[3].author(),
1495            ],
1496            rng,
1497        );
1498
1499        // Initialize the ledger.
1500        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1501
1502        // Initialize the storage.
1503        let transmissions = Arc::new(BFTMemoryService::new());
1504        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1505        // Insert the certificates into the storage.
1506        for certificate in certificates.iter() {
1507            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1508        }
1509
1510        // Get the leader certificate.
1511        let leader = committee.get_leader(commit_round).unwrap();
1512        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1513
1514        // Initialize the BFT.
1515        let account = Account::new(rng)?;
1516        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1517
1518        // Create an empty mock DAG with last committed round set to `commit_round`.
1519        *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1520
1521        // Ensure that the `gc_round` has not been updated yet.
1522        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1523
1524        // Insert the certificates into the BFT.
1525        for certificate in certificates {
1526            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1527        }
1528
1529        // Commit the leader certificate.
1530        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1531
1532        // Ensure that the `gc_round` has been updated.
1533        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1534
1535        Ok(())
1536    }
1537
1538    #[tokio::test]
1539    #[tracing_test::traced_test]
1540    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1541        let rng = &mut TestRng::default();
1542
1543        // Initialize the round parameters.
1544        let max_gc_rounds = 1;
1545        let committee_round = 0;
1546        let commit_round = 2;
1547        let current_round = commit_round + 1;
1548
1549        // Sample the current certificate and previous certificates.
1550        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1551            current_round,
1552            rng,
1553        );
1554
1555        // Initialize the committee.
1556        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1557            committee_round,
1558            vec![
1559                certificates[0].author(),
1560                certificates[1].author(),
1561                certificates[2].author(),
1562                certificates[3].author(),
1563            ],
1564            rng,
1565        );
1566
1567        // Initialize the ledger.
1568        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1569
1570        // Initialize the storage.
1571        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1572        // Insert the certificates into the storage.
1573        for certificate in certificates.iter() {
1574            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1575        }
1576
1577        // Get the leader certificate.
1578        let leader = committee.get_leader(commit_round).unwrap();
1579        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1580
1581        // Initialize the BFT.
1582        let account = Account::new(rng)?;
1583        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1584
1585        // Insert a mock DAG in the BFT.
1586        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1587
1588        // Insert the previous certificates into the BFT.
1589        for certificate in certificates.clone() {
1590            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1591        }
1592
1593        // Commit the leader certificate.
1594        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1595
1596        // Simulate a bootup of the BFT.
1597
1598        // Initialize a new instance of storage.
1599        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1600        // Initialize a new instance of BFT.
1601        let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1602
1603        // Sync the BFT DAG at bootup.
1604        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1605
1606        // Check that the BFT starts from the same last committed round.
1607        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1608
1609        // Ensure that both BFTs have committed the leader certificate.
1610        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1611        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1612
1613        // Check the state of the bootup BFT.
1614        for certificate in certificates {
1615            let certificate_round = certificate.round();
1616            let certificate_id = certificate.id();
1617            // Check that the bootup BFT has committed the certificates.
1618            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1619            // Check that the bootup BFT does not contain the certificates in its graph, because
1620            // it should not need to order them again in subsequent subdags.
1621            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1622        }
1623
1624        Ok(())
1625    }
1626
1627    #[tokio::test]
1628    #[tracing_test::traced_test]
1629    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1630        /*
1631        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1632        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1633        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1634        */
1635
1636        let rng = &mut TestRng::default();
1637
1638        // Initialize the round parameters.
1639        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1640        let committee_round = 0;
1641        let commit_round = 2;
1642        let current_round = commit_round + 1;
1643        let next_round = current_round + 1;
1644
1645        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1646        let (round_to_certificates_map, committee) = {
1647            let private_keys = vec![
1648                PrivateKey::new(rng).unwrap(),
1649                PrivateKey::new(rng).unwrap(),
1650                PrivateKey::new(rng).unwrap(),
1651                PrivateKey::new(rng).unwrap(),
1652            ];
1653            let addresses = vec![
1654                Address::try_from(private_keys[0])?,
1655                Address::try_from(private_keys[1])?,
1656                Address::try_from(private_keys[2])?,
1657                Address::try_from(private_keys[3])?,
1658            ];
1659            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1660                committee_round,
1661                addresses,
1662                rng,
1663            );
1664            // Initialize a mapping from the round number to the set of batch certificates in the round.
1665            let mut round_to_certificates_map: IndexMap<
1666                u64,
1667                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1668            > = IndexMap::new();
1669            let mut previous_certificates = IndexSet::with_capacity(4);
1670            // Initialize the genesis batch certificates.
1671            for _ in 0..4 {
1672                previous_certificates.insert(sample_batch_certificate(rng));
1673            }
1674            for round in 0..commit_round + 3 {
1675                let mut current_certificates = IndexSet::new();
1676                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1677                    IndexSet::new()
1678                } else {
1679                    previous_certificates.iter().map(|c| c.id()).collect()
1680                };
1681                let transmission_ids =
1682                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1683                        .into_iter()
1684                        .collect::<IndexSet<_>>();
1685                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1686                let committee_id = committee.id();
1687                for (i, private_key_1) in private_keys.iter().enumerate() {
1688                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1689                        private_key_1,
1690                        round,
1691                        timestamp,
1692                        committee_id,
1693                        transmission_ids.clone(),
1694                        previous_certificate_ids.clone(),
1695                        rng,
1696                    )
1697                    .unwrap();
1698                    let mut signatures = IndexSet::with_capacity(4);
1699                    for (j, private_key_2) in private_keys.iter().enumerate() {
1700                        if i != j {
1701                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1702                        }
1703                    }
1704                    let certificate =
1705                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1706                    current_certificates.insert(certificate);
1707                }
1708                // Update the mapping.
1709                round_to_certificates_map.insert(round, current_certificates.clone());
1710                previous_certificates = current_certificates.clone();
1711            }
1712            (round_to_certificates_map, committee)
1713        };
1714
1715        // Initialize the ledger.
1716        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1717        // Initialize the storage.
1718        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1719        // Get the leaders for the next 2 commit rounds.
1720        let leader = committee.get_leader(commit_round).unwrap();
1721        let next_leader = committee.get_leader(next_round).unwrap();
1722        // Insert the pre shutdown certificates into the storage.
1723        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1724        for i in 1..=commit_round {
1725            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1726            if i == commit_round {
1727                // Only insert the leader certificate for the commit round.
1728                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1729                if let Some(c) = leader_certificate {
1730                    pre_shutdown_certificates.push(c.clone());
1731                }
1732                continue;
1733            }
1734            pre_shutdown_certificates.extend(certificates);
1735        }
1736        for certificate in pre_shutdown_certificates.iter() {
1737            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1738        }
1739        // Insert the post shutdown certificates into the storage.
1740        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1741            Vec::new();
1742        for j in commit_round..=commit_round + 2 {
1743            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1744            post_shutdown_certificates.extend(certificate);
1745        }
1746        for certificate in post_shutdown_certificates.iter() {
1747            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1748        }
1749        // Get the leader certificates.
1750        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1751        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1752
1753        // Initialize the BFT without bootup.
1754        let account = Account::new(rng)?;
1755        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1756
1757        // Insert a mock DAG in the BFT without bootup.
1758        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1759
1760        // Insert the certificates into the BFT without bootup.
1761        for certificate in pre_shutdown_certificates.clone() {
1762            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1763        }
1764
1765        // Insert the post shutdown certificates into the BFT without bootup.
1766        for certificate in post_shutdown_certificates.clone() {
1767            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1768        }
1769        // Commit the second leader certificate.
1770        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1771        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1772        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1773
1774        // Simulate a bootup of the BFT.
1775
1776        // Initialize a new instance of storage.
1777        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1778
1779        // Initialize a new instance of BFT with bootup.
1780        let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1781
1782        // Sync the BFT DAG at bootup.
1783        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1784
1785        // Insert the post shutdown certificates to the storage and BFT with bootup.
1786        for certificate in post_shutdown_certificates.iter() {
1787            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1788        }
1789        for certificate in post_shutdown_certificates.clone() {
1790            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1791        }
1792        // Commit the second leader certificate.
1793        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1794        let commit_subdag_metadata_bootup =
1795            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1796        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1797        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1798
1799        // Check that the final state of both BFTs is the same.
1800
1801        // Check that both BFTs start from the same last committed round.
1802        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1803
1804        // Ensure that both BFTs have committed the leader certificates.
1805        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1806        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1807        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1808        assert!(
1809            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1810        );
1811
1812        // Check that the bootup BFT has committed the pre shutdown certificates.
1813        for certificate in pre_shutdown_certificates.clone() {
1814            let certificate_round = certificate.round();
1815            let certificate_id = certificate.id();
1816            // Check that both BFTs have committed the certificates.
1817            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1818            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1819            // Check that the bootup BFT does not contain the certificates in its graph, because
1820            // it should not need to order them again in subsequent subdags.
1821            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1822            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1823        }
1824
1825        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1826        for certificate in committed_certificates_bootup.clone() {
1827            let certificate_round = certificate.round();
1828            let certificate_id = certificate.id();
1829            // Check that the both BFTs have committed the certificates.
1830            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1831            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1832            // Check that the bootup BFT does not contain the certificates in its graph, because
1833            // it should not need to order them again in subsequent subdags.
1834            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1835            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1836        }
1837
1838        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1839        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1840
1841        Ok(())
1842    }
1843
1844    #[tokio::test]
1845    #[tracing_test::traced_test]
1846    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1847        /*
1848        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1849        2. Add post shutdown certificates to the bootup BFT.
1850        3. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1851        */
1852
1853        let rng = &mut TestRng::default();
1854
1855        // Initialize the round parameters.
1856        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1857        let committee_round = 0;
1858        let commit_round = 2;
1859        let current_round = commit_round + 1;
1860        let next_round = current_round + 1;
1861
1862        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1863        let (round_to_certificates_map, committee) = {
1864            let private_keys = vec![
1865                PrivateKey::new(rng).unwrap(),
1866                PrivateKey::new(rng).unwrap(),
1867                PrivateKey::new(rng).unwrap(),
1868                PrivateKey::new(rng).unwrap(),
1869            ];
1870            let addresses = vec![
1871                Address::try_from(private_keys[0])?,
1872                Address::try_from(private_keys[1])?,
1873                Address::try_from(private_keys[2])?,
1874                Address::try_from(private_keys[3])?,
1875            ];
1876            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1877                committee_round,
1878                addresses,
1879                rng,
1880            );
1881            // Initialize a mapping from the round number to the set of batch certificates in the round.
1882            let mut round_to_certificates_map: IndexMap<
1883                u64,
1884                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1885            > = IndexMap::new();
1886            let mut previous_certificates = IndexSet::with_capacity(4);
1887            // Initialize the genesis batch certificates.
1888            for _ in 0..4 {
1889                previous_certificates.insert(sample_batch_certificate(rng));
1890            }
1891            for round in 0..=commit_round + 2 {
1892                let mut current_certificates = IndexSet::new();
1893                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1894                    IndexSet::new()
1895                } else {
1896                    previous_certificates.iter().map(|c| c.id()).collect()
1897                };
1898                let transmission_ids =
1899                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1900                        .into_iter()
1901                        .collect::<IndexSet<_>>();
1902                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1903                let committee_id = committee.id();
1904                for (i, private_key_1) in private_keys.iter().enumerate() {
1905                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1906                        private_key_1,
1907                        round,
1908                        timestamp,
1909                        committee_id,
1910                        transmission_ids.clone(),
1911                        previous_certificate_ids.clone(),
1912                        rng,
1913                    )
1914                    .unwrap();
1915                    let mut signatures = IndexSet::with_capacity(4);
1916                    for (j, private_key_2) in private_keys.iter().enumerate() {
1917                        if i != j {
1918                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1919                        }
1920                    }
1921                    let certificate =
1922                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1923                    current_certificates.insert(certificate);
1924                }
1925                // Update the mapping.
1926                round_to_certificates_map.insert(round, current_certificates.clone());
1927                previous_certificates = current_certificates.clone();
1928            }
1929            (round_to_certificates_map, committee)
1930        };
1931
1932        // Initialize the ledger.
1933        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1934        // Initialize the storage.
1935        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1936        // Get the leaders for the next 2 commit rounds.
1937        let leader = committee.get_leader(commit_round).unwrap();
1938        let next_leader = committee.get_leader(next_round).unwrap();
1939        // Insert the pre shutdown certificates into the storage.
1940        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1941        for i in 1..=commit_round {
1942            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1943            if i == commit_round {
1944                // Only insert the leader certificate for the commit round.
1945                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1946                if let Some(c) = leader_certificate {
1947                    pre_shutdown_certificates.push(c.clone());
1948                }
1949                continue;
1950            }
1951            pre_shutdown_certificates.extend(certificates);
1952        }
1953        for certificate in pre_shutdown_certificates.iter() {
1954            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1955        }
1956        // Initialize the bootup BFT.
1957        let account = Account::new(rng)?;
1958        let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1959
1960        // Insert a mock DAG in the BFT without bootup.
1961        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1962        // Sync the BFT DAG at bootup.
1963        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1964
1965        // Insert the post shutdown certificates into the storage.
1966        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1967            Vec::new();
1968        for j in commit_round..=commit_round + 2 {
1969            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1970            post_shutdown_certificates.extend(certificate);
1971        }
1972        for certificate in post_shutdown_certificates.iter() {
1973            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1974        }
1975
1976        // Insert the post shutdown certificates into the DAG.
1977        for certificate in post_shutdown_certificates.clone() {
1978            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1979        }
1980
1981        // Get the next leader certificate to commit.
1982        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1983        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1984        let committed_certificates = commit_subdag.values().flatten();
1985
1986        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1987        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1988            for committed_certificate in committed_certificates.clone() {
1989                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1990            }
1991        }
1992        Ok(())
1993    }
1994
1995    /// Tests that a leader certificate can be committed by sufficient endorsements in a succeeding leader certificate.
1996    #[test_log::test(tokio::test)]
1997    async fn test_commit_via_is_linked() {
1998        let rng = &mut TestRng::default();
1999
2000        let committee_round = 0;
2001        let leader_round_1 = 2;
2002        let leader_round_2 = 4; // subsequent even round
2003        let max_gc_rounds = 50;
2004
2005        // Create a committee with four members.
2006        let num_authors = 4;
2007        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2008        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2009
2010        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2011        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2012        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2013        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2014
2015        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2016
2017        // Round 1
2018        let round1_certs: IndexSet<_> = (0..num_authors)
2019            .map(|idx| {
2020                let author = &private_keys[idx];
2021                let endorsements: Vec<_> = private_keys
2022                    .iter()
2023                    .enumerate()
2024                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2025                    .collect();
2026
2027                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2028            })
2029            .collect();
2030        certificates_by_round.insert(1, round1_certs.clone());
2031
2032        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2033        let mut leader1_certificate = None;
2034
2035        let round2_certs: IndexSet<_> = (0..num_authors)
2036            .map(|idx| {
2037                let author = &private_keys[idx];
2038                let endorsements: Vec<_> = private_keys
2039                    .iter()
2040                    .enumerate()
2041                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2042                    .collect();
2043                let cert = sample_batch_certificate_for_round_with_committee(
2044                    leader_round_1,
2045                    round1_certs.iter().map(|c| c.id()).collect(),
2046                    author,
2047                    &endorsements[..],
2048                    rng,
2049                );
2050
2051                if cert.author() == leader1 {
2052                    leader1_certificate = Some(cert.clone());
2053                }
2054                cert
2055            })
2056            .collect();
2057        certificates_by_round.insert(leader_round_1, round2_certs.clone());
2058
2059        let round3_certs: IndexSet<_> = (0..num_authors)
2060            .map(|idx| {
2061                let author = &private_keys[idx];
2062                let endorsements: Vec<_> = private_keys
2063                    .iter()
2064                    .enumerate()
2065                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2066                    .collect();
2067
2068                let previous_certificate_ids: IndexSet<_> = round2_certs
2069                    .iter()
2070                    .filter_map(|cert| {
2071                        // Only have the leader endorse the previous round's leader certificate.
2072                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2073                    })
2074                    .collect();
2075
2076                sample_batch_certificate_for_round_with_committee(
2077                    leader_round_1 + 1,
2078                    previous_certificate_ids,
2079                    author,
2080                    &endorsements[..],
2081                    rng,
2082                )
2083            })
2084            .collect();
2085        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2086
2087        // Ensure the first leader's certificate is not committed yet.
2088        let leader_certificate_1 = leader1_certificate.unwrap();
2089        assert!(
2090            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2091            "Leader certificate 1 should not be committed yet"
2092        );
2093        assert_eq!(bft.dag.read().last_committed_round(), 0);
2094
2095        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2096        let round4_certs: IndexSet<_> = (0..num_authors)
2097            .map(|idx| {
2098                let endorsements: Vec<_> = private_keys
2099                    .iter()
2100                    .enumerate()
2101                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2102                    .collect();
2103
2104                sample_batch_certificate_for_round_with_committee(
2105                    leader_round_2,
2106                    round3_certs.iter().map(|c| c.id()).collect(),
2107                    &private_keys[idx],
2108                    &endorsements[..],
2109                    rng,
2110                )
2111            })
2112            .collect();
2113        certificates_by_round.insert(leader_round_2, round4_certs.clone());
2114
2115        // Insert all certificates into the storage and DAG.
2116        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2117            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2118            bft.update_dag::<false, false>(certificate).await.unwrap();
2119        }
2120
2121        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2122
2123        assert!(
2124            bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2125            "Leader certificate 1 should be linked to leader certificate 2"
2126        );
2127
2128        // Explicitely commit leader certificate 2.
2129        bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2130
2131        // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2132        assert!(
2133            bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2134            "Leader certificate for round 2 should be committed when committing at round 4"
2135        );
2136
2137        // Leader certificate 2 should be committed as the above call was successful.
2138        assert!(
2139            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2140            "Leader certificate for round 4 should be committed"
2141        );
2142
2143        assert_eq!(bft.dag.read().last_committed_round(), 4);
2144    }
2145
2146    #[test_log::test(tokio::test)]
2147    async fn test_commit_via_is_linked_with_skipped_anchor() {
2148        let rng = &mut TestRng::default();
2149
2150        let committee_round = 0;
2151        let leader_round_1 = 2;
2152        let leader_round_2 = 4;
2153        let max_gc_rounds = 50;
2154
2155        let num_authors = 4;
2156        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2157        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2158
2159        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2160        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2161        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2162        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2163
2164        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2165
2166        // Round 1
2167        let round1_certs: IndexSet<_> = (0..num_authors)
2168            .map(|idx| {
2169                let author = &private_keys[idx];
2170                let endorsements: Vec<_> = private_keys
2171                    .iter()
2172                    .enumerate()
2173                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2174                    .collect();
2175
2176                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2177            })
2178            .collect();
2179        certificates_by_round.insert(1, round1_certs.clone());
2180
2181        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2182        let mut leader1_certificate = None;
2183
2184        let round2_certs: IndexSet<_> = (0..num_authors)
2185            .map(|idx| {
2186                let author = &private_keys[idx];
2187                let endorsements: Vec<_> = private_keys
2188                    .iter()
2189                    .enumerate()
2190                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2191                    .collect();
2192                let cert = sample_batch_certificate_for_round_with_committee(
2193                    leader_round_1,
2194                    round1_certs.iter().map(|c| c.id()).collect(),
2195                    author,
2196                    &endorsements[..],
2197                    rng,
2198                );
2199
2200                if cert.author() == leader1 {
2201                    leader1_certificate = Some(cert.clone());
2202                }
2203                cert
2204            })
2205            .collect();
2206        certificates_by_round.insert(leader_round_1, round2_certs.clone());
2207
2208        let round3_certs: IndexSet<_> = (0..num_authors)
2209            .map(|idx| {
2210                let author = &private_keys[idx];
2211                let endorsements: Vec<_> = private_keys
2212                    .iter()
2213                    .enumerate()
2214                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2215                    .collect();
2216
2217                let previous_certificate_ids: IndexSet<_> = round2_certs
2218                    .iter()
2219                    .filter_map(|cert| {
2220                        // Only have the leader endorse the previous round's leader certificate.
2221                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2222                    })
2223                    .collect();
2224
2225                sample_batch_certificate_for_round_with_committee(
2226                    leader_round_1 + 1,
2227                    previous_certificate_ids,
2228                    author,
2229                    &endorsements[..],
2230                    rng,
2231                )
2232            })
2233            .collect();
2234        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2235
2236        // Ensure the first leader's certificate is not committed yet.
2237        let leader_certificate_1 = leader1_certificate.unwrap();
2238        assert!(
2239            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2240            "Leader certificate 1 should not be committed yet"
2241        );
2242
2243        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2244        let round4_certs: IndexSet<_> = (0..num_authors)
2245            .map(|idx| {
2246                let endorsements: Vec<_> = private_keys
2247                    .iter()
2248                    .enumerate()
2249                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2250                    .collect();
2251
2252                // Do not create a path to the previous leader certificate.
2253                let previous_certificate_ids: IndexSet<_> = round3_certs
2254                    .iter()
2255                    .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
2256                    .collect();
2257
2258                sample_batch_certificate_for_round_with_committee(
2259                    leader_round_2,
2260                    previous_certificate_ids,
2261                    &private_keys[idx],
2262                    &endorsements[..],
2263                    rng,
2264                )
2265            })
2266            .collect();
2267        certificates_by_round.insert(leader_round_2, round4_certs.clone());
2268
2269        // Insert all certificates into the storage and DAG.
2270        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2271            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2272            bft.update_dag::<false, false>(certificate).await.unwrap();
2273        }
2274
2275        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2276
2277        assert!(
2278            !bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2279            "Leader certificate 1 should not be linked to leader certificate 2"
2280        );
2281        assert_eq!(bft.dag.read().last_committed_round(), 0);
2282
2283        // Explicitely commit leader certificate 2.
2284        bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2285
2286        // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2287        assert!(
2288            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2289            "Leader certificate for round 2 should not be committed when committing at round 4"
2290        );
2291
2292        // Leader certificate 2 should be committed as the above call was successful.
2293        assert!(
2294            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2295            "Leader certificate for round 4 should be committed"
2296        );
2297        assert_eq!(bft.dag.read().last_committed_round(), 4);
2298    }
2299}