// soil_network/sync/strategy/chain_sync.rs
// This file is part of Soil.

// Copyright (C) Soil contributors.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

//! Contains the state of the chain synchronization process
//!
//! At any given point in time, a running node tries as much as possible to be at the head of the
//! chain. This module handles the logic of which blocks to request from remotes, and processing
//! responses. It yields blocks to check and potentially move to the database.
//!
//! # Usage
//!
//! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on
//! the network, or whenever a block has been successfully verified, call the appropriate method in
//! order to update it.
19use crate::{
20	sync::{
21		block_relay_protocol::{BlockDownloader, BlockResponseError},
22		blocks::BlockCollection,
23		justification_requests::ExtraRequests,
24		schema::v1::{StateRequest, StateResponse},
25		service::network::NetworkServiceHandle,
26		strategy::{
27			disconnected_peers::DisconnectedPeers,
28			state_sync::{ImportResult, StateSync, StateSyncProvider},
29			warp::{WarpSyncPhase, WarpSyncProgress},
30			StrategyKey, SyncingAction, SyncingStrategy,
31		},
32		types::{BadPeer, SyncState, SyncStatus},
33	},
34	LOG_TARGET,
35};
36
37use codec::Encode;
38use futures::{channel::oneshot, FutureExt};
39use log::{debug, error, info, trace, warn};
40use soil_prometheus::{register, Gauge, PrometheusError, Registry, U64};
41use prost::Message;
42use soil_client::blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
43use soil_client::client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
44use soil_client::consensus::{BlockOrigin, BlockStatus};
45use soil_client::import::{BlockImportError, BlockImportStatus, IncomingBlock};
46use soil_network::common::sync::message::{
47	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
48};
49use soil_network::types::PeerId;
50use soil_network::{IfDisconnected, ProtocolName};
51use subsoil::arithmetic::traits::Saturating;
52use subsoil::runtime::{
53	traits::{
54		Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
55	},
56	EncodedJustification, Justifications,
57};
58
59use std::{
60	any::Any,
61	collections::{HashMap, HashSet},
62	fmt,
63	ops::{AddAssign, Range},
64	sync::Arc,
65};
66
#[cfg(test)]
mod test;

/// Maximum number of blocks to keep in the import queue at any one time.
const MAX_IMPORTING_BLOCKS: usize = 2048;

/// Maximum blocks to download ahead of any gap.
const MAX_DOWNLOAD_AHEAD: u32 = 2048;

/// Maximum blocks to look backwards. The gap is the difference between the highest block and the
/// common block of a node.
const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;

/// Pick the state to sync as the latest finalized number minus this.
const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;

/// We use a heuristic that with a high likelihood, by the time
/// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same
/// chain as (or at least closer to) the peer so we want to delay
/// the ancestor search to not waste time doing that when we are
/// so far behind.
const MAJOR_SYNC_BLOCKS: u8 = 5;
89
/// Peer reputation adjustments applied by the chain-sync strategy.
mod rep {
	use soil_network::ReputationChange as Rep;
	/// Reputation change when a peer sent us a message that led to a
	/// database read error.
	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");

	/// Reputation change when a peer sent us a status message with a different
	/// genesis than us.
	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");

	/// Reputation change for peers which send us a block with an incomplete header.
	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");

	/// Reputation change for peers which send us a block which we fail to verify.
	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");

	/// Reputation change for peers which send us a known bad block.
	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");

	/// Peer did not provide us with advertised block data.
	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");

	/// Reputation change for peers which send us non-requested block data.
	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");

	/// Reputation change for peers which send us a block with bad justifications.
	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");

	/// Reputation change when a peer sent us invalid ancestry result.
	// NOTE(review): the label duplicates `BLOCKCHAIN_READ_ERROR`'s "DB Error";
	// presumably it was meant to say something like "Unknown ancestor". The label is a
	// runtime string that may surface in reputation reporting, so confirm before changing.
	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");

	/// Peer response data does not have requested bits.
	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");

