ckb_network/
network.rs

1//! Global state struct and start function
2use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7    PeerStore,
8    types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11    disconnect_message::DisconnectMessageProtocol,
12    discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13    feeler::Feeler,
14    identify::{Flags, IdentifyCallback, IdentifyProtocol},
15    ping::PingHandler,
16    support_protocols::SupportProtocols,
17};
18use crate::services::{
19    dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
20    protocol_type_checker::ProtocolTypeCheckerService,
21};
22use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
23use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
24use ckb_logger::{debug, error, info, trace, warn};
25use ckb_spawn::Spawn;
26use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
27use ckb_systemtime::{Duration, Instant};
28use ckb_util::{Condvar, Mutex, RwLock};
29use futures::{Future, channel::mpsc::Sender};
30use ipnetwork::IpNetwork;
31use p2p::{
32    SessionId, async_trait,
33    builder::ServiceBuilder,
34    bytes::Bytes,
35    context::{ServiceContext, SessionContext},
36    error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
37    multiaddr::{Multiaddr, Protocol},
38    secio::{self, PeerId, SecioKeyPair, error::SecioError},
39    service::{
40        ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
41        TargetSession,
42    },
43    traits::ServiceHandle,
44    utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
45    yamux::config::Config as YamuxConfig,
46};
47use rand::prelude::IteratorRandom;
48#[cfg(feature = "with_sentry")]
49use sentry::{Level, capture_message, with_scope};
50#[cfg(not(target_family = "wasm"))]
51use std::sync::mpsc;
52use std::{
53    borrow::Cow,
54    cmp::max,
55    collections::{HashMap, HashSet},
56    pin::Pin,
57    sync::{
58        Arc,
59        atomic::{AtomicBool, Ordering},
60    },
61    thread,
62};
63use tokio::{self, sync::oneshot};
64
65const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
66const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
67// After 5 minutes we consider this dial hang
68const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
69
70/// The global shared state of the network module
71pub struct NetworkState {
72    pub(crate) peer_registry: RwLock<PeerRegistry>,
73    pub(crate) peer_store: Mutex<PeerStore>,
74    /// Node listened addresses
75    pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
76    dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
77    /// Node public addresses,
78    /// includes manually public addrs and remote peer observed addrs
79    public_addrs: RwLock<HashSet<Multiaddr>>,
80    pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
81    local_private_key: secio::SecioKeyPair,
82    local_peer_id: PeerId,
83    pub(crate) bootnodes: Vec<Multiaddr>,
84    pub(crate) config: NetworkConfig,
85    pub(crate) active: AtomicBool,
86    /// Node supported protocols
87    /// fields: ProtocolId, Protocol Name, Supported Versions
88    pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
89    pub(crate) required_flags: Flags,
90
91    pub(crate) ckb2023: AtomicBool,
92}
93
94impl NetworkState {
95    /// Init from config
96    #[cfg(not(target_family = "wasm"))]
97    pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
98        config.create_dir_if_not_exists()?;
99        let local_private_key = config.fetch_private_key()?;
100        let local_peer_id = local_private_key.peer_id();
101        // set max score to public addresses
102        let public_addrs: HashSet<Multiaddr> = config
103            .listen_addresses
104            .iter()
105            .chain(config.public_addresses.iter())
106            .cloned()
107            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
108                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
109                _ => {
110                    match extract_peer_id(&addr) {
111                        Some(peer_id) if peer_id != local_peer_id => {
112                            error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
113                            std::process::exit(1);
114                        }
115                        Some(_) => (),
116                        None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
117                    }
118                    Some(addr)
119                }
120            })
121            .collect();
122        info!("Loading the peer store. This process may take a few seconds to complete.");
123
124        let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
125            config.peer_store_path(),
126        ));
127        let bootnodes = config.bootnodes();
128
129        let peer_registry = PeerRegistry::new(
130            config.max_inbound_peers(),
131            config.max_outbound_peers(),
132            config.whitelist_only,
133            config.whitelist_peers(),
134        );
135
136        Ok(NetworkState {
137            peer_store,
138            config,
139            bootnodes,
140            peer_registry: RwLock::new(peer_registry),
141            dialing_addrs: RwLock::new(HashMap::default()),
142            public_addrs: RwLock::new(public_addrs),
143            listened_addrs: RwLock::new(Vec::new()),
144            pending_observed_addrs: RwLock::new(HashSet::default()),
145            local_private_key,
146            local_peer_id,
147            active: AtomicBool::new(true),
148            protocols: RwLock::new(Vec::new()),
149            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
150            ckb2023: AtomicBool::new(false),
151        })
152    }
153
154    #[cfg(target_family = "wasm")]
155    pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
156        let local_private_key = config.fetch_private_key()?;
157        let local_peer_id = local_private_key.peer_id();
158        // set max score to public addresses
159        let public_addrs: HashSet<Multiaddr> = config
160            .listen_addresses
161            .iter()
162            .chain(config.public_addresses.iter())
163            .cloned()
164            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
165                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
166                _ => {
167                    if extract_peer_id(&addr).is_none() {
168                        addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
169                    }
170                    Some(addr)
171                }
172            })
173            .collect();
174        info!("Loading the peer store. This process may take a few seconds to complete.");
175        let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
176        let bootnodes = config.bootnodes();
177
178        let peer_registry = PeerRegistry::new(
179            config.max_inbound_peers(),
180            config.max_outbound_peers(),
181            config.whitelist_only,
182            config.whitelist_peers(),
183        );
184        Ok(NetworkState {
185            peer_store,
186            config,
187            bootnodes,
188            peer_registry: RwLock::new(peer_registry),
189            dialing_addrs: RwLock::new(HashMap::default()),
190            public_addrs: RwLock::new(public_addrs),
191            listened_addrs: RwLock::new(Vec::new()),
192            pending_observed_addrs: RwLock::new(HashSet::default()),
193            local_private_key,
194            local_peer_id,
195            active: AtomicBool::new(true),
196            protocols: RwLock::new(Vec::new()),
197            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
198            ckb2023: AtomicBool::new(false),
199        })
200    }
201
202    /// fork flag
203    pub fn ckb2023(self, init: bool) -> Self {
204        self.ckb2023.store(init, Ordering::SeqCst);
205        self
206    }
207
208    /// use to discovery get nodes message to announce what kind of node information need from the other peer
209    /// default with `Flags::SYNC | Flags::DISCOVERY | Flags::RELAY`
210    pub fn required_flags(mut self, flags: Flags) -> Self {
211        self.required_flags = flags;
212        self
213    }
214
215    pub(crate) fn report_session(
216        &self,
217        p2p_control: &ServiceControl,
218        session_id: SessionId,
219        behaviour: Behaviour,
220    ) {
221        if let Some(addr) = self.with_peer_registry(|reg| {
222            reg.get_peer(session_id)
223                .filter(|peer| !peer.is_whitelist)
224                .map(|peer| peer.connected_addr.clone())
225        }) {
226            trace!("Report {:?} because {:?}", addr, behaviour);
227            let report_result = self.peer_store.lock().report(&addr, behaviour);
228            if report_result.is_banned() {
229                if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
230                    debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
231                }
232            }
233        } else {
234            debug!(
235                "Report {} failure: not found in peer registry or it is on the whitelist",
236                session_id
237            );
238        }
239    }
240
241    pub(crate) fn ban_session(
242        &self,
243        p2p_control: &ServiceControl,
244        session_id: SessionId,
245        duration: Duration,
246        reason: String,
247    ) {
248        if let Some(addr) = self.with_peer_registry(|reg| {
249            reg.get_peer(session_id)
250                .filter(|peer| !peer.is_whitelist)
251                .map(|peer| peer.connected_addr.clone())
252        }) {
253            info!(
254                "Ban peer {:?} for {} seconds, reason: {}",
255                addr,
256                duration.as_secs(),
257                reason
258            );
259            if let Some(metrics) = ckb_metrics::handle() {
260                metrics.ckb_network_ban_peer.inc();
261            }
262            if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
263                let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
264                self.peer_store.lock().ban_addr(
265                    &peer.connected_addr,
266                    duration.as_millis() as u64,
267                    reason,
268                );
269                if let Err(err) =
270                    disconnect_with_message(p2p_control, peer.session_id, message.as_str())
271                {
272                    debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
273                }
274            }
275        } else {
276            debug!(
277                "Ban session({}) failed: not found in peer registry or it is on the whitelist",
278                session_id
279            );
280        }
281    }
282
283    pub(crate) fn accept_peer(
284        &self,
285        session_context: &SessionContext,
286    ) -> Result<Option<Peer>, Error> {
287        // NOTE: be careful, here easy cause a deadlock,
288        //    because peer_store's lock scope across peer_registry's lock scope
289        let mut peer_store = self.peer_store.lock();
290        let accept_peer_result = {
291            self.peer_registry.write().accept_peer(
292                session_context.address.clone(),
293                session_context.id,
294                session_context.ty,
295                &mut peer_store,
296            )
297        };
298        accept_peer_result
299    }
300
301    /// For restrict lock in inner scope
302    pub fn with_peer_registry<F, T>(&self, callback: F) -> T
303    where
304        F: FnOnce(&PeerRegistry) -> T,
305    {
306        callback(&self.peer_registry.read())
307    }
308
309    // For restrict lock in inner scope
310    pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
311    where
312        F: FnOnce(&mut PeerRegistry) -> T,
313    {
314        callback(&mut self.peer_registry.write())
315    }
316
317    // For restrict lock in inner scope
318    pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
319    where
320        F: FnOnce(&mut PeerStore) -> T,
321    {
322        callback(&mut self.peer_store.lock())
323    }
324
325    /// Get peer id of local node
326    pub fn local_peer_id(&self) -> &PeerId {
327        &self.local_peer_id
328    }
329
330    /// Use on test
331    pub fn local_private_key(&self) -> &secio::SecioKeyPair {
332        &self.local_private_key
333    }
334
335    /// Get local node's peer id in base58 format string
336    pub fn node_id(&self) -> String {
337        self.local_peer_id().to_base58()
338    }
339
340    pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
341        self.public_addrs
342            .read()
343            .iter()
344            .take(count)
345            .cloned()
346            .collect()
347    }
348
349    pub(crate) fn connection_status(&self) -> ConnectionStatus {
350        self.peer_registry.read().connection_status()
351    }
352
353    /// Get local node's listen address list
354    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
355        let listened_addrs = self.listened_addrs.read();
356        self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
357            .into_iter()
358            .filter_map(|addr| {
359                if !listened_addrs.contains(&addr) {
360                    Some((addr, 1))
361                } else {
362                    None
363                }
364            })
365            .chain(listened_addrs.iter().map(|addr| (addr.to_owned(), 1)))
366            .map(|(addr, score)| (addr.to_string(), score))
367            .collect()
368    }
369
370    pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
371        self.dial_identify(p2p_control, address);
372    }
373
374    /// use a filter to get protocol id list
375    pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
376        self.protocols
377            .read()
378            .iter()
379            .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
380            .collect::<Vec<_>>()
381    }
382
383    pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
384        let peer_id = extract_peer_id(addr);
385        if peer_id.is_none() {
386            error!("Do not dial addr without peer id, addr: {}", addr);
387            return false;
388        }
389        let peer_id = peer_id.as_ref().unwrap();
390
391        if self.local_peer_id() == peer_id {
392            trace!("Do not dial self: {:?}, {}", peer_id, addr);
393            return false;
394        }
395        if self.public_addrs.read().contains(addr) {
396            trace!(
397                "Do not dial listened address(self): {:?}, {}",
398                peer_id, addr
399            );
400            return false;
401        }
402
403        let peer_in_registry = self.with_peer_registry(|reg| {
404            reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
405        });
406        if peer_in_registry {
407            trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
408            return false;
409        }
410
411        if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
412            trace!(
413                "Do not send repeated dial commands to network service: {:?}, {}",
414                peer_id, addr
415            );
416            if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
417                #[cfg(feature = "with_sentry")]
418                with_scope(
419                    |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
420                    || {
421                        capture_message(
422                            &format!(
423                                "Dialing {:?}, {:?} for more than {} seconds, \
424                                 something is wrong in network service",
425                                peer_id,
426                                addr,
427                                DIAL_HANG_TIMEOUT.as_secs(),
428                            ),
429                            Level::Warning,
430                        )
431                    },
432                );
433            }
434            return false;
435        }
436
437        true
438    }
439
440    pub(crate) fn dial_success(&self, addr: &Multiaddr) {
441        if let Some(peer_id) = extract_peer_id(addr) {
442            self.dialing_addrs.write().remove(&peer_id);
443        }
444    }
445
446    pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
447        self.with_peer_registry_mut(|reg| {
448            reg.remove_feeler(addr);
449        });
450
451        if let Some(peer_id) = extract_peer_id(addr) {
452            self.dialing_addrs.write().remove(&peer_id);
453        }
454    }
455
456    /// Dial
457    /// return value indicates the dialing is actually sent or denied.
458    fn dial_inner(
459        &self,
460        p2p_control: &ServiceControl,
461        addr: Multiaddr,
462        target: TargetProtocol,
463    ) -> Result<(), Error> {
464        if !self.can_dial(&addr) {
465            return Err(Error::Dial(format!("ignore dialing addr {addr}")));
466        }
467
468        debug!("Dialing {addr}");
469        p2p_control.dial(addr.clone(), target)?;
470        self.dialing_addrs.write().insert(
471            extract_peer_id(&addr).expect("verified addr"),
472            Instant::now(),
473        );
474        Ok(())
475    }
476
477    /// Dial just identify protocol
478    pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
479        if let Err(err) = self.dial_inner(
480            p2p_control,
481            addr,
482            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
483        ) {
484            debug!("dial_identify error: {err}");
485        }
486    }
487
488    /// Dial just feeler protocol
489    pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
490        if let Err(err) = self.dial_inner(
491            p2p_control,
492            addr.clone(),
493            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
494        ) {
495            debug!("dial_feeler error {err}");
496        } else {
497            self.with_peer_registry_mut(|reg| {
498                reg.add_feeler(&addr);
499            });
500        }
501    }
502
503    /// this method is intent to check observed addr by dial to self
504    pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
505        let mut pending_observed_addrs = self.pending_observed_addrs.write();
506        if pending_observed_addrs.is_empty() {
507            let addrs = self.public_addrs.read();
508            if addrs.is_empty() {
509                return;
510            }
511            // random get addr
512            if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
513                if let Err(err) = p2p_control.dial(
514                    addr.clone(),
515                    TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
516                ) {
517                    trace!("try_dial_observed_addrs {err} failed in public address")
518                }
519            }
520        } else {
521            for addr in pending_observed_addrs.drain() {
522                trace!("try dial observed addr: {:?}", addr);
523                if let Err(err) = p2p_control.dial(
524                    addr,
525                    TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
526                ) {
527                    trace!("try_dial_observed_addrs {err} failed in pending observed addresses")
528                }
529            }
530        }
531    }
532
533    /// add observed address for identify protocol
534    pub(crate) fn add_observed_addrs(&self, iter: impl Iterator<Item = Multiaddr>) {
535        let mut pending_observed_addrs = self.pending_observed_addrs.write();
536        pending_observed_addrs.extend(iter)
537    }
538
539    /// Network message processing controller, default is true, if false, discard any received messages
540    pub fn is_active(&self) -> bool {
541        self.active.load(Ordering::Acquire)
542    }
543}
544
545/// Used to handle global events of tentacle, such as session open/close
546pub struct EventHandler {
547    pub(crate) network_state: Arc<NetworkState>,
548}
549
550impl EventHandler {
551    /// init an event handler
552    pub fn new(network_state: Arc<NetworkState>) -> Self {
553        Self { network_state }
554    }
555}
556
557/// Exit trait used to notify all other module to exit
558pub trait ExitHandler: Send + Unpin + 'static {
559    /// notify other module to exit
560    fn notify_exit(&self);
561}
562
563/// Default exit handle
564#[derive(Clone, Default)]
565pub struct DefaultExitHandler {
566    lock: Arc<Mutex<()>>,
567    exit: Arc<Condvar>,
568}
569
570impl DefaultExitHandler {
571    /// Block on current thread util exit notify
572    pub fn wait_for_exit(&self) {
573        self.exit.wait(&mut self.lock.lock());
574    }
575}
576
577impl ExitHandler for DefaultExitHandler {
578    fn notify_exit(&self) {
579        self.exit.notify_all();
580    }
581}
582
583impl EventHandler {
584    fn inbound_eviction(&self) -> Vec<PeerIndex> {
585        if self.network_state.config.bootnode_mode {
586            let status = self.network_state.connection_status();
587
588            if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
589                self.network_state
590                    .with_peer_registry(|registry| {
591                        registry
592                            .peers()
593                            .values()
594                            .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
595                            .map(|peer| peer.session_id)
596                            .collect::<Vec<SessionId>>()
597                    })
598                    .into_iter()
599                    .enumerate()
600                    .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
601                    .collect()
602            } else {
603                Vec::new()
604            }
605        } else {
606            Vec::new()
607        }
608    }
609}
610
611#[async_trait]
612impl ServiceHandle for EventHandler {
613    async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
614        match error {
615            ServiceError::DialerError { address, error } => {
616                let mut public_addrs = self.network_state.public_addrs.write();
617
618                match error {
619                    DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
620                        SecioError::ConnectSelf,
621                    )) => {
622                        debug!("dial observed address success: {:?}", address);
623                        if let Some(ip) = multiaddr_to_socketaddr(&address) {
624                            if is_reachable(ip.ip()) {
625                                public_addrs.insert(address);
626                            }
627                        }
628                        return;
629                    }
630                    DialerErrorKind::IoError(e)
631                        if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
632                    {
633                        warn!("DialerError({}) {}", address, e);
634                    }
635                    _ => {
636                        debug!("DialerError({}) {}", address, error);
637                    }
638                }
639                if public_addrs.remove(&address) {
640                    info!(
641                        "Dial {} failed, remove it from network_state.public_addrs",
642                        address
643                    );
644                }
645                self.network_state.dial_failed(&address);
646            }
647            ServiceError::ProtocolError {
648                id,
649                proto_id,
650                error,
651            } => {
652                debug!("ProtocolError({}, {}) {}", id, proto_id, error);
653                let message = format!("ProtocolError id={proto_id}");
654                // Ban because misbehave of remote peer
655                self.network_state.ban_session(
656                    &context.control().clone().into(),
657                    id,
658                    Duration::from_secs(300),
659                    message,
660                );
661            }
662            ServiceError::SessionTimeout { session_context } => {
663                debug!(
664                    "SessionTimeout({}, {})",
665                    session_context.id, session_context.address,
666                );
667            }
668            ServiceError::MuxerError {
669                session_context,
670                error,
671            } => {
672                debug!(
673                    "MuxerError({}, {}), substream error {}, disconnect it",
674                    session_context.id, session_context.address, error,
675                );
676            }
677            ServiceError::ListenError { address, error } => {
678                debug!("ListenError: address={:?}, error={:?}", address, error);
679            }
680            ServiceError::ProtocolSelectError {
681                proto_name,
682                session_context,
683            } => {
684                debug!(
685                    "ProtocolSelectError: proto_name={:?}, session_id={}",
686                    proto_name, session_context.id,
687                );
688            }
689            ServiceError::SessionBlocked { session_context } => {
690                debug!("SessionBlocked: {}", session_context.id);
691            }
692            ServiceError::ProtocolHandleError { proto_id, error } => {
693                debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
694
695                let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
696                {
697                    if let Some(id) = opt_session_id {
698                        self.network_state.ban_session(
699                            &context.control().clone().into(),
700                            id,
701                            Duration::from_secs(300),
702                            format!("protocol {proto_id} panic when process peer message"),
703                        );
704                    }
705                    #[cfg(feature = "with_sentry")]
706                    with_scope(
707                        |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
708                        || {
709                            capture_message(
710                                &format!(
711                                    "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
712                                ),
713                                Level::Warning,
714                            )
715                        },
716                    );
717                    error!(
718                        "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
719                    );
720
721                    broadcast_exit_signals();
722                }
723            }
724        }
725    }
726
727    async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
728        // When session disconnect update status anyway
729        match event {
730            ServiceEvent::SessionOpen { session_context } => {
731                debug!(
732                    "SessionOpen({}, {})",
733                    session_context.id, session_context.address,
734                );
735
736                self.network_state.dial_success(&session_context.address);
737
738                let iter = self.inbound_eviction();
739
740                let control = context.control().clone().into();
741
742                for peer in iter {
743                    if let Err(err) =
744                        disconnect_with_message(&control, peer, "bootnode random eviction")
745                    {
746                        debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
747                    }
748                }
749
750                if self
751                    .network_state
752                    .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
753                {
754                    debug!(
755                        "Feeler connected {} => {}",
756                        session_context.id, session_context.address,
757                    );
758                } else {
759                    match self.network_state.accept_peer(&session_context) {
760                        Ok(Some(evicted_peer)) => {
761                            debug!(
762                                "Disconnect peer, {} => {}",
763                                evicted_peer.session_id, evicted_peer.connected_addr,
764                            );
765                            if let Err(err) = disconnect_with_message(
766                                &control,
767                                evicted_peer.session_id,
768                                "evict because accepted better peer",
769                            ) {
770                                debug!(
771                                    "Disconnect failed {:?}, error: {:?}",
772                                    evicted_peer.session_id, err
773                                );
774                            }
775                        }
776                        Ok(None) => debug!(
777                            "{} open, registry {} success",
778                            session_context.id, session_context.address,
779                        ),
780                        Err(err) => {
781                            debug!(
782                                "Peer registry failed {:?}. Disconnect {} => {}",
783                                err, session_context.id, session_context.address,
784                            );
785                            if let Err(err) = disconnect_with_message(
786                                &control,
787                                session_context.id,
788                                "reject peer connection",
789                            ) {
790                                debug!(
791                                    "Disconnect failed {:?}, error: {:?}",
792                                    session_context.id, err
793                                );
794                            }
795                        }
796                    }
797                }
798            }
799            ServiceEvent::SessionClose { session_context } => {
800                debug!(
801                    "SessionClose({}, {})",
802                    session_context.id, session_context.address,
803                );
804                let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
805                    // should make sure feelers is clean
806                    reg.remove_feeler(&session_context.address);
807                    reg.remove_peer(session_context.id).is_some()
808                });
809                if peer_exists {
810                    debug!(
811                        "{} closed. Remove {} from peer_registry",
812                        session_context.id, session_context.address,
813                    );
814                    self.network_state.with_peer_store_mut(|peer_store| {
815                        peer_store.remove_disconnected_peer(&session_context.address);
816                    });
817                }
818            }
819            _ => {
820                info!("p2p service event: {:?}", event);
821            }
822        }
823    }
824}
825
826/// Ckb network service, use to start p2p network
827pub struct NetworkService {
828    p2p_service: Service<EventHandler, SecioKeyPair>,
829    network_state: Arc<NetworkState>,
830    ping_controller: Option<Sender<()>>,
831    // Background services
832    bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
833    version: String,
834}
835
836impl NetworkService {
837    /// init with all config
838    pub fn new(
839        network_state: Arc<NetworkState>,
840        protocols: Vec<CKBProtocol>,
841        required_protocol_ids: Vec<ProtocolId>,
842        // name, version, flags
843        identify_announce: (String, String, Flags),
844        transport_type: TransportType,
845    ) -> Self {
846        let config = &network_state.config;
847
848        if config.support_protocols.iter().collect::<HashSet<_>>()
849            != default_support_all_protocols()
850                .iter()
851                .collect::<HashSet<_>>()
852        {
853            warn!(
854                "Customized supported protocols: {:?}",
855                config.support_protocols
856            );
857        }
858
859        // == Build p2p service struct
860        let mut protocol_metas = protocols
861            .into_iter()
862            .map(CKBProtocol::build)
863            .collect::<Vec<_>>();
864
865        // == Build special protocols
866
867        // Identify is a core protocol, user cannot disable it via config
868        let identify_callback = IdentifyCallback::new(
869            Arc::clone(&network_state),
870            identify_announce.0,
871            identify_announce.1.clone(),
872            identify_announce.2,
873        );
874        let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
875            ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
876        });
877        protocol_metas.push(identify_meta);
878
879        // Ping protocol
880        let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
881            let ping_interval = Duration::from_secs(config.ping_interval_secs);
882            let ping_timeout = Duration::from_secs(config.ping_timeout_secs);
883
884            let ping_network_state = Arc::clone(&network_state);
885            let (ping_handler, ping_controller) =
886                PingHandler::new(ping_interval, ping_timeout, ping_network_state);
887            let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
888                ProtocolHandle::Callback(Box::new(ping_handler))
889            });
890            protocol_metas.push(ping_meta);
891            Some(ping_controller)
892        } else {
893            None
894        };
895
896        // Discovery protocol
897        if config
898            .support_protocols
899            .contains(&SupportProtocol::Discovery)
900        {
901            let addr_mgr = DiscoveryAddressManager {
902                network_state: Arc::clone(&network_state),
903                discovery_local_address: config.discovery_local_address,
904            };
905            let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
906                ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
907                    addr_mgr,
908                    config
909                        .discovery_announce_check_interval_secs
910                        .map(Duration::from_secs),
911                )))
912            });
913            protocol_metas.push(disc_meta);
914        }
915
916        // Feeler protocol
917        if config.support_protocols.contains(&SupportProtocol::Feeler) {
918            let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
919                let network_state = Arc::clone(&network_state);
920                move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
921            });
922            protocol_metas.push(feeler_meta);
923        }
924
925        // DisconnectMessage protocol
926        if config
927            .support_protocols
928            .contains(&SupportProtocol::DisconnectMessage)
929        {
930            let disconnect_message_state = Arc::clone(&network_state);
931            let disconnect_message_meta = SupportProtocols::DisconnectMessage
932                .build_meta_with_service_handle(move || {
933                    ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
934                        disconnect_message_state,
935                    )))
936                });
937            protocol_metas.push(disconnect_message_meta);
938        }
939
940        let mut service_builder = ServiceBuilder::default();
941        let yamux_config = YamuxConfig {
942            max_stream_count: protocol_metas.len(),
943            max_stream_window_size: 1024 * 1024,
944            ..Default::default()
945        };
946        for meta in protocol_metas.into_iter() {
947            network_state
948                .protocols
949                .write()
950                .push((meta.id(), meta.name(), meta.support_versions()));
951            service_builder = service_builder.insert_protocol(meta);
952        }
953        let event_handler = EventHandler {
954            network_state: Arc::clone(&network_state),
955        };
956        service_builder = service_builder
957            .handshake_type(network_state.local_private_key.clone().into())
958            .yamux_config(yamux_config)
959            .forever(true)
960            .max_connection_number(1024)
961            .set_send_buffer_size(config.max_send_buffer())
962            .set_channel_size(config.channel_size())
963            .timeout(Duration::from_secs(5));
964
965        #[cfg(not(target_family = "wasm"))]
966        {
967            service_builder = service_builder.upnp(config.upnp);
968        }
969
970        #[cfg(target_os = "linux")]
971        let p2p_service = {
972            if config.reuse_port_on_linux {
973                let iter = config.listen_addresses.iter();
974
975                #[derive(Clone, Copy, Debug, Eq, PartialEq)]
976                enum BindType {
977                    None,
978                    Ws,
979                    Tcp,
980                    Both,
981                }
982                impl BindType {
983                    fn transform(&mut self, other: TransportType) {
984                        match (&self, other) {
985                            (BindType::None, TransportType::Ws) => *self = BindType::Ws,
986                            (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
987                            (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
988                            (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
989                            _ => (),
990                        }
991                    }
992
993                    fn is_ready(&self) -> bool {
994                        // should change to Both if ckb enable ws
995                        matches!(self, BindType::Both)
996                    }
997                }
998
999                let mut init = BindType::None;
1000
1001                for multi_addr in iter {
1002                    if init.is_ready() {
1003                        break;
1004                    }
1005                    match find_type(multi_addr) {
1006                        TransportType::Tcp => {
1007                            // only bind once
1008                            if matches!(init, BindType::Tcp) {
1009                                continue;
1010                            }
1011                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1012                                let domain = socket2::Domain::for_address(addr);
1013                                let bind_fn = move |socket: p2p::service::TcpSocket| {
1014                                    let socket_ref = socket2::SockRef::from(&socket);
1015                                    #[cfg(all(
1016                                        unix,
1017                                        not(target_os = "solaris"),
1018                                        not(target_os = "illumos")
1019                                    ))]
1020                                    socket_ref.set_reuse_port(true)?;
1021                                    socket_ref.set_reuse_address(true)?;
1022                                    if socket_ref.domain()? == domain {
1023                                        socket_ref.bind(&addr.into())?;
1024                                    }
1025                                    Ok(socket)
1026                                };
1027                                init.transform(TransportType::Tcp);
1028                                service_builder = service_builder.tcp_config(bind_fn);
1029                            }
1030                        }
1031                        TransportType::Ws | TransportType::Wss => {
1032                            // only bind once
1033                            if matches!(init, BindType::Ws) {
1034                                continue;
1035                            }
1036                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1037                                let domain = socket2::Domain::for_address(addr);
1038                                let bind_fn = move |socket: p2p::service::TcpSocket| {
1039                                    let socket_ref = socket2::SockRef::from(&socket);
1040                                    #[cfg(all(
1041                                        unix,
1042                                        not(target_os = "solaris"),
1043                                        not(target_os = "illumos")
1044                                    ))]
1045                                    socket_ref.set_reuse_port(true)?;
1046                                    socket_ref.set_reuse_address(true)?;
1047                                    if socket_ref.domain()? == domain {
1048                                        socket_ref.bind(&addr.into())?;
1049                                    }
1050                                    Ok(socket)
1051                                };
1052                                init.transform(TransportType::Ws);
1053                                service_builder = service_builder.tcp_config_on_ws(bind_fn);
1054                            }
1055                        }
1056                    }
1057                }
1058            }
1059
1060            service_builder.build(event_handler)
1061        };
1062
1063        #[cfg(not(target_os = "linux"))]
1064        // The default permissions of Windows are not enough to enable this function,
1065        // and the administrator permissions of group permissions must be turned on.
1066        // This operation is very burdensome for windows users, so it is turned off by default
1067        //
1068        // The integration test fails after MacOS is turned on, the behavior is different from linux.
1069        // Decision to turn off it
1070        let p2p_service = service_builder.build(event_handler);
1071
1072        // == Build background service tasks
1073        let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
1074        let protocol_type_checker_service = ProtocolTypeCheckerService::new(
1075            Arc::clone(&network_state),
1076            p2p_service.control().to_owned().into(),
1077            required_protocol_ids,
1078        );
1079        let mut bg_services = vec![
1080            Box::pin(dump_peer_store_service) as Pin<Box<_>>,
1081            Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
1082        ];
1083        if config.outbound_peer_service_enabled() {
1084            let outbound_peer_service = OutboundPeerService::new(
1085                Arc::clone(&network_state),
1086                p2p_service.control().to_owned().into(),
1087                Duration::from_secs(config.connect_outbound_interval_secs),
1088                transport_type,
1089            );
1090            bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
1091        };
1092
1093        #[cfg(feature = "with_dns_seeding")]
1094        if config.dns_seeding_service_enabled() {
1095            let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
1096                Arc::clone(&network_state),
1097                config.dns_seeds.clone(),
1098            );
1099            bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
1100        };
1101
1102        NetworkService {
1103            p2p_service,
1104            network_state,
1105            ping_controller,
1106            bg_services,
1107            version: identify_announce.1,
1108        }
1109    }
1110
1111    /// Start the network in the background and return a controller
1112    pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1113        let config = self.network_state.config.clone();
1114
1115        let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1116
1117        // dial whitelist_nodes
1118        for addr in self.network_state.config.whitelist_peers() {
1119            debug!("Dial whitelist_peers {:?}", addr);
1120            self.network_state.dial_identify(&p2p_control, addr);
1121        }
1122
1123        let target = &self.network_state.required_flags;
1124
1125        // get bootnodes
1126        // try get addrs from peer_store, if peer_store have no enough addrs then use bootnodes
1127        let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1128            let count = max((config.max_outbound_peers >> 1) as usize, 1);
1129            let mut addrs: Vec<_> = peer_store
1130                .fetch_addrs_to_attempt(count, *target, |_| true)
1131                .into_iter()
1132                .map(|paddr| paddr.addr)
1133                .collect();
1134            // Get bootnodes randomly
1135            let bootnodes = self
1136                .network_state
1137                .bootnodes
1138                .iter()
1139                .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1140                .into_iter()
1141                .cloned();
1142            addrs.extend(bootnodes);
1143            addrs
1144        });
1145
1146        // dial half bootnodes
1147        for addr in bootnodes {
1148            debug!("Dial bootnode {:?}", addr);
1149            self.network_state.dial_identify(&p2p_control, addr);
1150        }
1151
1152        let Self {
1153            mut p2p_service,
1154            network_state,
1155            ping_controller,
1156            bg_services,
1157            version,
1158        } = self;
1159
1160        // NOTE: for ensure background task finished
1161        let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1162            .into_iter()
1163            .map(|bg_service| {
1164                let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1165                (signal_sender, (bg_service, signal_receiver))
1166            })
1167            .unzip();
1168
1169        let receiver: CancellationToken = new_tokio_exit_rx();
1170        #[cfg(not(target_family = "wasm"))]
1171        let (start_sender, start_receiver) = mpsc::channel();
1172        {
1173            #[cfg(not(target_family = "wasm"))]
1174            let network_state = Arc::clone(&network_state);
1175            let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1176            handle.spawn_task(async move {
1177                #[cfg(not(target_family = "wasm"))]
1178                {
1179                    let listen_addresses = {
1180                        let mut addresses = config.listen_addresses.clone();
1181                        if config.reuse_tcp_with_ws {
1182                            let ws_listens = addresses
1183                                .iter()
1184                                .cloned()
1185                                .filter_map(|mut addr| {
1186                                    if matches!(find_type(&addr), TransportType::Tcp) {
1187                                        addr.push(Protocol::Ws);
1188                                        Some(addr)
1189                                    } else {
1190                                        None
1191                                    }
1192                                })
1193                                .collect::<Vec<_>>();
1194
1195                            addresses.extend(ws_listens);
1196                        }
1197                        let mut addresses = addresses
1198                            .into_iter()
1199                            .collect::<HashSet<_>>()
1200                            .into_iter()
1201                            .collect::<Vec<_>>();
1202                        addresses.sort_by(|a, b| {
1203                            let ty_a = find_type(a);
1204                            let ty_b = find_type(b);
1205
1206                            ty_a.cmp(&ty_b)
1207                        });
1208
1209                        addresses
1210                    };
1211
1212                    for addr in &listen_addresses {
1213                        match p2p_service.listen(addr.to_owned()).await {
1214                            Ok(listen_address) => {
1215                                info!("Listen on address: {}", listen_address);
1216                                network_state
1217                                    .listened_addrs
1218                                    .write()
1219                                    .push(listen_address.clone());
1220                            }
1221                            Err(err) => {
1222                                warn!(
1223                                    "Listen on address {} failed, due to error: {}",
1224                                    addr.clone(),
1225                                    err
1226                                );
1227                                start_sender
1228                                    .send(Err(Error::P2P(P2PError::Transport(err))))
1229                                    .expect("channel abnormal shutdown");
1230                                return;
1231                            }
1232                        };
1233                    }
1234                    start_sender.send(Ok(())).unwrap();
1235                }
1236
1237                p2p::runtime::spawn(async move { p2p_service.run().await });
1238                tokio::select! {
1239                    _ = receiver.cancelled() => {
1240                        info!("NetworkService receive exit signal, start shutdown...");
1241                        let _ = p2p_control.shutdown().await;
1242                        // Drop senders to stop all corresponding background task
1243                        drop(bg_signals);
1244
1245                        info!("NetworkService shutdown now");
1246                    },
1247                    else => {
1248                        let _ = p2p_control.shutdown().await;
1249                        // Drop senders to stop all corresponding background task
1250                        drop(bg_signals);
1251                    },
1252                }
1253            });
1254        }
1255        for (mut service, mut receiver) in bg_receivers {
1256            handle.spawn_task(async move {
1257                loop {
1258                    tokio::select! {
1259                        _ = &mut service => {},
1260                        _ = &mut receiver => break
1261                    }
1262                }
1263            });
1264        }
1265        #[cfg(not(target_family = "wasm"))]
1266        if let Ok(Err(e)) = start_receiver.recv() {
1267            return Err(e);
1268        }
1269
1270        Ok(NetworkController {
1271            version,
1272            network_state,
1273            p2p_control,
1274            ping_controller,
1275        })
1276    }
1277}
1278
1279/// Network controller
1280#[derive(Clone)]
1281pub struct NetworkController {
1282    version: String,
1283    network_state: Arc<NetworkState>,
1284    p2p_control: ServiceControl,
1285    ping_controller: Option<Sender<()>>,
1286}
1287
1288impl NetworkController {
1289    /// Set ckb2023 start
1290    pub fn init_ckb2023(&self) {
1291        self.network_state.ckb2023.store(true, Ordering::SeqCst);
1292    }
1293
1294    /// Get ckb2023 flag
1295    pub fn load_ckb2023(&self) -> bool {
1296        self.network_state.ckb2023.load(Ordering::SeqCst)
1297    }
1298
1299    /// Node listen address list
1300    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1301        self.network_state.public_urls(max_urls)
1302    }
1303
1304    /// ckb version
1305    pub fn version(&self) -> &String {
1306        &self.version
1307    }
1308
1309    /// Node peer id's base58 format string
1310    pub fn node_id(&self) -> String {
1311        self.network_state.node_id()
1312    }
1313
1314    /// p2p service control
1315    pub fn p2p_control(&self) -> &ServiceControl {
1316        &self.p2p_control
1317    }
1318
1319    /// Dial remote node
1320    pub fn add_node(&self, address: Multiaddr) {
1321        self.network_state.add_node(&self.p2p_control, address)
1322    }
1323
1324    /// Disconnect session with peer id
1325    pub fn remove_node(&self, peer_id: &PeerId) {
1326        if let Some(session_id) = self
1327            .network_state
1328            .peer_registry
1329            .read()
1330            .get_key_by_peer_id(peer_id)
1331        {
1332            if let Err(err) =
1333                disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1334            {
1335                debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1336            }
1337        } else {
1338            error!("Cannot find peer {:?}", peer_id);
1339        }
1340    }
1341
1342    /// Get banned peer list
1343    pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1344        self.network_state
1345            .peer_store
1346            .lock()
1347            .ban_list()
1348            .get_banned_addrs()
1349    }
1350
1351    /// Clear banned list
1352    pub fn clear_banned_addrs(&self) {
1353        self.network_state.peer_store.lock().clear_ban_list();
1354    }
1355
1356    /// Get address info from peer store
1357    pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1358        self.network_state
1359            .peer_store
1360            .lock()
1361            .addr_manager()
1362            .get(addr)
1363            .cloned()
1364    }
1365
1366    /// Ban an ip
1367    pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1368        self.disconnect_peers_in_ip_range(address, &ban_reason);
1369        self.network_state
1370            .peer_store
1371            .lock()
1372            .ban_network(address, ban_until, ban_reason)
1373    }
1374
1375    /// Unban an ip
1376    pub fn unban(&self, address: &IpNetwork) {
1377        self.network_state
1378            .peer_store
1379            .lock()
1380            .mut_ban_list()
1381            .unban_network(address);
1382    }
1383
1384    /// Return all connected peers' information
1385    pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1386        self.network_state.with_peer_registry(|reg| {
1387            reg.peers()
1388                .iter()
1389                .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1390                .collect::<Vec<_>>()
1391        })
1392    }
1393
1394    /// Ban an peer through peer index
1395    pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1396        self.network_state
1397            .ban_session(&self.p2p_control, peer_index, duration, reason);
1398    }
1399
1400    /// disconnect peers with matched peer_ip or peer_ip_network, eg: 192.168.0.2 or 192.168.0.0/24
1401    fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1402        self.network_state.with_peer_registry(|reg| {
1403            reg.peers().iter().for_each(|(peer_index, peer)| {
1404                if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1405                    if address.contains(addr.ip()) {
1406                        let _ = disconnect_with_message(
1407                            &self.p2p_control,
1408                            *peer_index,
1409                            &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1410                        );
1411                    }
1412                }
1413            })
1414        });
1415    }
1416
1417    fn try_broadcast(
1418        &self,
1419        quick: bool,
1420        target: Option<SessionId>,
1421        proto_id: ProtocolId,
1422        data: Bytes,
1423    ) -> Result<(), SendErrorKind> {
1424        let now = Instant::now();
1425        loop {
1426            let target = target
1427                .map(TargetSession::Single)
1428                .unwrap_or(TargetSession::All);
1429            let result = if quick {
1430                self.p2p_control
1431                    .quick_filter_broadcast(target, proto_id, data.clone())
1432            } else {
1433                self.p2p_control
1434                    .filter_broadcast(target, proto_id, data.clone())
1435            };
1436            match result {
1437                Ok(()) => {
1438                    return Ok(());
1439                }
1440                Err(SendErrorKind::WouldBlock) => {
1441                    if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1442                        warn!("Broadcast message to {} timeout", proto_id);
1443                        return Err(SendErrorKind::WouldBlock);
1444                    }
1445                    thread::sleep(P2P_TRY_SEND_INTERVAL);
1446                }
1447                Err(err) => {
1448                    warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1449                    return Err(err);
1450                }
1451            }
1452        }
1453    }
1454
1455    /// Broadcast a message to all connected peers
1456    pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1457        self.try_broadcast(false, None, proto_id, data)
1458    }
1459
1460    /// Broadcast a message to all connected peers through quick queue
1461    pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1462        self.try_broadcast(true, None, proto_id, data)
1463    }
1464
1465    /// Send message to one connected peer
1466    pub fn send_message_to(
1467        &self,
1468        session_id: SessionId,
1469        proto_id: ProtocolId,
1470        data: Bytes,
1471    ) -> Result<(), SendErrorKind> {
1472        self.try_broadcast(false, Some(session_id), proto_id, data)
1473    }
1474
1475    /// network message processing controller, always true, if false, discard any received messages
1476    pub fn is_active(&self) -> bool {
1477        self.network_state.is_active()
1478    }
1479
1480    /// Change active status, if set false discard any received messages
1481    pub fn set_active(&self, active: bool) {
1482        self.network_state.active.store(active, Ordering::Release);
1483    }
1484
1485    /// Return all connected peers' protocols info
1486    pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1487        self.network_state.protocols.read().clone()
1488    }
1489
1490    /// Try ping all connected peers
1491    pub fn ping_peers(&self) {
1492        if let Some(mut ping_controller) = self.ping_controller.clone() {
1493            let _ignore = ping_controller.try_send(());
1494        }
1495    }
1496}
1497
1498// Send an optional message before disconnect a peer
1499pub(crate) fn disconnect_with_message(
1500    control: &ServiceControl,
1501    peer_index: SessionId,
1502    message: &str,
1503) -> Result<(), SendErrorKind> {
1504    if !message.is_empty() {
1505        let data = Bytes::from(message.as_bytes().to_vec());
1506        // Must quick send, otherwise this message will be dropped.
1507        control.quick_send_message_to(
1508            peer_index,
1509            SupportProtocols::DisconnectMessage.protocol_id(),
1510            data,
1511        )?;
1512    }
1513    control.disconnect(peer_index)
1514}
1515
1516pub(crate) async fn async_disconnect_with_message(
1517    control: &ServiceAsyncControl,
1518    peer_index: SessionId,
1519    message: &str,
1520) -> Result<(), SendErrorKind> {
1521    if !message.is_empty() {
1522        let data = Bytes::from(message.as_bytes().to_vec());
1523        // Must quick send, otherwise this message will be dropped.
1524        control
1525            .quick_send_message_to(
1526                peer_index,
1527                SupportProtocols::DisconnectMessage.protocol_id(),
1528                data,
1529            )
1530            .await?;
1531    }
1532    control.disconnect(peer_index).await
1533}
1534
1535/// Transport type on ckb
1536#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
1537pub enum TransportType {
1538    /// Tcp
1539    Tcp,
1540    /// Ws
1541    Ws,
1542    /// Wss only on wasm
1543    Wss,
1544}
1545
1546#[allow(dead_code)]
1547pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1548    let mut iter = addr.iter();
1549
1550    iter.find_map(|proto| match proto {
1551        Protocol::Ws => Some(TransportType::Ws),
1552        Protocol::Wss => Some(TransportType::Wss),
1553        _ => None,
1554    })
1555    .unwrap_or(TransportType::Tcp)
1556}