use super::*;
use crate::{
components::block_synchronizer::tests::test_utils::test_chunks_with_proof,
reactor::{EventQueueHandle, QueueKind, Scheduler},
types::ValueOrChunk,
utils,
};
use casper_types::testing::TestRng;
use futures::channel::oneshot;
#[derive(Debug)]
enum ReactorEvent {
FetcherRequest(FetcherRequest<TrieOrChunk>),
PeerBehaviorAnnouncement(#[allow(dead_code)] PeerBehaviorAnnouncement),
}
impl From<PeerBehaviorAnnouncement> for ReactorEvent {
fn from(req: PeerBehaviorAnnouncement) -> ReactorEvent {
ReactorEvent::PeerBehaviorAnnouncement(req)
}
}
impl From<FetcherRequest<TrieOrChunk>> for ReactorEvent {
fn from(req: FetcherRequest<TrieOrChunk>) -> ReactorEvent {
ReactorEvent::FetcherRequest(req)
}
}
struct MockReactor {
scheduler: &'static Scheduler<ReactorEvent>,
effect_builder: EffectBuilder<ReactorEvent>,
}
impl MockReactor {
fn new() -> Self {
let scheduler = utils::leak(Scheduler::new(QueueKind::weights(), None));
let event_queue_handle = EventQueueHandle::without_shutdown(scheduler);
let effect_builder = EffectBuilder::new(event_queue_handle);
MockReactor {
scheduler,
effect_builder,
}
}
fn effect_builder(&self) -> EffectBuilder<ReactorEvent> {
self.effect_builder
}
async fn expect_fetch_event(&self, chunk_id: &TrieOrChunkId, peer: &NodeId) {
let ((_ancestor, reactor_event), _) = self.scheduler.pop().await;
match reactor_event {
ReactorEvent::FetcherRequest(request) => {
assert_eq!(request.id, *chunk_id);
assert_eq!(request.peer, *peer);
}
_ => {
unreachable!();
}
};
}
}
async fn download_chunk_and_check(
reactor: &MockReactor,
trie_accumulator: &mut TrieAccumulator,
chunk_to_download: &TrieOrChunkId,
peer: &NodeId,
partial_chunks: PartialChunks,
) {
let mut effects = trie_accumulator.try_download_chunk(
reactor.effect_builder(),
*chunk_to_download,
*peer,
partial_chunks,
);
assert_eq!(effects.len(), 1);
tokio::spawn(async move { effects.remove(0).await });
reactor.expect_fetch_event(chunk_to_download, peer).await;
}
#[test]
fn unsolicited_chunk_produces_no_effects() {
let reactor = MockReactor::new();
let mut trie_accumulator = TrieAccumulator::new();
let (test_chunks, _, _) = test_chunks_with_proof(1);
let effects = trie_accumulator.consume_chunk(reactor.effect_builder(), test_chunks[0].clone());
assert!(effects.is_empty());
}
#[tokio::test]
async fn try_download_chunk_generates_fetch_effect() {
let mut rng = TestRng::new();
let reactor = MockReactor::new();
let mut trie_accumulator = TrieAccumulator::new();
let (_, chunk_ids, _) = test_chunks_with_proof(1);
let peer = NodeId::random(&mut rng);
let chunks = PartialChunks {
peers: vec![peer],
responders: Default::default(),
chunks: Default::default(),
unreliable_peers: Default::default(),
};
download_chunk_and_check(
&reactor,
&mut trie_accumulator,
&chunk_ids[0],
&peer,
chunks,
)
.await;
}
#[tokio::test]
async fn failed_fetch_retriggers_download_with_different_peer() {
let mut rng = TestRng::new();
let reactor = MockReactor::new();
let mut trie_accumulator = TrieAccumulator::new();
let (_, chunk_ids, _) = test_chunks_with_proof(1);
let peers: Vec<NodeId> = (0..2).map(|_| NodeId::random(&mut rng)).collect();
let chunks = PartialChunks {
peers: peers.clone(),
responders: Default::default(),
chunks: Default::default(),
unreliable_peers: Default::default(),
};
download_chunk_and_check(
&reactor,
&mut trie_accumulator,
&chunk_ids[0],
&peers[1],
chunks,
)
.await;
let fetch_result: FetchResult<TrieOrChunk> = Err(FetcherError::TimedOut {
id: Box::new(chunk_ids[0]),
peer: peers[1],
});
let event = Event::TrieOrChunkFetched {
id: chunk_ids[0],
fetch_result,
};
let mut effects = trie_accumulator.handle_event(reactor.effect_builder(), &mut rng, event);
assert_eq!(effects.len(), 1);
tokio::spawn(async move { effects.remove(0).await });
reactor.expect_fetch_event(&chunk_ids[0], &peers[0]).await;
}
#[tokio::test]
async fn fetched_chunk_triggers_download_of_missing_chunk() {
let mut rng = TestRng::new();
let reactor = MockReactor::new();
let mut trie_accumulator = TrieAccumulator::new();
let (test_chunks, chunk_ids, _) = test_chunks_with_proof(2);
let peer = NodeId::random(&mut rng);
let chunks = PartialChunks {
peers: vec![peer],
responders: Default::default(),
chunks: Default::default(),
unreliable_peers: Default::default(),
};
download_chunk_and_check(
&reactor,
&mut trie_accumulator,
&chunk_ids[1],
&peer,
chunks,
)
.await;
let chunk = Box::new(ValueOrChunk::ChunkWithProof(test_chunks[1].clone()));
let fetch_result: FetchResult<TrieOrChunk> = Ok(FetchedData::FromPeer { peer, item: chunk });
let event = Event::TrieOrChunkFetched {
id: chunk_ids[1],
fetch_result,
};
let mut effects = trie_accumulator.handle_event(reactor.effect_builder(), &mut rng, event);
assert_eq!(effects.len(), 1);
tokio::spawn(async move { effects.remove(0).await });
reactor.expect_fetch_event(&chunk_ids[0], &peer).await;
}
#[tokio::test]
async fn trie_returned_when_all_chunks_fetched() {
let mut rng = TestRng::new();
let reactor = MockReactor::new();
let mut trie_accumulator = TrieAccumulator::new();
let (test_chunks, chunk_ids, data) = test_chunks_with_proof(3);
let peer = NodeId::random(&mut rng);
let (sender, receiver) = oneshot::channel();
let responder = Responder::without_shutdown(sender);
let chunks = PartialChunks {
peers: vec![peer],
responders: vec![responder],
chunks: Default::default(),
unreliable_peers: Default::default(),
};
download_chunk_and_check(
&reactor,
&mut trie_accumulator,
&chunk_ids[0],
&peer,
chunks,
)
.await;
let mut effects = Effects::new();
for i in 0..3 {
let fetch_result: FetchResult<TrieOrChunk> = Ok(FetchedData::FromPeer {
peer,
item: Box::new(ValueOrChunk::ChunkWithProof(test_chunks[i].clone())),
});
let event = Event::TrieOrChunkFetched {
id: chunk_ids[i],
fetch_result,
};
effects = trie_accumulator.handle_event(reactor.effect_builder(), &mut rng, event);
assert_eq!(effects.len(), 1);
}
tokio::spawn(async move { effects.remove(0).await });
let result_trie = receiver.await.unwrap().expect("Expected trie").trie;
assert_eq!(*result_trie, TrieRaw::new(Bytes::from(data)));
}