Skip to main content

sc_network_sync/strategy/
warp.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Warp syncing strategy. Bootstraps chain by downloading warp proofs and state.
20
21use sc_consensus::IncomingBlock;
22use sp_consensus::BlockOrigin;
23
24use crate::{
25	block_relay_protocol::{BlockDownloader, BlockResponseError},
26	service::network::NetworkServiceHandle,
27	strategy::{
28		chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
29		SyncingAction,
30	},
31	types::{BadPeer, SyncState, SyncStatus},
32	LOG_TARGET,
33};
34use codec::{Decode, Encode};
35use futures::{channel::oneshot, FutureExt};
36use log::{debug, error, trace, warn};
37use sc_network::{IfDisconnected, ProtocolName};
38use sc_network_common::sync::message::{
39	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
40};
41use sc_network_types::PeerId;
42use sp_blockchain::HeaderBackend;
43use sp_runtime::{
44	traits::{Block as BlockT, Header, NumberFor, Zero},
45	Justifications, SaturatedConversion,
46};
47use std::{any::Any, collections::HashMap, fmt, sync::Arc};
48
49/// Number of peers that need to be connected before warp sync is started.
50const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
51
/// Scale-encoded warp sync proof response.
///
/// Newtype around the raw proof bytes. The inner buffer is public; this module reads its length
/// to account for downloaded proof bytes.
pub struct EncodedProof(pub Vec<u8>);
54
/// Warp sync request
///
/// Derives `Encode`/`Decode`; this module sends it to a peer as `request.encode()`.
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
	/// Start collecting proofs from this block.
	pub begin: B::Hash,
}
61
/// Verifier for warp sync proofs. Each verifier operates in a specific context.
///
/// A verifier is created by [`WarpSyncProvider::create_verifier`] once warp sync starts and is
/// then fed every proof response received while proofs are being downloaded.
pub trait Verifier<Block: BlockT>: Send + Sync {
	/// Verify a warp sync proof.
	///
	/// Returns [`VerificationResult::Partial`] when more proofs are needed and
	/// [`VerificationResult::Complete`] once the target is proven. An `Err` makes warp sync drop
	/// the peer that supplied the proof.
	fn verify(
		&mut self,
		proof: &EncodedProof,
	) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
	/// Hash to be used as the starting point for the next proof request.
	fn next_proof_context(&self) -> Block::Hash;
	/// Get status text for progress reporting
	fn status(&self) -> Option<String>;
}
74
/// Proof verification result.
///
/// In both variants the `(header, justifications)` pairs are turned into blocks that warp sync
/// asks its owner to import.
pub enum VerificationResult<Block: BlockT> {
	/// Proof is valid, but the target was not reached.
	Partial(Vec<(Block::Header, Justifications)>),
	/// Target finality is proved.
	///
	/// The first field is the header of the target block, which warp sync downloads next.
	Complete(Block::Header, Vec<(Block::Header, Justifications)>),
}
82
/// Warp sync backend. Handles retrieving and verifying warp sync proofs.
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
	/// Generate proof starting at given block hash. The proof is accumulated until maximum proof
	/// size is reached.
	fn generate(
		&self,
		start: Block::Hash,
	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
	/// Create a verifier for warp sync proofs.
	///
	/// Warp sync calls this once enough peers are connected to start downloading proofs.
	fn create_verifier(&self) -> Box<dyn Verifier<Block>>;
}
94
/// Reputation changes reported for peers that misbehave during warp sync.
mod rep {
	use sc_network::ReputationChange as Rep;

	/// Unexpected response received from a peer
	pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");

	/// Peer provided invalid warp proof data
	pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");

	/// Peer did not provide us with advertised block data.
	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

	/// Reputation change for peers which send us non-requested block data.
	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

	/// Reputation change for peers which send us a block which we fail to verify.
	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

	/// We received a message that failed to decode.
	///
	/// This penalty (`1 << 12`) is much smaller than the other ones (`1 << 29`).
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
116
/// Reported warp sync phase.
///
/// This is the externally visible phase carried by [`WarpSyncProgress`]; its `Display`
/// implementation renders a human-readable description.
pub enum WarpSyncPhase<Block: BlockT> {
	/// Waiting for peers to connect.
	///
	/// `required_peers` is the number of peers that must be connected before warp sync starts.
	AwaitingPeers { required_peers: usize },
	/// Downloading and verifying warp proofs.
	DownloadingWarpProofs,
	/// Downloading target block.
	DownloadingTargetBlock,
	/// Downloading state data.
	DownloadingState,
	/// Importing state.
	ImportingState,
	/// Downloading block history.
	///
	/// The payload is printed as the block number in the `Display` output.
	DownloadingBlocks(NumberFor<Block>),
	/// Warp sync is complete.
	Complete,
}
135
136impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
137	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138		match self {
139			Self::AwaitingPeers { required_peers } => {
140				write!(f, "Waiting for {required_peers} peers to be connected")
141			},
142			Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
143			Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
144			Self::DownloadingState => write!(f, "Downloading state"),
145			Self::ImportingState => write!(f, "Importing state"),
146			Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
147			Self::Complete => write!(f, "Warp sync is complete"),
148		}
149	}
150}
151
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
	/// Phase warp sync is currently in.
	pub phase: WarpSyncPhase<Block>,
	/// Total bytes downloaded so far.
	pub total_bytes: u64,
	/// Optional status text from the verifier.
	///
	/// Only populated while warp proofs are being downloaded; `None` in every other phase.
	pub status: Option<String>,
}
162
/// Warp sync configuration as accepted by [`WarpSync`].
pub enum WarpSyncConfig<Block: BlockT> {
	/// Standard warp sync for the chain.
	///
	/// Warp sync waits for enough peers, then downloads and verifies warp proofs using a verifier
	/// created by this provider.
	WithProvider(Arc<dyn WarpSyncProvider<Block>>),
	/// Skip downloading proofs and use provided header of the state that should be downloaded.
	///
	/// It is expected that the header provider ensures that the header is trusted.
	WithTarget(<Block as BlockT>::Header),
}
172
/// Warp sync phase used by warp sync state machine.
enum Phase<B: BlockT> {
	/// Waiting for enough peers to connect.
	///
	/// Keeps the provider around so a verifier can be created once warp sync starts.
	WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
	/// Downloading warp proofs.
	///
	/// `verifier` checks every proof response and supplies the start hash of the next request.
	WarpProof { verifier: Box<dyn Verifier<B>> },
	/// Downloading target block.
	///
	/// Holds the header the downloaded block must match.
	TargetBlock(B::Header),
	/// Warp sync is complete.
	Complete,
}
184
/// Request state of a connected peer.
enum PeerState {
	/// No request is in flight; the peer can be scheduled.
	Available,
	/// The peer is reserved for a warp proof request.
	DownloadingProofs,
	/// The peer is reserved for a target block request.
	DownloadingTargetBlock,
}

