Skip to main content

soil_network/sync/strategy/
warp.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Warp syncing strategy. Bootstraps chain by downloading warp proofs and state.
8
9use soil_client::consensus::BlockOrigin;
10use soil_client::import::IncomingBlock;
11
12use crate::{
13	sync::{
14		block_relay_protocol::{BlockDownloader, BlockResponseError},
15		service::network::NetworkServiceHandle,
16		strategy::{
17			chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
18			SyncingAction,
19		},
20		types::{BadPeer, SyncState, SyncStatus},
21	},
22	LOG_TARGET,
23};
24use codec::{Decode, Encode};
25use futures::{channel::oneshot, FutureExt};
26use log::{debug, error, trace, warn};
27use soil_client::blockchain::HeaderBackend;
28use soil_network::common::sync::message::{
29	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
30};
31use soil_network::types::PeerId;
32use soil_network::{IfDisconnected, ProtocolName};
33use std::{any::Any, collections::HashMap, fmt, sync::Arc};
34use subsoil::runtime::{
35	traits::{Block as BlockT, Header, NumberFor, Zero},
36	Justifications, SaturatedConversion,
37};
38
/// Number of peers that need to be connected before warp sync is started.
///
/// This is the fallback used by [`WarpSync::new`] when the caller does not pass an explicit
/// minimum.
const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
41
/// Scale-encoded warp sync proof response.
///
/// Newtype around the raw proof bytes; the bytes are passed to [`Verifier::verify`] without
/// being decoded here.
pub struct EncodedProof(pub Vec<u8>);
44
/// Warp sync request.
///
/// Encoded and sent to a peer to ask for a warp proof.
#[derive(Encode, Decode, Debug, Clone)]
pub struct WarpProofRequest<B: BlockT> {
	/// Start collecting proofs from this block.
	pub begin: B::Hash,
}
51
/// Verifier for warp sync proofs. Each verifier operates in a specific context.
///
/// A verifier is obtained from [`WarpSyncProvider::create_verifier`] and is handed every proof
/// received while warp proofs are being downloaded.
pub trait Verifier<Block: BlockT>: Send + Sync {
	/// Verify a warp sync proof.
	///
	/// On success tells whether the target has been reached (see [`VerificationResult`]);
	/// an error means the proof was rejected.
	fn verify(
		&mut self,
		proof: &EncodedProof,
	) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
	/// Hash to be used as the starting point for the next proof request.
	fn next_proof_context(&self) -> Block::Hash;
	/// Get status text for progress reporting.
	fn status(&self) -> Option<String>;
}
64
/// Proof verification result.
///
/// Both variants carry the headers (with their justifications) extracted from the proof; these
/// are forwarded for import by the warp sync state machine.
pub enum VerificationResult<Block: BlockT> {
	/// Proof is valid, but the target was not reached.
	Partial(Vec<(Block::Header, Justifications)>),
	/// Target finality is proved. The first field is the header of the target block.
	Complete(Block::Header, Vec<(Block::Header, Justifications)>),
}
72
/// Warp sync backend. Handles retrieving and verifying warp sync proofs.
pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
	/// Generate proof starting at given block hash. The proof is accumulated until maximum proof
	/// size is reached.
	fn generate(
		&self,
		start: Block::Hash,
	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
	/// Create a verifier for warp sync proofs.
	///
	/// Called once by [`WarpSync`] at the moment enough peers are connected to start
	/// downloading proofs.
	fn create_verifier(&self) -> Box<dyn Verifier<Block>>;
}
84
// Reputation changes applied to peers that misbehave during warp sync.
mod rep {
	use soil_network::ReputationChange as Rep;

	/// Unexpected response received from a peer.
	pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");

	/// Peer provided invalid warp proof data.
	pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");

	/// Peer did not provide us with advertised block data.
	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

	/// Reputation change for peers which send us non-requested block data.
	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

	/// Reputation change for peers which send us a block which we fail to verify.
	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

