sc_network_sync/
engine.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
//! `SyncingEngine` is the actor responsible for syncing a Substrate chain
//! to the tip and keeping the blockchain up to date with network updates.
21
22use crate::{
23	block_announce_validator::{
24		BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
25	},
26	pending_responses::{PendingResponses, ResponseEvent},
27	service::{
28		self,
29		syncing_service::{SyncingService, ToServiceCommand},
30	},
31	strategy::{SyncingAction, SyncingStrategy},
32	types::{BadPeer, ExtendedPeerInfo, SyncEvent},
33	LOG_TARGET,
34};
35
36use codec::{Decode, DecodeAll, Encode};
37use futures::{channel::oneshot, StreamExt};
38use log::{debug, error, trace, warn};
39use prometheus_endpoint::{
40	register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
41};
42use schnellru::{ByLength, LruMap};
43use tokio::time::{Interval, MissedTickBehavior};
44
45use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
46use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
47use sc_network::{
48	config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
49	peer_store::PeerStoreProvider,
50	request_responses::{OutboundFailure, RequestFailure},
51	service::{
52		traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
53		NotificationMetrics,
54	},
55	types::ProtocolName,
56	utils::LruHashSet,
57	NetworkBackend, NotificationService, ReputationChange,
58};
59use sc_network_common::{
60	role::Roles,
61	sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
62};
63use sc_network_types::PeerId;
64use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
65use sp_blockchain::{Error as ClientError, HeaderMetadata};
66use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
67use sp_runtime::{
68	traits::{Block as BlockT, Header, NumberFor, Zero},
69	Justifications,
70};
71
72use std::{
73	collections::{HashMap, HashSet},
74	iter,
75	num::NonZeroUsize,
76	sync::{
77		atomic::{AtomicBool, AtomicUsize, Ordering},
78		Arc,
79	},
80};
81
/// Interval at which we perform time based maintenance.
///
/// Used as the period of the `tick_timeout` interval that wakes up the engine's event loop.
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);

/// Maximum number of known block hashes to keep for a peer.
///
/// This is the capacity of the per-peer `known_blocks` LRU set.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead

/// Maximum allowed size for a block announce, in bytes (1 MiB).
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
90
/// Reputation changes the syncing engine can apply to peers.
mod rep {
	use sc_network::ReputationChange as Rep;
	/// Peer has different genesis.
	pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
	/// Peer sent us a block announcement that failed validation.
	pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
	/// Peer is on unsupported protocol version.
	pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
	/// Reputation change when a peer refuses a request.
	pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
	/// Reputation change when a peer doesn't respond in time to our messages.
	pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
}
104
/// Prometheus metrics maintained by the syncing engine.
struct Metrics {
	/// Gauge registered as `substrate_sync_peers`; incremented when a peer is added to the
	/// engine and decremented when it is removed.
	peers: Gauge<U64>,
	/// Counter registered as `substrate_sync_import_queue_blocks_submitted`.
	import_queue_blocks_submitted: Counter<U64>,
	/// Counter registered as `substrate_sync_import_queue_justifications_submitted`.
	import_queue_justifications_submitted: Counter<U64>,
}
110
111impl Metrics {
112	fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
113		let _ = MajorSyncingGauge::register(r, major_syncing)?;
114		Ok(Self {
115			peers: {
116				let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
117				register(g, r)?
118			},
119			import_queue_blocks_submitted: {
120				let c = Counter::new(
121					"substrate_sync_import_queue_blocks_submitted",
122					"Number of blocks submitted to the import queue.",
123				)?;
124				register(c, r)?
125			},
126			import_queue_justifications_submitted: {
127				let c = Counter::new(
128					"substrate_sync_import_queue_justifications_submitted",
129					"Number of justifications submitted to the import queue.",
130				)?;
131				register(c, r)?
132			},
133		})
134	}
135}
136
/// The "major syncing" metric.
///
/// Wraps the shared flag whose value is exported as a gauge.
#[derive(Clone)]
pub struct MajorSyncingGauge(Arc<AtomicBool>);

