pub mod announcements;
pub mod requests;
use std::{
any::type_name,
collections::{HashMap, HashSet},
fmt::{self, Debug, Display, Formatter},
future::Future,
net::SocketAddr,
time::{Duration, Instant},
};
use datasize::DataSize;
use futures::{channel::oneshot, future::BoxFuture, FutureExt};
use semver::Version;
use smallvec::{smallvec, SmallVec};
use tokio::join;
use tracing::error;
use casper_execution_engine::{
core::engine_state::{
self,
era_validators::GetEraValidatorsError,
execute_request::ExecuteRequest,
execution_result::ExecutionResults,
genesis::GenesisResult,
step::{StepRequest, StepResult},
BalanceRequest, BalanceResult, QueryRequest, QueryResult,
},
shared::{additive_map::AdditiveMap, transform::Transform},
storage::{global_state::CommitResult, protocol_data::ProtocolData},
};
use casper_types::{auction::ValidatorWeights, Key, ProtocolVersion};
use crate::{
components::{
chainspec_loader::ChainspecInfo,
consensus::BlockContext,
contract_runtime::{EraValidatorsRequest, ValidatorWeightsByEraIdRequest},
fetcher::FetchResult,
small_network::GossipedAddress,
storage::{DeployHashes, DeployMetadata, DeployResults, StorageType, Value},
},
crypto::{asymmetric_key::Signature, hash::Digest},
effect::requests::LinearChainRequest,
reactor::{EventQueueHandle, QueueKind},
types::{
json_compatibility::ExecutionResult, Block, BlockByHeight, BlockHash, BlockHeader,
BlockLike, Deploy, DeployHash, DeployHeader, FinalizedBlock, Item, ProtoBlock, Timestamp,
},
utils::Source,
Chainspec,
};
use announcements::{
ApiServerAnnouncement, BlockExecutorAnnouncement, ConsensusAnnouncement,
DeployAcceptorAnnouncement, GossiperAnnouncement, LinearChainAnnouncement, NetworkAnnouncement,
};
use casper_types::auction::EraValidators;
use requests::{
BlockExecutorRequest, BlockProposerRequest, BlockValidationRequest, ChainspecLoaderRequest,
ConsensusRequest, ContractRuntimeRequest, FetcherRequest, MetricsRequest, NetworkInfoRequest,
NetworkRequest, StorageRequest,
};
/// A boxed, `'static` future that resolves to a [`Multiple`] of events of type `Ev`.
pub type Effect<Ev> = BoxFuture<'static, Multiple<Ev>>;
/// A collection of [`Effect`]s, as returned by the effect extension traits in this module.
pub type Effects<Ev> = Multiple<Effect<Ev>>;
/// A `SmallVec` with inline storage for two items.
type Multiple<T> = SmallVec<[T; 2]>;
/// A one-shot channel endpoint used to answer a request.
///
/// Wraps an optional `oneshot::Sender`. The option is emptied when a response is sent, which is
/// how the `Drop` implementation recognises a responder that was never answered.
#[must_use]
#[derive(DataSize)]
pub struct Responder<T>(Option<oneshot::Sender<T>>);
impl<T: 'static + Send> Responder<T> {
fn new(sender: oneshot::Sender<T>) -> Self {
Responder(Some(sender))
}
}
impl<T> Responder<T> {
    /// Sends `data` through the wrapped oneshot channel, consuming the responder.
    ///
    /// Failures are only logged: an error is emitted if the send fails, or if the sender had
    /// already been taken out of the responder.
    pub async fn respond(mut self, data: T) {
        match self.0.take() {
            Some(sender) => {
                if sender.send(data).is_err() {
                    error!("could not send response to request down oneshot channel");
                }
            }
            None => {
                error!("tried to send a value down a responder channel, but it was already used");
            }
        }
    }
}
impl<T> Debug for Responder<T> {
    /// Formats the responder as `Responder<T>`, where `T` is the name of the response type.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        let response_type = type_name::<T>();
        write!(formatter, "Responder<{}>", response_type)
    }
}
impl<T> Display for Responder<T> {
    /// Formats the responder as `responder(T)`, where `T` is the name of the response type.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        let response_type = type_name::<T>();
        write!(formatter, "responder({})", response_type)
    }
}
impl<T> Drop for Responder<T> {
    /// Logs an error if the responder is dropped while it still holds its sender.
    fn drop(&mut self) {
        if self.0.is_none() {
            // The sender has already been taken out, so there is nothing to report.
            return;
        }
        error!(
            "{} dropped without being responded to --- \
             this is always a bug and will likely cause another component to be stuck!",
            self
        );
    }
}
/// Extension trait for sendable futures that turns them into [`Effects`].
pub trait EffectExt: Future + Send {
/// Converts the future into effects, mapping its output to a single event with `f`.
fn event<U, F>(self, f: F) -> Effects<U>
where
F: FnOnce(Self::Output) -> U + 'static + Send,
U: 'static,
Self: Sized;
/// Converts the future into effects that discard its output and produce no events.
fn ignore<Ev>(self) -> Effects<Ev>;
}
/// Extension trait for converting a `Result`-producing value into [`Effects`].
pub trait EffectResultExt {
/// The type passed to `f_ok`.
type Value;
/// The type passed to `f_err`.
type Error;
/// Converts `self` into effects, producing an event through `f_ok` or `f_err`.
fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
where
F: FnOnce(Self::Value) -> U + 'static + Send,
G: FnOnce(Self::Error) -> U + 'static + Send,
U: 'static;
}
/// Extension trait for converting an `Option`-producing value into [`Effects`].
pub trait EffectOptionExt {
/// The type passed to `f_some`.
type Value;
/// Converts `self` into effects, producing an event through `f_some` or `f_none`.
fn option<U, F, G>(self, f_some: F, f_none: G) -> Effects<U>
where
F: FnOnce(Self::Value) -> U + 'static + Send,
G: FnOnce() -> U + 'static + Send,
U: 'static;
}
impl<T: ?Sized> EffectExt for T
where
T: Future + Send + 'static + Sized,
{
fn event<U, F>(self, f: F) -> Effects<U>
where
F: FnOnce(Self::Output) -> U + 'static + Send,
U: 'static,
{
smallvec![self.map(f).map(|item| smallvec![item]).boxed()]
}
fn ignore<Ev>(self) -> Effects<Ev> {
smallvec![self.map(|_| Multiple::new()).boxed()]
}
}
impl<T, V, E> EffectResultExt for T
where
T: Future<Output = Result<V, E>> + Send + 'static + Sized,
T: ?Sized,
{
type Value = V;
type Error = E;
fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
where
F: FnOnce(V) -> U + 'static + Send,
G: FnOnce(E) -> U + 'static + Send,
U: 'static,
{
smallvec![self
.map(|result| result.map_or_else(f_err, f_ok))
.map(|item| smallvec![item])
.boxed()]
}
}
impl<T, V> EffectOptionExt for T
where
T: Future<Output = Option<V>> + Send + 'static + Sized,
T: ?Sized,
{
type Value = V;
fn option<U, F, G>(self, f_some: F, f_none: G) -> Effects<U>
where
F: FnOnce(V) -> U + 'static + Send,
G: FnOnce() -> U + 'static + Send,
U: 'static,
{
smallvec![self
.map(|option| option.map_or_else(f_none, f_some))
.map(|item| smallvec![item])
.boxed()]
}
}
/// A builder for effects, wrapping the event queue handle of a reactor with event type `REv`.
///
/// Its methods schedule events on the wrapped [`EventQueueHandle`].
#[derive(Debug)]
pub struct EffectBuilder<REv: 'static>(EventQueueHandle<REv>);
impl<REv> Clone for EffectBuilder<REv> {
fn clone(&self) -> Self {
EffectBuilder(self.0)
}
}
// Implemented by hand rather than derived so that no `REv: Copy` bound is required.
impl<REv> Copy for EffectBuilder<REv> {}
impl<REv> EffectBuilder<REv> {
pub fn new(event_queue_handle: EventQueueHandle<REv>) -> Self {
EffectBuilder(event_queue_handle)
}
/// Builds a request event, schedules it and waits for the response.
///
/// A oneshot channel is created and its sending half is wrapped in a [`Responder`], which is
/// handed to `f` to build the request. The request is converted into a reactor event and
/// scheduled on the queue selected by `queue_kind`; the returned future resolves to the value
/// received on the channel.
///
/// # Panics
///
/// Logs an error and panics if awaiting the receiving half of the channel fails.
pub(crate) async fn make_request<T, Q, F>(self, f: F, queue_kind: QueueKind) -> T
where
    T: Send + 'static,
    Q: Into<REv>,
    F: FnOnce(Responder<T>) -> Q,
{
    let (sender, receiver) = oneshot::channel();
    // Build the request around the responder and inject it into the event queue.
    let request_event = f(Responder::new(sender)).into();
    self.0.schedule(request_event, queue_kind).await;
    match receiver.await {
        Ok(response) => response,
        Err(err) => {
            error!(%err, ?queue_kind, "request for {} channel closed, this is a serious bug --- \
                   a component will likely be stuck from now on ", type_name::<T>());
            panic!("request not answerable");
        }
    }
}
/// An async function with an empty body; it performs no work.
#[inline(always)]
pub async fn immediately(self) {}
/// Reports a fatal error by panicking.
///
/// # Panics
///
/// Always panics, with a message containing `file`, `line` and `msg`.
pub async fn fatal<M: Display + ?Sized>(self, file: &str, line: u32, msg: &M) {
panic!("fatal error [{}:{}]: {}", file, line, msg);
}
/// Sleeps for `timeout` using `tokio::time::delay_for` and returns the time that elapsed
/// between the start of the call and the end of the sleep.
pub(crate) async fn set_timeout(self, timeout: Duration) -> Duration {
    let started = Instant::now();
    tokio::time::delay_for(timeout).await;
    // `elapsed` is the idiomatic form of `Instant::now() - started`.
    started.elapsed()
}
/// Sends a `MetricsRequest::RenderNodeMetricsText` request on the `Api` queue and resolves to
/// the response.
pub(crate) async fn get_metrics(self) -> Option<String>
where
    REv: From<MetricsRequest>,
{
    let build_request = |responder| MetricsRequest::RenderNodeMetricsText { responder };
    self.make_request(build_request, QueueKind::Api).await
}
/// Sends a `LinearChainRequest::BlockAtHeightLocal` request for `height` on the `Regular` queue
/// and resolves to the response.
pub(crate) async fn get_block_at_height_local<I>(self, height: u64) -> Option<Block>
where
    REv: From<LinearChainRequest<I>>,
{
    self.make_request(
        |reply| LinearChainRequest::BlockAtHeightLocal(height, reply),
        QueueKind::Regular,
    )
    .await
}
/// Sends a `NetworkRequest::SendMessage` request carrying `payload` for `dest` on the `Network`
/// queue and waits until the request has been responded to.
pub(crate) async fn send_message<I, P>(self, dest: I, payload: P)
where
    REv: From<NetworkRequest<I, P>>,
{
    let build_request = |responder| NetworkRequest::SendMessage {
        dest,
        payload,
        responder,
    };
    self.make_request(build_request, QueueKind::Network).await
}
/// Sends a `NetworkRequest::Broadcast` request carrying `payload` on the `Network` queue and
/// waits until the request has been responded to.
pub async fn broadcast_message<I, P>(self, payload: P)
where
    REv: From<NetworkRequest<I, P>>,
{
    self.make_request(
        |reply| NetworkRequest::Broadcast {
            payload,
            responder: reply,
        },
        QueueKind::Network,
    )
    .await
}
/// Sends a `NetworkRequest::Gossip` request on the `Network` queue, forwarding `payload`,
/// `count` and `exclude`, and resolves to the set of `I` values sent back in the response.
pub async fn gossip_message<I, P>(
    self,
    payload: P,
    count: usize,
    exclude: HashSet<I>,
) -> HashSet<I>
where
    REv: From<NetworkRequest<I, P>>,
    I: Send + 'static,
    P: Send,
{
    let build_request = |responder| NetworkRequest::Gossip {
        payload,
        count,
        exclude,
        responder,
    };
    self.make_request(build_request, QueueKind::Network).await
}
/// Sends a `NetworkInfoRequest::GetPeers` request on the `Api` queue and resolves to the
/// returned map from `I` to socket address.
pub async fn network_peers<I>(self) -> HashMap<I, SocketAddr>
where
    REv: From<NetworkInfoRequest<I>>,
    I: Send + 'static,
{
    self.make_request(
        |reply| NetworkInfoRequest::GetPeers { responder: reply },
        QueueKind::Api,
    )
    .await
}
/// Schedules a `NetworkAnnouncement::MessageReceived` announcement for `sender` and `payload`
/// on the `NetworkIncoming` queue.
pub(crate) async fn announce_message_received<I, P>(self, sender: I, payload: P)
where
    REv: From<NetworkAnnouncement<I, P>>,
{
    let announcement = NetworkAnnouncement::MessageReceived { sender, payload };
    self.0
        .schedule(announcement, QueueKind::NetworkIncoming)
        .await;
}
/// Schedules a `NetworkAnnouncement::GossipOurAddress` announcement carrying `our_address` on
/// the `Regular` queue.
pub(crate) async fn announce_gossip_our_address<I, P>(self, our_address: GossipedAddress)
where
    REv: From<NetworkAnnouncement<I, P>>,
{
    let announcement: NetworkAnnouncement<I, P> =
        NetworkAnnouncement::GossipOurAddress(our_address);
    self.0.schedule(announcement, QueueKind::Regular).await;
}
/// Schedules a `NetworkAnnouncement::NewPeer` announcement for `peer_id` on the
/// `NetworkIncoming` queue.
pub(crate) async fn announce_new_peer<I, P>(self, peer_id: I)
where
    REv: From<NetworkAnnouncement<I, P>>,
{
    let announcement: NetworkAnnouncement<I, P> = NetworkAnnouncement::NewPeer(peer_id);
    self.0
        .schedule(announcement, QueueKind::NetworkIncoming)
        .await;
}
/// Schedules a `GossiperAnnouncement::NewCompleteItem` announcement for `item` on the `Regular`
/// queue.
///
/// # Panics
///
/// Panics if `T::ID_IS_COMPLETE_ITEM` is `false`.
pub(crate) async fn announce_complete_item_received_via_gossip<T: Item>(self, item: T::Id)
where
    REv: From<GossiperAnnouncement<T>>,
{
    assert!(
        T::ID_IS_COMPLETE_ITEM,
        "{} must be an item where the ID _is_ the complete item",
        item
    );
    let announcement: GossiperAnnouncement<T> = GossiperAnnouncement::NewCompleteItem(item);
    self.0.schedule(announcement, QueueKind::Regular).await;
}
/// Schedules an `ApiServerAnnouncement::DeployReceived` announcement carrying `deploy` on the
/// `Api` queue.
pub(crate) async fn announce_deploy_received(self, deploy: Box<Deploy>)
where
    REv: From<ApiServerAnnouncement>,
{
    let announcement = ApiServerAnnouncement::DeployReceived { deploy };
    self.0.schedule(announcement, QueueKind::Api).await;
}
/// Returns a future that schedules a `DeployAcceptorAnnouncement::AcceptedNewDeploy`
/// announcement for `deploy` and `source` on the `Regular` queue.
pub(crate) fn announce_new_deploy_accepted<I>(
    self,
    deploy: Box<Deploy>,
    source: Source<I>,
) -> impl Future<Output = ()>
where
    REv: From<DeployAcceptorAnnouncement<I>>,
{
    let announcement = DeployAcceptorAnnouncement::AcceptedNewDeploy { deploy, source };
    self.0.schedule(announcement, QueueKind::Regular)
}
/// Returns a future that schedules a `DeployAcceptorAnnouncement::InvalidDeploy` announcement
/// for `deploy` and `source` on the `Regular` queue.
pub(crate) fn announce_invalid_deploy<I>(
    self,
    deploy: Box<Deploy>,
    source: Source<I>,
) -> impl Future<Output = ()>
where
    REv: From<DeployAcceptorAnnouncement<I>>,
{
    let announcement = DeployAcceptorAnnouncement::InvalidDeploy { deploy, source };
    self.0.schedule(announcement, QueueKind::Regular)
}
/// Schedules a `BlockExecutorAnnouncement::LinearChainBlock` announcement carrying `block` and
/// its `execution_results` on the `Regular` queue.
pub(crate) async fn announce_linear_chain_block(
    self,
    block: Block,
    execution_results: HashMap<DeployHash, (DeployHeader, ExecutionResult)>,
) where
    REv: From<BlockExecutorAnnouncement>,
{
    let announcement = BlockExecutorAnnouncement::LinearChainBlock {
        block,
        execution_results,
    };
    self.0.schedule(announcement, QueueKind::Regular).await
}
/// Sends a `StorageRequest::PutBlock` request for `block` on the `Regular` queue and resolves to
/// the `bool` sent back in the response.
// NOTE(review): the meaning of the returned `bool` is defined by the storage component —
// confirm there.
pub(crate) async fn put_block_to_storage<S>(self, block: Box<S::Block>) -> bool
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::PutBlock {
            block,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `StorageRequest::GetBlock` request for `block_hash` on the `Regular` queue and
/// resolves to the optional block sent back in the response.
pub(crate) async fn get_block_from_storage<S>(
    self,
    block_hash: <S::Block as Value>::Id,
) -> Option<S::Block>
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::GetBlock {
            block_hash,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `StorageRequest::GetBlockAtHeight` request for `height` on the `Regular` queue and
/// resolves to the optional block sent back in the response.
pub(crate) async fn get_block_at_height<S>(self, height: u64) -> Option<S::Block>
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::GetBlockAtHeight {
            height,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `StorageRequest::GetHighestBlock` request on the `Regular` queue and resolves to the
/// optional block sent back in the response.
pub(crate) async fn get_highest_block<S>(self) -> Option<S::Block>
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::GetHighestBlock { responder: reply },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `StorageRequest::PutDeploy` request for `deploy` on the `Regular` queue and resolves
/// to the `bool` sent back in the response.
// NOTE(review): the meaning of the returned `bool` is defined by the storage component —
// confirm there.
pub(crate) async fn put_deploy_to_storage<S>(self, deploy: Box<S::Deploy>) -> bool
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::PutDeploy {
            deploy,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `StorageRequest::GetDeploys` request for `deploy_hashes` on the `Regular` queue and
/// resolves to the `DeployResults` sent back in the response.
pub(crate) async fn get_deploys_from_storage<S>(
    self,
    deploy_hashes: DeployHashes<S>,
) -> DeployResults<S>
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::GetDeploys {
            deploy_hashes,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `StorageRequest::PutExecutionResults` request for `block_hash` and
/// `execution_results` on the `Regular` queue and waits until it has been responded to.
pub(crate) async fn put_execution_results_to_storage<S>(
    self,
    block_hash: <S::Block as Value>::Id,
    execution_results: HashMap<<S::Deploy as Value>::Id, ExecutionResult>,
) where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::PutExecutionResults {
            block_hash,
            execution_results,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `StorageRequest::GetDeployAndMetadata` request for `deploy_hash` on the `Regular`
/// queue and resolves to the optional deploy/metadata pair sent back in the response.
pub(crate) async fn get_deploy_and_metadata_from_storage<S>(
    self,
    deploy_hash: <S::Deploy as Value>::Id,
) -> Option<(S::Deploy, DeployMetadata<S::Block>)>
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::GetDeployAndMetadata {
            deploy_hash,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `FetcherRequest::Fetch` request for the deploy identified by `deploy_hash`, naming
/// `peer`, on the `Regular` queue and resolves to the response.
pub(crate) async fn fetch_deploy<I>(
    self,
    deploy_hash: DeployHash,
    peer: I,
) -> Option<FetchResult<Deploy>>
where
    REv: From<FetcherRequest<I, Deploy>>,
    I: Send + 'static,
{
    self.make_request(
        |reply| FetcherRequest::Fetch {
            id: deploy_hash,
            peer,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `FetcherRequest::Fetch` request for the block identified by `block_hash`, naming
/// `peer`, on the `Regular` queue and resolves to the response.
pub(crate) async fn fetch_block<I>(
    self,
    block_hash: BlockHash,
    peer: I,
) -> Option<FetchResult<Block>>
where
    REv: From<FetcherRequest<I, Block>>,
    I: Send + 'static,
{
    self.make_request(
        |reply| FetcherRequest::Fetch {
            id: block_hash,
            peer,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `FetcherRequest::Fetch` request for the block at `block_height`, naming `peer`, on
/// the `Regular` queue and resolves to the response.
pub(crate) async fn fetch_block_by_height<I>(
    self,
    block_height: u64,
    peer: I,
) -> Option<FetchResult<BlockByHeight>>
where
    REv: From<FetcherRequest<I, BlockByHeight>>,
    I: Send + 'static,
{
    self.make_request(
        |reply| FetcherRequest::Fetch {
            id: block_height,
            peer,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Requests deploys for inclusion and builds a proto block from them.
///
/// Sends a `BlockProposerRequest::ListForInclusion` request on the `Regular` queue, using the
/// timestamp of `block_context` as `current_instant` and a default value for `past_blocks`. The
/// response is collected and passed to `ProtoBlock::new` together with `random_bit`. The proto
/// block is returned along with the unchanged `block_context`.
pub(crate) async fn request_proto_block(
    self,
    block_context: BlockContext,
    random_bit: bool,
) -> (ProtoBlock, BlockContext)
where
    REv: From<BlockProposerRequest>,
{
    let listed = self
        .make_request(
            |reply| BlockProposerRequest::ListForInclusion {
                current_instant: block_context.timestamp(),
                past_blocks: Default::default(),
                responder: reply,
            },
            QueueKind::Regular,
        )
        .await;
    let deploys = listed.into_iter().collect();
    (ProtoBlock::new(deploys, random_bit), block_context)
}
/// Schedules a `BlockExecutorRequest::ExecuteBlock` event carrying `finalized_block` on the
/// `Regular` queue; no response is awaited.
pub(crate) async fn execute_block(self, finalized_block: FinalizedBlock)
where
    REv: From<BlockExecutorRequest>,
{
    let request = BlockExecutorRequest::ExecuteBlock(finalized_block);
    self.0.schedule(request, QueueKind::Regular).await
}
/// Sends a `BlockValidationRequest` for `block`, `sender` and `block_timestamp` on the `Regular`
/// queue and resolves to the `(bool, T)` pair sent back in the response.
// NOTE(review): presumably the `bool` is the validation outcome and `T` the block handed
// back — confirm against the block validator component.
pub(crate) async fn validate_block<I, T>(
    self,
    sender: I,
    block: T,
    block_timestamp: Timestamp,
) -> (bool, T)
where
    REv: From<BlockValidationRequest<T, I>>,
    T: BlockLike + Send + 'static,
{
    self.make_request(
        |reply| BlockValidationRequest {
            block,
            sender,
            responder: reply,
            block_timestamp,
        },
        QueueKind::Regular,
    )
    .await
}
/// Schedules a `ConsensusAnnouncement::Proposed` announcement carrying `proto_block` on the
/// `Regular` queue.
pub(crate) async fn announce_proposed_proto_block(self, proto_block: ProtoBlock)
where
    REv: From<ConsensusAnnouncement>,
{
    let announcement = ConsensusAnnouncement::Proposed(proto_block);
    self.0.schedule(announcement, QueueKind::Regular).await
}
/// Schedules a `ConsensusAnnouncement::Finalized` announcement carrying the boxed
/// `finalized_block` on the `Regular` queue.
pub(crate) async fn announce_finalized_block(self, finalized_block: FinalizedBlock)
where
    REv: From<ConsensusAnnouncement>,
{
    let announcement = ConsensusAnnouncement::Finalized(Box::new(finalized_block));
    self.0.schedule(announcement, QueueKind::Regular).await
}
/// Schedules a `ConsensusAnnouncement::Handled` announcement carrying the boxed `block_header`
/// on the `Regular` queue.
pub(crate) async fn announce_block_handled(self, block_header: BlockHeader)
where
    REv: From<ConsensusAnnouncement>,
{
    let announcement = ConsensusAnnouncement::Handled(Box::new(block_header));
    self.0.schedule(announcement, QueueKind::Regular).await
}
/// Schedules a `LinearChainAnnouncement::BlockAdded` announcement carrying `block_hash` and the
/// boxed `block_header` on the `Regular` queue.
pub(crate) async fn announce_block_added(self, block_hash: BlockHash, block_header: BlockHeader)
where
    REv: From<LinearChainAnnouncement>,
{
    let announcement = LinearChainAnnouncement::BlockAdded {
        block_hash,
        block_header: Box::new(block_header),
    };
    self.0.schedule(announcement, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::CommitGenesis` request carrying the boxed `chainspec` on the
/// `Regular` queue and resolves to the result sent back in the response.
pub(crate) async fn commit_genesis(
    self,
    chainspec: Chainspec,
) -> Result<GenesisResult, engine_state::Error>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request = |responder| ContractRuntimeRequest::CommitGenesis {
        chainspec: Box::new(chainspec),
        responder,
    };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `StorageRequest::PutChainspec` request carrying the boxed `chainspec` on the
/// `Regular` queue and waits until it has been responded to.
pub(crate) async fn put_chainspec<S>(self, chainspec: Chainspec)
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::PutChainspec {
            chainspec: Box::new(chainspec),
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `StorageRequest::GetChainspec` request for `version` on the `Regular` queue and
/// resolves to the optional chainspec sent back in the response.
pub(crate) async fn get_chainspec<S>(self, version: Version) -> Option<Chainspec>
where
    S: StorageType + 'static,
    REv: From<StorageRequest<S>>,
{
    self.make_request(
        |reply| StorageRequest::GetChainspec {
            version,
            responder: reply,
        },
        QueueKind::Regular,
    )
    .await
}
/// Sends a `ChainspecLoaderRequest::GetChainspecInfo` request on the `Regular` queue and
/// resolves to the `ChainspecInfo` sent back in the response.
pub(crate) async fn get_chainspec_info(self) -> ChainspecInfo
where
    REv: From<ChainspecLoaderRequest> + Send,
{
    let build_request = |responder| ChainspecLoaderRequest::GetChainspecInfo(responder);
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::Execute` request carrying `execute_request` on the `Regular`
/// queue and resolves to the result sent back in the response.
pub(crate) async fn request_execute(
    self,
    execute_request: ExecuteRequest,
) -> Result<ExecutionResults, engine_state::RootNotFound>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request = |responder| ContractRuntimeRequest::Execute {
        execute_request,
        responder,
    };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::Commit` request carrying `state_root_hash` and `effects` on
/// the `Regular` queue and resolves to the result sent back in the response.
pub(crate) async fn request_commit(
    self,
    state_root_hash: Digest,
    effects: AdditiveMap<Key, Transform>,
) -> Result<CommitResult, engine_state::Error>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request = |responder| ContractRuntimeRequest::Commit {
        state_root_hash,
        effects,
        responder,
    };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::Query` request carrying `query_request` on the `Regular`
/// queue and resolves to the result sent back in the response.
pub(crate) async fn query_global_state(
    self,
    query_request: QueryRequest,
) -> Result<QueryResult, engine_state::Error>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request = |responder| ContractRuntimeRequest::Query {
        query_request,
        responder,
    };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::GetBalance` request carrying `balance_request` on the
/// `Regular` queue and resolves to the result sent back in the response.
pub(crate) async fn get_balance(
    self,
    balance_request: BalanceRequest,
) -> Result<BalanceResult, engine_state::Error>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request = |responder| ContractRuntimeRequest::GetBalance {
        balance_request,
        responder,
    };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::GetProtocolData` request for `protocol_version` on the
/// `Regular` queue and resolves to the result sent back in the response.
pub(crate) async fn get_protocol_data(
    self,
    protocol_version: ProtocolVersion,
) -> Result<Option<Box<ProtocolData>>, engine_state::Error>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request = |responder| ContractRuntimeRequest::GetProtocolData {
        protocol_version,
        responder,
    };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::GetEraValidators` request carrying `request` on the
/// `Regular` queue and resolves to the result sent back in the response.
pub(crate) async fn get_era_validators(
    self,
    request: EraValidatorsRequest,
) -> Result<EraValidators, GetEraValidatorsError>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request =
        |responder| ContractRuntimeRequest::GetEraValidators { request, responder };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::GetValidatorWeightsByEraId` request carrying `request` on
/// the `Regular` queue and resolves to the result sent back in the response.
pub(crate) async fn get_validator_weights_by_era_id(
    self,
    request: ValidatorWeightsByEraIdRequest,
) -> Result<Option<ValidatorWeights>, GetEraValidatorsError>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request =
        |responder| ContractRuntimeRequest::GetValidatorWeightsByEraId { request, responder };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Sends a `ContractRuntimeRequest::Step` request carrying `step_request` on the `Regular` queue
/// and resolves to the result sent back in the response.
pub(crate) async fn run_step(
    self,
    step_request: StepRequest,
) -> Result<StepResult, engine_state::Error>
where
    REv: From<ContractRuntimeRequest>,
{
    let build_request = |responder| ContractRuntimeRequest::Step {
        step_request,
        responder,
    };
    self.make_request(build_request, QueueKind::Regular).await
}
/// Concurrently requests the validator weights described by `request` and the blocks at
/// `booking_block_height` and `key_block_height`.
///
/// The three requests are awaited together with `join!`; the results are returned as a tuple in
/// the order validator weights, booking block, key block.
pub(crate) async fn create_new_era<S>(
    self,
    request: ValidatorWeightsByEraIdRequest,
    booking_block_height: u64,
    key_block_height: u64,
) -> (
    Result<Option<ValidatorWeights>, GetEraValidatorsError>,
    Option<S::Block>,
    Option<S::Block>,
)
where
    REv: From<ContractRuntimeRequest> + From<StorageRequest<S>>,
    S: StorageType + 'static,
{
    join!(
        self.get_validator_weights_by_era_id(request),
        self.get_block_at_height::<S>(booking_block_height),
        self.get_block_at_height::<S>(key_block_height)
    )
}
/// Sends a `ConsensusRequest::HandleLinearBlock` request carrying the boxed `block_header` on
/// the `Regular` queue and resolves to the `Signature` sent back in the response.
pub(crate) async fn handle_linear_chain_block(self, block_header: BlockHeader) -> Signature
where
    REv: From<ConsensusRequest>,
{
    let boxed_header = Box::new(block_header);
    self.make_request(
        |reply| ConsensusRequest::HandleLinearBlock(boxed_header, reply),
        QueueKind::Regular,
    )
    .await
}
}
/// Calls `fatal` on the given effect builder with the invoking file and line plus a reference to
/// `$msg`, then calls `.ignore()` on the result.
///
/// `.ignore()` is resolved at the call site, so a matching method (such as `EffectExt::ignore`)
/// must be in scope there.
#[macro_export]
macro_rules! fatal {
($effect_builder:expr, $msg:expr) => {
$effect_builder.fatal(file!(), line!(), &$msg).ignore()
};
}