sc_network/litep2p/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkBackend` implementation for `litep2p`.
20
21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		discovery::{Discovery, DiscoveryEvent},
30		peerstore::Peerstore,
31		service::{Litep2pNetworkService, NetworkServiceCommand},
32		shim::{
33			bitswap::BitswapServer,
34			notification::{
35				config::{NotificationProtocolConfig, ProtocolControlHandle},
36				peerset::PeersetCommand,
37			},
38			request_response::{RequestResponseConfig, RequestResponseProtocol},
39		},
40	},
41	peer_store::PeerStoreProvider,
42	service::{
43		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44		out_events,
45		traits::{BandwidthSink, NetworkBackend, NetworkService},
46	},
47	NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53	config::ConfigBuilder,
54	crypto::ed25519::Keypair,
55	error::{DialError, NegotiationError},
56	executor::Executor,
57	protocol::{
58		libp2p::{
59			bitswap::Config as BitswapConfig,
60			kademlia::{QueryId, Record},
61		},
62		request_response::ConfigBuilder as RequestResponseConfigBuilder,
63	},
64	transport::{
65		tcp::config::Config as TcpTransportConfig,
66		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67	},
68	types::{
69		multiaddr::{Multiaddr, Protocol},
70		ConnectionId,
71	},
72	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84	cmp,
85	collections::{hash_map::Entry, HashMap, HashSet},
86	fs,
87	future::Future,
88	iter,
89	pin::Pin,
90	sync::{
91		atomic::{AtomicUsize, Ordering},
92		Arc,
93	},
94	time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
/// Timeout after which a connection with no active substreams is considered idle
/// and eligible for closing (installed via `with_keep_alive_timeout` below).
const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
104
/// Litep2p bandwidth sink.
///
/// Adapts `litep2p`'s byte counters to the [`BandwidthSink`] trait consumed by the
/// service metrics.
struct Litep2pBandwidthSink {
	/// Underlying `litep2p` sink providing the raw inbound/outbound counters.
	sink: litep2p::BandwidthSink,
}
109
110impl BandwidthSink for Litep2pBandwidthSink {
111	fn total_inbound(&self) -> u64 {
112		self.sink.inbound() as u64
113	}
114
115	fn total_outbound(&self) -> u64 {
116		self.sink.outbound() as u64
117	}
118}
119
/// Litep2p task executor.
///
/// Bridges `litep2p`'s [`Executor`] trait to the spawner closure supplied by the
/// caller via `Params::executor`.
struct Litep2pExecutor {
	/// Executor closure: spawns the given boxed future on the host's task system.
	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
125
126impl Executor for Litep2pExecutor {
127	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
128		(self.executor)(future)
129	}
130
131	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
132		(self.executor)(future)
133	}
134}
135
/// Logging target for the file.
///
/// NOTE(review): kept as `sub-libp2p` rather than a litep2p-specific target,
/// presumably so existing log-filter configurations apply to both network
/// backends — confirm before renaming.
const LOG_TARGET: &str = "sub-libp2p";
138
/// Per-peer connection bookkeeping.
struct ConnectionContext {
	/// Active endpoints of the peer, keyed by connection ID.
	endpoints: HashMap<ConnectionId, Endpoint>,

	/// Number of active connections to the peer.
	num_connections: usize,
}
147
/// Kademlia query we are tracking.
///
/// Each variant records the query key and the [`Instant`] the query was started,
/// used to observe query durations in the Prometheus metrics when the query
/// completes or fails.
#[derive(Debug)]
enum KadQuery {
	/// `GET_VALUE` query for key and when it was initiated.
	GetValue(RecordKey, Instant),
	/// `PUT_VALUE` query for key and when it was initiated.
	PutValue(RecordKey, Instant),
	/// `GET_PROVIDERS` query for key and when it was initiated.
	GetProviders(RecordKey, Instant),
}
158
/// Networking backend for `litep2p`.
pub struct Litep2pNetworkBackend {
	/// Main `litep2p` object.
	litep2p: Litep2p,

	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
	network_service: Arc<dyn NetworkService>,

	/// RX channel for receiving commands from `Litep2pNetworkService`.
	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

	/// `Peerset` handles to notification protocols, keyed by protocol name.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Pending Kademlia queries, tracked so that results/failures can be reported
	/// to event subscribers and query durations recorded in metrics.
	pending_queries: HashMap<QueryId, KadQuery>,

	/// Discovery.
	discovery: Discovery,

	/// Number of connected peers, shared with the Prometheus metric sources.
	num_connected: Arc<AtomicUsize>,

	/// Connected peers and their connection contexts.
	peers: HashMap<litep2p::PeerId, ConnectionContext>,

	/// Peerstore.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Block announce protocol name.
	block_announce_protocol: ProtocolName,

	/// Sender for DHT events.
	event_streams: out_events::OutChannels,

	/// Prometheus metrics.
	metrics: Option<Metrics>,
}
197
198impl Litep2pNetworkBackend {
199	/// From an iterator of multiaddress(es), parse and group all addresses of peers
200	/// so that litep2p can consume the information easily.
201	fn parse_addresses(
202		addresses: impl Iterator<Item = Multiaddr>,
203	) -> HashMap<PeerId, Vec<Multiaddr>> {
204		addresses
205			.into_iter()
206			.filter_map(|address| match address.iter().next() {
207				Some(
208					Protocol::Dns(_) |
209					Protocol::Dns4(_) |
210					Protocol::Dns6(_) |
211					Protocol::Ip6(_) |
212					Protocol::Ip4(_),
213				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
214				{
215					Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
216						.map_or(None, |peer| Some((peer, Some(address)))),
217					_ => None,
218				},
219				Some(Protocol::P2p(multihash)) =>
220					PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
221				_ => None,
222			})
223			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
224				let entry = acc.entry(peer).or_default();
225				maybe_address.map(|address| entry.push(address));
226
227				acc
228			})
229	}
230
231	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
232	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
233		Self::parse_addresses(peers.into_iter())
234			.into_iter()
235			.filter_map(|(peer, addresses)| {
236				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
237				if addresses.is_empty() {
238					return Some(peer)
239				}
240
241				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
242					log::warn!(
243						target: LOG_TARGET,
244						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
245					);
246					return None
247				}
248
249				self.peerstore_handle.add_known_peer(peer);
250				Some(peer)
251			})
252			.collect()
253	}
254}
255
256impl Litep2pNetworkBackend {
257	/// Get `litep2p` keypair from `NodeKeyConfig`.
258	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
259		let secret: litep2p::crypto::ed25519::SecretKey =
260			node_key.clone().into_keypair()?.secret().into();
261
262		let local_identity = Keypair::from(secret);
263		let local_public = local_identity.public();
264		let local_peer_id = local_public.to_peer_id();
265
266		Ok((local_identity, local_peer_id))
267	}
268
269	/// Configure transport protocols for `Litep2pNetworkBackend`.
270	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
271		config: &FullNetworkConfiguration<B, H, Self>,
272	) -> ConfigBuilder {
273		let _ = match config.network_config.transport {
274			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
275			TransportConfig::Normal { .. } => false,
276		};
277		let config_builder = ConfigBuilder::new();
278
279		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
280			.network_config
281			.listen_addresses
282			.iter()
283			.filter_map(|address| {
284				use sc_network_types::multiaddr::Protocol;
285
286				let mut iter = address.iter();
287
288				match iter.next() {
289					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
290					protocol => {
291						log::error!(
292							target: LOG_TARGET,
293							"unknown protocol {protocol:?}, ignoring {address:?}",
294						);
295
296						return None
297					},
298				}
299
300				match iter.next() {
301					Some(Protocol::Tcp(_)) => match iter.next() {
302						Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
303							Some((None, Some(address.clone()))),
304						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
305						protocol => {
306							log::error!(
307								target: LOG_TARGET,
308								"unknown protocol {protocol:?}, ignoring {address:?}",
309							);
310							None
311						},
312					},
313					protocol => {
314						log::error!(
315							target: LOG_TARGET,
316							"unknown protocol {protocol:?}, ignoring {address:?}",
317						);
318						None
319					},
320				}
321			})
322			.unzip();
323
324		config_builder
325			.with_websocket(WebSocketTransportConfig {
326				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
327				yamux_config: litep2p::yamux::Config::default(),
328				nodelay: true,
329				..Default::default()
330			})
331			.with_tcp(TcpTransportConfig {
332				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
333				yamux_config: litep2p::yamux::Config::default(),
334				nodelay: true,
335				..Default::default()
336			})
337	}
338}
339
340#[async_trait::async_trait]
341impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
342	type NotificationProtocolConfig = NotificationProtocolConfig;
343	type RequestResponseProtocolConfig = RequestResponseConfig;
344	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
345	type PeerStore = Peerstore;
346	type BitswapConfig = BitswapConfig;
347
348	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
349	where
350		Self: Sized,
351	{
352		let (keypair, local_peer_id) =
353			Self::get_keypair(&params.network_config.network_config.node_key)?;
354		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
355
356		params.network_config.network_config.boot_nodes = params
357			.network_config
358			.network_config
359			.boot_nodes
360			.into_iter()
361			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
362			.collect();
363		params.network_config.network_config.default_peers_set.reserved_nodes = params
364			.network_config
365			.network_config
366			.default_peers_set
367			.reserved_nodes
368			.into_iter()
369			.filter(|reserved_node| {
370				if reserved_node.peer_id == local_peer_id.into() {
371					log::warn!(
372						target: LOG_TARGET,
373						"Local peer ID used in reserved node, ignoring: {reserved_node}",
374					);
375					false
376				} else {
377					true
378				}
379			})
380			.collect();
381
382		if let Some(path) = &params.network_config.network_config.net_config_path {
383			fs::create_dir_all(path)?;
384		}
385
386		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
387		log::info!(target: LOG_TARGET, "Running litep2p network backend");
388
389		params.network_config.sanity_check_addresses()?;
390		params.network_config.sanity_check_bootnodes()?;
391
392		let mut config_builder =
393			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
394		let known_addresses = params.network_config.known_addresses();
395		let peer_store_handle = params.network_config.peer_store_handle();
396		let executor = Arc::new(Litep2pExecutor { executor: params.executor });
397
398		let FullNetworkConfiguration {
399			notification_protocols,
400			request_response_protocols,
401			network_config,
402			..
403		} = params.network_config;
404
405		// initialize notification protocols
406		//
407		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
408		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
409		// of Polkadot SDK to control connectivity of the notification protocol
410		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
411		let mut notif_protocols = HashMap::from_iter([(
412			params.block_announce_config.protocol_name().clone(),
413			params.block_announce_config.handle,
414		)]);
415
416		// handshake for all but the syncing protocol is set to node role
417		config_builder = notification_protocols
418			.into_iter()
419			.fold(config_builder, |config_builder, mut config| {
420				config.config.set_handshake(Roles::from(&params.role).encode());
421				notif_protocols.insert(config.protocol_name, config.handle);
422
423				config_builder.with_notification_protocol(config.config)
424			})
425			.with_notification_protocol(params.block_announce_config.config);
426
427		// initialize request-response protocols
428		let metrics = match &params.metrics_registry {
429			Some(registry) => Some(register_without_sources(registry)?),
430			None => None,
431		};
432
433		// create channels that are used to send request before initializing protocols so the
434		// senders can be passed onto all request-response protocols
435		//
436		// all protocols must have each others' senders so they can send the fallback request in
437		// case the main protocol is not supported by the remote peer and user specified a fallback
438		let (mut request_response_receivers, request_response_senders): (
439			HashMap<_, _>,
440			HashMap<_, _>,
441		) = request_response_protocols
442			.iter()
443			.map(|config| {
444				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
445				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
446			})
447			.unzip();
448
449		config_builder = request_response_protocols.into_iter().fold(
450			config_builder,
451			|config_builder, config| {
452				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
453					Litep2pProtocolName::from(config.protocol_name.clone()),
454				)
455				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
456				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
457				.with_timeout(config.request_timeout)
458				.build();
459
460				let protocol = RequestResponseProtocol::new(
461					config.protocol_name.clone(),
462					handle,
463					Arc::clone(&peer_store_handle),
464					config.inbound_queue,
465					request_response_receivers
466						.remove(&config.protocol_name)
467						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
468					request_response_senders.clone(),
469					metrics.clone(),
470				);
471
472				executor.run(Box::pin(async move {
473					protocol.run().await;
474				}));
475
476				config_builder.with_request_response_protocol(protocol_config)
477			},
478		);
479
480		// collect known addresses
481		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
482			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
483				use sc_network_types::multiaddr::Protocol;
484
485				let address = match address.iter().last() {
486					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
487						address.with(Protocol::P2p(peer.into())),
488					Some(Protocol::P2p(_)) => address,
489					_ => return acc,
490				};
491
492				acc.entry(peer.into()).or_default().push(address.into());
493				peer_store_handle.add_known_peer(peer);
494
495				acc
496			});
497
498		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
499		let listen_addresses = Arc::new(Default::default());
500		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
501			Discovery::new(
502				local_peer_id,
503				&network_config,
504				params.genesis_hash,
505				params.fork_id.as_deref(),
506				&params.protocol_id,
507				known_addresses.clone(),
508				Arc::clone(&listen_addresses),
509				Arc::clone(&peer_store_handle),
510			);
511
512		config_builder = config_builder
513			.with_known_addresses(known_addresses.clone().into_iter())
514			.with_libp2p_ping(ping_config)
515			.with_libp2p_identify(identify_config)
516			.with_libp2p_kademlia(kademlia_config)
517			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
518				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
519			))
520			// This has the same effect as `libp2p::Swarm::with_idle_connection_timeout` which is
521			// set to 10 seconds as well.
522			.with_keep_alive_timeout(KEEP_ALIVE_TIMEOUT)
523			.with_executor(executor);
524
525		if let Some(config) = maybe_mdns_config {
526			config_builder = config_builder.with_mdns(config);
527		}
528
529		if let Some(config) = params.bitswap_config {
530			config_builder = config_builder.with_libp2p_bitswap(config);
531		}
532
533		let litep2p =
534			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
535
536		litep2p.listen_addresses().for_each(|address| {
537			log::debug!(target: LOG_TARGET, "listening on: {address}");
538
539			listen_addresses.write().insert(address.clone());
540		});
541
542		let public_addresses = litep2p.public_addresses();
543		for address in network_config.public_addresses.iter() {
544			if let Err(err) = public_addresses.add_address(address.clone().into()) {
545				log::warn!(
546					target: LOG_TARGET,
547					"failed to add public address {address:?}: {err:?}",
548				);
549			}
550		}
551
552		let network_service = Arc::new(Litep2pNetworkService::new(
553			local_peer_id,
554			keypair.clone(),
555			cmd_tx,
556			Arc::clone(&peer_store_handle),
557			notif_protocols.clone(),
558			block_announce_protocol.clone(),
559			request_response_senders,
560			Arc::clone(&listen_addresses),
561			public_addresses,
562		));
563
564		// register rest of the metrics now that `Litep2p` has been created
565		let num_connected = Arc::new(Default::default());
566		let bandwidth: Arc<dyn BandwidthSink> =
567			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
568
569		if let Some(registry) = &params.metrics_registry {
570			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
571		}
572
573		Ok(Self {
574			network_service,
575			cmd_rx,
576			metrics,
577			peerset_handles: notif_protocols,
578			num_connected,
579			discovery,
580			pending_queries: HashMap::new(),
581			peerstore_handle: peer_store_handle,
582			block_announce_protocol,
583			event_streams: out_events::OutChannels::new(None)?,
584			peers: HashMap::new(),
585			litep2p,
586		})
587	}
588
589	fn network_service(&self) -> Arc<dyn NetworkService> {
590		Arc::clone(&self.network_service)
591	}
592
	/// Create the backend's [`Peerstore`], seeded with the given `bootnodes` and
	/// optionally registering its metrics in `metrics_registry`.
	fn peer_store(
		bootnodes: Vec<sc_network_types::PeerId>,
		metrics_registry: Option<Registry>,
	) -> Self::PeerStore {
		Peerstore::new(bootnodes, metrics_registry)
	}
599
	/// Create [`NotificationMetrics`], registering them in `registry` when one is
	/// provided.
	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
		NotificationMetrics::new(registry)
	}
603
	/// Create Bitswap server.
	///
	/// Returns the server's future (which the caller must spawn/poll) together with
	/// the Bitswap configuration to install on the `litep2p` builder.
	fn bitswap_server(
		client: Arc<dyn BlockBackend<B> + Send + Sync>,
	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
		BitswapServer::new(client)
	}
610
	/// Create notification protocol configuration for `protocol`.
	///
	/// Returns the protocol configuration to be passed to the backend together with a
	/// [`NotificationService`] handle for the protocol owner. Note that
	/// `max_notification_size` is narrowed to `usize` for the underlying config.
	fn notification_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_notification_size: u64,
		handshake: Option<NotificationHandshake>,
		set_config: SetConfig,
		metrics: NotificationMetrics,
		peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
		Self::NotificationProtocolConfig::new(
			protocol_name,
			fallback_names,
			max_notification_size as usize,
			handshake,
			set_config,
			metrics,
			peerstore_handle,
		)
	}
631
	/// Create request-response protocol configuration.
	///
	/// `inbound_queue` is an optional channel on which incoming requests are
	/// delivered to the protocol owner; all other parameters are forwarded to
	/// [`RequestResponseConfig::new`] unchanged.
	fn request_response_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_request_size: u64,
		max_response_size: u64,
		request_timeout: Duration,
		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	) -> Self::RequestResponseProtocolConfig {
		Self::RequestResponseProtocolConfig::new(
			protocol_name,
			fallback_names,
			max_request_size,
			max_response_size,
			request_timeout,
			inbound_queue,
		)
	}
650
651	/// Start [`Litep2pNetworkBackend`] event loop.
652	async fn run(mut self) {
653		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
654
655		loop {
656			let num_connected_peers = self
657				.peerset_handles
658				.get(&self.block_announce_protocol)
659				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
660			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
661
662			tokio::select! {
663				command = self.cmd_rx.next() => match command {
664					None => return,
665					Some(command) => match command {
666						NetworkServiceCommand::GetValue{ key } => {
667							let query_id = self.discovery.get_value(key.clone()).await;
668							self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
669						}
670						NetworkServiceCommand::PutValue { key, value } => {
671							let query_id = self.discovery.put_value(key.clone(), value).await;
672							self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
673						}
674						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
675							let kademlia_key = record.key.clone();
676							let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
677							self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
678						}
679						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
680							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
681						}
682						NetworkServiceCommand::StartProviding { key } => {
683							self.discovery.start_providing(key).await;
684						}
685						NetworkServiceCommand::StopProviding { key } => {
686							self.discovery.stop_providing(key).await;
687						}
688						NetworkServiceCommand::GetProviders { key } => {
689							let query_id = self.discovery.get_providers(key.clone()).await;
690							self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
691						}
692						NetworkServiceCommand::EventStream { tx } => {
693							self.event_streams.push(tx);
694						}
695						NetworkServiceCommand::Status { tx } => {
696							let _ = tx.send(NetworkStatus {
697								num_connected_peers: self
698									.peerset_handles
699									.get(&self.block_announce_protocol)
700									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
701								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
702								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
703							});
704						}
705						NetworkServiceCommand::AddPeersToReservedSet {
706							protocol,
707							peers,
708						} => {
709							let peers = self.add_addresses(peers.into_iter().map(Into::into));
710
711							match self.peerset_handles.get(&protocol) {
712								Some(handle) => {
713									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
714								}
715								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
716							};
717						}
718						NetworkServiceCommand::AddKnownAddress { peer, address } => {
719							let mut address: Multiaddr = address.into();
720
721							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
722								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
723							}
724
725							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) == 0usize {
726								log::debug!(
727									target: LOG_TARGET,
728									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
729								);
730							}
731						},
732						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
733							let peers = self.add_addresses(peers.into_iter().map(Into::into));
734
735							match self.peerset_handles.get(&protocol) {
736								Some(handle) => {
737									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
738								}
739								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
740							}
741
742						},
743						NetworkServiceCommand::DisconnectPeer {
744							protocol,
745							peer,
746						} => {
747							let Some(handle) = self.peerset_handles.get(&protocol) else {
748								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
749								continue
750							};
751
752							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
753						}
754						NetworkServiceCommand::SetReservedOnly {
755							protocol,
756							reserved_only,
757						} => {
758							let Some(handle) = self.peerset_handles.get(&protocol) else {
759								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
760								continue
761							};
762
763							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
764						}
765						NetworkServiceCommand::RemoveReservedPeers {
766							protocol,
767							peers,
768						} => {
769							let Some(handle) = self.peerset_handles.get(&protocol) else {
770								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
771								continue
772							};
773
774							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
775						}
776					}
777				},
778				event = self.discovery.next() => match event {
779					None => return,
780					Some(DiscoveryEvent::Discovered { addresses }) => {
781						// if at least one address was added for the peer, report the peer to `Peerstore`
782						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
783							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
784								self.peerstore_handle.add_known_peer(peer);
785							}
786						}
787					}
788					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
789						for peer in peers {
790							self.peerstore_handle.add_known_peer(peer.into());
791						}
792					}
793					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
794						if !self.pending_queries.contains_key(&query_id) {
795							log::error!(
796								target: LOG_TARGET,
797								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
798							);
799
800							continue
801						}
802
803						let peer_id: sc_network_types::PeerId = record.peer.into();
804						let record = PeerRecord {
805							record: P2PRecord {
806								key: record.record.key.to_vec().into(),
807								value: record.record.value,
808								publisher: record.record.publisher.map(|peer_id| {
809									let peer_id: sc_network_types::PeerId = peer_id.into();
810									peer_id.into()
811								}),
812								expires: record.record.expires,
813							},
814							peer: Some(peer_id.into()),
815						};
816
817						self.event_streams.send(
818							Event::Dht(
819								DhtEvent::ValueFound(
820									record.into()
821								)
822							)
823						);
824					}
825					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
826						match self.pending_queries.remove(&query_id) {
827							Some(KadQuery::GetValue(key, started)) => {
828								log::trace!(
829									target: LOG_TARGET,
830									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
831								);
832
833								if let Some(ref metrics) = self.metrics {
834									metrics
835										.kademlia_query_duration
836										.with_label_values(&["value-get"])
837										.observe(started.elapsed().as_secs_f64());
838								}
839							},
840							query => {
841								log::error!(
842									target: LOG_TARGET,
843									"Missing/invalid pending query for `GET_VALUE`: {query:?}"
844								);
845								debug_assert!(false);
846							},
847						}
848					}
849					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
850						match self.pending_queries.remove(&query_id) {
851							Some(KadQuery::PutValue(key, started)) => {
852								log::trace!(
853									target: LOG_TARGET,
854									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
855								);
856
857								self.event_streams.send(Event::Dht(
858									DhtEvent::ValuePut(key)
859								));
860
861								if let Some(ref metrics) = self.metrics {
862									metrics
863										.kademlia_query_duration
864										.with_label_values(&["value-put"])
865										.observe(started.elapsed().as_secs_f64());
866								}
867							},
868							query => {
869								log::error!(
870									target: LOG_TARGET,
871									"Missing/invalid pending query for `PUT_VALUE`: {query:?}"
872								);
873								debug_assert!(false);
874							}
875						}
876					}
877					Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
878						match self.pending_queries.remove(&query_id) {
879							Some(KadQuery::GetProviders(key, started)) => {
880								log::trace!(
881									target: LOG_TARGET,
882									"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
883								);
884
885								self.event_streams.send(Event::Dht(
886									DhtEvent::ProvidersFound(
887										key.into(),
888										providers.into_iter().map(|p| p.peer.into()).collect()
889									)
890								));
891
892								if let Some(ref metrics) = self.metrics {
893									metrics
894										.kademlia_query_duration
895										.with_label_values(&["providers-get"])
896										.observe(started.elapsed().as_secs_f64());
897								}
898							},
899							query => {
900								log::error!(
901									target: LOG_TARGET,
902									"Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
903								);
904								debug_assert!(false);
905							}
906						}
907					}
					// A Kademlia query failed. Map the query ID back to the original request
					// type so subscribers receive the matching failure event, and record a
					// per-type failure duration metric.
					Some(DiscoveryEvent::QueryFailed { query_id }) => {
						match self.pending_queries.remove(&query_id) {
							Some(KadQuery::GetValue(key, started)) => {
								log::debug!(
									target: LOG_TARGET,
									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
								);

								self.event_streams.send(Event::Dht(
									DhtEvent::ValueNotFound(key)
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["value-get-failed"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							Some(KadQuery::PutValue(key, started)) => {
								log::debug!(
									target: LOG_TARGET,
									"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
								);

								self.event_streams.send(Event::Dht(
									DhtEvent::ValuePutFailed(key)
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["value-put-failed"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							Some(KadQuery::GetProviders(key, started)) => {
								log::debug!(
									target: LOG_TARGET,
									"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
								);

								self.event_streams.send(Event::Dht(
									DhtEvent::ProvidersNotFound(key)
								));

								if let Some(ref metrics) = self.metrics {
									metrics
										.kademlia_query_duration
										.with_label_values(&["providers-get-failed"])
										.observe(started.elapsed().as_secs_f64());
								}
							},
							// Failure for a query we have no record of — presumably one that
							// was already resolved and removed, or never issued by us. No
							// subscriber is waiting on it, so just warn.
							None => {
								log::warn!(
									target: LOG_TARGET,
									"non-existent query failed ({query_id:?})",
								);
							}
						}
					}
					// Identify info received from a remote peer: feed its self-reported
					// listen addresses and supported protocols back into discovery.
					// NOTE(review): the effect (e.g. routing-table insertion) depends on
					// `add_self_reported_address`, which is defined elsewhere — confirm there.
					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
					}
					// A peer reported an external (publicly reachable) address for our node.
					// Register it with litep2p's public-address set; log at info level only
					// on first insertion (`inserted == true`) to avoid repeated noise.
					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
						match self.litep2p.public_addresses().add_address(address.clone().into()) {
							Ok(inserted) => if inserted {
								log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
							},
							// `add_address` can reject the address (e.g. invalid form —
							// exact causes are defined by litep2p); surface that as a warning.
							Err(err) => {
								log::warn!(
									target: LOG_TARGET,
									"🔍 Failed to add discovered external address {address:?}: {err:?}",
								);
							},
						}
					}
					// A previously discovered external address is no longer confirmed by
					// peers: drop it from litep2p's public-address set.
					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
						let local_peer_id = self.litep2p.local_peer_id();

						// Litep2p requires the peer ID to be present in the address.
						// Append `/p2p/<local_peer_id>` if the expired address lacks it so
						// the lookup key matches what was stored by `add_address`.
						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
							address.with(Protocol::P2p(*local_peer_id.as_ref()))
						} else {
							address
						};

						// `remove_address` returns whether the address was actually present.
						if self.litep2p.public_addresses().remove_address(&address) {
							log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
						} else {
							log::warn!(
								target: LOG_TARGET,
								"🔍 Failed to remove expired external address {address:?}"
							);
						}
					}
					// Ping round-trip time measured for a connected peer; trace-log only.
					Some(DiscoveryEvent::Ping { peer, rtt }) => {
						log::trace!(
							target: LOG_TARGET,
							"ping time with {peer:?}: {rtt:?}",
						);
					}
					// An inbound DHT `PUT_VALUE` record arrived from the network. Forward
					// it to event subscribers as a `PutRecordRequest` so higher layers can
					// decide whether to accept/store it.
					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
						self.event_streams.send(Event::Dht(
							DhtEvent::PutRecordRequest(
								key.into(),
								value,
								publisher.map(Into::into),
								expires,
							)
						));
					},
1020
					// Discovery kicked off a random Kademlia walk; count it for metrics.
					Some(DiscoveryEvent::RandomKademliaStarted) => {
						if let Some(metrics) = self.metrics.as_ref() {
							metrics.kademlia_random_queries_total.inc();
						}
					}
1026				},
1027				event = self.litep2p.next_event() => match event {
					// A transport-level connection was established. This arm does metrics
					// accounting only: per-direction open counters plus per-peer endpoint
					// bookkeeping in `self.peers` (used below to classify closes).
					//
					// NOTE(review): when metrics are disabled the early `continue` skips
					// the `self.peers` bookkeeping entirely — fine only as long as `peers`
					// is used exclusively for metrics; confirm against its other readers.
					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
						let Some(metrics) = &self.metrics else {
							continue;
						};

						let direction = match endpoint {
							Endpoint::Dialer { .. } => "out",
							Endpoint::Listener { .. } => {
								// Increment incoming connections counter.
								//
								// Note: For litep2p these are represented by established negotiated connections,
								// while for libp2p (legacy) these represent not-yet-negotiated connections.
								metrics.incoming_connections_total.inc();

								"in"
							},
						};
						metrics.connections_opened_total.with_label_values(&[direction]).inc();

						// Track endpoints per connection ID so `ConnectionClosed` can
						// recover the direction, and count distinct peers on first connect.
						match self.peers.entry(peer) {
							Entry::Vacant(entry) => {
								entry.insert(ConnectionContext {
									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
									num_connections: 1usize,
								});
								metrics.distinct_peers_connections_opened_total.inc();
							}
							Entry::Occupied(entry) => {
								let entry = entry.into_mut();
								entry.num_connections += 1;
								entry.endpoints.insert(endpoint.connection_id(), endpoint);
							}
						}
					}
					// A connection closed. Mirror of `ConnectionEstablished`: recover the
					// connection's direction from the per-peer endpoint map recorded at
					// open time and update the close counters; drop the peer entry once
					// its last connection is gone.
					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
						let Some(metrics) = &self.metrics else {
							continue;
						};

						// Peer unknown to our bookkeeping (e.g. metrics were disabled when
						// the connection opened) — nothing to account for.
						let Some(context) = self.peers.get_mut(&peer) else {
							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
							continue
						};

						let direction = match context.endpoints.remove(&connection_id) {
							None => {
								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
								continue
							}
							Some(endpoint) => {
								// Only decrement after the endpoint was actually found, so
								// the count stays in sync with the `endpoints` map.
								context.num_connections -= 1;

								match endpoint {
									Endpoint::Dialer { .. } => "out",
									Endpoint::Listener { .. } => "in",
								}
							}
						};

						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();

						if context.num_connections == 0 {
							self.peers.remove(&peer);
							metrics.distinct_peers_connections_closed_total.inc();
						}
					}
1094					Some(Litep2pEvent::DialFailure { address, error }) => {
1095						log::debug!(
1096							target: LOG_TARGET,
1097							"failed to dial peer at {address:?}: {error:?}",
1098						);
1099
1100						if let Some(metrics) = &self.metrics {
1101							let reason = match error {
1102								DialError::Timeout => "timeout",
1103								DialError::AddressError(_) => "invalid-address",
1104								DialError::DnsError(_) => "cannot-resolve-dns",
1105								DialError::NegotiationError(error) => match error {
1106									NegotiationError::Timeout => "timeout",
1107									NegotiationError::PeerIdMissing => "missing-peer-id",
1108									NegotiationError::StateMismatch => "state-mismatch",
1109									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1110									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1111									NegotiationError::SnowError(_) => "noise-error",
1112									NegotiationError::ParseError(_) => "parse-error",
1113									NegotiationError::IoError(_) => "io-error",
1114									NegotiationError::WebSocket(_) => "webscoket-error",
1115									NegotiationError::BadSignature => "bad-signature",
1116								}
1117							};
1118
1119							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1120						}
1121					}
					// Dialing a peer failed on every attempted address. The individual
					// per-address errors are only debug-logged; the metric records a
					// single aggregated "transport-errors" event.
					Some(Litep2pEvent::ListDialFailures { errors }) => {
						log::debug!(
							target: LOG_TARGET,
							"failed to dial peer on multiple addresses {errors:?}",
						);

						if let Some(metrics) = &self.metrics {
							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
						}
					}
					// The litep2p event stream ended, i.e. the backend itself shut down.
					// Nothing can make progress anymore, so exit the event loop.
					None => {
						log::error!(
								target: LOG_TARGET,
								"Litep2p backend terminated"
						);
						return
					}
1139				},
1140			}
1141		}
1142	}
1143}