mod config;
mod event;
mod metrics;
mod tests;
use std::{collections::HashMap, fmt::Debug, time::Duration};
use datasize::DataSize;
use prometheus::Registry;
use tracing::{debug, error, info};
use casper_execution_engine::storage::trie::Trie;
use casper_hashing::Digest;
use casper_types::{Key, StoredValue};
use crate::{
components::{fetcher::event::FetchResponder, Component},
effect::{
requests::{ContractRuntimeRequest, LinearChainRequest, NetworkRequest, StorageRequest},
EffectBuilder, EffectExt, Effects,
},
protocol::Message,
types::{
Block, BlockByHeight, BlockHash, Deploy, DeployHash, DeployWithFinalizedApprovals, Item,
NodeId,
},
utils::Source,
NodeRng,
};
pub(crate) use config::Config;
pub(crate) use event::{Event, FetchResult};
use metrics::Metrics;
/// Bundle of conversions and marker bounds a reactor event type must provide in order to drive
/// a fetcher of `T`.
///
/// The trait declares no items of its own; it only gives a short name to the list of supertraits
/// so that bounds such as `REv: ReactorEventT<T>` stay readable.
pub(crate) trait ReactorEventT<T>:
    Send
    + 'static
    + From<Event<T>>
    + From<NetworkRequest<NodeId, Message>>
    + From<StorageRequest>
    + From<ContractRuntimeRequest>
    + From<LinearChainRequest<NodeId>>
where
    T: Item + 'static,
    T::Id: 'static,
{
}
/// Blanket implementation: any type that satisfies every bound listed below is automatically a
/// `ReactorEventT<T>`.
impl<REv, T> ReactorEventT<T> for REv
where
    REv: Send
        + 'static
        + From<Event<T>>
        + From<NetworkRequest<NodeId, Message>>
        + From<StorageRequest>
        + From<ContractRuntimeRequest>
        + From<LinearChainRequest<NodeId>>,
    T: Item + 'static,
    T::Id: 'static,
{
}
/// Shared fetching logic for items of type `T`.
///
/// Implementors supply the responder table, the peer timeout and the storage lookup; the provided
/// methods build the rest of the flow on top of those three.
pub(crate) trait ItemFetcher<T: Item + 'static> {
    /// Returns the table of pending responders, keyed by item ID and then by peer.
    fn responders(&mut self) -> &mut HashMap<T::Id, HashMap<NodeId, Vec<FetchResponder<T>>>>;

    /// Returns the delay after which a request sent to a peer is reported through
    /// `Event::TimeoutPeer`.
    fn peer_timeout(&self) -> Duration;

    /// Queues `responder` under `id` and `peer`, then starts the storage lookup for `id`.
    fn fetch<REv: ReactorEventT<T>>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        id: T::Id,
        peer: NodeId,
        responder: FetchResponder<T>,
    ) -> Effects<Event<T>> {
        let waiting_on_peer = self
            .responders()
            .entry(id)
            .or_default()
            .entry(peer)
            .or_default();
        waiting_on_peer.push(responder);

        self.get_from_storage(effect_builder, id, peer)
    }

    /// Produces the effects that look `id` up in storage on behalf of a request tied to `peer`.
    fn get_from_storage<REv: ReactorEventT<T>>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        id: T::Id,
        peer: NodeId,
    ) -> Effects<Event<T>>;

    /// Signals the responders waiting for `item` with a `FetchResult::FromStorage` result.
    fn got_from_storage(&mut self, item: T, peer: NodeId) -> Effects<Event<T>> {
        // The ID has to be read before the item is moved into the box.
        let id = item.id();
        let from_storage = FetchResult::FromStorage(Box::new(item));
        self.signal(id, Some(from_storage), peer)
    }

    /// Sends a get request for `id` to `peer` and schedules an `Event::TimeoutPeer` after
    /// `peer_timeout()`.
    ///
    /// If the request message cannot be constructed, the error is logged and `None` is signalled
    /// for `peer` instead.
    fn failed_to_get_from_storage<REv: ReactorEventT<T>>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        id: T::Id,
        peer: NodeId,
    ) -> Effects<Event<T>> {
        let message = match Message::new_get_request::<T>(&id) {
            Ok(message) => message,
            Err(error) => {
                error!("failed to construct get request: {}", error);
                return self.signal(id, None, peer);
            }
        };

        let mut effects = effect_builder.send_message(peer, message).ignore();
        let timeout = self.peer_timeout();
        effects.extend(
            effect_builder
                .set_timeout(timeout)
                .event(move |_| Event::TimeoutPeer { id, peer }),
        );
        effects
    }

    /// Answers pending responders for `id` and updates the responder table.
    ///
    /// * `Some(result)`: every responder registered for `id`, whatever its peer, receives a clone
    ///   of `result`, and the entry for `id` is dropped from the table.
    /// * `None`: only the responders registered for `peer` receive `None`; responders of other
    ///   peers are put back into the table.
    fn signal(
        &mut self,
        id: T::Id,
        result: Option<FetchResult<T, NodeId>>,
        peer: NodeId,
    ) -> Effects<Event<T>> {
        let mut by_peer = self.responders().remove(&id).unwrap_or_default();
        let mut effects = Effects::new();

        let fetched = match result {
            Some(fetched) => fetched,
            None => {
                for responder in by_peer.remove(&peer).into_iter().flatten() {
                    effects.extend(responder.respond(None).ignore());
                }
                // Keep the responders of the remaining peers registered.
                if !by_peer.is_empty() {
                    self.responders().insert(id, by_peer);
                }
                return effects;
            }
        };

        for responder in by_peer.into_iter().flat_map(|(_, waiting)| waiting) {
            effects.extend(responder.respond(Some(fetched.clone())).ignore());
        }
        effects
    }
}
/// State of a fetcher for items of type `T`.
#[derive(DataSize, Debug)]
pub(crate) struct Fetcher<T>
where
T: Item + 'static,
{
/// Value handed out by the `peer_timeout` implementations in this file.
get_from_peer_timeout: Duration,
/// Responders still awaiting an answer, keyed by item ID and then by peer.
responders: HashMap<T::Id, HashMap<NodeId, Vec<FetchResponder<T>>>>,
/// Counters incremented from `handle_event`; excluded from `DataSize` by the `skip` attribute.
#[data_size(skip)]
metrics: Metrics,
}
impl<T: Item> Fetcher<T> {
pub(crate) fn new(
name: &str,
config: Config,
registry: &Registry,
) -> Result<Self, prometheus::Error> {
Ok(Fetcher {
get_from_peer_timeout: config.get_from_peer_timeout().into(),
responders: HashMap::new(),
metrics: Metrics::new(name, registry)?,
})
}
}
/// Fetching of `Deploy`s, identified by `DeployHash`.
impl ItemFetcher<Deploy> for Fetcher<Deploy> {
    /// Gives access to the responder table stored on the fetcher.
    fn responders(
        &mut self,
    ) -> &mut HashMap<DeployHash, HashMap<NodeId, Vec<FetchResponder<Deploy>>>> {
        &mut self.responders
    }

