1use sc_consensus::IncomingBlock;
22use sp_consensus::BlockOrigin;
23
24use crate::{
25 block_relay_protocol::{BlockDownloader, BlockResponseError},
26 service::network::NetworkServiceHandle,
27 strategy::{
28 chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
29 SyncingAction,
30 },
31 types::{BadPeer, SyncState, SyncStatus},
32 LOG_TARGET,
33};
34use codec::{Decode, Encode};
35use futures::{channel::oneshot, FutureExt};
36use log::{debug, error, trace, warn};
37use sc_network::{IfDisconnected, ProtocolName};
38use sc_network_common::sync::message::{
39 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
40};
41use sc_network_types::PeerId;
42use sp_blockchain::HeaderBackend;
43use sp_runtime::{
44 traits::{Block as BlockT, Header, NumberFor, Zero},
45 Justifications, SaturatedConversion,
46};
47use std::{any::Any, collections::HashMap, fmt, sync::Arc};
48
/// Default number of connected peers required before warp sync leaves the waiting phase.
///
/// Used by `WarpSync::new` when the caller does not supply its own minimum.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
51
/// Raw bytes of a warp proof as received from a peer.
///
/// This module never looks inside the bytes: it only measures their length for progress
/// reporting and hands them to a [`Verifier`].
pub struct EncodedProof(pub Vec<u8>);
54
/// Warp proof request; encoded with [`Encode`] and sent to a peer over the warp sync protocol.
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
	/// Hash the requested proof should begin at. [`WarpSync`] fills it from
	/// [`Verifier::next_proof_context`].
	pub begin: B::Hash,
}
61
/// Stateful checker for warp proofs received from peers.
///
/// [`WarpSync`] obtains one from [`WarpSyncProvider::create_verifier`] when warp sync starts
/// and passes every warp proof response through it.
pub trait Verifier<Block: BlockT>: Send + Sync {
	/// Verify a single encoded proof.
	///
	/// On success returns either [`VerificationResult::Partial`] or
	/// [`VerificationResult::Complete`]. On error, [`WarpSync`] drops the peer that supplied
	/// the proof.
	fn verify(
		&mut self,
		proof: &EncodedProof,
	) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
	/// Hash to place into the `begin` field of the next [`WarpProofRequest`].
	fn next_proof_context(&self) -> Block::Hash;
	/// Optional free-form status text, surfaced via [`WarpSyncProgress::status`] while warp
	/// proofs are being downloaded.
	fn status(&self) -> Option<String>;
}
74
/// Outcome of successfully verifying one warp proof.
pub enum VerificationResult<Block: BlockT> {
	/// The proof was valid but warp sync is not finished yet. The carried headers and
	/// justifications are queued for import and another proof is requested.
	Partial(Vec<(Block::Header, Justifications)>),
	/// The proof was valid and warp sync reached its end. The first field is the header of the
	/// target block to download next; the second holds headers and justifications to import
	/// (an entry with the target's block number is skipped by [`WarpSync`]).
	Complete(Block::Header, Vec<(Block::Header, Justifications)>),
}
82
/// Source of warp proofs and of the verifiers able to check them.
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
	/// Generate an encoded proof for `start`.
	///
	/// NOTE(review): not called anywhere in this file; presumably used when serving proof
	/// requests from other peers, with `start` being the requested `begin` hash — confirm
	/// against the request handler.
	fn generate(
		&self,
		start: Block::Hash,
	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
	/// Create a fresh [`Verifier`]. [`WarpSync`] calls this once, at the moment enough peers
	/// are connected to start downloading proofs.
	fn create_verifier(&self) -> Box<dyn Verifier<Block>>;
}
94
/// Reputation changes applied to peers that misbehave during warp sync.
mod rep {
	use sc_network::ReputationChange as Rep;

	/// A response arrived while the strategy was in a phase that does not expect it.
	pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");

	/// The peer sent a warp proof that failed verification.
	pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");

	/// The peer answered a target block request with no blocks at all.
	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

	/// The peer answered a target block request with more than the single requested block.
	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

	/// The target block response had a missing header, a mismatching header or no body.
	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

	/// The block response could not be decoded or blocks could not be extracted from it.
	/// Noticeably milder than the other penalties in this module.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
116
/// Publicly reported phase of warp sync; rendered for humans through its `Display` impl.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<Block: BlockT> {
	/// Waiting until `required_peers` peers are connected.
	AwaitingPeers { required_peers: usize },
	/// Downloading and verifying warp (finality) proofs.
	DownloadingWarpProofs,
	/// Downloading the block that warp sync targets.
	DownloadingTargetBlock,
	/// Downloading state data.
	/// NOTE(review): never reported by `WarpSync::progress` in this file; presumably set by
	/// the state sync code — confirm.
	DownloadingState,
	/// Importing state.
	/// NOTE(review): not reported from this file either — confirm where it is produced.
	ImportingState,
	/// Downloading block history; the payload is displayed as a block number.
	/// NOTE(review): not reported from this file either — confirm where it is produced.
	DownloadingBlocks(NumberFor<Block>),
	/// Warp sync has finished.
	Complete,
}
135
136impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
137 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138 match self {
139 Self::AwaitingPeers { required_peers } => {
140 write!(f, "Waiting for {required_peers} peers to be connected")
141 },
142 Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
143 Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
144 Self::DownloadingState => write!(f, "Downloading state"),
145 Self::ImportingState => write!(f, "Importing state"),
146 Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
147 Self::Complete => write!(f, "Warp sync is complete"),
148 }
149 }
150}
151
/// Snapshot of warp sync progress, as returned by `WarpSync::progress`.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
	/// Phase warp sync is currently in.
	pub phase: WarpSyncPhase<Block>,
	/// Bytes downloaded so far: the size of all verified proofs, plus the state byte counter
	/// once the sync is complete.
	pub total_bytes: u64,
	/// Extra status text supplied by the [`Verifier`]; only set while proofs are downloaded.
	pub status: Option<String>,
}
162
/// How warp sync determines the block it should sync to.
pub enum WarpSyncConfig<Block: BlockT> {
	/// Download and verify warp proofs using the given provider; the target block is the one
	/// the verifier reports as the end of the proofs.
	WithProvider(Arc<dyn WarpSyncProvider<Block>>),
	/// Skip the proof phase entirely and go straight to downloading the block with this
	/// header.
	WithTarget(<Block as BlockT>::Header),
}
172
/// Internal state machine of [`WarpSync`].
enum Phase<B: BlockT> {
	/// Not enough peers yet; holds the provider that will create the verifier once there are.
	WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
	/// Requesting warp proofs and checking them with `verifier`.
	WarpProof { verifier: Box<dyn Verifier<B>> },
	/// Requesting the target block whose header is stored here.
	TargetBlock(B::Header),
	/// Nothing left to do: either the target block was downloaded or warp sync was refused
	/// at construction time.
	Complete,
}
184
/// What a connected peer is currently being used for.
enum PeerState {
	/// No request is in flight to this peer.
	Available,
	/// A warp proof request is in flight to this peer.
	DownloadingProofs,
	/// A target block request is in flight to this peer.
	DownloadingTargetBlock,
}

