// pezsc_network/service.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Main entry point of the sc-network crate.
20//!
21//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
22//! The [`NetworkWorker`] *is* the network. Network is driven by [`NetworkWorker::run`] future that
23//! terminates only when all instances of the control handles [`NetworkService`] were dropped.
24//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
25//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
26//!
27//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
28//! which is then processed by [`NetworkWorker::next_action`].
29
30use crate::{
31	behaviour::{self, Behaviour, BehaviourOut},
32	bitswap::BitswapRequestHandler,
33	config::{
34		parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
35		NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
36	},
37	discovery::DiscoveryConfig,
38	error::Error,
39	event::{DhtEvent, Event},
40	network_state::{
41		NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
42	},
43	peer_store::{PeerStore, PeerStoreProvider},
44	protocol::{self, Protocol, Ready},
45	protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
46	request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
47	service::{
48		signature::{Signature, SigningError},
49		traits::{
50			BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
51			NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
52			NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
53			NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
54		},
55	},
56	transport,
57	types::ProtocolName,
58	NotificationService, ReputationChange,
59};
60
61use codec::DecodeAll;
62use futures::{channel::oneshot, prelude::*};
63use libp2p::{
64	connection_limits::{ConnectionLimits, Exceeded},
65	core::{upgrade, ConnectedPoint, Endpoint},
66	identify::Info as IdentifyInfo,
67	identity::ed25519,
68	multiaddr::{self, Multiaddr},
69	swarm::{
70		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
71		NetworkBehaviour, Swarm, SwarmEvent,
72	},
73	PeerId,
74};
75use log::{debug, error, info, trace, warn};
76use metrics::{Histogram, MetricSources, Metrics};
77use parking_lot::Mutex;
78use pezsc_network_types::kad::{Key as KademliaKey, Record};
79use prometheus_endpoint::Registry;
80
81use pezsc_client_api::BlockBackend;
82use pezsc_network_common::{
83	role::{ObservedRole, Roles},
84	ExHashT,
85};
86use pezsc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
87use pezsp_runtime::traits::Block as BlockT;
88
89pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
90pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
91pub use metrics::NotificationMetrics;
92pub use protocol::NotificationsSink;
93use std::{
94	collections::{HashMap, HashSet},
95	fs, iter,
96	marker::PhantomData,
97	num::NonZeroUsize,
98	pin::Pin,
99	str,
100	sync::{
101		atomic::{AtomicUsize, Ordering},
102		Arc,
103	},
104	time::{Duration, Instant},
105};
106
// Prometheus metrics for the network service.
pub(crate) mod metrics;
// Output event channels used to fan network events out to subscribers.
pub(crate) mod out_events;