    /// Returns the timeout stored on the fetcher.
    fn peer_timeout(&self) -> Duration {
        self.get_from_peer_timeout
    }

    /// Requests the single deploy `id` from storage and wraps the outcome in an
    /// `Event::GetFromStorageResult`.
    fn get_from_storage<REv: ReactorEventT<Deploy>>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        id: DeployHash,
        peer: NodeId,
    ) -> Effects<Event<Deploy>> {
        let lookup = effect_builder.get_deploys_from_storage(vec![id]);

        lookup.event(move |mut results| {
            // Exactly one hash was requested, so the last entry is the only one.
            let maybe_deploy = results
                .pop()
                .expect("can only contain one result")
                .map(DeployWithFinalizedApprovals::into_naive);

            Event::GetFromStorageResult {
                id,
                peer,
                maybe_item: Box::new(maybe_deploy),
            }
        })
    }
}
/// Fetching of `Block`s, identified by `BlockHash`.
impl ItemFetcher<Block> for Fetcher<Block> {
    /// Gives access to the responder table stored on the fetcher.
    fn responders(
        &mut self,
    ) -> &mut HashMap<BlockHash, HashMap<NodeId, Vec<FetchResponder<Block>>>> {
        &mut self.responders
    }

    /// Returns the timeout stored on the fetcher.
    fn peer_timeout(&self) -> Duration {
        self.get_from_peer_timeout
    }

    /// Requests the block `id` from storage and wraps the outcome in an
    /// `Event::GetFromStorageResult`.
    fn get_from_storage<REv: ReactorEventT<Block>>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        id: BlockHash,
        peer: NodeId,
    ) -> Effects<Event<Block>> {
        let lookup = effect_builder.get_block_from_storage(id);

        lookup.event(move |maybe_block| {
            let maybe_item = Box::new(maybe_block);
            Event::GetFromStorageResult {
                id,
                peer,
                maybe_item,
            }
        })
    }
}
/// Fetching of `BlockByHeight` items, identified by a `u64` height.
impl ItemFetcher<BlockByHeight> for Fetcher<BlockByHeight> {
    /// Gives access to the responder table stored on the fetcher.
    fn responders(
        &mut self,
    ) -> &mut HashMap<u64, HashMap<NodeId, Vec<FetchResponder<BlockByHeight>>>> {
        &mut self.responders
    }

