1use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7 PeerStore,
8 types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11 disconnect_message::DisconnectMessageProtocol,
12 discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13 feeler::Feeler,
14 identify::{Flags, IdentifyCallback, IdentifyProtocol},
15 ping::PingHandler,
16 support_protocols::SupportProtocols,
17};
18use crate::services::{
19 dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
20 protocol_type_checker::ProtocolTypeCheckerService,
21};
22use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
23use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
24use ckb_logger::{debug, error, info, trace, warn};
25use ckb_spawn::Spawn;
26use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
27use ckb_systemtime::{Duration, Instant};
28use ckb_util::{Condvar, Mutex, RwLock};
29use futures::{Future, channel::mpsc::Sender};
30use ipnetwork::IpNetwork;
31use p2p::{
32 SessionId, async_trait,
33 builder::ServiceBuilder,
34 bytes::Bytes,
35 context::{ServiceContext, SessionContext},
36 error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
37 multiaddr::{Multiaddr, Protocol},
38 secio::{self, PeerId, SecioKeyPair, error::SecioError},
39 service::{
40 ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
41 TargetSession,
42 },
43 traits::ServiceHandle,
44 utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
45 yamux::config::Config as YamuxConfig,
46};
47use rand::prelude::IteratorRandom;
48#[cfg(feature = "with_sentry")]
49use sentry::{Level, capture_message, with_scope};
50#[cfg(not(target_family = "wasm"))]
51use std::sync::mpsc;
52use std::{
53 borrow::Cow,
54 cmp::max,
55 collections::{HashMap, HashSet},
56 pin::Pin,
57 sync::{
58 Arc,
59 atomic::{AtomicBool, Ordering},
60 },
61 thread,
62};
63use tokio::{self, sync::oneshot};
64
// NOTE(review): not referenced in the visible part of this file; by its name, presumably the
// overall time budget for retrying a send to the p2p service — confirm at the use site.
const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
// NOTE(review): presumably the pause between retries bounded by `P2P_SEND_TIMEOUT` — confirm.
const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
/// A dial still pending after this long is considered hung: `NetworkState::can_dial` reports it
/// to Sentry (only with the `with_sentry` feature) but keeps refusing to re-dial that peer.
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
69
/// Shared, thread-safe state of the network layer, handed out as `Arc<NetworkState>` to the
/// event handler, protocol handlers and background services.
pub struct NetworkState {
    /// Currently connected peers (and pending feeler addresses).
    pub(crate) peer_registry: RwLock<PeerRegistry>,
    /// Address book with scores and bans; loaded at construction time.
    pub(crate) peer_store: Mutex<PeerStore>,
    /// Addresses the p2p service actually listens on, filled in as each listen succeeds.
    pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
    /// Dials handed to the p2p service that have not yet succeeded or failed, keyed by peer id,
    /// with the time the dial was recorded.
    dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
    /// Reachable listen/public addresses of this node, each ending in `/p2p/<local peer id>`.
    public_addrs: HashSet<Multiaddr>,
    /// Per-session observed address of this node; the entry is dropped when the session closes.
    observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
    local_private_key: secio::SecioKeyPair,
    local_peer_id: PeerId,
    pub(crate) bootnodes: Vec<Multiaddr>,
    pub(crate) config: NetworkConfig,
    /// Read by `is_active`; NOTE(review): the writer is outside this view.
    pub(crate) active: AtomicBool,
    /// `(id, name, supported versions)` of every protocol registered with the p2p service.
    pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
    /// Flags that startup dial candidates fetched from the peer store must satisfy
    /// (default `SYNC | DISCOVERY | RELAY`).
    pub(crate) required_flags: Flags,
}
90
91impl NetworkState {
92 #[cfg(not(target_family = "wasm"))]
94 pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
95 config.create_dir_if_not_exists()?;
96 let local_private_key = config.fetch_private_key()?;
97 let local_peer_id = local_private_key.peer_id();
98 let public_addrs: HashSet<Multiaddr> = config
100 .listen_addresses
101 .iter()
102 .chain(config.public_addresses.iter())
103 .cloned()
104 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
105 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
106 _ => {
107 match extract_peer_id(&addr) {
108 Some(peer_id) if peer_id != local_peer_id => {
109 error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
110 std::process::exit(1);
111 }
112 Some(_) => (),
113 None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
114 }
115 Some(addr)
116 }
117 })
118 .collect();
119 info!("Loading the peer store. This process may take a few seconds to complete.");
120
121 let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
122 config.peer_store_path(),
123 ));
124 let bootnodes = config.bootnodes();
125
126 let peer_registry = PeerRegistry::new(
127 config.max_inbound_peers(),
128 config.max_outbound_peers(),
129 config.whitelist_only,
130 config.whitelist_peers(),
131 config.disable_block_relay_only_connection,
132 );
133
134 Ok(NetworkState {
135 peer_store,
136 config,
137 bootnodes,
138 peer_registry: RwLock::new(peer_registry),
139 dialing_addrs: RwLock::new(HashMap::default()),
140 public_addrs,
141 listened_addrs: RwLock::new(Vec::new()),
142 observed_addrs: RwLock::new(HashMap::default()),
143 local_private_key,
144 local_peer_id,
145 active: AtomicBool::new(true),
146 protocols: RwLock::new(Vec::new()),
147 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
148 })
149 }
150
151 #[cfg(target_family = "wasm")]
152 pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
153 let local_private_key = config.fetch_private_key()?;
154 let local_peer_id = local_private_key.peer_id();
155 let public_addrs: HashSet<Multiaddr> = config
157 .listen_addresses
158 .iter()
159 .chain(config.public_addresses.iter())
160 .cloned()
161 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
162 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
163 _ => {
164 if extract_peer_id(&addr).is_none() {
165 addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
166 }
167 Some(addr)
168 }
169 })
170 .collect();
171 info!("Loading the peer store. This process may take a few seconds to complete.");
172 let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
173 let bootnodes = config.bootnodes();
174
175 let peer_registry = PeerRegistry::new(
176 config.max_inbound_peers(),
177 config.max_outbound_peers(),
178 config.whitelist_only,
179 config.whitelist_peers(),
180 config.disable_block_relay_only_connection,
181 );
182 Ok(NetworkState {
183 peer_store,
184 config,
185 bootnodes,
186 peer_registry: RwLock::new(peer_registry),
187 dialing_addrs: RwLock::new(HashMap::default()),
188 public_addrs,
189 listened_addrs: RwLock::new(Vec::new()),
190 observed_addrs: RwLock::new(HashMap::default()),
191 local_private_key,
192 local_peer_id,
193 active: AtomicBool::new(true),
194 protocols: RwLock::new(Vec::new()),
195 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
196 })
197 }
198
199 pub fn required_flags(mut self, flags: Flags) -> Self {
202 self.required_flags = flags;
203 self
204 }
205
206 pub(crate) fn report_session(
207 &self,
208 p2p_control: &ServiceControl,
209 session_id: SessionId,
210 behaviour: Behaviour,
211 ) {
212 if let Some(addr) = self.with_peer_registry(|reg| {
213 reg.get_peer(session_id)
214 .filter(|peer| !peer.is_whitelist)
215 .map(|peer| peer.connected_addr.clone())
216 }) {
217 trace!("Report {:?} because {:?}", addr, behaviour);
218 let report_result = self.peer_store.lock().report(&addr, behaviour);
219 if report_result.is_banned() {
220 if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
221 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
222 }
223 }
224 } else {
225 debug!(
226 "Report {} failure: not found in peer registry or it is on the whitelist",
227 session_id
228 );
229 }
230 }
231
232 pub(crate) fn ban_session(
233 &self,
234 p2p_control: &ServiceControl,
235 session_id: SessionId,
236 duration: Duration,
237 reason: String,
238 ) {
239 if let Some(addr) = self.with_peer_registry(|reg| {
240 reg.get_peer(session_id)
241 .filter(|peer| !peer.is_whitelist)
242 .map(|peer| peer.connected_addr.clone())
243 }) {
244 info!(
245 "Ban peer {:?} for {} seconds, reason: {}",
246 addr,
247 duration.as_secs(),
248 reason
249 );
250 if let Some(metrics) = ckb_metrics::handle() {
251 metrics.ckb_network_ban_peer.inc();
252 }
253 if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
254 let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
255 self.peer_store.lock().ban_addr(
256 &peer.connected_addr,
257 duration.as_millis() as u64,
258 reason,
259 );
260 if let Err(err) =
261 disconnect_with_message(p2p_control, peer.session_id, message.as_str())
262 {
263 debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
264 }
265 }
266 } else {
267 debug!(
268 "Ban session({}) failed: not found in peer registry or it is on the whitelist",
269 session_id
270 );
271 }
272 }
273
274 pub(crate) fn accept_peer(
275 &self,
276 session_context: &SessionContext,
277 ) -> Result<Option<Peer>, Error> {
278 let mut peer_store = self.peer_store.lock();
281 let accept_peer_result = {
282 self.peer_registry.write().accept_peer(
283 session_context.address.clone(),
284 session_context.id,
285 session_context.ty,
286 &mut peer_store,
287 )
288 };
289 accept_peer_result
290 }
291
292 pub fn with_peer_registry<F, T>(&self, callback: F) -> T
294 where
295 F: FnOnce(&PeerRegistry) -> T,
296 {
297 callback(&self.peer_registry.read())
298 }
299
300 pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
302 where
303 F: FnOnce(&mut PeerRegistry) -> T,
304 {
305 callback(&mut self.peer_registry.write())
306 }
307
308 pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
310 where
311 F: FnOnce(&mut PeerStore) -> T,
312 {
313 callback(&mut self.peer_store.lock())
314 }
315
316 pub fn local_peer_id(&self) -> &PeerId {
318 &self.local_peer_id
319 }
320
321 pub fn local_private_key(&self) -> &secio::SecioKeyPair {
323 &self.local_private_key
324 }
325
326 pub fn node_id(&self) -> String {
328 self.local_peer_id().to_base58()
329 }
330
331 pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
332 if self.public_addrs.len() <= count {
333 return self.public_addrs.iter().cloned().collect();
334 } else {
335 self.public_addrs
336 .iter()
337 .cloned()
338 .choose_multiple(&mut rand::thread_rng(), count)
339 }
340 }
341
342 pub(crate) fn connection_status(&self) -> ConnectionStatus {
343 self.peer_registry.read().connection_status()
344 }
345
346 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
348 let listened_addrs = self.listened_addrs.read();
349 self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
350 .into_iter()
351 .filter_map(|addr| {
352 if !listened_addrs.contains(&addr) {
353 Some((addr, 1))
354 } else {
355 None
356 }
357 })
358 .chain(listened_addrs.iter().map(|addr| (addr.to_owned(), 1)))
359 .map(|(addr, score)| (addr.to_string(), score))
360 .collect()
361 }
362
363 pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
364 self.dial_identify(p2p_control, address);
365 }
366
367 pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
369 self.protocols
370 .read()
371 .iter()
372 .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
373 .collect::<Vec<_>>()
374 }
375
376 pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
377 let peer_id = extract_peer_id(addr);
378 if peer_id.is_none() {
379 error!("Do not dial addr without peer id, addr: {}", addr);
380 return false;
381 }
382 let peer_id = peer_id.as_ref().unwrap();
383
384 if self.local_peer_id() == peer_id {
385 trace!("Do not dial self: {:?}, {}", peer_id, addr);
386 return false;
387 }
388 if self.public_addrs.contains(addr) {
389 trace!(
390 "Do not dial listened address(self): {:?}, {}",
391 peer_id, addr
392 );
393 return false;
394 }
395
396 let peer_in_registry = self.with_peer_registry(|reg| {
397 reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
398 });
399 if peer_in_registry {
400 trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
401 return false;
402 }
403
404 if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
405 trace!(
406 "Do not send repeated dial commands to network service: {:?}, {}",
407 peer_id, addr
408 );
409 if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
410 #[cfg(feature = "with_sentry")]
411 with_scope(
412 |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
413 || {
414 capture_message(
415 &format!(
416 "Dialing {:?}, {:?} for more than {} seconds, \
417 something is wrong in network service",
418 peer_id,
419 addr,
420 DIAL_HANG_TIMEOUT.as_secs(),
421 ),
422 Level::Warning,
423 )
424 },
425 );
426 }
427 return false;
428 }
429
430 true
431 }
432
433 pub(crate) fn dial_success(&self, addr: &Multiaddr) {
434 if let Some(peer_id) = extract_peer_id(addr) {
435 self.dialing_addrs.write().remove(&peer_id);
436 }
437 }
438
439 pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
440 self.with_peer_registry_mut(|reg| {
441 reg.remove_feeler(addr);
442 });
443
444 if let Some(peer_id) = extract_peer_id(addr) {
445 self.dialing_addrs.write().remove(&peer_id);
446 }
447 }
448
449 fn dial_inner(
452 &self,
453 p2p_control: &ServiceControl,
454 addr: Multiaddr,
455 target: TargetProtocol,
456 ) -> Result<(), Error> {
457 if !self.can_dial(&addr) {
458 return Err(Error::Dial(format!("ignore dialing addr {addr}")));
459 }
460
461 debug!("Dialing {addr}");
462 p2p_control.dial(addr.clone(), target)?;
463 self.dialing_addrs.write().insert(
464 extract_peer_id(&addr).expect("verified addr"),
465 Instant::now(),
466 );
467 Ok(())
468 }
469
470 pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
472 if let Err(err) = self.dial_inner(
473 p2p_control,
474 addr,
475 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
476 ) {
477 debug!("dial_identify error: {err}");
478 }
479 }
480
481 pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
483 if let Err(err) = self.dial_inner(
484 p2p_control,
485 addr.clone(),
486 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
487 ) {
488 debug!("dial_feeler error {err}");
489 } else {
490 self.with_peer_registry_mut(|reg| {
491 reg.add_feeler(&addr);
492 });
493 }
494 }
495
496 pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
498 let mut pending_observed_addrs = self.observed_addrs.write();
499 pending_observed_addrs.insert(session_id, addr);
500 }
501
502 pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
504 let observed_addrs = self
505 .observed_addrs
506 .read()
507 .values()
508 .cloned()
509 .collect::<HashSet<_>>();
510 if observed_addrs.len() <= count {
511 return observed_addrs.into_iter().collect();
512 } else {
513 observed_addrs
514 .into_iter()
515 .choose_multiple(&mut rand::thread_rng(), count)
516 }
517 }
518
519 pub fn is_active(&self) -> bool {
521 self.active.load(Ordering::Acquire)
522 }
523}
524
/// `ServiceHandle` for the p2p service: reacts to service errors and session open/close events
/// by updating the shared `NetworkState`.
pub struct EventHandler {
    pub(crate) network_state: Arc<NetworkState>,
}
529
530impl EventHandler {
531 pub fn new(network_state: Arc<NetworkState>) -> Self {
533 Self { network_state }
534 }
535}
536
/// Hook used to signal that the node should exit.
///
/// Implementors must be `Send + Unpin + 'static`. `DefaultExitHandler` implements it by waking
/// threads blocked in `wait_for_exit`. NOTE(review): the callers of `notify_exit` are outside
/// this view.
pub trait ExitHandler: Send + Unpin + 'static {
    /// Signals exit to whoever is waiting on this handler.
    fn notify_exit(&self);
}
542
543#[derive(Clone, Default)]
545pub struct DefaultExitHandler {
546 lock: Arc<Mutex<()>>,
547 exit: Arc<Condvar>,
548}
549
550impl DefaultExitHandler {
551 pub fn wait_for_exit(&self) {
553 self.exit.wait(&mut self.lock.lock());
554 }
555}
556
557impl ExitHandler for DefaultExitHandler {
558 fn notify_exit(&self) {
559 self.exit.notify_all();
560 }
561}
562
563impl EventHandler {
564 fn inbound_eviction(&self) -> Vec<PeerIndex> {
565 if self.network_state.config.bootnode_mode {
566 let status = self.network_state.connection_status();
567
568 if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
569 self.network_state
570 .with_peer_registry(|registry| {
571 registry
572 .peers()
573 .values()
574 .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
575 .map(|peer| peer.session_id)
576 .collect::<Vec<SessionId>>()
577 })
578 .into_iter()
579 .enumerate()
580 .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
581 .collect()
582 } else {
583 Vec::new()
584 }
585 } else {
586 Vec::new()
587 }
588 }
589}
590
591#[async_trait]
592impl ServiceHandle for EventHandler {
593 async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
594 match error {
595 ServiceError::DialerError { address, error } => {
596 match error {
597 DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
598 SecioError::ConnectSelf,
599 )) => {
600 debug!("dial observed address success: {:?}", address);
601 }
602 DialerErrorKind::IoError(e)
603 if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
604 {
605 warn!("DialerError({}) {}", address, e);
606 }
607 _ => {
608 debug!("DialerError({}) {}", address, error);
609 }
610 }
611 self.network_state.dial_failed(&address);
612 }
613 ServiceError::ProtocolError {
614 id,
615 proto_id,
616 error,
617 } => {
618 debug!("ProtocolError({}, {}) {}", id, proto_id, error);
619 let message = format!("ProtocolError id={proto_id}");
620 self.network_state.ban_session(
622 &context.control().clone().into(),
623 id,
624 Duration::from_secs(300),
625 message,
626 );
627 }
628 ServiceError::SessionTimeout { session_context } => {
629 debug!(
630 "SessionTimeout({}, {})",
631 session_context.id, session_context.address,
632 );
633 }
634 ServiceError::MuxerError {
635 session_context,
636 error,
637 } => {
638 debug!(
639 "MuxerError({}, {}), substream error {}, disconnect it",
640 session_context.id, session_context.address, error,
641 );
642 }
643 ServiceError::ListenError { address, error } => {
644 debug!("ListenError: address={:?}, error={:?}", address, error);
645 }
646 ServiceError::ProtocolSelectError {
647 proto_name,
648 session_context,
649 } => {
650 debug!(
651 "ProtocolSelectError: proto_name={:?}, session_id={}",
652 proto_name, session_context.id,
653 );
654 }
655 ServiceError::SessionBlocked { session_context } => {
656 debug!("SessionBlocked: {}", session_context.id);
657 }
658 ServiceError::ProtocolHandleError { proto_id, error } => {
659 debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
660
661 let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
662 {
663 if let Some(id) = opt_session_id {
664 self.network_state.ban_session(
665 &context.control().clone().into(),
666 id,
667 Duration::from_secs(300),
668 format!("protocol {proto_id} panic when process peer message"),
669 );
670 }
671 #[cfg(feature = "with_sentry")]
672 with_scope(
673 |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
674 || {
675 capture_message(
676 &format!(
677 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
678 ),
679 Level::Warning,
680 )
681 },
682 );
683 error!(
684 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
685 );
686
687 broadcast_exit_signals();
688 }
689 }
690 }
691 }
692
693 async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
694 match event {
696 ServiceEvent::SessionOpen { session_context } => {
697 debug!(
698 "SessionOpen({}, {})",
699 session_context.id, session_context.address,
700 );
701
702 self.network_state.dial_success(&session_context.address);
703
704 let iter = self.inbound_eviction();
705
706 let control = context.control().clone().into();
707
708 for peer in iter {
709 if let Err(err) =
710 disconnect_with_message(&control, peer, "bootnode random eviction")
711 {
712 debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
713 }
714 }
715
716 if self
717 .network_state
718 .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
719 {
720 debug!(
721 "Feeler connected {} => {}",
722 session_context.id, session_context.address,
723 );
724 } else {
725 match self.network_state.accept_peer(&session_context) {
726 Ok(Some(evicted_peer)) => {
727 debug!(
728 "Disconnect peer, {} => {}",
729 evicted_peer.session_id, evicted_peer.connected_addr,
730 );
731 if let Err(err) = disconnect_with_message(
732 &control,
733 evicted_peer.session_id,
734 "evict because accepted better peer",
735 ) {
736 debug!(
737 "Disconnect failed {:?}, error: {:?}",
738 evicted_peer.session_id, err
739 );
740 }
741 }
742 Ok(None) => debug!(
743 "{} open, registry {} success",
744 session_context.id, session_context.address,
745 ),
746 Err(err) => {
747 debug!(
748 "Peer registry failed {:?}. Disconnect {} => {}",
749 err, session_context.id, session_context.address,
750 );
751 if let Err(err) = disconnect_with_message(
752 &control,
753 session_context.id,
754 "reject peer connection",
755 ) {
756 debug!(
757 "Disconnect failed {:?}, error: {:?}",
758 session_context.id, err
759 );
760 }
761 }
762 }
763 }
764 }
765 ServiceEvent::SessionClose { session_context } => {
766 debug!(
767 "SessionClose({}, {})",
768 session_context.id, session_context.address,
769 );
770 let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
771 reg.remove_feeler(&session_context.address);
773 reg.remove_peer(session_context.id).is_some()
774 });
775 if peer_exists {
776 debug!(
777 "{} closed. Remove {} from peer_registry",
778 session_context.id, session_context.address,
779 );
780 self.network_state.with_peer_store_mut(|peer_store| {
781 peer_store.remove_disconnected_peer(&session_context.address);
782 });
783 }
784 self.network_state
785 .observed_addrs
786 .write()
787 .remove(&session_context.id);
788 }
789 _ => {
790 info!("p2p service event: {:?}", event);
791 }
792 }
793 }
794}
795
/// A configured but not yet running network: the p2p service plus the futures of its
/// background services. Built by `NetworkService::new`, consumed by `start`.
pub struct NetworkService {
    p2p_service: Service<EventHandler, SecioKeyPair>,
    network_state: Arc<NetworkState>,
    /// Present only when the Ping protocol is enabled in `support_protocols`.
    ping_controller: Option<Sender<()>>,
    /// Background futures: peer-store dumping, protocol type checking, and, when enabled,
    /// outbound peer management and DNS seeding.
    bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
    /// Second element of the `identify_announce` tuple given to `new`.
    version: String,
}
805
806impl NetworkService {
    /// Assembles the p2p service and its background services without starting anything.
    ///
    /// * `network_state` — shared state; protocol handlers and background services each get
    ///   an `Arc` clone of it.
    /// * `protocols` — CKB protocols to register, each converted with `CKBProtocol::build`.
    /// * `required_protocol_ids` — handed to `ProtocolTypeCheckerService`.
    /// * `identify_announce` — `.0`, `.1` and `.2` are passed to `IdentifyCallback::new`;
    ///   `.1` is also kept as the service's `version`.
    /// * `transport_type` — forwarded to `OutboundPeerService`.
    ///
    /// Identify is always registered. Ping, Discovery, Feeler, DisconnectMessage and (not on
    /// wasm) HolePunching are registered only when listed in `config.support_protocols`.
    pub fn new(
        network_state: Arc<NetworkState>,
        protocols: Vec<CKBProtocol>,
        required_protocol_ids: Vec<ProtocolId>,
        identify_announce: (String, String, Flags),
        transport_type: TransportType,
    ) -> Self {
        let config = &network_state.config;

        // Warn when the configured protocol set differs from the default "all protocols" set.
        if config.support_protocols.iter().collect::<HashSet<_>>()
            != default_support_all_protocols()
                .iter()
                .collect::<HashSet<_>>()
        {
            warn!(
                "Customized supported protocols: {:?}",
                config.support_protocols
            );
        }

        let mut protocol_metas = protocols
            .into_iter()
            .map(CKBProtocol::build)
            .collect::<Vec<_>>();

        // Identify is registered unconditionally.
        let identify_callback = IdentifyCallback::new(
            Arc::clone(&network_state),
            identify_announce.0,
            identify_announce.1.clone(),
            identify_announce.2,
        );
        let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
            ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
        });
        protocol_metas.push(identify_meta);

        // Ping also yields the controller stored in `NetworkService::ping_controller`.
        let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
            let ping_interval = Duration::from_secs(config.ping_interval_secs);
            let ping_timeout = Duration::from_secs(config.ping_timeout_secs);

            let ping_network_state = Arc::clone(&network_state);
            let (ping_handler, ping_controller) =
                PingHandler::new(ping_interval, ping_timeout, ping_network_state);
            let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
                ProtocolHandle::Callback(Box::new(ping_handler))
            });
            protocol_metas.push(ping_meta);
            Some(ping_controller)
        } else {
            None
        };

        if config
            .support_protocols
            .contains(&SupportProtocol::Discovery)
        {
            let addr_mgr = DiscoveryAddressManager {
                network_state: Arc::clone(&network_state),
                discovery_local_address: config.discovery_local_address,
            };
            let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
                ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
                    addr_mgr,
                    config
                        .discovery_announce_check_interval_secs
                        .map(Duration::from_secs),
                )))
            });
            protocol_metas.push(disc_meta);
        }

        if config.support_protocols.contains(&SupportProtocol::Feeler) {
            let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
                let network_state = Arc::clone(&network_state);
                move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
            });
            protocol_metas.push(feeler_meta);
        }

        if config
            .support_protocols
            .contains(&SupportProtocol::DisconnectMessage)
        {
            let disconnect_message_state = Arc::clone(&network_state);
            let disconnect_message_meta = SupportProtocols::DisconnectMessage
                .build_meta_with_service_handle(move || {
                    ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
                        disconnect_message_state,
                    )))
                });
            protocol_metas.push(disconnect_message_meta);
        }

        // Hole punching is only available on native targets.
        #[cfg(not(target_family = "wasm"))]
        if config
            .support_protocols
            .contains(&SupportProtocol::HolePunching)
        {
            let hole_punching_state = Arc::clone(&network_state);
            let hole_punching_meta =
                SupportProtocols::HolePunching.build_meta_with_service_handle(move || {
                    ProtocolHandle::Callback(Box::new(
                        crate::protocols::hole_punching::HolePunching::new(hole_punching_state),
                    ))
                });
            protocol_metas.push(hole_punching_meta);
        }

        let mut service_builder = ServiceBuilder::default();
        // The yamux stream limit is set to the number of registered protocols.
        let yamux_config = YamuxConfig {
            max_stream_count: protocol_metas.len(),
            max_stream_window_size: 1024 * 1024,
            ..Default::default()
        };
        // Remember (id, name, versions) of every protocol in the shared state, then register it.
        for meta in protocol_metas.into_iter() {
            network_state
                .protocols
                .write()
                .push((meta.id(), meta.name(), meta.support_versions()));
            service_builder = service_builder.insert_protocol(meta);
        }
        let event_handler = EventHandler {
            network_state: Arc::clone(&network_state),
        };
        service_builder = service_builder
            .handshake_type(network_state.local_private_key.clone().into())
            .yamux_config(yamux_config)
            .forever(true)
            .max_connection_number(1024)
            .set_send_buffer_size(config.max_send_buffer())
            .set_channel_size(config.channel_size())
            .timeout(Duration::from_secs(5));

        #[cfg(not(target_family = "wasm"))]
        {
            service_builder = service_builder.upnp(config.upnp);
        }

        // On Linux with `reuse_port_on_linux`, install at most one TCP and one WS socket
        // transformer. Each sets SO_REUSEADDR (and SO_REUSEPORT where supported) and binds
        // dialing sockets to the first matching listen address when the address family matches.
        #[cfg(target_os = "linux")]
        let p2p_service = {
            if config.reuse_port_on_linux {
                let iter = config.listen_addresses.iter();

                // Tracks which transports already have a transformer installed.
                #[derive(Clone, Copy, Debug, Eq, PartialEq)]
                enum BindType {
                    None,
                    Ws,
                    Tcp,
                    Both,
                }
                impl BindType {
                    fn transform(&mut self, other: TransportType) {
                        match (&self, other) {
                            (BindType::None, TransportType::Ws) => *self = BindType::Ws,
                            (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
                            (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
                            (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
                            _ => (),
                        }
                    }

                    fn is_ready(&self) -> bool {
                        matches!(self, BindType::Both)
                    }
                }

                let mut init = BindType::None;

                let bind_fn_with_addr =
                    move |socket: p2p::service::TcpSocket,
                          ctxt: p2p::service::TransformerContext,
                          addr: std::net::SocketAddr| {
                        let socket_ref = socket2::SockRef::from(&socket);
                        #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
                        socket_ref.set_reuse_port(true)?;
                        socket_ref.set_reuse_address(true)?;
                        match ctxt.state {
                            p2p::service::SocketState::Listen => Ok(socket),
                            p2p::service::SocketState::Dial => {
                                // Only bind when the dial socket's family matches the listen
                                // address, e.g. do not bind an IPv6 socket to an IPv4 address.
                                let domain = socket2::Domain::for_address(addr);
                                if socket_ref.domain()? == domain {
                                    socket_ref.bind(&addr.into())?;
                                }
                                Ok(socket)
                            }
                        }
                    };

                for multi_addr in iter {
                    if init.is_ready() {
                        break;
                    }
                    match find_type(multi_addr) {
                        TransportType::Tcp => {
                            // A TCP transformer is already installed.
                            if matches!(init, BindType::Tcp) {
                                continue;
                            }
                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
                                let bind_fn = move |socket: p2p::service::TcpSocket,
                                                    ctxt: p2p::service::TransformerContext| {
                                    bind_fn_with_addr(socket, ctxt, addr)
                                };
                                init.transform(TransportType::Tcp);
                                service_builder = service_builder.tcp_config(bind_fn);
                            }
                        }
                        TransportType::Ws | TransportType::Wss => {
                            // A WS transformer is already installed.
                            if matches!(init, BindType::Ws) {
                                continue;
                            }
                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
                                let bind_fn = move |socket: p2p::service::TcpSocket,
                                                    ctxt: p2p::service::TransformerContext| {
                                    bind_fn_with_addr(socket, ctxt, addr)
                                };
                                init.transform(TransportType::Ws);
                                service_builder = service_builder.tcp_config_on_ws(bind_fn);
                            }
                        }
                    }
                }
            }

            service_builder.build(event_handler)
        };

        #[cfg(not(target_os = "linux"))]
        let p2p_service = service_builder.build(event_handler);

        let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
        let protocol_type_checker_service = ProtocolTypeCheckerService::new(
            Arc::clone(&network_state),
            p2p_service.control().to_owned().into(),
            required_protocol_ids,
        );
        let mut bg_services = vec![
            Box::pin(dump_peer_store_service) as Pin<Box<_>>,
            Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
        ];
        if config.outbound_peer_service_enabled() {
            let outbound_peer_service = OutboundPeerService::new(
                Arc::clone(&network_state),
                p2p_service.control().to_owned().into(),
                Duration::from_secs(config.connect_outbound_interval_secs),
                transport_type,
            );
            bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
        };

        #[cfg(feature = "with_dns_seeding")]
        if config.dns_seeding_service_enabled() {
            let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
                Arc::clone(&network_state),
                config.dns_seeds.clone(),
            );
            bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
        };

        NetworkService {
            p2p_service,
            network_state,
            ping_controller,
            bg_services,
            version: identify_announce.1,
        }
    }
