1use crate::{
10 config::{
11 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
12 SetConfig, TransportConfig,
13 },
14 error::Error,
15 event::{DhtEvent, Event},
16 litep2p::{
17 discovery::{Discovery, DiscoveryEvent},
18 peerstore::Peerstore,
19 service::{Litep2pNetworkService, NetworkServiceCommand},
20 shim::{
21 bitswap::BitswapServer,
22 notification::{
23 config::{NotificationProtocolConfig, ProtocolControlHandle},
24 peerset::PeersetCommand,
25 },
26 request_response::{RequestResponseConfig, RequestResponseProtocol},
27 },
28 },
29 peer_store::PeerStoreProvider,
30 service::{
31 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
32 out_events,
33 traits::{BandwidthSink, NetworkBackend, NetworkService},
34 },
35 NetworkStatus, NotificationService, ProtocolName,
36};
37
38use crate::types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
39use codec::Encode;
40use futures::StreamExt;
41use litep2p::{
42 config::ConfigBuilder,
43 crypto::ed25519::Keypair,
44 error::{DialError, NegotiationError},
45 executor::Executor,
46 protocol::{
47 libp2p::{
48 bitswap::Config as BitswapConfig,
49 kademlia::{QueryId, Record},
50 },
51 request_response::ConfigBuilder as RequestResponseConfigBuilder,
52 },
53 transport::{
54 tcp::config::Config as TcpTransportConfig,
55 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
56 },
57 types::{
58 multiaddr::{Multiaddr, Protocol},
59 ConnectionId,
60 },
61 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
62};
63use soil_prometheus::Registry;
64
65use crate::common::{role::Roles, ExHashT};
66use crate::types::PeerId;
67use soil_client::client_api::BlockBackend;
68use soil_client::utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
69use subsoil::runtime::traits::Block as BlockT;
70
71use std::{
72 cmp,
73 collections::{hash_map::Entry, HashMap, HashSet},
74 fs,
75 future::Future,
76 iter,
77 pin::Pin,
78 sync::{
79 atomic::{AtomicUsize, Ordering},
80 Arc,
81 },
82 time::{Duration, Instant},
83};
84
85mod discovery;
86mod peerstore;
87mod service;
88mod shim;
89
/// Adapter exposing a [`litep2p::BandwidthSink`] through the crate's [`BandwidthSink`] trait.
struct Litep2pBandwidthSink {
    /// Underlying litep2p sink the inbound/outbound totals are read from.
    sink: litep2p::BandwidthSink,
}
94
95impl BandwidthSink for Litep2pBandwidthSink {
96 fn total_inbound(&self) -> u64 {
97 self.sink.inbound() as u64
98 }
99
100 fn total_outbound(&self) -> u64 {
101 self.sink.outbound() as u64
102 }
103}
104
/// Adapter that lets litep2p spawn its futures through a caller-provided spawn function.
struct Litep2pExecutor {
    /// Function every future handed to [`Executor::run`] / [`Executor::run_with_name`] is
    /// passed to.
    executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
110
111impl Executor for Litep2pExecutor {
112 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
113 (self.executor)(future)
114 }
115
116 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
117 (self.executor)(future)
118 }
119}
120
/// Logging target used by every log statement in this file.
const LOG_TARGET: &str = "sub-libp2p";
123
/// Bookkeeping for the open connections of a single peer.
struct ConnectionContext {
    /// Endpoint of each open connection, keyed by its connection ID.
    endpoints: HashMap<ConnectionId, Endpoint>,

    /// Number of connections currently open to the peer; the peer's entry is dropped once
    /// this reaches zero.
    num_connections: usize,
}
132
/// Kademlia query the backend is waiting a result for.
///
/// Every variant stores the query's subject and the [`Instant`] the query was started at; the
/// latter is used to report the query duration once the query finishes.
#[derive(Debug)]
enum KadQuery {
    /// `FIND_NODE` query for the given peer.
    FindNode(PeerId, Instant),
    /// `GET_VALUE` query for the given key.
    GetValue(RecordKey, Instant),
    /// `PUT_VALUE` query for the given key.
    PutValue(RecordKey, Instant),
    /// `GET_PROVIDERS` query for the given key.
    GetProviders(RecordKey, Instant),
    /// `ADD_PROVIDER` query for the given key.
    AddProvider(RecordKey, Instant),
}
147
/// Networking backend built on top of litep2p.
pub struct Litep2pNetworkBackend {
    /// Underlying `litep2p` instance.
    litep2p: Litep2p,