impl PeerState {
	/// Whether the peer is idle, i.e. not reserved for any request.
	fn is_available(&self) -> bool {
		match self {
			PeerState::Available => true,
			PeerState::DownloadingProofs | PeerState::DownloadingTargetBlock => false,
		}
	}
}
196
/// What warp sync tracks about a connected peer.
struct Peer<B: BlockT> {
	/// Best block number reported for the peer; updated on best-block announcements and used
	/// when picking a peer for a request.
	best_number: NumberFor<B>,
	/// Whether the peer is idle or reserved for a request.
	state: PeerState,
}
201
/// Outcome of a successful warp sync: the downloaded target block split into its parts.
pub struct WarpSyncResult<B: BlockT> {
	/// Header of the warp sync target block.
	pub target_header: B::Header,
	/// Body of the target block. Warp sync only produces a result when the downloaded block
	/// contained a body.
	pub target_body: Option<Vec<B::Extrinsic>>,
	/// Justifications delivered with the target block, if any.
	pub target_justifications: Option<Justifications>,
}
207
/// Warp sync state machine. Accumulates warp proofs and state.
pub struct WarpSync<B: BlockT> {
	/// Current phase of the state machine.
	phase: Phase<B>,
	/// Total size in bytes of the warp proofs that passed verification.
	total_proof_bytes: u64,
	/// Reported on top of `total_proof_bytes` once warp sync is complete.
	// NOTE(review): initialised to 0 and never updated in the code visible here.
	total_state_bytes: u64,
	/// Connected peers and their request state.
	peers: HashMap<PeerId, Peer<B>>,
	/// Bookkeeping for peers that disconnected while a request was in flight; consulted when
	/// choosing a peer for the next request.
	disconnected_peers: DisconnectedPeers,
	/// Protocol used for warp proof requests. Without it no warp proof request is produced.
	protocol_name: Option<ProtocolName>,
	/// Downloader used to fetch the target block.
	block_downloader: Arc<dyn BlockDownloader<B>>,
	/// Actions accumulated for the owner; drained by `actions()`.
	actions: Vec<SyncingAction<B>>,
	/// Result of a finished warp sync, handed out by `take_result()`.
	result: Option<WarpSyncResult<B>>,
	/// Number of peers that need to be connected before warp sync is started.
	min_peers_to_start_warp_sync: usize,
}
222
223impl<B> WarpSync<B>
224where
225	B: BlockT,
226{
	/// Strategy key used by warp sync.
	///
	/// Attached to every request action produced by this strategy.
	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
229
230	/// Create a new instance. When passing a warp sync provider we will be checking for proof and
231	/// authorities. Alternatively we can pass a target block when we want to skip downloading
232	/// proofs, in this case we will continue polling until the target block is known.
233	pub fn new<Client>(
234		client: Arc<Client>,
235		warp_sync_config: WarpSyncConfig<B>,
236		protocol_name: Option<ProtocolName>,
237		block_downloader: Arc<dyn BlockDownloader<B>>,
238		min_peers_to_start_warp_sync: Option<usize>,
239	) -> Self
240	where
241		Client: HeaderBackend<B> + 'static,
242	{
243		let min_peers_to_start_warp_sync =
244			min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
245		if client.info().finalized_state.is_some() {
246			error!(
247				target: LOG_TARGET,
248				"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
249			);
250			return Self {
251				phase: Phase::Complete,
252				total_proof_bytes: 0,
253				total_state_bytes: 0,
254				peers: HashMap::new(),
255				disconnected_peers: DisconnectedPeers::new(),
256				protocol_name,
257				block_downloader,
258				actions: vec![SyncingAction::Finished],
259				result: None,
260				min_peers_to_start_warp_sync,
261			};
262		}
263
264		let phase = match warp_sync_config {
265			WarpSyncConfig::WithProvider(warp_sync_provider) => {
266				Phase::WaitingForPeers { warp_sync_provider }
267			},
268			WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
269		};
270
271		Self {
272			phase,
273			total_proof_bytes: 0,
274			total_state_bytes: 0,
275			peers: HashMap::new(),
276			disconnected_peers: DisconnectedPeers::new(),
277			protocol_name,
278			block_downloader,
279			actions: Vec::new(),
280			result: None,
281			min_peers_to_start_warp_sync,
282		}
283	}
284
285	/// Notify that a new peer has connected.
286	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
287		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
288
289		self.try_to_start_warp_sync();
290	}
291
292	/// Notify that a peer has disconnected.
293	pub fn remove_peer(&mut self, peer_id: &PeerId) {
294		if let Some(state) = self.peers.remove(peer_id) {
295			if !state.state.is_available() {
296				if let Some(bad_peer) =
297					self.disconnected_peers.on_disconnect_during_request(*peer_id)
298				{
299					self.actions.push(SyncingAction::DropPeer(bad_peer));
300				}
301			}
302		}
303	}
304
305	/// Submit a validated block announcement.
306	///
307	/// Returns new best hash & best number of the peer if they are updated.
308	#[must_use]
309	pub fn on_validated_block_announce(
310		&mut self,
311		is_best: bool,
312		peer_id: PeerId,
313		announce: &BlockAnnounce<B::Header>,
314	) -> Option<(B::Hash, NumberFor<B>)> {
315		is_best.then(|| {
316			let best_number = *announce.header.number();
317			let best_hash = announce.header.hash();
318			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
319				peer.best_number = best_number;
320			}
321			// Let `SyncingEngine` know that we should update the peer info.
322			(best_hash, best_number)
323		})
324	}
325
326	/// Start warp sync as soon as we have enough peers.
327	fn try_to_start_warp_sync(&mut self) {
328		let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
329
330		if self.peers.len() < self.min_peers_to_start_warp_sync {
331			return;
332		}
333
334		let verifier = warp_sync_provider.create_verifier();
335		self.phase = Phase::WarpProof { verifier };
336		debug!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
337	}
338
339	pub fn on_generic_response(
340		&mut self,
341		peer_id: &PeerId,
342		protocol_name: ProtocolName,
343		response: Box<dyn Any + Send>,
344	) {
345		if &protocol_name == self.block_downloader.protocol_name() {
346			let Ok(response) = response
347				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
348			else {
349				warn!(target: LOG_TARGET, "Failed to downcast block response");
350				debug_assert!(false);
351				return;
352			};
353
354			let (request, response) = *response;
355			let blocks = match response {
356				Ok(blocks) => blocks,
357				Err(BlockResponseError::DecodeFailed(e)) => {
358					debug!(
359						target: LOG_TARGET,
360						"Failed to decode block response from peer {:?}: {:?}.",
361						peer_id,
362						e
363					);
364					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
365					return;
366				},
367				Err(BlockResponseError::ExtractionFailed(e)) => {
368					debug!(
369						target: LOG_TARGET,
370						"Failed to extract blocks from peer response {:?}: {:?}.",
371						peer_id,
372						e
373					);
374					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
375					return;
376				},
377			};
378
379			self.on_block_response(*peer_id, request, blocks);
380		} else {
381			let Ok(response) = response.downcast::<Vec<u8>>() else {
382				warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
383				debug_assert!(false);
384				return;
385			};
386
387			self.on_warp_proof_response(peer_id, EncodedProof(*response));
388		}
389	}
390
391	/// Process warp proof response.
392	pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
393		if let Some(peer) = self.peers.get_mut(peer_id) {
394			peer.state = PeerState::Available;
395		}
396
397		let Phase::WarpProof { verifier } = &mut self.phase else {
398			debug!(target: LOG_TARGET, "Unexpected warp proof response");
399			self.actions
400				.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
401			return;
402		};
403
404		let proof_to_incoming_block =
405			|(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
406				IncomingBlock {
407					hash: header.hash(),
408					header: Some(header),
409					body: None,
410					indexed_body: None,
411					justifications: Some(justifications),
412					origin: Some(*peer_id),
413					// We are still in warp sync, so we don't have the state. This means
414					// we also can't execute the block.
415					allow_missing_state: true,
416					skip_execution: true,
417					// Shouldn't already exist in the database.
418					import_existing: false,
419					state: None,
420				}
421			};
422
423		match verifier.verify(&response) {
424			Err(e) => {
425				debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
426				self.actions
427					.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
428			},
429			Ok(VerificationResult::Partial(proofs)) => {
430				debug!(target: LOG_TARGET, "Verified partial proof");
431				self.total_proof_bytes += response.0.len() as u64;
432				self.actions.push(SyncingAction::ImportBlocks {
433					origin: BlockOrigin::WarpSync,
434					blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
435				});
436			},
437			Ok(VerificationResult::Complete(header, proofs)) => {
438				debug!(
439					target: LOG_TARGET,
440					"Verified complete proof. Continuing with target block download: {} ({}).",
441					header.hash(),
442					header.number(),
443				);
444				self.total_proof_bytes += response.0.len() as u64;
445				self.phase = Phase::TargetBlock(header.clone());
446				let incoming_blocks: Vec<_> = proofs
447					.into_iter()
448					.map(proof_to_incoming_block)
449					.filter(|i| {
450						// We need target block with state and warp sync does not provide this.
451						// That's why we filter out target block here, otherwise oncoming state sync
452						// (which comes after warp sync) will abort because target block is already
453						// imported.
454						if header.number() != i.header.as_ref().unwrap().number() {
455							true
456						} else {
457							log::trace!(
458								target: LOG_TARGET,
459								"Filtered out target block: {} ({})",
460								header.hash(),
461								header.number()
462							);
463							false
464						}
465					})
466					.collect();
467				self.actions.push(SyncingAction::ImportBlocks {
468					origin: BlockOrigin::WarpSync,
469					blocks: incoming_blocks,
470				});
471			},
472		}
473	}
474
475	/// Process (target) block response.
476	pub fn on_block_response(
477		&mut self,
478		peer_id: PeerId,
479		request: BlockRequest<B>,
480		blocks: Vec<BlockData<B>>,
481	) {
482		if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
483			self.actions.push(SyncingAction::DropPeer(bad_peer));
484		}
485	}
486
487	fn on_block_response_inner(
488		&mut self,
489		peer_id: PeerId,
490		request: BlockRequest<B>,
491		mut blocks: Vec<BlockData<B>>,
492	) -> Result<(), BadPeer> {
493		if let Some(peer) = self.peers.get_mut(&peer_id) {
494			peer.state = PeerState::Available;
495		}
496
497		let Phase::TargetBlock(header) = &mut self.phase else {
498			debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
499			return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE));
500		};
501
502		if blocks.is_empty() {
503			debug!(
504				target: LOG_TARGET,
505				"Downloading target block failed: empty block response from {peer_id}",
506			);
507			return Err(BadPeer(peer_id, rep::NO_BLOCK));
508		}
509
510		if blocks.len() > 1 {
511			debug!(
512				target: LOG_TARGET,
513				"Too many blocks ({}) in warp target block response from {peer_id}",
514				blocks.len(),
515			);
516			return Err(BadPeer(peer_id, rep::NOT_REQUESTED));
517		}
518
519		validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
520
521		let block = blocks.pop().expect("`blocks` len checked above; qed");
522
523		let Some(block_header) = &block.header else {
524			debug!(
525				target: LOG_TARGET,
526				"Downloading target block failed: missing header in response from {peer_id}.",
527			);
528			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
529		};
530
531		if block_header != header {
532			debug!(
533				target: LOG_TARGET,
534				"Downloading target block failed: different header in response from {peer_id}.",
535			);
536			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
537		}
538
539		if block.body.is_none() {
540			debug!(
541				target: LOG_TARGET,
542				"Downloading target block failed: missing body in response from {peer_id}.",
543			);
544			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
545		}
546
547		self.result = Some(WarpSyncResult {
548			target_header: header.clone(),
549			target_body: block.body,
550			target_justifications: block.justifications,
551		});
552		self.phase = Phase::Complete;
553		self.actions.push(SyncingAction::Finished);
554		Ok(())
555	}
556
557	/// Reserve a peer for a request assigning `new_state`.
558	fn schedule_next_peer(
559		&mut self,
560		new_state: PeerState,
561		min_best_number: Option<NumberFor<B>>,
562	) -> Option<PeerId> {
563		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
564		if targets.is_empty() {
565			return None;
566		}
567		targets.sort();
568		let median = targets[targets.len() / 2];
569		let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
570		// Find a random peer that is synced as much as peer majority and is above
571		// `min_best_number`.
572		for (peer_id, peer) in self.peers.iter_mut() {
573			if peer.state.is_available() &&
574				peer.best_number >= threshold &&
575				self.disconnected_peers.is_peer_available(peer_id)
576			{
577				peer.state = new_state;
578				return Some(*peer_id);
579			}
580		}
581		None
582	}
583
584	/// Produce warp proof request.
585	fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
586		let Phase::WarpProof { verifier } = &self.phase else { return None };
587
588		// Copy verifier context early to cut the borrowing tie.
589		let begin = verifier.next_proof_context();
590
591		if self
592			.peers
593			.values()
594			.any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
595		{
596			// Only one warp proof request at a time is possible.
597			return None;
598		}
599
600		let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
601		trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
602
603		let request = WarpProofRequest { begin };
604
605		let Some(protocol_name) = self.protocol_name.clone() else {
606			warn!(
607				target: LOG_TARGET,
608				"Trying to send warp sync request when no protocol is configured {request:?}",
609			);
610			return None;
611		};
612
613		Some((peer_id, protocol_name, request))
614	}
615
616	/// Produce target block request.
617	fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
618		let Phase::TargetBlock(target_header) = &self.phase else { return None };
619
620		if self
621			.peers
622			.values()
623			.any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
624		{
625			// Only one target block request at a time is possible.
626			return None;
627		}
628
629		// Cut the borrowing tie.
630		let target_hash = target_header.hash();
631		let target_number = *target_header.number();
632
633		let peer_id =
634			self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
635
636		trace!(
637			target: LOG_TARGET,
638			"New target block request to {peer_id}, target: {} ({}).",
639			target_hash,
640			target_number,
641		);
642
643		Some((
644			peer_id,
645			BlockRequest::<B> {
646				id: 0,
647				fields: BlockAttributes::HEADER |
648					BlockAttributes::BODY |
649					BlockAttributes::JUSTIFICATION,
650				from: FromBlock::Hash(target_hash),
651				direction: Direction::Ascending,
652				max: Some(1),
653			},
654		))
655	}
656
657	/// Returns warp sync estimated progress (stage, bytes received).
658	pub fn progress(&self) -> WarpSyncProgress<B> {
659		match &self.phase {
660			Phase::WaitingForPeers { .. } => WarpSyncProgress {
661				phase: WarpSyncPhase::AwaitingPeers {
662					required_peers: self.min_peers_to_start_warp_sync,
663				},
664				total_bytes: self.total_proof_bytes,
665				status: None,
666			},
667			Phase::WarpProof { verifier } => WarpSyncProgress {
668				phase: WarpSyncPhase::DownloadingWarpProofs,
669				total_bytes: self.total_proof_bytes,
670				status: verifier.status(),
671			},
672			Phase::TargetBlock(_) => WarpSyncProgress {
673				phase: WarpSyncPhase::DownloadingTargetBlock,
674				total_bytes: self.total_proof_bytes,
675				status: None,
676			},
677			Phase::Complete => WarpSyncProgress {
678				phase: WarpSyncPhase::Complete,
679				total_bytes: self.total_proof_bytes + self.total_state_bytes,
680				status: None,
681			},
682		}
683	}
684
	/// Get the number of peers known to warp sync.
	///
	/// Counts every tracked peer, whether idle or reserved for a request.
	pub fn num_peers(&self) -> usize {
		self.peers.len()
	}
