sc_network_sync/strategy/
warp.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Warp syncing strategy. Bootstraps chain by downloading warp proofs and state.
20
21pub use sp_consensus_grandpa::{AuthorityList, SetId};
22
23use crate::{
24	block_relay_protocol::{BlockDownloader, BlockResponseError},
25	service::network::NetworkServiceHandle,
26	strategy::{
27		chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
28		SyncingAction,
29	},
30	types::{BadPeer, SyncState, SyncStatus},
31	LOG_TARGET,
32};
33use codec::{Decode, Encode};
34use futures::{channel::oneshot, FutureExt};
35use log::{debug, error, trace, warn};
36use sc_network::{IfDisconnected, ProtocolName};
37use sc_network_common::sync::message::{
38	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
39};
40use sc_network_types::PeerId;
41use sp_blockchain::HeaderBackend;
42use sp_runtime::{
43	traits::{Block as BlockT, Header, NumberFor, Zero},
44	Justifications, SaturatedConversion,
45};
46use std::{any::Any, collections::HashMap, fmt, sync::Arc};
47
/// Number of peers that need to be connected before warp sync is started.
///
/// This is only the default: `WarpSync::new` falls back to it when the caller does not supply
/// its own threshold.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
50
/// Scale-encoded warp sync proof response.
///
/// This module never looks inside the bytes (it only measures their length); interpreting
/// them is left to [`WarpSyncProvider::verify`].
pub struct EncodedProof(pub Vec<u8>);
53
54/// Warp sync request
55#[derive(Encode, Decode, Debug, Clone)]
56pub struct WarpProofRequest<B: BlockT> {
57	/// Start collecting proofs from this block.
58	pub begin: B::Hash,
59}
60
61/// Proof verification result.
62pub enum VerificationResult<Block: BlockT> {
63	/// Proof is valid, but the target was not reached.
64	Partial(SetId, AuthorityList, Block::Hash),
65	/// Target finality is proved.
66	Complete(SetId, AuthorityList, Block::Header),
67}
68
69/// Warp sync backend. Handles retrieving and verifying warp sync proofs.
70pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
71	/// Generate proof starting at given block hash. The proof is accumulated until maximum proof
72	/// size is reached.
73	fn generate(
74		&self,
75		start: Block::Hash,
76	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
77	/// Verify warp proof against current set of authorities.
78	fn verify(
79		&self,
80		proof: &EncodedProof,
81		set_id: SetId,
82		authorities: AuthorityList,
83	) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
84	/// Get current list of authorities. This is supposed to be genesis authorities when starting
85	/// sync.
86	fn current_authorities(&self) -> AuthorityList;
87}
88
89mod rep {
90	use sc_network::ReputationChange as Rep;
91
92	/// Unexpected response received form a peer
93	pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");
94
95	/// Peer provided invalid warp proof data
96	pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");
97
98	/// Peer did not provide us with advertised block data.
99	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
100
101	/// Reputation change for peers which send us non-requested block data.
102	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
103
104	/// Reputation change for peers which send us a block which we fail to verify.
105	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
106
107	/// We received a message that failed to decode.
108	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
109}
110
111/// Reported warp sync phase.
112#[derive(Clone, Eq, PartialEq, Debug)]
113pub enum WarpSyncPhase<Block: BlockT> {
114	/// Waiting for peers to connect.
115	AwaitingPeers { required_peers: usize },
116	/// Downloading and verifying grandpa warp proofs.
117	DownloadingWarpProofs,
118	/// Downloading target block.
119	DownloadingTargetBlock,
120	/// Downloading state data.
121	DownloadingState,
122	/// Importing state.
123	ImportingState,
124	/// Downloading block history.
125	DownloadingBlocks(NumberFor<Block>),
126	/// Warp sync is complete.
127	Complete,
128}
129
130impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
131	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
132		match self {
133			Self::AwaitingPeers { required_peers } =>
134				write!(f, "Waiting for {required_peers} peers to be connected"),
135			Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
136			Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
137			Self::DownloadingState => write!(f, "Downloading state"),
138			Self::ImportingState => write!(f, "Importing state"),
139			Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
140			Self::Complete => write!(f, "Warp sync is complete"),
141		}
142	}
143}
144
145/// Reported warp sync progress.
146#[derive(Clone, Eq, PartialEq, Debug)]
147pub struct WarpSyncProgress<Block: BlockT> {
148	/// Estimated download percentage.
149	pub phase: WarpSyncPhase<Block>,
150	/// Total bytes downloaded so far.
151	pub total_bytes: u64,
152}
153
154/// Warp sync configuration as accepted by [`WarpSync`].
155pub enum WarpSyncConfig<Block: BlockT> {
156	/// Standard warp sync for the chain.
157	WithProvider(Arc<dyn WarpSyncProvider<Block>>),
158	/// Skip downloading proofs and use provided header of the state that should be downloaded.
159	///
160	/// It is expected that the header provider ensures that the header is trusted.
161	WithTarget(<Block as BlockT>::Header),
162}
163
164/// Warp sync phase used by warp sync state machine.
165enum Phase<B: BlockT> {
166	/// Waiting for enough peers to connect.
167	WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
168	/// Downloading warp proofs.
169	WarpProof {
170		set_id: SetId,
171		authorities: AuthorityList,
172		last_hash: B::Hash,
173		warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
174	},
175	/// Downloading target block.
176	TargetBlock(B::Header),
177	/// Warp sync is complete.
178	Complete,
179}
180
/// What a peer is currently being used for by warp sync.
enum PeerState {
	/// No request assigned; the peer may be scheduled.
	Available,
	/// A warp proof request is assigned to the peer.
	DownloadingProofs,
	/// A target block request is assigned to the peer.
	DownloadingTargetBlock,
}

