1use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7 types::{AddrInfo, BannedAddr},
8 PeerStore,
9};
10use crate::protocols::{
11 disconnect_message::DisconnectMessageProtocol,
12 discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13 feeler::Feeler,
14 identify::{Flags, IdentifyCallback, IdentifyProtocol},
15 ping::PingHandler,
16 support_protocols::SupportProtocols,
17};
18use crate::services::{
19 dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
20 protocol_type_checker::ProtocolTypeCheckerService,
21};
22use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
23use ckb_app_config::{default_support_all_protocols, NetworkConfig, SupportProtocol};
24use ckb_logger::{debug, error, info, trace, warn};
25use ckb_spawn::Spawn;
26use ckb_stop_handler::{broadcast_exit_signals, new_tokio_exit_rx, CancellationToken};
27use ckb_systemtime::{Duration, Instant};
28use ckb_util::{Condvar, Mutex, RwLock};
29use futures::{channel::mpsc::Sender, Future};
30use ipnetwork::IpNetwork;
31use p2p::{
32 async_trait,
33 builder::ServiceBuilder,
34 bytes::Bytes,
35 context::{ServiceContext, SessionContext},
36 error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
37 multiaddr::{Multiaddr, Protocol},
38 secio::{self, error::SecioError, PeerId, SecioKeyPair},
39 service::{
40 ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
41 TargetSession,
42 },
43 traits::ServiceHandle,
44 utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
45 yamux::config::Config as YamuxConfig,
46 SessionId,
47};
48use rand::prelude::IteratorRandom;
49#[cfg(feature = "with_sentry")]
50use sentry::{capture_message, with_scope, Level};
51#[cfg(not(target_family = "wasm"))]
52use std::sync::mpsc;
53use std::{
54 borrow::Cow,
55 cmp::max,
56 collections::{HashMap, HashSet},
57 pin::Pin,
58 sync::{
59 atomic::{AtomicBool, Ordering},
60 Arc,
61 },
62 thread,
63};
64use tokio::{self, sync::oneshot};
65
// Send timeout of 6 seconds.
// NOTE(review): not referenced in the visible part of this file; presumably the
// upper bound for blocking message sends — confirm against its users.
const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
// Retry interval of 100 milliseconds.
// NOTE(review): presumably the pause between send attempts — confirm against its users.
const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
// A dial that has been pending for longer than this (300 seconds) is treated
// as hung by `NetworkState::can_dial`.
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
70
/// Shared state of the network layer: peer bookkeeping, known addresses,
/// local identity and configuration.
pub struct NetworkState {
    /// Registry of currently connected peers and feeler addresses.
    pub(crate) peer_registry: RwLock<PeerRegistry>,
    /// Persistent store of known peer addresses, scores and bans.
    pub(crate) peer_store: Mutex<PeerStore>,
    /// Addresses the p2p service is actually listening on.
    pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
    /// Peers with a dial in flight, mapped to the instant the dial was issued.
    dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
    /// Addresses considered to be our own public addresses: seeded from the
    /// configured listen/public addresses and extended with observed
    /// addresses that turned out to connect back to this node.
    public_addrs: RwLock<HashSet<Multiaddr>>,
    /// Observed addresses queued by `add_observed_addrs`, waiting to be
    /// dialed by `try_dial_observed_addrs`.
    pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
    /// Local secio key pair.
    local_private_key: secio::SecioKeyPair,
    /// Peer id derived from `local_private_key`.
    local_peer_id: PeerId,
    /// Bootnode addresses taken from the configuration.
    pub(crate) bootnodes: Vec<Multiaddr>,
    /// Network configuration this state was built from.
    pub(crate) config: NetworkConfig,
    /// Activity flag read by `is_active`; initialised to `true`.
    pub(crate) active: AtomicBool,
    /// Registered protocols as `(id, name, supported versions)`.
    pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
    /// Identify flags used when fetching addresses to connect to.
    pub(crate) required_flags: Flags,