pub mod signature;
pub mod traits;

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p";
115
116/// Stub bandwidth sink that returns 0 for all metrics.
117/// Bandwidth logging was removed in libp2p 0.56.0.
118/// TODO: Implement custom bandwidth tracking if needed.
119struct NoBandwidthSink;
120
121impl BandwidthSink for NoBandwidthSink {
122	fn total_inbound(&self) -> u64 {
123		0
124	}
125
126	fn total_outbound(&self) -> u64 {
127		0
128	}
129}
130
/// Bizinikiwi network service. Handles network IO and manages connectivity.
///
/// This is the cheaply-clonable handle side of the network: all methods forward
/// work to the [`NetworkWorker`] over the `to_worker` channel.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
	/// Number of peers we're connected to.
	num_connected: Arc<AtomicUsize>,
	/// The local external addresses.
	external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Local copy of the `PeerId` of the local node.
	local_peer_id: PeerId,
	/// The `KeyPair` that defines the `PeerId` of the local node.
	local_identity: Keypair,
	/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
	bandwidth: Arc<dyn BandwidthSink>,
	/// Channel that sends messages to the actual worker.
	to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
	/// Protocol name -> `SetId` mapping for notification protocols. The map never changes after
	/// initialization.
	notification_protocol_ids: HashMap<ProtocolName, SetId>,
	/// Handles to manage peer connections on notification protocols. The vector never changes
	/// after initialization.
	protocol_handles: Vec<protocol_controller::ProtocolHandle>,
	/// Shortcut to sync protocol handle (`protocol_handles[0]`).
	sync_protocol_handle: protocol_controller::ProtocolHandle,
	/// Handle to `PeerStore`.
	peer_store_handle: Arc<dyn PeerStoreProvider>,
	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
	/// compatibility.
	_marker: PhantomData<H>,
	/// Marker to pin the `B` (block) generic.
	_block: PhantomData<B>,
}
163
#[async_trait::async_trait]
impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	type NotificationProtocolConfig = NonDefaultSetConfig;
	type RequestResponseProtocolConfig = RequestResponseConfig;
	// NOTE(review): the GAT parameters `Block`/`Hash` are intentionally unused; the
	// service is always keyed by the impl's own `B`/`H`.
	type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
	type PeerStore = PeerStore;
	type BitswapConfig = RequestResponseConfig;

	/// Create a new libp2p-backed network backend from `params`.
	fn new(params: Params<B, H, Self>) -> Result<Self, Error>
	where
		Self: Sized,
	{
		NetworkWorker::new(params)
	}

	/// Get handle to `NetworkService` of the `NetworkBackend`.
	fn network_service(&self) -> Arc<dyn NetworkServiceT> {
		self.service.clone()
	}

	/// Create `PeerStore`.
	fn peer_store(
		bootnodes: Vec<pezsc_network_types::PeerId>,
		metrics_registry: Option<Registry>,
	) -> Self::PeerStore {
		// Bootnode ids are converted into the `PeerStore`'s own peer-id type.
		PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
	}

	/// Register notification metrics against `registry` (no-op when `None`).
	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
		NotificationMetrics::new(registry)
	}

	/// Build the bitswap server: a spawnable future plus its request-response config.
	fn bitswap_server(
		client: Arc<dyn BlockBackend<B> + Send + Sync>,
	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
		let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());

		(Box::pin(async move { handler.run().await }), protocol_config)
	}

	/// Create notification protocol configuration.
	fn notification_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_notification_size: u64,
		handshake: Option<NotificationHandshake>,
		set_config: SetConfig,
		_metrics: NotificationMetrics,
		_peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
		// Metrics and peer-store handle are unused by this backend's config type.
		NonDefaultSetConfig::new(
			protocol_name,
			fallback_names,
			max_notification_size,
			handshake,
			set_config,
		)
	}

	/// Create request-response protocol configuration.
	fn request_response_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_request_size: u64,
		max_response_size: u64,
		request_timeout: Duration,
		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	) -> Self::RequestResponseProtocolConfig {
		Self::RequestResponseProtocolConfig {
			name: protocol_name,
			fallback_names,
			max_request_size,
			max_response_size,
			request_timeout,
			inbound_queue,
		}
	}

	/// Start [`NetworkBackend`] event loop.
	///
	/// NOTE(review): this resolves to the *inherent* `NetworkWorker::run` (inherent
	/// methods take precedence over trait methods in method-call syntax) — confirm
	/// the inherent method exists elsewhere in this file, otherwise this recurses.
	async fn run(mut self) {
		self.run().await
	}
}
251
252impl<B, H> NetworkWorker<B, H>
253where
254	B: BlockT + 'static,
255	H: ExHashT,
256{
	/// Creates the network service.
	///
	/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
	/// for the network processing to advance. From it, you can extract a `NetworkService` using
	/// `worker.service()`. The `NetworkService` can be shared through the codebase.
	pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
		let peer_store_handle = params.network_config.peer_store_handle();
		let FullNetworkConfiguration {
			notification_protocols,
			request_response_protocols,
			mut network_config,
			..
		} = params.network_config;

		// Private and public keys configuration.
		let local_identity = network_config.node_key.clone().into_keypair()?;
		let local_public = local_identity.public();
		let local_peer_id = local_public.to_peer_id();

		// Convert to libp2p types.
		let local_identity: ed25519::Keypair = local_identity.into();
		let local_public: ed25519::PublicKey = local_public.into();
		let local_peer_id: PeerId = local_peer_id.into();

		// Drop our own peer id from bootnodes and reserved nodes: dialing ourselves
		// is never useful.
		network_config.boot_nodes = network_config
			.boot_nodes
			.into_iter()
			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
			.collect();
		network_config.default_peers_set.reserved_nodes = network_config
			.default_peers_set
			.reserved_nodes
			.into_iter()
			.filter(|reserved_node| {
				if reserved_node.peer_id == local_peer_id.into() {
					warn!(
						target: LOG_TARGET,
						"Local peer ID used in reserved node, ignoring: {}",
						reserved_node,
					);
					false
				} else {
					true
				}
			})
			.collect();

		// Ensure the listen addresses are consistent with the transport.
		ensure_addresses_consistent_with_transport(
			network_config.listen_addresses.iter(),
			&network_config.transport,
		)?;
		ensure_addresses_consistent_with_transport(
			network_config.boot_nodes.iter().map(|x| &x.multiaddr),
			&network_config.transport,
		)?;
		ensure_addresses_consistent_with_transport(
			network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
			&network_config.transport,
		)?;
		for notification_protocol in &notification_protocols {
			ensure_addresses_consistent_with_transport(
				notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
				&network_config.transport,
			)?;
		}
		ensure_addresses_consistent_with_transport(
			network_config.public_addresses.iter(),
			&network_config.transport,
		)?;

		// Channel the `NetworkService` handles use to reach this worker.
		let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);

		if let Some(path) = &network_config.net_config_path {
			fs::create_dir_all(path)?;
		}

		info!(
			target: LOG_TARGET,
			"🏷  Local node identity is: {}",
			local_peer_id.to_base58(),
		);
		info!(target: LOG_TARGET, "Running libp2p network backend");

		let transport = {
			let config_mem = match network_config.transport {
				TransportConfig::MemoryOnly => true,
				TransportConfig::Normal { .. } => false,
			};

			transport::build_transport(local_identity.clone().into(), config_mem)
		};

		let (to_notifications, from_protocol_controllers) =
			tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);

		// We must prepend a hardcoded default peer set to notification protocols.
		let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
			.chain(notification_protocols.iter().map(|protocol| protocol.set_config()));

		// One `ProtocolController` (and its handle) per peer set, indexed by `SetId`.
		let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
			.enumerate()
			.map(|(set_id, set_config)| {
				let proto_set_config = ProtoSetConfig {
					in_peers: set_config.in_peers,
					out_peers: set_config.out_peers,
					reserved_nodes: set_config
						.reserved_nodes
						.iter()
						.map(|node| node.peer_id.into())
						.collect(),
					reserved_only: set_config.non_reserved_mode.is_reserved_only(),
				};

				ProtocolController::new(
					SetId::from(set_id),
					proto_set_config,
					to_notifications.clone(),
					Arc::clone(&peer_store_handle),
				)
			})
			.unzip();

		// Shortcut to default (sync) peer set protocol handle.
		let sync_protocol_handle = protocol_handles[0].clone();

		// Spawn `ProtocolController` runners.
		protocol_controllers
			.into_iter()
			.for_each(|controller| (params.executor)(controller.run().boxed()));

		// Protocol name to protocol id mapping. The first protocol is always block announce (sync)
		// protocol, aka default (hardcoded) peer set.
		let notification_protocol_ids: HashMap<ProtocolName, SetId> =
			iter::once(&params.block_announce_config)
				.chain(notification_protocols.iter())
				.enumerate()
				.map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
				.collect();

		let known_addresses = {
			// Collect all reserved nodes and bootnodes addresses.
			let mut addresses: Vec<_> = network_config
				.default_peers_set
				.reserved_nodes
				.iter()
				.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
				.chain(notification_protocols.iter().flat_map(|protocol| {
					protocol
						.set_config()
						.reserved_nodes
						.iter()
						.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
				}))
				.chain(
					network_config
						.boot_nodes
						.iter()
						.map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
				)
				.collect();

			// Remove possible duplicates.
			addresses.sort();
			addresses.dedup();

			addresses
		};

		// Check for duplicate bootnodes: the same address registered under two
		// different peer ids is a configuration error.
		network_config.boot_nodes.iter().try_for_each(|bootnode| {
			if let Some(other) = network_config
				.boot_nodes
				.iter()
				.filter(|o| o.multiaddr == bootnode.multiaddr)
				.find(|o| o.peer_id != bootnode.peer_id)
			{
				Err(Error::DuplicateBootnode {
					address: bootnode.multiaddr.clone().into(),
					first_id: bootnode.peer_id.into(),
					second_id: other.peer_id.into(),
				})
			} else {
				Ok(())
			}
		})?;

		// List of bootnode multiaddresses, grouped by peer id.
		let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();

		for bootnode in network_config.boot_nodes.iter() {
			boot_node_ids
				.entry(bootnode.peer_id.into())
				.or_default()
				.push(bootnode.multiaddr.clone().into());
		}

		let boot_node_ids = Arc::new(boot_node_ids);

		let num_connected = Arc::new(AtomicUsize::new(0));
		let external_addresses = Arc::new(Mutex::new(HashSet::new()));

		let (protocol, notif_protocol_handles) = Protocol::new(
			From::from(&params.role),
			params.notification_metrics,
			notification_protocols,
			params.block_announce_config,
			Arc::clone(&peer_store_handle),
			protocol_handles.clone(),
			from_protocol_controllers,
		)?;

		// Build the swarm.
		let mut swarm = {
			let user_agent =
				format!("{} ({})", network_config.client_version, network_config.node_name);

			let discovery_config = {
				let mut config = DiscoveryConfig::new(local_peer_id);
				config.with_permanent_addresses(
					known_addresses
						.iter()
						.map(|(peer, address)| (peer.into(), address.clone().into()))
						.collect::<Vec<_>>(),
				);
				// Allow slightly more discovered peers than outbound slots.
				config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
				config.with_kademlia(
					params.genesis_hash,
					params.fork_id.as_deref(),
					&params.protocol_id,
				);
				config.with_dht_random_walk(network_config.enable_dht_random_walk);
				config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
				config.use_kademlia_disjoint_query_paths(
					network_config.kademlia_disjoint_query_paths,
				);
				config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);

				match network_config.transport {
					TransportConfig::MemoryOnly => {
						// Memory transport cannot reach the LAN; disable mDNS and private IPs.
						config.with_mdns(false);
						config.allow_private_ip(false);
					},
					TransportConfig::Normal {
						enable_mdns,
						allow_private_ip: allow_private_ipv4,
						..
					} => {
						config.with_mdns(enable_mdns);
						config.allow_private_ip(allow_private_ipv4);
					},
				}

				config
			};

			let behaviour = {
				let result = Behaviour::new(
					protocol,
					user_agent,
					local_public.into(),
					discovery_config,
					request_response_protocols,
					Arc::clone(&peer_store_handle),
					external_addresses.clone(),
					network_config.public_addresses.iter().cloned().map(Into::into).collect(),
					ConnectionLimits::default()
						.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
						.with_max_established_incoming(Some(
							crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
						)),
				);

				match result {
					Ok(b) => b,
					Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
						return Err(Error::DuplicateRequestResponseProtocol { protocol: proto })
					},
				}
			};

			let swarm = {
				// Adapter so the caller-supplied executor can spawn libp2p's tasks.
				struct SpawnImpl<F>(F);
				impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
					fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
						(self.0)(f)
					}
				}

				let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
					.with_substream_upgrade_protocol_override(upgrade::Version::V1)
					.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
					// NOTE: 24 is somewhat arbitrary and should be tuned in the future if
					// necessary. See <https://github.com/pezkuwichain/pezkuwi-sdk/issues/221>
					.with_per_connection_event_buffer_size(24)
					.with_max_negotiating_inbound_streams(2048)
					.with_idle_connection_timeout(network_config.idle_connection_timeout);

				Swarm::new(transport, behaviour, local_peer_id, config)
			};
			swarm
		};

		// Stub bandwidth sink (bandwidth logging removed in libp2p 0.56.0).
		let bandwidth: Arc<dyn BandwidthSink> = Arc::new(NoBandwidthSink);

		// Initialize the metrics.
		let metrics = match &params.metrics_registry {
			Some(registry) => Some(metrics::register(
				registry,
				MetricSources {
					bandwidth: bandwidth.clone(),
					connected_peers: num_connected.clone(),
				},
			)?),
			None => None,
		};

		// Listen on multiaddresses.
		for addr in &network_config.listen_addresses {
			if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
				warn!(target: LOG_TARGET, "Can't listen on {} because: {:?}", addr, err)
			}
		}

		// Add external addresses.
		for addr in &network_config.public_addresses {
			Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
		}

		// Shared between the service (readers) and the worker (writer).
		let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));

		let service = Arc::new(NetworkService {
			bandwidth,
			external_addresses,
			listen_addresses: listen_addresses_set.clone(),
			num_connected: num_connected.clone(),
			local_peer_id,
			local_identity: local_identity.into(),
			to_worker,
			notification_protocol_ids,
			protocol_handles,
			sync_protocol_handle,
			peer_store_handle: Arc::clone(&peer_store_handle),
			_marker: PhantomData,
			_block: Default::default(),
		});

		Ok(NetworkWorker {
			listen_addresses: listen_addresses_set,
			num_connected,
			network_service: swarm,
			service,
			from_service,
			event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
			metrics,
			boot_node_ids,
			reported_invalid_boot_nodes: Default::default(),
			peer_store_handle: Arc::clone(&peer_store_handle),
			notif_protocol_handles,
			_marker: Default::default(),
			_block: Default::default(),
		})
	}