impl PeerState {
	/// Returns `true` only for [`PeerState::Available`], i.e. when the peer can take a new
	/// request.
	fn is_available(&self) -> bool {
		match self {
			Self::Available => true,
			Self::DownloadingProofs | Self::DownloadingTargetBlock => false,
		}
	}
}
196
/// Per-peer bookkeeping kept by [`WarpSync`].
struct Peer<B: BlockT> {
	/// Best block number the peer reported when it was added or in its latest best-block
	/// announcement; used to pick sufficiently synced peers for requests.
	best_number: NumberFor<B>,
	/// Whether the peer is free or busy with one of our requests.
	state: PeerState,
}
201
/// Data about the downloaded target block, retrievable through `WarpSync::take_result`.
pub struct WarpSyncResult<B: BlockT> {
	/// Header of the target block.
	pub target_header: B::Header,
	/// Body of the target block. Always `Some` when produced by this file, because a response
	/// without a body is rejected.
	pub target_body: Option<Vec<B::Extrinsic>>,
	/// Justifications that came with the target block, if the peer sent any.
	pub target_justifications: Option<Justifications>,
}
207
/// Warp sync strategy: waits for peers, downloads and verifies warp proofs, then downloads
/// the target block and reports it as the result.
pub struct WarpSync<B: BlockT> {
	/// Current step of the state machine.
	phase: Phase<B>,
	/// Total size of all successfully verified proof responses.
	total_proof_bytes: u64,
	/// State byte counter. Initialised to zero and only read for progress reporting in the
	/// code visible here.
	total_state_bytes: u64,
	/// Connected peers and what each is doing.
	peers: HashMap<PeerId, Peer<B>>,
	/// Tracks peers that disconnected while one of our requests was in flight.
	disconnected_peers: DisconnectedPeers,
	/// Protocol used for warp proof requests; `None` means no such request can be sent.
	protocol_name: Option<ProtocolName>,
	/// Used to download the target block and to tell block responses from proof responses.
	block_downloader: Arc<dyn BlockDownloader<B>>,
	/// Actions accumulated since the last call to `actions()`.
	actions: Vec<SyncingAction<B>>,
	/// Set once the target block has been downloaded and validated.
	result: Option<WarpSyncResult<B>>,
	/// Number of connected peers required before proof download starts.
	min_peers_to_start_warp_sync: usize,
}
222
223impl<B> WarpSync<B>
224where
225 B: BlockT,
226{
	/// Key identifying this strategy; attached to every request started by `actions()`.
	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
229
230 pub fn new<Client>(
234 client: Arc<Client>,
235 warp_sync_config: WarpSyncConfig<B>,
236 protocol_name: Option<ProtocolName>,
237 block_downloader: Arc<dyn BlockDownloader<B>>,
238 min_peers_to_start_warp_sync: Option<usize>,
239 ) -> Self
240 where
241 Client: HeaderBackend<B> + 'static,
242 {
243 let min_peers_to_start_warp_sync =
244 min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
245 if client.info().finalized_state.is_some() {
246 error!(
247 target: LOG_TARGET,
248 "Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
249 );
250 return Self {
251 phase: Phase::Complete,
252 total_proof_bytes: 0,
253 total_state_bytes: 0,
254 peers: HashMap::new(),
255 disconnected_peers: DisconnectedPeers::new(),
256 protocol_name,
257 block_downloader,
258 actions: vec![SyncingAction::Finished],
259 result: None,
260 min_peers_to_start_warp_sync,
261 };
262 }
263
264 let phase = match warp_sync_config {
265 WarpSyncConfig::WithProvider(warp_sync_provider) => {
266 Phase::WaitingForPeers { warp_sync_provider }
267 },
268 WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
269 };
270
271 Self {
272 phase,
273 total_proof_bytes: 0,
274 total_state_bytes: 0,
275 peers: HashMap::new(),
276 disconnected_peers: DisconnectedPeers::new(),
277 protocol_name,
278 block_downloader,
279 actions: Vec::new(),
280 result: None,
281 min_peers_to_start_warp_sync,
282 }
283 }
284
285 pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
287 self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
288
289 self.try_to_start_warp_sync();
290 }
291
292 pub fn remove_peer(&mut self, peer_id: &PeerId) {
294 if let Some(state) = self.peers.remove(peer_id) {
295 if !state.state.is_available() {
296 if let Some(bad_peer) =
297 self.disconnected_peers.on_disconnect_during_request(*peer_id)
298 {
299 self.actions.push(SyncingAction::DropPeer(bad_peer));
300 }
301 }
302 }
303 }
304
305 #[must_use]
309 pub fn on_validated_block_announce(
310 &mut self,
311 is_best: bool,
312 peer_id: PeerId,
313 announce: &BlockAnnounce<B::Header>,
314 ) -> Option<(B::Hash, NumberFor<B>)> {
315 is_best.then(|| {
316 let best_number = *announce.header.number();
317 let best_hash = announce.header.hash();
318 if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
319 peer.best_number = best_number;
320 }
321 (best_hash, best_number)
323 })
324 }
325
326 fn try_to_start_warp_sync(&mut self) {
328 let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
329
330 if self.peers.len() < self.min_peers_to_start_warp_sync {
331 return;
332 }
333
334 let verifier = warp_sync_provider.create_verifier();
335 self.phase = Phase::WarpProof { verifier };
336 debug!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
337 }
338
339 pub fn on_generic_response(
340 &mut self,
341 peer_id: &PeerId,
342 protocol_name: ProtocolName,
343 response: Box<dyn Any + Send>,
344 ) {
345 if &protocol_name == self.block_downloader.protocol_name() {
346 let Ok(response) = response
347 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
348 else {
349 warn!(target: LOG_TARGET, "Failed to downcast block response");
350 debug_assert!(false);
351 return;
352 };
353
354 let (request, response) = *response;
355 let blocks = match response {
356 Ok(blocks) => blocks,
357 Err(BlockResponseError::DecodeFailed(e)) => {
358 debug!(
359 target: LOG_TARGET,
360 "Failed to decode block response from peer {:?}: {:?}.",
361 peer_id,
362 e
363 );
364 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
365 return;
366 },
367 Err(BlockResponseError::ExtractionFailed(e)) => {
368 debug!(
369 target: LOG_TARGET,
370 "Failed to extract blocks from peer response {:?}: {:?}.",
371 peer_id,
372 e
373 );
374 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
375 return;
376 },
377 };
378
379 self.on_block_response(*peer_id, request, blocks);
380 } else {
381 let Ok(response) = response.downcast::<Vec<u8>>() else {
382 warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
383 debug_assert!(false);
384 return;
385 };
386
387 self.on_warp_proof_response(peer_id, EncodedProof(*response));
388 }
389 }
390
391 pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
393 if let Some(peer) = self.peers.get_mut(peer_id) {
394 peer.state = PeerState::Available;
395 }
396
397 let Phase::WarpProof { verifier } = &mut self.phase else {
398 debug!(target: LOG_TARGET, "Unexpected warp proof response");
399 self.actions
400 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
401 return;
402 };
403
404 let proof_to_incoming_block =
405 |(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
406 IncomingBlock {
407 hash: header.hash(),
408 header: Some(header),
409 body: None,
410 indexed_body: None,
411 justifications: Some(justifications),
412 origin: Some(*peer_id),
413 allow_missing_state: true,
416 skip_execution: true,
417 import_existing: false,
419 state: None,
420 }
421 };
422
423 match verifier.verify(&response) {
424 Err(e) => {
425 debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
426 self.actions
427 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
428 },
429 Ok(VerificationResult::Partial(proofs)) => {
430 debug!(target: LOG_TARGET, "Verified partial proof");
431 self.total_proof_bytes += response.0.len() as u64;
432 self.actions.push(SyncingAction::ImportBlocks {
433 origin: BlockOrigin::WarpSync,
434 blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
435 });
436 },
437 Ok(VerificationResult::Complete(header, proofs)) => {
438 debug!(
439 target: LOG_TARGET,
440 "Verified complete proof. Continuing with target block download: {} ({}).",
441 header.hash(),
442 header.number(),
443 );
444 self.total_proof_bytes += response.0.len() as u64;
445 self.phase = Phase::TargetBlock(header.clone());
446 let incoming_blocks: Vec<_> = proofs
447 .into_iter()
448 .map(proof_to_incoming_block)
449 .filter(|i| {
450 if header.number() != i.header.as_ref().unwrap().number() {
455 true
456 } else {
457 log::trace!(
458 target: LOG_TARGET,
459 "Filtered out target block: {} ({})",
460 header.hash(),
461 header.number()
462 );
463 false
464 }
465 })
466 .collect();
467 self.actions.push(SyncingAction::ImportBlocks {
468 origin: BlockOrigin::WarpSync,
469 blocks: incoming_blocks,
470 });
471 },
472 }
473 }
474
475 pub fn on_block_response(
477 &mut self,
478 peer_id: PeerId,
479 request: BlockRequest<B>,
480 blocks: Vec<BlockData<B>>,
481 ) {
482 if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
483 self.actions.push(SyncingAction::DropPeer(bad_peer));
484 }
485 }
486
487 fn on_block_response_inner(
488 &mut self,
489 peer_id: PeerId,
490 request: BlockRequest<B>,
491 mut blocks: Vec<BlockData<B>>,
492 ) -> Result<(), BadPeer> {
493 if let Some(peer) = self.peers.get_mut(&peer_id) {
494 peer.state = PeerState::Available;
495 }
496
497 let Phase::TargetBlock(header) = &mut self.phase else {
498 debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
499 return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE));
500 };
501
502 if blocks.is_empty() {
503 debug!(
504 target: LOG_TARGET,
505 "Downloading target block failed: empty block response from {peer_id}",
506 );
507 return Err(BadPeer(peer_id, rep::NO_BLOCK));
508 }
509
510 if blocks.len() > 1 {
511 debug!(
512 target: LOG_TARGET,
513 "Too many blocks ({}) in warp target block response from {peer_id}",
514 blocks.len(),
515 );
516 return Err(BadPeer(peer_id, rep::NOT_REQUESTED));
517 }
518
519 validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
520
521 let block = blocks.pop().expect("`blocks` len checked above; qed");
522
523 let Some(block_header) = &block.header else {
524 debug!(
525 target: LOG_TARGET,
526 "Downloading target block failed: missing header in response from {peer_id}.",
527 );
528 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
529 };
530
531 if block_header != header {
532 debug!(
533 target: LOG_TARGET,
534 "Downloading target block failed: different header in response from {peer_id}.",
535 );
536 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
537 }
538
539 if block.body.is_none() {
540 debug!(
541 target: LOG_TARGET,
542 "Downloading target block failed: missing body in response from {peer_id}.",
543 );
544 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
545 }
546
547 self.result = Some(WarpSyncResult {
548 target_header: header.clone(),
549 target_body: block.body,
550 target_justifications: block.justifications,
551 });
552 self.phase = Phase::Complete;
553 self.actions.push(SyncingAction::Finished);
554 Ok(())
555 }
556
557 fn schedule_next_peer(
559 &mut self,
560 new_state: PeerState,
561 min_best_number: Option<NumberFor<B>>,
562 ) -> Option<PeerId> {
563 let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
564 if targets.is_empty() {
565 return None;
566 }
567 targets.sort();
568 let median = targets[targets.len() / 2];
569 let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
570 for (peer_id, peer) in self.peers.iter_mut() {
573 if peer.state.is_available() &&
574 peer.best_number >= threshold &&
575 self.disconnected_peers.is_peer_available(peer_id)
576 {
577 peer.state = new_state;
578 return Some(*peer_id);
579 }
580 }
581 None
582 }
583
584 fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
586 let Phase::WarpProof { verifier } = &self.phase else { return None };
587
588 let begin = verifier.next_proof_context();
590
591 if self
592 .peers
593 .values()
594 .any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
595 {
596 return None;
598 }
599
600 let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
601 trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
602
603 let request = WarpProofRequest { begin };
604
605 let Some(protocol_name) = self.protocol_name.clone() else {
606 warn!(
607 target: LOG_TARGET,
608 "Trying to send warp sync request when no protocol is configured {request:?}",
609 );
610 return None;
611 };
612
613 Some((peer_id, protocol_name, request))
614 }
615
616 fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
618 let Phase::TargetBlock(target_header) = &self.phase else { return None };
619
620 if self
621 .peers
622 .values()
623 .any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
624 {
625 return None;
627 }
628
629 let target_hash = target_header.hash();
631 let target_number = *target_header.number();
632
633 let peer_id =
634 self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
635
636 trace!(
637 target: LOG_TARGET,
638 "New target block request to {peer_id}, target: {} ({}).",
639 target_hash,
640 target_number,
641 );
642
643 Some((
644 peer_id,
645 BlockRequest::<B> {
646 id: 0,
647 fields: BlockAttributes::HEADER |
648 BlockAttributes::BODY |
649 BlockAttributes::JUSTIFICATION,
650 from: FromBlock::Hash(target_hash),
651 direction: Direction::Ascending,
652 max: Some(1),
653 },
654 ))
655 }
656
657 pub fn progress(&self) -> WarpSyncProgress<B> {
659 match &self.phase {
660 Phase::WaitingForPeers { .. } => WarpSyncProgress {
661 phase: WarpSyncPhase::AwaitingPeers {
662 required_peers: self.min_peers_to_start_warp_sync,
663 },
664 total_bytes: self.total_proof_bytes,
665 status: None,
666 },
667 Phase::WarpProof { verifier } => WarpSyncProgress {
668 phase: WarpSyncPhase::DownloadingWarpProofs,
669 total_bytes: self.total_proof_bytes,
670 status: verifier.status(),
671 },
672 Phase::TargetBlock(_) => WarpSyncProgress {
673 phase: WarpSyncPhase::DownloadingTargetBlock,
674 total_bytes: self.total_proof_bytes,
675 status: None,
676 },
677 Phase::Complete => WarpSyncProgress {
678 phase: WarpSyncPhase::Complete,
679 total_bytes: self.total_proof_bytes + self.total_state_bytes,
680 status: None,
681 },
682 }
683 }
684
	/// Number of peers currently known to the strategy.
	pub fn num_peers(&self) -> usize {
		self.peers.len()
	}
