1use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver};
17
18#[cfg(test)]
19mod tests {
20 use super::*;
21 use crate::Peer;
22 use snarkos_node_tcp::{Config, P2P, Tcp};
23 use snarkvm::{prelude::Rng, utilities::TestRng};
24
25 use std::{collections::HashMap, net::SocketAddr, time::Instant};
26
27 type CurrentNetwork = snarkvm::prelude::MainnetV0;
28
/// Minimal in-memory implementor of `PeerPoolHandling`, used only by the unit tests in this module.
29 struct MockPeerPool<N: Network> {
/// TCP stack handed out through `P2P::tcp`.
30 tcp: Tcp,
/// The peer collection, keyed by `SocketAddr` (the tests use each peer's listener address as the key).
31 peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
/// The resolver handed out through `PeerPoolHandling::resolver`.
32 resolver: RwLock<Resolver<N>>,
33 }
34
35 impl<N: Network> MockPeerPool<N> {
36 fn new() -> Self {
37 let config = Config { listener_ip: None, ..Default::default() };
38 Self { tcp: Tcp::new(config), peer_pool: Default::default(), resolver: Default::default() }
39 }
40 }
41
42 impl<N: Network> P2P for MockPeerPool<N> {
43 fn tcp(&self) -> &Tcp {
44 &self.tcp
45 }
46 }
47
48 impl<N: Network> PeerPoolHandling<N> for MockPeerPool<N> {
49 const MAXIMUM_POOL_SIZE: usize = 100;
50 const OWNER: &str = "MockPeerPool";
51 const PEER_SLASHING_COUNT: usize = 10;
52
53 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
54 &self.peer_pool
55 }
56
57 fn resolver(&self) -> &RwLock<Resolver<N>> {
58 &self.resolver
59 }
60
61 fn is_dev(&self) -> bool {
62 false
63 }
64
65 fn trusted_peers_only(&self) -> bool {
66 false
67 }
68
69 fn node_type(&self) -> NodeType {
70 NodeType::Client
71 }
72 }
73
74 fn make_connected_peer(port: u16, node_type: NodeType, rng: &mut TestRng) -> (SocketAddr, Peer<CurrentNetwork>) {
75 use snarkvm::prelude::Address;
76 let listener_addr = SocketAddr::from(([127, 0, 0, 1], port));
77 let connected_addr = SocketAddr::from(([127, 0, 0, 1], port + 10000));
78 let now = Instant::now();
79 let peer = Peer::Connected(ConnectedPeer {
80 listener_addr,
81 connected_addr,
82 connection_mode: ConnectionMode::Router,
83 trusted: false,
84 aleo_addr: Address::<CurrentNetwork>::new(rng.random()),
85 node_type,
86 version: 1,
87 snarkos_sha: None,
88 last_height_seen: None,
89 first_seen: now,
90 last_seen: now,
91 });
92 (listener_addr, peer)
93 }
94
95 #[test]
96 fn test_peer_state_transitions() {
97 use snarkvm::prelude::Address;
98
99 let pool = MockPeerPool::<CurrentNetwork>::new();
100 let mut rng = TestRng::default();
101
102 let listener_addr = SocketAddr::from(([192, 0, 2, 1], 4000));
103 let connected_addr = SocketAddr::from(([192, 0, 2, 1], 14000));
104 let aleo_addr = Address::<CurrentNetwork>::new(rng.random());
105
106 pool.peer_pool().write().insert(listener_addr, Peer::new_candidate(listener_addr, false));
108
109 assert_eq!(pool.number_of_candidate_peers(), 1);
110 assert_eq!(pool.number_of_connecting_peers(), Some(0));
111 assert_eq!(pool.number_of_connected_peers(), 0);
112 assert!(!pool.is_connecting(listener_addr));
113 assert!(!pool.is_connected(listener_addr));
114
115 assert!(pool.add_connecting_peer(listener_addr).is_ok());
117
118 assert_eq!(pool.number_of_candidate_peers(), 0);
119 assert_eq!(pool.number_of_connecting_peers(), Some(1));
120 assert_eq!(pool.number_of_connected_peers(), 0);
121 assert!(pool.is_connecting(listener_addr));
122 assert!(!pool.is_connected(listener_addr));
123
124 pool.peer_pool().write().get_mut(&listener_addr).unwrap().upgrade_to_connected(
126 connected_addr,
127 listener_addr.port(),
128 aleo_addr,
129 NodeType::Validator,
130 1,
131 None,
132 ConnectionMode::Router,
133 );
134
135 assert_eq!(pool.number_of_candidate_peers(), 0);
136 assert_eq!(pool.number_of_connecting_peers(), Some(0));
137 assert_eq!(pool.number_of_connected_peers(), 1);
138 assert!(!pool.is_connecting(listener_addr));
139 assert!(pool.is_connected(listener_addr));
140 assert_eq!(pool.number_of_connected_validators(), Some(1));
141
142 let connected = pool.get_connected_peer(listener_addr).expect("peer should be connected");
144 assert_eq!(connected.listener_addr, listener_addr);
145 assert_eq!(connected.connected_addr, connected_addr);
146 assert_eq!(connected.aleo_addr, aleo_addr);
147 assert_eq!(connected.node_type, NodeType::Validator);
148 }
149
150 #[test]
151 fn test_number_of_connected_validators() {
152 let pool = MockPeerPool::<CurrentNetwork>::new();
153 let mut rng = TestRng::default();
154
155 assert_eq!(pool.number_of_connected_validators(), Some(0));
157
158 let (addr1, peer1) = make_connected_peer(3000, NodeType::Validator, &mut rng);
160 let (addr2, peer2) = make_connected_peer(3001, NodeType::Validator, &mut rng);
161 let (addr3, peer3) = make_connected_peer(3002, NodeType::Client, &mut rng);
162 {
163 let mut pool_write = pool.peer_pool().write();
164 pool_write.insert(addr1, peer1);
165 pool_write.insert(addr2, peer2);
166 pool_write.insert(addr3, peer3);
167 }
168
169 assert_eq!(pool.number_of_connected_validators(), Some(2));
170 assert_eq!(pool.number_of_connected_peers(), 3);
171
172 let candidate_addr = SocketAddr::from(([127, 0, 0, 1], 3003));
174 pool.peer_pool().write().insert(candidate_addr, Peer::new_candidate(candidate_addr, false));
175
176 assert_eq!(pool.number_of_connected_validators(), Some(2));
177 assert_eq!(pool.number_of_connected_peers(), 3);
178 }
179}
180
181use snarkos_node_tcp::{ConnectError, P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
182use snarkvm::prelude::{Address, Network};
183
184use anyhow::Result;
185#[cfg(feature = "locktick")]
186use locktick::parking_lot::RwLock;
187#[cfg(not(feature = "locktick"))]
188use parking_lot::RwLock;
189use std::{
190 cmp,
191 collections::{
192 HashSet,
193 hash_map::{Entry, HashMap},
194 },
195 fs,
196 io::{self, Write},
197 net::{IpAddr, SocketAddr},
198 path::Path,
199 str::FromStr,
200 time::Instant,
201};
202use tokio::task;
203use tracing::*;
204
/// Application-level reasons for refusing a connection attempt.
205#[derive(Debug)]
208pub enum PeeringError {
/// Returned (wrapped via `ConnectError::application`) by `check_connection_attempt`
/// when `trusted_peers_only()` is set and the target peer is not trusted.
209 NoExternalPeersAllowed,
210}
211
// Marker impl; presumably what allows `PeeringError` to be passed to
// `ConnectError::application` (as `check_connection_attempt` does) - confirm against snarkos_node_tcp.
212impl snarkos_node_tcp::ApplicationError for PeeringError {}
213
214impl std::fmt::Display for PeeringError {
215 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216 match self {
217 Self::NoExternalPeersAllowed => write!(f, "no untrusted peers allowed"),
218 }
219 }
220}
221
222pub trait PeerPoolHandling<N: Network>: P2P {
/// Name of the implementor; used as the prefix of the log messages emitted by the default methods.
223 const OWNER: &str;
224
/// Pool size limit enforced by `insert_candidate_peers`: reaching it triggers candidate slashing,
/// and new entries are truncated so the pool does not grow past it.
225 const MAXIMUM_POOL_SIZE: usize;
227
/// Maximum number of untrusted candidates removed in one slashing round of
/// `insert_candidate_peers`; a value of 0 disables slashing.
228 const PEER_SLASHING_COUNT: usize;
231
/// Returns the lock-protected peer collection, keyed by the peer's listener address.
232 fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
234
/// Returns the lock-protected resolver; `downgrade_peer_to_candidate` removes entries from it
/// and `is_connected_address` queries it by Aleo address.
235 fn resolver(&self) -> &RwLock<Resolver<N>>;
237
/// Returns `true` in development mode. In that mode `ip_ban_peer` is a no-op and
/// `insert_candidate_peers` only rejects bogon IPs instead of applying `is_valid_peer_ip`.
238 fn is_dev(&self) -> bool;
240
/// Returns `true` if `check_connection_attempt` must refuse peers that are not trusted.
241 fn trusted_peers_only(&self) -> bool;
243
/// Returns the type of this node (consulted by `downgrade_peer_to_candidate`).
244 fn node_type(&self) -> NodeType;
246
247 fn local_ip(&self) -> SocketAddr {
249 self.tcp().listening_addr().expect("The TCP listener is not enabled")
250 }
251
252 fn is_local_ip(&self, addr: SocketAddr) -> bool {
254 addr == self.local_ip()
255 || (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
256 }
257
258 fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
260 !self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
261 }
262
263 fn max_connected_peers(&self) -> usize {
265 self.tcp().config().max_connections as usize
266 }
267
268 fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
270 if self.is_local_ip(listener_addr) {
272 return Err(ConnectError::SelfConnect { address: listener_addr });
273 }
274 if self.number_of_connected_peers() >= self.max_connected_peers() {
276 return Err(ConnectError::MaximumConnectionsReached { limit: self.max_connected_peers() as u16 });
277 }
278 if self.is_connected(listener_addr) {
280 return Err(ConnectError::AlreadyConnected { address: listener_addr });
281 }
282 if self.is_connecting(listener_addr) {
284 return Err(ConnectError::AlreadyConnecting { address: listener_addr });
285 }
286 if self.is_ip_banned(listener_addr.ip()) {
288 return Err(ConnectError::BannedIp { ip: listener_addr.ip() });
289 }
290 if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
292 return Err(ConnectError::application(PeeringError::NoExternalPeersAllowed));
293 }
294
295 Ok(())
296 }
297
298 fn connect(&self, listener_addr: SocketAddr) -> Result<task::JoinHandle<Result<(), ConnectError>>, ConnectError> {
306 self.check_connection_attempt(listener_addr)?;
308
309 if let Some(Peer::Candidate(peer)) = self.peer_pool().write().get_mut(&listener_addr) {
311 peer.last_connection_attempt = Some(Instant::now());
312 peer.total_connection_attempts += 1;
313 } else {
314 warn!("{} No candidate peer entry exists for '{listener_addr:?}' while connecting.", Self::OWNER);
315 }
316
317 let tcp = self.tcp().clone();
318 Ok(tokio::spawn(async move {
319 debug!("{} Connecting to {listener_addr}...", Self::OWNER);
320 tcp.connect(listener_addr).await
321 }))
322 }
323
324 fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
327 if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
328 let tcp = self.tcp().clone();
329 tokio::spawn(async move { tcp.disconnect(connected_addr).await })
330 } else {
331 tokio::spawn(async { false })
332 }
333 }
334
335 fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
339 let mut peer_pool = self.peer_pool().write();
340 let Some(peer) = peer_pool.get_mut(&listener_addr) else {
341 trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
342 return false;
343 };
344
345 if let Peer::Connected(conn_peer) = peer {
346 let aleo_addr = if self.node_type() == NodeType::BootstrapClient
351 && conn_peer.connection_mode == ConnectionMode::Router
352 {
353 None
354 } else {
355 Some(conn_peer.aleo_addr)
356 };
357 self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
358 peer.downgrade_to_candidate(listener_addr);
359 true
360 } else {
361 peer.downgrade_to_candidate(listener_addr);
362 false
363 }
364 }
365
366 fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
370 let trusted_peers = self.trusted_peers();
371
372 let mut peer_pool = self.peer_pool().write();
375
376 let mut num_updates: usize = 0;
378 listener_addrs.retain(|&(addr, height)| {
379 !self.is_ip_banned(addr.ip())
380 && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
381 && peer_pool
382 .get(&addr)
383 .map(|peer| peer.is_candidate() && height.is_some())
384 .inspect(|is_valid_update| {
385 if *is_valid_update {
386 num_updates += 1
387 }
388 })
389 .unwrap_or(true)
390 });
391
392 if listener_addrs.is_empty() {
394 return;
395 }
396
397 if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
399 && Self::PEER_SLASHING_COUNT != 0
400 {
401 let mut peers_to_slash = peer_pool
403 .iter()
404 .filter_map(|(addr, peer)| {
405 (matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
406 })
407 .collect::<Vec<_>>();
408
409 let known_peers = self.tcp().known_peers().snapshot();
411
412 let default_value = (0, Instant::now());
414 peers_to_slash.sort_unstable_by_key(|addr| {
415 let (num_failures, last_seen) = known_peers
416 .get(&addr.ip())
417 .map(|stats| (stats.failures(), stats.timestamp()))
418 .unwrap_or(default_value);
419 (cmp::Reverse(num_failures), last_seen)
420 });
421
422 peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
424
425 peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
427
428 self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
430 }
431
432 listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
434
435 if listener_addrs.is_empty() {
437 return;
438 }
439
440 for (addr, height) in listener_addrs {
442 match peer_pool.entry(addr) {
443 Entry::Vacant(entry) => {
444 entry.insert(Peer::new_candidate(addr, false));
445 }
446 Entry::Occupied(mut entry) => {
447 if let Peer::Candidate(peer) = entry.get_mut() {
448 peer.last_height_seen = height;
449 }
450 }
451 }
452 }
453 }
454
455 fn remove_peer(&self, listener_addr: SocketAddr) {
457 self.peer_pool().write().remove(&listener_addr);
458 }
459
460 fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
462 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
463 Some(peer.connected_addr)
464 } else {
465 None
466 }
467 }
468
469 fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
471 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
472 Some(peer.aleo_addr)
473 } else {
474 None
475 }
476 }
477
478 fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
480 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
481 }
482
483 fn is_connected(&self, listener_addr: SocketAddr) -> bool {
485 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
486 }
487
488 fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
490 self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
492 }
493
494 fn is_connecting_or_connected(&self, listener_addr: SocketAddr) -> bool {
496 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting() || peer.is_connected())
497 }
498
499 fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
501 self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
502 }
503
504 fn number_of_peers(&self) -> usize {
506 self.peer_pool().read().len()
507 }
508
509 fn number_of_connected_peers(&self) -> usize {
511 self.peer_pool().read().values().filter(|peer| peer.is_connected()).count()
512 }
513
514 #[cfg(feature = "metrics")]
516 fn number_of_connected_validators(&self) -> Option<usize> {
517 Some(
518 self.peer_pool()
519 .try_read()?
520 .values()
521 .filter(|peer| peer.as_connected().is_some_and(|peer| peer.is_validator()))
522 .count(),
523 )
524 }
525
526 #[cfg(feature = "metrics")]
528 fn number_of_connecting_peers(&self) -> Option<usize> {
529 Some(self.peer_pool().try_read()?.values().filter(|peer| peer.is_connecting()).count())
530 }
531
532 fn number_of_candidate_peers(&self) -> usize {
534 self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
535 }
536
537 fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
539 if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
540 Some(peer.clone())
541 } else {
542 None
543 }
544 }
545
546 fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
549 &self,
550 listener_addr: &SocketAddr,
551 mut update_fn: F,
552 ) -> bool {
553 if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
554 update_fn(peer);
555 true
556 } else {
557 false
558 }
559 }
560
561 fn get_peers(&self) -> Vec<Peer<N>> {
563 self.peer_pool().read().values().cloned().collect()
564 }
565
566 fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
568 self.filter_connected_peers(|_| true)
569 }
570
571 fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
574 let mut peers = self.get_connected_peers();
576 let known_peers = self.tcp().known_peers().snapshot();
578
579 peers.sort_unstable_by_key(|peer| {
581 if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
582 (cmp::Reverse(peer.last_height_seen), peer_stats.failures())
584 } else {
585 (cmp::Reverse(peer.last_height_seen), 0)
587 }
588 });
589 if let Some(max) = max_entries {
590 peers.truncate(max);
591 }
592
593 peers
594 }
595
596 fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
598 self.peer_pool()
599 .read()
600 .values()
601 .filter_map(|p| {
602 if let Peer::Connected(peer) = p
603 && predicate(peer)
604 {
605 Some(peer)
606 } else {
607 None
608 }
609 })
610 .cloned()
611 .collect()
612 }
613
614 fn connected_peers(&self) -> Vec<SocketAddr> {
616 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
617 }
618
619 fn trusted_peers(&self) -> Vec<SocketAddr> {
621 self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
622 }
623
624 fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
626 self.peer_pool()
627 .read()
628 .values()
629 .filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
630 .collect()
631 }
632
633 fn get_trusted_candidate_peers(&self) -> Vec<CandidatePeer> {
635 self.peer_pool()
636 .read()
637 .values()
638 .filter_map(|peer| {
639 if let Peer::Candidate(peer) = peer
640 && peer.trusted
641 {
642 Some(peer.clone())
643 } else {
644 None
645 }
646 })
647 .collect()
648 }
649
650 fn load_cached_peers(path: &Path) -> Result<Vec<SocketAddr>> {
653 let peers = match fs::read_to_string(path) {
654 Ok(cached_peers_str) => {
655 let mut cached_peers = Vec::new();
656 for peer_addr_str in cached_peers_str.lines() {
657 match SocketAddr::from_str(peer_addr_str) {
658 Ok(addr) => cached_peers.push(addr),
659 Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
660 }
661 }
662 cached_peers
663 }
664 Err(error) if error.kind() == io::ErrorKind::NotFound => {
665 Vec::new()
667 }
668 Err(error) => {
669 warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, path.display());
670 Vec::new()
671 }
672 };
673
674 Ok(peers)
675 }
676
677 fn save_best_peers(&self, path: &Path, max_entries: Option<usize>, store_ports: bool) -> Result<()> {
685 let mut peers = self.get_peers();
687
688 let known_peers = self.tcp().known_peers().snapshot();
690
691 peers.sort_unstable_by_key(|peer| {
693 if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
694 (cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
696 } else {
697 (cmp::Reverse(peer.last_height_seen()), 0)
699 }
700 });
701 if let Some(max) = max_entries {
702 peers.truncate(max);
703 }
704
705 let addrs: HashSet<_> = peers
707 .iter()
708 .map(
709 |peer| {
710 if store_ports { peer.listener_addr().to_string() } else { peer.listener_addr().ip().to_string() }
711 },
712 )
713 .collect();
714
715 let mut file = fs::File::create(path)?;
716 for addr in addrs {
717 writeln!(file, "{addr}")?;
718 }
719
720 Ok(())
721 }
722
723 fn add_connecting_peer(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
728 match self.peer_pool().write().entry(listener_addr) {
729 Entry::Vacant(entry) => {
730 entry.insert(Peer::new_connecting(listener_addr, false));
731 Ok(())
732 }
733 Entry::Occupied(mut entry) => match entry.get() {
734 peer @ Peer::Candidate(_) => {
735 entry.insert(Peer::new_connecting(listener_addr, peer.is_trusted()));
736 Ok(())
737 }
738 Peer::Connecting(_) => Err(ConnectError::AlreadyConnecting { address: listener_addr }),
739 Peer::Connected(_) => Err(ConnectError::AlreadyConnected { address: listener_addr }),
740 },
741 }
742 }
743
744 fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
747 if self.is_dev() {
749 return;
750 }
751
752 let ip = listener_addr.ip();
753 debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
754
755 self.tcp().banned_peers().update_ip_ban(ip);
757
758 self.disconnect(listener_addr);
760 self.remove_peer(listener_addr);
762 }
763
764 fn is_ip_banned(&self, ip: IpAddr) -> bool {
766 self.tcp().banned_peers().is_ip_banned(&ip)
767 }
768
769 fn update_ip_ban(&self, ip: IpAddr) {
771 self.tcp().banned_peers().update_ip_ban(ip);
772 }
773}