621
622	/// High-level network status information.
623	pub fn status(&self) -> NetworkStatus {
624		NetworkStatus {
625			num_connected_peers: self.num_connected_peers(),
626			total_bytes_inbound: self.total_bytes_inbound(),
627			total_bytes_outbound: self.total_bytes_outbound(),
628		}
629	}
630
631	/// Returns the total number of bytes received so far.
632	pub fn total_bytes_inbound(&self) -> u64 {
633		self.service.bandwidth.total_inbound()
634	}
635
636	/// Returns the total number of bytes sent so far.
637	pub fn total_bytes_outbound(&self) -> u64 {
638		self.service.bandwidth.total_outbound()
639	}
640
641	/// Returns the number of peers we're connected to.
642	pub fn num_connected_peers(&self) -> usize {
643		self.network_service.behaviour().user_protocol().num_sync_peers()
644	}
645
646	/// Adds an address for a node.
647	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
648		self.network_service.behaviour_mut().add_known_address(peer_id, addr);
649	}
650
	/// Return a `NetworkService` that can be shared through the code base and can be used to
	/// manipulate the worker.
	pub fn service(&self) -> &Arc<NetworkService<B, H>> {
		&self.service
	}
656
657	/// Returns the local `PeerId`.
658	pub fn local_peer_id(&self) -> &PeerId {
659		Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
660	}
661
662	/// Returns the list of addresses we are listening on.
663	///
664	/// Does **NOT** include a trailing `/p2p/` with our `PeerId`.
665	pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
666		Swarm::<Behaviour<B>>::listeners(&self.network_service)
667	}
668
	/// Get network state.
	///
	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
	/// everywhere about this. Please don't use this function to retrieve actual information.
	pub fn network_state(&mut self) -> NetworkState {
		let swarm = &mut self.network_service;
		// Peers with an open sync-protocol substream.
		let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
		let connected_peers = {
			let swarm = &mut *swarm;
			open.iter()
				.filter_map(move |peer_id| {
					// `handle_pending_outbound_connection` is (ab)used here purely to extract
					// the addresses the behaviour knows for this peer; the connection id is a
					// dummy value and no connection is actually attempted.
					let known_addresses = if let Ok(addrs) =
						NetworkBehaviour::handle_pending_outbound_connection(
							swarm.behaviour_mut(),
							ConnectionId::new_unchecked(0), // dummy value
							Some(*peer_id),
							&vec![],
							Endpoint::Listener,
						) {
						addrs.into_iter().collect()
					} else {
						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
						return None;
					};

					// A peer with an open substream but no endpoint info indicates an
					// inconsistency between the protocol and the debug info; skip it.
					let endpoint = if let Some(e) =
						swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
					{
						e.clone().into()
					} else {
						error!(target: LOG_TARGET, "Found state inconsistency between custom protocol \
						and debug information about {:?}", peer_id);
						return None;
					};

					Some((
						peer_id.to_base58(),
						NetworkStatePeer {
							endpoint,
							version_string: swarm
								.behaviour_mut()
								.node(peer_id)
								.and_then(|i| i.client_version().map(|s| s.to_owned())),
							latest_ping_time: swarm
								.behaviour_mut()
								.node(peer_id)
								.and_then(|i| i.latest_ping()),
							known_addresses,
						},
					))
				})
				.collect()
		};

		let not_connected_peers = {
			let swarm = &mut *swarm;
			swarm
				.behaviour_mut()
				.known_peers()
				.into_iter()
				// Every known peer without an open sync substream counts as "not connected".
				.filter(|p| open.iter().all(|n| n != p))
				.map(move |peer_id| {
					let known_addresses = if let Ok(addrs) =
						NetworkBehaviour::handle_pending_outbound_connection(
							swarm.behaviour_mut(),
							ConnectionId::new_unchecked(0), // dummy value
							Some(peer_id),
							&vec![],
							Endpoint::Listener,
						) {
						addrs.into_iter().collect()
					} else {
						// Unlike the connected case, the peer is still reported, with an
						// empty address set.
						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
						Default::default()
					};

					(
						peer_id.to_base58(),
						NetworkStateNotConnectedPeer {
							version_string: swarm
								.behaviour_mut()
								.node(&peer_id)
								.and_then(|i| i.client_version().map(|s| s.to_owned())),
							latest_ping_time: swarm
								.behaviour_mut()
								.node(&peer_id)
								.and_then(|i| i.latest_ping()),
							known_addresses,
						},
					)
				})
				.collect()
		};

		let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
		let listened_addresses = swarm.listeners().cloned().collect();
		let external_addresses = swarm.external_addresses().cloned().collect();

		NetworkState {
			peer_id,
			listened_addresses,
			external_addresses,
			connected_peers,
			not_connected_peers,
			// TODO: Check what info we can include here.
			//       Issue reference: https://github.com/pezkuwichain/pezkuwi-sdk/issues/328.
			peerset: serde_json::json!(
				"Unimplemented. See https://github.com/pezkuwichain/pezkuwi-sdk/issues/328."
			),
		}
	}
