1use crate::{
19 behaviour::{self, Behaviour, BehaviourOut},
20 bitswap::BitswapRequestHandler,
21 config::{
22 parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
23 NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
24 },
25 discovery::DiscoveryConfig,
26 error::Error,
27 event::{DhtEvent, Event},
28 network_state::{
29 NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
30 },
31 peer_store::{PeerStore, PeerStoreProvider},
32 protocol::{self, Protocol, Ready},
33 protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
34 request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
35 service::{
36 signature::{Signature, SigningError},
37 traits::{
38 BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
39 NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
40 NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
41 NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
42 },
43 },
44 transport,
45 types::ProtocolName,
46 NotificationService, ReputationChange,
47};
48
49use crate::types::kad::{Key as KademliaKey, Record};
50use codec::DecodeAll;
51use futures::{channel::oneshot, prelude::*};
52use libp2p::{
53 connection_limits::{ConnectionLimits, Exceeded},
54 core::{upgrade, ConnectedPoint, Endpoint},
55 identify::Info as IdentifyInfo,
56 identity::ed25519,
57 multiaddr::{self, Multiaddr},
58 swarm::{
59 Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
60 NetworkBehaviour, Swarm, SwarmEvent,
61 },
62 PeerId,
63};
64use log::{debug, error, info, trace, warn};
65use metrics::{Histogram, MetricSources, Metrics};
66use parking_lot::Mutex;
67use soil_prometheus::Registry;
68
69use crate::common::{
70 role::{ObservedRole, Roles},
71 ExHashT,
72};
73use soil_client::client_api::BlockBackend;
74use soil_client::utils::mpsc::{
75 tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
76};
77use subsoil::runtime::traits::Block as BlockT;
78
79pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
80pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
81pub use metrics::NotificationMetrics;
82pub use protocol::NotificationsSink;
83use std::{
84 collections::{HashMap, HashSet},
85 fs, iter,
86 marker::PhantomData,
87 num::NonZeroUsize,
88 pin::Pin,
89 str,
90 sync::{
91 atomic::{AtomicUsize, Ordering},
92 Arc,
93 },
94 time::{Duration, Instant},
95};
96
97pub(crate) mod metrics;
98pub(crate) mod out_events;
99
100pub mod signature;
101pub mod traits;
102
103const LOG_TARGET: &str = "sub-libp2p";
105
106struct Libp2pBandwidthSink {
107 #[allow(deprecated)]
108 sink: Arc<transport::BandwidthSinks>,
109}
110
111impl BandwidthSink for Libp2pBandwidthSink {
112 fn total_inbound(&self) -> u64 {
113 self.sink.total_inbound()
114 }
115
116 fn total_outbound(&self) -> u64 {
117 self.sink.total_outbound()
118 }
119}
120
121pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
123 num_connected: Arc<AtomicUsize>,
125 external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
127 listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
129 local_peer_id: PeerId,
131 local_identity: Keypair,
133 bandwidth: Arc<dyn BandwidthSink>,
135 to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
137 notification_protocol_ids: HashMap<ProtocolName, SetId>,
140 protocol_handles: Vec<protocol_controller::ProtocolHandle>,
143 sync_protocol_handle: protocol_controller::ProtocolHandle,
145 peer_store_handle: Arc<dyn PeerStoreProvider>,
147 _marker: PhantomData<H>,
150 _block: PhantomData<B>,
152}
153
154#[async_trait::async_trait]
155impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
156where
157 B: BlockT + 'static,
158 H: ExHashT,
159{
160 type NotificationProtocolConfig = NonDefaultSetConfig;
161 type RequestResponseProtocolConfig = RequestResponseConfig;
162 type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
163 type PeerStore = PeerStore;
164 type BitswapConfig = RequestResponseConfig;
165
166 fn new(params: Params<B, H, Self>) -> Result<Self, Error>
167 where
168 Self: Sized,
169 {
170 NetworkWorker::new(params)
171 }
172
173 fn network_service(&self) -> Arc<dyn NetworkServiceT> {
175 self.service.clone()
176 }
177
178 fn peer_store(
180 bootnodes: Vec<crate::types::PeerId>,
181 metrics_registry: Option<Registry>,
182 ) -> Self::PeerStore {
183 PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
184 }
185
186 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
187 NotificationMetrics::new(registry)
188 }
189
190 fn bitswap_server(
191 client: Arc<dyn BlockBackend<B> + Send + Sync>,
192 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
193 let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
194
195 (Box::pin(async move { handler.run().await }), protocol_config)
196 }
197
198 fn notification_config(
200 protocol_name: ProtocolName,
201 fallback_names: Vec<ProtocolName>,
202 max_notification_size: u64,
203 handshake: Option<NotificationHandshake>,
204 set_config: SetConfig,
205 _metrics: NotificationMetrics,
206 _peerstore_handle: Arc<dyn PeerStoreProvider>,
207 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
208 NonDefaultSetConfig::new(
209 protocol_name,
210 fallback_names,
211 max_notification_size,
212 handshake,
213 set_config,
214 )
215 }
216
217 fn request_response_config(
219 protocol_name: ProtocolName,
220 fallback_names: Vec<ProtocolName>,
221 max_request_size: u64,
222 max_response_size: u64,
223 request_timeout: Duration,
224 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
225 ) -> Self::RequestResponseProtocolConfig {
226 Self::RequestResponseProtocolConfig {
227 name: protocol_name,
228 fallback_names,
229 max_request_size,
230 max_response_size,
231 request_timeout,
232 inbound_queue,
233 }
234 }
235
236 async fn run(mut self) {
238 self.run().await
239 }
240}
241
242impl<B, H> NetworkWorker<B, H>
243where
244 B: BlockT + 'static,
245 H: ExHashT,
246{
247 pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
253 let peer_store_handle = params.network_config.peer_store_handle();
254 let FullNetworkConfiguration {
255 notification_protocols,
256 request_response_protocols,
257 mut network_config,
258 ..
259 } = params.network_config;
260
261 let local_identity = network_config.node_key.clone().into_keypair()?;
263 let local_public = local_identity.public();
264 let local_peer_id = local_public.to_peer_id();
265
266 let local_identity: ed25519::Keypair = local_identity.into();
268 let local_public: ed25519::PublicKey = local_public.into();
269 let local_peer_id: PeerId = local_peer_id.into();
270
271 network_config.boot_nodes = network_config
272 .boot_nodes
273 .into_iter()
274 .filter(|boot_node| boot_node.peer_id != crate::PeerId::from(&local_peer_id))
275 .collect();
276 network_config.default_peers_set.reserved_nodes = network_config
277 .default_peers_set
278 .reserved_nodes
279 .into_iter()
280 .filter(|reserved_node| {
281 if reserved_node.peer_id == crate::PeerId::from(&local_peer_id) {
282 warn!(
283 target: LOG_TARGET,
284 "Local peer ID used in reserved node, ignoring: {}",
285 reserved_node,
286 );
287 false
288 } else {
289 true
290 }
291 })
292 .collect();
293
294 ensure_addresses_consistent_with_transport(
296 network_config.listen_addresses.iter(),
297 &network_config.transport,
298 )?;
299 ensure_addresses_consistent_with_transport(
300 network_config.boot_nodes.iter().map(|x| &x.multiaddr),
301 &network_config.transport,
302 )?;
303 ensure_addresses_consistent_with_transport(
304 network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
305 &network_config.transport,
306 )?;
307 for notification_protocol in ¬ification_protocols {
308 ensure_addresses_consistent_with_transport(
309 notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
310 &network_config.transport,
311 )?;
312 }
313 ensure_addresses_consistent_with_transport(
314 network_config.public_addresses.iter(),
315 &network_config.transport,
316 )?;
317
318 let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
319
320 if let Some(path) = &network_config.net_config_path {
321 fs::create_dir_all(path)?;
322 }
323
324 info!(
325 target: LOG_TARGET,
326 "🏷 Local node identity is: {}",
327 local_peer_id.to_base58(),
328 );
329 info!(target: LOG_TARGET, "Running libp2p network backend");
330
331 let (transport, bandwidth) = {
332 let config_mem = match network_config.transport {
333 TransportConfig::MemoryOnly => true,
334 TransportConfig::Normal { .. } => false,
335 };
336
337 transport::build_transport(local_identity.clone().into(), config_mem)
338 };
339
340 let (to_notifications, from_protocol_controllers) =
341 tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
342
343 let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
345 .chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
346
347 let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
348 .enumerate()
349 .map(|(set_id, set_config)| {
350 let proto_set_config = ProtoSetConfig {
351 in_peers: set_config.in_peers,
352 out_peers: set_config.out_peers,
353 reserved_nodes: set_config
354 .reserved_nodes
355 .iter()
356 .map(|node| node.peer_id.into())
357 .collect(),
358 reserved_only: set_config.non_reserved_mode.is_reserved_only(),
359 };
360
361 ProtocolController::new(
362 SetId::from(set_id),
363 proto_set_config,
364 to_notifications.clone(),
365 Arc::clone(&peer_store_handle),
366 )
367 })
368 .unzip();
369
370 let sync_protocol_handle = protocol_handles[0].clone();
372
373 protocol_controllers
375 .into_iter()
376 .for_each(|controller| (params.executor)(controller.run().boxed()));
377
378 let notification_protocol_ids: HashMap<ProtocolName, SetId> =
381 iter::once(¶ms.block_announce_config)
382 .chain(notification_protocols.iter())
383 .enumerate()
384 .map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
385 .collect();
386
387 let known_addresses = {
388 let mut addresses: Vec<_> = network_config
390 .default_peers_set
391 .reserved_nodes
392 .iter()
393 .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
394 .chain(notification_protocols.iter().flat_map(|protocol| {
395 protocol
396 .set_config()
397 .reserved_nodes
398 .iter()
399 .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
400 }))
401 .chain(
402 network_config
403 .boot_nodes
404 .iter()
405 .map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
406 )
407 .collect();
408
409 addresses.sort();
411 addresses.dedup();
412
413 addresses
414 };
415
416 network_config.boot_nodes.iter().try_for_each(|bootnode| {
418 if let Some(other) = network_config
419 .boot_nodes
420 .iter()
421 .filter(|o| o.multiaddr == bootnode.multiaddr)
422 .find(|o| o.peer_id != bootnode.peer_id)
423 {
424 Err(Error::DuplicateBootnode {
425 address: bootnode.multiaddr.clone().into(),
426 first_id: bootnode.peer_id.into(),
427 second_id: other.peer_id.into(),
428 })
429 } else {
430 Ok(())
431 }
432 })?;
433
434 let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();
436
437 for bootnode in network_config.boot_nodes.iter() {
438 boot_node_ids
439 .entry(bootnode.peer_id.into())
440 .or_default()
441 .push(bootnode.multiaddr.clone().into());
442 }
443
444 let boot_node_ids = Arc::new(boot_node_ids);
445
446 let num_connected = Arc::new(AtomicUsize::new(0));
447 let external_addresses = Arc::new(Mutex::new(HashSet::new()));
448
449 let (protocol, notif_protocol_handles) = Protocol::new(
450 From::from(¶ms.role),
451 params.notification_metrics,
452 notification_protocols,
453 params.block_announce_config,
454 Arc::clone(&peer_store_handle),
455 protocol_handles.clone(),
456 from_protocol_controllers,
457 )?;
458
459 let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
461 let user_agent =
462 format!("{} ({})", network_config.client_version, network_config.node_name);
463
464 let discovery_config = {
465 let mut config = DiscoveryConfig::new(local_peer_id);
466 config.with_permanent_addresses(
467 known_addresses
468 .iter()
469 .map(|(peer, address)| (peer.into(), address.clone().into()))
470 .collect::<Vec<_>>(),
471 );
472 config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
473 config.with_kademlia(
474 params.genesis_hash,
475 params.fork_id.as_deref(),
476 ¶ms.protocol_id,
477 );
478 config.with_dht_random_walk(network_config.enable_dht_random_walk);
479 config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
480 config.use_kademlia_disjoint_query_paths(
481 network_config.kademlia_disjoint_query_paths,
482 );
483 config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);
484
485 match network_config.transport {
486 TransportConfig::MemoryOnly => {
487 config.with_mdns(false);
488 config.allow_private_ip(false);
489 },
490 TransportConfig::Normal {
491 enable_mdns,
492 allow_private_ip: allow_private_ipv4,
493 ..
494 } => {
495 config.with_mdns(enable_mdns);
496 config.allow_private_ip(allow_private_ipv4);
497 },
498 }
499
500 config
501 };
502
503 let behaviour = {
504 let result = Behaviour::new(
505 protocol,
506 user_agent,
507 local_public.into(),
508 discovery_config,
509 request_response_protocols,
510 Arc::clone(&peer_store_handle),
511 external_addresses.clone(),
512 network_config.public_addresses.iter().cloned().map(Into::into).collect(),
513 ConnectionLimits::default()
514 .with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
515 .with_max_established_incoming(Some(
516 crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
517 )),
518 );
519
520 match result {
521 Ok(b) => b,
522 Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
523 return Err(Error::DuplicateRequestResponseProtocol { protocol: proto })
524 },
525 }
526 };
527
528 let swarm = {
529 struct SpawnImpl<F>(F);
530 impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
531 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
532 (self.0)(f)
533 }
534 }
535
536 let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
537 .with_substream_upgrade_protocol_override(upgrade::Version::V1)
538 .with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
539 .with_per_connection_event_buffer_size(24)
542 .with_max_negotiating_inbound_streams(2048)
543 .with_idle_connection_timeout(network_config.idle_connection_timeout);
544
545 Swarm::new(transport, behaviour, local_peer_id, config)
546 };
547
548 (swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
549 };
550
551 let metrics = match ¶ms.metrics_registry {
553 Some(registry) => Some(metrics::register(
554 registry,
555 MetricSources {
556 bandwidth: bandwidth.clone(),
557 connected_peers: num_connected.clone(),
558 },
559 )?),
560 None => None,
561 };
562
563 for addr in &network_config.listen_addresses {
565 if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
566 warn!(target: LOG_TARGET, "Can't listen on {} because: {:?}", addr, err)
567 }
568 }
569
570 for addr in &network_config.public_addresses {
572 Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
573 }
574
575 let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));
576
577 let service = Arc::new(NetworkService {
578 bandwidth,
579 external_addresses,
580 listen_addresses: listen_addresses_set.clone(),
581 num_connected: num_connected.clone(),
582 local_peer_id,
583 local_identity: local_identity.into(),
584 to_worker,
585 notification_protocol_ids,
586 protocol_handles,
587 sync_protocol_handle,
588 peer_store_handle: Arc::clone(&peer_store_handle),
589 _marker: PhantomData,
590 _block: Default::default(),
591 });
592
593 Ok(NetworkWorker {
594 listen_addresses: listen_addresses_set,
595 num_connected,
596 network_service: swarm,
597 service,
598 from_service,
599 event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
600 metrics,
601 boot_node_ids,
602 reported_invalid_boot_nodes: Default::default(),
603 peer_store_handle: Arc::clone(&peer_store_handle),
604 notif_protocol_handles,
605 _marker: Default::default(),
606 _block: Default::default(),
607 })
608 }
609
610 pub fn status(&self) -> NetworkStatus {
612 NetworkStatus {
613 num_connected_peers: self.num_connected_peers(),
614 total_bytes_inbound: self.total_bytes_inbound(),
615 total_bytes_outbound: self.total_bytes_outbound(),
616 }
617 }
618
619 pub fn total_bytes_inbound(&self) -> u64 {
621 self.service.bandwidth.total_inbound()
622 }
623
624 pub fn total_bytes_outbound(&self) -> u64 {
626 self.service.bandwidth.total_outbound()
627 }
628
629 pub fn num_connected_peers(&self) -> usize {
631 self.network_service.behaviour().user_protocol().num_sync_peers()
632 }
633
634 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
636 self.network_service.behaviour_mut().add_known_address(peer_id, addr);
637 }
638
639 pub fn service(&self) -> &Arc<NetworkService<B, H>> {
642 &self.service
643 }
644
645 pub fn local_peer_id(&self) -> &PeerId {
647 Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
648 }
649
650 pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
654 Swarm::<Behaviour<B>>::listeners(&self.network_service)
655 }
656
657 pub fn network_state(&mut self) -> NetworkState {
662 let swarm = &mut self.network_service;
663 let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
664 let connected_peers = {
665 let swarm = &mut *swarm;
666 open.iter()
667 .filter_map(move |peer_id| {
668 let known_addresses = if let Ok(addrs) =
669 NetworkBehaviour::handle_pending_outbound_connection(
670 swarm.behaviour_mut(),
671 ConnectionId::new_unchecked(0), Some(*peer_id),
673 &vec![],
674 Endpoint::Listener,
675 ) {
676 addrs.into_iter().collect()
677 } else {
678 error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
679 return None;
680 };
681
682 let endpoint = if let Some(e) =
683 swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
684 {
685 e.clone().into()
686 } else {
687 error!(target: LOG_TARGET, "Found state inconsistency between custom protocol \
688 and debug information about {:?}", peer_id);
689 return None;
690 };
691
692 Some((
693 peer_id.to_base58(),
694 NetworkStatePeer {
695 endpoint,
696 version_string: swarm
697 .behaviour_mut()
698 .node(peer_id)
699 .and_then(|i| i.client_version().map(|s| s.to_owned())),
700 latest_ping_time: swarm
701 .behaviour_mut()
702 .node(peer_id)
703 .and_then(|i| i.latest_ping()),
704 known_addresses,
705 },
706 ))
707 })
708 .collect()
709 };
710
711 let not_connected_peers = {
712 let swarm = &mut *swarm;
713 swarm
714 .behaviour_mut()
715 .known_peers()
716 .into_iter()
717 .filter(|p| open.iter().all(|n| n != p))
718 .map(move |peer_id| {
719 let known_addresses = if let Ok(addrs) =
720 NetworkBehaviour::handle_pending_outbound_connection(
721 swarm.behaviour_mut(),
722 ConnectionId::new_unchecked(0), Some(peer_id),
724 &vec![],
725 Endpoint::Listener,
726 ) {
727 addrs.into_iter().collect()
728 } else {
729 error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
730 Default::default()
731 };
732
733 (
734 peer_id.to_base58(),
735 NetworkStateNotConnectedPeer {
736 version_string: swarm
737 .behaviour_mut()
738 .node(&peer_id)
739 .and_then(|i| i.client_version().map(|s| s.to_owned())),
740 latest_ping_time: swarm
741 .behaviour_mut()
742 .node(&peer_id)
743 .and_then(|i| i.latest_ping()),
744 known_addresses,
745 },
746 )
747 })
748 .collect()
749 };
750
751 let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
752 let listened_addresses = swarm.listeners().cloned().collect();
753 let external_addresses = swarm.external_addresses().cloned().collect();
754
755 NetworkState {
756 peer_id,
757 listened_addresses,
758 external_addresses,
759 connected_peers,
760 not_connected_peers,
761 peerset: serde_json::json!(
764 "Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
765 ),
766 }
767 }
768
769 pub fn remove_reserved_peer(&self, peer: PeerId) {
771 self.service.remove_reserved_peer(peer.into());
772 }
773
774 pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
776 self.service.add_reserved_peer(peer)
777 }
778}
779
780impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
781 pub async fn network_state(&self) -> Result<NetworkState, ()> {
788 let (tx, rx) = oneshot::channel();
789
790 let _ = self
791 .to_worker
792 .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
793
794 match rx.await {
795 Ok(v) => v.map_err(|_| ()),
796 Err(_) => Err(()),
798 }
799 }
800
801 fn split_multiaddr_and_peer_id(
806 &self,
807 peers: HashSet<Multiaddr>,
808 ) -> Result<Vec<(PeerId, Multiaddr)>, String> {
809 peers
810 .into_iter()
811 .map(|mut addr| {
812 let peer = match addr.pop() {
813 Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
814 _ => return Err("Missing PeerId from address".to_string()),
815 };
816
817 if peer == self.local_peer_id {
820 Err("Local peer ID in peer set.".to_string())
821 } else {
822 Ok((peer, addr))
823 }
824 })
825 .collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
826 }
827}
828
829impl<B, H> NetworkStateInfo for NetworkService<B, H>
830where
831 B: subsoil::runtime::traits::Block,
832 H: ExHashT,
833{
834 fn external_addresses(&self) -> Vec<crate::types::multiaddr::Multiaddr> {
836 self.external_addresses.lock().iter().cloned().map(Into::into).collect()
837 }
838
839 fn listen_addresses(&self) -> Vec<crate::types::multiaddr::Multiaddr> {
841 self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
842 }
843
844 fn local_peer_id(&self) -> crate::types::PeerId {
846 self.local_peer_id.into()
847 }
848}
849
850impl<B, H> NetworkSigner for NetworkService<B, H>
851where
852 B: subsoil::runtime::traits::Block,
853 H: ExHashT,
854{
855 fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
856 let public_key = self.local_identity.public();
857 let bytes = self.local_identity.sign(msg.as_ref())?;
858
859 Ok(Signature {
860 public_key: crate::service::signature::PublicKey::Libp2p(public_key),
861 bytes,
862 })
863 }
864
865 fn verify(
866 &self,
867 peer_id: crate::types::PeerId,
868 public_key: &Vec<u8>,
869 signature: &Vec<u8>,
870 message: &Vec<u8>,
871 ) -> Result<bool, String> {
872 let public_key =
873 PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
874 let peer_id: PeerId = peer_id.into();
875 let remote: libp2p::PeerId = public_key.to_peer_id();
876
877 Ok(peer_id == remote && public_key.verify(message, signature))
878 }
879}
880
881impl<B, H> NetworkDHTProvider for NetworkService<B, H>
882where
883 B: BlockT + 'static,
884 H: ExHashT,
885{
886 fn find_closest_peers(&self, target: crate::types::PeerId) {
891 let _ = self
892 .to_worker
893 .unbounded_send(ServiceToWorkerMsg::FindClosestPeers(target.into()));
894 }
895
896 fn get_value(&self, key: &KademliaKey) {
901 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
902 }
903
904 fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
909 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
910 }
911
912 fn put_record_to(
913 &self,
914 record: Record,
915 peers: HashSet<crate::types::PeerId>,
916 update_local_storage: bool,
917 ) {
918 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
919 record,
920 peers,
921 update_local_storage,
922 });
923 }
924
925 fn store_record(
926 &self,
927 key: KademliaKey,
928 value: Vec<u8>,
929 publisher: Option<crate::types::PeerId>,
930 expires: Option<Instant>,
931 ) {
932 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
933 key,
934 value,
935 publisher.map(Into::into),
936 expires,
937 ));
938 }
939
940 fn start_providing(&self, key: KademliaKey) {
941 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
942 }
943
944 fn stop_providing(&self, key: KademliaKey) {
945 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
946 }
947
948 fn get_providers(&self, key: KademliaKey) {
949 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
950 }
951}
952
953#[async_trait::async_trait]
954impl<B, H> NetworkStatusProvider for NetworkService<B, H>
955where
956 B: BlockT + 'static,
957 H: ExHashT,
958{
959 async fn status(&self) -> Result<NetworkStatus, ()> {
960 let (tx, rx) = oneshot::channel();
961
962 let _ = self
963 .to_worker
964 .unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
965
966 match rx.await {
967 Ok(v) => v.map_err(|_| ()),
968 Err(_) => Err(()),
970 }
971 }
972
973 async fn network_state(&self) -> Result<NetworkState, ()> {
974 let (tx, rx) = oneshot::channel();
975
976 let _ = self
977 .to_worker
978 .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
979
980 match rx.await {
981 Ok(v) => v.map_err(|_| ()),
982 Err(_) => Err(()),
984 }
985 }
986}
987
988#[async_trait::async_trait]
989impl<B, H> NetworkPeers for NetworkService<B, H>
990where
991 B: BlockT + 'static,
992 H: ExHashT,
993{
994 fn set_authorized_peers(&self, peers: HashSet<crate::types::PeerId>) {
995 self.sync_protocol_handle
996 .set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
997 }
998
999 fn set_authorized_only(&self, reserved_only: bool) {
1000 self.sync_protocol_handle.set_reserved_only(reserved_only);
1001 }
1002
1003 fn add_known_address(
1004 &self,
1005 peer_id: crate::types::PeerId,
1006 addr: crate::types::multiaddr::Multiaddr,
1007 ) {
1008 let _ = self
1009 .to_worker
1010 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
1011 }
1012
1013 fn report_peer(&self, peer_id: crate::types::PeerId, cost_benefit: ReputationChange) {
1014 self.peer_store_handle.report_peer(peer_id, cost_benefit);
1015 }
1016
1017 fn peer_reputation(&self, peer_id: &crate::types::PeerId) -> i32 {
1018 self.peer_store_handle.peer_reputation(peer_id)
1019 }
1020
1021 fn disconnect_peer(&self, peer_id: crate::types::PeerId, protocol: ProtocolName) {
1022 let _ = self
1023 .to_worker
1024 .unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
1025 }
1026
1027 fn accept_unreserved_peers(&self) {
1028 self.sync_protocol_handle.set_reserved_only(false);
1029 }
1030
1031 fn deny_unreserved_peers(&self) {
1032 self.sync_protocol_handle.set_reserved_only(true);
1033 }
1034
1035 fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
1036 if peer.peer_id == crate::PeerId::from(&self.local_peer_id) {
1038 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1039 }
1040
1041 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
1042 peer.peer_id.into(),
1043 peer.multiaddr.into(),
1044 ));
1045 self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());
1046
1047 Ok(())
1048 }
1049
1050 fn remove_reserved_peer(&self, peer_id: crate::types::PeerId) {
1051 self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
1052 }
1053
1054 fn set_reserved_peers(
1055 &self,
1056 protocol: ProtocolName,
1057 peers: HashSet<crate::types::multiaddr::Multiaddr>,
1058 ) -> Result<(), String> {
1059 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1060 return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol));
1061 };
1062
1063 let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1064 let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;
1065
1066 let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());
1067
1068 for (peer_id, addr) in peers_addrs.into_iter() {
1069 if peer_id == self.local_peer_id {
1071 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1072 }
1073
1074 peers.insert(peer_id.into());
1075
1076 if !addr.is_empty() {
1077 let _ = self
1078 .to_worker
1079 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1080 }
1081 }
1082
1083 self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);
1084
1085 Ok(())
1086 }
1087
1088 fn add_peers_to_reserved_set(
1089 &self,
1090 protocol: ProtocolName,
1091 peers: HashSet<crate::types::multiaddr::Multiaddr>,
1092 ) -> Result<(), String> {
1093 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1094 return Err(format!(
1095 "Cannot add peers to reserved set of unknown protocol: {}",
1096 protocol
1097 ));
1098 };
1099
1100 let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1101 let peers = self.split_multiaddr_and_peer_id(peers)?;
1102
1103 for (peer_id, addr) in peers.into_iter() {
1104 if peer_id == self.local_peer_id {
1106 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1107 }
1108
1109 if !addr.is_empty() {
1110 let _ = self
1111 .to_worker
1112 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1113 }
1114
1115 self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
1116 }
1117
1118 Ok(())
1119 }
1120
1121 fn remove_peers_from_reserved_set(
1122 &self,
1123 protocol: ProtocolName,
1124 peers: Vec<crate::types::PeerId>,
1125 ) -> Result<(), String> {
1126 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1127 return Err(format!(
1128 "Cannot remove peers from reserved set of unknown protocol: {}",
1129 protocol
1130 ));
1131 };
1132
1133 for peer_id in peers.into_iter() {
1134 self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
1135 }
1136
1137 Ok(())
1138 }
1139
1140 fn sync_num_connected(&self) -> usize {
1141 self.num_connected.load(Ordering::Relaxed)
1142 }
1143
1144 fn peer_role(&self, peer_id: crate::types::PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
1145 match Roles::decode_all(&mut &handshake[..]) {
1146 Ok(role) => Some(role.into()),
1147 Err(_) => {
1148 log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
1149 self.peer_store_handle.peer_role(&(peer_id.into()))
1150 },
1151 }
1152 }
1153
1154 async fn reserved_peers(&self) -> Result<Vec<crate::types::PeerId>, ()> {
1158 let (tx, rx) = oneshot::channel();
1159
1160 self.sync_protocol_handle.reserved_peers(tx);
1161
1162 rx.await
1164 .map(|peers| peers.into_iter().map(From::from).collect())
1165 .map_err(|_| ())
1166 }
1167}
1168
1169impl<B, H> NetworkEventStream for NetworkService<B, H>
1170where
1171 B: BlockT + 'static,
1172 H: ExHashT,
1173{
1174 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1175 let (tx, rx) = out_events::channel(name, 100_000);
1176 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1177 Box::pin(rx)
1178 }
1179}
1180
1181#[async_trait::async_trait]
1182impl<B, H> NetworkRequest for NetworkService<B, H>
1183where
1184 B: BlockT + 'static,
1185 H: ExHashT,
1186{
1187 async fn request(
1188 &self,
1189 target: crate::types::PeerId,
1190 protocol: ProtocolName,
1191 request: Vec<u8>,
1192 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1193 connect: IfDisconnected,
1194 ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1195 let (tx, rx) = oneshot::channel();
1196
1197 self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1198
1199 match rx.await {
1200 Ok(v) => v,
1201 Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1205 }
1206 }
1207
1208 fn start_request(
1209 &self,
1210 target: crate::types::PeerId,
1211 protocol: ProtocolName,
1212 request: Vec<u8>,
1213 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1214 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1215 connect: IfDisconnected,
1216 ) {
1217 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1218 target: target.into(),
1219 protocol: protocol.into(),
1220 request,
1221 fallback_request,
1222 pending_response: tx,
1223 connect,
1224 });
1225 }
1226}
1227
1228#[must_use]
1230pub struct NotificationSender {
1231 sink: NotificationsSink,
1232
1233 protocol_name: ProtocolName,
1235
1236 notification_size_metric: Option<Histogram>,
1239}
1240
1241#[async_trait::async_trait]
1242impl NotificationSenderT for NotificationSender {
1243 async fn ready(
1244 &self,
1245 ) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1246 Ok(Box::new(NotificationSenderReady {
1247 ready: match self.sink.reserve_notification().await {
1248 Ok(r) => Some(r),
1249 Err(()) => return Err(NotificationSenderError::Closed),
1250 },
1251 peer_id: self.sink.peer_id(),
1252 protocol_name: &self.protocol_name,
1253 notification_size_metric: self.notification_size_metric.clone(),
1254 }))
1255 }
1256}
1257
1258#[must_use]
1260pub struct NotificationSenderReady<'a> {
1261 ready: Option<Ready<'a>>,
1262
1263 peer_id: &'a PeerId,
1265
1266 protocol_name: &'a ProtocolName,
1268
1269 notification_size_metric: Option<Histogram>,
1272}
1273
1274impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1275 fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1276 if let Some(notification_size_metric) = &self.notification_size_metric {
1277 notification_size_metric.observe(notification.len() as f64);
1278 }
1279
1280 trace!(
1281 target: LOG_TARGET,
1282 "External API => Notification({:?}, {}, {} bytes)",
1283 self.peer_id, self.protocol_name, notification.len(),
1284 );
1285 trace!(target: LOG_TARGET, "Handler({:?}) <= Async notification", self.peer_id);
1286
1287 self.ready
1288 .take()
1289 .ok_or(NotificationSenderError::Closed)?
1290 .send(notification)
1291 .map_err(|()| NotificationSenderError::Closed)
1292 }
1293}
1294
1295enum ServiceToWorkerMsg {
1299 FindClosestPeers(PeerId),
1300 GetValue(KademliaKey),
1301 PutValue(KademliaKey, Vec<u8>),
1302 PutRecordTo {
1303 record: Record,
1304 peers: HashSet<crate::types::PeerId>,
1305 update_local_storage: bool,
1306 },
1307 StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
1308 StartProviding(KademliaKey),
1309 StopProviding(KademliaKey),
1310 GetProviders(KademliaKey),
1311 AddKnownAddress(PeerId, Multiaddr),
1312 EventStream(out_events::Sender),
1313 Request {
1314 target: PeerId,
1315 protocol: ProtocolName,
1316 request: Vec<u8>,
1317 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1318 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1319 connect: IfDisconnected,
1320 },
1321 NetworkStatus {
1322 pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
1323 },
1324 NetworkState {
1325 pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
1326 },
1327 DisconnectPeer(PeerId, ProtocolName),
1328}
1329
1330#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
1334pub struct NetworkWorker<B, H>
1335where
1336 B: BlockT + 'static,
1337 H: ExHashT,
1338{
1339 listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
1341 num_connected: Arc<AtomicUsize>,
1343 service: Arc<NetworkService<B, H>>,
1345 network_service: Swarm<Behaviour<B>>,
1347 from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
1349 event_streams: out_events::OutChannels,
1351 metrics: Option<Metrics>,
1353 boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
1355 reported_invalid_boot_nodes: HashSet<PeerId>,
1357 peer_store_handle: Arc<dyn PeerStoreProvider>,
1359 notif_protocol_handles: Vec<protocol::ProtocolHandle>,
1361 _marker: PhantomData<H>,
1364 _block: PhantomData<B>,
1366}
1367
1368impl<B, H> NetworkWorker<B, H>
1369where
1370 B: BlockT + 'static,
1371 H: ExHashT,
1372{
1373 pub async fn run(mut self) {
1375 while self.next_action().await {}
1376 }
1377
1378 pub async fn next_action(&mut self) -> bool {
1383 futures::select! {
1384 msg = self.from_service.next() => {
1386 if let Some(msg) = msg {
1387 self.handle_worker_message(msg);
1388 } else {
1389 return false
1390 }
1391 },
1392 event = self.network_service.select_next_some() => {
1394 self.handle_swarm_event(event);
1395 },
1396 };
1397
1398 let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
1400 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
1401
1402 if let Some(metrics) = self.metrics.as_ref() {
1403 if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
1404 for (lower_ilog2_bucket_bound, num_entries) in buckets {
1405 metrics
1406 .kbuckets_num_nodes
1407 .with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
1408 .set(num_entries as u64);
1409 }
1410 }
1411 if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
1412 metrics.kademlia_records_count.set(num_entries as u64);
1413 }
1414 if let Some(num_entries) =
1415 self.network_service.behaviour_mut().kademlia_records_total_size()
1416 {
1417 metrics.kademlia_records_sizes_total.set(num_entries as u64);
1418 }
1419
1420 metrics.pending_connections.set(
1421 Swarm::network_info(&self.network_service).connection_counters().num_pending()
1422 as u64,
1423 );
1424 }
1425
1426 true
1427 }
1428
1429 fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
1431 match msg {
1432 ServiceToWorkerMsg::FindClosestPeers(target) => {
1433 self.network_service.behaviour_mut().find_closest_peers(target)
1434 },
1435 ServiceToWorkerMsg::GetValue(key) => {
1436 self.network_service.behaviour_mut().get_value(key.into())
1437 },
1438 ServiceToWorkerMsg::PutValue(key, value) => {
1439 self.network_service.behaviour_mut().put_value(key.into(), value)
1440 },
1441 ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
1442 .network_service
1443 .behaviour_mut()
1444 .put_record_to(record.into(), peers, update_local_storage),
1445 ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
1446 .network_service
1447 .behaviour_mut()
1448 .store_record(key.into(), value, publisher, expires),
1449 ServiceToWorkerMsg::StartProviding(key) => {
1450 self.network_service.behaviour_mut().start_providing(key.into())
1451 },
1452 ServiceToWorkerMsg::StopProviding(key) => {
1453 self.network_service.behaviour_mut().stop_providing(&key.into())
1454 },
1455 ServiceToWorkerMsg::GetProviders(key) => {
1456 self.network_service.behaviour_mut().get_providers(key.into())
1457 },
1458 ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => {
1459 self.network_service.behaviour_mut().add_known_address(peer_id, addr)
1460 },
1461 ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
1462 ServiceToWorkerMsg::Request {
1463 target,
1464 protocol,
1465 request,
1466 fallback_request,
1467 pending_response,
1468 connect,
1469 } => {
1470 self.network_service.behaviour_mut().send_request(
1471 &target,
1472 protocol,
1473 request,
1474 fallback_request,
1475 pending_response,
1476 connect,
1477 );
1478 },
1479 ServiceToWorkerMsg::NetworkStatus { pending_response } => {
1480 let _ = pending_response.send(Ok(self.status()));
1481 },
1482 ServiceToWorkerMsg::NetworkState { pending_response } => {
1483 let _ = pending_response.send(Ok(self.network_state()));
1484 },
1485 ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
1486 .network_service
1487 .behaviour_mut()
1488 .user_protocol_mut()
1489 .disconnect_peer(&who, protocol_name),
1490 }
1491 }
1492
1493 fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut>) {
1495 match event {
1496 SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1497 if let Some(metrics) = self.metrics.as_ref() {
1498 match result {
1499 Ok(serve_time) => {
1500 metrics
1501 .requests_in_success_total
1502 .with_label_values(&[&protocol])
1503 .observe(serve_time.as_secs_f64());
1504 },
1505 Err(err) => {
1506 let reason = match err {
1507 ResponseFailure::Network(InboundFailure::Timeout) => {
1508 Some("timeout")
1509 },
1510 ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1511 {
1516 None
1517 },
1518 ResponseFailure::Network(InboundFailure::ResponseOmission) => {
1519 Some("busy-omitted")
1520 },
1521 ResponseFailure::Network(InboundFailure::ConnectionClosed) => {
1522 Some("connection-closed")
1523 },
1524 ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"),
1525 };
1526
1527 if let Some(reason) = reason {
1528 metrics
1529 .requests_in_failure_total
1530 .with_label_values(&[&protocol, reason])
1531 .inc();
1532 }
1533 },
1534 }
1535 }
1536 },
1537 SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1538 protocol,
1539 duration,
1540 result,
1541 ..
1542 }) => {
1543 if let Some(metrics) = self.metrics.as_ref() {
1544 match result {
1545 Ok(_) => {
1546 metrics
1547 .requests_out_success_total
1548 .with_label_values(&[&protocol])
1549 .observe(duration.as_secs_f64());
1550 },
1551 Err(err) => {
1552 let reason = match err {
1553 RequestFailure::NotConnected => "not-connected",
1554 RequestFailure::UnknownProtocol => "unknown-protocol",
1555 RequestFailure::Refused => "refused",
1556 RequestFailure::Obsolete => "obsolete",
1557 RequestFailure::Network(OutboundFailure::DialFailure) => {
1558 "dial-failure"
1559 },
1560 RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1561 RequestFailure::Network(OutboundFailure::ConnectionClosed) => {
1562 "connection-closed"
1563 },
1564 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1565 "unsupported"
1566 },
1567 RequestFailure::Network(OutboundFailure::Io(_)) => "io",
1568 };
1569
1570 metrics
1571 .requests_out_failure_total
1572 .with_label_values(&[&protocol, reason])
1573 .inc();
1574 },
1575 }
1576 }
1577 },
1578 SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1579 for change in changes {
1580 self.peer_store_handle.report_peer(peer.into(), change);
1581 }
1582 },
1583 SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1584 peer_id,
1585 info:
1586 IdentifyInfo {
1587 protocol_version, agent_version, mut listen_addrs, protocols, ..
1588 },
1589 }) => {
1590 if listen_addrs.len() > 30 {
1591 debug!(
1592 target: LOG_TARGET,
1593 "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1594 peer_id, protocol_version, agent_version
1595 );
1596 listen_addrs.truncate(30);
1597 }
1598 for addr in listen_addrs {
1599 self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1600 &peer_id,
1601 &protocols,
1602 addr.clone(),
1603 );
1604 }
1605 self.peer_store_handle.add_known_peer(peer_id.into());
1606 },
1607 SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1608 self.peer_store_handle.add_known_peer(peer_id.into());
1609 },
1610 SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1611 if let Some(metrics) = self.metrics.as_ref() {
1612 metrics.kademlia_random_queries_total.inc();
1613 }
1614 },
1615 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1616 remote,
1617 set_id,
1618 direction,
1619 negotiated_fallback,
1620 notifications_sink,
1621 received_handshake,
1622 }) => {
1623 let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1624 remote,
1625 direction,
1626 received_handshake,
1627 negotiated_fallback,
1628 notifications_sink,
1629 );
1630 },
1631 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1632 remote,
1633 set_id,
1634 notifications_sink,
1635 }) => {
1636 let _ = self.notif_protocol_handles[usize::from(set_id)]
1637 .report_notification_sink_replaced(remote, notifications_sink);
1638
1639 },
1660 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1661 let _ = self.notif_protocol_handles[usize::from(set_id)]
1662 .report_substream_closed(remote);
1663 },
1664 SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1665 remote,
1666 set_id,
1667 notification,
1668 }) => {
1669 let _ = self.notif_protocol_handles[usize::from(set_id)]
1670 .report_notification_received(remote, notification);
1671 },
1672 SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1673 match (self.metrics.as_ref(), duration) {
1674 (Some(metrics), Some(duration)) => {
1675 let query_type = match event {
1676 DhtEvent::ClosestPeersFound(_, _) => "peers-found",
1677 DhtEvent::ClosestPeersNotFound(_) => "peers-not-found",
1678 DhtEvent::ValueFound(_) => "value-found",
1679 DhtEvent::ValueNotFound(_) => "value-not-found",
1680 DhtEvent::ValuePut(_) => "value-put",
1681 DhtEvent::ValuePutFailed(_) => "value-put-failed",
1682 DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1683 DhtEvent::StartedProviding(_) => "started-providing",
1684 DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
1685 DhtEvent::ProvidersFound(_, _) => "providers-found",
1686 DhtEvent::NoMoreProviders(_) => "no-more-providers",
1687 DhtEvent::ProvidersNotFound(_) => "providers-not-found",
1688 };
1689 metrics
1690 .kademlia_query_duration
1691 .with_label_values(&[query_type])
1692 .observe(duration.as_secs_f64());
1693 },
1694 _ => {},
1695 }
1696
1697 self.event_streams.send(Event::Dht(event));
1698 },
1699 SwarmEvent::Behaviour(BehaviourOut::None) => {
1700 },
1702 SwarmEvent::ConnectionEstablished {
1703 peer_id,
1704 endpoint,
1705 num_established,
1706 concurrent_dial_errors,
1707 ..
1708 } => {
1709 if let Some(errors) = concurrent_dial_errors {
1710 debug!(target: LOG_TARGET, "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1711 } else {
1712 debug!(target: LOG_TARGET, "Libp2p => Connected({:?})", peer_id);
1713 }
1714
1715 if let Some(metrics) = self.metrics.as_ref() {
1716 let direction = match endpoint {
1717 ConnectedPoint::Dialer { .. } => "out",
1718 ConnectedPoint::Listener { .. } => "in",
1719 };
1720 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1721
1722 if num_established.get() == 1 {
1723 metrics.distinct_peers_connections_opened_total.inc();
1724 }
1725 }
1726 },
1727 SwarmEvent::ConnectionClosed {
1728 connection_id,
1729 peer_id,
1730 cause,
1731 endpoint,
1732 num_established,
1733 } => {
1734 debug!(target: LOG_TARGET, "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1735 if let Some(metrics) = self.metrics.as_ref() {
1736 let direction = match endpoint {
1737 ConnectedPoint::Dialer { .. } => "out",
1738 ConnectedPoint::Listener { .. } => "in",
1739 };
1740 let reason = match cause {
1741 Some(ConnectionError::IO(_)) => "transport-error",
1742 Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1743 None => "actively-closed",
1744 };
1745 metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1746
1747 if num_established == 0 {
1749 metrics.distinct_peers_connections_closed_total.inc();
1750 }
1751 }
1752 },
1753 SwarmEvent::NewListenAddr { address, .. } => {
1754 trace!(target: LOG_TARGET, "Libp2p => NewListenAddr({})", address);
1755 if let Some(metrics) = self.metrics.as_ref() {
1756 metrics.listeners_local_addresses.inc();
1757 }
1758 self.listen_addresses.lock().insert(address.clone());
1759 },
1760 SwarmEvent::ExpiredListenAddr { address, .. } => {
1761 info!(target: LOG_TARGET, "📪 No longer listening on {}", address);
1762 if let Some(metrics) = self.metrics.as_ref() {
1763 metrics.listeners_local_addresses.dec();
1764 }
1765 self.listen_addresses.lock().remove(&address);
1766 },
1767 SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1768 if let Some(peer_id) = peer_id {
1769 trace!(
1770 target: LOG_TARGET,
1771 "Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1772 );
1773
1774 let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1775
1776 if let Some(addresses) =
1777 not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1778 {
1779 if let DialError::WrongPeerId { obtained, endpoint } = &error {
1780 if let ConnectedPoint::Dialer {
1781 address,
1782 role_override: _,
1783 port_use: _,
1784 } = endpoint
1785 {
1786 let address_without_peer_id = parse_addr(address.clone().into())
1787 .map_or_else(|_| address.clone(), |r| r.1.into());
1788
1789 if addresses.iter().any(|a| address_without_peer_id == *a) {
1793 warn!(
1794 "💔 The bootnode you want to connect to at `{address}` provided a \
1795 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1796 );
1797
1798 self.reported_invalid_boot_nodes.insert(peer_id);
1799 }
1800 }
1801 }
1802 }
1803 }
1804
1805 if let Some(metrics) = self.metrics.as_ref() {
1806 let reason = match error {
1807 DialError::Denied { cause } => {
1808 if cause.downcast::<Exceeded>().is_ok() {
1809 Some("limit-reached")
1810 } else {
1811 None
1812 }
1813 },
1814 DialError::LocalPeerId { .. } => Some("local-peer-id"),
1815 DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1816 DialError::Transport(_) => Some("transport-error"),
1817 DialError::NoAddresses
1818 | DialError::DialPeerConditionFalse(_)
1819 | DialError::Aborted => None, };
1821 if let Some(reason) = reason {
1822 metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1823 }
1824 }
1825 },
1826 SwarmEvent::Dialing { connection_id, peer_id } => {
1827 trace!(target: LOG_TARGET, "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1828 },
1829 SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1830 trace!(target: LOG_TARGET, "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1831 if let Some(metrics) = self.metrics.as_ref() {
1832 metrics.incoming_connections_total.inc();
1833 }
1834 },
1835 SwarmEvent::IncomingConnectionError {
1836 connection_id,
1837 local_addr,
1838 send_back_addr,
1839 error,
1840 } => {
1841 debug!(
1842 target: LOG_TARGET,
1843 "Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1844 );
1845 if let Some(metrics) = self.metrics.as_ref() {
1846 let reason = match error {
1847 ListenError::Denied { cause } => {
1848 if cause.downcast::<Exceeded>().is_ok() {
1849 Some("limit-reached")
1850 } else {
1851 None
1852 }
1853 },
1854 ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => {
1855 Some("invalid-peer-id")
1856 },
1857 ListenError::Transport(_) => Some("transport-error"),
1858 ListenError::Aborted => None, };
1860
1861 if let Some(reason) = reason {
1862 metrics
1863 .incoming_connections_errors_total
1864 .with_label_values(&[reason])
1865 .inc();
1866 }
1867 }
1868 },
1869 SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1870 if let Some(metrics) = self.metrics.as_ref() {
1871 metrics.listeners_local_addresses.sub(addresses.len() as u64);
1872 }
1873 let mut listen_addresses = self.listen_addresses.lock();
1874 for addr in &addresses {
1875 listen_addresses.remove(addr);
1876 }
1877 drop(listen_addresses);
1878
1879 let addrs =
1880 addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1881 match reason {
1882 Ok(()) => error!(
1883 target: LOG_TARGET,
1884 "📪 Libp2p listener ({}) closed gracefully",
1885 addrs
1886 ),
1887 Err(e) => error!(
1888 target: LOG_TARGET,
1889 "📪 Libp2p listener ({}) closed: {}",
1890 addrs, e
1891 ),
1892 }
1893 },
1894 SwarmEvent::ListenerError { error, .. } => {
1895 debug!(target: LOG_TARGET, "Libp2p => ListenerError: {}", error);
1896 if let Some(metrics) = self.metrics.as_ref() {
1897 metrics.listeners_errors_total.inc();
1898 }
1899 },
1900 SwarmEvent::NewExternalAddrCandidate { address } => {
1901 trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrCandidate: {address:?}");
1902 },
1903 SwarmEvent::ExternalAddrConfirmed { address } => {
1904 trace!(target: LOG_TARGET, "Libp2p => ExternalAddrConfirmed: {address:?}");
1905 },
1906 SwarmEvent::ExternalAddrExpired { address } => {
1907 trace!(target: LOG_TARGET, "Libp2p => ExternalAddrExpired: {address:?}");
1908 },
1909 SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
1910 trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}")
1911 },
1912 event => {
1913 warn!(target: LOG_TARGET, "New unknown SwarmEvent libp2p event: {event:?}");
1914 },
1915 }
1916 }
1917}
1918
1919impl<B, H> Unpin for NetworkWorker<B, H>
1920where
1921 B: BlockT + 'static,
1922 H: ExHashT,
1923{
1924}
1925
1926pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1927 addresses: impl Iterator<Item = &'a crate::types::multiaddr::Multiaddr>,
1928 transport: &TransportConfig,
1929) -> Result<(), Error> {
1930 use crate::types::multiaddr::Protocol;
1931
1932 if matches!(transport, TransportConfig::MemoryOnly) {
1933 let addresses: Vec<_> = addresses
1934 .filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1935 .cloned()
1936 .collect();
1937
1938 if !addresses.is_empty() {
1939 return Err(Error::AddressesForAnotherTransport {
1940 transport: transport.clone(),
1941 addresses,
1942 });
1943 }
1944 } else {
1945 let addresses: Vec<_> = addresses
1946 .filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1947 .cloned()
1948 .collect();
1949
1950 if !addresses.is_empty() {
1951 return Err(Error::AddressesForAnotherTransport {
1952 transport: transport.clone(),
1953 addresses,
1954 });
1955 }
1956 }
1957
1958 Ok(())
1959}