689
690	/// Returns the current sync status.
691	pub fn status(&self) -> SyncStatus<B> {
692		SyncStatus {
693			state: match &self.phase {
694				Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
695				Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
696				Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
697				Phase::Complete => SyncState::Idle,
698			},
699			best_seen_block: match &self.phase {
700				Phase::WaitingForPeers { .. } => None,
701				Phase::WarpProof { .. } => None,
702				Phase::TargetBlock(header) => Some(*header.number()),
703				Phase::Complete => None,
704			},
705			num_peers: self.peers.len().saturated_into(),
706			queued_blocks: 0,
707			state_sync: None,
708			warp_sync: Some(self.progress()),
709		}
710	}
711
712	/// Get actions that should be performed by the owner on [`WarpSync`]'s behalf
713	#[must_use]
714	pub fn actions(
715		&mut self,
716		network_service: &NetworkServiceHandle,
717	) -> impl Iterator<Item = SyncingAction<B>> {
718		let warp_proof_request =
719			self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
720				trace!(
721					target: LOG_TARGET,
722					"Created `WarpProofRequest` to {}, request: {:?}.",
723					peer_id,
724					request,
725				);
726
727				let (tx, rx) = oneshot::channel();
728
729				network_service.start_request(
730					peer_id,
731					protocol_name,
732					request.encode(),
733					tx,
734					IfDisconnected::ImmediateError,
735				);
736
737				SyncingAction::StartRequest {
738					peer_id,
739					key: Self::STRATEGY_KEY,
740					request: async move {
741						Ok(rx.await?.and_then(|(response, protocol_name)| {
742							Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
743						}))
744					}
745					.boxed(),
746					remove_obsolete: false,
747				}
748			});
749		self.actions.extend(warp_proof_request);
750
751		let target_block_request =
752			self.target_block_request().into_iter().map(|(peer_id, request)| {
753				let downloader = self.block_downloader.clone();
754
755				SyncingAction::StartRequest {
756					peer_id,
757					key: Self::STRATEGY_KEY,
758					request: async move {
759						Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
760							|(response, protocol_name)| {
761								let decoded_response =
762									downloader.block_response_into_blocks(&request, response);
763								let result =
764									Box::new((request, decoded_response)) as Box<dyn Any + Send>;
765								Ok((result, protocol_name))
766							},
767						))
768					}
769					.boxed(),
770					// Sending block request implies dropping obsolete pending response as we are
771					// not interested in it anymore.
772					remove_obsolete: true,
773				}
774			});
775		self.actions.extend(target_block_request);
776
777		std::mem::take(&mut self.actions).into_iter()
778	}
779
	/// Take the result of finished warp sync, returning `None` if the sync was unsuccessful.
	///
	/// The result is moved out, so a subsequent call returns `None`.
	#[must_use]
	pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
		self.result.take()
	}
