1pub use sp_consensus_grandpa::{AuthorityList, SetId};
22
23use crate::{
24 block_relay_protocol::{BlockDownloader, BlockResponseError},
25 service::network::NetworkServiceHandle,
26 strategy::{
27 chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
28 SyncingAction,
29 },
30 types::{BadPeer, SyncState, SyncStatus},
31 LOG_TARGET,
32};
33use codec::{Decode, Encode};
34use futures::{channel::oneshot, FutureExt};
35use log::{debug, error, trace, warn};
36use sc_network::{IfDisconnected, ProtocolName};
37use sc_network_common::sync::message::{
38 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
39};
40use sc_network_types::PeerId;
41use sp_blockchain::HeaderBackend;
42use sp_runtime::{
43 traits::{Block as BlockT, Header, NumberFor, Zero},
44 Justifications, SaturatedConversion,
45};
46use std::{any::Any, collections::HashMap, fmt, sync::Arc};
47
/// Default number of connected peers required before warp sync leaves the waiting phase.
///
/// Used by `WarpSync::new` whenever the caller passes `None` as the minimum peer count.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
50
/// Newtype around the raw bytes of a warp sync proof.
///
/// Warp proof responses arrive as a plain `Vec<u8>` and are wrapped in this type before being
/// handed to [`WarpSyncProvider::verify`].
pub struct EncodedProof(pub Vec<u8>);
53
/// Request for a warp sync proof; it is encoded and sent to a remote peer.
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
    /// Hash the requested proof should begin at.
    ///
    /// `WarpSync` fills this with the last hash it has verified so far, which is the genesis
    /// hash for the very first request.
    pub begin: B::Hash,
}
60
/// Successful outcome of [`WarpSyncProvider::verify`].
pub enum VerificationResult<Block: BlockT> {
    /// The proof is valid but does not reach the target yet.
    ///
    /// Carries the new set id, the new authority list and the hash the next proof request
    /// should begin at.
    Partial(SetId, AuthorityList, Block::Hash),
    /// The proof is valid and complete.
    ///
    /// Carries the final set id, the final authority list and the header of the block that
    /// becomes the warp sync target.
    Complete(SetId, AuthorityList, Block::Header),
}
68
/// Source of warp sync proofs and of the logic needed to verify them.
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
    /// Produce an encoded proof for the given `start` hash.
    ///
    /// NOTE(review): not called anywhere in this file; presumably used by the side that serves
    /// warp proof requests. Confirm against the implementors.
    fn generate(
        &self,
        start: Block::Hash,
    ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
    /// Verify `proof` against the given authority set.
    ///
    /// `WarpSync` passes the set id and authority list it currently tracks. An `Err` makes it
    /// report the responding peer as bad; an `Ok` either advances the tracked state
    /// (`Partial`) or fixes the target block (`Complete`).
    fn verify(
        &self,
        proof: &EncodedProof,
        set_id: SetId,
        authorities: AuthorityList,
    ) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
    /// Authority list to start verification from.
    ///
    /// `WarpSync` pairs it with set id `0` when it begins downloading proofs.
    fn current_authorities(&self) -> AuthorityList;
}
88
/// Reputation changes reported together with a peer that misbehaved during warp sync.
mod rep {
    use sc_network::ReputationChange as Rep;

    /// A response arrived while the strategy was in a phase that does not expect it.
    pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");

    /// The warp proof sent by the peer failed verification.
    pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");

    /// The peer answered the target block request with no blocks at all.
    pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

    /// The peer answered the target block request with more than one block.
    pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

    /// The target block response lacked a header or body, or its header differed from the
    /// expected target header.
    pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

    /// The block response could not be decoded or its blocks could not be extracted.
    pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
110
/// Publicly reported phase of warp sync; see the `Display` impl for the user-facing wording.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<Block: BlockT> {
    /// Waiting until `required_peers` peers are connected.
    AwaitingPeers { required_peers: usize },
    /// Downloading and verifying warp (finality) proofs.
    DownloadingWarpProofs,
    /// Downloading the block warp sync targets.
    DownloadingTargetBlock,
    /// Downloading state. Not produced by `WarpSync::progress` in this file.
    DownloadingState,
    /// Importing state. Not produced by `WarpSync::progress` in this file.
    ImportingState,
    /// Downloading block history up to the contained number. Not produced by
    /// `WarpSync::progress` in this file.
    DownloadingBlocks(NumberFor<Block>),
    /// Warp sync has finished.
    Complete,
}
129
130impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
131 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
132 match self {
133 Self::AwaitingPeers { required_peers } =>
134 write!(f, "Waiting for {required_peers} peers to be connected"),
135 Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
136 Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
137 Self::DownloadingState => write!(f, "Downloading state"),
138 Self::ImportingState => write!(f, "Importing state"),
139 Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
140 Self::Complete => write!(f, "Warp sync is complete"),
141 }
142 }
143}
144
/// Snapshot of warp sync progress as returned by `WarpSync::progress`.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
    /// Phase warp sync is currently in.
    pub phase: WarpSyncPhase<Block>,
    /// Number of bytes accounted for so far: the size of all verified proofs, plus the state
    /// byte counter once the strategy is complete.
    pub total_bytes: u64,
}
153
/// How warp sync determines the block it should sync to.
pub enum WarpSyncConfig<Block: BlockT> {
    /// Download and verify warp proofs with the given provider; the target block is the one
    /// reported by a complete proof.
    WithProvider(Arc<dyn WarpSyncProvider<Block>>),
    /// Skip proof download and go straight to downloading the block with this header.
    WithTarget(<Block as BlockT>::Header),
}
163
/// Internal state machine of [`WarpSync`].
enum Phase<B: BlockT> {
    /// Not enough peers are connected yet; the provider is kept for when proofs can be
    /// requested.
    WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
    /// Warp proofs are being downloaded and verified.
    WarpProof {
        /// Set id the next proof is verified against.
        set_id: SetId,
        /// Authority list the next proof is verified against.
        authorities: AuthorityList,
        /// Hash the next proof request begins at.
        last_hash: B::Hash,
        /// Provider used to verify incoming proofs.
        warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
    },
    /// The block with this header is being downloaded.
    TargetBlock(B::Header),
    /// Nothing left to do: either the target block was received, or warp sync was refused at
    /// construction because the database already has finalized state.
    Complete,
}
180
/// What a connected peer is currently being used for by warp sync.
enum PeerState {
    /// No request of this strategy is pending on the peer.
    Available,
    /// A warp proof request has been scheduled on the peer.
    DownloadingProofs,
    /// The target block request has been scheduled on the peer.
    DownloadingTargetBlock,
}

