snarkos_node_router/
heartbeat.rs1use crate::{
17 ConnectedPeer,
18 NodeType,
19 Outbound,
20 PeerPoolHandling,
21 Router,
22 bootstrap_peers,
23 messages::{DisconnectReason, Message, PeerRequest},
24};
25use snarkvm::prelude::Network;
26
27use snarkos_node_tcp::P2P;
28
29use colored::Colorize;
30use rand::{prelude::IteratorRandom, rngs::OsRng};
31
/// Returns the larger of `a` and `b`.
///
/// Declared as a `const fn` so it can be evaluated in constant contexts, such as the
/// initialiser of an associated constant.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
40
41#[async_trait]
42pub trait Heartbeat<N: Network>: Outbound<N> {
43 const HEARTBEAT_IN_SECS: u64 = 25; const MINIMUM_NUMBER_OF_PEERS: usize = 3;
47 const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
49 const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
51 const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
53 const IP_BAN_TIME_IN_SECS: u64 = 300;
55
/// Runs one heartbeat pass over the peer pool.
///
/// The steps execute strictly in the order written. The two steps that may open new
/// connections (`handle_bootstrap_peers`, `handle_trusted_peers`) are awaited before the
/// pass continues.
async fn heartbeat(&self) {
    // Panic early if the peer-count constants are inconsistent, then log the current peers.
    self.safety_check_minimum_number_of_peers();
    self.log_connected_peers();

    // Disconnect peers that have been silent for too long.
    self.remove_stale_connected_peers();
    // Rotate out the least recently seen removable peer.
    self.remove_oldest_connected_peer();
    // Shed surplus peers or dial candidates to move towards the target peer count.
    self.handle_connected_peers();
    // Keep exactly one bootstrap peer connected.
    self.handle_bootstrap_peers().await;
    // (Re-)connect to trusted peers that are currently not connected.
    self.handle_trusted_peers().await;
    // Hook for implementors; the default implementation does nothing.
    self.handle_puzzle_request();
    // Expire old IP bans.
    self.handle_banned_ips();
}
76
77 fn safety_check_minimum_number_of_peers(&self) {
80 assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
82 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
83 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
84 assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
85 assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
86 }
87
88 fn log_connected_peers(&self) {
90 let connected_peers = self.router().connected_peers();
92 let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
93 match connected_peers.len() {
94 0 => warn!("No connected peers"),
95 1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
96 num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
97 }
98 }
99
100 fn remove_stale_connected_peers(&self) {
102 for peer in self.router().get_connected_peers() {
104 let elapsed = peer.last_seen.elapsed();
106 if elapsed > Router::<N>::MAX_RADIO_SILENCE {
107 warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr);
108 self.router().disconnect(peer.listener_addr);
110 }
111 }
112 }
113
114 fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
122 let is_block_synced = self.is_block_synced();
124
125 let mut peers = self.router().filter_connected_peers(|peer| {
130 !peer.trusted
131 && peer.node_type != NodeType::BootstrapClient
132 && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) });
135 peers.sort_by_key(|peer| peer.last_seen);
136
137 peers
138 }
139
140 fn remove_oldest_connected_peer(&self) {
144 if self.router().trusted_peers_only() {
146 return;
147 }
148
149 if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
151 return;
152 }
153
154 if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
158 info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
159 let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
160 self.router().disconnect(oldest);
161 }
162 }
163
164 fn handle_connected_peers(&self) {
166 let rng = &mut OsRng;
168
169 let num_connected = self.router().number_of_connected_peers();
171 let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();
173
174 let (max_peers, max_provers) = (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS);
176
177 let num_surplus_peers = num_connected.saturating_sub(max_peers);
179 let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
181 let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
183 let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
185
186 if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
187 debug!(
188 "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
189 );
190
191 let provers_to_disconnect = self
193 .router()
194 .filter_connected_peers(|peer| peer.node_type.is_prover() && !peer.trusted)
195 .into_iter()
196 .choose_multiple(rng, num_surplus_provers);
197
198 let peers_to_disconnect = self
200 .get_removable_peers()
201 .into_iter()
202 .filter(|peer| !peer.node_type.is_prover()) .take(num_surplus_clients_validators);
204
205 for peer in peers_to_disconnect.chain(provers_to_disconnect) {
207 if self.router().node_type().is_prover() {
209 continue;
210 }
211
212 let peer_addr = peer.listener_addr;
213 info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
214 self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
215 self.router().disconnect(peer_addr);
217 }
218 }
219
220 let num_connected = self.router().number_of_connected_peers();
222 let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
224
225 if num_deficient > 0 {
226 let rng = &mut OsRng;
228
229 let own_height = self.router().ledger.latest_block_height();
232 let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
233 .router()
234 .get_candidate_peers()
235 .into_iter()
236 .partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
237 let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
239 for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) {
240 self.router().connect(peer.listener_addr);
241 }
242 for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) {
243 self.router().connect(peer.listener_addr);
244 }
245
246 if !self.router().trusted_peers_only() {
247 for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
249 self.router().send(peer_ip, Message::PeerRequest(PeerRequest));
250 }
251 }
252 }
253 }
254
255 async fn handle_bootstrap_peers(&self) {
257 if self.router().trusted_peers_only() {
259 return;
260 }
261 let mut candidate_bootstrap = Vec::new();
263 let connected_bootstrap =
264 self.router().filter_connected_peers(|peer| peer.node_type == NodeType::BootstrapClient);
265 for bootstrap_ip in bootstrap_peers::<N>(self.router().is_dev()) {
266 if !connected_bootstrap.iter().any(|peer| peer.listener_addr == bootstrap_ip) {
267 candidate_bootstrap.push(bootstrap_ip);
268 }
269 }
270 if connected_bootstrap.is_empty() {
272 let rng = &mut OsRng;
274 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
276 match self.router().connect(peer_ip) {
277 Some(hdl) => {
278 let result = hdl.await;
279 if let Err(err) = result {
280 warn!("Failed to connect to bootstrap peer at {peer_ip}: {err}");
281 }
282 }
283 None => warn!("Could not initiate connect to bootstrap peer at {peer_ip}"),
284 }
285 }
286 }
287 let num_surplus = connected_bootstrap.len().saturating_sub(1);
289 if num_surplus > 0 {
290 let rng = &mut OsRng;
292 for peer in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
294 info!("Disconnecting from '{}' (exceeded maximum bootstrap)", peer.listener_addr);
295 self.router().send(peer.listener_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
296 self.router().disconnect(peer.listener_addr);
298 }
299 }
300 }
301
302 async fn handle_trusted_peers(&self) {
304 let handles: Vec<_> = self
306 .router()
307 .unconnected_trusted_peers()
308 .iter()
309 .filter_map(|listener_addr| {
310 debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`");
311 let hdl = self.router().connect(*listener_addr);
312 if hdl.is_none() {
313 warn!("Could not initiate connection to trusted peer at `{listener_addr}`");
314 }
315 hdl
316 })
317 .collect();
318
319 for result in futures::future::join_all(handles).await {
320 if let Err(err) = result {
321 warn!("Could not connect to trusted peer: {err}");
322 }
323 }
324 }
325
/// Hook invoked once per heartbeat, after the trusted peers have been handled.
///
/// The default implementation does nothing.
// NOTE(review): presumably overridden by node types that need to request the puzzle — confirm against implementors.
fn handle_puzzle_request(&self) {
}
330
331 fn handle_banned_ips(&self) {
333 self.router().tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
334 }
335}