Skip to main content

sc_network_sync/strategy/
chain_sync.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Contains the state of the chain synchronization process
20//!
21//! At any given point in time, a running node tries as much as possible to be at the head of the
22//! chain. This module handles the logic of which blocks to request from remotes, and processing
23//! responses. It yields blocks to check and potentially move to the database.
24//!
25//! # Usage
26//!
27//! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on
28//! the network, or whenever a block has been successfully verified, call the appropriate method in
29//! order to update it.
30
31use crate::{
32	block_relay_protocol::{BlockDownloader, BlockResponseError},
33	blocks::BlockCollection,
34	justification_requests::ExtraRequests,
35	schema::v1::{StateRequest, StateResponse},
36	service::network::NetworkServiceHandle,
37	strategy::{
38		disconnected_peers::DisconnectedPeers,
39		state_sync::{ImportResult, StateSync, StateSyncProvider},
40		warp::{WarpSyncPhase, WarpSyncProgress},
41		StrategyKey, SyncingAction, SyncingStrategy,
42	},
43	types::{BadPeer, SyncState, SyncStatus},
44	LOG_TARGET,
45};
46
47use codec::Encode;
48use futures::{channel::oneshot, FutureExt};
49use log::{debug, error, info, trace, warn};
50use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
51use prost::Message;
52use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
53use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
54use sc_network::{IfDisconnected, ProtocolName};
55use sc_network_common::sync::message::{
56	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
57};
58use sc_network_types::PeerId;
59use sp_arithmetic::traits::Saturating;
60use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
61use sp_consensus::{BlockOrigin, BlockStatus};
62use sp_runtime::{
63	traits::{
64		Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
65	},
66	EncodedJustification, Justifications,
67};
68
69use std::{
70	any::Any,
71	collections::{HashMap, HashSet},
72	fmt,
73	ops::{AddAssign, Range},
74	sync::Arc,
75};
76
77#[cfg(test)]
78mod test;
79
80/// Maximum blocks to store in the import queue.
81const MAX_IMPORTING_BLOCKS: usize = 2048;
82
83/// Maximum blocks to download ahead of any gap.
84const MAX_DOWNLOAD_AHEAD: u32 = 2048;
85
86/// Maximum blocks to look backwards. The gap is the difference between the highest block and the
87/// common block of a node.
88const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
89
90/// Pick the state to sync as the latest finalized number minus this.
91const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
92
93/// We use a heuristic that with a high likelihood, by the time
94/// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same
95/// chain as (or at least closer to) the peer so we want to delay
96/// the ancestor search to not waste time doing that when we are
97/// so far behind.
98const MAJOR_SYNC_BLOCKS: u8 = 5;
99
100mod rep {
101	use sc_network::ReputationChange as Rep;
102	/// Reputation change when a peer sent us a message that led to a
103	/// database read error.
104	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
105
106	/// Reputation change when a peer sent us a status message with a different
107	/// genesis than us.
108	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
109
110	/// Reputation change for peers which send us a block with an incomplete header.
111	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
112
113	/// Reputation change for peers which send us a block which we fail to verify.
114	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
115
116	/// Reputation change for peers which send us a known bad block.
117	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
118
119	/// Peer did not provide us with advertised block data.
120	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
121
122	/// Reputation change for peers which send us non-requested block data.
123	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
124
125	/// Reputation change for peers which send us a block with bad justifications.
126	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
127
128	/// Reputation change when a peer sent us invalid ancestry result.
129	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
130
131	/// Peer response data does not have requested bits.
132	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
133
134	/// We received a message that failed to decode.
135	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
136}
137
138struct Metrics {
139	queued_blocks: Gauge<U64>,
140	fork_targets: Gauge<U64>,
141}
142
143impl Metrics {
144	fn register(r: &Registry) -> Result<Self, PrometheusError> {
145		Ok(Self {
146			queued_blocks: {
147				let g =
148					Gauge::new("substrate_sync_queued_blocks", "Number of blocks in import queue")?;
149				register(g, r)?
150			},
151			fork_targets: {
152				let g = Gauge::new("substrate_sync_fork_targets", "Number of fork sync targets")?;
153				register(g, r)?
154			},
155		})
156	}
157}
158
159#[derive(Debug, Clone)]
160enum AllowedRequests {
161	Some(HashSet<PeerId>),
162	All,
163}
164
165impl AllowedRequests {
166	fn add(&mut self, id: &PeerId) {
167		if let Self::Some(ref mut set) = self {
168			set.insert(*id);
169		}
170	}
171
172	fn take(&mut self) -> Self {
173		std::mem::take(self)
174	}
175
176	fn set_all(&mut self) {
177		*self = Self::All;
178	}
179
180	fn contains(&self, id: &PeerId) -> bool {
181		match self {
182			Self::Some(set) => set.contains(id),
183			Self::All => true,
184		}
185	}
186
187	fn is_empty(&self) -> bool {
188		match self {
189			Self::Some(set) => set.is_empty(),
190			Self::All => false,
191		}
192	}
193
194	fn clear(&mut self) {
195		std::mem::take(self);
196	}
197}
198
199impl Default for AllowedRequests {
200	fn default() -> Self {
201		Self::Some(HashSet::default())
202	}
203}
204
205/// Statistics for gap sync operations.
206#[derive(Debug, Default, Clone)]
207struct GapSyncStats {
208	/// Size of headers downloaded during gap sync
209	header_bytes: usize,
210	/// Size of bodies downloaded during gap sync
211	body_bytes: usize,
212	/// Size of justifications downloaded during gap sync
213	justification_bytes: usize,
214}
215
216impl GapSyncStats {
217	fn new() -> Self {
218		Self::default()
219	}
220
221	fn total_bytes(&self) -> usize {
222		self.header_bytes + self.body_bytes + self.justification_bytes
223	}
224
225	fn bytes_to_mib(bytes: usize) -> f64 {
226		bytes as f64 / (1024.0 * 1024.0)
227	}
228}
229
230impl fmt::Display for GapSyncStats {
231	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232		let total = self.total_bytes();
233		write!(
234			f,
235			"hdr: {} B ({:.2} MiB), body: {} B ({:.2} MiB), just: {} B ({:.2} MiB) | total: {} B ({:.2} MiB)",
236			self.header_bytes,
237			Self::bytes_to_mib(self.header_bytes),
238			self.body_bytes,
239			Self::bytes_to_mib(self.body_bytes),
240			self.justification_bytes,
241			Self::bytes_to_mib(self.justification_bytes),
242			total,
243			Self::bytes_to_mib(total),
244		)
245	}
246}
247
248impl AddAssign for GapSyncStats {
249	fn add_assign(&mut self, other: Self) {
250		self.header_bytes += other.header_bytes;
251		self.body_bytes += other.body_bytes;
252		self.justification_bytes += other.justification_bytes;
253	}
254}
255
256struct GapSync<B: BlockT> {
257	blocks: BlockCollection<B>,
258	best_queued_number: NumberFor<B>,
259	target: NumberFor<B>,
260	stats: GapSyncStats,
261}
262
263/// Sync operation mode.
264#[derive(Copy, Clone, Debug, Eq, PartialEq)]
265pub enum ChainSyncMode {
266	/// Full block download and verification.
267	Full,
268	/// Download blocks and the latest state.
269	LightState {
270		/// Skip state proof download and verification.
271		skip_proofs: bool,
272		/// Download indexed transactions for recent blocks.
273		storage_chain_mode: bool,
274	},
275}
276
277impl ChainSyncMode {
278	/// Returns the base block attributes required for this sync mode.
279	pub fn required_block_attributes(&self, is_gap: bool, is_archive: bool) -> BlockAttributes {
280		let attrs = match self {
281			ChainSyncMode::Full => {
282				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
283			},
284			ChainSyncMode::LightState { storage_chain_mode: false, .. } => {
285				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
286			},
287			ChainSyncMode::LightState { storage_chain_mode: true, .. } => {
288				BlockAttributes::HEADER |
289					BlockAttributes::JUSTIFICATION |
290					BlockAttributes::INDEXED_BODY
291			},
292		};
293		// Skip body requests for gap sync only if not in archive mode.
294		// Archive nodes need bodies to maintain complete block history.
295		if is_gap && !is_archive {
296			attrs & !BlockAttributes::BODY
297		} else {
298			attrs
299		}
300	}
301}
302
303/// All the data we have about a Peer that we are trying to sync with
304#[derive(Debug, Clone)]
305pub(crate) struct PeerSync<B: BlockT> {
306	/// Peer id of this peer.
307	pub peer_id: PeerId,
308	/// The common number is the block number that is a common point of
309	/// ancestry for both our chains (as far as we know).
310	pub common_number: NumberFor<B>,
311	/// The hash of the best block that we've seen for this peer.
312	pub best_hash: B::Hash,
313	/// The number of the best block that we've seen for this peer.
314	pub best_number: NumberFor<B>,
315	/// The state of syncing this peer is in for us, generally categories
316	/// into `Available` or "busy" with something as defined by `PeerSyncState`.
317	pub state: PeerSyncState<B>,
318}
319
320impl<B: BlockT> PeerSync<B> {
321	/// Update the `common_number` iff `new_common > common_number`.
322	fn update_common_number(&mut self, new_common: NumberFor<B>) {
323		if self.common_number < new_common {
324			trace!(
325				target: LOG_TARGET,
326				"Updating peer {} common number from={} => to={}.",
327				self.peer_id,
328				self.common_number,
329				new_common,
330			);
331			self.common_number = new_common;
332		}
333	}
334}
335
336struct ForkTarget<B: BlockT> {
337	number: NumberFor<B>,
338	parent_hash: Option<B::Hash>,
339	peers: HashSet<PeerId>,
340}
341
342/// The state of syncing between a Peer and ourselves.
343///
344/// Generally two categories, "busy" or `Available`. If busy, the enum
345/// defines what we are busy with.
346#[derive(Copy, Clone, Eq, PartialEq, Debug)]
347pub(crate) enum PeerSyncState<B: BlockT> {
348	/// Available for sync requests.
349	Available,
350	/// Searching for ancestors the Peer has in common with us.
351	AncestorSearch {
352		/// The best queued number when starting the ancestor search.
353		start: NumberFor<B>,
354		/// The current block that is being downloaded.
355		current: NumberFor<B>,
356		/// The state of the search.
357		state: AncestorSearchState<B>,
358	},
359	/// Actively downloading new blocks, starting from the given Number.
360	DownloadingNew(NumberFor<B>),
361	/// Downloading a stale block with given Hash. Stale means that it is a
362	/// block with a number that is lower than our best number. It might be
363	/// from a fork and not necessarily already imported.
364	DownloadingStale(B::Hash),
365	/// Downloading justification for given block hash.
366	DownloadingJustification(B::Hash),
367	/// Downloading state.
368	DownloadingState,
369	/// Actively downloading block history after warp sync.
370	DownloadingGap(NumberFor<B>),
371}
372
373impl<B: BlockT> PeerSyncState<B> {
374	pub fn is_available(&self) -> bool {
375		matches!(self, Self::Available)
376	}
377}
378
379/// The main data structure which contains all the state for a chains
380/// active syncing strategy.
381pub struct ChainSync<B: BlockT, Client> {
382	/// Chain client.
383	client: Arc<Client>,
384	/// The active peers that we are using to sync and their PeerSync status
385	peers: HashMap<PeerId, PeerSync<B>>,
386	disconnected_peers: DisconnectedPeers,
387	/// A `BlockCollection` of blocks that are being downloaded from peers
388	blocks: BlockCollection<B>,
389	/// The best block number in our queue of blocks to import
390	best_queued_number: NumberFor<B>,
391	/// The best block hash in our queue of blocks to import
392	best_queued_hash: B::Hash,
393	/// Current mode (full/light)
394	mode: ChainSyncMode,
395	/// Any extra justification requests.
396	extra_justifications: ExtraRequests<B>,
397	/// A set of hashes of blocks that are being downloaded or have been
398	/// downloaded and are queued for import.
399	queue_blocks: HashSet<B::Hash>,
400	/// A pending attempt to start the state sync.
401	///
402	/// The initiation of state sync may be deferred in cases where other conditions
403	/// are not yet met when the finalized block notification is received, such as
404	/// when `queue_blocks` is not empty or there are no peers. This field holds the
405	/// necessary information to attempt the state sync at a later point when
406	/// conditions are satisfied.
407	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
408	/// Fork sync targets.
409	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
410	/// A set of peers for which there might be potential block requests
411	allowed_requests: AllowedRequests,
412	/// Maximum number of peers to ask the same blocks in parallel.
413	max_parallel_downloads: u32,
414	/// Maximum blocks per request.
415	max_blocks_per_request: u32,
416	/// Protocol name used to send out state requests
417	state_request_protocol_name: ProtocolName,
418	/// Total number of downloaded blocks.
419	downloaded_blocks: usize,
420	/// State sync in progress, if any.
421	state_sync: Option<StateSync<B, Client>>,
422	/// Enable importing existing blocks. This is used after the state download to
423	/// catch up to the latest state while re-importing blocks.
424	import_existing: bool,
425	/// Block downloader
426	block_downloader: Arc<dyn BlockDownloader<B>>,
427	/// Whether to archive blocks. When `true`, gap sync requests bodies to maintain complete
428	/// block history.
429	archive_blocks: bool,
430	/// Gap download process.
431	gap_sync: Option<GapSync<B>>,
432	/// Pending actions.
433	actions: Vec<SyncingAction<B>>,
434	/// Prometheus metrics.
435	metrics: Option<Metrics>,
436}
437
438impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
439where
440	B: BlockT,
441	Client: HeaderBackend<B>
442		+ BlockBackend<B>
443		+ HeaderMetadata<B, Error = sp_blockchain::Error>
444		+ ProofProvider<B>
445		+ Send
446		+ Sync
447		+ 'static,
448{
449	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
450		match self.add_peer_inner(peer_id, best_hash, best_number) {
451			Ok(Some(request)) => {
452				let action = self.create_block_request_action(peer_id, request);
453				self.actions.push(action);
454			},
455			Ok(None) => {},
456			Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
457		}
458	}
459
460	fn remove_peer(&mut self, peer_id: &PeerId) {
461		self.blocks.clear_peer_download(peer_id);
462		if let Some(gap_sync) = &mut self.gap_sync {
463			gap_sync.blocks.clear_peer_download(peer_id)
464		}
465
466		if let Some(state) = self.peers.remove(peer_id) {
467			if !state.state.is_available() {
468				if let Some(bad_peer) =
469					self.disconnected_peers.on_disconnect_during_request(*peer_id)
470				{
471					self.actions.push(SyncingAction::DropPeer(bad_peer));
472				}
473			}
474		}
475
476		self.extra_justifications.peer_disconnected(peer_id);
477		self.allowed_requests.set_all();
478		self.fork_targets.retain(|_, target| {
479			target.peers.remove(peer_id);
480			!target.peers.is_empty()
481		});
482		if let Some(metrics) = &self.metrics {
483			metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
484		}
485
486		let blocks = self.ready_blocks();
487
488		if !blocks.is_empty() {
489			self.validate_and_queue_blocks(blocks, false);
490		}
491	}
492
493	fn on_validated_block_announce(
494		&mut self,
495		is_best: bool,
496		peer_id: PeerId,
497		announce: &BlockAnnounce<B::Header>,
498	) -> Option<(B::Hash, NumberFor<B>)> {
499		let number = *announce.header.number();
500		let hash = announce.header.hash();
501		let parent_status =
502			self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
503		let known_parent = parent_status != BlockStatus::Unknown;
504		let ancient_parent = parent_status == BlockStatus::InChainPruned;
505
506		let known = self.is_known(&hash);
507		let is_major_syncing = self.is_major_syncing();
508		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
509			peer
510		} else {
511			error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
512			return Some((hash, number));
513		};
514
515		if let PeerSyncState::AncestorSearch { .. } = peer.state {
516			trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
517			return None;
518		}
519
520		// The node is continuing a known fork if either the block itself is known, the
521		// parent is known or the block references the previously announced `best_hash`.
522		let continues_known_fork =
523			known || known_parent || announce.header.parent_hash() == &peer.best_hash;
524
525		let peer_info = is_best.then(|| {
526			// update their best block
527			peer.best_number = number;
528			peer.best_hash = hash;
529
530			(hash, number)
531		});
532
533		// If the announced block is the best they have and is not ahead of us, our common number
534		// is either one further ahead or it's the one they just announced, if we know about it.
535		if is_best {
536			let best_queued_number = self.best_queued_number;
537
538			if known && best_queued_number >= number {
539				peer.update_common_number(number);
540			} else if announce.header.parent_hash() == &self.best_queued_hash ||
541				known_parent && best_queued_number >= number
542			{
543				peer.update_common_number(number.saturating_sub(One::one()));
544			}
545
546			// If this announced block isn't following any known fork, we have to start an
547			// ancestor search to find out our real common block. However, we skip this during
548			// major sync to avoid pulling peers out of the download pool.
549			if !continues_known_fork && !is_major_syncing {
550				let current = number.min(best_queued_number);
551				peer.common_number = peer.common_number.min(self.client.info().finalized_number);
552				peer.state = PeerSyncState::AncestorSearch {
553					current,
554					start: best_queued_number,
555					state: AncestorSearchState::ExponentialBackoff(One::one()),
556				};
557
558				let request = ancestry_request::<B>(current);
559				let action = self.create_block_request_action(peer_id, request);
560				self.actions.push(action);
561
562				return peer_info;
563			}
564		}
565		self.allowed_requests.add(&peer_id);
566
567		// known block case
568		if known || self.is_already_downloading(&hash) {
569			trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
570			if let Some(target) = self.fork_targets.get_mut(&hash) {
571				target.peers.insert(peer_id);
572			}
573			return peer_info;
574		}
575
576		if ancient_parent {
577			trace!(
578				target: LOG_TARGET,
579				"Ignored ancient block announced from {}: {} {:?}",
580				peer_id,
581				hash,
582				announce.header,
583			);
584			return peer_info;
585		}
586
587		if self.status().state == SyncState::Idle {
588			trace!(
589				target: LOG_TARGET,
590				"Added sync target for block announced from {}: {} {:?}",
591				peer_id,
592				hash,
593				announce.summary(),
594			);
595			self.fork_targets
596				.entry(hash)
597				.or_insert_with(|| {
598					if let Some(metrics) = &self.metrics {
599						metrics.fork_targets.inc();
600					}
601
602					ForkTarget {
603						number,
604						parent_hash: Some(*announce.header.parent_hash()),
605						peers: Default::default(),
606					}
607				})
608				.peers
609				.insert(peer_id);
610		}
611
612		peer_info
613	}
614
615	// The implementation is similar to `on_validated_block_announce` with unknown parent hash.
616	fn set_sync_fork_request(
617		&mut self,
618		mut peers: Vec<PeerId>,
619		hash: &B::Hash,
620		number: NumberFor<B>,
621	) {
622		if peers.is_empty() {
623			peers = self
624				.peers
625				.iter()
626				// Only request blocks from peers who are ahead or on a par.
627				.filter(|(_, peer)| peer.best_number >= number)
628				.map(|(id, _)| *id)
629				.collect();
630
631			debug!(
632				target: LOG_TARGET,
633				"Explicit sync request for block {hash:?} with no peers specified. \
634				Syncing from these peers {peers:?} instead.",
635			);
636		} else {
637			debug!(
638				target: LOG_TARGET,
639				"Explicit sync request for block {hash:?} with {peers:?}",
640			);
641		}
642
643		if self.is_known(hash) {
644			debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
645			return;
646		}
647
648		trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
649		for peer_id in &peers {
650			if let Some(peer) = self.peers.get_mut(peer_id) {
651				if let PeerSyncState::AncestorSearch { .. } = peer.state {
652					continue;
653				}
654
655				if number > peer.best_number {
656					peer.best_number = number;
657					peer.best_hash = *hash;
658				}
659				self.allowed_requests.add(peer_id);
660			}
661		}
662
663		self.fork_targets
664			.entry(*hash)
665			.or_insert_with(|| {
666				if let Some(metrics) = &self.metrics {
667					metrics.fork_targets.inc();
668				}
669
670				ForkTarget { number, peers: Default::default(), parent_hash: None }
671			})
672			.peers
673			.extend(peers);
674	}
675
676	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
677		let client = &self.client;
678		self.extra_justifications
679			.schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
680	}
681
682	fn clear_justification_requests(&mut self) {
683		self.extra_justifications.reset();
684	}
685
686	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
687		let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
688		self.extra_justifications
689			.try_finalize_root((hash, number), finalization_result, true);
690		self.allowed_requests.set_all();
691	}
692
693	fn on_generic_response(
694		&mut self,
695		peer_id: &PeerId,
696		key: StrategyKey,
697		protocol_name: ProtocolName,
698		response: Box<dyn Any + Send>,
699	) {
700		if Self::STRATEGY_KEY != key {
701			warn!(
702				target: LOG_TARGET,
703				"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
704			);
705			debug_assert!(false);
706			return;
707		}
708
709		if protocol_name == self.state_request_protocol_name {
710			let Ok(response) = response.downcast::<Vec<u8>>() else {
711				warn!(target: LOG_TARGET, "Failed to downcast state response");
712				debug_assert!(false);
713				return;
714			};
715
716			if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
717				self.actions.push(SyncingAction::DropPeer(bad_peer));
718			}
719		} else if &protocol_name == self.block_downloader.protocol_name() {
720			let Ok(response) = response
721				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
722			else {
723				warn!(target: LOG_TARGET, "Failed to downcast block response");
724				debug_assert!(false);
725				return;
726			};
727
728			let (request, response) = *response;
729			let blocks = match response {
730				Ok(blocks) => blocks,
731				Err(BlockResponseError::DecodeFailed(e)) => {
732					debug!(
733						target: LOG_TARGET,
734						"Failed to decode block response from peer {:?}: {:?}.",
735						peer_id,
736						e
737					);
738					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
739					return;
740				},
741				Err(BlockResponseError::ExtractionFailed(e)) => {
742					debug!(
743						target: LOG_TARGET,
744						"Failed to extract blocks from peer response {:?}: {:?}.",
745						peer_id,
746						e
747					);
748					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
749					return;
750				},
751			};
752
753			if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
754				self.actions.push(SyncingAction::DropPeer(bad_peer));
755			}
756		} else {
757			warn!(
758				target: LOG_TARGET,
759				"Unexpected generic response protocol {protocol_name}, strategy key \
760				{key:?}",
761			);
762			debug_assert!(false);
763		}
764	}
765
766	fn on_blocks_processed(
767		&mut self,
768		imported: usize,
769		count: usize,
770		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
771	) {
772		trace!(target: LOG_TARGET, "Imported {imported} of {count}");
773
774		let mut has_error = false;
775		for (_, hash) in &results {
776			if self.queue_blocks.remove(hash) {
777				if let Some(metrics) = &self.metrics {
778					metrics.queued_blocks.dec();
779				}
780			}
781			self.blocks.clear_queued(hash);
782			if let Some(gap_sync) = &mut self.gap_sync {
783				gap_sync.blocks.clear_queued(hash);
784			}
785		}
786		for (result, hash) in results {
787			if has_error {
788				break;
789			}
790
791			has_error |= result.is_err();
792
793			match result {
794				Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
795					if let Some(peer) = peer_id {
796						self.update_peer_common_number(&peer, number);
797					}
798					self.complete_gap_if_target(number);
799				},
800				Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
801					if aux.clear_justification_requests {
802						trace!(
803							target: LOG_TARGET,
804							"Block imported clears all pending justification requests {number}: {hash:?}",
805						);
806						self.clear_justification_requests();
807					}
808
809					if aux.needs_justification {
810						trace!(
811							target: LOG_TARGET,
812							"Block imported but requires justification {number}: {hash:?}",
813						);
814						self.request_justification(&hash, number);
815					}
816
817					if aux.bad_justification {
818						if let Some(ref peer) = peer_id {
819							warn!("💔 Sent block with bad justification to import");
820							self.actions.push(SyncingAction::DropPeer(BadPeer(
821								*peer,
822								rep::BAD_JUSTIFICATION,
823							)));
824						}
825					}
826
827					if let Some(peer) = peer_id {
828						self.update_peer_common_number(&peer, number);
829					}
830					let state_sync_complete =
831						self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
832					if state_sync_complete {
833						info!(
834							target: LOG_TARGET,
835							"State sync is complete ({} MiB), restarting block sync.",
836							self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
837						);
838						self.state_sync = None;
839						self.mode = ChainSyncMode::Full;
840						self.restart();
841					}
842
843					self.complete_gap_if_target(number);
844				},
845				Err(BlockImportError::IncompleteHeader(peer_id)) => {
846					if let Some(peer) = peer_id {
847						warn!(
848							target: LOG_TARGET,
849							"💔 Peer sent block with incomplete header to import",
850						);
851						self.actions
852							.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
853						self.restart();
854					}
855				},
856				Err(BlockImportError::VerificationFailed(peer_id, e)) => {
857					let extra_message = peer_id
858						.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
859
860					warn!(
861						target: LOG_TARGET,
862						"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
863					);
864
865					if let Some(peer) = peer_id {
866						self.actions
867							.push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
868					}
869
870					self.restart();
871				},
872				Err(BlockImportError::BadBlock(peer_id)) => {
873					if let Some(peer) = peer_id {
874						warn!(
875							target: LOG_TARGET,
876							"💔 Block {hash:?} received from peer {peer} has been blacklisted",
877						);
878						self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
879					}
880				},
881				Err(BlockImportError::MissingState) => {
882					// This may happen if the chain we were requesting upon has been discarded
883					// in the meantime because other chain has been finalized.
884					// Don't mark it as bad as it still may be synced if explicitly requested.
885					trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
886				},
887				e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
888					warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
889					self.state_sync = None;
890					self.restart();
891				},
892				Err(BlockImportError::Cancelled) => {},
893			};
894		}
895
896		self.allowed_requests.set_all();
897	}
898
899	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
900		let client = &self.client;
901		let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
902			is_descendent_of(&**client, base, block)
903		});
904
905		if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
906			if self.state_sync.is_none() {
907				if !self.peers.is_empty() && self.queue_blocks.is_empty() {
908					self.attempt_state_sync(*hash, number, *skip_proofs);
909				} else {
910					self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
911				}
912			}
913		}
914
915		if let Err(err) = r {
916			warn!(
917				target: LOG_TARGET,
918				"💔 Error cleaning up pending extra justification data requests: {err}",
919			);
920		}
921	}
922
923	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
924		self.on_block_queued(best_hash, best_number);
925	}
926
927	fn is_major_syncing(&self) -> bool {
928		self.status().state.is_major_syncing()
929	}
930
931	fn num_peers(&self) -> usize {
932		self.peers.len()
933	}
934
935	fn status(&self) -> SyncStatus<B> {
936		let median_seen = self.median_seen();
937		let best_seen_block =
938			median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
939		let sync_state = if let Some(target) = median_seen {
940			// A chain is classified as downloading if the provided best block is
941			// more than `MAJOR_SYNC_BLOCKS` behind the best block or as importing
942			// if the same can be said about queued blocks.
943			let best_block = self.client.info().best_number;
944			if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
945				// If target is not queued, we're downloading, otherwise importing.
946				if target > self.best_queued_number {
947					SyncState::Downloading { target }
948				} else {
949					SyncState::Importing { target }
950				}
951			} else {
952				SyncState::Idle
953			}
954		} else {
955			SyncState::Idle
956		};
957
958		let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
959			phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
960			total_bytes: 0,
961			status: None,
962		});
963
964		SyncStatus {
965			state: sync_state,
966			best_seen_block,
967			num_peers: self.peers.len() as u32,
968			queued_blocks: self.queue_blocks.len() as u32,
969			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
970			warp_sync: warp_sync_progress,
971		}
972	}
973
974	fn num_downloaded_blocks(&self) -> usize {
975		self.downloaded_blocks
976	}
977
978	fn num_sync_requests(&self) -> usize {
979		self.fork_targets
980			.values()
981			.filter(|f| f.number <= self.best_queued_number)
982			.count()
983	}
984
985	fn actions(
986		&mut self,
987		network_service: &NetworkServiceHandle,
988	) -> Result<Vec<SyncingAction<B>>, ClientError> {
989		if !self.peers.is_empty() && self.queue_blocks.is_empty() {
990			if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
991				self.attempt_state_sync(hash, number, skip_proofs);
992			}
993		}
994
995		let block_requests = self
996			.block_requests()
997			.into_iter()
998			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
999			.collect::<Vec<_>>();
1000		self.actions.extend(block_requests);
1001
1002		let justification_requests = self
1003			.justification_requests()
1004			.into_iter()
1005			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
1006			.collect::<Vec<_>>();
1007		self.actions.extend(justification_requests);
1008
1009		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
1010			trace!(
1011				target: LOG_TARGET,
1012				"Created `StateRequest` to {peer_id}.",
1013			);
1014
1015			let (tx, rx) = oneshot::channel();
1016
1017			network_service.start_request(
1018				peer_id,
1019				self.state_request_protocol_name.clone(),
1020				request.encode_to_vec(),
1021				tx,
1022				IfDisconnected::ImmediateError,
1023			);
1024
1025			SyncingAction::StartRequest {
1026				peer_id,
1027				key: Self::STRATEGY_KEY,
1028				request: async move {
1029					Ok(rx.await?.and_then(|(response, protocol_name)| {
1030						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
1031					}))
1032				}
1033				.boxed(),
1034				remove_obsolete: false,
1035			}
1036		});
1037		self.actions.extend(state_request);
1038
1039		Ok(std::mem::take(&mut self.actions))
1040	}
1041}
1042
1043impl<B, Client> ChainSync<B, Client>
1044where
1045	B: BlockT,
1046	Client: HeaderBackend<B>
1047		+ BlockBackend<B>
1048		+ HeaderMetadata<B, Error = sp_blockchain::Error>
1049		+ ProofProvider<B>
1050		+ Send
1051		+ Sync
1052		+ 'static,
1053{
1054	/// Strategy key used by chain sync.
1055	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
1056
1057	/// Create a new instance.
1058	pub fn new(
1059		mode: ChainSyncMode,
1060		client: Arc<Client>,
1061		max_parallel_downloads: u32,
1062		max_blocks_per_request: u32,
1063		state_request_protocol_name: ProtocolName,
1064		block_downloader: Arc<dyn BlockDownloader<B>>,
1065		archive_blocks: bool,
1066		metrics_registry: Option<&Registry>,
1067		initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
1068	) -> Result<Self, ClientError> {
1069		let mut sync = Self {
1070			client,
1071			peers: HashMap::new(),
1072			disconnected_peers: DisconnectedPeers::new(),
1073			blocks: BlockCollection::new(),
1074			best_queued_hash: Default::default(),
1075			best_queued_number: Zero::zero(),
1076			extra_justifications: ExtraRequests::new("justification", metrics_registry),
1077			mode,
1078			queue_blocks: Default::default(),
1079			pending_state_sync_attempt: None,
1080			fork_targets: Default::default(),
1081			allowed_requests: Default::default(),
1082			max_parallel_downloads,
1083			max_blocks_per_request,
1084			state_request_protocol_name,
1085			downloaded_blocks: 0,
1086			state_sync: None,
1087			import_existing: false,
1088			block_downloader,
1089			archive_blocks,
1090			gap_sync: None,
1091			actions: Vec::new(),
1092			metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
1093				Ok(metrics) => Some(metrics),
1094				Err(err) => {
1095					log::error!(
1096						target: LOG_TARGET,
1097						"Failed to register `ChainSync` metrics {err:?}",
1098					);
1099					None
1100				},
1101			}),
1102		};
1103
1104		sync.reset_sync_start_point()?;
1105		initial_peers.for_each(|(peer_id, best_hash, best_number)| {
1106			sync.add_peer(peer_id, best_hash, best_number);
1107		});
1108
1109		Ok(sync)
1110	}
1111
1112	/// Complete the gap sync if the target number is reached and there is a gap.
1113	fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
1114		let Some(gap_sync) = &self.gap_sync else { return };
1115
1116		if gap_sync.target != number {
1117			return;
1118		}
1119
1120		info!(
1121			target: LOG_TARGET,
1122			"Block history download is complete.",
1123		);
1124		self.gap_sync = None;
1125	}
1126
1127	#[must_use]
1128	fn add_peer_inner(
1129		&mut self,
1130		peer_id: PeerId,
1131		best_hash: B::Hash,
1132		best_number: NumberFor<B>,
1133	) -> Result<Option<BlockRequest<B>>, BadPeer> {
1134		// There is nothing sync can get from the node that has no blockchain data.
1135		match self.block_status(&best_hash) {
1136			Err(e) => {
1137				debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
1138				Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
1139			},
1140			Ok(BlockStatus::KnownBad) => {
1141				info!(
1142					"💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
1143				);
1144				Err(BadPeer(peer_id, rep::BAD_BLOCK))
1145			},
1146			Ok(BlockStatus::Unknown) => {
1147				if best_number.is_zero() {
1148					info!(
1149						"💔 New peer {} with unknown genesis hash {} ({}).",
1150						peer_id, best_hash, best_number,
1151					);
1152					return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
1153				}
1154
1155				// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
1156				// enough to do in the import queue that it's not worth kicking off
1157				// an ancestor search, which is what we do in the next match case below.
1158				if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
1159					debug!(
1160						target: LOG_TARGET,
1161						"New peer {} with unknown best hash {} ({}), assuming common block.",
1162						peer_id,
1163						self.best_queued_hash,
1164						self.best_queued_number
1165					);
1166					self.peers.insert(
1167						peer_id,
1168						PeerSync {
1169							peer_id,
1170							common_number: self.best_queued_number,
1171							best_hash,
1172							best_number,
1173							state: PeerSyncState::Available,
1174						},
1175					);
1176					return Ok(None);
1177				}
1178
1179				// If we are at genesis, just start downloading.
1180				let (state, req) = if self.best_queued_number.is_zero() {
1181					debug!(
1182						target: LOG_TARGET,
1183						"New peer {peer_id} with best hash {best_hash} ({best_number}).",
1184					);
1185
1186					(PeerSyncState::Available, None)
1187				} else {
1188					let common_best = std::cmp::min(self.best_queued_number, best_number);
1189
1190					debug!(
1191						target: LOG_TARGET,
1192						"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1193						peer_id,
1194						best_hash,
1195						best_number
1196					);
1197
1198					(
1199						PeerSyncState::AncestorSearch {
1200							current: common_best,
1201							start: self.best_queued_number,
1202							state: AncestorSearchState::ExponentialBackoff(One::one()),
1203						},
1204						Some(ancestry_request::<B>(common_best)),
1205					)
1206				};
1207
1208				self.allowed_requests.add(&peer_id);
1209				self.peers.insert(
1210					peer_id,
1211					PeerSync {
1212						peer_id,
1213						common_number: Zero::zero(),
1214						best_hash,
1215						best_number,
1216						state,
1217					},
1218				);
1219
1220				Ok(req)
1221			},
1222			Ok(BlockStatus::Queued) |
1223			Ok(BlockStatus::InChainWithState) |
1224			Ok(BlockStatus::InChainPruned) => {
1225				debug!(
1226					target: LOG_TARGET,
1227					"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1228				);
1229				self.peers.insert(
1230					peer_id,
1231					PeerSync {
1232						peer_id,
1233						common_number: std::cmp::min(self.best_queued_number, best_number),
1234						best_hash,
1235						best_number,
1236						state: PeerSyncState::Available,
1237					},
1238				);
1239				self.allowed_requests.add(&peer_id);
1240				Ok(None)
1241			},
1242		}
1243	}
1244
1245	fn create_block_request_action(
1246		&mut self,
1247		peer_id: PeerId,
1248		request: BlockRequest<B>,
1249	) -> SyncingAction<B> {
1250		let downloader = self.block_downloader.clone();
1251
1252		SyncingAction::StartRequest {
1253			peer_id,
1254			key: Self::STRATEGY_KEY,
1255			request: async move {
1256				Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
1257					|(response, protocol_name)| {
1258						let decoded_response =
1259							downloader.block_response_into_blocks(&request, response);
1260						let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
1261						Ok((result, protocol_name))
1262					},
1263				))
1264			}
1265			.boxed(),
1266			// Sending block request implies dropping obsolete pending response as we are not
1267			// interested in it anymore.
1268			remove_obsolete: true,
1269		}
1270	}
1271
1272	/// Submit a block response for processing.
1273	#[must_use]
1274	fn on_block_data(
1275		&mut self,
1276		peer_id: &PeerId,
1277		request: Option<BlockRequest<B>>,
1278		response: BlockResponse<B>,
1279	) -> Result<(), BadPeer> {
1280		self.downloaded_blocks += response.blocks.len();
1281		let mut gap = false;
1282		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
1283			let mut blocks = response.blocks;
1284			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
1285				trace!(target: LOG_TARGET, "Reversing incoming block list");
1286				blocks.reverse()
1287			}
1288			self.allowed_requests.add(peer_id);
1289			if let Some(request) = request {
1290				match &mut peer.state {
1291					PeerSyncState::DownloadingNew(_) => {
1292						self.blocks.clear_peer_download(peer_id);
1293						peer.state = PeerSyncState::Available;
1294						if let Some(start_block) =
1295							validate_blocks::<B>(&blocks, peer_id, Some(request))?
1296						{
1297							self.blocks.insert(start_block, blocks, *peer_id);
1298						}
1299						self.ready_blocks()
1300					},
1301					PeerSyncState::DownloadingGap(_) => {
1302						peer.state = PeerSyncState::Available;
1303						if let Some(gap_sync) = &mut self.gap_sync {
1304							gap_sync.blocks.clear_peer_download(peer_id);
1305							if let Some(start_block) =
1306								validate_blocks::<B>(&blocks, peer_id, Some(request))?
1307							{
1308								gap_sync.blocks.insert(start_block, blocks, *peer_id);
1309							}
1310							gap = true;
1311							let mut batch_gap_sync_stats = GapSyncStats::new();
1312							let blocks: Vec<_> = gap_sync
1313								.blocks
1314								.ready_blocks(gap_sync.best_queued_number + One::one())
1315								.into_iter()
1316								.map(|block_data| {
1317									let justifications =
1318										block_data.block.justifications.or_else(|| {
1319											legacy_justification_mapping(
1320												block_data.block.justification,
1321											)
1322										});
1323									let gap_sync_stats = GapSyncStats {
1324										header_bytes: block_data
1325											.block
1326											.header
1327											.as_ref()
1328											.map(|h| h.encoded_size())
1329											.unwrap_or(0),
1330										body_bytes: block_data
1331											.block
1332											.body
1333											.as_ref()
1334											.map(|b| b.encoded_size())
1335											.unwrap_or(0),
1336										justification_bytes: justifications
1337											.as_ref()
1338											.map(|j| j.encoded_size())
1339											.unwrap_or(0),
1340									};
1341									batch_gap_sync_stats += gap_sync_stats;
1342
1343									IncomingBlock {
1344										hash: block_data.block.hash,
1345										header: block_data.block.header,
1346										body: block_data.block.body,
1347										indexed_body: block_data.block.indexed_body,
1348										justifications,
1349										origin: block_data.origin,
1350										allow_missing_state: true,
1351										// Warp-synced blocks are header-only. Allow re-import to
1352										// store bodies if gap sync requested them.
1353										import_existing: true,
1354										skip_execution: true,
1355										state: None,
1356									}
1357								})
1358								.collect();
1359
1360							debug!(
1361								target: LOG_TARGET,
1362								"Drained {} gap blocks from {}",
1363								blocks.len(),
1364								gap_sync.best_queued_number,
1365							);
1366
1367							gap_sync.stats += batch_gap_sync_stats;
1368
1369							if blocks.len() > 0 {
1370								trace!(
1371									target: LOG_TARGET,
1372									"Gap sync cumulative stats: {}",
1373									gap_sync.stats
1374								);
1375							}
1376							blocks
1377						} else {
1378							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
1379							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1380						}
1381					},
1382					PeerSyncState::DownloadingStale(_) => {
1383						peer.state = PeerSyncState::Available;
1384						if blocks.is_empty() {
1385							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
1386							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1387						}
1388						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
1389						blocks
1390							.into_iter()
1391							.map(|b| {
1392								let justifications = b
1393									.justifications
1394									.or_else(|| legacy_justification_mapping(b.justification));
1395								IncomingBlock {
1396									hash: b.hash,
1397									header: b.header,
1398									body: b.body,
1399									indexed_body: None,
1400									justifications,
1401									origin: Some(*peer_id),
1402									allow_missing_state: true,
1403									import_existing: self.import_existing,
1404									skip_execution: self.skip_execution(),
1405									state: None,
1406								}
1407							})
1408							.collect()
1409					},
1410					PeerSyncState::AncestorSearch { current, start, state } => {
1411						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
1412							(Some(block), Ok(maybe_our_block_hash)) => {
1413								trace!(
1414									target: LOG_TARGET,
1415									"Got ancestry block #{} ({}) from peer {}",
1416									current,
1417									block.hash,
1418									peer_id,
1419								);
1420								maybe_our_block_hash.filter(|x| x == &block.hash)
1421							},
1422							(None, _) => {
1423								debug!(
1424									target: LOG_TARGET,
1425									"Invalid response when searching for ancestor from {peer_id}",
1426								);
1427								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
1428							},
1429							(_, Err(e)) => {
1430								info!(
1431									target: LOG_TARGET,
1432									"❌ Error answering legitimate blockchain query: {e}",
1433								);
1434								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
1435							},
1436						};
1437						if matching_hash.is_some() {
1438							if *start < self.best_queued_number &&
1439								self.best_queued_number <= peer.best_number
1440							{
1441								// We've made progress on this chain since the search was started.
1442								// Opportunistically set common number to updated number
1443								// instead of the one that started the search.
1444								trace!(
1445									target: LOG_TARGET,
1446									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
1447									*peer_id,
1448									peer.common_number,
1449									self.best_queued_number,
1450								);
1451								peer.common_number = self.best_queued_number;
1452							} else if peer.common_number < *current {
1453								trace!(
1454									target: LOG_TARGET,
1455									"Ancestry search: updating peer {} common number from={} => to={}.",
1456									*peer_id,
1457									peer.common_number,
1458									*current,
1459								);
1460								peer.common_number = *current;
1461							}
1462						}
1463						if matching_hash.is_none() && current.is_zero() {
1464							trace!(
1465								target: LOG_TARGET,
1466								"Ancestry search: genesis mismatch for peer {peer_id}",
1467							);
1468							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
1469						}
1470						if let Some((next_state, next_num)) =
1471							handle_ancestor_search_state(state, *current, matching_hash.is_some())
1472						{
1473							peer.state = PeerSyncState::AncestorSearch {
1474								current: next_num,
1475								start: *start,
1476								state: next_state,
1477							};
1478							let request = ancestry_request::<B>(next_num);
1479							let action = self.create_block_request_action(*peer_id, request);
1480							self.actions.push(action);
1481							return Ok(());
1482						} else {
1483							// Ancestry search is complete. Check if peer is on a stale fork unknown
1484							// to us and add it to sync targets if necessary.
1485							trace!(
1486								target: LOG_TARGET,
1487								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
1488								self.best_queued_hash,
1489								self.best_queued_number,
1490								peer.best_hash,
1491								peer.best_number,
1492								matching_hash,
1493								peer.common_number,
1494							);
1495							if peer.common_number < peer.best_number &&
1496								peer.best_number < self.best_queued_number
1497							{
1498								trace!(
1499									target: LOG_TARGET,
1500									"Added fork target {} for {}",
1501									peer.best_hash,
1502									peer_id,
1503								);
1504								self.fork_targets
1505									.entry(peer.best_hash)
1506									.or_insert_with(|| {
1507										if let Some(metrics) = &self.metrics {
1508											metrics.fork_targets.inc();
1509										}
1510
1511										ForkTarget {
1512											number: peer.best_number,
1513											parent_hash: None,
1514											peers: Default::default(),
1515										}
1516									})
1517									.peers
1518									.insert(*peer_id);
1519							}
1520							peer.state = PeerSyncState::Available;
1521							return Ok(());
1522						}
1523					},
1524					PeerSyncState::Available |
1525					PeerSyncState::DownloadingJustification(..) |
1526					PeerSyncState::DownloadingState => Vec::new(),
1527				}
1528			} else {
1529				// When request.is_none() this is a block announcement. Just accept blocks.
1530				validate_blocks::<B>(&blocks, peer_id, None)?;
1531				blocks
1532					.into_iter()
1533					.map(|b| {
1534						let justifications = b
1535							.justifications
1536							.or_else(|| legacy_justification_mapping(b.justification));
1537						IncomingBlock {
1538							hash: b.hash,
1539							header: b.header,
1540							body: b.body,
1541							indexed_body: None,
1542							justifications,
1543							origin: Some(*peer_id),
1544							allow_missing_state: true,
1545							import_existing: false,
1546							skip_execution: true,
1547							state: None,
1548						}
1549					})
1550					.collect()
1551			}
1552		} else {
1553			// We don't know of this peer, so we also did not request anything from it.
1554			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1555		};
1556
1557		self.validate_and_queue_blocks(new_blocks, gap);
1558
1559		Ok(())
1560	}
1561
1562	fn on_block_response(
1563		&mut self,
1564		peer_id: &PeerId,
1565		key: StrategyKey,
1566		request: BlockRequest<B>,
1567		blocks: Vec<BlockData<B>>,
1568	) -> Result<(), BadPeer> {
1569		if key != Self::STRATEGY_KEY {
1570			error!(
1571				target: LOG_TARGET,
1572				"`on_block_response()` called with unexpected key {key:?} for chain sync",
1573			);
1574			debug_assert!(false);
1575		}
1576		let block_response = BlockResponse::<B> { id: request.id, blocks };
1577
1578		let blocks_range = || match (
1579			block_response
1580				.blocks
1581				.first()
1582				.and_then(|b| b.header.as_ref().map(|h| h.number())),
1583			block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1584		) {
1585			(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1586			(Some(first), Some(_)) => format!(" ({})", first),
1587			_ => Default::default(),
1588		};
1589
1590		trace!(
1591			target: LOG_TARGET,
1592			"BlockResponse {} from {} with {} blocks {}",
1593			block_response.id,
1594			peer_id,
1595			block_response.blocks.len(),
1596			blocks_range(),
1597		);
1598
1599		if request.fields == BlockAttributes::JUSTIFICATION {
1600			self.on_block_justification(*peer_id, block_response)
1601		} else {
1602			self.on_block_data(peer_id, Some(request), block_response)
1603		}
1604	}
1605
1606	/// Submit a justification response for processing.
1607	#[must_use]
1608	fn on_block_justification(
1609		&mut self,
1610		peer_id: PeerId,
1611		response: BlockResponse<B>,
1612	) -> Result<(), BadPeer> {
1613		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1614			peer
1615		} else {
1616			error!(
1617				target: LOG_TARGET,
1618				"💔 Called on_block_justification with a peer ID of an unknown peer",
1619			);
1620			return Ok(());
1621		};
1622
1623		self.allowed_requests.add(&peer_id);
1624		if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1625			peer.state = PeerSyncState::Available;
1626
1627			// We only request one justification at a time
1628			let justification = if let Some(block) = response.blocks.into_iter().next() {
1629				if hash != block.hash {
1630					warn!(
1631						target: LOG_TARGET,
1632						"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1633						peer_id,
1634						hash,
1635						block.hash,
1636					);
1637					return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1638				}
1639
1640				block
1641					.justifications
1642					.or_else(|| legacy_justification_mapping(block.justification))
1643			} else {
1644				// we might have asked the peer for a justification on a block that we assumed it
1645				// had but didn't (regardless of whether it had a justification for it or not).
1646				trace!(
1647					target: LOG_TARGET,
1648					"Peer {peer_id:?} provided empty response for justification request {hash:?}",
1649				);
1650
1651				None
1652			};
1653
1654			if let Some((peer_id, hash, number, justifications)) =
1655				self.extra_justifications.on_response(peer_id, justification)
1656			{
1657				self.actions.push(SyncingAction::ImportJustifications {
1658					peer_id,
1659					hash,
1660					number,
1661					justifications,
1662				});
1663				return Ok(());
1664			}
1665		}
1666
1667		Ok(())
1668	}
1669
1670	/// Returns the median seen block number.
1671	fn median_seen(&self) -> Option<NumberFor<B>> {
1672		let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1673
1674		if best_seens.is_empty() {
1675			None
1676		} else {
1677			let middle = best_seens.len() / 2;
1678
1679			// Not the "perfect median" when we have an even number of peers.
1680			Some(*best_seens.select_nth_unstable(middle).1)
1681		}
1682	}
1683
1684	fn skip_execution(&self) -> bool {
1685		match self.mode {
1686			ChainSyncMode::Full => false,
1687			ChainSyncMode::LightState { .. } => true,
1688		}
1689	}
1690
1691	fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1692		let orig_len = new_blocks.len();
1693		new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1694		if new_blocks.len() != orig_len {
1695			debug!(
1696				target: LOG_TARGET,
1697				"Ignoring {} blocks that are already queued",
1698				orig_len - new_blocks.len(),
1699			);
1700		}
1701
1702		let origin = if gap {
1703			// Gap sync: filling historical blocks after warp sync
1704			BlockOrigin::GapSync
1705		} else if !self.status().state.is_major_syncing() {
1706			// Normal operation: receiving new blocks
1707			BlockOrigin::NetworkBroadcast
1708		} else {
1709			// Initial sync: catching up with the chain
1710			BlockOrigin::NetworkInitialSync
1711		};
1712
1713		if let Some((h, n)) = new_blocks
1714			.last()
1715			.and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1716		{
1717			trace!(
1718				target: LOG_TARGET,
1719				"Accepted {} blocks ({:?}) with origin {:?}",
1720				new_blocks.len(),
1721				h,
1722				origin,
1723			);
1724			self.on_block_queued(h, n)
1725		}
1726		self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1727		if let Some(metrics) = &self.metrics {
1728			metrics
1729				.queued_blocks
1730				.set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1731		}
1732
1733		self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1734	}
1735
1736	fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1737		if let Some(peer) = self.peers.get_mut(peer_id) {
1738			peer.update_common_number(new_common);
1739		}
1740	}
1741
1742	/// Called when a block has been queued for import.
1743	///
1744	/// Updates our internal state for best queued block and then goes
1745	/// through all peers to update our view of their state as well.
1746	fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1747		if self.fork_targets.remove(hash).is_some() {
1748			if let Some(metrics) = &self.metrics {
1749				metrics.fork_targets.dec();
1750			}
1751			trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1752		}
1753		if let Some(gap_sync) = &mut self.gap_sync {
1754			if number > gap_sync.best_queued_number && number <= gap_sync.target {
1755				gap_sync.best_queued_number = number;
1756			}
1757		}
1758		if number > self.best_queued_number {
1759			self.best_queued_number = number;
1760			self.best_queued_hash = *hash;
1761			// Update common blocks
1762			for (n, peer) in self.peers.iter_mut() {
1763				if let PeerSyncState::AncestorSearch { .. } = peer.state {
1764					// Wait for ancestry search to complete first.
1765					continue;
1766				}
1767				let new_common_number =
1768					if peer.best_number >= number { number } else { peer.best_number };
1769				trace!(
1770					target: LOG_TARGET,
1771					"Updating peer {} info, ours={}, common={}->{}, their best={}",
1772					n,
1773					number,
1774					peer.common_number,
1775					new_common_number,
1776					peer.best_number,
1777				);
1778				peer.common_number = new_common_number;
1779			}
1780		}
1781		self.allowed_requests.set_all();
1782	}
1783
1784	/// Restart the sync process. This will reset all pending block requests and return an iterator
1785	/// of new block requests to make to peers. Peers that were downloading finality data (i.e.
1786	/// their state was `DownloadingJustification`) are unaffected and will stay in the same state.
1787	fn restart(&mut self) {
1788		self.blocks.clear();
1789		if let Err(e) = self.reset_sync_start_point() {
1790			warn!(target: LOG_TARGET, "💔  Unable to restart sync: {e}");
1791		}
1792		self.allowed_requests.set_all();
1793		debug!(
1794			target: LOG_TARGET,
1795			"Restarted with {} ({})",
1796			self.best_queued_number,
1797			self.best_queued_hash,
1798		);
1799		let old_peers = std::mem::take(&mut self.peers);
1800
1801		old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1802			match peer_sync.state {
1803				PeerSyncState::Available => {
1804					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1805				},
1806				PeerSyncState::AncestorSearch { .. } |
1807				PeerSyncState::DownloadingNew(_) |
1808				PeerSyncState::DownloadingStale(_) |
1809				PeerSyncState::DownloadingGap(_) |
1810				PeerSyncState::DownloadingState => {
1811					// Cancel a request first, as `add_peer` may generate a new request.
1812					self.actions
1813						.push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
1814					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1815				},
1816				PeerSyncState::DownloadingJustification(_) => {
1817					// Peers that were downloading justifications
1818					// should be kept in that state.
1819					// We make sure our common number is at least something we have.
1820					trace!(
1821						target: LOG_TARGET,
1822						"Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1823						peer_id,
1824						peer_sync.common_number,
1825						self.best_queued_number,
1826					);
1827					peer_sync.common_number = self.best_queued_number;
1828					self.peers.insert(peer_id, peer_sync);
1829				},
1830			}
1831		});
1832	}
1833
1834	/// Find a block to start sync from. If we sync with state, that's the latest block we have
1835	/// state for.
1836	fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
1837		let info = self.client.info();
1838		debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");
1839
1840		if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
1841			warn!(
1842				target: LOG_TARGET,
1843				"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
1844			);
1845			self.mode = ChainSyncMode::Full;
1846		}
1847
1848		self.import_existing = false;
1849		self.best_queued_hash = info.best_hash;
1850		self.best_queued_number = info.best_number;
1851
1852		if self.mode == ChainSyncMode::Full &&
1853			self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1854		{
1855			self.import_existing = true;
1856			// Latest state is missing, start with the last finalized state or genesis instead.
1857			if let Some((hash, number)) = info.finalized_state {
1858				debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
1859				self.best_queued_hash = hash;
1860				self.best_queued_number = number;
1861			} else {
1862				debug!(target: LOG_TARGET, "Restarting from genesis");
1863				self.best_queued_hash = Default::default();
1864				self.best_queued_number = Zero::zero();
1865			}
1866		}
1867
1868		if let Some(BlockGap { start, end, .. }) = info.block_gap {
1869			let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
1870			debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
1871			self.gap_sync = Some(GapSync {
1872				best_queued_number: start - One::one(),
1873				target: end,
1874				blocks: BlockCollection::new(),
1875				stats: GapSyncStats::new(),
1876			});
1877		}
1878		trace!(
1879			target: LOG_TARGET,
1880			"Restarted sync at #{} ({:?})",
1881			self.best_queued_number,
1882			self.best_queued_hash,
1883		);
1884		Ok(())
1885	}
1886
1887	/// What is the status of the block corresponding to the given hash?
1888	fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1889		if self.queue_blocks.contains(hash) {
1890			return Ok(BlockStatus::Queued);
1891		}
1892		self.client.block_status(*hash)
1893	}
1894
1895	/// Is the block corresponding to the given hash known?
1896	fn is_known(&self, hash: &B::Hash) -> bool {
1897		self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1898	}
1899
1900	/// Is any peer downloading the given hash?
1901	fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1902		self.peers
1903			.iter()
1904			.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1905	}
1906
1907	/// Get the set of downloaded blocks that are ready to be queued for import.
1908	fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1909		self.blocks
1910			.ready_blocks(self.best_queued_number + One::one())
1911			.into_iter()
1912			.map(|block_data| {
1913				let justifications = block_data
1914					.block
1915					.justifications
1916					.or_else(|| legacy_justification_mapping(block_data.block.justification));
1917				IncomingBlock {
1918					hash: block_data.block.hash,
1919					header: block_data.block.header,
1920					body: block_data.block.body,
1921					indexed_body: block_data.block.indexed_body,
1922					justifications,
1923					origin: block_data.origin,
1924					allow_missing_state: true,
1925					import_existing: self.import_existing,
1926					skip_execution: self.skip_execution(),
1927					state: None,
1928				}
1929			})
1930			.collect()
1931	}
1932
1933	/// Get justification requests scheduled by sync to be sent out.
1934	fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1935		let peers = &mut self.peers;
1936		let mut matcher = self.extra_justifications.matcher();
1937		std::iter::from_fn(move || {
1938			if let Some((peer, request)) = matcher.next(peers) {
1939				peers
1940					.get_mut(&peer)
1941					.expect(
1942						"`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1943					)
1944					.state = PeerSyncState::DownloadingJustification(request.0);
1945				let req = BlockRequest::<B> {
1946					id: 0,
1947					fields: BlockAttributes::JUSTIFICATION,
1948					from: FromBlock::Hash(request.0),
1949					direction: Direction::Ascending,
1950					max: Some(1),
1951				};
1952				Some((peer, req))
1953			} else {
1954				None
1955			}
1956		})
1957		.collect()
1958	}
1959
1960	/// Get block requests scheduled by sync to be sent out.
1961	fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1962		if self.allowed_requests.is_empty() || self.state_sync.is_some() {
1963			return Vec::new();
1964		}
1965
1966		if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
1967			trace!(target: LOG_TARGET, "Too many blocks in the queue.");
1968			return Vec::new();
1969		}
1970		let is_major_syncing = self.status().state.is_major_syncing();
1971		let mode = self.mode;
1972		let is_archive = self.archive_blocks;
1973		let blocks = &mut self.blocks;
1974		let fork_targets = &mut self.fork_targets;
1975		let last_finalized =
1976			std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
1977		let best_queued = self.best_queued_number;
1978		let client = &self.client;
1979		let queue_blocks = &self.queue_blocks;
1980		let allowed_requests = self.allowed_requests.clone();
1981		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
1982		let max_blocks_per_request = self.max_blocks_per_request;
1983		let gap_sync = &mut self.gap_sync;
1984		let disconnected_peers = &mut self.disconnected_peers;
1985		let metrics = self.metrics.as_ref();
1986		let requests = self
1987			.peers
1988			.iter_mut()
1989			.filter_map(move |(&id, peer)| {
1990				if !peer.state.is_available() ||
1991					!allowed_requests.contains(&id) ||
1992					!disconnected_peers.is_peer_available(&id)
1993				{
1994					return None;
1995				}
1996
1997				// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from
1998				// the common number, the peer best number is higher than our best queued and the
1999				// common number is smaller than the last finalized block number, we should do an
2000				// ancestor search to find a better common block. If the queue is full we wait till
2001				// all blocks are imported though.
2002				if best_queued.saturating_sub(peer.common_number) >
2003					MAX_BLOCKS_TO_LOOK_BACKWARDS.into() &&
2004					best_queued < peer.best_number &&
2005					peer.common_number < last_finalized &&
2006					queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
2007				{
2008					trace!(
2009						target: LOG_TARGET,
2010						"Peer {:?} common block {} too far behind of our best {}. Starting ancestry search.",
2011						id,
2012						peer.common_number,
2013						best_queued,
2014					);
2015					let current = std::cmp::min(peer.best_number, best_queued);
2016					peer.state = PeerSyncState::AncestorSearch {
2017						current,
2018						start: best_queued,
2019						state: AncestorSearchState::ExponentialBackoff(One::one()),
2020					};
2021					Some((id, ancestry_request::<B>(current)))
2022				} else if let Some((range, req)) = peer_block_request(
2023					&id,
2024					peer,
2025					blocks,
2026					mode.required_block_attributes(false, is_archive),
2027					max_parallel,
2028					max_blocks_per_request,
2029					last_finalized,
2030					best_queued,
2031				) {
2032					peer.state = PeerSyncState::DownloadingNew(range.start);
2033					trace!(
2034						target: LOG_TARGET,
2035						"New block request for {}, (best:{}, common:{}) {:?}",
2036						id,
2037						peer.best_number,
2038						peer.common_number,
2039						req,
2040					);
2041					Some((id, req))
2042				} else if let Some((hash, req)) = fork_sync_request(
2043					&id,
2044					fork_targets,
2045					best_queued,
2046					last_finalized,
2047					mode.required_block_attributes(false, is_archive),
2048					|hash| {
2049						if queue_blocks.contains(hash) {
2050							BlockStatus::Queued
2051						} else {
2052							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
2053						}
2054					},
2055					max_blocks_per_request,
2056					metrics,
2057				) {
2058					trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
2059					peer.state = PeerSyncState::DownloadingStale(hash);
2060					Some((id, req))
2061				} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
2062					peer_gap_block_request(
2063						&id,
2064						peer,
2065						&mut sync.blocks,
2066						mode.required_block_attributes(true, is_archive),
2067						sync.target,
2068						sync.best_queued_number,
2069						max_blocks_per_request,
2070					)
2071				}) {
2072					peer.state = PeerSyncState::DownloadingGap(range.start);
2073					trace!(
2074						target: LOG_TARGET,
2075						"New gap block request for {}, (best:{}, common:{}) {:?}",
2076						id,
2077						peer.best_number,
2078						peer.common_number,
2079						req,
2080					);
2081					Some((id, req))
2082				} else {
2083					None
2084				}
2085			})
2086			.collect::<Vec<_>>();
2087
2088		// Clear the allowed_requests state when sending new block requests
2089		// to prevent multiple inflight block requests from being issued.
2090		if !requests.is_empty() {
2091			self.allowed_requests.take();
2092		}
2093
2094		requests
2095	}
2096
2097	/// Get a state request scheduled by sync to be sent out (if any).
2098	fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
2099		if self.allowed_requests.is_empty() {
2100			return None;
2101		}
2102		if self.state_sync.is_some() &&
2103			self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
2104		{
2105			// Only one pending state request is allowed.
2106			return None;
2107		}
2108		if let Some(sync) = &self.state_sync {
2109			if sync.is_complete() {
2110				return None;
2111			}
2112
2113			for (id, peer) in self.peers.iter_mut() {
2114				if peer.state.is_available() &&
2115					peer.common_number >= sync.target_number() &&
2116					self.disconnected_peers.is_peer_available(&id)
2117				{
2118					peer.state = PeerSyncState::DownloadingState;
2119					let request = sync.next_request();
2120					trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
2121					self.allowed_requests.clear();
2122					return Some((*id, request));
2123				}
2124			}
2125		}
2126		None
2127	}
2128
2129	#[must_use]
2130	fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
2131		let response = match StateResponse::decode(response) {
2132			Ok(response) => response,
2133			Err(error) => {
2134				debug!(
2135					target: LOG_TARGET,
2136					"Failed to decode state response from peer {peer_id:?}: {error:?}.",
2137				);
2138
2139				return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2140			},
2141		};
2142
2143		if let Some(peer) = self.peers.get_mut(peer_id) {
2144			if let PeerSyncState::DownloadingState = peer.state {
2145				peer.state = PeerSyncState::Available;
2146				self.allowed_requests.set_all();
2147			}
2148		}
2149		let import_result = if let Some(sync) = &mut self.state_sync {
2150			debug!(
2151				target: LOG_TARGET,
2152				"Importing state data from {} with {} keys, {} proof nodes.",
2153				peer_id,
2154				response.entries.len(),
2155				response.proof.len(),
2156			);
2157			sync.import(response)
2158		} else {
2159			debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
2160			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2161		};
2162
2163		match import_result {
2164			ImportResult::Import(hash, header, state, body, justifications) => {
2165				let origin = BlockOrigin::NetworkInitialSync;
2166				let block = IncomingBlock {
2167					hash,
2168					header: Some(header),
2169					body,
2170					indexed_body: None,
2171					justifications,
2172					origin: None,
2173					allow_missing_state: true,
2174					import_existing: true,
2175					skip_execution: self.skip_execution(),
2176					state: Some(state),
2177				};
2178				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
2179				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
2180				Ok(())
2181			},
2182			ImportResult::Continue => Ok(()),
2183			ImportResult::BadResponse => {
2184				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
2185				Err(BadPeer(*peer_id, rep::BAD_BLOCK))
2186			},
2187		}
2188	}
2189
2190	fn attempt_state_sync(
2191		&mut self,
2192		finalized_hash: B::Hash,
2193		finalized_number: NumberFor<B>,
2194		skip_proofs: bool,
2195	) {
2196		let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2197		heads.sort();
2198		let median = heads[heads.len() / 2];
2199		if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2200			if let Ok(Some(header)) = self.client.header(finalized_hash) {
2201				log::debug!(
2202					target: LOG_TARGET,
2203					"Starting state sync for #{finalized_number} ({finalized_hash})",
2204				);
2205				self.state_sync =
2206					Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2207				self.allowed_requests.set_all();
2208			} else {
2209				log::error!(
2210					target: LOG_TARGET,
2211					"Failed to start state sync: header for finalized block \
2212					  #{finalized_number} ({finalized_hash}) is not available",
2213				);
2214				debug_assert!(false);
2215			}
2216		}
2217	}
2218
2219	/// A version of `actions()` that doesn't schedule extra requests. For testing only.
2220	#[cfg(test)]
2221	#[must_use]
2222	fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2223		std::mem::take(&mut self.actions).into_iter()
2224	}
2225}
2226
2227// This is purely during a backwards compatible transitionary period and should be removed
2228// once we can assume all nodes can send and receive multiple Justifications
2229// The ID tag is hardcoded here to avoid depending on the GRANDPA crate.
2230// See: https://github.com/paritytech/substrate/issues/8172
2231fn legacy_justification_mapping(
2232	justification: Option<EncodedJustification>,
2233) -> Option<Justifications> {
2234	justification.map(|just| (*b"FRNK", just).into())
2235}
2236
2237/// Request the ancestry for a block. Sends a request for header and justification for the given
2238/// block number. Used during ancestry search.
2239fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2240	BlockRequest::<B> {
2241		id: 0,
2242		fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2243		from: FromBlock::Number(block),
2244		direction: Direction::Ascending,
2245		max: Some(1),
2246	}
2247}
2248
2249/// The ancestor search state expresses which algorithm, and its stateful parameters, we are using
2250/// to try to find an ancestor block
2251#[derive(Copy, Clone, Eq, PartialEq, Debug)]
2252pub(crate) enum AncestorSearchState<B: BlockT> {
2253	/// Use exponential backoff to find an ancestor, then switch to binary search.
2254	/// We keep track of the exponent.
2255	ExponentialBackoff(NumberFor<B>),
2256	/// Using binary search to find the best ancestor.
2257	/// We keep track of left and right bounds.
2258	BinarySearch(NumberFor<B>, NumberFor<B>),
2259}
2260
2261/// This function handles the ancestor search strategy used. The goal is to find a common point
2262/// that both our chains agree on that is as close to the tip as possible.
2263/// The way this works is we first have an exponential backoff strategy, where we try to step
2264/// forward until we find a block hash mismatch. The size of the step doubles each step we take.
2265///
2266/// When we've found a block hash mismatch we then fall back to a binary search between the two
2267/// last known points to find the common block closest to the tip.
2268fn handle_ancestor_search_state<B: BlockT>(
2269	state: &AncestorSearchState<B>,
2270	curr_block_num: NumberFor<B>,
2271	block_hash_match: bool,
2272) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
2273	let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
2274	match state {
2275		AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
2276			let next_distance_to_tip = *next_distance_to_tip;
2277			if block_hash_match && next_distance_to_tip == One::one() {
2278				// We found the ancestor in the first step so there is no need to execute binary
2279				// search.
2280				return None;
2281			}
2282			if block_hash_match {
2283				let left = curr_block_num;
2284				let right = left + next_distance_to_tip / two;
2285				let middle = left + (right - left) / two;
2286				Some((AncestorSearchState::BinarySearch(left, right), middle))
2287			} else {
2288				let next_block_num =
2289					curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
2290				let next_distance_to_tip = next_distance_to_tip * two;
2291				Some((
2292					AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
2293					next_block_num,
2294				))
2295			}
2296		},
2297		AncestorSearchState::BinarySearch(mut left, mut right) => {
2298			if left >= curr_block_num {
2299				return None;
2300			}
2301			if block_hash_match {
2302				left = curr_block_num;
2303			} else {
2304				right = curr_block_num;
2305			}
2306			assert!(right >= left);
2307			let middle = left + (right - left) / two;
2308			if middle == curr_block_num {
2309				None
2310			} else {
2311				Some((AncestorSearchState::BinarySearch(left, right), middle))
2312			}
2313		},
2314	}
2315}
2316
2317/// Get a new block request for the peer if any.
2318fn peer_block_request<B: BlockT>(
2319	id: &PeerId,
2320	peer: &PeerSync<B>,
2321	blocks: &mut BlockCollection<B>,
2322	attrs: BlockAttributes,
2323	max_parallel_downloads: u32,
2324	max_blocks_per_request: u32,
2325	finalized: NumberFor<B>,
2326	best_num: NumberFor<B>,
2327) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2328	if best_num >= peer.best_number {
2329		// Will be downloaded as alternative fork instead.
2330		return None;
2331	} else if peer.common_number < finalized {
2332		trace!(
2333			target: LOG_TARGET,
2334			"Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2335			id, peer.common_number, finalized, peer.best_number, best_num,
2336		);
2337	}
2338	let range = blocks.needed_blocks(
2339		*id,
2340		max_blocks_per_request,
2341		peer.best_number,
2342		peer.common_number,
2343		max_parallel_downloads,
2344		MAX_DOWNLOAD_AHEAD,
2345	)?;
2346
2347	// The end is not part of the range.
2348	let last = range.end.saturating_sub(One::one());
2349
2350	let from = if peer.best_number == last {
2351		FromBlock::Hash(peer.best_hash)
2352	} else {
2353		FromBlock::Number(last)
2354	};
2355
2356	let request = BlockRequest::<B> {
2357		id: 0,
2358		fields: attrs,
2359		from,
2360		direction: Direction::Descending,
2361		max: Some((range.end - range.start).saturated_into::<u32>()),
2362	};
2363
2364	Some((range, request))
2365}
2366
2367/// Get a new block request for the peer if any.
2368fn peer_gap_block_request<B: BlockT>(
2369	id: &PeerId,
2370	peer: &PeerSync<B>,
2371	blocks: &mut BlockCollection<B>,
2372	attrs: BlockAttributes,
2373	target: NumberFor<B>,
2374	common_number: NumberFor<B>,
2375	max_blocks_per_request: u32,
2376) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2377	let range = blocks.needed_blocks(
2378		*id,
2379		max_blocks_per_request,
2380		std::cmp::min(peer.best_number, target),
2381		common_number,
2382		1,
2383		MAX_DOWNLOAD_AHEAD,
2384	)?;
2385
2386	// The end is not part of the range.
2387	let last = range.end.saturating_sub(One::one());
2388	let from = FromBlock::Number(last);
2389
2390	let request = BlockRequest::<B> {
2391		id: 0,
2392		fields: attrs,
2393		from,
2394		direction: Direction::Descending,
2395		max: Some((range.end - range.start).saturated_into::<u32>()),
2396	};
2397	Some((range, request))
2398}
2399
2400/// Get pending fork sync targets for a peer.
2401fn fork_sync_request<B: BlockT>(
2402	id: &PeerId,
2403	fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2404	best_num: NumberFor<B>,
2405	finalized: NumberFor<B>,
2406	attributes: BlockAttributes,
2407	check_block: impl Fn(&B::Hash) -> BlockStatus,
2408	max_blocks_per_request: u32,
2409	metrics: Option<&Metrics>,
2410) -> Option<(B::Hash, BlockRequest<B>)> {
2411	fork_targets.retain(|hash, r| {
2412		if r.number <= finalized {
2413			trace!(
2414				target: LOG_TARGET,
2415				"Removed expired fork sync request {:?} (#{})",
2416				hash,
2417				r.number,
2418			);
2419			return false;
2420		}
2421		if check_block(hash) != BlockStatus::Unknown {
2422			trace!(
2423				target: LOG_TARGET,
2424				"Removed obsolete fork sync request {:?} (#{})",
2425				hash,
2426				r.number,
2427			);
2428			return false;
2429		}
2430		true
2431	});
2432	if let Some(metrics) = metrics {
2433		metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2434	}
2435	for (hash, r) in fork_targets {
2436		if !r.peers.contains(&id) {
2437			continue;
2438		}
2439		// Download the fork only if it is behind or not too far ahead our tip of the chain
2440		// Otherwise it should be downloaded in full sync mode.
2441		if r.number <= best_num ||
2442			(r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2443		{
2444			let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
2445			let count = if parent_status == BlockStatus::Unknown {
2446				(r.number - finalized).saturated_into::<u32>() // up to the last finalized block
2447			} else {
2448				// request only single block
2449				1
2450			};
2451			trace!(
2452				target: LOG_TARGET,
2453				"Downloading requested fork {hash:?} from {id}, {count} blocks",
2454			);
2455			return Some((
2456				*hash,
2457				BlockRequest::<B> {
2458					id: 0,
2459					fields: attributes,
2460					from: FromBlock::Hash(*hash),
2461					direction: Direction::Descending,
2462					max: Some(count),
2463				},
2464			));
2465		} else {
2466			trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2467		}
2468	}
2469	None
2470}
2471
2472/// Returns `true` if the given `block` is a descendent of `base`.
2473fn is_descendent_of<Block, T>(
2474	client: &T,
2475	base: &Block::Hash,
2476	block: &Block::Hash,
2477) -> sp_blockchain::Result<bool>
2478where
2479	Block: BlockT,
2480	T: HeaderMetadata<Block, Error = sp_blockchain::Error> + ?Sized,
2481{
2482	if base == block {
2483		return Ok(false);
2484	}
2485
2486	let ancestor = sp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2487
2488	Ok(ancestor.hash == *base)
2489}
2490
2491/// Validate that the given `blocks` are correct.
2492/// Returns the number of the first block in the sequence.
2493///
2494/// It is expected that `blocks` are in ascending order.
2495pub fn validate_blocks<Block: BlockT>(
2496	blocks: &Vec<BlockData<Block>>,
2497	peer_id: &PeerId,
2498	request: Option<BlockRequest<Block>>,
2499) -> Result<Option<NumberFor<Block>>, BadPeer> {
2500	if let Some(request) = request {
2501		if Some(blocks.len() as _) > request.max {
2502			debug!(
2503				target: LOG_TARGET,
2504				"Received more blocks than requested from {}. Expected in maximum {:?}, got {}.",
2505				peer_id,
2506				request.max,
2507				blocks.len(),
2508			);
2509
2510			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2511		}
2512
2513		let block_header =
2514			if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2515				.and_then(|b| b.header.as_ref());
2516
2517		let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2518			FromBlock::Hash(hash) => h.hash() == hash,
2519			FromBlock::Number(n) => h.number() == &n,
2520		});
2521
2522		if !expected_block {
2523			debug!(
2524				target: LOG_TARGET,
2525				"Received block that was not requested. Requested {:?}, got {:?}.",
2526				request.from,
2527				block_header,
2528			);
2529
2530			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2531		}
2532
2533		if request.fields.contains(BlockAttributes::HEADER) &&
2534			blocks.iter().any(|b| b.header.is_none())
2535		{
2536			trace!(
2537				target: LOG_TARGET,
2538				"Missing requested header for a block in response from {peer_id}.",
2539			);
2540
2541			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2542		}
2543
2544		if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2545		{
2546			trace!(
2547				target: LOG_TARGET,
2548				"Missing requested body for a block in response from {peer_id}.",
2549			);
2550
2551			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2552		}
2553	}
2554
2555	for b in blocks {
2556		if let Some(header) = &b.header {
2557			let hash = header.hash();
2558			if hash != b.hash {
2559				debug!(
2560					target: LOG_TARGET,
2561					"Bad header received from {}. Expected hash {:?}, got {:?}",
2562					peer_id,
2563					b.hash,
2564					hash,
2565				);
2566				return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2567			}
2568		}
2569	}
2570
2571	Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2572}