impl PeerState {
    /// Returns `true` only for [`PeerState::Available`], i.e. when the peer can be handed a new
    /// request.
    fn is_available(&self) -> bool {
        match self {
            PeerState::Available => true,
            PeerState::DownloadingProofs | PeerState::DownloadingTargetBlock => false,
        }
    }
}
192
/// Per-peer bookkeeping kept by [`WarpSync`].
struct Peer<B: BlockT> {
    /// Best block number the peer reported, updated on best-block announcements.
    best_number: NumberFor<B>,
    /// Whether the peer is idle or busy serving one of this strategy's requests.
    state: PeerState,
}
197
/// Outcome of a successful warp sync, built from the downloaded target block.
pub struct WarpSyncResult<B: BlockT> {
    /// Header of the target block.
    pub target_header: B::Header,
    /// Body of the target block; `WarpSync` only produces a result when the body is present.
    pub target_body: Option<Vec<B::Extrinsic>>,
    /// Justifications delivered with the target block, if any.
    pub target_justifications: Option<Justifications>,
}
203
/// Warp sync strategy: downloads and verifies warp proofs, then downloads the target block.
pub struct WarpSync<B: BlockT, Client> {
    /// Current step of the state machine.
    phase: Phase<B>,
    /// Client used to read chain info (finalized state, genesis hash).
    client: Arc<Client>,
    /// Total size in bytes of all proofs verified so far.
    total_proof_bytes: u64,
    /// State byte counter; only initialised to `0` in this file and added to the reported total
    /// once the strategy is complete.
    total_state_bytes: u64,
    /// Connected peers and their state.
    peers: HashMap<PeerId, Peer<B>>,
    /// Tracker of peers that disconnected while a request was pending; consulted before
    /// scheduling a peer.
    disconnected_peers: DisconnectedPeers,
    /// Protocol used for warp proof requests; `None` means no such request can be sent.
    protocol_name: Option<ProtocolName>,
    /// Downloader used for the target block request.
    block_downloader: Arc<dyn BlockDownloader<B>>,
    /// Actions accumulated since the last call to `actions`.
    actions: Vec<SyncingAction<B>>,
    /// Result of warp sync, available once the target block has been accepted.
    result: Option<WarpSyncResult<B>>,
    /// Number of connected peers required to start downloading proofs.
    min_peers_to_start_warp_sync: usize,
}
219
220impl<B, Client> WarpSync<B, Client>
221where
222 B: BlockT,
223 Client: HeaderBackend<B> + 'static,
224{
    /// Key attached to every request started by this strategy.
    pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
227
228 pub fn new(
232 client: Arc<Client>,
233 warp_sync_config: WarpSyncConfig<B>,
234 protocol_name: Option<ProtocolName>,
235 block_downloader: Arc<dyn BlockDownloader<B>>,
236 min_peers_to_start_warp_sync: Option<usize>,
237 ) -> Self {
238 let min_peers_to_start_warp_sync =
239 min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
240 if client.info().finalized_state.is_some() {
241 error!(
242 target: LOG_TARGET,
243 "Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
244 );
245 return Self {
246 client,
247 phase: Phase::Complete,
248 total_proof_bytes: 0,
249 total_state_bytes: 0,
250 peers: HashMap::new(),
251 disconnected_peers: DisconnectedPeers::new(),
252 protocol_name,
253 block_downloader,
254 actions: vec![SyncingAction::Finished],
255 result: None,
256 min_peers_to_start_warp_sync,
257 }
258 }
259
260 let phase = match warp_sync_config {
261 WarpSyncConfig::WithProvider(warp_sync_provider) =>
262 Phase::WaitingForPeers { warp_sync_provider },
263 WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
264 };
265
266 Self {
267 client,
268 phase,
269 total_proof_bytes: 0,
270 total_state_bytes: 0,
271 peers: HashMap::new(),
272 disconnected_peers: DisconnectedPeers::new(),
273 protocol_name,
274 block_downloader,
275 actions: Vec::new(),
276 result: None,
277 min_peers_to_start_warp_sync,
278 }
279 }
280
281 pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
283 self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
284
285 self.try_to_start_warp_sync();
286 }
287
288 pub fn remove_peer(&mut self, peer_id: &PeerId) {
290 if let Some(state) = self.peers.remove(peer_id) {
291 if !state.state.is_available() {
292 if let Some(bad_peer) =
293 self.disconnected_peers.on_disconnect_during_request(*peer_id)
294 {
295 self.actions.push(SyncingAction::DropPeer(bad_peer));
296 }
297 }
298 }
299 }
300
301 #[must_use]
305 pub fn on_validated_block_announce(
306 &mut self,
307 is_best: bool,
308 peer_id: PeerId,
309 announce: &BlockAnnounce<B::Header>,
310 ) -> Option<(B::Hash, NumberFor<B>)> {
311 is_best.then(|| {
312 let best_number = *announce.header.number();
313 let best_hash = announce.header.hash();
314 if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
315 peer.best_number = best_number;
316 }
317 (best_hash, best_number)
319 })
320 }
321
322 fn try_to_start_warp_sync(&mut self) {
324 let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
325
326 if self.peers.len() < self.min_peers_to_start_warp_sync {
327 return
328 }
329
330 self.phase = Phase::WarpProof {
331 set_id: 0,
332 authorities: warp_sync_provider.current_authorities(),
333 last_hash: self.client.info().genesis_hash,
334 warp_sync_provider: Arc::clone(warp_sync_provider),
335 };
336 trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
337 }
338
339 pub fn on_generic_response(
340 &mut self,
341 peer_id: &PeerId,
342 protocol_name: ProtocolName,
343 response: Box<dyn Any + Send>,
344 ) {
345 if &protocol_name == self.block_downloader.protocol_name() {
346 let Ok(response) = response
347 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
348 else {
349 warn!(target: LOG_TARGET, "Failed to downcast block response");
350 debug_assert!(false);
351 return;
352 };
353
354 let (request, response) = *response;
355 let blocks = match response {
356 Ok(blocks) => blocks,
357 Err(BlockResponseError::DecodeFailed(e)) => {
358 debug!(
359 target: LOG_TARGET,
360 "Failed to decode block response from peer {:?}: {:?}.",
361 peer_id,
362 e
363 );
364 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
365 return;
366 },
367 Err(BlockResponseError::ExtractionFailed(e)) => {
368 debug!(
369 target: LOG_TARGET,
370 "Failed to extract blocks from peer response {:?}: {:?}.",
371 peer_id,
372 e
373 );
374 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
375 return;
376 },
377 };
378
379 self.on_block_response(*peer_id, request, blocks);
380 } else {
381 let Ok(response) = response.downcast::<Vec<u8>>() else {
382 warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
383 debug_assert!(false);
384 return;
385 };
386
387 self.on_warp_proof_response(peer_id, EncodedProof(*response));
388 }
389 }
390
391 pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
393 if let Some(peer) = self.peers.get_mut(peer_id) {
394 peer.state = PeerState::Available;
395 }
396
397 let Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } =
398 &mut self.phase
399 else {
400 debug!(target: LOG_TARGET, "Unexpected warp proof response");
401 self.actions
402 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
403 return
404 };
405
406 match warp_sync_provider.verify(&response, *set_id, authorities.clone()) {
407 Err(e) => {
408 debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
409 self.actions
410 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
411 },
412 Ok(VerificationResult::Partial(new_set_id, new_authorities, new_last_hash)) => {
413 log::debug!(target: LOG_TARGET, "Verified partial proof, set_id={:?}", new_set_id);
414 *set_id = new_set_id;
415 *authorities = new_authorities;
416 *last_hash = new_last_hash;
417 self.total_proof_bytes += response.0.len() as u64;
418 },
419 Ok(VerificationResult::Complete(new_set_id, _, header)) => {
420 log::debug!(
421 target: LOG_TARGET,
422 "Verified complete proof, set_id={:?}. Continuing with target block download: {} ({}).",
423 new_set_id,
424 header.hash(),
425 header.number(),
426 );
427 self.total_proof_bytes += response.0.len() as u64;
428 self.phase = Phase::TargetBlock(header);
429 },
430 }
431 }
432
433 pub fn on_block_response(
435 &mut self,
436 peer_id: PeerId,
437 request: BlockRequest<B>,
438 blocks: Vec<BlockData<B>>,
439 ) {
440 if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
441 self.actions.push(SyncingAction::DropPeer(bad_peer));
442 }
443 }
444
445 fn on_block_response_inner(
446 &mut self,
447 peer_id: PeerId,
448 request: BlockRequest<B>,
449 mut blocks: Vec<BlockData<B>>,
450 ) -> Result<(), BadPeer> {
451 if let Some(peer) = self.peers.get_mut(&peer_id) {
452 peer.state = PeerState::Available;
453 }
454
455 let Phase::TargetBlock(header) = &mut self.phase else {
456 debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
457 return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE))
458 };
459
460 if blocks.is_empty() {
461 debug!(
462 target: LOG_TARGET,
463 "Downloading target block failed: empty block response from {peer_id}",
464 );
465 return Err(BadPeer(peer_id, rep::NO_BLOCK))
466 }
467
468 if blocks.len() > 1 {
469 debug!(
470 target: LOG_TARGET,
471 "Too many blocks ({}) in warp target block response from {peer_id}",
472 blocks.len(),
473 );
474 return Err(BadPeer(peer_id, rep::NOT_REQUESTED))
475 }
476
477 validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
478
479 let block = blocks.pop().expect("`blocks` len checked above; qed");
480
481 let Some(block_header) = &block.header else {
482 debug!(
483 target: LOG_TARGET,
484 "Downloading target block failed: missing header in response from {peer_id}.",
485 );
486 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
487 };
488
489 if block_header != header {
490 debug!(
491 target: LOG_TARGET,
492 "Downloading target block failed: different header in response from {peer_id}.",
493 );
494 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
495 }
496
497 if block.body.is_none() {
498 debug!(
499 target: LOG_TARGET,
500 "Downloading target block failed: missing body in response from {peer_id}.",
501 );
502 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
503 }
504
505 self.result = Some(WarpSyncResult {
506 target_header: header.clone(),
507 target_body: block.body,
508 target_justifications: block.justifications,
509 });
510 self.phase = Phase::Complete;
511 self.actions.push(SyncingAction::Finished);
512 Ok(())
513 }
514
515 fn schedule_next_peer(
517 &mut self,
518 new_state: PeerState,
519 min_best_number: Option<NumberFor<B>>,
520 ) -> Option<PeerId> {
521 let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
522 if targets.is_empty() {
523 return None
524 }
525 targets.sort();
526 let median = targets[targets.len() / 2];
527 let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
528 for (peer_id, peer) in self.peers.iter_mut() {
531 if peer.state.is_available() &&
532 peer.best_number >= threshold &&
533 self.disconnected_peers.is_peer_available(peer_id)
534 {
535 peer.state = new_state;
536 return Some(*peer_id)
537 }
538 }
539 None
540 }
541
542 fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
544 let Phase::WarpProof { last_hash, .. } = &self.phase else { return None };
545
546 let begin = *last_hash;
548
549 if self
550 .peers
551 .values()
552 .any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
553 {
554 return None
556 }
557
558 let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
559 trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
560
561 let request = WarpProofRequest { begin };
562
563 let Some(protocol_name) = self.protocol_name.clone() else {
564 warn!(
565 target: LOG_TARGET,
566 "Trying to send warp sync request when no protocol is configured {request:?}",
567 );
568 return None;
569 };
570
571 Some((peer_id, protocol_name, request))
572 }
573
574 fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
576 let Phase::TargetBlock(target_header) = &self.phase else { return None };
577
578 if self
579 .peers
580 .values()
581 .any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
582 {
583 return None
585 }
586
587 let target_hash = target_header.hash();
589 let target_number = *target_header.number();
590
591 let peer_id =
592 self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
593
594 trace!(
595 target: LOG_TARGET,
596 "New target block request to {peer_id}, target: {} ({}).",
597 target_hash,
598 target_number,
599 );
600
601 Some((
602 peer_id,
603 BlockRequest::<B> {
604 id: 0,
605 fields: BlockAttributes::HEADER |
606 BlockAttributes::BODY |
607 BlockAttributes::JUSTIFICATION,
608 from: FromBlock::Hash(target_hash),
609 direction: Direction::Ascending,
610 max: Some(1),
611 },
612 ))
613 }
614
615 pub fn progress(&self) -> WarpSyncProgress<B> {
617 match &self.phase {
618 Phase::WaitingForPeers { .. } => WarpSyncProgress {
619 phase: WarpSyncPhase::AwaitingPeers {
620 required_peers: self.min_peers_to_start_warp_sync,
621 },
622 total_bytes: self.total_proof_bytes,
623 },
624 Phase::WarpProof { .. } => WarpSyncProgress {
625 phase: WarpSyncPhase::DownloadingWarpProofs,
626 total_bytes: self.total_proof_bytes,
627 },
628 Phase::TargetBlock(_) => WarpSyncProgress {
629 phase: WarpSyncPhase::DownloadingTargetBlock,
630 total_bytes: self.total_proof_bytes,
631 },
632 Phase::Complete => WarpSyncProgress {
633 phase: WarpSyncPhase::Complete,
634 total_bytes: self.total_proof_bytes + self.total_state_bytes,
635 },
636 }
637 }
638
    /// Number of peers currently known to this strategy.
    pub fn num_peers(&self) -> usize {
        self.peers.len()
    }
