// soil_network/service.rs
1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Main entry point of the soil-network crate.
8//!
9//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
10//! The [`NetworkWorker`] *is* the network. Network is driven by [`NetworkWorker::run`] future that
11//! terminates only when all instances of the control handles [`NetworkService`] were dropped.
12//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
13//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
14//!
15//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
16//! which is then processed by [`NetworkWorker::next_action`].
17
18use crate::{
19	behaviour::{self, Behaviour, BehaviourOut},
20	bitswap::BitswapRequestHandler,
21	config::{
22		parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
23		NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
24	},
25	discovery::DiscoveryConfig,
26	error::Error,
27	event::{DhtEvent, Event},
28	network_state::{
29		NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
30	},
31	peer_store::{PeerStore, PeerStoreProvider},
32	protocol::{self, Protocol, Ready},
33	protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
34	request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
35	service::{
36		signature::{Signature, SigningError},
37		traits::{
38			BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
39			NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
40			NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
41			NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
42		},
43	},
44	transport,
45	types::ProtocolName,
46	NotificationService, ReputationChange,
47};
48
49use crate::types::kad::{Key as KademliaKey, Record};
50use codec::DecodeAll;
51use futures::{channel::oneshot, prelude::*};
52use libp2p::{
53	connection_limits::{ConnectionLimits, Exceeded},
54	core::{upgrade, ConnectedPoint, Endpoint},
55	identify::Info as IdentifyInfo,
56	identity::ed25519,
57	multiaddr::{self, Multiaddr},
58	swarm::{
59		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
60		NetworkBehaviour, Swarm, SwarmEvent,
61	},
62	PeerId,
63};
64use log::{debug, error, info, trace, warn};
65use metrics::{Histogram, MetricSources, Metrics};
66use parking_lot::Mutex;
67use soil_prometheus::Registry;
68
69use crate::common::{
70	role::{ObservedRole, Roles},
71	ExHashT,
72};
73use soil_client::client_api::BlockBackend;
74use soil_client::utils::mpsc::{
75	tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
76};
77use subsoil::runtime::traits::Block as BlockT;
78
79pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
80pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
81pub use metrics::NotificationMetrics;
82pub use protocol::NotificationsSink;
83use std::{
84	collections::{HashMap, HashSet},
85	fs, iter,
86	marker::PhantomData,
87	num::NonZeroUsize,
88	pin::Pin,
89	str,
90	sync::{
91		atomic::{AtomicUsize, Ordering},
92		Arc,
93	},
94	time::{Duration, Instant},
95};
96
// Prometheus metrics for the network service (crate-internal).
pub(crate) mod metrics;
// Fan-out channels used to broadcast worker events to subscribers (crate-internal).
pub(crate) mod out_events;