780
	/// Removes a `PeerId` from the list of reserved peers.
	///
	/// Delegates to the shared service handle; `into` converts to the peer-id type its
	/// method expects (signature not visible in this chunk).
	pub fn remove_reserved_peer(&self, peer: PeerId) {
		self.service.remove_reserved_peer(peer.into());
	}
785
	/// Adds a `PeerId` and its `Multiaddr` as reserved.
	///
	/// The error, if any, is forwarded from the shared service handle.
	pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
		self.service.add_reserved_peer(peer)
	}
790}
791
792impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
793	/// Get network state.
794	///
795	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
796	/// everywhere about this. Please don't use this function to retrieve actual information.
797	///
798	/// Returns an error if the `NetworkWorker` is no longer running.
799	pub async fn network_state(&self) -> Result<NetworkState, ()> {
800		let (tx, rx) = oneshot::channel();
801
802		let _ = self
803			.to_worker
804			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
805
806		match rx.await {
807			Ok(v) => v.map_err(|_| ()),
808			// The channel can only be closed if the network worker no longer exists.
809			Err(_) => Err(()),
810		}
811	}
812
813	/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
814	///
815	/// Returns an `Err` if one of the given addresses is invalid or contains an
816	/// invalid peer ID (which includes the local peer ID).
817	fn split_multiaddr_and_peer_id(
818		&self,
819		peers: HashSet<Multiaddr>,
820	) -> Result<Vec<(PeerId, Multiaddr)>, String> {
821		peers
822			.into_iter()
823			.map(|mut addr| {
824				let peer = match addr.pop() {
825					Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
826					_ => return Err("Missing PeerId from address".to_string()),
827				};
828
829				// Make sure the local peer ID is never added to the PSM
830				// or added as a "known address", even if given.
831				if peer == self.local_peer_id {
832					Err("Local peer ID in peer set.".to_string())
833				} else {
834					Ok((peer, addr))
835				}
836			})
837			.collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
838	}
839}
840
841impl<B, H> NetworkStateInfo for NetworkService<B, H>
842where
843	B: pezsp_runtime::traits::Block,
844	H: ExHashT,
845{
846	/// Returns the local external addresses.
847	fn external_addresses(&self) -> Vec<pezsc_network_types::multiaddr::Multiaddr> {
848		self.external_addresses.lock().iter().cloned().map(Into::into).collect()
849	}
850
851	/// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`).
852	fn listen_addresses(&self) -> Vec<pezsc_network_types::multiaddr::Multiaddr> {
853		self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
854	}
855
856	/// Returns the local Peer ID.
857	fn local_peer_id(&self) -> pezsc_network_types::PeerId {
858		self.local_peer_id.into()
859	}
860}
861
862impl<B, H> NetworkSigner for NetworkService<B, H>
863where
864	B: pezsp_runtime::traits::Block,
865	H: ExHashT,
866{
867	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
868		let public_key = self.local_identity.public();
869		let bytes = self.local_identity.sign(msg.as_ref())?;
870
871		Ok(Signature {
872			public_key: crate::service::signature::PublicKey::Libp2p(public_key),
873			bytes,
874		})
875	}
876
877	fn verify(
878		&self,
879		peer_id: pezsc_network_types::PeerId,
880		public_key: &Vec<u8>,
881		signature: &Vec<u8>,
882		message: &Vec<u8>,
883	) -> Result<bool, String> {
884		let public_key =
885			PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
886		let peer_id: PeerId = peer_id.into();
887		let remote: libp2p::PeerId = public_key.to_peer_id();
888
889		Ok(peer_id == remote && public_key.verify(message, signature))
890	}
891}
892
impl<B, H> NetworkDHTProvider for NetworkService<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Start finding the closest peers to the target peer ID in the DHT.
	///
	/// This will generate either a `ClosestPeersFound` or a `ClosestPeersNotFound` event and pass
	/// it as an item on the [`NetworkWorker`] stream.
	fn find_closest_peers(&self, target: pezsc_network_types::PeerId) {
		// Send errors are ignored: a closed channel means the worker is shutting down.
		let _ = self
			.to_worker
			.unbounded_send(ServiceToWorkerMsg::FindClosestPeers(target.into()));
	}

	/// Start getting a value from the DHT.
	///
	/// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an
	/// item on the [`NetworkWorker`] stream.
	fn get_value(&self, key: &KademliaKey) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
	}

	/// Start putting a value in the DHT.
	///
	/// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an
	/// item on the [`NetworkWorker`] stream.
	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
	}

	/// Ask the worker to put `record` to the given set of `peers`, optionally also
	/// updating the local DHT storage.
	fn put_record_to(
		&self,
		record: Record,
		peers: HashSet<pezsc_network_types::PeerId>,
		update_local_storage: bool,
	) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
			record,
			peers,
			update_local_storage,
		});
	}

	/// Ask the worker to store a record locally, with optional `publisher` and expiry.
	fn store_record(
		&self,
		key: KademliaKey,
		value: Vec<u8>,
		publisher: Option<pezsc_network_types::PeerId>,
		expires: Option<Instant>,
	) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
			key,
			value,
			publisher.map(Into::into),
			expires,
		));
	}

	/// Register this node as a provider for `key` in the DHT.
	fn start_providing(&self, key: KademliaKey) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
	}

	/// Stop providing `key` in the DHT.
	fn stop_providing(&self, key: KademliaKey) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
	}

	/// Start a DHT query for the providers of `key`.
	fn get_providers(&self, key: KademliaKey) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
	}
}
964
965#[async_trait::async_trait]
966impl<B, H> NetworkStatusProvider for NetworkService<B, H>
967where
968	B: BlockT + 'static,
969	H: ExHashT,
970{
971	async fn status(&self) -> Result<NetworkStatus, ()> {
972		let (tx, rx) = oneshot::channel();
973
974		let _ = self
975			.to_worker
976			.unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
977
978		match rx.await {
979			Ok(v) => v.map_err(|_| ()),
980			// The channel can only be closed if the network worker no longer exists.
981			Err(_) => Err(()),
982		}
983	}
984
985	async fn network_state(&self) -> Result<NetworkState, ()> {
986		let (tx, rx) = oneshot::channel();
987
988		let _ = self
989			.to_worker
990			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
991
992		match rx.await {
993			Ok(v) => v.map_err(|_| ()),
994			// The channel can only be closed if the network worker no longer exists.
995			Err(_) => Err(()),
996		}
997	}
998}
999
#[async_trait::async_trait]
impl<B, H> NetworkPeers for NetworkService<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Replace the reserved peers of the sync protocol with the given set.
	fn set_authorized_peers(&self, peers: HashSet<pezsc_network_types::PeerId>) {
		self.sync_protocol_handle
			.set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
	}

	/// Toggle whether the sync protocol only accepts reserved peers.
	fn set_authorized_only(&self, reserved_only: bool) {
		self.sync_protocol_handle.set_reserved_only(reserved_only);
	}

	/// Register an additional address for a peer with the discovery mechanism.
	fn add_known_address(
		&self,
		peer_id: pezsc_network_types::PeerId,
		addr: pezsc_network_types::multiaddr::Multiaddr,
	) {
		let _ = self
			.to_worker
			.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
	}

	/// Report the peer to the peer store with the given reputation change.
	fn report_peer(&self, peer_id: pezsc_network_types::PeerId, cost_benefit: ReputationChange) {
		self.peer_store_handle.report_peer(peer_id, cost_benefit);
	}

	/// Get the current reputation of a peer from the peer store.
	fn peer_reputation(&self, peer_id: &pezsc_network_types::PeerId) -> i32 {
		self.peer_store_handle.peer_reputation(peer_id)
	}

	/// Ask the worker to disconnect the peer from the given notification protocol.
	fn disconnect_peer(&self, peer_id: pezsc_network_types::PeerId, protocol: ProtocolName) {
		let _ = self
			.to_worker
			.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
	}

	/// Allow connections from non-reserved peers on the sync protocol.
	fn accept_unreserved_peers(&self) {
		self.sync_protocol_handle.set_reserved_only(false);
	}

	/// Only allow connections from reserved peers on the sync protocol.
	fn deny_unreserved_peers(&self) {
		self.sync_protocol_handle.set_reserved_only(true);
	}

	/// Add a peer (with address) to the reserved set of the sync protocol.
	///
	/// Fails if the peer is the local node itself.
	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
		// Make sure the local peer ID is never added as a reserved peer.
		if peer.peer_id == self.local_peer_id.into() {
			return Err("Local peer ID cannot be added as a reserved peer.".to_string());
		}

		// Register the address first so the peer can actually be dialed.
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
			peer.peer_id.into(),
			peer.multiaddr.into(),
		));
		self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());

		Ok(())
	}

	/// Remove a peer from the reserved set of the sync protocol.
	fn remove_reserved_peer(&self, peer_id: pezsc_network_types::PeerId) {
		self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
	}

	/// Replace the reserved peers of the given notification protocol.
	///
	/// Fails if the protocol is unknown, an address cannot be parsed, or the
	/// local peer ID appears among the given addresses.
	fn set_reserved_peers(
		&self,
		protocol: ProtocolName,
		peers: HashSet<pezsc_network_types::multiaddr::Multiaddr>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol));
		};

		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
		let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;

		let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());

		for (peer_id, addr) in peers_addrs.into_iter() {
			// Make sure the local peer ID is never added to the PSM.
			if peer_id == self.local_peer_id {
				return Err("Local peer ID cannot be added as a reserved peer.".to_string());
			}

			peers.insert(peer_id.into());

			// Only register non-empty addresses with the discovery mechanism.
			if !addr.is_empty() {
				let _ = self
					.to_worker
					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
			}
		}

		self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);

		Ok(())
	}

	/// Add peers to the reserved set of the given notification protocol.
	///
	/// Fails if the protocol is unknown, an address cannot be parsed, or the
	/// local peer ID appears among the given addresses.
	fn add_peers_to_reserved_set(
		&self,
		protocol: ProtocolName,
		peers: HashSet<pezsc_network_types::multiaddr::Multiaddr>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!(
				"Cannot add peers to reserved set of unknown protocol: {}",
				protocol
			));
		};

		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
		let peers = self.split_multiaddr_and_peer_id(peers)?;

		for (peer_id, addr) in peers.into_iter() {
			// Make sure the local peer ID is never added to the PSM.
			if peer_id == self.local_peer_id {
				return Err("Local peer ID cannot be added as a reserved peer.".to_string());
			}

			// Only register non-empty addresses with the discovery mechanism.
			if !addr.is_empty() {
				let _ = self
					.to_worker
					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
			}

			self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
		}

		Ok(())
	}

	/// Remove peers from the reserved set of the given notification protocol.
	///
	/// Fails if the protocol is unknown.
	fn remove_peers_from_reserved_set(
		&self,
		protocol: ProtocolName,
		peers: Vec<pezsc_network_types::PeerId>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!(
				"Cannot remove peers from reserved set of unknown protocol: {}",
				protocol
			));
		};

		for peer_id in peers.into_iter() {
			self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
		}

		Ok(())
	}

	/// Number of peers currently connected on the sync protocol.
	fn sync_num_connected(&self) -> usize {
		self.num_connected.load(Ordering::Relaxed)
	}

	/// Determine the role of a peer from its handshake, falling back to the
	/// peer store when the handshake does not decode as `Roles`.
	fn peer_role(
		&self,
		peer_id: pezsc_network_types::PeerId,
		handshake: Vec<u8>,
	) -> Option<ObservedRole> {
		match Roles::decode_all(&mut &handshake[..]) {
			Ok(role) => Some(role.into()),
			Err(_) => {
				log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
				self.peer_store_handle.peer_role(&(peer_id.into()))
			},
		}
	}

	/// Get the list of reserved peers.
	///
	/// Returns an error if the `NetworkWorker` is no longer running.
	async fn reserved_peers(&self) -> Result<Vec<pezsc_network_types::PeerId>, ()> {
		let (tx, rx) = oneshot::channel();

		self.sync_protocol_handle.reserved_peers(tx);

		// The channel can only be closed if `ProtocolController` no longer exists.
		rx.await
			.map(|peers| peers.into_iter().map(From::from).collect())
			.map_err(|_| ())
	}
}
1184
1185impl<B, H> NetworkEventStream for NetworkService<B, H>
1186where
1187	B: BlockT + 'static,
1188	H: ExHashT,
1189{
1190	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1191		let (tx, rx) = out_events::channel(name, 100_000);
1192		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1193		Box::pin(rx)
1194	}
1195}
1196
1197#[async_trait::async_trait]
1198impl<B, H> NetworkRequest for NetworkService<B, H>
1199where
1200	B: BlockT + 'static,
1201	H: ExHashT,
1202{
1203	async fn request(
1204		&self,
1205		target: pezsc_network_types::PeerId,
1206		protocol: ProtocolName,
1207		request: Vec<u8>,
1208		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1209		connect: IfDisconnected,
1210	) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1211		let (tx, rx) = oneshot::channel();
1212
1213		self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1214
1215		match rx.await {
1216			Ok(v) => v,
1217			// The channel can only be closed if the network worker no longer exists. If the
1218			// network worker no longer exists, then all connections to `target` are necessarily
1219			// closed, and we legitimately report this situation as a "ConnectionClosed".
1220			Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1221		}
1222	}
1223
1224	fn start_request(
1225		&self,
1226		target: pezsc_network_types::PeerId,
1227		protocol: ProtocolName,
1228		request: Vec<u8>,
1229		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1230		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1231		connect: IfDisconnected,
1232	) {
1233		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1234			target: target.into(),
1235			protocol: protocol.into(),
1236			request,
1237			fallback_request,
1238			pending_response: tx,
1239			connect,
1240		});
1241	}
1242}
1243
/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol.
#[must_use]
pub struct NotificationSender {
	/// Sink into which notifications for the substream are written.
	sink: NotificationsSink,

