1use crate::errors::Error;
3#[cfg(not(target_family = "wasm"))]
4use crate::errors::P2PError;
5use crate::peer_registry::{ConnectionStatus, PeerRegistry};
6use crate::peer_store::{
7 PeerStore,
8 types::{AddrInfo, BannedAddr},
9};
10use crate::protocols::{
11 disconnect_message::DisconnectMessageProtocol,
12 discovery::{DiscoveryAddressManager, DiscoveryProtocol},
13 feeler::Feeler,
14 identify::{Flags, IdentifyCallback, IdentifyProtocol},
15 ping::PingHandler,
16 support_protocols::SupportProtocols,
17};
18use crate::services::{
19 dump_peer_store::DumpPeerStoreService, outbound_peer::OutboundPeerService,
20 protocol_type_checker::ProtocolTypeCheckerService,
21};
22use crate::{Behaviour, CKBProtocol, Peer, PeerIndex, ProtocolId, ServiceControl};
23use ckb_app_config::{NetworkConfig, SupportProtocol, default_support_all_protocols};
24use ckb_logger::{debug, error, info, trace, warn};
25use ckb_spawn::Spawn;
26use ckb_stop_handler::{CancellationToken, broadcast_exit_signals, new_tokio_exit_rx};
27use ckb_systemtime::{Duration, Instant};
28use ckb_util::{Condvar, Mutex, RwLock};
29use futures::{Future, channel::mpsc::Sender};
30use ipnetwork::IpNetwork;
31use p2p::{
32 SessionId, async_trait,
33 builder::ServiceBuilder,
34 bytes::Bytes,
35 context::{ServiceContext, SessionContext},
36 error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
37 multiaddr::{Multiaddr, Protocol},
38 secio::{self, PeerId, SecioKeyPair, error::SecioError},
39 service::{
40 ProtocolHandle, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
41 TargetSession,
42 },
43 traits::ServiceHandle,
44 utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
45 yamux::config::Config as YamuxConfig,
46};
47use rand::prelude::IteratorRandom;
48#[cfg(feature = "with_sentry")]
49use sentry::{Level, capture_message, with_scope};
50#[cfg(not(target_family = "wasm"))]
51use std::sync::mpsc;
52use std::{
53 borrow::Cow,
54 cmp::max,
55 collections::{HashMap, HashSet},
56 pin::Pin,
57 sync::{
58 Arc,
59 atomic::{AtomicBool, Ordering},
60 },
61 thread,
62};
63use tokio::{self, sync::oneshot};
64
// NOTE(review): the two P2P_* constants are not referenced in the visible part of
// this file; presumably they bound a retry-send loop elsewhere — confirm.
/// Upper bound on how long a send to the p2p service may keep retrying (6 s).
const P2P_SEND_TIMEOUT: Duration = Duration::from_millis(6_000);
/// Pause between consecutive send attempts (100 ms).
const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
/// A dial that has been pending for longer than this (5 minutes) is reported
/// as a hang by `NetworkState::can_dial` (sentry builds only).
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(5 * 60);
69
/// Shared, thread-safe state of the network layer: connected peers, the address
/// store, local identity and address bookkeeping.
pub struct NetworkState {
    /// Registry of currently connected peers (sessions accepted via `accept_peer`).
    pub(crate) peer_registry: RwLock<PeerRegistry>,
    /// Address store used for scoring, banning and choosing addresses to dial.
    pub(crate) peer_store: Mutex<PeerStore>,
    /// Addresses the p2p service is actually listening on.
    pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
    /// Peers with an in-flight dial, mapped to when the dial was issued.
    dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
    /// Addresses considered reachable for this node. Seeded from the config and
    /// extended when dialing an address ends up connecting to ourselves.
    public_addrs: RwLock<HashSet<Multiaddr>>,
    /// Observed addresses queued for a self-dial check by `try_dial_observed_addrs`.
    pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
    local_private_key: secio::SecioKeyPair,
    /// Peer id derived from `local_private_key`.
    local_peer_id: PeerId,
    pub(crate) bootnodes: Vec<Multiaddr>,
    pub(crate) config: NetworkConfig,
    /// Read by `is_active`; initialised to `true`.
    pub(crate) active: AtomicBool,
    /// Registered protocols as `(id, name, supported versions)`.
    pub(crate) protocols: RwLock<Vec<(ProtocolId, String, Vec<String>)>>,
    /// Flags a stored address must match to be picked as an initial dial target.
    pub(crate) required_flags: Flags,