// Signature types produced/consumed by [`NetworkSigner`].
pub mod signature;
// Public service traits (`NetworkPeers`, `NetworkSigner`, `NetworkDHTProvider`, ...).
pub mod traits;

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p";
105
/// Adapter exposing libp2p's transport-level bandwidth counters
/// ([`transport::BandwidthSinks`]) through the crate's [`BandwidthSink`] trait.
struct Libp2pBandwidthSink {
	// `BandwidthSinks` is deprecated upstream but still the source of these counters here.
	#[allow(deprecated)]
	sink: Arc<transport::BandwidthSinks>,
}
110
111impl BandwidthSink for Libp2pBandwidthSink {
112	fn total_inbound(&self) -> u64 {
113		self.sink.total_inbound()
114	}
115
116	fn total_outbound(&self) -> u64 {
117		self.sink.total_outbound()
118	}
119}
120
/// Network service. Handles network IO and manages connectivity.
///
/// This is the cheaply-cloneable control handle to a [`NetworkWorker`]; obtain it via
/// [`NetworkWorker::service`]. Most methods forward a [`ServiceToWorkerMsg`] over
/// `to_worker` to be processed by the worker's event loop.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
	/// Number of peers we're connected to.
	num_connected: Arc<AtomicUsize>,
	/// The local external addresses.
	external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Local copy of the `PeerId` of the local node.
	local_peer_id: PeerId,
	/// The `KeyPair` that defines the `PeerId` of the local node.
	local_identity: Keypair,
	/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
	bandwidth: Arc<dyn BandwidthSink>,
	/// Channel that sends messages to the actual worker.
	to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
	/// Protocol name -> `SetId` mapping for notification protocols. The map never changes after
	/// initialization.
	notification_protocol_ids: HashMap<ProtocolName, SetId>,
	/// Handles to manage peer connections on notification protocols. The vector never changes
	/// after initialization.
	protocol_handles: Vec<protocol_controller::ProtocolHandle>,
	/// Shortcut to sync protocol handle (`protocol_handles[0]`).
	sync_protocol_handle: protocol_controller::ProtocolHandle,
	/// Handle to `PeerStore`.
	peer_store_handle: Arc<dyn PeerStoreProvider>,
	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
	/// compatibility.
	_marker: PhantomData<H>,
	/// Marker for block type
	_block: PhantomData<B>,
}
153
#[async_trait::async_trait]
impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Notification protocols are configured via [`NonDefaultSetConfig`].
	type NotificationProtocolConfig = NonDefaultSetConfig;
	/// Request-response protocols are configured via the crate's [`RequestResponseConfig`].
	type RequestResponseProtocolConfig = RequestResponseConfig;
	// NOTE(review): the GAT's `Block`/`Hash` parameters are intentionally unused; the
	// service is keyed on this impl's own `B`/`H`.
	type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
	type PeerStore = PeerStore;
	/// Bitswap reuses the request-response protocol configuration.
	type BitswapConfig = RequestResponseConfig;

	/// Create a new [`NetworkWorker`] from the given [`Params`].
	fn new(params: Params<B, H, Self>) -> Result<Self, Error>
	where
		Self: Sized,
	{
		NetworkWorker::new(params)
	}

	/// Get handle to `NetworkService` of the `NetworkBackend`.
	fn network_service(&self) -> Arc<dyn NetworkServiceT> {
		self.service.clone()
	}

	/// Create `PeerStore`, pre-populated with the given bootnode peer IDs.
	fn peer_store(
		bootnodes: Vec<crate::types::PeerId>,
		metrics_registry: Option<Registry>,
	) -> Self::PeerStore {
		PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
	}

	/// Register notification metrics against `registry`, if one is provided.
	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
		NotificationMetrics::new(registry)
	}

	/// Create the bitswap server: a future that drives the request handler, plus the
	/// protocol configuration it must be registered under.
	fn bitswap_server(
		client: Arc<dyn BlockBackend<B> + Send + Sync>,
	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
		let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());

		(Box::pin(async move { handler.run().await }), protocol_config)
	}

	/// Create notification protocol configuration.
	///
	/// The metrics and peer-store handles are unused by this backend (note the `_`
	/// parameter names); they exist to satisfy the trait signature.
	fn notification_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_notification_size: u64,
		handshake: Option<NotificationHandshake>,
		set_config: SetConfig,
		_metrics: NotificationMetrics,
		_peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
		NonDefaultSetConfig::new(
			protocol_name,
			fallback_names,
			max_notification_size,
			handshake,
			set_config,
		)
	}

	/// Create request-response protocol configuration.
	fn request_response_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_request_size: u64,
		max_response_size: u64,
		request_timeout: Duration,
		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	) -> Self::RequestResponseProtocolConfig {
		Self::RequestResponseProtocolConfig {
			name: protocol_name,
			fallback_names,
			max_request_size,
			max_response_size,
			request_timeout,
			inbound_queue,
		}
	}

	/// Start [`NetworkBackend`] event loop.
	async fn run(mut self) {
		// Resolves to the *inherent* `NetworkWorker::run` (inherent methods take
		// precedence over trait methods in Rust), so this does not recurse.
		self.run().await
	}
}
241
242impl<B, H> NetworkWorker<B, H>
243where
244	B: BlockT + 'static,
245	H: ExHashT,
246{
	/// Creates the network service.
	///
	/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
	/// for the network processing to advance. From it, you can extract a `NetworkService` using
	/// `worker.service()`. The `NetworkService` can be shared through the codebase.
	///
	/// # Errors
	///
	/// Fails if the node key cannot be turned into a keypair, if a configured address is
	/// inconsistent with the transport, if the net config directory cannot be created, if two
	/// bootnodes share a multiaddress but report different peer IDs, or if a request-response
	/// protocol is registered twice.
	pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
		let peer_store_handle = params.network_config.peer_store_handle();
		let FullNetworkConfiguration {
			notification_protocols,
			request_response_protocols,
			mut network_config,
			..
		} = params.network_config;

		// Private and public keys configuration.
		let local_identity = network_config.node_key.clone().into_keypair()?;
		let local_public = local_identity.public();
		let local_peer_id = local_public.to_peer_id();

		// Convert to libp2p types.
		let local_identity: ed25519::Keypair = local_identity.into();
		let local_public: ed25519::PublicKey = local_public.into();
		let local_peer_id: PeerId = local_peer_id.into();

		// Never dial ourselves: drop the local peer ID from bootnodes and reserved nodes.
		network_config.boot_nodes = network_config
			.boot_nodes
			.into_iter()
			.filter(|boot_node| boot_node.peer_id != crate::PeerId::from(&local_peer_id))
			.collect();
		network_config.default_peers_set.reserved_nodes = network_config
			.default_peers_set
			.reserved_nodes
			.into_iter()
			.filter(|reserved_node| {
				if reserved_node.peer_id == crate::PeerId::from(&local_peer_id) {
					warn!(
						target: LOG_TARGET,
						"Local peer ID used in reserved node, ignoring: {}",
						reserved_node,
					);
					false
				} else {
					true
				}
			})
			.collect();

		// Ensure the listen addresses are consistent with the transport.
		ensure_addresses_consistent_with_transport(
			network_config.listen_addresses.iter(),
			&network_config.transport,
		)?;
		ensure_addresses_consistent_with_transport(
			network_config.boot_nodes.iter().map(|x| &x.multiaddr),
			&network_config.transport,
		)?;
		ensure_addresses_consistent_with_transport(
			network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
			&network_config.transport,
		)?;
		for notification_protocol in &notification_protocols {
			ensure_addresses_consistent_with_transport(
				notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
				&network_config.transport,
			)?;
		}
		ensure_addresses_consistent_with_transport(
			network_config.public_addresses.iter(),
			&network_config.transport,
		)?;

		// Channel through which `NetworkService` handles send commands to this worker.
		let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);

		if let Some(path) = &network_config.net_config_path {
			fs::create_dir_all(path)?;
		}

		info!(
			target: LOG_TARGET,
			"🏷  Local node identity is: {}",
			local_peer_id.to_base58(),
		);
		info!(target: LOG_TARGET, "Running libp2p network backend");

		// Build the transport; `bandwidth` wraps it and counts bytes in both directions.
		let (transport, bandwidth) = {
			let config_mem = match network_config.transport {
				TransportConfig::MemoryOnly => true,
				TransportConfig::Normal { .. } => false,
			};

			transport::build_transport(local_identity.clone().into(), config_mem)
		};

		let (to_notifications, from_protocol_controllers) =
			tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);

		// We must prepend a hardcoded default peer set to notification protocols.
		let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
			.chain(notification_protocols.iter().map(|protocol| protocol.set_config()));

		// One `ProtocolController` per peer set; handles stay here, controllers are spawned.
		let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
			.enumerate()
			.map(|(set_id, set_config)| {
				let proto_set_config = ProtoSetConfig {
					in_peers: set_config.in_peers,
					out_peers: set_config.out_peers,
					reserved_nodes: set_config
						.reserved_nodes
						.iter()
						.map(|node| node.peer_id.into())
						.collect(),
					reserved_only: set_config.non_reserved_mode.is_reserved_only(),
				};

				ProtocolController::new(
					SetId::from(set_id),
					proto_set_config,
					to_notifications.clone(),
					Arc::clone(&peer_store_handle),
				)
			})
			.unzip();

		// Shortcut to default (sync) peer set protocol handle.
		let sync_protocol_handle = protocol_handles[0].clone();

		// Spawn `ProtocolController` runners.
		protocol_controllers
			.into_iter()
			.for_each(|controller| (params.executor)(controller.run().boxed()));

		// Protocol name to protocol id mapping. The first protocol is always block announce (sync)
		// protocol, aka default (hardcoded) peer set.
		let notification_protocol_ids: HashMap<ProtocolName, SetId> =
			iter::once(&params.block_announce_config)
				.chain(notification_protocols.iter())
				.enumerate()
				.map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
				.collect();

		let known_addresses = {
			// Collect all reserved nodes and bootnodes addresses.
			let mut addresses: Vec<_> = network_config
				.default_peers_set
				.reserved_nodes
				.iter()
				.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
				.chain(notification_protocols.iter().flat_map(|protocol| {
					protocol
						.set_config()
						.reserved_nodes
						.iter()
						.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
				}))
				.chain(
					network_config
						.boot_nodes
						.iter()
						.map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
				)
				.collect();

			// Remove possible duplicates.
			addresses.sort();
			addresses.dedup();

			addresses
		};

		// Check for duplicate bootnodes: the same address claiming two different peer IDs.
		network_config.boot_nodes.iter().try_for_each(|bootnode| {
			if let Some(other) = network_config
				.boot_nodes
				.iter()
				.filter(|o| o.multiaddr == bootnode.multiaddr)
				.find(|o| o.peer_id != bootnode.peer_id)
			{
				Err(Error::DuplicateBootnode {
					address: bootnode.multiaddr.clone().into(),
					first_id: bootnode.peer_id.into(),
					second_id: other.peer_id.into(),
				})
			} else {
				Ok(())
			}
		})?;

		// List of bootnode multiaddresses, grouped by peer ID.
		let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();

		for bootnode in network_config.boot_nodes.iter() {
			boot_node_ids
				.entry(bootnode.peer_id.into())
				.or_default()
				.push(bootnode.multiaddr.clone().into());
		}

		let boot_node_ids = Arc::new(boot_node_ids);

		let num_connected = Arc::new(AtomicUsize::new(0));
		let external_addresses = Arc::new(Mutex::new(HashSet::new()));

		let (protocol, notif_protocol_handles) = Protocol::new(
			From::from(&params.role),
			params.notification_metrics,
			notification_protocols,
			params.block_announce_config,
			Arc::clone(&peer_store_handle),
			protocol_handles.clone(),
			from_protocol_controllers,
		)?;

		// Build the swarm.
		let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
			let user_agent =
				format!("{} ({})", network_config.client_version, network_config.node_name);

			let discovery_config = {
				let mut config = DiscoveryConfig::new(local_peer_id);
				config.with_permanent_addresses(
					known_addresses
						.iter()
						.map(|(peer, address)| (peer.into(), address.clone().into()))
						.collect::<Vec<_>>(),
				);
				config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
				config.with_kademlia(
					params.genesis_hash,
					params.fork_id.as_deref(),
					&params.protocol_id,
				);
				config.with_dht_random_walk(network_config.enable_dht_random_walk);
				config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
				config.use_kademlia_disjoint_query_paths(
					network_config.kademlia_disjoint_query_paths,
				);
				config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);

				// Memory-only transports have no real network: disable mDNS and private-IP rules.
				match network_config.transport {
					TransportConfig::MemoryOnly => {
						config.with_mdns(false);
						config.allow_private_ip(false);
					},
					TransportConfig::Normal {
						enable_mdns,
						allow_private_ip: allow_private_ipv4,
						..
					} => {
						config.with_mdns(enable_mdns);
						config.allow_private_ip(allow_private_ipv4);
					},
				}

				config
			};

			let behaviour = {
				let result = Behaviour::new(
					protocol,
					user_agent,
					local_public.into(),
					discovery_config,
					request_response_protocols,
					Arc::clone(&peer_store_handle),
					external_addresses.clone(),
					network_config.public_addresses.iter().cloned().map(Into::into).collect(),
					ConnectionLimits::default()
						.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
						.with_max_established_incoming(Some(
							crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
						)),
				);

				match result {
					Ok(b) => b,
					Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
						return Err(Error::DuplicateRequestResponseProtocol { protocol: proto })
					},
				}
			};

			let swarm = {
				// Adapter so the caller-supplied executor can run swarm background tasks.
				struct SpawnImpl<F>(F);
				impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
					fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
						(self.0)(f)
					}
				}

				let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
					.with_substream_upgrade_protocol_override(upgrade::Version::V1)
					.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
					// NOTE: 24 is somewhat arbitrary and should be tuned in the future if
					// necessary. See <https://github.com/paritytech/substrate/pull/6080>
					.with_per_connection_event_buffer_size(24)
					.with_max_negotiating_inbound_streams(2048)
					.with_idle_connection_timeout(network_config.idle_connection_timeout);

				Swarm::new(transport, behaviour, local_peer_id, config)
			};

			(swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
		};

		// Initialize the metrics.
		let metrics = match &params.metrics_registry {
			Some(registry) => Some(metrics::register(
				registry,
				MetricSources {
					bandwidth: bandwidth.clone(),
					connected_peers: num_connected.clone(),
				},
			)?),
			None => None,
		};

		// Listen on multiaddresses. Failure to listen is logged, not fatal.
		for addr in &network_config.listen_addresses {
			if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
				warn!(target: LOG_TARGET, "Can't listen on {} because: {:?}", addr, err)
			}
		}

		// Add external addresses.
		for addr in &network_config.public_addresses {
			Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
		}

		// Shared between the service (reader) and the worker.
		let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));

		let service = Arc::new(NetworkService {
			bandwidth,
			external_addresses,
			listen_addresses: listen_addresses_set.clone(),
			num_connected: num_connected.clone(),
			local_peer_id,
			local_identity: local_identity.into(),
			to_worker,
			notification_protocol_ids,
			protocol_handles,
			sync_protocol_handle,
			peer_store_handle: Arc::clone(&peer_store_handle),
			_marker: PhantomData,
			_block: Default::default(),
		});

		Ok(NetworkWorker {
			listen_addresses: listen_addresses_set,
			num_connected,
			network_service: swarm,
			service,
			from_service,
			event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
			metrics,
			boot_node_ids,
			reported_invalid_boot_nodes: Default::default(),
			peer_store_handle: Arc::clone(&peer_store_handle),
			notif_protocol_handles,
			_marker: Default::default(),
			_block: Default::default(),
		})
	}
