ckb_network/
network.rs

1//! Global state struct and start function
2use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7    PeerStore,
8    types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11    disconnect_message::DisconnectMessageProtocol,
12    discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13    feeler::Feeler,
14    identify::{Flags, IdentifyCallback, IdentifyProtocol},
15    ping::PingHandler,
16    support_protocols::SupportProtocols,
17};
18use crate::services::{
19    dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
20    protocol_type_checker::ProtocolTypeCheckerService,
21};
22use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
23use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
24use ckb_logger::{debug, error, info, trace, warn};
25use ckb_spawn::Spawn;
26use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
27use ckb_systemtime::{Duration, Instant};
28use ckb_util::{Condvar, Mutex, RwLock};
29use futures::{Future, channel::mpsc::Sender};
30use ipnetwork::IpNetwork;
31use p2p::{
32    SessionId, async_trait,
33    builder::ServiceBuilder,
34    bytes::Bytes,
35    context::{ServiceContext, SessionContext},
36    error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
37    multiaddr::{Multiaddr, Protocol},
38    secio::{self, PeerId, SecioKeyPair, error::SecioError},
39    service::{
40        ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
41        TargetSession,
42    },
43    traits::ServiceHandle,
44    utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
45    yamux::config::Config as YamuxConfig,
46};
47use rand::prelude::IteratorRandom;
48#[cfg(feature = "with_sentry")]
49use sentry::{Level, capture_message, with_scope};
50#[cfg(not(target_family = "wasm"))]
51use std::sync::mpsc;
52use std::{
53    borrow::Cow,
54    cmp::max,
55    collections::{HashMap, HashSet},
56    pin::Pin,
57    sync::{
58        Arc,
59        atomic::{AtomicBool, Ordering},
60    },
61    thread,
62};
63use tokio::{self, sync::oneshot};
64
// Timeout (6 seconds) for sending a p2p message.
// NOTE(review): the use site is outside this part of the file — confirm exact semantics there.
const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
// Interval (100 milliseconds) between send attempts.
// NOTE(review): the use site is outside this part of the file — confirm exact semantics there.
const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
// After 5 minutes we consider this dial hang: `NetworkState::can_dial` compares the age of a
// pending dial against this value and reports it (when the `with_sentry` feature is on).
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
69
/// The global shared state of the network module
///
/// Mutable parts are wrapped in their own lock so that callers can keep each
/// lock scope small; see the `with_*` helper methods.
pub struct NetworkState {
    /// Registry of the currently connected peers and feeler addresses
    pub(crate) peer_registry: RwLock<PeerRegistry>,
    /// Store used to report peer behaviour and to ban addresses
    pub(crate) peer_store: Mutex<PeerStore>,
    /// Node listened addresses
    pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
    /// Peers with a dial in flight, mapped to the instant the dial was issued
    dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
    /// Node public addresses by config
    public_addrs: HashSet<Multiaddr>,
    /// Address recorded per session via `add_observed_addr`; removed when the session closes
    observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
    /// Private key of the local node, fetched from the config
    local_private_key: secio::SecioKeyPair,
    /// Peer id derived from `local_private_key`
    local_peer_id: PeerId,
    /// Boot nodes taken from the config
    pub(crate) bootnodes: Vec<Multiaddr>,
    /// The network configuration this state was built from
    pub(crate) config: NetworkConfig,
    /// Flag returned by `is_active`; initialised to `true`
    pub(crate) active: AtomicBool,
    /// Node supported protocols
    /// fields: ProtocolId, Protocol Name, Supported Versions
    pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
    /// Flags set through `required_flags`; defaults to `SYNC | DISCOVERY | RELAY`
    pub(crate) required_flags: Flags,
}
90
91impl NetworkState {
92    /// Init from config
93    #[cfg(not(target_family = "wasm"))]
94    pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
95        config.create_dir_if_not_exists()?;
96        let local_private_key = config.fetch_private_key()?;
97        let local_peer_id = local_private_key.peer_id();
98        // set max score to public addresses
99        let public_addrs: HashSet<Multiaddr> = config
100            .listen_addresses
101            .iter()
102            .chain(config.public_addresses.iter())
103            .cloned()
104            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
105                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
106                _ => {
107                    match extract_peer_id(&addr) {
108                        Some(peer_id) if peer_id != local_peer_id => {
109                            error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
110                            std::process::exit(1);
111                        }
112                        Some(_) => (),
113                        None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
114                    }
115                    Some(addr)
116                }
117            })
118            .collect();
119        info!("Loading the peer store. This process may take a few seconds to complete.");
120
121        let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
122            config.peer_store_path(),
123        ));
124        let bootnodes = config.bootnodes();
125
126        let peer_registry = PeerRegistry::new(
127            config.max_inbound_peers(),
128            config.max_outbound_peers(),
129            config.whitelist_only,
130            config.whitelist_peers(),
131            config.disable_block_relay_only_connection,
132        );
133
134        Ok(NetworkState {
135            peer_store,
136            config,
137            bootnodes,
138            peer_registry: RwLock::new(peer_registry),
139            dialing_addrs: RwLock::new(HashMap::default()),
140            public_addrs,
141            listened_addrs: RwLock::new(Vec::new()),
142            observed_addrs: RwLock::new(HashMap::default()),
143            local_private_key,
144            local_peer_id,
145            active: AtomicBool::new(true),
146            protocols: RwLock::new(Vec::new()),
147            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
148        })
149    }
150
151    #[cfg(target_family = "wasm")]
152    pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
153        let local_private_key = config.fetch_private_key()?;
154        let local_peer_id = local_private_key.peer_id();
155        // set max score to public addresses
156        let public_addrs: HashSet<Multiaddr> = config
157            .listen_addresses
158            .iter()
159            .chain(config.public_addresses.iter())
160            .cloned()
161            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
162                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
163                _ => {
164                    if extract_peer_id(&addr).is_none() {
165                        addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
166                    }
167                    Some(addr)
168                }
169            })
170            .collect();
171        info!("Loading the peer store. This process may take a few seconds to complete.");
172        let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
173        let bootnodes = config.bootnodes();
174
175        let peer_registry = PeerRegistry::new(
176            config.max_inbound_peers(),
177            config.max_outbound_peers(),
178            config.whitelist_only,
179            config.whitelist_peers(),
180            config.disable_block_relay_only_connection,
181        );
182        Ok(NetworkState {
183            peer_store,
184            config,
185            bootnodes,
186            peer_registry: RwLock::new(peer_registry),
187            dialing_addrs: RwLock::new(HashMap::default()),
188            public_addrs,
189            listened_addrs: RwLock::new(Vec::new()),
190            observed_addrs: RwLock::new(HashMap::default()),
191            local_private_key,
192            local_peer_id,
193            active: AtomicBool::new(true),
194            protocols: RwLock::new(Vec::new()),
195            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
196        })
197    }
198
199    /// use to discovery get nodes message to announce what kind of node information need from the other peer
200    /// default with `Flags::SYNC | Flags::DISCOVERY | Flags::RELAY`
201    pub fn required_flags(mut self, flags: Flags) -> Self {
202        self.required_flags = flags;
203        self
204    }
205
206    pub(crate) fn report_session(
207        &self,
208        p2p_control: &ServiceControl,
209        session_id: SessionId,
210        behaviour: Behaviour,
211    ) {
212        if let Some(addr) = self.with_peer_registry(|reg| {
213            reg.get_peer(session_id)
214                .filter(|peer| !peer.is_whitelist)
215                .map(|peer| peer.connected_addr.clone())
216        }) {
217            trace!("Report {:?} because {:?}", addr, behaviour);
218            let report_result = self.peer_store.lock().report(&addr, behaviour);
219            if report_result.is_banned() {
220                if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
221                    debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
222                }
223            }
224        } else {
225            debug!(
226                "Report {} failure: not found in peer registry or it is on the whitelist",
227                session_id
228            );
229        }
230    }
231
232    pub(crate) fn ban_session(
233        &self,
234        p2p_control: &ServiceControl,
235        session_id: SessionId,
236        duration: Duration,
237        reason: String,
238    ) {
239        if let Some(addr) = self.with_peer_registry(|reg| {
240            reg.get_peer(session_id)
241                .filter(|peer| !peer.is_whitelist)
242                .map(|peer| peer.connected_addr.clone())
243        }) {
244            info!(
245                "Ban peer {:?} for {} seconds, reason: {}",
246                addr,
247                duration.as_secs(),
248                reason
249            );
250            if let Some(metrics) = ckb_metrics::handle() {
251                metrics.ckb_network_ban_peer.inc();
252            }
253            if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
254                let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
255                self.peer_store.lock().ban_addr(
256                    &peer.connected_addr,
257                    duration.as_millis() as u64,
258                    reason,
259                );
260                if let Err(err) =
261                    disconnect_with_message(p2p_control, peer.session_id, message.as_str())
262                {
263                    debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
264                }
265            }
266        } else {
267            debug!(
268                "Ban session({}) failed: not found in peer registry or it is on the whitelist",
269                session_id
270            );
271        }
272    }
273
274    pub(crate) fn accept_peer(
275        &self,
276        session_context: &SessionContext,
277    ) -> Result<Option<Peer>, Error> {
278        // NOTE: be careful, here easy cause a deadlock,
279        //    because peer_store's lock scope across peer_registry's lock scope
280        let mut peer_store = self.peer_store.lock();
281        let accept_peer_result = {
282            self.peer_registry.write().accept_peer(
283                session_context.address.clone(),
284                session_context.id,
285                session_context.ty,
286                &mut peer_store,
287            )
288        };
289        accept_peer_result
290    }
291
292    /// For restrict lock in inner scope
293    pub fn with_peer_registry<F, T>(&self, callback: F) -> T
294    where
295        F: FnOnce(&PeerRegistry) -> T,
296    {
297        callback(&self.peer_registry.read())
298    }
299
300    // For restrict lock in inner scope
301    pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
302    where
303        F: FnOnce(&mut PeerRegistry) -> T,
304    {
305        callback(&mut self.peer_registry.write())
306    }
307
308    // For restrict lock in inner scope
309    pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
310    where
311        F: FnOnce(&mut PeerStore) -> T,
312    {
313        callback(&mut self.peer_store.lock())
314    }
315
    /// Get peer id of local node
    ///
    /// This is the id derived from the local private key when the state was built.
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
320
321    /// Use on test
322    pub fn local_private_key(&self) -> &secio::SecioKeyPair {
323        &self.local_private_key
324    }
325
326    /// Get local node's peer id in base58 format string
327    pub fn node_id(&self) -> String {
328        self.local_peer_id().to_base58()
329    }
330
331    pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
332        if self.public_addrs.len() <= count {
333            return self.public_addrs.iter().cloned().collect();
334        } else {
335            self.public_addrs
336                .iter()
337                .cloned()
338                .choose_multiple(&mut rand::thread_rng(), count)
339        }
340    }
341
342    pub(crate) fn connection_status(&self) -> ConnectionStatus {
343        self.peer_registry.read().connection_status()
344    }
345
346    /// Get local node's listen address list
347    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
348        let listened_addrs = self.listened_addrs.read();
349        self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
350            .into_iter()
351            .filter_map(|addr| {
352                if !listened_addrs.contains(&addr) {
353                    Some((addr, 1))
354                } else {
355                    None
356                }
357            })
358            .chain(listened_addrs.iter().map(|addr| (addr.to_owned(), 1)))
359            .map(|(addr, score)| (addr.to_string(), score))
360            .collect()
361    }
362
363    pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
364        self.dial_identify(p2p_control, address);
365    }
366
367    /// use a filter to get protocol id list
368    pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
369        self.protocols
370            .read()
371            .iter()
372            .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
373            .collect::<Vec<_>>()
374    }
375
376    pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
377        let peer_id = extract_peer_id(addr);
378        if peer_id.is_none() {
379            error!("Do not dial addr without peer id, addr: {}", addr);
380            return false;
381        }
382        let peer_id = peer_id.as_ref().unwrap();
383
384        if self.local_peer_id() == peer_id {
385            trace!("Do not dial self: {:?}, {}", peer_id, addr);
386            return false;
387        }
388        if self.public_addrs.contains(addr) {
389            trace!(
390                "Do not dial listened address(self): {:?}, {}",
391                peer_id, addr
392            );
393            return false;
394        }
395
396        let peer_in_registry = self.with_peer_registry(|reg| {
397            reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
398        });
399        if peer_in_registry {
400            trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
401            return false;
402        }
403
404        if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
405            trace!(
406                "Do not send repeated dial commands to network service: {:?}, {}",
407                peer_id, addr
408            );
409            if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
410                #[cfg(feature = "with_sentry")]
411                with_scope(
412                    |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
413                    || {
414                        capture_message(
415                            &format!(
416                                "Dialing {:?}, {:?} for more than {} seconds, \
417                                 something is wrong in network service",
418                                peer_id,
419                                addr,
420                                DIAL_HANG_TIMEOUT.as_secs(),
421                            ),
422                            Level::Warning,
423                        )
424                    },
425                );
426            }
427            return false;
428        }
429
430        true
431    }
432
433    pub(crate) fn dial_success(&self, addr: &Multiaddr) {
434        if let Some(peer_id) = extract_peer_id(addr) {
435            self.dialing_addrs.write().remove(&peer_id);
436        }
437    }
438
439    pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
440        self.with_peer_registry_mut(|reg| {
441            reg.remove_feeler(addr);
442        });
443
444        if let Some(peer_id) = extract_peer_id(addr) {
445            self.dialing_addrs.write().remove(&peer_id);
446        }
447    }
448
449    /// Dial
450    /// return value indicates the dialing is actually sent or denied.
451    fn dial_inner(
452        &self,
453        p2p_control: &ServiceControl,
454        addr: Multiaddr,
455        target: TargetProtocol,
456    ) -> Result<(), Error> {
457        if !self.can_dial(&addr) {
458            return Err(Error::Dial(format!("ignore dialing addr {addr}")));
459        }
460
461        debug!("Dialing {addr}");
462        p2p_control.dial(addr.clone(), target)?;
463        self.dialing_addrs.write().insert(
464            extract_peer_id(&addr).expect("verified addr"),
465            Instant::now(),
466        );
467        Ok(())
468    }
469
470    /// Dial just identify protocol
471    pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
472        if let Err(err) = self.dial_inner(
473            p2p_control,
474            addr,
475            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
476        ) {
477            debug!("dial_identify error: {err}");
478        }
479    }
480
481    /// Dial just feeler protocol
482    pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
483        if let Err(err) = self.dial_inner(
484            p2p_control,
485            addr.clone(),
486            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
487        ) {
488            debug!("dial_feeler error {err}");
489        } else {
490            self.with_peer_registry_mut(|reg| {
491                reg.add_feeler(&addr);
492            });
493        }
494    }
495
496    /// add observed address for identify protocol
497    pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
498        let mut pending_observed_addrs = self.observed_addrs.write();
499        pending_observed_addrs.insert(session_id, addr);
500    }
501
502    // randomly select count addresses from observed_addrs
503    pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
504        let observed_addrs = self
505            .observed_addrs
506            .read()
507            .values()
508            .cloned()
509            .collect::<HashSet<_>>();
510        if observed_addrs.len() <= count {
511            return observed_addrs.into_iter().collect();
512        } else {
513            observed_addrs
514                .into_iter()
515                .choose_multiple(&mut rand::thread_rng(), count)
516        }
517    }
518
    /// Network message processing controller, default is true, if false, discard any received messages
    ///
    /// Reads the `active` flag with `Ordering::Acquire`.
    pub fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }
523}
524
/// Used to handle global events of tentacle, such as session open/close
pub struct EventHandler {
    /// Shared network state that the handler updates while processing events and errors
    pub(crate) network_state: Arc<NetworkState>,
}
529
530impl EventHandler {
531    /// init an event handler
532    pub fn new(network_state: Arc<NetworkState>) -> Self {
533        Self { network_state }
534    }
535}
536
/// Exit trait used to notify all other module to exit
///
/// Implementors must be `Send + Unpin + 'static`.
pub trait ExitHandler: Send + Unpin + 'static {
    /// notify other module to exit
    fn notify_exit(&self);
}
542
543/// Default exit handle
544#[derive(Clone, Default)]
545pub struct DefaultExitHandler {
546    lock: Arc<Mutex<()>>,
547    exit: Arc<Condvar>,
548}
549
550impl DefaultExitHandler {
551    /// Block on current thread util exit notify
552    pub fn wait_for_exit(&self) {
553        self.exit.wait(&mut self.lock.lock());
554    }
555}
556
557impl ExitHandler for DefaultExitHandler {
558    fn notify_exit(&self) {
559        self.exit.notify_all();
560    }
561}
562
563impl EventHandler {
564    fn inbound_eviction(&self) -> Vec<PeerIndex> {
565        if self.network_state.config.bootnode_mode {
566            let status = self.network_state.connection_status();
567
568            if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
569                self.network_state
570                    .with_peer_registry(|registry| {
571                        registry
572                            .peers()
573                            .values()
574                            .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
575                            .map(|peer| peer.session_id)
576                            .collect::<Vec<SessionId>>()
577                    })
578                    .into_iter()
579                    .enumerate()
580                    .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
581                    .collect()
582            } else {
583                Vec::new()
584            }
585        } else {
586            Vec::new()
587        }
588    }
589}
590
591#[async_trait]
592impl ServiceHandle for EventHandler {
593    async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
594        match error {
595            ServiceError::DialerError { address, error } => {
596                match error {
597                    DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
598                        SecioError::ConnectSelf,
599                    )) => {
600                        debug!("dial observed address success: {:?}", address);
601                    }
602                    DialerErrorKind::IoError(e)
603                        if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
604                    {
605                        warn!("DialerError({}) {}", address, e);
606                    }
607                    _ => {
608                        debug!("DialerError({}) {}", address, error);
609                    }
610                }
611                self.network_state.dial_failed(&address);
612            }
613            ServiceError::ProtocolError {
614                id,
615                proto_id,
616                error,
617            } => {
618                debug!("ProtocolError({}, {}) {}", id, proto_id, error);
619                let message = format!("ProtocolError id={proto_id}");
620                // Ban because misbehave of remote peer
621                self.network_state.ban_session(
622                    &context.control().clone().into(),
623                    id,
624                    Duration::from_secs(300),
625                    message,
626                );
627            }
628            ServiceError::SessionTimeout { session_context } => {
629                debug!(
630                    "SessionTimeout({}, {})",
631                    session_context.id, session_context.address,
632                );
633            }
634            ServiceError::MuxerError {
635                session_context,
636                error,
637            } => {
638                debug!(
639                    "MuxerError({}, {}), substream error {}, disconnect it",
640                    session_context.id, session_context.address, error,
641                );
642            }
643            ServiceError::ListenError { address, error } => {
644                debug!("ListenError: address={:?}, error={:?}", address, error);
645            }
646            ServiceError::ProtocolSelectError {
647                proto_name,
648                session_context,
649            } => {
650                debug!(
651                    "ProtocolSelectError: proto_name={:?}, session_id={}",
652                    proto_name, session_context.id,
653                );
654            }
655            ServiceError::SessionBlocked { session_context } => {
656                debug!("SessionBlocked: {}", session_context.id);
657            }
658            ServiceError::ProtocolHandleError { proto_id, error } => {
659                debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
660
661                let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
662                {
663                    if let Some(id) = opt_session_id {
664                        self.network_state.ban_session(
665                            &context.control().clone().into(),
666                            id,
667                            Duration::from_secs(300),
668                            format!("protocol {proto_id} panic when process peer message"),
669                        );
670                    }
671                    #[cfg(feature = "with_sentry")]
672                    with_scope(
673                        |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
674                        || {
675                            capture_message(
676                                &format!(
677                                    "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
678                                ),
679                                Level::Warning,
680                            )
681                        },
682                    );
683                    error!(
684                        "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
685                    );
686
687                    broadcast_exit_signals();
688                }
689            }
690        }
691    }
692
693    async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
694        // When session disconnect update status anyway
695        match event {
696            ServiceEvent::SessionOpen { session_context } => {
697                debug!(
698                    "SessionOpen({}, {})",
699                    session_context.id, session_context.address,
700                );
701
702                self.network_state.dial_success(&session_context.address);
703
704                let iter = self.inbound_eviction();
705
706                let control = context.control().clone().into();
707
708                for peer in iter {
709                    if let Err(err) =
710                        disconnect_with_message(&control, peer, "bootnode random eviction")
711                    {
712                        debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
713                    }
714                }
715
716                if self
717                    .network_state
718                    .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
719                {
720                    debug!(
721                        "Feeler connected {} => {}",
722                        session_context.id, session_context.address,
723                    );
724                } else {
725                    match self.network_state.accept_peer(&session_context) {
726                        Ok(Some(evicted_peer)) => {
727                            debug!(
728                                "Disconnect peer, {} => {}",
729                                evicted_peer.session_id, evicted_peer.connected_addr,
730                            );
731                            if let Err(err) = disconnect_with_message(
732                                &control,
733                                evicted_peer.session_id,
734                                "evict because accepted better peer",
735                            ) {
736                                debug!(
737                                    "Disconnect failed {:?}, error: {:?}",
738                                    evicted_peer.session_id, err
739                                );
740                            }
741                        }
742                        Ok(None) => debug!(
743                            "{} open, registry {} success",
744                            session_context.id, session_context.address,
745                        ),
746                        Err(err) => {
747                            debug!(
748                                "Peer registry failed {:?}. Disconnect {} => {}",
749                                err, session_context.id, session_context.address,
750                            );
751                            if let Err(err) = disconnect_with_message(
752                                &control,
753                                session_context.id,
754                                "reject peer connection",
755                            ) {
756                                debug!(
757                                    "Disconnect failed {:?}, error: {:?}",
758                                    session_context.id, err
759                                );
760                            }
761                        }
762                    }
763                }
764            }
765            ServiceEvent::SessionClose { session_context } => {
766                debug!(
767                    "SessionClose({}, {})",
768                    session_context.id, session_context.address,
769                );
770                let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
771                    // should make sure feelers is clean
772                    reg.remove_feeler(&session_context.address);
773                    reg.remove_peer(session_context.id).is_some()
774                });
775                if peer_exists {
776                    debug!(
777                        "{} closed. Remove {} from peer_registry",
778                        session_context.id, session_context.address,
779                    );
780                    self.network_state.with_peer_store_mut(|peer_store| {
781                        peer_store.remove_disconnected_peer(&session_context.address);
782                    });
783                }
784                self.network_state
785                    .observed_addrs
786                    .write()
787                    .remove(&session_context.id);
788            }
789            _ => {
790                info!("p2p service event: {:?}", event);
791            }
792        }
793    }
794}
795
/// Ckb network service, use to start p2p network
pub struct NetworkService {
    /// The underlying p2p service, driven by an `EventHandler`
    p2p_service: Service<EventHandler, SecioKeyPair>,
    /// State shared with the protocols and the event handler
    network_state: Arc<NetworkState>,
    /// Sender half obtained from `PingHandler::new`; `None` when the ping protocol is not enabled
    ping_controller: Option<Sender<()>>,
    // Background services
    bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
    /// Version string (presumably the one announced through identify — TODO confirm)
    version: String,
}
805
806impl NetworkService {
807    /// init with all config
808    pub fn new(
809        network_state: Arc<NetworkState>,
810        protocols: Vec<CKBProtocol>,
811        required_protocol_ids: Vec<ProtocolId>,
812        // name, version, flags
813        identify_announce: (String, String, Flags),
814        transport_type: TransportType,
815    ) -> Self {
816        let config = &network_state.config;
817
818        if config.support_protocols.iter().collect::<HashSet<_>>()
819            != default_support_all_protocols()
820                .iter()
821                .collect::<HashSet<_>>()
822        {
823            warn!(
824                "Customized supported protocols: {:?}",
825                config.support_protocols
826            );
827        }
828
829        // == Build p2p service struct
830        let mut protocol_metas = protocols
831            .into_iter()
832            .map(CKBProtocol::build)
833            .collect::<Vec<_>>();
834
835        // == Build special protocols
836
837        // Identify is a core protocol, user cannot disable it via config
838        let identify_callback = IdentifyCallback::new(
839            Arc::clone(&network_state),
840            identify_announce.0,
841            identify_announce.1.clone(),
842            identify_announce.2,
843        );
844        let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
845            ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
846        });
847        protocol_metas.push(identify_meta);
848
849        // Ping protocol
850        let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
851            let ping_interval = Duration::from_secs(config.ping_interval_secs);
852            let ping_timeout = Duration::from_secs(config.ping_timeout_secs);
853
854            let ping_network_state = Arc::clone(&network_state);
855            let (ping_handler, ping_controller) =
856                PingHandler::new(ping_interval, ping_timeout, ping_network_state);
857            let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
858                ProtocolHandle::Callback(Box::new(ping_handler))
859            });
860            protocol_metas.push(ping_meta);
861            Some(ping_controller)
862        } else {
863            None
864        };
865
866        // Discovery protocol
867        if config
868            .support_protocols
869            .contains(&SupportProtocol::Discovery)
870        {
871            let addr_mgr = DiscoveryAddressManager {
872                network_state: Arc::clone(&network_state),
873                discovery_local_address: config.discovery_local_address,
874            };
875            let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
876                ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
877                    addr_mgr,
878                    config
879                        .discovery_announce_check_interval_secs
880                        .map(Duration::from_secs),
881                )))
882            });
883            protocol_metas.push(disc_meta);
884        }
885
886        // Feeler protocol
887        if config.support_protocols.contains(&SupportProtocol::Feeler) {
888            let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
889                let network_state = Arc::clone(&network_state);
890                move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
891            });
892            protocol_metas.push(feeler_meta);
893        }
894
895        // DisconnectMessage protocol
896        if config
897            .support_protocols
898            .contains(&SupportProtocol::DisconnectMessage)
899        {
900            let disconnect_message_state = Arc::clone(&network_state);
901            let disconnect_message_meta = SupportProtocols::DisconnectMessage
902                .build_meta_with_service_handle(move || {
903                    ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
904                        disconnect_message_state,
905                    )))
906                });
907            protocol_metas.push(disconnect_message_meta);
908        }
909
910        // HolePunching protocol
911        #[cfg(not(target_family = "wasm"))]
912        if config
913            .support_protocols
914            .contains(&SupportProtocol::HolePunching)
915        {
916            let hole_punching_state = Arc::clone(&network_state);
917            let hole_punching_meta =
918                SupportProtocols::HolePunching.build_meta_with_service_handle(move || {
919                    ProtocolHandle::Callback(Box::new(
920                        crate::protocols::hole_punching::HolePunching::new(hole_punching_state),
921                    ))
922                });
923            protocol_metas.push(hole_punching_meta);
924        }
925
926        let mut service_builder = ServiceBuilder::default();
927        let yamux_config = YamuxConfig {
928            max_stream_count: protocol_metas.len(),
929            max_stream_window_size: 1024 * 1024,
930            ..Default::default()
931        };
932        for meta in protocol_metas.into_iter() {
933            network_state
934                .protocols
935                .write()
936                .push((meta.id(), meta.name(), meta.support_versions()));
937            service_builder = service_builder.insert_protocol(meta);
938        }
939        let event_handler = EventHandler {
940            network_state: Arc::clone(&network_state),
941        };
942        service_builder = service_builder
943            .handshake_type(network_state.local_private_key.clone().into())
944            .yamux_config(yamux_config)
945            .forever(true)
946            .max_connection_number(1024)
947            .set_send_buffer_size(config.max_send_buffer())
948            .set_channel_size(config.channel_size())
949            .timeout(Duration::from_secs(5));
950
951        #[cfg(not(target_family = "wasm"))]
952        {
953            service_builder = service_builder.upnp(config.upnp);
954        }
955
956        #[cfg(target_os = "linux")]
957        let p2p_service = {
958            if config.reuse_port_on_linux {
959                let iter = config.listen_addresses.iter();
960
961                #[derive(Clone, Copy, Debug, Eq, PartialEq)]
962                enum BindType {
963                    None,
964                    Ws,
965                    Tcp,
966                    Both,
967                }
968                impl BindType {
969                    fn transform(&mut self, other: TransportType) {
970                        match (&self, other) {
971                            (BindType::None, TransportType::Ws) => *self = BindType::Ws,
972                            (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
973                            (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
974                            (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
975                            _ => (),
976                        }
977                    }
978
979                    fn is_ready(&self) -> bool {
980                        // should change to Both if ckb enable ws
981                        matches!(self, BindType::Both)
982                    }
983                }
984
985                let mut init = BindType::None;
986
987                let bind_fn_with_addr =
988                    move |socket: p2p::service::TcpSocket,
989                          ctxt: p2p::service::TransformerContext,
990                          addr: std::net::SocketAddr| {
991                        let socket_ref = socket2::SockRef::from(&socket);
992                        #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
993                        socket_ref.set_reuse_port(true)?;
994                        socket_ref.set_reuse_address(true)?;
995                        match ctxt.state {
996                            p2p::service::SocketState::Listen => Ok(socket),
997                            p2p::service::SocketState::Dial => {
998                                let domain = socket2::Domain::for_address(addr);
999                                if socket_ref.domain()? == domain {
1000                                    socket_ref.bind(&addr.into())?;
1001                                }
1002                                Ok(socket)
1003                            }
1004                        }
1005                    };
1006
1007                for multi_addr in iter {
1008                    if init.is_ready() {
1009                        break;
1010                    }
1011                    match find_type(multi_addr) {
1012                        TransportType::Tcp => {
1013                            // only bind once
1014                            if matches!(init, BindType::Tcp) {
1015                                continue;
1016                            }
1017                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1018                                let bind_fn = move |socket: p2p::service::TcpSocket,
1019                                                    ctxt: p2p::service::TransformerContext| {
1020                                    bind_fn_with_addr(socket, ctxt, addr)
1021                                };
1022                                init.transform(TransportType::Tcp);
1023                                service_builder = service_builder.tcp_config(bind_fn);
1024                            }
1025                        }
1026                        TransportType::Ws | TransportType::Wss => {
1027                            // only bind once
1028                            if matches!(init, BindType::Ws) {
1029                                continue;
1030                            }
1031                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1032                                let bind_fn = move |socket: p2p::service::TcpSocket,
1033                                                    ctxt: p2p::service::TransformerContext| {
1034                                    bind_fn_with_addr(socket, ctxt, addr)
1035                                };
1036                                init.transform(TransportType::Ws);
1037                                service_builder = service_builder.tcp_config_on_ws(bind_fn);
1038                            }
1039                        }
1040                    }
1041                }
1042            }
1043
1044            service_builder.build(event_handler)
1045        };
1046
1047        #[cfg(not(target_os = "linux"))]
1048        // The default permissions of Windows are not enough to enable this function,
1049        // and the administrator permissions of group permissions must be turned on.
1050        // This operation is very burdensome for windows users, so it is turned off by default
1051        //
1052        // The integration test fails after MacOS is turned on, the behavior is different from linux.
1053        // Decision to turn off it
1054        let p2p_service = service_builder.build(event_handler);
1055
1056        // == Build background service tasks
1057        let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
1058        let protocol_type_checker_service = ProtocolTypeCheckerService::new(
1059            Arc::clone(&network_state),
1060            p2p_service.control().to_owned().into(),
1061            required_protocol_ids,
1062        );
1063        let mut bg_services = vec![
1064            Box::pin(dump_peer_store_service) as Pin<Box<_>>,
1065            Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
1066        ];
1067        if config.outbound_peer_service_enabled() {
1068            let outbound_peer_service = OutboundPeerService::new(
1069                Arc::clone(&network_state),
1070                p2p_service.control().to_owned().into(),
1071                Duration::from_secs(config.connect_outbound_interval_secs),
1072                transport_type,
1073            );
1074            bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
1075        };
1076
1077        #[cfg(feature = "with_dns_seeding")]
1078        if config.dns_seeding_service_enabled() {
1079            let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
1080                Arc::clone(&network_state),
1081                config.dns_seeds.clone(),
1082            );
1083            bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
1084        };
1085
1086        NetworkService {
1087            p2p_service,
1088            network_state,
1089            ping_controller,
1090            bg_services,
1091            version: identify_announce.1,
1092        }
1093    }
1094
1095    /// Start the network in the background and return a controller
1096    pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1097        let config = self.network_state.config.clone();
1098
1099        let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1100
1101        // dial whitelist_nodes
1102        for addr in self.network_state.config.whitelist_peers() {
1103            debug!("Dial whitelist_peers {:?}", addr);
1104            self.network_state.dial_identify(&p2p_control, addr);
1105        }
1106
1107        let target = &self.network_state.required_flags;
1108
1109        // get bootnodes
1110        // try get addrs from peer_store, if peer_store have no enough addrs then use bootnodes
1111        let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1112            let count = max((config.max_outbound_peers >> 1) as usize, 1);
1113            let mut addrs: Vec<_> = peer_store
1114                .fetch_addrs_to_attempt(count, *target, |_| true)
1115                .into_iter()
1116                .map(|paddr| paddr.addr)
1117                .collect();
1118            // tried to re-connect to anchors on startup
1119            let anchors: Vec<_> = peer_store.mut_anchors().drain().collect();
1120            addrs.extend(anchors);
1121            // Get bootnodes randomly
1122            let bootnodes = self
1123                .network_state
1124                .bootnodes
1125                .iter()
1126                .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1127                .into_iter()
1128                .cloned();
1129            addrs.extend(bootnodes);
1130            addrs
1131        });
1132
1133        // dial half bootnodes
1134        for addr in bootnodes {
1135            debug!("Dial bootnode {:?}", addr);
1136            self.network_state.dial_identify(&p2p_control, addr);
1137        }
1138
1139        let Self {
1140            mut p2p_service,
1141            network_state,
1142            ping_controller,
1143            bg_services,
1144            version,
1145        } = self;
1146
1147        // NOTE: for ensure background task finished
1148        let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1149            .into_iter()
1150            .map(|bg_service| {
1151                let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1152                (signal_sender, (bg_service, signal_receiver))
1153            })
1154            .unzip();
1155
1156        let receiver: CancellationToken = new_tokio_exit_rx();
1157        #[cfg(not(target_family = "wasm"))]
1158        let (start_sender, start_receiver) = mpsc::channel();
1159        {
1160            #[cfg(not(target_family = "wasm"))]
1161            let network_state = Arc::clone(&network_state);
1162            let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1163            handle.spawn_task(async move {
1164                #[cfg(not(target_family = "wasm"))]
1165                {
1166                    let listen_addresses = {
1167                        let mut addresses = config.listen_addresses.clone();
1168                        if config.reuse_tcp_with_ws {
1169                            let ws_listens = addresses
1170                                .iter()
1171                                .cloned()
1172                                .filter_map(|mut addr| {
1173                                    if matches!(find_type(&addr), TransportType::Tcp) {
1174                                        addr.push(Protocol::Ws);
1175                                        Some(addr)
1176                                    } else {
1177                                        None
1178                                    }
1179                                })
1180                                .collect::<Vec<_>>();
1181
1182                            addresses.extend(ws_listens);
1183                        }
1184                        let mut addresses = addresses
1185                            .into_iter()
1186                            .collect::<HashSet<_>>()
1187                            .into_iter()
1188                            .collect::<Vec<_>>();
1189                        addresses.sort_by(|a, b| {
1190                            let ty_a = find_type(a);
1191                            let ty_b = find_type(b);
1192
1193                            ty_a.cmp(&ty_b)
1194                        });
1195
1196                        addresses
1197                    };
1198
1199                    for addr in &listen_addresses {
1200                        match p2p_service.listen(addr.to_owned()).await {
1201                            Ok(listen_address) => {
1202                                info!("Listen on address: {}", listen_address);
1203                                network_state
1204                                    .listened_addrs
1205                                    .write()
1206                                    .push(listen_address.clone());
1207                            }
1208                            Err(err) => {
1209                                warn!(
1210                                    "Listen on address {} failed, due to error: {}",
1211                                    addr.clone(),
1212                                    err
1213                                );
1214                                start_sender
1215                                    .send(Err(Error::P2P(P2PError::Transport(err))))
1216                                    .expect("channel abnormal shutdown");
1217                                return;
1218                            }
1219                        };
1220                    }
1221                    start_sender.send(Ok(())).unwrap();
1222                }
1223
1224                p2p::runtime::spawn(async move { p2p_service.run().await });
1225                tokio::select! {
1226                    _ = receiver.cancelled() => {
1227                        info!("NetworkService receive exit signal, start shutdown...");
1228                        let _ = p2p_control.shutdown().await;
1229                        // Drop senders to stop all corresponding background task
1230                        drop(bg_signals);
1231
1232                        info!("NetworkService shutdown now");
1233                    },
1234                    else => {
1235                        let _ = p2p_control.shutdown().await;
1236                        // Drop senders to stop all corresponding background task
1237                        drop(bg_signals);
1238                    },
1239                }
1240            });
1241        }
1242        for (mut service, mut receiver) in bg_receivers {
1243            handle.spawn_task(async move {
1244                loop {
1245                    tokio::select! {
1246                        _ = &mut service => {},
1247                        _ = &mut receiver => break
1248                    }
1249                }
1250            });
1251        }
1252        #[cfg(not(target_family = "wasm"))]
1253        if let Ok(Err(e)) = start_receiver.recv() {
1254            return Err(e);
1255        }
1256
1257        Ok(NetworkController {
1258            version,
1259            network_state,
1260            p2p_control,
1261            ping_controller,
1262        })
1263    }
1264}
1265
/// Network controller
///
/// Cloneable handle returned by `NetworkService::start`.
#[derive(Clone)]
pub struct NetworkController {
    // Version string carried over from the `NetworkService`.
    version: String,
    // Shared network state.
    network_state: Arc<NetworkState>,
    // Control handle of the running p2p service.
    p2p_control: ServiceControl,
    // Sender used to trigger a ping round; `None` when the service has no ping controller.
    ping_controller: Option<Sender<()>>,
}
1274
1275impl NetworkController {
1276    /// Node listen address list
1277    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1278        self.network_state.public_urls(max_urls)
1279    }
1280
1281    /// ckb version
1282    pub fn version(&self) -> &String {
1283        &self.version
1284    }
1285
1286    /// Node peer id's base58 format string
1287    pub fn node_id(&self) -> String {
1288        self.network_state.node_id()
1289    }
1290
1291    /// p2p service control
1292    pub fn p2p_control(&self) -> &ServiceControl {
1293        &self.p2p_control
1294    }
1295
1296    /// Dial remote node
1297    pub fn add_node(&self, address: Multiaddr) {
1298        self.network_state.add_node(&self.p2p_control, address)
1299    }
1300
1301    /// Disconnect session with peer id
1302    pub fn remove_node(&self, peer_id: &PeerId) {
1303        if let Some(session_id) = self
1304            .network_state
1305            .peer_registry
1306            .read()
1307            .get_key_by_peer_id(peer_id)
1308        {
1309            if let Err(err) =
1310                disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1311            {
1312                debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1313            }
1314        } else {
1315            error!("Cannot find peer {:?}", peer_id);
1316        }
1317    }
1318
1319    /// Get banned peer list
1320    pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1321        self.network_state
1322            .peer_store
1323            .lock()
1324            .ban_list()
1325            .get_banned_addrs()
1326    }
1327
1328    /// Clear banned list
1329    pub fn clear_banned_addrs(&self) {
1330        self.network_state.peer_store.lock().clear_ban_list();
1331    }
1332
1333    /// Get address info from peer store
1334    pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1335        self.network_state
1336            .peer_store
1337            .lock()
1338            .addr_manager()
1339            .get(addr)
1340            .cloned()
1341    }
1342
1343    /// Ban an ip
1344    pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1345        self.disconnect_peers_in_ip_range(address, &ban_reason);
1346        self.network_state
1347            .peer_store
1348            .lock()
1349            .ban_network(address, ban_until, ban_reason)
1350    }
1351
1352    /// Unban an ip
1353    pub fn unban(&self, address: &IpNetwork) {
1354        self.network_state
1355            .peer_store
1356            .lock()
1357            .mut_ban_list()
1358            .unban_network(address);
1359    }
1360
1361    /// Return all connected peers' information
1362    pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1363        self.network_state.with_peer_registry(|reg| {
1364            reg.peers()
1365                .iter()
1366                .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1367                .collect::<Vec<_>>()
1368        })
1369    }
1370
1371    /// Ban an peer through peer index
1372    pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1373        self.network_state
1374            .ban_session(&self.p2p_control, peer_index, duration, reason);
1375    }
1376
1377    /// disconnect peers with matched peer_ip or peer_ip_network, eg: 192.168.0.2 or 192.168.0.0/24
1378    fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1379        self.network_state.with_peer_registry(|reg| {
1380            reg.peers().iter().for_each(|(peer_index, peer)| {
1381                if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1382                    if address.contains(addr.ip()) {
1383                        let _ = disconnect_with_message(
1384                            &self.p2p_control,
1385                            *peer_index,
1386                            &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1387                        );
1388                    }
1389                }
1390            })
1391        });
1392    }
1393
1394    fn try_broadcast(
1395        &self,
1396        quick: bool,
1397        target: Option<SessionId>,
1398        proto_id: ProtocolId,
1399        data: Bytes,
1400    ) -> Result<(), SendErrorKind> {
1401        let now = Instant::now();
1402        loop {
1403            let target = target
1404                .map(TargetSession::Single)
1405                .unwrap_or(TargetSession::All);
1406            let result = if quick {
1407                self.p2p_control
1408                    .quick_filter_broadcast(target, proto_id, data.clone())
1409            } else {
1410                self.p2p_control
1411                    .filter_broadcast(target, proto_id, data.clone())
1412            };
1413            match result {
1414                Ok(()) => {
1415                    return Ok(());
1416                }
1417                Err(SendErrorKind::WouldBlock) => {
1418                    if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1419                        warn!("Broadcast message to {} timeout", proto_id);
1420                        return Err(SendErrorKind::WouldBlock);
1421                    }
1422                    thread::sleep(P2P_TRY_SEND_INTERVAL);
1423                }
1424                Err(err) => {
1425                    warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1426                    return Err(err);
1427                }
1428            }
1429        }
1430    }
1431
1432    /// Broadcast a message to all connected peers
1433    pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1434        self.try_broadcast(false, None, proto_id, data)
1435    }
1436
1437    /// Broadcast a message to all connected peers through quick queue
1438    pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1439        self.try_broadcast(true, None, proto_id, data)
1440    }
1441
1442    /// Send message to one connected peer
1443    pub fn send_message_to(
1444        &self,
1445        session_id: SessionId,
1446        proto_id: ProtocolId,
1447        data: Bytes,
1448    ) -> Result<(), SendErrorKind> {
1449        self.try_broadcast(false, Some(session_id), proto_id, data)
1450    }
1451
1452    /// network message processing controller, always true, if false, discard any received messages
1453    pub fn is_active(&self) -> bool {
1454        self.network_state.is_active()
1455    }
1456
1457    /// Change active status, if set false discard any received messages
1458    pub fn set_active(&self, active: bool) {
1459        self.network_state.active.store(active, Ordering::Release);
1460    }
1461
1462    /// Return all connected peers' protocols info
1463    pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1464        self.network_state.protocols.read().clone()
1465    }
1466
1467    /// Try ping all connected peers
1468    pub fn ping_peers(&self) {
1469        if let Some(mut ping_controller) = self.ping_controller.clone() {
1470            let _ignore = ping_controller.try_send(());
1471        }
1472    }
1473}
1474
1475// Send an optional message before disconnect a peer
1476pub(crate) fn disconnect_with_message(
1477    control: &ServiceControl,
1478    peer_index: SessionId,
1479    message: &str,
1480) -> Result<(), SendErrorKind> {
1481    if !message.is_empty() {
1482        let data = Bytes::from(message.as_bytes().to_vec());
1483        // Must quick send, otherwise this message will be dropped.
1484        control.quick_send_message_to(
1485            peer_index,
1486            SupportProtocols::DisconnectMessage.protocol_id(),
1487            data,
1488        )?;
1489    }
1490    control.disconnect(peer_index)
1491}
1492
1493pub(crate) async fn async_disconnect_with_message(
1494    control: &ServiceAsyncControl,
1495    peer_index: SessionId,
1496    message: &str,
1497) -> Result<(), SendErrorKind> {
1498    if !message.is_empty() {
1499        let data = Bytes::from(message.as_bytes().to_vec());
1500        // Must quick send, otherwise this message will be dropped.
1501        control
1502            .quick_send_message_to(
1503                peer_index,
1504                SupportProtocols::DisconnectMessage.protocol_id(),
1505                data,
1506            )
1507            .await?;
1508    }
1509    control.disconnect(peer_index).await
1510}
1511
/// Transport type on ckb
///
/// The derived ordering follows the declaration order: `Tcp < Ws < Wss`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum TransportType {
    /// Tcp
    Tcp,
    /// Ws
    Ws,
    /// Wss only on wasm
    Wss,
}
1522
1523#[allow(dead_code)]
1524pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1525    let mut iter = addr.iter();
1526
1527    iter.find_map(|proto| match proto {
1528        Protocol::Ws => Some(TransportType::Ws),
1529        Protocol::Wss => Some(TransportType::Wss),
1530        _ => None,
1531    })
1532    .unwrap_or(TransportType::Tcp)
1533}