1use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7 PeerStore,
8 types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11 disconnect_message::DisconnectMessageProtocol,
12 discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13 feeler::Feeler,
14 identify::{Flags, IdentifyCallback, IdentifyProtocol},
15 ping::PingHandler,
16 support_protocols::SupportProtocols,
17};
18#[cfg(not(target_family = "wasm"))]
19use crate::proxy;
20use crate::services::{
21 dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
22 protocol_type_checker::ProtocolTypeCheckerService,
23};
24use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
25use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
26use ckb_logger::{debug, error, info, trace, warn};
27use ckb_spawn::Spawn;
28use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
29use ckb_systemtime::{Duration, Instant};
30use ckb_util::{Condvar, Mutex, RwLock};
31use futures::{Future, channel::mpsc::Sender};
32use ipnetwork::IpNetwork;
33use p2p::error::TransportErrorKind;
34use p2p::{
35 SessionId, async_trait,
36 builder::ServiceBuilder,
37 bytes::Bytes,
38 context::{ServiceContext, SessionContext},
39 error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
40 multiaddr::{Multiaddr, Protocol},
41 secio::{self, PeerId, SecioKeyPair, error::SecioError},
42 service::{
43 ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
44 TargetSession,
45 },
46 traits::ServiceHandle,
47 utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
48 yamux::config::Config as YamuxConfig,
49};
50use rand::prelude::IteratorRandom;
51#[cfg(feature = "with_sentry")]
52use sentry::{Level, capture_message, with_scope};
53#[cfg(not(target_family = "wasm"))]
54use std::sync::mpsc;
55use std::{
56 borrow::Cow,
57 cmp::max,
58 collections::{HashMap, HashSet},
59 pin::Pin,
60 sync::{
61 Arc,
62 atomic::{AtomicBool, Ordering},
63 },
64 thread,
65};
66use tokio::{self, sync::oneshot};
67
/// Six-second send timeout. The code that applies it lies outside this section.
const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
/// 100 ms retry interval for sends. The code that applies it lies outside this section.
const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
/// Age (five minutes) after which a still-pending dial is treated as hung by
/// `NetworkState::can_dial`, which then reports it through Sentry when the
/// `with_sentry` feature is enabled.
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(5 * 60);
72
73pub struct NetworkState {
75 pub(crate) peer_registry: RwLock<PeerRegistry>,
76 pub(crate) peer_store: Mutex<PeerStore>,
77 pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
79 dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
80 public_addrs: RwLock<HashSet<Multiaddr>>,
82 observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
83 local_private_key: secio::SecioKeyPair,
84 local_peer_id: PeerId,
85 pub(crate) bootnodes: Vec<Multiaddr>,
86 pub(crate) config: NetworkConfig,
87 pub(crate) active: AtomicBool,
88 pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
91 pub(crate) required_flags: Flags,
92}
93
94impl NetworkState {
95 #[cfg(not(target_family = "wasm"))]
97 pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
98 config.create_dir_if_not_exists()?;
99 let local_private_key = config.fetch_private_key()?;
100 let local_peer_id = local_private_key.peer_id();
101 let public_addrs: HashSet<Multiaddr> = config
103 .listen_addresses
104 .iter()
105 .chain(config.public_addresses.iter())
106 .cloned()
107 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
108 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
109 _ => {
110 match extract_peer_id(&addr) {
111 Some(peer_id) if peer_id != local_peer_id => {
112 error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
113 std::process::exit(1);
114 }
115 Some(_) => (),
116 None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
117 }
118 Some(addr)
119 }
120 })
121 .collect();
122 info!("Loading the peer store. This process may take a few seconds to complete.");
123
124 let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
125 config.peer_store_path(),
126 ));
127 info!("Loaded the peer store.");
128
129 if let Some(ref proxy_url) = config.proxy.proxy_url {
130 proxy::check_proxy_url(proxy_url).map_err(Error::Config)?;
131 }
132
133 let bootnodes = config.bootnodes();
134
135 let peer_registry = PeerRegistry::new(
136 config.max_inbound_peers(),
137 config.max_outbound_peers(),
138 config.whitelist_only,
139 config.whitelist_peers(),
140 config.disable_block_relay_only_connection,
141 );
142
143 Ok(NetworkState {
144 peer_store,
145 config,
146 bootnodes,
147 peer_registry: RwLock::new(peer_registry),
148 dialing_addrs: RwLock::new(HashMap::default()),
149 public_addrs: RwLock::new(public_addrs),
150 listened_addrs: RwLock::new(Vec::new()),
151 observed_addrs: RwLock::new(HashMap::default()),
152 local_private_key,
153 local_peer_id,
154 active: AtomicBool::new(true),
155 protocols: RwLock::new(Vec::new()),
156 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
157 })
158 }
159
160 #[cfg(target_family = "wasm")]
161 pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
162 let local_private_key = config.fetch_private_key()?;
163 let local_peer_id = local_private_key.peer_id();
164 let public_addrs: HashSet<Multiaddr> = config
166 .listen_addresses
167 .iter()
168 .chain(config.public_addresses.iter())
169 .cloned()
170 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
171 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
172 _ => {
173 if extract_peer_id(&addr).is_none() {
174 addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
175 }
176 Some(addr)
177 }
178 })
179 .collect();
180 info!("Loading the peer store. This process may take a few seconds to complete.");
181 let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
182 let bootnodes = config.bootnodes();
183
184 let peer_registry = PeerRegistry::new(
185 config.max_inbound_peers(),
186 config.max_outbound_peers(),
187 config.whitelist_only,
188 config.whitelist_peers(),
189 config.disable_block_relay_only_connection,
190 );
191 Ok(NetworkState {
192 peer_store,
193 config,
194 bootnodes,
195 peer_registry: RwLock::new(peer_registry),
196 dialing_addrs: RwLock::new(HashMap::default()),
197 public_addrs: RwLock::new(public_addrs),
198 listened_addrs: RwLock::new(Vec::new()),
199 observed_addrs: RwLock::new(HashMap::default()),
200 local_private_key,
201 local_peer_id,
202 active: AtomicBool::new(true),
203 protocols: RwLock::new(Vec::new()),
204 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
205 })
206 }
207
208 pub fn required_flags(mut self, flags: Flags) -> Self {
211 self.required_flags = flags;
212 self
213 }
214
215 pub(crate) fn report_session(
216 &self,
217 p2p_control: &ServiceControl,
218 session_id: SessionId,
219 behaviour: Behaviour,
220 ) {
221 if let Some(addr) = self.with_peer_registry(|reg| {
222 reg.get_peer(session_id)
223 .filter(|peer| !peer.is_whitelist)
224 .map(|peer| peer.connected_addr.clone())
225 }) {
226 trace!("Report {:?} because {:?}", addr, behaviour);
227 let report_result = self.peer_store.lock().report(&addr, behaviour);
228 if report_result.is_banned() {
229 if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
230 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
231 }
232 }
233 } else {
234 debug!(
235 "Report {} failure: not found in peer registry or it is on the whitelist",
236 session_id
237 );
238 }
239 }
240
241 pub(crate) fn ban_session(
242 &self,
243 p2p_control: &ServiceControl,
244 session_id: SessionId,
245 duration: Duration,
246 reason: String,
247 ) {
248 if let Some(addr) = self.with_peer_registry(|reg| {
249 reg.get_peer(session_id)
250 .filter(|peer| !peer.is_whitelist)
251 .map(|peer| peer.connected_addr.clone())
252 }) {
253 info!(
254 "Ban peer {:?} for {} seconds, reason: {}",
255 addr,
256 duration.as_secs(),
257 reason
258 );
259 if let Some(metrics) = ckb_metrics::handle() {
260 metrics.ckb_network_ban_peer.inc();
261 }
262 if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
263 let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
264 self.peer_store.lock().ban_addr(
265 &peer.connected_addr,
266 duration.as_millis() as u64,
267 reason,
268 );
269 if let Err(err) =
270 disconnect_with_message(p2p_control, peer.session_id, message.as_str())
271 {
272 debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
273 }
274 }
275 } else {
276 debug!(
277 "Ban session({}) failed: not found in peer registry or it is on the whitelist",
278 session_id
279 );
280 }
281 }
282
283 pub(crate) fn accept_peer(
284 &self,
285 session_context: &SessionContext,
286 ) -> Result<Option<Peer>, Error> {
287 let mut peer_store = self.peer_store.lock();
290 let accept_peer_result = {
291 self.peer_registry.write().accept_peer(
292 session_context.address.clone(),
293 session_context.id,
294 session_context.ty,
295 &mut peer_store,
296 )
297 };
298 accept_peer_result
299 }
300
301 pub fn with_peer_registry<F, T>(&self, callback: F) -> T
303 where
304 F: FnOnce(&PeerRegistry) -> T,
305 {
306 callback(&self.peer_registry.read())
307 }
308
309 pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
311 where
312 F: FnOnce(&mut PeerRegistry) -> T,
313 {
314 callback(&mut self.peer_registry.write())
315 }
316
317 pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
319 where
320 F: FnOnce(&mut PeerStore) -> T,
321 {
322 callback(&mut self.peer_store.lock())
323 }
324
325 pub fn local_peer_id(&self) -> &PeerId {
327 &self.local_peer_id
328 }
329
330 pub fn local_private_key(&self) -> &secio::SecioKeyPair {
332 &self.local_private_key
333 }
334
335 pub fn node_id(&self) -> String {
337 self.local_peer_id().to_base58()
338 }
339
340 pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
341 let public_addrs = self.public_addrs.read();
342 if public_addrs.len() <= count {
343 return public_addrs.iter().cloned().collect();
344 } else {
345 public_addrs
346 .iter()
347 .cloned()
348 .choose_multiple(&mut rand::thread_rng(), count)
349 }
350 }
351
352 pub fn add_public_addr(&self, addr: Multiaddr) {
355 self.public_addrs.write().insert(addr);
356 }
357
358 pub(crate) fn connection_status(&self) -> ConnectionStatus {
359 self.peer_registry.read().connection_status()
360 }
361
362 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
364 let listened_addrs = self.listened_addrs.read();
365 self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
366 .into_iter()
367 .filter_map(|addr| {
368 if !listened_addrs.contains(&addr) {
369 Some((addr, 1))
370 } else {
371 None
372 }
373 })
374 .chain(listened_addrs.iter().map(|addr| (addr.clone(), 1)))
375 .map(|(addr, score)| (addr.to_string(), score))
376 .collect()
377 }
378
379 pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
380 self.dial_identify(p2p_control, address);
381 }
382
383 pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
385 self.protocols
386 .read()
387 .iter()
388 .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
389 .collect::<Vec<_>>()
390 }
391
392 pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
393 let peer_id = extract_peer_id(addr);
394 if peer_id.is_none() {
395 error!("Do not dial addr without peer id, addr: {}", addr);
396 return false;
397 }
398 let peer_id = peer_id.as_ref().unwrap();
399
400 if self.local_peer_id() == peer_id {
401 trace!("Do not dial self: {:?}, {}", peer_id, addr);
402 return false;
403 }
404 if self.public_addrs.read().contains(addr) {
405 trace!(
406 "Do not dial listened address(self): {:?}, {}",
407 peer_id, addr
408 );
409 return false;
410 }
411
412 let peer_in_registry = self.with_peer_registry(|reg| {
413 reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
414 });
415 if peer_in_registry {
416 trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
417 return false;
418 }
419
420 if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
421 trace!(
422 "Do not send repeated dial commands to network service: {:?}, {}",
423 peer_id, addr
424 );
425 if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
426 #[cfg(feature = "with_sentry")]
427 with_scope(
428 |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
429 || {
430 capture_message(
431 &format!(
432 "Dialing {:?}, {:?} for more than {} seconds, \
433 something is wrong in network service",
434 peer_id,
435 addr,
436 DIAL_HANG_TIMEOUT.as_secs(),
437 ),
438 Level::Warning,
439 )
440 },
441 );
442 }
443 return false;
444 }
445
446 true
447 }
448
449 pub(crate) fn dial_success(&self, addr: &Multiaddr) {
450 if let Some(peer_id) = extract_peer_id(addr) {
451 self.dialing_addrs.write().remove(&peer_id);
452 }
453 }
454
455 pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
456 self.with_peer_registry_mut(|reg| {
457 reg.remove_feeler(addr);
458 });
459
460 if let Some(peer_id) = extract_peer_id(addr) {
461 self.dialing_addrs.write().remove(&peer_id);
462 }
463 }
464
465 fn dial_inner(
468 &self,
469 p2p_control: &ServiceControl,
470 addr: Multiaddr,
471 target: TargetProtocol,
472 ) -> Result<(), Error> {
473 if !self.can_dial(&addr) {
474 return Err(Error::Dial(format!("ignore dialing addr {addr}")));
475 }
476
477 debug!("Dialing {addr}");
478 p2p_control.dial(addr.clone(), target)?;
479 self.dialing_addrs.write().insert(
480 extract_peer_id(&addr).expect("verified addr"),
481 Instant::now(),
482 );
483 Ok(())
484 }
485
486 pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
488 if let Err(err) = self.dial_inner(
489 p2p_control,
490 addr,
491 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
492 ) {
493 debug!("dial_identify error: {err}");
494 }
495 }
496
497 pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
499 if let Err(err) = self.dial_inner(
500 p2p_control,
501 addr.clone(),
502 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
503 ) {
504 debug!("dial_feeler error {err}");
505 } else {
506 self.with_peer_registry_mut(|reg| {
507 reg.add_feeler(&addr);
508 });
509 }
510 }
511
512 pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
514 let mut pending_observed_addrs = self.observed_addrs.write();
515 pending_observed_addrs.insert(session_id, addr);
516 }
517
518 pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
520 let observed_addrs = self
521 .observed_addrs
522 .read()
523 .values()
524 .cloned()
525 .collect::<HashSet<_>>();
526 if observed_addrs.len() <= count {
527 return observed_addrs.into_iter().collect();
528 } else {
529 observed_addrs
530 .into_iter()
531 .choose_multiple(&mut rand::thread_rng(), count)
532 }
533 }
534
535 pub fn is_active(&self) -> bool {
537 self.active.load(Ordering::Acquire)
538 }
539}
540
541pub struct EventHandler {
543 pub(crate) network_state: Arc<NetworkState>,
544}
545
546impl EventHandler {
547 pub fn new(network_state: Arc<NetworkState>) -> Self {
549 Self { network_state }
550 }
551}
552
/// Hook through which an exit is signalled.
pub trait ExitHandler: Send + Unpin + 'static {
    /// Signals that an exit has been requested.
    fn notify_exit(&self);
}
558
559#[derive(Clone, Default)]
561pub struct DefaultExitHandler {
562 lock: Arc<Mutex<()>>,
563 exit: Arc<Condvar>,
564}
565
566impl DefaultExitHandler {
567 pub fn wait_for_exit(&self) {
569 self.exit.wait(&mut self.lock.lock());
570 }
571}
572
573impl ExitHandler for DefaultExitHandler {
574 fn notify_exit(&self) {
575 self.exit.notify_all();
576 }
577}
578
579impl EventHandler {
580 fn inbound_eviction(&self) -> Vec<PeerIndex> {
581 if self.network_state.config.bootnode_mode {
582 let status = self.network_state.connection_status();
583
584 if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
585 self.network_state
586 .with_peer_registry(|registry| {
587 registry
588 .peers()
589 .values()
590 .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
591 .map(|peer| peer.session_id)
592 .collect::<Vec<SessionId>>()
593 })
594 .into_iter()
595 .enumerate()
596 .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
597 .collect()
598 } else {
599 Vec::new()
600 }
601 } else {
602 Vec::new()
603 }
604 }
605}
606
607#[async_trait]
608impl ServiceHandle for EventHandler {
609 async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
610 match error {
611 ServiceError::DialerError { address, error } => {
612 match error {
613 DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
614 SecioError::ConnectSelf,
615 )) => {
616 debug!("dial observed address success: {:?}", address);
617 }
618 DialerErrorKind::IoError(e)
619 if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
620 {
621 warn!("DialerError({}) {}", address, e);
622 }
623 DialerErrorKind::TransportError(e)
624 if matches!(&e, TransportErrorKind::ProxyError(_proxy_err)) =>
625 {
626 let err = e.to_string();
627 if err.contains("failed to establish connection to target:General failure")
628 || err.contains(
629 "failed to establish connection to target:Connection refused",
630 )
631 {
632 debug!("DialerError({}) {}", address, e);
633 } else {
634 error!("Is the proxy server down? DialerError({}) {}", address, e);
635 }
636 }
637 _ => {
638 debug!("DialerError({}) {}", address, error);
639 }
640 }
641 self.network_state.dial_failed(&address);
642 }
643 ServiceError::ProtocolError {
644 id,
645 proto_id,
646 error,
647 } => {
648 debug!("ProtocolError({}, {}) {}", id, proto_id, error);
649 let message = format!("ProtocolError id={proto_id}");
650 self.network_state.ban_session(
652 &context.control().clone().into(),
653 id,
654 Duration::from_secs(300),
655 message,
656 );
657 }
658 ServiceError::SessionTimeout { session_context } => {
659 debug!(
660 "SessionTimeout({}, {})",
661 session_context.id, session_context.address,
662 );
663 }
664 ServiceError::MuxerError {
665 session_context,
666 error,
667 } => {
668 debug!(
669 "MuxerError({}, {}), substream error {}, disconnect it",
670 session_context.id, session_context.address, error,
671 );
672 }
673 ServiceError::ListenError { address, error } => {
674 debug!("ListenError: address={:?}, error={:?}", address, error);
675 }
676 ServiceError::ProtocolSelectError {
677 proto_name,
678 session_context,
679 } => {
680 debug!(
681 "ProtocolSelectError: proto_name={:?}, session_id={}",
682 proto_name, session_context.id,
683 );
684 }
685 ServiceError::SessionBlocked { session_context } => {
686 debug!("SessionBlocked: {}", session_context.id);
687 }
688 ServiceError::ProtocolHandleError { proto_id, error } => {
689 debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
690
691 let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
692 {
693 if let Some(id) = opt_session_id {
694 self.network_state.ban_session(
695 &context.control().clone().into(),
696 id,
697 Duration::from_secs(300),
698 format!("protocol {proto_id} panic when process peer message"),
699 );
700 }
701 #[cfg(feature = "with_sentry")]
702 with_scope(
703 |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
704 || {
705 capture_message(
706 &format!(
707 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
708 ),
709 Level::Warning,
710 )
711 },
712 );
713 error!(
714 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
715 );
716
717 broadcast_exit_signals();
718 }
719 }
720 }
721 }
722
723 async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
724 match event {
726 ServiceEvent::SessionOpen { session_context } => {
727 debug!(
728 "SessionOpen({}, {})",
729 session_context.id, session_context.address,
730 );
731
732 self.network_state.dial_success(&session_context.address);
733
734 let iter = self.inbound_eviction();
735
736 let control = context.control().clone().into();
737
738 for peer in iter {
739 if let Err(err) =
740 disconnect_with_message(&control, peer, "bootnode random eviction")
741 {
742 debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
743 }
744 }
745
746 if self
747 .network_state
748 .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
749 {
750 debug!(
751 "Feeler connected {} => {}",
752 session_context.id, session_context.address,
753 );
754 } else {
755 match self.network_state.accept_peer(&session_context) {
756 Ok(Some(evicted_peer)) => {
757 debug!(
758 "Disconnect peer, {} => {}",
759 evicted_peer.session_id, evicted_peer.connected_addr,
760 );
761 if let Err(err) = disconnect_with_message(
762 &control,
763 evicted_peer.session_id,
764 "evict because accepted better peer",
765 ) {
766 debug!(
767 "Disconnect failed {:?}, error: {:?}",
768 evicted_peer.session_id, err
769 );
770 }
771 }
772 Ok(None) => debug!(
773 "{} open, registry {} success",
774 session_context.id, session_context.address,
775 ),
776 Err(err) => {
777 debug!(
778 "Peer registry failed {:?}. Disconnect {} => {}",
779 err, session_context.id, session_context.address,
780 );
781 if let Err(err) = disconnect_with_message(
782 &control,
783 session_context.id,
784 "reject peer connection",
785 ) {
786 debug!(
787 "Disconnect failed {:?}, error: {:?}",
788 session_context.id, err
789 );
790 }
791 }
792 }
793 }
794 }
795 ServiceEvent::SessionClose { session_context } => {
796 debug!(
797 "SessionClose({}, {})",
798 session_context.id, session_context.address,
799 );
800 let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
801 reg.remove_feeler(&session_context.address);
803 reg.remove_peer(session_context.id).is_some()
804 });
805 if peer_exists {
806 debug!(
807 "{} closed. Remove {} from peer_registry",
808 session_context.id, session_context.address,
809 );
810 self.network_state.with_peer_store_mut(|peer_store| {
811 peer_store.remove_disconnected_peer(&session_context.address);
812 });
813 }
814 self.network_state
815 .observed_addrs
816 .write()
817 .remove(&session_context.id);
818 }
819 _ => {
820 info!("p2p service event: {:?}", event);
821 }
822 }
823 }
824}
825
826pub struct NetworkService {
828 p2p_service: Service<EventHandler, SecioKeyPair>,
829 network_state: Arc<NetworkState>,
830 ping_controller: Option<Sender<()>>,
831 bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
833 version: String,
834}
835
836impl NetworkService {
837 pub fn new(
839 network_state: Arc<NetworkState>,
840 protocols: Vec<CKBProtocol>,
841 required_protocol_ids: Vec<ProtocolId>,
842 identify_announce: (String, String, Flags),
844 transport_type: TransportType,
845 ) -> Self {
846 let config = &network_state.config;
847
848 if config.support_protocols.iter().collect::<HashSet<_>>()
849 != default_support_all_protocols()
850 .iter()
851 .collect::<HashSet<_>>()
852 {
853 warn!(
854 "Customized supported protocols: {:?}",
855 config.support_protocols
856 );
857 }
858
859 let mut protocol_metas = protocols
861 .into_iter()
862 .map(CKBProtocol::build)
863 .collect::<Vec<_>>();
864
865 let identify_callback = IdentifyCallback::new(
869 Arc::clone(&network_state),
870 identify_announce.0,
871 identify_announce.1.clone(),
872 identify_announce.2,
873 );
874 let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
875 ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
876 });
877 protocol_metas.push(identify_meta);
878
879 let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
881 let ping_interval = Duration::from_secs(config.ping_interval_secs);
882 let ping_timeout = Duration::from_secs(config.ping_timeout_secs);
883
884 let ping_network_state = Arc::clone(&network_state);
885 let (ping_handler, ping_controller) =
886 PingHandler::new(ping_interval, ping_timeout, ping_network_state);
887 let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
888 ProtocolHandle::Callback(Box::new(ping_handler))
889 });
890 protocol_metas.push(ping_meta);
891 Some(ping_controller)
892 } else {
893 None
894 };
895
896 if config
898 .support_protocols
899 .contains(&SupportProtocol::Discovery)
900 {
901 let addr_mgr = DiscoveryAddressManager {
902 network_state: Arc::clone(&network_state),
903 discovery_local_address: config.discovery_local_address,
904 };
905 let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
906 ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
907 addr_mgr,
908 config
909 .discovery_announce_check_interval_secs
910 .map(Duration::from_secs),
911 )))
912 });
913 protocol_metas.push(disc_meta);
914 }
915
916 if config.support_protocols.contains(&SupportProtocol::Feeler) {
918 let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
919 let network_state = Arc::clone(&network_state);
920 move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
921 });
922 protocol_metas.push(feeler_meta);
923 }
924
925 if config
927 .support_protocols
928 .contains(&SupportProtocol::DisconnectMessage)
929 {
930 let disconnect_message_state = Arc::clone(&network_state);
931 let disconnect_message_meta = SupportProtocols::DisconnectMessage
932 .build_meta_with_service_handle(move || {
933 ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
934 disconnect_message_state,
935 )))
936 });
937 protocol_metas.push(disconnect_message_meta);
938 }
939
940 #[cfg(not(target_family = "wasm"))]
942 if config
943 .support_protocols
944 .contains(&SupportProtocol::HolePunching)
945 {
946 let hole_punching_state = Arc::clone(&network_state);
947 let hole_punching_meta =
948 SupportProtocols::HolePunching.build_meta_with_service_handle(move || {
949 ProtocolHandle::Callback(Box::new(
950 crate::protocols::hole_punching::HolePunching::new(hole_punching_state),
951 ))
952 });
953 protocol_metas.push(hole_punching_meta);
954 }
955
956 let mut service_builder = ServiceBuilder::default();
957 let yamux_config = YamuxConfig {
958 max_stream_count: protocol_metas.len(),
959 max_stream_window_size: 1024 * 1024,
960 ..Default::default()
961 };
962 for meta in protocol_metas.into_iter() {
963 network_state
964 .protocols
965 .write()
966 .push((meta.id(), meta.name(), meta.support_versions()));
967 service_builder = service_builder.insert_protocol(meta);
968 }
969 let event_handler = EventHandler {
970 network_state: Arc::clone(&network_state),
971 };
972 service_builder = service_builder
973 .handshake_type(network_state.local_private_key.clone().into())
974 .yamux_config(yamux_config)
975 .forever(true)
976 .max_connection_number(1024)
977 .set_send_buffer_size(config.max_send_buffer())
978 .set_channel_size(config.channel_size())
979 .timeout(Duration::from_secs(5))
980 .onion_timeout(Duration::from_secs(120));
981
982 #[cfg(not(target_family = "wasm"))]
983 {
984 service_builder = service_builder.upnp(config.upnp);
985
986 if let Some(proxy_url) = &config.proxy.proxy_url {
989 service_builder = service_builder
990 .tcp_proxy_config(proxy_url)
991 .tcp_proxy_random_auth(config.proxy.proxy_random_auth);
992 info!(
993 "set tcp_proxy_config: {:?}, proxy_random_auth: {}",
994 config.proxy.proxy_url.clone(),
995 config.proxy.proxy_random_auth
996 );
997 };
998
999 let onion_proxy_url = {
1000 config.onion.onion_server.clone().map(|onion_server| {
1001 if !onion_server.starts_with("socks5://") {
1002 format!("socks5://{}", onion_server)
1003 } else {
1004 onion_server
1005 }
1006 })
1007 };
1008 if let Some(onion_proxy_url) = onion_proxy_url {
1009 info!("set tcp_onion_config: {:?}", onion_proxy_url);
1010 service_builder = service_builder.tcp_onion_config(&onion_proxy_url);
1011 }
1012 }
1013
1014 #[cfg(target_os = "linux")]
1015 let p2p_service = {
1016 if config.reuse_port_on_linux {
1017 let iter = config.listen_addresses.iter();
1018
1019 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
1020 enum BindType {
1021 None,
1022 Ws,
1023 Tcp,
1024 Both,
1025 }
1026 impl BindType {
1027 fn transform(&mut self, other: TransportType) {
1028 match (&self, other) {
1029 (BindType::None, TransportType::Ws) => *self = BindType::Ws,
1030 (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
1031 (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
1032 (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
1033 _ => (),
1034 }
1035 }
1036
1037 fn is_ready(&self) -> bool {
1038 matches!(self, BindType::Both)
1040 }
1041 }
1042
1043 let mut init = BindType::None;
1044
1045 let proxy_config_enable =
1046 config.proxy.proxy_url.is_some() || config.onion.onion_server.is_some();
1047
1048 let bind_fn_with_addr =
1049 move |socket: p2p::service::TcpSocket,
1050 ctxt: p2p::service::TransformerContext,
1051 addr: std::net::SocketAddr| {
1052 let socket_ref = socket2::SockRef::from(&socket);
1053 #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
1054 socket_ref.set_reuse_port(true)?;
1055 socket_ref.set_reuse_address(true)?;
1056 match ctxt.state {
1057 p2p::service::SocketState::Listen => Ok(socket),
1058 p2p::service::SocketState::Dial => {
1059 let domain = socket2::Domain::for_address(addr);
1060 if socket_ref.domain()? == domain {
1061 if proxy_config_enable {
1062 debug!("skip bind since proxy is enabled");
1064 } else {
1065 socket_ref.bind(&addr.into())?;
1066 }
1067 }
1068 Ok(socket)
1069 }
1070 }
1071 };
1072
1073 for multi_addr in iter {
1074 if init.is_ready() {
1075 break;
1076 }
1077
1078 match find_type(multi_addr) {
1079 TransportType::Tcp => {
1080 if matches!(init, BindType::Tcp) {
1082 continue;
1083 }
1084 if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1085 let bind_fn = move |socket: p2p::service::TcpSocket,
1086 ctxt: p2p::service::TransformerContext| {
1087 bind_fn_with_addr(socket, ctxt, addr)
1088 };
1089 init.transform(TransportType::Tcp);
1090 service_builder = service_builder.tcp_config(bind_fn);
1091 }
1092 }
1093 TransportType::Ws | TransportType::Wss => {
1094 if matches!(init, BindType::Ws) {
1096 continue;
1097 }
1098 if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1099 let bind_fn = move |socket: p2p::service::TcpSocket,
1100 ctxt: p2p::service::TransformerContext| {
1101 bind_fn_with_addr(socket, ctxt, addr)
1102 };
1103 init.transform(TransportType::Ws);
1104 service_builder = service_builder.tcp_config_on_ws(bind_fn);
1105 }
1106 }
1107 }
1108 }
1109 }
1110
1111 service_builder.build(event_handler)
1112 };
1113
1114 #[cfg(not(target_os = "linux"))]
1115 let p2p_service = service_builder.build(event_handler);
1122
1123 let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
1125 let protocol_type_checker_service = ProtocolTypeCheckerService::new(
1126 Arc::clone(&network_state),
1127 p2p_service.control().to_owned().into(),
1128 required_protocol_ids,
1129 );
1130 let mut bg_services = vec![
1131 Box::pin(dump_peer_store_service) as Pin<Box<_>>,
1132 Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
1133 ];
1134 if config.outbound_peer_service_enabled() {
1135 let outbound_peer_service = OutboundPeerService::new(
1136 Arc::clone(&network_state),
1137 p2p_service.control().to_owned().into(),
1138 Duration::from_secs(config.connect_outbound_interval_secs),
1139 transport_type,
1140 );
1141 bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
1142 };
1143
1144 #[cfg(feature = "with_dns_seeding")]
1145 if config.dns_seeding_service_enabled() {
1146 let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
1147 Arc::clone(&network_state),
1148 config.dns_seeds.clone(),
1149 );
1150 bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
1151 };
1152
1153 NetworkService {
1154 p2p_service,
1155 network_state,
1156 ping_controller,
1157 bg_services,
1158 version: identify_announce.1,
1159 }
1160 }
1161
1162 pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1164 let config = self.network_state.config.clone();
1165
1166 let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1167
1168 for addr in self.network_state.config.whitelist_peers() {
1170 debug!("Dial whitelist_peers {:?}", addr);
1171 self.network_state.dial_identify(&p2p_control, addr);
1172 }
1173
1174 let target = &self.network_state.required_flags;
1175
1176 let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1179 let count = max((config.max_outbound_peers >> 1) as usize, 1);
1180 let mut addrs: Vec<_> = peer_store
1181 .fetch_addrs_to_attempt(count, *target, |_| true)
1182 .into_iter()
1183 .map(|paddr| paddr.addr)
1184 .collect();
1185 let anchors: Vec<_> = peer_store.mut_anchors().drain().collect();
1187 addrs.extend(anchors);
1188 let bootnodes = self
1190 .network_state
1191 .bootnodes
1192 .iter()
1193 .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1194 .into_iter()
1195 .cloned();
1196 addrs.extend(bootnodes);
1197 addrs
1198 });
1199
1200 for addr in bootnodes {
1202 debug!("Dial bootnode {:?}", addr);
1203 self.network_state.dial_identify(&p2p_control, addr);
1204 }
1205
1206 let Self {
1207 mut p2p_service,
1208 network_state,
1209 ping_controller,
1210 bg_services,
1211 version,
1212 } = self;
1213
1214 let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1216 .into_iter()
1217 .map(|bg_service| {
1218 let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1219 (signal_sender, (bg_service, signal_receiver))
1220 })
1221 .unzip();
1222
1223 let receiver: CancellationToken = new_tokio_exit_rx();
1224 #[cfg(not(target_family = "wasm"))]
1225 let (start_sender, start_receiver) = mpsc::channel();
1226 {
1227 #[cfg(not(target_family = "wasm"))]
1228 let network_state = Arc::clone(&network_state);
1229 let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1230 handle.spawn_task(async move {
1231 #[cfg(not(target_family = "wasm"))]
1232 {
1233 let listen_addresses = {
1234 let mut addresses = config.listen_addresses.clone();
1235 if config.reuse_tcp_with_ws {
1236 let ws_listens = addresses
1237 .iter()
1238 .cloned()
1239 .filter_map(|mut addr| {
1240 if matches!(find_type(&addr), TransportType::Tcp) {
1241 addr.push(Protocol::Ws);
1242 Some(addr)
1243 } else {
1244 None
1245 }
1246 })
1247 .collect::<Vec<_>>();
1248
1249 addresses.extend(ws_listens);
1250 }
1251 let mut addresses = addresses
1252 .into_iter()
1253 .collect::<HashSet<_>>()
1254 .into_iter()
1255 .collect::<Vec<_>>();
1256 addresses.sort_by(|a, b| {
1257 let ty_a = find_type(a);
1258 let ty_b = find_type(b);
1259
1260 ty_a.cmp(&ty_b)
1261 });
1262
1263 addresses
1264 };
1265
1266 for addr in &listen_addresses {
1267 match p2p_service.listen(addr.to_owned()).await {
1268 Ok(listen_address) => {
1269 info!("Listen on address: {}", listen_address);
1270 network_state
1271 .listened_addrs
1272 .write()
1273 .push(listen_address.clone());
1274 }
1275 Err(err) => {
1276 warn!(
1277 "Listen on address {} failed, due to error: {}",
1278 addr.clone(),
1279 err
1280 );
1281 start_sender
1282 .send(Err(Error::P2P(P2PError::Transport(err))))
1283 .expect("channel abnormal shutdown");
1284 return;
1285 }
1286 };
1287 }
1288 start_sender.send(Ok(())).unwrap();
1289 }
1290
1291 p2p::runtime::spawn(async move { p2p_service.run().await });
1292 tokio::select! {
1293 _ = receiver.cancelled() => {
1294 info!("NetworkService receive exit signal, start shutdown...");
1295 let _ = p2p_control.shutdown().await;
1296 drop(bg_signals);
1298
1299 info!("NetworkService shutdown now");
1300 },
1301 else => {
1302 let _ = p2p_control.shutdown().await;
1303 drop(bg_signals);
1305 },
1306 }
1307 });
1308 }
1309 for (mut service, mut receiver) in bg_receivers {
1310 handle.spawn_task(async move {
1311 loop {
1312 tokio::select! {
1313 _ = &mut service => {},
1314 _ = &mut receiver => break
1315 }
1316 }
1317 });
1318 }
1319 #[cfg(not(target_family = "wasm"))]
1320 if let Ok(Err(e)) = start_receiver.recv() {
1321 return Err(e);
1322 }
1323
1324 Ok(NetworkController {
1325 version,
1326 network_state,
1327 p2p_control,
1328 ping_controller,
1329 })
1330 }
1331}
1332
1333#[derive(Clone)]
1335pub struct NetworkController {
1336 version: String,
1337 network_state: Arc<NetworkState>,
1338 p2p_control: ServiceControl,
1339 ping_controller: Option<Sender<()>>,
1340}
1341
1342impl NetworkController {
1343 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1345 self.network_state.public_urls(max_urls)
1346 }
1347
1348 pub fn version(&self) -> &String {
1350 &self.version
1351 }
1352
1353 pub fn node_id(&self) -> String {
1355 self.network_state.node_id()
1356 }
1357
1358 pub fn p2p_control(&self) -> &ServiceControl {
1360 &self.p2p_control
1361 }
1362
1363 pub fn async_p2p_control(&self) -> ServiceAsyncControl {
1365 self.p2p_control.clone().into()
1366 }
1367
1368 pub fn add_node(&self, address: Multiaddr) {
1370 self.network_state.add_node(&self.p2p_control, address)
1371 }
1372
1373 pub fn add_public_addr(&self, public_addr: Multiaddr) {
1375 self.network_state.add_public_addr(public_addr)
1376 }
1377
1378 pub fn remove_node(&self, peer_id: &PeerId) {
1380 if let Some(session_id) = self
1381 .network_state
1382 .peer_registry
1383 .read()
1384 .get_key_by_peer_id(peer_id)
1385 {
1386 if let Err(err) =
1387 disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1388 {
1389 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1390 }
1391 } else {
1392 error!("Cannot find peer {:?}", peer_id);
1393 }
1394 }
1395
1396 pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1398 self.network_state
1399 .peer_store
1400 .lock()
1401 .ban_list()
1402 .get_banned_addrs()
1403 }
1404
1405 pub fn clear_banned_addrs(&self) {
1407 self.network_state.peer_store.lock().clear_ban_list();
1408 }
1409
1410 pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1412 self.network_state
1413 .peer_store
1414 .lock()
1415 .addr_manager()
1416 .get(addr)
1417 .cloned()
1418 }
1419
1420 pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1422 self.disconnect_peers_in_ip_range(address, &ban_reason);
1423 self.network_state
1424 .peer_store
1425 .lock()
1426 .ban_network(address, ban_until, ban_reason)
1427 }
1428
1429 pub fn unban(&self, address: &IpNetwork) {
1431 self.network_state
1432 .peer_store
1433 .lock()
1434 .mut_ban_list()
1435 .unban_network(address);
1436 }
1437
1438 pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1440 self.network_state.with_peer_registry(|reg| {
1441 reg.peers()
1442 .iter()
1443 .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1444 .collect::<Vec<_>>()
1445 })
1446 }
1447
1448 pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1450 self.network_state
1451 .ban_session(&self.p2p_control, peer_index, duration, reason);
1452 }
1453
1454 fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1456 self.network_state.with_peer_registry(|reg| {
1457 reg.peers().iter().for_each(|(peer_index, peer)| {
1458 if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1459 if address.contains(addr.ip()) {
1460 let _ = disconnect_with_message(
1461 &self.p2p_control,
1462 *peer_index,
1463 &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1464 );
1465 }
1466 }
1467 })
1468 });
1469 }
1470
1471 fn try_broadcast(
1472 &self,
1473 quick: bool,
1474 target: Option<SessionId>,
1475 proto_id: ProtocolId,
1476 data: Bytes,
1477 ) -> Result<(), SendErrorKind> {
1478 let now = Instant::now();
1479 loop {
1480 let target = target
1481 .map(TargetSession::Single)
1482 .unwrap_or(TargetSession::All);
1483 let result = if quick {
1484 self.p2p_control
1485 .quick_filter_broadcast(target, proto_id, data.clone())
1486 } else {
1487 self.p2p_control
1488 .filter_broadcast(target, proto_id, data.clone())
1489 };
1490 match result {
1491 Ok(()) => {
1492 return Ok(());
1493 }
1494 Err(SendErrorKind::WouldBlock) => {
1495 if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1496 warn!("Broadcast message to {} timeout", proto_id);
1497 return Err(SendErrorKind::WouldBlock);
1498 }
1499 thread::sleep(P2P_TRY_SEND_INTERVAL);
1500 }
1501 Err(err) => {
1502 warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1503 return Err(err);
1504 }
1505 }
1506 }
1507 }
1508
1509 fn broadcast_inner<S: Spawn>(
1510 &self,
1511 quick: bool,
1512 target: TargetSession,
1513 proto_id: ProtocolId,
1514 data: Bytes,
1515 handle: &S,
1516 ) {
1517 let async_control: ServiceAsyncControl = self.p2p_control.clone().into();
1518
1519 handle.spawn_task(async move {
1520 if quick {
1521 let _ignore = async_control
1522 .quick_filter_broadcast(target, proto_id, data)
1523 .await;
1524 } else {
1525 let _ignore = async_control.filter_broadcast(target, proto_id, data).await;
1526 }
1527 })
1528 }
1529
1530 pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1532 self.try_broadcast(false, None, proto_id, data)
1533 }
1534
1535 pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1537 self.try_broadcast(true, None, proto_id, data)
1538 }
1539
1540 pub fn send_message_to(
1542 &self,
1543 session_id: SessionId,
1544 proto_id: ProtocolId,
1545 data: Bytes,
1546 ) -> Result<(), SendErrorKind> {
1547 self.try_broadcast(false, Some(session_id), proto_id, data)
1548 }
1549
1550 pub fn broadcast_with_handle<S: Spawn>(&self, proto_id: ProtocolId, data: Bytes, handle: &S) {
1552 self.broadcast_inner(false, TargetSession::All, proto_id, data, handle)
1553 }
1554
1555 pub fn quick_broadcast_with_handle<S: Spawn>(
1557 &self,
1558 proto_id: ProtocolId,
1559 data: Bytes,
1560 handle: &S,
1561 ) {
1562 self.broadcast_inner(true, TargetSession::All, proto_id, data, handle)
1563 }
1564
1565 pub fn send_message_to_with_handle<S: Spawn>(
1567 &self,
1568 session_id: SessionId,
1569 proto_id: ProtocolId,
1570 data: Bytes,
1571 handle: &S,
1572 ) {
1573 self.broadcast_inner(
1574 false,
1575 TargetSession::Single(session_id),
1576 proto_id,
1577 data,
1578 handle,
1579 )
1580 }
1581
1582 pub fn is_active(&self) -> bool {
1584 self.network_state.is_active()
1585 }
1586
1587 pub fn set_active(&self, active: bool) {
1589 self.network_state.active.store(active, Ordering::Release);
1590 }
1591
1592 pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1594 self.network_state.protocols.read().clone()
1595 }
1596
1597 pub fn ping_peers(&self) {
1599 if let Some(mut ping_controller) = self.ping_controller.clone() {
1600 let _ignore = ping_controller.try_send(());
1601 }
1602 }
1603}
1604
1605pub(crate) fn disconnect_with_message(
1607 control: &ServiceControl,
1608 peer_index: SessionId,
1609 message: &str,
1610) -> Result<(), SendErrorKind> {
1611 if !message.is_empty() {
1612 let data = Bytes::from(message.as_bytes().to_vec());
1613 control.quick_send_message_to(
1615 peer_index,
1616 SupportProtocols::DisconnectMessage.protocol_id(),
1617 data,
1618 )?;
1619 }
1620 control.disconnect(peer_index)
1621}
1622
1623pub(crate) async fn async_disconnect_with_message(
1624 control: &ServiceAsyncControl,
1625 peer_index: SessionId,
1626 message: &str,
1627) -> Result<(), SendErrorKind> {
1628 if !message.is_empty() {
1629 let data = Bytes::from(message.as_bytes().to_vec());
1630 control
1632 .quick_send_message_to(
1633 peer_index,
1634 SupportProtocols::DisconnectMessage.protocol_id(),
1635 data,
1636 )
1637 .await?;
1638 }
1639 control.disconnect(peer_index).await
1640}
1641
/// Transport carried by a multiaddr, as classified by `find_type`.
///
/// The derived ordering follows declaration order (`Tcp < Ws < Wss`); `start` relies on it
/// to sort listen addresses, so do not reorder the variants casually.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum TransportType {
    /// Plain TCP (no `Ws`/`Wss` component in the address).
    Tcp,
    /// Address containing a `Ws` component.
    Ws,
    /// Address containing a `Wss` component.
    Wss,
}
1652
1653#[allow(dead_code)]
1654pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1655 let mut iter = addr.iter();
1656
1657 iter.find_map(|proto| match proto {
1658 Protocol::Ws => Some(TransportType::Ws),
1659 Protocol::Wss => Some(TransportType::Wss),
1660 _ => None,
1661 })
1662 .unwrap_or(TransportType::Tcp)
1663}