1use std::{
5 sync::Arc,
6 time::{Duration, SystemTime, UNIX_EPOCH},
7};
8
9use crate::{blocks::GossipBlock, rpc::net::NetInfoResult};
10use crate::{chain::ChainStore, utils::encoding::from_slice_with_fallback};
11use crate::{
12 libp2p_bitswap::{
13 BitswapStoreRead, BitswapStoreReadWrite, request_manager::BitswapRequestManager,
14 },
15 utils::flume::FlumeSenderExt as _,
16};
17use crate::{message::SignedMessage, networks::GenesisNetworkName};
18use ahash::{HashMap, HashSet};
19use anyhow::Context as _;
20use cid::Cid;
21use flume::Sender;
22use futures::{select, stream::StreamExt as _};
23use fvm_ipld_blockstore::Blockstore;
24pub use libp2p::gossipsub::{IdentTopic, Topic};
25use libp2p::{
26 PeerId, Swarm, SwarmBuilder,
27 autonat::NatStatus,
28 connection_limits::Exceeded,
29 core::Multiaddr,
30 gossipsub, identify,
31 identity::Keypair,
32 metrics::{Metrics, Recorder},
33 multiaddr::Protocol,
34 noise, ping, request_response,
35 swarm::{DialError, SwarmEvent},
36 tcp, yamux,
37};
38use nonzero_ext::nonzero;
39use tokio_stream::wrappers::IntervalStream;
40use tracing::{debug, error, info, trace, warn};
41
42use super::{
43 ForestBehaviour, ForestBehaviourEvent, Libp2pConfig,
44 chain_exchange::{ChainExchangeRequest, ChainExchangeResponse, make_chain_exchange_response},
45 discovery::{DerivedDiscoveryBehaviourEvent, PeerInfo},
46};
47use crate::libp2p::{
48 PeerManager, PeerOperation,
49 chain_exchange::ChainExchangeBehaviour,
50 discovery::DiscoveryEvent,
51 hello::{HelloBehaviour, HelloRequest, HelloResponse},
52 rpc::RequestResponseError,
53};
54
55pub(in crate::libp2p) mod metrics {
56 use prometheus_client::metrics::{family::Family, gauge::Gauge};
57 use std::sync::LazyLock;
58
59 use crate::metrics::KindLabel;
60
61 pub static NETWORK_CONTAINER_CAPACITIES: LazyLock<Family<KindLabel, Gauge>> = {
62 LazyLock::new(|| {
63 let metric = Family::default();
64 crate::metrics::default_registry().register(
65 "network_container_capacities",
66 "Capacity for each container",
67 metric.clone(),
68 );
69 metric
70 })
71 };
72
73 pub mod values {
74 use crate::metrics::KindLabel;
75
76 pub const HELLO_REQUEST_TABLE: KindLabel = KindLabel::new("hello_request_table");
77 pub const CHAIN_EXCHANGE_REQUEST_TABLE: KindLabel = KindLabel::new("cx_request_table");
78 }
79}
80
81crate::def_is_env_truthy!(libp2p_metrics_enabled, "FOREST_LIBP2P_METRICS_ENABLED");
82
83pub const PUBSUB_BLOCK_STR: &str = "/fil/blocks";
85pub const PUBSUB_MSG_STR: &str = "/fil/msgs";
87
88const PUBSUB_TOPICS: [&str; 2] = [PUBSUB_BLOCK_STR, PUBSUB_MSG_STR];
89
90pub const BITSWAP_TIMEOUT: Duration = Duration::from_secs(30);
91
92#[allow(clippy::large_enum_variant)]
94#[derive(Debug)]
95pub enum NetworkEvent {
96 PubsubMessage {
97 message: PubsubMessage,
98 },
99 HelloRequestInbound,
100 HelloResponseOutbound {
101 source: PeerId,
102 request: HelloRequest,
103 },
104 HelloRequestOutbound,
105 HelloResponseInbound,
106 ChainExchangeRequestOutbound,
107 ChainExchangeResponseInbound,
108 ChainExchangeRequestInbound,
109 ChainExchangeResponseOutbound,
110 PeerConnected(PeerId),
111 PeerDisconnected(PeerId),
112}
113
114#[allow(clippy::large_enum_variant)]
116#[derive(Debug, Clone)]
117pub enum PubsubMessage {
118 Block(GossipBlock),
120 Message(SignedMessage),
122}
123
124#[derive(Debug)]
126pub enum NetworkMessage {
127 PubsubMessage {
128 topic: IdentTopic,
129 message: Vec<u8>,
130 },
131 ChainExchangeRequest {
132 peer_id: PeerId,
133 request: ChainExchangeRequest,
134 response_channel: flume::Sender<Result<ChainExchangeResponse, RequestResponseError>>,
135 },
136 HelloRequest {
137 peer_id: PeerId,
138 request: HelloRequest,
139 response_channel: flume::Sender<HelloResponse>,
140 },
141 BitswapRequest {
142 cid: Cid,
143 response_channel: flume::Sender<bool>,
144 },
145 JSONRPCRequest {
146 method: NetRPCMethods,
147 },
148}
149
150#[derive(Debug)]
152pub enum NetRPCMethods {
153 AddrsListen(flume::Sender<(PeerId, HashSet<Multiaddr>)>),
154 Peer(flume::Sender<Option<HashSet<Multiaddr>>>, PeerId),
155 Peers(flume::Sender<HashMap<PeerId, HashSet<Multiaddr>>>),
156 ProtectPeer(flume::Sender<()>, HashSet<PeerId>),
157 UnprotectPeer(flume::Sender<()>, HashSet<PeerId>),
158 ListProtectedPeers(flume::Sender<HashSet<PeerId>>),
159 Info(flume::Sender<NetInfoResult>),
160 Connect(flume::Sender<bool>, PeerId, HashSet<Multiaddr>),
161 Disconnect(flume::Sender<()>, PeerId),
162 AgentVersion(flume::Sender<Option<String>>, PeerId),
163 AutoNATStatus(flume::Sender<NatStatus>),
164}
165
166pub struct Libp2pService<DB> {
168 swarm: Swarm<ForestBehaviour>,
169 bootstrap_peers: HashMap<PeerId, Multiaddr>,
170 cs: Arc<ChainStore<DB>>,
171 peer_manager: Arc<PeerManager>,
172 network_receiver_in: flume::Receiver<NetworkMessage>,
173 network_sender_in: Sender<NetworkMessage>,
174 network_receiver_out: flume::Receiver<NetworkEvent>,
175 network_sender_out: Sender<NetworkEvent>,
176 network_name: String,
177 genesis_cid: Cid,
178}
179
180impl<DB> Libp2pService<DB>
181where
182 DB: Blockstore + BitswapStoreReadWrite + Sync + Send + 'static,
183{
184 pub async fn new(
185 config: Libp2pConfig,
186 cs: Arc<ChainStore<DB>>,
187 peer_manager: Arc<PeerManager>,
188 net_keypair: Keypair,
189 network_name: GenesisNetworkName,
190 genesis_cid: Cid,
191 ) -> anyhow::Result<Self> {
192 let behaviour =
193 ForestBehaviour::new(&net_keypair, &config, &network_name, peer_manager.clone())
194 .await?;
195 let mut swarm = SwarmBuilder::with_existing_identity(net_keypair)
196 .with_tokio()
197 .with_tcp(
198 tcp::Config::default().nodelay(true),
199 noise::Config::new,
200 yamux::Config::default,
201 )?
202 .with_quic()
203 .with_dns()?
204 .with_bandwidth_metrics(&mut crate::metrics::collector_registry())
205 .with_behaviour(|_| behaviour)?
206 .with_swarm_config(|config| {
207 config
208 .with_notify_handler_buffer_size(nonzero!(20usize))
209 .with_per_connection_event_buffer_size(64)
210 .with_idle_connection_timeout(Duration::from_secs(60 * 10))
211 })
212 .build();
213
214 for topic in PUBSUB_TOPICS.iter() {
216 let t = Topic::new(format!("{topic}/{network_name}"));
217 swarm
218 .behaviour_mut()
219 .subscribe(&t)
220 .with_context(|| format!("Failed to subscribe gossipsub topic {t}"))?;
221 }
222
223 let (network_sender_in, network_receiver_in) = flume::unbounded();
224 let (network_sender_out, network_receiver_out) = flume::unbounded();
225
226 info!("p2p network peer id: {}", swarm.local_peer_id());
230
231 for addr in &config.listening_multiaddrs {
233 match swarm.listen_on(addr.clone()) {
234 Ok(id) => loop {
235 if let SwarmEvent::NewListenAddr {
236 address,
237 listener_id,
238 } = swarm.select_next_some().await
239 && id == listener_id
240 {
241 info!("p2p peer is now listening on: {address}");
242 break;
243 }
244 },
245 Err(err) => error!("Fail to listen on {addr}: {err}"),
246 }
247 }
248
249 if swarm.listeners().count() == 0 {
250 anyhow::bail!("p2p peer failed to listen on any network endpoints");
251 }
252
253 let bootstrap_peers = config
254 .bootstrap_peers
255 .iter()
256 .filter_map(|ma| match ma.iter().last() {
257 Some(Protocol::P2p(peer)) => Some((peer, ma.clone())),
258 _ => None,
259 })
260 .collect();
261
262 Ok(Libp2pService {
263 swarm,
264 bootstrap_peers,
265 cs,
266 peer_manager,
267 network_receiver_in,
268 network_sender_in,
269 network_receiver_out,
270 network_sender_out,
271 network_name: network_name.into(),
272 genesis_cid,
273 })
274 }
275
276 pub async fn run(mut self) -> anyhow::Result<()> {
279 info!("Running libp2p service");
280
281 if let Err(e) = self.swarm.behaviour_mut().bootstrap() {
283 warn!("Failed to bootstrap with Kademlia: {e:#}");
284 }
285
286 let bitswap_request_manager = self.swarm.behaviour().bitswap.request_manager();
287 let mut swarm_stream = self.swarm.fuse();
288 let mut network_stream = self.network_receiver_in.stream().fuse();
289 let mut interval =
290 IntervalStream::new(tokio::time::interval(Duration::from_secs(15))).fuse();
291 let pubsub_block_str = format!("{}/{}", PUBSUB_BLOCK_STR, self.network_name);
292 let pubsub_msg_str = format!("{}/{}", PUBSUB_MSG_STR, self.network_name);
293
294 let (cx_response_tx, cx_response_rx) = flume::unbounded();
295
296 let mut cx_response_rx_stream = cx_response_rx.stream().fuse();
297 let mut bitswap_outbound_request_stream =
298 bitswap_request_manager.outbound_request_stream().fuse();
299 let mut peer_ops_rx_stream = self.peer_manager.peer_ops_rx().stream().fuse();
300 let metrics = if libp2p_metrics_enabled() {
301 Some(Metrics::new(&mut crate::metrics::collector_registry()))
302 } else {
303 None
304 };
305
306 const BOOTSTRAP_PEER_DIALER_INTERVAL: tokio::time::Duration =
307 tokio::time::Duration::from_secs(60);
308 let mut bootstrap_peer_dialer_interval_stream =
309 IntervalStream::new(tokio::time::interval_at(
310 tokio::time::Instant::now() + BOOTSTRAP_PEER_DIALER_INTERVAL,
311 BOOTSTRAP_PEER_DIALER_INTERVAL,
312 ))
313 .fuse();
314 loop {
315 select! {
316 swarm_event = swarm_stream.next() => match swarm_event {
317 Some(SwarmEvent::Behaviour(event)) => {
319 if let Some(m) = &metrics {
320 m.record(&event);
321 }
322 handle_forest_behaviour_event(
323 swarm_stream.get_mut(),
324 &bitswap_request_manager,
325 &self.peer_manager,
326 event,
327 &self.cs,
328 &self.genesis_cid,
329 &self.network_sender_out,
330 cx_response_tx.clone(),
331 &pubsub_block_str,
332 &pubsub_msg_str,).await;
333 },
334 None => { break; },
335 _ => { },
336 },
337 rpc_message = network_stream.next() => match rpc_message {
338 Some(message) => {
340 handle_network_message(
341 swarm_stream.get_mut(),
342 self.cs.clone(),
343 bitswap_request_manager.clone(),
344 message,
345 &self.network_sender_out,
346 &self.peer_manager).await;
347 }
348 None => { break; }
349 },
350 interval_event = interval.next() => if interval_event.is_some() {
351 trace!("Peers connected: {}", swarm_stream.get_mut().behaviour_mut().peers().len());
353 },
354 cs_pair_opt = cx_response_rx_stream.next() => {
355 if let Some((_request_id, channel, cx_response)) = cs_pair_opt {
356 let behaviour = swarm_stream.get_mut().behaviour_mut();
357 if let Err(e) = behaviour.chain_exchange.send_response(channel, cx_response) {
358 debug!("Error sending chain exchange response: {e:?}");
359 }
360 }
361 },
362 bitswap_outbound_request_opt = bitswap_outbound_request_stream.next() => {
363 if let Some((peer, request)) = bitswap_outbound_request_opt {
364 let bitswap = &mut swarm_stream.get_mut().behaviour_mut().bitswap;
365 bitswap.send_request(&peer, request);
366 }
367 }
368 peer_ops_opt = peer_ops_rx_stream.next() => {
369 if let Some(peer_ops) = peer_ops_opt {
370 handle_peer_ops(swarm_stream.get_mut(), peer_ops, &self.bootstrap_peers);
371 }
372 },
373 _ = bootstrap_peer_dialer_interval_stream.next() => {
374 dial_to_bootstrap_peers_if_needed(swarm_stream.get_mut(), &self.bootstrap_peers);
375 }
376 };
377 }
378 Ok(())
379 }
380
381 pub fn network_sender(&self) -> Sender<NetworkMessage> {
383 self.network_sender_in.clone()
384 }
385
386 pub fn network_receiver(&self) -> flume::Receiver<NetworkEvent> {
388 self.network_receiver_out.clone()
389 }
390
391 pub fn peer_manager(&self) -> &Arc<PeerManager> {
392 &self.peer_manager
393 }
394}
395
396fn dial_to_bootstrap_peers_if_needed(
397 swarm: &mut Swarm<ForestBehaviour>,
398 bootstrap_peers: &HashMap<PeerId, Multiaddr>,
399) {
400 for (peer, ma) in bootstrap_peers {
401 if !swarm.behaviour().peers().contains(peer) {
402 info!("Re-dialing to bootstrap peer at {ma}");
403 if let Err(e) = swarm.dial(ma.clone()) {
404 warn!("{e}");
405 }
406 }
407 }
408}
409
410fn handle_peer_ops(
411 swarm: &mut Swarm<ForestBehaviour>,
412 peer_ops: PeerOperation,
413 bootstrap_peers: &HashMap<PeerId, Multiaddr>,
414) {
415 use PeerOperation::*;
416 match peer_ops {
417 Ban {
418 peer,
419 user_agent,
420 reason,
421 } => {
422 if !bootstrap_peers.contains_key(&peer) {
424 let user_agent = user_agent.unwrap_or_default();
425 debug!(%peer, %user_agent, %reason, "Banning peer");
426 swarm.behaviour_mut().blocked_peers.block_peer(peer);
427 }
428 }
429 Unban(peer) => {
430 debug!(%peer, "Unbanning peer");
431 swarm.behaviour_mut().blocked_peers.unblock_peer(peer);
432 }
433 }
434}
435
436async fn handle_network_message(
437 swarm: &mut Swarm<ForestBehaviour>,
438 store: Arc<impl BitswapStoreReadWrite>,
439 bitswap_request_manager: Arc<BitswapRequestManager>,
440 message: NetworkMessage,
441 network_sender_out: &Sender<NetworkEvent>,
442 peer_manager: &PeerManager,
443) {
444 match message {
445 NetworkMessage::PubsubMessage { topic, message } => {
446 match swarm.behaviour_mut().publish(topic, message) {
447 Ok(_) => (),
448 Err(gossipsub::PublishError::Duplicate) => {
449 debug!("Failed to send gossipsub message: duplicate");
452 }
453 Err(e) => {
454 warn!("Failed to send gossipsub message: {e:#}");
455 }
456 }
457 }
458 NetworkMessage::HelloRequest {
459 peer_id,
460 request,
461 response_channel,
462 } => {
463 let _request_id =
464 swarm
465 .behaviour_mut()
466 .hello
467 .send_request(&peer_id, request, response_channel);
468 emit_event(network_sender_out, NetworkEvent::HelloRequestOutbound).await;
469 }
470 NetworkMessage::ChainExchangeRequest {
471 peer_id,
472 request,
473 response_channel,
474 } => {
475 let _request_id = swarm.behaviour_mut().chain_exchange.send_request(
476 &peer_id,
477 request,
478 response_channel,
479 );
480 emit_event(
481 network_sender_out,
482 NetworkEvent::ChainExchangeRequestOutbound,
483 )
484 .await;
485 }
486 NetworkMessage::BitswapRequest {
487 cid,
488 response_channel,
489 } => {
490 bitswap_request_manager.get_block(
491 store,
492 cid,
493 BITSWAP_TIMEOUT,
494 Some(response_channel),
495 None,
496 );
497 }
498 NetworkMessage::JSONRPCRequest { method } => {
499 match method {
500 NetRPCMethods::AddrsListen(response_channel) => {
501 let listeners = Swarm::listeners(swarm).cloned().collect();
502 let peer_id = Swarm::local_peer_id(swarm);
503 response_channel.send_or_warn((*peer_id, listeners));
504 }
505 NetRPCMethods::Peer(response_channel, peer) => {
506 let addresses = swarm.behaviour().peer_addresses().get(&peer).cloned();
507 response_channel.send_or_warn(addresses);
508 }
509 NetRPCMethods::Peers(response_channel) => {
510 let peer_addresses = swarm.behaviour().peer_addresses();
511 response_channel.send_or_warn(peer_addresses);
512 }
513 NetRPCMethods::ProtectPeer(tx, peer_ids) => {
514 peer_ids.into_iter().for_each(|peer_id| {
515 peer_manager.protect_peer(peer_id);
516 });
517 tx.send_or_warn(());
518 }
519 NetRPCMethods::ListProtectedPeers(tx) => {
520 tx.send_or_warn(peer_manager.list_protected_peers());
521 }
522 NetRPCMethods::UnprotectPeer(tx, peer_ids) => {
523 peer_ids.iter().for_each(|peer_id| {
524 peer_manager.unprotect_peer(peer_id);
525 });
526 tx.send_or_warn(());
527 }
528 NetRPCMethods::Info(response_channel) => {
529 response_channel.send_or_warn(swarm.network_info().into());
530 }
531 NetRPCMethods::Connect(response_channel, peer_id, addresses) => {
532 let mut success = false;
533 for mut multiaddr in addresses {
534 multiaddr.push(Protocol::P2p(peer_id));
535
536 match Swarm::dial(swarm, multiaddr.clone()) {
537 Ok(_) => {
538 info!("Dialed {multiaddr}");
539 success = true;
540 break;
541 }
542 Err(e) => {
543 match e {
544 DialError::Denied { cause } => {
545 if let Some(cause) = cause.downcast_ref::<Exceeded>() {
547 error!(
548 "Denied dialing (limits exceeded) {multiaddr}: {cause}"
549 );
550 } else {
551 error!("Denied dialing {multiaddr}: {cause}")
552 }
553 }
554 e => {
555 error!("Failed to dial {multiaddr}: {e}");
556 }
557 };
558 }
559 };
560 }
561
562 response_channel.send_or_warn(success);
563 }
564 NetRPCMethods::Disconnect(response_channel, peer_id) => {
565 let _ = Swarm::disconnect_peer_id(swarm, peer_id);
566 response_channel.send_or_warn(());
567 }
568 NetRPCMethods::AgentVersion(response_channel, peer_id) => {
569 let agent_version = swarm.behaviour().peer_info(&peer_id).and_then(|info| {
570 info.identify_info
571 .as_ref()
572 .map(|id| id.agent_version.clone())
573 });
574 response_channel.send_or_warn(agent_version);
575 }
576 NetRPCMethods::AutoNATStatus(response_channel) => {
577 let nat_status = swarm.behaviour().discovery.nat_status();
578 response_channel.send_or_warn(nat_status);
579 }
580 }
581 }
582 }
583}
584
585async fn handle_discovery_event(
586 peer_info_map: &HashMap<PeerId, PeerInfo>,
587 discovery_out: DiscoveryEvent,
588 network_sender_out: &Sender<NetworkEvent>,
589 peer_manager: &PeerManager,
590) {
591 match discovery_out {
592 DiscoveryEvent::PeerConnected(peer_id) => {
593 trace!("Peer connected, {peer_id}");
594 emit_event(network_sender_out, NetworkEvent::PeerConnected(peer_id)).await;
595 }
596 DiscoveryEvent::PeerDisconnected(peer_id) => {
597 trace!("Peer disconnected, {peer_id}");
598 emit_event(network_sender_out, NetworkEvent::PeerDisconnected(peer_id)).await;
599 }
600 DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event {
601 DerivedDiscoveryBehaviourEvent::Identify(identify::Event::Received {
602 peer_id,
603 info,
604 ..
605 }) => {
606 let protocols = HashSet::from_iter(info.protocols.iter().map(|p| p.to_string()));
607 if !protocols.contains(super::hello::HELLO_PROTOCOL_NAME) {
608 peer_manager
609 .ban_peer_with_default_duration(
610 *peer_id,
611 "hello protocol unsupported",
612 |p| get_user_agent(peer_info_map, p),
613 )
614 .await;
615 } else if !protocols.contains(super::chain_exchange::CHAIN_EXCHANGE_PROTOCOL_NAME) {
616 peer_manager
617 .ban_peer_with_default_duration(
618 *peer_id,
619 "chain exchange protocol unsupported",
620 |p| get_user_agent(peer_info_map, p),
621 )
622 .await;
623 }
624 }
625 DerivedDiscoveryBehaviourEvent::Identify(_) => {}
626 _ => {}
627 },
628 }
629}
630
631async fn handle_gossip_event(
632 e: gossipsub::Event,
633 network_sender_out: &Sender<NetworkEvent>,
634 pubsub_block_str: &str,
635 pubsub_msg_str: &str,
636) {
637 if let gossipsub::Event::Message {
638 propagation_source: source,
639 message,
640 ..
641 } = e
642 {
643 let topic = message.topic.as_str();
644 let message = message.data;
645 trace!("Got a Gossip Message from {:?}", source);
646 if topic == pubsub_block_str {
647 match from_slice_with_fallback::<GossipBlock>(&message) {
648 Ok(b) => {
649 emit_event(
650 network_sender_out,
651 NetworkEvent::PubsubMessage {
652 message: PubsubMessage::Block(b),
653 },
654 )
655 .await;
656 }
657 Err(e) => {
658 warn!("Gossip Block from peer {source:?} could not be deserialized: {e:#}",);
659 }
660 }
661 } else if topic == pubsub_msg_str {
662 match from_slice_with_fallback::<SignedMessage>(&message) {
663 Ok(m) => {
664 emit_event(
665 network_sender_out,
666 NetworkEvent::PubsubMessage {
667 message: PubsubMessage::Message(m),
668 },
669 )
670 .await;
671 }
672 Err(e) => {
673 warn!("Gossip Message from peer {source:?} could not be deserialized: {e:#}");
674 }
675 }
676 } else {
677 warn!("Getting gossip messages from unknown topic: {topic}");
678 }
679 }
680}
681
682async fn handle_hello_event(
683 peer_info_map: &HashMap<PeerId, PeerInfo>,
684 hello: &mut HelloBehaviour,
685 event: request_response::Event<HelloRequest, HelloResponse, HelloResponse>,
686 peer_manager: &PeerManager,
687 genesis_cid: &Cid,
688 network_sender_out: &Sender<NetworkEvent>,
689) {
690 match event {
691 request_response::Event::Message { peer, message, .. } => match message {
692 request_response::Message::Request {
693 request, channel, ..
694 } => {
695 emit_event(network_sender_out, NetworkEvent::HelloRequestInbound).await;
696
697 let arrival = SystemTime::now()
698 .duration_since(UNIX_EPOCH)
699 .expect("System time before unix epoch")
700 .as_nanos()
701 .try_into()
702 .expect("System time since unix epoch should not exceed u64");
703
704 trace!("Received hello request: {:?}", request);
705 if &request.genesis_cid != genesis_cid {
706 peer_manager
707 .ban_peer_with_default_duration(
708 peer,
709 format!(
710 "Genesis hash mismatch: {} received, {genesis_cid} expected",
711 request.genesis_cid
712 ),
713 |p| get_user_agent(peer_info_map, p),
714 )
715 .await;
716 } else {
717 let sent = SystemTime::now()
718 .duration_since(UNIX_EPOCH)
719 .expect("System time before unix epoch")
720 .as_nanos()
721 .try_into()
722 .expect("System time since unix epoch should not exceed u64");
723
724 if let Err(e) = hello.send_response(channel, HelloResponse { arrival, sent }) {
727 warn!("Failed to send HelloResponse: {e:?}");
728 } else {
729 emit_event(
730 network_sender_out,
731 NetworkEvent::HelloResponseOutbound {
732 source: peer,
733 request,
734 },
735 )
736 .await;
737 }
738 }
739 }
740 request_response::Message::Response {
741 request_id,
742 response,
743 } => {
744 emit_event(network_sender_out, NetworkEvent::HelloResponseInbound).await;
745 hello.handle_response(&request_id, response).await;
746 }
747 },
748 request_response::Event::OutboundFailure {
749 request_id,
750 peer,
751 error,
752 ..
753 } => {
754 hello.on_outbound_failure(&request_id);
755 match error {
756 request_response::OutboundFailure::UnsupportedProtocols => {
757 peer_manager
758 .ban_peer_with_default_duration(peer, "Hello protocol unsupported", |p| {
759 get_user_agent(peer_info_map, p)
760 })
761 .await;
762 }
763 _ => {
764 peer_manager.mark_peer_bad(peer, format!("Hello outbound failure {error}"));
765 }
766 }
767 }
768 request_response::Event::InboundFailure { .. } => {}
769 request_response::Event::ResponseSent { .. } => (),
770 }
771}
772
773async fn handle_ping_event(ping_event: ping::Event) {
774 match ping_event.result {
775 Ok(rtt) => {
776 trace!(
777 "PingSuccess::Ping rtt to {} is {} ms",
778 ping_event.peer,
779 rtt.as_millis()
780 );
781 }
782 Err(ping::Failure::Unsupported) => {
783 debug!(peer=%ping_event.peer, "Ping protocol unsupported");
784 }
785 Err(ping::Failure::Timeout) => {
786 debug!("Ping timeout: {}", ping_event.peer);
787 }
788 Err(ping::Failure::Other { error }) => {
789 debug!("Ping failure: {error}");
790 }
791 }
792}
793
794async fn handle_chain_exchange_event<DB>(
795 chain_exchange: &mut ChainExchangeBehaviour,
796 ce_event: request_response::Event<ChainExchangeRequest, ChainExchangeResponse>,
797 db: &Arc<ChainStore<DB>>,
798 network_sender_out: &Sender<NetworkEvent>,
799 cx_response_tx: Sender<(
800 request_response::InboundRequestId,
801 request_response::ResponseChannel<ChainExchangeResponse>,
802 ChainExchangeResponse,
803 )>,
804) where
805 DB: Blockstore + Sync + Send + 'static,
806{
807 match ce_event {
808 request_response::Event::Message { peer, message, .. } => match message {
809 request_response::Message::Request {
810 request,
811 channel,
812 request_id,
813 } => {
814 let Some(per_peer_permit) = chain_exchange.try_acquire_peer_permit(peer) else {
815 debug!("Rejecting chain_exchange request from {peer}: per-peer cap reached");
816 let _ = chain_exchange.send_response(
817 channel,
818 ChainExchangeResponse::go_away("per-peer concurrent request cap reached"),
819 );
820 return;
821 };
822 let Some(global_permit) = chain_exchange.try_acquire_request_permit() else {
823 debug!("Rejecting chain_exchange request from {peer}: global cap reached");
824 let _ = chain_exchange.send_response(
825 channel,
826 ChainExchangeResponse::go_away("global concurrent request cap reached"),
827 );
828 return;
829 };
830
831 trace!(
832 "Received chain_exchange request (request_id:{request_id}, peer_id: {peer:?})",
833 );
834 emit_event(
835 network_sender_out,
836 NetworkEvent::ChainExchangeRequestInbound,
837 )
838 .await;
839
840 let db = db.clone();
841 tokio::task::spawn(async move {
842 let _per_peer_permit = per_peer_permit;
843 let _global_permit = global_permit;
844 if let Err(e) = cx_response_tx.send((
845 request_id,
846 channel,
847 make_chain_exchange_response(&db, &request),
848 )) {
849 debug!("Failed to send ChainExchangeResponse: {e:?}");
850 }
851 });
852 }
853 request_response::Message::Response {
854 request_id,
855 response,
856 } => {
857 emit_event(
858 network_sender_out,
859 NetworkEvent::ChainExchangeResponseInbound,
860 )
861 .await;
862 chain_exchange
863 .handle_inbound_response(&request_id, response)
864 .await;
865 }
866 },
867 request_response::Event::OutboundFailure {
868 request_id, error, ..
869 } => {
870 chain_exchange.on_outbound_error(&request_id, error);
871 }
872 request_response::Event::InboundFailure { peer, error, .. } => {
873 debug!(
874 "ChainExchange inbound error (peer: {:?}): {:?}",
875 peer, error
876 );
877 }
878 request_response::Event::ResponseSent { .. } => {
879 emit_event(
880 network_sender_out,
881 NetworkEvent::ChainExchangeResponseOutbound,
882 )
883 .await;
884 }
885 }
886}
887
888#[allow(clippy::too_many_arguments)]
889async fn handle_forest_behaviour_event<DB>(
890 swarm: &mut Swarm<ForestBehaviour>,
891 bitswap_request_manager: &Arc<BitswapRequestManager>,
892 peer_manager: &PeerManager,
893 event: ForestBehaviourEvent,
894 db: &Arc<ChainStore<DB>>,
895 genesis_cid: &Cid,
896 network_sender_out: &Sender<NetworkEvent>,
897 cx_response_tx: Sender<(
898 request_response::InboundRequestId,
899 request_response::ResponseChannel<ChainExchangeResponse>,
900 ChainExchangeResponse,
901 )>,
902 pubsub_block_str: &str,
903 pubsub_msg_str: &str,
904) where
905 DB: Blockstore + BitswapStoreRead + Sync + Send + 'static,
906{
907 match event {
908 ForestBehaviourEvent::Discovery(discovery_out) => {
909 handle_discovery_event(
910 &swarm.behaviour().discovery.peer_info,
911 discovery_out,
912 network_sender_out,
913 peer_manager,
914 )
915 .await
916 }
917 ForestBehaviourEvent::Gossipsub(e) => {
918 handle_gossip_event(e, network_sender_out, pubsub_block_str, pubsub_msg_str).await
919 }
920 ForestBehaviourEvent::Hello(rr_event) => {
921 let behaviour_mut = swarm.behaviour_mut();
922 handle_hello_event(
923 &behaviour_mut.discovery.peer_info,
924 &mut behaviour_mut.hello,
925 rr_event,
926 peer_manager,
927 genesis_cid,
928 network_sender_out,
929 )
930 .await
931 }
932 ForestBehaviourEvent::Bitswap(event) => {
933 if let Err(e) = bitswap_request_manager.handle_event(
934 &mut swarm.behaviour_mut().bitswap,
935 db.blockstore(),
936 event,
937 ) {
938 warn!("bitswap: {e:#}");
939 }
940 }
941 ForestBehaviourEvent::Ping(ping_event) => handle_ping_event(ping_event).await,
942 ForestBehaviourEvent::ConnectionLimits(_) => {}
943 ForestBehaviourEvent::BlockedPeers(_) => {}
944 ForestBehaviourEvent::ChainExchange(ce_event) => {
945 handle_chain_exchange_event(
946 &mut swarm.behaviour_mut().chain_exchange,
947 ce_event,
948 db,
949 network_sender_out,
950 cx_response_tx,
951 )
952 .await
953 }
954 }
955}
956
957async fn emit_event(sender: &Sender<NetworkEvent>, event: NetworkEvent) {
958 if sender.send_async(event).await.is_err() {
959 error!("Failed to emit event: Network channel receiver has been dropped");
960 }
961}
962
963fn get_user_agent(peer_info_map: &HashMap<PeerId, PeerInfo>, peer: &PeerId) -> Option<String> {
964 peer_info_map
965 .get(peer)
966 .and_then(|i| i.identify_info.as_ref())
967 .map(|i| i.agent_version.clone())
968}