	/// We received a message that failed to decode.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
127
/// Prometheus metrics maintained by `ChainSync`.
struct Metrics {
	/// Number of blocks currently queued for import (mirrors `ChainSync::queue_blocks`).
	queued_blocks: Gauge<U64>,
	/// Number of active fork sync targets (mirrors `ChainSync::fork_targets`).
	fork_targets: Gauge<U64>,
}
132
133impl Metrics {
134	fn register(r: &Registry) -> Result<Self, PrometheusError> {
135		Ok(Self {
136			queued_blocks: {
137				let g =
138					Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
139				register(g, r)?
140			},
141			fork_targets: {
142				let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
143				register(g, r)?
144			},
145		})
146	}
147}
148
/// Which peers are currently eligible for new block requests.
#[derive(Debug, Clone)]
enum AllowedRequests {
	/// Only the listed peers may be sent new requests.
	Some(HashSet<PeerId>),
	/// Every connected peer may be sent new requests.
	All,
}
154
155impl AllowedRequests {
156	fn add(&mut self, id: &PeerId) {
157		if let Self::Some(ref mut set) = self {
158			set.insert(*id);
159		}
160	}
161
162	fn take(&mut self) -> Self {
163		std::mem::take(self)
164	}
165
166	fn set_all(&mut self) {
167		*self = Self::All;
168	}
169
170	fn contains(&self, id: &PeerId) -> bool {
171		match self {
172			Self::Some(set) => set.contains(id),
173			Self::All => true,
174		}
175	}
176
177	fn is_empty(&self) -> bool {
178		match self {
179			Self::Some(set) => set.is_empty(),
180			Self::All => false,
181		}
182	}
183
184	fn clear(&mut self) {
185		std::mem::take(self);
186	}
187}
188
189impl Default for AllowedRequests {
190	fn default() -> Self {
191		Self::Some(HashSet::default())
192	}
193}
194
/// Statistics for gap sync operations.
#[derive(Debug, Default, Clone)]
struct GapSyncStats {
	/// Size of headers downloaded during gap sync, in bytes.
	header_bytes: usize,
	/// Size of bodies downloaded during gap sync, in bytes.
	body_bytes: usize,
	/// Size of justifications downloaded during gap sync, in bytes.
	justification_bytes: usize,
}
205
206impl GapSyncStats {
207	fn new() -> Self {
208		Self::default()
209	}
210
211	fn total_bytes(&self) -> usize {
212		self.header_bytes + self.body_bytes + self.justification_bytes
213	}
214
215	fn bytes_to_mib(bytes: usize) -> f64 {
216		bytes as f64 / (1024.0 * 1024.0)
217	}
218}
219
220impl fmt::Display for GapSyncStats {
221	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222		let total = self.total_bytes();
223		write!(
224			f,
225			"hdr: {} B ({:.2} MiB), body: {} B ({:.2} MiB), just: {} B ({:.2} MiB) | total: {} B ({:.2} MiB)",
226			self.header_bytes,
227			Self::bytes_to_mib(self.header_bytes),
228			self.body_bytes,
229			Self::bytes_to_mib(self.body_bytes),
230			self.justification_bytes,
231			Self::bytes_to_mib(self.justification_bytes),
232			total,
233			Self::bytes_to_mib(total),
234		)
235	}
236}
237
238impl AddAssign for GapSyncStats {
239	fn add_assign(&mut self, other: Self) {
240		self.header_bytes += other.header_bytes;
241		self.body_bytes += other.body_bytes;
242		self.justification_bytes += other.justification_bytes;
243	}
244}
245
/// State of the block-history download filling the gap left behind by warp sync.
struct GapSync<B: BlockT> {
	/// Blocks downloaded so far for the gap range.
	blocks: BlockCollection<B>,
	/// Highest gap block number queued for import so far.
	best_queued_number: NumberFor<B>,
	/// Block number at which the gap is considered filled
	/// (presumably checked by `complete_gap_if_target` — confirm against its definition).
	target: NumberFor<B>,
	/// Accumulated download statistics for this gap sync.
	stats: GapSyncStats,
}
252
/// Sync operation mode.
///
/// The mode determines which block attributes are requested from peers; see
/// [`Self::required_block_attributes`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ChainSyncMode {
	/// Full block download and verification.
	Full,
	/// Download blocks and the latest state.
	LightState {
		/// Skip state proof download and verification.
		skip_proofs: bool,
		/// Download indexed transactions for recent blocks.
		storage_chain_mode: bool,
	},
}
266
267impl ChainSyncMode {
268	/// Returns the base block attributes required for this sync mode.
269	pub fn required_block_attributes(&self, is_gap: bool, is_archive: bool) -> BlockAttributes {
270		let attrs = match self {
271			ChainSyncMode::Full => {
272				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
273			},
274			ChainSyncMode::LightState { storage_chain_mode: false, .. } => {
275				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
276			},
277			ChainSyncMode::LightState { storage_chain_mode: true, .. } => {
278				BlockAttributes::HEADER
279					| BlockAttributes::JUSTIFICATION
280					| BlockAttributes::INDEXED_BODY
281			},
282		};
283		// Skip body requests for gap sync only if not in archive mode.
284		// Archive nodes need bodies to maintain complete block history.
285		if is_gap && !is_archive {
286			attrs & !BlockAttributes::BODY
287		} else {
288			attrs
289		}
290	}
291}
292
/// All the data we have about a Peer that we are trying to sync with
#[derive(Debug, Clone)]
pub(crate) struct PeerSync<B: BlockT> {
	/// Peer id of this peer.
	pub peer_id: PeerId,
	/// The common number is the block number that is a common point of
	/// ancestry for both our chains (as far as we know).
	pub common_number: NumberFor<B>,
	/// The hash of the best block that we've seen for this peer.
	pub best_hash: B::Hash,
	/// The number of the best block that we've seen for this peer.
	pub best_number: NumberFor<B>,
	/// The state of syncing this peer is in for us, generally categorized
	/// into `Available` or "busy" with something as defined by `PeerSyncState`.
	pub state: PeerSyncState<B>,
}
309
310impl<B: BlockT> PeerSync<B> {
311	/// Update the `common_number` iff `new_common > common_number`.
312	fn update_common_number(&mut self, new_common: NumberFor<B>) {
313		if self.common_number < new_common {
314			trace!(
315				target: LOG_TARGET,
316				"Updating peer {} common number from={} => to={}.",
317				self.peer_id,
318				self.common_number,
319				new_common,
320			);
321			self.common_number = new_common;
322		}
323	}
324}
325
/// A fork that peers announced (or that was explicitly requested) and that we track for download.
struct ForkTarget<B: BlockT> {
	/// Block number of the fork head.
	number: NumberFor<B>,
	/// Parent hash of the fork head when known from an announcement; `None` for
	/// explicit `set_sync_fork_request` targets.
	parent_hash: Option<B::Hash>,
	/// Peers known to have this fork.
	peers: HashSet<PeerId>,
}
331
/// The state of syncing between a Peer and ourselves.
///
/// Generally two categories, "busy" or `Available`. If busy, the enum
/// defines what we are busy with.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum PeerSyncState<B: BlockT> {
	/// Available for sync requests (see [`Self::is_available`]).
	Available,
	/// Searching for ancestors the Peer has in common with us.
	AncestorSearch {
		/// The best queued number when starting the ancestor search.
		start: NumberFor<B>,
		/// The current block that is being downloaded.
		current: NumberFor<B>,
		/// The state of the search.
		state: AncestorSearchState<B>,
	},
	/// Actively downloading new blocks, starting from the given Number.
	DownloadingNew(NumberFor<B>),
	/// Downloading a stale block with given Hash. Stale means that it is a
	/// block with a number that is lower than our best number. It might be
	/// from a fork and not necessarily already imported.
	DownloadingStale(B::Hash),
	/// Downloading justification for given block hash.
	DownloadingJustification(B::Hash),
	/// Downloading state.
	DownloadingState,
	/// Actively downloading block history after warp sync.
	DownloadingGap(NumberFor<B>),
}
362
363impl<B: BlockT> PeerSyncState<B> {
364	pub fn is_available(&self) -> bool {
365		matches!(self, Self::Available)
366	}
367}
368
/// The main data structure which contains all the state for a chains
/// active syncing strategy.
pub struct ChainSync<B: BlockT, Client> {
	/// Chain client.
	client: Arc<Client>,
	/// The active peers that we are using to sync and their PeerSync status
	peers: HashMap<PeerId, PeerSync<B>>,
	/// Bookkeeping for recently disconnected peers; may yield a `BadPeer` when a
	/// peer disconnects while a request to it is still in flight (see `remove_peer`).
	disconnected_peers: DisconnectedPeers,
	/// A `BlockCollection` of blocks that are being downloaded from peers
	blocks: BlockCollection<B>,
	/// The best block number in our queue of blocks to import
	best_queued_number: NumberFor<B>,
	/// The best block hash in our queue of blocks to import
	best_queued_hash: B::Hash,
	/// Current mode (full/light)
	mode: ChainSyncMode,
	/// Any extra justification requests.
	extra_justifications: ExtraRequests<B>,
	/// A set of hashes of blocks that are being downloaded or have been
	/// downloaded and are queued for import.
	queue_blocks: HashSet<B::Hash>,
	/// A pending attempt to start the state sync.
	///
	/// The initiation of state sync may be deferred in cases where other conditions
	/// are not yet met when the finalized block notification is received, such as
	/// when `queue_blocks` is not empty or there are no peers. This field holds the
	/// necessary information to attempt the state sync at a later point when
	/// conditions are satisfied.
	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
	/// Fork sync targets.
	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
	/// A set of peers for which there might be potential block requests
	allowed_requests: AllowedRequests,
	/// Maximum number of peers to ask the same blocks in parallel.
	max_parallel_downloads: u32,
	/// Maximum blocks per request.
	max_blocks_per_request: u32,
	/// Protocol name used to send out state requests
	state_request_protocol_name: ProtocolName,
	/// Total number of downloaded blocks.
	downloaded_blocks: usize,
	/// State sync in progress, if any.
	state_sync: Option<StateSync<B, Client>>,
	/// Enable importing existing blocks. This is used after the state download to
	/// catch up to the latest state while re-importing blocks.
	import_existing: bool,
	/// Block downloader
	block_downloader: Arc<dyn BlockDownloader<B>>,
	/// Whether to archive blocks. When `true`, gap sync requests bodies to maintain complete
	/// block history.
	archive_blocks: bool,
	/// Gap download process.
	gap_sync: Option<GapSync<B>>,
	/// Pending actions.
	actions: Vec<SyncingAction<B>>,
	/// Prometheus metrics.
	metrics: Option<Metrics>,
}
427
428impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
429where
430	B: BlockT,
431	Client: HeaderBackend<B>
432		+ BlockBackend<B>
433		+ HeaderMetadata<B, Error = soil_client::blockchain::Error>
434		+ ProofProvider<B>
435		+ Send
436		+ Sync
437		+ 'static,
438{
439	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
440		match self.add_peer_inner(peer_id, best_hash, best_number) {
441			Ok(Some(request)) => {
442				let action = self.create_block_request_action(peer_id, request);
443				self.actions.push(action);
444			},
445			Ok(None) => {},
446			Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
447		}
448	}
449
	/// Remove all sync state associated with a disconnected peer.
	fn remove_peer(&mut self, peer_id: &PeerId) {
		// Forget in-flight block downloads attributed to this peer, for both the main
		// sync and the (optional) gap sync.
		self.blocks.clear_peer_download(peer_id);
		if let Some(gap_sync) = &mut self.gap_sync {
			gap_sync.blocks.clear_peer_download(peer_id)
		}

		// If the peer disconnected while we were waiting on a request from it, the
		// disconnected-peers tracker may decide to report it as a bad peer.
		if let Some(state) = self.peers.remove(peer_id) {
			if !state.state.is_available() {
				if let Some(bad_peer) =
					self.disconnected_peers.on_disconnect_during_request(*peer_id)
				{
					self.actions.push(SyncingAction::DropPeer(bad_peer));
				}
			}
		}

		self.extra_justifications.peer_disconnected(peer_id);
		// Allow new requests to be issued to any remaining peer.
		self.allowed_requests.set_all();
		// Drop fork targets that no longer have any peer to download from.
		self.fork_targets.retain(|_, target| {
			target.peers.remove(peer_id);
			!target.peers.is_empty()
		});
		if let Some(metrics) = &self.metrics {
			metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
		}

		// Removing the peer may have unblocked a contiguous run of downloaded blocks;
		// queue any that are now ready for import.
		let blocks = self.ready_blocks();

		if !blocks.is_empty() {
			self.validate_and_queue_blocks(blocks, false);
		}
	}
482
	/// Handle a pre-validated block announcement from `peer_id`.
	///
	/// Returns `Some((hash, number))` when the caller should update its view of the
	/// peer's best block, `None` otherwise.
	fn on_validated_block_announce(
		&mut self,
		is_best: bool,
		peer_id: PeerId,
		announce: &BlockAnnounce<B::Header>,
	) -> Option<(B::Hash, NumberFor<B>)> {
		let number = *announce.header.number();
		let hash = announce.header.hash();
		// Classify the parent: unknown, known, or known-but-pruned.
		let parent_status =
			self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
		let known_parent = parent_status != BlockStatus::Unknown;
		let ancient_parent = parent_status == BlockStatus::InChainPruned;

		let known = self.is_known(&hash);
		let is_major_syncing = self.is_major_syncing();
		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
			peer
		} else {
			error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
			return Some((hash, number));
		};

		// While an ancestor search is in flight, ignore announcements entirely so the
		// search state machine is not disturbed.
		if let PeerSyncState::AncestorSearch { .. } = peer.state {
			trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
			return None;
		}

		// The node is continuing a known fork if either the block itself is known, the
		// parent is known or the block references the previously announced `best_hash`.
		let continues_known_fork =
			known || known_parent || announce.header.parent_hash() == &peer.best_hash;

		let peer_info = is_best.then(|| {
			// update their best block
			peer.best_number = number;
			peer.best_hash = hash;

			(hash, number)
		});

		// If the announced block is the best they have and is not ahead of us, our common number
		// is either one further ahead or it's the one they just announced, if we know about it.
		if is_best {
			let best_queued_number = self.best_queued_number;

			if known && best_queued_number >= number {
				peer.update_common_number(number);
			} else if announce.header.parent_hash() == &self.best_queued_hash
				|| known_parent && best_queued_number >= number
			{
				peer.update_common_number(number.saturating_sub(One::one()));
			}

			// If this announced block isn't following any known fork, we have to start an
			// ancestor search to find out our real common block. However, we skip this during
			// major sync to avoid pulling peers out of the download pool.
			if !continues_known_fork && !is_major_syncing {
				let current = number.min(best_queued_number);
				// Clamp the common number to our finalized number before starting the search.
				peer.common_number = peer.common_number.min(self.client.info().finalized_number);
				peer.state = PeerSyncState::AncestorSearch {
					current,
					start: best_queued_number,
					state: AncestorSearchState::ExponentialBackoff(One::one()),
				};

				let request = ancestry_request::<B>(current);
				let action = self.create_block_request_action(peer_id, request);
				self.actions.push(action);

				return peer_info;
			}
		}
		self.allowed_requests.add(&peer_id);

		// known block case
		if known || self.is_already_downloading(&hash) {
			trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
			if let Some(target) = self.fork_targets.get_mut(&hash) {
				target.peers.insert(peer_id);
			}
			return peer_info;
		}

		// The parent is ancient (pruned): don't set up a sync target for such a block.
		if ancient_parent {
			trace!(
				target: LOG_TARGET,
				"Ignored ancient block announced from {}: {} {:?}",
				peer_id,
				hash,
				announce.header,
			);
			return peer_info;
		}

		// New fork targets are only recorded while we are idle.
		if self.status().state == SyncState::Idle {
			trace!(
				target: LOG_TARGET,
				"Added sync target for block announced from {}: {} {:?}",
				peer_id,
				hash,
				announce.summary(),
			);
			self.fork_targets
				.entry(hash)
				.or_insert_with(|| {
					if let Some(metrics) = &self.metrics {
						metrics.fork_targets.inc();
					}

					ForkTarget {
						number,
						parent_hash: Some(*announce.header.parent_hash()),
						peers: Default::default(),
					}
				})
				.peers
				.insert(peer_id);
		}

		peer_info
	}
