1use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7 PeerStore,
8 types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11 disconnect_message::DisconnectMessageProtocol,
12 discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13 feeler::Feeler,
14 identify::{Flags, IdentifyCallback, IdentifyProtocol},
15 ping::PingHandler,
16 support_protocols::SupportProtocols,
17};
18#[cfg(not(target_family = "wasm"))]
19use crate::proxy;
20use crate::services::{
21 dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
22 protocol_type_checker::ProtocolTypeCheckerService,
23};
24use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
25use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
26use ckb_logger::{debug, error, info, trace, warn};
27use ckb_spawn::Spawn;
28use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
29use ckb_systemtime::{Duration, Instant};
30use ckb_util::{Condvar, Mutex, RwLock};
31use futures::{Future, channel::mpsc::Sender};
32use ipnetwork::IpNetwork;
33use p2p::error::TransportErrorKind;
34use p2p::{
35 SessionId, async_trait,
36 builder::ServiceBuilder,
37 bytes::Bytes,
38 context::{ServiceContext, SessionContext},
39 error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
40 multiaddr::{Multiaddr, Protocol},
41 secio::{self, PeerId, SecioKeyPair, error::SecioError},
42 service::{
43 ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
44 TargetSession,
45 },
46 traits::ServiceHandle,
47 utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
48 yamux::config::Config as YamuxConfig,
49};
50use rand::prelude::IteratorRandom;
51#[cfg(feature = "with_sentry")]
52use sentry::{Level, capture_message, with_scope};
53#[cfg(not(target_family = "wasm"))]
54use std::sync::mpsc;
55use std::{
56 borrow::Cow,
57 cmp::max,
58 collections::{HashMap, HashSet},
59 pin::Pin,
60 sync::{
61 Arc,
62 atomic::{AtomicBool, Ordering},
63 },
64 thread,
65};
66use tokio::{self, sync::oneshot};
67
68const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
69const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
70const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
72
73pub struct NetworkState {
75 pub(crate) peer_registry: RwLock<PeerRegistry>,
76 pub(crate) peer_store: Mutex<PeerStore>,
77 pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
79 dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
80 public_addrs: RwLock<HashSet<Multiaddr>>,
82 observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
83 local_private_key: secio::SecioKeyPair,
84 local_peer_id: PeerId,
85 pub(crate) bootnodes: Vec<Multiaddr>,
86 pub(crate) config: NetworkConfig,
87 pub(crate) active: AtomicBool,
88 pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
91 pub(crate) required_flags: Flags,
92}
93
94impl NetworkState {
95 #[cfg(not(target_family = "wasm"))]
97 pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
98 config.create_dir_if_not_exists()?;
99 let local_private_key = config.fetch_private_key()?;
100 let local_peer_id = local_private_key.peer_id();
101 let public_addrs: HashSet<Multiaddr> = config
103 .listen_addresses
104 .iter()
105 .chain(config.public_addresses.iter())
106 .cloned()
107 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
108 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
109 _ => {
110 match extract_peer_id(&addr) {
111 Some(peer_id) if peer_id != local_peer_id => {
112 error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
113 std::process::exit(1);
114 }
115 Some(_) => (),
116 None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
117 }
118 Some(addr)
119 }
120 })
121 .collect();
122 info!("Loading the peer store. This process may take a few seconds to complete.");
123
124 let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
125 config.peer_store_path(),
126 ));
127 info!("Loaded the peer store.");
128
129 if let Some(ref proxy_url) = config.proxy.proxy_url {
130 proxy::check_proxy_url(proxy_url).map_err(Error::Config)?;
131 }
132
133 let bootnodes = config.bootnodes();
134
135 let peer_registry = PeerRegistry::new(
136 config.max_inbound_peers(),
137 config.max_outbound_peers(),
138 config.whitelist_only,
139 config.whitelist_peers(),
140 config.disable_block_relay_only_connection,
141 );
142
143 Ok(NetworkState {
144 peer_store,
145 config,
146 bootnodes,
147 peer_registry: RwLock::new(peer_registry),
148 dialing_addrs: RwLock::new(HashMap::default()),
149 public_addrs: RwLock::new(public_addrs),
150 listened_addrs: RwLock::new(Vec::new()),
151 observed_addrs: RwLock::new(HashMap::default()),
152 local_private_key,
153 local_peer_id,
154 active: AtomicBool::new(true),
155 protocols: RwLock::new(Vec::new()),
156 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
157 })
158 }
159
160 #[cfg(target_family = "wasm")]
161 pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
162 let local_private_key = config.fetch_private_key()?;
163 let local_peer_id = local_private_key.peer_id();
164 let public_addrs: HashSet<Multiaddr> = config
166 .listen_addresses
167 .iter()
168 .chain(config.public_addresses.iter())
169 .cloned()
170 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
171 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
172 _ => {
173 if extract_peer_id(&addr).is_none() {
174 addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
175 }
176 Some(addr)
177 }
178 })
179 .collect();
180 info!("Loading the peer store. This process may take a few seconds to complete.");
181 let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
182 let bootnodes = config.bootnodes();
183
184 let peer_registry = PeerRegistry::new(
185 config.max_inbound_peers(),
186 config.max_outbound_peers(),
187 config.whitelist_only,
188 config.whitelist_peers(),
189 config.disable_block_relay_only_connection,
190 );
191 Ok(NetworkState {
192 peer_store,
193 config,
194 bootnodes,
195 peer_registry: RwLock::new(peer_registry),
196 dialing_addrs: RwLock::new(HashMap::default()),
197 public_addrs: RwLock::new(public_addrs),
198 listened_addrs: RwLock::new(Vec::new()),
199 observed_addrs: RwLock::new(HashMap::default()),
200 local_private_key,
201 local_peer_id,
202 active: AtomicBool::new(true),
203 protocols: RwLock::new(Vec::new()),
204 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
205 })
206 }
207
208 pub fn required_flags(mut self, flags: Flags) -> Self {
211 self.required_flags = flags;
212 self
213 }
214
215 pub(crate) fn report_session(
216 &self,
217 p2p_control: &ServiceControl,
218 session_id: SessionId,
219 behaviour: Behaviour,
220 ) {
221 if let Some(addr) = self.with_peer_registry(|reg| {
222 reg.get_peer(session_id)
223 .filter(|peer| !peer.is_whitelist)
224 .map(|peer| peer.connected_addr.clone())
225 }) {
226 trace!("Report {:?} because {:?}", addr, behaviour);
227 let report_result = self.peer_store.lock().report(&addr, behaviour);
228 if report_result.is_banned()
229 && let Err(err) = disconnect_with_message(p2p_control, session_id, "banned")
230 {
231 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
232 }
233 } else {
234 debug!(
235 "Report {} failure: not found in peer registry or it is on the whitelist",
236 session_id
237 );
238 }
239 }
240
241 pub(crate) fn ban_session(
242 &self,
243 p2p_control: &ServiceControl,
244 session_id: SessionId,
245 duration: Duration,
246 reason: String,
247 ) {
248 if let Some(addr) = self.with_peer_registry(|reg| {
249 reg.get_peer(session_id)
250 .filter(|peer| !peer.is_whitelist)
251 .map(|peer| peer.connected_addr.clone())
252 }) {
253 info!(
254 "Ban peer {:?} for {} seconds, reason: {}",
255 addr,
256 duration.as_secs(),
257 reason
258 );
259 if let Some(metrics) = ckb_metrics::handle() {
260 metrics.ckb_network_ban_peer.inc();
261 }
262 if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
263 let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
264 self.peer_store.lock().ban_addr(
265 &peer.connected_addr,
266 duration.as_millis() as u64,
267 reason,
268 );
269 if let Err(err) =
270 disconnect_with_message(p2p_control, peer.session_id, message.as_str())
271 {
272 debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
273 }
274 }
275 } else {
276 debug!(
277 "Ban session({}) failed: not found in peer registry or it is on the whitelist",
278 session_id
279 );
280 }
281 }
282
283 pub(crate) fn accept_peer(
284 &self,
285 session_context: &SessionContext,
286 ) -> Result<Option<Peer>, Error> {
287 let mut peer_store = self.peer_store.lock();
290
291 {
292 self.peer_registry.write().accept_peer(
293 session_context.address.clone(),
294 session_context.id,
295 session_context.ty,
296 &mut peer_store,
297 )
298 }
299 }
300
301 pub fn with_peer_registry<F, T>(&self, callback: F) -> T
303 where
304 F: FnOnce(&PeerRegistry) -> T,
305 {
306 callback(&self.peer_registry.read())
307 }
308
309 pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
311 where
312 F: FnOnce(&mut PeerRegistry) -> T,
313 {
314 callback(&mut self.peer_registry.write())
315 }
316
317 pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
319 where
320 F: FnOnce(&mut PeerStore) -> T,
321 {
322 callback(&mut self.peer_store.lock())
323 }
324
325 pub fn local_peer_id(&self) -> &PeerId {
327 &self.local_peer_id
328 }
329
330 pub fn local_private_key(&self) -> &secio::SecioKeyPair {
332 &self.local_private_key
333 }
334
335 pub fn node_id(&self) -> String {
337 self.local_peer_id().to_base58()
338 }
339
340 pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
341 let public_addrs = self.public_addrs.read();
342 if public_addrs.len() <= count {
343 return public_addrs.iter().cloned().collect();
344 } else {
345 public_addrs
346 .iter()
347 .cloned()
348 .choose_multiple(&mut rand::thread_rng(), count)
349 }
350 }
351
352 pub fn add_public_addr(&self, addr: Multiaddr) {
355 self.public_addrs.write().insert(addr);
356 }
357
358 pub(crate) fn connection_status(&self) -> ConnectionStatus {
359 self.peer_registry.read().connection_status()
360 }
361
362 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
364 let listened_addrs = self.listened_addrs.read();
365 self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
366 .into_iter()
367 .filter_map(|addr| {
368 if !listened_addrs.contains(&addr) {
369 Some((addr, 1))
370 } else {
371 None
372 }
373 })
374 .chain(listened_addrs.iter().map(|addr| (addr.clone(), 1)))
375 .map(|(addr, score)| (addr.to_string(), score))
376 .collect()
377 }
378
379 pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
380 self.dial_identify(p2p_control, address);
381 }
382
383 pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
385 self.protocols
386 .read()
387 .iter()
388 .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
389 .collect::<Vec<_>>()
390 }
391
392 pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
393 let peer_id = extract_peer_id(addr);
394 if peer_id.is_none() {
395 error!("Do not dial addr without peer id, addr: {}", addr);
396 return false;
397 }
398 let peer_id = peer_id.as_ref().unwrap();
399
400 if self.local_peer_id() == peer_id {
401 trace!("Do not dial self: {:?}, {}", peer_id, addr);
402 return false;
403 }
404 if self.public_addrs.read().contains(addr) {
405 trace!(
406 "Do not dial listened address(self): {:?}, {}",
407 peer_id, addr
408 );
409 return false;
410 }
411
412 let peer_in_registry = self.with_peer_registry(|reg| {
413 reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
414 });
415 if peer_in_registry {
416 trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
417 return false;
418 }
419
420 if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
421 trace!(
422 "Do not send repeated dial commands to network service: {:?}, {}",
423 peer_id, addr
424 );
425 if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
426 #[cfg(feature = "with_sentry")]
427 with_scope(
428 |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
429 || {
430 capture_message(
431 &format!(
432 "Dialing {:?}, {:?} for more than {} seconds, \
433 something is wrong in network service",
434 peer_id,
435 addr,
436 DIAL_HANG_TIMEOUT.as_secs(),
437 ),
438 Level::Warning,
439 )
440 },
441 );
442 }
443 return false;
444 }
445
446 true
447 }
448
449 pub(crate) fn dial_success(&self, addr: &Multiaddr) {
450 if let Some(peer_id) = extract_peer_id(addr) {
451 self.dialing_addrs.write().remove(&peer_id);
452 }
453 }
454
455 pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
456 self.with_peer_registry_mut(|reg| {
457 reg.remove_feeler(addr);
458 });
459
460 if let Some(peer_id) = extract_peer_id(addr) {
461 self.dialing_addrs.write().remove(&peer_id);
462 }
463 }
464
465 fn dial_inner(
468 &self,
469 p2p_control: &ServiceControl,
470 addr: Multiaddr,
471 target: TargetProtocol,
472 ) -> Result<(), Error> {
473 if !self.can_dial(&addr) {
474 return Err(Error::Dial(format!("ignore dialing addr {addr}")));
475 }
476
477 debug!("Dialing {addr}");
478 p2p_control.dial(addr.clone(), target)?;
479 self.dialing_addrs.write().insert(
480 extract_peer_id(&addr).expect("verified addr"),
481 Instant::now(),
482 );
483 Ok(())
484 }
485
486 pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
488 if let Err(err) = self.dial_inner(
489 p2p_control,
490 addr,
491 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
492 ) {
493 debug!("dial_identify error: {err}");
494 }
495 }
496
497 pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
499 if let Err(err) = self.dial_inner(
500 p2p_control,
501 addr.clone(),
502 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
503 ) {
504 debug!("dial_feeler error {err}");
505 } else {
506 self.with_peer_registry_mut(|reg| {
507 reg.add_feeler(&addr);
508 });
509 }
510 }
511
512 pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
514 let mut pending_observed_addrs = self.observed_addrs.write();
515 pending_observed_addrs.insert(session_id, addr);
516 }
517
518 pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
520 let observed_addrs = self
521 .observed_addrs
522 .read()
523 .values()
524 .cloned()
525 .collect::<HashSet<_>>();
526 if observed_addrs.len() <= count {
527 return observed_addrs.into_iter().collect();
528 } else {
529 observed_addrs
530 .into_iter()
531 .choose_multiple(&mut rand::thread_rng(), count)
532 }
533 }
534
535 pub fn is_active(&self) -> bool {
537 self.active.load(Ordering::Acquire)
538 }
539}
540
541pub struct EventHandler {
543 pub(crate) network_state: Arc<NetworkState>,
544}
545
546impl EventHandler {
547 pub fn new(network_state: Arc<NetworkState>) -> Self {
549 Self { network_state }
550 }
551}
552
553pub trait ExitHandler: Send + Unpin + 'static {
555 fn notify_exit(&self);
557}
558
559#[derive(Clone, Default)]
561pub struct DefaultExitHandler {
562 lock: Arc<Mutex<()>>,
563 exit: Arc<Condvar>,
564}
565
566impl DefaultExitHandler {
567 pub fn wait_for_exit(&self) {
569 self.exit.wait(&mut self.lock.lock());
570 }
571}
572
573impl ExitHandler for DefaultExitHandler {
574 fn notify_exit(&self) {
575 self.exit.notify_all();
576 }
577}
578
579impl EventHandler {
580 fn inbound_eviction(&self) -> Vec<PeerIndex> {
581 if self.network_state.config.bootnode_mode {
582 let status = self.network_state.connection_status();
583
584 if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
585 self.network_state
586 .with_peer_registry(|registry| {
587 registry
588 .peers()
589 .values()
590 .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
591 .map(|peer| peer.session_id)
592 .collect::<Vec<SessionId>>()
593 })
594 .into_iter()
595 .enumerate()
596 .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
597 .collect()
598 } else {
599 Vec::new()
600 }
601 } else {
602 Vec::new()
603 }
604 }
605}
606
607#[async_trait]
608impl ServiceHandle for EventHandler {
609 async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
610 match error {
611 ServiceError::DialerError { address, error } => {
612 match error {
613 DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
614 SecioError::ConnectSelf,
615 )) => {
616 debug!("dial observed address success: {:?}", address);
617 }
618 DialerErrorKind::IoError(e)
619 if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
620 {
621 warn!("DialerError({}) {}", address, e);
622 }
623 DialerErrorKind::TransportError(e)
624 if matches!(&e, TransportErrorKind::ProxyError(_proxy_err)) =>
625 {
626 let err = e.to_string();
627 if err.contains("failed to establish connection to target:General failure")
628 || err.contains(
629 "failed to establish connection to target:Connection refused",
630 )
631 {
632 debug!("DialerError({}) {}", address, e);
633 } else {
634 error!("Is the proxy server down? DialerError({}) {}", address, e);
635 }
636 }
637 _ => {
638 debug!("DialerError({}) {}", address, error);
639 }
640 }
641 self.network_state.dial_failed(&address);
642 }
643 ServiceError::ProtocolError {
644 id,
645 proto_id,
646 error,
647 } => {
648 debug!("ProtocolError({}, {}) {}", id, proto_id, error);
649 let message = format!("ProtocolError id={proto_id}");
650 self.network_state.ban_session(
652 &context.control().clone().into(),
653 id,
654 Duration::from_secs(300),
655 message,
656 );
657 }
658 ServiceError::SessionTimeout { session_context } => {
659 debug!(
660 "SessionTimeout({}, {})",
661 session_context.id, session_context.address,
662 );
663 }
664 ServiceError::MuxerError {
665 session_context,
666 error,
667 } => {
668 debug!(
669 "MuxerError({}, {}), substream error {}, disconnect it",
670 session_context.id, session_context.address, error,
671 );
672 }
673 ServiceError::ListenError { address, error } => {
674 debug!("ListenError: address={:?}, error={:?}", address, error);
675 }
676 ServiceError::ProtocolSelectError {
677 proto_name,
678 session_context,
679 } => {
680 debug!(
681 "ProtocolSelectError: proto_name={:?}, session_id={}",
682 proto_name, session_context.id,
683 );
684 }
685 ServiceError::SessionBlocked { session_context } => {
686 debug!("SessionBlocked: {}", session_context.id);
687 }
688 ServiceError::ProtocolHandleError { proto_id, error } => {
689 debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
690
691 let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
692 {
693 if let Some(id) = opt_session_id {
694 self.network_state.ban_session(
695 &context.control().clone().into(),
696 id,
697 Duration::from_secs(300),
698 format!("protocol {proto_id} panic when process peer message"),
699 );
700 }
701 #[cfg(feature = "with_sentry")]
702 with_scope(
703 |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
704 || {
705 capture_message(
706 &format!(
707 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
708 ),
709 Level::Warning,
710 )
711 },
712 );
713 error!(
714 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
715 );
716
717 broadcast_exit_signals();
718 }
719 }
720 }
721 }
722
723 async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
724 match event {
726 ServiceEvent::SessionOpen { session_context } => {
727 debug!(
728 "SessionOpen({}, {})",
729 session_context.id, session_context.address,
730 );
731
732 self.network_state.dial_success(&session_context.address);
733
734 let iter = self.inbound_eviction();
735
736 let control = context.control().clone().into();
737
738 for peer in iter {
739 if let Err(err) =
740 disconnect_with_message(&control, peer, "bootnode random eviction")
741 {
742 debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
743 }
744 }
745
746 if self
747 .network_state
748 .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
749 {
750 debug!(
751 "Feeler connected {} => {}",
752 session_context.id, session_context.address,
753 );
754 } else {
755 match self.network_state.accept_peer(&session_context) {
756 Ok(Some(evicted_peer)) => {
757 debug!(
758 "Disconnect peer, {} => {}",
759 evicted_peer.session_id, evicted_peer.connected_addr,
760 );
761 if let Err(err) = disconnect_with_message(
762 &control,
763 evicted_peer.session_id,
764 "evict because accepted better peer",
765 ) {
766 debug!(
767 "Disconnect failed {:?}, error: {:?}",
768 evicted_peer.session_id, err
769 );
770 }
771 }
772 Ok(None) => debug!(
773 "{} open, registry {} success",
774 session_context.id, session_context.address,
775 ),
776 Err(err) => {
777 debug!(
778 "Peer registry failed {:?}. Disconnect {} => {}",
779 err, session_context.id, session_context.address,
780 );
781 if let Err(err) = disconnect_with_message(
782 &control,
783 session_context.id,
784 "reject peer connection",
785 ) {
786 debug!(
787 "Disconnect failed {:?}, error: {:?}",
788 session_context.id, err
789 );
790 }
791 }
792 }
793 }
794 }
795 ServiceEvent::SessionClose { session_context } => {
796 debug!(
797 "SessionClose({}, {})",
798 session_context.id, session_context.address,
799 );
800 let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
801 reg.remove_feeler(&session_context.address);
803 reg.remove_peer(session_context.id).is_some()
804 });
805 if peer_exists {
806 debug!(
807 "{} closed. Remove {} from peer_registry",
808 session_context.id, session_context.address,
809 );
810 self.network_state.with_peer_store_mut(|peer_store| {
811 peer_store.remove_disconnected_peer(&session_context.address);
812 });
813 }
814 self.network_state
815 .observed_addrs
816 .write()
817 .remove(&session_context.id);
818 }
819 _ => {
820 info!("p2p service event: {:?}", event);
821 }
822 }
823 }
824}
825
826pub struct NetworkService {
828 p2p_service: Service<EventHandler, SecioKeyPair>,
829 network_state: Arc<NetworkState>,
830 ping_controller: Option<Sender<()>>,
831 bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
833 version: String,
834}
835
836impl NetworkService {
837 pub fn new(
839 network_state: Arc<NetworkState>,
840 protocols: Vec<CKBProtocol>,
841 required_protocol_ids: Vec<ProtocolId>,
842 identify_announce: (String, String, Flags),
844 transport_type: TransportType,
845 ) -> Self {
846 let config = &network_state.config;
847
848 if config.support_protocols.iter().collect::<HashSet<_>>()
849 != default_support_all_protocols()
850 .iter()
851 .collect::<HashSet<_>>()
852 {
853 warn!(
854 "Customized supported protocols: {:?}",
855 config.support_protocols
856 );
857 }
858
859 let mut protocol_metas = protocols
861 .into_iter()
862 .map(CKBProtocol::build)
863 .collect::<Vec<_>>();
864
865 let identify_callback = IdentifyCallback::new(
869 Arc::clone(&network_state),
870 identify_announce.0,
871 identify_announce.1.clone(),
872 identify_announce.2,
873 );
874 let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
875 ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
876 });
877 protocol_metas.push(identify_meta);
878
879 let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
881 let ping_interval = Duration::from_secs(config.ping_interval_secs);
882 let ping_timeout = Duration::from_secs(config.ping_timeout_secs);
883
884 let ping_network_state = Arc::clone(&network_state);
885 let (ping_handler, ping_controller) =
886 PingHandler::new(ping_interval, ping_timeout, ping_network_state);
887 let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
888 ProtocolHandle::Callback(Box::new(ping_handler))
889 });
890 protocol_metas.push(ping_meta);
891 Some(ping_controller)
892 } else {
893 None
894 };
895
896 if config
898 .support_protocols
899 .contains(&SupportProtocol::Discovery)
900 {
901 let addr_mgr = DiscoveryAddressManager {
902 network_state: Arc::clone(&network_state),
903 discovery_local_address: config.discovery_local_address,
904 };
905 let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
906 ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
907 addr_mgr,
908 config
909 .discovery_announce_check_interval_secs
910 .map(Duration::from_secs),
911 )))
912 });
913 protocol_metas.push(disc_meta);
914 }
915
916 if config.support_protocols.contains(&SupportProtocol::Feeler) {
918 let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
919 let network_state = Arc::clone(&network_state);
920 move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
921 });
922 protocol_metas.push(feeler_meta);
923 }
924
925 if config
927 .support_protocols
928 .contains(&SupportProtocol::DisconnectMessage)
929 {
930 let disconnect_message_state = Arc::clone(&network_state);
931 let disconnect_message_meta = SupportProtocols::DisconnectMessage
932 .build_meta_with_service_handle(move || {
933 ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
934 disconnect_message_state,
935 )))
936 });
937 protocol_metas.push(disconnect_message_meta);
938 }
939
940 #[cfg(not(target_family = "wasm"))]
942 if config
943 .support_protocols
944 .contains(&SupportProtocol::HolePunching)
945 {
946 let hole_punching_state = Arc::clone(&network_state);
947 let hole_punching_meta =
948 SupportProtocols::HolePunching.build_meta_with_service_handle(move || {
949 ProtocolHandle::Callback(Box::new(
950 crate::protocols::hole_punching::HolePunching::new(hole_punching_state),
951 ))
952 });
953 protocol_metas.push(hole_punching_meta);
954 }
955
956 let mut service_builder = ServiceBuilder::default();
957 let yamux_config = YamuxConfig {
958 max_stream_count: protocol_metas.len(),
959 max_stream_window_size: 1024 * 1024,
960 ..Default::default()
961 };
962 for meta in protocol_metas.into_iter() {
963 network_state
964 .protocols
965 .write()
966 .push((meta.id(), meta.name(), meta.support_versions()));
967 service_builder = service_builder.insert_protocol(meta);
968 }
969 let event_handler = EventHandler {
970 network_state: Arc::clone(&network_state),
971 };
972 service_builder = service_builder
973 .handshake_type(network_state.local_private_key.clone().into())
974 .yamux_config(yamux_config)
975 .forever(true)
976 .max_connection_number(1024)
977 .set_send_buffer_size(config.max_send_buffer())
978 .set_channel_size(config.channel_size())
979 .timeout(Duration::from_secs(5))
980 .onion_timeout(Duration::from_secs(120))
981 .trusted_proxies(config.trusted_proxies.clone());
982
983 #[cfg(not(target_family = "wasm"))]
984 {
985 service_builder = service_builder.upnp(config.upnp);
986
987 if let Some(proxy_url) = &config.proxy.proxy_url {
990 service_builder = service_builder
991 .tcp_proxy_config(proxy_url)
992 .tcp_proxy_random_auth(config.proxy.proxy_random_auth);
993 info!(
994 "set tcp_proxy_config: {:?}, proxy_random_auth: {}",
995 config.proxy.proxy_url.clone(),
996 config.proxy.proxy_random_auth
997 );
998 };
999
1000 let onion_proxy_url = {
1001 config.onion.onion_server.clone().map(|onion_server| {
1002 if !onion_server.starts_with("socks5://") {
1003 format!("socks5://{}", onion_server)
1004 } else {
1005 onion_server
1006 }
1007 })
1008 };
1009 if let Some(onion_proxy_url) = onion_proxy_url {
1010 info!("set tcp_onion_config: {:?}", onion_proxy_url);
1011 service_builder = service_builder.tcp_onion_config(&onion_proxy_url);
1012 }
1013 }
1014
1015 #[cfg(target_os = "linux")]
1016 let p2p_service = {
1017 if config.reuse_port_on_linux {
1018 let iter = config.listen_addresses.iter();
1019
1020 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
1021 enum BindType {
1022 None,
1023 Ws,
1024 Tcp,
1025 Both,
1026 }
1027 impl BindType {
1028 fn transform(&mut self, other: TransportType) {
1029 match (&self, other) {
1030 (BindType::None, TransportType::Ws) => *self = BindType::Ws,
1031 (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
1032 (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
1033 (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
1034 _ => (),
1035 }
1036 }
1037
1038 fn is_ready(&self) -> bool {
1039 matches!(self, BindType::Both)
1041 }
1042 }
1043
1044 let mut init = BindType::None;
1045
1046 let proxy_config_enable =
1047 config.proxy.proxy_url.is_some() || config.onion.onion_server.is_some();
1048
1049 let bind_fn_with_addr =
1050 move |socket: p2p::service::TcpSocket,
1051 ctxt: p2p::service::TransformerContext,
1052 addr: std::net::SocketAddr| {
1053 let socket_ref = socket2::SockRef::from(&socket);
1054 #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
1055 socket_ref.set_reuse_port(true)?;
1056 socket_ref.set_reuse_address(true)?;
1057 match ctxt.state {
1058 p2p::service::SocketState::Listen => Ok(socket),
1059 p2p::service::SocketState::Dial => {
1060 let domain = socket2::Domain::for_address(addr);
1061 if socket_ref.domain()? == domain {
1062 if proxy_config_enable {
1063 debug!("skip bind since proxy is enabled");
1065 } else {
1066 socket_ref.bind(&addr.into())?;
1067 }
1068 }
1069 Ok(socket)
1070 }
1071 }
1072 };
1073
1074 for multi_addr in iter {
1075 if init.is_ready() {
1076 break;
1077 }
1078
1079 match find_type(multi_addr) {
1080 TransportType::Tcp => {
1081 if matches!(init, BindType::Tcp) {
1083 continue;
1084 }
1085 if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1086 let bind_fn = move |socket: p2p::service::TcpSocket,
1087 ctxt: p2p::service::TransformerContext| {
1088 bind_fn_with_addr(socket, ctxt, addr)
1089 };
1090 init.transform(TransportType::Tcp);
1091 service_builder = service_builder.tcp_config(bind_fn);
1092 }
1093 }
1094 TransportType::Ws | TransportType::Wss => {
1095 if matches!(init, BindType::Ws) {
1097 continue;
1098 }
1099 if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1100 let bind_fn = move |socket: p2p::service::TcpSocket,
1101 ctxt: p2p::service::TransformerContext| {
1102 bind_fn_with_addr(socket, ctxt, addr)
1103 };
1104 init.transform(TransportType::Ws);
1105 service_builder = service_builder.tcp_config_on_ws(bind_fn);
1106 }
1107 }
1108 }
1109 }
1110 }
1111
1112 service_builder.build(event_handler)
1113 };
1114
1115 #[cfg(not(target_os = "linux"))]
1116 let p2p_service = service_builder.build(event_handler);
1123
1124 let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
1126 let protocol_type_checker_service = ProtocolTypeCheckerService::new(
1127 Arc::clone(&network_state),
1128 p2p_service.control().to_owned().into(),
1129 required_protocol_ids,
1130 );
1131 let mut bg_services = vec![
1132 Box::pin(dump_peer_store_service) as Pin<Box<_>>,
1133 Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
1134 ];
1135 if config.outbound_peer_service_enabled() {
1136 let outbound_peer_service = OutboundPeerService::new(
1137 Arc::clone(&network_state),
1138 p2p_service.control().to_owned().into(),
1139 Duration::from_secs(config.connect_outbound_interval_secs),
1140 transport_type,
1141 );
1142 bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
1143 };
1144
1145 #[cfg(feature = "with_dns_seeding")]
1146 if config.dns_seeding_service_enabled() {
1147 let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
1148 Arc::clone(&network_state),
1149 config.dns_seeds.clone(),
1150 );
1151 bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
1152 };
1153
1154 NetworkService {
1155 p2p_service,
1156 network_state,
1157 ping_controller,
1158 bg_services,
1159 version: identify_announce.1,
1160 }
1161 }
1162
1163 pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1165 let config = self.network_state.config.clone();
1166
1167 let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1168
1169 for addr in self.network_state.config.whitelist_peers() {
1171 debug!("Dial whitelist_peers {:?}", addr);
1172 self.network_state.dial_identify(&p2p_control, addr);
1173 }
1174
1175 let target = &self.network_state.required_flags;
1176
1177 let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1180 let count = max((config.max_outbound_peers >> 1) as usize, 1);
1181 let mut addrs: Vec<_> = peer_store
1182 .fetch_addrs_to_attempt(count, *target, |_| true)
1183 .into_iter()
1184 .map(|paddr| paddr.addr)
1185 .collect();
1186 let anchors: Vec<_> = peer_store.mut_anchors().drain().collect();
1188 addrs.extend(anchors);
1189 let bootnodes = self
1191 .network_state
1192 .bootnodes
1193 .iter()
1194 .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1195 .into_iter()
1196 .cloned();
1197 addrs.extend(bootnodes);
1198 addrs
1199 });
1200
1201 for addr in bootnodes {
1203 debug!("Dial bootnode {:?}", addr);
1204 self.network_state.dial_identify(&p2p_control, addr);
1205 }
1206
1207 let Self {
1208 mut p2p_service,
1209 network_state,
1210 ping_controller,
1211 bg_services,
1212 version,
1213 } = self;
1214
1215 let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1217 .into_iter()
1218 .map(|bg_service| {
1219 let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1220 (signal_sender, (bg_service, signal_receiver))
1221 })
1222 .unzip();
1223
1224 let receiver: CancellationToken = new_tokio_exit_rx();
1225 #[cfg(not(target_family = "wasm"))]
1226 let (start_sender, start_receiver) = mpsc::channel();
1227 {
1228 #[cfg(not(target_family = "wasm"))]
1229 let network_state = Arc::clone(&network_state);
1230 let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1231 handle.spawn_task(async move {
1232 #[cfg(not(target_family = "wasm"))]
1233 {
1234 let listen_addresses = {
1235 let mut addresses = config.listen_addresses.clone();
1236 if config.reuse_tcp_with_ws {
1237 let ws_listens = addresses
1238 .iter()
1239 .cloned()
1240 .filter_map(|mut addr| {
1241 if matches!(find_type(&addr), TransportType::Tcp) {
1242 addr.push(Protocol::Ws);
1243 Some(addr)
1244 } else {
1245 None
1246 }
1247 })
1248 .collect::<Vec<_>>();
1249
1250 addresses.extend(ws_listens);
1251 }
1252 let mut addresses = addresses
1253 .into_iter()
1254 .collect::<HashSet<_>>()
1255 .into_iter()
1256 .collect::<Vec<_>>();
1257 addresses.sort_by(|a, b| {
1258 let ty_a = find_type(a);
1259 let ty_b = find_type(b);
1260
1261 ty_a.cmp(&ty_b)
1262 });
1263
1264 addresses
1265 };
1266
1267 for addr in &listen_addresses {
1268 match p2p_service.listen(addr.to_owned()).await {
1269 Ok(listen_address) => {
1270 info!("Listen on address: {}", listen_address);
1271 network_state
1272 .listened_addrs
1273 .write()
1274 .push(listen_address.clone());
1275 }
1276 Err(err) => {
1277 warn!(
1278 "Listen on address {} failed, due to error: {}",
1279 addr.clone(),
1280 err
1281 );
1282 start_sender
1283 .send(Err(Error::P2P(P2PError::Transport(err))))
1284 .expect("channel abnormal shutdown");
1285 return;
1286 }
1287 };
1288 }
1289 start_sender.send(Ok(())).unwrap();
1290 }
1291
1292 p2p::runtime::spawn(async move { p2p_service.run().await });
1293 tokio::select! {
1294 _ = receiver.cancelled() => {
1295 info!("NetworkService receive exit signal, start shutdown...");
1296 let _ = p2p_control.shutdown().await;
1297 drop(bg_signals);
1299
1300 info!("NetworkService shutdown now");
1301 },
1302 else => {
1303 let _ = p2p_control.shutdown().await;
1304 drop(bg_signals);
1306 },
1307 }
1308 });
1309 }
1310 for (mut service, mut receiver) in bg_receivers {
1311 handle.spawn_task(async move {
1312 loop {
1313 tokio::select! {
1314 _ = &mut service => {},
1315 _ = &mut receiver => break
1316 }
1317 }
1318 });
1319 }
1320 #[cfg(not(target_family = "wasm"))]
1321 if let Ok(Err(e)) = start_receiver.recv() {
1322 return Err(e);
1323 }
1324
1325 Ok(NetworkController {
1326 version,
1327 network_state,
1328 p2p_control,
1329 ping_controller,
1330 })
1331 }
1332}
1333
1334#[derive(Clone)]
1336pub struct NetworkController {
1337 version: String,
1338 network_state: Arc<NetworkState>,
1339 p2p_control: ServiceControl,
1340 ping_controller: Option<Sender<()>>,
1341}
1342
1343impl NetworkController {
1344 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1346 self.network_state.public_urls(max_urls)
1347 }
1348
1349 pub fn version(&self) -> &String {
1351 &self.version
1352 }
1353
1354 pub fn node_id(&self) -> String {
1356 self.network_state.node_id()
1357 }
1358
1359 pub fn p2p_control(&self) -> &ServiceControl {
1361 &self.p2p_control
1362 }
1363
1364 pub fn async_p2p_control(&self) -> ServiceAsyncControl {
1366 self.p2p_control.clone().into()
1367 }
1368
1369 pub fn add_node(&self, address: Multiaddr) {
1371 self.network_state.add_node(&self.p2p_control, address)
1372 }
1373
1374 pub fn add_public_addr(&self, public_addr: Multiaddr) {
1376 self.network_state.add_public_addr(public_addr)
1377 }
1378
1379 pub fn remove_node(&self, peer_id: &PeerId) {
1381 if let Some(session_id) = self
1382 .network_state
1383 .peer_registry
1384 .read()
1385 .get_key_by_peer_id(peer_id)
1386 {
1387 if let Err(err) =
1388 disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1389 {
1390 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1391 }
1392 } else {
1393 error!("Cannot find peer {:?}", peer_id);
1394 }
1395 }
1396
1397 pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1399 self.network_state
1400 .peer_store
1401 .lock()
1402 .ban_list()
1403 .get_banned_addrs()
1404 }
1405
1406 pub fn clear_banned_addrs(&self) {
1408 self.network_state.peer_store.lock().clear_ban_list();
1409 }
1410
1411 pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1413 self.network_state
1414 .peer_store
1415 .lock()
1416 .addr_manager()
1417 .get(addr)
1418 .cloned()
1419 }
1420
1421 pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1423 self.disconnect_peers_in_ip_range(address, &ban_reason);
1424 self.network_state
1425 .peer_store
1426 .lock()
1427 .ban_network(address, ban_until, ban_reason)
1428 }
1429
1430 pub fn unban(&self, address: &IpNetwork) {
1432 self.network_state
1433 .peer_store
1434 .lock()
1435 .mut_ban_list()
1436 .unban_network(address);
1437 }
1438
1439 pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1441 self.network_state.with_peer_registry(|reg| {
1442 reg.peers()
1443 .iter()
1444 .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1445 .collect::<Vec<_>>()
1446 })
1447 }
1448
1449 pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1451 self.network_state
1452 .ban_session(&self.p2p_control, peer_index, duration, reason);
1453 }
1454
1455 fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1457 self.network_state.with_peer_registry(|reg| {
1458 reg.peers().iter().for_each(|(peer_index, peer)| {
1459 if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr)
1460 && address.contains(addr.ip())
1461 {
1462 let _ = disconnect_with_message(
1463 &self.p2p_control,
1464 *peer_index,
1465 &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1466 );
1467 }
1468 })
1469 });
1470 }
1471
1472 fn try_broadcast(
1473 &self,
1474 quick: bool,
1475 target: Option<SessionId>,
1476 proto_id: ProtocolId,
1477 data: Bytes,
1478 ) -> Result<(), SendErrorKind> {
1479 let now = Instant::now();
1480 loop {
1481 let target = target
1482 .map(TargetSession::Single)
1483 .unwrap_or(TargetSession::All);
1484 let result = if quick {
1485 self.p2p_control
1486 .quick_filter_broadcast(target, proto_id, data.clone())
1487 } else {
1488 self.p2p_control
1489 .filter_broadcast(target, proto_id, data.clone())
1490 };
1491 match result {
1492 Ok(()) => {
1493 return Ok(());
1494 }
1495 Err(SendErrorKind::WouldBlock) => {
1496 if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1497 warn!("Broadcast message to {} timeout", proto_id);
1498 return Err(SendErrorKind::WouldBlock);
1499 }
1500 thread::sleep(P2P_TRY_SEND_INTERVAL);
1501 }
1502 Err(err) => {
1503 warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1504 return Err(err);
1505 }
1506 }
1507 }
1508 }
1509
1510 fn broadcast_inner<S: Spawn>(
1511 &self,
1512 quick: bool,
1513 target: TargetSession,
1514 proto_id: ProtocolId,
1515 data: Bytes,
1516 handle: &S,
1517 ) {
1518 let async_control: ServiceAsyncControl = self.p2p_control.clone().into();
1519
1520 handle.spawn_task(async move {
1521 if quick {
1522 let _ignore = async_control
1523 .quick_filter_broadcast(target, proto_id, data)
1524 .await;
1525 } else {
1526 let _ignore = async_control.filter_broadcast(target, proto_id, data).await;
1527 }
1528 })
1529 }
1530
1531 pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1533 self.try_broadcast(false, None, proto_id, data)
1534 }
1535
1536 pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1538 self.try_broadcast(true, None, proto_id, data)
1539 }
1540
1541 pub fn send_message_to(
1543 &self,
1544 session_id: SessionId,
1545 proto_id: ProtocolId,
1546 data: Bytes,
1547 ) -> Result<(), SendErrorKind> {
1548 self.try_broadcast(false, Some(session_id), proto_id, data)
1549 }
1550
1551 pub fn broadcast_with_handle<S: Spawn>(&self, proto_id: ProtocolId, data: Bytes, handle: &S) {
1553 self.broadcast_inner(false, TargetSession::All, proto_id, data, handle)
1554 }
1555
1556 pub fn quick_broadcast_with_handle<S: Spawn>(
1558 &self,
1559 proto_id: ProtocolId,
1560 data: Bytes,
1561 handle: &S,
1562 ) {
1563 self.broadcast_inner(true, TargetSession::All, proto_id, data, handle)
1564 }
1565
1566 pub fn send_message_to_with_handle<S: Spawn>(
1568 &self,
1569 session_id: SessionId,
1570 proto_id: ProtocolId,
1571 data: Bytes,
1572 handle: &S,
1573 ) {
1574 self.broadcast_inner(
1575 false,
1576 TargetSession::Single(session_id),
1577 proto_id,
1578 data,
1579 handle,
1580 )
1581 }
1582
1583 pub fn is_active(&self) -> bool {
1585 self.network_state.is_active()
1586 }
1587
1588 pub fn set_active(&self, active: bool) {
1590 self.network_state.active.store(active, Ordering::Release);
1591 }
1592
1593 pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1595 self.network_state.protocols.read().clone()
1596 }
1597
1598 pub fn ping_peers(&self) {
1600 if let Some(mut ping_controller) = self.ping_controller.clone() {
1601 let _ignore = ping_controller.try_send(());
1602 }
1603 }
1604}
1605
1606pub(crate) fn disconnect_with_message(
1608 control: &ServiceControl,
1609 peer_index: SessionId,
1610 message: &str,
1611) -> Result<(), SendErrorKind> {
1612 if !message.is_empty() {
1613 let data = Bytes::from(message.as_bytes().to_vec());
1614 control.quick_send_message_to(
1616 peer_index,
1617 SupportProtocols::DisconnectMessage.protocol_id(),
1618 data,
1619 )?;
1620 }
1621 control.disconnect(peer_index)
1622}
1623
1624pub(crate) async fn async_disconnect_with_message(
1625 control: &ServiceAsyncControl,
1626 peer_index: SessionId,
1627 message: &str,
1628) -> Result<(), SendErrorKind> {
1629 if !message.is_empty() {
1630 let data = Bytes::from(message.as_bytes().to_vec());
1631 control
1633 .quick_send_message_to(
1634 peer_index,
1635 SupportProtocols::DisconnectMessage.protocol_id(),
1636 data,
1637 )
1638 .await?;
1639 }
1640 control.disconnect(peer_index).await
1641}
1642
1643#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
1645pub enum TransportType {
1646 Tcp,
1648 Ws,
1650 Wss,
1652}
1653
1654#[allow(dead_code)]
1655pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1656 let mut iter = addr.iter();
1657
1658 iter.find_map(|proto| match proto {
1659 Protocol::Ws => Some(TransportType::Ws),
1660 Protocol::Wss => Some(TransportType::Wss),
1661 _ => None,
1662 })
1663 .unwrap_or(TransportType::Tcp)
1664}