1use crate::{
31 behaviour::{self, Behaviour, BehaviourOut},
32 bitswap::BitswapRequestHandler,
33 config::{
34 parse_addr, FullNetworkConfiguration, IncomingRequest, MultiaddrWithPeerId,
35 NonDefaultSetConfig, NotificationHandshake, Params, SetConfig, TransportConfig,
36 },
37 discovery::DiscoveryConfig,
38 error::Error,
39 event::{DhtEvent, Event},
40 network_state::{
41 NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
42 },
43 peer_store::{PeerStore, PeerStoreProvider},
44 protocol::{self, Protocol, Ready},
45 protocol_controller::{self, ProtoSetConfig, ProtocolController, SetId},
46 request_responses::{IfDisconnected, ProtocolConfig as RequestResponseConfig, RequestFailure},
47 service::{
48 signature::{Signature, SigningError},
49 traits::{
50 BandwidthSink, NetworkBackend, NetworkDHTProvider, NetworkEventStream, NetworkPeers,
51 NetworkRequest, NetworkService as NetworkServiceT, NetworkSigner, NetworkStateInfo,
52 NetworkStatus, NetworkStatusProvider, NotificationSender as NotificationSenderT,
53 NotificationSenderError, NotificationSenderReady as NotificationSenderReadyT,
54 },
55 },
56 transport,
57 types::ProtocolName,
58 NotificationService, ReputationChange,
59};
60
61use codec::DecodeAll;
62use futures::{channel::oneshot, prelude::*};
63use libp2p::{
64 connection_limits::{ConnectionLimits, Exceeded},
65 core::{upgrade, ConnectedPoint, Endpoint},
66 identify::Info as IdentifyInfo,
67 identity::ed25519,
68 multiaddr::{self, Multiaddr},
69 swarm::{
70 Config as SwarmConfig, ConnectionError, ConnectionId, DialError, Executor, ListenError,
71 NetworkBehaviour, Swarm, SwarmEvent,
72 },
73 PeerId,
74};
75use log::{debug, error, info, trace, warn};
76use metrics::{Histogram, MetricSources, Metrics};
77use parking_lot::Mutex;
78use prometheus_endpoint::Registry;
79use sc_network_types::kad::{Key as KademliaKey, Record};
80
81use sc_client_api::BlockBackend;
82use sc_network_common::{
83 role::{ObservedRole, Roles},
84 ExHashT,
85};
86use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
87use sp_runtime::traits::Block as BlockT;
88
89pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure};
90pub use libp2p::identity::{DecodingError, Keypair, PublicKey};
91pub use metrics::NotificationMetrics;
92pub use protocol::NotificationsSink;
93use std::{
94 collections::{HashMap, HashSet},
95 fs, iter,
96 marker::PhantomData,
97 num::NonZeroUsize,
98 pin::Pin,
99 str,
100 sync::{
101 atomic::{AtomicUsize, Ordering},
102 Arc,
103 },
104 time::{Duration, Instant},
105};
106
107pub(crate) mod metrics;
108pub(crate) mod out_events;
109
110pub mod signature;
111pub mod traits;
112
113const LOG_TARGET: &str = "sub-libp2p";
115
116struct Libp2pBandwidthSink {
117 #[allow(deprecated)]
118 sink: Arc<transport::BandwidthSinks>,
119}
120
121impl BandwidthSink for Libp2pBandwidthSink {
122 fn total_inbound(&self) -> u64 {
123 self.sink.total_inbound()
124 }
125
126 fn total_outbound(&self) -> u64 {
127 self.sink.total_outbound()
128 }
129}
130
131pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
133 num_connected: Arc<AtomicUsize>,
135 external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
137 listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
139 local_peer_id: PeerId,
141 local_identity: Keypair,
143 bandwidth: Arc<dyn BandwidthSink>,
145 to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
147 notification_protocol_ids: HashMap<ProtocolName, SetId>,
150 protocol_handles: Vec<protocol_controller::ProtocolHandle>,
153 sync_protocol_handle: protocol_controller::ProtocolHandle,
155 peer_store_handle: Arc<dyn PeerStoreProvider>,
157 _marker: PhantomData<H>,
160 _block: PhantomData<B>,
162}
163
164#[async_trait::async_trait]
165impl<B, H> NetworkBackend<B, H> for NetworkWorker<B, H>
166where
167 B: BlockT + 'static,
168 H: ExHashT,
169{
170 type NotificationProtocolConfig = NonDefaultSetConfig;
171 type RequestResponseProtocolConfig = RequestResponseConfig;
172 type NetworkService<Block, Hash> = Arc<NetworkService<B, H>>;
173 type PeerStore = PeerStore;
174 type BitswapConfig = RequestResponseConfig;
175
176 fn new(params: Params<B, H, Self>) -> Result<Self, Error>
177 where
178 Self: Sized,
179 {
180 NetworkWorker::new(params)
181 }
182
183 fn network_service(&self) -> Arc<dyn NetworkServiceT> {
185 self.service.clone()
186 }
187
188 fn peer_store(
190 bootnodes: Vec<sc_network_types::PeerId>,
191 metrics_registry: Option<Registry>,
192 ) -> Self::PeerStore {
193 PeerStore::new(bootnodes.into_iter().map(From::from).collect(), metrics_registry)
194 }
195
196 fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics {
197 NotificationMetrics::new(registry)
198 }
199
200 fn bitswap_server(
201 client: Arc<dyn BlockBackend<B> + Send + Sync>,
202 ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig) {
203 let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
204
205 (Box::pin(async move { handler.run().await }), protocol_config)
206 }
207
208 fn notification_config(
210 protocol_name: ProtocolName,
211 fallback_names: Vec<ProtocolName>,
212 max_notification_size: u64,
213 handshake: Option<NotificationHandshake>,
214 set_config: SetConfig,
215 _metrics: NotificationMetrics,
216 _peerstore_handle: Arc<dyn PeerStoreProvider>,
217 ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>) {
218 NonDefaultSetConfig::new(
219 protocol_name,
220 fallback_names,
221 max_notification_size,
222 handshake,
223 set_config,
224 )
225 }
226
227 fn request_response_config(
229 protocol_name: ProtocolName,
230 fallback_names: Vec<ProtocolName>,
231 max_request_size: u64,
232 max_response_size: u64,
233 request_timeout: Duration,
234 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
235 ) -> Self::RequestResponseProtocolConfig {
236 Self::RequestResponseProtocolConfig {
237 name: protocol_name,
238 fallback_names,
239 max_request_size,
240 max_response_size,
241 request_timeout,
242 inbound_queue,
243 }
244 }
245
246 async fn run(mut self) {
248 self.run().await
249 }
250}
251
252impl<B, H> NetworkWorker<B, H>
253where
254 B: BlockT + 'static,
255 H: ExHashT,
256{
257 pub fn new(params: Params<B, H, Self>) -> Result<Self, Error> {
263 let peer_store_handle = params.network_config.peer_store_handle();
264 let FullNetworkConfiguration {
265 notification_protocols,
266 request_response_protocols,
267 mut network_config,
268 ..
269 } = params.network_config;
270
271 let local_identity = network_config.node_key.clone().into_keypair()?;
273 let local_public = local_identity.public();
274 let local_peer_id = local_public.to_peer_id();
275
276 let local_identity: ed25519::Keypair = local_identity.into();
278 let local_public: ed25519::PublicKey = local_public.into();
279 let local_peer_id: PeerId = local_peer_id.into();
280
281 network_config.boot_nodes = network_config
282 .boot_nodes
283 .into_iter()
284 .filter(|boot_node| boot_node.peer_id != local_peer_id.into())
285 .collect();
286 network_config.default_peers_set.reserved_nodes = network_config
287 .default_peers_set
288 .reserved_nodes
289 .into_iter()
290 .filter(|reserved_node| {
291 if reserved_node.peer_id == local_peer_id.into() {
292 warn!(
293 target: LOG_TARGET,
294 "Local peer ID used in reserved node, ignoring: {}",
295 reserved_node,
296 );
297 false
298 } else {
299 true
300 }
301 })
302 .collect();
303
304 ensure_addresses_consistent_with_transport(
306 network_config.listen_addresses.iter(),
307 &network_config.transport,
308 )?;
309 ensure_addresses_consistent_with_transport(
310 network_config.boot_nodes.iter().map(|x| &x.multiaddr),
311 &network_config.transport,
312 )?;
313 ensure_addresses_consistent_with_transport(
314 network_config.default_peers_set.reserved_nodes.iter().map(|x| &x.multiaddr),
315 &network_config.transport,
316 )?;
317 for notification_protocol in ¬ification_protocols {
318 ensure_addresses_consistent_with_transport(
319 notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
320 &network_config.transport,
321 )?;
322 }
323 ensure_addresses_consistent_with_transport(
324 network_config.public_addresses.iter(),
325 &network_config.transport,
326 )?;
327
328 let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);
329
330 if let Some(path) = &network_config.net_config_path {
331 fs::create_dir_all(path)?;
332 }
333
334 info!(
335 target: LOG_TARGET,
336 "🏷 Local node identity is: {}",
337 local_peer_id.to_base58(),
338 );
339 info!(target: LOG_TARGET, "Running libp2p network backend");
340
341 let (transport, bandwidth) = {
342 let config_mem = match network_config.transport {
343 TransportConfig::MemoryOnly => true,
344 TransportConfig::Normal { .. } => false,
345 };
346
347 transport::build_transport(local_identity.clone().into(), config_mem)
348 };
349
350 let (to_notifications, from_protocol_controllers) =
351 tracing_unbounded("mpsc_protocol_controllers_to_notifications", 10_000);
352
353 let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
355 .chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
356
357 let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
358 .enumerate()
359 .map(|(set_id, set_config)| {
360 let proto_set_config = ProtoSetConfig {
361 in_peers: set_config.in_peers,
362 out_peers: set_config.out_peers,
363 reserved_nodes: set_config
364 .reserved_nodes
365 .iter()
366 .map(|node| node.peer_id.into())
367 .collect(),
368 reserved_only: set_config.non_reserved_mode.is_reserved_only(),
369 };
370
371 ProtocolController::new(
372 SetId::from(set_id),
373 proto_set_config,
374 to_notifications.clone(),
375 Arc::clone(&peer_store_handle),
376 )
377 })
378 .unzip();
379
380 let sync_protocol_handle = protocol_handles[0].clone();
382
383 protocol_controllers
385 .into_iter()
386 .for_each(|controller| (params.executor)(controller.run().boxed()));
387
388 let notification_protocol_ids: HashMap<ProtocolName, SetId> =
391 iter::once(¶ms.block_announce_config)
392 .chain(notification_protocols.iter())
393 .enumerate()
394 .map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
395 .collect();
396
397 let known_addresses = {
398 let mut addresses: Vec<_> = network_config
400 .default_peers_set
401 .reserved_nodes
402 .iter()
403 .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
404 .chain(notification_protocols.iter().flat_map(|protocol| {
405 protocol
406 .set_config()
407 .reserved_nodes
408 .iter()
409 .map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
410 }))
411 .chain(
412 network_config
413 .boot_nodes
414 .iter()
415 .map(|bootnode| (bootnode.peer_id, bootnode.multiaddr.clone())),
416 )
417 .collect();
418
419 addresses.sort();
421 addresses.dedup();
422
423 addresses
424 };
425
426 network_config.boot_nodes.iter().try_for_each(|bootnode| {
428 if let Some(other) = network_config
429 .boot_nodes
430 .iter()
431 .filter(|o| o.multiaddr == bootnode.multiaddr)
432 .find(|o| o.peer_id != bootnode.peer_id)
433 {
434 Err(Error::DuplicateBootnode {
435 address: bootnode.multiaddr.clone().into(),
436 first_id: bootnode.peer_id.into(),
437 second_id: other.peer_id.into(),
438 })
439 } else {
440 Ok(())
441 }
442 })?;
443
444 let mut boot_node_ids = HashMap::<PeerId, Vec<Multiaddr>>::new();
446
447 for bootnode in network_config.boot_nodes.iter() {
448 boot_node_ids
449 .entry(bootnode.peer_id.into())
450 .or_default()
451 .push(bootnode.multiaddr.clone().into());
452 }
453
454 let boot_node_ids = Arc::new(boot_node_ids);
455
456 let num_connected = Arc::new(AtomicUsize::new(0));
457 let external_addresses = Arc::new(Mutex::new(HashSet::new()));
458
459 let (protocol, notif_protocol_handles) = Protocol::new(
460 From::from(¶ms.role),
461 params.notification_metrics,
462 notification_protocols,
463 params.block_announce_config,
464 Arc::clone(&peer_store_handle),
465 protocol_handles.clone(),
466 from_protocol_controllers,
467 )?;
468
469 let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
471 let user_agent =
472 format!("{} ({})", network_config.client_version, network_config.node_name);
473
474 let discovery_config = {
475 let mut config = DiscoveryConfig::new(local_peer_id);
476 config.with_permanent_addresses(
477 known_addresses
478 .iter()
479 .map(|(peer, address)| (peer.into(), address.clone().into()))
480 .collect::<Vec<_>>(),
481 );
482 config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15);
483 config.with_kademlia(
484 params.genesis_hash,
485 params.fork_id.as_deref(),
486 ¶ms.protocol_id,
487 );
488 config.with_dht_random_walk(network_config.enable_dht_random_walk);
489 config.allow_non_globals_in_dht(network_config.allow_non_globals_in_dht);
490 config.use_kademlia_disjoint_query_paths(
491 network_config.kademlia_disjoint_query_paths,
492 );
493 config.with_kademlia_replication_factor(network_config.kademlia_replication_factor);
494
495 match network_config.transport {
496 TransportConfig::MemoryOnly => {
497 config.with_mdns(false);
498 config.allow_private_ip(false);
499 },
500 TransportConfig::Normal {
501 enable_mdns,
502 allow_private_ip: allow_private_ipv4,
503 ..
504 } => {
505 config.with_mdns(enable_mdns);
506 config.allow_private_ip(allow_private_ipv4);
507 },
508 }
509
510 config
511 };
512
513 let behaviour = {
514 let result = Behaviour::new(
515 protocol,
516 user_agent,
517 local_public.into(),
518 discovery_config,
519 request_response_protocols,
520 Arc::clone(&peer_store_handle),
521 external_addresses.clone(),
522 network_config.public_addresses.iter().cloned().map(Into::into).collect(),
523 ConnectionLimits::default()
524 .with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
525 .with_max_established_incoming(Some(
526 crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING,
527 )),
528 );
529
530 match result {
531 Ok(b) => b,
532 Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => {
533 return Err(Error::DuplicateRequestResponseProtocol { protocol: proto })
534 },
535 }
536 };
537
538 let swarm = {
539 struct SpawnImpl<F>(F);
540 impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
541 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
542 (self.0)(f)
543 }
544 }
545
546 let config = SwarmConfig::with_executor(SpawnImpl(params.executor))
547 .with_substream_upgrade_protocol_override(upgrade::Version::V1)
548 .with_notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed"))
549 .with_per_connection_event_buffer_size(24)
552 .with_max_negotiating_inbound_streams(2048)
553 .with_idle_connection_timeout(network_config.idle_connection_timeout);
554
555 Swarm::new(transport, behaviour, local_peer_id, config)
556 };
557
558 (swarm, Arc::new(Libp2pBandwidthSink { sink: bandwidth }))
559 };
560
561 let metrics = match ¶ms.metrics_registry {
563 Some(registry) => Some(metrics::register(
564 registry,
565 MetricSources {
566 bandwidth: bandwidth.clone(),
567 connected_peers: num_connected.clone(),
568 },
569 )?),
570 None => None,
571 };
572
573 for addr in &network_config.listen_addresses {
575 if let Err(err) = Swarm::<Behaviour<B>>::listen_on(&mut swarm, addr.clone().into()) {
576 warn!(target: LOG_TARGET, "Can't listen on {} because: {:?}", addr, err)
577 }
578 }
579
580 for addr in &network_config.public_addresses {
582 Swarm::<Behaviour<B>>::add_external_address(&mut swarm, addr.clone().into());
583 }
584
585 let listen_addresses_set = Arc::new(Mutex::new(HashSet::new()));
586
587 let service = Arc::new(NetworkService {
588 bandwidth,
589 external_addresses,
590 listen_addresses: listen_addresses_set.clone(),
591 num_connected: num_connected.clone(),
592 local_peer_id,
593 local_identity: local_identity.into(),
594 to_worker,
595 notification_protocol_ids,
596 protocol_handles,
597 sync_protocol_handle,
598 peer_store_handle: Arc::clone(&peer_store_handle),
599 _marker: PhantomData,
600 _block: Default::default(),
601 });
602
603 Ok(NetworkWorker {
604 listen_addresses: listen_addresses_set,
605 num_connected,
606 network_service: swarm,
607 service,
608 from_service,
609 event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
610 metrics,
611 boot_node_ids,
612 reported_invalid_boot_nodes: Default::default(),
613 peer_store_handle: Arc::clone(&peer_store_handle),
614 notif_protocol_handles,
615 _marker: Default::default(),
616 _block: Default::default(),
617 })
618 }
619
620 pub fn status(&self) -> NetworkStatus {
622 NetworkStatus {
623 num_connected_peers: self.num_connected_peers(),
624 total_bytes_inbound: self.total_bytes_inbound(),
625 total_bytes_outbound: self.total_bytes_outbound(),
626 }
627 }
628
629 pub fn total_bytes_inbound(&self) -> u64 {
631 self.service.bandwidth.total_inbound()
632 }
633
634 pub fn total_bytes_outbound(&self) -> u64 {
636 self.service.bandwidth.total_outbound()
637 }
638
639 pub fn num_connected_peers(&self) -> usize {
641 self.network_service.behaviour().user_protocol().num_sync_peers()
642 }
643
644 pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
646 self.network_service.behaviour_mut().add_known_address(peer_id, addr);
647 }
648
649 pub fn service(&self) -> &Arc<NetworkService<B, H>> {
652 &self.service
653 }
654
655 pub fn local_peer_id(&self) -> &PeerId {
657 Swarm::<Behaviour<B>>::local_peer_id(&self.network_service)
658 }
659
660 pub fn listen_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
664 Swarm::<Behaviour<B>>::listeners(&self.network_service)
665 }
666
667 pub fn network_state(&mut self) -> NetworkState {
672 let swarm = &mut self.network_service;
673 let open = swarm.behaviour_mut().user_protocol().open_peers().cloned().collect::<Vec<_>>();
674 let connected_peers = {
675 let swarm = &mut *swarm;
676 open.iter()
677 .filter_map(move |peer_id| {
678 let known_addresses = if let Ok(addrs) =
679 NetworkBehaviour::handle_pending_outbound_connection(
680 swarm.behaviour_mut(),
681 ConnectionId::new_unchecked(0), Some(*peer_id),
683 &vec![],
684 Endpoint::Listener,
685 ) {
686 addrs.into_iter().collect()
687 } else {
688 error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
689 return None;
690 };
691
692 let endpoint = if let Some(e) =
693 swarm.behaviour_mut().node(peer_id).and_then(|i| i.endpoint())
694 {
695 e.clone().into()
696 } else {
697 error!(target: LOG_TARGET, "Found state inconsistency between custom protocol \
698 and debug information about {:?}", peer_id);
699 return None;
700 };
701
702 Some((
703 peer_id.to_base58(),
704 NetworkStatePeer {
705 endpoint,
706 version_string: swarm
707 .behaviour_mut()
708 .node(peer_id)
709 .and_then(|i| i.client_version().map(|s| s.to_owned())),
710 latest_ping_time: swarm
711 .behaviour_mut()
712 .node(peer_id)
713 .and_then(|i| i.latest_ping()),
714 known_addresses,
715 },
716 ))
717 })
718 .collect()
719 };
720
721 let not_connected_peers = {
722 let swarm = &mut *swarm;
723 swarm
724 .behaviour_mut()
725 .known_peers()
726 .into_iter()
727 .filter(|p| open.iter().all(|n| n != p))
728 .map(move |peer_id| {
729 let known_addresses = if let Ok(addrs) =
730 NetworkBehaviour::handle_pending_outbound_connection(
731 swarm.behaviour_mut(),
732 ConnectionId::new_unchecked(0), Some(peer_id),
734 &vec![],
735 Endpoint::Listener,
736 ) {
737 addrs.into_iter().collect()
738 } else {
739 error!(target: LOG_TARGET, "Was not able to get known addresses for {:?}", peer_id);
740 Default::default()
741 };
742
743 (
744 peer_id.to_base58(),
745 NetworkStateNotConnectedPeer {
746 version_string: swarm
747 .behaviour_mut()
748 .node(&peer_id)
749 .and_then(|i| i.client_version().map(|s| s.to_owned())),
750 latest_ping_time: swarm
751 .behaviour_mut()
752 .node(&peer_id)
753 .and_then(|i| i.latest_ping()),
754 known_addresses,
755 },
756 )
757 })
758 .collect()
759 };
760
761 let peer_id = Swarm::<Behaviour<B>>::local_peer_id(swarm).to_base58();
762 let listened_addresses = swarm.listeners().cloned().collect();
763 let external_addresses = swarm.external_addresses().cloned().collect();
764
765 NetworkState {
766 peer_id,
767 listened_addresses,
768 external_addresses,
769 connected_peers,
770 not_connected_peers,
771 peerset: serde_json::json!(
774 "Unimplemented. See https://github.com/paritytech/substrate/issues/14160."
775 ),
776 }
777 }
778
779 pub fn remove_reserved_peer(&self, peer: PeerId) {
781 self.service.remove_reserved_peer(peer.into());
782 }
783
784 pub fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
786 self.service.add_reserved_peer(peer)
787 }
788}
789
790impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
791 pub async fn network_state(&self) -> Result<NetworkState, ()> {
798 let (tx, rx) = oneshot::channel();
799
800 let _ = self
801 .to_worker
802 .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
803
804 match rx.await {
805 Ok(v) => v.map_err(|_| ()),
806 Err(_) => Err(()),
808 }
809 }
810
811 fn split_multiaddr_and_peer_id(
816 &self,
817 peers: HashSet<Multiaddr>,
818 ) -> Result<Vec<(PeerId, Multiaddr)>, String> {
819 peers
820 .into_iter()
821 .map(|mut addr| {
822 let peer = match addr.pop() {
823 Some(multiaddr::Protocol::P2p(peer_id)) => peer_id,
824 _ => return Err("Missing PeerId from address".to_string()),
825 };
826
827 if peer == self.local_peer_id {
830 Err("Local peer ID in peer set.".to_string())
831 } else {
832 Ok((peer, addr))
833 }
834 })
835 .collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()
836 }
837}
838
839impl<B, H> NetworkStateInfo for NetworkService<B, H>
840where
841 B: sp_runtime::traits::Block,
842 H: ExHashT,
843{
844 fn external_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
846 self.external_addresses.lock().iter().cloned().map(Into::into).collect()
847 }
848
849 fn listen_addresses(&self) -> Vec<sc_network_types::multiaddr::Multiaddr> {
851 self.listen_addresses.lock().iter().cloned().map(Into::into).collect()
852 }
853
854 fn local_peer_id(&self) -> sc_network_types::PeerId {
856 self.local_peer_id.into()
857 }
858}
859
860impl<B, H> NetworkSigner for NetworkService<B, H>
861where
862 B: sp_runtime::traits::Block,
863 H: ExHashT,
864{
865 fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
866 let public_key = self.local_identity.public();
867 let bytes = self.local_identity.sign(msg.as_ref())?;
868
869 Ok(Signature {
870 public_key: crate::service::signature::PublicKey::Libp2p(public_key),
871 bytes,
872 })
873 }
874
875 fn verify(
876 &self,
877 peer_id: sc_network_types::PeerId,
878 public_key: &Vec<u8>,
879 signature: &Vec<u8>,
880 message: &Vec<u8>,
881 ) -> Result<bool, String> {
882 let public_key =
883 PublicKey::try_decode_protobuf(&public_key).map_err(|error| error.to_string())?;
884 let peer_id: PeerId = peer_id.into();
885 let remote: libp2p::PeerId = public_key.to_peer_id();
886
887 Ok(peer_id == remote && public_key.verify(message, signature))
888 }
889}
890
891impl<B, H> NetworkDHTProvider for NetworkService<B, H>
892where
893 B: BlockT + 'static,
894 H: ExHashT,
895{
896 fn find_closest_peers(&self, target: sc_network_types::PeerId) {
901 let _ = self
902 .to_worker
903 .unbounded_send(ServiceToWorkerMsg::FindClosestPeers(target.into()));
904 }
905
906 fn get_value(&self, key: &KademliaKey) {
911 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
912 }
913
914 fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
919 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
920 }
921
922 fn put_record_to(
923 &self,
924 record: Record,
925 peers: HashSet<sc_network_types::PeerId>,
926 update_local_storage: bool,
927 ) {
928 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutRecordTo {
929 record,
930 peers,
931 update_local_storage,
932 });
933 }
934
935 fn store_record(
936 &self,
937 key: KademliaKey,
938 value: Vec<u8>,
939 publisher: Option<sc_network_types::PeerId>,
940 expires: Option<Instant>,
941 ) {
942 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord(
943 key,
944 value,
945 publisher.map(Into::into),
946 expires,
947 ));
948 }
949
950 fn start_providing(&self, key: KademliaKey) {
951 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key));
952 }
953
954 fn stop_providing(&self, key: KademliaKey) {
955 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key));
956 }
957
958 fn get_providers(&self, key: KademliaKey) {
959 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key));
960 }
961}
962
963#[async_trait::async_trait]
964impl<B, H> NetworkStatusProvider for NetworkService<B, H>
965where
966 B: BlockT + 'static,
967 H: ExHashT,
968{
969 async fn status(&self) -> Result<NetworkStatus, ()> {
970 let (tx, rx) = oneshot::channel();
971
972 let _ = self
973 .to_worker
974 .unbounded_send(ServiceToWorkerMsg::NetworkStatus { pending_response: tx });
975
976 match rx.await {
977 Ok(v) => v.map_err(|_| ()),
978 Err(_) => Err(()),
980 }
981 }
982
983 async fn network_state(&self) -> Result<NetworkState, ()> {
984 let (tx, rx) = oneshot::channel();
985
986 let _ = self
987 .to_worker
988 .unbounded_send(ServiceToWorkerMsg::NetworkState { pending_response: tx });
989
990 match rx.await {
991 Ok(v) => v.map_err(|_| ()),
992 Err(_) => Err(()),
994 }
995 }
996}
997
998#[async_trait::async_trait]
999impl<B, H> NetworkPeers for NetworkService<B, H>
1000where
1001 B: BlockT + 'static,
1002 H: ExHashT,
1003{
1004 fn set_authorized_peers(&self, peers: HashSet<sc_network_types::PeerId>) {
1005 self.sync_protocol_handle
1006 .set_reserved_peers(peers.iter().map(|peer| (*peer).into()).collect());
1007 }
1008
1009 fn set_authorized_only(&self, reserved_only: bool) {
1010 self.sync_protocol_handle.set_reserved_only(reserved_only);
1011 }
1012
1013 fn add_known_address(
1014 &self,
1015 peer_id: sc_network_types::PeerId,
1016 addr: sc_network_types::multiaddr::Multiaddr,
1017 ) {
1018 let _ = self
1019 .to_worker
1020 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id.into(), addr.into()));
1021 }
1022
1023 fn report_peer(&self, peer_id: sc_network_types::PeerId, cost_benefit: ReputationChange) {
1024 self.peer_store_handle.report_peer(peer_id, cost_benefit);
1025 }
1026
1027 fn peer_reputation(&self, peer_id: &sc_network_types::PeerId) -> i32 {
1028 self.peer_store_handle.peer_reputation(peer_id)
1029 }
1030
1031 fn disconnect_peer(&self, peer_id: sc_network_types::PeerId, protocol: ProtocolName) {
1032 let _ = self
1033 .to_worker
1034 .unbounded_send(ServiceToWorkerMsg::DisconnectPeer(peer_id.into(), protocol));
1035 }
1036
1037 fn accept_unreserved_peers(&self) {
1038 self.sync_protocol_handle.set_reserved_only(false);
1039 }
1040
1041 fn deny_unreserved_peers(&self) {
1042 self.sync_protocol_handle.set_reserved_only(true);
1043 }
1044
1045 fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
1046 if peer.peer_id == self.local_peer_id.into() {
1048 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1049 }
1050
1051 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(
1052 peer.peer_id.into(),
1053 peer.multiaddr.into(),
1054 ));
1055 self.sync_protocol_handle.add_reserved_peer(peer.peer_id.into());
1056
1057 Ok(())
1058 }
1059
1060 fn remove_reserved_peer(&self, peer_id: sc_network_types::PeerId) {
1061 self.sync_protocol_handle.remove_reserved_peer(peer_id.into());
1062 }
1063
1064 fn set_reserved_peers(
1065 &self,
1066 protocol: ProtocolName,
1067 peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1068 ) -> Result<(), String> {
1069 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1070 return Err(format!("Cannot set reserved peers for unknown protocol: {}", protocol));
1071 };
1072
1073 let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1074 let peers_addrs = self.split_multiaddr_and_peer_id(peers)?;
1075
1076 let mut peers: HashSet<PeerId> = HashSet::with_capacity(peers_addrs.len());
1077
1078 for (peer_id, addr) in peers_addrs.into_iter() {
1079 if peer_id == self.local_peer_id {
1081 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1082 }
1083
1084 peers.insert(peer_id.into());
1085
1086 if !addr.is_empty() {
1087 let _ = self
1088 .to_worker
1089 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1090 }
1091 }
1092
1093 self.protocol_handles[usize::from(*set_id)].set_reserved_peers(peers);
1094
1095 Ok(())
1096 }
1097
1098 fn add_peers_to_reserved_set(
1099 &self,
1100 protocol: ProtocolName,
1101 peers: HashSet<sc_network_types::multiaddr::Multiaddr>,
1102 ) -> Result<(), String> {
1103 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1104 return Err(format!(
1105 "Cannot add peers to reserved set of unknown protocol: {}",
1106 protocol
1107 ));
1108 };
1109
1110 let peers: HashSet<Multiaddr> = peers.into_iter().map(Into::into).collect();
1111 let peers = self.split_multiaddr_and_peer_id(peers)?;
1112
1113 for (peer_id, addr) in peers.into_iter() {
1114 if peer_id == self.local_peer_id {
1116 return Err("Local peer ID cannot be added as a reserved peer.".to_string());
1117 }
1118
1119 if !addr.is_empty() {
1120 let _ = self
1121 .to_worker
1122 .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
1123 }
1124
1125 self.protocol_handles[usize::from(*set_id)].add_reserved_peer(peer_id);
1126 }
1127
1128 Ok(())
1129 }
1130
1131 fn remove_peers_from_reserved_set(
1132 &self,
1133 protocol: ProtocolName,
1134 peers: Vec<sc_network_types::PeerId>,
1135 ) -> Result<(), String> {
1136 let Some(set_id) = self.notification_protocol_ids.get(&protocol) else {
1137 return Err(format!(
1138 "Cannot remove peers from reserved set of unknown protocol: {}",
1139 protocol
1140 ));
1141 };
1142
1143 for peer_id in peers.into_iter() {
1144 self.protocol_handles[usize::from(*set_id)].remove_reserved_peer(peer_id.into());
1145 }
1146
1147 Ok(())
1148 }
1149
1150 fn sync_num_connected(&self) -> usize {
1151 self.num_connected.load(Ordering::Relaxed)
1152 }
1153
1154 fn peer_role(
1155 &self,
1156 peer_id: sc_network_types::PeerId,
1157 handshake: Vec<u8>,
1158 ) -> Option<ObservedRole> {
1159 match Roles::decode_all(&mut &handshake[..]) {
1160 Ok(role) => Some(role.into()),
1161 Err(_) => {
1162 log::debug!(target: LOG_TARGET, "handshake doesn't contain peer role: {handshake:?}");
1163 self.peer_store_handle.peer_role(&(peer_id.into()))
1164 },
1165 }
1166 }
1167
1168 async fn reserved_peers(&self) -> Result<Vec<sc_network_types::PeerId>, ()> {
1172 let (tx, rx) = oneshot::channel();
1173
1174 self.sync_protocol_handle.reserved_peers(tx);
1175
1176 rx.await
1178 .map(|peers| peers.into_iter().map(From::from).collect())
1179 .map_err(|_| ())
1180 }
1181}
1182
1183impl<B, H> NetworkEventStream for NetworkService<B, H>
1184where
1185 B: BlockT + 'static,
1186 H: ExHashT,
1187{
1188 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
1189 let (tx, rx) = out_events::channel(name, 100_000);
1190 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
1191 Box::pin(rx)
1192 }
1193}
1194
1195#[async_trait::async_trait]
1196impl<B, H> NetworkRequest for NetworkService<B, H>
1197where
1198 B: BlockT + 'static,
1199 H: ExHashT,
1200{
1201 async fn request(
1202 &self,
1203 target: sc_network_types::PeerId,
1204 protocol: ProtocolName,
1205 request: Vec<u8>,
1206 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1207 connect: IfDisconnected,
1208 ) -> Result<(Vec<u8>, ProtocolName), RequestFailure> {
1209 let (tx, rx) = oneshot::channel();
1210
1211 self.start_request(target.into(), protocol, request, fallback_request, tx, connect);
1212
1213 match rx.await {
1214 Ok(v) => v,
1215 Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)),
1219 }
1220 }
1221
1222 fn start_request(
1223 &self,
1224 target: sc_network_types::PeerId,
1225 protocol: ProtocolName,
1226 request: Vec<u8>,
1227 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1228 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1229 connect: IfDisconnected,
1230 ) {
1231 let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
1232 target: target.into(),
1233 protocol: protocol.into(),
1234 request,
1235 fallback_request,
1236 pending_response: tx,
1237 connect,
1238 });
1239 }
1240}
1241
1242#[must_use]
1244pub struct NotificationSender {
1245 sink: NotificationsSink,
1246
1247 protocol_name: ProtocolName,
1249
1250 notification_size_metric: Option<Histogram>,
1253}
1254
1255#[async_trait::async_trait]
1256impl NotificationSenderT for NotificationSender {
1257 async fn ready(
1258 &self,
1259 ) -> Result<Box<dyn NotificationSenderReadyT + '_>, NotificationSenderError> {
1260 Ok(Box::new(NotificationSenderReady {
1261 ready: match self.sink.reserve_notification().await {
1262 Ok(r) => Some(r),
1263 Err(()) => return Err(NotificationSenderError::Closed),
1264 },
1265 peer_id: self.sink.peer_id(),
1266 protocol_name: &self.protocol_name,
1267 notification_size_metric: self.notification_size_metric.clone(),
1268 }))
1269 }
1270}
1271
1272#[must_use]
1274pub struct NotificationSenderReady<'a> {
1275 ready: Option<Ready<'a>>,
1276
1277 peer_id: &'a PeerId,
1279
1280 protocol_name: &'a ProtocolName,
1282
1283 notification_size_metric: Option<Histogram>,
1286}
1287
1288impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> {
1289 fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError> {
1290 if let Some(notification_size_metric) = &self.notification_size_metric {
1291 notification_size_metric.observe(notification.len() as f64);
1292 }
1293
1294 trace!(
1295 target: LOG_TARGET,
1296 "External API => Notification({:?}, {}, {} bytes)",
1297 self.peer_id, self.protocol_name, notification.len(),
1298 );
1299 trace!(target: LOG_TARGET, "Handler({:?}) <= Async notification", self.peer_id);
1300
1301 self.ready
1302 .take()
1303 .ok_or(NotificationSenderError::Closed)?
1304 .send(notification)
1305 .map_err(|()| NotificationSenderError::Closed)
1306 }
1307}
1308
1309enum ServiceToWorkerMsg {
1313 FindClosestPeers(PeerId),
1314 GetValue(KademliaKey),
1315 PutValue(KademliaKey, Vec<u8>),
1316 PutRecordTo {
1317 record: Record,
1318 peers: HashSet<sc_network_types::PeerId>,
1319 update_local_storage: bool,
1320 },
1321 StoreRecord(KademliaKey, Vec<u8>, Option<PeerId>, Option<Instant>),
1322 StartProviding(KademliaKey),
1323 StopProviding(KademliaKey),
1324 GetProviders(KademliaKey),
1325 AddKnownAddress(PeerId, Multiaddr),
1326 EventStream(out_events::Sender),
1327 Request {
1328 target: PeerId,
1329 protocol: ProtocolName,
1330 request: Vec<u8>,
1331 fallback_request: Option<(Vec<u8>, ProtocolName)>,
1332 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
1333 connect: IfDisconnected,
1334 },
1335 NetworkStatus {
1336 pending_response: oneshot::Sender<Result<NetworkStatus, RequestFailure>>,
1337 },
1338 NetworkState {
1339 pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
1340 },
1341 DisconnectPeer(PeerId, ProtocolName),
1342}
1343
1344#[must_use = "The NetworkWorker must be polled in order for the network to advance"]
1348pub struct NetworkWorker<B, H>
1349where
1350 B: BlockT + 'static,
1351 H: ExHashT,
1352{
1353 listen_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
1355 num_connected: Arc<AtomicUsize>,
1357 service: Arc<NetworkService<B, H>>,
1359 network_service: Swarm<Behaviour<B>>,
1361 from_service: TracingUnboundedReceiver<ServiceToWorkerMsg>,
1363 event_streams: out_events::OutChannels,
1365 metrics: Option<Metrics>,
1367 boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
1369 reported_invalid_boot_nodes: HashSet<PeerId>,
1371 peer_store_handle: Arc<dyn PeerStoreProvider>,
1373 notif_protocol_handles: Vec<protocol::ProtocolHandle>,
1375 _marker: PhantomData<H>,
1378 _block: PhantomData<B>,
1380}
1381
1382impl<B, H> NetworkWorker<B, H>
1383where
1384 B: BlockT + 'static,
1385 H: ExHashT,
1386{
1387 pub async fn run(mut self) {
1389 while self.next_action().await {}
1390 }
1391
1392 pub async fn next_action(&mut self) -> bool {
1397 futures::select! {
1398 msg = self.from_service.next() => {
1400 if let Some(msg) = msg {
1401 self.handle_worker_message(msg);
1402 } else {
1403 return false
1404 }
1405 },
1406 event = self.network_service.select_next_some() => {
1408 self.handle_swarm_event(event);
1409 },
1410 };
1411
1412 let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
1414 self.num_connected.store(num_connected_peers, Ordering::Relaxed);
1415
1416 if let Some(metrics) = self.metrics.as_ref() {
1417 if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() {
1418 for (lower_ilog2_bucket_bound, num_entries) in buckets {
1419 metrics
1420 .kbuckets_num_nodes
1421 .with_label_values(&[&lower_ilog2_bucket_bound.to_string()])
1422 .set(num_entries as u64);
1423 }
1424 }
1425 if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() {
1426 metrics.kademlia_records_count.set(num_entries as u64);
1427 }
1428 if let Some(num_entries) =
1429 self.network_service.behaviour_mut().kademlia_records_total_size()
1430 {
1431 metrics.kademlia_records_sizes_total.set(num_entries as u64);
1432 }
1433
1434 metrics.pending_connections.set(
1435 Swarm::network_info(&self.network_service).connection_counters().num_pending()
1436 as u64,
1437 );
1438 }
1439
1440 true
1441 }
1442
1443 fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) {
1445 match msg {
1446 ServiceToWorkerMsg::FindClosestPeers(target) => {
1447 self.network_service.behaviour_mut().find_closest_peers(target)
1448 },
1449 ServiceToWorkerMsg::GetValue(key) => {
1450 self.network_service.behaviour_mut().get_value(key.into())
1451 },
1452 ServiceToWorkerMsg::PutValue(key, value) => {
1453 self.network_service.behaviour_mut().put_value(key.into(), value)
1454 },
1455 ServiceToWorkerMsg::PutRecordTo { record, peers, update_local_storage } => self
1456 .network_service
1457 .behaviour_mut()
1458 .put_record_to(record.into(), peers, update_local_storage),
1459 ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self
1460 .network_service
1461 .behaviour_mut()
1462 .store_record(key.into(), value, publisher, expires),
1463 ServiceToWorkerMsg::StartProviding(key) => {
1464 self.network_service.behaviour_mut().start_providing(key.into())
1465 },
1466 ServiceToWorkerMsg::StopProviding(key) => {
1467 self.network_service.behaviour_mut().stop_providing(&key.into())
1468 },
1469 ServiceToWorkerMsg::GetProviders(key) => {
1470 self.network_service.behaviour_mut().get_providers(key.into())
1471 },
1472 ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => {
1473 self.network_service.behaviour_mut().add_known_address(peer_id, addr)
1474 },
1475 ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender),
1476 ServiceToWorkerMsg::Request {
1477 target,
1478 protocol,
1479 request,
1480 fallback_request,
1481 pending_response,
1482 connect,
1483 } => {
1484 self.network_service.behaviour_mut().send_request(
1485 &target,
1486 protocol,
1487 request,
1488 fallback_request,
1489 pending_response,
1490 connect,
1491 );
1492 },
1493 ServiceToWorkerMsg::NetworkStatus { pending_response } => {
1494 let _ = pending_response.send(Ok(self.status()));
1495 },
1496 ServiceToWorkerMsg::NetworkState { pending_response } => {
1497 let _ = pending_response.send(Ok(self.network_state()));
1498 },
1499 ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self
1500 .network_service
1501 .behaviour_mut()
1502 .user_protocol_mut()
1503 .disconnect_peer(&who, protocol_name),
1504 }
1505 }
1506
1507 fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut>) {
1509 match event {
1510 SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => {
1511 if let Some(metrics) = self.metrics.as_ref() {
1512 match result {
1513 Ok(serve_time) => {
1514 metrics
1515 .requests_in_success_total
1516 .with_label_values(&[&protocol])
1517 .observe(serve_time.as_secs_f64());
1518 },
1519 Err(err) => {
1520 let reason = match err {
1521 ResponseFailure::Network(InboundFailure::Timeout) => {
1522 Some("timeout")
1523 },
1524 ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
1525 {
1530 None
1531 },
1532 ResponseFailure::Network(InboundFailure::ResponseOmission) => {
1533 Some("busy-omitted")
1534 },
1535 ResponseFailure::Network(InboundFailure::ConnectionClosed) => {
1536 Some("connection-closed")
1537 },
1538 ResponseFailure::Network(InboundFailure::Io(_)) => Some("io"),
1539 };
1540
1541 if let Some(reason) = reason {
1542 metrics
1543 .requests_in_failure_total
1544 .with_label_values(&[&protocol, reason])
1545 .inc();
1546 }
1547 },
1548 }
1549 }
1550 },
1551 SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
1552 protocol,
1553 duration,
1554 result,
1555 ..
1556 }) => {
1557 if let Some(metrics) = self.metrics.as_ref() {
1558 match result {
1559 Ok(_) => {
1560 metrics
1561 .requests_out_success_total
1562 .with_label_values(&[&protocol])
1563 .observe(duration.as_secs_f64());
1564 },
1565 Err(err) => {
1566 let reason = match err {
1567 RequestFailure::NotConnected => "not-connected",
1568 RequestFailure::UnknownProtocol => "unknown-protocol",
1569 RequestFailure::Refused => "refused",
1570 RequestFailure::Obsolete => "obsolete",
1571 RequestFailure::Network(OutboundFailure::DialFailure) => {
1572 "dial-failure"
1573 },
1574 RequestFailure::Network(OutboundFailure::Timeout) => "timeout",
1575 RequestFailure::Network(OutboundFailure::ConnectionClosed) => {
1576 "connection-closed"
1577 },
1578 RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
1579 "unsupported"
1580 },
1581 RequestFailure::Network(OutboundFailure::Io(_)) => "io",
1582 };
1583
1584 metrics
1585 .requests_out_failure_total
1586 .with_label_values(&[&protocol, reason])
1587 .inc();
1588 },
1589 }
1590 }
1591 },
1592 SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => {
1593 for change in changes {
1594 self.peer_store_handle.report_peer(peer.into(), change);
1595 }
1596 },
1597 SwarmEvent::Behaviour(BehaviourOut::PeerIdentify {
1598 peer_id,
1599 info:
1600 IdentifyInfo {
1601 protocol_version, agent_version, mut listen_addrs, protocols, ..
1602 },
1603 }) => {
1604 if listen_addrs.len() > 30 {
1605 debug!(
1606 target: LOG_TARGET,
1607 "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}",
1608 peer_id, protocol_version, agent_version
1609 );
1610 listen_addrs.truncate(30);
1611 }
1612 for addr in listen_addrs {
1613 self.network_service.behaviour_mut().add_self_reported_address_to_dht(
1614 &peer_id,
1615 &protocols,
1616 addr.clone(),
1617 );
1618 }
1619 self.peer_store_handle.add_known_peer(peer_id.into());
1620 },
1621 SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => {
1622 self.peer_store_handle.add_known_peer(peer_id.into());
1623 },
1624 SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => {
1625 if let Some(metrics) = self.metrics.as_ref() {
1626 metrics.kademlia_random_queries_total.inc();
1627 }
1628 },
1629 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
1630 remote,
1631 set_id,
1632 direction,
1633 negotiated_fallback,
1634 notifications_sink,
1635 received_handshake,
1636 }) => {
1637 let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
1638 remote,
1639 direction,
1640 received_handshake,
1641 negotiated_fallback,
1642 notifications_sink,
1643 );
1644 },
1645 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
1646 remote,
1647 set_id,
1648 notifications_sink,
1649 }) => {
1650 let _ = self.notif_protocol_handles[usize::from(set_id)]
1651 .report_notification_sink_replaced(remote, notifications_sink);
1652
1653 },
1674 SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
1675 let _ = self.notif_protocol_handles[usize::from(set_id)]
1676 .report_substream_closed(remote);
1677 },
1678 SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
1679 remote,
1680 set_id,
1681 notification,
1682 }) => {
1683 let _ = self.notif_protocol_handles[usize::from(set_id)]
1684 .report_notification_received(remote, notification);
1685 },
1686 SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
1687 match (self.metrics.as_ref(), duration) {
1688 (Some(metrics), Some(duration)) => {
1689 let query_type = match event {
1690 DhtEvent::ClosestPeersFound(_, _) => "peers-found",
1691 DhtEvent::ClosestPeersNotFound(_) => "peers-not-found",
1692 DhtEvent::ValueFound(_) => "value-found",
1693 DhtEvent::ValueNotFound(_) => "value-not-found",
1694 DhtEvent::ValuePut(_) => "value-put",
1695 DhtEvent::ValuePutFailed(_) => "value-put-failed",
1696 DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request",
1697 DhtEvent::StartedProviding(_) => "started-providing",
1698 DhtEvent::StartProvidingFailed(_) => "start-providing-failed",
1699 DhtEvent::ProvidersFound(_, _) => "providers-found",
1700 DhtEvent::NoMoreProviders(_) => "no-more-providers",
1701 DhtEvent::ProvidersNotFound(_) => "providers-not-found",
1702 };
1703 metrics
1704 .kademlia_query_duration
1705 .with_label_values(&[query_type])
1706 .observe(duration.as_secs_f64());
1707 },
1708 _ => {},
1709 }
1710
1711 self.event_streams.send(Event::Dht(event));
1712 },
1713 SwarmEvent::Behaviour(BehaviourOut::None) => {
1714 },
1716 SwarmEvent::ConnectionEstablished {
1717 peer_id,
1718 endpoint,
1719 num_established,
1720 concurrent_dial_errors,
1721 ..
1722 } => {
1723 if let Some(errors) = concurrent_dial_errors {
1724 debug!(target: LOG_TARGET, "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors);
1725 } else {
1726 debug!(target: LOG_TARGET, "Libp2p => Connected({:?})", peer_id);
1727 }
1728
1729 if let Some(metrics) = self.metrics.as_ref() {
1730 let direction = match endpoint {
1731 ConnectedPoint::Dialer { .. } => "out",
1732 ConnectedPoint::Listener { .. } => "in",
1733 };
1734 metrics.connections_opened_total.with_label_values(&[direction]).inc();
1735
1736 if num_established.get() == 1 {
1737 metrics.distinct_peers_connections_opened_total.inc();
1738 }
1739 }
1740 },
1741 SwarmEvent::ConnectionClosed {
1742 connection_id,
1743 peer_id,
1744 cause,
1745 endpoint,
1746 num_established,
1747 } => {
1748 debug!(target: LOG_TARGET, "Libp2p => Disconnected({peer_id:?} via {connection_id:?}, {cause:?})");
1749 if let Some(metrics) = self.metrics.as_ref() {
1750 let direction = match endpoint {
1751 ConnectedPoint::Dialer { .. } => "out",
1752 ConnectedPoint::Listener { .. } => "in",
1753 };
1754 let reason = match cause {
1755 Some(ConnectionError::IO(_)) => "transport-error",
1756 Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout",
1757 None => "actively-closed",
1758 };
1759 metrics.connections_closed_total.with_label_values(&[direction, reason]).inc();
1760
1761 if num_established == 0 {
1763 metrics.distinct_peers_connections_closed_total.inc();
1764 }
1765 }
1766 },
1767 SwarmEvent::NewListenAddr { address, .. } => {
1768 trace!(target: LOG_TARGET, "Libp2p => NewListenAddr({})", address);
1769 if let Some(metrics) = self.metrics.as_ref() {
1770 metrics.listeners_local_addresses.inc();
1771 }
1772 self.listen_addresses.lock().insert(address.clone());
1773 },
1774 SwarmEvent::ExpiredListenAddr { address, .. } => {
1775 info!(target: LOG_TARGET, "📪 No longer listening on {}", address);
1776 if let Some(metrics) = self.metrics.as_ref() {
1777 metrics.listeners_local_addresses.dec();
1778 }
1779 self.listen_addresses.lock().remove(&address);
1780 },
1781 SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => {
1782 if let Some(peer_id) = peer_id {
1783 trace!(
1784 target: LOG_TARGET,
1785 "Libp2p => Failed to reach {peer_id:?} via {connection_id:?}: {error}",
1786 );
1787
1788 let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id);
1789
1790 if let Some(addresses) =
1791 not_reported.then(|| self.boot_node_ids.get(&peer_id)).flatten()
1792 {
1793 if let DialError::WrongPeerId { obtained, endpoint } = &error {
1794 if let ConnectedPoint::Dialer {
1795 address,
1796 role_override: _,
1797 port_use: _,
1798 } = endpoint
1799 {
1800 let address_without_peer_id = parse_addr(address.clone().into())
1801 .map_or_else(|_| address.clone(), |r| r.1.into());
1802
1803 if addresses.iter().any(|a| address_without_peer_id == *a) {
1807 warn!(
1808 "💔 The bootnode you want to connect to at `{address}` provided a \
1809 different peer ID `{obtained}` than the one you expect `{peer_id}`.",
1810 );
1811
1812 self.reported_invalid_boot_nodes.insert(peer_id);
1813 }
1814 }
1815 }
1816 }
1817 }
1818
1819 if let Some(metrics) = self.metrics.as_ref() {
1820 let reason = match error {
1821 DialError::Denied { cause } => {
1822 if cause.downcast::<Exceeded>().is_ok() {
1823 Some("limit-reached")
1824 } else {
1825 None
1826 }
1827 },
1828 DialError::LocalPeerId { .. } => Some("local-peer-id"),
1829 DialError::WrongPeerId { .. } => Some("invalid-peer-id"),
1830 DialError::Transport(_) => Some("transport-error"),
1831 DialError::NoAddresses |
1832 DialError::DialPeerConditionFalse(_) |
1833 DialError::Aborted => None, };
1835 if let Some(reason) = reason {
1836 metrics.pending_connections_errors_total.with_label_values(&[reason]).inc();
1837 }
1838 }
1839 },
1840 SwarmEvent::Dialing { connection_id, peer_id } => {
1841 trace!(target: LOG_TARGET, "Libp2p => Dialing({peer_id:?}) via {connection_id:?}")
1842 },
1843 SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => {
1844 trace!(target: LOG_TARGET, "Libp2p => IncomingConnection({local_addr},{send_back_addr} via {connection_id:?}))");
1845 if let Some(metrics) = self.metrics.as_ref() {
1846 metrics.incoming_connections_total.inc();
1847 }
1848 },
1849 SwarmEvent::IncomingConnectionError {
1850 connection_id,
1851 local_addr,
1852 send_back_addr,
1853 error,
1854 } => {
1855 debug!(
1856 target: LOG_TARGET,
1857 "Libp2p => IncomingConnectionError({local_addr},{send_back_addr} via {connection_id:?}): {error}"
1858 );
1859 if let Some(metrics) = self.metrics.as_ref() {
1860 let reason = match error {
1861 ListenError::Denied { cause } => {
1862 if cause.downcast::<Exceeded>().is_ok() {
1863 Some("limit-reached")
1864 } else {
1865 None
1866 }
1867 },
1868 ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => {
1869 Some("invalid-peer-id")
1870 },
1871 ListenError::Transport(_) => Some("transport-error"),
1872 ListenError::Aborted => None, };
1874
1875 if let Some(reason) = reason {
1876 metrics
1877 .incoming_connections_errors_total
1878 .with_label_values(&[reason])
1879 .inc();
1880 }
1881 }
1882 },
1883 SwarmEvent::ListenerClosed { reason, addresses, .. } => {
1884 if let Some(metrics) = self.metrics.as_ref() {
1885 metrics.listeners_local_addresses.sub(addresses.len() as u64);
1886 }
1887 let mut listen_addresses = self.listen_addresses.lock();
1888 for addr in &addresses {
1889 listen_addresses.remove(addr);
1890 }
1891 drop(listen_addresses);
1892
1893 let addrs =
1894 addresses.into_iter().map(|a| a.to_string()).collect::<Vec<_>>().join(", ");
1895 match reason {
1896 Ok(()) => error!(
1897 target: LOG_TARGET,
1898 "📪 Libp2p listener ({}) closed gracefully",
1899 addrs
1900 ),
1901 Err(e) => error!(
1902 target: LOG_TARGET,
1903 "📪 Libp2p listener ({}) closed: {}",
1904 addrs, e
1905 ),
1906 }
1907 },
1908 SwarmEvent::ListenerError { error, .. } => {
1909 debug!(target: LOG_TARGET, "Libp2p => ListenerError: {}", error);
1910 if let Some(metrics) = self.metrics.as_ref() {
1911 metrics.listeners_errors_total.inc();
1912 }
1913 },
1914 SwarmEvent::NewExternalAddrCandidate { address } => {
1915 trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrCandidate: {address:?}");
1916 },
1917 SwarmEvent::ExternalAddrConfirmed { address } => {
1918 trace!(target: LOG_TARGET, "Libp2p => ExternalAddrConfirmed: {address:?}");
1919 },
1920 SwarmEvent::ExternalAddrExpired { address } => {
1921 trace!(target: LOG_TARGET, "Libp2p => ExternalAddrExpired: {address:?}");
1922 },
1923 SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
1924 trace!(target: LOG_TARGET, "Libp2p => NewExternalAddrOfPeer({peer_id:?}): {address:?}")
1925 },
1926 event => {
1927 warn!(target: LOG_TARGET, "New unknown SwarmEvent libp2p event: {event:?}");
1928 },
1929 }
1930 }
1931}
1932
1933impl<B, H> Unpin for NetworkWorker<B, H>
1934where
1935 B: BlockT + 'static,
1936 H: ExHashT,
1937{
1938}
1939
1940pub(crate) fn ensure_addresses_consistent_with_transport<'a>(
1941 addresses: impl Iterator<Item = &'a sc_network_types::multiaddr::Multiaddr>,
1942 transport: &TransportConfig,
1943) -> Result<(), Error> {
1944 use sc_network_types::multiaddr::Protocol;
1945
1946 if matches!(transport, TransportConfig::MemoryOnly) {
1947 let addresses: Vec<_> = addresses
1948 .filter(|x| x.iter().any(|y| !matches!(y, Protocol::Memory(_))))
1949 .cloned()
1950 .collect();
1951
1952 if !addresses.is_empty() {
1953 return Err(Error::AddressesForAnotherTransport {
1954 transport: transport.clone(),
1955 addresses,
1956 });
1957 }
1958 } else {
1959 let addresses: Vec<_> = addresses
1960 .filter(|x| x.iter().any(|y| matches!(y, Protocol::Memory(_))))
1961 .cloned()
1962 .collect();
1963
1964 if !addresses.is_empty() {
1965 return Err(Error::AddressesForAnotherTransport {
1966 transport: transport.clone(),
1967 addresses,
1968 });
1969 }
1970 }
1971
1972 Ok(())
1973}