604
	// The implementation is similar to `on_validated_block_announce` with unknown parent hash.
	fn set_sync_fork_request(
		&mut self,
		mut peers: Vec<PeerId>,
		hash: &B::Hash,
		number: NumberFor<B>,
	) {
		if peers.is_empty() {
			// No explicit peer set: fall back to every connected peer that could
			// plausibly have the block.
			peers = self
				.peers
				.iter()
				// Only request blocks from peers who are ahead or on a par.
				.filter(|(_, peer)| peer.best_number >= number)
				.map(|(id, _)| *id)
				.collect();

			debug!(
				target: LOG_TARGET,
				"Explicit sync request for block {hash:?} with no peers specified. \
				Syncing from these peers {peers:?} instead.",
			);
		} else {
			debug!(
				target: LOG_TARGET,
				"Explicit sync request for block {hash:?} with {peers:?}",
			);
		}

		if self.is_known(hash) {
			debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
			return;
		}

		trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
		for peer_id in &peers {
			if let Some(peer) = self.peers.get_mut(peer_id) {
				// Don't interfere with peers that are busy searching for a common ancestor.
				if let PeerSyncState::AncestorSearch { .. } = peer.state {
					continue;
				}

				// Trust the caller: raise the peer's recorded best block if the
				// requested block is ahead of it.
				if number > peer.best_number {
					peer.best_number = number;
					peer.best_hash = *hash;
				}
				self.allowed_requests.add(peer_id);
			}
		}

		// Record (or extend) the fork target; the parent hash is unknown for explicit requests.
		self.fork_targets
			.entry(*hash)
			.or_insert_with(|| {
				if let Some(metrics) = &self.metrics {
					metrics.fork_targets.inc();
				}

				ForkTarget { number, peers: Default::default(), parent_hash: None }
			})
			.peers
			.extend(peers);
	}
665
	/// Schedule an extra justification request for the given block.
	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		// Bind the client locally so the closure below doesn't need to capture `self`
		// while `self.extra_justifications` is borrowed mutably.
		let client = &self.client;
		self.extra_justifications
			.schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
	}
671
	/// Drop every pending extra-justification request.
	fn clear_justification_requests(&mut self) {
		self.extra_justifications.reset();
	}
675
676	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
677		let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
678		self.extra_justifications
679			.try_finalize_root((hash, number), finalization_result, true);
680		self.allowed_requests.set_all();
681	}
682
	/// Dispatch a type-erased response by strategy key and protocol name.
	fn on_generic_response(
		&mut self,
		peer_id: &PeerId,
		key: StrategyKey,
		protocol_name: ProtocolName,
		response: Box<dyn Any + Send>,
	) {
		// A response addressed to a different strategy is a programming error upstream.
		if Self::STRATEGY_KEY != key {
			warn!(
				target: LOG_TARGET,
				"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
			);
			debug_assert!(false);
			return;
		}

		if protocol_name == self.state_request_protocol_name {
			// State responses arrive as raw bytes; `on_state_data` does the decoding.
			let Ok(response) = response.downcast::<Vec<u8>>() else {
				warn!(target: LOG_TARGET, "Failed to downcast state response");
				debug_assert!(false);
				return;
			};

			if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
				self.actions.push(SyncingAction::DropPeer(bad_peer));
			}
		} else if &protocol_name == self.block_downloader.protocol_name() {
			// Block responses carry the original request plus the downloader's result.
			let Ok(response) = response
				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
			else {
				warn!(target: LOG_TARGET, "Failed to downcast block response");
				debug_assert!(false);
				return;
			};

			let (request, response) = *response;
			// A malformed response costs the peer reputation (`BAD_MESSAGE`).
			let blocks = match response {
				Ok(blocks) => blocks,
				Err(BlockResponseError::DecodeFailed(e)) => {
					debug!(
						target: LOG_TARGET,
						"Failed to decode block response from peer {:?}: {:?}.",
						peer_id,
						e
					);
					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
					return;
				},
				Err(BlockResponseError::ExtractionFailed(e)) => {
					debug!(
						target: LOG_TARGET,
						"Failed to extract blocks from peer response {:?}: {:?}.",
						peer_id,
						e
					);
					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
					return;
				},
			};

			if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
				self.actions.push(SyncingAction::DropPeer(bad_peer));
			}
		} else {
			warn!(
				target: LOG_TARGET,
				"Unexpected generic response protocol {protocol_name}, strategy key \
				{key:?}",
			);
			debug_assert!(false);
		}
	}
755
	/// React to the import queue reporting results for a batch of blocks.
	fn on_blocks_processed(
		&mut self,
		imported: usize,
		count: usize,
		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
	) {
		trace!(target: LOG_TARGET, "Imported {imported} of {count}");

		let mut has_error = false;
		// First pass: every processed block — imported or not — leaves the queues.
		for (_, hash) in &results {
			if self.queue_blocks.remove(hash) {
				if let Some(metrics) = &self.metrics {
					metrics.queued_blocks.dec();
				}
			}
			self.blocks.clear_queued(hash);
			if let Some(gap_sync) = &mut self.gap_sync {
				gap_sync.blocks.clear_queued(hash);
			}
		}
		// Second pass: react to each result. The first error result is still handled,
		// but everything after it is skipped.
		for (result, hash) in results {
			if has_error {
				break;
			}

			has_error |= result.is_err();

			match result {
				Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
					if let Some(peer) =
						peer_id.as_ref().and_then(|peer| PeerId::try_from(peer).ok())
					{
						self.update_peer_common_number(&peer, number);
					}
					self.complete_gap_if_target(number);
				},
				Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
					if aux.clear_justification_requests {
						trace!(
							target: LOG_TARGET,
							"Block imported clears all pending justification requests {number}: {hash:?}",
						);
						self.clear_justification_requests();
					}

					if aux.needs_justification {
						trace!(
							target: LOG_TARGET,
							"Block imported but requires justification {number}: {hash:?}",
						);
						self.request_justification(&hash, number);
					}

					if aux.bad_justification {
						if let Some(peer) =
							peer_id.as_ref().and_then(|peer| PeerId::try_from(peer).ok())
						{
							warn!("💔 Sent block with bad justification to import");
							self.actions.push(SyncingAction::DropPeer(BadPeer(
								peer,
								rep::BAD_JUSTIFICATION,
							)));
						}
					}

					if let Some(peer) =
						peer_id.as_ref().and_then(|peer| PeerId::try_from(peer).ok())
					{
						self.update_peer_common_number(&peer, number);
					}
					// Importing the state-sync target block completes state sync:
					// switch back to full sync and restart the block download.
					let state_sync_complete =
						self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
					if state_sync_complete {
						info!(
							target: LOG_TARGET,
							"State sync is complete ({} MiB), restarting block sync.",
							self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
						);
						self.state_sync = None;
						self.mode = ChainSyncMode::Full;
						self.restart();
					}

					self.complete_gap_if_target(number);
				},
				Err(BlockImportError::IncompleteHeader(peer_id)) => {
					if let Some(peer) = peer_id.and_then(|peer| PeerId::try_from(peer).ok()) {
						warn!(
							target: LOG_TARGET,
							"💔 Peer sent block with incomplete header to import",
						);
						self.actions
							.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
						self.restart();
					}
				},
				Err(BlockImportError::VerificationFailed(peer_id, e)) => {
					let extra_message = peer_id
						.as_ref()
						.map_or_else(|| "".into(), |peer| format!(" received from ({peer:?})"));

					warn!(
						target: LOG_TARGET,
						"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
					);

					if let Some(peer) = peer_id.and_then(|peer| PeerId::try_from(peer).ok()) {
						self.actions
							.push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
					}

					self.restart();
				},
				Err(BlockImportError::BadBlock(peer_id)) => {
					// Only the sending peer is punished; sync itself is not restarted.
					if let Some(peer) = peer_id.and_then(|peer| PeerId::try_from(peer).ok()) {
						warn!(
							target: LOG_TARGET,
							"💔 Block {hash:?} received from peer {peer:?} has been blacklisted",
						);
						self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
					}
				},
				Err(BlockImportError::MissingState) => {
					// This may happen if the chain we were requesting upon has been discarded
					// in the meantime because other chain has been finalized.
					// Don't mark it as bad as it still may be synced if explicitly requested.
					trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
				},
				e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
					warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
					self.state_sync = None;
					self.restart();
				},
				Err(BlockImportError::Cancelled) => {},
			};
		}

		// Whatever happened, re-evaluate what can be requested from every peer.
		self.allowed_requests.set_all();
	}
895
	/// Handle a block-finalized notification; may (re-)schedule a state sync attempt.
	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		// Bind the client locally so the closure below doesn't need to capture `self`
		// while `self.extra_justifications` is borrowed mutably.
		let client = &self.client;
		let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
			is_descendent_of(&**client, base, block)
		});

		if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
			if self.state_sync.is_none() {
				// Start the state sync now if conditions allow; otherwise remember the
				// attempt — `actions()` retries it once the import queue drains and
				// peers are available.
				if !self.peers.is_empty() && self.queue_blocks.is_empty() {
					self.attempt_state_sync(*hash, number, *skip_proofs);
				} else {
					self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
				}
			}
		}

		if let Err(err) = r {
			warn!(
				target: LOG_TARGET,
				"💔 Error cleaning up pending extra justification data requests: {err}",
			);
		}
	}
919
	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
		// Delegates to `on_block_queued` so internal best-queued tracking follows the
		// externally reported chain info.
		self.on_block_queued(best_hash, best_number);
	}
923
	fn is_major_syncing(&self) -> bool {
		// Derived from the current `status()` classification (see `status`).
		self.status().state.is_major_syncing()
	}
927
	/// Number of peers currently participating in sync.
	fn num_peers(&self) -> usize {
		self.peers.len()
	}
