use std::time::Duration;
use either::Either;
use tracing::{debug, info, warn};
use casper_types::{ActivationPoint, BlockHash, TimeDiff, Timestamp};
use crate::{
components::{
block_accumulator::{SyncIdentifier, SyncInstruction},
block_synchronizer::BlockSynchronizerProgress,
sync_leaper,
sync_leaper::{LeapActivityError, LeapState},
ValidatorBoundComponent,
},
effect::{requests::BlockSynchronizerRequest, EffectBuilder, EffectExt, Effects},
reactor::{
main_reactor::{MainEvent, MainReactor},
wrap_effects,
},
types::{NodeId, SyncLeap, SyncLeapIdentifier},
NodeRng,
};
/// Result of one evaluation of the catch-up control logic (see
/// `MainReactor::catch_up_instruction`), describing what should happen next.
pub(super) enum CatchUpInstruction {
/// Run the given effects. The `Duration` accompanies them as a delay value; in this file it
/// is either the default control-logic delay or `Duration::ZERO`.
/// NOTE(review): presumably the wait before the control logic is evaluated again — confirm
/// against the caller.
Do(Duration, Effects<MainEvent>),
/// Nothing to do right now. Carries a human-readable reason and a delay
/// (presumably how long to wait before checking again — confirm against the caller).
CheckLater(String, Duration),
/// An unrecoverable condition was detected; carries the error message.
Fatal(String),
/// Produced when the sync instruction is `CaughtUp` and `should_shutdown_for_upgrade()`
/// reports true.
ShutdownForUpgrade,
/// No catch-up work, upgrade commit or shutdown is pending.
CaughtUp,
/// Produced when there is no trusted hash, no local block, and the genesis activation
/// time has been reached (within the grace period).
CommitGenesis,
/// Produced when the sync instruction is `CaughtUp` and `should_commit_upgrade()`
/// reports true.
CommitUpgrade,
}
impl MainReactor {
/// Works out the next catch-up step for the reactor.
///
/// The decision is made in three stages:
/// 1. `catch_up_process` either yields a final instruction (returned as is) or a
///    `SyncIdentifier` describing the block to continue from.
/// 2. The block accumulator turns that identifier into a `SyncInstruction`.
/// 3. `catch_up_sync_instruction` maps the sync instruction to a `CatchUpInstruction`;
///    when it has nothing to report, the result is `CatchUpInstruction::CaughtUp`.
pub(super) fn catch_up_instruction(
    &mut self,
    effect_builder: EffectBuilder<MainEvent>,
    rng: &mut NodeRng,
) -> CatchUpInstruction {
    // Stage 1: a ready-made instruction short-circuits the remaining stages.
    let sync_identifier = match self.catch_up_process() {
        Either::Left(sync_identifier) => sync_identifier,
        Either::Right(instruction) => return instruction,
    };
    debug!(
        ?sync_identifier,
        block_hash = %sync_identifier.block_hash(),
        "CatchUp: sync identifier"
    );

    // Stage 2: ask the accumulator what to do with the identified block.
    let sync_instruction = self.block_accumulator.sync_instruction(sync_identifier);
    debug!(
        ?sync_instruction,
        block_hash = %sync_instruction.block_hash(),
        "CatchUp: sync_instruction"
    );

    // Stage 3: no resulting instruction means there is nothing left to catch up on.
    self.catch_up_sync_instruction(effect_builder, rng, sync_instruction)
        .unwrap_or(CatchUpInstruction::CaughtUp)
}
/// Inspects the block synchronizer's historical progress and derives either the
/// `SyncIdentifier` to continue with (`Left`) or a final instruction (`Right`).
///
/// The observed progress is also passed to `update_last_progress` before it is evaluated.
fn catch_up_process(&mut self) -> Either<SyncIdentifier, CatchUpInstruction> {
    let progress = self.block_synchronizer.historical_progress();
    self.update_last_progress(&progress, false);

    match progress {
        // Not syncing anything: the starting point depends on whether a trusted hash is set.
        BlockSynchronizerProgress::Idle => match self.trusted_hash {
            None => self.catch_up_no_trusted_hash(),
            Some(trusted_hash) => self.catch_up_trusted_hash(trusted_hash),
        },
        BlockSynchronizerProgress::Syncing(hash, maybe_height, last_progress) => {
            self.catch_up_syncing(hash, maybe_height, last_progress)
        }
        // Treated as fatal here: an executing block is never an acceptable catch-up state.
        BlockSynchronizerProgress::Executing(hash, _, _) => {
            let message = format!(
                "CatchUp: block synchronizer attempted to execute block: {}",
                hash
            );
            Either::Right(CatchUpInstruction::Fatal(message))
        }
        BlockSynchronizerProgress::Synced(hash, height, era_id) => {
            Either::Left(SyncIdentifier::SyncedBlockIdentifier(hash, height, era_id))
        }
    }
}
/// Chooses the starting point when no trusted hash is configured.
///
/// * A highest complete block in storage is used as the local tip.
/// * With no complete block but at least one stored switch block header, the node waits
///   (`CheckLater`) for the genesis immediate switch block to be stored.
/// * With neither, the decision is delegated to `catch_up_check_genesis`.
/// * Any storage read failure is fatal.
fn catch_up_no_trusted_hash(&mut self) -> Either<SyncIdentifier, CatchUpInstruction> {
    let maybe_tip = match self.storage.get_highest_complete_block() {
        Ok(maybe_tip) => maybe_tip,
        Err(err) => {
            return Either::Right(CatchUpInstruction::Fatal(format!(
                "CatchUp: fatal block store error when attempting to read \
                 highest complete block: {}",
                err
            )));
        }
    };

    if let Some(block) = maybe_tip {
        info!("CatchUp: local tip detected, no trusted hash");
        let local_tip = SyncIdentifier::LocalTip(*block.hash(), block.height(), block.era_id());
        return Either::Left(local_tip);
    }

    // No complete block: only the presence of a switch block header matters from here on.
    let maybe_switch_block_header = self
        .storage
        .read_highest_switch_block_headers(1)
        .map(|headers| headers.first().cloned());

    let instruction = match maybe_switch_block_header {
        Err(storage_err) => CatchUpInstruction::Fatal(format!(
            "CatchUp: Could not read storage to find highest switch block header: {}",
            storage_err
        )),
        Ok(None) => return self.catch_up_check_genesis(),
        Ok(Some(_)) => {
            info!("CatchUp: waiting to store genesis immediate switch block");
            CatchUpInstruction::CheckLater(
                "waiting for genesis immediate switch block to be stored".to_string(),
                self.control_logic_default_delay.into(),
            )
        }
    };
    Either::Right(instruction)
}
/// Decides how to proceed when there is neither a trusted hash nor any local block.
///
/// Always returns `Either::Right`:
/// * activation point is an era id: `Fatal` (a trusted hash is required);
/// * genesis activation, more than 180 seconds in the past: `Fatal`;
/// * genesis activation still in the future: `CheckLater` for the remaining time;
/// * otherwise: `CommitGenesis`.
fn catch_up_check_genesis(&mut self) -> Either<SyncIdentifier, CatchUpInstruction> {
    let genesis_timestamp = match self.chainspec.protocol_config.activation_point {
        ActivationPoint::Genesis(timestamp) => timestamp,
        ActivationPoint::EraId(_) => {
            return Either::Right(CatchUpInstruction::Fatal(
                "CatchUp: cannot proceed without trusted hash".to_string(),
            ));
        }
    };

    let now = Timestamp::now();
    // Latest moment at which this node may still take part in genesis.
    let deadline = genesis_timestamp.saturating_add(TimeDiff::from_seconds(180));

    let instruction = if now > deadline {
        CatchUpInstruction::Fatal(
            "CatchUp: late for genesis; cannot proceed without trusted hash".to_string(),
        )
    } else {
        // Saturates to zero once the activation time has been reached.
        let time_remaining = genesis_timestamp.saturating_diff(now);
        if time_remaining > TimeDiff::default() {
            CatchUpInstruction::CheckLater(
                format!("waiting for genesis activation at {}", genesis_timestamp),
                Duration::from(time_remaining),
            )
        } else {
            CatchUpInstruction::CommitGenesis
        }
    };
    Either::Right(instruction)
}
/// Chooses the starting point when a trusted hash is configured.
///
/// * If the header for `trusted_hash` is not in storage, syncing starts from the bare
///   trusted hash.
/// * If the header is known and a highest complete block exists, the higher of the two is
///   used: the trusted block when its height is strictly greater, otherwise the local tip.
/// * If the header is known but there is no complete block, syncing starts from the bare
///   trusted hash.
/// * Any storage read failure is fatal; the message names the read that failed and includes
///   the underlying error.
fn catch_up_trusted_hash(
    &mut self,
    trusted_hash: BlockHash,
) -> Either<SyncIdentifier, CatchUpInstruction> {
    let trusted_header = match self.storage.read_block_header_by_hash(&trusted_hash) {
        Ok(Some(trusted_header)) => trusted_header,
        // Header unknown locally: start from the configured trusted hash.
        Ok(None) => return Either::Left(SyncIdentifier::BlockHash(trusted_hash)),
        Err(err) => {
            // Report the operation that actually failed (the header lookup), not the
            // highest-complete-block read performed further down.
            return Either::Right(CatchUpInstruction::Fatal(format!(
                "CatchUp: fatal block store error when attempting to read \
                 block header for trusted hash {}: {}",
                trusted_hash, err
            )));
        }
    };

    match self.storage.get_highest_complete_block() {
        Ok(Some(block)) => {
            let trusted_height = trusted_header.height();
            if trusted_height > block.height() {
                Either::Left(SyncIdentifier::BlockIdentifier(
                    trusted_hash,
                    trusted_height,
                ))
            } else {
                Either::Left(SyncIdentifier::LocalTip(
                    *block.hash(),
                    block.height(),
                    block.era_id(),
                ))
            }
        }
        Ok(None) => Either::Left(SyncIdentifier::BlockHash(trusted_hash)),
        // Keep the underlying error in the message, as `catch_up_no_trusted_hash` does.
        Err(err) => Either::Right(CatchUpInstruction::Fatal(format!(
            "CatchUp: fatal block store error when attempting to read \
             highest complete block: {}",
            err
        ))),
    }
}
/// Handles the case where the block synchronizer is already syncing a block.
///
/// If the time since `last_progress` exceeds `idle_tolerance`, one attempt is counted
/// and a warning is logged. The block being synced is always returned as the identifier:
/// with its height when known, by hash alone otherwise.
fn catch_up_syncing(
    &mut self,
    block_hash: BlockHash,
    maybe_block_height: Option<u64>,
    last_progress: Timestamp,
) -> Either<SyncIdentifier, CatchUpInstruction> {
    let time_without_progress = Timestamp::now().saturating_diff(last_progress);
    if time_without_progress > self.idle_tolerance {
        self.attempts += 1;
        warn!(
            %last_progress,
            remaining_attempts = self.max_attempts.saturating_sub(self.attempts),
            "CatchUp: idleness detected"
        );
    }

    let sync_identifier = if let Some(block_height) = maybe_block_height {
        SyncIdentifier::BlockIdentifier(block_hash, block_height)
    } else {
        SyncIdentifier::BlockHash(block_hash)
    };
    Either::Left(sync_identifier)
}
/// Translates the accumulator's `SyncInstruction` into a catch-up instruction.
///
/// Leap variants and `BlockSync` always produce an instruction. `CaughtUp` defers to
/// `catch_up_check_transition`, which may return `None`.
fn catch_up_sync_instruction(
    &mut self,
    effect_builder: EffectBuilder<MainEvent>,
    rng: &mut NodeRng,
    sync_instruction: SyncInstruction,
) -> Option<CatchUpInstruction> {
    let instruction = match sync_instruction {
        SyncInstruction::CaughtUp { .. } => return self.catch_up_check_transition(),
        SyncInstruction::BlockSync { block_hash } => {
            self.catch_up_block_sync(effect_builder, block_hash)
        }
        // Both leap flavours are handled the same way.
        SyncInstruction::Leap { block_hash }
        | SyncInstruction::LeapIntervalElapsed { block_hash } => {
            self.catch_up_leap(effect_builder, rng, block_hash)
        }
    };
    Some(instruction)
}
/// Drives a sync leap towards `block_hash`.
///
/// The block is first registered with the block synchronizer (the registration result is
/// ignored here). The sync leaper's current state then selects the next step:
/// start a leap when idle, wait when a response is pending, process a received leap,
/// or handle a failure.
fn catch_up_leap(
    &mut self,
    effect_builder: EffectBuilder<MainEvent>,
    rng: &mut NodeRng,
    block_hash: BlockHash,
) -> CatchUpInstruction {
    // NOTE(review): the meaning of the `true` flag is defined by the block synchronizer.
    self.block_synchronizer
        .register_block_by_hash(block_hash, true);

    let leap_status = self.sync_leaper.leap_status();
    info!(%block_hash, %leap_status, "CatchUp: status");

    match leap_status {
        LeapState::Awaiting { .. } => {
            let reason = "sync leaper is awaiting response".to_string();
            CatchUpInstruction::CheckLater(reason, self.control_logic_default_delay.into())
        }
        LeapState::Failed { error, .. } => {
            self.catch_up_leap_failed(effect_builder, rng, block_hash, error)
        }
        LeapState::Received {
            best_available,
            from_peers,
            ..
        } => self.catch_up_leap_received(effect_builder, rng, *best_available, from_peers),
        LeapState::Idle => self.catch_up_leaper_idle(effect_builder, rng, block_hash),
    }
}
fn catch_up_leap_failed(
&mut self,
effect_builder: EffectBuilder<MainEvent>,
rng: &mut NodeRng,
block_hash: BlockHash,
error: LeapActivityError,
) -> CatchUpInstruction {
self.attempts += 1;
warn!(
%error,
remaining_attempts = %self.max_attempts.saturating_sub(self.attempts),
"CatchUp: failed leap",
);
self.catch_up_leaper_idle(effect_builder, rng, block_hash)
}
/// Starts a new sync leap attempt towards `block_hash`.
///
/// A random selection of fully connected peers is requested, capped at the chainspec's
/// `simultaneous_peer_requests`. With no peers available the result is `CheckLater`
/// using `minimum_block_time` as the delay. Otherwise the accumulator's last-progress
/// marker is reset and an `AttemptLeap` event for the sync leaper is scheduled.
fn catch_up_leaper_idle(
    &mut self,
    effect_builder: EffectBuilder<MainEvent>,
    rng: &mut NodeRng,
    block_hash: BlockHash,
) -> CatchUpInstruction {
    let peer_limit = self.chainspec.core_config.simultaneous_peer_requests as usize;
    let peers = self.net.fully_connected_peers_random(rng, peer_limit);

    if peers.is_empty() {
        let retry_delay = self.chainspec.core_config.minimum_block_time.into();
        return CatchUpInstruction::CheckLater("no peers".to_string(), retry_delay);
    }

    self.block_accumulator.reset_last_progress();

    let leap_target = SyncLeapIdentifier::sync_to_tip(block_hash);
    let attempt_leap = effect_builder.immediately().event(move |_| {
        MainEvent::SyncLeaper(sync_leaper::Event::AttemptLeap {
            sync_leap_identifier: leap_target,
            peers_to_ask: peers,
        })
    });
    CatchUpInstruction::Do(self.control_logic_default_delay.into(), attempt_leap)
}
/// Processes a received sync leap.
///
/// Steps, in order:
/// 1. register every era's validator weights from the leap in the validator matrix;
/// 2. let the block accumulator and then the block synchronizer react to the validator
///    change, collecting their effects;
/// 3. register the sync leap (and the peers it came from) with the block synchronizer.
///
/// Returns `Do` with the collected effects and the default control-logic delay.
fn catch_up_leap_received(
    &mut self,
    effect_builder: EffectBuilder<MainEvent>,
    rng: &mut NodeRng,
    sync_leap: SyncLeap,
    from_peers: Vec<NodeId>,
) -> CatchUpInstruction {
    let block_hash = sync_leap.highest_block_hash();
    let block_height = sync_leap.highest_block_height();
    info!(
        %sync_leap,
        %block_height,
        %block_hash,
        "CatchUp: leap received"
    );

    let fault_tolerance = self.validator_matrix.fault_tolerance_threshold();
    let leap_weights =
        sync_leap.era_validator_weights(fault_tolerance, &self.chainspec.protocol_config);
    for era_weights in leap_weights {
        self.validator_matrix
            .register_era_validator_weights(era_weights);
    }

    // Accumulator first, synchronizer second: same call order as before.
    let accumulator_effects = wrap_effects(
        MainEvent::BlockAccumulator,
        self.block_accumulator
            .handle_validators(effect_builder, rng),
    );
    let synchronizer_effects = wrap_effects(
        MainEvent::BlockSynchronizer,
        self.block_synchronizer
            .handle_validators(effect_builder, rng),
    );
    self.block_synchronizer
        .register_sync_leap(&sync_leap, from_peers, true);

    let mut effects = Effects::new();
    effects.extend(accumulator_effects);
    effects.extend(synchronizer_effects);
    CatchUpInstruction::Do(self.control_logic_default_delay.into(), effects)
}
/// Asks the block synchronizer to sync `block_hash`.
///
/// When `register_block_by_hash` returns `true`, a `NeedNext` request is scheduled
/// immediately and returned with a zero delay. When it returns `false`, the result is
/// `CheckLater` with the default control-logic delay.
fn catch_up_block_sync(
    &mut self,
    effect_builder: EffectBuilder<MainEvent>,
    block_hash: BlockHash,
) -> CatchUpInstruction {
    let registered = self
        .block_synchronizer
        .register_block_by_hash(block_hash, true);

    if !registered {
        return CatchUpInstruction::CheckLater(
            format!("block_synchronizer is currently working on {}", block_hash),
            self.control_logic_default_delay.into(),
        );
    }

    let need_next = effect_builder
        .immediately()
        .event(|_| MainEvent::BlockSynchronizerRequest(BlockSynchronizerRequest::NeedNext));
    CatchUpInstruction::Do(Duration::ZERO, need_next)
}
/// Checks for an upgrade-related transition once the node is otherwise caught up.
///
/// Committing an upgrade takes precedence over shutting down for one;
/// `should_shutdown_for_upgrade` is only consulted when no upgrade commit is due.
/// Returns `None` when neither applies.
fn catch_up_check_transition(&mut self) -> Option<CatchUpInstruction> {
    if self.should_commit_upgrade() {
        Some(CatchUpInstruction::CommitUpgrade)
    } else if self.should_shutdown_for_upgrade() {
        Some(CatchUpInstruction::ShutdownForUpgrade)
    } else {
        None
    }
}
}