snarkos_node_router/
heartbeat.rs1use crate::{
17 CandidatePeer,
18 ConnectedPeer,
19 NodeType,
20 Outbound,
21 PeerPoolHandling,
22 bootstrap_peers,
23 messages::{DisconnectReason, Message, PeerRequest},
24};
25
26use snarkos_node_tcp::{ConnectError, P2P};
27
28use snarkvm::prelude::Network;
29
30use colored::Colorize;
31use futures::future::join_all;
32use rand::{SeedableRng, prelude::IteratorRandom};
33use rand_chacha::ChaChaRng;
34use std::time::Duration;
35use tokio::task::JoinError;
36
/// Returns the larger of `a` and `b`.
///
/// A `const` counterpart to `std::cmp::max` for `usize`, so it can be used when
/// initializing associated constants.
pub const fn max(a: usize, b: usize) -> usize {
    if b > a { b } else { a }
}
45
46#[async_trait]
47pub trait Heartbeat<N: Network>: Outbound<N> {
48 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
50 const MINIMUM_NUMBER_OF_PEERS: usize = 3;
52 const MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS: Duration = Duration::from_secs(10);
54 const STARTUP_GRACE_PERIOD: Duration = Duration::from_secs(60);
56 const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
58 const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
60 const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
62 const IP_BAN_TIME_IN_SECS: u64 = 300;
64
65 async fn heartbeat(&self) {
67 self.safety_check_minimum_number_of_peers();
68 self.log_connected_peers();
69
70 self.remove_oldest_connected_peer();
72 self.handle_connected_peers().await;
74 self.handle_bootstrap_peers().await;
76 self.handle_trusted_peers().await;
78 self.handle_puzzle_request();
80 self.handle_banned_ips();
82 }
83
84 fn safety_check_minimum_number_of_peers(&self) {
87 assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
89 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
90 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
91 assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
92 assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
93 }
94
95 fn log_connected_peers(&self) {
97 let connected_peers = self.router().connected_peers();
99 let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
100 match connected_peers.len() {
101 0 => {
102 if self.router().tcp().uptime() > Self::STARTUP_GRACE_PERIOD {
104 warn!("No connected peers")
105 }
106 }
107 1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
108 num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
109 }
110 }
111
112 fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
120 let is_block_synced = self.is_block_synced();
122
123 let mut peers = self.router().filter_connected_peers(|peer| {
128 !peer.trusted
129 && peer.node_type != NodeType::BootstrapClient
130 && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) });
133 peers.sort_by_key(|peer| peer.last_seen);
134
135 peers
136 }
137
138 fn remove_oldest_connected_peer(&self) {
142 if self.router().trusted_peers_only() {
144 return;
145 }
146
147 if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
149 return;
150 }
151
152 if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
156 info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
157 let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
158 self.router().disconnect(oldest);
159 }
160 }
161
162 #[inline]
165 fn log_if_connect_error(result: Result<Result<(), ConnectError>, JoinError>, context: &str) {
166 match result {
167 Ok(Ok(())) => {}
169 Ok(Err(err @ ConnectError::AlreadyConnecting { .. }))
170 | Ok(Err(err @ ConnectError::AlreadyConnected { .. })) => {
171 debug!("{context}: {err}");
173 }
174 Ok(Err(err)) => warn!("{context}: {err}"),
176 Err(err) => error!("{context}: {err}"),
178 }
179 }
180
181 async fn handle_connected_peers(&self) {
183 let rng = &mut ChaChaRng::from_rng(&mut rand::rng());
185
186 let num_connected = self.router().number_of_connected_peers();
188 let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
190
191 let (max_peers, max_provers) = (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS);
193
194 let num_surplus_peers = num_connected.saturating_sub(max_peers);
196 let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
198 let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
200 let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
202
203 if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
204 debug!(
205 "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
206 );
207
208 let provers_to_disconnect = self
210 .router()
211 .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted)
212 .into_iter()
213 .sample(rng, num_surplus_provers);
214
215 let peers_to_disconnect = self
217 .get_removable_peers()
218 .into_iter()
219 .filter(|peer| !peer.node_type.is_prover()) .take(num_surplus_clients_validators);
221
222 for peer in peers_to_disconnect.chain(provers_to_disconnect) {
224 if self.router().node_type().is_prover() {
226 continue;
227 }
228
229 let peer_addr = peer.listener_addr;
230 info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
231 self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
232 self.router().disconnect(peer_addr);
234 }
235 }
236
237 let num_connected = self.router().number_of_connected_peers();
239 let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
241
242 if num_deficient > 0 {
243 let rng = &mut ChaChaRng::from_rng(&mut rand::rng());
245
246 let own_height = self.router().ledger.latest_block_height();
249 let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
250 .router()
251 .get_candidate_peers()
252 .into_iter()
253 .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
254 let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
256
257 let higher_peers = higher_peers.into_iter().sample(rng, num_higher_peers);
258 let other_peers = other_peers.into_iter().sample(rng, num_deficient.saturating_sub(num_higher_peers));
259
260 self.try_connect_to_peers(higher_peers.into_iter().chain(other_peers)).await;
262
263 if !self.router().trusted_peers_only() {
264 for peer_ip in self.router().connected_peers().into_iter().sample(rng, 3) {
266 self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
267 }
268 }
269 }
270 }
271
272 async fn handle_bootstrap_peers(&self) {
274 if self.router().trusted_peers_only() {
276 return;
277 }
278 let mut candidate_bootstrap = Vec::new();
280 let connected_bootstrap =
281 self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
282 for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) {
283 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
284 candidate_bootstrap.push(bootstrap_ip);
285 }
286 }
287 if connected_bootstrap.is_empty() {
289 let rng = &mut ChaChaRng::from_rng(&mut rand::rng());
291 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
293 match self.router().connect(peer_ip) {
294 Ok(hdl) => {
295 Self::log_if_connect_error(
296 hdl.await,
297 &format!("Could not connect to bootstrap peer at '{peer_ip:?}'"),
298 );
299 }
300 Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => {}
301 Err(err) => warn!("Could not initiate connection to bootstrap peer at '{peer_ip:?}' - {err}"),
302 }
303 }
304 }
305 let num_surplus = connected_bootstrap.len().saturating_sub(1);
307 if num_surplus > 0 {
308 let rng = &mut ChaChaRng::from_rng(&mut rand::rng());
310 for peer in connected_bootstrap.into_iter().sample(rng, num_surplus) {
312 info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
313 self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
314 self.router().disconnect(peer.listener_addr);
316 }
317 }
318 }
319
320 async fn try_connect_to_peers(&self, peers: impl Iterator<Item = CandidatePeer> + Send + 'static) {
324 let (peer_info, hdls): (Vec<_>, Vec<_>) = peers
325 .filter_map(|peer| {
326 let peer_type = if peer.trusted { "trusted peer" } else { "peer" };
327
328 if let Some(last_connection_attempt) = peer.last_connection_attempt
331 && last_connection_attempt.elapsed() < Self::MINIMUM_TIME_BETWEEN_CONNECTION_ATTEMPTS
332 {
333 return None;
334 }
335
336 let addr = peer.listener_addr;
338 let attempt_no = peer.total_connection_attempts + 1;
339
340 debug!("(Re-)connecting to {peer_type} '{addr}' (attempt #{attempt_no})");
342 match self.router().connect(addr) {
343 Ok(hdl) => Some(((addr, attempt_no, peer_type), hdl)),
344 Err(ConnectError::AlreadyConnected { .. }) | Err(ConnectError::AlreadyConnecting { .. }) => None,
345 Err(err) => {
346 warn!("Could not initiate connection to {peer_type} at '{addr}' - {err}");
347 None
348 }
349 }
350 })
351 .unzip();
352
353 for ((peer_addr, attempt_no, peer_type), result) in peer_info.into_iter().zip(join_all(hdls).await) {
355 Self::log_if_connect_error(
356 result,
357 &format!("Could not connect to {peer_type} at '{peer_addr}' (attempt #{attempt_no})"),
358 );
359 }
360 }
361
362 async fn handle_trusted_peers(&self) {
364 self.try_connect_to_peers(self.router().get_trusted_candidate_peers().into_iter()).await;
365 }
366
367 fn handle_puzzle_request(&self) {
369 }
371
372 fn handle_banned_ips(&self) {
374 self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
375 }
376}