931
	/// Produce a snapshot of the overall sync status.
	fn status(&self) -> SyncStatus<B> {
		// `median_seen` — presumably the median of the peers' advertised best blocks;
		// confirm against its definition.
		let median_seen = self.median_seen();
		// Only report a "best seen" block when it is ahead of what we already queued.
		let best_seen_block =
			median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
		let sync_state = if let Some(target) = median_seen {
			// A chain is classified as downloading if the provided best block is
			// more than `MAJOR_SYNC_BLOCKS` behind the best block or as importing
			// if the same can be said about queued blocks.
			let best_block = self.client.info().best_number;
			if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
				// If target is not queued, we're downloading, otherwise importing.
				if target > self.best_queued_number {
					SyncState::Downloading { target }
				} else {
					SyncState::Importing { target }
				}
			} else {
				SyncState::Idle
			}
		} else {
			SyncState::Idle
		};

		// An active gap sync is surfaced through the warp-sync progress field.
		let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
			phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
			total_bytes: 0,
			status: None,
		});

		SyncStatus {
			state: sync_state,
			best_seen_block,
			num_peers: self.peers.len() as u32,
			queued_blocks: self.queue_blocks.len() as u32,
			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
			warp_sync: warp_sync_progress,
		}
	}
970
	fn num_downloaded_blocks(&self) -> usize {
		// Running counter, incremented for every block in each block response
		// (see `on_block_data`).
		self.downloaded_blocks
	}
974
975	fn num_sync_requests(&self) -> usize {
976		self.fork_targets
977			.values()
978			.filter(|f| f.number <= self.best_queued_number)
979			.count()
980	}
981
	fn actions(
		&mut self,
		network_service: &NetworkServiceHandle,
	) -> Result<Vec<SyncingAction<B>>, ClientError> {
		// Retry a state-sync attempt deferred by `on_block_finalized` once we
		// have peers and the import queue has drained.
		if !self.peers.is_empty() && self.queue_blocks.is_empty() {
			if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
				self.attempt_state_sync(hash, number, skip_proofs);
			}
		}

		// Turn scheduled block downloads into start-request actions.
		let block_requests = self
			.block_requests()
			.into_iter()
			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
			.collect::<Vec<_>>();
		self.actions.extend(block_requests);

		// Justification requests travel over the same block-request protocol.
		let justification_requests = self
			.justification_requests()
			.into_iter()
			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
			.collect::<Vec<_>>();
		self.actions.extend(justification_requests);

		// State requests are started directly on the network service here; the
		// emitted action only carries the oneshot receiver for the response.
		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
			trace!(
				target: LOG_TARGET,
				"Created `StateRequest` to {peer_id}.",
			);

			let (tx, rx) = oneshot::channel();

			network_service.start_request(
				peer_id,
				self.state_request_protocol_name.clone(),
				request.encode_to_vec(),
				tx,
				IfDisconnected::ImmediateError,
			);

			SyncingAction::StartRequest {
				peer_id,
				key: Self::STRATEGY_KEY,
				request: async move {
					Ok(rx.await?.and_then(|(response, protocol_name)| {
						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
					}))
				}
				.boxed(),
				remove_obsolete: false,
			}
		});
		self.actions.extend(state_request);

		// Hand all accumulated actions to the caller, leaving the buffer empty.
		Ok(std::mem::take(&mut self.actions))
	}
1038}
1039
1040impl<B, Client> ChainSync<B, Client>
1041where
1042	B: BlockT,
1043	Client: HeaderBackend<B>
1044		+ BlockBackend<B>
1045		+ HeaderMetadata<B, Error = soil_client::blockchain::Error>
1046		+ ProofProvider<B>
1047		+ Send
1048		+ Sync
1049		+ 'static,
1050{
	/// Strategy key used by chain sync.
	///
	/// Tags every request started by this strategy so that responses can be
	/// routed back to it (checked in `on_block_response`).
	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
1053
1054	/// Create a new instance.
1055	pub fn new(
1056		mode: ChainSyncMode,
1057		client: Arc<Client>,
1058		max_parallel_downloads: u32,
1059		max_blocks_per_request: u32,
1060		state_request_protocol_name: ProtocolName,
1061		block_downloader: Arc<dyn BlockDownloader<B>>,
1062		archive_blocks: bool,
1063		metrics_registry: Option<&Registry>,
1064		initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
1065	) -> Result<Self, ClientError> {
1066		let mut sync = Self {
1067			client,
1068			peers: HashMap::new(),
1069			disconnected_peers: DisconnectedPeers::new(),
1070			blocks: BlockCollection::new(),
1071			best_queued_hash: Default::default(),
1072			best_queued_number: Zero::zero(),
1073			extra_justifications: ExtraRequests::new("justification", metrics_registry),
1074			mode,
1075			queue_blocks: Default::default(),
1076			pending_state_sync_attempt: None,
1077			fork_targets: Default::default(),
1078			allowed_requests: Default::default(),
1079			max_parallel_downloads,
1080			max_blocks_per_request,
1081			state_request_protocol_name,
1082			downloaded_blocks: 0,
1083			state_sync: None,
1084			import_existing: false,
1085			block_downloader,
1086			archive_blocks,
1087			gap_sync: None,
1088			actions: Vec::new(),
1089			metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
1090				Ok(metrics) => Some(metrics),
1091				Err(err) => {
1092					log::error!(
1093						target: LOG_TARGET,
1094						"Failed to register `ChainSync` metrics {err:?}",
1095					);
1096					None
1097				},
1098			}),
1099		};
1100
1101		sync.reset_sync_start_point()?;
1102		initial_peers.for_each(|(peer_id, best_hash, best_number)| {
1103			sync.add_peer(peer_id, best_hash, best_number);
1104		});
1105
1106		Ok(sync)
1107	}
1108
1109	/// Complete the gap sync if the target number is reached and there is a gap.
1110	fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
1111		let Some(gap_sync) = &self.gap_sync else { return };
1112
1113		if gap_sync.target != number {
1114			return;
1115		}
1116
1117		info!(
1118			target: LOG_TARGET,
1119			"Block history download is complete.",
1120		);
1121		self.gap_sync = None;
1122	}
1123
	/// Register `peer_id` with its announced best block and work out the first
	/// request (if any) to send it.
	///
	/// Returns `Ok(Some(_))` with an ancestor-search request to dispatch,
	/// `Ok(None)` when the peer is registered without an immediate request, or
	/// `Err(BadPeer)` when the peer should be reported and disconnected.
	#[must_use]
	fn add_peer_inner(
		&mut self,
		peer_id: PeerId,
		best_hash: B::Hash,
		best_number: NumberFor<B>,
	) -> Result<Option<BlockRequest<B>>, BadPeer> {
		// There is nothing sync can get from the node that has no blockchain data.
		match self.block_status(&best_hash) {
			Err(e) => {
				debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
				Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
			},
			Ok(BlockStatus::KnownBad) => {
				info!(
					"💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
				);
				Err(BadPeer(peer_id, rep::BAD_BLOCK))
			},
			Ok(BlockStatus::Unknown) => {
				// An unknown best block at height zero means the peer is on a
				// different genesis, i.e. an incompatible chain.
				if best_number.is_zero() {
					info!(
						"💔 New peer {} with unknown genesis hash {} ({}).",
						peer_id, best_hash, best_number,
					);
					return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
				}

				// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
				// enough to do in the import queue that it's not worth kicking off
				// an ancestor search, which is what we do in the next match case below.
				if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
					debug!(
						target: LOG_TARGET,
						"New peer {} with unknown best hash {} ({}), assuming common block.",
						peer_id,
						self.best_queued_hash,
						self.best_queued_number
					);
					self.peers.insert(
						peer_id,
						PeerSync {
							peer_id,
							common_number: self.best_queued_number,
							best_hash,
							best_number,
							state: PeerSyncState::Available,
						},
					);
					return Ok(None);
				}

				// If we are at genesis, just start downloading.
				let (state, req) = if self.best_queued_number.is_zero() {
					debug!(
						target: LOG_TARGET,
						"New peer {peer_id} with best hash {best_hash} ({best_number}).",
					);

					(PeerSyncState::Available, None)
				} else {
					let common_best = std::cmp::min(self.best_queued_number, best_number);

					debug!(
						target: LOG_TARGET,
						"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
						peer_id,
						best_hash,
						best_number
					);

					(
						PeerSyncState::AncestorSearch {
							current: common_best,
							start: self.best_queued_number,
							state: AncestorSearchState::ExponentialBackoff(One::one()),
						},
						Some(ancestry_request::<B>(common_best)),
					)
				};

				self.allowed_requests.add(&peer_id);
				self.peers.insert(
					peer_id,
					PeerSync {
						peer_id,
						common_number: Zero::zero(),
						best_hash,
						best_number,
						state,
					},
				);

				Ok(req)
			},
			// The peer's best block is already known to us: record the common
			// number and mark the peer as available for requests.
			Ok(BlockStatus::Queued)
			| Ok(BlockStatus::InChainWithState)
			| Ok(BlockStatus::InChainPruned) => {
				debug!(
					target: LOG_TARGET,
					"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
				);
				self.peers.insert(
					peer_id,
					PeerSync {
						peer_id,
						common_number: std::cmp::min(self.best_queued_number, best_number),
						best_hash,
						best_number,
						state: PeerSyncState::Available,
					},
				);
				self.allowed_requests.add(&peer_id);
				Ok(None)
			},
		}
	}