785}
786
787#[cfg(test)]
788mod test {
789	use super::*;
790	use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
791	use sc_block_builder::BlockBuilderBuilder;
792	use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
793	use sp_core::H256;
794	use sp_runtime::{
795		traits::{Block as BlockT, Header as HeaderT, NumberFor},
796		ConsensusEngineId,
797	};
798	use std::{io::ErrorKind, sync::Arc};
799	use substrate_test_runtime_client::{
800		runtime::{Block, Hash},
801		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
802	};
803
	/// Consensus engine id (`b"TEST"`) available to the tests in this module.
	pub const TEST_ENGINE_ID: ConsensusEngineId = *b"TEST";
805
	// Generates `MockClient`, a mock `HeaderBackend` whose `info()` decides whether `WarpSync`
	// sees a database with finalized state.
	mockall::mock! {
		pub Client<B: BlockT> {}

		impl<B: BlockT> HeaderBackend<B> for Client<B> {
			fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
			fn info(&self) -> Info<B>;
			fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
			fn number(
				&self,
				hash: B::Hash,
			) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
			fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
		}
	}
820
	// Generates `MockWarpSyncProvider`, a mock of the warp sync backend; tests set an
	// expectation on `create_verifier` when warp sync is supposed to start.
	mockall::mock! {
		pub WarpSyncProvider<B: BlockT> {}

		impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
			fn generate(
				&self,
				start: B::Hash,
			) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
			fn create_verifier(&self) -> Box<dyn super::Verifier<B>>;
		}
	}
832
	// Generates `MockVerifier`, a mock proof verifier handed out by the mocked provider.
	mockall::mock! {
		pub Verifier<B: BlockT> {}

		impl<B: BlockT> super::Verifier<B> for Verifier<B> {
			fn verify(
				&mut self,
				proof: &EncodedProof,
			) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
			fn next_proof_context(&self) -> B::Hash;
			fn status(&self) -> Option<String>;
		}
	}