    /// Flag set through the `ckb2023` builder method; initialised to `false`.
    /// NOTE(review): presumably marks activation of the CKB2023 hardfork — confirm.
    pub(crate) ckb2023: AtomicBool,
}
94
95impl NetworkState {
96 #[cfg(not(target_family = "wasm"))]
98 pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
99 config.create_dir_if_not_exists()?;
100 let local_private_key = config.fetch_private_key()?;
101 let local_peer_id = local_private_key.peer_id();
102 let public_addrs: HashSet<Multiaddr> = config
104 .listen_addresses
105 .iter()
106 .chain(config.public_addresses.iter())
107 .cloned()
108 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
109 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
110 _ => {
111 if extract_peer_id(&addr).is_none() {
112 addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
113 }
114 Some(addr)
115 }
116 })
117 .collect();
118 info!("Loading the peer store. This process may take a few seconds to complete.");
119
120 let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
121 config.peer_store_path(),
122 ));
123 let bootnodes = config.bootnodes();
124
125 let peer_registry = PeerRegistry::new(
126 config.max_inbound_peers(),
127 config.max_outbound_peers(),
128 config.whitelist_only,
129 config.whitelist_peers(),
130 );
131
132 Ok(NetworkState {
133 peer_store,
134 config,
135 bootnodes,
136 peer_registry: RwLock::new(peer_registry),
137 dialing_addrs: RwLock::new(HashMap::default()),
138 public_addrs: RwLock::new(public_addrs),
139 listened_addrs: RwLock::new(Vec::new()),
140 pending_observed_addrs: RwLock::new(HashSet::default()),
141 local_private_key,
142 local_peer_id,
143 active: AtomicBool::new(true),
144 protocols: RwLock::new(Vec::new()),
145 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
146 ckb2023: AtomicBool::new(false),
147 })
148 }
149
150 #[cfg(target_family = "wasm")]
151 pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
152 let local_private_key = config.fetch_private_key()?;
153 let local_peer_id = local_private_key.peer_id();
154 let public_addrs: HashSet<Multiaddr> = config
156 .listen_addresses
157 .iter()
158 .chain(config.public_addresses.iter())
159 .cloned()
160 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
161 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
162 _ => {
163 if extract_peer_id(&addr).is_none() {
164 addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
165 }
166 Some(addr)
167 }
168 })
169 .collect();
170 info!("Loading the peer store. This process may take a few seconds to complete.");
171 let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
172 let bootnodes = config.bootnodes();
173
174 let peer_registry = PeerRegistry::new(
175 config.max_inbound_peers(),
176 config.max_outbound_peers(),
177 config.whitelist_only,
178 config.whitelist_peers(),
179 );
180 Ok(NetworkState {
181 peer_store,
182 config,
183 bootnodes,
184 peer_registry: RwLock::new(peer_registry),
185 dialing_addrs: RwLock::new(HashMap::default()),
186 public_addrs: RwLock::new(public_addrs),
187 listened_addrs: RwLock::new(Vec::new()),
188 pending_observed_addrs: RwLock::new(HashSet::default()),
189 local_private_key,
190 local_peer_id,
191 active: AtomicBool::new(true),
192 protocols: RwLock::new(Vec::new()),
193 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
194 ckb2023: AtomicBool::new(false),
195 })
196 }
197
198 pub fn ckb2023(self, init: bool) -> Self {
200 self.ckb2023.store(init, Ordering::SeqCst);
201 self
202 }
203
204 pub fn required_flags(mut self, flags: Flags) -> Self {
207 self.required_flags = flags;
208 self
209 }
210
211 pub(crate) fn report_session(
212 &self,
213 p2p_control: &ServiceControl,
214 session_id: SessionId,
215 behaviour: Behaviour,
216 ) {
217 if let Some(addr) = self.with_peer_registry(|reg| {
218 reg.get_peer(session_id)
219 .filter(|peer| !peer.is_whitelist)
220 .map(|peer| peer.connected_addr.clone())
221 }) {
222 trace!("Report {:?} because {:?}", addr, behaviour);
223 let report_result = self.peer_store.lock().report(&addr, behaviour);
224 if report_result.is_banned() {
225 if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
226 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
227 }
228 }
229 } else {
230 debug!(
231 "Report {} failure: not found in peer registry or it is on the whitelist",
232 session_id
233 );
234 }
235 }
236
237 pub(crate) fn ban_session(
238 &self,
239 p2p_control: &ServiceControl,
240 session_id: SessionId,
241 duration: Duration,
242 reason: String,
243 ) {
244 if let Some(addr) = self.with_peer_registry(|reg| {
245 reg.get_peer(session_id)
246 .filter(|peer| !peer.is_whitelist)
247 .map(|peer| peer.connected_addr.clone())
248 }) {
249 info!(
250 "Ban peer {:?} for {} seconds, reason: {}",
251 addr,
252 duration.as_secs(),
253 reason
254 );
255 if let Some(metrics) = ckb_metrics::handle() {
256 metrics.ckb_network_ban_peer.inc();
257 }
258 if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
259 let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
260 self.peer_store.lock().ban_addr(
261 &peer.connected_addr,
262 duration.as_millis() as u64,
263 reason,
264 );
265 if let Err(err) =
266 disconnect_with_message(p2p_control, peer.session_id, message.as_str())
267 {
268 debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
269 }
270 }
271 } else {
272 debug!(
273 "Ban session({}) failed: not found in peer registry or it is on the whitelist",
274 session_id
275 );
276 }
277 }
278
279 pub(crate) fn accept_peer(
280 &self,
281 session_context: &SessionContext,
282 ) -> Result<Option<Peer>, Error> {
283 let mut peer_store = self.peer_store.lock();
286 let accept_peer_result = {
287 self.peer_registry.write().accept_peer(
288 session_context.address.clone(),
289 session_context.id,
290 session_context.ty,
291 &mut peer_store,
292 )
293 };
294 accept_peer_result.map_err(Into::into)
295 }
296
297 pub fn with_peer_registry<F, T>(&self, callback: F) -> T
299 where
300 F: FnOnce(&PeerRegistry) -> T,
301 {
302 callback(&self.peer_registry.read())
303 }
304
305 pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
307 where
308 F: FnOnce(&mut PeerRegistry) -> T,
309 {
310 callback(&mut self.peer_registry.write())
311 }
312
313 pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
315 where
316 F: FnOnce(&mut PeerStore) -> T,
317 {
318 callback(&mut self.peer_store.lock())
319 }
320
321 pub fn local_peer_id(&self) -> &PeerId {
323 &self.local_peer_id
324 }
325
326 pub fn local_private_key(&self) -> &secio::SecioKeyPair {
328 &self.local_private_key
329 }
330
331 pub fn node_id(&self) -> String {
333 self.local_peer_id().to_base58()
334 }
335
336 pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
337 self.public_addrs
338 .read()
339 .iter()
340 .take(count)
341 .cloned()
342 .collect()
343 }
344
345 pub(crate) fn connection_status(&self) -> ConnectionStatus {
346 self.peer_registry.read().connection_status()
347 }
348
349 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
351 let listened_addrs = self.listened_addrs.read();
352 self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
353 .into_iter()
354 .filter_map(|addr| {
355 if !listened_addrs.contains(&addr) {
356 Some((addr, 1))
357 } else {
358 None
359 }
360 })
361 .chain(listened_addrs.iter().map(|addr| (addr.to_owned(), 1)))
362 .map(|(addr, score)| (addr.to_string(), score))
363 .collect()
364 }
365
366 pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
367 self.dial_identify(p2p_control, address);
368 }
369
370 pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
372 self.protocols
373 .read()
374 .iter()
375 .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
376 .collect::<Vec<_>>()
377 }
378
379 pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
380 let peer_id = extract_peer_id(addr);
381 if peer_id.is_none() {
382 error!("Do not dial addr without peer id, addr: {}", addr);
383 return false;
384 }
385 let peer_id = peer_id.as_ref().unwrap();
386
387 if self.local_peer_id() == peer_id {
388 trace!("Do not dial self: {:?}, {}", peer_id, addr);
389 return false;
390 }
391 if self.public_addrs.read().contains(addr) {
392 trace!(
393 "Do not dial listened address(self): {:?}, {}",
394 peer_id,
395 addr
396 );
397 return false;
398 }
399
400 let peer_in_registry = self.with_peer_registry(|reg| {
401 reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
402 });
403 if peer_in_registry {
404 trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
405 return false;
406 }
407
408 if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
409 trace!(
410 "Do not send repeated dial commands to network service: {:?}, {}",
411 peer_id,
412 addr
413 );
414 if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
415 #[cfg(feature = "with_sentry")]
416 with_scope(
417 |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
418 || {
419 capture_message(
420 &format!(
421 "Dialing {:?}, {:?} for more than {} seconds, \
422 something is wrong in network service",
423 peer_id,
424 addr,
425 DIAL_HANG_TIMEOUT.as_secs(),
426 ),
427 Level::Warning,
428 )
429 },
430 );
431 }
432 return false;
433 }
434
435 true
436 }
437
438 pub(crate) fn dial_success(&self, addr: &Multiaddr) {
439 if let Some(peer_id) = extract_peer_id(addr) {
440 self.dialing_addrs.write().remove(&peer_id);
441 }
442 }
443
444 pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
445 self.with_peer_registry_mut(|reg| {
446 reg.remove_feeler(addr);
447 });
448
449 if let Some(peer_id) = extract_peer_id(addr) {
450 self.dialing_addrs.write().remove(&peer_id);
451 }
452 }
453
454 fn dial_inner(
457 &self,
458 p2p_control: &ServiceControl,
459 addr: Multiaddr,
460 target: TargetProtocol,
461 ) -> Result<(), Error> {
462 if !self.can_dial(&addr) {
463 return Err(Error::Dial(format!("ignore dialing addr {addr}")));
464 }
465
466 debug!("Dialing {addr}");
467 p2p_control.dial(addr.clone(), target)?;
468 self.dialing_addrs.write().insert(
469 extract_peer_id(&addr).expect("verified addr"),
470 Instant::now(),
471 );
472 Ok(())
473 }
474
475 pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
477 if let Err(err) = self.dial_inner(
478 p2p_control,
479 addr,
480 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
481 ) {
482 debug!("dial_identify error: {err}");
483 }
484 }
485
486 pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
488 if let Err(err) = self.dial_inner(
489 p2p_control,
490 addr.clone(),
491 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
492 ) {
493 debug!("dial_feeler error {err}");
494 } else {
495 self.with_peer_registry_mut(|reg| {
496 reg.add_feeler(&addr);
497 });
498 }
499 }
500
501 pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
503 let mut pending_observed_addrs = self.pending_observed_addrs.write();
504 if pending_observed_addrs.is_empty() {
505 let addrs = self.public_addrs.read();
506 if addrs.is_empty() {
507 return;
508 }
509 if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
511 if let Err(err) = p2p_control.dial(
512 addr.clone(),
513 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
514 ) {
515 trace!("try_dial_observed_addrs {err} failed in public address")
516 }
517 }
518 } else {
519 for addr in pending_observed_addrs.drain() {
520 trace!("try dial observed addr: {:?}", addr);
521 if let Err(err) = p2p_control.dial(
522 addr,
523 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
524 ) {
525 trace!("try_dial_observed_addrs {err} failed in pending observed addresses")
526 }
527 }
528 }
529 }
530
531 pub(crate) fn add_observed_addrs(&self, iter: impl Iterator<Item = Multiaddr>) {
533 let mut pending_observed_addrs = self.pending_observed_addrs.write();
534 pending_observed_addrs.extend(iter)
535 }
536
537 pub fn is_active(&self) -> bool {
539 self.active.load(Ordering::Acquire)
540 }
541}
542
/// Handler for p2p service errors and events; it implements `ServiceHandle`
/// and keeps the shared `NetworkState` up to date.
pub struct EventHandler {
    /// Shared network state updated by the handler callbacks.
    pub(crate) network_state: Arc<NetworkState>,
}
547
548impl EventHandler {
549 pub fn new(network_state: Arc<NetworkState>) -> Self {
551 Self { network_state }
552 }
553}
554
/// Hook for signalling that an exit has been requested.
pub trait ExitHandler: Send + Unpin + 'static {
    /// Signals that an exit has been requested.
    fn notify_exit(&self);
}
560
561#[derive(Clone, Default)]
563pub struct DefaultExitHandler {
564 lock: Arc<Mutex<()>>,
565 exit: Arc<Condvar>,
566}
567
568impl DefaultExitHandler {
569 pub fn wait_for_exit(&self) {
571 self.exit.wait(&mut self.lock.lock());
572 }
573}
574
575impl ExitHandler for DefaultExitHandler {
576 fn notify_exit(&self) {
577 self.exit.notify_all();
578 }
579}
580
581impl EventHandler {
582 fn inbound_eviction(&self) -> Vec<PeerIndex> {
583 if self.network_state.config.bootnode_mode {
584 let status = self.network_state.connection_status();
585
586 if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
587 self.network_state
588 .with_peer_registry(|registry| {
589 registry
590 .peers()
591 .values()
592 .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
593 .map(|peer| peer.session_id)
594 .collect::<Vec<SessionId>>()
595 })
596 .into_iter()
597 .enumerate()
598 .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
599 .collect()
600 } else {
601 Vec::new()
602 }
603 } else {
604 Vec::new()
605 }
606 }
607}
608
609#[async_trait]
610impl ServiceHandle for EventHandler {
611 async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
612 match error {
613 ServiceError::DialerError { address, error } => {
614 let mut public_addrs = self.network_state.public_addrs.write();
615
616 match error {
617 DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
618 SecioError::ConnectSelf,
619 )) => {
620 debug!("dial observed address success: {:?}", address);
621 if let Some(ip) = multiaddr_to_socketaddr(&address) {
622 if is_reachable(ip.ip()) {
623 public_addrs.insert(address);
624 }
625 }
626 return;
627 }
628 DialerErrorKind::IoError(e)
629 if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
630 {
631 warn!("DialerError({}) {}", address, e);
632 }
633 _ => {
634 debug!("DialerError({}) {}", address, error);
635 }
636 }
637 if public_addrs.remove(&address) {
638 info!(
639 "Dial {} failed, remove it from network_state.public_addrs",
640 address
641 );
642 }
643 self.network_state.dial_failed(&address);
644 }
645 ServiceError::ProtocolError {
646 id,
647 proto_id,
648 error,
649 } => {
650 debug!("ProtocolError({}, {}) {}", id, proto_id, error);
651 let message = format!("ProtocolError id={proto_id}");
652 self.network_state.ban_session(
654 &context.control().clone().into(),
655 id,
656 Duration::from_secs(300),
657 message,
658 );
659 }
660 ServiceError::SessionTimeout { session_context } => {
661 debug!(
662 "SessionTimeout({}, {})",
663 session_context.id, session_context.address,
664 );
665 }
666 ServiceError::MuxerError {
667 session_context,
668 error,
669 } => {
670 debug!(
671 "MuxerError({}, {}), substream error {}, disconnect it",
672 session_context.id, session_context.address, error,
673 );
674 }
675 ServiceError::ListenError { address, error } => {
676 debug!("ListenError: address={:?}, error={:?}", address, error);
677 }
678 ServiceError::ProtocolSelectError {
679 proto_name,
680 session_context,
681 } => {
682 debug!(
683 "ProtocolSelectError: proto_name={:?}, session_id={}",
684 proto_name, session_context.id,
685 );
686 }
687 ServiceError::SessionBlocked { session_context } => {
688 debug!("SessionBlocked: {}", session_context.id);
689 }
690 ServiceError::ProtocolHandleError { proto_id, error } => {
691 debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
692
693 let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
694 {
695 if let Some(id) = opt_session_id {
696 self.network_state.ban_session(
697 &context.control().clone().into(),
698 id,
699 Duration::from_secs(300),
700 format!("protocol {proto_id} panic when process peer message"),
701 );
702 }
703 #[cfg(feature = "with_sentry")]
704 with_scope(
705 |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
706 || {
707 capture_message(
708 &format!("ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"),
709 Level::Warning,
710 )
711 },
712 );
713 error!("ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}");
714
715 broadcast_exit_signals();
716 }
717 }
718 }
719 }
720
721 async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
722 match event {
724 ServiceEvent::SessionOpen { session_context } => {
725 debug!(
726 "SessionOpen({}, {})",
727 session_context.id, session_context.address,
728 );
729
730 self.network_state.dial_success(&session_context.address);
731
732 let iter = self.inbound_eviction();
733
734 let control = context.control().clone().into();
735
736 for peer in iter {
737 if let Err(err) =
738 disconnect_with_message(&control, peer, "bootnode random eviction")
739 {
740 debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
741 }
742 }
743
744 if self
745 .network_state
746 .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
747 {
748 debug!(
749 "Feeler connected {} => {}",
750 session_context.id, session_context.address,
751 );
752 } else {
753 match self.network_state.accept_peer(&session_context) {
754 Ok(Some(evicted_peer)) => {
755 debug!(
756 "Disconnect peer, {} => {}",
757 evicted_peer.session_id, evicted_peer.connected_addr,
758 );
759 if let Err(err) = disconnect_with_message(
760 &control,
761 evicted_peer.session_id,
762 "evict because accepted better peer",
763 ) {
764 debug!(
765 "Disconnect failed {:?}, error: {:?}",
766 evicted_peer.session_id, err
767 );
768 }
769 }
770 Ok(None) => debug!(
771 "{} open, registry {} success",
772 session_context.id, session_context.address,
773 ),
774 Err(err) => {
775 debug!(
776 "Peer registry failed {:?}. Disconnect {} => {}",
777 err, session_context.id, session_context.address,
778 );
779 if let Err(err) = disconnect_with_message(
780 &control,
781 session_context.id,
782 "reject peer connection",
783 ) {
784 debug!(
785 "Disconnect failed {:?}, error: {:?}",
786 session_context.id, err
787 );
788 }
789 }
790 }
791 }
792 }
793 ServiceEvent::SessionClose { session_context } => {
794 debug!(
795 "SessionClose({}, {})",
796 session_context.id, session_context.address,
797 );
798 let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
799 reg.remove_feeler(&session_context.address);
801 reg.remove_peer(session_context.id).is_some()
802 });
803 if peer_exists {
804 debug!(
805 "{} closed. Remove {} from peer_registry",
806 session_context.id, session_context.address,
807 );
808 self.network_state.with_peer_store_mut(|peer_store| {
809 peer_store.remove_disconnected_peer(&session_context.address);
810 });
811 }
812 }
813 _ => {
814 info!("p2p service event: {:?}", event);
815 }
816 }
817 }
818}
819
/// The assembled network service: the p2p service plus the background
/// services that run alongside it.
pub struct NetworkService {
    /// Underlying p2p service, driven by an `EventHandler`.
    p2p_service: Service<EventHandler, SecioKeyPair>,
    /// Shared network state.
    network_state: Arc<NetworkState>,
    /// Sender returned by `PingHandler::new`; `None` when the ping protocol
    /// is not among the supported protocols.
    ping_controller: Option<Sender<()>>,
    /// Background service futures (peer store dumping, protocol type
    /// checking, and optionally outbound peer and DNS seeding services).
    bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
    /// Second element of the `identify_announce` tuple passed to `new`.
    version: String,
}
829
830impl NetworkService {
    /// Assembles a `NetworkService`.
    ///
    /// Builds the protocol metas for the caller-supplied `protocols`, adds
    /// the identify protocol plus whichever of ping, discovery, feeler and
    /// disconnect-message are enabled in the configuration, configures the
    /// p2p service builder, and creates the background services.
    ///
    /// * `network_state` - shared state; the registered protocols are
    ///   recorded in it.
    /// * `protocols` - application protocols to register.
    /// * `required_protocol_ids` - passed to the protocol type checker
    ///   service.
    /// * `identify_announce` - passed element-wise to `IdentifyCallback::new`;
    ///   the second element is also kept as the service `version`.
    /// * `transport_type` - passed to the outbound peer service.
    pub fn new(
        network_state: Arc<NetworkState>,
        protocols: Vec<CKBProtocol>,
        required_protocol_ids: Vec<ProtocolId>,
        identify_announce: (String, String, Flags),
        transport_type: TransportType,
    ) -> Self {
        let config = &network_state.config;

        // Warn when the configured protocol set differs from the default one.
        if config.support_protocols.iter().collect::<HashSet<_>>()
            != default_support_all_protocols()
                .iter()
                .collect::<HashSet<_>>()
        {
            warn!(
                "Customized supported protocols: {:?}",
                config.support_protocols
            );
        }

        // Application protocols supplied by the caller.
        let mut protocol_metas = protocols
            .into_iter()
            .map(CKBProtocol::build)
            .collect::<Vec<_>>();

        // The identify protocol is always registered.
        let identify_callback = IdentifyCallback::new(
            Arc::clone(&network_state),
            identify_announce.0,
            identify_announce.1.clone(),
            identify_announce.2,
        );
        let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
            ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
        });
        protocol_metas.push(identify_meta);

        // Ping protocol, when enabled; its controller is kept on the service.
        let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
            let ping_interval = Duration::from_secs(config.ping_interval_secs);
            let ping_timeout = Duration::from_secs(config.ping_timeout_secs);

            let ping_network_state = Arc::clone(&network_state);
            let (ping_handler, ping_controller) =
                PingHandler::new(ping_interval, ping_timeout, ping_network_state);
            let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
                ProtocolHandle::Callback(Box::new(ping_handler))
            });
            protocol_metas.push(ping_meta);
            Some(ping_controller)
        } else {
            None
        };

        // Discovery protocol, when enabled.
        if config
            .support_protocols
            .contains(&SupportProtocol::Discovery)
        {
            let addr_mgr = DiscoveryAddressManager {
                network_state: Arc::clone(&network_state),
                discovery_local_address: config.discovery_local_address,
            };
            let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
                ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
                    addr_mgr,
                    config
                        .discovery_announce_check_interval_secs
                        .map(Duration::from_secs),
                )))
            });
            protocol_metas.push(disc_meta);
        }

        // Feeler protocol, when enabled.
        if config.support_protocols.contains(&SupportProtocol::Feeler) {
            let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
                let network_state = Arc::clone(&network_state);
                move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
            });
            protocol_metas.push(feeler_meta);
        }

        // Disconnect-message protocol, when enabled.
        if config
            .support_protocols
            .contains(&SupportProtocol::DisconnectMessage)
        {
            let disconnect_message_state = Arc::clone(&network_state);
            let disconnect_message_meta = SupportProtocols::DisconnectMessage
                .build_meta_with_service_handle(move || {
                    ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
                        disconnect_message_state,
                    )))
                });
            protocol_metas.push(disconnect_message_meta);
        }

        let mut service_builder = ServiceBuilder::default();
        // The yamux stream limit is set to the number of registered protocols.
        let yamux_config = YamuxConfig {
            max_stream_count: protocol_metas.len(),
            max_stream_window_size: 1024 * 1024,
            ..Default::default()
        };
        // Record every protocol in the shared state and register it with the builder.
        for meta in protocol_metas.into_iter() {
            network_state
                .protocols
                .write()
                .push((meta.id(), meta.name(), meta.support_versions()));
            service_builder = service_builder.insert_protocol(meta);
        }
        let event_handler = EventHandler {
            network_state: Arc::clone(&network_state),
        };
        service_builder = service_builder
            .handshake_type(network_state.local_private_key.clone().into())
            .yamux_config(yamux_config)
            .forever(true)
            .max_connection_number(1024)
            .set_send_buffer_size(config.max_send_buffer())
            .set_channel_size(config.channel_size())
            .timeout(Duration::from_secs(5));

        // UPnP is only configured on non-wasm targets.
        #[cfg(not(target_family = "wasm"))]
        {
            service_builder = service_builder.upnp(config.upnp);
        }

        // On Linux, optionally bind sockets to the listen address with
        // port/address reuse enabled.
        #[cfg(target_os = "linux")]
        let p2p_service = {
            if config.reuse_port_on_linux {
                let iter = config.listen_addresses.iter();

                // Tracks which transports already have a bind function installed.
                #[derive(Clone, Copy, Debug, Eq, PartialEq)]
                enum BindType {
                    None,
                    Ws,
                    Tcp,
                    Both,
                }
                impl BindType {
                    // Folds a newly configured transport into the current state.
                    fn transform(&mut self, other: TransportType) {
                        match (&self, other) {
                            (BindType::None, TransportType::Ws) => *self = BindType::Ws,
                            (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
                            (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
                            (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
                            _ => (),
                        }
                    }

                    // True once both TCP and WS have been configured.
                    fn is_ready(&self) -> bool {
                        matches!(self, BindType::Both)
                    }
                }

                let mut init = BindType::None;

                for multi_addr in iter {
                    if init.is_ready() {
                        break;
                    }
                    match find_type(multi_addr) {
                        TransportType::Tcp => {
                            // Only the first TCP listen address is used.
                            if matches!(init, BindType::Tcp) {
                                continue;
                            }
                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
                                let domain = socket2::Domain::for_address(addr);
                                let bind_fn = move |socket: p2p::service::TcpSocket| {
                                    let socket_ref = socket2::SockRef::from(&socket);
                                    #[cfg(all(
                                        unix,
                                        not(target_os = "solaris"),
                                        not(target_os = "illumos")
                                    ))]
                                    socket_ref.set_reuse_port(true)?;
                                    socket_ref.set_reuse_address(true)?;
                                    // Bind only sockets of the same address family.
                                    if socket_ref.domain()? == domain {
                                        socket_ref.bind(&addr.into())?;
                                    }
                                    Ok(socket)
                                };
                                init.transform(TransportType::Tcp);
                                service_builder = service_builder.tcp_config(bind_fn);
                            }
                        }
                        TransportType::Ws | TransportType::Wss => {
                            // Only the first WS/WSS listen address is used.
                            if matches!(init, BindType::Ws) {
                                continue;
                            }
                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
                                let domain = socket2::Domain::for_address(addr);
                                let bind_fn = move |socket: p2p::service::TcpSocket| {
                                    let socket_ref = socket2::SockRef::from(&socket);
                                    #[cfg(all(
                                        unix,
                                        not(target_os = "solaris"),
                                        not(target_os = "illumos")
                                    ))]
                                    socket_ref.set_reuse_port(true)?;
                                    socket_ref.set_reuse_address(true)?;
                                    // Bind only sockets of the same address family.
                                    if socket_ref.domain()? == domain {
                                        socket_ref.bind(&addr.into())?;
                                    }
                                    Ok(socket)
                                };
                                init.transform(TransportType::Ws);
                                service_builder = service_builder.tcp_config_on_ws(bind_fn);
                            }
                        }
                    }
                }
            }

            service_builder.build(event_handler)
        };

        #[cfg(not(target_os = "linux"))]
        let p2p_service = service_builder.build(event_handler);

        // Background services that run next to the p2p service.
        let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
        let protocol_type_checker_service = ProtocolTypeCheckerService::new(
            Arc::clone(&network_state),
            p2p_service.control().to_owned().into(),
            required_protocol_ids,
        );
        let mut bg_services = vec![
            Box::pin(dump_peer_store_service) as Pin<Box<_>>,
            Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
        ];
        if config.outbound_peer_service_enabled() {
            let outbound_peer_service = OutboundPeerService::new(
                Arc::clone(&network_state),
                p2p_service.control().to_owned().into(),
                Duration::from_secs(config.connect_outbound_interval_secs),
                transport_type,
            );
            bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
        };

        #[cfg(feature = "with_dns_seeding")]
        if config.dns_seeding_service_enabled() {
            let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
                Arc::clone(&network_state),
                config.dns_seeds.clone(),
            );
            bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
        };

        NetworkService {
            p2p_service,
            network_state,
            ping_controller,
            bg_services,
            version: identify_announce.1,
        }
    }
