snarkos_node_router/
heartbeat.rs1use crate::{
17 ConnectedPeer,
18 NodeType,
19 Outbound,
20 PeerPoolHandling,
21 Router,
22 bootstrap_peers,
23 messages::{DisconnectReason, Message, PeerRequest},
24};
25use snarkvm::prelude::Network;
26
27use snarkos_node_tcp::P2P;
28
29use colored::Colorize;
30use rand::{Rng, prelude::IteratorRandom, rngs::OsRng};
31
/// Returns the larger of `a` and `b`.
///
/// This is a `const fn`, so it can be evaluated inside constant expressions;
/// in this file it initialises `Heartbeat::MEDIAN_NUMBER_OF_PEERS`.
/// NOTE(review): presumably hand-rolled because `Ord::max` / `std::cmp::max`
/// cannot be called in const contexts on stable Rust — confirm before replacing.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
40
41#[async_trait]
42pub trait Heartbeat<N: Network>: Outbound<N> {
43 const HEARTBEAT_IN_SECS: u64 = 25; const MINIMUM_NUMBER_OF_PEERS: usize = 3;
47 const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
49 const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
51 const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
53 const IP_BAN_TIME_IN_SECS: u64 = 300;
55
56 async fn heartbeat(&self) {
58 self.safety_check_minimum_number_of_peers();
59 self.log_connected_peers();
60
61 self.remove_stale_connected_peers();
63 self.remove_oldest_connected_peer();
65 self.handle_connected_peers();
67 self.handle_bootstrap_peers().await;
69 self.handle_trusted_peers().await;
71 self.handle_puzzle_request();
73 self.handle_banned_ips();
75 }
76
77 fn safety_check_minimum_number_of_peers(&self) {
80 assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
82 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
83 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
84 assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
85 assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
86 }
87
88 fn log_connected_peers(&self) {
90 let connected_peers = self.router().connected_peers();
92 let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
93 match connected_peers.len() {
94 0 => debug!("No connected peers"),
95 1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
96 num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
97 }
98 }
99
100 fn remove_stale_connected_peers(&self) {
102 for peer in self.router().get_connected_peers() {
104 let elapsed = peer.last_seen.elapsed();
106 if elapsed > Router::<N>::MAX_RADIO_SILENCE {
107 warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr);
108 self.router().disconnect(peer.listener_addr);
110 }
111 }
112 }
113
114 fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
122 let is_block_synced = self.is_block_synced();
124
125 let mut peers = self.router().filter_connected_peers(|peer| {
130 !peer.trusted
131 && peer.node_type != NodeType::BootstrapClient
132 && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) });
135 peers.sort_by_key(|peer| peer.last_seen);
136
137 peers
138 }
139
140 fn remove_oldest_connected_peer(&self) {
144 if !self.router().allow_external_peers() {
146 return;
147 }
148
149 if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
151 return;
152 }
153
154 if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
158 info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
159 let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
160 self.router().disconnect(oldest);
161 }
162 }
163
164 fn handle_connected_peers(&self) {
166 let rng = &mut OsRng;
168
169 let num_connected = self.router().number_of_connected_peers();
171 let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
173
174 let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0;
176 let (max_peers, max_provers) = if reduce_peers {
178 (Self::MEDIAN_NUMBER_OF_PEERS, 0)
179 } else {
180 (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS)
181 };
182
183 let num_surplus_peers = num_connected.saturating_sub(max_peers);
185 let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
187 let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
189 let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
191
192 if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
193 debug!(
194 "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
195 );
196
197 let provers_to_disconnect = self
199 .router()
200 .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted)
201 .into_iter()
202 .choose_multiple(rng, num_surplus_provers);
203
204 let peers_to_disconnect = self
206 .get_removable_peers()
207 .into_iter()
208 .filter(|peer| !peer.node_type.is_prover()) .take(num_surplus_clients_validators);
210
211 for peer in peers_to_disconnect.chain(provers_to_disconnect) {
213 if self.router().node_type().is_prover() {
215 continue;
216 }
217
218 let peer_addr = peer.listener_addr;
219 info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
220 self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
221 self.router().disconnect(peer_addr);
223 }
224 }
225
226 let num_connected = self.router().number_of_connected_peers();
228 let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
230
231 if num_deficient > 0 {
232 let rng = &mut OsRng;
234
235 let own_height = self.router().ledger.latest_block_height();
238 let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
239 .router()
240 .get_candidate_peers()
241 .into_iter()
242 .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
243 let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
245 for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) {
246 self.router().connect(peer.listener_addr);
247 }
248 for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) {
249 self.router().connect(peer.listener_addr);
250 }
251
252 if self.router().allow_external_peers() {
253 for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
255 self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
256 }
257 }
258 }
259 }
260
261 async fn handle_bootstrap_peers(&self) {
263 let mut candidate_bootstrap = Vec::new();
265 let connected_bootstrap =
266 self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
267 for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) {
268 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
269 candidate_bootstrap.push(bootstrap_ip);
270 }
271 }
272 if connected_bootstrap.is_empty() {
274 let rng = &mut OsRng;
276 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
278 match self.router().connect(peer_ip) {
279 Some(hdl) => {
280 let result = hdl.await;
281 if let Err(err) = result {
282 warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
283 }
284 }
285 None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
286 }
287 }
288 }
289 let num_surplus = connected_bootstrap.len().saturating_sub(1);
291 if num_surplus > 0 {
292 let rng = &mut OsRng;
294 for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
296 info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
297 self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
298 self.router().disconnect(peer.listener_addr);
300 }
301 }
302 }
303
304 async fn handle_trusted_peers(&self) {
306 let handles: Vec<_> = self
308 .router()
309 .unconnected_trusted_peers()
310 .iter()
311 .filter_map(|listener_addr| {
312 debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`");
313 let hdl = self.router().connect(*listener_addr);
314 if hdl.is_none() {
315 warn!("Could not initiate connection to trusted peer at `{listener_addr}`");
316 }
317 hdl
318 })
319 .collect();
320
321 for result in futures::future::join_all(handles).await {
322 if let Err(err) = result {
323 warn!("Could not connect to trusted peer: {err}");
324 }
325 }
326 }
327
328 fn handle_puzzle_request(&self) {
330 }
332
333 fn handle_banned_ips(&self) {
335 self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
336 }
337}