impl PeerState {
	/// Whether the peer has no request assigned.
	fn is_available(&self) -> bool {
		match self {
			Self::Available => true,
			Self::DownloadingProofs | Self::DownloadingTargetBlock => false,
		}
	}
}
192
193struct Peer<B: BlockT> {
194	best_number: NumberFor<B>,
195	state: PeerState,
196}
197
198pub struct WarpSyncResult<B: BlockT> {
199	pub target_header: B::Header,
200	pub target_body: Option<Vec<B::Extrinsic>>,
201	pub target_justifications: Option<Justifications>,
202}
203
204/// Warp sync state machine. Accumulates warp proofs and state.
205pub struct WarpSync<B: BlockT, Client> {
206	phase: Phase<B>,
207	client: Arc<Client>,
208	total_proof_bytes: u64,
209	total_state_bytes: u64,
210	peers: HashMap<PeerId, Peer<B>>,
211	disconnected_peers: DisconnectedPeers,
212	protocol_name: Option<ProtocolName>,
213	block_downloader: Arc<dyn BlockDownloader<B>>,
214	actions: Vec<SyncingAction<B>>,
215	result: Option<WarpSyncResult<B>>,
216	/// Number of peers that need to be connected before warp sync is started.
217	min_peers_to_start_warp_sync: usize,
218}
219
220impl<B, Client> WarpSync<B, Client>
221where
222	B: BlockT,
223	Client: HeaderBackend<B> + 'static,
224{
225	/// Strategy key used by warp sync.
226	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
227
228	/// Create a new instance. When passing a warp sync provider we will be checking for proof and
229	/// authorities. Alternatively we can pass a target block when we want to skip downloading
230	/// proofs, in this case we will continue polling until the target block is known.
231	pub fn new(
232		client: Arc<Client>,
233		warp_sync_config: WarpSyncConfig<B>,
234		protocol_name: Option<ProtocolName>,
235		block_downloader: Arc<dyn BlockDownloader<B>>,
236		min_peers_to_start_warp_sync: Option<usize>,
237	) -> Self {
238		let min_peers_to_start_warp_sync =
239			min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
240		if client.info().finalized_state.is_some() {
241			error!(
242				target: LOG_TARGET,
243				"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
244			);
245			return Self {
246				client,
247				phase: Phase::Complete,
248				total_proof_bytes: 0,
249				total_state_bytes: 0,
250				peers: HashMap::new(),
251				disconnected_peers: DisconnectedPeers::new(),
252				protocol_name,
253				block_downloader,
254				actions: vec![SyncingAction::Finished],
255				result: None,
256				min_peers_to_start_warp_sync,
257			}
258		}
259
260		let phase = match warp_sync_config {
261			WarpSyncConfig::WithProvider(warp_sync_provider) =>
262				Phase::WaitingForPeers { warp_sync_provider },
263			WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
264		};
265
266		Self {
267			client,
268			phase,
269			total_proof_bytes: 0,
270			total_state_bytes: 0,
271			peers: HashMap::new(),
272			disconnected_peers: DisconnectedPeers::new(),
273			protocol_name,
274			block_downloader,
275			actions: Vec::new(),
276			result: None,
277			min_peers_to_start_warp_sync,
278		}
279	}
280
281	/// Notify that a new peer has connected.
282	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
283		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
284
285		self.try_to_start_warp_sync();
286	}
287
288	/// Notify that a peer has disconnected.
289	pub fn remove_peer(&mut self, peer_id: &PeerId) {
290		if let Some(state) = self.peers.remove(peer_id) {
291			if !state.state.is_available() {
292				if let Some(bad_peer) =
293					self.disconnected_peers.on_disconnect_during_request(*peer_id)
294				{
295					self.actions.push(SyncingAction::DropPeer(bad_peer));
296				}
297			}
298		}
299	}
300
301	/// Submit a validated block announcement.
302	///
303	/// Returns new best hash & best number of the peer if they are updated.
304	#[must_use]
305	pub fn on_validated_block_announce(
306		&mut self,
307		is_best: bool,
308		peer_id: PeerId,
309		announce: &BlockAnnounce<B::Header>,
310	) -> Option<(B::Hash, NumberFor<B>)> {
311		is_best.then(|| {
312			let best_number = *announce.header.number();
313			let best_hash = announce.header.hash();
314			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
315				peer.best_number = best_number;
316			}
317			// Let `SyncingEngine` know that we should update the peer info.
318			(best_hash, best_number)
319		})
320	}
321
322	/// Start warp sync as soon as we have enough peers.
323	fn try_to_start_warp_sync(&mut self) {
324		let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
325
326		if self.peers.len() < self.min_peers_to_start_warp_sync {
327			return
328		}
329
330		self.phase = Phase::WarpProof {
331			set_id: 0,
332			authorities: warp_sync_provider.current_authorities(),
333			last_hash: self.client.info().genesis_hash,
334			warp_sync_provider: Arc::clone(warp_sync_provider),
335		};
336		trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
337	}
338
339	pub fn on_generic_response(
340		&mut self,
341		peer_id: &PeerId,
342		protocol_name: ProtocolName,
343		response: Box<dyn Any + Send>,
344	) {
345		if &protocol_name == self.block_downloader.protocol_name() {
346			let Ok(response) = response
347				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
348			else {
349				warn!(target: LOG_TARGET, "Failed to downcast block response");
350				debug_assert!(false);
351				return;
352			};
353
354			let (request, response) = *response;
355			let blocks = match response {
356				Ok(blocks) => blocks,
357				Err(BlockResponseError::DecodeFailed(e)) => {
358					debug!(
359						target: LOG_TARGET,
360						"Failed to decode block response from peer {:?}: {:?}.",
361						peer_id,
362						e
363					);
364					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
365					return;
366				},
367				Err(BlockResponseError::ExtractionFailed(e)) => {
368					debug!(
369						target: LOG_TARGET,
370						"Failed to extract blocks from peer response {:?}: {:?}.",
371						peer_id,
372						e
373					);
374					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
375					return;
376				},
377			};
378
379			self.on_block_response(*peer_id, request, blocks);
380		} else {
381			let Ok(response) = response.downcast::<Vec<u8>>() else {
382				warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
383				debug_assert!(false);
384				return;
385			};
386
387			self.on_warp_proof_response(peer_id, EncodedProof(*response));
388		}
389	}
390
391	/// Process warp proof response.
392	pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
393		if let Some(peer) = self.peers.get_mut(peer_id) {
394			peer.state = PeerState::Available;
395		}
396
397		let Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } =
398			&mut self.phase
399		else {
400			debug!(target: LOG_TARGET, "Unexpected warp proof response");
401			self.actions
402				.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
403			return
404		};
405
406		match warp_sync_provider.verify(&response, *set_id, authorities.clone()) {
407			Err(e) => {
408				debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
409				self.actions
410					.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
411			},
412			Ok(VerificationResult::Partial(new_set_id, new_authorities, new_last_hash)) => {
413				log::debug!(target: LOG_TARGET, "Verified partial proof, set_id={:?}", new_set_id);
414				*set_id = new_set_id;
415				*authorities = new_authorities;
416				*last_hash = new_last_hash;
417				self.total_proof_bytes += response.0.len() as u64;
418			},
419			Ok(VerificationResult::Complete(new_set_id, _, header)) => {
420				log::debug!(
421					target: LOG_TARGET,
422					"Verified complete proof, set_id={:?}. Continuing with target block download: {} ({}).",
423					new_set_id,
424					header.hash(),
425					header.number(),
426				);
427				self.total_proof_bytes += response.0.len() as u64;
428				self.phase = Phase::TargetBlock(header);
429			},
430		}
431	}
432
433	/// Process (target) block response.
434	pub fn on_block_response(
435		&mut self,
436		peer_id: PeerId,
437		request: BlockRequest<B>,
438		blocks: Vec<BlockData<B>>,
439	) {
440		if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
441			self.actions.push(SyncingAction::DropPeer(bad_peer));
442		}
443	}
444
445	fn on_block_response_inner(
446		&mut self,
447		peer_id: PeerId,
448		request: BlockRequest<B>,
449		mut blocks: Vec<BlockData<B>>,
450	) -> Result<(), BadPeer> {
451		if let Some(peer) = self.peers.get_mut(&peer_id) {
452			peer.state = PeerState::Available;
453		}
454
455		let Phase::TargetBlock(header) = &mut self.phase else {
456			debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
457			return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE))
458		};
459
460		if blocks.is_empty() {
461			debug!(
462				target: LOG_TARGET,
463				"Downloading target block failed: empty block response from {peer_id}",
464			);
465			return Err(BadPeer(peer_id, rep::NO_BLOCK))
466		}
467
468		if blocks.len() > 1 {
469			debug!(
470				target: LOG_TARGET,
471				"Too many blocks ({}) in warp target block response from {peer_id}",
472				blocks.len(),
473			);
474			return Err(BadPeer(peer_id, rep::NOT_REQUESTED))
475		}
476
477		validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
478
479		let block = blocks.pop().expect("`blocks` len checked above; qed");
480
481		let Some(block_header) = &block.header else {
482			debug!(
483				target: LOG_TARGET,
484				"Downloading target block failed: missing header in response from {peer_id}.",
485			);
486			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
487		};
488
489		if block_header != header {
490			debug!(
491				target: LOG_TARGET,
492				"Downloading target block failed: different header in response from {peer_id}.",
493			);
494			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
495		}
496
497		if block.body.is_none() {
498			debug!(
499				target: LOG_TARGET,
500				"Downloading target block failed: missing body in response from {peer_id}.",
501			);
502			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL))
503		}
504
505		self.result = Some(WarpSyncResult {
506			target_header: header.clone(),
507			target_body: block.body,
508			target_justifications: block.justifications,
509		});
510		self.phase = Phase::Complete;
511		self.actions.push(SyncingAction::Finished);
512		Ok(())
513	}
514
515	/// Reserve a peer for a request assigning `new_state`.
516	fn schedule_next_peer(
517		&mut self,
518		new_state: PeerState,
519		min_best_number: Option<NumberFor<B>>,
520	) -> Option<PeerId> {
521		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
522		if targets.is_empty() {
523			return None
524		}
525		targets.sort();
526		let median = targets[targets.len() / 2];
527		let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
528		// Find a random peer that is synced as much as peer majority and is above
529		// `min_best_number`.
530		for (peer_id, peer) in self.peers.iter_mut() {
531			if peer.state.is_available() &&
532				peer.best_number >= threshold &&
533				self.disconnected_peers.is_peer_available(peer_id)
534			{
535				peer.state = new_state;
536				return Some(*peer_id)
537			}
538		}
539		None
540	}
541
542	/// Produce warp proof request.
543	fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
544		let Phase::WarpProof { last_hash, .. } = &self.phase else { return None };
545
546		// Copy `last_hash` early to cut the borrowing tie.
547		let begin = *last_hash;
548
549		if self
550			.peers
551			.values()
552			.any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
553		{
554			// Only one warp proof request at a time is possible.
555			return None
556		}
557
558		let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
559		trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
560
561		let request = WarpProofRequest { begin };
562
563		let Some(protocol_name) = self.protocol_name.clone() else {
564			warn!(
565				target: LOG_TARGET,
566				"Trying to send warp sync request when no protocol is configured {request:?}",
567			);
568			return None;
569		};
570
571		Some((peer_id, protocol_name, request))
572	}
573
574	/// Produce target block request.
575	fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
576		let Phase::TargetBlock(target_header) = &self.phase else { return None };
577
578		if self
579			.peers
580			.values()
581			.any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
582		{
583			// Only one target block request at a time is possible.
584			return None
585		}
586
587		// Cut the borrowing tie.
588		let target_hash = target_header.hash();
589		let target_number = *target_header.number();
590
591		let peer_id =
592			self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
593
594		trace!(
595			target: LOG_TARGET,
596			"New target block request to {peer_id}, target: {} ({}).",
597			target_hash,
598			target_number,
599		);
600
601		Some((
602			peer_id,
603			BlockRequest::<B> {
604				id: 0,
605				fields: BlockAttributes::HEADER |
606					BlockAttributes::BODY |
607					BlockAttributes::JUSTIFICATION,
608				from: FromBlock::Hash(target_hash),
609				direction: Direction::Ascending,
610				max: Some(1),
611			},
612		))
613	}
614
615	/// Returns warp sync estimated progress (stage, bytes received).
616	pub fn progress(&self) -> WarpSyncProgress<B> {
617		match &self.phase {
618			Phase::WaitingForPeers { .. } => WarpSyncProgress {
619				phase: WarpSyncPhase::AwaitingPeers {
620					required_peers: self.min_peers_to_start_warp_sync,
621				},
622				total_bytes: self.total_proof_bytes,
623			},
624			Phase::WarpProof { .. } => WarpSyncProgress {
625				phase: WarpSyncPhase::DownloadingWarpProofs,
626				total_bytes: self.total_proof_bytes,
627			},
628			Phase::TargetBlock(_) => WarpSyncProgress {
629				phase: WarpSyncPhase::DownloadingTargetBlock,
630				total_bytes: self.total_proof_bytes,
631			},
632			Phase::Complete => WarpSyncProgress {
633				phase: WarpSyncPhase::Complete,
634				total_bytes: self.total_proof_bytes + self.total_state_bytes,
635			},
636		}
637	}
638
	/// Get the number of peers known to warp sync.
	///
	/// Counts every tracked peer, whether or not a request is currently assigned to it.
	pub fn num_peers(&self) -> usize {
		self.peers.len()
	}
