// sc_network/litep2p/mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkBackend` implementation for `litep2p`.
20
21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		discovery::{Discovery, DiscoveryEvent},
30		peerstore::Peerstore,
31		service::{Litep2pNetworkService, NetworkServiceCommand},
32		shim::{
33			bitswap::BitswapServer,
34			notification::{
35				config::{NotificationProtocolConfig, ProtocolControlHandle},
36				peerset::PeersetCommand,
37			},
38			request_response::{RequestResponseConfig, RequestResponseProtocol},
39		},
40	},
41	peer_store::PeerStoreProvider,
42	service::{
43		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44		out_events,
45		traits::{BandwidthSink, NetworkBackend, NetworkService},
46	},
47	NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53	config::ConfigBuilder,
54	crypto::ed25519::Keypair,
55	error::{DialError, NegotiationError},
56	executor::Executor,
57	protocol::{
58		libp2p::{
59			bitswap::Config as BitswapConfig,
60			kademlia::{QueryId, Record},
61		},
62		request_response::ConfigBuilder as RequestResponseConfigBuilder,
63	},
64	transport::{
65		tcp::config::Config as TcpTransportConfig,
66		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67	},
68	types::{
69		multiaddr::{Multiaddr, Protocol},
70		ConnectionId,
71	},
72	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84	cmp,
85	collections::{hash_map::Entry, HashMap, HashSet},
86	fs,
87	future::Future,
88	iter,
89	pin::Pin,
90	sync::{
91		atomic::{AtomicUsize, Ordering},
92		Arc,
93	},
94	time::{Duration, Instant},
95};
96
/// Peer discovery: Kademlia, mDNS, identify and ping (see [`Discovery`]).
mod discovery;
/// [`Peerstore`] implementation used as this backend's peer store.
mod peerstore;
/// [`Litep2pNetworkService`] and the [`NetworkServiceCommand`]s it sends to the backend.
mod service;
/// Shims adapting bitswap, notification and request-response protocols to `litep2p`.
mod shim;
101
/// Litep2p bandwidth sink.
///
/// Thin wrapper implementing [`BandwidthSink`] on top of the bandwidth counters
/// handed out by `litep2p`.
struct Litep2pBandwidthSink {
	/// Bandwidth counters provided by `litep2p`.
	sink: litep2p::BandwidthSink,
}
106
107impl BandwidthSink for Litep2pBandwidthSink {
108	fn total_inbound(&self) -> u64 {
109		self.sink.inbound() as u64
110	}
111
112	fn total_outbound(&self) -> u64 {
113		self.sink.outbound() as u64
114	}
115}
116
/// Litep2p task executor.
///
/// Wraps the spawner closure supplied in `Params::executor` so it can be handed to
/// `litep2p` as an [`Executor`].
struct Litep2pExecutor {
	/// Closure used to spawn boxed futures onto the host runtime.
	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
122
123impl Executor for Litep2pExecutor {
124	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
125		(self.executor)(future)
126	}
127
128	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
129		(self.executor)(future)
130	}
131}
132
/// Logging target for the file.
///
/// NOTE(review): kept as `sub-libp2p` rather than a litep2p-specific target —
/// presumably for log-filter continuity with the libp2p backend; confirm before
/// changing.
const LOG_TARGET: &str = "sub-libp2p";
135
/// Peer context.
///
/// Tracks the active connections of a single remote peer.
struct ConnectionContext {
	/// Peer endpoints, keyed by the connection they were established on.
	endpoints: HashMap<ConnectionId, Endpoint>,

	/// Number of active connections.
	num_connections: usize,
}
144
/// Kademlia query we are tracking.
///
/// The embedded [`Instant`] records when the query was started so that query
/// duration can be observed in the Prometheus metrics when the query completes.
#[derive(Debug)]
enum KadQuery {
	/// `FIND_NODE` query for target and when it was initiated.
	FindNode(PeerId, Instant),
	/// `GET_VALUE` query for key and when it was initiated.
	GetValue(RecordKey, Instant),
	/// `PUT_VALUE` query for key and when it was initiated.
	PutValue(RecordKey, Instant),
	/// `GET_PROVIDERS` query for key and when it was initiated.
	GetProviders(RecordKey, Instant),
	/// `ADD_PROVIDER` query for key and when it was initiated.
	AddProvider(RecordKey, Instant),
}
159
/// Networking backend for `litep2p`.
pub struct Litep2pNetworkBackend {
	/// Main `litep2p` object.
	litep2p: Litep2p,

	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
	network_service: Arc<dyn NetworkService>,

