1use std::{
5 sync::Arc,
6 time::{Duration, SystemTime, UNIX_EPOCH},
7};
8
9use crate::{blocks::GossipBlock, rpc::net::NetInfoResult};
10use crate::{chain::ChainStore, utils::encoding::from_slice_with_fallback};
11use crate::{
12 libp2p_bitswap::{
13 BitswapStoreRead, BitswapStoreReadWrite, request_manager::BitswapRequestManager,
14 },
15 utils::flume::FlumeSenderExt as _,
16};
17use crate::{message::SignedMessage, networks::GenesisNetworkName};
18use ahash::{HashMap, HashSet};
19use anyhow::Context as _;
20use cid::Cid;
21use flume::Sender;
22use futures::{select, stream::StreamExt as _};
23use fvm_ipld_blockstore::Blockstore;
24pub use libp2p::gossipsub::{IdentTopic, Topic};
25use libp2p::{
26 PeerId, Swarm, SwarmBuilder,
27 autonat::NatStatus,
28 connection_limits::Exceeded,
29 core::Multiaddr,
30 gossipsub, identify,
31 identity::Keypair,
32 metrics::{Metrics, Recorder},
33 multiaddr::Protocol,
34 noise, ping, request_response,
35 swarm::{DialError, SwarmEvent},
36 tcp, yamux,
37};
38use nonzero_ext::nonzero;
39use tokio_stream::wrappers::IntervalStream;
40use tracing::{debug, error, info, trace, warn};
41
42use super::{
43 ForestBehaviour, ForestBehaviourEvent, Libp2pConfig,
44 chain_exchange::{ChainExchangeRequest, ChainExchangeResponse, make_chain_exchange_response},
45 discovery::{DerivedDiscoveryBehaviourEvent, PeerInfo},
46};
47use crate::libp2p::{
48 PeerManager, PeerOperation,
49 chain_exchange::ChainExchangeBehaviour,
50 discovery::DiscoveryEvent,
51 hello::{HelloBehaviour, HelloRequest, HelloResponse},
52 rpc::RequestResponseError,
53};
54
55pub(in crate::libp2p) mod metrics {
56 use prometheus_client::metrics::{family::Family, gauge::Gauge};
57 use std::sync::LazyLock;
58
59 use crate::metrics::KindLabel;
60
61 pub static NETWORK_CONTAINER_CAPACITIES: LazyLock<Family<KindLabel, Gauge>> = {
62 LazyLock::new(|| {
63 let metric = Family::default();
64 crate::metrics::default_registry().register(
65 "network_container_capacities",
66 "Capacity for each container",
67 metric.clone(),
68 );
69 metric
70 })
71 };
72
73 pub mod values {
74 use crate::metrics::KindLabel;
75
76 pub const HELLO_REQUEST_TABLE: KindLabel = KindLabel::new("hello_request_table");
77 pub const CHAIN_EXCHANGE_REQUEST_TABLE: KindLabel = KindLabel::new("cx_request_table");
78 }
79}
80
81fn libp2p_metrics_enabled() -> bool {
82 crate::utils::misc::env::is_env_truthy("FOREST_LIBP2P_METRICS_ENABLED")
83}
84
85pub const PUBSUB_BLOCK_STR: &str = "/fil/blocks";
87pub const PUBSUB_MSG_STR: &str = "/fil/msgs";
89
90const PUBSUB_TOPICS: [&str; 2] = [PUBSUB_BLOCK_STR, PUBSUB_MSG_STR];
91
92pub const BITSWAP_TIMEOUT: Duration = Duration::from_secs(30);
93
94#[allow(clippy::large_enum_variant)]
96#[derive(Debug)]
97pub enum NetworkEvent {
98 PubsubMessage {
99 message: PubsubMessage,
100 },
101 HelloRequestInbound,
102 HelloResponseOutbound {
103 source: PeerId,
104 request: HelloRequest,
105 },
106 HelloRequestOutbound,
107 HelloResponseInbound,
108 ChainExchangeRequestOutbound,
109 ChainExchangeResponseInbound,
110 ChainExchangeRequestInbound,
111 ChainExchangeResponseOutbound,
112 PeerConnected(PeerId),
113 PeerDisconnected(PeerId),
114}
115
116#[allow(clippy::large_enum_variant)]
118#[derive(Debug, Clone)]
119pub enum PubsubMessage {
120 Block(GossipBlock),
122 Message(SignedMessage),
124}
125
126#[derive(Debug)]
128pub enum NetworkMessage {
129 PubsubMessage {
130 topic: IdentTopic,
131 message: Vec<u8>,
132 },
133 ChainExchangeRequest {
134 peer_id: PeerId,
135 request: ChainExchangeRequest,
136 response_channel: flume::Sender<Result<ChainExchangeResponse, RequestResponseError>>,
137 },
138 HelloRequest {
139 peer_id: PeerId,
140 request: HelloRequest,
141 response_channel: flume::Sender<HelloResponse>,
142 },
143 BitswapRequest {
144 cid: Cid,
145 response_channel: flume::Sender<bool>,
146 },
147 JSONRPCRequest {
148 method: NetRPCMethods,
149 },
150}
151
152#[derive(Debug)]
154pub enum NetRPCMethods {
155 AddrsListen(flume::Sender<(PeerId, HashSet<Multiaddr>)>),
156 Peer(flume::Sender<Option<HashSet<Multiaddr>>>, PeerId),
157 Peers(flume::Sender<HashMap<PeerId, HashSet<Multiaddr>>>),
158 ProtectPeer(flume::Sender<()>, HashSet<PeerId>),
159 UnprotectPeer(flume::Sender<()>, HashSet<PeerId>),
160 ListProtectedPeers(flume::Sender<HashSet<PeerId>>),
161 Info(flume::Sender<NetInfoResult>),
162 Connect(flume::Sender<bool>, PeerId, HashSet<Multiaddr>),
163 Disconnect(flume::Sender<()>, PeerId),
164 AgentVersion(flume::Sender<Option<String>>, PeerId),
165 AutoNATStatus(flume::Sender<NatStatus>),
166}
167
168pub struct Libp2pService<DB> {
170 swarm: Swarm<ForestBehaviour>,
171 bootstrap_peers: HashMap<PeerId, Multiaddr>,
172 cs: Arc<ChainStore<DB>>,
173 peer_manager: Arc<PeerManager>,
174 network_receiver_in: flume::Receiver<NetworkMessage>,
175 network_sender_in: Sender<NetworkMessage>,
176 network_receiver_out: flume::Receiver<NetworkEvent>,
177 network_sender_out: Sender<NetworkEvent>,
178 network_name: String,
179 genesis_cid: Cid,
180}
181
182impl<DB> Libp2pService<DB>
183where
184 DB: Blockstore + BitswapStoreReadWrite + Sync + Send + 'static,
185{
186 pub async fn new(
187 config: Libp2pConfig,
188 cs: Arc<ChainStore<DB>>,
189 peer_manager: Arc<PeerManager>,
190 net_keypair: Keypair,
191 network_name: GenesisNetworkName,
192 genesis_cid: Cid,
193 ) -> anyhow::Result<Self> {
194 let behaviour =
195 ForestBehaviour::new(&net_keypair, &config, &network_name, peer_manager.clone())
196 .await?;
197 let mut swarm = SwarmBuilder::with_existing_identity(net_keypair)
198 .with_tokio()
199 .with_tcp(
200 tcp::Config::default().nodelay(true),
201 noise::Config::new,
202 yamux::Config::default,
203 )?
204 .with_quic()
205 .with_dns()?
206 .with_bandwidth_metrics(&mut crate::metrics::collector_registry())
207 .with_behaviour(|_| behaviour)?
208 .with_swarm_config(|config| {
209 config
210 .with_notify_handler_buffer_size(nonzero!(20usize))
211 .with_per_connection_event_buffer_size(64)
212 .with_idle_connection_timeout(Duration::from_secs(60 * 10))
213 })
214 .build();
215
216 for topic in PUBSUB_TOPICS.iter() {
218 let t = Topic::new(format!("{topic}/{network_name}"));
219 swarm
220 .behaviour_mut()
221 .subscribe(&t)
222 .with_context(|| format!("Failed to subscribe gossipsub topic {t}"))?;
223 }
224
225 let (network_sender_in, network_receiver_in) = flume::unbounded();
226 let (network_sender_out, network_receiver_out) = flume::unbounded();
227
228 info!("p2p network peer id: {}", swarm.local_peer_id());
232
233 for addr in &config.listening_multiaddrs {
235 match swarm.listen_on(addr.clone()) {
236 Ok(id) => loop {
237 if let SwarmEvent::NewListenAddr {
238 address,
239 listener_id,
240 } = swarm.select_next_some().await
241 && id == listener_id
242 {
243 info!("p2p peer is now listening on: {address}");
244 break;
245 }
246 },
247 Err(err) => error!("Fail to listen on {addr}: {err}"),
248 }
249 }
250
251 if swarm.listeners().count() == 0 {
252 anyhow::bail!("p2p peer failed to listen on any network endpoints");
253 }
254
255 let bootstrap_peers = config
256 .bootstrap_peers
257 .iter()
258 .filter_map(|ma| match ma.iter().last() {
259 Some(Protocol::P2p(peer)) => Some((peer, ma.clone())),
260 _ => None,
261 })
262 .collect();
263
264 Ok(Libp2pService {
265 swarm,
266 bootstrap_peers,
267 cs,
268 peer_manager,
269 network_receiver_in,
270 network_sender_in,
271 network_receiver_out,
272 network_sender_out,
273 network_name: network_name.into(),
274 genesis_cid,
275 })
276 }
277
278 pub async fn run(mut self) -> anyhow::Result<()> {
281 info!("Running libp2p service");
282
283 if let Err(e) = self.swarm.behaviour_mut().bootstrap() {
285 warn!("Failed to bootstrap with Kademlia: {e}");
286 }
287
288 let bitswap_request_manager = self.swarm.behaviour().bitswap.request_manager();
289 let mut swarm_stream = self.swarm.fuse();
290 let mut network_stream = self.network_receiver_in.stream().fuse();
291 let mut interval =
292 IntervalStream::new(tokio::time::interval(Duration::from_secs(15))).fuse();
293 let pubsub_block_str = format!("{}/{}", PUBSUB_BLOCK_STR, self.network_name);
294 let pubsub_msg_str = format!("{}/{}", PUBSUB_MSG_STR, self.network_name);
295
296 let (cx_response_tx, cx_response_rx) = flume::unbounded();
297
298 let mut cx_response_rx_stream = cx_response_rx.stream().fuse();
299 let mut bitswap_outbound_request_stream =
300 bitswap_request_manager.outbound_request_stream().fuse();
301 let mut peer_ops_rx_stream = self.peer_manager.peer_ops_rx().stream().fuse();
302 let metrics = if libp2p_metrics_enabled() {
303 Some(Metrics::new(&mut crate::metrics::collector_registry()))
304 } else {
305 None
306 };
307
308 const BOOTSTRAP_PEER_DIALER_INTERVAL: tokio::time::Duration =
309 tokio::time::Duration::from_secs(60);
310 let mut bootstrap_peer_dialer_interval_stream =
311 IntervalStream::new(tokio::time::interval_at(
312 tokio::time::Instant::now() + BOOTSTRAP_PEER_DIALER_INTERVAL,
313 BOOTSTRAP_PEER_DIALER_INTERVAL,
314 ))
315 .fuse();
316 loop {
317 select! {
318 swarm_event = swarm_stream.next() => match swarm_event {
319 Some(SwarmEvent::Behaviour(event)) => {
321 if let Some(m) = &metrics {
322 m.record(&event);
323 }
324 handle_forest_behaviour_event(
325 swarm_stream.get_mut(),
326 &bitswap_request_manager,
327 &self.peer_manager,
328 event,
329 &self.cs,
330 &self.genesis_cid,
331 &self.network_sender_out,
332 cx_response_tx.clone(),
333 &pubsub_block_str,
334 &pubsub_msg_str,).await;
335 },
336 None => { break; },
337 _ => { },
338 },
339 rpc_message = network_stream.next() => match rpc_message {
340 Some(message) => {
342 handle_network_message(
343 swarm_stream.get_mut(),
344 self.cs.clone(),
345 bitswap_request_manager.clone(),
346 message,
347 &self.network_sender_out,
348 &self.peer_manager).await;
349 }
350 None => { break; }
351 },
352 interval_event = interval.next() => if interval_event.is_some() {
353 trace!("Peers connected: {}", swarm_stream.get_mut().behaviour_mut().peers().len());
355 },
356 cs_pair_opt = cx_response_rx_stream.next() => {
357 if let Some((_request_id, channel, cx_response)) = cs_pair_opt {
358 let behaviour = swarm_stream.get_mut().behaviour_mut();
359 if let Err(e) = behaviour.chain_exchange.send_response(channel, cx_response) {
360 debug!("Error sending chain exchange response: {e:?}");
361 }
362 }
363 },
364 bitswap_outbound_request_opt = bitswap_outbound_request_stream.next() => {
365 if let Some((peer, request)) = bitswap_outbound_request_opt {
366 let bitswap = &mut swarm_stream.get_mut().behaviour_mut().bitswap;
367 bitswap.send_request(&peer, request);
368 }
369 }
370 peer_ops_opt = peer_ops_rx_stream.next() => {
371 if let Some(peer_ops) = peer_ops_opt {
372 handle_peer_ops(swarm_stream.get_mut(), peer_ops, &self.bootstrap_peers);
373 }
374 },
375 _ = bootstrap_peer_dialer_interval_stream.next() => {
376 dial_to_bootstrap_peers_if_needed(swarm_stream.get_mut(), &self.bootstrap_peers);
377 }
378 };
379 }
380 Ok(())
381 }
382
383 pub fn network_sender(&self) -> Sender<NetworkMessage> {
385 self.network_sender_in.clone()
386 }
387
388 pub fn network_receiver(&self) -> flume::Receiver<NetworkEvent> {
390 self.network_receiver_out.clone()
391 }
392
393 pub fn peer_manager(&self) -> &Arc<PeerManager> {
394 &self.peer_manager
395 }
396}
397
398fn dial_to_bootstrap_peers_if_needed(
399 swarm: &mut Swarm<ForestBehaviour>,
400 bootstrap_peers: &HashMap<PeerId, Multiaddr>,
401) {
402 for (peer, ma) in bootstrap_peers {
403 if !swarm.behaviour().peers().contains(peer) {
404 info!("Re-dialing to bootstrap peer at {ma}");
405 if let Err(e) = swarm.dial(ma.clone()) {
406 warn!("{e}");
407 }
408 }
409 }
410}
411
412fn handle_peer_ops(
413 swarm: &mut Swarm<ForestBehaviour>,
414 peer_ops: PeerOperation,
415 bootstrap_peers: &HashMap<PeerId, Multiaddr>,
416) {
417 use PeerOperation::*;
418 match peer_ops {
419 Ban {
420 peer,
421 user_agent,
422 reason,
423 } => {
424 if !bootstrap_peers.contains_key(&peer) {
426 let user_agent = user_agent.unwrap_or_default();
427 debug!(%peer, %user_agent, %reason, "Banning peer");
428 swarm.behaviour_mut().blocked_peers.block_peer(peer);
429 }
430 }
431 Unban(peer) => {
432 debug!(%peer, "Unbanning peer");
433 swarm.behaviour_mut().blocked_peers.unblock_peer(peer);
434 }
435 }
436}
437
438async fn handle_network_message(
439 swarm: &mut Swarm<ForestBehaviour>,
440 store: Arc<impl BitswapStoreReadWrite>,
441 bitswap_request_manager: Arc<BitswapRequestManager>,
442 message: NetworkMessage,
443 network_sender_out: &Sender<NetworkEvent>,
444 peer_manager: &PeerManager,
445) {
446 match message {
447 NetworkMessage::PubsubMessage { topic, message } => {
448 if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
449 warn!("Failed to send gossipsub message: {:?}", e);
450 }
451 }
452 NetworkMessage::HelloRequest {
453 peer_id,
454 request,
455 response_channel,
456 } => {
457 let _request_id =
458 swarm
459 .behaviour_mut()
460 .hello
461 .send_request(&peer_id, request, response_channel);
462 emit_event(network_sender_out, NetworkEvent::HelloRequestOutbound).await;
463 }
464 NetworkMessage::ChainExchangeRequest {
465 peer_id,
466 request,
467 response_channel,
468 } => {
469 let _request_id = swarm.behaviour_mut().chain_exchange.send_request(
470 &peer_id,
471 request,
472 response_channel,
473 );
474 emit_event(
475 network_sender_out,
476 NetworkEvent::ChainExchangeRequestOutbound,
477 )
478 .await;
479 }
480 NetworkMessage::BitswapRequest {
481 cid,
482 response_channel,
483 } => {
484 bitswap_request_manager.get_block(
485 store,
486 cid,
487 BITSWAP_TIMEOUT,
488 Some(response_channel),
489 None,
490 );
491 }
492 NetworkMessage::JSONRPCRequest { method } => {
493 match method {
494 NetRPCMethods::AddrsListen(response_channel) => {
495 let listeners = Swarm::listeners(swarm).cloned().collect();
496 let peer_id = Swarm::local_peer_id(swarm);
497 response_channel.send_or_warn((*peer_id, listeners));
498 }
499 NetRPCMethods::Peer(response_channel, peer) => {
500 let addresses = swarm.behaviour().peer_addresses().get(&peer).cloned();
501 response_channel.send_or_warn(addresses);
502 }
503 NetRPCMethods::Peers(response_channel) => {
504 let peer_addresses = swarm.behaviour().peer_addresses();
505 response_channel.send_or_warn(peer_addresses);
506 }
507 NetRPCMethods::ProtectPeer(tx, peer_ids) => {
508 peer_ids.into_iter().for_each(|peer_id| {
509 peer_manager.protect_peer(peer_id);
510 });
511 tx.send_or_warn(());
512 }
513 NetRPCMethods::ListProtectedPeers(tx) => {
514 tx.send_or_warn(peer_manager.list_protected_peers());
515 }
516 NetRPCMethods::UnprotectPeer(tx, peer_ids) => {
517 peer_ids.iter().for_each(|peer_id| {
518 peer_manager.unprotect_peer(peer_id);
519 });
520 tx.send_or_warn(());
521 }
522 NetRPCMethods::Info(response_channel) => {
523 response_channel.send_or_warn(swarm.network_info().into());
524 }
525 NetRPCMethods::Connect(response_channel, peer_id, addresses) => {
526 let mut success = false;
527 for mut multiaddr in addresses {
528 multiaddr.push(Protocol::P2p(peer_id));
529
530 match Swarm::dial(swarm, multiaddr.clone()) {
531 Ok(_) => {
532 info!("Dialed {multiaddr}");
533 success = true;
534 break;
535 }
536 Err(e) => {
537 match e {
538 DialError::Denied { cause } => {
539 if let Some(cause) = cause.downcast_ref::<Exceeded>() {
541 error!(
542 "Denied dialing (limits exceeded) {multiaddr}: {cause}"
543 );
544 } else {
545 error!("Denied dialing {multiaddr}: {cause}")
546 }
547 }
548 e => {
549 error!("Failed to dial {multiaddr}: {e}");
550 }
551 };
552 }
553 };
554 }
555
556 response_channel.send_or_warn(success);
557 }
558 NetRPCMethods::Disconnect(response_channel, peer_id) => {
559 let _ = Swarm::disconnect_peer_id(swarm, peer_id);
560 response_channel.send_or_warn(());
561 }
562 NetRPCMethods::AgentVersion(response_channel, peer_id) => {
563 let agent_version = swarm.behaviour().peer_info(&peer_id).and_then(|info| {
564 info.identify_info
565 .as_ref()
566 .map(|id| id.agent_version.clone())
567 });
568 response_channel.send_or_warn(agent_version);
569 }
570 NetRPCMethods::AutoNATStatus(response_channel) => {
571 let nat_status = swarm.behaviour().discovery.nat_status();
572 response_channel.send_or_warn(nat_status);
573 }
574 }
575 }
576 }
577}
578
579async fn handle_discovery_event(
580 peer_info_map: &HashMap<PeerId, PeerInfo>,
581 discovery_out: DiscoveryEvent,
582 network_sender_out: &Sender<NetworkEvent>,
583 peer_manager: &PeerManager,
584) {
585 match discovery_out {
586 DiscoveryEvent::PeerConnected(peer_id) => {
587 trace!("Peer connected, {peer_id}");
588 emit_event(network_sender_out, NetworkEvent::PeerConnected(peer_id)).await;
589 }
590 DiscoveryEvent::PeerDisconnected(peer_id) => {
591 trace!("Peer disconnected, {peer_id}");
592 emit_event(network_sender_out, NetworkEvent::PeerDisconnected(peer_id)).await;
593 }
594 DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event {
595 DerivedDiscoveryBehaviourEvent::Identify(identify::Event::Received {
596 peer_id,
597 info,
598 ..
599 }) => {
600 let protocols = HashSet::from_iter(info.protocols.iter().map(|p| p.to_string()));
601 if !protocols.contains(super::hello::HELLO_PROTOCOL_NAME) {
602 peer_manager
603 .ban_peer_with_default_duration(
604 *peer_id,
605 "hello protocol unsupported",
606 |p| get_user_agent(peer_info_map, p),
607 )
608 .await;
609 } else if !protocols.contains(super::chain_exchange::CHAIN_EXCHANGE_PROTOCOL_NAME) {
610 peer_manager
611 .ban_peer_with_default_duration(
612 *peer_id,
613 "chain exchange protocol unsupported",
614 |p| get_user_agent(peer_info_map, p),
615 )
616 .await;
617 }
618 }
619 DerivedDiscoveryBehaviourEvent::Identify(_) => {}
620 _ => {}
621 },
622 }
623}
624
625async fn handle_gossip_event(
626 e: gossipsub::Event,
627 network_sender_out: &Sender<NetworkEvent>,
628 pubsub_block_str: &str,
629 pubsub_msg_str: &str,
630) {
631 if let gossipsub::Event::Message {
632 propagation_source: source,
633 message,
634 ..
635 } = e
636 {
637 let topic = message.topic.as_str();
638 let message = message.data;
639 trace!("Got a Gossip Message from {:?}", source);
640 if topic == pubsub_block_str {
641 match from_slice_with_fallback::<GossipBlock>(&message) {
642 Ok(b) => {
643 emit_event(
644 network_sender_out,
645 NetworkEvent::PubsubMessage {
646 message: PubsubMessage::Block(b),
647 },
648 )
649 .await;
650 }
651 Err(e) => {
652 warn!("Gossip Block from peer {source:?} could not be deserialized: {e}",);
653 }
654 }
655 } else if topic == pubsub_msg_str {
656 match from_slice_with_fallback::<SignedMessage>(&message) {
657 Ok(m) => {
658 emit_event(
659 network_sender_out,
660 NetworkEvent::PubsubMessage {
661 message: PubsubMessage::Message(m),
662 },
663 )
664 .await;
665 }
666 Err(e) => {
667 warn!("Gossip Message from peer {source:?} could not be deserialized: {e}");
668 }
669 }
670 } else {
671 warn!("Getting gossip messages from unknown topic: {topic}");
672 }
673 }
674}
675
676async fn handle_hello_event(
677 peer_info_map: &HashMap<PeerId, PeerInfo>,
678 hello: &mut HelloBehaviour,
679 event: request_response::Event<HelloRequest, HelloResponse, HelloResponse>,
680 peer_manager: &PeerManager,
681 genesis_cid: &Cid,
682 network_sender_out: &Sender<NetworkEvent>,
683) {
684 match event {
685 request_response::Event::Message { peer, message, .. } => match message {
686 request_response::Message::Request {
687 request, channel, ..
688 } => {
689 emit_event(network_sender_out, NetworkEvent::HelloRequestInbound).await;
690
691 let arrival = SystemTime::now()
692 .duration_since(UNIX_EPOCH)
693 .expect("System time before unix epoch")
694 .as_nanos()
695 .try_into()
696 .expect("System time since unix epoch should not exceed u64");
697
698 trace!("Received hello request: {:?}", request);
699 if &request.genesis_cid != genesis_cid {
700 peer_manager
701 .ban_peer_with_default_duration(
702 peer,
703 format!(
704 "Genesis hash mismatch: {} received, {genesis_cid} expected",
705 request.genesis_cid
706 ),
707 |p| get_user_agent(peer_info_map, p),
708 )
709 .await;
710 } else {
711 let sent = SystemTime::now()
712 .duration_since(UNIX_EPOCH)
713 .expect("System time before unix epoch")
714 .as_nanos()
715 .try_into()
716 .expect("System time since unix epoch should not exceed u64");
717
718 if let Err(e) = hello.send_response(channel, HelloResponse { arrival, sent }) {
721 warn!("Failed to send HelloResponse: {e:?}");
722 } else {
723 emit_event(
724 network_sender_out,
725 NetworkEvent::HelloResponseOutbound {
726 source: peer,
727 request,
728 },
729 )
730 .await;
731 }
732 }
733 }
734 request_response::Message::Response {
735 request_id,
736 response,
737 } => {
738 emit_event(network_sender_out, NetworkEvent::HelloResponseInbound).await;
739 hello.handle_response(&request_id, response).await;
740 }
741 },
742 request_response::Event::OutboundFailure {
743 request_id,
744 peer,
745 error,
746 ..
747 } => {
748 hello.on_outbound_failure(&request_id);
749 match error {
750 request_response::OutboundFailure::UnsupportedProtocols => {
751 peer_manager
752 .ban_peer_with_default_duration(peer, "Hello protocol unsupported", |p| {
753 get_user_agent(peer_info_map, p)
754 })
755 .await;
756 }
757 _ => {
758 peer_manager.mark_peer_bad(peer, format!("Hello outbound failure {error}"));
759 }
760 }
761 }
762 request_response::Event::InboundFailure { .. } => {}
763 request_response::Event::ResponseSent { .. } => (),
764 }
765}
766
767async fn handle_ping_event(ping_event: ping::Event) {
768 match ping_event.result {
769 Ok(rtt) => {
770 trace!(
771 "PingSuccess::Ping rtt to {} is {} ms",
772 ping_event.peer,
773 rtt.as_millis()
774 );
775 }
776 Err(ping::Failure::Unsupported) => {
777 debug!(peer=%ping_event.peer, "Ping protocol unsupported");
778 }
779 Err(ping::Failure::Timeout) => {
780 debug!("Ping timeout: {}", ping_event.peer);
781 }
782 Err(ping::Failure::Other { error }) => {
783 debug!("Ping failure: {error}");
784 }
785 }
786}
787
788async fn handle_chain_exchange_event<DB>(
789 chain_exchange: &mut ChainExchangeBehaviour,
790 ce_event: request_response::Event<ChainExchangeRequest, ChainExchangeResponse>,
791 db: &Arc<ChainStore<DB>>,
792 network_sender_out: &Sender<NetworkEvent>,
793 cx_response_tx: Sender<(
794 request_response::InboundRequestId,
795 request_response::ResponseChannel<ChainExchangeResponse>,
796 ChainExchangeResponse,
797 )>,
798) where
799 DB: Blockstore + Sync + Send + 'static,
800{
801 match ce_event {
802 request_response::Event::Message { peer, message, .. } => match message {
803 request_response::Message::Request {
804 request,
805 channel,
806 request_id,
807 } => {
808 trace!(
809 "Received chain_exchange request (request_id:{request_id}, peer_id: {peer:?})",
810 );
811 emit_event(
812 network_sender_out,
813 NetworkEvent::ChainExchangeRequestInbound,
814 )
815 .await;
816
817 let db = db.clone();
818 tokio::task::spawn(async move {
819 if let Err(e) = cx_response_tx.send((
820 request_id,
821 channel,
822 make_chain_exchange_response(&db, &request),
823 )) {
824 debug!("Failed to send ChainExchangeResponse: {e:?}");
825 }
826 });
827 }
828 request_response::Message::Response {
829 request_id,
830 response,
831 } => {
832 emit_event(
833 network_sender_out,
834 NetworkEvent::ChainExchangeResponseInbound,
835 )
836 .await;
837 chain_exchange
838 .handle_inbound_response(&request_id, response)
839 .await;
840 }
841 },
842 request_response::Event::OutboundFailure {
843 request_id, error, ..
844 } => {
845 chain_exchange.on_outbound_error(&request_id, error);
846 }
847 request_response::Event::InboundFailure { peer, error, .. } => {
848 debug!(
849 "ChainExchange inbound error (peer: {:?}): {:?}",
850 peer, error
851 );
852 }
853 request_response::Event::ResponseSent { .. } => {
854 emit_event(
855 network_sender_out,
856 NetworkEvent::ChainExchangeResponseOutbound,
857 )
858 .await;
859 }
860 }
861}
862
863#[allow(clippy::too_many_arguments)]
864async fn handle_forest_behaviour_event<DB>(
865 swarm: &mut Swarm<ForestBehaviour>,
866 bitswap_request_manager: &Arc<BitswapRequestManager>,
867 peer_manager: &PeerManager,
868 event: ForestBehaviourEvent,
869 db: &Arc<ChainStore<DB>>,
870 genesis_cid: &Cid,
871 network_sender_out: &Sender<NetworkEvent>,
872 cx_response_tx: Sender<(
873 request_response::InboundRequestId,
874 request_response::ResponseChannel<ChainExchangeResponse>,
875 ChainExchangeResponse,
876 )>,
877 pubsub_block_str: &str,
878 pubsub_msg_str: &str,
879) where
880 DB: Blockstore + BitswapStoreRead + Sync + Send + 'static,
881{
882 match event {
883 ForestBehaviourEvent::Discovery(discovery_out) => {
884 handle_discovery_event(
885 &swarm.behaviour().discovery.peer_info,
886 discovery_out,
887 network_sender_out,
888 peer_manager,
889 )
890 .await
891 }
892 ForestBehaviourEvent::Gossipsub(e) => {
893 handle_gossip_event(e, network_sender_out, pubsub_block_str, pubsub_msg_str).await
894 }
895 ForestBehaviourEvent::Hello(rr_event) => {
896 let behaviour_mut = swarm.behaviour_mut();
897 handle_hello_event(
898 &behaviour_mut.discovery.peer_info,
899 &mut behaviour_mut.hello,
900 rr_event,
901 peer_manager,
902 genesis_cid,
903 network_sender_out,
904 )
905 .await
906 }
907 ForestBehaviourEvent::Bitswap(event) => {
908 if let Err(e) = bitswap_request_manager.handle_event(
909 &mut swarm.behaviour_mut().bitswap,
910 db.blockstore(),
911 event,
912 ) {
913 warn!("bitswap: {e}");
914 }
915 }
916 ForestBehaviourEvent::Ping(ping_event) => handle_ping_event(ping_event).await,
917 ForestBehaviourEvent::ConnectionLimits(_) => {}
918 ForestBehaviourEvent::BlockedPeers(_) => {}
919 ForestBehaviourEvent::ChainExchange(ce_event) => {
920 handle_chain_exchange_event(
921 &mut swarm.behaviour_mut().chain_exchange,
922 ce_event,
923 db,
924 network_sender_out,
925 cx_response_tx,
926 )
927 .await
928 }
929 }
930}
931
932async fn emit_event(sender: &Sender<NetworkEvent>, event: NetworkEvent) {
933 if sender.send_async(event).await.is_err() {
934 error!("Failed to emit event: Network channel receiver has been dropped");
935 }
936}
937
938fn get_user_agent(peer_info_map: &HashMap<PeerId, PeerInfo>, peer: &PeerId) -> Option<String> {
939 peer_info_map
940 .get(peer)
941 .and_then(|i| i.identify_info.as_ref())
942 .map(|i| i.agent_version.clone())
943}