1use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7 PeerStore,
8 types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11 disconnect_message::DisconnectMessageProtocol,
12 discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13 feeler::Feeler,
14 identify::{Flags, IdentifyCallback, IdentifyProtocol},
15 ping::PingHandler,
16 support_protocols::SupportProtocols,
17};
18use crate::services::{
19 dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
20 protocol_type_checker::ProtocolTypeCheckerService,
21};
22use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
23use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
24use ckb_logger::{debug, error, info, trace, warn};
25use ckb_spawn::Spawn;
26use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
27use ckb_systemtime::{Duration, Instant};
28use ckb_util::{Condvar, Mutex, RwLock};
29use futures::{Future, channel::mpsc::Sender};
30use ipnetwork::IpNetwork;
31use p2p::{
32 SessionId, async_trait,
33 builder::ServiceBuilder,
34 bytes::Bytes,
35 context::{ServiceContext, SessionContext},
36 error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
37 multiaddr::{Multiaddr, Protocol},
38 secio::{self, PeerId, SecioKeyPair, error::SecioError},
39 service::{
40 ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
41 TargetSession,
42 },
43 traits::ServiceHandle,
44 utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
45 yamux::config::Config as YamuxConfig,
46};
47use rand::prelude::IteratorRandom;
48#[cfg(feature = "with_sentry")]
49use sentry::{Level, capture_message, with_scope};
50#[cfg(not(target_family = "wasm"))]
51use std::sync::mpsc;
52use std::{
53 borrow::Cow,
54 cmp::max,
55 collections::{HashMap, HashSet},
56 pin::Pin,
57 sync::{
58 Arc,
59 atomic::{AtomicBool, Ordering},
60 },
61 thread,
62};
63use tokio::{self, sync::oneshot};
64
65const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
66const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
67const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
69
70pub struct NetworkState {
72 pub(crate) peer_registry: RwLock<PeerRegistry>,
73 pub(crate) peer_store: Mutex<PeerStore>,
74 pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
76 dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
77 public_addrs: HashSet<Multiaddr>,
79 observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
80 local_private_key: secio::SecioKeyPair,
81 local_peer_id: PeerId,
82 pub(crate) bootnodes: Vec<Multiaddr>,
83 pub(crate) config: NetworkConfig,
84 pub(crate) active: AtomicBool,
85 pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
88 pub(crate) required_flags: Flags,
89}
90
91impl NetworkState {
92 #[cfg(not(target_family = "wasm"))]
94 pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
95 config.create_dir_if_not_exists()?;
96 let local_private_key = config.fetch_private_key()?;
97 let local_peer_id = local_private_key.peer_id();
98 let public_addrs: HashSet<Multiaddr> = config
100 .listen_addresses
101 .iter()
102 .chain(config.public_addresses.iter())
103 .cloned()
104 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
105 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
106 _ => {
107 match extract_peer_id(&addr) {
108 Some(peer_id) if peer_id != local_peer_id => {
109 error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
110 std::process::exit(1);
111 }
112 Some(_) => (),
113 None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
114 }
115 Some(addr)
116 }
117 })
118 .collect();
119 info!("Loading the peer store. This process may take a few seconds to complete.");
120
121 let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
122 config.peer_store_path(),
123 ));
124 let bootnodes = config.bootnodes();
125
126 let peer_registry = PeerRegistry::new(
127 config.max_inbound_peers(),
128 config.max_outbound_peers(),
129 config.whitelist_only,
130 config.whitelist_peers(),
131 config.disable_block_relay_only_connection,
132 );
133
134 Ok(NetworkState {
135 peer_store,
136 config,
137 bootnodes,
138 peer_registry: RwLock::new(peer_registry),
139 dialing_addrs: RwLock::new(HashMap::default()),
140 public_addrs,
141 listened_addrs: RwLock::new(Vec::new()),
142 observed_addrs: RwLock::new(HashMap::default()),
143 local_private_key,
144 local_peer_id,
145 active: AtomicBool::new(true),
146 protocols: RwLock::new(Vec::new()),
147 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
148 })
149 }
150
151 #[cfg(target_family = "wasm")]
152 pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
153 let local_private_key = config.fetch_private_key()?;
154 let local_peer_id = local_private_key.peer_id();
155 let public_addrs: HashSet<Multiaddr> = config
157 .listen_addresses
158 .iter()
159 .chain(config.public_addresses.iter())
160 .cloned()
161 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
162 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
163 _ => {
164 if extract_peer_id(&addr).is_none() {
165 addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
166 }
167 Some(addr)
168 }
169 })
170 .collect();
171 info!("Loading the peer store. This process may take a few seconds to complete.");
172 let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
173 let bootnodes = config.bootnodes();
174
175 let peer_registry = PeerRegistry::new(
176 config.max_inbound_peers(),
177 config.max_outbound_peers(),
178 config.whitelist_only,
179 config.whitelist_peers(),
180 config.disable_block_relay_only_connection,
181 );
182 Ok(NetworkState {
183 peer_store,
184 config,
185 bootnodes,
186 peer_registry: RwLock::new(peer_registry),
187 dialing_addrs: RwLock::new(HashMap::default()),
188 public_addrs,
189 listened_addrs: RwLock::new(Vec::new()),
190 observed_addrs: RwLock::new(HashMap::default()),
191 local_private_key,
192 local_peer_id,
193 active: AtomicBool::new(true),
194 protocols: RwLock::new(Vec::new()),
195 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
196 })
197 }
198
199 pub fn required_flags(mut self, flags: Flags) -> Self {
202 self.required_flags = flags;
203 self
204 }
205
206 pub(crate) fn report_session(
207 &self,
208 p2p_control: &ServiceControl,
209 session_id: SessionId,
210 behaviour: Behaviour,
211 ) {
212 if let Some(addr) = self.with_peer_registry(|reg| {
213 reg.get_peer(session_id)
214 .filter(|peer| !peer.is_whitelist)
215 .map(|peer| peer.connected_addr.clone())
216 }) {
217 trace!("Report {:?} because {:?}", addr, behaviour);
218 let report_result = self.peer_store.lock().report(&addr, behaviour);
219 if report_result.is_banned() {
220 if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
221 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
222 }
223 }
224 } else {
225 debug!(
226 "Report {} failure: not found in peer registry or it is on the whitelist",
227 session_id
228 );
229 }
230 }
231
232 pub(crate) fn ban_session(
233 &self,
234 p2p_control: &ServiceControl,
235 session_id: SessionId,
236 duration: Duration,
237 reason: String,
238 ) {
239 if let Some(addr) = self.with_peer_registry(|reg| {
240 reg.get_peer(session_id)
241 .filter(|peer| !peer.is_whitelist)
242 .map(|peer| peer.connected_addr.clone())
243 }) {
244 info!(
245 "Ban peer {:?} for {} seconds, reason: {}",
246 addr,
247 duration.as_secs(),
248 reason
249 );
250 if let Some(metrics) = ckb_metrics::handle() {
251 metrics.ckb_network_ban_peer.inc();
252 }
253 if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
254 let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
255 self.peer_store.lock().ban_addr(
256 &peer.connected_addr,
257 duration.as_millis() as u64,
258 reason,
259 );
260 if let Err(err) =
261 disconnect_with_message(p2p_control, peer.session_id, message.as_str())
262 {
263 debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
264 }
265 }
266 } else {
267 debug!(
268 "Ban session({}) failed: not found in peer registry or it is on the whitelist",
269 session_id
270 );
271 }
272 }
273
274 pub(crate) fn accept_peer(
275 &self,
276 session_context: &SessionContext,
277 ) -> Result<Option<Peer>, Error> {
278 let mut peer_store = self.peer_store.lock();
281 let accept_peer_result = {
282 self.peer_registry.write().accept_peer(
283 session_context.address.clone(),
284 session_context.id,
285 session_context.ty,
286 &mut peer_store,
287 )
288 };
289 accept_peer_result
290 }
291
292 pub fn with_peer_registry<F, T>(&self, callback: F) -> T
294 where
295 F: FnOnce(&PeerRegistry) -> T,
296 {
297 callback(&self.peer_registry.read())
298 }
299
300 pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
302 where
303 F: FnOnce(&mut PeerRegistry) -> T,
304 {
305 callback(&mut self.peer_registry.write())
306 }
307
308 pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
310 where
311 F: FnOnce(&mut PeerStore) -> T,
312 {
313 callback(&mut self.peer_store.lock())
314 }
315
316 pub fn local_peer_id(&self) -> &PeerId {
318 &self.local_peer_id
319 }
320
321 pub fn local_private_key(&self) -> &secio::SecioKeyPair {
323 &self.local_private_key
324 }
325
326 pub fn node_id(&self) -> String {
328 self.local_peer_id().to_base58()
329 }
330
331 pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
332 if self.public_addrs.len() <= count {
333 return self.public_addrs.iter().cloned().collect();
334 } else {
335 self.public_addrs
336 .iter()
337 .cloned()
338 .choose_multiple(&mut rand::thread_rng(), count)
339 }
340 }
341
342 pub(crate) fn connection_status(&self) -> ConnectionStatus {
343 self.peer_registry.read().connection_status()
344 }
345
346 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
348 let listened_addrs = self.listened_addrs.read();
349 self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
350 .into_iter()
351 .filter_map(|addr| {
352 if !listened_addrs.contains(&addr) {
353 Some((addr, 1))
354 } else {
355 None
356 }
357 })
358 .chain(listened_addrs.iter().map(|addr| (addr.to_owned(), 1)))
359 .map(|(addr, score)| (addr.to_string(), score))
360 .collect()
361 }
362
363 pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
364 self.dial_identify(p2p_control, address);
365 }
366
367 pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
369 self.protocols
370 .read()
371 .iter()
372 .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
373 .collect::<Vec<_>>()
374 }
375
376 pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
377 let peer_id = extract_peer_id(addr);
378 if peer_id.is_none() {
379 error!("Do not dial addr without peer id, addr: {}", addr);
380 return false;
381 }
382 let peer_id = peer_id.as_ref().unwrap();
383
384 if self.local_peer_id() == peer_id {
385 trace!("Do not dial self: {:?}, {}", peer_id, addr);
386 return false;
387 }
388 if self.public_addrs.contains(addr) {
389 trace!(
390 "Do not dial listened address(self): {:?}, {}",
391 peer_id, addr
392 );
393 return false;
394 }
395
396 let peer_in_registry = self.with_peer_registry(|reg| {
397 reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
398 });
399 if peer_in_registry {
400 trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
401 return false;
402 }
403
404 if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
405 trace!(
406 "Do not send repeated dial commands to network service: {:?}, {}",
407 peer_id, addr
408 );
409 if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
410 #[cfg(feature = "with_sentry")]
411 with_scope(
412 |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
413 || {
414 capture_message(
415 &format!(
416 "Dialing {:?}, {:?} for more than {} seconds, \
417 something is wrong in network service",
418 peer_id,
419 addr,
420 DIAL_HANG_TIMEOUT.as_secs(),
421 ),
422 Level::Warning,
423 )
424 },
425 );
426 }
427 return false;
428 }
429
430 true
431 }
432
433 pub(crate) fn dial_success(&self, addr: &Multiaddr) {
434 if let Some(peer_id) = extract_peer_id(addr) {
435 self.dialing_addrs.write().remove(&peer_id);
436 }
437 }
438
439 pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
440 self.with_peer_registry_mut(|reg| {
441 reg.remove_feeler(addr);
442 });
443
444 if let Some(peer_id) = extract_peer_id(addr) {
445 self.dialing_addrs.write().remove(&peer_id);
446 }
447 }
448
449 fn dial_inner(
452 &self,
453 p2p_control: &ServiceControl,
454 addr: Multiaddr,
455 target: TargetProtocol,
456 ) -> Result<(), Error> {
457 if !self.can_dial(&addr) {
458 return Err(Error::Dial(format!("ignore dialing addr {addr}")));
459 }
460
461 debug!("Dialing {addr}");
462 p2p_control.dial(addr.clone(), target)?;
463 self.dialing_addrs.write().insert(
464 extract_peer_id(&addr).expect("verified addr"),
465 Instant::now(),
466 );
467 Ok(())
468 }
469
470 pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
472 if let Err(err) = self.dial_inner(
473 p2p_control,
474 addr,
475 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
476 ) {
477 debug!("dial_identify error: {err}");
478 }
479 }
480
481 pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
483 if let Err(err) = self.dial_inner(
484 p2p_control,
485 addr.clone(),
486 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
487 ) {
488 debug!("dial_feeler error {err}");
489 } else {
490 self.with_peer_registry_mut(|reg| {
491 reg.add_feeler(&addr);
492 });
493 }
494 }
495
496 pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
498 let mut pending_observed_addrs = self.observed_addrs.write();
499 pending_observed_addrs.insert(session_id, addr);
500 }
501
502 pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
504 let observed_addrs = self
505 .observed_addrs
506 .read()
507 .values()
508 .cloned()
509 .collect::<HashSet<_>>();
510 if observed_addrs.len() <= count {
511 return observed_addrs.into_iter().collect();
512 } else {
513 observed_addrs
514 .into_iter()
515 .choose_multiple(&mut rand::thread_rng(), count)
516 }
517 }
518
519 pub fn is_active(&self) -> bool {
521 self.active.load(Ordering::Acquire)
522 }
523}
524
525pub struct EventHandler {
527 pub(crate) network_state: Arc<NetworkState>,
528}
529
530impl EventHandler {
531 pub fn new(network_state: Arc<NetworkState>) -> Self {
533 Self { network_state }
534 }
535}
536
537pub trait ExitHandler: Send + Unpin + 'static {
539 fn notify_exit(&self);
541}
542
543#[derive(Clone, Default)]
545pub struct DefaultExitHandler {
546 lock: Arc<Mutex<()>>,
547 exit: Arc<Condvar>,
548}
549
550impl DefaultExitHandler {
551 pub fn wait_for_exit(&self) {
553 self.exit.wait(&mut self.lock.lock());
554 }
555}
556
557impl ExitHandler for DefaultExitHandler {
558 fn notify_exit(&self) {
559 self.exit.notify_all();
560 }
561}
562
563impl EventHandler {
564 fn inbound_eviction(&self) -> Vec<PeerIndex> {
565 if self.network_state.config.bootnode_mode {
566 let status = self.network_state.connection_status();
567
568 if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
569 self.network_state
570 .with_peer_registry(|registry| {
571 registry
572 .peers()
573 .values()
574 .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
575 .map(|peer| peer.session_id)
576 .collect::<Vec<SessionId>>()
577 })
578 .into_iter()
579 .enumerate()
580 .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
581 .collect()
582 } else {
583 Vec::new()
584 }
585 } else {
586 Vec::new()
587 }
588 }
589}
590
591#[async_trait]
592impl ServiceHandle for EventHandler {
593 async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
594 match error {
595 ServiceError::DialerError { address, error } => {
596 match error {
597 DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
598 SecioError::ConnectSelf,
599 )) => {
600 debug!("dial observed address success: {:?}", address);
601 }
602 DialerErrorKind::IoError(e)
603 if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
604 {
605 warn!("DialerError({}) {}", address, e);
606 }
607 _ => {
608 debug!("DialerError({}) {}", address, error);
609 }
610 }
611 self.network_state.dial_failed(&address);
612 }
613 ServiceError::ProtocolError {
614 id,
615 proto_id,
616 error,
617 } => {
618 debug!("ProtocolError({}, {}) {}", id, proto_id, error);
619 let message = format!("ProtocolError id={proto_id}");
620 self.network_state.ban_session(
622 &context.control().clone().into(),
623 id,
624 Duration::from_secs(300),
625 message,
626 );
627 }
628 ServiceError::SessionTimeout { session_context } => {
629 debug!(
630 "SessionTimeout({}, {})",
631 session_context.id, session_context.address,
632 );
633 }
634 ServiceError::MuxerError {
635 session_context,
636 error,
637 } => {
638 debug!(
639 "MuxerError({}, {}), substream error {}, disconnect it",
640 session_context.id, session_context.address, error,
641 );
642 }
643 ServiceError::ListenError { address, error } => {
644 debug!("ListenError: address={:?}, error={:?}", address, error);
645 }
646 ServiceError::ProtocolSelectError {
647 proto_name,
648 session_context,
649 } => {
650 debug!(
651 "ProtocolSelectError: proto_name={:?}, session_id={}",
652 proto_name, session_context.id,
653 );
654 }
655 ServiceError::SessionBlocked { session_context } => {
656 debug!("SessionBlocked: {}", session_context.id);
657 }
658 ServiceError::ProtocolHandleError { proto_id, error } => {
659 debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
660
661 let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
662 {
663 if let Some(id) = opt_session_id {
664 self.network_state.ban_session(
665 &context.control().clone().into(),
666 id,
667 Duration::from_secs(300),
668 format!("protocol {proto_id} panic when process peer message"),
669 );
670 }
671 #[cfg(feature = "with_sentry")]
672 with_scope(
673 |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
674 || {
675 capture_message(
676 &format!(
677 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
678 ),
679 Level::Warning,
680 )
681 },
682 );
683 error!(
684 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
685 );
686
687 broadcast_exit_signals();
688 }
689 }
690 }
691 }
692
693 async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
694 match event {
696 ServiceEvent::SessionOpen { session_context } => {
697 debug!(
698 "SessionOpen({}, {})",
699 session_context.id, session_context.address,
700 );
701
702 self.network_state.dial_success(&session_context.address);
703
704 let iter = self.inbound_eviction();
705
706 let control = context.control().clone().into();
707
708 for peer in iter {
709 if let Err(err) =
710 disconnect_with_message(&control, peer, "bootnode random eviction")
711 {
712 debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
713 }
714 }
715
716 if self
717 .network_state
718 .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
719 {
720 debug!(
721 "Feeler connected {} => {}",
722 session_context.id, session_context.address,
723 );
724 } else {
725 match self.network_state.accept_peer(&session_context) {
726 Ok(Some(evicted_peer)) => {
727 debug!(
728 "Disconnect peer, {} => {}",
729 evicted_peer.session_id, evicted_peer.connected_addr,
730 );
731 if let Err(err) = disconnect_with_message(
732 &control,
733 evicted_peer.session_id,
734 "evict because accepted better peer",
735 ) {
736 debug!(
737 "Disconnect failed {:?}, error: {:?}",
738 evicted_peer.session_id, err
739 );
740 }
741 }
742 Ok(None) => debug!(
743 "{} open, registry {} success",
744 session_context.id, session_context.address,
745 ),
746 Err(err) => {
747 debug!(
748 "Peer registry failed {:?}. Disconnect {} => {}",
749 err, session_context.id, session_context.address,
750 );
751 if let Err(err) = disconnect_with_message(
752 &control,
753 session_context.id,
754 "reject peer connection",
755 ) {
756 debug!(
757 "Disconnect failed {:?}, error: {:?}",
758 session_context.id, err
759 );
760 }
761 }
762 }
763 }
764 }
765 ServiceEvent::SessionClose { session_context } => {
766 debug!(
767 "SessionClose({}, {})",
768 session_context.id, session_context.address,
769 );
770 let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
771 reg.remove_feeler(&session_context.address);
773 reg.remove_peer(session_context.id).is_some()
774 });
775 if peer_exists {
776 debug!(
777 "{} closed. Remove {} from peer_registry",
778 session_context.id, session_context.address,
779 );
780 self.network_state.with_peer_store_mut(|peer_store| {
781 peer_store.remove_disconnected_peer(&session_context.address);
782 });
783 }
784 self.network_state
785 .observed_addrs
786 .write()
787 .remove(&session_context.id);
788 }
789 _ => {
790 info!("p2p service event: {:?}", event);
791 }
792 }
793 }
794}
795
796pub struct NetworkService {
798 p2p_service: Service<EventHandler, SecioKeyPair>,
799 network_state: Arc<NetworkState>,
800 ping_controller: Option<Sender<()>>,
801 bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
803 version: String,
804}
805
806impl NetworkService {
807 pub fn new(
809 network_state: Arc<NetworkState>,
810 protocols: Vec<CKBProtocol>,
811 required_protocol_ids: Vec<ProtocolId>,
812 identify_announce: (String, String, Flags),
814 transport_type: TransportType,
815 ) -> Self {
816 let config = &network_state.config;
817
818 if config.support_protocols.iter().collect::<HashSet<_>>()
819 != default_support_all_protocols()
820 .iter()
821 .collect::<HashSet<_>>()
822 {
823 warn!(
824 "Customized supported protocols: {:?}",
825 config.support_protocols
826 );
827 }
828
829 let mut protocol_metas = protocols
831 .into_iter()
832 .map(CKBProtocol::build)
833 .collect::<Vec<_>>();
834
835 let identify_callback = IdentifyCallback::new(
839 Arc::clone(&network_state),
840 identify_announce.0,
841 identify_announce.1.clone(),
842 identify_announce.2,
843 );
844 let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
845 ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
846 });
847 protocol_metas.push(identify_meta);
848
849 let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
851 let ping_interval = Duration::from_secs(config.ping_interval_secs);
852 let ping_timeout = Duration::from_secs(config.ping_timeout_secs);
853
854 let ping_network_state = Arc::clone(&network_state);
855 let (ping_handler, ping_controller) =
856 PingHandler::new(ping_interval, ping_timeout, ping_network_state);
857 let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
858 ProtocolHandle::Callback(Box::new(ping_handler))
859 });
860 protocol_metas.push(ping_meta);
861 Some(ping_controller)
862 } else {
863 None
864 };
865
866 if config
868 .support_protocols
869 .contains(&SupportProtocol::Discovery)
870 {
871 let addr_mgr = DiscoveryAddressManager {
872 network_state: Arc::clone(&network_state),
873 discovery_local_address: config.discovery_local_address,
874 };
875 let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
876 ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
877 addr_mgr,
878 config
879 .discovery_announce_check_interval_secs
880 .map(Duration::from_secs),
881 )))
882 });
883 protocol_metas.push(disc_meta);
884 }
885
886 if config.support_protocols.contains(&SupportProtocol::Feeler) {
888 let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
889 let network_state = Arc::clone(&network_state);
890 move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
891 });
892 protocol_metas.push(feeler_meta);
893 }
894
895 if config
897 .support_protocols
898 .contains(&SupportProtocol::DisconnectMessage)
899 {
900 let disconnect_message_state = Arc::clone(&network_state);
901 let disconnect_message_meta = SupportProtocols::DisconnectMessage
902 .build_meta_with_service_handle(move || {
903 ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
904 disconnect_message_state,
905 )))
906 });
907 protocol_metas.push(disconnect_message_meta);
908 }
909
910 #[cfg(not(target_family = "wasm"))]
912 if config
913 .support_protocols
914 .contains(&SupportProtocol::HolePunching)
915 {
916 let hole_punching_state = Arc::clone(&network_state);
917 let hole_punching_meta =
918 SupportProtocols::HolePunching.build_meta_with_service_handle(move || {
919 ProtocolHandle::Callback(Box::new(
920 crate::protocols::hole_punching::HolePunching::new(hole_punching_state),
921 ))
922 });
923 protocol_metas.push(hole_punching_meta);
924 }
925
926 let mut service_builder = ServiceBuilder::default();
927 let yamux_config = YamuxConfig {
928 max_stream_count: protocol_metas.len(),
929 max_stream_window_size: 1024 * 1024,
930 ..Default::default()
931 };
932 for meta in protocol_metas.into_iter() {
933 network_state
934 .protocols
935 .write()
936 .push((meta.id(), meta.name(), meta.support_versions()));
937 service_builder = service_builder.insert_protocol(meta);
938 }
939 let event_handler = EventHandler {
940 network_state: Arc::clone(&network_state),
941 };
942 service_builder = service_builder
943 .handshake_type(network_state.local_private_key.clone().into())
944 .yamux_config(yamux_config)
945 .forever(true)
946 .max_connection_number(1024)
947 .set_send_buffer_size(config.max_send_buffer())
948 .set_channel_size(config.channel_size())
949 .timeout(Duration::from_secs(5));
950
951 #[cfg(not(target_family = "wasm"))]
952 {
953 service_builder = service_builder.upnp(config.upnp);
954 }
955
956 #[cfg(target_os = "linux")]
957 let p2p_service = {
958 if config.reuse_port_on_linux {
959 let iter = config.listen_addresses.iter();
960
961 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
962 enum BindType {
963 None,
964 Ws,
965 Tcp,
966 Both,
967 }
968 impl BindType {
969 fn transform(&mut self, other: TransportType) {
970 match (&self, other) {
971 (BindType::None, TransportType::Ws) => *self = BindType::Ws,
972 (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
973 (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
974 (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
975 _ => (),
976 }
977 }
978
979 fn is_ready(&self) -> bool {
980 matches!(self, BindType::Both)
982 }
983 }
984
985 let mut init = BindType::None;
986
987 for multi_addr in iter {
988 if init.is_ready() {
989 break;
990 }
991 match find_type(multi_addr) {
992 TransportType::Tcp => {
993 if matches!(init, BindType::Tcp) {
995 continue;
996 }
997 if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
998 let domain = socket2::Domain::for_address(addr);
999 let bind_fn = move |socket: p2p::service::TcpSocket, ctxt: p2p::service::TransformerContext| {
1000 let socket_ref = socket2::SockRef::from(&socket);
1001 #[cfg(all(
1002 unix,
1003 not(target_os = "solaris"),
1004 not(target_os = "illumos")
1005 ))]
1006 socket_ref.set_reuse_port(true)?;
1007 socket_ref.set_reuse_address(true)?;
1008 match ctxt.state {
1009 p2p::service::SocketState::Listen => {
1010 Ok(socket)
1011 }
1012 p2p::service::SocketState::Dial => {
1013 if socket_ref.domain()? == domain {
1014 socket_ref.bind(&addr.into())?;
1015 }
1016 Ok(socket)
1017 }
1018 }
1019 };
1020 init.transform(TransportType::Tcp);
1021 service_builder = service_builder.tcp_config(bind_fn);
1022 }
1023 }
1024 TransportType::Ws | TransportType::Wss => {
1025 if matches!(init, BindType::Ws) {
1027 continue;
1028 }
1029 if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
1030 let domain = socket2::Domain::for_address(addr);
1031 let bind_fn = move |socket: p2p::service::TcpSocket, ctxt: p2p::service::TransformerContext| {
1032 let socket_ref = socket2::SockRef::from(&socket);
1033 #[cfg(all(
1034 unix,
1035 not(target_os = "solaris"),
1036 not(target_os = "illumos")
1037 ))]
1038 socket_ref.set_reuse_port(true)?;
1039 socket_ref.set_reuse_address(true)?;
1040 match ctxt.state {
1041 p2p::service::SocketState::Listen => {
1042 Ok(socket)
1043 }
1044 p2p::service::SocketState::Dial => {
1045 if socket_ref.domain()? == domain {
1046 socket_ref.bind(&addr.into())?;
1047 }
1048 Ok(socket)
1049 }
1050 }
1051 };
1052 init.transform(TransportType::Ws);
1053 service_builder = service_builder.tcp_config_on_ws(bind_fn);
1054 }
1055 }
1056 }
1057 }
1058 }
1059
1060 service_builder.build(event_handler)
1061 };
1062
1063 #[cfg(not(target_os = "linux"))]
1064 let p2p_service = service_builder.build(event_handler);
1071
1072 let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
1074 let protocol_type_checker_service = ProtocolTypeCheckerService::new(
1075 Arc::clone(&network_state),
1076 p2p_service.control().to_owned().into(),
1077 required_protocol_ids,
1078 );
1079 let mut bg_services = vec![
1080 Box::pin(dump_peer_store_service) as Pin<Box<_>>,
1081 Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
1082 ];
1083 if config.outbound_peer_service_enabled() {
1084 let outbound_peer_service = OutboundPeerService::new(
1085 Arc::clone(&network_state),
1086 p2p_service.control().to_owned().into(),
1087 Duration::from_secs(config.connect_outbound_interval_secs),
1088 transport_type,
1089 );
1090 bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
1091 };
1092
1093 #[cfg(feature = "with_dns_seeding")]
1094 if config.dns_seeding_service_enabled() {
1095 let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
1096 Arc::clone(&network_state),
1097 config.dns_seeds.clone(),
1098 );
1099 bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
1100 };
1101
1102 NetworkService {
1103 p2p_service,
1104 network_state,
1105 ping_controller,
1106 bg_services,
1107 version: identify_announce.1,
1108 }
1109 }
1110
1111 pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1113 let config = self.network_state.config.clone();
1114
1115 let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1116
1117 for addr in self.network_state.config.whitelist_peers() {
1119 debug!("Dial whitelist_peers {:?}", addr);
1120 self.network_state.dial_identify(&p2p_control, addr);
1121 }
1122
1123 let target = &self.network_state.required_flags;
1124
1125 let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1128 let count = max((config.max_outbound_peers >> 1) as usize, 1);
1129 let mut addrs: Vec<_> = peer_store
1130 .fetch_addrs_to_attempt(count, *target, |_| true)
1131 .into_iter()
1132 .map(|paddr| paddr.addr)
1133 .collect();
1134 let anchors: Vec<_> = peer_store.mut_anchors().drain().collect();
1136 addrs.extend(anchors);
1137 let bootnodes = self
1139 .network_state
1140 .bootnodes
1141 .iter()
1142 .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1143 .into_iter()
1144 .cloned();
1145 addrs.extend(bootnodes);
1146 addrs
1147 });
1148
1149 for addr in bootnodes {
1151 debug!("Dial bootnode {:?}", addr);
1152 self.network_state.dial_identify(&p2p_control, addr);
1153 }
1154
1155 let Self {
1156 mut p2p_service,
1157 network_state,
1158 ping_controller,
1159 bg_services,
1160 version,
1161 } = self;
1162
1163 let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1165 .into_iter()
1166 .map(|bg_service| {
1167 let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1168 (signal_sender, (bg_service, signal_receiver))
1169 })
1170 .unzip();
1171
1172 let receiver: CancellationToken = new_tokio_exit_rx();
1173 #[cfg(not(target_family = "wasm"))]
1174 let (start_sender, start_receiver) = mpsc::channel();
1175 {
1176 #[cfg(not(target_family = "wasm"))]
1177 let network_state = Arc::clone(&network_state);
1178 let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1179 handle.spawn_task(async move {
1180 #[cfg(not(target_family = "wasm"))]
1181 {
1182 let listen_addresses = {
1183 let mut addresses = config.listen_addresses.clone();
1184 if config.reuse_tcp_with_ws {
1185 let ws_listens = addresses
1186 .iter()
1187 .cloned()
1188 .filter_map(|mut addr| {
1189 if matches!(find_type(&addr), TransportType::Tcp) {
1190 addr.push(Protocol::Ws);
1191 Some(addr)
1192 } else {
1193 None
1194 }
1195 })
1196 .collect::<Vec<_>>();
1197
1198 addresses.extend(ws_listens);
1199 }
1200 let mut addresses = addresses
1201 .into_iter()
1202 .collect::<HashSet<_>>()
1203 .into_iter()
1204 .collect::<Vec<_>>();
1205 addresses.sort_by(|a, b| {
1206 let ty_a = find_type(a);
1207 let ty_b = find_type(b);
1208
1209 ty_a.cmp(&ty_b)
1210 });
1211
1212 addresses
1213 };
1214
1215 for addr in &listen_addresses {
1216 match p2p_service.listen(addr.to_owned()).await {
1217 Ok(listen_address) => {
1218 info!("Listen on address: {}", listen_address);
1219 network_state
1220 .listened_addrs
1221 .write()
1222 .push(listen_address.clone());
1223 }
1224 Err(err) => {
1225 warn!(
1226 "Listen on address {} failed, due to error: {}",
1227 addr.clone(),
1228 err
1229 );
1230 start_sender
1231 .send(Err(Error::P2P(P2PError::Transport(err))))
1232 .expect("channel abnormal shutdown");
1233 return;
1234 }
1235 };
1236 }
1237 start_sender.send(Ok(())).unwrap();
1238 }
1239
1240 p2p::runtime::spawn(async move { p2p_service.run().await });
1241 tokio::select! {
1242 _ = receiver.cancelled() => {
1243 info!("NetworkService receive exit signal, start shutdown...");
1244 let _ = p2p_control.shutdown().await;
1245 drop(bg_signals);
1247
1248 info!("NetworkService shutdown now");
1249 },
1250 else => {
1251 let _ = p2p_control.shutdown().await;
1252 drop(bg_signals);
1254 },
1255 }
1256 });
1257 }
1258 for (mut service, mut receiver) in bg_receivers {
1259 handle.spawn_task(async move {
1260 loop {
1261 tokio::select! {
1262 _ = &mut service => {},
1263 _ = &mut receiver => break
1264 }
1265 }
1266 });
1267 }
1268 #[cfg(not(target_family = "wasm"))]
1269 if let Ok(Err(e)) = start_receiver.recv() {
1270 return Err(e);
1271 }
1272
1273 Ok(NetworkController {
1274 version,
1275 network_state,
1276 p2p_control,
1277 ping_controller,
1278 })
1279 }
1280}
1281
1282#[derive(Clone)]
1284pub struct NetworkController {
1285 version: String,
1286 network_state: Arc<NetworkState>,
1287 p2p_control: ServiceControl,
1288 ping_controller: Option<Sender<()>>,
1289}
1290
1291impl NetworkController {
1292 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1294 self.network_state.public_urls(max_urls)
1295 }
1296
1297 pub fn version(&self) -> &String {
1299 &self.version
1300 }
1301
1302 pub fn node_id(&self) -> String {
1304 self.network_state.node_id()
1305 }
1306
1307 pub fn p2p_control(&self) -> &ServiceControl {
1309 &self.p2p_control
1310 }
1311
1312 pub fn add_node(&self, address: Multiaddr) {
1314 self.network_state.add_node(&self.p2p_control, address)
1315 }
1316
1317 pub fn remove_node(&self, peer_id: &PeerId) {
1319 if let Some(session_id) = self
1320 .network_state
1321 .peer_registry
1322 .read()
1323 .get_key_by_peer_id(peer_id)
1324 {
1325 if let Err(err) =
1326 disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1327 {
1328 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1329 }
1330 } else {
1331 error!("Cannot find peer {:?}", peer_id);
1332 }
1333 }
1334
1335 pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1337 self.network_state
1338 .peer_store
1339 .lock()
1340 .ban_list()
1341 .get_banned_addrs()
1342 }
1343
1344 pub fn clear_banned_addrs(&self) {
1346 self.network_state.peer_store.lock().clear_ban_list();
1347 }
1348
1349 pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1351 self.network_state
1352 .peer_store
1353 .lock()
1354 .addr_manager()
1355 .get(addr)
1356 .cloned()
1357 }
1358
1359 pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1361 self.disconnect_peers_in_ip_range(address, &ban_reason);
1362 self.network_state
1363 .peer_store
1364 .lock()
1365 .ban_network(address, ban_until, ban_reason)
1366 }
1367
1368 pub fn unban(&self, address: &IpNetwork) {
1370 self.network_state
1371 .peer_store
1372 .lock()
1373 .mut_ban_list()
1374 .unban_network(address);
1375 }
1376
1377 pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1379 self.network_state.with_peer_registry(|reg| {
1380 reg.peers()
1381 .iter()
1382 .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1383 .collect::<Vec<_>>()
1384 })
1385 }
1386
1387 pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1389 self.network_state
1390 .ban_session(&self.p2p_control, peer_index, duration, reason);
1391 }
1392
1393 fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1395 self.network_state.with_peer_registry(|reg| {
1396 reg.peers().iter().for_each(|(peer_index, peer)| {
1397 if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1398 if address.contains(addr.ip()) {
1399 let _ = disconnect_with_message(
1400 &self.p2p_control,
1401 *peer_index,
1402 &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1403 );
1404 }
1405 }
1406 })
1407 });
1408 }
1409
1410 fn try_broadcast(
1411 &self,
1412 quick: bool,
1413 target: Option<SessionId>,
1414 proto_id: ProtocolId,
1415 data: Bytes,
1416 ) -> Result<(), SendErrorKind> {
1417 let now = Instant::now();
1418 loop {
1419 let target = target
1420 .map(TargetSession::Single)
1421 .unwrap_or(TargetSession::All);
1422 let result = if quick {
1423 self.p2p_control
1424 .quick_filter_broadcast(target, proto_id, data.clone())
1425 } else {
1426 self.p2p_control
1427 .filter_broadcast(target, proto_id, data.clone())
1428 };
1429 match result {
1430 Ok(()) => {
1431 return Ok(());
1432 }
1433 Err(SendErrorKind::WouldBlock) => {
1434 if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1435 warn!("Broadcast message to {} timeout", proto_id);
1436 return Err(SendErrorKind::WouldBlock);
1437 }
1438 thread::sleep(P2P_TRY_SEND_INTERVAL);
1439 }
1440 Err(err) => {
1441 warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1442 return Err(err);
1443 }
1444 }
1445 }
1446 }
1447
1448 pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1450 self.try_broadcast(false, None, proto_id, data)
1451 }
1452
1453 pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1455 self.try_broadcast(true, None, proto_id, data)
1456 }
1457
1458 pub fn send_message_to(
1460 &self,
1461 session_id: SessionId,
1462 proto_id: ProtocolId,
1463 data: Bytes,
1464 ) -> Result<(), SendErrorKind> {
1465 self.try_broadcast(false, Some(session_id), proto_id, data)
1466 }
1467
1468 pub fn is_active(&self) -> bool {
1470 self.network_state.is_active()
1471 }
1472
1473 pub fn set_active(&self, active: bool) {
1475 self.network_state.active.store(active, Ordering::Release);
1476 }
1477
1478 pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1480 self.network_state.protocols.read().clone()
1481 }
1482
1483 pub fn ping_peers(&self) {
1485 if let Some(mut ping_controller) = self.ping_controller.clone() {
1486 let _ignore = ping_controller.try_send(());
1487 }
1488 }
1489}
1490
1491pub(crate) fn disconnect_with_message(
1493 control: &ServiceControl,
1494 peer_index: SessionId,
1495 message: &str,
1496) -> Result<(), SendErrorKind> {
1497 if !message.is_empty() {
1498 let data = Bytes::from(message.as_bytes().to_vec());
1499 control.quick_send_message_to(
1501 peer_index,
1502 SupportProtocols::DisconnectMessage.protocol_id(),
1503 data,
1504 )?;
1505 }
1506 control.disconnect(peer_index)
1507}
1508
1509pub(crate) async fn async_disconnect_with_message(
1510 control: &ServiceAsyncControl,
1511 peer_index: SessionId,
1512 message: &str,
1513) -> Result<(), SendErrorKind> {
1514 if !message.is_empty() {
1515 let data = Bytes::from(message.as_bytes().to_vec());
1516 control
1518 .quick_send_message_to(
1519 peer_index,
1520 SupportProtocols::DisconnectMessage.protocol_id(),
1521 data,
1522 )
1523 .await?;
1524 }
1525 control.disconnect(peer_index).await
1526}
1527
1528#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
1530pub enum TransportType {
1531 Tcp,
1533 Ws,
1535 Wss,
1537}
1538
1539#[allow(dead_code)]
1540pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1541 let mut iter = addr.iter();
1542
1543 iter.find_map(|proto| match proto {
1544 Protocol::Ws => Some(TransportType::Ws),
1545 Protocol::Wss => Some(TransportType::Wss),
1546 _ => None,
1547 })
1548 .unwrap_or(TransportType::Tcp)
1549}