1use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver, bootstrap_peers};
17
18use snarkos_node_tcp::{P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
19use snarkvm::prelude::{Address, Network};
20
21use anyhow::{Result, bail};
22#[cfg(feature = "locktick")]
23use locktick::parking_lot::RwLock;
24#[cfg(not(feature = "locktick"))]
25use parking_lot::RwLock;
26use std::{
27 cmp,
28 collections::{
29 HashSet,
30 hash_map::{Entry, HashMap},
31 },
32 fs,
33 io::{self, Write},
34 net::{IpAddr, SocketAddr},
35 path::Path,
36 str::FromStr,
37 time::Instant,
38};
39use tokio::task;
40use tracing::*;
41
42pub trait PeerPoolHandling<N: Network>: P2P {
43 const OWNER: &str;
44
45 const MAXIMUM_POOL_SIZE: usize;
47
48 const PEER_SLASHING_COUNT: usize;
51
52 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
53
54 fn resolver(&self) -> &RwLock<Resolver<N>>;
55
56 fn is_dev(&self) -> bool;
58
59 fn trusted_peers_only(&self) -> bool;
61
62 fn node_type(&self) -> NodeType;
64
65 fn local_ip(&self) -> SocketAddr {
67 self.tcp().listening_addr().expect("The TCP listener is not enabled")
68 }
69
70 fn is_local_ip(&self, addr: SocketAddr) -> bool {
72 addr == self.local_ip()
73 || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
74 }
75
76 fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
78 !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
79 }
80
81 fn max_connected_peers(&self) -> usize {
83 self.tcp().config().max_connections as usize
84 }
85
86 fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<bool> {
93 if self.is_local_ip(listener_addr) {
95 bail!("{} Dropping connection attempt to '{listener_addr}' (attempted to self-connect)", Self::OWNER);
96 }
97 if self.number_of_connected_peers() >= self.max_connected_peers() {
99 bail!("{} Dropping connection attempt to '{listener_addr}' (maximum peers reached)", Self::OWNER);
100 }
101 if self.is_connected(listener_addr) {
103 debug!("{} Dropping connection attempt to '{listener_addr}' (already connected)", Self::OWNER);
104 return Ok(true);
105 }
106 if self.is_connecting(listener_addr) {
108 debug!("{} Dropping connection attempt to '{listener_addr}' (already connecting)", Self::OWNER);
109 return Ok(true);
110 }
111 if self.is_ip_banned(listener_addr.ip()) {
113 bail!("{} Rejected a connection attempt to a banned IP '{}'", Self::OWNER, listener_addr.ip());
114 }
115 if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
117 bail!("{} Dropping connection attempt to '{listener_addr}' (untrusted)", Self::OWNER);
118 }
119 Ok(false)
120 }
121
122 fn connect(&self, listener_addr: SocketAddr) -> Option<task::JoinHandle<bool>> {
127 match self.check_connection_attempt(listener_addr) {
129 Ok(true) => return None,
130 Ok(false) => {}
131 Err(error) => {
132 warn!("{} {error}", Self::OWNER);
133 return None;
134 }
135 }
136
137 let is_trusted_or_bootstrap =
140 self.is_trusted(listener_addr) || bootstrap_peers::<N>(self.is_dev()).contains(&listener_addr);
141
142 let tcp = self.tcp().clone();
143 Some(tokio::spawn(async move {
144 debug!("{} Connecting to {listener_addr}...", Self::OWNER);
145 match tcp.connect(listener_addr).await {
147 Ok(_) => true,
148 Err(error) => {
149 if is_trusted_or_bootstrap {
150 warn!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
151 } else {
152 debug!("{} Unable to connect to '{listener_addr}' - {error}", Self::OWNER);
153 }
154 false
155 }
156 }
157 }))
158 }
159
160 fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
163 if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
164 let tcp = self.tcp().clone();
165 tokio::spawn(async move { tcp.disconnect(connected_addr).await })
166 } else {
167 tokio::spawn(async { false })
168 }
169 }
170
171 fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
175 let mut peer_pool = self.peer_pool().write();
176 let Some(peer) = peer_pool.get_mut(&listener_addr) else {
177 trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
178 return false;
179 };
180
181 if let Peer::Connected(conn_peer) = peer {
182 let aleo_addr = if self.node_type() == NodeType::BootstrapClient
187 && conn_peer.connection_mode == ConnectionMode::Router
188 {
189 None
190 } else {
191 Some(conn_peer.aleo_addr)
192 };
193 self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
194 peer.downgrade_to_candidate(listener_addr);
195 true
196 } else {
197 peer.downgrade_to_candidate(listener_addr);
198 false
199 }
200 }
201
202 fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
206 let trusted_peers = self.trusted_peers();
207
208 let mut peer_pool = self.peer_pool().write();
211
212 let mut num_updates: usize = 0;
214 listener_addrs.retain(|&(addr, height)| {
215 !self.is_ip_banned(addr.ip())
216 && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
217 && peer_pool
218 .get(&addr)
219 .map(|peer| peer.is_candidate() && height.is_some())
220 .inspect(|is_valid_update| {
221 if *is_valid_update {
222 num_updates += 1
223 }
224 })
225 .unwrap_or(true)
226 });
227
228 if listener_addrs.is_empty() {
230 return;
231 }
232
233 if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
235 && Self::PEER_SLASHING_COUNT != 0
236 {
237 let mut peers_to_slash = peer_pool
239 .iter()
240 .filter_map(|(addr, peer)| {
241 (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
242 })
243 .collect::<Vec<_>>();
244
245 let known_peers = self.tcp().known_peers().snapshot();
247
248 let default_value = (0, Instant::now());
250 peers_to_slash.sort_unstable_by_key(|addr| {
251 let (num_failures, last_seen) = known_peers
252 .get(&addr.ip())
253 .map(|stats| (stats.failures(), stats.timestamp()))
254 .unwrap_or(default_value);
255 (cmp::Reverse(num_failures), last_seen)
256 });
257
258 peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
260
261 peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
263
264 self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
266 }
267
268 listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
270
271 if listener_addrs.is_empty() {
273 return;
274 }
275
276 for (addr, height) in listener_addrs {
278 match peer_pool.entry(addr) {
279 Entry::Vacant(entry) => {
280 entry.insert(Peer::new_candidate(addr, false));
281 }
282 Entry::Occupied(mut entry) => {
283 if let Peer::Candidate(peer) = entry.get_mut() {
284 peer.last_height_seen = height;
285 }
286 }
287 }
288 }
289 }
290
291 fn remove_peer(&self, listener_addr: SocketAddr) {
293 self.peer_pool().write().remove(&listener_addr);
294 }
295
296 fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
298 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
299 Some(peer.connected_addr)
300 } else {
301 None
302 }
303 }
304
305 fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
307 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
308 Some(peer.aleo_addr)
309 } else {
310 None
311 }
312 }
313
314 fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
316 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
317 }
318
319 fn is_connected(&self, listener_addr: SocketAddr) -> bool {
321 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
322 }
323
324 fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
326 self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
328 }
329
330 fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
332 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
333 }
334
335 fn number_of_peers(&self) -> usize {
337 self.peer_pool().read().len()
338 }
339
340 fn number_of_connected_peers(&self) -> usize {
342 self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
343 }
344
345 fn number_of_connecting_peers(&self) -> usize {
347 self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
348 }
349
350 fn number_of_candidate_peers(&self) -> usize {
352 self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
353 }
354
355 fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
357 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
358 Some(peer.clone())
359 } else {
360 None
361 }
362 }
363
364 fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
367 &self,
368 listener_addr: &SocketAddr,
369 mut update_fn: F,
370 ) -> bool {
371 if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
372 update_fn(peer);
373 true
374 } else {
375 false
376 }
377 }
378
379 fn get_peers(&self) -> Vec<Peer<N>> {
381 self.peer_pool().read().values().cloned().collect()
382 }
383
384 fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
386 self.filter_connected_peers(|_| true)
387 }
388
389 fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
392 let mut peers = self.get_connected_peers();
394 let known_peers = self.tcp().known_peers().snapshot();
396
397 peers.sort_unstable_by_key(|peer| {
399 if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
400 (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
402 } else {
403 (cmp::Reverse(peer.last_height_seen), 0)
405 }
406 });
407 if let Some(max) = max_entries {
408 peers.truncate(max);
409 }
410
411 peers
412 }
413
414 fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
416 self.peer_pool()
417 .read()
418 .values()
419 .filter_map(|p| {
420 if let Peer::Connected(peer) = p
421 && predicate(peer)
422 {
423 Some(peer)
424 } else {
425 None
426 }
427 })
428 .cloned()
429 .collect()
430 }
431
432 fn connected_peers(&self) -> Vec<SocketAddr> {
434 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
435 }
436
437 fn trusted_peers(&self) -> Vec<SocketAddr> {
439 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
440 }
441
442 fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
444 self.peer_pool()
445 .read()
446 .values()
447 .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
448 .collect()
449 }
450
451 fn unconnected_trusted_peers(&self) -> HashSet<SocketAddr> {
453 self.peer_pool()
454 .read()
455 .iter()
456 .filter_map(
457 |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None },
458 )
459 .collect()
460 }
461
462 fn load_cached_peers(path: &Path) -> Result<Vec<SocketAddr>> {
465 let peers = match fs::read_to_string(path) {
466 Ok(cached_peers_str) => {
467 let mut cached_peers = Vec::new();
468 for peer_addr_str in cached_peers_str.lines() {
469 match SocketAddr::from_str(peer_addr_str) {
470 Ok(addr) => cached_peers.push(addr),
471 Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
472 }
473 }
474 cached_peers
475 }
476 Err(error) if error.kind() == io::ErrorKind::NotFound => {
477 Vec::new()
479 }
480 Err(error) => {
481 warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, path.display());
482 Vec::new()
483 }
484 };
485
486 Ok(peers)
487 }
488
489 fn save_best_peers(&self, path: &Path, max_entries: Option<usize>, store_ports: bool) -> Result<()> {
497 let mut peers = self.get_peers();
499
500 let known_peers = self.tcp().known_peers().snapshot();
502
503 peers.sort_unstable_by_key(|peer| {
505 if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
506 (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
508 } else {
509 (cmp::Reverse(peer.last_height_seen()), 0)
511 }
512 });
513 if let Some(max) = max_entries {
514 peers.truncate(max);
515 }
516
517 let addrs: HashSet<_> = peers
519 .iter()
520 .map(
521 |peer| {
522 if store_ports { peer.listener_addr().to_string() } else { peer.listener_addr().ip().to_string() }
523 },
524 )
525 .collect();
526
527 let mut file = fs::File::create(path)?;
528 for addr in addrs {
529 writeln!(file, "{addr}")?;
530 }
531
532 Ok(())
533 }
534
535 fn add_connecting_peer(&self, listener_addr: SocketAddr) -> bool {
540 match self.peer_pool().write().entry(listener_addr) {
541 Entry::Vacant(entry) => {
542 entry.insert(Peer::new_connecting(listener_addr, false));
543 true
544 }
545 Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => {
546 entry.insert(Peer::new_connecting(listener_addr, entry.get().is_trusted()));
547 true
548 }
549 Entry::Occupied(_) => false,
550 }
551 }
552
553 fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
556 if self.is_dev() {
558 return;
559 }
560
561 let ip = listener_addr.ip();
562 debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
563
564 self.tcp().banned_peers().update_ip_ban(ip);
566
567 self.disconnect(listener_addr);
569 self.remove_peer(listener_addr);
571 }
572
573 fn is_ip_banned(&self, ip: IpAddr) -> bool {
575 self.tcp().banned_peers().is_ip_banned(&ip)
576 }
577
578 fn update_ip_ban(&self, ip: IpAddr) {
580 self.tcp().banned_peers().update_ip_ban(ip);
581 }
582}