Skip to main content

forest/libp2p/
service.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{
5    sync::Arc,
6    time::{Duration, SystemTime, UNIX_EPOCH},
7};
8
9use crate::{blocks::GossipBlock, rpc::net::NetInfoResult};
10use crate::{chain::ChainStore, utils::encoding::from_slice_with_fallback};
11use crate::{
12    libp2p_bitswap::{
13        BitswapStoreRead, BitswapStoreReadWrite, request_manager::BitswapRequestManager,
14    },
15    utils::flume::FlumeSenderExt as _,
16};
17use crate::{message::SignedMessage, networks::GenesisNetworkName};
18use ahash::{HashMap, HashSet};
19use anyhow::Context as _;
20use cid::Cid;
21use flume::Sender;
22use futures::{select, stream::StreamExt as _};
23use fvm_ipld_blockstore::Blockstore;
24pub use libp2p::gossipsub::{IdentTopic, Topic};
25use libp2p::{
26    PeerId, Swarm, SwarmBuilder,
27    autonat::NatStatus,
28    connection_limits::Exceeded,
29    core::Multiaddr,
30    gossipsub, identify,
31    identity::Keypair,
32    metrics::{Metrics, Recorder},
33    multiaddr::Protocol,
34    noise, ping, request_response,
35    swarm::{DialError, SwarmEvent},
36    tcp, yamux,
37};
38use nonzero_ext::nonzero;
39use tokio_stream::wrappers::IntervalStream;
40use tracing::{debug, error, info, trace, warn};
41
42use super::{
43    ForestBehaviour, ForestBehaviourEvent, Libp2pConfig,
44    chain_exchange::{ChainExchangeRequest, ChainExchangeResponse, make_chain_exchange_response},
45    discovery::{DerivedDiscoveryBehaviourEvent, PeerInfo},
46};
47use crate::libp2p::{
48    PeerManager, PeerOperation,
49    chain_exchange::ChainExchangeBehaviour,
50    discovery::DiscoveryEvent,
51    hello::{HelloBehaviour, HelloRequest, HelloResponse},
52    rpc::RequestResponseError,
53};
54
55pub(in crate::libp2p) mod metrics {
56    use prometheus_client::metrics::{family::Family, gauge::Gauge};
57    use std::sync::LazyLock;
58
59    use crate::metrics::KindLabel;
60
61    pub static NETWORK_CONTAINER_CAPACITIES: LazyLock<Family<KindLabel, Gauge>> = {
62        LazyLock::new(|| {
63            let metric = Family::default();
64            crate::metrics::default_registry().register(
65                "network_container_capacities",
66                "Capacity for each container",
67                metric.clone(),
68            );
69            metric
70        })
71    };
72
73    pub mod values {
74        use crate::metrics::KindLabel;
75
76        pub const HELLO_REQUEST_TABLE: KindLabel = KindLabel::new("hello_request_table");
77        pub const CHAIN_EXCHANGE_REQUEST_TABLE: KindLabel = KindLabel::new("cx_request_table");
78    }
79}
80
81fn libp2p_metrics_enabled() -> bool {
82    crate::utils::misc::env::is_env_truthy("FOREST_LIBP2P_METRICS_ENABLED")
83}
84
/// `Gossipsub` Filecoin blocks topic identifier.
pub const PUBSUB_BLOCK_STR: &str = "/fil/blocks";
/// `Gossipsub` Filecoin messages topic identifier.
pub const PUBSUB_MSG_STR: &str = "/fil/msgs";

/// Topic prefixes the service subscribes to on construction; each is suffixed with
/// `/<network name>` before subscribing.
const PUBSUB_TOPICS: [&str; 2] = [PUBSUB_BLOCK_STR, PUBSUB_MSG_STR];