643
644	/// Returns the current sync status.
645	pub fn status(&self) -> SyncStatus<B> {
646		SyncStatus {
647			state: match &self.phase {
648				Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
649				Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
650				Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
651				Phase::Complete => SyncState::Idle,
652			},
653			best_seen_block: match &self.phase {
654				Phase::WaitingForPeers { .. } => None,
655				Phase::WarpProof { .. } => None,
656				Phase::TargetBlock(header) => Some(*header.number()),
657				Phase::Complete => None,
658			},
659			num_peers: self.peers.len().saturated_into(),
660			queued_blocks: 0,
661			state_sync: None,
662			warp_sync: Some(self.progress()),
663		}
664	}
665
	/// Get actions that should be performed by the owner on [`WarpSync`]'s behalf
	///
	/// Besides the actions queued so far, this tries to produce at most one warp proof
	/// request and at most one target block request, each appended as a
	/// [`SyncingAction::StartRequest`]. The internal queue is drained: the returned iterator
	/// owns the actions and the queue is empty afterwards.
	#[must_use]
	pub fn actions(
		&mut self,
		network_service: &NetworkServiceHandle,
	) -> impl Iterator<Item = SyncingAction<B>> {
		let warp_proof_request =
			self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
				trace!(
					target: LOG_TARGET,
					"Created `WarpProofRequest` to {}, request: {:?}.",
					peer_id,
					request,
				);

				let (tx, rx) = oneshot::channel();

				// The request is handed to the network service right here; the action below
				// only carries the future that resolves with the response.
				network_service.start_request(
					peer_id,
					protocol_name,
					request.encode(),
					tx,
					IfDisconnected::ImmediateError,
				);

				SyncingAction::StartRequest {
					peer_id,
					key: Self::STRATEGY_KEY,
					request: async move {
						// Type-erase the response so it can travel through the generic
						// response path.
						Ok(rx.await?.and_then(|(response, protocol_name)| {
							Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
						}))
					}
					.boxed(),
					remove_obsolete: false,
				}
			});
		self.actions.extend(warp_proof_request);

		let target_block_request =
			self.target_block_request().into_iter().map(|(peer_id, request)| {
				let downloader = self.block_downloader.clone();

				SyncingAction::StartRequest {
					peer_id,
					key: Self::STRATEGY_KEY,
					request: async move {
						// The response is paired with its request so that the handler can
						// validate one against the other.
						Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
							|(response, protocol_name)| {
								let decoded_response =
									downloader.block_response_into_blocks(&request, response);
								let result =
									Box::new((request, decoded_response)) as Box<dyn Any + Send>;
								Ok((result, protocol_name))
							},
						))
					}
					.boxed(),
					// Sending block request implies dropping obsolete pending response as we are
					// not interested in it anymore.
					remove_obsolete: true,
				}
			});
		self.actions.extend(target_block_request);

		std::mem::take(&mut self.actions).into_iter()
	}
