1use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver, bootstrap_peers};
17
18use aleo_std::{StorageMode, aleo_ledger_dir};
19use snarkos_node_tcp::{P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
20use snarkvm::prelude::{Address, Network};
21
22use anyhow::{Result, bail};
23#[cfg(feature = "locktick")]
24use locktick::parking_lot::RwLock;
25#[cfg(not(feature = "locktick"))]
26use parking_lot::RwLock;
27use std::{
28 cmp,
29 collections::{
30 HashSet,
31 hash_map::{Entry, HashMap},
32 },
33 fs,
34 io::{self, Write},
35 net::{IpAddr, SocketAddr},
36 str::FromStr,
37 time::Instant,
38};
39use tokio::task;
40use tracing::*;
41
42pub trait PeerPoolHandling<N: Network>: P2P {
43 const OWNER: &str;
44
45 const MAXIMUM_POOL_SIZE: usize;
47
48 const PEER_SLASHING_COUNT: usize;
51
52 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
53
54 fn resolver(&self) -> &RwLock<Resolver<N>>;
55
56 fn is_dev(&self) -> bool;
58
59 fn trusted_peers_only(&self) -> bool;
61
62 fn node_type(&self) -> NodeType;
64
65 fn local_ip(&self) -> SocketAddr {
67 self.tcp().listening_addr().expect("The TCP listener is not enabled")
68 }
69
70 fn is_local_ip(&self, addr: SocketAddr) -> bool {
72 addr == self.local_ip()
73 || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
74 }
75
76 fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
78 !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
79 }
80
81 fn max_connected_peers(&self) -> usize {
83 self.tcp().config().max_connections as usize
84 }
85
86 fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<bool> {
93 if self.is_local_ip(listener_addr) {
95 bail!("{} Dropping connection attempt to '{listener_addr}' (attempted to self-connect)", Self::OWNER);
96 }
97 if self.number_of_connected_peers() >= self.max_connected_peers() {
99 bail!("{} Dropping connection attempt to '{listener_addr}' (maximum peers reached)", Self::OWNER);
100 }
101 if self.is_connected(listener_addr) {
103 debug!("{} Dropping connection attempt to '{listener_addr}' (already connected)", Self::OWNER);
104 return Ok(true);
105 }
106 if self.is_connecting(listener_addr) {
108 debug!("{} Dropping connection attempt to '{listener_addr}' (already connecting)", Self::OWNER);
109 return Ok(true);
110 }
111 if self.is_ip_banned(listener_addr.ip()) {
113 bail!("{} Rejected a connection attempt to a banned IP '{}'", Self::OWNER, listener_addr.ip());
114 }
115 if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
117 bail!("{} Dropping connection attempt to '{listener_addr}' (untrusted)", Self::OWNER);
118 }
119 Ok(false)
120 }
121
122 fn connect(&self, listener_addr: SocketAddr) -> Option<task::JoinHandle<bool>> {
127 match self.check_connection_attempt(listener_addr) {
129 Ok(true) => return None,
130 Ok(false) => {}
131 Err(error) => {
132 warn!("{} {error}", Self::OWNER);
133 return None;
134 }
135 }
136
137 let is_trusted_or_bootstrap =
140 self.is_trusted(listener_addr) || bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr);
141
142 let tcp = self.tcp().clone();
143 Some(tokio::spawn(async move {
144 debug!("{} Connecting to {listener_addr}...", Self::OWNER);
145 match tcp.connect(listener_addr).await {
147 Ok(_) => true,
148 Err(error) => {
149 if is_trusted_or_bootstrap {
150 warn!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
151 } else {
152 debug!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
153 }
154 false
155 }
156 }
157 }))
158 }
159
160 fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
163 if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
164 let tcp = self.tcp().clone();
165 tokio::spawn(async move { tcp.disconnect(connected_addr).await })
166 } else {
167 tokio::spawn(async { false })
168 }
169 }
170
171 fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) {
173 if let Some(peer) = self.peer_pool().write().get_mut(&listener_addr) {
174 if let Peer::Connected(peer) = peer {
175 let aleo_addr = if self.node_type() == NodeType::BootstrapClient
180 && peer.connection_mode == ConnectionMode::Router
181 {
182 None
183 } else {
184 Some(peer.aleo_addr)
185 };
186 self.resolver().write().remove_peer(peer.connected_addr, aleo_addr);
187 }
188 peer.downgrade_to_candidate(listener_addr);
189 }
190 }
191
192 fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
196 let trusted_peers = self.trusted_peers();
197
198 let mut peer_pool = self.peer_pool().write();
201
202 let mut num_updates: usize = 0;
204 listener_addrs.retain(|&(addr, height)| {
205 !self.is_ip_banned(addr.ip())
206 && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
207 && peer_pool
208 .get(&addr)
209 .map(|peer| peer.is_candidate() && height.is_some())
210 .inspect(|is_valid_update| {
211 if *is_valid_update {
212 num_updates += 1
213 }
214 })
215 .unwrap_or(true)
216 });
217
218 if listener_addrs.is_empty() {
220 return;
221 }
222
223 if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
225 && Self::PEER_SLASHING_COUNT != 0
226 {
227 let mut peers_to_slash = peer_pool
229 .iter()
230 .filter_map(|(addr, peer)| {
231 (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
232 })
233 .collect::<Vec<_>>();
234
235 let known_peers = self.tcp().known_peers().snapshot();
237
238 let default_value = (0, Instant::now());
240 peers_to_slash.sort_unstable_by_key(|addr| {
241 let (num_failures, last_seen) = known_peers
242 .get(&addr.ip())
243 .map(|stats| (stats.failures(), stats.timestamp()))
244 .unwrap_or(default_value);
245 (cmp::Reverse(num_failures), last_seen)
246 });
247
248 peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
250
251 peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
253
254 self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
256 }
257
258 listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
260
261 if listener_addrs.is_empty() {
263 return;
264 }
265
266 for (addr, height) in listener_addrs {
268 match peer_pool.entry(addr) {
269 Entry::Vacant(entry) => {
270 entry.insert(Peer::new_candidate(addr, false));
271 }
272 Entry::Occupied(mut entry) => {
273 if let Peer::Candidate(peer) = entry.get_mut() {
274 peer.last_height_seen = height;
275 }
276 }
277 }
278 }
279 }
280
281 fn remove_peer(&self, listener_addr: SocketAddr) {
283 self.peer_pool().write().remove(&listener_addr);
284 }
285
286 fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
288 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
289 Some(peer.connected_addr)
290 } else {
291 None
292 }
293 }
294
295 fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
297 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
298 Some(peer.aleo_addr)
299 } else {
300 None
301 }
302 }
303
304 fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
306 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
307 }
308
309 fn is_connected(&self, listener_addr: SocketAddr) -> bool {
311 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
312 }
313
314 fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
316 self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
318 }
319
320 fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
322 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
323 }
324
325 fn number_of_peers(&self) -> usize {
327 self.peer_pool().read().len()
328 }
329
330 fn number_of_connected_peers(&self) -> usize {
332 self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
333 }
334
335 fn number_of_connecting_peers(&self) -> usize {
337 self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
338 }
339
340 fn number_of_candidate_peers(&self) -> usize {
342 self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
343 }
344
345 fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
347 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
348 Some(peer.clone())
349 } else {
350 None
351 }
352 }
353
354 fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
357 &self,
358 listener_addr: &SocketAddr,
359 mut update_fn: F,
360 ) -> bool {
361 if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
362 update_fn(peer);
363 true
364 } else {
365 false
366 }
367 }
368
369 fn get_peers(&self) -> Vec<Peer<N>> {
371 self.peer_pool().read().values().cloned().collect()
372 }
373
374 fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
376 self.filter_connected_peers(|_| true)
377 }
378
379 fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
382 let mut peers = self.get_connected_peers();
384 let known_peers = self.tcp().known_peers().snapshot();
386
387 peers.sort_unstable_by_key(|peer| {
389 if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
390 (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
392 } else {
393 (cmp::Reverse(peer.last_height_seen), 0)
395 }
396 });
397 if let Some(max) = max_entries {
398 peers.truncate(max);
399 }
400
401 peers
402 }
403
404 fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
406 self.peer_pool()
407 .read()
408 .values()
409 .filter_map(|p| {
410 if let Peer::Connected(peer) = p
411 && predicate(peer)
412 {
413 Some(peer)
414 } else {
415 None
416 }
417 })
418 .cloned()
419 .collect()
420 }
421
422 fn connected_peers(&self) -> Vec<SocketAddr> {
424 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
425 }
426
427 fn trusted_peers(&self) -> Vec<SocketAddr> {
429 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
430 }
431
432 fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
434 self.peer_pool()
435 .read()
436 .values()
437 .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
438 .collect()
439 }
440
441 fn unconnected_trusted_peers(&self) -> HashSet<SocketAddr> {
443 self.peer_pool()
444 .read()
445 .iter()
446 .filter_map(
447 |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None },
448 )
449 .collect()
450 }
451
452 fn load_cached_peers(storage_mode: &StorageMode, filename: &str) -> Result<Vec<SocketAddr>> {
455 let mut peer_cache_path = aleo_ledger_dir(N::ID, storage_mode);
456 peer_cache_path.push(filename);
457
458 let peers = match fs::read_to_string(&peer_cache_path) {
459 Ok(cached_peers_str) => {
460 let mut cached_peers = Vec::new();
461 for peer_addr_str in cached_peers_str.lines() {
462 match SocketAddr::from_str(peer_addr_str) {
463 Ok(addr) => cached_peers.push(addr),
464 Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
465 }
466 }
467 cached_peers
468 }
469 Err(error) if error.kind() == io::ErrorKind::NotFound => {
470 Vec::new()
472 }
473 Err(error) => {
474 warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, peer_cache_path.display());
475 Vec::new()
476 }
477 };
478
479 Ok(peers)
480 }
481
482 fn save_best_peers(&self, storage_mode: &StorageMode, filename: &str, max_entries: Option<usize>) -> Result<()> {
485 let mut peers = self.get_peers();
487
488 let known_peers = self.tcp().known_peers().snapshot();
490
491 peers.sort_unstable_by_key(|peer| {
493 if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
494 (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
496 } else {
497 (cmp::Reverse(peer.last_height_seen()), 0)
499 }
500 });
501 if let Some(max) = max_entries {
502 peers.truncate(max);
503 }
504
505 let mut path = aleo_ledger_dir(N::ID, storage_mode);
507 path.push(filename);
508 let mut file = fs::File::create(path)?;
509 for peer in peers {
510 writeln!(file, "{}", peer.listener_addr())?;
511 }
512
513 Ok(())
514 }
515
516 fn add_connecting_peer(&self, listener_addr: SocketAddr) -> bool {
521 match self.peer_pool().write().entry(listener_addr) {
522 Entry::Vacant(entry) => {
523 entry.insert(Peer::new_connecting(listener_addr, false));
524 true
525 }
526 Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => {
527 entry.insert(Peer::new_connecting(listener_addr, entry.get().is_trusted()));
528 true
529 }
530 Entry::Occupied(_) => false,
531 }
532 }
533
534 fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
537 let ip = listener_addr.ip();
538 debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
539
540 self.tcp().banned_peers().update_ip_ban(ip);
542
543 self.disconnect(listener_addr);
545 self.remove_peer(listener_addr);
547 }
548
549 fn is_ip_banned(&self, ip: IpAddr) -> bool {
551 self.tcp().banned_peers().is_ip_banned(&ip)
552 }
553
554 fn update_ip_ban(&self, ip: IpAddr) {
556 self.tcp().banned_peers().update_ip_ban(ip);
557 }
558}