pezsc_network_sync/strategy/
warp.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Warp syncing strategy. Bootstraps chain by downloading warp proofs and state.
20
21use pezsc_consensus::IncomingBlock;
22use pezsp_consensus::BlockOrigin;
23pub use pezsp_consensus_grandpa::{AuthorityList, SetId};
24
25use crate::{
26	block_relay_protocol::{BlockDownloader, BlockResponseError},
27	service::network::NetworkServiceHandle,
28	strategy::{
29		chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers, StrategyKey,
30		SyncingAction,
31	},
32	types::{BadPeer, SyncState, SyncStatus},
33	LOG_TARGET,
34};
35use codec::{Decode, Encode};
36use futures::{channel::oneshot, FutureExt};
37use log::{debug, error, trace, warn};
38use pezsc_network::{IfDisconnected, ProtocolName};
39use pezsc_network_common::sync::message::{
40	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock,
41};
42use pezsc_network_types::PeerId;
43use pezsp_blockchain::HeaderBackend;
44use pezsp_runtime::{
45	traits::{Block as BlockT, Header, NumberFor, Zero},
46	Justifications, SaturatedConversion,
47};
48use std::{any::Any, collections::HashMap, fmt, sync::Arc};
49
50/// Number of peers that need to be connected before warp sync is started.
51const MIN_PEERS_TO_START_WARP_SYNC: usize = 3;
52
53/// Scale-encoded warp sync proof response.
54pub struct EncodedProof(pub Vec<u8>);
55
56/// Warp sync request
57#[derive(Encode, Decode, Debug, Clone)]
58pub struct WarpProofRequest<B: BlockT> {
59	/// Start collecting proofs from this block.
60	pub begin: B::Hash,
61}
62
63/// Proof verification result.
64pub enum VerificationResult<Block: BlockT> {
65	/// Proof is valid, but the target was not reached.
66	Partial(SetId, AuthorityList, Block::Hash, Vec<(Block::Header, Justifications)>),
67	/// Target finality is proved.
68	Complete(SetId, AuthorityList, Block::Header, Vec<(Block::Header, Justifications)>),
69}
70
71/// Warp sync backend. Handles retrieving and verifying warp sync proofs.
72pub trait WarpSyncProvider<Block: BlockT>: Send + Sync {
73	/// Generate proof starting at given block hash. The proof is accumulated until maximum proof
74	/// size is reached.
75	fn generate(
76		&self,
77		start: Block::Hash,
78	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
79	/// Verify warp proof against current set of authorities.
80	fn verify(
81		&self,
82		proof: &EncodedProof,
83		set_id: SetId,
84		authorities: AuthorityList,
85	) -> Result<VerificationResult<Block>, Box<dyn std::error::Error + Send + Sync>>;
86	/// Get current list of authorities. This is supposed to be genesis authorities when starting
87	/// sync.
88	fn current_authorities(&self) -> AuthorityList;
89}
90
91mod rep {
92	use pezsc_network::ReputationChange as Rep;
93
94	/// Unexpected response received form a peer
95	pub const UNEXPECTED_RESPONSE: Rep = Rep::new(-(1 << 29), "Unexpected response");
96
97	/// Peer provided invalid warp proof data
98	pub const BAD_WARP_PROOF: Rep = Rep::new(-(1 << 29), "Bad warp proof");
99
100	/// Peer did not provide us with advertised block data.
101	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
102
103	/// Reputation change for peers which send us non-requested block data.
104	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
105
106	/// Reputation change for peers which send us a block which we fail to verify.
107	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
108
109	/// We received a message that failed to decode.
110	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
111}
112
113/// Reported warp sync phase.
114#[derive(Clone, Eq, PartialEq, Debug)]
115pub enum WarpSyncPhase<Block: BlockT> {
116	/// Waiting for peers to connect.
117	AwaitingPeers { required_peers: usize },
118	/// Downloading and verifying grandpa warp proofs.
119	DownloadingWarpProofs,
120	/// Downloading target block.
121	DownloadingTargetBlock,
122	/// Downloading state data.
123	DownloadingState,
124	/// Importing state.
125	ImportingState,
126	/// Downloading block history.
127	DownloadingBlocks(NumberFor<Block>),
128	/// Warp sync is complete.
129	Complete,
130}
131
132impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
133	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
134		match self {
135			Self::AwaitingPeers { required_peers } => {
136				write!(f, "Waiting for {required_peers} peers to be connected")
137			},
138			Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
139			Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
140			Self::DownloadingState => write!(f, "Downloading state"),
141			Self::ImportingState => write!(f, "Importing state"),
142			Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
143			Self::Complete => write!(f, "Warp sync is complete"),
144		}
145	}
146}
147
148/// Reported warp sync progress.
149#[derive(Clone, Eq, PartialEq, Debug)]
150pub struct WarpSyncProgress<Block: BlockT> {
151	/// Estimated download percentage.
152	pub phase: WarpSyncPhase<Block>,
153	/// Total bytes downloaded so far.
154	pub total_bytes: u64,
155}
156
157/// Warp sync configuration as accepted by [`WarpSync`].
158pub enum WarpSyncConfig<Block: BlockT> {
159	/// Standard warp sync for the chain.
160	WithProvider(Arc<dyn WarpSyncProvider<Block>>),
161	/// Skip downloading proofs and use provided header of the state that should be downloaded.
162	///
163	/// It is expected that the header provider ensures that the header is trusted.
164	WithTarget(<Block as BlockT>::Header),
165}
166
167/// Warp sync phase used by warp sync state machine.
168enum Phase<B: BlockT> {
169	/// Waiting for enough peers to connect.
170	WaitingForPeers { warp_sync_provider: Arc<dyn WarpSyncProvider<B>> },
171	/// Downloading warp proofs.
172	WarpProof {
173		set_id: SetId,
174		authorities: AuthorityList,
175		last_hash: B::Hash,
176		warp_sync_provider: Arc<dyn WarpSyncProvider<B>>,
177	},
178	/// Downloading target block.
179	TargetBlock(B::Header),
180	/// Warp sync is complete.
181	Complete,
182}
183
184enum PeerState {
185	Available,
186	DownloadingProofs,
187	DownloadingTargetBlock,
188}
189
190impl PeerState {
191	fn is_available(&self) -> bool {
192		matches!(self, PeerState::Available)
193	}
194}
195
196struct Peer<B: BlockT> {
197	best_number: NumberFor<B>,
198	state: PeerState,
199}
200
201pub struct WarpSyncResult<B: BlockT> {
202	pub target_header: B::Header,
203	pub target_body: Option<Vec<B::Extrinsic>>,
204	pub target_justifications: Option<Justifications>,
205}
206
207/// Warp sync state machine. Accumulates warp proofs and state.
208pub struct WarpSync<B: BlockT, Client> {
209	phase: Phase<B>,
210	client: Arc<Client>,
211	total_proof_bytes: u64,
212	total_state_bytes: u64,
213	peers: HashMap<PeerId, Peer<B>>,
214	disconnected_peers: DisconnectedPeers,
215	protocol_name: Option<ProtocolName>,
216	block_downloader: Arc<dyn BlockDownloader<B>>,
217	actions: Vec<SyncingAction<B>>,
218	result: Option<WarpSyncResult<B>>,
219	/// Number of peers that need to be connected before warp sync is started.
220	min_peers_to_start_warp_sync: usize,
221}
222
223impl<B, Client> WarpSync<B, Client>
224where
225	B: BlockT,
226	Client: HeaderBackend<B> + 'static,
227{
228	/// Strategy key used by warp sync.
229	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("Warp");
230
231	/// Create a new instance. When passing a warp sync provider we will be checking for proof and
232	/// authorities. Alternatively we can pass a target block when we want to skip downloading
233	/// proofs, in this case we will continue polling until the target block is known.
234	pub fn new(
235		client: Arc<Client>,
236		warp_sync_config: WarpSyncConfig<B>,
237		protocol_name: Option<ProtocolName>,
238		block_downloader: Arc<dyn BlockDownloader<B>>,
239		min_peers_to_start_warp_sync: Option<usize>,
240	) -> Self {
241		let min_peers_to_start_warp_sync =
242			min_peers_to_start_warp_sync.unwrap_or(MIN_PEERS_TO_START_WARP_SYNC);
243		if client.info().finalized_state.is_some() {
244			error!(
245				target: LOG_TARGET,
246				"Can't use warp sync mode with a partially synced database. Reverting to full sync mode."
247			);
248			return Self {
249				client,
250				phase: Phase::Complete,
251				total_proof_bytes: 0,
252				total_state_bytes: 0,
253				peers: HashMap::new(),
254				disconnected_peers: DisconnectedPeers::new(),
255				protocol_name,
256				block_downloader,
257				actions: vec![SyncingAction::Finished],
258				result: None,
259				min_peers_to_start_warp_sync,
260			};
261		}
262
263		let phase = match warp_sync_config {
264			WarpSyncConfig::WithProvider(warp_sync_provider) => {
265				Phase::WaitingForPeers { warp_sync_provider }
266			},
267			WarpSyncConfig::WithTarget(target_header) => Phase::TargetBlock(target_header),
268		};
269
270		Self {
271			client,
272			phase,
273			total_proof_bytes: 0,
274			total_state_bytes: 0,
275			peers: HashMap::new(),
276			disconnected_peers: DisconnectedPeers::new(),
277			protocol_name,
278			block_downloader,
279			actions: Vec::new(),
280			result: None,
281			min_peers_to_start_warp_sync,
282		}
283	}
284
285	/// Notify that a new peer has connected.
286	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
287		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
288
289		self.try_to_start_warp_sync();
290	}
291
292	/// Notify that a peer has disconnected.
293	pub fn remove_peer(&mut self, peer_id: &PeerId) {
294		if let Some(state) = self.peers.remove(peer_id) {
295			if !state.state.is_available() {
296				if let Some(bad_peer) =
297					self.disconnected_peers.on_disconnect_during_request(*peer_id)
298				{
299					self.actions.push(SyncingAction::DropPeer(bad_peer));
300				}
301			}
302		}
303	}
304
305	/// Submit a validated block announcement.
306	///
307	/// Returns new best hash & best number of the peer if they are updated.
308	#[must_use]
309	pub fn on_validated_block_announce(
310		&mut self,
311		is_best: bool,
312		peer_id: PeerId,
313		announce: &BlockAnnounce<B::Header>,
314	) -> Option<(B::Hash, NumberFor<B>)> {
315		is_best.then(|| {
316			let best_number = *announce.header.number();
317			let best_hash = announce.header.hash();
318			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
319				peer.best_number = best_number;
320			}
321			// Let `SyncingEngine` know that we should update the peer info.
322			(best_hash, best_number)
323		})
324	}
325
326	/// Start warp sync as soon as we have enough peers.
327	fn try_to_start_warp_sync(&mut self) {
328		let Phase::WaitingForPeers { warp_sync_provider } = &self.phase else { return };
329
330		if self.peers.len() < self.min_peers_to_start_warp_sync {
331			return;
332		}
333
334		self.phase = Phase::WarpProof {
335			set_id: 0,
336			authorities: warp_sync_provider.current_authorities(),
337			last_hash: self.client.info().genesis_hash,
338			warp_sync_provider: Arc::clone(warp_sync_provider),
339		};
340		trace!(target: LOG_TARGET, "Started warp sync with {} peers.", self.peers.len());
341	}
342
343	pub fn on_generic_response(
344		&mut self,
345		peer_id: &PeerId,
346		protocol_name: ProtocolName,
347		response: Box<dyn Any + Send>,
348	) {
349		if &protocol_name == self.block_downloader.protocol_name() {
350			let Ok(response) = response
351				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
352			else {
353				warn!(target: LOG_TARGET, "Failed to downcast block response");
354				debug_assert!(false);
355				return;
356			};
357
358			let (request, response) = *response;
359			let blocks = match response {
360				Ok(blocks) => blocks,
361				Err(BlockResponseError::DecodeFailed(e)) => {
362					debug!(
363						target: LOG_TARGET,
364						"Failed to decode block response from peer {:?}: {:?}.",
365						peer_id,
366						e
367					);
368					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
369					return;
370				},
371				Err(BlockResponseError::ExtractionFailed(e)) => {
372					debug!(
373						target: LOG_TARGET,
374						"Failed to extract blocks from peer response {:?}: {:?}.",
375						peer_id,
376						e
377					);
378					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
379					return;
380				},
381			};
382
383			self.on_block_response(*peer_id, request, blocks);
384		} else {
385			let Ok(response) = response.downcast::<Vec<u8>>() else {
386				warn!(target: LOG_TARGET, "Failed to downcast warp sync response");
387				debug_assert!(false);
388				return;
389			};
390
391			self.on_warp_proof_response(peer_id, EncodedProof(*response));
392		}
393	}
394
395	/// Process warp proof response.
396	pub fn on_warp_proof_response(&mut self, peer_id: &PeerId, response: EncodedProof) {
397		if let Some(peer) = self.peers.get_mut(peer_id) {
398			peer.state = PeerState::Available;
399		}
400
401		let Phase::WarpProof { set_id, authorities, last_hash, warp_sync_provider } =
402			&mut self.phase
403		else {
404			debug!(target: LOG_TARGET, "Unexpected warp proof response");
405			self.actions
406				.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::UNEXPECTED_RESPONSE)));
407			return;
408		};
409
410		let proof_to_incoming_block =
411			|(header, justifications): (B::Header, Justifications)| -> IncomingBlock<B> {
412				IncomingBlock {
413					hash: header.hash(),
414					header: Some(header),
415					body: None,
416					indexed_body: None,
417					justifications: Some(justifications),
418					origin: Some(*peer_id),
419					// We are still in warp sync, so we don't have the state. This means
420					// we also can't execute the block.
421					allow_missing_state: true,
422					skip_execution: true,
423					// Shouldn't already exist in the database.
424					import_existing: false,
425					state: None,
426				}
427			};
428
429		match warp_sync_provider.verify(&response, *set_id, authorities.clone()) {
430			Err(e) => {
431				debug!(target: LOG_TARGET, "Bad warp proof response: {}", e);
432				self.actions
433					.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_WARP_PROOF)))
434			},
435			Ok(VerificationResult::Partial(new_set_id, new_authorities, new_last_hash, proofs)) => {
436				log::debug!(target: LOG_TARGET, "Verified partial proof, set_id={:?}", new_set_id);
437				*set_id = new_set_id;
438				*authorities = new_authorities;
439				*last_hash = new_last_hash;
440				self.total_proof_bytes += response.0.len() as u64;
441				self.actions.push(SyncingAction::ImportBlocks {
442					origin: BlockOrigin::NetworkInitialSync,
443					blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
444				});
445			},
446			Ok(VerificationResult::Complete(new_set_id, _, header, proofs)) => {
447				log::debug!(
448					target: LOG_TARGET,
449					"Verified complete proof, set_id={:?}. Continuing with target block download: {} ({}).",
450					new_set_id,
451					header.hash(),
452					header.number(),
453				);
454				self.total_proof_bytes += response.0.len() as u64;
455				self.phase = Phase::TargetBlock(header);
456				self.actions.push(SyncingAction::ImportBlocks {
457					origin: BlockOrigin::NetworkInitialSync,
458					blocks: proofs.into_iter().map(proof_to_incoming_block).collect(),
459				});
460			},
461		}
462	}
463
464	/// Process (target) block response.
465	pub fn on_block_response(
466		&mut self,
467		peer_id: PeerId,
468		request: BlockRequest<B>,
469		blocks: Vec<BlockData<B>>,
470	) {
471		if let Err(bad_peer) = self.on_block_response_inner(peer_id, request, blocks) {
472			self.actions.push(SyncingAction::DropPeer(bad_peer));
473		}
474	}
475
476	fn on_block_response_inner(
477		&mut self,
478		peer_id: PeerId,
479		request: BlockRequest<B>,
480		mut blocks: Vec<BlockData<B>>,
481	) -> Result<(), BadPeer> {
482		if let Some(peer) = self.peers.get_mut(&peer_id) {
483			peer.state = PeerState::Available;
484		}
485
486		let Phase::TargetBlock(header) = &mut self.phase else {
487			debug!(target: LOG_TARGET, "Unexpected target block response from {peer_id}");
488			return Err(BadPeer(peer_id, rep::UNEXPECTED_RESPONSE));
489		};
490
491		if blocks.is_empty() {
492			debug!(
493				target: LOG_TARGET,
494				"Downloading target block failed: empty block response from {peer_id}",
495			);
496			return Err(BadPeer(peer_id, rep::NO_BLOCK));
497		}
498
499		if blocks.len() > 1 {
500			debug!(
501				target: LOG_TARGET,
502				"Too many blocks ({}) in warp target block response from {peer_id}",
503				blocks.len(),
504			);
505			return Err(BadPeer(peer_id, rep::NOT_REQUESTED));
506		}
507
508		validate_blocks::<B>(&blocks, &peer_id, Some(request))?;
509
510		let block = blocks.pop().expect("`blocks` len checked above; qed");
511
512		let Some(block_header) = &block.header else {
513			debug!(
514				target: LOG_TARGET,
515				"Downloading target block failed: missing header in response from {peer_id}.",
516			);
517			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
518		};
519
520		if block_header != header {
521			debug!(
522				target: LOG_TARGET,
523				"Downloading target block failed: different header in response from {peer_id}.",
524			);
525			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
526		}
527
528		if block.body.is_none() {
529			debug!(
530				target: LOG_TARGET,
531				"Downloading target block failed: missing body in response from {peer_id}.",
532			);
533			return Err(BadPeer(peer_id, rep::VERIFICATION_FAIL));
534		}
535
536		self.result = Some(WarpSyncResult {
537			target_header: header.clone(),
538			target_body: block.body,
539			target_justifications: block.justifications,
540		});
541		self.phase = Phase::Complete;
542		self.actions.push(SyncingAction::Finished);
543		Ok(())
544	}
545
546	/// Reserve a peer for a request assigning `new_state`.
547	fn schedule_next_peer(
548		&mut self,
549		new_state: PeerState,
550		min_best_number: Option<NumberFor<B>>,
551	) -> Option<PeerId> {
552		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
553		if targets.is_empty() {
554			return None;
555		}
556		targets.sort();
557		let median = targets[targets.len() / 2];
558		let threshold = std::cmp::max(median, min_best_number.unwrap_or(Zero::zero()));
559		// Find a random peer that is synced as much as peer majority and is above
560		// `min_best_number`.
561		for (peer_id, peer) in self.peers.iter_mut() {
562			if peer.state.is_available()
563				&& peer.best_number >= threshold
564				&& self.disconnected_peers.is_peer_available(peer_id)
565			{
566				peer.state = new_state;
567				return Some(*peer_id);
568			}
569		}
570		None
571	}
572
573	/// Produce warp proof request.
574	fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest<B>)> {
575		let Phase::WarpProof { last_hash, .. } = &self.phase else { return None };
576
577		// Copy `last_hash` early to cut the borrowing tie.
578		let begin = *last_hash;
579
580		if self
581			.peers
582			.values()
583			.any(|peer| matches!(peer.state, PeerState::DownloadingProofs))
584		{
585			// Only one warp proof request at a time is possible.
586			return None;
587		}
588
589		let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?;
590		trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}.");
591
592		let request = WarpProofRequest { begin };
593
594		let Some(protocol_name) = self.protocol_name.clone() else {
595			warn!(
596				target: LOG_TARGET,
597				"Trying to send warp sync request when no protocol is configured {request:?}",
598			);
599			return None;
600		};
601
602		Some((peer_id, protocol_name, request))
603	}
604
605	/// Produce target block request.
606	fn target_block_request(&mut self) -> Option<(PeerId, BlockRequest<B>)> {
607		let Phase::TargetBlock(target_header) = &self.phase else { return None };
608
609		if self
610			.peers
611			.values()
612			.any(|peer| matches!(peer.state, PeerState::DownloadingTargetBlock))
613		{
614			// Only one target block request at a time is possible.
615			return None;
616		}
617
618		// Cut the borrowing tie.
619		let target_hash = target_header.hash();
620		let target_number = *target_header.number();
621
622		let peer_id =
623			self.schedule_next_peer(PeerState::DownloadingTargetBlock, Some(target_number))?;
624
625		trace!(
626			target: LOG_TARGET,
627			"New target block request to {peer_id}, target: {} ({}).",
628			target_hash,
629			target_number,
630		);
631
632		Some((
633			peer_id,
634			BlockRequest::<B> {
635				id: 0,
636				fields: BlockAttributes::HEADER
637					| BlockAttributes::BODY
638					| BlockAttributes::JUSTIFICATION,
639				from: FromBlock::Hash(target_hash),
640				direction: Direction::Ascending,
641				max: Some(1),
642			},
643		))
644	}
645
646	/// Returns warp sync estimated progress (stage, bytes received).
647	pub fn progress(&self) -> WarpSyncProgress<B> {
648		match &self.phase {
649			Phase::WaitingForPeers { .. } => WarpSyncProgress {
650				phase: WarpSyncPhase::AwaitingPeers {
651					required_peers: self.min_peers_to_start_warp_sync,
652				},
653				total_bytes: self.total_proof_bytes,
654			},
655			Phase::WarpProof { .. } => WarpSyncProgress {
656				phase: WarpSyncPhase::DownloadingWarpProofs,
657				total_bytes: self.total_proof_bytes,
658			},
659			Phase::TargetBlock(_) => WarpSyncProgress {
660				phase: WarpSyncPhase::DownloadingTargetBlock,
661				total_bytes: self.total_proof_bytes,
662			},
663			Phase::Complete => WarpSyncProgress {
664				phase: WarpSyncPhase::Complete,
665				total_bytes: self.total_proof_bytes + self.total_state_bytes,
666			},
667		}
668	}
669
670	/// Get the number of peers known to warp sync.
671	pub fn num_peers(&self) -> usize {
672		self.peers.len()
673	}
674
675	/// Returns the current sync status.
676	pub fn status(&self) -> SyncStatus<B> {
677		SyncStatus {
678			state: match &self.phase {
679				Phase::WaitingForPeers { .. } => SyncState::Downloading { target: Zero::zero() },
680				Phase::WarpProof { .. } => SyncState::Downloading { target: Zero::zero() },
681				Phase::TargetBlock(header) => SyncState::Downloading { target: *header.number() },
682				Phase::Complete => SyncState::Idle,
683			},
684			best_seen_block: match &self.phase {
685				Phase::WaitingForPeers { .. } => None,
686				Phase::WarpProof { .. } => None,
687				Phase::TargetBlock(header) => Some(*header.number()),
688				Phase::Complete => None,
689			},
690			num_peers: self.peers.len().saturated_into(),
691			queued_blocks: 0,
692			state_sync: None,
693			warp_sync: Some(self.progress()),
694		}
695	}
696
697	/// Get actions that should be performed by the owner on [`WarpSync`]'s behalf
698	#[must_use]
699	pub fn actions(
700		&mut self,
701		network_service: &NetworkServiceHandle,
702	) -> impl Iterator<Item = SyncingAction<B>> {
703		let warp_proof_request =
704			self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| {
705				trace!(
706					target: LOG_TARGET,
707					"Created `WarpProofRequest` to {}, request: {:?}.",
708					peer_id,
709					request,
710				);
711
712				let (tx, rx) = oneshot::channel();
713
714				network_service.start_request(
715					peer_id,
716					protocol_name,
717					request.encode(),
718					tx,
719					IfDisconnected::ImmediateError,
720				);
721
722				SyncingAction::StartRequest {
723					peer_id,
724					key: Self::STRATEGY_KEY,
725					request: async move {
726						Ok(rx.await?.and_then(|(response, protocol_name)| {
727							Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
728						}))
729					}
730					.boxed(),
731					remove_obsolete: false,
732				}
733			});
734		self.actions.extend(warp_proof_request);
735
736		let target_block_request =
737			self.target_block_request().into_iter().map(|(peer_id, request)| {
738				let downloader = self.block_downloader.clone();
739
740				SyncingAction::StartRequest {
741					peer_id,
742					key: Self::STRATEGY_KEY,
743					request: async move {
744						Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
745							|(response, protocol_name)| {
746								let decoded_response =
747									downloader.block_response_into_blocks(&request, response);
748								let result =
749									Box::new((request, decoded_response)) as Box<dyn Any + Send>;
750								Ok((result, protocol_name))
751							},
752						))
753					}
754					.boxed(),
755					// Sending block request implies dropping obsolete pending response as we are
756					// not interested in it anymore.
757					remove_obsolete: true,
758				}
759			});
760		self.actions.extend(target_block_request);
761
762		std::mem::take(&mut self.actions).into_iter()
763	}
764
765	/// Take the result of finished warp sync, returning `None` if the sync was unsuccessful.
766	#[must_use]
767	pub fn take_result(&mut self) -> Option<WarpSyncResult<B>> {
768		self.result.take()
769	}
770}
771
772#[cfg(test)]
773mod test {
774	use super::*;
775	use crate::{mock::MockBlockDownloader, service::network::NetworkServiceProvider};
776	use bizinikiwi_test_runtime_client::{
777		runtime::{Block, Hash},
778		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
779	};
780	use pezsc_block_builder::BlockBuilderBuilder;
781	use pezsp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info};
782	use pezsp_consensus_grandpa::{AuthorityList, SetId, GRANDPA_ENGINE_ID};
783	use pezsp_core::H256;
784	use pezsp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
785	use std::{io::ErrorKind, sync::Arc};
786
787	mockall::mock! {
788		pub Client<B: BlockT> {}
789
790		impl<B: BlockT> HeaderBackend<B> for Client<B> {
791			fn header(&self, hash: B::Hash) -> Result<Option<B::Header>, BlockchainError>;
792			fn info(&self) -> Info<B>;
793			fn status(&self, hash: B::Hash) -> Result<BlockStatus, BlockchainError>;
794			fn number(
795				&self,
796				hash: B::Hash,
797			) -> Result<Option<<<B as BlockT>::Header as HeaderT>::Number>, BlockchainError>;
798			fn hash(&self, number: NumberFor<B>) -> Result<Option<B::Hash>, BlockchainError>;
799		}
800	}
801
802	mockall::mock! {
803		pub WarpSyncProvider<B: BlockT> {}
804
805		impl<B: BlockT> super::WarpSyncProvider<B> for WarpSyncProvider<B> {
806			fn generate(
807				&self,
808				start: B::Hash,
809			) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
810			fn verify(
811				&self,
812				proof: &EncodedProof,
813				set_id: SetId,
814				authorities: AuthorityList,
815			) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>;
816			fn current_authorities(&self) -> AuthorityList;
817		}
818	}
819
820	fn mock_client_with_state() -> MockClient<Block> {
821		let mut client = MockClient::<Block>::new();
822		let genesis_hash = Hash::random();
823		client.expect_info().return_once(move || Info {
824			best_hash: genesis_hash,
825			best_number: 0,
826			genesis_hash,
827			finalized_hash: genesis_hash,
828			finalized_number: 0,
829			// We need some finalized state to render warp sync impossible.
830			finalized_state: Some((genesis_hash, 0)),
831			number_leaves: 0,
832			block_gap: None,
833		});
834
835		client
836	}
837
838	fn mock_client_without_state() -> MockClient<Block> {
839		let mut client = MockClient::<Block>::new();
840		let genesis_hash = Hash::random();
841		client.expect_info().returning(move || Info {
842			best_hash: genesis_hash,
843			best_number: 0,
844			genesis_hash,
845			finalized_hash: genesis_hash,
846			finalized_number: 0,
847			finalized_state: None,
848			number_leaves: 0,
849			block_gap: None,
850		});
851
852		client
853	}
854
855	#[test]
856	fn warp_sync_with_provider_for_db_with_finalized_state_is_noop() {
857		let client = mock_client_with_state();
858		let provider = MockWarpSyncProvider::<Block>::new();
859		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
860		let mut warp_sync = WarpSync::new(
861			Arc::new(client),
862			config,
863			None,
864			Arc::new(MockBlockDownloader::new()),
865			None,
866		);
867
868		let network_provider = NetworkServiceProvider::new();
869		let network_handle = network_provider.handle();
870
871		// Warp sync instantly finishes
872		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
873		assert_eq!(actions.len(), 1);
874		assert!(matches!(actions[0], SyncingAction::Finished));
875
876		// ... with no result.
877		assert!(warp_sync.take_result().is_none());
878	}
879
880	#[test]
881	fn warp_sync_to_target_for_db_with_finalized_state_is_noop() {
882		let client = mock_client_with_state();
883		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
884			1,
885			Default::default(),
886			Default::default(),
887			Default::default(),
888			Default::default(),
889		));
890		let mut warp_sync = WarpSync::new(
891			Arc::new(client),
892			config,
893			None,
894			Arc::new(MockBlockDownloader::new()),
895			None,
896		);
897
898		let network_provider = NetworkServiceProvider::new();
899		let network_handle = network_provider.handle();
900
901		// Warp sync instantly finishes
902		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
903		assert_eq!(actions.len(), 1);
904		assert!(matches!(actions[0], SyncingAction::Finished));
905
906		// ... with no result.
907		assert!(warp_sync.take_result().is_none());
908	}
909
910	#[test]
911	fn warp_sync_with_provider_for_empty_db_doesnt_finish_instantly() {
912		let client = mock_client_without_state();
913		let provider = MockWarpSyncProvider::<Block>::new();
914		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
915		let mut warp_sync = WarpSync::new(
916			Arc::new(client),
917			config,
918			None,
919			Arc::new(MockBlockDownloader::new()),
920			None,
921		);
922
923		let network_provider = NetworkServiceProvider::new();
924		let network_handle = network_provider.handle();
925
926		// No actions are emitted.
927		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
928	}
929
930	#[test]
931	fn warp_sync_to_target_for_empty_db_doesnt_finish_instantly() {
932		let client = mock_client_without_state();
933		let config = WarpSyncConfig::WithTarget(<Block as BlockT>::Header::new(
934			1,
935			Default::default(),
936			Default::default(),
937			Default::default(),
938			Default::default(),
939		));
940		let mut warp_sync = WarpSync::new(
941			Arc::new(client),
942			config,
943			None,
944			Arc::new(MockBlockDownloader::new()),
945			None,
946		);
947
948		let network_provider = NetworkServiceProvider::new();
949		let network_handle = network_provider.handle();
950
951		// No actions are emitted.
952		assert_eq!(warp_sync.actions(&network_handle).count(), 0)
953	}
954
955	#[test]
956	fn warp_sync_is_started_only_when_there_is_enough_peers() {
957		let client = mock_client_without_state();
958		let mut provider = MockWarpSyncProvider::<Block>::new();
959		provider
960			.expect_current_authorities()
961			.once()
962			.return_const(AuthorityList::default());
963		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
964		let mut warp_sync = WarpSync::new(
965			Arc::new(client),
966			config,
967			None,
968			Arc::new(MockBlockDownloader::new()),
969			None,
970		);
971
972		// Warp sync is not started when there is not enough peers.
973		for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) {
974			warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
975			assert!(matches!(warp_sync.phase, Phase::WaitingForPeers { .. }))
976		}
977
978		// Now we have enough peers and warp sync is started.
979		warp_sync.add_peer(PeerId::random(), Hash::random(), 10);
980		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }))
981	}
982
983	#[test]
984	fn no_peer_is_scheduled_if_no_peers_connected() {
985		let client = mock_client_without_state();
986		let provider = MockWarpSyncProvider::<Block>::new();
987		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
988		let mut warp_sync = WarpSync::new(
989			Arc::new(client),
990			config,
991			None,
992			Arc::new(MockBlockDownloader::new()),
993			None,
994		);
995
996		assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none());
997	}
998
999	#[test]
1000	fn enough_peers_are_used_in_tests() {
1001		// Tests below use 10 peers. Fail early if it's less than a threshold for warp sync.
1002		assert!(
1003			10 >= MIN_PEERS_TO_START_WARP_SYNC,
1004			"Tests must be updated to use that many initial peers.",
1005		);
1006	}
1007
1008	#[test]
1009	fn at_least_median_synced_peer_is_scheduled() {
1010		for _ in 0..100 {
1011			let client = mock_client_without_state();
1012			let mut provider = MockWarpSyncProvider::<Block>::new();
1013			provider
1014				.expect_current_authorities()
1015				.once()
1016				.return_const(AuthorityList::default());
1017			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1018			let mut warp_sync = WarpSync::new(
1019				Arc::new(client),
1020				config,
1021				None,
1022				Arc::new(MockBlockDownloader::new()),
1023				None,
1024			);
1025
1026			for best_number in 1..11 {
1027				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1028			}
1029
1030			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None);
1031			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number >= 6);
1032		}
1033	}
1034
1035	#[test]
1036	fn min_best_number_peer_is_scheduled() {
1037		for _ in 0..10 {
1038			let client = mock_client_without_state();
1039			let mut provider = MockWarpSyncProvider::<Block>::new();
1040			provider
1041				.expect_current_authorities()
1042				.once()
1043				.return_const(AuthorityList::default());
1044			let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1045			let mut warp_sync = WarpSync::new(
1046				Arc::new(client),
1047				config,
1048				None,
1049				Arc::new(MockBlockDownloader::new()),
1050				None,
1051			);
1052
1053			for best_number in 1..11 {
1054				warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1055			}
1056
1057			let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1058			assert!(warp_sync.peers.get(&peer_id.unwrap()).unwrap().best_number == 10);
1059		}
1060	}
1061
1062	#[test]
1063	fn backedoff_number_peer_is_not_scheduled() {
1064		let client = mock_client_without_state();
1065		let mut provider = MockWarpSyncProvider::<Block>::new();
1066		provider
1067			.expect_current_authorities()
1068			.once()
1069			.return_const(AuthorityList::default());
1070		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1071		let mut warp_sync = WarpSync::new(
1072			Arc::new(client),
1073			config,
1074			None,
1075			Arc::new(MockBlockDownloader::new()),
1076			None,
1077		);
1078
1079		for best_number in 1..11 {
1080			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1081		}
1082
1083		let ninth_peer =
1084			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0;
1085		let tenth_peer =
1086			*warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0;
1087
1088		// Disconnecting a peer without an inflight request has no effect on persistent states.
1089		warp_sync.remove_peer(&tenth_peer);
1090		assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1091
1092		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1093		let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1094		assert_eq!(tenth_peer, peer_id.unwrap());
1095		warp_sync.remove_peer(&tenth_peer);
1096
1097		// Peer is backed off.
1098		assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer));
1099
1100		// No peer available for 10'th best block because of the backoff.
1101		warp_sync.add_peer(tenth_peer, H256::random(), 10);
1102		let peer_id: Option<PeerId> =
1103			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10));
1104		assert!(peer_id.is_none());
1105
1106		// Other requests can still happen.
1107		let peer_id: Option<PeerId> =
1108			warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9));
1109		assert_eq!(ninth_peer, peer_id.unwrap());
1110	}
1111
1112	#[test]
1113	fn no_warp_proof_request_in_another_phase() {
1114		let client = mock_client_without_state();
1115		let mut provider = MockWarpSyncProvider::<Block>::new();
1116		provider
1117			.expect_current_authorities()
1118			.once()
1119			.return_const(AuthorityList::default());
1120		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1121		let mut warp_sync = WarpSync::new(
1122			Arc::new(client),
1123			config,
1124			Some(ProtocolName::Static("")),
1125			Arc::new(MockBlockDownloader::new()),
1126			None,
1127		);
1128
1129		// Make sure we have enough peers to make a request.
1130		for best_number in 1..11 {
1131			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1132		}
1133
1134		// Manually set to another phase.
1135		warp_sync.phase = Phase::TargetBlock(<Block as BlockT>::Header::new(
1136			1,
1137			Default::default(),
1138			Default::default(),
1139			Default::default(),
1140			Default::default(),
1141		));
1142
1143		// No request is made.
1144		assert!(warp_sync.warp_proof_request().is_none());
1145	}
1146
1147	#[test]
1148	fn warp_proof_request_starts_at_last_hash() {
1149		let client = mock_client_without_state();
1150		let mut provider = MockWarpSyncProvider::<Block>::new();
1151		provider
1152			.expect_current_authorities()
1153			.once()
1154			.return_const(AuthorityList::default());
1155		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1156		let mut warp_sync = WarpSync::new(
1157			Arc::new(client),
1158			config,
1159			Some(ProtocolName::Static("")),
1160			Arc::new(MockBlockDownloader::new()),
1161			None,
1162		);
1163
1164		// Make sure we have enough peers to make a request.
1165		for best_number in 1..11 {
1166			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1167		}
1168		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1169
1170		let known_last_hash = Hash::random();
1171
1172		// Manually set last hash to known value.
1173		match &mut warp_sync.phase {
1174			Phase::WarpProof { last_hash, .. } => {
1175				*last_hash = known_last_hash;
1176			},
1177			_ => panic!("Invalid phase."),
1178		}
1179
1180		let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap();
1181		assert_eq!(request.begin, known_last_hash);
1182	}
1183
1184	#[test]
1185	fn no_parallel_warp_proof_requests() {
1186		let client = mock_client_without_state();
1187		let mut provider = MockWarpSyncProvider::<Block>::new();
1188		provider
1189			.expect_current_authorities()
1190			.once()
1191			.return_const(AuthorityList::default());
1192		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1193		let mut warp_sync = WarpSync::new(
1194			Arc::new(client),
1195			config,
1196			Some(ProtocolName::Static("")),
1197			Arc::new(MockBlockDownloader::new()),
1198			None,
1199		);
1200
1201		// Make sure we have enough peers to make requests.
1202		for best_number in 1..11 {
1203			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1204		}
1205		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1206
1207		// First request is made.
1208		assert!(warp_sync.warp_proof_request().is_some());
1209		// Second request is not made.
1210		assert!(warp_sync.warp_proof_request().is_none());
1211	}
1212
1213	#[test]
1214	fn bad_warp_proof_response_drops_peer() {
1215		let client = mock_client_without_state();
1216		let mut provider = MockWarpSyncProvider::<Block>::new();
1217		provider
1218			.expect_current_authorities()
1219			.once()
1220			.return_const(AuthorityList::default());
1221		// Warp proof verification fails.
1222		provider.expect_verify().return_once(|_proof, _set_id, _authorities| {
1223			Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure")))
1224		});
1225		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1226		let mut warp_sync = WarpSync::new(
1227			Arc::new(client),
1228			config,
1229			Some(ProtocolName::Static("")),
1230			Arc::new(MockBlockDownloader::new()),
1231			None,
1232		);
1233
1234		// Make sure we have enough peers to make a request.
1235		for best_number in 1..11 {
1236			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1237		}
1238		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1239
1240		let network_provider = NetworkServiceProvider::new();
1241		let network_handle = network_provider.handle();
1242
1243		// Consume `SendWarpProofRequest` action.
1244		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1245		assert_eq!(actions.len(), 1);
1246		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1247			panic!("Invalid action");
1248		};
1249
1250		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1251
1252		// We only interested in already generated actions, not new requests.
1253		let actions = std::mem::take(&mut warp_sync.actions);
1254		assert_eq!(actions.len(), 1);
1255		assert!(matches!(
1256			actions[0],
1257			SyncingAction::DropPeer(BadPeer(peer_id, _rep)) if peer_id == request_peer_id
1258		));
1259		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1260	}
1261
1262	#[test]
1263	fn partial_warp_proof_doesnt_advance_phase() {
1264		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1265		let mut provider = MockWarpSyncProvider::<Block>::new();
1266		provider
1267			.expect_current_authorities()
1268			.once()
1269			.return_const(AuthorityList::default());
1270		let target_block = BlockBuilderBuilder::new(&*client)
1271			.on_parent_block(client.chain_info().best_hash)
1272			.with_parent_block_number(client.chain_info().best_number)
1273			.build()
1274			.unwrap()
1275			.build()
1276			.unwrap()
1277			.block;
1278		let target_header = target_block.header().clone();
1279		let justifications = Justifications::new(vec![(GRANDPA_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1280		// Warp proof is partial.
1281		{
1282			let target_header = target_header.clone();
1283			let justifications = justifications.clone();
1284			provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1285				Ok(VerificationResult::Partial(
1286					set_id,
1287					authorities,
1288					target_header.hash(),
1289					vec![(target_header, justifications)],
1290				))
1291			});
1292		}
1293		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1294		let mut warp_sync = WarpSync::new(
1295			client,
1296			config,
1297			Some(ProtocolName::Static("")),
1298			Arc::new(MockBlockDownloader::new()),
1299			None,
1300		);
1301
1302		// Make sure we have enough peers to make a request.
1303		for best_number in 1..11 {
1304			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1305		}
1306		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1307
1308		let network_provider = NetworkServiceProvider::new();
1309		let network_handle = network_provider.handle();
1310
1311		// Consume `SendWarpProofRequest` action.
1312		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1313		assert_eq!(actions.len(), 1);
1314		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1315			panic!("Invalid action");
1316		};
1317
1318		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1319
1320		assert_eq!(warp_sync.actions.len(), 1);
1321		let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1322		else {
1323			panic!("Expected `ImportBlocks` action.");
1324		};
1325		assert_eq!(origin, BlockOrigin::NetworkInitialSync);
1326		assert_eq!(blocks.len(), 1);
1327		let import_block = blocks.pop().unwrap();
1328		assert_eq!(
1329			import_block,
1330			IncomingBlock {
1331				hash: target_header.hash(),
1332				header: Some(target_header),
1333				body: None,
1334				indexed_body: None,
1335				justifications: Some(justifications),
1336				origin: Some(request_peer_id),
1337				allow_missing_state: true,
1338				skip_execution: true,
1339				import_existing: false,
1340				state: None,
1341			}
1342		);
1343		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1344	}
1345
1346	#[test]
1347	fn complete_warp_proof_advances_phase() {
1348		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1349		let mut provider = MockWarpSyncProvider::<Block>::new();
1350		provider
1351			.expect_current_authorities()
1352			.once()
1353			.return_const(AuthorityList::default());
1354		let target_block = BlockBuilderBuilder::new(&*client)
1355			.on_parent_block(client.chain_info().best_hash)
1356			.with_parent_block_number(client.chain_info().best_number)
1357			.build()
1358			.unwrap()
1359			.build()
1360			.unwrap()
1361			.block;
1362		let target_header = target_block.header().clone();
1363		let justifications = Justifications::new(vec![(GRANDPA_ENGINE_ID, vec![1, 2, 3, 4, 5])]);
1364		// Warp proof is complete.
1365		{
1366			let target_header = target_header.clone();
1367			let justifications = justifications.clone();
1368			provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1369				Ok(VerificationResult::Complete(
1370					set_id,
1371					authorities,
1372					target_header.clone(),
1373					vec![(target_header, justifications)],
1374				))
1375			});
1376		}
1377		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1378		let mut warp_sync = WarpSync::new(
1379			client,
1380			config,
1381			Some(ProtocolName::Static("")),
1382			Arc::new(MockBlockDownloader::new()),
1383			None,
1384		);
1385
1386		// Make sure we have enough peers to make a request.
1387		for best_number in 1..11 {
1388			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1389		}
1390		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1391
1392		let network_provider = NetworkServiceProvider::new();
1393		let network_handle = network_provider.handle();
1394
1395		// Consume `SendWarpProofRequest` action.
1396		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1397		assert_eq!(actions.len(), 1);
1398		let SyncingAction::StartRequest { peer_id: request_peer_id, .. } = actions[0] else {
1399			panic!("Invalid action.");
1400		};
1401
1402		warp_sync.on_warp_proof_response(&request_peer_id, EncodedProof(Vec::new()));
1403
1404		assert_eq!(warp_sync.actions.len(), 1);
1405		let SyncingAction::ImportBlocks { origin, mut blocks } = warp_sync.actions.pop().unwrap()
1406		else {
1407			panic!("Expected `ImportBlocks` action.");
1408		};
1409		assert_eq!(origin, BlockOrigin::NetworkInitialSync);
1410		assert_eq!(blocks.len(), 1);
1411		let import_block = blocks.pop().unwrap();
1412		assert_eq!(
1413			import_block,
1414			IncomingBlock {
1415				hash: target_header.hash(),
1416				header: Some(target_header),
1417				body: None,
1418				indexed_body: None,
1419				justifications: Some(justifications),
1420				origin: Some(request_peer_id),
1421				allow_missing_state: true,
1422				skip_execution: true,
1423				import_existing: false,
1424				state: None,
1425			}
1426		);
1427		assert!(
1428			matches!(warp_sync.phase, Phase::TargetBlock(header) if header == *target_block.header())
1429		);
1430	}
1431
1432	#[test]
1433	fn no_target_block_requests_in_another_phase() {
1434		let client = mock_client_without_state();
1435		let mut provider = MockWarpSyncProvider::<Block>::new();
1436		provider
1437			.expect_current_authorities()
1438			.once()
1439			.return_const(AuthorityList::default());
1440		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1441		let mut warp_sync = WarpSync::new(
1442			Arc::new(client),
1443			config,
1444			None,
1445			Arc::new(MockBlockDownloader::new()),
1446			None,
1447		);
1448
1449		// Make sure we have enough peers to make a request.
1450		for best_number in 1..11 {
1451			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1452		}
1453		// We are not in `Phase::TargetBlock`
1454		assert!(matches!(warp_sync.phase, Phase::WarpProof { .. }));
1455
1456		// No request is made.
1457		assert!(warp_sync.target_block_request().is_none());
1458	}
1459
1460	#[test]
1461	fn target_block_request_is_correct() {
1462		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1463		let mut provider = MockWarpSyncProvider::<Block>::new();
1464		provider
1465			.expect_current_authorities()
1466			.once()
1467			.return_const(AuthorityList::default());
1468		let target_block = BlockBuilderBuilder::new(&*client)
1469			.on_parent_block(client.chain_info().best_hash)
1470			.with_parent_block_number(client.chain_info().best_number)
1471			.build()
1472			.unwrap()
1473			.build()
1474			.unwrap()
1475			.block;
1476		let target_header = target_block.header().clone();
1477		// Warp proof is complete.
1478		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1479			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1480		});
1481		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1482		let mut warp_sync =
1483			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1484
1485		// Make sure we have enough peers to make a request.
1486		for best_number in 1..11 {
1487			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1488		}
1489
1490		// Manually set `TargetBlock` phase.
1491		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1492
1493		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1494		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1495		assert_eq!(
1496			request.fields,
1497			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1498		);
1499		assert_eq!(request.max, Some(1));
1500	}
1501
1502	#[test]
1503	fn externally_set_target_block_is_requested() {
1504		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1505		let target_block = BlockBuilderBuilder::new(&*client)
1506			.on_parent_block(client.chain_info().best_hash)
1507			.with_parent_block_number(client.chain_info().best_number)
1508			.build()
1509			.unwrap()
1510			.build()
1511			.unwrap()
1512			.block;
1513		let target_header = target_block.header().clone();
1514		let config = WarpSyncConfig::WithTarget(target_header);
1515		let mut warp_sync =
1516			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1517
1518		// Make sure we have enough peers to make a request.
1519		for best_number in 1..11 {
1520			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1521		}
1522
1523		assert!(matches!(warp_sync.phase, Phase::TargetBlock(_)));
1524
1525		let (_peer_id, request) = warp_sync.target_block_request().unwrap();
1526		assert_eq!(request.from, FromBlock::Hash(target_block.header().hash()));
1527		assert_eq!(
1528			request.fields,
1529			BlockAttributes::HEADER | BlockAttributes::BODY | BlockAttributes::JUSTIFICATION
1530		);
1531		assert_eq!(request.max, Some(1));
1532	}
1533
1534	#[test]
1535	fn no_parallel_target_block_requests() {
1536		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1537		let mut provider = MockWarpSyncProvider::<Block>::new();
1538		provider
1539			.expect_current_authorities()
1540			.once()
1541			.return_const(AuthorityList::default());
1542		let target_block = BlockBuilderBuilder::new(&*client)
1543			.on_parent_block(client.chain_info().best_hash)
1544			.with_parent_block_number(client.chain_info().best_number)
1545			.build()
1546			.unwrap()
1547			.build()
1548			.unwrap()
1549			.block;
1550		let target_header = target_block.header().clone();
1551		// Warp proof is complete.
1552		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1553			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1554		});
1555		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1556		let mut warp_sync =
1557			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1558
1559		// Make sure we have enough peers to make a request.
1560		for best_number in 1..11 {
1561			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1562		}
1563
1564		// Manually set `TargetBlock` phase.
1565		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1566
1567		// First target block request is made.
1568		assert!(warp_sync.target_block_request().is_some());
1569		// No parallel request is made.
1570		assert!(warp_sync.target_block_request().is_none());
1571	}
1572
1573	#[test]
1574	fn target_block_response_with_no_blocks_drops_peer() {
1575		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1576		let mut provider = MockWarpSyncProvider::<Block>::new();
1577		provider
1578			.expect_current_authorities()
1579			.once()
1580			.return_const(AuthorityList::default());
1581		let target_block = BlockBuilderBuilder::new(&*client)
1582			.on_parent_block(client.chain_info().best_hash)
1583			.with_parent_block_number(client.chain_info().best_number)
1584			.build()
1585			.unwrap()
1586			.build()
1587			.unwrap()
1588			.block;
1589		let target_header = target_block.header().clone();
1590		// Warp proof is complete.
1591		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1592			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1593		});
1594		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1595		let mut warp_sync =
1596			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1597
1598		// Make sure we have enough peers to make a request.
1599		for best_number in 1..11 {
1600			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1601		}
1602
1603		// Manually set `TargetBlock` phase.
1604		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1605
1606		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1607
1608		// Empty block response received.
1609		let response = Vec::new();
1610		// Peer is dropped.
1611		assert!(matches!(
1612			warp_sync.on_block_response_inner(peer_id, request, response),
1613			Err(BadPeer(id, _rep)) if id == peer_id,
1614		));
1615	}
1616
1617	#[test]
1618	fn target_block_response_with_extra_blocks_drops_peer() {
1619		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1620		let mut provider = MockWarpSyncProvider::<Block>::new();
1621		provider
1622			.expect_current_authorities()
1623			.once()
1624			.return_const(AuthorityList::default());
1625		let target_block = BlockBuilderBuilder::new(&*client)
1626			.on_parent_block(client.chain_info().best_hash)
1627			.with_parent_block_number(client.chain_info().best_number)
1628			.build()
1629			.unwrap()
1630			.build()
1631			.unwrap()
1632			.block;
1633
1634		let mut extra_block_builder = BlockBuilderBuilder::new(&*client)
1635			.on_parent_block(client.chain_info().best_hash)
1636			.with_parent_block_number(client.chain_info().best_number)
1637			.build()
1638			.unwrap();
1639		extra_block_builder
1640			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1641			.unwrap();
1642		let extra_block = extra_block_builder.build().unwrap().block;
1643
1644		let target_header = target_block.header().clone();
1645		// Warp proof is complete.
1646		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1647			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1648		});
1649		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1650		let mut warp_sync =
1651			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1652
1653		// Make sure we have enough peers to make a request.
1654		for best_number in 1..11 {
1655			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1656		}
1657
1658		// Manually set `TargetBlock` phase.
1659		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1660
1661		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1662
1663		// Block response with extra blocks received.
1664		let response = vec![
1665			BlockData::<Block> {
1666				hash: target_block.header().hash(),
1667				header: Some(target_block.header().clone()),
1668				body: Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1669				indexed_body: None,
1670				receipt: None,
1671				message_queue: None,
1672				justification: None,
1673				justifications: None,
1674			},
1675			BlockData::<Block> {
1676				hash: extra_block.header().hash(),
1677				header: Some(extra_block.header().clone()),
1678				body: Some(extra_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1679				indexed_body: None,
1680				receipt: None,
1681				message_queue: None,
1682				justification: None,
1683				justifications: None,
1684			},
1685		];
1686		// Peer is dropped.
1687		assert!(matches!(
1688			warp_sync.on_block_response_inner(peer_id, request, response),
1689			Err(BadPeer(id, _rep)) if id == peer_id,
1690		));
1691	}
1692
1693	#[test]
1694	fn target_block_response_with_wrong_block_drops_peer() {
1695		pezsp_tracing::try_init_simple();
1696
1697		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1698		let mut provider = MockWarpSyncProvider::<Block>::new();
1699		provider
1700			.expect_current_authorities()
1701			.once()
1702			.return_const(AuthorityList::default());
1703		let target_block = BlockBuilderBuilder::new(&*client)
1704			.on_parent_block(client.chain_info().best_hash)
1705			.with_parent_block_number(client.chain_info().best_number)
1706			.build()
1707			.unwrap()
1708			.build()
1709			.unwrap()
1710			.block;
1711
1712		let mut wrong_block_builder = BlockBuilderBuilder::new(&*client)
1713			.on_parent_block(client.chain_info().best_hash)
1714			.with_parent_block_number(client.chain_info().best_number)
1715			.build()
1716			.unwrap();
1717		wrong_block_builder
1718			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1719			.unwrap();
1720		let wrong_block = wrong_block_builder.build().unwrap().block;
1721
1722		let target_header = target_block.header().clone();
1723		// Warp proof is complete.
1724		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1725			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1726		});
1727		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1728		let mut warp_sync =
1729			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1730
1731		// Make sure we have enough peers to make a request.
1732		for best_number in 1..11 {
1733			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1734		}
1735
1736		// Manually set `TargetBlock` phase.
1737		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1738
1739		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1740
1741		// Wrong block received.
1742		let response = vec![BlockData::<Block> {
1743			hash: wrong_block.header().hash(),
1744			header: Some(wrong_block.header().clone()),
1745			body: Some(wrong_block.extrinsics().iter().cloned().collect::<Vec<_>>()),
1746			indexed_body: None,
1747			receipt: None,
1748			message_queue: None,
1749			justification: None,
1750			justifications: None,
1751		}];
1752		// Peer is dropped.
1753		assert!(matches!(
1754			warp_sync.on_block_response_inner(peer_id, request, response),
1755			Err(BadPeer(id, _rep)) if id == peer_id,
1756		));
1757	}
1758
1759	#[test]
1760	fn correct_target_block_response_sets_strategy_result() {
1761		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
1762		let mut provider = MockWarpSyncProvider::<Block>::new();
1763		provider
1764			.expect_current_authorities()
1765			.once()
1766			.return_const(AuthorityList::default());
1767		let mut target_block_builder = BlockBuilderBuilder::new(&*client)
1768			.on_parent_block(client.chain_info().best_hash)
1769			.with_parent_block_number(client.chain_info().best_number)
1770			.build()
1771			.unwrap();
1772		target_block_builder
1773			.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6]))
1774			.unwrap();
1775		let target_block = target_block_builder.build().unwrap().block;
1776		let target_header = target_block.header().clone();
1777		// Warp proof is complete.
1778		provider.expect_verify().return_once(move |_proof, set_id, authorities| {
1779			Ok(VerificationResult::Complete(set_id, authorities, target_header, Default::default()))
1780		});
1781		let config = WarpSyncConfig::WithProvider(Arc::new(provider));
1782		let mut warp_sync =
1783			WarpSync::new(client, config, None, Arc::new(MockBlockDownloader::new()), None);
1784
1785		// Make sure we have enough peers to make a request.
1786		for best_number in 1..11 {
1787			warp_sync.add_peer(PeerId::random(), Hash::random(), best_number);
1788		}
1789
1790		// Manually set `TargetBlock` phase.
1791		warp_sync.phase = Phase::TargetBlock(target_block.header().clone());
1792
1793		let (peer_id, request) = warp_sync.target_block_request().unwrap();
1794
1795		// Correct block received.
1796		let body = Some(target_block.extrinsics().iter().cloned().collect::<Vec<_>>());
1797		let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
1798		let response = vec![BlockData::<Block> {
1799			hash: target_block.header().hash(),
1800			header: Some(target_block.header().clone()),
1801			body: body.clone(),
1802			indexed_body: None,
1803			receipt: None,
1804			message_queue: None,
1805			justification: None,
1806			justifications: justifications.clone(),
1807		}];
1808
1809		assert!(warp_sync.on_block_response_inner(peer_id, request, response).is_ok());
1810
1811		let network_provider = NetworkServiceProvider::new();
1812		let network_handle = network_provider.handle();
1813
1814		// Strategy finishes.
1815		let actions = warp_sync.actions(&network_handle).collect::<Vec<_>>();
1816		assert_eq!(actions.len(), 1);
1817		assert!(matches!(actions[0], SyncingAction::Finished));
1818
1819		// With correct result.
1820		let result = warp_sync.take_result().unwrap();
1821		assert_eq!(result.target_header, *target_block.header());
1822		assert_eq!(result.target_body, body);
1823		assert_eq!(result.target_justifications, justifications);
1824	}
1825}