1241
1242	fn create_block_request_action(
1243		&mut self,
1244		peer_id: PeerId,
1245		request: BlockRequest<B>,
1246	) -> SyncingAction<B> {
1247		let downloader = self.block_downloader.clone();
1248
1249		SyncingAction::StartRequest {
1250			peer_id,
1251			key: Self::STRATEGY_KEY,
1252			request: async move {
1253				Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
1254					|(response, protocol_name)| {
1255						let decoded_response =
1256							downloader.block_response_into_blocks(&request, response);
1257						let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
1258						Ok((result, protocol_name))
1259					},
1260				))
1261			}
1262			.boxed(),
1263			// Sending block request implies dropping obsolete pending response as we are not
1264			// interested in it anymore.
1265			remove_obsolete: true,
1266		}
1267	}
1268
	/// Submit a block response for processing.
	///
	/// Dispatches on the peer's current download state: new-chain downloads,
	/// gap (history) downloads, stale-fork downloads, ancestor search steps and
	/// unsolicited announcements are each handled differently. Blocks that are
	/// ready are handed to `validate_and_queue_blocks`; a malformed response
	/// yields `Err(BadPeer)` so the caller can report the peer.
	#[must_use]
	fn on_block_data(
		&mut self,
		peer_id: &PeerId,
		request: Option<BlockRequest<B>>,
		response: BlockResponse<B>,
	) -> Result<(), BadPeer> {
		self.downloaded_blocks += response.blocks.len();
		let mut gap = false;
		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
			let mut blocks = response.blocks;
			// Normalize to ascending order so insertion below is uniform.
			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
				trace!(target: LOG_TARGET, "Reversing incoming block list");
				blocks.reverse()
			}
			self.allowed_requests.add(peer_id);
			if let Some(request) = request {
				match &mut peer.state {
					// Response to a regular forward-sync download: stash the
					// validated blocks and drain whatever became ready.
					PeerSyncState::DownloadingNew(_) => {
						self.blocks.clear_peer_download(peer_id);
						peer.state = PeerSyncState::Available;
						if let Some(start_block) =
							validate_blocks::<B>(&blocks, peer_id, Some(request))?
						{
							self.blocks.insert(start_block, blocks, *peer_id);
						}
						self.ready_blocks()
					},
					// Response to a gap (block-history) download; these blocks
					// are tracked in the separate `gap_sync` collection.
					PeerSyncState::DownloadingGap(_) => {
						peer.state = PeerSyncState::Available;
						if let Some(gap_sync) = &mut self.gap_sync {
							gap_sync.blocks.clear_peer_download(peer_id);
							if let Some(start_block) =
								validate_blocks::<B>(&blocks, peer_id, Some(request))?
							{
								gap_sync.blocks.insert(start_block, blocks, *peer_id);
							}
							gap = true;
							let mut batch_gap_sync_stats = GapSyncStats::new();
							let blocks: Vec<_> = gap_sync
								.blocks
								.ready_blocks(gap_sync.best_queued_number + One::one())
								.into_iter()
								.map(|block_data| {
									let justifications =
										block_data.block.justifications.or_else(|| {
											legacy_justification_mapping(
												block_data.block.justification,
											)
										});
									// Account encoded sizes per batch for the
									// cumulative gap-sync statistics below.
									let gap_sync_stats = GapSyncStats {
										header_bytes: block_data
											.block
											.header
											.as_ref()
											.map(|h| h.encoded_size())
											.unwrap_or(0),
										body_bytes: block_data
											.block
											.body
											.as_ref()
											.map(|b| b.encoded_size())
											.unwrap_or(0),
										justification_bytes: justifications
											.as_ref()
											.map(|j| j.encoded_size())
											.unwrap_or(0),
									};
									batch_gap_sync_stats += gap_sync_stats;

									IncomingBlock {
										hash: block_data.block.hash,
										header: block_data.block.header,
										body: block_data.block.body,
										indexed_body: block_data.block.indexed_body,
										justifications,
										origin: block_data.origin.map(Into::into),
										allow_missing_state: true,
										// Warp-synced blocks are header-only. Allow re-import to
										// store bodies if gap sync requested them.
										import_existing: true,
										skip_execution: true,
										state: None,
									}
								})
								.collect();

							debug!(
								target: LOG_TARGET,
								"Drained {} gap blocks from {}",
								blocks.len(),
								gap_sync.best_queued_number,
							);

							gap_sync.stats += batch_gap_sync_stats;

							if blocks.len() > 0 {
								trace!(
									target: LOG_TARGET,
									"Gap sync cumulative stats: {}",
									gap_sync.stats
								);
							}
							blocks
						} else {
							// Peer state says gap download but no gap sync is
							// running — treat as a protocol violation.
							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
					},
					// Response to a stale-fork download: all blocks are taken
					// as-is (no collection involved).
					PeerSyncState::DownloadingStale(_) => {
						peer.state = PeerSyncState::Available;
						if blocks.is_empty() {
							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
						}
						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
						blocks
							.into_iter()
							.map(|b| {
								let justifications = b
									.justifications
									.or_else(|| legacy_justification_mapping(b.justification));
								IncomingBlock {
									hash: b.hash,
									header: b.header,
									body: b.body,
									indexed_body: None,
									justifications,
									origin: Some((*peer_id).into()),
									allow_missing_state: true,
									import_existing: self.import_existing,
									skip_execution: self.skip_execution(),
									state: None,
								}
							})
							.collect()
					},
					// One step of the binary/backoff ancestor search: compare
					// the returned block hash at `current` with our own.
					PeerSyncState::AncestorSearch { current, start, state } => {
						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
							(Some(block), Ok(maybe_our_block_hash)) => {
								trace!(
									target: LOG_TARGET,
									"Got ancestry block #{} ({}) from peer {}",
									current,
									block.hash,
									peer_id,
								);
								maybe_our_block_hash.filter(|x| x == &block.hash)
							},
							(None, _) => {
								debug!(
									target: LOG_TARGET,
									"Invalid response when searching for ancestor from {peer_id}",
								);
								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
							},
							(_, Err(e)) => {
								info!(
									target: LOG_TARGET,
									"❌ Error answering legitimate blockchain query: {e}",
								);
								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
							},
						};
						if matching_hash.is_some() {
							if *start < self.best_queued_number
								&& self.best_queued_number <= peer.best_number
							{
								// We've made progress on this chain since the search was started.
								// Opportunistically set common number to updated number
								// instead of the one that started the search.
								trace!(
									target: LOG_TARGET,
									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									self.best_queued_number,
								);
								peer.common_number = self.best_queued_number;
							} else if peer.common_number < *current {
								trace!(
									target: LOG_TARGET,
									"Ancestry search: updating peer {} common number from={} => to={}.",
									*peer_id,
									peer.common_number,
									*current,
								);
								peer.common_number = *current;
							}
						}
						// No match even at block zero: the chains share no
						// history at all.
						if matching_hash.is_none() && current.is_zero() {
							trace!(
								target: LOG_TARGET,
								"Ancestry search: genesis mismatch for peer {peer_id}",
							);
							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
						}
						if let Some((next_state, next_num)) =
							handle_ancestor_search_state(state, *current, matching_hash.is_some())
						{
							// Search continues: issue the next ancestry probe.
							peer.state = PeerSyncState::AncestorSearch {
								current: next_num,
								start: *start,
								state: next_state,
							};
							let request = ancestry_request::<B>(next_num);
							let action = self.create_block_request_action(*peer_id, request);
							self.actions.push(action);
							return Ok(());
						} else {
							// Ancestry search is complete. Check if peer is on a stale fork unknown
							// to us and add it to sync targets if necessary.
							trace!(
								target: LOG_TARGET,
								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
								self.best_queued_hash,
								self.best_queued_number,
								peer.best_hash,
								peer.best_number,
								matching_hash,
								peer.common_number,
							);
							if peer.common_number < peer.best_number
								&& peer.best_number < self.best_queued_number
							{
								trace!(
									target: LOG_TARGET,
									"Added fork target {} for {}",
									peer.best_hash,
									peer_id,
								);
								self.fork_targets
									.entry(peer.best_hash)
									.or_insert_with(|| {
										if let Some(metrics) = &self.metrics {
											metrics.fork_targets.inc();
										}

										ForkTarget {
											number: peer.best_number,
											parent_hash: None,
											peers: Default::default(),
										}
									})
									.peers
									.insert(*peer_id);
							}
							peer.state = PeerSyncState::Available;
							return Ok(());
						}
					},
					// No block download in flight for this peer: nothing to queue.
					PeerSyncState::Available
					| PeerSyncState::DownloadingJustification(..)
					| PeerSyncState::DownloadingState => Vec::new(),
				}
			} else {
				// When request.is_none() this is a block announcement. Just accept blocks.
				validate_blocks::<B>(&blocks, peer_id, None)?;
				blocks
					.into_iter()
					.map(|b| {
						let justifications = b
							.justifications
							.or_else(|| legacy_justification_mapping(b.justification));
						IncomingBlock {
							hash: b.hash,
							header: b.header,
							body: b.body,
							indexed_body: None,
							justifications,
							origin: Some((*peer_id).into()),
							allow_missing_state: true,
							import_existing: false,
							skip_execution: true,
							state: None,
						}
					})
					.collect()
			}
		} else {
			// We don't know of this peer, so we also did not request anything from it.
			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		};

		self.validate_and_queue_blocks(new_blocks, gap);

		Ok(())
	}
1558
1559	fn on_block_response(
1560		&mut self,
1561		peer_id: &PeerId,
1562		key: StrategyKey,
1563		request: BlockRequest<B>,
1564		blocks: Vec<BlockData<B>>,
1565	) -> Result<(), BadPeer> {
1566		if key != Self::STRATEGY_KEY {
1567			error!(
1568				target: LOG_TARGET,
1569				"`on_block_response()` called with unexpected key {key:?} for chain sync",
1570			);
1571			debug_assert!(false);
1572		}
1573		let block_response = BlockResponse::<B> { id: request.id, blocks };
1574
1575		let blocks_range = || match (
1576			block_response
1577				.blocks
1578				.first()
1579				.and_then(|b| b.header.as_ref().map(|h| h.number())),
1580			block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1581		) {
1582			(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1583			(Some(first), Some(_)) => format!(" ({})", first),
1584			_ => Default::default(),
1585		};
1586
1587		trace!(
1588			target: LOG_TARGET,
1589			"BlockResponse {} from {} with {} blocks {}",
1590			block_response.id,
1591			peer_id,
1592			block_response.blocks.len(),
1593			blocks_range(),
1594		);
1595
1596		if request.fields == BlockAttributes::JUSTIFICATION {
1597			self.on_block_justification(*peer_id, block_response)
1598		} else {
1599			self.on_block_data(peer_id, Some(request), block_response)
1600		}
1601	}
1602
	/// Submit a justification response for processing.
	///
	/// Only meaningful when the peer is in `DownloadingJustification` state; a
	/// response for the wrong block is penalized, an empty response is treated
	/// as "peer did not have it". A completed request queues an
	/// `ImportJustifications` action.
	#[must_use]
	fn on_block_justification(
		&mut self,
		peer_id: PeerId,
		response: BlockResponse<B>,
	) -> Result<(), BadPeer> {
		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
			peer
		} else {
			error!(
				target: LOG_TARGET,
				"💔 Called on_block_justification with a peer ID of an unknown peer",
			);
			return Ok(());
		};

		self.allowed_requests.add(&peer_id);
		if let PeerSyncState::DownloadingJustification(hash) = peer.state {
			peer.state = PeerSyncState::Available;

			// We only request one justification at a time
			let justification = if let Some(block) = response.blocks.into_iter().next() {
				// The response must be for exactly the block we asked about.
				if hash != block.hash {
					warn!(
						target: LOG_TARGET,
						"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
						peer_id,
						hash,
						block.hash,
					);
					return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
				}

				block
					.justifications
					.or_else(|| legacy_justification_mapping(block.justification))
			} else {
				// we might have asked the peer for a justification on a block that we assumed it
				// had but didn't (regardless of whether it had a justification for it or not).
				trace!(
					target: LOG_TARGET,
					"Peer {peer_id:?} provided empty response for justification request {hash:?}",
				);

				None
			};

			// Feed the (possibly empty) result back to the request tracker; a
			// completed request yields justifications to hand to the import queue.
			if let Some((peer_id, hash, number, justifications)) =
				self.extra_justifications.on_response(peer_id, justification)
			{
				self.actions.push(SyncingAction::ImportJustifications {
					peer_id,
					hash,
					number,
					justifications,
				});
				return Ok(());
			}
		}

		Ok(())
	}
