use std::{
fmt::{Display, Formatter},
time::Duration,
};
use either::Either;
use tracing::{debug, error, info, warn};
use casper_storage::data_access_layer::EraValidatorsRequest;
use casper_types::{ActivationPoint, BlockHash, BlockHeader, EraId, Timestamp};
use crate::{
components::{
block_accumulator::{SyncIdentifier, SyncInstruction},
block_synchronizer::BlockSynchronizerProgress,
storage::HighestOrphanedBlockResult,
sync_leaper,
sync_leaper::{LeapActivityError, LeapState},
},
effect::{
requests::BlockSynchronizerRequest, EffectBuilder, EffectExt, EffectResultExt, Effects,
},
reactor::main_reactor::{MainEvent, MainReactor},
types::{GlobalStatesMetadata, MaxTtl, SyncLeap, SyncLeapIdentifier},
NodeRng,
};
/// Instruction produced by the `KeepUp` control logic, telling the main
/// reactor what to do next while it is (nominally) keeping up with the chain.
pub(super) enum KeepUpInstruction {
    /// Switch to validating, scheduling the given effects.
    Validate(Effects<MainEvent>),
    /// Schedule the given effects after the given delay.
    Do(Duration, Effects<MainEvent>),
    /// Nothing actionable right now; re-check after the given delay.
    /// The string is a human-readable reason used for diagnostics.
    CheckLater(String, Duration),
    /// The node has fallen behind; transition back to the `CatchUp` state.
    CatchUp,
    /// A scheduled protocol upgrade is due; shut down so the next binary can start.
    ShutdownForUpgrade,
    /// Unrecoverable error; the string describes the fatal condition.
    Fatal(String),
}
/// Outcome of inspecting historical ("sync back") progress: whether and what
/// older block to synchronize next.
#[derive(Debug, Clone, Copy)]
enum SyncBackInstruction {
    /// Attempt to sync the given historical block (the era is used to decide
    /// whether a sync leap is needed first to obtain validator weights).
    Sync {
        sync_hash: BlockHash,
        sync_era: EraId,
    },
    /// A historical sync is already in progress.
    Syncing,
    /// Synced back far enough to cover the max transaction TTL; no more needed.
    TtlSynced,
    /// Synced all the way back to genesis; nothing older exists.
    GenesisSynced,
    /// Node is configured not to sync historical blocks.
    NoSync,
}
impl Display for SyncBackInstruction {
    /// Renders a short human-readable description of the instruction.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // The data-free variants map to fixed labels; `Sync` needs the target
        // hash interpolated, so it returns early via `write!`.
        let label = match self {
            SyncBackInstruction::Sync { sync_hash, .. } => {
                return write!(f, "attempt to sync {}", sync_hash);
            }
            SyncBackInstruction::Syncing => "syncing",
            SyncBackInstruction::TtlSynced => "ttl reached",
            SyncBackInstruction::GenesisSynced => "genesis reached",
            SyncBackInstruction::NoSync => "configured to not sync",
        };
        f.write_str(label)
    }
}
impl MainReactor {
    /// Top-level decision logic for the `KeepUp` reactor state.
    ///
    /// In order: shuts down if an upgrade is due, consults the forward block
    /// synchronizer (`keep_up_process`), asks the block accumulator for a sync
    /// instruction, attempts historical (sync-back) work, and finally checks
    /// whether the node should switch to validating. Falls back to
    /// `CheckLater` when the node is simply keeping up.
    pub(super) fn keep_up_instruction(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        rng: &mut NodeRng,
    ) -> KeepUpInstruction {
        if self.should_shutdown_for_upgrade() {
            return KeepUpInstruction::ShutdownForUpgrade;
        }
        // `Right` carries an instruction that ends this pass early; `Left`
        // carries the identifier used to keep following the chain tip.
        let sync_identifier = match self.keep_up_process() {
            Either::Right(keep_up_instruction) => return keep_up_instruction,
            Either::Left(sync_identifier) => sync_identifier,
        };
        debug!(
            ?sync_identifier,
            "KeepUp: sync identifier {}",
            sync_identifier.block_hash()
        );
        let sync_instruction = self.block_accumulator.sync_instruction(sync_identifier);
        debug!(
            ?sync_instruction,
            "KeepUp: sync_instruction {}",
            sync_instruction.block_hash()
        );
        if let Some(keep_up_instruction) =
            self.keep_up_sync_instruction(effect_builder, sync_instruction)
        {
            return keep_up_instruction;
        }
        debug!("KeepUp: keeping up with the network; try to sync an historical block");
        if let Some(keep_up_instruction) = self.sync_back_keep_up_instruction(effect_builder, rng) {
            return keep_up_instruction;
        }
        // No forward or historical work pending: consider validating, else idle.
        self.keep_up_should_validate(effect_builder, rng)
            .unwrap_or_else(|| {
                KeepUpInstruction::CheckLater(
                    "node is keeping up".to_string(),
                    self.control_logic_default_delay.into(),
                )
            })
    }
    /// Determines whether the node is in a position to start validating.
    ///
    /// Returns `None` (do not validate) before the genesis activation
    /// timestamp, when configured not to sync, while a forward sync is still
    /// active, or while the contract runtime still has queued work. Otherwise
    /// defers to `create_required_eras`.
    fn keep_up_should_validate(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        rng: &mut NodeRng,
    ) -> Option<KeepUpInstruction> {
        // Before the genesis timestamp there is nothing to validate yet.
        if let ActivationPoint::Genesis(genesis_timestamp) =
            self.chainspec.protocol_config.activation_point
        {
            if genesis_timestamp > Timestamp::now() {
                return None;
            }
        }
        if self.sync_handling.is_no_sync() {
            return None;
        }
        if self.block_synchronizer.forward_progress().is_active() {
            debug!("KeepUp: still syncing a block");
            return None;
        }
        // Outstanding execution work means our view of state is not current yet.
        let queue_depth = self.contract_runtime.queue_depth();
        if queue_depth > 0 {
            debug!("KeepUp: should_validate queue_depth {}", queue_depth);
            return None;
        }
        match self.create_required_eras(effect_builder, rng) {
            Ok(Some(effects)) => Some(KeepUpInstruction::Validate(effects)),
            Ok(None) => None,
            Err(msg) => Some(KeepUpInstruction::Fatal(msg)),
        }
    }
    /// Inspects forward synchronizer progress and converts it into either a
    /// `SyncIdentifier` for the accumulator (`Left`) or an early-exit
    /// instruction (`Right`).
    fn keep_up_process(&mut self) -> Either<SyncIdentifier, KeepUpInstruction> {
        let forward_progress = self.block_synchronizer.forward_progress();
        // `false`: this is forward (non-historical) progress tracking.
        self.update_last_progress(&forward_progress, false);
        match forward_progress {
            BlockSynchronizerProgress::Idle => {
                self.keep_up_idle()
            }
            BlockSynchronizerProgress::Syncing(block_hash, block_height, _) => {
                Either::Left(self.keep_up_syncing(block_hash, block_height))
            }
            BlockSynchronizerProgress::Executing(block_hash, block_height, era_id) => {
                Either::Left(self.keep_up_executing(block_hash, block_height, era_id))
            }
            BlockSynchronizerProgress::Synced(block_hash, block_height, era_id) => {
                Either::Left(self.keep_up_synced(block_hash, block_height, era_id))
            }
        }
    }
    /// Handles an idle forward synchronizer: follow from the local tip if we
    /// have one, fall back to `CatchUp` if storage has no complete blocks, or
    /// fail fatally on a storage read error.
    fn keep_up_idle(&mut self) -> Either<SyncIdentifier, KeepUpInstruction> {
        match self.storage.get_highest_complete_block() {
            Ok(Some(block)) => Either::Left(SyncIdentifier::LocalTip(
                *block.hash(),
                block.height(),
                block.era_id(),
            )),
            Ok(None) => {
                // No complete local blocks: we cannot be "keeping up".
                error!("KeepUp: block synchronizer idle, local storage has no complete blocks");
                Either::Right(KeepUpInstruction::CatchUp)
            }
            Err(error) => Either::Right(KeepUpInstruction::Fatal(format!(
                "failed to read highest complete block: {}",
                error
            ))),
        }
    }
    /// Builds the identifier for a block currently being synced; the height
    /// may not be known yet early in the sync.
    fn keep_up_syncing(
        &mut self,
        block_hash: BlockHash,
        block_height: Option<u64>,
    ) -> SyncIdentifier {
        match block_height {
            None => SyncIdentifier::BlockHash(block_hash),
            Some(height) => SyncIdentifier::BlockIdentifier(block_hash, height),
        }
    }
    /// Builds the identifier for a block that is synced and awaiting execution.
    fn keep_up_executing(
        &mut self,
        block_hash: BlockHash,
        block_height: u64,
        era_id: EraId,
    ) -> SyncIdentifier {
        SyncIdentifier::ExecutingBlockIdentifier(block_hash, block_height, era_id)
    }
    /// Handles a fully synced forward block: clears the forward synchronizer
    /// state and reports the block as the new sync point.
    fn keep_up_synced(
        &mut self,
        block_hash: BlockHash,
        block_height: u64,
        era_id: EraId,
    ) -> SyncIdentifier {
        debug!("KeepUp: synced block: {}", block_hash);
        // The forward sync for this block is finished; reset for the next one.
        self.block_synchronizer.purge_forward();
        SyncIdentifier::SyncedBlockIdentifier(block_hash, block_height, era_id)
    }
    /// Translates the accumulator's `SyncInstruction` into an optional
    /// `KeepUpInstruction`; `None` means "no forward action needed now".
    fn keep_up_sync_instruction(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        sync_instruction: SyncInstruction,
    ) -> Option<KeepUpInstruction> {
        match sync_instruction {
            SyncInstruction::Leap { .. } | SyncInstruction::LeapIntervalElapsed { .. } => {
                // A leap means we've fallen behind; isolated nodes have no
                // peers to leap with, so they stay in KeepUp.
                if !self.sync_handling.is_isolated() {
                    Some(KeepUpInstruction::CatchUp)
                } else {
                    None
                }
            }
            SyncInstruction::BlockSync { block_hash } => {
                debug!("KeepUp: BlockSync: {:?}", block_hash);
                // `false`: forward sync (not historical / should_fetch_execution_state).
                if self
                    .block_synchronizer
                    .register_block_by_hash(block_hash, false)
                {
                    info!(%block_hash, "KeepUp: BlockSync: registered block by hash");
                    Some(KeepUpInstruction::Do(
                        Duration::ZERO,
                        effect_builder.immediately().event(|_| {
                            MainEvent::BlockSynchronizerRequest(BlockSynchronizerRequest::NeedNext)
                        }),
                    ))
                } else {
                    // Already registered; nothing new to schedule.
                    None
                }
            }
            SyncInstruction::CaughtUp { .. } => {
                None
            }
        }
    }
    /// Drives historical (sync-back) block acquisition; `None` means no
    /// historical work is needed right now.
    fn sync_back_keep_up_instruction(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        rng: &mut NodeRng,
    ) -> Option<KeepUpInstruction> {
        let sync_back_progress = self.block_synchronizer.historical_progress();
        debug!(?sync_back_progress, "KeepUp: historical sync back progress");
        // `true`: this is historical progress tracking.
        self.update_last_progress(&sync_back_progress, true);
        match self.sync_back_instruction(&sync_back_progress) {
            // NOTE: `sbi` and `sync_back_instruction` bind the same value
            // (the type is `Copy`); `sbi` is used for logging below.
            Ok(Some(sbi @ sync_back_instruction)) => match sync_back_instruction {
                SyncBackInstruction::NoSync
                | SyncBackInstruction::GenesisSynced
                | SyncBackInstruction::TtlSynced => {
                    // Historical syncing is complete (or disabled): release
                    // the synchronizer and leaper resources.
                    debug!("KeepUp: {}", sbi);
                    self.block_synchronizer.purge_historical();
                    self.sync_leaper.purge();
                    None
                }
                SyncBackInstruction::Syncing => {
                    debug!("KeepUp: syncing historical; checking later");
                    Some(KeepUpInstruction::CheckLater(
                        format!("historical {}", SyncBackInstruction::Syncing),
                        self.control_logic_default_delay.into(),
                    ))
                }
                SyncBackInstruction::Sync {
                    sync_hash,
                    sync_era,
                } => {
                    debug!(%sync_hash, ?sync_era, validator_matrix_eras=?self.validator_matrix.eras(), "KeepUp: historical sync back instruction");
                    // We can only validate fetched data if we know the era's
                    // validator weights; otherwise leap to obtain them first.
                    if self.validator_matrix.has_era(&sync_era) {
                        Some(self.sync_back_register(effect_builder, rng, sync_hash))
                    } else {
                        Some(self.sync_back_leap(effect_builder, rng, sync_hash))
                    }
                }
            },
            Ok(None) => None,
            Err(msg) => Some(KeepUpInstruction::Fatal(msg)),
        }
    }
    /// Reads era validators from global state on both sides of an upgrade
    /// boundary via the contract runtime.
    ///
    /// On success the pair is delivered as `GotBlockAfterUpgradeEraValidators`;
    /// any missing global state is requested for sync via the block
    /// synchronizer instead.
    fn try_read_validators_for_block_after_upgrade(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        global_states_metadata: GlobalStatesMetadata,
    ) -> KeepUpInstruction {
        let effects = async move {
            let before_era_validators_request =
                EraValidatorsRequest::new(global_states_metadata.before_state_hash);
            let before_era_validators_result = effect_builder
                .get_era_validators_from_contract_runtime(before_era_validators_request)
                .await;
            let after_era_validators_request =
                EraValidatorsRequest::new(global_states_metadata.after_state_hash);
            let after_era_validators_result = effect_builder
                .get_era_validators_from_contract_runtime(after_era_validators_request)
                .await;
            let lhs = before_era_validators_result.take_era_validators();
            let rhs = after_era_validators_result.take_era_validators();
            // Err carries the (block hash, state hash) pairs whose global
            // state is missing locally and must be synced from peers.
            match (lhs, rhs) {
                (Some(before_era_validators), Some(after_era_validators)) => {
                    Ok((before_era_validators, after_era_validators))
                }
                (None, None) => Err(vec![
                    (
                        global_states_metadata.before_hash,
                        global_states_metadata.before_state_hash,
                    ),
                    (
                        global_states_metadata.after_hash,
                        global_states_metadata.after_state_hash,
                    ),
                ]),
                (Some(_), None) => Err(vec![(
                    global_states_metadata.after_hash,
                    global_states_metadata.after_state_hash,
                )]),
                (None, Some(_)) => Err(vec![(
                    global_states_metadata.before_hash,
                    global_states_metadata.before_state_hash,
                )]),
            }
        }
        .result(
            move |(before_era_validators, after_era_validators)| {
                MainEvent::GotBlockAfterUpgradeEraValidators(
                    global_states_metadata.after_era_id,
                    before_era_validators,
                    after_era_validators,
                )
            },
            |global_states_to_sync| {
                MainEvent::BlockSynchronizerRequest(BlockSynchronizerRequest::SyncGlobalStates(
                    global_states_to_sync,
                ))
            },
        );
        KeepUpInstruction::Do(Duration::ZERO, effects)
    }
    /// Advances the historical sync leaper state machine for `parent_hash`
    /// (the next-older block we want validator weights for).
    fn sync_back_leap(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        rng: &mut NodeRng,
        parent_hash: BlockHash,
    ) -> KeepUpInstruction {
        let sync_back_status = self.sync_leaper.leap_status();
        info!(
            "KeepUp: historical sync back status {} {}",
            parent_hash, sync_back_status
        );
        debug!(
            ?parent_hash,
            ?sync_back_status,
            "KeepUp: historical sync back status"
        );
        match sync_back_status {
            LeapState::Idle => {
                debug!("KeepUp: historical sync back idle");
                self.sync_back_leaper_idle(effect_builder, rng, parent_hash, Duration::ZERO)
            }
            LeapState::Awaiting { .. } => KeepUpInstruction::CheckLater(
                "KeepUp: historical sync back is awaiting response".to_string(),
                self.control_logic_default_delay.into(),
            ),
            LeapState::Received {
                best_available,
                from_peers: _,
                ..
            } => self.sync_back_leap_received(effect_builder, *best_available),
            LeapState::Failed { error, .. } => {
                self.sync_back_leap_failed(effect_builder, rng, parent_hash, error)
            }
        }
    }
    /// Handles a failed historical leap: log it and retry after the default
    /// control-logic delay.
    fn sync_back_leap_failed(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        rng: &mut NodeRng,
        parent_hash: BlockHash,
        error: LeapActivityError,
    ) -> KeepUpInstruction {
        warn!(
            %error,
            "KeepUp: failed historical sync back",
        );
        self.sync_back_leaper_idle(
            effect_builder,
            rng,
            parent_hash,
            self.control_logic_default_delay.into(),
        )
    }
    /// Starts a new historical sync leap for `parent_hash` after `offset`,
    /// asking a random sample of fully connected peers.
    fn sync_back_leaper_idle(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        rng: &mut NodeRng,
        parent_hash: BlockHash,
        offset: Duration,
    ) -> KeepUpInstruction {
        let peers_to_ask = self.net.fully_connected_peers_random(
            rng,
            self.chainspec.core_config.simultaneous_peer_requests as usize,
        );
        if peers_to_ask.is_empty() {
            // Cannot leap without peers; try again later.
            return KeepUpInstruction::CheckLater(
                "no peers".to_string(),
                self.control_logic_default_delay.into(),
            );
        }
        self.block_accumulator.reset_last_progress();
        let sync_leap_identifier = SyncLeapIdentifier::sync_to_historical(parent_hash);
        let effects = effect_builder.immediately().event(move |_| {
            MainEvent::SyncLeaper(sync_leaper::Event::AttemptLeap {
                sync_leap_identifier,
                peers_to_ask,
            })
        });
        KeepUpInstruction::Do(offset, effects)
    }
    /// Processes a successful historical leap: registers the validator
    /// weights it carries and, if the leap straddles an upgrade boundary,
    /// kicks off reading/syncing the relevant global states.
    fn sync_back_leap_received(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        sync_leap: SyncLeap,
    ) -> KeepUpInstruction {
        let block_hash = sync_leap.highest_block_hash();
        let block_height = sync_leap.highest_block_height();
        info!(%sync_leap, %block_height, %block_hash, "KeepUp: historical sync_back received");
        let era_validator_weights = sync_leap.era_validator_weights(
            self.validator_matrix.fault_tolerance_threshold(),
            &self.chainspec.protocol_config,
        );
        for evw in era_validator_weights {
            let era_id = evw.era_id();
            debug!(%era_id, "KeepUp: attempt to register historical validators for era");
            if self.validator_matrix.register_era_validator_weights(evw) {
                info!("KeepUp: got historical era {}", era_id);
            } else {
                debug!(%era_id, "KeepUp: historical era already present or is not relevant");
            }
        }
        if let Some(global_states_metadata) = sync_leap.global_states_for_sync_across_upgrade() {
            self.try_read_validators_for_block_after_upgrade(effect_builder, global_states_metadata)
        } else {
            // Weights registered; re-run control logic immediately.
            KeepUpInstruction::CheckLater(
                "historical sync back received".to_string(),
                Duration::ZERO,
            )
        }
    }
    /// Registers `parent_hash` for historical sync (with execution state) and
    /// seeds the synchronizer with a random set of connected peers.
    fn sync_back_register(
        &mut self,
        effect_builder: EffectBuilder<MainEvent>,
        rng: &mut NodeRng,
        parent_hash: BlockHash,
    ) -> KeepUpInstruction {
        // `true`: historical sync, which also fetches execution state.
        if self
            .block_synchronizer
            .register_block_by_hash(parent_hash, true)
        {
            let peers_to_ask = self.net.fully_connected_peers_random(
                rng,
                self.chainspec.core_config.simultaneous_peer_requests as usize,
            );
            debug!(
                "KeepUp: historical register_block_by_hash: {} peers count: {:?}",
                parent_hash,
                peers_to_ask.len()
            );
            self.block_synchronizer
                .register_peers(parent_hash, peers_to_ask);
            KeepUpInstruction::Do(
                Duration::ZERO,
                effect_builder.immediately().event(|_| {
                    MainEvent::BlockSynchronizerRequest(BlockSynchronizerRequest::NeedNext)
                }),
            )
        } else {
            // Already registered and in flight; poll again later.
            KeepUpInstruction::CheckLater(
                format!("historical syncing {}", parent_hash),
                self.control_logic_default_delay.into(),
            )
        }
    }
    /// Decides the next historical action from synchronizer progress and the
    /// highest orphaned block in storage. `Ok(None)` means no action needed;
    /// `Err` is a fatal storage inconsistency.
    fn sync_back_instruction(
        &mut self,
        block_synchronizer_progress: &BlockSynchronizerProgress,
    ) -> Result<Option<SyncBackInstruction>, String> {
        match block_synchronizer_progress {
            BlockSynchronizerProgress::Syncing(_, _, _) => {
                debug!("KeepUp: still syncing historical block");
                return Ok(Some(SyncBackInstruction::Syncing));
            }
            BlockSynchronizerProgress::Executing(block_hash, height, _) => {
                // Historical blocks are never executed, only stored; this
                // state indicates a logic error elsewhere, but we proceed.
                warn!(
                    %block_hash,
                    %height,
                    "Historical block synchronizer should not be waiting for the block to be executed"
                );
            }
            BlockSynchronizerProgress::Idle | BlockSynchronizerProgress::Synced(_, _, _) => {}
        }
        match self.storage.get_highest_orphaned_block_header() {
            HighestOrphanedBlockResult::Orphan(highest_orphaned_block_header) => {
                // First check the termination conditions (no-sync / genesis / TTL).
                if let Some(synched) = self.synched(&highest_orphaned_block_header)? {
                    debug!(?synched, "synched result");
                    return Ok(Some(synched));
                }
                let (sync_hash, sync_era) =
                    self.sync_hash_and_era(&highest_orphaned_block_header)?;
                debug!(?sync_era, %sync_hash, "KeepUp: historical sync target era and block hash");
                // Latch the matrix so older-era weights are retained while we
                // sync backwards.
                self.validator_matrix
                    .register_retrograde_latch(Some(sync_era));
                Ok(Some(SyncBackInstruction::Sync {
                    sync_hash,
                    sync_era,
                }))
            }
            HighestOrphanedBlockResult::MissingHeader(height) => Err(format!(
                "KeepUp: storage is missing historical block header for height {}",
                height
            )),
            HighestOrphanedBlockResult::MissingHighestSequence => {
                Err("KeepUp: storage is missing historical highest block sequence".to_string())
            }
        }
    }
    /// Checks whether historical syncing can stop: configured off, reached
    /// genesis, or (when not syncing to genesis) synced far enough back to
    /// cover the max transaction TTL. `Ok(None)` means keep syncing.
    /// (Name retained as-is; "synched" is the historical spelling here.)
    fn synched(
        &self,
        highest_orphaned_block_header: &BlockHeader,
    ) -> Result<Option<SyncBackInstruction>, String> {
        if self.sync_handling.is_no_sync() {
            return Ok(Some(SyncBackInstruction::NoSync));
        }
        if highest_orphaned_block_header.is_genesis() {
            return Ok(Some(SyncBackInstruction::GenesisSynced));
        }
        // Sync-to-genesis mode never stops early at the TTL boundary.
        if self.sync_handling.is_sync_to_genesis() {
            return Ok(None);
        }
        if let Some(highest_switch_block_header) = self
            .storage
            .read_highest_switch_block_headers(1)
            .map_err(|err| err.to_string())?
            .last()
        {
            debug!(
                highest_switch_timestamp=?highest_switch_block_header.timestamp(),
                highest_orphaned_timestamp=?highest_orphaned_block_header.timestamp(),
                "checking max ttl");
            let max_ttl: MaxTtl = self.chainspec.transaction_config.max_ttl.into();
            if max_ttl.synced_to_ttl(
                highest_switch_block_header.timestamp(),
                highest_orphaned_block_header,
            ) {
                debug!("is synced to ttl");
                return Ok(Some(SyncBackInstruction::TtlSynced));
            }
        }
        Ok(None)
    }
    /// Determines the hash and era of the next historical block to sync (the
    /// parent of the highest orphaned block), with a special correction for
    /// the genesis era when its validators are not in the matrix.
    fn sync_hash_and_era(
        &self,
        highest_orphaned_block_header: &BlockHeader,
    ) -> Result<(BlockHash, EraId), String> {
        let parent_hash = highest_orphaned_block_header.parent_hash();
        debug!(?highest_orphaned_block_header, %parent_hash, "KeepUp: highest orphaned historical block");
        if highest_orphaned_block_header.era_id().is_genesis()
            && !self
                .validator_matrix
                .has_era(&highest_orphaned_block_header.era_id())
        {
            // Genesis validators are unknown: target the next era's switch
            // block instead, whose data lets us derive them.
            match self
                .storage
                .get_switch_block_by_era_id(&highest_orphaned_block_header.era_id().successor())
            {
                Ok(Some(switch)) => {
                    debug!(
                        ?highest_orphaned_block_header,
                        "KeepUp: historical sync in genesis era attempting correction for unmatrixed genesis validators"
                    );
                    return Ok((*switch.hash(), switch.era_id()));
                }
                Ok(None) => return Err(
                    "In genesis era with no genesis validators and missing next era switch block"
                        .to_string(),
                ),
                Err(err) => return Err(err.to_string()),
            }
        }
        match self.storage.read_block_header_by_hash(parent_hash) {
            Ok(Some(parent_block_header)) => {
                debug!(
                    ?parent_block_header,
                    "KeepUp: historical sync found parent block header in storage"
                );
                Ok((
                    parent_block_header.block_hash(),
                    parent_block_header.era_id(),
                ))
            }
            Ok(None) => {
                debug!(%parent_hash, "KeepUp: historical sync did not find block header in storage");
                // Parent header unknown: assume the parent is in the previous
                // era (or era 0 if the orphan is already in the first era).
                let era_id = match highest_orphaned_block_header.era_id().predecessor() {
                    None => EraId::from(0),
                    Some(predecessor) => {
                        predecessor
                    }
                };
                Ok((*parent_hash, era_id))
            }
            Err(err) => Err(err.to_string()),
        }
    }
}
#[cfg(test)]
/// Test helper: reports whether the node is synced back far enough to cover
/// `max_ttl` (or all the way to genesis). Always returns `Ok`; the `Result`
/// wrapper mirrors the production API shape.
pub(crate) fn synced_to_ttl(
    latest_switch_block_header: &BlockHeader,
    highest_orphaned_block_header: &BlockHeader,
    max_ttl: casper_types::TimeDiff,
) -> Result<bool, String> {
    // A genesis-height orphan means the whole chain is present locally.
    if highest_orphaned_block_header.height() == 0 {
        return Ok(true);
    }
    Ok(is_timestamp_at_ttl(
        latest_switch_block_header.timestamp(),
        highest_orphaned_block_header.timestamp(),
        max_ttl,
    ))
}
#[cfg(test)]
/// Test helper: true when `lowest_block_timestamp` is strictly older than the
/// TTL cutoff (`latest_switch_block_timestamp - max_ttl`, saturating at zero).
fn is_timestamp_at_ttl(
    latest_switch_block_timestamp: Timestamp,
    lowest_block_timestamp: Timestamp,
    max_ttl: casper_types::TimeDiff,
) -> bool {
    let ttl_cutoff = latest_switch_block_timestamp.saturating_sub(max_ttl);
    lowest_block_timestamp < ttl_cutoff
}
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use casper_types::{testing::TestRng, TestBlockBuilder, TimeDiff, Timestamp};
    use crate::reactor::main_reactor::keep_up::{is_timestamp_at_ttl, synced_to_ttl};

    const TWO_DAYS_SECS: u32 = 60 * 60 * 24 * 2;
    const MAX_TTL: TimeDiff = TimeDiff::from_seconds(86400);

    /// Parses a timestamp literal; panicking on malformed input is fine in tests.
    fn ts(raw: &str) -> Timestamp {
        Timestamp::from_str(raw).unwrap()
    }

    #[test]
    fn should_be_at_ttl() {
        // Five days between blocks comfortably exceeds a two-day TTL.
        assert!(is_timestamp_at_ttl(
            ts("2010-06-15 00:00:00.000"),
            ts("2010-06-10 00:00:00.000"),
            TimeDiff::from_seconds(TWO_DAYS_SECS),
        ));
    }

    #[test]
    fn should_not_be_at_ttl() {
        // Only one day apart, so the two-day TTL window is not yet covered.
        assert!(!is_timestamp_at_ttl(
            ts("2010-06-15 00:00:00.000"),
            ts("2010-06-14 00:00:00.000"),
            TimeDiff::from_seconds(TWO_DAYS_SECS),
        ));
    }

    #[test]
    fn should_detect_ttl_at_the_boundary() {
        // One millisecond older than the cutoff: TTL is reached.
        assert!(is_timestamp_at_ttl(
            ts("2010-06-15 00:00:00.000"),
            ts("2010-06-12 23:59:59.999"),
            TimeDiff::from_seconds(TWO_DAYS_SECS),
        ));
        // Exactly at the cutoff: comparison is strict, so not reached.
        assert!(!is_timestamp_at_ttl(
            ts("2010-06-15 00:00:00.000"),
            ts("2010-06-13 00:00:00.000"),
            TimeDiff::from_seconds(TWO_DAYS_SECS),
        ));
        // One millisecond newer than the cutoff: not reached.
        assert!(!is_timestamp_at_ttl(
            ts("2010-06-15 00:00:00.000"),
            ts("2010-06-13 00:00:00.001"),
            TimeDiff::from_seconds(TWO_DAYS_SECS),
        ));
    }

    #[test]
    fn should_detect_ttl_at_genesis() {
        let rng = &mut TestRng::new();
        let latest_switch_block = TestBlockBuilder::new()
            .era(100)
            .height(1000)
            .switch_block(true)
            .build_versioned(rng);
        let latest_orphaned_block = TestBlockBuilder::new()
            .era(0)
            .height(0)
            .switch_block(true)
            .build_versioned(rng);
        assert_eq!(latest_orphaned_block.height(), 0);
        // A genesis-height orphan is always considered synced to TTL,
        // regardless of the timestamp gap.
        assert_eq!(
            synced_to_ttl(
                &latest_switch_block.clone_header(),
                &latest_orphaned_block.clone_header(),
                MAX_TTL
            ),
            Ok(true)
        );
    }
}