Skip to main content

sc_network/
service.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Main entry point of the sc-network crate.
20//!
21//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
22//! The [`NetworkWorker`] *is* the network. Network is driven by [`NetworkWorker::run`] future that
23//! terminates only when all instances of the control handles [`NetworkService`] were dropped.
24//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
25//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
26//!
27//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
28//! which is then processed by [`NetworkWorker::next_action`].
29
30use crate::{
31	behaviour::{self, Behaviour, BehaviourOut},
32	bitswap::BitswapRequestHandler,
33	config::{
34		parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
35		NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
36	},
37	discovery::DiscoveryConfig,
38	error::Error,
39	event::{DhtEvent, Event},
40	network_state::{
41		NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
42	},
43	peer_store::{PeerStore, PeerStoreProvider},
44	protocol::{self, Protocol, Ready},
45	protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
46	request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
47	service::{
48		signature::{Signature, SigningError},
49		traits::{
50			BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
51			NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
52			NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
53			NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
54		},
55	},
56	transport,
57	types::ProtocolName,
58	NotificationService, ReputationChange,
59};
60
61use codec::DecodeAll;
62use futures::{channel::oneshot, prelude::*};
63use libp2p::{
64	connection_limits::{ConnectionLimits, Exceeded},
65	core::{upgrade, ConnectedPoint, Endpoint},
66	identify::Info as IdentifyInfo,
67	identity::ed25519,
68	multiaddr::{self, Multiaddr},
69	swarm::{
70		Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
71		NetworkBehaviour, Swarm, SwarmEvent,
72	},
73	PeerId,
74};
75use log::{debug, error, info, trace, warn};
76use metrics::{Histogram, MetricSources, Metrics};
77use parking_lot::Mutex;
78use prometheus_endpoint::Registry;
79use sc_network_types::kad::{Key as KademliaKey, Record};
80
81use sc_client_api::BlockBackend;
82use sc_network_common::{
83	role::{ObservedRole, Roles},
84	ExHashT,
85};
86use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
87use sp_runtime::traits::Block as BlockT;
88
89pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
90pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
91pub use metrics::NotificationMetrics;
92pub use protocol::NotificationsSink;
93use std::{
94	collections::{HashMap, HashSet},
95	fs, iter,
96	marker::PhantomData,
97	num::NonZeroUsize,
98	pin::Pin,
99	str,
100	sync::{
101		atomic::{AtomicUsize, Ordering},
102		Arc,
103	},
104	time::{Duration, Instant},
105};
106
// Internal helper modules: Prometheus metrics plumbing and the fan-out of worker
// events to `NetworkEventStream` subscribers.
pub(crate) mod metrics;
pub(crate) mod out_events;

// Public API surface: message-signing helpers and the backend-agnostic network traits.
pub mod signature;
pub mod traits;

/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p";
115
/// Adapter exposing the libp2p transport's bandwidth counters through the
/// backend-agnostic [`BandwidthSink`] trait.
struct Libp2pBandwidthSink {
	// The sink type is deprecated upstream but still produced by `transport::build_transport`.
	#[allow(deprecated)]
	sink: Arc<transport::BandwidthSinks>,
}
120
impl BandwidthSink for Libp2pBandwidthSink {
	/// Total number of bytes received, as reported by the underlying libp2p sink.
	fn total_inbound(&self) -> u64 {
		self.sink.total_inbound()
	}

