snarkos_node_router/
heartbeat.rs1use crate::{
17 ConnectedPeer,
18 Outbound,
19 Router,
20 messages::{DisconnectReason, Message, PeerRequest},
21};
22use snarkvm::prelude::Network;
23
24use snarkos_node_tcp::P2P;
25
26use colored::Colorize;
27use rand::{Rng, prelude::IteratorRandom, rngs::OsRng};
28
/// Returns the larger of `a` and `b` (either one when they are equal).
///
/// Declared as a `const fn` so it can be called from constant expressions,
/// such as the default values of associated constants.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
37
38#[async_trait]
39pub trait Heartbeat<N: Network>: Outbound<N> {
/// Heartbeat interval, in seconds.
/// NOTE(review): not read by the visible code — presumably consumed by whatever schedules `heartbeat`; confirm.
const HEARTBEAT_IN_SECS: u64 = 25;
/// Peer count at or below which `remove_oldest_connected_peer` stops rotating peers out.
const MINIMUM_NUMBER_OF_PEERS: usize = 3;
/// Half of the maximum peer count, but never less than the minimum (10 with the default values).
/// `handle_connected_peers` connects to more peers while below this count, and uses it as the
/// tighter cap on passes where it reduces peers.
const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
/// Regular cap on the number of connected peers applied by `handle_connected_peers`.
const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
/// Regular cap on the number of connected provers: a quarter of the maximum peer count
/// (5 with the default values, using integer division).
const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
/// Value passed to `remove_old_bans` by `handle_banned_ips`; presumably the ban duration in seconds.
const IP_BAN_TIME_IN_SECS: u64 = 300;
52
/// Runs one heartbeat pass over the router's peer set.
///
/// The steps run strictly in the order written: configuration checks and logging first,
/// then disconnections, then (re-)connections, and finally ban maintenance.
async fn heartbeat(&self) {
    // Panic early if the peer-count constants are inconsistent with each other.
    self.safety_check_minimum_number_of_peers();
    // Log the current set of connected peers.
    self.log_connected_peers();

    // Disconnect peers that have been silent for too long.
    self.remove_stale_connected_peers();
    // Rotate out the lowest-priority removable peer, if allowed.
    self.remove_oldest_connected_peer();
    // Trim surplus connections and top up when too few peers are connected.
    self.handle_connected_peers();
    // Keep a single bootstrap peer connected.
    self.handle_bootstrap_peers().await;
    // Try to (re-)connect to trusted peers that are currently unconnected.
    self.handle_trusted_peers().await;
    // Does nothing unless an implementor overrides it.
    self.handle_puzzle_request();
    // Clear out old IP bans.
    self.handle_banned_ips();
}
73
74 fn safety_check_minimum_number_of_peers(&self) {
77 assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
79 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
80 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
81 assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
82 assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
83 }
84
85 fn log_connected_peers(&self) {
87 let connected_peers = self.router().connected_peers();
89 let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
90 match connected_peers.len() {
91 0 => debug!("No connected peers"),
92 1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
93 num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
94 }
95 }
96
97 fn remove_stale_connected_peers(&self) {
99 for peer in self.router().get_connected_peers() {
101 let elapsed = peer.last_seen.elapsed();
103 if elapsed > Router::<N>::MAX_RADIO_SILENCE {
104 warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr);
105 self.router().disconnect(peer.listener_addr);
107 }
108 }
109 }
110
111 fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
120 let bootstrap = self.router().bootstrap_peers();
122 let is_block_synced = self.is_block_synced();
124
125 let mut peers = self.router().filter_connected_peers(|peer| {
130 !peer.trusted
131 && !bootstrap.contains(&peer.listener_addr)
132 && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) });
135 peers.sort_by_key(|peer| (peer.node_type.is_validator(), peer.last_seen));
136
137 peers
138 }
139
140 fn remove_oldest_connected_peer(&self) {
144 if !self.router().allow_external_peers() {
146 return;
147 }
148
149 if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
151 return;
152 }
153
154 if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
158 info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
159 let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
160 self.router().disconnect(oldest);
161 }
162 }
163
164 fn handle_connected_peers(&self) {
166 let rng = &mut OsRng;
168
169 let num_connected = self.router().number_of_connected_peers();
171 let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
173
174 let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0;
176 let (max_peers, max_provers) = if reduce_peers {
178 (Self::MEDIAN_NUMBER_OF_PEERS, 0)
179 } else {
180 (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS)
181 };
182
183 let num_surplus_peers = num_connected.saturating_sub(max_peers);
185 let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
187 let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
189 let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
191
192 if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
193 debug!(
194 "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
195 );
196
197 let bootstrap = self.router().bootstrap_peers();
199
200 let provers_to_disconnect = self
202 .router()
203 .filter_connected_peers(|peer| {
204 peer.node_type.is_prover() && !peer.trusted && !bootstrap.contains(&peer.listener_addr)
205 })
206 .into_iter()
207 .choose_multiple(rng, num_surplus_provers);
208
209 let peers_to_disconnect = self
211 .get_removable_peers()
212 .into_iter()
213 .filter(|peer| !peer.node_type.is_prover()) .take(num_surplus_clients_validators);
215
216 for peer in peers_to_disconnect.chain(provers_to_disconnect) {
218 if self.router().node_type().is_prover() && peer.node_type.is_validator() {
220 continue;
221 }
222
223 let peer_addr = peer.listener_addr;
224 info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
225 self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
226 self.router().disconnect(peer_addr);
228 }
229 }
230
231 let num_connected = self.router().number_of_connected_peers();
233 let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
235
236 if num_deficient > 0 {
237 let rng = &mut OsRng;
239
240 for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
242 self.router().connect(peer_ip);
243 }
244
245 if self.router().allow_external_peers() {
246 for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
248 self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
249 }
250 }
251 }
252 }
253
254 async fn handle_bootstrap_peers(&self) {
256 let mut connected_bootstrap = Vec::new();
258 let mut candidate_bootstrap = Vec::new();
259 let connected_peers = self.router().connected_peers();
260 for bootstrap_ip in self.router().bootstrap_peers() {
261 match connected_peers.contains(&bootstrap_ip) {
262 true => connected_bootstrap.push(bootstrap_ip),
263 false => candidate_bootstrap.push(bootstrap_ip),
264 }
265 }
266 if connected_bootstrap.is_empty() {
268 let rng = &mut OsRng;
270 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
272 match self.router().connect(peer_ip) {
273 Some(hdl) => {
274 let result = hdl.await;
275 if let Err(err) = result {
276 warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
277 }
278 }
279 None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
280 }
281 }
282 }
283 let num_surplus = connected_bootstrap.len().saturating_sub(1);
285 if num_surplus > 0 {
286 let rng = &mut OsRng;
288 for peer_ip in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
290 info!("Disconnecting from '{peer_ip}' (exceeded maximum bootstrap)");
291 self.router().send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
292 self.router().disconnect(peer_ip);
294 }
295 }
296 }
297
298 async fn handle_trusted_peers(&self) {
300 let handles: Vec<_> = self
302 .router()
303 .unconnected_trusted_peers()
304 .iter()
305 .filter_map(|listener_addr| {
306 debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`");
307 let hdl = self.router().connect(*listener_addr);
308 if hdl.is_none() {
309 warn!("Could not initiate connection to trusted peer at `{listener_addr}`");
310 }
311 hdl
312 })
313 .collect();
314
315 for result in futures::future::join_all(handles).await {
316 if let Err(err) = result {
317 warn!("Could not connect to trusted peer: {err}");
318 }
319 }
320 }
321
/// Heartbeat step for puzzle requests; this default implementation does nothing.
///
/// As a provided trait method it can be overridden by implementors — presumably by node
/// types that need to request puzzle data on each heartbeat (confirm against implementors).
fn handle_puzzle_request(&self) {
    // Intentionally empty.
}
326
327 fn handle_banned_ips(&self) {
329 self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
330 }
331}