Skip to main content

forest/libp2p/
service.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{
5    sync::Arc,
6    time::{Duration, SystemTime, UNIX_EPOCH},
7};
8
9use crate::{blocks::GossipBlock, rpc::net::NetInfoResult};
10use crate::{chain::ChainStore, utils::encoding::from_slice_with_fallback};
11use crate::{
12    libp2p_bitswap::{
13        BitswapStoreRead, BitswapStoreReadWrite, request_manager::BitswapRequestManager,
14    },
15    utils::flume::FlumeSenderExt as _,
16};
17use crate::{message::SignedMessage, networks::GenesisNetworkName};
18use ahash::{HashMap, HashSet};
19use anyhow::Context as _;
20use cid::Cid;
21use flume::Sender;
22use futures::{select, stream::StreamExt as _};
23use fvm_ipld_blockstore::Blockstore;
24pub use libp2p::gossipsub::{IdentTopic, Topic};
25use libp2p::{
26    PeerId, Swarm, SwarmBuilder,
27    autonat::NatStatus,
28    connection_limits::Exceeded,
29    core::Multiaddr,
30    gossipsub, identify,
31    identity::Keypair,
32    metrics::{Metrics, Recorder},
33    multiaddr::Protocol,
34    noise, ping, request_response,
35    swarm::{DialError, SwarmEvent},
36    tcp, yamux,
37};
38use nonzero_ext::nonzero;
39use tokio_stream::wrappers::IntervalStream;
40use tracing::{debug, error, info, trace, warn};
41
42use super::{
43    ForestBehaviour, ForestBehaviourEvent, Libp2pConfig,
44    chain_exchange::{ChainExchangeRequest, ChainExchangeResponse, make_chain_exchange_response},
45    discovery::{DerivedDiscoveryBehaviourEvent, PeerInfo},
46};
47use crate::libp2p::{
48    PeerManager, PeerOperation,
49    chain_exchange::ChainExchangeBehaviour,
50    discovery::DiscoveryEvent,
51    hello::{HelloBehaviour, HelloRequest, HelloResponse},
52    rpc::RequestResponseError,
53};
54
55pub(in crate::libp2p) mod metrics {
56    use prometheus_client::metrics::{family::Family, gauge::Gauge};
57    use std::sync::LazyLock;
58
59    use crate::metrics::KindLabel;
60
61    pub static NETWORK_CONTAINER_CAPACITIES: LazyLock<Family<KindLabel, Gauge>> = {
62        LazyLock::new(|| {
63            let metric = Family::default();
64            crate::metrics::default_registry().register(
65                "network_container_capacities",
66                "Capacity for each container",
67                metric.clone(),
68            );
69            metric
70        })
71    };
72
73    pub mod values {
74        use crate::metrics::KindLabel;
75
76        pub const HELLO_REQUEST_TABLE: KindLabel = KindLabel::new("hello_request_table");
77        pub const CHAIN_EXCHANGE_REQUEST_TABLE: KindLabel = KindLabel::new("cx_request_table");
78    }
79}
80
// Invokes the crate-level `def_is_env_truthy!` macro with the identifier
// `libp2p_metrics_enabled` and the environment variable name
// `FOREST_LIBP2P_METRICS_ENABLED`. `Libp2pService::run` calls
// `libp2p_metrics_enabled()` to decide whether a libp2p `Metrics` recorder is
// created.
// NOTE(review): presumably the generated function returns `true` when the
// variable holds a truthy value — confirm against the macro definition.
crate::def_is_env_truthy!(libp2p_metrics_enabled, "FOREST_LIBP2P_METRICS_ENABLED");
82
/// `Gossipsub` Filecoin blocks topic identifier.
///
/// The topic actually subscribed to is this prefix followed by `/` and the
/// network name.
pub const PUBSUB_BLOCK_STR: &str = "/fil/blocks";
/// `Gossipsub` Filecoin messages topic identifier.
///
/// The topic actually subscribed to is this prefix followed by `/` and the
/// network name.
pub const PUBSUB_MSG_STR: &str = "/fil/msgs";

/// Topic prefixes subscribed to when the service is constructed.
const PUBSUB_TOPICS: [&str; 2] = [PUBSUB_BLOCK_STR, PUBSUB_MSG_STR];

