pezsc_network_sync/strategy/chain_sync.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Contains the state of the chain synchronization process
20//!
21//! At any given point in time, a running node tries as much as possible to be at the head of the
22//! chain. This module handles the logic of which blocks to request from remotes, and processing
23//! responses. It yields blocks to check and potentially move to the database.
24//!
25//! # Usage
26//!
27//! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on
28//! the network, or whenever a block has been successfully verified, call the appropriate method in
29//! order to update it.
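//!
//! A minimal, illustrative sketch only (it assumes a `client`, a `block_downloader`, a
//! `state_request_protocol_name` and a `network_service` handle are already available,
//! uses placeholder limits and elides error handling):
//!
//! ```ignore
//! let mut sync = ChainSync::new(
//! 	ChainSyncMode::Full,
//! 	client,
//! 	5,  // max_parallel_downloads (placeholder)
//! 	64, // max_blocks_per_request (placeholder)
//! 	state_request_protocol_name,
//! 	block_downloader,
//! 	None, // no Prometheus registry
//! 	std::iter::empty(),
//! )?;
//!
//! // Feed network events into the strategy as they happen ...
//! sync.add_peer(peer_id, best_hash, best_number);
//!
//! // ... and periodically drain the actions it wants executed
//! // (block/state requests, block imports, peer drops, ...).
//! for action in sync.actions(&network_service)? {
//! 	// dispatch `action`
//! }
//! ```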
30
31use crate::{
32	block_relay_protocol::{BlockDownloader, BlockResponseError},
33	blocks::BlockCollection,
34	justification_requests::ExtraRequests,
35	schema::v1::{StateRequest, StateResponse},
36	service::network::NetworkServiceHandle,
37	strategy::{
38		disconnected_peers::DisconnectedPeers,
39		state_sync::{ImportResult, StateSync, StateSyncProvider},
40		warp::{WarpSyncPhase, WarpSyncProgress},
41		StrategyKey, SyncingAction, SyncingStrategy,
42	},
43	types::{BadPeer, SyncState, SyncStatus},
44	LOG_TARGET,
45};
46
47use futures::{channel::oneshot, FutureExt};
48use log::{debug, error, info, trace, warn};
49use pezsc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider};
50use pezsc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
51use pezsc_network::{IfDisconnected, ProtocolName};
52use pezsc_network_common::sync::message::{
53	BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock,
54};
55use pezsc_network_types::PeerId;
56use pezsp_arithmetic::traits::Saturating;
57use pezsp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
58use pezsp_consensus::{BlockOrigin, BlockStatus};
59use pezsp_runtime::{
60	traits::{
61		Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, One, SaturatedConversion, Zero,
62	},
63	EncodedJustification, Justifications,
64};
65use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64};
66use prost::Message;
67
68use std::{
69	any::Any,
70	collections::{HashMap, HashSet},
71	ops::Range,
72	sync::Arc,
73};
74
75#[cfg(test)]
76mod test;
77
78/// Maximum blocks to store in the import queue.
79const MAX_IMPORTING_BLOCKS: usize = 2048;
80
81/// Maximum blocks to download ahead of any gap.
82const MAX_DOWNLOAD_AHEAD: u32 = 2048;
83
84/// Maximum blocks to look backwards. The gap is the difference between the highest block and the
85/// common block of a node.
86const MAX_BLOCKS_TO_LOOK_BACKWARDS: u32 = MAX_DOWNLOAD_AHEAD / 2;
87
88/// Pick the state to sync as the latest finalized number minus this.
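/// (E.g. with the latest finalized block at #1000 this picks block #992.)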
89const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8;
90
91/// We use a heuristic that with a high likelihood, by the time
92/// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same
93/// chain as (or at least closer to) the peer so we want to delay
94/// the ancestor search to not waste time doing that when we are
95/// so far behind.
96const MAJOR_SYNC_BLOCKS: u8 = 5;
97
98mod rep {
99	use pezsc_network::ReputationChange as Rep;
100	/// Reputation change when a peer sent us a message that led to a
101	/// database read error.
102	pub const BLOCKCHAIN_READ_ERROR: Rep = Rep::new(-(1 << 16), "DB Error");
103
104	/// Reputation change when a peer sent us a status message with a different
105	/// genesis than us.
106	pub const GENESIS_MISMATCH: Rep = Rep::new(i32::MIN, "Genesis mismatch");
107
108	/// Reputation change for peers which send us a block with an incomplete header.
109	pub const INCOMPLETE_HEADER: Rep = Rep::new(-(1 << 20), "Incomplete header");
110
111	/// Reputation change for peers which send us a block which we fail to verify.
112	pub const VERIFICATION_FAIL: Rep = Rep::new(-(1 << 29), "Block verification failed");
113
114	/// Reputation change for peers which send us a known bad block.
115	pub const BAD_BLOCK: Rep = Rep::new(-(1 << 29), "Bad block");
116
117	/// Peer did not provide us with advertised block data.
118	pub const NO_BLOCK: Rep = Rep::new(-(1 << 29), "No requested block data");
119
120	/// Reputation change for peers which send us non-requested block data.
121	pub const NOT_REQUESTED: Rep = Rep::new(-(1 << 29), "Not requested block data");
122
123	/// Reputation change for peers which send us a block with bad justifications.
124	pub const BAD_JUSTIFICATION: Rep = Rep::new(-(1 << 16), "Bad justification");
125
126	/// Reputation change when a peer sent us an invalid ancestry result.
127	pub const UNKNOWN_ANCESTOR: Rep = Rep::new(-(1 << 16), "DB Error");
128
129	/// Peer response data does not have requested bits.
130	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
131
132	/// We received a message that failed to decode.
133	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
134}
135
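/// Prometheus metrics reported by `ChainSync`: the number of blocks in the import
/// queue and the number of fork sync targets.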
136struct Metrics {
137	queued_blocks: Gauge<U64>,
138	fork_targets: Gauge<U64>,
139}
140
141impl Metrics {
142	fn register(r: &Registry) -> Result<Self, PrometheusError> {
143		Ok(Self {
144			queued_blocks: {
145				let g = Gauge::new(
146					"bizinikiwi_sync_queued_blocks",
147					"Number of blocks in import queue",
148				)?;
149				register(g, r)?
150			},
151			fork_targets: {
152				let g = Gauge::new("bizinikiwi_sync_fork_targets", "Number of fork sync targets")?;
153				register(g, r)?
154			},
155		})
156	}
157}
158
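/// Peers from which we may currently request blocks: either an explicit set of
/// peers, or all connected peers (the latter is set e.g. after blocks were queued
/// or imported, or after a sync restart).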
159#[derive(Debug, Clone)]
160enum AllowedRequests {
161	Some(HashSet<PeerId>),
162	All,
163}
164
165impl AllowedRequests {
166	fn add(&mut self, id: &PeerId) {
167		if let Self::Some(ref mut set) = self {
168			set.insert(*id);
169		}
170	}
171
172	fn take(&mut self) -> Self {
173		std::mem::take(self)
174	}
175
176	fn set_all(&mut self) {
177		*self = Self::All;
178	}
179
180	fn contains(&self, id: &PeerId) -> bool {
181		match self {
182			Self::Some(set) => set.contains(id),
183			Self::All => true,
184		}
185	}
186
187	fn is_empty(&self) -> bool {
188		match self {
189			Self::Some(set) => set.is_empty(),
190			Self::All => false,
191		}
192	}
193
194	fn clear(&mut self) {
195		std::mem::take(self);
196	}
197}
198
199impl Default for AllowedRequests {
200	fn default() -> Self {
201		Self::Some(HashSet::default())
202	}
203}
204
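/// State of the block history ("gap") download that runs after warp sync: the blocks
/// collected so far, the best block number queued from this range and the target
/// block number at which the gap is considered filled.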
205struct GapSync<B: BlockT> {
206	blocks: BlockCollection<B>,
207	best_queued_number: NumberFor<B>,
208	target: NumberFor<B>,
209}
210
211/// Sync operation mode.
212#[derive(Copy, Clone, Debug, Eq, PartialEq)]
213pub enum ChainSyncMode {
214	/// Full block download and verification.
215	Full,
216	/// Download blocks and the latest state.
217	LightState {
218		/// Skip state proof download and verification.
219		skip_proofs: bool,
220		/// Download indexed transactions for recent blocks.
221		storage_chain_mode: bool,
222	},
223}
224
225/// All the data we have about a Peer that we are trying to sync with
226#[derive(Debug, Clone)]
227pub(crate) struct PeerSync<B: BlockT> {
228	/// Peer id of this peer.
229	pub peer_id: PeerId,
230	/// The common number is the block number that is a common point of
231	/// ancestry for both our chains (as far as we know).
232	pub common_number: NumberFor<B>,
233	/// The hash of the best block that we've seen for this peer.
234	pub best_hash: B::Hash,
235	/// The number of the best block that we've seen for this peer.
236	pub best_number: NumberFor<B>,
237	/// The state of syncing this peer is in for us, generally categorized
238	/// into `Available` or "busy" with something as defined by `PeerSyncState`.
239	pub state: PeerSyncState<B>,
240}
241
242impl<B: BlockT> PeerSync<B> {
243	/// Update the `common_number` iff `new_common > common_number`.
244	fn update_common_number(&mut self, new_common: NumberFor<B>) {
245		if self.common_number < new_common {
246			trace!(
247				target: LOG_TARGET,
248				"Updating peer {} common number from={} => to={}.",
249				self.peer_id,
250				self.common_number,
251				new_common,
252			);
253			self.common_number = new_common;
254		}
255	}
256}
257
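/// A fork of the chain that we explicitly want to sync: the announced block number,
/// its parent hash (if known) and the peers known to be on that fork.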
258struct ForkTarget<B: BlockT> {
259	number: NumberFor<B>,
260	parent_hash: Option<B::Hash>,
261	peers: HashSet<PeerId>,
262}
263
264/// The state of syncing between a Peer and ourselves.
265///
266/// Generally two categories, "busy" or `Available`. If busy, the enum
267/// defines what we are busy with.
268#[derive(Copy, Clone, Eq, PartialEq, Debug)]
269pub(crate) enum PeerSyncState<B: BlockT> {
270	/// Available for sync requests.
271	Available,
272	/// Searching for ancestors the Peer has in common with us.
273	AncestorSearch { start: NumberFor<B>, current: NumberFor<B>, state: AncestorSearchState<B> },
274	/// Actively downloading new blocks, starting from the given Number.
275	DownloadingNew(NumberFor<B>),
276	/// Downloading a stale block with given Hash. Stale means that it is a
277	/// block with a number that is lower than our best number. It might be
278	/// from a fork and not necessarily already imported.
279	DownloadingStale(B::Hash),
280	/// Downloading justification for given block hash.
281	DownloadingJustification(B::Hash),
282	/// Downloading state.
283	DownloadingState,
284	/// Actively downloading block history after warp sync.
285	DownloadingGap(NumberFor<B>),
286}
287
288impl<B: BlockT> PeerSyncState<B> {
289	pub fn is_available(&self) -> bool {
290		matches!(self, Self::Available)
291	}
292}
293
294/// The main data structure which contains all the state for a chain's
295/// active syncing strategy.
296pub struct ChainSync<B: BlockT, Client> {
297	/// Chain client.
298	client: Arc<Client>,
299	/// The active peers that we are using to sync and their PeerSync status
300	peers: HashMap<PeerId, PeerSync<B>>,
301	disconnected_peers: DisconnectedPeers,
302	/// A `BlockCollection` of blocks that are being downloaded from peers
303	blocks: BlockCollection<B>,
304	/// The best block number in our queue of blocks to import
305	best_queued_number: NumberFor<B>,
306	/// The best block hash in our queue of blocks to import
307	best_queued_hash: B::Hash,
308	/// Current mode (full/light)
309	mode: ChainSyncMode,
310	/// Any extra justification requests.
311	extra_justifications: ExtraRequests<B>,
312	/// A set of hashes of blocks that are being downloaded or have been
313	/// downloaded and are queued for import.
314	queue_blocks: HashSet<B::Hash>,
315	/// A pending attempt to start the state sync.
316	///
317	/// The initiation of state sync may be deferred in cases where other conditions
318	/// are not yet met when the finalized block notification is received, such as
319	/// when `queue_blocks` is not empty or there are no peers. This field holds the
320	/// necessary information to attempt the state sync at a later point when
321	/// conditions are satisfied.
322	pending_state_sync_attempt: Option<(B::Hash, NumberFor<B>, bool)>,
323	/// Fork sync targets.
324	fork_targets: HashMap<B::Hash, ForkTarget<B>>,
325	/// A set of peers for which there might be potential block requests
326	allowed_requests: AllowedRequests,
327	/// Maximum number of peers to ask the same blocks in parallel.
328	max_parallel_downloads: u32,
329	/// Maximum blocks per request.
330	max_blocks_per_request: u32,
331	/// Protocol name used to send out state requests
332	state_request_protocol_name: ProtocolName,
333	/// Total number of downloaded blocks.
334	downloaded_blocks: usize,
335	/// State sync in progress, if any.
336	state_sync: Option<StateSync<B, Client>>,
337	/// Enable importing existing blocks. This is used after the state download to
338	/// catch up to the latest state while re-importing blocks.
339	import_existing: bool,
340	/// Block downloader
341	block_downloader: Arc<dyn BlockDownloader<B>>,
342	/// Gap download process.
343	gap_sync: Option<GapSync<B>>,
344	/// Pending actions.
345	actions: Vec<SyncingAction<B>>,
346	/// Prometheus metrics.
347	metrics: Option<Metrics>,
348}
349
350impl<B, Client> SyncingStrategy<B> for ChainSync<B, Client>
351where
352	B: BlockT,
353	Client: HeaderBackend<B>
354		+ BlockBackend<B>
355		+ HeaderMetadata<B, Error = pezsp_blockchain::Error>
356		+ ProofProvider<B>
357		+ Send
358		+ Sync
359		+ 'static,
360{
361	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
362		match self.add_peer_inner(peer_id, best_hash, best_number) {
363			Ok(Some(request)) => {
364				let action = self.create_block_request_action(peer_id, request);
365				self.actions.push(action);
366			},
367			Ok(None) => {},
368			Err(bad_peer) => self.actions.push(SyncingAction::DropPeer(bad_peer)),
369		}
370	}
371
372	fn remove_peer(&mut self, peer_id: &PeerId) {
373		self.blocks.clear_peer_download(peer_id);
374		if let Some(gap_sync) = &mut self.gap_sync {
375			gap_sync.blocks.clear_peer_download(peer_id)
376		}
377
378		if let Some(state) = self.peers.remove(peer_id) {
379			if !state.state.is_available() {
380				if let Some(bad_peer) =
381					self.disconnected_peers.on_disconnect_during_request(*peer_id)
382				{
383					self.actions.push(SyncingAction::DropPeer(bad_peer));
384				}
385			}
386		}
387
388		self.extra_justifications.peer_disconnected(peer_id);
389		self.allowed_requests.set_all();
390		self.fork_targets.retain(|_, target| {
391			target.peers.remove(peer_id);
392			!target.peers.is_empty()
393		});
394		if let Some(metrics) = &self.metrics {
395			metrics.fork_targets.set(self.fork_targets.len().try_into().unwrap_or(u64::MAX));
396		}
397
398		let blocks = self.ready_blocks();
399
400		if !blocks.is_empty() {
401			self.validate_and_queue_blocks(blocks, false);
402		}
403	}
404
405	fn on_validated_block_announce(
406		&mut self,
407		is_best: bool,
408		peer_id: PeerId,
409		announce: &BlockAnnounce<B::Header>,
410	) -> Option<(B::Hash, NumberFor<B>)> {
411		let number = *announce.header.number();
412		let hash = announce.header.hash();
413		let parent_status =
414			self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
415		let known_parent = parent_status != BlockStatus::Unknown;
416		let ancient_parent = parent_status == BlockStatus::InChainPruned;
417
418		let known = self.is_known(&hash);
419		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
420			peer
421		} else {
422			error!(target: LOG_TARGET, "💔 Called `on_validated_block_announce` with a bad peer ID {peer_id}");
423			return Some((hash, number));
424		};
425
426		if let PeerSyncState::AncestorSearch { .. } = peer.state {
427			trace!(target: LOG_TARGET, "Peer {} is in the ancestor search state.", peer_id);
428			return None;
429		}
430
431		let peer_info = is_best.then(|| {
432			// update their best block
433			peer.best_number = number;
434			peer.best_hash = hash;
435
436			(hash, number)
437		});
438
439		// If the announced block is the best they have and is not ahead of us, our common number
440		// is either one further ahead or it's the one they just announced, if we know about it.
441		if is_best {
442			if known && self.best_queued_number >= number {
443				self.update_peer_common_number(&peer_id, number);
444			} else if announce.header.parent_hash() == &self.best_queued_hash
445				|| known_parent && self.best_queued_number >= number
446			{
447				self.update_peer_common_number(&peer_id, number.saturating_sub(One::one()));
448			}
449		}
450		self.allowed_requests.add(&peer_id);
451
452		// known block case
453		if known || self.is_already_downloading(&hash) {
454			trace!(target: LOG_TARGET, "Known block announce from {}: {}", peer_id, hash);
455			if let Some(target) = self.fork_targets.get_mut(&hash) {
456				target.peers.insert(peer_id);
457			}
458			return peer_info;
459		}
460
461		if ancient_parent {
462			trace!(
463				target: LOG_TARGET,
464				"Ignored ancient block announced from {}: {} {:?}",
465				peer_id,
466				hash,
467				announce.header,
468			);
469			return peer_info;
470		}
471
472		if self.status().state == SyncState::Idle {
473			trace!(
474				target: LOG_TARGET,
475				"Added sync target for block announced from {}: {} {:?}",
476				peer_id,
477				hash,
478				announce.summary(),
479			);
480			self.fork_targets
481				.entry(hash)
482				.or_insert_with(|| {
483					if let Some(metrics) = &self.metrics {
484						metrics.fork_targets.inc();
485					}
486
487					ForkTarget {
488						number,
489						parent_hash: Some(*announce.header.parent_hash()),
490						peers: Default::default(),
491					}
492				})
493				.peers
494				.insert(peer_id);
495		}
496
497		peer_info
498	}
499
500	// The implementation is similar to `on_validated_block_announce` with unknown parent hash.
501	fn set_sync_fork_request(
502		&mut self,
503		mut peers: Vec<PeerId>,
504		hash: &B::Hash,
505		number: NumberFor<B>,
506	) {
507		if peers.is_empty() {
508			peers = self
509				.peers
510				.iter()
511				// Only request blocks from peers who are ahead or on a par.
512				.filter(|(_, peer)| peer.best_number >= number)
513				.map(|(id, _)| *id)
514				.collect();
515
516			debug!(
517				target: LOG_TARGET,
518				"Explicit sync request for block {hash:?} with no peers specified. \
519				Syncing from these peers {peers:?} instead.",
520			);
521		} else {
522			debug!(
523				target: LOG_TARGET,
524				"Explicit sync request for block {hash:?} with {peers:?}",
525			);
526		}
527
528		if self.is_known(hash) {
529			debug!(target: LOG_TARGET, "Refusing to sync known hash {hash:?}");
530			return;
531		}
532
533		trace!(target: LOG_TARGET, "Downloading requested old fork {hash:?}");
534		for peer_id in &peers {
535			if let Some(peer) = self.peers.get_mut(peer_id) {
536				if let PeerSyncState::AncestorSearch { .. } = peer.state {
537					continue;
538				}
539
540				if number > peer.best_number {
541					peer.best_number = number;
542					peer.best_hash = *hash;
543				}
544				self.allowed_requests.add(peer_id);
545			}
546		}
547
548		self.fork_targets
549			.entry(*hash)
550			.or_insert_with(|| {
551				if let Some(metrics) = &self.metrics {
552					metrics.fork_targets.inc();
553				}
554
555				ForkTarget { number, peers: Default::default(), parent_hash: None }
556			})
557			.peers
558			.extend(peers);
559	}
560
561	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
562		let client = &self.client;
563		self.extra_justifications
564			.schedule((*hash, number), |base, block| is_descendent_of(&**client, base, block))
565	}
566
567	fn clear_justification_requests(&mut self) {
568		self.extra_justifications.reset();
569	}
570
571	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
572		let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
573		self.extra_justifications
574			.try_finalize_root((hash, number), finalization_result, true);
575		self.allowed_requests.set_all();
576	}
577
578	fn on_generic_response(
579		&mut self,
580		peer_id: &PeerId,
581		key: StrategyKey,
582		protocol_name: ProtocolName,
583		response: Box<dyn Any + Send>,
584	) {
585		if Self::STRATEGY_KEY != key {
586			warn!(
587				target: LOG_TARGET,
588				"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
589			);
590			debug_assert!(false);
591			return;
592		}
593
594		if protocol_name == self.state_request_protocol_name {
595			let Ok(response) = response.downcast::<Vec<u8>>() else {
596				warn!(target: LOG_TARGET, "Failed to downcast state response");
597				debug_assert!(false);
598				return;
599			};
600
601			if let Err(bad_peer) = self.on_state_data(&peer_id, &response) {
602				self.actions.push(SyncingAction::DropPeer(bad_peer));
603			}
604		} else if &protocol_name == self.block_downloader.protocol_name() {
605			let Ok(response) = response
606				.downcast::<(BlockRequest<B>, Result<Vec<BlockData<B>>, BlockResponseError>)>()
607			else {
608				warn!(target: LOG_TARGET, "Failed to downcast block response");
609				debug_assert!(false);
610				return;
611			};
612
613			let (request, response) = *response;
614			let blocks = match response {
615				Ok(blocks) => blocks,
616				Err(BlockResponseError::DecodeFailed(e)) => {
617					debug!(
618						target: LOG_TARGET,
619						"Failed to decode block response from peer {:?}: {:?}.",
620						peer_id,
621						e
622					);
623					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
624					return;
625				},
626				Err(BlockResponseError::ExtractionFailed(e)) => {
627					debug!(
628						target: LOG_TARGET,
629						"Failed to extract blocks from peer response {:?}: {:?}.",
630						peer_id,
631						e
632					);
633					self.actions.push(SyncingAction::DropPeer(BadPeer(*peer_id, rep::BAD_MESSAGE)));
634					return;
635				},
636			};
637
638			if let Err(bad_peer) = self.on_block_response(peer_id, key, request, blocks) {
639				self.actions.push(SyncingAction::DropPeer(bad_peer));
640			}
641		} else {
642			warn!(
643				target: LOG_TARGET,
644				"Unexpected generic response protocol {protocol_name}, strategy key \
645				{key:?}",
646			);
647			debug_assert!(false);
648		}
649	}
650
651	fn on_blocks_processed(
652		&mut self,
653		imported: usize,
654		count: usize,
655		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
656	) {
657		trace!(target: LOG_TARGET, "Imported {imported} of {count}");
658
659		let mut has_error = false;
660		for (_, hash) in &results {
661			if self.queue_blocks.remove(hash) {
662				if let Some(metrics) = &self.metrics {
663					metrics.queued_blocks.dec();
664				}
665			}
666			self.blocks.clear_queued(hash);
667			if let Some(gap_sync) = &mut self.gap_sync {
668				gap_sync.blocks.clear_queued(hash);
669			}
670		}
671		for (result, hash) in results {
672			if has_error {
673				break;
674			}
675
676			has_error |= result.is_err();
677
678			match result {
679				Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => {
680					if let Some(peer) = peer_id {
681						self.update_peer_common_number(&peer, number);
682					}
683					self.complete_gap_if_target(number);
684				},
685				Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => {
686					if aux.clear_justification_requests {
687						trace!(
688							target: LOG_TARGET,
689							"Block imported clears all pending justification requests {number}: {hash:?}",
690						);
691						self.clear_justification_requests();
692					}
693
694					if aux.needs_justification {
695						trace!(
696							target: LOG_TARGET,
697							"Block imported but requires justification {number}: {hash:?}",
698						);
699						self.request_justification(&hash, number);
700					}
701
702					if aux.bad_justification {
703						if let Some(ref peer) = peer_id {
704							warn!("💔 Sent block with bad justification to import");
705							self.actions.push(SyncingAction::DropPeer(BadPeer(
706								*peer,
707								rep::BAD_JUSTIFICATION,
708							)));
709						}
710					}
711
712					if let Some(peer) = peer_id {
713						self.update_peer_common_number(&peer, number);
714					}
715					let state_sync_complete =
716						self.state_sync.as_ref().map_or(false, |s| s.target_hash() == hash);
717					if state_sync_complete {
718						info!(
719							target: LOG_TARGET,
720							"State sync is complete ({} MiB), restarting block sync.",
721							self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
722						);
723						self.state_sync = None;
724						self.mode = ChainSyncMode::Full;
725						self.restart();
726					}
727
728					self.complete_gap_if_target(number);
729				},
730				Err(BlockImportError::IncompleteHeader(peer_id)) => {
731					if let Some(peer) = peer_id {
732						warn!(
733							target: LOG_TARGET,
734							"💔 Peer sent block with incomplete header to import",
735						);
736						self.actions
737							.push(SyncingAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER)));
738						self.restart();
739					}
740				},
741				Err(BlockImportError::VerificationFailed(peer_id, e)) => {
742					let extra_message = peer_id
743						.map_or_else(|| "".into(), |peer| format!(" received from ({peer})"));
744
745					warn!(
746						target: LOG_TARGET,
747						"💔 Verification failed for block {hash:?}{extra_message}: {e:?}",
748					);
749
750					if let Some(peer) = peer_id {
751						self.actions
752							.push(SyncingAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL)));
753					}
754
755					self.restart();
756				},
757				Err(BlockImportError::BadBlock(peer_id)) => {
758					if let Some(peer) = peer_id {
759						warn!(
760							target: LOG_TARGET,
761							"💔 Block {hash:?} received from peer {peer} has been blacklisted",
762						);
763						self.actions.push(SyncingAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK)));
764					}
765				},
766				Err(BlockImportError::MissingState) => {
767					// This may happen if the chain we were requesting upon has been discarded
768					// in the meantime because another chain has been finalized.
769					// Don't mark it as bad as it still may be synced if explicitly requested.
770					trace!(target: LOG_TARGET, "Obsolete block {hash:?}");
771				},
772				e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
773					warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err());
774					self.state_sync = None;
775					self.restart();
776				},
777				Err(BlockImportError::Cancelled) => {},
778			};
779		}
780
781		self.allowed_requests.set_all();
782	}
783
784	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
785		let client = &self.client;
786		let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
787			is_descendent_of(&**client, base, block)
788		});
789
790		if let ChainSyncMode::LightState { skip_proofs, .. } = &self.mode {
791			if self.state_sync.is_none() {
792				if !self.peers.is_empty() && self.queue_blocks.is_empty() {
793					self.attempt_state_sync(*hash, number, *skip_proofs);
794				} else {
795					self.pending_state_sync_attempt.replace((*hash, number, *skip_proofs));
796				}
797			}
798		}
799
800		if let Err(err) = r {
801			warn!(
802				target: LOG_TARGET,
803				"💔 Error cleaning up pending extra justification data requests: {err}",
804			);
805		}
806	}
807
808	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
809		self.on_block_queued(best_hash, best_number);
810	}
811
812	fn is_major_syncing(&self) -> bool {
813		self.status().state.is_major_syncing()
814	}
815
816	fn num_peers(&self) -> usize {
817		self.peers.len()
818	}
819
820	fn status(&self) -> SyncStatus<B> {
821		let median_seen = self.median_seen();
822		let best_seen_block =
823			median_seen.and_then(|median| (median > self.best_queued_number).then_some(median));
824		let sync_state = if let Some(target) = median_seen {
825			// A chain is classified as downloading if the provided best block is
826			// more than `MAJOR_SYNC_BLOCKS` behind the best block or as importing
827			// if the same can be said about queued blocks.
828			let best_block = self.client.info().best_number;
829			if target > best_block && target - best_block > MAJOR_SYNC_BLOCKS.into() {
830				// If target is not queued, we're downloading, otherwise importing.
831				if target > self.best_queued_number {
832					SyncState::Downloading { target }
833				} else {
834					SyncState::Importing { target }
835				}
836			} else {
837				SyncState::Idle
838			}
839		} else {
840			SyncState::Idle
841		};
842
843		let warp_sync_progress = self.gap_sync.as_ref().map(|gap_sync| WarpSyncProgress {
844			phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
845			total_bytes: 0,
846		});
847
848		SyncStatus {
849			state: sync_state,
850			best_seen_block,
851			num_peers: self.peers.len() as u32,
852			queued_blocks: self.queue_blocks.len() as u32,
853			state_sync: self.state_sync.as_ref().map(|s| s.progress()),
854			warp_sync: warp_sync_progress,
855		}
856	}
857
858	fn num_downloaded_blocks(&self) -> usize {
859		self.downloaded_blocks
860	}
861
862	fn num_sync_requests(&self) -> usize {
863		self.fork_targets
864			.values()
865			.filter(|f| f.number <= self.best_queued_number)
866			.count()
867	}
868
869	fn actions(
870		&mut self,
871		network_service: &NetworkServiceHandle,
872	) -> Result<Vec<SyncingAction<B>>, ClientError> {
873		if !self.peers.is_empty() && self.queue_blocks.is_empty() {
874			if let Some((hash, number, skip_proofs)) = self.pending_state_sync_attempt.take() {
875				self.attempt_state_sync(hash, number, skip_proofs);
876			}
877		}
878
879		let block_requests = self
880			.block_requests()
881			.into_iter()
882			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
883			.collect::<Vec<_>>();
884		self.actions.extend(block_requests);
885
886		let justification_requests = self
887			.justification_requests()
888			.into_iter()
889			.map(|(peer_id, request)| self.create_block_request_action(peer_id, request))
890			.collect::<Vec<_>>();
891		self.actions.extend(justification_requests);
892
893		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
894			trace!(
895				target: LOG_TARGET,
896				"Created `StrategyRequest` to {peer_id}.",
897			);
898
899			let (tx, rx) = oneshot::channel();
900
901			network_service.start_request(
902				peer_id,
903				self.state_request_protocol_name.clone(),
904				request.encode_to_vec(),
905				tx,
906				IfDisconnected::ImmediateError,
907			);
908
909			SyncingAction::StartRequest {
910				peer_id,
911				key: Self::STRATEGY_KEY,
912				request: async move {
913					Ok(rx.await?.and_then(|(response, protocol_name)| {
914						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
915					}))
916				}
917				.boxed(),
918				remove_obsolete: false,
919			}
920		});
921		self.actions.extend(state_request);
922
923		Ok(std::mem::take(&mut self.actions))
924	}
925}
926
927impl<B, Client> ChainSync<B, Client>
928where
929	B: BlockT,
930	Client: HeaderBackend<B>
931		+ BlockBackend<B>
932		+ HeaderMetadata<B, Error = pezsp_blockchain::Error>
933		+ ProofProvider<B>
934		+ Send
935		+ Sync
936		+ 'static,
937{
938	/// Strategy key used by chain sync.
939	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("ChainSync");
940
941	/// Create a new instance.
942	pub fn new(
943		mode: ChainSyncMode,
944		client: Arc<Client>,
945		max_parallel_downloads: u32,
946		max_blocks_per_request: u32,
947		state_request_protocol_name: ProtocolName,
948		block_downloader: Arc<dyn BlockDownloader<B>>,
949		metrics_registry: Option<&Registry>,
950		initial_peers: impl Iterator<Item = (PeerId, B::Hash, NumberFor<B>)>,
951	) -> Result<Self, ClientError> {
952		let mut sync = Self {
953			client,
954			peers: HashMap::new(),
955			disconnected_peers: DisconnectedPeers::new(),
956			blocks: BlockCollection::new(),
957			best_queued_hash: Default::default(),
958			best_queued_number: Zero::zero(),
959			extra_justifications: ExtraRequests::new("justification", metrics_registry),
960			mode,
961			queue_blocks: Default::default(),
962			pending_state_sync_attempt: None,
963			fork_targets: Default::default(),
964			allowed_requests: Default::default(),
965			max_parallel_downloads,
966			max_blocks_per_request,
967			state_request_protocol_name,
968			downloaded_blocks: 0,
969			state_sync: None,
970			import_existing: false,
971			block_downloader,
972			gap_sync: None,
973			actions: Vec::new(),
974			metrics: metrics_registry.and_then(|r| match Metrics::register(r) {
975				Ok(metrics) => Some(metrics),
976				Err(err) => {
977					log::error!(
978						target: LOG_TARGET,
979						"Failed to register `ChainSync` metrics {err:?}",
980					);
981					None
982				},
983			}),
984		};
985
986		sync.reset_sync_start_point()?;
987		initial_peers.for_each(|(peer_id, best_hash, best_number)| {
988			sync.add_peer(peer_id, best_hash, best_number);
989		});
990
991		Ok(sync)
992	}
993
994	/// Complete the gap sync if the target number is reached and there is a gap.
995	fn complete_gap_if_target(&mut self, number: NumberFor<B>) {
996		let gap_sync_complete = self.gap_sync.as_ref().map_or(false, |s| s.target == number);
997		if gap_sync_complete {
998			info!(
999				target: LOG_TARGET,
1000				"Block history download is complete."
1001			);
1002			self.gap_sync = None;
1003		}
1004	}
1005
1006	#[must_use]
1007	fn add_peer_inner(
1008		&mut self,
1009		peer_id: PeerId,
1010		best_hash: B::Hash,
1011		best_number: NumberFor<B>,
1012	) -> Result<Option<BlockRequest<B>>, BadPeer> {
1013		// There is nothing sync can get from the node that has no blockchain data.
1014		match self.block_status(&best_hash) {
1015			Err(e) => {
1016				debug!(target: LOG_TARGET, "Error reading blockchain: {e}");
1017				Err(BadPeer(peer_id, rep::BLOCKCHAIN_READ_ERROR))
1018			},
1019			Ok(BlockStatus::KnownBad) => {
1020				info!(
1021					"💔 New peer {peer_id} with known bad best block {best_hash} ({best_number})."
1022				);
1023				Err(BadPeer(peer_id, rep::BAD_BLOCK))
1024			},
1025			Ok(BlockStatus::Unknown) => {
1026				if best_number.is_zero() {
1027					info!(
1028						"💔 New peer {} with unknown genesis hash {} ({}).",
1029						peer_id, best_hash, best_number,
1030					);
1031					return Err(BadPeer(peer_id, rep::GENESIS_MISMATCH));
1032				}
1033
1034				// If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have
1035				// enough to do in the import queue that it's not worth kicking off
1036				// an ancestor search, which is what we do in the next match case below.
1037				if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS as usize {
1038					debug!(
1039						target: LOG_TARGET,
1040						"New peer {} with unknown best hash {} ({}), assuming common block.",
1041						peer_id,
1042						self.best_queued_hash,
1043						self.best_queued_number
1044					);
1045					self.peers.insert(
1046						peer_id,
1047						PeerSync {
1048							peer_id,
1049							common_number: self.best_queued_number,
1050							best_hash,
1051							best_number,
1052							state: PeerSyncState::Available,
1053						},
1054					);
1055					return Ok(None);
1056				}
1057
1058				// If we are at genesis, just start downloading.
1059				let (state, req) = if self.best_queued_number.is_zero() {
1060					debug!(
1061						target: LOG_TARGET,
1062						"New peer {peer_id} with best hash {best_hash} ({best_number}).",
1063					);
1064
1065					(PeerSyncState::Available, None)
1066				} else {
1067					let common_best = std::cmp::min(self.best_queued_number, best_number);
1068
1069					debug!(
1070						target: LOG_TARGET,
1071						"New peer {} with unknown best hash {} ({}), searching for common ancestor.",
1072						peer_id,
1073						best_hash,
1074						best_number
1075					);
1076
1077					(
1078						PeerSyncState::AncestorSearch {
1079							current: common_best,
1080							start: self.best_queued_number,
1081							state: AncestorSearchState::ExponentialBackoff(One::one()),
1082						},
1083						Some(ancestry_request::<B>(common_best)),
1084					)
1085				};
1086
1087				self.allowed_requests.add(&peer_id);
1088				self.peers.insert(
1089					peer_id,
1090					PeerSync {
1091						peer_id,
1092						common_number: Zero::zero(),
1093						best_hash,
1094						best_number,
1095						state,
1096					},
1097				);
1098
1099				Ok(req)
1100			},
1101			Ok(BlockStatus::Queued)
1102			| Ok(BlockStatus::InChainWithState)
1103			| Ok(BlockStatus::InChainPruned) => {
1104				debug!(
1105					target: LOG_TARGET,
1106					"New peer {peer_id} with known best hash {best_hash} ({best_number}).",
1107				);
1108				self.peers.insert(
1109					peer_id,
1110					PeerSync {
1111						peer_id,
1112						common_number: std::cmp::min(self.best_queued_number, best_number),
1113						best_hash,
1114						best_number,
1115						state: PeerSyncState::Available,
1116					},
1117				);
1118				self.allowed_requests.add(&peer_id);
1119				Ok(None)
1120			},
1121		}
1122	}
1123
1124	fn create_block_request_action(
1125		&mut self,
1126		peer_id: PeerId,
1127		request: BlockRequest<B>,
1128	) -> SyncingAction<B> {
1129		let downloader = self.block_downloader.clone();
1130
1131		SyncingAction::StartRequest {
1132			peer_id,
1133			key: Self::STRATEGY_KEY,
1134			request: async move {
1135				Ok(downloader.download_blocks(peer_id, request.clone()).await?.and_then(
1136					|(response, protocol_name)| {
1137						let decoded_response =
1138							downloader.block_response_into_blocks(&request, response);
1139						let result = Box::new((request, decoded_response)) as Box<dyn Any + Send>;
1140						Ok((result, protocol_name))
1141					},
1142				))
1143			}
1144			.boxed(),
1145			// Sending block request implies dropping obsolete pending response as we are not
1146			// interested in it anymore.
1147			remove_obsolete: true,
1148		}
1149	}
1150
1151	/// Submit a block response for processing.
1152	#[must_use]
1153	fn on_block_data(
1154		&mut self,
1155		peer_id: &PeerId,
1156		request: Option<BlockRequest<B>>,
1157		response: BlockResponse<B>,
1158	) -> Result<(), BadPeer> {
1159		self.downloaded_blocks += response.blocks.len();
1160		let mut gap = false;
1161		let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(peer_id) {
1162			let mut blocks = response.blocks;
1163			if request.as_ref().map_or(false, |r| r.direction == Direction::Descending) {
1164				trace!(target: LOG_TARGET, "Reversing incoming block list");
1165				blocks.reverse()
1166			}
1167			self.allowed_requests.add(peer_id);
1168			if let Some(request) = request {
1169				match &mut peer.state {
1170					PeerSyncState::DownloadingNew(_) => {
1171						self.blocks.clear_peer_download(peer_id);
1172						peer.state = PeerSyncState::Available;
1173						if let Some(start_block) =
1174							validate_blocks::<B>(&blocks, peer_id, Some(request))?
1175						{
1176							self.blocks.insert(start_block, blocks, *peer_id);
1177						}
1178						self.ready_blocks()
1179					},
1180					PeerSyncState::DownloadingGap(_) => {
1181						peer.state = PeerSyncState::Available;
1182						if let Some(gap_sync) = &mut self.gap_sync {
1183							gap_sync.blocks.clear_peer_download(peer_id);
1184							if let Some(start_block) =
1185								validate_blocks::<B>(&blocks, peer_id, Some(request))?
1186							{
1187								gap_sync.blocks.insert(start_block, blocks, *peer_id);
1188							}
1189							gap = true;
1190							let blocks: Vec<_> = gap_sync
1191								.blocks
1192								.ready_blocks(gap_sync.best_queued_number + One::one())
1193								.into_iter()
1194								.map(|block_data| {
1195									let justifications =
1196										block_data.block.justifications.or_else(|| {
1197											legacy_justification_mapping(
1198												block_data.block.justification,
1199											)
1200										});
1201									IncomingBlock {
1202										hash: block_data.block.hash,
1203										header: block_data.block.header,
1204										body: block_data.block.body,
1205										indexed_body: block_data.block.indexed_body,
1206										justifications,
1207										origin: block_data.origin,
1208										allow_missing_state: true,
1209										import_existing: self.import_existing,
1210										skip_execution: true,
1211										state: None,
1212									}
1213								})
1214								.collect();
1215							debug!(
1216								target: LOG_TARGET,
1217								"Drained {} gap blocks from {}",
1218								blocks.len(),
1219								gap_sync.best_queued_number,
1220							);
1221							blocks
1222						} else {
1223							debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}");
1224							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1225						}
1226					},
1227					PeerSyncState::DownloadingStale(_) => {
1228						peer.state = PeerSyncState::Available;
1229						if blocks.is_empty() {
1230							debug!(target: LOG_TARGET, "Empty block response from {peer_id}");
1231							return Err(BadPeer(*peer_id, rep::NO_BLOCK));
1232						}
1233						validate_blocks::<B>(&blocks, peer_id, Some(request))?;
1234						blocks
1235							.into_iter()
1236							.map(|b| {
1237								let justifications = b
1238									.justifications
1239									.or_else(|| legacy_justification_mapping(b.justification));
1240								IncomingBlock {
1241									hash: b.hash,
1242									header: b.header,
1243									body: b.body,
1244									indexed_body: None,
1245									justifications,
1246									origin: Some(*peer_id),
1247									allow_missing_state: true,
1248									import_existing: self.import_existing,
1249									skip_execution: self.skip_execution(),
1250									state: None,
1251								}
1252							})
1253							.collect()
1254					},
1255					PeerSyncState::AncestorSearch { current, start, state } => {
1256						let matching_hash = match (blocks.get(0), self.client.hash(*current)) {
1257							(Some(block), Ok(maybe_our_block_hash)) => {
1258								trace!(
1259									target: LOG_TARGET,
1260									"Got ancestry block #{} ({}) from peer {}",
1261									current,
1262									block.hash,
1263									peer_id,
1264								);
1265								maybe_our_block_hash.filter(|x| x == &block.hash)
1266							},
1267							(None, _) => {
1268								debug!(
1269									target: LOG_TARGET,
1270									"Invalid response when searching for ancestor from {peer_id}",
1271								);
1272								return Err(BadPeer(*peer_id, rep::UNKNOWN_ANCESTOR));
1273							},
1274							(_, Err(e)) => {
1275								info!(
1276									target: LOG_TARGET,
1277									"❌ Error answering legitimate blockchain query: {e}",
1278								);
1279								return Err(BadPeer(*peer_id, rep::BLOCKCHAIN_READ_ERROR));
1280							},
1281						};
1282						if matching_hash.is_some() {
1283							if *start < self.best_queued_number
1284								&& self.best_queued_number <= peer.best_number
1285							{
1286								// We've made progress on this chain since the search was started.
1287								// Opportunistically set common number to updated number
1288								// instead of the one that started the search.
1289								trace!(
1290									target: LOG_TARGET,
1291									"Ancestry search: opportunistically updating peer {} common number from={} => to={}.",
1292									*peer_id,
1293									peer.common_number,
1294									self.best_queued_number,
1295								);
1296								peer.common_number = self.best_queued_number;
1297							} else if peer.common_number < *current {
1298								trace!(
1299									target: LOG_TARGET,
1300									"Ancestry search: updating peer {} common number from={} => to={}.",
1301									*peer_id,
1302									peer.common_number,
1303									*current,
1304								);
1305								peer.common_number = *current;
1306							}
1307						}
1308						if matching_hash.is_none() && current.is_zero() {
1309							trace!(
1310								target: LOG_TARGET,
1311								"Ancestry search: genesis mismatch for peer {peer_id}",
1312							);
1313							return Err(BadPeer(*peer_id, rep::GENESIS_MISMATCH));
1314						}
1315						if let Some((next_state, next_num)) =
1316							handle_ancestor_search_state(state, *current, matching_hash.is_some())
1317						{
1318							peer.state = PeerSyncState::AncestorSearch {
1319								current: next_num,
1320								start: *start,
1321								state: next_state,
1322							};
1323							let request = ancestry_request::<B>(next_num);
1324							let action = self.create_block_request_action(*peer_id, request);
1325							self.actions.push(action);
1326							return Ok(());
1327						} else {
1328							// Ancestry search is complete. Check if peer is on a stale fork unknown
1329							// to us and add it to sync targets if necessary.
1330							trace!(
1331								target: LOG_TARGET,
1332								"Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
1333								self.best_queued_hash,
1334								self.best_queued_number,
1335								peer.best_hash,
1336								peer.best_number,
1337								matching_hash,
1338								peer.common_number,
1339							);
1340							if peer.common_number < peer.best_number
1341								&& peer.best_number < self.best_queued_number
1342							{
1343								trace!(
1344									target: LOG_TARGET,
1345									"Added fork target {} for {}",
1346									peer.best_hash,
1347									peer_id,
1348								);
1349								self.fork_targets
1350									.entry(peer.best_hash)
1351									.or_insert_with(|| {
1352										if let Some(metrics) = &self.metrics {
1353											metrics.fork_targets.inc();
1354										}
1355
1356										ForkTarget {
1357											number: peer.best_number,
1358											parent_hash: None,
1359											peers: Default::default(),
1360										}
1361									})
1362									.peers
1363									.insert(*peer_id);
1364							}
1365							peer.state = PeerSyncState::Available;
1366							return Ok(());
1367						}
1368					},
1369					PeerSyncState::Available
1370					| PeerSyncState::DownloadingJustification(..)
1371					| PeerSyncState::DownloadingState => Vec::new(),
1372				}
1373			} else {
1374				// When request.is_none() this is a block announcement. Just accept blocks.
1375				validate_blocks::<B>(&blocks, peer_id, None)?;
1376				blocks
1377					.into_iter()
1378					.map(|b| {
1379						let justifications = b
1380							.justifications
1381							.or_else(|| legacy_justification_mapping(b.justification));
1382						IncomingBlock {
1383							hash: b.hash,
1384							header: b.header,
1385							body: b.body,
1386							indexed_body: None,
1387							justifications,
1388							origin: Some(*peer_id),
1389							allow_missing_state: true,
1390							import_existing: false,
1391							skip_execution: true,
1392							state: None,
1393						}
1394					})
1395					.collect()
1396			}
1397		} else {
1398			// We don't know of this peer, so we also did not request anything from it.
1399			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
1400		};
1401
1402		self.validate_and_queue_blocks(new_blocks, gap);
1403
1404		Ok(())
1405	}
1406
1407	fn on_block_response(
1408		&mut self,
1409		peer_id: &PeerId,
1410		key: StrategyKey,
1411		request: BlockRequest<B>,
1412		blocks: Vec<BlockData<B>>,
1413	) -> Result<(), BadPeer> {
1414		if key != Self::STRATEGY_KEY {
1415			error!(
1416				target: LOG_TARGET,
1417				"`on_block_response()` called with unexpected key {key:?} for chain sync",
1418			);
1419			debug_assert!(false);
1420		}
1421		let block_response = BlockResponse::<B> { id: request.id, blocks };
1422
1423		let blocks_range = || match (
1424			block_response
1425				.blocks
1426				.first()
1427				.and_then(|b| b.header.as_ref().map(|h| h.number())),
1428			block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
1429		) {
1430			(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
1431			(Some(first), Some(_)) => format!(" ({})", first),
1432			_ => Default::default(),
1433		};
1434		trace!(
1435			target: LOG_TARGET,
1436			"BlockResponse {} from {} with {} blocks {}",
1437			block_response.id,
1438			peer_id,
1439			block_response.blocks.len(),
1440			blocks_range(),
1441		);
1442
1443		if request.fields == BlockAttributes::JUSTIFICATION {
1444			self.on_block_justification(*peer_id, block_response)
1445		} else {
1446			self.on_block_data(peer_id, Some(request), block_response)
1447		}
1448	}
1449
1450	/// Submit a justification response for processing.
1451	#[must_use]
1452	fn on_block_justification(
1453		&mut self,
1454		peer_id: PeerId,
1455		response: BlockResponse<B>,
1456	) -> Result<(), BadPeer> {
1457		let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
1458			peer
1459		} else {
1460			error!(
1461				target: LOG_TARGET,
1462				"💔 Called on_block_justification with a peer ID of an unknown peer",
1463			);
1464			return Ok(());
1465		};
1466
1467		self.allowed_requests.add(&peer_id);
1468		if let PeerSyncState::DownloadingJustification(hash) = peer.state {
1469			peer.state = PeerSyncState::Available;
1470
1471			// We only request one justification at a time
1472			let justification = if let Some(block) = response.blocks.into_iter().next() {
1473				if hash != block.hash {
1474					warn!(
1475						target: LOG_TARGET,
1476						"💔 Invalid block justification provided by {}: requested: {:?} got: {:?}",
1477						peer_id,
1478						hash,
1479						block.hash,
1480					);
1481					return Err(BadPeer(peer_id, rep::BAD_JUSTIFICATION));
1482				}
1483
1484				block
1485					.justifications
1486					.or_else(|| legacy_justification_mapping(block.justification))
1487			} else {
1488				// we might have asked the peer for a justification on a block that we assumed it
1489				// had but didn't (regardless of whether it had a justification for it or not).
1490				trace!(
1491					target: LOG_TARGET,
1492					"Peer {peer_id:?} provided empty response for justification request {hash:?}",
1493				);
1494
1495				None
1496			};
1497
1498			if let Some((peer_id, hash, number, justifications)) =
1499				self.extra_justifications.on_response(peer_id, justification)
1500			{
1501				self.actions.push(SyncingAction::ImportJustifications {
1502					peer_id,
1503					hash,
1504					number,
1505					justifications,
1506				});
1507				return Ok(());
1508			}
1509		}
1510
1511		Ok(())
1512	}
1513
1514	/// Returns the median seen block number.
1515	fn median_seen(&self) -> Option<NumberFor<B>> {
1516		let mut best_seens = self.peers.values().map(|p| p.best_number).collect::<Vec<_>>();
1517
1518		if best_seens.is_empty() {
1519			None
1520		} else {
1521			let middle = best_seens.len() / 2;
1522
1523			// Not the "perfect median" when we have an even number of peers.
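			// E.g. with best numbers `[1, 2, 3, 4]`, `middle == 2` and the value reported is `3`.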
1524			Some(*best_seens.select_nth_unstable(middle).1)
1525		}
1526	}
1527
1528	fn required_block_attributes(&self) -> BlockAttributes {
1529		match self.mode {
1530			ChainSyncMode::Full => {
1531				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
1532			},
1533			ChainSyncMode::LightState { storage_chain_mode: false, .. } => {
1534				BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY
1535			},
1536			ChainSyncMode::LightState { storage_chain_mode: true, .. } => {
1537				BlockAttributes::HEADER
1538					| BlockAttributes::JUSTIFICATION
1539					| BlockAttributes::INDEXED_BODY
1540			},
1541		}
1542	}
1543
1544	fn skip_execution(&self) -> bool {
1545		match self.mode {
1546			ChainSyncMode::Full => false,
1547			ChainSyncMode::LightState { .. } => true,
1548		}
1549	}
1550
1551	fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec<IncomingBlock<B>>, gap: bool) {
1552		let orig_len = new_blocks.len();
1553		new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
1554		if new_blocks.len() != orig_len {
1555			debug!(
1556				target: LOG_TARGET,
1557				"Ignoring {} blocks that are already queued",
1558				orig_len - new_blocks.len(),
1559			);
1560		}
1561
1562		let origin = if !gap && !self.status().state.is_major_syncing() {
1563			BlockOrigin::NetworkBroadcast
1564		} else {
1565			BlockOrigin::NetworkInitialSync
1566		};
1567
1568		if let Some((h, n)) = new_blocks
1569			.last()
1570			.and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number())))
1571		{
1572			trace!(
1573				target: LOG_TARGET,
1574				"Accepted {} blocks ({:?}) with origin {:?}",
1575				new_blocks.len(),
1576				h,
1577				origin,
1578			);
1579			self.on_block_queued(h, n)
1580		}
1581		self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash));
1582		if let Some(metrics) = &self.metrics {
1583			metrics
1584				.queued_blocks
1585				.set(self.queue_blocks.len().try_into().unwrap_or(u64::MAX));
1586		}
1587
1588		self.actions.push(SyncingAction::ImportBlocks { origin, blocks: new_blocks })
1589	}
1590
1591	fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor<B>) {
1592		if let Some(peer) = self.peers.get_mut(peer_id) {
1593			peer.update_common_number(new_common);
1594		}
1595	}
1596
1597	/// Called when a block has been queued for import.
1598	///
1599	/// Updates our internal state for best queued block and then goes
1600	/// through all peers to update our view of their state as well.
1601	fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
1602		if self.fork_targets.remove(hash).is_some() {
1603			if let Some(metrics) = &self.metrics {
1604				metrics.fork_targets.dec();
1605			}
1606			trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
1607		}
1608		if let Some(gap_sync) = &mut self.gap_sync {
1609			if number > gap_sync.best_queued_number && number <= gap_sync.target {
1610				gap_sync.best_queued_number = number;
1611			}
1612		}
1613		if number > self.best_queued_number {
1614			self.best_queued_number = number;
1615			self.best_queued_hash = *hash;
1616			// Update common blocks
1617			for (n, peer) in self.peers.iter_mut() {
1618				if let PeerSyncState::AncestorSearch { .. } = peer.state {
1619					// Wait for ancestry search to complete first.
1620					continue;
1621				}
1622				let new_common_number =
1623					if peer.best_number >= number { number } else { peer.best_number };
1624				trace!(
1625					target: LOG_TARGET,
1626					"Updating peer {} info, ours={}, common={}->{}, their best={}",
1627					n,
1628					number,
1629					peer.common_number,
1630					new_common_number,
1631					peer.best_number,
1632				);
1633				peer.common_number = new_common_number;
1634			}
1635		}
1636		self.allowed_requests.set_all();
1637	}
1638
1639	/// Restart the sync process. This clears all pending block requests and re-adds the peers,
1640	/// which may queue new block requests as pending actions. Peers that were downloading finality
1641	/// data (i.e. their state was `DownloadingJustification`) are unaffected and stay in that state.
1642	fn restart(&mut self) {
1643		self.blocks.clear();
1644		if let Err(e) = self.reset_sync_start_point() {
1645			warn!(target: LOG_TARGET, "💔  Unable to restart sync: {e}");
1646		}
1647		self.allowed_requests.set_all();
1648		debug!(
1649			target: LOG_TARGET,
1650			"Restarted with {} ({})",
1651			self.best_queued_number,
1652			self.best_queued_hash,
1653		);
1654		let old_peers = std::mem::take(&mut self.peers);
1655
1656		old_peers.into_iter().for_each(|(peer_id, mut peer_sync)| {
1657			match peer_sync.state {
1658				PeerSyncState::Available => {
1659					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1660				},
1661				PeerSyncState::AncestorSearch { .. }
1662				| PeerSyncState::DownloadingNew(_)
1663				| PeerSyncState::DownloadingStale(_)
1664				| PeerSyncState::DownloadingGap(_)
1665				| PeerSyncState::DownloadingState => {
1666					// Cancel a request first, as `add_peer` may generate a new request.
1667					self.actions
1668						.push(SyncingAction::CancelRequest { peer_id, key: Self::STRATEGY_KEY });
1669					self.add_peer(peer_id, peer_sync.best_hash, peer_sync.best_number);
1670				},
1671				PeerSyncState::DownloadingJustification(_) => {
1672					// Peers that were downloading justifications
1673					// should be kept in that state.
1674					// We make sure our common number is at least something we have.
1675					trace!(
1676						target: LOG_TARGET,
1677						"Keeping peer {} after restart, updating common number from={} => to={} (our best).",
1678						peer_id,
1679						peer_sync.common_number,
1680						self.best_queued_number,
1681					);
1682					peer_sync.common_number = self.best_queued_number;
1683					self.peers.insert(peer_id, peer_sync);
1684				},
1685			}
1686		});
1687	}
1688
1689	/// Find a block to start sync from. If we sync with state, that's the latest block we have
1690	/// state for.
1691	fn reset_sync_start_point(&mut self) -> Result<(), ClientError> {
1692		let info = self.client.info();
1693		debug!(target: LOG_TARGET, "Restarting sync with client info {info:?}");
1694
1695		if matches!(self.mode, ChainSyncMode::LightState { .. }) && info.finalized_state.is_some() {
1696			warn!(
1697				target: LOG_TARGET,
1698				"Can't use fast sync mode with a partially synced database. Reverting to full sync mode."
1699			);
1700			self.mode = ChainSyncMode::Full;
1701		}
1702
1703		self.import_existing = false;
1704		self.best_queued_hash = info.best_hash;
1705		self.best_queued_number = info.best_number;
1706
1707		if self.mode == ChainSyncMode::Full
1708			&& self.client.block_status(info.best_hash)? != BlockStatus::InChainWithState
1709		{
1710			self.import_existing = true;
1711			// Latest state is missing, start with the last finalized state or genesis instead.
1712			if let Some((hash, number)) = info.finalized_state {
1713				debug!(target: LOG_TARGET, "Starting from finalized state #{number}");
1714				self.best_queued_hash = hash;
1715				self.best_queued_number = number;
1716			} else {
1717				debug!(target: LOG_TARGET, "Restarting from genesis");
1718				self.best_queued_hash = Default::default();
1719				self.best_queued_number = Zero::zero();
1720			}
1721		}
1722
1723		if let Some(BlockGap { start, end, .. }) = info.block_gap {
1724			let old_gap = self.gap_sync.take().map(|g| (g.best_queued_number, g.target));
1725			debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end} (old gap best and target: {old_gap:?})");
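			// `start` is the first block of the gap, so the last block we already have below it is
			// `start - 1`; gap sync then advances `best_queued_number` towards `target` (`end`).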
1726			self.gap_sync = Some(GapSync {
1727				best_queued_number: start - One::one(),
1728				target: end,
1729				blocks: BlockCollection::new(),
1730			});
1731		}
1732		trace!(
1733			target: LOG_TARGET,
1734			"Restarted sync at #{} ({:?})",
1735			self.best_queued_number,
1736			self.best_queued_hash,
1737		);
1738		Ok(())
1739	}
1740
1741	/// What is the status of the block corresponding to the given hash?
1742	fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
1743		if self.queue_blocks.contains(hash) {
1744			return Ok(BlockStatus::Queued);
1745		}
1746		self.client.block_status(*hash)
1747	}
1748
1749	/// Is the block corresponding to the given hash known?
1750	fn is_known(&self, hash: &B::Hash) -> bool {
1751		self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
1752	}
1753
1754	/// Is any peer downloading the given hash?
1755	fn is_already_downloading(&self, hash: &B::Hash) -> bool {
1756		self.peers
1757			.iter()
1758			.any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
1759	}
1760
1761	/// Get the set of downloaded blocks that are ready to be queued for import.
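	///
	/// Ready blocks are taken starting at `best_queued_number + 1`, i.e. the first block that
	/// directly extends the chain we have queued so far.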
1762	fn ready_blocks(&mut self) -> Vec<IncomingBlock<B>> {
1763		self.blocks
1764			.ready_blocks(self.best_queued_number + One::one())
1765			.into_iter()
1766			.map(|block_data| {
1767				let justifications = block_data
1768					.block
1769					.justifications
1770					.or_else(|| legacy_justification_mapping(block_data.block.justification));
1771				IncomingBlock {
1772					hash: block_data.block.hash,
1773					header: block_data.block.header,
1774					body: block_data.block.body,
1775					indexed_body: block_data.block.indexed_body,
1776					justifications,
1777					origin: block_data.origin,
1778					allow_missing_state: true,
1779					import_existing: self.import_existing,
1780					skip_execution: self.skip_execution(),
1781					state: None,
1782				}
1783			})
1784			.collect()
1785	}
1786
1787	/// Get justification requests scheduled by sync to be sent out.
1788	fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1789		let peers = &mut self.peers;
1790		let mut matcher = self.extra_justifications.matcher();
1791		std::iter::from_fn(move || {
1792			if let Some((peer, request)) = matcher.next(peers) {
1793				peers
1794					.get_mut(&peer)
1795					.expect(
1796						"`Matcher::next` guarantees the `PeerId` comes from the given peers; qed",
1797					)
1798					.state = PeerSyncState::DownloadingJustification(request.0);
1799				let req = BlockRequest::<B> {
1800					id: 0,
1801					fields: BlockAttributes::JUSTIFICATION,
1802					from: FromBlock::Hash(request.0),
1803					direction: Direction::Ascending,
1804					max: Some(1),
1805				};
1806				Some((peer, req))
1807			} else {
1808				None
1809			}
1810		})
1811		.collect()
1812	}
1813
1814	/// Get block requests scheduled by sync to be sent out.
1815	fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest<B>)> {
1816		if self.allowed_requests.is_empty() || self.state_sync.is_some() {
1817			return Vec::new();
1818		}
1819
1820		if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
1821			trace!(target: LOG_TARGET, "Too many blocks in the queue.");
1822			return Vec::new();
1823		}
1824		let is_major_syncing = self.status().state.is_major_syncing();
1825		let attrs = self.required_block_attributes();
1826		let blocks = &mut self.blocks;
1827		let fork_targets = &mut self.fork_targets;
1828		let last_finalized =
1829			std::cmp::min(self.best_queued_number, self.client.info().finalized_number);
1830		let best_queued = self.best_queued_number;
1831		let client = &self.client;
1832		let queue_blocks = &self.queue_blocks;
1833		let allowed_requests = self.allowed_requests.clone();
1834		let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads };
1835		let max_blocks_per_request = self.max_blocks_per_request;
1836		let gap_sync = &mut self.gap_sync;
1837		let disconnected_peers = &mut self.disconnected_peers;
1838		let metrics = self.metrics.as_ref();
1839		let requests = self
1840			.peers
1841			.iter_mut()
1842			.filter_map(move |(&id, peer)| {
1843				if !peer.state.is_available()
1844					|| !allowed_requests.contains(&id)
1845					|| !disconnected_peers.is_peer_available(&id)
1846				{
1847					return None;
1848				}
1849
1850				// If our best queued is more than `MAX_BLOCKS_TO_LOOK_BACKWARDS` blocks away from
1851				// the common number, the peer's best number is higher than our best queued, and the
1852				// common number is smaller than the last finalized block number, we do an ancestor
1853				// search to find a better common block. If too many blocks are already queued for
1854				// import, we wait until they have been imported before starting the search.
1855				if best_queued.saturating_sub(peer.common_number)
1856					> MAX_BLOCKS_TO_LOOK_BACKWARDS.into()
1857					&& best_queued < peer.best_number
1858					&& peer.common_number < last_finalized
1859					&& queue_blocks.len() <= MAJOR_SYNC_BLOCKS as usize
1860				{
1861					trace!(
1862						target: LOG_TARGET,
1863						"Peer {:?} common block {} too far behind our best {}. Starting ancestry search.",
1864						id,
1865						peer.common_number,
1866						best_queued,
1867					);
1868					let current = std::cmp::min(peer.best_number, best_queued);
1869					peer.state = PeerSyncState::AncestorSearch {
1870						current,
1871						start: best_queued,
1872						state: AncestorSearchState::ExponentialBackoff(One::one()),
1873					};
1874					Some((id, ancestry_request::<B>(current)))
1875				} else if let Some((range, req)) = peer_block_request(
1876					&id,
1877					peer,
1878					blocks,
1879					attrs,
1880					max_parallel,
1881					max_blocks_per_request,
1882					last_finalized,
1883					best_queued,
1884				) {
1885					peer.state = PeerSyncState::DownloadingNew(range.start);
1886					trace!(
1887						target: LOG_TARGET,
1888						"New block request for {}, (best:{}, common:{}) {:?}",
1889						id,
1890						peer.best_number,
1891						peer.common_number,
1892						req,
1893					);
1894					Some((id, req))
1895				} else if let Some((hash, req)) = fork_sync_request(
1896					&id,
1897					fork_targets,
1898					best_queued,
1899					last_finalized,
1900					attrs,
1901					|hash| {
1902						if queue_blocks.contains(hash) {
1903							BlockStatus::Queued
1904						} else {
1905							client.block_status(*hash).unwrap_or(BlockStatus::Unknown)
1906						}
1907					},
1908					max_blocks_per_request,
1909					metrics,
1910				) {
1911					trace!(target: LOG_TARGET, "Downloading fork {hash:?} from {id}");
1912					peer.state = PeerSyncState::DownloadingStale(hash);
1913					Some((id, req))
1914				} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
1915					peer_gap_block_request(
1916						&id,
1917						peer,
1918						&mut sync.blocks,
1919						attrs,
1920						sync.target,
1921						sync.best_queued_number,
1922						max_blocks_per_request,
1923					)
1924				}) {
1925					peer.state = PeerSyncState::DownloadingGap(range.start);
1926					trace!(
1927						target: LOG_TARGET,
1928						"New gap block request for {}, (best:{}, common:{}) {:?}",
1929						id,
1930						peer.best_number,
1931						peer.common_number,
1932						req,
1933					);
1934					Some((id, req))
1935				} else {
1936					None
1937				}
1938			})
1939			.collect::<Vec<_>>();
1940
1941		// Clear the allowed_requests state when sending new block requests
1942		// to prevent multiple inflight block requests from being issued.
1943		if !requests.is_empty() {
1944			self.allowed_requests.take();
1945		}
1946
1947		requests
1948	}
1949
1950	/// Get a state request scheduled by sync to be sent out (if any).
1951	fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
1952		if self.allowed_requests.is_empty() {
1953			return None;
1954		}
1955		if self.state_sync.is_some()
1956			&& self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState)
1957		{
1958			// Only one pending state request is allowed.
1959			return None;
1960		}
1961		if let Some(sync) = &self.state_sync {
1962			if sync.is_complete() {
1963				return None;
1964			}
1965
1966			for (id, peer) in self.peers.iter_mut() {
1967				if peer.state.is_available()
1968					&& peer.common_number >= sync.target_number()
1969					&& self.disconnected_peers.is_peer_available(&id)
1970				{
1971					peer.state = PeerSyncState::DownloadingState;
1972					let request = sync.next_request();
1973					trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request);
1974					self.allowed_requests.clear();
1975					return Some((*id, request));
1976				}
1977			}
1978		}
1979		None
1980	}
1981
1982	#[must_use]
1983	fn on_state_data(&mut self, peer_id: &PeerId, response: &[u8]) -> Result<(), BadPeer> {
1984		let response = match StateResponse::decode(response) {
1985			Ok(response) => response,
1986			Err(error) => {
1987				debug!(
1988					target: LOG_TARGET,
1989					"Failed to decode state response from peer {peer_id:?}: {error:?}.",
1990				);
1991
1992				return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
1993			},
1994		};
1995
1996		if let Some(peer) = self.peers.get_mut(peer_id) {
1997			if let PeerSyncState::DownloadingState = peer.state {
1998				peer.state = PeerSyncState::Available;
1999				self.allowed_requests.set_all();
2000			}
2001		}
2002		let import_result = if let Some(sync) = &mut self.state_sync {
2003			debug!(
2004				target: LOG_TARGET,
2005				"Importing state data from {} with {} keys, {} proof nodes.",
2006				peer_id,
2007				response.entries.len(),
2008				response.proof.len(),
2009			);
2010			sync.import(response)
2011		} else {
2012			debug!(target: LOG_TARGET, "Ignored obsolete state response from {peer_id}");
2013			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2014		};
2015
2016		match import_result {
2017			ImportResult::Import(hash, header, state, body, justifications) => {
2018				let origin = BlockOrigin::NetworkInitialSync;
2019				let block = IncomingBlock {
2020					hash,
2021					header: Some(header),
2022					body,
2023					indexed_body: None,
2024					justifications,
2025					origin: None,
2026					allow_missing_state: true,
2027					import_existing: true,
2028					skip_execution: self.skip_execution(),
2029					state: Some(state),
2030				};
2031				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
2032				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
2033				Ok(())
2034			},
2035			ImportResult::Continue => Ok(()),
2036			ImportResult::BadResponse => {
2037				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
2038				Err(BadPeer(*peer_id, rep::BAD_BLOCK))
2039			},
2040		}
2041	}
2042
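	/// Decide whether to start state sync: we only do so once `finalized_number` is within
	/// `STATE_SYNC_FINALITY_THRESHOLD` blocks of the median best block reported by our connected
	/// peers (e.g. with peer bests of 100, 110 and 120 the median is 110). The median computation
	/// assumes at least one connected peer.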
2043	fn attempt_state_sync(
2044		&mut self,
2045		finalized_hash: B::Hash,
2046		finalized_number: NumberFor<B>,
2047		skip_proofs: bool,
2048	) {
2049		let mut heads: Vec<_> = self.peers.values().map(|peer| peer.best_number).collect();
2050		heads.sort();
2051		let median = heads[heads.len() / 2];
2052		if finalized_number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median {
2053			if let Ok(Some(header)) = self.client.header(finalized_hash) {
2054				log::debug!(
2055					target: LOG_TARGET,
2056					"Starting state sync for #{finalized_number} ({finalized_hash})",
2057				);
2058				self.state_sync =
2059					Some(StateSync::new(self.client.clone(), header, None, None, skip_proofs));
2060				self.allowed_requests.set_all();
2061			} else {
2062				log::error!(
2063					target: LOG_TARGET,
2064					"Failed to start state sync: header for finalized block \
2065					  #{finalized_number} ({finalized_hash}) is not available",
2066				);
2067				debug_assert!(false);
2068			}
2069		}
2070	}
2071
2072	/// A version of `actions()` that doesn't schedule extra requests. For testing only.
2073	#[cfg(test)]
2074	#[must_use]
2075	fn take_actions(&mut self) -> impl Iterator<Item = SyncingAction<B>> {
2076		std::mem::take(&mut self.actions).into_iter()
2077	}
2078}
2079
2080// This exists purely for a backwards-compatible transition period and should be removed once
2081// we can assume all nodes can send and receive multiple justifications.
2082// The ID tag is hardcoded here to avoid depending on the GRANDPA crate.
2083// See: https://github.com/pezkuwichain/pezkuwi-sdk/issues/32
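// For example, a legacy response carrying a bare encoded justification `j` is wrapped into a
// `Justifications` set with the single entry `(*b"FRNK", j)`, i.e. tagged with the GRANDPA
// consensus engine ID, so the rest of the import pipeline only has to handle the new format.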
2084fn legacy_justification_mapping(
2085	justification: Option<EncodedJustification>,
2086) -> Option<Justifications> {
2087	justification.map(|just| (*b"FRNK", just).into())
2088}
2089
2090/// Request the ancestry for a block. Sends a request for header and justification for the given
2091/// block number. Used during ancestry search.
2092fn ancestry_request<B: BlockT>(block: NumberFor<B>) -> BlockRequest<B> {
2093	BlockRequest::<B> {
2094		id: 0,
2095		fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION,
2096		from: FromBlock::Number(block),
2097		direction: Direction::Ascending,
2098		max: Some(1),
2099	}
2100}
2101
2102/// The ancestor search state expresses which algorithm, and its stateful parameters, we are using
2103/// to try to find an ancestor block.
2104#[derive(Copy, Clone, Eq, PartialEq, Debug)]
2105pub(crate) enum AncestorSearchState<B: BlockT> {
2106	/// Use exponential backoff to find an ancestor, then switch to binary search.
2107	/// We keep track of the current step size, i.e. how far back the next probe will jump.
2108	ExponentialBackoff(NumberFor<B>),
2109	/// Using binary search to find the best ancestor.
2110	/// We keep track of left and right bounds.
2111	BinarySearch(NumberFor<B>, NumberFor<B>),
2112}
2113
2114/// This function handles the ancestor search strategy used. The goal is to find a common point
2115/// that both our chains agree on that is as close to the tip as possible.
2116/// The way this works is we first use an exponential backoff strategy: starting near the tip, we
2117/// step backwards, doubling the step size after every hash mismatch, until we find a block whose
2118/// hash both chains agree on.
2119/// Once a matching block is found, we fall back to a binary search between the last matching and
2120/// mismatching probes to find the common block closest to the tip.
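///
/// As an illustrative (hypothetical) walk-through: with our best block at #100 and the real common
/// ancestor at #94, the exponential phase probes #100, #99, #97 and #93, doubling the step after
/// each mismatch. #93 matches, so the state switches to `BinarySearch(93, 97)` and probes #95
/// (mismatch) and then #94 (match), at which point this function returns `None` and #94 is left as
/// the best known common block.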
2121fn handle_ancestor_search_state<B: BlockT>(
2122	state: &AncestorSearchState<B>,
2123	curr_block_num: NumberFor<B>,
2124	block_hash_match: bool,
2125) -> Option<(AncestorSearchState<B>, NumberFor<B>)> {
2126	let two = <NumberFor<B>>::one() + <NumberFor<B>>::one();
2127	match state {
2128		AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => {
2129			let next_distance_to_tip = *next_distance_to_tip;
2130			if block_hash_match && next_distance_to_tip == One::one() {
2131				// We found the ancestor in the first step so there is no need to execute binary
2132				// search.
2133				return None;
2134			}
2135			if block_hash_match {
2136				let left = curr_block_num;
2137				let right = left + next_distance_to_tip / two;
2138				let middle = left + (right - left) / two;
2139				Some((AncestorSearchState::BinarySearch(left, right), middle))
2140			} else {
2141				let next_block_num =
2142					curr_block_num.checked_sub(&next_distance_to_tip).unwrap_or_else(Zero::zero);
2143				let next_distance_to_tip = next_distance_to_tip * two;
2144				Some((
2145					AncestorSearchState::ExponentialBackoff(next_distance_to_tip),
2146					next_block_num,
2147				))
2148			}
2149		},
2150		AncestorSearchState::BinarySearch(mut left, mut right) => {
2151			if left >= curr_block_num {
2152				return None;
2153			}
2154			if block_hash_match {
2155				left = curr_block_num;
2156			} else {
2157				right = curr_block_num;
2158			}
2159			assert!(right >= left);
2160			let middle = left + (right - left) / two;
2161			if middle == curr_block_num {
2162				None
2163			} else {
2164				Some((AncestorSearchState::BinarySearch(left, right), middle))
2165			}
2166		},
2167	}
2168}
2169
2170/// Get a new block request for the peer if any.
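///
/// Blocks are requested in descending order from the top of the range returned by
/// `BlockCollection::needed_blocks`: e.g. a range of `101..129` becomes a request starting at
/// #128 (or at the peer's best hash if #128 is its best block), with `Direction::Descending` and
/// `max: Some(28)`.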
2171fn peer_block_request<B: BlockT>(
2172	id: &PeerId,
2173	peer: &PeerSync<B>,
2174	blocks: &mut BlockCollection<B>,
2175	attrs: BlockAttributes,
2176	max_parallel_downloads: u32,
2177	max_blocks_per_request: u32,
2178	finalized: NumberFor<B>,
2179	best_num: NumberFor<B>,
2180) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2181	if best_num >= peer.best_number {
2182		// Will be downloaded as alternative fork instead.
2183		return None;
2184	} else if peer.common_number < finalized {
2185		trace!(
2186			target: LOG_TARGET,
2187			"Requesting pre-finalized chain from {:?}, common={}, finalized={}, peer best={}, our best={}",
2188			id, peer.common_number, finalized, peer.best_number, best_num,
2189		);
2190	}
2191	let range = blocks.needed_blocks(
2192		*id,
2193		max_blocks_per_request,
2194		peer.best_number,
2195		peer.common_number,
2196		max_parallel_downloads,
2197		MAX_DOWNLOAD_AHEAD,
2198	)?;
2199
2200	// The end is not part of the range.
2201	let last = range.end.saturating_sub(One::one());
2202
2203	let from = if peer.best_number == last {
2204		FromBlock::Hash(peer.best_hash)
2205	} else {
2206		FromBlock::Number(last)
2207	};
2208
2209	let request = BlockRequest::<B> {
2210		id: 0,
2211		fields: attrs,
2212		from,
2213		direction: Direction::Descending,
2214		max: Some((range.end - range.start).saturated_into::<u32>()),
2215	};
2216
2217	Some((range, request))
2218}
2219
2220/// Get a new gap block request for the peer, if any.
2221fn peer_gap_block_request<B: BlockT>(
2222	id: &PeerId,
2223	peer: &PeerSync<B>,
2224	blocks: &mut BlockCollection<B>,
2225	attrs: BlockAttributes,
2226	target: NumberFor<B>,
2227	common_number: NumberFor<B>,
2228	max_blocks_per_request: u32,
2229) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
2230	let range = blocks.needed_blocks(
2231		*id,
2232		max_blocks_per_request,
2233		std::cmp::min(peer.best_number, target),
2234		common_number,
2235		1,
2236		MAX_DOWNLOAD_AHEAD,
2237	)?;
2238
2239	// The end is not part of the range.
2240	let last = range.end.saturating_sub(One::one());
2241	let from = FromBlock::Number(last);
2242
2243	let request = BlockRequest::<B> {
2244		id: 0,
2245		fields: attrs,
2246		from,
2247		direction: Direction::Descending,
2248		max: Some((range.end - range.start).saturated_into::<u32>()),
2249	};
2250	Some((range, request))
2251}
2252
2253/// Get pending fork sync targets for a peer.
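///
/// Expired targets (at or below the finalized block) and already-known targets are pruned first.
/// When a fork target's parent block is unknown locally, the request covers the whole branch from
/// the fork head down towards the last finalized block (e.g. a target at #105 with finalized #100
/// yields `max: Some(5)`); if the parent is already known, only the single head block is requested.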
2254fn fork_sync_request<B: BlockT>(
2255	id: &PeerId,
2256	fork_targets: &mut HashMap<B::Hash, ForkTarget<B>>,
2257	best_num: NumberFor<B>,
2258	finalized: NumberFor<B>,
2259	attributes: BlockAttributes,
2260	check_block: impl Fn(&B::Hash) -> BlockStatus,
2261	max_blocks_per_request: u32,
2262	metrics: Option<&Metrics>,
2263) -> Option<(B::Hash, BlockRequest<B>)> {
2264	fork_targets.retain(|hash, r| {
2265		if r.number <= finalized {
2266			trace!(
2267				target: LOG_TARGET,
2268				"Removed expired fork sync request {:?} (#{})",
2269				hash,
2270				r.number,
2271			);
2272			return false;
2273		}
2274		if check_block(hash) != BlockStatus::Unknown {
2275			trace!(
2276				target: LOG_TARGET,
2277				"Removed obsolete fork sync request {:?} (#{})",
2278				hash,
2279				r.number,
2280			);
2281			return false;
2282		}
2283		true
2284	});
2285	if let Some(metrics) = metrics {
2286		metrics.fork_targets.set(fork_targets.len().try_into().unwrap_or(u64::MAX));
2287	}
2288	for (hash, r) in fork_targets {
2289		if !r.peers.contains(&id) {
2290			continue;
2291		}
2292		// Download the fork only if it is behind or not too far ahead of our chain tip.
2293		// Otherwise it should be downloaded in full sync mode.
2294		if r.number <= best_num
2295			|| (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
2296		{
2297			let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
2298			let count = if parent_status == BlockStatus::Unknown {
2299				(r.number - finalized).saturated_into::<u32>() // up to the last finalized block
2300			} else {
2301				// request only single block
2302				1
2303			};
2304			trace!(
2305				target: LOG_TARGET,
2306				"Downloading requested fork {hash:?} from {id}, {count} blocks",
2307			);
2308			return Some((
2309				*hash,
2310				BlockRequest::<B> {
2311					id: 0,
2312					fields: attributes,
2313					from: FromBlock::Hash(*hash),
2314					direction: Direction::Descending,
2315					max: Some(count),
2316				},
2317			));
2318		} else {
2319			trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
2320		}
2321	}
2322	None
2323}
2324
2325/// Returns `true` if the given `block` is a descendent of `base`.
2326fn is_descendent_of<Block, T>(
2327	client: &T,
2328	base: &Block::Hash,
2329	block: &Block::Hash,
2330) -> pezsp_blockchain::Result<bool>
2331where
2332	Block: BlockT,
2333	T: HeaderMetadata<Block, Error = pezsp_blockchain::Error> + ?Sized,
2334{
2335	if base == block {
2336		return Ok(false);
2337	}
2338
2339	let ancestor = pezsp_blockchain::lowest_common_ancestor(client, *block, *base)?;
2340
2341	Ok(ancestor.hash == *base)
2342}
2343
2344/// Validate that the given `blocks` are correct.
2345/// Returns the number of the first block in the sequence.
2346///
2347/// It is expected that `blocks` are in ascending order.
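///
/// If a `request` is supplied, the response is also validated against it: the boundary block (the
/// last element for `Direction::Descending` requests, the first one otherwise) must correspond to
/// `request.from`, and every block must contain the header/body fields that were requested.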
2348pub fn validate_blocks<Block: BlockT>(
2349	blocks: &Vec<BlockData<Block>>,
2350	peer_id: &PeerId,
2351	request: Option<BlockRequest<Block>>,
2352) -> Result<Option<NumberFor<Block>>, BadPeer> {
2353	if let Some(request) = request {
2354		if Some(blocks.len() as _) > request.max {
2355			debug!(
2356				target: LOG_TARGET,
2357				"Received more blocks than requested from {}. Expected at most {:?}, got {}.",
2358				peer_id,
2359				request.max,
2360				blocks.len(),
2361			);
2362
2363			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2364		}
2365
2366		let block_header =
2367			if request.direction == Direction::Descending { blocks.last() } else { blocks.first() }
2368				.and_then(|b| b.header.as_ref());
2369
2370		let expected_block = block_header.as_ref().map_or(false, |h| match request.from {
2371			FromBlock::Hash(hash) => h.hash() == hash,
2372			FromBlock::Number(n) => h.number() == &n,
2373		});
2374
2375		if !expected_block {
2376			debug!(
2377				target: LOG_TARGET,
2378				"Received block that was not requested. Requested {:?}, got {:?}.",
2379				request.from,
2380				block_header,
2381			);
2382
2383			return Err(BadPeer(*peer_id, rep::NOT_REQUESTED));
2384		}
2385
2386		if request.fields.contains(BlockAttributes::HEADER)
2387			&& blocks.iter().any(|b| b.header.is_none())
2388		{
2389			trace!(
2390				target: LOG_TARGET,
2391				"Missing requested header for a block in response from {peer_id}.",
2392			);
2393
2394			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2395		}
2396
2397		if request.fields.contains(BlockAttributes::BODY) && blocks.iter().any(|b| b.body.is_none())
2398		{
2399			trace!(
2400				target: LOG_TARGET,
2401				"Missing requested body for a block in response from {peer_id}.",
2402			);
2403
2404			return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
2405		}
2406	}
2407
2408	for b in blocks {
2409		if let Some(header) = &b.header {
2410			let hash = header.hash();
2411			if hash != b.hash {
2412				debug!(
2413					target: LOG_TARGET,
2414					"Bad header received from {}. Expected hash {:?}, got {:?}",
2415					peer_id,
2416					b.hash,
2417					hash,
2418				);
2419				return Err(BadPeer(*peer_id, rep::BAD_BLOCK));
2420			}
2421		}
2422	}
2423
2424	Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
2425}