#![allow(clippy::boxed_local)]
mod event_queue_metrics;
pub(crate) mod main_reactor;
mod queue_kind;
use std::{
any,
collections::HashMap,
env,
fmt::{Debug, Display},
io::Write,
num::NonZeroU64,
str::FromStr,
sync::{atomic::Ordering, Arc},
};
use datasize::DataSize;
use erased_serde::Serialize as ErasedSerialize;
#[cfg(test)]
use fake_instant::FakeClock;
#[cfg(test)]
use futures::future::BoxFuture;
use futures::FutureExt;
use once_cell::sync::Lazy;
use prometheus::{self, Histogram, HistogramOpts, IntCounter, IntGauge, Registry};
use quanta::{Clock, IntoNanoseconds};
use serde::Serialize;
use signal_hook::consts::signal::{SIGINT, SIGQUIT, SIGTERM};
use stats_alloc::{Stats, INSTRUMENTED_SYSTEM};
use tokio::time::{Duration, Instant};
use tracing::{debug_span, error, info, instrument, trace, warn, Instrument, Span};
#[cfg(test)]
use crate::components::ComponentState;
#[cfg(test)]
use casper_types::testing::TestRng;
use casper_types::{
Block, BlockHeader, Chainspec, ChainspecRawBytes, FinalitySignature, Transaction,
};
#[cfg(target_os = "linux")]
use utils::rlimit::{Limit, OpenFiles, ResourceLimit};
#[cfg(test)]
use crate::testing::{network::NetworkedReactor, ConditionCheckReactor};
use crate::{
components::{
block_accumulator,
fetcher::{self, FetchItem},
network::{blocklist::BlocklistJustification, Identity as NetworkIdentity},
transaction_acceptor,
},
effect::{
announcements::{ControlAnnouncement, PeerBehaviorAnnouncement, QueueDumpFormat},
incoming::NetResponse,
Effect, EffectBuilder, EffectExt, Effects,
},
failpoints::FailpointActivation,
types::{BlockExecutionResultsOrChunk, ExitCode, LegacyDeploy, NodeId, SyncLeap, TrieOrChunk},
unregister_metric,
utils::{self, SharedFlag, WeightedRoundRobin},
NodeRng, TERMINATION_REQUESTED,
};
use casper_storage::block_store::types::ApprovalsHashes;
pub(crate) use queue_kind::QueueKind;
/// Default threshold above which an event dispatch is considered slow and gets logged.
const DEFAULT_DISPATCH_EVENT_THRESHOLD: Duration = Duration::from_secs(1);
/// Environment variable that overrides the dispatch threshold; value is in microseconds.
const DISPATCH_EVENT_THRESHOLD_ENV_VAR: &str = "CL_EVENT_MAX_MICROSECS";
/// Interval at which tests poll for new events when the queue is empty.
#[cfg(test)]
const POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Threshold above which slow event dispatches are warned about, configurable through the
/// `CL_EVENT_MAX_MICROSECS` environment variable (microseconds); panics on an unparseable
/// value, falls back to [`DEFAULT_DISPATCH_EVENT_THRESHOLD`] when unset.
static DISPATCH_EVENT_THRESHOLD: Lazy<Duration> = Lazy::new(|| {
    match env::var(DISPATCH_EVENT_THRESHOLD_ENV_VAR) {
        Ok(threshold_str) => {
            let threshold_microsecs: u64 = threshold_str.parse().unwrap_or_else(|error| {
                panic!(
                    "can't parse env var {}={} as a u64: {}",
                    DISPATCH_EVENT_THRESHOLD_ENV_VAR, threshold_str, error
                )
            });
            Duration::from_micros(threshold_microsecs)
        }
        Err(_) => DEFAULT_DISPATCH_EVENT_THRESHOLD,
    }
});
/// The desired soft limit on open file descriptors for this process.
#[cfg(target_os = "linux")]
const TARGET_OPEN_FILES_LIMIT: Limit = 64_000;
/// Best-effort attempt to raise the process' open-files soft limit to
/// `TARGET_OPEN_FILES_LIMIT`, capped by the hard limit. Failures are logged, never fatal.
#[cfg(target_os = "linux")]
fn adjust_open_files_limit() {
    // Fetch the current limits; bail out (with a warning) if that fails.
    let current_limit = match ResourceLimit::<OpenFiles>::get() {
        Ok(limit) => limit,
        Err(err) => {
            warn!(%err, "could not retrieve open files limit");
            return;
        }
    };

    // Nothing to do if the soft limit already meets the target.
    if current_limit.current() >= TARGET_OPEN_FILES_LIMIT {
        tracing::debug!(
            ?current_limit,
            "not changing open files limit, already sufficient"
        );
        return;
    }

    // The hard limit caps what we can request.
    let best_possible = if current_limit.max() < TARGET_OPEN_FILES_LIMIT {
        warn!(
            wanted = TARGET_OPEN_FILES_LIMIT,
            hard_limit = current_limit.max(),
            "settling for lower open files limit due to hard limit"
        );
        current_limit.max()
    } else {
        TARGET_OPEN_FILES_LIMIT
    };

    let new_limit = ResourceLimit::<OpenFiles>::fixed(best_possible);
    if let Err(err) = new_limit.set() {
        warn!(
            %err,
            current = current_limit.current(),
            target = best_possible,
            "did not succeed in raising open files limit"
        )
    } else {
        tracing::debug!(?new_limit, "successfully increased open files limit");
    }
}
/// File descriptor limit adjustment is a Linux-only concern; on other platforms this is a
/// logged no-op.
#[cfg(not(target_os = "linux"))]
fn adjust_open_files_limit() {
    info!("not on linux, not adjusting open files limit");
}
pub(crate) type Scheduler<Ev> = WeightedRoundRobin<(Option<NonZeroU64>, Ev), QueueKind>;
/// A cheap, copyable handle for scheduling events onto a reactor's queues.
#[derive(DataSize, Debug)]
pub(crate) struct EventQueueHandle<REv>
where
    REv: 'static,
{
    /// Reference to the reactor's scheduler (leaked, hence `'static`).
    scheduler: &'static Scheduler<REv>,
    /// Flag shared with the runner, set when the reactor is shutting down.
    is_shutting_down: SharedFlag,
}
// `Clone`/`Copy` are implemented manually rather than derived: a derive would add an
// unwanted `REv: Clone`/`REv: Copy` bound, while the handle only holds a shared reference
// and a flag and is always trivially copyable regardless of `REv`.
impl<REv> Clone for EventQueueHandle<REv> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<REv> Copy for EventQueueHandle<REv> {}
impl<REv> EventQueueHandle<REv> {
pub(crate) fn new(scheduler: &'static Scheduler<REv>, is_shutting_down: SharedFlag) -> Self {
EventQueueHandle {
scheduler,
is_shutting_down,
}
}
#[cfg(test)]
pub(crate) fn without_shutdown(scheduler: &'static Scheduler<REv>) -> Self {
EventQueueHandle::new(scheduler, SharedFlag::global_shared())
}
pub(crate) async fn schedule<Ev>(self, event: Ev, queue_kind: QueueKind)
where
REv: From<Ev>,
{
self.schedule_with_ancestor(None, event, queue_kind).await;
}
pub(crate) async fn schedule_with_ancestor<Ev>(
self,
ancestor: Option<NonZeroU64>,
event: Ev,
queue_kind: QueueKind,
) where
REv: From<Ev>,
{
self.scheduler
.push((ancestor, event.into()), queue_kind)
.await;
}
pub(crate) fn event_queues_counts(&self) -> HashMap<QueueKind, usize> {
self.scheduler.event_queues_counts()
}
pub(crate) fn shutdown_flag(&self) -> SharedFlag {
self.is_shutting_down
}
}
/// Core reactor trait: a reactor receives events dispatched from the queues and produces
/// effects, which in turn are turned into new scheduled events by the runner.
pub(crate) trait Reactor: Sized {
    /// The event type processed by this reactor.
    type Event: ReactorEvent + Display;
    /// Configuration required to construct the reactor.
    type Config;
    /// Error type returned on construction failure.
    type Error: Send + 'static;
    /// Dispatches a single event to the reactor, returning the resulting effects.
    fn dispatch_event(
        &mut self,
        effect_builder: EffectBuilder<Self::Event>,
        rng: &mut NodeRng,
        event: Self::Event,
    ) -> Effects<Self::Event>;
    /// Creates the reactor along with its initial effects.
    fn new(
        cfg: Self::Config,
        chainspec: Arc<Chainspec>,
        chainspec_raw_bytes: Arc<ChainspecRawBytes>,
        network_identity: NetworkIdentity,
        registry: &Registry,
        event_queue: EventQueueHandle<Self::Event>,
        rng: &mut NodeRng,
    ) -> Result<(Self, Effects<Self::Event>), Self::Error>;
    /// Updates reactor-specific metrics; the default implementation does nothing.
    fn update_metrics(&mut self, _event_queue_handle: EventQueueHandle<Self::Event>) {}
    /// Activates a failpoint; the default implementation ignores the activation.
    fn activate_failpoint(&mut self, _activation: &FailpointActivation) {
    }
    /// Returns the state of the named component, if known (test only; default: `None`).
    #[allow(dead_code)]
    #[cfg(test)]
    fn get_component_state(&self, _name: &str) -> Option<&ComponentState> {
        None
    }
}
/// An event a reactor can process, convertible from control announcements.
pub(crate) trait ReactorEvent: Send + Debug + From<ControlAnnouncement> + 'static {
    /// Returns `true` if the event is a control event that the runner handles itself.
    fn is_control(&self) -> bool;
    /// Converts the event into a control announcement, if it is one.
    fn try_into_control(self) -> Option<ControlAnnouncement>;
    /// A short, static description of the event kind, used in slow-dispatch warnings.
    fn description(&self) -> &'static str {
        "anonymous event"
    }
}
/// Async teardown hook for test fixtures; the default implementation does nothing.
#[cfg(test)]
pub(crate) trait Finalize: Sized {
    /// Performs any cleanup work, returning a future that completes when done.
    fn finalize(self) -> BoxFuture<'static, ()> {
        async move {}.boxed()
    }
}
/// Snapshot of memory usage, all values in bytes (see `Runner::get_allocated_memory`).
struct AllocatedMem {
    // Bytes currently allocated by this process (allocations minus deallocations).
    allocated: u64,
    // System memory in use (total minus available).
    consumed: u64,
    // Total system memory.
    total: u64,
}
/// Drives a reactor: owns the scheduler, dispatches events one at a time ("cranks"), and
/// records metrics about event processing.
#[derive(Debug)]
pub(crate) struct Runner<R>
where
    R: Reactor,
{
    // The leaked, 'static scheduler feeding this runner.
    scheduler: &'static Scheduler<R::Event>,
    // The reactor instance being driven.
    reactor: R,
    // Monotonically increasing id of the next event; starts at 1.
    current_event_id: u64,
    // When reactor metrics were last refreshed.
    last_metrics: Instant,
    // Prometheus metrics owned by the runner itself.
    metrics: RunnerMetrics,
    // Reactor metrics are refreshed at most every `event_metrics_threshold` events...
    event_metrics_threshold: u64,
    // ...and no more often than once per this interval.
    event_metrics_min_delay: Duration,
    // High-resolution clock used to time individual event dispatches.
    clock: Clock,
    // Flag set once the runner decides to shut down; shared with event queue handles.
    is_shutting_down: SharedFlag,
}
/// Prometheus metrics recorded by the runner; deregistered again on drop.
#[derive(Debug)]
struct RunnerMetrics {
    // Running total of events processed.
    events: IntCounter,
    // Distribution of event dispatch durations, in nanoseconds.
    event_dispatch_duration: Histogram,
    // Bytes allocated by this process, per the instrumented allocator.
    allocated_ram_bytes: IntGauge,
    // System memory in use, per `sys-info`.
    consumed_ram_bytes: IntGauge,
    // Total system memory, per `sys-info`.
    total_ram_bytes: IntGauge,
    // Clone of the registry, kept so `Drop` can deregister the metrics above.
    registry: Registry,
}
impl RunnerMetrics {
    /// Creates the runner metrics and registers every one of them on `registry`.
    fn new(registry: &Registry) -> Result<Self, prometheus::Error> {
        let events = IntCounter::new(
            "runner_events",
            "running total count of events handled by this reactor",
        )?;

        // Histogram buckets, in nanoseconds, covering 100 ns up to 5 ms.
        let dispatch_buckets = vec![
            100.0, 500.0, 1_000.0, 5_000.0, 10_000.0, 20_000.0, 50_000.0, 100_000.0,
            200_000.0, 300_000.0, 400_000.0, 500_000.0, 600_000.0, 700_000.0, 800_000.0,
            900_000.0, 1_000_000.0, 2_000_000.0, 5_000_000.0,
        ];
        let event_dispatch_duration = Histogram::with_opts(
            HistogramOpts::new(
                "event_dispatch_duration",
                "time in nanoseconds to dispatch an event",
            )
            .buckets(dispatch_buckets),
        )?;

        let allocated_ram_bytes =
            IntGauge::new("allocated_ram_bytes", "total allocated ram in bytes")?;
        let consumed_ram_bytes =
            IntGauge::new("consumed_ram_bytes", "total consumed ram in bytes")?;
        let total_ram_bytes = IntGauge::new("total_ram_bytes", "total system ram in bytes")?;

        registry.register(Box::new(events.clone()))?;
        registry.register(Box::new(event_dispatch_duration.clone()))?;
        registry.register(Box::new(allocated_ram_bytes.clone()))?;
        registry.register(Box::new(consumed_ram_bytes.clone()))?;
        registry.register(Box::new(total_ram_bytes.clone()))?;

        Ok(RunnerMetrics {
            events,
            event_dispatch_duration,
            allocated_ram_bytes,
            consumed_ram_bytes,
            total_ram_bytes,
            // Keep a registry handle so `Drop` can deregister everything again.
            registry: registry.clone(),
        })
    }
}
impl Drop for RunnerMetrics {
    // Deregisters all metrics from the registry when the runner is torn down.
    fn drop(&mut self) {
        unregister_metric!(self.registry, self.events);
        unregister_metric!(self.registry, self.event_dispatch_duration);
        unregister_metric!(self.registry, self.allocated_ram_bytes);
        unregister_metric!(self.registry, self.consumed_ram_bytes);
        unregister_metric!(self.registry, self.total_ram_bytes);
    }
}
impl<R> Runner<R>
where
    R: Reactor,
    R::Event: Serialize,
    R::Error: From<prometheus::Error>,
{
    /// Creates a new runner from the given configuration, registering its metrics on the
    /// supplied registry, constructing the reactor, and scheduling its initial effects.
    #[instrument(
        "init",
        level = "debug",
        skip_all,
        fields(node_id = %NodeId::from(&network_identity))
    )]
    pub(crate) async fn with_metrics(
        cfg: R::Config,
        chainspec: Arc<Chainspec>,
        chainspec_raw_bytes: Arc<ChainspecRawBytes>,
        network_identity: NetworkIdentity,
        rng: &mut NodeRng,
        registry: &Registry,
    ) -> Result<Self, R::Error> {
        adjust_open_files_limit();

        // Events travel through the queues by value, so warn if the event type has grown
        // beyond 16 machine words.
        let event_size = size_of::<R::Event>();
        if event_size > 16 * size_of::<usize>() {
            warn!(
                %event_size, type_name = ?any::type_name::<R::Event>(),
                "large event size, consider reducing it or boxing"
            );
        }

        // Optional queue dump threshold; unset or unparseable values disable it.
        let event_queue_dump_threshold =
            env::var("CL_EVENT_QUEUE_DUMP_THRESHOLD").map_or(None, |s| s.parse::<usize>().ok());

        // The scheduler is deliberately leaked: event queue handles hold `'static`
        // references to it and are handed out freely.
        let scheduler = utils::leak(Scheduler::new(
            QueueKind::weights(),
            event_queue_dump_threshold,
        ));
        let is_shutting_down = SharedFlag::new();
        let event_queue = EventQueueHandle::new(scheduler, is_shutting_down);
        let (reactor, initial_effects) = R::new(
            cfg,
            chainspec,
            chainspec_raw_bytes,
            network_identity,
            registry,
            event_queue,
            rng,
        )?;
        info!(
            "Reactor: with_metrics has: {} initial_effects",
            initial_effects.len()
        );

        // Schedule the reactor's initial effects before the main loop starts cranking.
        process_effects(None, scheduler, initial_effects, QueueKind::Regular)
            .instrument(debug_span!("process initial effects"))
            .await;
        info!("reactor main loop is ready");

        Ok(Runner {
            scheduler,
            reactor,
            current_event_id: 1,
            metrics: RunnerMetrics::new(registry)?,
            last_metrics: Instant::now(),
            event_metrics_min_delay: Duration::from_secs(30),
            event_metrics_threshold: 1000,
            clock: Clock::new(),
            is_shutting_down,
        })
    }

    /// Processes a single event off the event queue. Returns `Some(exit_code)` when a
    /// control announcement requests that the node stop, `None` otherwise.
    #[instrument("dispatch", level = "debug", fields(a, ev = self.current_event_id), skip(self, rng))]
    pub(crate) async fn crank(&mut self, rng: &mut NodeRng) -> Option<ExitCode> {
        self.metrics.events.inc();

        let event_queue = EventQueueHandle::new(self.scheduler, self.is_shutting_down);
        let effect_builder = EffectBuilder::new(event_queue);

        // Only refresh (relatively expensive) metrics every `event_metrics_threshold`-th
        // event, and even then no more often than `event_metrics_min_delay`.
        if self.current_event_id % self.event_metrics_threshold == 0 {
            if self.last_metrics.elapsed() >= self.event_metrics_min_delay {
                self.reactor.update_metrics(event_queue);
                self.last_metrics = Instant::now();
            }
            if let Some(AllocatedMem {
                allocated,
                consumed,
                total,
            }) = Self::get_allocated_memory()
            {
                trace!(%allocated, %total, "memory allocated");
                self.metrics.allocated_ram_bytes.set(allocated as i64);
                self.metrics.consumed_ram_bytes.set(consumed as i64);
                self.metrics.total_ram_bytes.set(total as i64);
            }
        }

        let ((ancestor, event), queue_kind) = self.scheduler.pop().await;
        trace!(%event, %queue_kind, "current");
        let event_desc = event.description();

        // Record this event's id (and its ancestor's, if any) on the tracing span.
        Span::current().record("ev", self.current_event_id);
        if let Some(ancestor) = ancestor {
            Span::current().record("a", ancestor.get());
        }

        // Time the dispatch so unusually slow events can be reported below.
        let start = self.clock.start();
        // Control announcements are handled by the runner itself; everything else is
        // dispatched into the reactor.
        let (effects, maybe_exit_code, queue_kind) = if event.is_control() {
            match event.try_into_control() {
                None => {
                    error!(
                        "event::as_control succeeded, but try_into_control failed. this is a bug"
                    );
                    (Effects::new(), None, QueueKind::Control)
                }
                Some(ControlAnnouncement::ShutdownDueToUserRequest) => (
                    Effects::new(),
                    Some(ExitCode::CleanExitDontRestart),
                    QueueKind::Control,
                ),
                Some(ControlAnnouncement::ShutdownForUpgrade) => {
                    (Effects::new(), Some(ExitCode::Success), QueueKind::Control)
                }
                Some(ControlAnnouncement::ShutdownAfterCatchingUp) => (
                    Effects::new(),
                    Some(ExitCode::CleanExitDontRestart),
                    QueueKind::Control,
                ),
                Some(ControlAnnouncement::FatalError { file, line, msg }) => {
                    error!(%file, %line, %msg, "fatal error via control announcement");
                    (Effects::new(), Some(ExitCode::Abort), QueueKind::Control)
                }
                Some(ControlAnnouncement::QueueDumpRequest {
                    dump_format,
                    finished,
                }) => {
                    match dump_format {
                        // Serialize the queue dump through the caller-provided serializer.
                        QueueDumpFormat::Serde(mut ser) => {
                            self.scheduler
                                .dump(move |queue_dump| {
                                    if let Err(err) =
                                        queue_dump.erased_serialize(&mut ser.as_serializer())
                                    {
                                        warn!(%err, "queue dump failed to serialize");
                                    }
                                })
                                .await;
                        }
                        // Write a `Debug`-formatted dump into a clone of the given file.
                        QueueDumpFormat::Debug(ref file) => {
                            match file.try_clone() {
                                Ok(mut local_file) => {
                                    self.scheduler
                                        .dump(move |queue_dump| {
                                            write!(&mut local_file, "{:?}", queue_dump)
                                                .and_then(|_| local_file.flush())
                                                .map_err(|err| {
                                                    warn!(
                                                        ?err,
                                                        "failed to write/flush queue dump using debug format"
                                                    );
                                                })
                                                .ok();
                                        })
                                        .await;
                                }
                                Err(err) => warn!(
                                    %err,
                                    "could not create clone of temporary file for queue debug dump"
                                ),
                            };
                        }
                    }
                    // Notify the requester that the dump has completed.
                    finished.respond(()).await;
                    (Default::default(), None, QueueKind::Control)
                }
                Some(ControlAnnouncement::ActivateFailpoint { activation }) => {
                    self.reactor.activate_failpoint(&activation);
                    (Effects::new(), None, QueueKind::Control)
                }
            }
        } else {
            (
                self.reactor.dispatch_event(effect_builder, rng, event),
                None,
                queue_kind,
            )
        };
        let end = self.clock.end();

        // Warn if dispatching took longer than the (possibly env-overridden) threshold.
        let delta = self.clock.delta(start, end);
        if delta > *DISPATCH_EVENT_THRESHOLD {
            warn!(%event_desc, ns = delta.into_nanos(), "event took very long to dispatch");
        }
        self.metrics
            .event_dispatch_duration
            .observe(delta.into_nanos() as f64);

        // Schedule the resulting effects, tagging them with this event's id as ancestor.
        process_effects(
            NonZeroU64::new(self.current_event_id),
            self.scheduler,
            effects,
            queue_kind,
        )
        .in_current_span()
        .await;

        self.current_event_id += 1;
        maybe_exit_code
    }

    /// Best-effort snapshot of process and system memory usage; returns `None` (after
    /// logging a warning) if system memory info cannot be obtained.
    fn get_allocated_memory() -> Option<AllocatedMem> {
        let mem_info = match sys_info::mem_info() {
            Ok(mem_info) => mem_info,
            Err(error) => {
                warn!(%error, "unable to get mem_info using sys-info");
                return None;
            }
        };
        // `mem_info` values are in KiB; convert to bytes.
        let total = mem_info.total * 1024;
        let consumed = total - (mem_info.avail * 1024);
        let Stats {
            allocations: _,
            deallocations: _,
            reallocations: _,
            bytes_allocated,
            bytes_deallocated,
            bytes_reallocated: _,
        } = INSTRUMENTED_SYSTEM.stats();
        Some(AllocatedMem {
            allocated: bytes_allocated.saturating_sub(bytes_deallocated) as u64,
            consumed,
            total,
        })
    }

    /// Runs the reactor until it requests an exit or a termination signal is recorded in
    /// `TERMINATION_REQUESTED`, setting the shutdown flag before returning the exit code.
    pub(crate) async fn run(&mut self, rng: &mut NodeRng) -> ExitCode {
        loop {
            match TERMINATION_REQUESTED.load(Ordering::SeqCst) as i32 {
                // Zero means no signal was received; keep cranking.
                0 => {
                    if let Some(exit_code) = self.crank(rng).await {
                        self.is_shutting_down.set();
                        break exit_code;
                    }
                }
                SIGINT => {
                    self.is_shutting_down.set();
                    break ExitCode::SigInt;
                }
                SIGQUIT => {
                    self.is_shutting_down.set();
                    break ExitCode::SigQuit;
                }
                SIGTERM => {
                    self.is_shutting_down.set();
                    break ExitCode::SigTerm;
                }
                _ => error!("should be unreachable - bug in signal handler"),
            }
        }
    }
}
/// Outcome of a `Runner::try_crank` call (test only).
#[cfg(test)]
#[derive(Eq, PartialEq, Debug)]
pub(crate) enum TryCrankOutcome {
    /// The queue was empty; no event was processed.
    NoEventsToProcess,
    /// A single event was processed.
    ProcessedAnEvent,
    /// An event was processed and it requested the given exit.
    ShouldExit(ExitCode),
    /// The runner had already shut down; nothing was processed.
    Exited,
}
#[cfg(test)]
impl<R> Runner<R>
where
    R: Reactor,
    R::Event: Serialize,
    R::Error: From<prometheus::Error>,
{
    // NOTE(fixes): repaired the mojibake `®istry` -> `&registry`; removed the redundant
    // `#[cfg(test)]` attributes on methods inside this already test-only impl; removed the
    // local `use tracing::{debug_span, Instrument};`, both names are imported at file level.

    /// Creates a new runner with a fresh registry and a generated network identity.
    pub(crate) async fn new(
        cfg: R::Config,
        chainspec: Arc<Chainspec>,
        chainspec_raw_bytes: Arc<ChainspecRawBytes>,
        rng: &mut NodeRng,
    ) -> Result<Self, R::Error> {
        let registry = Registry::new();
        let network_identity = NetworkIdentity::with_generated_certs().unwrap();
        Self::with_metrics(
            cfg,
            chainspec,
            chainspec_raw_bytes,
            network_identity,
            rng,
            &registry,
        )
        .await
    }

    /// Returns an effect builder bound to this runner's scheduler.
    pub(crate) fn effect_builder(&self) -> EffectBuilder<R::Event> {
        let event_queue = EventQueueHandle::new(self.scheduler, self.is_shutting_down);
        EffectBuilder::new(event_queue)
    }

    /// Creates effects via the given closure and schedules them, bypassing event dispatch.
    pub(crate) async fn process_injected_effects<F>(&mut self, create_effects: F)
    where
        F: FnOnce(EffectBuilder<R::Event>) -> Effects<R::Event>,
    {
        let event_queue = EventQueueHandle::new(self.scheduler, self.is_shutting_down);
        let effect_builder = EffectBuilder::new(event_queue);
        let effects = create_effects(effect_builder);
        process_effects(None, self.scheduler, effects, QueueKind::Regular)
            .instrument(debug_span!(
                "process injected effects",
                ev = self.current_event_id
            ))
            .await
    }

    /// Cranks once if an event is available, reporting what happened.
    pub(crate) async fn try_crank(&mut self, rng: &mut NodeRng) -> TryCrankOutcome {
        if self.is_shutting_down.is_set() {
            TryCrankOutcome::Exited
        } else if self.scheduler.item_count() == 0 {
            TryCrankOutcome::NoEventsToProcess
        } else {
            match self.crank(rng).await {
                Some(exit_code) => {
                    self.is_shutting_down.set();
                    TryCrankOutcome::ShouldExit(exit_code)
                }
                None => TryCrankOutcome::ProcessedAnEvent,
            }
        }
    }

    /// Returns a shared reference to the wrapped reactor.
    pub(crate) fn reactor(&self) -> &R {
        &self.reactor
    }

    /// Returns a mutable reference to the wrapped reactor.
    pub(crate) fn reactor_mut(&mut self) -> &mut R {
        &mut self.reactor
    }

    /// Shuts the runner down, drains (and logs) all remaining events, and returns the
    /// inner reactor.
    pub(crate) async fn drain_into_inner(self) -> R {
        self.is_shutting_down.set();
        self.scheduler.seal();
        for (ancestor, event) in self.scheduler.drain_queues().await {
            tracing::debug!(?ancestor, %event, "drained event");
        }
        self.reactor
    }
}
#[cfg(test)]
impl<R> Runner<ConditionCheckReactor<R>>
where
    R: Reactor + NetworkedReactor,
    R::Event: Serialize,
    R::Error: From<prometheus::Error>,
{
    /// Cranks the reactor until `condition` holds for a dispatched event, panicking if it
    /// is not met within the `within` timeout.
    pub(crate) async fn crank_until<F>(&mut self, rng: &mut TestRng, condition: F, within: Duration)
    where
        F: Fn(&R::Event) -> bool + Send + 'static,
    {
        self.reactor.set_condition_checker(Box::new(condition));
        tokio::time::timeout(within, self.crank_and_check_indefinitely(rng))
            .await
            .unwrap_or_else(|_| {
                panic!(
                    "Runner::crank_until() timed out after {}s on node {}",
                    within.as_secs_f64(),
                    self.reactor.inner().node_id()
                )
            })
    }

    /// Cranks in a loop until the condition checker reports success; panics if the
    /// reactor asks to exit. Never returns of its own accord otherwise (callers wrap it
    /// in a timeout).
    async fn crank_and_check_indefinitely(&mut self, rng: &mut TestRng) {
        loop {
            match self.try_crank(rng).await {
                TryCrankOutcome::NoEventsToProcess => {
                    // Queue is empty: advance the fake clock and yield briefly so other
                    // tasks can schedule new events before the next poll.
                    FakeClock::advance_time(POLL_INTERVAL.as_millis() as u64);
                    tokio::time::sleep(POLL_INTERVAL).await;
                    continue;
                }
                TryCrankOutcome::ProcessedAnEvent => {}
                TryCrankOutcome::ShouldExit(exit_code) => {
                    panic!("should not exit: {:?}", exit_code)
                }
                TryCrankOutcome::Exited => unreachable!(),
            }
            if self.reactor.condition_result() {
                info!("{} met condition", self.reactor.inner().node_id());
                return;
            }
        }
    }
}
/// Spawns one task per effect; each task awaits its effect and pushes every resulting
/// event onto the given queue, tagged with the optional ancestor event id.
async fn process_effects<Ev>(
    ancestor: Option<NonZeroU64>,
    scheduler: &'static Scheduler<Ev>,
    effects: Effects<Ev>,
    queue_kind: QueueKind,
) where
    Ev: Send + 'static,
{
    effects.into_iter().for_each(|single_effect| {
        tokio::spawn(async move {
            for resulting_event in single_effect.await {
                scheduler.push((ancestor, resulting_event), queue_kind).await;
            }
        });
    });
}
/// Converts a single effect into one whose resulting events are mapped through `wrap`.
fn wrap_effect<Ev, REv, F>(wrap: F, effect: Effect<Ev>) -> Effect<REv>
where
    F: Fn(Ev) -> REv + Send + 'static,
    Ev: Send + 'static,
    REv: Send + 'static,
{
    effect
        .map(move |events| events.into_iter().map(wrap).collect())
        .boxed()
}
pub(crate) fn wrap_effects<Ev, REv, F>(wrap: F, effects: Effects<Ev>) -> Effects<REv>
where
F: Fn(Ev) -> REv + Send + 'static + Clone,
Ev: Send + 'static,
REv: Send + 'static,
{
effects
.into_iter()
.map(move |effect| wrap_effect(wrap.clone(), effect))
.collect()
}
/// Turns a serialized fetch response into a fetcher event of type `I` and dispatches it;
/// if deserialization fails, announces the sender as a misbehaving peer instead.
fn handle_fetch_response<R, I>(
    reactor: &mut R,
    effect_builder: EffectBuilder<<R as Reactor>::Event>,
    rng: &mut NodeRng,
    sender: NodeId,
    serialized_item: &[u8],
) -> Effects<<R as Reactor>::Event>
where
    I: FetchItem,
    R: Reactor,
    <R as Reactor>::Event: From<fetcher::Event<I>> + From<PeerBehaviorAnnouncement>,
{
    let maybe_fetcher_event =
        fetcher::Event::<I>::from_get_response_serialized_item(sender, serialized_item);
    if let Some(fetcher_event) = maybe_fetcher_event {
        return Reactor::dispatch_event(reactor, effect_builder, rng, fetcher_event.into());
    }
    // The peer sent an item we could not make sense of; flag it for blocklisting.
    effect_builder
        .announce_block_peer_with_justification(
            sender,
            BlocklistJustification::SentBadItem { tag: I::TAG },
        )
        .ignore()
}
/// Routes a network `GetResponse` to the fetcher matching the item type it carries,
/// delegating the per-type work to `handle_fetch_response`.
fn handle_get_response<R>(
    reactor: &mut R,
    effect_builder: EffectBuilder<<R as Reactor>::Event>,
    rng: &mut NodeRng,
    sender: NodeId,
    message: Box<NetResponse>,
) -> Effects<<R as Reactor>::Event>
where
    R: Reactor,
    <R as Reactor>::Event: From<transaction_acceptor::Event>
        + From<fetcher::Event<FinalitySignature>>
        + From<fetcher::Event<Block>>
        + From<fetcher::Event<BlockHeader>>
        + From<fetcher::Event<BlockExecutionResultsOrChunk>>
        + From<fetcher::Event<LegacyDeploy>>
        + From<fetcher::Event<Transaction>>
        + From<fetcher::Event<SyncLeap>>
        + From<fetcher::Event<TrieOrChunk>>
        + From<fetcher::Event<ApprovalsHashes>>
        + From<block_accumulator::Event>
        + From<PeerBehaviorAnnouncement>,
{
    // Exhaustive over `NetResponse`: each variant picks the fetch item type parameter.
    match *message {
        NetResponse::Transaction(ref serialized_item) => handle_fetch_response::<R, Transaction>(
            reactor,
            effect_builder,
            rng,
            sender,
            serialized_item,
        ),
        NetResponse::LegacyDeploy(ref serialized_item) => handle_fetch_response::<R, LegacyDeploy>(
            reactor,
            effect_builder,
            rng,
            sender,
            serialized_item,
        ),
        NetResponse::Block(ref serialized_item) => {
            handle_fetch_response::<R, Block>(reactor, effect_builder, rng, sender, serialized_item)
        }
        NetResponse::BlockHeader(ref serialized_item) => handle_fetch_response::<R, BlockHeader>(
            reactor,
            effect_builder,
            rng,
            sender,
            serialized_item,
        ),
        NetResponse::FinalitySignature(ref serialized_item) => {
            handle_fetch_response::<R, FinalitySignature>(
                reactor,
                effect_builder,
                rng,
                sender,
                serialized_item,
            )
        }
        NetResponse::SyncLeap(ref serialized_item) => handle_fetch_response::<R, SyncLeap>(
            reactor,
            effect_builder,
            rng,
            sender,
            serialized_item,
        ),
        NetResponse::ApprovalsHashes(ref serialized_item) => {
            handle_fetch_response::<R, ApprovalsHashes>(
                reactor,
                effect_builder,
                rng,
                sender,
                serialized_item,
            )
        }
        NetResponse::BlockExecutionResults(ref serialized_item) => {
            handle_fetch_response::<R, BlockExecutionResultsOrChunk>(
                reactor,
                effect_builder,
                rng,
                sender,
                serialized_item,
            )
        }
    }
}