    /// Whether the ckb2023 switch is on; initialised to `false`.
    pub(crate) ckb2023: AtomicBool,
}
93
94impl NetworkState {
95 #[cfg(not(target_family = "wasm"))]
97 pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
98 config.create_dir_if_not_exists()?;
99 let local_private_key = config.fetch_private_key()?;
100 let local_peer_id = local_private_key.peer_id();
101 let public_addrs: HashSet<Multiaddr> = config
103 .listen_addresses
104 .iter()
105 .chain(config.public_addresses.iter())
106 .cloned()
107 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
108 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
109 _ => {
110 match extract_peer_id(&addr) {
111 Some(peer_id) if peer_id != local_peer_id => {
112 error!("Don't include addresses that not associated with this node in the public_addresses list: {:?}", addr);
113 std::process::exit(1);
114 }
115 Some(_) => (),
116 None => addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))),
117 }
118 Some(addr)
119 }
120 })
121 .collect();
122 info!("Loading the peer store. This process may take a few seconds to complete.");
123
124 let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
125 config.peer_store_path(),
126 ));
127 let bootnodes = config.bootnodes();
128
129 let peer_registry = PeerRegistry::new(
130 config.max_inbound_peers(),
131 config.max_outbound_peers(),
132 config.whitelist_only,
133 config.whitelist_peers(),
134 );
135
136 Ok(NetworkState {
137 peer_store,
138 config,
139 bootnodes,
140 peer_registry: RwLock::new(peer_registry),
141 dialing_addrs: RwLock::new(HashMap::default()),
142 public_addrs: RwLock::new(public_addrs),
143 listened_addrs: RwLock::new(Vec::new()),
144 pending_observed_addrs: RwLock::new(HashSet::default()),
145 local_private_key,
146 local_peer_id,
147 active: AtomicBool::new(true),
148 protocols: RwLock::new(Vec::new()),
149 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
150 ckb2023: AtomicBool::new(false),
151 })
152 }
153
154 #[cfg(target_family = "wasm")]
155 pub async fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
156 let local_private_key = config.fetch_private_key()?;
157 let local_peer_id = local_private_key.peer_id();
158 let public_addrs: HashSet<Multiaddr> = config
160 .listen_addresses
161 .iter()
162 .chain(config.public_addresses.iter())
163 .cloned()
164 .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
165 Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
166 _ => {
167 if extract_peer_id(&addr).is_none() {
168 addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
169 }
170 Some(addr)
171 }
172 })
173 .collect();
174 info!("Loading the peer store. This process may take a few seconds to complete.");
175 let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await);
176 let bootnodes = config.bootnodes();
177
178 let peer_registry = PeerRegistry::new(
179 config.max_inbound_peers(),
180 config.max_outbound_peers(),
181 config.whitelist_only,
182 config.whitelist_peers(),
183 );
184 Ok(NetworkState {
185 peer_store,
186 config,
187 bootnodes,
188 peer_registry: RwLock::new(peer_registry),
189 dialing_addrs: RwLock::new(HashMap::default()),
190 public_addrs: RwLock::new(public_addrs),
191 listened_addrs: RwLock::new(Vec::new()),
192 pending_observed_addrs: RwLock::new(HashSet::default()),
193 local_private_key,
194 local_peer_id,
195 active: AtomicBool::new(true),
196 protocols: RwLock::new(Vec::new()),
197 required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY,
198 ckb2023: AtomicBool::new(false),
199 })
200 }
201
202 pub fn ckb2023(self, init: bool) -> Self {
204 self.ckb2023.store(init, Ordering::SeqCst);
205 self
206 }
207
208 pub fn required_flags(mut self, flags: Flags) -> Self {
211 self.required_flags = flags;
212 self
213 }
214
215 pub(crate) fn report_session(
216 &self,
217 p2p_control: &ServiceControl,
218 session_id: SessionId,
219 behaviour: Behaviour,
220 ) {
221 if let Some(addr) = self.with_peer_registry(|reg| {
222 reg.get_peer(session_id)
223 .filter(|peer| !peer.is_whitelist)
224 .map(|peer| peer.connected_addr.clone())
225 }) {
226 trace!("Report {:?} because {:?}", addr, behaviour);
227 let report_result = self.peer_store.lock().report(&addr, behaviour);
228 if report_result.is_banned() {
229 if let Err(err) = disconnect_with_message(p2p_control, session_id, "banned") {
230 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
231 }
232 }
233 } else {
234 debug!(
235 "Report {} failure: not found in peer registry or it is on the whitelist",
236 session_id
237 );
238 }
239 }
240
241 pub(crate) fn ban_session(
242 &self,
243 p2p_control: &ServiceControl,
244 session_id: SessionId,
245 duration: Duration,
246 reason: String,
247 ) {
248 if let Some(addr) = self.with_peer_registry(|reg| {
249 reg.get_peer(session_id)
250 .filter(|peer| !peer.is_whitelist)
251 .map(|peer| peer.connected_addr.clone())
252 }) {
253 info!(
254 "Ban peer {:?} for {} seconds, reason: {}",
255 addr,
256 duration.as_secs(),
257 reason
258 );
259 if let Some(metrics) = ckb_metrics::handle() {
260 metrics.ckb_network_ban_peer.inc();
261 }
262 if let Some(peer) = self.with_peer_registry_mut(|reg| reg.remove_peer(session_id)) {
263 let message = format!("Ban for {} seconds, reason: {}", duration.as_secs(), reason);
264 self.peer_store.lock().ban_addr(
265 &peer.connected_addr,
266 duration.as_millis() as u64,
267 reason,
268 );
269 if let Err(err) =
270 disconnect_with_message(p2p_control, peer.session_id, message.as_str())
271 {
272 debug!("Disconnect failed {:?}, error: {:?}", peer.session_id, err);
273 }
274 }
275 } else {
276 debug!(
277 "Ban session({}) failed: not found in peer registry or it is on the whitelist",
278 session_id
279 );
280 }
281 }
282
283 pub(crate) fn accept_peer(
284 &self,
285 session_context: &SessionContext,
286 ) -> Result<Option<Peer>, Error> {
287 let mut peer_store = self.peer_store.lock();
290 let accept_peer_result = {
291 self.peer_registry.write().accept_peer(
292 session_context.address.clone(),
293 session_context.id,
294 session_context.ty,
295 &mut peer_store,
296 )
297 };
298 accept_peer_result
299 }
300
301 pub fn with_peer_registry<F, T>(&self, callback: F) -> T
303 where
304 F: FnOnce(&PeerRegistry) -> T,
305 {
306 callback(&self.peer_registry.read())
307 }
308
309 pub(crate) fn with_peer_registry_mut<F, T>(&self, callback: F) -> T
311 where
312 F: FnOnce(&mut PeerRegistry) -> T,
313 {
314 callback(&mut self.peer_registry.write())
315 }
316
317 pub(crate) fn with_peer_store_mut<F, T>(&self, callback: F) -> T
319 where
320 F: FnOnce(&mut PeerStore) -> T,
321 {
322 callback(&mut self.peer_store.lock())
323 }
324
325 pub fn local_peer_id(&self) -> &PeerId {
327 &self.local_peer_id
328 }
329
330 pub fn local_private_key(&self) -> &secio::SecioKeyPair {
332 &self.local_private_key
333 }
334
335 pub fn node_id(&self) -> String {
337 self.local_peer_id().to_base58()
338 }
339
340 pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
341 self.public_addrs
342 .read()
343 .iter()
344 .take(count)
345 .cloned()
346 .collect()
347 }
348
349 pub(crate) fn connection_status(&self) -> ConnectionStatus {
350 self.peer_registry.read().connection_status()
351 }
352
353 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
355 let listened_addrs = self.listened_addrs.read();
356 self.public_addrs(max_urls.saturating_sub(listened_addrs.len()))
357 .into_iter()
358 .filter_map(|addr| {
359 if !listened_addrs.contains(&addr) {
360 Some((addr, 1))
361 } else {
362 None
363 }
364 })
365 .chain(listened_addrs.iter().map(|addr| (addr.to_owned(), 1)))
366 .map(|(addr, score)| (addr.to_string(), score))
367 .collect()
368 }
369
370 pub(crate) fn add_node(&self, p2p_control: &ServiceControl, address: Multiaddr) {
371 self.dial_identify(p2p_control, address);
372 }
373
374 pub fn get_protocol_ids<F: Fn(ProtocolId) -> bool>(&self, filter: F) -> Vec<ProtocolId> {
376 self.protocols
377 .read()
378 .iter()
379 .filter_map(|&(id, _, _)| if filter(id) { Some(id) } else { None })
380 .collect::<Vec<_>>()
381 }
382
383 pub(crate) fn can_dial(&self, addr: &Multiaddr) -> bool {
384 let peer_id = extract_peer_id(addr);
385 if peer_id.is_none() {
386 error!("Do not dial addr without peer id, addr: {}", addr);
387 return false;
388 }
389 let peer_id = peer_id.as_ref().unwrap();
390
391 if self.local_peer_id() == peer_id {
392 trace!("Do not dial self: {:?}, {}", peer_id, addr);
393 return false;
394 }
395 if self.public_addrs.read().contains(addr) {
396 trace!(
397 "Do not dial listened address(self): {:?}, {}",
398 peer_id, addr
399 );
400 return false;
401 }
402
403 let peer_in_registry = self.with_peer_registry(|reg| {
404 reg.get_key_by_peer_id(peer_id).is_some() || reg.is_feeler(addr)
405 });
406 if peer_in_registry {
407 trace!("Do not dial peer in registry: {:?}, {}", peer_id, addr);
408 return false;
409 }
410
411 if let Some(dial_started) = self.dialing_addrs.read().get(peer_id) {
412 trace!(
413 "Do not send repeated dial commands to network service: {:?}, {}",
414 peer_id, addr
415 );
416 if Instant::now().saturating_duration_since(*dial_started) > DIAL_HANG_TIMEOUT {
417 #[cfg(feature = "with_sentry")]
418 with_scope(
419 |scope| scope.set_fingerprint(Some(&["ckb-network", "dialing-timeout"])),
420 || {
421 capture_message(
422 &format!(
423 "Dialing {:?}, {:?} for more than {} seconds, \
424 something is wrong in network service",
425 peer_id,
426 addr,
427 DIAL_HANG_TIMEOUT.as_secs(),
428 ),
429 Level::Warning,
430 )
431 },
432 );
433 }
434 return false;
435 }
436
437 true
438 }
439
440 pub(crate) fn dial_success(&self, addr: &Multiaddr) {
441 if let Some(peer_id) = extract_peer_id(addr) {
442 self.dialing_addrs.write().remove(&peer_id);
443 }
444 }
445
446 pub(crate) fn dial_failed(&self, addr: &Multiaddr) {
447 self.with_peer_registry_mut(|reg| {
448 reg.remove_feeler(addr);
449 });
450
451 if let Some(peer_id) = extract_peer_id(addr) {
452 self.dialing_addrs.write().remove(&peer_id);
453 }
454 }
455
456 fn dial_inner(
459 &self,
460 p2p_control: &ServiceControl,
461 addr: Multiaddr,
462 target: TargetProtocol,
463 ) -> Result<(), Error> {
464 if !self.can_dial(&addr) {
465 return Err(Error::Dial(format!("ignore dialing addr {addr}")));
466 }
467
468 debug!("Dialing {addr}");
469 p2p_control.dial(addr.clone(), target)?;
470 self.dialing_addrs.write().insert(
471 extract_peer_id(&addr).expect("verified addr"),
472 Instant::now(),
473 );
474 Ok(())
475 }
476
477 pub fn dial_identify(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
479 if let Err(err) = self.dial_inner(
480 p2p_control,
481 addr,
482 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
483 ) {
484 debug!("dial_identify error: {err}");
485 }
486 }
487
488 pub fn dial_feeler(&self, p2p_control: &ServiceControl, addr: Multiaddr) {
490 if let Err(err) = self.dial_inner(
491 p2p_control,
492 addr.clone(),
493 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
494 ) {
495 debug!("dial_feeler error {err}");
496 } else {
497 self.with_peer_registry_mut(|reg| {
498 reg.add_feeler(&addr);
499 });
500 }
501 }
502
503 pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
505 let mut pending_observed_addrs = self.pending_observed_addrs.write();
506 if pending_observed_addrs.is_empty() {
507 let addrs = self.public_addrs.read();
508 if addrs.is_empty() {
509 return;
510 }
511 if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
513 if let Err(err) = p2p_control.dial(
514 addr.clone(),
515 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
516 ) {
517 trace!("try_dial_observed_addrs {err} failed in public address")
518 }
519 }
520 } else {
521 for addr in pending_observed_addrs.drain() {
522 trace!("try dial observed addr: {:?}", addr);
523 if let Err(err) = p2p_control.dial(
524 addr,
525 TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
526 ) {
527 trace!("try_dial_observed_addrs {err} failed in pending observed addresses")
528 }
529 }
530 }
531 }
532
533 pub(crate) fn add_observed_addrs(&self, iter: impl Iterator<Item = Multiaddr>) {
535 let mut pending_observed_addrs = self.pending_observed_addrs.write();
536 pending_observed_addrs.extend(iter)
537 }
538
539 pub fn is_active(&self) -> bool {
541 self.active.load(Ordering::Acquire)
542 }
543}
544
/// Handler for p2p service events and errors; keeps `NetworkState` in sync with
/// sessions as they open, close and fail.
pub struct EventHandler {
    /// Shared network state updated by this handler.
    pub(crate) network_state: Arc<NetworkState>,
}
549
550impl EventHandler {
551 pub fn new(network_state: Arc<NetworkState>) -> Self {
553 Self { network_state }
554 }
555}
556
/// Callback used to signal that the network component should shut down.
pub trait ExitHandler: Send + Unpin + 'static {
    /// Signals exit to whoever is waiting on this handler.
    fn notify_exit(&self);
}
562
563#[derive(Clone, Default)]
565pub struct DefaultExitHandler {
566 lock: Arc<Mutex<()>>,
567 exit: Arc<Condvar>,
568}
569
570impl DefaultExitHandler {
571 pub fn wait_for_exit(&self) {
573 self.exit.wait(&mut self.lock.lock());
574 }
575}
576
577impl ExitHandler for DefaultExitHandler {
578 fn notify_exit(&self) {
579 self.exit.notify_all();
580 }
581}
582
583impl EventHandler {
584 fn inbound_eviction(&self) -> Vec<PeerIndex> {
585 if self.network_state.config.bootnode_mode {
586 let status = self.network_state.connection_status();
587
588 if status.max_inbound <= status.non_whitelist_inbound.saturating_add(10) {
589 self.network_state
590 .with_peer_registry(|registry| {
591 registry
592 .peers()
593 .values()
594 .filter(|peer| peer.is_inbound() && !peer.is_whitelist)
595 .map(|peer| peer.session_id)
596 .collect::<Vec<SessionId>>()
597 })
598 .into_iter()
599 .enumerate()
600 .filter_map(|(index, peer)| if index & 0x1 != 0 { Some(peer) } else { None })
601 .collect()
602 } else {
603 Vec::new()
604 }
605 } else {
606 Vec::new()
607 }
608 }
609}
610
611#[async_trait]
612impl ServiceHandle for EventHandler {
613 async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
614 match error {
615 ServiceError::DialerError { address, error } => {
616 let mut public_addrs = self.network_state.public_addrs.write();
617
618 match error {
619 DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
620 SecioError::ConnectSelf,
621 )) => {
622 debug!("dial observed address success: {:?}", address);
623 if let Some(ip) = multiaddr_to_socketaddr(&address) {
624 if is_reachable(ip.ip()) {
625 public_addrs.insert(address);
626 }
627 }
628 return;
629 }
630 DialerErrorKind::IoError(e)
631 if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
632 {
633 warn!("DialerError({}) {}", address, e);
634 }
635 _ => {
636 debug!("DialerError({}) {}", address, error);
637 }
638 }
639 if public_addrs.remove(&address) {
640 info!(
641 "Dial {} failed, remove it from network_state.public_addrs",
642 address
643 );
644 }
645 self.network_state.dial_failed(&address);
646 }
647 ServiceError::ProtocolError {
648 id,
649 proto_id,
650 error,
651 } => {
652 debug!("ProtocolError({}, {}) {}", id, proto_id, error);
653 let message = format!("ProtocolError id={proto_id}");
654 self.network_state.ban_session(
656 &context.control().clone().into(),
657 id,
658 Duration::from_secs(300),
659 message,
660 );
661 }
662 ServiceError::SessionTimeout { session_context } => {
663 debug!(
664 "SessionTimeout({}, {})",
665 session_context.id, session_context.address,
666 );
667 }
668 ServiceError::MuxerError {
669 session_context,
670 error,
671 } => {
672 debug!(
673 "MuxerError({}, {}), substream error {}, disconnect it",
674 session_context.id, session_context.address, error,
675 );
676 }
677 ServiceError::ListenError { address, error } => {
678 debug!("ListenError: address={:?}, error={:?}", address, error);
679 }
680 ServiceError::ProtocolSelectError {
681 proto_name,
682 session_context,
683 } => {
684 debug!(
685 "ProtocolSelectError: proto_name={:?}, session_id={}",
686 proto_name, session_context.id,
687 );
688 }
689 ServiceError::SessionBlocked { session_context } => {
690 debug!("SessionBlocked: {}", session_context.id);
691 }
692 ServiceError::ProtocolHandleError { proto_id, error } => {
693 debug!("ProtocolHandleError: {:?}, proto_id: {}", error, proto_id);
694
695 let ProtocolHandleErrorKind::AbnormallyClosed(opt_session_id) = error;
696 {
697 if let Some(id) = opt_session_id {
698 self.network_state.ban_session(
699 &context.control().clone().into(),
700 id,
701 Duration::from_secs(300),
702 format!("protocol {proto_id} panic when process peer message"),
703 );
704 }
705 #[cfg(feature = "with_sentry")]
706 with_scope(
707 |scope| scope.set_fingerprint(Some(&["ckb-network", "p2p-service-error"])),
708 || {
709 capture_message(
710 &format!(
711 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
712 ),
713 Level::Warning,
714 )
715 },
716 );
717 error!(
718 "ProtocolHandleError: AbnormallyClosed, proto_id: {opt_session_id:?}, session id: {opt_session_id:?}"
719 );
720
721 broadcast_exit_signals();
722 }
723 }
724 }
725 }
726
727 async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
728 match event {
730 ServiceEvent::SessionOpen { session_context } => {
731 debug!(
732 "SessionOpen({}, {})",
733 session_context.id, session_context.address,
734 );
735
736 self.network_state.dial_success(&session_context.address);
737
738 let iter = self.inbound_eviction();
739
740 let control = context.control().clone().into();
741
742 for peer in iter {
743 if let Err(err) =
744 disconnect_with_message(&control, peer, "bootnode random eviction")
745 {
746 debug!("Inbound eviction failed {:?}, error: {:?}", peer, err);
747 }
748 }
749
750 if self
751 .network_state
752 .with_peer_registry(|reg| reg.is_feeler(&session_context.address))
753 {
754 debug!(
755 "Feeler connected {} => {}",
756 session_context.id, session_context.address,
757 );
758 } else {
759 match self.network_state.accept_peer(&session_context) {
760 Ok(Some(evicted_peer)) => {
761 debug!(
762 "Disconnect peer, {} => {}",
763 evicted_peer.session_id, evicted_peer.connected_addr,
764 );
765 if let Err(err) = disconnect_with_message(
766 &control,
767 evicted_peer.session_id,
768 "evict because accepted better peer",
769 ) {
770 debug!(
771 "Disconnect failed {:?}, error: {:?}",
772 evicted_peer.session_id, err
773 );
774 }
775 }
776 Ok(None) => debug!(
777 "{} open, registry {} success",
778 session_context.id, session_context.address,
779 ),
780 Err(err) => {
781 debug!(
782 "Peer registry failed {:?}. Disconnect {} => {}",
783 err, session_context.id, session_context.address,
784 );
785 if let Err(err) = disconnect_with_message(
786 &control,
787 session_context.id,
788 "reject peer connection",
789 ) {
790 debug!(
791 "Disconnect failed {:?}, error: {:?}",
792 session_context.id, err
793 );
794 }
795 }
796 }
797 }
798 }
799 ServiceEvent::SessionClose { session_context } => {
800 debug!(
801 "SessionClose({}, {})",
802 session_context.id, session_context.address,
803 );
804 let peer_exists = self.network_state.with_peer_registry_mut(|reg| {
805 reg.remove_feeler(&session_context.address);
807 reg.remove_peer(session_context.id).is_some()
808 });
809 if peer_exists {
810 debug!(
811 "{} closed. Remove {} from peer_registry",
812 session_context.id, session_context.address,
813 );
814 self.network_state.with_peer_store_mut(|peer_store| {
815 peer_store.remove_disconnected_peer(&session_context.address);
816 });
817 }
818 }
819 _ => {
820 info!("p2p service event: {:?}", event);
821 }
822 }
823 }
824}
825
/// Bundles the p2p service with the shared network state and the background
/// futures assembled in `NetworkService::new`.
pub struct NetworkService {
    /// The p2p service, driven by [`EventHandler`] and secio handshakes.
    p2p_service: Service<EventHandler, SecioKeyPair>,
    network_state: Arc<NetworkState>,
    /// Controller returned by `PingHandler::new`; `None` when the Ping protocol
    /// is not in `support_protocols`.
    ping_controller: Option<Sender<()>>,
    /// Background service futures (peer-store dump, protocol type checker,
    /// optional outbound-peer and DNS-seeding services).
    bg_services: Vec<Pin<Box<dyn Future<Output = ()> + 'static + Send>>>,
    /// Version string taken from the identify announcement tuple.
    version: String,
}
835
836impl NetworkService {
    /// Builds the p2p service and its background services.
    ///
    /// - `protocols`: application protocols; each is converted with `CKBProtocol::build`.
    /// - `required_protocol_ids`: handed to `ProtocolTypeCheckerService`.
    /// - `identify_announce`: `(name, version, flags)` for the Identify protocol;
    ///   the version string is also stored in `NetworkService::version`.
    /// - `transport_type`: handed to `OutboundPeerService` when it is enabled.
    ///
    /// Identify is always registered. Ping, Discovery, Feeler and
    /// DisconnectMessage are registered only when listed in
    /// `config.support_protocols`.
    pub fn new(
        network_state: Arc<NetworkState>,
        protocols: Vec<CKBProtocol>,
        required_protocol_ids: Vec<ProtocolId>,
        identify_announce: (String, String, Flags),
        transport_type: TransportType,
    ) -> Self {
        let config = &network_state.config;

        // Warn when the configured protocol set differs from the default full set.
        if config.support_protocols.iter().collect::<HashSet<_>>()
            != default_support_all_protocols()
                .iter()
                .collect::<HashSet<_>>()
        {
            warn!(
                "Customized supported protocols: {:?}",
                config.support_protocols
            );
        }

        let mut protocol_metas = protocols
            .into_iter()
            .map(CKBProtocol::build)
            .collect::<Vec<_>>();

        // Identify is registered unconditionally.
        let identify_callback = IdentifyCallback::new(
            Arc::clone(&network_state),
            identify_announce.0,
            identify_announce.1.clone(),
            identify_announce.2,
        );
        let identify_meta = SupportProtocols::Identify.build_meta_with_service_handle(move || {
            ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(identify_callback)))
        });
        protocol_metas.push(identify_meta);

        // Ping: the controller is kept so the service can hold on to it.
        let ping_controller = if config.support_protocols.contains(&SupportProtocol::Ping) {
            let ping_interval = Duration::from_secs(config.ping_interval_secs);
            let ping_timeout = Duration::from_secs(config.ping_timeout_secs);

            let ping_network_state = Arc::clone(&network_state);
            let (ping_handler, ping_controller) =
                PingHandler::new(ping_interval, ping_timeout, ping_network_state);
            let ping_meta = SupportProtocols::Ping.build_meta_with_service_handle(move || {
                ProtocolHandle::Callback(Box::new(ping_handler))
            });
            protocol_metas.push(ping_meta);
            Some(ping_controller)
        } else {
            None
        };

        if config
            .support_protocols
            .contains(&SupportProtocol::Discovery)
        {
            let addr_mgr = DiscoveryAddressManager {
                network_state: Arc::clone(&network_state),
                discovery_local_address: config.discovery_local_address,
            };
            let disc_meta = SupportProtocols::Discovery.build_meta_with_service_handle(move || {
                ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(
                    addr_mgr,
                    config
                        .discovery_announce_check_interval_secs
                        .map(Duration::from_secs),
                )))
            });
            protocol_metas.push(disc_meta);
        }

        if config.support_protocols.contains(&SupportProtocol::Feeler) {
            let feeler_meta = SupportProtocols::Feeler.build_meta_with_service_handle({
                let network_state = Arc::clone(&network_state);
                move || ProtocolHandle::Callback(Box::new(Feeler::new(Arc::clone(&network_state))))
            });
            protocol_metas.push(feeler_meta);
        }

        if config
            .support_protocols
            .contains(&SupportProtocol::DisconnectMessage)
        {
            let disconnect_message_state = Arc::clone(&network_state);
            let disconnect_message_meta = SupportProtocols::DisconnectMessage
                .build_meta_with_service_handle(move || {
                    ProtocolHandle::Callback(Box::new(DisconnectMessageProtocol::new(
                        disconnect_message_state,
                    )))
                });
            protocol_metas.push(disconnect_message_meta);
        }

        let mut service_builder = ServiceBuilder::default();
        // Stream count is capped at the number of registered protocols; window is 1 MiB.
        let yamux_config = YamuxConfig {
            max_stream_count: protocol_metas.len(),
            max_stream_window_size: 1024 * 1024,
            ..Default::default()
        };
        // Record (id, name, versions) of every protocol in the shared state and
        // register it with the builder.
        for meta in protocol_metas.into_iter() {
            network_state
                .protocols
                .write()
                .push((meta.id(), meta.name(), meta.support_versions()));
            service_builder = service_builder.insert_protocol(meta);
        }
        let event_handler = EventHandler {
            network_state: Arc::clone(&network_state),
        };
        service_builder = service_builder
            .handshake_type(network_state.local_private_key.clone().into())
            .yamux_config(yamux_config)
            .forever(true)
            .max_connection_number(1024)
            .set_send_buffer_size(config.max_send_buffer())
            .set_channel_size(config.channel_size())
            .timeout(Duration::from_secs(5));

        #[cfg(not(target_family = "wasm"))]
        {
            service_builder = service_builder.upnp(config.upnp);
        }

        // With `reuse_port_on_linux`, install a socket hook for the first TCP and
        // the first WS/WSS listen address. The hook sets SO_REUSEPORT and
        // SO_REUSEADDR and binds new sockets to that listen address when the
        // address family matches.
        // NOTE(review): presumably so outgoing connections originate from the
        // listen port — confirm.
        #[cfg(target_os = "linux")]
        let p2p_service = {
            if config.reuse_port_on_linux {
                let iter = config.listen_addresses.iter();

                // Which transports already have a bind hook installed.
                #[derive(Clone, Copy, Debug, Eq, PartialEq)]
                enum BindType {
                    None,
                    Ws,
                    Tcp,
                    Both,
                }
                impl BindType {
                    fn transform(&mut self, other: TransportType) {
                        match (&self, other) {
                            (BindType::None, TransportType::Ws) => *self = BindType::Ws,
                            (BindType::None, TransportType::Tcp) => *self = BindType::Tcp,
                            (BindType::Ws, TransportType::Tcp) => *self = BindType::Both,
                            (BindType::Tcp, TransportType::Ws) => *self = BindType::Both,
                            _ => (),
                        }
                    }

                    fn is_ready(&self) -> bool {
                        matches!(self, BindType::Both)
                    }
                }

                let mut init = BindType::None;

                for multi_addr in iter {
                    if init.is_ready() {
                        break;
                    }
                    match find_type(multi_addr) {
                        TransportType::Tcp => {
                            // Only the first TCP address gets the hook.
                            if matches!(init, BindType::Tcp) {
                                continue;
                            }
                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
                                let domain = socket2::Domain::for_address(addr);
                                let bind_fn = move |socket: p2p::service::TcpSocket| {
                                    let socket_ref = socket2::SockRef::from(&socket);
                                    #[cfg(all(
                                        unix,
                                        not(target_os = "solaris"),
                                        not(target_os = "illumos")
                                    ))]
                                    socket_ref.set_reuse_port(true)?;
                                    socket_ref.set_reuse_address(true)?;
                                    if socket_ref.domain()? == domain {
                                        socket_ref.bind(&addr.into())?;
                                    }
                                    Ok(socket)
                                };
                                init.transform(TransportType::Tcp);
                                service_builder = service_builder.tcp_config(bind_fn);
                            }
                        }
                        TransportType::Ws | TransportType::Wss => {
                            // Only the first WS/WSS address gets the hook.
                            if matches!(init, BindType::Ws) {
                                continue;
                            }
                            if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
                                let domain = socket2::Domain::for_address(addr);
                                let bind_fn = move |socket: p2p::service::TcpSocket| {
                                    let socket_ref = socket2::SockRef::from(&socket);
                                    #[cfg(all(
                                        unix,
                                        not(target_os = "solaris"),
                                        not(target_os = "illumos")
                                    ))]
                                    socket_ref.set_reuse_port(true)?;
                                    socket_ref.set_reuse_address(true)?;
                                    if socket_ref.domain()? == domain {
                                        socket_ref.bind(&addr.into())?;
                                    }
                                    Ok(socket)
                                };
                                init.transform(TransportType::Ws);
                                service_builder = service_builder.tcp_config_on_ws(bind_fn);
                            }
                        }
                    }
                }
            }

            service_builder.build(event_handler)
        };

        #[cfg(not(target_os = "linux"))]
        let p2p_service = service_builder.build(event_handler);

        // Background services that run alongside the p2p service.
        let dump_peer_store_service = DumpPeerStoreService::new(Arc::clone(&network_state));
        let protocol_type_checker_service = ProtocolTypeCheckerService::new(
            Arc::clone(&network_state),
            p2p_service.control().to_owned().into(),
            required_protocol_ids,
        );
        let mut bg_services = vec![
            Box::pin(dump_peer_store_service) as Pin<Box<_>>,
            Box::pin(protocol_type_checker_service) as Pin<Box<_>>,
        ];
        if config.outbound_peer_service_enabled() {
            let outbound_peer_service = OutboundPeerService::new(
                Arc::clone(&network_state),
                p2p_service.control().to_owned().into(),
                Duration::from_secs(config.connect_outbound_interval_secs),
                transport_type,
            );
            bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
        };

        #[cfg(feature = "with_dns_seeding")]
        if config.dns_seeding_service_enabled() {
            let dns_seeding_service = crate::services::dns_seeding::DnsSeedingService::new(
                Arc::clone(&network_state),
                config.dns_seeds.clone(),
            );
            bg_services.push(Box::pin(dns_seeding_service.start()) as Pin<Box<_>>);
        };

        NetworkService {
            p2p_service,
            network_state,
            ping_controller,
            bg_services,
            version: identify_announce.1,
        }
    }
