Skip to main content

snarkos_node_bft/
bft.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31
32use snarkos_account::Account;
33use snarkos_node_bft_ledger_service::LedgerService;
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_utilities::NodeDataDir;
36
37use snarkvm::{
38    console::account::Address,
39    ledger::{
40        block::Transaction,
41        committee::Committee,
42        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
43        puzzle::{Solution, SolutionID},
44    },
45    prelude::{Field, Network, Result, bail, ensure},
46    utilities::flatten_error,
47};
48
49use anyhow::Context;
50use colored::Colorize;
51use indexmap::{IndexMap, IndexSet};
52#[cfg(feature = "locktick")]
53use locktick::{
54    parking_lot::{Mutex, RwLock},
55    tokio::Mutex as TMutex,
56};
57#[cfg(not(feature = "locktick"))]
58use parking_lot::{Mutex, RwLock};
59use std::{
60    collections::{BTreeMap, HashSet},
61    future::Future,
62    net::SocketAddr,
63    sync::{
64        Arc,
65        atomic::{AtomicI64, Ordering},
66    },
67};
68#[cfg(not(feature = "locktick"))]
69use tokio::sync::Mutex as TMutex;
70use tokio::{
71    sync::{OnceCell, oneshot},
72    task::JoinHandle,
73};
74
75#[derive(Clone)]
76pub struct BFT<N: Network> {
77    /// The primary for this node.
78    primary: Primary<N>,
79    /// The DAG of batches from which we build the blockchain.
80    dag: Arc<RwLock<DAG<N>>>,
81    /// The batch certificate of the leader from the current even round, if one was present.
82    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
83    /// The timer for the leader certificate to be received.
84    leader_certificate_timer: Arc<AtomicI64>,
85    /// The consensus sender.
86    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
87    /// Handles for all spawned tasks.
88    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
89    /// The BFT lock.
90    lock: Arc<TMutex<()>>,
91}
92
93impl<N: Network> BFT<N> {
94    /// Initializes a new instance of the BFT.
95    #[allow(clippy::too_many_arguments)]
96    pub fn new(
97        account: Account<N>,
98        storage: Storage<N>,
99        ledger: Arc<dyn LedgerService<N>>,
100        block_sync: Arc<BlockSync<N>>,
101        ip: Option<SocketAddr>,
102        trusted_validators: &[SocketAddr],
103        trusted_peers_only: bool,
104        node_data_dir: NodeDataDir,
105        dev: Option<u16>,
106    ) -> Result<Self> {
107        Ok(Self {
108            primary: Primary::new(
109                account,
110                storage,
111                ledger,
112                block_sync,
113                ip,
114                trusted_validators,
115                trusted_peers_only,
116                node_data_dir,
117                dev,
118            )?,
119            dag: Default::default(),
120            leader_certificate: Default::default(),
121            leader_certificate_timer: Default::default(),
122            consensus_sender: Default::default(),
123            handles: Default::default(),
124            lock: Default::default(),
125        })
126    }
127
128    /// Run the BFT instance.
129    ///
130    /// This will return as soon as all required tasks are spawned.
131    /// The function must not be called more than once per instance.
132    pub async fn run(
133        &mut self,
134        ping: Option<Arc<Ping<N>>>,
135        consensus_sender: Option<ConsensusSender<N>>,
136        primary_sender: PrimarySender<N>,
137        primary_receiver: PrimaryReceiver<N>,
138    ) -> Result<()> {
139        info!("Starting the BFT instance...");
140        // Initialize the BFT channels.
141        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
142        // First, start the BFT handlers.
143        self.start_handlers(bft_receiver);
144        // Next, run the primary instance.
145        self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
146        // Lastly, set the consensus sender.
147        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
148        if let Some(consensus_sender) = consensus_sender {
149            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
150        }
151        Ok(())
152    }
153
154    /// Returns `true` if the primary is synced.
155    pub fn is_synced(&self) -> bool {
156        self.primary.is_synced()
157    }
158
159    /// Returns the primary.
160    pub const fn primary(&self) -> &Primary<N> {
161        &self.primary
162    }
163
164    /// Returns the storage.
165    pub const fn storage(&self) -> &Storage<N> {
166        self.primary.storage()
167    }
168
169    /// Returns the ledger.
170    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
171        self.primary.ledger()
172    }
173
174    /// Returns the leader of the current even round, if one was present.
175    pub fn leader(&self) -> Option<Address<N>> {
176        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
177    }
178
179    /// Returns the certificate of the leader from the current even round, if one was present.
180    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
181        &self.leader_certificate
182    }
183}
184
185impl<N: Network> BFT<N> {
186    /// Returns the number of unconfirmed transmissions.
187    pub fn num_unconfirmed_transmissions(&self) -> usize {
188        self.primary.num_unconfirmed_transmissions()
189    }
190
191    /// Returns the number of unconfirmed ratifications.
192    pub fn num_unconfirmed_ratifications(&self) -> usize {
193        self.primary.num_unconfirmed_ratifications()
194    }
195
196    /// Returns the number of solutions.
197    pub fn num_unconfirmed_solutions(&self) -> usize {
198        self.primary.num_unconfirmed_solutions()
199    }
200
201    /// Returns the number of unconfirmed transactions.
202    pub fn num_unconfirmed_transactions(&self) -> usize {
203        self.primary.num_unconfirmed_transactions()
204    }
205}
206
207impl<N: Network> BFT<N> {
208    /// Returns the worker transmission IDs.
209    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
210        self.primary.worker_transmission_ids()
211    }
212
213    /// Returns the worker transmissions.
214    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
215        self.primary.worker_transmissions()
216    }
217
218    /// Returns the worker solutions.
219    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
220        self.primary.worker_solutions()
221    }
222
223    /// Returns the worker transactions.
224    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
225        self.primary.worker_transactions()
226    }
227}
228
229impl<N: Network> BFT<N> {
230    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
231    fn update_to_next_round(&self, current_round: u64) -> bool {
232        // Ensure the current round is at least the storage round (this is a sanity check).
233        let storage_round = self.storage().current_round();
234        if current_round < storage_round {
235            debug!(
236                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
237            );
238            return false;
239        }
240
241        // Determine if the BFT is ready to update to the next round.
242        let is_ready = match current_round % 2 == 0 {
243            true => self.update_leader_certificate_to_even_round(current_round),
244            false => self.is_leader_quorum_or_nonleaders_available(current_round),
245        };
246
247        #[cfg(feature = "metrics")]
248        {
249            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
250            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
251            if start > 0 {
252                let end = now();
253                let elapsed = std::time::Duration::from_secs((end - start) as u64);
254                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
255            }
256        }
257
258        // Log whether the round is going to update.
259        if current_round % 2 == 0 {
260            // Determine if there is a leader certificate.
261            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
262                // Ensure the state of the leader certificate is consistent with the BFT being ready.
263                if !is_ready {
264                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
265                }
266                // Log the leader election.
267                let leader_round = leader_certificate.round();
268                match leader_round == current_round {
269                    true => {
270                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
271                        #[cfg(feature = "metrics")]
272                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
273                    }
274                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
275                }
276            } else {
277                match is_ready {
278                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
279                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
280                }
281            }
282        }
283
284        // If the BFT is ready, then update to the next round.
285        if is_ready {
286            // Update to the next round in storage.
287            if let Err(err) = self
288                .storage()
289                .increment_to_next_round(current_round)
290                .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
291            {
292                warn!("{}", &flatten_error(err));
293                return false;
294            }
295            // Update the timer for the leader certificate.
296            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
297        }
298
299        is_ready
300    }
301
302    /// Updates the leader certificate to the current even round,
303    /// returning `true` if the BFT is ready to update to the next round.
304    ///
305    /// This method runs on every even round, by determining the leader of the current even round,
306    /// and setting the leader certificate to their certificate in the round, if they were present.
307    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
308        // Retrieve the current round.
309        let current_round = self.storage().current_round();
310        // Ensure the current round matches the given round.
311        if current_round != even_round {
312            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
313            return false;
314        }
315
316        // If the current round is odd, return false.
317        if current_round % 2 != 0 || current_round < 2 {
318            error!("BFT cannot update the leader certificate in an odd round");
319            return false;
320        }
321
322        // Retrieve the certificates for the current round.
323        let current_certificates = self.storage().get_certificates_for_round(current_round);
324        // If there are no current certificates, set the leader certificate to 'None', and return early.
325        if current_certificates.is_empty() {
326            // Set the leader certificate to 'None'.
327            *self.leader_certificate.write() = None;
328            return false;
329        }
330
331        // Retrieve the committee lookback of the current round.
332        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
333            Ok(committee) => committee,
334            Err(err) => {
335                let err = err.context(format!(
336                    "BFT failed to retrieve the committee lookback for the even round {current_round}"
337                ));
338                warn!("{}", &flatten_error(err));
339                return false;
340            }
341        };
342        // Determine the leader of the current round.
343        let leader = match self.ledger().latest_leader() {
344            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
345            _ => {
346                // Compute the leader for the current round.
347                let computed_leader = match committee_lookback.get_leader(current_round) {
348                    Ok(leader) => leader,
349                    Err(err) => {
350                        let err =
351                            err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
352                        error!("{}", &flatten_error(err));
353                        return false;
354                    }
355                };
356
357                // Cache the computed leader.
358                self.ledger().update_latest_leader(current_round, computed_leader);
359
360                computed_leader
361            }
362        };
363        // Find and set the leader certificate, if the leader was present in the current even round.
364        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
365        *self.leader_certificate.write() = leader_certificate.cloned();
366
367        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
368    }
369
370    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
371    ///  - If the leader certificate is set for the current even round.
372    ///  - The timer for the leader certificate has expired.
373    fn is_even_round_ready_for_next_round(
374        &self,
375        certificates: IndexSet<BatchCertificate<N>>,
376        committee: Committee<N>,
377        current_round: u64,
378    ) -> bool {
379        // Retrieve the authors for the current round.
380        let authors = certificates.into_iter().map(|c| c.author()).collect();
381        // Check if quorum threshold is reached.
382        if !committee.is_quorum_threshold_reached(&authors) {
383            trace!("BFT failed to reach quorum threshold in even round {current_round}");
384            return false;
385        }
386        // If the leader certificate is set for the current even round, return 'true'.
387        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
388            if leader_certificate.round() == current_round {
389                return true;
390            }
391        }
392        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
393        if self.is_timer_expired() {
394            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
395            return true;
396        }
397        // Otherwise, return 'false'.
398        false
399    }
400
401    /// Returns `true` if the timer for the leader certificate has expired.
402    ///
403    /// This is always true for a new BFT instance.
404    fn is_timer_expired(&self) -> bool {
405        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
406    }
407
408    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
409    ///  - The leader certificate is `None`.
410    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
411    ///  - The leader certificate timer has expired.
412    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
413        // Retrieve the current round.
414        let current_round = self.storage().current_round();
415        // Ensure the current round matches the given round.
416        if current_round != odd_round {
417            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
418            return false;
419        }
420        // If the current round is even, return false.
421        if current_round % 2 != 1 {
422            error!("BFT does not compute stakes for the leader certificate in an even round");
423            return false;
424        }
425        // Retrieve the certificates for the current round.
426        let current_certificates = self.storage().get_certificates_for_round(current_round);
427        // Retrieve the committee lookback for the current round.
428        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
429            Ok(committee) => committee,
430            Err(err) => {
431                let err = err.context(format!(
432                    "BFT failed to retrieve the committee lookback for the odd round {current_round}"
433                ));
434                error!("{}", &flatten_error(err));
435                return false;
436            }
437        };
438        // Retrieve the authors of the current certificates.
439        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
440        // Check if quorum threshold is reached.
441        if !committee_lookback.is_quorum_threshold_reached(&authors) {
442            trace!("BFT failed reach quorum threshold in odd round {current_round}.");
443            return false;
444        }
445        // Retrieve the leader certificate.
446        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
447            // If there is no leader certificate for the previous round, return 'true'.
448            return true;
449        };
450        // Compute the stake for the leader certificate.
451        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
452            leader_certificate.id(),
453            current_certificates,
454            &committee_lookback,
455        );
456        // Return 'true' if any of the following conditions hold:
457        stake_with_leader >= committee_lookback.availability_threshold()
458            || stake_without_leader >= committee_lookback.quorum_threshold()
459            || self.is_timer_expired()
460    }
461
462    /// Computes the amount of stake that has & has not signed for the leader certificate.
463    fn compute_stake_for_leader_certificate(
464        &self,
465        leader_certificate_id: Field<N>,
466        current_certificates: IndexSet<BatchCertificate<N>>,
467        current_committee: &Committee<N>,
468    ) -> (u64, u64) {
469        // If there are no current certificates, return early.
470        if current_certificates.is_empty() {
471            return (0, 0);
472        }
473
474        // Initialize a tracker for the stake with the leader.
475        let mut stake_with_leader = 0u64;
476        // Initialize a tracker for the stake without the leader.
477        let mut stake_without_leader = 0u64;
478        // Iterate over the current certificates.
479        for certificate in current_certificates {
480            // Retrieve the stake for the author of the certificate.
481            let stake = current_committee.get_stake(certificate.author());
482            // Determine if the certificate includes the leader.
483            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
484                // If the certificate includes the leader, add the stake to the stake with the leader.
485                true => stake_with_leader = stake_with_leader.saturating_add(stake),
486                // If the certificate does not include the leader, add the stake to the stake without the leader.
487                false => stake_without_leader = stake_without_leader.saturating_add(stake),
488            }
489        }
490        // Return the stake with the leader, and the stake without the leader.
491        (stake_with_leader, stake_without_leader)
492    }
493}
494
495impl<N: Network> BFT<N> {
496    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
497    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
498        &self,
499        certificate: BatchCertificate<N>,
500    ) -> Result<()> {
501        // Acquire the BFT lock.
502        let _lock = self.lock.lock().await;
503
504        // ### First, insert the certificate into the DAG. ###
505        // Retrieve the round of the new certificate to add to the DAG.
506        let certificate_round = certificate.round();
507
508        // Insert the certificate into the DAG.
509        self.dag.write().insert(certificate);
510
511        // ### Second, determine if a new leader certificate can be committed. ###
512        let commit_round = certificate_round.saturating_sub(1);
513
514        // Leaders are elected in even rounds.
515        // If the previous round is odd, the current round cannot commit any leader certs.
516        // Similarly, no leader certificate can be committed for round zero.
517        if !commit_round.is_multiple_of(2) || commit_round < 2 {
518            return Ok(());
519        }
520        // If the commit round is at or below the last committed round, return early.
521        if commit_round <= self.dag.read().last_committed_round() {
522            return Ok(());
523        }
524
525        /* Proceeding to check if the leader is ready to be committed. */
526        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
527
528        // Retrieve the committee lookback for the commit round.
529        let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
530            format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
531        })?;
532
533        // Either retrieve the cached leader or compute it.
534        let leader = match self.ledger().latest_leader() {
535            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
536            _ => {
537                // Compute the leader for the commit round.
538                let computed_leader = committee_lookback
539                    .get_leader(commit_round)
540                    .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;
541
542                // Cache the computed leader.
543                self.ledger().update_latest_leader(commit_round, computed_leader);
544
545                computed_leader
546            }
547        };
548
549        // Retrieve the leader certificate for the commit round.
550        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
551        else {
552            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
553            return Ok(());
554        };
555        // Retrieve all of the certificates for the **certificate** round.
556        let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
557            format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
558        })?;
559
560        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
561        let certificate_committee_lookback =
562            self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
563                format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
564            })?;
565
566        // Construct a set over the authors who included the leader's certificate in the certificate round.
567        let authors = certificates
568            .values()
569            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
570                true => Some(c.author()),
571                false => None,
572            })
573            .collect();
574        // Check if the leader is ready to be committed.
575        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
576            // If the leader is not ready to be committed, return early.
577            trace!("BFT is not ready to commit {commit_round}. Availability threshold has not been reached yet.");
578            return Ok(());
579        }
580
581        if IS_SYNCING {
582            info!("Proceeding to commit round {commit_round} with leader '{}' from block sync", fmt_id(leader));
583        } else {
584            info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
585        }
586
587        // Commit the leader certificate, and all previous leader certificates since the last committed round.
588        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
589    }
590
591    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
592    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
593        &self,
594        leader_certificate: BatchCertificate<N>,
595    ) -> Result<()> {
596        #[cfg(feature = "metrics")]
597        let start = std::time::Instant::now();
598        #[cfg(debug_assertions)]
599        trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());
600
601        // Fetch the leader round.
602        let latest_leader_round = leader_certificate.round();
603        // Determine the list of all previous leader certificates since the last committed round.
604        // The order of the leader certificates is from **newest** to **oldest**.
605        let mut leader_certificates = vec![leader_certificate.clone()];
606        {
607            // Retrieve the leader round.
608            let leader_round = leader_certificate.round();
609
610            let mut current_certificate = leader_certificate;
611            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
612            {
613                // Retrieve the previous committee for the leader round.
614                let previous_committee_lookback =
615                    self.ledger().get_committee_lookback_for_round(round).with_context(|| {
616                        format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
617                    })?;
618
619                // Either retrieve the cached leader or compute it.
620                let leader = match self.ledger().latest_leader() {
621                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
622                    _ => {
623                        // Compute the leader for the commit round.
624                        let computed_leader = previous_committee_lookback
625                            .get_leader(round)
626                            .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;
627
628                        // Cache the computed leader.
629                        self.ledger().update_latest_leader(round, computed_leader);
630
631                        computed_leader
632                    }
633                };
634                // Retrieve the previous leader certificate.
635                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
636                else {
637                    continue;
638                };
639                // Determine if there is a path between the previous certificate and the current certificate.
640                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
641                    // Add the previous leader certificate to the list of certificates to commit.
642                    leader_certificates.push(previous_certificate.clone());
643                    // Update the current certificate to the previous leader certificate.
644                    current_certificate = previous_certificate;
645                } else {
646                    #[cfg(debug_assertions)]
647                    trace!(
648                        "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
649                    );
650                }
651            }
652        }
653
654        // Iterate over the leader certificates to commit.
655        for leader_certificate in leader_certificates.into_iter().rev() {
656            // Retrieve the leader certificate round.
657            let leader_round = leader_certificate.round();
658            // Compute the commit subdag.
659            let commit_subdag = self
660                .order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate)
661                .with_context(|| "BFT failed to order the DAG with DFS")?;
662            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
663            if !IS_SYNCING {
664                // Initialize a map for the deduped transmissions.
665                let mut transmissions = IndexMap::new();
666                // Initialize a map for the deduped transaction ids.
667                let mut seen_transaction_ids = IndexSet::new();
668                // Initialize a map for the deduped solution ids.
669                let mut seen_solution_ids = IndexSet::new();
670                // Start from the oldest leader certificate.
671                for certificate in commit_subdag.values().flatten() {
672                    // Retrieve the transmissions.
673                    for transmission_id in certificate.transmission_ids() {
674                        // If the transaction ID or solution ID already exists in the map, skip it.
675                        // Note: This additional check is done to ensure that we do not include duplicate
676                        // transaction IDs or solution IDs that may have a different transmission ID.
677                        match transmission_id {
678                            TransmissionID::Solution(solution_id, _) => {
679                                // If the solution already exists, skip it.
680                                if seen_solution_ids.contains(&solution_id) {
681                                    continue;
682                                }
683                            }
684                            TransmissionID::Transaction(transaction_id, _) => {
685                                // If the transaction already exists, skip it.
686                                if seen_transaction_ids.contains(transaction_id) {
687                                    continue;
688                                }
689                            }
690                            TransmissionID::Ratification => {
691                                bail!("Ratifications are currently not supported in the BFT.")
692                            }
693                        }
694                        // If the transmission already exists in the map, skip it.
695                        if transmissions.contains_key(transmission_id) {
696                            continue;
697                        }
698                        // If the transmission already exists in the ledger, skip it.
699                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
700                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
701                            continue;
702                        }
703                        // Retrieve the transmission.
704                        let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
705                            format!(
706                                "BFT failed to retrieve transmission '{}.{}' from round {}",
707                                fmt_id(transmission_id),
708                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
709                                certificate.round()
710                            )
711                        })?;
712                        // Insert the transaction ID or solution ID into the map.
713                        match transmission_id {
714                            TransmissionID::Solution(id, _) => {
715                                seen_solution_ids.insert(id);
716                            }
717                            TransmissionID::Transaction(id, _) => {
718                                seen_transaction_ids.insert(id);
719                            }
720                            TransmissionID::Ratification => {}
721                        }
722                        // Add the transmission to the set.
723                        transmissions.insert(*transmission_id, transmission);
724                    }
725                }
726                // Trigger consensus, as this will build a new block for the ledger.
727                // Construct the subdag.
728                let subdag = Subdag::from(commit_subdag.clone())?;
729                // Retrieve the anchor round.
730                let anchor_round = subdag.anchor_round();
731                // Retrieve the number of transmissions.
732                let num_transmissions = transmissions.len();
733                // Retrieve metadata about the subdag.
734                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
735
736                // Ensure the subdag anchor round matches the leader round.
737                ensure!(
738                    anchor_round == leader_round,
739                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
740                );
741
742                // Trigger consensus.
743                if let Some(consensus_sender) = self.consensus_sender.get() {
744                    // Initialize a callback sender and receiver.
745                    let (callback_sender, callback_receiver) = oneshot::channel();
746                    // Send the subdag and transmissions to consensus.
747                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
748                    // Await the callback to continue.
749                    match callback_receiver.await {
750                        Ok(Ok(())) => (), // continue
751                        Ok(Err(err)) => {
752                            let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
753                            error!("{}", &flatten_error(err));
754                            return Ok(());
755                        }
756                        Err(err) => {
757                            let err: anyhow::Error = err.into();
758                            let err =
759                                err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
760                            error!("{}", flatten_error(err));
761                            return Ok(());
762                        }
763                    }
764                }
765
766                info!(
767                    "\n\nCommitting a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?} (syncing={IS_SYNCING})\n",
768                );
769            }
770
771            // Update the DAG, as the subdag was successfully included into a block.
772            {
773                let mut dag_write = self.dag.write();
774                let mut count = 0;
775                for certificate in commit_subdag.values().flatten() {
776                    dag_write.commit(certificate, self.storage().max_gc_rounds());
777                    count += 1;
778                }
779
780                trace!("Committed {count} certificates to the DAG");
781            }
782
783            // Update the validator telemetry.
784            #[cfg(feature = "telemetry")]
785            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
786        }
787
788        // Perform garbage collection based on the latest committed leader round.
789        // The protocol guarantees that validators commit the same anchors in the same order,
790        // but they may do so in different chunks of anchors,
791        // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
792        // Doing garbage collection at the end of each chunk (as we do here),
793        // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
794        // may give raise to a discrepancy between the DAGs of different validators who commit different chunks:
795        // one validator may have more certificates than the other, not yet garbage collected.
796        // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
797        // it excludes certificates that are below the GC round,
798        // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
799        // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
800        // so long as garbage collection is done after each chunk.
801        // If garbage collection were done after each committed certificate,
802        // that exclusion in `order_dag_with_dfs()` should be unnecessary.
803        self.storage().garbage_collect_certificates(latest_leader_round);
804
805        #[cfg(feature = "metrics")]
806        metrics::histogram(metrics::bft::COMMIT_LEADER_CERTIFICATE_LATENCY, start.elapsed().as_secs_f64());
807        Ok(())
808    }
809
810    /// Returns the subdag of batch certificates to commit.
811    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
812        &self,
813        leader_certificate: BatchCertificate<N>,
814    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
815        // Initialize a map for the certificates to commit.
816        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
817        // Initialize a set for the already ordered certificates.
818        let mut already_ordered = HashSet::new();
819        // Initialize a buffer for the certificates to order.
820        let mut buffer = vec![leader_certificate];
821        // Iterate over the certificates to order.
822        while let Some(certificate) = buffer.pop() {
823            // Insert the certificate into the map.
824            commit.entry(certificate.round()).or_default().insert(certificate.clone());
825
826            // Check if the previous certificate is below the GC round.
827            // This is currently a critical check to prevent forking,
828            // as explained in the comment at the end of `commit_leader_certificate()`,
829            // just before the call to garbage collection.
830            let previous_round = certificate.round().saturating_sub(1);
831            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
832                continue;
833            }
834            // Iterate over the previous certificate IDs.
835            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
836            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
837            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
838                // If the previous certificate is already ordered, continue.
839                if already_ordered.contains(previous_certificate_id) {
840                    continue;
841                }
842                // If the previous certificate was recently committed, continue.
843                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
844                    continue;
845                }
846                // If the previous certificate already exists in the ledger, continue.
847                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
848                    continue;
849                }
850
851                // Retrieve the previous certificate.
852                let previous_certificate = {
853                    // Start by retrieving the previous certificate from the DAG.
854                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
855                        // If the previous certificate is found, return it.
856                        Some(previous_certificate) => previous_certificate,
857                        // If the previous certificate is not found, retrieve it from the storage.
858                        None => match self.storage().get_certificate(*previous_certificate_id) {
859                            // If the previous certificate is found, return it.
860                            Some(previous_certificate) => previous_certificate,
861                            // Otherwise, the previous certificate is missing, and throw an error.
862                            None => bail!(
863                                "Missing previous certificate {} for round {previous_round}",
864                                fmt_id(previous_certificate_id)
865                            ),
866                        },
867                    }
868                };
869                // Insert the previous certificate into the set of already ordered certificates.
870                already_ordered.insert(previous_certificate.id());
871                // Insert the previous certificate into the buffer.
872                buffer.push(previous_certificate);
873            }
874        }
875        // Ensure we only retain certificates that are above the GC round.
876        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
877        // Return the certificates to commit.
878        Ok(commit)
879    }
880
881    /// Returns `true` if there is a path from the previous certificate to the current certificate.
882    fn is_linked(
883        &self,
884        previous_certificate: BatchCertificate<N>,
885        current_certificate: BatchCertificate<N>,
886    ) -> Result<bool> {
887        // Initialize the list containing the traversal.
888        let mut traversal = vec![current_certificate.clone()];
889        // Iterate over the rounds from the current certificate to the previous certificate.
890        for round in (previous_certificate.round()..current_certificate.round()).rev() {
891            // Retrieve all of the certificates for this past round.
892            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
893                // This is a critical error, as the traversal should have these certificates.
894                // If this error is hit, it is likely that the maximum GC rounds should be increased.
895                bail!("BFT failed to retrieve the certificates for past round {round}");
896            };
897            // Filter the certificates to only include those that are in the traversal.
898            traversal = certificates
899                .into_values()
900                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
901                .collect();
902        }
903        Ok(traversal.contains(&previous_certificate))
904    }
905}
906
907impl<N: Network> BFT<N> {
908    /// Starts the BFT handlers.
909    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
910        let BFTReceiver {
911            mut rx_primary_round,
912            mut rx_primary_certificate,
913            mut rx_sync_bft_dag_at_bootup,
914            mut rx_sync_bft,
915            mut rx_sync_block_committed,
916        } = bft_receiver;
917
918        // Process the current round from the primary.
919        let self_ = self.clone();
920        self.spawn(async move {
921            while let Some((current_round, callback)) = rx_primary_round.recv().await {
922                callback.send(self_.update_to_next_round(current_round)).ok();
923            }
924        });
925
926        // Process the certificate from the primary.
927        let self_ = self.clone();
928        self.spawn(async move {
929            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
930                // Update the DAG with the certificate.
931                let result = self_.update_dag::<true, false>(certificate).await;
932                // Send the callback **after** updating the DAG.
933                // Note: We must await the DAG update before proceeding.
934                callback.send(result).ok();
935            }
936        });
937
938        // Process the request to sync the BFT DAG at bootup.
939        let self_ = self.clone();
940        self.spawn(async move {
941            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
942                self_.sync_bft_dag_at_bootup(certificates).await;
943            }
944        });
945
946        // Handler for new certificates that were fetched by the sync module.
947        let self_ = self.clone();
948        self.spawn(async move {
949            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
950                // Update the DAG with the certificate.
951                let result = self_.update_dag::<true, true>(certificate).await;
952                // Send the callback **after** updating the DAG.
953                // Note: We must await the DAG update before proceeding.
954                callback.send(result).ok();
955            }
956        });
957
958        // Handler for new blocks that were synced wit BFT.
959        //
960        // Note: This is just a workaround until other sync changes are merged.
961        // BFT sync logic should always use DAG commits within GC, but uses pending blocks in rare cases.
962        // This sender ensures that the DAG is still updated accordingly if that happens.
963        let self_ = self.clone();
964        self.spawn(async move {
965            while let Some((leader_certificate, callback)) = rx_sync_block_committed.recv().await {
966                self_.dag.write().commit(&leader_certificate, self_.storage().max_gc_rounds());
967                callback.send(Ok(())).ok();
968            }
969        });
970    }
971
972    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
973    /// already exist in the ledger.
974    ///
975    /// This method commits all the certificates into the DAG.
976    /// Note that there is no need to insert the certificates into the DAG, because these certificates
977    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
978    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
979        // Acquire the BFT write lock.
980        let mut dag = self.dag.write();
981
982        // Commit all the certificates.
983        for certificate in certificates {
984            dag.commit(&certificate, self.storage().max_gc_rounds());
985        }
986    }
987
988    /// Spawns a task with the given future; it should only be used for long-running tasks.
989    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
990        self.handles.lock().push(tokio::spawn(future));
991    }
992
993    /// Shuts down the BFT.
994    pub async fn shut_down(&self) {
995        info!("Shutting down the BFT...");
996        // Acquire the lock.
997        let _lock = self.lock.lock().await;
998        // Shut down the primary.
999        self.primary.shut_down().await;
1000        // Abort the tasks.
1001        self.handles.lock().iter().for_each(|handle| handle.abort());
1002    }
1003}
1004
1005#[cfg(test)]
1006mod tests {
1007    use crate::{
1008        BFT,
1009        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
1010        helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
1011    };
1012
1013    use snarkos_account::Account;
1014    use snarkos_node_bft_ledger_service::{LedgerService, MockLedgerService};
1015    use snarkos_node_bft_storage_service::BFTMemoryService;
1016    use snarkos_node_sync::BlockSync;
1017    use snarkos_utilities::NodeDataDir;
1018
1019    use snarkvm::{
1020        console::account::{Address, PrivateKey},
1021        ledger::{
1022            committee::{
1023                Committee,
1024                test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
1025            },
1026            narwhal::{
1027                BatchCertificate,
1028                batch_certificate::test_helpers::{
1029                    sample_batch_certificate,
1030                    sample_batch_certificate_for_round,
1031                    sample_batch_certificate_for_round_with_committee,
1032                },
1033            },
1034        },
1035        utilities::TestRng,
1036    };
1037
1038    use anyhow::Result;
1039    use indexmap::{IndexMap, IndexSet};
1040    use std::sync::Arc;
1041
1042    type CurrentNetwork = snarkvm::console::network::MainnetV0;
1043
1044    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
1045    fn sample_test_instance(
1046        committee_round: Option<u64>,
1047        max_gc_rounds: u64,
1048        rng: &mut TestRng,
1049    ) -> (
1050        Committee<CurrentNetwork>,
1051        Account<CurrentNetwork>,
1052        Arc<MockLedgerService<CurrentNetwork>>,
1053        Storage<CurrentNetwork>,
1054    ) {
1055        let committee = match committee_round {
1056            Some(round) => sample_committee_for_round(round, rng),
1057            None => sample_committee(rng),
1058        };
1059        let account = Account::new(rng).unwrap();
1060        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1061        let transmissions = Arc::new(BFTMemoryService::new());
1062        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1063
1064        (committee, account, ledger, storage)
1065    }
1066
1067    // Helper function to set up BFT for testing.
1068    fn initialize_bft(
1069        account: Account<CurrentNetwork>,
1070        storage: Storage<CurrentNetwork>,
1071        ledger: Arc<MockLedgerService<CurrentNetwork>>,
1072    ) -> anyhow::Result<BFT<CurrentNetwork>> {
1073        // Create the block synchronization logic.
1074        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1075        // Initialize the BFT.
1076        BFT::new(
1077            account.clone(),
1078            storage.clone(),
1079            ledger.clone(),
1080            block_sync,
1081            None,
1082            &[],
1083            false,
1084            NodeDataDir::new_test(None),
1085            None,
1086        )
1087    }
1088
1089    #[test]
1090    #[tracing_test::traced_test]
1091    fn test_is_leader_quorum_odd() -> Result<()> {
1092        let rng = &mut TestRng::default();
1093
1094        // Sample batch certificates.
1095        let mut certificates = IndexSet::new();
1096        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1097        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1098        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1099        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1100
1101        // Initialize the committee.
1102        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1103            1,
1104            vec![
1105                certificates[0].author(),
1106                certificates[1].author(),
1107                certificates[2].author(),
1108                certificates[3].author(),
1109            ],
1110            rng,
1111        );
1112
1113        // Initialize the ledger.
1114        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1115        // Initialize the storage.
1116        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1117        // Initialize the account.
1118        let account = Account::new(rng)?;
1119        // Initialize the BFT.
1120        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1121        assert!(bft.is_timer_expired());
1122        // Ensure this call succeeds on an odd round.
1123        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1124        // If timer has expired but quorum threshold is not reached, return 'false'.
1125        assert!(!result);
1126        // Insert certificates into storage.
1127        for certificate in certificates.iter() {
1128            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1129        }
1130        // Ensure this call succeeds on an odd round.
1131        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1132        assert!(result); // no previous leader certificate
1133        // Set the leader certificate.
1134        let leader_certificate = sample_batch_certificate(rng);
1135        *bft.leader_certificate.write() = Some(leader_certificate);
1136        // Ensure this call succeeds on an odd round.
1137        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1138        assert!(result); // should now fall through to the end of function
1139
1140        Ok(())
1141    }
1142
1143    #[test]
1144    #[tracing_test::traced_test]
1145    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1146        let rng = &mut TestRng::default();
1147
1148        // Sample the test instance.
1149        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1150        assert_eq!(committee.starting_round(), 1);
1151        assert_eq!(storage.current_round(), 1);
1152        assert_eq!(storage.max_gc_rounds(), 10);
1153
1154        // Set up the BFT logic.
1155        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1156        assert!(bft.is_timer_expired());
1157
1158        // Store is at round 1, and we are checking for round 2.
1159        // Ensure this call fails on an even round.
1160        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1161        assert!(!result);
1162        Ok(())
1163    }
1164
1165    #[test]
1166    #[tracing_test::traced_test]
1167    fn test_is_leader_quorum_even() -> Result<()> {
1168        let rng = &mut TestRng::default();
1169
1170        // Sample the test instance.
1171        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1172        assert_eq!(committee.starting_round(), 2);
1173        assert_eq!(storage.current_round(), 2);
1174        assert_eq!(storage.max_gc_rounds(), 10);
1175
1176        // Set up the BFT logic.
1177        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1178        assert!(bft.is_timer_expired());
1179
1180        // Ensure this call fails on an even round.
1181        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1182        assert!(!result);
1183        Ok(())
1184    }
1185
1186    #[test]
1187    #[tracing_test::traced_test]
1188    fn test_is_even_round_ready() -> Result<()> {
1189        let rng = &mut TestRng::default();
1190
1191        // Sample batch certificates.
1192        let mut certificates = IndexSet::new();
1193        certificates.insert(sample_batch_certificate_for_round(2, rng));
1194        certificates.insert(sample_batch_certificate_for_round(2, rng));
1195        certificates.insert(sample_batch_certificate_for_round(2, rng));
1196        certificates.insert(sample_batch_certificate_for_round(2, rng));
1197
1198        // Initialize the committee.
1199        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1200            2,
1201            vec![
1202                certificates[0].author(),
1203                certificates[1].author(),
1204                certificates[2].author(),
1205                certificates[3].author(),
1206            ],
1207            rng,
1208        );
1209
1210        // Initialize the ledger.
1211        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1212        // Initialize the storage.
1213        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1214        // Initialize the account.
1215        let account = Account::new(rng)?;
1216
1217        // Set up the BFT logic.
1218        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1219        assert!(bft.is_timer_expired());
1220
1221        // Set the leader certificate.
1222        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1223        *bft.leader_certificate.write() = Some(leader_certificate);
1224        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1225        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1226        assert!(!result);
1227        // Once quorum threshold is reached, we are ready for the next round.
1228        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1229        assert!(result);
1230
1231        // Initialize a new BFT.
1232        let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1233        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1234        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1235        if !bft_timer.is_timer_expired() {
1236            assert!(!result);
1237        }
1238        // Wait for the timer to expire.
1239        let leader_certificate_timeout =
1240            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1241        std::thread::sleep(leader_certificate_timeout);
1242        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1243        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1244        if bft_timer.is_timer_expired() {
1245            assert!(result);
1246        } else {
1247            assert!(!result);
1248        }
1249
1250        Ok(())
1251    }
1252
1253    #[test]
1254    #[tracing_test::traced_test]
1255    fn test_update_leader_certificate_odd() -> Result<()> {
1256        let rng = &mut TestRng::default();
1257
1258        // Sample the test instance.
1259        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1260        assert_eq!(storage.max_gc_rounds(), 10);
1261
1262        // Initialize the BFT.
1263        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1264        assert!(bft.is_timer_expired());
1265
1266        // Ensure this call fails on an odd round.
1267        let result = bft.update_leader_certificate_to_even_round(1);
1268        assert!(!result);
1269        Ok(())
1270    }
1271
1272    #[test]
1273    #[tracing_test::traced_test]
1274    fn test_update_leader_certificate_bad_round() -> Result<()> {
1275        let rng = &mut TestRng::default();
1276
1277        // Sample the test instance.
1278        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1279        assert_eq!(storage.max_gc_rounds(), 10);
1280
1281        // Initialize the BFT.
1282        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1283
1284        // Ensure this call succeeds on an even round.
1285        let result = bft.update_leader_certificate_to_even_round(6);
1286        assert!(!result);
1287        Ok(())
1288    }
1289
1290    #[test]
1291    #[tracing_test::traced_test]
1292    fn test_update_leader_certificate_even() -> Result<()> {
1293        let rng = &mut TestRng::default();
1294
1295        // Set the current round.
1296        let current_round = 3;
1297
1298        // Sample the certificates.
1299        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1300            current_round,
1301            rng,
1302        );
1303
1304        // Initialize the committee.
1305        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1306            2,
1307            vec![
1308                certificates[0].author(),
1309                certificates[1].author(),
1310                certificates[2].author(),
1311                certificates[3].author(),
1312            ],
1313            rng,
1314        );
1315
1316        // Initialize the ledger.
1317        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1318
1319        // Initialize the storage.
1320        let transmissions = Arc::new(BFTMemoryService::new());
1321        let storage = Storage::new(ledger.clone(), transmissions, 10);
1322        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1323        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1324        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1325        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1326        assert_eq!(storage.current_round(), 2);
1327
1328        // Retrieve the leader certificate.
1329        let leader = committee.get_leader(2).unwrap();
1330        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1331
1332        // Initialize the BFT.
1333        let account = Account::new(rng)?;
1334        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1335
1336        // Set the leader certificate.
1337        *bft.leader_certificate.write() = Some(leader_certificate);
1338
1339        // Update the leader certificate.
1340        // Ensure this call succeeds on an even round.
1341        let result = bft.update_leader_certificate_to_even_round(2);
1342        assert!(result);
1343
1344        Ok(())
1345    }
1346
1347    #[tokio::test]
1348    #[tracing_test::traced_test]
1349    async fn test_order_dag_with_dfs() -> Result<()> {
1350        let rng = &mut TestRng::default();
1351
1352        // Sample the test instance.
1353        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1354
1355        // Initialize the round parameters.
1356        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1357        let current_round = previous_round + 1;
1358
1359        // Sample the current certificate and previous certificates.
1360        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1361            current_round,
1362            rng,
1363        );
1364
1365        /* Test GC */
1366
1367        // Ensure the function succeeds in returning only certificates above GC.
1368        {
1369            // Initialize the storage.
1370            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1371            // Initialize the BFT.
1372            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1373
1374            // Insert a mock DAG in the BFT.
1375            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1376
1377            // Insert the previous certificates into the BFT.
1378            for certificate in previous_certificates.clone() {
1379                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1380            }
1381
1382            // Ensure this call succeeds and returns all given certificates.
1383            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1384            assert!(result.is_ok());
1385            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1386            assert_eq!(candidate_certificates.len(), 1);
1387            let expected_certificates = vec![certificate.clone()];
1388            assert_eq!(
1389                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1390                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1391            );
1392            assert_eq!(candidate_certificates, expected_certificates);
1393        }
1394
1395        /* Test normal case */
1396
1397        // Ensure the function succeeds in returning all given certificates.
1398        {
1399            // Initialize the storage.
1400            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1401            // Initialize the BFT.
1402            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1403
1404            // Insert a mock DAG in the BFT.
1405            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1406
1407            // Insert the previous certificates into the BFT.
1408            for certificate in previous_certificates.clone() {
1409                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1410            }
1411
1412            // Ensure this call succeeds and returns all given certificates.
1413            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1414            assert!(result.is_ok());
1415            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1416            assert_eq!(candidate_certificates.len(), 5);
1417            let expected_certificates = vec![
1418                previous_certificates[0].clone(),
1419                previous_certificates[1].clone(),
1420                previous_certificates[2].clone(),
1421                previous_certificates[3].clone(),
1422                certificate,
1423            ];
1424            assert_eq!(
1425                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1426                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1427            );
1428            assert_eq!(candidate_certificates, expected_certificates);
1429        }
1430
1431        Ok(())
1432    }
1433
1434    #[test]
1435    #[tracing_test::traced_test]
1436    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1437        let rng = &mut TestRng::default();
1438
1439        // Sample the test instance.
1440        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1441        assert_eq!(committee.starting_round(), 1);
1442        assert_eq!(storage.current_round(), 1);
1443        assert_eq!(storage.max_gc_rounds(), 1);
1444
1445        // Initialize the round parameters.
1446        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1447        let current_round = previous_round + 1;
1448
1449        // Sample the current certificate and previous certificates.
1450        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1451            current_round,
1452            rng,
1453        );
1454        // Construct the previous certificate IDs.
1455        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1456
1457        /* Test missing previous certificate. */
1458
1459        // Initialize the BFT.
1460        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1461
1462        // The expected error message.
1463        let error_msg = format!(
1464            "Missing previous certificate {} for round {previous_round}",
1465            crate::helpers::fmt_id(previous_certificate_ids[3]),
1466        );
1467
1468        // Ensure this call fails on a missing previous certificate.
1469        let result = bft.order_dag_with_dfs::<false>(certificate);
1470        assert!(result.is_err());
1471        assert_eq!(result.unwrap_err().to_string(), error_msg);
1472        Ok(())
1473    }
1474
1475    #[tokio::test]
1476    async fn test_bft_gc_on_commit() -> Result<()> {
1477        let rng = &mut TestRng::default();
1478
1479        // Initialize the round parameters.
1480        let max_gc_rounds = 1;
1481        let committee_round = 0;
1482        let commit_round = 2;
1483        let current_round = commit_round + 1;
1484
1485        // Sample the certificates.
1486        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1487            current_round,
1488            rng,
1489        );
1490
1491        // Initialize the committee.
1492        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1493            committee_round,
1494            vec![
1495                certificates[0].author(),
1496                certificates[1].author(),
1497                certificates[2].author(),
1498                certificates[3].author(),
1499            ],
1500            rng,
1501        );
1502
1503        // Initialize the ledger.
1504        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1505
1506        // Initialize the storage.
1507        let transmissions = Arc::new(BFTMemoryService::new());
1508        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1509        // Insert the certificates into the storage.
1510        for certificate in certificates.iter() {
1511            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1512        }
1513
1514        // Get the leader certificate.
1515        let leader = committee.get_leader(commit_round).unwrap();
1516        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1517
1518        // Initialize the BFT.
1519        let account = Account::new(rng)?;
1520        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1521
1522        // Create an empty mock DAG with last committed round set to `commit_round`.
1523        *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1524
1525        // Ensure that the `gc_round` has not been updated yet.
1526        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1527
1528        // Insert the certificates into the BFT.
1529        for certificate in certificates {
1530            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1531        }
1532
1533        // Commit the leader certificate.
1534        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1535
1536        // Ensure that the `gc_round` has been updated.
1537        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1538
1539        Ok(())
1540    }
1541
1542    #[tokio::test]
1543    #[tracing_test::traced_test]
1544    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1545        let rng = &mut TestRng::default();
1546
1547        // Initialize the round parameters.
1548        let max_gc_rounds = 1;
1549        let committee_round = 0;
1550        let commit_round = 2;
1551        let current_round = commit_round + 1;
1552
1553        // Sample the current certificate and previous certificates.
1554        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1555            current_round,
1556            rng,
1557        );
1558
1559        // Initialize the committee.
1560        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1561            committee_round,
1562            vec![
1563                certificates[0].author(),
1564                certificates[1].author(),
1565                certificates[2].author(),
1566                certificates[3].author(),
1567            ],
1568            rng,
1569        );
1570
1571        // Initialize the ledger.
1572        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1573
1574        // Initialize the storage.
1575        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1576        // Insert the certificates into the storage.
1577        for certificate in certificates.iter() {
1578            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1579        }
1580
1581        // Get the leader certificate.
1582        let leader = committee.get_leader(commit_round).unwrap();
1583        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1584
1585        // Initialize the BFT.
1586        let account = Account::new(rng)?;
1587        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1588
1589        // Insert a mock DAG in the BFT.
1590        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1591
1592        // Insert the previous certificates into the BFT.
1593        for certificate in certificates.clone() {
1594            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1595        }
1596
1597        // Commit the leader certificate.
1598        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1599
1600        // Simulate a bootup of the BFT.
1601
1602        // Initialize a new instance of storage.
1603        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1604        // Initialize a new instance of BFT.
1605        let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1606
1607        // Sync the BFT DAG at bootup.
1608        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1609
1610        // Check that the BFT starts from the same last committed round.
1611        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1612
1613        // Ensure that both BFTs have committed the leader certificate.
1614        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1615        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1616
1617        // Check the state of the bootup BFT.
1618        for certificate in certificates {
1619            let certificate_round = certificate.round();
1620            let certificate_id = certificate.id();
1621            // Check that the bootup BFT has committed the certificates.
1622            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1623            // Check that the bootup BFT does not contain the certificates in its graph, because
1624            // it should not need to order them again in subsequent subdags.
1625            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1626        }
1627
1628        Ok(())
1629    }
1630
1631    #[tokio::test]
1632    #[tracing_test::traced_test]
1633    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1634        /*
1635        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1636        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1637        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1638        */
1639
1640        let rng = &mut TestRng::default();
1641
1642        // Initialize the round parameters.
1643        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1644        let committee_round = 0;
1645        let commit_round = 2;
1646        let current_round = commit_round + 1;
1647        let next_round = current_round + 1;
1648
1649        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1650        let (round_to_certificates_map, committee) = {
1651            let private_keys = vec![
1652                PrivateKey::new(rng).unwrap(),
1653                PrivateKey::new(rng).unwrap(),
1654                PrivateKey::new(rng).unwrap(),
1655                PrivateKey::new(rng).unwrap(),
1656            ];
1657            let addresses = vec![
1658                Address::try_from(private_keys[0])?,
1659                Address::try_from(private_keys[1])?,
1660                Address::try_from(private_keys[2])?,
1661                Address::try_from(private_keys[3])?,
1662            ];
1663            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1664                committee_round,
1665                addresses,
1666                rng,
1667            );
1668            // Initialize a mapping from the round number to the set of batch certificates in the round.
1669            let mut round_to_certificates_map: IndexMap<
1670                u64,
1671                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1672            > = IndexMap::new();
1673            let mut previous_certificates = IndexSet::with_capacity(4);
1674            // Initialize the genesis batch certificates.
1675            for _ in 0..4 {
1676                previous_certificates.insert(sample_batch_certificate(rng));
1677            }
1678            for round in 0..commit_round + 3 {
1679                let mut current_certificates = IndexSet::new();
1680                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1681                    IndexSet::new()
1682                } else {
1683                    previous_certificates.iter().map(|c| c.id()).collect()
1684                };
1685                let transmission_ids =
1686                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1687                        .into_iter()
1688                        .collect::<IndexSet<_>>();
1689                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1690                let committee_id = committee.id();
1691                for (i, private_key_1) in private_keys.iter().enumerate() {
1692                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1693                        private_key_1,
1694                        round,
1695                        timestamp,
1696                        committee_id,
1697                        transmission_ids.clone(),
1698                        previous_certificate_ids.clone(),
1699                        rng,
1700                    )
1701                    .unwrap();
1702                    let mut signatures = IndexSet::with_capacity(4);
1703                    for (j, private_key_2) in private_keys.iter().enumerate() {
1704                        if i != j {
1705                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1706                        }
1707                    }
1708                    let certificate =
1709                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1710                    current_certificates.insert(certificate);
1711                }
1712                // Update the mapping.
1713                round_to_certificates_map.insert(round, current_certificates.clone());
1714                previous_certificates = current_certificates.clone();
1715            }
1716            (round_to_certificates_map, committee)
1717        };
1718
1719        // Initialize the ledger.
1720        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1721        // Initialize the storage.
1722        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1723        // Get the leaders for the next 2 commit rounds.
1724        let leader = committee.get_leader(commit_round).unwrap();
1725        let next_leader = committee.get_leader(next_round).unwrap();
1726        // Insert the pre shutdown certificates into the storage.
1727        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1728        for i in 1..=commit_round {
1729            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1730            if i == commit_round {
1731                // Only insert the leader certificate for the commit round.
1732                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1733                if let Some(c) = leader_certificate {
1734                    pre_shutdown_certificates.push(c.clone());
1735                }
1736                continue;
1737            }
1738            pre_shutdown_certificates.extend(certificates);
1739        }
1740        for certificate in pre_shutdown_certificates.iter() {
1741            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1742        }
1743        // Insert the post shutdown certificates into the storage.
1744        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1745            Vec::new();
1746        for j in commit_round..=commit_round + 2 {
1747            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1748            post_shutdown_certificates.extend(certificate);
1749        }
1750        for certificate in post_shutdown_certificates.iter() {
1751            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1752        }
1753        // Get the leader certificates.
1754        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1755        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1756
1757        // Initialize the BFT without bootup.
1758        let account = Account::new(rng)?;
1759        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1760
1761        // Insert a mock DAG in the BFT without bootup.
1762        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1763
1764        // Insert the certificates into the BFT without bootup.
1765        for certificate in pre_shutdown_certificates.clone() {
1766            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1767        }
1768
1769        // Insert the post shutdown certificates into the BFT without bootup.
1770        for certificate in post_shutdown_certificates.clone() {
1771            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1772        }
1773        // Commit the second leader certificate.
1774        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1775        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1776        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1777
1778        // Simulate a bootup of the BFT.
1779
1780        // Initialize a new instance of storage.
1781        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1782
1783        // Initialize a new instance of BFT with bootup.
1784        let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1785
1786        // Sync the BFT DAG at bootup.
1787        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1788
1789        // Insert the post shutdown certificates to the storage and BFT with bootup.
1790        for certificate in post_shutdown_certificates.iter() {
1791            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1792        }
1793        for certificate in post_shutdown_certificates.clone() {
1794            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1795        }
1796        // Commit the second leader certificate.
1797        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1798        let commit_subdag_metadata_bootup =
1799            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1800        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1801        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1802
1803        // Check that the final state of both BFTs is the same.
1804
1805        // Check that both BFTs start from the same last committed round.
1806        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1807
1808        // Ensure that both BFTs have committed the leader certificates.
1809        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1810        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1811        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1812        assert!(
1813            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1814        );
1815
1816        // Check that the bootup BFT has committed the pre shutdown certificates.
1817        for certificate in pre_shutdown_certificates.clone() {
1818            let certificate_round = certificate.round();
1819            let certificate_id = certificate.id();
1820            // Check that both BFTs have committed the certificates.
1821            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1822            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1823            // Check that the bootup BFT does not contain the certificates in its graph, because
1824            // it should not need to order them again in subsequent subdags.
1825            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1826            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1827        }
1828
1829        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1830        for certificate in committed_certificates_bootup.clone() {
1831            let certificate_round = certificate.round();
1832            let certificate_id = certificate.id();
1833            // Check that the both BFTs have committed the certificates.
1834            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1835            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1836            // Check that the bootup BFT does not contain the certificates in its graph, because
1837            // it should not need to order them again in subsequent subdags.
1838            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1839            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1840        }
1841
1842        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1843        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1844
1845        Ok(())
1846    }
1847
1848    #[tokio::test]
1849    #[tracing_test::traced_test]
1850    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1851        /*
1852        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1853        2. Add post shutdown certificates to the bootup BFT.
1854        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1855        */
1856
1857        let rng = &mut TestRng::default();
1858
1859        // Initialize the round parameters.
1860        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1861        let committee_round = 0;
1862        let commit_round = 2;
1863        let current_round = commit_round + 1;
1864        let next_round = current_round + 1;
1865
1866        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1867        let (round_to_certificates_map, committee) = {
1868            let private_keys = vec![
1869                PrivateKey::new(rng).unwrap(),
1870                PrivateKey::new(rng).unwrap(),
1871                PrivateKey::new(rng).unwrap(),
1872                PrivateKey::new(rng).unwrap(),
1873            ];
1874            let addresses = vec![
1875                Address::try_from(private_keys[0])?,
1876                Address::try_from(private_keys[1])?,
1877                Address::try_from(private_keys[2])?,
1878                Address::try_from(private_keys[3])?,
1879            ];
1880            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1881                committee_round,
1882                addresses,
1883                rng,
1884            );
1885            // Initialize a mapping from the round number to the set of batch certificates in the round.
1886            let mut round_to_certificates_map: IndexMap<
1887                u64,
1888                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1889            > = IndexMap::new();
1890            let mut previous_certificates = IndexSet::with_capacity(4);
1891            // Initialize the genesis batch certificates.
1892            for _ in 0..4 {
1893                previous_certificates.insert(sample_batch_certificate(rng));
1894            }
1895            for round in 0..=commit_round + 2 {
1896                let mut current_certificates = IndexSet::new();
1897                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1898                    IndexSet::new()
1899                } else {
1900                    previous_certificates.iter().map(|c| c.id()).collect()
1901                };
1902                let transmission_ids =
1903                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1904                        .into_iter()
1905                        .collect::<IndexSet<_>>();
1906                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1907                let committee_id = committee.id();
1908                for (i, private_key_1) in private_keys.iter().enumerate() {
1909                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1910                        private_key_1,
1911                        round,
1912                        timestamp,
1913                        committee_id,
1914                        transmission_ids.clone(),
1915                        previous_certificate_ids.clone(),
1916                        rng,
1917                    )
1918                    .unwrap();
1919                    let mut signatures = IndexSet::with_capacity(4);
1920                    for (j, private_key_2) in private_keys.iter().enumerate() {
1921                        if i != j {
1922                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1923                        }
1924                    }
1925                    let certificate =
1926                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1927                    current_certificates.insert(certificate);
1928                }
1929                // Update the mapping.
1930                round_to_certificates_map.insert(round, current_certificates.clone());
1931                previous_certificates = current_certificates.clone();
1932            }
1933            (round_to_certificates_map, committee)
1934        };
1935
1936        // Initialize the ledger.
1937        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1938        // Initialize the storage.
1939        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1940        // Get the leaders for the next 2 commit rounds.
1941        let leader = committee.get_leader(commit_round).unwrap();
1942        let next_leader = committee.get_leader(next_round).unwrap();
1943        // Insert the pre shutdown certificates into the storage.
1944        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1945        for i in 1..=commit_round {
1946            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1947            if i == commit_round {
1948                // Only insert the leader certificate for the commit round.
1949                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1950                if let Some(c) = leader_certificate {
1951                    pre_shutdown_certificates.push(c.clone());
1952                }
1953                continue;
1954            }
1955            pre_shutdown_certificates.extend(certificates);
1956        }
1957        for certificate in pre_shutdown_certificates.iter() {
1958            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1959        }
1960        // Initialize the bootup BFT.
1961        let account = Account::new(rng)?;
1962        let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1963
1964        // Insert a mock DAG in the BFT without bootup.
1965        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1966        // Sync the BFT DAG at bootup.
1967        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1968
1969        // Insert the post shutdown certificates into the storage.
1970        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1971            Vec::new();
1972        for j in commit_round..=commit_round + 2 {
1973            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1974            post_shutdown_certificates.extend(certificate);
1975        }
1976        for certificate in post_shutdown_certificates.iter() {
1977            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1978        }
1979
1980        // Insert the post shutdown certificates into the DAG.
1981        for certificate in post_shutdown_certificates.clone() {
1982            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1983        }
1984
1985        // Get the next leader certificate to commit.
1986        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1987        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1988        let committed_certificates = commit_subdag.values().flatten();
1989
1990        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1991        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1992            for committed_certificate in committed_certificates.clone() {
1993                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1994            }
1995        }
1996        Ok(())
1997    }
1998
1999    /// Tests that a leader certificate can be committed by sufficient endorsements in a succeeding leader certificate.
2000    #[test_log::test(tokio::test)]
2001    async fn test_commit_via_is_linked() {
2002        let rng = &mut TestRng::default();
2003
2004        let committee_round = 0;
2005        let leader_round_1 = 2;
2006        let leader_round_2 = 4; // subsequent even round
2007        let max_gc_rounds = 50;
2008
2009        // Create a committee with four members.
2010        let num_authors = 4;
2011        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2012        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2013
2014        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2015        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2016        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2017        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2018
2019        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2020
2021        // Round 1
2022        let round1_certs: IndexSet<_> = (0..num_authors)
2023            .map(|idx| {
2024                let author = &private_keys[idx];
2025                let endorsements: Vec<_> = private_keys
2026                    .iter()
2027                    .enumerate()
2028                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2029                    .collect();
2030
2031                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2032            })
2033            .collect();
2034        certificates_by_round.insert(1, round1_certs.clone());
2035
2036        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2037        let mut leader1_certificate = None;
2038
2039        let round2_certs: IndexSet<_> = (0..num_authors)
2040            .map(|idx| {
2041                let author = &private_keys[idx];
2042                let endorsements: Vec<_> = private_keys
2043                    .iter()
2044                    .enumerate()
2045                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2046                    .collect();
2047                let cert = sample_batch_certificate_for_round_with_committee(
2048                    leader_round_1,
2049                    round1_certs.iter().map(|c| c.id()).collect(),
2050                    author,
2051                    &endorsements[..],
2052                    rng,
2053                );
2054
2055                if cert.author() == leader1 {
2056                    leader1_certificate = Some(cert.clone());
2057                }
2058                cert
2059            })
2060            .collect();
2061        certificates_by_round.insert(leader_round_1, round2_certs.clone());
2062
2063        let round3_certs: IndexSet<_> = (0..num_authors)
2064            .map(|idx| {
2065                let author = &private_keys[idx];
2066                let endorsements: Vec<_> = private_keys
2067                    .iter()
2068                    .enumerate()
2069                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2070                    .collect();
2071
2072                let previous_certificate_ids: IndexSet<_> = round2_certs
2073                    .iter()
2074                    .filter_map(|cert| {
2075                        // Only have the leader endorse the previous round's leader certificate.
2076                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2077                    })
2078                    .collect();
2079
2080                sample_batch_certificate_for_round_with_committee(
2081                    leader_round_1 + 1,
2082                    previous_certificate_ids,
2083                    author,
2084                    &endorsements[..],
2085                    rng,
2086                )
2087            })
2088            .collect();
2089        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2090
2091        // Ensure the first leader's certificate is not committed yet.
2092        let leader_certificate_1 = leader1_certificate.unwrap();
2093        assert!(
2094            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2095            "Leader certificate 1 should not be committed yet"
2096        );
2097        assert_eq!(bft.dag.read().last_committed_round(), 0);
2098
2099        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2100        let round4_certs: IndexSet<_> = (0..num_authors)
2101            .map(|idx| {
2102                let endorsements: Vec<_> = private_keys
2103                    .iter()
2104                    .enumerate()
2105                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2106                    .collect();
2107
2108                sample_batch_certificate_for_round_with_committee(
2109                    leader_round_2,
2110                    round3_certs.iter().map(|c| c.id()).collect(),
2111                    &private_keys[idx],
2112                    &endorsements[..],
2113                    rng,
2114                )
2115            })
2116            .collect();
2117        certificates_by_round.insert(leader_round_2, round4_certs.clone());
2118
2119        // Insert all certificates into the storage and DAG.
2120        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2121            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2122            bft.update_dag::<false, false>(certificate).await.unwrap();
2123        }
2124
2125        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2126
2127        assert!(
2128            bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2129            "Leader certificate 1 should be linked to leader certificate 2"
2130        );
2131
2132        // Explicitely commit leader certificate 2.
2133        bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2134
2135        // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2136        assert!(
2137            bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2138            "Leader certificate for round 2 should be committed when committing at round 4"
2139        );
2140
2141        // Leader certificate 2 should be committed as the above call was successful.
2142        assert!(
2143            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2144            "Leader certificate for round 4 should be committed"
2145        );
2146
2147        assert_eq!(bft.dag.read().last_committed_round(), 4);
2148    }
2149
2150    #[test_log::test(tokio::test)]
2151    async fn test_commit_via_is_linked_with_skipped_anchor() {
2152        let rng = &mut TestRng::default();
2153
2154        let committee_round = 0;
2155        let leader_round_1 = 2;
2156        let leader_round_2 = 4;
2157        let max_gc_rounds = 50;
2158
2159        let num_authors = 4;
2160        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2161        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2162
2163        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2164        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2165        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2166        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2167
2168        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2169
2170        // Round 1
2171        let round1_certs: IndexSet<_> = (0..num_authors)
2172            .map(|idx| {
2173                let author = &private_keys[idx];
2174                let endorsements: Vec<_> = private_keys
2175                    .iter()
2176                    .enumerate()
2177                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2178                    .collect();
2179
2180                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2181            })
2182            .collect();
2183        certificates_by_round.insert(1, round1_certs.clone());
2184
2185        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2186        let mut leader1_certificate = None;
2187
2188        let round2_certs: IndexSet<_> = (0..num_authors)
2189            .map(|idx| {
2190                let author = &private_keys[idx];
2191                let endorsements: Vec<_> = private_keys
2192                    .iter()
2193                    .enumerate()
2194                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2195                    .collect();
2196                let cert = sample_batch_certificate_for_round_with_committee(
2197                    leader_round_1,
2198                    round1_certs.iter().map(|c| c.id()).collect(),
2199                    author,
2200                    &endorsements[..],
2201                    rng,
2202                );
2203
2204                if cert.author() == leader1 {
2205                    leader1_certificate = Some(cert.clone());
2206                }
2207                cert
2208            })
2209            .collect();
2210        certificates_by_round.insert(leader_round_1, round2_certs.clone());
2211
2212        let round3_certs: IndexSet<_> = (0..num_authors)
2213            .map(|idx| {
2214                let author = &private_keys[idx];
2215                let endorsements: Vec<_> = private_keys
2216                    .iter()
2217                    .enumerate()
2218                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2219                    .collect();
2220
2221                let previous_certificate_ids: IndexSet<_> = round2_certs
2222                    .iter()
2223                    .filter_map(|cert| {
2224                        // Only have the leader endorse the previous round's leader certificate.
2225                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2226                    })
2227                    .collect();
2228
2229                sample_batch_certificate_for_round_with_committee(
2230                    leader_round_1 + 1,
2231                    previous_certificate_ids,
2232                    author,
2233                    &endorsements[..],
2234                    rng,
2235                )
2236            })
2237            .collect();
2238        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2239
2240        // Ensure the first leader's certificate is not committed yet.
2241        let leader_certificate_1 = leader1_certificate.unwrap();
2242        assert!(
2243            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2244            "Leader certificate 1 should not be committed yet"
2245        );
2246
2247        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2248        let round4_certs: IndexSet<_> = (0..num_authors)
2249            .map(|idx| {
2250                let endorsements: Vec<_> = private_keys
2251                    .iter()
2252                    .enumerate()
2253                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2254                    .collect();
2255
2256                // Do not create a path to the previous leader certificate.
2257                let previous_certificate_ids: IndexSet<_> = round3_certs
2258                    .iter()
2259                    .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
2260                    .collect();
2261
2262                sample_batch_certificate_for_round_with_committee(
2263                    leader_round_2,
2264                    previous_certificate_ids,
2265                    &private_keys[idx],
2266                    &endorsements[..],
2267                    rng,
2268                )
2269            })
2270            .collect();
2271        certificates_by_round.insert(leader_round_2, round4_certs.clone());
2272
2273        // Insert all certificates into the storage and DAG.
2274        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2275            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2276            bft.update_dag::<false, false>(certificate).await.unwrap();
2277        }
2278
2279        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2280
2281        assert!(
2282            !bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2283            "Leader certificate 1 should not be linked to leader certificate 2"
2284        );
2285        assert_eq!(bft.dag.read().last_committed_round(), 0);
2286
2287        // Explicitely commit leader certificate 2.
2288        bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2289
2290        // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2291        assert!(
2292            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2293            "Leader certificate for round 2 should not be committed when committing at round 4"
2294        );
2295
2296        // Leader certificate 2 should be committed as the above call was successful.
2297        assert!(
2298            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2299            "Leader certificate for round 4 should be committed"
2300        );
2301        assert_eq!(bft.dag.read().last_committed_round(), 4);
2302    }
2303}