// sc_network/litep2p/mod.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! `NetworkBackend` implementation for `litep2p`.
20
21use crate::{
22	config::{
23		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24		SetConfig, TransportConfig,
25	},
26	error::Error,
27	event::{DhtEvent, Event},
28	litep2p::{
29		discovery::{Discovery, DiscoveryEvent},
30		peerstore::Peerstore,
31		service::{Litep2pNetworkService, NetworkServiceCommand},
32		shim::{
33			bitswap::BitswapServer,
34			notification::{
35				config::{NotificationProtocolConfig, ProtocolControlHandle},
36				peerset::PeersetCommand,
37			},
38			request_response::{RequestResponseConfig, RequestResponseProtocol},
39		},
40	},
41	peer_store::PeerStoreProvider,
42	service::{
43		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44		out_events,
45		traits::{BandwidthSink, NetworkBackend, NetworkService},
46	},
47	NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53	config::ConfigBuilder,
54	crypto::ed25519::Keypair,
55	error::{DialError, NegotiationError},
56	executor::Executor,
57	protocol::{
58		libp2p::{
59			bitswap::Config as BitswapConfig,
60			kademlia::{QueryId, Record},
61		},
62		request_response::ConfigBuilder as RequestResponseConfigBuilder,
63	},
64	transport::{
65		tcp::config::Config as TcpTransportConfig,
66		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67	},
68	types::{
69		multiaddr::{Multiaddr, Protocol},
70		ConnectionId,
71	},
72	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84	cmp,
85	collections::{hash_map::Entry, HashMap, HashSet},
86	fs,
87	future::Future,
88	iter,
89	pin::Pin,
90	sync::{
91		atomic::{AtomicUsize, Ordering},
92		Arc,
93	},
94	time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
/// Litep2p bandwidth sink.
///
/// Adapts the bandwidth counters exposed by `litep2p` to the [`BandwidthSink`]
/// trait used by the Substrate networking metrics.
struct Litep2pBandwidthSink {
	/// Underlying `litep2p` bandwidth sink providing inbound/outbound byte totals.
	sink: litep2p::BandwidthSink,
}
106
107impl BandwidthSink for Litep2pBandwidthSink {
108	fn total_inbound(&self) -> u64 {
109		self.sink.inbound() as u64
110	}
111
112	fn total_outbound(&self) -> u64 {
113		self.sink.outbound() as u64
114	}
115}
116
/// Litep2p task executor.
///
/// Bridges `litep2p`'s [`Executor`] trait to the spawner closure supplied by
/// the host (via `Params::executor`), so `litep2p` background tasks run on the
/// node's own runtime.
struct Litep2pExecutor {
	/// Executor closure: spawns the given boxed future onto the host runtime.
	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
122
123impl Executor for Litep2pExecutor {
124	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
125		(self.executor)(future)
126	}
127
128	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
129		(self.executor)(future)
130	}
131}
132
/// Logging target for the file.
///
/// NOTE(review): the target is `sub-libp2p` rather than e.g. `sub-litep2p` —
/// presumably to keep log filtering consistent across network backends; confirm
/// before changing.
const LOG_TARGET: &str = "sub-libp2p";
135
/// Peer context.
///
/// Per-peer connection bookkeeping kept by [`Litep2pNetworkBackend`].
struct ConnectionContext {
	/// Peer endpoints, keyed by the connection ID assigned by `litep2p`.
	endpoints: HashMap<ConnectionId, Endpoint>,

	/// Number of active connections.
	num_connections: usize,
}
144
/// Kademlia query we are tracking.
///
/// Each variant records the query's target/key together with the [`Instant`]
/// the query was started, which is used to report query-duration metrics when
/// the result arrives.
#[derive(Debug)]
enum KadQuery {
	/// `FIND_NODE` query for target and when it was initiated.
	FindNode(PeerId, Instant),
	/// `GET_VALUE` query for key and when it was initiated.
	GetValue(RecordKey, Instant),
	/// `PUT_VALUE` query for key and when it was initiated.
	PutValue(RecordKey, Instant),
	/// `GET_PROVIDERS` query for key and when it was initiated.
	GetProviders(RecordKey, Instant),
	/// `ADD_PROVIDER` query for key and when it was initiated.
	AddProvider(RecordKey, Instant),
}
159
/// Networking backend for `litep2p`.
pub struct Litep2pNetworkBackend {
	/// Main `litep2p` object.
	litep2p: Litep2p,

	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
	network_service: Arc<dyn NetworkService>,

	/// RX channel for receiving commands from `Litep2pNetworkService`.
	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

