use crate::{
Gateway,
MAX_FETCH_TIMEOUT_IN_MS,
PRIMARY_PING_IN_MS,
Transport,
helpers::{BFTSender, Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests},
spawn_blocking,
};
use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::{BlockSync, BlockSyncMode, locators::BlockLocators};
use snarkos_node_tcp::P2P;
use snarkvm::{
console::{network::Network, types::Field},
ledger::{authority::Authority, block::Block, narwhal::BatchCertificate},
prelude::{cfg_into_iter, cfg_iter},
};
use anyhow::{Result, bail};
use parking_lot::Mutex;
use rayon::prelude::*;
use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
sync::{Mutex as TMutex, OnceCell, oneshot},
task::JoinHandle,
};
#[derive(Clone)]
pub struct Sync<N: Network> {
    /// The gateway, used to communicate with peers.
    gateway: Gateway<N>,
    /// The storage of batch certificates.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The block sync module (runs in gateway mode — see `Self::new`).
    block_sync: BlockSync<N>,
    /// The pending certificate requests, keyed by certificate ID.
    pending: Arc<Pending<Field<N>, BatchCertificate<N>>>,
    /// The BFT sender, set at most once during `initialize`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The handles of the spawned background tasks; aborted on `shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Lock that serializes the processing of block responses.
    response_lock: Arc<TMutex<()>>,
    /// Lock that serializes storage/ledger sync operations.
    sync_lock: Arc<TMutex<()>>,
    /// Sync block responses buffered by height, awaiting validation before
    /// being advanced into the ledger (see `sync_storage_with_block`).
    latest_block_responses: Arc<TMutex<HashMap<u32, Block<N>>>>,
}
impl<N: Network> Sync<N> {
/// Initializes a new sync module with the given gateway, storage, and ledger.
///
/// The block sync module is created in gateway mode, sharing the ledger
/// service and the gateway's TCP stack.
pub fn new(gateway: Gateway<N>, storage: Storage<N>, ledger: Arc<dyn LedgerService<N>>) -> Self {
    Self {
        // Note: this field initializer runs first, so it may borrow `gateway`
        // and clone `ledger` before both are moved into the struct below.
        block_sync: BlockSync::new(BlockSyncMode::Gateway, ledger.clone(), gateway.tcp().clone()),
        gateway,
        storage,
        ledger,
        pending: Default::default(),
        bft_sender: Default::default(),
        handles: Default::default(),
        response_lock: Default::default(),
        sync_lock: Default::default(),
        latest_block_responses: Default::default(),
    }
}
/// Initializes the sync module, optionally registering a BFT sender
/// (present on validators), then syncs storage with the ledger.
///
/// # Panics
/// Panics if the BFT sender was already set.
pub async fn initialize(&self, bft_sender: Option<BFTSender<N>>) -> Result<()> {
    // Register the BFT sender, if one was provided.
    match bft_sender {
        Some(sender) => self.bft_sender.set(sender).expect("BFT sender already set in gateway"),
        None => (),
    }
    // Perform the one-time storage sync against the ledger.
    info!("Syncing storage with the ledger...");
    self.sync_storage_with_ledger_at_bootup().await
}
/// Starts the sync module: spawns the periodic sync loops and the handlers
/// for every channel in the given `SyncReceiver`.
pub async fn run(&self, sync_receiver: SyncReceiver<N>) -> Result<()> {
    info!("Starting the sync module...");
    // Spawn the main sync loop: block-sync against peers, then storage sync.
    let self_ = self.clone();
    self.handles.lock().push(tokio::spawn(async move {
        // Wait one ping interval before the first iteration.
        tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
        loop {
            // Sleep for the ping interval between iterations.
            tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
            // Perform block sync via the gateway.
            let communication = &self_.gateway;
            self_.block_sync.try_block_sync(communication).await;
            // Advance storage with any newly available blocks.
            if let Err(e) = self_.sync_storage_with_blocks().await {
                error!("Unable to sync storage with blocks - {e}");
            }
            // Once synced, drop any buffered block responses.
            if self_.is_synced() {
                self_.latest_block_responses.lock().await.clear();
            }
        }
    }));
    // Spawn a loop that periodically clears expired pending-request callbacks.
    let self_ = self.clone();
    self.spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
            let self__ = self_.clone();
            // Run the cleanup off the async executor.
            let _ = spawn_blocking!({
                self__.pending.clear_expired_callbacks();
                Ok(())
            });
        }
    });
    // Destructure the sync receiver into its channels.
    let SyncReceiver {
        mut rx_block_sync_advance_with_sync_blocks,
        mut rx_block_sync_remove_peer,
        mut rx_block_sync_update_peer_locators,
        mut rx_certificate_request,
        mut rx_certificate_response,
    } = sync_receiver;
    // Handle requests to advance with responded sync blocks.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((peer_ip, blocks, callback)) = rx_block_sync_advance_with_sync_blocks.recv().await {
            // Record the block response with the block sync module.
            if let Err(e) = self_.block_sync.process_block_response(peer_ip, blocks) {
                callback.send(Err(e)).ok();
                continue;
            }
            // Attempt to advance storage (and the ledger) with the new blocks.
            if let Err(e) = self_.sync_storage_with_blocks().await {
                callback.send(Err(e)).ok();
                continue;
            }
            // Report success to the caller.
            callback.send(Ok(())).ok();
        }
    });
    // Handle requests to remove a peer from block sync.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some(peer_ip) = rx_block_sync_remove_peer.recv().await {
            self_.block_sync.remove_peer(&peer_ip);
        }
    });
    // Handle peer block-locator updates; each update runs on its own task
    // so a slow update does not block the channel.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((peer_ip, locators, callback)) = rx_block_sync_update_peer_locators.recv().await {
            let self_clone = self_.clone();
            tokio::spawn(async move {
                let result = self_clone.block_sync.update_peer_locators(peer_ip, locators);
                callback.send(result).ok();
            });
        }
    });
    // Handle inbound certificate requests from peers.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((peer_ip, certificate_request)) = rx_certificate_request.recv().await {
            self_.send_certificate_response(peer_ip, certificate_request);
        }
    });
    // Handle inbound certificate responses from peers.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((peer_ip, certificate_response)) = rx_certificate_response.recv().await {
            self_.finish_certificate_request(peer_ip, certificate_response)
        }
    });
    Ok(())
}
}
impl<N: Network> Sync<N> {
/// Syncs the storage with the ledger at bootup: replays the blocks within the
/// GC window into storage and (if a BFT sender is set) rebuilds the BFT DAG.
pub async fn sync_storage_with_ledger_at_bootup(&self) -> Result<()> {
    // Retrieve the latest block of the ledger.
    let latest_block = self.ledger.latest_block();
    let block_height = latest_block.height();
    // Determine how many blocks fit in the GC window
    // (each block advances the round by at least 2, hence the halving).
    let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
    // The earliest height (inclusive) still inside the GC window.
    let gc_height = block_height.saturating_sub(max_gc_blocks);
    // Retrieve the blocks in `gc_height..=block_height` (the upper bound below is exclusive).
    let blocks = self.ledger.get_blocks(gc_height..block_height.saturating_add(1))?;
    // Acquire the sync lock so no other routine mutates storage concurrently.
    let _lock = self.sync_lock.lock().await;
    // Fix: log the inclusive upper bound (`block_height`), not the exclusive
    // range end — the previous message overstated the synced range by one.
    debug!("Syncing storage with the ledger from block {} to {}...", gc_height, block_height);
    // Sync the height, round, and GC state of storage with the latest block.
    self.storage.sync_height_with_block(latest_block.height());
    self.storage.sync_round_with_block(latest_block.round());
    self.storage.garbage_collect_certificates(latest_block.round());
    // Replay the certificates of each quorum block into storage.
    for block in &blocks {
        if let Authority::Quorum(subdag) = block.authority() {
            // Reconstruct the unconfirmed transactions contained in this block.
            let unconfirmed_transactions = cfg_iter!(block.transactions())
                .filter_map(|tx| {
                    tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
                })
                .collect::<HashMap<_, _>>();
            // Sync every certificate of the subdag with the block.
            for certificates in subdag.values().cloned() {
                cfg_into_iter!(certificates).for_each(|certificate| {
                    self.storage.sync_certificate_with_block(block, certificate, &unconfirmed_transactions);
                });
            }
        }
    }
    // Collect, in order, all certificates from the quorum blocks.
    let certificates = blocks
        .iter()
        .flat_map(|block| {
            match block.authority() {
                // Beacon blocks carry no certificates.
                Authority::Beacon(_) => None,
                Authority::Quorum(subdag) => Some(subdag.values().flatten().cloned().collect::<Vec<_>>()),
            }
        })
        .flatten()
        .collect::<Vec<_>>();
    // If a BFT sender is set, forward the certificates to rebuild the DAG.
    if let Some(bft_sender) = self.bft_sender.get() {
        if let Err(e) = bft_sender.tx_sync_bft_dag_at_bootup.send(certificates).await {
            bail!("Failed to update the BFT DAG from sync: {e}");
        }
    }
    Ok(())
}
/// Syncs the storage (and the ledger) with any blocks currently available
/// from the block sync module.
pub async fn sync_storage_with_blocks(&self) -> Result<()> {
    // Acquire the response lock so block responses are processed serially.
    let _lock = self.response_lock.lock().await;
    // Start from the block after the ledger's latest.
    // Fix: use `saturating_add` to rule out an overflow panic at the maximum
    // height, consistent with the saturating arithmetic used in this module.
    let mut current_height = self.ledger.latest_block_height().saturating_add(1);
    // Determine the highest block height advertised by our sync peers.
    let tip = self.block_sync.find_sync_peers().map(|(x, _)| x.into_values().max().unwrap_or(0)).unwrap_or(0);
    // Determine how many blocks fit in the GC window (a block is at least 2 rounds).
    let max_gc_blocks = u32::try_from(self.storage.max_gc_rounds())?.saturating_div(2);
    // Heights at or below this are too old for certificate-level (BFT) syncing.
    let max_gc_height = tip.saturating_sub(max_gc_blocks);
    if current_height <= max_gc_height {
        // Fast path: advance the ledger directly, bypassing the BFT, while we
        // are still below the GC horizon.
        while let Some(block) = self.block_sync.process_next_block(current_height) {
            info!("Syncing the ledger to block {}...", block.height());
            self.sync_ledger_with_block_without_bft(block).await?;
            current_height = current_height.saturating_add(1);
        }
        // Once past the GC horizon, replay storage from the ledger so the BFT
        // path below starts from a consistent state.
        if current_height > max_gc_height {
            if let Err(e) = self.sync_storage_with_ledger_at_bootup().await {
                error!("BFT sync (with bootup routine) failed - {e}");
            }
        }
    }
    // Slow path: sync the remaining blocks through the BFT.
    while let Some(block) = self.block_sync.process_next_block(current_height) {
        info!("Syncing the BFT to block {}...", block.height());
        self.sync_storage_with_block(block).await?;
        current_height = current_height.saturating_add(1);
    }
    Ok(())
}
/// Advances the ledger with the given block, bypassing the BFT entirely.
async fn sync_ledger_with_block_without_bft(&self, block: Block<N>) -> Result<()> {
    // Acquire the sync lock.
    let _lock = self.sync_lock.lock().await;
    let self_ = self.clone();
    // Block validation and advancement are CPU-heavy; run off the async executor.
    tokio::task::spawn_blocking(move || {
        // Check the next block, then advance the ledger to it.
        self_.ledger.check_next_block(&block)?;
        self_.ledger.advance_to_next_block(&block)?;
        // Keep storage's height and round in step with the new block.
        self_.storage.sync_height_with_block(block.height());
        self_.storage.sync_round_with_block(block.round());
        Ok(())
    })
    .await?
}
/// Syncs the storage with the given block: replays its certificates into
/// storage/BFT, buffers the block, and advances the ledger once enough
/// follow-on certificates (the availability threshold) vouch for its leader.
pub async fn sync_storage_with_block(&self, block: Block<N>) -> Result<()> {
    // Acquire the sync lock.
    let _lock = self.sync_lock.lock().await;
    // Acquire the buffered block responses.
    let mut latest_block_responses = self.latest_block_responses.lock().await;
    // Skip blocks that are already in the ledger or already buffered.
    if self.ledger.contains_block_height(block.height()) || latest_block_responses.contains_key(&block.height()) {
        return Ok(());
    }
    if let Authority::Quorum(subdag) = block.authority() {
        // Reconstruct the unconfirmed transactions contained in this block.
        let unconfirmed_transactions = cfg_iter!(block.transactions())
            .filter_map(|tx| {
                tx.to_unconfirmed_transaction().map(|unconfirmed| (unconfirmed.id(), unconfirmed)).ok()
            })
            .collect::<HashMap<_, _>>();
        for certificates in subdag.values().cloned() {
            // Sync each certificate of the subdag with the block in storage.
            cfg_into_iter!(certificates.clone()).for_each(|certificate| {
                self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions);
            });
            // Forward each certificate to the BFT, if a sender is set.
            for certificate in certificates {
                if let Some(bft_sender) = self.bft_sender.get() {
                    if let Err(e) = bft_sender.send_sync_bft(certificate).await {
                        bail!("Sync - {e}");
                    };
                }
            }
        }
    }
    let latest_block_height = self.ledger.latest_block_height();
    // Buffer this block, and drop any buffered blocks at or below the ledger height.
    latest_block_responses.insert(block.height(), block);
    latest_block_responses.retain(|height, _| *height > latest_block_height);
    // Gather the contiguous run of buffered blocks starting just above the ledger height.
    let contiguous_blocks: Vec<Block<N>> = (latest_block_height.saturating_add(1)..)
        .take_while(|&k| latest_block_responses.contains_key(&k))
        .filter_map(|k| latest_block_responses.get(&k).cloned())
        .collect();
    for next_block in contiguous_blocks.into_iter() {
        let next_block_height = next_block.height();
        // A sync block must carry a quorum authority; extract its leader certificate.
        let leader_certificate = match next_block.authority() {
            Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
            _ => bail!("Received a block with an unexpected authority type."),
        };
        let commit_round = leader_certificate.round();
        let certificate_round = commit_round.saturating_add(1);
        // Retrieve the committee lookback for the commit round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(commit_round)?;
        // Collect the authors (at the next round) whose certificates reference the leader certificate.
        let certificates = self.storage.get_certificates_for_round(certificate_round);
        let authors = certificates
            .iter()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();
        debug!("Validating sync block {next_block_height} at round {commit_round}...");
        // Only advance once the availability threshold has vouched for the leader.
        if committee_lookback.is_availability_threshold_reached(&authors) {
            let mut current_certificate = leader_certificate;
            let mut blocks_to_add = vec![next_block];
            // Walk backwards, prepending any buffered blocks whose leader
            // certificates are linked (via the DAG) to the current one.
            for height in (self.ledger.latest_block_height().saturating_add(1)..next_block_height).rev() {
                let Some(previous_block) = latest_block_responses.get(&height) else {
                    bail!("Block {height} is missing from the latest block responses.");
                };
                let previous_certificate = match previous_block.authority() {
                    Authority::Quorum(subdag) => subdag.leader_certificate().clone(),
                    _ => bail!("Received a block with an unexpected authority type."),
                };
                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                    debug!("Previous sync block {height} is linked to the current block {next_block_height}");
                    blocks_to_add.insert(0, previous_block.clone());
                    current_certificate = previous_certificate;
                }
            }
            // Advance the ledger with each block, in ascending order.
            for block in blocks_to_add {
                let block_height = block.height();
                // Guard: only ever apply the exact next block.
                if block_height != self.ledger.latest_block_height().saturating_add(1) {
                    warn!("Skipping block {block_height} from the latest block responses - not sequential.");
                    continue;
                }
                let self_ = self.clone();
                // Validation and advancement are CPU-heavy; run off the async executor.
                tokio::task::spawn_blocking(move || {
                    self_.ledger.check_next_block(&block)?;
                    self_.ledger.advance_to_next_block(&block)?;
                    self_.storage.sync_height_with_block(block.height());
                    self_.storage.sync_round_with_block(block.round());
                    Ok::<(), anyhow::Error>(())
                })
                .await??;
                // The block is now in the ledger; drop it from the buffer.
                latest_block_responses.remove(&block_height);
            }
        } else {
            debug!(
                "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..."
            );
        }
    }
    Ok(())
}
/// Returns `true` if `previous_certificate` is reachable from
/// `current_certificate` by following previous-certificate edges in storage.
fn is_linked(
    &self,
    previous_certificate: BatchCertificate<N>,
    current_certificate: BatchCertificate<N>,
) -> Result<bool> {
    // Walk the DAG backwards, one round at a time, keeping the frontier of
    // certificates that are still reachable from the current certificate.
    let target_round = previous_certificate.round();
    let mut frontier = vec![current_certificate.clone()];
    let mut round = current_certificate.round();
    while round > target_round {
        round -= 1;
        // Keep only the certificates at this round that some certificate in
        // the frontier references as a previous certificate.
        let mut next_frontier = Vec::new();
        for candidate in self.storage.get_certificates_for_round(round) {
            let is_referenced =
                frontier.iter().any(|c| c.previous_certificate_ids().contains(&candidate.id()));
            if is_referenced {
                next_frontier.push(candidate);
            }
        }
        frontier = next_frontier;
    }
    // The frontier now sits at the previous certificate's round.
    Ok(frontier.contains(&previous_certificate))
}
}
impl<N: Network> Sync<N> {
    /// Returns `true` if at least one peer is connected and the block sync
    /// module reports the node as synced.
    pub fn is_synced(&self) -> bool {
        self.gateway.number_of_connected_peers() > 0 && self.block_sync.is_block_synced()
    }