689
690 pub fn status(&self) -> SyncStatus<B> {
692 SyncStatus {
693 state: match &self.phase {
694 Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
695 Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
696 Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
697 Phase::Complete => SyncState::Idle,
698 },
699 best_seen_block: match &self.phase {
700 Phase::WaitingForPeers { .. } => None,
701 Phase::WarpProof { .. } => None,
702 Phase::TargetBlock(header) => Some(*header.number()),
703 Phase::Complete => None,
704 },
705 num_peers: self.peers.len().saturated_into(),
706 queued_blocks: 0,
707 state_sync: None,
708 warp_sync: Some(self.progress()),
709 }
710 }
711
712 #[must_use]
714 pub fn actions(
715 &mut self,
716 network_service: &NetworkServiceHandle,
717 ) -> impl Iterator<Item = SyncingAction<B>> {
718 let warp_proof_request =
719 self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
720 trace!(
721 target: LOG_TARGET,
722 "Created `WarpProofRequest` to {}, request: {:?}.",
723 peer_id,
724 request,
725 );
726
727 let (tx, rx) = oneshot::channel();
728
729 network_service.start_request(
730 peer_id,
731 protocol_name,
732 request.encode(),
733 tx,
734 IfDisconnected::ImmediateError,
735 );
736
737 SyncingAction::StartRequest {
738 peer_id,
739 key: Self::STRATEGY_KEY,
740 request: async move {
741 Ok(rx.await?.and_then(|(response, protocol_name)| {
742 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
743 }))
744 }
745 .boxed(),
746 remove_obsolete: false,
747 }
748 });
749 self.actions.extend(warp_proof_request);
750
751 let target_block_request =
752 self.target_block_request().into_iter().map(|(peer_id, request)| {
753 let downloader = self.block_downloader.clone();
754
755 SyncingAction::StartRequest {
756 peer_id,
757 key: Self::STRATEGY_KEY,
758 request: async move {
759 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
760 |(response, protocol_name)| {
761 let decoded_response =
762 downloader.block_response_into_blocks(&request, response);
763 let result =
764 Box::new((request, decoded_response)) as Box<dyn Any + Send>;
765 Ok((result, protocol_name))
766 },
767 ))
768 }
769 .boxed(),
770 remove_obsolete: true,
773 }
774 });
775 self.actions.extend(target_block_request);
776
777 std::mem::take(&mut self.actions).into_iter()
778 }
779
	/// Take the warp sync result, leaving `None` behind.
	///
	/// Yields `Some` only once, after the target block has been downloaded and validated.
	#[must_use]
	pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
		self.result.take()
	}