1666
1667	/// Returns the median seen block number.
1668	fn median_seen(&self) -> Option<NumberFor<B>> {
1669		let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1670
1671		if best_seens.is_empty() {
1672			None
1673		} else {
1674			let middle = best_seens.len() / 2;
1675
1676			// Not the "perfect median" when we have an even number of peers.
1677			Some(*best_seens.select_nth_unstable(middle).1)
1678		}
1679	}
1680
1681	fn skip_execution(&self) -> bool {
1682		match self.mode {
1683			ChainSyncMode::Full => false,
1684			ChainSyncMode::LightState { .. } => true,
1685		}
1686	}
1687
1688	fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1689		let orig_len = new_blocks.len();
1690		new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1691		if new_blocks.len() != orig_len {
1692			debug!(
1693				target: LOG_TARGET,
1694				"Ignoring {} blocks that are already queued",
1695				orig_len - new_blocks.len(),
1696			);
1697		}
1698
1699		let origin = if gap {
1700			// Gap sync: filling historical blocks after warp sync
1701			BlockOrigin::GapSync
1702		} else if !self.status().state.is_major_syncing() {
1703			// Normal operation: receiving new blocks
1704			BlockOrigin::NetworkBroadcast
1705		} else {
1706			// Initial sync: catching up with the chain
1707			BlockOrigin::NetworkInitialSync
1708		};
1709
1710		if let Some((h, n)) = new_blocks
1711			.last()
1712			.and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1713		{
1714			trace!(
1715				target: LOG_TARGET,
1716				"Accepted {} blocks ({:?}) with origin {:?}",
1717				new_blocks.len(),
1718				h,
1719				origin,
1720			);
1721			self.on_block_queued(h, n)
1722		}
1723		self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1724		if let Some(metrics) = &self.metrics {
1725			metrics
1726				.queued_blocks
1727				.set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1728		}
1729
1730		self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1731	}
1732
1733	fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1734		if let Some(peer) = self.peers.get_mut(peer_id) {
1735			peer.update_common_number(new_common);
1736		}
1737	}
1738
1739	/// Called when a block has been queued for import.
1740	///
1741	/// Updates our internal state for best queued block and then goes
1742	/// through all peers to update our view of their state as well.
1743	fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1744		if self.fork_targets.remove(hash).is_some() {
1745			if let Some(metrics) = &self.metrics {
1746				metrics.fork_targets.dec();
1747			}
1748			trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1749		}
1750		if let Some(gap_sync) = &mut self.gap_sync {
1751			if number > gap_sync.best_queued_number && number <= gap_sync.target {
1752				gap_sync.best_queued_number = number;
1753			}
1754		}
1755		if number > self.best_queued_number {
1756			self.best_queued_number = number;
1757			self.best_queued_hash = *hash;
1758			// Update common blocks
1759			for (n, peer) in self.peers.iter_mut() {
1760				if let PeerSyncState::AncestorSearch { .. } = peer.state {
1761					// Wait for ancestry search to complete first.
1762					continue;
1763				}
1764				let new_common_number =
1765					if peer.best_number >= number { number } else { peer.best_number };
1766				trace!(
1767					target: LOG_TARGET,
1768					"Updating peer {} info, ours={}, common={}->{}, their best={}",
1769					n,
1770					number,
1771					peer.common_number,
1772					new_common_number,
1773					peer.best_number,
1774				);
1775				peer.common_number = new_common_number;
1776			}
1777		}
1778		self.allowed_requests.set_all();
1779	}
1780
	/// Restart the sync process. This will reset all pending block requests and return an iterator
	/// of new block requests to make to peers. Peers that were downloading finality data (i.e.
	/// their state was `DownloadingJustification`) are unaffected and will stay in the same state.
	fn restart(&mut self) {
		self.blocks.clear();
		if let Err(e) = self.reset_sync_start_point() {
			warn!(target: LOG_TARGET, "💔  Unable to restart sync: {e}");
		}
		self.allowed_requests.set_all();
		debug!(
			target: LOG_TARGET,
			"Restarted with {} ({})",
			self.best_queued_number,
			self.best_queued_hash,
		);
		// Re-register every peer from scratch; `add_peer` decides per peer
		// whether a fresh request should be issued.
		let old_peers = std::mem::take(&mut self.peers);

		old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
			match peer_sync.state {
				PeerSyncState::Available => {
					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
				},
				PeerSyncState::AncestorSearch { .. }
				| PeerSyncState::DownloadingNew(_)
				| PeerSyncState::DownloadingStale(_)
				| PeerSyncState::DownloadingGap(_)
				| PeerSyncState::DownloadingState => {
					// Cancel a request first, as `add_peer` may generate a new request.
					self.actions
						.push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
				},
				PeerSyncState::DownloadingJustification(_) => {
					// Peers that were downloading justifications
					// should be kept in that state.
					// We make sure our common number is at least something we have.
					trace!(
						target: LOG_TARGET,
						"Keeping peer {} after restart, updating common number from={} => to={} (our best).",
						peer_id,
						peer_sync.common_number,
						self.best_queued_number,
					);
					peer_sync.common_number = self.best_queued_number;
					self.peers.insert(peer_id, peer_sync);
				},
			}
		});
	}
1830
	/// Find a block to start sync from. If we sync with state, that's the latest block we have
	/// state for.
	fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
		let info = self.client.info();
		debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");

		// A partially synced database cannot be fast-synced: fall back to full sync.
		if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
			warn!(
				target: LOG_TARGET,
				"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
			);
			self.mode = ChainSyncMode::Full;
		}

		self.import_existing = false;
		self.best_queued_hash = info.best_hash;
		self.best_queued_number = info.best_number;

		// In full mode, if the best block's state is missing, restart from the
		// last finalized state (or genesis) and re-import the existing blocks.
		if self.mode == ChainSyncMode::Full
			&& self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
		{
			self.import_existing = true;
			// Latest state is missing, start with the last finalized state or genesis instead.
			if let Some((hash, number)) = info.finalized_state {
				debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
				self.best_queued_hash = hash;
				self.best_queued_number = number;
			} else {
				debug!(target: LOG_TARGET, "Restarting from genesis");
				self.best_queued_hash = Default::default();
				self.best_queued_number = Zero::zero();
			}
		}

		// A block gap reported by the backend restarts gap sync from scratch,
		// discarding any previous gap-sync progress.
		if let Some(BlockGap { start, end, .. }) = info.block_gap {
			let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
			debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
			self.gap_sync = Some(GapSync {
				best_queued_number: start - One::one(),
				target: end,
				blocks: BlockCollection::new(),
				stats: GapSyncStats::new(),
			});
		}
		trace!(
			target: LOG_TARGET,
			"Restarted sync at #{} ({:?})",
			self.best_queued_number,
			self.best_queued_hash,
		);
		Ok(())
	}
