pub(crate) mod test_utils;
use std::{
cmp::min,
collections::{BTreeMap, VecDeque},
convert::TryInto,
iter,
time::Duration,
};
use assert_matches::assert_matches;
use derive_more::From;
use num_rational::Ratio;
use rand::{seq::IteratorRandom, Rng};
use casper_storage::data_access_layer::ExecutionResultsChecksumResult;
use casper_types::{
global_state::TrieMerkleProof, testing::TestRng, AccessRights, BlockV2, CLValue,
ChainNameDigest, Chainspec, Deploy, Digest, EraId, FinalitySignatureV2, Key,
LegacyRequiredFinality, ProtocolVersion, PublicKey, SecretKey, StoredValue, TestBlockBuilder,
TestBlockV1Builder, TimeDiff, URef, U512,
};
use super::*;
use crate::{
components::{
block_synchronizer::block_acquisition::BlockAcquisitionState,
consensus::tests::utils::{ALICE_PUBLIC_KEY, ALICE_SECRET_KEY},
},
effect::Effect,
reactor::{EventQueueHandle, QueueKind, Scheduler},
tls::KeyFingerprint,
types::{BlockExecutionResultsOrChunkId, ValueOrChunk},
utils,
};
/// Maximum number of peers the synchronizer fans out to simultaneously in these tests.
const MAX_SIMULTANEOUS_PEERS: u8 = 5;
/// Very short latch reset interval so tests can wait it out with a tiny sleep.
const TEST_LATCH_RESET_INTERVAL_MILLIS: u64 = 5;
/// Readable alias for the `should_fetch_execution_state` argument of
/// `register_block_by_hash` (true selects historical sync).
const SHOULD_FETCH_EXECUTION_STATE: bool = true;
/// Protocol version from which strict finality signatures are required
/// (installed into the chainspec by `with_legacy_finality`).
const STRICT_FINALITY_REQUIRED_VERSION: ProtocolVersion = ProtocolVersion::from_parts(1, 5, 0);
/// Every kind of event the `BlockSynchronizer`'s effects can schedule onto the
/// mock reactor's queue; `derive_more::From` lets each request type convert
/// into this enum implicitly when the scheduler enqueues it.
#[derive(Debug, From)]
enum MockReactorEvent {
    MarkBlockCompletedRequest(#[allow(dead_code)] MarkBlockCompletedRequest),
    BlockFetcherRequest(FetcherRequest<Block>),
    BlockHeaderFetcherRequest(FetcherRequest<BlockHeader>),
    LegacyDeployFetcherRequest(FetcherRequest<LegacyDeploy>),
    TransactionFetcherRequest(FetcherRequest<Transaction>),
    FinalitySignatureFetcherRequest(FetcherRequest<FinalitySignature>),
    TrieOrChunkFetcherRequest(#[allow(dead_code)] FetcherRequest<TrieOrChunk>),
    BlockExecutionResultsOrChunkFetcherRequest(FetcherRequest<BlockExecutionResultsOrChunk>),
    SyncLeapFetcherRequest(#[allow(dead_code)] FetcherRequest<SyncLeap>),
    ApprovalsHashesFetcherRequest(FetcherRequest<ApprovalsHashes>),
    NetworkInfoRequest(NetworkInfoRequest),
    BlockAccumulatorRequest(BlockAccumulatorRequest),
    PeerBehaviorAnnouncement(#[allow(dead_code)] PeerBehaviorAnnouncement),
    StorageRequest(StorageRequest),
    TrieAccumulatorRequest(#[allow(dead_code)] TrieAccumulatorRequest),
    ContractRuntimeRequest(ContractRuntimeRequest),
    SyncGlobalStateRequest(SyncGlobalStateRequest),
    MakeBlockExecutableRequest(MakeBlockExecutableRequest),
    MetaBlockAnnouncement(MetaBlockAnnouncement),
}
/// Test double for a reactor: a leaked scheduler plus an effect builder wired
/// to it, so tests can spawn effects and then pop the events they scheduled.
struct MockReactor {
    // 'static because the scheduler is intentionally leaked in `new`.
    scheduler: &'static Scheduler<MockReactorEvent>,
    effect_builder: EffectBuilder<MockReactorEvent>,
}
impl MockReactor {
    /// Builds a reactor whose scheduler is leaked so the effect builder can
    /// hold a `'static` reference to it.
    fn new() -> Self {
        let scheduler = utils::leak(Scheduler::new(QueueKind::weights(), None));
        let handle = EventQueueHandle::without_shutdown(scheduler);
        MockReactor {
            scheduler,
            effect_builder: EffectBuilder::new(handle),
        }
    }

    /// Copy of the effect builder wired to this reactor's queue.
    fn effect_builder(&self) -> EffectBuilder<MockReactorEvent> {
        self.effect_builder
    }

    /// Pops the next scheduled event off the queue, discarding its metadata.
    async fn crank(&self) -> MockReactorEvent {
        let ((_ancestor, event), _) = self.scheduler.pop().await;
        event
    }

    /// Spawns each effect in turn and collects the event it schedules,
    /// preserving the order the effects were given in.
    async fn process_effects(
        &self,
        effects: impl IntoIterator<Item = Effect<Event>>,
    ) -> Vec<MockReactorEvent> {
        let mut collected = Vec::new();
        for effect in effects {
            tokio::spawn(effect);
            collected.push(self.crank().await);
        }
        collected
    }
}
/// Shared fixture for synchronizer tests: a block together with the validator
/// secret keys and peer ids generated alongside it.
struct TestEnv {
    block: Block,
    validator_keys: Vec<Arc<SecretKey>>,
    peers: Vec<NodeId>,
}
impl TestEnv {
fn with_block(self, block: Block) -> Self {
Self {
block,
validator_keys: self.validator_keys,
peers: self.peers,
}
}
fn block(&self) -> &Block {
&self.block
}
fn validator_keys(&self) -> &Vec<Arc<SecretKey>> {
&self.validator_keys
}
fn peers(&self) -> &Vec<NodeId> {
&self.peers
}
fn gen_validator_matrix(&self) -> ValidatorMatrix {
let validator_weights: BTreeMap<PublicKey, U512> = self
.validator_keys
.iter()
.map(|key| (PublicKey::from(key.as_ref()), 100.into())) .collect();
assert_eq!(validator_weights.len(), self.validator_keys.len());
let mut validator_matrix = ValidatorMatrix::new(
Ratio::new(1, 3),
ChainNameDigest::from_chain_name("casper-example"),
None,
EraId::from(0),
self.validator_keys[0].clone(),
PublicKey::from(self.validator_keys[0].as_ref()),
1,
3,
);
validator_matrix.register_validator_weights(self.block.era_id(), validator_weights);
validator_matrix
}
fn random(rng: &mut TestRng) -> TestEnv {
let num_validators: usize = rng.gen_range(10..100);
let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(SecretKey::random(rng)))
.take(num_validators)
.collect();
let num_peers = rng.gen_range(10..20);
TestEnv {
block: TestBlockBuilder::new().build(rng).into(),
validator_keys,
peers: iter::repeat(())
.take(num_peers)
.map(|_| NodeId::from(rng.gen::<KeyFingerprint>()))
.collect(),
}
}
}
/// Asserts that `event` is a `SyncGlobalStateRequest` targeting the given
/// block's hash and state root hash.
fn check_sync_global_state_event(event: MockReactorEvent, block: &Block) {
    // A single `match` both asserts the variant and extracts the request,
    // replacing the redundant `assert!(matches!(..))` followed by a second
    // match whose other arm was `unreachable!()`.
    let global_sync_request = match event {
        MockReactorEvent::SyncGlobalStateRequest(req) => req,
        other => panic!("expected SyncGlobalStateRequest, got: {:?}", other),
    };
    assert_eq!(global_sync_request.block_hash, *block.hash());
    assert_eq!(
        global_sync_request.state_root_hash,
        *block.state_root_hash()
    );
}
/// Calls `need_next` once on the synchronizer, asserts it produced exactly
/// `num_expected_events` effects, then runs them and returns the events.
async fn need_next(
    rng: &mut TestRng,
    reactor: &MockReactor,
    block_synchronizer: &mut BlockSynchronizer,
    num_expected_events: u8,
) -> Vec<MockReactorEvent> {
    let effects = block_synchronizer.need_next(reactor.effect_builder(), rng);
    // Widen the expectation to usize instead of narrowing the length to u8:
    // the previous `effects.len() as u8` would silently wrap for >255 effects
    // and could make the assertion pass spuriously.
    assert_eq!(effects.len(), usize::from(num_expected_events));
    reactor.process_effects(effects).await
}
/// Creates and registers a finality signature on `builder` for every secret
/// key yielded by `validator_keys_iter`, verifying each signature first.
fn register_multiple_signatures<'a, I: IntoIterator<Item = &'a Arc<SecretKey>>>(
    builder: &mut BlockBuilder,
    block: &Block,
    validator_keys_iter: I,
    chain_name_hash: ChainNameDigest,
) {
    for secret_key in validator_keys_iter {
        let signature = FinalitySignatureV2::create(
            *block.hash(),
            block.height(),
            block.era_id(),
            chain_name_hash,
            secret_key.as_ref(),
        );
        // Sanity check before handing the signature to the builder.
        assert!(signature.is_verified().is_ok());
        let registration = builder.register_finality_signature(signature.into(), None);
        assert!(registration.is_ok());
    }
}
/// Builds a minimal Merkle proof over a throwaway key/value pair with an
/// empty proof path, for tests that only need a structurally valid proof.
fn dummy_merkle_proof() -> TrieMerkleProof<Key, StoredValue> {
    let key: Key = URef::new([255; 32], AccessRights::NONE).into();
    let value = StoredValue::CLValue(CLValue::from_t(()).unwrap());
    TrieMerkleProof::new(key, value, VecDeque::new())
}
/// Extension trait for asserting an iterator yields exactly one item.
trait OneExt: IntoIterator {
    /// Returns the sole item, or `None` if there are zero or more than one.
    fn try_one(self) -> Option<Self::Item>;
    /// Returns the sole item, panicking if there are zero or more than one.
    fn one(self) -> Self::Item;
}

impl<I: IntoIterator> OneExt for I {
    fn try_one(self) -> Option<Self::Item> {
        let mut it = self.into_iter();
        let first = it.next()?;
        // Only a sole element qualifies; a second element yields `None`.
        it.next().is_none().then_some(first)
    }

    #[track_caller]
    fn one(self) -> Self::Item {
        let mut it = self.into_iter();
        let first = it
            .next()
            .expect("no element in the iterator, but 1 was expected");
        if it.next().is_some() {
            // Fixed typo in the panic message: "more that" -> "more than".
            panic!("more than 1 element in the iterator, but 1 was expected")
        }
        first
    }
}
#[cfg(test)]
impl BlockSynchronizer {
    /// Creates a synchronizer over a random chainspec and flips the component
    /// state to `Initialized` so it processes events in tests.
    fn new_initialized(
        rng: &mut TestRng,
        validator_matrix: ValidatorMatrix,
        config: Config,
    ) -> BlockSynchronizer {
        let mut block_synchronizer = BlockSynchronizer::new(
            config,
            Arc::new(Chainspec::random(rng)),
            MAX_SIMULTANEOUS_PEERS,
            validator_matrix,
            &Registry::new(),
        )
        .expect("Failed to create BlockSynchronizer");
        <BlockSynchronizer as InitializedComponent<MainEvent>>::set_state(
            &mut block_synchronizer,
            ComponentState::Initialized,
        );
        block_synchronizer
    }

    /// Rewrites the chainspec's finality configuration: strict finality
    /// signatures are required from `STRICT_FINALITY_REQUIRED_VERSION`
    /// onward, with `legacy_required_finality` applying before that.
    fn with_legacy_finality(mut self, legacy_required_finality: LegacyRequiredFinality) -> Self {
        // NOTE(review): `Arc::get_mut(..).unwrap()` assumes the chainspec Arc
        // has no other clones at this point — it panics otherwise.
        let core_config = &mut Arc::get_mut(&mut self.chainspec).unwrap().core_config;
        core_config.start_protocol_version_with_strict_finality_signatures_required =
            STRICT_FINALITY_REQUIRED_VERSION;
        core_config.legacy_required_finality = legacy_required_finality;
        self
    }

    /// Returns the active forward-sync builder, panicking if there is none.
    fn forward_builder(&self) -> &BlockBuilder {
        self.forward.as_ref().expect("Forward builder missing")
    }
}
/// Smallest count out of `n` equally-weighted validators that exceeds one
/// third — the weak-finality signature count used by these tests.
fn weak_finality_threshold(n: usize) -> usize {
    1 + n / 3
}
/// Smallest count out of `n` equally-weighted validators that exceeds two
/// thirds — the strict-finality signature count used by these tests.
fn strict_finality_threshold(n: usize) -> usize {
    1 + n * 2 / 3
}
/// Asserts that the builder exists and its latch state equals `expected`.
fn latch_inner_check(builder: Option<&BlockBuilder>, expected: bool, msg: &str) {
    let builder = builder.expect("builder should exist");
    assert_eq!(builder.latched(), expected, "{}", msg);
}
/// Asserts that the builder exists and its latch count equals `expected`.
fn latch_count_check(builder: Option<&BlockBuilder>, expected: u8, msg: &str) {
    let builder = builder.expect("builder should exist");
    assert_eq!(builder.latch_count(), expected, "{}", msg);
}
/// Asserts that the builder's next acquisition action is `expected`.
fn need_next_inner_check(
    builder: Option<&mut BlockBuilder>,
    rng: &mut TestRng,
    expected: NeedNext,
    msg: &str,
) {
    let builder = builder.expect("should exist");
    let action = builder.block_acquisition_action(rng, MAX_SIMULTANEOUS_PEERS);
    assert_eq!(action.need_next(), expected, "{}", msg);
}
/// Historical sync must make progress past global-state acquisition even when
/// peers fail to provide tries: failed peers are retried/demoted and, once the
/// state syncs, the unreliable ones are remembered as such.
#[tokio::test]
async fn global_state_sync_wont_stall_with_bad_peers() {
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng).with_block(
        TestBlockBuilder::new()
            .era(1)
            .random_transactions(1, &mut rng)
            .build(&mut rng)
            .into(),
    );
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let cfg = Config {
        latch_reset_interval: TimeDiff::from_millis(TEST_LATCH_RESET_INTERVAL_MILLIS),
        ..Default::default()
    };
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, cfg);
    // Historical sync (`true`) is required: global state is only fetched there.
    block_synchronizer.register_block_by_hash(*block.hash(), true);
    assert!(
        block_synchronizer.historical.is_some(),
        "we only get global state on historical sync"
    );
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    // Drive the builder up to the point where global state is the next need:
    // header, weak finality, block body, then the remaining signatures.
    let historical_builder = block_synchronizer.historical.as_mut().unwrap();
    assert!(
        historical_builder
            .register_block_header(block.clone_header(), None)
            .is_ok(),
        "historical builder should register header"
    );
    historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(
        historical_builder
            .register_block(block.clone(), None)
            .is_ok(),
        "should register block"
    );
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .skip(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    // First need-next should be a single global-state sync request.
    let mut effects = block_synchronizer.need_next(mock_reactor.effect_builder(), &mut rng);
    assert_eq!(
        effects.len(),
        1,
        "need next should have 1 effect at this step, not {}",
        effects.len()
    );
    tokio::spawn(async move { effects.remove(0).await });
    let event = mock_reactor.crank().await;
    let first_peer_set = peers.iter().copied().choose_multiple(&mut rng, 4);
    check_sync_global_state_event(event, block);
    // Wait out the latch so the next need-next is allowed to re-ask.
    tokio::time::sleep(Duration::from_millis(TEST_LATCH_RESET_INTERVAL_MILLIS * 2)).await;
    // Simulate a failed trie fetch from the first set of peers.
    block_synchronizer.global_state_synced(
        *block.hash(),
        Err(GlobalStateSynchronizerError::TrieAccumulator(
            first_peer_set.to_vec(),
        )),
    );
    let mut effects = block_synchronizer.need_next(mock_reactor.effect_builder(), &mut rng);
    assert_eq!(
        effects.len(),
        1,
        "need next should still have 1 effect at this step, not {}",
        effects.len()
    );
    tokio::spawn(async move { effects.remove(0).await });
    let event = mock_reactor.crank().await;
    let second_peer_set = peers.iter().copied().choose_multiple(&mut rng, 4);
    // Despite the failure, the synchronizer retries the global-state sync.
    check_sync_global_state_event(event, block);
    tokio::time::sleep(Duration::from_millis(TEST_LATCH_RESET_INTERVAL_MILLIS * 2)).await;
    // Now report success, flagging some of the peers as unreliable.
    let unreliable_peers = second_peer_set.into_iter().choose_multiple(&mut rng, 2);
    block_synchronizer.global_state_synced(
        *block.hash(),
        Ok(GlobalStateSynchronizerResponse::new(
            (*block.state_root_hash()).into(),
            unreliable_peers.clone(),
        )),
    );
    let mut effects = block_synchronizer.need_next(mock_reactor.effect_builder(), &mut rng);
    assert_eq!(
        effects.len(),
        1,
        "need next should still have 1 effect after global state sync'd, not {}",
        effects.len()
    );
    tokio::spawn(async move { effects.remove(0).await });
    let event = mock_reactor.crank().await;
    // Idiom fix: `assert!(!matches!(..))` instead of `false == matches!(..)`.
    assert!(
        !matches!(event, MockReactorEvent::SyncGlobalStateRequest { .. }),
        "synchronizer should have progressed"
    );
    // The peers reported as unreliable must be demoted in the peer list.
    for peer in unreliable_peers.iter() {
        assert!(
            block_synchronizer
                .historical
                .as_ref()
                .unwrap()
                .peer_list()
                .is_peer_unreliable(peer),
            "{} should be marked unreliable",
            peer
        );
    }
}
/// With no peers available from either networking or the accumulator, the
/// synchronizer must stay latched on its peer request (no busy loop) yet still
/// re-issue the request when the accumulator's (second) response arrives empty.
#[tokio::test]
async fn synchronizer_doesnt_busy_loop_without_peers() {
    /// Asserts the two expected peer-discovery events: one to networking for
    /// fully connected peers, one to the accumulator for block-aware peers.
    fn check_need_peer_events(expected_block_hash: BlockHash, events: Vec<MockReactorEvent>) {
        assert_matches!(
            events[0],
            MockReactorEvent::NetworkInfoRequest(NetworkInfoRequest::FullyConnectedPeers {
                count,
                ..
            }) if count == MAX_SIMULTANEOUS_PEERS as usize
        );
        assert_matches!(
            events[1],
            MockReactorEvent::BlockAccumulatorRequest(BlockAccumulatorRequest::GetPeersForBlock {
                block_hash,
                ..
            }) if block_hash == expected_block_hash
        );
    }
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng).with_block(
        TestBlockBuilder::new()
            .era(1)
            .random_transactions(1, &mut rng)
            .build(&mut rng)
            .into(),
    );
    let block = test_env.block();
    let block_hash = *block.hash();
    let validator_matrix = test_env.gen_validator_matrix();
    let cfg = Config {
        latch_reset_interval: TimeDiff::from_millis(TEST_LATCH_RESET_INTERVAL_MILLIS),
        ..Default::default()
    };
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, cfg);
    // Historical sync with no peers registered yet.
    block_synchronizer.register_block_by_hash(block_hash, true);
    latch_inner_check(
        block_synchronizer.historical.as_ref(),
        false,
        "initial set up, should not be latched",
    );
    {
        // First round: need-next asks both sources for peers and latches.
        need_next_inner_check(
            block_synchronizer.historical.as_mut(),
            &mut rng,
            NeedNext::Peers(block_hash),
            "should need peers",
        );
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::Request(BlockSynchronizerRequest::NeedNext),
        );
        assert_eq!(effects.len(), 2, "we should ask for peers from both networking and accumulator, thus two effects are expected");
        latch_inner_check(
            block_synchronizer.historical.as_ref(),
            true,
            "should be latched waiting for peers",
        );
        check_need_peer_events(block_hash, mock_reactor.process_effects(effects).await);
    }
    {
        // Networking answers with no peers; the latch must hold (the
        // accumulator's response is still outstanding).
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::NetworkPeers(*block.hash(), vec![]),
        );
        latch_inner_check(
            block_synchronizer.historical.as_ref(),
            true,
            "should still be latched because only one response was received and it \
            did not have what we needed.",
        );
        assert!(effects.is_empty(), "effects should be empty");
    }
    {
        // The accumulator also has no peers; the final response should trigger
        // a fresh round of peer requests rather than going idle.
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::AccumulatedPeers(*block.hash(), None),
        );
        assert!(!effects.is_empty(), "we should still need peers...");
        latch_inner_check(
            block_synchronizer.historical.as_ref(),
            true,
            "we need peers, ask again",
        );
        need_next_inner_check(
            block_synchronizer.historical.as_mut(),
            &mut rng,
            NeedNext::Peers(block_hash),
            "should need peers",
        );
        check_need_peer_events(block_hash, mock_reactor.process_effects(effects).await);
    }
}
/// Registering era validator weights after a sync-leap round must unlatch the
/// historical builder so it proceeds to fetching finality signatures.
#[tokio::test]
async fn should_not_stall_after_registering_new_era_validator_weights() {
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let block_hash = *block.hash();
    let era_id = block.era_id();
    // Start from a matrix that has no weights for the block's era, so the
    // builder will first need era validators.
    let mut validator_matrix = ValidatorMatrix::new_with_validator(ALICE_SECRET_KEY.clone());
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix.clone(), Config::default());
    block_synchronizer.register_block_by_hash(block_hash, true);
    block_synchronizer.register_peers(block_hash, peers.clone());
    block_synchronizer
        .historical
        .as_mut()
        .expect("should have historical builder")
        .register_block_header(block.clone_header(), None)
        .expect("should register block header");
    latch_inner_check(
        block_synchronizer.historical.as_ref(),
        false,
        "initial set up, should not be latched",
    );
    need_next_inner_check(
        block_synchronizer.historical.as_mut(),
        &mut rng,
        NeedNext::EraValidators(era_id),
        "should need era validators for era block is in",
    );
    let effects = block_synchronizer.need_next(mock_reactor.effect_builder(), &mut rng);
    assert_eq!(
        effects.len(),
        MAX_SIMULTANEOUS_PEERS as usize,
        "need next should have an effect per peer when needing sync leap"
    );
    latch_inner_check(
        block_synchronizer.historical.as_ref(),
        true,
        "after determination that we need validators, should be latched",
    );
    // While latched, repeated need_next calls must be no-ops.
    assert!(
        block_synchronizer
            .need_next(mock_reactor.effect_builder(), &mut rng)
            .is_empty(),
        "should return no effects while latched"
    );
    // Every fanned-out effect should be a sync-leap fetch.
    for effect in effects {
        tokio::spawn(effect);
        let event = mock_reactor.crank().await;
        match event {
            MockReactorEvent::SyncLeapFetcherRequest(_) => (),
            _ => panic!("unexpected event: {:?}", event),
        };
    }
    // Provide the era's validator weights, as a successful sync leap would.
    validator_matrix.register_validator_weights(
        era_id,
        iter::once((ALICE_PUBLIC_KEY.clone(), 100.into())).collect(),
    );
    block_synchronizer
        .historical
        .as_mut()
        .expect("should have historical builder")
        .register_era_validator_weights(&validator_matrix);
    latch_inner_check(
        block_synchronizer.historical.as_ref(),
        false,
        "after registering validators, should not be latched",
    );
    need_next_inner_check(
        block_synchronizer.historical.as_mut(),
        &mut rng,
        NeedNext::FinalitySignatures(block_hash, era_id, validator_matrix.public_keys(&era_id)),
        "should need finality sigs",
    );
    let mut effects = block_synchronizer.need_next(mock_reactor.effect_builder(), &mut rng);
    assert_eq!(
        effects.len(),
        1,
        "need next should produce 1 effect because we currently need exactly 1 signature \
        NOTE: finality signatures are a special case; we currently we fan out 1 peer per signature \
        but do multiple rounds of this against increasingly strict weight thresholds. \
        All other fetchers fan out by asking each of MAX_SIMULTANEOUS_PEERS for the _same_ item."
    );
    tokio::spawn(async move { effects.remove(0).await });
    let event = mock_reactor.crank().await;
    assert_matches!(
        event,
        MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest {
            id,
            peer,
            ..
        }) if peers.contains(&peer) && id.block_hash() == block.hash()
    );
}
/// While a forward builder for a block is live, re-registering that same block
/// is rejected; registering a different block replaces the builder.
#[test]
fn duplicate_register_block_not_allowed_if_builder_is_not_failed() {
    let mut rng = TestRng::new();
    let env = TestEnv::random(&mut rng);
    let block = env.block();
    let validator_matrix = env.gen_validator_matrix();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());

    // First registration of a forward (non-historical) block succeeds.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());

    // Registering the same hash again is rejected while the builder is live.
    let registered_again = block_synchronizer.register_block_by_hash(*block.hash(), false);
    assert!(!registered_again);

    // A different block may take over the forward slot.
    let replacement: Block = TestBlockBuilder::new().build(&mut rng).into();
    assert!(block_synchronizer.register_block_by_hash(*replacement.hash(), false));
    assert_eq!(
        block_synchronizer.forward.unwrap().block_hash(),
        *replacement.hash()
    );
}
/// A historical sync solicits peers from two sources: networking (fully
/// connected peers) and the block accumulator.
#[tokio::test]
async fn historical_sync_gets_peers_form_both_connected_peers_and_accumulator() {
    let mut rng = TestRng::new();
    let reactor = MockReactor::new();
    let env = TestEnv::random(&mut rng);
    let block = env.block();
    let validator_matrix = env.gen_validator_matrix();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());

    // Fetching execution state makes this a historical sync.
    assert!(
        block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE)
    );
    assert!(block_synchronizer.historical.is_some());

    let effects = block_synchronizer.handle_event(
        reactor.effect_builder(),
        &mut rng,
        Event::Request(BlockSynchronizerRequest::NeedNext),
    );
    // One request towards networking plus one towards the accumulator.
    assert_eq!(effects.len(), 2);
    let events = reactor.process_effects(effects).await;
    assert_matches!(
        events[0],
        MockReactorEvent::NetworkInfoRequest(NetworkInfoRequest::FullyConnectedPeers {
            count,
            ..
        }) if count == MAX_SIMULTANEOUS_PEERS as usize
    );
    assert_matches!(
        events[1],
        MockReactorEvent::BlockAccumulatorRequest(BlockAccumulatorRequest::GetPeersForBlock {
            block_hash,
            ..
        }) if block_hash == *block.hash()
    )
}
/// A forward sync asks only the block accumulator for peers, never networking.
#[tokio::test]
async fn fwd_sync_gets_peers_only_from_accumulator() {
    let mut rng = TestRng::new();
    let reactor = MockReactor::new();
    let env = TestEnv::random(&mut rng);
    let block = env.block();
    let validator_matrix = env.gen_validator_matrix();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());

    // `false` selects a forward (non-historical) sync.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());

    let effects = block_synchronizer.handle_event(
        reactor.effect_builder(),
        &mut rng,
        Event::Request(BlockSynchronizerRequest::NeedNext),
    );
    // Exactly one effect: the accumulator peer request.
    assert_eq!(effects.len(), 1);
    let events = reactor.process_effects(effects).await;
    assert_matches!(
        events[0],
        MockReactorEvent::BlockAccumulatorRequest(BlockAccumulatorRequest::GetPeersForBlock {
            block_hash,
            ..
        }) if block_hash == *block.hash()
    )
}
/// A freshly registered forward sync with known peers begins by fetching the
/// block header, fanning the same request out to every simultaneous peer.
#[tokio::test]
async fn sync_starts_with_header_fetch() {
    let mut rng = TestRng::new();
    let reactor = MockReactor::new();
    let env = TestEnv::random(&mut rng);
    let block = env.block();
    let peers = env.peers();
    let validator_matrix = env.gen_validator_matrix();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());

    // Expect one header-fetch request per simultaneous peer, all for our block.
    let events = need_next(
        &mut rng,
        &reactor,
        &mut block_synchronizer,
        MAX_SIMULTANEOUS_PEERS,
    )
    .await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockHeaderFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) if peers.contains(&peer) && id == *block.hash()
        );
    }
}
#[tokio::test]
async fn fwd_sync_is_not_blocked_by_failed_header_fetch_within_latch_interval() {
let mut rng = TestRng::new();
let mock_reactor = MockReactor::new();
let test_env = TestEnv::random(&mut rng);
let block = test_env.block();
let block_hash = *block.hash();
let peers = test_env.peers();
let validator_matrix = test_env.gen_validator_matrix();
let cfg = Config {
..Default::default()
};
let mut block_synchronizer =
BlockSynchronizer::new_initialized(&mut rng, validator_matrix, cfg);
assert!(
block_synchronizer.register_block_by_hash(block_hash, false),
"should register block by hash"
);
assert!(
block_synchronizer.forward.is_some(),
"should have forward sync"
);
block_synchronizer.register_peers(block_hash, peers.clone());
let events = need_next(
&mut rng,
&mock_reactor,
&mut block_synchronizer,
MAX_SIMULTANEOUS_PEERS,
)
.await;
let initial_progress = block_synchronizer
.forward
.as_ref()
.expect("should exist")
.last_progress_time();
latch_inner_check(
block_synchronizer.forward.as_ref(),
true,
"forward builder should be latched after need next call",
);
let mut peers_asked = Vec::new();
for event in events {
assert_matches!(
event,
MockReactorEvent::BlockHeaderFetcherRequest(FetcherRequest {
id,
peer,
..
}) if peers.contains(&peer) && id == block_hash => {
peers_asked.push(peer);
},
"should be block header fetch"
);
}
let mut generated_effects = Effects::new();
for peer in peers_asked {
latch_inner_check(
block_synchronizer.forward.as_ref(),
true,
&format!("response from peer: {:?}, but should still be latched until after final response received", peer),
);
assert!(
generated_effects.is_empty(),
"effects should remain empty until last response"
);
let effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
&mut rng,
Event::BlockHeaderFetched(Err(FetcherError::Absent {
id: Box::new(*block.hash()),
peer,
})),
);
generated_effects.extend(effects);
}
need_next_inner_check(
block_synchronizer.forward.as_mut(),
&mut rng,
NeedNext::BlockHeader(block_hash),
"should need block header",
);
assert!(
!generated_effects.is_empty(),
"should have gotten effects after the final response tail called into need next"
);
latch_inner_check(
block_synchronizer.forward.as_ref(),
true,
"all requests have been responded to, and the last event response should have \
resulted in a fresh need next being reported and thus a new latch",
);
assert_matches!(
block_synchronizer.forward_progress(),
BlockSynchronizerProgress::Syncing(block_hash, _, _) if block_hash == block_hash,
"should be syncing"
);
tokio::time::sleep(Duration::from(cfg.need_next_interval)).await;
assert_matches!(
block_synchronizer.forward_progress(),
BlockSynchronizerProgress::Syncing(block_hash, _, _) if block_hash == *block.hash()
);
let current_progress = block_synchronizer
.forward
.as_ref()
.expect("should exist")
.last_progress_time();
assert_eq!(
initial_progress, current_progress,
"we have not gotten the record we need, so progress should remain the same"
)
}
/// Once a header is fetched from a peer, the forward builder should move to
/// `HaveBlockHeader`, mark that peer reliable, and immediately fan out
/// finality-signature fetches.
#[tokio::test]
async fn registering_header_successfully_triggers_signatures_fetch_for_weak_finality() {
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    // First round: header-fetch requests go out to each peer.
    let events = need_next(
        &mut rng,
        &mock_reactor,
        &mut block_synchronizer,
        MAX_SIMULTANEOUS_PEERS,
    )
    .await;
    let mut peers_asked = Vec::new();
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockHeaderFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) if peers.contains(&peer) && id == *block.hash() => {
                peers_asked.push(peer);
            }
        );
    }
    // Deliver the header from the first peer that was asked.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        &mut rng,
        Event::BlockHeaderFetched(Ok(FetchedData::FromPeer {
            item: Box::new(block.clone_header()),
            peer: peers_asked[0],
        })),
    );
    let fwd_builder = block_synchronizer.forward_builder();
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveBlockHeader(header, _) if header.block_hash() == *block.hash()
    );
    // The peer that provided the header is promoted to reliable.
    assert!(fwd_builder.peer_list().is_peer_reliable(&peers_asked[0]));
    // One signature fetch per validator, capped at MAX_SIMULTANEOUS_PEERS.
    assert_eq!(
        effects.len(),
        min(
            test_env.validator_keys().len(),
            MAX_SIMULTANEOUS_PEERS as usize,
        )
    );
    for event in mock_reactor.process_effects(effects).await {
        assert_matches!(
            event,
            MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) if peers.contains(&peer) && id.block_hash() == block.hash() && id.era_id() == block.era_id()
        );
    }
}
/// After one signature arrives (short of weak finality), the builder keeps
/// requesting signatures from other validators; once enough accumulate, the
/// next need switches to fetching the block body.
#[tokio::test]
async fn fwd_more_signatures_are_requested_if_weak_finality_is_not_reached() {
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    // Pre-load the header and validator weights to reach `HaveBlockHeader`.
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveBlockHeader(header, _) if header.block_hash() == *block.hash()
    );
    // Deliver a single signature (from validator 0) via the event path.
    let signature = FinalitySignatureV2::create(
        *block.hash(),
        block.height(),
        block.era_id(),
        chain_name_hash,
        validators_secret_keys[0].as_ref(),
    );
    assert!(signature.is_verified().is_ok());
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        &mut rng,
        Event::FinalitySignatureFetched(Ok(FetchedData::FromPeer {
            item: Box::new(signature.into()),
            peer: peers[0],
        })),
    );
    // Fan-out continues for the remaining validators, capped by peer count.
    assert_eq!(
        effects.len(),
        min(
            validators_secret_keys.len() - 1,
            MAX_SIMULTANEOUS_PEERS as usize,
        )
    );
    for event in mock_reactor.process_effects(effects).await {
        assert_matches!(
            event,
            MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) => {
                assert!(peers.contains(&peer));
                assert_eq!(id.block_hash(), block.hash());
                assert_eq!(id.era_id(), block.era_id());
                // Validator 0's signature is already held; never re-requested.
                assert_ne!(*id.public_key(), PublicKey::from(validators_secret_keys[0].as_ref()));
            }
        );
    }
    // Feed in enough further signatures to cross the weak-finality threshold.
    let mut generated_effects = Effects::new();
    for secret_key in validators_secret_keys
        .iter()
        .skip(1)
        .take(weak_finality_threshold(validators_secret_keys.len()))
    {
        let signature = FinalitySignatureV2::create(
            *block.hash(),
            block.height(),
            block.era_id(),
            chain_name_hash,
            secret_key.as_ref(),
        );
        assert!(signature.is_verified().is_ok());
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::FinalitySignatureFetched(Ok(FetchedData::FromPeer {
                item: Box::new(signature.into()),
                peer: peers[2],
            })),
        );
        generated_effects.extend(effects);
    }
    // The most recent effects (after weak finality) should be block fetches.
    let events = mock_reactor
        .process_effects(
            generated_effects
                .into_iter()
                .rev()
                .take(MAX_SIMULTANEOUS_PEERS as usize),
        )
        .await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) => {
                assert!(peers.contains(&peer));
                assert_eq!(id, *block.hash());
            }
        );
    }
}
/// When every requested finality signature comes back as a fetch failure, the
/// forward sync must re-issue signature requests (staying latched) instead of
/// stalling.
#[tokio::test]
async fn fwd_sync_is_not_blocked_by_failed_signatures_fetch_within_latch_interval() {
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let expected_block_hash = *block.hash();
    let era_id = block.era_id();
    let validator_matrix = test_env.gen_validator_matrix();
    let num_validators = test_env.validator_keys().len() as u8;
    let cfg = Config {
        ..Default::default()
    };
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, cfg);
    assert!(block_synchronizer.register_block_by_hash(expected_block_hash, false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(expected_block_hash, peers.clone());
    // Pre-load header and weights so signatures are the next need.
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveBlockHeader(header, _) if header.block_hash() == expected_block_hash
    );
    // Signature fetches fan out one per signature, capped by peer count.
    let events = need_next(
        &mut rng,
        &mock_reactor,
        &mut block_synchronizer,
        min(num_validators, MAX_SIMULTANEOUS_PEERS),
    )
    .await;
    let mut sigs_requested = Vec::new();
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) => {
                assert!(peers.contains(&peer));
                assert_eq!(*id.block_hash(), expected_block_hash);
                assert_eq!(id.era_id(), era_id);
                sigs_requested.push((peer, id.public_key().clone()));
            }
        );
    }
    // Fail each requested signature; only the last failure may emit effects.
    let mut generated_effects = Effects::new();
    for (peer, public_key) in sigs_requested {
        latch_inner_check(
            block_synchronizer.forward.as_ref(),
            true,
            &format!("response from peer: {:?}, but should still be latched until after final response received", peer),
        );
        assert!(
            generated_effects.is_empty(),
            "effects should remain empty until last response"
        );
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::FinalitySignatureFetched(Err(FetcherError::Absent {
                // NOTE(review): nested Box — presumably the signature
                // fetcher's Id type is itself boxed; confirm against the
                // fetcher definition.
                id: Box::new(Box::new(FinalitySignatureId::new(
                    expected_block_hash,
                    era_id,
                    public_key,
                ))),
                peer,
            })),
        );
        generated_effects.extend(effects);
    }
    assert_matches!(
        block_synchronizer.forward_progress(),
        BlockSynchronizerProgress::Syncing(block_hash, _, _) if block_hash == expected_block_hash,
        "should be syncing"
    );
    assert!(
        !generated_effects.is_empty(),
        "should have gotten effects after the final response tail called into need next"
    );
    latch_inner_check(
        block_synchronizer.forward.as_ref(),
        true,
        "all requests have been responded to, and the last event response should have \
        resulted in a fresh need next being reported and thus a new latch",
    );
    // The new effects should again be signature fetches for the same block.
    for event in mock_reactor.process_effects(generated_effects).await {
        assert_matches!(
            event,
            MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) if peers.contains(&peer) && *id.block_hash() == expected_block_hash && id.era_id() == block.era_id()
        );
    }
    assert_matches!(
        block_synchronizer.forward_progress(),
        BlockSynchronizerProgress::Syncing(block_hash, _, _) if block_hash == expected_block_hash
    );
}
/// Once weak finality is reached, the next acquisition step is fetching the
/// block body, fanned out to every simultaneous peer.
#[tokio::test]
async fn next_action_for_have_weak_finality_is_fetching_block_body() {
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    // Pre-load header, weights, and just enough signatures for weak finality.
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveWeakFinalitySignatures(header, _) if header.block_hash() == *block.hash()
    );
    // Every resulting effect should be a block (body) fetch for our block.
    let events = need_next(
        &mut rng,
        &mock_reactor,
        &mut block_synchronizer,
        MAX_SIMULTANEOUS_PEERS,
    )
    .await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) => {
                assert!(peers.contains(&peer));
                assert_eq!(id, *block.hash());
            }
        );
    }
}
#[tokio::test]
async fn registering_block_body_transitions_builder_to_have_block_state() {
    // A forward builder at weak finality that receives the fetched block body
    // must transition to the `HaveBlock` acquisition state.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    // Start a forward sync for the block and make the peers known.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Register just enough signatures to reach weak finality.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveWeakFinalitySignatures(header, _) if header.block_hash() == *block.hash()
    );
    // At weak finality the synchronizer should ask known peers for the body.
    let events = need_next(
        &mut rng,
        &mock_reactor,
        &mut block_synchronizer,
        MAX_SIMULTANEOUS_PEERS,
    )
    .await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) => {
                assert!(peers.contains(&peer));
                assert_eq!(id, *block.hash());
            }
        );
    }
    // Simulate the body arriving from a peer; the builder should now hold the
    // block.
    block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        &mut rng,
        Event::BlockFetched(Ok(FetchedData::FromPeer {
            item: Box::new(block.clone()),
            peer: peers[0],
        })),
    );
    assert_matches!(
        block_synchronizer.forward_builder().block_acquisition_state(),
        BlockAcquisitionState::HaveBlock(acquired_block, _, _) if acquired_block.hash() == block.hash()
    );
}
#[tokio::test]
async fn fwd_having_block_body_for_block_without_deploys_requires_only_signatures() {
    // For a block with no deploys (the default `TestEnv` block — contrast the
    // `with_deploys` test below), once the body is in hand the only remaining
    // acquisition work is gathering more finality signatures.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Weak finality only — strict finality has not been reached yet.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(fwd_builder.register_block(block.clone(), None).is_ok());
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveBlock(acquired_block, _, _) if acquired_block.hash() == block.hash()
    );
    // All generated requests must be finality-signature fetches for this
    // block's era, directed at known peers.
    let effects = block_synchronizer.need_next(mock_reactor.effect_builder(), &mut rng);
    for event in mock_reactor.process_effects(effects).await {
        assert_matches!(
            event,
            MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) if peers.contains(&peer) && id.block_hash() == block.hash() && id.era_id() == block.era_id()
        );
    }
}
#[tokio::test]
async fn fwd_having_block_body_for_block_with_deploys_requires_approvals_hashes() {
let mut rng = TestRng::new();
let mock_reactor = MockReactor::new();
let test_env = TestEnv::random(&mut rng).with_block(
TestBlockBuilder::new()
.era(1)
.random_transactions(1, &mut rng)
.build(&mut rng)
.into(),
);
let peers = test_env.peers();
let block = test_env.block();
let validator_matrix = test_env.gen_validator_matrix();
let chain_name_hash = validator_matrix.chain_name_hash();
let validators_secret_keys = test_env.validator_keys();
let mut block_synchronizer =
BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
assert!(block_synchronizer.forward.is_some());
block_synchronizer.register_peers(*block.hash(), peers.clone());
let fwd_builder = block_synchronizer
.forward
.as_mut()
.expect("Forward builder should have been initialized");
assert!(fwd_builder
.register_block_header(block.clone_header(), None)
.is_ok());
fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
register_multiple_signatures(
fwd_builder,
block,
validators_secret_keys
.iter()
.take(weak_finality_threshold(validators_secret_keys.len())),
chain_name_hash,
);
assert!(fwd_builder.register_block(block.clone(), None).is_ok());
assert_matches!(
fwd_builder.block_acquisition_state(),
BlockAcquisitionState::HaveBlock(acquired_block, _, _) if acquired_block.hash() == block.hash()
);
let events = need_next(
&mut rng,
&mock_reactor,
&mut block_synchronizer,
MAX_SIMULTANEOUS_PEERS,
)
.await;
for event in events {
if !matches!(
event,
MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest {
id,
peer,
..
}) if peers.contains(&peer) && id == *block.hash()
) {
println!("peers: {:?}", peers);
println!("{}", block.hash());
println!("event: {:?}", event);
}
assert_matches!(
event,
MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest {
id,
peer,
..
}) if peers.contains(&peer) && id == *block.hash()
);
}
}
#[tokio::test]
async fn fwd_registering_approvals_hashes_triggers_fetch_for_deploys() {
    // Once the approvals hashes for a block with a transaction are registered,
    // the synchronizer should immediately fan out fetch requests for the
    // transaction itself to MAX_SIMULTANEOUS_PEERS peers.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let txns = [Transaction::random(&mut rng)];
    let test_env = TestEnv::random(&mut rng).with_block(
        TestBlockBuilder::new()
            .era(1)
            .transactions(txns.iter())
            .build(&mut rng)
            .into(),
    );
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Drive the builder to `HaveBlock`: weak finality plus the block body.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(fwd_builder.register_block(block.clone(), None).is_ok());
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveBlock(acquired_block, _, _) if acquired_block.hash() == block.hash()
    );
    // Build approvals hashes matching the block's single transaction.
    let approvals_hashes = ApprovalsHashes::new(
        *block.hash(),
        txns.iter()
            .map(|txn| txn.compute_approvals_hash().unwrap())
            .collect(),
        dummy_merkle_proof(),
    );
    // Deliver the approvals hashes as if fetched from the first peer.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        &mut rng,
        Event::ApprovalsHashesFetched(Ok(FetchedData::FromPeer {
            item: Box::new(approvals_hashes.clone()),
            peer: peers[0],
        })),
    );
    // One transaction-fetch request per simultaneous peer is expected.
    assert_eq!(effects.len(), MAX_SIMULTANEOUS_PEERS as usize);
    for event in mock_reactor.process_effects(effects).await {
        assert_matches!(
            event,
            MockReactorEvent::TransactionFetcherRequest(FetcherRequest {
                id,
                peer,
                ..
            }) => {
                assert!(peers.contains(&peer));
                assert_eq!(id, txns[0].compute_id());
            }
        );
    }
}
#[tokio::test]
async fn fwd_have_block_body_without_deploys_and_strict_finality_transitions_state_machine() {
    // If ALL validators' signatures are already registered when the body
    // arrives, registering the body should carry the builder straight through
    // to `HaveStrictFinalitySignatures` on the next `need_next` call.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Note: every validator signs here (no `.take(...)`) — strict finality.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys.iter(),
        chain_name_hash,
    );
    assert!(fwd_builder.register_block(block.clone(), None).is_ok());
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveBlock(acquired_block, _, _) if acquired_block.hash() == block.hash()
    );
    // `need_next` should advance the state machine and emit exactly one
    // effect.
    let mut effects = block_synchronizer.need_next(mock_reactor.effect_builder(), &mut rng);
    assert_eq!(effects.len(), 1);
    let fwd_builder = block_synchronizer
        .forward
        .as_ref()
        .expect("Forward builder should have been initialized");
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveStrictFinalitySignatures(acquired_block, ..) if acquired_block.hash() == block.hash()
    );
    // The single effect is a self-addressed `NeedNext` request to keep the
    // state machine ticking.
    let events = effects.remove(0).await;
    assert_eq!(events.len(), 1);
    assert_matches!(
        events[0],
        Event::Request(BlockSynchronizerRequest::NeedNext)
    );
}
#[tokio::test]
async fn fwd_have_block_with_strict_finality_requires_creation_of_finalized_block() {
    // After strict finality is reached on a forward sync, the next action is
    // asking the contract runtime to make the block executable.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // First only enough signatures for weak finality ...
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(fwd_builder.register_block(block.clone(), None).is_ok());
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveBlock(acquired_block, _, _) if acquired_block.hash() == block.hash()
    );
    // ... then the remaining signatures, pushing the builder to strict
    // finality.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .skip(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveStrictFinalitySignatures(acquired_block, ..) if acquired_block.hash() == block.hash()
    );
    // The single next action must be a make-block-executable request.
    let events = need_next(&mut rng, &mock_reactor, &mut block_synchronizer, 1).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::MakeBlockExecutableRequest(MakeBlockExecutableRequest {
                block_hash,
                ..
            }) if block_hash == *block.hash()
        );
    }
}
#[tokio::test]
async fn fwd_have_strict_finality_requests_enqueue_when_finalized_block_is_created() {
    // When the finalized (executable) block is produced, the builder moves to
    // `HaveExecutableBlock` and the synchronizer queries storage for the
    // activation-point key block height before enqueueing execution.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Weak finality, then the body, then the remaining signatures — i.e. the
    // full path to strict finality.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(fwd_builder.register_block(block.clone(), None).is_ok());
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .skip(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveStrictFinalitySignatures(acquired_block, ..) if acquired_block.hash() == block.hash()
    );
    assert_matches!(
        block_synchronizer.forward_progress(),
        BlockSynchronizerProgress::Syncing(block_hash, _, _) if block_hash == *block.hash()
    );
    // Simulate the contract runtime having produced the executable block.
    let event = Event::MadeFinalizedBlock {
        block_hash: *block.hash(),
        result: Some(ExecutableBlock::from_block_and_transactions(
            block.clone().try_into().expect("Expected a V2 block."),
            Vec::new(),
        )),
    };
    let effects = block_synchronizer.handle_event(mock_reactor.effect_builder(), &mut rng, event);
    assert_eq!(effects.len(), 1);
    let events = mock_reactor.process_effects(effects).await;
    let fwd_builder = block_synchronizer
        .forward
        .as_ref()
        .expect("Forward builder should have been initialized");
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveExecutableBlock(actual_block, _, _) if *actual_block.hash() == *block.hash()
    );
    // The produced effect is a storage lookup needed before execution.
    assert_matches!(
        &events[0],
        MockReactorEvent::StorageRequest(
            StorageRequest::GetKeyBlockHeightForActivationPoint { .. }
        )
    );
    // Still syncing — execution has not been enqueued yet.
    assert_matches!(
        block_synchronizer.forward_progress(),
        BlockSynchronizerProgress::Syncing(block_hash, _, _) if block_hash == *block.hash()
    );
}
#[tokio::test]
async fn fwd_builder_status_is_executing_when_block_is_enqueued_for_execution() {
    // Once the executable block is enqueued for execution, the forward
    // progress must report `Executing` (and produce no further effects).
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Full path to strict finality: weak signatures, body, remaining
    // signatures.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(fwd_builder.register_block(block.clone(), None).is_ok());
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .skip(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveStrictFinalitySignatures(acquired_block, ..) if acquired_block.hash() == block.hash()
    );
    // Register the executable block directly on the builder.
    fwd_builder.register_made_executable_block(ExecutableBlock::from_block_and_transactions(
        block.clone().try_into().expect("Expected a V2 block."),
        Vec::new(),
    ));
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::HaveExecutableBlock(actual_block, _, _) if *actual_block.hash() == *block.hash()
    );
    // Enqueueing for execution terminates effect generation and flips the
    // reported progress to `Executing`.
    let event = Event::MarkBlockExecutionEnqueued(*block.hash());
    let effects = block_synchronizer.handle_event(mock_reactor.effect_builder(), &mut rng, event);
    assert_eq!(effects.len(), 0);
    assert_matches!(
        block_synchronizer.forward_progress(),
        BlockSynchronizerProgress::Executing(block_hash, _, _) if block_hash == *block.hash()
    );
}
#[tokio::test]
async fn fwd_sync_is_finished_when_block_is_marked_as_executed() {
    // `MarkBlockExecuted` is the terminal event for a forward sync: progress
    // flips from `Executing` to `Synced` with no further effects.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Drive the builder all the way through strict finality and execution
    // enqueueing.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(fwd_builder.register_block(block.clone(), None).is_ok());
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .skip(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    fwd_builder.register_made_executable_block(ExecutableBlock::from_block_and_transactions(
        block.clone().try_into().expect("Expected a V2 block."),
        Vec::new(),
    ));
    fwd_builder.register_block_execution_enqueued();
    assert_matches!(
        block_synchronizer.forward_progress(),
        BlockSynchronizerProgress::Executing(block_hash, _, _) if block_hash == *block.hash()
    );
    // Execution completion ends the sync.
    let event = Event::MarkBlockExecuted(*block.hash());
    let effects = block_synchronizer.handle_event(mock_reactor.effect_builder(), &mut rng, event);
    assert_eq!(effects.len(), 0);
    assert_matches!(
        block_synchronizer.forward_progress(),
        BlockSynchronizerProgress::Synced(block_hash, _, _) if block_hash == *block.hash()
    );
}
#[tokio::test]
async fn historical_sync_announces_meta_block() {
    // A completed historical sync must announce a meta block (so downstream
    // components such as the transaction buffer learn about it) and then
    // report `Synced`.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    // Historical sync: execution state is fetched as well.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE));
    assert!(block_synchronizer.historical.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let historical_builder = block_synchronizer
        .historical
        .as_mut()
        .expect("Historical builder should have been initialized");
    assert!(historical_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Weak finality, body, then the remaining signatures.
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(historical_builder
        .register_block(block.clone(), None)
        .is_ok());
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .skip(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    // Force the builder into `HaveStrictFinalitySignatures`, skipping the
    // intermediate global-state / deploy acquisition steps not under test
    // here.
    match historical_builder.block_acquisition_state() {
        BlockAcquisitionState::HaveBlock(state_block, state_signatures, _) => historical_builder
            .set_block_acquisition_state(BlockAcquisitionState::HaveStrictFinalitySignatures(
                state_block.clone(),
                state_signatures.clone(),
            )),
        other => panic!("Unexpected state: {:?}", other),
    }
    assert_matches!(
        block_synchronizer.historical_progress(),
        BlockSynchronizerProgress::Syncing(block_hash, _, _) if block_hash == *block.hash()
    );
    // Completing the block triggers a storage lookup for execution results.
    let event = Event::MarkBlockCompleted {
        block_hash: *block.hash(),
        is_new: true,
    };
    let effects = block_synchronizer.handle_event(mock_reactor.effect_builder(), &mut rng, event);
    assert_eq!(effects.len(), 1);
    let mut events = mock_reactor.process_effects(effects).await;
    match events.pop().unwrap() {
        MockReactorEvent::StorageRequest(StorageRequest::GetExecutionResults {
            block_hash: actual_block_hash,
            responder,
        }) => {
            assert_eq!(actual_block_hash, *block.hash());
            // Respond with empty execution results to unblock the flow.
            responder.respond(Some(vec![])).await;
        }
        other => panic!("Unexpected event: {:?}", other),
    }
    // The follow-up event must be the meta block announcement, and its state
    // must not yet have been sent to the transaction buffer.
    let event = mock_reactor.crank().await;
    match event {
        MockReactorEvent::MetaBlockAnnouncement(MetaBlockAnnouncement(mut meta_block)) => {
            assert_eq!(meta_block.hash(), *block.hash());
            assert!(meta_block
                .mut_state()
                .register_as_sent_to_transaction_buffer()
                .was_updated());
        }
        other => panic!("Unexpected event: {:?}", other),
    }
    assert_matches!(
        block_synchronizer.historical_progress(),
        BlockSynchronizerProgress::Synced(block_hash, _, _) if block_hash == *block.hash()
    );
}
#[test]
fn builders_are_purged_when_requested() {
    // Purging one side (forward / historical) must leave the other intact,
    // and a full purge clears both builders.
    let mut rng = TestRng::new();
    let env = TestEnv::random(&mut rng);
    let block = env.block();
    let validator_matrix = env.gen_validator_matrix();
    let mut synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    // Produces a fresh random block hash for historical registrations.
    let random_hash = |rng: &mut TestRng| *TestBlockBuilder::new().build(rng).hash();

    // Register one forward and one historical builder.
    assert!(synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(synchronizer.register_block_by_hash(random_hash(&mut rng), true));
    assert!(synchronizer.forward.is_some());
    assert!(synchronizer.historical.is_some());

    // Purging historical keeps the forward builder alive.
    synchronizer.purge_historical();
    assert!(synchronizer.forward.is_some());
    assert!(synchronizer.historical.is_none());

    // Re-create historical, then purge only the forward side.
    assert!(synchronizer.register_block_by_hash(random_hash(&mut rng), true));
    assert!(synchronizer.forward.is_some());
    assert!(synchronizer.historical.is_some());
    synchronizer.purge_forward();
    assert!(synchronizer.forward.is_none());
    assert!(synchronizer.historical.is_some());

    // Re-create forward, then purge everything at once.
    assert!(synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(synchronizer.forward.is_some());
    assert!(synchronizer.historical.is_some());
    synchronizer.purge();
    assert!(synchronizer.forward.is_none());
    assert!(synchronizer.historical.is_none());
}
#[tokio::test]
async fn synchronizer_halts_if_block_cannot_be_made_executable() {
    // If the contract runtime cannot produce a finalized block
    // (`MadeFinalizedBlock` with `result: None`), the forward builder must
    // enter the `Failed` state rather than loop forever.
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(&mut rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let fwd_builder = block_synchronizer
        .forward
        .as_mut()
        .expect("Forward builder should have been initialized");
    assert!(fwd_builder
        .register_block_header(block.clone_header(), None)
        .is_ok());
    fwd_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Drive the builder to strict finality: weak signatures, body, the rest.
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(fwd_builder.register_block(block.clone(), None).is_ok());
    register_multiple_signatures(
        fwd_builder,
        block,
        validators_secret_keys
            .iter()
            .skip(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    // At strict finality the synchronizer requests creation of the
    // executable block.
    let events = need_next(&mut rng, &mock_reactor, &mut block_synchronizer, 1).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::MakeBlockExecutableRequest(MakeBlockExecutableRequest {
                block_hash,
                ..
            }) if block_hash == *block.hash()
        );
    }
    // Deliver the failure response: no finalized block could be produced.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        &mut rng,
        Event::MadeFinalizedBlock {
            block_hash: *block.hash(),
            result: None,
        },
    );
    // No further effects — the sync halts.
    assert_eq!(effects.len(), 0);
    let fwd_builder = block_synchronizer
        .forward
        .as_ref()
        .expect("Forward builder should have been initialized");
    assert_matches!(
        fwd_builder.block_acquisition_state(),
        BlockAcquisitionState::Failed(block_hash, _) if block_hash == block.hash()
    );
    // Progress still reports `Syncing`; only the builder state records the
    // failure.
    assert_matches!(
        block_synchronizer.forward_progress(),
        BlockSynchronizerProgress::Syncing(block_hash, _, _) if block_hash == *block.hash()
    );
}
/// Convenience accessor for the historical builder's current acquisition
/// state.
///
/// Panics if no historical sync is in progress.
fn historical_state(block_synchronizer: &BlockSynchronizer) -> &BlockAcquisitionState {
    let historical_builder = block_synchronizer.historical.as_ref().unwrap();
    historical_builder.block_acquisition_state()
}
#[tokio::test]
async fn historical_sync_skips_exec_results_and_deploys_if_block_empty() {
    // For an empty block, a historical sync should go straight from global
    // state acquisition to finality-signature fetching, skipping execution
    // results and deploy fetching entirely.
    let rng = &mut TestRng::new();
    let mock_reactor = MockReactor::new();
    let test_env = TestEnv::random(rng);
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(rng, validator_matrix, Default::default())
            .with_legacy_finality(LegacyRequiredFinality::Strict);
    // Historical sync only — no forward builder is created.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE));
    assert!(block_synchronizer.forward.is_none());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    let historical_builder = block_synchronizer
        .historical
        .as_mut()
        .expect("Historical builder should have been initialized");
    historical_builder
        .register_block_header(block.clone_header(), None)
        .expect("header registration works");
    historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Weak finality plus the block body.
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(historical_builder
        .register_block(block.clone(), None)
        .is_ok());
    // With the body in hand, the next action is syncing global state under
    // the block's state root hash.
    let events = need_next(rng, &mock_reactor, &mut block_synchronizer, 1).await;
    let request = match events.try_one() {
        Some(MockReactorEvent::SyncGlobalStateRequest(
            request @ SyncGlobalStateRequest {
                block_hash,
                state_root_hash,
                ..
            },
        )) if block_hash == *block.hash() && &state_root_hash == block.state_root_hash() => request,
        _ => panic!("there should be a unique event of type SyncGlobalStateRequest"),
    };
    // Forward the request into the global state synchronizer sub-component.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynchronizer(global_state_synchronizer::Event::Request(request)),
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveBlock { .. }
    );
    let events = effects
        .try_one()
        .expect("there should be only one effect")
        .await;
    assert_matches!(
        events.try_one(),
        Some(Event::GlobalStateSynchronizer(
            GlobalStateSynchronizerEvent::GetPeers(_)
        ))
    );
    // Simulate global state sync completing successfully.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynced {
            block_hash: *block.hash(),
            result: Ok(GlobalStateSynchronizerResponse::new(
                global_state_synchronizer::RootHash::new(*block.state_root_hash()),
                vec![],
            )),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    // Empty block: all follow-up requests are finality-signature fetches —
    // no execution-results or deploy fetches are issued.
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(event, MockReactorEvent::FinalitySignatureFetcherRequest(..));
    }
}
#[tokio::test]
// Historical sync (fetching execution state) of a contemporary (V2) block:
// drives the acquisition state machine from HaveBlock through global state,
// execution-results checksum, execution results, approvals hashes and
// transactions, ending with finality-signature fetch requests.
async fn historical_sync_no_legacy_block() {
    let rng = &mut TestRng::new();
    let mock_reactor = MockReactor::new();
    let txn = Transaction::random(rng);
    // Block under sync: a V2 block holding a single random transaction.
    let test_env = TestEnv::random(rng).with_block(
        TestBlockBuilder::new()
            .era(1)
            .transactions(iter::once(&txn))
            .build(rng)
            .into(),
    );
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(rng, validator_matrix, Default::default())
            .with_legacy_finality(LegacyRequiredFinality::Strict);
    // Registering with SHOULD_FETCH_EXECUTION_STATE creates a historical
    // builder and no forward builder.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE));
    assert!(block_synchronizer.forward.is_none());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    // Pre-drive the historical builder to HaveBlock: header, validator
    // weights, weak-finality signatures, then the block body itself.
    let historical_builder = block_synchronizer
        .historical
        .as_mut()
        .expect("Historical builder should have been initialized");
    historical_builder
        .register_block_header(block.clone_header(), None)
        .expect("header registration works");
    historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(historical_builder
        .register_block(block.clone(), None)
        .is_ok());
    // First need-next should ask the global state synchronizer to pull the
    // block's global state for the expected block hash / state root.
    let events = need_next(rng, &mock_reactor, &mut block_synchronizer, 1).await;
    let request = match events.try_one() {
        Some(MockReactorEvent::SyncGlobalStateRequest(
            request @ SyncGlobalStateRequest {
                block_hash,
                state_root_hash,
                ..
            },
        )) if block_hash == *block.hash() && &state_root_hash == block.state_root_hash() => request,
        _ => panic!("there should be a unique event of type SyncGlobalStateRequest"),
    };
    // Forwarding the request to the inner global state synchronizer should
    // make it go look for peers; the builder stays in HaveBlock meanwhile.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynchronizer(global_state_synchronizer::Event::Request(request)),
    );
    let events = effects.one().await;
    assert_matches!(
        events.try_one(),
        Some(Event::GlobalStateSynchronizer(
            GlobalStateSynchronizerEvent::GetPeers(_)
        ))
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveBlock { .. }
    );
    // Simulate successful global state sync -> HaveGlobalState.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynced {
            block_hash: *block.hash(),
            result: Ok(GlobalStateSynchronizerResponse::new(
                global_state_synchronizer::RootHash::new(*block.state_root_hash()),
                vec![],
            )),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    // The synchronizer should now ask the contract runtime for the
    // execution-results checksum; answer the responder inline.
    let events = mock_reactor.process_effects(effects).await;
    match events.try_one() {
        Some(MockReactorEvent::ContractRuntimeRequest(
            ContractRuntimeRequest::GetExecutionResultsChecksum {
                state_root_hash,
                responder,
            },
        )) => responder.respond(ExecutionResultsChecksumResult::Success { checksum: state_root_hash }).await,
        other => panic!("Event should be of type `ContractRuntimeRequest(ContractRuntimeRequest::GetExecutionResultsChecksum) but it is {:?}", other),
    }
    // Feed a (checkable) checksum back in; this should trigger fetches of
    // the execution results themselves.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GotExecutionResultsChecksum {
            block_hash: *block.hash(),
            result: ExecutionResultsChecksumResult::Success {
                checksum: Digest::SENTINEL_NONE,
            },
        },
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockExecutionResultsOrChunkFetcherRequest(FetcherRequest { .. })
        );
    }
    // Deliver mock execution results: first effect stores them, the rest
    // ask for approvals hashes. State stays HaveGlobalState until storage
    // confirms the write.
    let execution_results = BlockExecutionResultsOrChunk::new_mock_value(rng, *block.hash());
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsFetched {
            block_hash: *block.hash(),
            result: Ok(FetchedData::from_storage(Box::new(execution_results))),
        },
    );
    let mut events = mock_reactor.process_effects(effects).await;
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    assert_matches!(
        events.remove(0),
        MockReactorEvent::StorageRequest(StorageRequest::PutExecutionResults { .. })
    );
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest { .. })
        );
    }
    // Storage confirmation -> HaveAllExecutionResults with a checkable
    // checksum (this is a non-legacy block).
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsStored(*block.hash()),
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveAllExecutionResults(_, _, _, checksum) if checksum.is_checkable()
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest { .. })
        );
    }
    // Approvals hashes arrive -> HaveApprovalsHashes; synchronizer now
    // fetches the block's transactions (not legacy deploys).
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ApprovalsHashesFetched(Ok(FetchedData::from_storage(Box::new(
            ApprovalsHashes::new(
                *block.hash(),
                vec![txn.compute_approvals_hash().unwrap()],
                dummy_merkle_proof(),
            ),
        )))),
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveApprovalsHashes(_, _, _)
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::TransactionFetcherRequest(FetcherRequest { .. })
        );
    }
    // The single transaction arrives (Either::Right = Transaction, not a
    // legacy deploy) -> HaveAllDeploys; remaining work is fetching the
    // finality signatures needed for strict finality.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::DeployFetched {
            block_hash: *block.hash(),
            result: Either::Right(Ok(FetchedData::from_storage(Box::new(txn)))),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveAllDeploys(_, _)
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(event, MockReactorEvent::FinalitySignatureFetcherRequest(_));
    }
}
#[tokio::test]
// Historical sync of a legacy (V1) block under `LegacyRequiredFinality::Strict`:
// the execution-results checksum registry does not exist for legacy blocks
// (RegistryNotFound), so the checksum is uncheckable, approvals hashes are
// skipped in favor of legacy-deploy fetches, and after all deploys arrive the
// synchronizer still needs strict finality signatures.
async fn historical_sync_legacy_block_strict_finality() {
    let rng = &mut TestRng::new();
    let mock_reactor = MockReactor::new();
    let deploy = Deploy::random(rng);
    // Block under sync: a legacy V1 block holding a single random deploy.
    // (`iter::once(&deploy)` only borrows temporarily, so no clone is needed;
    // `deploy` is moved into the synchronizer at the end of the test.)
    let test_env = TestEnv::random(rng).with_block(
        TestBlockV1Builder::new()
            .era(1)
            .deploys(iter::once(&deploy))
            .build(rng)
            .into(),
    );
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(rng, validator_matrix, Default::default())
            .with_legacy_finality(LegacyRequiredFinality::Strict);
    // Registering with SHOULD_FETCH_EXECUTION_STATE creates a historical
    // builder and no forward builder.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE));
    assert!(block_synchronizer.forward.is_none());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    // Pre-drive the historical builder to HaveBlock: header, validator
    // weights, weak-finality signatures, then the block body itself.
    let historical_builder = block_synchronizer
        .historical
        .as_mut()
        .expect("Historical builder should have been initialized");
    historical_builder
        .register_block_header(block.clone_header(), None)
        .expect("header registration works");
    historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(historical_builder
        .register_block(block.clone(), None)
        .is_ok());
    // First need-next should ask the global state synchronizer to pull the
    // block's global state for the expected block hash / state root.
    let events = need_next(rng, &mock_reactor, &mut block_synchronizer, 1).await;
    let request = match events.try_one() {
        Some(MockReactorEvent::SyncGlobalStateRequest(
            request @ SyncGlobalStateRequest {
                block_hash,
                state_root_hash,
                ..
            },
        )) if block_hash == *block.hash() && &state_root_hash == block.state_root_hash() => request,
        _ => panic!("there should be a unique event of type SyncGlobalStateRequest"),
    };
    // Forwarding the request to the inner global state synchronizer should
    // make it go look for peers; the builder stays in HaveBlock meanwhile.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynchronizer(global_state_synchronizer::Event::Request(request)),
    );
    let events = effects.one().await;
    assert_matches!(
        events.try_one(),
        Some(Event::GlobalStateSynchronizer(
            GlobalStateSynchronizerEvent::GetPeers(_)
        ))
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveBlock { .. }
    );
    // Simulate successful global state sync -> HaveGlobalState.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynced {
            block_hash: *block.hash(),
            result: Ok(GlobalStateSynchronizerResponse::new(
                global_state_synchronizer::RootHash::new(*block.state_root_hash()),
                vec![],
            )),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    // The synchronizer should now ask the contract runtime for the
    // execution-results checksum; answer the responder inline.
    let events = mock_reactor.process_effects(effects).await;
    match events.try_one() {
        Some(MockReactorEvent::ContractRuntimeRequest(
            ContractRuntimeRequest::GetExecutionResultsChecksum {
                state_root_hash,
                responder,
            },
        )) => responder.respond(ExecutionResultsChecksumResult::Success { checksum: state_root_hash }).await,
        other => panic!("Event should be of type `ContractRuntimeRequest(ContractRuntimeRequest::GetExecutionResultsChecksum) but it is {:?}", other),
    }
    // Legacy blocks predate the checksum registry, so the lookup reports
    // RegistryNotFound; execution results are fetched without a checksum.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GotExecutionResultsChecksum {
            block_hash: *block.hash(),
            result: ExecutionResultsChecksumResult::RegistryNotFound,
        },
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockExecutionResultsOrChunkFetcherRequest(FetcherRequest { .. })
        );
    }
    // Deliver mock execution results: first effect stores them, the rest
    // ask for approvals hashes. State stays HaveGlobalState until storage
    // confirms the write.
    let execution_results = BlockExecutionResultsOrChunk::new_mock_value(rng, *block.hash());
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsFetched {
            block_hash: *block.hash(),
            result: Ok(FetchedData::from_storage(Box::new(execution_results))),
        },
    );
    let mut events = mock_reactor.process_effects(effects).await;
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    assert_matches!(
        events.remove(0),
        MockReactorEvent::StorageRequest(StorageRequest::PutExecutionResults { .. })
    );
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest { .. })
        );
    }
    // Storage confirmation -> HaveAllExecutionResults, but with an
    // uncheckable checksum because the registry was missing.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsStored(*block.hash()),
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveAllExecutionResults(_, _, _, checksum)
            if !checksum.is_checkable()
    );
    // Legacy path: no approvals hashes; deploys are fetched via the
    // legacy-deploy fetcher instead.
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::LegacyDeployFetcherRequest(FetcherRequest { .. })
        );
    }
    // The single legacy deploy arrives (Either::Left = LegacyDeploy) ->
    // HaveAllDeploys; strict finality still requires more signatures.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::DeployFetched {
            block_hash: *block.hash(),
            result: Either::Left(Ok(FetchedData::from_storage(Box::new(deploy.into())))),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveAllDeploys(_, _)
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(event, MockReactorEvent::FinalitySignatureFetcherRequest(_));
    }
}
#[tokio::test]
// Historical sync of a legacy (V1) block under `LegacyRequiredFinality::Weak`:
// the weak-finality signatures registered up front already satisfy the legacy
// requirement, so once all deploys arrive the builder jumps straight to
// HaveStrictFinalitySignatures and the block is marked complete.
async fn historical_sync_legacy_block_weak_finality() {
    let rng = &mut TestRng::new();
    let mock_reactor = MockReactor::new();
    let deploy = Deploy::random(rng);
    // Block under sync: a legacy V1 block holding a single random deploy.
    // (`iter::once(&deploy)` only borrows temporarily, so no clone is needed;
    // `deploy` is moved into the synchronizer at the end of the test.)
    let test_env = TestEnv::random(rng).with_block(
        TestBlockV1Builder::new()
            .era(1)
            .deploys(iter::once(&deploy))
            .build(rng)
            .into(),
    );
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(rng, validator_matrix, Default::default())
            .with_legacy_finality(LegacyRequiredFinality::Weak);
    // Registering with SHOULD_FETCH_EXECUTION_STATE creates a historical
    // builder and no forward builder.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE));
    assert!(block_synchronizer.forward.is_none());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    // Pre-drive the historical builder to HaveBlock: header, validator
    // weights, weak-finality signatures, then the block body itself.
    let historical_builder = block_synchronizer
        .historical
        .as_mut()
        .expect("Historical builder should have been initialized");
    historical_builder
        .register_block_header(block.clone_header(), None)
        .expect("header registration works");
    historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(historical_builder
        .register_block(block.clone(), None)
        .is_ok());
    // First need-next should ask the global state synchronizer to pull the
    // block's global state for the expected block hash / state root.
    let events = need_next(rng, &mock_reactor, &mut block_synchronizer, 1).await;
    let request = match events.try_one() {
        Some(MockReactorEvent::SyncGlobalStateRequest(
            request @ SyncGlobalStateRequest {
                block_hash,
                state_root_hash,
                ..
            },
        )) if block_hash == *block.hash() && &state_root_hash == block.state_root_hash() => request,
        _ => panic!("there should be a unique event of type SyncGlobalStateRequest"),
    };
    // Forwarding the request to the inner global state synchronizer should
    // make it go look for peers; the builder stays in HaveBlock meanwhile.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynchronizer(global_state_synchronizer::Event::Request(request)),
    );
    let events = effects.one().await;
    assert_matches!(
        events.try_one(),
        Some(Event::GlobalStateSynchronizer(
            GlobalStateSynchronizerEvent::GetPeers(_)
        ))
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveBlock { .. }
    );
    // Simulate successful global state sync -> HaveGlobalState.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynced {
            block_hash: *block.hash(),
            result: Ok(GlobalStateSynchronizerResponse::new(
                global_state_synchronizer::RootHash::new(*block.state_root_hash()),
                vec![],
            )),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    // The synchronizer should now ask the contract runtime for the
    // execution-results checksum; answer the responder inline.
    let events = mock_reactor.process_effects(effects).await;
    match events.try_one() {
        Some(MockReactorEvent::ContractRuntimeRequest(
            ContractRuntimeRequest::GetExecutionResultsChecksum {
                state_root_hash,
                responder,
            },
        )) => responder.respond(ExecutionResultsChecksumResult::Success { checksum: state_root_hash }).await,
        other => panic!("Event should be of type `ContractRuntimeRequest(ContractRuntimeRequest::GetExecutionResultsChecksum) but it is {:?}", other),
    }
    // Legacy blocks predate the checksum registry, so the lookup reports
    // RegistryNotFound; execution results are fetched without a checksum.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GotExecutionResultsChecksum {
            block_hash: *block.hash(),
            result: ExecutionResultsChecksumResult::RegistryNotFound,
        },
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockExecutionResultsOrChunkFetcherRequest(FetcherRequest { .. })
        );
    }
    // Deliver mock execution results: first effect stores them, the rest
    // ask for approvals hashes. State stays HaveGlobalState until storage
    // confirms the write.
    let execution_results = BlockExecutionResultsOrChunk::new_mock_value(rng, *block.hash());
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsFetched {
            block_hash: *block.hash(),
            result: Ok(FetchedData::from_storage(Box::new(execution_results))),
        },
    );
    let mut events = mock_reactor.process_effects(effects).await;
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    assert_matches!(
        events.remove(0),
        MockReactorEvent::StorageRequest(StorageRequest::PutExecutionResults { .. })
    );
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest { .. })
        );
    }
    // Storage confirmation -> HaveAllExecutionResults, but with an
    // uncheckable checksum because the registry was missing.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsStored(*block.hash()),
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveAllExecutionResults(_, _, _, checksum)
            if !checksum.is_checkable()
    );
    // Legacy path: no approvals hashes; deploys are fetched via the
    // legacy-deploy fetcher instead.
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::LegacyDeployFetcherRequest(FetcherRequest { .. })
        );
    }
    // The single legacy deploy arrives; the weak-finality signatures already
    // registered satisfy `LegacyRequiredFinality::Weak`, so the builder goes
    // straight to HaveStrictFinalitySignatures.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::DeployFetched {
            block_hash: *block.hash(),
            result: Either::Left(Ok(FetchedData::from_storage(Box::new(deploy.into())))),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveStrictFinalitySignatures(_, _)
    );
    // The follow-up NeedNext should only mark the block complete.
    let events = effects.one().await;
    let event = match events.try_one() {
        Some(event @ Event::Request(BlockSynchronizerRequest::NeedNext)) => event,
        _ => panic!("Expected a NeedNext request here"),
    };
    let effects = block_synchronizer.handle_event(mock_reactor.effect_builder(), rng, event);
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveStrictFinalitySignatures(_, _)
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(event, MockReactorEvent::MarkBlockCompletedRequest(_));
    }
}
#[tokio::test]
// Historical sync of a legacy (V1) block under `LegacyRequiredFinality::Any`:
// a single registered signature is enough for the legacy requirement, so once
// all deploys arrive the builder jumps straight to
// HaveStrictFinalitySignatures and the block is marked complete.
async fn historical_sync_legacy_block_any_finality() {
    let rng = &mut TestRng::new();
    let mock_reactor = MockReactor::new();
    let deploy = Deploy::random(rng);
    // Block under sync: a legacy V1 block holding a single random deploy.
    // (`iter::once(&deploy)` only borrows temporarily, so no clone is needed;
    // `deploy` is moved into the synchronizer at the end of the test.)
    let test_env = TestEnv::random(rng).with_block(
        TestBlockV1Builder::new()
            .era(1)
            .deploys(iter::once(&deploy))
            .build(rng)
            .into(),
    );
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(rng, validator_matrix, Default::default())
            .with_legacy_finality(LegacyRequiredFinality::Any);
    // Registering with SHOULD_FETCH_EXECUTION_STATE creates a historical
    // builder and no forward builder.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE));
    assert!(block_synchronizer.forward.is_none());
    block_synchronizer.register_peers(*block.hash(), peers.clone());
    // Pre-drive the historical builder to HaveBlock: header, validator
    // weights, a single signature (enough for `Any`), then the block body.
    let historical_builder = block_synchronizer
        .historical
        .as_mut()
        .expect("Historical builder should have been initialized");
    historical_builder
        .register_block_header(block.clone_header(), None)
        .expect("header registration works");
    historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys.iter().take(1),
        chain_name_hash,
    );
    assert!(historical_builder
        .register_block(block.clone(), None)
        .is_ok());
    // First need-next should ask the global state synchronizer to pull the
    // block's global state for the expected block hash / state root.
    let events = need_next(rng, &mock_reactor, &mut block_synchronizer, 1).await;
    let request = match events.try_one() {
        Some(MockReactorEvent::SyncGlobalStateRequest(
            request @ SyncGlobalStateRequest {
                block_hash,
                state_root_hash,
                ..
            },
        )) if block_hash == *block.hash() && &state_root_hash == block.state_root_hash() => request,
        _ => panic!("there should be a unique event of type SyncGlobalStateRequest"),
    };
    // Forwarding the request to the inner global state synchronizer should
    // make it go look for peers; the builder stays in HaveBlock meanwhile.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynchronizer(global_state_synchronizer::Event::Request(request)),
    );
    let events = effects.one().await;
    assert_matches!(
        events.try_one(),
        Some(Event::GlobalStateSynchronizer(
            GlobalStateSynchronizerEvent::GetPeers(_)
        ))
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveBlock { .. }
    );
    // Simulate successful global state sync -> HaveGlobalState.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynced {
            block_hash: *block.hash(),
            result: Ok(GlobalStateSynchronizerResponse::new(
                global_state_synchronizer::RootHash::new(*block.state_root_hash()),
                vec![],
            )),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    // The synchronizer should now ask the contract runtime for the
    // execution-results checksum; answer the responder inline.
    let events = mock_reactor.process_effects(effects).await;
    match events.try_one() {
        Some(MockReactorEvent::ContractRuntimeRequest(
            ContractRuntimeRequest::GetExecutionResultsChecksum {
                state_root_hash,
                responder,
            },
        )) => responder.respond(ExecutionResultsChecksumResult::Success { checksum: state_root_hash }).await,
        other => panic!("Event should be of type `ContractRuntimeRequest(ContractRuntimeRequest::GetExecutionResultsChecksum) but it is {:?}", other),
    }
    // Legacy blocks predate the checksum registry, so the lookup reports
    // RegistryNotFound; execution results are fetched without a checksum.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GotExecutionResultsChecksum {
            block_hash: *block.hash(),
            result: ExecutionResultsChecksumResult::RegistryNotFound,
        },
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::BlockExecutionResultsOrChunkFetcherRequest(FetcherRequest { .. })
        );
    }
    // Deliver mock execution results: first effect stores them, the rest
    // ask for approvals hashes. State stays HaveGlobalState until storage
    // confirms the write.
    let execution_results = BlockExecutionResultsOrChunk::new_mock_value(rng, *block.hash());
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsFetched {
            block_hash: *block.hash(),
            result: Ok(FetchedData::from_storage(Box::new(execution_results))),
        },
    );
    let mut events = mock_reactor.process_effects(effects).await;
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    assert_matches!(
        events.remove(0),
        MockReactorEvent::StorageRequest(StorageRequest::PutExecutionResults { .. })
    );
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest { .. })
        );
    }
    // Storage confirmation -> HaveAllExecutionResults, but with an
    // uncheckable checksum because the registry was missing.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsStored(*block.hash()),
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveAllExecutionResults(_, _, _, checksum)
            if !checksum.is_checkable()
    );
    // Legacy path: no approvals hashes; deploys are fetched via the
    // legacy-deploy fetcher instead.
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(
            event,
            MockReactorEvent::LegacyDeployFetcherRequest(FetcherRequest { .. })
        );
    }
    // The single legacy deploy arrives; the lone registered signature already
    // satisfies `LegacyRequiredFinality::Any`, so the builder goes straight
    // to HaveStrictFinalitySignatures.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::DeployFetched {
            block_hash: *block.hash(),
            result: Either::Left(Ok(FetchedData::from_storage(Box::new(deploy.into())))),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveStrictFinalitySignatures(_, _)
    );
    // The follow-up NeedNext should only mark the block complete.
    let events = effects.one().await;
    let event = match events.try_one() {
        Some(event @ Event::Request(BlockSynchronizerRequest::NeedNext)) => event,
        _ => panic!("Expected a NeedNext request here"),
    };
    let effects = block_synchronizer.handle_event(mock_reactor.effect_builder(), rng, event);
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveStrictFinalitySignatures(_, _)
    );
    let events = mock_reactor.process_effects(effects).await;
    for event in events {
        assert_matches!(event, MockReactorEvent::MarkBlockCompletedRequest(_));
    }
}
#[tokio::test]
// Forward sync: the response latch must only be decremented by the *first*
// response for a given need-next round. For every acquisition stage, a first
// (useful) response advances the state, and a duplicate/stale second response
// must yield zero effects and leave the latch count untouched.
async fn fwd_sync_latch_should_not_decrement_for_old_responses() {
    let mut rng = TestRng::new();
    let mock_reactor = MockReactor::new();
    let txn = Transaction::random(&mut rng);
    // Block under sync: a V2 block holding a single random transaction.
    let test_env = TestEnv::random(&mut rng).with_block(
        TestBlockBuilder::new()
            .era(1)
            .transactions(iter::once(&txn))
            .build(&mut rng)
            .into(),
    );
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(&mut rng, validator_matrix, Config::default());
    // `false` = forward sync (no execution state), so a forward builder
    // (not a historical one) is created.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), false));
    assert!(block_synchronizer.forward.is_some());
    // Stage 1: NeedNext asks the accumulator for peers; latch engages.
    {
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::Request(BlockSynchronizerRequest::NeedNext),
        );
        assert_eq!(effects.len(), 1);
        let events = mock_reactor.process_effects(effects).await;
        assert_matches!(
            events[0],
            MockReactorEvent::BlockAccumulatorRequest(BlockAccumulatorRequest::GetPeersForBlock {
                block_hash,
                ..
            }) if block_hash == *block.hash()
        );
        latch_inner_check(
            block_synchronizer.forward.as_ref(),
            true,
            "should be latched waiting for peers",
        );
    }
    // Stage 2: peers arrive -> header fetches go out to MAX_SIMULTANEOUS_PEERS
    // peers; a second (stale) header response must not change the latch.
    {
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::AccumulatedPeers(*block.hash(), Some(peers.clone())),
        );
        let events = mock_reactor.process_effects(effects).await;
        // Record which peers were asked so stale responses can be replayed
        // from exactly those peers below.
        let mut peers_asked = Vec::new();
        for event in events {
            assert_matches!(
                event,
                MockReactorEvent::BlockHeaderFetcherRequest(FetcherRequest {
                    id,
                    peer,
                    ..
                }) if peers.contains(&peer) && id == *block.hash() => {
                    peers_asked.push(peer);
                }
            );
        }
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            MAX_SIMULTANEOUS_PEERS,
            format!(
                "Latch count should be {} since no block header was received.",
                MAX_SIMULTANEOUS_PEERS
            )
            .as_str(),
        );
        // First header response advances to signature fetching; the latch is
        // re-armed with one count per outgoing signature request.
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::BlockHeaderFetched(Ok(FetchedData::FromPeer {
                item: Box::new(block.clone_header()),
                peer: peers_asked[0],
            })),
        );
        let events = mock_reactor.process_effects(effects).await;
        let expected_latch_count = events.len() as u8;
        let mut sigs_requested = Vec::new();
        for event in events {
            assert_matches!(
                event,
                MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest {
                    id,
                    peer,
                    ..
                }) => {
                    assert_eq!(id.block_hash(), block.hash());
                    assert_eq!(id.era_id(), block.era_id());
                    sigs_requested.push((peer, id.public_key().clone()));
                }
            );
        }
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            expected_latch_count,
            format!(
                "Latch count should be {} since no finality sigs were received.",
                expected_latch_count
            )
            .as_str(),
        );
        // Stale duplicate header from the second asked peer: no effects, no
        // latch change.
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::BlockHeaderFetched(Ok(FetchedData::FromPeer {
                item: Box::new(block.clone_header()),
                peer: peers_asked[1],
            })),
        );
        assert_eq!(effects.len(), 0);
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            expected_latch_count,
            format!(
                "Latch count should be {} since no finality sigs were received.",
                expected_latch_count
            )
            .as_str(),
        );
    }
    // Stage 3: weak-finality signatures arrive -> block-body fetches go out;
    // extra signatures beyond weak finality produce no effects.
    {
        let mut generated_effects = Effects::new();
        for secret_key in validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len()))
        {
            let signature = FinalitySignatureV2::create(
                *block.hash(),
                block.height(),
                block.era_id(),
                chain_name_hash,
                secret_key.as_ref(),
            );
            assert!(signature.is_verified().is_ok());
            let effects = block_synchronizer.handle_event(
                mock_reactor.effect_builder(),
                &mut rng,
                Event::FinalitySignatureFetched(Ok(FetchedData::FromPeer {
                    item: Box::new(signature.into()),
                    peer: peers[2],
                })),
            );
            generated_effects.extend(effects);
        }
        // Only the final batch of effects (produced once weak finality was
        // reached) should be block fetches.
        let events = mock_reactor
            .process_effects(
                generated_effects
                    .into_iter()
                    .rev()
                    .take(MAX_SIMULTANEOUS_PEERS as usize),
            )
            .await;
        for event in events {
            assert_matches!(
                event,
                MockReactorEvent::BlockFetcherRequest(FetcherRequest {
                    id,
                    peer,
                    ..
                }) => {
                    assert!(peers.contains(&peer));
                    assert_eq!(id, *block.hash());
                }
            );
        }
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            MAX_SIMULTANEOUS_PEERS,
            format!(
                "Latch count should be {} since no block was received.",
                MAX_SIMULTANEOUS_PEERS
            )
            .as_str(),
        );
        // Additional (now-unsolicited) signatures: no effects, latch stays.
        let mut generated_effects = Effects::new();
        for secret_key in validators_secret_keys
            .iter()
            .skip(weak_finality_threshold(validators_secret_keys.len()))
            .take(2)
        {
            let signature = FinalitySignatureV2::create(
                *block.hash(),
                block.height(),
                block.era_id(),
                chain_name_hash,
                secret_key.as_ref(),
            );
            assert!(signature.is_verified().is_ok());
            let effects = block_synchronizer.handle_event(
                mock_reactor.effect_builder(),
                &mut rng,
                Event::FinalitySignatureFetched(Ok(FetchedData::FromPeer {
                    item: Box::new(signature.into()),
                    peer: peers[2],
                })),
            );
            generated_effects.extend(effects);
        }
        assert_eq!(generated_effects.len(), 0);
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            MAX_SIMULTANEOUS_PEERS,
            format!(
                "Latch count should be {} since no block was received.",
                MAX_SIMULTANEOUS_PEERS
            )
            .as_str(),
        );
    }
    // Stage 4: first block body -> approvals-hashes fetches; duplicate block
    // body: no effects, latch unchanged.
    {
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::BlockFetched(Ok(FetchedData::FromPeer {
                item: Box::new(block.clone()),
                peer: peers[0],
            })),
        );
        let events = mock_reactor.process_effects(effects).await;
        for event in events {
            assert_matches!(
                event,
                MockReactorEvent::ApprovalsHashesFetcherRequest(FetcherRequest {
                    id,
                    peer,
                    ..
                }) if peers.contains(&peer) && id == *block.hash()
            );
        }
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            MAX_SIMULTANEOUS_PEERS,
            format!(
                "Latch count should be {} since no approval hashes were received.",
                MAX_SIMULTANEOUS_PEERS
            )
            .as_str(),
        );
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::BlockFetched(Ok(FetchedData::FromPeer {
                item: Box::new(block.clone()),
                peer: peers[1],
            })),
        );
        assert_eq!(effects.len(), 0);
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            MAX_SIMULTANEOUS_PEERS,
            format!(
                "Latch count should be {} since no approval hashes were received.",
                MAX_SIMULTANEOUS_PEERS
            )
            .as_str(),
        );
    }
    // Stage 5: first approvals hashes -> transaction fetches; duplicate
    // approvals hashes: no effects, latch unchanged.
    {
        let approvals_hashes = ApprovalsHashes::new(
            *block.hash(),
            vec![txn.compute_approvals_hash().unwrap()],
            dummy_merkle_proof(),
        );
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::ApprovalsHashesFetched(Ok(FetchedData::FromPeer {
                item: Box::new(approvals_hashes.clone()),
                peer: peers[0],
            })),
        );
        assert_eq!(effects.len(), MAX_SIMULTANEOUS_PEERS as usize);
        for event in mock_reactor.process_effects(effects).await {
            assert_matches!(
                event,
                MockReactorEvent::TransactionFetcherRequest(FetcherRequest {
                    id,
                    peer,
                    ..
                }) => {
                    assert!(peers.contains(&peer));
                    assert_eq!(id, txn.compute_id());
                }
            );
        }
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            MAX_SIMULTANEOUS_PEERS,
            format!(
                "Latch count should be {} since no deploys were received.",
                MAX_SIMULTANEOUS_PEERS
            )
            .as_str(),
        );
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::ApprovalsHashesFetched(Ok(FetchedData::FromPeer {
                item: Box::new(approvals_hashes.clone()),
                peer: peers[1],
            })),
        );
        assert_eq!(effects.len(), 0);
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            MAX_SIMULTANEOUS_PEERS,
            format!(
                "Latch count should be {} since no deploys were received.",
                MAX_SIMULTANEOUS_PEERS
            )
            .as_str(),
        );
    }
    // Stage 6: first transaction -> remaining finality-signature fetches;
    // duplicate transaction: no effects, latch unchanged.
    {
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::DeployFetched {
                block_hash: *block.hash(),
                result: Either::Right(Ok(FetchedData::from_storage(Box::new(txn.clone())))),
            },
        );
        let events = mock_reactor.process_effects(effects).await;
        let expected_latch_count = events.len() as u8;
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            expected_latch_count,
            format!(
                "Latch count should be {} since no new signatures were received.",
                expected_latch_count
            )
            .as_str(),
        );
        for event in events {
            assert_matches!(
                event,
                MockReactorEvent::FinalitySignatureFetcherRequest(FetcherRequest {
                    id,
                    ..
                }) => {
                    assert_eq!(id.block_hash(), block.hash());
                    assert_eq!(id.era_id(), block.era_id());
                }
            );
        }
        let effects = block_synchronizer.handle_event(
            mock_reactor.effect_builder(),
            &mut rng,
            Event::DeployFetched {
                block_hash: *block.hash(),
                result: Either::Right(Ok(FetchedData::from_storage(Box::new(txn.clone())))),
            },
        );
        assert_eq!(effects.len(), 0);
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            expected_latch_count,
            "Latch should not have changed since we did not receive a new signature yet.",
        );
    }
    // Stage 7: signatures up to strict finality -> a single
    // MakeBlockExecutable request; latch holds at 1 until the finalized
    // block comes back.
    {
        let mut generated_effects = Effects::new();
        for secret_key in validators_secret_keys.iter().rev().take(
            strict_finality_threshold(validators_secret_keys.len())
                - weak_finality_threshold(validators_secret_keys.len()),
        ) {
            let signature = FinalitySignatureV2::create(
                *block.hash(),
                block.height(),
                block.era_id(),
                chain_name_hash,
                secret_key.as_ref(),
            );
            assert!(signature.is_verified().is_ok());
            let effects = block_synchronizer.handle_event(
                mock_reactor.effect_builder(),
                &mut rng,
                Event::FinalitySignatureFetched(Ok(FetchedData::FromPeer {
                    item: Box::new(signature.into()),
                    peer: peers[2],
                })),
            );
            generated_effects.extend(effects);
        }
        // Only the very last effect (after strict finality was reached)
        // should be the MakeBlockExecutable request.
        let events = mock_reactor
            .process_effects(generated_effects.into_iter().rev().take(1))
            .await;
        for event in events {
            assert_matches!(
                event,
                MockReactorEvent::MakeBlockExecutableRequest(MakeBlockExecutableRequest {
                    block_hash,
                    ..
                }) if block_hash == *block.hash()
            );
        }
        latch_count_check(
            block_synchronizer.forward.as_ref(),
            1,
            "Latch count should still be 1 since no FinalizedBlock was received.",
        );
    }
}
#[tokio::test]
async fn historical_sync_latch_should_not_decrement_for_old_deploy_fetch_responses() {
let rng = &mut TestRng::new();
let mock_reactor = MockReactor::new();
let transactions: BTreeMap<_, _> = iter::repeat_with(|| {
let txn = Transaction::random(rng);
let hash = txn.hash();
(hash, txn)
})
.take(3)
.collect();
let test_env = TestEnv::random(rng).with_block(
TestBlockBuilder::new()
.era(1)
.transactions(transactions.values())
.build(rng)
.into(),
);
let block = test_env.block();
let block_v2: BlockV2 = block.clone().try_into().unwrap();
let first_txn = transactions
.get(block_v2.all_transactions().next().unwrap())
.unwrap();
let second_txn = transactions
.get(block_v2.all_transactions().nth(1).unwrap())
.unwrap();
let third_txn = transactions
.get(block_v2.all_transactions().nth(2).unwrap())
.unwrap();
let peers = test_env.peers();
let block = test_env.block();
let validator_matrix = test_env.gen_validator_matrix();
let chain_name_hash = validator_matrix.chain_name_hash();
let validators_secret_keys = test_env.validator_keys();
let mut block_synchronizer =
BlockSynchronizer::new_initialized(rng, validator_matrix, Default::default())
.with_legacy_finality(LegacyRequiredFinality::Strict);
assert!(block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE));
block_synchronizer.register_peers(*block.hash(), peers.clone());
let historical_builder = block_synchronizer
.historical
.as_mut()
.expect("Historical builder should have been initialized");
historical_builder
.register_block_header(block.clone_header(), None)
.expect("header registration works");
historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
register_multiple_signatures(
historical_builder,
block,
validators_secret_keys
.iter()
.take(weak_finality_threshold(validators_secret_keys.len())),
chain_name_hash,
);
assert!(historical_builder
.register_block(block.clone(), None)
.is_ok());
let _effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::GlobalStateSynced {
block_hash: *block.hash(),
result: Ok(GlobalStateSynchronizerResponse::new(
global_state_synchronizer::RootHash::new(*block.state_root_hash()),
vec![],
)),
},
);
assert_matches!(
historical_state(&block_synchronizer),
BlockAcquisitionState::HaveGlobalState { .. }
);
let _effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::GotExecutionResultsChecksum {
block_hash: *block.hash(),
result: ExecutionResultsChecksumResult::Success {
checksum: Digest::SENTINEL_NONE,
},
},
);
let execution_results =
BlockExecutionResultsOrChunk::new_mock_value_with_multiple_random_results(
rng,
*block.hash(),
3,
);
let _effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::ExecutionResultsFetched {
block_hash: *block.hash(),
result: Ok(FetchedData::from_storage(Box::new(execution_results))),
},
);
let _effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::ExecutionResultsStored(*block.hash()),
);
assert_matches!(
historical_state(&block_synchronizer),
BlockAcquisitionState::HaveAllExecutionResults(_, _, _, checksum)
if checksum.is_checkable() == true
);
let effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::ApprovalsHashesFetched(Ok(FetchedData::from_storage(Box::new(
ApprovalsHashes::new(
*block.hash(),
vec![
first_txn.compute_approvals_hash().unwrap(),
second_txn.compute_approvals_hash().unwrap(),
third_txn.compute_approvals_hash().unwrap(),
],
dummy_merkle_proof(),
),
)))),
);
assert_matches!(
historical_state(&block_synchronizer),
BlockAcquisitionState::HaveApprovalsHashes(_, _, _)
);
let events = mock_reactor.process_effects(effects).await;
for event in events {
assert_matches!(
event,
MockReactorEvent::TransactionFetcherRequest(FetcherRequest { .. })
);
}
latch_count_check(
block_synchronizer.historical.as_ref(),
MAX_SIMULTANEOUS_PEERS,
format!(
"Latch count should be {} since no deploys were received.",
MAX_SIMULTANEOUS_PEERS
)
.as_str(),
);
let effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::DeployFetched {
block_hash: *block.hash(),
result: Either::Right(Ok(FetchedData::from_storage(Box::new(first_txn.clone())))),
},
);
for event in mock_reactor.process_effects(effects).await {
assert_matches!(
event,
MockReactorEvent::TransactionFetcherRequest(FetcherRequest { .. })
);
}
latch_count_check(
block_synchronizer.historical.as_ref(),
MAX_SIMULTANEOUS_PEERS,
format!(
"Latch count should be {} since the node should ask for the second deploy.",
MAX_SIMULTANEOUS_PEERS
)
.as_str(),
);
let effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::DeployFetched {
block_hash: *block.hash(),
result: Either::Right(Ok(FetchedData::from_storage(Box::new(second_txn.clone())))),
},
);
for event in mock_reactor.process_effects(effects).await {
assert_matches!(
event,
MockReactorEvent::TransactionFetcherRequest(FetcherRequest { .. })
);
}
latch_count_check(
block_synchronizer.historical.as_ref(),
MAX_SIMULTANEOUS_PEERS,
format!(
"Latch count should be {} since the node should ask for the third deploy.",
MAX_SIMULTANEOUS_PEERS
)
.as_str(),
);
for _ in 1..MAX_SIMULTANEOUS_PEERS {
let effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::DeployFetched {
block_hash: *block.hash(),
result: Either::Right(Ok(FetchedData::from_storage(Box::new(first_txn.clone())))),
},
);
assert_eq!(effects.len(), 0);
latch_count_check(
block_synchronizer.historical.as_ref(),
MAX_SIMULTANEOUS_PEERS,
"Shouldn't decrement the latch since this was a late response",
);
}
for _ in 1..MAX_SIMULTANEOUS_PEERS {
let effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::DeployFetched {
block_hash: *block.hash(),
result: Either::Right(Ok(FetchedData::from_storage(Box::new(second_txn.clone())))),
},
);
assert_eq!(effects.len(), 0);
latch_count_check(
block_synchronizer.historical.as_ref(),
MAX_SIMULTANEOUS_PEERS,
"Shouldn't decrement the latch since this was a late response",
);
}
let effects = block_synchronizer.handle_event(
mock_reactor.effect_builder(),
rng,
Event::DeployFetched {
block_hash: *block.hash(),
result: Either::Right(Ok(FetchedData::from_storage(Box::new(third_txn.clone())))),
},
);
assert_matches!(
historical_state(&block_synchronizer),
BlockAcquisitionState::HaveAllDeploys(_, _)
);
let events = mock_reactor.process_effects(effects).await;
for event in events {
assert_matches!(event, MockReactorEvent::FinalitySignatureFetcherRequest(_));
}
}
/// Verifies latch accounting for execution-results fetch responses during a
/// historical sync: a duplicate ("old") response carrying a chunk the builder
/// already holds must NOT decrement the peer latch, while a genuine `Absent`
/// response from a peer must decrement it by one.
#[tokio::test]
async fn historical_sync_latch_should_not_decrement_for_old_execution_results() {
    let rng = &mut TestRng::new();
    let mock_reactor = MockReactor::new();
    // Three random transactions give the block a non-trivial body whose
    // execution results the historical sync will have to fetch.
    let first_txn = Transaction::random(rng);
    let second_txn = Transaction::random(rng);
    let third_txn = Transaction::random(rng);
    let test_env = TestEnv::random(rng).with_block(
        TestBlockBuilder::new()
            .era(1)
            .transactions([first_txn, second_txn, third_txn].iter())
            .build(rng)
            .into(),
    );
    let peers = test_env.peers();
    let block = test_env.block();
    let validator_matrix = test_env.gen_validator_matrix();
    let chain_name_hash = validator_matrix.chain_name_hash();
    let validators_secret_keys = test_env.validator_keys();
    // Historical (backward) sync with strict legacy finality requirements.
    let mut block_synchronizer =
        BlockSynchronizer::new_initialized(rng, validator_matrix, Default::default())
            .with_legacy_finality(LegacyRequiredFinality::Strict);

    // Register the block for historical sync, including global state /
    // execution results, and supply the peer set to fetch from.
    assert!(block_synchronizer.register_block_by_hash(*block.hash(), SHOULD_FETCH_EXECUTION_STATE));
    block_synchronizer.register_peers(*block.hash(), peers.clone());

    let historical_builder = block_synchronizer
        .historical
        .as_mut()
        .expect("Historical builder should have been initialized");
    historical_builder
        .register_block_header(block.clone_header(), None)
        .expect("header registration works");
    historical_builder.register_era_validator_weights(&block_synchronizer.validator_matrix);
    // Register just enough signatures to reach weak finality, then the block
    // itself, so acquisition can proceed to the global-state stage.
    register_multiple_signatures(
        historical_builder,
        block,
        validators_secret_keys
            .iter()
            .take(weak_finality_threshold(validators_secret_keys.len())),
        chain_name_hash,
    );
    assert!(historical_builder
        .register_block(block.clone(), None)
        .is_ok());

    // Simulate successful global-state sync for the block's state root.
    let _effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GlobalStateSynced {
            block_hash: *block.hash(),
            result: Ok(GlobalStateSynchronizerResponse::new(
                global_state_synchronizer::RootHash::new(*block.state_root_hash()),
                vec![],
            )),
        },
    );
    assert_matches!(
        historical_state(&block_synchronizer),
        BlockAcquisitionState::HaveGlobalState { .. }
    );
    latch_count_check(
        block_synchronizer.historical.as_ref(),
        1,
        "Latch count should be 1 since we're waiting for execution results checksum.",
    );

    // Build a mock execution-results value large enough that it is represented
    // as a chunk-with-proof rather than a single complete value; take the
    // Merkle root of the chunk proof as the checksum the synchronizer expects.
    let execution_results =
        BlockExecutionResultsOrChunk::new_mock_value_with_multiple_random_results(
            rng,
            *block.hash(),
            // Large result count to force chunking (ValueOrChunk::ChunkWithProof).
            100000,
        );
    let checksum = assert_matches!(
        execution_results.value(),
        ValueOrChunk::ChunkWithProof(chunk) => chunk.proof().root_hash()
    );

    // Delivering the checksum should trigger execution-results fetches to
    // MAX_SIMULTANEOUS_PEERS peers, latching once per outstanding request.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::GotExecutionResultsChecksum {
            block_hash: *block.hash(),
            result: ExecutionResultsChecksumResult::Success { checksum },
        },
    );
    for event in mock_reactor.process_effects(effects).await {
        assert_matches!(
            event,
            MockReactorEvent::BlockExecutionResultsOrChunkFetcherRequest(FetcherRequest { .. })
        );
    }
    latch_count_check(
        block_synchronizer.historical.as_ref(),
        MAX_SIMULTANEOUS_PEERS,
        format!(
            "Latch count should be {} since no chunks of execution results were received.",
            MAX_SIMULTANEOUS_PEERS
        )
        .as_str(),
    );

    // First delivery of chunk 0: the synchronizer now asks for the remaining
    // chunks (index != 0); the latch stays untouched because those follow-up
    // requests are still outstanding.
    let effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsFetched {
            block_hash: *block.hash(),
            result: Ok(FetchedData::from_storage(Box::new(
                execution_results.clone(),
            ))),
        },
    );
    for event in mock_reactor.process_effects(effects).await {
        assert_matches!(
            event,
            MockReactorEvent::BlockExecutionResultsOrChunkFetcherRequest(FetcherRequest { id, .. }) if id.chunk_index() != 0
        );
    }
    latch_count_check(
        block_synchronizer.historical.as_ref(),
        MAX_SIMULTANEOUS_PEERS,
        format!(
            "Latch count should be {} since no responses with chunks != 0 were received.",
            MAX_SIMULTANEOUS_PEERS
        )
        .as_str(),
    );

    // Second delivery of the SAME chunk 0 ("old" response): this must not be
    // treated as progress, so the latch must not decrement.
    let _effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsFetched {
            block_hash: *block.hash(),
            result: Ok(FetchedData::from_storage(Box::new(execution_results))),
        },
    );
    latch_count_check(
        block_synchronizer.historical.as_ref(),
        MAX_SIMULTANEOUS_PEERS,
        format!(
            "Latch count should be {} since we already had the first chunk and no responses with chunks != 0 were received.",
            MAX_SIMULTANEOUS_PEERS
        )
        .as_str(),
    );

    // A genuine `Absent` response from a peer IS a terminal answer for that
    // outstanding request, so it must decrement the latch by exactly one.
    let _effects = block_synchronizer.handle_event(
        mock_reactor.effect_builder(),
        rng,
        Event::ExecutionResultsFetched {
            block_hash: *block.hash(),
            result: Err(FetcherError::Absent {
                id: Box::new(BlockExecutionResultsOrChunkId::new(*block.hash())),
                peer: peers[0],
            }),
        },
    );
    latch_count_check(
        block_synchronizer.historical.as_ref(),
        MAX_SIMULTANEOUS_PEERS - 1,
        format!(
            "Latch count should be {} since we received an `Absent` response.",
            MAX_SIMULTANEOUS_PEERS - 1
        )
        .as_str(),
    );
}