    /// Returns the number of blocks this node is behind, per the block sync module.
    pub fn num_blocks_behind(&self) -> u32 {
        self.block_sync.num_blocks_behind()
    }

    /// Returns `true` if the block sync module runs in gateway mode.
    pub const fn is_gateway_mode(&self) -> bool {
        self.block_sync.mode().is_gateway()
    }

    /// Returns the current block locators from the block sync module.
    pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
        self.block_sync.get_block_locators()
    }

    /// Returns a reference to the inner block sync module (tests only).
    #[cfg(test)]
    #[doc(hidden)]
    pub(super) fn block_sync(&self) -> &BlockSync<N> {
        &self.block_sync
    }
}
impl<N: Network> Sync<N> {
    /// Sends a certificate request to the given peer and awaits the response,
    /// returning the fetched certificate or an error on failure/timeout.
    pub async fn send_certificate_request(
        &self,
        peer_ip: SocketAddr,
        certificate_id: Field<N>,
    ) -> Result<BatchCertificate<N>> {
        // Create a oneshot channel; the response arrives via the pending queue.
        let (callback_sender, callback_receiver) = oneshot::channel();
        // Determine how many requests for this certificate are already in
        // flight, and whether this peer was already asked.
        let num_sent_requests = self.pending.num_sent_requests(certificate_id);
        let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(certificate_id, peer_ip);
        let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round());
        // Only send if under the redundancy limit and this peer was not asked yet.
        let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
        // Register the callback BEFORE sending, so a fast response is not missed.
        self.pending.insert(certificate_id, peer_ip, Some((callback_sender, should_send_request)));
        if should_send_request {
            if self.gateway.send(peer_ip, Event::CertificateRequest(certificate_id.into())).await.is_none() {
                bail!("Unable to fetch batch certificate {certificate_id} - failed to send request")
            }
        } else {
            debug!(
                "Skipped sending request for certificate {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
                fmt_id(certificate_id)
            );
        }
        // Await the response, bounded by the fetch timeout.
        match tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver).await {
            Ok(result) => Ok(result?),
            Err(e) => bail!("Unable to fetch certificate {} - (timeout) {e}", fmt_id(certificate_id)),
        }
    }

    /// Responds to a certificate request, if the certificate exists in storage.
    fn send_certificate_response(&self, peer_ip: SocketAddr, request: CertificateRequest<N>) {
        if let Some(certificate) = self.storage.get_certificate(request.certificate_id) {
            let self_ = self.clone();
            // Send asynchronously so the caller's event loop is not blocked.
            tokio::spawn(async move {
                let _ = self_.gateway.send(peer_ip, Event::CertificateResponse(certificate.into())).await;
            });
        }
    }

    /// Completes a pending certificate request with the peer's response.
    fn finish_certificate_request(&self, peer_ip: SocketAddr, response: CertificateResponse<N>) {
        let certificate = response.certificate;
        // Only accept the response if this peer was actually asked for it.
        let exists = self.pending.get_peers(certificate.id()).unwrap_or_default().contains(&peer_ip);
        if exists {
            self.pending.remove(certificate.id(), Some(certificate));
        }
    }
}
impl<N: Network> Sync<N> {
    /// Spawns the given future on a background task tracked for shutdown.
    fn spawn<F: Future<Output = ()> + Send + 'static>(&self, future: F) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the sync module.
    pub async fn shut_down(&self) {
        info!("Shutting down the sync module...");
        // Acquire both locks so no response or sync operation is mid-flight
        // when the background tasks are aborted.
        let _response_guard = self.response_lock.lock().await;
        let _sync_guard = self.sync_lock.lock().await;
        // Abort every spawned task.
        for handle in self.handles.lock().iter() {
            handle.abort();
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{helpers::now, ledger_service::CoreLedgerService, storage_service::BFTMemoryService};
use snarkos_account::Account;
use snarkvm::{
console::{
account::{Address, PrivateKey},
network::MainnetV0,
},
ledger::{
narwhal::{BatchCertificate, BatchHeader, Subdag},
store::{ConsensusStore, helpers::memory::ConsensusMemory},
},
prelude::{Ledger, VM},
utilities::TestRng,
};
use aleo_std::StorageMode;
use indexmap::IndexSet;
use rand::Rng;
use std::collections::BTreeMap;
// Network, ledger, and store type aliases used throughout these tests.
type CurrentNetwork = MainnetV0;
type CurrentLedger = Ledger<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
type CurrentConsensusStore = ConsensusStore<CurrentNetwork, ConsensusMemory<CurrentNetwork>>;
// Verifies that `sync_storage_with_block` advances the syncing ledger for a
// chain of three quorum blocks whose leader certificates are DAG-linked.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_commit_via_is_linked() -> anyhow::Result<()> {
    let rng = &mut TestRng::default();
    // Round parameters.
    let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
    let commit_round = 2;
    // Initialize the store and a test account.
    let store = CurrentConsensusStore::open(None).unwrap();
    let account: Account<CurrentNetwork> = Account::new(rng)?;
    // Create a genesis block from a seeded RNG, so the committee private keys
    // can be reproduced from the same seed below.
    let seed: u64 = rng.gen();
    let genesis_rng = &mut TestRng::from_seed(seed);
    let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
    let genesis_rng = &mut TestRng::from_seed(seed);
    let private_keys = [
        *account.private_key(),
        PrivateKey::new(genesis_rng)?,
        PrivateKey::new(genesis_rng)?,
        PrivateKey::new(genesis_rng)?,
    ];
    // Initialize the (source) ledger with the genesis block.
    let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
    let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
    // Build certificates for rounds 0..=commit_round + 8.
    let (round_to_certificates_map, committee) = {
        let addresses = vec![
            Address::try_from(private_keys[0])?,
            Address::try_from(private_keys[1])?,
            Address::try_from(private_keys[2])?,
            Address::try_from(private_keys[3])?,
        ];
        let committee = ledger.latest_committee().unwrap();
        let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
            HashMap::new();
        let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
        for round in 0..=commit_round + 8 {
            let mut current_certificates = IndexSet::new();
            // Rounds 0 and 1 have no previous certificates.
            let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                IndexSet::new()
            } else {
                previous_certificates.iter().map(|c| c.id()).collect()
            };
            let committee_id = committee.id();
            // For the early rounds, only two members (the leader and one
            // non-leader) produce certificates.
            if round <= 5 {
                let leader = committee.get_leader(round).unwrap();
                let leader_index = addresses.iter().position(|&address| address == leader).unwrap();
                let non_leader_index = addresses.iter().position(|&address| address != leader).unwrap();
                for i in [leader_index, non_leader_index].into_iter() {
                    let batch_header = BatchHeader::new(
                        &private_keys[i],
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Collect signatures from the other committee members.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }
            }
            // For the later rounds, all four members produce certificates.
            if round > 5 {
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = BatchHeader::new(
                        private_key_1,
                        round,
                        now(),
                        committee_id,
                        Default::default(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
                }
            }
            round_to_certificates_map.insert(round, current_certificates.clone());
            previous_certificates = current_certificates.clone();
        }
        (round_to_certificates_map, committee)
    };
    // Initialize storage and insert every generated certificate.
    let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
    let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
    for i in 1..=commit_round + 8 {
        let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
        certificates.extend(c);
    }
    for certificate in certificates.clone().iter() {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    // Create block 1 from the leader certificate at the commit round.
    let leader_round_1 = commit_round;
    let leader_1 = committee.get_leader(leader_round_1).unwrap();
    let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
    let block_1 = {
        let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let mut leader_cert_map = IndexSet::new();
        leader_cert_map.insert(leader_certificate.clone());
        let mut previous_cert_map = IndexSet::new();
        for cert in storage.get_certificates_for_round(commit_round - 1) {
            previous_cert_map.insert(cert);
        }
        subdag_map.insert(commit_round, leader_cert_map.clone());
        subdag_map.insert(commit_round - 1, previous_cert_map.clone());
        let subdag = Subdag::from(subdag_map.clone())?;
        core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
    };
    core_ledger.advance_to_next_block(&block_1)?;
    // Create block 2 at commit_round + 2, excluding the committed leader certificate.
    let leader_round_2 = commit_round + 2;
    let leader_2 = committee.get_leader(leader_round_2).unwrap();
    let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
    let block_2 = {
        let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let mut leader_cert_map_2 = IndexSet::new();
        leader_cert_map_2.insert(leader_certificate_2.clone());
        let mut previous_cert_map_2 = IndexSet::new();
        for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
            previous_cert_map_2.insert(cert);
        }
        let mut prev_commit_cert_map_2 = IndexSet::new();
        for cert in storage.get_certificates_for_round(leader_round_2 - 2) {
            if cert != leader_certificate {
                prev_commit_cert_map_2.insert(cert);
            }
        }
        subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
        subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
        subdag_map_2.insert(leader_round_2 - 2, prev_commit_cert_map_2.clone());
        let subdag_2 = Subdag::from(subdag_map_2.clone())?;
        core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
    };
    core_ledger.advance_to_next_block(&block_2)?;
    // Create block 3 at commit_round + 4, excluding the committed leader certificate.
    let leader_round_3 = commit_round + 4;
    let leader_3 = committee.get_leader(leader_round_3).unwrap();
    let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
    let block_3 = {
        let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
        let mut leader_cert_map_3 = IndexSet::new();
        leader_cert_map_3.insert(leader_certificate_3.clone());
        let mut previous_cert_map_3 = IndexSet::new();
        for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
            previous_cert_map_3.insert(cert);
        }
        let mut prev_commit_cert_map_3 = IndexSet::new();
        for cert in storage.get_certificates_for_round(leader_round_3 - 2) {
            if cert != leader_certificate_2 {
                prev_commit_cert_map_3.insert(cert);
            }
        }
        subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
        subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
        subdag_map_3.insert(leader_round_3 - 2, prev_commit_cert_map_3.clone());
        let subdag_3 = Subdag::from(subdag_map_3.clone())?;
        core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
    };
    core_ledger.advance_to_next_block(&block_3)?;
    // Initialize a second (syncing) ledger from the same genesis.
    let syncing_ledger = Arc::new(CoreLedgerService::new(
        CurrentLedger::load(genesis, StorageMode::Production).unwrap(),
        Default::default(),
    ));
    // Initialize the gateway and the sync module under test.
    let gateway = Gateway::new(account.clone(), storage.clone(), syncing_ledger.clone(), None, &[], None)?;
    let sync = Sync::new(gateway.clone(), storage.clone(), syncing_ledger.clone());
    // Feed the three blocks; each should advance the syncing ledger by one height.
    sync.sync_storage_with_block(block_1).await?;
    assert_eq!(syncing_ledger.latest_block_height(), 1);
    sync.sync_storage_with_block(block_2).await?;
    assert_eq!(syncing_ledger.latest_block_height(), 2);
    sync.sync_storage_with_block(block_3).await?;
    assert_eq!(syncing_ledger.latest_block_height(), 3);
    // The earlier blocks must be present in the syncing ledger.
    assert!(syncing_ledger.contains_block_height(1));
    assert!(syncing_ledger.contains_block_height(2));
    Ok(())
}
// Verifies that `get_pending_certificates` returns exactly the inserted
// certificates that were not committed via the three quorum blocks' subdags.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_pending_certificates() -> anyhow::Result<()> {
    let rng = &mut TestRng::default();
    // Round parameters.
    let max_gc_rounds = BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
    let commit_round = 2;
    // Initialize the store and a test account.
    let store = CurrentConsensusStore::open(None).unwrap();
    let account: Account<CurrentNetwork> = Account::new(rng)?;
    // Create a genesis block from a seeded RNG, so the committee private keys
    // can be reproduced from the same seed below.
    let seed: u64 = rng.gen();
    let genesis_rng = &mut TestRng::from_seed(seed);
    let genesis = VM::from(store).unwrap().genesis_beacon(account.private_key(), genesis_rng).unwrap();
    let genesis_rng = &mut TestRng::from_seed(seed);
    let private_keys = [
        *account.private_key(),
        PrivateKey::new(genesis_rng)?,
        PrivateKey::new(genesis_rng)?,
        PrivateKey::new(genesis_rng)?,
    ];
    // Initialize the ledger with the genesis block.
    let ledger = CurrentLedger::load(genesis.clone(), StorageMode::Production).unwrap();
    let core_ledger = Arc::new(CoreLedgerService::new(ledger.clone(), Default::default()));
    // Build certificates from all four members for rounds 0..=commit_round + 8.
    let (round_to_certificates_map, committee) = {
        let committee = ledger.latest_committee().unwrap();
        let mut round_to_certificates_map: HashMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> =
            HashMap::new();
        let mut previous_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::with_capacity(4);
        for round in 0..=commit_round + 8 {
            let mut current_certificates = IndexSet::new();
            // Rounds 0 and 1 have no previous certificates.
            let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                IndexSet::new()
            } else {
                previous_certificates.iter().map(|c| c.id()).collect()
            };
            let committee_id = committee.id();
            for (i, private_key_1) in private_keys.iter().enumerate() {
                let batch_header = BatchHeader::new(
                    private_key_1,
                    round,
                    now(),
                    committee_id,
                    Default::default(),
                    previous_certificate_ids.clone(),
                    rng,
                )
                .unwrap();
                // Collect signatures from the other committee members.
                let mut signatures = IndexSet::with_capacity(4);
                for (j, private_key_2) in private_keys.iter().enumerate() {
                    if i != j {
                        signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                    }
                }
                current_certificates.insert(BatchCertificate::from(batch_header, signatures).unwrap());
            }
            round_to_certificates_map.insert(round, current_certificates.clone());
            previous_certificates = current_certificates.clone();
        }
        (round_to_certificates_map, committee)
    };
    // Initialize storage and insert every generated certificate.
    let storage = Storage::new(core_ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
    let mut certificates: Vec<BatchCertificate<CurrentNetwork>> = Vec::new();
    for i in 1..=commit_round + 8 {
        let c = (*round_to_certificates_map.get(&i).unwrap()).clone();
        certificates.extend(c);
    }
    for certificate in certificates.clone().iter() {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    // Create block 1 from the leader certificate at the commit round.
    let leader_round_1 = commit_round;
    let leader_1 = committee.get_leader(leader_round_1).unwrap();
    let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader_1).unwrap();
    let mut subdag_map: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
    let block_1 = {
        let mut leader_cert_map = IndexSet::new();
        leader_cert_map.insert(leader_certificate.clone());
        let mut previous_cert_map = IndexSet::new();
        for cert in storage.get_certificates_for_round(commit_round - 1) {
            previous_cert_map.insert(cert);
        }
        subdag_map.insert(commit_round, leader_cert_map.clone());
        subdag_map.insert(commit_round - 1, previous_cert_map.clone());
        let subdag = Subdag::from(subdag_map.clone())?;
        core_ledger.prepare_advance_to_next_quorum_block(subdag, Default::default())?
    };
    core_ledger.advance_to_next_block(&block_1)?;
    // Create block 2 at commit_round + 2.
    let leader_round_2 = commit_round + 2;
    let leader_2 = committee.get_leader(leader_round_2).unwrap();
    let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader_2).unwrap();
    let mut subdag_map_2: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
    let block_2 = {
        let mut leader_cert_map_2 = IndexSet::new();
        leader_cert_map_2.insert(leader_certificate_2.clone());
        let mut previous_cert_map_2 = IndexSet::new();
        for cert in storage.get_certificates_for_round(leader_round_2 - 1) {
            previous_cert_map_2.insert(cert);
        }
        subdag_map_2.insert(leader_round_2, leader_cert_map_2.clone());
        subdag_map_2.insert(leader_round_2 - 1, previous_cert_map_2.clone());
        let subdag_2 = Subdag::from(subdag_map_2.clone())?;
        core_ledger.prepare_advance_to_next_quorum_block(subdag_2, Default::default())?
    };
    core_ledger.advance_to_next_block(&block_2)?;
    // Create block 3 at commit_round + 4.
    let leader_round_3 = commit_round + 4;
    let leader_3 = committee.get_leader(leader_round_3).unwrap();
    let leader_certificate_3 = storage.get_certificate_for_round_with_author(leader_round_3, leader_3).unwrap();
    let mut subdag_map_3: BTreeMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = BTreeMap::new();
    let block_3 = {
        let mut leader_cert_map_3 = IndexSet::new();
        leader_cert_map_3.insert(leader_certificate_3.clone());
        let mut previous_cert_map_3 = IndexSet::new();
        for cert in storage.get_certificates_for_round(leader_round_3 - 1) {
            previous_cert_map_3.insert(cert);
        }
        subdag_map_3.insert(leader_round_3, leader_cert_map_3.clone());
        subdag_map_3.insert(leader_round_3 - 1, previous_cert_map_3.clone());
        let subdag_3 = Subdag::from(subdag_map_3.clone())?;
        core_ledger.prepare_advance_to_next_quorum_block(subdag_3, Default::default())?
    };
    core_ledger.advance_to_next_block(&block_3)?;
    // No pending certificate should already be committed to the ledger.
    let pending_certificates = storage.get_pending_certificates();
    for certificate in pending_certificates.clone() {
        assert!(!core_ledger.contains_certificate(&certificate.id()).unwrap_or(false));
    }
    // Collect every certificate committed via the three subdags.
    let mut committed_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
    {
        let subdag_maps = [&subdag_map, &subdag_map_2, &subdag_map_3];
        for subdag in subdag_maps.iter() {
            for subdag_certificates in subdag.values() {
                committed_certificates.extend(subdag_certificates.iter().cloned());
            }
        }
    };
    // The pending set must equal the generated certificates minus the committed ones.
    let mut candidate_pending_certificates: IndexSet<BatchCertificate<CurrentNetwork>> = IndexSet::new();
    for certificate in certificates.clone() {
        if !committed_certificates.contains(&certificate) {
            candidate_pending_certificates.insert(certificate);
        }
    }
    assert_eq!(pending_certificates, candidate_pending_certificates);
    Ok(())
}
}