1104
1105 pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1107 let config = self.network_state.config.clone();
1108
1109 let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1110
1111 for addr in self.network_state.config.whitelist_peers() {
1113 debug!("Dial whitelist_peers {:?}", addr);
1114 self.network_state.dial_identify(&p2p_control, addr);
1115 }
1116
1117 let target = &self.network_state.required_flags;
1118
1119 let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1122 let count = max((config.max_outbound_peers >> 1) as usize, 1);
1123 let mut addrs: Vec<_> = peer_store
1124 .fetch_addrs_to_attempt(count, *target, |_| true)
1125 .into_iter()
1126 .map(|paddr| paddr.addr)
1127 .collect();
1128 let bootnodes = self
1130 .network_state
1131 .bootnodes
1132 .iter()
1133 .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1134 .into_iter()
1135 .cloned();
1136 addrs.extend(bootnodes);
1137 addrs
1138 });
1139
1140 for addr in bootnodes {
1142 debug!("Dial bootnode {:?}", addr);
1143 self.network_state.dial_identify(&p2p_control, addr);
1144 }
1145
1146 let Self {
1147 mut p2p_service,
1148 network_state,
1149 ping_controller,
1150 bg_services,
1151 version,
1152 } = self;
1153
1154 let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1156 .into_iter()
1157 .map(|bg_service| {
1158 let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1159 (signal_sender, (bg_service, signal_receiver))
1160 })
1161 .unzip();
1162
1163 let receiver: CancellationToken = new_tokio_exit_rx();
1164 #[cfg(not(target_family = "wasm"))]
1165 let (start_sender, start_receiver) = mpsc::channel();
1166 {
1167 #[cfg(not(target_family = "wasm"))]
1168 let network_state = Arc::clone(&network_state);
1169 let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1170 handle.spawn_task(async move {
1171 #[cfg(not(target_family = "wasm"))]
1172 {
1173 let listen_addresses = {
1174 let mut addresses = config.listen_addresses.clone();
1175 if config.reuse_tcp_with_ws {
1176 let ws_listens = addresses
1177 .iter()
1178 .cloned()
1179 .filter_map(|mut addr| {
1180 if matches!(find_type(&addr), TransportType::Tcp) {
1181 addr.push(Protocol::Ws);
1182 Some(addr)
1183 } else {
1184 None
1185 }
1186 })
1187 .collect::<Vec<_>>();
1188
1189 addresses.extend(ws_listens);
1190 }
1191 let mut addresses = addresses
1192 .into_iter()
1193 .collect::<HashSet<_>>()
1194 .into_iter()
1195 .collect::<Vec<_>>();
1196 addresses.sort_by(|a, b| {
1197 let ty_a = find_type(a);
1198 let ty_b = find_type(b);
1199
1200 ty_a.cmp(&ty_b)
1201 });
1202
1203 addresses
1204 };
1205
1206 for addr in &listen_addresses {
1207 match p2p_service.listen(addr.to_owned()).await {
1208 Ok(listen_address) => {
1209 info!("Listen on address: {}", listen_address);
1210 network_state
1211 .listened_addrs
1212 .write()
1213 .push(listen_address.clone());
1214 }
1215 Err(err) => {
1216 warn!(
1217 "Listen on address {} failed, due to error: {}",
1218 addr.clone(),
1219 err
1220 );
1221 start_sender
1222 .send(Err(Error::P2P(P2PError::Transport(err))))
1223 .expect("channel abnormal shutdown");
1224 return;
1225 }
1226 };
1227 }
1228 start_sender.send(Ok(())).unwrap();
1229 }
1230
1231 p2p::runtime::spawn(async move { p2p_service.run().await });
1232 tokio::select! {
1233 _ = receiver.cancelled() => {
1234 info!("NetworkService receive exit signal, start shutdown...");
1235 let _ = p2p_control.shutdown().await;
1236 drop(bg_signals);
1238
1239 info!("NetworkService shutdown now");
1240 },
1241 else => {
1242 let _ = p2p_control.shutdown().await;
1243 drop(bg_signals);
1245 },
1246 }
1247 });
1248 }
1249 for (mut service, mut receiver) in bg_receivers {
1250 handle.spawn_task(async move {
1251 loop {
1252 tokio::select! {
1253 _ = &mut service => {},
1254 _ = &mut receiver => break
1255 }
1256 }
1257 });
1258 }
1259 #[cfg(not(target_family = "wasm"))]
1260 if let Ok(Err(e)) = start_receiver.recv() {
1261 return Err(e);
1262 }
1263
1264 Ok(NetworkController {
1265 version,
1266 network_state,
1267 p2p_control,
1268 ping_controller,
1269 })
1270 }
1271}
1272
/// Cloneable handle for interacting with a started network service.
///
/// An instance is returned by `NetworkService::start`.
#[derive(Clone)]
pub struct NetworkController {
    /// Version string returned by `version()`.
    version: String,
    /// Network state shared with the running service.
    network_state: Arc<NetworkState>,
    /// Control handle used to send messages to, and disconnect, sessions.
    p2p_control: ServiceControl,
    /// Optional sender used by `ping_peers()` to emit a `()` notification.
    ping_controller: Option<Sender<()>>,
}
1281
1282impl NetworkController {
1283 pub fn init_ckb2023(&self) {
1285 self.network_state.ckb2023.store(true, Ordering::SeqCst);
1286 }
1287
1288 pub fn load_ckb2023(&self) -> bool {
1290 self.network_state.ckb2023.load(Ordering::SeqCst)
1291 }
1292
1293 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1295 self.network_state.public_urls(max_urls)
1296 }
1297
1298 pub fn version(&self) -> &String {
1300 &self.version
1301 }
1302
1303 pub fn node_id(&self) -> String {
1305 self.network_state.node_id()
1306 }
1307
1308 pub fn p2p_control(&self) -> &ServiceControl {
1310 &self.p2p_control
1311 }
1312
1313 pub fn add_node(&self, address: Multiaddr) {
1315 self.network_state.add_node(&self.p2p_control, address)
1316 }
1317
1318 pub fn remove_node(&self, peer_id: &PeerId) {
1320 if let Some(session_id) = self
1321 .network_state
1322 .peer_registry
1323 .read()
1324 .get_key_by_peer_id(peer_id)
1325 {
1326 if let Err(err) =
1327 disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1328 {
1329 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1330 }
1331 } else {
1332 error!("Cannot find peer {:?}", peer_id);
1333 }
1334 }
1335
1336 pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1338 self.network_state
1339 .peer_store
1340 .lock()
1341 .ban_list()
1342 .get_banned_addrs()
1343 }
1344
1345 pub fn clear_banned_addrs(&self) {
1347 self.network_state.peer_store.lock().clear_ban_list();
1348 }
1349
1350 pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1352 self.network_state
1353 .peer_store
1354 .lock()
1355 .addr_manager()
1356 .get(addr)
1357 .cloned()
1358 }
1359
1360 pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1362 self.disconnect_peers_in_ip_range(address, &ban_reason);
1363 self.network_state
1364 .peer_store
1365 .lock()
1366 .ban_network(address, ban_until, ban_reason)
1367 }
1368
1369 pub fn unban(&self, address: &IpNetwork) {
1371 self.network_state
1372 .peer_store
1373 .lock()
1374 .mut_ban_list()
1375 .unban_network(address);
1376 }
1377
1378 pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1380 self.network_state.with_peer_registry(|reg| {
1381 reg.peers()
1382 .iter()
1383 .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1384 .collect::<Vec<_>>()
1385 })
1386 }
1387
1388 pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1390 self.network_state
1391 .ban_session(&self.p2p_control, peer_index, duration, reason);
1392 }
1393
1394 fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1396 self.network_state.with_peer_registry(|reg| {
1397 reg.peers().iter().for_each(|(peer_index, peer)| {
1398 if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1399 if address.contains(addr.ip()) {
1400 let _ = disconnect_with_message(
1401 &self.p2p_control,
1402 *peer_index,
1403 &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1404 );
1405 }
1406 }
1407 })
1408 });
1409 }
1410
1411 fn try_broadcast(
1412 &self,
1413 quick: bool,
1414 target: Option<SessionId>,
1415 proto_id: ProtocolId,
1416 data: Bytes,
1417 ) -> Result<(), SendErrorKind> {
1418 let now = Instant::now();
1419 loop {
1420 let target = target
1421 .map(TargetSession::Single)
1422 .unwrap_or(TargetSession::All);
1423 let result = if quick {
1424 self.p2p_control
1425 .quick_filter_broadcast(target, proto_id, data.clone())
1426 } else {
1427 self.p2p_control
1428 .filter_broadcast(target, proto_id, data.clone())
1429 };
1430 match result {
1431 Ok(()) => {
1432 return Ok(());
1433 }
1434 Err(SendErrorKind::WouldBlock) => {
1435 if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1436 warn!("Broadcast message to {} timeout", proto_id);
1437 return Err(SendErrorKind::WouldBlock);
1438 }
1439 thread::sleep(P2P_TRY_SEND_INTERVAL);
1440 }
1441 Err(err) => {
1442 warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1443 return Err(err);
1444 }
1445 }
1446 }
1447 }
1448
1449 pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1451 self.try_broadcast(false, None, proto_id, data)
1452 }
1453
1454 pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1456 self.try_broadcast(true, None, proto_id, data)
1457 }
1458
1459 pub fn send_message_to(
1461 &self,
1462 session_id: SessionId,
1463 proto_id: ProtocolId,
1464 data: Bytes,
1465 ) -> Result<(), SendErrorKind> {
1466 self.try_broadcast(false, Some(session_id), proto_id, data)
1467 }
1468
1469 pub fn is_active(&self) -> bool {
1471 self.network_state.is_active()
1472 }
1473
1474 pub fn set_active(&self, active: bool) {
1476 self.network_state.active.store(active, Ordering::Release);
1477 }
1478
1479 pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1481 self.network_state.protocols.read().clone()
1482 }
1483
1484 pub fn ping_peers(&self) {
1486 if let Some(mut ping_controller) = self.ping_controller.clone() {
1487 let _ignore = ping_controller.try_send(());
1488 }
1489 }
1490}
1491
1492pub(crate) fn disconnect_with_message(
1494 control: &ServiceControl,
1495 peer_index: SessionId,
1496 message: &str,
1497) -> Result<(), SendErrorKind> {
1498 if !message.is_empty() {
1499 let data = Bytes::from(message.as_bytes().to_vec());
1500 control.quick_send_message_to(
1502 peer_index,
1503 SupportProtocols::DisconnectMessage.protocol_id(),
1504 data,
1505 )?;
1506 }
1507 control.disconnect(peer_index)
1508}
1509
1510pub(crate) async fn async_disconnect_with_message(
1511 control: &ServiceAsyncControl,
1512 peer_index: SessionId,
1513 message: &str,
1514) -> Result<(), SendErrorKind> {
1515 if !message.is_empty() {
1516 let data = Bytes::from(message.as_bytes().to_vec());
1517 control
1519 .quick_send_message_to(
1520 peer_index,
1521 SupportProtocols::DisconnectMessage.protocol_id(),
1522 data,
1523 )
1524 .await?;
1525 }
1526 control.disconnect(peer_index).await
1527}
1528
/// Transport kind of a multiaddr.
///
/// The derived ordering follows declaration order: `Tcp < Ws < Wss`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum TransportType {
    /// Plain TCP.
    Tcp,
    /// WebSocket.
    Ws,
    /// WebSocket over TLS.
    Wss,
}
1539
1540pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1541 let mut iter = addr.iter();
1542
1543 iter.find_map(|proto| match proto {
1544 Protocol::Ws => Some(TransportType::Ws),
1545 Protocol::Wss => Some(TransportType::Wss),
1546 _ => None,
1547 })
1548 .unwrap_or(TransportType::Tcp)
1549}