mod keyed_counter;
#[cfg(test)]
mod tests;
use std::{
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, VecDeque},
convert::Infallible,
fmt::Debug,
hash::Hash,
sync::Arc,
};
use datasize::DataSize;
use derive_more::{Display, From};
use itertools::Itertools;
use smallvec::{smallvec, SmallVec};
use tracing::info;
use crate::{
components::{
block_proposer::DeployInfo,
consensus::{ClContext, ProposedBlock},
Component,
},
effect::{
requests::{BlockValidationRequest, FetcherRequest, StorageRequest},
EffectBuilder, EffectExt, EffectOptionExt, Effects, Responder,
},
types::{
appendable_block::AppendableBlock, Approval, Block, Chainspec, Deploy, DeployHash,
DeployOrTransferHash, DeployWithApprovals, Timestamp,
},
NodeRng,
};
use keyed_counter::KeyedCounter;
use super::fetcher::FetchResult;
/// A block undergoing deploy validation: either a complete `Block` or a `ProposedBlock` coming
/// from consensus. Both variants are boxed to keep the enum itself small.
#[derive(DataSize, Debug, Display, Clone, Hash, Eq, PartialEq)]
pub(crate) enum ValidatingBlock {
    /// A complete block.
    #[display(fmt = "{}", _0.display())]
    Block(Box<Block>),
    /// A block proposed by the consensus component; carries expected approvals per deploy.
    #[display(fmt = "{}", _0.display())]
    ProposedBlock(Box<ProposedBlock<ClContext>>),
}
impl From<Block> for ValidatingBlock {
fn from(block: Block) -> ValidatingBlock {
ValidatingBlock::Block(Box::new(block))
}
}
impl From<ProposedBlock<ClContext>> for ValidatingBlock {
fn from(proposed_block: ProposedBlock<ClContext>) -> ValidatingBlock {
ValidatingBlock::ProposedBlock(Box::new(proposed_block))
}
}
impl ValidatingBlock {
    /// The timestamp of the block being validated.
    fn timestamp(&self) -> Timestamp {
        match self {
            ValidatingBlock::Block(block) => block.timestamp(),
            ValidatingBlock::ProposedBlock(proposed) => proposed.context().timestamp(),
        }
    }

