forest/libp2p/
behaviour.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{
5    num::NonZeroUsize,
6    sync::{Arc, LazyLock},
7};
8
9use super::{
10    PeerManager,
11    discovery::{DerivedDiscoveryBehaviourEvent, DiscoveryEvent, PeerInfo},
12};
13use crate::libp2p_bitswap::BitswapBehaviour;
14use crate::utils::{encoding::blake2b_256, version::FOREST_VERSION_STRING};
15use crate::{
16    libp2p::{
17        chain_exchange::ChainExchangeBehaviour,
18        config::Libp2pConfig,
19        discovery::{DiscoveryBehaviour, DiscoveryConfig},
20        gossip_params::{build_peer_score_params, build_peer_score_threshold},
21        hello::HelloBehaviour,
22    },
23    networks::GenesisNetworkName,
24};
25use ahash::{HashMap, HashSet};
26use libp2p::{
27    Multiaddr, allow_block_list, connection_limits,
28    gossipsub::{
29        self, IdentTopic as Topic, MessageAuthenticity, MessageId, PublishError, SubscriptionError,
30        ValidationMode,
31    },
32    identity::{Keypair, PeerId},
33    kad::QueryId,
34    metrics::{Metrics, Recorder},
35    ping, request_response,
36    swarm::NetworkBehaviour,
37};
38use tracing::info;
39
/// Composite libp2p behaviour for the Forest node. It bundles every
/// sub-protocol behaviour needed for a Filecoin node into a single
/// [`NetworkBehaviour`].
///
/// The field order is deliberate (see the note on the first fields below),
/// so do not reorder fields without checking the linked issue.
#[derive(NetworkBehaviour)]
pub(in crate::libp2p) struct ForestBehaviour {
    // Behaviours that manage connections should come first, to get rid of some panics in debug build.
    // See <https://github.com/libp2p/rust-libp2p/issues/4773#issuecomment-2042676966>
    // Limits on pending and established connections.
    connection_limits: connection_limits::Behaviour,
    // Block list of peers.
    pub(super) blocked_peers: allow_block_list::Behaviour<allow_block_list::BlockedPeers>,
    // Peer discovery; also the source of the peer set and peer addresses
    // exposed by this type.
    pub(super) discovery: DiscoveryBehaviour,
    // Ping protocol, using its default configuration.
    ping: ping::Behaviour,
    // Gossipsub pub/sub, used for publishing to and subscribing to topics.
    gossipsub: gossipsub::Behaviour,
    // Hello request/response protocol.
    pub(super) hello: HelloBehaviour,
    // Chain exchange request/response protocol.
    pub(super) chain_exchange: ChainExchangeBehaviour,
    // Bitswap protocol.
    pub(super) bitswap: BitswapBehaviour,
}
55
56impl Recorder<ForestBehaviourEvent> for Metrics {
57    fn record(&self, event: &ForestBehaviourEvent) {
58        match event {
59            ForestBehaviourEvent::Gossipsub(e) => self.record(e),
60            ForestBehaviourEvent::Ping(ping_event) => self.record(ping_event),
61            ForestBehaviourEvent::Discovery(DiscoveryEvent::Discovery(e)) => match e.as_ref() {
62                DerivedDiscoveryBehaviourEvent::Identify(e) => self.record(e),
63                DerivedDiscoveryBehaviourEvent::Kademlia(e) => self.record(e),
64                _ => {}
65            },
66            _ => {}
67        }
68    }
69}
70
71impl ForestBehaviour {
72    pub async fn new(
73        local_key: &Keypair,
74        config: &Libp2pConfig,
75        network_name: &GenesisNetworkName,
76        peer_manager: Arc<PeerManager>,
77    ) -> anyhow::Result<Self> {
78        const MAX_ESTABLISHED_PER_PEER: u32 = 4;
79        static MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER: LazyLock<usize> = LazyLock::new(
80            || {
81                std::env::var("FOREST_MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER")
82                .ok()
83                .map(|it|
84                    it.parse::<NonZeroUsize>()
85                        .expect("Failed to parse the `FOREST_MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER` environment variable value, a positive integer is expected.")
86                        .get())
87                .unwrap_or(10)
88            },
89        );
90
91        let max_concurrent_request_response_streams = (config.target_peer_count as usize)
92            .saturating_mul(*MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER);
93
94        let mut gs_config_builder = gossipsub::ConfigBuilder::default();
95        gs_config_builder.max_transmit_size(1 << 20);
96        gs_config_builder.validation_mode(ValidationMode::Strict);
97        gs_config_builder.message_id_fn(|msg: &gossipsub::Message| {
98            let s = blake2b_256(&msg.data);
99            MessageId::from(s)
100        });
101
102        let gossipsub_config = gs_config_builder.build().unwrap();
103        let mut gossipsub = gossipsub::Behaviour::new(
104            MessageAuthenticity::Signed(local_key.clone()),
105            gossipsub_config,
106        )
107        .unwrap();
108
109        gossipsub
110            .with_peer_score(
111                build_peer_score_params(network_name),
112                build_peer_score_threshold(),
113            )
114            .unwrap();
115
116        let bitswap = BitswapBehaviour::new(
117            &[
118                "/chain/ipfs/bitswap/1.2.0",
119                "/chain/ipfs/bitswap/1.1.0",
120                "/chain/ipfs/bitswap/1.0.0",
121                "/chain/ipfs/bitswap",
122            ],
123            request_response::Config::default()
124                .with_max_concurrent_streams(max_concurrent_request_response_streams),
125        );
126        crate::libp2p_bitswap::register_metrics(&mut crate::metrics::collector_registry());
127
128        let discovery = DiscoveryConfig::new(local_key.public(), network_name)
129            .with_mdns(config.mdns)
130            .with_kademlia(config.kademlia)
131            .with_user_defined(config.bootstrap_peers.clone())
132            .await?
133            .target_peer_count(config.target_peer_count as u64)
134            .finish()?;
135
136        let connection_limits = connection_limits::Behaviour::new(
137            connection_limits::ConnectionLimits::default()
138                .with_max_pending_incoming(Some(
139                    config
140                        .target_peer_count
141                        .saturating_mul(MAX_ESTABLISHED_PER_PEER),
142                ))
143                .with_max_pending_outgoing(Some(
144                    config
145                        .target_peer_count
146                        .saturating_mul(MAX_ESTABLISHED_PER_PEER),
147                ))
148                .with_max_established_incoming(Some(
149                    config
150                        .target_peer_count
151                        .saturating_mul(MAX_ESTABLISHED_PER_PEER),
152                ))
153                .with_max_established_outgoing(Some(
154                    config
155                        .target_peer_count
156                        .saturating_mul(MAX_ESTABLISHED_PER_PEER),
157                ))
158                .with_max_established_per_peer(Some(MAX_ESTABLISHED_PER_PEER)),
159        );
160
161        info!("libp2p Forest version: {}", FOREST_VERSION_STRING.as_str());
162        Ok(ForestBehaviour {
163            gossipsub,
164            discovery,
165            ping: Default::default(),
166            connection_limits,
167            blocked_peers: Default::default(),
168            bitswap,
169            hello: HelloBehaviour::new(
170                request_response::Config::default()
171                    .with_max_concurrent_streams(max_concurrent_request_response_streams),
172                peer_manager,
173            ),
174            chain_exchange: ChainExchangeBehaviour::new(
175                request_response::Config::default()
176                    .with_max_concurrent_streams(max_concurrent_request_response_streams),
177            ),
178        })
179    }
180
181    /// Bootstrap Kademlia network
182    pub fn bootstrap(&mut self) -> Result<QueryId, String> {
183        self.discovery.bootstrap()
184    }
185
186    /// Publish data over the gossip network.
187    pub fn publish(
188        &mut self,
189        topic: Topic,
190        data: impl Into<Vec<u8>>,
191    ) -> Result<MessageId, PublishError> {
192        self.gossipsub.publish(topic, data)
193    }
194
195    /// Subscribe to a gossip topic.
196    pub fn subscribe(&mut self, topic: &Topic) -> Result<bool, SubscriptionError> {
197        self.gossipsub.subscribe(topic)
198    }
199
200    /// Returns a set of peer ids
201    pub fn peers(&self) -> &HashSet<PeerId> {
202        self.discovery.peers()
203    }
204
205    /// Returns a map of peer ids and their multi-addresses
206    pub fn peer_addresses(&self) -> HashMap<PeerId, HashSet<Multiaddr>> {
207        self.discovery.peer_addresses()
208    }
209
210    pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
211        self.discovery.peer_info(peer_id)
212    }
213}