use std::borrow::Borrow;
use dashmap::mapref::entry::Entry;
use dashmap::mapref::one::RefMut;
use dashmap::DashMap;
use libp2p::{identify, swarm::ConnectionId, Multiaddr, PeerId};
use rand::seq::SliceRandom;
use serde::Serialize;
use smallvec::SmallVec;
use tokio::sync::watch;
#[derive(Debug)]
pub struct PeerTracker {
peers: DashMap<PeerId, PeerInfo>,
info_tx: watch::Sender<PeerTrackerInfo>,
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct PeerTrackerInfo {
pub num_connected_peers: u64,
pub num_connected_trusted_peers: u64,
}
#[derive(Debug)]
struct PeerInfo {
state: PeerState,
addrs: SmallVec<[Multiaddr; 4]>,
connections: SmallVec<[ConnectionId; 1]>,
trusted: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeerState {
Discovered,
AddressesFound,
Connected,
Identified,
}
impl PeerInfo {
fn is_connected(&self) -> bool {
matches!(self.state, PeerState::Connected | PeerState::Identified)
}
}
impl PeerTracker {
pub fn new() -> Self {
PeerTracker {
peers: DashMap::new(),
info_tx: watch::channel(PeerTrackerInfo::default()).0,
}
}
pub fn info(&self) -> PeerTrackerInfo {
self.info_tx.borrow().to_owned()
}
pub fn info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
self.info_tx.subscribe()
}
pub fn set_maybe_discovered(&self, peer: PeerId) -> bool {
match self.peers.entry(peer) {
Entry::Vacant(entry) => {
entry.insert(PeerInfo {
state: PeerState::Discovered,
addrs: SmallVec::new(),
connections: SmallVec::new(),
trusted: false,
});
true
}
Entry::Occupied(_) => false,
}
}
fn get(&self, peer: PeerId) -> RefMut<PeerId, PeerInfo> {
self.peers.entry(peer).or_insert_with(|| PeerInfo {
state: PeerState::Discovered,
addrs: SmallVec::new(),
connections: SmallVec::new(),
trusted: false,
})
}
pub fn add_addresses<I, A>(&self, peer: PeerId, addrs: I)
where
I: IntoIterator<Item = A>,
A: Borrow<Multiaddr>,
{
let mut state = self.get(peer);
for addr in addrs {
let addr = addr.borrow();
if !state.addrs.contains(addr) {
state.addrs.push(addr.to_owned());
}
}
if state.state == PeerState::Discovered && !state.addrs.is_empty() {
state.state = PeerState::AddressesFound;
}
}
pub fn set_trusted(&self, peer: PeerId, is_trusted: bool) {
let mut peer_info = self.get(peer);
if peer_info.trusted == is_trusted {
return;
}
peer_info.trusted = is_trusted;
if peer_info.is_connected() {
self.info_tx.send_modify(|tracker_info| {
if is_trusted {
tracker_info.num_connected_trusted_peers += 1;
} else {
tracker_info.num_connected_trusted_peers -= 1;
}
});
}
}
pub fn set_connected(
&self,
peer: PeerId,
connection_id: ConnectionId,
address: impl Into<Option<Multiaddr>>,
) {
let mut peer_info = self.get(peer);
if let Some(address) = address.into() {
if !peer_info.addrs.contains(&address) {
peer_info.addrs.push(address);
}
}
peer_info.connections.push(connection_id);
if !peer_info.is_connected() {
peer_info.state = PeerState::Connected;
increment_connected_peers(&self.info_tx, peer_info.trusted);
}
}
pub fn set_maybe_disconnected(&self, peer: PeerId, connection_id: ConnectionId) -> bool {
let mut peer_info = self.get(peer);
peer_info.connections.retain(|id| *id != connection_id);
if peer_info.connections.is_empty() {
if peer_info.addrs.is_empty() {
peer_info.state = PeerState::Discovered;
} else {
peer_info.state = PeerState::AddressesFound;
}
decrement_connected_peers(&self.info_tx, peer_info.trusted);
true
} else {
false
}
}
pub fn set_identified(&self, peer: PeerId, info: &identify::Info) {
let mut peer_info = self.get(peer);
for addr in &info.listen_addrs {
if !peer_info.addrs.contains(addr) {
peer_info.addrs.push(addr.to_owned());
}
}
peer_info.state = PeerState::Identified;
}
pub fn is_connected(&self, peer: PeerId) -> bool {
self.get(peer).is_connected()
}
pub fn addresses(&self, peer: PeerId) -> SmallVec<[Multiaddr; 4]> {
self.get(peer).addrs.clone()
}
pub fn remove(&self, peer: PeerId) {
self.peers.remove(&peer);
}
pub fn connected_peers(&self) -> Vec<PeerId> {
self.peers
.iter()
.filter(|pair| pair.value().is_connected())
.map(|pair| pair.key().to_owned())
.collect()
}
pub fn best_peer(&self) -> Option<PeerId> {
const MAX_PEER_SAMPLE: usize = 128;
let mut peers = self
.peers
.iter()
.filter(|pair| pair.value().is_connected())
.take(MAX_PEER_SAMPLE)
.map(|pair| pair.key().to_owned())
.collect::<SmallVec<[_; MAX_PEER_SAMPLE]>>();
peers.shuffle(&mut rand::thread_rng());
peers.first().copied()
}
pub fn best_n_peers(&self, limit: usize) -> Vec<PeerId> {
self.peers
.iter()
.filter(|pair| pair.value().is_connected())
.take(limit)
.map(|pair| pair.key().to_owned())
.collect()
}
pub fn trusted_n_peers(&self, limit: usize) -> Vec<PeerId> {
self.peers
.iter()
.filter(|pair| pair.value().is_connected() && pair.value().trusted)
.take(limit)
.map(|pair| pair.key().to_owned())
.collect()
}
}
impl Default for PeerTracker {
fn default() -> Self {
PeerTracker::new()
}
}
fn increment_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
info_tx.send_modify(|tracker_info| {
tracker_info.num_connected_peers += 1;
if trusted {
tracker_info.num_connected_trusted_peers += 1;
}
});
}
fn decrement_connected_peers(info_tx: &watch::Sender<PeerTrackerInfo>, trusted: bool) {
info_tx.send_modify(|tracker_info| {
tracker_info.num_connected_peers -= 1;
if trusted {
tracker_info.num_connected_trusted_peers -= 1;
}
});
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn trust_before_connect() {
let tracker = PeerTracker::new();
let mut watcher = tracker.info_watcher();
let peer = PeerId::random();
assert!(!watcher.has_changed().unwrap());
tracker.set_trusted(peer, true);
assert!(!watcher.has_changed().unwrap());
tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
assert!(watcher.has_changed().unwrap());
let info = watcher.borrow_and_update().to_owned();
assert_eq!(info.num_connected_peers, 1);
assert_eq!(info.num_connected_trusted_peers, 1);
}
#[test]
fn trust_after_connect() {
let tracker = PeerTracker::new();
let mut watcher = tracker.info_watcher();
let peer = PeerId::random();
assert!(!watcher.has_changed().unwrap());
tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
assert!(watcher.has_changed().unwrap());
let info = watcher.borrow_and_update().to_owned();
assert_eq!(info.num_connected_peers, 1);
assert_eq!(info.num_connected_trusted_peers, 0);
tracker.set_trusted(peer, true);
assert!(watcher.has_changed().unwrap());
let info = watcher.borrow_and_update().to_owned();
assert_eq!(info.num_connected_peers, 1);
assert_eq!(info.num_connected_trusted_peers, 1);
}
#[test]
fn untrust_after_connect() {
let tracker = PeerTracker::new();
let mut watcher = tracker.info_watcher();
let peer = PeerId::random();
assert!(!watcher.has_changed().unwrap());
tracker.set_trusted(peer, true);
assert!(!watcher.has_changed().unwrap());
tracker.set_connected(peer, ConnectionId::new_unchecked(1), None);
assert!(watcher.has_changed().unwrap());
let info = watcher.borrow_and_update().to_owned();
assert_eq!(info.num_connected_peers, 1);
assert_eq!(info.num_connected_trusted_peers, 1);
tracker.set_trusted(peer, false);
assert!(watcher.has_changed().unwrap());
let info = watcher.borrow_and_update().to_owned();
assert_eq!(info.num_connected_peers, 1);
assert_eq!(info.num_connected_trusted_peers, 0);
}
}