	/// Total number of bytes sent, as reported by the underlying libp2p sink.
	fn total_outbound(&self) -> u64 {
		self.sink.total_outbound()
	}
}
130
/// Substrate network service. Handles network IO and manages connectivity.
///
/// This is the cheap-to-clone control handle to the network: all methods are implemented by
/// sending a `ServiceToWorkerMsg` over `to_worker`, which the [`NetworkWorker`] processes.
pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
	/// Number of peers we're connected to.
	num_connected: Arc<AtomicUsize>,
	/// The local external addresses.
	external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`.
	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Local copy of the `PeerId` of the local node.
	local_peer_id: PeerId,
	/// The `KeyPair` that defines the `PeerId` of the local node.
	local_identity: Keypair,
	/// Bandwidth logging system. Can be queried to know the average bandwidth consumed.
	bandwidth: Arc<dyn BandwidthSink>,
	/// Channel that sends messages to the actual worker.
	to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
	/// Protocol name -> `SetId` mapping for notification protocols. The map never changes after
	/// initialization.
	notification_protocol_ids: HashMap<ProtocolName, SetId>,
	/// Handles to manage peer connections on notification protocols. The vector never changes
	/// after initialization.
	protocol_handles: Vec<protocol_controller::ProtocolHandle>,
	/// Shortcut to sync protocol handle (`protocol_handles[0]`).
	sync_protocol_handle: protocol_controller::ProtocolHandle,
	/// Handle to `PeerStore`.
	peer_store_handle: Arc<dyn PeerStoreProvider>,
	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
	/// compatibility.
	_marker: PhantomData<H>,
	/// Marker for block type
	_block: PhantomData<B>,
}
163
#[async_trait::async_trait]
impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	type NotificationProtocolConfig = NonDefaultSetConfig;
	type RequestResponseProtocolConfig = RequestResponseConfig;
	// NOTE(review): the `Block`/`Hash` GAT parameters are intentionally unused here — the
	// concrete service is always parameterized by this impl's own `B`/`H`.
	type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
	type PeerStore = PeerStore;
	type BitswapConfig = RequestResponseConfig;

	/// Create a new libp2p-based network backend; delegates to [`NetworkWorker::new`].
	fn new(params: Params<B, H, Self>) -> Result<Self, Error>
	where
		Self: Sized,
	{
		NetworkWorker::new(params)
	}

	/// Get handle to `NetworkService` of the `NetworkBackend`.
	fn network_service(&self) -> Arc<dyn NetworkServiceT> {
		self.service.clone()
	}

	/// Create `PeerStore`.
	fn peer_store(
		bootnodes: Vec<sc_network_types::PeerId>,
		metrics_registry: Option<Registry>,
	) -> Self::PeerStore {
		PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
	}

	/// Register notification metrics with the (optional) Prometheus registry.
	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
		NotificationMetrics::new(registry)
	}

	/// Create a bitswap server: returns the handler's run future together with the
	/// request-response protocol config it serves.
	fn bitswap_server(
		client: Arc<dyn BlockBackend<B> + Send + Sync>,
	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
		let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());

		(Box::pin(async move { handler.run().await }), protocol_config)
	}

	/// Create notification protocol configuration.
	//
	// The metrics and peerstore handle parameters are unused by this backend:
	// `NonDefaultSetConfig::new` already returns the paired `NotificationService`.
	fn notification_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_notification_size: u64,
		handshake: Option<NotificationHandshake>,
		set_config: SetConfig,
		_metrics: NotificationMetrics,
		_peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
		NonDefaultSetConfig::new(
			protocol_name,
			fallback_names,
			max_notification_size,
			handshake,
			set_config,
		)
	}

	/// Create request-response protocol configuration.
	fn request_response_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_request_size: u64,
		max_response_size: u64,
		request_timeout: Duration,
		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	) -> Self::RequestResponseProtocolConfig {
		Self::RequestResponseProtocolConfig {
			name: protocol_name,
			fallback_names,
			max_request_size,
			max_response_size,
			request_timeout,
			inbound_queue,
		}
	}

	/// Start [`NetworkBackend`] event loop.
	//
	// NOTE(review): `self.run()` resolves to the *inherent* `NetworkWorker::run` (inherent
	// methods take precedence over trait methods), so this is delegation, not recursion —
	// confirm the inherent method exists elsewhere in this file.
	async fn run(mut self) {
		self.run().await
	}
}
251
impl<B, H> NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Creates the network service.
	///
	/// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order
	/// for the network processing to advance. From it, you can extract a `NetworkService` using
	/// `worker.service()`. The `NetworkService` can be shared through the codebase.
	///
	/// # Errors
	///
	/// Fails if the node key cannot be converted into a keypair, if any configured address is
	/// inconsistent with the chosen transport, if the network config directory cannot be
	/// created, if two bootnode entries share a multiaddress but disagree on the peer ID, or if
	/// a request-response protocol is registered twice.
	pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
		// Take the peer store handle before `network_config` is moved out of `params` below.
		let peer_store_handle = params.network_config.peer_store_handle();
		let FullNetworkConfiguration {
			notification_protocols,
			request_response_protocols,
			mut network_config,
			..
		} = params.network_config;

		// Private and public keys configuration.
		let local_identity = network_config.node_key.clone().into_keypair()?;
		let local_public = local_identity.public();
		let local_peer_id = local_public.to_peer_id();

		// Convert to libp2p types.
		let local_identity: ed25519::Keypair = local_identity.into();
		let local_public: ed25519::PublicKey = local_public.into();
		let local_peer_id: PeerId = local_peer_id.into();

		// Drop any boot node entry that refers to the local node itself.
		network_config.boot_nodes = network_config
			.boot_nodes
			.into_iter()
			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
			.collect();
		// Likewise, ignore (with a warning) reserved nodes that refer to the local peer.
		network_config.default_peers_set.reserved_nodes = network_config
			.default_peers_set
			.reserved_nodes
			.into_iter()
			.filter(|reserved_node| {
				if reserved_node.peer_id == local_peer_id.into() {
					warn!(
						target: LOG_TARGET,
						"Local peer ID used in reserved node, ignoring: {}",
						reserved_node,
					);
					false
				} else {
					true
				}
			})
			.collect();

		// Ensure the listen addresses are consistent with the transport.
		ensure_addresses_consistent_with_transport(
			network_config.listen_addresses.iter(),
			&network_config.transport,
		)?;
		ensure_addresses_consistent_with_transport(
			network_config.boot_nodes.iter().map(|x| &x.multiaddr),
			&network_config.transport,
		)?;
		ensure_addresses_consistent_with_transport(
			network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
			&network_config.transport,
		)?;
		for notification_protocol in &notification_protocols {
			ensure_addresses_consistent_with_transport(
				notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
				&network_config.transport,
			)?;
		}
		ensure_addresses_consistent_with_transport(
			network_config.public_addresses.iter(),
			&network_config.transport,
		)?;

		// Channel over which `NetworkService` handles send commands to this worker.
		let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);

		// Make sure the configured network directory exists before anything writes to it.
		if let Some(path) = &network_config.net_config_path {
			fs::create_dir_all(path)?;
		}

		info!(
			target: LOG_TARGET,
			"🏷  Local node identity is: {}",
			local_peer_id.to_base58(),
		);
		info!(target: LOG_TARGET, "Running libp2p network backend");

		let (transport, bandwidth) = {
			let config_mem = match network_config.transport {
				TransportConfig::MemoryOnly => true,
				TransportConfig::Normal { .. } => false,
			};

			transport::build_transport(local_identity.clone().into(), config_mem)
		};

		let (to_notifications, from_protocol_controllers) =
			tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);

		// We must prepend a hardcoded default peer set to notification protocols.
		let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
			.chain(notification_protocols.iter().map(|protocol| protocol.set_config()));

		// One `ProtocolController` per peer set; the `SetId` is the position in the iterator
		// above, so set 0 is always the default (sync) peer set.
		let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
			.enumerate()
			.map(|(set_id, set_config)| {
				let proto_set_config = ProtoSetConfig {
					in_peers: set_config.in_peers,
					out_peers: set_config.out_peers,
					reserved_nodes: set_config
						.reserved_nodes
						.iter()
						.map(|node| node.peer_id.into())
						.collect(),
					reserved_only: set_config.non_reserved_mode.is_reserved_only(),
				};

				ProtocolController::new(
					SetId::from(set_id),
					proto_set_config,
					to_notifications.clone(),
					Arc::clone(&peer_store_handle),
				)
			})
			.unzip();

		// Shortcut to default (sync) peer set protocol handle.
		let sync_protocol_handle = protocol_handles[0].clone();

		// Spawn `ProtocolController` runners.
		protocol_controllers
			.into_iter()
			.for_each(|controller| (params.executor)(controller.run().boxed()));

		// Protocol name to protocol id mapping. The first protocol is always block announce (sync)
		// protocol, aka default (hardcoded) peer set.
		let notification_protocol_ids: HashMap<ProtocolName, SetId> =
			iter::once(&params.block_announce_config)
				.chain(notification_protocols.iter())
				.enumerate()
				.map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
				.collect();

		let known_addresses = {
			// Collect all reserved nodes and bootnodes addresses.
			let mut addresses: Vec<_> = network_config
				.default_peers_set
				.reserved_nodes
				.iter()
				.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
				.chain(notification_protocols.iter().flat_map(|protocol| {
					protocol
						.set_config()
						.reserved_nodes
						.iter()
						.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
				}))
				.chain(
					network_config
						.boot_nodes
						.iter()
						.map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
				)
				.collect();

			// Remove possible duplicates.
			addresses.sort();
			addresses.dedup();

			addresses
		};

		// Check for duplicate bootnodes: same address but different claimed peer IDs is a
		// configuration error.
		network_config.boot_nodes.iter().try_for_each(|bootnode| {
			if let Some(other) = network_config
				.boot_nodes
				.iter()
				.filter(|o| o.multiaddr == bootnode.multiaddr)
				.find(|o| o.peer_id != bootnode.peer_id)
			{
				Err(Error::DuplicateBootnode {
					address: bootnode.multiaddr.clone().into(),
					first_id: bootnode.peer_id.into(),
					second_id: other.peer_id.into(),
				})
			} else {
				Ok(())
			}
		})?;

		// List of bootnode multiaddresses.
		let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();

		for bootnode in network_config.boot_nodes.iter() {
			boot_node_ids
				.entry(bootnode.peer_id.into())
				.or_default()
				.push(bootnode.multiaddr.clone().into());
		}

		let boot_node_ids = Arc::new(boot_node_ids);

		let num_connected = Arc::new(AtomicUsize::new(0));
		let external_addresses = Arc::new(Mutex::new(HashSet::new()));

		let (protocol, notif_protocol_handles) = Protocol::new(
			From::from(&params.role),
			params.notification_metrics,
			notification_protocols,
			params.block_announce_config,
			Arc::clone(&peer_store_handle),
			protocol_handles.clone(),
			from_protocol_controllers,
		)?;

		// Build the swarm.
		let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
			let user_agent =
				format!("{} ({})", network_config.client_version, network_config.node_name);

			let discovery_config = {
				let mut config = DiscoveryConfig::new(local_peer_id);
				config.with_permanent_addresses(
					known_addresses
						.iter()
						.map(|(peer, address)| (peer.into(), address.clone().into()))
						.collect::<Vec<_>>(),
				);
				config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
				config.with_kademlia(
					params.genesis_hash,
					params.fork_id.as_deref(),
					&params.protocol_id,
				);
				config.with_dht_random_walk(network_config.enable_dht_random_walk);
				config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
				config.use_kademlia_disjoint_query_paths(
					network_config.kademlia_disjoint_query_paths,
				);
				config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);

				// mDNS and private-IP discovery only make sense on a real transport.
				match network_config.transport {
					TransportConfig::MemoryOnly => {
						config.with_mdns(false);
						config.allow_private_ip(false);
					},
					TransportConfig::Normal {
						enable_mdns,
						allow_private_ip: allow_private_ipv4,
						..
					} => {
						config.with_mdns(enable_mdns);
						config.allow_private_ip(allow_private_ipv4);
					},
				}

				config
			};

			let behaviour = {
				let result = Behaviour::new(
					protocol,
					user_agent,
					local_public.into(),
					discovery_config,
					request_response_protocols,
					Arc::clone(&peer_store_handle),
					external_addresses.clone(),
					network_config.public_addresses.iter().cloned().map(Into::into).collect(),
					ConnectionLimits::default()
						.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
						.with_max_established_incoming(Some(
							crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
						)),
				);

				match result {
					Ok(b) => b,
					Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
						return Err(Error::DuplicateRequestResponseProtocol { protocol: proto })
					},
				}
			};

			let swarm = {
				// Adapter so the swarm spawns its background tasks on the caller-provided
				// executor instead of its own.
				struct SpawnImpl<F>(F);
				impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
					fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
						(self.0)(f)
					}
				}

				let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
					.with_substream_upgrade_protocol_override(upgrade::Version::V1)
					.with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
					// NOTE: 24 is somewhat arbitrary and should be tuned in the future if
					// necessary. See <https://github.com/paritytech/substrate/pull/6080>
					.with_per_connection_event_buffer_size(24)
					.with_max_negotiating_inbound_streams(2048)
					.with_idle_connection_timeout(network_config.idle_connection_timeout);

				Swarm::new(transport, behaviour, local_peer_id, config)
			};

			(swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
		};

		// Initialize the metrics.
		let metrics = match &params.metrics_registry {
			Some(registry) => Some(metrics::register(
				registry,
				MetricSources {
					bandwidth: bandwidth.clone(),
					connected_peers: num_connected.clone(),
				},
			)?),
			None => None,
		};

		// Listen on multiaddresses. A failure to listen on one address is logged, not fatal.
		for addr in &network_config.listen_addresses {
			if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
				warn!(target: LOG_TARGET, "Can't listen on {} because: {:?}", addr, err)
			}
		}

		// Add external addresses.
		for addr in &network_config.public_addresses {
			Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
		}

		// Shared between the service (readers) and the worker (writer).
		let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));

		let service = Arc::new(NetworkService {
			bandwidth,
			external_addresses,
			listen_addresses: listen_addresses_set.clone(),
			num_connected: num_connected.clone(),
			local_peer_id,
			local_identity: local_identity.into(),
			to_worker,
			notification_protocol_ids,
			protocol_handles,
			sync_protocol_handle,
			peer_store_handle: Arc::clone(&peer_store_handle),
			_marker: PhantomData,
			_block: Default::default(),
		});

		Ok(NetworkWorker {
			listen_addresses: listen_addresses_set,
			num_connected,
			network_service: swarm,
			service,
			from_service,
			event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
			metrics,
			boot_node_ids,
			reported_invalid_boot_nodes: Default::default(),
			peer_store_handle: Arc::clone(&peer_store_handle),
			notif_protocol_handles,
			_marker: Default::default(),
			_block: Default::default(),
		})
	}

	/// High-level network status information.
	pub fn status(&self) -> NetworkStatus {
		NetworkStatus {
			num_connected_peers: self.num_connected_peers(),
			total_bytes_inbound: self.total_bytes_inbound(),
			total_bytes_outbound: self.total_bytes_outbound(),
		}
	}

	/// Returns the total number of bytes received so far.
	pub fn total_bytes_inbound(&self) -> u64 {
		self.service.bandwidth.total_inbound()
	}

	/// Returns the total number of bytes sent so far.
	pub fn total_bytes_outbound(&self) -> u64 {
		self.service.bandwidth.total_outbound()
	}

	/// Returns the number of peers we're connected to.
	pub fn num_connected_peers(&self) -> usize {
		self.network_service.behaviour().user_protocol().num_sync_peers()
	}

	/// Adds an address for a node.
	pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
		self.network_service.behaviour_mut().add_known_address(peer_id, addr);
	}

	/// Return a `NetworkService` that can be shared through the code base and can be used to
	/// manipulate the worker.
	pub fn service(&self) -> &Arc<NetworkService<B, H>> {
		&self.service
	}

	/// Returns the local `PeerId`.
	pub fn local_peer_id(&self) -> &PeerId {
		Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
	}

	/// Returns the list of addresses we are listening on.
	///
	/// Does **NOT** include a trailing `/p2p/` with our `PeerId`.
	pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
		Swarm::<Behaviour<B>>::listeners(&self.network_service)
	}

	/// Get network state.
	///
	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
	/// everywhere about this. Please don't use this function to retrieve actual information.
	pub fn network_state(&mut self) -> NetworkState {
		let swarm = &mut self.network_service;
		let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
		let connected_peers = {
			let swarm = &mut *swarm;
			open.iter()
				.filter_map(move |peer_id| {
					// Abuses `handle_pending_outbound_connection` (with a dummy connection id
					// and no addresses) purely to query the behaviour's known addresses.
					let known_addresses = if let Ok(addrs) =
						NetworkBehaviour::handle_pending_outbound_connection(
							swarm.behaviour_mut(),
							ConnectionId::new_unchecked(0), // dummy value
							Some(*peer_id),
							&vec![],
							Endpoint::Listener,
						) {
						addrs.into_iter().collect()
					} else {
						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
						return None;
					};

					let endpoint = if let Some(e) =
						swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
					{
						e.clone().into()
					} else {
						error!(target: LOG_TARGET, "Found state inconsistency between custom protocol \
						and debug information about {:?}", peer_id);
						return None;
					};

					Some((
						peer_id.to_base58(),
						NetworkStatePeer {
							endpoint,
							version_string: swarm
								.behaviour_mut()
								.node(peer_id)
								.and_then(|i| i.client_version().map(|s| s.to_owned())),
							latest_ping_time: swarm
								.behaviour_mut()
								.node(peer_id)
								.and_then(|i| i.latest_ping()),
							known_addresses,
						},
					))
				})
				.collect()
		};

		let not_connected_peers = {
			let swarm = &mut *swarm;
			swarm
				.behaviour_mut()
				.known_peers()
				.into_iter()
				// Every known peer that is not in the `open` set counts as "not connected".
				.filter(|p| open.iter().all(|n| n != p))
				.map(move |peer_id| {
					let known_addresses = if let Ok(addrs) =
						NetworkBehaviour::handle_pending_outbound_connection(
							swarm.behaviour_mut(),
							ConnectionId::new_unchecked(0), // dummy value
							Some(peer_id),
							&vec![],
							Endpoint::Listener,
						) {
						addrs.into_iter().collect()
					} else {
						error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
						Default::default()
					};

					(
						peer_id.to_base58(),
						NetworkStateNotConnectedPeer {
							version_string: swarm
								.behaviour_mut()
								.node(&peer_id)
								.and_then(|i| i.client_version().map(|s| s.to_owned())),
							latest_ping_time: swarm
								.behaviour_mut()
								.node(&peer_id)
								.and_then(|i| i.latest_ping()),
							known_addresses,
						},
					)
				})
				.collect()
		};

		let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
		let listened_addresses = swarm.listeners().cloned().collect();
		let external_addresses = swarm.external_addresses().cloned().collect();

		NetworkState {
			peer_id,
			listened_addresses,
			external_addresses,
			connected_peers,
			not_connected_peers,
			// TODO: Check what info we can include here.
			//       Issue reference: https://github.com/paritytech/substrate/issues/14160.
			peerset: serde_json::json!(
				"Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
			),
		}
	}

	/// Removes a `PeerId` from the list of reserved peers.
	pub fn remove_reserved_peer(&self, peer: PeerId) {
		self.service.remove_reserved_peer(peer.into());
	}

	/// Adds a `PeerId` and its `Multiaddr` as reserved.
	pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
		self.service.add_reserved_peer(peer)
	}
}
789
790impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
791	/// Get network state.
792	///
793	/// **Note**: Use this only for debugging. This API is unstable. There are warnings literally
794	/// everywhere about this. Please don't use this function to retrieve actual information.
795	///
796	/// Returns an error if the `NetworkWorker` is no longer running.
797	pub async fn network_state(&self) -> Result<NetworkState, ()> {
798		let (tx, rx) = oneshot::channel();
799
800		let _ = self
801			.to_worker
802			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
803
804		match rx.await {
805			Ok(v) => v.map_err(|_| ()),
806			// The channel can only be closed if the network worker no longer exists.
807			Err(_) => Err(()),
808		}
809	}
810
811	/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
812	///
813	/// Returns an `Err` if one of the given addresses is invalid or contains an
814	/// invalid peer ID (which includes the local peer ID).
815	fn split_multiaddr_and_peer_id(
816		&self,
817		peers: HashSet<Multiaddr>,
818	) -> Result<Vec<(PeerId, Multiaddr)>, String> {
819		peers
820			.into_iter()
821			.map(|mut addr| {
822				let peer = match addr.pop() {
823					Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
824					_ => return Err("Missing PeerId from address".to_string()),
825				};
826
827				// Make sure the local peer ID is never added to the PSM
828				// or added as a "known address", even if given.
829				if peer == self.local_peer_id {
830					Err("Local peer ID in peer set.".to_string())
831				} else {
832					Ok((peer, addr))
833				}
834			})
835			.collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
836	}
837}
838
839impl<B, H> NetworkStateInfo for NetworkService<B, H>
840where
841	B: sp_runtime::traits::Block,
842	H: ExHashT,
843{
844	/// Returns the local external addresses.
845	fn external_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
846		self.external_addresses.lock().iter().cloned().map(Into::into).collect()
847	}
848
849	/// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`).
850	fn listen_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
851		self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
852	}
853
854	/// Returns the local Peer ID.
855	fn local_peer_id(&self) -> sc_network_types::PeerId {
856		self.local_peer_id.into()
857	}
858}
859
860impl<B, H> NetworkSigner for NetworkService<B, H>
861where
862	B: sp_runtime::traits::Block,
863	H: ExHashT,
864{
865	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
866		let public_key = self.local_identity.public();
867		let bytes = self.local_identity.sign(msg.as_ref())?;
868
869		Ok(Signature {
870			public_key: crate::service::signature::PublicKey::Libp2p(public_key),
871			bytes,
872		})
873	}
874
875	fn verify(
876		&self,
877		peer_id: sc_network_types::PeerId,
878		public_key: &Vec<u8>,
879		signature: &Vec<u8>,
880		message: &Vec<u8>,
881	) -> Result<bool, String> {
882		let public_key =
883			PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
884		let peer_id: PeerId = peer_id.into();
885		let remote: libp2p::PeerId = public_key.to_peer_id();
886
887		Ok(peer_id == remote && public_key.verify(message, signature))
888	}
889}
890
impl<B, H> NetworkDHTProvider for NetworkService<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Start finding the closest peers to the target peer ID in the DHT.
	///
	/// This will generate either a `ClosestPeersFound` or a `ClosestPeersNotFound` event and pass
	/// it as an item on the [`NetworkWorker`] stream.
	fn find_closest_peers(&self, target: sc_network_types::PeerId) {
		let _ = self
			.to_worker
			.unbounded_send(ServiceToWorkerMsg::FindClosestPeers(target.into()));
	}

	/// Start getting a value from the DHT.
	///
	/// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an
	/// item on the [`NetworkWorker`] stream.
	fn get_value(&self, key: &KademliaKey) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
	}

	/// Start putting a value in the DHT.
	///
	/// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an
	/// item on the [`NetworkWorker`] stream.
	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
	}

	/// Ask the worker to put `record` to the given set of peers, optionally also updating the
	/// local storage.
	fn put_record_to(
		&self,
		record: Record,
		peers: HashSet<sc_network_types::PeerId>,
		update_local_storage: bool,
	) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
			record,
			peers,
			update_local_storage,
		});
	}

	/// Ask the worker to store a DHT record locally, with an optional publisher and expiration
	/// time.
	fn store_record(
		&self,
		key: KademliaKey,
		value: Vec<u8>,
		publisher: Option<sc_network_types::PeerId>,
		expires: Option<Instant>,
	) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
			key,
			value,
			publisher.map(Into::into),
			expires,
		));
	}

	/// Ask the worker to start announcing this node as a provider for `key` on the DHT.
	fn start_providing(&self, key: KademliaKey) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
	}

	/// Ask the worker to stop announcing this node as a provider for `key`.
	fn stop_providing(&self, key: KademliaKey) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
	}

	/// Ask the worker to start a DHT query for providers of `key`.
	fn get_providers(&self, key: KademliaKey) {
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
	}
}
962
963#[async_trait::async_trait]
964impl<B, H> NetworkStatusProvider for NetworkService<B, H>
965where
966	B: BlockT + 'static,
967	H: ExHashT,
968{
969	async fn status(&self) -> Result<NetworkStatus, ()> {
970		let (tx, rx) = oneshot::channel();
971
972		let _ = self
973			.to_worker
974			.unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
975
976		match rx.await {
977			Ok(v) => v.map_err(|_| ()),
978			// The channel can only be closed if the network worker no longer exists.
979			Err(_) => Err(()),
980		}
981	}
982
983	async fn network_state(&self) -> Result<NetworkState, ()> {
984		let (tx, rx) = oneshot::channel();
985
986		let _ = self
987			.to_worker
988			.unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
989
990		match rx.await {
991			Ok(v) => v.map_err(|_| ()),
992			// The channel can only be closed if the network worker no longer exists.
993			Err(_) => Err(()),
994		}
995	}
996}
997
#[async_trait::async_trait]
impl<B, H> NetworkPeers for NetworkService<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Replace the reserved-peer set of the sync protocol with `peers`.
	fn set_authorized_peers(&self, peers: HashSet<sc_network_types::PeerId>) {
		self.sync_protocol_handle
			.set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
	}

	/// Toggle whether the sync protocol accepts only reserved (authorized) peers.
	fn set_authorized_only(&self, reserved_only: bool) {
		self.sync_protocol_handle.set_reserved_only(reserved_only);
	}

	/// Forward a known listen address for `peer_id` to the network worker.
	fn add_known_address(
		&self,
		peer_id: sc_network_types::PeerId,
		addr: sc_network_types::multiaddr::Multiaddr,
	) {
		let _ = self
			.to_worker
			.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
	}

	/// Apply a reputation change for `peer_id` to the peer store.
	fn report_peer(&self, peer_id: sc_network_types::PeerId, cost_benefit: ReputationChange) {
		self.peer_store_handle.report_peer(peer_id, cost_benefit);
	}

	/// Current reputation of `peer_id` according to the peer store.
	fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
		self.peer_store_handle.peer_reputation(peer_id)
	}

	/// Ask the worker to disconnect `peer_id` from the given notification protocol.
	fn disconnect_peer(&self, peer_id: sc_network_types::PeerId, protocol: ProtocolName) {
		let _ = self
			.to_worker
			.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
	}

	/// Allow non-reserved peers to connect on the sync protocol.
	fn accept_unreserved_peers(&self) {
		self.sync_protocol_handle.set_reserved_only(false);
	}

	/// Deny non-reserved peers on the sync protocol.
	fn deny_unreserved_peers(&self) {
		self.sync_protocol_handle.set_reserved_only(true);
	}

	/// Add a single reserved peer (with address) to the sync protocol.
	///
	/// Returns an error if `peer` is the local node itself.
	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
		// Make sure the local peer ID is never added as a reserved peer.
		if peer.peer_id == self.local_peer_id.into() {
			return Err("Local peer ID cannot be added as a reserved peer.".to_string());
		}

		// Register the address first so the peer can actually be dialed.
		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
			peer.peer_id.into(),
			peer.multiaddr.into(),
		));
		self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());

		Ok(())
	}

	/// Remove a reserved peer from the sync protocol.
	fn remove_reserved_peer(&self, peer_id: sc_network_types::PeerId) {
		self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
	}

	/// Replace the reserved-peer set of an arbitrary notification protocol.
	///
	/// `peers` are multiaddresses that must contain a peer ID component; any attached
	/// addresses are forwarded to the worker as known addresses.
	fn set_reserved_peers(
		&self,
		protocol: ProtocolName,
		peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol));
		};

		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
		let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;

		let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());

		for (peer_id, addr) in peers_addrs.into_iter() {
			// Make sure the local peer ID is never added to the PSM.
			if peer_id == self.local_peer_id {
				return Err("Local peer ID cannot be added as a reserved peer.".to_string());
			}

			peers.insert(peer_id.into());

			// Only forward the address if one was actually attached to the peer ID.
			if !addr.is_empty() {
				let _ = self
					.to_worker
					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
			}
		}

		self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);

		Ok(())
	}

	/// Add peers to the reserved set of an arbitrary notification protocol, keeping
	/// the existing reserved peers.
	fn add_peers_to_reserved_set(
		&self,
		protocol: ProtocolName,
		peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!(
				"Cannot add peers to reserved set of unknown protocol: {}",
				protocol
			));
		};

		let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
		let peers = self.split_multiaddr_and_peer_id(peers)?;

		for (peer_id, addr) in peers.into_iter() {
			// Make sure the local peer ID is never added to the PSM.
			if peer_id == self.local_peer_id {
				return Err("Local peer ID cannot be added as a reserved peer.".to_string());
			}

			// Only forward the address if one was actually attached to the peer ID.
			if !addr.is_empty() {
				let _ = self
					.to_worker
					.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
			}

			self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
		}

		Ok(())
	}

	/// Remove peers from the reserved set of an arbitrary notification protocol.
	fn remove_peers_from_reserved_set(
		&self,
		protocol: ProtocolName,
		peers: Vec<sc_network_types::PeerId>,
	) -> Result<(), String> {
		let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
			return Err(format!(
				"Cannot remove peers from reserved set of unknown protocol: {}",
				protocol
			));
		};

		for peer_id in peers.into_iter() {
			self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
		}

		Ok(())
	}

	/// Number of peers currently connected for syncing, as published by the worker.
	fn sync_num_connected(&self) -> usize {
		self.num_connected.load(Ordering::Relaxed)
	}

	/// Determine the role of a peer from its handshake, falling back to the peer
	/// store when the handshake does not decode as `Roles`.
	fn peer_role(
		&self,
		peer_id: sc_network_types::PeerId,
		handshake: Vec<u8>,
	) -> Option<ObservedRole> {
		match Roles::decode_all(&mut &handshake[..]) {
			Ok(role) => Some(role.into()),
			Err(_) => {
				log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
				self.peer_store_handle.peer_role(&(peer_id.into()))
			},
		}
	}

	/// Get the list of reserved peers.
	///
	/// Returns an error if the `NetworkWorker` is no longer running.
	async fn reserved_peers(&self) -> Result<Vec<sc_network_types::PeerId>, ()> {
		let (tx, rx) = oneshot::channel();

		self.sync_protocol_handle.reserved_peers(tx);

		// The channel can only be closed if `ProtocolController` no longer exists.
		rx.await
			.map(|peers| peers.into_iter().map(From::from).collect())
			.map_err(|_| ())
	}
}
1182
1183impl<B, H> NetworkEventStream for NetworkService<B, H>
1184where
1185	B: BlockT + 'static,
1186	H: ExHashT,
1187{
1188	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1189		let (tx, rx) = out_events::channel(name, 100_000);
1190		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1191		Box::pin(rx)
1192	}
1193}
1194
1195#[async_trait::async_trait]
1196impl<B, H> NetworkRequest for NetworkService<B, H>
1197where
1198	B: BlockT + 'static,
1199	H: ExHashT,
1200{
1201	async fn request(
1202		&self,
1203		target: sc_network_types::PeerId,
1204		protocol: ProtocolName,
1205		request: Vec<u8>,
1206		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1207		connect: IfDisconnected,
1208	) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1209		let (tx, rx) = oneshot::channel();
1210
1211		self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1212
1213		match rx.await {
1214			Ok(v) => v,
1215			// The channel can only be closed if the network worker no longer exists. If the
1216			// network worker no longer exists, then all connections to `target` are necessarily
1217			// closed, and we legitimately report this situation as a "ConnectionClosed".
1218			Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1219		}
1220	}
1221
1222	fn start_request(
1223		&self,
1224		target: sc_network_types::PeerId,
1225		protocol: ProtocolName,
1226		request: Vec<u8>,
1227		fallback_request: Option<(Vec<u8>, ProtocolName)>,
1228		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1229		connect: IfDisconnected,
1230	) {
1231		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1232			target: target.into(),
1233			protocol: protocol.into(),
1234			request,
1235			fallback_request,
1236			pending_response: tx,
1237			connect,
1238		});
1239	}
1240}
1241
/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol.
#[must_use]
pub struct NotificationSender {
	/// Sink used to reserve slots in the target peer's notifications buffer.
	sink: NotificationsSink,

