use crate::{CandidatePeer, ConnectedPeer, ConnectionMode, NodeType, Peer, Resolver};
#[cfg(test)]
mod tests {
use super::*;
use crate::Peer;
use snarkos_node_tcp::{Config, P2P, Tcp};
use snarkvm::{prelude::Rng, utilities::TestRng};
use std::{collections::HashMap, net::SocketAddr, time::Instant};
type CurrentNetwork = snarkvm::prelude::MainnetV0;
struct MockPeerPool<N: Network> {
tcp: Tcp,
peer_pool: RwLock<HashMap<SocketAddr, Peer<N>>>,
resolver: RwLock<Resolver<N>>,
}
impl<N: Network> MockPeerPool<N> {
fn new() -> Self {
let config = Config { listener_ip: None, ..Default::default() };
Self { tcp: Tcp::new(config), peer_pool: Default::default(), resolver: Default::default() }
}
}
impl<N: Network> P2P for MockPeerPool<N> {
fn tcp(&self) -> &Tcp {
&self.tcp
}
}
impl<N: Network> PeerPoolHandling<N> for MockPeerPool<N> {
const MAXIMUM_POOL_SIZE: usize = 100;
const OWNER: &str = "MockPeerPool";
const PEER_SLASHING_COUNT: usize = 10;
fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
&self.peer_pool
}
fn resolver(&self) -> &RwLock<Resolver<N>> {
&self.resolver
}
fn is_dev(&self) -> bool {
false
}
fn trusted_peers_only(&self) -> bool {
false
}
fn node_type(&self) -> NodeType {
NodeType::Client
}
}
fn make_connected_peer(port: u16, node_type: NodeType, rng: &mut TestRng) -> (SocketAddr, Peer<CurrentNetwork>) {
use snarkvm::prelude::Address;
let listener_addr = SocketAddr::from(([127, 0, 0, 1], port));
let connected_addr = SocketAddr::from(([127, 0, 0, 1], port + 10000));
let now = Instant::now();
let peer = Peer::Connected(ConnectedPeer {
listener_addr,
connected_addr,
connection_mode: ConnectionMode::Router,
trusted: false,
aleo_addr: Address::<CurrentNetwork>::new(rng.random()),
node_type,
version: 1,
snarkos_sha: None,
last_height_seen: None,
first_seen: now,
last_seen: now,
});
(listener_addr, peer)
}
#[test]
fn test_peer_state_transitions() {
use snarkvm::prelude::Address;
let pool = MockPeerPool::<CurrentNetwork>::new();
let mut rng = TestRng::default();
let listener_addr = SocketAddr::from(([192, 0, 2, 1], 4000));
let connected_addr = SocketAddr::from(([192, 0, 2, 1], 14000));
let aleo_addr = Address::<CurrentNetwork>::new(rng.random());
pool.peer_pool().write().insert(listener_addr, Peer::new_candidate(listener_addr, false));
assert_eq!(pool.number_of_candidate_peers(), 1);
assert_eq!(pool.number_of_connecting_peers(), Some(0));
assert_eq!(pool.number_of_connected_peers(), 0);
assert!(!pool.is_connecting(listener_addr));
assert!(!pool.is_connected(listener_addr));
assert!(pool.add_connecting_peer(listener_addr).is_ok());
assert_eq!(pool.number_of_candidate_peers(), 0);
assert_eq!(pool.number_of_connecting_peers(), Some(1));
assert_eq!(pool.number_of_connected_peers(), 0);
assert!(pool.is_connecting(listener_addr));
assert!(!pool.is_connected(listener_addr));
pool.peer_pool().write().get_mut(&listener_addr).unwrap().upgrade_to_connected(
connected_addr,
listener_addr.port(),
aleo_addr,
NodeType::Validator,
1,
None,
ConnectionMode::Router,
);
assert_eq!(pool.number_of_candidate_peers(), 0);
assert_eq!(pool.number_of_connecting_peers(), Some(0));
assert_eq!(pool.number_of_connected_peers(), 1);
assert!(!pool.is_connecting(listener_addr));
assert!(pool.is_connected(listener_addr));
assert_eq!(pool.number_of_connected_validators(), Some(1));
let connected = pool.get_connected_peer(listener_addr).expect("peer should be connected");
assert_eq!(connected.listener_addr, listener_addr);
assert_eq!(connected.connected_addr, connected_addr);
assert_eq!(connected.aleo_addr, aleo_addr);
assert_eq!(connected.node_type, NodeType::Validator);
}
#[test]
fn test_number_of_connected_validators() {
let pool = MockPeerPool::<CurrentNetwork>::new();
let mut rng = TestRng::default();
assert_eq!(pool.number_of_connected_validators(), Some(0));
let (addr1, peer1) = make_connected_peer(3000, NodeType::Validator, &mut rng);
let (addr2, peer2) = make_connected_peer(3001, NodeType::Validator, &mut rng);
let (addr3, peer3) = make_connected_peer(3002, NodeType::Client, &mut rng);
{
let mut pool_write = pool.peer_pool().write();
pool_write.insert(addr1, peer1);
pool_write.insert(addr2, peer2);
pool_write.insert(addr3, peer3);
}
assert_eq!(pool.number_of_connected_validators(), Some(2));
assert_eq!(pool.number_of_connected_peers(), 3);
let candidate_addr = SocketAddr::from(([127, 0, 0, 1], 3003));
pool.peer_pool().write().insert(candidate_addr, Peer::new_candidate(candidate_addr, false));
assert_eq!(pool.number_of_connected_validators(), Some(2));
assert_eq!(pool.number_of_connected_peers(), 3);
}
}
use snarkos_node_tcp::{ConnectError, P2P, is_bogon_ip, is_unspecified_or_broadcast_ip};
use snarkvm::prelude::{Address, Network};
use anyhow::Result;
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use std::{
cmp,
collections::{
HashSet,
hash_map::{Entry, HashMap},
},
fs,
io::{self, Write},
net::{IpAddr, SocketAddr},
path::Path,
str::FromStr,
time::Instant,
};
use tokio::task;
use tracing::*;
#[derive(Debug)]
pub enum PeeringError {
NoExternalPeersAllowed,
}
impl snarkos_node_tcp::ApplicationError for PeeringError {}
impl std::fmt::Display for PeeringError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NoExternalPeersAllowed => write!(f, "no untrusted peers allowed"),
}
}
}
pub trait PeerPoolHandling<N: Network>: P2P {
const OWNER: &str;
const MAXIMUM_POOL_SIZE: usize;
const PEER_SLASHING_COUNT: usize;
fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>>;
fn resolver(&self) -> &RwLock<Resolver<N>>;
fn is_dev(&self) -> bool;
fn trusted_peers_only(&self) -> bool;
fn node_type(&self) -> NodeType;
fn local_ip(&self) -> SocketAddr {
self.tcp().listening_addr().expect("The TCP listener is not enabled")
}
fn is_local_ip(&self, addr: SocketAddr) -> bool {
addr == self.local_ip()
|| (addr.ip().is_unspecified() || addr.ip().is_loopback()) && addr.port() == self.local_ip().port()
}
fn is_valid_peer_ip(&self, ip: SocketAddr) -> bool {
!self.is_local_ip(ip) && !is_bogon_ip(ip.ip()) && !is_unspecified_or_broadcast_ip(ip.ip())
}
fn max_connected_peers(&self) -> usize {
self.tcp().config().max_connections as usize
}
fn check_connection_attempt(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
if self.is_local_ip(listener_addr) {
return Err(ConnectError::SelfConnect { address: listener_addr });
}
if self.number_of_connected_peers() >= self.max_connected_peers() {
return Err(ConnectError::MaximumConnectionsReached { limit: self.max_connected_peers() as u16 });
}
if self.is_connected(listener_addr) {
return Err(ConnectError::AlreadyConnected { address: listener_addr });
}
if self.is_connecting(listener_addr) {
return Err(ConnectError::AlreadyConnecting { address: listener_addr });
}
if self.is_ip_banned(listener_addr.ip()) {
return Err(ConnectError::BannedIp { ip: listener_addr.ip() });
}
if self.trusted_peers_only() && !self.is_trusted(listener_addr) {
return Err(ConnectError::application(PeeringError::NoExternalPeersAllowed));
}
Ok(())
}
fn connect(&self, listener_addr: SocketAddr) -> Result<task::JoinHandle<Result<(), ConnectError>>, ConnectError> {
self.check_connection_attempt(listener_addr)?;
if let Some(Peer::Candidate(peer)) = self.peer_pool().write().get_mut(&listener_addr) {
peer.last_connection_attempt = Some(Instant::now());
peer.total_connection_attempts += 1;
} else {
warn!("{} No candidate peer entry exists for '{listener_addr:?}' while connecting.", Self::OWNER);
}
let tcp = self.tcp().clone();
Ok(tokio::spawn(async move {
debug!("{} Connecting to {listener_addr}...", Self::OWNER);
tcp.connect(listener_addr).await
}))
}
fn disconnect(&self, listener_addr: SocketAddr) -> task::JoinHandle<bool> {
if let Some(connected_addr) = self.resolve_to_ambiguous(listener_addr) {
let tcp = self.tcp().clone();
tokio::spawn(async move { tcp.disconnect(connected_addr).await })
} else {
tokio::spawn(async { false })
}
}
fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) -> bool {
let mut peer_pool = self.peer_pool().write();
let Some(peer) = peer_pool.get_mut(&listener_addr) else {
trace!("{} Downgrade peer to candidate failed - peer not found", Self::OWNER);
return false;
};
if let Peer::Connected(conn_peer) = peer {
let aleo_addr = if self.node_type() == NodeType::BootstrapClient
&& conn_peer.connection_mode == ConnectionMode::Router
{
None
} else {
Some(conn_peer.aleo_addr)
};
self.resolver().write().remove_peer(conn_peer.connected_addr, aleo_addr);
peer.downgrade_to_candidate(listener_addr);
true
} else {
peer.downgrade_to_candidate(listener_addr);
false
}
}
fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
let trusted_peers = self.trusted_peers();
let mut peer_pool = self.peer_pool().write();
let mut num_updates: usize = 0;
listener_addrs.retain(|&(addr, height)| {
!self.is_ip_banned(addr.ip())
&& if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
&& peer_pool
.get(&addr)
.map(|peer| peer.is_candidate() && height.is_some())
.inspect(|is_valid_update| {
if *is_valid_update {
num_updates += 1
}
})
.unwrap_or(true)
});
if listener_addrs.is_empty() {
return;
}
if peer_pool.len() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
&& Self::PEER_SLASHING_COUNT != 0
{
let mut peers_to_slash = peer_pool
.iter()
.filter_map(|(addr, peer)| {
(matches!(peer, Peer::Candidate(_)) && !trusted_peers.contains(addr)).then_some(*addr)
})
.collect::<Vec<_>>();
let known_peers = self.tcp().known_peers().snapshot();
let default_value = (0, Instant::now());
peers_to_slash.sort_unstable_by_key(|addr| {
let (num_failures, last_seen) = known_peers
.get(&addr.ip())
.map(|stats| (stats.failures(), stats.timestamp()))
.unwrap_or(default_value);
(cmp::Reverse(num_failures), last_seen)
});
peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
self.tcp().known_peers().batch_remove(peers_to_slash.iter().map(|addr| addr.ip()));
}
listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
if listener_addrs.is_empty() {
return;
}
for (addr, height) in listener_addrs {
match peer_pool.entry(addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::new_candidate(addr, false));
}
Entry::Occupied(mut entry) => {
if let Peer::Candidate(peer) = entry.get_mut() {
peer.last_height_seen = height;
}
}
}
}
}
fn remove_peer(&self, listener_addr: SocketAddr) {
self.peer_pool().write().remove(&listener_addr);
}
fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option<SocketAddr> {
if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
Some(peer.connected_addr)
} else {
None
}
}
fn resolve_to_aleo_addr(&self, listener_addr: SocketAddr) -> Option<Address<N>> {
if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
Some(peer.aleo_addr)
} else {
None
}
}
fn is_connecting(&self, listener_addr: SocketAddr) -> bool {
self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting())
}
fn is_connected(&self, listener_addr: SocketAddr) -> bool {
self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected())
}
fn is_connected_address(&self, aleo_address: Address<N>) -> bool {
self.resolver().read().get_peer_ip_for_address(aleo_address).is_some()
}
fn is_connecting_or_connected(&self, listener_addr: SocketAddr) -> bool {
self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting() || peer.is_connected())
}
fn is_trusted(&self, listener_addr: SocketAddr) -> bool {
self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted())
}
fn number_of_peers(&self) -> usize {
self.peer_pool().read().len()
}
fn number_of_connected_peers(&self) -> usize {
self.peer_pool().read().values().filter(|peer| peer.is_connected()).count()
}
#[cfg(feature = "metrics")]
fn number_of_connected_validators(&self) -> Option<usize> {
Some(
self.peer_pool()
.try_read()?
.values()
.filter(|peer| peer.as_connected().is_some_and(|peer| peer.is_validator()))
.count(),
)
}
#[cfg(feature = "metrics")]
fn number_of_connecting_peers(&self) -> Option<usize> {
Some(self.peer_pool().try_read()?.values().filter(|peer| peer.is_connecting()).count())
}
fn number_of_candidate_peers(&self) -> usize {
self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count()
}
fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option<ConnectedPeer<N>> {
if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) {
Some(peer.clone())
} else {
None
}
}
fn update_connected_peer<F: FnMut(&mut ConnectedPeer<N>)>(
&self,
listener_addr: &SocketAddr,
mut update_fn: F,
) -> bool {
if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) {
update_fn(peer);
true
} else {
false
}
}
fn get_peers(&self) -> Vec<Peer<N>> {
self.peer_pool().read().values().cloned().collect()
}
fn get_connected_peers(&self) -> Vec<ConnectedPeer<N>> {
self.filter_connected_peers(|_| true)
}
fn get_best_connected_peers(&self, max_entries: Option<usize>) -> Vec<ConnectedPeer<N>> {
let mut peers = self.get_connected_peers();
let known_peers = self.tcp().known_peers().snapshot();
peers.sort_unstable_by_key(|peer| {
if let Some(peer_stats) = known_peers.get(&peer.listener_addr.ip()) {
(cmp::Reverse(peer.last_height_seen), peer_stats.failures())
} else {
(cmp::Reverse(peer.last_height_seen), 0)
}
});
if let Some(max) = max_entries {
peers.truncate(max);
}
peers
}
fn filter_connected_peers<P: FnMut(&ConnectedPeer<N>) -> bool>(&self, mut predicate: P) -> Vec<ConnectedPeer<N>> {
self.peer_pool()
.read()
.values()
.filter_map(|p| {
if let Peer::Connected(peer) = p
&& predicate(peer)
{
Some(peer)
} else {
None
}
})
.cloned()
.collect()
}
fn connected_peers(&self) -> Vec<SocketAddr> {
self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect()
}
fn trusted_peers(&self) -> Vec<SocketAddr> {
self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect()
}
fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
self.peer_pool()
.read()
.values()
.filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
.collect()
}
fn get_trusted_candidate_peers(&self) -> Vec<CandidatePeer> {
self.peer_pool()
.read()
.values()
.filter_map(|peer| {
if let Peer::Candidate(peer) = peer
&& peer.trusted
{
Some(peer.clone())
} else {
None
}
})
.collect()
}
fn load_cached_peers(path: &Path) -> Result<Vec<SocketAddr>> {
let peers = match fs::read_to_string(path) {
Ok(cached_peers_str) => {
let mut cached_peers = Vec::new();
for peer_addr_str in cached_peers_str.lines() {
match SocketAddr::from_str(peer_addr_str) {
Ok(addr) => cached_peers.push(addr),
Err(error) => warn!("Couldn't parse the cached peer address '{peer_addr_str}': {error}"),
}
}
cached_peers
}
Err(error) if error.kind() == io::ErrorKind::NotFound => {
Vec::new()
}
Err(error) => {
warn!("{} Couldn't load cached peers at {}: {error}", Self::OWNER, path.display());
Vec::new()
}
};
Ok(peers)
}
fn save_best_peers(&self, path: &Path, max_entries: Option<usize>, store_ports: bool) -> Result<()> {
let mut peers = self.get_peers();
let known_peers = self.tcp().known_peers().snapshot();
peers.sort_unstable_by_key(|peer| {
if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) {
(cmp::Reverse(peer.last_height_seen()), peer_stats.failures())
} else {
(cmp::Reverse(peer.last_height_seen()), 0)
}
});
if let Some(max) = max_entries {
peers.truncate(max);
}
let addrs: HashSet<_> = peers
.iter()
.map(
|peer| {
if store_ports { peer.listener_addr().to_string() } else { peer.listener_addr().ip().to_string() }
},
)
.collect();
let mut file = fs::File::create(path)?;
for addr in addrs {
writeln!(file, "{addr}")?;
}
Ok(())
}
fn add_connecting_peer(&self, listener_addr: SocketAddr) -> Result<(), ConnectError> {
match self.peer_pool().write().entry(listener_addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::new_connecting(listener_addr, false));
Ok(())
}
Entry::Occupied(mut entry) => match entry.get() {
peer @ Peer::Candidate(_) => {
entry.insert(Peer::new_connecting(listener_addr, peer.is_trusted()));
Ok(())
}
Peer::Connecting(_) => Err(ConnectError::AlreadyConnecting { address: listener_addr }),
Peer::Connected(_) => Err(ConnectError::AlreadyConnected { address: listener_addr }),
},
}
}
fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) {
if self.is_dev() {
return;
}
let ip = listener_addr.ip();
debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default());
self.tcp().banned_peers().update_ip_ban(ip);
self.disconnect(listener_addr);
self.remove_peer(listener_addr);
}
fn is_ip_banned(&self, ip: IpAddr) -> bool {
self.tcp().banned_peers().is_ip_banned(&ip)
}
fn update_ip_ban(&self, ip: IpAddr) {
self.tcp().banned_peers().update_ip_ban(ip);
}
}