1use std::{
5 num::NonZeroUsize,
6 sync::{Arc, LazyLock},
7};
8
9use super::{
10 PeerManager,
11 discovery::{DerivedDiscoveryBehaviourEvent, DiscoveryEvent, PeerInfo},
12};
13use crate::libp2p_bitswap::BitswapBehaviour;
14use crate::utils::{encoding::blake2b_256, version::FOREST_VERSION_STRING};
15use crate::{
16 libp2p::{
17 chain_exchange::ChainExchangeBehaviour,
18 config::Libp2pConfig,
19 discovery::{DiscoveryBehaviour, DiscoveryConfig},
20 gossip_params::{build_peer_score_params, build_peer_score_threshold},
21 hello::HelloBehaviour,
22 },
23 networks::GenesisNetworkName,
24};
25use ahash::{HashMap, HashSet};
26use libp2p::{
27 Multiaddr, allow_block_list, connection_limits,
28 gossipsub::{
29 self, IdentTopic as Topic, MessageAuthenticity, MessageId, PublishError, SubscriptionError,
30 ValidationMode,
31 },
32 identity::{Keypair, PeerId},
33 kad::QueryId,
34 metrics::{Metrics, Recorder},
35 ping, request_response,
36 swarm::NetworkBehaviour,
37};
38use tracing::info;
39
/// Composite libp2p behaviour of the node: it bundles connection management,
/// peer discovery, ping, gossipsub, and the `hello`, `chain_exchange` and
/// `bitswap` protocols into a single [`NetworkBehaviour`].
///
/// NOTE(review): the field order is input to `#[derive(NetworkBehaviour)]`;
/// the connection-managing behaviours (`connection_limits`, `blocked_peers`)
/// are deliberately listed first here — confirm before reordering fields.
#[derive(NetworkBehaviour)]
pub(in crate::libp2p) struct ForestBehaviour {
    /// Connection limits; configured in [`ForestBehaviour::new`].
    connection_limits: connection_limits::Behaviour,
    /// Block list of peers.
    pub(super) blocked_peers: allow_block_list::Behaviour<allow_block_list::BlockedPeers>,
    /// Peer discovery; also the source of the peer set, addresses and info
    /// exposed by the accessor methods of this type.
    pub(super) discovery: DiscoveryBehaviour,
    /// Ping protocol, created with its default configuration.
    ping: ping::Behaviour,
    /// Gossipsub, used by `publish` and `subscribe`.
    gossipsub: gossipsub::Behaviour,
    /// `hello` request/response protocol.
    pub(super) hello: HelloBehaviour,
    /// `chain_exchange` request/response protocol.
    pub(super) chain_exchange: ChainExchangeBehaviour,
    /// Bitswap protocol.
    pub(super) bitswap: BitswapBehaviour,
}
55
56impl Recorder<ForestBehaviourEvent> for Metrics {
57 fn record(&self, event: &ForestBehaviourEvent) {
58 match event {
59 ForestBehaviourEvent::Gossipsub(e) => self.record(e),
60 ForestBehaviourEvent::Ping(ping_event) => self.record(ping_event),
61 ForestBehaviourEvent::Discovery(DiscoveryEvent::Discovery(e)) => match e.as_ref() {
62 DerivedDiscoveryBehaviourEvent::Identify(e) => self.record(e),
63 DerivedDiscoveryBehaviourEvent::Kademlia(e) => self.record(e),
64 _ => {}
65 },
66 _ => {}
67 }
68 }
69}
70
71impl ForestBehaviour {
72 pub async fn new(
73 local_key: &Keypair,
74 config: &Libp2pConfig,
75 network_name: &GenesisNetworkName,
76 peer_manager: Arc<PeerManager>,
77 ) -> anyhow::Result<Self> {
78 const MAX_ESTABLISHED_PER_PEER: u32 = 4;
79 static MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER: LazyLock<usize> = LazyLock::new(
80 || {
81 std::env::var("FOREST_MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER")
82 .ok()
83 .map(|it|
84 it.parse::<NonZeroUsize>()
85 .expect("Failed to parse the `FOREST_MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER` environment variable value, a positive integer is expected.")
86 .get())
87 .unwrap_or(10)
88 },
89 );
90
91 let max_concurrent_request_response_streams = (config.target_peer_count as usize)
92 .saturating_mul(*MAX_CONCURRENT_REQUEST_RESPONSE_STREAMS_PER_PEER);
93
94 let mut gs_config_builder = gossipsub::ConfigBuilder::default();
95 gs_config_builder.max_transmit_size(1 << 20);
96 gs_config_builder.validation_mode(ValidationMode::Strict);
97 gs_config_builder.message_id_fn(|msg: &gossipsub::Message| {
98 let s = blake2b_256(&msg.data);
99 MessageId::from(s)
100 });
101
102 let gossipsub_config = gs_config_builder.build().unwrap();
103 let mut gossipsub = gossipsub::Behaviour::new(
104 MessageAuthenticity::Signed(local_key.clone()),
105 gossipsub_config,
106 )
107 .unwrap();
108
109 gossipsub
110 .with_peer_score(
111 build_peer_score_params(network_name),
112 build_peer_score_threshold(),
113 )
114 .unwrap();
115
116 let bitswap = BitswapBehaviour::new(
117 &[
118 "/chain/ipfs/bitswap/1.2.0",
119 "/chain/ipfs/bitswap/1.1.0",
120 "/chain/ipfs/bitswap/1.0.0",
121 "/chain/ipfs/bitswap",
122 ],
123 request_response::Config::default()
124 .with_max_concurrent_streams(max_concurrent_request_response_streams),
125 );
126 crate::libp2p_bitswap::register_metrics(&mut crate::metrics::collector_registry());
127
128 let discovery = DiscoveryConfig::new(local_key.public(), network_name)
129 .with_mdns(config.mdns)
130 .with_kademlia(config.kademlia)
131 .with_user_defined(config.bootstrap_peers.clone())
132 .await?
133 .target_peer_count(config.target_peer_count as u64)
134 .finish()?;
135
136 let connection_limits = connection_limits::Behaviour::new(
137 connection_limits::ConnectionLimits::default()
138 .with_max_pending_incoming(Some(
139 config
140 .target_peer_count
141 .saturating_mul(MAX_ESTABLISHED_PER_PEER),
142 ))
143 .with_max_pending_outgoing(Some(
144 config
145 .target_peer_count
146 .saturating_mul(MAX_ESTABLISHED_PER_PEER),
147 ))
148 .with_max_established_incoming(Some(
149 config
150 .target_peer_count
151 .saturating_mul(MAX_ESTABLISHED_PER_PEER),
152 ))
153 .with_max_established_outgoing(Some(
154 config
155 .target_peer_count
156 .saturating_mul(MAX_ESTABLISHED_PER_PEER),
157 ))
158 .with_max_established_per_peer(Some(MAX_ESTABLISHED_PER_PEER)),
159 );
160
161 info!("libp2p Forest version: {}", FOREST_VERSION_STRING.as_str());
162 Ok(ForestBehaviour {
163 gossipsub,
164 discovery,
165 ping: Default::default(),
166 connection_limits,
167 blocked_peers: Default::default(),
168 bitswap,
169 hello: HelloBehaviour::new(
170 request_response::Config::default()
171 .with_max_concurrent_streams(max_concurrent_request_response_streams),
172 peer_manager,
173 ),
174 chain_exchange: ChainExchangeBehaviour::new(
175 request_response::Config::default()
176 .with_max_concurrent_streams(max_concurrent_request_response_streams),
177 ),
178 })
179 }
180
181 pub fn bootstrap(&mut self) -> Result<QueryId, String> {
183 self.discovery.bootstrap()
184 }
185
186 pub fn publish(
188 &mut self,
189 topic: Topic,
190 data: impl Into<Vec<u8>>,
191 ) -> Result<MessageId, PublishError> {
192 self.gossipsub.publish(topic, data)
193 }
194
195 pub fn subscribe(&mut self, topic: &Topic) -> Result<bool, SubscriptionError> {
197 self.gossipsub.subscribe(topic)
198 }
199
200 pub fn peers(&self) -> &HashSet<PeerId> {
202 self.discovery.peers()
203 }
204
205 pub fn peer_addresses(&self) -> HashMap<PeerId, HashSet<Multiaddr>> {
207 self.discovery.peer_addresses()
208 }
209
210 pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
211 self.discovery.peer_info(peer_id)
212 }
213}