	/// Name of the protocol on the wire.
	protocol_name: ProtocolName,

	/// Field extracted from the [`Metrics`] struct and necessary to report the
	/// notifications-related metrics.
	notification_size_metric: Option<Histogram>,
}
1254
1255#[async_trait::async_trait]
1256impl NotificationSenderT for NotificationSender {
1257	async fn ready(
1258		&self,
1259	) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1260		Ok(Box::new(NotificationSenderReady {
1261			ready: match self.sink.reserve_notification().await {
1262				Ok(r) => Some(r),
1263				Err(()) => return Err(NotificationSenderError::Closed),
1264			},
1265			peer_id: self.sink.peer_id(),
1266			protocol_name: &self.protocol_name,
1267			notification_size_metric: self.notification_size_metric.clone(),
1268		}))
1269	}
1270}
1271
/// Reserved slot in the notifications buffer, ready to accept data.
#[must_use]
pub struct NotificationSenderReady<'a> {
	/// The reserved slot itself; becomes `None` once the notification has been sent.
	ready: Option<Ready<'a>>,

	/// Target of the notification.
	peer_id: &'a PeerId,

	/// Name of the protocol on the wire.
	protocol_name: &'a ProtocolName,

	/// Field extracted from the [`Metrics`] struct and necessary to report the
	/// notifications-related metrics.
	notification_size_metric: Option<Histogram>,
}
1287
1288impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1289	fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1290		if let Some(notification_size_metric) = &self.notification_size_metric {
1291			notification_size_metric.observe(notification.len() as f64);
1292		}
1293
1294		trace!(
1295			target: LOG_TARGET,
1296			"External API => Notification({:?}, {}, {} bytes)",
1297			self.peer_id, self.protocol_name, notification.len(),
1298		);
1299		trace!(target: LOG_TARGET, "Handler({:?}) <= Async notification", self.peer_id);
1300
1301		self.ready
1302			.take()
1303			.ok_or(NotificationSenderError::Closed)?
1304			.send(notification)
1305			.map_err(|()| NotificationSenderError::Closed)
1306	}
1307}
1308
/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServiceToWorkerMsg {
	/// Find the peers closest to the given peer ID in the DHT.
	FindClosestPeers(PeerId),
	/// Fetch the value stored under the given key from the DHT.
	GetValue(KademliaKey),
	/// Store a value under the given key in the DHT.
	PutValue(KademliaKey, Vec<u8>),
	/// Put a record directly to a specific set of peers, optionally updating local
	/// storage as well.
	PutRecordTo {
		record: Record,
		peers: HashSet<sc_network_types::PeerId>,
		update_local_storage: bool,
	},
	/// Store a record locally: key, value, optional publisher and optional expiry.
	StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
	/// Start announcing this node as a provider of the given key.
	StartProviding(KademliaKey),
	/// Stop announcing this node as a provider of the given key.
	StopProviding(KademliaKey),
	/// Look up providers of the given key in the DHT.
	GetProviders(KademliaKey),
	/// Register a known listen address for a peer.
	AddKnownAddress(PeerId, Multiaddr),
	/// Register a new sink for network events.
	EventStream(out_events::Sender),
	/// Send a request to a remote peer, with an optional fallback protocol, and
	/// deliver the result through `pending_response`.
	Request {
		target: PeerId,
		protocol: ProtocolName,
		request: Vec<u8>,
		fallback_request: Option<(Vec<u8>, ProtocolName)>,
		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
		connect: IfDisconnected,
	},
	/// Request a high-level status report of the network.
	NetworkStatus {
		pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
	},
	/// Request a detailed state dump of the network.
	NetworkState {
		pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
	},
	/// Disconnect a peer from the given notification protocol.
	DisconnectPeer(PeerId, ProtocolName),
}
1343
/// Main network worker. Must be polled in order for the network to advance.
///
/// You are encouraged to poll this in a separate background thread or task.
#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
pub struct NetworkWorker<B, H>
where
	B: BlockT + 'static,
	H: ExHashT,
{
	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
	listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
	/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
	num_connected: Arc<AtomicUsize>,
	/// The network service that can be extracted and shared through the codebase.
	service: Arc<NetworkService<B, H>>,
	/// The *actual* network.
	network_service: Swarm<Behaviour<B>>,
	/// Messages from the [`NetworkService`] that must be processed.
	from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
	/// Senders for events that happen on the network.
	event_streams: out_events::OutChannels,
	/// Prometheus network metrics.
	metrics: Option<Metrics>,
	/// The `PeerId`'s of all boot nodes mapped to the registered addresses.
	boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
	/// Boot nodes that we already have reported as invalid.
	reported_invalid_boot_nodes: HashSet<PeerId>,
	/// Peer reputation store handle.
	peer_store_handle: Arc<dyn PeerStoreProvider>,
	/// Notification protocol handles.
	notif_protocol_handles: Vec<protocol::ProtocolHandle>,
	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
	/// compatibility.
	_marker: PhantomData<H>,
	/// Marker to pin the block type `B`.
	_block: PhantomData<B>,
}
1381
1382impl<B, H> NetworkWorker<B, H>
1383where
1384	B: BlockT + 'static,
1385	H: ExHashT,
1386{
1387	/// Run the network.
1388	pub async fn run(mut self) {
1389		while self.next_action().await {}
1390	}
1391
	/// Perform one action on the network.
	///
	/// Returns `false` when the worker should be shutdown.
	/// Use in tests only.
	pub async fn next_action(&mut self) -> bool {
		futures::select! {
			// Next message from the service.
			msg = self.from_service.next() => {
				if let Some(msg) = msg {
					self.handle_worker_message(msg);
				} else {
					// All `NetworkService` handles were dropped: signal shutdown.
					return false
				}
			},
			// Next event from `Swarm` (the stream guaranteed to never terminate).
			event = self.network_service.select_next_some() => {
				self.handle_swarm_event(event);
			},
		};

		// Update the `num_connected` count shared with the `NetworkService`.
		let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
		self.num_connected.store(num_connected_peers, Ordering::Relaxed);

		// Refresh the Kademlia- and connection-related Prometheus gauges, if enabled.
		if let Some(metrics) = self.metrics.as_ref() {
			// Routing-table occupancy, one gauge per k-bucket.
			if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
				for (lower_ilog2_bucket_bound, num_entries) in buckets {
					metrics
						.kbuckets_num_nodes
						.with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
						.set(num_entries as u64);
				}
			}
			// Number of records currently held in the Kademlia store.
			if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
				metrics.kademlia_records_count.set(num_entries as u64);
			}
			// Cumulative size of all records in the Kademlia store.
			if let Some(num_entries) =
				self.network_service.behaviour_mut().kademlia_records_total_size()
			{
				metrics.kademlia_records_sizes_total.set(num_entries as u64);
			}

			metrics.pending_connections.set(
				Swarm::network_info(&self.network_service).connection_counters().num_pending()
					as u64,
			);
		}

		true
	}
