Skip to main content

soil_network/sync/
engine.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
//! `SyncingEngine` is the actor responsible for syncing a Substrate chain
//! to the tip and keeping the blockchain up to date with network updates.
9
10use crate::{
11	sync::{
12		block_announce_validator::{
13			BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
14		},
15		pending_responses::{PendingResponses, ResponseEvent},
16		service::{
17			self,
18			syncing_service::{SyncingService, ToServiceCommand},
19		},
20		strategy::{SyncingAction, SyncingStrategy},
21		types::{BadPeer, ExtendedPeerInfo, SyncEvent},
22	},
23	LOG_TARGET,
24};
25
26use codec::{Decode, DecodeAll, Encode};
27use futures::{channel::oneshot, StreamExt};
28use log::{debug, error, trace, warn};
29use soil_prometheus::{
30	register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
31};
32use schnellru::{ByLength, LruMap};
33use tokio::time::{Interval, MissedTickBehavior};
34
35use soil_client::blockchain::{Error as ClientError, HeaderMetadata};
36use soil_client::client_api::{BlockBackend, HeaderBackend, ProofProvider};
37use soil_client::consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
38use soil_client::import::{ImportQueueService, IncomingBlock};
39use soil_client::utils::mpsc::{
40	tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
41};
42use soil_network::common::{
43	role::Roles,
44	sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
45};
46use soil_network::types::PeerId;
47use soil_network::{
48	config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
49	peer_store::PeerStoreProvider,
50	request_responses::{OutboundFailure, RequestFailure},
51	service::{
52		traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
53		NotificationMetrics,
54	},
55	types::ProtocolName,
56	utils::LruHashSet,
57	NetworkBackend, NotificationService, ReputationChange,
58};
59use subsoil::runtime::{
60	traits::{Block as BlockT, Header, NumberFor, Zero},
61	Justifications,
62};
63
64use std::{
65	collections::{HashMap, HashSet},
66	iter,
67	num::NonZeroUsize,
68	sync::{
69		atomic::{AtomicBool, AtomicUsize, Ordering},
70		Arc,
71	},
72};
73
/// Period of the timer that wakes up the engine's event loop for time-based maintenance.
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_100);

/// Upper bound on the number of block hashes remembered as "known" for a single peer.
///
/// ~32kb per peer + `LruHashSet` overhead.
const MAX_KNOWN_BLOCKS: usize = 1 << 10;

/// Maximum allowed size for a block announce (`1024 * 1024`).
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1 << 20;
82
83mod rep {
84	use soil_network::ReputationChange as Rep;
85	/// Peer has different genesis.
86	pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
87	/// Peer send us a block announcement that failed at validation.
88	pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
89	/// Peer is on unsupported protocol version.
90	pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
91	/// Reputation change when a peer refuses a request.
92	pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
93	/// Reputation change when a peer doesn't respond in time to our messages.
94	pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
95	/// Reputation change when a peer connection failed with IO error.
96	pub const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
97}
98
99struct Metrics {
100	peers: Gauge<U64>,
101	import_queue_blocks_submitted: Counter<U64>,
102	import_queue_justifications_submitted: Counter<U64>,
103}
104
105impl Metrics {
106	fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
107		MajorSyncingGauge::register(r, major_syncing)?;
108		Ok(Self {
109			peers: {
110				let g = Gauge::new("substrate_sync_peers", "Number of peers we sync with")?;
111				register(g, r)?
112			},
113			import_queue_blocks_submitted: {
114				let c = Counter::new(
115					"substrate_sync_import_queue_blocks_submitted",
116					"Number of blocks submitted to the import queue.",
117				)?;
118				register(c, r)?
119			},
120			import_queue_justifications_submitted: {
121				let c = Counter::new(
122					"substrate_sync_import_queue_justifications_submitted",
123					"Number of justifications submitted to the import queue.",
124				)?;
125				register(c, r)?
126			},
127		})
128	}
129}
130
131/// The "major syncing" metric.
132#[derive(Clone)]
133pub struct MajorSyncingGauge(Arc<AtomicBool>);
134
135impl MajorSyncingGauge {
136	/// Registers the [`MajorSyncGauge`] metric whose value is
137	/// obtained from the given `AtomicBool`.
138	fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
139		soil_prometheus::register(
140			SourcedGauge::new(
141				&Opts::new(
142					"substrate_sub_libp2p_is_major_syncing",
143					"Whether the node is performing a major sync or not.",
144				),
145				MajorSyncingGauge(value),
146			)?,
147			registry,
148		)?;
149
150		Ok(())
151	}
152}
153
154impl MetricSource for MajorSyncingGauge {
155	type N = u64;
156
157	fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
158		set(&[], self.0.load(Ordering::Relaxed) as u64);
159	}
160}
161
/// Peer information
///
/// Per-peer state that `SyncingEngine` keeps for every connected peer.
#[derive(Debug)]
pub struct Peer<B: BlockT> {
	/// Roles, best block hash and best block number of the peer. Initialized from the peer's
	/// handshake and refreshed when a validated block announcement yields a new best block.
	pub info: ExtendedPeerInfo<B>,
	/// Holds a set of blocks known to this peer.
	pub known_blocks: LruHashSet<B::Hash>,
	/// Is the peer inbound.
	inbound: bool,
}
171
/// Actor that owns the set of connected sync peers, forwards network events to the
/// [`SyncingStrategy`] and executes the actions the strategy requests.
pub struct SyncingEngine<B: BlockT, Client> {
	/// Syncing strategy.
	strategy: Box<dyn SyncingStrategy<B>>,

