1use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver};
17
18use snarkos_node_tcp::{ConnectError, P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
19use snarkvm::prelude::{Address, Network};
20
21use anyhow::Result;
22#[cfg(feature = "locktick")]
23use locktick::parking_lot::RwLock;
24#[cfg(not(feature = "locktick"))]
25use parking_lot::RwLock;
26use std::{
27 cmp,
28 collections::{
29 HashSet,
30 hash_map::{Entry, HashMap},
31 },
32 fs,
33 io::{self, Write},
34 net::{IpAddr, SocketAddr},
35 path::Path,
36 str::FromStr,
37 time::Instant,
38};
39use tokio::task;
40use tracing::*;
41
/// Application-level reasons for refusing a peer connection.
///
/// The common derives make the error cheap to copy and directly comparable, e.g. in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeeringError {
    /// Only trusted peers are allowed and the address in question is not a trusted one.
    NoExternalPeersAllowed,
}
48
// Marker implementation with no overridden items. Presumably this is what allows a `PeeringError`
// to be wrapped via `ConnectError::application` (see `check_connection_attempt`) - TODO confirm
// against the `snarkos_node_tcp::ApplicationError` definition.
impl snarkos_node_tcp::ApplicationError for PeeringError {}
50
51impl std::fmt::Display for PeeringError {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
54 Self::NoExternalPeersAllowed => write!(f, "no untrusted peers allowed"),
55 }
56 }
57}
58
59pub trait PeerPoolHandling<N: Network>: P2P {
60 const OWNER: &str;
61
62 const MAXIMUM_POOL_SIZE: usize;
64
65 const PEER_SLASHING_COUNT: usize;
68
69 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
71
72 fn resolver(&self) -> &RwLock<Resolver<N>>;
74
75 fn is_dev(&self) -> bool;
77
78 fn trusted_peers_only(&self) -> bool;
80
81 fn node_type(&self) -> NodeType;
83
84 fn local_ip(&self) -> SocketAddr {
86 self.tcp().listening_addr().expect("The TCP listener is not enabled")
87 }
88
89 fn is_local_ip(&self, addr: SocketAddr) -> bool {
91 addr == self.local_ip()
92 || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
93 }
94
95 fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
97 !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
98 }
99
100 fn max_connected_peers(&self) -> usize {
102 self.tcp().config().max_connections as usize
103 }
104
105 fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
107 if self.is_local_ip(listener_addr) {
109 return Err(ConnectError::SelfConnect { address: listener_addr });
110 }
111 if self.number_of_connected_peers() >= self.max_connected_peers() {
113 return Err(ConnectError::MaximumConnectionsReached { limit: self.max_connected_peers() as u16 });
114 }
115 if self.is_connected(listener_addr) {
117 return Err(ConnectError::AlreadyConnected { address: listener_addr });
118 }
119 if self.is_connecting(listener_addr) {
121 return Err(ConnectError::AlreadyConnecting { address: listener_addr });
122 }
123 if self.is_ip_banned(listener_addr.ip()) {
125 return Err(ConnectError::BannedIp { ip: listener_addr.ip() });
126 }
127 if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
129 return Err(ConnectError::application(PeeringError::NoExternalPeersAllowed));
130 }
131
132 Ok(())
133 }
134
135 fn connect(&self, listener_addr: SocketAddr) -> Result<task::JoinHandle<Result<(), ConnectError>>, ConnectError> {
143 self.check_connection_attempt(listener_addr)?;
145
146 if let Some(Peer::Candidate(peer)) = self.peer_pool().write().get_mut(&listener_addr) {
148 peer.last_connection_attempt = Some(Instant::now());
149 peer.total_connection_attempts += 1;
150 } else {
151 warn!("{} No candidate peer entry exists for '{listener_addr:?}' while connecting.", Self::OWNER);
152 }
153
154 let tcp = self.tcp().clone();
155 Ok(tokio::spawn(async move {
156 debug!("{} Connecting to {listener_addr}...", Self::OWNER);
157 tcp.connect(listener_addr).await
158 }))
159 }
160
161 fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
164 if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
165 let tcp = self.tcp().clone();
166 tokio::spawn(async move { tcp.disconnect(connected_addr).await })
167 } else {
168 tokio::spawn(async { false })
169 }
170 }
171
172 fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
176 let mut peer_pool = self.peer_pool().write();
177 let Some(peer) = peer_pool.get_mut(&listener_addr) else {
178 trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
179 return false;
180 };
181
182 if let Peer::Connected(conn_peer) = peer {
183 let aleo_addr = if self.node_type() == NodeType::BootstrapClient
188 && conn_peer.connection_mode == ConnectionMode::Router
189 {
190 None
191 } else {
192 Some(conn_peer.aleo_addr)
193 };
194 self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
195 peer.downgrade_to_candidate(listener_addr);
196 true
197 } else {
198 peer.downgrade_to_candidate(listener_addr);
199 false
200 }
201 }
202
203 fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
207 let trusted_peers = self.trusted_peers();
208
209 let mut peer_pool = self.peer_pool().write();
212
213 let mut num_updates: usize = 0;
215 listener_addrs.retain(|&(addr, height)| {
216 !self.is_ip_banned(addr.ip())
217 && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
218 && peer_pool
219 .get(&addr)
220 .map(|peer| peer.is_candidate() && height.is_some())
221 .inspect(|is_valid_update| {
222 if *is_valid_update {
223 num_updates += 1
224 }
225 })
226 .unwrap_or(true)
227 });
228
229 if listener_addrs.is_empty() {
231 return;
232 }
233
234 if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
236 && Self::PEER_SLASHING_COUNT != 0
237 {
238 let mut peers_to_slash = peer_pool
240 .iter()
241 .filter_map(|(addr, peer)| {
242 (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
243 })
244 .collect::<Vec<_>>();
245
246 let known_peers = self.tcp().known_peers().snapshot();
248
249 let default_value = (0, Instant::now());
251 peers_to_slash.sort_unstable_by_key(|addr| {
252 let (num_failures, last_seen) = known_peers
253 .get(&addr.ip())
254 .map(|stats| (stats.failures(), stats.timestamp()))
255 .unwrap_or(default_value);
256 (cmp::Reverse(num_failures), last_seen)
257 });
258
259 peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
261
262 peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
264
265 self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
267 }
268
269 listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
271
272 if listener_addrs.is_empty() {
274 return;
275 }
276
277 for (addr, height) in listener_addrs {
279 match peer_pool.entry(addr) {
280 Entry::Vacant(entry) => {
281 entry.insert(Peer::new_candidate(addr, false));
282 }
283 Entry::Occupied(mut entry) => {
284 if let Peer::Candidate(peer) = entry.get_mut() {
285 peer.last_height_seen = height;
286 }
287 }
288 }
289 }
290 }
291
292 fn remove_peer(&self, listener_addr: SocketAddr) {
294 self.peer_pool().write().remove(&listener_addr);
295 }
296
297 fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
299 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
300 Some(peer.connected_addr)
301 } else {
302 None
303 }
304 }
305
306 fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
308 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
309 Some(peer.aleo_addr)
310 } else {
311 None
312 }
313 }
314
315 fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
317 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
318 }
319
320 fn is_connected(&self, listener_addr: SocketAddr) -> bool {
322 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
323 }
324
325 fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
327 self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
329 }
330
331 fn is_connecting_or_connected(&self, listener_addr: SocketAddr) -> bool {
333 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting() || peer.is_connected())
334 }
335
336 fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
338 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
339 }
340
341 fn number_of_peers(&self) -> usize {
343 self.peer_pool().read().len()
344 }
345
346 fn number_of_connected_peers(&self) -> usize {
348 self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count()
349 }
350
351 fn number_of_connecting_peers(&self) -> usize {
353 self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count()
354 }
355
356 fn number_of_candidate_peers(&self) -> usize {
358 self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
359 }
360
361 fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
363 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
364 Some(peer.clone())
365 } else {
366 None
367 }
368 }
369
370 fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
373 &self,
374 listener_addr: &SocketAddr,
375 mut update_fn: F,
376 ) -> bool {
377 if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
378 update_fn(peer);
379 true
380 } else {
381 false
382 }
383 }
384
385 fn get_peers(&self) -> Vec<Peer<N>> {
387 self.peer_pool().read().values().cloned().collect()
388 }
389
390 fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
392 self.filter_connected_peers(|_| true)
393 }
394
395 fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
398 let mut peers = self.get_connected_peers();
400 let known_peers = self.tcp().known_peers().snapshot();
402
403 peers.sort_unstable_by_key(|peer| {
405 if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
406 (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
408 } else {
409 (cmp::Reverse(peer.last_height_seen), 0)
411 }
412 });
413 if let Some(max) = max_entries {
414 peers.truncate(max);
415 }
416
417 peers
418 }
419
420 fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
422 self.peer_pool()
423 .read()
424 .values()
425 .filter_map(|p| {
426 if let Peer::Connected(peer) = p
427 && predicate(peer)
428 {
429 Some(peer)
430 } else {
431 None
432 }
433 })
434 .cloned()
435 .collect()
436 }
437
438 fn connected_peers(&self) -> Vec<SocketAddr> {
440 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
441 }
442
443 fn trusted_peers(&self) -> Vec<SocketAddr> {
445 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
446 }
447
448 fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
450 self.peer_pool()
451 .read()
452 .values()
453 .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
454 .collect()
455 }
456
457 fn get_trusted_candidate_peers(&self) -> Vec<CandidatePeer> {
459 self.peer_pool()
460 .read()
461 .values()
462 .filter_map(|peer| {
463 if let Peer::Candidate(peer) = peer
464 && peer.trusted
465 {
466 Some(peer.clone())
467 } else {
468 None
469 }
470 })
471 .collect()
472 }
473
474 fn load_cached_peers(path: &Path) -> Result<Vec<SocketAddr>> {
477 let peers = match fs::read_to_string(path) {
478 Ok(cached_peers_str) => {
479 let mut cached_peers = Vec::new();
480 for peer_addr_str in cached_peers_str.lines() {
481 match SocketAddr::from_str(peer_addr_str) {
482 Ok(addr) => cached_peers.push(addr),
483 Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
484 }
485 }
486 cached_peers
487 }
488 Err(error) if error.kind() == io::ErrorKind::NotFound => {
489 Vec::new()
491 }
492 Err(error) => {
493 warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, path.display());
494 Vec::new()
495 }
496 };
497
498 Ok(peers)
499 }
500
501 fn save_best_peers(&self, path: &Path, max_entries: Option<usize>, store_ports: bool) -> Result<()> {
509 let mut peers = self.get_peers();
511
512 let known_peers = self.tcp().known_peers().snapshot();
514
515 peers.sort_unstable_by_key(|peer| {
517 if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
518 (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
520 } else {
521 (cmp::Reverse(peer.last_height_seen()), 0)
523 }
524 });
525 if let Some(max) = max_entries {
526 peers.truncate(max);
527 }
528
529 let addrs: HashSet<_> = peers
531 .iter()
532 .map(
533 |peer| {
534 if store_ports { peer.listener_addr().to_string() } else { peer.listener_addr().ip().to_string() }
535 },
536 )
537 .collect();
538
539 let mut file = fs::File::create(path)?;
540 for addr in addrs {
541 writeln!(file, "{addr}")?;
542 }
543
544 Ok(())
545 }
546
547 fn add_connecting_peer(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
552 match self.peer_pool().write().entry(listener_addr) {
553 Entry::Vacant(entry) => {
554 entry.insert(Peer::new_connecting(listener_addr, false));
555 Ok(())
556 }
557 Entry::Occupied(mut entry) => match entry.get() {
558 peer @ Peer::Candidate(_) => {
559 entry.insert(Peer::new_connecting(listener_addr, peer.is_trusted()));
560 Ok(())
561 }
562 Peer::Connecting(_) => Err(ConnectError::AlreadyConnecting { address: listener_addr }),
563 Peer::Connected(_) => Err(ConnectError::AlreadyConnected { address: listener_addr }),
564 },
565 }
566 }
567
568 fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
571 if self.is_dev() {
573 return;
574 }
575
576 let ip = listener_addr.ip();
577 debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
578
579 self.tcp().banned_peers().update_ip_ban(ip);
581
582 self.disconnect(listener_addr);
584 self.remove_peer(listener_addr);
586 }
587
588 fn is_ip_banned(&self, ip: IpAddr) -> bool {
590 self.tcp().banned_peers().is_ip_banned(&ip)
591 }
592
593 fn update_ip_ban(&self, ip: IpAddr) {
595 self.tcp().banned_peers().update_ip_ban(ip);
596 }
597}