643
644 pub fn status(&self) -> SyncStatus<B> {
646 SyncStatus {
647 state: match &self.phase {
648 Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
649 Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
650 Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
651 Phase::Complete => SyncState::Idle,
652 },
653 best_seen_block: match &self.phase {
654 Phase::WaitingForPeers { .. } => None,
655 Phase::WarpProof { .. } => None,
656 Phase::TargetBlock(header) => Some(*header.number()),
657 Phase::Complete => None,
658 },
659 num_peers: self.peers.len().saturated_into(),
660 queued_blocks: 0,
661 state_sync: None,
662 warp_sync: Some(self.progress()),
663 }
664 }
665
666 #[must_use]
668 pub fn actions(
669 &mut self,
670 network_service: &NetworkServiceHandle,
671 ) -> impl Iterator<Item = SyncingAction<B>> {
672 let warp_proof_request =
673 self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
674 trace!(
675 target: LOG_TARGET,
676 "Created `WarpProofRequest` to {}, request: {:?}.",
677 peer_id,
678 request,
679 );
680
681 let (tx, rx) = oneshot::channel();
682
683 network_service.start_request(
684 peer_id,
685 protocol_name,
686 request.encode(),
687 tx,
688 IfDisconnected::ImmediateError,
689 );
690
691 SyncingAction::StartRequest {
692 peer_id,
693 key: Self::STRATEGY_KEY,
694 request: async move {
695 Ok(rx.await?.and_then(|(response, protocol_name)| {
696 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
697 }))
698 }
699 .boxed(),
700 remove_obsolete: false,
701 }
702 });
703 self.actions.extend(warp_proof_request);
704
705 let target_block_request =
706 self.target_block_request().into_iter().map(|(peer_id, request)| {
707 let downloader = self.block_downloader.clone();
708
709 SyncingAction::StartRequest {
710 peer_id,
711 key: Self::STRATEGY_KEY,
712 request: async move {
713 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
714 |(response, protocol_name)| {
715 let decoded_response =
716 downloader.block_response_into_blocks(&request, response);
717 let result =
718 Box::new((request, decoded_response)) as Box<dyn Any + Send>;
719 Ok((result, protocol_name))
720 },
721 ))
722 }
723 .boxed(),
724 remove_obsolete: true,
727 }
728 });
729 self.actions.extend(target_block_request);
730
731 std::mem::take(&mut self.actions).into_iter()
732 }
733
    /// Take the result of warp sync, leaving `None` behind.
    ///
    /// Yields `Some` only once, after the target block has been accepted.
    #[must_use]
    pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
        self.result.take()
    }
