1use crate::{
22 config::{
23 FullNetworkConfiguration, IncomingRequest, NodeKeyConfig, NotificationHandshake, Params,
24 SetConfig, TransportConfig,
25 },
26 error::Error,
27 event::{DhtEvent, Event},
28 litep2p::{
29 discovery::{Discovery, DiscoveryEvent},
30 peerstore::Peerstore,
31 service::{Litep2pNetworkService, NetworkServiceCommand},
32 shim::{
33 bitswap::BitswapServer,
34 notification::{
35 config::{NotificationProtocolConfig, ProtocolControlHandle},
36 peerset::PeersetCommand,
37 },
38 request_response::{RequestResponseConfig, RequestResponseProtocol},
39 },
40 },
41 peer_store::PeerStoreProvider,
42 service::{
43 metrics::{register_without_sources, MetricSources, Metrics, NotificationMetrics},
44 out_events,
45 traits::{BandwidthSink, NetworkBackend, NetworkService},
46 },
47 NetworkStatus, NotificationService, ProtocolName,
48};
49
50use codec::Encode;
51use futures::StreamExt;
52use litep2p::{
53 config::ConfigBuilder,
54 crypto::ed25519::Keypair,
55 error::{DialError, NegotiationError},
56 executor::Executor,
57 protocol::{
58 libp2p::{
59 bitswap::Config as BitswapConfig,
60 kademlia::{QueryId, Record},
61 },
62 request_response::ConfigBuilder as RequestResponseConfigBuilder,
63 },
64 transport::{
65 tcp::config::Config as TcpTransportConfig,
66 websocket::config::Config as WebSocketTransportConfig, ConnectionLimitsConfig, Endpoint,
67 },
68 types::{
69 multiaddr::{Multiaddr, Protocol},
70 ConnectionId,
71 },
72 Litep2p, Litep2pEvent, ProtocolName as Litep2pProtocolName,
73};
74use prometheus_endpoint::Registry;
75use sc_network_types::kad::{Key as RecordKey, PeerRecord, Record as P2PRecord};
76
77use sc_client_api::BlockBackend;
78use sc_network_common::{role::Roles, ExHashT};
79use sc_network_types::PeerId;
80use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
81use sp_runtime::traits::Block as BlockT;
82
83use std::{
84 cmp,
85 collections::{hash_map::Entry, HashMap, HashSet},
86 fs,
87 future::Future,
88 iter,
89 pin::Pin,
90 sync::{
91 atomic::{AtomicUsize, Ordering},
92 Arc,
93 },
94 time::{Duration, Instant},
95};
96
97mod discovery;
98mod peerstore;
99mod service;
100mod shim;
101
102struct Litep2pBandwidthSink {
104 sink: litep2p::BandwidthSink,
105}
106
107impl BandwidthSink for Litep2pBandwidthSink {
108 fn total_inbound(&self) -> u64 {
109 self.sink.inbound() as u64
110 }
111
112 fn total_outbound(&self) -> u64 {
113 self.sink.outbound() as u64
114 }
115}
116
117struct Litep2pExecutor {
119 executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
121}
122
123impl Executor for Litep2pExecutor {
124 fn run(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
125 (self.executor)(future)
126 }
127
128 fn run_with_name(&self, _: &'static str, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
129 (self.executor)(future)
130 }
131}
132
/// Logging target passed as `target:` to every `log` macro invocation in this file.
const LOG_TARGET: &str = "sub-libp2p";
135
/// Per-peer connection bookkeeping kept by the backend's event loop.
struct ConnectionContext {
    /// Endpoint of every currently open connection, keyed by connection ID.
    ///
    /// An entry is inserted on `ConnectionEstablished` and removed on `ConnectionClosed`.
    endpoints: HashMap<ConnectionId, Endpoint>,

    /// Number of open connections to the peer; the peer's context is removed once this
    /// reaches zero.
    num_connections: usize,
}
144
/// Kademlia query the backend is waiting on.
///
/// Each variant stores the query's subject (target peer or record key) together with the
/// [`Instant`] at which the query was started; the instant is used to report the query
/// duration to the metrics once the query succeeds or fails.
#[derive(Debug)]
enum KadQuery {
    /// `FIND_NODE` query for the given peer.
    FindNode(PeerId, Instant),
    /// `GET_VALUE` query for the given record key.
    GetValue(RecordKey, Instant),
    /// `PUT_VALUE` query for the given record key.
    PutValue(RecordKey, Instant),
    /// `GET_PROVIDERS` query for the given key.
    GetProviders(RecordKey, Instant),
    /// `ADD_PROVIDER` query for the given key.
    AddProvider(RecordKey, Instant),
}
159
/// Networking backend built on top of `litep2p`.
pub struct Litep2pNetworkBackend {
    /// The `litep2p` instance; its events are polled by the event loop in `run`.
    litep2p: Litep2p,

    /// Service handle returned from `network_service()`.
    network_service: Arc<dyn NetworkService>,

    /// Receiving end of the command channel whose sender is given to
    /// `Litep2pNetworkService` in `new`.
    cmd_rx: TracingUnboundedReceiver<NetworkServiceCommand>,

