pub(crate) mod announcements;
pub(crate) mod diagnostics_port;
pub(crate) mod incoming;
pub(crate) mod requests;
use std::{
any::type_name,
borrow::Cow,
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
fmt::{self, Debug, Display, Formatter},
future::Future,
mem,
sync::Arc,
time::{Duration, Instant},
};
use datasize::DataSize;
use futures::{channel::oneshot, future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use serde::{Serialize, Serializer};
use smallvec::{smallvec, SmallVec};
use tokio::{sync::Semaphore, time};
use tracing::{debug, error, warn};
use casper_binary_port::{
ConsensusStatus, ConsensusValidatorChanges, LastProgress, NetworkName, RecordId, Uptime,
};
use casper_storage::{
block_store::types::ApprovalsHashes,
data_access_layer::{
prefixed_values::{PrefixedValuesRequest, PrefixedValuesResult},
tagged_values::{TaggedValuesRequest, TaggedValuesResult},
AddressableEntityResult, BalanceRequest, BalanceResult, EraValidatorsRequest,
EraValidatorsResult, ExecutionResultsChecksumResult, PutTrieRequest, PutTrieResult,
QueryRequest, QueryResult, SeigniorageRecipientsRequest, SeigniorageRecipientsResult,
TrieRequest, TrieResult,
},
DbRawBytesSpec,
};
use casper_types::{
execution::{Effects as ExecutionEffects, ExecutionResult},
Approval, AvailableBlockRange, Block, BlockHash, BlockHeader, BlockSignatures,
BlockSynchronizerStatus, BlockV2, ChainspecRawBytes, DeployHash, Digest, EntityAddr, EraId,
ExecutionInfo, FinalitySignature, FinalitySignatureId, FinalitySignatureV2, HashAddr, Key,
NextUpgrade, Package, PackageAddr, ProtocolUpgradeConfig, PublicKey, TimeDiff, Timestamp,
Transaction, TransactionHash, TransactionId, Transfer, U512,
};
use crate::{
components::{
block_synchronizer::{
GlobalStateSynchronizerError, GlobalStateSynchronizerResponse, TrieAccumulatorError,
TrieAccumulatorResponse,
},
consensus::{ClContext, EraDump, ProposedBlock},
contract_runtime::SpeculativeExecutionResult,
diagnostics_port::StopAtSpec,
fetcher::{FetchItem, FetchResult},
gossiper::GossipItem,
network::{blocklist::BlocklistJustification, FromIncoming, NetworkInsights},
transaction_acceptor,
},
contract_runtime::ExecutionPreState,
failpoints::FailpointActivation,
reactor::{main_reactor::ReactorState, EventQueueHandle, QueueKind},
types::{
appendable_block::AppendableBlock, BlockExecutionResultsOrChunk,
BlockExecutionResultsOrChunkId, BlockWithMetadata, ExecutableBlock, FinalizedBlock,
InvalidProposalError, LegacyDeploy, MetaBlock, MetaBlockState, NodeId, TransactionHeader,
},
utils::{fmt_limit::FmtLimit, SharedFlag, Source},
};
use announcements::{
BlockAccumulatorAnnouncement, ConsensusAnnouncement, ContractRuntimeAnnouncement,
ControlAnnouncement, FatalAnnouncement, FetchedNewBlockAnnouncement,
FetchedNewFinalitySignatureAnnouncement, GossiperAnnouncement, MetaBlockAnnouncement,
PeerBehaviorAnnouncement, QueueDumpFormat, TransactionAcceptorAnnouncement,
TransactionBufferAnnouncement, UnexecutedBlockAnnouncement, UpgradeWatcherAnnouncement,
};
use casper_storage::data_access_layer::EntryPointExistsResult;
use diagnostics_port::DumpConsensusStateRequest;
use requests::{
AcceptTransactionRequest, BeginGossipRequest, BlockAccumulatorRequest,
BlockSynchronizerRequest, BlockValidationRequest, ChainspecRawBytesRequest, ConsensusRequest,
ContractRuntimeRequest, FetcherRequest, MakeBlockExecutableRequest, MarkBlockCompletedRequest,
MetricsRequest, NetworkInfoRequest, NetworkRequest, ReactorInfoRequest, SetNodeStopRequest,
StorageRequest, SyncGlobalStateRequest, TransactionBufferRequest, TrieAccumulatorRequest,
UpgradeWatcherRequest,
};
static UNOBTAINABLE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(0));
pub(crate) type Effect<Ev> = BoxFuture<'static, Multiple<Ev>>;
pub(crate) type Effects<Ev> = Multiple<Effect<Ev>>;
pub(crate) type Multiple<T> = SmallVec<[T; 2]>;
#[derive(Debug, Serialize, PartialEq, Eq, Hash, Copy, Clone, DataSize)]
pub(crate) enum GossipTarget {
Mixed(EraId),
All,
}
impl Display for GossipTarget {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
match self {
GossipTarget::Mixed(era_id) => write!(formatter, "gossip target mixed for {}", era_id),
GossipTarget::All => write!(formatter, "gossip target all"),
}
}
}
#[must_use]
#[derive(DataSize)]
pub(crate) struct Responder<T> {
sender: Option<oneshot::Sender<T>>,
is_shutting_down: SharedFlag,
}
#[must_use]
#[derive(DataSize, Debug)]
pub(crate) struct AutoClosingResponder<T>(Responder<Option<T>>);
impl<T> AutoClosingResponder<T> {
pub(crate) fn from_opt_responder(responder: Responder<Option<T>>) -> Self {
AutoClosingResponder(responder)
}
fn into_inner(mut self) -> Responder<Option<T>> {
let is_shutting_down = self.0.is_shutting_down;
mem::replace(
&mut self.0,
Responder {
sender: None,
is_shutting_down,
},
)
}
}
impl<T: Debug> AutoClosingResponder<T> {
pub(crate) async fn respond(self, data: T) {
self.into_inner().respond(Some(data)).await;
}
pub(crate) async fn respond_none(self) {
self.into_inner().respond(None).await;
}
}
impl<T> Drop for AutoClosingResponder<T> {
fn drop(&mut self) {
if let Some(sender) = self.0.sender.take() {
debug!(
sending_value = %self.0,
"responding None by dropping auto-close responder"
);
if let Err(_unsent_value) = sender.send(None) {
debug!(
unsent_value = %self.0,
"failed to auto-close responder, ignoring"
);
}
}
}
}
impl<T: 'static + Send> Responder<T> {
#[inline]
fn new(sender: oneshot::Sender<T>, is_shutting_down: SharedFlag) -> Self {
Responder {
sender: Some(sender),
is_shutting_down,
}
}
#[cfg(test)]
#[inline]
pub(crate) fn without_shutdown(sender: oneshot::Sender<T>) -> Self {
Responder::new(sender, SharedFlag::global_shared())
}
}
impl<T: Debug> Responder<T> {
pub(crate) async fn respond(mut self, data: T) {
if let Some(sender) = self.sender.take() {
if let Err(data) = sender.send(data) {
debug!(
data=?FmtLimit::new(1000, &data),
"ignored failure to send response to request down oneshot channel"
);
}
} else {
error!(
data=?FmtLimit::new(1000, &data),
"tried to send a value down a responder channel, but it was already used"
);
}
}
}
impl<T> Debug for Responder<T> {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
write!(formatter, "Responder<{}>", type_name::<T>(),)
}
}
impl<T> Display for Responder<T> {
fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
write!(formatter, "responder({})", type_name::<T>(),)
}
}
impl<T> Drop for Responder<T> {
fn drop(&mut self) {
if self.sender.is_some() {
if self.is_shutting_down.is_set() {
debug!(
responder=?self,
"ignored dropping of responder during shutdown"
);
} else {
error!(
responder=?self,
"dropped without being responded to outside of shutdown"
);
}
}
}
}
impl<T> Serialize for Responder<T> {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(&format!("{:?}", self))
}
}
impl<T> Serialize for AutoClosingResponder<T> {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
self.0.serialize(serializer)
}
}
pub(crate) trait EffectExt: Future + Send {
fn event<U, F>(self, f: F) -> Effects<U>
where
F: FnOnce(Self::Output) -> U + 'static + Send,
U: 'static,
Self: Sized;
fn ignore<Ev>(self) -> Effects<Ev>;
}
pub(crate) trait EffectResultExt {
type Value;
type Error;
fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
where
F: FnOnce(Self::Value) -> U + 'static + Send,
G: FnOnce(Self::Error) -> U + 'static + Send,
U: 'static;
}
impl<T> EffectExt for T
where
T: Future + Send + 'static + Sized,
{
fn event<U, F>(self, f: F) -> Effects<U>
where
F: FnOnce(Self::Output) -> U + 'static + Send,
U: 'static,
{
smallvec![self.map(f).map(|item| smallvec![item]).boxed()]
}
fn ignore<Ev>(self) -> Effects<Ev> {
smallvec![self.map(|_| Multiple::new()).boxed()]
}
}
impl<T, V, E> EffectResultExt for T
where
T: Future<Output = Result<V, E>> + Send + 'static + Sized,
{
type Value = V;
type Error = E;
fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
where
F: FnOnce(V) -> U + 'static + Send,
G: FnOnce(E) -> U + 'static + Send,
U: 'static,
{
smallvec![self
.map(|result| result.map_or_else(f_err, f_ok))
.map(|item| smallvec![item])
.boxed()]
}
}
#[derive(Debug)]
pub(crate) struct EffectBuilder<REv: 'static> {
event_queue: EventQueueHandle<REv>,
}
impl<REv> Clone for EffectBuilder<REv> {
fn clone(&self) -> Self {
*self
}
}
impl<REv> Copy for EffectBuilder<REv> {}
impl<REv> EffectBuilder<REv> {
pub(crate) fn new(event_queue: EventQueueHandle<REv>) -> Self {
EffectBuilder { event_queue }
}
pub(crate) fn into_inner(self) -> EventQueueHandle<REv> {
self.event_queue
}
pub(crate) async fn make_request<T, Q, F>(self, f: F, queue_kind: QueueKind) -> T
where
T: Send + 'static,
Q: Into<REv>,
F: FnOnce(Responder<T>) -> Q,
{
let (event, wait_future) = self.create_request_parts(f);
self.event_queue.schedule(event, queue_kind).await;
wait_future.await
}
pub(crate) fn create_request_parts<T, Q, F>(self, f: F) -> (REv, impl Future<Output = T>)
where
T: Send + 'static,
Q: Into<REv>,
F: FnOnce(Responder<T>) -> Q,
{
let (sender, receiver) = oneshot::channel();
let responder = Responder::new(sender, self.event_queue.shutdown_flag());
let request_event = f(responder).into();
let fut = async move {
match receiver.await {
Ok(value) => value,
Err(err) => {
if self.event_queue.shutdown_flag().is_set() {
debug!(%err, channel=?type_name::<T>(), "ignoring closed channel due to shutdown");
} else {
error!(%err, channel=?type_name::<T>(), "request for channel closed, this may be a bug? \
check if a component is stuck from now on");
}
let _ = UNOBTAINABLE.acquire().await;
panic!("should never obtain unobtainable semaphore");
}
}
};
(request_event, fut)
}
#[inline(always)]
#[allow(clippy::manual_async_fn)]
pub(crate) fn immediately(self) -> impl Future<Output = ()> + Send {
async {}
}
pub(crate) async fn fatal(self, file: &'static str, line: u32, msg: String)
where
REv: From<FatalAnnouncement>,
{
self.event_queue
.schedule(FatalAnnouncement { file, line, msg }, QueueKind::Control)
.await;
}
pub(crate) async fn set_timeout(self, timeout: Duration) -> Duration {
let then = Instant::now();
time::sleep(timeout).await;
then.elapsed()
}
pub(crate) async fn get_metrics(self) -> Option<String>
where
REv: From<MetricsRequest>,
{
self.make_request(
|responder| MetricsRequest::RenderNodeMetricsText { responder },
QueueKind::Api,
)
.await
}
pub(crate) async fn send_message<P>(self, dest: NodeId, payload: P)
where
REv: From<NetworkRequest<P>>,
{
self.make_request(
|responder| NetworkRequest::SendMessage {
dest: Box::new(dest),
payload: Box::new(payload),
respond_after_queueing: false,
auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
},
QueueKind::Network,
)
.await;
}
pub(crate) async fn enqueue_message<P>(self, dest: NodeId, payload: P)
where
REv: From<NetworkRequest<P>>,
{
self.make_request(
|responder| NetworkRequest::SendMessage {
dest: Box::new(dest),
payload: Box::new(payload),
respond_after_queueing: true,
auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
},
QueueKind::Network,
)
.await;
}
pub(crate) async fn broadcast_message_to_validators<P>(self, payload: P, era_id: EraId)
where
REv: From<NetworkRequest<P>>,
{
self.make_request(
|responder| {
debug!("validator broadcast for {}", era_id);
NetworkRequest::ValidatorBroadcast {
payload: Box::new(payload),
era_id,
auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
}
},
QueueKind::Network,
)
.await;
}
pub(crate) async fn gossip_message<P>(
self,
payload: P,
gossip_target: GossipTarget,
count: usize,
exclude: HashSet<NodeId>,
) -> HashSet<NodeId>
where
REv: From<NetworkRequest<P>>,
P: Send,
{
self.make_request(
|responder| NetworkRequest::Gossip {
payload: Box::new(payload),
gossip_target,
count,
exclude,
auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
},
QueueKind::Network,
)
.await
.unwrap_or_default()
}
pub(crate) async fn get_network_insights(self) -> NetworkInsights
where
REv: From<NetworkInfoRequest>,
{
self.make_request(
|responder| NetworkInfoRequest::Insight { responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn network_peers(self) -> BTreeMap<NodeId, String>
where
REv: From<NetworkInfoRequest>,
{
self.make_request(
|responder| NetworkInfoRequest::Peers { responder },
QueueKind::Api,
)
.await
}
pub async fn get_fully_connected_peers(self, count: usize) -> Vec<NodeId>
where
REv: From<NetworkInfoRequest>,
{
self.make_request(
|responder| NetworkInfoRequest::FullyConnectedPeers { count, responder },
QueueKind::NetworkInfo,
)
.await
}
pub(crate) async fn announce_expired_transactions(self, hashes: Vec<TransactionHash>)
where
REv: From<TransactionBufferAnnouncement>,
{
self.event_queue
.schedule(
TransactionBufferAnnouncement::TransactionsExpired(hashes),
QueueKind::Validation,
)
.await;
}
pub(crate) async fn announce_incoming<P>(self, sender: NodeId, payload: P)
where
REv: FromIncoming<P>,
{
self.event_queue
.schedule(
<REv as FromIncoming<P>>::from_incoming(sender, payload),
QueueKind::NetworkIncoming,
)
.await;
}
pub(crate) async fn announce_complete_item_received_via_gossip<T: GossipItem>(self, item: T::Id)
where
REv: From<GossiperAnnouncement<T>>,
{
assert!(
T::ID_IS_COMPLETE_ITEM,
"{} must be an item where the ID _is_ the complete item",
item
);
self.event_queue
.schedule(
GossiperAnnouncement::NewCompleteItem(item),
QueueKind::Gossip,
)
.await;
}
pub(crate) async fn announce_item_body_received_via_gossip<T: GossipItem>(
self,
item: Box<T>,
sender: NodeId,
) where
REv: From<GossiperAnnouncement<T>>,
{
self.event_queue
.schedule(
GossiperAnnouncement::NewItemBody { item, sender },
QueueKind::Gossip,
)
.await;
}
pub(crate) async fn announce_finality_signature_accepted(
self,
finality_signature: Box<FinalitySignatureV2>,
) where
REv: From<BlockAccumulatorAnnouncement>,
{
self.event_queue
.schedule(
BlockAccumulatorAnnouncement::AcceptedNewFinalitySignature { finality_signature },
QueueKind::FinalitySignature,
)
.await;
}
pub(crate) async fn make_block_executable(
self,
block_hash: BlockHash,
) -> Option<ExecutableBlock>
where
REv: From<MakeBlockExecutableRequest>,
{
self.make_request(
|responder| MakeBlockExecutableRequest {
block_hash,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn mark_block_completed(self, block_height: u64) -> bool
where
REv: From<MarkBlockCompletedRequest>,
{
self.make_request(
|responder| MarkBlockCompletedRequest {
block_height,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn try_accept_transaction(
self,
transaction: Transaction,
is_speculative: bool,
) -> Result<(), transaction_acceptor::Error>
where
REv: From<AcceptTransactionRequest>,
{
self.make_request(
|responder| AcceptTransactionRequest {
transaction,
is_speculative,
responder,
},
QueueKind::Api,
)
.await
}
pub(crate) fn announce_new_transaction_accepted(
self,
transaction: Arc<Transaction>,
source: Source,
) -> impl Future<Output = ()>
where
REv: From<TransactionAcceptorAnnouncement>,
{
self.event_queue.schedule(
TransactionAcceptorAnnouncement::AcceptedNewTransaction {
transaction,
source,
},
QueueKind::Validation,
)
}
pub(crate) async fn announce_gossip_received<T>(self, item_id: T::Id, sender: NodeId)
where
REv: From<GossiperAnnouncement<T>>,
T: GossipItem,
{
self.event_queue
.schedule(
GossiperAnnouncement::GossipReceived { item_id, sender },
QueueKind::Gossip,
)
.await;
}
pub(crate) async fn announce_finished_gossiping<T>(self, item_id: T::Id)
where
REv: From<GossiperAnnouncement<T>>,
T: GossipItem,
{
self.event_queue
.schedule(
GossiperAnnouncement::FinishedGossiping(item_id),
QueueKind::Gossip,
)
.await;
}
pub(crate) fn announce_invalid_transaction(
self,
transaction: Transaction,
source: Source,
) -> impl Future<Output = ()>
where
REv: From<TransactionAcceptorAnnouncement>,
{
self.event_queue.schedule(
TransactionAcceptorAnnouncement::InvalidTransaction {
transaction,
source,
},
QueueKind::Validation,
)
}
pub(crate) async fn upgrade_watcher_announcement(self, maybe_next_upgrade: Option<NextUpgrade>)
where
REv: From<UpgradeWatcherAnnouncement>,
{
self.event_queue
.schedule(
UpgradeWatcherAnnouncement(maybe_next_upgrade),
QueueKind::Control,
)
.await;
}
pub(crate) async fn announce_commit_step_success(self, era_id: EraId, effects: ExecutionEffects)
where
REv: From<ContractRuntimeAnnouncement>,
{
self.event_queue
.schedule(
ContractRuntimeAnnouncement::CommitStepSuccess { era_id, effects },
QueueKind::ContractRuntime,
)
.await;
}
pub(crate) async fn update_contract_runtime_state(self, new_pre_state: ExecutionPreState)
where
REv: From<ContractRuntimeRequest>,
{
self.event_queue
.schedule(
ContractRuntimeRequest::UpdatePreState { new_pre_state },
QueueKind::ContractRuntime,
)
.await;
}
pub(crate) async fn announce_upcoming_era_validators(
self,
era_that_is_ending: EraId,
upcoming_era_validators: BTreeMap<EraId, BTreeMap<PublicKey, U512>>,
) where
REv: From<ContractRuntimeAnnouncement>,
{
self.event_queue
.schedule(
ContractRuntimeAnnouncement::UpcomingEraValidators {
era_that_is_ending,
upcoming_era_validators,
},
QueueKind::ContractRuntime,
)
.await;
}
pub(crate) async fn announce_new_era_gas_price(self, era_id: EraId, next_era_gas_price: u8)
where
REv: From<ContractRuntimeAnnouncement>,
{
self.event_queue
.schedule(
ContractRuntimeAnnouncement::NextEraGasPrice {
era_id,
next_era_gas_price,
},
QueueKind::ContractRuntime,
)
.await;
}
pub(crate) async fn begin_gossip<T>(self, item_id: T::Id, source: Source, target: GossipTarget)
where
T: GossipItem,
REv: From<BeginGossipRequest<T>>,
{
self.make_request(
|responder| BeginGossipRequest {
item_id,
source,
target,
responder,
},
QueueKind::Gossip,
)
.await;
}
pub(crate) async fn put_block_to_storage(self, block: Arc<Block>) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutBlock { block, responder },
QueueKind::ToStorage,
)
.await
}
pub(crate) async fn put_approvals_hashes_to_storage(
self,
approvals_hashes: Box<ApprovalsHashes>,
) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutApprovalsHashes {
approvals_hashes,
responder,
},
QueueKind::ToStorage,
)
.await
}
pub(crate) async fn put_executed_block_to_storage(
self,
block: Arc<BlockV2>,
approvals_hashes: Box<ApprovalsHashes>,
execution_results: HashMap<TransactionHash, ExecutionResult>,
) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutExecutedBlock {
block,
approvals_hashes,
execution_results,
responder,
},
QueueKind::ToStorage,
)
.await
}
pub(crate) async fn get_block_from_storage(self, block_hash: BlockHash) -> Option<Block>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlock {
block_hash,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_block_utilization(
self,
era_id: EraId,
block_height: u64,
transaction_count: u64,
) -> Option<(u64, u64)>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockUtilizationScore {
era_id,
block_height,
switch_block_utilization: transaction_count,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn is_block_stored(self, block_hash: BlockHash) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::IsBlockStored {
block_hash,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_approvals_hashes_from_storage(
self,
block_hash: BlockHash,
) -> Option<ApprovalsHashes>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetApprovalsHashes {
block_hash,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_raw_data(
self,
record_id: RecordId,
key: Vec<u8>,
) -> Option<DbRawBytesSpec>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetRawData {
record_id,
key,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_block_header_from_storage(
self,
block_hash: BlockHash,
only_from_available_block_range: bool,
) -> Option<BlockHeader>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockHeader {
block_hash,
only_from_available_block_range,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_block_header_at_height_from_storage(
self,
block_height: u64,
only_from_available_block_range: bool,
) -> Option<BlockHeader>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockHeaderByHeight {
block_height,
only_from_available_block_range,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_latest_switch_block_header_from_storage(self) -> Option<BlockHeader>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetLatestSwitchBlockHeader { responder },
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_switch_block_header_by_era_id_from_storage(
self,
era_id: EraId,
) -> Option<BlockHeader>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetSwitchBlockHeaderByEra { era_id, responder },
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_signature_from_storage(
self,
block_hash: BlockHash,
public_key: PublicKey,
) -> Option<FinalitySignature>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockSignature {
block_hash,
public_key: Box::new(public_key),
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_execution_results_from_storage(
self,
block_hash: BlockHash,
) -> Option<Vec<(TransactionHash, TransactionHeader, ExecutionResult)>>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetExecutionResults {
block_hash,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn put_block_header_to_storage(self, block_header: Box<BlockHeader>) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutBlockHeader {
block_header,
responder,
},
QueueKind::ToStorage,
)
.await
}
pub(crate) async fn put_signatures_to_storage(self, signatures: BlockSignatures) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutBlockSignatures {
signatures,
responder,
},
QueueKind::ToStorage,
)
.await
}
pub(crate) async fn put_finality_signature_to_storage(
self,
signature: FinalitySignature,
) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutFinalitySignature {
signature: Box::new(signature),
responder,
},
QueueKind::ToStorage,
)
.await
}
pub(crate) async fn get_block_transfers_from_storage(
self,
block_hash: BlockHash,
) -> Option<Vec<Transfer>>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockTransfers {
block_hash,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_transactions_era_ids(
self,
transaction_hashes: HashSet<TransactionHash>,
) -> HashSet<EraId>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetTransactionsEraIds {
transaction_hashes,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_highest_complete_block_from_storage(self) -> Option<Block>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetHighestCompleteBlock { responder },
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_highest_complete_block_header_from_storage(self) -> Option<BlockHeader>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetHighestCompleteBlockHeader { responder },
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_available_block_range_from_storage(self) -> AvailableBlockRange
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetAvailableBlockRange { responder },
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn sync_global_state(
self,
block_hash: BlockHash,
state_root_hash: Digest,
) -> Result<GlobalStateSynchronizerResponse, GlobalStateSynchronizerError>
where
REv: From<SyncGlobalStateRequest>,
{
self.make_request(
|responder| SyncGlobalStateRequest {
block_hash,
state_root_hash,
responder,
},
QueueKind::SyncGlobalState,
)
.await
}
pub(crate) async fn get_trie(self, request: TrieRequest) -> TrieResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetTrie { request, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_reactor_state(self) -> ReactorState
where
REv: From<ReactorInfoRequest>,
{
self.make_request(
|responder| ReactorInfoRequest::ReactorState { responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_last_progress(self) -> LastProgress
where
REv: From<ReactorInfoRequest>,
{
self.make_request(
|responder| ReactorInfoRequest::LastProgress { responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_uptime(self) -> Uptime
where
REv: From<ReactorInfoRequest>,
{
self.make_request(
|responder| ReactorInfoRequest::Uptime { responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_network_name(self) -> NetworkName
where
REv: From<ReactorInfoRequest>,
{
self.make_request(
|responder| ReactorInfoRequest::NetworkName { responder },
QueueKind::Regular,
)
.await
}
#[allow(unused)]
pub(crate) async fn get_balance_holds_interval(self) -> TimeDiff
where
REv: From<ReactorInfoRequest>,
{
self.make_request(
|responder| ReactorInfoRequest::BalanceHoldsInterval { responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn get_block_synchronizer_status(self) -> BlockSynchronizerStatus
where
REv: From<BlockSynchronizerRequest>,
{
self.make_request(
|responder| BlockSynchronizerRequest::Status { responder },
QueueKind::Regular,
)
.await
}
pub(crate) async fn put_trie_if_all_children_present(
self,
request: PutTrieRequest,
) -> PutTrieResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::PutTrie { request, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_current_gas_price(self, era_id: EraId) -> Option<u8>
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetEraGasPrice { era_id, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn put_transaction_to_storage(self, transaction: Transaction) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutTransaction {
transaction: Arc::new(transaction),
responder,
},
QueueKind::ToStorage,
)
.await
}
pub(crate) async fn get_transactions_from_storage(
self,
transaction_hashes: Vec<TransactionHash>,
) -> SmallVec<[Option<(Transaction, Option<BTreeSet<Approval>>)>; 1]>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetTransactions {
transaction_hashes,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_transaction_and_exec_info_from_storage(
self,
transaction_hash: TransactionHash,
with_finalized_approvals: bool,
) -> Option<(Transaction, Option<ExecutionInfo>)>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetTransactionAndExecutionInfo {
transaction_hash,
with_finalized_approvals,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_stored_legacy_deploy(
self,
deploy_hash: DeployHash,
) -> Option<LegacyDeploy>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetLegacyDeploy {
deploy_hash,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_stored_transaction(
self,
transaction_id: TransactionId,
) -> Option<Transaction>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetTransaction {
transaction_id,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn is_transaction_stored(self, transaction_id: TransactionId) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::IsTransactionStored {
transaction_id,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn put_execution_artifacts_to_storage(
self,
block_hash: BlockHash,
block_height: u64,
era_id: EraId,
execution_results: HashMap<TransactionHash, ExecutionResult>,
) where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::PutExecutionResults {
block_hash: Box::new(block_hash),
block_height,
era_id,
execution_results,
responder,
},
QueueKind::ToStorage,
)
.await;
}
pub(crate) async fn get_block_at_height_with_metadata_from_storage(
self,
block_height: u64,
only_from_available_block_range: bool,
) -> Option<BlockWithMetadata>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockAndMetadataByHeight {
block_height,
only_from_available_block_range,
responder,
},
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn collect_past_blocks_with_metadata(
self,
range: std::ops::Range<u64>,
only_from_available_block_range: bool,
) -> Vec<Option<BlockWithMetadata>>
where
REv: From<StorageRequest>,
{
futures::future::join_all(range.into_iter().map(|block_height| {
self.get_block_at_height_with_metadata_from_storage(
block_height,
only_from_available_block_range,
)
}))
.await
.into_iter()
.collect()
}
pub(crate) async fn get_finality_signature_from_storage(
self,
id: Box<FinalitySignatureId>,
) -> Option<FinalitySignature>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetFinalitySignature { id, responder },
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn is_finality_signature_stored(self, id: Box<FinalitySignatureId>) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::IsFinalitySignatureStored { id, responder },
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn fetch<T>(
self,
id: T::Id,
peer: NodeId,
validation_metadata: Box<T::ValidationMetadata>,
) -> FetchResult<T>
where
REv: From<FetcherRequest<T>>,
T: FetchItem + 'static,
{
self.make_request(
|responder| FetcherRequest {
id,
peer,
validation_metadata,
responder,
},
QueueKind::Fetch,
)
.await
}
pub(crate) async fn fetch_trie(
self,
hash: Digest,
peers: Vec<NodeId>,
) -> Result<TrieAccumulatorResponse, TrieAccumulatorError>
where
REv: From<TrieAccumulatorRequest>,
{
self.make_request(
|responder| TrieAccumulatorRequest {
hash,
peers,
responder,
},
QueueKind::SyncGlobalState,
)
.await
}
pub(crate) async fn request_appendable_block(
self,
timestamp: Timestamp,
era_id: EraId,
request_expiry: Timestamp,
) -> AppendableBlock
where
REv: From<TransactionBufferRequest>,
{
self.make_request(
|responder| TransactionBufferRequest::GetAppendableBlock {
timestamp,
era_id,
request_expiry,
responder,
},
QueueKind::Consensus,
)
.await
}
pub(crate) async fn enqueue_block_for_execution(
self,
executable_block: ExecutableBlock,
meta_block_state: MetaBlockState,
) where
REv: From<StorageRequest> + From<ContractRuntimeRequest>,
{
let key_block_height_for_activation_point = self
.make_request(
|responder| StorageRequest::GetKeyBlockHeightForActivationPoint { responder },
QueueKind::FromStorage,
)
.await
.unwrap_or_else(|| {
warn!("key block height for current activation point unknown");
0
});
self.event_queue
.schedule(
ContractRuntimeRequest::EnqueueBlockForExecution {
executable_block,
key_block_height_for_activation_point,
meta_block_state,
},
QueueKind::ContractRuntime,
)
.await;
}
pub(crate) async fn enqueue_protocol_upgrade(
self,
upgrade_config: ProtocolUpgradeConfig,
next_block_height: u64,
parent_hash: BlockHash,
parent_seed: Digest,
) where
REv: From<ContractRuntimeRequest>,
{
self.event_queue
.schedule(
ContractRuntimeRequest::DoProtocolUpgrade {
protocol_upgrade_config: upgrade_config,
next_block_height,
parent_hash,
parent_seed,
},
QueueKind::Control,
)
.await;
}
pub(crate) async fn validate_block(
self,
sender: NodeId,
proposed_block_height: u64,
block: ProposedBlock<ClContext>,
) -> Result<(), Box<InvalidProposalError>>
where
REv: From<BlockValidationRequest>,
{
self.make_request(
|responder| BlockValidationRequest {
proposed_block_height,
block,
sender,
responder,
},
QueueKind::Regular,
)
.await
}
pub(crate) async fn announce_proposed_block(self, proposed_block: ProposedBlock<ClContext>)
where
REv: From<ConsensusAnnouncement>,
{
self.event_queue
.schedule(
ConsensusAnnouncement::Proposed(Box::new(proposed_block)),
QueueKind::Consensus,
)
.await;
}
pub(crate) async fn announce_finalized_block(self, finalized_block: FinalizedBlock)
where
REv: From<ConsensusAnnouncement>,
{
self.event_queue
.schedule(
ConsensusAnnouncement::Finalized(Box::new(finalized_block)),
QueueKind::Consensus,
)
.await;
}
pub(crate) async fn announce_meta_block(self, meta_block: MetaBlock)
where
REv: From<MetaBlockAnnouncement>,
{
self.event_queue
.schedule(MetaBlockAnnouncement(meta_block), QueueKind::Regular)
.await;
}
pub(crate) async fn announce_unexecuted_block(self, block_height: u64)
where
REv: From<UnexecutedBlockAnnouncement>,
{
self.event_queue
.schedule(
UnexecutedBlockAnnouncement(block_height),
QueueKind::Regular,
)
.await;
}
pub(crate) async fn announce_fault_event(
self,
era_id: EraId,
public_key: PublicKey,
timestamp: Timestamp,
) where
REv: From<ConsensusAnnouncement>,
{
self.event_queue
.schedule(
ConsensusAnnouncement::Fault {
era_id,
public_key: Box::new(public_key),
timestamp,
},
QueueKind::Consensus,
)
.await;
}
pub(crate) async fn announce_block_peer_with_justification(
self,
offender: NodeId,
justification: BlocklistJustification,
) where
REv: From<PeerBehaviorAnnouncement>,
{
warn!(%offender, %justification, "banning peer");
self.event_queue
.schedule(
PeerBehaviorAnnouncement::OffenseCommitted {
offender: Box::new(offender),
justification: Box::new(justification),
},
QueueKind::NetworkInfo,
)
.await;
}
pub(crate) async fn get_next_upgrade(self) -> Option<NextUpgrade>
where
REv: From<UpgradeWatcherRequest> + Send,
{
self.make_request(UpgradeWatcherRequest, QueueKind::Control)
.await
}
pub(crate) async fn query_global_state(self, request: QueryRequest) -> QueryResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::Query { request, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_addressable_entity(
self,
state_root_hash: Digest,
entity_addr: EntityAddr,
) -> AddressableEntityResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetAddressableEntity {
state_root_hash,
entity_addr,
responder,
},
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn does_entry_point_exist(
self,
state_root_hash: Digest,
contract_hash: HashAddr,
entry_point_name: String,
) -> EntryPointExistsResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetEntryPointExists {
state_root_hash,
contract_hash,
entry_point_name,
responder,
},
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_package(
self,
state_root_hash: Digest,
package_addr: PackageAddr,
) -> Option<Box<Package>>
where
REv: From<ContractRuntimeRequest>,
{
let key = Key::Hash(package_addr);
let query_request = QueryRequest::new(state_root_hash, key, vec![]);
match self.query_global_state(query_request).await {
QueryResult::RootNotFound | QueryResult::Failure(_) => None,
QueryResult::ValueNotFound(_) => {
let query_request =
QueryRequest::new(state_root_hash, Key::SmartContract(package_addr), vec![]);
debug!("requesting under different key");
if let QueryResult::Success { value, .. } =
self.query_global_state(query_request).await
{
value.into_package().map(Box::new)
} else {
None
}
}
QueryResult::Success { value, .. } => value
.into_contract_package()
.map(Package::from)
.map(Box::new),
}
}
pub(crate) async fn get_balance(self, request: BalanceRequest) -> BalanceResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetBalance { request, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_era_validators_from_contract_runtime(
self,
request: EraValidatorsRequest,
) -> EraValidatorsResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetEraValidators { request, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_seigniorage_recipients_snapshot_from_contract_runtime(
self,
request: SeigniorageRecipientsRequest,
) -> SeigniorageRecipientsResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetSeigniorageRecipients { request, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_tagged_values(self, request: TaggedValuesRequest) -> TaggedValuesResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetTaggedValues { request, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_prefixed_values(
self,
request: PrefixedValuesRequest,
) -> PrefixedValuesResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::QueryByPrefix { request, responder },
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_execution_results_checksum(
self,
state_root_hash: Digest,
) -> ExecutionResultsChecksumResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::GetExecutionResultsChecksum {
state_root_hash,
responder,
},
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn consensus_status(self) -> Option<ConsensusStatus>
where
REv: From<ConsensusRequest>,
{
self.make_request(ConsensusRequest::Status, QueueKind::Consensus)
.await
}
pub(crate) async fn get_consensus_validator_changes(self) -> ConsensusValidatorChanges
where
REv: From<ConsensusRequest>,
{
self.make_request(ConsensusRequest::ValidatorChanges, QueueKind::Consensus)
.await
}
pub(crate) async fn diagnostics_port_dump_consensus_state(
self,
era_id: Option<EraId>,
serialize: fn(&EraDump<'_>) -> Result<Vec<u8>, Cow<'static, str>>,
) -> Result<Vec<u8>, Cow<'static, str>>
where
REv: From<DumpConsensusStateRequest>,
{
self.make_request(
|responder| DumpConsensusStateRequest {
era_id,
serialize,
responder,
},
QueueKind::Control,
)
.await
}
pub(crate) async fn diagnostics_port_dump_queue(self, dump_format: QueueDumpFormat)
where
REv: From<ControlAnnouncement>,
{
self.make_request(
|responder| ControlAnnouncement::QueueDumpRequest {
dump_format,
finished: responder,
},
QueueKind::Control,
)
.await;
}
pub(crate) async fn activate_failpoint(self, activation: FailpointActivation)
where
REv: From<ControlAnnouncement>,
{
self.event_queue
.schedule(
ControlAnnouncement::ActivateFailpoint { activation },
QueueKind::Control,
)
.await;
}
pub(crate) async fn announce_user_shutdown_request(self)
where
REv: From<ControlAnnouncement>,
{
self.event_queue
.schedule(
ControlAnnouncement::ShutdownDueToUserRequest,
QueueKind::Control,
)
.await;
}
pub(crate) async fn announce_fetched_new_block(self, block: Arc<Block>, peer: NodeId)
where
REv: From<FetchedNewBlockAnnouncement>,
{
self.event_queue
.schedule(
FetchedNewBlockAnnouncement { block, peer },
QueueKind::Fetch,
)
.await;
}
pub(crate) async fn announce_fetched_new_finality_signature(
self,
finality_signature: Box<FinalitySignature>,
peer: NodeId,
) where
REv: From<FetchedNewFinalitySignatureAnnouncement>,
{
self.event_queue
.schedule(
FetchedNewFinalitySignatureAnnouncement {
finality_signature,
peer,
},
QueueKind::Fetch,
)
.await;
}
pub(crate) async fn get_chainspec_raw_bytes(self) -> Arc<ChainspecRawBytes>
where
REv: From<ChainspecRawBytesRequest> + Send,
{
self.make_request(
ChainspecRawBytesRequest::GetChainspecRawBytes,
QueueKind::NetworkInfo,
)
.await
}
pub(crate) async fn store_finalized_approvals(
self,
transaction_hash: TransactionHash,
finalized_approvals: BTreeSet<Approval>,
) -> bool
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::StoreFinalizedApprovals {
transaction_hash,
finalized_approvals,
responder,
},
QueueKind::ToStorage,
)
.await
}
pub(crate) async fn speculatively_execute(
self,
block_header: Box<BlockHeader>,
transaction: Box<Transaction>,
) -> SpeculativeExecutionResult
where
REv: From<ContractRuntimeRequest>,
{
self.make_request(
|responder| ContractRuntimeRequest::SpeculativelyExecute {
block_header,
transaction,
responder,
},
QueueKind::ContractRuntime,
)
.await
}
pub(crate) async fn get_block_execution_results_or_chunk_from_storage(
self,
id: BlockExecutionResultsOrChunkId,
) -> Option<BlockExecutionResultsOrChunk>
where
REv: From<StorageRequest>,
{
self.make_request(
|responder| StorageRequest::GetBlockExecutionResultsOrChunk { id, responder },
QueueKind::FromStorage,
)
.await
}
pub(crate) async fn get_block_accumulated_peers(
self,
block_hash: BlockHash,
) -> Option<Vec<NodeId>>
where
REv: From<BlockAccumulatorRequest>,
{
self.make_request(
|responder| BlockAccumulatorRequest::GetPeersForBlock {
block_hash,
responder,
},
QueueKind::NetworkInfo,
)
.await
}
pub(crate) async fn set_node_stop_at(self, stop_at: Option<StopAtSpec>) -> Option<StopAtSpec>
where
REv: From<SetNodeStopRequest>,
{
self.make_request(
|responder| SetNodeStopRequest { stop_at, responder },
QueueKind::Control,
)
.await
}
}
#[macro_export]
macro_rules! fatal {
($effect_builder:expr, $($arg:tt)*) => {
$effect_builder.fatal(file!(), line!(), format!($($arg)*))
};
}