609
610	/// High-level network status information.
611	pub fn status(&self) -> NetworkStatus {
612		NetworkStatus {
613			num_connected_peers: self.num_connected_peers(),
614			total_bytes_inbound: self.total_bytes_inbound(),
615			total_bytes_outbound: self.total_bytes_outbound(),
616		}
617	}
618
619	/// Returns the total number of bytes received so far.
620	pub fn total_bytes_inbound(&self) -> u64 {
621		self.service.bandwidth.total_inbound()
622	}
623
624	/// Returns the total number of bytes sent so far.
625	pub fn total_bytes_outbound(&self) -> u64 {
626		self.service.bandwidth.total_outbound()
627	}
628
629	/// Returns the number of peers we're connected to.
630	pub fn num_connected_peers(&self) -> usize {
631		self.network_service.behaviour().user_protocol().num_sync_peers()
632	}
633
634	/// Adds an address for a node.
635	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
636		self.network_service.behaviour_mut().add_known_address(peer_id, addr);
637	}
638
	/// Return a `NetworkService` that can be shared through the code base and can be used to
	/// manipulate the worker.
	///
	/// Cloning the returned `Arc` is cheap; all clones control this same worker.
	pub fn service(&self) -> &Arc<NetworkService<B, H>> {
		&self.service
	}
644
645	/// Returns the local `PeerId`.
646	pub fn local_peer_id(&self) -> &PeerId {
647		Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
648	}
649
650	/// Returns the list of addresses we are listening on.
651	///
652	/// Does **NOT** include a trailing `/p2p/` with our `PeerId`.
653	pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
654		Swarm::<Behaviour<B>>::listeners(&self.network_service)
655	}
656
657	/// Get network state.
658	///
659	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
660	/// everywhere about this. Please don't use this function to retrieve actual information.
661	pub fn network_state(&mut self) -> NetworkState {
662		let swarm = &mut self.network_service;
663		let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
664		let connected_peers = {
665			let swarm = &mut *swarm;
666			open.iter()
667				.filter_map(move |peer_id| {
668					let known_addresses = if let Ok(addrs) =
669						NetworkBehaviour::handle_pending_outbound_connection(
670							swarm.behaviour_mut(),
671							ConnectionId::new_unchecked(0), // dummy value
672							Some(*peer_id),
673							&vec![],
674							Endpoint::Listener,
675						) {
676						addrs.into_iter().collect()
677					} else {
678						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
679						return None;
680					};
681
682					let endpoint = if let Some(e) =
683						swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
684					{
685						e.clone().into()
686					} else {
687						error!(target: LOG_TARGET, "Found state inconsistency between custom protocol \
688						and debug information about {:?}", peer_id);
689						return None;
690					};
691
692					Some((
693						peer_id.to_base58(),
694						NetworkStatePeer {
695							endpoint,
696							version_string: swarm
697								.behaviour_mut()
698								.node(peer_id)
699								.and_then(|i| i.client_version().map(|s| s.to_owned())),
700							latest_ping_time: swarm
701								.behaviour_mut()
702								.node(peer_id)
703								.and_then(|i| i.latest_ping()),
704							known_addresses,
705						},
706					))
707				})
708				.collect()
709		};
710
711		let not_connected_peers = {
712			let swarm = &mut *swarm;
713			swarm
714				.behaviour_mut()
715				.known_peers()
716				.into_iter()
717				.filter(|p| open.iter().all(|n| n != p))
718				.map(move |peer_id| {
719					let known_addresses = if let Ok(addrs) =
720						NetworkBehaviour::handle_pending_outbound_connection(
721							swarm.behaviour_mut(),
722							ConnectionId::new_unchecked(0), // dummy value
723							Some(peer_id),
724							&vec![],
725							Endpoint::Listener,
726						) {
727						addrs.into_iter().collect()
728					} else {
729						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
730						Default::default()
731					};
732
733					(
734						peer_id.to_base58(),
735						NetworkStateNotConnectedPeer {
736							version_string: swarm
737								.behaviour_mut()
738								.node(&peer_id)
739								.and_then(|i| i.client_version().map(|s| s.to_owned())),
740							latest_ping_time: swarm
741								.behaviour_mut()
742								.node(&peer_id)
743								.and_then(|i| i.latest_ping()),
744							known_addresses,
745						},
746					)
747				})
748				.collect()
749		};
750
751		let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
752		let listened_addresses = swarm.listeners().cloned().collect();
753		let external_addresses = swarm.external_addresses().cloned().collect();
754
755		NetworkState {
756			peer_id,
757			listened_addresses,
758			external_addresses,
759			connected_peers,
760			not_connected_peers,
761			// TODO: Check what info we can include here.
762			//       Issue reference: https://github.com/paritytech/substrate/issues/14160.
763			peerset: serde_json::json!(
764				"Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
765			),
766		}
767	}
768
769	/// Removes a `PeerId` from the list of reserved peers.
770	pub fn remove_reserved_peer(&self, peer: PeerId) {
771		self.service.remove_reserved_peer(peer.into());
772	}
773
	/// Adds a `PeerId` and its `Multiaddr` as reserved.
	///
	/// Delegates to the service; errors are returned as a human-readable string.
	pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
		self.service.add_reserved_peer(peer)
	}
