mod cached_state;
mod config;
mod deploy_sets;
mod event;
mod metrics;
#[cfg(test)]
mod tests;
use std::{
collections::{BTreeSet, HashMap, HashSet},
convert::Infallible,
sync::Arc,
time::Duration,
};
use datasize::DataSize;
use futures::join;
use prometheus::{self, Registry};
use tracing::{debug, error, info, trace, warn};
use casper_types::PublicKey;
use crate::{
components::{
consensus::{BlockContext, ClContext},
Component,
},
effect::{
announcements::BlockProposerAnnouncement,
requests::{BlockPayloadRequest, BlockProposerRequest, StateStoreRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
types::{
appendable_block::{AddError, AppendableBlock},
chainspec::DeployConfig,
Approval, BlockPayload, Chainspec, DeployHash, DeployHeader, DeployOrTransferHash,
DeployWithApprovals, Timestamp,
},
NodeRng,
};
use cached_state::CachedState;
pub use config::Config;
use deploy_sets::{BlockProposerDeploySets, PendingDeployInfo, PruneResult};
pub(crate) use event::{DeployInfo, Event};
use metrics::Metrics;
/// The block proposer component: buffers incoming deploys and transfers, and
/// assembles block payloads from them on request.
#[derive(DataSize, Debug)]
pub(crate) struct BlockProposer {
    /// Current lifecycle state: loading persisted data, or ready to serve requests.
    state: BlockProposerState,
    /// Prometheus metrics (pending-deploy gauge) for this component.
    metrics: Metrics,
}
/// Key under which the proposer's `CachedState` is persisted via `save_state`/`load_state`.
const STATE_KEY: &[u8] = b"block proposer";
/// Interval between self-scheduled `Event::Prune` ticks.
const PRUNE_INTERVAL: Duration = Duration::from_secs(10);
/// Approximate lower bound on a single deploy's size — presumably bytes, used
/// together with `AppendableBlock::total_size()` to decide whether the block is
/// effectively full (TODO confirm unit against `AppendableBlock`).
const DEPLOY_APPROX_MIN_SIZE: usize = 300;
/// Height of a block within the linear chain.
type BlockHeight = u64;
/// Deploy/transfer hashes of finalized blocks that arrived out of order, keyed by height.
type FinalizationQueue = HashMap<BlockHeight, Vec<DeployOrTransferHash>>;
/// Payload requests that arrived before their finalization announcement, keyed by height.
type RequestQueue = HashMap<BlockHeight, Vec<BlockPayloadRequest>>;
/// Lifecycle state of the block proposer.
#[derive(DataSize, Debug)]
#[allow(clippy::large_enum_variant)]
enum BlockProposerState {
    /// Waiting for the `Event::Loaded` that carries the data read from storage.
    Initializing {
        /// Events received before loading finished; replayed once ready.
        pending: Vec<Event>,
        /// Deploy configuration taken from the chainspec at construction.
        deploy_config: DeployConfig,
        /// Node-local proposer configuration.
        local_config: Config,
    },
    /// Fully initialized; all events are handled by the inner `BlockProposerReady`.
    Ready(BlockProposerReady),
}
impl BlockProposer {
    /// Creates a block proposer in the `Initializing` state.
    ///
    /// Returns the component together with the effects that load its initial
    /// data: two concurrent storage reads — the deploys finalized within
    /// `max_ttl`, and the previously persisted `CachedState` — joined into a
    /// single `Event::Loaded`. Fails only if metrics registration fails.
    pub(crate) fn new<REv>(
        registry: Registry,
        effect_builder: EffectBuilder<REv>,
        next_finalized_block: BlockHeight,
        chainspec: &Chainspec,
        local_config: Config,
    ) -> Result<(Self, Effects<Event>), prometheus::Error>
    where
        REv: From<Event> + From<StorageRequest> + From<StateStoreRequest> + Send + 'static,
    {
        debug!(%next_finalized_block, "creating block proposer");
        let max_ttl = chainspec.deploy_config.max_ttl;
        // Run both storage reads concurrently; a missing cache entry falls back
        // to the default `CachedState`.
        let effects = async move {
            join!(
                effect_builder.get_finalized_deploys(max_ttl),
                effect_builder.load_state::<CachedState>(STATE_KEY.into())
            )
        }
        .event(
            move |(finalized_deploys, maybe_cached_state)| Event::Loaded {
                finalized_deploys,
                next_finalized_block,
                cached_state: maybe_cached_state.unwrap_or_default(),
            },
        );
        let block_proposer = BlockProposer {
            state: BlockProposerState::Initializing {
                pending: Vec::new(),
                deploy_config: chainspec.deploy_config,
                local_config,
            },
            metrics: Metrics::new(registry)?,
        };
        Ok((block_proposer, effects))
    }
}
impl<REv> Component<REv> for BlockProposer
where
    REv: From<Event>
        + From<StorageRequest>
        + From<StateStoreRequest>
        + From<BlockProposerAnnouncement>
        + Send
        + 'static,
{
    type Event = Event;
    type ConstructionError = Infallible;
    /// Dispatches events according to the current lifecycle state:
    /// `Loaded` completes initialization, other events are buffered while
    /// initializing and delegated to `BlockProposerReady` once ready.
    fn handle_event(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        _rng: &mut NodeRng,
        event: Self::Event,
    ) -> Effects<Self::Event> {
        let mut effects = Effects::new();
        match (&mut self.state, event) {
            // Initialization data arrived: build the ready state, prune stale
            // deploys, replay buffered events and start the prune timer.
            (
                BlockProposerState::Initializing {
                    ref mut pending,
                    deploy_config,
                    local_config,
                },
                Event::Loaded {
                    finalized_deploys,
                    next_finalized_block,
                    cached_state,
                },
            ) => {
                let (sets, pruned_hashes) = BlockProposerDeploySets::new(
                    finalized_deploys,
                    next_finalized_block,
                    cached_state,
                );
                let mut new_ready_state = BlockProposerReady {
                    sets,
                    unhandled_finalized: Default::default(),
                    deploy_config: *deploy_config,
                    request_queue: Default::default(),
                    local_config: local_config.clone(),
                };
                let pruned_count = pruned_hashes.total_pruned;
                debug!(%pruned_count, "pruned deploys from buffer on loading");
                effects.extend(
                    effect_builder
                        .announce_expired_deploys(pruned_hashes.expired_hashes_to_be_announced)
                        .ignore(),
                );
                // Persist the freshly pruned buffer state right away.
                effects.extend(
                    effect_builder
                        .save_state(STATE_KEY.into(), CachedState::from(&new_ready_state.sets))
                        .ignore(),
                );
                // Replay events that arrived while we were initializing.
                for ev in pending.drain(..) {
                    effects.extend(new_ready_state.handle_event(effect_builder, ev));
                }
                self.state = BlockProposerState::Ready(new_ready_state);
                effects.extend(
                    effect_builder
                        .set_timeout(PRUNE_INTERVAL)
                        .event(|_| Event::Prune),
                );
            }
            // Still initializing: buffer everything else for later replay.
            (
                BlockProposerState::Initializing {
                    ref mut pending, ..
                },
                event,
            ) => {
                pending.push(event);
            }
            // Ready: delegate, then refresh the pending-deploys gauge.
            (BlockProposerState::Ready(ref mut ready_state), event) => {
                effects.extend(ready_state.handle_event(effect_builder, event));
                self.metrics.pending_deploys.set(
                    ready_state.sets.pending_deploys.len() as i64
                        + ready_state.sets.pending_transfers.len() as i64,
                );
            }
        };
        effects
    }
}
/// The fully-initialized state of the block proposer.
#[derive(DataSize, Debug)]
#[cfg_attr(test, derive(Default))]
struct BlockProposerReady {
    /// Pending/finalized deploy buffers plus the finalization queue.
    sets: BlockProposerDeploySets,
    /// Hashes announced as finalized before their deploy bodies were buffered.
    unhandled_finalized: HashSet<DeployHash>,
    /// Deploy configuration from the chainspec.
    deploy_config: DeployConfig,
    /// Payload requests queued until their finalization announcement arrives.
    request_queue: RequestQueue,
    /// Node-local proposer configuration (e.g. `deploy_delay`).
    local_config: Config,
}
impl BlockProposerReady {
    /// Dispatches a single event for the fully-initialized proposer, mutating
    /// the deploy buffers and returning any follow-up effects.
    fn handle_event<REv>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        event: Event,
    ) -> Effects<Event>
    where
        REv: Send + From<StateStoreRequest> + From<BlockProposerAnnouncement>,
    {
        match event {
            Event::Request(BlockProposerRequest::RequestBlockPayload(request)) => {
                if request.next_finalized > self.sets.next_finalized {
                    // We haven't processed the finalization announcement for the
                    // preceding block yet; park the request until
                    // `handle_finalized_block` catches up to this height.
                    warn!(
                        %request.next_finalized, %self.sets.next_finalized,
                        "received request before finalization announcement"
                    );
                    self.request_queue
                        .entry(request.next_finalized)
                        .or_default()
                        .push(request);
                    Effects::new()
                } else {
                    info!(%request.next_finalized, "proposing a block payload");
                    request
                        .responder
                        .respond(self.propose_block_payload(
                            self.deploy_config,
                            request.context,
                            request.accusations,
                            request.random_bit,
                        ))
                        .ignore()
                }
            }
            Event::BufferDeploy {
                hash,
                approvals,
                deploy_info,
            } => {
                self.add_deploy(Timestamp::now(), hash, approvals, *deploy_info);
                Effects::new()
            }
            Event::Prune => {
                // Reschedule the next prune tick first, then drop expired
                // deploys, announce them, and persist the updated buffer state.
                let mut effects = effect_builder
                    .set_timeout(PRUNE_INTERVAL)
                    .event(|_| Event::Prune);
                let pruned_hashes = self.prune(Timestamp::now());
                let pruned_count = pruned_hashes.total_pruned;
                debug!(%pruned_count, "pruned deploys from buffer");
                effects.extend(
                    effect_builder
                        .announce_expired_deploys(pruned_hashes.expired_hashes_to_be_announced)
                        .ignore(),
                );
                effects.extend(
                    effect_builder
                        .save_state(STATE_KEY.into(), CachedState::from(&self.sets))
                        .ignore(),
                );
                effects
            }
            Event::Loaded { .. } => {
                // `Loaded` is only valid during initialization; getting it here
                // indicates a bug elsewhere.
                error!("got loaded event for block proposer state during ready state");
                Effects::new()
            }
            Event::FinalizedBlock(block) => {
                // Collect the block's deploy and transfer hashes, tagged by kind.
                let deploys = block
                    .deploy_hashes()
                    .iter()
                    .copied()
                    .map(DeployOrTransferHash::Deploy)
                    .chain(
                        block
                            .transfer_hashes()
                            .iter()
                            .copied()
                            .map(DeployOrTransferHash::Transfer),
                    )
                    .collect();
                let mut height = block.height();
                if height > self.sets.next_finalized {
                    // Out-of-order finalization: key the queue entry by
                    // `height - 1` so the drain loop below finds it immediately
                    // after its predecessor has been handled.
                    warn!(
                        %height, next_finalized = %self.sets.next_finalized,
                        "received finalized blocks out of order; queueing"
                    );
                    self.sets.finalization_queue.insert(height - 1, deploys);
                    Effects::new()
                } else {
                    debug!(%height, "handling finalized block");
                    let mut effects = self.handle_finalized_block(effect_builder, height, deploys);
                    // Drain successors that were queued while this block was missing.
                    while let Some(deploys) = self.sets.finalization_queue.remove(&height) {
                        info!(%height, "removed finalization queue entry");
                        height += 1;
                        effects.extend(self.handle_finalized_block(
                            effect_builder,
                            height,
                            deploys,
                        ));
                    }
                    effects
                }
            }
        }
    }

    /// Buffers a deploy or transfer, unless it is expired or already finalized.
    fn add_deploy(
        &mut self,
        current_instant: Timestamp,
        hash: DeployOrTransferHash,
        approvals: BTreeSet<Approval>,
        deploy_info: DeployInfo,
    ) {
        if deploy_info.header.expired(current_instant) {
            trace!(%hash, "expired deploy rejected from the buffer");
            return;
        }
        // The deploy was finalized before we ever saw its body; now that we
        // finally have its header, record it as finalized rather than pending.
        if self.unhandled_finalized.remove(hash.deploy_hash()) {
            info!(%hash, "deploy was previously marked as finalized, storing header");
            self.sets
                .finalized_deploys
                .insert(hash.into(), deploy_info.header);
            return;
        }
        if self.sets.finalized_deploys.contains_key(hash.deploy_hash()) {
            info!(%hash, "deploy rejected from the buffer");
            return;
        }
        // Transfers and ordinary deploys are buffered separately, since blocks
        // enforce separate limits for each.
        if hash.is_transfer() {
            self.sets.pending_transfers.insert(
                *hash.deploy_hash(),
                PendingDeployInfo {
                    approvals,
                    info: deploy_info,
                    timestamp: current_instant,
                },
            );
        } else {
            self.sets.pending_deploys.insert(
                *hash.deploy_hash(),
                PendingDeployInfo {
                    approvals,
                    info: deploy_info,
                    timestamp: current_instant,
                },
            );
        }
        info!(%hash, "added deploy to the buffer");
    }

    /// Moves the given hashes from the pending buffers into `finalized_deploys`.
    /// Hashes with no pending entry yet are parked in `unhandled_finalized`
    /// until their bodies arrive via `add_deploy`.
    fn finalized_deploys<I>(&mut self, deploys: I)
    where
        I: IntoIterator<Item = DeployOrTransferHash>,
    {
        for deploy_hash in deploys.into_iter() {
            let (hash, remove_result) = match deploy_hash {
                DeployOrTransferHash::Deploy(hash) => {
                    (hash, self.sets.pending_deploys.remove(&hash))
                }
                DeployOrTransferHash::Transfer(hash) => {
                    (hash, self.sets.pending_transfers.remove(&hash))
                }
            };
            match remove_result {
                Some(deploy_data) => {
                    self.sets
                        .finalized_deploys
                        .insert(hash, deploy_data.info.header);
                }
                None => {
                    self.unhandled_finalized.insert(hash);
                }
            }
        }
    }

    /// Records the deploys of the finalized block at `height`, bumps
    /// `next_finalized`, and answers any payload requests that were queued
    /// waiting for this finalization.
    fn handle_finalized_block<I, REv>(
        &mut self,
        _effect_builder: EffectBuilder<REv>,
        height: BlockHeight,
        deploys: I,
    ) -> Effects<Event>
    where
        I: IntoIterator<Item = DeployOrTransferHash>,
    {
        self.finalized_deploys(deploys);
        self.sets.next_finalized = self.sets.next_finalized.max(height + 1);
        if let Some(requests) = self.request_queue.remove(&self.sets.next_finalized) {
            info!(height = %(height + 1), "handling queued requests");
            requests
                .into_iter()
                .flat_map(|request| {
                    request
                        .responder
                        .respond(self.propose_block_payload(
                            self.deploy_config,
                            request.context,
                            request.accusations,
                            request.random_bit,
                        ))
                        .ignore()
                })
                .collect()
        } else {
            Effects::new()
        }
    }

    /// Returns `true` if every dependency of the deploy was either proposed in
    /// an ancestor block (`past_deploys`) or is already finalized.
    fn deps_resolved(&self, header: &DeployHeader, past_deploys: &HashSet<DeployHash>) -> bool {
        header
            .dependencies()
            .iter()
            .all(|dep| past_deploys.contains(dep) || self.contains_finalized(dep))
    }

    /// Assembles a block payload from the buffered transfers and deploys.
    ///
    /// Skips anything already proposed in an ancestor block, already finalized,
    /// with unresolved dependencies, or buffered more recently than the
    /// configured `deploy_delay`.
    fn propose_block_payload(
        &mut self,
        deploy_config: DeployConfig,
        context: BlockContext<ClContext>,
        accusations: Vec<PublicKey>,
        random_bit: bool,
    ) -> Arc<BlockPayload> {
        // Hashes proposed in ancestor blocks; the walk stops at the first hash
        // we already know to be finalized.
        let past_deploys = context
            .ancestor_values()
            .iter()
            .flat_map(|block_payload| block_payload.deploys_and_transfers_iter())
            .map(DeployOrTransferHash::into)
            .take_while(|hash| !self.contains_finalized(hash))
            .collect();
        let block_timestamp = context.timestamp();
        let mut appendable_block = AppendableBlock::new(deploy_config, block_timestamp);
        // Transfers first: they are subject to their own per-block count limit.
        for (hash, deploy_data) in &self.sets.pending_transfers {
            if !self.deps_resolved(&deploy_data.info.header, &past_deploys)
                || past_deploys.contains(hash)
                || self.contains_finalized(hash)
                || block_timestamp.saturating_diff(deploy_data.timestamp)
                    < self.local_config.deploy_delay
            {
                continue;
            }
            if let Err(err) = appendable_block.add_transfer(
                DeployWithApprovals::new(*hash, deploy_data.approvals.clone()),
                &deploy_data.info,
            ) {
                match err {
                    // Hard limits reached: no further transfer can fit.
                    AddError::TransferCount | AddError::GasLimit | AddError::BlockSize => break,
                    // Only this transfer's approvals overflow; one with fewer
                    // approvals might still fit, so keep going.
                    AddError::ApprovalCount if deploy_data.approvals.len() > 1 => (),
                    AddError::ApprovalCount => break,
                    AddError::InvalidDeploy => (),
                    AddError::InvalidGasAmount | AddError::DeployCount | AddError::Duplicate => {
                        error!(?err, "unexpected error when adding transfer")
                    }
                }
            }
        }
        for (hash, deploy_data) in &self.sets.pending_deploys {
            if !self.deps_resolved(&deploy_data.info.header, &past_deploys)
                || past_deploys.contains(hash)
                || self.contains_finalized(hash)
                || block_timestamp.saturating_diff(deploy_data.timestamp)
                    < self.local_config.deploy_delay
            {
                continue;
            }
            if let Err(err) = appendable_block.add_deploy(
                DeployWithApprovals::new(*hash, deploy_data.approvals.clone()),
                &deploy_data.info,
            ) {
                match err {
                    AddError::DeployCount => break,
                    AddError::BlockSize => {
                        // Fixed: compare the block's byte size against the
                        // configured maximum block size, not the gas limit
                        // (`total_size()` and `DEPLOY_APPROX_MIN_SIZE` are
                        // sizes, not gas). Only stop once not even a minimal
                        // deploy could still fit; otherwise a smaller deploy
                        // later in the buffer might.
                        if appendable_block.total_size() + DEPLOY_APPROX_MIN_SIZE
                            > deploy_config.max_block_size as usize
                        {
                            break;
                        }
                    }
                    AddError::ApprovalCount if deploy_data.approvals.len() > 1 => (),
                    AddError::ApprovalCount => break,
                    AddError::InvalidDeploy | AddError::GasLimit => (),
                    AddError::TransferCount | AddError::Duplicate => {
                        error!(?err, "unexpected error when adding deploy")
                    }
                    AddError::InvalidGasAmount => {
                        error!("payment_amount couldn't be converted from motes to gas")
                    }
                }
            }
        }
        Arc::new(appendable_block.into_block_payload(accusations, random_bit))
    }

    /// Removes expired deploys from the buffers, returning what was pruned.
    fn prune(&mut self, current_instant: Timestamp) -> PruneResult {
        self.sets.prune(current_instant)
    }

    /// Returns `true` if the deploy is finalized, or was announced as finalized
    /// before its body arrived.
    fn contains_finalized(&self, dep: &DeployHash) -> bool {
        self.sets.finalized_deploys.contains_key(dep) || self.unhandled_finalized.contains(dep)
    }
}