use std::sync::Arc;
use crate::chain_sync::BadBlockCache;
use crate::networks::Height;
use crate::shim::clock::ALLOWABLE_CLOCK_DRIFT;
use crate::shim::crypto::SignatureType;
use crate::shim::{
address::Address, crypto::verify_bls_aggregate, econ::BLOCK_GAS_LIMIT,
gas::price_list_by_network_version, message::Message, state_tree::StateTree,
};
use crate::state_manager::ExecutedTipset;
use crate::state_manager::{Error as StateManagerError, StateManager, utils::is_valid_for_sending};
use crate::utils::ShallowClone as _;
use crate::{
blocks::{Block, CachingBlockHeader, Error as ForestBlockError, FullTipset, Tipset},
fil_cns::{self, FilecoinConsensus, FilecoinConsensusError},
};
use crate::{
chain::{ChainStore, Error as ChainStoreError},
metrics::HistogramTimerExt,
};
use crate::{
eth::is_valid_eth_tx_for_sending,
message::{MessageRead as _, valid_for_block_inclusion},
};
use ahash::HashMap;
use cid::Cid;
use futures::TryFutureExt;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::to_vec;
use itertools::Itertools;
use nunny::Vec as NonEmpty;
use thiserror::Error;
use tokio::task::JoinSet;
use tracing::{trace, warn};
use crate::chain_sync::{consensus::collect_errs, metrics, validation::TipsetValidator};
/// Errors produced while validating tipsets and the blocks they contain.
///
/// Several variants carry pre-rendered `String` payloads rather than the
/// underlying error values.
#[derive(Debug, Error)]
pub enum TipsetSyncerError {
/// The block header has no signature (see `block_sanity_checks`).
#[error("Block must have a signature")]
BlockWithoutSignature,
/// The block header has no BLS aggregate signature.
#[error("Block without BLS aggregate signature")]
BlockWithoutBlsAggregate,
/// The header timestamp is ahead of the local clock by more than the
/// allowed drift. Fields are `(now, block timestamp)`.
#[error("Block received from the future: now = {0}, block = {1}")]
TimeTravellingBlock(u64, u64),
/// A validation rule was violated; the payload is a human-readable
/// description. Also used by `TipsetSyncerError::concat` to carry several
/// joined error messages.
#[error("Validation error: {0}")]
Validation(String),
/// A value needed for validation (e.g. weight or state) could not be computed.
#[error("Processing error: {0}")]
Calculation(String),
/// Error bubbled up from the chain store.
#[error("Chain store error: {0}")]
ChainStore(#[from] ChainStoreError),
/// Error bubbled up from the state manager.
#[error("StateManager error: {0}")]
StateManager(#[from] StateManagerError),
/// Error bubbled up from block handling (e.g. header signature verification).
#[error("Block error: {0}")]
BlockError(#[from] ForestBlockError),
/// Querying tipsets from the network failed. Note that a
/// `tokio::task::JoinError` is also converted into this variant.
#[error("Querying tipsets from the network failed: {0}")]
NetworkTipsetQueryFailed(String),
/// The BLS aggregate signature did not verify. Fields are the debug
/// renderings of `(signature, message CID bytes)`.
#[error("BLS aggregate signature {0} was invalid for msgs {1}")]
BlsAggregateSignatureInvalid(String, String),
/// A signed message failed signature authentication.
#[error("Message signature invalid: {0}")]
MessageSignatureInvalid(String),
/// The message root in the header differs from the one computed from the
/// block's messages. Fields are `(header value, computed value)`.
#[error("Block message root does not match: expected {0}, computed {1}")]
BlockMessageRootInvalid(String, String),
/// Computing the message root from the block's messages failed.
#[error("Computing message root failed: {0}")]
ComputingMessageRoot(String),
/// The sender address of a message could not be resolved to a key address.
#[error("Resolving address from message failed: {0}")]
ResolvingAddressFromMessage(String),
/// The parent tipset referenced by a block header could not be loaded.
#[error("Loading tipset parent from the store failed: {0}")]
TipsetParentNotFound(ChainStoreError),
/// A consensus rule check failed.
#[error("Consensus error: {0}")]
ConsensusError(FilecoinConsensusError),
}
/// Lets `?` be used on the result of joining a spawned task.
///
/// The join error is rendered with its `Display` implementation and stored in
/// [`TipsetSyncerError::NetworkTipsetQueryFailed`].
impl From<tokio::task::JoinError> for TipsetSyncerError {
    fn from(err: tokio::task::JoinError) -> Self {
        Self::NetworkTipsetQueryFailed(err.to_string())
    }
}
impl TipsetSyncerError {
fn concat(errs: NonEmpty<TipsetSyncerError>) -> Self {
let msg = errs.iter().map(|e| e.to_string()).collect_vec().join(", ");
TipsetSyncerError::Validation(msg)
}
}
/// Validates every block of `full_tipset`, one spawned task per block.
///
/// The genesis tipset is accepted without validation. Each block that passes
/// is added to the chain store's tipset tracker. The first block failure
/// observed is logged and returned.
///
/// When a block fails for any reason other than
/// [`TipsetSyncerError::TimeTravellingBlock`], its CID is pushed to
/// `bad_block_cache` (if one was supplied), provided the tipset's parent state
/// tree can be opened from the local blockstore.
///
/// # Errors
///
/// Returns the validation error of the first failing block, or the converted
/// join error if a validation task could not be joined.
pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
    state_manager: &Arc<StateManager<DB>>,
    full_tipset: FullTipset,
    bad_block_cache: Option<Arc<BadBlockCache>>,
) -> Result<(), TipsetSyncerError> {
    let is_genesis = full_tipset
        .key()
        .eq(state_manager.chain_store().genesis_tipset().key());
    if is_genesis {
        trace!("Skipping genesis tipset validation");
        return Ok(());
    }

    let timer = metrics::TIPSET_PROCESSING_TIME.start_timer();

    // Capture what the log messages need before the tipset is consumed.
    let epoch = full_tipset.epoch();
    let parent_state = *full_tipset.parent_state();
    let tipset_key = full_tipset.key();
    trace!("Tipset keys: {tipset_key}");

    let mut pending = JoinSet::new();
    for block in full_tipset.into_blocks() {
        pending.spawn(validate_block(state_manager.clone(), Arc::new(block)));
    }

    while let Some(joined) = pending.join_next().await {
        let (cid, why) = match joined? {
            Ok(block) => {
                state_manager
                    .chain_store()
                    .add_to_tipset_tracker(block.header());
                continue;
            }
            Err(failure) => failure,
        };

        warn!(
            "Validating block [CID = {cid}, PARENT_STATE = {parent_state}] in EPOCH = {epoch} failed: {why}",
        );

        // A block from the future is never recorded as bad.
        let from_the_future = matches!(why, TipsetSyncerError::TimeTravellingBlock(_, _));
        if !from_the_future
            && StateTree::new_from_root(state_manager.blockstore_owned(), &parent_state).is_ok()
            && let Some(bad_block_cache) = bad_block_cache
        {
            bad_block_cache.push(cid);
        }
        return Err(why);
    }

    drop(timer);
    Ok(())
}
/// Validates a single block and, on success, marks it as validated in the
/// chain store and hands the block back to the caller.
///
/// Blocks already marked as validated are returned immediately. Otherwise the
/// header is sanity- and timestamp-checked, the parent tipset is loaded, and a
/// set of independent checks (messages, base fee, parent weight, parent
/// state/receipt roots, block signature, consensus rules) is spawned and
/// awaited together.
///
/// # Errors
///
/// Returns the block CID paired with the failure. Failures from the spawned
/// checks are merged into one error via `TipsetSyncerError::concat`.
async fn validate_block<DB: Blockstore + Sync + Send + 'static>(
state_manager: Arc<StateManager<DB>>,
block: Arc<Block>,
) -> Result<Arc<Block>, (Cid, TipsetSyncerError)> {
let consensus = FilecoinConsensus::new(state_manager.beacon_schedule().clone());
trace!(
"Validating block: epoch = {}, weight = {}, key = {}",
block.header().epoch,
block.header().weight,
block.header().cid(),
);
let chain_store = state_manager.chain_store().clone();
let block_cid = block.cid();
// Skip all work for blocks that were validated before.
let is_validated = chain_store.is_block_validated(block_cid);
if is_validated {
return Ok(block);
}
// Held for the rest of the function (presumably records the elapsed time on drop).
let _timer = metrics::BLOCK_VALIDATION_TIME.start_timer();
let header = block.header();
// Header-only checks run first, before any tipset or state is loaded.
block_sanity_checks(header).map_err(|e| (*block_cid, e))?;
block_timestamp_checks(header).map_err(|e| (*block_cid, e))?;
// The parent tipset is required by every remaining check.
let base_tipset = chain_store
.chain_index()
.load_required_tipset(&header.parents)
.map_err(|why| (*block_cid, TipsetSyncerError::TipsetParentNotFound(why)))?;
// Second element of the lookback result is used as the state in which the
// miner's worker address is looked up.
let lookback_state = ChainStore::get_lookback_tipset_for_round(
state_manager.chain_store().chain_index(),
state_manager.chain_config(),
&base_tipset,
block.header().epoch,
)
.map_err(|e| (*block_cid, e.into()))
.map(|(_, s)| Arc::new(s))?;
let work_addr = state_manager
.get_miner_work_addr(*lookback_state, &header.miner_address)
.map_err(|e| (*block_cid, e.into()))?;
// The remaining checks do not depend on each other; each runs as its own
// task and their failures are gathered by `collect_errs` below.
let mut validations = JoinSet::new();
// Message checks (signatures, gas, sequences, message root).
validations.spawn(check_block_messages(
state_manager.shallow_clone(),
block.shallow_clone(),
base_tipset.shallow_clone(),
));
// Base fee check: the fee computed from the parent tipset must equal the
// header's `parent_base_fee`.
validations.spawn_blocking({
let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
let firehorse_height = state_manager.chain_config().epoch(Height::FireHorse);
let base_tipset = base_tipset.shallow_clone();
let block_store = state_manager.blockstore_owned();
let block = block.shallow_clone();
move || {
let base_fee = crate::chain::compute_base_fee(
&block_store,
&base_tipset,
smoke_height,
firehorse_height,
)
.map_err(|e| {
TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
})?;
let parent_base_fee = &block.header.parent_base_fee;
if &base_fee != parent_base_fee {
return Err(TipsetSyncerError::Validation(format!(
"base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
)));
}
Ok(())
}
});
// Parent weight check: the weight computed from the parent tipset must
// equal the weight recorded in the header.
validations.spawn_blocking({
let block_store = state_manager.blockstore_owned();
let base_tipset = base_tipset.shallow_clone();
let weight = header.weight.clone();
move || {
let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
TipsetSyncerError::Calculation(format!("Error calculating weight: {e:#}"))
})?;
if weight != calc_weight {
return Err(TipsetSyncerError::Validation(format!(
"Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
)));
}
Ok(())
}
});
// State root and receipt root check: the executed parent tipset must
// yield the roots recorded in the header. This task takes ownership of
// `base_tipset`, so it must come after the tasks that clone it.
validations.spawn({
let state_manager = state_manager.clone();
let block = block.clone();
async move {
let header = block.header();
let ExecutedTipset {
state_root,
receipt_root,
..
} = state_manager
.load_executed_tipset(&base_tipset)
.await
.map_err(|e| {
TipsetSyncerError::Calculation(format!("Failed to calculate state: {e:#}"))
})?;
if state_root != header.state_root {
return Err(TipsetSyncerError::Validation(format!(
"Parent state root did not match computed state: {} (header), {} (computed)",
header.state_root, state_root,
)));
}
if receipt_root != header.message_receipts {
return Err(TipsetSyncerError::Validation(format!(
"Parent receipt root did not match computed root: {} (header), {} (computed)",
header.message_receipts, receipt_root
)));
}
Ok(())
}
});
// Block signature check against the miner's worker address.
validations.spawn_blocking({
let block = block.clone();
move || {
block.header().verify_signature_against(&work_addr)?;
Ok(())
}
});
// Consensus rule checks. This task takes ownership of `state_manager`
// and `consensus`, so it is spawned last.
validations.spawn({
let block = block.clone();
async move {
consensus
.validate_block(state_manager, block)
.map_err(|errs| {
TipsetSyncerError::concat(
errs.into_iter_ne()
.map(TipsetSyncerError::ConsensusError)
.collect_vec(),
)
})
.await
}
});
// Any failure is reported as a single combined error tagged with the block CID.
if let Err(errs) = collect_errs(validations).await {
return Err((*block_cid, TipsetSyncerError::concat(errs)));
}
chain_store.mark_block_as_validated(block_cid);
Ok(block)
}
/// Validates the messages carried by `block` on top of `base_tipset`.
///
/// The checks run in this order:
/// 1. The header's BLS aggregate signature must verify over all BLS messages,
///    using the senders' BLS public keys looked up at the parent state of
///    `base_tipset`.
/// 2. Every message (BLS first, then secp) must be valid for block inclusion,
///    keep the running gas total within `BLOCK_GAS_LIMIT`, come from an actor
///    that is valid for sending, and carry the expected sequence.
/// 3. Every secp message must additionally pass signature authentication
///    against the sender's resolved key address.
/// 4. The message root computed from the block's messages must equal the one
///    in the header.
///
/// # Errors
///
/// Returns the first failure encountered. A message that cannot be serialized
/// for gas accounting is reported as a validation error rather than panicking.
async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
    state_manager: Arc<StateManager<DB>>,
    block: Arc<Block>,
    base_tipset: Tipset,
) -> Result<(), TipsetSyncerError> {
    let network_version = state_manager
        .chain_config()
        .network_version(block.header.epoch);
    let eth_chain_id = state_manager.chain_config().eth_chain_id;

    // Verify the BLS aggregate signature over every BLS message in the block.
    if let Some(sig) = &block.header().bls_aggregate {
        let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
        let mut cids = Vec::with_capacity(block.bls_msgs().len());
        let db = state_manager.blockstore();
        for m in block.bls_msgs() {
            let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
            pub_keys.push(pk);
            cids.push(m.cid().to_bytes());
        }
        if !verify_bls_aggregate(
            &cids.iter().map(|x| x.as_slice()).collect_vec(),
            &pub_keys,
            sig,
        ) {
            return Err(TipsetSyncerError::BlsAggregateSignatureInvalid(
                format!("{sig:?}"),
                format!("{cids:?}"),
            ));
        }
    } else {
        return Err(TipsetSyncerError::BlockWithoutBlsAggregate);
    }

    let price_list = price_list_by_network_version(network_version);
    // Running gas total across all messages of the block (BLS and secp).
    let mut sum_gas_limit = 0;

    // Per-message check shared by BLS and secp messages.
    let mut check_msg = |msg: &Message,
                         account_sequences: &mut HashMap<Address, u64>,
                         tree: &StateTree<DB>|
     -> anyhow::Result<()> {
        // Syntactic validation: the minimum gas depends on the encoded size.
        // Report an encoding failure as an error instead of panicking.
        let encoded_len = to_vec(msg)
            .map_err(|e| anyhow::anyhow!("Failed to serialize message: {e}"))?
            .len();
        let min_gas = price_list.on_chain_message(encoded_len);
        valid_for_block_inclusion(msg, min_gas.total(), network_version)
            .map_err(|e| anyhow::anyhow!("{}", e))?;
        sum_gas_limit += msg.gas_limit;
        if sum_gas_limit > BLOCK_GAS_LIMIT {
            anyhow::bail!("block gas limit exceeded");
        }

        // Expected sequence: the value tracked for this sender within the
        // block, or the actor's on-chain sequence on first sight.
        let sequence: u64 = match account_sequences.get(&msg.from()) {
            Some(sequence) => *sequence,
            None => {
                let actor = tree.get_actor(&msg.from)?.ok_or_else(|| {
                    anyhow::anyhow!(
                        "Failed to retrieve nonce for addr: Actor does not exist in state"
                    )
                })?;
                if !is_valid_for_sending(network_version, &actor) {
                    anyhow::bail!("not valid for sending!");
                }
                actor.sequence
            }
        };
        if sequence != msg.sequence {
            anyhow::bail!(
                "Message has incorrect sequence (exp: {} got: {})",
                sequence,
                msg.sequence
            );
        }
        // Later messages from the same sender must use the next sequence.
        account_sequences.insert(msg.from(), sequence + 1);
        Ok(())
    };

    let mut account_sequences: HashMap<Address, u64> = HashMap::default();
    let ExecutedTipset { state_root, .. } = state_manager
        .load_executed_tipset(&base_tipset)
        .await
        .map_err(|e| TipsetSyncerError::Calculation(format!("Could not update state: {e:#}")))?;
    let tree =
        StateTree::new_from_root(state_manager.blockstore_owned(), &state_root).map_err(|e| {
            TipsetSyncerError::Calculation(format!(
                "Could not load from new state root in state manager: {e:#}"
            ))
        })?;

    // BLS messages: signatures were covered by the aggregate check above.
    for (i, msg) in block.bls_msgs().iter().enumerate() {
        check_msg(msg, &mut account_sequences, &tree).map_err(|e| {
            TipsetSyncerError::Validation(format!(
                "Block had invalid BLS message at index {i}: {e:#}"
            ))
        })?;
    }

    // Secp messages: same checks plus individual signature authentication.
    for (i, msg) in block.secp_msgs().iter().enumerate() {
        if msg.signature().signature_type() == SignatureType::Delegated
            && !is_valid_eth_tx_for_sending(eth_chain_id, network_version, msg)
        {
            return Err(TipsetSyncerError::Validation(
                "Network version must be at least NV23 for legacy Ethereum transactions".to_owned(),
            ));
        }
        check_msg(msg.message(), &mut account_sequences, &tree).map_err(|e| {
            TipsetSyncerError::Validation(format!(
                "block had an invalid secp message at index {i}: {e:#}"
            ))
        })?;
        let key_addr = state_manager
            .resolve_to_key_addr(&msg.from(), &base_tipset)
            .await
            .map_err(|e| TipsetSyncerError::ResolvingAddressFromMessage(e.to_string()))?;
        msg.signature
            .authenticate_msg(eth_chain_id, msg, &key_addr)
            .map_err(|e| TipsetSyncerError::MessageSignatureInvalid(e.to_string()))?;
    }

    // The header must commit to exactly the messages carried by the block.
    let msg_root = TipsetValidator::compute_msg_root(
        state_manager.blockstore(),
        block.bls_msgs(),
        block.secp_msgs(),
    )
    .map_err(|err| TipsetSyncerError::ComputingMessageRoot(err.to_string()))?;
    if block.header().messages != msg_root {
        return Err(TipsetSyncerError::BlockMessageRootInvalid(
            format!("{:?}", block.header().messages),
            format!("{msg_root:?}"),
        ));
    }
    Ok(())
}
/// Checks that the header carries both a block signature and a BLS aggregate
/// signature.
///
/// A missing block signature is reported in preference to a missing BLS
/// aggregate when both are absent.
fn block_sanity_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
    match (&header.signature, &header.bls_aggregate) {
        (None, _) => Err(TipsetSyncerError::BlockWithoutSignature),
        (_, None) => Err(TipsetSyncerError::BlockWithoutBlsAggregate),
        _ => Ok(()),
    }
}
/// Rejects blocks whose timestamp lies further in the future than the local
/// clock plus `ALLOWABLE_CLOCK_DRIFT`.
///
/// A timestamp that is ahead of the local clock but still within the allowed
/// drift is accepted with a warning.
fn block_timestamp_checks(header: &CachingBlockHeader) -> Result<(), TipsetSyncerError> {
    let now = chrono::Utc::now().timestamp() as u64;
    let block_time = header.timestamp;
    let latest_acceptable = now.saturating_add(ALLOWABLE_CLOCK_DRIFT);

    if block_time > latest_acceptable {
        return Err(TipsetSyncerError::TimeTravellingBlock(now, block_time));
    }
    if block_time > now {
        warn!(
            "Got block from the future, but within clock drift threshold, {} > {}",
            block_time, now
        );
    }
    Ok(())
}