use super::*;
use crate::{
block_relay_protocol::BlockResponseError, mock::MockBlockDownloader,
service::network::NetworkServiceProvider,
};
use futures::{channel::oneshot::Canceled, executor::block_on};
use sc_block_builder::BlockBuilderBuilder;
use sc_network::RequestFailure;
use sc_network_common::sync::message::{BlockAnnounce, BlockData, BlockState, FromBlock};
use sp_blockchain::HeaderBackend;
use std::sync::Mutex;
use substrate_test_runtime_client::{
runtime::{Block, Hash, Header},
BlockBuilderExt, ClientBlockImportExt, ClientExt, DefaultTestClientBuilderExt, TestClient,
TestClientBuilder, TestClientBuilderExt,
};
/// Test double for a block downloader: `download_blocks` forwards every request it is
/// given into an in-process channel, and `next_request` reads requests back out of it.
#[derive(Debug)]
struct ProxyBlockDownloader {
/// Protocol name returned by `protocol_name()` and echoed in `download_blocks` results.
protocol_name: ProtocolName,
/// Sending half of the channel; `download_blocks` pushes each request here.
sender: std::sync::mpsc::Sender<BlockRequest<Block>>,
/// Receiving half of the channel, kept behind a `Mutex` so it can be used through `&self`.
request: Mutex<std::sync::mpsc::Receiver<BlockRequest<Block>>>,
}
// `BlockDownloader` implementation that records every request instead of performing it.
#[async_trait::async_trait]
impl BlockDownloader<Block> for ProxyBlockDownloader {
    /// Protocol name supplied at construction time.
    fn protocol_name(&self) -> &ProtocolName {
        &self.protocol_name
    }

    /// Pushes `request` into the internal channel and replies with an empty payload
    /// tagged with this downloader's protocol name.
    ///
    /// Panics if sending on the channel fails.
    async fn download_blocks(
        &self,
        _who: PeerId,
        request: BlockRequest<Block>,
    ) -> Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, Canceled> {
        self.sender.send(request).unwrap();
        let payload: Vec<u8> = vec![];
        let protocol = self.protocol_name.clone();
        Ok(Ok((payload, protocol)))
    }

    /// Ignores both arguments and always yields an empty list of blocks.
    fn block_response_into_blocks(
        &self,
        _request: &BlockRequest<Block>,
        _response: Vec<u8>,
    ) -> Result<Vec<BlockData<Block>>, BlockResponseError> {
        let no_blocks = Vec::new();
        Ok(no_blocks)
    }
}
impl ProxyBlockDownloader {
    /// Creates a proxy with a fresh channel and the given protocol name.
    fn new(protocol_name: ProtocolName) -> Self {
        let (tx, rx) = std::sync::mpsc::channel();
        Self { protocol_name, sender: tx, request: Mutex::new(rx) }
    }