778}
779
780impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
781	/// Get network state.
782	///
783	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
784	/// everywhere about this. Please don't use this function to retrieve actual information.
785	///
786	/// Returns an error if the `NetworkWorker` is no longer running.
787	pub async fn network_state(&self) -> Result<NetworkState, ()> {
788		let (tx, rx) = oneshot::channel();
789
790		let _ = self
791			.to_worker
792			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
793
794		match rx.await {
795			Ok(v) => v.map_err(|_| ()),
796			// The channel can only be closed if the network worker no longer exists.
797			Err(_) => Err(()),
798		}
799	}
800
801	/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
802	///
803	/// Returns an `Err` if one of the given addresses is invalid or contains an
804	/// invalid peer ID (which includes the local peer ID).
805	fn split_multiaddr_and_peer_id(
806		&self,
807		peers: HashSet<Multiaddr>,
808	) -> Result<Vec<(PeerId, Multiaddr)>, String> {
809		peers
810			.into_iter()
811			.map(|mut addr| {
812				let peer = match addr.pop() {
813					Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
814					_ => return Err("Missing PeerId from address".to_string()),
815				};
816
817				// Make sure the local peer ID is never added to the PSM
818				// or added as a "known address", even if given.
819				if peer == self.local_peer_id {
820					Err("Local peer ID in peer set.".to_string())
821				} else {
822					Ok((peer, addr))
823				}
824			})
825			.collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
826	}
827}
828
829impl<B, H> NetworkStateInfo for NetworkService<B, H>
830where
831	B: subsoil::runtime::traits::Block,
832	H: ExHashT,
833{
834	/// Returns the local external addresses.
835	fn external_addresses(&self) -> Vec<crate::types::multiaddr::Multiaddr> {
836		self.external_addresses.lock().iter().cloned().map(Into::into).collect()
837	}
838
839	/// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`).
840	fn listen_addresses(&self) -> Vec<crate::types::multiaddr::Multiaddr> {
841		self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
842	}
843
844	/// Returns the local Peer ID.
845	fn local_peer_id(&self) -> crate::types::PeerId {
846		self.local_peer_id.into()
847	}
848}
849
850impl<B, H> NetworkSigner for NetworkService<B, H>
851where
852	B: subsoil::runtime::traits::Block,
853	H: ExHashT,
854{
855	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
856		let public_key = self.local_identity.public();
857		let bytes = self.local_identity.sign(msg.as_ref())?;
858
859		Ok(Signature {
860			public_key: crate::service::signature::PublicKey::Libp2p(public_key),
861			bytes,
862		})
863	}
864
865	fn verify(
866		&self,
867		peer_id: crate::types::PeerId,
868		public_key: &Vec<u8>,
869		signature: &Vec<u8>,
870		message: &Vec<u8>,
871	) -> Result<bool, String> {
872		let public_key =
873			PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
874		let peer_id: PeerId = peer_id.into();
875		let remote: libp2p::PeerId = public_key.to_peer_id();
876
877		Ok(peer_id == remote && public_key.verify(message, signature))
878	}
879}
880
881impl<B, H> NetworkDHTProvider for NetworkService<B, H>
882where
883	B: BlockT + 'static,
884	H: ExHashT,
885{
886	/// Start finding closest peerst to the target peer ID in the DHT.
887	///
888	/// This will generate either a `ClosestPeersFound` or a `ClosestPeersNotFound` event and pass
889	/// it as an item on the [`NetworkWorker`] stream.
890	fn find_closest_peers(&self, target: crate::types::PeerId) {
891		let _ = self
892			.to_worker
893			.unbounded_send(ServiceToWorkerMsg::FindClosestPeers(target.into()));
894	}
895
896	/// Start getting a value from the DHT.
897	///
898	/// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an
899	/// item on the [`NetworkWorker`] stream.
900	fn get_value(&self, key: &KademliaKey) {
901		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
902	}
903
904	/// Start putting a value in the DHT.
905	///
906	/// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an
907	/// item on the [`NetworkWorker`] stream.
908	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
909		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
910	}
911
912	fn put_record_to(
913		&self,
914		record: Record,
915		peers: HashSet<crate::types::PeerId>,
916		update_local_storage: bool,
917	) {
918		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
919			record,
920			peers,
921			update_local_storage,
922		});
923	}
924
925	fn store_record(
926		&self,
927		key: KademliaKey,
928		value: Vec<u8>,
929		publisher: Option<crate::types::PeerId>,
930		expires: Option<Instant>,
931	) {
932		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
933			key,
934			value,
935			publisher.map(Into::into),
936			expires,
937		));
938	}
939
940	fn start_providing(&self, key: KademliaKey) {
941		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
942	}
943
944	fn stop_providing(&self, key: KademliaKey) {
945		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
946	}
947
948	fn get_providers(&self, key: KademliaKey) {
949		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
950	}
951}
952
953#[async_trait::async_trait]
954impl<B, H> NetworkStatusProvider for NetworkService<B, H>
955where
956	B: BlockT + 'static,
957	H: ExHashT,
958{
959	async fn status(&self) -> Result<NetworkStatus, ()> {
960		let (tx, rx) = oneshot::channel();
961
962		let _ = self
963			.to_worker
964			.unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
965
966		match rx.await {
967			Ok(v) => v.map_err(|_| ()),
968			// The channel can only be closed if the network worker no longer exists.
969			Err(_) => Err(()),
970		}
971	}
972
973	async fn network_state(&self) -> Result<NetworkState, ()> {
974		let (tx, rx) = oneshot::channel();
975
976		let _ = self
977			.to_worker
978			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
979
980		match rx.await {
981			Ok(v) => v.map_err(|_| ()),
982			// The channel can only be closed if the network worker no longer exists.
983			Err(_) => Err(()),
984		}
985	}
986}
987
#[async_trait::async_trait]
impl<B, H> NetworkPeers for NetworkService<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Replace the reserved peers of the sync protocol with the given set.
	fn set_authorized_peers(&self, peers: HashSet<crate::types::PeerId>) {
		self.sync_protocol_handle
			.set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
	}

	/// Toggle reserved-only mode on the sync protocol.
	fn set_authorized_only(&self, reserved_only: bool) {
		self.sync_protocol_handle.set_reserved_only(reserved_only);
	}

	/// Register an additional known address for a peer with the worker.
	fn add_known_address(
		&self,
		peer_id: crate::types::PeerId,
		addr: crate::types::multiaddr::Multiaddr,
	) {
		// Best-effort send: an error only means the worker has shut down.
		let _ = self
			.to_worker
			.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
	}

	/// Apply a reputation change to a peer via the peer store.
	fn report_peer(&self, peer_id: crate::types::PeerId, cost_benefit: ReputationChange) {
		self.peer_store_handle.report_peer(peer_id, cost_benefit);
	}

	/// Read a peer's current reputation from the peer store.
	fn peer_reputation(&self, peer_id: &crate::types::PeerId) -> i32 {
		self.peer_store_handle.peer_reputation(peer_id)
	}

	/// Ask the worker to disconnect the peer on the given notification protocol.
	fn disconnect_peer(&self, peer_id: crate::types::PeerId, protocol: ProtocolName) {
		let _ = self
			.to_worker
			.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
	}

	/// Allow non-reserved peers on the sync protocol.
	fn accept_unreserved_peers(&self) {
		self.sync_protocol_handle.set_reserved_only(false);
	}

	/// Deny non-reserved peers on the sync protocol.
	fn deny_unreserved_peers(&self) {
		self.sync_protocol_handle.set_reserved_only(true);
	}

	/// Add a single reserved peer (with address) to the sync protocol.
	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
		// Make sure the local peer ID is never added as a reserved peer.
		if peer.peer_id == crate::PeerId::from(&self.local_peer_id) {
			return Err("Local peer ID cannot be added as a reserved peer.".to_string());
		}

		// Teach the worker the peer's address first, then mark it reserved.
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
			peer.peer_id.into(),
			peer.multiaddr.into(),
		));
		self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());

		Ok(())
	}

	/// Remove a reserved peer from the sync protocol.
	fn remove_reserved_peer(&self, peer_id: crate::types::PeerId) {
		self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
	}

	/// Replace the reserved-peer set of an arbitrary notification protocol.
	///
	/// Returns an error if the protocol is unknown, an address fails to parse, or the
	/// local peer ID is among the addresses.
	fn set_reserved_peers(
		&self,
		protocol: ProtocolName,
		peers: HashSet<crate::types::multiaddr::Multiaddr>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol));
		};

		// Split each multiaddr into a peer ID and the remaining address.
		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
		let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;

		let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());

		for (peer_id, addr) in peers_addrs.into_iter() {
			// Make sure the local peer ID is never added to the PSM.
			if peer_id == self.local_peer_id {
				return Err("Local peer ID cannot be added as a reserved peer.".to_string());
			}

			peers.insert(peer_id.into());

			// Register the address with the worker only if one was actually supplied.
			if !addr.is_empty() {
				let _ = self
					.to_worker
					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
			}
		}

		self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);

		Ok(())
	}

	/// Add peers to the reserved set of an arbitrary notification protocol.
	///
	/// Same validation rules as [`Self::set_reserved_peers`], but existing reserved
	/// peers are kept.
	fn add_peers_to_reserved_set(
		&self,
		protocol: ProtocolName,
		peers: HashSet<crate::types::multiaddr::Multiaddr>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!(
				"Cannot add peers to reserved set of unknown protocol: {}",
				protocol
			));
		};

		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
		let peers = self.split_multiaddr_and_peer_id(peers)?;

		for (peer_id, addr) in peers.into_iter() {
			// Make sure the local peer ID is never added to the PSM.
			if peer_id == self.local_peer_id {
				return Err("Local peer ID cannot be added as a reserved peer.".to_string());
			}

			if !addr.is_empty() {
				let _ = self
					.to_worker
					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
			}

			self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
		}

		Ok(())
	}

	/// Remove peers from the reserved set of an arbitrary notification protocol.
	fn remove_peers_from_reserved_set(
		&self,
		protocol: ProtocolName,
		peers: Vec<crate::types::PeerId>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!(
				"Cannot remove peers from reserved set of unknown protocol: {}",
				protocol
			));
		};

		for peer_id in peers.into_iter() {
			self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
		}

		Ok(())
	}

	/// Number of peers connected on the sync protocol, as last written by the worker.
	fn sync_num_connected(&self) -> usize {
		self.num_connected.load(Ordering::Relaxed)
	}

	/// Determine a peer's role from its handshake, falling back to the peer store
	/// when the handshake does not decode as `Roles`.
	fn peer_role(&self, peer_id: crate::types::PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
		match Roles::decode_all(&mut &handshake[..]) {
			Ok(role) => Some(role.into()),
			Err(_) => {
				log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
				self.peer_store_handle.peer_role(&(peer_id.into()))
			},
		}
	}

	/// Get the list of reserved peers.
	///
	/// Returns an error if the `NetworkWorker` is no longer running.
	async fn reserved_peers(&self) -> Result<Vec<crate::types::PeerId>, ()> {
		let (tx, rx) = oneshot::channel();

		self.sync_protocol_handle.reserved_peers(tx);

		// The channel can only be closed if `ProtocolController` no longer exists.
		rx.await
			.map(|peers| peers.into_iter().map(From::from).collect())
			.map_err(|_| ())
	}
}
1168
1169impl<B, H> NetworkEventStream for NetworkService<B, H>
1170where
1171	B: BlockT + 'static,
1172	H: ExHashT,
1173{
1174	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1175		let (tx, rx) = out_events::channel(name, 100_000);
1176		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1177		Box::pin(rx)
1178	}
1179}
1180
1181#[async_trait::async_trait]
1182impl<B, H> NetworkRequest for NetworkService<B, H>
1183where
1184	B: BlockT + 'static,
1185	H: ExHashT,
1186{
1187	async fn request(
1188		&self,
1189		target: crate::types::PeerId,
1190		protocol: ProtocolName,
1191		request: Vec<u8>,
1192		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1193		connect: IfDisconnected,
1194	) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1195		let (tx, rx) = oneshot::channel();
1196
1197		self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1198
1199		match rx.await {
1200			Ok(v) => v,
1201			// The channel can only be closed if the network worker no longer exists. If the
1202			// network worker no longer exists, then all connections to `target` are necessarily
1203			// closed, and we legitimately report this situation as a "ConnectionClosed".
1204			Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1205		}
1206	}
1207
1208	fn start_request(
1209		&self,
1210		target: crate::types::PeerId,
1211		protocol: ProtocolName,
1212		request: Vec<u8>,
1213		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1214		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1215		connect: IfDisconnected,
1216	) {
1217		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1218			target: target.into(),
1219			protocol: protocol.into(),
1220			request,
1221			fallback_request,
1222			pending_response: tx,
1223			connect,
1224		});
1225	}
1226}
1227
/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol.
#[must_use]
pub struct NotificationSender {
	/// Sink used to reserve notification slots and to identify the remote peer.
	sink: NotificationsSink,