	/// Blockchain client.
	client: Arc<Client>,

	/// Number of peers we're connected to. Shared with the `SyncingService` handle.
	num_connected: Arc<AtomicUsize>,

	/// Are we actively catching up with the chain? Refreshed from the strategy after every
	/// iteration of the event loop and shared with the `SyncingService` handle and metrics.
	is_major_syncing: Arc<AtomicBool>,

	/// Network service.
	network_service: service::network::NetworkServiceHandle,

	/// Channel for receiving service commands
	service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,

	/// Assigned roles.
	roles: Roles,

	/// Genesis hash.
	genesis_hash: B::Hash,

	/// Set of channels for other protocols that have subscribed to syncing events.
	/// A channel is dropped from the set once sending a disconnect event to it fails.
	event_streams: Vec<TracingUnboundedSender<SyncEvent>>,

	/// Interval (`TICK_TIMEOUT`) that periodically wakes up the event loop in `run`.
	tick_timeout: Interval,

	/// All connected peers. Contains both full and light node peers.
	peers: HashMap<PeerId, Peer<B>>,

	/// List of nodes for which we perform additional logging because they are important for the
	/// user.
	important_peers: HashSet<PeerId>,

	/// Actual list of connected no-slot nodes.
	default_peers_set_no_slot_connected_peers: HashSet<PeerId>,

	/// List of nodes that should never occupy peer slots.
	default_peers_set_no_slot_peers: HashSet<PeerId>,

	/// Value that was passed as part of the configuration. Used to cap the number of full
	/// nodes.
	default_peers_set_num_full: usize,

	/// Number of slots to allocate to light nodes.
	default_peers_set_num_light: usize,

	/// Maximum number of inbound peers. Enforced for full nodes that are not no-slot peers.
	max_in_peers: usize,

	/// Number of inbound peers accepted so far. Decremented when an inbound full peer that
	/// was not a connected no-slot peer disconnects.
	num_in_peers: usize,

	/// Async processor of block announce validations.
	block_announce_validator: BlockAnnounceValidatorStream<B>,

	/// A cache for the data that was associated to a block announcement. Filled with the
	/// non-empty data of validated announcements and consulted when re-announcing a block.
	block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,

	/// The `PeerId`'s of all boot nodes.
	boot_node_ids: HashSet<PeerId>,

	/// Protocol name used for block announcements
	block_announce_protocol_name: ProtocolName,

	/// Prometheus metrics. `None` when no registry was given or registration failed.
	metrics: Option<Metrics>,

	/// Handle that is used to communicate with `soil_network::Notifications`.
	notification_service: Box<dyn NotificationService>,

	/// Handle to `PeerStore`.
	peer_store_handle: Arc<dyn PeerStoreProvider>,

	/// Pending responses
	pending_responses: PendingResponses,

