1use crate::{
2 codecs::{
3 RequestResponseProtocols,
4 postcard::PostcardCodec,
5 request_response::RequestResponseMessageHandler,
6 },
7 config::Config,
8 connection_limits,
9 connection_limits::ConnectionLimits,
10 discovery,
11 gossipsub::config::build_gossipsub_behaviour,
12 heartbeat,
13 limited_behaviour::LimitedBehaviour,
14 peer_report,
15 request_response::messages::{
16 RequestMessage,
17 V2ResponseMessage,
18 },
19};
20use fuel_core_types::fuel_types::BlockHeight;
21use libp2p::{
22 Multiaddr,
23 PeerId,
24 allow_block_list,
25 gossipsub::{
26 self,
27 MessageAcceptance,
28 MessageId,
29 PublishError,
30 TopicHash,
31 },
32 identify,
33 request_response::{
34 self,
35 OutboundRequestId,
36 ProtocolSupport,
37 ResponseChannel,
38 },
39 swarm::NetworkBehaviour,
40};
41use std::collections::HashSet;
42
43const MAX_PENDING_INCOMING_CONNECTIONS: u32 = 100;
44const MAX_PENDING_OUTGOING_CONNECTIONS: u32 = 100;
45
46#[derive(NetworkBehaviour)]
48pub struct FuelBehaviour {
49 blocked_peer: allow_block_list::Behaviour<allow_block_list::BlockedPeers>,
52
53 connection_limits: connection_limits::Behaviour,
55
56 discovery: discovery::Behaviour,
58
59 gossipsub: LimitedBehaviour<gossipsub::Behaviour>,
61
62 heartbeat: LimitedBehaviour<heartbeat::Behaviour>,
64
65 identify: LimitedBehaviour<identify::Behaviour>,
67
68 peer_report: LimitedBehaviour<peer_report::Behaviour>,
70
71 request_response: LimitedBehaviour<
73 request_response::Behaviour<RequestResponseMessageHandler<PostcardCodec>>,
74 >,
75}
76
77impl FuelBehaviour {
78 pub(crate) fn new(
79 p2p_config: &Config,
80 request_response_codec: RequestResponseMessageHandler<PostcardCodec>,
81 ) -> anyhow::Result<Self> {
82 let local_public_key = p2p_config.keypair.public();
83 let local_peer_id = PeerId::from_public_key(&local_public_key);
84
85 let discovery_config = {
86 let mut discovery_config =
87 discovery::Config::new(local_peer_id, p2p_config.network_name.clone());
88
89 discovery_config
90 .enable_mdns(p2p_config.enable_mdns)
91 .max_peers_connected(p2p_config.max_discovery_peers_connected as usize)
92 .with_bootstrap_nodes(p2p_config.bootstrap_nodes.clone())
93 .with_reserved_nodes(p2p_config.reserved_nodes.clone())
94 .enable_reserved_nodes_only_mode(p2p_config.reserved_nodes_only_mode);
95
96 if let Some(random_walk) = p2p_config.random_walk {
97 discovery_config.with_random_walk(random_walk);
98 }
99
100 if let Some(duration) = p2p_config.connection_idle_timeout {
101 discovery_config.set_connection_idle_timeout(duration);
102 }
103
104 discovery_config
105 };
106
107 let gossipsub = build_gossipsub_behaviour(p2p_config);
108
109 let peer_report = peer_report::Behaviour::new(&p2p_config.reserved_nodes);
110 let reserved_peer_ids: HashSet<_> = peer_report
111 .reserved_nodes_multiaddr()
112 .keys()
113 .cloned()
114 .collect();
115
116 let identify = {
117 let identify_config = identify::Config::new(
118 "/fuel/1.0".to_string(),
119 p2p_config.keypair.public(),
120 );
121 if let Some(interval) = p2p_config.identify_interval {
122 identify::Behaviour::new(identify_config.with_interval(interval))
123 } else {
124 identify::Behaviour::new(identify_config)
125 }
126 };
127
128 let heartbeat = heartbeat::Behaviour::new(
129 p2p_config.heartbeat_config.clone(),
130 BlockHeight::default(),
131 );
132
133 let connection_limits = connection_limits::Behaviour::new(
134 ConnectionLimits::default()
135 .with_max_pending_incoming(Some(MAX_PENDING_INCOMING_CONNECTIONS))
136 .with_max_pending_outgoing(Some(
137 p2p_config
138 .max_outgoing_connections
139 .min(MAX_PENDING_OUTGOING_CONNECTIONS),
140 ))
141 .with_max_established_outgoing(Some(p2p_config.max_outgoing_connections))
142 .with_max_established_per_peer(p2p_config.max_connections_per_peer)
143 .with_max_established(Some(p2p_config.max_discovery_peers_connected)),
144 reserved_peer_ids,
145 );
146 let connections = connection_limits.connections();
147
148 let req_res_protocol = request_response_codec
149 .get_req_res_protocols()
150 .map(|protocol| (protocol, ProtocolSupport::Full));
151
152 let req_res_config = request_response::Config::default()
153 .with_request_timeout(p2p_config.set_request_timeout)
154 .with_max_concurrent_streams(p2p_config.max_concurrent_streams);
155
156 let request_response = request_response::Behaviour::with_codec(
157 request_response_codec.clone(),
158 req_res_protocol,
159 req_res_config,
160 );
161
162 let discovery = discovery_config.finish()?;
163 let limit = p2p_config.max_functional_peers_connected as usize;
164
165 let gossipsub = LimitedBehaviour::new(limit, connections.clone(), gossipsub);
166 let peer_report = LimitedBehaviour::new(limit, connections.clone(), peer_report);
167 let identify = LimitedBehaviour::new(limit, connections.clone(), identify);
168 let heartbeat = LimitedBehaviour::new(limit, connections.clone(), heartbeat);
169 let request_response =
170 LimitedBehaviour::new(limit, connections.clone(), request_response);
171
172 Ok(Self {
173 discovery,
174 gossipsub,
175 peer_report,
176 request_response,
177 blocked_peer: Default::default(),
178 identify,
179 heartbeat,
180 connection_limits,
181 })
182 }
183
184 pub fn add_addresses_to_discovery(
185 &mut self,
186 peer_id: &PeerId,
187 addresses: Vec<Multiaddr>,
188 ) {
189 for address in addresses {
190 self.discovery.add_address(peer_id, address.clone());
191 }
192 }
193
194 pub fn publish_message(
195 &mut self,
196 topic_hash: TopicHash,
197 encoded_data: Vec<u8>,
198 ) -> Result<MessageId, PublishError> {
199 self.gossipsub.publish(topic_hash, encoded_data)
200 }
201
202 pub fn send_request_msg(
203 &mut self,
204 message_request: RequestMessage,
205 peer_id: &PeerId,
206 ) -> OutboundRequestId {
207 self.request_response.send_request(peer_id, message_request)
208 }
209
210 pub fn send_response_msg(
211 &mut self,
212 channel: ResponseChannel<V2ResponseMessage>,
213 message: V2ResponseMessage,
214 ) -> Result<(), V2ResponseMessage> {
215 self.request_response.send_response(channel, message)
216 }
217
218 pub fn report_message_validation_result(
219 &mut self,
220 msg_id: &MessageId,
221 propagation_source: &PeerId,
222 acceptance: MessageAcceptance,
223 ) -> Option<f64> {
224 let should_check_score = matches!(acceptance, MessageAcceptance::Reject);
225
226 match self.gossipsub.report_message_validation_result(
227 msg_id,
228 propagation_source,
229 acceptance,
230 ) {
231 Ok(true) => {
232 tracing::debug!(target: "fuel-p2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, propagation_source);
233 if should_check_score {
234 return self.gossipsub.peer_score(propagation_source);
235 }
236 }
237 Ok(false) => {
238 tracing::warn!(target: "fuel-p2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id);
239 }
240 Err(e) => {
241 tracing::error!(target: "fuel-p2p", "Failed to report Message with MessageId: {} with Error: {:?}", msg_id, e);
242 }
243 }
244
245 None
246 }
247
248 pub fn update_block_height(&mut self, block_height: BlockHeight) {
249 self.heartbeat.update_block_height(block_height);
250 }
251
252 #[cfg(test)]
253 pub fn get_peer_score(&self, peer_id: &PeerId) -> Option<f64> {
254 self.gossipsub.peer_score(peer_id)
255 }
256
257 pub fn block_peer(&mut self, peer_id: PeerId) {
258 self.blocked_peer.block_peer(peer_id)
259 }
260}