snarkos_node_router/
heartbeat.rs1use crate::{
17 Outbound,
18 Router,
19 messages::{DisconnectReason, Message, PeerRequest},
20};
21use snarkvm::prelude::Network;
22
23use colored::Colorize;
24use rand::{Rng, prelude::IteratorRandom, rngs::OsRng};
25
/// Returns the larger of `a` and `b` (returns `b` when they are equal).
///
/// This is a `const fn` so it can be used to initialise associated constants;
/// `std::cmp::max` cannot be called in a const context.
pub const fn max(a: usize, b: usize) -> usize {
    if a > b { a } else { b }
}
34
35pub trait Heartbeat<N: Network>: Outbound<N> {
36 const HEARTBEAT_IN_SECS: u64 = 25; const MINIMUM_NUMBER_OF_PEERS: usize = 3;
40 const MEDIAN_NUMBER_OF_PEERS: usize = max(Self::MAXIMUM_NUMBER_OF_PEERS / 2, Self::MINIMUM_NUMBER_OF_PEERS);
42 const MAXIMUM_NUMBER_OF_PEERS: usize = 21;
44 const MAXIMUM_NUMBER_OF_PROVERS: usize = Self::MAXIMUM_NUMBER_OF_PEERS / 4;
46 const IP_BAN_TIME_IN_SECS: u64 = 300;
48
49 fn heartbeat(&self) {
51 self.safety_check_minimum_number_of_peers();
52 self.log_connected_peers();
53
54 self.remove_stale_connected_peers();
56 self.remove_oldest_connected_peer();
58 self.handle_connected_peers();
60 self.handle_bootstrap_peers();
62 self.handle_trusted_peers();
64 self.handle_puzzle_request();
66 self.handle_banned_ips();
68 }
69
70 fn safety_check_minimum_number_of_peers(&self) {
73 assert!(Self::MINIMUM_NUMBER_OF_PEERS >= 1, "The minimum number of peers must be at least 1.");
75 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
76 assert!(Self::MINIMUM_NUMBER_OF_PEERS <= Self::MEDIAN_NUMBER_OF_PEERS);
77 assert!(Self::MEDIAN_NUMBER_OF_PEERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
78 assert!(Self::MAXIMUM_NUMBER_OF_PROVERS <= Self::MAXIMUM_NUMBER_OF_PEERS);
79 }
80
81 fn log_connected_peers(&self) {
83 let connected_peers = self.router().connected_peers();
85 let connected_peers_fmt = format!("{connected_peers:?}").dimmed();
86 match connected_peers.len() {
87 0 => debug!("No connected peers"),
88 1 => debug!("Connected to 1 peer: {connected_peers_fmt}"),
89 num_connected => debug!("Connected to {num_connected} peers {connected_peers_fmt}"),
90 }
91 }
92
93 fn remove_stale_connected_peers(&self) {
95 for peer in self.router().get_connected_peers() {
97 let elapsed = peer.last_seen().elapsed().as_secs();
99 if elapsed > Router::<N>::RADIO_SILENCE_IN_SECS {
100 warn!("Peer {} has not communicated in {elapsed} seconds", peer.ip());
101 self.router().disconnect(peer.ip());
103 }
104 }
105 }
106
107 fn remove_oldest_connected_peer(&self) {
110 if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
112 return;
113 }
114
115 if !self.router().allow_external_peers() {
117 return;
118 }
119
120 let trusted = self.router().trusted_peers();
122 let bootstrap = self.router().bootstrap_peers();
124
125 let oldest_peer = self
127 .router()
128 .get_connected_peers()
129 .iter()
130 .filter(|peer| !trusted.contains(&peer.ip()) && !bootstrap.contains(&peer.ip()))
131 .filter(|peer| !self.router().cache.contains_inbound_block_request(&peer.ip())) .filter(|peer| self.is_block_synced() || self.router().cache.num_outbound_block_requests(&peer.ip()) == 0) .min_by_key(|peer| peer.last_seen())
134 .map(|peer| peer.ip());
135
136 if let Some(oldest) = oldest_peer {
138 info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
139 let _ = self.send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
140 self.router().disconnect(oldest);
142 }
143 }
144
145 fn handle_connected_peers(&self) {
148 let rng = &mut OsRng;
150
151 let num_connected = self.router().number_of_connected_peers();
153 let num_connected_provers = self.router().number_of_connected_provers();
155
156 let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0;
158 let (max_peers, max_provers) = if reduce_peers {
160 (Self::MEDIAN_NUMBER_OF_PEERS, 0)
161 } else {
162 (Self::MAXIMUM_NUMBER_OF_PEERS, Self::MAXIMUM_NUMBER_OF_PROVERS)
163 };
164
165 let num_surplus_peers = num_connected.saturating_sub(max_peers);
167 let num_surplus_provers = num_connected_provers.saturating_sub(max_provers);
169 let num_remaining_provers = num_connected_provers.saturating_sub(num_surplus_provers);
171 let num_surplus_clients_validators = num_surplus_peers.saturating_sub(num_remaining_provers);
173
174 if num_surplus_provers > 0 || num_surplus_clients_validators > 0 {
175 debug!(
176 "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
177 );
178
179 let trusted = self.router().trusted_peers();
181 let bootstrap = self.router().bootstrap_peers();
183
184 let prover_ips_to_disconnect = self
186 .router()
187 .connected_provers()
188 .into_iter()
189 .filter(|peer_ip| !trusted.contains(peer_ip) && !bootstrap.contains(peer_ip))
190 .choose_multiple(rng, num_surplus_provers);
191
192 let peer_ips_to_disconnect = self
196 .router()
197 .get_connected_peers()
198 .into_iter()
199 .filter_map(|peer| {
200 let peer_ip = peer.ip();
201 if !peer.is_prover() && !trusted.contains(&peer_ip) && !bootstrap.contains(&peer_ip) && (self.is_block_synced() || (!self.is_block_synced() && self.router().cache.num_outbound_block_requests(&peer.ip()) == 0))
206 {
207 Some(peer_ip)
208 } else {
209 None
210 }
211 })
212 .choose_multiple(rng, num_surplus_clients_validators);
213
214 for peer_ip in peer_ips_to_disconnect.into_iter().chain(prover_ips_to_disconnect) {
216 if self.router().node_type().is_prover() {
218 if let Some(peer) = self.router().get_connected_peer(&peer_ip) {
219 if peer.node_type().is_validator() {
220 continue;
221 }
222 }
223 }
224
225 info!("Disconnecting from '{peer_ip}' (exceeded maximum connections)");
226 self.send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
227 self.router().disconnect(peer_ip);
229 }
230 }
231
232 let num_connected = self.router().number_of_connected_peers();
234 let num_deficient = Self::MEDIAN_NUMBER_OF_PEERS.saturating_sub(num_connected);
236
237 if num_deficient > 0 {
238 let rng = &mut OsRng;
240
241 for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
243 self.router().connect(peer_ip);
244 }
245
246 if self.router().allow_external_peers() {
247 for peer_ip in self.router().connected_peers().into_iter().choose_multiple(rng, 3) {
249 self.send(peer_ip, Message::PeerRequest(PeerRequest));
250 }
251 }
252 }
253 }
254
255 fn handle_bootstrap_peers(&self) {
257 let mut connected_bootstrap = Vec::new();
259 let mut candidate_bootstrap = Vec::new();
260 for bootstrap_ip in self.router().bootstrap_peers() {
261 match self.router().is_connected(&bootstrap_ip) {
262 true => connected_bootstrap.push(bootstrap_ip),
263 false => candidate_bootstrap.push(bootstrap_ip),
264 }
265 }
266 if connected_bootstrap.is_empty() {
268 let rng = &mut OsRng;
270 if let Some(peer_ip) = candidate_bootstrap.into_iter().choose(rng) {
272 self.router().connect(peer_ip);
273 }
274 }
275 let num_surplus = connected_bootstrap.len().saturating_sub(1);
277 if num_surplus > 0 {
278 let rng = &mut OsRng;
280 for peer_ip in connected_bootstrap.into_iter().choose_multiple(rng, num_surplus) {
282 info!("Disconnecting from '{peer_ip}' (exceeded maximum bootstrap)");
283 self.send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
284 self.router().disconnect(peer_ip);
286 }
287 }
288 }
289
290 fn handle_trusted_peers(&self) {
292 for peer_ip in self.router().trusted_peers() {
294 if !self.router().is_connected(peer_ip) {
296 self.router().connect(*peer_ip);
298 }
299 }
300 }
301
302 fn handle_puzzle_request(&self) {
304 }
306
307 fn handle_banned_ips(&self) {
309 self.tcp().banned_peers().remove_old_bans(Self::IP_BAN_TIME_IN_SECS);
310 }
311}