    /// Iterates over the hashes of the block's (non-transfer) deploys.
    fn deploy_hashes(&self) -> Box<dyn Iterator<Item = &DeployHash> + '_> {
        match self {
            ValidatingBlock::Block(block) => Box::new(block.deploy_hashes().iter()),
            ValidatingBlock::ProposedBlock(proposed) => Box::new(proposed.value().deploy_hashes()),
        }
    }

    /// Iterates over the hashes of the block's transfers.
    fn transfer_hashes(&self) -> Box<dyn Iterator<Item = &DeployHash> + '_> {
        match self {
            ValidatingBlock::Block(block) => Box::new(block.transfer_hashes().iter()),
            ValidatingBlock::ProposedBlock(proposed) => {
                Box::new(proposed.value().transfer_hashes())
            }
        }
    }

    /// Iterates over all deploys and transfers (deploys first), paired with the approvals the
    /// block expects for each. For a complete `Block` no approvals are recorded, so the second
    /// element is `None`; for a `ProposedBlock` it is the approval set from the proposal.
    fn deploys_and_transfers_iter(
        &self,
    ) -> Box<dyn Iterator<Item = (DeployOrTransferHash, Option<BTreeSet<Approval>>)> + '_> {
        match self {
            ValidatingBlock::Block(block) => {
                let deploy_items = block
                    .deploy_hashes()
                    .iter()
                    .map(|deploy_hash| (DeployOrTransferHash::Deploy(*deploy_hash), None));
                let transfer_items = block
                    .transfer_hashes()
                    .iter()
                    .map(|transfer_hash| (DeployOrTransferHash::Transfer(*transfer_hash), None));
                Box::new(deploy_items.chain(transfer_items))
            }
            ValidatingBlock::ProposedBlock(proposed) => {
                let deploy_items = proposed.value().deploys().iter().map(|with_approvals| {
                    (
                        DeployOrTransferHash::Deploy(*with_approvals.deploy_hash()),
                        Some(with_approvals.approvals().clone()),
                    )
                });
                let transfer_items = proposed.value().transfers().iter().map(|with_approvals| {
                    (
                        DeployOrTransferHash::Transfer(*with_approvals.deploy_hash()),
                        Some(with_approvals.approvals().clone()),
                    )
                });
                Box::new(deploy_items.chain(transfer_items))
            }
        }
    }
}
/// Block validator component event.
#[derive(Debug, From, Display)]
pub(crate) enum Event<I> {
    /// An incoming request to validate a block received from a peer.
    #[from]
    Request(BlockValidationRequest<I>),
    /// A deploy from the block was fetched and converted to `DeployInfo` successfully.
    #[display(fmt = "{} found", dt_hash)]
    DeployFound {
        dt_hash: DeployOrTransferHash,
        // Approvals taken from the fetched deploy itself.
        approvals: BTreeSet<Approval>,
        // Boxed to keep the `Event` enum small.
        deploy_info: Box<DeployInfo>,
    },
    /// The deploy could not be fetched from the requested peer.
    #[display(fmt = "{} missing", _0)]
    DeployMissing(DeployOrTransferHash),
    /// A deploy was fetched, but its hash did not match or it could not be converted to
    /// `DeployInfo` (see `fetch_deploy`), so any block containing it is invalid.
    #[display(fmt = "{} invalid", _0)]
    CannotConvertDeploy(DeployOrTransferHash),
}
/// The state of the ongoing validation of one block.
#[derive(DataSize, Debug)]
pub(crate) struct BlockValidationState<I> {
    // Accumulates the fetched deploys, enforcing the chainspec's deploy-config limits.
    appendable_block: AppendableBlock,
    // Deploys not yet fetched, mapped to the approvals the proposal expects for them.
    // `None` means the approvals of the fetched deploy itself are used instead.
    missing_deploys: HashMap<DeployOrTransferHash, Option<BTreeSet<Approval>>>,
    // Responders for every pending validation request for this block.
    responders: SmallVec<[Responder<bool>; 2]>,
    // Additional peers to try fetching from if a fetch attempt times out.
    sources: VecDeque<I>,
}
impl<I> BlockValidationState<I>
where
I: PartialEq + Eq + 'static,
{
fn add_source(&mut self, peer: I) -> bool {
if self.sources.contains(&peer) {
true
} else {
self.sources.push_back(peer);
false
}
}
fn source(&mut self) -> Option<I> {
self.sources.pop_front()
}
fn respond<REv>(&mut self, value: bool) -> Effects<REv> {
self.responders
.drain(..)
.flat_map(|responder| responder.respond(value).ignore())
.collect()
}
}
/// The block validator component.
#[derive(DataSize, Debug)]
pub(crate) struct BlockValidator<I> {
    // Used for the deploy config applied when building appendable blocks.
    #[data_size(skip)]
    chainspec: Arc<Chainspec>,
    // Ongoing validations, keyed by the block being validated.
    validation_states: HashMap<ValidatingBlock, BlockValidationState<I>>,
    // Counts in-flight fetch requests per deploy hash, so a fetch is only retried
    // once no other request for the same deploy is outstanding.
    in_flight: KeyedCounter<DeployHash>,
}
impl<I> BlockValidator<I>
where
    // Fix: the original bound was `Clone + Debug + Send + 'static + Send` — `Send` twice.
    I: Clone + Debug + Send + 'static,
{
    /// Creates a new block validator using the given chainspec's deploy configuration.
    pub(crate) fn new(chainspec: Arc<Chainspec>) -> Self {
        BlockValidator {
            chainspec,
            validation_states: HashMap::new(),
            in_flight: KeyedCounter::default(),
        }
    }

    /// Logs a block that contains the same deploy or transfer hash more than once
    /// (a replay within a single block), listing each duplicated hash with its count.
    fn log_block_with_replay(&self, sender: I, block: &ValidatingBlock) {
        // Count occurrences of every deploy/transfer hash in the block.
        let mut deploy_counts: BTreeMap<DeployOrTransferHash, usize> = BTreeMap::new();
        for (dt_hash, _) in block.deploys_and_transfers_iter() {
            *deploy_counts.entry(dt_hash).or_default() += 1;
        }
        // Render only the hashes that appear more than once, e.g. "2 * <hash>".
        let duplicates = deploy_counts
            .into_iter()
            .filter_map(|(dt_hash, count)| (count > 1).then(|| format!("{} * {}", count, dt_hash)))
            .join(", ");
        info!(
            peer_id=?sender, %duplicates,
            "received invalid block containing duplicated deploys"
        );
    }
}
impl<I, REv> Component<REv> for BlockValidator<I>
where
    I: Clone + Debug + Send + PartialEq + Eq + 'static,
    REv: From<Event<I>>
        + From<BlockValidationRequest<I>>
        + From<FetcherRequest<I, Deploy>>
        + From<StorageRequest>
        + Send,
{
    type Event = Event<I>;
    type ConstructionError = Infallible;

    /// Handles block validation events: new validation requests, and the outcomes
    /// (found / missing / unconvertible) of the deploy fetches those requests trigger.
    fn handle_event(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        _rng: &mut NodeRng,
        event: Self::Event,
    ) -> Effects<Self::Event> {
        let mut effects = Effects::new();
        match event {
            // A request to validate `block`, with `sender` as the first peer to fetch from.
            Event::Request(BlockValidationRequest {
                block,
                sender,
                responder,
            }) => {
                let deploy_count = block.deploy_hashes().count() + block.transfer_hashes().count();
                // A block with no deploys or transfers is trivially valid.
                if deploy_count == 0 {
                    return responder.respond(true).ignore();
                }
                // Collecting into a `HashMap` deduplicates by hash: a shorter map means the
                // block contains the same deploy more than once, which makes it invalid.
                let block_deploys: HashMap<_, _> = block.deploys_and_transfers_iter().collect();
                if block_deploys.len() != deploy_count {
                    self.log_block_with_replay(sender, &block);
                    return responder.respond(false).ignore();
                }
                match self.validation_states.entry(block) {
                    // This exact block is already being validated.
                    Entry::Occupied(mut entry) => {
                        if entry.get().missing_deploys.is_empty() {
                            // Nothing left to fetch: the block is already known to be valid.
                            effects.extend(responder.respond(true).ignore());
                        } else {
                            // Piggyback on the ongoing validation; remember `sender` as a
                            // fallback source in case a fetch times out.
                            entry.get_mut().responders.push(responder);
                            entry.get_mut().add_source(sender);
                        }
                    }
                    // First request for this block: start fetching every deploy/transfer.
                    Entry::Vacant(entry) => {
                        let in_flight = &mut self.in_flight;
                        effects.extend(entry.key().deploys_and_transfers_iter().flat_map(
                            |(dt_hash, _)| {
                                // Count each outstanding fetch so timeouts are only acted on
                                // once no other fetch for the same deploy remains in flight.
                                in_flight.inc(&dt_hash.into());
                                fetch_deploy(effect_builder, dt_hash, sender.clone())
                            },
                        ));
                        let block_timestamp = entry.key().timestamp();
                        let deploy_config = self.chainspec.deploy_config;
                        entry.insert(BlockValidationState {
                            appendable_block: AppendableBlock::new(deploy_config, block_timestamp),
                            missing_deploys: block_deploys,
                            responders: smallvec![responder],
                            sources: VecDeque::new(),
                        });
                    }
                }
            }
            // A deploy was fetched and converted; feed it into every waiting validation.
            Event::DeployFound {
                dt_hash,
                approvals,
                deploy_info,
            } => {
                self.in_flight.dec(&dt_hash.into());
                let mut invalid = Vec::new();
                for (key, state) in self.validation_states.iter_mut() {
                    if let Some(maybe_approvals) = state.missing_deploys.remove(&dt_hash) {
                        // Use the approvals the proposal expects; fall back to the fetched
                        // deploy's own approvals when the block recorded none (`None`).
                        let approvals = maybe_approvals.unwrap_or_else(|| approvals.clone());
                        let add_result = match dt_hash {
                            DeployOrTransferHash::Deploy(hash) => {
                                state.appendable_block.add_deploy(
                                    DeployWithApprovals::new(hash, approvals.clone()),
                                    &*deploy_info,
                                )
                            }
                            DeployOrTransferHash::Transfer(hash) => {
                                state.appendable_block.add_transfer(
                                    DeployWithApprovals::new(hash, approvals.clone()),
                                    &*deploy_info,
                                )
                            }
                        };
                        // The appendable block rejected the deploy (e.g. a limit or
                        // timestamp check in `AppendableBlock` failed): block is invalid.
                        if let Err(err) = add_result {
                            info!(block = ?key, %dt_hash, ?deploy_info, ?err, "block invalid");
                            invalid.push(key.clone());
                        }
                    }
                }
                // Drop finished states: respond `false` for invalid blocks, `true` for
                // blocks with no deploys left to fetch; keep everything still in progress.
                self.validation_states.retain(|key, state| {
                    if invalid.contains(key) {
                        effects.extend(state.respond(false));
                        return false;
                    }
                    if state.missing_deploys.is_empty() {
                        effects.extend(state.respond(true));
                        return false;
                    }
                    true
                });
            }
            // A fetch attempt for this deploy timed out.
            Event::DeployMissing(dt_hash) => {
                info!(%dt_hash, "request to download deploy timed out");
                // Another fetch for the same deploy is still in flight; wait for it
                // instead of retrying or failing now.
                if self.in_flight.dec(&dt_hash.into()) != 0 {
                    return Effects::new();
                }
                let mut retried = false;
                self.validation_states.retain(|key, state| {
                    if !state.missing_deploys.contains_key(&dt_hash) {
                        return true;
                    }
                    // At most one retry is issued per timeout; once one state has
                    // retried, the others keep waiting for that fetch to resolve.
                    if retried {
                        return true;
                    }
                    match state.source() {
                        Some(peer) => {
                            info!(%dt_hash, ?peer, "trying the next peer");
                            effects.extend(fetch_deploy(effect_builder, dt_hash, peer));
                            retried = true;
                            true
                        }
                        None => {
                            // No fallback peers left: the deploy can't be validated.
                            info!(
                                block = ?key, %dt_hash,
                                "could not validate the deploy. block is invalid"
                            );
                            effects.extend(state.respond(false));
                            false
                        }
                    }
                });
                // Rebalance the counter for the retry issued above.
                if retried {
                    self.in_flight.inc(&dt_hash.into());
                }
            }
            // The deploy was fetched but unusable (hash mismatch or conversion failure),
            // so every block containing it is invalid.
            Event::CannotConvertDeploy(dt_hash) => {
                self.in_flight.dec(&dt_hash.into());
                self.validation_states.retain(|key, state| {
                    if state.missing_deploys.contains_key(&dt_hash) {
                        info!(
                            block = ?key, %dt_hash,
                            "could not convert deploy to deploy type. block is invalid"
                        );
                        effects.extend(state.respond(false));
                        false
                    } else {
                        true
                    }
                });
            }
        }
        effects
    }
}
/// Creates the effect to fetch the deploy `dt_hash` from `sender` (or storage), mapping the
/// outcome to a block validator `Event`: `DeployFound` on success, `DeployMissing` on timeout,
/// and `CannotConvertDeploy` if the fetched deploy's hash mismatches or conversion to
/// `DeployInfo` fails.
fn fetch_deploy<REv, I>(
    effect_builder: EffectBuilder<REv>,
    dt_hash: DeployOrTransferHash,
    sender: I,
) -> Effects<Event<I>>
where
    REv: From<Event<I>>
        + From<BlockValidationRequest<I>>
        + From<StorageRequest>
        + From<FetcherRequest<I, Deploy>>
        + Send,
    I: Clone + Send + PartialEq + Eq + 'static,
{
    let validate_deploy = move |result: FetchResult<Deploy, I>| {
        let deploy = match result {
            FetchResult::FromStorage(deploy) | FetchResult::FromPeer(deploy, _) => deploy,
        };
        // The fetched deploy must actually be the one we asked for, including its
        // deploy-vs-transfer classification.
        if deploy.deploy_or_transfer_hash() != dt_hash {
            return Event::CannotConvertDeploy(dt_hash);
        }
        match deploy.deploy_info() {
            Ok(deploy_info) => Event::DeployFound {
                dt_hash,
                approvals: deploy.approvals().clone(),
                deploy_info: Box::new(deploy_info),
            },
            // Conversion failure is reported the same way as a hash mismatch.
            Err(_) => Event::CannotConvertDeploy(dt_hash),
        }
    };
    effect_builder
        .fetch_deploy(dt_hash.into(), sender)
        .map_or_else(validate_deploy, move || Event::DeployMissing(dt_hash))
}