1use pezsc_consensus::IncomingBlock;
22use pezsp_consensus::BlockOrigin;
23pub use pezsp_consensus_grandpa::{AuthorityList, SetId};
24
25use crate::{
26 block_relay_protocol::{BlockDownloader, BlockResponseError},
27 service::network::NetworkServiceHandle,
28 strategy::{
29 chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
30 SyncingAction,
31 },
32 types::{BadPeer, SyncState, SyncStatus},
33 LOG_TARGET,
34};
35use codec::{Decode, Encode};
36use futures::{channel::oneshot, FutureExt};
37use log::{debug, error, trace, warn};
38use pezsc_network::{IfDisconnected, ProtocolName};
39use pezsc_network_common::sync::message::{
40 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
41};
42use pezsc_network_types::PeerId;
43use pezsp_blockchain::HeaderBackend;
44use pezsp_runtime::{
45 traits::{Block as BlockT, Header, NumberFor, Zero},
46 Justifications, SaturatedConversion,
47};
48use std::{any::Any, collections::HashMap, fmt, sync::Arc};
49
50const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
52
53pub struct EncodedProof(pub Vec<u8>);
55
56#[derive(Encode, Decode, Debug, Clone)]
58pub struct WarpProofRequest<B: BlockT> {
59 pub begin: B::Hash,
61}
62
63pub enum VerificationResult<Block: BlockT> {
65 Partial(SetId, AuthorityList, Block::Hash, Vec<(Block::Header, Justifications)>),
67 Complete(SetId, AuthorityList, Block::Header, Vec<(Block::Header, Justifications)>),
69}
70
71pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
73 fn generate(
76 &self,
77 start: Block::Hash,
78 ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
79 fn verify(
81 &self,
82 proof: &EncodedProof,
83 set_id: SetId,
84 authorities: AuthorityList,
85 ) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
86 fn current_authorities(&self) -> AuthorityList;
89}
90
91mod rep {
92 use pezsc_network::ReputationChange as Rep;
93
94 pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");
96
97 pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");
99
100 pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
102
103 pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
105
106 pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
108
109 pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
111}
112
113#[derive(Clone, Eq, PartialEq, Debug)]
115pub enum WarpSyncPhase<Block: BlockT> {
116 AwaitingPeers { required_peers: usize },
118 DownloadingWarpProofs,
120 DownloadingTargetBlock,
122 DownloadingState,
124 ImportingState,
126 DownloadingBlocks(NumberFor<Block>),
128 Complete,
130}
131
132impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
133 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
134 match self {
135 Self::AwaitingPeers { required_peers } => {
136 write!(f, "Waiting for {required_peers} peers to be connected")
137 },
138 Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
139 Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
140 Self::DownloadingState => write!(f, "Downloading state"),
141 Self::ImportingState => write!(f, "Importing state"),
142 Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
143 Self::Complete => write!(f, "Warp sync is complete"),
144 }
145 }
146}
147
148#[derive(Clone, Eq, PartialEq, Debug)]
150pub struct WarpSyncProgress<Block: BlockT> {
151 pub phase: WarpSyncPhase<Block>,
153 pub total_bytes: u64,
155}
156
157pub enum WarpSyncConfig<Block: BlockT> {
159 WithProvider(Arc<dyn WarpSyncProvider<Block>>),
161 WithTarget(<Block as BlockT>::Header),
165}
166
167enum Phase<B: BlockT> {
169 WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
171 WarpProof {
173 set_id: SetId,
174 authorities: AuthorityList,
175 last_hash: B::Hash,
176 warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
177 },
178 TargetBlock(B::Header),
180 Complete,
182}
183
184enum PeerState {
185 Available,
186 DownloadingProofs,
187 DownloadingTargetBlock,
188}
189
190impl PeerState {
191 fn is_available(&self) -> bool {
192 matches!(self, PeerState::Available)
193 }
194}
195
196struct Peer<B: BlockT> {
197 best_number: NumberFor<B>,
198 state: PeerState,
199}
200
201pub struct WarpSyncResult<B: BlockT> {
202 pub target_header: B::Header,
203 pub target_body: Option<Vec<B::Extrinsic>>,
204 pub target_justifications: Option<Justifications>,
205}
206
207pub struct WarpSync<B: BlockT, Client> {
209 phase: Phase<B>,
210 client: Arc<Client>,
211 total_proof_bytes: u64,
212 total_state_bytes: u64,
213 peers: HashMap<PeerId, Peer<B>>,
214 disconnected_peers: DisconnectedPeers,
215 protocol_name: Option<ProtocolName>,
216 block_downloader: Arc<dyn BlockDownloader<B>>,
217 actions: Vec<SyncingAction<B>>,
218 result: Option<WarpSyncResult<B>>,
219 min_peers_to_start_warp_sync: usize,
221}
222
223impl<B, Client> WarpSync<B, Client>
224where
225 B: BlockT,
226 Client: HeaderBackend<B> + 'static,
227{
228 pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
230
231 pub fn new(
235 client: Arc<Client>,
236 warp_sync_config: WarpSyncConfig<B>,
237 protocol_name: Option<ProtocolName>,
238 block_downloader: Arc<dyn BlockDownloader<B>>,
239 min_peers_to_start_warp_sync: Option<usize>,
240 ) -> Self {
241 let min_peers_to_start_warp_sync =
242 min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
243 if client.info().finalized_state.is_some() {
244 error!(
245 target: LOG_TARGET,
246 "Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
247 );
248 return Self {
249 client,
250 phase: Phase::Complete,
251 total_proof_bytes: 0,
252 total_state_bytes: 0,
253 peers: HashMap::new(),
254 disconnected_peers: DisconnectedPeers::new(),
255 protocol_name,
256 block_downloader,
257 actions: vec![SyncingAction::Finished],
258 result: None,
259 min_peers_to_start_warp_sync,
260 };
261 }
262
263 let phase = match warp_sync_config {
264 WarpSyncConfig::WithProvider(warp_sync_provider) => {
265 Phase::WaitingForPeers { warp_sync_provider }
266 },
267 WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
268 };
269
270 Self {
271 client,
272 phase,
273 total_proof_bytes: 0,
274 total_state_bytes: 0,
275 peers: HashMap::new(),
276 disconnected_peers: DisconnectedPeers::new(),
277 protocol_name,
278 block_downloader,
279 actions: Vec::new(),
280 result: None,
281 min_peers_to_start_warp_sync,
282 }
283 }
284
285 pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
287 self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
288
289 self.try_to_start_warp_sync();
290 }
291
292 pub fn remove_peer(&mut self, peer_id: &PeerId) {
294 if let Some(state) = self.peers.remove(peer_id) {
295 if !state.state.is_available() {
296 if let Some(bad_peer) =
297 self.disconnected_peers.on_disconnect_during_request(*peer_id)
298 {
299 self.actions.push(SyncingAction::DropPeer(bad_peer));
300 }
301 }
302 }
303 }
304
305 #[must_use]
309 pub fn on_validated_block_announce(
310 &mut self,
311 is_best: bool,
312 peer_id: PeerId,
313 announce: &BlockAnnounce<B::Header>,
314 ) -> Option<(B::Hash, NumberFor<B>)> {
315 is_best.then(|| {
316 let best_number = *announce.header.number();
317 let best_hash = announce.header.hash();
318 if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
319 peer.best_number = best_number;
320 }
321 (best_hash, best_number)
323 })
324 }
325
326 fn try_to_start_warp_sync(&mut self) {
328 let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
329
330 if self.peers.len() < self.min_peers_to_start_warp_sync {
331 return;
332 }
333
334 self.phase = Phase::WarpProof {
335 set_id: 0,
336 authorities: warp_sync_provider.current_authorities(),
337 last_hash: self.client.info().genesis_hash,
338 warp_sync_provider: Arc::clone(warp_sync_provider),
339 };
340 trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
341 }
342
343 pub fn on_generic_response(
344 &mut self,
345 peer_id: &PeerId,
346 protocol_name: ProtocolName,
347 response: Box<dyn Any + Send>,
348 ) {
349 if &protocol_name == self.block_downloader.protocol_name() {
350 let Ok(response) = response
351 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
352 else {
353 warn!(target: LOG_TARGET, "Failed to downcast block response");
354 debug_assert!(false);
355 return;
356 };
357
358 let (request, response) = *response;
359 let blocks = match response {
360 Ok(blocks) => blocks,
361 Err(BlockResponseError::DecodeFailed(e)) => {
362 debug!(
363 target: LOG_TARGET,
364 "Failed to decode block response from peer {:?}: {:?}.",
365 peer_id,
366 e
367 );
368 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
369 return;
370 },
371 Err(BlockResponseError::ExtractionFailed(e)) => {
372 debug!(
373 target: LOG_TARGET,
374 "Failed to extract blocks from peer response {:?}: {:?}.",
375 peer_id,
376 e
377 );
378 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
379 return;
380 },
381 };
382
383 self.on_block_response(*peer_id, request, blocks);
384 } else {
385 let Ok(response) = response.downcast::<Vec<u8>>() else {
386 warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
387 debug_assert!(false);
388 return;
389 };
390
391 self.on_warp_proof_response(peer_id, EncodedProof(*response));
392 }
393 }
394
395 pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
397 if let Some(peer) = self.peers.get_mut(peer_id) {
398 peer.state = PeerState::Available;
399 }
400
401 let Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } =
402 &mut self.phase
403 else {
404 debug!(target: LOG_TARGET, "Unexpected warp proof response");
405 self.actions
406 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
407 return;
408 };
409
410 let proof_to_incoming_block =
411 |(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
412 IncomingBlock {
413 hash: header.hash(),
414 header: Some(header),
415 body: None,
416 indexed_body: None,
417 justifications: Some(justifications),
418 origin: Some(*peer_id),
419 allow_missing_state: true,
422 skip_execution: true,
423 import_existing: false,
425 state: None,
426 }
427 };
428
429 match warp_sync_provider.verify(&response, *set_id, authorities.clone()) {
430 Err(e) => {
431 debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
432 self.actions
433 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
434 },
435 Ok(VerificationResult::Partial(new_set_id, new_authorities, new_last_hash, proofs)) => {
436 log::debug!(target: LOG_TARGET, "Verified partial proof, set_id={:?}", new_set_id);
437 *set_id = new_set_id;
438 *authorities = new_authorities;
439 *last_hash = new_last_hash;
440 self.total_proof_bytes += response.0.len() as u64;
441 self.actions.push(SyncingAction::ImportBlocks {
442 origin: BlockOrigin::NetworkInitialSync,
443 blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
444 });
445 },
446 Ok(VerificationResult::Complete(new_set_id, _, header, proofs)) => {
447 log::debug!(
448 target: LOG_TARGET,
449 "Verified complete proof, set_id={:?}. Continuing with target block download: {} ({}).",
450 new_set_id,
451 header.hash(),
452 header.number(),
453 );
454 self.total_proof_bytes += response.0.len() as u64;
455 self.phase = Phase::TargetBlock(header);
456 self.actions.push(SyncingAction::ImportBlocks {
457 origin: BlockOrigin::NetworkInitialSync,
458 blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
459 });
460 },
461 }
462 }
463
464 pub fn on_block_response(
466 &mut self,
467 peer_id: PeerId,
468 request: BlockRequest<B>,
469 blocks: Vec<BlockData<B>>,
470 ) {
471 if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
472 self.actions.push(SyncingAction::DropPeer(bad_peer));
473 }
474 }
475
476 fn on_block_response_inner(
477 &mut self,
478 peer_id: PeerId,
479 request: BlockRequest<B>,
480 mut blocks: Vec<BlockData<B>>,
481 ) -> Result<(), BadPeer> {
482 if let Some(peer) = self.peers.get_mut(&peer_id) {
483 peer.state = PeerState::Available;
484 }
485
486 let Phase::TargetBlock(header) = &mut self.phase else {
487 debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
488 return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE));
489 };
490
491 if blocks.is_empty() {
492 debug!(
493 target: LOG_TARGET,
494 "Downloading target block failed: empty block response from {peer_id}",
495 );
496 return Err(BadPeer(peer_id, rep::NO_BLOCK));
497 }
498
499 if blocks.len() > 1 {
500 debug!(
501 target: LOG_TARGET,
502 "Too many blocks ({}) in warp target block response from {peer_id}",
503 blocks.len(),
504 );
505 return Err(BadPeer(peer_id, rep::NOT_REQUESTED));
506 }
507
508 validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
509
510 let block = blocks.pop().expect("`blocks` len checked above; qed");
511
512 let Some(block_header) = &block.header else {
513 debug!(
514 target: LOG_TARGET,
515 "Downloading target block failed: missing header in response from {peer_id}.",
516 );
517 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
518 };
519
520 if block_header != header {
521 debug!(
522 target: LOG_TARGET,
523 "Downloading target block failed: different header in response from {peer_id}.",
524 );
525 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
526 }
527
528 if block.body.is_none() {
529 debug!(
530 target: LOG_TARGET,
531 "Downloading target block failed: missing body in response from {peer_id}.",
532 );
533 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
534 }
535
536 self.result = Some(WarpSyncResult {
537 target_header: header.clone(),
538 target_body: block.body,
539 target_justifications: block.justifications,
540 });
541 self.phase = Phase::Complete;
542 self.actions.push(SyncingAction::Finished);
543 Ok(())
544 }
545
546 fn schedule_next_peer(
548 &mut self,
549 new_state: PeerState,
550 min_best_number: Option<NumberFor<B>>,
551 ) -> Option<PeerId> {
552 let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
553 if targets.is_empty() {
554 return None;
555 }
556 targets.sort();
557 let median = targets[targets.len() / 2];
558 let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
559 for (peer_id, peer) in self.peers.iter_mut() {
562 if peer.state.is_available()
563 && peer.best_number >= threshold
564 && self.disconnected_peers.is_peer_available(peer_id)
565 {
566 peer.state = new_state;
567 return Some(*peer_id);
568 }
569 }
570 None
571 }
572
573 fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
575 let Phase::WarpProof { last_hash, .. } = &self.phase else { return None };
576
577 let begin = *last_hash;
579
580 if self
581 .peers
582 .values()
583 .any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
584 {
585 return None;
587 }
588
589 let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
590 trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
591
592 let request = WarpProofRequest { begin };
593
594 let Some(protocol_name) = self.protocol_name.clone() else {
595 warn!(
596 target: LOG_TARGET,
597 "Trying to send warp sync request when no protocol is configured {request:?}",
598 );
599 return None;
600 };
601
602 Some((peer_id, protocol_name, request))
603 }
604
605 fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
607 let Phase::TargetBlock(target_header) = &self.phase else { return None };
608
609 if self
610 .peers
611 .values()
612 .any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
613 {
614 return None;
616 }
617
618 let target_hash = target_header.hash();
620 let target_number = *target_header.number();
621
622 let peer_id =
623 self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
624
625 trace!(
626 target: LOG_TARGET,
627 "New target block request to {peer_id}, target: {} ({}).",
628 target_hash,
629 target_number,
630 );
631
632 Some((
633 peer_id,
634 BlockRequest::<B> {
635 id: 0,
636 fields: BlockAttributes::HEADER
637 | BlockAttributes::BODY
638 | BlockAttributes::JUSTIFICATION,
639 from: FromBlock::Hash(target_hash),
640 direction: Direction::Ascending,
641 max: Some(1),
642 },
643 ))
644 }
645
646 pub fn progress(&self) -> WarpSyncProgress<B> {
648 match &self.phase {
649 Phase::WaitingForPeers { .. } => WarpSyncProgress {
650 phase: WarpSyncPhase::AwaitingPeers {
651 required_peers: self.min_peers_to_start_warp_sync,
652 },
653 total_bytes: self.total_proof_bytes,
654 },
655 Phase::WarpProof { .. } => WarpSyncProgress {
656 phase: WarpSyncPhase::DownloadingWarpProofs,
657 total_bytes: self.total_proof_bytes,
658 },
659 Phase::TargetBlock(_) => WarpSyncProgress {
660 phase: WarpSyncPhase::DownloadingTargetBlock,
661 total_bytes: self.total_proof_bytes,
662 },
663 Phase::Complete => WarpSyncProgress {
664 phase: WarpSyncPhase::Complete,
665 total_bytes: self.total_proof_bytes + self.total_state_bytes,
666 },
667 }
668 }
669
670 pub fn num_peers(&self) -> usize {
672 self.peers.len()
673 }
674
675 pub fn status(&self) -> SyncStatus<B> {
677 SyncStatus {
678 state: match &self.phase {
679 Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
680 Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
681 Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
682 Phase::Complete => SyncState::Idle,
683 },
684 best_seen_block: match &self.phase {
685 Phase::WaitingForPeers { .. } => None,
686 Phase::WarpProof { .. } => None,
687 Phase::TargetBlock(header) => Some(*header.number()),
688 Phase::Complete => None,
689 },
690 num_peers: self.peers.len().saturated_into(),
691 queued_blocks: 0,
692 state_sync: None,
693 warp_sync: Some(self.progress()),
694 }
695 }
696
697 #[must_use]
699 pub fn actions(
700 &mut self,
701 network_service: &NetworkServiceHandle,
702 ) -> impl Iterator<Item = SyncingAction<B>> {
703 let warp_proof_request =
704 self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
705 trace!(
706 target: LOG_TARGET,
707 "Created `WarpProofRequest` to {}, request: {:?}.",
708 peer_id,
709 request,
710 );
711
712 let (tx, rx) = oneshot::channel();
713
714 network_service.start_request(
715 peer_id,
716 protocol_name,
717 request.encode(),
718 tx,
719 IfDisconnected::ImmediateError,
720 );
721
722 SyncingAction::StartRequest {
723 peer_id,
724 key: Self::STRATEGY_KEY,
725 request: async move {
726 Ok(rx.await?.and_then(|(response, protocol_name)| {
727 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
728 }))
729 }
730 .boxed(),
731 remove_obsolete: false,
732 }
733 });
734 self.actions.extend(warp_proof_request);
735
736 let target_block_request =
737 self.target_block_request().into_iter().map(|(peer_id, request)| {
738 let downloader = self.block_downloader.clone();
739
740 SyncingAction::StartRequest {
741 peer_id,
742 key: Self::STRATEGY_KEY,
743 request: async move {
744 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
745 |(response, protocol_name)| {
746 let decoded_response =
747 downloader.block_response_into_blocks(&request, response);
748 let result =
749 Box::new((request, decoded_response)) as Box<dyn Any + Send>;
750 Ok((result, protocol_name))
751 },
752 ))
753 }
754 .boxed(),
755 remove_obsolete: true,
758 }
759 });
760 self.actions.extend(target_block_request);
761
762 std::mem::take(&mut self.actions).into_iter()
763 }
764
765 #[must_use]
767 pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
768 self.result.take()
769 }
770}
771
772#[cfg(test)]
773mod test {
774 use super::*;
775 use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
776 use bizinikiwi_test_runtime_client::{
777 runtime::{Block, Hash},
778 BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
779 };
780 use pezsc_block_builder::BlockBuilderBuilder;
781 use pezsp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
782 use pezsp_consensus_grandpa::{AuthorityList, SetId, GRANDPA_ENGINE_ID};
783 use pezsp_core::H256;
784 use pezsp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
785 use std::{io::ErrorKind, sync::Arc};
786
787 mockall::mock! {
788 pub Client<B: BlockT> {}
789
790 impl<B: BlockT> HeaderBackend<B> for Client<B> {
791 fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
792 fn info(&self) -> Info<B>;
793 fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
794 fn number(
795 &self,
796 hash: B::Hash,
797 ) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
798 fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
799 }
800 }
801
802 mockall::mock! {
803 pub WarpSyncProvider<B: BlockT> {}
804
805 impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
806 fn generate(
807 &self,
808 start: B::Hash,
809 ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
810 fn verify(
811 &self,
812 proof: &EncodedProof,
813 set_id: SetId,
814 authorities: AuthorityList,
815 ) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
816 fn current_authorities(&self) -> AuthorityList;
817 }
818 }
819
820 fn mock_client_with_state() -> MockClient<Block> {
821 let mut client = MockClient::<Block>::new();
822 let genesis_hash = Hash::random();
823 client.expect_info().return_once(move || Info {
824 best_hash: genesis_hash,
825 best_number: 0,
826 genesis_hash,
827 finalized_hash: genesis_hash,
828 finalized_number: 0,
829 finalized_state: Some((genesis_hash, 0)),
831 number_leaves: 0,
832 block_gap: None,
833 });
834
835 client
836 }
837
838 fn mock_client_without_state() -> MockClient<Block> {
839 let mut client = MockClient::<Block>::new();
840 let genesis_hash = Hash::random();
841 client.expect_info().returning(move || Info {
842 best_hash: genesis_hash,
843 best_number: 0,
844 genesis_hash,
845 finalized_hash: genesis_hash,
846 finalized_number: 0,
847 finalized_state: None,
848 number_leaves: 0,
849 block_gap: None,
850 });
851
852 client
853 }
854
855 #[test]
856 fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
857 let client = mock_client_with_state();
858 let provider = MockWarpSyncProvider::<Block>::new();
859 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
860 let mut warp_sync = WarpSync::new(
861 Arc::new(client),
862 config,
863 None,
864 Arc::new(MockBlockDownloader::new()),
865 None,
866 );
867
868 let network_provider = NetworkServiceProvider::new();
869 let network_handle = network_provider.handle();
870
871 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
873 assert_eq!(actions.len(), 1);
874 assert!(matches!(actions[0], SyncingAction::Finished));
875
876 assert!(warp_sync.take_result().is_none());
878 }
879
880 #[test]
881 fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
882 let client = mock_client_with_state();
883 let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
884 1,
885 Default::default(),
886 Default::default(),
887 Default::default(),
888 Default::default(),
889 ));
890 let mut warp_sync = WarpSync::new(
891 Arc::new(client),
892 config,
893 None,
894 Arc::new(MockBlockDownloader::new()),
895 None,
896 );
897
898 let network_provider = NetworkServiceProvider::new();
899 let network_handle = network_provider.handle();
900
901 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
903 assert_eq!(actions.len(), 1);
904 assert!(matches!(actions[0], SyncingAction::Finished));
905
906 assert!(warp_sync.take_result().is_none());
908 }
909
910 #[test]
911 fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
912 let client = mock_client_without_state();
913 let provider = MockWarpSyncProvider::<Block>::new();
914 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
915 let mut warp_sync = WarpSync::new(
916 Arc::new(client),
917 config,
918 None,
919 Arc::new(MockBlockDownloader::new()),
920 None,
921 );
922
923 let network_provider = NetworkServiceProvider::new();
924 let network_handle = network_provider.handle();
925
926 assert_eq!(warp_sync.actions(&network_handle).count(), 0)
928 }
929
930 #[test]
931 fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
932 let client = mock_client_without_state();
933 let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
934 1,
935 Default::default(),
936 Default::default(),
937 Default::default(),
938 Default::default(),
939 ));
940 let mut warp_sync = WarpSync::new(
941 Arc::new(client),
942 config,
943 None,
944 Arc::new(MockBlockDownloader::new()),
945 None,
946 );
947
948 let network_provider = NetworkServiceProvider::new();
949 let network_handle = network_provider.handle();
950
951 assert_eq!(warp_sync.actions(&network_handle).count(), 0)
953 }
954
955 #[test]
956 fn warp_sync_is_started_only_when_there_is_enough_peers() {
957 let client = mock_client_without_state();
958 let mut provider = MockWarpSyncProvider::<Block>::new();
959 provider
960 .expect_current_authorities()
961 .once()
962 .return_const(AuthorityList::default());
963 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
964 let mut warp_sync = WarpSync::new(
965 Arc::new(client),
966 config,
967 None,
968 Arc::new(MockBlockDownloader::new()),
969 None,
970 );
971
972 for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
974 warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
975 assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
976 }
977
978 warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
980 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
981 }
982
983 #[test]
984 fn no_peer_is_scheduled_if_no_peers_connected() {
985 let client = mock_client_without_state();
986 let provider = MockWarpSyncProvider::<Block>::new();
987 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
988 let mut warp_sync = WarpSync::new(
989 Arc::new(client),
990 config,
991 None,
992 Arc::new(MockBlockDownloader::new()),
993 None,
994 );
995
996 assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
997 }
998
999 #[test]
1000 fn enough_peers_are_used_in_tests() {
1001 assert!(
1003 10 >= MIN_PEERS_TO_START_WARP_SYNC,
1004 "Tests must be updated to use that many initial peers.",
1005 );
1006 }
1007
1008 #[test]
1009 fn at_least_median_synced_peer_is_scheduled() {
1010 for _ in 0..100 {
1011 let client = mock_client_without_state();
1012 let mut provider = MockWarpSyncProvider::<Block>::new();
1013 provider
1014 .expect_current_authorities()
1015 .once()
1016 .return_const(AuthorityList::default());
1017 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1018 let mut warp_sync = WarpSync::new(
1019 Arc::new(client),
1020 config,
1021 None,
1022 Arc::new(MockBlockDownloader::new()),
1023 None,
1024 );
1025
1026 for best_number in 1..11 {
1027 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1028 }
1029
1030 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1031 assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1032 }
1033 }
1034
1035 #[test]
1036 fn min_best_number_peer_is_scheduled() {
1037 for _ in 0..10 {
1038 let client = mock_client_without_state();
1039 let mut provider = MockWarpSyncProvider::<Block>::new();
1040 provider
1041 .expect_current_authorities()
1042 .once()
1043 .return_const(AuthorityList::default());
1044 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1045 let mut warp_sync = WarpSync::new(
1046 Arc::new(client),
1047 config,
1048 None,
1049 Arc::new(MockBlockDownloader::new()),
1050 None,
1051 );
1052
1053 for best_number in 1..11 {
1054 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1055 }
1056
1057 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1058 assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1059 }
1060 }
1061
1062 #[test]
1063 fn backedoff_number_peer_is_not_scheduled() {
1064 let client = mock_client_without_state();
1065 let mut provider = MockWarpSyncProvider::<Block>::new();
1066 provider
1067 .expect_current_authorities()
1068 .once()
1069 .return_const(AuthorityList::default());
1070 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1071 let mut warp_sync = WarpSync::new(
1072 Arc::new(client),
1073 config,
1074 None,
1075 Arc::new(MockBlockDownloader::new()),
1076 None,
1077 );
1078
1079 for best_number in 1..11 {
1080 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1081 }
1082
1083 let ninth_peer =
1084 *warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1085 let tenth_peer =
1086 *warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1087
1088 warp_sync.remove_peer(&tenth_peer);
1090 assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1091
1092 warp_sync.add_peer(tenth_peer, H256::random(), 10);
1093 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1094 assert_eq!(tenth_peer, peer_id.unwrap());
1095 warp_sync.remove_peer(&tenth_peer);
1096
1097 assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1099
1100 warp_sync.add_peer(tenth_peer, H256::random(), 10);
1102 let peer_id: Option<PeerId> =
1103 warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1104 assert!(peer_id.is_none());
1105
1106 let peer_id: Option<PeerId> =
1108 warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1109 assert_eq!(ninth_peer, peer_id.unwrap());
1110 }
1111
1112 #[test]
1113 fn no_warp_proof_request_in_another_phase() {
1114 let client = mock_client_without_state();
1115 let mut provider = MockWarpSyncProvider::<Block>::new();
1116 provider
1117 .expect_current_authorities()
1118 .once()
1119 .return_const(AuthorityList::default());
1120 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1121 let mut warp_sync = WarpSync::new(
1122 Arc::new(client),
1123 config,
1124 Some(ProtocolName::Static("")),
1125 Arc::new(MockBlockDownloader::new()),
1126 None,
1127 );
1128
1129 for best_number in 1..11 {
1131 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1132 }
1133
1134 warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1136 1,
1137 Default::default(),
1138 Default::default(),
1139 Default::default(),
1140 Default::default(),
1141 ));
1142
1143 assert!(warp_sync.warp_proof_request().is_none());
1145 }
1146
1147 #[test]
1148 fn warp_proof_request_starts_at_last_hash() {
1149 let client = mock_client_without_state();
1150 let mut provider = MockWarpSyncProvider::<Block>::new();
1151 provider
1152 .expect_current_authorities()
1153 .once()
1154 .return_const(AuthorityList::default());
1155 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1156 let mut warp_sync = WarpSync::new(
1157 Arc::new(client),
1158 config,
1159 Some(ProtocolName::Static("")),
1160 Arc::new(MockBlockDownloader::new()),
1161 None,
1162 );
1163
1164 for best_number in 1..11 {
1166 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1167 }
1168 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1169
1170 let known_last_hash = Hash::random();
1171
1172 match &mut warp_sync.phase {
1174 Phase::WarpProof { last_hash, .. } => {
1175 *last_hash = known_last_hash;
1176 },
1177 _ => panic!("Invalid phase."),
1178 }
1179
1180 let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
1181 assert_eq!(request.begin, known_last_hash);
1182 }
1183
1184 #[test]
1185 fn no_parallel_warp_proof_requests() {
1186 let client = mock_client_without_state();
1187 let mut provider = MockWarpSyncProvider::<Block>::new();
1188 provider
1189 .expect_current_authorities()
1190 .once()
1191 .return_const(AuthorityList::default());
1192 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1193 let mut warp_sync = WarpSync::new(
1194 Arc::new(client),
1195 config,
1196 Some(ProtocolName::Static("")),
1197 Arc::new(MockBlockDownloader::new()),
1198 None,
1199 );
1200
1201 for best_number in 1..11 {
1203 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1204 }
1205 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1206
1207 assert!(warp_sync.warp_proof_request().is_some());
1209 assert!(warp_sync.warp_proof_request().is_none());
1211 }
1212
1213 #[test]
1214 fn bad_warp_proof_response_drops_peer() {
1215 let client = mock_client_without_state();
1216 let mut provider = MockWarpSyncProvider::<Block>::new();
1217 provider
1218 .expect_current_authorities()
1219 .once()
1220 .return_const(AuthorityList::default());
1221 provider.expect_verify().return_once(|_proof, _set_id, _authorities| {
1223 Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
1224 });
1225 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1226 let mut warp_sync = WarpSync::new(
1227 Arc::new(client),
1228 config,
1229 Some(ProtocolName::Static("")),
1230 Arc::new(MockBlockDownloader::new()),
1231 None,
1232 );
1233
1234 for best_number in 1..11 {
1236 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1237 }
1238 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1239
1240 let network_provider = NetworkServiceProvider::new();
1241 let network_handle = network_provider.handle();
1242
1243 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1245 assert_eq!(actions.len(), 1);
1246 let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1247 panic!("Invalid action");
1248 };
1249
1250 warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1251
1252 let actions = std::mem::take(&mut warp_sync.actions);
1254 assert_eq!(actions.len(), 1);
1255 assert!(matches!(
1256 actions[0],
1257 SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
1258 ));
1259 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1260 }
1261
1262 #[test]
1263 fn partial_warp_proof_doesnt_advance_phase() {
1264 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1265 let mut provider = MockWarpSyncProvider::<Block>::new();
1266 provider
1267 .expect_current_authorities()
1268 .once()
1269 .return_const(AuthorityList::default());
1270 let target_block = BlockBuilderBuilder::new(&*client)
1271 .on_parent_block(client.chain_info().best_hash)
1272 .with_parent_block_number(client.chain_info().best_number)
1273 .build()
1274 .unwrap()
1275 .build()
1276 .unwrap()
1277 .block;
1278 let target_header = target_block.header().clone();
1279 let justifications = Justifications::new(vec![(GRANDPA_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1280 {
1282 let target_header = target_header.clone();
1283 let justifications = justifications.clone();
1284 provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1285 Ok(VerificationResult::Partial(
1286 set_id,
1287 authorities,
1288 target_header.hash(),
1289 vec![(target_header, justifications)],
1290 ))
1291 });
1292 }
1293 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1294 let mut warp_sync = WarpSync::new(
1295 client,
1296 config,
1297 Some(ProtocolName::Static("")),
1298 Arc::new(MockBlockDownloader::new()),
1299 None,
1300 );
1301
1302 for best_number in 1..11 {
1304 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1305 }
1306 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1307
1308 let network_provider = NetworkServiceProvider::new();
1309 let network_handle = network_provider.handle();
1310
1311 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1313 assert_eq!(actions.len(), 1);
1314 let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1315 panic!("Invalid action");
1316 };
1317
1318 warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1319
1320 assert_eq!(warp_sync.actions.len(), 1);
1321 let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1322 else {
1323 panic!("Expected `ImportBlocks` action.");
1324 };
1325 assert_eq!(origin, BlockOrigin::NetworkInitialSync);
1326 assert_eq!(blocks.len(), 1);
1327 let import_block = blocks.pop().unwrap();
1328 assert_eq!(
1329 import_block,
1330 IncomingBlock {
1331 hash: target_header.hash(),
1332 header: Some(target_header),
1333 body: None,
1334 indexed_body: None,
1335 justifications: Some(justifications),
1336 origin: Some(request_peer_id),
1337 allow_missing_state: true,
1338 skip_execution: true,
1339 import_existing: false,
1340 state: None,
1341 }
1342 );
1343 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1344 }
1345
1346 #[test]
1347 fn complete_warp_proof_advances_phase() {
1348 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1349 let mut provider = MockWarpSyncProvider::<Block>::new();
1350 provider
1351 .expect_current_authorities()
1352 .once()
1353 .return_const(AuthorityList::default());
1354 let target_block = BlockBuilderBuilder::new(&*client)
1355 .on_parent_block(client.chain_info().best_hash)
1356 .with_parent_block_number(client.chain_info().best_number)
1357 .build()
1358 .unwrap()
1359 .build()
1360 .unwrap()
1361 .block;
1362 let target_header = target_block.header().clone();
1363 let justifications = Justifications::new(vec![(GRANDPA_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1364 {
1366 let target_header = target_header.clone();
1367 let justifications = justifications.clone();
1368 provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1369 Ok(VerificationResult::Complete(
1370 set_id,
1371 authorities,
1372 target_header.clone(),
1373 vec![(target_header, justifications)],
1374 ))
1375 });
1376 }
1377 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1378 let mut warp_sync = WarpSync::new(
1379 client,
1380 config,
1381 Some(ProtocolName::Static("")),
1382 Arc::new(MockBlockDownloader::new()),
1383 None,
1384 );
1385
1386 for best_number in 1..11 {
1388 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1389 }
1390 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1391
1392 let network_provider = NetworkServiceProvider::new();
1393 let network_handle = network_provider.handle();
1394
1395 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1397 assert_eq!(actions.len(), 1);
1398 let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1399 panic!("Invalid action.");
1400 };
1401
1402 warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1403
1404 assert_eq!(warp_sync.actions.len(), 1);
1405 let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1406 else {
1407 panic!("Expected `ImportBlocks` action.");
1408 };
1409 assert_eq!(origin, BlockOrigin::NetworkInitialSync);
1410 assert_eq!(blocks.len(), 1);
1411 let import_block = blocks.pop().unwrap();
1412 assert_eq!(
1413 import_block,
1414 IncomingBlock {
1415 hash: target_header.hash(),
1416 header: Some(target_header),
1417 body: None,
1418 indexed_body: None,
1419 justifications: Some(justifications),
1420 origin: Some(request_peer_id),
1421 allow_missing_state: true,
1422 skip_execution: true,
1423 import_existing: false,
1424 state: None,
1425 }
1426 );
1427 assert!(
1428 matches!(warp_sync.phase, Phase::TargetBlock(header) if header == *target_block.header())
1429 );
1430 }
1431
1432 #[test]
1433 fn no_target_block_requests_in_another_phase() {
1434 let client = mock_client_without_state();
1435 let mut provider = MockWarpSyncProvider::<Block>::new();
1436 provider
1437 .expect_current_authorities()
1438 .once()
1439 .return_const(AuthorityList::default());
1440 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1441 let mut warp_sync = WarpSync::new(
1442 Arc::new(client),
1443 config,
1444 None,
1445 Arc::new(MockBlockDownloader::new()),
1446 None,
1447 );
1448
1449 for best_number in 1..11 {
1451 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1452 }
1453 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1455
1456 assert!(warp_sync.target_block_request().is_none());
1458 }
1459
1460 #[test]
1461 fn target_block_request_is_correct() {
1462 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1463 let mut provider = MockWarpSyncProvider::<Block>::new();
1464 provider
1465 .expect_current_authorities()
1466 .once()
1467 .return_const(AuthorityList::default());
1468 let target_block = BlockBuilderBuilder::new(&*client)
1469 .on_parent_block(client.chain_info().best_hash)
1470 .with_parent_block_number(client.chain_info().best_number)
1471 .build()
1472 .unwrap()
1473 .build()
1474 .unwrap()
1475 .block;
1476 let target_header = target_block.header().clone();
1477 provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1479 Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1480 });
1481 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1482 let mut warp_sync =
1483 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1484
1485 for best_number in 1..11 {
1487 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1488 }
1489
1490 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1492
1493 let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1494 assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1495 assert_eq!(
1496 request.fields,
1497 BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1498 );
1499 assert_eq!(request.max, Some(1));
1500 }
1501
1502 #[test]
1503 fn externally_set_target_block_is_requested() {
1504 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1505 let target_block = BlockBuilderBuilder::new(&*client)
1506 .on_parent_block(client.chain_info().best_hash)
1507 .with_parent_block_number(client.chain_info().best_number)
1508 .build()
1509 .unwrap()
1510 .build()
1511 .unwrap()
1512 .block;
1513 let target_header = target_block.header().clone();
1514 let config = WarpSyncConfig::WithTarget(target_header);
1515 let mut warp_sync =
1516 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1517
1518 for best_number in 1..11 {
1520 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1521 }
1522
1523 assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));
1524
1525 let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1526 assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1527 assert_eq!(
1528 request.fields,
1529 BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1530 );
1531 assert_eq!(request.max, Some(1));
1532 }
1533
1534 #[test]
1535 fn no_parallel_target_block_requests() {
1536 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1537 let mut provider = MockWarpSyncProvider::<Block>::new();
1538 provider
1539 .expect_current_authorities()
1540 .once()
1541 .return_const(AuthorityList::default());
1542 let target_block = BlockBuilderBuilder::new(&*client)
1543 .on_parent_block(client.chain_info().best_hash)
1544 .with_parent_block_number(client.chain_info().best_number)
1545 .build()
1546 .unwrap()
1547 .build()
1548 .unwrap()
1549 .block;
1550 let target_header = target_block.header().clone();
1551 provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1553 Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1554 });
1555 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1556 let mut warp_sync =
1557 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1558
1559 for best_number in 1..11 {
1561 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1562 }
1563
1564 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1566
1567 assert!(warp_sync.target_block_request().is_some());
1569 assert!(warp_sync.target_block_request().is_none());
1571 }
1572
1573 #[test]
1574 fn target_block_response_with_no_blocks_drops_peer() {
1575 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1576 let mut provider = MockWarpSyncProvider::<Block>::new();
1577 provider
1578 .expect_current_authorities()
1579 .once()
1580 .return_const(AuthorityList::default());
1581 let target_block = BlockBuilderBuilder::new(&*client)
1582 .on_parent_block(client.chain_info().best_hash)
1583 .with_parent_block_number(client.chain_info().best_number)
1584 .build()
1585 .unwrap()
1586 .build()
1587 .unwrap()
1588 .block;
1589 let target_header = target_block.header().clone();
1590 provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1592 Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1593 });
1594 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1595 let mut warp_sync =
1596 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1597
1598 for best_number in 1..11 {
1600 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1601 }
1602
1603 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1605
1606 let (peer_id, request) = warp_sync.target_block_request().unwrap();
1607
1608 let response = Vec::new();
1610 assert!(matches!(
1612 warp_sync.on_block_response_inner(peer_id, request, response),
1613 Err(BadPeer(id, _rep)) if id == peer_id,
1614 ));
1615 }
1616
1617 #[test]
1618 fn target_block_response_with_extra_blocks_drops_peer() {
1619 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1620 let mut provider = MockWarpSyncProvider::<Block>::new();
1621 provider
1622 .expect_current_authorities()
1623 .once()
1624 .return_const(AuthorityList::default());
1625 let target_block = BlockBuilderBuilder::new(&*client)
1626 .on_parent_block(client.chain_info().best_hash)
1627 .with_parent_block_number(client.chain_info().best_number)
1628 .build()
1629 .unwrap()
1630 .build()
1631 .unwrap()
1632 .block;
1633
1634 let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
1635 .on_parent_block(client.chain_info().best_hash)
1636 .with_parent_block_number(client.chain_info().best_number)
1637 .build()
1638 .unwrap();
1639 extra_block_builder
1640 .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1641 .unwrap();
1642 let extra_block = extra_block_builder.build().unwrap().block;
1643
1644 let target_header = target_block.header().clone();
1645 provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1647 Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1648 });
1649 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1650 let mut warp_sync =
1651 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1652
1653 for best_number in 1..11 {
1655 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1656 }
1657
1658 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1660
1661 let (peer_id, request) = warp_sync.target_block_request().unwrap();
1662
1663 let response = vec![
1665 BlockData::<Block> {
1666 hash: target_block.header().hash(),
1667 header: Some(target_block.header().clone()),
1668 body: Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1669 indexed_body: None,
1670 receipt: None,
1671 message_queue: None,
1672 justification: None,
1673 justifications: None,
1674 },
1675 BlockData::<Block> {
1676 hash: extra_block.header().hash(),
1677 header: Some(extra_block.header().clone()),
1678 body: Some(extra_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1679 indexed_body: None,
1680 receipt: None,
1681 message_queue: None,
1682 justification: None,
1683 justifications: None,
1684 },
1685 ];
1686 assert!(matches!(
1688 warp_sync.on_block_response_inner(peer_id, request, response),
1689 Err(BadPeer(id, _rep)) if id == peer_id,
1690 ));
1691 }
1692
1693 #[test]
1694 fn target_block_response_with_wrong_block_drops_peer() {
1695 pezsp_tracing::try_init_simple();
1696
1697 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1698 let mut provider = MockWarpSyncProvider::<Block>::new();
1699 provider
1700 .expect_current_authorities()
1701 .once()
1702 .return_const(AuthorityList::default());
1703 let target_block = BlockBuilderBuilder::new(&*client)
1704 .on_parent_block(client.chain_info().best_hash)
1705 .with_parent_block_number(client.chain_info().best_number)
1706 .build()
1707 .unwrap()
1708 .build()
1709 .unwrap()
1710 .block;
1711
1712 let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
1713 .on_parent_block(client.chain_info().best_hash)
1714 .with_parent_block_number(client.chain_info().best_number)
1715 .build()
1716 .unwrap();
1717 wrong_block_builder
1718 .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1719 .unwrap();
1720 let wrong_block = wrong_block_builder.build().unwrap().block;
1721
1722 let target_header = target_block.header().clone();
1723 provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1725 Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1726 });
1727 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1728 let mut warp_sync =
1729 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1730
1731 for best_number in 1..11 {
1733 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1734 }
1735
1736 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1738
1739 let (peer_id, request) = warp_sync.target_block_request().unwrap();
1740
1741 let response = vec![BlockData::<Block> {
1743 hash: wrong_block.header().hash(),
1744 header: Some(wrong_block.header().clone()),
1745 body: Some(wrong_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1746 indexed_body: None,
1747 receipt: None,
1748 message_queue: None,
1749 justification: None,
1750 justifications: None,
1751 }];
1752 assert!(matches!(
1754 warp_sync.on_block_response_inner(peer_id, request, response),
1755 Err(BadPeer(id, _rep)) if id == peer_id,
1756 ));
1757 }
1758
1759 #[test]
1760 fn correct_target_block_response_sets_strategy_result() {
1761 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1762 let mut provider = MockWarpSyncProvider::<Block>::new();
1763 provider
1764 .expect_current_authorities()
1765 .once()
1766 .return_const(AuthorityList::default());
1767 let mut target_block_builder = BlockBuilderBuilder::new(&*client)
1768 .on_parent_block(client.chain_info().best_hash)
1769 .with_parent_block_number(client.chain_info().best_number)
1770 .build()
1771 .unwrap();
1772 target_block_builder
1773 .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1774 .unwrap();
1775 let target_block = target_block_builder.build().unwrap().block;
1776 let target_header = target_block.header().clone();
1777 provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1779 Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1780 });
1781 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1782 let mut warp_sync =
1783 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1784
1785 for best_number in 1..11 {
1787 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1788 }
1789
1790 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1792
1793 let (peer_id, request) = warp_sync.target_block_request().unwrap();
1794
1795 let body = Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>());
1797 let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
1798 let response = vec![BlockData::<Block> {
1799 hash: target_block.header().hash(),
1800 header: Some(target_block.header().clone()),
1801 body: body.clone(),
1802 indexed_body: None,
1803 receipt: None,
1804 message_queue: None,
1805 justification: None,
1806 justifications: justifications.clone(),
1807 }];
1808
1809 assert!(warp_sync.on_block_response_inner(peer_id, request, response).is_ok());
1810
1811 let network_provider = NetworkServiceProvider::new();
1812 let network_handle = network_provider.handle();
1813
1814 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1816 assert_eq!(actions.len(), 1);
1817 assert!(matches!(actions[0], SyncingAction::Finished));
1818
1819 let result = warp_sync.take_result().unwrap();
1821 assert_eq!(result.target_header, *target_block.header());
1822 assert_eq!(result.target_body, body);
1823 assert_eq!(result.target_justifications, justifications);
1824 }
1825}