785}
786
787#[cfg(test)]
788mod test {
789 use super::*;
790 use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
791 use sc_block_builder::BlockBuilderBuilder;
792 use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
793 use sp_core::H256;
794 use sp_runtime::{
795 traits::{Block as BlockT, Header as HeaderT, NumberFor},
796 ConsensusEngineId,
797 };
798 use std::{io::ErrorKind, sync::Arc};
799 use substrate_test_runtime_client::{
800 runtime::{Block, Hash},
801 BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
802 };
803
	/// Consensus engine id for use in tests.
	/// NOTE(review): not referenced in the part of the test module visible here; presumably
	/// used for justifications in later tests — confirm.
	pub const TEST_ENGINE_ID: ConsensusEngineId = *b"TEST";
805
	// Generates `MockClient`, a mock `HeaderBackend`. The tests here only set expectations
	// on `info()`, which is the one method `WarpSync::new` calls.
	mockall::mock! {
		pub Client<B: BlockT> {}

		impl<B: BlockT> HeaderBackend<B> for Client<B> {
			fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
			fn info(&self) -> Info<B>;
			fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
			fn number(
				&self,
				hash: B::Hash,
			) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
			fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
		}
	}
820
	// Generates `MockWarpSyncProvider`, a mock of the `WarpSyncProvider` trait from the
	// parent module.
	mockall::mock! {
		pub WarpSyncProvider<B: BlockT> {}

		impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
			fn generate(
				&self,
				start: B::Hash,
			) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
			fn create_verifier(&self) -> Box<dyn super::Verifier<B>>;
		}
	}