733
734	/// Take the result of finished warp sync, returning `None` if the sync was unsuccessful.
735	#[must_use]
736	pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
737		self.result.take()
738	}
739}
740
741#[cfg(test)]
742mod test {
743	use super::*;
744	use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
745	use sc_block_builder::BlockBuilderBuilder;
746	use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
747	use sp_consensus_grandpa::{AuthorityList, SetId};
748	use sp_core::H256;
749	use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
750	use std::{io::ErrorKind, sync::Arc};
751	use substrate_test_runtime_client::{
752		runtime::{Block, Hash},
753		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
754	};
755
	// Mock of the client used by the tests below. It implements only `HeaderBackend`, which
	// is the only bound `WarpSync` places on its client.
	mockall::mock! {
		pub Client<B: BlockT> {}

		impl<B: BlockT> HeaderBackend<B> for Client<B> {
			fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
			fn info(&self) -> Info<B>;
			fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
			fn number(
				&self,
				hash: B::Hash,
			) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
			fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
		}
	}
770
	// Mock of the warp sync backend, letting tests script proof verification results and the
	// authority list.
	mockall::mock! {
		pub WarpSyncProvider<B: BlockT> {}

		impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
			fn generate(
				&self,
				start: B::Hash,
			) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
			fn verify(
				&self,
				proof: &EncodedProof,
				set_id: SetId,
				authorities: AuthorityList,
			) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
			fn current_authorities(&self) -> AuthorityList;
		}
	}
