use crate::{
MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
Primary,
helpers::{
BFTReceiver,
ConsensusSender,
DAG,
PrimaryReceiver,
PrimarySender,
Storage,
fmt_id,
init_bft_channels,
now,
},
};
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkvm::{
console::account::Address,
ledger::{
block::Transaction,
committee::Committee,
narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
puzzle::{Solution, SolutionID},
},
prelude::{Field, Network, Result, bail, ensure},
};
use colored::Colorize;
use indexmap::{IndexMap, IndexSet};
use parking_lot::{Mutex, RwLock};
use std::{
collections::{BTreeMap, HashSet},
future::Future,
net::SocketAddr,
sync::{
Arc,
atomic::{AtomicI64, Ordering},
},
};
use tokio::{
sync::{Mutex as TMutex, OnceCell, oneshot},
task::JoinHandle,
};
/// The BFT instance, which wraps the primary and layers leader election and
/// DAG-based commit logic on top of it. Cloning is cheap: all state is shared
/// behind `Arc`s.
#[derive(Clone)]
pub struct BFT<N: Network> {
    // The primary, which produces batches and certificates.
    primary: Primary<N>,
    // The DAG of batch certificates used to determine commit order.
    dag: Arc<RwLock<DAG<N>>>,
    // The leader certificate found for the current even round, if any.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    // Unix timestamp (seconds) of the last leader-certificate update; used
    // with MAX_LEADER_CERTIFICATE_DELAY_IN_SECS to detect leader timeouts.
    leader_certificate_timer: Arc<AtomicI64>,
    // The sender used to forward committed subdags to consensus (set at most once).
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    // Handles of the spawned handler tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    // Async lock serializing DAG updates and shutdown.
    lock: Arc<TMutex<()>>,
}
impl<N: Network> BFT<N> {
    /// Initializes a new instance of the BFT, backed by a fresh primary.
    ///
    /// `ip` optionally pins the listening address, `trusted_validators` seeds
    /// the validator peer set, and `dev` is forwarded to the primary
    /// (development-mode identifier — confirm semantics against `Primary::new`).
    ///
    /// # Errors
    /// Returns an error if the primary fails to initialize.
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        dev: Option<u16>,
    ) -> Result<Self> {
        Ok(Self {
            primary: Primary::new(account, storage, ledger, ip, trusted_validators, dev)?,
            dag: Default::default(),
            leader_certificate: Default::default(),
            leader_certificate_timer: Default::default(),
            consensus_sender: Default::default(),
            handles: Default::default(),
            lock: Default::default(),
        })
    }

    /// Runs the BFT instance: wires up the BFT channels, starts the channel
    /// handlers, installs the (optional) consensus sender, and starts the primary.
    ///
    /// # Errors
    /// Returns an error if the primary fails to start.
    pub async fn run(
        &mut self,
        consensus_sender: Option<ConsensusSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the BFT instance...");
        // Initialize the BFT channels.
        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
        // First, start the BFT handlers.
        self.start_handlers(bft_receiver);
        // Next, set the consensus sender — this MUST happen before the primary
        // starts. Otherwise, a certificate processed by the handlers could
        // trigger a commit while `consensus_sender` is still unset, and
        // `commit_leader_certificate` would silently skip forwarding the
        // committed subdag to consensus.
        if let Some(consensus_sender) = consensus_sender {
            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
        }
        // Lastly, run the primary instance.
        self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
        Ok(())
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.primary.is_synced()
    }

    /// Returns the primary.
    pub const fn primary(&self) -> &Primary<N> {
        &self.primary
    }

    /// Returns the storage (delegates to the primary).
    pub const fn storage(&self) -> &Storage<N> {
        self.primary.storage()
    }

    /// Returns the ledger service (delegates to the primary).
    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        self.primary.ledger()
    }

    /// Returns the author of the cached leader certificate, if one is set.
    pub fn leader(&self) -> Option<Address<N>> {
        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
    }

    /// Returns the shared leader-certificate slot.
    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
        &self.leader_certificate
    }
}
impl<N: Network> BFT<N> {
    /// Returns the number of unconfirmed transmissions (delegates to the primary).
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.primary.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications (delegates to the primary).
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.primary.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions (delegates to the primary).
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.primary.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions (delegates to the primary).
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.primary.num_unconfirmed_transactions()
    }
}
impl<N: Network> BFT<N> {
    /// Returns an iterator over the worker transmission IDs (delegates to the primary).
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.primary.worker_transmission_ids()
    }

    /// Returns an iterator over the worker transmissions (delegates to the primary).
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.primary.worker_transmissions()
    }

    /// Returns an iterator over the worker solutions (delegates to the primary).
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.primary.worker_solutions()
    }

    /// Returns an iterator over the worker transactions (delegates to the primary).
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.primary.worker_transactions()
    }
}
impl<N: Network> BFT<N> {
    /// Attempts to advance the BFT (and storage) from `current_round` to the
    /// next round. Returns `true` if storage was incremented.
    fn update_to_next_round(&self, current_round: u64) -> bool {
        // If storage is already past the requested round, skip the update.
        let storage_round = self.storage().current_round();
        if current_round < storage_round {
            debug!(
                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
            );
            return false;
        }
        // Even rounds must (attempt to) elect a leader certificate;
        // odd rounds must reach quorum with/without the leader.
        let is_ready = match current_round % 2 == 0 {
            true => self.update_leader_certificate_to_even_round(current_round),
            false => self.is_leader_quorum_or_nonleaders_available(current_round),
        };
        // Record the elapsed time since the timer was last reset.
        // NOTE(review): this records on every call, not only on a successful
        // advance — confirm that is the intended metric semantics.
        #[cfg(feature = "metrics")]
        {
            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
            if start > 0 {
                let end = now();
                let elapsed = std::time::Duration::from_secs((end - start) as u64);
                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
            }
        }
        // Log the leader-election outcome for even rounds.
        if current_round % 2 == 0 {
            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
                if !is_ready {
                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
                }
                let leader_round = leader_certificate.round();
                match leader_round == current_round {
                    true => {
                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
                        #[cfg(feature = "metrics")]
                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
                    }
                    // The cached leader certificate is stale (from a prior round).
                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
                }
            } else {
                match is_ready {
                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
                }
            }
        }
        // If ready, advance storage to the next round and reset the leader timer.
        if is_ready {
            if let Err(e) = self.storage().increment_to_next_round(current_round) {
                warn!("BFT failed to increment to the next round from round {current_round} - {e}");
                return false;
            }
            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
        }
        is_ready
    }

    /// Updates `self.leader_certificate` for the given even round, and returns
    /// `true` if the even round is ready to advance to the next round.
    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
        // The storage round must match the requested even round exactly.
        let current_round = self.storage().current_round();
        if current_round != even_round {
            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
            return false;
        }
        // Guard: only even rounds >= 2 elect a leader.
        if current_round % 2 != 0 || current_round < 2 {
            error!("BFT cannot update the leader certificate in an odd round");
            return false;
        }
        // Without any certificates for this round, clear the cached leader
        // certificate and bail.
        let current_certificates = self.storage().get_certificates_for_round(current_round);
        if current_certificates.is_empty() {
            *self.leader_certificate.write() = None;
            return false;
        }
        // Retrieve the committee lookback used to determine the leader.
        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
            Ok(committee) => committee,
            Err(e) => {
                error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
                return false;
            }
        };
        // Use the cached leader for this round if present; otherwise compute
        // the leader and cache it in the ledger service.
        let leader = match self.ledger().latest_leader() {
            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
            _ => {
                let computed_leader = match committee_lookback.get_leader(current_round) {
                    Ok(leader) => leader,
                    Err(e) => {
                        error!("BFT failed to compute the leader for the even round {current_round} - {e}");
                        return false;
                    }
                };
                self.ledger().update_latest_leader(current_round, computed_leader);
                computed_leader
            }
        };
        // Find (and cache) the certificate authored by the leader, if any.
        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
        *self.leader_certificate.write() = leader_certificate.cloned();
        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
    }

    /// Returns `true` if the even round can advance: quorum is reached AND
    /// (the leader certificate for this round is present OR the timer expired).
    fn is_even_round_ready_for_next_round(
        &self,
        certificates: IndexSet<BatchCertificate<N>>,
        committee: Committee<N>,
        current_round: u64,
    ) -> bool {
        // Quorum is computed over the authors of the round's certificates.
        let authors = certificates.into_iter().map(|c| c.author()).collect();
        if !committee.is_quorum_threshold_reached(&authors) {
            trace!("BFT failed to reach quorum threshold in even round {current_round}");
            return false;
        }
        // With a leader certificate for this round, advance immediately.
        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
            if leader_certificate.round() == current_round {
                return true;
            }
        }
        // Otherwise, only advance once the leader timer has expired.
        if self.is_timer_expired() {
            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
            return true;
        }
        false
    }

    /// Returns `true` if the maximum leader-certificate delay has elapsed
    /// since the timer was last reset (timer stores a unix timestamp in seconds).
    fn is_timer_expired(&self) -> bool {
        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
    }

    /// Returns `true` if the odd round can advance: quorum is reached AND
    /// (no leader certificate is cached, OR enough stake links to the leader,
    /// OR enough stake proceeded without it, OR the leader timer expired).
    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
        // The storage round must match the requested odd round exactly.
        let current_round = self.storage().current_round();
        if current_round != odd_round {
            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
            return false;
        }
        // Guard: this check only applies to odd rounds.
        if current_round % 2 != 1 {
            error!("BFT does not compute stakes for the leader certificate in an even round");
            return false;
        }
        let current_certificates = self.storage().get_certificates_for_round(current_round);
        // Retrieve the committee lookback used for thresholds and stakes.
        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
            Ok(committee) => committee,
            Err(e) => {
                error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
                return false;
            }
        };
        // Quorum is computed over the authors of the round's certificates.
        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
        if !committee_lookback.is_quorum_threshold_reached(&authors) {
            trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
            return false;
        }
        // With no cached leader certificate, the round may advance freely.
        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
            return true;
        };
        // Partition the round's stake by whether each certificate links to the leader.
        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
            leader_certificate.id(),
            current_certificates,
            &committee_lookback,
        );
        // Advance if enough stake saw the leader, enough stake proceeded
        // without it, or the leader timer expired.
        stake_with_leader >= committee_lookback.availability_threshold()
            || stake_without_leader >= committee_lookback.quorum_threshold()
            || self.is_timer_expired()
    }

    /// Partitions the stake of the given certificates' authors into
    /// `(stake linking to the leader certificate, stake not linking to it)`.
    /// Sums saturate to avoid overflow.
    fn compute_stake_for_leader_certificate(
        &self,
        leader_certificate_id: Field<N>,
        current_certificates: IndexSet<BatchCertificate<N>>,
        current_committee: &Committee<N>,
    ) -> (u64, u64) {
        // Fast path: no certificates means no stake on either side.
        if current_certificates.is_empty() {
            return (0, 0);
        }
        let mut stake_with_leader = 0u64;
        let mut stake_without_leader = 0u64;
        for certificate in current_certificates {
            let stake = current_committee.get_stake(certificate.author());
            // A certificate "sees" the leader if it references the leader certificate id.
            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
                true => stake_with_leader = stake_with_leader.saturating_add(stake),
                false => stake_without_leader = stake_without_leader.saturating_add(stake),
            }
        }
        (stake_with_leader, stake_without_leader)
    }
}
impl<N: Network> BFT<N> {
    /// Inserts the given certificate into the DAG and, if the preceding even
    /// round has a committable leader certificate, commits that leader.
    ///
    /// `ALLOW_LEDGER_ACCESS` permits ledger lookups during DAG ordering;
    /// `IS_SYNCING` skips the transmission-gathering/consensus-notification path.
    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
        &self,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Serialize DAG updates (this lock is also held during shutdown).
        let _lock = self.lock.lock().await;
        let certificate_round = certificate.round();
        self.dag.write().insert(certificate);
        // The candidate commit round is the even round preceding this certificate.
        let commit_round = certificate_round.saturating_sub(1);
        // Only even rounds >= 2 are committable.
        if commit_round % 2 != 0 || commit_round < 2 {
            return Ok(());
        }
        // Skip rounds that are already committed.
        if commit_round <= self.dag.read().last_committed_round() {
            return Ok(());
        }
        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
        // Retrieve the committee lookback for the commit round.
        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
        };
        // Use the cached leader for the commit round if present; otherwise
        // compute it and cache it in the ledger service.
        let leader = match self.ledger().latest_leader() {
            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
            _ => {
                let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
                    bail!("BFT failed to compute the leader for commit round {commit_round}");
                };
                self.ledger().update_latest_leader(commit_round, computed_leader);
                computed_leader
            }
        };
        // The leader certificate may simply not have arrived yet — not an error.
        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
        else {
            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
            return Ok(());
        };
        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
        };
        // Collect the authors (in the round after the commit round) whose
        // certificates link back to the leader certificate.
        let authors = certificates
            .values()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();
        // The leader is committable once the availability threshold of the
        // next round links to it.
        if !committee_lookback.is_availability_threshold_reached(&authors) {
            trace!("BFT is not ready to commit {commit_round}");
            return Ok(());
        }
        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
    }

    /// Commits the given leader certificate, along with any earlier uncommitted
    /// leader certificates it transitively links to (committed oldest-first).
    /// Unless `IS_SYNCING`, each committed subdag is forwarded to consensus.
    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let latest_leader_round = leader_certificate.round();
        let mut leader_certificates = vec![leader_certificate.clone()];
        {
            let leader_round = leader_certificate.round();
            let mut current_certificate = leader_certificate;
            // Walk the even rounds between the last committed round and this
            // leader round (newest first), collecting any earlier leader
            // certificates that are linked to the current one.
            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
            {
                let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
                    Ok(committee) => committee,
                    Err(e) => {
                        bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
                    }
                };
                // Use the cached leader for this round if present; otherwise compute and cache it.
                let leader = match self.ledger().latest_leader() {
                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
                    _ => {
                        let computed_leader = match previous_committee_lookback.get_leader(round) {
                            Ok(leader) => leader,
                            Err(e) => {
                                bail!("BFT failed to compute the leader for the even round {round} - {e}");
                            }
                        };
                        self.ledger().update_latest_leader(round, computed_leader);
                        computed_leader
                    }
                };
                // A missing leader certificate for an earlier round is skipped, not fatal.
                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
                else {
                    continue;
                };
                // Only commit earlier leaders that are transitively linked to the current one.
                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                    leader_certificates.push(previous_certificate.clone());
                    current_certificate = previous_certificate;
                }
            }
        }
        // Commit oldest leader first.
        for leader_certificate in leader_certificates.into_iter().rev() {
            let leader_round = leader_certificate.round();
            // Order the sub-DAG reachable from this leader certificate.
            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
                Ok(subdag) => subdag,
                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
            };
            if !IS_SYNCING {
                // Gather the transmissions of the subdag, deduplicating by
                // solution/transaction id and skipping ones already in the ledger.
                let mut transmissions = IndexMap::new();
                let mut seen_transaction_ids = IndexSet::new();
                let mut seen_solution_ids = IndexSet::new();
                for certificate in commit_subdag.values().flatten() {
                    for transmission_id in certificate.transmission_ids() {
                        match transmission_id {
                            TransmissionID::Solution(solution_id, _) => {
                                if seen_solution_ids.contains(&solution_id) {
                                    continue;
                                }
                            }
                            TransmissionID::Transaction(transaction_id, _) => {
                                if seen_transaction_ids.contains(transaction_id) {
                                    continue;
                                }
                            }
                            TransmissionID::Ratification => {
                                bail!("Ratifications are currently not supported in the BFT.")
                            }
                        }
                        // Skip transmissions already collected in this commit.
                        if transmissions.contains_key(transmission_id) {
                            continue;
                        }
                        // Skip transmissions already in the ledger; on a ledger
                        // error, conservatively treat the transmission as present.
                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
                            continue;
                        }
                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
                            bail!(
                                "BFT failed to retrieve transmission '{}.{}' from round {}",
                                fmt_id(transmission_id),
                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
                                certificate.round()
                            );
                        };
                        // Record the id as seen only after a successful retrieval.
                        match transmission_id {
                            TransmissionID::Solution(id, _) => {
                                seen_solution_ids.insert(id);
                            }
                            TransmissionID::Transaction(id, _) => {
                                seen_transaction_ids.insert(id);
                            }
                            TransmissionID::Ratification => {}
                        }
                        transmissions.insert(*transmission_id, transmission);
                    }
                }
                // Construct the subdag and sanity-check its anchor round.
                let subdag = Subdag::from(commit_subdag.clone())?;
                let anchor_round = subdag.anchor_round();
                let num_transmissions = transmissions.len();
                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
                ensure!(
                    anchor_round == leader_round,
                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
                );
                // Forward the subdag to consensus (if a sender is installed)
                // and wait for the advance callback; a callback failure aborts
                // this commit without marking the DAG as committed.
                if let Some(consensus_sender) = self.consensus_sender.get() {
                    let (callback_sender, callback_receiver) = oneshot::channel();
                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
                    match callback_receiver.await {
                        Ok(Ok(())) => (),
                        Ok(Err(e)) => {
                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
                            return Ok(());
                        }
                        Err(e) => {
                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
                            return Ok(());
                        }
                    }
                }
                info!(
                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
                );
            }
            // Mark every certificate of the subdag as committed in the DAG.
            let mut dag_write = self.dag.write();
            for certificate in commit_subdag.values().flatten() {
                dag_write.commit(certificate, self.storage().max_gc_rounds());
            }
        }
        // Garbage-collect storage up to the latest committed leader round.
        self.storage().garbage_collect_certificates(latest_leader_round);
        Ok(())
    }

    /// Orders the sub-DAG reachable from the given leader certificate via DFS,
    /// returning the certificates grouped by round (oldest round first).
    /// Already-committed and GC'd certificates are excluded.
    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
        let mut already_ordered = HashSet::new();
        // DFS stack, seeded with the leader certificate.
        let mut buffer = vec![leader_certificate];
        while let Some(certificate) = buffer.pop() {
            commit.entry(certificate.round()).or_default().insert(certificate.clone());
            let previous_round = certificate.round().saturating_sub(1);
            // Stop descending below the GC horizon.
            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
                continue;
            }
            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
                // Skip certificates already visited in this traversal.
                if already_ordered.contains(previous_certificate_id) {
                    continue;
                }
                // Skip certificates already committed in the DAG.
                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
                    continue;
                }
                // Optionally skip certificates already in the ledger.
                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
                    continue;
                }
                // Resolve the parent from the DAG first, then from storage.
                let previous_certificate = {
                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
                        Some(previous_certificate) => previous_certificate,
                        None => match self.storage().get_certificate(*previous_certificate_id) {
                            Some(previous_certificate) => previous_certificate,
                            None => bail!(
                                "Missing previous certificate {} for round {previous_round}",
                                fmt_id(previous_certificate_id)
                            ),
                        },
                    }
                };
                already_ordered.insert(previous_certificate.id());
                buffer.push(previous_certificate);
            }
        }
        // Drop rounds that fell below the GC horizon during traversal.
        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
        Ok(commit)
    }

    /// Returns `true` if `previous_certificate` is reachable from
    /// `current_certificate` via previous-certificate links, by walking the
    /// DAG backwards round by round.
    fn is_linked(
        &self,
        previous_certificate: BatchCertificate<N>,
        current_certificate: BatchCertificate<N>,
    ) -> Result<bool> {
        // The frontier of certificates reachable from the current certificate.
        let mut traversal = vec![current_certificate.clone()];
        for round in (previous_certificate.round()..current_certificate.round()).rev() {
            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
                bail!("BFT failed to retrieve the certificates for past round {round}");
            };
            // Keep only the certificates referenced by the current frontier.
            traversal = certificates
                .into_values()
                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
                .collect();
        }
        Ok(traversal.contains(&previous_certificate))
    }
}
impl<N: Network> BFT<N> {
    /// Spawns the asynchronous handlers that service the BFT channels.
    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
        // Deconstruct the receiver into its individual channels.
        let BFTReceiver {
            mut rx_primary_round,
            mut rx_primary_certificate,
            mut rx_sync_bft_dag_at_bootup,
            mut rx_sync_bft,
        } = bft_receiver;