    /// Service handle handed out by `network_service()`.
    network_service: Arc<dyn NetworkService>,

    /// Receiver of the commands processed by `run()`.
    cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

    /// Control handles of the notification protocols, keyed by protocol name.
    peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

    /// Kademlia queries that have been started but have not finished yet.
    pending_queries: HashMap<QueryId, KadQuery>,

    /// Discovery handle; used to start DHT operations and polled for discovery events.
    discovery: Discovery,

    /// Number of peers connected over the block announce protocol, refreshed on every
    /// iteration of `run()` and shared with the metric sources.
    num_connected: Arc<AtomicUsize>,

    /// Open connections per peer. Only maintained while metrics are enabled.
    peers: HashMap<litep2p::PeerId, ConnectionContext>,

    /// Handle to the peer store.
    peerstore_handle: Arc<dyn PeerStoreProvider>,

    /// Name of the block announce protocol.
    block_announce_protocol: ProtocolName,

    /// Channels over which [`Event`]s are sent to subscribers.
    event_streams: out_events::OutChannels,

    /// Prometheus metrics, if a registry was provided.
    metrics: Option<Metrics>,
}
186
187impl Litep2pNetworkBackend {
188 fn parse_addresses(
191 addresses: impl Iterator<Item = Multiaddr>,
192 ) -> HashMap<PeerId, Vec<Multiaddr>> {
193 addresses
194 .into_iter()
195 .filter_map(|address| match address.iter().next() {
196 Some(
197 Protocol::Dns(_)
198 | Protocol::Dns4(_)
199 | Protocol::Dns6(_)
200 | Protocol::Ip6(_)
201 | Protocol::Ip4(_),
202 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
203 {
204 Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
205 .map_or(None, |peer| Some((peer, Some(address)))),
206 _ => None,
207 },
208 Some(Protocol::P2p(multihash)) => {
209 PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None)))
210 },
211 _ => None,
212 })
213 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
214 let entry = acc.entry(peer).or_default();
215 maybe_address.map(|address| entry.push(address));
216
217 acc
218 })
219 }
220
221 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
223 Self::parse_addresses(peers.into_iter())
224 .into_iter()
225 .filter_map(|(peer, addresses)| {
226 if addresses.is_empty() {
228 return Some(peer);
229 }
230
231 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
232 log::warn!(
233 target: LOG_TARGET,
234 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
235 );
236 return None;
237 }
238
239 self.peerstore_handle.add_known_peer(peer);
240 Some(peer)
241 })
242 .collect()
243 }
244}
245
246impl Litep2pNetworkBackend {
247 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
249 let secret: litep2p::crypto::ed25519::SecretKey =
250 node_key.clone().into_keypair()?.secret().into();
251
252 let local_identity = Keypair::from(secret);
253 let local_public = local_identity.public();
254 let local_peer_id = local_public.to_peer_id();
255
256 Ok((local_identity, local_peer_id))
257 }
258
259 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
261 config: &FullNetworkConfiguration<B, H, Self>,
262 ) -> ConfigBuilder {
263 let _ = match config.network_config.transport {
264 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
265 TransportConfig::Normal { .. } => false,
266 };
267 let config_builder = ConfigBuilder::new();
268
269 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
270 .network_config
271 .listen_addresses
272 .iter()
273 .filter_map(|address| {
274 use crate::types::multiaddr::Protocol;
275
276 let mut iter = address.iter();
277
278 match iter.next() {
279 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
280 protocol => {
281 log::error!(
282 target: LOG_TARGET,
283 "unknown protocol {protocol:?}, ignoring {address:?}",
284 );
285
286 return None;
287 },
288 }
289
290 match iter.next() {
291 Some(Protocol::Tcp(_)) => match iter.next() {
292 Some(Protocol::Ws(_) | Protocol::Wss(_)) => {
293 Some((None, Some(address.clone())))
294 },
295 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
296 protocol => {
297 log::error!(
298 target: LOG_TARGET,
299 "unknown protocol {protocol:?}, ignoring {address:?}",
300 );
301 None
302 },
303 },
304 protocol => {
305 log::error!(
306 target: LOG_TARGET,
307 "unknown protocol {protocol:?}, ignoring {address:?}",
308 );
309 None
310 },
311 }
312 })
313 .unzip();
314
315 config_builder
316 .with_websocket(WebSocketTransportConfig {
317 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
318 yamux_config: litep2p::yamux::Config::default(),
319 nodelay: true,
320 ..Default::default()
321 })
322 .with_tcp(TcpTransportConfig {
323 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
324 yamux_config: litep2p::yamux::Config::default(),
325 nodelay: true,
326 ..Default::default()
327 })
328 }
329}
330
331#[async_trait::async_trait]
332impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
333 type NotificationProtocolConfig = NotificationProtocolConfig;
334 type RequestResponseProtocolConfig = RequestResponseConfig;
335 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
336 type PeerStore = Peerstore;
337 type BitswapConfig = BitswapConfig;
338
339 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
340 where
341 Self: Sized,
342 {
343 let (keypair, local_peer_id) =
344 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
345 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
346
347 params.network_config.network_config.boot_nodes = params
348 .network_config
349 .network_config
350 .boot_nodes
351 .into_iter()
352 .filter(|boot_node| boot_node.peer_id != PeerId::from(&local_peer_id))
353 .collect();
354 params.network_config.network_config.default_peers_set.reserved_nodes = params
355 .network_config
356 .network_config
357 .default_peers_set
358 .reserved_nodes
359 .into_iter()
360 .filter(|reserved_node| {
361 if reserved_node.peer_id == PeerId::from(&local_peer_id) {
362 log::warn!(
363 target: LOG_TARGET,
364 "Local peer ID used in reserved node, ignoring: {reserved_node}",
365 );
366 false
367 } else {
368 true
369 }
370 })
371 .collect();
372
373 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
374 fs::create_dir_all(path)?;
375 }
376
377 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
378 log::info!(target: LOG_TARGET, "Running litep2p network backend");
379
380 params.network_config.sanity_check_addresses()?;
381 params.network_config.sanity_check_bootnodes()?;
382
383 let mut config_builder =
384 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
385 let known_addresses = params.network_config.known_addresses();
386 let peer_store_handle = params.network_config.peer_store_handle();
387 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
388
389 let FullNetworkConfiguration {
390 notification_protocols,
391 request_response_protocols,
392 network_config,
393 ..
394 } = params.network_config;
395
396 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
402 let mut notif_protocols = HashMap::from_iter([(
403 params.block_announce_config.protocol_name().clone(),
404 params.block_announce_config.handle,
405 )]);
406
407 config_builder = notification_protocols
409 .into_iter()
410 .fold(config_builder, |config_builder, mut config| {
411 config.config.set_handshake(Roles::from(¶ms.role).encode());
412 notif_protocols.insert(config.protocol_name, config.handle);
413
414 config_builder.with_notification_protocol(config.config)
415 })
416 .with_notification_protocol(params.block_announce_config.config);
417
418 let metrics = match ¶ms.metrics_registry {
420 Some(registry) => Some(register_without_sources(registry)?),
421 None => None,
422 };
423
424 let (mut request_response_receivers, request_response_senders): (
430 HashMap<_, _>,
431 HashMap<_, _>,
432 ) = request_response_protocols
433 .iter()
434 .map(|config| {
435 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
436 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
437 })
438 .unzip();
439
440 config_builder = request_response_protocols.into_iter().fold(
441 config_builder,
442 |config_builder, config| {
443 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
444 Litep2pProtocolName::from(config.protocol_name.clone()),
445 )
446 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
447 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
448 .with_timeout(config.request_timeout)
449 .build();
450
451 let protocol = RequestResponseProtocol::new(
452 config.protocol_name.clone(),
453 handle,
454 Arc::clone(&peer_store_handle),
455 config.inbound_queue,
456 request_response_receivers
457 .remove(&config.protocol_name)
458 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
459 request_response_senders.clone(),
460 metrics.clone(),
461 );
462
463 executor.run(Box::pin(async move {
464 protocol.run().await;
465 }));
466
467 config_builder.with_request_response_protocol(protocol_config)
468 },
469 );
470
471 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
473 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
474 use crate::types::multiaddr::Protocol;
475
476 let address = match address.iter().last() {
477 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) => {
478 address.with(Protocol::P2p(peer.into()))
479 },
480 Some(Protocol::P2p(_)) => address,
481 _ => return acc,
482 };
483
484 acc.entry(peer.into()).or_default().push(address.into());
485 peer_store_handle.add_known_peer(peer);
486
487 acc
488 });
489
490 let listen_addresses = Arc::new(Default::default());
492 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
493 Discovery::new(
494 local_peer_id,
495 &network_config,
496 params.genesis_hash,
497 params.fork_id.as_deref(),
498 ¶ms.protocol_id,
499 known_addresses.clone(),
500 Arc::clone(&listen_addresses),
501 Arc::clone(&peer_store_handle),
502 );
503
504 config_builder = config_builder
505 .with_known_addresses(known_addresses.clone().into_iter())
506 .with_libp2p_ping(ping_config)
507 .with_libp2p_identify(identify_config)
508 .with_libp2p_kademlia(kademlia_config)
509 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
510 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
511 ))
512 .with_keep_alive_timeout(network_config.idle_connection_timeout)
513 .with_system_resolver()
516 .with_executor(executor);
517
518 if let Some(config) = maybe_mdns_config {
519 config_builder = config_builder.with_mdns(config);
520 }
521
522 if let Some(config) = params.bitswap_config {
523 config_builder = config_builder.with_libp2p_bitswap(config);
524 }
525
526 let litep2p =
527 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
528
529 litep2p.listen_addresses().for_each(|address| {
530 log::debug!(target: LOG_TARGET, "listening on: {address}");
531
532 listen_addresses.write().insert(address.clone());
533 });
534
535 let public_addresses = litep2p.public_addresses();
536 for address in network_config.public_addresses.iter() {
537 if let Err(err) = public_addresses.add_address(address.clone().into()) {
538 log::warn!(
539 target: LOG_TARGET,
540 "failed to add public address {address:?}: {err:?}",
541 );
542 }
543 }
544
545 let network_service = Arc::new(Litep2pNetworkService::new(
546 local_peer_id,
547 keypair.clone(),
548 cmd_tx,
549 Arc::clone(&peer_store_handle),
550 notif_protocols.clone(),
551 block_announce_protocol.clone(),
552 request_response_senders,
553 Arc::clone(&listen_addresses),
554 public_addresses,
555 ));
556
557 let num_connected = Arc::new(Default::default());
559 let bandwidth: Arc<dyn BandwidthSink> =
560 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
561
562 if let Some(registry) = ¶ms.metrics_registry {
563 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
564 }
565
566 Ok(Self {
567 network_service,
568 cmd_rx,
569 metrics,
570 peerset_handles: notif_protocols,
571 num_connected,
572 discovery,
573 pending_queries: HashMap::new(),
574 peerstore_handle: peer_store_handle,
575 block_announce_protocol,
576 event_streams: out_events::OutChannels::new(None)?,
577 peers: HashMap::new(),
578 litep2p,
579 })
580 }
581
582 fn network_service(&self) -> Arc<dyn NetworkService> {
583 Arc::clone(&self.network_service)
584 }
585
586 fn peer_store(
587 bootnodes: Vec<crate::types::PeerId>,
588 metrics_registry: Option<Registry>,
589 ) -> Self::PeerStore {
590 Peerstore::new(bootnodes, metrics_registry)
591 }
592
593 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
594 NotificationMetrics::new(registry)
595 }
596
597 fn bitswap_server(
599 client: Arc<dyn BlockBackend<B> + Send + Sync>,
600 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
601 BitswapServer::new(client)
602 }
603
604 fn notification_config(
606 protocol_name: ProtocolName,
607 fallback_names: Vec<ProtocolName>,
608 max_notification_size: u64,
609 handshake: Option<NotificationHandshake>,
610 set_config: SetConfig,
611 metrics: NotificationMetrics,
612 peerstore_handle: Arc<dyn PeerStoreProvider>,
613 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
614 Self::NotificationProtocolConfig::new(
615 protocol_name,
616 fallback_names,
617 max_notification_size as usize,
618 handshake,
619 set_config,
620 metrics,
621 peerstore_handle,
622 )
623 }
624
625 fn request_response_config(
627 protocol_name: ProtocolName,
628 fallback_names: Vec<ProtocolName>,
629 max_request_size: u64,
630 max_response_size: u64,
631 request_timeout: Duration,
632 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
633 ) -> Self::RequestResponseProtocolConfig {
634 Self::RequestResponseProtocolConfig::new(
635 protocol_name,
636 fallback_names,
637 max_request_size,
638 max_response_size,
639 request_timeout,
640 inbound_queue,
641 )
642 }
643
644 async fn run(mut self) {
646 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
647
648 loop {
649 let num_connected_peers = self
650 .peerset_handles
651 .get(&self.block_announce_protocol)
652 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
653 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
654
655 tokio::select! {
656 command = self.cmd_rx.next() => match command {
657 None => return,
658 Some(command) => match command {
659 NetworkServiceCommand::FindClosestPeers { target } => {
660 let query_id = self.discovery.find_node(target.into()).await;
661 self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
662 }
663 NetworkServiceCommand::GetValue{ key } => {
664 let query_id = self.discovery.get_value(key.clone()).await;
665 self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
666 }
667 NetworkServiceCommand::PutValue { key, value } => {
668 let query_id = self.discovery.put_value(key.clone(), value).await;
669 self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
670 }
671 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
672 let kademlia_key = record.key.clone();
673 let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
674 self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
675 }
676 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
677 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
678 }
679 NetworkServiceCommand::StartProviding { key } => {
680 let query_id = self.discovery.start_providing(key.clone()).await;
681 self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
682 }
683 NetworkServiceCommand::StopProviding { key } => {
684 self.discovery.stop_providing(key).await;
685 }
686 NetworkServiceCommand::GetProviders { key } => {
687 let query_id = self.discovery.get_providers(key.clone()).await;
688 self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
689 }
690 NetworkServiceCommand::EventStream { tx } => {
691 self.event_streams.push(tx);
692 }
693 NetworkServiceCommand::Status { tx } => {
694 let _ = tx.send(NetworkStatus {
695 num_connected_peers: self
696 .peerset_handles
697 .get(&self.block_announce_protocol)
698 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
699 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
700 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
701 });
702 }
703 NetworkServiceCommand::AddPeersToReservedSet {
704 protocol,
705 peers,
706 } => {
707 let peers = self.add_addresses(peers.into_iter().map(Into::into));
708
709 match self.peerset_handles.get(&protocol) {
710 Some(handle) => {
711 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
712 }
713 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
714 };
715 }
716 NetworkServiceCommand::AddKnownAddress { peer, address } => {
717 let mut address: Multiaddr = address.into();
718
719 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
720 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
721 }
722
723 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
724 self.peerstore_handle.add_known_peer(peer);
728 } else {
729 log::debug!(
730 target: LOG_TARGET,
731 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
732 );
733 }
734 },
735 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
736 let peers = self.add_addresses(peers.into_iter().map(Into::into));
737
738 match self.peerset_handles.get(&protocol) {
739 Some(handle) => {
740 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
741 }
742 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
743 }
744
745 },
746 NetworkServiceCommand::DisconnectPeer {
747 protocol,
748 peer,
749 } => {
750 let Some(handle) = self.peerset_handles.get(&protocol) else {
751 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
752 continue
753 };
754
755 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
756 }
757 NetworkServiceCommand::SetReservedOnly {
758 protocol,
759 reserved_only,
760 } => {
761 let Some(handle) = self.peerset_handles.get(&protocol) else {
762 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
763 continue
764 };
765
766 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
767 }
768 NetworkServiceCommand::RemoveReservedPeers {
769 protocol,
770 peers,
771 } => {
772 let Some(handle) = self.peerset_handles.get(&protocol) else {
773 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
774 continue
775 };
776
777 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
778 }
779 }
780 },
781 event = self.discovery.next() => match event {
782 None => return,
783 Some(DiscoveryEvent::Discovered { addresses }) => {
784 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
786 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
787 self.peerstore_handle.add_known_peer(peer);
788 }
789 }
790 }
791 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
792 for peer in peers {
793 self.peerstore_handle.add_known_peer(peer.into());
794 }
795 }
796 Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
797 match self.pending_queries.remove(&query_id) {
798 Some(KadQuery::FindNode(_, started)) => {
799 log::trace!(
800 target: LOG_TARGET,
801 "`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
802 );
803
804 self.event_streams.send(
805 Event::Dht(
806 DhtEvent::ClosestPeersFound(
807 target.into(),
808 peers
809 .into_iter()
810 .map(|(peer, addrs)| (
811 peer.into(),
812 addrs.into_iter().map(Into::into).collect(),
813 ))
814 .collect(),
815 )
816 )
817 );
818
819 if let Some(ref metrics) = self.metrics {
820 metrics
821 .kademlia_query_duration
822 .with_label_values(&["node-find"])
823 .observe(started.elapsed().as_secs_f64());
824 }
825 },
826 query => {
827 log::error!(
828 target: LOG_TARGET,
829 "Missing/invalid pending query for `FIND_NODE`: {query:?}"
830 );
831 debug_assert!(false);
832 }
833 }
834 },
835 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
836 if !self.pending_queries.contains_key(&query_id) {
837 log::error!(
838 target: LOG_TARGET,
839 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
840 );
841
842 continue
843 }
844
845 let peer_id: crate::types::PeerId = record.peer.into();
846 let record = PeerRecord {
847 record: P2PRecord {
848 key: record.record.key.to_vec().into(),
849 value: record.record.value,
850 publisher: record.record.publisher.map(|peer_id| {
851 let peer_id: crate::types::PeerId = peer_id.into();
852 peer_id.into()
853 }),
854 expires: record.record.expires,
855 },
856 peer: Some(peer_id.into()),
857 };
858
859 self.event_streams.send(
860 Event::Dht(
861 DhtEvent::ValueFound(
862 record.into()
863 )
864 )
865 );
866 }
867 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
868 match self.pending_queries.remove(&query_id) {
869 Some(KadQuery::GetValue(key, started)) => {
870 log::trace!(
871 target: LOG_TARGET,
872 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
873 );
874
875 if let Some(ref metrics) = self.metrics {
876 metrics
877 .kademlia_query_duration
878 .with_label_values(&["value-get"])
879 .observe(started.elapsed().as_secs_f64());
880 }
881 },
882 query => {
883 log::error!(
884 target: LOG_TARGET,
885 "Missing/invalid pending query for `GET_VALUE`: {query:?}"
886 );
887 debug_assert!(false);
888 },
889 }
890 }
891 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
892 match self.pending_queries.remove(&query_id) {
893 Some(KadQuery::PutValue(key, started)) => {
894 log::trace!(
895 target: LOG_TARGET,
896 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
897 );
898
899 self.event_streams.send(Event::Dht(
900 DhtEvent::ValuePut(key)
901 ));
902
903 if let Some(ref metrics) = self.metrics {
904 metrics
905 .kademlia_query_duration
906 .with_label_values(&["value-put"])
907 .observe(started.elapsed().as_secs_f64());
908 }
909 },
910 query => {
911 log::error!(
912 target: LOG_TARGET,
913 "Missing/invalid pending query for `PUT_VALUE`: {query:?}"
914 );
915 debug_assert!(false);
916 }
917 }
918 }
919 Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
920 match self.pending_queries.remove(&query_id) {
921 Some(KadQuery::GetProviders(key, started)) => {
922 log::trace!(
923 target: LOG_TARGET,
924 "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
925 );
926
927 providers.iter().for_each(|p| {
932 self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
933 });
934
935 self.event_streams.send(Event::Dht(
936 DhtEvent::ProvidersFound(
937 key.clone().into(),
938 providers.into_iter().map(|p| p.peer.into()).collect()
939 )
940 ));
941
942 self.event_streams.send(Event::Dht(
945 DhtEvent::NoMoreProviders(key.into())
946 ));
947
948 if let Some(ref metrics) = self.metrics {
949 metrics
950 .kademlia_query_duration
951 .with_label_values(&["providers-get"])
952 .observe(started.elapsed().as_secs_f64());
953 }
954 },
955 query => {
956 log::error!(
957 target: LOG_TARGET,
958 "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
959 );
960 debug_assert!(false);
961 }
962 }
963 }
964 Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
965 match self.pending_queries.remove(&query_id) {
966 Some(KadQuery::AddProvider(key, started)) => {
967 debug_assert_eq!(key, provided_key.into());
968
969 log::trace!(
970 target: LOG_TARGET,
971 "`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
972 );
973
974 self.event_streams.send(Event::Dht(
975 DhtEvent::StartedProviding(key.into())
976 ));
977
978 if let Some(ref metrics) = self.metrics {
979 metrics
980 .kademlia_query_duration
981 .with_label_values(&["provider-add"])
982 .observe(started.elapsed().as_secs_f64());
983 }
984 }
985 Some(_) => {
986 log::error!(
987 target: LOG_TARGET,
988 "Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
989 );
990 debug_assert!(false);
991 }
992 None => {
993 log::trace!(
994 target: LOG_TARGET,
995 "`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
996 );
997 }
998 }
999 }
1000 Some(DiscoveryEvent::QueryFailed { query_id }) => {
1001 match self.pending_queries.remove(&query_id) {
1002 Some(KadQuery::FindNode(peer_id, started)) => {
1003 log::debug!(
1004 target: LOG_TARGET,
1005 "`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1006 );
1007
1008 self.event_streams.send(Event::Dht(
1009 DhtEvent::ClosestPeersNotFound(peer_id.into())
1010 ));
1011
1012 if let Some(ref metrics) = self.metrics {
1013 metrics
1014 .kademlia_query_duration
1015 .with_label_values(&["node-find-failed"])
1016 .observe(started.elapsed().as_secs_f64());
1017 }
1018 },
1019 Some(KadQuery::GetValue(key, started)) => {
1020 log::debug!(
1021 target: LOG_TARGET,
1022 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1023 );
1024
1025 self.event_streams.send(Event::Dht(
1026 DhtEvent::ValueNotFound(key)
1027 ));
1028
1029 if let Some(ref metrics) = self.metrics {
1030 metrics
1031 .kademlia_query_duration
1032 .with_label_values(&["value-get-failed"])
1033 .observe(started.elapsed().as_secs_f64());
1034 }
1035 },
1036 Some(KadQuery::PutValue(key, started)) => {
1037 log::debug!(
1038 target: LOG_TARGET,
1039 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1040 );
1041
1042 self.event_streams.send(Event::Dht(
1043 DhtEvent::ValuePutFailed(key)
1044 ));
1045
1046 if let Some(ref metrics) = self.metrics {
1047 metrics
1048 .kademlia_query_duration
1049 .with_label_values(&["value-put-failed"])
1050 .observe(started.elapsed().as_secs_f64());
1051 }
1052 },
1053 Some(KadQuery::GetProviders(key, started)) => {
1054 log::debug!(
1055 target: LOG_TARGET,
1056 "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1057 );
1058
1059 self.event_streams.send(Event::Dht(
1060 DhtEvent::ProvidersNotFound(key)
1061 ));
1062
1063 if let Some(ref metrics) = self.metrics {
1064 metrics
1065 .kademlia_query_duration
1066 .with_label_values(&["providers-get-failed"])
1067 .observe(started.elapsed().as_secs_f64());
1068 }
1069 },
1070 Some(KadQuery::AddProvider(key, started)) => {
1071 log::debug!(
1072 target: LOG_TARGET,
1073 "`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1074 );
1075
1076 self.event_streams.send(Event::Dht(
1077 DhtEvent::StartProvidingFailed(key)
1078 ));
1079
1080 if let Some(ref metrics) = self.metrics {
1081 metrics
1082 .kademlia_query_duration
1083 .with_label_values(&["provider-add-failed"])
1084 .observe(started.elapsed().as_secs_f64());
1085 }
1086 },
1087 None => {
1088 log::debug!(
1089 target: LOG_TARGET,
1090 "non-existent query (likely republishing a provider) failed ({query_id:?})",
1091 );
1092 }
1093 }
1094 }
1095 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1096 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1097 }
1098 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1099 match self.litep2p.public_addresses().add_address(address.clone().into()) {
1100 Ok(inserted) => if inserted {
1101 log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1102 },
1103 Err(err) => {
1104 log::warn!(
1105 target: LOG_TARGET,
1106 "🔍 Failed to add discovered external address {address:?}: {err:?}",
1107 );
1108 },
1109 }
1110 }
1111 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1112 let local_peer_id = self.litep2p.local_peer_id();
1113
1114 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1116 address.with(Protocol::P2p(*local_peer_id.as_ref()))
1117 } else {
1118 address
1119 };
1120
1121 if self.litep2p.public_addresses().remove_address(&address) {
1122 log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1123 } else {
1124 log::warn!(
1125 target: LOG_TARGET,
1126 "🔍 Failed to remove expired external address {address:?}"
1127 );
1128 }
1129 }
1130 Some(DiscoveryEvent::Ping { peer, rtt }) => {
1131 log::trace!(
1132 target: LOG_TARGET,
1133 "ping time with {peer:?}: {rtt:?}",
1134 );
1135 }
1136 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1137 self.event_streams.send(Event::Dht(
1138 DhtEvent::PutRecordRequest(
1139 key.into(),
1140 value,
1141 publisher.map(Into::into),
1142 expires,
1143 )
1144 ));
1145 },
1146
1147 Some(DiscoveryEvent::RandomKademliaStarted) => {
1148 if let Some(metrics) = self.metrics.as_ref() {
1149 metrics.kademlia_random_queries_total.inc();
1150 }
1151 }
1152 },
1153 event = self.litep2p.next_event() => match event {
1154 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1155 let Some(metrics) = &self.metrics else {
1156 continue;
1157 };
1158
1159 let direction = match endpoint {
1160 Endpoint::Dialer { .. } => "out",
1161 Endpoint::Listener { .. } => {
1162 metrics.incoming_connections_total.inc();
1167
1168 "in"
1169 },
1170 };
1171 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1172
1173 match self.peers.entry(peer) {
1174 Entry::Vacant(entry) => {
1175 entry.insert(ConnectionContext {
1176 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1177 num_connections: 1usize,
1178 });
1179 metrics.distinct_peers_connections_opened_total.inc();
1180 }
1181 Entry::Occupied(entry) => {
1182 let entry = entry.into_mut();
1183 entry.num_connections += 1;
1184 entry.endpoints.insert(endpoint.connection_id(), endpoint);
1185 }
1186 }
1187 }
1188 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1189 let Some(metrics) = &self.metrics else {
1190 continue;
1191 };
1192
1193 let Some(context) = self.peers.get_mut(&peer) else {
1194 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1195 continue
1196 };
1197
1198 let direction = match context.endpoints.remove(&connection_id) {
1199 None => {
1200 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1201 continue
1202 }
1203 Some(endpoint) => {
1204 context.num_connections -= 1;
1205
1206 match endpoint {
1207 Endpoint::Dialer { .. } => "out",
1208 Endpoint::Listener { .. } => "in",
1209 }
1210 }
1211 };
1212
1213 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1214
1215 if context.num_connections == 0 {
1216 self.peers.remove(&peer);
1217 metrics.distinct_peers_connections_closed_total.inc();
1218 }
1219 }
1220 Some(Litep2pEvent::DialFailure { address, error }) => {
1221 log::debug!(
1222 target: LOG_TARGET,
1223 "failed to dial peer at {address:?}: {error:?}",
1224 );
1225
1226 if let Some(metrics) = &self.metrics {
1227 let reason = match error {
1228 DialError::Timeout => "timeout",
1229 DialError::AddressError(_) => "invalid-address",
1230 DialError::DnsError(_) => "cannot-resolve-dns",
1231 DialError::NegotiationError(error) => match error {
1232 NegotiationError::Timeout => "timeout",
1233 NegotiationError::PeerIdMissing => "missing-peer-id",
1234 NegotiationError::StateMismatch => "state-mismatch",
1235 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1236 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1237 NegotiationError::SnowError(_) => "noise-error",
1238 NegotiationError::ParseError(_) => "parse-error",
1239 NegotiationError::IoError(_) => "io-error",
1240 NegotiationError::WebSocket(_) => "webscoket-error",
1241 NegotiationError::BadSignature => "bad-signature",
1242 }
1243 };
1244
1245 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1246 }
1247 }
1248 Some(Litep2pEvent::ListDialFailures { errors }) => {
1249 log::debug!(
1250 target: LOG_TARGET,
1251 "failed to dial peer on multiple addresses {errors:?}",
1252 );
1253
1254 if let Some(metrics) = &self.metrics {
1255 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1256 }
1257 }
1258 None => {
1259 log::error!(
1260 target: LOG_TARGET,
1261 "Litep2p backend terminated"
1262 );
1263 return
1264 }
1265 },
1266 }
1267 }
1268 }
1269}