	/// Name of the protocol on the wire.
	protocol_name: ProtocolName,

	/// Field extracted from the [`Metrics`] struct and necessary to report the
	/// notifications-related metrics.
	notification_size_metric: Option<Histogram>,
}
1240
1241#[async_trait::async_trait]
1242impl NotificationSenderT for NotificationSender {
1243	async fn ready(
1244		&self,
1245	) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1246		Ok(Box::new(NotificationSenderReady {
1247			ready: match self.sink.reserve_notification().await {
1248				Ok(r) => Some(r),
1249				Err(()) => return Err(NotificationSenderError::Closed),
1250			},
1251			peer_id: self.sink.peer_id(),
1252			protocol_name: &self.protocol_name,
1253			notification_size_metric: self.notification_size_metric.clone(),
1254		}))
1255	}
1256}
1257
/// Reserved slot in the notifications buffer, ready to accept data.
#[must_use]
pub struct NotificationSenderReady<'a> {
	/// The reserved slot; `None` once it has been consumed by a `send` call.
	ready: Option<Ready<'a>>,

	/// Target of the notification.
	peer_id: &'a PeerId,

	/// Name of the protocol on the wire.
	protocol_name: &'a ProtocolName,

	/// Field extracted from the [`Metrics`] struct and necessary to report the
	/// notifications-related metrics.
	notification_size_metric: Option<Histogram>,
}
1273
1274impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1275	fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1276		if let Some(notification_size_metric) = &self.notification_size_metric {
1277			notification_size_metric.observe(notification.len() as f64);
1278		}
1279
1280		trace!(
1281			target: LOG_TARGET,
1282			"External API => Notification({:?}, {}, {} bytes)",
1283			self.peer_id, self.protocol_name, notification.len(),
1284		);
1285		trace!(target: LOG_TARGET, "Handler({:?}) <= Async notification", self.peer_id);
1286
1287		self.ready
1288			.take()
1289			.ok_or(NotificationSenderError::Closed)?
1290			.send(notification)
1291			.map_err(|()| NotificationSenderError::Closed)
1292	}
1293}
1294
/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg {
	/// Find the peers closest to the given peer ID in the DHT.
	FindClosestPeers(PeerId),
	/// Fetch a value from the DHT.
	GetValue(KademliaKey),
	/// Store a value in the DHT.
	PutValue(KademliaKey, Vec<u8>),
	/// Put a record directly to the given peers, optionally updating local storage.
	PutRecordTo {
		record: Record,
		peers: HashSet<crate::types::PeerId>,
		update_local_storage: bool,
	},
	/// Store a record (key, value, optional publisher, optional expiry).
	StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
	/// Start providing the key on the DHT.
	StartProviding(KademliaKey),
	/// Stop providing the key on the DHT.
	StopProviding(KademliaKey),
	/// Look up providers of the key in the DHT.
	GetProviders(KademliaKey),
	/// Register an additional known address for a peer.
	AddKnownAddress(PeerId, Multiaddr),
	/// Register a new sink for network events.
	EventStream(out_events::Sender),
	/// Issue a request-response call to `target`, answering on `pending_response`.
	Request {
		target: PeerId,
		protocol: ProtocolName,
		request: Vec<u8>,
		fallback_request: Option<(Vec<u8>, ProtocolName)>,
		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
		connect: IfDisconnected,
	},
	/// Request a high-level status report of the network.
	NetworkStatus {
		pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
	},
	/// Request a detailed dump of the current network state.
	NetworkState {
		pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
	},
	/// Disconnect the peer on the given notification protocol.
	DisconnectPeer(PeerId, ProtocolName),
}
1329
/// Main network worker. Must be polled in order for the network to advance.
///
/// You are encouraged to poll this in a separate background thread or task.
#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
pub struct NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Addresses the swarm is currently listening on.
	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Number of peers connected on the sync protocol.
	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
	num_connected: Arc<AtomicUsize>,
	/// The network service that can be extracted and shared through the codebase.
	service: Arc<NetworkService<B, H>>,
	/// The *actual* network.
	network_service: Swarm<Behaviour<B>>,
	/// Messages from the [`NetworkService`] that must be processed.
	from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
	/// Senders for events that happen on the network.
	event_streams: out_events::OutChannels,
	/// Prometheus network metrics.
	metrics: Option<Metrics>,
	/// The `PeerId`'s of all boot nodes mapped to the registered addresses.
	boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
	/// Boot nodes that we already have reported as invalid, so the warning is only
	/// logged once per peer.
	reported_invalid_boot_nodes: HashSet<PeerId>,
	/// Peer reputation store handle.
	peer_store_handle: Arc<dyn PeerStoreProvider>,
	/// Notification protocol handles.
	notif_protocol_handles: Vec<protocol::ProtocolHandle>,
	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
	/// compatibility.
	_marker: PhantomData<H>,
	/// Marker for block type
	_block: PhantomData<B>,
}
1367
1368impl<B, H> NetworkWorker<B, H>
1369where
1370	B: BlockT + 'static,
1371	H: ExHashT,
1372{
1373	/// Run the network.
1374	pub async fn run(mut self) {
1375		while self.next_action().await {}
1376	}
1377
	/// Perform one action on the network.
	///
	/// Returns `false` when the worker should be shutdown.
	/// Use in tests only.
	pub async fn next_action(&mut self) -> bool {
		futures::select! {
			// Next message from the service.
			msg = self.from_service.next() => {
				if let Some(msg) = msg {
					self.handle_worker_message(msg);
				} else {
					// All `NetworkService` senders were dropped: signal shutdown.
					return false
				}
			},
			// Next event from `Swarm` (the stream guaranteed to never terminate).
			event = self.network_service.select_next_some() => {
				self.handle_swarm_event(event);
			},
		};

		// Update the `num_connected` count shared with the `NetworkService`.
		let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
		self.num_connected.store(num_connected_peers, Ordering::Relaxed);

		// Refresh the Kademlia and connection gauges; each read is skipped when the
		// behaviour has no data to report.
		if let Some(metrics) = self.metrics.as_ref() {
			if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
				for (lower_ilog2_bucket_bound, num_entries) in buckets {
					metrics
						.kbuckets_num_nodes
						.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
						.set(num_entries as u64);
				}
			}
			if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
				metrics.kademlia_records_count.set(num_entries as u64);
			}
			if let Some(num_entries) =
				self.network_service.behaviour_mut().kademlia_records_total_size()
			{
				metrics.kademlia_records_sizes_total.set(num_entries as u64);
			}

			metrics.pending_connections.set(
				Swarm::network_info(&self.network_service).connection_counters().num_pending()
					as u64,
			);
		}

		true
	}
