Skip to main content

soil_network/litep2p/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! `NetworkBackend` implementation for `litep2p`.
8
9use crate::{
10	config::{
11		FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
12		SetConfig, TransportConfig,
13	},
14	error::Error,
15	event::{DhtEvent, Event},
16	litep2p::{
17		discovery::{Discovery, DiscoveryEvent},
18		peerstore::Peerstore,
19		service::{Litep2pNetworkService, NetworkServiceCommand},
20		shim::{
21			bitswap::BitswapServer,
22			notification::{
23				config::{NotificationProtocolConfig, ProtocolControlHandle},
24				peerset::PeersetCommand,
25			},
26			request_response::{RequestResponseConfig, RequestResponseProtocol},
27		},
28	},
29	peer_store::PeerStoreProvider,
30	service::{
31		metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
32		out_events,
33		traits::{BandwidthSink, NetworkBackend, NetworkService},
34	},
35	NetworkStatus, NotificationService, ProtocolName,
36};
37
38use crate::types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
39use codec::Encode;
40use futures::StreamExt;
41use litep2p::{
42	config::ConfigBuilder,
43	crypto::ed25519::Keypair,
44	error::{DialError, NegotiationError},
45	executor::Executor,
46	protocol::{
47		libp2p::{
48			bitswap::Config as BitswapConfig,
49			kademlia::{QueryId, Record},
50		},
51		request_response::ConfigBuilder as RequestResponseConfigBuilder,
52	},
53	transport::{
54		tcp::config::Config as TcpTransportConfig,
55		websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
56	},
57	types::{
58		multiaddr::{Multiaddr, Protocol},
59		ConnectionId,
60	},
61	Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
62};
63use soil_prometheus::Registry;
64
65use crate::common::{role::Roles, ExHashT};
66use crate::types::PeerId;
67use soil_client::client_api::BlockBackend;
68use soil_client::utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
69use subsoil::runtime::traits::Block as BlockT;
70
71use std::{
72	cmp,
73	collections::{hash_map::Entry, HashMap, HashSet},
74	fs,
75	future::Future,
76	iter,
77	pin::Pin,
78	sync::{
79		atomic::{AtomicUsize, Ordering},
80		Arc,
81	},
82	time::{Duration, Instant},
83};
84
85mod discovery;
86mod peerstore;
87mod service;
88mod shim;
89
/// Litep2p bandwidth sink.
///
/// Adapter exposing `litep2p`'s bandwidth counters through the [`BandwidthSink`]
/// trait consumed by the generic network service metrics.
struct Litep2pBandwidthSink {
	// Counter handle obtained from `litep2p.bandwidth_sink()`.
	sink: litep2p::BandwidthSink,
}
94
95impl BandwidthSink for Litep2pBandwidthSink {
96	fn total_inbound(&self) -> u64 {
97		self.sink.inbound() as u64
98	}
99
100	fn total_outbound(&self) -> u64 {
101		self.sink.outbound() as u64
102	}
103}
104
/// Litep2p task executor.
///
/// Wraps the spawner closure supplied through `Params::executor` so it can be handed
/// to `litep2p` as a [`Executor`] trait object.
struct Litep2pExecutor {
	/// Executor closure: spawns the given boxed future onto the host runtime.
	executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
110
111impl Executor for Litep2pExecutor {
112	fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
113		(self.executor)(future)
114	}
115
116	fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
117		(self.executor)(future)
118	}
119}
120
/// Logging target for the file.
///
/// NOTE(review): intentionally left as "sub-libp2p" — presumably so log filtering works
/// uniformly across the libp2p and litep2p backends; confirm before renaming.
const LOG_TARGET: &str = "sub-libp2p";
123
/// Per-peer connection bookkeeping.
struct ConnectionContext {
	/// Peer endpoints, keyed by the connection on which they were observed.
	endpoints: HashMap<ConnectionId, Endpoint>,

	/// Number of active connections to the peer.
	num_connections: usize,
}
132
/// Kademlia query we are tracking.
///
/// Each variant carries the `Instant` the query was started, which is used to
/// observe the query duration in the Prometheus metrics once it completes.
#[derive(Debug)]
enum KadQuery {
	/// `FIND_NODE` query for target and when it was initiated.
	FindNode(PeerId, Instant),
	/// `GET_VALUE` query for key and when it was initiated.
	GetValue(RecordKey, Instant),
	/// `PUT_VALUE` query for key and when it was initiated.
	PutValue(RecordKey, Instant),
	/// `GET_PROVIDERS` query for key and when it was initiated.
	GetProviders(RecordKey, Instant),
	/// `ADD_PROVIDER` query for key and when it was initiated.
	AddProvider(RecordKey, Instant),
}
147
/// Networking backend for `litep2p`.
pub struct Litep2pNetworkBackend {
	/// Main `litep2p` object.
	litep2p: Litep2p,

