pezsc_network_sync/
engine.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
//! `SyncingEngine` is the actor responsible for syncing the Bizinikiwi chain
//! to the tip and keeping the blockchain up to date with network updates.
21
22use crate::{
23	block_announce_validator::{
24		BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
25	},
26	pending_responses::{PendingResponses, ResponseEvent},
27	service::{
28		self,
29		syncing_service::{SyncingService, ToServiceCommand},
30	},
31	strategy::{SyncingAction, SyncingStrategy},
32	types::{BadPeer, ExtendedPeerInfo, SyncEvent},
33	LOG_TARGET,
34};
35
36use codec::{Decode, DecodeAll, Encode};
37use futures::{channel::oneshot, StreamExt};
38use log::{debug, error, trace, warn};
39use prometheus_endpoint::{
40	register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
41};
42use schnellru::{ByLength, LruMap};
43use tokio::time::{Interval, MissedTickBehavior};
44
45use pezsc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
46use pezsc_consensus::{import_queue::ImportQueueService, IncomingBlock};
47use pezsc_network::{
48	config::{FullNetworkConfiguration, NotificationHandshake, ProtocolId, SetConfig},
49	peer_store::PeerStoreProvider,
50	request_responses::{OutboundFailure, RequestFailure},
51	service::{
52		traits::{Direction, NotificationConfig, NotificationEvent, ValidationResult},
53		NotificationMetrics,
54	},
55	types::ProtocolName,
56	utils::LruHashSet,
57	NetworkBackend, NotificationService, ReputationChange,
58};
59use pezsc_network_common::{
60	role::Roles,
61	sync::message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
62};
63use pezsc_network_types::PeerId;
64use pezsc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
65use pezsp_blockchain::{Error as ClientError, HeaderMetadata};
66use pezsp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
67use pezsp_runtime::{
68	traits::{Block as BlockT, Header, NumberFor, Zero},
69	Justifications,
70};
71
72use std::{
73	collections::{HashMap, HashSet},
74	iter,
75	num::NonZeroUsize,
76	sync::{
77		atomic::{AtomicBool, AtomicUsize, Ordering},
78		Arc,
79	},
80};
81
/// How often the engine wakes up to perform time-based maintenance.
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1_100);

/// Upper bound on the number of block hashes remembered as known for a single peer.
///
/// Roughly 32kb per peer, plus the `LruHashSet` overhead.
const MAX_KNOWN_BLOCKS: usize = 1 << 10;

/// Largest block announce that is allowed (1024 * 1024).
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1 << 20;
90
91mod rep {
92	use pezsc_network::ReputationChange as Rep;
93	/// Peer has different genesis.
94	pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
95	/// Peer send us a block announcement that failed at validation.
96	pub const BAD_BLOCK_ANNOUNCEMENT: Rep = Rep::new(-(1 << 12), "Bad block announcement");
97	/// Peer is on unsupported protocol version.
98	pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
99	/// Reputation change when a peer refuses a request.
100	pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
101	/// Reputation change when a peer doesn't respond in time to our messages.
102	pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
103	/// Reputation change when a peer connection failed with IO error.
104	pub const IO: Rep = Rep::new(-(1 << 10), "IO error during request");
105}
106
/// Prometheus metrics maintained by the syncing engine.
struct Metrics {
	/// Gauge `bizinikiwi_sync_peers`: number of peers we sync with.
	peers: Gauge<U64>,
	/// Counter `bizinikiwi_sync_import_queue_blocks_submitted`: number of blocks submitted to the
	/// import queue.
	import_queue_blocks_submitted: Counter<U64>,
	/// Counter `bizinikiwi_sync_import_queue_justifications_submitted`: number of justifications
	/// submitted to the import queue.
	import_queue_justifications_submitted: Counter<U64>,
}
112
113impl Metrics {
114	fn register(r: &Registry, major_syncing: Arc<AtomicBool>) -> Result<Self, PrometheusError> {
115		MajorSyncingGauge::register(r, major_syncing)?;
116		Ok(Self {
117			peers: {
118				let g = Gauge::new("bizinikiwi_sync_peers", "Number of peers we sync with")?;
119				register(g, r)?
120			},
121			import_queue_blocks_submitted: {
122				let c = Counter::new(
123					"bizinikiwi_sync_import_queue_blocks_submitted",
124					"Number of blocks submitted to the import queue.",
125				)?;
126				register(c, r)?
127			},
128			import_queue_justifications_submitted: {
129				let c = Counter::new(
130					"bizinikiwi_sync_import_queue_justifications_submitted",
131					"Number of justifications submitted to the import queue.",
132				)?;
133				register(c, r)?
134			},
135		})
136	}
137}
138
/// The "major syncing" metric.
///
/// Wraps the shared `AtomicBool` flag whose current value is reported whenever the metric is
/// collected.
#[derive(Clone)]
pub struct MajorSyncingGauge(Arc<AtomicBool>);
142
143impl MajorSyncingGauge {
144	/// Registers the [`MajorSyncGauge`] metric whose value is
145	/// obtained from the given `AtomicBool`.
146	fn register(registry: &Registry, value: Arc<AtomicBool>) -> Result<(), PrometheusError> {
147		prometheus_endpoint::register(
148			SourcedGauge::new(
149				&Opts::new(
150					"bizinikiwi_sub_libp2p_is_major_syncing",
151					"Whether the node is performing a major sync or not.",
152				),
153				MajorSyncingGauge(value),
154			)?,
155			registry,
156		)?;
157
158		Ok(())
159	}
160}
161
162impl MetricSource for MajorSyncingGauge {
163	type N = u64;
164
165	fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
166		set(&[], self.0.load(Ordering::Relaxed) as u64);
167	}
168}
169
/// Peer information
#[derive(Debug)]
pub struct Peer<B: BlockT> {
	/// Roles, best hash and best number tracked for this peer.
	pub info: ExtendedPeerInfo<B>,
	/// Holds a set of blocks known to this peer.
	pub known_blocks: LruHashSet<B::Hash>,
	/// Is the peer inbound.
	inbound: bool,
}
179
/// The actor responsible for syncing the chain and keeping it up to date with network updates.
///
/// It owns the set of connected sync peers, forwards events to the syncing strategy and executes
/// the actions the strategy requests.
pub struct SyncingEngine<B: BlockT, Client> {
	/// Syncing strategy.
	strategy: Box<dyn SyncingStrategy<B>>,

