Skip to main content

ckb_network/
network.rs

1//! Global state struct and start function
2use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7    PeerStore,
8    types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11    disconnect_message::DisconnectMessageProtocol,
12    discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13    feeler::Feeler,
14    identify::{Flags, IdentifyCallback, IdentifyProtocol},
15    ping::PingHandler,
16    support_protocols::SupportProtocols,
17};
18#[cfg(not(target_family = "wasm"))]
19use crate::proxy;
20use crate::services::{
21    dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
22    protocol_type_checker::ProtocolTypeCheckerService,
23};
24use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
25use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
26use ckb_logger::{debug, error, info, trace, warn};
27use ckb_spawn::Spawn;
28use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
29use ckb_systemtime::{Duration, Instant};
30use ckb_util::{Condvar, Mutex, RwLock};
31use futures::{Future, channel::mpsc::Sender};
32use ipnetwork::IpNetwork;
33use p2p::error::TransportErrorKind;
34use p2p::{
35    SessionId, async_trait,
36    builder::ServiceBuilder,
37    bytes::Bytes,
38    context::{ServiceContext, SessionContext},
39    error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
40    multiaddr::{Multiaddr, Protocol},
41    secio::{self, PeerId, SecioKeyPair, error::SecioError},
42    service::{
43        ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
44        TargetSession,
45    },
46    traits::ServiceHandle,
47    utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
48    yamux::config::Config as YamuxConfig,
49};
50use rand::prelude::IteratorRandom;
51#[cfg(feature = "with_sentry")]
52use sentry::{Level, capture_message, with_scope};
53#[cfg(not(target_family = "wasm"))]
54use std::sync::mpsc;
55use std::{
56    borrow::Cow,
57    cmp::max,
58    collections::{HashMap, HashSet},
59    pin::Pin,
60    sync::{
61        Arc,
62        atomic::{AtomicBool, Ordering},
63    },
64    thread,
65};
66use tokio::{self, sync::oneshot};
67
// 6 seconds. NOTE(review): the use site is outside this view; presumably the overall
// time budget for sending a p2p message — confirm against the caller.
const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
// 100 milliseconds. NOTE(review): presumably the pause between send retries — confirm
// against the caller.
const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
// A dial that has been pending for more than 5 minutes is considered hung
// (checked in `NetworkState::can_dial`).
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
72
/// The global shared state of the network module
///
/// Every mutable piece of state is wrapped in its own lock (or atomic), so the
/// struct can be shared between threads behind an `Arc`.
pub struct NetworkState {
    /// Registry of the currently tracked peers/sessions and feelers
    pub(crate) peer_registry: RwLock<PeerRegistry>,
    /// Peer store; behaviour reports and address bans are written through it
    pub(crate) peer_store: Mutex<PeerStore>,
    /// Node listened addresses
    pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
    /// Peers with a dial in flight, mapped to the instant the dial was issued
    dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
    /// Node public addresses by config (more can be added through `add_public_addr`)
    public_addrs: RwLock<HashSet<Multiaddr>>,
    /// Addresses recorded per session through `add_observed_addr`
    observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
    /// Private key of the local node, fetched from the config
    local_private_key: secio::SecioKeyPair,
    /// Peer id derived from `local_private_key`
    local_peer_id: PeerId,
    /// Bootnode addresses taken from the config
    pub(crate) bootnodes: Vec<Multiaddr>,
    /// The network configuration this state was built from
    pub(crate) config: NetworkConfig,
    /// Message processing switch, read by `is_active`; initialised to `true`
    pub(crate) active: AtomicBool,
    /// Node supported protocols
    /// fields: ProtocolId, Protocol Name, Supported Versions
    pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
    /// Flags requested from other peers; defaults to
    /// `Flags::SYNC | Flags::DISCOVERY | Flags::RELAY`
    pub(crate) required_flags: Flags,
}
93
94impl NetworkState {
95    /// Init from config
96    #[cfg(not(target_family = "wasm"))]
97    pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
98        config.create_dir_if_not_exists()?;
99        let local_private_key = config.fetch_private_key()?;
100        let local_peer_id = local_private_key.peer_id();
101        // set max score to public addresses
102        let public_addrs: HashSet<Multiaddr> = config
103            .listen_addresses
104            .iter()
105            .chain(config.public_addresses.iter())
106            .cloned()
107            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
108                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
109                _ => {
110                    match extract_peer_id(&addr) {
111                        Some(peer_id) if peer_id != local_peer_id => {
112                            error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
113                            std::process::exit(1);
114                        }
115                        Some(_) => (),
116                        None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
117                    }
118                    Some(addr)
119                }
120            })
121            .collect();
122        info!("Loading the peer store. This process may take a few seconds to complete.");
123
124        let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
125            config.peer_store_path(),
126        ));
127        info!("Loaded the peer store.");
128
129        if let Some(ref proxy_url) = config.proxy.proxy_url {
130            proxy::check_proxy_url(proxy_url).map_err(Error::Config)?;
131        }
132
133        let bootnodes = config.bootnodes();
134
135        let peer_registry = PeerRegistry::new(
136            config.max_inbound_peers(),
137            config.max_outbound_peers(),
138            config.whitelist_only,
139            config.whitelist_peers(),
140            config.disable_block_relay_only_connection,
141        );
142
143        Ok(NetworkState {
144            peer_store,
145            config,
146            bootnodes,
147            peer_registry: RwLock::new(peer_registry),
148            dialing_addrs: RwLock::new(HashMap::default()),
149            public_addrs: RwLock::new(public_addrs),
150            listened_addrs: RwLock::new(Vec::new()),
151            observed_addrs: RwLock::new(HashMap::default()),
152            local_private_key,
153            local_peer_id,
154            active: AtomicBool::new(true),
155            protocols: RwLock::new(Vec::new()),
156            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
157        })
158    }
159
160    #[cfg(target_family = "wasm")]
161    pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
162        let local_private_key = config.fetch_private_key()?;
163        let local_peer_id = local_private_key.peer_id();
164        // set max score to public addresses
165        let public_addrs: HashSet<Multiaddr> = config
166            .listen_addresses
167            .iter()
168            .chain(config.public_addresses.iter())
169            .cloned()
170            .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
171                Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
172                _ => {
173                    if extract_peer_id(&addr).is_none() {
174                        addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
175                    }
176                    Some(addr)
177                }
178            })
179            .collect();
180        info!("Loading the peer store. This process may take a few seconds to complete.");
181        let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
182        let bootnodes = config.bootnodes();
183
184        let peer_registry = PeerRegistry::new(
185            config.max_inbound_peers(),
186            config.max_outbound_peers(),
187            config.whitelist_only,
188            config.whitelist_peers(),
189            config.disable_block_relay_only_connection,
190        );
191        Ok(NetworkState {
192            peer_store,
193            config,
194            bootnodes,
195            peer_registry: RwLock::new(peer_registry),
196            dialing_addrs: RwLock::new(HashMap::default()),
197            public_addrs: RwLock::new(public_addrs),
198            listened_addrs: RwLock::new(Vec::new()),
199            observed_addrs: RwLock::new(HashMap::default()),
200            local_private_key,
201            local_peer_id,
202            active: AtomicBool::new(true),
203            protocols: RwLock::new(Vec::new()),
204            required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
205        })
206    }
207
208    /// use to discovery get nodes message to announce what kind of node information need from the other peer
209    /// default with `Flags::SYNC | Flags::DISCOVERY | Flags::RELAY`
210    pub fn required_flags(mut self, flags: Flags) -> Self {
211        self.required_flags = flags;
212        self
213    }
214
215    pub(crate) fn report_session(
216        &self,
217        p2p_control: &ServiceControl,
218        session_id: SessionId,
219        behaviour: Behaviour,
220    ) {
221        if let Some(addr) = self.with_peer_registry(|reg| {
222            reg.get_peer(session_id)
223                .filter(|peer| !peer.is_whitelist)
224                .map(|peer| peer.connected_addr.clone())
225        }) {
226            trace!("Report {:?} because {:?}", addr, behaviour);
227            let report_result = self.peer_store.lock().report(&addr, behaviour);
228            if report_result.is_banned()
229                && let Err(err) = disconnect_with_message(p2p_control, session_id, "banned")
230            {
231                debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
232            }
233        } else {
234            debug!(
235                "Report {} failure: not found in peer registry or it is on the whitelist",
236                session_id
237            );
238        }
239    }
240
241    pub(crate) fn ban_session(
242        &self,
243        p2p_control: &ServiceControl,
244        session_id: SessionId,
245        duration: Duration,
246        reason: String,
247    ) {
248        if let Some(addr) = self.with_peer_registry(|reg| {
249            reg.get_peer(session_id)
250                .filter(|peer| !peer.is_whitelist)
251                .map(|peer| peer.connected_addr.clone())
252        }) {
253            info!(
254                "Ban peer {:?} for {} seconds, reason: {}",
255                addr,
256                duration.as_secs(),
257                reason
258            );
259            if let Some(metrics) = ckb_metrics::handle() {
260                metrics.ckb_network_ban_peer.inc();
261            }
262            if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
263                let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
264                self.peer_store.lock().ban_addr(
265                    &peer.connected_addr,
266                    duration.as_millis() as u64,
267                    reason,
268                );
269                if let Err(err) =
270                    disconnect_with_message(p2p_control, peer.session_id, message.as_str())
271                {
272                    debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
273                }
274            }
275        } else {
276            debug!(
277                "Ban session({}) failed: not found in peer registry or it is on the whitelist",
278                session_id
279            );
280        }
281    }
282
283    pub(crate) fn accept_peer(
284        &self,
285        session_context: &SessionContext,
286    ) -> Result<Option<Peer>, Error> {
287        // NOTE: be careful, here easy cause a deadlock,
288        //    because peer_store's lock scope across peer_registry's lock scope
289        let mut peer_store = self.peer_store.lock();
290
291        {
292            self.peer_registry.write().accept_peer(
293                session_context.address.clone(),
294                session_context.id,
295                session_context.ty,
296                &mut peer_store,
297            )
298        }
299    }
300
301    /// For restrict lock in inner scope
302    pub fn with_peer_registry<F, T>(&self, callback: F) -> T
303    where
304        F: FnOnce(&PeerRegistry) -> T,
305    {
306        callback(&self.peer_registry.read())
307    }
308
309    // For restrict lock in inner scope
310    pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
311    where
312        F: FnOnce(&mut PeerRegistry) -> T,
313    {
314        callback(&mut self.peer_registry.write())
315    }
316
317    // For restrict lock in inner scope
318    pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
319    where
320        F: FnOnce(&mut PeerStore) -> T,
321    {
322        callback(&mut self.peer_store.lock())
323    }
324
325    /// Get peer id of local node
326    pub fn local_peer_id(&self) -> &PeerId {
327        &self.local_peer_id
328    }
329
330    /// Use on test
331    pub fn local_private_key(&self) -> &secio::SecioKeyPair {
332        &self.local_private_key
333    }
334
335    /// Get local node's peer id in base58 format string
336    pub fn node_id(&self) -> String {
337        self.local_peer_id().to_base58()
338    }
339
340    pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
341        let public_addrs = self.public_addrs.read();
342        if public_addrs.len() <= count {
343            return public_addrs.iter().cloned().collect();
344        } else {
345            public_addrs
346                .iter()
347                .cloned()
348                .choose_multiple(&mut rand::thread_rng(), count)
349        }
350    }
351
352    /// After onion service created,
353    /// ckb use this method to add onion address to public_addr
354    pub fn add_public_addr(&self, addr: Multiaddr) {
355        self.public_addrs.write().insert(addr);
356    }
357
358    pub(crate) fn connection_status(&self) -> ConnectionStatus {
359        self.peer_registry.read().connection_status()
360    }
361
362    /// Get local node's listen address list
363    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
364        let listened_addrs = self.listened_addrs.read();
365        self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
366            .into_iter()
367            .filter_map(|addr| {
368                if !listened_addrs.contains(&addr) {
369                    Some((addr, 1))
370                } else {
371                    None
372                }
373            })
374            .chain(listened_addrs.iter().map(|addr| (addr.clone(), 1)))
375            .map(|(addr, score)| (addr.to_string(), score))
376            .collect()
377    }
378
379    pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
380        self.dial_identify(p2p_control, address);
381    }
382
383    /// use a filter to get protocol id list
384    pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
385        self.protocols
386            .read()
387            .iter()
388            .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
389            .collect::<Vec<_>>()
390    }
391
392    pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
393        let peer_id = extract_peer_id(addr);
394        if peer_id.is_none() {
395            error!("Do not dial addr without peer id, addr: {}", addr);
396            return false;
397        }
398        let peer_id = peer_id.as_ref().unwrap();
399
400        if self.local_peer_id() == peer_id {
401            trace!("Do not dial self: {:?}, {}", peer_id, addr);
402            return false;
403        }
404        if self.public_addrs.read().contains(addr) {
405            trace!(
406                "Do not dial listened address(self): {:?}, {}",
407                peer_id, addr
408            );
409            return false;
410        }
411
412        let peer_in_registry = self.with_peer_registry(|reg| {
413            reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
414        });
415        if peer_in_registry {
416            trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
417            return false;
418        }
419
420        if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
421            trace!(
422                "Do not send repeated dial commands to network service: {:?}, {}",
423                peer_id, addr
424            );
425            if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
426                #[cfg(feature = "with_sentry")]
427                with_scope(
428                    |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
429                    || {
430                        capture_message(
431                            &format!(
432                                "Dialing {:?}, {:?} for more than {} seconds, \
433                                 something is wrong in network service",
434                                peer_id,
435                                addr,
436                                DIAL_HANG_TIMEOUT.as_secs(),
437                            ),
438                            Level::Warning,
439                        )
440                    },
441                );
442            }
443            return false;
444        }
445
446        true
447    }
448
449    pub(crate) fn dial_success(&self, addr: &Multiaddr) {
450        if let Some(peer_id) = extract_peer_id(addr) {
451            self.dialing_addrs.write().remove(&peer_id);
452        }
453    }
454
455    pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
456        self.with_peer_registry_mut(|reg| {
457            reg.remove_feeler(addr);
458        });
459
460        if let Some(peer_id) = extract_peer_id(addr) {
461            self.dialing_addrs.write().remove(&peer_id);
462        }
463    }
464
465    /// Dial
466    /// return value indicates the dialing is actually sent or denied.
467    fn dial_inner(
468        &self,
469        p2p_control: &ServiceControl,
470        addr: Multiaddr,
471        target: TargetProtocol,
472    ) -> Result<(), Error> {
473        if !self.can_dial(&addr) {
474            return Err(Error::Dial(format!("ignore dialing addr {addr}")));
475        }
476
477        debug!("Dialing {addr}");
478        p2p_control.dial(addr.clone(), target)?;
479        self.dialing_addrs.write().insert(
480            extract_peer_id(&addr).expect("verified addr"),
481            Instant::now(),
482        );
483        Ok(())
484    }
485
486    /// Dial just identify protocol
487    pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
488        if let Err(err) = self.dial_inner(
489            p2p_control,
490            addr,
491            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
492        ) {
493            debug!("dial_identify error: {err}");
494        }
495    }
496
497    /// Dial just feeler protocol
498    pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
499        if let Err(err) = self.dial_inner(
500            p2p_control,
501            addr.clone(),
502            TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
503        ) {
504            debug!("dial_feeler error {err}");
505        } else {
506            self.with_peer_registry_mut(|reg| {
507                reg.add_feeler(&addr);
508            });
509        }
510    }
511
512    /// add observed address for identify protocol
513    pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
514        let mut pending_observed_addrs = self.observed_addrs.write();
515        pending_observed_addrs.insert(session_id, addr);
516    }
517
518    // randomly select count addresses from observed_addrs
519    pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
520        let observed_addrs = self
521            .observed_addrs
522            .read()
523            .values()
524            .cloned()
525            .collect::<HashSet<_>>();
526        if observed_addrs.len() <= count {
527            return observed_addrs.into_iter().collect();
528        } else {
529            observed_addrs
530                .into_iter()
531                .choose_multiple(&mut rand::thread_rng(), count)
532        }
533    }
534
535    /// Network message processing controller, default is true, if false, discard any received messages
536    pub fn is_active(&self) -> bool {
537        self.active.load(Ordering::Acquire)
538    }
539}
540
/// Used to handle global events of tentacle, such as session open/close
pub struct EventHandler {
    /// Shared network state that the handlers read and update
    pub(crate) network_state: Arc<NetworkState>,
}
545
546impl EventHandler {
547    /// init an event handler
548    pub fn new(network_state: Arc<NetworkState>) -> Self {
549        Self { network_state }
550    }
551}
552
/// Exit trait used to notify all other modules to exit
///
/// Implementors must be `Send + Unpin + 'static`.
pub trait ExitHandler: Send + Unpin + 'static {
    /// Notify other modules to exit
    fn notify_exit(&self);
}
558
559/// Default exit handle
560#[derive(Clone, Default)]
561pub struct DefaultExitHandler {
562    lock: Arc<Mutex<()>>,
563    exit: Arc<Condvar>,
564}
565
566impl DefaultExitHandler {
567    /// Block on current thread util exit notify
568    pub fn wait_for_exit(&self) {
569        self.exit.wait(&mut self.lock.lock());
570    }
571}
572
573impl ExitHandler for DefaultExitHandler {
574    fn notify_exit(&self) {
575        self.exit.notify_all();
576    }
577}
578
579impl EventHandler {
580    fn inbound_eviction(&self) -> Vec<PeerIndex> {
581        if self.network_state.config.bootnode_mode {
582            let status = self.network_state.connection_status();
583
584            if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
585                self.network_state
586                    .with_peer_registry(|registry| {
587                        registry
588                            .peers()
589                            .values()
590                            .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
591                            .map(|peer| peer.session_id)
592                            .collect::<Vec<SessionId>>()
593                    })
594                    .into_iter()
595                    .enumerate()
596                    .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
597                    .collect()
598            } else {
599                Vec::new()
600            }
601        } else {
602            Vec::new()
603        }
604    }
605}
606
607#[async_trait]
608impl ServiceHandle for EventHandler {
609    async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
610        match error {
611            ServiceError::DialerError { address, error } => {
612                match error {
613                    DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
614                        SecioError::ConnectSelf,
615                    )) => {
616                        debug!("dial observed address success: {:?}", address);
617                    }
618                    DialerErrorKind::IoError(e)
619                        if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
620                    {
621                        warn!("DialerError({}) {}", address, e);
622                    }
623                    DialerErrorKind::TransportError(e)
624                        if matches!(&e, TransportErrorKind::ProxyError(_proxy_err)) =>
625                    {
626                        let err = e.to_string();
627                        if err.contains("failed to establish connection to target:General failure")
628                            || err.contains(
629                                "failed to establish connection to target:Connection refused",
630                            )
631                        {
632                            debug!("DialerError({}) {}", address, e);
633                        } else {
634                            error!("Is the proxy server down? DialerError({}) {}", address, e);
635                        }
636                    }
637                    _ => {
638                        debug!("DialerError({}) {}", address, error);
639                    }
640                }
641                self.network_state.dial_failed(&address);
642            }
643            ServiceError::ProtocolError {
644                id,
645                proto_id,
646                error,
647            } => {
648                debug!("ProtocolError({}, {}) {}", id, proto_id, error);
649                let message = format!("ProtocolError id={proto_id}");
650                // Ban because misbehave of remote peer
651                self.network_state.ban_session(
652                    &context.control().clone().into(),
653                    id,
654                    Duration::from_secs(300),
655                    message,
656                );
657            }
658            ServiceError::SessionTimeout { session_context } => {
659                debug!(
660                    "SessionTimeout({}, {})",
661                    session_context.id, session_context.address,
662                );
663            }
664            ServiceError::MuxerError {
665                session_context,
666                error,
667            } => {
668                debug!(
669                    "MuxerError({}, {}), substream error {}, disconnect it",
670                    session_context.id, session_context.address, error,
671                );
672            }
673            ServiceError::ListenError { address, error } => {
674                debug!("ListenError: address={:?}, error={:?}", address, error);
675            }
676            ServiceError::ProtocolSelectError {
677                proto_name,
678                session_context,
679            } => {
680                debug!(
681                    "ProtocolSelectError: proto_name={:?}, session_id={}",
682                    proto_name, session_context.id,
683                );
684            }
685            ServiceError::SessionBlocked { session_context } => {
686                debug!("SessionBlocked: {}", session_context.id);
687            }
688            ServiceError::ProtocolHandleError { proto_id, error } => {
689                debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
690
691                let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
692                {
693                    if let Some(id) = opt_session_id {
694                        self.network_state.ban_session(
695                            &context.control().clone().into(),
696                            id,
697                            Duration::from_secs(300),
698                            format!("protocol {proto_id} panic when process peer message"),
699                        );
700                    }
701                    #[cfg(feature = "with_sentry")]
702                    with_scope(
703                        |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
704                        || {
705                            capture_message(
706                                &format!(
707                                    "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
708                                ),
709                                Level::Warning,
710                            )
711                        },
712                    );
713                    error!(
714                        "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
715                    );
716
717                    broadcast_exit_signals();
718                }
719            }
720        }
721    }
722
723    async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
724        // When session disconnect update status anyway
725        match event {
726            ServiceEvent::SessionOpen { session_context } => {
727                debug!(
728                    "SessionOpen({}, {})",
729                    session_context.id, session_context.address,
730                );
731
732                self.network_state.dial_success(&session_context.address);
733
734                let iter = self.inbound_eviction();
735
736                let control = context.control().clone().into();
737
738                for peer in iter {
739                    if let Err(err) =
740                        disconnect_with_message(&control, peer, "bootnode random eviction")
741                    {
742                        debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
743                    }
744                }
745
746                if self
747                    .network_state
748                    .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
749                {
750                    debug!(
751                        "Feeler connected {} => {}",
752                        session_context.id, session_context.address,
753                    );
754                } else {
755                    match self.network_state.accept_peer(&session_context) {
756                        Ok(Some(evicted_peer)) => {
757                            debug!(
758                                "Disconnect peer, {} => {}",
759                                evicted_peer.session_id, evicted_peer.connected_addr,
760                            );
761                            if let Err(err) = disconnect_with_message(
762                                &control,
763                                evicted_peer.session_id,
764                                "evict because accepted better peer",
765                            ) {
766                                debug!(
767                                    "Disconnect failed {:?}, error: {:?}",
768                                    evicted_peer.session_id, err
769                                );
770                            }
771                        }
772                        Ok(None) => debug!(
773                            "{} open, registry {} success",
774                            session_context.id, session_context.address,
775                        ),
776                        Err(err) => {
777                            debug!(
778                                "Peer registry failed {:?}. Disconnect {} => {}",
779                                err, session_context.id, session_context.address,
780                            );
781                            if let Err(err) = disconnect_with_message(
782                                &control,
783                                session_context.id,
784                                "reject peer connection",
785                            ) {
786                                debug!(
787                                    "Disconnect failed {:?}, error: {:?}",
788                                    session_context.id, err
789                                );
790                            }
791                        }
792                    }
793                }
794            }
795            ServiceEvent::SessionClose { session_context } => {
796                debug!(
797                    "SessionClose({}, {})",
798                    session_context.id, session_context.address,
799                );
800                let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
801                    // should make sure feelers is clean
802                    reg.remove_feeler(&session_context.address);
803                    reg.remove_peer(session_context.id).is_some()
804                });
805                if peer_exists {
806                    debug!(
807                        "{} closed. Remove {} from peer_registry",
808                        session_context.id, session_context.address,
809                    );
810                    self.network_state.with_peer_store_mut(|peer_store| {
811                        peer_store.remove_disconnected_peer(&session_context.address);
812                    });
813                }
814                self.network_state
815                    .observed_addrs
816                    .write()
817                    .remove(&session_context.id);
818            }
819            _ => {
820                info!("p2p service event: {:?}", event);
821            }
822        }
823    }
824}
825
/// CKB network service, used to start the p2p network.
///
/// Built by [`NetworkService::new`] and consumed by [`NetworkService::start`],
/// which spawns the p2p service plus every background service and hands back a
/// [`NetworkController`].
pub struct NetworkService {
    /// The underlying p2p service; `start` listens on it and runs it.
    p2p_service: Service<EventHandler, SecioKeyPair>,
    /// State shared with protocol handlers, background services and the controller.
    network_state: Arc<NetworkState>,
    /// Sender handed out by the ping handler; `None` when the Ping protocol is
    /// not enabled in the config.
    ping_controller: Option<Sender<()>>,
    // Background services
    /// Futures spawned by `start`, each paired there with a stop signal.
    bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
    /// Version string taken from the identify announcement; passed on to the
    /// controller.
    version: String,
}
835
836impl NetworkService {
837    /// init with all config
838    pub fn new(
839        network_state: Arc<NetworkState>,
840        protocols: Vec<CKBProtocol>,
841        required_protocol_ids: Vec<ProtocolId>,
842        // name, version, flags
843        identify_announce: (String, String, Flags),
844        transport_type: TransportType,
845    ) -> Self {
846        let config = &network_state.config;
847
848        if config.support_protocols.iter().collect::<HashSet<_>>()
849            != default_support_all_protocols()
850                .iter()
851                .collect::<HashSet<_>>()
852        {
853            warn!(
854                "Customized supported protocols: {:?}",
855                config.support_protocols
856            );
857        }
858
859        // == Build p2p service struct
860        let mut protocol_metas = protocols
861            .into_iter()
862            .map(CKBProtocol::build)
863            .collect::<Vec<_>>();
864
865        // == Build special protocols
866
867        // Identify is a core protocol, user cannot disable it via config
868        let identify_callback = IdentifyCallback::new(
869            Arc::clone(&network_state),
870            identify_announce.0,
871            identify_announce.1.clone(),
872            identify_announce.2,
873        );
874        let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
875            ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
876        });
877        protocol_metas.push(identify_meta);
878
879        // Ping protocol
880        let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
881            let ping_interval = Duration::from_secs(config.ping_interval_secs);
882            let ping_timeout = Duration::from_secs(config.ping_timeout_secs);
883
884            let ping_network_state = Arc::clone(&network_state);
885            let (ping_handler, ping_controller) =
886                PingHandler::new(ping_interval, ping_timeout, ping_network_state);
887            let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
888                ProtocolHandle::Callback(Box::new(ping_handler))
889            });
890            protocol_metas.push(ping_meta);
891            Some(ping_controller)
892        } else {
893            None
894        };
895
896        // Discovery protocol
897        if config
898            .support_protocols
899            .contains(&SupportProtocol::Discovery)
900        {
901            let addr_mgr = DiscoveryAddressManager {
902                network_state: Arc::clone(&network_state),
903                discovery_local_address: config.discovery_local_address,
904            };
905            let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
906                ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
907                    addr_mgr,
908                    config
909                        .discovery_announce_check_interval_secs
910                        .map(Duration::from_secs),
911                )))
912            });
913            protocol_metas.push(disc_meta);
914        }
915
916        // Feeler protocol
917        if config.support_protocols.contains(&SupportProtocol::Feeler) {
918            let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
919                let network_state = Arc::clone(&network_state);
920                move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
921            });
922            protocol_metas.push(feeler_meta);
923        }
924
925        // DisconnectMessage protocol
926        if config
927            .support_protocols
928            .contains(&SupportProtocol::DisconnectMessage)
929        {
930            let disconnect_message_state = Arc::clone(&network_state);
931            let disconnect_message_meta = SupportProtocols::DisconnectMessage
932                .build_meta_with_service_handle(move || {
933                    ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
934                        disconnect_message_state,
935                    )))
936                });
937            protocol_metas.push(disconnect_message_meta);
938        }
939
940        // HolePunching protocol
941        #[cfg(not(target_family = "wasm"))]
942        if config
943            .support_protocols
944            .contains(&SupportProtocol::HolePunching)
945        {
946            let hole_punching_state = Arc::clone(&network_state);
947            let hole_punching_meta =
948                SupportProtocols::HolePunching.build_meta_with_service_handle(move || {
949                    ProtocolHandle::Callback(Box::new(
950                        crate::protocols::hole_punching::HolePunching::new(hole_punching_state),
951                    ))
952                });
953            protocol_metas.push(hole_punching_meta);
954        }
955
956        let mut service_builder = ServiceBuilder::default();
957        let yamux_config = YamuxConfig {
958            max_stream_count: protocol_metas.len(),
959            max_stream_window_size: 1024 * 1024,
960            ..Default::default()
961        };
962        for meta in protocol_metas.into_iter() {
963            network_state
964                .protocols
965                .write()
966                .push((meta.id(), meta.name(), meta.support_versions()));
967            service_builder = service_builder.insert_protocol(meta);
968        }
969        let event_handler = EventHandler {
970            network_state: Arc::clone(&network_state),
971        };
972        service_builder = service_builder
973            .handshake_type(network_state.local_private_key.clone().into())
974            .yamux_config(yamux_config)
975            .forever(true)
976            .max_connection_number(1024)
977            .set_send_buffer_size(config.max_send_buffer())
978            .set_channel_size(config.channel_size())
979            .timeout(Duration::from_secs(5))
980            .onion_timeout(Duration::from_secs(120))
981            .trusted_proxies(config.trusted_proxies.clone());
982
983        #[cfg(not(target_family = "wasm"))]
984        {
985            service_builder = service_builder.upnp(config.upnp);
986
987            // set proxy and onion config
988
989            if let Some(proxy_url) = &config.proxy.proxy_url {
990                service_builder = service_builder
991                    .tcp_proxy_config(proxy_url)
992                    .tcp_proxy_random_auth(config.proxy.proxy_random_auth);
993                info!(
994                    "set tcp_proxy_config: {:?}, proxy_random_auth: {}",
995                    config.proxy.proxy_url.clone(),
996                    config.proxy.proxy_random_auth
997                );
998            };
999
1000            let onion_proxy_url = {
1001                config.onion.onion_server.clone().map(|onion_server| {
1002                    if !onion_server.starts_with("socks5://") {
1003                        format!("socks5://{}", onion_server)
1004                    } else {
1005                        onion_server
1006                    }
1007                })
1008            };
1009            if let Some(onion_proxy_url) = onion_proxy_url {
1010                info!("set tcp_onion_config: {:?}", onion_proxy_url);
1011                service_builder = service_builder.tcp_onion_config(&onion_proxy_url);
1012            }
1013        }
1014
1015        #[cfg(target_os = "linux")]
1016        let p2p_service = {
1017            if config.reuse_port_on_linux {
1018                let iter = config.listen_addresses.iter();
1019
1020                #[derive(Clone, Copy, Debug, Eq, PartialEq)]
1021                enum BindType {
1022                    None,
1023                    Ws,
1024                    Tcp,
1025                    Both,
1026                }
1027                impl BindType {
1028                    fn transform(&mut self, other: TransportType) {
1029                        match (&self, other) {
1030                            (BindType::None, TransportType::Ws) => *self = BindType::Ws,
1031                            (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
1032                            (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
1033                            (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
1034                            _ => (),
1035                        }
1036                    }
1037
1038                    fn is_ready(&self) -> bool {
1039                        // should change to Both if ckb enable ws
1040                        matches!(self, BindType::Both)
1041                    }
1042                }
1043
1044                let mut init = BindType::None;
1045
1046                let proxy_config_enable =
1047                    config.proxy.proxy_url.is_some() || config.onion.onion_server.is_some();
1048
1049                let bind_fn_with_addr =
1050                    move |socket: p2p::service::TcpSocket,
1051                          ctxt: p2p::service::TransformerContext,
1052                          addr: std::net::SocketAddr| {
1053                        let socket_ref = socket2::SockRef::from(&socket);
1054                        #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
1055                        socket_ref.set_reuse_port(true)?;
1056                        socket_ref.set_reuse_address(true)?;
1057                        match ctxt.state {
1058                            p2p::service::SocketState::Listen => Ok(socket),
1059                            p2p::service::SocketState::Dial => {
1060                                let domain = socket2::Domain::for_address(addr);
1061                                if socket_ref.domain()? == domain {
1062                                    if proxy_config_enable {
1063                                        // skip bind if proxy enabled
1064                                        debug!("skip bind since proxy is enabled");
1065                                    } else {
1066                                        socket_ref.bind(&addr.into())?;
1067                                    }
1068                                }
1069                                Ok(socket)
1070                            }
1071                        }
1072                    };
1073
1074                for multi_addr in iter {
1075                    if init.is_ready() {
1076                        break;
1077                    }
1078
1079                    match find_type(multi_addr) {
1080                        TransportType::Tcp => {
1081                            // only bind once
1082                            if matches!(init, BindType::Tcp) {
1083                                continue;
1084                            }
1085                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1086                                let bind_fn = move |socket: p2p::service::TcpSocket,
1087                                                    ctxt: p2p::service::TransformerContext| {
1088                                    bind_fn_with_addr(socket, ctxt, addr)
1089                                };
1090                                init.transform(TransportType::Tcp);
1091                                service_builder = service_builder.tcp_config(bind_fn);
1092                            }
1093                        }
1094                        TransportType::Ws | TransportType::Wss => {
1095                            // only bind once
1096                            if matches!(init, BindType::Ws) {
1097                                continue;
1098                            }
1099                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1100                                let bind_fn = move |socket: p2p::service::TcpSocket,
1101                                                    ctxt: p2p::service::TransformerContext| {
1102                                    bind_fn_with_addr(socket, ctxt, addr)
1103                                };
1104                                init.transform(TransportType::Ws);
1105                                service_builder = service_builder.tcp_config_on_ws(bind_fn);
1106                            }
1107                        }
1108                    }
1109                }
1110            }
1111
1112            service_builder.build(event_handler)
1113        };
1114
1115        #[cfg(not(target_os = "linux"))]
1116        // The default permissions of Windows are not enough to enable this function,
1117        // and the administrator permissions of group permissions must be turned on.
1118        // This operation is very burdensome for windows users, so it is turned off by default
1119        //
1120        // The integration test fails after MacOS is turned on, the behavior is different from linux.
1121        // Decision to turn off it
1122        let p2p_service = service_builder.build(event_handler);
1123
1124        // == Build background service tasks
1125        let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
1126        let protocol_type_checker_service = ProtocolTypeCheckerService::new(
1127            Arc::clone(&network_state),
1128            p2p_service.control().to_owned().into(),
1129            required_protocol_ids,
1130        );
1131        let mut bg_services = vec![
1132            Box::pin(dump_peer_store_service) as Pin<Box<_>>,
1133            Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
1134        ];
1135        if config.outbound_peer_service_enabled() {
1136            let outbound_peer_service = OutboundPeerService::new(
1137                Arc::clone(&network_state),
1138                p2p_service.control().to_owned().into(),
1139                Duration::from_secs(config.connect_outbound_interval_secs),
1140                transport_type,
1141            );
1142            bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
1143        };
1144
1145        #[cfg(feature = "with_dns_seeding")]
1146        if config.dns_seeding_service_enabled() {
1147            let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
1148                Arc::clone(&network_state),
1149                config.dns_seeds.clone(),
1150            );
1151            bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
1152        };
1153
1154        NetworkService {
1155            p2p_service,
1156            network_state,
1157            ping_controller,
1158            bg_services,
1159            version: identify_announce.1,
1160        }
1161    }
1162
1163    /// Start the network in the background and return a controller
1164    pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1165        let config = self.network_state.config.clone();
1166
1167        let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1168
1169        // dial whitelist_nodes
1170        for addr in self.network_state.config.whitelist_peers() {
1171            debug!("Dial whitelist_peers {:?}", addr);
1172            self.network_state.dial_identify(&p2p_control, addr);
1173        }
1174
1175        let target = &self.network_state.required_flags;
1176
1177        // get bootnodes
1178        // try get addrs from peer_store, if peer_store have no enough addrs then use bootnodes
1179        let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1180            let count = max((config.max_outbound_peers >> 1) as usize, 1);
1181            let mut addrs: Vec<_> = peer_store
1182                .fetch_addrs_to_attempt(count, *target, |_| true)
1183                .into_iter()
1184                .map(|paddr| paddr.addr)
1185                .collect();
1186            // tried to re-connect to anchors on startup
1187            let anchors: Vec<_> = peer_store.mut_anchors().drain().collect();
1188            addrs.extend(anchors);
1189            // Get bootnodes randomly
1190            let bootnodes = self
1191                .network_state
1192                .bootnodes
1193                .iter()
1194                .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1195                .into_iter()
1196                .cloned();
1197            addrs.extend(bootnodes);
1198            addrs
1199        });
1200
1201        // dial half bootnodes
1202        for addr in bootnodes {
1203            debug!("Dial bootnode {:?}", addr);
1204            self.network_state.dial_identify(&p2p_control, addr);
1205        }
1206
1207        let Self {
1208            mut p2p_service,
1209            network_state,
1210            ping_controller,
1211            bg_services,
1212            version,
1213        } = self;
1214
1215        // NOTE: for ensure background task finished
1216        let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1217            .into_iter()
1218            .map(|bg_service| {
1219                let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1220                (signal_sender, (bg_service, signal_receiver))
1221            })
1222            .unzip();
1223
1224        let receiver: CancellationToken = new_tokio_exit_rx();
1225        #[cfg(not(target_family = "wasm"))]
1226        let (start_sender, start_receiver) = mpsc::channel();
1227        {
1228            #[cfg(not(target_family = "wasm"))]
1229            let network_state = Arc::clone(&network_state);
1230            let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1231            handle.spawn_task(async move {
1232                #[cfg(not(target_family = "wasm"))]
1233                {
1234                    let listen_addresses = {
1235                        let mut addresses = config.listen_addresses.clone();
1236                        if config.reuse_tcp_with_ws {
1237                            let ws_listens = addresses
1238                                .iter()
1239                                .cloned()
1240                                .filter_map(|mut addr| {
1241                                    if matches!(find_type(&addr), TransportType::Tcp) {
1242                                        addr.push(Protocol::Ws);
1243                                        Some(addr)
1244                                    } else {
1245                                        None
1246                                    }
1247                                })
1248                                .collect::<Vec<_>>();
1249
1250                            addresses.extend(ws_listens);
1251                        }
1252                        let mut addresses = addresses
1253                            .into_iter()
1254                            .collect::<HashSet<_>>()
1255                            .into_iter()
1256                            .collect::<Vec<_>>();
1257                        addresses.sort_by(|a, b| {
1258                            let ty_a = find_type(a);
1259                            let ty_b = find_type(b);
1260
1261                            ty_a.cmp(&ty_b)
1262                        });
1263
1264                        addresses
1265                    };
1266
1267                    for addr in &listen_addresses {
1268                        match p2p_service.listen(addr.to_owned()).await {
1269                            Ok(listen_address) => {
1270                                info!("Listen on address: {}", listen_address);
1271                                network_state
1272                                    .listened_addrs
1273                                    .write()
1274                                    .push(listen_address.clone());
1275                            }
1276                            Err(err) => {
1277                                warn!(
1278                                    "Listen on address {} failed, due to error: {}",
1279                                    addr.clone(),
1280                                    err
1281                                );
1282                                start_sender
1283                                    .send(Err(Error::P2P(P2PError::Transport(err))))
1284                                    .expect("channel abnormal shutdown");
1285                                return;
1286                            }
1287                        };
1288                    }
1289                    start_sender.send(Ok(())).unwrap();
1290                }
1291
1292                p2p::runtime::spawn(async move { p2p_service.run().await });
1293                tokio::select! {
1294                    _ = receiver.cancelled() => {
1295                        info!("NetworkService receive exit signal, start shutdown...");
1296                        let _ = p2p_control.shutdown().await;
1297                        // Drop senders to stop all corresponding background task
1298                        drop(bg_signals);
1299
1300                        info!("NetworkService shutdown now");
1301                    },
1302                    else => {
1303                        let _ = p2p_control.shutdown().await;
1304                        // Drop senders to stop all corresponding background task
1305                        drop(bg_signals);
1306                    },
1307                }
1308            });
1309        }
1310        for (mut service, mut receiver) in bg_receivers {
1311            handle.spawn_task(async move {
1312                loop {
1313                    tokio::select! {
1314                        _ = &mut service => {},
1315                        _ = &mut receiver => break
1316                    }
1317                }
1318            });
1319        }
1320        #[cfg(not(target_family = "wasm"))]
1321        if let Ok(Err(e)) = start_receiver.recv() {
1322            return Err(e);
1323        }
1324
1325        Ok(NetworkController {
1326            version,
1327            network_state,
1328            p2p_control,
1329            ping_controller,
1330        })
1331    }
1332}
1333
/// Network controller.
///
/// Cloneable handle returned by `NetworkService::start`; every method works
/// through the shared network state or the p2p service control.
#[derive(Clone)]
pub struct NetworkController {
    /// Version string supplied when the service was built.
    version: String,
    /// State shared with the running network service.
    network_state: Arc<NetworkState>,
    /// Control handle of the p2p service (dial, send, disconnect, ...).
    p2p_control: ServiceControl,
    /// Sender handed out by the ping handler; `None` when the Ping protocol is
    /// disabled. Used by `ping_peers`.
    ping_controller: Option<Sender<()>>,
}
1342
1343impl NetworkController {
1344    /// Node listen address list
1345    pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1346        self.network_state.public_urls(max_urls)
1347    }
1348
1349    /// ckb version
1350    pub fn version(&self) -> &String {
1351        &self.version
1352    }
1353
1354    /// Node peer id's base58 format string
1355    pub fn node_id(&self) -> String {
1356        self.network_state.node_id()
1357    }
1358
1359    /// p2p service control
1360    pub fn p2p_control(&self) -> &ServiceControl {
1361        &self.p2p_control
1362    }
1363
1364    /// async p2p service control
1365    pub fn async_p2p_control(&self) -> ServiceAsyncControl {
1366        self.p2p_control.clone().into()
1367    }
1368
1369    /// Dial remote node
1370    pub fn add_node(&self, address: Multiaddr) {
1371        self.network_state.add_node(&self.p2p_control, address)
1372    }
1373
1374    /// Add a public_addr to NetworkState.public_addrs
1375    pub fn add_public_addr(&self, public_addr: Multiaddr) {
1376        self.network_state.add_public_addr(public_addr)
1377    }
1378
1379    /// Disconnect session with peer id
1380    pub fn remove_node(&self, peer_id: &PeerId) {
1381        if let Some(session_id) = self
1382            .network_state
1383            .peer_registry
1384            .read()
1385            .get_key_by_peer_id(peer_id)
1386        {
1387            if let Err(err) =
1388                disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1389            {
1390                debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1391            }
1392        } else {
1393            error!("Cannot find peer {:?}", peer_id);
1394        }
1395    }
1396
1397    /// Get banned peer list
1398    pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1399        self.network_state
1400            .peer_store
1401            .lock()
1402            .ban_list()
1403            .get_banned_addrs()
1404    }
1405
1406    /// Clear banned list
1407    pub fn clear_banned_addrs(&self) {
1408        self.network_state.peer_store.lock().clear_ban_list();
1409    }
1410
1411    /// Get address info from peer store
1412    pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1413        self.network_state
1414            .peer_store
1415            .lock()
1416            .addr_manager()
1417            .get(addr)
1418            .cloned()
1419    }
1420
1421    /// Ban an ip
1422    pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1423        self.disconnect_peers_in_ip_range(address, &ban_reason);
1424        self.network_state
1425            .peer_store
1426            .lock()
1427            .ban_network(address, ban_until, ban_reason)
1428    }
1429
1430    /// Unban an ip
1431    pub fn unban(&self, address: &IpNetwork) {
1432        self.network_state
1433            .peer_store
1434            .lock()
1435            .mut_ban_list()
1436            .unban_network(address);
1437    }
1438
1439    /// Return all connected peers' information
1440    pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1441        self.network_state.with_peer_registry(|reg| {
1442            reg.peers()
1443                .iter()
1444                .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1445                .collect::<Vec<_>>()
1446        })
1447    }
1448
1449    /// Ban an peer through peer index
1450    pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1451        self.network_state
1452            .ban_session(&self.p2p_control, peer_index, duration, reason);
1453    }
1454
1455    /// disconnect peers with matched peer_ip or peer_ip_network, eg: 192.168.0.2 or 192.168.0.0/24
1456    fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1457        self.network_state.with_peer_registry(|reg| {
1458            reg.peers().iter().for_each(|(peer_index, peer)| {
1459                if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr)
1460                    && address.contains(addr.ip())
1461                {
1462                    let _ = disconnect_with_message(
1463                        &self.p2p_control,
1464                        *peer_index,
1465                        &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1466                    );
1467                }
1468            })
1469        });
1470    }
1471
1472    fn try_broadcast(
1473        &self,
1474        quick: bool,
1475        target: Option<SessionId>,
1476        proto_id: ProtocolId,
1477        data: Bytes,
1478    ) -> Result<(), SendErrorKind> {
1479        let now = Instant::now();
1480        loop {
1481            let target = target
1482                .map(TargetSession::Single)
1483                .unwrap_or(TargetSession::All);
1484            let result = if quick {
1485                self.p2p_control
1486                    .quick_filter_broadcast(target, proto_id, data.clone())
1487            } else {
1488                self.p2p_control
1489                    .filter_broadcast(target, proto_id, data.clone())
1490            };
1491            match result {
1492                Ok(()) => {
1493                    return Ok(());
1494                }
1495                Err(SendErrorKind::WouldBlock) => {
1496                    if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1497                        warn!("Broadcast message to {} timeout", proto_id);
1498                        return Err(SendErrorKind::WouldBlock);
1499                    }
1500                    thread::sleep(P2P_TRY_SEND_INTERVAL);
1501                }
1502                Err(err) => {
1503                    warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1504                    return Err(err);
1505                }
1506            }
1507        }
1508    }
1509
1510    fn broadcast_inner<S: Spawn>(
1511        &self,
1512        quick: bool,
1513        target: TargetSession,
1514        proto_id: ProtocolId,
1515        data: Bytes,
1516        handle: &S,
1517    ) {
1518        let async_control: ServiceAsyncControl = self.p2p_control.clone().into();
1519
1520        handle.spawn_task(async move {
1521            if quick {
1522                let _ignore = async_control
1523                    .quick_filter_broadcast(target, proto_id, data)
1524                    .await;
1525            } else {
1526                let _ignore = async_control.filter_broadcast(target, proto_id, data).await;
1527            }
1528        })
1529    }
1530
1531    /// Broadcast a message to all connected peers
1532    pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1533        self.try_broadcast(false, None, proto_id, data)
1534    }
1535
1536    /// Broadcast a message to all connected peers through quick queue
1537    pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1538        self.try_broadcast(true, None, proto_id, data)
1539    }
1540
1541    /// Send message to one connected peer
1542    pub fn send_message_to(
1543        &self,
1544        session_id: SessionId,
1545        proto_id: ProtocolId,
1546        data: Bytes,
1547    ) -> Result<(), SendErrorKind> {
1548        self.try_broadcast(false, Some(session_id), proto_id, data)
1549    }
1550
1551    /// Broadcast a message to all connected peers
1552    pub fn broadcast_with_handle<S: Spawn>(&self, proto_id: ProtocolId, data: Bytes, handle: &S) {
1553        self.broadcast_inner(false, TargetSession::All, proto_id, data, handle)
1554    }
1555
1556    /// Broadcast a message to all connected peers through quick queue
1557    pub fn quick_broadcast_with_handle<S: Spawn>(
1558        &self,
1559        proto_id: ProtocolId,
1560        data: Bytes,
1561        handle: &S,
1562    ) {
1563        self.broadcast_inner(true, TargetSession::All, proto_id, data, handle)
1564    }
1565
1566    /// Send message to one connected peer
1567    pub fn send_message_to_with_handle<S: Spawn>(
1568        &self,
1569        session_id: SessionId,
1570        proto_id: ProtocolId,
1571        data: Bytes,
1572        handle: &S,
1573    ) {
1574        self.broadcast_inner(
1575            false,
1576            TargetSession::Single(session_id),
1577            proto_id,
1578            data,
1579            handle,
1580        )
1581    }
1582
1583    /// network message processing controller, always true, if false, discard any received messages
1584    pub fn is_active(&self) -> bool {
1585        self.network_state.is_active()
1586    }
1587
1588    /// Change active status, if set false discard any received messages
1589    pub fn set_active(&self, active: bool) {
1590        self.network_state.active.store(active, Ordering::Release);
1591    }
1592
1593    /// Return all connected peers' protocols info
1594    pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1595        self.network_state.protocols.read().clone()
1596    }
1597
1598    /// Try ping all connected peers
1599    pub fn ping_peers(&self) {
1600        if let Some(mut ping_controller) = self.ping_controller.clone() {
1601            let _ignore = ping_controller.try_send(());
1602        }
1603    }
1604}
1605
1606// Send an optional message before disconnect a peer
1607pub(crate) fn disconnect_with_message(
1608    control: &ServiceControl,
1609    peer_index: SessionId,
1610    message: &str,
1611) -> Result<(), SendErrorKind> {
1612    if !message.is_empty() {
1613        let data = Bytes::from(message.as_bytes().to_vec());
1614        // Must quick send, otherwise this message will be dropped.
1615        control.quick_send_message_to(
1616            peer_index,
1617            SupportProtocols::DisconnectMessage.protocol_id(),
1618            data,
1619        )?;
1620    }
1621    control.disconnect(peer_index)
1622}
1623
1624pub(crate) async fn async_disconnect_with_message(
1625    control: &ServiceAsyncControl,
1626    peer_index: SessionId,
1627    message: &str,
1628) -> Result<(), SendErrorKind> {
1629    if !message.is_empty() {
1630        let data = Bytes::from(message.as_bytes().to_vec());
1631        // Must quick send, otherwise this message will be dropped.
1632        control
1633            .quick_send_message_to(
1634                peer_index,
1635                SupportProtocols::DisconnectMessage.protocol_id(),
1636                data,
1637            )
1638            .await?;
1639    }
1640    control.disconnect(peer_index).await
1641}
1642
/// Transport type used by a ckb network address.
///
/// The derived ordering follows declaration order: `Tcp < Ws < Wss`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TransportType {
    /// Plain TCP; also what `find_type` falls back to
    Tcp,
    /// WebSocket (`ws`)
    Ws,
    /// Secure WebSocket (`wss`), only on wasm
    Wss,
}
1653
1654#[allow(dead_code)]
1655pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1656    let mut iter = addr.iter();
1657
1658    iter.find_map(|proto| match proto {
1659        Protocol::Ws => Some(TransportType::Ws),
1660        Protocol::Wss => Some(TransportType::Wss),
1661        _ => None,
1662    })
1663    .unwrap_or(TransportType::Tcp)
1664}