1883
1884	/// What is the status of the block corresponding to the given hash?
1885	fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1886		if self.queue_blocks.contains(hash) {
1887			return Ok(BlockStatus::Queued);
1888		}
1889		self.client.block_status(*hash)
1890	}
1891
1892	/// Is the block corresponding to the given hash known?
1893	fn is_known(&self, hash: &B::Hash) -> bool {
1894		self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1895	}
1896
1897	/// Is any peer downloading the given hash?
1898	fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1899		self.peers
1900			.iter()
1901			.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1902	}
1903
1904	/// Get the set of downloaded blocks that are ready to be queued for import.
1905	fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1906		self.blocks
1907			.ready_blocks(self.best_queued_number + One::one())
1908			.into_iter()
1909			.map(|block_data| {
1910				let justifications = block_data
1911					.block
1912					.justifications
1913					.or_else(|| legacy_justification_mapping(block_data.block.justification));
1914				IncomingBlock {
1915					hash: block_data.block.hash,
1916					header: block_data.block.header,
1917					body: block_data.block.body,
1918					indexed_body: block_data.block.indexed_body,
1919					justifications,
1920					origin: block_data.origin.map(Into::into),
1921					allow_missing_state: true,
1922					import_existing: self.import_existing,
1923					skip_execution: self.skip_execution(),
1924					state: None,
1925				}
1926			})
1927			.collect()
1928	}
1929
1930	/// Get justification requests scheduled by sync to be sent out.
1931	fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1932		let peers = &mut self.peers;
1933		let mut matcher = self.extra_justifications.matcher();
1934		std::iter::from_fn(move || {
1935			if let Some((peer, request)) = matcher.next(peers) {
1936				peers
1937					.get_mut(&peer)
1938					.expect(
1939						"`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1940					)
1941					.state = PeerSyncState::DownloadingJustification(request.0);
1942				let req = BlockRequest::<B> {
1943					id: 0,
1944					fields: BlockAttributes::JUSTIFICATION,
1945					from: FromBlock::Hash(request.0),
1946					direction: Direction::Ascending,
1947					max: Some(1),
1948				};
1949				Some((peer, req))
1950			} else {
1951				None
1952			}
1953		})
1954		.collect()
1955	}
1956
	/// Get block requests scheduled by sync to be sent out.
	///
	/// Returns at most one request per available peer. For each candidate peer,
	/// the following are tried in order: an ancestor search probe, a regular
	/// range request, a fork-target request, and finally a gap-sync request.
	/// Scheduled peers are moved to the matching `Downloading*`/`AncestorSearch`
	/// state, and `allowed_requests` is cleared once any request is issued.
	fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
		// Nothing to do if no peer may be asked, or while a state sync is in
		// progress (block downloads are paused during state sync).
		if self.allowed_requests.is_empty() || self.state_sync.is_some() {
			return Vec::new();
		}

		// Back off while the import queue is saturated.
		if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
			trace!(target: LOG_TARGET, "Too many blocks in the queue.");
			return Vec::new();
		}
		let is_major_syncing = self.status().state.is_major_syncing();
		// Hoist the fields used below into locals so the `filter_map` closure
		// can capture them disjointly from the mutable borrow of `self.peers`.
		let mode = self.mode;
		let is_archive = self.archive_blocks;
		let blocks = &mut self.blocks;
		let fork_targets = &mut self.fork_targets;
		let last_finalized =
			std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
		let best_queued = self.best_queued_number;
		let client = &self.client;
		let queue_blocks = &self.queue_blocks;
		let allowed_requests = self.allowed_requests.clone();
		// During a major sync only one parallel download per range is used.
		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
		let max_blocks_per_request = self.max_blocks_per_request;
		let gap_sync = &mut self.gap_sync;
		let disconnected_peers = &mut self.disconnected_peers;
		let metrics = self.metrics.as_ref();
		let requests = self
			.peers
			.iter_mut()
			.filter_map(move |(&id, peer)| {
				// Skip busy peers, peers without a request slot, and peers
				// still in their disconnect back-off period.
				if !peer.state.is_available()
					|| !allowed_requests.contains(&id)
					|| !disconnected_peers.is_peer_available(&id)
				{
					return None;
				}

				// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from
				// the common number, the peer best number is higher than our best queued and the
				// common number is smaller than the last finalized block number, we should do an
				// ancestor search to find a better common block. If the queue is full we wait till
				// all blocks are imported though.
				if best_queued.saturating_sub(peer.common_number)
					> MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
					&& best_queued < peer.best_number
					&& peer.common_number < last_finalized
					&& queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
				{
					trace!(
						target: LOG_TARGET,
						"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
						id,
						peer.common_number,
						best_queued,
					);
					let current = std::cmp::min(peer.best_number, best_queued);
					peer.state = PeerSyncState::AncestorSearch {
						current,
						start: best_queued,
						state: AncestorSearchState::ExponentialBackoff(One::one()),
					};
					Some((id, ancestry_request::<B>(current)))
				} else if let Some((range, req)) = peer_block_request(
					&id,
					peer,
					blocks,
					mode.required_block_attributes(false, is_archive),
					max_parallel,
					max_blocks_per_request,
					last_finalized,
					best_queued,
				) {
					peer.state = PeerSyncState::DownloadingNew(range.start);
					trace!(
						target: LOG_TARGET,
						"New block request for {}, (best:{}, common:{}) {:?}",
						id,
						peer.best_number,
						peer.common_number,
						req,
					);
					Some((id, req))
				} else if let Some((hash, req)) = fork_sync_request(
					&id,
					fork_targets,
					best_queued,
					last_finalized,
					mode.required_block_attributes(false, is_archive),
					// Block-status probe that also accounts for queued blocks.
					|hash| {
						if queue_blocks.contains(hash) {
							BlockStatus::Queued
						} else {
							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
						}
					},
					max_blocks_per_request,
					metrics,
				) {
					trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
					peer.state = PeerSyncState::DownloadingStale(hash);
					Some((id, req))
				} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
					peer_gap_block_request(
						&id,
						peer,
						&mut sync.blocks,
						mode.required_block_attributes(true, is_archive),
						sync.target,
						sync.best_queued_number,
						max_blocks_per_request,
					)
				}) {
					peer.state = PeerSyncState::DownloadingGap(range.start);
					trace!(
						target: LOG_TARGET,
						"New gap block request for {}, (best:{}, common:{}) {:?}",
						id,
						peer.best_number,
						peer.common_number,
						req,
					);
					Some((id, req))
				} else {
					None
				}
			})
			.collect::<Vec<_>>();

		// Clear the allowed_requests state when sending new block requests
		// to prevent multiple inflight block requests from being issued.
		if !requests.is_empty() {
			self.allowed_requests.take();
		}

		requests
	}
2093
	/// Get a state request scheduled by sync to be sent out (if any).
	///
	/// At most one state request may be in flight at any time. The first
	/// available peer whose common number has reached the state sync target is
	/// chosen and transitioned to `DownloadingState`.
	fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
		if self.allowed_requests.is_empty() {
			return None;
		}
		if self.state_sync.is_some()
			&& self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
		{
			// Only one pending state request is allowed.
			return None;
		}
		if let Some(sync) = &self.state_sync {
			if sync.is_complete() {
				return None;
			}

			for (id, peer) in self.peers.iter_mut() {
				// The peer must be idle, not in disconnect back-off, and known
				// to have the state sync's target block.
				if peer.state.is_available()
					&& peer.common_number >= sync.target_number()
					&& self.disconnected_peers.is_peer_available(&id)
				{
					peer.state = PeerSyncState::DownloadingState;
					let request = sync.next_request();
					trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
					// Suppress further requests until this one resolves.
					self.allowed_requests.clear();
					return Some((*id, request));
				}
			}
		}
		None
	}
2125
	/// Process a state response received from `peer_id`.
	///
	/// Decodes the raw `response` bytes, feeds them into the running state sync
	/// and, once the state download is complete, schedules the resulting block
	/// for import. Returns `Err(BadPeer)` when the response is malformed, was
	/// not requested, or contains bad state data.
	#[must_use]
	fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
		let response = match StateResponse::decode(response) {
			Ok(response) => response,
			Err(error) => {
				debug!(
					target: LOG_TARGET,
					"Failed to decode state response from peer {peer_id:?}: {error:?}.",
				);

				return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
			},
		};

		// The peer has answered its state request: mark it available again and
		// re-enable request scheduling.
		if let Some(peer) = self.peers.get_mut(peer_id) {
			if let PeerSyncState::DownloadingState = peer.state {
				peer.state = PeerSyncState::Available;
				self.allowed_requests.set_all();
			}
		}
		let import_result = if let Some(sync) = &mut self.state_sync {
			debug!(
				target: LOG_TARGET,
				"Importing state data from {} with {} keys, {} proof nodes.",
				peer_id,
				response.entries.len(),
				response.proof.len(),
			);
			sync.import(response)
		} else {
			// No state sync is running, so this response cannot have been
			// requested by us.
			debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		};

		match import_result {
			ImportResult::Import(hash, header, state, body, justifications) => {
				// The fully downloaded state is attached to the target block and
				// queued for import as a single incoming block.
				let origin = BlockOrigin::NetworkInitialSync;
				let block = IncomingBlock {
					hash,
					header: Some(header),
					body,
					indexed_body: None,
					justifications,
					origin: None,
					allow_missing_state: true,
					import_existing: true,
					skip_execution: self.skip_execution(),
					state: Some(state),
				};
				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
				Ok(())
			},
			// More state chunks are still needed; keep going.
			ImportResult::Continue => Ok(()),
			ImportResult::BadResponse => {
				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
				Err(BadPeer(*peer_id, rep::BAD_BLOCK))
			},
		}
	}
