pub mod announcements;
pub mod requests;
use std::{
any::type_name,
borrow::Cow,
collections::{BTreeMap, HashMap, HashSet},
fmt::{self, Debug, Display, Formatter},
future::Future,
sync::Arc,
time::{Duration, Instant},
};
use datasize::DataSize;
use futures::{channel::oneshot, future::BoxFuture, FutureExt};
use semver::Version;
use serde::{de::DeserializeOwned, Serialize};
use smallvec::{smallvec, SmallVec};
use tokio::join;
use tracing::{error, warn};
use casper_execution_engine::{
core::engine_state::{
self,
era_validators::GetEraValidatorsError,
execute_request::ExecuteRequest,
execution_result::ExecutionResults,
genesis::GenesisResult,
step::{StepRequest, StepResult},
BalanceRequest, BalanceResult, QueryRequest, QueryResult, MAX_PAYMENT,
},
shared::{additive_map::AdditiveMap, stored_value::StoredValue, transform::Transform},
storage::{global_state::CommitResult, protocol_data::ProtocolData},
};
use casper_types::{
auction::{EraValidators, ValidatorWeights},
ExecutionResult, Key, ProtocolVersion, PublicKey, Transfer,
};
use crate::{
components::{
chainspec_loader::ChainspecInfo,
consensus::{BlockContext, EraId},
contract_runtime::{EraValidatorsRequest, ValidatorWeightsByEraIdRequest},
deploy_acceptor::Error,
fetcher::FetchResult,
small_network::GossipedAddress,
},
crypto::hash::Digest,
effect::requests::LinearChainRequest,
reactor::{EventQueueHandle, QueueKind},
types::{
Block, BlockByHeight, BlockHash, BlockHeader, BlockLike, Deploy, DeployHash, DeployHeader,
DeployMetadata, FinalitySignature, FinalizedBlock, Item, ProtoBlock, Timestamp,
},
utils::Source,
Chainspec,
};
use announcements::{
BlockExecutorAnnouncement, ConsensusAnnouncement, DeployAcceptorAnnouncement,
GossiperAnnouncement, LinearChainAnnouncement, NetworkAnnouncement, RpcServerAnnouncement,
};
use requests::{
BlockExecutorRequest, BlockProposerRequest, BlockValidationRequest, ChainspecLoaderRequest,
ConsensusRequest, ContractRuntimeRequest, FetcherRequest, MetricsRequest, NetworkInfoRequest,
NetworkRequest, ProtoBlockRequest, StateStoreRequest, StorageRequest,
};
pub type Effect<Ev> = BoxFuture<'static, Multiple<Ev>>;
pub type Effects<Ev> = Multiple<Effect<Ev>>;
pub type Multiple<T> = SmallVec<[T; 2]>;
#[must_use]
#[derive(DataSize)]
pub struct Responder<T>(Option<oneshot::Sender<T>>);
impl<T: 'static + Send> Responder<T> {
#[inline]
fn new(sender: oneshot::Sender<T>) -> Self {
Responder(Some(sender))
}
#[cfg(test)]
#[inline]
pub(crate) fn create(sender: oneshot::Sender<T>) -> Self {
Responder::new(sender)
}
}
impl<T> Responder<T> {
pub async fn respond(mut self, data: T) {
if let Some(sender) = self.0.take() {
if sender.send(data).is_err() {
error!("could not send response to request down oneshot channel");
}
} else {
error!("tried to send a value down a responder channel, but it was already used");
}
}
}
impl<T> Debug for Responder<T> {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
write!(formatter, "Responder<{}>", type_name::<T>(),)
}
}
impl<T> Display for Responder<T> {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
write!(formatter, "responder({})", type_name::<T>(),)
}
}
impl<T> Drop for Responder<T> {
fn drop(&mut self) {
if self.0.is_some() {
error!(
"{} dropped without being responded to --- \
this is always a bug and will likely cause another component to be stuck!",
self
);
}
}
}
impl<T> Serialize for Responder<T> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&format!("{:?}", self))
}
}
pub trait EffectExt: Future + Send {
fn event<U, F>(self, f: F) -> Effects<U>
where
F: FnOnce(Self::Output) -> U + 'static + Send,
U: 'static,
Self: Sized;
fn ignore<Ev>(self) -> Effects<Ev>;
}
pub trait EffectResultExt {
type Value;
type Error;
fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
where
F: FnOnce(Self::Value) -> U + 'static + Send,
G: FnOnce(Self::Error) -> U + 'static + Send,
U: 'static;
}
pub trait EffectOptionExt {
type Value;
fn map_or_else<U, F, G>(self, f_some: F, f_none: G) -> Effects<U>
where
F: FnOnce(Self::Value) -> U + 'static + Send,
G: FnOnce() -> U + 'static + Send,
U: 'static;
fn map_some<U, F>(self, f: F) -> Effects<U>
where
F: FnOnce(Self::Value) -> U + 'static + Send,
U: 'static;
}
impl<T: ?Sized> EffectExt for T
where
T: Future + Send + 'static + Sized,
{
fn event<U, F>(self, f: F) -> Effects<U>
where
F: FnOnce(Self::Output) -> U + 'static + Send,
U: 'static,
{
smallvec![self.map(f).map(|item| smallvec![item]).boxed()]
}
fn ignore<Ev>(self) -> Effects<Ev> {
smallvec![self.map(|_| Multiple::new()).boxed()]
}
}
impl<T, V, E> EffectResultExt for T
where
T: Future<Output = Result<V, E>> + Send + 'static + Sized,
T: ?Sized,
{
type Value = V;
type Error = E;
fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
where
F: FnOnce(V) -> U + 'static + Send,
G: FnOnce(E) -> U + 'static + Send,
U: 'static,
{
smallvec![self
.map(|result| result.map_or_else(f_err, f_ok))
.map(|item| smallvec![item])
.boxed()]
}
}
impl<T, V> EffectOptionExt for T
where
T: Future<Output = Option<V>> + Send + 'static + Sized,
T: ?Sized,
{
type Value = V;
fn map_or_else<U, F, G>(self, f_some: F, f_none: G) -> Effects<U>
where
F: FnOnce(V) -> U + 'static + Send,
G: FnOnce() -> U + 'static + Send,
U: 'static,
{
smallvec![self
.map(|option| option.map_or_else(f_none, f_some))
.map(|item| smallvec![item])
.boxed()]
}
fn map_some<U, F>(self, f: F) -> Effects<U>
where
F: FnOnce(Self::Value) -> U + 'static + Send,
U: 'static,
{
smallvec![self
.map(|option| option
.map(|el| smallvec![f(el)])
.unwrap_or_else(|| smallvec![]))
.boxed()]
}
}
#[derive(Debug)]
pub struct EffectBuilder<REv: 'static>(EventQueueHandle<REv>);
impl<REv> Clone for EffectBuilder<REv> {
fn clone(&self) -> Self {
EffectBuilder(self.0)
}
}
impl<REv> Copy for EffectBuilder<REv> {}
impl<REv> EffectBuilder<REv> {
pub fn new(event_queue_handle: EventQueueHandle<REv>) -> Self {
EffectBuilder(event_queue_handle)
}
pub(crate) async fn make_request<T, Q, F>(self, f: F, queue_kind: QueueKind) -> T
where
T: Send + 'static,
Q: Into<REv>,
F: FnOnce(Responder<T>) -> Q,
{
let (sender, receiver) = oneshot::channel();
let responder = Responder::new(sender);
let request_event = f(responder).into();
self.0.schedule(request_event, queue_kind).await;
receiver.await.unwrap_or_else(|err| {
error!(%err, ?queue_kind, "request for {} channel closed, this is a serious bug --- \
a component will likely be stuck from now on ", type_name::<T>());
panic!("request not answerable");
})
}
#[inline(always)]
#[allow(clippy::manual_async_fn)]
pub fn immediately(self) -> impl Future<Output = ()> + Send {
async {}
}
pub fn fatal(self, file: &str, line: u32, msg: String) -> impl Future<Output = ()> + Send {
panic!("fatal error [{}:{}]: {}", file, line, msg);
#[allow(unreachable_code)]
async {}
}
pub(crate) async fn set_timeout(self, timeout: Duration) -> Duration {
let then = Instant::now();
tokio::time::delay_for(timeout).await;
Instant::now() - then
}
pub(crate) async fn get_metrics(self) -> Option<String>
where
REv: From<MetricsRequest>,
{
self.make_request(
|responder| MetricsRequest::RenderNodeMetricsText { responder },
QueueKind::Api,
)
.await
}
pub(crate) async fn get_block_at_height_local<I>(self, height: u64) -> Option<Block>
where
REv: From<LinearChainRequest<I>>,
{
self.make_request(
|responder| LinearChainRequest::BlockAtHeightLocal(height, responder),
QueueKind::Regular,
)
.await
}
pub(crate) async fn send_message<I, P>(self, dest: I, payload: P)
where
REv: From<NetworkRequest<I, P>>,
{
self.make_request(
|responder| NetworkRequest::SendMessage {
dest,
payload,
responder,
},
QueueKind::Network,
)
.await
}
pub async fn broadcast_message<I, P>(self, payload: P)
where
REv: From<NetworkRequest<I, P>>,
{
self.make_request(
|responder| NetworkRequest::Broadcast { payload, responder },
QueueKind::Network,
)
.await
}
pub async fn gossip_message<I, P>(
self,
payload: P,
count: usize,
exclude: HashSet<I>,
) -> HashSet<I>
where
REv: From<NetworkRequest<I, P>>,
I: Send + 'static,
P: Send,
{
self.make_request(
|responder| NetworkRequest::Gossip {
payload,
count,
exclude,
responder,
},
QueueKind::Network,
)
.await
}
pub async fn network_peers<I>(self) -> BTreeMap<I, String>
where
REv: From<NetworkInfoRequest<I>>,
I: Send + 'static,
{
self.make_request(
|responder| NetworkInfoRequest::GetPeers { responder },
QueueKind::Api,
)
.await
}
pub(crate) async fn announce_message_received<I, P>(self, sender: I, payload: P)
where
REv: From<NetworkAnnouncement<I, P>>,
{
self.0
.schedule(
NetworkAnnouncement::MessageReceived { sender, payload },
QueueKind::NetworkIncoming,
)
.await;
}
pub(crate) async fn announce_gossip_our_address<I, P>(self, our_address: GossipedAddress)
where
REv: From<NetworkAnnouncement<I, P>>,
{
self.0
.schedule(
NetworkAnnouncement::GossipOurAddress(our_address),
QueueKind::Regular,
)
.await;
}
pub(crate) async fn announce_new_peer<I, P>(self, peer_id: I)
where
REv: From<NetworkAnnouncement<I, P>>,
{
self.0
.schedule(
NetworkAnnouncement::NewPeer(peer_id),
QueueKind::NetworkIncoming,
)
.await;
}
pub(crate) async fn announce_complete_item_received_via_gossip<T: Item>(self, item: T::Id)
where
REv: From<GossiperAnnouncement<T>>,
{
assert!(
T::ID_IS_COMPLETE_ITEM,
"{} must be an item where the ID _is_ the complete item",
item
);
self.0
.schedule(
GossiperAnnouncement::NewCompleteItem(item),
QueueKind::Regular,
)
.await;
}
pub(crate) async fn announce_deploy_received(
self,
deploy: Box<Deploy>,
responder: Option<Responder<Result<(), Error>>>,
) where
REv: From<RpcServerAnnouncement>,
{
self.0
.schedule(
RpcServerAnnouncement::DeployReceived { deploy, responder },
QueueKind::Api,
)
.await;
}
pub(crate) fn announce_new_deploy_accepted<I>(
self,
deploy: Box<Deploy>,
source: Source<I>,
) -> impl Future<Output = ()>
where
REv: From<DeployAcceptorAnnouncement<I>>,
{
self.0.schedule(
DeployAcceptorAnnouncement::AcceptedNewDeploy { deploy, source },
QueueKind::Regular,
)
}
pub(crate) fn announce_invalid_deploy<I>(
self,
deploy: Box<Deploy>,
source: Source<I>,
) -> impl Future<Output = ()>
where
REv: From<DeployAcceptorAnnouncement<I>>,
{
self.0.schedule(
DeployAcceptorAnnouncement::InvalidDeploy { deploy, source },
QueueKind::Regular,
)
}
pub(crate) async fn announce_linear_chain_block(
self,
block: Block,
execution_results: HashMap<DeployHash, (DeployHeader, ExecutionResult)>,
) where
REv: From<BlockExecutorAnnouncement>,
{
self.0
.schedule(
BlockExecutorAnnouncement::LinearChainBlock {
block,
execution_results,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn put_block_to_storage(self, block: Box<Block>) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutBlock { block, responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_block_from_storage(self, block_hash: BlockHash) -> Option<Block>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlock {
block_hash,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_block_transfers_from_storage(
self,
block_hash: BlockHash,
) -> Option<Vec<Transfer>>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockTransfers {
block_hash,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_block_at_height(self, height: u64) -> Option<Block>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockAtHeight { height, responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_highest_block(self) -> Option<Block>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetHighestBlock { responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn put_deploy_to_storage(self, deploy: Box<Deploy>) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutDeploy { deploy, responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_deploys_from_storage(
self,
deploy_hashes: Multiple<DeployHash>,
) -> Vec<Option<Deploy>>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetDeploys {
deploy_hashes,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn put_execution_results_to_storage(
self,
block_hash: BlockHash,
execution_results: HashMap<DeployHash, ExecutionResult>,
) where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutExecutionResults {
block_hash,
execution_results,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_deploy_and_metadata_from_storage(
self,
deploy_hash: DeployHash,
) -> Option<(Deploy, DeployMetadata)>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetDeployAndMetadata {
deploy_hash,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn fetch_deploy<I>(
self,
deploy_hash: DeployHash,
peer: I,
) -> Option<FetchResult<Deploy>>
where
REv: From<FetcherRequest<I, Deploy>>,
I: Send + 'static,
{
self.make_request(
|responder| FetcherRequest::Fetch {
id: deploy_hash,
peer,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn fetch_block<I>(
self,
block_hash: BlockHash,
peer: I,
) -> Option<FetchResult<Block>>
where
REv: From<FetcherRequest<I, Block>>,
I: Send + 'static,
{
self.make_request(
|responder| FetcherRequest::Fetch {
id: block_hash,
peer,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn fetch_block_by_height<I>(
self,
block_height: u64,
peer: I,
) -> Option<FetchResult<BlockByHeight>>
where
REv: From<FetcherRequest<I, BlockByHeight>>,
I: Send + 'static,
{
self.make_request(
|responder| FetcherRequest::Fetch {
id: block_height,
peer,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn request_proto_block(
self,
block_context: BlockContext,
past_deploys: HashSet<DeployHash>,
next_finalized: u64,
random_bit: bool,
) -> (ProtoBlock, BlockContext)
where
REv: From<BlockProposerRequest>,
{
let proto_block = self
.make_request(
|responder| {
BlockProposerRequest::RequestProtoBlock(ProtoBlockRequest {
current_instant: block_context.timestamp(),
past_deploys,
next_finalized,
responder,
random_bit,
})
},
QueueKind::Regular,
)
.await;
(proto_block, block_context)
}
pub(crate) async fn execute_block(self, finalized_block: FinalizedBlock)
where
REv: From<BlockExecutorRequest>,
{
self.0
.schedule(
BlockExecutorRequest::ExecuteBlock(finalized_block),
QueueKind::Regular,
)
.await
}
pub(crate) async fn validate_block<I, T>(
self,
sender: I,
block: T,
block_timestamp: Timestamp,
) -> (bool, T)
where
REv: From<BlockValidationRequest<T, I>>,
T: BlockLike + Send + 'static,
{
self.make_request(
|responder| BlockValidationRequest {
block,
sender,
responder,
block_timestamp,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn announce_finalized_block<I>(self, finalized_block: FinalizedBlock)
where
REv: From<ConsensusAnnouncement<I>>,
{
self.0
.schedule(
ConsensusAnnouncement::Finalized(Box::new(finalized_block)),
QueueKind::Regular,
)
.await
}
pub(crate) async fn announce_block_handled<I>(self, block_header: BlockHeader)
where
REv: From<ConsensusAnnouncement<I>>,
{
self.0
.schedule(
ConsensusAnnouncement::Handled(Box::new(block_header)),
QueueKind::Regular,
)
.await
}
pub(crate) async fn announce_fault_event<I>(
self,
era_id: EraId,
public_key: PublicKey,
timestamp: Timestamp,
) where
REv: From<ConsensusAnnouncement<I>>,
{
self.0
.schedule(
ConsensusAnnouncement::Fault {
era_id,
public_key: Box::new(public_key),
timestamp,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn announce_disconnect_from_peer<I>(self, peer: I)
where
REv: From<ConsensusAnnouncement<I>>,
{
self.0
.schedule(
ConsensusAnnouncement::DisconnectFromPeer(peer),
QueueKind::Regular,
)
.await
}
pub(crate) async fn announce_block_added(self, block_hash: BlockHash, block_header: BlockHeader)
where
REv: From<LinearChainAnnouncement>,
{
self.0
.schedule(
LinearChainAnnouncement::BlockAdded {
block_hash,
block_header: Box::new(block_header),
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn announce_finality_signature(self, fs: Box<FinalitySignature>)
where
REv: From<LinearChainAnnouncement>,
{
self.0
.schedule(
LinearChainAnnouncement::NewFinalitySignature(fs),
QueueKind::Regular,
)
.await
}
pub(crate) async fn commit_genesis(
self,
chainspec: Chainspec,
) -> Result<GenesisResult, engine_state::Error>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::CommitGenesis {
chainspec: Box::new(chainspec),
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn put_chainspec(self, chainspec: Chainspec)
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutChainspec {
chainspec: Arc::new(chainspec),
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_chainspec(self, version: Version) -> Option<Arc<Chainspec>>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetChainspec { version, responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_chainspec_info(self) -> ChainspecInfo
where
REv: From<ChainspecLoaderRequest> + Send,
{
self.make_request(ChainspecLoaderRequest::GetChainspecInfo, QueueKind::Regular)
.await
}
pub(crate) async fn load_state<T>(self, key: Cow<'static, [u8]>) -> Option<T>
where
REv: From<StateStoreRequest>,
T: DeserializeOwned,
{
self.make_request(
move |responder| StateStoreRequest::Load { key, responder },
QueueKind::Regular,
)
.await
.map(|data| bincode::deserialize(&data))
.transpose()
.unwrap_or_else(|err| {
let type_name = type_name::<T>();
warn!(%type_name, %err, "could not deserialize state from storage");
None
})
}
pub(crate) async fn save_state<T>(self, key: Cow<'static, [u8]>, value: T) -> bool
where
REv: From<StateStoreRequest>,
T: Serialize,
{
match bincode::serialize(&value) {
Ok(data) => {
self.make_request(
move |responder| StateStoreRequest::Save {
key,
data,
responder,
},
QueueKind::Regular,
)
.await;
true
}
Err(err) => {
let type_name = type_name::<T>();
warn!(%type_name, %err, "Error serializing state");
false
}
}
}
pub(crate) async fn request_execute(
self,
execute_request: ExecuteRequest,
) -> Result<ExecutionResults, engine_state::RootNotFound>
where
REv: From<ContractRuntimeRequest>,
{
let execute_request = Box::new(execute_request);
self.make_request(
|responder| ContractRuntimeRequest::Execute {
execute_request,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn request_commit(
self,
state_root_hash: Digest,
effects: AdditiveMap<Key, Transform>,
) -> Result<CommitResult, engine_state::Error>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::Commit {
state_root_hash,
effects,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn query_global_state(
self,
query_request: QueryRequest,
) -> Result<QueryResult, engine_state::Error>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::Query {
query_request,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn is_verified_account(self, account_key: Key) -> bool
where
REv: From<ContractRuntimeRequest>,
REv: From<StorageRequest>,
{
if let Some(block) = self.get_highest_block().await {
let state_hash = (*block.state_root_hash()).into();
let query_request = QueryRequest::new(state_hash, account_key, vec![]);
if let Ok(QueryResult::Success { value, .. }) =
self.query_global_state(query_request).await
{
if let StoredValue::Account(account) = *value {
let purse_uref = account.main_purse();
let balance_request = BalanceRequest::new(state_hash, purse_uref);
if let Ok(balance_result) = self.get_balance(balance_request).await {
if let Some(motes) = balance_result.motes() {
return motes >= &*MAX_PAYMENT;
}
}
}
}
}
false
}
pub(crate) async fn get_balance(
self,
balance_request: BalanceRequest,
) -> Result<BalanceResult, engine_state::Error>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetBalance {
balance_request,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_protocol_data(
self,
protocol_version: ProtocolVersion,
) -> Result<Option<Box<ProtocolData>>, engine_state::Error>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetProtocolData {
protocol_version,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_era_validators(
self,
request: EraValidatorsRequest,
) -> Result<EraValidators, GetEraValidatorsError>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetEraValidators { request, responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_validator_weights_by_era_id(
self,
request: ValidatorWeightsByEraIdRequest,
) -> Result<Option<ValidatorWeights>, GetEraValidatorsError>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetValidatorWeightsByEraId { request, responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn run_step(
self,
step_request: StepRequest,
) -> Result<StepResult, engine_state::Error>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::Step {
step_request,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn create_new_era(
self,
request: ValidatorWeightsByEraIdRequest,
booking_block_height: u64,
key_block_height: u64,
) -> (
Result<Option<ValidatorWeights>, GetEraValidatorsError>,
Option<Block>,
Option<Block>,
)
where
REv: From<ContractRuntimeRequest> + From<StorageRequest>,
{
let future_validators = self.get_validator_weights_by_era_id(request);
let future_booking_block = self.get_block_at_height(booking_block_height);
let future_key_block = self.get_block_at_height(key_block_height);
join!(future_validators, future_booking_block, future_key_block)
}
pub(crate) async fn handle_linear_chain_block(
self,
block_header: BlockHeader,
) -> Option<FinalitySignature>
where
REv: From<ConsensusRequest>,
{
self.make_request(
|responder| ConsensusRequest::HandleLinearBlock(Box::new(block_header), responder),
QueueKind::Regular,
)
.await
}
pub(crate) async fn is_bonded_validator(self, era_id: EraId, public_key: PublicKey) -> bool
where
REv: From<ConsensusRequest>,
{
self.make_request(
|responder| ConsensusRequest::IsBondedValidator(era_id, public_key, responder),
QueueKind::Regular,
)
.await
}
pub(crate) async fn is_bonded_in_future_era(
self,
state_root_hash: Digest,
era_id: EraId,
protocol_version: ProtocolVersion,
public_key: PublicKey,
) -> Result<bool, GetEraValidatorsError>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::IsBonded {
state_root_hash,
era_id,
protocol_version,
public_key,
responder,
},
QueueKind::Regular,
)
.await
}
}
#[macro_export]
macro_rules! fatal {
($effect_builder:expr, $($arg:tt)*) => {
$effect_builder.fatal(file!(), line!(), format_args!($($arg)*).to_string()).ignore()
};
}