832
	// Generates `MockVerifier`, a mock of the `Verifier` trait from the parent module.
	mockall::mock! {
		pub Verifier<B: BlockT> {}

		impl<B: BlockT> super::Verifier<B> for Verifier<B> {
			fn verify(
				&mut self,
				proof: &EncodedProof,
			) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
			fn next_proof_context(&self) -> B::Hash;
			fn status(&self) -> Option<String>;
		}
	}
845
846 fn mock_client_with_state() -> MockClient<Block> {
847 let mut client = MockClient::<Block>::new();
848 let genesis_hash = Hash::random();
849 client.expect_info().return_once(move || Info {
850 best_hash: genesis_hash,
851 best_number: 0,
852 genesis_hash,
853 finalized_hash: genesis_hash,
854 finalized_number: 0,
855 finalized_state: Some((genesis_hash, 0)),
857 number_leaves: 0,
858 block_gap: None,
859 });
860
861 client
862 }
863
864 fn mock_client_without_state() -> MockClient<Block> {
865 let mut client = MockClient::<Block>::new();
866 let genesis_hash = Hash::random();
867 client.expect_info().returning(move || Info {
868 best_hash: genesis_hash,
869 best_number: 0,
870 genesis_hash,
871 finalized_hash: genesis_hash,
872 finalized_number: 0,
873 finalized_state: None,
874 number_leaves: 0,
875 block_gap: None,
876 });
877
878 client
879 }
880
	/// With a provider config and a database that already has finalized state, the strategy
	/// finishes immediately and yields no result.
	#[test]
	fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
		let client = mock_client_with_state();
		let provider = MockWarpSyncProvider::<Block>::new();
		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			None,
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		let network_provider = NetworkServiceProvider::new();
		let network_handle = network_provider.handle();

		// The only action is the `Finished` queued by the constructor.
		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
		assert_eq!(actions.len(), 1);
		assert!(matches!(actions[0], SyncingAction::Finished));

		// ... and no target block was ever downloaded.
		assert!(warp_sync.take_result().is_none());
	}
905
	/// With a fixed target header and a database that already has finalized state, the
	/// strategy finishes immediately and yields no result.
	#[test]
	fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
		let client = mock_client_with_state();
		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
			1,
			Default::default(),
			Default::default(),
			Default::default(),
			Default::default(),
		));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			None,
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		let network_provider = NetworkServiceProvider::new();
		let network_handle = network_provider.handle();

		// The only action is the `Finished` queued by the constructor.
		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
		assert_eq!(actions.len(), 1);
		assert!(matches!(actions[0], SyncingAction::Finished));

		// ... and no target block was ever downloaded.
		assert!(warp_sync.take_result().is_none());
	}
935
	/// With a provider config and an empty database, nothing happens until peers connect:
	/// no actions are produced, in particular no `Finished`.
	#[test]
	fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
		let client = mock_client_without_state();
		let provider = MockWarpSyncProvider::<Block>::new();
		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			None,
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		let network_provider = NetworkServiceProvider::new();
		let network_handle = network_provider.handle();

		// No peers are connected, so no request can be scheduled either.
		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
	}
955
	/// With a fixed target header and an empty database, no actions are produced while no
	/// peers are connected, in particular no `Finished`.
	#[test]
	fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
		let client = mock_client_without_state();
		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
			1,
			Default::default(),
			Default::default(),
			Default::default(),
			Default::default(),
		));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			None,
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		let network_provider = NetworkServiceProvider::new();
		let network_handle = network_provider.handle();

		// No peers are connected, so the target block cannot be requested yet.
		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
	}
980
	/// The strategy stays in the waiting phase until the default minimum number of peers is
	/// reached, and switches to the warp proof phase with the peer that reaches it.
	#[test]
	fn warp_sync_is_started_only_when_there_is_enough_peers() {
		let client = mock_client_without_state();
		let mut provider = MockWarpSyncProvider::<Block>::new();
		let mut verifier = MockVerifier::<Block>::new();
		verifier.expect_next_proof_context().returning(|| Hash::random());
		verifier
			.expect_verify()
			.returning(|_| unreachable!("verify should not be called in this test"));
		provider.expect_create_verifier().return_once(move || Box::new(verifier));
		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			None,
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		// One peer short of the minimum: still waiting after every addition.
		for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
			warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
			assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
		}

		// The peer that reaches the minimum starts warp sync.
		warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
	}
1010
	/// Scheduling a peer with an empty peer set yields `None`.
	#[test]
	fn no_peer_is_scheduled_if_no_peers_connected() {
		let client = mock_client_without_state();
		let provider = MockWarpSyncProvider::<Block>::new();
		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			None,
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
	}
1026
	/// Guard for the tests that add ten peers and expect warp sync to have started: fails
	/// if the default minimum is ever raised above ten.
	#[test]
	fn enough_peers_are_used_in_tests() {
		assert!(
			10 >= MIN_PEERS_TO_START_WARP_SYNC,
			"Tests must be updated to use that many initial peers.",
		);
	}
1035
	/// Without an explicit minimum, the scheduled peer is never below the median best
	/// number. Repeated many times because the chosen peer depends on map iteration order.
	#[test]
	fn at_least_median_synced_peer_is_scheduled() {
		for _ in 0..100 {
			let client = mock_client_without_state();
			let mut provider = MockWarpSyncProvider::<Block>::new();
			let mut verifier = MockVerifier::<Block>::new();
			verifier.expect_next_proof_context().returning(|| Hash::random());
			verifier
				.expect_verify()
				.returning(|_| unreachable!("verify should not be called in this test"));
			provider.expect_create_verifier().return_once(move || Box::new(verifier));
			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
			let mut warp_sync = WarpSync::new(
				Arc::new(client),
				config,
				None,
				Arc::new(MockBlockDownloader::new()),
				None,
			);

			// Ten peers with best numbers 1 through 10.
			for best_number in 1..11 {
				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
			}

			// The sorted numbers are 1..=10 and the element at index 10 / 2 is 6.
			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
		}
	}