1110
1111 pub fn start<S: Spawn>(self, handle: &S) -> Result<NetworkController, Error> {
1113 let config = self.network_state.config.clone();
1114
1115 let p2p_control: ServiceControl = self.p2p_service.control().to_owned().into();
1116
1117 for addr in self.network_state.config.whitelist_peers() {
1119 debug!("Dial whitelist_peers {:?}", addr);
1120 self.network_state.dial_identify(&p2p_control, addr);
1121 }
1122
1123 let target = &self.network_state.required_flags;
1124
1125 let bootnodes = self.network_state.with_peer_store_mut(|peer_store| {
1128 let count = max((config.max_outbound_peers >> 1) as usize, 1);
1129 let mut addrs: Vec<_> = peer_store
1130 .fetch_addrs_to_attempt(count, *target, |_| true)
1131 .into_iter()
1132 .map(|paddr| paddr.addr)
1133 .collect();
1134 let bootnodes = self
1136 .network_state
1137 .bootnodes
1138 .iter()
1139 .choose_multiple(&mut rand::thread_rng(), count.saturating_sub(addrs.len()))
1140 .into_iter()
1141 .cloned();
1142 addrs.extend(bootnodes);
1143 addrs
1144 });
1145
1146 for addr in bootnodes {
1148 debug!("Dial bootnode {:?}", addr);
1149 self.network_state.dial_identify(&p2p_control, addr);
1150 }
1151
1152 let Self {
1153 mut p2p_service,
1154 network_state,
1155 ping_controller,
1156 bg_services,
1157 version,
1158 } = self;
1159
1160 let (bg_signals, bg_receivers): (Vec<_>, Vec<_>) = bg_services
1162 .into_iter()
1163 .map(|bg_service| {
1164 let (signal_sender, signal_receiver) = oneshot::channel::<()>();
1165 (signal_sender, (bg_service, signal_receiver))
1166 })
1167 .unzip();
1168
1169 let receiver: CancellationToken = new_tokio_exit_rx();
1170 #[cfg(not(target_family = "wasm"))]
1171 let (start_sender, start_receiver) = mpsc::channel();
1172 {
1173 #[cfg(not(target_family = "wasm"))]
1174 let network_state = Arc::clone(&network_state);
1175 let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
1176 handle.spawn_task(async move {
1177 #[cfg(not(target_family = "wasm"))]
1178 {
1179 let listen_addresses = {
1180 let mut addresses = config.listen_addresses.clone();
1181 if config.reuse_tcp_with_ws {
1182 let ws_listens = addresses
1183 .iter()
1184 .cloned()
1185 .filter_map(|mut addr| {
1186 if matches!(find_type(&addr), TransportType::Tcp) {
1187 addr.push(Protocol::Ws);
1188 Some(addr)
1189 } else {
1190 None
1191 }
1192 })
1193 .collect::<Vec<_>>();
1194
1195 addresses.extend(ws_listens);
1196 }
1197 let mut addresses = addresses
1198 .into_iter()
1199 .collect::<HashSet<_>>()
1200 .into_iter()
1201 .collect::<Vec<_>>();
1202 addresses.sort_by(|a, b| {
1203 let ty_a = find_type(a);
1204 let ty_b = find_type(b);
1205
1206 ty_a.cmp(&ty_b)
1207 });
1208
1209 addresses
1210 };
1211
1212 for addr in &listen_addresses {
1213 match p2p_service.listen(addr.to_owned()).await {
1214 Ok(listen_address) => {
1215 info!("Listen on address: {}", listen_address);
1216 network_state
1217 .listened_addrs
1218 .write()
1219 .push(listen_address.clone());
1220 }
1221 Err(err) => {
1222 warn!(
1223 "Listen on address {} failed, due to error: {}",
1224 addr.clone(),
1225 err
1226 );
1227 start_sender
1228 .send(Err(Error::P2P(P2PError::Transport(err))))
1229 .expect("channel abnormal shutdown");
1230 return;
1231 }
1232 };
1233 }
1234 start_sender.send(Ok(())).unwrap();
1235 }
1236
1237 p2p::runtime::spawn(async move { p2p_service.run().await });
1238 tokio::select! {
1239 _ = receiver.cancelled() => {
1240 info!("NetworkService receive exit signal, start shutdown...");
1241 let _ = p2p_control.shutdown().await;
1242 drop(bg_signals);
1244
1245 info!("NetworkService shutdown now");
1246 },
1247 else => {
1248 let _ = p2p_control.shutdown().await;
1249 drop(bg_signals);
1251 },
1252 }
1253 });
1254 }
1255 for (mut service, mut receiver) in bg_receivers {
1256 handle.spawn_task(async move {
1257 loop {
1258 tokio::select! {
1259 _ = &mut service => {},
1260 _ = &mut receiver => break
1261 }
1262 }
1263 });
1264 }
1265 #[cfg(not(target_family = "wasm"))]
1266 if let Ok(Err(e)) = start_receiver.recv() {
1267 return Err(e);
1268 }
1269
1270 Ok(NetworkController {
1271 version,
1272 network_state,
1273 p2p_control,
1274 ping_controller,
1275 })
1276 }
1277}
1278
/// Cloneable handle to a started network service; constructed by `NetworkService::start`.
#[derive(Clone)]
pub struct NetworkController {
    /// Version string taken over from `NetworkService` when it was started.
    version: String,
    /// Shared network state (peer registry, peer store, config and shared flags).
    network_state: Arc<NetworkState>,
    /// Control handle of the underlying p2p service, used to send messages and disconnect.
    p2p_control: ServiceControl,
    /// Sender used by `ping_peers` to trigger a ping; `ping_peers` is a no-op when `None`.
    ping_controller: Option<Sender<()>>,
}
1287
1288impl NetworkController {
1289 pub fn init_ckb2023(&self) {
1291 self.network_state.ckb2023.store(true, Ordering::SeqCst);
1292 }
1293
1294 pub fn load_ckb2023(&self) -> bool {
1296 self.network_state.ckb2023.load(Ordering::SeqCst)
1297 }
1298
1299 pub fn public_urls(&self, max_urls: usize) -> Vec<(String, u8)> {
1301 self.network_state.public_urls(max_urls)
1302 }
1303
1304 pub fn version(&self) -> &String {
1306 &self.version
1307 }
1308
1309 pub fn node_id(&self) -> String {
1311 self.network_state.node_id()
1312 }
1313
1314 pub fn p2p_control(&self) -> &ServiceControl {
1316 &self.p2p_control
1317 }
1318
1319 pub fn add_node(&self, address: Multiaddr) {
1321 self.network_state.add_node(&self.p2p_control, address)
1322 }
1323
1324 pub fn remove_node(&self, peer_id: &PeerId) {
1326 if let Some(session_id) = self
1327 .network_state
1328 .peer_registry
1329 .read()
1330 .get_key_by_peer_id(peer_id)
1331 {
1332 if let Err(err) =
1333 disconnect_with_message(&self.p2p_control, session_id, "disconnect manually")
1334 {
1335 debug!("Disconnect failed {:?}, error: {:?}", session_id, err);
1336 }
1337 } else {
1338 error!("Cannot find peer {:?}", peer_id);
1339 }
1340 }
1341
1342 pub fn get_banned_addrs(&self) -> Vec<BannedAddr> {
1344 self.network_state
1345 .peer_store
1346 .lock()
1347 .ban_list()
1348 .get_banned_addrs()
1349 }
1350
1351 pub fn clear_banned_addrs(&self) {
1353 self.network_state.peer_store.lock().clear_ban_list();
1354 }
1355
1356 pub fn addr_info(&self, addr: &Multiaddr) -> Option<AddrInfo> {
1358 self.network_state
1359 .peer_store
1360 .lock()
1361 .addr_manager()
1362 .get(addr)
1363 .cloned()
1364 }
1365
1366 pub fn ban(&self, address: IpNetwork, ban_until: u64, ban_reason: String) {
1368 self.disconnect_peers_in_ip_range(address, &ban_reason);
1369 self.network_state
1370 .peer_store
1371 .lock()
1372 .ban_network(address, ban_until, ban_reason)
1373 }
1374
1375 pub fn unban(&self, address: &IpNetwork) {
1377 self.network_state
1378 .peer_store
1379 .lock()
1380 .mut_ban_list()
1381 .unban_network(address);
1382 }
1383
1384 pub fn connected_peers(&self) -> Vec<(PeerIndex, Peer)> {
1386 self.network_state.with_peer_registry(|reg| {
1387 reg.peers()
1388 .iter()
1389 .map(|(peer_index, peer)| (*peer_index, peer.clone()))
1390 .collect::<Vec<_>>()
1391 })
1392 }
1393
1394 pub fn ban_peer(&self, peer_index: PeerIndex, duration: Duration, reason: String) {
1396 self.network_state
1397 .ban_session(&self.p2p_control, peer_index, duration, reason);
1398 }
1399
1400 fn disconnect_peers_in_ip_range(&self, address: IpNetwork, reason: &str) {
1402 self.network_state.with_peer_registry(|reg| {
1403 reg.peers().iter().for_each(|(peer_index, peer)| {
1404 if let Some(addr) = multiaddr_to_socketaddr(&peer.connected_addr) {
1405 if address.contains(addr.ip()) {
1406 let _ = disconnect_with_message(
1407 &self.p2p_control,
1408 *peer_index,
1409 &format!("Ban peer {}, reason: {}", addr.ip(), reason),
1410 );
1411 }
1412 }
1413 })
1414 });
1415 }
1416
1417 fn try_broadcast(
1418 &self,
1419 quick: bool,
1420 target: Option<SessionId>,
1421 proto_id: ProtocolId,
1422 data: Bytes,
1423 ) -> Result<(), SendErrorKind> {
1424 let now = Instant::now();
1425 loop {
1426 let target = target
1427 .map(TargetSession::Single)
1428 .unwrap_or(TargetSession::All);
1429 let result = if quick {
1430 self.p2p_control
1431 .quick_filter_broadcast(target, proto_id, data.clone())
1432 } else {
1433 self.p2p_control
1434 .filter_broadcast(target, proto_id, data.clone())
1435 };
1436 match result {
1437 Ok(()) => {
1438 return Ok(());
1439 }
1440 Err(SendErrorKind::WouldBlock) => {
1441 if Instant::now().saturating_duration_since(now) > P2P_SEND_TIMEOUT {
1442 warn!("Broadcast message to {} timeout", proto_id);
1443 return Err(SendErrorKind::WouldBlock);
1444 }
1445 thread::sleep(P2P_TRY_SEND_INTERVAL);
1446 }
1447 Err(err) => {
1448 warn!("Broadcast message to {} failed: {:?}", proto_id, err);
1449 return Err(err);
1450 }
1451 }
1452 }
1453 }
1454
1455 pub fn broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1457 self.try_broadcast(false, None, proto_id, data)
1458 }
1459
1460 pub fn quick_broadcast(&self, proto_id: ProtocolId, data: Bytes) -> Result<(), SendErrorKind> {
1462 self.try_broadcast(true, None, proto_id, data)
1463 }
1464
1465 pub fn send_message_to(
1467 &self,
1468 session_id: SessionId,
1469 proto_id: ProtocolId,
1470 data: Bytes,
1471 ) -> Result<(), SendErrorKind> {
1472 self.try_broadcast(false, Some(session_id), proto_id, data)
1473 }
1474
1475 pub fn is_active(&self) -> bool {
1477 self.network_state.is_active()
1478 }
1479
1480 pub fn set_active(&self, active: bool) {
1482 self.network_state.active.store(active, Ordering::Release);
1483 }
1484
1485 pub fn protocols(&self) -> Vec<(ProtocolId, String, Vec<String>)> {
1487 self.network_state.protocols.read().clone()
1488 }
1489
1490 pub fn ping_peers(&self) {
1492 if let Some(mut ping_controller) = self.ping_controller.clone() {
1493 let _ignore = ping_controller.try_send(());
1494 }
1495 }
1496}
1497
1498pub(crate) fn disconnect_with_message(
1500 control: &ServiceControl,
1501 peer_index: SessionId,
1502 message: &str,
1503) -> Result<(), SendErrorKind> {
1504 if !message.is_empty() {
1505 let data = Bytes::from(message.as_bytes().to_vec());
1506 control.quick_send_message_to(
1508 peer_index,
1509 SupportProtocols::DisconnectMessage.protocol_id(),
1510 data,
1511 )?;
1512 }
1513 control.disconnect(peer_index)
1514}
1515
1516pub(crate) async fn async_disconnect_with_message(
1517 control: &ServiceAsyncControl,
1518 peer_index: SessionId,
1519 message: &str,
1520) -> Result<(), SendErrorKind> {
1521 if !message.is_empty() {
1522 let data = Bytes::from(message.as_bytes().to_vec());
1523 control
1525 .quick_send_message_to(
1526 peer_index,
1527 SupportProtocols::DisconnectMessage.protocol_id(),
1528 data,
1529 )
1530 .await?;
1531 }
1532 control.disconnect(peer_index).await
1533}
1534
/// Transport kind of a multiaddr, as classified by `find_type`.
///
/// The derived `Ord` follows declaration order (`Tcp < Ws < Wss`); listen addresses are
/// sorted by this order.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum TransportType {
    /// Fallback: the address contains neither a `/ws` nor a `/wss` component.
    Tcp,
    /// The address contains a `/ws` component.
    Ws,
    /// The address contains a `/wss` component.
    Wss,
}
1545
1546#[allow(dead_code)]
1547pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
1548 let mut iter = addr.iter();
1549
1550 iter.find_map(|proto| match proto {
1551 Protocol::Ws => Some(TransportType::Ws),
1552 Protocol::Wss => Some(TransportType::Wss),
1553 _ => None,
1554 })
1555 .unwrap_or(TransportType::Tcp)
1556}