1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 discovery::{Discovery, DiscoveryEvent},
30 peerstore::Peerstore,
31 service::{Litep2pNetworkService, NetworkServiceCommand},
32 shim::{
33 bitswap::BitswapServer,
34 notification::{
35 config::{NotificationProtocolConfig, ProtocolControlHandle},
36 peerset::PeersetCommand,
37 },
38 request_response::{RequestResponseConfig, RequestResponseProtocol},
39 },
40 },
41 peer_store::PeerStoreProvider,
42 service::{
43 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44 out_events,
45 traits::{BandwidthSink, NetworkBackend, NetworkService},
46 },
47 NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53 config::ConfigBuilder,
54 crypto::ed25519::Keypair,
55 error::{DialError, NegotiationError},
56 executor::Executor,
57 protocol::{
58 libp2p::{
59 bitswap::Config as BitswapConfig,
60 kademlia::{QueryId, Record},
61 },
62 request_response::ConfigBuilder as RequestResponseConfigBuilder,
63 },
64 transport::{
65 tcp::config::Config as TcpTransportConfig,
66 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67 },
68 types::{
69 multiaddr::{Multiaddr, Protocol},
70 ConnectionId,
71 },
72 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84 cmp,
85 collections::{hash_map::Entry, HashMap, HashSet},
86 fs,
87 future::Future,
88 iter,
89 pin::Pin,
90 sync::{
91 atomic::{AtomicUsize, Ordering},
92 Arc,
93 },
94 time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
/// Keep-alive timeout handed to the litep2p configuration builder.
const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
104
/// Adapter that exposes a litep2p bandwidth sink through the [`BandwidthSink`] trait.
struct Litep2pBandwidthSink {
    /// The wrapped litep2p bandwidth sink.
    sink: litep2p::BandwidthSink,
}
109
110impl BandwidthSink for Litep2pBandwidthSink {
111 fn total_inbound(&self) -> u64 {
112 self.sink.inbound() as u64
113 }
114
115 fn total_outbound(&self) -> u64 {
116 self.sink.outbound() as u64
117 }
118}
119
/// Adapter that lets litep2p spawn its futures through a caller-provided closure.
struct Litep2pExecutor {
    /// Closure that receives every future litep2p asks to run.
    executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
}
125
126impl Executor for Litep2pExecutor {
127 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
128 (self.executor)(future)
129 }
130
131 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
132 (self.executor)(future)
133 }
134}
135
/// Logging target used by every log statement in this file.
const LOG_TARGET: &str = "sub-libp2p";
138
/// Per-peer bookkeeping of open connections, used for connection metrics.
struct ConnectionContext {
    /// Endpoint of each open connection, keyed by connection ID.
    endpoints: HashMap<ConnectionId, Endpoint>,

    /// Number of connections currently open to the peer.
    num_connections: usize,
}
147
/// In-flight Kademlia query, carrying the record key and the instant the query was started.
///
/// The start instant is used to report the query duration once the query finishes.
#[derive(Debug)]
enum KadQuery {
    /// `GET_VALUE` query.
    GetValue(RecordKey, Instant),
    /// `PUT_VALUE` query.
    PutValue(RecordKey, Instant),
    /// `GET_PROVIDERS` query.
    GetProviders(RecordKey, Instant),
}
158
/// Networking backend implemented on top of litep2p.
pub struct Litep2pNetworkBackend {
    /// Main `litep2p` object.
    litep2p: Litep2p,

    /// Service handle returned to callers of `network_service()`.
    network_service: Arc<dyn NetworkService>,

    /// Receiver for commands sent to the backend; polled in `run()`.
    cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

    /// Control handles of the notification protocols, keyed by protocol name.
    peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

    /// Kademlia queries that have been started but have not completed yet.
    pending_queries: HashMap<QueryId, KadQuery>,

    /// Discovery component; source of `DiscoveryEvent`s.
    discovery: Discovery,

    /// Number of peers connected over the block-announce protocol, refreshed on every iteration
    /// of `run()` and shared with the registered metric sources.
    num_connected: Arc<AtomicUsize>,

    /// Open connections per peer. Only maintained when metrics are enabled.
    peers: HashMap<litep2p::PeerId, ConnectionContext>,

