fuel_core_p2p/
behavior.rs

1use crate::{
2    codecs::{
3        RequestResponseProtocols,
4        postcard::PostcardCodec,
5        request_response::RequestResponseMessageHandler,
6    },
7    config::Config,
8    connection_limits,
9    connection_limits::ConnectionLimits,
10    discovery,
11    gossipsub::config::build_gossipsub_behaviour,
12    heartbeat,
13    limited_behaviour::LimitedBehaviour,
14    peer_report,
15    request_response::messages::{
16        RequestMessage,
17        V2ResponseMessage,
18    },
19};
20use fuel_core_types::fuel_types::BlockHeight;
21use libp2p::{
22    Multiaddr,
23    PeerId,
24    allow_block_list,
25    gossipsub::{
26        self,
27        MessageAcceptance,
28        MessageId,
29        PublishError,
30        TopicHash,
31    },
32    identify,
33    request_response::{
34        self,
35        OutboundRequestId,
36        ProtocolSupport,
37        ResponseChannel,
38    },
39    swarm::NetworkBehaviour,
40};
41use std::collections::HashSet;
42
43const MAX_PENDING_INCOMING_CONNECTIONS: u32 = 100;
44const MAX_PENDING_OUTGOING_CONNECTIONS: u32 = 100;
45
46/// Handles all p2p protocols needed for Fuel.
47#[derive(NetworkBehaviour)]
48pub struct FuelBehaviour {
49    /// **WARNING**: The order of the behaviours is important and fragile, at least for the tests.
50    /// The Behaviour to manage connections to blocked peers.
51    blocked_peer: allow_block_list::Behaviour<allow_block_list::BlockedPeers>,
52
53    /// The Behaviour to manage connection limits.
54    connection_limits: connection_limits::Behaviour,
55
56    /// Node discovery
57    discovery: discovery::Behaviour,
58
59    /// Message propagation for p2p
60    gossipsub: LimitedBehaviour<gossipsub::Behaviour>,
61
62    /// Handles regular heartbeats from peers
63    heartbeat: LimitedBehaviour<heartbeat::Behaviour>,
64
65    /// The Behaviour to identify peers.
66    identify: LimitedBehaviour<identify::Behaviour>,
67
68    /// Identifies and periodically requests `BlockHeight` from connected nodes
69    peer_report: LimitedBehaviour<peer_report::Behaviour>,
70
71    /// RequestResponse protocol
72    request_response: LimitedBehaviour<
73        request_response::Behaviour<RequestResponseMessageHandler<PostcardCodec>>,
74    >,
75}
76
77impl FuelBehaviour {
78    pub(crate) fn new(
79        p2p_config: &Config,
80        request_response_codec: RequestResponseMessageHandler<PostcardCodec>,
81    ) -> anyhow::Result<Self> {
82        let local_public_key = p2p_config.keypair.public();
83        let local_peer_id = PeerId::from_public_key(&local_public_key);
84
85        let discovery_config = {
86            let mut discovery_config =
87                discovery::Config::new(local_peer_id, p2p_config.network_name.clone());
88
89            discovery_config
90                .enable_mdns(p2p_config.enable_mdns)
91                .max_peers_connected(p2p_config.max_discovery_peers_connected as usize)
92                .with_bootstrap_nodes(p2p_config.bootstrap_nodes.clone())
93                .with_reserved_nodes(p2p_config.reserved_nodes.clone())
94                .enable_reserved_nodes_only_mode(p2p_config.reserved_nodes_only_mode);
95
96            if let Some(random_walk) = p2p_config.random_walk {
97                discovery_config.with_random_walk(random_walk);
98            }
99
100            if let Some(duration) = p2p_config.connection_idle_timeout {
101                discovery_config.set_connection_idle_timeout(duration);
102            }
103
104            discovery_config
105        };
106
107        let gossipsub = build_gossipsub_behaviour(p2p_config);
108
109        let peer_report = peer_report::Behaviour::new(&p2p_config.reserved_nodes);
110        let reserved_peer_ids: HashSet<_> = peer_report
111            .reserved_nodes_multiaddr()
112            .keys()
113            .cloned()
114            .collect();
115
116        let identify = {
117            let identify_config = identify::Config::new(
118                "/fuel/1.0".to_string(),
119                p2p_config.keypair.public(),
120            );
121            if let Some(interval) = p2p_config.identify_interval {
122                identify::Behaviour::new(identify_config.with_interval(interval))
123            } else {
124                identify::Behaviour::new(identify_config)
125            }
126        };
127
128        let heartbeat = heartbeat::Behaviour::new(
129            p2p_config.heartbeat_config.clone(),
130            BlockHeight::default(),
131        );
132
133        let connection_limits = connection_limits::Behaviour::new(
134            ConnectionLimits::default()
135                .with_max_pending_incoming(Some(MAX_PENDING_INCOMING_CONNECTIONS))
136                .with_max_pending_outgoing(Some(
137                    p2p_config
138                        .max_outgoing_connections
139                        .min(MAX_PENDING_OUTGOING_CONNECTIONS),
140                ))
141                .with_max_established_outgoing(Some(p2p_config.max_outgoing_connections))
142                .with_max_established_per_peer(p2p_config.max_connections_per_peer)
143                .with_max_established(Some(p2p_config.max_discovery_peers_connected)),
144            reserved_peer_ids,
145        );
146        let connections = connection_limits.connections();
147
148        let req_res_protocol = request_response_codec
149            .get_req_res_protocols()
150            .map(|protocol| (protocol, ProtocolSupport::Full));
151
152        let req_res_config = request_response::Config::default()
153            .with_request_timeout(p2p_config.set_request_timeout)
154            .with_max_concurrent_streams(p2p_config.max_concurrent_streams);
155
156        let request_response = request_response::Behaviour::with_codec(
157            request_response_codec.clone(),
158            req_res_protocol,
159            req_res_config,
160        );
161
162        let discovery = discovery_config.finish()?;
163        let limit = p2p_config.max_functional_peers_connected as usize;
164
165        let gossipsub = LimitedBehaviour::new(limit, connections.clone(), gossipsub);
166        let peer_report = LimitedBehaviour::new(limit, connections.clone(), peer_report);
167        let identify = LimitedBehaviour::new(limit, connections.clone(), identify);
168        let heartbeat = LimitedBehaviour::new(limit, connections.clone(), heartbeat);
169        let request_response =
170            LimitedBehaviour::new(limit, connections.clone(), request_response);
171
172        Ok(Self {
173            discovery,
174            gossipsub,
175            peer_report,
176            request_response,
177            blocked_peer: Default::default(),
178            identify,
179            heartbeat,
180            connection_limits,
181        })
182    }
183
184    pub fn add_addresses_to_discovery(
185        &mut self,
186        peer_id: &PeerId,
187        addresses: Vec<Multiaddr>,
188    ) {
189        for address in addresses {
190            self.discovery.add_address(peer_id, address.clone());
191        }
192    }
193
194    pub fn publish_message(
195        &mut self,
196        topic_hash: TopicHash,
197        encoded_data: Vec<u8>,
198    ) -> Result<MessageId, PublishError> {
199        self.gossipsub.publish(topic_hash, encoded_data)
200    }
201
202    pub fn send_request_msg(
203        &mut self,
204        message_request: RequestMessage,
205        peer_id: &PeerId,
206    ) -> OutboundRequestId {
207        self.request_response.send_request(peer_id, message_request)
208    }
209
210    pub fn send_response_msg(
211        &mut self,
212        channel: ResponseChannel<V2ResponseMessage>,
213        message: V2ResponseMessage,
214    ) -> Result<(), V2ResponseMessage> {
215        self.request_response.send_response(channel, message)
216    }
217
218    pub fn report_message_validation_result(
219        &mut self,
220        msg_id: &MessageId,
221        propagation_source: &PeerId,
222        acceptance: MessageAcceptance,
223    ) -> Option<f64> {
224        let should_check_score = matches!(acceptance, MessageAcceptance::Reject);
225
226        match self.gossipsub.report_message_validation_result(
227            msg_id,
228            propagation_source,
229            acceptance,
230        ) {
231            Ok(true) => {
232                tracing::debug!(target: "fuel-p2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, propagation_source);
233                if should_check_score {
234                    return self.gossipsub.peer_score(propagation_source);
235                }
236            }
237            Ok(false) => {
238                tracing::warn!(target: "fuel-p2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id);
239            }
240            Err(e) => {
241                tracing::error!(target: "fuel-p2p", "Failed to report Message with MessageId: {} with Error: {:?}", msg_id, e);
242            }
243        }
244
245        None
246    }
247
248    pub fn update_block_height(&mut self, block_height: BlockHeight) {
249        self.heartbeat.update_block_height(block_height);
250    }
251
252    #[cfg(test)]
253    pub fn get_peer_score(&self, peer_id: &PeerId) -> Option<f64> {
254        self.gossipsub.peer_score(peer_id)
255    }
256
257    pub fn block_peer(&mut self, peer_id: PeerId) {
258        self.blocked_peer.block_peer(peer_id)
259    }
260}