/// Timeout passed to the bitswap request manager when fetching a block on
/// behalf of a `NetworkMessage::BitswapRequest`.
pub const BITSWAP_TIMEOUT: Duration = Duration::from_secs(30);
91
/// Events emitted by this Service.
///
/// Consumers obtain them through the receiver returned by
/// `Libp2pService::network_receiver`.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum NetworkEvent {
    /// A gossipsub payload that was successfully decoded into a block or a
    /// signed message.
    PubsubMessage {
        message: PubsubMessage,
    },
    /// A hello request was received from a remote peer.
    HelloRequestInbound,
    /// A hello response was successfully sent back to `source` in reply to
    /// `request`.
    HelloResponseOutbound {
        source: PeerId,
        request: HelloRequest,
    },
    /// A hello request was handed to the hello behaviour for sending.
    HelloRequestOutbound,
    /// A hello response was received from a remote peer.
    HelloResponseInbound,
    /// A chain exchange request was handed to the chain exchange behaviour for
    /// sending.
    ChainExchangeRequestOutbound,
    // NOTE(review): the emission sites of the next three variants are outside
    // this view; presumably they mirror the hello variants for the chain
    // exchange protocol — confirm in `handle_chain_exchange_event`.
    ChainExchangeResponseInbound,
    ChainExchangeRequestInbound,
    ChainExchangeResponseOutbound,
    /// The discovery behaviour reported that a peer connected.
    PeerConnected(PeerId),
    /// The discovery behaviour reported that a peer disconnected.
    PeerDisconnected(PeerId),
}
113
/// Message types that can come over `GossipSub`.
///
/// Which variant is produced depends on the topic the payload arrived on.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum PubsubMessage {
    /// Messages that come over the block topic
    Block(GossipBlock),
    /// Messages that come over the message topic
    Message(SignedMessage),
}
123
124/// Messages into the service to handle.
125#[derive(Debug)]
126pub enum NetworkMessage {
127    PubsubMessage {
128        topic: IdentTopic,
129        message: Vec<u8>,
130    },
131    ChainExchangeRequest {
132        peer_id: PeerId,
133        request: ChainExchangeRequest,
134        response_channel: flume::Sender<Result<ChainExchangeResponse, RequestResponseError>>,
135    },
136    HelloRequest {
137        peer_id: PeerId,
138        request: HelloRequest,
139        response_channel: flume::Sender<HelloResponse>,
140    },
141    BitswapRequest {
142        cid: Cid,
143        response_channel: flume::Sender<bool>,
144    },
145    JSONRPCRequest {
146        method: NetRPCMethods,
147    },
148}
149
150/// Network RPC API methods used to gather data from libp2p node.
151#[derive(Debug)]
152pub enum NetRPCMethods {
153    AddrsListen(flume::Sender<(PeerId, HashSet<Multiaddr>)>),
154    Peer(flume::Sender<Option<HashSet<Multiaddr>>>, PeerId),
155    Peers(flume::Sender<HashMap<PeerId, HashSet<Multiaddr>>>),
156    ProtectPeer(flume::Sender<()>, HashSet<PeerId>),
157    UnprotectPeer(flume::Sender<()>, HashSet<PeerId>),
158    ListProtectedPeers(flume::Sender<HashSet<PeerId>>),
159    Info(flume::Sender<NetInfoResult>),
160    Connect(flume::Sender<bool>, PeerId, HashSet<Multiaddr>),
161    Disconnect(flume::Sender<()>, PeerId),
162    AgentVersion(flume::Sender<Option<String>>, PeerId),
163    AutoNATStatus(flume::Sender<NatStatus>),
164}
165
/// The `Libp2pService` listens to events from the libp2p swarm.
///
/// It also processes `NetworkMessage`s submitted through
/// `network_sender` and emits `NetworkEvent`s readable through
/// `network_receiver`.
pub struct Libp2pService<DB> {
    /// The libp2p swarm driving `ForestBehaviour`.
    swarm: Swarm<ForestBehaviour>,
    /// Configured bootstrap addresses that end in a `/p2p/<peer id>` component,
    /// keyed by that peer id. These peers are periodically re-dialed when not
    /// connected and are never banned.
    bootstrap_peers: HashMap<PeerId, Multiaddr>,
    /// Chain store handed to the behaviour event handler and used as the
    /// bitswap store.
    cs: Arc<ChainStore<DB>>,
    /// Shared peer manager; supplies ban/unban operations and peer protection.
    peer_manager: Arc<PeerManager>,
    /// Receiving end of the inbound `NetworkMessage` channel, drained by `run`.
    network_receiver_in: flume::Receiver<NetworkMessage>,
    /// Sending end of the inbound channel; cloned out by `network_sender`.
    network_sender_in: Sender<NetworkMessage>,
    /// Receiving end of the outbound `NetworkEvent` channel; cloned out by
    /// `network_receiver`.
    network_receiver_out: flume::Receiver<NetworkEvent>,
    /// Sending end of the outbound channel, used by the event handlers.
    network_sender_out: Sender<NetworkEvent>,
    /// Network name appended to the gossipsub topic prefixes.
    network_name: String,
    /// Genesis CID that inbound hello requests are checked against.
    genesis_cid: Cid,
}
179
180impl<DB> Libp2pService<DB>
181where
182    DB: Blockstore + BitswapStoreReadWrite + Sync + Send + 'static,
183{
184    pub async fn new(
185        config: Libp2pConfig,
186        cs: Arc<ChainStore<DB>>,
187        peer_manager: Arc<PeerManager>,
188        net_keypair: Keypair,
189        network_name: GenesisNetworkName,
190        genesis_cid: Cid,
191    ) -> anyhow::Result<Self> {
192        let behaviour =
193            ForestBehaviour::new(&net_keypair, &config, &network_name, peer_manager.clone())
194                .await?;
195        let mut swarm = SwarmBuilder::with_existing_identity(net_keypair)
196            .with_tokio()
197            .with_tcp(
198                tcp::Config::default().nodelay(true),
199                noise::Config::new,
200                yamux::Config::default,
201            )?
202            .with_quic()
203            .with_dns()?
204            .with_bandwidth_metrics(&mut crate::metrics::collector_registry())
205            .with_behaviour(|_| behaviour)?
206            .with_swarm_config(|config| {
207                config
208                    .with_notify_handler_buffer_size(nonzero!(20usize))
209                    .with_per_connection_event_buffer_size(64)
210                    .with_idle_connection_timeout(Duration::from_secs(60 * 10))
211            })
212            .build();
213
214        // Subscribe to gossipsub topics with the network name suffix
215        for topic in PUBSUB_TOPICS.iter() {
216            let t = Topic::new(format!("{topic}/{network_name}"));
217            swarm
218                .behaviour_mut()
219                .subscribe(&t)
220                .with_context(|| format!("Failed to subscribe gossipsub topic {t}"))?;
221        }
222
223        let (network_sender_in, network_receiver_in) = flume::unbounded();
224        let (network_sender_out, network_receiver_out) = flume::unbounded();
225
226        // Hint at the multihash which has to go in the `/p2p/<multihash>` part of the
227        // peer's multiaddress. Useful if others want to use this node to bootstrap
228        // from.
229        info!("p2p network peer id: {}", swarm.local_peer_id());
230
231        // Listen on network endpoints before being detached and connecting to any peers.
232        for addr in &config.listening_multiaddrs {
233            match swarm.listen_on(addr.clone()) {
234                Ok(id) => loop {
235                    if let SwarmEvent::NewListenAddr {
236                        address,
237                        listener_id,
238                    } = swarm.select_next_some().await
239                        && id == listener_id
240                    {
241                        info!("p2p peer is now listening on: {address}");
242                        break;
243                    }
244                },
245                Err(err) => error!("Fail to listen on {addr}: {err}"),
246            }
247        }
248
249        if swarm.listeners().count() == 0 {
250            anyhow::bail!("p2p peer failed to listen on any network endpoints");
251        }
252
253        let bootstrap_peers = config
254            .bootstrap_peers
255            .iter()
256            .filter_map(|ma| match ma.iter().last() {
257                Some(Protocol::P2p(peer)) => Some((peer, ma.clone())),
258                _ => None,
259            })
260            .collect();
261
262        Ok(Libp2pService {
263            swarm,
264            bootstrap_peers,
265            cs,
266            peer_manager,
267            network_receiver_in,
268            network_sender_in,
269            network_receiver_out,
270            network_sender_out,
271            network_name: network_name.into(),
272            genesis_cid,
273        })
274    }
275
276    /// Starts the libp2p service networking stack. This Future resolves when
277    /// shutdown occurs.
278    pub async fn run(mut self) -> anyhow::Result<()> {
279        info!("Running libp2p service");
280
281        // Bootstrap with Kademlia
282        if let Err(e) = self.swarm.behaviour_mut().bootstrap() {
283            warn!("Failed to bootstrap with Kademlia: {e:#}");
284        }
285
286        let bitswap_request_manager = self.swarm.behaviour().bitswap.request_manager();
287        let mut swarm_stream = self.swarm.fuse();
288        let mut network_stream = self.network_receiver_in.stream().fuse();
289        let mut interval =
290            IntervalStream::new(tokio::time::interval(Duration::from_secs(15))).fuse();
291        let pubsub_block_str = format!("{}/{}", PUBSUB_BLOCK_STR, self.network_name);
292        let pubsub_msg_str = format!("{}/{}", PUBSUB_MSG_STR, self.network_name);
293
294        let (cx_response_tx, cx_response_rx) = flume::unbounded();
295
296        let mut cx_response_rx_stream = cx_response_rx.stream().fuse();
297        let mut bitswap_outbound_request_stream =
298            bitswap_request_manager.outbound_request_stream().fuse();
299        let mut peer_ops_rx_stream = self.peer_manager.peer_ops_rx().stream().fuse();
300        let metrics = if libp2p_metrics_enabled() {
301            Some(Metrics::new(&mut crate::metrics::collector_registry()))
302        } else {
303            None
304        };
305
306        const BOOTSTRAP_PEER_DIALER_INTERVAL: tokio::time::Duration =
307            tokio::time::Duration::from_secs(60);
308        let mut bootstrap_peer_dialer_interval_stream =
309            IntervalStream::new(tokio::time::interval_at(
310                tokio::time::Instant::now() + BOOTSTRAP_PEER_DIALER_INTERVAL,
311                BOOTSTRAP_PEER_DIALER_INTERVAL,
312            ))
313            .fuse();
314        loop {
315            select! {
316                swarm_event = swarm_stream.next() => match swarm_event {
317                    // outbound events
318                    Some(SwarmEvent::Behaviour(event)) => {
319                        if let Some(m) = &metrics {
320                            m.record(&event);
321                        }
322                        handle_forest_behaviour_event(
323                            swarm_stream.get_mut(),
324                            &bitswap_request_manager,
325                            &self.peer_manager,
326                            event,
327                            &self.cs,
328                            &self.genesis_cid,
329                            &self.network_sender_out,
330                            cx_response_tx.clone(),
331                            &pubsub_block_str,
332                            &pubsub_msg_str,).await;
333                    },
334                    None => { break; },
335                    _ => { },
336                },
337                rpc_message = network_stream.next() => match rpc_message {
338                    // Inbound messages
339                    Some(message) => {
340                        handle_network_message(
341                            swarm_stream.get_mut(),
342                            self.cs.clone(),
343                            bitswap_request_manager.clone(),
344                            message,
345                            &self.network_sender_out,
346                            &self.peer_manager).await;
347                    }
348                    None => { break; }
349                },
350                interval_event = interval.next() => if interval_event.is_some() {
351                    // Print peer count on an interval.
352                    trace!("Peers connected: {}", swarm_stream.get_mut().behaviour_mut().peers().len());
353                },
354                cs_pair_opt = cx_response_rx_stream.next() => {
355                    if let Some((_request_id, channel, cx_response)) = cs_pair_opt {
356                        let behaviour = swarm_stream.get_mut().behaviour_mut();
357                        if let Err(e) = behaviour.chain_exchange.send_response(channel, cx_response) {
358                            debug!("Error sending chain exchange response: {e:?}");
359                        }
360                    }
361                },
362                bitswap_outbound_request_opt = bitswap_outbound_request_stream.next() => {
363                    if let Some((peer, request)) = bitswap_outbound_request_opt {
364                        let bitswap = &mut swarm_stream.get_mut().behaviour_mut().bitswap;
365                        bitswap.send_request(&peer, request);
366                    }
367                }
368                peer_ops_opt = peer_ops_rx_stream.next() => {
369                    if let Some(peer_ops) = peer_ops_opt {
370                        handle_peer_ops(swarm_stream.get_mut(), peer_ops, &self.bootstrap_peers);
371                    }
372                },
373                _ = bootstrap_peer_dialer_interval_stream.next() => {
374                    dial_to_bootstrap_peers_if_needed(swarm_stream.get_mut(), &self.bootstrap_peers);
375                }
376            };
377        }
378        Ok(())
379    }
380
381    /// Returns a sender which allows sending messages to the libp2p service.
382    pub fn network_sender(&self) -> Sender<NetworkMessage> {
383        self.network_sender_in.clone()
384    }
385
386    /// Returns a receiver to listen to network events emitted from the service.
387    pub fn network_receiver(&self) -> flume::Receiver<NetworkEvent> {
388        self.network_receiver_out.clone()
389    }
390
391    pub fn peer_manager(&self) -> &Arc<PeerManager> {
392        &self.peer_manager
393    }
394}
395
396fn dial_to_bootstrap_peers_if_needed(
397    swarm: &mut Swarm<ForestBehaviour>,
398    bootstrap_peers: &HashMap<PeerId, Multiaddr>,
399) {
400    for (peer, ma) in bootstrap_peers {
401        if !swarm.behaviour().peers().contains(peer) {
402            info!("Re-dialing to bootstrap peer at {ma}");
403            if let Err(e) = swarm.dial(ma.clone()) {
404                warn!("{e}");
405            }
406        }
407    }
408}
409
410fn handle_peer_ops(
411    swarm: &mut Swarm<ForestBehaviour>,
412    peer_ops: PeerOperation,
413    bootstrap_peers: &HashMap<PeerId, Multiaddr>,
414) {
415    use PeerOperation::*;
416    match peer_ops {
417        Ban {
418            peer,
419            user_agent,
420            reason,
421        } => {
422            // Do not ban bootstrap nodes
423            if !bootstrap_peers.contains_key(&peer) {
424                let user_agent = user_agent.unwrap_or_default();
425                debug!(%peer, %user_agent, %reason, "Banning peer");
426                swarm.behaviour_mut().blocked_peers.block_peer(peer);
427            }
428        }
429        Unban(peer) => {
430            debug!(%peer, "Unbanning peer");
431            swarm.behaviour_mut().blocked_peers.unblock_peer(peer);
432        }
433    }
434}
435
436async fn handle_network_message(
437    swarm: &mut Swarm<ForestBehaviour>,
438    store: Arc<impl BitswapStoreReadWrite>,
439    bitswap_request_manager: Arc<BitswapRequestManager>,
440    message: NetworkMessage,
441    network_sender_out: &Sender<NetworkEvent>,
442    peer_manager: &PeerManager,
443) {
444    match message {
445        NetworkMessage::PubsubMessage { topic, message } => {
446            match swarm.behaviour_mut().publish(topic, message) {
447                Ok(_) => (),
448                Err(gossipsub::PublishError::Duplicate) => {
449                    // Safe to ignore, the message has already been published and deduped by gossipsub, so no need to log this as a warning.
450                    // This matches `go-libp2p-pubsub` behavior https://github.com/libp2p/go-libp2p-pubsub/blob/v0.15.0/topic.go#L240-L242
451                    debug!("Failed to send gossipsub message: duplicate");
452                }
453                Err(e) => {
454                    warn!("Failed to send gossipsub message: {e:#}");
455                }
456            }
457        }
458        NetworkMessage::HelloRequest {
459            peer_id,
460            request,
461            response_channel,
462        } => {
463            let _request_id =
464                swarm
465                    .behaviour_mut()
466                    .hello
467                    .send_request(&peer_id, request, response_channel);
468            emit_event(network_sender_out, NetworkEvent::HelloRequestOutbound).await;
469        }
470        NetworkMessage::ChainExchangeRequest {
471            peer_id,
472            request,
473            response_channel,
474        } => {
475            let _request_id = swarm.behaviour_mut().chain_exchange.send_request(
476                &peer_id,
477                request,
478                response_channel,
479            );
480            emit_event(
481                network_sender_out,
482                NetworkEvent::ChainExchangeRequestOutbound,
483            )
484            .await;
485        }
486        NetworkMessage::BitswapRequest {
487            cid,
488            response_channel,
489        } => {
490            bitswap_request_manager.get_block(
491                store,
492                cid,
493                BITSWAP_TIMEOUT,
494                Some(response_channel),
495                None,
496            );
497        }
498        NetworkMessage::JSONRPCRequest { method } => {
499            match method {
500                NetRPCMethods::AddrsListen(response_channel) => {
501                    let listeners = Swarm::listeners(swarm).cloned().collect();
502                    let peer_id = Swarm::local_peer_id(swarm);
503                    response_channel.send_or_warn((*peer_id, listeners));
504                }
505                NetRPCMethods::Peer(response_channel, peer) => {
506                    let addresses = swarm.behaviour().peer_addresses().get(&peer).cloned();
507                    response_channel.send_or_warn(addresses);
508                }
509                NetRPCMethods::Peers(response_channel) => {
510                    let peer_addresses = swarm.behaviour().peer_addresses();
511                    response_channel.send_or_warn(peer_addresses);
512                }
513                NetRPCMethods::ProtectPeer(tx, peer_ids) => {
514                    peer_ids.into_iter().for_each(|peer_id| {
515                        peer_manager.protect_peer(peer_id);
516                    });
517                    tx.send_or_warn(());
518                }
519                NetRPCMethods::ListProtectedPeers(tx) => {
520                    tx.send_or_warn(peer_manager.list_protected_peers());
521                }
522                NetRPCMethods::UnprotectPeer(tx, peer_ids) => {
523                    peer_ids.iter().for_each(|peer_id| {
524                        peer_manager.unprotect_peer(peer_id);
525                    });
526                    tx.send_or_warn(());
527                }
528                NetRPCMethods::Info(response_channel) => {
529                    response_channel.send_or_warn(swarm.network_info().into());
530                }
531                NetRPCMethods::Connect(response_channel, peer_id, addresses) => {
532                    let mut success = false;
533                    for mut multiaddr in addresses {
534                        multiaddr.push(Protocol::P2p(peer_id));
535
536                        match Swarm::dial(swarm, multiaddr.clone()) {
537                            Ok(_) => {
538                                info!("Dialed {multiaddr}");
539                                success = true;
540                                break;
541                            }
542                            Err(e) => {
543                                match e {
544                                    DialError::Denied { cause } => {
545                                        // try to get a more specific error cause
546                                        if let Some(cause) = cause.downcast_ref::<Exceeded>() {
547                                            error!(
548                                                "Denied dialing (limits exceeded) {multiaddr}: {cause}"
549                                            );
550                                        } else {
551                                            error!("Denied dialing {multiaddr}: {cause}")
552                                        }
553                                    }
554                                    e => {
555                                        error!("Failed to dial {multiaddr}: {e}");
556                                    }
557                                };
558                            }
559                        };
560                    }
561
562                    response_channel.send_or_warn(success);
563                }
564                NetRPCMethods::Disconnect(response_channel, peer_id) => {
565                    let _ = Swarm::disconnect_peer_id(swarm, peer_id);
566                    response_channel.send_or_warn(());
567                }
568                NetRPCMethods::AgentVersion(response_channel, peer_id) => {
569                    let agent_version = swarm.behaviour().peer_info(&peer_id).and_then(|info| {
570                        info.identify_info
571                            .as_ref()
572                            .map(|id| id.agent_version.clone())
573                    });
574                    response_channel.send_or_warn(agent_version);
575                }
576                NetRPCMethods::AutoNATStatus(response_channel) => {
577                    let nat_status = swarm.behaviour().discovery.nat_status();
578                    response_channel.send_or_warn(nat_status);
579                }
580            }
581        }
582    }
583}
584
585async fn handle_discovery_event(
586    peer_info_map: &HashMap<PeerId, PeerInfo>,
587    discovery_out: DiscoveryEvent,
588    network_sender_out: &Sender<NetworkEvent>,
589    peer_manager: &PeerManager,
590) {
591    match discovery_out {
592        DiscoveryEvent::PeerConnected(peer_id) => {
593            trace!("Peer connected, {peer_id}");
594            emit_event(network_sender_out, NetworkEvent::PeerConnected(peer_id)).await;
595        }
596        DiscoveryEvent::PeerDisconnected(peer_id) => {
597            trace!("Peer disconnected, {peer_id}");
598            emit_event(network_sender_out, NetworkEvent::PeerDisconnected(peer_id)).await;
599        }
600        DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event {
601            DerivedDiscoveryBehaviourEvent::Identify(identify::Event::Received {
602                peer_id,
603                info,
604                ..
605            }) => {
606                let protocols = HashSet::from_iter(info.protocols.iter().map(|p| p.to_string()));
607                if !protocols.contains(super::hello::HELLO_PROTOCOL_NAME) {
608                    peer_manager
609                        .ban_peer_with_default_duration(
610                            *peer_id,
611                            "hello protocol unsupported",
612                            |p| get_user_agent(peer_info_map, p),
613                        )
614                        .await;
615                } else if !protocols.contains(super::chain_exchange::CHAIN_EXCHANGE_PROTOCOL_NAME) {
616                    peer_manager
617                        .ban_peer_with_default_duration(
618                            *peer_id,
619                            "chain exchange protocol unsupported",
620                            |p| get_user_agent(peer_info_map, p),
621                        )
622                        .await;
623                }
624            }
625            DerivedDiscoveryBehaviourEvent::Identify(_) => {}
626            _ => {}
627        },
628    }
629}
630
631async fn handle_gossip_event(
632    e: gossipsub::Event,
633    network_sender_out: &Sender<NetworkEvent>,
634    pubsub_block_str: &str,
635    pubsub_msg_str: &str,
636) {
637    if let gossipsub::Event::Message {
638        propagation_source: source,
639        message,
640        ..
641    } = e
642    {
643        let topic = message.topic.as_str();
644        let message = message.data;
645        trace!("Got a Gossip Message from {:?}", source);
646        if topic == pubsub_block_str {
647            match from_slice_with_fallback::<GossipBlock>(&message) {
648                Ok(b) => {
649                    emit_event(
650                        network_sender_out,
651                        NetworkEvent::PubsubMessage {
652                            message: PubsubMessage::Block(b),
653                        },
654                    )
655                    .await;
656                }
657                Err(e) => {
658                    warn!("Gossip Block from peer {source:?} could not be deserialized: {e:#}",);
659                }
660            }
661        } else if topic == pubsub_msg_str {
662            match from_slice_with_fallback::<SignedMessage>(&message) {
663                Ok(m) => {
664                    emit_event(
665                        network_sender_out,
666                        NetworkEvent::PubsubMessage {
667                            message: PubsubMessage::Message(m),
668                        },
669                    )
670                    .await;
671                }
672                Err(e) => {
673                    warn!("Gossip Message from peer {source:?} could not be deserialized: {e:#}");
674                }
675            }
676        } else {
677            warn!("Getting gossip messages from unknown topic: {topic}");
678        }
679    }
680}
681
682async fn handle_hello_event(
683    peer_info_map: &HashMap<PeerId, PeerInfo>,
684    hello: &mut HelloBehaviour,
685    event: request_response::Event<HelloRequest, HelloResponse, HelloResponse>,
686    peer_manager: &PeerManager,
687    genesis_cid: &Cid,
688    network_sender_out: &Sender<NetworkEvent>,
689) {
690    match event {
691        request_response::Event::Message { peer, message, .. } => match message {
692            request_response::Message::Request {
693                request, channel, ..
694            } => {
695                emit_event(network_sender_out, NetworkEvent::HelloRequestInbound).await;
696
697                let arrival = SystemTime::now()
698                    .duration_since(UNIX_EPOCH)
699                    .expect("System time before unix epoch")
700                    .as_nanos()
701                    .try_into()
702                    .expect("System time since unix epoch should not exceed u64");
703
704                trace!("Received hello request: {:?}", request);
705                if &request.genesis_cid != genesis_cid {
706                    peer_manager
707                        .ban_peer_with_default_duration(
708                            peer,
709                            format!(
710                                "Genesis hash mismatch: {} received, {genesis_cid} expected",
711                                request.genesis_cid
712                            ),
713                            |p| get_user_agent(peer_info_map, p),
714                        )
715                        .await;
716                } else {
717                    let sent = SystemTime::now()
718                        .duration_since(UNIX_EPOCH)
719                        .expect("System time before unix epoch")
720                        .as_nanos()
721                        .try_into()
722                        .expect("System time since unix epoch should not exceed u64");
723
724                    // Send hello response immediately, no need to have the overhead of emitting
725                    // channel and polling future here.
726                    if let Err(e) = hello.send_response(channel, HelloResponse { arrival, sent }) {
727                        warn!("Failed to send HelloResponse: {e:?}");
728                    } else {
729                        emit_event(
730                            network_sender_out,
731                            NetworkEvent::HelloResponseOutbound {
732                                source: peer,
733                                request,
734                            },
735                        )
736                        .await;
737                    }
738                }
739            }
740            request_response::Message::Response {
741                request_id,
742                response,
743            } => {
744                emit_event(network_sender_out, NetworkEvent::HelloResponseInbound).await;
745                hello.handle_response(&request_id, response).await;
746            }
747        },
748        request_response::Event::OutboundFailure {
749            request_id,
750            peer,
751            error,
752            ..
753        } => {
754            hello.on_outbound_failure(&request_id);
755            match error {
756                request_response::OutboundFailure::UnsupportedProtocols => {
757                    peer_manager
758                        .ban_peer_with_default_duration(peer, "Hello protocol unsupported", |p| {
759                            get_user_agent(peer_info_map, p)
760                        })
761                        .await;
762                }
763                _ => {
764                    peer_manager.mark_peer_bad(peer, format!("Hello outbound failure {error}"));
765                }
766            }
767        }
768        request_response::Event::InboundFailure { .. } => {}
769        request_response::Event::ResponseSent { .. } => (),
770    }
771}
772
773async fn handle_ping_event(ping_event: ping::Event) {
774    match ping_event.result {
775        Ok(rtt) => {
776            trace!(
777                "PingSuccess::Ping rtt to {} is {} ms",
778                ping_event.peer,
779                rtt.as_millis()
780            );
781        }
782        Err(ping::Failure::Unsupported) => {
783            debug!(peer=%ping_event.peer, "Ping protocol unsupported");
784        }
785        Err(ping::Failure::Timeout) => {
786            debug!("Ping timeout: {}", ping_event.peer);
787        }
788        Err(ping::Failure::Other { error }) => {
789            debug!("Ping failure: {error}");
790        }
791    }
792}
793
794async fn handle_chain_exchange_event<DB>(
795    chain_exchange: &mut ChainExchangeBehaviour,
796    ce_event: request_response::Event<ChainExchangeRequest, ChainExchangeResponse>,
797    db: &Arc<ChainStore<DB>>,
798    network_sender_out: &Sender<NetworkEvent>,
799    cx_response_tx: Sender<(
800        request_response::InboundRequestId,
801        request_response::ResponseChannel<ChainExchangeResponse>,
802        ChainExchangeResponse,
803    )>,
804) where
805    DB: Blockstore + Sync + Send + 'static,
806{
807    match ce_event {
808        request_response::Event::Message { peer, message, .. } => match message {
809            request_response::Message::Request {
810                request,
811                channel,
812                request_id,
813            } => {
814                let Some(per_peer_permit) = chain_exchange.try_acquire_peer_permit(peer) else {
815                    debug!("Rejecting chain_exchange request from {peer}: per-peer cap reached");
816                    let _ = chain_exchange.send_response(
817                        channel,
818                        ChainExchangeResponse::go_away("per-peer concurrent request cap reached"),
819                    );
820                    return;
821                };
822                let Some(global_permit) = chain_exchange.try_acquire_request_permit() else {
823                    debug!("Rejecting chain_exchange request from {peer}: global cap reached");
824                    let _ = chain_exchange.send_response(
825                        channel,
826                        ChainExchangeResponse::go_away("global concurrent request cap reached"),
827                    );
828                    return;
829                };
830
831                trace!(
832                    "Received chain_exchange request (request_id:{request_id}, peer_id: {peer:?})",
833                );
834                emit_event(
835                    network_sender_out,
836                    NetworkEvent::ChainExchangeRequestInbound,
837                )
838                .await;
839
840                let db = db.clone();
841                tokio::task::spawn(async move {
842                    let _per_peer_permit = per_peer_permit;
843                    let _global_permit = global_permit;
844                    if let Err(e) = cx_response_tx.send((
845                        request_id,
846                        channel,
847                        make_chain_exchange_response(&db, &request),
848                    )) {
849                        debug!("Failed to send ChainExchangeResponse: {e:?}");
850                    }
851                });
852            }
853            request_response::Message::Response {
854                request_id,
855                response,
856            } => {
857                emit_event(
858                    network_sender_out,
859                    NetworkEvent::ChainExchangeResponseInbound,
860                )
861                .await;
862                chain_exchange
863                    .handle_inbound_response(&request_id, response)
864                    .await;
865            }
866        },
867        request_response::Event::OutboundFailure {
868            request_id, error, ..
869        } => {
870            chain_exchange.on_outbound_error(&request_id, error);
871        }
872        request_response::Event::InboundFailure { peer, error, .. } => {
873            debug!(
874                "ChainExchange inbound error (peer: {:?}): {:?}",
875                peer, error
876            );
877        }
878        request_response::Event::ResponseSent { .. } => {
879            emit_event(
880                network_sender_out,
881                NetworkEvent::ChainExchangeResponseOutbound,
882            )
883            .await;
884        }
885    }
886}
887
888#[allow(clippy::too_many_arguments)]
889async fn handle_forest_behaviour_event<DB>(
890    swarm: &mut Swarm<ForestBehaviour>,
891    bitswap_request_manager: &Arc<BitswapRequestManager>,
892    peer_manager: &PeerManager,
893    event: ForestBehaviourEvent,
894    db: &Arc<ChainStore<DB>>,
895    genesis_cid: &Cid,
896    network_sender_out: &Sender<NetworkEvent>,
897    cx_response_tx: Sender<(
898        request_response::InboundRequestId,
899        request_response::ResponseChannel<ChainExchangeResponse>,
900        ChainExchangeResponse,
901    )>,
902    pubsub_block_str: &str,
903    pubsub_msg_str: &str,
904) where
905    DB: Blockstore + BitswapStoreRead + Sync + Send + 'static,
906{
907    match event {
908        ForestBehaviourEvent::Discovery(discovery_out) => {
909            handle_discovery_event(
910                &swarm.behaviour().discovery.peer_info,
911                discovery_out,
912                network_sender_out,
913                peer_manager,
914            )
915            .await
916        }
917        ForestBehaviourEvent::Gossipsub(e) => {
918            handle_gossip_event(e, network_sender_out, pubsub_block_str, pubsub_msg_str).await
919        }
920        ForestBehaviourEvent::Hello(rr_event) => {
921            let behaviour_mut = swarm.behaviour_mut();
922            handle_hello_event(
923                &behaviour_mut.discovery.peer_info,
924                &mut behaviour_mut.hello,
925                rr_event,
926                peer_manager,
927                genesis_cid,
928                network_sender_out,
929            )
930            .await
931        }
932        ForestBehaviourEvent::Bitswap(event) => {
933            if let Err(e) = bitswap_request_manager.handle_event(
934                &mut swarm.behaviour_mut().bitswap,
935                db.blockstore(),
936                event,
937            ) {
938                warn!("bitswap: {e:#}");
939            }
940        }
941        ForestBehaviourEvent::Ping(ping_event) => handle_ping_event(ping_event).await,
942        ForestBehaviourEvent::ConnectionLimits(_) => {}
943        ForestBehaviourEvent::BlockedPeers(_) => {}
944        ForestBehaviourEvent::ChainExchange(ce_event) => {
945            handle_chain_exchange_event(
946                &mut swarm.behaviour_mut().chain_exchange,
947                ce_event,
948                db,
949                network_sender_out,
950                cx_response_tx,
951            )
952            .await
953        }
954    }
955}
956
957async fn emit_event(sender: &Sender<NetworkEvent>, event: NetworkEvent) {
958    if sender.send_async(event).await.is_err() {
959        error!("Failed to emit event: Network channel receiver has been dropped");
960    }
961}
962
963fn get_user_agent(peer_info_map: &HashMap<PeerId, PeerInfo>, peer: &PeerId) -> Option<String> {
964    peer_info_map
965        .get(peer)
966        .and_then(|i| i.identify_info.as_ref())
967        .map(|i| i.agent_version.clone())
968}