1428
	/// Process the next message coming from the `NetworkService`.
	///
	/// Each variant corresponds to one public method of the service; most of them are
	/// forwarded to the swarm behaviour, the status/state queries are answered here.
	fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
		match msg {
			// DHT queries and record operations go straight to the behaviour.
			ServiceToWorkerMsg::FindClosestPeers(target) => {
				self.network_service.behaviour_mut().find_closest_peers(target)
			},
			ServiceToWorkerMsg::GetValue(key) => {
				self.network_service.behaviour_mut().get_value(key.into())
			},
			ServiceToWorkerMsg::PutValue(key, value) => {
				self.network_service.behaviour_mut().put_value(key.into(), value)
			},
			ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
				.network_service
				.behaviour_mut()
				.put_record_to(record.into(), peers, update_local_storage),
			ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
				.network_service
				.behaviour_mut()
				.store_record(key.into(), value, publisher, expires),
			ServiceToWorkerMsg::StartProviding(key) => {
				self.network_service.behaviour_mut().start_providing(key.into())
			},
			ServiceToWorkerMsg::StopProviding(key) => {
				self.network_service.behaviour_mut().stop_providing(&key.into())
			},
			ServiceToWorkerMsg::GetProviders(key) => {
				self.network_service.behaviour_mut().get_providers(key.into())
			},
			ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => {
				self.network_service.behaviour_mut().add_known_address(peer_id, addr)
			},
			// Event subscriptions are stored locally by the worker.
			ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
			ServiceToWorkerMsg::Request {
				target,
				protocol,
				request,
				fallback_request,
				pending_response,
				connect,
			} => {
				self.network_service.behaviour_mut().send_request(
					&target,
					protocol,
					request,
					fallback_request,
					pending_response,
					connect,
				);
			},
			// Status/state reports are computed here and sent back directly; a failed
			// send only means the requester dropped its receiver.
			ServiceToWorkerMsg::NetworkStatus { pending_response } => {
				let _ = pending_response.send(Ok(self.status()));
			},
			ServiceToWorkerMsg::NetworkState { pending_response } => {
				let _ = pending_response.send(Ok(self.network_state()));
			},
			ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
				.network_service
				.behaviour_mut()
				.user_protocol_mut()
				.disconnect_peer(&who, protocol_name),
		}
	}