	/// We received a message that failed to decode.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
106
/// Reported warp sync phase.
///
/// This is the externally visible phase carried by [`WarpSyncProgress`]; its `Display`
/// implementation renders a short human-readable message.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase<Block: BlockT> {
	/// Waiting for peers to connect. `required_peers` is the number of peers needed before
	/// warp sync starts.
	AwaitingPeers { required_peers: usize },
	/// Downloading and verifying warp proofs.
	DownloadingWarpProofs,
	/// Downloading target block.
	DownloadingTargetBlock,
	/// Downloading state data.
	DownloadingState,
	/// Importing state.
	ImportingState,
	/// Downloading block history.
	DownloadingBlocks(NumberFor<Block>),
	/// Warp sync is complete.
	Complete,
}
125
126impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
127	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128		match self {
129			Self::AwaitingPeers { required_peers } => {
130				write!(f, "Waiting for {required_peers} peers to be connected")
131			},
132			Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
133			Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
134			Self::DownloadingState => write!(f, "Downloading state"),
135			Self::ImportingState => write!(f, "Importing state"),
136			Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
137			Self::Complete => write!(f, "Warp sync is complete"),
138		}
139	}
140}
141
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress<Block: BlockT> {
	/// Phase warp sync is currently in.
	pub phase: WarpSyncPhase<Block>,
	/// Total bytes downloaded so far.
	pub total_bytes: u64,
	/// Optional status text from the verifier.
	pub status: Option<String>,
}
152
/// Warp sync configuration as accepted by [`WarpSync`].
pub enum WarpSyncConfig<Block: BlockT> {
	/// Standard warp sync for the chain.
	///
	/// Warp proofs are downloaded from peers and checked with a verifier created by the
	/// provider.
	WithProvider(Arc<dyn WarpSyncProvider<Block>>),
	/// Skip downloading proofs and use provided header of the state that should be downloaded.
	///
	/// It is expected that the header provider ensures that the header is trusted.
	WithTarget(<Block as BlockT>::Header),
}
162
/// Warp sync phase used by warp sync state machine.
///
/// Internal counterpart of [`WarpSyncPhase`]; unlike the reported phase it owns the data needed
/// to drive each step.
enum Phase<B: BlockT> {
	/// Waiting for enough peers to connect. The provider is kept to create the verifier once
	/// warp sync starts.
	WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
	/// Downloading warp proofs, checking each of them with `verifier`.
	WarpProof { verifier: Box<dyn Verifier<B>> },
	/// Downloading target block. Holds the header of the target block.
	TargetBlock(B::Header),
	/// Warp sync is complete.
	Complete,
}
174
/// Request state of a peer as tracked by warp sync.
enum PeerState {
	/// No request is assigned to the peer.
	Available,
	/// The peer has been picked for a warp proof request.
	DownloadingProofs,
	/// The peer has been picked for the target block request.
	DownloadingTargetBlock,
}
180
181impl PeerState {
182	fn is_available(&self) -> bool {
183		matches!(self, PeerState::Available)
184	}
185}
186
/// Per-peer bookkeeping kept by warp sync.
struct Peer<B: BlockT> {
	/// Best block number reported for the peer (on connect or via a best-block announcement).
	best_number: NumberFor<B>,
	/// Whether a request is currently assigned to the peer, and which kind.
	state: PeerState,
}
191
/// Outcome of a successful warp sync, available through [`WarpSync::take_result`].
pub struct WarpSyncResult<B: BlockT> {
	/// Header of the target block.
	pub target_header: B::Header,
	/// Body of the target block as downloaded from a peer.
	pub target_body: Option<Vec<B::Extrinsic>>,
	/// Justifications of the target block, if the peer provided any.
	pub target_justifications: Option<Justifications>,
}
197
/// Warp sync state machine. Accumulates warp proofs and state.
pub struct WarpSync<B: BlockT> {
	/// Current phase of the state machine.
	phase: Phase<B>,
	/// Accumulated size of the warp proofs that passed verification.
	total_proof_bytes: u64,
	/// Added to the reported total once warp sync is complete.
	// NOTE(review): nothing in this part of the file updates this counter — confirm whether
	// it is still needed.
	total_state_bytes: u64,
	/// Connected peers known to warp sync together with their request state.
	peers: HashMap<PeerId, Peer<B>>,
	/// Bookkeeping for peers that disconnected while a request was assigned to them.
	disconnected_peers: DisconnectedPeers,
	/// Protocol used for warp proof requests. Without it no warp proof request is produced.
	protocol_name: Option<ProtocolName>,
	/// Downloader used to fetch the target block.
	block_downloader: Arc<dyn BlockDownloader<B>>,
	/// Actions queued for the owner; drained by [`WarpSync::actions`].
	actions: Vec<SyncingAction<B>>,
	/// Result of the sync, set once the target block has been downloaded.
	result: Option<WarpSyncResult<B>>,
	/// Number of peers that need to be connected before warp sync is started.
	min_peers_to_start_warp_sync: usize,
}
212
213impl<B> WarpSync<B>
214where
215	B: BlockT,
216{
	/// Strategy key used by warp sync.
	///
	/// Attached to every request started by this strategy.
	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
219
220	/// Create a new instance. When passing a warp sync provider we will be checking for proof and
221	/// authorities. Alternatively we can pass a target block when we want to skip downloading
222	/// proofs, in this case we will continue polling until the target block is known.
223	pub fn new<Client>(
224		client: Arc<Client>,
225		warp_sync_config: WarpSyncConfig<B>,
226		protocol_name: Option<ProtocolName>,
227		block_downloader: Arc<dyn BlockDownloader<B>>,
228		min_peers_to_start_warp_sync: Option<usize>,
229	) -> Self
230	where
231		Client: HeaderBackend<B> + 'static,
232	{
233		let min_peers_to_start_warp_sync =
234			min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
235		if client.info().finalized_state.is_some() {
236			error!(
237				target: LOG_TARGET,
238				"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
239			);
240			return Self {
241				phase: Phase::Complete,
242				total_proof_bytes: 0,
243				total_state_bytes: 0,
244				peers: HashMap::new(),
245				disconnected_peers: DisconnectedPeers::new(),
246				protocol_name,
247				block_downloader,
248				actions: vec![SyncingAction::Finished],
249				result: None,
250				min_peers_to_start_warp_sync,
251			};
252		}
253
254		let phase = match warp_sync_config {
255			WarpSyncConfig::WithProvider(warp_sync_provider) => {
256				Phase::WaitingForPeers { warp_sync_provider }
257			},
258			WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
259		};
260
261		Self {
262			phase,
263			total_proof_bytes: 0,
264			total_state_bytes: 0,
265			peers: HashMap::new(),
266			disconnected_peers: DisconnectedPeers::new(),
267			protocol_name,
268			block_downloader,
269			actions: Vec::new(),
270			result: None,
271			min_peers_to_start_warp_sync,
272		}
273	}
274
275	/// Notify that a new peer has connected.
276	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
277		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
278
279		self.try_to_start_warp_sync();
280	}
281
282	/// Notify that a peer has disconnected.
283	pub fn remove_peer(&mut self, peer_id: &PeerId) {
284		if let Some(state) = self.peers.remove(peer_id) {
285			if !state.state.is_available() {
286				if let Some(bad_peer) =
287					self.disconnected_peers.on_disconnect_during_request(*peer_id)
288				{
289					self.actions.push(SyncingAction::DropPeer(bad_peer));
290				}
291			}
292		}
293	}
294
295	/// Submit a validated block announcement.
296	///
297	/// Returns new best hash & best number of the peer if they are updated.
298	#[must_use]
299	pub fn on_validated_block_announce(
300		&mut self,
301		is_best: bool,
302		peer_id: PeerId,
303		announce: &BlockAnnounce<B::Header>,
304	) -> Option<(B::Hash, NumberFor<B>)> {
305		is_best.then(|| {
306			let best_number = *announce.header.number();
307			let best_hash = announce.header.hash();
308			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
309				peer.best_number = best_number;
310			}
311			// Let `SyncingEngine` know that we should update the peer info.
312			(best_hash, best_number)
313		})
314	}
315
316	/// Start warp sync as soon as we have enough peers.
317	fn try_to_start_warp_sync(&mut self) {
318		let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
319
320		if self.peers.len() < self.min_peers_to_start_warp_sync {
321			return;
322		}
323
324		let verifier = warp_sync_provider.create_verifier();
325		self.phase = Phase::WarpProof { verifier };
326		debug!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
327	}
328
329	pub fn on_generic_response(
330		&mut self,
331		peer_id: &PeerId,
332		protocol_name: ProtocolName,
333		response: Box<dyn Any + Send>,
334	) {
335		if &protocol_name == self.block_downloader.protocol_name() {
336			let Ok(response) = response
337				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
338			else {
339				warn!(target: LOG_TARGET, "Failed to downcast block response");
340				debug_assert!(false);
341				return;
342			};
343
344			let (request, response) = *response;
345			let blocks = match response {
346				Ok(blocks) => blocks,
347				Err(BlockResponseError::DecodeFailed(e)) => {
348					debug!(
349						target: LOG_TARGET,
350						"Failed to decode block response from peer {:?}: {:?}.",
351						peer_id,
352						e
353					);
354					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
355					return;
356				},
357				Err(BlockResponseError::ExtractionFailed(e)) => {
358					debug!(
359						target: LOG_TARGET,
360						"Failed to extract blocks from peer response {:?}: {:?}.",
361						peer_id,
362						e
363					);
364					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
365					return;
366				},
367			};
368
369			self.on_block_response(*peer_id, request, blocks);
370		} else {
371			let Ok(response) = response.downcast::<Vec<u8>>() else {
372				warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
373				debug_assert!(false);
374				return;
375			};
376
377			self.on_warp_proof_response(peer_id, EncodedProof(*response));
378		}
379	}
380
381	/// Process warp proof response.
382	pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
383		if let Some(peer) = self.peers.get_mut(peer_id) {
384			peer.state = PeerState::Available;
385		}
386
387		let Phase::WarpProof { verifier } = &mut self.phase else {
388			debug!(target: LOG_TARGET, "Unexpected warp proof response");
389			self.actions
390				.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
391			return;
392		};
393
394		let proof_to_incoming_block =
395			|(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
396				IncomingBlock {
397					hash: header.hash(),
398					header: Some(header),
399					body: None,
400					indexed_body: None,
401					justifications: Some(justifications),
402					origin: Some((*peer_id).into()),
403					// We are still in warp sync, so we don't have the state. This means
404					// we also can't execute the block.
405					allow_missing_state: true,
406					skip_execution: true,
407					// Shouldn't already exist in the database.
408					import_existing: false,
409					state: None,
410				}
411			};
412
413		match verifier.verify(&response) {
414			Err(e) => {
415				debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
416				self.actions
417					.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
418			},
419			Ok(VerificationResult::Partial(proofs)) => {
420				debug!(target: LOG_TARGET, "Verified partial proof");
421				self.total_proof_bytes += response.0.len() as u64;
422				self.actions.push(SyncingAction::ImportBlocks {
423					origin: BlockOrigin::WarpSync,
424					blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
425				});
426			},
427			Ok(VerificationResult::Complete(header, proofs)) => {
428				debug!(
429					target: LOG_TARGET,
430					"Verified complete proof. Continuing with target block download: {} ({}).",
431					header.hash(),
432					header.number(),
433				);
434				self.total_proof_bytes += response.0.len() as u64;
435				self.phase = Phase::TargetBlock(header.clone());
436				let incoming_blocks: Vec<_> = proofs
437					.into_iter()
438					.map(proof_to_incoming_block)
439					.filter(|i| {
440						// We need target block with state and warp sync does not provide this.
441						// That's why we filter out target block here, otherwise oncoming state sync
442						// (which comes after warp sync) will abort because target block is already
443						// imported.
444						if header.number() != i.header.as_ref().unwrap().number() {
445							true
446						} else {
447							log::trace!(
448								target: LOG_TARGET,
449								"Filtered out target block: {} ({})",
450								header.hash(),
451								header.number()
452							);
453							false
454						}
455					})
456					.collect();
457				self.actions.push(SyncingAction::ImportBlocks {
458					origin: BlockOrigin::WarpSync,
459					blocks: incoming_blocks,
460				});
461			},
462		}
463	}
464
465	/// Process (target) block response.
466	pub fn on_block_response(
467		&mut self,
468		peer_id: PeerId,
469		request: BlockRequest<B>,
470		blocks: Vec<BlockData<B>>,
471	) {
472		if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
473			self.actions.push(SyncingAction::DropPeer(bad_peer));
474		}
475	}
476
477	fn on_block_response_inner(
478		&mut self,
479		peer_id: PeerId,
480		request: BlockRequest<B>,
481		mut blocks: Vec<BlockData<B>>,
482	) -> Result<(), BadPeer> {
483		if let Some(peer) = self.peers.get_mut(&peer_id) {
484			peer.state = PeerState::Available;
485		}
486
487		let Phase::TargetBlock(header) = &mut self.phase else {
488			debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
489			return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE));
490		};
491
492		if blocks.is_empty() {
493			debug!(
494				target: LOG_TARGET,
495				"Downloading target block failed: empty block response from {peer_id}",
496			);
497			return Err(BadPeer(peer_id, rep::NO_BLOCK));
498		}
499
500		if blocks.len() > 1 {
501			debug!(
502				target: LOG_TARGET,
503				"Too many blocks ({}) in warp target block response from {peer_id}",
504				blocks.len(),
505			);
506			return Err(BadPeer(peer_id, rep::NOT_REQUESTED));
507		}
508
509		validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
510
511		let block = blocks.pop().expect("`blocks` len checked above; qed");
512
513		let Some(block_header) = &block.header else {
514			debug!(
515				target: LOG_TARGET,
516				"Downloading target block failed: missing header in response from {peer_id}.",
517			);
518			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
519		};
520
521		if block_header != header {
522			debug!(
523				target: LOG_TARGET,
524				"Downloading target block failed: different header in response from {peer_id}.",
525			);
526			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
527		}
528
529		if block.body.is_none() {
530			debug!(
531				target: LOG_TARGET,
532				"Downloading target block failed: missing body in response from {peer_id}.",
533			);
534			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
535		}
536
537		self.result = Some(WarpSyncResult {
538			target_header: header.clone(),
539			target_body: block.body,
540			target_justifications: block.justifications,
541		});
542		self.phase = Phase::Complete;
543		self.actions.push(SyncingAction::Finished);
544		Ok(())
545	}
546
547	/// Reserve a peer for a request assigning `new_state`.
548	fn schedule_next_peer(
549		&mut self,
550		new_state: PeerState,
551		min_best_number: Option<NumberFor<B>>,
552	) -> Option<PeerId> {
553		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
554		if targets.is_empty() {
555			return None;
556		}
557		targets.sort();
558		let median = targets[targets.len() / 2];
559		let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
560		// Find a random peer that is synced as much as peer majority and is above
561		// `min_best_number`.
562		for (peer_id, peer) in self.peers.iter_mut() {
563			if peer.state.is_available()
564				&& peer.best_number >= threshold
565				&& self.disconnected_peers.is_peer_available(peer_id)
566			{
567				peer.state = new_state;
568				return Some(*peer_id);
569			}
570		}
571		None
572	}
573
574	/// Produce warp proof request.
575	fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
576		let Phase::WarpProof { verifier } = &self.phase else { return None };
577
578		// Copy verifier context early to cut the borrowing tie.
579		let begin = verifier.next_proof_context();
580
581		if self
582			.peers
583			.values()
584			.any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
585		{
586			// Only one warp proof request at a time is possible.
587			return None;
588		}
589
590		let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
591		trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
592
593		let request = WarpProofRequest { begin };
594
595		let Some(protocol_name) = self.protocol_name.clone() else {
596			warn!(
597				target: LOG_TARGET,
598				"Trying to send warp sync request when no protocol is configured {request:?}",
599			);
600			return None;
601		};
602
603		Some((peer_id, protocol_name, request))
604	}
605
606	/// Produce target block request.
607	fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
608		let Phase::TargetBlock(target_header) = &self.phase else { return None };
609
610		if self
611			.peers
612			.values()
613			.any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
614		{
615			// Only one target block request at a time is possible.
616			return None;
617		}
618
619		// Cut the borrowing tie.
620		let target_hash = target_header.hash();
621		let target_number = *target_header.number();
622
623		let peer_id =
624			self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
625
626		trace!(
627			target: LOG_TARGET,
628			"New target block request to {peer_id}, target: {} ({}).",
629			target_hash,
630			target_number,
631		);
632
633		Some((
634			peer_id,
635			BlockRequest::<B> {
636				id: 0,
637				fields: BlockAttributes::HEADER
638					| BlockAttributes::BODY
639					| BlockAttributes::JUSTIFICATION,
640				from: FromBlock::Hash(target_hash),
641				direction: Direction::Ascending,
642				max: Some(1),
643			},
644		))
645	}
646
647	/// Returns warp sync estimated progress (stage, bytes received).
648	pub fn progress(&self) -> WarpSyncProgress<B> {
649		match &self.phase {
650			Phase::WaitingForPeers { .. } => WarpSyncProgress {
651				phase: WarpSyncPhase::AwaitingPeers {
652					required_peers: self.min_peers_to_start_warp_sync,
653				},
654				total_bytes: self.total_proof_bytes,
655				status: None,
656			},
657			Phase::WarpProof { verifier } => WarpSyncProgress {
658				phase: WarpSyncPhase::DownloadingWarpProofs,
659				total_bytes: self.total_proof_bytes,
660				status: verifier.status(),
661			},
662			Phase::TargetBlock(_) => WarpSyncProgress {
663				phase: WarpSyncPhase::DownloadingTargetBlock,
664				total_bytes: self.total_proof_bytes,
665				status: None,
666			},
667			Phase::Complete => WarpSyncProgress {
668				phase: WarpSyncPhase::Complete,
669				total_bytes: self.total_proof_bytes + self.total_state_bytes,
670				status: None,
671			},
672		}
673	}
674
	/// Get the number of peers known to warp sync.
	///
	/// Counts every registered peer, whether or not a request is assigned to it.
	pub fn num_peers(&self) -> usize {
		self.peers.len()
	}