788
789	fn mock_client_with_state() -> MockClient<Block> {
790		let mut client = MockClient::<Block>::new();
791		let genesis_hash = Hash::random();
792		client.expect_info().return_once(move || Info {
793			best_hash: genesis_hash,
794			best_number: 0,
795			genesis_hash,
796			finalized_hash: genesis_hash,
797			finalized_number: 0,
798			// We need some finalized state to render warp sync impossible.
799			finalized_state: Some((genesis_hash, 0)),
800			number_leaves: 0,
801			block_gap: None,
802		});
803
804		client
805	}
806
807	fn mock_client_without_state() -> MockClient<Block> {
808		let mut client = MockClient::<Block>::new();
809		let genesis_hash = Hash::random();
810		client.expect_info().returning(move || Info {
811			best_hash: genesis_hash,
812			best_number: 0,
813			genesis_hash,
814			finalized_hash: genesis_hash,
815			finalized_number: 0,
816			finalized_state: None,
817			number_leaves: 0,
818			block_gap: None,
819		});
820
821		client
822	}
823
824	#[test]
825	fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
826		let client = mock_client_with_state();
827		let provider = MockWarpSyncProvider::<Block>::new();
828		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
829		let mut warp_sync = WarpSync::new(
830			Arc::new(client),
831			config,
832			None,
833			Arc::new(MockBlockDownloader::new()),
834			None,
835		);
836
837		let network_provider = NetworkServiceProvider::new();
838		let network_handle = network_provider.handle();
839
840		// Warp sync instantly finishes
841		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
842		assert_eq!(actions.len(), 1);
843		assert!(matches!(actions[0], SyncingAction::Finished));
844
845		// ... with no result.
846		assert!(warp_sync.take_result().is_none());
847	}
848
849	#[test]
850	fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
851		let client = mock_client_with_state();
852		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
853			1,
854			Default::default(),
855			Default::default(),
856			Default::default(),
857			Default::default(),
858		));
859		let mut warp_sync = WarpSync::new(
860			Arc::new(client),
861			config,
862			None,
863			Arc::new(MockBlockDownloader::new()),
864			None,
865		);
866
867		let network_provider = NetworkServiceProvider::new();
868		let network_handle = network_provider.handle();
869
870		// Warp sync instantly finishes
871		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
872		assert_eq!(actions.len(), 1);
873		assert!(matches!(actions[0], SyncingAction::Finished));
874
875		// ... with no result.
876		assert!(warp_sync.take_result().is_none());
877	}
878
879	#[test]
880	fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
881		let client = mock_client_without_state();
882		let provider = MockWarpSyncProvider::<Block>::new();
883		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
884		let mut warp_sync = WarpSync::new(
885			Arc::new(client),
886			config,
887			None,
888			Arc::new(MockBlockDownloader::new()),
889			None,
890		);
891
892		let network_provider = NetworkServiceProvider::new();
893		let network_handle = network_provider.handle();
894
895		// No actions are emitted.
896		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
897	}
898
899	#[test]
900	fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
901		let client = mock_client_without_state();
902		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
903			1,
904			Default::default(),
905			Default::default(),
906			Default::default(),
907			Default::default(),
908		));
909		let mut warp_sync = WarpSync::new(
910			Arc::new(client),
911			config,
912			None,
913			Arc::new(MockBlockDownloader::new()),
914			None,
915		);
916
917		let network_provider = NetworkServiceProvider::new();
918		let network_handle = network_provider.handle();
919
920		// No actions are emitted.
921		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
922	}
923
924	#[test]
925	fn warp_sync_is_started_only_when_there_is_enough_peers() {
926		let client = mock_client_without_state();
927		let mut provider = MockWarpSyncProvider::<Block>::new();
928		provider
929			.expect_current_authorities()
930			.once()
931			.return_const(AuthorityList::default());
932		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
933		let mut warp_sync = WarpSync::new(
934			Arc::new(client),
935			config,
936			None,
937			Arc::new(MockBlockDownloader::new()),
938			None,
939		);
940
941		// Warp sync is not started when there is not enough peers.
942		for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
943			warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
944			assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
945		}
946
947		// Now we have enough peers and warp sync is started.
948		warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
949		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
950	}
951
952	#[test]
953	fn no_peer_is_scheduled_if_no_peers_connected() {
954		let client = mock_client_without_state();
955		let provider = MockWarpSyncProvider::<Block>::new();
956		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
957		let mut warp_sync = WarpSync::new(
958			Arc::new(client),
959			config,
960			None,
961			Arc::new(MockBlockDownloader::new()),
962			None,
963		);
964
965		assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
966	}
967
968	#[test]
969	fn enough_peers_are_used_in_tests() {
970		// Tests below use 10 peers. Fail early if it's less than a threshold for warp sync.
971		assert!(
972			10 >= MIN_PEERS_TO_START_WARP_SYNC,
973			"Tests must be updated to use that many initial peers.",
974		);
975	}
976
977	#[test]
978	fn at_least_median_synced_peer_is_scheduled() {
979		for _ in 0..100 {
980			let client = mock_client_without_state();
981			let mut provider = MockWarpSyncProvider::<Block>::new();
982			provider
983				.expect_current_authorities()
984				.once()
985				.return_const(AuthorityList::default());
986			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
987			let mut warp_sync = WarpSync::new(
988				Arc::new(client),
989				config,
990				None,
991				Arc::new(MockBlockDownloader::new()),
992				None,
993			);
994
995			for best_number in 1..11 {
996				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
997			}
998
999			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1000			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1001		}
1002	}
1003
1004	#[test]
1005	fn min_best_number_peer_is_scheduled() {
1006		for _ in 0..10 {
1007			let client = mock_client_without_state();
1008			let mut provider = MockWarpSyncProvider::<Block>::new();
1009			provider
1010				.expect_current_authorities()
1011				.once()
1012				.return_const(AuthorityList::default());
1013			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1014			let mut warp_sync = WarpSync::new(
1015				Arc::new(client),
1016				config,
1017				None,
1018				Arc::new(MockBlockDownloader::new()),
1019				None,
1020			);
1021
1022			for best_number in 1..11 {
1023				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1024			}
1025
1026			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1027			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1028		}
1029	}
1030
1031	#[test]
1032	fn backedoff_number_peer_is_not_scheduled() {
1033		let client = mock_client_without_state();
1034		let mut provider = MockWarpSyncProvider::<Block>::new();
1035		provider
1036			.expect_current_authorities()
1037			.once()
1038			.return_const(AuthorityList::default());
1039		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1040		let mut warp_sync = WarpSync::new(
1041			Arc::new(client),
1042			config,
1043			None,
1044			Arc::new(MockBlockDownloader::new()),
1045			None,
1046		);
1047
1048		for best_number in 1..11 {
1049			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1050		}
1051
1052		let ninth_peer =
1053			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1054		let tenth_peer =
1055			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1056
1057		// Disconnecting a peer without an inflight request has no effect on persistent states.
1058		warp_sync.remove_peer(&tenth_peer);
1059		assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1060
1061		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1062		let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1063		assert_eq!(tenth_peer, peer_id.unwrap());
1064		warp_sync.remove_peer(&tenth_peer);
1065
1066		// Peer is backed off.
1067		assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1068
1069		// No peer available for 10'th best block because of the backoff.
1070		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1071		let peer_id: Option<PeerId> =
1072			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1073		assert!(peer_id.is_none());
1074
1075		// Other requests can still happen.
1076		let peer_id: Option<PeerId> =
1077			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1078		assert_eq!(ninth_peer, peer_id.unwrap());
1079	}
1080
1081	#[test]
1082	fn no_warp_proof_request_in_another_phase() {
1083		let client = mock_client_without_state();
1084		let mut provider = MockWarpSyncProvider::<Block>::new();
1085		provider
1086			.expect_current_authorities()
1087			.once()
1088			.return_const(AuthorityList::default());
1089		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1090		let mut warp_sync = WarpSync::new(
1091			Arc::new(client),
1092			config,
1093			Some(ProtocolName::Static("")),
1094			Arc::new(MockBlockDownloader::new()),
1095			None,
1096		);
1097
1098		// Make sure we have enough peers to make a request.
1099		for best_number in 1..11 {
1100			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1101		}
1102
1103		// Manually set to another phase.
1104		warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1105			1,
1106			Default::default(),
1107			Default::default(),
1108			Default::default(),
1109			Default::default(),
1110		));
1111
1112		// No request is made.
1113		assert!(warp_sync.warp_proof_request().is_none());
1114	}
1115
1116	#[test]
1117	fn warp_proof_request_starts_at_last_hash() {
1118		let client = mock_client_without_state();
1119		let mut provider = MockWarpSyncProvider::<Block>::new();
1120		provider
1121			.expect_current_authorities()
1122			.once()
1123			.return_const(AuthorityList::default());
1124		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1125		let mut warp_sync = WarpSync::new(
1126			Arc::new(client),
1127			config,
1128			Some(ProtocolName::Static("")),
1129			Arc::new(MockBlockDownloader::new()),
1130			None,
1131		);
1132
1133		// Make sure we have enough peers to make a request.
1134		for best_number in 1..11 {
1135			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1136		}
1137		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1138
1139		let known_last_hash = Hash::random();
1140
1141		// Manually set last hash to known value.
1142		match &mut warp_sync.phase {
1143			Phase::WarpProof { last_hash, .. } => {
1144				*last_hash = known_last_hash;
1145			},
1146			_ => panic!("Invalid phase."),
1147		}
1148
1149		let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
1150		assert_eq!(request.begin, known_last_hash);
1151	}
1152
1153	#[test]
1154	fn no_parallel_warp_proof_requests() {
1155		let client = mock_client_without_state();
1156		let mut provider = MockWarpSyncProvider::<Block>::new();
1157		provider
1158			.expect_current_authorities()
1159			.once()
1160			.return_const(AuthorityList::default());
1161		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1162		let mut warp_sync = WarpSync::new(
1163			Arc::new(client),
1164			config,
1165			Some(ProtocolName::Static("")),
1166			Arc::new(MockBlockDownloader::new()),
1167			None,
1168		);
1169
1170		// Make sure we have enough peers to make requests.
1171		for best_number in 1..11 {
1172			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1173		}
1174		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1175
1176		// First request is made.
1177		assert!(warp_sync.warp_proof_request().is_some());
1178		// Second request is not made.
1179		assert!(warp_sync.warp_proof_request().is_none());
1180	}
1181
1182	#[test]
1183	fn bad_warp_proof_response_drops_peer() {
1184		let client = mock_client_without_state();
1185		let mut provider = MockWarpSyncProvider::<Block>::new();
1186		provider
1187			.expect_current_authorities()
1188			.once()
1189			.return_const(AuthorityList::default());
1190		// Warp proof verification fails.
1191		provider.expect_verify().return_once(|_proof, _set_id, _authorities| {
1192			Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
1193		});
1194		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1195		let mut warp_sync = WarpSync::new(
1196			Arc::new(client),
1197			config,
1198			Some(ProtocolName::Static("")),
1199			Arc::new(MockBlockDownloader::new()),
1200			None,
1201		);
1202
1203		// Make sure we have enough peers to make a request.
1204		for best_number in 1..11 {
1205			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1206		}
1207		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1208
1209		let network_provider = NetworkServiceProvider::new();
1210		let network_handle = network_provider.handle();
1211
1212		// Consume `SendWarpProofRequest` action.
1213		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1214		assert_eq!(actions.len(), 1);
1215		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1216			panic!("Invalid action");
1217		};
1218
1219		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1220
1221		// We only interested in already generated actions, not new requests.
1222		let actions = std::mem::take(&mut warp_sync.actions);
1223		assert_eq!(actions.len(), 1);
1224		assert!(matches!(
1225			actions[0],
1226			SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
1227		));
1228		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1229	}
1230
1231	#[test]
1232	fn partial_warp_proof_doesnt_advance_phase() {
1233		let client = mock_client_without_state();
1234		let mut provider = MockWarpSyncProvider::<Block>::new();
1235		provider
1236			.expect_current_authorities()
1237			.once()
1238			.return_const(AuthorityList::default());
1239		// Warp proof is partial.
1240		provider.expect_verify().return_once(|_proof, set_id, authorities| {
1241			Ok(VerificationResult::Partial(set_id, authorities, Hash::random()))
1242		});
1243		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1244		let mut warp_sync = WarpSync::new(
1245			Arc::new(client),
1246			config,
1247			Some(ProtocolName::Static("")),
1248			Arc::new(MockBlockDownloader::new()),
1249			None,
1250		);
1251
1252		// Make sure we have enough peers to make a request.
1253		for best_number in 1..11 {
1254			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1255		}
1256		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1257
1258		let network_provider = NetworkServiceProvider::new();
1259		let network_handle = network_provider.handle();
1260
1261		// Consume `SendWarpProofRequest` action.
1262		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1263		assert_eq!(actions.len(), 1);
1264		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1265			panic!("Invalid action");
1266		};
1267
1268		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1269
1270		assert!(warp_sync.actions.is_empty(), "No extra actions generated");
1271		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1272	}
1273
1274	#[test]
1275	fn complete_warp_proof_advances_phase() {
1276		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1277		let mut provider = MockWarpSyncProvider::<Block>::new();
1278		provider
1279			.expect_current_authorities()
1280			.once()
1281			.return_const(AuthorityList::default());
1282		let target_block = BlockBuilderBuilder::new(&*client)
1283			.on_parent_block(client.chain_info().best_hash)
1284			.with_parent_block_number(client.chain_info().best_number)
1285			.build()
1286			.unwrap()
1287			.build()
1288			.unwrap()
1289			.block;
1290		let target_header = target_block.header().clone();
1291		// Warp proof is complete.
1292		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1293			Ok(VerificationResult::Complete(set_id, authorities, target_header))
1294		});
1295		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1296		let mut warp_sync = WarpSync::new(
1297			client,
1298			config,
1299			Some(ProtocolName::Static("")),
1300			Arc::new(MockBlockDownloader::new()),
1301			None,
1302		);
1303
1304		// Make sure we have enough peers to make a request.
1305		for best_number in 1..11 {
1306			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1307		}
1308		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1309
1310		let network_provider = NetworkServiceProvider::new();
1311		let network_handle = network_provider.handle();
1312
1313		// Consume `SendWarpProofRequest` action.
1314		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1315		assert_eq!(actions.len(), 1);
1316		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1317			panic!("Invalid action.");
1318		};
1319
1320		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1321
1322		assert!(warp_sync.actions.is_empty(), "No extra actions generated.");
1323		assert!(
1324			matches!(warp_sync.phase, Phase::TargetBlock(header) if header == *target_block.header())
1325		);
1326	}
1327
1328	#[test]
1329	fn no_target_block_requests_in_another_phase() {
1330		let client = mock_client_without_state();
1331		let mut provider = MockWarpSyncProvider::<Block>::new();
1332		provider
1333			.expect_current_authorities()
1334			.once()
1335			.return_const(AuthorityList::default());
1336		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1337		let mut warp_sync = WarpSync::new(
1338			Arc::new(client),
1339			config,
1340			None,
1341			Arc::new(MockBlockDownloader::new()),
1342			None,
1343		);
1344
1345		// Make sure we have enough peers to make a request.
1346		for best_number in 1..11 {
1347			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1348		}
1349		// We are not in `Phase::TargetBlock`
1350		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1351
1352		// No request is made.
1353		assert!(warp_sync.target_block_request().is_none());
1354	}
1355
1356	#[test]
1357	fn target_block_request_is_correct() {
1358		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1359		let mut provider = MockWarpSyncProvider::<Block>::new();
1360		provider
1361			.expect_current_authorities()
1362			.once()
1363			.return_const(AuthorityList::default());
1364		let target_block = BlockBuilderBuilder::new(&*client)
1365			.on_parent_block(client.chain_info().best_hash)
1366			.with_parent_block_number(client.chain_info().best_number)
1367			.build()
1368			.unwrap()
1369			.build()
1370			.unwrap()
1371			.block;
1372		let target_header = target_block.header().clone();
1373		// Warp proof is complete.
1374		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1375			Ok(VerificationResult::Complete(set_id, authorities, target_header))
1376		});
1377		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1378		let mut warp_sync =
1379			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1380
1381		// Make sure we have enough peers to make a request.
1382		for best_number in 1..11 {
1383			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1384		}
1385
1386		// Manually set `TargetBlock` phase.
1387		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1388
1389		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1390		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1391		assert_eq!(
1392			request.fields,
1393			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1394		);
1395		assert_eq!(request.max, Some(1));
1396	}
1397
1398	#[test]
1399	fn externally_set_target_block_is_requested() {
1400		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1401		let target_block = BlockBuilderBuilder::new(&*client)
1402			.on_parent_block(client.chain_info().best_hash)
1403			.with_parent_block_number(client.chain_info().best_number)
1404			.build()
1405			.unwrap()
1406			.build()
1407			.unwrap()
1408			.block;
1409		let target_header = target_block.header().clone();
1410		let config = WarpSyncConfig::WithTarget(target_header);
1411		let mut warp_sync =
1412			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1413
1414		// Make sure we have enough peers to make a request.
1415		for best_number in 1..11 {
1416			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1417		}
1418
1419		assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));
1420
1421		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1422		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1423		assert_eq!(
1424			request.fields,
1425			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1426		);
1427		assert_eq!(request.max, Some(1));
1428	}
1429
1430	#[test]
1431	fn no_parallel_target_block_requests() {
1432		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1433		let mut provider = MockWarpSyncProvider::<Block>::new();
1434		provider
1435			.expect_current_authorities()
1436			.once()
1437			.return_const(AuthorityList::default());
1438		let target_block = BlockBuilderBuilder::new(&*client)
1439			.on_parent_block(client.chain_info().best_hash)
1440			.with_parent_block_number(client.chain_info().best_number)
1441			.build()
1442			.unwrap()
1443			.build()
1444			.unwrap()
1445			.block;
1446		let target_header = target_block.header().clone();
1447		// Warp proof is complete.
1448		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1449			Ok(VerificationResult::Complete(set_id, authorities, target_header))
1450		});
1451		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1452		let mut warp_sync =
1453			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1454
1455		// Make sure we have enough peers to make a request.
1456		for best_number in 1..11 {
1457			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1458		}
1459
1460		// Manually set `TargetBlock` phase.
1461		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1462
1463		// First target block request is made.
1464		assert!(warp_sync.target_block_request().is_some());
1465		// No parallel request is made.
1466		assert!(warp_sync.target_block_request().is_none());
1467	}
1468
1469	#[test]
1470	fn target_block_response_with_no_blocks_drops_peer() {
1471		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1472		let mut provider = MockWarpSyncProvider::<Block>::new();
1473		provider
1474			.expect_current_authorities()
1475			.once()
1476			.return_const(AuthorityList::default());
1477		let target_block = BlockBuilderBuilder::new(&*client)
1478			.on_parent_block(client.chain_info().best_hash)
1479			.with_parent_block_number(client.chain_info().best_number)
1480			.build()
1481			.unwrap()
1482			.build()
1483			.unwrap()
1484			.block;
1485		let target_header = target_block.header().clone();
1486		// Warp proof is complete.
1487		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1488			Ok(VerificationResult::Complete(set_id, authorities, target_header))
1489		});
1490		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1491		let mut warp_sync =
1492			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1493
1494		// Make sure we have enough peers to make a request.
1495		for best_number in 1..11 {
1496			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1497		}
1498
1499		// Manually set `TargetBlock` phase.
1500		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1501
1502		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1503
1504		// Empty block response received.
1505		let response = Vec::new();
1506		// Peer is dropped.
1507		assert!(matches!(
1508			warp_sync.on_block_response_inner(peer_id, request, response),
1509			Err(BadPeer(id, _rep)) if id == peer_id,
1510		));
1511	}
1512
1513	#[test]
1514	fn target_block_response_with_extra_blocks_drops_peer() {
1515		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1516		let mut provider = MockWarpSyncProvider::<Block>::new();
1517		provider
1518			.expect_current_authorities()
1519			.once()
1520			.return_const(AuthorityList::default());
1521		let target_block = BlockBuilderBuilder::new(&*client)
1522			.on_parent_block(client.chain_info().best_hash)
1523			.with_parent_block_number(client.chain_info().best_number)
1524			.build()
1525			.unwrap()
1526			.build()
1527			.unwrap()
1528			.block;
1529
1530		let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
1531			.on_parent_block(client.chain_info().best_hash)
1532			.with_parent_block_number(client.chain_info().best_number)
1533			.build()
1534			.unwrap();
1535		extra_block_builder
1536			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1537			.unwrap();
1538		let extra_block = extra_block_builder.build().unwrap().block;
1539
1540		let target_header = target_block.header().clone();
1541		// Warp proof is complete.
1542		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1543			Ok(VerificationResult::Complete(set_id, authorities, target_header))
1544		});
1545		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1546		let mut warp_sync =
1547			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1548
1549		// Make sure we have enough peers to make a request.
1550		for best_number in 1..11 {
1551			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1552		}
1553
1554		// Manually set `TargetBlock` phase.
1555		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1556
1557		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1558
1559		// Block response with extra blocks received.
1560		let response = vec![
1561			BlockData::<Block> {
1562				hash: target_block.header().hash(),
1563				header: Some(target_block.header().clone()),
1564				body: Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1565				indexed_body: None,
1566				receipt: None,
1567				message_queue: None,
1568				justification: None,
1569				justifications: None,
1570			},
1571			BlockData::<Block> {
1572				hash: extra_block.header().hash(),
1573				header: Some(extra_block.header().clone()),
1574				body: Some(extra_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1575				indexed_body: None,
1576				receipt: None,
1577				message_queue: None,
1578				justification: None,
1579				justifications: None,
1580			},
1581		];
1582		// Peer is dropped.
1583		assert!(matches!(
1584			warp_sync.on_block_response_inner(peer_id, request, response),
1585			Err(BadPeer(id, _rep)) if id == peer_id,
1586		));
1587	}
1588
1589	#[test]
1590	fn target_block_response_with_wrong_block_drops_peer() {
1591		sp_tracing::try_init_simple();
1592
1593		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1594		let mut provider = MockWarpSyncProvider::<Block>::new();
1595		provider
1596			.expect_current_authorities()
1597			.once()
1598			.return_const(AuthorityList::default());
1599		let target_block = BlockBuilderBuilder::new(&*client)
1600			.on_parent_block(client.chain_info().best_hash)
1601			.with_parent_block_number(client.chain_info().best_number)
1602			.build()
1603			.unwrap()
1604			.build()
1605			.unwrap()
1606			.block;
1607
1608		let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
1609			.on_parent_block(client.chain_info().best_hash)
1610			.with_parent_block_number(client.chain_info().best_number)
1611			.build()
1612			.unwrap();
1613		wrong_block_builder
1614			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1615			.unwrap();
1616		let wrong_block = wrong_block_builder.build().unwrap().block;
1617
1618		let target_header = target_block.header().clone();
1619		// Warp proof is complete.
1620		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1621			Ok(VerificationResult::Complete(set_id, authorities, target_header))
1622		});
1623		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1624		let mut warp_sync =
1625			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1626
1627		// Make sure we have enough peers to make a request.
1628		for best_number in 1..11 {
1629			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1630		}
1631
1632		// Manually set `TargetBlock` phase.
1633		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1634
1635		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1636
1637		// Wrong block received.
1638		let response = vec![BlockData::<Block> {
1639			hash: wrong_block.header().hash(),
1640			header: Some(wrong_block.header().clone()),
1641			body: Some(wrong_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1642			indexed_body: None,
1643			receipt: None,
1644			message_queue: None,
1645			justification: None,
1646			justifications: None,
1647		}];
1648		// Peer is dropped.
1649		assert!(matches!(
1650			warp_sync.on_block_response_inner(peer_id, request, response),
1651			Err(BadPeer(id, _rep)) if id == peer_id,
1652		));
1653	}
1654
1655	#[test]
1656	fn correct_target_block_response_sets_strategy_result() {
1657		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1658		let mut provider = MockWarpSyncProvider::<Block>::new();
1659		provider
1660			.expect_current_authorities()
1661			.once()
1662			.return_const(AuthorityList::default());
1663		let mut target_block_builder = BlockBuilderBuilder::new(&*client)
1664			.on_parent_block(client.chain_info().best_hash)
1665			.with_parent_block_number(client.chain_info().best_number)
1666			.build()
1667			.unwrap();
1668		target_block_builder
1669			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1670			.unwrap();
1671		let target_block = target_block_builder.build().unwrap().block;
1672		let target_header = target_block.header().clone();
1673		// Warp proof is complete.
1674		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1675			Ok(VerificationResult::Complete(set_id, authorities, target_header))
1676		});
1677		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1678		let mut warp_sync =
1679			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1680
1681		// Make sure we have enough peers to make a request.
1682		for best_number in 1..11 {
1683			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1684		}
1685
1686		// Manually set `TargetBlock` phase.
1687		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1688
1689		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1690
1691		// Correct block received.
1692		let body = Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>());
1693		let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
1694		let response = vec![BlockData::<Block> {
1695			hash: target_block.header().hash(),
1696			header: Some(target_block.header().clone()),
1697			body: body.clone(),
1698			indexed_body: None,
1699			receipt: None,
1700			message_queue: None,
1701			justification: None,
1702			justifications: justifications.clone(),
1703		}];
1704
1705		assert!(warp_sync.on_block_response_inner(peer_id, request, response).is_ok());
1706
1707		let network_provider = NetworkServiceProvider::new();
1708		let network_handle = network_provider.handle();
1709
1710		// Strategy finishes.
1711		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1712		assert_eq!(actions.len(), 1);
1713		assert!(matches!(actions[0], SyncingAction::Finished));
1714
1715		// With correct result.
1716		let result = warp_sync.take_result().unwrap();
1717		assert_eq!(result.target_header, *target_block.header());
1718		assert_eq!(result.target_body, body);
1719		assert_eq!(result.target_justifications, justifications);
1720	}
1721}