	/// Name of the protocol on the wire.
	protocol_name: ProtocolName,

	/// Field extracted from the [`Metrics`] struct and necessary to report the
	/// notifications-related metrics.
	notification_size_metric: Option<Histogram>,
}
1256
1257#[async_trait::async_trait]
1258impl NotificationSenderT for NotificationSender {
1259	async fn ready(
1260		&self,
1261	) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1262		Ok(Box::new(NotificationSenderReady {
1263			ready: match self.sink.reserve_notification().await {
1264				Ok(r) => Some(r),
1265				Err(()) => return Err(NotificationSenderError::Closed),
1266			},
1267			peer_id: self.sink.peer_id(),
1268			protocol_name: &self.protocol_name,
1269			notification_size_metric: self.notification_size_metric.clone(),
1270		}))
1271	}
1272}
1273
/// Reserved slot in the notifications buffer, ready to accept data.
#[must_use]
pub struct NotificationSenderReady<'a> {
	/// The reserved slot itself; consumed (taken) when a notification is sent.
	ready: Option<Ready<'a>>,

	/// Target of the notification.
	peer_id: &'a PeerId,

	/// Name of the protocol on the wire.
	protocol_name: &'a ProtocolName,

	/// Field extracted from the [`Metrics`] struct and necessary to report the
	/// notifications-related metrics.
	notification_size_metric: Option<Histogram>,
}
1289
1290impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1291	fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1292		if let Some(notification_size_metric) = &self.notification_size_metric {
1293			notification_size_metric.observe(notification.len() as f64);
1294		}
1295
1296		trace!(
1297			target: LOG_TARGET,
1298			"External API => Notification({:?}, {}, {} bytes)",
1299			self.peer_id, self.protocol_name, notification.len(),
1300		);
1301		trace!(target: LOG_TARGET, "Handler({:?}) <= Async notification", self.peer_id);
1302
1303		self.ready
1304			.take()
1305			.ok_or(NotificationSenderError::Closed)?
1306			.send(notification)
1307			.map_err(|()| NotificationSenderError::Closed)
1308	}
1309}
1310
/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg {
	/// Start a DHT query for the peers closest to the given `PeerId`.
	FindClosestPeers(PeerId),
	/// Start a DHT lookup for the value stored under the given key.
	GetValue(KademliaKey),
	/// Put the given key/value pair into the DHT.
	PutValue(KademliaKey, Vec<u8>),
	/// Put a record directly to the given set of peers, optionally also
	/// updating local storage.
	PutRecordTo {
		record: Record,
		peers: HashSet<pezsc_network_types::PeerId>,
		update_local_storage: bool,
	},
	/// Store a record with an optional publisher and expiration time.
	StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
	/// Start providing the given key in the DHT.
	StartProviding(KademliaKey),
	/// Stop providing the given key in the DHT.
	StopProviding(KademliaKey),
	/// Start a DHT query for the providers of the given key.
	GetProviders(KademliaKey),
	/// Register an additional address for a peer with the discovery mechanism.
	AddKnownAddress(PeerId, Multiaddr),
	/// Register a new sink for network events.
	EventStream(out_events::Sender),
	/// Send a request over a request-response protocol.
	Request {
		target: PeerId,
		protocol: ProtocolName,
		request: Vec<u8>,
		fallback_request: Option<(Vec<u8>, ProtocolName)>,
		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
		connect: IfDisconnected,
	},
	/// Request the current high-level network status.
	NetworkStatus {
		pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
	},
	/// Request the detailed network state.
	NetworkState {
		pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
	},
	/// Disconnect a peer from the given notification protocol.
	DisconnectPeer(PeerId, ProtocolName),
}
1345
/// Main network worker. Must be polled in order for the network to advance.
///
/// You are encouraged to poll this in a separate background thread or task.
#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
pub struct NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
	num_connected: Arc<AtomicUsize>,
	/// The network service that can be extracted and shared through the codebase.
	service: Arc<NetworkService<B, H>>,
	/// The *actual* network: the libp2p `Swarm` driving transports and behaviours.
	network_service: Swarm<Behaviour<B>>,
	/// Messages from the [`NetworkService`] that must be processed.
	from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
	/// Senders for events that happen on the network.
	event_streams: out_events::OutChannels,
	/// Prometheus network metrics.
	metrics: Option<Metrics>,
	/// The `PeerId`'s of all boot nodes mapped to the registered addresses.
	boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
	/// Boot nodes that we already have reported as invalid.
	reported_invalid_boot_nodes: HashSet<PeerId>,
	/// Peer reputation store handle.
	peer_store_handle: Arc<dyn PeerStoreProvider>,
	/// Notification protocol handles.
	notif_protocol_handles: Vec<protocol::ProtocolHandle>,
	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
	/// compatibility.
	_marker: PhantomData<H>,
	/// Marker for block type
	_block: PhantomData<B>,
}
1383
1384impl<B, H> NetworkWorker<B, H>
1385where
1386	B: BlockT + 'static,
1387	H: ExHashT,
1388{
1389	/// Run the network.
1390	pub async fn run(mut self) {
1391		while self.next_action().await {}
1392	}
1393
	/// Perform one action on the network.
	///
	/// Returns `false` when the worker should be shut down.
	/// Intended for direct use in tests only; production code should use [`NetworkWorker::run`].
	pub async fn next_action(&mut self) -> bool {
		futures::select! {
			// Next message from the service.
			msg = self.from_service.next() => {
				if let Some(msg) = msg {
					self.handle_worker_message(msg);
				} else {
					// The service channel returned `None`: no more senders exist,
					// so signal shutdown to the caller.
					return false
				}
			},
			// Next event from `Swarm` (the stream guaranteed to never terminate).
			event = self.network_service.select_next_some() => {
				self.handle_swarm_event(event);
			},
		};

		// Update the `num_connected` count shared with the `NetworkService`.
		let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
		self.num_connected.store(num_connected_peers, Ordering::Relaxed);

		// Refresh Kademlia/connection gauges after every action, if metrics are enabled.
		if let Some(metrics) = self.metrics.as_ref() {
			if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
				for (lower_ilog2_bucket_bound, num_entries) in buckets {
					metrics
						.kbuckets_num_nodes
						.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
						.set(num_entries as u64);
				}
			}
			if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
				metrics.kademlia_records_count.set(num_entries as u64);
			}
			if let Some(num_entries) =
				self.network_service.behaviour_mut().kademlia_records_total_size()
			{
				metrics.kademlia_records_sizes_total.set(num_entries as u64);
			}

			metrics.pending_connections.set(
				Swarm::network_info(&self.network_service).connection_counters().num_pending()
					as u64,
			);
		}

		true
	}