1442
1443	/// Process the next message coming from the `NetworkService`.
1444	fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
1445		match msg {
1446			ServiceToWorkerMsg::FindClosestPeers(target) => {
1447				self.network_service.behaviour_mut().find_closest_peers(target)
1448			},
1449			ServiceToWorkerMsg::GetValue(key) => {
1450				self.network_service.behaviour_mut().get_value(key.into())
1451			},
1452			ServiceToWorkerMsg::PutValue(key, value) => {
1453				self.network_service.behaviour_mut().put_value(key.into(), value)
1454			},
1455			ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
1456				.network_service
1457				.behaviour_mut()
1458				.put_record_to(record.into(), peers, update_local_storage),
1459			ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
1460				.network_service
1461				.behaviour_mut()
1462				.store_record(key.into(), value, publisher, expires),
1463			ServiceToWorkerMsg::StartProviding(key) => {
1464				self.network_service.behaviour_mut().start_providing(key.into())
1465			},
1466			ServiceToWorkerMsg::StopProviding(key) => {
1467				self.network_service.behaviour_mut().stop_providing(&key.into())
1468			},
1469			ServiceToWorkerMsg::GetProviders(key) => {
1470				self.network_service.behaviour_mut().get_providers(key.into())
1471			},
1472			ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => {
1473				self.network_service.behaviour_mut().add_known_address(peer_id, addr)
1474			},
1475			ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
1476			ServiceToWorkerMsg::Request {
1477				target,
1478				protocol,
1479				request,
1480				fallback_request,
1481				pending_response,
1482				connect,
1483			} => {
1484				self.network_service.behaviour_mut().send_request(
1485					&target,
1486					protocol,
1487					request,
1488					fallback_request,
1489					pending_response,
1490					connect,
1491				);
1492			},
1493			ServiceToWorkerMsg::NetworkStatus { pending_response } => {
1494				let _ = pending_response.send(Ok(self.status()));
1495			},
1496			ServiceToWorkerMsg::NetworkState { pending_response } => {
1497				let _ = pending_response.send(Ok(self.network_state()));
1498			},
1499			ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
1500				.network_service
1501				.behaviour_mut()
1502				.user_protocol_mut()
1503				.disconnect_peer(&who, protocol_name),
1504		}
1505	}
1506
1507	/// Process the next event coming from `Swarm`.
1508	fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut>) {
1509		match event {
1510			SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1511				if let Some(metrics) = self.metrics.as_ref() {
1512					match result {
1513						Ok(serve_time) => {
1514							metrics
1515								.requests_in_success_total
1516								.with_label_values(&[&protocol])
1517								.observe(serve_time.as_secs_f64());
1518						},
1519						Err(err) => {
1520							let reason = match err {
1521								ResponseFailure::Network(InboundFailure::Timeout) => {
1522									Some("timeout")
1523								},
1524								ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1525								// `UnsupportedProtocols` is reported for every single
1526								// inbound request whenever a request with an unsupported
1527								// protocol is received. This is not reported in order to
1528								// avoid confusions.
1529								{
1530									None
1531								},
1532								ResponseFailure::Network(InboundFailure::ResponseOmission) => {
1533									Some("busy-omitted")
1534								},
1535								ResponseFailure::Network(InboundFailure::ConnectionClosed) => {
1536									Some("connection-closed")
1537								},
1538								ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"),
1539							};
1540
1541							if let Some(reason) = reason {
1542								metrics
1543									.requests_in_failure_total
1544									.with_label_values(&[&protocol, reason])
1545									.inc();
1546							}
1547						},
1548					}
1549				}
1550			},
1551			SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1552				protocol,
1553				duration,
1554				result,
1555				..
1556			}) => {
1557				if let Some(metrics) = self.metrics.as_ref() {
1558					match result {
1559						Ok(_) => {
1560							metrics
1561								.requests_out_success_total
1562								.with_label_values(&[&protocol])
1563								.observe(duration.as_secs_f64());
1564						},
1565						Err(err) => {
1566							let reason = match err {
1567								RequestFailure::NotConnected => "not-connected",
1568								RequestFailure::UnknownProtocol => "unknown-protocol",
1569								RequestFailure::Refused => "refused",
1570								RequestFailure::Obsolete => "obsolete",
1571								RequestFailure::Network(OutboundFailure::DialFailure) => {
1572									"dial-failure"
1573								},
1574								RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1575								RequestFailure::Network(OutboundFailure::ConnectionClosed) => {
1576									"connection-closed"
1577								},
1578								RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1579									"unsupported"
1580								},
1581								RequestFailure::Network(OutboundFailure::Io(_)) => "io",
1582							};
1583
1584							metrics
1585								.requests_out_failure_total
1586								.with_label_values(&[&protocol, reason])
1587								.inc();
1588						},
1589					}
1590				}
1591			},
1592			SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1593				for change in changes {
1594					self.peer_store_handle.report_peer(peer.into(), change);
1595				}
1596			},
1597			SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1598				peer_id,
1599				info:
1600					IdentifyInfo {
1601						protocol_version, agent_version, mut listen_addrs, protocols, ..
1602					},
1603			}) => {
1604				if listen_addrs.len() > 30 {
1605					debug!(
1606						target: LOG_TARGET,
1607						"Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1608						peer_id, protocol_version, agent_version
1609					);
1610					listen_addrs.truncate(30);
1611				}
1612				for addr in listen_addrs {
1613					self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1614						&peer_id,
1615						&protocols,
1616						addr.clone(),
1617					);
1618				}
1619				self.peer_store_handle.add_known_peer(peer_id.into());
1620			},
1621			SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1622				self.peer_store_handle.add_known_peer(peer_id.into());
1623			},
1624			SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1625				if let Some(metrics) = self.metrics.as_ref() {
1626					metrics.kademlia_random_queries_total.inc();
1627				}
1628			},
1629			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1630				remote,
1631				set_id,
1632				direction,
1633				negotiated_fallback,
1634				notifications_sink,
1635				received_handshake,
1636			}) => {
1637				let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1638					remote,
1639					direction,
1640					received_handshake,
1641					negotiated_fallback,
1642					notifications_sink,
1643				);
1644			},
1645			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1646				remote,
1647				set_id,
1648				notifications_sink,
1649			}) => {
1650				let _ = self.notif_protocol_handles[usize::from(set_id)]
1651					.report_notification_sink_replaced(remote, notifications_sink);
1652
1653				// TODO: Notifications might have been lost as a result of the previous
1654				// connection being dropped, and as a result it would be preferable to notify
1655				// the users of this fact by simulating the substream being closed then
1656				// reopened.
1657				// The code below doesn't compile because `role` is unknown. Propagating the
1658				// handshake of the secondary connections is quite an invasive change and
1659				// would conflict with https://github.com/paritytech/substrate/issues/6403.
1660				// Considering that dropping notifications is generally regarded as
1661				// acceptable, this bug is at the moment intentionally left there and is
1662				// intended to be fixed at the same time as
1663				// https://github.com/paritytech/substrate/issues/6403.
1664				// self.event_streams.send(Event::NotificationStreamClosed {
1665				// remote,
1666				// protocol,
1667				// });
1668				// self.event_streams.send(Event::NotificationStreamOpened {
1669				// remote,
1670				// protocol,
1671				// role,
1672				// });
1673			},
1674			SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1675				let _ = self.notif_protocol_handles[usize::from(set_id)]
1676					.report_substream_closed(remote);
1677			},
1678			SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1679				remote,
1680				set_id,
1681				notification,
1682			}) => {
1683				let _ = self.notif_protocol_handles[usize::from(set_id)]
1684					.report_notification_received(remote, notification);
1685			},
1686			SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1687				match (self.metrics.as_ref(), duration) {
1688					(Some(metrics), Some(duration)) => {
1689						let query_type = match event {
1690							DhtEvent::ClosestPeersFound(_, _) => "peers-found",
1691							DhtEvent::ClosestPeersNotFound(_) => "peers-not-found",
1692							DhtEvent::ValueFound(_) => "value-found",
1693							DhtEvent::ValueNotFound(_) => "value-not-found",
1694							DhtEvent::ValuePut(_) => "value-put",
1695							DhtEvent::ValuePutFailed(_) => "value-put-failed",
1696							DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1697							DhtEvent::StartedProviding(_) => "started-providing",
1698							DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
1699							DhtEvent::ProvidersFound(_, _) => "providers-found",
1700							DhtEvent::NoMoreProviders(_) => "no-more-providers",
1701							DhtEvent::ProvidersNotFound(_) => "providers-not-found",
1702						};
1703						metrics
1704							.kademlia_query_duration
1705							.with_label_values(&[query_type])
1706							.observe(duration.as_secs_f64());
1707					},
1708					_ => {},
1709				}
1710
1711				self.event_streams.send(Event::Dht(event));
1712			},
1713			SwarmEvent::Behaviour(BehaviourOut::None) => {
1714				// Ignored event from lower layers.
1715			},
1716			SwarmEvent::ConnectionEstablished {
1717				peer_id,
1718				endpoint,
1719				num_established,
1720				concurrent_dial_errors,
1721				..
1722			} => {
1723				if let Some(errors) = concurrent_dial_errors {
1724					debug!(target: LOG_TARGET, "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1725				} else {
1726					debug!(target: LOG_TARGET, "Libp2p => Connected({:?})", peer_id);
1727				}
1728
1729				if let Some(metrics) = self.metrics.as_ref() {
1730					let direction = match endpoint {
1731						ConnectedPoint::Dialer { .. } => "out",
1732						ConnectedPoint::Listener { .. } => "in",
1733					};
1734					metrics.connections_opened_total.with_label_values(&[direction]).inc();
1735
1736					if num_established.get() == 1 {
1737						metrics.distinct_peers_connections_opened_total.inc();
1738					}
1739				}
1740			},
1741			SwarmEvent::ConnectionClosed {
1742				connection_id,
1743				peer_id,
1744				cause,
1745				endpoint,
1746				num_established,
1747			} => {
1748				debug!(target: LOG_TARGET, "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1749				if let Some(metrics) = self.metrics.as_ref() {
1750					let direction = match endpoint {
1751						ConnectedPoint::Dialer { .. } => "out",
1752						ConnectedPoint::Listener { .. } => "in",
1753					};
1754					let reason = match cause {
1755						Some(ConnectionError::IO(_)) => "transport-error",
1756						Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1757						None => "actively-closed",
1758					};
1759					metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1760
1761					// `num_established` represents the number of *remaining* connections.
1762					if num_established == 0 {
1763						metrics.distinct_peers_connections_closed_total.inc();
1764					}
1765				}
1766			},
1767			SwarmEvent::NewListenAddr { address, .. } => {
1768				trace!(target: LOG_TARGET, "Libp2p => NewListenAddr({})", address);
1769				if let Some(metrics) = self.metrics.as_ref() {
1770					metrics.listeners_local_addresses.inc();
1771				}
1772				self.listen_addresses.lock().insert(address.clone());
1773			},
1774			SwarmEvent::ExpiredListenAddr { address, .. } => {
1775				info!(target: LOG_TARGET, "📪 No longer listening on {}", address);
1776				if let Some(metrics) = self.metrics.as_ref() {
1777					metrics.listeners_local_addresses.dec();
1778				}
1779				self.listen_addresses.lock().remove(&address);
1780			},
1781			SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1782				if let Some(peer_id) = peer_id {
1783					trace!(
1784						target: LOG_TARGET,
1785						"Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1786					);
1787
1788					let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1789
1790					if let Some(addresses) =
1791						not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1792					{
1793						if let DialError::WrongPeerId { obtained, endpoint } = &error {
1794							if let ConnectedPoint::Dialer {
1795								address,
1796								role_override: _,
1797								port_use: _,
1798							} = endpoint
1799							{
1800								let address_without_peer_id = parse_addr(address.clone().into())
1801									.map_or_else(|_| address.clone(), |r| r.1.into());
1802
1803								// Only report for address of boot node that was added at startup of
1804								// the node and not for any address that the node learned of the
1805								// boot node.
1806								if addresses.iter().any(|a| address_without_peer_id == *a) {
1807									warn!(
1808										"💔 The bootnode you want to connect to at `{address}` provided a \
1809										 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1810									);
1811
1812									self.reported_invalid_boot_nodes.insert(peer_id);
1813								}
1814							}
1815						}
1816					}
1817				}
1818
1819				if let Some(metrics) = self.metrics.as_ref() {
1820					let reason = match error {
1821						DialError::Denied { cause } => {
1822							if cause.downcast::<Exceeded>().is_ok() {
1823								Some("limit-reached")
1824							} else {
1825								None
1826							}
1827						},
1828						DialError::LocalPeerId { .. } => Some("local-peer-id"),
1829						DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1830						DialError::Transport(_) => Some("transport-error"),
1831						DialError::NoAddresses |
1832						DialError::DialPeerConditionFalse(_) |
1833						DialError::Aborted => None, // ignore them
1834					};
1835					if let Some(reason) = reason {
1836						metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1837					}
1838				}
1839			},
1840			SwarmEvent::Dialing { connection_id, peer_id } => {
1841				trace!(target: LOG_TARGET, "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1842			},
1843			SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1844				trace!(target: LOG_TARGET, "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1845				if let Some(metrics) = self.metrics.as_ref() {
1846					metrics.incoming_connections_total.inc();
1847				}
1848			},
1849			SwarmEvent::IncomingConnectionError {
1850				connection_id,
1851				local_addr,
1852				send_back_addr,
1853				error,
1854			} => {
1855				debug!(
1856					target: LOG_TARGET,
1857					"Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1858				);
1859				if let Some(metrics) = self.metrics.as_ref() {
1860					let reason = match error {
1861						ListenError::Denied { cause } => {
1862							if cause.downcast::<Exceeded>().is_ok() {
1863								Some("limit-reached")
1864							} else {
1865								None
1866							}
1867						},
1868						ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => {
1869							Some("invalid-peer-id")
1870						},
1871						ListenError::Transport(_) => Some("transport-error"),
1872						ListenError::Aborted => None, // ignore it
1873					};
1874
1875					if let Some(reason) = reason {
1876						metrics
1877							.incoming_connections_errors_total
1878							.with_label_values(&[reason])
1879							.inc();
1880					}
1881				}
1882			},
1883			SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1884				if let Some(metrics) = self.metrics.as_ref() {
1885					metrics.listeners_local_addresses.sub(addresses.len() as u64);
1886				}
1887				let mut listen_addresses = self.listen_addresses.lock();
1888				for addr in &addresses {
1889					listen_addresses.remove(addr);
1890				}
1891				drop(listen_addresses);
1892
1893				let addrs =
1894					addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1895				match reason {
1896					Ok(()) => error!(
1897						target: LOG_TARGET,
1898						"📪 Libp2p listener ({}) closed gracefully",
1899						addrs
1900					),
1901					Err(e) => error!(
1902						target: LOG_TARGET,
1903						"📪 Libp2p listener ({}) closed: {}",
1904						addrs, e
1905					),
1906				}
1907			},
1908			SwarmEvent::ListenerError { error, .. } => {
1909				debug!(target: LOG_TARGET, "Libp2p => ListenerError: {}", error);
1910				if let Some(metrics) = self.metrics.as_ref() {
1911					metrics.listeners_errors_total.inc();
1912				}
1913			},
1914			SwarmEvent::NewExternalAddrCandidate { address } => {
1915				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrCandidate: {address:?}");
1916			},
1917			SwarmEvent::ExternalAddrConfirmed { address } => {
1918				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrConfirmed: {address:?}");
1919			},
1920			SwarmEvent::ExternalAddrExpired { address } => {
1921				trace!(target: LOG_TARGET, "Libp2p => ExternalAddrExpired: {address:?}");
1922			},
1923			SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
1924				trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}")
1925			},
1926			event => {
1927				warn!(target: LOG_TARGET, "New unknown SwarmEvent libp2p event: {event:?}");
1928			},
1929		}
1930	}
1931}
1932
1933impl<B, H> Unpin for NetworkWorker<B, H>
1934where
1935	B: BlockT + 'static,
1936	H: ExHashT,
1937{
1938}
1939
1940pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1941	addresses: impl Iterator<Item = &'a sc_network_types::multiaddr::Multiaddr>,
1942	transport: &TransportConfig,
1943) -> Result<(), Error> {
1944	use sc_network_types::multiaddr::Protocol;
1945
1946	if matches!(transport, TransportConfig::MemoryOnly) {
1947		let addresses: Vec<_> = addresses
1948			.filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1949			.cloned()
1950			.collect();
1951
1952		if !addresses.is_empty() {
1953			return Err(Error::AddressesForAnotherTransport {
1954				transport: transport.clone(),
1955				addresses,
1956			});
1957		}
1958	} else {
1959		let addresses: Vec<_> = addresses
1960			.filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1961			.cloned()
1962			.collect();
1963
1964		if !addresses.is_empty() {
1965			return Err(Error::AddressesForAnotherTransport {
1966				transport: transport.clone(),
1967				addresses,
1968			});
1969		}
1970	}
1971
1972	Ok(())
1973}