snarkos_node_router/heartbeat.rs
1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17 Outbound,
18 Peer,
19 Router,
20 messages::{DisconnectReason, Message, PeerRequest},
21};
22use snarkvm::prelude::Network;
23
24use snarkos_node_tcp::P2P;
25
26use colored::Colorize;
27use rand::{Rng, prelude::IteratorRandom, rngs::OsRng};
28
/// Returns the larger of `a` and `b` (yielding `b` when both are equal).
///
/// `std::cmp::max` is not usable in constant expressions, so this `const fn` stands in for it
/// when computing associated constants.
/// See Rust issue 92391: https://github.com/rust-lang/rust/issues/92391.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
37
38#[async_trait]
39pub trait Heartbeat<N: Network>: Outbound<N> {
40 /// The duration in seconds to sleep in between heartbeat executions.
41 const HEARTBEAT_IN_SECS: u64 = 25; // 25 seconds
42 /// The minimum number of peers required to maintain connections with.
43 const MINIMUM_NUMBER_OF_PEERS: usize = 3;
44 /// The median number of peers to maintain connections with.
45 const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
46 /// The maximum number of peers permitted to maintain connections with.
47 const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
48 /// The maximum number of provers to maintain connections with.
49 const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
50 /// The amount of time an IP address is prohibited from connecting.
51 const IP_BAN_TIME_IN_SECS: u64 = 300;
52
53 /// Handles the heartbeat request.
54 async fn heartbeat(&self) {
55 self.safety_check_minimum_number_of_peers();
56 self.log_connected_peers();
57
58 // Remove any stale connected peers.
59 self.remove_stale_connected_peers();
60 // Remove the oldest connected peer.
61 self.remove_oldest_connected_peer();
62 // Keep the number of connected peers within the allowed range.
63 self.handle_connected_peers();
64 // Keep the bootstrap peers within the allowed range.
65 self.handle_bootstrap_peers().await;
66 // Keep the trusted peers connected.
67 self.handle_trusted_peers().await;
68 // Keep the puzzle request up to date.
69 self.handle_puzzle_request();
70 // Unban any addresses whose ban time has expired.
71 self.handle_banned_ips();
72 }
73
74 /// TODO (howardwu): Consider checking minimum number of validators, to exclude clients and provers.
75 /// This function performs safety checks on the setting for the minimum number of peers.
76 fn safety_check_minimum_number_of_peers(&self) {
77 // Perform basic sanity checks on the configuration for the number of peers.
78 assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
79 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
80 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
81 assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
82 assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
83 }
84
85 /// This function logs the connected peers.
86 fn log_connected_peers(&self) {
87 // Log the connected peers.
88 let connected_peers = self.router().connected_peers();
89 let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
90 match connected_peers.len() {
91 0 => debug!("No connected peers"),
92 1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
93 num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
94 }
95 }
96
97 /// This function removes any connected peers that have not communicated within the predefined time.
98 fn remove_stale_connected_peers(&self) {
99 // Check if any connected peer is stale.
100 for peer in self.router().get_connected_peers() {
101 // Disconnect if the peer has not communicated back within the predefined time.
102 let elapsed = peer.last_seen().elapsed().as_secs();
103 if elapsed > Router::<N>::RADIO_SILENCE_IN_SECS {
104 warn!("Peer {} has not communicated in {elapsed} seconds", peer.ip());
105 // Disconnect from this peer.
106 self.router().disconnect(peer.ip());
107 }
108 }
109 }
110
111 /// Returns a sorted vector of network addresses of all removable connected peers
112 /// where the first entry has the lowest priority and the last one the highest.
113 ///
114 /// Rules:
115 /// - Trusted peers and bootstrap nodes are not removable.
116 /// - Peers that we are currently syncing with are not removable.
117 /// - Validators are considered higher priority than provers or clients.
118 /// - Connections that have not been seen in a while are considered lower priority.
119 fn get_removable_peers(&self) -> Vec<Peer<N>> {
120 // The trusted peers (specified at runtime).
121 let trusted = self.router().trusted_peers();
122 // The hardcoded bootstrap nodes.
123 let bootstrap = self.router().bootstrap_peers();
124 // Are we synced already? (cache this here, so it does not need to be recomputed)
125 let is_block_synced = self.is_block_synced();
126
127 // Sort by priority, where lowest priority will be at the beginning
128 // of the vector.
129 // Note, that this gives equal priority to clients and provers, which
130 // we might want to change in the future.
131 let mut peers = self.router().get_connected_peers();
132 peers.sort_by_key(|peer| (peer.is_validator(), peer.last_seen()));
133
134 // Determine which of the peers can be removed.
135 peers
136 .into_iter()
137 .filter(|peer| {
138 !trusted.contains(&peer.ip()) // Always keep trusted nodes.
139 && !bootstrap.contains(&peer.ip()) // Always keep bootstrap nodes.
140 && !self.router().cache.contains_inbound_block_request(&peer.ip()) // This peer is currently syncing from us.
141 && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.ip()) == 0) // We are currently syncing from this peer.
142 })
143 .collect()
144 }
145
146 /// This function removes the peer that we have not heard from the longest,
147 /// to keep the connections fresh.
148 /// It only triggers if the router is above the minimum number of connected peers.
149 fn remove_oldest_connected_peer(&self) {
150 // Skip if the router is at or below the minimum number of connected peers.
151 if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
152 return;
153 }
154
155 // Skip if the node is not requesting peers.
156 if !self.router().allow_external_peers() {
157 return;
158 }
159
160 // Disconnect from the oldest connected peer, which is the first entry in the list
161 // of removable peers.
162 // Do nothing, if the list is empty.
163 if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.ip()) {
164 info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
165 let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
166 self.router().disconnect(oldest);
167 }
168 }
169
170 /// This function keeps the number of connected peers within the allowed range.
171 fn handle_connected_peers(&self) {
172 // Initialize an RNG.
173 let rng = &mut OsRng;
174
175 // Obtain the number of connected peers.
176 let num_connected = self.router().number_of_connected_peers();
177 // Obtain the number of connected provers.
178 let num_connected_provers = self.router().number_of_connected_provers();
179
180 // Consider rotating more external peers every ~10 heartbeats.
181 let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0;
182 // Determine the maximum number of peers and provers to keep.
183 let (max_peers, max_provers) = if reduce_peers {
184 (Self::MEDIAN_NUMBER_OF_PEERS, 0)
185 } else {
186 (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS)
187 };
188
189 // Compute the number of surplus peers.
190 let num_surplus_peers = num_connected.saturating_sub(max_peers);
191 // Compute the number of surplus provers.
192 let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
193 // Compute the number of provers remaining connected.
194 let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
195 // Compute the number of surplus clients and validators.
196 let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
197
198 if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
199 debug!(
200 "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
201 );
202
203 // Retrieve the trusted peers.
204 let trusted = self.router().trusted_peers();
205 // Retrieve the bootstrap peers.
206 let bootstrap = self.router().bootstrap_peers();
207
208 // Determine the provers to disconnect from.
209 let provers_to_disconnect = self
210 .router()
211 .connected_provers()
212 .into_iter()
213 .filter(|peer_ip| !trusted.contains(peer_ip) && !bootstrap.contains(peer_ip))
214 .choose_multiple(rng, num_surplus_provers);
215
216 // Determine the clients and validators to disconnect from.
217 let peers_to_disconnect = self
218 .get_removable_peers()
219 .into_iter()
220 .filter(|peer| !peer.is_prover()) // remove provers as those are handled separately
221 .map(|p| p.ip())
222 .take(num_surplus_clients_validators);
223
224 // Proceed to send disconnect requests to these peers.
225 for peer_ip in peers_to_disconnect.chain(provers_to_disconnect) {
226 // TODO (howardwu): Remove this after specializing this function.
227 if self.router().node_type().is_prover() {
228 if let Some(peer) = self.router().get_connected_peer(&peer_ip) {
229 if peer.node_type().is_validator() {
230 continue;
231 }
232 }
233 }
234
235 info!("Disconnecting from '{peer_ip}' (exceeded maximum connections)");
236 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
237 // Disconnect from this peer.
238 self.router().disconnect(peer_ip);
239 }
240 }
241
242 // Obtain the number of connected peers.
243 let num_connected = self.router().number_of_connected_peers();
244 // Compute the number of deficit peers.
245 let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
246
247 if num_deficient > 0 {
248 // Initialize an RNG.
249 let rng = &mut OsRng;
250
251 // Attempt to connect to more peers.
252 for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
253 self.router().connect(peer_ip);
254 }
255
256 if self.router().allow_external_peers() {
257 // Request more peers from the connected peers.
258 for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
259 self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
260 }
261 }
262 }
263 }
264
265 /// This function keeps the number of bootstrap peers within the allowed range.
266 async fn handle_bootstrap_peers(&self) {
267 // Split the bootstrap peers into connected and candidate lists.
268 let mut connected_bootstrap = Vec::new();
269 let mut candidate_bootstrap = Vec::new();
270 for bootstrap_ip in self.router().bootstrap_peers() {
271 match self.router().is_connected(&bootstrap_ip) {
272 true => connected_bootstrap.push(bootstrap_ip),
273 false => candidate_bootstrap.push(bootstrap_ip),
274 }
275 }
276 // If there are not enough connected bootstrap peers, connect to more.
277 if connected_bootstrap.is_empty() {
278 // Initialize an RNG.
279 let rng = &mut OsRng;
280 // Attempt to connect to a bootstrap peer.
281 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
282 match self.router().connect(peer_ip) {
283 Some(hdl) => {
284 let result = hdl.await;
285 if let Err(err) = result {
286 warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
287 }
288 }
289 None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
290 }
291 }
292 }
293 // Determine if the node is connected to more bootstrap peers than allowed.
294 let num_surplus = connected_bootstrap.len().saturating_sub(1);
295 if num_surplus > 0 {
296 // Initialize an RNG.
297 let rng = &mut OsRng;
298 // Proceed to send disconnect requests to these bootstrap peers.
299 for peer_ip in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
300 info!("Disconnecting from '{peer_ip}' (exceeded maximum bootstrap)");
301 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
302 // Disconnect from this peer.
303 self.router().disconnect(peer_ip);
304 }
305 }
306 }
307
308 /// This function attempts to connect to any disconnected trusted peers.
309 async fn handle_trusted_peers(&self) {
310 // Ensure that the trusted nodes are connected.
311 let handles: Vec<_> = self
312 .router()
313 .trusted_peers()
314 .iter()
315 .filter_map(|peer_ip| {
316 // If the peer is not connected, attempt to connect to it.
317 if self.router().is_connected(peer_ip) {
318 None
319 } else {
320 debug!("Attempting to (re-)connect to trusted peer `{peer_ip}`");
321 let hdl = self.router().connect(*peer_ip);
322 if hdl.is_none() {
323 warn!("Could not initiate connection to trusted peer at `{peer_ip}`");
324 }
325 hdl
326 }
327 })
328 .collect();
329
330 for result in futures::future::join_all(handles).await {
331 if let Err(err) = result {
332 warn!("Could not connect to trusted peer: {err}");
333 }
334 }
335 }
336
337 /// This function updates the puzzle if network has updated.
338 fn handle_puzzle_request(&self) {
339 // No-op
340 }
341
342 // Remove addresses whose ban time has expired.
343 fn handle_banned_ips(&self) {
344 self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
345 }
346}