1444
	/// Process the next message coming from the `NetworkService`.
	///
	/// Each message is forwarded to the corresponding method of the `Swarm`'s
	/// behaviour (or, for event streams, stored in `event_streams`).
	fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
		match msg {
			ServiceToWorkerMsg::FindClosestPeers(target) => {
				self.network_service.behaviour_mut().find_closest_peers(target)
			},
			ServiceToWorkerMsg::GetValue(key) => {
				self.network_service.behaviour_mut().get_value(key.into())
			},
			ServiceToWorkerMsg::PutValue(key, value) => {
				self.network_service.behaviour_mut().put_value(key.into(), value)
			},
			ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
				.network_service
				.behaviour_mut()
				.put_record_to(record.into(), peers, update_local_storage),
			ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
				.network_service
				.behaviour_mut()
				.store_record(key.into(), value, publisher, expires),
			ServiceToWorkerMsg::StartProviding(key) => {
				self.network_service.behaviour_mut().start_providing(key.into())
			},
			ServiceToWorkerMsg::StopProviding(key) => {
				self.network_service.behaviour_mut().stop_providing(&key.into())
			},
			ServiceToWorkerMsg::GetProviders(key) => {
				self.network_service.behaviour_mut().get_providers(key.into())
			},
			ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => {
				self.network_service.behaviour_mut().add_known_address(peer_id, addr)
			},
			ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
			ServiceToWorkerMsg::Request {
				target,
				protocol,
				request,
				fallback_request,
				pending_response,
				connect,
			} => {
				self.network_service.behaviour_mut().send_request(
					&target,
					protocol,
					request,
					fallback_request,
					pending_response,
					connect,
				);
			},
			// Status/state requests are answered directly by the worker; a failed
			// send means the requester went away and is safely ignored.
			ServiceToWorkerMsg::NetworkStatus { pending_response } => {
				let _ = pending_response.send(Ok(self.status()));
			},
			ServiceToWorkerMsg::NetworkState { pending_response } => {
				let _ = pending_response.send(Ok(self.network_state()));
			},
			ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
				.network_service
				.behaviour_mut()
				.user_protocol_mut()
				.disconnect_peer(&who, protocol_name),
		}
	}