    /// Control handles of the notification protocols, keyed by protocol name.
    ///
    /// Used to forward `PeersetCommand`s and to read the connected-peer counter.
    peerset_handles: HashMap<ProtocolName, ProtocolControlHandle>,

    /// Kademlia queries that have been started and not yet reported as finished.
    pending_queries: HashMap<QueryId, KadQuery>,

    /// Discovery object; source of `DiscoveryEvent`s and entry point for DHT operations.
    discovery: Discovery,

    /// Connected-peer count shared with the metric sources; refreshed on every loop
    /// iteration from the block announce protocol's handle.
    num_connected: Arc<AtomicUsize>,

    /// Open connections per peer. Only maintained when `metrics` is `Some`.
    peers: HashMap<litep2p::PeerId, ConnectionContext>,

    /// Handle to the peer store; peers with usable addresses are registered through it.
    peerstore_handle: Arc<dyn PeerStoreProvider>,

    /// Name of the block announce protocol.
    block_announce_protocol: ProtocolName,

    /// Output channels to which `Event`s (DHT events) are sent.
    event_streams: out_events::OutChannels,

    /// Prometheus metrics; `None` when no registry was supplied.
    metrics: Option<Metrics>,
}
198
199impl Litep2pNetworkBackend {
200 fn parse_addresses(
203 addresses: impl Iterator<Item = Multiaddr>,
204 ) -> HashMap<PeerId, Vec<Multiaddr>> {
205 addresses
206 .into_iter()
207 .filter_map(|address| match address.iter().next() {
208 Some(
209 Protocol::Dns(_) |
210 Protocol::Dns4(_) |
211 Protocol::Dns6(_) |
212 Protocol::Ip6(_) |
213 Protocol::Ip4(_),
214 ) => match address.iter().find(|protocol| std::matches!(protocol, Protocol::P2p(_)))
215 {
216 Some(Protocol::P2p(multihash)) => PeerId::from_multihash(multihash.into())
217 .map_or(None, |peer| Some((peer, Some(address)))),
218 _ => None,
219 },
220 Some(Protocol::P2p(multihash)) =>
221 PeerId::from_multihash(multihash.into()).map_or(None, |peer| Some((peer, None))),
222 _ => None,
223 })
224 .fold(HashMap::new(), |mut acc, (peer, maybe_address)| {
225 let entry = acc.entry(peer).or_default();
226 maybe_address.map(|address| entry.push(address));
227
228 acc
229 })
230 }
231
232 fn add_addresses(&mut self, peers: impl Iterator<Item = Multiaddr>) -> HashSet<PeerId> {
234 Self::parse_addresses(peers.into_iter())
235 .into_iter()
236 .filter_map(|(peer, addresses)| {
237 if addresses.is_empty() {
239 return Some(peer)
240 }
241
242 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) == 0 {
243 log::warn!(
244 target: LOG_TARGET,
245 "couldn't add any addresses for {peer:?} and it won't be added as reserved peer",
246 );
247 return None
248 }
249
250 self.peerstore_handle.add_known_peer(peer);
251 Some(peer)
252 })
253 .collect()
254 }
255}
256
257impl Litep2pNetworkBackend {
258 fn get_keypair(node_key: &NodeKeyConfig) -> Result<(Keypair, litep2p::PeerId), Error> {
260 let secret: litep2p::crypto::ed25519::SecretKey =
261 node_key.clone().into_keypair()?.secret().into();
262
263 let local_identity = Keypair::from(secret);
264 let local_public = local_identity.public();
265 let local_peer_id = local_public.to_peer_id();
266
267 Ok((local_identity, local_peer_id))
268 }
269
270 fn configure_transport<B: BlockT + 'static, H: ExHashT>(
272 config: &FullNetworkConfiguration<B, H, Self>,
273 ) -> ConfigBuilder {
274 let _ = match config.network_config.transport {
275 TransportConfig::MemoryOnly => panic!("memory transport not supported"),
276 TransportConfig::Normal { .. } => false,
277 };
278 let config_builder = ConfigBuilder::new();
279
280 let (tcp, websocket): (Vec<Option<_>>, Vec<Option<_>>) = config
281 .network_config
282 .listen_addresses
283 .iter()
284 .filter_map(|address| {
285 use sc_network_types::multiaddr::Protocol;
286
287 let mut iter = address.iter();
288
289 match iter.next() {
290 Some(Protocol::Ip4(_) | Protocol::Ip6(_)) => {},
291 protocol => {
292 log::error!(
293 target: LOG_TARGET,
294 "unknown protocol {protocol:?}, ignoring {address:?}",
295 );
296
297 return None
298 },
299 }
300
301 match iter.next() {
302 Some(Protocol::Tcp(_)) => match iter.next() {
303 Some(Protocol::Ws(_) | Protocol::Wss(_)) =>
304 Some((None, Some(address.clone()))),
305 Some(Protocol::P2p(_)) | None => Some((Some(address.clone()), None)),
306 protocol => {
307 log::error!(
308 target: LOG_TARGET,
309 "unknown protocol {protocol:?}, ignoring {address:?}",
310 );
311 None
312 },
313 },
314 protocol => {
315 log::error!(
316 target: LOG_TARGET,
317 "unknown protocol {protocol:?}, ignoring {address:?}",
318 );
319 None
320 },
321 }
322 })
323 .unzip();
324
325 config_builder
326 .with_websocket(WebSocketTransportConfig {
327 listen_addresses: websocket.into_iter().flatten().map(Into::into).collect(),
328 yamux_config: litep2p::yamux::Config::default(),
329 nodelay: true,
330 ..Default::default()
331 })
332 .with_tcp(TcpTransportConfig {
333 listen_addresses: tcp.into_iter().flatten().map(Into::into).collect(),
334 yamux_config: litep2p::yamux::Config::default(),
335 nodelay: true,
336 ..Default::default()
337 })
338 }
339}
340
341#[async_trait::async_trait]
342impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend {
343 type NotificationProtocolConfig = NotificationProtocolConfig;
344 type RequestResponseProtocolConfig = RequestResponseConfig;
345 type NetworkService<Block, Hash> = Arc<Litep2pNetworkService>;
346 type PeerStore = Peerstore;
347 type BitswapConfig = BitswapConfig;
348
349 fn new(mut params: Params<B, H, Self>) -> Result<Self, Error>
350 where
351 Self: Sized,
352 {
353 let (keypair, local_peer_id) =
354 Self::get_keypair(¶ms.network_config.network_config.node_key)?;
355 let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000);
356
357 params.network_config.network_config.boot_nodes = params
358 .network_config
359 .network_config
360 .boot_nodes
361 .into_iter()
362 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
363 .collect();
364 params.network_config.network_config.default_peers_set.reserved_nodes = params
365 .network_config
366 .network_config
367 .default_peers_set
368 .reserved_nodes
369 .into_iter()
370 .filter(|reserved_node| {
371 if reserved_node.peer_id == local_peer_id.into() {
372 log::warn!(
373 target: LOG_TARGET,
374 "Local peer ID used in reserved node, ignoring: {reserved_node}",
375 );
376 false
377 } else {
378 true
379 }
380 })
381 .collect();
382
383 if let Some(path) = ¶ms.network_config.network_config.net_config_path {
384 fs::create_dir_all(path)?;
385 }
386
387 log::info!(target: LOG_TARGET, "Local node identity is: {local_peer_id}");
388 log::info!(target: LOG_TARGET, "Running litep2p network backend");
389
390 params.network_config.sanity_check_addresses()?;
391 params.network_config.sanity_check_bootnodes()?;
392
393 let mut config_builder =
394 Self::configure_transport(¶ms.network_config).with_keypair(keypair.clone());
395 let known_addresses = params.network_config.known_addresses();
396 let peer_store_handle = params.network_config.peer_store_handle();
397 let executor = Arc::new(Litep2pExecutor { executor: params.executor });
398
399 let FullNetworkConfiguration {
400 notification_protocols,
401 request_response_protocols,
402 network_config,
403 ..
404 } = params.network_config;
405
406 let block_announce_protocol = params.block_announce_config.protocol_name().clone();
412 let mut notif_protocols = HashMap::from_iter([(
413 params.block_announce_config.protocol_name().clone(),
414 params.block_announce_config.handle,
415 )]);
416
417 config_builder = notification_protocols
419 .into_iter()
420 .fold(config_builder, |config_builder, mut config| {
421 config.config.set_handshake(Roles::from(¶ms.role).encode());
422 notif_protocols.insert(config.protocol_name, config.handle);
423
424 config_builder.with_notification_protocol(config.config)
425 })
426 .with_notification_protocol(params.block_announce_config.config);
427
428 let metrics = match ¶ms.metrics_registry {
430 Some(registry) => Some(register_without_sources(registry)?),
431 None => None,
432 };
433
434 let (mut request_response_receivers, request_response_senders): (
440 HashMap<_, _>,
441 HashMap<_, _>,
442 ) = request_response_protocols
443 .iter()
444 .map(|config| {
445 let (tx, rx) = tracing_unbounded("outbound-requests", 10_000);
446 ((config.protocol_name.clone(), rx), (config.protocol_name.clone(), tx))
447 })
448 .unzip();
449
450 config_builder = request_response_protocols.into_iter().fold(
451 config_builder,
452 |config_builder, config| {
453 let (protocol_config, handle) = RequestResponseConfigBuilder::new(
454 Litep2pProtocolName::from(config.protocol_name.clone()),
455 )
456 .with_max_size(cmp::max(config.max_request_size, config.max_response_size) as usize)
457 .with_fallback_names(config.fallback_names.into_iter().map(From::from).collect())
458 .with_timeout(config.request_timeout)
459 .build();
460
461 let protocol = RequestResponseProtocol::new(
462 config.protocol_name.clone(),
463 handle,
464 Arc::clone(&peer_store_handle),
465 config.inbound_queue,
466 request_response_receivers
467 .remove(&config.protocol_name)
468 .expect("receiver exists as it was just added and there are no duplicate protocols; qed"),
469 request_response_senders.clone(),
470 metrics.clone(),
471 );
472
473 executor.run(Box::pin(async move {
474 protocol.run().await;
475 }));
476
477 config_builder.with_request_response_protocol(protocol_config)
478 },
479 );
480
481 let known_addresses: HashMap<litep2p::PeerId, Vec<Multiaddr>> =
483 known_addresses.into_iter().fold(HashMap::new(), |mut acc, (peer, address)| {
484 use sc_network_types::multiaddr::Protocol;
485
486 let address = match address.iter().last() {
487 Some(Protocol::Ws(_) | Protocol::Wss(_) | Protocol::Tcp(_)) =>
488 address.with(Protocol::P2p(peer.into())),
489 Some(Protocol::P2p(_)) => address,
490 _ => return acc,
491 };
492
493 acc.entry(peer.into()).or_default().push(address.into());
494 peer_store_handle.add_known_peer(peer);
495
496 acc
497 });
498
499 let listen_addresses = Arc::new(Default::default());
501 let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
502 Discovery::new(
503 local_peer_id,
504 &network_config,
505 params.genesis_hash,
506 params.fork_id.as_deref(),
507 ¶ms.protocol_id,
508 known_addresses.clone(),
509 Arc::clone(&listen_addresses),
510 Arc::clone(&peer_store_handle),
511 );
512
513 config_builder = config_builder
514 .with_known_addresses(known_addresses.clone().into_iter())
515 .with_libp2p_ping(ping_config)
516 .with_libp2p_identify(identify_config)
517 .with_libp2p_kademlia(kademlia_config)
518 .with_connection_limits(ConnectionLimitsConfig::default().max_incoming_connections(
519 Some(crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING as usize),
520 ))
521 .with_keep_alive_timeout(network_config.idle_connection_timeout)
522 .with_system_resolver()
525 .with_executor(executor);
526
527 if let Some(config) = maybe_mdns_config {
528 config_builder = config_builder.with_mdns(config);
529 }
530
531 if let Some(config) = params.bitswap_config {
532 config_builder = config_builder.with_libp2p_bitswap(config);
533 }
534
535 let litep2p =
536 Litep2p::new(config_builder.build()).map_err(|error| Error::Litep2p(error))?;
537
538 litep2p.listen_addresses().for_each(|address| {
539 log::debug!(target: LOG_TARGET, "listening on: {address}");
540
541 listen_addresses.write().insert(address.clone());
542 });
543
544 let public_addresses = litep2p.public_addresses();
545 for address in network_config.public_addresses.iter() {
546 if let Err(err) = public_addresses.add_address(address.clone().into()) {
547 log::warn!(
548 target: LOG_TARGET,
549 "failed to add public address {address:?}: {err:?}",
550 );
551 }
552 }
553
554 let network_service = Arc::new(Litep2pNetworkService::new(
555 local_peer_id,
556 keypair.clone(),
557 cmd_tx,
558 Arc::clone(&peer_store_handle),
559 notif_protocols.clone(),
560 block_announce_protocol.clone(),
561 request_response_senders,
562 Arc::clone(&listen_addresses),
563 public_addresses,
564 ));
565
566 let num_connected = Arc::new(Default::default());
568 let bandwidth: Arc<dyn BandwidthSink> =
569 Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });
570
571 if let Some(registry) = ¶ms.metrics_registry {
572 MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
573 }
574
575 Ok(Self {
576 network_service,
577 cmd_rx,
578 metrics,
579 peerset_handles: notif_protocols,
580 num_connected,
581 discovery,
582 pending_queries: HashMap::new(),
583 peerstore_handle: peer_store_handle,
584 block_announce_protocol,
585 event_streams: out_events::OutChannels::new(None)?,
586 peers: HashMap::new(),
587 litep2p,
588 })
589 }
590
591 fn network_service(&self) -> Arc<dyn NetworkService> {
592 Arc::clone(&self.network_service)
593 }
594
595 fn peer_store(
596 bootnodes: Vec<sc_network_types::PeerId>,
597 metrics_registry: Option<Registry>,
598 ) -> Self::PeerStore {
599 Peerstore::new(bootnodes, metrics_registry)
600 }
601
602 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
603 NotificationMetrics::new(registry)
604 }
605
606 fn bitswap_server(
608 client: Arc<dyn BlockBackend<B> + Send + Sync>,
609 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
610 BitswapServer::new(client)
611 }
612
613 fn notification_config(
615 protocol_name: ProtocolName,
616 fallback_names: Vec<ProtocolName>,
617 max_notification_size: u64,
618 handshake: Option<NotificationHandshake>,
619 set_config: SetConfig,
620 metrics: NotificationMetrics,
621 peerstore_handle: Arc<dyn PeerStoreProvider>,
622 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
623 Self::NotificationProtocolConfig::new(
624 protocol_name,
625 fallback_names,
626 max_notification_size as usize,
627 handshake,
628 set_config,
629 metrics,
630 peerstore_handle,
631 )
632 }
633
634 fn request_response_config(
636 protocol_name: ProtocolName,
637 fallback_names: Vec<ProtocolName>,
638 max_request_size: u64,
639 max_response_size: u64,
640 request_timeout: Duration,
641 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
642 ) -> Self::RequestResponseProtocolConfig {
643 Self::RequestResponseProtocolConfig::new(
644 protocol_name,
645 fallback_names,
646 max_request_size,
647 max_response_size,
648 request_timeout,
649 inbound_queue,
650 )
651 }
652
653 async fn run(mut self) {
655 log::debug!(target: LOG_TARGET, "starting litep2p network backend");
656
657 loop {
658 let num_connected_peers = self
659 .peerset_handles
660 .get(&self.block_announce_protocol)
661 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
662 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
663
664 tokio::select! {
665 command = self.cmd_rx.next() => match command {
666 None => return,
667 Some(command) => match command {
668 NetworkServiceCommand::FindClosestPeers { target } => {
669 let query_id = self.discovery.find_node(target.into()).await;
670 self.pending_queries.insert(query_id, KadQuery::FindNode(target, Instant::now()));
671 }
672 NetworkServiceCommand::GetValue{ key } => {
673 let query_id = self.discovery.get_value(key.clone()).await;
674 self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now()));
675 }
676 NetworkServiceCommand::PutValue { key, value } => {
677 let query_id = self.discovery.put_value(key.clone(), value).await;
678 self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now()));
679 }
680 NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
681 let kademlia_key = record.key.clone();
682 let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await;
683 self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now()));
684 }
685 NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => {
686 self.discovery.store_record(key, value, publisher.map(Into::into), expires).await;
687 }
688 NetworkServiceCommand::StartProviding { key } => {
689 let query_id = self.discovery.start_providing(key.clone()).await;
690 self.pending_queries.insert(query_id, KadQuery::AddProvider(key, Instant::now()));
691 }
692 NetworkServiceCommand::StopProviding { key } => {
693 self.discovery.stop_providing(key).await;
694 }
695 NetworkServiceCommand::GetProviders { key } => {
696 let query_id = self.discovery.get_providers(key.clone()).await;
697 self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now()));
698 }
699 NetworkServiceCommand::EventStream { tx } => {
700 self.event_streams.push(tx);
701 }
702 NetworkServiceCommand::Status { tx } => {
703 let _ = tx.send(NetworkStatus {
704 num_connected_peers: self
705 .peerset_handles
706 .get(&self.block_announce_protocol)
707 .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
708 total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
709 total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
710 });
711 }
712 NetworkServiceCommand::AddPeersToReservedSet {
713 protocol,
714 peers,
715 } => {
716 let peers = self.add_addresses(peers.into_iter().map(Into::into));
717
718 match self.peerset_handles.get(&protocol) {
719 Some(handle) => {
720 let _ = handle.tx.unbounded_send(PeersetCommand::AddReservedPeers { peers });
721 }
722 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
723 };
724 }
725 NetworkServiceCommand::AddKnownAddress { peer, address } => {
726 let mut address: Multiaddr = address.into();
727
728 if !address.iter().any(|protocol| std::matches!(protocol, Protocol::P2p(_))) {
729 address.push(Protocol::P2p(litep2p::PeerId::from(peer).into()));
730 }
731
732 if self.litep2p.add_known_address(peer.into(), iter::once(address.clone())) > 0 {
733 self.peerstore_handle.add_known_peer(peer);
737 } else {
738 log::debug!(
739 target: LOG_TARGET,
740 "couldn't add known address ({address}) for {peer:?}, unsupported transport"
741 );
742 }
743 },
744 NetworkServiceCommand::SetReservedPeers { protocol, peers } => {
745 let peers = self.add_addresses(peers.into_iter().map(Into::into));
746
747 match self.peerset_handles.get(&protocol) {
748 Some(handle) => {
749 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedPeers { peers });
750 }
751 None => log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist"),
752 }
753
754 },
755 NetworkServiceCommand::DisconnectPeer {
756 protocol,
757 peer,
758 } => {
759 let Some(handle) = self.peerset_handles.get(&protocol) else {
760 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
761 continue
762 };
763
764 let _ = handle.tx.unbounded_send(PeersetCommand::DisconnectPeer { peer });
765 }
766 NetworkServiceCommand::SetReservedOnly {
767 protocol,
768 reserved_only,
769 } => {
770 let Some(handle) = self.peerset_handles.get(&protocol) else {
771 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
772 continue
773 };
774
775 let _ = handle.tx.unbounded_send(PeersetCommand::SetReservedOnly { reserved_only });
776 }
777 NetworkServiceCommand::RemoveReservedPeers {
778 protocol,
779 peers,
780 } => {
781 let Some(handle) = self.peerset_handles.get(&protocol) else {
782 log::warn!(target: LOG_TARGET, "protocol {protocol} doens't exist");
783 continue
784 };
785
786 let _ = handle.tx.unbounded_send(PeersetCommand::RemoveReservedPeers { peers });
787 }
788 }
789 },
790 event = self.discovery.next() => match event {
791 None => return,
792 Some(DiscoveryEvent::Discovered { addresses }) => {
793 for (peer, addresses) in Litep2pNetworkBackend::parse_addresses(addresses.into_iter()) {
795 if self.litep2p.add_known_address(peer.into(), addresses.clone().into_iter()) > 0 {
796 self.peerstore_handle.add_known_peer(peer);
797 }
798 }
799 }
800 Some(DiscoveryEvent::RoutingTableUpdate { peers }) => {
801 for peer in peers {
802 self.peerstore_handle.add_known_peer(peer.into());
803 }
804 }
805 Some(DiscoveryEvent::FindNodeSuccess { query_id, target, peers }) => {
806 match self.pending_queries.remove(&query_id) {
807 Some(KadQuery::FindNode(_, started)) => {
808 log::trace!(
809 target: LOG_TARGET,
810 "`FIND_NODE` for {target:?} ({query_id:?}) succeeded",
811 );
812
813 self.event_streams.send(
814 Event::Dht(
815 DhtEvent::ClosestPeersFound(
816 target.into(),
817 peers
818 .into_iter()
819 .map(|(peer, addrs)| (
820 peer.into(),
821 addrs.into_iter().map(Into::into).collect(),
822 ))
823 .collect(),
824 )
825 )
826 );
827
828 if let Some(ref metrics) = self.metrics {
829 metrics
830 .kademlia_query_duration
831 .with_label_values(&["node-find"])
832 .observe(started.elapsed().as_secs_f64());
833 }
834 },
835 query => {
836 log::error!(
837 target: LOG_TARGET,
838 "Missing/invalid pending query for `FIND_NODE`: {query:?}"
839 );
840 debug_assert!(false);
841 }
842 }
843 },
844 Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
845 if !self.pending_queries.contains_key(&query_id) {
846 log::error!(
847 target: LOG_TARGET,
848 "Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
849 );
850
851 continue
852 }
853
854 let peer_id: sc_network_types::PeerId = record.peer.into();
855 let record = PeerRecord {
856 record: P2PRecord {
857 key: record.record.key.to_vec().into(),
858 value: record.record.value,
859 publisher: record.record.publisher.map(|peer_id| {
860 let peer_id: sc_network_types::PeerId = peer_id.into();
861 peer_id.into()
862 }),
863 expires: record.record.expires,
864 },
865 peer: Some(peer_id.into()),
866 };
867
868 self.event_streams.send(
869 Event::Dht(
870 DhtEvent::ValueFound(
871 record.into()
872 )
873 )
874 );
875 }
876 Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
877 match self.pending_queries.remove(&query_id) {
878 Some(KadQuery::GetValue(key, started)) => {
879 log::trace!(
880 target: LOG_TARGET,
881 "`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
882 );
883
884 if let Some(ref metrics) = self.metrics {
885 metrics
886 .kademlia_query_duration
887 .with_label_values(&["value-get"])
888 .observe(started.elapsed().as_secs_f64());
889 }
890 },
891 query => {
892 log::error!(
893 target: LOG_TARGET,
894 "Missing/invalid pending query for `GET_VALUE`: {query:?}"
895 );
896 debug_assert!(false);
897 },
898 }
899 }
900 Some(DiscoveryEvent::PutRecordSuccess { query_id }) => {
901 match self.pending_queries.remove(&query_id) {
902 Some(KadQuery::PutValue(key, started)) => {
903 log::trace!(
904 target: LOG_TARGET,
905 "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded",
906 );
907
908 self.event_streams.send(Event::Dht(
909 DhtEvent::ValuePut(key)
910 ));
911
912 if let Some(ref metrics) = self.metrics {
913 metrics
914 .kademlia_query_duration
915 .with_label_values(&["value-put"])
916 .observe(started.elapsed().as_secs_f64());
917 }
918 },
919 query => {
920 log::error!(
921 target: LOG_TARGET,
922 "Missing/invalid pending query for `PUT_VALUE`: {query:?}"
923 );
924 debug_assert!(false);
925 }
926 }
927 }
928 Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => {
929 match self.pending_queries.remove(&query_id) {
930 Some(KadQuery::GetProviders(key, started)) => {
931 log::trace!(
932 target: LOG_TARGET,
933 "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded",
934 );
935
936 providers.iter().for_each(|p| {
941 self.litep2p.add_known_address(p.peer, p.addresses.clone().into_iter());
942 });
943
944 self.event_streams.send(Event::Dht(
945 DhtEvent::ProvidersFound(
946 key.clone().into(),
947 providers.into_iter().map(|p| p.peer.into()).collect()
948 )
949 ));
950
951 self.event_streams.send(Event::Dht(
954 DhtEvent::NoMoreProviders(key.into())
955 ));
956
957 if let Some(ref metrics) = self.metrics {
958 metrics
959 .kademlia_query_duration
960 .with_label_values(&["providers-get"])
961 .observe(started.elapsed().as_secs_f64());
962 }
963 },
964 query => {
965 log::error!(
966 target: LOG_TARGET,
967 "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}"
968 );
969 debug_assert!(false);
970 }
971 }
972 }
973 Some(DiscoveryEvent::AddProviderSuccess { query_id, provided_key }) => {
974 match self.pending_queries.remove(&query_id) {
975 Some(KadQuery::AddProvider(key, started)) => {
976 debug_assert_eq!(key, provided_key.into());
977
978 log::trace!(
979 target: LOG_TARGET,
980 "`ADD_PROVIDER` for {key:?} ({query_id:?}) succeeded",
981 );
982
983 self.event_streams.send(Event::Dht(
984 DhtEvent::StartedProviding(key.into())
985 ));
986
987 if let Some(ref metrics) = self.metrics {
988 metrics
989 .kademlia_query_duration
990 .with_label_values(&["provider-add"])
991 .observe(started.elapsed().as_secs_f64());
992 }
993 }
994 Some(_) => {
995 log::error!(
996 target: LOG_TARGET,
997 "Invalid pending query for `ADD_PROVIDER`: {query_id:?}"
998 );
999 debug_assert!(false);
1000 }
1001 None => {
1002 log::trace!(
1003 target: LOG_TARGET,
1004 "`ADD_PROVIDER` for key {provided_key:?} ({query_id:?}) succeeded (republishing)",
1005 );
1006 }
1007 }
1008 }
1009 Some(DiscoveryEvent::QueryFailed { query_id }) => {
1010 match self.pending_queries.remove(&query_id) {
1011 Some(KadQuery::FindNode(peer_id, started)) => {
1012 log::debug!(
1013 target: LOG_TARGET,
1014 "`FIND_NODE` ({query_id:?}) failed for target {peer_id:?}",
1015 );
1016
1017 self.event_streams.send(Event::Dht(
1018 DhtEvent::ClosestPeersNotFound(peer_id.into())
1019 ));
1020
1021 if let Some(ref metrics) = self.metrics {
1022 metrics
1023 .kademlia_query_duration
1024 .with_label_values(&["node-find-failed"])
1025 .observe(started.elapsed().as_secs_f64());
1026 }
1027 },
1028 Some(KadQuery::GetValue(key, started)) => {
1029 log::debug!(
1030 target: LOG_TARGET,
1031 "`GET_VALUE` ({query_id:?}) failed for key {key:?}",
1032 );
1033
1034 self.event_streams.send(Event::Dht(
1035 DhtEvent::ValueNotFound(key)
1036 ));
1037
1038 if let Some(ref metrics) = self.metrics {
1039 metrics
1040 .kademlia_query_duration
1041 .with_label_values(&["value-get-failed"])
1042 .observe(started.elapsed().as_secs_f64());
1043 }
1044 },
1045 Some(KadQuery::PutValue(key, started)) => {
1046 log::debug!(
1047 target: LOG_TARGET,
1048 "`PUT_VALUE` ({query_id:?}) failed for key {key:?}",
1049 );
1050
1051 self.event_streams.send(Event::Dht(
1052 DhtEvent::ValuePutFailed(key)
1053 ));
1054
1055 if let Some(ref metrics) = self.metrics {
1056 metrics
1057 .kademlia_query_duration
1058 .with_label_values(&["value-put-failed"])
1059 .observe(started.elapsed().as_secs_f64());
1060 }
1061 },
1062 Some(KadQuery::GetProviders(key, started)) => {
1063 log::debug!(
1064 target: LOG_TARGET,
1065 "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}"
1066 );
1067
1068 self.event_streams.send(Event::Dht(
1069 DhtEvent::ProvidersNotFound(key)
1070 ));
1071
1072 if let Some(ref metrics) = self.metrics {
1073 metrics
1074 .kademlia_query_duration
1075 .with_label_values(&["providers-get-failed"])
1076 .observe(started.elapsed().as_secs_f64());
1077 }
1078 },
1079 Some(KadQuery::AddProvider(key, started)) => {
1080 log::debug!(
1081 target: LOG_TARGET,
1082 "`ADD_PROVIDER` ({query_id:?}) failed with key {key:?}",
1083 );
1084
1085 self.event_streams.send(Event::Dht(
1086 DhtEvent::StartProvidingFailed(key)
1087 ));
1088
1089 if let Some(ref metrics) = self.metrics {
1090 metrics
1091 .kademlia_query_duration
1092 .with_label_values(&["provider-add-failed"])
1093 .observe(started.elapsed().as_secs_f64());
1094 }
1095 },
1096 None => {
1097 log::debug!(
1098 target: LOG_TARGET,
1099 "non-existent query (likely republishing a provider) failed ({query_id:?})",
1100 );
1101 }
1102 }
1103 }
1104 Some(DiscoveryEvent::Identified { peer, listen_addresses, supported_protocols, .. }) => {
1105 self.discovery.add_self_reported_address(peer, supported_protocols, listen_addresses).await;
1106 }
1107 Some(DiscoveryEvent::ExternalAddressDiscovered { address }) => {
1108 match self.litep2p.public_addresses().add_address(address.clone().into()) {
1109 Ok(inserted) => if inserted {
1110 log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
1111 },
1112 Err(err) => {
1113 log::warn!(
1114 target: LOG_TARGET,
1115 "🔍 Failed to add discovered external address {address:?}: {err:?}",
1116 );
1117 },
1118 }
1119 }
1120 Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
1121 let local_peer_id = self.litep2p.local_peer_id();
1122
1123 let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
1125 address.with(Protocol::P2p(*local_peer_id.as_ref()))
1126 } else {
1127 address
1128 };
1129
1130 if self.litep2p.public_addresses().remove_address(&address) {
1131 log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
1132 } else {
1133 log::warn!(
1134 target: LOG_TARGET,
1135 "🔍 Failed to remove expired external address {address:?}"
1136 );
1137 }
1138 }
1139 Some(DiscoveryEvent::Ping { peer, rtt }) => {
1140 log::trace!(
1141 target: LOG_TARGET,
1142 "ping time with {peer:?}: {rtt:?}",
1143 );
1144 }
1145 Some(DiscoveryEvent::IncomingRecord { record: Record { key, value, publisher, expires }} ) => {
1146 self.event_streams.send(Event::Dht(
1147 DhtEvent::PutRecordRequest(
1148 key.into(),
1149 value,
1150 publisher.map(Into::into),
1151 expires,
1152 )
1153 ));
1154 },
1155
1156 Some(DiscoveryEvent::RandomKademliaStarted) => {
1157 if let Some(metrics) = self.metrics.as_ref() {
1158 metrics.kademlia_random_queries_total.inc();
1159 }
1160 }
1161 },
1162 event = self.litep2p.next_event() => match event {
1163 Some(Litep2pEvent::ConnectionEstablished { peer, endpoint }) => {
1164 let Some(metrics) = &self.metrics else {
1165 continue;
1166 };
1167
1168 let direction = match endpoint {
1169 Endpoint::Dialer { .. } => "out",
1170 Endpoint::Listener { .. } => {
1171 metrics.incoming_connections_total.inc();
1176
1177 "in"
1178 },
1179 };
1180 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1181
1182 match self.peers.entry(peer) {
1183 Entry::Vacant(entry) => {
1184 entry.insert(ConnectionContext {
1185 endpoints: HashMap::from_iter([(endpoint.connection_id(), endpoint)]),
1186 num_connections: 1usize,
1187 });
1188 metrics.distinct_peers_connections_opened_total.inc();
1189 }
1190 Entry::Occupied(entry) => {
1191 let entry = entry.into_mut();
1192 entry.num_connections += 1;
1193 entry.endpoints.insert(endpoint.connection_id(), endpoint);
1194 }
1195 }
1196 }
1197 Some(Litep2pEvent::ConnectionClosed { peer, connection_id }) => {
1198 let Some(metrics) = &self.metrics else {
1199 continue;
1200 };
1201
1202 let Some(context) = self.peers.get_mut(&peer) else {
1203 log::debug!(target: LOG_TARGET, "unknown peer disconnected: {peer:?} ({connection_id:?})");
1204 continue
1205 };
1206
1207 let direction = match context.endpoints.remove(&connection_id) {
1208 None => {
1209 log::debug!(target: LOG_TARGET, "connection {connection_id:?} doesn't exist for {peer:?} ");
1210 continue
1211 }
1212 Some(endpoint) => {
1213 context.num_connections -= 1;
1214
1215 match endpoint {
1216 Endpoint::Dialer { .. } => "out",
1217 Endpoint::Listener { .. } => "in",
1218 }
1219 }
1220 };
1221
1222 metrics.connections_closed_total.with_label_values(&[direction, "actively-closed"]).inc();
1223
1224 if context.num_connections == 0 {
1225 self.peers.remove(&peer);
1226 metrics.distinct_peers_connections_closed_total.inc();
1227 }
1228 }
1229 Some(Litep2pEvent::DialFailure { address, error }) => {
1230 log::debug!(
1231 target: LOG_TARGET,
1232 "failed to dial peer at {address:?}: {error:?}",
1233 );
1234
1235 if let Some(metrics) = &self.metrics {
1236 let reason = match error {
1237 DialError::Timeout => "timeout",
1238 DialError::AddressError(_) => "invalid-address",
1239 DialError::DnsError(_) => "cannot-resolve-dns",
1240 DialError::NegotiationError(error) => match error {
1241 NegotiationError::Timeout => "timeout",
1242 NegotiationError::PeerIdMissing => "missing-peer-id",
1243 NegotiationError::StateMismatch => "state-mismatch",
1244 NegotiationError::PeerIdMismatch(_,_) => "peer-id-missmatch",
1245 NegotiationError::MultistreamSelectError(_) => "multistream-select-error",
1246 NegotiationError::SnowError(_) => "noise-error",
1247 NegotiationError::ParseError(_) => "parse-error",
1248 NegotiationError::IoError(_) => "io-error",
1249 NegotiationError::WebSocket(_) => "webscoket-error",
1250 NegotiationError::BadSignature => "bad-signature",
1251 }
1252 };
1253
1254 metrics.pending_connections_errors_total.with_label_values(&[&reason]).inc();
1255 }
1256 }
1257 Some(Litep2pEvent::ListDialFailures { errors }) => {
1258 log::debug!(
1259 target: LOG_TARGET,
1260 "failed to dial peer on multiple addresses {errors:?}",
1261 );
1262
1263 if let Some(metrics) = &self.metrics {
1264 metrics.pending_connections_errors_total.with_label_values(&["transport-errors"]).inc();
1265 }
1266 }
1267 None => {
1268 log::error!(
1269 target: LOG_TARGET,
1270 "Litep2p backend terminated"
1271 );
1272 return
1273 }
1274 },
1275 }
1276 }
1277 }
1278}