    /// Returns the timeout stored on the fetcher.
    fn peer_timeout(&self) -> Duration {
        self.get_from_peer_timeout
    }

    /// Requests the block at height `id` from storage, converts a hit into a `BlockByHeight` and
    /// wraps the outcome in an `Event::GetFromStorageResult`.
    fn get_from_storage<REv: ReactorEventT<BlockByHeight>>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        id: u64,
        peer: NodeId,
    ) -> Effects<Event<BlockByHeight>> {
        let lookup = effect_builder.get_block_at_height_from_storage(id);

        lookup.event(move |maybe_block| {
            let maybe_item = maybe_block.map(|block| block.into());
            Event::GetFromStorageResult {
                id,
                peer,
                maybe_item: Box::new(maybe_item),
            }
        })
    }
}
/// Trie keyed by `Key` and holding `StoredValue`s; the item type of the trie fetcher below.
type GlobalStorageTrie = Trie<Key, StoredValue>;

/// Fetching of `GlobalStorageTrie`s, identified by `Digest`.
impl ItemFetcher<GlobalStorageTrie> for Fetcher<GlobalStorageTrie> {
    /// Gives access to the responder table stored on the fetcher.
    fn responders(
        &mut self,
    ) -> &mut HashMap<Digest, HashMap<NodeId, Vec<FetchResponder<GlobalStorageTrie>>>> {
        &mut self.responders
    }

    /// Returns the timeout stored on the fetcher.
    fn peer_timeout(&self) -> Duration {
        self.get_from_peer_timeout
    }

    /// Requests the trie `id` via `get_trie` and wraps the outcome in an
    /// `Event::GetFromStorageResult`.
    ///
    /// A failed lookup is logged and then reported as `None`, exactly like a missing trie.
    fn get_from_storage<REv: ReactorEventT<GlobalStorageTrie>>(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        id: Digest,
        peer: NodeId,
    ) -> Effects<Event<GlobalStorageTrie>> {
        let lookup = async move {
            let maybe_trie = effect_builder.get_trie(id).await.unwrap_or_else(|error| {
                error!(?error, "get_trie_request");
                None
            });

            Event::GetFromStorageResult {
                id,
                peer,
                maybe_item: Box::new(maybe_trie),
            }
        };

        // The future already yields the finished event, so it is passed through unchanged.
        lookup.event(std::convert::identity)
    }
}
/// Wires the `ItemFetcher` logic of a `Fetcher<T>` into the reactor's `Component` interface.
impl<T, REv> Component<REv> for Fetcher<T>
where
    Fetcher<T>: ItemFetcher<T>,
    T: Item + 'static,
    REv: ReactorEventT<T>,
{
    type Event = Event<T>;
    type ConstructionError = prometheus::Error;

    /// Dispatches a fetcher event to the matching `ItemFetcher` method and updates the metrics
    /// counters along the way.
    fn handle_event(
        &mut self,
        effect_builder: EffectBuilder<REv>,
        _rng: &mut NodeRng,
        event: Self::Event,
    ) -> Effects<Self::Event> {
        debug!(?event, "handling event");

        match event {
            Event::Fetch {
                id,
                peer,
                responder,
            } => self.fetch(effect_builder, id, peer, responder),

            Event::GetFromStorageResult {
                id,
                peer,
                maybe_item,
            } => {
                if let Some(item) = *maybe_item {
                    self.metrics.found_in_storage.inc();
                    self.got_from_storage(item, peer)
                } else {
                    // Not available locally: hand over to the peer-request path.
                    self.failed_to_get_from_storage(effect_builder, id, peer)
                }
            }

            Event::GotRemotely { item, source } => match source {
                // Items that did not come from a peer produce no effects here.
                Source::Client | Source::Ourself => Effects::new(),
                Source::Peer(peer) => {
                    self.metrics.found_on_peer.inc();
                    let id = item.id();
                    self.signal(id, Some(FetchResult::FromPeer(item, peer)), peer)
                }
            },

            Event::RejectedRemotely { .. } => Effects::new(),

            Event::AbsentRemotely { id, peer } => {
                info!(%id, %peer, "element absent on the remote node");
                self.signal(id, None, peer)
            }

            Event::TimeoutPeer { id, peer } => {
                info!(%id, %peer, "request timed out");
                self.metrics.timeouts.inc();
                self.signal(id, None, peer)
            }
        }
    }
}