1064
	/// With an explicit minimum above the median, only a peer meeting that minimum is
	/// scheduled. Repeated because the chosen peer depends on map iteration order.
	#[test]
	fn min_best_number_peer_is_scheduled() {
		for _ in 0..10 {
			let client = mock_client_without_state();
			let mut provider = MockWarpSyncProvider::<Block>::new();
			let mut verifier = MockVerifier::<Block>::new();
			verifier.expect_next_proof_context().returning(|| Hash::random());
			verifier
				.expect_verify()
				.returning(|_| unreachable!("verify should not be called in this test"));
			provider.expect_create_verifier().return_once(move || Box::new(verifier));
			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
			let mut warp_sync = WarpSync::new(
				Arc::new(client),
				config,
				None,
				Arc::new(MockBlockDownloader::new()),
				None,
			);

			// Ten peers with best numbers 1 through 10.
			for best_number in 1..11 {
				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
			}

			// Only the peer at block 10 satisfies a minimum of 10.
			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
		}
	}
1093
	/// A peer that disconnected while a request to it was in flight is not scheduled again
	/// after reconnecting, whereas a peer that disconnected while idle is unaffected.
	#[test]
	fn backedoff_number_peer_is_not_scheduled() {
		let client = mock_client_without_state();
		let mut provider = MockWarpSyncProvider::<Block>::new();
		let mut verifier = MockVerifier::<Block>::new();
		verifier.expect_next_proof_context().returning(|| Hash::random());
		verifier
			.expect_verify()
			.returning(|_| unreachable!("verify should not be called in this test"));
		provider.expect_create_verifier().return_once(move || Box::new(verifier));
		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			None,
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		// Ten peers with best numbers 1 through 10.
		for best_number in 1..11 {
			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
		}

		let ninth_peer =
			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
		let tenth_peer =
			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;

		// Disconnecting an idle peer does not make it unavailable.
		warp_sync.remove_peer(&tenth_peer);
		assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));

		// Reconnect it, give it a request, then disconnect it mid-request.
		warp_sync.add_peer(tenth_peer, H256::random(), 10);
		let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
		assert_eq!(tenth_peer, peer_id.unwrap());
		warp_sync.remove_peer(&tenth_peer);

		// This time the peer is marked as unavailable.
		assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));

		// After reconnecting it is the only peer at block 10, yet it is not scheduled.
		warp_sync.add_peer(tenth_peer, H256::random(), 10);
		let peer_id: Option<PeerId> =
			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
		assert!(peer_id.is_none());

		// Lowering the minimum to 9 makes the ninth peer eligible; the tenth is still skipped.
		let peer_id: Option<PeerId> =
			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
		assert_eq!(ninth_peer, peer_id.unwrap());
	}
1145
	/// No warp proof request is produced once the strategy has left the warp proof phase.
	#[test]
	fn no_warp_proof_request_in_another_phase() {
		let client = mock_client_without_state();
		let mut provider = MockWarpSyncProvider::<Block>::new();
		let mut verifier = MockVerifier::<Block>::new();
		verifier.expect_next_proof_context().returning(|| Hash::random());
		verifier
			.expect_verify()
			.returning(|_| unreachable!("verify should not be called in this test"));
		provider.expect_create_verifier().return_once(move || Box::new(verifier));
		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
		let mut warp_sync = WarpSync::new(
			Arc::new(client),
			config,
			Some(ProtocolName::Static("")),
			Arc::new(MockBlockDownloader::new()),
			None,
		);

		// Enough peers to get warp sync started.
		for best_number in 1..11 {
			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
		}

		// Force the strategy into the target block phase.
		warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
			1,
			Default::default(),
			Default::default(),
			Default::default(),
			Default::default(),
		));

		// Peers are available, but the phase is wrong.
		assert!(warp_sync.warp_proof_request().is_none());
	}