679
680	/// Returns the current sync status.
681	pub fn status(&self) -> SyncStatus<B> {
682		SyncStatus {
683			state: match &self.phase {
684				Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
685				Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
686				Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
687				Phase::Complete => SyncState::Idle,
688			},
689			best_seen_block: match &self.phase {
690				Phase::WaitingForPeers { .. } => None,
691				Phase::WarpProof { .. } => None,
692				Phase::TargetBlock(header) => Some(*header.number()),
693				Phase::Complete => None,
694			},
695			num_peers: self.peers.len().saturated_into(),
696			queued_blocks: 0,
697			state_sync: None,
698			warp_sync: Some(self.progress()),
699		}
700	}
701
702	/// Get actions that should be performed by the owner on [`WarpSync`]'s behalf
703	#[must_use]
704	pub fn actions(
705		&mut self,
706		network_service: &NetworkServiceHandle,
707	) -> impl Iterator<Item = SyncingAction<B>> {
708		let warp_proof_request =
709			self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
710				trace!(
711					target: LOG_TARGET,
712					"Created `WarpProofRequest` to {}, request: {:?}.",
713					peer_id,
714					request,
715				);
716
717				let (tx, rx) = oneshot::channel();
718
719				network_service.start_request(
720					peer_id,
721					protocol_name,
722					request.encode(),
723					tx,
724					IfDisconnected::ImmediateError,
725				);
726
727				SyncingAction::StartRequest {
728					peer_id,
729					key: Self::STRATEGY_KEY,
730					request: async move {
731						Ok(rx.await?.and_then(|(response, protocol_name)| {
732							Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
733						}))
734					}
735					.boxed(),
736					remove_obsolete: false,
737				}
738			});
739		self.actions.extend(warp_proof_request);
740
741		let target_block_request =
742			self.target_block_request().into_iter().map(|(peer_id, request)| {
743				let downloader = self.block_downloader.clone();
744
745				SyncingAction::StartRequest {
746					peer_id,
747					key: Self::STRATEGY_KEY,
748					request: async move {
749						Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
750							|(response, protocol_name)| {
751								let decoded_response =
752									downloader.block_response_into_blocks(&request, response);
753								let result =
754									Box::new((request, decoded_response)) as Box<dyn Any + Send>;
755								Ok((result, protocol_name))
756							},
757						))
758					}
759					.boxed(),
760					// Sending block request implies dropping obsolete pending response as we are
761					// not interested in it anymore.
762					remove_obsolete: true,
763				}
764			});
765		self.actions.extend(target_block_request);
766
767		std::mem::take(&mut self.actions).into_iter()
768	}
769
	/// Take the result of finished warp sync, returning `None` if the sync was unsuccessful.
	///
	/// The result is moved out, so subsequent calls return `None`.
	#[must_use]
	pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
		self.result.take()
	}