1508
1509	/// Process the next event coming from `Swarm`.
1510	fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut>) {
1511		match event {
1512			SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1513				if let Some(metrics) = self.metrics.as_ref() {
1514					match result {
1515						Ok(serve_time) => {
1516							metrics
1517								.requests_in_success_total
1518								.with_label_values(&[&protocol])
1519								.observe(serve_time.as_secs_f64());
1520						},
1521						Err(err) => {
1522							let reason = match err {
1523								ResponseFailure::Network(InboundFailure::Timeout) => {
1524									Some("timeout")
1525								},
1526								ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1527								// `UnsupportedProtocols` is reported for every single
1528								// inbound request whenever a request with an unsupported
1529								// protocol is received. This is not reported in order to
1530								// avoid confusions.
1531								{
1532									None
1533								},
1534								ResponseFailure::Network(InboundFailure::ResponseOmission) => {
1535									Some("busy-omitted")
1536								},
1537								ResponseFailure::Network(InboundFailure::ConnectionClosed) => {
1538									Some("connection-closed")
1539								},
1540								ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"),
1541							};
1542
1543							if let Some(reason) = reason {
1544								metrics
1545									.requests_in_failure_total
1546									.with_label_values(&[&protocol, reason])
1547									.inc();
1548							}
1549						},
1550					}
1551				}
1552			},
1553			SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1554				protocol,
1555				duration,
1556				result,
1557				..
1558			}) => {
1559				if let Some(metrics) = self.metrics.as_ref() {
1560					match result {
1561						Ok(_) => {
1562							metrics
1563								.requests_out_success_total
1564								.with_label_values(&[&protocol])
1565								.observe(duration.as_secs_f64());
1566						},
1567						Err(err) => {
1568							let reason = match err {
1569								RequestFailure::NotConnected => "not-connected",
1570								RequestFailure::UnknownProtocol => "unknown-protocol",
1571								RequestFailure::Refused => "refused",
1572								RequestFailure::Obsolete => "obsolete",
1573								RequestFailure::Network(OutboundFailure::DialFailure) => {
1574									"dial-failure"
1575								},
1576								RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1577								RequestFailure::Network(OutboundFailure::ConnectionClosed) => {
1578									"connection-closed"
1579								},
1580								RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1581									"unsupported"
1582								},
1583								RequestFailure::Network(OutboundFailure::Io(_)) => "io",
1584							};
1585
1586							metrics
1587								.requests_out_failure_total
1588								.with_label_values(&[&protocol, reason])
1589								.inc();
1590						},
1591					}
1592				}
1593			},
1594			SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1595				for change in changes {
1596					self.peer_store_handle.report_peer(peer.into(), change);
1597				}
1598			},
1599			SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1600				peer_id,
1601				info:
1602					IdentifyInfo {
1603						protocol_version, agent_version, mut listen_addrs, protocols, ..
1604					},
1605			}) => {
1606				if listen_addrs.len() > 30 {
1607					debug!(
1608						target: LOG_TARGET,
1609						"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1610						peer_id, protocol_version, agent_version
1611					);
1612					listen_addrs.truncate(30);
1613				}
1614				for addr in listen_addrs {
1615					self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1616						&peer_id,
1617						&protocols,
1618						addr.clone(),
1619					);
1620				}
1621				self.peer_store_handle.add_known_peer(peer_id.into());
1622			},
1623			SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1624				self.peer_store_handle.add_known_peer(peer_id.into());
1625			},
1626			SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1627				if let Some(metrics) = self.metrics.as_ref() {
1628					metrics.kademlia_random_queries_total.inc();
1629				}
1630			},
1631			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1632				remote,
1633				set_id,
1634				direction,
1635				negotiated_fallback,
1636				notifications_sink,
1637				received_handshake,
1638			}) => {
1639				let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1640					remote,
1641					direction,
1642					received_handshake,
1643					negotiated_fallback,
1644					notifications_sink,
1645				);
1646			},
1647			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1648				remote,
1649				set_id,
1650				notifications_sink,
1651			}) => {
1652				let _ = self.notif_protocol_handles[usize::from(set_id)]
1653					.report_notification_sink_replaced(remote, notifications_sink);
1654
1655				// TODO: Notifications might have been lost as a result of the previous
1656				// connection being dropped, and as a result it would be preferable to notify
1657				// the users of this fact by simulating the substream being closed then
1658				// reopened.
1659				// The code below doesn't compile because `role` is unknown. Propagating the
1660				// handshake of the secondary connections is quite an invasive change and
1661				// would conflict with https://github.com/pezkuwichain/pezkuwi-sdk/issues/197.
1662				// Considering that dropping notifications is generally regarded as
1663				// acceptable, this bug is at the moment intentionally left there and is
1664				// intended to be fixed at the same time as
1665				// https://github.com/pezkuwichain/pezkuwi-sdk/issues/197.
1666				// self.event_streams.send(Event::NotificationStreamClosed {
1667				// remote,
1668				// protocol,
1669				// });
1670				// self.event_streams.send(Event::NotificationStreamOpened {
1671				// remote,
1672				// protocol,
1673				// role,
1674				// });
1675			},
1676			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1677				let _ = self.notif_protocol_handles[usize::from(set_id)]
1678					.report_substream_closed(remote);
1679			},
1680			SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1681				remote,
1682				set_id,
1683				notification,
1684			}) => {
1685				let _ = self.notif_protocol_handles[usize::from(set_id)]
1686					.report_notification_received(remote, notification);
1687			},
1688			SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1689				match (self.metrics.as_ref(), duration) {
1690					(Some(metrics), Some(duration)) => {
1691						let query_type = match event {
1692							DhtEvent::ClosestPeersFound(_, _) => "peers-found",
1693							DhtEvent::ClosestPeersNotFound(_) => "peers-not-found",
1694							DhtEvent::ValueFound(_) => "value-found",
1695							DhtEvent::ValueNotFound(_) => "value-not-found",
1696							DhtEvent::ValuePut(_) => "value-put",
1697							DhtEvent::ValuePutFailed(_) => "value-put-failed",
1698							DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1699							DhtEvent::StartedProviding(_) => "started-providing",
1700							DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
1701							DhtEvent::ProvidersFound(_, _) => "providers-found",
1702							DhtEvent::NoMoreProviders(_) => "no-more-providers",
1703							DhtEvent::ProvidersNotFound(_) => "providers-not-found",
1704						};
1705						metrics
1706							.kademlia_query_duration
1707							.with_label_values(&[query_type])
1708							.observe(duration.as_secs_f64());
1709					},
1710					_ => {},
1711				}
1712
1713				self.event_streams.send(Event::Dht(event));
1714			},
1715			SwarmEvent::Behaviour(BehaviourOut::None) => {
1716				// Ignored event from lower layers.
1717			},
1718			SwarmEvent::ConnectionEstablished {
1719				peer_id,
1720				endpoint,
1721				num_established,
1722				concurrent_dial_errors,
1723				..
1724			} => {
1725				if let Some(errors) = concurrent_dial_errors {
1726					debug!(target: LOG_TARGET, "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1727				} else {
1728					debug!(target: LOG_TARGET, "Libp2p => Connected({:?})", peer_id);
1729				}
1730
1731				if let Some(metrics) = self.metrics.as_ref() {
1732					let direction = match endpoint {
1733						ConnectedPoint::Dialer { .. } => "out",
1734						ConnectedPoint::Listener { .. } => "in",
1735					};
1736					metrics.connections_opened_total.with_label_values(&[direction]).inc();
1737
1738					if num_established.get() == 1 {
1739						metrics.distinct_peers_connections_opened_total.inc();
1740					}
1741				}
1742			},
1743			SwarmEvent::ConnectionClosed {
1744				connection_id,
1745				peer_id,
1746				cause,
1747				endpoint,
1748				num_established,
1749			} => {
1750				debug!(target: LOG_TARGET, "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1751				if let Some(metrics) = self.metrics.as_ref() {
1752					let direction = match endpoint {
1753						ConnectedPoint::Dialer { .. } => "out",
1754						ConnectedPoint::Listener { .. } => "in",
1755					};
1756					let reason = match cause {
1757						Some(ConnectionError::IO(_)) => "transport-error",
1758						Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1759						None => "actively-closed",
1760					};
1761					metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1762
1763					// `num_established` represents the number of *remaining* connections.
1764					if num_established == 0 {
1765						metrics.distinct_peers_connections_closed_total.inc();
1766					}
1767				}
1768			},
1769			SwarmEvent::NewListenAddr { address, .. } => {
1770				trace!(target: LOG_TARGET, "Libp2p => NewListenAddr({})", address);
1771				if let Some(metrics) = self.metrics.as_ref() {
1772					metrics.listeners_local_addresses.inc();
1773				}
1774				self.listen_addresses.lock().insert(address.clone());
1775			},
1776			SwarmEvent::ExpiredListenAddr { address, .. } => {
1777				info!(target: LOG_TARGET, "📪 No longer listening on {}", address);
1778				if let Some(metrics) = self.metrics.as_ref() {
1779					metrics.listeners_local_addresses.dec();
1780				}
1781				self.listen_addresses.lock().remove(&address);
1782			},
1783			SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1784				if let Some(peer_id) = peer_id {
1785					trace!(
1786						target: LOG_TARGET,
1787						"Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1788					);
1789
1790					let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1791
1792					if let Some(addresses) =
1793						not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1794					{
1795						if let DialError::WrongPeerId { obtained, address } = &error {
1796							{
1797								let address_without_peer_id = parse_addr(address.clone().into())
1798									.map_or_else(|_| address.clone(), |r| r.1.into());
1799
1800								// Only report for address of boot node that was added at startup of
1801								// the node and not for any address that the node learned of the
1802								// boot node.
1803								if addresses.iter().any(|a| address_without_peer_id == *a) {
1804									warn!(
1805										"💔 The bootnode you want to connect to at `{address}` provided a \
1806										 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1807									);
1808
1809									self.reported_invalid_boot_nodes.insert(peer_id);
1810								}
1811							}
1812						}
1813					}
1814				}
1815
1816				if let Some(metrics) = self.metrics.as_ref() {
1817					let reason = match error {
1818						DialError::Denied { cause } => {
1819							if cause.downcast::<Exceeded>().is_ok() {
1820								Some("limit-reached")
1821							} else {
1822								None
1823							}
1824						},
1825						DialError::LocalPeerId { .. } => Some("local-peer-id"),
1826						DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1827						DialError::Transport(_) => Some("transport-error"),
1828						DialError::NoAddresses
1829						| DialError::DialPeerConditionFalse(_)
1830						| DialError::Aborted => None, // ignore them
1831					};
1832					if let Some(reason) = reason {
1833						metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1834					}
1835				}
1836			},
1837			SwarmEvent::Dialing { connection_id, peer_id } => {
1838				trace!(target: LOG_TARGET, "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1839			},
1840			SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1841				trace!(target: LOG_TARGET, "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1842				if let Some(metrics) = self.metrics.as_ref() {
1843					metrics.incoming_connections_total.inc();
1844				}
1845			},
1846			SwarmEvent::IncomingConnectionError {
1847				connection_id,
1848				local_addr,
1849				send_back_addr,
1850				error,
1851				..
1852			} => {
1853				debug!(
1854					target: LOG_TARGET,
1855					"Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1856				);
1857				if let Some(metrics) = self.metrics.as_ref() {
1858					let reason = match error {
1859						ListenError::Denied { cause } => {
1860							if cause.downcast::<Exceeded>().is_ok() {
1861								Some("limit-reached")
1862							} else {
1863								None
1864							}
1865						},
1866						ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => {
1867							Some("invalid-peer-id")
1868						},
1869						ListenError::Transport(_) => Some("transport-error"),
1870						ListenError::Aborted => None, // ignore it
1871					};
1872
1873					if let Some(reason) = reason {
1874						metrics
1875							.incoming_connections_errors_total
1876							.with_label_values(&[reason])
1877							.inc();
1878					}
1879				}
1880			},
1881			SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1882				if let Some(metrics) = self.metrics.as_ref() {
1883					metrics.listeners_local_addresses.sub(addresses.len() as u64);
1884				}
1885				let mut listen_addresses = self.listen_addresses.lock();
1886				for addr in &addresses {
1887					listen_addresses.remove(addr);
1888				}
1889				drop(listen_addresses);
1890
1891				let addrs =
1892					addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1893				match reason {
1894					Ok(()) => error!(
1895						target: LOG_TARGET,
1896						"📪 Libp2p listener ({}) closed gracefully",
1897						addrs
1898					),
1899					Err(e) => error!(
1900						target: LOG_TARGET,
1901						"📪 Libp2p listener ({}) closed: {}",
1902						addrs, e
1903					),
1904				}
1905			},
1906			SwarmEvent::ListenerError { error, .. } => {
1907				debug!(target: LOG_TARGET, "Libp2p => ListenerError: {}", error);
1908				if let Some(metrics) = self.metrics.as_ref() {
1909					metrics.listeners_errors_total.inc();
1910				}
1911			},
1912			SwarmEvent::NewExternalAddrCandidate { address } => {
1913				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrCandidate: {address:?}");
1914			},
1915			SwarmEvent::ExternalAddrConfirmed { address } => {
1916				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrConfirmed: {address:?}");
1917			},
1918			SwarmEvent::ExternalAddrExpired { address } => {
1919				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrExpired: {address:?}");
1920			},
1921			SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
1922				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}")
1923			},
1924			event => {
1925				warn!(target: LOG_TARGET, "New unknown SwarmEvent libp2p event: {event:?}");
1926			},
1927		}
1928	}
1929}
1930
// `NetworkWorker` is polled via `&mut self` (see `next_action`/`run`) and is never
// self-referential, so it is safe to assert `Unpin` manually here. The explicit impl
// is needed because the worker holds fields (e.g. the swarm/futures) that are not
// automatically `Unpin`, while callers rely on polling the worker without pinning.
// NOTE(review): soundness rests on the worker containing no self-references — the
// visible fields used in this chunk (metrics, listen_addresses, boot_node_ids) are
// plain data, but confirm against the full struct definition.
impl<B, H> Unpin for NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
}
1937
1938pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1939	addresses: impl Iterator<Item = &'a pezsc_network_types::multiaddr::Multiaddr>,
1940	transport: &TransportConfig,
1941) -> Result<(), Error> {
1942	use pezsc_network_types::multiaddr::Protocol;
1943
1944	if matches!(transport, TransportConfig::MemoryOnly) {
1945		let addresses: Vec<_> = addresses
1946			.filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1947			.cloned()
1948			.collect();
1949
1950		if !addresses.is_empty() {
1951			return Err(Error::AddressesForAnotherTransport {
1952				transport: transport.clone(),
1953				addresses,
1954			});
1955		}
1956	} else {
1957		let addresses: Vec<_> = addresses
1958			.filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1959			.cloned()
1960			.collect();
1961
1962		if !addresses.is_empty() {
1963			return Err(Error::AddressesForAnotherTransport {
1964				transport: transport.clone(),
1965				addresses,
1966			});
1967		}
1968	}
1969
1970	Ok(())
1971}