mod blocks;
mod config;
mod deploys;
mod event;
mod metrics;
mod peers;
mod state;
mod traits;
use std::{cmp::Ordering, convert::Infallible, fmt::Display, mem};
use datasize::DataSize;
use prometheus::Registry;
use tracing::{error, info, trace, warn};
use self::event::BlockByHashResult;
use casper_types::{EraId, ProtocolVersion};
use super::{
storage::{self, Storage},
Component,
};
use crate::{
components::contract_runtime::ExecutionPreState,
effect::{EffectBuilder, EffectExt, Effects},
fatal,
types::{ActivationPoint, Block, BlockHash, BlockHeader, Chainspec, TimeDiff},
NodeRng,
};
pub(crate) use config::Config;
pub(crate) use event::Event;
use event::{BlockByHeightResult, StopReason};
use metrics::Metrics;
pub(crate) use peers::PeersState;
pub(crate) use state::State;
pub(crate) use traits::ReactorEventT;
#[derive(DataSize, Debug)]
pub(crate) struct LinearChainSync<I> {
peers: PeersState<I>,
state: State,
#[data_size(skip)]
metrics: Metrics,
next_upgrade_activation_point: Option<ActivationPoint>,
stop_reason: Option<StopReason>,
state_key: Vec<u8>,
shortest_era: TimeDiff,
min_round_length: TimeDiff,
started_syncing: bool,
protocol_version: ProtocolVersion,
initial_execution_pre_state: ExecutionPreState,
}
impl<I: Clone + PartialEq + 'static> LinearChainSync<I> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new<REv, Err>(
registry: &Registry,
effect_builder: EffectBuilder<REv>,
chainspec: &Chainspec,
storage: &Storage,
trusted_hash: Option<BlockHash>,
highest_block: Option<Block>,
after_upgrade: bool,
next_upgrade_activation_point: Option<ActivationPoint>,
initial_execution_pre_state: ExecutionPreState,
config: Config,
) -> Result<(Self, Effects<Event<I>>), Err>
where
REv: From<Event<I>> + Send,
Err: From<prometheus::Error> + From<storage::Error>,
{
let timeout_event = effect_builder
.set_timeout(config.get_sync_timeout().into())
.event(|_| Event::InitializeTimeout);
let protocol_version = chainspec.protocol_config.version;
if let Some(state) = read_init_state(storage, chainspec)? {
let linear_chain_sync = LinearChainSync::from_state(
registry,
chainspec,
state,
next_upgrade_activation_point,
protocol_version,
initial_execution_pre_state,
)?;
Ok((linear_chain_sync, timeout_event))
} else {
let shortest_era: TimeDiff = std::cmp::max(
chainspec.highway_config.min_round_length()
* chainspec.core_config.minimum_era_height,
chainspec.core_config.era_duration,
);
let state = match trusted_hash {
Some(hash) => {
match storage.read_block(&hash) {
Ok(Some(block)) => {
if block.protocol_version() < protocol_version {
error!(trusted_hash=?hash,
trusted_block_version=?block.protocol_version(),
current_protocol_version=?protocol_version,
"used trusted hash from before an upgrade. Use trusted hash from the new protocol version");
}
let switch_block_height = storage
.transactional_get_switch_block_by_era_id(
block.header().era_id().value(),
)
.map(|b| b.height());
State::sync_descendants(hash, block, switch_block_height)
}
Ok(None) => State::sync_trusted_hash(
hash,
highest_block.map(|block| block.take_header()),
),
Err(error) => {
error!(?error, "error when reading block by hash from storage");
State::sync_trusted_hash(
hash,
highest_block.map(|block| block.take_header()),
)
}
}
}
None if after_upgrade => {
info!(
"No synchronization of the linear chain will be done because the node \
was started right after an upgrade without a trusted hash."
);
State::Done(highest_block.map(Box::new))
}
None => {
if let Some(highest_block) = highest_block {
State::sync_descendants(*highest_block.hash(), highest_block, None)
} else {
info!(
"No synchronization of the linear chain will be done because there \
is neither a trusted hash nor a highest block present."
);
State::Done(None)
}
}
};
let state_key = create_state_key(chainspec);
let linear_chain_sync = LinearChainSync {
peers: PeersState::new(),
state,
metrics: Metrics::new(registry)?,
next_upgrade_activation_point,
stop_reason: None,
state_key,
shortest_era,
min_round_length: chainspec.highway_config.min_round_length(),
started_syncing: false,
protocol_version,
initial_execution_pre_state,
};
Ok((linear_chain_sync, timeout_event))
}
}
fn from_state(
registry: &Registry,
chainspec: &Chainspec,
state: State,
next_upgrade_activation_point: Option<ActivationPoint>,
protocol_version: ProtocolVersion,
initial_execution_pre_state: ExecutionPreState,
) -> Result<Self, prometheus::Error> {
let state_key = create_state_key(chainspec);
info!(?state, "reusing previous state");
let shortest_era: TimeDiff = std::cmp::max(
chainspec.highway_config.min_round_length() * chainspec.core_config.minimum_era_height,
chainspec.core_config.era_duration,
);
if matches!(state, State::None | State::Done(_)) {
info!(
"No synchronization of the linear chain will be done because the component is \
already in State::Done or in State::None."
);
}
Ok(LinearChainSync {
peers: PeersState::new(),
state,
metrics: Metrics::new(registry)?,
next_upgrade_activation_point,
stop_reason: None,
state_key,
shortest_era,
min_round_length: chainspec.highway_config.min_round_length(),
started_syncing: false,
protocol_version,
initial_execution_pre_state,
})
}
fn add_block(&mut self, block: Block) {
self.started_syncing = true;
match &mut self.state {
State::None | State::Done(_) => {}
State::SyncingTrustedHash { linear_chain, .. } => linear_chain.push(block),
State::SyncingDescendants { latest_block, .. } => **latest_block = block,
};
}
pub(crate) fn is_synced(&self) -> bool {
matches!(self.state, State::Done(_))
}
pub(crate) fn stopped_for_upgrade(&self) -> bool {
self.stop_reason == Some(StopReason::ForUpgrade)
}
pub(crate) fn stopped_for_downgrade(&self) -> bool {
self.stop_reason == Some(StopReason::ForDowngrade)
}
fn block_downloaded<REv>(
&mut self,
rng: &mut NodeRng,
effect_builder: EffectBuilder<REv>,
block: &Block,
) -> Effects<Event<I>>
where
I: Send + 'static,
REv: ReactorEventT<I>,
{
self.peers.reset(rng);
self.state.block_downloaded(block);
self.add_block(block.clone());
match &self.state {
State::None | State::Done(_) => {
error!(state=?self.state, "block downloaded when in incorrect state.");
fatal!(effect_builder, "block downloaded in incorrect state").ignore()
}
State::SyncingTrustedHash {
highest_block_header,
..
} => {
let should_start_downloading_deploys = highest_block_header
.as_ref()
.map(|hdr| hdr.hash() == *block.header().parent_hash())
.unwrap_or(false)
|| block.header().is_genesis_child();
if should_start_downloading_deploys {
info!("linear chain downloaded. Start downloading deploys.");
effect_builder
.immediately()
.event(move |_| Event::StartDownloadingDeploys)
} else {
self.fetch_next_block(effect_builder, rng, block)
}
}
State::SyncingDescendants { .. } => {
self.fetch_next_block_deploys(effect_builder)
}
}
}
fn mark_done(&mut self, latest_block: Option<Block>) {
let latest_block = latest_block.map(Box::new);
self.state = State::Done(latest_block);
}
fn block_handled<REv>(
&mut self,
rng: &mut NodeRng,
effect_builder: EffectBuilder<REv>,
block: &Block,
) -> Effects<Event<I>>
where
I: Send + 'static,
REv: ReactorEventT<I>,
{
let height = block.height();
let hash = block.hash();
trace!(%hash, %height, "downloaded linear chain block.");
if block.header().is_switch_block() {
self.state.set_last_switch_block_height(block.height());
}
if block.header().is_switch_block() && self.should_upgrade(block.header().era_id()) {
info!(
era = block.header().era_id().value(),
"shutting down for upgrade"
);
return effect_builder
.immediately()
.event(|_| Event::InitUpgradeShutdown);
}
self.peers.reset(rng);
let block_height = block.height();
let curr_state = mem::replace(&mut self.state, State::None);
match curr_state {
State::None | State::Done(_) => {
error!(state=?self.state, "block handled when in incorrect state.");
fatal!(effect_builder, "block handled in incorrect state").ignore()
}
State::SyncingTrustedHash {
highest_block_seen,
ref latest_block,
..
} if highest_block_seen != block_height => {
match latest_block.as_ref() {
Some(expected) if expected != block => {
error!(
?expected, got=?block,
"block execution result doesn't match received block"
);
return fatal!(effect_builder, "unexpected block execution result")
.ignore();
}
None => {
error!("block execution results received when not expected");
return fatal!(effect_builder, "unexpected block execution results.")
.ignore();
}
Some(_) => (),
}
self.state = curr_state;
self.fetch_next_block_deploys(effect_builder)
}
State::SyncingTrustedHash {
highest_block_seen,
trusted_hash,
ref latest_block,
last_switch_block_height,
..
} => {
if highest_block_seen != block_height {
error!(?highest_block_seen, downloaded_block_height=?block_height, "downloaded unexpected block");
return fatal!(
effect_builder,
"downloaded unexpected block when syncing trusted hash"
)
.ignore();
}
match latest_block.as_ref() {
Some(expected) if expected != block => {
error!(
?expected, got=?block,
"block execution result doesn't match received block"
);
return fatal!(effect_builder, "unexpected block execution result")
.ignore();
}
None => {
error!("block execution results received when not expected");
return fatal!(effect_builder, "unexpected block execution results.")
.ignore();
}
Some(_) => (),
}
info!(%block_height, "Finished synchronizing linear chain up until trusted hash.");
let peer = self.peers.random_unsafe();
self.state =
State::sync_descendants(trusted_hash, block.clone(), last_switch_block_height);
blocks::fetch_block_at_height(effect_builder, peer, block_height + 1)
}
State::SyncingDescendants {
ref latest_block,
last_switch_block_height,
..
} => {
if latest_block.as_ref() != block {
error!(
expected=?*latest_block, got=?block,
"block execution result doesn't match received block"
);
return fatal!(effect_builder, "unexpected block execution result").ignore();
}
if self.is_currently_active_era(latest_block, last_switch_block_height) {
info!(
hash=?block.hash(),
height=?block.header().height(),
era=block.header().era_id().value(),
"downloaded a block in the current era. finished synchronization"
);
self.mark_done(Some(*latest_block.clone()));
return Effects::new();
}
self.state = curr_state;
self.fetch_next_block(effect_builder, rng, block)
}
}
}
fn is_currently_active_era(
&self,
block: &Block,
last_switch_block_height: Option<u64>,
) -> bool {
let last_switch_block_height = last_switch_block_height.unwrap_or(0);
let past_blocks_in_this_era = block.height().saturating_sub(last_switch_block_height);
block.header().timestamp().elapsed() + self.min_round_length * past_blocks_in_this_era
< self.shortest_era
}
fn fetch_next_block_deploys<REv>(
&mut self,
effect_builder: EffectBuilder<REv>,
) -> Effects<Event<I>>
where
I: Send + 'static,
REv: ReactorEventT<I>,
{
let peer = self.peers.random_unsafe();
let next_block = match &mut self.state {
State::None | State::Done(_) => {
error!(state=?self.state, "tried fetching next block when in wrong state");
return fatal!(
effect_builder,
"tried fetching next block when in wrong state"
)
.ignore();
}
State::SyncingTrustedHash {
linear_chain,
latest_block,
..
} => match linear_chain.pop() {
None => None,
Some(block) => {
latest_block.replace(block.clone());
Some(block)
}
},
State::SyncingDescendants { latest_block, .. } => Some((**latest_block).clone()),
};
next_block.map_or_else(
|| {
warn!("tried fetching next block deploys when there was no block.");
Effects::new()
},
|block| {
self.metrics.reset_start_time();
deploys::fetch_block_deploys(effect_builder, peer, block)
},
)
}
fn fetch_next_block<REv>(
&mut self,
effect_builder: EffectBuilder<REv>,
rng: &mut NodeRng,
block: &Block,
) -> Effects<Event<I>>
where
I: Send + 'static,
REv: ReactorEventT<I>,
{
self.peers.reset(rng);
let peer = self.peers.random_unsafe();
match self.state {
State::SyncingTrustedHash { .. } => {
let parent_hash = *block.header().parent_hash();
self.metrics.reset_start_time();
blocks::fetch_block_by_hash(effect_builder, peer, parent_hash)
}
State::SyncingDescendants { .. } => {
let next_height = block.height() + 1;
self.metrics.reset_start_time();
blocks::fetch_block_at_height(effect_builder, peer, next_height)
}
State::Done(_) | State::None => {
error!(state=?self.state, "tried fetching next block when in wrong state");
fatal!(
effect_builder,
"tried fetching next block when in wrong state"
)
.ignore()
}
}
}
fn handle_upgrade_shutdown<REv>(
&mut self,
effect_builder: EffectBuilder<REv>,
) -> Effects<Event<I>>
where
I: Send + 'static,
REv: ReactorEventT<I>,
{
if self.state.is_done() || self.state.is_none() {
error!(state=?self.state, "shutdown for upgrade initiated when in wrong state");
return fatal!(
effect_builder,
"shutdown for upgrade initiated when in wrong state"
)
.ignore();
}
effect_builder
.save_state(self.state_key.clone().into(), Some(self.state.clone()))
.event(|_| Event::Shutdown(StopReason::ForUpgrade))
}
pub(crate) fn latest_block(&self) -> Option<&Block> {
match &self.state {
State::SyncingTrustedHash { latest_block, .. } => Option::as_ref(latest_block),
State::SyncingDescendants { latest_block, .. } => Some(latest_block),
State::Done(latest_block) => latest_block.as_deref(),
State::None => None,
}
}
pub(crate) fn into_maybe_latest_block_header(self) -> Option<BlockHeader> {
match self.state {
State::SyncingTrustedHash { latest_block, .. } => latest_block.map(Block::take_header),
State::SyncingDescendants { latest_block, .. } => Some(latest_block.take_header()),
State::Done(maybe_latest_block) => {
maybe_latest_block.map(|latest_block| latest_block.take_header())
}
State::None => None,
}
}
fn should_upgrade(&self, era_id: EraId) -> bool {
should_upgrade(self.next_upgrade_activation_point, era_id)
}
fn set_last_block_if_syncing_trusted_hash(&mut self, block: &Block) {
if let State::SyncingTrustedHash {
ref mut latest_block,
..
} = &mut self.state
{
*latest_block = Box::new(Some(block.clone()));
}
self.state.block_downloaded(block);
}
}
impl<I, REv> Component<REv> for LinearChainSync<I>
where
I: Display + Clone + Send + PartialEq + 'static,
REv: ReactorEventT<I>,
{
type Event = Event<I>;
type ConstructionError = Infallible;
fn handle_event(
&mut self,
effect_builder: EffectBuilder<REv>,
rng: &mut NodeRng,
event: Self::Event,
) -> Effects<Self::Event> {
match event {
Event::Start(init_peer) => {
match &self.state {
State::None => {
trace!("received `Start` event when in {} state.", self.state);
Effects::new()
}
State::Done(_) => {
error!("should not have received `Start` event when in `Done` state.",);
Effects::new()
}
State::SyncingDescendants { latest_block, .. } => {
let next_block_height = latest_block.height() + 1;
info!(?next_block_height, "start synchronization");
self.metrics.reset_start_time();
blocks::fetch_block_at_height(effect_builder, init_peer, next_block_height)
}
State::SyncingTrustedHash { trusted_hash, .. } => {
trace!(?trusted_hash, "start synchronization");
self.metrics.reset_start_time();
blocks::fetch_block_by_hash(effect_builder, init_peer, *trusted_hash)
}
}
}
Event::GetBlockHeightResult(block_height, fetch_result) => {
match fetch_result {
BlockByHeightResult::Absent(peer) => {
self.metrics.observe_get_block_by_height();
trace!(
%block_height, %peer,
"failed to download block by height. Trying next peer"
);
self.peers.failure(&peer);
match self.peers.random() {
None => {
info!(
"finished synchronizing descendants of the trusted hash. \
cleaning state."
);
self.mark_done(self.latest_block().cloned());
Effects::new()
}
Some(peer) => {
self.metrics.reset_start_time();
blocks::fetch_block_at_height(effect_builder, peer, block_height)
}
}
}
BlockByHeightResult::FromStorage(block) => {
if block.height() != block_height {
return fatal!(effect_builder, "block height mismatch").ignore();
}
trace!(%block_height, "Linear block found in the local storage.");
let _ = self.block_downloaded(rng, effect_builder, &block);
let _ = self.block_handled(rng, effect_builder, &block);
self.fetch_next_block(effect_builder, rng, &block)
}
BlockByHeightResult::FromPeer(block, peer) => {
self.metrics.observe_get_block_by_height();
trace!(%block_height, %peer, "linear chain block downloaded from a peer");
if block.height() != block_height
|| *block.header().parent_hash() != *self.latest_block().unwrap().hash()
{
warn!(
%peer,
got_height = block.height(),
expected_height = block_height,
got_parent = %block.header().parent_hash(),
expected_parent = %self.latest_block().unwrap().hash(),
"block mismatch",
);
self.peers.ban(&peer);
return self.handle_event(
effect_builder,
rng,
Event::GetBlockHeightResult(
block_height,
BlockByHeightResult::Absent(peer),
),
);
}
if block.protocol_version() != self.protocol_version {
warn!(
%peer,
protocol_version = %self.protocol_version,
block_version = %block.protocol_version(),
"block protocol version mismatch",
);
self.peers.ban(&peer);
return self.handle_event(
effect_builder,
rng,
Event::GetBlockHeightResult(
block_height,
BlockByHeightResult::Absent(peer),
),
);
}
self.peers.success(peer);
self.block_downloaded(rng, effect_builder, &block)
}
}
}
Event::GetBlockHashResult(block_hash, fetch_result) => {
match fetch_result {
BlockByHashResult::Absent(peer) => {
self.metrics.observe_get_block_by_hash();
trace!(
%block_hash, %peer,
"failed to download block by hash. Trying next peer"
);
self.peers.failure(&peer);
match self.peers.random() {
None if self.started_syncing => {
error!(
%block_hash,
"could not download linear block from any of the peers."
);
fatal!(effect_builder, "failed to synchronize linear chain")
.ignore()
}
None => {
warn!(
"run out of peers before managed to start syncing. \
Resetting peers' list and continuing"
);
self.peers.reset(rng);
self.metrics.reset_start_time();
blocks::fetch_block_by_hash(effect_builder, peer, block_hash)
}
Some(peer) => {
self.metrics.reset_start_time();
blocks::fetch_block_by_hash(effect_builder, peer, block_hash)
}
}
}
BlockByHashResult::FromStorage(block) => {
if *block.hash() != block_hash {
return fatal!(effect_builder, "block hash mismatch").ignore();
}
trace!(%block_hash, "Linear block found in the local storage.");
self.set_last_block_if_syncing_trusted_hash(&block);
self.block_handled(rng, effect_builder, &block)
}
BlockByHashResult::FromPeer(block, peer) => {
self.metrics.observe_get_block_by_hash();
trace!(%block_hash, %peer, "linear chain block downloaded from a peer");
let header_hash = block.header().hash();
if header_hash != block_hash || header_hash != *block.hash() {
warn!(
"Block hash mismatch. Expected {} got {} from {}.\
Block claims to have hash {}. Disconnecting.",
block_hash,
header_hash,
block.hash(),
peer
);
self.peers.ban(&peer);
return self.handle_event(
effect_builder,
rng,
Event::GetBlockHashResult(
block_hash,
BlockByHashResult::Absent(peer),
),
);
}
self.peers.success(peer);
self.block_downloaded(rng, effect_builder, &block)
}
}
}
Event::GetDeploysResult(fetch_result) => {
self.metrics.observe_get_deploys();
match fetch_result {
event::DeploysResult::Found(block_to_execute) => {
match self
.protocol_version
.cmp(&block_to_execute.protocol_version())
{
Ordering::Less => {
warn!(
block_hash = %block_to_execute.hash(),
our_version = %self.protocol_version,
block_version = %block_to_execute.protocol_version(),
"attempting to execute a block with a newer protocol version;\
shutting down to upgrade"
);
return effect_builder
.immediately()
.event(|_| Event::Shutdown(StopReason::ForUpgrade));
}
Ordering::Greater => {
warn!(
block_hash = %block_to_execute.hash(),
our_version = %self.protocol_version,
block_version = %block_to_execute.protocol_version(),
"attempting to execute a block with an older protocol version;\
shutting down to downgrade"
);
return effect_builder
.immediately()
.event(|_| Event::Shutdown(StopReason::ForDowngrade));
}
_ => (),
}
let block_hash = block_to_execute.hash();
trace!(%block_hash, "deploys for linear chain block found");
self.peers.reset(rng);
blocks::execute_block(
effect_builder,
*block_to_execute,
self.initial_execution_pre_state.clone(),
)
.ignore()
}
event::DeploysResult::NotFound(block, peer) => {
let block_hash = block.hash();
trace!(
%block_hash, %peer,
"deploy for linear chain block not found. Trying next peer"
);
self.peers.failure(&peer);
match self.peers.random() {
None => {
error!(
%block_hash,
"could not download deploys from linear chain block."
);
fatal!(effect_builder, "failed to download linear chain deploys")
.ignore()
}
Some(peer) => {
self.metrics.reset_start_time();
deploys::fetch_block_deploys(effect_builder, peer, *block)
}
}
}
}
}
Event::StartDownloadingDeploys => {
self.peers.reset(rng);
self.fetch_next_block_deploys(effect_builder)
}
Event::NewPeerConnected(peer_id) => {
trace!(%peer_id, "new peer connected");
let mut effects = Effects::new();
if self.peers.is_empty() {
let cloned_peer_id = peer_id.clone();
effects.extend(
effect_builder
.immediately()
.event(move |_| Event::Start(cloned_peer_id)),
);
}
self.peers.push(peer_id);
effects
}
Event::BlockHandled(block) => {
let block_height = block.height();
let block_hash = *block.hash();
let effects = self.block_handled(rng, effect_builder, &block);
trace!(%block_height, %block_hash, "block handled");
effects
}
Event::GotUpgradeActivationPoint(next_upgrade_activation_point) => {
trace!(%next_upgrade_activation_point, "new activation point");
self.next_upgrade_activation_point = Some(next_upgrade_activation_point);
Effects::new()
}
Event::InitUpgradeShutdown => {
info!("shutdown initiated");
self.handle_upgrade_shutdown(effect_builder)
}
Event::Shutdown(reason) => {
info!(?reason, "ready for shutdown");
self.stop_reason = Some(reason);
Effects::new()
}
Event::InitializeTimeout => {
if !self.started_syncing {
info!("hasn't downloaded any blocks in expected time window. Shutting down…");
fatal!(effect_builder, "no syncing progress, shutting down…").ignore()
} else {
Effects::new()
}
}
}
}
}
fn should_upgrade(next_upgrade: Option<ActivationPoint>, block_era_id: EraId) -> bool {
match next_upgrade {
None => false,
Some(activation_point) => activation_point.should_upgrade(&block_era_id),
}
}
fn create_state_key(chainspec: &Chainspec) -> Vec<u8> {
format!(
"linear_chain_sync:network_name={}",
chainspec.network_config.name.clone()
)
.into()
}
fn deserialize_state(serialized_state: &[u8]) -> Option<State> {
bincode::deserialize(serialized_state).unwrap_or_else(|error| {
panic!(
"could not deserialize state from storage, error {:?}",
error
)
})
}
fn read_init_state(
storage: &Storage,
chainspec: &Chainspec,
) -> Result<Option<State>, storage::Error> {
let key = create_state_key(chainspec);
if let Some(bytes) = storage.read_state_store(&key)? {
Ok(deserialize_state(&bytes))
} else {
Ok(None)
}
}
pub(crate) fn clean_linear_chain_state(
storage: &Storage,
chainspec: &Chainspec,
) -> Result<bool, storage::Error> {
let key = create_state_key(chainspec);
storage.del_state_store(key)
}