Skip to main content

snarkos_node_bft/
bft.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY,
18    helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now},
19    primary::{Primary, PrimaryCallback},
20    sync::SyncCallback,
21};
22
23use snarkos_account::Account;
24use snarkos_node_bft_ledger_service::LedgerService;
25use snarkos_node_sync::{BlockSync, Ping};
26use snarkos_utilities::NodeDataDir;
27
28use snarkvm::{
29    console::account::Address,
30    ledger::{
31        block::Transaction,
32        committee::Committee,
33        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
34        puzzle::{Solution, SolutionID},
35    },
36    prelude::{Field, Network, Result, bail, ensure},
37    utilities::flatten_error,
38};
39
40use anyhow::Context;
41use colored::Colorize;
42use indexmap::{IndexMap, IndexSet};
43#[cfg(feature = "locktick")]
44use locktick::parking_lot::RwLock;
45#[cfg(feature = "locktick")]
46use locktick::tokio::Mutex;
47#[cfg(not(feature = "locktick"))]
48use parking_lot::RwLock;
49use std::{
50    collections::{BTreeMap, HashSet},
51    net::SocketAddr,
52    sync::{
53        Arc,
54        atomic::{AtomicI64, Ordering},
55    },
56};
57#[cfg(not(feature = "locktick"))]
58use tokio::sync::Mutex;
59use tokio::sync::{OnceCell, oneshot};
60
61#[derive(Clone)]
62pub struct BFT<N: Network> {
63    /// The primary for this node.
64    primary: Primary<N>,
65    /// The DAG of batches from which we build the blockchain.
66    dag: Arc<RwLock<DAG<N>>>,
67    /// The batch certificate of the leader from the current even round, if one was present.
68    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
69    /// The timer for the leader certificate to be received.
70    leader_certificate_timer: Arc<AtomicI64>,
71    /// The consensus sender.
72    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
73    /// Ensures only one call to `commit_leader_certificate` runs at a time.
74    ///
75    /// Without this, a second certificate crossing the availability threshold while the consensus
76    /// callback for a prior commit is still in-flight would re-walk already-committed rounds
77    /// (because `last_committed_round` hasn't been updated yet), causing duplicate subdag commits.
78    commit_lock: Arc<Mutex<()>>,
79}
80
81impl<N: Network> BFT<N> {
82    /// Initializes a new instance of the BFT.
83    #[allow(clippy::too_many_arguments)]
84    pub fn new(
85        account: Account<N>,
86        storage: Storage<N>,
87        ledger: Arc<dyn LedgerService<N>>,
88        block_sync: Arc<BlockSync<N>>,
89        ip: Option<SocketAddr>,
90        trusted_validators: &[SocketAddr],
91        trusted_peers_only: bool,
92        node_data_dir: NodeDataDir,
93        dev: Option<u16>,
94    ) -> Result<Self> {
95        Ok(Self {
96            primary: Primary::new(
97                account,
98                storage,
99                ledger,
100                block_sync,
101                ip,
102                trusted_validators,
103                trusted_peers_only,
104                node_data_dir,
105                dev,
106            )?,
107            dag: Default::default(),
108            leader_certificate: Default::default(),
109            leader_certificate_timer: Default::default(),
110            consensus_sender: Default::default(),
111            commit_lock: Default::default(),
112        })
113    }
114
115    /// Run the BFT instance.
116    ///
117    /// This will return as soon as all required tasks are spawned.
118    /// The function must not be called more than once per instance.
119    pub async fn run(
120        &mut self,
121        ping: Option<Arc<Ping<N>>>,
122        consensus_sender: Option<ConsensusSender<N>>,
123        primary_sender: PrimarySender<N>,
124        primary_receiver: PrimaryReceiver<N>,
125    ) -> Result<()> {
126        info!("Starting the BFT instance...");
127        // Set up callbacks.
128        let primary_callback = Some(Arc::new(self.clone()) as Arc<dyn PrimaryCallback<N>>);
129
130        let sync_callback = Some(Arc::new(self.clone()) as Arc<dyn SyncCallback<N>>);
131
132        // Next, run the primary instance.
133        self.primary.run(ping, primary_callback, sync_callback, primary_sender, primary_receiver).await?;
134
135        // Lastly, set the consensus sender.
136        // Note: This ensures that the BFT does not advance the ledger during initial syncing.
137        if let Some(consensus_sender) = consensus_sender {
138            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
139        }
140        Ok(())
141    }
142
143    /// Returns `true` if the primary is synced.
144    pub fn is_synced(&self) -> bool {
145        self.primary.is_synced()
146    }
147
148    /// Returns the primary.
149    pub const fn primary(&self) -> &Primary<N> {
150        &self.primary
151    }
152
153    /// Returns the storage.
154    pub const fn storage(&self) -> &Storage<N> {
155        self.primary.storage()
156    }
157
158    /// Returns the ledger.
159    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
160        self.primary.ledger()
161    }
162
163    /// Returns the leader of the current even round, if one was present.
164    pub fn leader(&self) -> Option<Address<N>> {
165        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
166    }
167
168    /// Returns the certificate of the leader from the current even round, if one was present.
169    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
170        &self.leader_certificate
171    }
172}
173
174impl<N: Network> BFT<N> {
175    /// Returns the number of unconfirmed transmissions.
176    pub fn num_unconfirmed_transmissions(&self) -> usize {
177        self.primary.num_unconfirmed_transmissions()
178    }
179
180    /// Returns the number of unconfirmed ratifications.
181    pub fn num_unconfirmed_ratifications(&self) -> usize {
182        self.primary.num_unconfirmed_ratifications()
183    }
184
185    /// Returns the number of solutions.
186    pub fn num_unconfirmed_solutions(&self) -> usize {
187        self.primary.num_unconfirmed_solutions()
188    }
189
190    /// Returns the number of unconfirmed transactions.
191    pub fn num_unconfirmed_transactions(&self) -> usize {
192        self.primary.num_unconfirmed_transactions()
193    }
194}
195
196impl<N: Network> BFT<N> {
197    /// Returns the worker transmission IDs.
198    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
199        self.primary.worker_transmission_ids()
200    }
201
202    /// Returns the worker transmissions.
203    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
204        self.primary.worker_transmissions()
205    }
206
207    /// Returns the worker solutions.
208    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
209        self.primary.worker_solutions()
210    }
211
212    /// Returns the worker transactions.
213    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
214        self.primary.worker_transactions()
215    }
216}
217
218#[async_trait::async_trait]
219impl<N: Network> PrimaryCallback<N> for BFT<N> {
220    /// Notification that a new round has started.
221    ///
222    /// # Arguments
223    /// * `current_round` - the round the caller is in (to avoid race conditions)
224    ///
225    /// # Returns
226    /// `true` if the BFT moved to the next round.
227    fn try_advance_to_next_round(&self, current_round: u64) -> bool {
228        // Ensure the current round is at least the storage round (this is a sanity check).
229        let storage_round = self.storage().current_round();
230        if current_round < storage_round {
231            debug!(
232                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
233            );
234            return false;
235        }
236
237        // Determine if the BFT is ready to update to the next round.
238        let is_ready = match current_round.is_multiple_of(2) {
239            true => self.update_leader_certificate_to_even_round(current_round),
240            false => self.is_leader_quorum_or_nonleaders_available(current_round),
241        };
242
243        #[cfg(feature = "metrics")]
244        {
245            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
246            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
247            if start > 0 {
248                let end = now();
249                let elapsed = std::time::Duration::from_secs((end - start) as u64);
250                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
251            }
252        }
253
254        // Log whether the round is going to update.
255        if current_round.is_multiple_of(2) {
256            // Determine if there is a leader certificate.
257            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
258                // Ensure the state of the leader certificate is consistent with the BFT being ready.
259                if !is_ready {
260                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
261                }
262                // Log the leader election.
263                let leader_round = leader_certificate.round();
264                match leader_round == current_round {
265                    true => {
266                        info!("Round {current_round} elected a leader - {}", leader_certificate.author());
267                        #[cfg(feature = "metrics")]
268                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
269                    }
270                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
271                }
272            } else {
273                match is_ready {
274                    true => info!("Round {current_round} reached quorum without a leader"),
275                    false => info!("{}", format!("Round {current_round} did not elect a leader (yet)").dimmed()),
276                }
277            }
278        }
279
280        // If the BFT is ready, then update to the next round.
281        if is_ready {
282            // Update to the next round in storage.
283            if let Err(err) = self
284                .storage()
285                .increment_to_next_round(current_round)
286                .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
287            {
288                warn!("{}", &flatten_error(err));
289                return false;
290            }
291            // Update the timer for the leader certificate.
292            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
293        }
294
295        is_ready
296    }
297
298    /// Notification about a new certificated generated by `Primary` or received by the `Primary` from a peer.
299    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
300        // ### First, insert the certificate into the DAG. ###
301        // Retrieve the round of the new certificate to add to the DAG.
302        let certificate_round = certificate.round();
303
304        // Insert the certificate into the DAG.
305        self.dag.write().insert(certificate);
306
307        // ### Second, determine if a new leader certificate can be committed. ###
308        let commit_round = certificate_round.saturating_sub(1);
309
310        // Leaders are elected in even rounds.
311        // If the previous round is odd, the current round cannot commit any leader certs.
312        // Similarly, no leader certificate can be committed for round zero.
313        if !commit_round.is_multiple_of(2) || commit_round < 2 {
314            return Ok(());
315        }
316        // If the commit round is at or below the last committed round, return early.
317        if commit_round <= self.dag.read().last_committed_round() {
318            return Ok(());
319        }
320
321        /* Proceeding to check if the leader is ready to be committed. */
322        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
323
324        // Retrieve the committee lookback for the commit round.
325        let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
326            format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
327        })?;
328
329        // Either retrieve the cached leader or compute it.
330        let leader = match self.ledger().latest_leader() {
331            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
332            _ => {
333                // Compute the leader for the commit round.
334                let computed_leader = committee_lookback
335                    .get_leader(commit_round)
336                    .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;
337
338                // Cache the computed leader.
339                self.ledger().update_latest_leader(commit_round, computed_leader);
340
341                computed_leader
342            }
343        };
344
345        // Retrieve the leader certificate for the commit round.
346        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
347        else {
348            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
349            return Ok(());
350        };
351        // Retrieve all of the certificates for the **certificate** round.
352        let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
353            format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
354        })?;
355
356        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
357        let certificate_committee_lookback =
358            self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
359                format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
360            })?;
361
362        // Construct a set over the authors who included the leader's certificate in the certificate round.
363        let authors = certificates
364            .values()
365            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
366                true => Some(c.author()),
367                false => None,
368            })
369            .collect();
370
371        // Check if the leader is ready to be committed.
372        if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
373            info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
374            self.commit_leader_certificate(leader_certificate).await
375        } else {
376            trace!(
377                "BFT is not ready to commit round {commit_round} with leader '{}' - Availability threshold has not been reached yet",
378                fmt_id(leader)
379            );
380            Ok(())
381        }
382    }
383}
384
385#[async_trait::async_trait]
386impl<N: Network> SyncCallback<N> for BFT<N> {
387    // Notification about a new certificate detected by the `Sync` instance after fetching a new block.
388    fn add_certificate_from_sync(&self, certificate: BatchCertificate<N>) {
389        self.dag.write().insert(certificate);
390    }
391
392    // Commits a certificate into the DAG.
393    fn commit_certificate_from_sync(&self, certificate: &BatchCertificate<N>) {
394        self.dag.write().commit(certificate, self.storage().max_gc_rounds());
395    }
396}
397
398impl<N: Network> BFT<N> {
399    /// Updates the leader certificate to the current even round,
400    /// returning `true` if the BFT is ready to update to the next round.
401    ///
402    /// This method runs on every even round, by determining the leader of the current even round,
403    /// and setting the leader certificate to their certificate in the round, if they were present.
404    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
405        // Retrieve the current round.
406        let current_round = self.storage().current_round();
407        // Ensure the current round matches the given round.
408        if current_round != even_round {
409            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
410            return false;
411        }
412
413        // If the current round is odd, return false.
414        if !current_round.is_multiple_of(2) || current_round < 2 {
415            error!("BFT cannot update the leader certificate in an odd round");
416            return false;
417        }
418
419        // Retrieve the certificates for the current round.
420        let current_certificates = self.storage().get_certificates_for_round(current_round);
421        // If there are no current certificates, set the leader certificate to 'None', and return early.
422        if current_certificates.is_empty() {
423            // Set the leader certificate to 'None'.
424            *self.leader_certificate.write() = None;
425            return false;
426        }
427
428        // Retrieve the committee lookback of the current round.
429        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
430            Ok(committee) => committee,
431            Err(err) => {
432                let err = err.context(format!(
433                    "BFT failed to retrieve the committee lookback for the even round {current_round}"
434                ));
435                warn!("{}", &flatten_error(err));
436                return false;
437            }
438        };
439        // Determine the leader of the current round.
440        let leader = match self.ledger().latest_leader() {
441            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
442            _ => {
443                // Compute the leader for the current round.
444                let computed_leader = match committee_lookback.get_leader(current_round) {
445                    Ok(leader) => leader,
446                    Err(err) => {
447                        let err =
448                            err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
449                        error!("{}", &flatten_error(err));
450                        return false;
451                    }
452                };
453
454                // Cache the computed leader.
455                self.ledger().update_latest_leader(current_round, computed_leader);
456
457                computed_leader
458            }
459        };
460        // Find and set the leader certificate, if the leader was present in the current even round.
461        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
462        *self.leader_certificate.write() = leader_certificate.cloned();
463
464        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
465    }
466
467    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
468    ///  - If the leader certificate is set for the current even round.
469    ///  - The timer for the leader certificate has expired.
470    fn is_even_round_ready_for_next_round(
471        &self,
472        certificates: IndexSet<BatchCertificate<N>>,
473        committee: Committee<N>,
474        current_round: u64,
475    ) -> bool {
476        // Retrieve the authors for the current round.
477        let authors = certificates.into_iter().map(|c| c.author()).collect();
478        // Check if quorum threshold is reached.
479        if !committee.is_quorum_threshold_reached(&authors) {
480            trace!("BFT failed to reach quorum threshold in even round {current_round}");
481            return false;
482        }
483        // If the leader certificate is set for the current even round, return 'true'.
484        if let Some(leader_certificate) = self.leader_certificate.read().as_ref()
485            && leader_certificate.round() == current_round
486        {
487            return true;
488        }
489        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
490        if self.is_timer_expired() {
491            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
492            return true;
493        }
494        // Otherwise, return 'false'.
495        false
496    }
497
498    /// Returns `true` if the timer for the leader certificate has expired.
499    ///
500    /// This is always true for a new BFT instance.
501    fn is_timer_expired(&self) -> bool {
502        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY.as_secs() as i64 <= now()
503    }
504
505    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
506    ///  - The leader certificate is `None`.
507    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
508    ///  - The leader certificate timer has expired.
509    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
510        // Retrieve the current round.
511        let current_round = self.storage().current_round();
512        // Ensure the current round matches the given round.
513        if current_round != odd_round {
514            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
515            return false;
516        }
517        // If the current round is even, return false.
518        if current_round % 2 != 1 {
519            error!("BFT does not compute stakes for the leader certificate in an even round");
520            return false;
521        }
522        // Retrieve the certificates for the current round.
523        let current_certificates = self.storage().get_certificates_for_round(current_round);
524        // Retrieve the committee lookback for the current round.
525        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
526            Ok(committee) => committee,
527            Err(err) => {
528                let err = err.context(format!(
529                    "BFT failed to retrieve the committee lookback for the odd round {current_round}"
530                ));
531                error!("{}", &flatten_error(err));
532                return false;
533            }
534        };
535        // Retrieve the authors of the current certificates.
536        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
537        // Check if quorum threshold is reached.
538        if !committee_lookback.is_quorum_threshold_reached(&authors) {
539            trace!("BFT failed reach quorum threshold in odd round {current_round}.");
540            return false;
541        }
542        // Retrieve the leader certificate.
543        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
544            // If there is no leader certificate for the previous round, return 'true'.
545            return true;
546        };
547        // Compute the stake for the leader certificate.
548        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
549            leader_certificate.id(),
550            current_certificates,
551            &committee_lookback,
552        );
553        // Return 'true' if any of the following conditions hold:
554        stake_with_leader >= committee_lookback.availability_threshold()
555            || stake_without_leader >= committee_lookback.quorum_threshold()
556            || self.is_timer_expired()
557    }
558
559    /// Computes the amount of stake that has & has not signed for the leader certificate.
560    fn compute_stake_for_leader_certificate(
561        &self,
562        leader_certificate_id: Field<N>,
563        current_certificates: IndexSet<BatchCertificate<N>>,
564        current_committee: &Committee<N>,
565    ) -> (u64, u64) {
566        // If there are no current certificates, return early.
567        if current_certificates.is_empty() {
568            return (0, 0);
569        }
570
571        // Initialize a tracker for the stake with the leader.
572        let mut stake_with_leader = 0u64;
573        // Initialize a tracker for the stake without the leader.
574        let mut stake_without_leader = 0u64;
575        // Iterate over the current certificates.
576        for certificate in current_certificates {
577            // Retrieve the stake for the author of the certificate.
578            let stake = current_committee.get_stake(certificate.author());
579            // Determine if the certificate includes the leader.
580            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
581                // If the certificate includes the leader, add the stake to the stake with the leader.
582                true => stake_with_leader = stake_with_leader.saturating_add(stake),
583                // If the certificate does not include the leader, add the stake to the stake without the leader.
584                false => stake_without_leader = stake_without_leader.saturating_add(stake),
585            }
586        }
587        // Return the stake with the leader, and the stake without the leader.
588        (stake_with_leader, stake_without_leader)
589    }
590}
591
592impl<N: Network> BFT<N> {
593    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
594    async fn commit_leader_certificate(&self, leader_certificate: BatchCertificate<N>) -> Result<()> {
595        #[cfg(feature = "metrics")]
596        let start = std::time::Instant::now();
597        #[cfg(debug_assertions)]
598        trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());
599
600        // Serialize all commits so that `last_committed_round` is up-to-date before the next call
601        // re-walks the DAG, preventing duplicate subdag commits.
602        let _commit_guard = self.commit_lock.lock().await;
603
604        // Fetch the leader round.
605        let latest_leader_round = leader_certificate.round();
606
607        // Determine the list of all previous leader certificates since the last committed round.
608        // The order of the leader certificates is from **newest** to **oldest**.
609        let mut leader_certificates = vec![leader_certificate.clone()];
610        // Whether the consensus callback should be skipped (true when the round is already committed).
611        // When `latest_leader_round == last_committed_round` the round was already committed by a
612        // concurrent call that beat us to the lock, or by a prior session whose DAG state was
613        // reconstructed without populating `recently_committed`.  In both cases we still re-run
614        // DFS + GC to ensure `recently_committed` and `gc_round` are populated, but we must NOT
615        // send a duplicate subdag to the consensus callback.
616        let skip_consensus;
617        {
618            // Read-lock the DAG.
619            // We need to hold the lock, so we do not later fail to re-acquire it.
620            let dag = self.dag.read();
621
622            // Re-check under the lock: another call may have committed this round while we were waiting.
623            if latest_leader_round < dag.last_committed_round() {
624                trace!("Skipping already-committed leader round {latest_leader_round}");
625                return Ok(());
626            }
627            skip_consensus = latest_leader_round == dag.last_committed_round();
628
629            #[cfg(debug_assertions)]
630            trace!("Attempting to commit leader certificate for round {}...", latest_leader_round);
631
632            let mut current_certificate = leader_certificate;
633            for round in (dag.last_committed_round() + 2..=latest_leader_round.saturating_sub(2)).rev().step_by(2) {
634                // Retrieve the previous committee for the leader round.
635                let previous_committee_lookback =
636                    self.ledger().get_committee_lookback_for_round(round).with_context(|| {
637                        format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
638                    })?;
639
640                // Either retrieve the cached leader or compute it.
641                let leader = match self.ledger().latest_leader() {
642                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
643                    _ => {
644                        // Compute the leader for the commit round.
645                        let computed_leader = previous_committee_lookback
646                            .get_leader(round)
647                            .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;
648
649                        // Cache the computed leader.
650                        self.ledger().update_latest_leader(round, computed_leader);
651
652                        computed_leader
653                    }
654                };
655                // Retrieve the previous leader certificate.
656                let Some(previous_certificate) = dag.get_certificate_for_round_with_author(round, leader) else {
657                    continue;
658                };
659                // Determine if there is a path between the previous certificate and the current certificate.
660                if dag.is_linked(previous_certificate.clone(), current_certificate.clone())? {
661                    // Add the previous leader certificate to the list of certificates to commit.
662                    leader_certificates.push(previous_certificate.clone());
663                    // Update the current certificate to the previous leader certificate.
664                    current_certificate = previous_certificate;
665                } else {
666                    #[cfg(debug_assertions)]
667                    trace!(
668                        "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
669                    );
670                }
671            }
672        }
673
674        // Iterate over the leader certificates to commit.
675        for leader_certificate in leader_certificates.into_iter().rev() {
676            // Retrieve the leader certificate round.
677            let leader_round = leader_certificate.round();
678            // Compute the commit subdag.
679            let commit_subdag =
680                self.order_dag_with_dfs(leader_certificate).with_context(|| "BFT failed to order the DAG with DFS")?;
681            // Initialize a map for the deduped transmissions.
682            let mut transmissions = IndexMap::new();
683            // Initialize a map for the deduped transaction ids.
684            let mut seen_transaction_ids = IndexSet::new();
685            // Initialize a map for the deduped solution ids.
686            let mut seen_solution_ids = IndexSet::new();
687            // Start from the oldest leader certificate.
688            for certificate in commit_subdag.values().flatten() {
689                // Retrieve the transmissions.
690                for transmission_id in certificate.transmission_ids() {
691                    // If the transaction ID or solution ID already exists in the map, skip it.
692                    // Note: This additional check is done to ensure that we do not include duplicate
693                    // transaction IDs or solution IDs that may have a different transmission ID.
694                    match transmission_id {
695                        TransmissionID::Solution(solution_id, _) => {
696                            // If the solution already exists, skip it.
697                            if seen_solution_ids.contains(&solution_id) {
698                                continue;
699                            }
700                        }
701                        TransmissionID::Transaction(transaction_id, _) => {
702                            // If the transaction already exists, skip it.
703                            if seen_transaction_ids.contains(transaction_id) {
704                                continue;
705                            }
706                        }
707                        TransmissionID::Ratification => {
708                            bail!("Ratifications are currently not supported in the BFT.")
709                        }
710                    }
711                    // If the transmission already exists in the map, skip it.
712                    if transmissions.contains_key(transmission_id) {
713                        continue;
714                    }
715                    // If the transmission already exists in the ledger, skip it.
716                    // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
717                    if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
718                        continue;
719                    }
720                    // Retrieve the transmission.
721                    // Note: If this fails, we have to fetch the block/certificate again. As there is no logic here to sync the transmission.
722                    let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
723                        format!(
724                            "BFT failed to retrieve transmission '{}.{}' from round {}",
725                            fmt_id(transmission_id),
726                            fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
727                            certificate.round()
728                        )
729                    })?;
730                    // Insert the transaction ID or solution ID into the map.
731                    match transmission_id {
732                        TransmissionID::Solution(id, _) => {
733                            seen_solution_ids.insert(id);
734                        }
735                        TransmissionID::Transaction(id, _) => {
736                            seen_transaction_ids.insert(id);
737                        }
738                        TransmissionID::Ratification => {}
739                    }
740                    // Add the transmission to the set.
741                    transmissions.insert(*transmission_id, transmission);
742                }
743            }
744            // Trigger consensus, as this will build a new block for the ledger.
745            // Construct the subdag.
746            let subdag = Subdag::from(commit_subdag.clone())?;
747            // Retrieve the anchor round.
748            let anchor_round = subdag.anchor_round();
749            // Retrieve the number of transmissions.
750            let num_transmissions = transmissions.len();
751            // Retrieve metadata about the subdag.
752            let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
753
754            // Ensure the subdag anchor round matches the leader round.
755            ensure!(
756                anchor_round == leader_round,
757                "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
758            );
759
760            // Trigger consensus (skipped if the round was already committed by a prior call).
761            if !skip_consensus {
762                if let Some(consensus_sender) = self.consensus_sender.get() {
763                    // Initialize a callback sender and receiver.
764                    let (callback_sender, callback_receiver) = oneshot::channel();
765                    // Send the subdag and transmissions to consensus.
766                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
767                    // Await the callback to continue.
768                    match callback_receiver.await {
769                        Ok(Ok(_)) => (),
770                        Ok(Err(err)) => {
771                            let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
772                            error!("{}", &flatten_error(err));
773                            return Ok(());
774                        }
775                        Err(err) => {
776                            let err: anyhow::Error = err.into();
777                            let err =
778                                err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
779                            error!("{}", flatten_error(err));
780                            return Ok(());
781                        }
782                    }
783                }
784            }
785
786            info!(
787                "Committing a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?}",
788            );
789
790            // Update the DAG, as the subdag was successfully included into a block.
791            {
792                let mut dag_write = self.dag.write();
793                let mut count = 0;
794                for certificate in commit_subdag.values().flatten() {
795                    dag_write.commit(certificate, self.storage().max_gc_rounds());
796                    count += 1;
797                }
798
799                trace!("Committed {count} certificates to the DAG");
800            }
801
802            // Update the validator telemetry.
803            #[cfg(feature = "telemetry")]
804            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
805        }
806
807        // Perform garbage collection based on the latest committed leader round.
808        // The protocol guarantees that validators commit the same anchors in the same order,
809        // but they may do so in different chunks of anchors,
810        // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
811        // Doing garbage collection at the end of each chunk (as we do here),
812        // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
        // may give rise to a discrepancy between the DAGs of different validators who commit different chunks:
814        // one validator may have more certificates than the other, not yet garbage collected.
815        // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
816        // it excludes certificates that are below the GC round,
817        // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
818        // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
819        // so long as garbage collection is done after each chunk.
820        // If garbage collection were done after each committed certificate,
821        // that exclusion in `order_dag_with_dfs()` should be unnecessary.
822        self.storage()
823            .garbage_collect_certificates(latest_leader_round)
824            .with_context(|| "BFT failed to garbage collect certificates")?;
825
826        #[cfg(feature = "metrics")]
827        metrics::histogram(metrics::bft::COMMIT_LEADER_CERTIFICATE_LATENCY, start.elapsed().as_secs_f64());
828        Ok(())
829    }
830
831    /// Returns the subdag of batch certificates to commit.
832    fn order_dag_with_dfs(
833        &self,
834        leader_certificate: BatchCertificate<N>,
835    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
836        // Initialize a map for the certificates to commit.
837        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
838        // Initialize a set for the already ordered certificates.
839        let mut already_ordered = HashSet::new();
840        // Initialize a buffer for the certificates to order.
841        let mut buffer = vec![leader_certificate];
842        // Iterate over the certificates to order.
843        while let Some(certificate) = buffer.pop() {
844            // Insert the certificate into the map.
845            commit.entry(certificate.round()).or_default().insert(certificate.clone());
846
847            // Check if the previous certificate is below the GC round.
848            // This is currently a critical check to prevent forking,
849            // as explained in the comment at the end of `commit_leader_certificate()`,
850            // just before the call to garbage collection.
851            let previous_round = certificate.round().saturating_sub(1);
852            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
853                continue;
854            }
855            // Iterate over the previous certificate IDs.
856            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
857            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
858            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
859                // If the previous certificate is already ordered, continue.
860                if already_ordered.contains(previous_certificate_id) {
861                    continue;
862                }
863                // If the previous certificate was recently committed, continue.
864                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
865                    continue;
866                }
867                // If the previous certificate already exists in the ledger, continue.
868                if self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
869                    continue;
870                }
871
872                // Retrieve the previous certificate.
873                let previous_certificate = {
874                    // Start by retrieving the previous certificate from the DAG.
875                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
876                        // If the previous certificate is found, return it.
877                        Some(previous_certificate) => previous_certificate,
878                        // If the previous certificate is not found, retrieve it from the storage.
879                        None => match self.storage().get_certificate(*previous_certificate_id) {
880                            // If the previous certificate is found, return it.
881                            Some(previous_certificate) => previous_certificate,
882                            // Otherwise, the previous certificate is missing, and throw an error.
883                            None => bail!(
884                                "Missing previous certificate {} for round {previous_round}",
885                                fmt_id(previous_certificate_id)
886                            ),
887                        },
888                    }
889                };
890                // Insert the previous certificate into the set of already ordered certificates.
891                already_ordered.insert(previous_certificate.id());
892                // Insert the previous certificate into the buffer.
893                buffer.push(previous_certificate);
894            }
895        }
896        // Ensure we only retain certificates that are above the GC round.
897        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
898        // Return the certificates to commit.
899        Ok(commit)
900    }
901
902    /// Shuts down the BFT.
903    pub async fn shut_down(&self) {
904        info!("Shutting down the BFT...");
905        // Shut down the primary.
906        self.primary.shut_down().await;
907    }
908}
909
#[cfg(test)]
impl<N: Network> BFT<N> {
    /// Returns the last committed round, as tracked by the DAG.
    /// (only used in unit tests)
    pub fn testing_only_latest_committed_round(&self) -> u64 {
        // Take the read lock on the DAG for the duration of the lookup.
        let dag = self.dag.read();
        dag.last_committed_round()
    }
}
918
919#[cfg(test)]
920mod tests {
921    use crate::{
922        BFT,
923        MAX_LEADER_CERTIFICATE_DELAY,
924        PrimaryCallback,
925        helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
926        sync::SyncCallback,
927    };
928
929    use snarkos_account::Account;
930    use snarkos_node_bft_ledger_service::{LedgerService, MockLedgerService};
931    use snarkos_node_bft_storage_service::BFTMemoryService;
932    use snarkos_node_network::ConnectionMode;
933    use snarkos_node_sync::BlockSync;
934    use snarkos_utilities::NodeDataDir;
935
936    use snarkvm::{
937        console::account::{Address, PrivateKey},
938        ledger::{
939            committee::{
940                Committee,
941                test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
942            },
943            narwhal::{
944                BatchCertificate,
945                batch_certificate::test_helpers::{
946                    sample_batch_certificate,
947                    sample_batch_certificate_for_round,
948                    sample_batch_certificate_for_round_with_committee,
949                },
950            },
951        },
952        utilities::TestRng,
953    };
954
955    use anyhow::Result;
956    use indexmap::{IndexMap, IndexSet};
957    use std::sync::Arc;
958
959    type CurrentNetwork = snarkvm::console::network::MainnetV0;
960
961    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
962    fn sample_test_instance(
963        committee_round: Option<u64>,
964        max_gc_rounds: u64,
965        rng: &mut TestRng,
966    ) -> (
967        Committee<CurrentNetwork>,
968        Account<CurrentNetwork>,
969        Arc<MockLedgerService<CurrentNetwork>>,
970        Storage<CurrentNetwork>,
971    ) {
972        let committee = match committee_round {
973            Some(round) => sample_committee_for_round(round, rng),
974            None => sample_committee(rng),
975        };
976        let account = Account::new(rng).unwrap();
977        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
978        let transmissions = Arc::new(BFTMemoryService::new());
979        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds).unwrap();
980
981        (committee, account, ledger, storage)
982    }
983
984    // Helper function to set up BFT for testing.
985    fn initialize_bft(
986        account: Account<CurrentNetwork>,
987        storage: Storage<CurrentNetwork>,
988        ledger: Arc<MockLedgerService<CurrentNetwork>>,
989    ) -> anyhow::Result<BFT<CurrentNetwork>> {
990        // Create the block synchronization logic.
991        let block_sync = Arc::new(BlockSync::new(ledger.clone(), ConnectionMode::Gateway));
992        // Initialize the BFT.
993        BFT::new(
994            account.clone(),
995            storage.clone(),
996            ledger.clone(),
997            block_sync,
998            None,
999            &[],
1000            false,
1001            NodeDataDir::new_test(None),
1002            None,
1003        )
1004    }
1005
1006    #[test]
1007    #[tracing_test::traced_test]
1008    fn test_is_leader_quorum_odd() -> Result<()> {
1009        let rng = &mut TestRng::default();
1010
1011        // Sample batch certificates.
1012        let mut certificates = IndexSet::new();
1013        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1014        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1015        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1016        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1017
1018        // Initialize the committee.
1019        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1020            1,
1021            vec![
1022                certificates[0].author(),
1023                certificates[1].author(),
1024                certificates[2].author(),
1025                certificates[3].author(),
1026            ],
1027            rng,
1028        );
1029
1030        // Initialize the ledger.
1031        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1032        // Initialize the storage.
1033        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
1034        // Initialize the account.
1035        let account = Account::new(rng)?;
1036        // Initialize the BFT.
1037        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1038        assert!(bft.is_timer_expired());
1039        // Ensure this call succeeds on an odd round.
1040        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1041        // If timer has expired but quorum threshold is not reached, return 'false'.
1042        assert!(!result);
1043        // Insert certificates into storage.
1044        for certificate in certificates.iter() {
1045            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1046        }
1047        // Ensure this call succeeds on an odd round.
1048        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1049        assert!(result); // no previous leader certificate
1050        // Set the leader certificate.
1051        let leader_certificate = sample_batch_certificate(rng);
1052        *bft.leader_certificate.write() = Some(leader_certificate);
1053        // Ensure this call succeeds on an odd round.
1054        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1055        assert!(result); // should now fall through to the end of function
1056
1057        Ok(())
1058    }
1059
1060    #[test]
1061    #[tracing_test::traced_test]
1062    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1063        let rng = &mut TestRng::default();
1064
1065        // Sample the test instance.
1066        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1067        assert_eq!(committee.starting_round(), 1);
1068        assert_eq!(storage.current_round(), 1);
1069        assert_eq!(storage.max_gc_rounds(), 10);
1070
1071        // Set up the BFT logic.
1072        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1073        assert!(bft.is_timer_expired());
1074
1075        // Store is at round 1, and we are checking for round 2.
1076        // Ensure this call fails on an even round.
1077        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1078        assert!(!result);
1079        Ok(())
1080    }
1081
1082    #[test]
1083    #[tracing_test::traced_test]
1084    fn test_is_leader_quorum_even() -> Result<()> {
1085        let rng = &mut TestRng::default();
1086
1087        // Sample the test instance.
1088        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1089        assert_eq!(committee.starting_round(), 2);
1090        assert_eq!(storage.current_round(), 2);
1091        assert_eq!(storage.max_gc_rounds(), 10);
1092
1093        // Set up the BFT logic.
1094        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1095        assert!(bft.is_timer_expired());
1096
1097        // Ensure this call fails on an even round.
1098        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1099        assert!(!result);
1100        Ok(())
1101    }
1102
1103    #[test]
1104    #[tracing_test::traced_test]
1105    fn test_is_even_round_ready() -> Result<()> {
1106        let rng = &mut TestRng::default();
1107
1108        // Sample batch certificates.
1109        let mut certificates = IndexSet::new();
1110        certificates.insert(sample_batch_certificate_for_round(2, rng));
1111        certificates.insert(sample_batch_certificate_for_round(2, rng));
1112        certificates.insert(sample_batch_certificate_for_round(2, rng));
1113        certificates.insert(sample_batch_certificate_for_round(2, rng));
1114
1115        // Initialize the committee.
1116        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1117            2,
1118            vec![
1119                certificates[0].author(),
1120                certificates[1].author(),
1121                certificates[2].author(),
1122                certificates[3].author(),
1123            ],
1124            rng,
1125        );
1126
1127        // Initialize the ledger.
1128        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1129        // Initialize the storage.
1130        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
1131        // Initialize the account.
1132        let account = Account::new(rng)?;
1133
1134        // Set up the BFT logic.
1135        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1136        assert!(bft.is_timer_expired());
1137
1138        // Set the leader certificate.
1139        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1140        *bft.leader_certificate.write() = Some(leader_certificate);
1141        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1142        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1143        assert!(!result);
1144        // Once quorum threshold is reached, we are ready for the next round.
1145        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1146        assert!(result);
1147
1148        // Initialize a new BFT.
1149        let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1150        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1151        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1152        if !bft_timer.is_timer_expired() {
1153            assert!(!result);
1154        }
1155        // Wait for the timer to expire.
1156        std::thread::sleep(MAX_LEADER_CERTIFICATE_DELAY);
1157        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1158        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1159        if bft_timer.is_timer_expired() {
1160            assert!(result);
1161        } else {
1162            assert!(!result);
1163        }
1164
1165        Ok(())
1166    }
1167
1168    #[test]
1169    #[tracing_test::traced_test]
1170    fn test_update_leader_certificate_odd() -> Result<()> {
1171        let rng = &mut TestRng::default();
1172
1173        // Sample the test instance.
1174        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1175        assert_eq!(storage.max_gc_rounds(), 10);
1176
1177        // Initialize the BFT.
1178        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1179        assert!(bft.is_timer_expired());
1180
1181        // Ensure this call fails on an odd round.
1182        let result = bft.update_leader_certificate_to_even_round(1);
1183        assert!(!result);
1184        Ok(())
1185    }
1186
1187    #[test]
1188    #[tracing_test::traced_test]
1189    fn test_update_leader_certificate_bad_round() -> Result<()> {
1190        let rng = &mut TestRng::default();
1191
1192        // Sample the test instance.
1193        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1194        assert_eq!(storage.max_gc_rounds(), 10);
1195
1196        // Initialize the BFT.
1197        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1198
1199        // Ensure this call succeeds on an even round.
1200        let result = bft.update_leader_certificate_to_even_round(6);
1201        assert!(!result);
1202        Ok(())
1203    }
1204
1205    #[test]
1206    #[tracing_test::traced_test]
1207    fn test_update_leader_certificate_even() -> Result<()> {
1208        let rng = &mut TestRng::default();
1209
1210        // Set the current round.
1211        let current_round = 3;
1212
1213        // Sample the certificates.
1214        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1215            current_round,
1216            rng,
1217        );
1218
1219        // Initialize the committee.
1220        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1221            2,
1222            vec![
1223                certificates[0].author(),
1224                certificates[1].author(),
1225                certificates[2].author(),
1226                certificates[3].author(),
1227            ],
1228            rng,
1229        );
1230
1231        // Initialize the ledger.
1232        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1233
1234        // Initialize the storage.
1235        let transmissions = Arc::new(BFTMemoryService::new());
1236        let storage = Storage::new(ledger.clone(), transmissions, 10).unwrap();
1237        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1238        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1239        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1240        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1241        assert_eq!(storage.current_round(), 2);
1242
1243        // Retrieve the leader certificate.
1244        let leader = committee.get_leader(2).unwrap();
1245        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1246
1247        // Initialize the BFT.
1248        let account = Account::new(rng)?;
1249        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1250
1251        // Set the leader certificate.
1252        *bft.leader_certificate.write() = Some(leader_certificate);
1253
1254        // Update the leader certificate.
1255        // Ensure this call succeeds on an even round.
1256        let result = bft.update_leader_certificate_to_even_round(2);
1257        assert!(result);
1258
1259        Ok(())
1260    }
1261
1262    #[tokio::test]
1263    #[tracing_test::traced_test]
1264    async fn test_order_dag_with_dfs() -> Result<()> {
1265        let rng = &mut TestRng::default();
1266
1267        // Sample the test instance.
1268        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1269
1270        // Initialize the round parameters.
1271        let previous_round = 2; // <- This must be an even number for the DAG structure expected below.
1272        let current_round = previous_round + 1;
1273
1274        // Sample the current certificate and previous certificates.
1275        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1276            current_round,
1277            rng,
1278        );
1279
1280        /* Test GC */
1281
1282        // Ensure the function succeeds in returning only certificates above GC.
1283        {
1284            // Initialize the storage.
1285            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
1286            // Initialize the BFT.
1287            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1288
1289            // Insert a mock DAG in the BFT.
1290            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1291
1292            // Insert the previous certificates into the BFT.
1293            for certificate in previous_certificates.clone() {
1294                bft.add_certificate_from_sync(certificate);
1295            }
1296
1297            // Ensure this call succeeds and returns all given certificates.
1298            let result = bft.order_dag_with_dfs(certificate.clone());
1299            assert!(result.is_ok());
1300            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1301            assert_eq!(candidate_certificates.len(), 1);
1302            let expected_certificates = vec![certificate.clone()];
1303            assert_eq!(
1304                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1305                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1306            );
1307            assert_eq!(candidate_certificates, expected_certificates);
1308        }
1309
1310        /* Test normal case */
1311
1312        // Ensure the function succeeds in returning all given certificates.
1313        {
1314            // Initialize the storage.
1315            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
1316            // Initialize the BFT.
1317            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1318
1319            // Insert a mock DAG in the BFT.
1320            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1321
1322            // Insert the previous certificates into the BFT.
1323            for certificate in previous_certificates.clone() {
1324                bft.add_certificate_from_sync(certificate);
1325            }
1326
1327            // Ensure this call succeeds and returns all given certificates.
1328            let result = bft.order_dag_with_dfs(certificate.clone());
1329            assert!(result.is_ok());
1330            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1331            assert_eq!(candidate_certificates.len(), 5);
1332            let expected_certificates = vec![
1333                previous_certificates[0].clone(),
1334                previous_certificates[1].clone(),
1335                previous_certificates[2].clone(),
1336                previous_certificates[3].clone(),
1337                certificate,
1338            ];
1339            assert_eq!(
1340                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1341                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1342            );
1343            assert_eq!(candidate_certificates, expected_certificates);
1344        }
1345
1346        Ok(())
1347    }
1348
/// Tests that `order_dag_with_dfs` returns an error when the previous certificates
/// referenced by the given certificate were never added to the BFT.
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    use snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates;

    let rng = &mut TestRng::default();

    // Build the test instance and sanity-check the parameters it was created with.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    // The previous round must be an even number for the DAG structure expected below.
    let previous_round = 2;
    let current_round = previous_round + 1;

    // Sample a certificate for the current round, along with the certificates it references.
    let (certificate, previous_certificates) =
        sample_batch_certificate_with_previous_certificates(current_round, rng);
    // Gather the IDs of the referenced certificates, preserving their order.
    let previous_ids = previous_certificates.iter().map(|previous| previous.id()).collect::<IndexSet<_>>();

    /* Test missing previous certificate. */

    // The error is expected to name the fourth previous certificate ID.
    let expected_message = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_ids[3]),
    );

    // Initialize the BFT; note that none of the previous certificates are added to it.
    let bft = initialize_bft(account, storage, ledger)?;

    // Ordering the DAG from this certificate must fail with the expected message.
    let outcome = bft.order_dag_with_dfs(certificate);
    assert!(outcome.is_err());
    let actual_message = outcome.unwrap_err().to_string();
    assert_eq!(actual_message, expected_message);

    Ok(())
}
1389
1390    #[tokio::test]
1391    async fn test_bft_gc_on_commit() -> Result<()> {
1392        let rng = &mut TestRng::default();
1393
1394        // Initialize the round parameters.
1395        let max_gc_rounds = 1;
1396        let committee_round = 0;
1397        let commit_round = 2;
1398        let current_round = commit_round + 1;
1399
1400        // Sample the certificates.
1401        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1402            current_round,
1403            rng,
1404        );
1405
1406        // Initialize the committee.
1407        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1408            committee_round,
1409            vec![
1410                certificates[0].author(),
1411                certificates[1].author(),
1412                certificates[2].author(),
1413                certificates[3].author(),
1414            ],
1415            rng,
1416        );
1417
1418        // Initialize the ledger.
1419        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1420
1421        // Initialize the storage.
1422        let transmissions = Arc::new(BFTMemoryService::new());
1423        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds).unwrap();
1424        // Insert the certificates into the storage.
1425        for certificate in certificates.iter() {
1426            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1427        }
1428
1429        // Get the leader certificate.
1430        let leader = committee.get_leader(commit_round).unwrap();
1431        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1432
1433        // Initialize the BFT.
1434        let account = Account::new(rng)?;
1435        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1436
1437        // Create an empty mock DAG with last committed round set to `commit_round`.
1438        *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1439
1440        // Ensure that the `gc_round` has not been updated yet.
1441        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1442
1443        // Insert the certificates into the BFT.
1444        for certificate in certificates {
1445            bft.add_certificate_from_sync(certificate);
1446        }
1447
1448        // Commit the leader certificate.
1449        bft.commit_leader_certificate(leader_certificate).await.unwrap();
1450
1451        // Ensure that the `gc_round` has been updated.
1452        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1453
1454        Ok(())
1455    }
1456
1457    #[tokio::test]
1458    #[tracing_test::traced_test]
1459    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1460        let rng = &mut TestRng::default();
1461
1462        // Initialize the round parameters.
1463        let max_gc_rounds = 1;
1464        let committee_round = 0;
1465        let commit_round = 2;
1466        let current_round = commit_round + 1;
1467
1468        // Sample the current certificate and previous certificates.
1469        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1470            current_round,
1471            rng,
1472        );
1473
1474        // Initialize the committee.
1475        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1476            committee_round,
1477            vec![
1478                certificates[0].author(),
1479                certificates[1].author(),
1480                certificates[2].author(),
1481                certificates[3].author(),
1482            ],
1483            rng,
1484        );
1485
1486        // Initialize the ledger.
1487        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1488
1489        // Initialize the storage.
1490        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1491        // Insert the certificates into the storage.
1492        for certificate in certificates.iter() {
1493            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1494        }
1495
1496        // Get the leader certificate.
1497        let leader = committee.get_leader(commit_round).unwrap();
1498        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1499
1500        // Initialize the BFT.
1501        let account = Account::new(rng)?;
1502        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1503
1504        // Insert a mock DAG in the BFT.
1505        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1506
1507        // Insert the previous certificates into the BFT.
1508        for certificate in certificates.clone() {
1509            bft.add_certificate_from_sync(certificate);
1510        }
1511
1512        // Commit the leader certificate.
1513        bft.commit_leader_certificate(leader_certificate.clone()).await.unwrap();
1514
1515        // Simulate a bootup of the BFT.
1516
1517        // Initialize a new instance of storage.
1518        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1519        // Initialize a new instance of BFT.
1520        let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1521
1522        // Sync the BFT DAG at bootup.
1523        for cert in certificates.iter() {
1524            bootup_bft.add_certificate_from_sync(cert.clone());
1525            bootup_bft.commit_certificate_from_sync(cert);
1526        }
1527
1528        // Check that the BFT starts from the same last committed round.
1529        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1530
1531        // Ensure that both BFTs have committed the leader certificate.
1532        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1533        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1534
1535        // Check the state of the bootup BFT.
1536        for certificate in certificates {
1537            let certificate_round = certificate.round();
1538            let certificate_id = certificate.id();
1539            // Check that the bootup BFT has committed the certificates.
1540            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1541            // Check that the bootup BFT does not contain the certificates in its graph, because
1542            // it should not need to order them again in subsequent subdags.
1543            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1544        }
1545
1546        Ok(())
1547    }
1548
1549    #[tokio::test]
1550    #[tracing_test::traced_test]
1551    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1552        /*
1553        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1554        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1555        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1556        */
1557
1558        let rng = &mut TestRng::default();
1559
1560        // Initialize the round parameters.
1561        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1562        let committee_round = 0;
1563        let commit_round = 2;
1564        let current_round = commit_round + 1;
1565        let next_round = current_round + 1;
1566
1567        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1568        let (round_to_certificates_map, committee) = {
1569            let private_keys = [
1570                PrivateKey::new(rng).unwrap(),
1571                PrivateKey::new(rng).unwrap(),
1572                PrivateKey::new(rng).unwrap(),
1573                PrivateKey::new(rng).unwrap(),
1574            ];
1575            let addresses = vec![
1576                Address::try_from(private_keys[0])?,
1577                Address::try_from(private_keys[1])?,
1578                Address::try_from(private_keys[2])?,
1579                Address::try_from(private_keys[3])?,
1580            ];
1581            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1582                committee_round,
1583                addresses,
1584                rng,
1585            );
1586            // Initialize a mapping from the round number to the set of batch certificates in the round.
1587            let mut round_to_certificates_map: IndexMap<
1588                u64,
1589                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1590            > = IndexMap::new();
1591            let mut previous_certificates = IndexSet::with_capacity(4);
1592            // Initialize the genesis batch certificates.
1593            for _ in 0..4 {
1594                previous_certificates.insert(sample_batch_certificate(rng));
1595            }
1596            for round in 0..commit_round + 3 {
1597                let mut current_certificates = IndexSet::new();
1598                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1599                    IndexSet::new()
1600                } else {
1601                    previous_certificates.iter().map(|c| c.id()).collect()
1602                };
1603                let transmission_ids =
1604                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1605                        .into_iter()
1606                        .collect::<IndexSet<_>>();
1607                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1608                let committee_id = committee.id();
1609                for (i, private_key_1) in private_keys.iter().enumerate() {
1610                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1611                        private_key_1,
1612                        round,
1613                        timestamp,
1614                        committee_id,
1615                        transmission_ids.clone(),
1616                        previous_certificate_ids.clone(),
1617                        rng,
1618                    )
1619                    .unwrap();
1620                    let mut signatures = IndexSet::with_capacity(4);
1621                    for (j, private_key_2) in private_keys.iter().enumerate() {
1622                        if i != j {
1623                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1624                        }
1625                    }
1626                    let certificate =
1627                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1628                    current_certificates.insert(certificate);
1629                }
1630                // Update the mapping.
1631                round_to_certificates_map.insert(round, current_certificates.clone());
1632                previous_certificates = current_certificates.clone();
1633            }
1634            (round_to_certificates_map, committee)
1635        };
1636
1637        // Initialize the ledger.
1638        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1639        // Initialize the storage.
1640        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1641        // Get the leaders for the next 2 commit rounds.
1642        let leader = committee.get_leader(commit_round).unwrap();
1643        let next_leader = committee.get_leader(next_round).unwrap();
1644        // Insert the pre shutdown certificates into the storage.
1645        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1646        for i in 1..=commit_round {
1647            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1648            if i == commit_round {
1649                // Only insert the leader certificate for the commit round.
1650                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1651                if let Some(c) = leader_certificate {
1652                    pre_shutdown_certificates.push(c.clone());
1653                }
1654                continue;
1655            }
1656            pre_shutdown_certificates.extend(certificates);
1657        }
1658        for certificate in pre_shutdown_certificates.iter() {
1659            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1660        }
1661        // Insert the post shutdown certificates into the storage.
1662        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1663            Vec::new();
1664        for j in commit_round..=commit_round + 2 {
1665            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1666            post_shutdown_certificates.extend(certificate);
1667        }
1668        for certificate in post_shutdown_certificates.iter() {
1669            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1670        }
1671        // Get the leader certificates.
1672        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1673        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1674
1675        // Initialize the BFT without bootup.
1676        let account = Account::new(rng)?;
1677        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1678
1679        // Insert a mock DAG in the BFT without bootup.
1680        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1681
1682        // Insert the certificates into the BFT without bootup.
1683        for certificate in pre_shutdown_certificates.clone() {
1684            assert!(bft.add_new_certificate(certificate).await.is_ok());
1685        }
1686
1687        // Insert the post shutdown certificates into the BFT without bootup.
1688        for certificate in post_shutdown_certificates.clone() {
1689            assert!(bft.add_new_certificate(certificate).await.is_ok());
1690        }
1691        // Commit the second leader certificate.
1692        let commit_subdag = bft.order_dag_with_dfs(next_leader_certificate.clone()).unwrap();
1693        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1694        bft.commit_leader_certificate(next_leader_certificate.clone()).await.unwrap();
1695
1696        // Simulate a bootup of the BFT.
1697
1698        // Initialize a new instance of storage.
1699        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1700
1701        // Initialize a new instance of BFT with bootup.
1702        let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1703
1704        // Sync the BFT DAG at bootup.
1705        for cert in pre_shutdown_certificates.iter() {
1706            bootup_bft.add_certificate_from_sync(cert.clone());
1707            bootup_bft.commit_certificate_from_sync(cert);
1708        }
1709
1710        // Insert the post shutdown certificates to the storage and BFT with bootup.
1711        for certificate in post_shutdown_certificates.iter() {
1712            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1713        }
1714        for certificate in post_shutdown_certificates.clone() {
1715            assert!(bootup_bft.add_new_certificate(certificate).await.is_ok());
1716        }
1717        // Commit the second leader certificate.
1718        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs(next_leader_certificate.clone()).unwrap();
1719        let commit_subdag_metadata_bootup =
1720            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1721        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1722        bootup_bft.commit_leader_certificate(next_leader_certificate.clone()).await.unwrap();
1723
1724        // Check that the final state of both BFTs is the same.
1725
1726        // Check that both BFTs start from the same last committed round.
1727        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1728
1729        // Ensure that both BFTs have committed the leader certificates.
1730        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1731        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1732        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1733        assert!(
1734            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1735        );
1736
1737        // Check that the bootup BFT has committed the pre shutdown certificates.
1738        for certificate in pre_shutdown_certificates.clone() {
1739            let certificate_round = certificate.round();
1740            let certificate_id = certificate.id();
1741            // Check that both BFTs have committed the certificates.
1742            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1743            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1744            // Check that the bootup BFT does not contain the certificates in its graph, because
1745            // it should not need to order them again in subsequent subdags.
1746            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1747            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1748        }
1749
1750        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1751        for certificate in committed_certificates_bootup.clone() {
1752            let certificate_round = certificate.round();
1753            let certificate_id = certificate.id();
1754            // Check that the both BFTs have committed the certificates.
1755            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1756            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1757            // Check that the bootup BFT does not contain the certificates in its graph, because
1758            // it should not need to order them again in subsequent subdags.
1759            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1760            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1761        }
1762
1763        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1764        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1765
1766        Ok(())
1767    }
1768
1769    #[tokio::test]
1770    #[tracing_test::traced_test]
1771    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1772        /*
1773        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1774        2. Add post shutdown certificates to the bootup BFT.
1775        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1776        */
1777
1778        let rng = &mut TestRng::default();
1779
1780        // Initialize the round parameters.
1781        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1782        let committee_round = 0;
1783        let commit_round = 2;
1784        let current_round = commit_round + 1;
1785        let next_round = current_round + 1;
1786
1787        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1788        let (round_to_certificates_map, committee) = {
1789            let private_keys = [
1790                PrivateKey::new(rng).unwrap(),
1791                PrivateKey::new(rng).unwrap(),
1792                PrivateKey::new(rng).unwrap(),
1793                PrivateKey::new(rng).unwrap(),
1794            ];
1795            let addresses = vec![
1796                Address::try_from(private_keys[0])?,
1797                Address::try_from(private_keys[1])?,
1798                Address::try_from(private_keys[2])?,
1799                Address::try_from(private_keys[3])?,
1800            ];
1801            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1802                committee_round,
1803                addresses,
1804                rng,
1805            );
1806            // Initialize a mapping from the round number to the set of batch certificates in the round.
1807            let mut round_to_certificates_map: IndexMap<
1808                u64,
1809                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1810            > = IndexMap::new();
1811            let mut previous_certificates = IndexSet::with_capacity(4);
1812            // Initialize the genesis batch certificates.
1813            for _ in 0..4 {
1814                previous_certificates.insert(sample_batch_certificate(rng));
1815            }
1816            for round in 0..=commit_round + 2 {
1817                let mut current_certificates = IndexSet::new();
1818                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1819                    IndexSet::new()
1820                } else {
1821                    previous_certificates.iter().map(|c| c.id()).collect()
1822                };
1823                let transmission_ids =
1824                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1825                        .into_iter()
1826                        .collect::<IndexSet<_>>();
1827                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1828                let committee_id = committee.id();
1829                for (i, private_key_1) in private_keys.iter().enumerate() {
1830                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1831                        private_key_1,
1832                        round,
1833                        timestamp,
1834                        committee_id,
1835                        transmission_ids.clone(),
1836                        previous_certificate_ids.clone(),
1837                        rng,
1838                    )
1839                    .unwrap();
1840                    let mut signatures = IndexSet::with_capacity(4);
1841                    for (j, private_key_2) in private_keys.iter().enumerate() {
1842                        if i != j {
1843                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1844                        }
1845                    }
1846                    let certificate =
1847                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1848                    current_certificates.insert(certificate);
1849                }
1850                // Update the mapping.
1851                round_to_certificates_map.insert(round, current_certificates.clone());
1852                previous_certificates = current_certificates.clone();
1853            }
1854            (round_to_certificates_map, committee)
1855        };
1856
1857        // Initialize the ledger.
1858        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1859        // Initialize the storage.
1860        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1861        // Get the leaders for the next 2 commit rounds.
1862        let leader = committee.get_leader(commit_round).unwrap();
1863        let next_leader = committee.get_leader(next_round).unwrap();
1864        // Insert the pre shutdown certificates into the storage.
1865        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1866        for i in 1..=commit_round {
1867            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1868            if i == commit_round {
1869                // Only insert the leader certificate for the commit round.
1870                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1871                if let Some(c) = leader_certificate {
1872                    pre_shutdown_certificates.push(c.clone());
1873                }
1874                continue;
1875            }
1876            pre_shutdown_certificates.extend(certificates);
1877        }
1878        for certificate in pre_shutdown_certificates.iter() {
1879            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1880        }
1881        // Initialize the bootup BFT.
1882        let account = Account::new(rng)?;
1883        let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1884
1885        // Insert a mock DAG in the BFT without bootup.
1886        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1887        // Sync the BFT DAG at bootup.
1888        for cert in pre_shutdown_certificates.iter() {
1889            bootup_bft.add_certificate_from_sync(cert.clone());
1890            bootup_bft.commit_certificate_from_sync(cert);
1891        }
1892
1893        // Insert the post shutdown certificates into the storage.
1894        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1895            Vec::new();
1896        for j in commit_round..=commit_round + 2 {
1897            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1898            post_shutdown_certificates.extend(certificate);
1899        }
1900        for certificate in post_shutdown_certificates.iter() {
1901            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1902        }
1903
1904        // Insert the post shutdown certificates into the DAG.
1905        for certificate in post_shutdown_certificates.clone() {
1906            assert!(bootup_bft.add_new_certificate(certificate).await.is_ok());
1907        }
1908
1909        // Get the next leader certificate to commit.
1910        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1911        let commit_subdag = bootup_bft.order_dag_with_dfs(next_leader_certificate).unwrap();
1912        let committed_certificates = commit_subdag.values().flatten();
1913
1914        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1915        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1916            for committed_certificate in committed_certificates.clone() {
1917                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1918            }
1919        }
1920        Ok(())
1921    }
1922
1923    /// Tests that a leader certificate can be committed by sufficient endorsements in a succeeding leader certificate.
1924    #[test_log::test(tokio::test)]
1925    async fn test_commit_via_is_linked() {
1926        let rng = &mut TestRng::default();
1927
1928        let committee_round = 0;
1929        let leader_round_1 = 2;
1930        let leader_round_2 = 4; // subsequent even round
1931        let max_gc_rounds = 50;
1932
1933        // Create a committee with four members.
1934        let num_authors = 4;
1935        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
1936        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
1937
1938        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
1939        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1940        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1941        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
1942
1943        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
1944
1945        // Round 1
1946        let round1_certs: IndexSet<_> = (0..num_authors)
1947            .map(|idx| {
1948                let author = &private_keys[idx];
1949                let endorsements: Vec<_> = private_keys
1950                    .iter()
1951                    .enumerate()
1952                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
1953                    .collect();
1954
1955                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
1956            })
1957            .collect();
1958        certificates_by_round.insert(1, round1_certs.clone());
1959
1960        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
1961        let mut leader1_certificate = None;
1962
1963        let round2_certs: IndexSet<_> = (0..num_authors)
1964            .map(|idx| {
1965                let author = &private_keys[idx];
1966                let endorsements: Vec<_> = private_keys
1967                    .iter()
1968                    .enumerate()
1969                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
1970                    .collect();
1971                let cert = sample_batch_certificate_for_round_with_committee(
1972                    leader_round_1,
1973                    round1_certs.iter().map(|c| c.id()).collect(),
1974                    author,
1975                    &endorsements[..],
1976                    rng,
1977                );
1978
1979                if cert.author() == leader1 {
1980                    leader1_certificate = Some(cert.clone());
1981                }
1982                cert
1983            })
1984            .collect();
1985        certificates_by_round.insert(leader_round_1, round2_certs.clone());
1986
1987        let round3_certs: IndexSet<_> = (0..num_authors)
1988            .map(|idx| {
1989                let author = &private_keys[idx];
1990                let endorsements: Vec<_> = private_keys
1991                    .iter()
1992                    .enumerate()
1993                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
1994                    .collect();
1995
1996                let previous_certificate_ids: IndexSet<_> = round2_certs
1997                    .iter()
1998                    .filter_map(|cert| {
1999                        // Only have the leader endorse the previous round's leader certificate.
2000                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2001                    })
2002                    .collect();
2003
2004                sample_batch_certificate_for_round_with_committee(
2005                    leader_round_1 + 1,
2006                    previous_certificate_ids,
2007                    author,
2008                    &endorsements[..],
2009                    rng,
2010                )
2011            })
2012            .collect();
2013        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2014
2015        // Ensure the first leader's certificate is not committed yet.
2016        let leader_certificate_1 = leader1_certificate.unwrap();
2017        assert!(
2018            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2019            "Leader certificate 1 should not be committed yet"
2020        );
2021        assert_eq!(bft.dag.read().last_committed_round(), 0);
2022
2023        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2024        let round4_certs: IndexSet<_> = (0..num_authors)
2025            .map(|idx| {
2026                let endorsements: Vec<_> = private_keys
2027                    .iter()
2028                    .enumerate()
2029                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2030                    .collect();
2031
2032                sample_batch_certificate_for_round_with_committee(
2033                    leader_round_2,
2034                    round3_certs.iter().map(|c| c.id()).collect(),
2035                    &private_keys[idx],
2036                    &endorsements[..],
2037                    rng,
2038                )
2039            })
2040            .collect();
2041        certificates_by_round.insert(leader_round_2, round4_certs.clone());
2042
2043        // Insert all certificates into the storage and DAG.
2044        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2045            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2046            bft.add_certificate_from_sync(certificate);
2047        }
2048
2049        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2050
2051        assert!(
2052            bft.dag.read().is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2053            "Leader certificate 1 should be linked to leader certificate 2"
2054        );
2055
2056        // Explicitely commit leader certificate 2.
2057        bft.commit_leader_certificate(leader_certificate_2.clone()).await.unwrap();
2058
2059        // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2060        assert!(
2061            bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2062            "Leader certificate for round 2 should be committed when committing at round 4"
2063        );
2064
2065        // Leader certificate 2 should be committed as the above call was successful.
2066        assert!(
2067            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2068            "Leader certificate for round 4 should be committed"
2069        );
2070
2071        assert_eq!(bft.dag.read().last_committed_round(), 4);
2072    }
2073
2074    #[test_log::test(tokio::test)]
2075    async fn test_commit_via_is_linked_with_skipped_anchor() {
2076        let rng = &mut TestRng::default();
2077
2078        let committee_round = 0;
2079        let leader_round_1 = 2;
2080        let leader_round_2 = 4;
2081        let max_gc_rounds = 50;
2082
2083        let num_authors = 4;
2084        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2085        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2086
2087        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2088        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2089        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
2090        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2091
2092        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2093
2094        // Round 1
2095        let round1_certs: IndexSet<_> = (0..num_authors)
2096            .map(|idx| {
2097                let author = &private_keys[idx];
2098                let endorsements: Vec<_> = private_keys
2099                    .iter()
2100                    .enumerate()
2101                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2102                    .collect();
2103
2104                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2105            })
2106            .collect();
2107        certificates_by_round.insert(1, round1_certs.clone());
2108
2109        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2110        let mut leader1_certificate = None;
2111
2112        let round2_certs: IndexSet<_> = (0..num_authors)
2113            .map(|idx| {
2114                let author = &private_keys[idx];
2115                let endorsements: Vec<_> = private_keys
2116                    .iter()
2117                    .enumerate()
2118                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2119                    .collect();
2120                let cert = sample_batch_certificate_for_round_with_committee(
2121                    leader_round_1,
2122                    round1_certs.iter().map(|c| c.id()).collect(),
2123                    author,
2124                    &endorsements[..],
2125                    rng,
2126                );
2127
2128                if cert.author() == leader1 {
2129                    leader1_certificate = Some(cert.clone());
2130                }
2131                cert
2132            })
2133            .collect();
2134        certificates_by_round.insert(leader_round_1, round2_certs.clone());
2135
2136        let round3_certs: IndexSet<_> = (0..num_authors)
2137            .map(|idx| {
2138                let author = &private_keys[idx];
2139                let endorsements: Vec<_> = private_keys
2140                    .iter()
2141                    .enumerate()
2142                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2143                    .collect();
2144
2145                let previous_certificate_ids: IndexSet<_> = round2_certs
2146                    .iter()
2147                    .filter_map(|cert| {
2148                        // Only have the leader endorse the previous round's leader certificate.
2149                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2150                    })
2151                    .collect();
2152
2153                sample_batch_certificate_for_round_with_committee(
2154                    leader_round_1 + 1,
2155                    previous_certificate_ids,
2156                    author,
2157                    &endorsements[..],
2158                    rng,
2159                )
2160            })
2161            .collect();
2162        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2163
2164        // Ensure the first leader's certificate is not committed yet.
2165        let leader_certificate_1 = leader1_certificate.unwrap();
2166        assert!(
2167            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2168            "Leader certificate 1 should not be committed yet"
2169        );
2170
2171        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2172        let round4_certs: IndexSet<_> = (0..num_authors)
2173            .map(|idx| {
2174                let endorsements: Vec<_> = private_keys
2175                    .iter()
2176                    .enumerate()
2177                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2178                    .collect();
2179
2180                // Do not create a path to the previous leader certificate.
2181                let previous_certificate_ids: IndexSet<_> = round3_certs
2182                    .iter()
2183                    .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
2184                    .collect();
2185
2186                sample_batch_certificate_for_round_with_committee(
2187                    leader_round_2,
2188                    previous_certificate_ids,
2189                    &private_keys[idx],
2190                    &endorsements[..],
2191                    rng,
2192                )
2193            })
2194            .collect();
2195        certificates_by_round.insert(leader_round_2, round4_certs.clone());
2196
2197        // Insert all certificates into the storage and DAG.
2198        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2199            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2200            bft.add_certificate_from_sync(certificate);
2201        }
2202
2203        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2204
2205        assert!(
2206            !bft.dag.read().is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2207            "Leader certificate 1 should not be linked to leader certificate 2"
2208        );
2209        assert_eq!(bft.dag.read().last_committed_round(), 0);
2210
2211        // Explicitely commit leader certificate 2.
2212        bft.commit_leader_certificate(leader_certificate_2.clone()).await.unwrap();
2213
2214        // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2215        assert!(
2216            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2217            "Leader certificate for round 2 should not be committed when committing at round 4"
2218        );
2219
2220        // Leader certificate 2 should be committed as the above call was successful.
2221        assert!(
2222            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2223            "Leader certificate for round 4 should be committed"
2224        );
2225        assert_eq!(bft.dag.read().last_committed_round(), 4);
2226    }
2227}