	/// RX channel for receiving commands from `Litep2pNetworkService`.
	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

	/// `Peerset` handles to notification protocols.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Pending Kademlia queries, keyed by the `litep2p` query ID.
	pending_queries: HashMap<QueryId, KadQuery>,

	/// Discovery event stream and query interface.
	discovery: Discovery,

	/// Number of connected peers, kept in sync with the block announce protocol's
	/// peer count and shared with the metrics sources.
	num_connected: Arc<AtomicUsize>,

	/// Connected peers.
	peers: HashMap<litep2p::PeerId, ConnectionContext>,

	/// Peerstore.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Block announce protocol name.
	block_announce_protocol: ProtocolName,

	/// Channels used to broadcast events (e.g. DHT events) to `event_stream`
	/// subscribers.
	event_streams: out_events::OutChannels,

	/// Prometheus metrics.
	metrics: Option<Metrics>,
}
198
199impl Litep2pNetworkBackend {
200	/// From an iterator of multiaddress(es), parse and group all addresses of peers
201	/// so that litep2p can consume the information easily.
202	fn parse_addresses(
203		addresses: impl Iterator<Item = Multiaddr>,
204	) -> HashMap<PeerId, Vec<Multiaddr>> {
205		addresses
206			.into_iter()
207			.filter_map(|address| match address.iter().next() {
208				Some(
209					Protocol::Dns(_) |
210					Protocol::Dns4(_) |
211					Protocol::Dns6(_) |
212					Protocol::Ip6(_) |
213					Protocol::Ip4(_),
214				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
215				{
216					Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
217						.map_or(None, |peer| Some((peer, Some(address)))),
218					_ => None,
219				},
220				Some(Protocol::P2p(multihash)) =>
221					PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
222				_ => None,
223			})
224			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
225				let entry = acc.entry(peer).or_default();
226				maybe_address.map(|address| entry.push(address));
227
228				acc
229			})
230	}
231
232	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
233	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
234		Self::parse_addresses(peers.into_iter())
235			.into_iter()
236			.filter_map(|(peer, addresses)| {
237				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
238				if addresses.is_empty() {
239					return Some(peer)
240				}
241
242				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
243					log::warn!(
244						target: LOG_TARGET,
245						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
246					);
247					return None
248				}
249
250				self.peerstore_handle.add_known_peer(peer);
251				Some(peer)
252			})
253			.collect()
254	}
255}
256
257impl Litep2pNetworkBackend {
258	/// Get `litep2p` keypair from `NodeKeyConfig`.
259	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
260		let secret: litep2p::crypto::ed25519::SecretKey =
261			node_key.clone().into_keypair()?.secret().into();
262
263		let local_identity = Keypair::from(secret);
264		let local_public = local_identity.public();
265		let local_peer_id = local_public.to_peer_id();
266
267		Ok((local_identity, local_peer_id))
268	}
269
270	/// Configure transport protocols for `Litep2pNetworkBackend`.
271	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
272		config: &FullNetworkConfiguration<B, H, Self>,
273	) -> ConfigBuilder {
274		let _ = match config.network_config.transport {
275			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
276			TransportConfig::Normal { .. } => false,
277		};
278		let config_builder = ConfigBuilder::new();
279
280		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
281			.network_config
282			.listen_addresses
283			.iter()
284			.filter_map(|address| {
285				use sc_network_types::multiaddr::Protocol;
286
287				let mut iter = address.iter();
288
289				match iter.next() {
290					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
291					protocol => {
292						log::error!(
293							target: LOG_TARGET,
294							"unknown protocol {protocol:?}, ignoring {address:?}",
295						);
296
297						return None
298					},
299				}
300
301				match iter.next() {
302					Some(Protocol::Tcp(_)) => match iter.next() {
303						Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
304							Some((None, Some(address.clone()))),
305						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
306						protocol => {
307							log::error!(
308								target: LOG_TARGET,
309								"unknown protocol {protocol:?}, ignoring {address:?}",
310							);
311							None
312						},
313					},
314					protocol => {
315						log::error!(
316							target: LOG_TARGET,
317							"unknown protocol {protocol:?}, ignoring {address:?}",
318						);
319						None
320					},
321				}
322			})
323			.unzip();
324
325		config_builder
326			.with_websocket(WebSocketTransportConfig {
327				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
328				yamux_config: litep2p::yamux::Config::default(),
329				nodelay: true,
330				..Default::default()
331			})
332			.with_tcp(TcpTransportConfig {
333				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
334				yamux_config: litep2p::yamux::Config::default(),
335				nodelay: true,
336				..Default::default()
337			})
338	}
339}
340
341#[async_trait::async_trait]
342impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
343	type NotificationProtocolConfig = NotificationProtocolConfig;
344	type RequestResponseProtocolConfig = RequestResponseConfig;
345	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
346	type PeerStore = Peerstore;
347	type BitswapConfig = BitswapConfig;
348
349	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
350	where
351		Self: Sized,
352	{
353		let (keypair, local_peer_id) =
354			Self::get_keypair(&params.network_config.network_config.node_key)?;
355		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
356
357		params.network_config.network_config.boot_nodes = params
358			.network_config
359			.network_config
360			.boot_nodes
361			.into_iter()
362			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
363			.collect();
364		params.network_config.network_config.default_peers_set.reserved_nodes = params
365			.network_config
366			.network_config
367			.default_peers_set
368			.reserved_nodes
369			.into_iter()
370			.filter(|reserved_node| {
371				if reserved_node.peer_id == local_peer_id.into() {
372					log::warn!(
373						target: LOG_TARGET,
374						"Local peer ID used in reserved node, ignoring: {reserved_node}",
375					);
376					false
377				} else {
378					true
379				}
380			})
381			.collect();
382
383		if let Some(path) = &params.network_config.network_config.net_config_path {
384			fs::create_dir_all(path)?;
385		}
386
387		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
388		log::info!(target: LOG_TARGET, "Running litep2p network backend");
389
390		params.network_config.sanity_check_addresses()?;
391		params.network_config.sanity_check_bootnodes()?;
392
393		let mut config_builder =
394			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
395		let known_addresses = params.network_config.known_addresses();
396		let peer_store_handle = params.network_config.peer_store_handle();
397		let executor = Arc::new(Litep2pExecutor { executor: params.executor });
398
399		let FullNetworkConfiguration {
400			notification_protocols,
401			request_response_protocols,
402			network_config,
403			..
404		} = params.network_config;
405
406		// initialize notification protocols
407		//
408		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
409		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
410		// of Polkadot SDK to control connectivity of the notification protocol
411		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
412		let mut notif_protocols = HashMap::from_iter([(
413			params.block_announce_config.protocol_name().clone(),
414			params.block_announce_config.handle,
415		)]);
416
417		// handshake for all but the syncing protocol is set to node role
418		config_builder = notification_protocols
419			.into_iter()
420			.fold(config_builder, |config_builder, mut config| {
421				config.config.set_handshake(Roles::from(&params.role).encode());
422				notif_protocols.insert(config.protocol_name, config.handle);
423
424				config_builder.with_notification_protocol(config.config)
425			})
426			.with_notification_protocol(params.block_announce_config.config);
427
428		// initialize request-response protocols
429		let metrics = match &params.metrics_registry {
430			Some(registry) => Some(register_without_sources(registry)?),
431			None => None,
432		};
433
434		// create channels that are used to send request before initializing protocols so the
435		// senders can be passed onto all request-response protocols
436		//
437		// all protocols must have each others' senders so they can send the fallback request in
438		// case the main protocol is not supported by the remote peer and user specified a fallback
439		let (mut request_response_receivers, request_response_senders): (
440			HashMap<_, _>,
441			HashMap<_, _>,
442		) = request_response_protocols
443			.iter()
444			.map(|config| {
445				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
446				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
447			})
448			.unzip();
449
450		config_builder = request_response_protocols.into_iter().fold(
451			config_builder,
452			|config_builder, config| {
453				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
454					Litep2pProtocolName::from(config.protocol_name.clone()),
455				)
456				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
457				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
458				.with_timeout(config.request_timeout)
459				.build();
460
461				let protocol = RequestResponseProtocol::new(
462					config.protocol_name.clone(),
463					handle,
464					Arc::clone(&peer_store_handle),
465					config.inbound_queue,
466					request_response_receivers
467						.remove(&config.protocol_name)
468						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
469					request_response_senders.clone(),
470					metrics.clone(),
471				);
472
473				executor.run(Box::pin(async move {
474					protocol.run().await;
475				}));
476
477				config_builder.with_request_response_protocol(protocol_config)
478			},
479		);
480
481		// collect known addresses
482		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
483			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
484				use sc_network_types::multiaddr::Protocol;
485
486				let address = match address.iter().last() {
487					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
488						address.with(Protocol::P2p(peer.into())),
489					Some(Protocol::P2p(_)) => address,
490					_ => return acc,
491				};
492
493				acc.entry(peer.into()).or_default().push(address.into());
494				peer_store_handle.add_known_peer(peer);
495
496				acc
497			});
498
499		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
500		let listen_addresses = Arc::new(Default::default());
501		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
502			Discovery::new(
503				local_peer_id,
504				&network_config,
505				params.genesis_hash,
506				params.fork_id.as_deref(),
507				&params.protocol_id,
508				known_addresses.clone(),
509				Arc::clone(&listen_addresses),
510				Arc::clone(&peer_store_handle),
511			);
512
513		config_builder = config_builder
514			.with_known_addresses(known_addresses.clone().into_iter())
515			.with_libp2p_ping(ping_config)
516			.with_libp2p_identify(identify_config)
517			.with_libp2p_kademlia(kademlia_config)
518			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
519				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
520			))
521			.with_keep_alive_timeout(network_config.idle_connection_timeout)
522			// Use system DNS resolver to enable intranet domain resolution and administrator
523			// control over DNS lookup.
524			.with_system_resolver()
525			.with_executor(executor);
526
527		if let Some(config) = maybe_mdns_config {
528			config_builder = config_builder.with_mdns(config);
529		}
530
531		if let Some(config) = params.bitswap_config {
532			config_builder = config_builder.with_libp2p_bitswap(config);
533		}
534
535		let litep2p =
536			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
537
538		litep2p.listen_addresses().for_each(|address| {
539			log::debug!(target: LOG_TARGET, "listening on: {address}");
540
541			listen_addresses.write().insert(address.clone());
542		});
543
544		let public_addresses = litep2p.public_addresses();
545		for address in network_config.public_addresses.iter() {
546			if let Err(err) = public_addresses.add_address(address.clone().into()) {
547				log::warn!(
548					target: LOG_TARGET,
549					"failed to add public address {address:?}: {err:?}",
550				);
551			}
552		}
553
554		let network_service = Arc::new(Litep2pNetworkService::new(
555			local_peer_id,
556			keypair.clone(),
557			cmd_tx,
558			Arc::clone(&peer_store_handle),
559			notif_protocols.clone(),
560			block_announce_protocol.clone(),
561			request_response_senders,
562			Arc::clone(&listen_addresses),
563			public_addresses,
564		));
565
566		// register rest of the metrics now that `Litep2p` has been created
567		let num_connected = Arc::new(Default::default());
568		let bandwidth: Arc<dyn BandwidthSink> =
569			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
570
571		if let Some(registry) = &params.metrics_registry {
572			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
573		}
574
575		Ok(Self {
576			network_service,
577			cmd_rx,
578			metrics,
579			peerset_handles: notif_protocols,
580			num_connected,
581			discovery,
582			pending_queries: HashMap::new(),
583			peerstore_handle: peer_store_handle,
584			block_announce_protocol,
585			event_streams: out_events::OutChannels::new(None)?,
586			peers: HashMap::new(),
587			litep2p,
588		})
589	}
590
591	fn network_service(&self) -> Arc<dyn NetworkService> {
592		Arc::clone(&self.network_service)
593	}
594
	/// Create the [`Peerstore`] used by this backend.
	///
	/// Delegates to [`Peerstore::new`] with the given bootnodes and optional
	/// Prometheus registry.
	fn peer_store(
		bootnodes: Vec<sc_network_types::PeerId>,
		metrics_registry: Option<Registry>,
	) -> Self::PeerStore {
		Peerstore::new(bootnodes, metrics_registry)
	}