impl MajorSyncingGauge {
	/// Registers the [`MajorSyncingGauge`] metric whose value is
	/// obtained from the given `AtomicBool`.
	///
	/// Returns an error if creating or registering the gauge fails.
	fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
		prometheus_endpoint::register(
			SourcedGauge::new(
				&Opts::new(
					"substrate_sub_libp2p_is_major_syncing",
					"Whether the node is performing a major sync or not.",
				),
				MajorSyncingGauge(value),
			)?,
			registry,
		)?;

		Ok(())
	}
}
159
160impl MetricSource for MajorSyncingGauge {
161	type N = u64;
162
163	fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
164		set(&[], self.0.load(Ordering::Relaxed) as u64);
165	}
166}
167
/// Peer information
#[derive(Debug)]
pub struct Peer<B: BlockT> {
	/// Roles and best block (hash and number) last recorded for this peer.
	pub info: ExtendedPeerInfo<B>,
	/// Holds a set of blocks known to this peer.
	pub known_blocks: LruHashSet<B::Hash>,
	/// Is the peer inbound.
	inbound: bool,
}
177
/// The syncing actor.
///
/// Owns the syncing strategy and the set of connected peers, and reacts to service commands,
/// notification events, request responses and block announce validation results.
pub struct SyncingEngine<B: BlockT, Client> {
	/// Syncing strategy.
	strategy: Box<dyn SyncingStrategy<B>>,

	/// Blockchain client.
	client: Arc<Client>,

	/// Number of peers we're connected to.
	num_connected: Arc<AtomicUsize>,

	/// Are we actively catching up with the chain?
	is_major_syncing: Arc<AtomicBool>,

	/// Network service.
	network_service: service::network::NetworkServiceHandle,

	/// Channel for receiving service commands
	service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,

	/// Assigned roles.
	roles: Roles,

	/// Genesis hash.
	genesis_hash: B::Hash,

	/// Set of channels for other protocols that have subscribed to syncing events.
	event_streams: Vec<TracingUnboundedSender<SyncEvent>>,

	/// Interval at which we call `tick`.
	tick_timeout: Interval,

	/// All connected peers. Contains both full and light node peers.
	peers: HashMap<PeerId, Peer<B>>,

	/// List of nodes for which we perform additional logging because they are important for the
	/// user.
	important_peers: HashSet<PeerId>,

	/// Actual list of connected no-slot nodes.
	default_peers_set_no_slot_connected_peers: HashSet<PeerId>,

	/// List of nodes that should never occupy peer slots.
	default_peers_set_no_slot_peers: HashSet<PeerId>,

	/// Value that was passed as part of the configuration. Used to cap the number of full
	/// nodes.
	default_peers_set_num_full: usize,

	/// Number of slots to allocate to light nodes.
	default_peers_set_num_light: usize,

	/// Maximum number of inbound peers.
	max_in_peers: usize,

	/// Number of inbound peers accepted so far.
	num_in_peers: usize,

	/// Async processor of block announce validations.
	block_announce_validator: BlockAnnounceValidatorStream<B>,

	/// A cache for the data that was associated to a block announcement.
	block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,

	/// The `PeerId`'s of all boot nodes.
	boot_node_ids: HashSet<PeerId>,

	/// Protocol name used for block announcements
	block_announce_protocol_name: ProtocolName,

	/// Prometheus metrics.
	metrics: Option<Metrics>,

	/// Handle that is used to communicate with `sc_network::Notifications`.
	notification_service: Box<dyn NotificationService>,

	/// Handle to `PeerStore`.
	peer_store_handle: Arc<dyn PeerStoreProvider>,

	/// Pending responses
	pending_responses: PendingResponses,