739}
740
741#[cfg(test)]
742mod test {
743 use super::*;
744 use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
745 use sc_block_builder::BlockBuilderBuilder;
746 use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
747 use sp_consensus_grandpa::{AuthorityList, SetId};
748 use sp_core::H256;
749 use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
750 use std::{io::ErrorKind, sync::Arc};
751 use substrate_test_runtime_client::{
752 runtime::{Block, Hash},
753 BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
754 };
755
    // Generates `MockClient<B>`, a mock `HeaderBackend` used as the client in the tests below.
    mockall::mock! {
        pub Client<B: BlockT> {}

        impl<B: BlockT> HeaderBackend<B> for Client<B> {
            fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
            fn info(&self) -> Info<B>;
            fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
            fn number(
                &self,
                hash: B::Hash,
            ) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
            fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
        }
    }
770
    // Generates `MockWarpSyncProvider<B>`, a mock of the `WarpSyncProvider` trait under test.
    mockall::mock! {
        pub WarpSyncProvider<B: BlockT> {}

        impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
            fn generate(
                &self,
                start: B::Hash,
            ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
            fn verify(
                &self,
                proof: &EncodedProof,
                set_id: SetId,
                authorities: AuthorityList,
            ) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
            fn current_authorities(&self) -> AuthorityList;
        }
    }
788
789 fn mock_client_with_state() -> MockClient<Block> {
790 let mut client = MockClient::<Block>::new();
791 let genesis_hash = Hash::random();
792 client.expect_info().return_once(move || Info {
793 best_hash: genesis_hash,
794 best_number: 0,
795 genesis_hash,
796 finalized_hash: genesis_hash,
797 finalized_number: 0,
798 finalized_state: Some((genesis_hash, 0)),
800 number_leaves: 0,
801 block_gap: None,
802 });
803
804 client
805 }
806
807 fn mock_client_without_state() -> MockClient<Block> {
808 let mut client = MockClient::<Block>::new();
809 let genesis_hash = Hash::random();
810 client.expect_info().returning(move || Info {
811 best_hash: genesis_hash,
812 best_number: 0,
813 genesis_hash,
814 finalized_hash: genesis_hash,
815 finalized_number: 0,
816 finalized_state: None,
817 number_leaves: 0,
818 block_gap: None,
819 });
820
821 client
822 }
823
824 #[test]
825 fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
826 let client = mock_client_with_state();
827 let provider = MockWarpSyncProvider::<Block>::new();
828 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
829 let mut warp_sync = WarpSync::new(
830 Arc::new(client),
831 config,
832 None,
833 Arc::new(MockBlockDownloader::new()),
834 None,
835 );
836
837 let network_provider = NetworkServiceProvider::new();
838 let network_handle = network_provider.handle();
839
840 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
842 assert_eq!(actions.len(), 1);
843 assert!(matches!(actions[0], SyncingAction::Finished));
844
845 assert!(warp_sync.take_result().is_none());
847 }
848
849 #[test]
850 fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
851 let client = mock_client_with_state();
852 let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
853 1,
854 Default::default(),
855 Default::default(),
856 Default::default(),
857 Default::default(),
858 ));
859 let mut warp_sync = WarpSync::new(
860 Arc::new(client),
861 config,
862 None,
863 Arc::new(MockBlockDownloader::new()),
864 None,
865 );
866
867 let network_provider = NetworkServiceProvider::new();
868 let network_handle = network_provider.handle();
869
870 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
872 assert_eq!(actions.len(), 1);
873 assert!(matches!(actions[0], SyncingAction::Finished));
874
875 assert!(warp_sync.take_result().is_none());
877 }
878
879 #[test]
880 fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
881 let client = mock_client_without_state();
882 let provider = MockWarpSyncProvider::<Block>::new();
883 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
884 let mut warp_sync = WarpSync::new(
885 Arc::new(client),
886 config,
887 None,
888 Arc::new(MockBlockDownloader::new()),
889 None,
890 );
891
892 let network_provider = NetworkServiceProvider::new();
893 let network_handle = network_provider.handle();
894
895 assert_eq!(warp_sync.actions(&network_handle).count(), 0)
897 }
898
899 #[test]
900 fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
901 let client = mock_client_without_state();
902 let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
903 1,
904 Default::default(),
905 Default::default(),
906 Default::default(),
907 Default::default(),
908 ));
909 let mut warp_sync = WarpSync::new(
910 Arc::new(client),
911 config,
912 None,
913 Arc::new(MockBlockDownloader::new()),
914 None,
915 );
916
917 let network_provider = NetworkServiceProvider::new();
918 let network_handle = network_provider.handle();
919
920 assert_eq!(warp_sync.actions(&network_handle).count(), 0)
922 }
923
924 #[test]
925 fn warp_sync_is_started_only_when_there_is_enough_peers() {
926 let client = mock_client_without_state();
927 let mut provider = MockWarpSyncProvider::<Block>::new();
928 provider
929 .expect_current_authorities()
930 .once()
931 .return_const(AuthorityList::default());
932 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
933 let mut warp_sync = WarpSync::new(
934 Arc::new(client),
935 config,
936 None,
937 Arc::new(MockBlockDownloader::new()),
938 None,
939 );
940
941 for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
943 warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
944 assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
945 }
946
947 warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
949 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
950 }
951
952 #[test]
953 fn no_peer_is_scheduled_if_no_peers_connected() {
954 let client = mock_client_without_state();
955 let provider = MockWarpSyncProvider::<Block>::new();
956 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
957 let mut warp_sync = WarpSync::new(
958 Arc::new(client),
959 config,
960 None,
961 Arc::new(MockBlockDownloader::new()),
962 None,
963 );
964
965 assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
966 }
967
968 #[test]
969 fn enough_peers_are_used_in_tests() {
970 assert!(
972 10 >= MIN_PEERS_TO_START_WARP_SYNC,
973 "Tests must be updated to use that many initial peers.",
974 );
975 }
976
977 #[test]
978 fn at_least_median_synced_peer_is_scheduled() {
979 for _ in 0..100 {
980 let client = mock_client_without_state();
981 let mut provider = MockWarpSyncProvider::<Block>::new();
982 provider
983 .expect_current_authorities()
984 .once()
985 .return_const(AuthorityList::default());
986 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
987 let mut warp_sync = WarpSync::new(
988 Arc::new(client),
989 config,
990 None,
991 Arc::new(MockBlockDownloader::new()),
992 None,
993 );
994
995 for best_number in 1..11 {
996 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
997 }
998
999 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1000 assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1001 }
1002 }
1003
1004 #[test]
1005 fn min_best_number_peer_is_scheduled() {
1006 for _ in 0..10 {
1007 let client = mock_client_without_state();
1008 let mut provider = MockWarpSyncProvider::<Block>::new();
1009 provider
1010 .expect_current_authorities()
1011 .once()
1012 .return_const(AuthorityList::default());
1013 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1014 let mut warp_sync = WarpSync::new(
1015 Arc::new(client),
1016 config,
1017 None,
1018 Arc::new(MockBlockDownloader::new()),
1019 None,
1020 );
1021
1022 for best_number in 1..11 {
1023 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1024 }
1025
1026 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1027 assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1028 }
1029 }
1030
1031 #[test]
1032 fn backedoff_number_peer_is_not_scheduled() {
1033 let client = mock_client_without_state();
1034 let mut provider = MockWarpSyncProvider::<Block>::new();
1035 provider
1036 .expect_current_authorities()
1037 .once()
1038 .return_const(AuthorityList::default());
1039 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1040 let mut warp_sync = WarpSync::new(
1041 Arc::new(client),
1042 config,
1043 None,
1044 Arc::new(MockBlockDownloader::new()),
1045 None,
1046 );
1047
1048 for best_number in 1..11 {
1049 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1050 }
1051
1052 let ninth_peer =
1053 *warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1054 let tenth_peer =
1055 *warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1056
1057 warp_sync.remove_peer(&tenth_peer);
1059 assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1060
1061 warp_sync.add_peer(tenth_peer, H256::random(), 10);
1062 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1063 assert_eq!(tenth_peer, peer_id.unwrap());
1064 warp_sync.remove_peer(&tenth_peer);
1065
1066 assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1068
1069 warp_sync.add_peer(tenth_peer, H256::random(), 10);
1071 let peer_id: Option<PeerId> =
1072 warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1073 assert!(peer_id.is_none());
1074
1075 let peer_id: Option<PeerId> =
1077 warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1078 assert_eq!(ninth_peer, peer_id.unwrap());
1079 }
1080
1081 #[test]
1082 fn no_warp_proof_request_in_another_phase() {
1083 let client = mock_client_without_state();
1084 let mut provider = MockWarpSyncProvider::<Block>::new();
1085 provider
1086 .expect_current_authorities()
1087 .once()
1088 .return_const(AuthorityList::default());
1089 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1090 let mut warp_sync = WarpSync::new(
1091 Arc::new(client),
1092 config,
1093 Some(ProtocolName::Static("")),
1094 Arc::new(MockBlockDownloader::new()),
1095 None,
1096 );
1097
1098 for best_number in 1..11 {
1100 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1101 }
1102
1103 warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1105 1,
1106 Default::default(),
1107 Default::default(),
1108 Default::default(),
1109 Default::default(),
1110 ));
1111
1112 assert!(warp_sync.warp_proof_request().is_none());
1114 }
1115
1116 #[test]
1117 fn warp_proof_request_starts_at_last_hash() {
1118 let client = mock_client_without_state();
1119 let mut provider = MockWarpSyncProvider::<Block>::new();
1120 provider
1121 .expect_current_authorities()
1122 .once()
1123 .return_const(AuthorityList::default());
1124 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1125 let mut warp_sync = WarpSync::new(
1126 Arc::new(client),
1127 config,
1128 Some(ProtocolName::Static("")),
1129 Arc::new(MockBlockDownloader::new()),
1130 None,
1131 );
1132
1133 for best_number in 1..11 {
1135 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1136 }
1137 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1138
1139 let known_last_hash = Hash::random();
1140
1141 match &mut warp_sync.phase {
1143 Phase::WarpProof { last_hash, .. } => {
1144 *last_hash = known_last_hash;
1145 },
1146 _ => panic!("Invalid phase."),
1147 }
1148
1149 let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
1150 assert_eq!(request.begin, known_last_hash);
1151 }
1152
1153 #[test]
1154 fn no_parallel_warp_proof_requests() {
1155 let client = mock_client_without_state();
1156 let mut provider = MockWarpSyncProvider::<Block>::new();
1157 provider
1158 .expect_current_authorities()
1159 .once()
1160 .return_const(AuthorityList::default());
1161 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1162 let mut warp_sync = WarpSync::new(
1163 Arc::new(client),
1164 config,
1165 Some(ProtocolName::Static("")),
1166 Arc::new(MockBlockDownloader::new()),
1167 None,
1168 );
1169
1170 for best_number in 1..11 {
1172 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1173 }
1174 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1175
1176 assert!(warp_sync.warp_proof_request().is_some());
1178 assert!(warp_sync.warp_proof_request().is_none());
1180 }
1181
#[test]
fn bad_warp_proof_response_drops_peer() {
    // When proof verification fails, the responding peer must be dropped and the strategy must
    // stay in the `WarpProof` phase.
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());
    // Make the mocked verification fail.
    mock_provider.expect_verify().return_once(|_, _, _| {
        Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
    });
    let mut strategy = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }
    assert!(matches!(&strategy.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected here: the request being started.
    let started: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let request_peer_id = match &started[0] {
        SyncingAction::StartRequest { peer_id, .. } => *peer_id,
        _ => panic!("Invalid action"),
    };

    // Deliver a response from the peer the request was sent to.
    strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    // Exactly one action must be queued: dropping that same peer.
    let queued = std::mem::take(&mut strategy.actions);
    assert_eq!(queued.len(), 1);
    assert!(matches!(
        &queued[0],
        SyncingAction::DropPeer(BadPeer(peer_id, _)) if *peer_id == request_peer_id
    ));
    assert!(matches!(&strategy.phase, Phase::WarpProof { .. }));
}
1230
#[test]
fn partial_warp_proof_doesnt_advance_phase() {
    // A `Partial` verification result must queue no actions and leave the strategy in the
    // `WarpProof` phase.
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());
    // Make the mocked verification report a partial proof.
    mock_provider.expect_verify().return_once(|_, set_id, authorities| {
        Ok(VerificationResult::Partial(set_id, authorities, Hash::random()))
    });
    let mut strategy = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }
    assert!(matches!(&strategy.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected here: the request being started.
    let started: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let request_peer_id = match &started[0] {
        SyncingAction::StartRequest { peer_id, .. } => *peer_id,
        _ => panic!("Invalid action"),
    };

    strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    assert!(strategy.actions.is_empty(), "No extra actions generated");
    assert!(matches!(&strategy.phase, Phase::WarpProof { .. }));
}
1273
#[test]
fn complete_warp_proof_advances_phase() {
    // A `Complete` verification result must move the strategy into `TargetBlock` carrying the
    // header returned by the verifier, without queueing any actions.
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build the target block on top of the client's current best block.
    let info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Make the mocked verification report a complete proof ending at the target header.
    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header))
    });
    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }
    assert!(matches!(&strategy.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected here: the request being started.
    let started: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(started.len(), 1);
    let request_peer_id = match &started[0] {
        SyncingAction::StartRequest { peer_id, .. } => *peer_id,
        _ => panic!("Invalid action."),
    };

    strategy.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    assert!(strategy.actions.is_empty(), "No extra actions generated.");
    assert!(matches!(
        &strategy.phase,
        Phase::TargetBlock(header) if header == target_block.header()
    ));
}
1327
#[test]
fn no_target_block_requests_in_another_phase() {
    // While the strategy is in the `WarpProof` phase, no target block request may be produced.
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());
    let mut strategy = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }
    // Confirm the phase under test before asking for a target block request.
    assert!(matches!(&strategy.phase, Phase::WarpProof { .. }));

    assert!(strategy.target_block_request().is_none());
}
1355
#[test]
fn target_block_request_is_correct() {
    // In the `TargetBlock` phase the request must ask for exactly one block, addressed by the
    // target hash, with header, body and justification.
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build the target block on top of the client's current best block.
    let info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header))
    });
    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    // Put the strategy directly into the `TargetBlock` phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (_, request) = strategy.target_block_request().unwrap();
    let target_hash = target_block.header().hash();
    let expected_fields =
        BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
    assert_eq!(request.from, FromBlock::Hash(target_hash));
    assert_eq!(request.fields, expected_fields);
    assert_eq!(request.max, Some(1));
}
1397
#[test]
fn externally_set_target_block_is_requested() {
    // With a `WithTarget` config the strategy is in `TargetBlock` after peers are added and
    // requests the configured target block.
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build the target block on top of the client's current best block.
    let info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithTarget(target_block.header().clone()),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    assert!(matches!(&strategy.phase, Phase::TargetBlock(_)));

    let (_, request) = strategy.target_block_request().unwrap();
    let target_hash = target_block.header().hash();
    let expected_fields =
        BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
    assert_eq!(request.from, FromBlock::Hash(target_hash));
    assert_eq!(request.fields, expected_fields);
    assert_eq!(request.max, Some(1));
}
1429
#[test]
fn no_parallel_target_block_requests() {
    // Two back-to-back calls to `target_block_request` must yield a request only the first time.
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build the target block on top of the client's current best block.
    let info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header))
    });
    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    // Put the strategy directly into the `TargetBlock` phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    // The first call produces a request ...
    assert!(strategy.target_block_request().is_some());
    // ... and the immediately following call produces none.
    assert!(strategy.target_block_request().is_none());
}
1468
#[test]
fn target_block_response_with_no_blocks_drops_peer() {
    // An empty block response to the target block request must be rejected with a `BadPeer`
    // naming the responding peer.
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build the target block on top of the client's current best block.
    let info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header))
    });
    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    // Put the strategy directly into the `TargetBlock` phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = strategy.target_block_request().unwrap();

    // Respond with no blocks at all.
    let outcome = strategy.on_block_response_inner(peer_id, request, Vec::new());
    assert!(matches!(outcome, Err(BadPeer(id, _)) if id == peer_id));
}
1512
#[test]
fn target_block_response_with_extra_blocks_drops_peer() {
    // A response carrying the target block plus one additional block must be rejected with a
    // `BadPeer` naming the responding peer.
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Both blocks are built on top of the client's current best block.
    let info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // The extra block differs from the target by one pushed storage change.
    let mut extra_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap();
    extra_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let extra_block = extra_builder.build().unwrap().block;

    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header))
    });
    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    // Put the strategy directly into the `TargetBlock` phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = strategy.target_block_request().unwrap();

    // Respond with the target block first, followed by the extra block.
    let response = [&target_block, &extra_block]
        .iter()
        .map(|block| BlockData::<Block> {
            hash: block.header().hash(),
            header: Some(block.header().clone()),
            body: Some(block.extrinsics().to_vec()),
            indexed_body: None,
            receipt: None,
            message_queue: None,
            justification: None,
            justifications: None,
        })
        .collect::<Vec<_>>();

    let outcome = strategy.on_block_response_inner(peer_id, request, response);
    assert!(matches!(outcome, Err(BadPeer(id, _)) if id == peer_id));
}
1588
#[test]
fn target_block_response_with_wrong_block_drops_peer() {
    // A response carrying a single block other than the target must be rejected with a
    // `BadPeer` naming the responding peer.
    sp_tracing::try_init_simple();

    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Both blocks are built on top of the client's current best block.
    let info = client.chain_info();
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // The wrong block differs from the target by one pushed storage change.
    let mut wrong_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap();
    wrong_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let wrong_block = wrong_builder.build().unwrap().block;

    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header))
    });
    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    // Put the strategy directly into the `TargetBlock` phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = strategy.target_block_request().unwrap();

    // Respond with the wrong block only.
    let wrong_header = wrong_block.header().clone();
    let response = vec![BlockData::<Block> {
        hash: wrong_header.hash(),
        header: Some(wrong_header),
        body: Some(wrong_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    }];

    let outcome = strategy.on_block_response_inner(peer_id, request, response);
    assert!(matches!(outcome, Err(BadPeer(id, _)) if id == peer_id));
}
1654
#[test]
fn correct_target_block_response_sets_strategy_result() {
    // A response carrying exactly the target block must be accepted, produce a single
    // `Finished` action, and expose the header, body and justifications via `take_result`.
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider
        .expect_current_authorities()
        .once()
        .return_const(AuthorityList::default());

    // Build the target block on the client's best block, with one pushed storage change.
    let info = client.chain_info();
    let mut target_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(info.best_hash)
        .with_parent_block_number(info.best_number)
        .build()
        .unwrap();
    target_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let target_block = target_builder.build().unwrap().block;

    let verified_header = target_block.header().clone();
    mock_provider.expect_verify().return_once(move |_, set_id, authorities| {
        Ok(VerificationResult::Complete(set_id, authorities, verified_header))
    });
    let mut strategy = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best block numbers 1 through 10.
    for best_number in 1..=10 {
        strategy.add_peer(PeerId::random(), Hash::random(), best_number);
    }

    // Put the strategy directly into the `TargetBlock` phase.
    strategy.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = strategy.target_block_request().unwrap();

    // Respond with the target block, including its body and a justification.
    let expected_body = Some(target_block.extrinsics().to_vec());
    let expected_justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
    let response = vec![BlockData::<Block> {
        hash: target_block.header().hash(),
        header: Some(target_block.header().clone()),
        body: expected_body.clone(),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: expected_justifications.clone(),
    }];

    assert!(strategy.on_block_response_inner(peer_id, request, response).is_ok());

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // The only action produced afterwards must be `Finished`.
    let produced: Vec<_> = strategy.actions(&network_handle).collect();
    assert_eq!(produced.len(), 1);
    assert!(matches!(&produced[0], SyncingAction::Finished));

    // The stored result must match what was sent in the response.
    let result = strategy.take_result().unwrap();
    assert_eq!(&result.target_header, target_block.header());
    assert_eq!(result.target_body, expected_body);
    assert_eq!(result.target_justifications, expected_justifications);
}
1721}