601
	/// Construct [`NotificationMetrics`] for the given optional Prometheus registry.
	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
		NotificationMetrics::new(registry)
	}
605
	/// Create Bitswap server.
	///
	/// Returns the future driving the server together with the configuration that is
	/// passed to the `litep2p` config builder.
	fn bitswap_server(
		client: Arc<dyn BlockBackend<B> + Send + Sync>,
	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
		BitswapServer::new(client)
	}
612
	/// Create notification protocol configuration for `protocol`.
	///
	/// Returns the protocol configuration together with the [`NotificationService`]
	/// handle given to the protocol owner. Delegates to
	/// [`NotificationProtocolConfig::new`].
	fn notification_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_notification_size: u64,
		handshake: Option<NotificationHandshake>,
		set_config: SetConfig,
		metrics: NotificationMetrics,
		peerstore_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
		Self::NotificationProtocolConfig::new(
			protocol_name,
			fallback_names,
			max_notification_size as usize,
			handshake,
			set_config,
			metrics,
			peerstore_handle,
		)
	}
633
	/// Create request-response protocol configuration.
	///
	/// Delegates to [`RequestResponseConfig::new`]; `inbound_queue` is the channel on
	/// which incoming requests are delivered to the protocol owner.
	fn request_response_config(
		protocol_name: ProtocolName,
		fallback_names: Vec<ProtocolName>,
		max_request_size: u64,
		max_response_size: u64,
		request_timeout: Duration,
		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	) -> Self::RequestResponseProtocolConfig {
		Self::RequestResponseProtocolConfig::new(
			protocol_name,
			fallback_names,
			max_request_size,
			max_response_size,
			request_timeout,
			inbound_queue,
		)
	}