	/// Handle to import queue.
	import_queue: Box<dyn ImportQueueService<B>>,
}
262
263impl<B: BlockT, Client> SyncingEngine<B, Client>
264where
265	B: BlockT,
266	Client: HeaderBackend<B>
267		+ BlockBackend<B>
268		+ HeaderMetadata<B, Error = sp_blockchain::Error>
269		+ ProofProvider<B>
270		+ Send
271		+ Sync
272		+ 'static,
273{
274	pub fn new<N>(
275		roles: Roles,
276		client: Arc<Client>,
277		metrics_registry: Option<&Registry>,
278		network_metrics: NotificationMetrics,
279		net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
280		protocol_id: ProtocolId,
281		fork_id: Option<&str>,
282		block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
283		syncing_strategy: Box<dyn SyncingStrategy<B>>,
284		network_service: service::network::NetworkServiceHandle,
285		import_queue: Box<dyn ImportQueueService<B>>,
286		peer_store_handle: Arc<dyn PeerStoreProvider>,
287	) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
288	where
289		N: NetworkBackend<B, <B as BlockT>::Hash>,
290	{
291		let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
292			net_config.network_config.default_peers_set.out_peers)
293			.max(1);
294		let important_peers = {
295			let mut imp_p = HashSet::new();
296			for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
297				imp_p.insert(reserved.peer_id);
298			}
299			for config in net_config.notification_protocols() {
300				let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
301				imp_p.extend(peer_ids);
302			}
303
304			imp_p.shrink_to_fit();
305			imp_p
306		};
307		let boot_node_ids = {
308			let mut list = HashSet::new();
309			for node in &net_config.network_config.boot_nodes {
310				list.insert(node.peer_id);
311			}
312			list.shrink_to_fit();
313			list
314		};
315		let default_peers_set_no_slot_peers = {
316			let mut no_slot_p: HashSet<PeerId> = net_config
317				.network_config
318				.default_peers_set
319				.reserved_nodes
320				.iter()
321				.map(|reserved| reserved.peer_id)
322				.collect();
323			no_slot_p.shrink_to_fit();
324			no_slot_p
325		};
326		let default_peers_set_num_full =
327			net_config.network_config.default_peers_set_num_full as usize;
328		let default_peers_set_num_light = {
329			let total = net_config.network_config.default_peers_set.out_peers +
330				net_config.network_config.default_peers_set.in_peers;
331			total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
332		};
333
334		let info = client.info();
335
336		let (block_announce_config, notification_service) =
337			Self::get_block_announce_proto_config::<N>(
338				protocol_id,
339				fork_id,
340				roles,
341				info.best_number,
342				info.best_hash,
343				info.genesis_hash,
344				&net_config.network_config.default_peers_set,
345				network_metrics,
346				Arc::clone(&peer_store_handle),
347			);
348
349		let block_announce_protocol_name = block_announce_config.protocol_name().clone();
350		let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
351		let num_connected = Arc::new(AtomicUsize::new(0));
352		let is_major_syncing = Arc::new(AtomicBool::new(false));
353
354		// `default_peers_set.in_peers` contains an unspecified amount of light peers so the number
355		// of full inbound peers must be calculated from the total full peer count
356		let max_full_peers = net_config.network_config.default_peers_set_num_full;
357		let max_out_peers = net_config.network_config.default_peers_set.out_peers;
358		let max_in_peers = (max_full_peers - max_out_peers) as usize;
359
360		let tick_timeout = {
361			let mut interval = tokio::time::interval(TICK_TIMEOUT);
362			interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
363			interval
364		};
365
366		Ok((
367			Self {
368				roles,
369				client,
370				strategy: syncing_strategy,
371				network_service,
372				peers: HashMap::new(),
373				block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
374				block_announce_protocol_name,
375				block_announce_validator: BlockAnnounceValidatorStream::new(
376					block_announce_validator,
377				),
378				num_connected: num_connected.clone(),
379				is_major_syncing: is_major_syncing.clone(),
380				service_rx,
381				genesis_hash: info.genesis_hash,
382				important_peers,
383				default_peers_set_no_slot_connected_peers: HashSet::new(),
384				boot_node_ids,
385				default_peers_set_no_slot_peers,
386				default_peers_set_num_full,
387				default_peers_set_num_light,
388				num_in_peers: 0usize,
389				max_in_peers,
390				event_streams: Vec::new(),
391				notification_service,
392				tick_timeout,
393				peer_store_handle,
394				metrics: if let Some(r) = metrics_registry {
395					match Metrics::register(r, is_major_syncing.clone()) {
396						Ok(metrics) => Some(metrics),
397						Err(err) => {
398							log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
399							None
400						},
401					}
402				} else {
403					None
404				},
405				pending_responses: PendingResponses::new(),
406				import_queue,
407			},
408			SyncingService::new(tx, num_connected, is_major_syncing),
409			block_announce_config,
410		))
411	}
412
413	fn update_peer_info(
414		&mut self,
415		peer_id: &PeerId,
416		best_hash: B::Hash,
417		best_number: NumberFor<B>,
418	) {
419		if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
420			peer.info.best_hash = best_hash;
421			peer.info.best_number = best_number;
422		}
423	}
424
425	/// Process the result of the block announce validation.
426	fn process_block_announce_validation_result(
427		&mut self,
428		validation_result: BlockAnnounceValidationResult<B::Header>,
429	) {
430		match validation_result {
431			BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
432			BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
433				if let Some((best_hash, best_number)) =
434					self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
435				{
436					self.update_peer_info(&peer_id, best_hash, best_number);
437				}
438
439				if let Some(data) = announce.data {
440					if !data.is_empty() {
441						self.block_announce_data_cache.insert(announce.header.hash(), data);
442					}
443				}
444			},
445			BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
446				if disconnect {
447					log::debug!(
448						target: LOG_TARGET,
449						"Disconnecting peer {peer_id} due to block announce validation failure",
450					);
451					self.network_service
452						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
453				}
454
455				self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
456			},
457		}
458	}
459
460	/// Push a block announce validation.
461	pub fn push_block_announce_validation(
462		&mut self,
463		peer_id: PeerId,
464		announce: BlockAnnounce<B::Header>,
465	) {
466		let hash = announce.header.hash();
467
468		let peer = match self.peers.get_mut(&peer_id) {
469			Some(p) => p,
470			None => {
471				log::error!(
472					target: LOG_TARGET,
473					"Received block announce from disconnected peer {peer_id}",
474				);
475				debug_assert!(false);
476				return;
477			},
478		};
479		peer.known_blocks.insert(hash);
480
481		if peer.info.roles.is_full() {
482			let is_best = match announce.state.unwrap_or(BlockState::Best) {
483				BlockState::Best => true,
484				BlockState::Normal => false,
485			};
486
487			self.block_announce_validator
488				.push_block_announce_validation(peer_id, hash, announce, is_best);
489		}
490	}
491
492	/// Make sure an important block is propagated to peers.
493	///
494	/// In chain-based consensus, we often need to make sure non-best forks are
495	/// at least temporarily synced.
496	pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
497		let header = match self.client.header(hash) {
498			Ok(Some(header)) => header,
499			Ok(None) => {
500				log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
501				return;
502			},
503			Err(e) => {
504				log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
505				return;
506			},
507		};
508
509		// don't announce genesis block since it will be ignored
510		if header.number().is_zero() {
511			return;
512		}
513
514		let is_best = self.client.info().best_hash == hash;
515		log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
516
517		let data = data
518			.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
519			.unwrap_or_default();
520
521		for (peer_id, ref mut peer) in self.peers.iter_mut() {
522			let inserted = peer.known_blocks.insert(hash);
523			if inserted {
524				log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
525				let message = BlockAnnounce {
526					header: header.clone(),
527					state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
528					data: Some(data.clone()),
529				};
530
531				let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
532			}
533		}
534	}
535
	/// Runs the engine's event loop, consuming the engine.
	///
	/// Each iteration waits for one of: a timer tick, a service command, a notification
	/// event, a request response, or a block announce validation result. After handling it,
	/// the shared "major syncing" flag is refreshed and the actions requested by the syncing
	/// strategy are processed.
	///
	/// The loop ends when the notification service yields `None` or when processing the
	/// strategy actions returns an error.
	pub async fn run(mut self) {
		loop {
			tokio::select! {
				_ = self.tick_timeout.tick() => {
					// TODO: This tick should not be necessary, but
					//  `self.process_strategy_actions()` is not called in some cases otherwise and
					//  some tests fail because of this
				},
				command = self.service_rx.select_next_some() =>
					self.process_service_command(command),
				notification_event = self.notification_service.next_event() => match notification_event {
					Some(event) => self.process_notification_event(event),
					// No more notification events: stop the engine.
					None => return,
				},
				response_event = self.pending_responses.select_next_some() =>
					self.process_response_event(response_event),
				validation_result = self.block_announce_validator.select_next_some() =>
					self.process_block_announce_validation_result(validation_result),
			}

			// Update atomic variables
			self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);

			// Process actions requested by a syncing strategy.
			if let Err(e) = self.process_strategy_actions() {
				error!(
					target: LOG_TARGET,
					"Terminating `SyncingEngine` due to fatal error: {e:?}.",
				);
				return;
			}
		}
	}
