1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 discovery::{Discovery, DiscoveryEvent},
30 peerstore::Peerstore,
31 service::{Litep2pNetworkService, NetworkServiceCommand},
32 shim::{
33 bitswap::BitswapServer,
34 notification::{
35 config::{NotificationProtocolConfig, ProtocolControlHandle},
36 peerset::PeersetCommand,
37 },
38 request_response::{RequestResponseConfig, RequestResponseProtocol},
39 },
40 },
41 peer_store::PeerStoreProvider,
42 service::{
43 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44 out_events,
45 traits::{BandwidthSink, NetworkBackend, NetworkService},
46 },
47 NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53 config::ConfigBuilder,
54 crypto::ed25519::Keypair,
55 error::{DialError, NegotiationError},
56 executor::Executor,
57 protocol::{
58 libp2p::{
59 bitswap::Config as BitswapConfig,
60 kademlia::{QueryId, Record},
61 },
62 request_response::ConfigBuilder as RequestResponseConfigBuilder,
63 },
64 transport::{
65 tcp::config::Config as TcpTransportConfig,
66 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67 },
68 types::{
69 multiaddr::{Multiaddr, Protocol},
70 ConnectionId,
71 },
72 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84 cmp,
85 collections::{hash_map::Entry, HashMap, HashSet},
86 fs,
87 future::Future,
88 iter,
89 pin::Pin,
90 sync::{
91 atomic::{AtomicUsize, Ordering},
92 Arc,
93 },
94 time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
/// Litep2p bandwidth sink.
///
/// Adapter exposing `litep2p::BandwidthSink` through the crate's [`BandwidthSink`] trait.
struct Litep2pBandwidthSink {
	/// Underlying `litep2p` bandwidth sink the counters are read from.
	sink: litep2p::BandwidthSink,
}

impl BandwidthSink for Litep2pBandwidthSink {
	/// Returns the inbound counter reported by the wrapped sink, cast to `u64`.
	fn total_inbound(&self) -> u64 {
		self.sink.inbound() as u64
	}

	/// Returns the outbound counter reported by the wrapped sink, cast to `u64`.
	fn total_outbound(&self) -> u64 {
		self.sink.outbound() as u64
	}
}
116
117struct Litep2pExecutor {
119 executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
121}
122
123impl Executor for Litep2pExecutor {
124 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
125 (self.executor)(future)
126 }
127
128 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
129 (self.executor)(future)
130 }
131}
132
/// Logging target used by every `log::*!` invocation in this file.
const LOG_TARGET: &str = "sub-libp2p";
135
/// Per-peer connection bookkeeping, stored in `Litep2pNetworkBackend::peers`.
struct ConnectionContext {
	/// Endpoint of each open connection to the peer, keyed by connection ID.
	endpoints: HashMap<ConnectionId, Endpoint>,

	/// Number of connections currently tracked for the peer.
	num_connections: usize,
}
144
/// Kademlia query tracked in `Litep2pNetworkBackend::pending_queries`.
///
/// Every variant stores the query subject and the `Instant` at which the query was started;
/// the instant is used to report the query duration once the query finishes.
#[derive(Debug)]
enum KadQuery {
	/// `FIND_NODE` query for the given target peer.
	FindNode(PeerId, Instant),
	/// `GET_VALUE` query for the given key.
	GetValue(RecordKey, Instant),
	/// `PUT_VALUE` query for the given key.
	PutValue(RecordKey, Instant),
	/// `GET_PROVIDERS` query for the given key.
	GetProviders(RecordKey, Instant),
	/// `ADD_PROVIDER` query for the given key.
	AddProvider(RecordKey, Instant),
}
159
/// Networking backend for `litep2p`.
pub struct Litep2pNetworkBackend {
	/// Main `litep2p` object; its events are polled in `run()`.
	litep2p: Litep2p,

	/// `NetworkService` implementation handed out by `network_service()`.
	network_service: Arc<dyn NetworkService>,

	/// RX channel for receiving commands; the TX side is given to `Litep2pNetworkService`.
	cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

	/// Control handles of the notification protocols, keyed by protocol name.
	peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

	/// Kademlia queries that were started and have not yet completed.
	pending_queries: HashMap<QueryId, KadQuery>,

	/// Discovery.
	discovery: Discovery,

