use crate::PeerId;
use crate::address::MultiAddr;
use crate::security::{IP_EXACT_LIMIT, IPDiversityConfig, canonicalize_ip, ip_subnet_limit};
use anyhow::{Result, anyhow};
use parking_lot::Mutex as PlMutex;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
/// An `Instant` that can be read and overwritten through a shared reference.
///
/// Backed by a `parking_lot` mutex rather than a true atomic because
/// `Instant` is opaque and cannot live in an atomic cell.
#[derive(Debug)]
pub struct AtomicInstant(PlMutex<Instant>);
impl AtomicInstant {
    /// An instant initialized to the current time.
    pub fn now() -> Self {
        Self::from_instant(Instant::now())
    }
    /// Wraps an existing `Instant`.
    pub fn from_instant(i: Instant) -> Self {
        Self(PlMutex::new(i))
    }
    /// Returns a copy of the stored instant.
    pub fn load(&self) -> Instant {
        let guard = self.0.lock();
        *guard
    }
    /// Resets the stored instant to the current time.
    pub fn store_now(&self) {
        self.store(Instant::now());
    }
    /// Overwrites the stored instant.
    pub fn store(&self, i: Instant) {
        let mut guard = self.0.lock();
        *guard = i;
    }
    /// Time elapsed since the stored instant.
    pub fn elapsed(&self) -> Duration {
        self.load().elapsed()
    }
}
impl Clone for AtomicInstant {
fn clone(&self) -> Self {
Self(PlMutex::new(*self.0.lock()))
}
}
impl Default for AtomicInstant {
fn default() -> Self {
Self::now()
}
}
#[cfg(test)]
use crate::adaptive::trust::DEFAULT_NEUTRAL_TRUST;
/// DHT keys share the 256-bit identifier space of peer IDs (Kademlia XOR metric).
pub type DhtKey = PeerId;
/// Byte-wise XOR of two 256-bit identifiers — the Kademlia distance metric.
#[inline]
fn xor_distance_bytes(a: &[u8; 32], b: &[u8; 32]) -> [u8; 32] {
    std::array::from_fn(|i| a[i] ^ b[i])
}
/// Hard cap on addresses kept per routing-table entry.
const MAX_ADDRESSES_PER_NODE: usize = 8;
/// At most this many NAT-derived addresses are retained per node.
const MAX_NATTED_ADDRESSES: usize = 1;
/// Provenance of an address stored for a peer; determines its priority
/// position inside `NodeInfo::addresses` (relays first, hints last).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddressType {
    /// Relay address fronting the peer (highest priority).
    Relay,
    /// Directly observed address.
    Direct,
    /// Address learned through NAT traversal.
    NATted,
    /// Unverified address suggested by a coordinator (lowest priority).
    CoordinatorHint,
}
/// Cap on coordinator-supplied hint addresses per node.
const MAX_COORDINATOR_HINTS: usize = 5;
/// Peers not seen within this window are considered stale.
const LIVE_THRESHOLD: Duration = Duration::from_secs(900);
/// Default trust score at/above which a candidate may displace a low-trust
/// peer from a full bucket.
#[allow(dead_code)]
const DEFAULT_SWAP_THRESHOLD: f64 = 0.35;
/// A routing-table entry: a peer id plus its known addresses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    pub id: PeerId,
    /// Addresses ordered by priority; kept in lock-step with `address_types`.
    pub addresses: Vec<MultiAddr>,
    /// Parallel provenance tags for `addresses`; missing entries default to
    /// `Direct` (see `address_type_at`).
    #[serde(default)]
    pub address_types: Vec<AddressType>,
    /// Local-only liveness timestamp; reset to "now" on deserialization.
    #[serde(skip, default = "AtomicInstant::now")]
    pub last_seen: AtomicInstant,
}
impl NodeInfo {
    /// First (highest-priority) address as a socket address, if it has one.
    #[must_use]
    pub fn socket_addr(&self) -> Option<SocketAddr> {
        self.addresses.first().and_then(MultiAddr::socket_addr)
    }
    /// IP of the first (highest-priority) address, if any.
    #[must_use]
    pub fn ip(&self) -> Option<IpAddr> {
        self.addresses.first().and_then(MultiAddr::ip)
    }
    /// Canonicalized IPs of all non-hint addresses. Coordinator hints are
    /// unverified third-party claims, so they are excluded from the set used
    /// for IP-diversity accounting.
    fn all_ips(&self) -> HashSet<IpAddr> {
        self.addresses
            .iter()
            .enumerate()
            .filter(|(i, _)| self.address_type_at(*i) != AddressType::CoordinatorHint)
            .filter_map(|(_, a)| a.ip().map(canonicalize_ip))
            .collect()
    }
    /// Drops all existing coordinator hints, then stores up to
    /// `MAX_COORDINATOR_HINTS` new, non-duplicate hints while respecting the
    /// overall `MAX_ADDRESSES_PER_NODE` cap. Returns the number of hints
    /// actually stored.
    pub fn replace_coordinator_hints(&mut self, hints: Vec<MultiAddr>) -> usize {
        // Remove current hints, keeping the two parallel vectors in lock-step.
        let mut i = 0;
        while i < self.address_types.len() {
            if self.address_types[i] == AddressType::CoordinatorHint {
                self.addresses.remove(i);
                self.address_types.remove(i);
            } else {
                i += 1;
            }
        }
        let mut stored = 0;
        for hint in hints.into_iter().take(MAX_COORDINATOR_HINTS) {
            // Fix: stop at the cap *before* pushing, so the returned count
            // never includes hints that the previous push-then-truncate
            // approach silently dropped again.
            if self.addresses.len() >= MAX_ADDRESSES_PER_NODE {
                break;
            }
            if !self.addresses.contains(&hint) {
                self.addresses.push(hint);
                self.address_types.push(AddressType::CoordinatorHint);
                stored += 1;
            }
        }
        // Defensive: enforce the cap on pre-existing data as well.
        self.addresses.truncate(MAX_ADDRESSES_PER_NODE);
        self.address_types.truncate(MAX_ADDRESSES_PER_NODE);
        stored
    }
    /// Merges `addr` as a plain `Direct` address.
    pub fn merge_address(&mut self, addr: MultiAddr) {
        self.merge_typed_address(addr, AddressType::Direct);
    }
    /// Inserts (or re-files) `addr` at the priority position for `addr_type`:
    /// relays first, then direct, then NATted, then coordinator hints.
    /// Per-type caps evict the oldest entries of that type, and the total
    /// list is capped at `MAX_ADDRESSES_PER_NODE`.
    pub fn merge_typed_address(&mut self, addr: MultiAddr, addr_type: AddressType) {
        // Repair a shorter types vector (e.g. deserialized without types).
        while self.address_types.len() < self.addresses.len() {
            self.address_types.push(AddressType::Direct);
        }
        // Remove a pre-existing copy so the address is re-filed under the new type.
        if let Some(pos) = self.addresses.iter().position(|a| a == &addr) {
            self.addresses.remove(pos);
            if pos < self.address_types.len() {
                self.address_types.remove(pos);
            }
        }
        match addr_type {
            AddressType::Relay => {
                // Relays always go to the front.
                self.addresses.insert(0, addr);
                self.address_types.insert(0, AddressType::Relay);
            }
            AddressType::Direct => {
                // Directly after the relay prefix.
                let pos = self
                    .address_types
                    .iter()
                    .position(|t| *t != AddressType::Relay)
                    .unwrap_or(self.addresses.len());
                self.addresses.insert(pos, addr);
                self.address_types.insert(pos, AddressType::Direct);
            }
            AddressType::NATted => {
                // Before any coordinator hints.
                let pos = self
                    .address_types
                    .iter()
                    .position(|t| *t == AddressType::CoordinatorHint)
                    .unwrap_or(self.addresses.len());
                self.addresses.insert(pos, addr);
                self.address_types.insert(pos, AddressType::NATted);
                let natted_count = self
                    .address_types
                    .iter()
                    .filter(|t| **t == AddressType::NATted)
                    .count();
                if natted_count > MAX_NATTED_ADDRESSES {
                    // Evict the oldest NATted entries (front-most first).
                    let mut to_remove = natted_count - MAX_NATTED_ADDRESSES;
                    let mut i = 0;
                    while i < self.address_types.len() && to_remove > 0 {
                        if self.address_types[i] == AddressType::NATted {
                            self.addresses.remove(i);
                            self.address_types.remove(i);
                            to_remove -= 1;
                        } else {
                            i += 1;
                        }
                    }
                }
            }
            AddressType::CoordinatorHint => {
                // Hints go to the back.
                self.addresses.push(addr);
                self.address_types.push(AddressType::CoordinatorHint);
                let hint_count = self
                    .address_types
                    .iter()
                    .filter(|t| **t == AddressType::CoordinatorHint)
                    .count();
                if hint_count > MAX_COORDINATOR_HINTS {
                    // Evict the oldest hints (front-most first).
                    let mut to_remove = hint_count - MAX_COORDINATOR_HINTS;
                    let mut i = 0;
                    while i < self.address_types.len() && to_remove > 0 {
                        if self.address_types[i] == AddressType::CoordinatorHint {
                            self.addresses.remove(i);
                            self.address_types.remove(i);
                            to_remove -= 1;
                        } else {
                            i += 1;
                        }
                    }
                }
            }
        }
        self.addresses.truncate(MAX_ADDRESSES_PER_NODE);
        self.address_types.truncate(MAX_ADDRESSES_PER_NODE);
    }
    /// Type tag of the address at `index`; defaults to `Direct` when the
    /// types vector is shorter than the address list.
    pub fn address_type_at(&self, index: usize) -> AddressType {
        self.address_types
            .get(index)
            .copied()
            .unwrap_or(AddressType::Direct)
    }
}
/// One Kademlia k-bucket. `nodes` is kept ordered least- to most-recently
/// seen (tail = freshest entry).
struct KBucket {
    nodes: Vec<NodeInfo>,
    max_size: usize,
    /// Last time this bucket was modified; drives periodic refresh.
    last_refreshed: Instant,
}
impl KBucket {
    /// Empty bucket holding at most `max_size` peers.
    fn new(max_size: usize) -> Self {
        Self {
            nodes: Vec::new(),
            max_size,
            last_refreshed: Instant::now(),
        }
    }
    /// Inserts `node`, or merges its addresses into the existing entry and
    /// moves it to the recency tail.
    ///
    /// Fails when the node carries no addresses or the bucket is full.
    fn add_node(&mut self, mut node: NodeInfo) -> Result<()> {
        if node.addresses.is_empty() {
            return Err(anyhow!("NodeInfo has no addresses"));
        }
        node.addresses.truncate(MAX_ADDRESSES_PER_NODE);
        node.address_types.truncate(MAX_ADDRESSES_PER_NODE);
        if let Some(pos) = self.nodes.iter().position(|n| n.id == node.id) {
            let mut existing = self.nodes.remove(pos);
            // Adopt the incoming node's last-seen timestamp.
            existing.last_seen.store(node.last_seen.load());
            for (i, addr) in node.addresses.into_iter().enumerate() {
                let addr_type = node
                    .address_types
                    .get(i)
                    .copied()
                    .unwrap_or(AddressType::Direct);
                existing.merge_typed_address(addr, addr_type);
            }
            self.nodes.push(existing);
            self.last_refreshed = Instant::now();
            return Ok(());
        }
        if self.nodes.len() < self.max_size {
            self.nodes.push(node);
            self.last_refreshed = Instant::now();
            Ok(())
        } else {
            Err(anyhow!(
                "K-bucket at capacity ({}/{})",
                self.nodes.len(),
                self.max_size
            ))
        }
    }
    /// Removes the peer if present; no-op otherwise.
    fn remove_node(&mut self, node_id: &PeerId) {
        self.nodes.retain(|n| &n.id != node_id);
    }
    /// Marks a peer as just-seen, optionally merging a newly observed
    /// address, and moves it to the recency tail. Returns false if unknown.
    fn touch_node_typed(
        &mut self,
        node_id: &PeerId,
        address: Option<&MultiAddr>,
        addr_type: AddressType,
    ) -> bool {
        if let Some(pos) = self.nodes.iter().position(|n| &n.id == node_id) {
            self.nodes[pos].last_seen.store_now();
            if let Some(addr) = address {
                // Don't let a loopback observation displace real addresses.
                let addr_is_loopback = addr
                    .ip()
                    .is_some_and(|ip| canonicalize_ip(ip).is_loopback());
                let node_has_non_loopback = self.nodes[pos]
                    .addresses
                    .iter()
                    .any(|a| a.ip().is_some_and(|ip| !canonicalize_ip(ip).is_loopback()));
                if !(addr_is_loopback && node_has_non_loopback) {
                    self.nodes[pos].merge_typed_address(addr.clone(), addr_type);
                }
            }
            let node = self.nodes.remove(pos);
            self.nodes.push(node);
            self.last_refreshed = Instant::now();
            true
        } else {
            false
        }
    }
    /// Read-lock fast path for touching a peer.
    ///
    /// Returns `Some(false)` when the peer is unknown, `Some(true)` when only
    /// `last_seen` needed updating (done here via `AtomicInstant`'s interior
    /// mutability), and `None` when an address merge is required so the
    /// caller must retry under a write lock.
    ///
    /// NOTE(review): unlike `touch_node_typed`, this path does not move the
    /// entry to the recency tail — looks intentional to keep `&self`, but
    /// confirm bucket ordering is not relied on elsewhere.
    fn touch_last_seen_if_merge_noop(
        &self,
        node_id: &PeerId,
        address: Option<&MultiAddr>,
        addr_type: AddressType,
    ) -> Option<bool> {
        let Some(pos) = self.nodes.iter().position(|n| &n.id == node_id) else {
            return Some(false);
        };
        let node = &self.nodes[pos];
        let merge_is_noop = match address {
            None => true,
            Some(addr) => {
                if let Some(existing_pos) = node.addresses.iter().position(|a| a == addr) {
                    // Known address: a merge only changes state if the type differs.
                    node.address_type_at(existing_pos) == addr_type
                } else {
                    // New address: the write path would skip it anyway when it
                    // is loopback and real addresses already exist.
                    let addr_is_loopback = addr
                        .ip()
                        .is_some_and(|ip| canonicalize_ip(ip).is_loopback());
                    let node_has_non_loopback = node
                        .addresses
                        .iter()
                        .any(|a| a.ip().is_some_and(|ip| !canonicalize_ip(ip).is_loopback()));
                    addr_is_loopback && node_has_non_loopback
                }
            }
        };
        if merge_is_noop {
            node.last_seen.store_now();
            Some(true)
        } else {
            None
        }
    }
    /// All peers, least- to most-recently seen.
    fn get_nodes(&self) -> &[NodeInfo] {
        &self.nodes
    }
    /// Looks up a peer by id.
    fn find_node(&self, node_id: &PeerId) -> Option<&NodeInfo> {
        self.nodes.iter().find(|n| &n.id == node_id)
    }
}
/// Kademlia routing table: 256 k-buckets indexed by the position of the most
/// significant bit in which a peer's id differs from `node_id`.
pub struct KademliaRoutingTable {
    buckets: Vec<KBucket>,
    node_id: PeerId,
}
impl KademliaRoutingTable {
    /// 256 empty buckets of capacity `k_value` each.
    fn new(node_id: PeerId, k_value: usize) -> Self {
        let mut buckets = Vec::new();
        for _ in 0..KADEMLIA_BUCKET_COUNT {
            buckets.push(KBucket::new(k_value));
        }
        Self { buckets, node_id }
    }
    /// Inserts `node` into its distance bucket; rejects the local node
    /// (distance zero has no bucket).
    fn add_node(&mut self, node: NodeInfo) -> Result<()> {
        let bucket_index = self
            .get_bucket_index(&node.id)
            .ok_or_else(|| anyhow!("cannot insert self into routing table"))?;
        self.buckets[bucket_index].add_node(node)
    }
    fn remove_node(&mut self, node_id: &PeerId) {
        if let Some(bucket_index) = self.get_bucket_index(node_id) {
            self.buckets[bucket_index].remove_node(node_id);
        }
    }
    /// Write-path touch; returns false when the peer is unknown (or is self).
    fn touch_node(
        &mut self,
        node_id: &PeerId,
        address: Option<&MultiAddr>,
        addr_type: AddressType,
    ) -> bool {
        match self.get_bucket_index(node_id) {
            Some(bucket_index) => {
                self.buckets[bucket_index].touch_node_typed(node_id, address, addr_type)
            }
            None => false,
        }
    }
    /// Replaces coordinator hints on a known peer; returns the stored count
    /// (0 when the peer is unknown or is self).
    fn merge_coordinator_hints(&mut self, node_id: &PeerId, hints: Vec<MultiAddr>) -> usize {
        let Some(bucket_index) = self.get_bucket_index(node_id) else {
            return 0;
        };
        let bucket = &mut self.buckets[bucket_index];
        let Some(node) = bucket.nodes.iter_mut().find(|n| n.id == *node_id) else {
            return 0;
        };
        node.replace_coordinator_hints(hints)
    }
    /// Read-lock touch fast path; see `KBucket::touch_last_seen_if_merge_noop`.
    fn try_touch_last_seen(
        &self,
        node_id: &PeerId,
        address: Option<&MultiAddr>,
        addr_type: AddressType,
    ) -> Option<bool> {
        let bucket_index = self.get_bucket_index(node_id)?;
        self.buckets[bucket_index].touch_last_seen_if_merge_noop(node_id, address, addr_type)
    }
    /// The `count` nodes closest to `key` by XOR distance, across all buckets.
    fn find_closest_nodes(&self, key: &DhtKey, count: usize) -> Vec<NodeInfo> {
        let mut candidates: Vec<(NodeInfo, [u8; 32])> = Vec::with_capacity(count * 2);
        for bucket in &self.buckets {
            for node in bucket.get_nodes() {
                let distance = xor_distance_bytes(node.id.to_bytes(), key.as_bytes());
                candidates.push((node.clone(), distance));
            }
        }
        // Lexicographic order on big-endian byte arrays equals 256-bit
        // integer order, so this sorts by numeric XOR distance.
        candidates.sort_by_key(|c| c.1);
        candidates
            .into_iter()
            .take(count)
            .map(|(node, _)| node)
            .collect()
    }
    /// Index of the most significant bit where `key` differs from our id;
    /// `None` when the key equals the local id (distance zero).
    fn get_bucket_index_for_key(&self, key: &DhtKey) -> Option<usize> {
        let distance = xor_distance_bytes(self.node_id.to_bytes(), key.as_bytes());
        for i in 0..256 {
            let byte_index = i / 8;
            let bit_index = 7 - (i % 8);
            if (distance[byte_index] >> bit_index) & 1 == 1 {
                return Some(i);
            }
        }
        None }
    fn find_node_by_id(&self, node_id: &PeerId) -> Option<&NodeInfo> {
        let bucket_index = self.get_bucket_index(node_id)?;
        self.buckets[bucket_index].find_node(node_id)
    }
    /// Total number of peers across all buckets.
    pub fn node_count(&self) -> usize {
        self.buckets.iter().map(|b| b.get_nodes().len()).sum()
    }
    /// Snapshot of every stored peer.
    fn all_nodes(&self) -> Vec<NodeInfo> {
        self.buckets
            .iter()
            .flat_map(|b| b.get_nodes().iter().cloned())
            .collect()
    }
    fn get_bucket_index(&self, node_id: &PeerId) -> Option<usize> {
        self.get_bucket_index_for_key(&DhtKey::from_bytes(*node_id.to_bytes()))
    }
    /// Ids of the `k` peers closest to the local node.
    fn k_closest_ids(&self, k: usize) -> Vec<PeerId> {
        self.find_closest_nodes(&self.node_id, k)
            .into_iter()
            .map(|n| n.id)
            .collect()
    }
    /// Indices of buckets not refreshed within `threshold`.
    fn stale_bucket_indices(&self, threshold: Duration) -> Vec<usize> {
        self.buckets
            .iter()
            .enumerate()
            .filter(|(_, b)| b.last_refreshed.elapsed() > threshold)
            .map(|(i, _)| i)
            .collect()
    }
}
/// One IP-diversity enforcement tier:
/// (matching-peer count, limit, farthest matching peer as
/// `(id, xor distance, last_seen)`, human-readable tier name).
type IpSwapTier = (
    usize,
    usize,
    Option<(PeerId, [u8; 32], Instant)>,
    &'static str,
);
/// Zeroes all but the top `prefix_len` bits of an IPv4 address
/// (values above 32 are treated as 32).
///
/// Fix: the original computed `u32::MAX << 32` for `prefix_len == 0`, a
/// shift-overflow (panic in debug builds, wrap in release). A zero-length
/// prefix now yields the all-zero mask explicitly.
fn mask_ipv4(addr: Ipv4Addr, prefix_len: u8) -> Ipv4Addr {
    let bits = u32::from(addr);
    let mask = match prefix_len {
        0 => 0,
        p if p >= 32 => u32::MAX,
        p => u32::MAX << (32 - p),
    };
    Ipv4Addr::from(bits & mask)
}
/// Zeroes all but the top `prefix_len` bits of an IPv6 address
/// (values above 128 are treated as 128).
///
/// Fix: the original computed `u128::MAX << 128` for `prefix_len == 0`, a
/// shift-overflow (panic in debug builds, wrap in release). A zero-length
/// prefix now yields the all-zero mask explicitly.
fn mask_ipv6(addr: Ipv6Addr, prefix_len: u8) -> Ipv6Addr {
    let bits = u128::from(addr);
    let mask = match prefix_len {
        0 => 0,
        p if p >= 128 => u128::MAX,
        p => u128::MAX << (128 - p),
    };
    Ipv6Addr::from(bits & mask)
}
/// Default Kademlia replication parameter used by the test constructor.
#[cfg(test)]
const DEFAULT_K: usize = 20;
/// One bucket per bit of the 256-bit identifier space.
const KADEMLIA_BUCKET_COUNT: usize = 256;
/// Peers at or above this trust score are protected from IP-diversity swaps
/// unless they have also gone stale.
const TRUST_PROTECTION_THRESHOLD: f64 = 0.7;
/// Snapshot of routing-table occupancy, for diagnostics.
#[allow(dead_code)]
pub struct RoutingTableStats {
    pub total_peers: usize,
    /// Peer count per bucket, indexed by bucket number.
    pub bucket_counts: Vec<usize>,
    /// Peers whose `last_seen` exceeds the live threshold.
    pub stale_peer_count: usize,
}
/// Notifications emitted when admission or removal changes the table.
#[derive(Debug, Clone)]
pub enum RoutingTableEvent {
    PeerAdded(PeerId),
    PeerRemoved(PeerId),
    /// The set of k peers closest to the local node changed.
    #[allow(dead_code)]
    KClosestPeersChanged { old: Vec<PeerId>, new: Vec<PeerId> },
}
/// Outcome of attempting to admit a candidate peer.
#[derive(Debug)]
pub enum AdmissionResult {
    /// Candidate was stored (or merged); carries the resulting events.
    Admitted(Vec<RoutingTableEvent>),
    /// The target bucket is full but contains stale peers; the caller should
    /// revalidate them and then retry via `re_evaluate_admission`.
    StaleRevalidationNeeded {
        candidate: NodeInfo,
        candidate_ips: Vec<IpAddr>,
        candidate_bucket_idx: usize,
        /// Stale peers as `(peer id, bucket index)` pairs.
        stale_peers: Vec<(PeerId, usize)>,
    },
}
/// Core DHT state: the routing table plus admission-policy knobs.
pub struct DhtCoreEngine {
    node_id: PeerId,
    routing_table: Arc<RwLock<KademliaRoutingTable>>,
    /// Kademlia replication parameter (bucket size / close-group size).
    k_value: usize,
    ip_diversity_config: IPDiversityConfig,
    /// Whether loopback peers may be admitted (tests / local setups).
    allow_loopback: bool,
    /// Trust score at/above which a candidate may displace a lower-trust
    /// peer from a full bucket; values <= 0.0 disable trust-based swaps.
    swap_threshold: f64,
    /// Staleness cutoff for `last_seen`.
    live_threshold: Duration,
    shutdown: CancellationToken,
}
impl DhtCoreEngine {
    /// Test constructor using the default k and swap threshold.
    #[cfg(test)]
    pub fn new_for_tests(node_id: PeerId) -> Result<Self> {
        Self::new(node_id, DEFAULT_K, false, DEFAULT_SWAP_THRESHOLD)
    }
    /// Test-only access to the underlying routing table.
    #[cfg(test)]
    pub(crate) fn routing_table_for_test(&self) -> &Arc<RwLock<KademliaRoutingTable>> {
        &self.routing_table
    }
    /// Builds an engine for `node_id`.
    ///
    /// # Errors
    /// Rejects `k_value < 4` and `swap_threshold` outside `[0.0, 1.0)`.
    pub(crate) fn new(
        node_id: PeerId,
        k_value: usize,
        allow_loopback: bool,
        swap_threshold: f64,
    ) -> Result<Self> {
        if k_value < 4 {
            return Err(anyhow!("k_value must be >= 4 (got {k_value})"));
        }
        // NaN already fails the range check; the explicit test is belt-and-braces.
        if !(0.0..1.0).contains(&swap_threshold) || swap_threshold.is_nan() {
            return Err(anyhow!(
                "swap_threshold must be in [0.0, 1.0), got {swap_threshold}"
            ));
        }
        Ok(Self {
            node_id,
            routing_table: Arc::new(RwLock::new(KademliaRoutingTable::new(node_id, k_value))),
            k_value,
            ip_diversity_config: IPDiversityConfig::default(),
            allow_loopback,
            swap_threshold,
            live_threshold: LIVE_THRESHOLD,
            shutdown: CancellationToken::new(),
        })
    }
    /// Overrides the IP-diversity limits.
    pub fn set_ip_diversity_config(&mut self, config: IPDiversityConfig) {
        self.ip_diversity_config = config;
    }
    #[cfg(test)]
    pub fn set_allow_loopback(&mut self, allow: bool) {
        self.allow_loopback = allow;
    }
    #[cfg(test)]
    pub fn set_live_threshold(&mut self, threshold: Duration) {
        self.live_threshold = threshold;
    }
    #[allow(dead_code)]
    pub fn node_id(&self) -> &PeerId {
        &self.node_id
    }
    /// Ids among our k closest peers whose `last_seen` has gone stale.
    pub(crate) async fn stale_k_closest(&self) -> Vec<PeerId> {
        let routing = self.routing_table.read().await;
        routing
            .find_closest_nodes(&self.node_id, self.k_value)
            .into_iter()
            .filter(|n| n.last_seen.elapsed() > self.live_threshold)
            .map(|n| n.id)
            .collect()
    }
    /// Indices of buckets not refreshed within `threshold`.
    pub(crate) async fn stale_bucket_indices(&self, threshold: Duration) -> Vec<usize> {
        self.routing_table
            .read()
            .await
            .stale_bucket_indices(threshold)
    }
    /// Generates a random key whose XOR distance from the local id has its
    /// first set bit at exactly `bucket_idx`, i.e. a key that falls into that
    /// bucket. Used for bucket-refresh lookups. Returns `None` for an
    /// out-of-range index.
    pub(crate) fn generate_random_key_for_bucket(&self, bucket_idx: usize) -> Option<DhtKey> {
        if bucket_idx >= KADEMLIA_BUCKET_COUNT {
            return None;
        }
        let self_bytes = self.node_id.to_bytes();
        let byte_idx = bucket_idx / 8;
        let bit_idx = 7 - (bucket_idx % 8);
        let random_bytes = PeerId::random();
        let mut distance = [0u8; 32];
        // Force the leading differing bit at `bucket_idx`...
        distance[byte_idx] = 1 << bit_idx;
        // ...randomize the remaining low bits of the same byte...
        let below_mask = (1u8 << bit_idx).wrapping_sub(1);
        distance[byte_idx] |= random_bytes.to_bytes()[byte_idx] & below_mask;
        // ...and all following bytes. Leading bytes stay zero so the shared
        // prefix with the local id is preserved.
        distance[(byte_idx + 1)..32].copy_from_slice(&random_bytes.to_bytes()[(byte_idx + 1)..32]);
        let mut result = [0u8; 32];
        for (i, byte) in result.iter_mut().enumerate() {
            *byte = self_bytes[i] ^ distance[i];
        }
        Some(DhtKey::from_bytes(result))
    }
    /// Total number of peers currently stored.
    pub async fn routing_table_size(&self) -> usize {
        self.routing_table.read().await.node_count()
    }
    /// Removes a peer and reports the resulting events, including a
    /// close-group change when the k-closest set shifted.
    pub async fn remove_node_by_id(&mut self, peer_id: &PeerId) -> Vec<RoutingTableEvent> {
        let mut routing = self.routing_table.write().await;
        if routing.find_node_by_id(peer_id).is_none() {
            return Vec::new();
        }
        let k_before = routing.k_closest_ids(self.k_value);
        routing.remove_node(peer_id);
        let k_after = routing.k_closest_ids(self.k_value);
        let mut events = vec![RoutingTableEvent::PeerRemoved(*peer_id)];
        if k_before != k_after {
            events.push(RoutingTableEvent::KClosestPeersChanged {
                old: k_before,
                new: k_after,
            });
        }
        events
    }
    /// Cancels the shutdown token so observers can stop.
    pub fn signal_shutdown(&self) {
        self.shutdown.cancel();
    }
    /// The `count` known peers closest to `key` by XOR distance.
    pub async fn find_nodes(&self, key: &DhtKey, count: usize) -> Result<Vec<NodeInfo>> {
        let routing = self.routing_table.read().await;
        Ok(routing.find_closest_nodes(key, count))
    }
    /// Like `find_nodes`, but splices the local node (with no addresses)
    /// into its distance-ordered position in the result.
    #[allow(dead_code)]
    pub async fn find_nodes_with_self(&self, key: &DhtKey, count: usize) -> Result<Vec<NodeInfo>> {
        let routing = self.routing_table.read().await;
        let mut candidates = routing.find_closest_nodes(key, count);
        let self_info = NodeInfo {
            id: self.node_id,
            addresses: vec![],
            address_types: vec![],
            last_seen: AtomicInstant::now(),
        };
        let self_dist = xor_distance_bytes(self.node_id.to_bytes(), key.as_bytes());
        // First position with a strictly larger distance; append if none.
        let pos = candidates
            .iter()
            .position(|n| xor_distance_bytes(n.id.to_bytes(), key.as_bytes()) > self_dist)
            .unwrap_or(candidates.len());
        candidates.insert(pos, self_info);
        candidates.truncate(count);
        Ok(candidates)
    }
    /// All known addresses for a peer (empty when unknown).
    pub async fn get_node_addresses(&self, peer_id: &PeerId) -> Vec<MultiAddr> {
        let routing = self.routing_table.read().await;
        routing
            .find_node_by_id(peer_id)
            .map(|n| n.addresses.clone())
            .unwrap_or_default()
    }
    /// Whether the routing table contains `peer_id`.
    pub async fn has_node(&self, peer_id: &PeerId) -> bool {
        let routing = self.routing_table.read().await;
        routing.find_node_by_id(peer_id).is_some()
    }
    /// Snapshot of every stored peer.
    pub async fn all_nodes(&self) -> Vec<NodeInfo> {
        self.routing_table.read().await.all_nodes()
    }
    /// Occupancy statistics for diagnostics.
    #[allow(dead_code)]
    pub async fn routing_table_stats(&self) -> RoutingTableStats {
        let routing = self.routing_table.read().await;
        let bucket_counts: Vec<usize> = routing
            .buckets
            .iter()
            .map(|b| b.get_nodes().len())
            .collect();
        let total_peers: usize = bucket_counts.iter().sum();
        let stale_peer_count = routing
            .buckets
            .iter()
            .flat_map(|b| b.get_nodes())
            .filter(|n| n.last_seen.elapsed() > self.live_threshold)
            .count();
        RoutingTableStats {
            total_peers,
            bucket_counts,
            stale_peer_count,
        }
    }
    /// Marks a peer as just-seen, merging `address` as a `Direct` address.
    pub async fn touch_node(&self, node_id: &PeerId, address: Option<&MultiAddr>) -> bool {
        let mut routing = self.routing_table.write().await;
        routing.touch_node(node_id, address, AddressType::Direct)
    }
    /// Marks a peer as just-seen with an explicit address type.
    ///
    /// Fast path: under a read lock, when the address merge would be a no-op,
    /// only `last_seen` is bumped (via interior mutability). Otherwise the
    /// write lock is taken to perform the full merge.
    pub async fn touch_node_typed(
        &self,
        node_id: &PeerId,
        address: Option<&MultiAddr>,
        addr_type: AddressType,
    ) -> bool {
        {
            let routing = self.routing_table.read().await;
            match routing.try_touch_last_seen(node_id, address, addr_type) {
                Some(true) => return true,
                Some(false) => return false,
                None => {}
            }
        }
        let mut routing = self.routing_table.write().await;
        routing.touch_node(node_id, address, addr_type)
    }
    /// Replaces a known peer's coordinator hints; returns how many were stored.
    pub async fn merge_coordinator_hints(&self, node_id: &PeerId, hints: Vec<MultiAddr>) -> usize {
        let mut routing = self.routing_table.write().await;
        routing.merge_coordinator_hints(node_id, hints)
    }
    /// Admits a candidate peer, enforcing IP-diversity limits.
    ///
    /// `trust_score` gates eviction of incumbents. Candidates with no
    /// resolvable IPs bypass diversity checks entirely.
    ///
    /// # Errors
    /// Rejects the local node, diversity violations, and full buckets with
    /// no evictable peers.
    pub async fn add_node(
        &mut self,
        node: NodeInfo,
        trust_score: &impl Fn(&PeerId) -> f64,
    ) -> Result<AdmissionResult> {
        if node.id == self.node_id {
            return Err(anyhow!("cannot add self to routing table"));
        }
        let peer_id = node.id;
        // Deduplicated, canonicalized candidate IPs.
        let candidate_ips: Vec<IpAddr> = node
            .addresses
            .iter()
            .filter_map(|a| a.ip().map(canonicalize_ip))
            .collect::<HashSet<_>>()
            .into_iter()
            .collect();
        if candidate_ips.is_empty() {
            let mut routing = self.routing_table.write().await;
            if routing.find_node_by_id(&peer_id).is_some() {
                // Already known: merge addresses and refresh liveness only.
                for addr in &node.addresses {
                    routing.touch_node(&peer_id, Some(addr), AddressType::Direct);
                }
                return Ok(AdmissionResult::Admitted(vec![]));
            }
            let k_before = routing.k_closest_ids(self.k_value);
            routing.add_node(node)?;
            let k_after = routing.k_closest_ids(self.k_value);
            let mut events = vec![RoutingTableEvent::PeerAdded(peer_id)];
            if k_before != k_after {
                events.push(RoutingTableEvent::KClosestPeersChanged {
                    old: k_before,
                    new: k_after,
                });
            }
            return Ok(AdmissionResult::Admitted(events));
        }
        let mut routing = self.routing_table.write().await;
        self.add_with_diversity(&mut routing, node, &candidate_ips, trust_score, true)
    }
    /// Test helper: admit with a neutral trust score; stale revalidation is
    /// treated as failure since tests cannot perform it.
    #[cfg(test)]
    pub async fn add_node_no_trust(&mut self, node: NodeInfo) -> Result<Vec<RoutingTableEvent>> {
        match self.add_node(node, &|_| DEFAULT_NEUTRAL_TRUST).await? {
            AdmissionResult::Admitted(events) => Ok(events),
            AdmissionResult::StaleRevalidationNeeded { .. } => Err(anyhow!(
                "stale revalidation needed (not available in unit tests)"
            )),
        }
    }
    /// Enforces IP-diversity limits for `candidate_ip` within `nodes`.
    ///
    /// Two tiers per address family: exact IP, then /24 (IPv4) or /48 (IPv6)
    /// subnet. Returns:
    /// - `Ok(None)` — within limits (loopback candidates are always allowed);
    /// - `Ok(Some(peer))` — a limit is hit but the farthest matching peer may
    ///   be swapped out: the candidate is strictly closer to us and the
    ///   incumbent is low-trust or stale;
    /// - `Err(_)` — a limit is hit and no incumbent is evictable.
    fn find_ip_swap_in_scope(
        &self,
        nodes: &[NodeInfo],
        candidate_id: &PeerId,
        candidate_ip: IpAddr,
        candidate_distance: &[u8; 32],
        scope_name: &str,
        trust_score: &impl Fn(&PeerId) -> f64,
    ) -> Result<Option<PeerId>> {
        if candidate_ip.is_loopback() {
            return Ok(None);
        }
        let cfg = &self.ip_diversity_config;
        match candidate_ip {
            IpAddr::V4(v4) => {
                let limit_ip = cfg.max_per_ip.unwrap_or(IP_EXACT_LIMIT);
                let limit_subnet = cfg.max_per_subnet.unwrap_or(ip_subnet_limit(self.k_value));
                let cand_24 = mask_ipv4(v4, 24);
                let mut count_ip: usize = 0;
                let mut count_subnet: usize = 0;
                // Farthest-by-XOR peer matching each tier, with its last-seen time.
                let mut farthest_ip: Option<(PeerId, [u8; 32], Instant)> = None;
                let mut farthest_subnet: Option<(PeerId, [u8; 32], Instant)> = None;
                for n in nodes {
                    if n.id == *candidate_id {
                        continue;
                    }
                    let existing_ips = n.all_ips();
                    if existing_ips.is_empty() {
                        continue;
                    }
                    let dist = xor_distance_bytes(self.node_id.to_bytes(), n.id.to_bytes());
                    // Count each node at most once per tier regardless of how
                    // many of its addresses match.
                    let mut matched_ip = false;
                    let mut matched_subnet = false;
                    for existing_ip in &existing_ips {
                        if existing_ip.is_loopback() {
                            continue;
                        }
                        let IpAddr::V4(existing_v4) = existing_ip else {
                            continue;
                        };
                        if !matched_ip && *existing_v4 == v4 {
                            matched_ip = true;
                        }
                        if !matched_subnet && mask_ipv4(*existing_v4, 24) == cand_24 {
                            matched_subnet = true;
                        }
                    }
                    if matched_ip {
                        count_ip += 1;
                        if farthest_ip.as_ref().is_none_or(|(_, d, _)| dist > *d) {
                            farthest_ip = Some((n.id, dist, n.last_seen.load()));
                        }
                    }
                    if matched_subnet {
                        count_subnet += 1;
                        if farthest_subnet.as_ref().is_none_or(|(_, d, _)| dist > *d) {
                            farthest_subnet = Some((n.id, dist, n.last_seen.load()));
                        }
                    }
                }
                let tiers: [IpSwapTier; 2] = [
                    (count_ip, limit_ip, farthest_ip, "exact-IP"),
                    (count_subnet, limit_subnet, farthest_subnet, "/24"),
                ];
                for (count, limit, farthest, tier_name) in &tiers {
                    if *count >= *limit {
                        // Swap only when the candidate is strictly closer and
                        // the incumbent is not both trusted and live.
                        if let Some((far_id, far_dist, far_last_seen)) = farthest
                            && candidate_distance < far_dist
                            && (trust_score(far_id) < TRUST_PROTECTION_THRESHOLD
                                || far_last_seen.elapsed() > self.live_threshold)
                        {
                            return Ok(Some(*far_id));
                        }
                        return Err(anyhow!(
                            "IP diversity: {tier_name} limit ({limit}) exceeded in {scope_name}"
                        ));
                    }
                }
            }
            IpAddr::V6(v6) => {
                // Mirror of the IPv4 arm with a /48 subnet tier.
                let limit_ip = cfg.max_per_ip.unwrap_or(IP_EXACT_LIMIT);
                let limit_subnet = cfg.max_per_subnet.unwrap_or(ip_subnet_limit(self.k_value));
                let cand_48 = mask_ipv6(v6, 48);
                let mut count_ip: usize = 0;
                let mut count_subnet: usize = 0;
                let mut farthest_ip: Option<(PeerId, [u8; 32], Instant)> = None;
                let mut farthest_subnet: Option<(PeerId, [u8; 32], Instant)> = None;
                for n in nodes {
                    if n.id == *candidate_id {
                        continue;
                    }
                    let existing_ips = n.all_ips();
                    if existing_ips.is_empty() {
                        continue;
                    }
                    let dist = xor_distance_bytes(self.node_id.to_bytes(), n.id.to_bytes());
                    let mut matched_ip = false;
                    let mut matched_subnet = false;
                    for existing_ip in &existing_ips {
                        if existing_ip.is_loopback() {
                            continue;
                        }
                        let IpAddr::V6(existing_v6) = existing_ip else {
                            continue;
                        };
                        if !matched_ip && *existing_v6 == v6 {
                            matched_ip = true;
                        }
                        if !matched_subnet && mask_ipv6(*existing_v6, 48) == cand_48 {
                            matched_subnet = true;
                        }
                    }
                    if matched_ip {
                        count_ip += 1;
                        if farthest_ip.as_ref().is_none_or(|(_, d, _)| dist > *d) {
                            farthest_ip = Some((n.id, dist, n.last_seen.load()));
                        }
                    }
                    if matched_subnet {
                        count_subnet += 1;
                        if farthest_subnet.as_ref().is_none_or(|(_, d, _)| dist > *d) {
                            farthest_subnet = Some((n.id, dist, n.last_seen.load()));
                        }
                    }
                }
                let tiers: [IpSwapTier; 2] = [
                    (count_ip, limit_ip, farthest_ip, "exact-IP"),
                    (count_subnet, limit_subnet, farthest_subnet, "/48"),
                ];
                for (count, limit, farthest, tier_name) in &tiers {
                    if *count >= *limit {
                        if let Some((far_id, far_dist, far_last_seen)) = farthest
                            && candidate_distance < far_dist
                            && (trust_score(far_id) < TRUST_PROTECTION_THRESHOLD
                                || far_last_seen.elapsed() > self.live_threshold)
                        {
                            return Ok(Some(*far_id));
                        }
                        return Err(anyhow!(
                            "IP diversity: {tier_name} limit ({limit}) exceeded in {scope_name}"
                        ));
                    }
                }
            }
        }
        Ok(None)
    }
    /// Peers in `bucket_idx` whose `last_seen` exceeds `threshold`, paired
    /// with the bucket index for the revalidation caller.
    fn collect_stale_peers_in_bucket(
        routing: &KademliaRoutingTable,
        bucket_idx: usize,
        threshold: Duration,
    ) -> Vec<(PeerId, usize)> {
        routing.buckets[bucket_idx]
            .nodes
            .iter()
            .filter(|n| n.last_seen.elapsed() > threshold)
            .map(|n| (n.id, bucket_idx))
            .collect()
    }
    /// Core admission path for candidates with at least one resolvable IP.
    ///
    /// Phases: reject unusable IPs; short-circuit all-loopback candidates;
    /// merge into an existing entry; compute IP-diversity swaps at bucket
    /// scope then close-group scope; handle a full bucket (trust-based swap,
    /// then stale revalidation when `allow_stale_revalidation`); finally
    /// execute the swaps, insert, and emit events.
    fn add_with_diversity(
        &self,
        routing: &mut KademliaRoutingTable,
        node: NodeInfo,
        candidate_ips: &[IpAddr],
        trust_score: &impl Fn(&PeerId) -> f64,
        allow_stale_revalidation: bool,
    ) -> Result<AdmissionResult> {
        let peer_id = node.id;
        if candidate_ips
            .iter()
            .any(|ip| ip.is_unspecified() || ip.is_multicast())
        {
            return Err(anyhow!(
                "IP diversity: multicast or unspecified addresses rejected"
            ));
        }
        if !self.allow_loopback && candidate_ips.iter().any(|ip| ip.is_loopback()) {
            return Err(anyhow!(
                "IP diversity: loopback addresses rejected (allow_loopback=false)"
            ));
        }
        // All-loopback candidates (local setups) skip diversity checks entirely.
        let all_loopback = candidate_ips.iter().all(|ip| ip.is_loopback());
        if all_loopback {
            if !self.allow_loopback {
                return Err(anyhow!(
                    "IP diversity: loopback addresses rejected (allow_loopback=false)"
                ));
            }
            if routing.find_node_by_id(&peer_id).is_some() {
                for addr in &node.addresses {
                    routing.touch_node(&peer_id, Some(addr), AddressType::Direct);
                }
                return Ok(AdmissionResult::Admitted(vec![]));
            }
            let k_before = routing.k_closest_ids(self.k_value);
            routing.add_node(node)?;
            let k_after = routing.k_closest_ids(self.k_value);
            let mut events = vec![RoutingTableEvent::PeerAdded(peer_id)];
            if k_before != k_after {
                events.push(RoutingTableEvent::KClosestPeersChanged {
                    old: k_before,
                    new: k_after,
                });
            }
            return Ok(AdmissionResult::Admitted(events));
        }
        let bucket_idx = routing
            .get_bucket_index(&node.id)
            .ok_or_else(|| anyhow!("cannot insert self into routing table"))?;
        let candidate_distance = xor_distance_bytes(self.node_id.to_bytes(), node.id.to_bytes());
        // Already present: merge addresses (loopback never displaces real
        // addresses), refresh liveness, and move to the recency tail.
        if let Some(pos) = routing.buckets[bucket_idx]
            .nodes
            .iter()
            .position(|n| n.id == node.id)
        {
            let existing = &mut routing.buckets[bucket_idx].nodes[pos];
            existing.last_seen.store_now();
            for addr in &node.addresses {
                let addr_is_loopback = addr
                    .ip()
                    .is_some_and(|ip| canonicalize_ip(ip).is_loopback());
                let existing_has_non_loopback = existing
                    .addresses
                    .iter()
                    .any(|a| a.ip().is_some_and(|ip| !canonicalize_ip(ip).is_loopback()));
                if addr_is_loopback && existing_has_non_loopback {
                    continue;
                }
                existing.merge_address(addr.clone());
            }
            let updated = routing.buckets[bucket_idx].nodes.remove(pos);
            routing.buckets[bucket_idx].nodes.push(updated);
            routing.buckets[bucket_idx].last_refreshed = Instant::now();
            return Ok(AdmissionResult::Admitted(Vec::new()));
        }
        // Bucket-scope diversity: gather at most one evictable peer per
        // candidate IP, excluding peers already slated for eviction.
        let mut all_bucket_swaps: Vec<PeerId> = Vec::new();
        for &candidate_ip in candidate_ips {
            if candidate_ip.is_loopback() {
                continue;
            }
            let bucket_view: Vec<NodeInfo> = routing.buckets[bucket_idx]
                .nodes
                .iter()
                .filter(|n| !all_bucket_swaps.contains(&n.id))
                .cloned()
                .collect();
            let swap = self.find_ip_swap_in_scope(
                &bucket_view,
                &node.id,
                candidate_ip,
                &candidate_distance,
                "bucket",
                trust_score,
            )?;
            if let Some(id) = swap
                && !all_bucket_swaps.contains(&id)
            {
                all_bucket_swaps.push(id);
            }
        }
        // Close-group diversity applies only if the candidate would actually
        // enter the k-closest set (group not full, or candidate closer than
        // the farthest surviving member).
        let close_group = routing.find_closest_nodes(&self.node_id, self.k_value);
        let effective_close_len = close_group
            .iter()
            .filter(|n| !all_bucket_swaps.contains(&n.id))
            .count();
        let candidate_in_close = effective_close_len < self.k_value
            || close_group
                .iter()
                .rfind(|n| !all_bucket_swaps.contains(&n.id))
                .map(|n| {
                    candidate_distance
                        < xor_distance_bytes(self.node_id.to_bytes(), n.id.to_bytes())
                })
                .unwrap_or(true);
        let mut all_close_swaps: Vec<PeerId> = Vec::new();
        if candidate_in_close {
            // Evaluate diversity against the hypothetical post-insert close group.
            let mut hyp_close: Vec<NodeInfo> = close_group
                .iter()
                .filter(|n| !all_bucket_swaps.contains(&n.id) && n.id != node.id)
                .cloned()
                .collect();
            hyp_close.push(node.clone());
            hyp_close.sort_by(|a, b| {
                let da = xor_distance_bytes(self.node_id.to_bytes(), a.id.to_bytes());
                let db = xor_distance_bytes(self.node_id.to_bytes(), b.id.to_bytes());
                da.cmp(&db)
            });
            hyp_close.truncate(self.k_value);
            for &candidate_ip in candidate_ips {
                if candidate_ip.is_loopback() {
                    continue;
                }
                let close_view: Vec<NodeInfo> = hyp_close
                    .iter()
                    .filter(|n| !all_close_swaps.contains(&n.id))
                    .cloned()
                    .collect();
                let swap = self.find_ip_swap_in_scope(
                    &close_view,
                    &node.id,
                    candidate_ip,
                    &candidate_distance,
                    "close-group",
                    trust_score,
                )?;
                if let Some(id) = swap {
                    if !all_bucket_swaps.contains(&id) && !all_close_swaps.contains(&id) {
                        all_close_swaps.push(id);
                    }
                }
            }
        }
        // Capacity handling for the target bucket.
        {
            let bucket = &routing.buckets[bucket_idx];
            let already_exists = bucket.nodes.iter().any(|n| n.id == node.id);
            let has_room = bucket.nodes.len() < bucket.max_size;
            let swap_frees_slot = !all_bucket_swaps.is_empty()
                || all_close_swaps
                    .iter()
                    .any(|id| routing.get_bucket_index(id) == Some(bucket_idx));
            if !already_exists && !has_room && !swap_frees_slot {
                // Trust-based swap: a sufficiently trusted candidate may evict
                // the lowest-trust sub-threshold incumbent.
                if self.swap_threshold > 0.0 && trust_score(&peer_id) >= self.swap_threshold {
                    let lowest = bucket
                        .nodes
                        .iter()
                        .map(|n| (n.id, trust_score(&n.id)))
                        .filter(|(_, score)| *score < self.swap_threshold)
                        .min_by(|(_, a), (_, b)| {
                            a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
                        });
                    if let Some((swap_id, _)) = lowest {
                        all_bucket_swaps.push(swap_id);
                    }
                }
                let swap_frees_slot_now = !all_bucket_swaps.is_empty()
                    || all_close_swaps
                        .iter()
                        .any(|id| routing.get_bucket_index(id) == Some(bucket_idx));
                if !swap_frees_slot_now {
                    if allow_stale_revalidation {
                        // Hand stale peers (bucket + stale close-group swaps)
                        // back to the caller for liveness probing.
                        let mut stale_peers = Self::collect_stale_peers_in_bucket(
                            routing,
                            bucket_idx,
                            self.live_threshold,
                        );
                        for close_swap_id in &all_close_swaps {
                            if stale_peers.iter().any(|(id, _)| id == close_swap_id) {
                                continue;
                            }
                            if let Some(swap_bucket_idx) = routing.get_bucket_index(close_swap_id)
                                && let Some(swap_node) = routing.find_node_by_id(close_swap_id)
                                && swap_node.last_seen.elapsed() > self.live_threshold
                            {
                                stale_peers.push((*close_swap_id, swap_bucket_idx));
                            }
                        }
                        if !stale_peers.is_empty() {
                            return Ok(AdmissionResult::StaleRevalidationNeeded {
                                candidate: node,
                                candidate_ips: candidate_ips.to_vec(),
                                candidate_bucket_idx: bucket_idx,
                                stale_peers,
                            });
                        }
                    }
                    return Err(anyhow!(
                        "K-bucket at capacity ({}/{}) with no stale peers",
                        bucket.nodes.len(),
                        bucket.max_size,
                    ));
                }
            }
        }
        // Execute the swaps (deduplicated), then insert the candidate.
        let k_before = routing.k_closest_ids(self.k_value);
        let mut executed: Vec<PeerId> = Vec::with_capacity(2);
        for swap_id in all_bucket_swaps
            .iter()
            .chain(all_close_swaps.iter())
            .copied()
        {
            if !executed.contains(&swap_id) {
                routing.remove_node(&swap_id);
                executed.push(swap_id);
            }
        }
        routing.add_node(node)?;
        let mut events: Vec<RoutingTableEvent> = Vec::with_capacity(executed.len() + 2);
        for removed_id in &executed {
            events.push(RoutingTableEvent::PeerRemoved(*removed_id));
        }
        events.push(RoutingTableEvent::PeerAdded(peer_id));
        let k_after = routing.k_closest_ids(self.k_value);
        if k_before != k_after {
            events.push(RoutingTableEvent::KClosestPeersChanged {
                old: k_before,
                new: k_after,
            });
        }
        Ok(AdmissionResult::Admitted(events))
    }
    /// Re-runs admission after stale peers have been revalidated or removed.
    /// Unlike `add_node`, a still-full bucket is a hard error here (no second
    /// revalidation round).
    pub(crate) async fn re_evaluate_admission(
        &mut self,
        candidate: NodeInfo,
        candidate_ips: &[IpAddr],
        trust_score: &impl Fn(&PeerId) -> f64,
    ) -> Result<Vec<RoutingTableEvent>> {
        let mut routing = self.routing_table.write().await;
        match self.add_with_diversity(&mut routing, candidate, candidate_ips, trust_score, false)? {
            AdmissionResult::Admitted(events) => Ok(events),
            AdmissionResult::StaleRevalidationNeeded { .. } => {
                Err(anyhow!("K-bucket still at capacity after revalidation"))
            }
        }
    }
}
impl std::fmt::Debug for DhtCoreEngine {
    /// Manual impl: the routing table is elided (no useful `Debug` behind the
    /// lock) and the shutdown token is omitted.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = formatter.debug_struct("DhtCoreEngine");
        dbg.field("node_id", &self.node_id);
        dbg.field("routing_table", &"Arc<RwLock<KademliaRoutingTable>>");
        dbg.field("k_value", &self.k_value);
        dbg.field("ip_diversity_config", &self.ip_diversity_config);
        dbg.field("allow_loopback", &self.allow_loopback);
        dbg.field("swap_threshold", &self.swap_threshold);
        dbg.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::address::TransportAddr;
use std::collections::HashSet;
#[tokio::test]
async fn test_xor_distance() {
let key1 = DhtKey::from_bytes([0u8; 32]);
let key2 = DhtKey::from_bytes([255u8; 32]);
let distance = key1.distance(&key2);
assert_eq!(distance, [255u8; 32]);
}
fn make_node(byte: u8, address: &str) -> NodeInfo {
NodeInfo {
id: PeerId::from_bytes([byte; 32]),
addresses: vec![address.parse::<MultiAddr>().unwrap()],
address_types: vec![AddressType::Direct],
last_seen: AtomicInstant::now(),
}
}
// Touching a node with a new address prepends it; the old address is kept.
#[test]
fn test_touch_node_merges_address() {
let k = 8;
let mut bucket = KBucket::new(k);
let node = make_node(1, "/ip4/1.2.3.4/udp/9000/quic");
bucket.add_node(node).unwrap();
let new_addr: MultiAddr = "/ip4/5.6.7.8/udp/9000/quic".parse().unwrap();
let old_addr: MultiAddr = "/ip4/1.2.3.4/udp/9000/quic".parse().unwrap();
let found = bucket.touch_node_typed(
&PeerId::from_bytes([1u8; 32]),
Some(&new_addr),
AddressType::Direct,
);
assert!(found);
// Newest address moves to the front; previous address shifts to index 1.
let addrs = &bucket.get_nodes().last().unwrap().addresses;
assert_eq!(addrs[0], new_addr);
assert_eq!(addrs[1], old_addr);
}
// Touching a node without supplying an address must leave its list intact.
#[test]
fn test_touch_node_none_preserves_addresses() {
    let mut bucket = KBucket::new(8);
    bucket
        .add_node(make_node(1, "/ip4/1.2.3.4/udp/9000/quic"))
        .unwrap();
    let touched =
        bucket.touch_node_typed(&PeerId::from_bytes([1u8; 32]), None, AddressType::Direct);
    assert!(touched);
    let unchanged: MultiAddr = "/ip4/1.2.3.4/udp/9000/quic".parse().unwrap();
    assert_eq!(
        bucket.get_nodes().last().unwrap().addresses,
        vec![unchanged]
    );
}
// Touching an existing node moves it to the tail (most-recently-seen) slot.
#[test]
fn test_touch_node_moves_to_tail() {
    let mut bucket = KBucket::new(8);
    for (byte, addr) in [
        (1u8, "/ip4/1.1.1.1/udp/9000/quic"),
        (2u8, "/ip4/2.2.2.2/udp/9000/quic"),
        (3u8, "/ip4/3.3.3.3/udp/9000/quic"),
    ] {
        bucket.add_node(make_node(byte, addr)).unwrap();
    }
    bucket.touch_node_typed(&PeerId::from_bytes([1u8; 32]), None, AddressType::Direct);
    let order: Vec<u8> = bucket
        .get_nodes()
        .iter()
        .map(|node| node.id.to_bytes()[0])
        .collect();
    assert_eq!(order, vec![2, 3, 1]);
}
// Touching a peer id that is not in the bucket reports false.
#[test]
fn test_touch_node_missing_returns_false() {
    let mut bucket = KBucket::new(8);
    bucket
        .add_node(make_node(1, "/ip4/1.1.1.1/udp/9000/quic"))
        .unwrap();
    let unseen_addr: MultiAddr = "/ip4/9.9.9.9/udp/9000/quic".parse().unwrap();
    let touched = bucket.touch_node_typed(
        &PeerId::from_bytes([99u8; 32]),
        Some(&unseen_addr),
        AddressType::Direct,
    );
    assert!(!touched);
}
// Lookups near bucket 0 (highest distance bit) must not return a node twice.
#[test]
fn test_find_closest_nodes_no_duplicates_at_bucket_zero() {
    let local_id = PeerId::from_bytes([0u8; 32]);
    let mut table = KademliaRoutingTable::new(local_id, 8);
    for (first_byte, addr) in [
        (0x80u8, "/ip4/10.0.0.1/udp/9000/quic"),
        (0x40u8, "/ip4/10.0.0.2/udp/9000/quic"),
    ] {
        let mut id_bytes = [0u8; 32];
        id_bytes[0] = first_byte;
        table
            .add_node(NodeInfo {
                id: PeerId::from_bytes(id_bytes),
                addresses: vec![addr.parse().unwrap()],
                last_seen: AtomicInstant::now(),
                address_types: vec![],
            })
            .unwrap();
    }
    let mut key_bytes = [0u8; 32];
    key_bytes[0] = 0x80;
    let key = DhtKey::from_bytes(key_bytes);
    let results = table.find_closest_nodes(&key, 8);
    let mut seen = HashSet::new();
    for node in &results {
        assert!(seen.insert(node.id), "Duplicate node {:?}", node.id);
    }
    assert_eq!(results.len(), 2);
}
// Lookups near bucket 255 (lowest distance bit) must not return a node twice.
#[test]
fn test_find_closest_nodes_no_duplicates_at_bucket_255() {
    let local_id = PeerId::from_bytes([0u8; 32]);
    let mut table = KademliaRoutingTable::new(local_id, 8);
    for (last_byte, addr) in [
        (0x01u8, "/ip4/10.0.0.1/udp/9000/quic"),
        (0x02u8, "/ip4/10.0.0.2/udp/9000/quic"),
    ] {
        let mut id_bytes = [0u8; 32];
        id_bytes[31] = last_byte;
        table
            .add_node(NodeInfo {
                id: PeerId::from_bytes(id_bytes),
                addresses: vec![addr.parse().unwrap()],
                last_seen: AtomicInstant::now(),
                address_types: vec![],
            })
            .unwrap();
    }
    let mut key_bytes = [0u8; 32];
    key_bytes[31] = 0x01;
    let key = DhtKey::from_bytes(key_bytes);
    let results = table.find_closest_nodes(&key, 8);
    let mut seen = HashSet::new();
    for node in &results {
        assert!(seen.insert(node.id), "Duplicate node {:?}", node.id);
    }
    assert_eq!(results.len(), 2);
}
// find_closest_nodes must return its results in ascending XOR distance.
#[test]
fn test_find_closest_nodes_returns_sorted_by_distance() {
    let local_id = PeerId::from_bytes([0u8; 32]);
    let mut table = KademliaRoutingTable::new(local_id, 8);
    for i in 0..5u8 {
        let mut id_bytes = [0u8; 32];
        id_bytes[0] = 0x80 >> i;
        let addr = format!("/ip4/10.0.0.{}/udp/9000/quic", i + 1);
        table
            .add_node(NodeInfo {
                id: PeerId::from_bytes(id_bytes),
                addresses: vec![addr.parse().unwrap()],
                last_seen: AtomicInstant::now(),
                address_types: vec![],
            })
            .unwrap();
    }
    let key = DhtKey::from_bytes([0u8; 32]);
    let results = table.find_closest_nodes(&key, 3);
    assert_eq!(results.len(), 3);
    // Adjacent pairs must be ordered nearest-first.
    for pair in results.windows(2) {
        let nearer = xor_distance_bytes(pair[0].id.to_bytes(), key.as_bytes());
        let farther = xor_distance_bytes(pair[1].id.to_bytes(), key.as_bytes());
        assert!(nearer <= farther, "Results not sorted by distance");
    }
}
// A lookup against an empty routing table yields no nodes.
#[test]
fn test_find_closest_nodes_empty_table() {
    let table = KademliaRoutingTable::new(PeerId::from_bytes([0u8; 32]), 8);
    let results = table.find_closest_nodes(&DhtKey::from_bytes([42u8; 32]), 8);
    assert!(results.is_empty());
}
// IPv4 loopback peers are refused while allow_loopback is off (the default).
#[tokio::test]
async fn test_loopback_rejected_when_allow_loopback_false() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
assert!(!dht.allow_loopback);
let loopback_node = make_node(1, "/ip4/127.0.0.1/udp/9000/quic");
let result = dht.add_node_no_trust(loopback_node).await;
assert!(result.is_err());
// The rejection reason must mention "loopback" so operators can diagnose it.
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("loopback"),
"expected loopback rejection, got: {err_msg}"
);
}
// IPv6 loopback (::1) is rejected the same way as 127.0.0.1.
#[tokio::test]
async fn test_loopback_v6_rejected_when_allow_loopback_false() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
assert!(!dht.allow_loopback);
let loopback_node = make_node(2, "/ip6/::1/udp/9000/quic");
let result = dht.add_node_no_trust(loopback_node).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("loopback"),
"expected loopback rejection, got: {err_msg}"
);
}
// Enabling allow_loopback (e.g. for local testing) admits loopback peers.
#[tokio::test]
async fn test_loopback_accepted_when_allow_loopback_true() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
dht.set_allow_loopback(true);
let loopback_node = make_node(1, "/ip4/127.0.0.1/udp/9000/quic");
let result = dht.add_node_no_trust(loopback_node).await;
assert!(result.is_ok(), "loopback should be accepted: {:?}", result);
}
// Routable addresses are admitted regardless of the allow_loopback flag.
#[tokio::test]
async fn test_non_loopback_unaffected_by_allow_loopback_flag() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
assert!(!dht.allow_loopback);
let normal_node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
let result = dht.add_node_no_trust(normal_node).await;
assert!(
result.is_ok(),
"non-loopback should be accepted: {:?}",
result
);
}
// With the testnet diversity config, many peers may share one public IP.
#[tokio::test]
async fn test_testnet_config_disables_ip_diversity() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
dht.set_ip_diversity_config(IPDiversityConfig::testnet());
// Eight distinct peer ids, all advertising the same address.
for i in 1..=8u8 {
let mut id = [0u8; 32];
id[0] = 0x80;
id[31] = i;
let node = NodeInfo {
id: PeerId::from_bytes(id),
addresses: vec!["/ip4/203.0.113.1/udp/9000/quic".parse().unwrap()],
last_seen: AtomicInstant::now(),
address_types: vec![],
};
let result = dht.add_node_no_trust(node).await;
assert!(
result.is_ok(),
"node {i} from same IP should be accepted with testnet config: {:?}",
result
);
}
}
// A node carrying no addresses cannot be stored in a bucket.
#[test]
fn test_add_node_rejects_empty_addresses() {
    let mut bucket = KBucket::new(8);
    let empty_node = NodeInfo {
        id: PeerId::from_bytes([1u8; 32]),
        addresses: Vec::new(),
        last_seen: AtomicInstant::now(),
        address_types: vec![],
    };
    assert!(bucket.add_node(empty_node).is_err());
}
// Address lists longer than MAX_ADDRESSES_PER_NODE are truncated on insert.
#[test]
fn test_add_node_truncates_excess_addresses() {
let mut bucket = KBucket::new(8);
let addresses: Vec<MultiAddr> = (1..=MAX_ADDRESSES_PER_NODE + 4)
.map(|i| format!("/ip4/10.0.0.{}/udp/9000/quic", i).parse().unwrap())
.collect();
assert!(addresses.len() > MAX_ADDRESSES_PER_NODE);
let node = NodeInfo {
id: PeerId::from_bytes([1u8; 32]),
addresses,
last_seen: AtomicInstant::now(),
address_types: vec![],
};
bucket.add_node(node).unwrap();
let stored = &bucket.get_nodes()[0].addresses;
assert_eq!(stored.len(), MAX_ADDRESSES_PER_NODE);
}
// Re-adding an existing peer with an oversized address list also truncates.
#[test]
fn test_add_node_replace_also_truncates() {
let mut bucket = KBucket::new(8);
bucket
.add_node(make_node(1, "/ip4/1.1.1.1/udp/9000/quic"))
.unwrap();
assert_eq!(bucket.get_nodes()[0].addresses.len(), 1);
let addresses: Vec<MultiAddr> = (1..=MAX_ADDRESSES_PER_NODE + 4)
.map(|i| format!("/ip4/10.0.0.{}/udp/9000/quic", i).parse().unwrap())
.collect();
// Same peer id as the first insert, so this replaces rather than appends.
let replacement = NodeInfo {
id: PeerId::from_bytes([1u8; 32]),
addresses,
last_seen: AtomicInstant::now(),
address_types: vec![],
};
bucket.add_node(replacement).unwrap();
let stored = &bucket.get_nodes().last().unwrap().addresses;
assert_eq!(stored.len(), MAX_ADDRESSES_PER_NODE);
}
// Build a test node with an explicit 32-byte id and a single (untyped) address.
fn make_node_with_addr(id_bytes: [u8; 32], address: &str) -> NodeInfo {
    let parsed: MultiAddr = address.parse().unwrap();
    NodeInfo {
        id: PeerId::from_bytes(id_bytes),
        addresses: vec![parsed],
        last_seen: AtomicInstant::now(),
        address_types: vec![],
    }
}
// Tight liveness window for tests (production LIVE_THRESHOLD is 900 s).
const TEST_LIVE_THRESHOLD: Duration = Duration::from_secs(1);
// Age applied to a peer's last_seen to push it past TEST_LIVE_THRESHOLD.
const TEST_STALE_AGE: Duration = Duration::from_secs(2);
// Low trust alone must not block admission when the bucket has room.
#[tokio::test]
async fn test_low_trust_candidate_still_admitted() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
let peer_id = node.id;
// Trust fn scores the candidate itself at 0.1, everyone else 0.5.
let result = dht
.add_node(node, &|id| {
if *id == peer_id { 0.1 } else { 0.5 }
})
.await;
assert!(result.is_ok(), "low-trust candidate should be admitted");
assert!(dht.has_node(&peer_id).await);
}
// Re-adding a known peer merges the new address in front of the old one.
#[tokio::test]
async fn test_duplicate_admission_updates_existing() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
let peer_id = node.id;
dht.add_node_no_trust(node).await.unwrap();
let updated = NodeInfo {
id: peer_id,
addresses: vec!["/ip4/10.0.0.2/udp/9000/quic".parse().unwrap()],
last_seen: AtomicInstant::now(),
address_types: vec![],
};
let result = dht.add_node_no_trust(updated).await;
assert!(result.is_ok(), "update short-circuit should succeed");
// Both addresses retained; the most recent one is first.
let addrs = dht.get_node_addresses(&peer_id).await;
assert_eq!(addrs.len(), 2);
assert_eq!(
addrs[0],
"/ip4/10.0.0.2/udp/9000/quic".parse::<MultiAddr>().unwrap()
);
}
// touch_node must not let a loopback address sneak into an existing entry.
#[tokio::test]
async fn test_loopback_injection_prevented_in_touch() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
let peer_id = node.id;
dht.add_node_no_trust(node).await.unwrap();
let loopback_addr: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
dht.touch_node(&peer_id, Some(&loopback_addr)).await;
let addrs = dht.get_node_addresses(&peer_id).await;
assert_eq!(addrs.len(), 1, "loopback should not be merged");
assert_ne!(addrs[0], loopback_addr);
}
// A trusted but stale (past the live threshold) peer can be evicted in
// favor of a closer candidate.
#[tokio::test]
async fn test_stale_trusted_peer_can_be_swapped() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
dht.set_live_threshold(TEST_LIVE_THRESHOLD);
let mut id_far = [0u8; 32];
id_far[0] = 0xFF;
let far_node = make_node_with_addr(id_far, "/ip4/10.0.1.1/udp/9000/quic");
dht.add_node_no_trust(far_node).await.unwrap();
let mut id_mid = [0u8; 32];
id_mid[0] = 0xFE;
dht.add_node_no_trust(make_node_with_addr(id_mid, "/ip4/10.0.1.1/udp/9001/quic"))
.await
.unwrap();
// Backdate the far peer's last_seen so it counts as stale.
{
let mut routing = dht.routing_table_for_test().write().await;
let bucket_idx = routing
.get_bucket_index(&PeerId::from_bytes(id_far))
.unwrap();
let node = routing.buckets[bucket_idx]
.nodes
.iter_mut()
.find(|n| n.id == PeerId::from_bytes(id_far))
.unwrap();
node.last_seen.store(Instant::now() - TEST_STALE_AGE);
}
let mut id_close = [0u8; 32];
id_close[0] = 0x80;
let far_peer = PeerId::from_bytes(id_far);
// High trust (0.8) on the stale peer must not protect it once stale.
let trust_fn = |peer_id: &PeerId| -> f64 { if *peer_id == far_peer { 0.8 } else { 0.5 } };
let result = dht
.add_node(
make_node_with_addr(id_close, "/ip4/10.0.1.1/udp/9002/quic"),
&trust_fn,
)
.await;
assert!(
result.is_ok(),
"stale trusted peer should be swappable: {:?}",
result
);
assert!(
!dht.has_node(&far_peer).await,
"stale far peer should be evicted"
);
assert!(dht.has_node(&PeerId::from_bytes(id_close)).await);
}
// A live, trusted peer keeps its slot: a closer candidate is refused.
#[tokio::test]
async fn test_live_trusted_peer_holds_slot() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let mut id_far = [0u8; 32];
id_far[0] = 0xFF;
dht.add_node_no_trust(make_node_with_addr(id_far, "/ip4/10.0.1.1/udp/9000/quic"))
.await
.unwrap();
let mut id_mid = [0u8; 32];
id_mid[0] = 0xFE;
dht.add_node_no_trust(make_node_with_addr(id_mid, "/ip4/10.0.1.1/udp/9001/quic"))
.await
.unwrap();
// No backdating here: the far peer stays within the live threshold.
let far_peer = PeerId::from_bytes(id_far);
let trust_fn = |peer_id: &PeerId| -> f64 { if *peer_id == far_peer { 0.8 } else { 0.5 } };
let mut id_close = [0u8; 32];
id_close[0] = 0x80;
let result = dht
.add_node(
make_node_with_addr(id_close, "/ip4/10.0.1.1/udp/9002/quic"),
&trust_fn,
)
.await;
assert!(result.is_err());
assert!(dht.has_node(&far_peer).await);
}
// Admitting a fresh peer emits PeerAdded for that peer.
#[tokio::test]
async fn test_peer_added_event_on_insertion() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
let peer_id = node.id;
let events = dht.add_node_no_trust(node).await.unwrap();
assert!(
events
.iter()
.any(|e| matches!(e, RoutingTableEvent::PeerAdded(id) if *id == peer_id)),
"expected PeerAdded event for inserted peer"
);
}
// Removing a known peer emits PeerRemoved for that peer.
#[tokio::test]
async fn test_peer_removed_event_on_removal() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
let peer_id = node.id;
dht.add_node_no_trust(node).await.unwrap();
let events = dht.remove_node_by_id(&peer_id).await;
assert!(
events
.iter()
.any(|e| matches!(e, RoutingTableEvent::PeerRemoved(id) if *id == peer_id)),
"expected PeerRemoved event for removed peer"
);
}
// Inserting the very first close peer must change the k-closest set.
#[tokio::test]
async fn test_k_closest_changed_event_on_first_insertion() {
    let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
    let mut id = [0u8; 32];
    id[31] = 0x01;
    let node = NodeInfo {
        id: PeerId::from_bytes(id),
        addresses: vec!["/ip4/10.0.0.1/udp/9000/quic".parse().unwrap()],
        last_seen: AtomicInstant::now(),
        address_types: vec![],
    };
    let events = dht.add_node_no_trust(node).await.unwrap();
    let saw_change = events
        .iter()
        .any(|e| matches!(e, RoutingTableEvent::KClosestPeersChanged { .. }));
    assert!(
        saw_change,
        "adding first close peer should trigger KClosestPeersChanged"
    );
}
// Re-adding an identical node is a pure update and emits no events.
#[tokio::test]
async fn test_update_short_circuit_no_events() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
dht.add_node_no_trust(node.clone()).await.unwrap();
let events = dht.add_node_no_trust(node).await.unwrap();
assert!(
events.is_empty(),
"update short-circuit should produce no events"
);
}
// Swapping a stale peer for a closer candidate emits PeerRemoved for the
// evictee AND PeerAdded for the newcomer in the same event batch.
#[tokio::test]
async fn test_swap_eviction_produces_both_events() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
dht.set_live_threshold(TEST_LIVE_THRESHOLD);
let mut id_far = [0u8; 32];
id_far[0] = 0xFF;
dht.add_node_no_trust(make_node_with_addr(id_far, "/ip4/10.0.1.1/udp/9000/quic"))
.await
.unwrap();
let mut id_mid = [0u8; 32];
id_mid[0] = 0xFE;
dht.add_node_no_trust(make_node_with_addr(id_mid, "/ip4/10.0.1.1/udp/9001/quic"))
.await
.unwrap();
// Backdate the far peer so it is eligible for eviction.
{
let mut routing = dht.routing_table_for_test().write().await;
let bucket_idx = routing
.get_bucket_index(&PeerId::from_bytes(id_far))
.unwrap();
let node = routing.buckets[bucket_idx]
.nodes
.iter_mut()
.find(|n| n.id == PeerId::from_bytes(id_far))
.unwrap();
node.last_seen.store(Instant::now() - TEST_STALE_AGE);
}
let mut id_close = [0u8; 32];
id_close[0] = 0x80;
let far_peer = PeerId::from_bytes(id_far);
let close_peer = PeerId::from_bytes(id_close);
let result = dht
.add_node(
make_node_with_addr(id_close, "/ip4/10.0.1.1/udp/9002/quic"),
&|peer_id| if *peer_id == far_peer { 0.8 } else { 0.5 },
)
.await
.unwrap();
let events = match result {
AdmissionResult::Admitted(events) => events,
other => panic!("expected Admitted, got {:?}", other),
};
assert!(
events
.iter()
.any(|e| matches!(e, RoutingTableEvent::PeerRemoved(id) if *id == far_peer)),
"swap should produce PeerRemoved for evicted peer"
);
assert!(
events
.iter()
.any(|e| matches!(e, RoutingTableEvent::PeerAdded(id) if *id == close_peer)),
"swap should produce PeerAdded for new peer"
);
}
// Removing a peer that was in the k-closest set triggers the change event.
#[tokio::test]
async fn test_k_closest_changed_on_removal() {
    let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
    let node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
    let peer_id = node.id;
    dht.add_node_no_trust(node).await.unwrap();
    let events = dht.remove_node_by_id(&peer_id).await;
    let saw_change = events
        .iter()
        .any(|e| matches!(e, RoutingTableEvent::KClosestPeersChanged { .. }));
    assert!(
        saw_change,
        "removing a peer should trigger KClosestPeersChanged"
    );
}
// A full bucket whose occupants are all stale yields
// StaleRevalidationNeeded (listing every stale peer) rather than an error.
#[tokio::test]
async fn test_stale_revalidation_needed_when_bucket_full_with_stale_peers() {
// k = 4 keeps the bucket small enough to fill quickly.
let mut dht = DhtCoreEngine::new(
PeerId::from_bytes([0u8; 32]),
4,
false,
DEFAULT_SWAP_THRESHOLD,
)
.unwrap();
dht.set_ip_diversity_config(crate::security::IPDiversityConfig::testnet());
dht.set_live_threshold(TEST_LIVE_THRESHOLD);
// Fill one bucket: same leading byte puts all four ids in the same bucket.
for i in 1..=4u8 {
let mut id = [0u8; 32];
id[0] = 0x80;
id[31] = i;
dht.add_node_no_trust(make_node_with_addr(
id,
&format!("/ip4/10.0.0.{i}/udp/9000/quic"),
))
.await
.unwrap();
}
// Backdate every occupant of that bucket past the live threshold.
{
let mut routing = dht.routing_table_for_test().write().await;
let mut id_a = [0u8; 32];
id_a[0] = 0x80;
id_a[31] = 1;
let bucket_idx = routing.get_bucket_index(&PeerId::from_bytes(id_a)).unwrap();
for node in &mut routing.buckets[bucket_idx].nodes {
node.last_seen.store(Instant::now() - TEST_STALE_AGE);
}
}
let mut id_new = [0u8; 32];
id_new[0] = 0x80;
id_new[31] = 5;
let result = dht
.add_node(
make_node_with_addr(id_new, "/ip4/10.0.0.5/udp/9000/quic"),
&|_| DEFAULT_NEUTRAL_TRUST,
)
.await
.unwrap();
match result {
AdmissionResult::StaleRevalidationNeeded {
candidate,
candidate_ips,
candidate_bucket_idx: _,
stale_peers,
} => {
assert_eq!(candidate.id, PeerId::from_bytes(id_new));
assert!(!candidate_ips.is_empty());
assert_eq!(stale_peers.len(), 4, "all peers should be stale");
}
AdmissionResult::Admitted(_) => panic!("expected StaleRevalidationNeeded"),
}
}
// A full bucket with only live peers rejects the candidate outright,
// citing the absence of stale peers.
#[tokio::test]
async fn test_no_stale_revalidation_when_bucket_full_no_stale() {
let mut dht = DhtCoreEngine::new(
PeerId::from_bytes([0u8; 32]),
4,
false,
DEFAULT_SWAP_THRESHOLD,
)
.unwrap();
dht.set_ip_diversity_config(crate::security::IPDiversityConfig::testnet());
// Fill one bucket with four live peers (default live threshold).
for i in 1..=4u8 {
let mut id = [0u8; 32];
id[0] = 0x80;
id[31] = i;
dht.add_node_no_trust(make_node_with_addr(
id,
&format!("/ip4/10.0.0.{i}/udp/9000/quic"),
))
.await
.unwrap();
}
let mut id_new = [0u8; 32];
id_new[0] = 0x80;
id_new[31] = 5;
let result = dht
.add_node(
make_node_with_addr(id_new, "/ip4/10.0.0.5/udp/9000/quic"),
&|_| DEFAULT_NEUTRAL_TRUST,
)
.await;
assert!(result.is_err());
let msg = result.unwrap_err().to_string();
assert!(
msg.contains("no stale peers"),
"error should mention no stale peers, got: {msg}"
);
}
// After an occupant is evicted, re_evaluate_admission admits the candidate
// into the freed slot and emits PeerAdded.
#[tokio::test]
async fn test_re_evaluate_admission_after_eviction() {
let mut dht = DhtCoreEngine::new(
PeerId::from_bytes([0u8; 32]),
4,
false,
DEFAULT_SWAP_THRESHOLD,
)
.unwrap();
dht.set_ip_diversity_config(crate::security::IPDiversityConfig::testnet());
for i in 1..=4u8 {
let mut id = [0u8; 32];
id[0] = 0x80;
id[31] = i;
dht.add_node_no_trust(make_node_with_addr(
id,
&format!("/ip4/10.0.0.{i}/udp/9000/quic"),
))
.await
.unwrap();
}
// Free one slot by removing the first occupant.
let mut id_a = [0u8; 32];
id_a[0] = 0x80;
id_a[31] = 1;
dht.remove_node_by_id(&PeerId::from_bytes(id_a)).await;
let mut id_new = [0u8; 32];
id_new[0] = 0x80;
id_new[31] = 5;
let candidate = make_node_with_addr(id_new, "/ip4/10.0.0.5/udp/9000/quic");
let candidate_ips = vec!["10.0.0.5".parse().unwrap()];
let events = dht
.re_evaluate_admission(candidate, &candidate_ips, &|_| DEFAULT_NEUTRAL_TRUST)
.await
.unwrap();
assert!(
events.iter().any(
|e| matches!(e, RoutingTableEvent::PeerAdded(id) if *id == PeerId::from_bytes(id_new))
),
"re-evaluation should produce PeerAdded"
);
assert!(dht.has_node(&PeerId::from_bytes(id_new)).await);
}
// re_evaluate_admission, like add_node, does not gate on trust when there
// is room in the bucket.
#[tokio::test]
async fn test_re_evaluate_admits_low_trust_candidate() {
let mut dht = DhtCoreEngine::new(
PeerId::from_bytes([0u8; 32]),
20,
false,
DEFAULT_SWAP_THRESHOLD,
)
.unwrap();
let mut id = [0u8; 32];
id[0] = 0x80;
let candidate = make_node_with_addr(id, "/ip4/10.0.0.1/udp/9000/quic");
let candidate_ips = vec!["10.0.0.1".parse().unwrap()];
let result = dht
.re_evaluate_admission(candidate, &candidate_ips, &|_| 0.1)
.await;
assert!(
result.is_ok(),
"low-trust candidate should be admitted via re-evaluate"
);
}
// The retry path must not loop: a still-full bucket of stale peers makes
// re_evaluate_admission fail instead of requesting another revalidation.
#[tokio::test]
async fn test_re_evaluate_does_not_trigger_second_revalidation() {
let mut dht = DhtCoreEngine::new(
PeerId::from_bytes([0u8; 32]),
4,
false,
DEFAULT_SWAP_THRESHOLD,
)
.unwrap();
dht.set_ip_diversity_config(crate::security::IPDiversityConfig::testnet());
dht.set_live_threshold(TEST_LIVE_THRESHOLD);
for i in 1..=4u8 {
let mut id = [0u8; 32];
id[0] = 0x80;
id[31] = i;
dht.add_node_no_trust(make_node_with_addr(
id,
&format!("/ip4/10.0.0.{i}/udp/9000/quic"),
))
.await
.unwrap();
}
// Backdate every occupant: the bucket is full AND entirely stale.
{
let mut routing = dht.routing_table_for_test().write().await;
let mut id_a = [0u8; 32];
id_a[0] = 0x80;
id_a[31] = 1;
let bucket_idx = routing.get_bucket_index(&PeerId::from_bytes(id_a)).unwrap();
for node in &mut routing.buckets[bucket_idx].nodes {
node.last_seen.store(Instant::now() - TEST_STALE_AGE);
}
}
let mut id_new = [0u8; 32];
id_new[0] = 0x80;
id_new[31] = 5;
let candidate = make_node_with_addr(id_new, "/ip4/10.0.0.5/udp/9000/quic");
let candidate_ips = vec!["10.0.0.5".parse().unwrap()];
let result = dht
.re_evaluate_admission(candidate, &candidate_ips, &|_| DEFAULT_NEUTRAL_TRUST)
.await;
assert!(result.is_err());
let msg = result.unwrap_err().to_string();
assert!(
msg.contains("no stale peers"),
"re-evaluation should not trigger another revalidation round, got: {msg}"
);
}
// collect_stale_peers_in_bucket returns exactly the peers whose last_seen
// exceeds the given threshold, and ignores fresh ones.
#[tokio::test]
async fn test_collect_stale_peers_in_bucket() {
let mut dht = DhtCoreEngine::new(
PeerId::from_bytes([0u8; 32]),
20,
false,
DEFAULT_SWAP_THRESHOLD,
)
.unwrap();
dht.set_live_threshold(TEST_LIVE_THRESHOLD);
let mut id_fresh = [0u8; 32];
id_fresh[0] = 0x80;
id_fresh[31] = 1;
dht.add_node_no_trust(make_node_with_addr(id_fresh, "/ip4/10.0.0.1/udp/9000/quic"))
.await
.unwrap();
let mut id_stale = [0u8; 32];
id_stale[0] = 0x80;
id_stale[31] = 2;
dht.add_node_no_trust(make_node_with_addr(id_stale, "/ip4/10.0.0.2/udp/9000/quic"))
.await
.unwrap();
// Backdate only the second peer, then scan its bucket.
{
let mut routing = dht.routing_table_for_test().write().await;
let bucket_idx = routing
.get_bucket_index(&PeerId::from_bytes(id_stale))
.unwrap();
let node = routing.buckets[bucket_idx]
.nodes
.iter_mut()
.find(|n| n.id == PeerId::from_bytes(id_stale))
.unwrap();
node.last_seen.store(Instant::now() - TEST_STALE_AGE);
let stale = DhtCoreEngine::collect_stale_peers_in_bucket(
&routing,
bucket_idx,
TEST_LIVE_THRESHOLD,
);
assert_eq!(stale.len(), 1);
assert_eq!(stale[0].0, PeerId::from_bytes(id_stale));
}
}
// A key generated for bucket i must have its first differing bit (vs. the
// local id) at exactly position i.
#[tokio::test]
async fn test_generate_random_key_for_bucket_lands_in_correct_bucket() {
let local_id = PeerId::random();
let dht = DhtCoreEngine::new_for_tests(local_id).unwrap();
// Edge and byte-boundary indices plus a few interior ones.
let test_indices: Vec<usize> = vec![0, 1, 7, 8, 15, 127, 128, 200, 255];
for bucket_idx in test_indices {
let key = dht
.generate_random_key_for_bucket(bucket_idx)
.expect("should produce a key for valid bucket index");
let distance = xor_distance_bytes(local_id.to_bytes(), key.as_bytes());
let leading_bit = leading_bit_position(&distance);
assert_eq!(
leading_bit,
Some(bucket_idx),
"key for bucket {bucket_idx} has wrong leading bit position: {leading_bit:?}"
);
}
}
// Bucket indices beyond 255 are invalid and must yield None.
#[tokio::test]
async fn test_generate_random_key_for_bucket_out_of_range() {
    let dht = DhtCoreEngine::new_for_tests(PeerId::random()).unwrap();
    for bad_index in [256, 1000] {
        assert!(dht.generate_random_key_for_bucket(bad_index).is_none());
    }
}
#[tokio::test]
async fn test_generate_random_key_for_bucket_produces_different_keys() {
let dht = DhtCoreEngine::new_for_tests(PeerId::random()).unwrap();
let mut keys = HashSet::new();
for _ in 0..10 {
let key = dht.generate_random_key_for_bucket(100).unwrap();
keys.insert(key);
}
assert!(
keys.len() > 1,
"generate_random_key_for_bucket should produce distinct keys"
);
}
// A brand-new routing table has no stale buckets under a generous threshold.
#[tokio::test]
async fn test_stale_bucket_indices_returns_empty_when_fresh() {
let dht = DhtCoreEngine::new_for_tests(PeerId::random()).unwrap();
let stale = dht.stale_bucket_indices(Duration::from_secs(3600)).await;
assert!(
stale.is_empty(),
"freshly created routing table should have no stale buckets"
);
}
// node_id() echoes the id the engine was constructed with.
#[tokio::test]
async fn test_node_id_accessor() {
    let expected = PeerId::random();
    let dht = DhtCoreEngine::new_for_tests(expected).unwrap();
    assert_eq!(*dht.node_id(), expected);
}
// Index (0..=255, MSB-first) of the first set bit, or None if all zero.
fn leading_bit_position(distance: &[u8; 32]) -> Option<usize> {
    distance.iter().enumerate().find_map(|(byte_idx, &byte)| {
        if byte == 0 {
            None
        } else {
            // leading_zeros counts from the MSB, matching bit order here.
            Some(byte_idx * 8 + byte.leading_zeros() as usize)
        }
    })
}
// Non-IP transports (Bluetooth here) carry no IP, so the IP-diversity
// limits must not block them.
#[tokio::test]
async fn test_non_ip_transport_bypasses_diversity() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let mut id = [0u8; 32];
id[0] = 0x80;
id[31] = 1;
let bt_addr = MultiAddr::new(TransportAddr::Bluetooth {
mac: [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0x01],
channel: 5,
});
let node = NodeInfo {
id: PeerId::from_bytes(id),
addresses: vec![bt_addr],
last_seen: AtomicInstant::now(),
address_types: vec![],
};
let result = dht.add_node_no_trust(node).await;
assert!(
result.is_ok(),
"non-IP transport should bypass diversity: {:?}",
result
);
assert!(dht.has_node(&PeerId::from_bytes(id)).await);
// Several more Bluetooth peers: none should trip any diversity limit.
for i in 2..=5u8 {
let mut node_id = [0u8; 32];
node_id[0] = 0x80;
node_id[31] = i;
let bt = MultiAddr::new(TransportAddr::Bluetooth {
mac: [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, i],
channel: 5,
});
let n = NodeInfo {
id: PeerId::from_bytes(node_id),
addresses: vec![bt],
last_seen: AtomicInstant::now(),
address_types: vec![],
};
let r = dht.add_node_no_trust(n).await;
assert!(r.is_ok(), "Bluetooth node {i} should be admitted: {:?}", r);
}
}
// find_nodes must never report the local node itself.
#[tokio::test]
async fn test_local_lookup_excludes_self() {
    let self_id = PeerId::from_bytes([0u8; 32]);
    let mut dht = DhtCoreEngine::new_for_tests(self_id).unwrap();
    dht.add_node_no_trust(make_node(1, "/ip4/10.0.0.1/udp/9000/quic"))
        .await
        .unwrap();
    let results = dht
        .find_nodes(&DhtKey::from_bytes([0u8; 32]), 10)
        .await
        .unwrap();
    let excludes_self = results.iter().all(|n| n.id != self_id);
    assert!(excludes_self, "self must be excluded from local lookup results");
    assert_eq!(results.len(), 1, "expected the one added peer");
}
// The _with_self variant includes the local node, and since the lookup key
// equals the local id, self sorts first.
#[tokio::test]
async fn test_find_nodes_with_self_includes_self() {
let self_id = PeerId::from_bytes([0u8; 32]);
let mut dht = DhtCoreEngine::new_for_tests(self_id).unwrap();
dht.add_node_no_trust(make_node(1, "/ip4/10.0.0.1/udp/9000/quic"))
.await
.unwrap();
let results = dht
.find_nodes_with_self(&DhtKey::from_bytes([0u8; 32]), 10)
.await
.unwrap();
assert!(
results.iter().any(|n| n.id == self_id),
"self should be included in find_nodes_with_self results"
);
assert_eq!(results[0].id, self_id, "self should be the closest match");
}
// Removal actually deletes the peer and reports it via PeerRemoved.
// NOTE(review): overlaps with test_peer_removed_event_on_removal above;
// this variant additionally checks has_node before/after.
#[tokio::test]
async fn test_peer_removal_produces_events() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let node = make_node(1, "/ip4/10.0.0.1/udp/9000/quic");
let peer_id = node.id;
dht.add_node_no_trust(node).await.unwrap();
assert!(dht.has_node(&peer_id).await);
let events = dht.remove_node_by_id(&peer_id).await;
assert!(
!dht.has_node(&peer_id).await,
"peer must be gone after removal"
);
assert!(
events
.iter()
.any(|e| matches!(e, RoutingTableEvent::PeerRemoved(id) if *id == peer_id)),
"expected PeerRemoved event"
);
}
// Removing a peer that was never inserted is a no-op with no events.
#[tokio::test]
async fn test_remove_absent_peer_produces_no_events() {
    let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
    let events = dht.remove_node_by_id(&PeerId::from_bytes([99u8; 32])).await;
    assert!(
        events.is_empty(),
        "removing a peer not in the routing table should produce no events"
    );
}
// Eclipse resistance: an attacker's closer candidate cannot displace live,
// highly trusted peers occupying the bucket.
#[tokio::test]
async fn test_eclipse_resistance_live_trusted_peers() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let mut id_a = [0u8; 32];
id_a[0] = 0xFF;
dht.add_node_no_trust(make_node_with_addr(id_a, "/ip4/10.0.1.1/udp/9000/quic"))
.await
.unwrap();
let mut id_b = [0u8; 32];
id_b[0] = 0xFE;
dht.add_node_no_trust(make_node_with_addr(id_b, "/ip4/10.0.1.1/udp/9001/quic"))
.await
.unwrap();
// Attacker id is closer to local (smaller leading byte) than both peers.
let mut id_attacker = [0u8; 32];
id_attacker[0] = 0x80;
let peer_a = PeerId::from_bytes(id_a);
let peer_b = PeerId::from_bytes(id_b);
let trust_fn = |peer_id: &PeerId| -> f64 {
if *peer_id == peer_a || *peer_id == peer_b {
0.9 } else {
0.5
}
};
let result = dht
.add_node(
make_node_with_addr(id_attacker, "/ip4/10.0.1.1/udp/9002/quic"),
&trust_fn,
)
.await;
assert!(
result.is_err(),
"attacker should not displace live trusted peers"
);
assert!(dht.has_node(&peer_a).await, "peer A must survive");
assert!(dht.has_node(&peer_b).await, "peer B must survive");
}
// Even very high trust (0.9) cannot hold a slot once the peer is stale:
// a closer candidate displaces it.
#[tokio::test]
async fn test_stale_trusted_peer_displaced_by_closer_candidate() {
let mut dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
dht.set_live_threshold(TEST_LIVE_THRESHOLD);
let mut id_far = [0u8; 32];
id_far[0] = 0xFF;
dht.add_node_no_trust(make_node_with_addr(id_far, "/ip4/10.0.1.1/udp/9000/quic"))
.await
.unwrap();
let mut id_mid = [0u8; 32];
id_mid[0] = 0xFE;
dht.add_node_no_trust(make_node_with_addr(id_mid, "/ip4/10.0.1.1/udp/9001/quic"))
.await
.unwrap();
// Backdate the far peer past the live threshold.
{
let mut routing = dht.routing_table_for_test().write().await;
let bucket_idx = routing
.get_bucket_index(&PeerId::from_bytes(id_far))
.unwrap();
let node = routing.buckets[bucket_idx]
.nodes
.iter_mut()
.find(|n| n.id == PeerId::from_bytes(id_far))
.unwrap();
node.last_seen.store(Instant::now() - TEST_STALE_AGE);
}
let far_peer = PeerId::from_bytes(id_far);
let trust_fn = |peer_id: &PeerId| -> f64 { if *peer_id == far_peer { 0.9 } else { 0.5 } };
let mut id_closer = [0u8; 32];
id_closer[0] = 0x80;
let result = dht
.add_node(
make_node_with_addr(id_closer, "/ip4/10.0.1.1/udp/9002/quic"),
&trust_fn,
)
.await;
assert!(
result.is_ok(),
"stale well-trusted peer should be displaceable: {:?}",
result
);
assert!(
!dht.has_node(&far_peer).await,
"stale peer should be evicted"
);
assert!(
dht.has_node(&PeerId::from_bytes(id_closer)).await,
"closer candidate should be admitted"
);
}
// NOTE(review): despite the name, no trust event is actually delivered here —
// the body only queries an absent peer twice and checks the table size is
// unchanged. Consider wiring in a real trust-event call or renaming the test.
#[tokio::test]
async fn test_trust_event_for_absent_peer_does_not_affect_rt() {
let dht = DhtCoreEngine::new_for_tests(PeerId::from_bytes([0u8; 32])).unwrap();
let absent_peer = PeerId::from_bytes([42u8; 32]);
assert!(!dht.has_node(&absent_peer).await);
let size_before = dht.routing_table_size().await;
assert!(!dht.has_node(&absent_peer).await);
let size_after = dht.routing_table_size().await;
assert_eq!(size_before, size_after, "routing table should be unchanged");
}
// When the bucket is full, a peer below the swap threshold is evicted in
// favor of the new candidate, producing both removal and addition events.
#[tokio::test]
async fn test_trust_swap_out_replaces_lowest_trust_peer() {
let mut dht = DhtCoreEngine::new(
PeerId::from_bytes([0u8; 32]),
4,
false,
DEFAULT_SWAP_THRESHOLD,
)
.unwrap();
// Fill the k=4 bucket; ids 0x80..0x83 share a bucket, each on its own /24.
let mut ids: Vec<[u8; 32]> = Vec::new();
for i in 0..4u8 {
let mut id = [0u8; 32];
id[0] = 0x80 + i; ids.push(id);
let addr = format!("/ip4/10.0.{}.1/udp/9000/quic", i);
dht.add_node(make_node_with_addr(id, &addr), &|_| 0.5)
.await
.unwrap();
}
let mut new_id = [0u8; 32];
new_id[0] = 0x84;
let new_peer = PeerId::from_bytes(new_id);
// One occupant scores 0.05, well under DEFAULT_SWAP_THRESHOLD (0.35).
let low_trust_peer = PeerId::from_bytes(ids[2]);
let result = dht
.add_node(
make_node_with_addr(new_id, "/ip4/10.0.4.1/udp/9000/quic"),
&|id| {
if *id == low_trust_peer { 0.05 } else { 0.5 }
},
)
.await
.unwrap();
let events = match result {
AdmissionResult::Admitted(events) => events,
other => panic!("expected Admitted, got {other:?}"),
};
assert!(
events
.iter()
.any(|e| matches!(e, RoutingTableEvent::PeerRemoved(id) if *id == low_trust_peer)),
"low-trust peer should be swapped out"
);
assert!(
events
.iter()
.any(|e| matches!(e, RoutingTableEvent::PeerAdded(id) if *id == new_peer)),
"new candidate should be added"
);
assert!(dht.has_node(&new_peer).await);
assert!(!dht.has_node(&low_trust_peer).await);
}
// With several peers under the swap threshold, only the single
// lowest-trust one is evicted; the others keep their slots.
#[tokio::test]
async fn test_trust_swap_out_picks_lowest_when_multiple_below_threshold() {
let mut dht = DhtCoreEngine::new(
PeerId::from_bytes([0u8; 32]),
4,
false,
DEFAULT_SWAP_THRESHOLD,
)
.unwrap();
let mut ids: Vec<[u8; 32]> = Vec::new();
for i in 0..4u8 {
let mut id = [0u8; 32];
id[0] = 0x80 + i;
ids.push(id);
let addr = format!("/ip4/10.0.{}.1/udp/9000/quic", i);
dht.add_node(make_node_with_addr(id, &addr), &|_| 0.5)
.await
.unwrap();
}
// peer_a scores 0.10, peer_b 0.05 — both below threshold, b is lowest.
let peer_a = PeerId::from_bytes(ids[1]); let peer_b = PeerId::from_bytes(ids[3]);
let mut new_id = [0u8; 32];
new_id[0] = 0x84;
let result = dht
.add_node(
make_node_with_addr(new_id, "/ip4/10.0.4.1/udp/9000/quic"),
&|id| {
if *id == peer_a {
0.10
} else if *id == peer_b {
0.05
} else {
0.5
}
},
)
.await
.unwrap();
let events = match result {
AdmissionResult::Admitted(events) => events,
other => panic!("expected Admitted, got {other:?}"),
};
assert!(
events
.iter()
.any(|e| matches!(e, RoutingTableEvent::PeerRemoved(id) if *id == peer_b)),
"peer with lowest trust (0.05) should be swapped out"
);
assert!(
dht.has_node(&peer_a).await,
"peer with trust 0.10 should remain (only one swap needed)"
);
}
#[tokio::test]
async fn test_no_trust_swap_when_all_peers_above_threshold() {
    // A saturated bucket whose members all sit above the swap threshold
    // offers no eviction candidate, so admission must not succeed.
    let mut engine = DhtCoreEngine::new(
        PeerId::from_bytes([0u8; 32]),
        4,
        false,
        DEFAULT_SWAP_THRESHOLD,
    )
    .unwrap();
    for i in 0..4u8 {
        let mut raw = [0u8; 32];
        raw[0] = 0x80 + i;
        let endpoint = format!("/ip4/10.0.{}.1/udp/9000/quic", i);
        engine
            .add_node(make_node_with_addr(raw, &endpoint), &|_| 0.5)
            .await
            .unwrap();
    }
    let mut candidate_id = [0u8; 32];
    candidate_id[0] = 0x84;
    let outcome = engine
        .add_node(
            make_node_with_addr(candidate_id, "/ip4/10.0.4.1/udp/9000/quic"),
            &|_| 0.5,
        )
        .await;
    // Refusal may surface either as an error or a stale-revalidation request;
    // only outright admission is a failure.
    match outcome {
        Ok(AdmissionResult::Admitted(_)) => {
            panic!("should not be admitted when bucket is full with no low-trust peers")
        }
        Ok(AdmissionResult::StaleRevalidationNeeded { .. }) | Err(_) => {}
    }
}
#[tokio::test]
async fn test_no_trust_swap_when_threshold_is_zero() {
    // A zero swap threshold disables trust-based eviction entirely: even a
    // near-zero-trust member must not be displaced.
    let mut engine =
        DhtCoreEngine::new(PeerId::from_bytes([0u8; 32]), 4, false, 0.0).unwrap();
    let mut seeded: Vec<[u8; 32]> = Vec::new();
    for i in 0..4u8 {
        let mut raw = [0u8; 32];
        raw[0] = 0x80 + i;
        seeded.push(raw);
        let endpoint = format!("/ip4/10.0.{}.1/udp/9000/quic", i);
        engine
            .add_node(make_node_with_addr(raw, &endpoint), &|_| 0.5)
            .await
            .unwrap();
    }
    let near_zero_trust = PeerId::from_bytes(seeded[0]);
    let mut candidate_id = [0u8; 32];
    candidate_id[0] = 0x84;
    let outcome = engine
        .add_node(
            make_node_with_addr(candidate_id, "/ip4/10.0.4.1/udp/9000/quic"),
            &|id| if *id == near_zero_trust { 0.01 } else { 0.5 },
        )
        .await;
    // Any non-admission outcome (error or revalidation request) is acceptable.
    assert!(
        !matches!(outcome, Ok(AdmissionResult::Admitted(_))),
        "should not be admitted when swap is disabled and bucket is full"
    );
    assert!(engine.has_node(&near_zero_trust).await);
}
}