snarkos_node_router/
heartbeat.rs1use crate::{
17 CandidatePeer,
18 ConnectedPeer,
19 NodeType,
20 Outbound,
21 PeerPoolHandling,
22 Router,
23 bootstrap_peers,
24 messages::{DisconnectReason, Message, PeerRequest},
25};
26
27use snarkos_node_tcp::{ConnectError, P2P};
28
29use snarkvm::prelude::Network;
30
31use colored::Colorize;
32use futures::future::join_all;
33use rand::{prelude::IteratorRandom, rngs::OsRng};
34use std::time::Duration;
35use tokio::task::JoinError;
36
/// Returns the larger of `a` and `b` (`b` when the two are equal).
///
/// Declared as a `const fn` so that it can be called from constant
/// expressions, such as the associated constants of `Heartbeat`.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
45
46#[async_trait]
47pub trait Heartbeat<N: Network>: Outbound<N> {
    /// Heartbeat period (25 seconds).
    /// NOTE(review): presumably the delay between successive `heartbeat` runs; the code that
    /// schedules the heartbeat is not part of this trait — confirm against the caller.
    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
    /// Peer-count floor: `remove_oldest_connected_peer` does nothing while the number of
    /// connected peers is at or below this value.
    const MINIMUM_NUMBER_OF_PEERS: usize = 3;
    /// `try_connect_to_peers` skips a candidate whose last connection attempt is more recent
    /// than this (10 seconds).
    const MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS: Duration = Duration::from_secs(10);
    /// `log_connected_peers` only warns about having no peers once the TCP uptime exceeds
    /// this (60 seconds).
    const STARTUP_GRACE_PERIOD: Duration = Duration::from_secs(60);
    /// Target peer count: `handle_connected_peers` tries to connect to more candidates while
    /// the connected count is below this value. Half of the maximum, but never below the minimum.
    const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
    /// Peer-count ceiling: `handle_connected_peers` treats connections above this as surplus.
    const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
    /// Ceiling on connected provers: a quarter of the peer maximum (integer division).
    const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
    /// Value handed to `remove_old_bans` by `handle_banned_ips` (300).
    /// NOTE(review): by its name a ban lifetime in seconds — confirm against `remove_old_bans`.
    const IP_BAN_TIME_IN_SECS: u64 = 300;
64
    /// Runs one heartbeat pass: validates the peer-count constants, logs the connected
    /// peers, and then performs each peer-maintenance step in the order below.
    async fn heartbeat(&self) {
        // Validate the peer-count constants and report the current connections.
        self.safety_check_minimum_number_of_peers();
        self.log_connected_peers();

        // Disconnect peers that have exceeded the allowed radio silence.
        self.remove_stale_connected_peers();
        // Rotate out the least recently seen removable peer.
        self.remove_oldest_connected_peer();
        // Trim surplus connections, and connect to candidates when below the median.
        self.handle_connected_peers().await;
        // Keep at most one bootstrap peer connected.
        self.handle_bootstrap_peers().await;
        // Try to connect to the trusted candidate peers.
        self.handle_trusted_peers().await;
        // Empty by default in this trait.
        self.handle_puzzle_request();
        // Ask the TCP layer to remove old bans.
        self.handle_banned_ips();
    }
85
    /// Asserts that the peer-count constants of this implementation are mutually consistent.
    ///
    /// # Panics
    /// Panics if any of the relations asserted below does not hold.
    fn safety_check_minimum_number_of_peers(&self) {
        // At least one peer is always required.
        assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
        // The three thresholds must be ordered: minimum <= median <= maximum.
        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
        assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
        assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
        // The prover quota cannot exceed the overall peer quota.
        assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
    }
96
97 fn log_connected_peers(&self) {
99 let connected_peers = self.router().connected_peers();
101 let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
102 match connected_peers.len() {
103 0 => {
104 if self.router().tcp().uptime() > Self::STARTUP_GRACE_PERIOD {
106 warn!("No connected peers")
107 }
108 }
109 1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
110 num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
111 }
112 }
113
114 fn remove_stale_connected_peers(&self) {
116 for peer in self.router().get_connected_peers() {
118 let elapsed = peer.last_seen.elapsed();
120 if elapsed > Router::<N>::MAX_RADIO_SILENCE {
121 warn!("Peer '{}' has not communicated in {elapsed:?}", peer.listener_addr);
122 self.router().disconnect(peer.listener_addr);
124 }
125 }
126 }
127
128 fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
136 let is_block_synced = self.is_block_synced();
138
139 let mut peers = self.router().filter_connected_peers(|peer| {
144 !peer.trusted
145 && peer.node_type != NodeType::BootstrapClient
146 && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) });
149 peers.sort_by_key(|peer| peer.last_seen);
150
151 peers
152 }
153
154 fn remove_oldest_connected_peer(&self) {
158 if self.router().trusted_peers_only() {
160 return;
161 }
162
163 if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
165 return;
166 }
167
168 if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
172 info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
173 let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
174 self.router().disconnect(oldest);
175 }
176 }
177
178 #[inline]
181 fn log_if_connect_error(result: Result<Result<(), ConnectError>, JoinError>, context: &str) {
182 match result {
183 Ok(Ok(())) => {}
185 Ok(Err(err @ ConnectError::AlreadyConnecting { .. }))
186 | Ok(Err(err @ ConnectError::AlreadyConnected { .. })) => {
187 debug!("{context}: {err}");
189 }
190 Ok(Err(err)) => warn!("{context}: {err}"),
192 Err(err) => error!("{context}: {err}"),
194 }
195 }
196
197 async fn handle_connected_peers(&self) {
199 let rng = &mut OsRng;
201
202 let num_connected = self.router().number_of_connected_peers();
204 let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
206
207 let (max_peers, max_provers) = (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS);
209
210 let num_surplus_peers = num_connected.saturating_sub(max_peers);
212 let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
214 let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
216 let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
218
219 if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
220 debug!(
221 "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
222 );
223
224 let provers_to_disconnect = self
226 .router()
227 .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted)
228 .into_iter()
229 .choose_multiple(rng, num_surplus_provers);
230
231 let peers_to_disconnect = self
233 .get_removable_peers()
234 .into_iter()
235 .filter(|peer| !peer.node_type.is_prover()) .take(num_surplus_clients_validators);
237
238 for peer in peers_to_disconnect.chain(provers_to_disconnect) {
240 if self.router().node_type().is_prover() {
242 continue;
243 }
244
245 let peer_addr = peer.listener_addr;
246 info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
247 self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
248 self.router().disconnect(peer_addr);
250 }
251 }
252
253 let num_connected = self.router().number_of_connected_peers();
255 let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
257
258 if num_deficient > 0 {
259 let rng = &mut OsRng;
261
262 let own_height = self.router().ledger.latest_block_height();
265 let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
266 .router()
267 .get_candidate_peers()
268 .into_iter()
269 .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
270 let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
272
273 let higher_peers = higher_peers.into_iter().choose_multiple(rng, num_higher_peers);
274 let other_peers =
275 other_peers.into_iter().choose_multiple(rng, num_deficient.saturating_sub(num_higher_peers));
276
277 self.try_connect_to_peers(higher_peers.into_iter().chain(other_peers)).await;
279
280 if !self.router().trusted_peers_only() {
281 for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
283 self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
284 }
285 }
286 }
287 }
288
289 async fn handle_bootstrap_peers(&self) {
291 if self.router().trusted_peers_only() {
293 return;
294 }
295 let mut candidate_bootstrap = Vec::new();
297 let connected_bootstrap =
298 self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
299 for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) {
300 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
301 candidate_bootstrap.push(bootstrap_ip);
302 }
303 }
304 if connected_bootstrap.is_empty() {
306 let rng = &mut OsRng;
308 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
310 match self.router().connect(peer_ip) {
311 Ok(hdl) => {
312 Self::log_if_connect_error(
313 hdl.await,
314 &format!("Could not connect to bootstrap peer at '{peer_ip:?}'"),
315 );
316 }
317 Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => {}
318 Err(err) => warn!("Could not initiate connection to bootstrap peer at '{peer_ip:?}' - {err}"),
319 }
320 }
321 }
322 let num_surplus = connected_bootstrap.len().saturating_sub(1);
324 if num_surplus > 0 {
325 let rng = &mut OsRng;
327 for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
329 info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
330 self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
331 self.router().disconnect(peer.listener_addr);
333 }
334 }
335 }
336
337 async fn try_connect_to_peers(&self, peers: impl Iterator<Item = CandidatePeer> + Send + 'static) {
341 let (peer_info, hdls): (Vec<_>, Vec<_>) = peers
342 .filter_map(|peer| {
343 let peer_type = if peer.trusted { "trusted peer" } else { "peer" };
344
345 if let Some(last_connection_attempt) = peer.last_connection_attempt
348 && last_connection_attempt.elapsed() < Self::MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS
349 {
350 return None;
351 }
352
353 let addr = peer.listener_addr;
355 let attempt_no = peer.total_connection_attempts + 1;
356
357 debug!("(Re-)connecting to {peer_type} '{addr}' (attempt #{attempt_no})");
359 match self.router().connect(addr) {
360 Ok(hdl) => Some(((addr, attempt_no, peer_type), hdl)),
361 Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => None,
362 Err(err) => {
363 warn!("Could not initiate connection to {peer_type} at '{addr}' - {err}");
364 None
365 }
366 }
367 })
368 .unzip();
369
370 for ((peer_addr, attempt_no, peer_type), result) in peer_info.into_iter().zip(join_all(hdls).await) {
372 Self::log_if_connect_error(
373 result,
374 &format!("Could not connect to {peer_type} at '{peer_addr}' (attempt #{attempt_no})"),
375 );
376 }
377 }
378
379 async fn handle_trusted_peers(&self) {
381 self.try_connect_to_peers(self.router().get_trusted_candidate_peers().into_iter()).await;
382 }
383
    /// Puzzle-request step of the heartbeat. The default body is empty, so this does
    /// nothing unless an implementor overrides it.
    fn handle_puzzle_request(&self) {
    }
388
389 fn handle_banned_ips(&self) {
391 self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
392 }
393}