use crate::{
block_storage::{
block_tree::BlockTree,
tracing::{observe_block, BlockStage},
BlockReader,
},
counters,
persistent_liveness_storage::{
PersistentLivenessStorage, RecoveryData, RootInfo, RootMetadata,
},
state_replication::StateComputer,
util::time_service::TimeService,
};
use anyhow::{bail, ensure, format_err, Context};
use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
use aptos_infallible::RwLock;
use aptos_logger::prelude::*;
use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::TransactionStatus};
use consensus_types::{
block::Block, common::Round, executed_block::ExecutedBlock, quorum_cert::QuorumCert,
sync_info::SyncInfo, timeout_2chain::TwoChainTimeoutCertificate,
};
use executor_types::{Error, StateComputeResult};
use futures::executor::block_on;
#[cfg(test)]
use std::collections::VecDeque;
use std::{sync::Arc, time::Duration};
#[cfg(test)]
#[path = "block_store_test.rs"]
mod block_store_test;
#[path = "sync_manager.rs"]
pub mod sync_manager;
/// Reports every block in `ordered_blocks` to `observe_block` at the
/// `BlockStage::ORDERED` stage, keyed by the block's timestamp (microseconds).
fn update_counters_for_ordered_blocks(ordered_blocks: &[Arc<ExecutedBlock>]) {
    ordered_blocks
        .iter()
        .map(|executed| executed.block().timestamp_usecs())
        .for_each(|timestamp_usecs| observe_block(timestamp_usecs, BlockStage::ORDERED));
}
/// Updates commit metrics for each block in `blocks_to_commit`.
///
/// For every block this records:
/// - the `COMMITTED` stage observation;
/// - the per-block transaction count;
/// - the committed-block counter;
/// - the last committed round and version (the version is taken from the
///   compute result's `num_leaves`);
/// - one `COMMITTED_TXNS_COUNT` increment per transaction, labelled
///   `success` / `failed` / `retry` by its status.
pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<ExecutedBlock>]) {
    for committed in blocks_to_commit {
        observe_block(committed.block().timestamp_usecs(), BlockStage::COMMITTED);

        let compute_result = committed.compute_result();
        let statuses = compute_result.compute_status();

        counters::NUM_TXNS_PER_BLOCK.observe(statuses.len() as f64);
        counters::COMMITTED_BLOCKS_COUNT.inc();
        counters::LAST_COMMITTED_ROUND.set(committed.round() as i64);
        counters::LAST_COMMITTED_VERSION.set(compute_result.num_leaves() as i64);

        for status in statuses.iter() {
            // Map each status to its metric label, then bump that label once.
            let label = match status {
                TransactionStatus::Keep(_) => "success",
                TransactionStatus::Discard(_) => "failed",
                TransactionStatus::Retry => "retry",
            };
            counters::COMMITTED_TXNS_COUNT
                .with_label_values(&[label])
                .inc();
        }
    }
}
/// Consensus block store.
///
/// Holds an in-memory block tree behind a lock. Blocks, quorum certificates and
/// the highest 2-chain timeout certificate are persisted through `storage`.
/// Blocks are executed and committed through `state_computer`.
pub struct BlockStore {
    /// The block tree. `commit` hands a clone of this `Arc` to the state
    /// computer's commit callback, which takes the write lock itself.
    inner: Arc<RwLock<BlockTree>>,
    /// Used to execute blocks (`compute`) and to commit them (`commit`).
    state_computer: Arc<dyn StateComputer>,
    /// Persistent liveness storage for blocks, QCs and the highest 2-chain TC.
    storage: Arc<dyn PersistentLivenessStorage>,
    /// Clock used to wait until a block's timestamp has been reached before the
    /// block is inserted.
    time_service: Arc<dyn TimeService>,
    /// Only stored and forwarded on `rebuild` within this chunk.
    /// NOTE(review): confirm its back-pressure semantics at the use site.
    back_pressure_limit: Round,
}
impl BlockStore {
/// Creates a `BlockStore` from recovered consensus data.
///
/// The highest 2-chain timeout certificate is read before the recovery data is
/// consumed. The store is built (executing every recovered block), and then
/// `try_commit` is run so that recovered commit QCs are re-applied. Both async
/// steps are driven to completion on the current thread with `block_on`.
pub fn new(
    storage: Arc<dyn PersistentLivenessStorage>,
    initial_data: RecoveryData,
    state_computer: Arc<dyn StateComputer>,
    max_pruned_blocks_in_mem: usize,
    time_service: Arc<dyn TimeService>,
    back_pressure_limit: Round,
) -> Self {
    // Must be read before `take()` consumes `initial_data`.
    let recovered_htc = initial_data.highest_2chain_timeout_certificate();
    let (root, root_metadata, recovered_blocks, recovered_qcs) = initial_data.take();

    // Futures are lazy: nothing runs until `block_on` polls it.
    let pending_build = Self::build(
        root,
        root_metadata,
        recovered_blocks,
        recovered_qcs,
        recovered_htc,
        state_computer,
        storage,
        max_pruned_blocks_in_mem,
        time_service,
        back_pressure_limit,
    );
    let store = block_on(pending_build);
    block_on(store.try_commit());
    store
}
async fn try_commit(&self) {
let mut certs = self.inner.read().get_all_quorum_certs_with_commit_info();
certs.sort_unstable_by_key(|qc| qc.commit_info().round());
for qc in certs {
if qc.commit_info().round() > self.commit_root().round() {
info!(
"trying to commit to round {} with ledger info {}",
qc.commit_info().round(),
qc.ledger_info()
);
if let Err(e) = self.commit(qc.clone()).await {
error!("Error in try-committing blocks. {}", e.to_string());
}
}
}
}
/// Constructs a `BlockStore` rooted at `root`, then replays the recovered
/// `blocks` (executing and inserting each one) followed by `quorum_certs`.
///
/// The root block is not re-executed. Its execution result is synthesized from
/// `root_metadata`.
///
/// # Panics
/// Panics in any of these cases:
/// - the root QC's certified version is neither 0 nor `root_metadata.version()`;
/// - its executed state id is neither `ACCUMULATOR_PLACEHOLDER_HASH` nor
///   `root_metadata.accu_hash`;
/// - any recovered block or QC fails to insert.
async fn build(
    root: RootInfo,
    root_metadata: RootMetadata,
    blocks: Vec<Block>,
    quorum_certs: Vec<QuorumCert>,
    highest_2chain_timeout_cert: Option<TwoChainTimeoutCertificate>,
    state_computer: Arc<dyn StateComputer>,
    storage: Arc<dyn PersistentLivenessStorage>,
    max_pruned_blocks_in_mem: usize,
    time_service: Arc<dyn TimeService>,
    back_pressure_limit: Round,
) -> Self {
    let RootInfo(root_block, root_qc, root_ordered_cert, root_commit_cert) = root;

    // The root QC either carries version 0 / the placeholder accumulator hash,
    // or it must agree with the committed trees described by `root_metadata`.
    assert!(
        root_qc.certified_block().version() == 0
            || root_qc.certified_block().version() == root_metadata.version(),
        "root qc version {} doesn't match committed trees {}",
        root_qc.certified_block().version(),
        root_metadata.version(),
    );
    assert!(
        root_qc.certified_block().executed_state_id() == *ACCUMULATOR_PLACEHOLDER_HASH
            || root_qc.certified_block().executed_state_id() == root_metadata.accu_hash,
        "root qc state id {} doesn't match committed trees {}",
        root_qc.certified_block().executed_state_id(),
        root_metadata.accu_hash,
    );

    // Build the root's execution result from the persisted metadata. The
    // remaining constructor arguments are left empty / zero / `None`.
    let result = StateComputeResult::new(
        root_metadata.accu_hash,
        root_metadata.frozen_root_hashes,
        root_metadata.num_leaves,
        vec![],
        0,
        None,
        vec![],
        vec![],
        vec![],
    );
    let executed_root_block = ExecutedBlock::new(root_block, result);

    let tree = BlockTree::new(
        executed_root_block,
        root_qc,
        root_ordered_cert,
        root_commit_cert,
        max_pruned_blocks_in_mem,
        highest_2chain_timeout_cert.map(Arc::new),
    );
    let block_store = Self {
        inner: Arc::new(RwLock::new(tree)),
        state_computer,
        storage,
        time_service,
        back_pressure_limit,
    };

    // Blocks are replayed before QCs because `insert_single_quorum_cert`
    // rejects a QC whose certified block is not already in the store.
    for block in blocks {
        block_store
            .execute_and_insert_block(block)
            .await
            .unwrap_or_else(|e| {
                panic!("[BlockStore] failed to insert block during build {:?}", e)
            });
    }
    for qc in quorum_certs {
        block_store
            .insert_single_quorum_cert(qc)
            .unwrap_or_else(|e| {
                panic!("[BlockStore] failed to insert quorum during build {:?}", e)
            });
    }

    counters::LAST_COMMITTED_ROUND.set(block_store.ordered_root().round() as i64);
    block_store
}
/// Commits the block identified by `finality_proof.commit_info().id()`, together
/// with the blocks on the path to it from the current ordered root.
///
/// The blocks are passed to the state computer along with a callback. The
/// callback runs `commit_callback` on the block tree for whichever blocks the
/// state computer reports as committed.
///
/// After the state computer's `commit` future completes, the ordered root is
/// advanced to the committed block, and `ORDERED` stage metrics are recorded for
/// the path.
///
/// # Errors
/// Returns an error if the block is not in the tree, or if its round is not
/// above the current ordered root's round.
///
/// # Panics
/// Panics if there is no non-empty path from the ordered root to the block, or
/// if the state computer's `commit` returns an error.
pub async fn commit(&self, finality_proof: QuorumCert) -> anyhow::Result<()> {
    let block_id_to_commit = finality_proof.commit_info().id();
    let block_to_commit = self
        .get_block(block_id_to_commit)
        .ok_or_else(|| format_err!("Committed block id not found"))?;
    // Only commits that move past the current ordered root are accepted.
    ensure!(
        block_to_commit.round() > self.ordered_root().round(),
        "Committed block round lower than root"
    );
    // A missing path (`None`) becomes an empty Vec and trips the assert below.
    let blocks_to_commit = self
        .path_from_ordered_root(block_id_to_commit)
        .unwrap_or_default();
    assert!(!blocks_to_commit.is_empty());
    // Handles that are moved into the callback below.
    // NOTE(review): whether the state computer invokes the callback before or
    // after this future resolves is not visible here; confirm in the
    // `StateComputer` implementation.
    let block_tree = self.inner.clone();
    let storage = self.storage.clone();
    self.state_computer
        .commit(
            &blocks_to_commit,
            finality_proof.ledger_info().clone(),
            Box::new(
                move |committed_blocks: &[Arc<ExecutedBlock>],
                      commit_decision: LedgerInfoWithSignatures| {
                    block_tree.write().commit_callback(
                        storage,
                        committed_blocks,
                        finality_proof,
                        commit_decision,
                    );
                },
            ),
        )
        .await
        // Any error from the state computer's commit panics instead of being
        // returned to the caller.
        .expect("Failed to persist commit");
    // No block-tree lock is held across the `.await` above. The write lock is
    // taken only now, to advance the ordered root.
    self.inner.write().update_ordered_root(block_to_commit.id());
    update_counters_for_ordered_blocks(&blocks_to_commit);
    Ok(())
}
/// Replaces the in-memory block tree with one rebuilt from `root`, `blocks` and
/// `quorum_certs`, then re-attempts commits via `try_commit`.
///
/// The current tree's `max_pruned_blocks_in_mem` setting and its highest 2-chain
/// timeout certificate are carried over to the new tree. All block ids of the
/// old tree are then pruned from persistent storage; a pruning failure is only
/// logged.
///
/// # Panics
/// Panics if `build` panics, or if the newly built tree's `Arc` still has other
/// owners when it is unwrapped.
pub async fn rebuild(
    &self,
    root: RootInfo,
    root_metadata: RootMetadata,
    blocks: Vec<Block>,
    quorum_certs: Vec<QuorumCert>,
) {
    let max_pruned_blocks_in_mem = self.inner.read().max_pruned_blocks_in_mem();
    // Carry the previous highest TC over from the old tree into the new one.
    let prev_2chain_htc = self
        .highest_2chain_timeout_cert()
        .map(|tc| tc.as_ref().clone());
    // Only the new tree is kept. The temporary store's other fields are dropped
    // at the end of this statement.
    let BlockStore { inner, .. } = Self::build(
        root,
        root_metadata,
        blocks,
        quorum_certs,
        prev_2chain_htc,
        Arc::clone(&self.state_computer),
        Arc::clone(&self.storage),
        max_pruned_blocks_in_mem,
        Arc::clone(&self.time_service),
        self.back_pressure_limit,
    )
    .await;
    // Best-effort pruning of every block id in the old tree.
    // NOTE(review): `build` has already persisted the new tree's blocks via
    // `save_tree`. If the old and new trees share block ids, those entries are
    // pruned from storage here as well. Confirm this is intended.
    let to_remove = self.inner.read().get_all_block_id();
    if let Err(e) = self.storage.prune_tree(to_remove) {
        error!(error = ?e, "Fail to delete block from consensus db");
    }
    // Swap in the new tree. `try_unwrap` fails only if the `Arc` is still shared;
    // the panic text states the expected invariant.
    *self.inner.write() = Arc::try_unwrap(inner)
        .unwrap_or_else(|_| panic!("New block tree is not shared"))
        .into_inner();
    self.try_commit().await;
}
/// Executes `block`, waits until local time reaches the block's timestamp,
/// persists the block and inserts it into the block tree.
///
/// If a block with the same id is already in the store, that block is returned
/// without re-executing.
///
/// If execution fails with `Error::BlockNotFound`, the blocks on the path from
/// the ordered root to the missing block are executed again in order, and
/// `block` is then retried once.
///
/// # Errors
/// Returns an error if `block`'s round is not above the ordered root's round,
/// if execution (or the retry) fails, if saving the block to storage fails, or
/// if the tree insertion fails.
pub async fn execute_and_insert_block(
    &self,
    block: Block,
) -> anyhow::Result<Arc<ExecutedBlock>> {
    if let Some(existing_block) = self.get_block(block.id()) {
        return Ok(existing_block);
    }
    ensure!(
        self.inner.read().ordered_root().round() < block.round(),
        "Block with old round"
    );
    let executed_block = match self.execute_block(block.clone()).await {
        Ok(res) => Ok(res),
        // The executor reported `parent_block_id` as missing. Re-execute the
        // path from the ordered root to it (empty if no such path exists), then
        // retry this block.
        Err(Error::BlockNotFound(parent_block_id)) => {
            let blocks_to_reexecute = self
                .path_from_ordered_root(parent_block_id)
                .unwrap_or_default();
            for block in blocks_to_reexecute {
                self.execute_block(block.block().clone()).await?;
            }
            self.execute_block(block).await
        }
        // Any other error is propagated by the `?` below.
        err => err,
    }?;
    // If the block's timestamp is ahead of the local clock, wait for it. A wait
    // longer than one second is logged as an error first.
    let block_time = Duration::from_micros(executed_block.timestamp_usecs());
    let current_timestamp = self.time_service.get_current_timestamp();
    if let Some(t) = block_time.checked_sub(current_timestamp) {
        if t > Duration::from_secs(1) {
            error!(
                "Long wait time {}ms for block {}",
                t.as_millis(),
                executed_block.block()
            );
        }
        self.time_service.wait_until(block_time).await;
    }
    // Persist the block before inserting it into the in-memory tree.
    self.storage
        .save_tree(vec![executed_block.block().clone()], vec![])
        .context("Insert block failed when saving block")?;
    self.inner.write().insert_block(executed_block)
}
async fn execute_block(&self, block: Block) -> anyhow::Result<ExecutedBlock, Error> {
let state_compute_result = self
.state_computer
.compute(&block, block.parent_id())
.await?;
Ok(ExecutedBlock::new(block, state_compute_result))
}
pub fn insert_single_quorum_cert(&self, qc: QuorumCert) -> anyhow::Result<()> {
match self.get_block(qc.certified_block().id()) {
Some(executed_block) => {
ensure!(
executed_block
.block_info()
.match_ordered_only(qc.certified_block()),
"QC for block {} has different {:?} than local {:?}",
qc.certified_block().id(),
qc.certified_block(),
executed_block.block_info()
);
observe_block(
executed_block.block().timestamp_usecs(),
BlockStage::QC_ADDED,
);
}
None => bail!("Insert {} without having the block in store first", qc),
};
self.storage
.save_tree(vec![], vec![qc.clone()])
.context("Insert block failed when saving quorum")?;
self.inner.write().insert_quorum_cert(qc)
}
/// Persists `tc` and installs it as the highest 2-chain timeout certificate,
/// but only if its round is strictly greater than the current one. A missing
/// certificate counts as round 0.
///
/// A certificate at or below the current round is ignored, and `Ok(())` is
/// returned.
///
/// # Errors
/// Returns an error if persisting `tc` fails. In that case the in-memory tree
/// is not updated.
pub fn insert_2chain_timeout_certificate(
    &self,
    tc: Arc<TwoChainTimeoutCertificate>,
) -> anyhow::Result<()> {
    let highest_round = self
        .highest_2chain_timeout_cert()
        .map(|current| current.round())
        .unwrap_or(0);
    if tc.round() > highest_round {
        self.storage
            .save_highest_2chain_timeout_cert(tc.as_ref())
            .context("Timeout certificate insert failed when persisting to DB")?;
        self.inner.write().replace_2chain_timeout_cert(tc);
    }
    Ok(())
}
/// Test-only helper.
///
/// Prunes every block id returned by `find_blocks_to_prune(next_root_id)`, first
/// from storage and then from the in-memory tree. Both the ordered root and the
/// commit root are moved to `next_root_id`.
///
/// Storage pruning is best effort: failures are logged. Returns the pruned ids.
#[cfg(test)]
fn prune_tree(&self, next_root_id: HashValue) -> VecDeque<HashValue> {
    let pruned_ids = self.inner.read().find_blocks_to_prune(next_root_id);
    if let Err(e) = self
        .storage
        .prune_tree(pruned_ids.iter().cloned().collect())
    {
        error!(error = ?e, "fail to delete block");
    }
    let mut tree = self.inner.write();
    tree.update_ordered_root(next_root_id);
    tree.update_commit_root(next_root_id);
    tree.process_pruned_blocks(pruned_ids.clone());
    pruned_ids
}
}
impl BlockReader for BlockStore {
fn block_exists(&self, block_id: HashValue) -> bool {
self.inner.read().block_exists(&block_id)
}
fn get_block(&self, block_id: HashValue) -> Option<Arc<ExecutedBlock>> {
self.inner.read().get_block(&block_id)
}
fn ordered_root(&self) -> Arc<ExecutedBlock> {
self.inner.read().ordered_root()
}
fn commit_root(&self) -> Arc<ExecutedBlock> {
self.inner.read().commit_root()
}
fn get_quorum_cert_for_block(&self, block_id: HashValue) -> Option<Arc<QuorumCert>> {
self.inner.read().get_quorum_cert_for_block(&block_id)
}
fn path_from_ordered_root(&self, block_id: HashValue) -> Option<Vec<Arc<ExecutedBlock>>> {
self.inner.read().path_from_ordered_root(block_id)
}
fn path_from_commit_root(&self, block_id: HashValue) -> Option<Vec<Arc<ExecutedBlock>>> {
self.inner.read().path_from_commit_root(block_id)
}
fn highest_certified_block(&self) -> Arc<ExecutedBlock> {
self.inner.read().highest_certified_block()
}
fn highest_quorum_cert(&self) -> Arc<QuorumCert> {
self.inner.read().highest_quorum_cert()
}
fn highest_ordered_cert(&self) -> Arc<QuorumCert> {
self.inner.read().highest_ordered_cert()
}
fn highest_commit_cert(&self) -> Arc<QuorumCert> {
self.inner.read().highest_commit_cert()
}
fn highest_2chain_timeout_cert(&self) -> Option<Arc<TwoChainTimeoutCertificate>> {
self.inner.read().highest_2chain_timeout_cert()
}
fn sync_info(&self) -> SyncInfo {
SyncInfo::new_decoupled(
self.highest_quorum_cert().as_ref().clone(),
self.highest_ordered_cert().as_ref().clone(),
self.highest_commit_cert().as_ref().clone(),
self.highest_2chain_timeout_cert()
.map(|tc| tc.as_ref().clone()),
)
}
}
// Accessors and helpers compiled only for tests and the `fuzzing` feature.
#[cfg(any(test, feature = "fuzzing"))]
impl BlockStore {
    /// Returns `BlockTree::len()` of the current block tree.
    pub(crate) fn len(&self) -> usize {
        let tree = self.inner.read();
        tree.len()
    }

    /// Returns `BlockTree::child_links()` of the current block tree.
    pub(crate) fn child_links(&self) -> usize {
        let tree = self.inner.read();
        tree.child_links()
    }

    /// Returns `BlockTree::pruned_blocks_in_mem()` of the current block tree.
    pub(super) fn pruned_blocks_in_mem(&self) -> usize {
        let tree = self.inner.read();
        tree.pruned_blocks_in_mem()
    }

    /// Inserts the QC embedded in `block` (for its parent), then executes and
    /// inserts `block` itself.
    ///
    /// # Errors
    /// Propagates errors from `insert_single_quorum_cert` (in which case the
    /// block is not executed) and from `execute_and_insert_block`.
    pub async fn insert_block_with_qc(&self, block: Block) -> anyhow::Result<Arc<ExecutedBlock>> {
        let embedded_qc = block.quorum_cert().clone();
        self.insert_single_quorum_cert(embedded_qc)?;
        self.execute_and_insert_block(block).await
    }
}