    /// Blocks until the next recorded request is available and returns it.
    ///
    /// Panics if the mutex is poisoned or receiving from the channel fails.
    fn next_request(&self) -> BlockRequest<Block> {
        let receiver = self.request.lock().unwrap();
        receiver.recv().unwrap()
    }
}
/// An empty justification response for a block that was built but never imported must
/// move the justification request from the active set back to the pending set.
#[test]
fn processes_empty_response_on_justification_request_for_unknown_block() {
    let client = Arc::new(TestClientBuilder::new().build());
    let peer_id = PeerId::random();

    let mut sync = ChainSync::new(
        ChainSyncMode::Full,
        client.clone(),
        1,
        64,
        ProtocolName::Static(""),
        Arc::new(MockBlockDownloader::new()),
        false,
        None,
        std::iter::empty(),
    )
    .unwrap();

    // Build a child of the current best block without importing it.
    let chain_info = client.chain_info();
    let a1 = BlockBuilderBuilder::new(&*client)
        .on_parent_block(chain_info.best_hash)
        .with_parent_block_number(chain_info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;
    let a1_hash = a1.hash();
    let a1_number = *a1.header.number();

    // The peer claims `a1` as its best block; ask for a justification of that block.
    sync.add_peer(peer_id, a1_hash, a1_number);
    sync.request_justification(&a1_hash, a1_number);

    // The request must have been handed to our only peer ...
    let dispatched = sync.justification_requests();
    assert!(dispatched
        .iter()
        .any(|(who, request)| *who == peer_id && request.from == FromBlock::Hash(a1_hash)));

    // ... and be tracked as active, not pending.
    assert_eq!(sync.extra_justifications.pending_requests().count(), 0);
    let is_active = sync
        .extra_justifications
        .active_requests()
        .any(|(who, (hash, number))| *who == peer_id && *hash == a1_hash && *number == a1_number);
    assert!(is_active);

    // Answer with a response that carries no blocks at all.
    let empty_response = BlockResponse::<Block> { id: 0, blocks: Vec::new() };
    sync.on_block_justification(peer_id, empty_response).unwrap();

    // The request is no longer active and is pending again.
    assert_eq!(sync.extra_justifications.active_requests().count(), 0);
    let is_pending = sync
        .extra_justifications
        .pending_requests()
        .any(|(hash, number)| *hash == a1_hash && *number == a1_number);
    assert!(is_pending);
}
/// Checks that `restart()` cancels and re-issues the requests of the peers that were
/// given block requests, while a peer in `DownloadingJustification` state keeps that state.
#[test]
fn restart_doesnt_affect_peers_downloading_finality_data() {
let client = Arc::new(TestClientBuilder::new().build());
let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
1,
8,
ProtocolName::Static(""),
Arc::new(MockBlockDownloader::new()),
false,
None,
std::iter::empty(),
)
.unwrap();
let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();
let peer_id3 = PeerId::random();
// Builds and imports `n` blocks on top of the current best block and returns the
// resulting best hash and number.
let new_blocks = |n| {
for _ in 0..n {
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(client.chain_info().best_hash)
.with_parent_block_number(client.chain_info().best_number)
.build()
.unwrap()
.build()
.unwrap()
.block;
block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
}
let info = client.info();
(info.best_hash, info.best_number)
};
let (b1_hash, b1_number) = new_blocks(50);
// Two peers with random best hashes at heights 42 and 10.
sync.add_peer(peer_id1, Hash::random(), 42);
sync.add_peer(peer_id2, Hash::random(), 10);
let network_provider = NetworkServiceProvider::new();
let network_handle = network_provider.handle();
// Exactly one `StartRequest` is expected for each of the two peers.
let actions = sync.actions(&network_handle).unwrap();
assert_eq!(actions.len(), 2);
assert!(actions.iter().all(|action| match action {
SyncingAction::StartRequest { peer_id, .. } => peer_id == &peer_id1 || peer_id == &peer_id2,
_ => false,
}));
// The third peer is at our best block and is asked for its justification.
sync.add_peer(peer_id3, b1_hash, b1_number);
sync.request_justification(&b1_hash, b1_number);
assert!(sync.justification_requests().iter().any(|(p, r)| {
*p == peer_id3 &&
r.fields == BlockAttributes::JUSTIFICATION &&
r.from == FromBlock::Hash(b1_hash)
}));
assert_eq!(
sync.peers.get(&peer_id3).unwrap().state,
PeerSyncState::DownloadingJustification(b1_hash),
);
// Drain the actions accumulated so far so that only those produced after `restart()`
// are inspected below.
let _ = sync.take_actions();
sync.restart();
// Four actions are expected: a `CancelRequest` and a `StartRequest` for each of the
// first two peers.
let actions = sync.actions(&network_handle).unwrap();
assert_eq!(actions.len(), 4);
// Tracks peers whose request was cancelled; a `StartRequest` for a peer is only
// accepted if that peer's `CancelRequest` was seen earlier in the list.
let mut cancelled_first = HashSet::new();
assert!(actions.iter().all(|action| match action {
SyncingAction::CancelRequest { peer_id, .. } => {
cancelled_first.insert(peer_id);
peer_id == &peer_id1 || peer_id == &peer_id2
},
SyncingAction::StartRequest { peer_id, .. } => {
assert!(cancelled_first.remove(peer_id));
peer_id == &peer_id1 || peer_id == &peer_id2
},
_ => false,
}));
// The justification-downloading peer is untouched by the restart.
assert_eq!(
sync.peers.get(&peer_id3).unwrap().state,
PeerSyncState::DownloadingJustification(b1_hash),
);
// Force a common number above the 50 imported blocks and check that another
// `restart()` brings it back to 50.
sync.peers.get_mut(&peer_id3).unwrap().common_number = 100;
sync.restart();
assert_eq!(sync.peers.get(&peer_id3).unwrap().common_number, 50);
}
/// Feeds `sync` a validated block announcement for `header` coming from `peer_id`.
///
/// The announcement is marked as `BlockState::Best` and carries empty associated data.
/// Whatever `on_validated_block_announce` returns is discarded.
fn send_block_announce(header: Header, peer_id: PeerId, sync: &mut ChainSync<Block, TestClient>) {
    let announce = BlockAnnounce {
        // `header` is owned by this function, so it is moved instead of cloned.
        header,
        state: Some(BlockState::Best),
        data: Some(Vec::new()),
    };
    let _ = sync.on_validated_block_announce(true, peer_id, &announce);
}
/// Wraps `blocks` into a `BlockResponse` with `id` 0.
///
/// Each entry carries the block's hash, header and body; every other optional field of
/// `BlockData` is left as `None`. The order of `blocks` is preserved.
fn create_block_response(blocks: Vec<Block>) -> BlockResponse<Block> {
    let mut block_data = Vec::with_capacity(blocks.len());
    for block in blocks {
        // Hash first: `deconstruct` consumes the block.
        let hash = block.hash();
        let (header, body) = block.deconstruct();
        block_data.push(BlockData::<Block> {
            hash,
            header: Some(header),
            body: Some(body),
            indexed_body: None,
            receipt: None,
            message_queue: None,
            justification: None,
            justifications: None,
        });
    }
    BlockResponse::<Block> { id: 0, blocks: block_data }
}
/// Asks `sync` for its block requests and returns the single one it produced.
///
/// Panics unless there is exactly one request, it is addressed to `peer`, it starts at
/// `from`, and its `max` equals `Some(max)`.
fn get_block_request(
    sync: &mut ChainSync<Block, TestClient>,
    from: FromBlock<Hash, u64>,
    max: u32,
    peer: &PeerId,
) -> BlockRequest<Block> {
    let requests = sync.block_requests();
    log::trace!(target: LOG_TARGET, "Requests: {requests:?}");
    assert_eq!(1, requests.len());

    let (target_peer, only_request) = &requests[0];
    assert_eq!(*peer, *target_peer);

    let request = only_request.clone();
    assert_eq!(from, request.from);
    assert_eq!(Some(max), request.max);
    request
}
/// Builds a block on top of `at` (or on the client's best block when `at` is `None`),
/// imports it into `client` with `BlockOrigin::Own`, and returns it.
///
/// When `fork` is set, a storage change is pushed into the block before it is built.
fn build_block(client: &TestClient, at: Option<Hash>, fork: bool) -> Block {
    let parent_hash = match at {
        Some(hash) => hash,
        None => client.info().best_hash,
    };

    let mut builder = BlockBuilderBuilder::new(client)
        .on_parent_block(parent_hash)
        .fetch_parent_block_number(client)
        .unwrap()
        .build()
        .unwrap();

    if fork {
        builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
    }

    let built = builder.build().unwrap().block;
    block_on(client.import(BlockOrigin::Own, built.clone())).unwrap();
    built
}
/// Extracts the block number out of `FromBlock::Number`.
///
/// Panics with "Expected a number!" for any other variant.
fn unwrap_from_block_number(from: FromBlock<Hash, u64>) -> u64 {
    match from {
        FromBlock::Number(number) => number,
        _ => panic!("Expected a number!"),
    }
}
/// Syncs `MAX_DOWNLOAD_AHEAD` blocks from one peer, then lets a second peer (added at
/// block 0) announce the tip of the chain and checks that this second peer first receives
/// a single-block request before it is given a regular batch request.
#[test]
fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() {
sp_tracing::try_init_simple();
// The chain is built on a throw-away client, so the client under test below starts
// without any of these blocks.
let blocks = {
let client = TestClientBuilder::new().build();
(0..MAX_DOWNLOAD_AHEAD * 2)
.map(|_| build_block(&client, None, false))
.collect::<Vec<_>>()
};
let client = Arc::new(TestClientBuilder::new().build());
let info = client.info();
let protocol_name = ProtocolName::Static("");
// The proxy records every request passed to `download_blocks` so the test can read it
// back with `next_request()`.
let proxy_block_downloader = Arc::new(ProxyBlockDownloader::new(protocol_name.clone()));
let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
5,
64,
protocol_name,
proxy_block_downloader.clone(),
false,
None,
std::iter::empty(),
)
.unwrap();
let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();
let best_block = blocks.last().unwrap().clone();
let max_blocks_to_request = sync.max_blocks_per_request;
// Peer 1 is at the tip of the pre-built chain; peer 2 is added at our own best hash
// with number 0.
sync.add_peer(peer_id1, best_block.hash(), *best_block.header().number());
sync.add_peer(peer_id2, info.best_hash, 0);
let mut best_block_num = 0;
// Download from peer 1 in batches of `max_blocks_to_request` until
// `MAX_DOWNLOAD_AHEAD` blocks have been imported.
while best_block_num < MAX_DOWNLOAD_AHEAD {
let request = get_block_request(
&mut sync,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
let from = unwrap_from_block_number(request.from.clone());
// The response lists the batch from the highest block down to the lowest.
let mut resp_blocks = blocks[best_block_num as usize..from as usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
// Drop any previously accumulated actions so only the import action is counted.
let _ = sync.take_actions();
sync.on_block_data(&peer_id1, Some(request), response).unwrap();
let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0],
SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize,
));
best_block_num += max_blocks_to_request as u32;
// Report the whole batch as imported (in ascending order) ...
let _ = sync.on_blocks_processed(
max_blocks_to_request as usize,
max_blocks_to_request as usize,
resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id1),
)),
b.hash(),
)
})
.collect(),
);
// ... and actually import it into the client as finalized blocks.
resp_blocks
.into_iter()
.rev()
.for_each(|b| block_on(client.import_as_final(BlockOrigin::Own, b)).unwrap());
}
// NOTE(review): presumably cleared so the blocks imported above are no longer treated
// as queued — confirm against `ChainSync`.
sync.queue_blocks.clear();
// Peer 2 announces the tip of the chain.
send_block_announce(best_block.header().clone(), peer_id2, &mut sync);
// Turn the pending block requests into actions by hand and collect them.
let block_requests = sync
.block_requests()
.into_iter()
.map(|(peer_id, request)| sync.create_block_request_action(peer_id, request))
.collect::<Vec<_>>();
sync.actions.extend(block_requests);
let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 2);
let (mut peer1_req, mut peer2_req) = (None, None);
for action in actions {
match action {
SyncingAction::StartRequest { peer_id, request, .. } => {
// Driving the request future makes the proxy record the underlying request.
block_on(request).unwrap().unwrap();
let req = proxy_block_downloader.next_request();
if peer_id == peer_id1 {
peer1_req = Some(req);
} else if peer_id == peer_id2 {
peer2_req = Some(req);
} else {
panic!("Unexpected peer: {peer_id}");
}
},
action => panic!("Unexpected action: {}", action.name()),
}
}
// Peer 2 must be asked for exactly one block, starting at the number synced so far.
let peer2_req = peer2_req.unwrap();
assert_eq!(FromBlock::Number(best_block_num as u64), peer2_req.from);
assert_eq!(Some(1), peer2_req.max);
// `blocks[i]` is the (i + 1)-th block built, hence the `- 1` when indexing by number.
let response = create_block_response(vec![blocks[(best_block_num - 1) as usize].clone()]);
sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap();
// Handling that response must not emit any action.
let actions = sync.take_actions().collect::<Vec<_>>();
assert!(actions.is_empty());
// The next request goes to peer 2 and starts one batch beyond peer 1's request.
let peer1_from = unwrap_from_block_number(peer1_req.unwrap().from);
get_block_request(
&mut sync,
FromBlock::Number(peer1_from + max_blocks_to_request as u64),
max_blocks_to_request as u32,
&peer_id2,
);
}
/// Syncs a fork that branches off far behind the local best block: answers a series of
/// single-block requests one at a time, downloads the fork in batches, and finally
/// expects a single-block request by hash for the fork tip.
#[test]
fn can_sync_huge_fork() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::new().build());
// Local chain: `MAX_BLOCKS_TO_LOOK_BACKWARDS * 4` blocks imported into `client`.
let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4)
.map(|_| build_block(&client, None, false))
.collect::<Vec<_>>();
// Fork chain, built on a separate client: the first `MAX_BLOCKS_TO_LOOK_BACKWARDS * 2`
// local blocks followed by `MAX_BLOCKS_TO_LOOK_BACKWARDS * 2 + 1` blocks built with
// `fork = true`.
let fork_blocks = {
let client = TestClientBuilder::new().build();
let fork_blocks = blocks[..MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2]
.into_iter()
.inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap())
.cloned()
.collect::<Vec<_>>();
fork_blocks
.into_iter()
.chain(
(0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 2 + 1).map(|_| build_block(&client, None, true)),
)
.collect::<Vec<_>>()
};
let info = client.info();
let protocol_name = ProtocolName::Static("");
// The proxy records every request passed to `download_blocks` so the test can read it
// back with `next_request()`.
let proxy_block_downloader = Arc::new(ProxyBlockDownloader::new(protocol_name.clone()));
let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
5,
64,
protocol_name,
proxy_block_downloader.clone(),
false,
None,
std::iter::empty(),
)
.unwrap();
// Finalize the last block shared by both chains.
let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone();
let just = (*b"TEST", Vec::new());
client.finalize_block(finalized_block.hash(), Some(just)).unwrap();
sync.update_chain_info(&info.best_hash, info.best_number);
let peer_id1 = PeerId::random();
// The peer is added at a block from the shared part of the chain, then announces the
// tip of the fork.
let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone();
sync.add_peer(peer_id1, common_block.hash(), *common_block.header().number());
send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync);
let mut actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
let mut request = match actions.pop().unwrap() {
SyncingAction::StartRequest { request, .. } => {
// Driving the request future makes the proxy record the underlying request.
block_on(request).unwrap().unwrap();
proxy_block_downloader.next_request()
},
action => panic!("Unexpected action: {}", action.name()),
};
// The first request asks for a single block at our best number.
assert_eq!(FromBlock::Number(info.best_number), request.from);
assert_eq!(Some(1), request.max);
// Keep answering single-block requests from the fork chain until no further request
// is produced.
loop {
let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
let response = create_block_response(vec![block.clone()]);
sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap();
let mut actions = sync.take_actions().collect::<Vec<_>>();
request = if actions.is_empty() {
break;
} else {
assert_eq!(actions.len(), 1);
match actions.pop().unwrap() {
SyncingAction::StartRequest { request, .. } => {
block_on(request).unwrap().unwrap();
proxy_block_downloader.next_request()
},
action => panic!("Unexpected action: {}", action.name()),
}
};
log::trace!(target: LOG_TARGET, "Request: {request:?}");
}
// Download the fork in batches, starting right after the finalized block.
let mut best_block_num = *finalized_block.header().number() as u32;
let max_blocks_to_request = sync.max_blocks_per_request;
while best_block_num < *fork_blocks.last().unwrap().header().number() as u32 - 1 {
let request = get_block_request(
&mut sync,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
let from = unwrap_from_block_number(request.from.clone());
// The response lists the batch from the highest block down to the lowest.
let mut resp_blocks = fork_blocks[best_block_num as usize..from as usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
sync.on_block_data(&peer_id1, Some(request), response).unwrap();
let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0],
SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize
));
best_block_num += sync.max_blocks_per_request as u32;
// Report the whole batch as imported (in ascending order).
sync.on_blocks_processed(
max_blocks_to_request as usize,
max_blocks_to_request as usize,
resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id1),
)),
b.hash(),
)
})
.collect(),
);
// Discard whatever actions processing produced before the next iteration.
let _ = sync.take_actions();
// Import the batch into the client as well.
resp_blocks
.into_iter()
.rev()
.for_each(|b| block_on(client.import(BlockOrigin::Own, b)).unwrap());
}
// Only the fork tip is left: it must be requested by hash as a single block.
get_block_request(&mut sync, FromBlock::Hash(fork_blocks.last().unwrap().hash()), 1, &peer_id1);
}
/// Same scenario as a long-fork sync, but the next batch request is fetched *before* the
/// previous batch is reported as processed, and the import results are reported in two
/// halves. `get_block_request` asserts that exactly one request exists each time.
#[test]
fn syncs_fork_without_duplicate_requests() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::new().build());
// Local chain: `MAX_BLOCKS_TO_LOOK_BACKWARDS * 4` blocks imported into `client`.
let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4)
.map(|_| build_block(&client, None, false))
.collect::<Vec<_>>();
// Fork chain, built on a separate client: the first `MAX_BLOCKS_TO_LOOK_BACKWARDS * 2`
// local blocks followed by `MAX_BLOCKS_TO_LOOK_BACKWARDS * 2 + 1` blocks built with
// `fork = true`.
let fork_blocks = {
let client = TestClientBuilder::new().build();
let fork_blocks = blocks[..MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2]
.into_iter()
.inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap())
.cloned()
.collect::<Vec<_>>();
fork_blocks
.into_iter()
.chain(
(0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 2 + 1).map(|_| build_block(&client, None, true)),
)
.collect::<Vec<_>>()
};
let info = client.info();
let protocol_name = ProtocolName::Static("");
// The proxy records every request passed to `download_blocks` so the test can read it
// back with `next_request()`.
let proxy_block_downloader = Arc::new(ProxyBlockDownloader::new(protocol_name.clone()));
let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
5,
64,
protocol_name,
proxy_block_downloader.clone(),
false,
None,
std::iter::empty(),
)
.unwrap();
// Finalize the last block shared by both chains.
let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone();
let just = (*b"TEST", Vec::new());
client.finalize_block(finalized_block.hash(), Some(just)).unwrap();
sync.update_chain_info(&info.best_hash, info.best_number);
let peer_id1 = PeerId::random();
// The peer is added at a block from the shared part of the chain, then announces the
// tip of the fork.
let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone();
sync.add_peer(peer_id1, common_block.hash(), *common_block.header().number());
send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync);
let mut actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
let mut request = match actions.pop().unwrap() {
SyncingAction::StartRequest { request, .. } => {
// Driving the request future makes the proxy record the underlying request.
block_on(request).unwrap().unwrap();
proxy_block_downloader.next_request()
},
action => panic!("Unexpected action: {}", action.name()),
};
// The first request asks for a single block at our best number.
assert_eq!(FromBlock::Number(info.best_number), request.from);
assert_eq!(Some(1), request.max);
// Keep answering single-block requests from the fork chain until no further request
// is produced.
loop {
let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1];
let response = create_block_response(vec![block.clone()]);
sync.on_block_data(&peer_id1, Some(request), response).unwrap();
let mut actions = sync.take_actions().collect::<Vec<_>>();
request = if actions.is_empty() {
break;
} else {
assert_eq!(actions.len(), 1);
match actions.pop().unwrap() {
SyncingAction::StartRequest { request, .. } => {
block_on(request).unwrap().unwrap();
proxy_block_downloader.next_request()
},
action => panic!("Unexpected action: {}", action.name()),
}
};
log::trace!(target: LOG_TARGET, "Request: {request:?}");
}
// Download the fork in batches, starting right after the finalized block.
let mut best_block_num = *finalized_block.header().number() as u32;
let max_blocks_to_request = sync.max_blocks_per_request;
// The first batch request is fetched up front; later ones are fetched inside the loop
// before the previous batch is reported as processed.
let mut request = get_block_request(
&mut sync,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
let last_block_num = *fork_blocks.last().unwrap().header().number() as u32 - 1;
while best_block_num < last_block_num {
let from = unwrap_from_block_number(request.from.clone());
// The response lists the batch from the highest block down to the lowest.
let mut resp_blocks = fork_blocks[best_block_num as usize..from as usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
// Drop any previously accumulated actions so only the import action is counted.
let _ = sync.take_actions();
sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap();
let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0],
SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize
));
best_block_num += max_blocks_to_request as u32;
// Fetch the next batch request before the current batch is reported as imported.
if best_block_num < last_block_num {
request = get_block_request(
&mut sync,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
}
let mut notify_imported: Vec<_> = resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id1),
)),
b.hash(),
)
})
.collect();
let max_blocks_to_request = sync.max_blocks_per_request;
// Report the import results in two separate calls, half of the batch each.
let second_batch = notify_imported.split_off(notify_imported.len() / 2);
let _ = sync.on_blocks_processed(
max_blocks_to_request as usize,
max_blocks_to_request as usize,
notify_imported,
);
let _ = sync.on_blocks_processed(
max_blocks_to_request as usize,
max_blocks_to_request as usize,
second_batch,
);
// Import the batch into the client as well.
resp_blocks
.into_iter()
.rev()
.for_each(|b| block_on(client.import(BlockOrigin::Own, b)).unwrap());
}
// Only the fork tip is left: it must be requested by hash as a single block.
get_block_request(&mut sync, FromBlock::Hash(fork_blocks.last().unwrap().hash()), 1, &peer_id1);
}
/// A fork target created by a peer's announcement must be dropped once that peer is
/// removed.
#[test]
fn removes_target_fork_on_disconnect() {
    sp_tracing::try_init_simple();
    let client = Arc::new(TestClientBuilder::new().build());
    let blocks = (0..3).map(|_| build_block(&client, None, false)).collect::<Vec<_>>();

    let mut sync = ChainSync::new(
        ChainSyncMode::Full,
        client.clone(),
        1,
        64,
        ProtocolName::Static(""),
        Arc::new(MockBlockDownloader::new()),
        false,
        None,
        std::iter::empty(),
    )
    .unwrap();

    let peer_id1 = PeerId::random();
    let common_block = blocks[1].clone();
    sync.add_peer(peer_id1, common_block.hash(), *common_block.header().number());

    // Announce a header derived from the first block but with its number overwritten
    // to 4, i.e. beyond the three blocks imported above.
    let mut header = blocks[0].header().clone();
    header.number = 4;
    send_block_announce(header, peer_id1, &mut sync);
    // `assert_eq!` reports the actual number of fork targets on failure.
    assert_eq!(sync.fork_targets.len(), 1);

    let _ = sync.remove_peer(&peer_id1);
    assert_eq!(sync.fork_targets.len(), 0);
}
/// A response that contains fewer blocks than requested must be accepted; once the
/// missing block arrives in a follow-up response, `best_queued_number` reaches 4.
#[test]
fn can_import_response_with_missing_blocks() {
    sp_tracing::try_init_simple();

    // Four blocks built on a separate client; the client under test has none of them.
    let client2 = TestClientBuilder::new().build();
    let blocks: Vec<_> = (0..4).map(|_| build_block(&client2, None, false)).collect();

    let empty_client = Arc::new(TestClientBuilder::new().build());
    let mut sync = ChainSync::new(
        ChainSyncMode::Full,
        empty_client.clone(),
        1,
        64,
        ProtocolName::Static(""),
        Arc::new(MockBlockDownloader::new()),
        false,
        None,
        std::iter::empty(),
    )
    .unwrap();

    let peer_id1 = PeerId::random();
    let best_block = blocks[3].clone();
    sync.add_peer(peer_id1, best_block.hash(), *best_block.header().number());

    // Force the peer into the `Available` state with common number 0.
    {
        let peer = sync.peers.get_mut(&peer_id1).unwrap();
        peer.state = PeerSyncState::Available;
        peer.common_number = 0;
    }

    // All four blocks are requested at once, but the response omits the first one.
    let request = get_block_request(&mut sync, FromBlock::Hash(best_block.hash()), 4, &peer_id1);
    let partial = vec![blocks[3].clone(), blocks[2].clone(), blocks[1].clone()];
    let response = create_block_response(partial);
    sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap();
    assert_eq!(sync.best_queued_number, 0);

    // The missing block is then requested on its own and completes the range.
    let request = get_block_request(&mut sync, FromBlock::Number(1), 1, &peer_id1);
    let missing = vec![blocks[0].clone()];
    let response = create_block_response(missing);
    sync.on_block_data(&peer_id1, Some(request), response).unwrap();
    assert_eq!(sync.best_queued_number, 4);
}
/// `handle_ancestor_search_state` must return `None` for `BinarySearch(1, 3)` when called
/// with `2` and `true`.
#[test]
fn ancestor_search_repeat() {
    let search_state = AncestorSearchState::<Block>::BinarySearch(1, 3);
    let next_step = handle_ancestor_search_state(&search_state, 2, true);
    assert!(next_step.is_none());
}
/// After `restart()`, the peer that had a block request gets cancel/start actions, while
/// the peer downloading a justification stays in `DownloadingJustification` and gets no
/// action at all.
#[test]
fn sync_restart_removes_block_but_not_justification_requests() {
    let client = Arc::new(TestClientBuilder::new().build());
    let mut sync = ChainSync::new(
        ChainSyncMode::Full,
        client.clone(),
        1,
        64,
        ProtocolName::Static(""),
        Arc::new(MockBlockDownloader::new()),
        false,
        None,
        std::iter::empty(),
    )
    .unwrap();

    let peers = [PeerId::random(), PeerId::random()];

    // Builds and imports `n` blocks on top of the current best block and returns the
    // resulting best hash and number.
    let new_blocks = |n| {
        for _ in 0..n {
            let block = BlockBuilderBuilder::new(&*client)
                .on_parent_block(client.chain_info().best_hash)
                .with_parent_block_number(client.chain_info().best_number)
                .build()
                .unwrap()
                .build()
                .unwrap()
                .block;
            block_on(client.import(BlockOrigin::Own, block)).unwrap();
        }
        let info = client.info();
        (info.best_hash, info.best_number)
    };

    let (b1_hash, b1_number) = new_blocks(50);

    // First peer: random best hash at height 42. Record every peer that was handed a
    // block request.
    sync.add_peer(peers[0], Hash::random(), 42);
    let mut pending_responses = HashSet::new();
    for (peer, _request) in sync.block_requests() {
        pending_responses.insert(peer);
    }

    // Second peer: at our best block, asked for that block's justification.
    sync.add_peer(peers[1], b1_hash, b1_number);
    sync.request_justification(&b1_hash, b1_number);
    let mut requests = sync.justification_requests();
    assert_eq!(requests.len(), 1);
    let (peer, _request) = requests.remove(0);
    // The justification peer must not already be tracked.
    assert!(pending_responses.insert(peer));

    assert!(!matches!(
        sync.peers.get(&peers[0]).unwrap().state,
        PeerSyncState::DownloadingJustification(_),
    ));
    assert_eq!(
        sync.peers.get(&peers[1]).unwrap().state,
        PeerSyncState::DownloadingJustification(b1_hash),
    );
    assert_eq!(pending_responses.len(), 2);

    // Drain earlier actions so only those produced by `restart()` are inspected.
    let _ = sync.take_actions();
    sync.restart();
    let actions = sync.take_actions().collect::<Vec<_>>();
    for action in actions.iter() {
        match action {
            // Either action kind means the peer's earlier request is no longer pending.
            SyncingAction::CancelRequest { peer_id, .. } |
            SyncingAction::StartRequest { peer_id, .. } => {
                pending_responses.remove(peer_id);
            },
            action => panic!("Unexpected action: {}", action.name()),
        }
    }

    // The block-downloading peer must have been given a fresh request.
    assert!(actions.iter().any(|action| {
        matches!(action, SyncingAction::StartRequest { peer_id, .. } if peer_id == &peers[0])
    }));

    // Only the justification request survives the restart.
    assert_eq!(pending_responses.len(), 1);
    assert!(pending_responses.contains(&peers[1]));
    assert_eq!(
        sync.peers.get(&peers[1]).unwrap().state,
        PeerSyncState::DownloadingJustification(b1_hash),
    );

    let _ = sync.remove_peer(&peers[1]);
    pending_responses.remove(&peers[1]);
    assert_eq!(pending_responses.len(), 0);
}
/// Two peers announce blocks from two forks that share the first 100 blocks; the test
/// walks through the requests expected for each announcement.
///
/// NOTE(review): the test is marked `#[should_panic]`, so it passes only if something in
/// its body panics; SOURCE does not say which step is expected to fail — confirm.
#[test]
#[should_panic]
fn request_across_forks() {
sp_tracing::try_init_simple();
let client = Arc::new(TestClientBuilder::new().build());
// 100 shared blocks, imported into the client under test.
let blocks = (0..100).map(|_| build_block(&client, None, false)).collect::<Vec<_>>();
// Fork A: the shared blocks plus 10 more built with `fork = false`, on a separate client.
let fork_a_blocks = {
let client = TestClientBuilder::new().build();
let mut fork_blocks = blocks[..]
.into_iter()
.inspect(|b| {
// Each block's parent must already be known to this client before importing it.
assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()
})
.cloned()
.collect::<Vec<_>>();
for _ in 0..10 {
fork_blocks.push(build_block(&client, None, false));
}
fork_blocks
};
// Fork B: the shared blocks plus 10 more built with `fork = true`, on a separate client.
let fork_b_blocks = {
let client = TestClientBuilder::new().build();
let mut fork_blocks = blocks[..]
.into_iter()
.inspect(|b| {
// Each block's parent must already be known to this client before importing it.
assert!(matches!(client.block(*b.header.parent_hash()), Ok(Some(_))));
block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()
})
.cloned()
.collect::<Vec<_>>();
for _ in 0..10 {
fork_blocks.push(build_block(&client, None, true));
}
fork_blocks
};
let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
5,
64,
ProtocolName::Static(""),
Arc::new(MockBlockDownloader::new()),
false,
None,
std::iter::empty(),
)
.unwrap();
// Both peers are added at the last shared block.
let common_block = blocks.last().unwrap();
let peer_id1 = PeerId::random();
sync.add_peer(peer_id1, common_block.hash(), *common_block.header().number());
let peer_id2 = PeerId::random();
sync.add_peer(peer_id2, common_block.hash(), *common_block.header().number());
// Step 1: peer 1 announces `fork_a_blocks[106]`; a 7-block request by hash is expected
// and answered with `fork_a_blocks[100..107]` in descending order.
{
let block = (&fork_a_blocks[106]).clone();
let peer = peer_id1;
log::trace!(target: LOG_TARGET, "<1> {peer} announces from fork 1");
send_block_announce(block.header().clone(), peer, &mut sync);
let request = get_block_request(&mut sync, FromBlock::Hash(block.hash()), 7, &peer);
let mut resp_blocks = fork_a_blocks[100_usize..107_usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
// Drop any previously accumulated actions so only the import action is counted.
let _ = sync.take_actions();
sync.on_block_data(&peer, Some(request), response).unwrap();
let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0],
SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize
));
assert_eq!(sync.best_queued_number, 107);
assert_eq!(sync.best_queued_hash, block.hash());
assert!(sync.is_known(&block.header.parent_hash()));
}
// Step 2: peer 2 announces the same fork A blocks; no new block request may appear and
// the best queued block must not change.
{
let prev_best_number = sync.best_queued_number;
let prev_best_hash = sync.best_queued_hash;
let peer = peer_id2;
log::trace!(target: LOG_TARGET, "<2> {peer} announces from fork 1");
for i in 100..107 {
let block = (&fork_a_blocks[i]).clone();
send_block_announce(block.header().clone(), peer, &mut sync);
assert!(sync.block_requests().is_empty());
}
assert_eq!(sync.best_queued_number, prev_best_number);
assert_eq!(sync.best_queued_hash, prev_best_hash);
}
// Step 3: peer 2 announces `fork_b_blocks[107]`; a single-block request by hash is
// expected and answered with just that block.
{
let block = (&fork_b_blocks[107]).clone();
let peer = peer_id2;
log::trace!(target: LOG_TARGET, "<3> {peer} announces from fork 2");
send_block_announce(block.header().clone(), peer, &mut sync);
let request = get_block_request(&mut sync, FromBlock::Hash(block.hash()), 1, &peer);
let response = create_block_response(vec![block.clone()]);
// Drop any previously accumulated actions so only the import action is counted.
let _ = sync.take_actions();
sync.on_block_data(&peer, Some(request), response).unwrap();
let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0],
SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize
));
assert!(sync.is_known(&block.header.parent_hash()));
}
}
/// Runs two download/import rounds; after the first round a `GapSync` with target 84 is
/// injected, and the test checks that `gap_sync` is `None` again after the second round.
#[test]
fn sync_verification_failed_with_gap_filled() {
sp_tracing::try_init_simple();
const TEST_TARGET: u32 = 64 * 3;
// The chain is built on a throw-away client, so the client under test below starts
// without any of these blocks.
let blocks = {
let client = TestClientBuilder::new().build();
(0..TEST_TARGET).map(|_| build_block(&client, None, false)).collect::<Vec<_>>()
};
let client = Arc::new(TestClientBuilder::new().build());
let info = client.info();
let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
5,
64,
ProtocolName::Static(""),
Arc::new(MockBlockDownloader::new()),
false,
None,
std::iter::empty(),
)
.unwrap();
let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();
let best_block = blocks.last().unwrap().clone();
let max_blocks_to_request = sync.max_blocks_per_request;
let status = sync.status();
assert!(status.warp_sync.is_none());
log::info!(target: LOG_TARGET, "Before adding peers: {status:?}");
// Peer 1 is at the tip of the pre-built chain; peer 2 is added at our own best hash
// with number 0.
sync.add_peer(peer_id1, best_block.hash(), *best_block.header().number());
sync.add_peer(peer_id2, info.best_hash, 0);
let mut best_block_num = 0;
assert_eq!(sync.best_queued_number, 0);
for loop_index in 0..2 {
log::info!(target: LOG_TARGET, "Loop index: {loop_index}");
// Each round downloads one batch of `max_blocks_to_request` blocks from peer 1.
let request = get_block_request(
&mut sync,
FromBlock::Number(max_blocks_to_request as u64 + best_block_num as u64),
max_blocks_to_request as u32,
&peer_id1,
);
let from = unwrap_from_block_number(request.from.clone());
// The response lists the batch from the highest block down to the lowest.
let mut resp_blocks = blocks[best_block_num as usize..from as usize].to_vec();
resp_blocks.reverse();
let response = create_block_response(resp_blocks.clone());
// Drop any previously accumulated actions so only the import action is counted.
let _ = sync.take_actions();
let status = sync.status();
log::info!(target: LOG_TARGET, "Status before on_block_data: {status:?}");
sync.on_block_data(&peer_id1, Some(request.clone()), response.clone()).unwrap();
let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
assert!(matches!(
&actions[0],
SyncingAction::ImportBlocks{ origin: _, blocks } if blocks.len() ==
max_blocks_to_request as usize, ));
let status = sync.status();
log::info!(target: LOG_TARGET, "Status before processing blocks: {status:?}");
best_block_num += max_blocks_to_request as u32;
// Import results for the batch, in ascending block order.
let responses: Vec<_> = resp_blocks
.iter()
.rev()
.map(|b| {
(
Ok(BlockImportStatus::ImportedUnknown(
*b.header().number(),
Default::default(),
Some(peer_id1),
)),
b.hash(),
)
})
.collect();
sync.on_blocks_processed(
max_blocks_to_request as usize,
max_blocks_to_request as usize,
responses,
);
let status = sync.status();
log::info!(target: LOG_TARGET, "Status after processing blocks: {status:?}");
// Import the batch into the client as finalized blocks.
resp_blocks
.into_iter()
.rev()
.for_each(|b| block_on(client.import_as_final(BlockOrigin::Own, b)).unwrap());
if loop_index == 0 {
log::info!(target: LOG_TARGET, "Peer state {:#?}", sync.peers);
// After the first round both peers must be `Available`.
match sync.peers.get(&peer_id1) {
Some(peer) => assert_eq!(peer.state, PeerSyncState::Available),
None => panic!("Peer not found"),
}
match sync.peers.get(&peer_id2) {
Some(peer) => assert_eq!(peer.state, PeerSyncState::Available),
None => panic!("Peer not found"),
}
// Inject a gap spanning blocks 64..=84; the second round imports past its target.
sync.gap_sync = Some(GapSync {
best_queued_number: 64 as u64,
target: 84 as u64,
blocks: BlockCollection::new(),
stats: GapSyncStats::new(),
});
} else if loop_index == 1 {
// After the second round the injected gap must have been cleared.
if sync.gap_sync.is_none() {
log::info!(target: LOG_TARGET, "Gap successfully closed");
} else {
panic!("Gap not closed after the second loop");
}
}
}
}
/// A gap whose target is the first block must be cleared when that block is reported as
/// processed, both for `ImportedUnknown` and for `ImportedKnown`.
#[test]
fn sync_gap_filled_regardless_of_blocks_origin() {
    sp_tracing::try_init_simple();

    // Two blocks built on a throw-away client.
    let blocks = {
        let client = TestClientBuilder::new().build();
        (0..2).map(|_| build_block(&client, None, false)).collect::<Vec<_>>()
    };

    let client = Arc::new(TestClientBuilder::new().build());
    let mut sync = ChainSync::new(
        ChainSyncMode::Full,
        client.clone(),
        5,
        64,
        ProtocolName::Static(""),
        Arc::new(MockBlockDownloader::new()),
        false,
        None,
        std::iter::empty(),
    )
    .unwrap();

    let peer_id1 = PeerId::random();
    let first_number = *blocks[0].header().number();
    let first_hash = blocks[0].hash();

    // Case 1: the target block is reported as `ImportedUnknown`.
    sync.gap_sync = Some(GapSync {
        best_queued_number: first_number,
        target: first_number,
        blocks: BlockCollection::new(),
        stats: GapSyncStats::new(),
    });
    let imported_unknown = vec![(
        Ok(BlockImportStatus::ImportedUnknown(first_number, Default::default(), Some(peer_id1))),
        first_hash,
    )];
    sync.on_blocks_processed(1, 1, imported_unknown);
    assert!(sync.gap_sync.is_none());

    // Case 2: the target block is reported as `ImportedKnown`.
    sync.gap_sync = Some(GapSync {
        best_queued_number: first_number,
        target: first_number,
        blocks: BlockCollection::new(),
        stats: GapSyncStats::new(),
    });
    let imported_known =
        vec![(Ok(BlockImportStatus::ImportedKnown(first_number, Some(peer_id1))), first_hash)];
    sync.on_blocks_processed(1, 1, imported_known);
    assert!(sync.gap_sync.is_none());
}
/// With an active gap sync, the first block request must ask for bodies only when the
/// `archive_blocks` flag passed to `ChainSync::new` is `true`; headers and justifications
/// are requested in both cases.
#[test]
fn gap_sync_body_request_depends_on_pruning_mode() {
    sp_tracing::try_init_simple();

    for archive_blocks in [true, false] {
        log::info!("Testing gap sync with archive_blocks: {}", archive_blocks);

        // Bodies are expected exactly when blocks are archived.
        let expected_fields = match archive_blocks {
            true =>
                BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION,
            false => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
        };

        let client = Arc::new(TestClientBuilder::new().build());
        let blocks: Vec<_> = (0..10).map(|_| build_block(&client, None, false)).collect();

        let mut sync = ChainSync::new(
            ChainSyncMode::Full,
            client.clone(),
            5,
            64,
            ProtocolName::Static(""),
            Arc::new(MockBlockDownloader::new()),
            archive_blocks,
            None,
            std::iter::empty(),
        )
        .unwrap();

        let peer_id = PeerId::random();

        // Pretend there is a gap between blocks 5 and 10 before the peer shows up.
        sync.gap_sync = Some(GapSync {
            best_queued_number: 5,
            target: 10,
            blocks: BlockCollection::new(),
            stats: GapSyncStats::new(),
        });
        sync.add_peer(peer_id, blocks[9].hash(), 10);

        let requests = sync.block_requests();
        assert!(
            !requests.is_empty(),
            "[archive_blocks={archive_blocks}] Should generate gap sync request"
        );

        // Only the first request is inspected.
        let request = &requests[0].1;
        assert_eq!(
            request.fields, expected_fields,
            "[archive_blocks={archive_blocks}] Gap sync fields mismatch: expected {expected_fields:?}, got {:?}",
            request.fields
        );
    }
}
/// Checks that, without an active gap sync, a generated block request asks for header, body
/// and justification for both values of the `archive_blocks` flag.
///
/// NOTE(review): the field check only runs when `block_requests` returns at least one
/// request. If it returns none, the iteration passes without asserting anything about the
/// request contents.
#[test]
fn regular_sync_always_requests_bodies_regardless_of_pruning() {
    sp_tracing::try_init_simple();

    for archive_blocks in [true, false] {
        log::info!("Testing regular sync with archive_blocks: {}", archive_blocks);

        let client = Arc::new(TestClientBuilder::new().build());
        let mut blocks = Vec::new();
        for _ in 0..5 {
            blocks.push(build_block(&client, None, false));
        }

        let mut sync = ChainSync::new(
            ChainSyncMode::Full,
            client.clone(),
            5,
            64,
            ProtocolName::Static(""),
            Arc::new(MockBlockDownloader::new()),
            archive_blocks,
            None,
            std::iter::empty(),
        )
        .unwrap();

        let peer_id = PeerId::random();

        // A freshly constructed `ChainSync` is expected to have no gap to fill.
        assert!(
            sync.gap_sync.is_none(),
            "[archive_blocks={archive_blocks}] Should not have gap sync active"
        );

        sync.add_peer(peer_id, blocks[4].hash(), 5);

        let requests = sync.block_requests();
        // Inspect the first request, if any was produced.
        if let Some((_peer, request)) = requests.first() {
            let expected_fields =
                BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
            assert_eq!(
                request.fields, expected_fields,
                "[archive_blocks={archive_blocks}] Regular sync fields mismatch: expected {expected_fields:?}, got {:?}",
                request.fields
            );
        }
    }
}
/// Checks that a peer is not put into `PeerSyncState::AncestorSearch` once many blocks are
/// queued, neither when it is added nor when it announces a fork block.
///
/// Scenario:
/// 1. A first peer is added while nothing is queued; it must enter `AncestorSearch`.
/// 2. `MAJOR_SYNC_BLOCKS + 1` block hashes are inserted into `queue_blocks` (presumably this
///    is what makes the sync count as a major sync; confirm against `ChainSync`).
/// 3. A second peer with the same best block is added; it must be `Available`.
/// 4. The second peer announces a fork block whose parent is not the best block; afterwards it
///    must still not be in `AncestorSearch`, and no `StartRequest` action may target it.
#[test]
fn no_ancestry_search_during_major_sync() {
    sp_tracing::try_init_simple();

    // Build the chain and a fork block (on top of the second-to-last block) with a client
    // that is separate from the one handed to `ChainSync`.
    let (blocks, fork_block) = {
        let builder_client = TestClientBuilder::new().build();
        let mut chain = Vec::new();
        for _ in 0..MAX_DOWNLOAD_AHEAD * 2 {
            chain.push(build_block(&builder_client, None, false));
        }
        let fork_parent = chain[chain.len() - 2].hash();
        let fork = build_block(&builder_client, Some(fork_parent), true);
        (chain, fork)
    };

    // The client used by `ChainSync` only gets the first ten blocks imported.
    let client = Arc::new(TestClientBuilder::new().build());
    blocks[..10].iter().for_each(|block| {
        block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
    });

    let mut sync = ChainSync::new(
        ChainSyncMode::Full,
        client.clone(),
        5,
        64,
        ProtocolName::Static(""),
        Arc::new(MockBlockDownloader::new()),
        false,
        None,
        std::iter::empty(),
    )
    .unwrap();

    let peer_id1 = PeerId::random();
    let peer_id2 = PeerId::random();

    let tip = blocks.last().unwrap();
    let tip_hash = tip.hash();
    let tip_number = *tip.header().number();

    // With nothing queued, a peer added at the tip goes through ancestor search.
    sync.add_peer(peer_id1, tip_hash, tip_number);
    let first_peer_state = &sync.peers.get(&peer_id1).unwrap().state;
    assert!(matches!(first_peer_state, PeerSyncState::AncestorSearch { .. }));

    // Queue more than `MAJOR_SYNC_BLOCKS` block hashes.
    for queued_hash in blocks[..MAJOR_SYNC_BLOCKS as usize + 1].iter().map(|block| block.hash()) {
        sync.queue_blocks.insert(queued_hash);
    }

    // A peer added now must be immediately available.
    sync.add_peer(peer_id2, tip_hash, tip_number);
    assert_eq!(sync.peers.get(&peer_id2).unwrap().state, PeerSyncState::Available);

    // The announced block really is a fork: its parent is not the best block.
    assert_ne!(*fork_block.header().parent_hash(), tip_hash);

    let announce = BlockAnnounce {
        header: fork_block.header().clone(),
        state: Some(BlockState::Best),
        data: Some(Vec::new()),
    };
    // The return value is irrelevant here; only the resulting peer state and actions matter.
    let _ = sync.on_validated_block_announce(true, peer_id2, &announce);

    let second_peer_state = &sync.peers.get(&peer_id2).unwrap().state;
    assert!(
        !matches!(second_peer_state, PeerSyncState::AncestorSearch { .. }),
        "Peer should not be in AncestorSearch during major sync — this would stall sync!",
    );

    // None of the pending actions may start a request towards the second peer.
    let actions: Vec<_> = sync.take_actions().collect();
    actions
        .iter()
        .filter_map(|action| match action {
            SyncingAction::StartRequest { peer_id, .. } => Some(peer_id),
            _ => None,
        })
        .for_each(|requested_peer| {
            assert_ne!(
                *requested_peer, peer_id2,
                "No request should be sent to peer2 during major sync",
            );
        });
}