        // Handle round-advance requests from the primary.
        let bft = self.clone();
        self.spawn(async move {
            while let Some((current_round, callback)) = rx_primary_round.recv().await {
                let is_ready = bft.update_to_next_round(current_round);
                callback.send(is_ready).ok();
            }
        });

        // Handle new certificates from the primary.
        let bft = self.clone();
        self.spawn(async move {
            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
                callback.send(bft.update_dag::<true, false>(certificate).await).ok();
            }
        });

        // Handle the DAG bootstrap requests at bootup.
        let bft = self.clone();
        self.spawn(async move {
            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
                bft.sync_bft_dag_at_bootup(certificates).await;
            }
        });

        // Handle certificates received while syncing.
        let bft = self.clone();
        self.spawn(async move {
            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
                callback.send(bft.update_dag::<true, true>(certificate).await).ok();
            }
        });
    }

    /// Marks the given certificates as committed in the DAG at bootup.
    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
        let mut dag = self.dag.write();
        certificates.iter().for_each(|certificate| dag.commit(certificate, self.storage().max_gc_rounds()));
    }

    /// Spawns the given future onto the runtime and records its handle
    /// so it can be aborted on shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        let handle = tokio::spawn(future);
        self.handles.lock().push(handle);
    }

    /// Shuts down the BFT: the primary first, then every spawned handler task.
    pub async fn shut_down(&self) {
        info!("Shutting down the BFT...");
        // Acquire the async lock so any in-flight DAG update finishes first.
        let _lock = self.lock.lock().await;
        // Shut down the primary.
        self.primary.shut_down().await;
        // Abort all spawned handler tasks.
        for handle in self.handles.lock().iter() {
            handle.abort();
        }
    }
}
#[cfg(test)]
mod tests {
use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
use snarkos_account::Account;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkos_node_bft_storage_service::BFTMemoryService;
use snarkvm::{
console::account::{Address, PrivateKey},
ledger::{
committee::Committee,
narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
},
utilities::TestRng,
};
use anyhow::Result;
use indexmap::{IndexMap, IndexSet};
use std::sync::Arc;
type CurrentNetwork = snarkvm::console::network::MainnetV0;
/// Samples a committee (optionally starting at `committee_round`), a fresh
/// account, a mock ledger over that committee, and a storage instance with
/// the given GC depth. RNG call order is significant for reproducibility.
fn sample_test_instance(
    committee_round: Option<u64>,
    max_gc_rounds: u64,
    rng: &mut TestRng,
) -> (
    Committee<CurrentNetwork>,
    Account<CurrentNetwork>,
    Arc<MockLedgerService<CurrentNetwork>>,
    Storage<CurrentNetwork>,
) {
    // Sample the committee first (before the account) to keep RNG ordering stable.
    let committee = match committee_round {
        Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
        None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
    };
    let account = Account::new(rng).unwrap();
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let transmissions = Arc::new(BFTMemoryService::new());
    let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
    (committee, account, ledger, storage)
}
/// With no certificates, odd round 1 cannot advance; once all 4 members'
/// certificates are stored, quorum is reached and the round advances —
/// with or without a cached leader certificate (the timer is already expired).
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
    let rng = &mut TestRng::default();
    // Sample 4 certificates for round 1 with no previous certificates.
    let mut certificates = IndexSet::new();
    certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
    certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
    certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
    certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
    // The committee is exactly the 4 certificate authors.
    let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
        1,
        vec![
            certificates[0].author(),
            certificates[1].author(),
            certificates[2].author(),
            certificates[3].author(),
        ],
        rng,
    );
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
    // The timer starts at 0, so it is expired from the outset.
    assert!(bft.is_timer_expired());
    // No certificates yet: quorum cannot be reached.
    let result = bft.is_leader_quorum_or_nonleaders_available(1);
    assert!(!result);
    for certificate in certificates.iter() {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    // All 4 certificates present: quorum reached, no leader cached.
    let result = bft.is_leader_quorum_or_nonleaders_available(1);
    assert!(result);
    // Still advances with a cached leader certificate, since the timer is expired.
    let leader_certificate = sample_batch_certificate(rng);
    *bft.leader_certificate.write() = Some(leader_certificate);
    let result = bft.is_leader_quorum_or_nonleaders_available(1);
    assert!(result);
    Ok(())
}
/// Asking the odd-round check for round 2 while storage is at round 1
/// (out of sync) must return false.
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
    let rng = &mut TestRng::default();
    // Storage starts at round 1.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 10);
    let bft = BFT::new(account, storage, ledger, None, &[], None)?;
    assert!(bft.is_timer_expired());
    // Round 2 != storage round 1: rejected.
    let result = bft.is_leader_quorum_or_nonleaders_available(2);
    assert!(!result);
    Ok(())
}
/// The odd-round check must reject an even round even when storage matches it.
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even() -> Result<()> {
    let rng = &mut TestRng::default();
    // Storage starts at (even) round 2.
    let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
    assert_eq!(committee.starting_round(), 2);
    assert_eq!(storage.current_round(), 2);
    assert_eq!(storage.max_gc_rounds(), 10);
    let bft = BFT::new(account, storage, ledger, None, &[], None)?;
    assert!(bft.is_timer_expired());
    // Round 2 is even: the odd-round check must fail.
    let result = bft.is_leader_quorum_or_nonleaders_available(2);
    assert!(!result);
    Ok(())
}
/// Even-round readiness: fails without certificates, succeeds with quorum +
/// a cached round-2 leader certificate, and without a leader certificate it
/// only succeeds after the leader timer expires.
#[test]
#[tracing_test::traced_test]
fn test_is_even_round_ready() -> Result<()> {
    let rng = &mut TestRng::default();
    // Sample 4 certificates for round 2.
    let mut certificates = IndexSet::new();
    certificates.insert(sample_batch_certificate_for_round(2, rng));
    certificates.insert(sample_batch_certificate_for_round(2, rng));
    certificates.insert(sample_batch_certificate_for_round(2, rng));
    certificates.insert(sample_batch_certificate_for_round(2, rng));
    // The committee is exactly the 4 certificate authors.
    let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
        2,
        vec![
            certificates[0].author(),
            certificates[1].author(),
            certificates[2].author(),
            certificates[3].author(),
        ],
        rng,
    );
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
    // Cache a round-2 leader certificate.
    let leader_certificate = sample_batch_certificate_for_round(2, rng);
    *bft.leader_certificate.write() = Some(leader_certificate);
    // No certificates: quorum fails.
    let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
    assert!(!result);
    // Quorum + matching leader certificate: ready.
    let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    assert!(result);
    // A fresh BFT has no leader certificate: not ready until the timer expires.
    let bft_timer = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
    let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if !bft_timer.is_timer_expired() {
        assert!(!result);
    }
    // Wait out the leader-certificate delay, then the round may advance.
    let leader_certificate_timeout =
        std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
    std::thread::sleep(leader_certificate_timeout);
    let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if bft_timer.is_timer_expired() {
        assert!(result);
    } else {
        assert!(!result);
    }
    Ok(())
}
/// Updating the leader certificate for an odd round (1) must fail.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_odd() -> Result<()> {
    let rng = &mut TestRng::default();
    let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
    assert_eq!(storage.max_gc_rounds(), 10);
    let bft = BFT::new(account, storage, ledger, None, &[], None)?;
    // Round 1 is odd: rejected.
    let result = bft.update_leader_certificate_to_even_round(1);
    assert!(!result);
    Ok(())
}
/// Updating the leader certificate for a round storage has not reached
/// (6, while storage is behind) must fail.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_bad_round() -> Result<()> {
    let rng = &mut TestRng::default();
    let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
    assert_eq!(storage.max_gc_rounds(), 10);
    let bft = BFT::new(account, storage, ledger, None, &[], None)?;
    // Round 6 does not match the storage round: rejected.
    let result = bft.update_leader_certificate_to_even_round(6);
    assert!(!result);
    Ok(())
}
/// With all round-2 certificates stored (including the leader's), updating
/// the leader certificate to even round 2 succeeds.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    let rng = &mut TestRng::default();
    let current_round = 3;
    // Sample a round-3 certificate along with its 4 round-2 parents.
    let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
        current_round,
        rng,
    );
    // The committee is exactly the 4 round-2 certificate authors.
    let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
        2,
        vec![
            certificates[0].author(),
            certificates[1].author(),
            certificates[2].author(),
            certificates[3].author(),
        ],
        rng,
    );
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let transmissions = Arc::new(BFTMemoryService::new());
    let storage = Storage::new(ledger.clone(), transmissions, 10);
    // Insert all round-2 certificates into storage.
    storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
    storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
    storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
    storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
    assert_eq!(storage.current_round(), 2);
    // Pre-cache the leader certificate for round 2.
    let leader = committee.get_leader(2).unwrap();
    let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
    let account = Account::new(rng)?;
    let bft = BFT::new(account, storage.clone(), ledger, None, &[], None)?;
    *bft.leader_certificate.write() = Some(leader_certificate);
    let result = bft.update_leader_certificate_to_even_round(2);
    assert!(result);
    Ok(())
}
/// DFS ordering: with the last committed round at 3, only the leader
/// certificate itself is ordered (parents are below the GC horizon); with
/// the last committed round at 2, the 4 parents plus the leader are ordered,
/// oldest round first.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_order_dag_with_dfs() -> Result<()> {
    let rng = &mut TestRng::default();
    let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
    let previous_round = 2;
    let current_round = previous_round + 1;
    // Sample a round-3 certificate along with its 4 round-2 parents.
    let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
        current_round,
        rng,
    );
    {
        // Case 1: last committed round = 3; the round-2 parents are excluded.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
        let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
        for certificate in previous_certificates.clone() {
            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
        }
        let result = bft.order_dag_with_dfs::<false>(certificate.clone());
        assert!(result.is_ok());
        let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
        assert_eq!(candidate_certificates.len(), 1);
        let expected_certificates = vec![certificate.clone()];
        assert_eq!(
            candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
            expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
        );
        assert_eq!(candidate_certificates, expected_certificates);
    }
    {
        // Case 2: last committed round = 2; all 5 certificates are ordered.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
        let bft = BFT::new(account, storage, ledger, None, &[], None)?;
        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
        for certificate in previous_certificates.clone() {
            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
        }
        let result = bft.order_dag_with_dfs::<false>(certificate.clone());
        assert!(result.is_ok());
        let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
        assert_eq!(candidate_certificates.len(), 5);
        // Parents (round 2) come before the leader certificate (round 3).
        let expected_certificates = vec![
            previous_certificates[0].clone(),
            previous_certificates[1].clone(),
            previous_certificates[2].clone(),
            previous_certificates[3].clone(),
            certificate,
        ];
        assert_eq!(
            candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
            expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
        );
        assert_eq!(candidate_certificates, expected_certificates);
    }
    Ok(())
}
/// DFS ordering must fail with a "Missing previous certificate" error when a
/// referenced parent is in neither the DAG nor storage.
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    let rng = &mut TestRng::default();
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);
    let previous_round = 2;
    let current_round = previous_round + 1;
    // Sample a round-3 certificate; its parents are NOT inserted anywhere.
    let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
        current_round,
        rng,
    );
    let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
    let bft = BFT::new(account, storage, ledger, None, &[], None)?;
    // The DFS visits the previous-certificate ids in reverse order, so the
    // last parent id ([3]) is the first one reported as missing.
    let error_msg = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_certificate_ids[3]),
    );
    let result = bft.order_dag_with_dfs::<false>(certificate);
    assert!(result.is_err());
    assert_eq!(result.unwrap_err().to_string(), error_msg);
    Ok(())
}
/// Committing a leader certificate must garbage-collect storage up to
/// `commit_round - max_gc_rounds`.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_bft_gc_on_commit() -> Result<()> {
    let rng = &mut TestRng::default();
    let max_gc_rounds = 1;
    let committee_round = 0;
    let commit_round = 2;
    let current_round = commit_round + 1;
    // Sample a round-3 certificate along with its 4 round-2 parents.
    let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
        current_round,
        rng,
    );
    // The committee is exactly the 4 round-2 certificate authors.
    let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
        committee_round,
        vec![
            certificates[0].author(),
            certificates[1].author(),
            certificates[2].author(),
            certificates[3].author(),
        ],
        rng,
    );
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let transmissions = Arc::new(BFTMemoryService::new());
    let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
    for certificate in certificates.iter() {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    // Resolve the leader certificate for the commit round.
    let leader = committee.get_leader(commit_round).unwrap();
    let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
    let account = Account::new(rng)?;
    let bft = BFT::new(account, storage.clone(), ledger, None, &[], None)?;
    *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
    // Before the commit, the GC round is derived from the committee round.
    assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
    for certificate in certificates {
        assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
    }
    // Committing advances the GC round to `commit_round - max_gc_rounds`.
    bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
    assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
    Ok(())
}
#[tokio::test]
#[tracing_test::traced_test]
async fn test_sync_bft_dag_at_bootup() -> Result<()> {
    let rng = &mut TestRng::default();

    // Round parameters for this scenario.
    let max_gc_rounds = 1;
    let committee_round = 0;
    let commit_round = 2;
    let current_round = commit_round + 1;

    // Sample the certificates for the current round (the anchor certificate itself is unused).
    let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
        current_round,
        rng,
    );

    // Build a committee from the authors of the sampled certificates.
    let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
        committee_round,
        vec![
            certificates[0].author(),
            certificates[1].author(),
            certificates[2].author(),
            certificates[3].author(),
        ],
        rng,
    );

    // Set up the mock ledger and storage, seeding the storage with every certificate.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
    for certificate in &certificates {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }

    // Resolve the leader certificate for the commit round.
    let leader = committee.get_leader(commit_round).unwrap();
    let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();

    // Drive a first BFT instance through a normal commit of the leader certificate.
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
    *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
    for certificate in certificates.clone() {
        assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
    }
    bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();

    // Boot a second BFT instance from fresh storage and replay the same certificates at bootup.
    let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
    let bootup_bft = BFT::new(account, bootup_storage, ledger, None, &[], None)?;
    bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;

    // Both instances must agree on the last committed round and on the leader certificate.
    assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
    assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
    assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));

    // Every replayed certificate is marked recently committed and no longer held in the bootup DAG.
    for certificate in certificates {
        let round = certificate.round();
        let id = certificate.id();
        assert!(bootup_bft.dag.read().is_recently_committed(round, id));
        assert!(!bootup_bft.dag.read().contains_certificate_in_round(round, id));
    }
    Ok(())
}
// Verifies that a BFT instance restarted after a shutdown — re-syncing its DAG from the
// pre-shutdown certificates at bootup, then applying the post-shutdown certificates —
// reproduces the same commit subdag and DAG state as an instance that never shut down.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
let rng = &mut TestRng::default();
// Round parameters for this scenario.
let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
let committee_round = 0;
let commit_round = 2;
let current_round = commit_round + 1;
let next_round = current_round + 1;
// Construct a committee of four validators and a full certificate set for
// rounds 0 through commit_round + 2, keyed by round.
let (round_to_certificates_map, committee) = {
let private_keys = vec![
PrivateKey::new(rng).unwrap(),
PrivateKey::new(rng).unwrap(),
PrivateKey::new(rng).unwrap(),
PrivateKey::new(rng).unwrap(),
];
let addresses = vec![
Address::try_from(private_keys[0])?,
Address::try_from(private_keys[1])?,
Address::try_from(private_keys[2])?,
Address::try_from(private_keys[3])?,
];
let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
committee_round,
addresses,
rng,
);
let mut round_to_certificates_map: IndexMap<
u64,
IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
> = IndexMap::new();
// Seed the parent set with arbitrary sampled certificates; rounds 0 and 1 ignore it below.
let mut previous_certificates = IndexSet::with_capacity(4);
for _ in 0..4 {
previous_certificates.insert(sample_batch_certificate(rng));
}
// NOTE: `0..commit_round + 3` covers rounds 0..=commit_round + 2 (same span as the
// sibling test's `0..=commit_round + 2`).
for round in 0..commit_round + 3 {
let mut current_certificates = IndexSet::new();
// Rounds 0 and 1 reference no previous certificates.
let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
IndexSet::new()
} else {
previous_certificates.iter().map(|c| c.id()).collect()
};
let transmission_ids =
snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
.into_iter()
.collect::<IndexSet<_>>();
let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
let committee_id = committee.id();
// Each validator authors one batch per round, signed by the other three validators.
for (i, private_key_1) in private_keys.iter().enumerate() {
let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
private_key_1,
round,
timestamp,
committee_id,
transmission_ids.clone(),
previous_certificate_ids.clone(),
rng,
)
.unwrap();
let mut signatures = IndexSet::with_capacity(4);
for (j, private_key_2) in private_keys.iter().enumerate() {
if i != j {
signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
}
}
let certificate =
snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
current_certificates.insert(certificate);
}
// This round's certificates become the parents of the next round.
round_to_certificates_map.insert(round, current_certificates.clone());
previous_certificates = current_certificates.clone();
}
(round_to_certificates_map, committee)
};
let ledger = Arc::new(MockLedgerService::new(committee.clone()));
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
let leader = committee.get_leader(commit_round).unwrap();
let next_leader = committee.get_leader(next_round).unwrap();
// Pre-shutdown certificates: every certificate of rounds 1..commit_round, but only the
// LEADER's certificate for the commit round itself.
let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
for i in 1..=commit_round {
let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
if i == commit_round {
let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
if let Some(c) = leader_certificate {
pre_shutdown_certificates.push(c.clone());
}
continue;
}
pre_shutdown_certificates.extend(certificates);
}
for certificate in pre_shutdown_certificates.iter() {
storage.testing_only_insert_certificate_testing_only(certificate.clone());
}
// Post-shutdown certificates: every certificate of rounds commit_round..=commit_round + 2
// (this re-includes the commit round, so the leader certificate is inserted twice —
// storage insertion is presumably idempotent for duplicates).
let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
Vec::new();
for j in commit_round..=commit_round + 2 {
let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
post_shutdown_certificates.extend(certificate);
}
for certificate in post_shutdown_certificates.iter() {
storage.testing_only_insert_certificate_testing_only(certificate.clone());
}
let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
let account = Account::new(rng)?;
// Reference instance: never shuts down — sees the pre- and post-shutdown certificates
// in one continuous stream, then commits the next leader certificate.
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
for certificate in pre_shutdown_certificates.clone() {
assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
}
for certificate in post_shutdown_certificates.clone() {
assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
}
// Record the (round, certificate count) shape of the reference commit subdag.
let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
// Restarted instance: boots from fresh storage, syncs the pre-shutdown certificates at
// bootup, then ingests the post-shutdown certificates and commits the same leader.
let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
let bootup_bft = BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], None)?;
bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
for certificate in post_shutdown_certificates.iter() {
bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
}
for certificate in post_shutdown_certificates.clone() {
assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
}
let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
let commit_subdag_metadata_bootup =
commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
// Both instances must agree on the committed state: last committed round, both leader
// certificates, and the committed/pruned status of every certificate.
assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
assert!(
bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
);
for certificate in pre_shutdown_certificates.clone() {
let certificate_round = certificate.round();
let certificate_id = certificate.id();
assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
}
for certificate in committed_certificates_bootup.clone() {
let certificate_round = certificate.round();
let certificate_id = certificate.id();
assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
}
// The commit subdag produced after bootup must have the same shape as the reference one.
assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
Ok(())
}
// Verifies that after syncing the DAG from pre-shutdown certificates at bootup, a
// subsequent DFS ordering of a new leader certificate never re-includes any
// already-committed (pre-shutdown) certificate in its commit subdag.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
let rng = &mut TestRng::default();
// Round parameters for this scenario.
let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
let committee_round = 0;
let commit_round = 2;
let current_round = commit_round + 1;
let next_round = current_round + 1;
// Construct a committee of four validators and a full certificate set for
// rounds 0 through commit_round + 2, keyed by round.
let (round_to_certificates_map, committee) = {
let private_keys = vec![
PrivateKey::new(rng).unwrap(),
PrivateKey::new(rng).unwrap(),
PrivateKey::new(rng).unwrap(),
PrivateKey::new(rng).unwrap(),
];
let addresses = vec![
Address::try_from(private_keys[0])?,
Address::try_from(private_keys[1])?,
Address::try_from(private_keys[2])?,
Address::try_from(private_keys[3])?,
];
let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
committee_round,
addresses,
rng,
);
let mut round_to_certificates_map: IndexMap<
u64,
IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
> = IndexMap::new();
// Seed the parent set with arbitrary sampled certificates; rounds 0 and 1 ignore it below.
let mut previous_certificates = IndexSet::with_capacity(4);
for _ in 0..4 {
previous_certificates.insert(sample_batch_certificate(rng));
}
for round in 0..=commit_round + 2 {
let mut current_certificates = IndexSet::new();
// Rounds 0 and 1 reference no previous certificates.
let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
IndexSet::new()
} else {
previous_certificates.iter().map(|c| c.id()).collect()
};
let transmission_ids =
snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
.into_iter()
.collect::<IndexSet<_>>();
let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
let committee_id = committee.id();
// Each validator authors one batch per round, signed by the other three validators.
for (i, private_key_1) in private_keys.iter().enumerate() {
let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
private_key_1,
round,
timestamp,
committee_id,
transmission_ids.clone(),
previous_certificate_ids.clone(),
rng,
)
.unwrap();
let mut signatures = IndexSet::with_capacity(4);
for (j, private_key_2) in private_keys.iter().enumerate() {
if i != j {
signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
}
}
let certificate =
snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
current_certificates.insert(certificate);
}
// This round's certificates become the parents of the next round.
round_to_certificates_map.insert(round, current_certificates.clone());
previous_certificates = current_certificates.clone();
}
(round_to_certificates_map, committee)
};
let ledger = Arc::new(MockLedgerService::new(committee.clone()));
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
let leader = committee.get_leader(commit_round).unwrap();
let next_leader = committee.get_leader(next_round).unwrap();
// Pre-shutdown certificates: every certificate of rounds 1..commit_round, but only the
// LEADER's certificate for the commit round itself.
let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
for i in 1..=commit_round {
let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
if i == commit_round {
let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
if let Some(c) = leader_certificate {
pre_shutdown_certificates.push(c.clone());
}
continue;
}
pre_shutdown_certificates.extend(certificates);
}
for certificate in pre_shutdown_certificates.iter() {
storage.testing_only_insert_certificate_testing_only(certificate.clone());
}
let account = Account::new(rng)?;
// Boot the BFT and sync the pre-shutdown certificates into the DAG; these are thereby
// marked as committed before any new ordering happens.
let bootup_bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
*bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
// Post-shutdown certificates: every certificate of rounds commit_round..=commit_round + 2.
let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
Vec::new();
for j in commit_round..=commit_round + 2 {
let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
post_shutdown_certificates.extend(certificate);
}
for certificate in post_shutdown_certificates.iter() {
storage.testing_only_insert_certificate_testing_only(certificate.clone());
}
for certificate in post_shutdown_certificates.clone() {
assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
}
// Order the next leader certificate via DFS; the resulting subdag must not contain any
// certificate that was already committed during the bootup sync.
let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
let committed_certificates = commit_subdag.values().flatten();
for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
for committed_certificate in committed_certificates.clone() {
assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
}
}
Ok(())
}
}