	/// Blockchain client.
	client: Arc<Client>,

	/// Number of peers we're connected to.
	num_connected: Arc<AtomicUsize>,

	/// Are we actively catching up with the chain?
	is_major_syncing: Arc<AtomicBool>,

	/// Network service.
	network_service: service::network::NetworkServiceHandle,

	/// Channel for receiving service commands
	service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,

	/// Assigned roles.
	roles: Roles,

	/// Genesis hash.
	genesis_hash: B::Hash,

	/// Set of channels for other protocols that have subscribed to syncing events.
	event_streams: Vec<TracingUnboundedSender<SyncEvent>>,

	/// Interval at which we call `tick`.
	tick_timeout: Interval,

	/// All connected peers. Contains both full and light node peers.
	peers: HashMap<PeerId, Peer<B>>,

	/// List of nodes for which we perform additional logging because they are important for the
	/// user.
	important_peers: HashSet<PeerId>,

	/// Actual list of connected no-slot nodes.
	default_peers_set_no_slot_connected_peers: HashSet<PeerId>,

	/// List of nodes that should never occupy peer slots.
	default_peers_set_no_slot_peers: HashSet<PeerId>,

	/// Value that was passed as part of the configuration. Used to cap the number of full
	/// nodes.
	default_peers_set_num_full: usize,

	/// Number of slots to allocate to light nodes.
	default_peers_set_num_light: usize,

	/// Maximum number of inbound peers.
	max_in_peers: usize,

	/// Number of inbound peers accepted so far.
	num_in_peers: usize,

	/// Async processor of block announce validations.
	block_announce_validator: BlockAnnounceValidatorStream<B>,

	/// A cache for the data that was associated to a block announcement.
	block_announce_data_cache: LruMap<B::Hash, Vec<u8>>,

	/// The `PeerId`'s of all boot nodes.
	boot_node_ids: HashSet<PeerId>,

	/// Protocol name used for block announcements
	block_announce_protocol_name: ProtocolName,

	/// Prometheus metrics.
	metrics: Option<Metrics>,

	/// Handle that is used to communicate with `pezsc_network::Notifications`.
	notification_service: Box<dyn NotificationService>,

	/// Handle to `PeerStore`.
	peer_store_handle: Arc<dyn PeerStoreProvider>,

	/// Pending responses
	pending_responses: PendingResponses,