652
653	/// Start [`Litep2pNetworkBackend`] event loop.
654	async fn run(mut self) {
655		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
656
657		loop {
658			let num_connected_peers = self
659				.peerset_handles
660				.get(&self.block_announce_protocol)
661				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
662			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
663
664			tokio::select! {
665				command = self.cmd_rx.next() => match command {
666					None => return,
667					Some(command) => match command {
668						NetworkServiceCommand::FindClosestPeers { target } => {
669							let query_id = self.discovery.find_node(target.into()).await;
670							self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
671						}
672						NetworkServiceCommand::GetValue{ key } => {
673							let query_id = self.discovery.get_value(key.clone()).await;
674							self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
675						}
676						NetworkServiceCommand::PutValue { key, value } => {
677							let query_id = self.discovery.put_value(key.clone(), value).await;
678							self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
679						}
680						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
681							let kademlia_key = record.key.clone();
682							let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
683							self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
684						}
685						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
686							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
687						}
688						NetworkServiceCommand::StartProviding { key } => {
689							let query_id = self.discovery.start_providing(key.clone()).await;
690							self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
691						}
692						NetworkServiceCommand::StopProviding { key } => {
693							self.discovery.stop_providing(key).await;
694						}
695						NetworkServiceCommand::GetProviders { key } => {
696							let query_id = self.discovery.get_providers(key.clone()).await;
697							self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
698						}
699						NetworkServiceCommand::EventStream { tx } => {
700							self.event_streams.push(tx);
701						}
702						NetworkServiceCommand::Status { tx } => {
703							let _ = tx.send(NetworkStatus {
704								num_connected_peers: self
705									.peerset_handles
706									.get(&self.block_announce_protocol)
707									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
708								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
709								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
710							});
711						}
712						NetworkServiceCommand::AddPeersToReservedSet {
713							protocol,
714							peers,
715						} => {
716							let peers = self.add_addresses(peers.into_iter().map(Into::into));
717
718							match self.peerset_handles.get(&protocol) {
719								Some(handle) => {
720									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
721								}
722								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
723							};
724						}
725						NetworkServiceCommand::AddKnownAddress { peer, address } => {
726							let mut address: Multiaddr = address.into();
727
728							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
729								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
730							}
731
732							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
733								// libp2p backend generates `DiscoveryOut::Discovered(peer_id)`
734								// event when a new address is added for a peer, which leads to the
735								// peer being added to peerstore. Do the same directly here.
736								self.peerstore_handle.add_known_peer(peer);
737							} else {
738								log::debug!(
739									target: LOG_TARGET,
740									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
741								);
742							}
743						},
744						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
745							let peers = self.add_addresses(peers.into_iter().map(Into::into));
746
747							match self.peerset_handles.get(&protocol) {
748								Some(handle) => {
749									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
750								}
751								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
752							}
753
754						},
755						NetworkServiceCommand::DisconnectPeer {
756							protocol,
757							peer,
758						} => {
759							let Some(handle) = self.peerset_handles.get(&protocol) else {
760								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
761								continue
762							};
763
764							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
765						}
766						NetworkServiceCommand::SetReservedOnly {
767							protocol,
768							reserved_only,
769						} => {
770							let Some(handle) = self.peerset_handles.get(&protocol) else {
771								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
772								continue
773							};
774
775							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
776						}
777						NetworkServiceCommand::RemoveReservedPeers {
778							protocol,
779							peers,
780						} => {
781							let Some(handle) = self.peerset_handles.get(&protocol) else {
782								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
783								continue
784							};
785
786							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
787						}
788					}
789				},
790				event = self.discovery.next() => match event {
791					None => return,
792					Some(DiscoveryEvent::Discovered { addresses }) => {
793						// if at least one address was added for the peer, report the peer to `Peerstore`
794						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
795							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
796								self.peerstore_handle.add_known_peer(peer);
797							}
798						}
799					}
800					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
801						for peer in peers {
802							self.peerstore_handle.add_known_peer(peer.into());
803						}
804					}
805					Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
806						match self.pending_queries.remove(&query_id) {
807							Some(KadQuery::FindNode(_, started)) => {
808								log::trace!(
809									target: LOG_TARGET,
810									"`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
811								);
812
813								self.event_streams.send(
814									Event::Dht(
815										DhtEvent::ClosestPeersFound(
816											target.into(),
817											peers
818												.into_iter()
819												.map(|(peer, addrs)| (
820													peer.into(),
821													addrs.into_iter().map(Into::into).collect(),
822												))
823												.collect(),
824										)
825									)
826								);
827
828								if let Some(ref metrics) = self.metrics {
829									metrics
830										.kademlia_query_duration
831										.with_label_values(&["node-find"])
832										.observe(started.elapsed().as_secs_f64());
833								}
834							},
835							query => {
836								log::error!(
837									target: LOG_TARGET,
838									"Missing/invalid pending query for `FIND_NODE`: {query:?}"
839								);
840								debug_assert!(false);
841							}
842						}
843					},
844					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
845						if !self.pending_queries.contains_key(&query_id) {
846							log::error!(
847								target: LOG_TARGET,
848								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
849							);
850
851							continue
852						}
853
854						let peer_id: sc_network_types::PeerId = record.peer.into();
855						let record = PeerRecord {
856							record: P2PRecord {
857								key: record.record.key.to_vec().into(),
858								value: record.record.value,
859								publisher: record.record.publisher.map(|peer_id| {
860									let peer_id: sc_network_types::PeerId = peer_id.into();
861									peer_id.into()
862								}),
863								expires: record.record.expires,
864							},
865							peer: Some(peer_id.into()),
866						};
867
868						self.event_streams.send(
869							Event::Dht(
870								DhtEvent::ValueFound(
871									record.into()
872								)
873							)
874						);
875					}
876					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
877						match self.pending_queries.remove(&query_id) {
878							Some(KadQuery::GetValue(key, started)) => {
879								log::trace!(
880									target: LOG_TARGET,
881									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
882								);
883
884								if let Some(ref metrics) = self.metrics {
885									metrics
886										.kademlia_query_duration
887										.with_label_values(&["value-get"])
888										.observe(started.elapsed().as_secs_f64());
889								}
890							},
891							query => {
892								log::error!(
893									target: LOG_TARGET,
894									"Missing/invalid pending query for `GET_VALUE`: {query:?}"
895								);
896								debug_assert!(false);
897							},
898						}
899					}
900					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
901						match self.pending_queries.remove(&query_id) {
902							Some(KadQuery::PutValue(key, started)) => {
903								log::trace!(
904									target: LOG_TARGET,
905									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
906								);
907
908								self.event_streams.send(Event::Dht(
909									DhtEvent::ValuePut(key)
910								));
911
912								if let Some(ref metrics) = self.metrics {
913									metrics
914										.kademlia_query_duration
915										.with_label_values(&["value-put"])
916										.observe(started.elapsed().as_secs_f64());
917								}
918							},
919							query => {
920								log::error!(
921									target: LOG_TARGET,
922									"Missing/invalid pending query for `PUT_VALUE`: {query:?}"
923								);
924								debug_assert!(false);
925							}
926						}
927					}
928					Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
929						match self.pending_queries.remove(&query_id) {
930							Some(KadQuery::GetProviders(key, started)) => {
931								log::trace!(
932									target: LOG_TARGET,
933									"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
934								);
935
								// We likely requested providers in order to connect to them,
								// so add their addresses to litep2p's transport manager.
								// Consider also looking up the providers' addresses with a
								// `FIND_NODE` query, as it can yield more up-to-date addresses.
940								providers.iter().for_each(|p| {
941									self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
942								});
943
944								self.event_streams.send(Event::Dht(
945									DhtEvent::ProvidersFound(
946										key.clone().into(),
947										providers.into_iter().map(|p| p.peer.into()).collect()
948									)
949								));
950
951								// litep2p returns all providers in a single event, so we let
952								// subscribers know no more providers will be yielded.
953								self.event_streams.send(Event::Dht(
954									DhtEvent::NoMoreProviders(key.into())
955								));
956
957								if let Some(ref metrics) = self.metrics {
958									metrics
959										.kademlia_query_duration
960										.with_label_values(&["providers-get"])
961										.observe(started.elapsed().as_secs_f64());
962								}
963							},
964							query => {
965								log::error!(
966									target: LOG_TARGET,
967									"Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
968								);
969								debug_assert!(false);
970							}
971						}
972					}
973					Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
974						match self.pending_queries.remove(&query_id) {
975							Some(KadQuery::AddProvider(key, started)) => {
976								debug_assert_eq!(key, provided_key.into());
977
978								log::trace!(
979									target: LOG_TARGET,
980									"`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
981								);
982
983								self.event_streams.send(Event::Dht(
984									DhtEvent::StartedProviding(key.into())
985								));
986
987								if let Some(ref metrics) = self.metrics {
988									metrics
989										.kademlia_query_duration
990										.with_label_values(&["provider-add"])
991										.observe(started.elapsed().as_secs_f64());
992								}
993							}
994							Some(_) => {
995								log::error!(
996									target: LOG_TARGET,
997									"Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
998								);
999								debug_assert!(false);
1000							}
1001							None => {
1002								log::trace!(
1003									target: LOG_TARGET,
1004									"`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
1005								);
1006							}
1007						}
1008					}
1009					Some(DiscoveryEvent::QueryFailed { query_id }) => {
1010						match self.pending_queries.remove(&query_id) {
1011							Some(KadQuery::FindNode(peer_id, started)) => {
1012								log::debug!(
1013									target: LOG_TARGET,
1014									"`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1015								);
1016
1017								self.event_streams.send(Event::Dht(
1018									DhtEvent::ClosestPeersNotFound(peer_id.into())
1019								));
1020
1021								if let Some(ref metrics) = self.metrics {
1022									metrics
1023										.kademlia_query_duration
1024										.with_label_values(&["node-find-failed"])
1025										.observe(started.elapsed().as_secs_f64());
1026								}
1027							},
1028							Some(KadQuery::GetValue(key, started)) => {
1029								log::debug!(
1030									target: LOG_TARGET,
1031									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1032								);
1033
1034								self.event_streams.send(Event::Dht(
1035									DhtEvent::ValueNotFound(key)
1036								));
1037
1038								if let Some(ref metrics) = self.metrics {
1039									metrics
1040										.kademlia_query_duration
1041										.with_label_values(&["value-get-failed"])
1042										.observe(started.elapsed().as_secs_f64());
1043								}
1044							},
1045							Some(KadQuery::PutValue(key, started)) => {
1046								log::debug!(
1047									target: LOG_TARGET,
1048									"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1049								);
1050
1051								self.event_streams.send(Event::Dht(
1052									DhtEvent::ValuePutFailed(key)
1053								));
1054
1055								if let Some(ref metrics) = self.metrics {
1056									metrics
1057										.kademlia_query_duration
1058										.with_label_values(&["value-put-failed"])
1059										.observe(started.elapsed().as_secs_f64());
1060								}
1061							},
1062							Some(KadQuery::GetProviders(key, started)) => {
1063								log::debug!(
1064									target: LOG_TARGET,
1065									"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1066								);
1067
1068								self.event_streams.send(Event::Dht(
1069									DhtEvent::ProvidersNotFound(key)
1070								));
1071
1072								if let Some(ref metrics) = self.metrics {
1073									metrics
1074										.kademlia_query_duration
1075										.with_label_values(&["providers-get-failed"])
1076										.observe(started.elapsed().as_secs_f64());
1077								}
1078							},
1079							Some(KadQuery::AddProvider(key, started)) => {
1080								log::debug!(
1081									target: LOG_TARGET,
1082									"`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1083								);
1084
1085								self.event_streams.send(Event::Dht(
1086									DhtEvent::StartProvidingFailed(key)
1087								));
1088
1089								if let Some(ref metrics) = self.metrics {
1090									metrics
1091										.kademlia_query_duration
1092										.with_label_values(&["provider-add-failed"])
1093										.observe(started.elapsed().as_secs_f64());
1094								}
1095							},
1096							None => {
1097								log::debug!(
1098									target: LOG_TARGET,
1099									"non-existent query (likely republishing a provider) failed ({query_id:?})",
1100								);
1101							}
1102						}
1103					}
1104					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1105						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1106					}
1107					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1108						match self.litep2p.public_addresses().add_address(address.clone().into()) {
1109							Ok(inserted) => if inserted {
1110								log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1111							},
1112							Err(err) => {
1113								log::warn!(
1114									target: LOG_TARGET,
1115									"🔍 Failed to add discovered external address {address:?}: {err:?}",
1116								);
1117							},
1118						}
1119					}
1120					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1121						let local_peer_id = self.litep2p.local_peer_id();
1122
1123						// Litep2p requires the peer ID to be present in the address.
1124						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1125							address.with(Protocol::P2p(*local_peer_id.as_ref()))
1126						} else {
1127							address
1128						};
1129
1130						if self.litep2p.public_addresses().remove_address(&address) {
1131							log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1132						} else {
1133							log::warn!(
1134								target: LOG_TARGET,
1135								"🔍 Failed to remove expired external address {address:?}"
1136							);
1137						}
1138					}
1139					Some(DiscoveryEvent::Ping { peer, rtt }) => {
1140						log::trace!(
1141							target: LOG_TARGET,
1142							"ping time with {peer:?}: {rtt:?}",
1143						);
1144					}
1145					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1146						self.event_streams.send(Event::Dht(
1147							DhtEvent::PutRecordRequest(
1148								key.into(),
1149								value,
1150								publisher.map(Into::into),
1151								expires,
1152							)
1153						));
1154					},
1155
1156					Some(DiscoveryEvent::RandomKademliaStarted) => {
1157						if let Some(metrics) = self.metrics.as_ref() {
1158							metrics.kademlia_random_queries_total.inc();
1159						}
1160					}
1161				},
1162				event = self.litep2p.next_event() => match event {
1163					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1164						let Some(metrics) = &self.metrics else {
1165							continue;
1166						};
1167
1168						let direction = match endpoint {
1169							Endpoint::Dialer { .. } => "out",
1170							Endpoint::Listener { .. } => {
1171								// Increment incoming connections counter.
1172								//
1173								// Note: For litep2p these are represented by established negotiated connections,
1174								// while for libp2p (legacy) these represent not-yet-negotiated connections.
1175								metrics.incoming_connections_total.inc();
1176
1177								"in"
1178							},
1179						};
1180						metrics.connections_opened_total.with_label_values(&[direction]).inc();
1181
1182						match self.peers.entry(peer) {
1183							Entry::Vacant(entry) => {
1184								entry.insert(ConnectionContext {
1185									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1186									num_connections: 1usize,
1187								});
1188								metrics.distinct_peers_connections_opened_total.inc();
1189							}
1190							Entry::Occupied(entry) => {
1191								let entry = entry.into_mut();
1192								entry.num_connections += 1;
1193								entry.endpoints.insert(endpoint.connection_id(), endpoint);
1194							}
1195						}
1196					}
1197					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1198						let Some(metrics) = &self.metrics else {
1199							continue;
1200						};
1201
1202						let Some(context) = self.peers.get_mut(&peer) else {
1203							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1204							continue
1205						};
1206
1207						let direction = match context.endpoints.remove(&connection_id) {
1208							None => {
1209								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1210								continue
1211							}
1212							Some(endpoint) => {
1213								context.num_connections -= 1;
1214
1215								match endpoint {
1216									Endpoint::Dialer { .. } => "out",
1217									Endpoint::Listener { .. } => "in",
1218								}
1219							}
1220						};
1221
1222						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1223
1224						if context.num_connections == 0 {
1225							self.peers.remove(&peer);
1226							metrics.distinct_peers_connections_closed_total.inc();
1227						}
1228					}
1229					Some(Litep2pEvent::DialFailure { address, error }) => {
1230						log::debug!(
1231							target: LOG_TARGET,
1232							"failed to dial peer at {address:?}: {error:?}",
1233						);
1234
1235						if let Some(metrics) = &self.metrics {
1236							let reason = match error {
1237								DialError::Timeout => "timeout",
1238								DialError::AddressError(_) => "invalid-address",
1239								DialError::DnsError(_) => "cannot-resolve-dns",
1240								DialError::NegotiationError(error) => match error {
1241									NegotiationError::Timeout => "timeout",
1242									NegotiationError::PeerIdMissing => "missing-peer-id",
1243									NegotiationError::StateMismatch => "state-mismatch",
1244									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1245									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1246									NegotiationError::SnowError(_) => "noise-error",
1247									NegotiationError::ParseError(_) => "parse-error",
1248									NegotiationError::IoError(_) => "io-error",
1249									NegotiationError::WebSocket(_) => "webscoket-error",
1250									NegotiationError::BadSignature => "bad-signature",
1251								}
1252							};
1253
1254							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1255						}
1256					}
1257					Some(Litep2pEvent::ListDialFailures { errors }) => {
1258						log::debug!(
1259							target: LOG_TARGET,
1260							"failed to dial peer on multiple addresses {errors:?}",
1261						);
1262
1263						if let Some(metrics) = &self.metrics {
1264							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1265						}
1266					}
1267					None => {
1268						log::error!(
1269								target: LOG_TARGET,
1270								"Litep2p backend terminated"
1271						);
1272						return
1273					}
1274				},
1275			}
1276		}
1277	}
1278}