	/// `NetworkService` implementation for `Litep2pNetworkBackend`.
	network_service: Arc<dyn NetworkService>,

	/// RX channel for receiving commands from `Litep2pNetworkService`.
	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

	/// `Peerset` handles to notification protocols, keyed by protocol name.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Pending Kademlia queries, keyed by the query ID assigned by discovery.
	pending_queries: HashMap<QueryId, KadQuery>,

	/// Discovery subsystem (drives Kademlia and, if enabled, mDNS).
	discovery: Discovery,

	/// Number of connected peers, shared with the registered metric sources.
	num_connected: Arc<AtomicUsize>,

	/// Connected peers and their connection bookkeeping.
	peers: HashMap<litep2p::PeerId, ConnectionContext>,

	/// Peerstore.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Block announce protocol name, used to look up the syncing protocol's
	/// peerset handle when computing the connected-peer count.
	block_announce_protocol: ProtocolName,

	/// Channels used to send events (e.g. DHT events) to subscribers.
	event_streams: out_events::OutChannels,

	/// Prometheus metrics.
	metrics: Option<Metrics>,
}
186
187impl Litep2pNetworkBackend {
188	/// From an iterator of multiaddress(es), parse and group all addresses of peers
189	/// so that litep2p can consume the information easily.
190	fn parse_addresses(
191		addresses: impl Iterator<Item = Multiaddr>,
192	) -> HashMap<PeerId, Vec<Multiaddr>> {
193		addresses
194			.into_iter()
195			.filter_map(|address| match address.iter().next() {
196				Some(
197					Protocol::Dns(_)
198					| Protocol::Dns4(_)
199					| Protocol::Dns6(_)
200					| Protocol::Ip6(_)
201					| Protocol::Ip4(_),
202				) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
203				{
204					Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
205						.map_or(None, |peer| Some((peer, Some(address)))),
206					_ => None,
207				},
208				Some(Protocol::P2p(multihash)) => {
209					PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None)))
210				},
211				_ => None,
212			})
213			.fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
214				let entry = acc.entry(peer).or_default();
215				maybe_address.map(|address| entry.push(address));
216
217				acc
218			})
219	}
220
221	/// Add new known addresses to `litep2p` and return the parsed peer IDs.
222	fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
223		Self::parse_addresses(peers.into_iter())
224			.into_iter()
225			.filter_map(|(peer, addresses)| {
226				// `peers` contained multiaddress in the form `/p2p/<peer ID>`
227				if addresses.is_empty() {
228					return Some(peer);
229				}
230
231				if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
232					log::warn!(
233						target: LOG_TARGET,
234						"couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
235					);
236					return None;
237				}
238
239				self.peerstore_handle.add_known_peer(peer);
240				Some(peer)
241			})
242			.collect()
243	}
244}
245
246impl Litep2pNetworkBackend {
247	/// Get `litep2p` keypair from `NodeKeyConfig`.
248	fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
249		let secret: litep2p::crypto::ed25519::SecretKey =
250			node_key.clone().into_keypair()?.secret().into();
251
252		let local_identity = Keypair::from(secret);
253		let local_public = local_identity.public();
254		let local_peer_id = local_public.to_peer_id();
255
256		Ok((local_identity, local_peer_id))
257	}
258
259	/// Configure transport protocols for `Litep2pNetworkBackend`.
260	fn configure_transport<B: BlockT + 'static, H: ExHashT>(
261		config: &FullNetworkConfiguration<B, H, Self>,
262	) -> ConfigBuilder {
263		let _ = match config.network_config.transport {
264			TransportConfig::MemoryOnly => panic!("memory transport not supported"),
265			TransportConfig::Normal { .. } => false,
266		};
267		let config_builder = ConfigBuilder::new();
268
269		let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
270			.network_config
271			.listen_addresses
272			.iter()
273			.filter_map(|address| {
274				use crate::types::multiaddr::Protocol;
275
276				let mut iter = address.iter();
277
278				match iter.next() {
279					Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
280					protocol => {
281						log::error!(
282							target: LOG_TARGET,
283							"unknown protocol {protocol:?}, ignoring {address:?}",
284						);
285
286						return None;
287					},
288				}
289
290				match iter.next() {
291					Some(Protocol::Tcp(_)) => match iter.next() {
292						Some(Protocol::Ws(_) | Protocol::Wss(_)) => {
293							Some((None, Some(address.clone())))
294						},
295						Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
296						protocol => {
297							log::error!(
298								target: LOG_TARGET,
299								"unknown protocol {protocol:?}, ignoring {address:?}",
300							);
301							None
302						},
303					},
304					protocol => {
305						log::error!(
306							target: LOG_TARGET,
307							"unknown protocol {protocol:?}, ignoring {address:?}",
308						);
309						None
310					},
311				}
312			})
313			.unzip();
314
315		config_builder
316			.with_websocket(WebSocketTransportConfig {
317				listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
318				yamux_config: litep2p::yamux::Config::default(),
319				nodelay: true,
320				..Default::default()
321			})
322			.with_tcp(TcpTransportConfig {
323				listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
324				yamux_config: litep2p::yamux::Config::default(),
325				nodelay: true,
326				..Default::default()
327			})
328	}
329}
330
331#[async_trait::async_trait]
332impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
333	type NotificationProtocolConfig = NotificationProtocolConfig;
334	type RequestResponseProtocolConfig = RequestResponseConfig;
335	type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
336	type PeerStore = Peerstore;
337	type BitswapConfig = BitswapConfig;
338
339	fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
340	where
341		Self: Sized,
342	{
343		let (keypair, local_peer_id) =
344			Self::get_keypair(&params.network_config.network_config.node_key)?;
345		let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
346
347		params.network_config.network_config.boot_nodes = params
348			.network_config
349			.network_config
350			.boot_nodes
351			.into_iter()
352			.filter(|boot_node| boot_node.peer_id != PeerId::from(&local_peer_id))
353			.collect();
354		params.network_config.network_config.default_peers_set.reserved_nodes = params
355			.network_config
356			.network_config
357			.default_peers_set
358			.reserved_nodes
359			.into_iter()
360			.filter(|reserved_node| {
361				if reserved_node.peer_id == PeerId::from(&local_peer_id) {
362					log::warn!(
363						target: LOG_TARGET,
364						"Local peer ID used in reserved node, ignoring: {reserved_node}",
365					);
366					false
367				} else {
368					true
369				}
370			})
371			.collect();
372
373		if let Some(path) = &params.network_config.network_config.net_config_path {
374			fs::create_dir_all(path)?;
375		}
376
377		log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
378		log::info!(target: LOG_TARGET, "Running litep2p network backend");
379
380		params.network_config.sanity_check_addresses()?;
381		params.network_config.sanity_check_bootnodes()?;
382
383		let mut config_builder =
384			Self::configure_transport(&params.network_config).with_keypair(keypair.clone());
385		let known_addresses = params.network_config.known_addresses();
386		let peer_store_handle = params.network_config.peer_store_handle();
387		let executor = Arc::new(Litep2pExecutor { executor: params.executor });
388
389		let FullNetworkConfiguration {
390			notification_protocols,
391			request_response_protocols,
392			network_config,
393			..
394		} = params.network_config;
395
396		// initialize notification protocols
397		//
398		// pass the protocol configuration to `Litep2pConfigBuilder` and save the TX channel
399		// to the protocol's `Peerset` together with the protocol name to allow other subsystems
400		// of Polkadot SDK to control connectivity of the notification protocol
401		let block_announce_protocol = params.block_announce_config.protocol_name().clone();
402		let mut notif_protocols = HashMap::from_iter([(
403			params.block_announce_config.protocol_name().clone(),
404			params.block_announce_config.handle,
405		)]);
406
407		// handshake for all but the syncing protocol is set to node role
408		config_builder = notification_protocols
409			.into_iter()
410			.fold(config_builder, |config_builder, mut config| {
411				config.config.set_handshake(Roles::from(&params.role).encode());
412				notif_protocols.insert(config.protocol_name, config.handle);
413
414				config_builder.with_notification_protocol(config.config)
415			})
416			.with_notification_protocol(params.block_announce_config.config);
417
418		// initialize request-response protocols
419		let metrics = match &params.metrics_registry {
420			Some(registry) => Some(register_without_sources(registry)?),
421			None => None,
422		};
423
424		// create channels that are used to send request before initializing protocols so the
425		// senders can be passed onto all request-response protocols
426		//
427		// all protocols must have each others' senders so they can send the fallback request in
428		// case the main protocol is not supported by the remote peer and user specified a fallback
429		let (mut request_response_receivers, request_response_senders): (
430			HashMap<_, _>,
431			HashMap<_, _>,
432		) = request_response_protocols
433			.iter()
434			.map(|config| {
435				let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
436				((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
437			})
438			.unzip();
439
440		config_builder = request_response_protocols.into_iter().fold(
441			config_builder,
442			|config_builder, config| {
443				let (protocol_config, handle) = RequestResponseConfigBuilder::new(
444					Litep2pProtocolName::from(config.protocol_name.clone()),
445				)
446				.with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
447				.with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
448				.with_timeout(config.request_timeout)
449				.build();
450
451				let protocol = RequestResponseProtocol::new(
452					config.protocol_name.clone(),
453					handle,
454					Arc::clone(&peer_store_handle),
455					config.inbound_queue,
456					request_response_receivers
457						.remove(&config.protocol_name)
458						.expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
459					request_response_senders.clone(),
460					metrics.clone(),
461				);
462
463				executor.run(Box::pin(async move {
464					protocol.run().await;
465				}));
466
467				config_builder.with_request_response_protocol(protocol_config)
468			},
469		);
470
471		// collect known addresses
472		let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
473			known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
474				use crate::types::multiaddr::Protocol;
475
476				let address = match address.iter().last() {
477					Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) => {
478						address.with(Protocol::P2p(peer.into()))
479					},
480					Some(Protocol::P2p(_)) => address,
481					_ => return acc,
482				};
483
484				acc.entry(peer.into()).or_default().push(address.into());
485				peer_store_handle.add_known_peer(peer);
486
487				acc
488			});
489
490		// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
491		let listen_addresses = Arc::new(Default::default());
492		let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
493			Discovery::new(
494				local_peer_id,
495				&network_config,
496				params.genesis_hash,
497				params.fork_id.as_deref(),
498				&params.protocol_id,
499				known_addresses.clone(),
500				Arc::clone(&listen_addresses),
501				Arc::clone(&peer_store_handle),
502			);
503
504		config_builder = config_builder
505			.with_known_addresses(known_addresses.clone().into_iter())
506			.with_libp2p_ping(ping_config)
507			.with_libp2p_identify(identify_config)
508			.with_libp2p_kademlia(kademlia_config)
509			.with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
510				Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
511			))
512			.with_keep_alive_timeout(network_config.idle_connection_timeout)
513			// Use system DNS resolver to enable intranet domain resolution and administrator
514			// control over DNS lookup.
515			.with_system_resolver()
516			.with_executor(executor);
517
518		if let Some(config) = maybe_mdns_config {
519			config_builder = config_builder.with_mdns(config);
520		}
521
522		if let Some(config) = params.bitswap_config {
523			config_builder = config_builder.with_libp2p_bitswap(config);
524		}
525
526		let litep2p =
527			Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
528
529		litep2p.listen_addresses().for_each(|address| {
530			log::debug!(target: LOG_TARGET, "listening on: {address}");
531
532			listen_addresses.write().insert(address.clone());
533		});
534
535		let public_addresses = litep2p.public_addresses();
536		for address in network_config.public_addresses.iter() {
537			if let Err(err) = public_addresses.add_address(address.clone().into()) {
538				log::warn!(
539					target: LOG_TARGET,
540					"failed to add public address {address:?}: {err:?}",
541				);
542			}
543		}
544
545		let network_service = Arc::new(Litep2pNetworkService::new(
546			local_peer_id,
547			keypair.clone(),
548			cmd_tx,
549			Arc::clone(&peer_store_handle),
550			notif_protocols.clone(),
551			block_announce_protocol.clone(),
552			request_response_senders,
553			Arc::clone(&listen_addresses),
554			public_addresses,
555		));
556
557		// register rest of the metrics now that `Litep2p` has been created
558		let num_connected = Arc::new(Default::default());
559		let bandwidth: Arc<dyn BandwidthSink> =
560			Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
561
562		if let Some(registry) = &params.metrics_registry {
563			MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
564		}
565
566		Ok(Self {
567			network_service,
568			cmd_rx,
569			metrics,
570			peerset_handles: notif_protocols,
571			num_connected,
572			discovery,
573			pending_queries: HashMap::new(),
574			peerstore_handle: peer_store_handle,
575			block_announce_protocol,
576			event_streams: out_events::OutChannels::new(None)?,
577			peers: HashMap::new(),
578			litep2p,
579		})
580	}
581
582	fn network_service(&self) -> Arc<dyn NetworkService> {
583		Arc::clone(&self.network_service)
584	}
585
586	fn peer_store(
587		bootnodes: Vec<crate::types::PeerId>,
588		metrics_registry: Option<Registry>,
589	) -> Self::PeerStore {
590		Peerstore::new(bootnodes, metrics_registry)
591	}
592
	/// Register notification protocol metrics against `registry`, if one was provided.
	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
		NotificationMetrics::new(registry)
	}