1492
1493	/// Process the next event coming from `Swarm`.
1494	fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut>) {
1495		match event {
1496			SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1497				if let Some(metrics) = self.metrics.as_ref() {
1498					match result {
1499						Ok(serve_time) => {
1500							metrics
1501								.requests_in_success_total
1502								.with_label_values(&[&protocol])
1503								.observe(serve_time.as_secs_f64());
1504						},
1505						Err(err) => {
1506							let reason = match err {
1507								ResponseFailure::Network(InboundFailure::Timeout) => {
1508									Some("timeout")
1509								},
1510								ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1511								// `UnsupportedProtocols` is reported for every single
1512								// inbound request whenever a request with an unsupported
1513								// protocol is received. This is not reported in order to
1514								// avoid confusions.
1515								{
1516									None
1517								},
1518								ResponseFailure::Network(InboundFailure::ResponseOmission) => {
1519									Some("busy-omitted")
1520								},
1521								ResponseFailure::Network(InboundFailure::ConnectionClosed) => {
1522									Some("connection-closed")
1523								},
1524								ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"),
1525							};
1526
1527							if let Some(reason) = reason {
1528								metrics
1529									.requests_in_failure_total
1530									.with_label_values(&[&protocol, reason])
1531									.inc();
1532							}
1533						},
1534					}
1535				}
1536			},
1537			SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1538				protocol,
1539				duration,
1540				result,
1541				..
1542			}) => {
1543				if let Some(metrics) = self.metrics.as_ref() {
1544					match result {
1545						Ok(_) => {
1546							metrics
1547								.requests_out_success_total
1548								.with_label_values(&[&protocol])
1549								.observe(duration.as_secs_f64());
1550						},
1551						Err(err) => {
1552							let reason = match err {
1553								RequestFailure::NotConnected => "not-connected",
1554								RequestFailure::UnknownProtocol => "unknown-protocol",
1555								RequestFailure::Refused => "refused",
1556								RequestFailure::Obsolete => "obsolete",
1557								RequestFailure::Network(OutboundFailure::DialFailure) => {
1558									"dial-failure"
1559								},
1560								RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1561								RequestFailure::Network(OutboundFailure::ConnectionClosed) => {
1562									"connection-closed"
1563								},
1564								RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1565									"unsupported"
1566								},
1567								RequestFailure::Network(OutboundFailure::Io(_)) => "io",
1568							};
1569
1570							metrics
1571								.requests_out_failure_total
1572								.with_label_values(&[&protocol, reason])
1573								.inc();
1574						},
1575					}
1576				}
1577			},
1578			SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1579				for change in changes {
1580					self.peer_store_handle.report_peer(peer.into(), change);
1581				}
1582			},
1583			SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1584				peer_id,
1585				info:
1586					IdentifyInfo {
1587						protocol_version, agent_version, mut listen_addrs, protocols, ..
1588					},
1589			}) => {
1590				if listen_addrs.len() > 30 {
1591					debug!(
1592						target: LOG_TARGET,
1593						"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1594						peer_id, protocol_version, agent_version
1595					);
1596					listen_addrs.truncate(30);
1597				}
1598				for addr in listen_addrs {
1599					self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1600						&peer_id,
1601						&protocols,
1602						addr.clone(),
1603					);
1604				}
1605				self.peer_store_handle.add_known_peer(peer_id.into());
1606			},
1607			SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1608				self.peer_store_handle.add_known_peer(peer_id.into());
1609			},
1610			SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1611				if let Some(metrics) = self.metrics.as_ref() {
1612					metrics.kademlia_random_queries_total.inc();
1613				}
1614			},
1615			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1616				remote,
1617				set_id,
1618				direction,
1619				negotiated_fallback,
1620				notifications_sink,
1621				received_handshake,
1622			}) => {
1623				let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1624					remote,
1625					direction,
1626					received_handshake,
1627					negotiated_fallback,
1628					notifications_sink,
1629				);
1630			},
1631			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1632				remote,
1633				set_id,
1634				notifications_sink,
1635			}) => {
1636				let _ = self.notif_protocol_handles[usize::from(set_id)]
1637					.report_notification_sink_replaced(remote, notifications_sink);
1638
1639				// TODO: Notifications might have been lost as a result of the previous
1640				// connection being dropped, and as a result it would be preferable to notify
1641				// the users of this fact by simulating the substream being closed then
1642				// reopened.
1643				// The code below doesn't compile because `role` is unknown. Propagating the
1644				// handshake of the secondary connections is quite an invasive change and
1645				// would conflict with https://github.com/paritytech/substrate/issues/6403.
1646				// Considering that dropping notifications is generally regarded as
1647				// acceptable, this bug is at the moment intentionally left there and is
1648				// intended to be fixed at the same time as
1649				// https://github.com/paritytech/substrate/issues/6403.
1650				// self.event_streams.send(Event::NotificationStreamClosed {
1651				// remote,
1652				// protocol,
1653				// });
1654				// self.event_streams.send(Event::NotificationStreamOpened {
1655				// remote,
1656				// protocol,
1657				// role,
1658				// });
1659			},
1660			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1661				let _ = self.notif_protocol_handles[usize::from(set_id)]
1662					.report_substream_closed(remote);
1663			},
1664			SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1665				remote,
1666				set_id,
1667				notification,
1668			}) => {
1669				let _ = self.notif_protocol_handles[usize::from(set_id)]
1670					.report_notification_received(remote, notification);
1671			},
1672			SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1673				match (self.metrics.as_ref(), duration) {
1674					(Some(metrics), Some(duration)) => {
1675						let query_type = match event {
1676							DhtEvent::ClosestPeersFound(_, _) => "peers-found",
1677							DhtEvent::ClosestPeersNotFound(_) => "peers-not-found",
1678							DhtEvent::ValueFound(_) => "value-found",
1679							DhtEvent::ValueNotFound(_) => "value-not-found",
1680							DhtEvent::ValuePut(_) => "value-put",
1681							DhtEvent::ValuePutFailed(_) => "value-put-failed",
1682							DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1683							DhtEvent::StartedProviding(_) => "started-providing",
1684							DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
1685							DhtEvent::ProvidersFound(_, _) => "providers-found",
1686							DhtEvent::NoMoreProviders(_) => "no-more-providers",
1687							DhtEvent::ProvidersNotFound(_) => "providers-not-found",
1688						};
1689						metrics
1690							.kademlia_query_duration
1691							.with_label_values(&[query_type])
1692							.observe(duration.as_secs_f64());
1693					},
1694					_ => {},
1695				}
1696
1697				self.event_streams.send(Event::Dht(event));
1698			},
1699			SwarmEvent::Behaviour(BehaviourOut::None) => {
1700				// Ignored event from lower layers.
1701			},
1702			SwarmEvent::ConnectionEstablished {
1703				peer_id,
1704				endpoint,
1705				num_established,
1706				concurrent_dial_errors,
1707				..
1708			} => {
1709				if let Some(errors) = concurrent_dial_errors {
1710					debug!(target: LOG_TARGET, "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1711				} else {
1712					debug!(target: LOG_TARGET, "Libp2p => Connected({:?})", peer_id);
1713				}
1714
1715				if let Some(metrics) = self.metrics.as_ref() {
1716					let direction = match endpoint {
1717						ConnectedPoint::Dialer { .. } => "out",
1718						ConnectedPoint::Listener { .. } => "in",
1719					};
1720					metrics.connections_opened_total.with_label_values(&[direction]).inc();
1721
1722					if num_established.get() == 1 {
1723						metrics.distinct_peers_connections_opened_total.inc();
1724					}
1725				}
1726			},
1727			SwarmEvent::ConnectionClosed {
1728				connection_id,
1729				peer_id,
1730				cause,
1731				endpoint,
1732				num_established,
1733			} => {
1734				debug!(target: LOG_TARGET, "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1735				if let Some(metrics) = self.metrics.as_ref() {
1736					let direction = match endpoint {
1737						ConnectedPoint::Dialer { .. } => "out",
1738						ConnectedPoint::Listener { .. } => "in",
1739					};
1740					let reason = match cause {
1741						Some(ConnectionError::IO(_)) => "transport-error",
1742						Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1743						None => "actively-closed",
1744					};
1745					metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1746
1747					// `num_established` represents the number of *remaining* connections.
1748					if num_established == 0 {
1749						metrics.distinct_peers_connections_closed_total.inc();
1750					}
1751				}
1752			},
1753			SwarmEvent::NewListenAddr { address, .. } => {
1754				trace!(target: LOG_TARGET, "Libp2p => NewListenAddr({})", address);
1755				if let Some(metrics) = self.metrics.as_ref() {
1756					metrics.listeners_local_addresses.inc();
1757				}
1758				self.listen_addresses.lock().insert(address.clone());
1759			},
1760			SwarmEvent::ExpiredListenAddr { address, .. } => {
1761				info!(target: LOG_TARGET, "📪 No longer listening on {}", address);
1762				if let Some(metrics) = self.metrics.as_ref() {
1763					metrics.listeners_local_addresses.dec();
1764				}
1765				self.listen_addresses.lock().remove(&address);
1766			},
1767			SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1768				if let Some(peer_id) = peer_id {
1769					trace!(
1770						target: LOG_TARGET,
1771						"Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1772					);
1773
1774					let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1775
1776					if let Some(addresses) =
1777						not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1778					{
1779						if let DialError::WrongPeerId { obtained, endpoint } = &error {
1780							if let ConnectedPoint::Dialer {
1781								address,
1782								role_override: _,
1783								port_use: _,
1784							} = endpoint
1785							{
1786								let address_without_peer_id = parse_addr(address.clone().into())
1787									.map_or_else(|_| address.clone(), |r| r.1.into());
1788
1789								// Only report for address of boot node that was added at startup of
1790								// the node and not for any address that the node learned of the
1791								// boot node.
1792								if addresses.iter().any(|a| address_without_peer_id == *a) {
1793									warn!(
1794										"💔 The bootnode you want to connect to at `{address}` provided a \
1795										 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1796									);
1797
1798									self.reported_invalid_boot_nodes.insert(peer_id);
1799								}
1800							}
1801						}
1802					}
1803				}
1804
1805				if let Some(metrics) = self.metrics.as_ref() {
1806					let reason = match error {
1807						DialError::Denied { cause } => {
1808							if cause.downcast::<Exceeded>().is_ok() {
1809								Some("limit-reached")
1810							} else {
1811								None
1812							}
1813						},
1814						DialError::LocalPeerId { .. } => Some("local-peer-id"),
1815						DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1816						DialError::Transport(_) => Some("transport-error"),
1817						DialError::NoAddresses
1818						| DialError::DialPeerConditionFalse(_)
1819						| DialError::Aborted => None, // ignore them
1820					};
1821					if let Some(reason) = reason {
1822						metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1823					}
1824				}
1825			},
1826			SwarmEvent::Dialing { connection_id, peer_id } => {
1827				trace!(target: LOG_TARGET, "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1828			},
1829			SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1830				trace!(target: LOG_TARGET, "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1831				if let Some(metrics) = self.metrics.as_ref() {
1832					metrics.incoming_connections_total.inc();
1833				}
1834			},
1835			SwarmEvent::IncomingConnectionError {
1836				connection_id,
1837				local_addr,
1838				send_back_addr,
1839				error,
1840			} => {
1841				debug!(
1842					target: LOG_TARGET,
1843					"Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1844				);
1845				if let Some(metrics) = self.metrics.as_ref() {
1846					let reason = match error {
1847						ListenError::Denied { cause } => {
1848							if cause.downcast::<Exceeded>().is_ok() {
1849								Some("limit-reached")
1850							} else {
1851								None
1852							}
1853						},
1854						ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => {
1855							Some("invalid-peer-id")
1856						},
1857						ListenError::Transport(_) => Some("transport-error"),
1858						ListenError::Aborted => None, // ignore it
1859					};
1860
1861					if let Some(reason) = reason {
1862						metrics
1863							.incoming_connections_errors_total
1864							.with_label_values(&[reason])
1865							.inc();
1866					}
1867				}
1868			},
1869			SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1870				if let Some(metrics) = self.metrics.as_ref() {
1871					metrics.listeners_local_addresses.sub(addresses.len() as u64);
1872				}
1873				let mut listen_addresses = self.listen_addresses.lock();
1874				for addr in &addresses {
1875					listen_addresses.remove(addr);
1876				}
1877				drop(listen_addresses);
1878
1879				let addrs =
1880					addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1881				match reason {
1882					Ok(()) => error!(
1883						target: LOG_TARGET,
1884						"📪 Libp2p listener ({}) closed gracefully",
1885						addrs
1886					),
1887					Err(e) => error!(
1888						target: LOG_TARGET,
1889						"📪 Libp2p listener ({}) closed: {}",
1890						addrs, e
1891					),
1892				}
1893			},
1894			SwarmEvent::ListenerError { error, .. } => {
1895				debug!(target: LOG_TARGET, "Libp2p => ListenerError: {}", error);
1896				if let Some(metrics) = self.metrics.as_ref() {
1897					metrics.listeners_errors_total.inc();
1898				}
1899			},
1900			SwarmEvent::NewExternalAddrCandidate { address } => {
1901				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrCandidate: {address:?}");
1902			},
1903			SwarmEvent::ExternalAddrConfirmed { address } => {
1904				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrConfirmed: {address:?}");
1905			},
1906			SwarmEvent::ExternalAddrExpired { address } => {
1907				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrExpired: {address:?}");
1908			},
1909			SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
1910				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}")
1911			},
1912			event => {
1913				warn!(target: LOG_TARGET, "New unknown SwarmEvent libp2p event: {event:?}");
1914			},
1915		}
1916	}
1917}
1918
1919impl<B, H> Unpin for NetworkWorker<B, H>
1920where
1921	B: BlockT + 'static,
1922	H: ExHashT,
1923{
1924}
1925
1926pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1927	addresses: impl Iterator<Item = &'a crate::types::multiaddr::Multiaddr>,
1928	transport: &TransportConfig,
1929) -> Result<(), Error> {
1930	use crate::types::multiaddr::Protocol;
1931
1932	if matches!(transport, TransportConfig::MemoryOnly) {
1933		let addresses: Vec<_> = addresses
1934			.filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1935			.cloned()
1936			.collect();
1937
1938		if !addresses.is_empty() {
1939			return Err(Error::AddressesForAnotherTransport {
1940				transport: transport.clone(),
1941				addresses,
1942			});
1943		}
1944	} else {
1945		let addresses: Vec<_> = addresses
1946			.filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1947			.cloned()
1948			.collect();
1949
1950		if !addresses.is_empty() {
1951			return Err(Error::AddressesForAnotherTransport {
1952				transport: transport.clone(),
1953				addresses,
1954			});
1955		}
1956	}
1957
1958	Ok(())
1959}