2186
2187	fn attempt_state_sync(
2188		&mut self,
2189		finalized_hash: B::Hash,
2190		finalized_number: NumberFor<B>,
2191		skip_proofs: bool,
2192	) {
2193		let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2194		heads.sort();
2195		let median = heads[heads.len() / 2];
2196		if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2197			if let Ok(Some(header)) = self.client.header(finalized_hash) {
2198				log::debug!(
2199					target: LOG_TARGET,
2200					"Starting state sync for #{finalized_number} ({finalized_hash})",
2201				);
2202				self.state_sync =
2203					Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2204				self.allowed_requests.set_all();
2205			} else {
2206				log::error!(
2207					target: LOG_TARGET,
2208					"Failed to start state sync: header for finalized block \
2209					  #{finalized_number} ({finalized_hash}) is not available",
2210				);
2211				debug_assert!(false);
2212			}
2213		}
2214	}
2215
2216	/// A version of `actions()` that doesn't schedule extra requests. For testing only.
2217	#[cfg(test)]
2218	#[must_use]
2219	fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2220		std::mem::take(&mut self.actions).into_iter()
2221	}
2222}
2223
2224// This is purely during a backwards compatible transitionary period and should be removed
2225// once we can assume all nodes can send and receive multiple Justifications
2226// The ID tag is hardcoded here to avoid depending on the GRANDPA crate.
2227// See: https://github.com/paritytech/substrate/issues/8172
2228fn legacy_justification_mapping(
2229	justification: Option<EncodedJustification>,
2230) -> Option<Justifications> {
2231	justification.map(|just| (*b"FRNK", just).into())
2232}
2233
2234/// Request the ancestry for a block. Sends a request for header and justification for the given
2235/// block number. Used during ancestry search.
2236fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2237	BlockRequest::<B> {
2238		id: 0,
2239		fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2240		from: FromBlock::Number(block),
2241		direction: Direction::Ascending,
2242		max: Some(1),
2243	}
2244}
2245
/// The ancestor search state expresses which algorithm, and its stateful parameters, we are using
/// to try to find an ancestor block
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) enum AncestorSearchState<B: BlockT> {
	/// Use exponential backoff to find an ancestor, then switch to binary search.
	/// We keep track of the exponent.
	///
	/// The carried value is the distance to step back on the next probe; it is
	/// doubled after every mismatching probe (see `handle_ancestor_search_state`).
	ExponentialBackoff(NumberFor<B>),
	/// Using binary search to find the best ancestor.
	/// We keep track of left and right bounds.
	///
	/// `left` is the highest block number known to match, `right` bounds the
	/// mismatching side; the search probes midpoints between them.
	BinarySearch(NumberFor<B>, NumberFor<B>),
}
2257
/// This function handles the ancestor search strategy used. The goal is to find a common point
/// that both our chains agree on that is as close to the tip as possible.
/// The way this works is we first have an exponential backoff strategy, where we try to step
/// forward until we find a block hash mismatch. The size of the step doubles each step we take.
///
/// When we've found a block hash mismatch we then fall back to a binary search between the two
/// last known points to find the common block closest to the tip.
///
/// Returns the next search state together with the next block number to probe,
/// or `None` once the search has converged.
fn handle_ancestor_search_state<B: BlockT>(
	state: &AncestorSearchState<B>,
	curr_block_num: NumberFor<B>,
	block_hash_match: bool,
) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
	let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
	match state {
		AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
			let next_distance_to_tip = *next_distance_to_tip;
			if block_hash_match && next_distance_to_tip == One::one() {
				// We found the ancestor in the first step so there is no need to execute binary
				// search.
				return None;
			}
			if block_hash_match {
				// Hashes matched at this height: the common block lies between
				// here and the previous (mismatching) probe. Switch to a binary
				// search over that interval, probing its midpoint next.
				let left = curr_block_num;
				let right = left + next_distance_to_tip / two;
				let middle = left + (right - left) / two;
				Some((AncestorSearchState::BinarySearch(left, right), middle))
			} else {
				// Still mismatching: step back by the current distance (clamped
				// at genesis) and double the step for the next round.
				let next_block_num =
					curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
				let next_distance_to_tip = next_distance_to_tip * two;
				Some((
					AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
					next_block_num,
				))
			}
		},
		AncestorSearchState::BinarySearch(mut left, mut right) => {
			if left >= curr_block_num {
				// The probe fell at or below the known-matching lower bound;
				// there is nothing left to narrow down.
				return None;
			}
			// Narrow the interval: a match raises the lower bound, a mismatch
			// lowers the upper bound.
			if block_hash_match {
				left = curr_block_num;
			} else {
				right = curr_block_num;
			}
			assert!(right >= left);
			let middle = left + (right - left) / two;
			if middle == curr_block_num {
				// The midpoint no longer moves — the search has converged.
				None
			} else {
				Some((AncestorSearchState::BinarySearch(left, right), middle))
			}
		},
	}
}
2313
2314/// Get a new block request for the peer if any.
2315fn peer_block_request<B: BlockT>(
2316	id: &PeerId,
2317	peer: &PeerSync<B>,
2318	blocks: &mut BlockCollection<B>,
2319	attrs: BlockAttributes,
2320	max_parallel_downloads: u32,
2321	max_blocks_per_request: u32,
2322	finalized: NumberFor<B>,
2323	best_num: NumberFor<B>,
2324) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2325	if best_num >= peer.best_number {
2326		// Will be downloaded as alternative fork instead.
2327		return None;
2328	} else if peer.common_number < finalized {
2329		trace!(
2330			target: LOG_TARGET,
2331			"Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2332			id, peer.common_number, finalized, peer.best_number, best_num,
2333		);
2334	}
2335	let range = blocks.needed_blocks(
2336		*id,
2337		max_blocks_per_request,
2338		peer.best_number,
2339		peer.common_number,
2340		max_parallel_downloads,
2341		MAX_DOWNLOAD_AHEAD,
2342	)?;
2343
2344	// The end is not part of the range.
2345	let last = range.end.saturating_sub(One::one());
2346
2347	let from = if peer.best_number == last {
2348		FromBlock::Hash(peer.best_hash)
2349	} else {
2350		FromBlock::Number(last)
2351	};
2352
2353	let request = BlockRequest::<B> {
2354		id: 0,
2355		fields: attrs,
2356		from,
2357		direction: Direction::Descending,
2358		max: Some((range.end - range.start).saturated_into::<u32>()),
2359	};
2360
2361	Some((range, request))
2362}
2363
2364/// Get a new block request for the peer if any.
2365fn peer_gap_block_request<B: BlockT>(
2366	id: &PeerId,
2367	peer: &PeerSync<B>,
2368	blocks: &mut BlockCollection<B>,
2369	attrs: BlockAttributes,
2370	target: NumberFor<B>,
2371	common_number: NumberFor<B>,
2372	max_blocks_per_request: u32,
2373) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2374	let range = blocks.needed_blocks(
2375		*id,
2376		max_blocks_per_request,
2377		std::cmp::min(peer.best_number, target),
2378		common_number,
2379		1,
2380		MAX_DOWNLOAD_AHEAD,
2381	)?;
2382
2383	// The end is not part of the range.
2384	let last = range.end.saturating_sub(One::one());
2385	let from = FromBlock::Number(last);
2386
2387	let request = BlockRequest::<B> {
2388		id: 0,
2389		fields: attrs,
2390		from,
2391		direction: Direction::Descending,
2392		max: Some((range.end - range.start).saturated_into::<u32>()),
2393	};
2394	Some((range, request))
2395}
2396
2397/// Get pending fork sync targets for a peer.
2398fn fork_sync_request<B: BlockT>(
2399	id: &PeerId,
2400	fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2401	best_num: NumberFor<B>,
2402	finalized: NumberFor<B>,
2403	attributes: BlockAttributes,
2404	check_block: impl Fn(&B::Hash) -> BlockStatus,
2405	max_blocks_per_request: u32,
2406	metrics: Option<&Metrics>,
2407) -> Option<(B::Hash, BlockRequest<B>)> {
2408	fork_targets.retain(|hash, r| {
2409		if r.number <= finalized {
2410			trace!(
2411				target: LOG_TARGET,
2412				"Removed expired fork sync request {:?} (#{})",
2413				hash,
2414				r.number,
2415			);
2416			return false;
2417		}
2418		if check_block(hash) != BlockStatus::Unknown {
2419			trace!(
2420				target: LOG_TARGET,
2421				"Removed obsolete fork sync request {:?} (#{})",
2422				hash,
2423				r.number,
2424			);
2425			return false;
2426		}
2427		true
2428	});
2429	if let Some(metrics) = metrics {
2430		metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2431	}
2432	for (hash, r) in fork_targets {
2433		if !r.peers.contains(&id) {
2434			continue;
2435		}
2436		// Download the fork only if it is behind or not too far ahead our tip of the chain
2437		// Otherwise it should be downloaded in full sync mode.
2438		if r.number <= best_num
2439			|| (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2440		{
2441			let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
2442			let count = if parent_status == BlockStatus::Unknown {
2443				(r.number - finalized).saturated_into::<u32>() // up to the last finalized block
2444			} else {
2445				// request only single block
2446				1
2447			};
2448			trace!(
2449				target: LOG_TARGET,
2450				"Downloading requested fork {hash:?} from {id}, {count} blocks",
2451			);
2452			return Some((
2453				*hash,
2454				BlockRequest::<B> {
2455					id: 0,
2456					fields: attributes,
2457					from: FromBlock::Hash(*hash),
2458					direction: Direction::Descending,
2459					max: Some(count),
2460				},
2461			));
2462		} else {
2463			trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2464		}
2465	}
2466	None
2467}
2468
2469/// Returns `true` if the given `block` is a descendent of `base`.
2470fn is_descendent_of<Block, T>(
2471	client: &T,
2472	base: &Block::Hash,
2473	block: &Block::Hash,
2474) -> soil_client::blockchain::Result<bool>
2475where
2476	Block: BlockT,
2477	T: HeaderMetadata<Block, Error = soil_client::blockchain::Error> + ?Sized,
2478{
2479	if base == block {
2480		return Ok(false);
2481	}
2482
2483	let ancestor = soil_client::blockchain::lowest_common_ancestor(client, *block, *base)?;
2484
2485	Ok(ancestor.hash == *base)
2486}
2487
/// Validate that the given `blocks` are correct.
/// Returns the number of the first block in the sequence.
///
/// It is expected that `blocks` are in ascending order.
pub fn validate_blocks<Block: BlockT>(
	blocks: &Vec<BlockData<Block>>,
	peer_id: &PeerId,
	request: Option<BlockRequest<Block>>,
) -> Result<Option<NumberFor<Block>>, BadPeer> {
	// When the response can be matched to a request, check it against the
	// request's limits and requested attributes.
	if let Some(request) = request {
		// NOTE(review): `None < Some(_)` in `Option`'s ordering, so a request
		// with `max == None` makes this condition `true` for every response —
		// this assumes all issued requests set `max`; confirm at the call sites.
		if Some(blocks.len() as _) > request.max {
			debug!(
				target: LOG_TARGET,
				"Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
				peer_id,
				request.max,
				blocks.len(),
			);

			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		}

		// `blocks` are in ascending order, so for a descending request the
		// block the peer was asked to start from is the last element; for an
		// ascending request it is the first.
		let block_header =
			if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
				.and_then(|b| b.header.as_ref());

		// The response must actually begin at the requested block, matched by
		// hash or by number depending on how the request addressed it.
		let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
			FromBlock::Hash(hash) => h.hash() == hash,
			FromBlock::Number(n) => h.number() == &n,
		});

		if !expected_block {
			debug!(
				target: LOG_TARGET,
				"Received block that was not requested. Requested {:?}, got {:?}.",
				request.from,
				block_header,
			);

			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
		}

		// Every attribute explicitly requested must be present on every block.
		if request.fields.contains(BlockAttributes::HEADER)
			&& blocks.iter().any(|b| b.header.is_none())
		{
			trace!(
				target: LOG_TARGET,
				"Missing requested header for a block in response from {peer_id}.",
			);

			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
		}

		if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
		{
			trace!(
				target: LOG_TARGET,
				"Missing requested body for a block in response from {peer_id}.",
			);

			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
		}
	}

	// Independently of any request: every header present must hash to the hash
	// the peer attached to that block.
	for b in blocks {
		if let Some(header) = &b.header {
			let hash = header.hash();
			if hash != b.hash {
				debug!(
					target: LOG_TARGET,
					"Bad header received from {}. Expected hash {:?}, got {:?}",
					peer_id,
					b.hash,
					hash,
				);
				return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
			}
		}
	}

	Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
}