	/// Number of connected peers, refreshed on every iteration of the event loop from the
	/// block announce protocol's handle.
	num_connected: Arc<AtomicUsize>,

	/// Connection context of connected peers. Only maintained when metrics are enabled.
	peers: HashMap<litep2p::PeerId, ConnectionContext>,

	/// Peerstore.
	peerstore_handle: Arc<dyn PeerStoreProvider>,

	/// Block announce protocol name.
	block_announce_protocol: ProtocolName,

	/// Channels over which network/DHT events are published to subscribers.
	event_streams: out_events::OutChannels,

	/// Prometheus metrics, if a registry was provided.
	metrics: Option<Metrics>,
}
198
199impl Litep2pNetworkBackend {
200 fn parse_addresses(
203 addresses: impl Iterator<Item = Multiaddr>,
204 ) -> HashMap<PeerId, Vec<Multiaddr>> {
205 addresses
206 .into_iter()
207 .filter_map(|address| match address.iter().next() {
208 Some(
209 Protocol::Dns(_) |
210 Protocol::Dns4(_) |
211 Protocol::Dns6(_) |
212 Protocol::Ip6(_) |
213 Protocol::Ip4(_),
214 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
215 {
216 Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
217 .map_or(None, |peer| Some((peer, Some(address)))),
218 _ => None,
219 },
220 Some(Protocol::P2p(multihash)) => {
221 PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None)))
222 },
223 _ => None,
224 })
225 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
226 let entry = acc.entry(peer).or_default();
227 maybe_address.map(|address| entry.push(address));
228
229 acc
230 })
231 }
232
233 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
235 Self::parse_addresses(peers.into_iter())
236 .into_iter()
237 .filter_map(|(peer, addresses)| {
238 if addresses.is_empty() {
240 return Some(peer);
241 }
242
243 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
244 log::warn!(
245 target: LOG_TARGET,
246 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
247 );
248 return None;
249 }
250
251 self.peerstore_handle.add_known_peer(peer);
252 Some(peer)
253 })
254 .collect()
255 }
256}
257
258impl Litep2pNetworkBackend {
259 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
261 let secret: litep2p::crypto::ed25519::SecretKey =
262 node_key.clone().into_keypair()?.secret().into();
263
264 let local_identity = Keypair::from(secret);
265 let local_public = local_identity.public();
266 let local_peer_id = local_public.to_peer_id();
267
268 Ok((local_identity, local_peer_id))
269 }
270
271 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
273 config: &FullNetworkConfiguration<B, H, Self>,
274 ) -> ConfigBuilder {
275 let _ = match config.network_config.transport {
276 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
277 TransportConfig::Normal { .. } => false,
278 };
279 let config_builder = ConfigBuilder::new();
280
281 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
282 .network_config
283 .listen_addresses
284 .iter()
285 .filter_map(|address| {
286 use sc_network_types::multiaddr::Protocol;
287
288 let mut iter = address.iter();
289
290 match iter.next() {
291 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
292 protocol => {
293 log::error!(
294 target: LOG_TARGET,
295 "unknown protocol {protocol:?}, ignoring {address:?}",
296 );
297
298 return None;
299 },
300 }
301
302 match iter.next() {
303 Some(Protocol::Tcp(_)) => match iter.next() {
304 Some(Protocol::Ws(_) | Protocol::Wss(_)) => {
305 Some((None, Some(address.clone())))
306 },
307 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
308 protocol => {
309 log::error!(
310 target: LOG_TARGET,
311 "unknown protocol {protocol:?}, ignoring {address:?}",
312 );
313 None
314 },
315 },
316 protocol => {
317 log::error!(
318 target: LOG_TARGET,
319 "unknown protocol {protocol:?}, ignoring {address:?}",
320 );
321 None
322 },
323 }
324 })
325 .unzip();
326
327 config_builder
328 .with_websocket(WebSocketTransportConfig {
329 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
330 yamux_config: litep2p::yamux::Config::default(),
331 nodelay: true,
332 ..Default::default()
333 })
334 .with_tcp(TcpTransportConfig {
335 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
336 yamux_config: litep2p::yamux::Config::default(),
337 nodelay: true,
338 ..Default::default()
339 })
340 }
341}
342
343#[async_trait::async_trait]
344impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
345 type NotificationProtocolConfig = NotificationProtocolConfig;
346 type RequestResponseProtocolConfig = RequestResponseConfig;
347 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
348 type PeerStore = Peerstore;
349 type BitswapConfig = BitswapConfig;
350
351 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
352 where
353 Self: Sized,
354 {
355 let (keypair, local_peer_id) =
356 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
357 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
358
359 params.network_config.network_config.boot_nodes = params
360 .network_config
361 .network_config
362 .boot_nodes
363 .into_iter()
364 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
365 .collect();
366 params.network_config.network_config.default_peers_set.reserved_nodes = params
367 .network_config
368 .network_config
369 .default_peers_set
370 .reserved_nodes
371 .into_iter()
372 .filter(|reserved_node| {
373 if reserved_node.peer_id == local_peer_id.into() {
374 log::warn!(
375 target: LOG_TARGET,
376 "Local peer ID used in reserved node, ignoring: {reserved_node}",
377 );
378 false
379 } else {
380 true
381 }
382 })
383 .collect();
384
385 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
386 fs::create_dir_all(path)?;
387 }
388
389 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
390 log::info!(target: LOG_TARGET, "Running litep2p network backend");
391
392 params.network_config.sanity_check_addresses()?;
393 params.network_config.sanity_check_bootnodes()?;
394
395 let mut config_builder =
396 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
397 let known_addresses = params.network_config.known_addresses();
398 let peer_store_handle = params.network_config.peer_store_handle();
399 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
400
401 let FullNetworkConfiguration {
402 notification_protocols,
403 request_response_protocols,
404 network_config,
405 ..
406 } = params.network_config;
407
408 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
414 let mut notif_protocols = HashMap::from_iter([(
415 params.block_announce_config.protocol_name().clone(),
416 params.block_announce_config.handle,
417 )]);
418
419 config_builder = notification_protocols
421 .into_iter()
422 .fold(config_builder, |config_builder, mut config| {
423 config.config.set_handshake(Roles::from(¶ms.role).encode());
424 notif_protocols.insert(config.protocol_name, config.handle);
425
426 config_builder.with_notification_protocol(config.config)
427 })
428 .with_notification_protocol(params.block_announce_config.config);
429
430 let metrics = match ¶ms.metrics_registry {
432 Some(registry) => Some(register_without_sources(registry)?),
433 None => None,
434 };
435
436 let (mut request_response_receivers, request_response_senders): (
442 HashMap<_, _>,
443 HashMap<_, _>,
444 ) = request_response_protocols
445 .iter()
446 .map(|config| {
447 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
448 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
449 })
450 .unzip();
451
452 config_builder = request_response_protocols.into_iter().fold(
453 config_builder,
454 |config_builder, config| {
455 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
456 Litep2pProtocolName::from(config.protocol_name.clone()),
457 )
458 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
459 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
460 .with_timeout(config.request_timeout)
461 .build();
462
463 let protocol = RequestResponseProtocol::new(
464 config.protocol_name.clone(),
465 handle,
466 Arc::clone(&peer_store_handle),
467 config.inbound_queue,
468 request_response_receivers
469 .remove(&config.protocol_name)
470 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
471 request_response_senders.clone(),
472 metrics.clone(),
473 );
474
475 executor.run(Box::pin(async move {
476 protocol.run().await;
477 }));
478
479 config_builder.with_request_response_protocol(protocol_config)
480 },
481 );
482
483 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
485 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
486 use sc_network_types::multiaddr::Protocol;
487
488 let address = match address.iter().last() {
489 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) => {
490 address.with(Protocol::P2p(peer.into()))
491 },
492 Some(Protocol::P2p(_)) => address,
493 _ => return acc,
494 };
495
496 acc.entry(peer.into()).or_default().push(address.into());
497 peer_store_handle.add_known_peer(peer);
498
499 acc
500 });
501
502 let listen_addresses = Arc::new(Default::default());
504 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
505 Discovery::new(
506 local_peer_id,
507 &network_config,
508 params.genesis_hash,
509 params.fork_id.as_deref(),
510 ¶ms.protocol_id,
511 known_addresses.clone(),
512 Arc::clone(&listen_addresses),
513 Arc::clone(&peer_store_handle),
514 );
515
516 config_builder = config_builder
517 .with_known_addresses(known_addresses.clone().into_iter())
518 .with_libp2p_ping(ping_config)
519 .with_libp2p_identify(identify_config)
520 .with_libp2p_kademlia(kademlia_config)
521 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
522 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
523 ))
524 .with_keep_alive_timeout(network_config.idle_connection_timeout)
525 .with_system_resolver()
528 .with_executor(executor);
529
530 if let Some(config) = maybe_mdns_config {
531 config_builder = config_builder.with_mdns(config);
532 }
533
534 if let Some(config) = params.bitswap_config {
535 config_builder = config_builder.with_libp2p_bitswap(config);
536 }
537
538 let litep2p =
539 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
540
541 litep2p.listen_addresses().for_each(|address| {
542 log::debug!(target: LOG_TARGET, "listening on: {address}");
543
544 listen_addresses.write().insert(address.clone());
545 });
546
547 let public_addresses = litep2p.public_addresses();
548 for address in network_config.public_addresses.iter() {
549 if let Err(err) = public_addresses.add_address(address.clone().into()) {
550 log::warn!(
551 target: LOG_TARGET,
552 "failed to add public address {address:?}: {err:?}",
553 );
554 }
555 }
556
557 let network_service = Arc::new(Litep2pNetworkService::new(
558 local_peer_id,
559 keypair.clone(),
560 cmd_tx,
561 Arc::clone(&peer_store_handle),
562 notif_protocols.clone(),
563 block_announce_protocol.clone(),
564 request_response_senders,
565 Arc::clone(&listen_addresses),
566 public_addresses,
567 ));
568
569 let num_connected = Arc::new(Default::default());
571 let bandwidth: Arc<dyn BandwidthSink> =
572 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
573
574 if let Some(registry) = ¶ms.metrics_registry {
575 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
576 }
577
578 Ok(Self {
579 network_service,
580 cmd_rx,
581 metrics,
582 peerset_handles: notif_protocols,
583 num_connected,
584 discovery,
585 pending_queries: HashMap::new(),
586 peerstore_handle: peer_store_handle,
587 block_announce_protocol,
588 event_streams: out_events::OutChannels::new(None)?,
589 peers: HashMap::new(),
590 litep2p,
591 })
592 }
593
/// Returns a new reference-counted handle to the backend's `NetworkService`.
fn network_service(&self) -> Arc<dyn NetworkService> {
	Arc::clone(&self.network_service)
}
597
/// Creates the peer store by forwarding `bootnodes` and `metrics_registry` to
/// `Peerstore::new`.
fn peer_store(
	bootnodes: Vec<sc_network_types::PeerId>,
	metrics_registry: Option<Registry>,
) -> Self::PeerStore {
	Peerstore::new(bootnodes, metrics_registry)
}
604
/// Creates `NotificationMetrics` from the optional Prometheus `registry`.
fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
	NotificationMetrics::new(registry)
}
608
/// Creates a Bitswap server backed by `client`.
///
/// Returns the future and the Bitswap protocol configuration produced by
/// `BitswapServer::new`.
fn bitswap_server(
	client: Arc<dyn BlockBackend<B> + Send + Sync>,
) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
	BitswapServer::new(client)
}
615
/// Creates the notification protocol configuration for `protocol_name`.
///
/// All arguments are forwarded to `NotificationProtocolConfig::new`. Returns the protocol
/// configuration together with the `NotificationService` handle for the protocol.
fn notification_config(
	protocol_name: ProtocolName,
	fallback_names: Vec<ProtocolName>,
	max_notification_size: u64,
	handshake: Option<NotificationHandshake>,
	set_config: SetConfig,
	metrics: NotificationMetrics,
	peerstore_handle: Arc<dyn PeerStoreProvider>,
) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
	Self::NotificationProtocolConfig::new(
		protocol_name,
		fallback_names,
		// NOTE(review): `as` truncates silently where `usize` is narrower than 64 bits.
		max_notification_size as usize,
		handshake,
		set_config,
		metrics,
		peerstore_handle,
	)
}
636
/// Creates the request-response protocol configuration for `protocol_name`.
///
/// All arguments are forwarded unchanged to `RequestResponseConfig::new`.
fn request_response_config(
	protocol_name: ProtocolName,
	fallback_names: Vec<ProtocolName>,
	max_request_size: u64,
	max_response_size: u64,
	request_timeout: Duration,
	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
) -> Self::RequestResponseProtocolConfig {
	Self::RequestResponseProtocolConfig::new(
		protocol_name,
		fallback_names,
		max_request_size,
		max_response_size,
		request_timeout,
		inbound_queue,
	)
}
655
656 async fn run(mut self) {
658 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
659
660 loop {
661 let num_connected_peers = self
662 .peerset_handles
663 .get(&self.block_announce_protocol)
664 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
665 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
666
667 tokio::select! {
668 command = self.cmd_rx.next() => match command {
669 None => return,
670 Some(command) => match command {
671 NetworkServiceCommand::FindClosestPeers { target } => {
672 let query_id = self.discovery.find_node(target.into()).await;
673 self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
674 }
675 NetworkServiceCommand::GetValue{ key } => {
676 let query_id = self.discovery.get_value(key.clone()).await;
677 self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
678 }
679 NetworkServiceCommand::PutValue { key, value } => {
680 let query_id = self.discovery.put_value(key.clone(), value).await;
681 self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
682 }
683 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
684 let kademlia_key = record.key.clone();
685 let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
686 self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
687 }
688 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
689 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
690 }
691 NetworkServiceCommand::StartProviding { key } => {
692 let query_id = self.discovery.start_providing(key.clone()).await;
693 self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
694 }
695 NetworkServiceCommand::StopProviding { key } => {
696 self.discovery.stop_providing(key).await;
697 }
698 NetworkServiceCommand::GetProviders { key } => {
699 let query_id = self.discovery.get_providers(key.clone()).await;
700 self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
701 }
702 NetworkServiceCommand::EventStream { tx } => {
703 self.event_streams.push(tx);
704 }
705 NetworkServiceCommand::Status { tx } => {
706 let _ = tx.send(NetworkStatus {
707 num_connected_peers: self
708 .peerset_handles
709 .get(&self.block_announce_protocol)
710 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
711 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
712 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
713 });
714 }
715 NetworkServiceCommand::AddPeersToReservedSet {
716 protocol,
717 peers,
718 } => {
719 let peers = self.add_addresses(peers.into_iter().map(Into::into));
720
721 match self.peerset_handles.get(&protocol) {
722 Some(handle) => {
723 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
724 }
725 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
726 };
727 }
728 NetworkServiceCommand::AddKnownAddress { peer, address } => {
729 let mut address: Multiaddr = address.into();
730
731 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
732 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
733 }
734
735 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
736 self.peerstore_handle.add_known_peer(peer);
740 } else {
741 log::debug!(
742 target: LOG_TARGET,
743 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
744 );
745 }
746 },
747 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
748 let peers = self.add_addresses(peers.into_iter().map(Into::into));
749
750 match self.peerset_handles.get(&protocol) {
751 Some(handle) => {
752 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
753 }
754 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
755 }
756
757 },
758 NetworkServiceCommand::DisconnectPeer {
759 protocol,
760 peer,
761 } => {
762 let Some(handle) = self.peerset_handles.get(&protocol) else {
763 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
764 continue
765 };
766
767 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
768 }
769 NetworkServiceCommand::SetReservedOnly {
770 protocol,
771 reserved_only,
772 } => {
773 let Some(handle) = self.peerset_handles.get(&protocol) else {
774 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
775 continue
776 };
777
778 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
779 }
780 NetworkServiceCommand::RemoveReservedPeers {
781 protocol,
782 peers,
783 } => {
784 let Some(handle) = self.peerset_handles.get(&protocol) else {
785 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
786 continue
787 };
788
789 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
790 }
791 }
792 },
793 event = self.discovery.next() => match event {
794 None => return,
795 Some(DiscoveryEvent::Discovered { addresses }) => {
796 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
798 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
799 self.peerstore_handle.add_known_peer(peer);
800 }
801 }
802 }
803 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
804 for peer in peers {
805 self.peerstore_handle.add_known_peer(peer.into());
806 }
807 }
808 Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
809 match self.pending_queries.remove(&query_id) {
810 Some(KadQuery::FindNode(_, started)) => {
811 log::trace!(
812 target: LOG_TARGET,
813 "`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
814 );
815
816 self.event_streams.send(
817 Event::Dht(
818 DhtEvent::ClosestPeersFound(
819 target.into(),
820 peers
821 .into_iter()
822 .map(|(peer, addrs)| (
823 peer.into(),
824 addrs.into_iter().map(Into::into).collect(),
825 ))
826 .collect(),
827 )
828 )
829 );
830
831 if let Some(ref metrics) = self.metrics {
832 metrics
833 .kademlia_query_duration
834 .with_label_values(&["node-find"])
835 .observe(started.elapsed().as_secs_f64());
836 }
837 },
838 query => {
839 log::error!(
840 target: LOG_TARGET,
841 "Missing/invalid pending query for `FIND_NODE`: {query:?}"
842 );
843 debug_assert!(false);
844 }
845 }
846 },
847 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
848 if !self.pending_queries.contains_key(&query_id) {
849 log::error!(
850 target: LOG_TARGET,
851 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
852 );
853
854 continue
855 }
856
857 let peer_id: sc_network_types::PeerId = record.peer.into();
858 let record = PeerRecord {
859 record: P2PRecord {
860 key: record.record.key.to_vec().into(),
861 value: record.record.value,
862 publisher: record.record.publisher.map(|peer_id| {
863 let peer_id: sc_network_types::PeerId = peer_id.into();
864 peer_id.into()
865 }),
866 expires: record.record.expires,
867 },
868 peer: Some(peer_id.into()),
869 };
870
871 self.event_streams.send(
872 Event::Dht(
873 DhtEvent::ValueFound(
874 record.into()
875 )
876 )
877 );
878 }
879 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
880 match self.pending_queries.remove(&query_id) {
881 Some(KadQuery::GetValue(key, started)) => {
882 log::trace!(
883 target: LOG_TARGET,
884 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
885 );
886
887 if let Some(ref metrics) = self.metrics {
888 metrics
889 .kademlia_query_duration
890 .with_label_values(&["value-get"])
891 .observe(started.elapsed().as_secs_f64());
892 }
893 },
894 query => {
895 log::error!(
896 target: LOG_TARGET,
897 "Missing/invalid pending query for `GET_VALUE`: {query:?}"
898 );
899 debug_assert!(false);
900 },
901 }
902 }
903 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
904 match self.pending_queries.remove(&query_id) {
905 Some(KadQuery::PutValue(key, started)) => {
906 log::trace!(
907 target: LOG_TARGET,
908 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
909 );
910
911 self.event_streams.send(Event::Dht(
912 DhtEvent::ValuePut(key)
913 ));
914
915 if let Some(ref metrics) = self.metrics {
916 metrics
917 .kademlia_query_duration
918 .with_label_values(&["value-put"])
919 .observe(started.elapsed().as_secs_f64());
920 }
921 },
922 query => {
923 log::error!(
924 target: LOG_TARGET,
925 "Missing/invalid pending query for `PUT_VALUE`: {query:?}"
926 );
927 debug_assert!(false);
928 }
929 }
930 }
931 Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
932 match self.pending_queries.remove(&query_id) {
933 Some(KadQuery::GetProviders(key, started)) => {
934 log::trace!(
935 target: LOG_TARGET,
936 "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
937 );
938
939 providers.iter().for_each(|p| {
944 self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
945 });
946
947 self.event_streams.send(Event::Dht(
948 DhtEvent::ProvidersFound(
949 key.clone().into(),
950 providers.into_iter().map(|p| p.peer.into()).collect()
951 )
952 ));
953
954 self.event_streams.send(Event::Dht(
957 DhtEvent::NoMoreProviders(key.into())
958 ));
959
960 if let Some(ref metrics) = self.metrics {
961 metrics
962 .kademlia_query_duration
963 .with_label_values(&["providers-get"])
964 .observe(started.elapsed().as_secs_f64());
965 }
966 },
967 query => {
968 log::error!(
969 target: LOG_TARGET,
970 "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
971 );
972 debug_assert!(false);
973 }
974 }
975 }
976 Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
977 match self.pending_queries.remove(&query_id) {
978 Some(KadQuery::AddProvider(key, started)) => {
979 debug_assert_eq!(key, provided_key.into());
980
981 log::trace!(
982 target: LOG_TARGET,
983 "`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
984 );
985
986 self.event_streams.send(Event::Dht(
987 DhtEvent::StartedProviding(key.into())
988 ));
989
990 if let Some(ref metrics) = self.metrics {
991 metrics
992 .kademlia_query_duration
993 .with_label_values(&["provider-add"])
994 .observe(started.elapsed().as_secs_f64());
995 }
996 }
997 Some(_) => {
998 log::error!(
999 target: LOG_TARGET,
1000 "Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
1001 );
1002 debug_assert!(false);
1003 }
1004 None => {
1005 log::trace!(
1006 target: LOG_TARGET,
1007 "`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
1008 );
1009 }
1010 }
1011 }
1012 Some(DiscoveryEvent::QueryFailed { query_id }) => {
1013 match self.pending_queries.remove(&query_id) {
1014 Some(KadQuery::FindNode(peer_id, started)) => {
1015 log::debug!(
1016 target: LOG_TARGET,
1017 "`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1018 );
1019
1020 self.event_streams.send(Event::Dht(
1021 DhtEvent::ClosestPeersNotFound(peer_id.into())
1022 ));
1023
1024 if let Some(ref metrics) = self.metrics {
1025 metrics
1026 .kademlia_query_duration
1027 .with_label_values(&["node-find-failed"])
1028 .observe(started.elapsed().as_secs_f64());
1029 }
1030 },
1031 Some(KadQuery::GetValue(key, started)) => {
1032 log::debug!(
1033 target: LOG_TARGET,
1034 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1035 );
1036
1037 self.event_streams.send(Event::Dht(
1038 DhtEvent::ValueNotFound(key)
1039 ));
1040
1041 if let Some(ref metrics) = self.metrics {
1042 metrics
1043 .kademlia_query_duration
1044 .with_label_values(&["value-get-failed"])
1045 .observe(started.elapsed().as_secs_f64());
1046 }
1047 },
1048 Some(KadQuery::PutValue(key, started)) => {
1049 log::debug!(
1050 target: LOG_TARGET,
1051 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1052 );
1053
1054 self.event_streams.send(Event::Dht(
1055 DhtEvent::ValuePutFailed(key)
1056 ));
1057
1058 if let Some(ref metrics) = self.metrics {
1059 metrics
1060 .kademlia_query_duration
1061 .with_label_values(&["value-put-failed"])
1062 .observe(started.elapsed().as_secs_f64());
1063 }
1064 },
1065 Some(KadQuery::GetProviders(key, started)) => {
1066 log::debug!(
1067 target: LOG_TARGET,
1068 "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1069 );
1070
1071 self.event_streams.send(Event::Dht(
1072 DhtEvent::ProvidersNotFound(key)
1073 ));
1074
1075 if let Some(ref metrics) = self.metrics {
1076 metrics
1077 .kademlia_query_duration
1078 .with_label_values(&["providers-get-failed"])
1079 .observe(started.elapsed().as_secs_f64());
1080 }
1081 },
1082 Some(KadQuery::AddProvider(key, started)) => {
1083 log::debug!(
1084 target: LOG_TARGET,
1085 "`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1086 );
1087
1088 self.event_streams.send(Event::Dht(
1089 DhtEvent::StartProvidingFailed(key)
1090 ));
1091
1092 if let Some(ref metrics) = self.metrics {
1093 metrics
1094 .kademlia_query_duration
1095 .with_label_values(&["provider-add-failed"])
1096 .observe(started.elapsed().as_secs_f64());
1097 }
1098 },
1099 None => {
1100 log::debug!(
1101 target: LOG_TARGET,
1102 "non-existent query (likely republishing a provider) failed ({query_id:?})",
1103 );
1104 }
1105 }
1106 }
1107 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1108 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1109 }
1110 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1111 match self.litep2p.public_addresses().add_address(address.clone().into()) {
1112 Ok(inserted) => if inserted {
1113 log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1114 },
1115 Err(err) => {
1116 log::warn!(
1117 target: LOG_TARGET,
1118 "🔍 Failed to add discovered external address {address:?}: {err:?}",
1119 );
1120 },
1121 }
1122 }
1123 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1124 let local_peer_id = self.litep2p.local_peer_id();
1125
1126 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1128 address.with(Protocol::P2p(*local_peer_id.as_ref()))
1129 } else {
1130 address
1131 };
1132
1133 if self.litep2p.public_addresses().remove_address(&address) {
1134 log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1135 } else {
1136 log::warn!(
1137 target: LOG_TARGET,
1138 "🔍 Failed to remove expired external address {address:?}"
1139 );
1140 }
1141 }
1142 Some(DiscoveryEvent::Ping { peer, rtt }) => {
1143 log::trace!(
1144 target: LOG_TARGET,
1145 "ping time with {peer:?}: {rtt:?}",
1146 );
1147 }
1148 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1149 self.event_streams.send(Event::Dht(
1150 DhtEvent::PutRecordRequest(
1151 key.into(),
1152 value,
1153 publisher.map(Into::into),
1154 expires,
1155 )
1156 ));
1157 },
1158
1159 Some(DiscoveryEvent::RandomKademliaStarted) => {
1160 if let Some(metrics) = self.metrics.as_ref() {
1161 metrics.kademlia_random_queries_total.inc();
1162 }
1163 }
1164 },
1165 event = self.litep2p.next_event() => match event {
1166 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1167 let Some(metrics) = &self.metrics else {
1168 continue;
1169 };
1170
1171 let direction = match endpoint {
1172 Endpoint::Dialer { .. } => "out",
1173 Endpoint::Listener { .. } => {
1174 metrics.incoming_connections_total.inc();
1179
1180 "in"
1181 },
1182 };
1183 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1184
1185 match self.peers.entry(peer) {
1186 Entry::Vacant(entry) => {
1187 entry.insert(ConnectionContext {
1188 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1189 num_connections: 1usize,
1190 });
1191 metrics.distinct_peers_connections_opened_total.inc();
1192 }
1193 Entry::Occupied(entry) => {
1194 let entry = entry.into_mut();
1195 entry.num_connections += 1;
1196 entry.endpoints.insert(endpoint.connection_id(), endpoint);
1197 }
1198 }
1199 }
1200 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1201 let Some(metrics) = &self.metrics else {
1202 continue;
1203 };
1204
1205 let Some(context) = self.peers.get_mut(&peer) else {
1206 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1207 continue
1208 };
1209
1210 let direction = match context.endpoints.remove(&connection_id) {
1211 None => {
1212 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1213 continue
1214 }
1215 Some(endpoint) => {
1216 context.num_connections -= 1;
1217
1218 match endpoint {
1219 Endpoint::Dialer { .. } => "out",
1220 Endpoint::Listener { .. } => "in",
1221 }
1222 }
1223 };
1224
1225 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1226
1227 if context.num_connections == 0 {
1228 self.peers.remove(&peer);
1229 metrics.distinct_peers_connections_closed_total.inc();
1230 }
1231 }
1232 Some(Litep2pEvent::DialFailure { address, error }) => {
1233 log::debug!(
1234 target: LOG_TARGET,
1235 "failed to dial peer at {address:?}: {error:?}",
1236 );
1237
1238 if let Some(metrics) = &self.metrics {
1239 let reason = match error {
1240 DialError::Timeout => "timeout",
1241 DialError::AddressError(_) => "invalid-address",
1242 DialError::DnsError(_) => "cannot-resolve-dns",
1243 DialError::NegotiationError(error) => match error {
1244 NegotiationError::Timeout => "timeout",
1245 NegotiationError::PeerIdMissing => "missing-peer-id",
1246 NegotiationError::StateMismatch => "state-mismatch",
1247 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1248 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1249 NegotiationError::SnowError(_) => "noise-error",
1250 NegotiationError::ParseError(_) => "parse-error",
1251 NegotiationError::IoError(_) => "io-error",
1252 NegotiationError::WebSocket(_) => "webscoket-error",
1253 NegotiationError::BadSignature => "bad-signature",
1254 }
1255 };
1256
1257 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1258 }
1259 }
1260 Some(Litep2pEvent::ListDialFailures { errors }) => {
1261 log::debug!(
1262 target: LOG_TARGET,
1263 "failed to dial peer on multiple addresses {errors:?}",
1264 );
1265
1266 if let Some(metrics) = &self.metrics {
1267 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1268 }
1269 }
1270 None => {
1271 log::error!(
1272 target: LOG_TARGET,
1273 "Litep2p backend terminated"
1274 );
1275 return
1276 }
1277 },
1278 }
1279 }
1280 }
1281}