1094
1095 pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1097 let config = self.network_state.config.clone();
1098
1099 let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1100
1101 for addr in self.network_state.config.whitelist_peers() {
1103 debug!("Dial whitelist_peers {:?}", addr);
1104 self.network_state.dial_identify(&p2p_control, addr);
1105 }
1106
1107 let target = &self.network_state.required_flags;
1108
1109 let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1112 let count = max((config.max_outbound_peers >> 1) as usize, 1);
1113 let mut addrs: Vec<_> = peer_store
1114 .fetch_addrs_to_attempt(count, *target, |_| true)
1115 .into_iter()
1116 .map(|paddr| paddr.addr)
1117 .collect();
1118 let anchors: Vec<_> = peer_store.mut_anchors().drain().collect();
1120 addrs.extend(anchors);
1121 let bootnodes = self
1123 .network_state
1124 .bootnodes
1125 .iter()
1126 .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1127 .into_iter()
1128 .cloned();
1129 addrs.extend(bootnodes);
1130 addrs
1131 });
1132
1133 for addr in bootnodes {
1135 debug!("Dial bootnode {:?}", addr);
1136 self.network_state.dial_identify(&p2p_control, addr);
1137 }
1138
1139 let Self {
1140 mut p2p_service,
1141 network_state,
1142 ping_controller,
1143 bg_services,
1144 version,
1145 } = self;
1146
1147 let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1149 .into_iter()
1150 .map(|bg_service| {
1151 let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1152 (signal_sender, (bg_service, signal_receiver))
1153 })
1154 .unzip();
1155
1156 let receiver: CancellationToken = new_tokio_exit_rx();
1157 #[cfg(not(target_family = "wasm"))]
1158 let (start_sender, start_receiver) = mpsc::channel();
1159 {
1160 #[cfg(not(target_family = "wasm"))]
1161 let network_state = Arc::clone(&network_state);
1162 let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1163 handle.spawn_task(async move {
1164 #[cfg(not(target_family = "wasm"))]
1165 {
1166 let listen_addresses = {
1167 let mut addresses = config.listen_addresses.clone();
1168 if config.reuse_tcp_with_ws {
1169 let ws_listens = addresses
1170 .iter()
1171 .cloned()
1172 .filter_map(|mut addr| {
1173 if matches!(find_type(&addr), TransportType::Tcp) {
1174 addr.push(Protocol::Ws);
1175 Some(addr)
1176 } else {
1177 None
1178 }
1179 })
1180 .collect::<Vec<_>>();
1181
1182 addresses.extend(ws_listens);
1183 }
1184 let mut addresses = addresses
1185 .into_iter()
1186 .collect::<HashSet<_>>()
1187 .into_iter()
1188 .collect::<Vec<_>>();
1189 addresses.sort_by(|a, b| {
1190 let ty_a = find_type(a);
1191 let ty_b = find_type(b);
1192
1193 ty_a.cmp(&ty_b)
1194 });
1195
1196 addresses
1197 };
1198
1199 for addr in &listen_addresses {
1200 match p2p_service.listen(addr.to_owned()).await {
1201 Ok(listen_address) => {
1202 info!("Listen on address: {}", listen_address);
1203 network_state
1204 .listened_addrs
1205 .write()
1206 .push(listen_address.clone());
1207 }
1208 Err(err) => {
1209 warn!(
1210 "Listen on address {} failed, due to error: {}",
1211 addr.clone(),
1212 err
1213 );
1214 start_sender
1215 .send(Err(Error::P2P(P2PError::Transport(err))))
1216 .expect("channel abnormal shutdown");
1217 return;
1218 }
1219 };
1220 }
1221 start_sender.send(Ok(())).unwrap();
1222 }
1223
1224 p2p::runtime::spawn(async move { p2p_service.run().await });
1225 tokio::select! {
1226 _ = receiver.cancelled() => {
1227 info!("NetworkService receive exit signal, start shutdown...");
1228 let _ = p2p_control.shutdown().await;
1229 drop(bg_signals);
1231
1232 info!("NetworkService shutdown now");
1233 },
1234 else => {
1235 let _ = p2p_control.shutdown().await;
1236 drop(bg_signals);
1238 },
1239 }
1240 });
1241 }
1242 for (mut service, mut receiver) in bg_receivers {
1243 handle.spawn_task(async move {
1244 loop {
1245 tokio::select! {
1246 _ = &mut service => {},
1247 _ = &mut receiver => break
1248 }
1249 }
1250 });
1251 }
1252 #[cfg(not(target_family = "wasm"))]
1253 if let Ok(Err(e)) = start_receiver.recv() {
1254 return Err(e);
1255 }
1256
1257 Ok(NetworkController {
1258 version,
1259 network_state,
1260 p2p_control,
1261 ping_controller,
1262 })
1263 }
1264}
1265
/// Cloneable handle to a running network service, produced by
/// `NetworkService::start`.
#[derive(Clone)]
pub struct NetworkController {
    /// Version string carried over from the `NetworkService` that was started;
    /// returned by `version()`.
    version: String,
    /// Shared network state (peer registry, peer store, config, activity flag, ...).
    network_state: Arc<NetworkState>,
    /// Synchronous control handle of the underlying p2p service.
    p2p_control: ServiceControl,
    /// Optional sender used by `ping_peers` to trigger a ping; when `None`,
    /// `ping_peers` does nothing.
    ping_controller: Option<Sender<()>>,
}
1274
1275impl NetworkController {
1276 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1278 self.network_state.public_urls(max_urls)
1279 }
1280
1281 pub fn version(&self) -> &String {
1283 &self.version
1284 }
1285
1286 pub fn node_id(&self) -> String {
1288 self.network_state.node_id()
1289 }
1290
1291 pub fn p2p_control(&self) -> &ServiceControl {
1293 &self.p2p_control
1294 }
1295
1296 pub fn add_node(&self, address: Multiaddr) {
1298 self.network_state.add_node(&self.p2p_control, address)
1299 }
1300
1301 pub fn remove_node(&self, peer_id: &PeerId) {
1303 if let Some(session_id) = self
1304 .network_state
1305 .peer_registry
1306 .read()
1307 .get_key_by_peer_id(peer_id)
1308 {
1309 if let Err(err) =
1310 disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1311 {
1312 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1313 }
1314 } else {
1315 error!("Cannot find peer {:?}", peer_id);
1316 }
1317 }
1318
1319 pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1321 self.network_state
1322 .peer_store
1323 .lock()
1324 .ban_list()
1325 .get_banned_addrs()
1326 }
1327
1328 pub fn clear_banned_addrs(&self) {
1330 self.network_state.peer_store.lock().clear_ban_list();
1331 }
1332
1333 pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1335 self.network_state
1336 .peer_store
1337 .lock()
1338 .addr_manager()
1339 .get(addr)
1340 .cloned()
1341 }
1342
1343 pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1345 self.disconnect_peers_in_ip_range(address, &ban_reason);
1346 self.network_state
1347 .peer_store
1348 .lock()
1349 .ban_network(address, ban_until, ban_reason)
1350 }
1351
1352 pub fn unban(&self, address: &IpNetwork) {
1354 self.network_state
1355 .peer_store
1356 .lock()
1357 .mut_ban_list()
1358 .unban_network(address);
1359 }
1360
1361 pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1363 self.network_state.with_peer_registry(|reg| {
1364 reg.peers()
1365 .iter()
1366 .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1367 .collect::<Vec<_>>()
1368 })
1369 }
1370
1371 pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1373 self.network_state
1374 .ban_session(&self.p2p_control, peer_index, duration, reason);
1375 }
1376
1377 fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1379 self.network_state.with_peer_registry(|reg| {
1380 reg.peers().iter().for_each(|(peer_index, peer)| {
1381 if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1382 if address.contains(addr.ip()) {
1383 let _ = disconnect_with_message(
1384 &self.p2p_control,
1385 *peer_index,
1386 &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1387 );
1388 }
1389 }
1390 })
1391 });
1392 }
1393
1394 fn try_broadcast(
1395 &self,
1396 quick: bool,
1397 target: Option<SessionId>,
1398 proto_id: ProtocolId,
1399 data: Bytes,
1400 ) -> Result<(), SendErrorKind> {
1401 let now = Instant::now();
1402 loop {
1403 let target = target
1404 .map(TargetSession::Single)
1405 .unwrap_or(TargetSession::All);
1406 let result = if quick {
1407 self.p2p_control
1408 .quick_filter_broadcast(target, proto_id, data.clone())
1409 } else {
1410 self.p2p_control
1411 .filter_broadcast(target, proto_id, data.clone())
1412 };
1413 match result {
1414 Ok(()) => {
1415 return Ok(());
1416 }
1417 Err(SendErrorKind::WouldBlock) => {
1418 if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1419 warn!("Broadcast message to {} timeout", proto_id);
1420 return Err(SendErrorKind::WouldBlock);
1421 }
1422 thread::sleep(P2P_TRY_SEND_INTERVAL);
1423 }
1424 Err(err) => {
1425 warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1426 return Err(err);
1427 }
1428 }
1429 }
1430 }
1431
1432 pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1434 self.try_broadcast(false, None, proto_id, data)
1435 }
1436
1437 pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1439 self.try_broadcast(true, None, proto_id, data)
1440 }
1441
1442 pub fn send_message_to(
1444 &self,
1445 session_id: SessionId,
1446 proto_id: ProtocolId,
1447 data: Bytes,
1448 ) -> Result<(), SendErrorKind> {
1449 self.try_broadcast(false, Some(session_id), proto_id, data)
1450 }
1451
1452 pub fn is_active(&self) -> bool {
1454 self.network_state.is_active()
1455 }
1456
1457 pub fn set_active(&self, active: bool) {
1459 self.network_state.active.store(active, Ordering::Release);
1460 }
1461
1462 pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1464 self.network_state.protocols.read().clone()
1465 }
1466
1467 pub fn ping_peers(&self) {
1469 if let Some(mut ping_controller) = self.ping_controller.clone() {
1470 let _ignore = ping_controller.try_send(());
1471 }
1472 }
1473}
1474
1475pub(crate) fn disconnect_with_message(
1477 control: &ServiceControl,
1478 peer_index: SessionId,
1479 message: &str,
1480) -> Result<(), SendErrorKind> {
1481 if !message.is_empty() {
1482 let data = Bytes::from(message.as_bytes().to_vec());
1483 control.quick_send_message_to(
1485 peer_index,
1486 SupportProtocols::DisconnectMessage.protocol_id(),
1487 data,
1488 )?;
1489 }
1490 control.disconnect(peer_index)
1491}
1492
1493pub(crate) async fn async_disconnect_with_message(
1494 control: &ServiceAsyncControl,
1495 peer_index: SessionId,
1496 message: &str,
1497) -> Result<(), SendErrorKind> {
1498 if !message.is_empty() {
1499 let data = Bytes::from(message.as_bytes().to_vec());
1500 control
1502 .quick_send_message_to(
1503 peer_index,
1504 SupportProtocols::DisconnectMessage.protocol_id(),
1505 data,
1506 )
1507 .await?;
1508 }
1509 control.disconnect(peer_index).await
1510}
1511
/// Transport kind of a multiaddr, as classified by `find_type`.
///
/// Variant order matters: the derived `Ord` (Tcp < Ws < Wss) is used to sort
/// listen addresses, so TCP listeners come before WebSocket ones.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum TransportType {
    /// No `/ws` or `/wss` component; the fallback in `find_type`.
    Tcp,
    /// The address contains a `/ws` component.
    Ws,
    /// The address contains a `/wss` component.
    Wss,
}
1522
1523#[allow(dead_code)]
1524pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1525 let mut iter = addr.iter();
1526
1527 iter.find_map(|proto| match proto {
1528 Protocol::Ws => Some(TransportType::Ws),
1529 Protocol::Wss => Some(TransportType::Wss),
1530 _ => None,
1531 })
1532 .unwrap_or(TransportType::Tcp)
1533}