1182
/// The `begin` field of a warp proof request equals the hash reported by the verifier's
/// `next_proof_context`.
#[test]
fn warp_proof_request_starts_at_last_hash() {
    // Fixed hash that the mocked verifier reports as its next proof context.
    let known_last_hash = Hash::random();

    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));
    mock_verifier.expect_next_proof_context().returning(move || known_last_hash);

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    (1..=10).for_each(|peer_best| {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    });
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));

    // Only the request payload (third tuple element) is inspected.
    let request = warp_sync.warp_proof_request().unwrap().2;
    assert_eq!(request.begin, known_last_hash);
}
1212
/// Asking for a warp proof request twice in a row yields a request the first time and
/// nothing the second time.
#[test]
fn no_parallel_warp_proof_requests() {
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));
    mock_verifier.expect_next_proof_context().returning(|| Hash::random());

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));

    // The first call produces a request.
    let first_attempt = warp_sync.warp_proof_request();
    assert!(first_attempt.is_some());

    // The second call, made with no response handled in between, produces none.
    let second_attempt = warp_sync.warp_proof_request();
    assert!(second_attempt.is_none());
}
1243
/// When the verifier rejects a proof, the only resulting action is `DropPeer` for the peer
/// that was asked, and the strategy stays in the warp-proof phase.
#[test]
fn bad_warp_proof_response_drops_peer() {
    let mut mock_verifier = MockVerifier::<Block>::new();
    // Verification fails with an arbitrary error.
    mock_verifier.expect_verify().return_once(|_proof| {
        Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
    });
    mock_verifier.expect_next_proof_context().returning(|| Hash::random());

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the outgoing request. Remember who it went to.
    let actions: Vec<_> = warp_sync.actions(&network_handle).collect();
    assert_eq!(actions.len(), 1);
    let request_peer_id = match actions[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action"),
    };

    // Deliver an (empty) proof from that peer; the mocked verifier rejects it.
    warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    // Exactly one action was queued, and it drops the peer that answered.
    let queued = std::mem::take(&mut warp_sync.actions);
    assert_eq!(queued.len(), 1);
    assert!(matches!(
        queued[0],
        SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
    ));
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
}
1291
/// A `Partial` verification result queues the verified header for import but leaves the
/// strategy in the warp-proof phase.
#[test]
fn partial_warp_proof_doesnt_advance_phase() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build a block on top of the client's best block; only its header is used.
    let target_header = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block
        .header()
        .clone();
    let justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);

    // The mocked verifier reports a partial proof holding exactly that header.
    let genesis_hash = client.info().genesis_hash;
    let partial_proof = vec![(target_header.clone(), justifications.clone())];
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(move || genesis_hash);
    mock_verifier
        .expect_verify()
        .return_once(move |_proof| Ok(VerificationResult::Partial(partial_proof)));

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the outgoing request. Remember who it went to.
    let actions: Vec<_> = warp_sync.actions(&network_handle).collect();
    assert_eq!(actions.len(), 1);
    let request_peer_id = match actions[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action"),
    };

    warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    // A single `ImportBlocks` action carrying a single block must have been queued.
    assert_eq!(warp_sync.actions.len(), 1);
    let (origin, mut blocks) = match warp_sync.actions.pop().unwrap() {
        SyncingAction::ImportBlocks { origin, blocks } => (origin, blocks),
        _ => panic!("Expected `ImportBlocks` action."),
    };
    assert_eq!(origin, BlockOrigin::WarpSync);
    assert_eq!(blocks.len(), 1);

    let import_block = blocks.pop().unwrap();
    let expected_block: IncomingBlock<Block> = IncomingBlock {
        hash: target_header.hash(),
        header: Some(target_header),
        body: None,
        indexed_body: None,
        justifications: Some(justifications),
        origin: Some(request_peer_id),
        allow_missing_state: true,
        skip_execution: true,
        import_existing: false,
        state: None,
    };
    assert_eq!(import_block, expected_block);

    // The phase has not advanced.
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
}
1371
/// A `Complete` verification result queues one block for import and moves the strategy to
/// the target-block phase for the reported target header.
#[test]
fn complete_warp_proof_advances_phase() {
    sp_tracing::try_init_simple();

    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Hand-built header #1, whose parent is the client's best block.
    let warp_synced_header = <<Block as BlockT>::Header as HeaderT>::new(
        1,
        Default::default(),
        Default::default(),
        client.chain_info().best_hash,
        Default::default(),
    );
    // Hand-built header #2 on top of header #1; this is the warp sync target.
    let target_header = <<Block as BlockT>::Header as HeaderT>::new(
        2,
        Default::default(),
        Default::default(),
        warp_synced_header.hash(),
        Default::default(),
    );
    let warp_justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
    let target_justifications =
        Justifications::new(vec![(TEST_ENGINE_ID, vec![6, 7, 8, 9, 10])]);

    // The mocked verifier reports a complete proof: both headers, with #2 as the target.
    let genesis_hash = client.info().genesis_hash;
    let proven_target = target_header.clone();
    let proven_headers = vec![
        (warp_synced_header.clone(), warp_justifications.clone()),
        (target_header.clone(), target_justifications),
    ];
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(move || genesis_hash);
    mock_verifier
        .expect_verify()
        .return_once(move |_proof| Ok(VerificationResult::Complete(proven_target, proven_headers)));

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        Some(ProtocolName::Static("")),
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // Exactly one action is expected: the outgoing request. Remember who it went to.
    let actions: Vec<_> = warp_sync.actions(&network_handle).collect();
    assert_eq!(actions.len(), 1);
    let request_peer_id = match actions[0] {
        SyncingAction::StartRequest { peer_id, .. } => peer_id,
        _ => panic!("Invalid action."),
    };

    warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));

    assert_eq!(warp_sync.actions.len(), 1);
    let (origin, mut blocks) = match warp_sync.actions.pop().unwrap() {
        SyncingAction::ImportBlocks { origin, blocks } => (origin, blocks),
        _ => panic!("Expected `ImportBlocks` action."),
    };
    assert_eq!(origin, BlockOrigin::WarpSync);
    // Of the two proven headers, exactly one block is expected in the import batch.
    assert_eq!(blocks.len(), 1);

    // That block is header #1 with its justifications.
    let import_block = blocks.pop().unwrap();
    let expected_block: IncomingBlock<Block> = IncomingBlock {
        hash: warp_synced_header.hash(),
        header: Some(warp_synced_header),
        body: None,
        indexed_body: None,
        justifications: Some(warp_justifications),
        origin: Some(request_peer_id),
        allow_missing_state: true,
        skip_execution: true,
        import_existing: false,
        state: None,
    };
    assert_eq!(import_block, expected_block);

    // The strategy now waits for the target block identified by header #2.
    assert!(matches!(warp_sync.phase, Phase::TargetBlock(header) if header == target_header));
}
1474
/// While the strategy is still in the warp-proof phase, no target block request is
/// produced.
#[test]
fn no_target_block_requests_in_another_phase() {
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier
        .expect_verify()
        .returning(|_| unreachable!("verify should not be called in this test"));
    mock_verifier.expect_next_proof_context().returning(|| Hash::random());

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    // Note: constructed without a warp protocol name.
    let mut warp_sync = WarpSync::new(
        Arc::new(mock_client_without_state()),
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    (1..=10).for_each(|peer_best| {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    });

    // Still in the warp-proof phase, so there is nothing to request yet.
    assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
    assert!(warp_sync.target_block_request().is_none());
}
1504
/// In the target-block phase the request asks for exactly one block, addressed by the
/// target hash, with header, body and justification.
#[test]
fn target_block_request_is_correct() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let genesis_hash = client.info().genesis_hash;

    // Build the target block on top of the client's best block.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Verifier expectations: a fixed proof context and a complete proof for the target.
    let proven_header = target_block.header().clone();
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(move || genesis_hash);
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proven_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target-block phase.
    warp_sync.phase = Phase::TargetBlock(target_block.header().clone());

    let (_peer_id, request) = warp_sync.target_block_request().unwrap();
    let wanted_fields =
        BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
    assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
    assert_eq!(request.fields, wanted_fields);
    assert_eq!(request.max, Some(1));
}
1547
/// When configured with `WarpSyncConfig::WithTarget`, the strategy is in the target-block
/// phase after peers are added and requests that target.
#[test]
fn externally_set_target_block_is_requested() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

    // Build the target block on top of the client's best block.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // The target header is supplied through the config; no provider is involved.
    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithTarget(target_block.header().clone()),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));

    let (_peer_id, request) = warp_sync.target_block_request().unwrap();
    let wanted_fields =
        BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION;
    assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
    assert_eq!(request.fields, wanted_fields);
    assert_eq!(request.max, Some(1));
}
1579
/// Asking for a target block request twice in a row yields a request the first time and
/// nothing the second time.
#[test]
fn no_parallel_target_block_requests() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let genesis_hash = client.info().genesis_hash;

    // Build the target block on top of the client's best block.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Verifier expectations: a fixed proof context and a complete proof for the target.
    let proven_header = target_block.header().clone();
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(move || genesis_hash);
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proven_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target-block phase.
    warp_sync.phase = Phase::TargetBlock(target_block.header().clone());

    // The first call produces a request.
    let first_attempt = warp_sync.target_block_request();
    assert!(first_attempt.is_some());

    // The second call, made with no response handled in between, produces none.
    let second_attempt = warp_sync.target_block_request();
    assert!(second_attempt.is_none());
}
1619
/// An empty block response to the target block request is rejected with a `BadPeer` error
/// naming the responding peer.
#[test]
fn target_block_response_with_no_blocks_drops_peer() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let genesis_hash = client.info().genesis_hash;

    // Build the target block on top of the client's best block.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // Verifier expectations: a fixed proof context and a complete proof for the target.
    let proven_header = target_block.header().clone();
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(move || genesis_hash);
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proven_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target-block phase.
    warp_sync.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = warp_sync.target_block_request().unwrap();

    // Answer with zero blocks.
    let outcome = warp_sync.on_block_response_inner(peer_id, request, Vec::new());
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == peer_id));
}
1664
/// A block response containing the target block plus one additional block is rejected with
/// a `BadPeer` error naming the responding peer.
#[test]
fn target_block_response_with_extra_blocks_drops_peer() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let genesis_hash = client.info().genesis_hash;

    // Build the target block on top of the client's best block.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // A second block on the same parent, made different by a storage change.
    let mut sibling_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap();
    sibling_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let extra_block = sibling_builder.build().unwrap().block;

    // Verifier expectations: a fixed proof context and a complete proof for the target.
    let proven_header = target_block.header().clone();
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(move || genesis_hash);
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proven_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target-block phase.
    warp_sync.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = warp_sync.target_block_request().unwrap();

    // The response holds the requested block followed by one that was not asked for.
    let requested = BlockData::<Block> {
        hash: target_block.header().hash(),
        header: Some(target_block.header().clone()),
        body: Some(target_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };
    let unrequested = BlockData::<Block> {
        hash: extra_block.header().hash(),
        header: Some(extra_block.header().clone()),
        body: Some(extra_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };

    let outcome =
        warp_sync.on_block_response_inner(peer_id, request, vec![requested, unrequested]);
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == peer_id));
}
1741
/// A block response containing a single block other than the target is rejected with a
/// `BadPeer` error naming the responding peer.
#[test]
fn target_block_response_with_wrong_block_drops_peer() {
    sp_tracing::try_init_simple();

    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let genesis_hash = client.info().genesis_hash;

    // Build the target block on top of the client's best block.
    let target_block = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap()
        .build()
        .unwrap()
        .block;

    // A second block on the same parent, made different by a storage change.
    let mut sibling_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap();
    sibling_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let wrong_block = sibling_builder.build().unwrap().block;

    // Verifier expectations: a fixed proof context and a complete proof for the target.
    let proven_header = target_block.header().clone();
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(move || genesis_hash);
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proven_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target-block phase.
    warp_sync.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = warp_sync.target_block_request().unwrap();

    // Answer with one block, but not the one that was requested.
    let mismatched = BlockData::<Block> {
        hash: wrong_block.header().hash(),
        header: Some(wrong_block.header().clone()),
        body: Some(wrong_block.extrinsics().to_vec()),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: None,
    };

    let outcome = warp_sync.on_block_response_inner(peer_id, request, vec![mismatched]);
    assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == peer_id));
}
1808
/// A response holding exactly the target block is accepted; the strategy then reports
/// `Finished` and its result exposes the target header, body and justifications.
#[test]
fn correct_target_block_response_sets_strategy_result() {
    let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
    let genesis_hash = client.info().genesis_hash;

    // Build the target block on top of the client's best block, with one storage change.
    let mut target_builder = BlockBuilderBuilder::new(&*client)
        .on_parent_block(client.chain_info().best_hash)
        .with_parent_block_number(client.chain_info().best_number)
        .build()
        .unwrap();
    target_builder
        .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
        .unwrap();
    let target_block = target_builder.build().unwrap().block;

    // Verifier expectations: a fixed proof context and a complete proof for the target.
    let proven_header = target_block.header().clone();
    let mut mock_verifier = MockVerifier::<Block>::new();
    mock_verifier.expect_next_proof_context().returning(move || genesis_hash);
    mock_verifier.expect_verify().return_once(move |_proof| {
        Ok(VerificationResult::Complete(proven_header, Default::default()))
    });

    let mut mock_provider = MockWarpSyncProvider::<Block>::new();
    mock_provider.expect_create_verifier().return_once(move || Box::new(mock_verifier));

    let mut warp_sync = WarpSync::new(
        client,
        WarpSyncConfig::WithProvider(Arc::new(mock_provider)),
        None,
        Arc::new(MockBlockDownloader::new()),
        None,
    );

    // Register ten peers with best numbers 1 through 10.
    for peer_best in 1..=10 {
        warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
    }

    // Jump straight to the target-block phase.
    warp_sync.phase = Phase::TargetBlock(target_block.header().clone());

    let (peer_id, request) = warp_sync.target_block_request().unwrap();

    // Answer with the target block itself, including its body and a justification.
    let body = Some(target_block.extrinsics().to_vec());
    let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
    let target_data = BlockData::<Block> {
        hash: target_block.header().hash(),
        header: Some(target_block.header().clone()),
        body: body.clone(),
        indexed_body: None,
        receipt: None,
        message_queue: None,
        justification: None,
        justifications: justifications.clone(),
    };

    let outcome = warp_sync.on_block_response_inner(peer_id, request, vec![target_data]);
    assert!(outcome.is_ok());

    let network_provider = NetworkServiceProvider::new();
    let network_handle = network_provider.handle();

    // The only action left to emit is `Finished`.
    let actions: Vec<_> = warp_sync.actions(&network_handle).collect();
    assert_eq!(actions.len(), 1);
    assert!(matches!(actions[0], SyncingAction::Finished));

    // The stored result mirrors what was sent in the response.
    let result = warp_sync.take_result().unwrap();
    assert_eq!(result.target_header, *target_block.header());
    assert_eq!(result.target_body, body);
    assert_eq!(result.target_justifications, justifications);
}
1876}