    /// Handle to the peer store.
    peerstore_handle: Arc<dyn PeerStoreProvider>,

    /// Name of the block-announce protocol.
    block_announce_protocol: ProtocolName,

    /// Channels over which `Event`s are delivered to subscribers.
    event_streams: out_events::OutChannels,

    /// Prometheus metrics, if a registry was provided.
    metrics: Option<Metrics>,
}
197
198impl Litep2pNetworkBackend {
199 fn parse_addresses(
202 addresses: impl Iterator<Item = Multiaddr>,
203 ) -> HashMap<PeerId, Vec<Multiaddr>> {
204 addresses
205 .into_iter()
206 .filter_map(|address| match address.iter().next() {
207 Some(
208 Protocol::Dns(_) |
209 Protocol::Dns4(_) |
210 Protocol::Dns6(_) |
211 Protocol::Ip6(_) |
212 Protocol::Ip4(_),
213 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
214 {
215 Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
216 .map_or(None, |peer| Some((peer, Some(address)))),
217 _ => None,
218 },
219 Some(Protocol::P2p(multihash)) =>
220 PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
221 _ => None,
222 })
223 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
224 let entry = acc.entry(peer).or_default();
225 maybe_address.map(|address| entry.push(address));
226
227 acc
228 })
229 }
230
231 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
233 Self::parse_addresses(peers.into_iter())
234 .into_iter()
235 .filter_map(|(peer, addresses)| {
236 if addresses.is_empty() {
238 return Some(peer)
239 }
240
241 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
242 log::warn!(
243 target: LOG_TARGET,
244 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
245 );
246 return None
247 }
248
249 self.peerstore_handle.add_known_peer(peer);
250 Some(peer)
251 })
252 .collect()
253 }
254}
255
256impl Litep2pNetworkBackend {
257 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
259 let secret: litep2p::crypto::ed25519::SecretKey =
260 node_key.clone().into_keypair()?.secret().into();
261
262 let local_identity = Keypair::from(secret);
263 let local_public = local_identity.public();
264 let local_peer_id = local_public.to_peer_id();
265
266 Ok((local_identity, local_peer_id))
267 }
268
269 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
271 config: &FullNetworkConfiguration<B, H, Self>,
272 ) -> ConfigBuilder {
273 let _ = match config.network_config.transport {
274 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
275 TransportConfig::Normal { .. } => false,
276 };
277 let config_builder = ConfigBuilder::new();
278
279 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
280 .network_config
281 .listen_addresses
282 .iter()
283 .filter_map(|address| {
284 use sc_network_types::multiaddr::Protocol;
285
286 let mut iter = address.iter();
287
288 match iter.next() {
289 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
290 protocol => {
291 log::error!(
292 target: LOG_TARGET,
293 "unknown protocol {protocol:?}, ignoring {address:?}",
294 );
295
296 return None
297 },
298 }
299
300 match iter.next() {
301 Some(Protocol::Tcp(_)) => match iter.next() {
302 Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
303 Some((None, Some(address.clone()))),
304 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
305 protocol => {
306 log::error!(
307 target: LOG_TARGET,
308 "unknown protocol {protocol:?}, ignoring {address:?}",
309 );
310 None
311 },
312 },
313 protocol => {
314 log::error!(
315 target: LOG_TARGET,
316 "unknown protocol {protocol:?}, ignoring {address:?}",
317 );
318 None
319 },
320 }
321 })
322 .unzip();
323
324 config_builder
325 .with_websocket(WebSocketTransportConfig {
326 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
327 yamux_config: litep2p::yamux::Config::default(),
328 nodelay: true,
329 ..Default::default()
330 })
331 .with_tcp(TcpTransportConfig {
332 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
333 yamux_config: litep2p::yamux::Config::default(),
334 nodelay: true,
335 ..Default::default()
336 })
337 }
338}
339
340#[async_trait::async_trait]
341impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
342 type NotificationProtocolConfig = NotificationProtocolConfig;
343 type RequestResponseProtocolConfig = RequestResponseConfig;
344 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
345 type PeerStore = Peerstore;
346 type BitswapConfig = BitswapConfig;
347
348 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
349 where
350 Self: Sized,
351 {
352 let (keypair, local_peer_id) =
353 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
354 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
355
356 params.network_config.network_config.boot_nodes = params
357 .network_config
358 .network_config
359 .boot_nodes
360 .into_iter()
361 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
362 .collect();
363 params.network_config.network_config.default_peers_set.reserved_nodes = params
364 .network_config
365 .network_config
366 .default_peers_set
367 .reserved_nodes
368 .into_iter()
369 .filter(|reserved_node| {
370 if reserved_node.peer_id == local_peer_id.into() {
371 log::warn!(
372 target: LOG_TARGET,
373 "Local peer ID used in reserved node, ignoring: {reserved_node}",
374 );
375 false
376 } else {
377 true
378 }
379 })
380 .collect();
381
382 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
383 fs::create_dir_all(path)?;
384 }
385
386 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
387 log::info!(target: LOG_TARGET, "Running litep2p network backend");
388
389 params.network_config.sanity_check_addresses()?;
390 params.network_config.sanity_check_bootnodes()?;
391
392 let mut config_builder =
393 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
394 let known_addresses = params.network_config.known_addresses();
395 let peer_store_handle = params.network_config.peer_store_handle();
396 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
397
398 let FullNetworkConfiguration {
399 notification_protocols,
400 request_response_protocols,
401 network_config,
402 ..
403 } = params.network_config;
404
405 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
411 let mut notif_protocols = HashMap::from_iter([(
412 params.block_announce_config.protocol_name().clone(),
413 params.block_announce_config.handle,
414 )]);
415
416 config_builder = notification_protocols
418 .into_iter()
419 .fold(config_builder, |config_builder, mut config| {
420 config.config.set_handshake(Roles::from(¶ms.role).encode());
421 notif_protocols.insert(config.protocol_name, config.handle);
422
423 config_builder.with_notification_protocol(config.config)
424 })
425 .with_notification_protocol(params.block_announce_config.config);
426
427 let metrics = match ¶ms.metrics_registry {
429 Some(registry) => Some(register_without_sources(registry)?),
430 None => None,
431 };
432
433 let (mut request_response_receivers, request_response_senders): (
439 HashMap<_, _>,
440 HashMap<_, _>,
441 ) = request_response_protocols
442 .iter()
443 .map(|config| {
444 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
445 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
446 })
447 .unzip();
448
449 config_builder = request_response_protocols.into_iter().fold(
450 config_builder,
451 |config_builder, config| {
452 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
453 Litep2pProtocolName::from(config.protocol_name.clone()),
454 )
455 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
456 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
457 .with_timeout(config.request_timeout)
458 .build();
459
460 let protocol = RequestResponseProtocol::new(
461 config.protocol_name.clone(),
462 handle,
463 Arc::clone(&peer_store_handle),
464 config.inbound_queue,
465 request_response_receivers
466 .remove(&config.protocol_name)
467 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
468 request_response_senders.clone(),
469 metrics.clone(),
470 );
471
472 executor.run(Box::pin(async move {
473 protocol.run().await;
474 }));
475
476 config_builder.with_request_response_protocol(protocol_config)
477 },
478 );
479
480 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
482 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
483 use sc_network_types::multiaddr::Protocol;
484
485 let address = match address.iter().last() {
486 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
487 address.with(Protocol::P2p(peer.into())),
488 Some(Protocol::P2p(_)) => address,
489 _ => return acc,
490 };
491
492 acc.entry(peer.into()).or_default().push(address.into());
493 peer_store_handle.add_known_peer(peer);
494
495 acc
496 });
497
498 let listen_addresses = Arc::new(Default::default());
500 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
501 Discovery::new(
502 local_peer_id,
503 &network_config,
504 params.genesis_hash,
505 params.fork_id.as_deref(),
506 ¶ms.protocol_id,
507 known_addresses.clone(),
508 Arc::clone(&listen_addresses),
509 Arc::clone(&peer_store_handle),
510 );
511
512 config_builder = config_builder
513 .with_known_addresses(known_addresses.clone().into_iter())
514 .with_libp2p_ping(ping_config)
515 .with_libp2p_identify(identify_config)
516 .with_libp2p_kademlia(kademlia_config)
517 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
518 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
519 ))
520 .with_keep_alive_timeout(KEEP_ALIVE_TIMEOUT)
523 .with_executor(executor);
524
525 if let Some(config) = maybe_mdns_config {
526 config_builder = config_builder.with_mdns(config);
527 }
528
529 if let Some(config) = params.bitswap_config {
530 config_builder = config_builder.with_libp2p_bitswap(config);
531 }
532
533 let litep2p =
534 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
535
536 litep2p.listen_addresses().for_each(|address| {
537 log::debug!(target: LOG_TARGET, "listening on: {address}");
538
539 listen_addresses.write().insert(address.clone());
540 });
541
542 let public_addresses = litep2p.public_addresses();
543 for address in network_config.public_addresses.iter() {
544 if let Err(err) = public_addresses.add_address(address.clone().into()) {
545 log::warn!(
546 target: LOG_TARGET,
547 "failed to add public address {address:?}: {err:?}",
548 );
549 }
550 }
551
552 let network_service = Arc::new(Litep2pNetworkService::new(
553 local_peer_id,
554 keypair.clone(),
555 cmd_tx,
556 Arc::clone(&peer_store_handle),
557 notif_protocols.clone(),
558 block_announce_protocol.clone(),
559 request_response_senders,
560 Arc::clone(&listen_addresses),
561 public_addresses,
562 ));
563
564 let num_connected = Arc::new(Default::default());
566 let bandwidth: Arc<dyn BandwidthSink> =
567 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
568
569 if let Some(registry) = ¶ms.metrics_registry {
570 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
571 }
572
573 Ok(Self {
574 network_service,
575 cmd_rx,
576 metrics,
577 peerset_handles: notif_protocols,
578 num_connected,
579 discovery,
580 pending_queries: HashMap::new(),
581 peerstore_handle: peer_store_handle,
582 block_announce_protocol,
583 event_streams: out_events::OutChannels::new(None)?,
584 peers: HashMap::new(),
585 litep2p,
586 })
587 }
588
589 fn network_service(&self) -> Arc<dyn NetworkService> {
590 Arc::clone(&self.network_service)
591 }
592
593 fn peer_store(
594 bootnodes: Vec<sc_network_types::PeerId>,
595 metrics_registry: Option<Registry>,
596 ) -> Self::PeerStore {
597 Peerstore::new(bootnodes, metrics_registry)
598 }
599
/// Create the notification metrics from an optional Prometheus registry.
fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
    NotificationMetrics::new(registry)
}
603
/// Create a Bitswap server backed by `client`.
///
/// Returns the future produced by `BitswapServer::new` together with the Bitswap
/// configuration; both are passed through unchanged.
fn bitswap_server(
    client: Arc<dyn BlockBackend<B> + Send + Sync>,
) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
    BitswapServer::new(client)
}
610
611 fn notification_config(
613 protocol_name: ProtocolName,
614 fallback_names: Vec<ProtocolName>,
615 max_notification_size: u64,
616 handshake: Option<NotificationHandshake>,
617 set_config: SetConfig,
618 metrics: NotificationMetrics,
619 peerstore_handle: Arc<dyn PeerStoreProvider>,
620 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
621 Self::NotificationProtocolConfig::new(
622 protocol_name,
623 fallback_names,
624 max_notification_size as usize,
625 handshake,
626 set_config,
627 metrics,
628 peerstore_handle,
629 )
630 }
631
632 fn request_response_config(
634 protocol_name: ProtocolName,
635 fallback_names: Vec<ProtocolName>,
636 max_request_size: u64,
637 max_response_size: u64,
638 request_timeout: Duration,
639 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
640 ) -> Self::RequestResponseProtocolConfig {
641 Self::RequestResponseProtocolConfig::new(
642 protocol_name,
643 fallback_names,
644 max_request_size,
645 max_response_size,
646 request_timeout,
647 inbound_queue,
648 )
649 }
650
651 async fn run(mut self) {
653 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
654
655 loop {
656 let num_connected_peers = self
657 .peerset_handles
658 .get(&self.block_announce_protocol)
659 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
660 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
661
662 tokio::select! {
663 command = self.cmd_rx.next() => match command {
664 None => return,
665 Some(command) => match command {
666 NetworkServiceCommand::GetValue{ key } => {
667 let query_id = self.discovery.get_value(key.clone()).await;
668 self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
669 }
670 NetworkServiceCommand::PutValue { key, value } => {
671 let query_id = self.discovery.put_value(key.clone(), value).await;
672 self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
673 }
674 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
675 let kademlia_key = record.key.clone();
676 let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
677 self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
678 }
679 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
680 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
681 }
682 NetworkServiceCommand::StartProviding { key } => {
683 self.discovery.start_providing(key).await;
684 }
685 NetworkServiceCommand::StopProviding { key } => {
686 self.discovery.stop_providing(key).await;
687 }
688 NetworkServiceCommand::GetProviders { key } => {
689 let query_id = self.discovery.get_providers(key.clone()).await;
690 self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
691 }
692 NetworkServiceCommand::EventStream { tx } => {
693 self.event_streams.push(tx);
694 }
695 NetworkServiceCommand::Status { tx } => {
696 let _ = tx.send(NetworkStatus {
697 num_connected_peers: self
698 .peerset_handles
699 .get(&self.block_announce_protocol)
700 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
701 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
702 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
703 });
704 }
705 NetworkServiceCommand::AddPeersToReservedSet {
706 protocol,
707 peers,
708 } => {
709 let peers = self.add_addresses(peers.into_iter().map(Into::into));
710
711 match self.peerset_handles.get(&protocol) {
712 Some(handle) => {
713 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
714 }
715 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
716 };
717 }
718 NetworkServiceCommand::AddKnownAddress { peer, address } => {
719 let mut address: Multiaddr = address.into();
720
721 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
722 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
723 }
724
725 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) == 0usize {
726 log::debug!(
727 target: LOG_TARGET,
728 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
729 );
730 }
731 },
732 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
733 let peers = self.add_addresses(peers.into_iter().map(Into::into));
734
735 match self.peerset_handles.get(&protocol) {
736 Some(handle) => {
737 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
738 }
739 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
740 }
741
742 },
743 NetworkServiceCommand::DisconnectPeer {
744 protocol,
745 peer,
746 } => {
747 let Some(handle) = self.peerset_handles.get(&protocol) else {
748 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
749 continue
750 };
751
752 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
753 }
754 NetworkServiceCommand::SetReservedOnly {
755 protocol,
756 reserved_only,
757 } => {
758 let Some(handle) = self.peerset_handles.get(&protocol) else {
759 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
760 continue
761 };
762
763 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
764 }
765 NetworkServiceCommand::RemoveReservedPeers {
766 protocol,
767 peers,
768 } => {
769 let Some(handle) = self.peerset_handles.get(&protocol) else {
770 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
771 continue
772 };
773
774 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
775 }
776 }
777 },
778 event = self.discovery.next() => match event {
779 None => return,
780 Some(DiscoveryEvent::Discovered { addresses }) => {
781 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
783 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
784 self.peerstore_handle.add_known_peer(peer);
785 }
786 }
787 }
788 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
789 for peer in peers {
790 self.peerstore_handle.add_known_peer(peer.into());
791 }
792 }
793 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
794 if !self.pending_queries.contains_key(&query_id) {
795 log::error!(
796 target: LOG_TARGET,
797 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
798 );
799
800 continue
801 }
802
803 let peer_id: sc_network_types::PeerId = record.peer.into();
804 let record = PeerRecord {
805 record: P2PRecord {
806 key: record.record.key.to_vec().into(),
807 value: record.record.value,
808 publisher: record.record.publisher.map(|peer_id| {
809 let peer_id: sc_network_types::PeerId = peer_id.into();
810 peer_id.into()
811 }),
812 expires: record.record.expires,
813 },
814 peer: Some(peer_id.into()),
815 };
816
817 self.event_streams.send(
818 Event::Dht(
819 DhtEvent::ValueFound(
820 record.into()
821 )
822 )
823 );
824 }
825 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
826 match self.pending_queries.remove(&query_id) {
827 Some(KadQuery::GetValue(key, started)) => {
828 log::trace!(
829 target: LOG_TARGET,
830 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
831 );
832
833 if let Some(ref metrics) = self.metrics {
834 metrics
835 .kademlia_query_duration
836 .with_label_values(&["value-get"])
837 .observe(started.elapsed().as_secs_f64());
838 }
839 },
840 query => {
841 log::error!(
842 target: LOG_TARGET,
843 "Missing/invalid pending query for `GET_VALUE`: {query:?}"
844 );
845 debug_assert!(false);
846 },
847 }
848 }
849 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
850 match self.pending_queries.remove(&query_id) {
851 Some(KadQuery::PutValue(key, started)) => {
852 log::trace!(
853 target: LOG_TARGET,
854 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
855 );
856
857 self.event_streams.send(Event::Dht(
858 DhtEvent::ValuePut(key)
859 ));
860
861 if let Some(ref metrics) = self.metrics {
862 metrics
863 .kademlia_query_duration
864 .with_label_values(&["value-put"])
865 .observe(started.elapsed().as_secs_f64());
866 }
867 },
868 query => {
869 log::error!(
870 target: LOG_TARGET,
871 "Missing/invalid pending query for `PUT_VALUE`: {query:?}"
872 );
873 debug_assert!(false);
874 }
875 }
876 }
877 Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
878 match self.pending_queries.remove(&query_id) {
879 Some(KadQuery::GetProviders(key, started)) => {
880 log::trace!(
881 target: LOG_TARGET,
882 "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
883 );
884
885 self.event_streams.send(Event::Dht(
886 DhtEvent::ProvidersFound(
887 key.into(),
888 providers.into_iter().map(|p| p.peer.into()).collect()
889 )
890 ));
891
892 if let Some(ref metrics) = self.metrics {
893 metrics
894 .kademlia_query_duration
895 .with_label_values(&["providers-get"])
896 .observe(started.elapsed().as_secs_f64());
897 }
898 },
899 query => {
900 log::error!(
901 target: LOG_TARGET,
902 "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
903 );
904 debug_assert!(false);
905 }
906 }
907 }
908 Some(DiscoveryEvent::QueryFailed { query_id }) => {
909 match self.pending_queries.remove(&query_id) {
910 Some(KadQuery::GetValue(key, started)) => {
911 log::debug!(
912 target: LOG_TARGET,
913 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
914 );
915
916 self.event_streams.send(Event::Dht(
917 DhtEvent::ValueNotFound(key)
918 ));
919
920 if let Some(ref metrics) = self.metrics {
921 metrics
922 .kademlia_query_duration
923 .with_label_values(&["value-get-failed"])
924 .observe(started.elapsed().as_secs_f64());
925 }
926 },
927 Some(KadQuery::PutValue(key, started)) => {
928 log::debug!(
929 target: LOG_TARGET,
930 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
931 );
932
933 self.event_streams.send(Event::Dht(
934 DhtEvent::ValuePutFailed(key)
935 ));
936
937 if let Some(ref metrics) = self.metrics {
938 metrics
939 .kademlia_query_duration
940 .with_label_values(&["value-put-failed"])
941 .observe(started.elapsed().as_secs_f64());
942 }
943 },
944 Some(KadQuery::GetProviders(key, started)) => {
945 log::debug!(
946 target: LOG_TARGET,
947 "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
948 );
949
950 self.event_streams.send(Event::Dht(
951 DhtEvent::ProvidersNotFound(key)
952 ));
953
954 if let Some(ref metrics) = self.metrics {
955 metrics
956 .kademlia_query_duration
957 .with_label_values(&["providers-get-failed"])
958 .observe(started.elapsed().as_secs_f64());
959 }
960 },
961 None => {
962 log::warn!(
963 target: LOG_TARGET,
964 "non-existent query failed ({query_id:?})",
965 );
966 }
967 }
968 }
969 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
970 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
971 }
972 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
973 match self.litep2p.public_addresses().add_address(address.clone().into()) {
974 Ok(inserted) => if inserted {
975 log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
976 },
977 Err(err) => {
978 log::warn!(
979 target: LOG_TARGET,
980 "🔍 Failed to add discovered external address {address:?}: {err:?}",
981 );
982 },
983 }
984 }
985 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
986 let local_peer_id = self.litep2p.local_peer_id();
987
988 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
990 address.with(Protocol::P2p(*local_peer_id.as_ref()))
991 } else {
992 address
993 };
994
995 if self.litep2p.public_addresses().remove_address(&address) {
996 log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
997 } else {
998 log::warn!(
999 target: LOG_TARGET,
1000 "🔍 Failed to remove expired external address {address:?}"
1001 );
1002 }
1003 }
1004 Some(DiscoveryEvent::Ping { peer, rtt }) => {
1005 log::trace!(
1006 target: LOG_TARGET,
1007 "ping time with {peer:?}: {rtt:?}",
1008 );
1009 }
1010 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1011 self.event_streams.send(Event::Dht(
1012 DhtEvent::PutRecordRequest(
1013 key.into(),
1014 value,
1015 publisher.map(Into::into),
1016 expires,
1017 )
1018 ));
1019 },
1020
1021 Some(DiscoveryEvent::RandomKademliaStarted) => {
1022 if let Some(metrics) = self.metrics.as_ref() {
1023 metrics.kademlia_random_queries_total.inc();
1024 }
1025 }
1026 },
1027 event = self.litep2p.next_event() => match event {
1028 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1029 let Some(metrics) = &self.metrics else {
1030 continue;
1031 };
1032
1033 let direction = match endpoint {
1034 Endpoint::Dialer { .. } => "out",
1035 Endpoint::Listener { .. } => {
1036 metrics.incoming_connections_total.inc();
1041
1042 "in"
1043 },
1044 };
1045 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1046
1047 match self.peers.entry(peer) {
1048 Entry::Vacant(entry) => {
1049 entry.insert(ConnectionContext {
1050 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1051 num_connections: 1usize,
1052 });
1053 metrics.distinct_peers_connections_opened_total.inc();
1054 }
1055 Entry::Occupied(entry) => {
1056 let entry = entry.into_mut();
1057 entry.num_connections += 1;
1058 entry.endpoints.insert(endpoint.connection_id(), endpoint);
1059 }
1060 }
1061 }
1062 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1063 let Some(metrics) = &self.metrics else {
1064 continue;
1065 };
1066
1067 let Some(context) = self.peers.get_mut(&peer) else {
1068 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1069 continue
1070 };
1071
1072 let direction = match context.endpoints.remove(&connection_id) {
1073 None => {
1074 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1075 continue
1076 }
1077 Some(endpoint) => {
1078 context.num_connections -= 1;
1079
1080 match endpoint {
1081 Endpoint::Dialer { .. } => "out",
1082 Endpoint::Listener { .. } => "in",
1083 }
1084 }
1085 };
1086
1087 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1088
1089 if context.num_connections == 0 {
1090 self.peers.remove(&peer);
1091 metrics.distinct_peers_connections_closed_total.inc();
1092 }
1093 }
1094 Some(Litep2pEvent::DialFailure { address, error }) => {
1095 log::debug!(
1096 target: LOG_TARGET,
1097 "failed to dial peer at {address:?}: {error:?}",
1098 );
1099
1100 if let Some(metrics) = &self.metrics {
1101 let reason = match error {
1102 DialError::Timeout => "timeout",
1103 DialError::AddressError(_) => "invalid-address",
1104 DialError::DnsError(_) => "cannot-resolve-dns",
1105 DialError::NegotiationError(error) => match error {
1106 NegotiationError::Timeout => "timeout",
1107 NegotiationError::PeerIdMissing => "missing-peer-id",
1108 NegotiationError::StateMismatch => "state-mismatch",
1109 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1110 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1111 NegotiationError::SnowError(_) => "noise-error",
1112 NegotiationError::ParseError(_) => "parse-error",
1113 NegotiationError::IoError(_) => "io-error",
1114 NegotiationError::WebSocket(_) => "webscoket-error",
1115 NegotiationError::BadSignature => "bad-signature",
1116 }
1117 };
1118
1119 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1120 }
1121 }
1122 Some(Litep2pEvent::ListDialFailures { errors }) => {
1123 log::debug!(
1124 target: LOG_TARGET,
1125 "failed to dial peer on multiple addresses {errors:?}",
1126 );
1127
1128 if let Some(metrics) = &self.metrics {
1129 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1130 }
1131 }
1132 None => {
1133 log::error!(
1134 target: LOG_TARGET,
1135 "Litep2p backend terminated"
1136 );
1137 return
1138 }
1139 },
1140 }
1141 }
1142 }
1143}