/// Timeout handed to the bitswap request manager for every `NetworkMessage::BitswapRequest`.
pub const BITSWAP_TIMEOUT: Duration = Duration::from_secs(30);
93
/// Events emitted by this Service.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum NetworkEvent {
    /// A gossipsub payload received on the block or message topic that was decoded
    /// successfully.
    PubsubMessage {
        message: PubsubMessage,
    },
    /// A hello request was received from a peer.
    HelloRequestInbound,
    /// A hello response was sent successfully to `source` in reply to `request`.
    HelloResponseOutbound {
        source: PeerId,
        request: HelloRequest,
    },
    /// A hello request was handed to the hello behaviour for sending.
    HelloRequestOutbound,
    /// A hello response was received from a peer.
    HelloResponseInbound,
    /// A chain exchange request was handed to the chain exchange behaviour for sending.
    ChainExchangeRequestOutbound,
    /// A chain exchange response was received from a peer.
    ChainExchangeResponseInbound,
    /// A chain exchange request was received from a peer.
    ChainExchangeRequestInbound,
    /// NOTE(review): presumably emitted once a chain exchange response has been sent; the
    /// emission site is not visible here — confirm.
    ChainExchangeResponseOutbound,
    /// Forwarded from `DiscoveryEvent::PeerConnected`.
    PeerConnected(PeerId),
    /// Forwarded from `DiscoveryEvent::PeerDisconnected`.
    PeerDisconnected(PeerId),
}
115
/// Message types that can come over `GossipSub`
///
/// Each variant carries the already-decoded payload of the corresponding topic.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum PubsubMessage {
    /// Messages that come over the block topic
    Block(GossipBlock),
    /// Messages that come over the message topic
    Message(SignedMessage),
}
125
/// Messages into the service to handle.
#[derive(Debug)]
pub enum NetworkMessage {
    /// Publish the raw `message` bytes on the gossipsub `topic`.
    PubsubMessage {
        topic: IdentTopic,
        message: Vec<u8>,
    },
    /// Send a chain exchange `request` to `peer_id`; `response_channel` is handed to the
    /// chain exchange behaviour together with the request.
    ChainExchangeRequest {
        peer_id: PeerId,
        request: ChainExchangeRequest,
        response_channel: flume::Sender<Result<ChainExchangeResponse, RequestResponseError>>,
    },
    /// Send a hello `request` to `peer_id`; `response_channel` is handed to the hello
    /// behaviour together with the request.
    HelloRequest {
        peer_id: PeerId,
        request: HelloRequest,
        response_channel: flume::Sender<HelloResponse>,
    },
    /// Ask the bitswap request manager to fetch the block identified by `cid`, using
    /// `BITSWAP_TIMEOUT`.
    ///
    /// NOTE(review): the `bool` sent on `response_channel` presumably signals whether the
    /// block was obtained — confirm against `BitswapRequestManager::get_block`.
    BitswapRequest {
        cid: Cid,
        response_channel: flume::Sender<bool>,
    },
    /// Run one of the network RPC methods against the swarm.
    JSONRPCRequest {
        method: NetRPCMethods,
    },
}
151
/// Network RPC API methods used to gather data from libp2p node.
///
/// Every variant carries the channel on which the answer is sent back.
#[derive(Debug)]
pub enum NetRPCMethods {
    /// Replies with the local peer id and the swarm's listen addresses.
    AddrsListen(flume::Sender<(PeerId, HashSet<Multiaddr>)>),
    /// Replies with the addresses recorded for the given peer, if any.
    Peer(flume::Sender<Option<HashSet<Multiaddr>>>, PeerId),
    /// Replies with the full peer-to-addresses map of the behaviour.
    Peers(flume::Sender<HashMap<PeerId, HashSet<Multiaddr>>>),
    /// Marks the given peers as protected in the peer manager, then acknowledges.
    ProtectPeer(flume::Sender<()>, HashSet<PeerId>),
    /// Removes the protection from the given peers in the peer manager, then acknowledges.
    UnprotectPeer(flume::Sender<()>, HashSet<PeerId>),
    /// Replies with the peer manager's set of protected peers.
    ListProtectedPeers(flume::Sender<HashSet<PeerId>>),
    /// Replies with the swarm's network info.
    Info(flume::Sender<NetInfoResult>),
    /// Dials the peer on the given addresses, one at a time, until a dial is accepted by the
    /// swarm; replies with whether any dial was started.
    Connect(flume::Sender<bool>, PeerId, HashSet<Multiaddr>),
    /// Asks the swarm to disconnect from the peer, then acknowledges.
    Disconnect(flume::Sender<()>, PeerId),
    /// Replies with the agent version from the peer's identify info, if known.
    AgentVersion(flume::Sender<Option<String>>, PeerId),
    /// Replies with the NAT status reported by the discovery behaviour.
    AutoNATStatus(flume::Sender<NatStatus>),
}
167
/// The `Libp2pService` listens to events from the libp2p swarm.
pub struct Libp2pService<DB> {
    /// The libp2p swarm driving `ForestBehaviour`.
    swarm: Swarm<ForestBehaviour>,
    /// Bootstrap peers from the configuration whose multiaddress ends in `/p2p/<peer id>`,
    /// keyed by that peer id.
    bootstrap_peers: HashMap<PeerId, Multiaddr>,
    /// Chain store; passed to the event handlers and used as the bitswap store.
    cs: Arc<ChainStore<DB>>,
    /// Shared peer manager.
    peer_manager: Arc<PeerManager>,
    /// Receiving end for `NetworkMessage`s sent to the service.
    network_receiver_in: flume::Receiver<NetworkMessage>,
    /// Sending end matching `network_receiver_in`; cloned out by `network_sender()`.
    network_sender_in: Sender<NetworkMessage>,
    /// Receiving end for emitted `NetworkEvent`s; cloned out by `network_receiver()`.
    network_receiver_out: flume::Receiver<NetworkEvent>,
    /// Sending end matching `network_receiver_out`; used by the event handlers.
    network_sender_out: Sender<NetworkEvent>,
    /// Network name used as the suffix of the gossipsub topic names.
    network_name: String,
    /// Genesis CID that incoming hello requests are compared against.
    genesis_cid: Cid,
}
181
impl<DB> Libp2pService<DB>
where
    DB: Blockstore + BitswapStoreReadWrite + Sync + Send + 'static,
{
    /// Builds the swarm, subscribes to the gossipsub topics and starts listening.
    ///
    /// # Errors
    ///
    /// Fails when the behaviour or the transport stack cannot be constructed, when a
    /// gossipsub subscription fails, or when the swarm ends up with no listener at all.
    /// A failure to listen on an individual address is only logged.
    pub async fn new(
        config: Libp2pConfig,
        cs: Arc<ChainStore<DB>>,
        peer_manager: Arc<PeerManager>,
        net_keypair: Keypair,
        network_name: GenesisNetworkName,
        genesis_cid: Cid,
    ) -> anyhow::Result<Self> {
        let behaviour =
            ForestBehaviour::new(&net_keypair, &config, &network_name, peer_manager.clone())
                .await?;
        // Transport stack: TCP (noise + yamux) and QUIC, with DNS resolution on top.
        let mut swarm = SwarmBuilder::with_existing_identity(net_keypair)
            .with_tokio()
            .with_tcp(
                tcp::Config::default().nodelay(true),
                noise::Config::new,
                yamux::Config::default,
            )?
            .with_quic()
            .with_dns()?
            .with_bandwidth_metrics(&mut crate::metrics::collector_registry())
            .with_behaviour(|_| behaviour)?
            .with_swarm_config(|config| {
                config
                    .with_notify_handler_buffer_size(nonzero!(20usize))
                    .with_per_connection_event_buffer_size(64)
                    .with_idle_connection_timeout(Duration::from_secs(60 * 10))
            })
            .build();

        // Subscribe to gossipsub topics with the network name suffix
        for topic in PUBSUB_TOPICS.iter() {
            let t = Topic::new(format!("{topic}/{network_name}"));
            swarm
                .behaviour_mut()
                .subscribe(&t)
                .with_context(|| format!("Failed to subscribe gossipsub topic {t}"))?;
        }

        // Both channels are unbounded: `_in` carries requests to the service, `_out` carries
        // the events it emits.
        let (network_sender_in, network_receiver_in) = flume::unbounded();
        let (network_sender_out, network_receiver_out) = flume::unbounded();

        // Hint at the multihash which has to go in the `/p2p/<multihash>` part of the
        // peer's multiaddress. Useful if others want to use this node to bootstrap
        // from.
        info!("p2p network peer id: {}", swarm.local_peer_id());

        // Listen on network endpoints before being detached and connecting to any peers.
        for addr in &config.listening_multiaddrs {
            match swarm.listen_on(addr.clone()) {
                // Poll the swarm until the `NewListenAddr` event for this listener shows up.
                // Any other event seen in the meantime is dropped, and the loop has no other
                // exit.
                Ok(id) => loop {
                    if let SwarmEvent::NewListenAddr {
                        address,
                        listener_id,
                    } = swarm.select_next_some().await
                        && id == listener_id
                    {
                        info!("p2p peer is now listening on: {address}");
                        break;
                    }
                },
                Err(err) => error!("Fail to listen on {addr}: {err}"),
            }
        }

        if swarm.listeners().count() == 0 {
            anyhow::bail!("p2p peer failed to listen on any network endpoints");
        }

        // Only bootstrap addresses whose last component is `/p2p/<peer id>` are kept, since
        // the peer id is needed as the map key.
        let bootstrap_peers = config
            .bootstrap_peers
            .iter()
            .filter_map(|ma| match ma.iter().last() {
                Some(Protocol::P2p(peer)) => Some((peer, ma.clone())),
                _ => None,
            })
            .collect();

        Ok(Libp2pService {
            swarm,
            bootstrap_peers,
            cs,
            peer_manager,
            network_receiver_in,
            network_sender_in,
            network_receiver_out,
            network_sender_out,
            network_name: network_name.into(),
            genesis_cid,
        })
    }

    /// Starts the libp2p service networking stack. This Future resolves when
    /// shutdown occurs.
    ///
    /// The loop multiplexes swarm events, inbound `NetworkMessage`s, finished chain exchange
    /// responses, outbound bitswap requests, peer manager operations and two timers. It ends
    /// with `Ok(())` as soon as either the swarm stream or the inbound message stream yields
    /// `None`.
    pub async fn run(mut self) -> anyhow::Result<()> {
        info!("Running libp2p service");

        // Bootstrap with Kademlia
        if let Err(e) = self.swarm.behaviour_mut().bootstrap() {
            warn!("Failed to bootstrap with Kademlia: {e}");
        }

        // Grab the request manager before the swarm is moved into the fused stream below.
        let bitswap_request_manager = self.swarm.behaviour().bitswap.request_manager();
        let mut swarm_stream = self.swarm.fuse();
        let mut network_stream = self.network_receiver_in.stream().fuse();
        // Ticks every 15 seconds; only used to trace the peer count.
        let mut interval =
            IntervalStream::new(tokio::time::interval(Duration::from_secs(15))).fuse();
        let pubsub_block_str = format!("{}/{}", PUBSUB_BLOCK_STR, self.network_name);
        let pubsub_msg_str = format!("{}/{}", PUBSUB_MSG_STR, self.network_name);

        // Chain exchange responses are sent through this channel by the chain exchange event
        // handler and passed to the behaviour in the loop below.
        let (cx_response_tx, cx_response_rx) = flume::unbounded();

        let mut cx_response_rx_stream = cx_response_rx.stream().fuse();
        let mut bitswap_outbound_request_stream =
            bitswap_request_manager.outbound_request_stream().fuse();
        let mut peer_ops_rx_stream = self.peer_manager.peer_ops_rx().stream().fuse();
        // Behaviour-event metrics are only recorded when enabled through the environment.
        let metrics = if libp2p_metrics_enabled() {
            Some(Metrics::new(&mut crate::metrics::collector_registry()))
        } else {
            None
        };

        // First tick fires one full interval from now, then every interval after that.
        const BOOTSTRAP_PEER_DIALER_INTERVAL: tokio::time::Duration =
            tokio::time::Duration::from_secs(60);
        let mut bootstrap_peer_dialer_interval_stream =
            IntervalStream::new(tokio::time::interval_at(
                tokio::time::Instant::now() + BOOTSTRAP_PEER_DIALER_INTERVAL,
                BOOTSTRAP_PEER_DIALER_INTERVAL,
            ))
            .fuse();
        loop {
            select! {
                swarm_event = swarm_stream.next() => match swarm_event {
                    // outbound events
                    Some(SwarmEvent::Behaviour(event)) => {
                        if let Some(m) = &metrics {
                            m.record(&event);
                        }
                        handle_forest_behaviour_event(
                            swarm_stream.get_mut(),
                            &bitswap_request_manager,
                            &self.peer_manager,
                            event,
                            &self.cs,
                            &self.genesis_cid,
                            &self.network_sender_out,
                            cx_response_tx.clone(),
                            &pubsub_block_str,
                            &pubsub_msg_str,).await;
                    },
                    // The swarm stream ended: stop the service.
                    None => { break; },
                    // Non-behaviour swarm events are ignored here.
                    _ => { },
                },
                rpc_message = network_stream.next() => match rpc_message {
                    // Inbound messages
                    Some(message) => {
                        handle_network_message(
                            swarm_stream.get_mut(),
                            self.cs.clone(),
                            bitswap_request_manager.clone(),
                            message,
                            &self.network_sender_out,
                            &self.peer_manager).await;
                    }
                    // The inbound message stream ended: stop the service.
                    None => { break; }
                },
                interval_event = interval.next() => if interval_event.is_some() {
                    // Print peer count on an interval.
                    trace!("Peers connected: {}", swarm_stream.get_mut().behaviour_mut().peers().len());
                },
                cs_pair_opt = cx_response_rx_stream.next() => {
                    // A chain exchange response is ready: send it on its response channel.
                    if let Some((_request_id, channel, cx_response)) = cs_pair_opt {
                        let behaviour = swarm_stream.get_mut().behaviour_mut();
                        if let Err(e) = behaviour.chain_exchange.send_response(channel, cx_response) {
                            debug!("Error sending chain exchange response: {e:?}");
                        }
                    }
                },
                bitswap_outbound_request_opt = bitswap_outbound_request_stream.next() => {
                    // Forward requests produced by the bitswap request manager to the behaviour.
                    if let Some((peer, request)) = bitswap_outbound_request_opt {
                        let bitswap = &mut swarm_stream.get_mut().behaviour_mut().bitswap;
                        bitswap.send_request(&peer, request);
                    }
                }
                peer_ops_opt = peer_ops_rx_stream.next() => {
                    // Ban / unban operations issued by the peer manager.
                    if let Some(peer_ops) = peer_ops_opt {
                        handle_peer_ops(swarm_stream.get_mut(), peer_ops, &self.bootstrap_peers);
                    }
                },
                _ = bootstrap_peer_dialer_interval_stream.next() => {
                    dial_to_bootstrap_peers_if_needed(swarm_stream.get_mut(), &self.bootstrap_peers);
                }
            };
        }
        Ok(())
    }

    /// Returns a sender which allows sending messages to the libp2p service.
    pub fn network_sender(&self) -> Sender<NetworkMessage> {
        self.network_sender_in.clone()
    }

    /// Returns a receiver to listen to network events emitted from the service.
    pub fn network_receiver(&self) -> flume::Receiver<NetworkEvent> {
        self.network_receiver_out.clone()
    }

    /// Returns the peer manager this service was constructed with.
    pub fn peer_manager(&self) -> &Arc<PeerManager> {
        &self.peer_manager
    }
}
397
398fn dial_to_bootstrap_peers_if_needed(
399    swarm: &mut Swarm<ForestBehaviour>,
400    bootstrap_peers: &HashMap<PeerId, Multiaddr>,
401) {
402    for (peer, ma) in bootstrap_peers {
403        if !swarm.behaviour().peers().contains(peer) {
404            info!("Re-dialing to bootstrap peer at {ma}");
405            if let Err(e) = swarm.dial(ma.clone()) {
406                warn!("{e}");
407            }
408        }
409    }
410}
411
412fn handle_peer_ops(
413    swarm: &mut Swarm<ForestBehaviour>,
414    peer_ops: PeerOperation,
415    bootstrap_peers: &HashMap<PeerId, Multiaddr>,
416) {
417    use PeerOperation::*;
418    match peer_ops {
419        Ban {
420            peer,
421            user_agent,
422            reason,
423        } => {
424            // Do not ban bootstrap nodes
425            if !bootstrap_peers.contains_key(&peer) {
426                let user_agent = user_agent.unwrap_or_default();
427                debug!(%peer, %user_agent, %reason, "Banning peer");
428                swarm.behaviour_mut().blocked_peers.block_peer(peer);
429            }
430        }
431        Unban(peer) => {
432            debug!(%peer, "Unbanning peer");
433            swarm.behaviour_mut().blocked_peers.unblock_peer(peer);
434        }
435    }
436}
437
438async fn handle_network_message(
439    swarm: &mut Swarm<ForestBehaviour>,
440    store: Arc<impl BitswapStoreReadWrite>,
441    bitswap_request_manager: Arc<BitswapRequestManager>,
442    message: NetworkMessage,
443    network_sender_out: &Sender<NetworkEvent>,
444    peer_manager: &PeerManager,
445) {
446    match message {
447        NetworkMessage::PubsubMessage { topic, message } => {
448            if let Err(e) = swarm.behaviour_mut().publish(topic, message) {
449                warn!("Failed to send gossipsub message: {:?}", e);
450            }
451        }
452        NetworkMessage::HelloRequest {
453            peer_id,
454            request,
455            response_channel,
456        } => {
457            let _request_id =
458                swarm
459                    .behaviour_mut()
460                    .hello
461                    .send_request(&peer_id, request, response_channel);
462            emit_event(network_sender_out, NetworkEvent::HelloRequestOutbound).await;
463        }
464        NetworkMessage::ChainExchangeRequest {
465            peer_id,
466            request,
467            response_channel,
468        } => {
469            let _request_id = swarm.behaviour_mut().chain_exchange.send_request(
470                &peer_id,
471                request,
472                response_channel,
473            );
474            emit_event(
475                network_sender_out,
476                NetworkEvent::ChainExchangeRequestOutbound,
477            )
478            .await;
479        }
480        NetworkMessage::BitswapRequest {
481            cid,
482            response_channel,
483        } => {
484            bitswap_request_manager.get_block(
485                store,
486                cid,
487                BITSWAP_TIMEOUT,
488                Some(response_channel),
489                None,
490            );
491        }
492        NetworkMessage::JSONRPCRequest { method } => {
493            match method {
494                NetRPCMethods::AddrsListen(response_channel) => {
495                    let listeners = Swarm::listeners(swarm).cloned().collect();
496                    let peer_id = Swarm::local_peer_id(swarm);
497                    response_channel.send_or_warn((*peer_id, listeners));
498                }
499                NetRPCMethods::Peer(response_channel, peer) => {
500                    let addresses = swarm.behaviour().peer_addresses().get(&peer).cloned();
501                    response_channel.send_or_warn(addresses);
502                }
503                NetRPCMethods::Peers(response_channel) => {
504                    let peer_addresses = swarm.behaviour().peer_addresses();
505                    response_channel.send_or_warn(peer_addresses);
506                }
507                NetRPCMethods::ProtectPeer(tx, peer_ids) => {
508                    peer_ids.into_iter().for_each(|peer_id| {
509                        peer_manager.protect_peer(peer_id);
510                    });
511                    tx.send_or_warn(());
512                }
513                NetRPCMethods::ListProtectedPeers(tx) => {
514                    tx.send_or_warn(peer_manager.list_protected_peers());
515                }
516                NetRPCMethods::UnprotectPeer(tx, peer_ids) => {
517                    peer_ids.iter().for_each(|peer_id| {
518                        peer_manager.unprotect_peer(peer_id);
519                    });
520                    tx.send_or_warn(());
521                }
522                NetRPCMethods::Info(response_channel) => {
523                    response_channel.send_or_warn(swarm.network_info().into());
524                }
525                NetRPCMethods::Connect(response_channel, peer_id, addresses) => {
526                    let mut success = false;
527                    for mut multiaddr in addresses {
528                        multiaddr.push(Protocol::P2p(peer_id));
529
530                        match Swarm::dial(swarm, multiaddr.clone()) {
531                            Ok(_) => {
532                                info!("Dialed {multiaddr}");
533                                success = true;
534                                break;
535                            }
536                            Err(e) => {
537                                match e {
538                                    DialError::Denied { cause } => {
539                                        // try to get a more specific error cause
540                                        if let Some(cause) = cause.downcast_ref::<Exceeded>() {
541                                            error!(
542                                                "Denied dialing (limits exceeded) {multiaddr}: {cause}"
543                                            );
544                                        } else {
545                                            error!("Denied dialing {multiaddr}: {cause}")
546                                        }
547                                    }
548                                    e => {
549                                        error!("Failed to dial {multiaddr}: {e}");
550                                    }
551                                };
552                            }
553                        };
554                    }
555
556                    response_channel.send_or_warn(success);
557                }
558                NetRPCMethods::Disconnect(response_channel, peer_id) => {
559                    let _ = Swarm::disconnect_peer_id(swarm, peer_id);
560                    response_channel.send_or_warn(());
561                }
562                NetRPCMethods::AgentVersion(response_channel, peer_id) => {
563                    let agent_version = swarm.behaviour().peer_info(&peer_id).and_then(|info| {
564                        info.identify_info
565                            .as_ref()
566                            .map(|id| id.agent_version.clone())
567                    });
568                    response_channel.send_or_warn(agent_version);
569                }
570                NetRPCMethods::AutoNATStatus(response_channel) => {
571                    let nat_status = swarm.behaviour().discovery.nat_status();
572                    response_channel.send_or_warn(nat_status);
573                }
574            }
575        }
576    }
577}
578
579async fn handle_discovery_event(
580    peer_info_map: &HashMap<PeerId, PeerInfo>,
581    discovery_out: DiscoveryEvent,
582    network_sender_out: &Sender<NetworkEvent>,
583    peer_manager: &PeerManager,
584) {
585    match discovery_out {
586        DiscoveryEvent::PeerConnected(peer_id) => {
587            trace!("Peer connected, {peer_id}");
588            emit_event(network_sender_out, NetworkEvent::PeerConnected(peer_id)).await;
589        }
590        DiscoveryEvent::PeerDisconnected(peer_id) => {
591            trace!("Peer disconnected, {peer_id}");
592            emit_event(network_sender_out, NetworkEvent::PeerDisconnected(peer_id)).await;
593        }
594        DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event {
595            DerivedDiscoveryBehaviourEvent::Identify(identify::Event::Received {
596                peer_id,
597                info,
598                ..
599            }) => {
600                let protocols = HashSet::from_iter(info.protocols.iter().map(|p| p.to_string()));
601                if !protocols.contains(super::hello::HELLO_PROTOCOL_NAME) {
602                    peer_manager
603                        .ban_peer_with_default_duration(
604                            *peer_id,
605                            "hello protocol unsupported",
606                            |p| get_user_agent(peer_info_map, p),
607                        )
608                        .await;
609                } else if !protocols.contains(super::chain_exchange::CHAIN_EXCHANGE_PROTOCOL_NAME) {
610                    peer_manager
611                        .ban_peer_with_default_duration(
612                            *peer_id,
613                            "chain exchange protocol unsupported",
614                            |p| get_user_agent(peer_info_map, p),
615                        )
616                        .await;
617                }
618            }
619            DerivedDiscoveryBehaviourEvent::Identify(_) => {}
620            _ => {}
621        },
622    }
623}
624
625async fn handle_gossip_event(
626    e: gossipsub::Event,
627    network_sender_out: &Sender<NetworkEvent>,
628    pubsub_block_str: &str,
629    pubsub_msg_str: &str,
630) {
631    if let gossipsub::Event::Message {
632        propagation_source: source,
633        message,
634        ..
635    } = e
636    {
637        let topic = message.topic.as_str();
638        let message = message.data;
639        trace!("Got a Gossip Message from {:?}", source);
640        if topic == pubsub_block_str {
641            match from_slice_with_fallback::<GossipBlock>(&message) {
642                Ok(b) => {
643                    emit_event(
644                        network_sender_out,
645                        NetworkEvent::PubsubMessage {
646                            message: PubsubMessage::Block(b),
647                        },
648                    )
649                    .await;
650                }
651                Err(e) => {
652                    warn!("Gossip Block from peer {source:?} could not be deserialized: {e}",);
653                }
654            }
655        } else if topic == pubsub_msg_str {
656            match from_slice_with_fallback::<SignedMessage>(&message) {
657                Ok(m) => {
658                    emit_event(
659                        network_sender_out,
660                        NetworkEvent::PubsubMessage {
661                            message: PubsubMessage::Message(m),
662                        },
663                    )
664                    .await;
665                }
666                Err(e) => {
667                    warn!("Gossip Message from peer {source:?} could not be deserialized: {e}");
668                }
669            }
670        } else {
671            warn!("Getting gossip messages from unknown topic: {topic}");
672        }
673    }
674}
675
676async fn handle_hello_event(
677    peer_info_map: &HashMap<PeerId, PeerInfo>,
678    hello: &mut HelloBehaviour,
679    event: request_response::Event<HelloRequest, HelloResponse, HelloResponse>,
680    peer_manager: &PeerManager,
681    genesis_cid: &Cid,
682    network_sender_out: &Sender<NetworkEvent>,
683) {
684    match event {
685        request_response::Event::Message { peer, message, .. } => match message {
686            request_response::Message::Request {
687                request, channel, ..
688            } => {
689                emit_event(network_sender_out, NetworkEvent::HelloRequestInbound).await;
690
691                let arrival = SystemTime::now()
692                    .duration_since(UNIX_EPOCH)
693                    .expect("System time before unix epoch")
694                    .as_nanos()
695                    .try_into()
696                    .expect("System time since unix epoch should not exceed u64");
697
698                trace!("Received hello request: {:?}", request);
699                if &request.genesis_cid != genesis_cid {
700                    peer_manager
701                        .ban_peer_with_default_duration(
702                            peer,
703                            format!(
704                                "Genesis hash mismatch: {} received, {genesis_cid} expected",
705                                request.genesis_cid
706                            ),
707                            |p| get_user_agent(peer_info_map, p),
708                        )
709                        .await;
710                } else {
711                    let sent = SystemTime::now()
712                        .duration_since(UNIX_EPOCH)
713                        .expect("System time before unix epoch")
714                        .as_nanos()
715                        .try_into()
716                        .expect("System time since unix epoch should not exceed u64");
717
718                    // Send hello response immediately, no need to have the overhead of emitting
719                    // channel and polling future here.
720                    if let Err(e) = hello.send_response(channel, HelloResponse { arrival, sent }) {
721                        warn!("Failed to send HelloResponse: {e:?}");
722                    } else {
723                        emit_event(
724                            network_sender_out,
725                            NetworkEvent::HelloResponseOutbound {
726                                source: peer,
727                                request,
728                            },
729                        )
730                        .await;
731                    }
732                }
733            }
734            request_response::Message::Response {
735                request_id,
736                response,
737            } => {
738                emit_event(network_sender_out, NetworkEvent::HelloResponseInbound).await;
739                hello.handle_response(&request_id, response).await;
740            }
741        },
742        request_response::Event::OutboundFailure {
743            request_id,
744            peer,
745            error,
746            ..
747        } => {
748            hello.on_outbound_failure(&request_id);
749            match error {
750                request_response::OutboundFailure::UnsupportedProtocols => {
751                    peer_manager
752                        .ban_peer_with_default_duration(peer, "Hello protocol unsupported", |p| {
753                            get_user_agent(peer_info_map, p)
754                        })
755                        .await;
756                }
757                _ => {
758                    peer_manager.mark_peer_bad(peer, format!("Hello outbound failure {error}"));
759                }
760            }
761        }
762        request_response::Event::InboundFailure { .. } => {}
763        request_response::Event::ResponseSent { .. } => (),
764    }
765}
766
767async fn handle_ping_event(ping_event: ping::Event) {
768    match ping_event.result {
769        Ok(rtt) => {
770            trace!(
771                "PingSuccess::Ping rtt to {} is {} ms",
772                ping_event.peer,
773                rtt.as_millis()
774            );
775        }
776        Err(ping::Failure::Unsupported) => {
777            debug!(peer=%ping_event.peer, "Ping protocol unsupported");
778        }
779        Err(ping::Failure::Timeout) => {
780            debug!("Ping timeout: {}", ping_event.peer);
781        }
782        Err(ping::Failure::Other { error }) => {
783            debug!("Ping failure: {error}");
784        }
785    }
786}
787
788async fn handle_chain_exchange_event<DB>(
789    chain_exchange: &mut ChainExchangeBehaviour,
790    ce_event: request_response::Event<ChainExchangeRequest, ChainExchangeResponse>,
791    db: &Arc<ChainStore<DB>>,
792    network_sender_out: &Sender<NetworkEvent>,
793    cx_response_tx: Sender<(
794        request_response::InboundRequestId,
795        request_response::ResponseChannel<ChainExchangeResponse>,
796        ChainExchangeResponse,
797    )>,
798) where
799    DB: Blockstore + Sync + Send + 'static,
800{
801    match ce_event {
802        request_response::Event::Message { peer, message, .. } => match message {
803            request_response::Message::Request {
804                request,
805                channel,
806                request_id,
807            } => {
808                trace!(
809                    "Received chain_exchange request (request_id:{request_id}, peer_id: {peer:?})",
810                );
811                emit_event(
812                    network_sender_out,
813                    NetworkEvent::ChainExchangeRequestInbound,
814                )
815                .await;
816
817                let db = db.clone();
818                tokio::task::spawn(async move {
819                    if let Err(e) = cx_response_tx.send((
820                        request_id,
821                        channel,
822                        make_chain_exchange_response(&db, &request),
823                    )) {
824                        debug!("Failed to send ChainExchangeResponse: {e:?}");
825                    }
826                });
827            }
828            request_response::Message::Response {
829                request_id,
830                response,
831            } => {
832                emit_event(
833                    network_sender_out,
834                    NetworkEvent::ChainExchangeResponseInbound,
835                )
836                .await;
837                chain_exchange
838                    .handle_inbound_response(&request_id, response)
839                    .await;
840            }
841        },
842        request_response::Event::OutboundFailure {
843            request_id, error, ..
844        } => {
845            chain_exchange.on_outbound_error(&request_id, error);
846        }
847        request_response::Event::InboundFailure { peer, error, .. } => {
848            debug!(
849                "ChainExchange inbound error (peer: {:?}): {:?}",
850                peer, error
851            );
852        }
853        request_response::Event::ResponseSent { .. } => {
854            emit_event(
855                network_sender_out,
856                NetworkEvent::ChainExchangeResponseOutbound,
857            )
858            .await;
859        }
860    }
861}
862
863#[allow(clippy::too_many_arguments)]
864async fn handle_forest_behaviour_event<DB>(
865    swarm: &mut Swarm<ForestBehaviour>,
866    bitswap_request_manager: &Arc<BitswapRequestManager>,
867    peer_manager: &PeerManager,
868    event: ForestBehaviourEvent,
869    db: &Arc<ChainStore<DB>>,
870    genesis_cid: &Cid,
871    network_sender_out: &Sender<NetworkEvent>,
872    cx_response_tx: Sender<(
873        request_response::InboundRequestId,
874        request_response::ResponseChannel<ChainExchangeResponse>,
875        ChainExchangeResponse,
876    )>,
877    pubsub_block_str: &str,
878    pubsub_msg_str: &str,
879) where
880    DB: Blockstore + BitswapStoreRead + Sync + Send + 'static,
881{
882    match event {
883        ForestBehaviourEvent::Discovery(discovery_out) => {
884            handle_discovery_event(
885                &swarm.behaviour().discovery.peer_info,
886                discovery_out,
887                network_sender_out,
888                peer_manager,
889            )
890            .await
891        }
892        ForestBehaviourEvent::Gossipsub(e) => {
893            handle_gossip_event(e, network_sender_out, pubsub_block_str, pubsub_msg_str).await
894        }
895        ForestBehaviourEvent::Hello(rr_event) => {
896            let behaviour_mut = swarm.behaviour_mut();
897            handle_hello_event(
898                &behaviour_mut.discovery.peer_info,
899                &mut behaviour_mut.hello,
900                rr_event,
901                peer_manager,
902                genesis_cid,
903                network_sender_out,
904            )
905            .await
906        }
907        ForestBehaviourEvent::Bitswap(event) => {
908            if let Err(e) = bitswap_request_manager.handle_event(
909                &mut swarm.behaviour_mut().bitswap,
910                db.blockstore(),
911                event,
912            ) {
913                warn!("bitswap: {e}");
914            }
915        }
916        ForestBehaviourEvent::Ping(ping_event) => handle_ping_event(ping_event).await,
917        ForestBehaviourEvent::ConnectionLimits(_) => {}
918        ForestBehaviourEvent::BlockedPeers(_) => {}
919        ForestBehaviourEvent::ChainExchange(ce_event) => {
920            handle_chain_exchange_event(
921                &mut swarm.behaviour_mut().chain_exchange,
922                ce_event,
923                db,
924                network_sender_out,
925                cx_response_tx,
926            )
927            .await
928        }
929    }
930}
931
932async fn emit_event(sender: &Sender<NetworkEvent>, event: NetworkEvent) {
933    if sender.send_async(event).await.is_err() {
934        error!("Failed to emit event: Network channel receiver has been dropped");
935    }
936}
937
938fn get_user_agent(peer_info_map: &HashMap<PeerId, PeerInfo>, peer: &PeerId) -> Option<String> {
939    peer_info_map
940        .get(peer)
941        .and_then(|i| i.identify_info.as_ref())
942        .map(|i| i.agent_version.clone())
943}