775}
776
777#[cfg(test)]
778mod test {
779	use super::*;
780	use crate::sync::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
781	use soil_client::block_builder::BlockBuilderBuilder;
782	use soil_client::blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
783	use std::{io::ErrorKind, sync::Arc};
784	use subsoil::core::H256;
785	use subsoil::runtime::{
786		traits::{Block as BlockT, Header as HeaderT, NumberFor},
787		ConsensusEngineId,
788	};
789	use soil_test_node_runtime_client::{
790		runtime::{Block, Hash},
791		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
792	};
793
	/// Consensus engine id for use in tests.
	// NOTE(review): not referenced in the part of the module visible here — presumably used
	// by tests further down; confirm.
	pub const TEST_ENGINE_ID: ConsensusEngineId = *b"TEST";
795
	// Mock of the client (generated as `MockClient`). Only the `HeaderBackend` methods are
	// mocked; the helpers below set an expectation for `info()` only.
	mockall::mock! {
		pub Client<B: BlockT> {}

		impl<B: BlockT> HeaderBackend<B> for Client<B> {
			fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
			fn info(&self) -> Info<B>;
			fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
			fn number(
				&self,
				hash: B::Hash,
			) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
			fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
		}
	}
810
	// Mock of the warp sync provider (generated as `MockWarpSyncProvider`), used to hand a
	// mocked verifier to `WarpSync`.
	mockall::mock! {
		pub WarpSyncProvider<B: BlockT> {}

		impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
			fn generate(
				&self,
				start: B::Hash,
			) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
			fn create_verifier(&self) -> Box<dyn super::Verifier<B>>;
		}
	}
822
	// Mock of the warp proof verifier (generated as `MockVerifier`).
	mockall::mock! {
		pub Verifier<B: BlockT> {}

		impl<B: BlockT> super::Verifier<B> for Verifier<B> {
			fn verify(
				&mut self,
				proof: &EncodedProof,
			) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
			fn next_proof_context(&self) -> B::Hash;
			fn status(&self) -> Option<String>;
		}
	}