569
570	fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
571		for action in self.strategy.actions(&self.network_service)? {
572			match action {
573				SyncingAction::StartRequest { peer_id, key, request, remove_obsolete } => {
574					if !self.peers.contains_key(&peer_id) {
575						trace!(
576							target: LOG_TARGET,
577							"Cannot start request with strategy key {key:?} to unknown peer \
578							{peer_id}",
579						);
580						debug_assert!(false);
581						continue;
582					}
583					if remove_obsolete {
584						if self.pending_responses.remove(peer_id, key) {
585							warn!(
586								target: LOG_TARGET,
587								"Processed `SyncingAction::StartRequest` to {peer_id} with \
588								strategy key {key:?}. Stale response removed!",
589							)
590						} else {
591							trace!(
592								target: LOG_TARGET,
593								"Processed `SyncingAction::StartRequest` to {peer_id} with \
594								strategy key {key:?}.",
595							)
596						}
597					}
598
599					self.pending_responses.insert(peer_id, key, request);
600				},
601				SyncingAction::CancelRequest { peer_id, key } => {
602					let removed = self.pending_responses.remove(peer_id, key);
603
604					trace!(
605						target: LOG_TARGET,
606						"Processed `SyncingAction::CancelRequest`, response removed: {removed}.",
607					);
608				},
609				SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
610					self.pending_responses.remove_all(&peer_id);
611					self.network_service
612						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
613					self.network_service.report_peer(peer_id, rep);
614
615					trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
616				},
617				SyncingAction::ImportBlocks { origin, blocks } => {
618					let count = blocks.len();
619					self.import_blocks(origin, blocks);
620
621					trace!(
622						target: LOG_TARGET,
623						"Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
624					);
625				},
626				SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
627					self.import_justifications(peer_id, hash, number, justifications);
628
629					trace!(
630						target: LOG_TARGET,
631						"Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
632						peer_id,
633						hash,
634						number,
635					)
636				},
637				// Nothing to do, this is handled internally by `PolkadotSyncingStrategy`.
638				SyncingAction::Finished => {},
639			}
640		}
641
642		Ok(())
643	}
644
645	fn process_service_command(&mut self, command: ToServiceCommand<B>) {
646		match command {
647			ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
648				self.strategy.set_sync_fork_request(peers, &hash, number);
649			},
650			ToServiceCommand::EventStream(tx) => {
651				// Let a new subscriber know about already connected peers.
652				for peer_id in self.peers.keys() {
653					let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
654				}
655				self.event_streams.push(tx);
656			},
657			ToServiceCommand::RequestJustification(hash, number) =>
658				self.strategy.request_justification(&hash, number),
659			ToServiceCommand::ClearJustificationRequests =>
660				self.strategy.clear_justification_requests(),
661			ToServiceCommand::BlocksProcessed(imported, count, results) => {
662				self.strategy.on_blocks_processed(imported, count, results);
663			},
664			ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
665				self.strategy.on_justification_import(hash, number, success);
666				if !success {
667					log::info!(
668						target: LOG_TARGET,
669						"💔 Invalid justification provided by {peer_id} for #{hash}",
670					);
671					self.network_service
672						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
673					self.network_service
674						.report_peer(peer_id, ReputationChange::new_fatal("Invalid justification"));
675				}
676			},
677			ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
678			ToServiceCommand::NewBestBlockImported(hash, number) => {
679				log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
680
681				self.strategy.update_chain_info(&hash, number);
682				let _ = self.notification_service.try_set_handshake(
683					BlockAnnouncesHandshake::<B>::build(
684						self.roles,
685						number,
686						hash,
687						self.genesis_hash,
688					)
689					.encode(),
690				);
691			},
692			ToServiceCommand::Status(tx) => {
693				let _ = tx.send(self.strategy.status());
694			},
695			ToServiceCommand::NumActivePeers(tx) => {
696				let _ = tx.send(self.num_active_peers());
697			},
698			ToServiceCommand::NumDownloadedBlocks(tx) => {
699				let _ = tx.send(self.strategy.num_downloaded_blocks());
700			},
701			ToServiceCommand::NumSyncRequests(tx) => {
702				let _ = tx.send(self.strategy.num_sync_requests());
703			},
704			ToServiceCommand::PeersInfo(tx) => {
705				let peers_info =
706					self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
707				let _ = tx.send(peers_info);
708			},
709			ToServiceCommand::OnBlockFinalized(hash, header) =>
710				self.strategy.on_block_finalized(&hash, *header.number()),
711		}
712	}
713
714	fn process_notification_event(&mut self, event: NotificationEvent) {
715		match event {
716			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
717				let validation_result = self
718					.validate_connection(&peer, handshake, Direction::Inbound)
719					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
720
721				let _ = result_tx.send(validation_result);
722			},
723			NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
724				log::debug!(
725					target: LOG_TARGET,
726					"Substream opened for {peer}, handshake {handshake:?}"
727				);
728
729				match self.validate_connection(&peer, handshake, direction) {
730					Ok(handshake) => {
731						if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
732							log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
733							self.network_service
734								.disconnect_peer(peer, self.block_announce_protocol_name.clone());
735						}
736					},
737					Err(wrong_genesis) => {
738						log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
739
740						if wrong_genesis {
741							self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
742						}
743
744						self.network_service
745							.disconnect_peer(peer, self.block_announce_protocol_name.clone());
746					},
747				}
748			},
749			NotificationEvent::NotificationStreamClosed { peer } => {
750				self.on_sync_peer_disconnected(peer);
751			},
752			NotificationEvent::NotificationReceived { peer, notification } => {
753				if !self.peers.contains_key(&peer) {
754					log::error!(
755						target: LOG_TARGET,
756						"received notification from {peer} who had been earlier refused by `SyncingEngine`",
757					);
758					return;
759				}
760
761				let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
762					log::warn!(target: LOG_TARGET, "failed to decode block announce");
763					return;
764				};
765
766				self.push_block_announce_validation(peer, announce);
767			},
768		}
769	}
770
771	/// Called by peer when it is disconnecting.
772	///
773	/// Returns a result if the handshake of this peer was indeed accepted.
774	fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
775		let Some(info) = self.peers.remove(&peer_id) else {
776			log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
777			return;
778		};
779		if let Some(metrics) = &self.metrics {
780			metrics.peers.dec();
781		}
782		self.num_connected.fetch_sub(1, Ordering::AcqRel);
783
784		if self.important_peers.contains(&peer_id) {
785			log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
786		} else {
787			log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
788		}
789
790		if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
791			info.inbound &&
792			info.info.roles.is_full()
793		{
794			match self.num_in_peers.checked_sub(1) {
795				Some(value) => {
796					self.num_in_peers = value;
797				},
798				None => {
799					log::error!(
800						target: LOG_TARGET,
801						"trying to disconnect an inbound node which is not counted as inbound"
802					);
803					debug_assert!(false);
804				},
805			}
806		}
807
808		self.strategy.remove_peer(&peer_id);
809		self.pending_responses.remove_all(&peer_id);
810		self.event_streams
811			.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
812	}
813
814	/// Validate received handshake.
815	fn validate_handshake(
816		&mut self,
817		peer_id: &PeerId,
818		handshake: Vec<u8>,
819	) -> Result<BlockAnnouncesHandshake<B>, bool> {
820		log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
821
822		let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
823			.map_err(|error| {
824				log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
825				false
826			})?;
827
828		if handshake.genesis_hash != self.genesis_hash {
829			if self.important_peers.contains(&peer_id) {
830				log::error!(
831					target: LOG_TARGET,
832					"Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
833					self.genesis_hash,
834					handshake.genesis_hash,
835				);
836			} else if self.boot_node_ids.contains(&peer_id) {
837				log::error!(
838					target: LOG_TARGET,
839					"Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
840					self.genesis_hash,
841					handshake.genesis_hash,
842				);
843			} else {
844				log::debug!(
845					target: LOG_TARGET,
846					"Peer is on different chain (our genesis: {} theirs: {})",
847					self.genesis_hash,
848					handshake.genesis_hash
849				);
850			}
851
852			return Err(true);
853		}
854
855		Ok(handshake)
856	}
857
858	/// Validate connection.
859	// NOTE Returning `Err(bool)` is a really ugly hack to work around the issue
860	// that `ProtocolController` thinks the peer is connected when in fact it can
861	// still be under validation. If the peer has different genesis than the
862	// local node the validation fails but the peer cannot be reported in
863	// `validate_connection()` as that is also called by
864	// `ValidateInboundSubstream` which means that the peer is still being
865	// validated and banning the peer when handling that event would
866	// result in peer getting dropped twice.
867	//
868	// The proper way to fix this is to integrate `ProtocolController` more
869	// tightly with `NotificationService` or add an additional API call for
870	// banning pre-accepted peers (which is not desirable)
871	fn validate_connection(
872		&mut self,
873		peer_id: &PeerId,
874		handshake: Vec<u8>,
875		direction: Direction,
876	) -> Result<BlockAnnouncesHandshake<B>, bool> {
877		log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
878
879		let handshake = self.validate_handshake(peer_id, handshake)?;
880
881		if self.peers.contains_key(&peer_id) {
882			log::error!(
883				target: LOG_TARGET,
884				"Called `validate_connection()` with already connected peer {peer_id}",
885			);
886			debug_assert!(false);
887			return Err(false);
888		}
889
890		let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
891		let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
892
893		if handshake.roles.is_full() &&
894			self.strategy.num_peers() >=
895				self.default_peers_set_num_full +
896					self.default_peers_set_no_slot_connected_peers.len() +
897					this_peer_reserved_slot
898		{
899			log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
900			return Err(false);
901		}
902
903		// make sure to accept no more than `--in-peers` many full nodes
904		if !no_slot_peer &&
905			handshake.roles.is_full() &&
906			direction.is_inbound() &&
907			self.num_in_peers == self.max_in_peers
908		{
909			log::debug!(target: LOG_TARGET, "All inbound slots have been consumed, rejecting {peer_id}");
910			return Err(false);
911		}
912
913		// make sure that all slots are not occupied by light peers
914		//
915		// `ChainSync` only accepts full peers whereas `SyncingEngine` accepts both full and light
916		// peers. Verify that there is a slot in `SyncingEngine` for the inbound light peer
917		if handshake.roles.is_light() &&
918			(self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
919		{
920			log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
921			return Err(false);
922		}
923
924		Ok(handshake)
925	}
926
927	/// Called on the first connection between two peers on the default set, after their exchange
928	/// of handshake.
929	///
930	/// Returns `Ok` if the handshake is accepted and the peer added to the list of peers we sync
931	/// from.
932	fn on_sync_peer_connected(
933		&mut self,
934		peer_id: PeerId,
935		status: &BlockAnnouncesHandshake<B>,
936		direction: Direction,
937	) -> Result<(), ()> {
938		log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
939
940		let peer = Peer {
941			info: ExtendedPeerInfo {
942				roles: status.roles,
943				best_hash: status.best_hash,
944				best_number: status.best_number,
945			},
946			known_blocks: LruHashSet::new(
947				NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
948			),
949			inbound: direction.is_inbound(),
950		};
951
952		// Only forward full peers to syncing strategy.
953		if status.roles.is_full() {
954			self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
955		}
956
957		log::debug!(target: LOG_TARGET, "Connected {peer_id}");
958
959		if self.peers.insert(peer_id, peer).is_none() {
960			if let Some(metrics) = &self.metrics {
961				metrics.peers.inc();
962			}
963			self.num_connected.fetch_add(1, Ordering::AcqRel);
964		}
965		self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
966
967		if self.default_peers_set_no_slot_peers.contains(&peer_id) {
968			self.default_peers_set_no_slot_connected_peers.insert(peer_id);
969		} else if direction.is_inbound() && status.roles.is_full() {
970			self.num_in_peers += 1;
971		}
972
973		self.event_streams
974			.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
975
976		Ok(())
977	}
978
979	fn process_response_event(&mut self, response_event: ResponseEvent) {
980		let ResponseEvent { peer_id, key, response: response_result } = response_event;
981
982		match response_result {
983			Ok(Ok((response, protocol_name))) => {
984				self.strategy.on_generic_response(&peer_id, key, protocol_name, response);
985			},
986			Ok(Err(e)) => {
987				debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
988
989				match e {
990					RequestFailure::Network(OutboundFailure::Timeout) => {
991						self.network_service.report_peer(peer_id, rep::TIMEOUT);
992						self.network_service
993							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
994					},
995					RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
996						self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
997						self.network_service
998							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
999					},
1000					RequestFailure::Network(OutboundFailure::DialFailure) => {
1001						self.network_service
1002							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1003					},
1004					RequestFailure::Refused => {
1005						self.network_service.report_peer(peer_id, rep::REFUSED);
1006						self.network_service
1007							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1008					},
1009					RequestFailure::Network(OutboundFailure::ConnectionClosed) |
1010					RequestFailure::NotConnected => {
1011						self.network_service
1012							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1013					},
1014					RequestFailure::UnknownProtocol => {
1015						debug_assert!(false, "Block request protocol should always be known.");
1016					},
1017					RequestFailure::Obsolete => {
1018						debug_assert!(
1019							false,
1020							"Can not receive `RequestFailure::Obsolete` after dropping the \
1021								response receiver.",
1022						);
1023					},
1024				}
1025			},
1026			Err(oneshot::Canceled) => {
1027				trace!(
1028					target: LOG_TARGET,
1029					"Request to peer {peer_id:?} failed due to oneshot being canceled.",
1030				);
1031				self.network_service
1032					.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1033			},
1034		}
1035	}
1036
1037	/// Returns the number of peers we're connected to and that are being queried.
1038	fn num_active_peers(&self) -> usize {
1039		self.pending_responses.len()
1040	}
1041
1042	/// Get config for the block announcement protocol
1043	fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1044		protocol_id: ProtocolId,
1045		fork_id: Option<&str>,
1046		roles: Roles,
1047		best_number: NumberFor<B>,
1048		best_hash: B::Hash,
1049		genesis_hash: B::Hash,
1050		set_config: &SetConfig,
1051		metrics: NotificationMetrics,
1052		peer_store_handle: Arc<dyn PeerStoreProvider>,
1053	) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1054		let block_announces_protocol = {
1055			let genesis_hash = genesis_hash.as_ref();
1056			if let Some(fork_id) = fork_id {
1057				format!(
1058					"/{}/{}/block-announces/1",
1059					array_bytes::bytes2hex("", genesis_hash),
1060					fork_id
1061				)
1062			} else {
1063				format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
1064			}
1065		};
1066
1067		N::notification_config(
1068			block_announces_protocol.into(),
1069			iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1070			MAX_BLOCK_ANNOUNCE_SIZE,
1071			Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1072				roles,
1073				best_number,
1074				best_hash,
1075				genesis_hash,
1076			))),
1077			set_config.clone(),
1078			metrics,
1079			peer_store_handle,
1080		)
1081	}
1082
1083	/// Import blocks.
1084	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1085		if let Some(metrics) = &self.metrics {
1086			metrics.import_queue_blocks_submitted.inc();
1087		}
1088
1089		self.import_queue.import_blocks(origin, blocks);
1090	}
1091
1092	/// Import justifications.
1093	fn import_justifications(
1094		&mut self,
1095		peer_id: PeerId,
1096		hash: B::Hash,
1097		number: NumberFor<B>,
1098		justifications: Justifications,
1099	) {
1100		if let Some(metrics) = &self.metrics {
1101			metrics.import_queue_justifications_submitted.inc();
1102		}
1103
1104		self.import_queue.import_justifications(peer_id, hash, number, justifications);
1105	}
1106}