	/// `Peerset` handles to notification protocols, keyed by protocol name.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Pending Kademlia queries, keyed by the query ID assigned by discovery.
	pending_queries: HashMap<QueryId, KadQuery>,

	/// Discovery.
	discovery: Discovery,

	/// Number of connected peers, shared with the metrics source registered at
	/// construction time.
	num_connected: Arc<AtomicUsize>,

	/// Connected peers.
	peers: HashMap<litep2p::PeerId, ConnectionContext>,

	/// Peerstore.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Block announce protocol name.
	block_announce_protocol: ProtocolName,

	/// Sender for DHT events, forwarded to subscribers registered via the
	/// `EventStream` command.
	event_streams: out_events::OutChannels,

	/// Prometheus metrics.
	metrics: Option<Metrics>,
}
198
199impl Litep2pNetworkBackend {
200	/// From an iterator of multiaddress(es), parse and group all addresses of peers
201	/// so that litep2p can consume the information easily.
202	fn parse_addresses(
203		addresses: impl Iterator<Item = Multiaddr>,
204	) -> HashMap<PeerId, Vec<Multiaddr>> {
205		addresses
206			.into_iter()
207			.filter_map(|address| match address.iter().next() {
208				Some(
209					Protocol::Dns(_) |
210					Protocol::Dns4(_) |
211					Protocol::Dns6(_) |
212					Protocol::Ip6(_) |
213					Protocol::Ip4(_),
214				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
215				{
216					Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
217						.map_or(None, |peer| Some((peer, Some(address)))),
218					_ => None,
219				},
220				Some(Protocol::P2p(multihash)) => {
221					PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None)))
222				},
223				_ => None,
224			})
225			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
226				let entry = acc.entry(peer).or_default();
227				maybe_address.map(|address| entry.push(address));
228
229				acc
230			})
231	}
232
233	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
234	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
235		Self::parse_addresses(peers.into_iter())
236			.into_iter()
237			.filter_map(|(peer, addresses)| {
238				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
239				if addresses.is_empty() {
240					return Some(peer);
241				}
242
243				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
244					log::warn!(
245						target: LOG_TARGET,
246						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
247					);
248					return None;
249				}
250
251				self.peerstore_handle.add_known_peer(peer);
252				Some(peer)
253			})
254			.collect()
255	}
256}
257
258impl Litep2pNetworkBackend {
259	/// Get `litep2p` keypair from `NodeKeyConfig`.
260	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
261		let secret: litep2p::crypto::ed25519::SecretKey =
262			node_key.clone().into_keypair()?.secret().into();
263
264		let local_identity = Keypair::from(secret);
265		let local_public = local_identity.public();
266		let local_peer_id = local_public.to_peer_id();
267
268		Ok((local_identity, local_peer_id))
269	}
270
271	/// Configure transport protocols for `Litep2pNetworkBackend`.
272	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
273		config: &FullNetworkConfiguration<B, H, Self>,
274	) -> ConfigBuilder {
275		let _ = match config.network_config.transport {
276			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
277			TransportConfig::Normal { .. } => false,
278		};
279		let config_builder = ConfigBuilder::new();
280
281		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
282			.network_config
283			.listen_addresses
284			.iter()
285			.filter_map(|address| {
286				use sc_network_types::multiaddr::Protocol;
287
288				let mut iter = address.iter();
289
290				match iter.next() {
291					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
292					protocol => {
293						log::error!(
294							target: LOG_TARGET,
295							"unknown protocol {protocol:?}, ignoring {address:?}",
296						);
297
298						return None;
299					},
300				}
301
302				match iter.next() {
303					Some(Protocol::Tcp(_)) => match iter.next() {
304						Some(Protocol::Ws(_) | Protocol::Wss(_)) => {
305							Some((None, Some(address.clone())))
306						},
307						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
308						protocol => {
309							log::error!(
310								target: LOG_TARGET,
311								"unknown protocol {protocol:?}, ignoring {address:?}",
312							);
313							None
314						},
315					},
316					protocol => {
317						log::error!(
318							target: LOG_TARGET,
319							"unknown protocol {protocol:?}, ignoring {address:?}",
320						);
321						None
322					},
323				}
324			})
325			.unzip();
326
327		config_builder
328			.with_websocket(WebSocketTransportConfig {
329				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
330				yamux_config: litep2p::yamux::Config::default(),
331				nodelay: true,
332				..Default::default()
333			})
334			.with_tcp(TcpTransportConfig {
335				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
336				yamux_config: litep2p::yamux::Config::default(),
337				nodelay: true,
338				..Default::default()
339			})
340	}
341}
342
343#[async_trait::async_trait]
344impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
345	type NotificationProtocolConfig = NotificationProtocolConfig;
346	type RequestResponseProtocolConfig = RequestResponseConfig;
347	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
348	type PeerStore = Peerstore;
349	type BitswapConfig = BitswapConfig;
350
	/// Create a new [`Litep2pNetworkBackend`] from `params`.
	///
	/// Derives the local node identity from the configured node key, builds the
	/// transport, notification, request-response and discovery configurations,
	/// then instantiates the `litep2p` object and the [`Litep2pNetworkService`]
	/// handle used by other subsystems. Fails if the node key cannot be loaded,
	/// the network config directory cannot be created, sanity checks on
	/// addresses/bootnodes fail, metric registration fails, or `litep2p` itself
	/// fails to initialize.
	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
	where
		Self: Sized,
	{
		let (keypair, local_peer_id) =
			Self::get_keypair(&params.network_config.network_config.node_key)?;
		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);

		// Drop the local peer from the boot nodes so the node doesn't dial itself.
		params.network_config.network_config.boot_nodes = params
			.network_config
			.network_config
			.boot_nodes
			.into_iter()
			.filter(|boot_node| boot_node.peer_id != local_peer_id.into())
			.collect();
		// Likewise drop (and warn about) the local peer in the reserved set.
		params.network_config.network_config.default_peers_set.reserved_nodes = params
			.network_config
			.network_config
			.default_peers_set
			.reserved_nodes
			.into_iter()
			.filter(|reserved_node| {
				if reserved_node.peer_id == local_peer_id.into() {
					log::warn!(
						target: LOG_TARGET,
						"Local peer ID used in reserved node, ignoring: {reserved_node}",
					);
					false
				} else {
					true
				}
			})
			.collect();

		// Make sure the network config directory (e.g. for the node key) exists.
		if let Some(path) = &params.network_config.network_config.net_config_path {
			fs::create_dir_all(path)?;
		}

		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
		log::info!(target: LOG_TARGET, "Running litep2p network backend");

		params.network_config.sanity_check_addresses()?;
		params.network_config.sanity_check_bootnodes()?;

		let mut config_builder =
			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
		let known_addresses = params.network_config.known_addresses();
		let peer_store_handle = params.network_config.peer_store_handle();
		let executor = Arc::new(Litep2pExecutor { executor: params.executor });

		let FullNetworkConfiguration {
			notification_protocols,
			request_response_protocols,
			network_config,
			..
		} = params.network_config;

		// initialize notification protocols
		//
		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
		// of Polkadot SDK to control connectivity of the notification protocol
		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
		let mut notif_protocols = HashMap::from_iter([(
			params.block_announce_config.protocol_name().clone(),
			params.block_announce_config.handle,
		)]);

		// handshake for all but the syncing protocol is set to node role
		config_builder = notification_protocols
			.into_iter()
			.fold(config_builder, |config_builder, mut config| {
				config.config.set_handshake(Roles::from(&params.role).encode());
				notif_protocols.insert(config.protocol_name, config.handle);

				config_builder.with_notification_protocol(config.config)
			})
			.with_notification_protocol(params.block_announce_config.config);

		// initialize request-response protocols
		let metrics = match &params.metrics_registry {
			Some(registry) => Some(register_without_sources(registry)?),
			None => None,
		};

		// create channels that are used to send request before initializing protocols so the
		// senders can be passed onto all request-response protocols
		//
		// all protocols must have each others' senders so they can send the fallback request in
		// case the main protocol is not supported by the remote peer and user specified a fallback
		let (mut request_response_receivers, request_response_senders): (
			HashMap<_, _>,
			HashMap<_, _>,
		) = request_response_protocols
			.iter()
			.map(|config| {
				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
			})
			.unzip();

		config_builder = request_response_protocols.into_iter().fold(
			config_builder,
			|config_builder, config| {
				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
					Litep2pProtocolName::from(config.protocol_name.clone()),
				)
				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
				.with_timeout(config.request_timeout)
				.build();

				let protocol = RequestResponseProtocol::new(
					config.protocol_name.clone(),
					handle,
					Arc::clone(&peer_store_handle),
					config.inbound_queue,
					request_response_receivers
						.remove(&config.protocol_name)
						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
					request_response_senders.clone(),
					metrics.clone(),
				);

				// Spawn the protocol's event loop on the host executor.
				executor.run(Box::pin(async move {
					protocol.run().await;
				}));

				config_builder.with_request_response_protocol(protocol_config)
			},
		);

		// collect known addresses
		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
				use sc_network_types::multiaddr::Protocol;

				// Append the `/p2p/<peer ID>` suffix when missing; addresses not
				// ending in TCP/WS/WSS or `/p2p/..` are dropped.
				let address = match address.iter().last() {
					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) => {
						address.with(Protocol::P2p(peer.into()))
					},
					Some(Protocol::P2p(_)) => address,
					_ => return acc,
				};

				acc.entry(peer.into()).or_default().push(address.into());
				peer_store_handle.add_known_peer(peer);

				acc
			});

		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
		let listen_addresses = Arc::new(Default::default());
		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
			Discovery::new(
				local_peer_id,
				&network_config,
				params.genesis_hash,
				params.fork_id.as_deref(),
				&params.protocol_id,
				known_addresses.clone(),
				Arc::clone(&listen_addresses),
				Arc::clone(&peer_store_handle),
			);

		config_builder = config_builder
			.with_known_addresses(known_addresses.clone().into_iter())
			.with_libp2p_ping(ping_config)
			.with_libp2p_identify(identify_config)
			.with_libp2p_kademlia(kademlia_config)
			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
			))
			.with_keep_alive_timeout(network_config.idle_connection_timeout)
			// Use system DNS resolver to enable intranet domain resolution and administrator
			// control over DNS lookup.
			.with_system_resolver()
			.with_executor(executor);

		if let Some(config) = maybe_mdns_config {
			config_builder = config_builder.with_mdns(config);
		}

		if let Some(config) = params.bitswap_config {
			config_builder = config_builder.with_libp2p_bitswap(config);
		}

		let litep2p =
			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;

		// Record the actual listen addresses (ports may have been OS-assigned)
		// in the set shared with the network service.
		litep2p.listen_addresses().for_each(|address| {
			log::debug!(target: LOG_TARGET, "listening on: {address}");

			listen_addresses.write().insert(address.clone());
		});

		// Register the configured public addresses; failures are logged but
		// not fatal.
		let public_addresses = litep2p.public_addresses();
		for address in network_config.public_addresses.iter() {
			if let Err(err) = public_addresses.add_address(address.clone().into()) {
				log::warn!(
					target: LOG_TARGET,
					"failed to add public address {address:?}: {err:?}",
				);
			}
		}

		let network_service = Arc::new(Litep2pNetworkService::new(
			local_peer_id,
			keypair.clone(),
			cmd_tx,
			Arc::clone(&peer_store_handle),
			notif_protocols.clone(),
			block_announce_protocol.clone(),
			request_response_senders,
			Arc::clone(&listen_addresses),
			public_addresses,
		));

		// register rest of the metrics now that `Litep2p` has been created
		let num_connected = Arc::new(Default::default());
		let bandwidth: Arc<dyn BandwidthSink> =
			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });

		if let Some(registry) = &params.metrics_registry {
			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
		}

		Ok(Self {
			network_service,
			cmd_rx,
			metrics,
			peerset_handles: notif_protocols,
			num_connected,
			discovery,
			pending_queries: HashMap::new(),
			peerstore_handle: peer_store_handle,
			block_announce_protocol,
			event_streams: out_events::OutChannels::new(None)?,
			peers: HashMap::new(),
			litep2p,
		})
	}
