1use soil_client::consensus::BlockOrigin;
10use soil_client::import::IncomingBlock;
11
12use crate::{
13 sync::{
14 block_relay_protocol::{BlockDownloader, BlockResponseError},
15 service::network::NetworkServiceHandle,
16 strategy::{
17 chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
18 SyncingAction,
19 },
20 types::{BadPeer, SyncState, SyncStatus},
21 },
22 LOG_TARGET,
23};
24use codec::{Decode, Encode};
25use futures::{channel::oneshot, FutureExt};
26use log::{debug, error, trace, warn};
27use soil_client::blockchain::HeaderBackend;
28use soil_network::common::sync::message::{
29 BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
30};
31use soil_network::types::PeerId;
32use soil_network::{IfDisconnected, ProtocolName};
33use std::{any::Any, collections::HashMap, fmt, sync::Arc};
34use subsoil::runtime::{
35 traits::{Block as BlockT, Header, NumberFor, Zero},
36 Justifications, SaturatedConversion,
37};
38
39const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
41
42pub struct EncodedProof(pub Vec<u8>);
44
45#[derive(Encode, Decode, Debug, Clone)]
47pub struct WarpProofRequest<B: BlockT> {
48 pub begin: B::Hash,
50}
51
52pub trait Verifier<Block: BlockT>: Send + Sync {
54 fn verify(
56 &mut self,
57 proof: &EncodedProof,
58 ) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
59 fn next_proof_context(&self) -> Block::Hash;
61 fn status(&self) -> Option<String>;
63}
64
65pub enum VerificationResult<Block: BlockT> {
67 Partial(Vec<(Block::Header, Justifications)>),
69 Complete(Block::Header, Vec<(Block::Header, Justifications)>),
71}
72
73pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
75 fn generate(
78 &self,
79 start: Block::Hash,
80 ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
81 fn create_verifier(&self) -> Box<dyn Verifier<Block>>;
83}
84
85mod rep {
86 use soil_network::ReputationChange as Rep;
87
88 pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");
90
91 pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");
93
94 pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
96
97 pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
99
100 pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
102
103 pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
105}
106
107#[derive(Clone, Eq, PartialEq, Debug)]
109pub enum WarpSyncPhase<Block: BlockT> {
110 AwaitingPeers { required_peers: usize },
112 DownloadingWarpProofs,
114 DownloadingTargetBlock,
116 DownloadingState,
118 ImportingState,
120 DownloadingBlocks(NumberFor<Block>),
122 Complete,
124}
125
126impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
127 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128 match self {
129 Self::AwaitingPeers { required_peers } => {
130 write!(f, "Waiting for {required_peers} peers to be connected")
131 },
132 Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
133 Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
134 Self::DownloadingState => write!(f, "Downloading state"),
135 Self::ImportingState => write!(f, "Importing state"),
136 Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
137 Self::Complete => write!(f, "Warp sync is complete"),
138 }
139 }
140}
141
142#[derive(Clone, Eq, PartialEq, Debug)]
144pub struct WarpSyncProgress<Block: BlockT> {
145 pub phase: WarpSyncPhase<Block>,
147 pub total_bytes: u64,
149 pub status: Option<String>,
151}
152
153pub enum WarpSyncConfig<Block: BlockT> {
155 WithProvider(Arc<dyn WarpSyncProvider<Block>>),
157 WithTarget(<Block as BlockT>::Header),
161}
162
163enum Phase<B: BlockT> {
165 WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
167 WarpProof { verifier: Box<dyn Verifier<B>> },
169 TargetBlock(B::Header),
171 Complete,
173}
174
175enum PeerState {
176 Available,
177 DownloadingProofs,
178 DownloadingTargetBlock,
179}
180
181impl PeerState {
182 fn is_available(&self) -> bool {
183 matches!(self, PeerState::Available)
184 }
185}
186
187struct Peer<B: BlockT> {
188 best_number: NumberFor<B>,
189 state: PeerState,
190}
191
192pub struct WarpSyncResult<B: BlockT> {
193 pub target_header: B::Header,
194 pub target_body: Option<Vec<B::Extrinsic>>,
195 pub target_justifications: Option<Justifications>,
196}
197
198pub struct WarpSync<B: BlockT> {
200 phase: Phase<B>,
201 total_proof_bytes: u64,
202 total_state_bytes: u64,
203 peers: HashMap<PeerId, Peer<B>>,
204 disconnected_peers: DisconnectedPeers,
205 protocol_name: Option<ProtocolName>,
206 block_downloader: Arc<dyn BlockDownloader<B>>,
207 actions: Vec<SyncingAction<B>>,
208 result: Option<WarpSyncResult<B>>,
209 min_peers_to_start_warp_sync: usize,
211}
212
213impl<B> WarpSync<B>
214where
215 B: BlockT,
216{
217 pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
219
220 pub fn new<Client>(
224 client: Arc<Client>,
225 warp_sync_config: WarpSyncConfig<B>,
226 protocol_name: Option<ProtocolName>,
227 block_downloader: Arc<dyn BlockDownloader<B>>,
228 min_peers_to_start_warp_sync: Option<usize>,
229 ) -> Self
230 where
231 Client: HeaderBackend<B> + 'static,
232 {
233 let min_peers_to_start_warp_sync =
234 min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
235 if client.info().finalized_state.is_some() {
236 error!(
237 target: LOG_TARGET,
238 "Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
239 );
240 return Self {
241 phase: Phase::Complete,
242 total_proof_bytes: 0,
243 total_state_bytes: 0,
244 peers: HashMap::new(),
245 disconnected_peers: DisconnectedPeers::new(),
246 protocol_name,
247 block_downloader,
248 actions: vec![SyncingAction::Finished],
249 result: None,
250 min_peers_to_start_warp_sync,
251 };
252 }
253
254 let phase = match warp_sync_config {
255 WarpSyncConfig::WithProvider(warp_sync_provider) => {
256 Phase::WaitingForPeers { warp_sync_provider }
257 },
258 WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
259 };
260
261 Self {
262 phase,
263 total_proof_bytes: 0,
264 total_state_bytes: 0,
265 peers: HashMap::new(),
266 disconnected_peers: DisconnectedPeers::new(),
267 protocol_name,
268 block_downloader,
269 actions: Vec::new(),
270 result: None,
271 min_peers_to_start_warp_sync,
272 }
273 }
274
275 pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
277 self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
278
279 self.try_to_start_warp_sync();
280 }
281
282 pub fn remove_peer(&mut self, peer_id: &PeerId) {
284 if let Some(state) = self.peers.remove(peer_id) {
285 if !state.state.is_available() {
286 if let Some(bad_peer) =
287 self.disconnected_peers.on_disconnect_during_request(*peer_id)
288 {
289 self.actions.push(SyncingAction::DropPeer(bad_peer));
290 }
291 }
292 }
293 }
294
295 #[must_use]
299 pub fn on_validated_block_announce(
300 &mut self,
301 is_best: bool,
302 peer_id: PeerId,
303 announce: &BlockAnnounce<B::Header>,
304 ) -> Option<(B::Hash, NumberFor<B>)> {
305 is_best.then(|| {
306 let best_number = *announce.header.number();
307 let best_hash = announce.header.hash();
308 if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
309 peer.best_number = best_number;
310 }
311 (best_hash, best_number)
313 })
314 }
315
316 fn try_to_start_warp_sync(&mut self) {
318 let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
319
320 if self.peers.len() < self.min_peers_to_start_warp_sync {
321 return;
322 }
323
324 let verifier = warp_sync_provider.create_verifier();
325 self.phase = Phase::WarpProof { verifier };
326 debug!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
327 }
328
329 pub fn on_generic_response(
330 &mut self,
331 peer_id: &PeerId,
332 protocol_name: ProtocolName,
333 response: Box<dyn Any + Send>,
334 ) {
335 if &protocol_name == self.block_downloader.protocol_name() {
336 let Ok(response) = response
337 .downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
338 else {
339 warn!(target: LOG_TARGET, "Failed to downcast block response");
340 debug_assert!(false);
341 return;
342 };
343
344 let (request, response) = *response;
345 let blocks = match response {
346 Ok(blocks) => blocks,
347 Err(BlockResponseError::DecodeFailed(e)) => {
348 debug!(
349 target: LOG_TARGET,
350 "Failed to decode block response from peer {:?}: {:?}.",
351 peer_id,
352 e
353 );
354 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
355 return;
356 },
357 Err(BlockResponseError::ExtractionFailed(e)) => {
358 debug!(
359 target: LOG_TARGET,
360 "Failed to extract blocks from peer response {:?}: {:?}.",
361 peer_id,
362 e
363 );
364 self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
365 return;
366 },
367 };
368
369 self.on_block_response(*peer_id, request, blocks);
370 } else {
371 let Ok(response) = response.downcast::<Vec<u8>>() else {
372 warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
373 debug_assert!(false);
374 return;
375 };
376
377 self.on_warp_proof_response(peer_id, EncodedProof(*response));
378 }
379 }
380
381 pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
383 if let Some(peer) = self.peers.get_mut(peer_id) {
384 peer.state = PeerState::Available;
385 }
386
387 let Phase::WarpProof { verifier } = &mut self.phase else {
388 debug!(target: LOG_TARGET, "Unexpected warp proof response");
389 self.actions
390 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
391 return;
392 };
393
394 let proof_to_incoming_block =
395 |(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
396 IncomingBlock {
397 hash: header.hash(),
398 header: Some(header),
399 body: None,
400 indexed_body: None,
401 justifications: Some(justifications),
402 origin: Some((*peer_id).into()),
403 allow_missing_state: true,
406 skip_execution: true,
407 import_existing: false,
409 state: None,
410 }
411 };
412
413 match verifier.verify(&response) {
414 Err(e) => {
415 debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
416 self.actions
417 .push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
418 },
419 Ok(VerificationResult::Partial(proofs)) => {
420 debug!(target: LOG_TARGET, "Verified partial proof");
421 self.total_proof_bytes += response.0.len() as u64;
422 self.actions.push(SyncingAction::ImportBlocks {
423 origin: BlockOrigin::WarpSync,
424 blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
425 });
426 },
427 Ok(VerificationResult::Complete(header, proofs)) => {
428 debug!(
429 target: LOG_TARGET,
430 "Verified complete proof. Continuing with target block download: {} ({}).",
431 header.hash(),
432 header.number(),
433 );
434 self.total_proof_bytes += response.0.len() as u64;
435 self.phase = Phase::TargetBlock(header.clone());
436 let incoming_blocks: Vec<_> = proofs
437 .into_iter()
438 .map(proof_to_incoming_block)
439 .filter(|i| {
440 if header.number() != i.header.as_ref().unwrap().number() {
445 true
446 } else {
447 log::trace!(
448 target: LOG_TARGET,
449 "Filtered out target block: {} ({})",
450 header.hash(),
451 header.number()
452 );
453 false
454 }
455 })
456 .collect();
457 self.actions.push(SyncingAction::ImportBlocks {
458 origin: BlockOrigin::WarpSync,
459 blocks: incoming_blocks,
460 });
461 },
462 }
463 }
464
465 pub fn on_block_response(
467 &mut self,
468 peer_id: PeerId,
469 request: BlockRequest<B>,
470 blocks: Vec<BlockData<B>>,
471 ) {
472 if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
473 self.actions.push(SyncingAction::DropPeer(bad_peer));
474 }
475 }
476
477 fn on_block_response_inner(
478 &mut self,
479 peer_id: PeerId,
480 request: BlockRequest<B>,
481 mut blocks: Vec<BlockData<B>>,
482 ) -> Result<(), BadPeer> {
483 if let Some(peer) = self.peers.get_mut(&peer_id) {
484 peer.state = PeerState::Available;
485 }
486
487 let Phase::TargetBlock(header) = &mut self.phase else {
488 debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
489 return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE));
490 };
491
492 if blocks.is_empty() {
493 debug!(
494 target: LOG_TARGET,
495 "Downloading target block failed: empty block response from {peer_id}",
496 );
497 return Err(BadPeer(peer_id, rep::NO_BLOCK));
498 }
499
500 if blocks.len() > 1 {
501 debug!(
502 target: LOG_TARGET,
503 "Too many blocks ({}) in warp target block response from {peer_id}",
504 blocks.len(),
505 );
506 return Err(BadPeer(peer_id, rep::NOT_REQUESTED));
507 }
508
509 validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
510
511 let block = blocks.pop().expect("`blocks` len checked above; qed");
512
513 let Some(block_header) = &block.header else {
514 debug!(
515 target: LOG_TARGET,
516 "Downloading target block failed: missing header in response from {peer_id}.",
517 );
518 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
519 };
520
521 if block_header != header {
522 debug!(
523 target: LOG_TARGET,
524 "Downloading target block failed: different header in response from {peer_id}.",
525 );
526 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
527 }
528
529 if block.body.is_none() {
530 debug!(
531 target: LOG_TARGET,
532 "Downloading target block failed: missing body in response from {peer_id}.",
533 );
534 return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
535 }
536
537 self.result = Some(WarpSyncResult {
538 target_header: header.clone(),
539 target_body: block.body,
540 target_justifications: block.justifications,
541 });
542 self.phase = Phase::Complete;
543 self.actions.push(SyncingAction::Finished);
544 Ok(())
545 }
546
547 fn schedule_next_peer(
549 &mut self,
550 new_state: PeerState,
551 min_best_number: Option<NumberFor<B>>,
552 ) -> Option<PeerId> {
553 let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
554 if targets.is_empty() {
555 return None;
556 }
557 targets.sort();
558 let median = targets[targets.len() / 2];
559 let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
560 for (peer_id, peer) in self.peers.iter_mut() {
563 if peer.state.is_available()
564 && peer.best_number >= threshold
565 && self.disconnected_peers.is_peer_available(peer_id)
566 {
567 peer.state = new_state;
568 return Some(*peer_id);
569 }
570 }
571 None
572 }
573
574 fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
576 let Phase::WarpProof { verifier } = &self.phase else { return None };
577
578 let begin = verifier.next_proof_context();
580
581 if self
582 .peers
583 .values()
584 .any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
585 {
586 return None;
588 }
589
590 let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
591 trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
592
593 let request = WarpProofRequest { begin };
594
595 let Some(protocol_name) = self.protocol_name.clone() else {
596 warn!(
597 target: LOG_TARGET,
598 "Trying to send warp sync request when no protocol is configured {request:?}",
599 );
600 return None;
601 };
602
603 Some((peer_id, protocol_name, request))
604 }
605
606 fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
608 let Phase::TargetBlock(target_header) = &self.phase else { return None };
609
610 if self
611 .peers
612 .values()
613 .any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
614 {
615 return None;
617 }
618
619 let target_hash = target_header.hash();
621 let target_number = *target_header.number();
622
623 let peer_id =
624 self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
625
626 trace!(
627 target: LOG_TARGET,
628 "New target block request to {peer_id}, target: {} ({}).",
629 target_hash,
630 target_number,
631 );
632
633 Some((
634 peer_id,
635 BlockRequest::<B> {
636 id: 0,
637 fields: BlockAttributes::HEADER
638 | BlockAttributes::BODY
639 | BlockAttributes::JUSTIFICATION,
640 from: FromBlock::Hash(target_hash),
641 direction: Direction::Ascending,
642 max: Some(1),
643 },
644 ))
645 }
646
647 pub fn progress(&self) -> WarpSyncProgress<B> {
649 match &self.phase {
650 Phase::WaitingForPeers { .. } => WarpSyncProgress {
651 phase: WarpSyncPhase::AwaitingPeers {
652 required_peers: self.min_peers_to_start_warp_sync,
653 },
654 total_bytes: self.total_proof_bytes,
655 status: None,
656 },
657 Phase::WarpProof { verifier } => WarpSyncProgress {
658 phase: WarpSyncPhase::DownloadingWarpProofs,
659 total_bytes: self.total_proof_bytes,
660 status: verifier.status(),
661 },
662 Phase::TargetBlock(_) => WarpSyncProgress {
663 phase: WarpSyncPhase::DownloadingTargetBlock,
664 total_bytes: self.total_proof_bytes,
665 status: None,
666 },
667 Phase::Complete => WarpSyncProgress {
668 phase: WarpSyncPhase::Complete,
669 total_bytes: self.total_proof_bytes + self.total_state_bytes,
670 status: None,
671 },
672 }
673 }
674
675 pub fn num_peers(&self) -> usize {
677 self.peers.len()
678 }
679
680 pub fn status(&self) -> SyncStatus<B> {
682 SyncStatus {
683 state: match &self.phase {
684 Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
685 Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
686 Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
687 Phase::Complete => SyncState::Idle,
688 },
689 best_seen_block: match &self.phase {
690 Phase::WaitingForPeers { .. } => None,
691 Phase::WarpProof { .. } => None,
692 Phase::TargetBlock(header) => Some(*header.number()),
693 Phase::Complete => None,
694 },
695 num_peers: self.peers.len().saturated_into(),
696 queued_blocks: 0,
697 state_sync: None,
698 warp_sync: Some(self.progress()),
699 }
700 }
701
702 #[must_use]
704 pub fn actions(
705 &mut self,
706 network_service: &NetworkServiceHandle,
707 ) -> impl Iterator<Item = SyncingAction<B>> {
708 let warp_proof_request =
709 self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
710 trace!(
711 target: LOG_TARGET,
712 "Created `WarpProofRequest` to {}, request: {:?}.",
713 peer_id,
714 request,
715 );
716
717 let (tx, rx) = oneshot::channel();
718
719 network_service.start_request(
720 peer_id,
721 protocol_name,
722 request.encode(),
723 tx,
724 IfDisconnected::ImmediateError,
725 );
726
727 SyncingAction::StartRequest {
728 peer_id,
729 key: Self::STRATEGY_KEY,
730 request: async move {
731 Ok(rx.await?.and_then(|(response, protocol_name)| {
732 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
733 }))
734 }
735 .boxed(),
736 remove_obsolete: false,
737 }
738 });
739 self.actions.extend(warp_proof_request);
740
741 let target_block_request =
742 self.target_block_request().into_iter().map(|(peer_id, request)| {
743 let downloader = self.block_downloader.clone();
744
745 SyncingAction::StartRequest {
746 peer_id,
747 key: Self::STRATEGY_KEY,
748 request: async move {
749 Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
750 |(response, protocol_name)| {
751 let decoded_response =
752 downloader.block_response_into_blocks(&request, response);
753 let result =
754 Box::new((request, decoded_response)) as Box<dyn Any + Send>;
755 Ok((result, protocol_name))
756 },
757 ))
758 }
759 .boxed(),
760 remove_obsolete: true,
763 }
764 });
765 self.actions.extend(target_block_request);
766
767 std::mem::take(&mut self.actions).into_iter()
768 }
769
770 #[must_use]
772 pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
773 self.result.take()
774 }
775}
776
777#[cfg(test)]
778mod test {
779 use super::*;
780 use crate::sync::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
781 use soil_client::block_builder::BlockBuilderBuilder;
782 use soil_client::blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
783 use std::{io::ErrorKind, sync::Arc};
784 use subsoil::core::H256;
785 use subsoil::runtime::{
786 traits::{Block as BlockT, Header as HeaderT, NumberFor},
787 ConsensusEngineId,
788 };
789 use soil_test_node_runtime_client::{
790 runtime::{Block, Hash},
791 BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
792 };
793
794 pub const TEST_ENGINE_ID: ConsensusEngineId = *b"TEST";
795
796 mockall::mock! {
797 pub Client<B: BlockT> {}
798
799 impl<B: BlockT> HeaderBackend<B> for Client<B> {
800 fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
801 fn info(&self) -> Info<B>;
802 fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
803 fn number(
804 &self,
805 hash: B::Hash,
806 ) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
807 fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
808 }
809 }
810
811 mockall::mock! {
812 pub WarpSyncProvider<B: BlockT> {}
813
814 impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
815 fn generate(
816 &self,
817 start: B::Hash,
818 ) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
819 fn create_verifier(&self) -> Box<dyn super::Verifier<B>>;
820 }
821 }
822
823 mockall::mock! {
824 pub Verifier<B: BlockT> {}
825
826 impl<B: BlockT> super::Verifier<B> for Verifier<B> {
827 fn verify(
828 &mut self,
829 proof: &EncodedProof,
830 ) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
831 fn next_proof_context(&self) -> B::Hash;
832 fn status(&self) -> Option<String>;
833 }
834 }
835
836 fn mock_client_with_state() -> MockClient<Block> {
837 let mut client = MockClient::<Block>::new();
838 let genesis_hash = Hash::random();
839 client.expect_info().return_once(move || Info {
840 best_hash: genesis_hash,
841 best_number: 0,
842 genesis_hash,
843 finalized_hash: genesis_hash,
844 finalized_number: 0,
845 finalized_state: Some((genesis_hash, 0)),
847 number_leaves: 0,
848 block_gap: None,
849 });
850
851 client
852 }
853
854 fn mock_client_without_state() -> MockClient<Block> {
855 let mut client = MockClient::<Block>::new();
856 let genesis_hash = Hash::random();
857 client.expect_info().returning(move || Info {
858 best_hash: genesis_hash,
859 best_number: 0,
860 genesis_hash,
861 finalized_hash: genesis_hash,
862 finalized_number: 0,
863 finalized_state: None,
864 number_leaves: 0,
865 block_gap: None,
866 });
867
868 client
869 }
870
871 #[test]
872 fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
873 let client = mock_client_with_state();
874 let provider = MockWarpSyncProvider::<Block>::new();
875 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
876 let mut warp_sync = WarpSync::new(
877 Arc::new(client),
878 config,
879 None,
880 Arc::new(MockBlockDownloader::new()),
881 None,
882 );
883
884 let network_provider = NetworkServiceProvider::new();
885 let network_handle = network_provider.handle();
886
887 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
889 assert_eq!(actions.len(), 1);
890 assert!(matches!(actions[0], SyncingAction::Finished));
891
892 assert!(warp_sync.take_result().is_none());
894 }
895
896 #[test]
897 fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
898 let client = mock_client_with_state();
899 let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
900 1,
901 Default::default(),
902 Default::default(),
903 Default::default(),
904 Default::default(),
905 ));
906 let mut warp_sync = WarpSync::new(
907 Arc::new(client),
908 config,
909 None,
910 Arc::new(MockBlockDownloader::new()),
911 None,
912 );
913
914 let network_provider = NetworkServiceProvider::new();
915 let network_handle = network_provider.handle();
916
917 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
919 assert_eq!(actions.len(), 1);
920 assert!(matches!(actions[0], SyncingAction::Finished));
921
922 assert!(warp_sync.take_result().is_none());
924 }
925
926 #[test]
927 fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
928 let client = mock_client_without_state();
929 let provider = MockWarpSyncProvider::<Block>::new();
930 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
931 let mut warp_sync = WarpSync::new(
932 Arc::new(client),
933 config,
934 None,
935 Arc::new(MockBlockDownloader::new()),
936 None,
937 );
938
939 let network_provider = NetworkServiceProvider::new();
940 let network_handle = network_provider.handle();
941
942 assert_eq!(warp_sync.actions(&network_handle).count(), 0)
944 }
945
946 #[test]
947 fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
948 let client = mock_client_without_state();
949 let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
950 1,
951 Default::default(),
952 Default::default(),
953 Default::default(),
954 Default::default(),
955 ));
956 let mut warp_sync = WarpSync::new(
957 Arc::new(client),
958 config,
959 None,
960 Arc::new(MockBlockDownloader::new()),
961 None,
962 );
963
964 let network_provider = NetworkServiceProvider::new();
965 let network_handle = network_provider.handle();
966
967 assert_eq!(warp_sync.actions(&network_handle).count(), 0)
969 }
970
971 #[test]
972 fn warp_sync_is_started_only_when_there_is_enough_peers() {
973 let client = mock_client_without_state();
974 let mut provider = MockWarpSyncProvider::<Block>::new();
975 let mut verifier = MockVerifier::<Block>::new();
976 verifier.expect_next_proof_context().returning(|| Hash::random());
977 verifier
978 .expect_verify()
979 .returning(|_| unreachable!("verify should not be called in this test"));
980 provider.expect_create_verifier().return_once(move || Box::new(verifier));
981 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
982 let mut warp_sync = WarpSync::new(
983 Arc::new(client),
984 config,
985 None,
986 Arc::new(MockBlockDownloader::new()),
987 None,
988 );
989
990 for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
992 warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
993 assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
994 }
995
996 warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
998 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
999 }
1000
1001 #[test]
1002 fn no_peer_is_scheduled_if_no_peers_connected() {
1003 let client = mock_client_without_state();
1004 let provider = MockWarpSyncProvider::<Block>::new();
1005 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1006 let mut warp_sync = WarpSync::new(
1007 Arc::new(client),
1008 config,
1009 None,
1010 Arc::new(MockBlockDownloader::new()),
1011 None,
1012 );
1013
1014 assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
1015 }
1016
1017 #[test]
1018 fn enough_peers_are_used_in_tests() {
1019 assert!(
1021 10 >= MIN_PEERS_TO_START_WARP_SYNC,
1022 "Tests must be updated to use that many initial peers.",
1023 );
1024 }
1025
1026 #[test]
1027 fn at_least_median_synced_peer_is_scheduled() {
1028 for _ in 0..100 {
1029 let client = mock_client_without_state();
1030 let mut provider = MockWarpSyncProvider::<Block>::new();
1031 let mut verifier = MockVerifier::<Block>::new();
1032 verifier.expect_next_proof_context().returning(|| Hash::random());
1033 verifier
1034 .expect_verify()
1035 .returning(|_| unreachable!("verify should not be called in this test"));
1036 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1037 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1038 let mut warp_sync = WarpSync::new(
1039 Arc::new(client),
1040 config,
1041 None,
1042 Arc::new(MockBlockDownloader::new()),
1043 None,
1044 );
1045
1046 for best_number in 1..11 {
1047 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1048 }
1049
1050 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1051 assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1052 }
1053 }
1054
1055 #[test]
1056 fn min_best_number_peer_is_scheduled() {
1057 for _ in 0..10 {
1058 let client = mock_client_without_state();
1059 let mut provider = MockWarpSyncProvider::<Block>::new();
1060 let mut verifier = MockVerifier::<Block>::new();
1061 verifier.expect_next_proof_context().returning(|| Hash::random());
1062 verifier
1063 .expect_verify()
1064 .returning(|_| unreachable!("verify should not be called in this test"));
1065 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1066 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1067 let mut warp_sync = WarpSync::new(
1068 Arc::new(client),
1069 config,
1070 None,
1071 Arc::new(MockBlockDownloader::new()),
1072 None,
1073 );
1074
1075 for best_number in 1..11 {
1076 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1077 }
1078
1079 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1080 assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1081 }
1082 }
1083
1084 #[test]
1085 fn backedoff_number_peer_is_not_scheduled() {
1086 let client = mock_client_without_state();
1087 let mut provider = MockWarpSyncProvider::<Block>::new();
1088 let mut verifier = MockVerifier::<Block>::new();
1089 verifier.expect_next_proof_context().returning(|| Hash::random());
1090 verifier
1091 .expect_verify()
1092 .returning(|_| unreachable!("verify should not be called in this test"));
1093 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1094 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1095 let mut warp_sync = WarpSync::new(
1096 Arc::new(client),
1097 config,
1098 None,
1099 Arc::new(MockBlockDownloader::new()),
1100 None,
1101 );
1102
1103 for best_number in 1..11 {
1104 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1105 }
1106
1107 let ninth_peer =
1108 *warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1109 let tenth_peer =
1110 *warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1111
1112 warp_sync.remove_peer(&tenth_peer);
1114 assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1115
1116 warp_sync.add_peer(tenth_peer, H256::random(), 10);
1117 let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1118 assert_eq!(tenth_peer, peer_id.unwrap());
1119 warp_sync.remove_peer(&tenth_peer);
1120
1121 assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1123
1124 warp_sync.add_peer(tenth_peer, H256::random(), 10);
1126 let peer_id: Option<PeerId> =
1127 warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1128 assert!(peer_id.is_none());
1129
1130 let peer_id: Option<PeerId> =
1132 warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1133 assert_eq!(ninth_peer, peer_id.unwrap());
1134 }
1135
1136 #[test]
1137 fn no_warp_proof_request_in_another_phase() {
1138 let client = mock_client_without_state();
1139 let mut provider = MockWarpSyncProvider::<Block>::new();
1140 let mut verifier = MockVerifier::<Block>::new();
1141 verifier.expect_next_proof_context().returning(|| Hash::random());
1142 verifier
1143 .expect_verify()
1144 .returning(|_| unreachable!("verify should not be called in this test"));
1145 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1146 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1147 let mut warp_sync = WarpSync::new(
1148 Arc::new(client),
1149 config,
1150 Some(ProtocolName::Static("")),
1151 Arc::new(MockBlockDownloader::new()),
1152 None,
1153 );
1154
1155 for best_number in 1..11 {
1157 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1158 }
1159
1160 warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1162 1,
1163 Default::default(),
1164 Default::default(),
1165 Default::default(),
1166 Default::default(),
1167 ));
1168
1169 assert!(warp_sync.warp_proof_request().is_none());
1171 }
1172
1173 #[test]
1174 fn warp_proof_request_starts_at_last_hash() {
1175 let client = mock_client_without_state();
1176 let mut provider = MockWarpSyncProvider::<Block>::new();
1177 let mut verifier = MockVerifier::<Block>::new();
1178 let known_last_hash = Hash::random();
1179 verifier.expect_next_proof_context().returning(move || known_last_hash);
1180 verifier
1181 .expect_verify()
1182 .returning(|_| unreachable!("verify should not be called in this test"));
1183 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1184 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1185 let mut warp_sync = WarpSync::new(
1186 Arc::new(client),
1187 config,
1188 Some(ProtocolName::Static("")),
1189 Arc::new(MockBlockDownloader::new()),
1190 None,
1191 );
1192
1193 for best_number in 1..11 {
1195 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1196 }
1197 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1198
1199 let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
1200 assert_eq!(request.begin, known_last_hash);
1201 }
1202
1203 #[test]
1204 fn no_parallel_warp_proof_requests() {
1205 let client = mock_client_without_state();
1206 let mut provider = MockWarpSyncProvider::<Block>::new();
1207 let mut verifier = MockVerifier::<Block>::new();
1208 verifier.expect_next_proof_context().returning(|| Hash::random());
1209 verifier
1210 .expect_verify()
1211 .returning(|_| unreachable!("verify should not be called in this test"));
1212 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1213 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1214 let mut warp_sync = WarpSync::new(
1215 Arc::new(client),
1216 config,
1217 Some(ProtocolName::Static("")),
1218 Arc::new(MockBlockDownloader::new()),
1219 None,
1220 );
1221
1222 for best_number in 1..11 {
1224 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1225 }
1226 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1227
1228 assert!(warp_sync.warp_proof_request().is_some());
1230 assert!(warp_sync.warp_proof_request().is_none());
1232 }
1233
1234 #[test]
1235 fn bad_warp_proof_response_drops_peer() {
1236 let client = mock_client_without_state();
1237 let mut provider = MockWarpSyncProvider::<Block>::new();
1238 let mut verifier = MockVerifier::<Block>::new();
1239 verifier.expect_next_proof_context().returning(|| Hash::random());
1240 verifier.expect_verify().return_once(|_proof| {
1242 Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
1243 });
1244 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1245 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1246 let mut warp_sync = WarpSync::new(
1247 Arc::new(client),
1248 config,
1249 Some(ProtocolName::Static("")),
1250 Arc::new(MockBlockDownloader::new()),
1251 None,
1252 );
1253
1254 for best_number in 1..11 {
1256 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1257 }
1258 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1259
1260 let network_provider = NetworkServiceProvider::new();
1261 let network_handle = network_provider.handle();
1262
1263 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1265 assert_eq!(actions.len(), 1);
1266 let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1267 panic!("Invalid action");
1268 };
1269
1270 warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1271
1272 let actions = std::mem::take(&mut warp_sync.actions);
1274 assert_eq!(actions.len(), 1);
1275 assert!(matches!(
1276 actions[0],
1277 SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
1278 ));
1279 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1280 }
1281
1282 #[test]
1283 fn partial_warp_proof_doesnt_advance_phase() {
1284 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1285 let mut provider = MockWarpSyncProvider::<Block>::new();
1286 let target_block = BlockBuilderBuilder::new(&*client)
1287 .on_parent_block(client.chain_info().best_hash)
1288 .with_parent_block_number(client.chain_info().best_number)
1289 .build()
1290 .unwrap()
1291 .build()
1292 .unwrap()
1293 .block;
1294 let target_header = target_block.header().clone();
1295 let justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1296 let mut verifier = MockVerifier::<Block>::new();
1298 let context = client.info().genesis_hash;
1299 verifier.expect_next_proof_context().returning(move || context);
1300 let header_for_verify = target_header.clone();
1301 let just_for_verify = justifications.clone();
1302 verifier.expect_verify().return_once(move |_proof| {
1303 Ok(VerificationResult::Partial(vec![(
1304 header_for_verify.clone(),
1305 just_for_verify.clone(),
1306 )]))
1307 });
1308 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1309 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1310 let mut warp_sync = WarpSync::new(
1311 client,
1312 config,
1313 Some(ProtocolName::Static("")),
1314 Arc::new(MockBlockDownloader::new()),
1315 None,
1316 );
1317
1318 for best_number in 1..11 {
1320 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1321 }
1322 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1323
1324 let network_provider = NetworkServiceProvider::new();
1325 let network_handle = network_provider.handle();
1326
1327 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1329 assert_eq!(actions.len(), 1);
1330 let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1331 panic!("Invalid action");
1332 };
1333
1334 warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1335
1336 assert_eq!(warp_sync.actions.len(), 1);
1337 let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1338 else {
1339 panic!("Expected `ImportBlocks` action.");
1340 };
1341 assert_eq!(origin, BlockOrigin::WarpSync);
1342 assert_eq!(blocks.len(), 1);
1343 let import_block = blocks.pop().unwrap();
1344 assert_eq!(
1345 import_block,
1346 IncomingBlock {
1347 hash: target_header.hash(),
1348 header: Some(target_header),
1349 body: None,
1350 indexed_body: None,
1351 justifications: Some(justifications),
1352 origin: Some(request_peer_id.into()),
1353 allow_missing_state: true,
1354 skip_execution: true,
1355 import_existing: false,
1356 state: None,
1357 }
1358 );
1359 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1360 }
1361
1362 #[test]
1363 fn complete_warp_proof_advances_phase() {
1364 subsoil::tracing::try_init_simple();
1366
1367 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1368 let mut provider = MockWarpSyncProvider::<Block>::new();
1369
1370 let warp_synced_header = <<Block as BlockT>::Header as HeaderT>::new(
1374 1,
1375 Default::default(),
1376 Default::default(),
1377 client.chain_info().best_hash,
1378 Default::default(),
1379 );
1380
1381 let target_header = <<Block as BlockT>::Header as HeaderT>::new(
1383 2,
1384 Default::default(),
1385 Default::default(),
1386 warp_synced_header.hash(),
1387 Default::default(),
1388 );
1389 let warp_justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1390 let target_justifications =
1391 Justifications::new(vec![(TEST_ENGINE_ID, vec![6, 7, 8, 9, 10])]);
1392
1393 let mut verifier = MockVerifier::<Block>::new();
1395 let context = client.info().genesis_hash;
1396 verifier.expect_next_proof_context().returning(move || context);
1397 let warp_synced_header_for_verify = warp_synced_header.clone();
1398 let warp_just_for_verify = warp_justifications.clone();
1399 let target_header_for_verify = target_header.clone();
1400 let target_just_for_verify = target_justifications.clone();
1401 verifier.expect_verify().return_once(move |_proof| {
1402 Ok(VerificationResult::Complete(
1403 target_header_for_verify.clone(),
1404 vec![
1405 (warp_synced_header_for_verify, warp_just_for_verify),
1406 (target_header_for_verify, target_just_for_verify),
1407 ],
1408 ))
1409 });
1410 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1411 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1412 let mut warp_sync = WarpSync::new(
1413 client,
1414 config,
1415 Some(ProtocolName::Static("")),
1416 Arc::new(MockBlockDownloader::new()),
1417 None,
1418 );
1419
1420 for best_number in 1..11 {
1422 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1423 }
1424 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1425
1426 let network_provider = NetworkServiceProvider::new();
1427 let network_handle = network_provider.handle();
1428
1429 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1431 assert_eq!(actions.len(), 1);
1432 let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1433 panic!("Invalid action.");
1434 };
1435
1436 warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1437
1438 assert_eq!(warp_sync.actions.len(), 1);
1439 let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1440 else {
1441 panic!("Expected `ImportBlocks` action.");
1442 };
1443 assert_eq!(origin, BlockOrigin::WarpSync);
1444 assert_eq!(blocks.len(), 1);
1446 let import_block = blocks.pop().unwrap();
1447 assert_eq!(
1448 import_block,
1449 IncomingBlock {
1450 hash: warp_synced_header.hash(),
1451 header: Some(warp_synced_header),
1452 body: None,
1453 indexed_body: None,
1454 justifications: Some(warp_justifications),
1455 origin: Some(request_peer_id.into()),
1456 allow_missing_state: true,
1457 skip_execution: true,
1458 import_existing: false,
1459 state: None,
1460 }
1461 );
1462 assert!(matches!(warp_sync.phase, Phase::TargetBlock(header) if header == target_header));
1463 }
1464
1465 #[test]
1466 fn no_target_block_requests_in_another_phase() {
1467 let client = mock_client_without_state();
1468 let mut provider = MockWarpSyncProvider::<Block>::new();
1469 let mut verifier = MockVerifier::<Block>::new();
1470 verifier.expect_next_proof_context().returning(|| Hash::random());
1471 verifier
1472 .expect_verify()
1473 .returning(|_| unreachable!("verify should not be called in this test"));
1474 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1475 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1476 let mut warp_sync = WarpSync::new(
1477 Arc::new(client),
1478 config,
1479 None,
1480 Arc::new(MockBlockDownloader::new()),
1481 None,
1482 );
1483
1484 for best_number in 1..11 {
1486 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1487 }
1488 assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1490
1491 assert!(warp_sync.target_block_request().is_none());
1493 }
1494
1495 #[test]
1496 fn target_block_request_is_correct() {
1497 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1498 let mut provider = MockWarpSyncProvider::<Block>::new();
1499 let mut verifier = MockVerifier::<Block>::new();
1500 let header_for_ctx = client.info().genesis_hash;
1501 verifier.expect_next_proof_context().returning(move || header_for_ctx);
1502 let target_block = BlockBuilderBuilder::new(&*client)
1503 .on_parent_block(client.chain_info().best_hash)
1504 .with_parent_block_number(client.chain_info().best_number)
1505 .build()
1506 .unwrap()
1507 .build()
1508 .unwrap()
1509 .block;
1510 let target_header = target_block.header().clone();
1511 let header_for_verify = target_header.clone();
1513 verifier.expect_verify().return_once(move |_proof| {
1514 Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1515 });
1516 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1517 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1518 let mut warp_sync =
1519 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1520
1521 for best_number in 1..11 {
1523 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1524 }
1525
1526 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1528
1529 let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1530 assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1531 assert_eq!(
1532 request.fields,
1533 BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1534 );
1535 assert_eq!(request.max, Some(1));
1536 }
1537
1538 #[test]
1539 fn externally_set_target_block_is_requested() {
1540 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1541 let target_block = BlockBuilderBuilder::new(&*client)
1542 .on_parent_block(client.chain_info().best_hash)
1543 .with_parent_block_number(client.chain_info().best_number)
1544 .build()
1545 .unwrap()
1546 .build()
1547 .unwrap()
1548 .block;
1549 let target_header = target_block.header().clone();
1550 let config = WarpSyncConfig::WithTarget(target_header);
1551 let mut warp_sync =
1552 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1553
1554 for best_number in 1..11 {
1556 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1557 }
1558
1559 assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));
1560
1561 let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1562 assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1563 assert_eq!(
1564 request.fields,
1565 BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1566 );
1567 assert_eq!(request.max, Some(1));
1568 }
1569
1570 #[test]
1571 fn no_parallel_target_block_requests() {
1572 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1573 let mut provider = MockWarpSyncProvider::<Block>::new();
1574 let mut verifier = MockVerifier::<Block>::new();
1575 let header_for_ctx = client.info().genesis_hash;
1576 verifier.expect_next_proof_context().returning(move || header_for_ctx);
1577 let target_block = BlockBuilderBuilder::new(&*client)
1578 .on_parent_block(client.chain_info().best_hash)
1579 .with_parent_block_number(client.chain_info().best_number)
1580 .build()
1581 .unwrap()
1582 .build()
1583 .unwrap()
1584 .block;
1585 let target_header = target_block.header().clone();
1586 let header_for_verify = target_header.clone();
1588 verifier.expect_verify().return_once(move |_proof| {
1589 Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1590 });
1591 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1592 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1593 let mut warp_sync =
1594 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1595
1596 for best_number in 1..11 {
1598 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1599 }
1600
1601 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1603
1604 assert!(warp_sync.target_block_request().is_some());
1606 assert!(warp_sync.target_block_request().is_none());
1608 }
1609
1610 #[test]
1611 fn target_block_response_with_no_blocks_drops_peer() {
1612 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1613 let mut provider = MockWarpSyncProvider::<Block>::new();
1614 let mut verifier = MockVerifier::<Block>::new();
1615 let header_for_ctx = client.info().genesis_hash;
1616 verifier.expect_next_proof_context().returning(move || header_for_ctx);
1617 let target_block = BlockBuilderBuilder::new(&*client)
1618 .on_parent_block(client.chain_info().best_hash)
1619 .with_parent_block_number(client.chain_info().best_number)
1620 .build()
1621 .unwrap()
1622 .build()
1623 .unwrap()
1624 .block;
1625 let target_header = target_block.header().clone();
1626 let header_for_verify = target_header.clone();
1628 verifier.expect_verify().return_once(move |_proof| {
1629 Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1630 });
1631 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1632 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1633 let mut warp_sync =
1634 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1635
1636 for best_number in 1..11 {
1638 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1639 }
1640
1641 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1643
1644 let (peer_id, request) = warp_sync.target_block_request().unwrap();
1645
1646 let response = Vec::new();
1648 assert!(matches!(
1650 warp_sync.on_block_response_inner(peer_id, request, response),
1651 Err(BadPeer(id, _rep)) if id == peer_id,
1652 ));
1653 }
1654
1655 #[test]
1656 fn target_block_response_with_extra_blocks_drops_peer() {
1657 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1658 let mut provider = MockWarpSyncProvider::<Block>::new();
1659 let mut verifier = MockVerifier::<Block>::new();
1660 let header_for_ctx = client.info().genesis_hash;
1661 verifier.expect_next_proof_context().returning(move || header_for_ctx);
1662 let target_block = BlockBuilderBuilder::new(&*client)
1663 .on_parent_block(client.chain_info().best_hash)
1664 .with_parent_block_number(client.chain_info().best_number)
1665 .build()
1666 .unwrap()
1667 .build()
1668 .unwrap()
1669 .block;
1670
1671 let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
1672 .on_parent_block(client.chain_info().best_hash)
1673 .with_parent_block_number(client.chain_info().best_number)
1674 .build()
1675 .unwrap();
1676 extra_block_builder
1677 .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1678 .unwrap();
1679 let extra_block = extra_block_builder.build().unwrap().block;
1680
1681 let target_header = target_block.header().clone();
1682 let header_for_verify = target_header.clone();
1684 verifier.expect_verify().return_once(move |_proof| {
1685 Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1686 });
1687 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1688 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1689 let mut warp_sync =
1690 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1691
1692 for best_number in 1..11 {
1694 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1695 }
1696
1697 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1699
1700 let (peer_id, request) = warp_sync.target_block_request().unwrap();
1701
1702 let response = vec![
1704 BlockData::<Block> {
1705 hash: target_block.header().hash(),
1706 header: Some(target_block.header().clone()),
1707 body: Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1708 indexed_body: None,
1709 receipt: None,
1710 message_queue: None,
1711 justification: None,
1712 justifications: None,
1713 },
1714 BlockData::<Block> {
1715 hash: extra_block.header().hash(),
1716 header: Some(extra_block.header().clone()),
1717 body: Some(extra_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1718 indexed_body: None,
1719 receipt: None,
1720 message_queue: None,
1721 justification: None,
1722 justifications: None,
1723 },
1724 ];
1725 assert!(matches!(
1727 warp_sync.on_block_response_inner(peer_id, request, response),
1728 Err(BadPeer(id, _rep)) if id == peer_id,
1729 ));
1730 }
1731
1732 #[test]
1733 fn target_block_response_with_wrong_block_drops_peer() {
1734 subsoil::tracing::try_init_simple();
1735
1736 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1737 let mut provider = MockWarpSyncProvider::<Block>::new();
1738 let mut verifier = MockVerifier::<Block>::new();
1739 let header_for_ctx = client.info().genesis_hash;
1740 verifier.expect_next_proof_context().returning(move || header_for_ctx);
1741 let target_block = BlockBuilderBuilder::new(&*client)
1742 .on_parent_block(client.chain_info().best_hash)
1743 .with_parent_block_number(client.chain_info().best_number)
1744 .build()
1745 .unwrap()
1746 .build()
1747 .unwrap()
1748 .block;
1749
1750 let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
1751 .on_parent_block(client.chain_info().best_hash)
1752 .with_parent_block_number(client.chain_info().best_number)
1753 .build()
1754 .unwrap();
1755 wrong_block_builder
1756 .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1757 .unwrap();
1758 let wrong_block = wrong_block_builder.build().unwrap().block;
1759
1760 let target_header = target_block.header().clone();
1761 let header_for_verify = target_header.clone();
1763 verifier.expect_verify().return_once(move |_proof| {
1764 Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1765 });
1766 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1767 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1768 let mut warp_sync =
1769 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1770
1771 for best_number in 1..11 {
1773 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1774 }
1775
1776 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1778
1779 let (peer_id, request) = warp_sync.target_block_request().unwrap();
1780
1781 let response = vec![BlockData::<Block> {
1783 hash: wrong_block.header().hash(),
1784 header: Some(wrong_block.header().clone()),
1785 body: Some(wrong_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1786 indexed_body: None,
1787 receipt: None,
1788 message_queue: None,
1789 justification: None,
1790 justifications: None,
1791 }];
1792 assert!(matches!(
1794 warp_sync.on_block_response_inner(peer_id, request, response),
1795 Err(BadPeer(id, _rep)) if id == peer_id,
1796 ));
1797 }
1798
1799 #[test]
1800 fn correct_target_block_response_sets_strategy_result() {
1801 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1802 let mut provider = MockWarpSyncProvider::<Block>::new();
1803 let mut verifier = MockVerifier::<Block>::new();
1804 let header_for_ctx = client.info().genesis_hash;
1805 verifier.expect_next_proof_context().returning(move || header_for_ctx);
1806 let mut target_block_builder = BlockBuilderBuilder::new(&*client)
1807 .on_parent_block(client.chain_info().best_hash)
1808 .with_parent_block_number(client.chain_info().best_number)
1809 .build()
1810 .unwrap();
1811 target_block_builder
1812 .push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1813 .unwrap();
1814 let target_block = target_block_builder.build().unwrap().block;
1815 let target_header = target_block.header().clone();
1816 let header_for_verify = target_header.clone();
1818 verifier.expect_verify().return_once(move |_proof| {
1819 Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1820 });
1821 provider.expect_create_verifier().return_once(move || Box::new(verifier));
1822 let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1823 let mut warp_sync =
1824 WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1825
1826 for best_number in 1..11 {
1828 warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1829 }
1830
1831 warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1833
1834 let (peer_id, request) = warp_sync.target_block_request().unwrap();
1835
1836 let body = Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>());
1838 let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
1839 let response = vec![BlockData::<Block> {
1840 hash: target_block.header().hash(),
1841 header: Some(target_block.header().clone()),
1842 body: body.clone(),
1843 indexed_body: None,
1844 receipt: None,
1845 message_queue: None,
1846 justification: None,
1847 justifications: justifications.clone(),
1848 }];
1849
1850 assert!(warp_sync.on_block_response_inner(peer_id, request, response).is_ok());
1851
1852 let network_provider = NetworkServiceProvider::new();
1853 let network_handle = network_provider.handle();
1854
1855 let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1857 assert_eq!(actions.len(), 1);
1858 assert!(matches!(actions[0], SyncingAction::Finished));
1859
1860 let result = warp_sync.take_result().unwrap();
1862 assert_eq!(result.target_header, *target_block.header());
1863 assert_eq!(result.target_body, body);
1864 assert_eq!(result.target_justifications, justifications);
1865 }
1866}