835
836	fn mock_client_with_state() -> MockClient<Block> {
837		let mut client = MockClient::<Block>::new();
838		let genesis_hash = Hash::random();
839		client.expect_info().return_once(move || Info {
840			best_hash: genesis_hash,
841			best_number: 0,
842			genesis_hash,
843			finalized_hash: genesis_hash,
844			finalized_number: 0,
845			// We need some finalized state to render warp sync impossible.
846			finalized_state: Some((genesis_hash, 0)),
847			number_leaves: 0,
848			block_gap: None,
849		});
850
851		client
852	}
853
854	fn mock_client_without_state() -> MockClient<Block> {
855		let mut client = MockClient::<Block>::new();
856		let genesis_hash = Hash::random();
857		client.expect_info().returning(move || Info {
858			best_hash: genesis_hash,
859			best_number: 0,
860			genesis_hash,
861			finalized_hash: genesis_hash,
862			finalized_number: 0,
863			finalized_state: None,
864			number_leaves: 0,
865			block_gap: None,
866		});
867
868		client
869	}
870
871	#[test]
872	fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
873		let client = mock_client_with_state();
874		let provider = MockWarpSyncProvider::<Block>::new();
875		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
876		let mut warp_sync = WarpSync::new(
877			Arc::new(client),
878			config,
879			None,
880			Arc::new(MockBlockDownloader::new()),
881			None,
882		);
883
884		let network_provider = NetworkServiceProvider::new();
885		let network_handle = network_provider.handle();
886
887		// Warp sync instantly finishes
888		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
889		assert_eq!(actions.len(), 1);
890		assert!(matches!(actions[0], SyncingAction::Finished));
891
892		// ... with no result.
893		assert!(warp_sync.take_result().is_none());
894	}
895
896	#[test]
897	fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
898		let client = mock_client_with_state();
899		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
900			1,
901			Default::default(),
902			Default::default(),
903			Default::default(),
904			Default::default(),
905		));
906		let mut warp_sync = WarpSync::new(
907			Arc::new(client),
908			config,
909			None,
910			Arc::new(MockBlockDownloader::new()),
911			None,
912		);
913
914		let network_provider = NetworkServiceProvider::new();
915		let network_handle = network_provider.handle();
916
917		// Warp sync instantly finishes
918		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
919		assert_eq!(actions.len(), 1);
920		assert!(matches!(actions[0], SyncingAction::Finished));
921
922		// ... with no result.
923		assert!(warp_sync.take_result().is_none());
924	}
925
926	#[test]
927	fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
928		let client = mock_client_without_state();
929		let provider = MockWarpSyncProvider::<Block>::new();
930		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
931		let mut warp_sync = WarpSync::new(
932			Arc::new(client),
933			config,
934			None,
935			Arc::new(MockBlockDownloader::new()),
936			None,
937		);
938
939		let network_provider = NetworkServiceProvider::new();
940		let network_handle = network_provider.handle();
941
942		// No actions are emitted.
943		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
944	}
945
946	#[test]
947	fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
948		let client = mock_client_without_state();
949		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
950			1,
951			Default::default(),
952			Default::default(),
953			Default::default(),
954			Default::default(),
955		));
956		let mut warp_sync = WarpSync::new(
957			Arc::new(client),
958			config,
959			None,
960			Arc::new(MockBlockDownloader::new()),
961			None,
962		);
963
964		let network_provider = NetworkServiceProvider::new();
965		let network_handle = network_provider.handle();
966
967		// No actions are emitted.
968		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
969	}
970
971	#[test]
972	fn warp_sync_is_started_only_when_there_is_enough_peers() {
973		let client = mock_client_without_state();
974		let mut provider = MockWarpSyncProvider::<Block>::new();
975		let mut verifier = MockVerifier::<Block>::new();
976		verifier.expect_next_proof_context().returning(|| Hash::random());
977		verifier
978			.expect_verify()
979			.returning(|_| unreachable!("verify should not be called in this test"));
980		provider.expect_create_verifier().return_once(move || Box::new(verifier));
981		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
982		let mut warp_sync = WarpSync::new(
983			Arc::new(client),
984			config,
985			None,
986			Arc::new(MockBlockDownloader::new()),
987			None,
988		);
989
990		// Warp sync is not started when there is not enough peers.
991		for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
992			warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
993			assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
994		}
995
996		// Now we have enough peers and warp sync is started.
997		warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
998		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
999	}
1000
1001	#[test]
1002	fn no_peer_is_scheduled_if_no_peers_connected() {
1003		let client = mock_client_without_state();
1004		let provider = MockWarpSyncProvider::<Block>::new();
1005		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1006		let mut warp_sync = WarpSync::new(
1007			Arc::new(client),
1008			config,
1009			None,
1010			Arc::new(MockBlockDownloader::new()),
1011			None,
1012		);
1013
1014		assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
1015	}
1016
1017	#[test]
1018	fn enough_peers_are_used_in_tests() {
1019		// Tests below use 10 peers. Fail early if it's less than a threshold for warp sync.
1020		assert!(
1021			10 >= MIN_PEERS_TO_START_WARP_SYNC,
1022			"Tests must be updated to use that many initial peers.",
1023		);
1024	}
1025
1026	#[test]
1027	fn at_least_median_synced_peer_is_scheduled() {
1028		for _ in 0..100 {
1029			let client = mock_client_without_state();
1030			let mut provider = MockWarpSyncProvider::<Block>::new();
1031			let mut verifier = MockVerifier::<Block>::new();
1032			verifier.expect_next_proof_context().returning(|| Hash::random());
1033			verifier
1034				.expect_verify()
1035				.returning(|_| unreachable!("verify should not be called in this test"));
1036			provider.expect_create_verifier().return_once(move || Box::new(verifier));
1037			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1038			let mut warp_sync = WarpSync::new(
1039				Arc::new(client),
1040				config,
1041				None,
1042				Arc::new(MockBlockDownloader::new()),
1043				None,
1044			);
1045
1046			for best_number in 1..11 {
1047				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1048			}
1049
1050			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1051			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1052		}
1053	}
1054
1055	#[test]
1056	fn min_best_number_peer_is_scheduled() {
1057		for _ in 0..10 {
1058			let client = mock_client_without_state();
1059			let mut provider = MockWarpSyncProvider::<Block>::new();
1060			let mut verifier = MockVerifier::<Block>::new();
1061			verifier.expect_next_proof_context().returning(|| Hash::random());
1062			verifier
1063				.expect_verify()
1064				.returning(|_| unreachable!("verify should not be called in this test"));
1065			provider.expect_create_verifier().return_once(move || Box::new(verifier));
1066			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1067			let mut warp_sync = WarpSync::new(
1068				Arc::new(client),
1069				config,
1070				None,
1071				Arc::new(MockBlockDownloader::new()),
1072				None,
1073			);
1074
1075			for best_number in 1..11 {
1076				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1077			}
1078
1079			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1080			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1081		}
1082	}
1083
1084	#[test]
1085	fn backedoff_number_peer_is_not_scheduled() {
1086		let client = mock_client_without_state();
1087		let mut provider = MockWarpSyncProvider::<Block>::new();
1088		let mut verifier = MockVerifier::<Block>::new();
1089		verifier.expect_next_proof_context().returning(|| Hash::random());
1090		verifier
1091			.expect_verify()
1092			.returning(|_| unreachable!("verify should not be called in this test"));
1093		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1094		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1095		let mut warp_sync = WarpSync::new(
1096			Arc::new(client),
1097			config,
1098			None,
1099			Arc::new(MockBlockDownloader::new()),
1100			None,
1101		);
1102
1103		for best_number in 1..11 {
1104			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1105		}
1106
1107		let ninth_peer =
1108			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1109		let tenth_peer =
1110			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1111
1112		// Disconnecting a peer without an inflight request has no effect on persistent states.
1113		warp_sync.remove_peer(&tenth_peer);
1114		assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1115
1116		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1117		let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1118		assert_eq!(tenth_peer, peer_id.unwrap());
1119		warp_sync.remove_peer(&tenth_peer);
1120
1121		// Peer is backed off.
1122		assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1123
1124		// No peer available for 10'th best block because of the backoff.
1125		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1126		let peer_id: Option<PeerId> =
1127			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1128		assert!(peer_id.is_none());
1129
1130		// Other requests can still happen.
1131		let peer_id: Option<PeerId> =
1132			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1133		assert_eq!(ninth_peer, peer_id.unwrap());
1134	}
1135
1136	#[test]
1137	fn no_warp_proof_request_in_another_phase() {
1138		let client = mock_client_without_state();
1139		let mut provider = MockWarpSyncProvider::<Block>::new();
1140		let mut verifier = MockVerifier::<Block>::new();
1141		verifier.expect_next_proof_context().returning(|| Hash::random());
1142		verifier
1143			.expect_verify()
1144			.returning(|_| unreachable!("verify should not be called in this test"));
1145		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1146		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1147		let mut warp_sync = WarpSync::new(
1148			Arc::new(client),
1149			config,
1150			Some(ProtocolName::Static("")),
1151			Arc::new(MockBlockDownloader::new()),
1152			None,
1153		);
1154
1155		// Make sure we have enough peers to make a request.
1156		for best_number in 1..11 {
1157			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1158		}
1159
1160		// Manually set to another phase.
1161		warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1162			1,
1163			Default::default(),
1164			Default::default(),
1165			Default::default(),
1166			Default::default(),
1167		));
1168
1169		// No request is made.
1170		assert!(warp_sync.warp_proof_request().is_none());
1171	}
1172
1173	#[test]
1174	fn warp_proof_request_starts_at_last_hash() {
1175		let client = mock_client_without_state();
1176		let mut provider = MockWarpSyncProvider::<Block>::new();
1177		let mut verifier = MockVerifier::<Block>::new();
1178		let known_last_hash = Hash::random();
1179		verifier.expect_next_proof_context().returning(move || known_last_hash);
1180		verifier
1181			.expect_verify()
1182			.returning(|_| unreachable!("verify should not be called in this test"));
1183		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1184		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1185		let mut warp_sync = WarpSync::new(
1186			Arc::new(client),
1187			config,
1188			Some(ProtocolName::Static("")),
1189			Arc::new(MockBlockDownloader::new()),
1190			None,
1191		);
1192
1193		// Make sure we have enough peers to make a request.
1194		for best_number in 1..11 {
1195			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1196		}
1197		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1198
1199		let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
1200		assert_eq!(request.begin, known_last_hash);
1201	}
1202
1203	#[test]
1204	fn no_parallel_warp_proof_requests() {
1205		let client = mock_client_without_state();
1206		let mut provider = MockWarpSyncProvider::<Block>::new();
1207		let mut verifier = MockVerifier::<Block>::new();
1208		verifier.expect_next_proof_context().returning(|| Hash::random());
1209		verifier
1210			.expect_verify()
1211			.returning(|_| unreachable!("verify should not be called in this test"));
1212		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1213		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1214		let mut warp_sync = WarpSync::new(
1215			Arc::new(client),
1216			config,
1217			Some(ProtocolName::Static("")),
1218			Arc::new(MockBlockDownloader::new()),
1219			None,
1220		);
1221
1222		// Make sure we have enough peers to make requests.
1223		for best_number in 1..11 {
1224			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1225		}
1226		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1227
1228		// First request is made.
1229		assert!(warp_sync.warp_proof_request().is_some());
1230		// Second request is not made.
1231		assert!(warp_sync.warp_proof_request().is_none());
1232	}
1233
1234	#[test]
1235	fn bad_warp_proof_response_drops_peer() {
1236		let client = mock_client_without_state();
1237		let mut provider = MockWarpSyncProvider::<Block>::new();
1238		let mut verifier = MockVerifier::<Block>::new();
1239		verifier.expect_next_proof_context().returning(|| Hash::random());
1240		// Warp proof verification fails.
1241		verifier.expect_verify().return_once(|_proof| {
1242			Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
1243		});
1244		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1245		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1246		let mut warp_sync = WarpSync::new(
1247			Arc::new(client),
1248			config,
1249			Some(ProtocolName::Static("")),
1250			Arc::new(MockBlockDownloader::new()),
1251			None,
1252		);
1253
1254		// Make sure we have enough peers to make a request.
1255		for best_number in 1..11 {
1256			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1257		}
1258		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1259
1260		let network_provider = NetworkServiceProvider::new();
1261		let network_handle = network_provider.handle();
1262
1263		// Consume `SendWarpProofRequest` action.
1264		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1265		assert_eq!(actions.len(), 1);
1266		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1267			panic!("Invalid action");
1268		};
1269
1270		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1271
1272		// We only interested in already generated actions, not new requests.
1273		let actions = std::mem::take(&mut warp_sync.actions);
1274		assert_eq!(actions.len(), 1);
1275		assert!(matches!(
1276			actions[0],
1277			SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
1278		));
1279		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1280	}
1281
1282	#[test]
1283	fn partial_warp_proof_doesnt_advance_phase() {
1284		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1285		let mut provider = MockWarpSyncProvider::<Block>::new();
1286		let target_block = BlockBuilderBuilder::new(&*client)
1287			.on_parent_block(client.chain_info().best_hash)
1288			.with_parent_block_number(client.chain_info().best_number)
1289			.build()
1290			.unwrap()
1291			.build()
1292			.unwrap()
1293			.block;
1294		let target_header = target_block.header().clone();
1295		let justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1296		// Warp proof is partial.
1297		let mut verifier = MockVerifier::<Block>::new();
1298		let context = client.info().genesis_hash;
1299		verifier.expect_next_proof_context().returning(move || context);
1300		let header_for_verify = target_header.clone();
1301		let just_for_verify = justifications.clone();
1302		verifier.expect_verify().return_once(move |_proof| {
1303			Ok(VerificationResult::Partial(vec![(
1304				header_for_verify.clone(),
1305				just_for_verify.clone(),
1306			)]))
1307		});
1308		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1309		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1310		let mut warp_sync = WarpSync::new(
1311			client,
1312			config,
1313			Some(ProtocolName::Static("")),
1314			Arc::new(MockBlockDownloader::new()),
1315			None,
1316		);
1317
1318		// Make sure we have enough peers to make a request.
1319		for best_number in 1..11 {
1320			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1321		}
1322		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1323
1324		let network_provider = NetworkServiceProvider::new();
1325		let network_handle = network_provider.handle();
1326
1327		// Consume `SendWarpProofRequest` action.
1328		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1329		assert_eq!(actions.len(), 1);
1330		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1331			panic!("Invalid action");
1332		};
1333
1334		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1335
1336		assert_eq!(warp_sync.actions.len(), 1);
1337		let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1338		else {
1339			panic!("Expected `ImportBlocks` action.");
1340		};
1341		assert_eq!(origin, BlockOrigin::WarpSync);
1342		assert_eq!(blocks.len(), 1);
1343		let import_block = blocks.pop().unwrap();
1344		assert_eq!(
1345			import_block,
1346			IncomingBlock {
1347				hash: target_header.hash(),
1348				header: Some(target_header),
1349				body: None,
1350				indexed_body: None,
1351				justifications: Some(justifications),
1352				origin: Some(request_peer_id.into()),
1353				allow_missing_state: true,
1354				skip_execution: true,
1355				import_existing: false,
1356				state: None,
1357			}
1358		);
1359		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1360	}
1361
1362	#[test]
1363	fn complete_warp_proof_advances_phase() {
1364		// Initialize logging
1365		subsoil::tracing::try_init_simple();
1366
1367		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1368		let mut provider = MockWarpSyncProvider::<Block>::new();
1369
1370		// Building both blocks manually. Can't use BlockBuilderBuilder to build one block on top of
1371		// another, if no genesis is set (required for warp sync to make sure finalized_state is not
1372		// set)
1373		let warp_synced_header = <<Block as BlockT>::Header as HeaderT>::new(
1374			1,
1375			Default::default(),
1376			Default::default(),
1377			client.chain_info().best_hash,
1378			Default::default(),
1379		);
1380
1381		// Build target_block on top of warp_synced_block
1382		let target_header = <<Block as BlockT>::Header as HeaderT>::new(
1383			2,
1384			Default::default(),
1385			Default::default(),
1386			warp_synced_header.hash(),
1387			Default::default(),
1388		);
1389		let warp_justifications = Justifications::new(vec![(TEST_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1390		let target_justifications =
1391			Justifications::new(vec![(TEST_ENGINE_ID, vec![6, 7, 8, 9, 10])]);
1392
1393		// Warp proof is complete and contains both blocks.
1394		let mut verifier = MockVerifier::<Block>::new();
1395		let context = client.info().genesis_hash;
1396		verifier.expect_next_proof_context().returning(move || context);
1397		let warp_synced_header_for_verify = warp_synced_header.clone();
1398		let warp_just_for_verify = warp_justifications.clone();
1399		let target_header_for_verify = target_header.clone();
1400		let target_just_for_verify = target_justifications.clone();
1401		verifier.expect_verify().return_once(move |_proof| {
1402			Ok(VerificationResult::Complete(
1403				target_header_for_verify.clone(),
1404				vec![
1405					(warp_synced_header_for_verify, warp_just_for_verify),
1406					(target_header_for_verify, target_just_for_verify),
1407				],
1408			))
1409		});
1410		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1411		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1412		let mut warp_sync = WarpSync::new(
1413			client,
1414			config,
1415			Some(ProtocolName::Static("")),
1416			Arc::new(MockBlockDownloader::new()),
1417			None,
1418		);
1419
1420		// Make sure we have enough peers to make a request.
1421		for best_number in 1..11 {
1422			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1423		}
1424		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1425
1426		let network_provider = NetworkServiceProvider::new();
1427		let network_handle = network_provider.handle();
1428
1429		// Consume `SendWarpProofRequest` action.
1430		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1431		assert_eq!(actions.len(), 1);
1432		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1433			panic!("Invalid action.");
1434		};
1435
1436		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1437
1438		assert_eq!(warp_sync.actions.len(), 1);
1439		let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1440		else {
1441			panic!("Expected `ImportBlocks` action.");
1442		};
1443		assert_eq!(origin, BlockOrigin::WarpSync);
1444		// Only warp_synced_block should be in the import list; target_block is filtered out
1445		assert_eq!(blocks.len(), 1);
1446		let import_block = blocks.pop().unwrap();
1447		assert_eq!(
1448			import_block,
1449			IncomingBlock {
1450				hash: warp_synced_header.hash(),
1451				header: Some(warp_synced_header),
1452				body: None,
1453				indexed_body: None,
1454				justifications: Some(warp_justifications),
1455				origin: Some(request_peer_id.into()),
1456				allow_missing_state: true,
1457				skip_execution: true,
1458				import_existing: false,
1459				state: None,
1460			}
1461		);
1462		assert!(matches!(warp_sync.phase, Phase::TargetBlock(header) if header == target_header));
1463	}
1464
1465	#[test]
1466	fn no_target_block_requests_in_another_phase() {
1467		let client = mock_client_without_state();
1468		let mut provider = MockWarpSyncProvider::<Block>::new();
1469		let mut verifier = MockVerifier::<Block>::new();
1470		verifier.expect_next_proof_context().returning(|| Hash::random());
1471		verifier
1472			.expect_verify()
1473			.returning(|_| unreachable!("verify should not be called in this test"));
1474		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1475		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1476		let mut warp_sync = WarpSync::new(
1477			Arc::new(client),
1478			config,
1479			None,
1480			Arc::new(MockBlockDownloader::new()),
1481			None,
1482		);
1483
1484		// Make sure we have enough peers to make a request.
1485		for best_number in 1..11 {
1486			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1487		}
1488		// We are not in `Phase::TargetBlock`
1489		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1490
1491		// No request is made.
1492		assert!(warp_sync.target_block_request().is_none());
1493	}
1494
1495	#[test]
1496	fn target_block_request_is_correct() {
1497		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1498		let mut provider = MockWarpSyncProvider::<Block>::new();
1499		let mut verifier = MockVerifier::<Block>::new();
1500		let header_for_ctx = client.info().genesis_hash;
1501		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1502		let target_block = BlockBuilderBuilder::new(&*client)
1503			.on_parent_block(client.chain_info().best_hash)
1504			.with_parent_block_number(client.chain_info().best_number)
1505			.build()
1506			.unwrap()
1507			.build()
1508			.unwrap()
1509			.block;
1510		let target_header = target_block.header().clone();
1511		// Warp proof is complete.
1512		let header_for_verify = target_header.clone();
1513		verifier.expect_verify().return_once(move |_proof| {
1514			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1515		});
1516		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1517		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1518		let mut warp_sync =
1519			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1520
1521		// Make sure we have enough peers to make a request.
1522		for best_number in 1..11 {
1523			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1524		}
1525
1526		// Manually set `TargetBlock` phase.
1527		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1528
1529		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1530		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1531		assert_eq!(
1532			request.fields,
1533			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1534		);
1535		assert_eq!(request.max, Some(1));
1536	}
1537
1538	#[test]
1539	fn externally_set_target_block_is_requested() {
1540		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1541		let target_block = BlockBuilderBuilder::new(&*client)
1542			.on_parent_block(client.chain_info().best_hash)
1543			.with_parent_block_number(client.chain_info().best_number)
1544			.build()
1545			.unwrap()
1546			.build()
1547			.unwrap()
1548			.block;
1549		let target_header = target_block.header().clone();
1550		let config = WarpSyncConfig::WithTarget(target_header);
1551		let mut warp_sync =
1552			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1553
1554		// Make sure we have enough peers to make a request.
1555		for best_number in 1..11 {
1556			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1557		}
1558
1559		assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));
1560
1561		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1562		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1563		assert_eq!(
1564			request.fields,
1565			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1566		);
1567		assert_eq!(request.max, Some(1));
1568	}
1569
1570	#[test]
1571	fn no_parallel_target_block_requests() {
1572		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1573		let mut provider = MockWarpSyncProvider::<Block>::new();
1574		let mut verifier = MockVerifier::<Block>::new();
1575		let header_for_ctx = client.info().genesis_hash;
1576		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1577		let target_block = BlockBuilderBuilder::new(&*client)
1578			.on_parent_block(client.chain_info().best_hash)
1579			.with_parent_block_number(client.chain_info().best_number)
1580			.build()
1581			.unwrap()
1582			.build()
1583			.unwrap()
1584			.block;
1585		let target_header = target_block.header().clone();
1586		// Warp proof is complete.
1587		let header_for_verify = target_header.clone();
1588		verifier.expect_verify().return_once(move |_proof| {
1589			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1590		});
1591		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1592		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1593		let mut warp_sync =
1594			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1595
1596		// Make sure we have enough peers to make a request.
1597		for best_number in 1..11 {
1598			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1599		}
1600
1601		// Manually set `TargetBlock` phase.
1602		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1603
1604		// First target block request is made.
1605		assert!(warp_sync.target_block_request().is_some());
1606		// No parallel request is made.
1607		assert!(warp_sync.target_block_request().is_none());
1608	}
1609
1610	#[test]
1611	fn target_block_response_with_no_blocks_drops_peer() {
1612		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1613		let mut provider = MockWarpSyncProvider::<Block>::new();
1614		let mut verifier = MockVerifier::<Block>::new();
1615		let header_for_ctx = client.info().genesis_hash;
1616		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1617		let target_block = BlockBuilderBuilder::new(&*client)
1618			.on_parent_block(client.chain_info().best_hash)
1619			.with_parent_block_number(client.chain_info().best_number)
1620			.build()
1621			.unwrap()
1622			.build()
1623			.unwrap()
1624			.block;
1625		let target_header = target_block.header().clone();
1626		// Warp proof is complete.
1627		let header_for_verify = target_header.clone();
1628		verifier.expect_verify().return_once(move |_proof| {
1629			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1630		});
1631		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1632		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1633		let mut warp_sync =
1634			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1635
1636		// Make sure we have enough peers to make a request.
1637		for best_number in 1..11 {
1638			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1639		}
1640
1641		// Manually set `TargetBlock` phase.
1642		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1643
1644		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1645
1646		// Empty block response received.
1647		let response = Vec::new();
1648		// Peer is dropped.
1649		assert!(matches!(
1650			warp_sync.on_block_response_inner(peer_id, request, response),
1651			Err(BadPeer(id, _rep)) if id == peer_id,
1652		));
1653	}
1654
1655	#[test]
1656	fn target_block_response_with_extra_blocks_drops_peer() {
1657		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1658		let mut provider = MockWarpSyncProvider::<Block>::new();
1659		let mut verifier = MockVerifier::<Block>::new();
1660		let header_for_ctx = client.info().genesis_hash;
1661		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1662		let target_block = BlockBuilderBuilder::new(&*client)
1663			.on_parent_block(client.chain_info().best_hash)
1664			.with_parent_block_number(client.chain_info().best_number)
1665			.build()
1666			.unwrap()
1667			.build()
1668			.unwrap()
1669			.block;
1670
1671		let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
1672			.on_parent_block(client.chain_info().best_hash)
1673			.with_parent_block_number(client.chain_info().best_number)
1674			.build()
1675			.unwrap();
1676		extra_block_builder
1677			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1678			.unwrap();
1679		let extra_block = extra_block_builder.build().unwrap().block;
1680
1681		let target_header = target_block.header().clone();
1682		// Warp proof is complete.
1683		let header_for_verify = target_header.clone();
1684		verifier.expect_verify().return_once(move |_proof| {
1685			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1686		});
1687		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1688		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1689		let mut warp_sync =
1690			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1691
1692		// Make sure we have enough peers to make a request.
1693		for best_number in 1..11 {
1694			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1695		}
1696
1697		// Manually set `TargetBlock` phase.
1698		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1699
1700		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1701
1702		// Block response with extra blocks received.
1703		let response = vec![
1704			BlockData::<Block> {
1705				hash: target_block.header().hash(),
1706				header: Some(target_block.header().clone()),
1707				body: Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1708				indexed_body: None,
1709				receipt: None,
1710				message_queue: None,
1711				justification: None,
1712				justifications: None,
1713			},
1714			BlockData::<Block> {
1715				hash: extra_block.header().hash(),
1716				header: Some(extra_block.header().clone()),
1717				body: Some(extra_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1718				indexed_body: None,
1719				receipt: None,
1720				message_queue: None,
1721				justification: None,
1722				justifications: None,
1723			},
1724		];
1725		// Peer is dropped.
1726		assert!(matches!(
1727			warp_sync.on_block_response_inner(peer_id, request, response),
1728			Err(BadPeer(id, _rep)) if id == peer_id,
1729		));
1730	}
1731
1732	#[test]
1733	fn target_block_response_with_wrong_block_drops_peer() {
1734		subsoil::tracing::try_init_simple();
1735
1736		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1737		let mut provider = MockWarpSyncProvider::<Block>::new();
1738		let mut verifier = MockVerifier::<Block>::new();
1739		let header_for_ctx = client.info().genesis_hash;
1740		verifier.expect_next_proof_context().returning(move || header_for_ctx);
1741		let target_block = BlockBuilderBuilder::new(&*client)
1742			.on_parent_block(client.chain_info().best_hash)
1743			.with_parent_block_number(client.chain_info().best_number)
1744			.build()
1745			.unwrap()
1746			.build()
1747			.unwrap()
1748			.block;
1749
1750		let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
1751			.on_parent_block(client.chain_info().best_hash)
1752			.with_parent_block_number(client.chain_info().best_number)
1753			.build()
1754			.unwrap();
1755		wrong_block_builder
1756			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1757			.unwrap();
1758		let wrong_block = wrong_block_builder.build().unwrap().block;
1759
1760		let target_header = target_block.header().clone();
1761		// Warp proof is complete.
1762		let header_for_verify = target_header.clone();
1763		verifier.expect_verify().return_once(move |_proof| {
1764			Ok(VerificationResult::Complete(header_for_verify, Default::default()))
1765		});
1766		provider.expect_create_verifier().return_once(move || Box::new(verifier));
1767		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1768		let mut warp_sync =
1769			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1770
1771		// Make sure we have enough peers to make a request.
1772		for best_number in 1..11 {
1773			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1774		}
1775
1776		// Manually set `TargetBlock` phase.
1777		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1778
1779		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1780
1781		// Wrong block received.
1782		let response = vec![BlockData::<Block> {
1783			hash: wrong_block.header().hash(),
1784			header: Some(wrong_block.header().clone()),
1785			body: Some(wrong_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1786			indexed_body: None,
1787			receipt: None,
1788			message_queue: None,
1789			justification: None,
1790			justifications: None,
1791		}];
1792		// Peer is dropped.
1793		assert!(matches!(
1794			warp_sync.on_block_response_inner(peer_id, request, response),
1795			Err(BadPeer(id, _rep)) if id == peer_id,
1796		));
1797	}
1798
1799	/// A response carrying the expected target block must finish the strategy; `take_result`
1800	/// must then yield that block's header, body and justifications.
1801	#[test]
1802	fn correct_target_block_response_sets_strategy_result() {
1803		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1804		let genesis_hash = client.info().genesis_hash;
1805		let chain = client.chain_info();
1806		// Build the target block on top of the current best block.
1807		let mut builder = BlockBuilderBuilder::new(&*client)
1808			.on_parent_block(chain.best_hash)
1809			.with_parent_block_number(chain.best_number)
1810			.build()
1811			.unwrap();
1812		builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
1813		let target_block = builder.build().unwrap().block;
1814
1815		// Mock a verifier that reports the warp proof as complete at the target header.
1816		let proven_header = target_block.header().clone();
1817		let mut proof_verifier = MockVerifier::<Block>::new();
1818		proof_verifier.expect_next_proof_context().returning(move || genesis_hash);
1819		proof_verifier.expect_verify().return_once(move |_proof| {
1820			Ok(VerificationResult::Complete(proven_header, Default::default()))
1821		});
1822		let mut warp_provider = MockWarpSyncProvider::<Block>::new();
1823		warp_provider.expect_create_verifier().return_once(move || Box::new(proof_verifier));
1824
1825		let config = WarpSyncConfig::WithProvider(Arc::new(warp_provider));
1826		let mut warp_sync =
1827			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1828		// Add ten peers so that there are enough of them to make a request.
1829		for best_number in 1..=10 {
1830			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1831		}
1832
1833		// Set the `TargetBlock` phase by hand, then obtain the target block request.
1834		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1835		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1836
1837		// Respond with the correct block.
1838		let body: Option<Vec<_>> = Some(target_block.extrinsics().iter().cloned().collect());
1839		let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
1840		let block_data = BlockData::<Block> {
1841			hash: target_block.header().hash(),
1842			header: Some(target_block.header().clone()),
1843			body: body.clone(),
1844			indexed_body: None,
1845			receipt: None,
1846			message_queue: None,
1847			justification: None,
1848			justifications: justifications.clone(),
1849		};
1850		let outcome = warp_sync.on_block_response_inner(peer_id, request, vec![block_data]);
1851		assert!(outcome.is_ok());
1852
1853		// The strategy must emit `Finished` as its only action.
1854		let net_provider = NetworkServiceProvider::new();
1855		let net_handle = net_provider.handle();
1856		let actions: Vec<_> = warp_sync.actions(&net_handle).collect();
1857		assert_eq!(actions.len(), 1);
1858		assert!(matches!(actions.first(), Some(SyncingAction::Finished)));
1859
1860		// The result must carry the header, body and justifications that were delivered.
1861		let result = warp_sync.take_result().unwrap();
1862		assert_eq!(result.target_header, *target_block.header());
1863		assert_eq!(result.target_body, body);
1864		assert_eq!(result.target_justifications, justifications);
1865	}
1866}