ckb_network/
network.rs

1//! Global state struct and start function
2use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7    types::{AddrInfo, BannedAddr},
8    PeerStore,
9};
10use crate::protocols::{
11    disconnect_message::DisconnectMessageProtocol,
12    discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13    feeler::Feeler,
14    identify::{Flags, IdentifyCallback, IdentifyProtocol},
15    ping::PingHandler,
16    support_protocols::SupportProtocols,
17};
18use crate::services::{
19    dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
20    protocol_type_checker::ProtocolTypeCheckerService,
21};
22use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
23use ckb_app_config::{default_support_all_protocols, NetworkConfig, SupportProtocol};
24use ckb_logger::{debug, error, info, trace, warn};
25use ckb_spawn::Spawn;
26use ckb_stop_handler::{broadcast_exit_signals, new_tokio_exit_rx, CancellationToken};
27use ckb_systemtime::{Duration, Instant};
28use ckb_util::{Condvar, Mutex, RwLock};
29use futures::{channel::mpsc::Sender, Future};
30use ipnetwork::IpNetwork;
31use p2p::{
32    async_trait,
33    builder::ServiceBuilder,
34    bytes::Bytes,
35    context::{ServiceContext, SessionContext},
36    error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
37    multiaddr::{Multiaddr, Protocol},
38    secio::{self, error::SecioError, PeerId, SecioKeyPair},
39    service::{
40        ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
41        TargetSession,
42    },
43    traits::ServiceHandle,
44    utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
45    yamux::config::Config as YamuxConfig,
46    SessionId,
47};
48use rand::prelude::IteratorRandom;
49#[cfg(feature = "with_sentry")]
50use sentry::{capture_message, with_scope, Level};
51#[cfg(not(target_family = "wasm"))]
52use std::sync::mpsc;
53use std::{
54    borrow::Cow,
55    cmp::max,
56    collections::{HashMap, HashSet},
57    pin::Pin,
58    sync::{
59        atomic::{AtomicBool, Ordering},
60        Arc,
61    },
62    thread,
63};
64use tokio::{self, sync::oneshot};
65
66const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
67const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
68// After 5 minutes we consider this dial hang
69const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
70
71/// The global shared state of the network module
72pub struct NetworkState {
73    pub(crate) peer_registry: RwLock<PeerRegistry>,
74    pub(crate) peer_store: Mutex<PeerStore>,
75    /// Node listened addresses
76    pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
77    dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
78    /// Node public addresses,
79    /// includes manually public addrs and remote peer observed addrs
80    public_addrs: RwLock<HashSet<Multiaddr>>,
81    pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
82    local_private_key: secio::SecioKeyPair,
83    local_peer_id: PeerId,
84    pub(crate) bootnodes: Vec<Multiaddr>,
85    pub(crate) config: NetworkConfig,
86    pub(crate) active: AtomicBool,
87    /// Node supported protocols
88    /// fields: ProtocolId, Protocol Name, Supported Versions
89    pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
90    pub(crate) required_flags: Flags,
91
92    pub(crate) ckb2023: AtomicBool,
93}
94
95impl NetworkState {
96    /// Init from config
97    #[cfg(not(target_family = "wasm"))]
98    pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
99        config.create_dir_if_not_exists()?;
100        let local_private_key = config.fetch_private_key()?;
101        let local_peer_id = local_private_key.peer_id();
102        // set max score to public addresses
103        let public_addrs: HashSet<Multiaddr> = config
104            .listen_addresses
105            .iter()
106            .chain(config.public_addresses.iter())
107            .cloned()
108            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
109                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
110                _ => {
111                    if extract_peer_id(&addr).is_none() {
112                        addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
113                    }
114                    Some(addr)
115                }
116            })
117            .collect();
118        info!("Loading the peer store. This process may take a few seconds to complete.");
119
120        let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
121            config.peer_store_path(),
122        ));
123        let bootnodes = config.bootnodes();
124
125        let peer_registry = PeerRegistry::new(
126            config.max_inbound_peers(),
127            config.max_outbound_peers(),
128            config.whitelist_only,
129            config.whitelist_peers(),
130        );
131
132        Ok(NetworkState {
133            peer_store,
134            config,
135            bootnodes,
136            peer_registry: RwLock::new(peer_registry),
137            dialing_addrs: RwLock::new(HashMap::default()),
138            public_addrs: RwLock::new(public_addrs),
139            listened_addrs: RwLock::new(Vec::new()),
140            pending_observed_addrs: RwLock::new(HashSet::default()),
141            local_private_key,
142            local_peer_id,
143            active: AtomicBool::new(true),
144            protocols: RwLock::new(Vec::new()),
145            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
146            ckb2023: AtomicBool::new(false),
147        })
148    }
149
150    #[cfg(target_family = "wasm")]
151    pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
152        let local_private_key = config.fetch_private_key()?;
153        let local_peer_id = local_private_key.peer_id();
154        // set max score to public addresses
155        let public_addrs: HashSet<Multiaddr> = config
156            .listen_addresses
157            .iter()
158            .chain(config.public_addresses.iter())
159            .cloned()
160            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
161                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
162                _ => {
163                    if extract_peer_id(&addr).is_none() {
164                        addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
165                    }
166                    Some(addr)
167                }
168            })
169            .collect();
170        info!("Loading the peer store. This process may take a few seconds to complete.");
171        let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
172        let bootnodes = config.bootnodes();
173
174        let peer_registry = PeerRegistry::new(
175            config.max_inbound_peers(),
176            config.max_outbound_peers(),
177            config.whitelist_only,
178            config.whitelist_peers(),
179        );
180        Ok(NetworkState {
181            peer_store,
182            config,
183            bootnodes,
184            peer_registry: RwLock::new(peer_registry),
185            dialing_addrs: RwLock::new(HashMap::default()),
186            public_addrs: RwLock::new(public_addrs),
187            listened_addrs: RwLock::new(Vec::new()),
188            pending_observed_addrs: RwLock::new(HashSet::default()),
189            local_private_key,
190            local_peer_id,
191            active: AtomicBool::new(true),
192            protocols: RwLock::new(Vec::new()),
193            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
194            ckb2023: AtomicBool::new(false),
195        })
196    }
197
198    /// fork flag
199    pub fn ckb2023(self, init: bool) -> Self {
200        self.ckb2023.store(init, Ordering::SeqCst);
201        self
202    }
203
204    /// use to discovery get nodes message to announce what kind of node information need from the other peer
205    /// default with `Flags::SYNC | Flags::DISCOVERY | Flags::RELAY`
206    pub fn required_flags(mut self, flags: Flags) -> Self {
207        self.required_flags = flags;
208        self
209    }
210
211    pub(crate) fn report_session(
212        &self,
213        p2p_control: &ServiceControl,
214        session_id: SessionId,
215        behaviour: Behaviour,
216    ) {
217        if let Some(addr) = self.with_peer_registry(|reg| {
218            reg.get_peer(session_id)
219                .filter(|peer| !peer.is_whitelist)
220                .map(|peer| peer.connected_addr.clone())
221        }) {
222            trace!("Report {:?} because {:?}", addr, behaviour);
223            let report_result = self.peer_store.lock().report(&addr, behaviour);
224            if report_result.is_banned() {
225                if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
226                    debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
227                }
228            }
229        } else {
230            debug!(
231                "Report {} failure: not found in peer registry or it is on the whitelist",
232                session_id
233            );
234        }
235    }
236
237    pub(crate) fn ban_session(
238        &self,
239        p2p_control: &ServiceControl,
240        session_id: SessionId,
241        duration: Duration,
242        reason: String,
243    ) {
244        if let Some(addr) = self.with_peer_registry(|reg| {
245            reg.get_peer(session_id)
246                .filter(|peer| !peer.is_whitelist)
247                .map(|peer| peer.connected_addr.clone())
248        }) {
249            info!(
250                "Ban peer {:?} for {} seconds, reason: {}",
251                addr,
252                duration.as_secs(),
253                reason
254            );
255            if let Some(metrics) = ckb_metrics::handle() {
256                metrics.ckb_network_ban_peer.inc();
257            }
258            if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
259                let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
260                self.peer_store.lock().ban_addr(
261                    &peer.connected_addr,
262                    duration.as_millis() as u64,
263                    reason,
264                );
265                if let Err(err) =
266                    disconnect_with_message(p2p_control, peer.session_id, message.as_str())
267                {
268                    debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
269                }
270            }
271        } else {
272            debug!(
273                "Ban session({}) failed: not found in peer registry or it is on the whitelist",
274                session_id
275            );
276        }
277    }
278
279    pub(crate) fn accept_peer(
280        &self,
281        session_context: &SessionContext,
282    ) -> Result<Option<Peer>, Error> {
283        // NOTE: be careful, here easy cause a deadlock,
284        //    because peer_store's lock scope across peer_registry's lock scope
285        let mut peer_store = self.peer_store.lock();
286        let accept_peer_result = {
287            self.peer_registry.write().accept_peer(
288                session_context.address.clone(),
289                session_context.id,
290                session_context.ty,
291                &mut peer_store,
292            )
293        };
294        accept_peer_result.map_err(Into::into)
295    }
296
297    /// For restrict lock in inner scope
298    pub fn with_peer_registry<F, T>(&self, callback: F) -> T
299    where
300        F: FnOnce(&PeerRegistry) -> T,
301    {
302        callback(&self.peer_registry.read())
303    }
304
305    // For restrict lock in inner scope
306    pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
307    where
308        F: FnOnce(&mut PeerRegistry) -> T,
309    {
310        callback(&mut self.peer_registry.write())
311    }
312
313    // For restrict lock in inner scope
314    pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
315    where
316        F: FnOnce(&mut PeerStore) -> T,
317    {
318        callback(&mut self.peer_store.lock())
319    }
320
321    /// Get peer id of local node
322    pub fn local_peer_id(&self) -> &PeerId {
323        &self.local_peer_id
324    }
325
326    /// Use on test
327    pub fn local_private_key(&self) -> &secio::SecioKeyPair {
328        &self.local_private_key
329    }
330
331    /// Get local node's peer id in base58 format string
332    pub fn node_id(&self) -> String {
333        self.local_peer_id().to_base58()
334    }
335
336    pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
337        self.public_addrs
338            .read()
339            .iter()
340            .take(count)
341            .cloned()
342            .collect()
343    }
344
345    pub(crate) fn connection_status(&self) -> ConnectionStatus {
346        self.peer_registry.read().connection_status()
347    }
348
349    /// Get local node's listen address list
350    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
351        let listened_addrs = self.listened_addrs.read();
352        self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
353            .into_iter()
354            .filter_map(|addr| {
355                if !listened_addrs.contains(&addr) {
356                    Some((addr, 1))
357                } else {
358                    None
359                }
360            })
361            .chain(listened_addrs.iter().map(|addr| (addr.to_owned(), 1)))
362            .map(|(addr, score)| (addr.to_string(), score))
363            .collect()
364    }
365
366    pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
367        self.dial_identify(p2p_control, address);
368    }
369
370    /// use a filter to get protocol id list
371    pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
372        self.protocols
373            .read()
374            .iter()
375            .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
376            .collect::<Vec<_>>()
377    }
378
379    pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
380        let peer_id = extract_peer_id(addr);
381        if peer_id.is_none() {
382            error!("Do not dial addr without peer id, addr: {}", addr);
383            return false;
384        }
385        let peer_id = peer_id.as_ref().unwrap();
386
387        if self.local_peer_id() == peer_id {
388            trace!("Do not dial self: {:?}, {}", peer_id, addr);
389            return false;
390        }
391        if self.public_addrs.read().contains(addr) {
392            trace!(
393                "Do not dial listened address(self): {:?}, {}",
394                peer_id,
395                addr
396            );
397            return false;
398        }
399
400        let peer_in_registry = self.with_peer_registry(|reg| {
401            reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
402        });
403        if peer_in_registry {
404            trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
405            return false;
406        }
407
408        if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
409            trace!(
410                "Do not send repeated dial commands to network service: {:?}, {}",
411                peer_id,
412                addr
413            );
414            if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
415                #[cfg(feature = "with_sentry")]
416                with_scope(
417                    |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
418                    || {
419                        capture_message(
420                            &format!(
421                                "Dialing {:?}, {:?} for more than {} seconds, \
422                                 something is wrong in network service",
423                                peer_id,
424                                addr,
425                                DIAL_HANG_TIMEOUT.as_secs(),
426                            ),
427                            Level::Warning,
428                        )
429                    },
430                );
431            }
432            return false;
433        }
434
435        true
436    }
437
438    pub(crate) fn dial_success(&self, addr: &Multiaddr) {
439        if let Some(peer_id) = extract_peer_id(addr) {
440            self.dialing_addrs.write().remove(&peer_id);
441        }
442    }
443
444    pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
445        self.with_peer_registry_mut(|reg| {
446            reg.remove_feeler(addr);
447        });
448
449        if let Some(peer_id) = extract_peer_id(addr) {
450            self.dialing_addrs.write().remove(&peer_id);
451        }
452    }
453
454    /// Dial
455    /// return value indicates the dialing is actually sent or denied.
456    fn dial_inner(
457        &self,
458        p2p_control: &ServiceControl,
459        addr: Multiaddr,
460        target: TargetProtocol,
461    ) -> Result<(), Error> {
462        if !self.can_dial(&addr) {
463            return Err(Error::Dial(format!("ignore dialing addr {addr}")));
464        }
465
466        debug!("Dialing {addr}");
467        p2p_control.dial(addr.clone(), target)?;
468        self.dialing_addrs.write().insert(
469            extract_peer_id(&addr).expect("verified addr"),
470            Instant::now(),
471        );
472        Ok(())
473    }
474
475    /// Dial just identify protocol
476    pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
477        if let Err(err) = self.dial_inner(
478            p2p_control,
479            addr,
480            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
481        ) {
482            debug!("dial_identify error: {err}");
483        }
484    }
485
486    /// Dial just feeler protocol
487    pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
488        if let Err(err) = self.dial_inner(
489            p2p_control,
490            addr.clone(),
491            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
492        ) {
493            debug!("dial_feeler error {err}");
494        } else {
495            self.with_peer_registry_mut(|reg| {
496                reg.add_feeler(&addr);
497            });
498        }
499    }
500
501    /// this method is intent to check observed addr by dial to self
502    pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
503        let mut pending_observed_addrs = self.pending_observed_addrs.write();
504        if pending_observed_addrs.is_empty() {
505            let addrs = self.public_addrs.read();
506            if addrs.is_empty() {
507                return;
508            }
509            // random get addr
510            if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
511                if let Err(err) = p2p_control.dial(
512                    addr.clone(),
513                    TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
514                ) {
515                    trace!("try_dial_observed_addrs {err} failed in public address")
516                }
517            }
518        } else {
519            for addr in pending_observed_addrs.drain() {
520                trace!("try dial observed addr: {:?}", addr);
521                if let Err(err) = p2p_control.dial(
522                    addr,
523                    TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
524                ) {
525                    trace!("try_dial_observed_addrs {err} failed in pending observed addresses")
526                }
527            }
528        }
529    }
530
531    /// add observed address for identify protocol
532    pub(crate) fn add_observed_addrs(&self, iter: impl Iterator<Item = Multiaddr>) {
533        let mut pending_observed_addrs = self.pending_observed_addrs.write();
534        pending_observed_addrs.extend(iter)
535    }
536
537    /// Network message processing controller, default is true, if false, discard any received messages
538    pub fn is_active(&self) -> bool {
539        self.active.load(Ordering::Acquire)
540    }
541}
542
543/// Used to handle global events of tentacle, such as session open/close
544pub struct EventHandler {
545    pub(crate) network_state: Arc<NetworkState>,
546}
547
548impl EventHandler {
549    /// init an event handler
550    pub fn new(network_state: Arc<NetworkState>) -> Self {
551        Self { network_state }
552    }
553}
554
555/// Exit trait used to notify all other module to exit
556pub trait ExitHandler: Send + Unpin + 'static {
557    /// notify other module to exit
558    fn notify_exit(&self);
559}
560
561/// Default exit handle
562#[derive(Clone, Default)]
563pub struct DefaultExitHandler {
564    lock: Arc<Mutex<()>>,
565    exit: Arc<Condvar>,
566}
567
568impl DefaultExitHandler {
569    /// Block on current thread util exit notify
570    pub fn wait_for_exit(&self) {
571        self.exit.wait(&mut self.lock.lock());
572    }
573}
574
575impl ExitHandler for DefaultExitHandler {
576    fn notify_exit(&self) {
577        self.exit.notify_all();
578    }
579}
580
581impl EventHandler {
582    fn inbound_eviction(&self) -> Vec<PeerIndex> {
583        if self.network_state.config.bootnode_mode {
584            let status = self.network_state.connection_status();
585
586            if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
587                self.network_state
588                    .with_peer_registry(|registry| {
589                        registry
590                            .peers()
591                            .values()
592                            .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
593                            .map(|peer| peer.session_id)
594                            .collect::<Vec<SessionId>>()
595                    })
596                    .into_iter()
597                    .enumerate()
598                    .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
599                    .collect()
600            } else {
601                Vec::new()
602            }
603        } else {
604            Vec::new()
605        }
606    }
607}
608
609#[async_trait]
610impl ServiceHandle for EventHandler {
611    async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
612        match error {
613            ServiceError::DialerError { address, error } => {
614                let mut public_addrs = self.network_state.public_addrs.write();
615
616                match error {
617                    DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
618                        SecioError::ConnectSelf,
619                    )) => {
620                        debug!("dial observed address success: {:?}", address);
621                        if let Some(ip) = multiaddr_to_socketaddr(&address) {
622                            if is_reachable(ip.ip()) {
623                                public_addrs.insert(address);
624                            }
625                        }
626                        return;
627                    }
628                    DialerErrorKind::IoError(e)
629                        if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
630                    {
631                        warn!("DialerError({}) {}", address, e);
632                    }
633                    _ => {
634                        debug!("DialerError({}) {}", address, error);
635                    }
636                }
637                if public_addrs.remove(&address) {
638                    info!(
639                        "Dial {} failed, remove it from network_state.public_addrs",
640                        address
641                    );
642                }
643                self.network_state.dial_failed(&address);
644            }
645            ServiceError::ProtocolError {
646                id,
647                proto_id,
648                error,
649            } => {
650                debug!("ProtocolError({}, {}) {}", id, proto_id, error);
651                let message = format!("ProtocolError id={proto_id}");
652                // Ban because misbehave of remote peer
653                self.network_state.ban_session(
654                    &context.control().clone().into(),
655                    id,
656                    Duration::from_secs(300),
657                    message,
658                );
659            }
660            ServiceError::SessionTimeout { session_context } => {
661                debug!(
662                    "SessionTimeout({}, {})",
663                    session_context.id, session_context.address,
664                );
665            }
666            ServiceError::MuxerError {
667                session_context,
668                error,
669            } => {
670                debug!(
671                    "MuxerError({}, {}), substream error {}, disconnect it",
672                    session_context.id, session_context.address, error,
673                );
674            }
675            ServiceError::ListenError { address, error } => {
676                debug!("ListenError: address={:?}, error={:?}", address, error);
677            }
678            ServiceError::ProtocolSelectError {
679                proto_name,
680                session_context,
681            } => {
682                debug!(
683                    "ProtocolSelectError: proto_name={:?}, session_id={}",
684                    proto_name, session_context.id,
685                );
686            }
687            ServiceError::SessionBlocked { session_context } => {
688                debug!("SessionBlocked: {}", session_context.id);
689            }
690            ServiceError::ProtocolHandleError { proto_id, error } => {
691                debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
692
693                let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
694                {
695                    if let Some(id) = opt_session_id {
696                        self.network_state.ban_session(
697                            &context.control().clone().into(),
698                            id,
699                            Duration::from_secs(300),
700                            format!("protocol {proto_id} panic when process peer message"),
701                        );
702                    }
703                    #[cfg(feature = "with_sentry")]
704                    with_scope(
705                        |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
706                        || {
707                            capture_message(
708                                &format!("ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"),
709                                Level::Warning,
710                            )
711                        },
712                    );
713                    error!("ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}");
714
715                    broadcast_exit_signals();
716                }
717            }
718        }
719    }
720
721    async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
722        // When session disconnect update status anyway
723        match event {
724            ServiceEvent::SessionOpen { session_context } => {
725                debug!(
726                    "SessionOpen({}, {})",
727                    session_context.id, session_context.address,
728                );
729
730                self.network_state.dial_success(&session_context.address);
731
732                let iter = self.inbound_eviction();
733
734                let control = context.control().clone().into();
735
736                for peer in iter {
737                    if let Err(err) =
738                        disconnect_with_message(&control, peer, "bootnode random eviction")
739                    {
740                        debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
741                    }
742                }
743
744                if self
745                    .network_state
746                    .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
747                {
748                    debug!(
749                        "Feeler connected {} => {}",
750                        session_context.id, session_context.address,
751                    );
752                } else {
753                    match self.network_state.accept_peer(&session_context) {
754                        Ok(Some(evicted_peer)) => {
755                            debug!(
756                                "Disconnect peer, {} => {}",
757                                evicted_peer.session_id, evicted_peer.connected_addr,
758                            );
759                            if let Err(err) = disconnect_with_message(
760                                &control,
761                                evicted_peer.session_id,
762                                "evict because accepted better peer",
763                            ) {
764                                debug!(
765                                    "Disconnect failed {:?}, error: {:?}",
766                                    evicted_peer.session_id, err
767                                );
768                            }
769                        }
770                        Ok(None) => debug!(
771                            "{} open, registry {} success",
772                            session_context.id, session_context.address,
773                        ),
774                        Err(err) => {
775                            debug!(
776                                "Peer registry failed {:?}. Disconnect {} => {}",
777                                err, session_context.id, session_context.address,
778                            );
779                            if let Err(err) = disconnect_with_message(
780                                &control,
781                                session_context.id,
782                                "reject peer connection",
783                            ) {
784                                debug!(
785                                    "Disconnect failed {:?}, error: {:?}",
786                                    session_context.id, err
787                                );
788                            }
789                        }
790                    }
791                }
792            }
793            ServiceEvent::SessionClose { session_context } => {
794                debug!(
795                    "SessionClose({}, {})",
796                    session_context.id, session_context.address,
797                );
798                let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
799                    // should make sure feelers is clean
800                    reg.remove_feeler(&session_context.address);
801                    reg.remove_peer(session_context.id).is_some()
802                });
803                if peer_exists {
804                    debug!(
805                        "{} closed. Remove {} from peer_registry",
806                        session_context.id, session_context.address,
807                    );
808                    self.network_state.with_peer_store_mut(|peer_store| {
809                        peer_store.remove_disconnected_peer(&session_context.address);
810                    });
811                }
812            }
813            _ => {
814                info!("p2p service event: {:?}", event);
815            }
816        }
817    }
818}
819
820/// Ckb network service, use to start p2p network
821pub struct NetworkService {
822    p2p_service: Service<EventHandler, SecioKeyPair>,
823    network_state: Arc<NetworkState>,
824    ping_controller: Option<Sender<()>>,
825    // Background services
826    bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
827    version: String,
828}
829
830impl NetworkService {
831    /// init with all config
832    pub fn new(
833        network_state: Arc<NetworkState>,
834        protocols: Vec<CKBProtocol>,
835        required_protocol_ids: Vec<ProtocolId>,
836        // name, version, flags
837        identify_announce: (String, String, Flags),
838        transport_type: TransportType,
839    ) -> Self {
840        let config = &network_state.config;
841
842        if config.support_protocols.iter().collect::<HashSet<_>>()
843            != default_support_all_protocols()
844                .iter()
845                .collect::<HashSet<_>>()
846        {
847            warn!(
848                "Customized supported protocols: {:?}",
849                config.support_protocols
850            );
851        }
852
853        // == Build p2p service struct
854        let mut protocol_metas = protocols
855            .into_iter()
856            .map(CKBProtocol::build)
857            .collect::<Vec<_>>();
858
859        // == Build special protocols
860
861        // Identify is a core protocol, user cannot disable it via config
862        let identify_callback = IdentifyCallback::new(
863            Arc::clone(&network_state),
864            identify_announce.0,
865            identify_announce.1.clone(),
866            identify_announce.2,
867        );
868        let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
869            ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
870        });
871        protocol_metas.push(identify_meta);
872
873        // Ping protocol
874        let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
875            let ping_interval = Duration::from_secs(config.ping_interval_secs);
876            let ping_timeout = Duration::from_secs(config.ping_timeout_secs);
877
878            let ping_network_state = Arc::clone(&network_state);
879            let (ping_handler, ping_controller) =
880                PingHandler::new(ping_interval, ping_timeout, ping_network_state);
881            let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
882                ProtocolHandle::Callback(Box::new(ping_handler))
883            });
884            protocol_metas.push(ping_meta);
885            Some(ping_controller)
886        } else {
887            None
888        };
889
890        // Discovery protocol
891        if config
892            .support_protocols
893            .contains(&SupportProtocol::Discovery)
894        {
895            let addr_mgr = DiscoveryAddressManager {
896                network_state: Arc::clone(&network_state),
897                discovery_local_address: config.discovery_local_address,
898            };
899            let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
900                ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
901                    addr_mgr,
902                    config
903                        .discovery_announce_check_interval_secs
904                        .map(Duration::from_secs),
905                )))
906            });
907            protocol_metas.push(disc_meta);
908        }
909
910        // Feeler protocol
911        if config.support_protocols.contains(&SupportProtocol::Feeler) {
912            let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
913                let network_state = Arc::clone(&network_state);
914                move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
915            });
916            protocol_metas.push(feeler_meta);
917        }
918
919        // DisconnectMessage protocol
920        if config
921            .support_protocols
922            .contains(&SupportProtocol::DisconnectMessage)
923        {
924            let disconnect_message_state = Arc::clone(&network_state);
925            let disconnect_message_meta = SupportProtocols::DisconnectMessage
926                .build_meta_with_service_handle(move || {
927                    ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
928                        disconnect_message_state,
929                    )))
930                });
931            protocol_metas.push(disconnect_message_meta);
932        }
933
934        let mut service_builder = ServiceBuilder::default();
935        let yamux_config = YamuxConfig {
936            max_stream_count: protocol_metas.len(),
937            max_stream_window_size: 1024 * 1024,
938            ..Default::default()
939        };
940        for meta in protocol_metas.into_iter() {
941            network_state
942                .protocols
943                .write()
944                .push((meta.id(), meta.name(), meta.support_versions()));
945            service_builder = service_builder.insert_protocol(meta);
946        }
947        let event_handler = EventHandler {
948            network_state: Arc::clone(&network_state),
949        };
950        service_builder = service_builder
951            .handshake_type(network_state.local_private_key.clone().into())
952            .yamux_config(yamux_config)
953            .forever(true)
954            .max_connection_number(1024)
955            .set_send_buffer_size(config.max_send_buffer())
956            .set_channel_size(config.channel_size())
957            .timeout(Duration::from_secs(5));
958
959        #[cfg(not(target_family = "wasm"))]
960        {
961            service_builder = service_builder.upnp(config.upnp);
962        }
963
964        #[cfg(target_os = "linux")]
965        let p2p_service = {
966            if config.reuse_port_on_linux {
967                let iter = config.listen_addresses.iter();
968
969                #[derive(Clone, Copy, Debug, Eq, PartialEq)]
970                enum BindType {
971                    None,
972                    Ws,
973                    Tcp,
974                    Both,
975                }
976                impl BindType {
977                    fn transform(&mut self, other: TransportType) {
978                        match (&self, other) {
979                            (BindType::None, TransportType::Ws) => *self = BindType::Ws,
980                            (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
981                            (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
982                            (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
983                            _ => (),
984                        }
985                    }
986
987                    fn is_ready(&self) -> bool {
988                        // should change to Both if ckb enable ws
989                        matches!(self, BindType::Both)
990                    }
991                }
992
993                let mut init = BindType::None;
994
995                for multi_addr in iter {
996                    if init.is_ready() {
997                        break;
998                    }
999                    match find_type(multi_addr) {
1000                        TransportType::Tcp => {
1001                            // only bind once
1002                            if matches!(init, BindType::Tcp) {
1003                                continue;
1004                            }
1005                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1006                                let domain = socket2::Domain::for_address(addr);
1007                                let bind_fn = move |socket: p2p::service::TcpSocket| {
1008                                    let socket_ref = socket2::SockRef::from(&socket);
1009                                    #[cfg(all(
1010                                        unix,
1011                                        not(target_os = "solaris"),
1012                                        not(target_os = "illumos")
1013                                    ))]
1014                                    socket_ref.set_reuse_port(true)?;
1015                                    socket_ref.set_reuse_address(true)?;
1016                                    if socket_ref.domain()? == domain {
1017                                        socket_ref.bind(&addr.into())?;
1018                                    }
1019                                    Ok(socket)
1020                                };
1021                                init.transform(TransportType::Tcp);
1022                                service_builder = service_builder.tcp_config(bind_fn);
1023                            }
1024                        }
1025                        TransportType::Ws | TransportType::Wss => {
1026                            // only bind once
1027                            if matches!(init, BindType::Ws) {
1028                                continue;
1029                            }
1030                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1031                                let domain = socket2::Domain::for_address(addr);
1032                                let bind_fn = move |socket: p2p::service::TcpSocket| {
1033                                    let socket_ref = socket2::SockRef::from(&socket);
1034                                    #[cfg(all(
1035                                        unix,
1036                                        not(target_os = "solaris"),
1037                                        not(target_os = "illumos")
1038                                    ))]
1039                                    socket_ref.set_reuse_port(true)?;
1040                                    socket_ref.set_reuse_address(true)?;
1041                                    if socket_ref.domain()? == domain {
1042                                        socket_ref.bind(&addr.into())?;
1043                                    }
1044                                    Ok(socket)
1045                                };
1046                                init.transform(TransportType::Ws);
1047                                service_builder = service_builder.tcp_config_on_ws(bind_fn);
1048                            }
1049                        }
1050                    }
1051                }
1052            }
1053
1054            service_builder.build(event_handler)
1055        };
1056
1057        #[cfg(not(target_os = "linux"))]
1058        // The default permissions of Windows are not enough to enable this function,
1059        // and the administrator permissions of group permissions must be turned on.
1060        // This operation is very burdensome for windows users, so it is turned off by default
1061        //
1062        // The integration test fails after MacOS is turned on, the behavior is different from linux.
1063        // Decision to turn off it
1064        let p2p_service = service_builder.build(event_handler);
1065
1066        // == Build background service tasks
1067        let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
1068        let protocol_type_checker_service = ProtocolTypeCheckerService::new(
1069            Arc::clone(&network_state),
1070            p2p_service.control().to_owned().into(),
1071            required_protocol_ids,
1072        );
1073        let mut bg_services = vec![
1074            Box::pin(dump_peer_store_service) as Pin<Box<_>>,
1075            Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
1076        ];
1077        if config.outbound_peer_service_enabled() {
1078            let outbound_peer_service = OutboundPeerService::new(
1079                Arc::clone(&network_state),
1080                p2p_service.control().to_owned().into(),
1081                Duration::from_secs(config.connect_outbound_interval_secs),
1082                transport_type,
1083            );
1084            bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
1085        };
1086
1087        #[cfg(feature = "with_dns_seeding")]
1088        if config.dns_seeding_service_enabled() {
1089            let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
1090                Arc::clone(&network_state),
1091                config.dns_seeds.clone(),
1092            );
1093            bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
1094        };
1095
1096        NetworkService {
1097            p2p_service,
1098            network_state,
1099            ping_controller,
1100            bg_services,
1101            version: identify_announce.1,
1102        }
1103    }
1104
1105    /// Start the network in the background and return a controller
1106    pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1107        let config = self.network_state.config.clone();
1108
1109        let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1110
1111        // dial whitelist_nodes
1112        for addr in self.network_state.config.whitelist_peers() {
1113            debug!("Dial whitelist_peers {:?}", addr);
1114            self.network_state.dial_identify(&p2p_control, addr);
1115        }
1116
1117        let target = &self.network_state.required_flags;
1118
1119        // get bootnodes
1120        // try get addrs from peer_store, if peer_store have no enough addrs then use bootnodes
1121        let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1122            let count = max((config.max_outbound_peers >> 1) as usize, 1);
1123            let mut addrs: Vec<_> = peer_store
1124                .fetch_addrs_to_attempt(count, *target, |_| true)
1125                .into_iter()
1126                .map(|paddr| paddr.addr)
1127                .collect();
1128            // Get bootnodes randomly
1129            let bootnodes = self
1130                .network_state
1131                .bootnodes
1132                .iter()
1133                .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1134                .into_iter()
1135                .cloned();
1136            addrs.extend(bootnodes);
1137            addrs
1138        });
1139
1140        // dial half bootnodes
1141        for addr in bootnodes {
1142            debug!("Dial bootnode {:?}", addr);
1143            self.network_state.dial_identify(&p2p_control, addr);
1144        }
1145
1146        let Self {
1147            mut p2p_service,
1148            network_state,
1149            ping_controller,
1150            bg_services,
1151            version,
1152        } = self;
1153
1154        // NOTE: for ensure background task finished
1155        let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1156            .into_iter()
1157            .map(|bg_service| {
1158                let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1159                (signal_sender, (bg_service, signal_receiver))
1160            })
1161            .unzip();
1162
1163        let receiver: CancellationToken = new_tokio_exit_rx();
1164        #[cfg(not(target_family = "wasm"))]
1165        let (start_sender, start_receiver) = mpsc::channel();
1166        {
1167            #[cfg(not(target_family = "wasm"))]
1168            let network_state = Arc::clone(&network_state);
1169            let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1170            handle.spawn_task(async move {
1171                #[cfg(not(target_family = "wasm"))]
1172                {
1173                    let listen_addresses = {
1174                        let mut addresses = config.listen_addresses.clone();
1175                        if config.reuse_tcp_with_ws {
1176                            let ws_listens = addresses
1177                                .iter()
1178                                .cloned()
1179                                .filter_map(|mut addr| {
1180                                    if matches!(find_type(&addr), TransportType::Tcp) {
1181                                        addr.push(Protocol::Ws);
1182                                        Some(addr)
1183                                    } else {
1184                                        None
1185                                    }
1186                                })
1187                                .collect::<Vec<_>>();
1188
1189                            addresses.extend(ws_listens);
1190                        }
1191                        let mut addresses = addresses
1192                            .into_iter()
1193                            .collect::<HashSet<_>>()
1194                            .into_iter()
1195                            .collect::<Vec<_>>();
1196                        addresses.sort_by(|a, b| {
1197                            let ty_a = find_type(a);
1198                            let ty_b = find_type(b);
1199
1200                            ty_a.cmp(&ty_b)
1201                        });
1202
1203                        addresses
1204                    };
1205
1206                    for addr in &listen_addresses {
1207                        match p2p_service.listen(addr.to_owned()).await {
1208                            Ok(listen_address) => {
1209                                info!("Listen on address: {}", listen_address);
1210                                network_state
1211                                    .listened_addrs
1212                                    .write()
1213                                    .push(listen_address.clone());
1214                            }
1215                            Err(err) => {
1216                                warn!(
1217                                    "Listen on address {} failed, due to error: {}",
1218                                    addr.clone(),
1219                                    err
1220                                );
1221                                start_sender
1222                                    .send(Err(Error::P2P(P2PError::Transport(err))))
1223                                    .expect("channel abnormal shutdown");
1224                                return;
1225                            }
1226                        };
1227                    }
1228                    start_sender.send(Ok(())).unwrap();
1229                }
1230
1231                p2p::runtime::spawn(async move { p2p_service.run().await });
1232                tokio::select! {
1233                    _ = receiver.cancelled() => {
1234                        info!("NetworkService receive exit signal, start shutdown...");
1235                        let _ = p2p_control.shutdown().await;
1236                        // Drop senders to stop all corresponding background task
1237                        drop(bg_signals);
1238
1239                        info!("NetworkService shutdown now");
1240                    },
1241                    else => {
1242                        let _ = p2p_control.shutdown().await;
1243                        // Drop senders to stop all corresponding background task
1244                        drop(bg_signals);
1245                    },
1246                }
1247            });
1248        }
1249        for (mut service, mut receiver) in bg_receivers {
1250            handle.spawn_task(async move {
1251                loop {
1252                    tokio::select! {
1253                        _ = &mut service => {},
1254                        _ = &mut receiver => break
1255                    }
1256                }
1257            });
1258        }
1259        #[cfg(not(target_family = "wasm"))]
1260        if let Ok(Err(e)) = start_receiver.recv() {
1261            return Err(e);
1262        }
1263
1264        Ok(NetworkController {
1265            version,
1266            network_state,
1267            p2p_control,
1268            ping_controller,
1269        })
1270    }
1271}
1272
1273/// Network controller
1274#[derive(Clone)]
1275pub struct NetworkController {
1276    version: String,
1277    network_state: Arc<NetworkState>,
1278    p2p_control: ServiceControl,
1279    ping_controller: Option<Sender<()>>,
1280}
1281
1282impl NetworkController {
1283    /// Set ckb2023 start
1284    pub fn init_ckb2023(&self) {
1285        self.network_state.ckb2023.store(true, Ordering::SeqCst);
1286    }
1287
1288    /// Get ckb2023 flag
1289    pub fn load_ckb2023(&self) -> bool {
1290        self.network_state.ckb2023.load(Ordering::SeqCst)
1291    }
1292
1293    /// Node listen address list
1294    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1295        self.network_state.public_urls(max_urls)
1296    }
1297
1298    /// ckb version
1299    pub fn version(&self) -> &String {
1300        &self.version
1301    }
1302
1303    /// Node peer id's base58 format string
1304    pub fn node_id(&self) -> String {
1305        self.network_state.node_id()
1306    }
1307
1308    /// p2p service control
1309    pub fn p2p_control(&self) -> &ServiceControl {
1310        &self.p2p_control
1311    }
1312
1313    /// Dial remote node
1314    pub fn add_node(&self, address: Multiaddr) {
1315        self.network_state.add_node(&self.p2p_control, address)
1316    }
1317
1318    /// Disconnect session with peer id
1319    pub fn remove_node(&self, peer_id: &PeerId) {
1320        if let Some(session_id) = self
1321            .network_state
1322            .peer_registry
1323            .read()
1324            .get_key_by_peer_id(peer_id)
1325        {
1326            if let Err(err) =
1327                disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1328            {
1329                debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1330            }
1331        } else {
1332            error!("Cannot find peer {:?}", peer_id);
1333        }
1334    }
1335
1336    /// Get banned peer list
1337    pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1338        self.network_state
1339            .peer_store
1340            .lock()
1341            .ban_list()
1342            .get_banned_addrs()
1343    }
1344
1345    /// Clear banned list
1346    pub fn clear_banned_addrs(&self) {
1347        self.network_state.peer_store.lock().clear_ban_list();
1348    }
1349
1350    /// Get address info from peer store
1351    pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1352        self.network_state
1353            .peer_store
1354            .lock()
1355            .addr_manager()
1356            .get(addr)
1357            .cloned()
1358    }
1359
1360    /// Ban an ip
1361    pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1362        self.disconnect_peers_in_ip_range(address, &ban_reason);
1363        self.network_state
1364            .peer_store
1365            .lock()
1366            .ban_network(address, ban_until, ban_reason)
1367    }
1368
1369    /// Unban an ip
1370    pub fn unban(&self, address: &IpNetwork) {
1371        self.network_state
1372            .peer_store
1373            .lock()
1374            .mut_ban_list()
1375            .unban_network(address);
1376    }
1377
1378    /// Return all connected peers' information
1379    pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1380        self.network_state.with_peer_registry(|reg| {
1381            reg.peers()
1382                .iter()
1383                .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1384                .collect::<Vec<_>>()
1385        })
1386    }
1387
1388    /// Ban an peer through peer index
1389    pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1390        self.network_state
1391            .ban_session(&self.p2p_control, peer_index, duration, reason);
1392    }
1393
1394    /// disconnect peers with matched peer_ip or peer_ip_network, eg: 192.168.0.2 or 192.168.0.0/24
1395    fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1396        self.network_state.with_peer_registry(|reg| {
1397            reg.peers().iter().for_each(|(peer_index, peer)| {
1398                if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1399                    if address.contains(addr.ip()) {
1400                        let _ = disconnect_with_message(
1401                            &self.p2p_control,
1402                            *peer_index,
1403                            &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1404                        );
1405                    }
1406                }
1407            })
1408        });
1409    }
1410
1411    fn try_broadcast(
1412        &self,
1413        quick: bool,
1414        target: Option<SessionId>,
1415        proto_id: ProtocolId,
1416        data: Bytes,
1417    ) -> Result<(), SendErrorKind> {
1418        let now = Instant::now();
1419        loop {
1420            let target = target
1421                .map(TargetSession::Single)
1422                .unwrap_or(TargetSession::All);
1423            let result = if quick {
1424                self.p2p_control
1425                    .quick_filter_broadcast(target, proto_id, data.clone())
1426            } else {
1427                self.p2p_control
1428                    .filter_broadcast(target, proto_id, data.clone())
1429            };
1430            match result {
1431                Ok(()) => {
1432                    return Ok(());
1433                }
1434                Err(SendErrorKind::WouldBlock) => {
1435                    if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1436                        warn!("Broadcast message to {} timeout", proto_id);
1437                        return Err(SendErrorKind::WouldBlock);
1438                    }
1439                    thread::sleep(P2P_TRY_SEND_INTERVAL);
1440                }
1441                Err(err) => {
1442                    warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1443                    return Err(err);
1444                }
1445            }
1446        }
1447    }
1448
1449    /// Broadcast a message to all connected peers
1450    pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1451        self.try_broadcast(false, None, proto_id, data)
1452    }
1453
1454    /// Broadcast a message to all connected peers through quick queue
1455    pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1456        self.try_broadcast(true, None, proto_id, data)
1457    }
1458
1459    /// Send message to one connected peer
1460    pub fn send_message_to(
1461        &self,
1462        session_id: SessionId,
1463        proto_id: ProtocolId,
1464        data: Bytes,
1465    ) -> Result<(), SendErrorKind> {
1466        self.try_broadcast(false, Some(session_id), proto_id, data)
1467    }
1468
1469    /// network message processing controller, always true, if false, discard any received messages
1470    pub fn is_active(&self) -> bool {
1471        self.network_state.is_active()
1472    }
1473
1474    /// Change active status, if set false discard any received messages
1475    pub fn set_active(&self, active: bool) {
1476        self.network_state.active.store(active, Ordering::Release);
1477    }
1478
1479    /// Return all connected peers' protocols info
1480    pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1481        self.network_state.protocols.read().clone()
1482    }
1483
1484    /// Try ping all connected peers
1485    pub fn ping_peers(&self) {
1486        if let Some(mut ping_controller) = self.ping_controller.clone() {
1487            let _ignore = ping_controller.try_send(());
1488        }
1489    }
1490}
1491
1492// Send an optional message before disconnect a peer
1493pub(crate) fn disconnect_with_message(
1494    control: &ServiceControl,
1495    peer_index: SessionId,
1496    message: &str,
1497) -> Result<(), SendErrorKind> {
1498    if !message.is_empty() {
1499        let data = Bytes::from(message.as_bytes().to_vec());
1500        // Must quick send, otherwise this message will be dropped.
1501        control.quick_send_message_to(
1502            peer_index,
1503            SupportProtocols::DisconnectMessage.protocol_id(),
1504            data,
1505        )?;
1506    }
1507    control.disconnect(peer_index)
1508}
1509
1510pub(crate) async fn async_disconnect_with_message(
1511    control: &ServiceAsyncControl,
1512    peer_index: SessionId,
1513    message: &str,
1514) -> Result<(), SendErrorKind> {
1515    if !message.is_empty() {
1516        let data = Bytes::from(message.as_bytes().to_vec());
1517        // Must quick send, otherwise this message will be dropped.
1518        control
1519            .quick_send_message_to(
1520                peer_index,
1521                SupportProtocols::DisconnectMessage.protocol_id(),
1522                data,
1523            )
1524            .await?;
1525    }
1526    control.disconnect(peer_index).await
1527}
1528
1529/// Transport type on ckb
1530#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
1531pub enum TransportType {
1532    /// Tcp
1533    Tcp,
1534    /// Ws
1535    Ws,
1536    /// Wss only on wasm
1537    Wss,
1538}
1539
1540pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1541    let mut iter = addr.iter();
1542
1543    iter.find_map(|proto| match proto {
1544        Protocol::Ws => Some(TransportType::Ws),
1545        Protocol::Wss => Some(TransportType::Wss),
1546        _ => None,
1547    })
1548    .unwrap_or(TransportType::Tcp)
1549}