ckb_network/
network.rs

1//! Global state struct and start function
2use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7    PeerStore,
8    types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11    disconnect_message::DisconnectMessageProtocol,
12    discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13    feeler::Feeler,
14    identify::{Flags, IdentifyCallback, IdentifyProtocol},
15    ping::PingHandler,
16    support_protocols::SupportProtocols,
17};
18#[cfg(not(target_family = "wasm"))]
19use crate::proxy;
20use crate::services::{
21    dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
22    protocol_type_checker::ProtocolTypeCheckerService,
23};
24use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
25use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
26use ckb_logger::{debug, error, info, trace, warn};
27use ckb_spawn::Spawn;
28use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
29use ckb_systemtime::{Duration, Instant};
30use ckb_util::{Condvar, Mutex, RwLock};
31use futures::{Future, channel::mpsc::Sender};
32use ipnetwork::IpNetwork;
33use p2p::error::TransportErrorKind;
34use p2p::{
35    SessionId, async_trait,
36    builder::ServiceBuilder,
37    bytes::Bytes,
38    context::{ServiceContext, SessionContext},
39    error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
40    multiaddr::{Multiaddr, Protocol},
41    secio::{self, PeerId, SecioKeyPair, error::SecioError},
42    service::{
43        ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
44        TargetSession,
45    },
46    traits::ServiceHandle,
47    utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
48    yamux::config::Config as YamuxConfig,
49};
50use rand::prelude::IteratorRandom;
51#[cfg(feature = "with_sentry")]
52use sentry::{Level, capture_message, with_scope};
53#[cfg(not(target_family = "wasm"))]
54use std::sync::mpsc;
55use std::{
56    borrow::Cow,
57    cmp::max,
58    collections::{HashMap, HashSet},
59    pin::Pin,
60    sync::{
61        Arc,
62        atomic::{AtomicBool, Ordering},
63    },
64    thread,
65};
66use tokio::{self, sync::oneshot};
67
68const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
69const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
70// After 5 minutes we consider this dial hang
71const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
72
73/// The global shared state of the network module
74pub struct NetworkState {
75    pub(crate) peer_registry: RwLock<PeerRegistry>,
76    pub(crate) peer_store: Mutex<PeerStore>,
77    /// Node listened addresses
78    pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
79    dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
80    /// Node public addresses by config
81    public_addrs: RwLock<HashSet<Multiaddr>>,
82    observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
83    local_private_key: secio::SecioKeyPair,
84    local_peer_id: PeerId,
85    pub(crate) bootnodes: Vec<Multiaddr>,
86    pub(crate) config: NetworkConfig,
87    pub(crate) active: AtomicBool,
88    /// Node supported protocols
89    /// fields: ProtocolId, Protocol Name, Supported Versions
90    pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
91    pub(crate) required_flags: Flags,
92}
93
94impl NetworkState {
95    /// Init from config
96    #[cfg(not(target_family = "wasm"))]
97    pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
98        config.create_dir_if_not_exists()?;
99        let local_private_key = config.fetch_private_key()?;
100        let local_peer_id = local_private_key.peer_id();
101        // set max score to public addresses
102        let public_addrs: HashSet<Multiaddr> = config
103            .listen_addresses
104            .iter()
105            .chain(config.public_addresses.iter())
106            .cloned()
107            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
108                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
109                _ => {
110                    match extract_peer_id(&addr) {
111                        Some(peer_id) if peer_id != local_peer_id => {
112                            error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
113                            std::process::exit(1);
114                        }
115                        Some(_) => (),
116                        None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
117                    }
118                    Some(addr)
119                }
120            })
121            .collect();
122        info!("Loading the peer store. This process may take a few seconds to complete.");
123
124        let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
125            config.peer_store_path(),
126        ));
127        info!("Loaded the peer store.");
128
129        if let Some(ref proxy_url) = config.proxy.proxy_url {
130            proxy::check_proxy_url(proxy_url).map_err(Error::Config)?;
131        }
132
133        let bootnodes = config.bootnodes();
134
135        let peer_registry = PeerRegistry::new(
136            config.max_inbound_peers(),
137            config.max_outbound_peers(),
138            config.whitelist_only,
139            config.whitelist_peers(),
140            config.disable_block_relay_only_connection,
141        );
142
143        Ok(NetworkState {
144            peer_store,
145            config,
146            bootnodes,
147            peer_registry: RwLock::new(peer_registry),
148            dialing_addrs: RwLock::new(HashMap::default()),
149            public_addrs: RwLock::new(public_addrs),
150            listened_addrs: RwLock::new(Vec::new()),
151            observed_addrs: RwLock::new(HashMap::default()),
152            local_private_key,
153            local_peer_id,
154            active: AtomicBool::new(true),
155            protocols: RwLock::new(Vec::new()),
156            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
157        })
158    }
159
160    #[cfg(target_family = "wasm")]
161    pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
162        let local_private_key = config.fetch_private_key()?;
163        let local_peer_id = local_private_key.peer_id();
164        // set max score to public addresses
165        let public_addrs: HashSet<Multiaddr> = config
166            .listen_addresses
167            .iter()
168            .chain(config.public_addresses.iter())
169            .cloned()
170            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
171                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
172                _ => {
173                    if extract_peer_id(&addr).is_none() {
174                        addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
175                    }
176                    Some(addr)
177                }
178            })
179            .collect();
180        info!("Loading the peer store. This process may take a few seconds to complete.");
181        let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
182        let bootnodes = config.bootnodes();
183
184        let peer_registry = PeerRegistry::new(
185            config.max_inbound_peers(),
186            config.max_outbound_peers(),
187            config.whitelist_only,
188            config.whitelist_peers(),
189            config.disable_block_relay_only_connection,
190        );
191        Ok(NetworkState {
192            peer_store,
193            config,
194            bootnodes,
195            peer_registry: RwLock::new(peer_registry),
196            dialing_addrs: RwLock::new(HashMap::default()),
197            public_addrs: RwLock::new(public_addrs),
198            listened_addrs: RwLock::new(Vec::new()),
199            observed_addrs: RwLock::new(HashMap::default()),
200            local_private_key,
201            local_peer_id,
202            active: AtomicBool::new(true),
203            protocols: RwLock::new(Vec::new()),
204            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
205        })
206    }
207
208    /// use to discovery get nodes message to announce what kind of node information need from the other peer
209    /// default with `Flags::SYNC | Flags::DISCOVERY | Flags::RELAY`
210    pub fn required_flags(mut self, flags: Flags) -> Self {
211        self.required_flags = flags;
212        self
213    }
214
215    pub(crate) fn report_session(
216        &self,
217        p2p_control: &ServiceControl,
218        session_id: SessionId,
219        behaviour: Behaviour,
220    ) {
221        if let Some(addr) = self.with_peer_registry(|reg| {
222            reg.get_peer(session_id)
223                .filter(|peer| !peer.is_whitelist)
224                .map(|peer| peer.connected_addr.clone())
225        }) {
226            trace!("Report {:?} because {:?}", addr, behaviour);
227            let report_result = self.peer_store.lock().report(&addr, behaviour);
228            if report_result.is_banned() {
229                if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
230                    debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
231                }
232            }
233        } else {
234            debug!(
235                "Report {} failure: not found in peer registry or it is on the whitelist",
236                session_id
237            );
238        }
239    }
240
241    pub(crate) fn ban_session(
242        &self,
243        p2p_control: &ServiceControl,
244        session_id: SessionId,
245        duration: Duration,
246        reason: String,
247    ) {
248        if let Some(addr) = self.with_peer_registry(|reg| {
249            reg.get_peer(session_id)
250                .filter(|peer| !peer.is_whitelist)
251                .map(|peer| peer.connected_addr.clone())
252        }) {
253            info!(
254                "Ban peer {:?} for {} seconds, reason: {}",
255                addr,
256                duration.as_secs(),
257                reason
258            );
259            if let Some(metrics) = ckb_metrics::handle() {
260                metrics.ckb_network_ban_peer.inc();
261            }
262            if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
263                let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
264                self.peer_store.lock().ban_addr(
265                    &peer.connected_addr,
266                    duration.as_millis() as u64,
267                    reason,
268                );
269                if let Err(err) =
270                    disconnect_with_message(p2p_control, peer.session_id, message.as_str())
271                {
272                    debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
273                }
274            }
275        } else {
276            debug!(
277                "Ban session({}) failed: not found in peer registry or it is on the whitelist",
278                session_id
279            );
280        }
281    }
282
283    pub(crate) fn accept_peer(
284        &self,
285        session_context: &SessionContext,
286    ) -> Result<Option<Peer>, Error> {
287        // NOTE: be careful, here easy cause a deadlock,
288        //    because peer_store's lock scope across peer_registry's lock scope
289        let mut peer_store = self.peer_store.lock();
290        let accept_peer_result = {
291            self.peer_registry.write().accept_peer(
292                session_context.address.clone(),
293                session_context.id,
294                session_context.ty,
295                &mut peer_store,
296            )
297        };
298        accept_peer_result
299    }
300
301    /// For restrict lock in inner scope
302    pub fn with_peer_registry<F, T>(&self, callback: F) -> T
303    where
304        F: FnOnce(&PeerRegistry) -> T,
305    {
306        callback(&self.peer_registry.read())
307    }
308
309    // For restrict lock in inner scope
310    pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
311    where
312        F: FnOnce(&mut PeerRegistry) -> T,
313    {
314        callback(&mut self.peer_registry.write())
315    }
316
317    // For restrict lock in inner scope
318    pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
319    where
320        F: FnOnce(&mut PeerStore) -> T,
321    {
322        callback(&mut self.peer_store.lock())
323    }
324
325    /// Get peer id of local node
326    pub fn local_peer_id(&self) -> &PeerId {
327        &self.local_peer_id
328    }
329
330    /// Use on test
331    pub fn local_private_key(&self) -> &secio::SecioKeyPair {
332        &self.local_private_key
333    }
334
335    /// Get local node's peer id in base58 format string
336    pub fn node_id(&self) -> String {
337        self.local_peer_id().to_base58()
338    }
339
340    pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
341        let public_addrs = self.public_addrs.read();
342        if public_addrs.len() <= count {
343            return public_addrs.iter().cloned().collect();
344        } else {
345            public_addrs
346                .iter()
347                .cloned()
348                .choose_multiple(&mut rand::thread_rng(), count)
349        }
350    }
351
352    /// After onion service created,
353    /// ckb use this method to add onion address to public_addr
354    pub fn add_public_addr(&self, addr: Multiaddr) {
355        self.public_addrs.write().insert(addr);
356    }
357
358    pub(crate) fn connection_status(&self) -> ConnectionStatus {
359        self.peer_registry.read().connection_status()
360    }
361
362    /// Get local node's listen address list
363    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
364        let listened_addrs = self.listened_addrs.read();
365        self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
366            .into_iter()
367            .filter_map(|addr| {
368                if !listened_addrs.contains(&addr) {
369                    Some((addr, 1))
370                } else {
371                    None
372                }
373            })
374            .chain(listened_addrs.iter().map(|addr| (addr.clone(), 1)))
375            .map(|(addr, score)| (addr.to_string(), score))
376            .collect()
377    }
378
379    pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
380        self.dial_identify(p2p_control, address);
381    }
382
383    /// use a filter to get protocol id list
384    pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
385        self.protocols
386            .read()
387            .iter()
388            .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
389            .collect::<Vec<_>>()
390    }
391
392    pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
393        let peer_id = extract_peer_id(addr);
394        if peer_id.is_none() {
395            error!("Do not dial addr without peer id, addr: {}", addr);
396            return false;
397        }
398        let peer_id = peer_id.as_ref().unwrap();
399
400        if self.local_peer_id() == peer_id {
401            trace!("Do not dial self: {:?}, {}", peer_id, addr);
402            return false;
403        }
404        if self.public_addrs.read().contains(addr) {
405            trace!(
406                "Do not dial listened address(self): {:?}, {}",
407                peer_id, addr
408            );
409            return false;
410        }
411
412        let peer_in_registry = self.with_peer_registry(|reg| {
413            reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
414        });
415        if peer_in_registry {
416            trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
417            return false;
418        }
419
420        if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
421            trace!(
422                "Do not send repeated dial commands to network service: {:?}, {}",
423                peer_id, addr
424            );
425            if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
426                #[cfg(feature = "with_sentry")]
427                with_scope(
428                    |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
429                    || {
430                        capture_message(
431                            &format!(
432                                "Dialing {:?}, {:?} for more than {} seconds, \
433                                 something is wrong in network service",
434                                peer_id,
435                                addr,
436                                DIAL_HANG_TIMEOUT.as_secs(),
437                            ),
438                            Level::Warning,
439                        )
440                    },
441                );
442            }
443            return false;
444        }
445
446        true
447    }
448
449    pub(crate) fn dial_success(&self, addr: &Multiaddr) {
450        if let Some(peer_id) = extract_peer_id(addr) {
451            self.dialing_addrs.write().remove(&peer_id);
452        }
453    }
454
455    pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
456        self.with_peer_registry_mut(|reg| {
457            reg.remove_feeler(addr);
458        });
459
460        if let Some(peer_id) = extract_peer_id(addr) {
461            self.dialing_addrs.write().remove(&peer_id);
462        }
463    }
464
465    /// Dial
466    /// return value indicates the dialing is actually sent or denied.
467    fn dial_inner(
468        &self,
469        p2p_control: &ServiceControl,
470        addr: Multiaddr,
471        target: TargetProtocol,
472    ) -> Result<(), Error> {
473        if !self.can_dial(&addr) {
474            return Err(Error::Dial(format!("ignore dialing addr {addr}")));
475        }
476
477        debug!("Dialing {addr}");
478        p2p_control.dial(addr.clone(), target)?;
479        self.dialing_addrs.write().insert(
480            extract_peer_id(&addr).expect("verified addr"),
481            Instant::now(),
482        );
483        Ok(())
484    }
485
486    /// Dial just identify protocol
487    pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
488        if let Err(err) = self.dial_inner(
489            p2p_control,
490            addr,
491            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
492        ) {
493            debug!("dial_identify error: {err}");
494        }
495    }
496
497    /// Dial just feeler protocol
498    pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
499        if let Err(err) = self.dial_inner(
500            p2p_control,
501            addr.clone(),
502            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
503        ) {
504            debug!("dial_feeler error {err}");
505        } else {
506            self.with_peer_registry_mut(|reg| {
507                reg.add_feeler(&addr);
508            });
509        }
510    }
511
512    /// add observed address for identify protocol
513    pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
514        let mut pending_observed_addrs = self.observed_addrs.write();
515        pending_observed_addrs.insert(session_id, addr);
516    }
517
518    // randomly select count addresses from observed_addrs
519    pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
520        let observed_addrs = self
521            .observed_addrs
522            .read()
523            .values()
524            .cloned()
525            .collect::<HashSet<_>>();
526        if observed_addrs.len() <= count {
527            return observed_addrs.into_iter().collect();
528        } else {
529            observed_addrs
530                .into_iter()
531                .choose_multiple(&mut rand::thread_rng(), count)
532        }
533    }
534
535    /// Network message processing controller, default is true, if false, discard any received messages
536    pub fn is_active(&self) -> bool {
537        self.active.load(Ordering::Acquire)
538    }
539}
540
541/// Used to handle global events of tentacle, such as session open/close
542pub struct EventHandler {
543    pub(crate) network_state: Arc<NetworkState>,
544}
545
546impl EventHandler {
547    /// init an event handler
548    pub fn new(network_state: Arc<NetworkState>) -> Self {
549        Self { network_state }
550    }
551}
552
553/// Exit trait used to notify all other module to exit
554pub trait ExitHandler: Send + Unpin + 'static {
555    /// notify other module to exit
556    fn notify_exit(&self);
557}
558
559/// Default exit handle
560#[derive(Clone, Default)]
561pub struct DefaultExitHandler {
562    lock: Arc<Mutex<()>>,
563    exit: Arc<Condvar>,
564}
565
566impl DefaultExitHandler {
567    /// Block on current thread util exit notify
568    pub fn wait_for_exit(&self) {
569        self.exit.wait(&mut self.lock.lock());
570    }
571}
572
573impl ExitHandler for DefaultExitHandler {
574    fn notify_exit(&self) {
575        self.exit.notify_all();
576    }
577}
578
579impl EventHandler {
580    fn inbound_eviction(&self) -> Vec<PeerIndex> {
581        if self.network_state.config.bootnode_mode {
582            let status = self.network_state.connection_status();
583
584            if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
585                self.network_state
586                    .with_peer_registry(|registry| {
587                        registry
588                            .peers()
589                            .values()
590                            .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
591                            .map(|peer| peer.session_id)
592                            .collect::<Vec<SessionId>>()
593                    })
594                    .into_iter()
595                    .enumerate()
596                    .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
597                    .collect()
598            } else {
599                Vec::new()
600            }
601        } else {
602            Vec::new()
603        }
604    }
605}
606
607#[async_trait]
608impl ServiceHandle for EventHandler {
609    async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
610        match error {
611            ServiceError::DialerError { address, error } => {
612                match error {
613                    DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
614                        SecioError::ConnectSelf,
615                    )) => {
616                        debug!("dial observed address success: {:?}", address);
617                    }
618                    DialerErrorKind::IoError(e)
619                        if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
620                    {
621                        warn!("DialerError({}) {}", address, e);
622                    }
623                    DialerErrorKind::TransportError(e)
624                        if matches!(&e, TransportErrorKind::ProxyError(_proxy_err)) =>
625                    {
626                        let err = e.to_string();
627                        if err.contains("failed to establish connection to target:General failure")
628                            || err.contains(
629                                "failed to establish connection to target:Connection refused",
630                            )
631                        {
632                            debug!("DialerError({}) {}", address, e);
633                        } else {
634                            error!("Is the proxy server down? DialerError({}) {}", address, e);
635                        }
636                    }
637                    _ => {
638                        debug!("DialerError({}) {}", address, error);
639                    }
640                }
641                self.network_state.dial_failed(&address);
642            }
643            ServiceError::ProtocolError {
644                id,
645                proto_id,
646                error,
647            } => {
648                debug!("ProtocolError({}, {}) {}", id, proto_id, error);
649                let message = format!("ProtocolError id={proto_id}");
650                // Ban because misbehave of remote peer
651                self.network_state.ban_session(
652                    &context.control().clone().into(),
653                    id,
654                    Duration::from_secs(300),
655                    message,
656                );
657            }
658            ServiceError::SessionTimeout { session_context } => {
659                debug!(
660                    "SessionTimeout({}, {})",
661                    session_context.id, session_context.address,
662                );
663            }
664            ServiceError::MuxerError {
665                session_context,
666                error,
667            } => {
668                debug!(
669                    "MuxerError({}, {}), substream error {}, disconnect it",
670                    session_context.id, session_context.address, error,
671                );
672            }
673            ServiceError::ListenError { address, error } => {
674                debug!("ListenError: address={:?}, error={:?}", address, error);
675            }
676            ServiceError::ProtocolSelectError {
677                proto_name,
678                session_context,
679            } => {
680                debug!(
681                    "ProtocolSelectError: proto_name={:?}, session_id={}",
682                    proto_name, session_context.id,
683                );
684            }
685            ServiceError::SessionBlocked { session_context } => {
686                debug!("SessionBlocked: {}", session_context.id);
687            }
688            ServiceError::ProtocolHandleError { proto_id, error } => {
689                debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
690
691                let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
692                {
693                    if let Some(id) = opt_session_id {
694                        self.network_state.ban_session(
695                            &context.control().clone().into(),
696                            id,
697                            Duration::from_secs(300),
698                            format!("protocol {proto_id} panic when process peer message"),
699                        );
700                    }
701                    #[cfg(feature = "with_sentry")]
702                    with_scope(
703                        |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
704                        || {
705                            capture_message(
706                                &format!(
707                                    "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
708                                ),
709                                Level::Warning,
710                            )
711                        },
712                    );
713                    error!(
714                        "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
715                    );
716
717                    broadcast_exit_signals();
718                }
719            }
720        }
721    }
722
723    async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
724        // When session disconnect update status anyway
725        match event {
726            ServiceEvent::SessionOpen { session_context } => {
727                debug!(
728                    "SessionOpen({}, {})",
729                    session_context.id, session_context.address,
730                );
731
732                self.network_state.dial_success(&session_context.address);
733
734                let iter = self.inbound_eviction();
735
736                let control = context.control().clone().into();
737
738                for peer in iter {
739                    if let Err(err) =
740                        disconnect_with_message(&control, peer, "bootnode random eviction")
741                    {
742                        debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
743                    }
744                }
745
746                if self
747                    .network_state
748                    .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
749                {
750                    debug!(
751                        "Feeler connected {} => {}",
752                        session_context.id, session_context.address,
753                    );
754                } else {
755                    match self.network_state.accept_peer(&session_context) {
756                        Ok(Some(evicted_peer)) => {
757                            debug!(
758                                "Disconnect peer, {} => {}",
759                                evicted_peer.session_id, evicted_peer.connected_addr,
760                            );
761                            if let Err(err) = disconnect_with_message(
762                                &control,
763                                evicted_peer.session_id,
764                                "evict because accepted better peer",
765                            ) {
766                                debug!(
767                                    "Disconnect failed {:?}, error: {:?}",
768                                    evicted_peer.session_id, err
769                                );
770                            }
771                        }
772                        Ok(None) => debug!(
773                            "{} open, registry {} success",
774                            session_context.id, session_context.address,
775                        ),
776                        Err(err) => {
777                            debug!(
778                                "Peer registry failed {:?}. Disconnect {} => {}",
779                                err, session_context.id, session_context.address,
780                            );
781                            if let Err(err) = disconnect_with_message(
782                                &control,
783                                session_context.id,
784                                "reject peer connection",
785                            ) {
786                                debug!(
787                                    "Disconnect failed {:?}, error: {:?}",
788                                    session_context.id, err
789                                );
790                            }
791                        }
792                    }
793                }
794            }
795            ServiceEvent::SessionClose { session_context } => {
796                debug!(
797                    "SessionClose({}, {})",
798                    session_context.id, session_context.address,
799                );
800                let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
801                    // should make sure feelers is clean
802                    reg.remove_feeler(&session_context.address);
803                    reg.remove_peer(session_context.id).is_some()
804                });
805                if peer_exists {
806                    debug!(
807                        "{} closed. Remove {} from peer_registry",
808                        session_context.id, session_context.address,
809                    );
810                    self.network_state.with_peer_store_mut(|peer_store| {
811                        peer_store.remove_disconnected_peer(&session_context.address);
812                    });
813                }
814                self.network_state
815                    .observed_addrs
816                    .write()
817                    .remove(&session_context.id);
818            }
819            _ => {
820                info!("p2p service event: {:?}", event);
821            }
822        }
823    }
824}
825
826/// Ckb network service, use to start p2p network
827pub struct NetworkService {
828    p2p_service: Service<EventHandler, SecioKeyPair>,
829    network_state: Arc<NetworkState>,
830    ping_controller: Option<Sender<()>>,
831    // Background services
832    bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
833    version: String,
834}
835
836impl NetworkService {
837    /// init with all config
838    pub fn new(
839        network_state: Arc<NetworkState>,
840        protocols: Vec<CKBProtocol>,
841        required_protocol_ids: Vec<ProtocolId>,
842        // name, version, flags
843        identify_announce: (String, String, Flags),
844        transport_type: TransportType,
845    ) -> Self {
846        let config = &network_state.config;
847
848        if config.support_protocols.iter().collect::<HashSet<_>>()
849            != default_support_all_protocols()
850                .iter()
851                .collect::<HashSet<_>>()
852        {
853            warn!(
854                "Customized supported protocols: {:?}",
855                config.support_protocols
856            );
857        }
858
859        // == Build p2p service struct
860        let mut protocol_metas = protocols
861            .into_iter()
862            .map(CKBProtocol::build)
863            .collect::<Vec<_>>();
864
865        // == Build special protocols
866
867        // Identify is a core protocol, user cannot disable it via config
868        let identify_callback = IdentifyCallback::new(
869            Arc::clone(&network_state),
870            identify_announce.0,
871            identify_announce.1.clone(),
872            identify_announce.2,
873        );
874        let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
875            ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
876        });
877        protocol_metas.push(identify_meta);
878
879        // Ping protocol
880        let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
881            let ping_interval = Duration::from_secs(config.ping_interval_secs);
882            let ping_timeout = Duration::from_secs(config.ping_timeout_secs);
883
884            let ping_network_state = Arc::clone(&network_state);
885            let (ping_handler, ping_controller) =
886                PingHandler::new(ping_interval, ping_timeout, ping_network_state);
887            let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
888                ProtocolHandle::Callback(Box::new(ping_handler))
889            });
890            protocol_metas.push(ping_meta);
891            Some(ping_controller)
892        } else {
893            None
894        };
895
896        // Discovery protocol
897        if config
898            .support_protocols
899            .contains(&SupportProtocol::Discovery)
900        {
901            let addr_mgr = DiscoveryAddressManager {
902                network_state: Arc::clone(&network_state),
903                discovery_local_address: config.discovery_local_address,
904            };
905            let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
906                ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
907                    addr_mgr,
908                    config
909                        .discovery_announce_check_interval_secs
910                        .map(Duration::from_secs),
911                )))
912            });
913            protocol_metas.push(disc_meta);
914        }
915
916        // Feeler protocol
917        if config.support_protocols.contains(&SupportProtocol::Feeler) {
918            let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
919                let network_state = Arc::clone(&network_state);
920                move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
921            });
922            protocol_metas.push(feeler_meta);
923        }
924
925        // DisconnectMessage protocol
926        if config
927            .support_protocols
928            .contains(&SupportProtocol::DisconnectMessage)
929        {
930            let disconnect_message_state = Arc::clone(&network_state);
931            let disconnect_message_meta = SupportProtocols::DisconnectMessage
932                .build_meta_with_service_handle(move || {
933                    ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
934                        disconnect_message_state,
935                    )))
936                });
937            protocol_metas.push(disconnect_message_meta);
938        }
939
940        // HolePunching protocol
941        #[cfg(not(target_family = "wasm"))]
942        if config
943            .support_protocols
944            .contains(&SupportProtocol::HolePunching)
945        {
946            let hole_punching_state = Arc::clone(&network_state);
947            let hole_punching_meta =
948                SupportProtocols::HolePunching.build_meta_with_service_handle(move || {
949                    ProtocolHandle::Callback(Box::new(
950                        crate::protocols::hole_punching::HolePunching::new(hole_punching_state),
951                    ))
952                });
953            protocol_metas.push(hole_punching_meta);
954        }
955
956        let mut service_builder = ServiceBuilder::default();
957        let yamux_config = YamuxConfig {
958            max_stream_count: protocol_metas.len(),
959            max_stream_window_size: 1024 * 1024,
960            ..Default::default()
961        };
962        for meta in protocol_metas.into_iter() {
963            network_state
964                .protocols
965                .write()
966                .push((meta.id(), meta.name(), meta.support_versions()));
967            service_builder = service_builder.insert_protocol(meta);
968        }
969        let event_handler = EventHandler {
970            network_state: Arc::clone(&network_state),
971        };
972        service_builder = service_builder
973            .handshake_type(network_state.local_private_key.clone().into())
974            .yamux_config(yamux_config)
975            .forever(true)
976            .max_connection_number(1024)
977            .set_send_buffer_size(config.max_send_buffer())
978            .set_channel_size(config.channel_size())
979            .timeout(Duration::from_secs(5))
980            .onion_timeout(Duration::from_secs(120));
981
982        #[cfg(not(target_family = "wasm"))]
983        {
984            service_builder = service_builder.upnp(config.upnp);
985
986            // set proxy and onion config
987
988            if let Some(proxy_url) = &config.proxy.proxy_url {
989                service_builder = service_builder
990                    .tcp_proxy_config(proxy_url)
991                    .tcp_proxy_random_auth(config.proxy.proxy_random_auth);
992                info!(
993                    "set tcp_proxy_config: {:?}, proxy_random_auth: {}",
994                    config.proxy.proxy_url.clone(),
995                    config.proxy.proxy_random_auth
996                );
997            };
998
999            let onion_proxy_url = {
1000                config.onion.onion_server.clone().map(|onion_server| {
1001                    if !onion_server.starts_with("socks5://") {
1002                        format!("socks5://{}", onion_server)
1003                    } else {
1004                        onion_server
1005                    }
1006                })
1007            };
1008            if let Some(onion_proxy_url) = onion_proxy_url {
1009                info!("set tcp_onion_config: {:?}", onion_proxy_url);
1010                service_builder = service_builder.tcp_onion_config(&onion_proxy_url);
1011            }
1012        }
1013
1014        #[cfg(target_os = "linux")]
1015        let p2p_service = {
1016            if config.reuse_port_on_linux {
1017                let iter = config.listen_addresses.iter();
1018
1019                #[derive(Clone, Copy, Debug, Eq, PartialEq)]
1020                enum BindType {
1021                    None,
1022                    Ws,
1023                    Tcp,
1024                    Both,
1025                }
1026                impl BindType {
1027                    fn transform(&mut self, other: TransportType) {
1028                        match (&self, other) {
1029                            (BindType::None, TransportType::Ws) => *self = BindType::Ws,
1030                            (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
1031                            (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
1032                            (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
1033                            _ => (),
1034                        }
1035                    }
1036
1037                    fn is_ready(&self) -> bool {
1038                        // should change to Both if ckb enable ws
1039                        matches!(self, BindType::Both)
1040                    }
1041                }
1042
1043                let mut init = BindType::None;
1044
1045                let proxy_config_enable =
1046                    config.proxy.proxy_url.is_some() || config.onion.onion_server.is_some();
1047
1048                let bind_fn_with_addr =
1049                    move |socket: p2p::service::TcpSocket,
1050                          ctxt: p2p::service::TransformerContext,
1051                          addr: std::net::SocketAddr| {
1052                        let socket_ref = socket2::SockRef::from(&socket);
1053                        #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
1054                        socket_ref.set_reuse_port(true)?;
1055                        socket_ref.set_reuse_address(true)?;
1056                        match ctxt.state {
1057                            p2p::service::SocketState::Listen => Ok(socket),
1058                            p2p::service::SocketState::Dial => {
1059                                let domain = socket2::Domain::for_address(addr);
1060                                if socket_ref.domain()? == domain {
1061                                    if proxy_config_enable {
1062                                        // skip bind if proxy enabled
1063                                        debug!("skip bind since proxy is enabled");
1064                                    } else {
1065                                        socket_ref.bind(&addr.into())?;
1066                                    }
1067                                }
1068                                Ok(socket)
1069                            }
1070                        }
1071                    };
1072
1073                for multi_addr in iter {
1074                    if init.is_ready() {
1075                        break;
1076                    }
1077
1078                    match find_type(multi_addr) {
1079                        TransportType::Tcp => {
1080                            // only bind once
1081                            if matches!(init, BindType::Tcp) {
1082                                continue;
1083                            }
1084                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1085                                let bind_fn = move |socket: p2p::service::TcpSocket,
1086                                                    ctxt: p2p::service::TransformerContext| {
1087                                    bind_fn_with_addr(socket, ctxt, addr)
1088                                };
1089                                init.transform(TransportType::Tcp);
1090                                service_builder = service_builder.tcp_config(bind_fn);
1091                            }
1092                        }
1093                        TransportType::Ws | TransportType::Wss => {
1094                            // only bind once
1095                            if matches!(init, BindType::Ws) {
1096                                continue;
1097                            }
1098                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1099                                let bind_fn = move |socket: p2p::service::TcpSocket,
1100                                                    ctxt: p2p::service::TransformerContext| {
1101                                    bind_fn_with_addr(socket, ctxt, addr)
1102                                };
1103                                init.transform(TransportType::Ws);
1104                                service_builder = service_builder.tcp_config_on_ws(bind_fn);
1105                            }
1106                        }
1107                    }
1108                }
1109            }
1110
1111            service_builder.build(event_handler)
1112        };
1113
1114        #[cfg(not(target_os = "linux"))]
1115        // The default permissions of Windows are not enough to enable this function,
1116        // and the administrator permissions of group permissions must be turned on.
1117        // This operation is very burdensome for windows users, so it is turned off by default
1118        //
1119        // The integration test fails after MacOS is turned on, the behavior is different from linux.
1120        // Decision to turn off it
1121        let p2p_service = service_builder.build(event_handler);
1122
1123        // == Build background service tasks
1124        let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
1125        let protocol_type_checker_service = ProtocolTypeCheckerService::new(
1126            Arc::clone(&network_state),
1127            p2p_service.control().to_owned().into(),
1128            required_protocol_ids,
1129        );
1130        let mut bg_services = vec![
1131            Box::pin(dump_peer_store_service) as Pin<Box<_>>,
1132            Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
1133        ];
1134        if config.outbound_peer_service_enabled() {
1135            let outbound_peer_service = OutboundPeerService::new(
1136                Arc::clone(&network_state),
1137                p2p_service.control().to_owned().into(),
1138                Duration::from_secs(config.connect_outbound_interval_secs),
1139                transport_type,
1140            );
1141            bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
1142        };
1143
1144        #[cfg(feature = "with_dns_seeding")]
1145        if config.dns_seeding_service_enabled() {
1146            let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
1147                Arc::clone(&network_state),
1148                config.dns_seeds.clone(),
1149            );
1150            bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
1151        };
1152
1153        NetworkService {
1154            p2p_service,
1155            network_state,
1156            ping_controller,
1157            bg_services,
1158            version: identify_announce.1,
1159        }
1160    }
1161
1162    /// Start the network in the background and return a controller
1163    pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1164        let config = self.network_state.config.clone();
1165
1166        let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1167
1168        // dial whitelist_nodes
1169        for addr in self.network_state.config.whitelist_peers() {
1170            debug!("Dial whitelist_peers {:?}", addr);
1171            self.network_state.dial_identify(&p2p_control, addr);
1172        }
1173
1174        let target = &self.network_state.required_flags;
1175
1176        // get bootnodes
1177        // try get addrs from peer_store, if peer_store have no enough addrs then use bootnodes
1178        let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1179            let count = max((config.max_outbound_peers >> 1) as usize, 1);
1180            let mut addrs: Vec<_> = peer_store
1181                .fetch_addrs_to_attempt(count, *target, |_| true)
1182                .into_iter()
1183                .map(|paddr| paddr.addr)
1184                .collect();
1185            // tried to re-connect to anchors on startup
1186            let anchors: Vec<_> = peer_store.mut_anchors().drain().collect();
1187            addrs.extend(anchors);
1188            // Get bootnodes randomly
1189            let bootnodes = self
1190                .network_state
1191                .bootnodes
1192                .iter()
1193                .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1194                .into_iter()
1195                .cloned();
1196            addrs.extend(bootnodes);
1197            addrs
1198        });
1199
1200        // dial half bootnodes
1201        for addr in bootnodes {
1202            debug!("Dial bootnode {:?}", addr);
1203            self.network_state.dial_identify(&p2p_control, addr);
1204        }
1205
1206        let Self {
1207            mut p2p_service,
1208            network_state,
1209            ping_controller,
1210            bg_services,
1211            version,
1212        } = self;
1213
1214        // NOTE: for ensure background task finished
1215        let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1216            .into_iter()
1217            .map(|bg_service| {
1218                let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1219                (signal_sender, (bg_service, signal_receiver))
1220            })
1221            .unzip();
1222
1223        let receiver: CancellationToken = new_tokio_exit_rx();
1224        #[cfg(not(target_family = "wasm"))]
1225        let (start_sender, start_receiver) = mpsc::channel();
1226        {
1227            #[cfg(not(target_family = "wasm"))]
1228            let network_state = Arc::clone(&network_state);
1229            let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1230            handle.spawn_task(async move {
1231                #[cfg(not(target_family = "wasm"))]
1232                {
1233                    let listen_addresses = {
1234                        let mut addresses = config.listen_addresses.clone();
1235                        if config.reuse_tcp_with_ws {
1236                            let ws_listens = addresses
1237                                .iter()
1238                                .cloned()
1239                                .filter_map(|mut addr| {
1240                                    if matches!(find_type(&addr), TransportType::Tcp) {
1241                                        addr.push(Protocol::Ws);
1242                                        Some(addr)
1243                                    } else {
1244                                        None
1245                                    }
1246                                })
1247                                .collect::<Vec<_>>();
1248
1249                            addresses.extend(ws_listens);
1250                        }
1251                        let mut addresses = addresses
1252                            .into_iter()
1253                            .collect::<HashSet<_>>()
1254                            .into_iter()
1255                            .collect::<Vec<_>>();
1256                        addresses.sort_by(|a, b| {
1257                            let ty_a = find_type(a);
1258                            let ty_b = find_type(b);
1259
1260                            ty_a.cmp(&ty_b)
1261                        });
1262
1263                        addresses
1264                    };
1265
1266                    for addr in &listen_addresses {
1267                        match p2p_service.listen(addr.to_owned()).await {
1268                            Ok(listen_address) => {
1269                                info!("Listen on address: {}", listen_address);
1270                                network_state
1271                                    .listened_addrs
1272                                    .write()
1273                                    .push(listen_address.clone());
1274                            }
1275                            Err(err) => {
1276                                warn!(
1277                                    "Listen on address {} failed, due to error: {}",
1278                                    addr.clone(),
1279                                    err
1280                                );
1281                                start_sender
1282                                    .send(Err(Error::P2P(P2PError::Transport(err))))
1283                                    .expect("channel abnormal shutdown");
1284                                return;
1285                            }
1286                        };
1287                    }
1288                    start_sender.send(Ok(())).unwrap();
1289                }
1290
1291                p2p::runtime::spawn(async move { p2p_service.run().await });
1292                tokio::select! {
1293                    _ = receiver.cancelled() => {
1294                        info!("NetworkService receive exit signal, start shutdown...");
1295                        let _ = p2p_control.shutdown().await;
1296                        // Drop senders to stop all corresponding background task
1297                        drop(bg_signals);
1298
1299                        info!("NetworkService shutdown now");
1300                    },
1301                    else => {
1302                        let _ = p2p_control.shutdown().await;
1303                        // Drop senders to stop all corresponding background task
1304                        drop(bg_signals);
1305                    },
1306                }
1307            });
1308        }
1309        for (mut service, mut receiver) in bg_receivers {
1310            handle.spawn_task(async move {
1311                loop {
1312                    tokio::select! {
1313                        _ = &mut service => {},
1314                        _ = &mut receiver => break
1315                    }
1316                }
1317            });
1318        }
1319        #[cfg(not(target_family = "wasm"))]
1320        if let Ok(Err(e)) = start_receiver.recv() {
1321            return Err(e);
1322        }
1323
1324        Ok(NetworkController {
1325            version,
1326            network_state,
1327            p2p_control,
1328            ping_controller,
1329        })
1330    }
1331}
1332
1333/// Network controller
1334#[derive(Clone)]
1335pub struct NetworkController {
1336    version: String,
1337    network_state: Arc<NetworkState>,
1338    p2p_control: ServiceControl,
1339    ping_controller: Option<Sender<()>>,
1340}
1341
1342impl NetworkController {
1343    /// Node listen address list
1344    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1345        self.network_state.public_urls(max_urls)
1346    }
1347
1348    /// ckb version
1349    pub fn version(&self) -> &String {
1350        &self.version
1351    }
1352
1353    /// Node peer id's base58 format string
1354    pub fn node_id(&self) -> String {
1355        self.network_state.node_id()
1356    }
1357
1358    /// p2p service control
1359    pub fn p2p_control(&self) -> &ServiceControl {
1360        &self.p2p_control
1361    }
1362
1363    /// async p2p service control
1364    pub fn async_p2p_control(&self) -> ServiceAsyncControl {
1365        self.p2p_control.clone().into()
1366    }
1367
1368    /// Dial remote node
1369    pub fn add_node(&self, address: Multiaddr) {
1370        self.network_state.add_node(&self.p2p_control, address)
1371    }
1372
1373    /// Add a public_addr to NetworkState.public_addrs
1374    pub fn add_public_addr(&self, public_addr: Multiaddr) {
1375        self.network_state.add_public_addr(public_addr)
1376    }
1377
1378    /// Disconnect session with peer id
1379    pub fn remove_node(&self, peer_id: &PeerId) {
1380        if let Some(session_id) = self
1381            .network_state
1382            .peer_registry
1383            .read()
1384            .get_key_by_peer_id(peer_id)
1385        {
1386            if let Err(err) =
1387                disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1388            {
1389                debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1390            }
1391        } else {
1392            error!("Cannot find peer {:?}", peer_id);
1393        }
1394    }
1395
1396    /// Get banned peer list
1397    pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1398        self.network_state
1399            .peer_store
1400            .lock()
1401            .ban_list()
1402            .get_banned_addrs()
1403    }
1404
1405    /// Clear banned list
1406    pub fn clear_banned_addrs(&self) {
1407        self.network_state.peer_store.lock().clear_ban_list();
1408    }
1409
1410    /// Get address info from peer store
1411    pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1412        self.network_state
1413            .peer_store
1414            .lock()
1415            .addr_manager()
1416            .get(addr)
1417            .cloned()
1418    }
1419
1420    /// Ban an ip
1421    pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1422        self.disconnect_peers_in_ip_range(address, &ban_reason);
1423        self.network_state
1424            .peer_store
1425            .lock()
1426            .ban_network(address, ban_until, ban_reason)
1427    }
1428
1429    /// Unban an ip
1430    pub fn unban(&self, address: &IpNetwork) {
1431        self.network_state
1432            .peer_store
1433            .lock()
1434            .mut_ban_list()
1435            .unban_network(address);
1436    }
1437
1438    /// Return all connected peers' information
1439    pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1440        self.network_state.with_peer_registry(|reg| {
1441            reg.peers()
1442                .iter()
1443                .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1444                .collect::<Vec<_>>()
1445        })
1446    }
1447
1448    /// Ban an peer through peer index
1449    pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1450        self.network_state
1451            .ban_session(&self.p2p_control, peer_index, duration, reason);
1452    }
1453
1454    /// disconnect peers with matched peer_ip or peer_ip_network, eg: 192.168.0.2 or 192.168.0.0/24
1455    fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1456        self.network_state.with_peer_registry(|reg| {
1457            reg.peers().iter().for_each(|(peer_index, peer)| {
1458                if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1459                    if address.contains(addr.ip()) {
1460                        let _ = disconnect_with_message(
1461                            &self.p2p_control,
1462                            *peer_index,
1463                            &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1464                        );
1465                    }
1466                }
1467            })
1468        });
1469    }
1470
1471    fn try_broadcast(
1472        &self,
1473        quick: bool,
1474        target: Option<SessionId>,
1475        proto_id: ProtocolId,
1476        data: Bytes,
1477    ) -> Result<(), SendErrorKind> {
1478        let now = Instant::now();
1479        loop {
1480            let target = target
1481                .map(TargetSession::Single)
1482                .unwrap_or(TargetSession::All);
1483            let result = if quick {
1484                self.p2p_control
1485                    .quick_filter_broadcast(target, proto_id, data.clone())
1486            } else {
1487                self.p2p_control
1488                    .filter_broadcast(target, proto_id, data.clone())
1489            };
1490            match result {
1491                Ok(()) => {
1492                    return Ok(());
1493                }
1494                Err(SendErrorKind::WouldBlock) => {
1495                    if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1496                        warn!("Broadcast message to {} timeout", proto_id);
1497                        return Err(SendErrorKind::WouldBlock);
1498                    }
1499                    thread::sleep(P2P_TRY_SEND_INTERVAL);
1500                }
1501                Err(err) => {
1502                    warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1503                    return Err(err);
1504                }
1505            }
1506        }
1507    }
1508
1509    fn broadcast_inner<S: Spawn>(
1510        &self,
1511        quick: bool,
1512        target: TargetSession,
1513        proto_id: ProtocolId,
1514        data: Bytes,
1515        handle: &S,
1516    ) {
1517        let async_control: ServiceAsyncControl = self.p2p_control.clone().into();
1518
1519        handle.spawn_task(async move {
1520            if quick {
1521                let _ignore = async_control
1522                    .quick_filter_broadcast(target, proto_id, data)
1523                    .await;
1524            } else {
1525                let _ignore = async_control.filter_broadcast(target, proto_id, data).await;
1526            }
1527        })
1528    }
1529
1530    /// Broadcast a message to all connected peers
1531    pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1532        self.try_broadcast(false, None, proto_id, data)
1533    }
1534
1535    /// Broadcast a message to all connected peers through quick queue
1536    pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1537        self.try_broadcast(true, None, proto_id, data)
1538    }
1539
1540    /// Send message to one connected peer
1541    pub fn send_message_to(
1542        &self,
1543        session_id: SessionId,
1544        proto_id: ProtocolId,
1545        data: Bytes,
1546    ) -> Result<(), SendErrorKind> {
1547        self.try_broadcast(false, Some(session_id), proto_id, data)
1548    }
1549
1550    /// Broadcast a message to all connected peers
1551    pub fn broadcast_with_handle<S: Spawn>(&self, proto_id: ProtocolId, data: Bytes, handle: &S) {
1552        self.broadcast_inner(false, TargetSession::All, proto_id, data, handle)
1553    }
1554
1555    /// Broadcast a message to all connected peers through quick queue
1556    pub fn quick_broadcast_with_handle<S: Spawn>(
1557        &self,
1558        proto_id: ProtocolId,
1559        data: Bytes,
1560        handle: &S,
1561    ) {
1562        self.broadcast_inner(true, TargetSession::All, proto_id, data, handle)
1563    }
1564
1565    /// Send message to one connected peer
1566    pub fn send_message_to_with_handle<S: Spawn>(
1567        &self,
1568        session_id: SessionId,
1569        proto_id: ProtocolId,
1570        data: Bytes,
1571        handle: &S,
1572    ) {
1573        self.broadcast_inner(
1574            false,
1575            TargetSession::Single(session_id),
1576            proto_id,
1577            data,
1578            handle,
1579        )
1580    }
1581
1582    /// network message processing controller, always true, if false, discard any received messages
1583    pub fn is_active(&self) -> bool {
1584        self.network_state.is_active()
1585    }
1586
1587    /// Change active status, if set false discard any received messages
1588    pub fn set_active(&self, active: bool) {
1589        self.network_state.active.store(active, Ordering::Release);
1590    }
1591
1592    /// Return all connected peers' protocols info
1593    pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1594        self.network_state.protocols.read().clone()
1595    }
1596
1597    /// Try ping all connected peers
1598    pub fn ping_peers(&self) {
1599        if let Some(mut ping_controller) = self.ping_controller.clone() {
1600            let _ignore = ping_controller.try_send(());
1601        }
1602    }
1603}
1604
1605// Send an optional message before disconnect a peer
1606pub(crate) fn disconnect_with_message(
1607    control: &ServiceControl,
1608    peer_index: SessionId,
1609    message: &str,
1610) -> Result<(), SendErrorKind> {
1611    if !message.is_empty() {
1612        let data = Bytes::from(message.as_bytes().to_vec());
1613        // Must quick send, otherwise this message will be dropped.
1614        control.quick_send_message_to(
1615            peer_index,
1616            SupportProtocols::DisconnectMessage.protocol_id(),
1617            data,
1618        )?;
1619    }
1620    control.disconnect(peer_index)
1621}
1622
1623pub(crate) async fn async_disconnect_with_message(
1624    control: &ServiceAsyncControl,
1625    peer_index: SessionId,
1626    message: &str,
1627) -> Result<(), SendErrorKind> {
1628    if !message.is_empty() {
1629        let data = Bytes::from(message.as_bytes().to_vec());
1630        // Must quick send, otherwise this message will be dropped.
1631        control
1632            .quick_send_message_to(
1633                peer_index,
1634                SupportProtocols::DisconnectMessage.protocol_id(),
1635                data,
1636            )
1637            .await?;
1638    }
1639    control.disconnect(peer_index).await
1640}
1641
1642/// Transport type on ckb
1643#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
1644pub enum TransportType {
1645    /// Tcp
1646    Tcp,
1647    /// Ws
1648    Ws,
1649    /// Wss only on wasm
1650    Wss,
1651}
1652
1653#[allow(dead_code)]
1654pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1655    let mut iter = addr.iter();
1656
1657    iter.find_map(|proto| match proto {
1658        Protocol::Ws => Some(TransportType::Ws),
1659        Protocol::Wss => Some(TransportType::Wss),
1660        _ => None,
1661    })
1662    .unwrap_or(TransportType::Tcp)
1663}