593
594	fn network_service(&self) -> Arc<dyn NetworkService> {
595		Arc::clone(&self.network_service)
596	}
597
598	fn peer_store(
599		bootnodes: Vec<sc_network_types::PeerId>,
600		metrics_registry: Option<Registry>,
601	) -> Self::PeerStore {
602		Peerstore::new(bootnodes, metrics_registry)
603	}
604
605	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
606		NotificationMetrics::new(registry)
607	}
608
609	/// Create Bitswap server.
610	fn bitswap_server(
611		client: Arc<dyn BlockBackend<B> + Send + Sync>,
612	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
613		BitswapServer::new(client)
614	}
615
616	/// Create notification protocol configuration for `protocol`.
617	fn notification_config(
618		protocol_name: ProtocolName,
619		fallback_names: Vec<ProtocolName>,
620		max_notification_size: u64,
621		handshake: Option<NotificationHandshake>,
622		set_config: SetConfig,
623		metrics: NotificationMetrics,
624		peerstore_handle: Arc<dyn PeerStoreProvider>,
625	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
626		Self::NotificationProtocolConfig::new(
627			protocol_name,
628			fallback_names,
629			max_notification_size as usize,
630			handshake,
631			set_config,
632			metrics,
633			peerstore_handle,
634		)
635	}
636
637	/// Create request-response protocol configuration.
638	fn request_response_config(
639		protocol_name: ProtocolName,
640		fallback_names: Vec<ProtocolName>,
641		max_request_size: u64,
642		max_response_size: u64,
643		request_timeout: Duration,
644		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
645	) -> Self::RequestResponseProtocolConfig {
646		Self::RequestResponseProtocolConfig::new(
647			protocol_name,
648			fallback_names,
649			max_request_size,
650			max_response_size,
651			request_timeout,
652			inbound_queue,
653		)
654	}
655
656	/// Start [`Litep2pNetworkBackend`] event loop.
657	async fn run(mut self) {
658		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
659
660		loop {
661			let num_connected_peers = self
662				.peerset_handles
663				.get(&self.block_announce_protocol)
664				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
665			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
666
667			tokio::select! {
668				command = self.cmd_rx.next() => match command {
669					None => return,
670					Some(command) => match command {
671						NetworkServiceCommand::FindClosestPeers { target } => {
672							let query_id = self.discovery.find_node(target.into()).await;
673							self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
674						}
675						NetworkServiceCommand::GetValue{ key } => {
676							let query_id = self.discovery.get_value(key.clone()).await;
677							self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
678						}
679						NetworkServiceCommand::PutValue { key, value } => {
680							let query_id = self.discovery.put_value(key.clone(), value).await;
681							self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
682						}
683						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
684							let kademlia_key = record.key.clone();
685							let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
686							self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
687						}
688						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
689							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
690						}
691						NetworkServiceCommand::StartProviding { key } => {
692							let query_id = self.discovery.start_providing(key.clone()).await;
693							self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
694						}
695						NetworkServiceCommand::StopProviding { key } => {
696							self.discovery.stop_providing(key).await;
697						}
698						NetworkServiceCommand::GetProviders { key } => {
699							let query_id = self.discovery.get_providers(key.clone()).await;
700							self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
701						}
702						NetworkServiceCommand::EventStream { tx } => {
703							self.event_streams.push(tx);
704						}
705						NetworkServiceCommand::Status { tx } => {
706							let _ = tx.send(NetworkStatus {
707								num_connected_peers: self
708									.peerset_handles
709									.get(&self.block_announce_protocol)
710									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
711								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
712								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
713							});
714						}
715						NetworkServiceCommand::AddPeersToReservedSet {
716							protocol,
717							peers,
718						} => {
719							let peers = self.add_addresses(peers.into_iter().map(Into::into));
720
721							match self.peerset_handles.get(&protocol) {
722								Some(handle) => {
723									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
724								}
725								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
726							};
727						}
728						NetworkServiceCommand::AddKnownAddress { peer, address } => {
729							let mut address: Multiaddr = address.into();
730
731							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
732								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
733							}
734
735							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
736								// libp2p backend generates `DiscoveryOut::Discovered(peer_id)`
737								// event when a new address is added for a peer, which leads to the
738								// peer being added to peerstore. Do the same directly here.
739								self.peerstore_handle.add_known_peer(peer);
740							} else {
741								log::debug!(
742									target: LOG_TARGET,
743									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
744								);
745							}
746						},
747						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
748							let peers = self.add_addresses(peers.into_iter().map(Into::into));
749
750							match self.peerset_handles.get(&protocol) {
751								Some(handle) => {
752									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
753								}
754								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
755							}
756
757						},
758						NetworkServiceCommand::DisconnectPeer {
759							protocol,
760							peer,
761						} => {
762							let Some(handle) = self.peerset_handles.get(&protocol) else {
763								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
764								continue
765							};
766
767							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
768						}
769						NetworkServiceCommand::SetReservedOnly {
770							protocol,
771							reserved_only,
772						} => {
773							let Some(handle) = self.peerset_handles.get(&protocol) else {
774								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
775								continue
776							};
777
778							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
779						}
780						NetworkServiceCommand::RemoveReservedPeers {
781							protocol,
782							peers,
783						} => {
784							let Some(handle) = self.peerset_handles.get(&protocol) else {
785								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
786								continue
787							};
788
789							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
790						}
791					}
792				},
793				event = self.discovery.next() => match event {
794					None => return,
795					Some(DiscoveryEvent::Discovered { addresses }) => {
796						// if at least one address was added for the peer, report the peer to `Peerstore`
797						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
798							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
799								self.peerstore_handle.add_known_peer(peer);
800							}
801						}
802					}
803					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
804						for peer in peers {
805							self.peerstore_handle.add_known_peer(peer.into());
806						}
807					}
808					Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
809						match self.pending_queries.remove(&query_id) {
810							Some(KadQuery::FindNode(_, started)) => {
811								log::trace!(
812									target: LOG_TARGET,
813									"`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
814								);
815
816								self.event_streams.send(
817									Event::Dht(
818										DhtEvent::ClosestPeersFound(
819											target.into(),
820											peers
821												.into_iter()
822												.map(|(peer, addrs)| (
823													peer.into(),
824													addrs.into_iter().map(Into::into).collect(),
825												))
826												.collect(),
827										)
828									)
829								);
830
831								if let Some(ref metrics) = self.metrics {
832									metrics
833										.kademlia_query_duration
834										.with_label_values(&["node-find"])
835										.observe(started.elapsed().as_secs_f64());
836								}
837							},
838							query => {
839								log::error!(
840									target: LOG_TARGET,
841									"Missing/invalid pending query for `FIND_NODE`: {query:?}"
842								);
843								debug_assert!(false);
844							}
845						}
846					},
847					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
848						if !self.pending_queries.contains_key(&query_id) {
849							log::error!(
850								target: LOG_TARGET,
851								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
852							);
853
854							continue
855						}
856
857						let peer_id: sc_network_types::PeerId = record.peer.into();
858						let record = PeerRecord {
859							record: P2PRecord {
860								key: record.record.key.to_vec().into(),
861								value: record.record.value,
862								publisher: record.record.publisher.map(|peer_id| {
863									let peer_id: sc_network_types::PeerId = peer_id.into();
864									peer_id.into()
865								}),
866								expires: record.record.expires,
867							},
868							peer: Some(peer_id.into()),
869						};
870
871						self.event_streams.send(
872							Event::Dht(
873								DhtEvent::ValueFound(
874									record.into()
875								)
876							)
877						);
878					}
879					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
880						match self.pending_queries.remove(&query_id) {
881							Some(KadQuery::GetValue(key, started)) => {
882								log::trace!(
883									target: LOG_TARGET,
884									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
885								);
886
887								if let Some(ref metrics) = self.metrics {
888									metrics
889										.kademlia_query_duration
890										.with_label_values(&["value-get"])
891										.observe(started.elapsed().as_secs_f64());
892								}
893							},
894							query => {
895								log::error!(
896									target: LOG_TARGET,
897									"Missing/invalid pending query for `GET_VALUE`: {query:?}"
898								);
899								debug_assert!(false);
900							},
901						}
902					}
903					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
904						match self.pending_queries.remove(&query_id) {
905							Some(KadQuery::PutValue(key, started)) => {
906								log::trace!(
907									target: LOG_TARGET,
908									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
909								);
910
911								self.event_streams.send(Event::Dht(
912									DhtEvent::ValuePut(key)
913								));
914
915								if let Some(ref metrics) = self.metrics {
916									metrics
917										.kademlia_query_duration
918										.with_label_values(&["value-put"])
919										.observe(started.elapsed().as_secs_f64());
920								}
921							},
922							query => {
923								log::error!(
924									target: LOG_TARGET,
925									"Missing/invalid pending query for `PUT_VALUE`: {query:?}"
926								);
927								debug_assert!(false);
928							}
929						}
930					}
931					Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
932						match self.pending_queries.remove(&query_id) {
933							Some(KadQuery::GetProviders(key, started)) => {
934								log::trace!(
935									target: LOG_TARGET,
936									"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
937								);
938
939								// We likely requested providers to connect to them,
940								// so let's add their addresses to litep2p's transport manager.
941								// Consider also looking up the addresses of providers with a `FIND_NODE`
942								// query, as it can yield more up-to-date addresses.
943								providers.iter().for_each(|p| {
944									self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
945								});
946
947								self.event_streams.send(Event::Dht(
948									DhtEvent::ProvidersFound(
949										key.clone().into(),
950										providers.into_iter().map(|p| p.peer.into()).collect()
951									)
952								));
953
954								// litep2p returns all providers in a single event, so we let
955								// subscribers know no more providers will be yielded.
956								self.event_streams.send(Event::Dht(
957									DhtEvent::NoMoreProviders(key.into())
958								));
959
960								if let Some(ref metrics) = self.metrics {
961									metrics
962										.kademlia_query_duration
963										.with_label_values(&["providers-get"])
964										.observe(started.elapsed().as_secs_f64());
965								}
966							},
967							query => {
968								log::error!(
969									target: LOG_TARGET,
970									"Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
971								);
972								debug_assert!(false);
973							}
974						}
975					}
976					Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
977						match self.pending_queries.remove(&query_id) {
978							Some(KadQuery::AddProvider(key, started)) => {
979								debug_assert_eq!(key, provided_key.into());
980
981								log::trace!(
982									target: LOG_TARGET,
983									"`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
984								);
985
986								self.event_streams.send(Event::Dht(
987									DhtEvent::StartedProviding(key.into())
988								));
989
990								if let Some(ref metrics) = self.metrics {
991									metrics
992										.kademlia_query_duration
993										.with_label_values(&["provider-add"])
994										.observe(started.elapsed().as_secs_f64());
995								}
996							}
997							Some(_) => {
998								log::error!(
999									target: LOG_TARGET,
1000									"Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
1001								);
1002								debug_assert!(false);
1003							}
1004							None => {
1005								log::trace!(
1006									target: LOG_TARGET,
1007									"`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
1008								);
1009							}
1010						}
1011					}
1012					Some(DiscoveryEvent::QueryFailed { query_id }) => {
1013						match self.pending_queries.remove(&query_id) {
1014							Some(KadQuery::FindNode(peer_id, started)) => {
1015								log::debug!(
1016									target: LOG_TARGET,
1017									"`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1018								);
1019
1020								self.event_streams.send(Event::Dht(
1021									DhtEvent::ClosestPeersNotFound(peer_id.into())
1022								));
1023
1024								if let Some(ref metrics) = self.metrics {
1025									metrics
1026										.kademlia_query_duration
1027										.with_label_values(&["node-find-failed"])
1028										.observe(started.elapsed().as_secs_f64());
1029								}
1030							},
1031							Some(KadQuery::GetValue(key, started)) => {
1032								log::debug!(
1033									target: LOG_TARGET,
1034									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1035								);
1036
1037								self.event_streams.send(Event::Dht(
1038									DhtEvent::ValueNotFound(key)
1039								));
1040
1041								if let Some(ref metrics) = self.metrics {
1042									metrics
1043										.kademlia_query_duration
1044										.with_label_values(&["value-get-failed"])
1045										.observe(started.elapsed().as_secs_f64());
1046								}
1047							},
1048							Some(KadQuery::PutValue(key, started)) => {
1049								log::debug!(
1050									target: LOG_TARGET,
1051									"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1052								);
1053
1054								self.event_streams.send(Event::Dht(
1055									DhtEvent::ValuePutFailed(key)
1056								));
1057
1058								if let Some(ref metrics) = self.metrics {
1059									metrics
1060										.kademlia_query_duration
1061										.with_label_values(&["value-put-failed"])
1062										.observe(started.elapsed().as_secs_f64());
1063								}
1064							},
1065							Some(KadQuery::GetProviders(key, started)) => {
1066								log::debug!(
1067									target: LOG_TARGET,
1068									"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1069								);
1070
1071								self.event_streams.send(Event::Dht(
1072									DhtEvent::ProvidersNotFound(key)
1073								));
1074
1075								if let Some(ref metrics) = self.metrics {
1076									metrics
1077										.kademlia_query_duration
1078										.with_label_values(&["providers-get-failed"])
1079										.observe(started.elapsed().as_secs_f64());
1080								}
1081							},
1082							Some(KadQuery::AddProvider(key, started)) => {
1083								log::debug!(
1084									target: LOG_TARGET,
1085									"`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1086								);
1087
1088								self.event_streams.send(Event::Dht(
1089									DhtEvent::StartProvidingFailed(key)
1090								));
1091
1092								if let Some(ref metrics) = self.metrics {
1093									metrics
1094										.kademlia_query_duration
1095										.with_label_values(&["provider-add-failed"])
1096										.observe(started.elapsed().as_secs_f64());
1097								}
1098							},
1099							None => {
1100								log::debug!(
1101									target: LOG_TARGET,
1102									"non-existent query (likely republishing a provider) failed ({query_id:?})",
1103								);
1104							}
1105						}
1106					}
1107					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1108						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1109					}
1110					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1111						match self.litep2p.public_addresses().add_address(address.clone().into()) {
1112							Ok(inserted) => if inserted {
1113								log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1114							},
1115							Err(err) => {
1116								log::warn!(
1117									target: LOG_TARGET,
1118									"🔍 Failed to add discovered external address {address:?}: {err:?}",
1119								);
1120							},
1121						}
1122					}
1123					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1124						let local_peer_id = self.litep2p.local_peer_id();
1125
1126						// Litep2p requires the peer ID to be present in the address.
1127						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1128							address.with(Protocol::P2p(*local_peer_id.as_ref()))
1129						} else {
1130							address
1131						};
1132
1133						if self.litep2p.public_addresses().remove_address(&address) {
1134							log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1135						} else {
1136							log::warn!(
1137								target: LOG_TARGET,
1138								"🔍 Failed to remove expired external address {address:?}"
1139							);
1140						}
1141					}
1142					Some(DiscoveryEvent::Ping { peer, rtt }) => {
1143						log::trace!(
1144							target: LOG_TARGET,
1145							"ping time with {peer:?}: {rtt:?}",
1146						);
1147					}
1148					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1149						self.event_streams.send(Event::Dht(
1150							DhtEvent::PutRecordRequest(
1151								key.into(),
1152								value,
1153								publisher.map(Into::into),
1154								expires,
1155							)
1156						));
1157					},
1158
1159					Some(DiscoveryEvent::RandomKademliaStarted) => {
1160						if let Some(metrics) = self.metrics.as_ref() {
1161							metrics.kademlia_random_queries_total.inc();
1162						}
1163					}
1164				},
1165				event = self.litep2p.next_event() => match event {
1166					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1167						let Some(metrics) = &self.metrics else {
1168							continue;
1169						};
1170
1171						let direction = match endpoint {
1172							Endpoint::Dialer { .. } => "out",
1173							Endpoint::Listener { .. } => {
1174								// Increment incoming connections counter.
1175								//
1176								// Note: For litep2p these are represented by established negotiated connections,
1177								// while for libp2p (legacy) these represent not-yet-negotiated connections.
1178								metrics.incoming_connections_total.inc();
1179
1180								"in"
1181							},
1182						};
1183						metrics.connections_opened_total.with_label_values(&[direction]).inc();
1184
1185						match self.peers.entry(peer) {
1186							Entry::Vacant(entry) => {
1187								entry.insert(ConnectionContext {
1188									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1189									num_connections: 1usize,
1190								});
1191								metrics.distinct_peers_connections_opened_total.inc();
1192							}
1193							Entry::Occupied(entry) => {
1194								let entry = entry.into_mut();
1195								entry.num_connections += 1;
1196								entry.endpoints.insert(endpoint.connection_id(), endpoint);
1197							}
1198						}
1199					}
1200					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1201						let Some(metrics) = &self.metrics else {
1202							continue;
1203						};
1204
1205						let Some(context) = self.peers.get_mut(&peer) else {
1206							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1207							continue
1208						};
1209
1210						let direction = match context.endpoints.remove(&connection_id) {
1211							None => {
1212								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1213								continue
1214							}
1215							Some(endpoint) => {
1216								context.num_connections -= 1;
1217
1218								match endpoint {
1219									Endpoint::Dialer { .. } => "out",
1220									Endpoint::Listener { .. } => "in",
1221								}
1222							}
1223						};
1224
1225						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1226
1227						if context.num_connections == 0 {
1228							self.peers.remove(&peer);
1229							metrics.distinct_peers_connections_closed_total.inc();
1230						}
1231					}
1232					Some(Litep2pEvent::DialFailure { address, error }) => {
1233						log::debug!(
1234							target: LOG_TARGET,
1235							"failed to dial peer at {address:?}: {error:?}",
1236						);
1237
1238						if let Some(metrics) = &self.metrics {
1239							let reason = match error {
1240								DialError::Timeout => "timeout",
1241								DialError::AddressError(_) => "invalid-address",
1242								DialError::DnsError(_) => "cannot-resolve-dns",
1243								DialError::NegotiationError(error) => match error {
1244									NegotiationError::Timeout => "timeout",
1245									NegotiationError::PeerIdMissing => "missing-peer-id",
1246									NegotiationError::StateMismatch => "state-mismatch",
1247									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1248									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1249									NegotiationError::SnowError(_) => "noise-error",
1250									NegotiationError::ParseError(_) => "parse-error",
1251									NegotiationError::IoError(_) => "io-error",
1252									NegotiationError::WebSocket(_) => "webscoket-error",
1253									NegotiationError::BadSignature => "bad-signature",
1254								}
1255							};
1256
1257							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1258						}
1259					}
1260					Some(Litep2pEvent::ListDialFailures { errors }) => {
1261						log::debug!(
1262							target: LOG_TARGET,
1263							"failed to dial peer on multiple addresses {errors:?}",
1264						);
1265
1266						if let Some(metrics) = &self.metrics {
1267							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1268						}
1269					}
1270					None => {
1271						log::error!(
1272								target: LOG_TARGET,
1273								"Litep2p backend terminated"
1274						);
1275						return
1276					}
1277				},
1278			}
1279		}
1280	}
1281}