	/// Handle to import queue.
	import_queue: Box<dyn ImportQueueService<B>>,
}
256
257impl<B: BlockT, Client> SyncingEngine<B, Client>
258where
259	B: BlockT,
260	Client: HeaderBackend<B>
261		+ BlockBackend<B>
262		+ HeaderMetadata<B, Error = soil_client::blockchain::Error>
263		+ ProofProvider<B>
264		+ Send
265		+ Sync
266		+ 'static,
267{
268	pub fn new<N>(
269		roles: Roles,
270		client: Arc<Client>,
271		metrics_registry: Option<&Registry>,
272		network_metrics: NotificationMetrics,
273		net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
274		protocol_id: ProtocolId,
275		fork_id: Option<&str>,
276		block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
277		syncing_strategy: Box<dyn SyncingStrategy<B>>,
278		network_service: service::network::NetworkServiceHandle,
279		import_queue: Box<dyn ImportQueueService<B>>,
280		peer_store_handle: Arc<dyn PeerStoreProvider>,
281	) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
282	where
283		N: NetworkBackend<B, <B as BlockT>::Hash>,
284	{
285		let cache_capacity = (net_config.network_config.default_peers_set.in_peers
286			+ net_config.network_config.default_peers_set.out_peers)
287			.max(1);
288		let important_peers = {
289			let mut imp_p = HashSet::new();
290			for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
291				imp_p.insert(reserved.peer_id);
292			}
293			for config in net_config.notification_protocols() {
294				let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
295				imp_p.extend(peer_ids);
296			}
297
298			imp_p.shrink_to_fit();
299			imp_p
300		};
301		let boot_node_ids = {
302			let mut list = HashSet::new();
303			for node in &net_config.network_config.boot_nodes {
304				list.insert(node.peer_id);
305			}
306			list.shrink_to_fit();
307			list
308		};
309		let default_peers_set_no_slot_peers = {
310			let mut no_slot_p: HashSet<PeerId> = net_config
311				.network_config
312				.default_peers_set
313				.reserved_nodes
314				.iter()
315				.map(|reserved| reserved.peer_id)
316				.collect();
317			no_slot_p.shrink_to_fit();
318			no_slot_p
319		};
320		let default_peers_set_num_full =
321			net_config.network_config.default_peers_set_num_full as usize;
322		let default_peers_set_num_light = {
323			let total = net_config.network_config.default_peers_set.out_peers
324				+ net_config.network_config.default_peers_set.in_peers;
325			total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
326		};
327
328		let info = client.info();
329
330		let (block_announce_config, notification_service) =
331			Self::get_block_announce_proto_config::<N>(
332				protocol_id,
333				fork_id,
334				roles,
335				info.best_number,
336				info.best_hash,
337				info.genesis_hash,
338				&net_config.network_config.default_peers_set,
339				network_metrics,
340				Arc::clone(&peer_store_handle),
341			);
342
343		let block_announce_protocol_name = block_announce_config.protocol_name().clone();
344		let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
345		let num_connected = Arc::new(AtomicUsize::new(0));
346		let is_major_syncing = Arc::new(AtomicBool::new(false));
347
348		// `default_peers_set.in_peers` contains an unspecified amount of light peers so the number
349		// of full inbound peers must be calculated from the total full peer count
350		let max_full_peers = net_config.network_config.default_peers_set_num_full;
351		let max_out_peers = net_config.network_config.default_peers_set.out_peers;
352		let max_in_peers = (max_full_peers - max_out_peers) as usize;
353
354		let tick_timeout = {
355			let mut interval = tokio::time::interval(TICK_TIMEOUT);
356			interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
357			interval
358		};
359
360		Ok((
361			Self {
362				roles,
363				client,
364				strategy: syncing_strategy,
365				network_service,
366				peers: HashMap::new(),
367				block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
368				block_announce_protocol_name,
369				block_announce_validator: BlockAnnounceValidatorStream::new(
370					block_announce_validator,
371				),
372				num_connected: num_connected.clone(),
373				is_major_syncing: is_major_syncing.clone(),
374				service_rx,
375				genesis_hash: info.genesis_hash,
376				important_peers,
377				default_peers_set_no_slot_connected_peers: HashSet::new(),
378				boot_node_ids,
379				default_peers_set_no_slot_peers,
380				default_peers_set_num_full,
381				default_peers_set_num_light,
382				num_in_peers: 0usize,
383				max_in_peers,
384				event_streams: Vec::new(),
385				notification_service,
386				tick_timeout,
387				peer_store_handle,
388				metrics: if let Some(r) = metrics_registry {
389					match Metrics::register(r, is_major_syncing.clone()) {
390						Ok(metrics) => Some(metrics),
391						Err(err) => {
392							log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
393							None
394						},
395					}
396				} else {
397					None
398				},
399				pending_responses: PendingResponses::new(),
400				import_queue,
401			},
402			SyncingService::new(tx, num_connected, is_major_syncing),
403			block_announce_config,
404		))
405	}
406
407	fn update_peer_info(
408		&mut self,
409		peer_id: &PeerId,
410		best_hash: B::Hash,
411		best_number: NumberFor<B>,
412	) {
413		if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
414			peer.info.best_hash = best_hash;
415			peer.info.best_number = best_number;
416		}
417	}
418
419	/// Process the result of the block announce validation.
420	fn process_block_announce_validation_result(
421		&mut self,
422		validation_result: BlockAnnounceValidationResult<B::Header>,
423	) {
424		match validation_result {
425			BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
426			BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
427				if let Some((best_hash, best_number)) =
428					self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
429				{
430					self.update_peer_info(&peer_id, best_hash, best_number);
431				}
432
433				if let Some(data) = announce.data {
434					if !data.is_empty() {
435						self.block_announce_data_cache.insert(announce.header.hash(), data);
436					}
437				}
438			},
439			BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
440				if disconnect {
441					log::debug!(
442						target: LOG_TARGET,
443						"Disconnecting peer {peer_id} due to block announce validation failure",
444					);
445					self.network_service
446						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
447				}
448
449				self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
450			},
451		}
452	}
453
454	/// Push a block announce validation.
455	pub fn push_block_announce_validation(
456		&mut self,
457		peer_id: PeerId,
458		announce: BlockAnnounce<B::Header>,
459	) {
460		let hash = announce.header.hash();
461
462		let peer = match self.peers.get_mut(&peer_id) {
463			Some(p) => p,
464			None => {
465				log::error!(
466					target: LOG_TARGET,
467					"Received block announce from disconnected peer {peer_id}",
468				);
469				debug_assert!(false);
470				return;
471			},
472		};
473		peer.known_blocks.insert(hash);
474
475		if peer.info.roles.is_full() {
476			let is_best = match announce.state.unwrap_or(BlockState::Best) {
477				BlockState::Best => true,
478				BlockState::Normal => false,
479			};
480
481			self.block_announce_validator
482				.push_block_announce_validation(peer_id, hash, announce, is_best);
483		}
484	}
485
486	/// Make sure an important block is propagated to peers.
487	///
488	/// In chain-based consensus, we often need to make sure non-best forks are
489	/// at least temporarily synced.
490	pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
491		let header = match self.client.header(hash) {
492			Ok(Some(header)) => header,
493			Ok(None) => {
494				log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
495				return;
496			},
497			Err(e) => {
498				log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
499				return;
500			},
501		};
502
503		// don't announce genesis block since it will be ignored
504		if header.number().is_zero() {
505			return;
506		}
507
508		let is_best = self.client.info().best_hash == hash;
509		log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
510
511		let data = data
512			.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
513			.unwrap_or_default();
514
515		for (peer_id, ref mut peer) in self.peers.iter_mut() {
516			let inserted = peer.known_blocks.insert(hash);
517			if inserted {
518				log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
519				let message = BlockAnnounce {
520					header: header.clone(),
521					state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
522					data: Some(data.clone()),
523				};
524
525				let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
526			}
527		}
528	}
529
530	pub async fn run(mut self) {
531		loop {
532			tokio::select! {
533				_ = self.tick_timeout.tick() => {
534					// TODO: This tick should not be necessary, but
535					//  `self.process_strategy_actions()` is not called in some cases otherwise and
536					//  some tests fail because of this
537				},
538				command = self.service_rx.select_next_some() =>
539					self.process_service_command(command),
540				notification_event = self.notification_service.next_event() => match notification_event {
541					Some(event) => self.process_notification_event(event),
542					None => {
543						error!(
544							target: LOG_TARGET,
545							"Terminating `SyncingEngine` because `NotificationService` has terminated.",
546						);
547
548						return;
549					}
550				},
551				response_event = self.pending_responses.select_next_some() =>
552					self.process_response_event(response_event),
553				validation_result = self.block_announce_validator.select_next_some() =>
554					self.process_block_announce_validation_result(validation_result),
555			}
556
557			// Update atomic variables
558			self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);
559
560			// Process actions requested by a syncing strategy.
561			if let Err(e) = self.process_strategy_actions() {
562				error!(
563					target: LOG_TARGET,
564					"Terminating `SyncingEngine` due to fatal error: {e:?}.",
565				);
566				return;
567			}
568		}
569	}
570
571	fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
572		for action in self.strategy.actions(&self.network_service)? {
573			match action {
574				SyncingAction::StartRequest { peer_id, key, request, remove_obsolete } => {
575					if !self.peers.contains_key(&peer_id) {
576						trace!(
577							target: LOG_TARGET,
578							"Cannot start request with strategy key {key:?} to unknown peer \
579							{peer_id}",
580						);
581						debug_assert!(false);
582						continue;
583					}
584					if remove_obsolete {
585						if self.pending_responses.remove(peer_id, key) {
586							warn!(
587								target: LOG_TARGET,
588								"Processed `SyncingAction::StartRequest` to {peer_id} with \
589								strategy key {key:?}. Stale response removed!",
590							)
591						} else {
592							trace!(
593								target: LOG_TARGET,
594								"Processed `SyncingAction::StartRequest` to {peer_id} with \
595								strategy key {key:?}.",
596							)
597						}
598					}
599
600					self.pending_responses.insert(peer_id, key, request);
601				},
602				SyncingAction::CancelRequest { peer_id, key } => {
603					let removed = self.pending_responses.remove(peer_id, key);
604
605					trace!(
606						target: LOG_TARGET,
607						"Processed `SyncingAction::CancelRequest`, response removed: {removed}.",
608					);
609				},
610				SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
611					self.pending_responses.remove_all(&peer_id);
612					self.network_service
613						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
614					self.network_service.report_peer(peer_id, rep);
615
616					trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
617				},
618				SyncingAction::ImportBlocks { origin, blocks } => {
619					let count = blocks.len();
620					self.import_blocks(origin, blocks);
621
622					trace!(
623						target: LOG_TARGET,
624						"Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
625					);
626				},
627				SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
628					self.import_justifications(peer_id, hash, number, justifications);
629
630					trace!(
631						target: LOG_TARGET,
632						"Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
633						peer_id,
634						hash,
635						number,
636					)
637				},
638				// Nothing to do, this is handled internally by `PolkadotSyncingStrategy`.
639				SyncingAction::Finished => {},
640			}
641		}
642
643		Ok(())
644	}
645
646	fn process_service_command(&mut self, command: ToServiceCommand<B>) {
647		match command {
648			ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
649				self.strategy.set_sync_fork_request(peers, &hash, number);
650			},
651			ToServiceCommand::EventStream(tx) => {
652				// Let a new subscriber know about already connected peers.
653				for peer_id in self.peers.keys() {
654					let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
655				}
656				self.event_streams.push(tx);
657			},
658			ToServiceCommand::RequestJustification(hash, number) => {
659				self.strategy.request_justification(&hash, number)
660			},
661			ToServiceCommand::ClearJustificationRequests => {
662				self.strategy.clear_justification_requests()
663			},
664			ToServiceCommand::BlocksProcessed(imported, count, results) => {
665				self.strategy.on_blocks_processed(imported, count, results);
666			},
667			ToServiceCommand::JustificationImported(peer_id, hash, number, import_result) => {
668				let success = matches!(
669					import_result,
670					soil_client::import::JustificationImportResult::Success
671				);
672				self.strategy.on_justification_import(hash, number, success);
673
674				match import_result {
675					soil_client::import::JustificationImportResult::OutdatedJustification => {
676						log::info!(
677							target: LOG_TARGET,
678							"💔 Outdated justification provided by {peer_id} for #{hash}",
679						);
680					},
681					soil_client::import::JustificationImportResult::Failure => {
682						log::info!(
683							target: LOG_TARGET,
684							"💔 Invalid justification provided by {peer_id} for #{hash}",
685						);
686						self.network_service
687							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
688						self.network_service.report_peer(
689							peer_id,
690							ReputationChange::new_fatal("Invalid justification"),
691						);
692					},
693					soil_client::import::JustificationImportResult::Success => {
694						log::debug!(
695							target: LOG_TARGET,
696							"Justification for block #{hash} ({number}) imported from {peer_id} successfully",
697						);
698					},
699				}
700			},
701			ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
702			ToServiceCommand::NewBestBlockImported(hash, number) => {
703				log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
704
705				self.strategy.update_chain_info(&hash, number);
706				let _ = self.notification_service.try_set_handshake(
707					BlockAnnouncesHandshake::<B>::build(
708						self.roles,
709						number,
710						hash,
711						self.genesis_hash,
712					)
713					.encode(),
714				);
715			},
716			ToServiceCommand::Status(tx) => {
717				let _ = tx.send(self.strategy.status());
718			},
719			ToServiceCommand::NumActivePeers(tx) => {
720				let _ = tx.send(self.num_active_peers());
721			},
722			ToServiceCommand::NumDownloadedBlocks(tx) => {
723				let _ = tx.send(self.strategy.num_downloaded_blocks());
724			},
725			ToServiceCommand::NumSyncRequests(tx) => {
726				let _ = tx.send(self.strategy.num_sync_requests());
727			},
728			ToServiceCommand::PeersInfo(tx) => {
729				let peers_info =
730					self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
731				let _ = tx.send(peers_info);
732			},
733			ToServiceCommand::OnBlockFinalized(hash, header) => {
734				self.strategy.on_block_finalized(&hash, *header.number())
735			},
736		}
737	}
738
739	fn process_notification_event(&mut self, event: NotificationEvent) {
740		match event {
741			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
742				let validation_result = self
743					.validate_connection(&peer, handshake, Direction::Inbound)
744					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
745
746				let _ = result_tx.send(validation_result);
747			},
748			NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
749				log::debug!(
750					target: LOG_TARGET,
751					"Substream opened for {peer}, handshake {handshake:?}"
752				);
753
754				match self.validate_connection(&peer, handshake, direction) {
755					Ok(handshake) => {
756						if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
757							log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
758							self.network_service
759								.disconnect_peer(peer, self.block_announce_protocol_name.clone());
760						}
761					},
762					Err(wrong_genesis) => {
763						log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
764
765						if wrong_genesis {
766							self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
767						}
768
769						self.network_service
770							.disconnect_peer(peer, self.block_announce_protocol_name.clone());
771					},
772				}
773			},
774			NotificationEvent::NotificationStreamClosed { peer } => {
775				self.on_sync_peer_disconnected(peer);
776			},
777			NotificationEvent::NotificationReceived { peer, notification } => {
778				if !self.peers.contains_key(&peer) {
779					log::error!(
780						target: LOG_TARGET,
781						"received notification from {peer} who had been earlier refused by `SyncingEngine`",
782					);
783					return;
784				}
785
786				let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
787					log::warn!(target: LOG_TARGET, "failed to decode block announce");
788					return;
789				};
790
791				self.push_block_announce_validation(peer, announce);
792			},
793		}
794	}
795
796	/// Called by peer when it is disconnecting.
797	///
798	/// Returns a result if the handshake of this peer was indeed accepted.
799	fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
800		let Some(info) = self.peers.remove(&peer_id) else {
801			log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
802			return;
803		};
804		if let Some(metrics) = &self.metrics {
805			metrics.peers.dec();
806		}
807		self.num_connected.fetch_sub(1, Ordering::AcqRel);
808
809		if self.important_peers.contains(&peer_id) {
810			log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
811		} else {
812			log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
813		}
814
815		if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id)
816			&& info.inbound
817			&& info.info.roles.is_full()
818		{
819			match self.num_in_peers.checked_sub(1) {
820				Some(value) => {
821					self.num_in_peers = value;
822				},
823				None => {
824					log::error!(
825						target: LOG_TARGET,
826						"trying to disconnect an inbound node which is not counted as inbound"
827					);
828					debug_assert!(false);
829				},
830			}
831		}
832
833		self.strategy.remove_peer(&peer_id);
834		self.pending_responses.remove_all(&peer_id);
835		self.event_streams
836			.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
837	}
838
839	/// Validate received handshake.
840	fn validate_handshake(
841		&mut self,
842		peer_id: &PeerId,
843		handshake: Vec<u8>,
844	) -> Result<BlockAnnouncesHandshake<B>, bool> {
845		log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
846
847		let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
848			.map_err(|error| {
849				log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
850				false
851			})?;
852
853		if handshake.genesis_hash != self.genesis_hash {
854			if self.important_peers.contains(&peer_id) {
855				log::error!(
856					target: LOG_TARGET,
857					"Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
858					self.genesis_hash,
859					handshake.genesis_hash,
860				);
861			} else if self.boot_node_ids.contains(&peer_id) {
862				log::error!(
863					target: LOG_TARGET,
864					"Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
865					self.genesis_hash,
866					handshake.genesis_hash,
867				);
868			} else {
869				log::debug!(
870					target: LOG_TARGET,
871					"Peer is on different chain (our genesis: {} theirs: {})",
872					self.genesis_hash,
873					handshake.genesis_hash
874				);
875			}
876
877			return Err(true);
878		}
879
880		Ok(handshake)
881	}
882
883	/// Validate connection.
884	// NOTE Returning `Err(bool)` is a really ugly hack to work around the issue
885	// that `ProtocolController` thinks the peer is connected when in fact it can
886	// still be under validation. If the peer has different genesis than the
887	// local node the validation fails but the peer cannot be reported in
888	// `validate_connection()` as that is also called by
889	// `ValidateInboundSubstream` which means that the peer is still being
890	// validated and banning the peer when handling that event would
891	// result in peer getting dropped twice.
892	//
893	// The proper way to fix this is to integrate `ProtocolController` more
894	// tightly with `NotificationService` or add an additional API call for
895	// banning pre-accepted peers (which is not desirable)
896	fn validate_connection(
897		&mut self,
898		peer_id: &PeerId,
899		handshake: Vec<u8>,
900		direction: Direction,
901	) -> Result<BlockAnnouncesHandshake<B>, bool> {
902		log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
903
904		let handshake = self.validate_handshake(peer_id, handshake)?;
905
906		if self.peers.contains_key(&peer_id) {
907			log::error!(
908				target: LOG_TARGET,
909				"Called `validate_connection()` with already connected peer {peer_id}",
910			);
911			debug_assert!(false);
912			return Err(false);
913		}
914
915		let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
916		let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
917
918		if handshake.roles.is_full()
919			&& self.strategy.num_peers()
920				>= self.default_peers_set_num_full
921					+ self.default_peers_set_no_slot_connected_peers.len()
922					+ this_peer_reserved_slot
923		{
924			log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
925			return Err(false);
926		}
927
928		// make sure to accept no more than `--in-peers` many full nodes
929		if !no_slot_peer
930			&& handshake.roles.is_full()
931			&& direction.is_inbound()
932			&& self.num_in_peers == self.max_in_peers
933		{
934			log::debug!(target: LOG_TARGET, "All inbound slots have been consumed, rejecting {peer_id}");
935			return Err(false);
936		}
937
938		// make sure that all slots are not occupied by light peers
939		//
940		// `ChainSync` only accepts full peers whereas `SyncingEngine` accepts both full and light
941		// peers. Verify that there is a slot in `SyncingEngine` for the inbound light peer
942		if handshake.roles.is_light()
943			&& (self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
944		{
945			log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
946			return Err(false);
947		}
948
949		Ok(handshake)
950	}
951
952	/// Called on the first connection between two peers on the default set, after their exchange
953	/// of handshake.
954	///
955	/// Returns `Ok` if the handshake is accepted and the peer added to the list of peers we sync
956	/// from.
957	fn on_sync_peer_connected(
958		&mut self,
959		peer_id: PeerId,
960		status: &BlockAnnouncesHandshake<B>,
961		direction: Direction,
962	) -> Result<(), ()> {
963		log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
964
965		let peer = Peer {
966			info: ExtendedPeerInfo {
967				roles: status.roles,
968				best_hash: status.best_hash,
969				best_number: status.best_number,
970			},
971			known_blocks: LruHashSet::new(
972				NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
973			),
974			inbound: direction.is_inbound(),
975		};
976
977		// Only forward full peers to syncing strategy.
978		if status.roles.is_full() {
979			self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
980		}
981
982		log::debug!(target: LOG_TARGET, "Connected {peer_id}");
983
984		if self.peers.insert(peer_id, peer).is_none() {
985			if let Some(metrics) = &self.metrics {
986				metrics.peers.inc();
987			}
988			self.num_connected.fetch_add(1, Ordering::AcqRel);
989		}
990		self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
991
992		if self.default_peers_set_no_slot_peers.contains(&peer_id) {
993			self.default_peers_set_no_slot_connected_peers.insert(peer_id);
994		} else if direction.is_inbound() && status.roles.is_full() {
995			self.num_in_peers += 1;
996		}
997
998		self.event_streams
999			.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
1000
1001		Ok(())
1002	}
1003
1004	fn process_response_event(&mut self, response_event: ResponseEvent) {
1005		let ResponseEvent { peer_id, key, response: response_result } = response_event;
1006
1007		match response_result {
1008			Ok(Ok((response, protocol_name))) => {
1009				self.strategy.on_generic_response(&peer_id, key, protocol_name, response);
1010			},
1011			Ok(Err(e)) => {
1012				debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
1013
1014				match e {
1015					RequestFailure::Network(OutboundFailure::Timeout) => {
1016						self.network_service.report_peer(peer_id, rep::TIMEOUT);
1017						self.network_service
1018							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1019					},
1020					RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1021						self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
1022						self.network_service
1023							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1024					},
1025					RequestFailure::Network(OutboundFailure::DialFailure) => {
1026						self.network_service
1027							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1028					},
1029					RequestFailure::Refused => {
1030						self.network_service.report_peer(peer_id, rep::REFUSED);
1031						self.network_service
1032							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1033					},
1034					RequestFailure::Network(OutboundFailure::ConnectionClosed)
1035					| RequestFailure::NotConnected => {
1036						self.network_service
1037							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1038					},
1039					RequestFailure::UnknownProtocol => {
1040						debug_assert!(false, "Block request protocol should always be known.");
1041					},
1042					RequestFailure::Obsolete => {
1043						debug_assert!(
1044							false,
1045							"Can not receive `RequestFailure::Obsolete` after dropping the \
1046							response receiver.",
1047						);
1048					},
1049					RequestFailure::Network(OutboundFailure::Io(_)) => {
1050						self.network_service.report_peer(peer_id, rep::IO);
1051						self.network_service
1052							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1053					},
1054				}
1055			},
1056			Err(oneshot::Canceled) => {
1057				trace!(
1058					target: LOG_TARGET,
1059					"Request to peer {peer_id:?} failed due to oneshot being canceled.",
1060				);
1061				self.network_service
1062					.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1063			},
1064		}
1065	}
1066
1067	/// Returns the number of peers we're connected to and that are being queried.
1068	fn num_active_peers(&self) -> usize {
1069		self.pending_responses.len()
1070	}
1071
1072	/// Get config for the block announcement protocol
1073	fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1074		protocol_id: ProtocolId,
1075		fork_id: Option<&str>,
1076		roles: Roles,
1077		best_number: NumberFor<B>,
1078		best_hash: B::Hash,
1079		genesis_hash: B::Hash,
1080		set_config: &SetConfig,
1081		metrics: NotificationMetrics,
1082		peer_store_handle: Arc<dyn PeerStoreProvider>,
1083	) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1084		let block_announces_protocol = {
1085			let genesis_hash = genesis_hash.as_ref();
1086			if let Some(fork_id) = fork_id {
1087				format!(
1088					"/{}/{}/block-announces/1",
1089					array_bytes::bytes2hex("", genesis_hash),
1090					fork_id
1091				)
1092			} else {
1093				format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
1094			}
1095		};
1096
1097		N::notification_config(
1098			block_announces_protocol.into(),
1099			iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1100			MAX_BLOCK_ANNOUNCE_SIZE,
1101			Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1102				roles,
1103				best_number,
1104				best_hash,
1105				genesis_hash,
1106			))),
1107			set_config.clone(),
1108			metrics,
1109			peer_store_handle,
1110		)
1111	}
1112
1113	/// Import blocks.
1114	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1115		if let Some(metrics) = &self.metrics {
1116			metrics.import_queue_blocks_submitted.inc();
1117		}
1118
1119		self.import_queue.import_blocks(origin, blocks);
1120	}
1121
1122	/// Import justifications.
1123	fn import_justifications(
1124		&mut self,
1125		peer_id: PeerId,
1126		hash: B::Hash,
1127		number: NumberFor<B>,
1128		justifications: Justifications,
1129	) {
1130		if let Some(metrics) = &self.metrics {
1131			metrics.import_queue_justifications_submitted.inc();
1132		}
1133
1134		self.import_queue
1135			.import_justifications(peer_id.into(), hash, number, justifications);
1136	}
1137}