	/// Handle to import queue.
	import_queue: Box<dyn ImportQueueService<B>>,
}
264
265impl<B: BlockT, Client> SyncingEngine<B, Client>
266where
267	B: BlockT,
268	Client: HeaderBackend<B>
269		+ BlockBackend<B>
270		+ HeaderMetadata<B, Error = pezsp_blockchain::Error>
271		+ ProofProvider<B>
272		+ Send
273		+ Sync
274		+ 'static,
275{
276	pub fn new<N>(
277		roles: Roles,
278		client: Arc<Client>,
279		metrics_registry: Option<&Registry>,
280		network_metrics: NotificationMetrics,
281		net_config: &FullNetworkConfiguration<B, <B as BlockT>::Hash, N>,
282		protocol_id: ProtocolId,
283		fork_id: Option<&str>,
284		block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
285		syncing_strategy: Box<dyn SyncingStrategy<B>>,
286		network_service: service::network::NetworkServiceHandle,
287		import_queue: Box<dyn ImportQueueService<B>>,
288		peer_store_handle: Arc<dyn PeerStoreProvider>,
289	) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
290	where
291		N: NetworkBackend<B, <B as BlockT>::Hash>,
292	{
293		let cache_capacity = (net_config.network_config.default_peers_set.in_peers
294			+ net_config.network_config.default_peers_set.out_peers)
295			.max(1);
296		let important_peers = {
297			let mut imp_p = HashSet::new();
298			for reserved in &net_config.network_config.default_peers_set.reserved_nodes {
299				imp_p.insert(reserved.peer_id);
300			}
301			for config in net_config.notification_protocols() {
302				let peer_ids = config.set_config().reserved_nodes.iter().map(|info| info.peer_id);
303				imp_p.extend(peer_ids);
304			}
305
306			imp_p.shrink_to_fit();
307			imp_p
308		};
309		let boot_node_ids = {
310			let mut list = HashSet::new();
311			for node in &net_config.network_config.boot_nodes {
312				list.insert(node.peer_id);
313			}
314			list.shrink_to_fit();
315			list
316		};
317		let default_peers_set_no_slot_peers = {
318			let mut no_slot_p: HashSet<PeerId> = net_config
319				.network_config
320				.default_peers_set
321				.reserved_nodes
322				.iter()
323				.map(|reserved| reserved.peer_id)
324				.collect();
325			no_slot_p.shrink_to_fit();
326			no_slot_p
327		};
328		let default_peers_set_num_full =
329			net_config.network_config.default_peers_set_num_full as usize;
330		let default_peers_set_num_light = {
331			let total = net_config.network_config.default_peers_set.out_peers
332				+ net_config.network_config.default_peers_set.in_peers;
333			total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
334		};
335
336		let info = client.info();
337
338		let (block_announce_config, notification_service) =
339			Self::get_block_announce_proto_config::<N>(
340				protocol_id,
341				fork_id,
342				roles,
343				info.best_number,
344				info.best_hash,
345				info.genesis_hash,
346				&net_config.network_config.default_peers_set,
347				network_metrics,
348				Arc::clone(&peer_store_handle),
349			);
350
351		let block_announce_protocol_name = block_announce_config.protocol_name().clone();
352		let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
353		let num_connected = Arc::new(AtomicUsize::new(0));
354		let is_major_syncing = Arc::new(AtomicBool::new(false));
355
356		// `default_peers_set.in_peers` contains an unspecified amount of light peers so the number
357		// of full inbound peers must be calculated from the total full peer count
358		let max_full_peers = net_config.network_config.default_peers_set_num_full;
359		let max_out_peers = net_config.network_config.default_peers_set.out_peers;
360		let max_in_peers = (max_full_peers - max_out_peers) as usize;
361
362		let tick_timeout = {
363			let mut interval = tokio::time::interval(TICK_TIMEOUT);
364			interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
365			interval
366		};
367
368		Ok((
369			Self {
370				roles,
371				client,
372				strategy: syncing_strategy,
373				network_service,
374				peers: HashMap::new(),
375				block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
376				block_announce_protocol_name,
377				block_announce_validator: BlockAnnounceValidatorStream::new(
378					block_announce_validator,
379				),
380				num_connected: num_connected.clone(),
381				is_major_syncing: is_major_syncing.clone(),
382				service_rx,
383				genesis_hash: info.genesis_hash,
384				important_peers,
385				default_peers_set_no_slot_connected_peers: HashSet::new(),
386				boot_node_ids,
387				default_peers_set_no_slot_peers,
388				default_peers_set_num_full,
389				default_peers_set_num_light,
390				num_in_peers: 0usize,
391				max_in_peers,
392				event_streams: Vec::new(),
393				notification_service,
394				tick_timeout,
395				peer_store_handle,
396				metrics: if let Some(r) = metrics_registry {
397					match Metrics::register(r, is_major_syncing.clone()) {
398						Ok(metrics) => Some(metrics),
399						Err(err) => {
400							log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
401							None
402						},
403					}
404				} else {
405					None
406				},
407				pending_responses: PendingResponses::new(),
408				import_queue,
409			},
410			SyncingService::new(tx, num_connected, is_major_syncing),
411			block_announce_config,
412		))
413	}
414
415	fn update_peer_info(
416		&mut self,
417		peer_id: &PeerId,
418		best_hash: B::Hash,
419		best_number: NumberFor<B>,
420	) {
421		if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
422			peer.info.best_hash = best_hash;
423			peer.info.best_number = best_number;
424		}
425	}
426
427	/// Process the result of the block announce validation.
428	fn process_block_announce_validation_result(
429		&mut self,
430		validation_result: BlockAnnounceValidationResult<B::Header>,
431	) {
432		match validation_result {
433			BlockAnnounceValidationResult::Skip { peer_id: _ } => {},
434			BlockAnnounceValidationResult::Process { is_new_best, peer_id, announce } => {
435				if let Some((best_hash, best_number)) =
436					self.strategy.on_validated_block_announce(is_new_best, peer_id, &announce)
437				{
438					self.update_peer_info(&peer_id, best_hash, best_number);
439				}
440
441				if let Some(data) = announce.data {
442					if !data.is_empty() {
443						self.block_announce_data_cache.insert(announce.header.hash(), data);
444					}
445				}
446			},
447			BlockAnnounceValidationResult::Failure { peer_id, disconnect } => {
448				if disconnect {
449					log::debug!(
450						target: LOG_TARGET,
451						"Disconnecting peer {peer_id} due to block announce validation failure",
452					);
453					self.network_service
454						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
455				}
456
457				self.network_service.report_peer(peer_id, rep::BAD_BLOCK_ANNOUNCEMENT);
458			},
459		}
460	}
461
462	/// Push a block announce validation.
463	pub fn push_block_announce_validation(
464		&mut self,
465		peer_id: PeerId,
466		announce: BlockAnnounce<B::Header>,
467	) {
468		let hash = announce.header.hash();
469
470		let peer = match self.peers.get_mut(&peer_id) {
471			Some(p) => p,
472			None => {
473				log::error!(
474					target: LOG_TARGET,
475					"Received block announce from disconnected peer {peer_id}",
476				);
477				debug_assert!(false);
478				return;
479			},
480		};
481		peer.known_blocks.insert(hash);
482
483		if peer.info.roles.is_full() {
484			let is_best = match announce.state.unwrap_or(BlockState::Best) {
485				BlockState::Best => true,
486				BlockState::Normal => false,
487			};
488
489			self.block_announce_validator
490				.push_block_announce_validation(peer_id, hash, announce, is_best);
491		}
492	}
493
494	/// Make sure an important block is propagated to peers.
495	///
496	/// In chain-based consensus, we often need to make sure non-best forks are
497	/// at least temporarily synced.
498	pub fn announce_block(&mut self, hash: B::Hash, data: Option<Vec<u8>>) {
499		let header = match self.client.header(hash) {
500			Ok(Some(header)) => header,
501			Ok(None) => {
502				log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
503				return;
504			},
505			Err(e) => {
506				log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
507				return;
508			},
509		};
510
511		// don't announce genesis block since it will be ignored
512		if header.number().is_zero() {
513			return;
514		}
515
516		let is_best = self.client.info().best_hash == hash;
517		log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");
518
519		let data = data
520			.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
521			.unwrap_or_default();
522
523		for (peer_id, ref mut peer) in self.peers.iter_mut() {
524			let inserted = peer.known_blocks.insert(hash);
525			if inserted {
526				log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
527				let message = BlockAnnounce {
528					header: header.clone(),
529					state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
530					data: Some(data.clone()),
531				};
532
533				let _ = self.notification_service.send_sync_notification(peer_id, message.encode());
534			}
535		}
536	}
537
538	pub async fn run(mut self) {
539		loop {
540			tokio::select! {
541				_ = self.tick_timeout.tick() => {
542					// TODO: This tick should not be necessary, but
543					//  `self.process_strategy_actions()` is not called in some cases otherwise and
544					//  some tests fail because of this
545				},
546				command = self.service_rx.select_next_some() =>
547					self.process_service_command(command),
548				notification_event = self.notification_service.next_event() => match notification_event {
549					Some(event) => self.process_notification_event(event),
550					None => {
551						error!(
552							target: LOG_TARGET,
553							"Terminating `SyncingEngine` because `NotificationService` has terminated.",
554						);
555
556						return;
557					}
558				},
559				response_event = self.pending_responses.select_next_some() =>
560					self.process_response_event(response_event),
561				validation_result = self.block_announce_validator.select_next_some() =>
562					self.process_block_announce_validation_result(validation_result),
563			}
564
565			// Update atomic variables
566			self.is_major_syncing.store(self.strategy.is_major_syncing(), Ordering::Relaxed);
567
568			// Process actions requested by a syncing strategy.
569			if let Err(e) = self.process_strategy_actions() {
570				error!(
571					target: LOG_TARGET,
572					"Terminating `SyncingEngine` due to fatal error: {e:?}.",
573				);
574				return;
575			}
576		}
577	}
578
579	fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
580		for action in self.strategy.actions(&self.network_service)? {
581			match action {
582				SyncingAction::StartRequest { peer_id, key, request, remove_obsolete } => {
583					if !self.peers.contains_key(&peer_id) {
584						trace!(
585							target: LOG_TARGET,
586							"Cannot start request with strategy key {key:?} to unknown peer \
587							{peer_id}",
588						);
589						debug_assert!(false);
590						continue;
591					}
592					if remove_obsolete {
593						if self.pending_responses.remove(peer_id, key) {
594							warn!(
595								target: LOG_TARGET,
596								"Processed `SyncingAction::StartRequest` to {peer_id} with \
597								strategy key {key:?}. Stale response removed!",
598							)
599						} else {
600							trace!(
601								target: LOG_TARGET,
602								"Processed `SyncingAction::StartRequest` to {peer_id} with \
603								strategy key {key:?}.",
604							)
605						}
606					}
607
608					self.pending_responses.insert(peer_id, key, request);
609				},
610				SyncingAction::CancelRequest { peer_id, key } => {
611					let removed = self.pending_responses.remove(peer_id, key);
612
613					trace!(
614						target: LOG_TARGET,
615						"Processed `SyncingAction::CancelRequest`, response removed: {removed}.",
616					);
617				},
618				SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
619					self.pending_responses.remove_all(&peer_id);
620					self.network_service
621						.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
622					self.network_service.report_peer(peer_id, rep);
623
624					trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}.");
625				},
626				SyncingAction::ImportBlocks { origin, blocks } => {
627					let count = blocks.len();
628					self.import_blocks(origin, blocks);
629
630					trace!(
631						target: LOG_TARGET,
632						"Processed `ChainSyncAction::ImportBlocks` with {count} blocks.",
633					);
634				},
635				SyncingAction::ImportJustifications { peer_id, hash, number, justifications } => {
636					self.import_justifications(peer_id, hash, number, justifications);
637
638					trace!(
639						target: LOG_TARGET,
640						"Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).",
641						peer_id,
642						hash,
643						number,
644					)
645				},
646				// Nothing to do, this is handled internally by `PezkuwiSyncingStrategy`.
647				SyncingAction::Finished => {},
648			}
649		}
650
651		Ok(())
652	}
653
654	fn process_service_command(&mut self, command: ToServiceCommand<B>) {
655		match command {
656			ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
657				self.strategy.set_sync_fork_request(peers, &hash, number);
658			},
659			ToServiceCommand::EventStream(tx) => {
660				// Let a new subscriber know about already connected peers.
661				for peer_id in self.peers.keys() {
662					let _ = tx.unbounded_send(SyncEvent::PeerConnected(*peer_id));
663				}
664				self.event_streams.push(tx);
665			},
666			ToServiceCommand::RequestJustification(hash, number) => {
667				self.strategy.request_justification(&hash, number)
668			},
669			ToServiceCommand::ClearJustificationRequests => {
670				self.strategy.clear_justification_requests()
671			},
672			ToServiceCommand::BlocksProcessed(imported, count, results) => {
673				self.strategy.on_blocks_processed(imported, count, results);
674			},
675			ToServiceCommand::JustificationImported(peer_id, hash, number, import_result) => {
676				let success =
677					matches!(import_result, pezsc_consensus::JustificationImportResult::Success);
678				self.strategy.on_justification_import(hash, number, success);
679
680				match import_result {
681					pezsc_consensus::JustificationImportResult::OutdatedJustification => {
682						log::info!(
683							target: LOG_TARGET,
684							"💔 Outdated justification provided by {peer_id} for #{hash}",
685						);
686					},
687					pezsc_consensus::JustificationImportResult::Failure => {
688						log::info!(
689							target: LOG_TARGET,
690							"💔 Invalid justification provided by {peer_id} for #{hash}",
691						);
692						self.network_service
693							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
694						self.network_service.report_peer(
695							peer_id,
696							ReputationChange::new_fatal("Invalid justification"),
697						);
698					},
699					pezsc_consensus::JustificationImportResult::Success => {
700						log::debug!(
701							target: LOG_TARGET,
702							"Justification for block #{hash} ({number}) imported from {peer_id} successfully",
703						);
704					},
705				}
706			},
707			ToServiceCommand::AnnounceBlock(hash, data) => self.announce_block(hash, data),
708			ToServiceCommand::NewBestBlockImported(hash, number) => {
709				log::debug!(target: LOG_TARGET, "New best block imported {:?}/#{}", hash, number);
710
711				self.strategy.update_chain_info(&hash, number);
712				let _ = self.notification_service.try_set_handshake(
713					BlockAnnouncesHandshake::<B>::build(
714						self.roles,
715						number,
716						hash,
717						self.genesis_hash,
718					)
719					.encode(),
720				);
721			},
722			ToServiceCommand::Status(tx) => {
723				let _ = tx.send(self.strategy.status());
724			},
725			ToServiceCommand::NumActivePeers(tx) => {
726				let _ = tx.send(self.num_active_peers());
727			},
728			ToServiceCommand::NumDownloadedBlocks(tx) => {
729				let _ = tx.send(self.strategy.num_downloaded_blocks());
730			},
731			ToServiceCommand::NumSyncRequests(tx) => {
732				let _ = tx.send(self.strategy.num_sync_requests());
733			},
734			ToServiceCommand::PeersInfo(tx) => {
735				let peers_info =
736					self.peers.iter().map(|(peer_id, peer)| (*peer_id, peer.info)).collect();
737				let _ = tx.send(peers_info);
738			},
739			ToServiceCommand::OnBlockFinalized(hash, header) => {
740				self.strategy.on_block_finalized(&hash, *header.number())
741			},
742		}
743	}
744
745	fn process_notification_event(&mut self, event: NotificationEvent) {
746		match event {
747			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
748				let validation_result = self
749					.validate_connection(&peer, handshake, Direction::Inbound)
750					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
751
752				let _ = result_tx.send(validation_result);
753			},
754			NotificationEvent::NotificationStreamOpened { peer, handshake, direction, .. } => {
755				log::debug!(
756					target: LOG_TARGET,
757					"Substream opened for {peer}, handshake {handshake:?}"
758				);
759
760				match self.validate_connection(&peer, handshake, direction) {
761					Ok(handshake) => {
762						if self.on_sync_peer_connected(peer, &handshake, direction).is_err() {
763							log::debug!(target: LOG_TARGET, "Failed to register peer {peer}");
764							self.network_service
765								.disconnect_peer(peer, self.block_announce_protocol_name.clone());
766						}
767					},
768					Err(wrong_genesis) => {
769						log::debug!(target: LOG_TARGET, "`SyncingEngine` rejected {peer}");
770
771						if wrong_genesis {
772							self.peer_store_handle.report_peer(peer, rep::GENESIS_MISMATCH);
773						}
774
775						self.network_service
776							.disconnect_peer(peer, self.block_announce_protocol_name.clone());
777					},
778				}
779			},
780			NotificationEvent::NotificationStreamClosed { peer } => {
781				self.on_sync_peer_disconnected(peer);
782			},
783			NotificationEvent::NotificationReceived { peer, notification } => {
784				if !self.peers.contains_key(&peer) {
785					log::error!(
786						target: LOG_TARGET,
787						"received notification from {peer} who had been earlier refused by `SyncingEngine`",
788					);
789					return;
790				}
791
792				let Ok(announce) = BlockAnnounce::decode(&mut notification.as_ref()) else {
793					log::warn!(target: LOG_TARGET, "failed to decode block announce");
794					return;
795				};
796
797				self.push_block_announce_validation(peer, announce);
798			},
799		}
800	}
801
802	/// Called by peer when it is disconnecting.
803	///
804	/// Returns a result if the handshake of this peer was indeed accepted.
805	fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) {
806		let Some(info) = self.peers.remove(&peer_id) else {
807			log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
808			return;
809		};
810		if let Some(metrics) = &self.metrics {
811			metrics.peers.dec();
812		}
813		self.num_connected.fetch_sub(1, Ordering::AcqRel);
814
815		if self.important_peers.contains(&peer_id) {
816			log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
817		} else {
818			log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
819		}
820
821		if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id)
822			&& info.inbound
823			&& info.info.roles.is_full()
824		{
825			match self.num_in_peers.checked_sub(1) {
826				Some(value) => {
827					self.num_in_peers = value;
828				},
829				None => {
830					log::error!(
831						target: LOG_TARGET,
832						"trying to disconnect an inbound node which is not counted as inbound"
833					);
834					debug_assert!(false);
835				},
836			}
837		}
838
839		self.strategy.remove_peer(&peer_id);
840		self.pending_responses.remove_all(&peer_id);
841		self.event_streams
842			.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
843	}
844
845	/// Validate received handshake.
846	fn validate_handshake(
847		&mut self,
848		peer_id: &PeerId,
849		handshake: Vec<u8>,
850	) -> Result<BlockAnnouncesHandshake<B>, bool> {
851		log::trace!(target: LOG_TARGET, "Validate handshake for {peer_id}");
852
853		let handshake = <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &handshake[..])
854			.map_err(|error| {
855				log::debug!(target: LOG_TARGET, "Failed to decode handshake for {peer_id}: {error:?}");
856				false
857			})?;
858
859		if handshake.genesis_hash != self.genesis_hash {
860			if self.important_peers.contains(&peer_id) {
861				log::error!(
862					target: LOG_TARGET,
863					"Reserved peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
864					self.genesis_hash,
865					handshake.genesis_hash,
866				);
867			} else if self.boot_node_ids.contains(&peer_id) {
868				log::error!(
869					target: LOG_TARGET,
870					"Bootnode with peer id `{peer_id}` is on a different chain (our genesis: {} theirs: {})",
871					self.genesis_hash,
872					handshake.genesis_hash,
873				);
874			} else {
875				log::debug!(
876					target: LOG_TARGET,
877					"Peer is on different chain (our genesis: {} theirs: {})",
878					self.genesis_hash,
879					handshake.genesis_hash
880				);
881			}
882
883			return Err(true);
884		}
885
886		Ok(handshake)
887	}
888
889	/// Validate connection.
890	// NOTE Returning `Err(bool)` is a really ugly hack to work around the issue
891	// that `ProtocolController` thinks the peer is connected when in fact it can
892	// still be under validation. If the peer has different genesis than the
893	// local node the validation fails but the peer cannot be reported in
894	// `validate_connection()` as that is also called by
895	// `ValidateInboundSubstream` which means that the peer is still being
896	// validated and banning the peer when handling that event would
897	// result in peer getting dropped twice.
898	//
899	// The proper way to fix this is to integrate `ProtocolController` more
900	// tightly with `NotificationService` or add an additional API call for
901	// banning pre-accepted peers (which is not desirable)
902	fn validate_connection(
903		&mut self,
904		peer_id: &PeerId,
905		handshake: Vec<u8>,
906		direction: Direction,
907	) -> Result<BlockAnnouncesHandshake<B>, bool> {
908		log::trace!(target: LOG_TARGET, "New peer {peer_id} {handshake:?}");
909
910		let handshake = self.validate_handshake(peer_id, handshake)?;
911
912		if self.peers.contains_key(&peer_id) {
913			log::error!(
914				target: LOG_TARGET,
915				"Called `validate_connection()` with already connected peer {peer_id}",
916			);
917			debug_assert!(false);
918			return Err(false);
919		}
920
921		let no_slot_peer = self.default_peers_set_no_slot_peers.contains(&peer_id);
922		let this_peer_reserved_slot: usize = if no_slot_peer { 1 } else { 0 };
923
924		if handshake.roles.is_full()
925			&& self.strategy.num_peers()
926				>= self.default_peers_set_num_full
927					+ self.default_peers_set_no_slot_connected_peers.len()
928					+ this_peer_reserved_slot
929		{
930			log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
931			return Err(false);
932		}
933
934		// make sure to accept no more than `--in-peers` many full nodes
935		if !no_slot_peer
936			&& handshake.roles.is_full()
937			&& direction.is_inbound()
938			&& self.num_in_peers == self.max_in_peers
939		{
940			log::debug!(target: LOG_TARGET, "All inbound slots have been consumed, rejecting {peer_id}");
941			return Err(false);
942		}
943
944		// make sure that all slots are not occupied by light peers
945		//
946		// `ChainSync` only accepts full peers whereas `SyncingEngine` accepts both full and light
947		// peers. Verify that there is a slot in `SyncingEngine` for the inbound light peer
948		if handshake.roles.is_light()
949			&& (self.peers.len() - self.strategy.num_peers()) >= self.default_peers_set_num_light
950		{
951			log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
952			return Err(false);
953		}
954
955		Ok(handshake)
956	}
957
958	/// Called on the first connection between two peers on the default set, after their exchange
959	/// of handshake.
960	///
961	/// Returns `Ok` if the handshake is accepted and the peer added to the list of peers we sync
962	/// from.
963	fn on_sync_peer_connected(
964		&mut self,
965		peer_id: PeerId,
966		status: &BlockAnnouncesHandshake<B>,
967		direction: Direction,
968	) -> Result<(), ()> {
969		log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");
970
971		let peer = Peer {
972			info: ExtendedPeerInfo {
973				roles: status.roles,
974				best_hash: status.best_hash,
975				best_number: status.best_number,
976			},
977			known_blocks: LruHashSet::new(
978				NonZeroUsize::new(MAX_KNOWN_BLOCKS).expect("Constant is nonzero"),
979			),
980			inbound: direction.is_inbound(),
981		};
982
983		// Only forward full peers to syncing strategy.
984		if status.roles.is_full() {
985			self.strategy.add_peer(peer_id, peer.info.best_hash, peer.info.best_number);
986		}
987
988		log::debug!(target: LOG_TARGET, "Connected {peer_id}");
989
990		if self.peers.insert(peer_id, peer).is_none() {
991			if let Some(metrics) = &self.metrics {
992				metrics.peers.inc();
993			}
994			self.num_connected.fetch_add(1, Ordering::AcqRel);
995		}
996		self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());
997
998		if self.default_peers_set_no_slot_peers.contains(&peer_id) {
999			self.default_peers_set_no_slot_connected_peers.insert(peer_id);
1000		} else if direction.is_inbound() && status.roles.is_full() {
1001			self.num_in_peers += 1;
1002		}
1003
1004		self.event_streams
1005			.retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok());
1006
1007		Ok(())
1008	}
1009
1010	fn process_response_event(&mut self, response_event: ResponseEvent) {
1011		let ResponseEvent { peer_id, key, response: response_result } = response_event;
1012
1013		match response_result {
1014			Ok(Ok((response, protocol_name))) => {
1015				self.strategy.on_generic_response(&peer_id, key, protocol_name, response);
1016			},
1017			Ok(Err(e)) => {
1018				debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}.");
1019
1020				match e {
1021					RequestFailure::Network(OutboundFailure::Timeout) => {
1022						self.network_service.report_peer(peer_id, rep::TIMEOUT);
1023						self.network_service
1024							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1025					},
1026					RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1027						self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL);
1028						self.network_service
1029							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1030					},
1031					RequestFailure::Network(OutboundFailure::DialFailure) => {
1032						self.network_service
1033							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1034					},
1035					RequestFailure::Refused => {
1036						self.network_service.report_peer(peer_id, rep::REFUSED);
1037						self.network_service
1038							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1039					},
1040					RequestFailure::Network(OutboundFailure::ConnectionClosed)
1041					| RequestFailure::NotConnected => {
1042						self.network_service
1043							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1044					},
1045					RequestFailure::UnknownProtocol => {
1046						debug_assert!(false, "Block request protocol should always be known.");
1047					},
1048					RequestFailure::Obsolete => {
1049						debug_assert!(
1050							false,
1051							"Can not receive `RequestFailure::Obsolete` after dropping the \
1052							response receiver.",
1053						);
1054					},
1055					RequestFailure::Network(OutboundFailure::Io(_)) => {
1056						self.network_service.report_peer(peer_id, rep::IO);
1057						self.network_service
1058							.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1059					},
1060				}
1061			},
1062			Err(oneshot::Canceled) => {
1063				trace!(
1064					target: LOG_TARGET,
1065					"Request to peer {peer_id:?} failed due to oneshot being canceled.",
1066				);
1067				self.network_service
1068					.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
1069			},
1070		}
1071	}
1072
1073	/// Returns the number of peers we're connected to and that are being queried.
1074	fn num_active_peers(&self) -> usize {
1075		self.pending_responses.len()
1076	}
1077
1078	/// Get config for the block announcement protocol
1079	fn get_block_announce_proto_config<N: NetworkBackend<B, <B as BlockT>::Hash>>(
1080		protocol_id: ProtocolId,
1081		fork_id: Option<&str>,
1082		roles: Roles,
1083		best_number: NumberFor<B>,
1084		best_hash: B::Hash,
1085		genesis_hash: B::Hash,
1086		set_config: &SetConfig,
1087		metrics: NotificationMetrics,
1088		peer_store_handle: Arc<dyn PeerStoreProvider>,
1089	) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
1090		let block_announces_protocol = {
1091			let genesis_hash = genesis_hash.as_ref();
1092			if let Some(fork_id) = fork_id {
1093				format!(
1094					"/{}/{}/block-announces/1",
1095					array_bytes::bytes2hex("", genesis_hash),
1096					fork_id
1097				)
1098			} else {
1099				format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
1100			}
1101		};
1102
1103		N::notification_config(
1104			block_announces_protocol.into(),
1105			iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1106			MAX_BLOCK_ANNOUNCE_SIZE,
1107			Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
1108				roles,
1109				best_number,
1110				best_hash,
1111				genesis_hash,
1112			))),
1113			set_config.clone(),
1114			metrics,
1115			peer_store_handle,
1116		)
1117	}
1118
1119	/// Import blocks.
1120	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
1121		if let Some(metrics) = &self.metrics {
1122			metrics.import_queue_blocks_submitted.inc();
1123		}
1124
1125		self.import_queue.import_blocks(origin, blocks);
1126	}
1127
1128	/// Import justifications.
1129	fn import_justifications(
1130		&mut self,
1131		peer_id: PeerId,
1132		hash: B::Hash,
1133		number: NumberFor<B>,
1134		justifications: Justifications,
1135	) {
1136		if let Some(metrics) = &self.metrics {
1137			metrics.import_queue_justifications_submitted.inc();
1138		}
1139
1140		self.import_queue.import_justifications(peer_id, hash, number, justifications);
1141	}
1142}