845
846	fn mock_client_with_state() -> MockClient<Block> {
847		let mut client = MockClient::<Block>::new();
848		let genesis_hash = Hash::random();
849		client.expect_info().return_once(move || Info {
850			best_hash: genesis_hash,
851			best_number: 0,
852			genesis_hash,
853			finalized_hash: genesis_hash,
854			finalized_number: 0,
855			// We need some finalized state to render warp sync impossible.
856			finalized_state: Some((genesis_hash, 0)),
857			number_leaves: 0,
858			block_gap: None,
859		});
860
861		client
862	}
863
864	fn mock_client_without_state() -> MockClient<Block> {
865		let mut client = MockClient::<Block>::new();
866		let genesis_hash = Hash::random();
867		client.expect_info().returning(move || Info {
868			best_hash: genesis_hash,
869			best_number: 0,
870			genesis_hash,
871			finalized_hash: genesis_hash,
872			finalized_number: 0,
873			finalized_state: None,
874			number_leaves: 0,
875			block_gap: None,
876		});
877
878		client
879	}
880
881	#[test]
882	fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
883		let client = mock_client_with_state();
884		let provider = MockWarpSyncProvider::<Block>::new();
885		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
886		let mut warp_sync = WarpSync::new(
887			Arc::new(client),
888			config,
889			None,
890			Arc::new(MockBlockDownloader::new()),
891			None,
892		);
893
894		let network_provider = NetworkServiceProvider::new();
895		let network_handle = network_provider.handle();
896
897		// Warp sync instantly finishes
898		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
899		assert_eq!(actions.len(), 1);
900		assert!(matches!(actions[0], SyncingAction::Finished));
901
902		// ... with no result.
903		assert!(warp_sync.take_result().is_none());
904	}
905
906	#[test]
907	fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
908		let client = mock_client_with_state();
909		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
910			1,
911			Default::default(),
912			Default::default(),
913			Default::default(),
914			Default::default(),
915		));
916		let mut warp_sync = WarpSync::new(
917			Arc::new(client),
918			config,
919			None,
920			Arc::new(MockBlockDownloader::new()),
921			None,
922		);
923
924		let network_provider = NetworkServiceProvider::new();
925		let network_handle = network_provider.handle();
926
927		// Warp sync instantly finishes
928		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
929		assert_eq!(actions.len(), 1);
930		assert!(matches!(actions[0], SyncingAction::Finished));
931
932		// ... with no result.
933		assert!(warp_sync.take_result().is_none());
934	}
935
936	#[test]
937	fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
938		let client = mock_client_without_state();
939		let provider = MockWarpSyncProvider::<Block>::new();
940		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
941		let mut warp_sync = WarpSync::new(
942			Arc::new(client),
943			config,
944			None,
945			Arc::new(MockBlockDownloader::new()),
946			None,
947		);
948
949		let network_provider = NetworkServiceProvider::new();
950		let network_handle = network_provider.handle();
951
952		// No actions are emitted.
953		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
954	}
955
956	#[test]
957	fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
958		let client = mock_client_without_state();
959		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
960			1,
961			Default::default(),
962			Default::default(),
963			Default::default(),
964			Default::default(),
965		));
966		let mut warp_sync = WarpSync::new(
967			Arc::new(client),
968			config,
969			None,
970			Arc::new(MockBlockDownloader::new()),
971			None,
972		);
973
974		let network_provider = NetworkServiceProvider::new();
975		let network_handle = network_provider.handle();
976
977		// No actions are emitted.
978		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
979	}
980
981	#[test]
982	fn warp_sync_is_started_only_when_there_is_enough_peers() {
983		let client = mock_client_without_state();
984		let mut provider = MockWarpSyncProvider::<Block>::new();
985		let mut verifier = MockVerifier::<Block>::new();
986		verifier.expect_next_proof_context().returning(|| Hash::random());
987		verifier
988			.expect_verify()
989			.returning(|_| unreachable!("verify should not be called in this test"));
990		provider.expect_create_verifier().return_once(move || Box::new(verifier));
991		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
992		let mut warp_sync = WarpSync::new(
993			Arc::new(client),
994			config,
995			None,
996			Arc::new(MockBlockDownloader::new()),
997			None,
998		);
999
1000		// Warp sync is not started when there is not enough peers.
1001		for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
1002			warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
1003			assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
1004		}
1005
1006		// Now we have enough peers and warp sync is started.
1007		warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
1008		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
1009	}
1010
1011	#[test]
1012	fn no_peer_is_scheduled_if_no_peers_connected() {
1013		let client = mock_client_without_state();
1014		let provider = MockWarpSyncProvider::<Block>::new();
1015		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1016		let mut warp_sync = WarpSync::new(
1017			Arc::new(client),
1018			config,
1019			None,
1020			Arc::new(MockBlockDownloader::new()),
1021			None,
1022		);
1023
1024		assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
1025	}
1026
1027	#[test]
1028	fn enough_peers_are_used_in_tests() {
1029		// Tests below use 10 peers. Fail early if it's less than a threshold for warp sync.
1030		assert!(
1031			10 >= MIN_PEERS_TO_START_WARP_SYNC,
1032			"Tests must be updated to use that many initial peers.",
1033		);
1034	}
1035
1036	#[test]
1037	fn at_least_median_synced_peer_is_scheduled() {
1038		for _ in 0..100 {
1039			let client = mock_client_without_state();
1040			let mut provider = MockWarpSyncProvider::<Block>::new();
1041			let mut verifier = MockVerifier::<Block>::new();
1042			verifier.expect_next_proof_context().returning(|| Hash::random());
1043			verifier
1044				.expect_verify()
1045				.returning(|_| unreachable!("verify should not be called in this test"));
1046			provider.expect_create_verifier().return_once(move || Box::new(verifier));
1047			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1048			let mut warp_sync = WarpSync::new(
1049				Arc::new(client),
1050				config,
1051				None,
1052				Arc::new(MockBlockDownloader::new()),
1053				None,
1054			);
1055
1056			for best_number in 1..11 {
1057				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1058			}
1059
1060			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1061			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1062		}
1063	}
1064
1065	#[test]
1066	fn min_best_number_peer_is_scheduled() {
1067		for _ in 0..10 {
1068			let client = mock_client_without_state();
1069			let mut provider = MockWarpSyncProvider::<Block>::new();
1070			let mut verifier = MockVerifier::<Block>::new();
1071			verifier.expect_next_proof_context().returning(|| Hash::random());
1072			verifier
1073				.expect_verify()
1074				.returning(|_| unreachable!("verify should not be called in this test"));
1075			provider.expect_create_verifier().return_once(move || Box::new(verifier));
1076			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1077			let mut warp_sync = WarpSync::new(
1078				Arc::new(client),
1079				config,
1080				None,
1081				Arc::new(MockBlockDownloader::new()),
1082				None,
1083			);
1084
1085			for best_number in 1..11 {
1086				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1087			}
1088
1089			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1090			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1091		}
1092	}
1093
1094	#[test]
1095	fn backedoff_number_peer_is_not_scheduled() {
1096		let client = mock_client_without_state();
1097		let mut provider = MockWarpSyncProvider::<Block>::new();
1098		let mut verifier = MockVerifier::<Block>::new();
1099		verifier.expect_next_proof_context().returning(|| Hash::random());
1100		verifier
1101			.expect_verify()
1102			.returning(|_| unreachable!("verify should not be called in this test"));
1103		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1104		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1105		let mut warp_sync = WarpSync::new(
1106			Arc::new(client),
1107			config,
1108			None,
1109			Arc::new(MockBlockDownloader::new()),
1110			None,
1111		);
1112
1113		for best_number in 1..11 {
1114			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1115		}
1116
1117		let ninth_peer =
1118			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1119		let tenth_peer =
1120			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1121
1122		// Disconnecting a peer without an inflight request has no effect on persistent states.
1123		warp_sync.remove_peer(&tenth_peer);
1124		assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1125
1126		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1127		let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1128		assert_eq!(tenth_peer, peer_id.unwrap());
1129		warp_sync.remove_peer(&tenth_peer);
1130
1131		// Peer is backed off.
1132		assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1133
1134		// No peer available for 10'th best block because of the backoff.
1135		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1136		let peer_id: Option<PeerId> =
1137			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1138		assert!(peer_id.is_none());
1139
1140		// Other requests can still happen.
1141		let peer_id: Option<PeerId> =
1142			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1143		assert_eq!(ninth_peer, peer_id.unwrap());
1144	}
1145
1146	#[test]
1147	fn no_warp_proof_request_in_another_phase() {
1148		let client = mock_client_without_state();
1149		let mut provider = MockWarpSyncProvider::<Block>::new();
1150		let mut verifier = MockVerifier::<Block>::new();
1151		verifier.expect_next_proof_context().returning(|| Hash::random());
1152		verifier
1153			.expect_verify()
1154			.returning(|_| unreachable!("verify should not be called in this test"));
1155		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1156		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1157		let mut warp_sync = WarpSync::new(
1158			Arc::new(client),
1159			config,
1160			Some(ProtocolName::Static("")),
1161			Arc::new(MockBlockDownloader::new()),
1162			None,
1163		);
1164
1165		// Make sure we have enough peers to make a request.
1166		for best_number in 1..11 {
1167			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1168		}
1169
1170		// Manually set to another phase.
1171		warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1172			1,
1173			Default::default(),
1174			Default::default(),
1175			Default::default(),
1176			Default::default(),
1177		));
1178
1179		// No request is made.
1180		assert!(warp_sync.warp_proof_request().is_none());
1181	}
1182
1183	#[test]
1184	fn warp_proof_request_starts_at_last_hash() {
1185		let client = mock_client_without_state();
1186		let mut provider = MockWarpSyncProvider::<Block>::new();
1187		let mut verifier = MockVerifier::<Block>::new();
1188		let known_last_hash = Hash::random();
1189		verifier.expect_next_proof_context().returning(move || known_last_hash);
1190		verifier
1191			.expect_verify()
1192			.returning(|_| unreachable!("verify should not be called in this test"));
1193		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1194		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1195		let mut warp_sync = WarpSync::new(
1196			Arc::new(client),
1197			config,
1198			Some(ProtocolName::Static("")),
1199			Arc::new(MockBlockDownloader::new()),
1200			None,
1201		);
1202
1203		// Make sure we have enough peers to make a request.
1204		for best_number in 1..11 {
1205			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1206		}
1207		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1208
1209		let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
1210		assert_eq!(request.begin, known_last_hash);
1211	}
1212
1213	#[test]
1214	fn no_parallel_warp_proof_requests() {
1215		let client = mock_client_without_state();
1216		let mut provider = MockWarpSyncProvider::<Block>::new();
1217		let mut verifier = MockVerifier::<Block>::new();
1218		verifier.expect_next_proof_context().returning(|| Hash::random());
1219		verifier
1220			.expect_verify()
1221			.returning(|_| unreachable!("verify should not be called in this test"));
1222		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1223		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1224		let mut warp_sync = WarpSync::new(
1225			Arc::new(client),
1226			config,
1227			Some(ProtocolName::Static("")),
1228			Arc::new(MockBlockDownloader::new()),
1229			None,
1230		);
1231
1232		// Make sure we have enough peers to make requests.
1233		for best_number in 1..11 {
1234			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1235		}
1236		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1237
1238		// First request is made.
1239		assert!(warp_sync.warp_proof_request().is_some());
1240		// Second request is not made.
1241		assert!(warp_sync.warp_proof_request().is_none());
1242	}
1243
1244	#[test]
1245	fn bad_warp_proof_response_drops_peer() {
1246		let client = mock_client_without_state();
1247		let mut provider = MockWarpSyncProvider::<Block>::new();
1248		let mut verifier = MockVerifier::<Block>::new();
1249		verifier.expect_next_proof_context().returning(|| Hash::random());
1250		// Warp proof verification fails.
1251		verifier.expect_verify().return_once(|_proof| {
1252			Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
1253		});
1254		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1255		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1256		let mut warp_sync = WarpSync::new(
1257			Arc::new(client),
1258			config,
1259			Some(ProtocolName::Static("")),
1260			Arc::new(MockBlockDownloader::new()),
1261			None,
1262		);
1263
1264		// Make sure we have enough peers to make a request.
1265		for best_number in 1..11 {
1266			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1267		}
1268		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1269
1270		let network_provider = NetworkServiceProvider::new();
1271		let network_handle = network_provider.handle();
1272
1273		// Consume `SendWarpProofRequest` action.
1274		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1275		assert_eq!(actions.len(), 1);
1276		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1277			panic!("Invalid action");
1278		};
1279
1280		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1281
1282		// We only interested in already generated actions, not new requests.
1283		let actions = std::mem::take(&mut warp_sync.actions);
1284		assert_eq!(actions.len(), 1);
1285		assert!(matches!(
1286			actions[0],
1287			SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
1288		));
1289		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1290	}
1291
1292	#[test]
1293	fn partial_warp_proof_doesnt_advance_phase() {
1294		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1295		let mut provider = MockWarpSyncProvider::<Block>::new();
1296		let target_block = BlockBuilderBuilder::new(&*client)
1297			.on_parent_block(client.chain_info().best_hash)
1298			.with_parent_block_number(client.chain_info().best_number)
1299			.build()
1300			.unwrap()
1301			.build()
1302			.unwrap()
1303			.block;
1304		let target_header = target_block.header().clone();
1305		let justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1306		// Warp proof is partial.
1307		let mut verifier = MockVerifier::<Block>::new();
1308		let context = client.info().genesis_hash;
1309		verifier.expect_next_proof_context().returning(move || context);
1310		let header_for_verify = target_header.clone();
1311		let just_for_verify = justifications.clone();
1312		verifier.expect_verify().return_once(move |_proof| {
1313			Ok(VerificationResult::Partial(vec![(
1314				header_for_verify.clone(),
1315				just_for_verify.clone(),
1316			)]))
1317		});
1318		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1319		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1320		let mut warp_sync = WarpSync::new(
1321			client,
1322			config,
1323			Some(ProtocolName::Static("")),
1324			Arc::new(MockBlockDownloader::new()),
1325			None,
1326		);
1327
1328		// Make sure we have enough peers to make a request.
1329		for best_number in 1..11 {
1330			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1331		}
1332		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1333
1334		let network_provider = NetworkServiceProvider::new();
1335		let network_handle = network_provider.handle();
1336
1337		// Consume `SendWarpProofRequest` action.
1338		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1339		assert_eq!(actions.len(), 1);
1340		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1341			panic!("Invalid action");
1342		};
1343
1344		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1345
1346		assert_eq!(warp_sync.actions.len(), 1);
1347		let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1348		else {
1349			panic!("Expected `ImportBlocks` action.");
1350		};
1351		assert_eq!(origin, BlockOrigin::WarpSync);
1352		assert_eq!(blocks.len(), 1);
1353		let import_block = blocks.pop().unwrap();
1354		assert_eq!(
1355			import_block,
1356			IncomingBlock {
1357				hash: target_header.hash(),
1358				header: Some(target_header),
1359				body: None,
1360				indexed_body: None,
1361				justifications: Some(justifications),
1362				origin: Some(request_peer_id),
1363				allow_missing_state: true,
1364				skip_execution: true,
1365				import_existing: false,
1366				state: None,
1367			}
1368		);
1369		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1370	}
1371
1372	#[test]
1373	fn complete_warp_proof_advances_phase() {
1374		// Initialize logging
1375		sp_tracing::try_init_simple();
1376
1377		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1378		let mut provider = MockWarpSyncProvider::<Block>::new();
1379
1380		// Building both blocks manually. Can't use BlockBuilderBuilder to build one block on top of
1381		// another, if no genesis is set (required for warp sync to make sure finalized_state is not
1382		// set)
1383		let warp_synced_header = <<Block as BlockT>::Header as HeaderT>::new(
1384			1,
1385			Default::default(),
1386			Default::default(),
1387			client.chain_info().best_hash,
1388			Default::default(),
1389		);
1390
1391		// Build target_block on top of warp_synced_block
1392		let target_header = <<Block as BlockT>::Header as HeaderT>::new(
1393			2,
1394			Default::default(),
1395			Default::default(),
1396			warp_synced_header.hash(),
1397			Default::default(),
1398		);
1399		let warp_justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1400		let target_justifications =
1401			Justifications::new(vec![(TEST_ENGINE_ID, vec![6, 7, 8, 9, 10])]);
1402
1403		// Warp proof is complete and contains both blocks.
1404		let mut verifier = MockVerifier::<Block>::new();
1405		let context = client.info().genesis_hash;
1406		verifier.expect_next_proof_context().returning(move || context);
1407		let warp_synced_header_for_verify = warp_synced_header.clone();
1408		let warp_just_for_verify = warp_justifications.clone();
1409		let target_header_for_verify = target_header.clone();
1410		let target_just_for_verify = target_justifications.clone();
1411		verifier.expect_verify().return_once(move |_proof| {
1412			Ok(VerificationResult::Complete(
1413				target_header_for_verify.clone(),
1414				vec![
1415					(warp_synced_header_for_verify, warp_just_for_verify),
1416					(target_header_for_verify, target_just_for_verify),
1417				],
1418			))
1419		});
1420		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1421		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1422		let mut warp_sync = WarpSync::new(
1423			client,
1424			config,
1425			Some(ProtocolName::Static("")),
1426			Arc::new(MockBlockDownloader::new()),
1427			None,
1428		);
1429
1430		// Make sure we have enough peers to make a request.
1431		for best_number in 1..11 {
1432			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1433		}
1434		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1435
1436		let network_provider = NetworkServiceProvider::new();
1437		let network_handle = network_provider.handle();
1438
1439		// Consume `SendWarpProofRequest` action.
1440		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1441		assert_eq!(actions.len(), 1);
1442		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1443			panic!("Invalid action.");
1444		};
1445
1446		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1447
1448		assert_eq!(warp_sync.actions.len(), 1);
1449		let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1450		else {
1451			panic!("Expected `ImportBlocks` action.");
1452		};
1453		assert_eq!(origin, BlockOrigin::WarpSync);
1454		// Only warp_synced_block should be in the import list; target_block is filtered out
1455		assert_eq!(blocks.len(), 1);
1456		let import_block = blocks.pop().unwrap();
1457		assert_eq!(
1458			import_block,
1459			IncomingBlock {
1460				hash: warp_synced_header.hash(),
1461				header: Some(warp_synced_header),
1462				body: None,
1463				indexed_body: None,
1464				justifications: Some(warp_justifications),
1465				origin: Some(request_peer_id),
1466				allow_missing_state: true,
1467				skip_execution: true,
1468				import_existing: false,
1469				state: None,
1470			}
1471		);
1472		assert!(matches!(warp_sync.phase, Phase::TargetBlock(header) if header == target_header));
1473	}
1474
1475	#[test]
1476	fn no_target_block_requests_in_another_phase() {
1477		let client = mock_client_without_state();
1478		let mut provider = MockWarpSyncProvider::<Block>::new();
1479		let mut verifier = MockVerifier::<Block>::new();
1480		verifier.expect_next_proof_context().returning(|| Hash::random());
1481		verifier
1482			.expect_verify()
1483			.returning(|_| unreachable!("verify should not be called in this test"));
1484		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1485		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1486		let mut warp_sync = WarpSync::new(
1487			Arc::new(client),
1488			config,
1489			None,
1490			Arc::new(MockBlockDownloader::new()),
1491			None,
1492		);
1493
1494		// Make sure we have enough peers to make a request.
1495		for best_number in 1..11 {
1496			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1497		}
1498		// We are not in `Phase::TargetBlock`
1499		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1500
1501		// No request is made.
1502		assert!(warp_sync.target_block_request().is_none());
1503	}
1504
1505	#[test]
1506	fn target_block_request_is_correct() {
1507		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1508		let mut provider = MockWarpSyncProvider::<Block>::new();
1509		let mut verifier = MockVerifier::<Block>::new();
1510		let header_for_ctx = client.info().genesis_hash;
1511		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1512		let target_block = BlockBuilderBuilder::new(&*client)
1513			.on_parent_block(client.chain_info().best_hash)
1514			.with_parent_block_number(client.chain_info().best_number)
1515			.build()
1516			.unwrap()
1517			.build()
1518			.unwrap()
1519			.block;
1520		let target_header = target_block.header().clone();
1521		// Warp proof is complete.
1522		let header_for_verify = target_header.clone();
1523		verifier.expect_verify().return_once(move |_proof| {
1524			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1525		});
1526		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1527		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1528		let mut warp_sync =
1529			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1530
1531		// Make sure we have enough peers to make a request.
1532		for best_number in 1..11 {
1533			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1534		}
1535
1536		// Manually set `TargetBlock` phase.
1537		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1538
1539		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1540		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1541		assert_eq!(
1542			request.fields,
1543			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1544		);
1545		assert_eq!(request.max, Some(1));
1546	}
1547
1548	#[test]
1549	fn externally_set_target_block_is_requested() {
1550		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1551		let target_block = BlockBuilderBuilder::new(&*client)
1552			.on_parent_block(client.chain_info().best_hash)
1553			.with_parent_block_number(client.chain_info().best_number)
1554			.build()
1555			.unwrap()
1556			.build()
1557			.unwrap()
1558			.block;
1559		let target_header = target_block.header().clone();
1560		let config = WarpSyncConfig::WithTarget(target_header);
1561		let mut warp_sync =
1562			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1563
1564		// Make sure we have enough peers to make a request.
1565		for best_number in 1..11 {
1566			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1567		}
1568
1569		assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));
1570
1571		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1572		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1573		assert_eq!(
1574			request.fields,
1575			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1576		);
1577		assert_eq!(request.max, Some(1));
1578	}
1579
1580	#[test]
1581	fn no_parallel_target_block_requests() {
1582		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1583		let mut provider = MockWarpSyncProvider::<Block>::new();
1584		let mut verifier = MockVerifier::<Block>::new();
1585		let header_for_ctx = client.info().genesis_hash;
1586		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1587		let target_block = BlockBuilderBuilder::new(&*client)
1588			.on_parent_block(client.chain_info().best_hash)
1589			.with_parent_block_number(client.chain_info().best_number)
1590			.build()
1591			.unwrap()
1592			.build()
1593			.unwrap()
1594			.block;
1595		let target_header = target_block.header().clone();
1596		// Warp proof is complete.
1597		let header_for_verify = target_header.clone();
1598		verifier.expect_verify().return_once(move |_proof| {
1599			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1600		});
1601		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1602		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1603		let mut warp_sync =
1604			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1605
1606		// Make sure we have enough peers to make a request.
1607		for best_number in 1..11 {
1608			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1609		}
1610
1611		// Manually set `TargetBlock` phase.
1612		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1613
1614		// First target block request is made.
1615		assert!(warp_sync.target_block_request().is_some());
1616		// No parallel request is made.
1617		assert!(warp_sync.target_block_request().is_none());
1618	}
1619
1620	#[test]
1621	fn target_block_response_with_no_blocks_drops_peer() {
1622		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1623		let mut provider = MockWarpSyncProvider::<Block>::new();
1624		let mut verifier = MockVerifier::<Block>::new();
1625		let header_for_ctx = client.info().genesis_hash;
1626		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1627		let target_block = BlockBuilderBuilder::new(&*client)
1628			.on_parent_block(client.chain_info().best_hash)
1629			.with_parent_block_number(client.chain_info().best_number)
1630			.build()
1631			.unwrap()
1632			.build()
1633			.unwrap()
1634			.block;
1635		let target_header = target_block.header().clone();
1636		// Warp proof is complete.
1637		let header_for_verify = target_header.clone();
1638		verifier.expect_verify().return_once(move |_proof| {
1639			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1640		});
1641		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1642		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1643		let mut warp_sync =
1644			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1645
1646		// Make sure we have enough peers to make a request.
1647		for best_number in 1..11 {
1648			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1649		}
1650
1651		// Manually set `TargetBlock` phase.
1652		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1653
1654		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1655
1656		// Empty block response received.
1657		let response = Vec::new();
1658		// Peer is dropped.
1659		assert!(matches!(
1660			warp_sync.on_block_response_inner(peer_id, request, response),
1661			Err(BadPeer(id, _rep)) if id == peer_id,
1662		));
1663	}
1664
1665	#[test]
1666	fn target_block_response_with_extra_blocks_drops_peer() {
1667		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1668		let mut provider = MockWarpSyncProvider::<Block>::new();
1669		let mut verifier = MockVerifier::<Block>::new();
1670		let header_for_ctx = client.info().genesis_hash;
1671		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1672		let target_block = BlockBuilderBuilder::new(&*client)
1673			.on_parent_block(client.chain_info().best_hash)
1674			.with_parent_block_number(client.chain_info().best_number)
1675			.build()
1676			.unwrap()
1677			.build()
1678			.unwrap()
1679			.block;
1680
1681		let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
1682			.on_parent_block(client.chain_info().best_hash)
1683			.with_parent_block_number(client.chain_info().best_number)
1684			.build()
1685			.unwrap();
1686		extra_block_builder
1687			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1688			.unwrap();
1689		let extra_block = extra_block_builder.build().unwrap().block;
1690
1691		let target_header = target_block.header().clone();
1692		// Warp proof is complete.
1693		let header_for_verify = target_header.clone();
1694		verifier.expect_verify().return_once(move |_proof| {
1695			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1696		});
1697		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1698		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1699		let mut warp_sync =
1700			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1701
1702		// Make sure we have enough peers to make a request.
1703		for best_number in 1..11 {
1704			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1705		}
1706
1707		// Manually set `TargetBlock` phase.
1708		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1709
1710		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1711
1712		// Block response with extra blocks received.
1713		let response = vec![
1714			BlockData::<Block> {
1715				hash: target_block.header().hash(),
1716				header: Some(target_block.header().clone()),
1717				body: Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1718				indexed_body: None,
1719				receipt: None,
1720				message_queue: None,
1721				justification: None,
1722				justifications: None,
1723			},
1724			BlockData::<Block> {
1725				hash: extra_block.header().hash(),
1726				header: Some(extra_block.header().clone()),
1727				body: Some(extra_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1728				indexed_body: None,
1729				receipt: None,
1730				message_queue: None,
1731				justification: None,
1732				justifications: None,
1733			},
1734		];
1735		// Peer is dropped.
1736		assert!(matches!(
1737			warp_sync.on_block_response_inner(peer_id, request, response),
1738			Err(BadPeer(id, _rep)) if id == peer_id,
1739		));
1740	}
1741
/// A target-block response that carries a block other than the one stored in the
/// `TargetBlock` phase must be answered with `BadPeer` for the responding peer.
#[test]
fn target_block_response_with_wrong_block_drops_peer() {
	sp_tracing::try_init_simple();

	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

	// Mocked verifier: every proof-context query yields the genesis hash.
	let genesis_hash = client.info().genesis_hash;
	let mut proof_verifier = MockVerifier::<Block>::new();
	proof_verifier.expect_next_proof_context().returning(move || genesis_hash);

	// Both blocks below are built on top of the current best block.
	let chain_tip = client.chain_info();

	// The block that warp sync is going to request.
	let expected_block = BlockBuilderBuilder::new(&*client)
		.on_parent_block(chain_tip.best_hash)
		.with_parent_block_number(chain_tip.best_number)
		.build()
		.unwrap()
		.build()
		.unwrap()
		.block;

	// A sibling block with an extra storage change pushed into it; this is the block the
	// peer will answer with instead of the requested one.
	let unexpected_block = {
		let mut builder = BlockBuilderBuilder::new(&*client)
			.on_parent_block(chain_tip.best_hash)
			.with_parent_block_number(chain_tip.best_number)
			.build()
			.unwrap();
		builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
		builder.build().unwrap().block
	};

	// Warp proof is complete: verification reports the expected block's header.
	let verified_header = expected_block.header().clone();
	proof_verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Complete(verified_header, Default::default()))
	});

	let mut warp_provider = MockWarpSyncProvider::<Block>::new();
	warp_provider.expect_create_verifier().return_once(move || Box::new(proof_verifier));

	let mut warp_sync = WarpSync::new(
		client,
		WarpSyncConfig::WithProvider(Arc::new(warp_provider)),
		None,
		Arc::new(MockBlockDownloader::new()),
		None,
	);

	// Make sure we have enough peers to make a request.
	for peer_best in 1..=10 {
		warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
	}

	// Manually set `TargetBlock` phase.
	warp_sync.phase = Phase::TargetBlock(expected_block.header().clone());

	let (peer_id, request) = warp_sync.target_block_request().unwrap();

	// Wrong block received.
	let unexpected_header = unexpected_block.header();
	let response = vec![BlockData::<Block> {
		hash: unexpected_header.hash(),
		header: Some(unexpected_header.clone()),
		body: Some(unexpected_block.extrinsics().to_vec()),
		indexed_body: None,
		receipt: None,
		message_queue: None,
		justification: None,
		justifications: None,
	}];

	// Peer is dropped.
	let outcome = warp_sync.on_block_response_inner(peer_id, request, response);
	assert!(matches!(outcome, Err(BadPeer(id, _rep)) if id == peer_id));
}
1808
/// A target-block response that matches the header stored in the `TargetBlock` phase is
/// accepted; the strategy then emits a single `Finished` action and its result exposes the
/// target header, body and justifications taken from that response.
#[test]
fn correct_target_block_response_sets_strategy_result() {
	let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());

	// Mocked verifier: every proof-context query yields the genesis hash.
	let genesis_hash = client.info().genesis_hash;
	let mut proof_verifier = MockVerifier::<Block>::new();
	proof_verifier.expect_next_proof_context().returning(move || genesis_hash);

	// Target block: built on the current best block, with a storage change pushed into it.
	let chain_tip = client.chain_info();
	let target_block = {
		let mut builder = BlockBuilderBuilder::new(&*client)
			.on_parent_block(chain_tip.best_hash)
			.with_parent_block_number(chain_tip.best_number)
			.build()
			.unwrap();
		builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
		builder.build().unwrap().block
	};

	// Warp proof is complete: verification reports the target header.
	let verified_header = target_block.header().clone();
	proof_verifier.expect_verify().return_once(move |_proof| {
		Ok(VerificationResult::Complete(verified_header, Default::default()))
	});

	let mut warp_provider = MockWarpSyncProvider::<Block>::new();
	warp_provider.expect_create_verifier().return_once(move || Box::new(proof_verifier));

	let mut warp_sync = WarpSync::new(
		client,
		WarpSyncConfig::WithProvider(Arc::new(warp_provider)),
		None,
		Arc::new(MockBlockDownloader::new()),
		None,
	);

	// Make sure we have enough peers to make a request.
	for peer_best in 1..=10 {
		warp_sync.add_peer(PeerId::random(), Hash::random(), peer_best);
	}

	// Manually set `TargetBlock` phase.
	warp_sync.phase = Phase::TargetBlock(target_block.header().clone());

	let (peer_id, request) = warp_sync.target_block_request().unwrap();

	// Correct block received.
	let expected_body = Some(target_block.extrinsics().to_vec());
	let expected_justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
	let response = vec![BlockData::<Block> {
		hash: target_block.header().hash(),
		header: Some(target_block.header().clone()),
		body: expected_body.clone(),
		indexed_body: None,
		receipt: None,
		message_queue: None,
		justification: None,
		justifications: expected_justifications.clone(),
	}];

	let outcome = warp_sync.on_block_response_inner(peer_id, request, response);
	assert!(outcome.is_ok());

	let network_provider = NetworkServiceProvider::new();
	let network_handle = network_provider.handle();

	// Strategy finishes.
	let actions: Vec<_> = warp_sync.actions(&network_handle).collect();
	assert_eq!(actions.len(), 1);
	assert!(matches!(actions[0], SyncingAction::Finished));

	// With correct result.
	let result = warp_sync.take_result().unwrap();
	assert_eq!(result.target_header, *target_block.header());
	assert_eq!(result.target_body, expected_body);
	assert_eq!(result.target_justifications, expected_justifications);
}
1876}