596
	/// Create Bitswap server.
	///
	/// Returns the future driving the server together with the protocol configuration
	/// to be installed on the `litep2p` instance.
	fn bitswap_server(
		client: Arc<dyn BlockBackend<B> + Send + Sync>,
	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
		BitswapServer::new(client)
	}
603
604	/// Create notification protocol configuration for `protocol`.
605	fn notification_config(
606		protocol_name: ProtocolName,
607		fallback_names: Vec<ProtocolName>,
608		max_notification_size: u64,
609		handshake: Option<NotificationHandshake>,
610		set_config: SetConfig,
611		metrics: NotificationMetrics,
612		peerstore_handle: Arc<dyn PeerStoreProvider>,
613	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
614		Self::NotificationProtocolConfig::new(
615			protocol_name,
616			fallback_names,
617			max_notification_size as usize,
618			handshake,
619			set_config,
620			metrics,
621			peerstore_handle,
622		)
623	}
624
625	/// Create request-response protocol configuration.
626	fn request_response_config(
627		protocol_name: ProtocolName,
628		fallback_names: Vec<ProtocolName>,
629		max_request_size: u64,
630		max_response_size: u64,
631		request_timeout: Duration,
632		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
633	) -> Self::RequestResponseProtocolConfig {
634		Self::RequestResponseProtocolConfig::new(
635			protocol_name,
636			fallback_names,
637			max_request_size,
638			max_response_size,
639			request_timeout,
640			inbound_queue,
641		)
642	}
643
644	/// Start [`Litep2pNetworkBackend`] event loop.
645	async fn run(mut self) {
646		log::debug!(target: LOG_TARGET, "starting litep2p network backend");
647
648		loop {
649			let num_connected_peers = self
650				.peerset_handles
651				.get(&self.block_announce_protocol)
652				.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
653			self.num_connected.store(num_connected_peers, Ordering::Relaxed);
654
655			tokio::select! {
656				command = self.cmd_rx.next() => match command {
657					None => return,
658					Some(command) => match command {
659						NetworkServiceCommand::FindClosestPeers { target } => {
660							let query_id = self.discovery.find_node(target.into()).await;
661							self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
662						}
663						NetworkServiceCommand::GetValue{ key } => {
664							let query_id = self.discovery.get_value(key.clone()).await;
665							self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
666						}
667						NetworkServiceCommand::PutValue { key, value } => {
668							let query_id = self.discovery.put_value(key.clone(), value).await;
669							self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
670						}
671						NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
672							let kademlia_key = record.key.clone();
673							let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
674							self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
675						}
676						NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
677							self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
678						}
679						NetworkServiceCommand::StartProviding { key } => {
680							let query_id = self.discovery.start_providing(key.clone()).await;
681							self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
682						}
683						NetworkServiceCommand::StopProviding { key } => {
684							self.discovery.stop_providing(key).await;
685						}
686						NetworkServiceCommand::GetProviders { key } => {
687							let query_id = self.discovery.get_providers(key.clone()).await;
688							self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
689						}
690						NetworkServiceCommand::EventStream { tx } => {
691							self.event_streams.push(tx);
692						}
693						NetworkServiceCommand::Status { tx } => {
694							let _ = tx.send(NetworkStatus {
695								num_connected_peers: self
696									.peerset_handles
697									.get(&self.block_announce_protocol)
698									.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
699								total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
700								total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
701							});
702						}
703						NetworkServiceCommand::AddPeersToReservedSet {
704							protocol,
705							peers,
706						} => {
707							let peers = self.add_addresses(peers.into_iter().map(Into::into));
708
709							match self.peerset_handles.get(&protocol) {
710								Some(handle) => {
711									let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
712								}
713								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
714							};
715						}
716						NetworkServiceCommand::AddKnownAddress { peer, address } => {
717							let mut address: Multiaddr = address.into();
718
719							if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
720								address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
721							}
722
723							if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
724								// libp2p backend generates `DiscoveryOut::Discovered(peer_id)`
725								// event when a new address is added for a peer, which leads to the
726								// peer being added to peerstore. Do the same directly here.
727								self.peerstore_handle.add_known_peer(peer);
728							} else {
729								log::debug!(
730									target: LOG_TARGET,
731									"couldn't add known address ({address}) for {peer:?}, unsupported transport"
732								);
733							}
734						},
735						NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
736							let peers = self.add_addresses(peers.into_iter().map(Into::into));
737
738							match self.peerset_handles.get(&protocol) {
739								Some(handle) => {
740									let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
741								}
742								None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
743							}
744
745						},
746						NetworkServiceCommand::DisconnectPeer {
747							protocol,
748							peer,
749						} => {
750							let Some(handle) = self.peerset_handles.get(&protocol) else {
751								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
752								continue
753							};
754
755							let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
756						}
757						NetworkServiceCommand::SetReservedOnly {
758							protocol,
759							reserved_only,
760						} => {
761							let Some(handle) = self.peerset_handles.get(&protocol) else {
762								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
763								continue
764							};
765
766							let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
767						}
768						NetworkServiceCommand::RemoveReservedPeers {
769							protocol,
770							peers,
771						} => {
772							let Some(handle) = self.peerset_handles.get(&protocol) else {
773								log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
774								continue
775							};
776
777							let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
778						}
779					}
780				},
781				event = self.discovery.next() => match event {
782					None => return,
783					Some(DiscoveryEvent::Discovered { addresses }) => {
784						// if at least one address was added for the peer, report the peer to `Peerstore`
785						for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
786							if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
787								self.peerstore_handle.add_known_peer(peer);
788							}
789						}
790					}
791					Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
792						for peer in peers {
793							self.peerstore_handle.add_known_peer(peer.into());
794						}
795					}
796					Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
797						match self.pending_queries.remove(&query_id) {
798							Some(KadQuery::FindNode(_, started)) => {
799								log::trace!(
800									target: LOG_TARGET,
801									"`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
802								);
803
804								self.event_streams.send(
805									Event::Dht(
806										DhtEvent::ClosestPeersFound(
807											target.into(),
808											peers
809												.into_iter()
810												.map(|(peer, addrs)| (
811													peer.into(),
812													addrs.into_iter().map(Into::into).collect(),
813												))
814												.collect(),
815										)
816									)
817								);
818
819								if let Some(ref metrics) = self.metrics {
820									metrics
821										.kademlia_query_duration
822										.with_label_values(&["node-find"])
823										.observe(started.elapsed().as_secs_f64());
824								}
825							},
826							query => {
827								log::error!(
828									target: LOG_TARGET,
829									"Missing/invalid pending query for `FIND_NODE`: {query:?}"
830								);
831								debug_assert!(false);
832							}
833						}
834					},
835					Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
836						if !self.pending_queries.contains_key(&query_id) {
837							log::error!(
838								target: LOG_TARGET,
839								"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
840							);
841
842							continue
843						}
844
845						let peer_id: crate::types::PeerId = record.peer.into();
846						let record = PeerRecord {
847							record: P2PRecord {
848								key: record.record.key.to_vec().into(),
849								value: record.record.value,
850								publisher: record.record.publisher.map(|peer_id| {
851									let peer_id: crate::types::PeerId = peer_id.into();
852									peer_id.into()
853								}),
854								expires: record.record.expires,
855							},
856							peer: Some(peer_id.into()),
857						};
858
859						self.event_streams.send(
860							Event::Dht(
861								DhtEvent::ValueFound(
862									record.into()
863								)
864							)
865						);
866					}
867					Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
868						match self.pending_queries.remove(&query_id) {
869							Some(KadQuery::GetValue(key, started)) => {
870								log::trace!(
871									target: LOG_TARGET,
872									"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
873								);
874
875								if let Some(ref metrics) = self.metrics {
876									metrics
877										.kademlia_query_duration
878										.with_label_values(&["value-get"])
879										.observe(started.elapsed().as_secs_f64());
880								}
881							},
882							query => {
883								log::error!(
884									target: LOG_TARGET,
885									"Missing/invalid pending query for `GET_VALUE`: {query:?}"
886								);
887								debug_assert!(false);
888							},
889						}
890					}
891					Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
892						match self.pending_queries.remove(&query_id) {
893							Some(KadQuery::PutValue(key, started)) => {
894								log::trace!(
895									target: LOG_TARGET,
896									"`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
897								);
898
899								self.event_streams.send(Event::Dht(
900									DhtEvent::ValuePut(key)
901								));
902
903								if let Some(ref metrics) = self.metrics {
904									metrics
905										.kademlia_query_duration
906										.with_label_values(&["value-put"])
907										.observe(started.elapsed().as_secs_f64());
908								}
909							},
910							query => {
911								log::error!(
912									target: LOG_TARGET,
913									"Missing/invalid pending query for `PUT_VALUE`: {query:?}"
914								);
915								debug_assert!(false);
916							}
917						}
918					}
919					Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
920						match self.pending_queries.remove(&query_id) {
921							Some(KadQuery::GetProviders(key, started)) => {
922								log::trace!(
923									target: LOG_TARGET,
924									"`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
925								);
926
								// We likely requested providers in order to connect to them,
								// so add their addresses to litep2p's transport manager.
								// Consider also looking up the providers' addresses with a
								// `FIND_NODE` query, as it can yield more up-to-date addresses.
931								providers.iter().for_each(|p| {
932									self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
933								});
934
935								self.event_streams.send(Event::Dht(
936									DhtEvent::ProvidersFound(
937										key.clone().into(),
938										providers.into_iter().map(|p| p.peer.into()).collect()
939									)
940								));
941
942								// litep2p returns all providers in a single event, so we let
943								// subscribers know no more providers will be yielded.
944								self.event_streams.send(Event::Dht(
945									DhtEvent::NoMoreProviders(key.into())
946								));
947
948								if let Some(ref metrics) = self.metrics {
949									metrics
950										.kademlia_query_duration
951										.with_label_values(&["providers-get"])
952										.observe(started.elapsed().as_secs_f64());
953								}
954							},
955							query => {
956								log::error!(
957									target: LOG_TARGET,
958									"Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
959								);
960								debug_assert!(false);
961							}
962						}
963					}
964					Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
965						match self.pending_queries.remove(&query_id) {
966							Some(KadQuery::AddProvider(key, started)) => {
967								debug_assert_eq!(key, provided_key.into());
968
969								log::trace!(
970									target: LOG_TARGET,
971									"`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
972								);
973
974								self.event_streams.send(Event::Dht(
975									DhtEvent::StartedProviding(key.into())
976								));
977
978								if let Some(ref metrics) = self.metrics {
979									metrics
980										.kademlia_query_duration
981										.with_label_values(&["provider-add"])
982										.observe(started.elapsed().as_secs_f64());
983								}
984							}
985							Some(_) => {
986								log::error!(
987									target: LOG_TARGET,
988									"Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
989								);
990								debug_assert!(false);
991							}
992							None => {
993								log::trace!(
994									target: LOG_TARGET,
995									"`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
996								);
997							}
998						}
999					}
1000					Some(DiscoveryEvent::QueryFailed { query_id }) => {
1001						match self.pending_queries.remove(&query_id) {
1002							Some(KadQuery::FindNode(peer_id, started)) => {
1003								log::debug!(
1004									target: LOG_TARGET,
1005									"`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1006								);
1007
1008								self.event_streams.send(Event::Dht(
1009									DhtEvent::ClosestPeersNotFound(peer_id.into())
1010								));
1011
1012								if let Some(ref metrics) = self.metrics {
1013									metrics
1014										.kademlia_query_duration
1015										.with_label_values(&["node-find-failed"])
1016										.observe(started.elapsed().as_secs_f64());
1017								}
1018							},
1019							Some(KadQuery::GetValue(key, started)) => {
1020								log::debug!(
1021									target: LOG_TARGET,
1022									"`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1023								);
1024
1025								self.event_streams.send(Event::Dht(
1026									DhtEvent::ValueNotFound(key)
1027								));
1028
1029								if let Some(ref metrics) = self.metrics {
1030									metrics
1031										.kademlia_query_duration
1032										.with_label_values(&["value-get-failed"])
1033										.observe(started.elapsed().as_secs_f64());
1034								}
1035							},
1036							Some(KadQuery::PutValue(key, started)) => {
1037								log::debug!(
1038									target: LOG_TARGET,
1039									"`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1040								);
1041
1042								self.event_streams.send(Event::Dht(
1043									DhtEvent::ValuePutFailed(key)
1044								));
1045
1046								if let Some(ref metrics) = self.metrics {
1047									metrics
1048										.kademlia_query_duration
1049										.with_label_values(&["value-put-failed"])
1050										.observe(started.elapsed().as_secs_f64());
1051								}
1052							},
1053							Some(KadQuery::GetProviders(key, started)) => {
1054								log::debug!(
1055									target: LOG_TARGET,
1056									"`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1057								);
1058
1059								self.event_streams.send(Event::Dht(
1060									DhtEvent::ProvidersNotFound(key)
1061								));
1062
1063								if let Some(ref metrics) = self.metrics {
1064									metrics
1065										.kademlia_query_duration
1066										.with_label_values(&["providers-get-failed"])
1067										.observe(started.elapsed().as_secs_f64());
1068								}
1069							},
1070							Some(KadQuery::AddProvider(key, started)) => {
1071								log::debug!(
1072									target: LOG_TARGET,
1073									"`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1074								);
1075
1076								self.event_streams.send(Event::Dht(
1077									DhtEvent::StartProvidingFailed(key)
1078								));
1079
1080								if let Some(ref metrics) = self.metrics {
1081									metrics
1082										.kademlia_query_duration
1083										.with_label_values(&["provider-add-failed"])
1084										.observe(started.elapsed().as_secs_f64());
1085								}
1086							},
1087							None => {
1088								log::debug!(
1089									target: LOG_TARGET,
1090									"non-existent query (likely republishing a provider) failed ({query_id:?})",
1091								);
1092							}
1093						}
1094					}
1095					Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1096						self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1097					}
1098					Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1099						match self.litep2p.public_addresses().add_address(address.clone().into()) {
1100							Ok(inserted) => if inserted {
1101								log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1102							},
1103							Err(err) => {
1104								log::warn!(
1105									target: LOG_TARGET,
1106									"🔍 Failed to add discovered external address {address:?}: {err:?}",
1107								);
1108							},
1109						}
1110					}
1111					Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1112						let local_peer_id = self.litep2p.local_peer_id();
1113
1114						// Litep2p requires the peer ID to be present in the address.
1115						let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1116							address.with(Protocol::P2p(*local_peer_id.as_ref()))
1117						} else {
1118							address
1119						};
1120
1121						if self.litep2p.public_addresses().remove_address(&address) {
1122							log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1123						} else {
1124							log::warn!(
1125								target: LOG_TARGET,
1126								"🔍 Failed to remove expired external address {address:?}"
1127							);
1128						}
1129					}
1130					Some(DiscoveryEvent::Ping { peer, rtt }) => {
1131						log::trace!(
1132							target: LOG_TARGET,
1133							"ping time with {peer:?}: {rtt:?}",
1134						);
1135					}
1136					Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1137						self.event_streams.send(Event::Dht(
1138							DhtEvent::PutRecordRequest(
1139								key.into(),
1140								value,
1141								publisher.map(Into::into),
1142								expires,
1143							)
1144						));
1145					},
1146
1147					Some(DiscoveryEvent::RandomKademliaStarted) => {
1148						if let Some(metrics) = self.metrics.as_ref() {
1149							metrics.kademlia_random_queries_total.inc();
1150						}
1151					}
1152				},
1153				event = self.litep2p.next_event() => match event {
1154					Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1155						let Some(metrics) = &self.metrics else {
1156							continue;
1157						};
1158
1159						let direction = match endpoint {
1160							Endpoint::Dialer { .. } => "out",
1161							Endpoint::Listener { .. } => {
1162								// Increment incoming connections counter.
1163								//
1164								// Note: For litep2p these are represented by established negotiated connections,
1165								// while for libp2p (legacy) these represent not-yet-negotiated connections.
1166								metrics.incoming_connections_total.inc();
1167
1168								"in"
1169							},
1170						};
1171						metrics.connections_opened_total.with_label_values(&[direction]).inc();
1172
1173						match self.peers.entry(peer) {
1174							Entry::Vacant(entry) => {
1175								entry.insert(ConnectionContext {
1176									endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1177									num_connections: 1usize,
1178								});
1179								metrics.distinct_peers_connections_opened_total.inc();
1180							}
1181							Entry::Occupied(entry) => {
1182								let entry = entry.into_mut();
1183								entry.num_connections += 1;
1184								entry.endpoints.insert(endpoint.connection_id(), endpoint);
1185							}
1186						}
1187					}
1188					Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1189						let Some(metrics) = &self.metrics else {
1190							continue;
1191						};
1192
1193						let Some(context) = self.peers.get_mut(&peer) else {
1194							log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1195							continue
1196						};
1197
1198						let direction = match context.endpoints.remove(&connection_id) {
1199							None => {
1200								log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1201								continue
1202							}
1203							Some(endpoint) => {
1204								context.num_connections -= 1;
1205
1206								match endpoint {
1207									Endpoint::Dialer { .. } => "out",
1208									Endpoint::Listener { .. } => "in",
1209								}
1210							}
1211						};
1212
1213						metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1214
1215						if context.num_connections == 0 {
1216							self.peers.remove(&peer);
1217							metrics.distinct_peers_connections_closed_total.inc();
1218						}
1219					}
1220					Some(Litep2pEvent::DialFailure { address, error }) => {
1221						log::debug!(
1222							target: LOG_TARGET,
1223							"failed to dial peer at {address:?}: {error:?}",
1224						);
1225
1226						if let Some(metrics) = &self.metrics {
1227							let reason = match error {
1228								DialError::Timeout => "timeout",
1229								DialError::AddressError(_) => "invalid-address",
1230								DialError::DnsError(_) => "cannot-resolve-dns",
1231								DialError::NegotiationError(error) => match error {
1232									NegotiationError::Timeout => "timeout",
1233									NegotiationError::PeerIdMissing => "missing-peer-id",
1234									NegotiationError::StateMismatch => "state-mismatch",
1235									NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1236									NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1237									NegotiationError::SnowError(_) => "noise-error",
1238									NegotiationError::ParseError(_) => "parse-error",
1239									NegotiationError::IoError(_) => "io-error",
1240									NegotiationError::WebSocket(_) => "webscoket-error",
1241									NegotiationError::BadSignature => "bad-signature",
1242								}
1243							};
1244
1245							metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1246						}
1247					}
1248					Some(Litep2pEvent::ListDialFailures { errors }) => {
1249						log::debug!(
1250							target: LOG_TARGET,
1251							"failed to dial peer on multiple addresses {errors:?}",
1252						);
1253
1254						if let Some(metrics) = &self.metrics {
1255							metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1256						}
1257					}
1258					None => {
1259						log::error!(
1260								target: LOG_TARGET,
1261								"Litep2p backend terminated"
1262						);
1263						return
1264					}
1265				},
1266			}
1267		}
1268	}
1269}