#![allow(missing_docs)]
use crate::{
P2PError, PeerId, Result,
adaptive::TrustEngine,
adaptive::trust::DEFAULT_NEUTRAL_TRUST,
address::MultiAddr,
dht::core_engine::{AddressType, AtomicInstant, NodeInfo},
dht::{AdmissionResult, DhtCoreEngine, DhtKey, Key, RoutingTableEvent},
error::{DhtError, IdentityError, NetworkError},
network::{NodeConfig, NodeMode},
};
use anyhow::Context as _;
use dashmap::DashMap;
use dashmap::mapref::entry::Entry as DashEntry;
use futures::stream::{FuturesUnordered, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::{RwLock, Semaphore, broadcast, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use uuid::Uuid;
/// Floor for the message-handler semaphore, applied even when the
/// configured `max_concurrent_operations` is smaller.
const MIN_CONCURRENT_OPERATIONS: usize = 10;
/// Cap on the candidate queue in the iterative network lookup.
const MAX_CANDIDATE_NODES: usize = 200;
/// Upper bound on a single DHT network message, in bytes.
const MAX_MESSAGE_SIZE: usize = 64 * 1024;
/// Default timeout for a single DHT request/response exchange.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Reliability score we report for our own node.
const SELF_RELIABILITY_SCORE: f64 = 1.0;
/// Time budget for the post-connect identity exchange.
pub(crate) const IDENTITY_EXCHANGE_TIMEOUT: Duration = Duration::from_secs(5);
/// Extra RTT allowance for a revalidation ping on top of the identity exchange.
const STALE_REVALIDATION_PING_RTT: Duration = Duration::from_secs(1);
/// Total budget for revalidating one stale peer (identity exchange + ping RTT).
const STALE_REVALIDATION_BUDGET: Duration =
    IDENTITY_EXCHANGE_TIMEOUT.saturating_add(STALE_REVALIDATION_PING_RTT);
/// Capacity of the broadcast channel that fans a dial outcome out to joiners.
const PENDING_DIAL_BROADCAST_CAPACITY: usize = 16;
/// Concurrency limits for background revalidation work.
const MAX_CONCURRENT_REVALIDATIONS: usize = 8;
const MAX_CONCURRENT_REVALIDATION_PINGS: usize = 4;
/// A bucket untouched for this long is considered stale and gets refreshed.
const STALE_BUCKET_THRESHOLD: Duration = Duration::from_secs(3600);
/// Randomised interval bounds for the periodic self-lookup task.
const SELF_LOOKUP_INTERVAL_MIN: Duration = Duration::from_secs(300);
const SELF_LOOKUP_INTERVAL_MAX: Duration = Duration::from_secs(600);
/// Fixed period of the bucket-refresh background task.
const BUCKET_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
/// Routing-table size below which an automatic re-bootstrap is attempted.
const AUTO_REBOOTSTRAP_THRESHOLD: usize = 3;
/// How long `stop()` waits for each background task before aborting it.
const TASK_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
/// Minimum spacing between automatic re-bootstrap attempts.
const REBOOTSTRAP_COOLDOWN: Duration = Duration::from_secs(300);
/// TTLs for the negative caches (failed dials / failed or mismatched identities).
const DIAL_FAILURE_CACHE_TTL: Duration = Duration::from_secs(30 * 60);
const IDENTITY_FAILURE_CACHE_TTL: Duration = Duration::from_secs(5 * 60);
const IDENTITY_MISMATCH_CACHE_TTL: Duration = Duration::from_secs(30 * 60);
/// A peer as seen by the DHT: its id plus the addresses (and their
/// verification tiers) at which it was reported reachable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTNode {
    /// Identity of the peer.
    pub peer_id: PeerId,
    /// Known addresses; kept parallel to `address_types`.
    pub addresses: Vec<MultiAddr>,
    /// Per-address type/tier; `#[serde(default)]` so records from peers
    /// that predate this field deserialize with an empty vector
    /// (missing entries are treated as `Unverified` by `typed_addresses`).
    #[serde(default)]
    pub address_types: Vec<AddressType>,
    /// Optional precomputed XOR distance to some target key.
    pub distance: Option<Vec<u8>>,
    /// Reliability score in `[0, 1]` (self reports `SELF_RELIABILITY_SCORE`).
    pub reliability: f64,
}
impl DHTNode {
    /// Returns each address paired with its recorded [`AddressType`].
    ///
    /// `addresses` and `address_types` are parallel vectors; any address
    /// without a recorded type (possible because `address_types` is
    /// `#[serde(default)]`) falls back to `AddressType::Unverified`.
    pub fn typed_addresses(&self) -> Vec<(MultiAddr, AddressType)> {
        self.addresses
            .iter()
            .enumerate()
            .map(|(i, addr)| {
                let ty = self
                    .address_types
                    .get(i)
                    .copied()
                    .unwrap_or(AddressType::Unverified);
                (addr.clone(), ty)
            })
            .collect()
    }
    /// Addresses ordered best-first by address-type priority (lower
    /// `priority()` value is better). The sort is stable, so ties keep
    /// their original relative order.
    pub fn addresses_by_priority(&self) -> Vec<MultiAddr> {
        let mut typed = self.typed_addresses();
        typed.sort_by_key(|(_, ty)| ty.priority());
        typed.into_iter().map(|(addr, _)| addr).collect()
    }
    /// Merges another report about the same peer into this one:
    /// pads `address_types` to stay parallel with `addresses`, upgrades
    /// the tier of already-known addresses when `other` reports a better
    /// (lower) priority, appends previously unseen addresses, re-sorts
    /// both vectors best-first (stable sort), and keeps the higher of
    /// the two reliability scores.
    pub fn merge_from(&mut self, other: DHTNode) {
        while self.address_types.len() < self.addresses.len() {
            self.address_types.push(AddressType::Unverified);
        }
        for (addr, ty) in other.typed_addresses() {
            if let Some(pos) = self.addresses.iter().position(|a| a == &addr) {
                if ty.priority() < self.address_types[pos].priority() {
                    self.address_types[pos] = ty;
                }
            } else {
                self.addresses.push(addr);
                self.address_types.push(ty);
            }
        }
        // Rebuild both parallel vectors in priority order (stable, so the
        // previous relative order survives within a tier).
        let mut pairs: Vec<(MultiAddr, AddressType)> = self
            .addresses
            .drain(..)
            .zip(self.address_types.drain(..))
            .collect();
        pairs.sort_by_key(|(_, ty)| ty.priority());
        for (addr, ty) in pairs {
            self.addresses.push(addr);
            self.address_types.push(ty);
        }
        if other.reliability > self.reliability {
            self.reliability = other.reliability;
        }
    }
}
/// Historical alias kept for wire/API compatibility: `DHTNode` already
/// derives `Serialize`/`Deserialize`, so no separate type is needed.
pub type SerializableDHTNode = DHTNode;
/// Configuration for [`DhtNetworkManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhtNetworkConfig {
    /// Our own peer id; reconciled against the transport's id in `new`.
    pub peer_id: PeerId,
    /// General node configuration (mode, DHT parameters, diversity, ...).
    pub node_config: NodeConfig,
    /// Per-request timeout.
    pub request_timeout: Duration,
    /// Concurrency cap for inbound message handling (floored at
    /// `MIN_CONCURRENT_OPERATIONS`).
    pub max_concurrent_operations: usize,
    /// Whether security checks are enabled.
    pub enable_security: bool,
    /// Threshold passed to the core engine's bucket-swap logic.
    pub swap_threshold: f64,
}
/// Request payloads exchanged between DHT peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DhtNetworkOperation {
    /// Kademlia FIND_NODE: ask for the nodes closest to `key`.
    FindNode { key: Key },
    /// Liveness probe.
    Ping,
    /// Join the network.
    Join,
    /// Leave the network.
    Leave,
    /// Publish our signed address set (monotonic `seq` orders updates).
    PublishAddressSet {
        seq: u64,
        addresses: Vec<(crate::MultiAddr, AddressType)>,
    },
}
/// Response payloads for [`DhtNetworkOperation`] requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DhtNetworkResult {
    /// Answer to `FindNode`: the closest nodes known for `key`.
    NodesFound {
        key: Key,
        nodes: Vec<SerializableDHTNode>,
    },
    /// Answer to `Ping`.
    PongReceived {
        responder: PeerId,
        latency: Duration,
    },
    /// Answer to `Join`.
    JoinSuccess {
        assigned_key: Key,
        bootstrap_peers: usize,
    },
    /// Answer to `Leave`.
    LeaveSuccess,
    /// The remote refused to interact with us; the caller removes it
    /// from the routing table and disconnects.
    PeerRejected,
    /// Acknowledgement of `PublishAddressSet`.
    PublishAddressAck,
    /// Remote-side failure for the named operation.
    Error { operation: String, error: String },
}
/// Envelope for DHT traffic on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhtNetworkMessage {
    /// Unique id used to correlate a response with its request.
    pub message_id: String,
    /// Sender of the message.
    pub source: PeerId,
    /// Intended recipient; `None` presumably for broadcasts — confirm
    /// against the message-dispatch code (not visible in this chunk).
    pub target: Option<PeerId>,
    /// Request / response / broadcast / error discriminator.
    pub message_type: DhtMessageType,
    /// The operation being requested (also echoed on responses).
    pub payload: DhtNetworkOperation,
    /// Result carried by responses; `None` on requests.
    pub result: Option<DhtNetworkResult>,
    /// Creation time (epoch-based integer).
    pub timestamp: u64,
    /// Remaining hop budget.
    pub ttl: u8,
    /// Hops traversed so far.
    pub hop_count: u8,
}
/// Direction/kind discriminator for [`DhtNetworkMessage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DhtMessageType {
    Request,
    Response,
    Broadcast,
    Error,
}
/// Drives the DHT over a transport: iterative lookups, background
/// self-lookup / bucket refresh, dial dedup and negative caching.
pub struct DhtNetworkManager {
    /// Core Kademlia engine (routing table, buckets).
    dht: Arc<RwLock<DhtCoreEngine>>,
    /// Underlying connection/dial layer.
    transport: Arc<crate::transport_handle::TransportHandle>,
    /// Optional trust engine for recording peer behaviour.
    trust_engine: Option<Arc<TrustEngine>>,
    config: DhtNetworkConfig,
    /// In-flight request bookkeeping, keyed by message id.
    active_operations: Arc<Mutex<HashMap<String, DhtOperationContext>>>,
    /// Fan-out channel for `DhtNetworkEvent` notifications.
    event_tx: broadcast::Sender<DhtNetworkEvent>,
    stats: Arc<RwLock<DhtNetworkStats>>,
    /// Caps concurrent inbound message handling.
    message_handler_semaphore: Arc<Semaphore>,
    /// Caps concurrent stale-peer revalidations.
    revalidation_semaphore: Arc<Semaphore>,
    /// Buckets currently being revalidated (guarded by `BucketRevalidationGuard`).
    bucket_revalidation_active: Arc<parking_lot::Mutex<HashSet<usize>>>,
    /// Cancels all background tasks on `stop()`.
    shutdown: CancellationToken,
    // Join handles for the background tasks, populated by `start()`.
    event_handler_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    self_lookup_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    bucket_refresh_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Timestamp of the last automatic re-bootstrap (cooldown enforcement).
    last_rebootstrap: tokio::sync::Mutex<Option<Instant>>,
    /// Negative cache of recently unreachable socket addresses.
    dial_failure_cache: Arc<DialFailureCache>,
    /// Negative cache of peers whose identity exchange failed/mismatched.
    identity_failure_cache: Arc<IdentityFailureCache>,
    /// Single-flight map: one owned dial per peer, joiners subscribe.
    pending_peer_dials: Arc<DashMap<PeerId, broadcast::Sender<PendingDialOutcome>>>,
}
/// Outcome of a single-flight dial, broadcast to every waiter.
#[derive(Clone, Debug)]
enum PendingDialOutcome {
    /// A channel to the peer is open and identity-verified.
    Connected,
    /// Every candidate address failed to connect.
    DialFailed { candidates_count: usize },
    /// The connection opened but the identity exchange errored.
    IdentityFailed { err: String },
    /// The remote proved a different identity than expected.
    IdentityMismatch { actual: PeerId },
}
impl PendingDialOutcome {
    /// Converts a broadcast dial outcome into the `Result` a waiting
    /// caller should observe, attaching the peer's hex id to each error.
    fn into_result(self, peer_id: &PeerId) -> Result<()> {
        let hex = peer_id.to_hex();
        match self {
            Self::Connected => Ok(()),
            Self::DialFailed { candidates_count } => {
                let msg = format!(
                    "failed to dial {} at any of {} candidate address(es)",
                    hex, candidates_count
                );
                Err(P2PError::Network(NetworkError::PeerNotFound(msg.into())))
            }
            Self::IdentityFailed { err } => {
                let msg = format!("identity exchange with {} failed: {}", hex, err);
                Err(P2PError::Network(NetworkError::ProtocolError(msg.into())))
            }
            Self::IdentityMismatch { actual } => Err(P2PError::Identity(
                IdentityError::IdentityMismatch {
                    expected: hex.into(),
                    actual: actual.to_hex().into(),
                },
            )),
        }
    }
}
/// Negative cache of socket addresses that recently refused a dial.
/// Entries are stored as the failure `Instant` and compared against
/// `DIAL_FAILURE_CACHE_TTL` on lookup.
#[derive(Debug, Default)]
struct DialFailureCache {
    entries: DashMap<SocketAddr, Instant>,
}
impl DialFailureCache {
    /// Creates an empty cache.
    fn new() -> Self {
        Self::default()
    }
    /// True when `addr` failed within the TTL window; expired entries
    /// are pruned lazily on lookup.
    fn is_failed(&self, addr: &SocketAddr) -> bool {
        let DashEntry::Occupied(entry) = self.entries.entry(*addr) else {
            return false;
        };
        let still_fresh = entry.get().elapsed() < DIAL_FAILURE_CACHE_TTL;
        if !still_fresh {
            entry.remove();
        }
        still_fresh
    }
    /// Marks `addr` as having just failed.
    fn record_failure(&self, addr: SocketAddr) {
        self.entries.insert(addr, Instant::now());
    }
    /// Forgets any failure recorded for `addr` (e.g. after a successful dial).
    fn clear(&self, addr: &SocketAddr) {
        self.entries.remove(addr);
    }
}
/// Negative cache of peers whose identity exchange recently failed or
/// mismatched. Unlike `DialFailureCache`, entries store the *expiry*
/// `Instant` directly, because failures and mismatches use different TTLs.
#[derive(Debug, Default)]
struct IdentityFailureCache {
    entries: DashMap<PeerId, Instant>,
}
impl IdentityFailureCache {
    /// Creates an empty cache.
    fn new() -> Self {
        Self::default()
    }
    /// True when `peer_id` has an unexpired entry; expired entries are
    /// pruned lazily on lookup. Entries store their expiry instant.
    fn is_failed(&self, peer_id: &PeerId) -> bool {
        let DashEntry::Occupied(entry) = self.entries.entry(*peer_id) else {
            return false;
        };
        let expires_at = *entry.get();
        let suppressed = Instant::now() < expires_at;
        if !suppressed {
            entry.remove();
        }
        suppressed
    }
    /// Suppresses the peer for the (shorter) failure TTL.
    fn record_failure(&self, peer_id: PeerId) {
        let expires_at = Instant::now() + IDENTITY_FAILURE_CACHE_TTL;
        self.entries.insert(peer_id, expires_at);
    }
    /// Suppresses the peer for the (longer) mismatch TTL.
    fn record_mismatch(&self, peer_id: PeerId) {
        let expires_at = Instant::now() + IDENTITY_MISMATCH_CACHE_TTL;
        self.entries.insert(peer_id, expires_at);
    }
    /// Forgets any suppression for `peer_id`.
    fn clear(&self, peer_id: &PeerId) {
        self.entries.remove(peer_id);
    }
}
/// Bookkeeping for one in-flight DHT request.
struct DhtOperationContext {
    /// What was asked.
    operation: DhtNetworkOperation,
    /// Peer the request was sent to.
    peer_id: PeerId,
    /// When the request started (for latency/timeout accounting).
    started_at: Instant,
    /// Deadline budget for this operation.
    timeout: Duration,
    /// Peers contacted while serving this operation.
    contacted_nodes: Vec<PeerId>,
    /// One-shot used to deliver the response to the waiting caller;
    /// `None` once consumed.
    response_tx: Option<oneshot::Sender<(PeerId, DhtNetworkResult)>>,
}
// Manual Debug because `oneshot::Sender` is not Debug; the sender is
// rendered as a boolean "is still pending" flag instead.
impl std::fmt::Debug for DhtOperationContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DhtOperationContext")
            .field("operation", &self.operation)
            .field("peer_id", &self.peer_id)
            .field("started_at", &self.started_at)
            .field("timeout", &self.timeout)
            .field("contacted_nodes", &self.contacted_nodes)
            .field("response_tx", &self.response_tx.is_some())
            .finish()
    }
}
/// Notifications emitted on the manager's broadcast channel.
#[derive(Debug, Clone)]
pub enum DhtNetworkEvent {
    PeerDiscovered { peer_id: PeerId, dht_key: Key },
    PeerDisconnected { peer_id: PeerId },
    /// The set of K closest peers to us changed.
    KClosestPeersChanged {
        old: Vec<PeerId>,
        new: Vec<PeerId>,
    },
    PeerAdded { peer_id: PeerId },
    PeerRemoved { peer_id: PeerId },
    /// Routing table populated after a bootstrap pass.
    RoutingTableReady { num_peers: usize },
    BootstrapComplete { num_peers: usize },
    OperationCompleted {
        operation: String,
        success: bool,
        duration: Duration,
    },
    NetworkStatusChanged {
        connected_peers: usize,
        routing_table_size: usize,
    },
    Error { error: String },
}
/// Aggregate counters for the manager; all start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct DhtNetworkStats {
    pub total_operations: u64,
    pub successful_operations: u64,
    pub failed_operations: u64,
    pub avg_operation_latency: Duration,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub connected_peers: usize,
    pub routing_table_size: usize,
}
/// RAII marker: while alive, `bucket_idx` is recorded in the shared
/// "revalidation in progress" set; dropping it releases the slot even
/// if the revalidation task errors or is cancelled.
struct BucketRevalidationGuard {
    active: Arc<parking_lot::Mutex<HashSet<usize>>>,
    bucket_idx: usize,
}
impl Drop for BucketRevalidationGuard {
    fn drop(&mut self) {
        // Release the bucket slot unconditionally on scope exit.
        self.active.lock().remove(&self.bucket_idx);
    }
}
/// Number of closest reporters considered when forming an address quorum.
const QUORUM_TOP_N: usize = 3;
/// Minimum agreeing reporters for a quorum to win.
const QUORUM_THRESHOLD: usize = 2;
/// Reports about one subject peer, keyed by the reporting peer.
type SubjectReports = HashMap<PeerId, DHTNode>;
/// Best (numerically lowest) address-type priority among a node's
/// addresses; `u8::MAX` when the node advertises no addresses.
fn best_tier_priority(node: &DHTNode) -> u8 {
    let mut best = u8::MAX;
    for (_, ty) in node.typed_addresses() {
        best = best.min(ty.priority());
    }
    best
}
/// Canonical, order-independent signature of a node's advertised address
/// set: `(address, priority)` pairs sorted by the address's string form.
/// Two reports with equal signatures describe the same address set, which
/// is what the quorum grouping in `compute_winner` compares.
fn report_signature(node: &DHTNode) -> Vec<(MultiAddr, u8)> {
    let mut sig: Vec<(MultiAddr, u8)> = node
        .typed_addresses()
        .into_iter()
        .map(|(addr, t)| (addr, t.priority()))
        .collect();
    // `sort_by_cached_key` renders each address to a string exactly once;
    // `sort_by_key` would re-allocate the key on every comparison.
    sig.sort_by_cached_key(|(addr, _)| addr.to_string());
    sig
}
/// Picks the authoritative report for `subject_id` among the reports
/// collected so far.
///
/// Precedence:
/// 1. a self-report (the subject describing itself) always wins;
/// 2. otherwise, among the `QUORUM_TOP_N` reporters closest to the
///    subject (XOR distance, ties broken by best address tier), any
///    group of at least `QUORUM_THRESHOLD` reporters with an identical
///    address signature wins; the group is represented by its member
///    closest to the subject;
/// 3. failing a quorum, fall back to the single closest reporter.
fn compute_winner<'a>(
    subject_id: &PeerId,
    reports: &'a SubjectReports,
) -> Option<(PeerId, &'a DHTNode)> {
    if reports.is_empty() {
        return None;
    }
    // Rule 1: self-report is authoritative.
    if let Some(node) = reports.get(subject_id) {
        return Some((*subject_id, node));
    }
    // Sort reporters by distance to the subject, then by best address tier.
    let mut by_dist: Vec<(PeerId, &DHTNode, Key, u8)> = reports
        .iter()
        .map(|(rid, node)| {
            (
                *rid,
                node,
                rid.xor_distance(subject_id),
                best_tier_priority(node),
            )
        })
        .collect();
    by_dist.sort_by(|a, b| a.2.cmp(&b.2).then(a.3.cmp(&b.3)));
    let top_n = &by_dist[..by_dist.len().min(QUORUM_TOP_N)];
    if top_n.len() >= QUORUM_THRESHOLD {
        // Rule 2: group the top reporters by address signature and look
        // for a group large enough to form a quorum.
        let mut buckets: HashMap<Vec<(MultiAddr, u8)>, Vec<PeerId>> = HashMap::new();
        for (rid, node, _, _) in top_n {
            buckets
                .entry(report_signature(node))
                .or_default()
                .push(*rid);
        }
        if let Some(group) = buckets.values().find(|g| g.len() >= QUORUM_THRESHOLD)
            && let Some(winner_rid) = group
                .iter()
                .copied()
                .min_by_key(|rid| rid.xor_distance(subject_id))
        {
            return reports.get(&winner_rid).map(|node| (winner_rid, node));
        }
    }
    // Rule 3: no quorum — trust the closest reporter.
    let (rid, node, _, _) = by_dist.first()?;
    Some((*rid, *node))
}
impl DhtNetworkManager {
    /// Builds a manager around an already-constructed transport and
    /// (optional) trust engine. Does not start any background tasks;
    /// call `start()` for that.
    ///
    /// # Errors
    /// Returns `DhtError::OperationFailed` when the core engine cannot
    /// be constructed from the supplied configuration.
    fn new_from_components(
        transport: Arc<crate::transport_handle::TransportHandle>,
        trust_engine: Option<Arc<TrustEngine>>,
        config: DhtNetworkConfig,
    ) -> Result<Self> {
        let mut dht_instance = DhtCoreEngine::new(
            config.peer_id,
            config.node_config.dht_config.k_value,
            config.node_config.allow_loopback,
            config.swap_threshold,
        )
        .map_err(|e| P2PError::Dht(DhtError::OperationFailed(e.to_string().into())))?;
        if let Some(diversity) = &config.node_config.diversity_config {
            dht_instance.set_ip_diversity_config(diversity.clone());
        }
        let dht = Arc::new(RwLock::new(dht_instance));
        let (event_tx, _) = broadcast::channel(crate::DEFAULT_EVENT_CHANNEL_CAPACITY);
        // Never allow the configured cap to throttle below the floor.
        let message_handler_semaphore = Arc::new(Semaphore::new(
            config
                .max_concurrent_operations
                .max(MIN_CONCURRENT_OPERATIONS),
        ));
        Ok(Self {
            dht,
            transport,
            trust_engine,
            config,
            active_operations: Arc::new(Mutex::new(HashMap::new())),
            event_tx,
            stats: Arc::new(RwLock::new(DhtNetworkStats::default())),
            message_handler_semaphore,
            revalidation_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REVALIDATIONS)),
            bucket_revalidation_active: Arc::new(parking_lot::Mutex::new(HashSet::new())),
            shutdown: CancellationToken::new(),
            event_handler_handle: Arc::new(RwLock::new(None)),
            self_lookup_handle: Arc::new(RwLock::new(None)),
            bucket_refresh_handle: Arc::new(RwLock::new(None)),
            last_rebootstrap: tokio::sync::Mutex::new(None),
            dial_failure_cache: Arc::new(DialFailureCache::new()),
            identity_failure_cache: Arc::new(IdentityFailureCache::new()),
            pending_peer_dials: Arc::new(DashMap::new()),
        })
    }
    /// The configured Kademlia replication parameter K.
    pub fn k_value(&self) -> usize {
        self.config.node_config.dht_config.k_value
    }
async fn handle_find_node_request(
&self,
key: &Key,
requester: &PeerId,
) -> Result<DhtNetworkResult> {
trace!(
"FIND_NODE: resolving closer nodes for key {}",
hex::encode(key)
);
let candidate_nodes = self.find_closest_nodes_local(key, self.k_value()).await;
let closer_nodes = Self::filter_response_nodes(candidate_nodes, requester);
for node in &closer_nodes {
let addrs: Vec<String> = node.addresses.iter().map(|a| format!("{}", a)).collect();
debug!(
"FIND_NODE response: peer={} addresses={:?}",
node.peer_id.to_hex(),
addrs
);
}
Ok(DhtNetworkResult::NodesFound {
key: *key,
nodes: closer_nodes,
})
}
    /// Creates a manager attached to `transport`.
    ///
    /// An all-zero `config.peer_id` is treated as "unset" and replaced
    /// by the transport's peer id; a non-zero mismatch is logged but the
    /// configured value wins.
    pub async fn new(
        transport: Arc<crate::transport_handle::TransportHandle>,
        trust_engine: Option<Arc<TrustEngine>>,
        mut config: DhtNetworkConfig,
    ) -> Result<Self> {
        let transport_app_peer_id = transport.peer_id();
        if config.peer_id == PeerId::from_bytes([0u8; 32]) {
            config.peer_id = transport_app_peer_id;
        } else if config.peer_id != transport_app_peer_id {
            warn!(
                "DHT config peer_id ({}) differs from transport peer_id ({}); using config value",
                config.peer_id.to_hex(),
                transport_app_peer_id.to_hex()
            );
        }
        info!(
            "Creating attached DHT Network Manager for peer: {}",
            config.peer_id.to_hex()
        );
        let manager = Self::new_from_components(transport, trust_engine, config)?;
        info!("Attached DHT Network Manager created successfully");
        Ok(manager)
    }
    /// Starts the background machinery: the network event handler first
    /// (so no events are missed), then reconciliation of peers that
    /// connected before start, then the periodic self-lookup and
    /// bucket-refresh tasks.
    pub async fn start(self: &Arc<Self>) -> Result<()> {
        info!("Starting DHT Network Manager...");
        self.start_network_event_handler(Arc::clone(self)).await?;
        self.reconcile_connected_peers().await;
        self.spawn_self_lookup_task().await;
        self.spawn_bucket_refresh_task().await;
        info!("DHT Network Manager started successfully");
        Ok(())
    }
    /// Spawns the periodic self-lookup task and stores its handle for
    /// `stop()` to join.
    ///
    /// Each cycle sleeps a randomised interval (jitter avoids network-wide
    /// synchronisation), then runs self-lookup, stale-peer revalidation
    /// and a possible re-bootstrap. Two `select!`s are used so shutdown
    /// can interrupt both the sleep and the work itself.
    async fn spawn_self_lookup_task(self: &Arc<Self>) {
        let this = Arc::clone(self);
        let shutdown = self.shutdown.clone();
        let handle_slot = Arc::clone(&self.self_lookup_handle);
        let handle = tokio::spawn(async move {
            loop {
                let interval =
                    Self::randomised_interval(SELF_LOOKUP_INTERVAL_MIN, SELF_LOOKUP_INTERVAL_MAX);
                tokio::select! {
                    () = tokio::time::sleep(interval) => {}
                    () = shutdown.cancelled() => break,
                }
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    _ = async {
                        if let Err(e) = this.trigger_self_lookup().await {
                            warn!("Periodic self-lookup failed: {e}");
                        }
                        this.revalidate_stale_k_closest().await;
                        this.maybe_rebootstrap().await;
                    } => {}
                }
            }
        });
        *handle_slot.write().await = Some(handle);
    }
    /// Spawns the periodic bucket-refresh task and stores its handle for
    /// `stop()` to join.
    ///
    /// Every `BUCKET_REFRESH_INTERVAL` it finds buckets untouched for
    /// `STALE_BUCKET_THRESHOLD`, performs a network lookup on a random
    /// key inside each stale bucket, dials any peers discovered, and
    /// finally considers an automatic re-bootstrap. Shutdown can
    /// interrupt both the sleep and the refresh work.
    async fn spawn_bucket_refresh_task(self: &Arc<Self>) {
        let this = Arc::clone(self);
        let shutdown = self.shutdown.clone();
        let handle_slot = Arc::clone(&self.bucket_refresh_handle);
        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    () = tokio::time::sleep(BUCKET_REFRESH_INTERVAL) => {}
                    () = shutdown.cancelled() => break,
                }
                // Borrow for use inside the async block below while the
                // outer `select!` also polls `shutdown.cancelled()`.
                let shutdown_ref = &shutdown;
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    _ = async {
                        let stale_indices = this
                            .dht
                            .read()
                            .await
                            .stale_bucket_indices(STALE_BUCKET_THRESHOLD)
                            .await;
                        if stale_indices.is_empty() {
                            trace!("Bucket refresh: no stale buckets");
                            return;
                        }
                        debug!("Bucket refresh: {} stale buckets", stale_indices.len());
                        let k = this.k_value();
                        for bucket_idx in stale_indices {
                            // Bail out early between buckets on shutdown.
                            if shutdown_ref.is_cancelled() {
                                break;
                            }
                            // Scope the read guard so it is not held across
                            // the network lookup below.
                            let random_key = {
                                let dht = this.dht.read().await;
                                dht.generate_random_key_for_bucket(bucket_idx)
                            };
                            let Some(key) = random_key else {
                                continue;
                            };
                            let key_bytes: Key = *key.as_bytes();
                            match this.find_closest_nodes_network(&key_bytes, k).await {
                                Ok(nodes) => {
                                    trace!(
                                        "Bucket refresh[{bucket_idx}]: discovered {} peers",
                                        nodes.len()
                                    );
                                    for dht_node in nodes {
                                        // Never dial ourselves.
                                        if dht_node.peer_id == this.config.peer_id {
                                            continue;
                                        }
                                        this.dial_addresses(&dht_node.peer_id, &dht_node.typed_addresses())
                                            .await;
                                    }
                                }
                                Err(e) => {
                                    debug!("Bucket refresh[{bucket_idx}] lookup failed: {e}");
                                }
                            }
                        }
                        this.maybe_rebootstrap().await;
                    } => {}
                }
            }
        });
        *handle_slot.write().await = Some(handle);
    }
pub async fn trigger_self_lookup(&self) -> Result<()> {
let self_id = self.config.peer_id;
let self_key: Key = *self_id.as_bytes();
let k = self.k_value();
match self.find_closest_nodes_network(&self_key, k).await {
Ok(nodes) => {
debug!("Self-lookup discovered {} peers", nodes.len());
for dht_node in nodes {
if dht_node.peer_id == self_id {
continue;
}
self.dial_addresses(&dht_node.peer_id, &dht_node.typed_addresses())
.await;
}
Ok(())
}
Err(e) => {
debug!("Self-lookup failed: {e}");
Err(e)
}
}
}
    /// Re-bootstraps from currently connected peers when the routing
    /// table has shrunk below `AUTO_REBOOTSTRAP_THRESHOLD`, subject to
    /// `REBOOTSTRAP_COOLDOWN` between attempts.
    async fn maybe_rebootstrap(&self) {
        let rt_size = self.get_routing_table_size().await;
        if rt_size >= AUTO_REBOOTSTRAP_THRESHOLD {
            return;
        }
        {
            // Hold the cooldown lock only long enough to check and stamp;
            // the stamp is written *before* the attempt, so failed attempts
            // also consume the cooldown.
            let mut guard = self.last_rebootstrap.lock().await;
            if let Some(last) = *guard
                && last.elapsed() < REBOOTSTRAP_COOLDOWN
            {
                trace!(
                    "Auto re-bootstrap skipped: cooldown ({:?} remaining)",
                    REBOOTSTRAP_COOLDOWN.saturating_sub(last.elapsed())
                );
                return;
            }
            *guard = Some(Instant::now());
        }
        info!(
            "Auto re-bootstrap: routing table size ({rt_size}) below threshold ({})",
            AUTO_REBOOTSTRAP_THRESHOLD
        );
        let connected = self.transport.connected_peers().await;
        if connected.is_empty() {
            debug!("Auto re-bootstrap: no connected peers to bootstrap from");
            return;
        }
        match self.bootstrap_from_peers(&connected).await {
            Ok(discovered) => {
                info!("Auto re-bootstrap discovered {discovered} peers");
            }
            Err(e) => {
                warn!("Auto re-bootstrap failed: {e}");
            }
        }
    }
fn randomised_interval(min: Duration, max: Duration) -> Duration {
let range_secs = max.as_secs().saturating_sub(min.as_secs());
if range_secs == 0 {
return min;
}
let random_bytes = PeerId::random();
let bytes = random_bytes.to_bytes();
let random_value = u64::from_le_bytes([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
]);
let jitter = Duration::from_secs(random_value % (range_secs + 1));
min + jitter
}
    /// Bootstraps the routing table by issuing FIND_NODE(self) to each
    /// of `peers`, merging every gossiped node into local state, then
    /// dialing the unique dialable ones (unless running in client mode,
    /// which never dials gossiped peers).
    ///
    /// Returns the number of unique peers seen in the responses — note
    /// this counts peers with no dialable address too.
    pub async fn bootstrap_from_peers(&self, peers: &[PeerId]) -> Result<usize> {
        let key = *self.config.peer_id.as_bytes();
        let mut seen = HashSet::new();
        let mut to_dial: Vec<(PeerId, Vec<(MultiAddr, AddressType)>)> = Vec::new();
        for peer_id in peers {
            let op = DhtNetworkOperation::FindNode { key };
            match self.send_dht_request(peer_id, op, None).await {
                Ok(DhtNetworkResult::NodesFound { nodes, .. }) => {
                    for node in &nodes {
                        let typed = node.typed_addresses();
                        let dialable_count =
                            typed.iter().filter(|(a, _)| Self::is_dialable(a)).count();
                        debug!(
                            "DHT bootstrap: peer={} num_addresses={} dialable={}",
                            node.peer_id.to_hex(),
                            node.addresses.len(),
                            dialable_count
                        );
                        // Merge gossip regardless of dialability; dial later
                        // only for first sightings with a dialable address.
                        self.merge_gossiped_typed_addresses(node).await;
                        if seen.insert(node.peer_id) && dialable_count > 0 {
                            to_dial.push((node.peer_id, typed));
                        }
                    }
                }
                Ok(_) => {}
                Err(e) => {
                    warn!("Bootstrap FIND_NODE to {} failed: {}", peer_id.to_hex(), e);
                }
            }
        }
        if matches!(self.config.node_config.mode, NodeMode::Client) {
            debug!(
                "DHT bootstrap: client mode — skipping {} gossiped-peer dial(s)",
                to_dial.len()
            );
        } else {
            for (peer_id, typed) in to_dial {
                self.dial_addresses(&peer_id, &typed).await;
            }
        }
        let rt_size = self.get_routing_table_size().await;
        // Only broadcast when someone is listening (send errors otherwise).
        if self.event_tx.receiver_count() > 0 {
            let _ = self
                .event_tx
                .send(DhtNetworkEvent::RoutingTableReady { num_peers: rt_size });
        }
        info!("Routing table ready: {rt_size} peers (reachability classification pending)");
        Ok(seen.len())
    }
    /// Gracefully shuts the manager down: announces departure, cancels
    /// the shutdown token, signals the core engine, then joins each
    /// background task with a `TASK_SHUTDOWN_TIMEOUT`, aborting any task
    /// that does not stop in time.
    pub async fn stop(&self) -> Result<()> {
        info!("Stopping DHT Network Manager...");
        self.leave_network().await?;
        self.shutdown.cancel();
        self.dht.read().await.signal_shutdown();
        // Local helper: join one task handle with a timeout; abort on expiry.
        async fn join_task(name: &str, slot: &RwLock<Option<tokio::task::JoinHandle<()>>>) {
            if let Some(mut handle) = slot.write().await.take() {
                match tokio::time::timeout(TASK_SHUTDOWN_TIMEOUT, &mut handle).await {
                    Ok(Ok(())) => debug!("{name} task stopped cleanly"),
                    Ok(Err(e)) if e.is_cancelled() => debug!("{name} task was cancelled"),
                    Ok(Err(e)) => warn!("{name} task panicked: {e}"),
                    Err(_) => {
                        warn!(
                            "{name} task did not stop within {}s, aborting",
                            TASK_SHUTDOWN_TIMEOUT.as_secs()
                        );
                        handle.abort();
                    }
                }
            }
        }
        join_task("event handler", &self.event_handler_handle).await;
        join_task("self-lookup", &self.self_lookup_handle).await;
        join_task("bucket refresh", &self.bucket_refresh_handle).await;
        info!("DHT Network Manager stopped");
        Ok(())
    }
    /// Public alias for the iterative network lookup.
    pub async fn find_closest_nodes(&self, key: &Key, count: usize) -> Vec<DHTNode> {
        self.find_closest_nodes_network(key, count).await
    }
pub async fn find_node(&self, key: &Key) -> Result<DhtNetworkResult> {
info!("Finding nodes closest to key: {}", hex::encode(key));
let closest_nodes = self.find_closest_nodes_network(key, self.k_value()).await?;
let serializable_nodes: Vec<SerializableDHTNode> = closest_nodes.into_iter().collect();
info!(
"Found {} nodes closest to key: {}",
serializable_nodes.len(),
hex::encode(key)
);
Ok(DhtNetworkResult::NodesFound {
key: *key,
nodes: serializable_nodes,
})
}
pub async fn ping(&self, peer_id: &PeerId) -> Result<DhtNetworkResult> {
info!("Pinging peer: {}", peer_id.to_hex());
let start_time = Instant::now();
let operation = DhtNetworkOperation::Ping;
match self.send_dht_request(peer_id, operation, None).await {
Ok(DhtNetworkResult::PongReceived { responder, .. }) => {
let latency = start_time.elapsed();
info!("Received pong from {} in {:?}", responder, latency);
Ok(DhtNetworkResult::PongReceived { responder, latency })
}
Ok(result) => {
warn!("Unexpected ping result: {:?}", result);
Err(P2PError::Dht(crate::error::DhtError::RoutingError(
"Unexpected ping response".to_string().into(),
)))
}
Err(e) => {
warn!("Ping failed to {}: {}", peer_id.to_hex(), e);
Err(e)
}
}
}
    /// Departure announcement hook; currently a no-op placeholder.
    async fn leave_network(&self) -> Result<()> {
        Ok(())
    }
pub async fn find_closest_nodes_local(&self, key: &Key, count: usize) -> Vec<DHTNode> {
debug!(
"[LOCAL] Finding {} closest nodes to key: {}",
count,
hex::encode(key)
);
let dht_guard = self.dht.read().await;
match dht_guard.find_nodes(&DhtKey::from_bytes(*key), count).await {
Ok(nodes) => nodes
.into_iter()
.filter(|node| !self.is_local_peer_id(&node.id))
.map(|node| DHTNode {
peer_id: node.id,
address_types: node.address_types,
addresses: node.addresses,
distance: None,
reliability: SELF_RELIABILITY_SCORE,
})
.collect(),
Err(e) => {
warn!("find_nodes failed for key {}: {e}", hex::encode(key));
Vec::new()
}
}
}
pub async fn find_closest_nodes_local_with_self(
&self,
key: &Key,
count: usize,
) -> Vec<DHTNode> {
let mut nodes = self.find_closest_nodes_local(key, count).await;
nodes.push(self.local_dht_node().await);
let key_peer = PeerId::from_bytes(*key);
nodes.sort_by(|a, b| {
let da = a.peer_id.xor_distance(&key_peer);
let db = b.peer_id.xor_distance(&key_peer);
da.cmp(&db)
});
nodes.truncate(count);
nodes
}
    /// Iterative Kademlia-style network lookup for the `count` nodes
    /// closest to `key`.
    ///
    /// Starting from the local routing table (plus our own node), each
    /// iteration queries up to `ALPHA` of the nearest unqueried
    /// candidates in parallel, merges their FIND_NODE responses into the
    /// candidate queue (with per-subject quorum resolution via
    /// `compute_winner`), and stops after `MAX_ITERATIONS`, when the
    /// candidate queue empties, or when the top-K set converges with no
    /// candidate closer than the current worst.
    pub async fn find_closest_nodes_network(
        &self,
        key: &Key,
        count: usize,
    ) -> Result<Vec<DHTNode>> {
        const MAX_ITERATIONS: usize = 20;
        const ALPHA: usize = 3;
        debug!(
            "[NETWORK] Finding {} closest nodes to key: {}",
            count,
            hex::encode(key)
        );
        let target_key = DhtKey::from_bytes(*key);
        let mut queried_nodes: HashSet<PeerId> = HashSet::new();
        let mut best_nodes: Vec<DHTNode> = Vec::new();
        // We count as a result ourselves, and are never queried.
        best_nodes.push(self.local_dht_node().await);
        self.mark_self_queried(&mut queried_nodes);
        // Candidate queue ordered by (distance, peer id): first_entry()
        // always yields the closest unqueried candidate.
        let mut candidates: BTreeMap<(Key, PeerId), DHTNode> = BTreeMap::new();
        // Per-subject reports collected so far, for quorum resolution.
        let mut subject_reports: HashMap<PeerId, SubjectReports> = HashMap::new();
        let initial = self.find_closest_nodes_local(key, count).await;
        for node in initial {
            if !queried_nodes.contains(&node.peer_id) {
                let dist = node.peer_id.distance(&target_key);
                candidates.entry((dist, node.peer_id)).or_insert(node);
            }
        }
        let mut previous_top_k: Vec<PeerId> = Vec::new();
        for iteration in 0..MAX_ITERATIONS {
            if candidates.is_empty() {
                debug!(
                    "[NETWORK] No more candidates after {} iterations",
                    iteration
                );
                break;
            }
            // Pop the next ALPHA closest unqueried candidates.
            let mut batch: Vec<DHTNode> = Vec::new();
            while batch.len() < ALPHA {
                let Some(entry) = candidates.first_entry() else {
                    break;
                };
                let node = entry.remove();
                if queried_nodes.contains(&node.peer_id) {
                    continue;
                }
                batch.push(node);
            }
            if batch.is_empty() {
                debug!(
                    "[NETWORK] All candidates queried after {} iterations",
                    iteration
                );
                break;
            }
            info!(
                "[NETWORK] Iteration {}: querying {} nodes",
                iteration,
                batch.len()
            );
            // Query the batch in parallel; ensure a channel exists first.
            let query_stream: FuturesUnordered<_> = batch
                .iter()
                .map(|node| {
                    let peer_id = node.peer_id;
                    let typed = node.typed_addresses();
                    let op = DhtNetworkOperation::FindNode { key: *key };
                    async move {
                        let _ = self.ensure_peer_channel(&peer_id, &typed).await;
                        (
                            peer_id,
                            self.send_dht_request(&peer_id, op, Some(&typed)).await,
                        )
                    }
                })
                .collect();
            let results = Self::collect_iteration_results(query_stream).await;
            for (peer_id, result) in results {
                queried_nodes.insert(peer_id);
                match result {
                    Ok(DhtNetworkResult::NodesFound { mut nodes, .. }) => {
                        // A responsive peer is itself a lookup result.
                        if let Some(queried_node) = batch.iter().find(|n| n.peer_id == peer_id) {
                            best_nodes.push(queried_node.clone());
                        }
                        // Only consider the K closest nodes it reported.
                        nodes.sort_by(|a, b| Self::compare_node_distance(a, b, key));
                        nodes.truncate(self.k_value());
                        for node in nodes {
                            if queried_nodes.contains(&node.peer_id)
                                || self.is_local_peer_id(&node.peer_id)
                            {
                                continue;
                            }
                            self.merge_gossiped_typed_addresses(&node).await;
                            let subject_id = node.peer_id;
                            let dist = subject_id.distance(&target_key);
                            let cand_key = (dist, subject_id);
                            // Record this report and recompute the
                            // authoritative view of the subject.
                            let reports = subject_reports.entry(subject_id).or_default();
                            reports.insert(peer_id, node);
                            let winner_node = match compute_winner(&subject_id, reports) {
                                Some((_, node)) => node.clone(),
                                None => continue,
                            };
                            // Already queued: refresh in place.
                            if let std::collections::btree_map::Entry::Occupied(mut e) =
                                candidates.entry(cand_key)
                            {
                                e.insert(winner_node);
                                continue;
                            }
                            // Queue full: evict the farthest candidate only
                            // if the new one is closer; otherwise drop it.
                            if candidates.len() >= MAX_CANDIDATE_NODES {
                                let farthest_key = candidates.keys().next_back().copied();
                                match farthest_key {
                                    Some(fk) if cand_key < fk => {
                                        candidates.remove(&fk);
                                        subject_reports.remove(&fk.1);
                                    }
                                    _ => {
                                        trace!(
                                            "[NETWORK] Candidate queue at capacity ({}), dropping {}",
                                            MAX_CANDIDATE_NODES,
                                            subject_id.to_hex()
                                        );
                                        continue;
                                    }
                                }
                            }
                            candidates.insert(cand_key, winner_node);
                        }
                    }
                    Ok(DhtNetworkResult::PeerRejected) => {
                        // Peer refused us: evict it and drop the connection.
                        info!(
                            "[NETWORK] Peer {} rejected us — removing from routing table",
                            peer_id.to_hex()
                        );
                        let mut dht = self.dht.write().await;
                        let rt_events = dht.remove_node_by_id(&peer_id).await;
                        drop(dht);
                        self.broadcast_routing_events(&rt_events);
                        let _ = self.transport.disconnect_peer(&peer_id).await;
                    }
                    Ok(_) => {
                        // Responsive but unexpected payload: still counts
                        // as a live node for the result set.
                        if let Some(queried_node) = batch.iter().find(|n| n.peer_id == peer_id) {
                            best_nodes.push(queried_node.clone());
                        }
                    }
                    Err(e) => {
                        trace!("[NETWORK] Query to {} failed: {}", peer_id.to_hex(), e);
                    }
                }
            }
            // Normalise the running result set: closest first, unique,
            // at most `count` (dedup works because equal peers sort adjacent).
            best_nodes.sort_by(|a, b| Self::compare_node_distance(a, b, key));
            best_nodes.dedup_by_key(|n| n.peer_id);
            best_nodes.truncate(count);
            let current_top_k: Vec<PeerId> = best_nodes.iter().map(|n| n.peer_id).collect();
            if current_top_k == previous_top_k {
                // Stable top-K: keep going only while we can still fill the
                // result set or a candidate is closer than our worst result.
                if best_nodes.len() < count && !candidates.is_empty() {
                    previous_top_k = current_top_k;
                    continue;
                }
                let has_promising_candidate = best_nodes.last().is_some_and(|worst| {
                    let worst_dist = worst.peer_id.distance(&target_key);
                    candidates
                        .keys()
                        .next()
                        .is_some_and(|(dist, _)| *dist < worst_dist)
                });
                if !has_promising_candidate {
                    info!(
                        "[NETWORK] {}: Top-K converged after {} iterations",
                        self.config.peer_id.to_hex(),
                        iteration + 1
                    );
                    break;
                }
            }
            previous_top_k = current_top_k;
        }
        // Final normalisation (the loop may have exited before the
        // per-iteration pass above).
        best_nodes.sort_by(|a, b| Self::compare_node_distance(a, b, key));
        best_nodes.dedup_by_key(|n| n.peer_id);
        best_nodes.truncate(count);
        info!(
            "[NETWORK] Found {} closest nodes: {:?}",
            best_nodes.len(),
            best_nodes
                .iter()
                .map(|n| {
                    let h = n.peer_id.to_hex();
                    h[..8.min(h.len())].to_string()
                })
                .collect::<Vec<_>>()
        );
        Ok(best_nodes)
    }
fn compare_node_distance(a: &DHTNode, b: &DHTNode, key: &Key) -> std::cmp::Ordering {
let target_key = DhtKey::from_bytes(*key);
a.peer_id
.distance(&target_key)
.cmp(&b.peer_id.distance(&target_key))
}
    /// Drains a batch of parallel query futures: waits (unbounded) for the
    /// first result, then collects the rest for at most
    /// `ITERATION_GRACE_TIMEOUT_SECS` so one slow peer cannot stall the
    /// whole iteration. Individual requests carry their own timeouts —
    /// presumably bounding the first await too; confirm in `send_dht_request`.
    async fn collect_iteration_results<S>(mut stream: S) -> Vec<(PeerId, Result<DhtNetworkResult>)>
    where
        S: futures::Stream<Item = (PeerId, Result<DhtNetworkResult>)> + Unpin,
    {
        let mut results = Vec::new();
        let Some(first) = stream.next().await else {
            return results;
        };
        results.push(first);
        let grace = Duration::from_secs(ITERATION_GRACE_TIMEOUT_SECS);
        // Timeout elapsing is fine: we simply keep what arrived in time.
        let _ = tokio::time::timeout(grace, async {
            while let Some(next) = stream.next().await {
                results.push(next);
            }
        })
        .await;
        results
    }
fn filter_response_nodes(
candidate_nodes: Vec<DHTNode>,
requester_peer_id: &PeerId,
) -> Vec<DHTNode> {
candidate_nodes
.into_iter()
.filter(|node| node.peer_id != *requester_peer_id)
.collect()
}
async fn local_dht_node(&self) -> DHTNode {
let mut addresses: Vec<MultiAddr> = Vec::new();
for observed in self.transport.direct_external_addresses() {
let resolved = MultiAddr::quic(observed);
if !addresses.contains(&resolved) {
addresses.push(resolved);
}
}
for la in self.transport.listen_addrs().await {
let Some(sa) = la.dialable_socket_addr() else {
continue;
};
if sa.port() == 0 || sa.ip().is_unspecified() {
continue;
}
let resolved = MultiAddr::quic(sa);
if !addresses.contains(&resolved) {
addresses.push(resolved);
}
}
DHTNode {
peer_id: self.config.peer_id,
addresses,
address_types: Vec::new(), distance: None,
reliability: SELF_RELIABILITY_SCORE,
}
}
    /// Records our own id in the queried set so lookups never query us.
    fn mark_self_queried(&self, queried: &mut HashSet<PeerId>) {
        queried.insert(self.config.peer_id);
    }
pub(crate) fn first_direct_dialable(node: &DHTNode) -> Option<MultiAddr> {
for (i, addr) in node.addresses.iter().enumerate() {
let addr_type = node
.address_types
.get(i)
.copied()
.unwrap_or(AddressType::Unverified);
if addr_type != AddressType::Direct {
continue;
}
let Some(sa) = addr.dialable_socket_addr() else {
continue;
};
if sa.ip().is_unspecified() {
continue;
}
return Some(addr.clone());
}
None
}
fn is_dialable(addr: &MultiAddr) -> bool {
let Some(sa) = addr.dialable_socket_addr() else {
trace!("Skipping non-dialable address: {addr}");
return false;
};
if sa.ip().is_unspecified() {
warn!("Rejecting unspecified address: {addr}");
return false;
}
if sa.ip().is_loopback() {
trace!("Accepting loopback address (local/test): {addr}");
}
true
}
    /// Tries the peer's candidate addresses in plan order until one dial
    /// succeeds, returning the resulting channel id.
    ///
    /// Skips the whole dial when already connected, skips addresses in
    /// the dial-failure cache, records each fresh failure there, and
    /// clears the cache entry for the address that finally worked.
    /// Returns `None` when no address could be connected.
    async fn dial_addresses(
        &self,
        peer_id: &PeerId,
        typed_addresses: &[(MultiAddr, AddressType)],
    ) -> Option<String> {
        if self.transport.is_peer_connected(peer_id).await {
            trace!(
                "dial_addresses: peer {} already connected, skipping dial",
                peer_id.to_hex()
            );
            return None;
        }
        let plan = Self::select_dial_candidates(typed_addresses);
        if plan.is_empty() {
            debug!(
                "dial_addresses: no dialable addresses for {}",
                peer_id.to_hex()
            );
            return None;
        }
        let mut attempted = 0usize;
        let mut skipped_cached = 0usize;
        for (addr, _ty) in &plan {
            attempted += 1;
            let Some(socket_addr) = addr.dialable_socket_addr() else {
                continue;
            };
            // Negative-cache check: don't re-dial recently failed addresses.
            if self.dial_failure_cache.is_failed(&socket_addr) {
                skipped_cached += 1;
                trace!(
                    "dial_addresses: skipping recently failed address {} for {}",
                    addr,
                    peer_id.to_hex()
                );
                continue;
            }
            match self.dial_candidate(peer_id, addr).await {
                Some(channel_id) => {
                    self.dial_failure_cache.clear(&socket_addr);
                    return Some(channel_id);
                }
                None => {
                    self.dial_failure_cache.record_failure(socket_addr);
                }
            }
        }
        debug!(
            "dial_addresses: all {} attempted address(es) failed for {} ({} skipped from failure cache)",
            attempted,
            peer_id.to_hex(),
            skipped_cached
        );
        None
    }
async fn record_peer_failure(&self, peer_id: &PeerId) {
if let Some(ref engine) = self.trust_engine {
engine.update_node_stats(
peer_id,
crate::adaptive::NodeStatisticsUpdate::FailedResponse,
);
}
}
    /// Ensures an identity-verified channel to `peer_id`, coalescing
    /// concurrent callers into a single dial (single-flight).
    ///
    /// The first caller becomes the owner: it runs the dial and
    /// broadcasts the outcome; later callers subscribe to that broadcast
    /// instead of dialing themselves. The identity-failure cache is only
    /// consulted on the owner path. On identity mismatch the peer is
    /// evicted from the routing table before the outcome is published.
    async fn ensure_peer_channel(
        &self,
        peer_id: &PeerId,
        candidates: &[(MultiAddr, AddressType)],
    ) -> Result<()> {
        if self.transport.is_peer_connected(peer_id).await {
            return Ok(());
        }
        let local_hex = self.config.peer_id.to_hex();
        let peer_hex = peer_id.to_hex();
        let tx = match self.pending_peer_dials.entry(*peer_id) {
            // Someone else owns the dial: wait for its broadcast outcome.
            DashEntry::Occupied(entry) => {
                let mut rx = entry.get().subscribe();
                // Release the dashmap shard lock before awaiting.
                drop(entry);
                debug!(
                    "[STEP 1b] {} -> {}: joining in-flight dial",
                    local_hex, peer_hex
                );
                return match rx.recv().await {
                    Ok(outcome) => outcome.into_result(peer_id),
                    Err(_) => Err(P2PError::Network(NetworkError::PeerNotFound(
                        format!(
                            "in-flight dial to {} dropped before producing a result",
                            peer_hex
                        )
                        .into(),
                    ))),
                };
            }
            // We become the owner of this dial.
            DashEntry::Vacant(entry) => {
                if self.identity_failure_cache.is_failed(peer_id) {
                    debug!(
                        "[STEP 1b] {} -> {}: suppressed by identity-failure cache",
                        local_hex, peer_hex
                    );
                    return Err(P2PError::Network(NetworkError::ProtocolError(
                        format!(
                            "identity exchange with {} suppressed (recent failure)",
                            peer_hex
                        )
                        .into(),
                    )));
                }
                let (tx, _) = broadcast::channel(PENDING_DIAL_BROADCAST_CAPACITY);
                entry.insert(tx.clone());
                tx
            }
        };
        // Removes the pending entry if this future unwinds or is dropped
        // before the normal cleanup below, so joiners don't wait forever.
        struct DialGuard<'a> {
            map: &'a DashMap<PeerId, broadcast::Sender<PendingDialOutcome>>,
            peer_id: PeerId,
            cleared: bool,
        }
        impl<'a> Drop for DialGuard<'a> {
            fn drop(&mut self) {
                if !self.cleared {
                    self.map.remove(&self.peer_id);
                }
            }
        }
        let mut guard = DialGuard {
            map: &self.pending_peer_dials,
            peer_id: *peer_id,
            cleared: false,
        };
        let outcome = self
            .run_owned_dial(peer_id, candidates, &local_hex, &peer_hex)
            .await;
        // A proven wrong identity means the routing-table entry is bogus.
        if matches!(outcome, PendingDialOutcome::IdentityMismatch { .. }) {
            let rt_events = {
                let mut dht = self.dht.write().await;
                dht.remove_node_by_id(peer_id).await
            };
            self.broadcast_routing_events(&rt_events);
        }
        // Remove the entry before broadcasting so new callers start a
        // fresh dial rather than subscribing to a finished one.
        self.pending_peer_dials.remove(peer_id);
        guard.cleared = true;
        let _ = tx.send(outcome.clone());
        outcome.into_result(peer_id)
    }
/// The "owner" half of a coalesced dial: dial the candidate addresses, then
/// run identity exchange on the resulting channel and classify the outcome.
///
/// `local_hex`/`peer_hex` are precomputed hex ids, passed in to avoid
/// re-encoding them for every log line.
async fn run_owned_dial(
&self,
peer_id: &PeerId,
candidates: &[(MultiAddr, AddressType)],
local_hex: &str,
peer_hex: &str,
) -> PendingDialOutcome {
info!(
"[STEP 1b] {} -> {}: No open channel, trying {} dialable address(es)",
local_hex,
peer_hex,
candidates.len()
);
let Some(channel_id) = self.dial_addresses(peer_id, candidates).await else {
warn!(
"[STEP 1b] {} -> {}: dial failed for all {} candidate address(es)",
local_hex,
peer_hex,
candidates.len()
);
self.record_peer_failure(peer_id).await;
return PendingDialOutcome::DialFailed {
candidates_count: candidates.len(),
};
};
// Bound identity exchange by the tighter of the two timeouts.
let identity_timeout = self.config.request_timeout.min(IDENTITY_EXCHANGE_TIMEOUT);
match self
.transport
.wait_for_peer_identity(&channel_id, identity_timeout)
.await
{
// Peer authenticated as exactly who we dialed.
Ok(authenticated) if &authenticated == peer_id => {
debug!(
"[STEP 1b] {} -> {}: identity confirmed ({})",
local_hex,
peer_hex,
authenticated.to_hex()
);
self.identity_failure_cache.clear(peer_id);
PendingDialOutcome::Connected
}
// Peer authenticated as someone else — routing entry likely stale.
Ok(authenticated) => {
warn!(
"[STEP 1b] {} -> {}: identity MISMATCH — authenticated as {}. \
Routing table entry may be stale.",
local_hex,
peer_hex,
authenticated.to_hex()
);
self.identity_failure_cache.record_mismatch(*peer_id);
PendingDialOutcome::IdentityMismatch {
actual: authenticated,
}
}
// Exchange failed outright: tear the channel down so it isn't reused.
Err(e) => {
warn!(
"[STEP 1b] {} -> {}: identity exchange failed, disconnecting channel: {}",
local_hex, peer_hex, e
);
self.transport.disconnect_channel(&channel_id).await;
self.record_peer_failure(peer_id).await;
self.identity_failure_cache.record_failure(*peer_id);
PendingDialOutcome::IdentityFailed { err: e.to_string() }
}
}
}
/// Drop bookkeeping for DHT operations that have outlived twice their
/// configured timeout.
///
/// A poisoned `active_operations` mutex is recovered via `into_inner`
/// rather than propagated — the map holds only bookkeeping state.
fn sweep_expired_operations(&self) {
    let mut ops = self.active_operations.lock().unwrap_or_else(|poisoned| {
        tracing::warn!(
            "active_operations mutex poisoned in sweep_expired_operations, recovering"
        );
        poisoned.into_inner()
    });
    let now = Instant::now();
    ops.retain(|id, ctx| {
        let age = now.duration_since(ctx.started_at);
        if age > ctx.timeout * 2 {
            warn!(
                "Sweeping expired DHT operation {id} (age {:?}, timeout {:?})",
                age, ctx.timeout
            );
            false
        } else {
            true
        }
    });
}
/// Send a DHT request to `peer_id` and wait for the matching response.
///
/// Flow: sweep expired operations, register this one in `active_operations`
/// (so `handle_dht_response` can route the reply to our oneshot), ensure a
/// channel exists (dialing `candidates` or looked-up addresses), send, and
/// await the response with a timeout. The operation entry is removed on
/// every exit path; failures are reported to the trust engine.
async fn send_dht_request(
&self,
peer_id: &PeerId,
operation: DhtNetworkOperation,
candidates: Option<&[(MultiAddr, AddressType)]>,
) -> Result<DhtNetworkResult> {
// Opportunistic GC of abandoned operations.
self.sweep_expired_operations();
let message_id = Uuid::new_v4().to_string();
let message = DhtNetworkMessage {
message_id: message_id.clone(),
source: self.config.peer_id,
target: Some(*peer_id),
message_type: DhtMessageType::Request,
payload: operation,
result: None, timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|_| {
P2PError::Network(NetworkError::ProtocolError(
"System clock error: unable to get current timestamp".into(),
))
})?
.as_secs(),
ttl: 10,
hop_count: 0,
};
let message_data = postcard::to_stdvec(&message)
.map_err(|e| P2PError::Serialization(e.to_string().into()))?;
let (response_tx, response_rx) = oneshot::channel();
let contacted_nodes = vec![*peer_id];
let operation_context = DhtOperationContext {
operation: message.payload.clone(),
peer_id: *peer_id,
started_at: Instant::now(),
timeout: self.config.request_timeout,
contacted_nodes,
response_tx: Some(response_tx),
};
// Register BEFORE sending so a fast response can always find the entry.
if let Ok(mut ops) = self.active_operations.lock() {
ops.insert(message_id.clone(), operation_context);
}
debug!(
"[STEP 1] {} -> {}: Sending {:?} request (msg_id: {})",
self.config.peer_id.to_hex(),
peer_id.to_hex(),
message.payload,
message_id
);
// Prefer caller-provided candidates; otherwise look up known addresses.
let candidate_addresses: Vec<(MultiAddr, AddressType)> = if let Some(provided) = candidates
{
provided.to_vec()
} else {
self.peer_addresses_for_dial_typed(peer_id).await
};
if let Err(e) = self
.ensure_peer_channel(peer_id, &candidate_addresses)
.await
{
// Dial failed: unregister the never-sent operation.
if let Ok(mut ops) = self.active_operations.lock() {
ops.remove(&message_id);
}
return Err(e);
}
let result = match self
.transport
.send_message(peer_id, "/dht/1.0.0", message_data)
.await
{
Ok(_) => {
debug!(
"[STEP 2] {} -> {}: Message sent successfully, waiting for response...",
self.config.peer_id.to_hex(),
peer_id.to_hex()
);
let result = self
.wait_for_response(&message_id, response_rx, peer_id)
.await;
match &result {
Ok(r) => debug!(
"[STEP 6] {} <- {}: Got response: {:?}",
self.config.peer_id.to_hex(),
peer_id.to_hex(),
std::mem::discriminant(r)
),
Err(e) => warn!(
"[STEP 6 FAILED] {} <- {}: Response error: {}",
self.config.peer_id.to_hex(),
peer_id.to_hex(),
e
),
}
result
}
Err(e) => {
warn!(
"[STEP 1 FAILED] Failed to send DHT request to {}: {}",
peer_id.to_hex(),
e
);
Err(e)
}
};
// Unconditional cleanup of the operation entry.
if let Ok(mut ops) = self.active_operations.lock() {
ops.remove(&message_id);
}
if result.is_err() {
self.record_peer_failure(peer_id).await;
}
result
}
/// Whether `peer_id` refers to this node itself.
fn is_local_peer_id(&self, peer_id: &PeerId) -> bool {
    self.config.peer_id == *peer_id
}
/// Resolve `peer_id` to its application-level identity, if the transport
/// either already knows it as an app peer id or has an open connection to
/// it; `None` for unauthenticated/unknown peers.
async fn canonical_app_peer_id(&self, peer_id: &PeerId) -> Option<PeerId> {
    let authenticated = self.transport.is_known_app_peer_id(peer_id).await
        || self.transport.is_peer_connected(peer_id).await;
    authenticated.then_some(*peer_id)
}
/// Attempt a single dial to `address`, bounded by the smaller of the
/// transport's connection timeout and the configured request timeout.
/// Returns the new channel id on success, `None` on failure or timeout.
async fn dial_candidate(&self, peer_id: &PeerId, address: &MultiAddr) -> Option<String> {
    let peer_hex = peer_id.to_hex();
    // Never dial 0.0.0.0 / :: — it cannot identify a remote endpoint.
    if let Some(ip) = address.ip() {
        if ip.is_unspecified() {
            debug!(
                "dial_candidate: rejecting unspecified address for {}: {}",
                peer_hex, address
            );
            return None;
        }
    }
    let dial_timeout = self
        .transport
        .connection_timeout()
        .min(self.config.request_timeout);
    let attempt =
        tokio::time::timeout(dial_timeout, self.transport.connect_peer(address)).await;
    match attempt {
        Ok(Ok(channel_id)) => {
            debug!(
                "dial_candidate: connected to {} at {} (channel {})",
                peer_hex, address, channel_id
            );
            Some(channel_id)
        }
        Ok(Err(e)) => {
            debug!(
                "dial_candidate: failed to connect to {} at {}: {}",
                peer_hex, address, e
            );
            None
        }
        Err(_) => {
            debug!(
                "dial_candidate: timeout connecting to {} at {} (>{:?})",
                peer_hex, address, dial_timeout
            );
            None
        }
    }
}
/// Untyped convenience wrapper around `peer_addresses_for_dial_typed`:
/// same candidate set, address-type tags dropped.
pub(crate) async fn peer_addresses_for_dial(&self, peer_id: &PeerId) -> Vec<MultiAddr> {
    let typed = self.peer_addresses_for_dial_typed(peer_id).await;
    typed.into_iter().map(|(addr, _)| addr).collect()
}
/// Collect dialable `(address, type)` candidates for `peer_id`.
///
/// Prefers the DHT routing table's typed addresses (filtered + sorted by
/// type priority); falls back to transport-known addresses tagged as
/// `Unverified`; empty if neither source knows the peer.
pub(crate) async fn peer_addresses_for_dial_typed(
    &self,
    peer_id: &PeerId,
) -> Vec<(MultiAddr, AddressType)> {
    let typed = {
        let dht = self.dht.read().await;
        dht.get_node_addresses_typed(peer_id).await
    };
    if !typed.is_empty() {
        return Self::dialable_addresses_typed(&typed);
    }
    match self.transport.peer_info(peer_id).await {
        Some(info) => info
            .addresses
            .into_iter()
            .filter(Self::is_dialable)
            .map(|addr| (addr, AddressType::Unverified))
            .collect(),
        None => Vec::new(),
    }
}
/// Keep only dialable entries and order them by address-type priority
/// (stable sort, so equal-priority entries keep their original order).
fn dialable_addresses_typed(
    typed: &[(MultiAddr, AddressType)],
) -> Vec<(MultiAddr, AddressType)> {
    let mut candidates = Vec::with_capacity(typed.len());
    for pair in typed {
        if Self::is_dialable(&pair.0) {
            candidates.push(pair.clone());
        }
    }
    candidates.sort_by_key(|(_, ty)| ty.priority());
    candidates
}
/// Build the dial plan from typed addresses.
///
/// Selection (over dialable entries only): the first Relay entry, the first
/// Direct entry, and the best-priority entry of any other type. Plan order:
/// relay then direct when both exist; direct then other otherwise; a single
/// surviving entry stands alone.
fn select_dial_candidates(typed: &[(MultiAddr, AddressType)]) -> Vec<(MultiAddr, AddressType)> {
    let mut relay: Option<(MultiAddr, AddressType)> = None;
    let mut direct: Option<(MultiAddr, AddressType)> = None;
    let mut other: Option<(MultiAddr, AddressType)> = None;
    // Single pass: first Relay, first Direct, best-priority "other"
    // (strict < keeps the earliest entry on ties, matching min_by_key).
    for pair in typed.iter().filter(|p| Self::is_dialable(&p.0)) {
        let ty = &pair.1;
        if *ty == AddressType::Relay {
            if relay.is_none() {
                relay = Some(pair.clone());
            }
        } else if *ty == AddressType::Direct {
            if direct.is_none() {
                direct = Some(pair.clone());
            }
        } else if other
            .as_ref()
            .map_or(true, |(_, cur)| ty.priority() < cur.priority())
        {
            other = Some(pair.clone());
        }
    }
    match (relay, direct, other) {
        (Some(r), Some(d), _) => vec![r, d],
        (Some(r), None, _) => vec![r],
        (None, Some(d), Some(o)) => vec![d, o],
        (None, Some(d), None) => vec![d],
        (None, None, Some(o)) => vec![o],
        (None, None, None) => Vec::new(),
    }
}
/// Await the response routed to `response_rx` by `handle_dht_response`,
/// bounded by the configured request timeout.
///
/// Errors: `Timeout` if the deadline passes, `ProtocolError` if the sender
/// side was dropped without delivering a response.
async fn wait_for_response(
    &self,
    _message_id: &str,
    response_rx: oneshot::Receiver<(PeerId, DhtNetworkResult)>,
    _peer_id: &PeerId,
) -> Result<DhtNetworkResult> {
    let deadline = self.config.request_timeout;
    match tokio::time::timeout(deadline, response_rx).await {
        Err(_elapsed) => Err(P2PError::Network(NetworkError::Timeout)),
        Ok(Err(_dropped)) => Err(P2PError::Network(NetworkError::ProtocolError(
            "Response channel closed unexpectedly".into(),
        ))),
        Ok(Ok((_source, result))) => Ok(result),
    }
}
/// Entry point for a raw DHT wire message from `sender`.
///
/// Enforces the size cap, deserializes (postcard), refreshes peer liveness,
/// then dispatches by message type. Returns `Ok(Some(bytes))` when a
/// serialized response should be sent back, `Ok(None)` when no reply is
/// needed, `Err` on validation/serialization failure.
pub async fn handle_dht_message(
&self,
data: &[u8],
sender: &PeerId,
) -> Result<Option<Vec<u8>>> {
// DoS guard: reject before attempting deserialization.
if data.len() > MAX_MESSAGE_SIZE {
warn!(
"Rejecting oversized DHT message from {sender}: {} bytes (max: {MAX_MESSAGE_SIZE})",
data.len()
);
return Err(P2PError::Validation(
format!(
"Message size {} bytes exceeds maximum allowed size of {MAX_MESSAGE_SIZE} bytes",
data.len()
)
.into(),
));
}
let message: DhtNetworkMessage = postcard::from_bytes(data)
.map_err(|e| P2PError::Serialization(e.to_string().into()))?;
debug!(
"[STEP 3] {}: Received {:?} from {} (msg_id: {})",
self.config.peer_id.to_hex(),
message.message_type,
sender,
message.message_id
);
// Touch routing-table liveness for the (authenticated) sender.
self.update_peer_info(*sender, &message).await;
match message.message_type {
DhtMessageType::Request => {
debug!(
"[STEP 3a] {}: Processing {:?} request from {}",
self.config.peer_id.to_hex(),
message.payload,
sender
);
let result = self.handle_dht_request(&message, sender).await?;
debug!(
"[STEP 4] {}: Sending response {:?} back to {} (msg_id: {})",
self.config.peer_id.to_hex(),
std::mem::discriminant(&result),
sender,
message.message_id
);
let response = self.create_response_message(&message, result)?;
Ok(Some(postcard::to_stdvec(&response).map_err(|e| {
P2PError::Serialization(e.to_string().into())
})?))
}
DhtMessageType::Response => {
debug!(
"[STEP 5] {}: Received response from {} (msg_id: {})",
self.config.peer_id.to_hex(),
sender,
message.message_id
);
// Routes the result to the waiting request; no reply generated.
self.handle_dht_response(&message, sender).await?;
Ok(None)
}
DhtMessageType::Broadcast => {
self.handle_dht_broadcast(&message).await?;
Ok(None)
}
DhtMessageType::Error => {
warn!("Received DHT error message: {:?}", message);
Ok(None)
}
}
}
/// Dispatch an inbound DHT request to its operation-specific handler and
/// produce the result to be sent back.
///
/// `authenticated_sender` is the transport-verified identity of the peer,
/// not the (unauthenticated) `source` field in the message.
async fn handle_dht_request(
&self,
message: &DhtNetworkMessage,
authenticated_sender: &PeerId,
) -> Result<DhtNetworkResult> {
match &message.payload {
DhtNetworkOperation::FindNode { key } => {
debug!("Handling FIND_NODE request for key: {}", hex::encode(key));
self.handle_find_node_request(key, authenticated_sender)
.await
}
DhtNetworkOperation::Ping => {
debug!("Handling PING request from: {}", authenticated_sender);
Ok(DhtNetworkResult::PongReceived {
responder: self.config.peer_id,
// The responder cannot measure round-trip latency, so 0 is
// reported here. NOTE(review): confirm the requester derives
// RTT itself rather than trusting this field.
latency: Duration::from_millis(0), })
}
DhtNetworkOperation::Join => {
debug!("Handling JOIN request from: {}", authenticated_sender);
// The DHT key is the sender's own peer-id bytes.
let dht_key = *authenticated_sender.as_bytes();
debug!("Node {} joined the network", authenticated_sender);
Ok(DhtNetworkResult::JoinSuccess {
assigned_key: dht_key,
bootstrap_peers: 1,
})
}
DhtNetworkOperation::Leave => {
debug!("Handling LEAVE request from: {}", authenticated_sender);
Ok(DhtNetworkResult::LeaveSuccess)
}
DhtNetworkOperation::PublishAddressSet { seq, addresses } => {
info!(
"Handling PUBLISH_ADDRESS_SET from {}: seq={} addrs={}",
authenticated_sender,
seq,
addresses.len()
);
// Replace (not merge) the sender's address set, keyed by seq.
let dht = self.dht.read().await;
dht.replace_node_addresses(authenticated_sender, addresses.clone(), *seq)
.await;
Ok(DhtNetworkResult::PublishAddressAck)
}
}
}
/// Public convenience wrapper: send `operation` to `peer_id`, letting the
/// manager look up dial candidates itself.
#[allow(dead_code)]
pub async fn send_request(
&self,
peer_id: &PeerId,
operation: DhtNetworkOperation,
) -> Result<DhtNetworkResult> {
self.send_dht_request(peer_id, operation, None).await
}
/// Route an inbound DHT response to the in-flight request that is waiting
/// for it (matched by `message_id`), after verifying that the sender is an
/// authenticated app identity that we actually contacted.
///
/// Unmatched, unauthenticated, or duplicate responses are logged and
/// dropped; this function never propagates them as errors.
async fn handle_dht_response(
&self,
message: &DhtNetworkMessage,
sender: &PeerId,
) -> Result<()> {
let message_id = &message.message_id;
debug!("Handling DHT response for message_id: {message_id}");
let result = match &message.result {
Some(r) => r.clone(),
None => {
warn!("DHT response message {message_id} has no result field");
return Ok(());
}
};
// Only accept responses from transport-authenticated identities.
let Some(sender_app_id) = self.canonical_app_peer_id(sender).await else {
warn!(
"Rejecting DHT response for {message_id}: sender {} has no authenticated app identity",
sender
);
return Ok(());
};
// No .await occurs while this std mutex guard is held.
// NOTE(review): unlike sweep_expired_operations, a poisoned mutex here
// silently drops the response instead of recovering via into_inner();
// consider making the two consistent.
let Ok(mut ops) = self.active_operations.lock() else {
warn!("active_operations mutex poisoned");
return Ok(());
};
if let Some(context) = ops.get_mut(message_id) {
// Anti-spoofing: the responder must be the peer we asked, or one of
// the nodes contacted for this operation.
let source_authorized = context.peer_id == sender_app_id
|| context.contacted_nodes.contains(&sender_app_id);
if !source_authorized {
warn!(
"Rejecting DHT response for {message_id}: sender app_id {} \
(transport={}) not in contacted peers (expected {} or one of {:?})",
sender_app_id.to_hex(),
sender,
context.peer_id.to_hex(),
context
.contacted_nodes
.iter()
.map(PeerId::to_hex)
.collect::<Vec<_>>()
);
return Ok(());
}
// take() makes delivery one-shot; a second response for the same id
// falls through to the duplicate branch below.
if let Some(tx) = context.response_tx.take() {
debug!(
"[STEP 5a] {}: Delivering response for msg_id {} to waiting request",
self.config.peer_id.to_hex(),
message_id
);
if tx.send((sender_app_id, result)).is_err() {
warn!(
"[STEP 5a FAILED] {}: Response channel closed for msg_id {} (receiver timed out)",
self.config.peer_id.to_hex(),
message_id
);
}
} else {
debug!(
"Response already delivered for message_id: {message_id}, ignoring duplicate"
);
}
} else {
warn!(
"[STEP 5 FAILED] {}: No active operation found for msg_id {} (may have timed out)",
self.config.peer_id.to_hex(),
message_id
);
}
Ok(())
}
/// Placeholder for DHT broadcast handling: currently logs and succeeds
/// without inspecting the message.
async fn handle_dht_broadcast(&self, _message: &DhtNetworkMessage) -> Result<()> {
debug!("DHT broadcast handling not fully implemented yet");
Ok(())
}
/// Build the wire response for `request` carrying `result`.
///
/// The response's `payload` echoes an operation tag matching the result
/// variant (a `PublishAddressAck` maps to `Ping`; `PeerRejected` echoes the
/// request payload). `Error` results cannot be wrapped and yield an `Err`.
fn create_response_message(
    &self,
    request: &DhtNetworkMessage,
    result: DhtNetworkResult,
) -> Result<DhtNetworkMessage> {
    let payload = match &result {
        DhtNetworkResult::Error { .. } => {
            return Err(P2PError::Dht(crate::error::DhtError::RoutingError(
                "Cannot create response for error result".to_string().into(),
            )));
        }
        DhtNetworkResult::NodesFound { key, .. } => DhtNetworkOperation::FindNode { key: *key },
        DhtNetworkResult::PongReceived { .. } | DhtNetworkResult::PublishAddressAck => {
            DhtNetworkOperation::Ping
        }
        DhtNetworkResult::JoinSuccess { .. } => DhtNetworkOperation::Join,
        DhtNetworkResult::LeaveSuccess => DhtNetworkOperation::Leave,
        DhtNetworkResult::PeerRejected => request.payload.clone(),
    };
    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_err(|_| {
            P2PError::Network(NetworkError::ProtocolError(
                "System clock error: unable to get current timestamp".into(),
            ))
        })?
        .as_secs();
    Ok(DhtNetworkMessage {
        message_id: request.message_id.clone(),
        source: self.config.peer_id,
        target: Some(request.source),
        message_type: DhtMessageType::Response,
        payload,
        result: Some(result),
        timestamp,
        // Decrement TTL / increment hops without over/underflow.
        ttl: request.ttl.saturating_sub(1),
        hop_count: request.hop_count.saturating_add(1),
    })
}
/// Refresh last-seen bookkeeping for the sender of a DHT message.
/// Ignores peers whose transport identity is not yet authenticated.
async fn update_peer_info(&self, peer_id: PeerId, _message: &DhtNetworkMessage) {
let Some(app_peer_id) = self.canonical_app_peer_id(&peer_id).await else {
debug!(
"Ignoring DHT peer update for unauthenticated transport peer {}",
peer_id
);
return;
};
let dht = self.dht.read().await;
// touch_node_typed with no address only refreshes liveness; Unverified
// never demotes an existing tag.
if dht
.touch_node_typed(&app_peer_id, None, crate::dht::AddressType::Unverified)
.await
{
trace!("Touched routing table entry for {}", app_peer_id.to_hex());
}
}
/// Replay `handle_peer_connected` for peers that connected before this
/// manager started listening, so DHT state catches up with the transport.
/// Peers whose user agent is not yet known are skipped; their regular
/// PeerConnected event will handle them later.
async fn reconcile_connected_peers(&self) {
    let connected = self.transport.connected_peers().await;
    if connected.is_empty() {
        return;
    }
    info!(
        "Reconciling {} already-connected peers for DHT state",
        connected.len()
    );
    let mut skipped = 0u32;
    for peer_id in connected {
        match self.transport.peer_user_agent(&peer_id).await {
            Some(ua) => self.handle_peer_connected(peer_id, &ua).await,
            None => {
                skipped += 1;
                debug!(
                    "Skipping reconciliation for peer {} — user agent not yet known",
                    peer_id.to_hex()
                );
            }
        }
    }
    if skipped > 0 {
        info!(
            "Skipped {} peers during reconciliation (user agent unknown, will arrive via PeerConnected)",
            skipped
        );
    }
}
/// React to a newly connected peer: collect its dialable addresses, attempt
/// DHT routing-table admission (possibly after revalidating stale bucket
/// occupants), and emit a PeerDiscovered event.
///
/// Peers whose user agent marks them as non-DHT participants are announced
/// but never admitted to the routing table.
async fn handle_peer_connected(&self, node_id: PeerId, user_agent: &str) {
let app_peer_id_hex = node_id.to_hex();
info!(
"DHT peer connected: app_id={}, user_agent={}",
app_peer_id_hex, user_agent
);
// DHT key is the peer-id bytes themselves.
let dht_key = *node_id.as_bytes();
let addresses: Vec<MultiAddr> = if let Some(info) = self.transport.peer_info(&node_id).await
{
info.addresses
.into_iter()
.filter(Self::is_dialable)
.collect()
} else {
warn!("peer_info unavailable for app_peer_id {}", app_peer_id_hex);
Vec::new()
};
// Without any dialable address the entry would be useless (and no
// PeerDiscovered event is emitted either).
if addresses.is_empty() {
warn!(
"Peer {} has no valid addresses, skipping DHT routing table addition",
app_peer_id_hex
);
return;
}
if !crate::network::is_dht_participant(user_agent) {
info!(
"Skipping DHT routing table for ephemeral peer {} (user_agent={})",
app_peer_id_hex, user_agent
);
} else {
// Fresh connections start as Unverified for every address.
let address_types = vec![crate::dht::AddressType::Unverified; addresses.len()];
let node_info = NodeInfo {
id: node_id,
addresses,
address_types,
last_seen: AtomicInstant::now(),
};
// Trust score lookup used by admission control; neutral when no
// trust engine is configured.
let trust_fn = |peer_id: &PeerId| -> f64 {
self.trust_engine
.as_ref()
.map(|engine| engine.score(peer_id))
.unwrap_or(DEFAULT_NEUTRAL_TRUST)
};
let add_result = self.dht.write().await.add_node(node_info, &trust_fn).await;
match add_result {
Ok(AdmissionResult::Admitted(rt_events)) => {
info!("Added peer {} to DHT routing table", app_peer_id_hex);
self.broadcast_routing_events(&rt_events);
}
// Bucket full of stale entries: ping them first, then retry.
Ok(AdmissionResult::StaleRevalidationNeeded {
candidate,
candidate_ips,
candidate_bucket_idx,
stale_peers,
}) => {
debug!(
"Peer {} admission deferred: {} stale peers need revalidation",
app_peer_id_hex,
stale_peers.len()
);
match self
.revalidate_and_retry_admission(
candidate,
candidate_ips,
candidate_bucket_idx,
stale_peers,
&trust_fn,
)
.await
{
Ok(rt_events) => {
info!(
"Added peer {} to DHT routing table after stale revalidation",
app_peer_id_hex
);
self.broadcast_routing_events(&rt_events);
}
Err(e) => {
warn!(
"Stale revalidation for peer {} failed: {}",
app_peer_id_hex, e
);
}
}
}
Err(e) => {
warn!(
"Failed to add peer {} to DHT routing table: {}",
app_peer_id_hex, e
);
}
}
}
// Announce discovery regardless of routing-table admission outcome.
if self.event_tx.receiver_count() > 0 {
let _ = self.event_tx.send(DhtNetworkEvent::PeerDiscovered {
peer_id: node_id,
dht_key,
});
}
}
/// Spawn the long-lived task that consumes transport events and drives DHT
/// state: peer connect/disconnect bookkeeping and inbound "/dht/1.0.0"
/// message handling (each message processed in its own task, bounded by a
/// semaphore and a per-message timeout). The task exits on shutdown
/// cancellation or when the event channel closes; its handle is stored in
/// `event_handler_handle` so it can be awaited/aborted later.
async fn start_network_event_handler(&self, self_arc: Arc<Self>) -> Result<()> {
info!("Starting network event handler...");
let mut events = self.transport.subscribe_events();
let shutdown = self.shutdown.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
() = shutdown.cancelled() => {
info!("Network event handler shutting down");
break;
}
recv = events.recv() => {
match recv {
Ok(event) => match event {
crate::network::P2PEvent::PeerConnected(peer_id, ref user_agent) => {
self_arc.handle_peer_connected(peer_id, user_agent).await;
}
crate::network::P2PEvent::PeerDisconnected(peer_id) => {
info!(
"DHT peer fully disconnected: app_id={}",
peer_id.to_hex()
);
// Only forward when someone is listening.
if self_arc.event_tx.receiver_count() > 0
&& let Err(e) = self_arc
.event_tx
.send(DhtNetworkEvent::PeerDisconnected {
peer_id,
})
{
warn!(
"Failed to send PeerDisconnected event: {}",
e
);
}
}
crate::network::P2PEvent::Message {
topic,
source,
data,
} => {
trace!(
" [EVENT] Message received: topic={}, source={:?}, {} bytes",
topic,
source,
data.len()
);
if topic == "/dht/1.0.0" {
// Messages without an authenticated source are dropped.
let Some(source_peer) = source else {
warn!("Ignoring unsigned DHT message");
continue;
};
trace!(" [EVENT] Processing DHT message from {}", source_peer);
let manager_clone = Arc::clone(&self_arc);
let semaphore = Arc::clone(&self_arc.message_handler_semaphore);
// Handle off the event loop so one slow message
// cannot stall event consumption; the semaphore
// caps concurrent handlers.
tokio::spawn(async move {
let _permit = match semaphore.acquire().await {
Ok(permit) => permit,
Err(_) => {
warn!("Message handler semaphore closed");
return;
}
};
match tokio::time::timeout(
REQUEST_TIMEOUT,
manager_clone.handle_dht_message(&data, &source_peer),
)
.await
{
Ok(Ok(Some(response))) => {
if let Err(e) = manager_clone
.transport
.send_message(&source_peer, "/dht/1.0.0", response)
.await
{
warn!(
"Failed to send DHT response to {}: {}",
source_peer, e
);
}
}
// No response required (e.g. we handled a Response).
Ok(Ok(None)) => {
}
Ok(Err(e)) => {
warn!(
"Failed to handle DHT message from {}: {}",
source_peer, e
);
}
Err(_) => {
warn!(
"DHT message handler timed out after {:?} for peer {}: potential DoS attempt or slow processing",
REQUEST_TIMEOUT, source_peer
);
}
}
});
}
}
},
// Lagging only skips events; keep consuming.
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!("Network event handler lagged, skipped {} events", skipped);
}
Err(broadcast::error::RecvError::Closed) => {
info!("Network event channel closed, stopping event handler");
break;
}
}
}
}
}
});
*self.event_handler_handle.write().await = Some(handle);
Ok(())
}
/// Ping the stale occupants blocking `candidate`'s bucket; evict those that
/// fail to respond, then re-run admission for the candidate.
///
/// Concurrency is bounded twice: a global semaphore permit (held for the
/// whole call) and a per-bucket in-progress set so two revalidations never
/// race on the same bucket. Errors if no room can be freed.
async fn revalidate_and_retry_admission(
&self,
candidate: NodeInfo,
candidate_ips: Vec<IpAddr>,
bucket_idx: usize,
stale_peers: Vec<(PeerId, usize)>,
trust_fn: &impl Fn(&PeerId) -> f64,
) -> anyhow::Result<Vec<RoutingTableEvent>> {
if stale_peers.is_empty() {
return Err(anyhow::anyhow!("no stale peers to revalidate"));
}
// Non-blocking: bail out instead of queuing when at the global limit.
let _permit = self
.revalidation_semaphore
.clone()
.try_acquire_owned()
.map_err(|_| anyhow::anyhow!("global revalidation limit reached"))?;
{
let mut active = self.bucket_revalidation_active.lock();
if active.contains(&bucket_idx) {
return Err(anyhow::anyhow!(
"revalidation already in progress for bucket {bucket_idx}"
));
}
active.insert(bucket_idx);
}
// Guard removes bucket_idx from the active set on every exit path.
let _bucket_guard = BucketRevalidationGuard {
active: self.bucket_revalidation_active.clone(),
bucket_idx,
};
let mut evicted_peers = Vec::new();
let mut retained_peers = Vec::new();
// Ping in bounded batches.
for chunk in stale_peers.chunks(MAX_CONCURRENT_REVALIDATION_PINGS) {
let results = futures::future::join_all(chunk.iter().map(|(peer_id, _)| async {
let responded = self.ping_with_identity_confirmation(peer_id).await;
(*peer_id, responded)
}))
.await;
for (peer_id, responded) in results {
if responded {
retained_peers.push(peer_id);
} else {
evicted_peers.push(peer_id);
}
}
}
if evicted_peers.is_empty() {
return Err(anyhow::anyhow!(
"all stale peers responded — no room for candidate"
));
}
// Evict non-responders and retry admission under one write lock.
let mut dht = self.dht.write().await;
let mut all_events = Vec::new();
for peer_id in &evicted_peers {
let removal_events = dht.remove_node_by_id(peer_id).await;
all_events.extend(removal_events);
}
let admission_events = dht
.re_evaluate_admission(candidate, &candidate_ips, trust_fn)
.await?;
all_events.extend(admission_events);
Ok(all_events)
}
/// Send a DHT PING to `peer_id`, discarding the response payload.
async fn ping_peer(&self, peer_id: &PeerId) -> anyhow::Result<()> {
    let _ = self
        .send_dht_request(peer_id, DhtNetworkOperation::Ping, None)
        .await
        .context("ping failed")?;
    Ok(())
}
/// Ping `peer_id` within the stale-revalidation budget; a peer counts as
/// alive only if the ping succeeds AND the transport still knows it as an
/// authenticated app identity.
async fn ping_with_identity_confirmation(&self, peer_id: &PeerId) -> bool {
    let ping =
        tokio::time::timeout(STALE_REVALIDATION_BUDGET, self.ping_peer(peer_id)).await;
    match ping {
        Ok(Ok(())) => self.transport.is_known_app_peer_id(peer_id).await,
        _ => false,
    }
}
/// Ping stale K-closest peers (in bounded batches) and evict every one that
/// does not respond with a confirmable identity, broadcasting the resulting
/// routing-table events.
async fn revalidate_stale_k_closest(&self) {
    let stale_peers = {
        let dht = self.dht.read().await;
        dht.stale_k_closest().await
    };
    if stale_peers.is_empty() {
        return;
    }
    debug!("Revalidating {} stale K-closest peer(s)", stale_peers.len());
    let mut non_responders = Vec::new();
    for chunk in stale_peers.chunks(MAX_CONCURRENT_REVALIDATION_PINGS) {
        let pings = chunk.iter().map(|peer_id| async {
            (*peer_id, self.ping_with_identity_confirmation(peer_id).await)
        });
        for (peer_id, responded) in futures::future::join_all(pings).await {
            if !responded {
                non_responders.push(peer_id);
            }
        }
    }
    if non_responders.is_empty() {
        debug!("All stale K-closest peers responded — no evictions");
        return;
    }
    let mut all_events = Vec::new();
    {
        let mut dht = self.dht.write().await;
        for peer_id in &non_responders {
            all_events.extend(dht.remove_node_by_id(peer_id).await);
        }
    }
    self.broadcast_routing_events(&all_events);
    info!("Evicted {} offline K-closest peer(s)", non_responders.len());
}
/// Fan routing-table events out on the public event channel.
/// A no-op when nobody is subscribed.
fn broadcast_routing_events(&self, events: &[RoutingTableEvent]) {
    if self.event_tx.receiver_count() == 0 {
        return;
    }
    for event in events {
        let mapped = match event {
            RoutingTableEvent::PeerAdded(id) => DhtNetworkEvent::PeerAdded { peer_id: *id },
            RoutingTableEvent::PeerRemoved(id) => {
                DhtNetworkEvent::PeerRemoved { peer_id: *id }
            }
            RoutingTableEvent::KClosestPeersChanged { old, new } => {
                DhtNetworkEvent::KClosestPeersChanged {
                    old: old.clone(),
                    new: new.clone(),
                }
            }
        };
        // A send error only means all receivers dropped mid-loop; ignore.
        let _ = self.event_tx.send(mapped);
    }
}
/// Forward a liveness touch for `peer_id` (optionally with an observed
/// address) to the DHT engine, returning its result.
pub async fn touch_node(&self, peer_id: &PeerId, address: Option<&MultiAddr>) -> bool {
    self.dht.read().await.touch_node(peer_id, address).await
}
/// Forward a typed liveness touch for `peer_id` (optional address plus its
/// `addr_type` tag) to the DHT engine, returning its result.
pub async fn touch_node_typed(
    &self,
    peer_id: &PeerId,
    address: Option<&MultiAddr>,
    addr_type: crate::dht::AddressType,
) -> bool {
    self.dht
        .read()
        .await
        .touch_node_typed(peer_id, address, addr_type)
        .await
}
/// Fold gossiped typed addresses for `node` into the routing table via the
/// engine's upgrade-only merge (tags are promoted, never demoted).
pub async fn merge_gossiped_typed_addresses(&self, node: &DHTNode) {
    let guard = self.dht.read().await;
    let pairs = node.typed_addresses();
    for (addr, ty) in pairs {
        guard
            .merge_typed_address_upgrade_only(&node.peer_id, &addr, ty)
            .await;
    }
}
/// Snapshot of the current DHT network statistics.
pub async fn get_stats(&self) -> DhtNetworkStats {
    let guard = self.stats.read().await;
    guard.clone()
}
/// Publish `event` on the DHT event channel; dropped silently when no
/// subscriber exists.
pub fn emit_event(&self, event: DhtNetworkEvent) {
    if self.event_tx.receiver_count() == 0 {
        return;
    }
    let _ = self.event_tx.send(event);
}
/// Subscribe to DHT network events (peer added/removed, discovery, etc.).
pub fn subscribe_events(&self) -> broadcast::Receiver<DhtNetworkEvent> {
self.event_tx.subscribe()
}
/// Currently connected peers as reported by the transport layer.
pub async fn get_connected_peers(&self) -> Vec<PeerId> {
self.transport.connected_peers().await
}
/// Number of entries in the DHT routing table.
pub async fn get_routing_table_size(&self) -> usize {
    let dht = self.dht.read().await;
    dht.routing_table_size().await
}
/// Whether `peer_id` currently has an entry in the routing table.
pub async fn is_in_routing_table(&self, peer_id: &PeerId) -> bool {
    self.dht.read().await.has_node(peer_id).await
}
/// Export all routing-table entries as `DHTNode`s, annotating each with the
/// trust engine's reliability score (neutral when no engine is configured).
pub async fn routing_table_peers(&self) -> Vec<DHTNode> {
    let nodes = {
        let dht_guard = self.dht.read().await;
        dht_guard.all_nodes().await
    };
    let mut peers = Vec::with_capacity(nodes.len());
    for node in nodes {
        let reliability = match self.trust_engine.as_ref() {
            Some(engine) => engine.score(&node.id),
            None => DEFAULT_NEUTRAL_TRUST,
        };
        peers.push(DHTNode {
            peer_id: node.id,
            address_types: node.address_types,
            addresses: node.addresses,
            distance: None,
            reliability,
        });
    }
    peers
}
/// This node's own peer id.
pub fn peer_id(&self) -> &PeerId {
&self.config.peer_id
}
/// Push our full typed address set (stamped with a fresh sequence number)
/// to every peer in `peers`, skipping ourselves. Per-peer failures are
/// logged at debug level and do not stop the loop.
pub async fn publish_address_set_to_peers(
    &self,
    typed_addresses: Vec<(crate::MultiAddr, AddressType)>,
    peers: &[DHTNode],
) {
    let seq = Self::next_publish_seq();
    let op = DhtNetworkOperation::PublishAddressSet {
        seq,
        addresses: typed_addresses.clone(),
    };
    for peer in peers {
        // Never publish to ourselves.
        if peer.peer_id == self.config.peer_id {
            continue;
        }
        let peer_typed = peer.typed_addresses();
        let outcome = self
            .send_dht_request(&peer.peer_id, op.clone(), Some(&peer_typed))
            .await;
        match outcome {
            Err(e) => {
                debug!(
                    "Failed to publish address set to peer {}: {}",
                    peer.peer_id.to_hex(),
                    e
                );
            }
            Ok(_) => {
                debug!(
                    peer = %peer.peer_id.to_hex(),
                    addrs = typed_addresses.len(),
                    seq,
                    "published address set to peer",
                );
            }
        }
    }
}
/// Sequence number for address-set publications: UNIX time in nanoseconds
/// truncated to 64 bits, or 0 if the system clock is before the epoch.
fn next_publish_seq() -> u64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
/// The transport's local listening address, if it has one.
pub fn local_addr(&self) -> Option<MultiAddr> {
self.transport.local_addr()
}
/// Dial `address` directly via the transport; returns the channel id.
pub async fn connect_to_peer(&self, address: &MultiAddr) -> Result<String> {
self.transport.connect_peer(address).await
}
/// Shared handle to the underlying transport.
pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
&self.transport
}
/// Clone of the configured trust engine, if any.
pub fn trust_engine(&self) -> Option<Arc<TrustEngine>> {
self.trust_engine.clone()
}
}
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 15;
const ITERATION_GRACE_TIMEOUT_SECS: u64 = 5;
const DEFAULT_MAX_CONCURRENT_OPS: usize = 100;
impl Default for DhtNetworkConfig {
/// Defaults: all-zero peer id (callers must override for a real node),
/// 15s request timeout, 100 concurrent ops, security on, swap threshold 0.
fn default() -> Self {
Self {
// Placeholder identity — not usable on a live network.
peer_id: PeerId::from_bytes([0u8; 32]),
node_config: NodeConfig::default(),
request_timeout: Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS),
max_concurrent_operations: DEFAULT_MAX_CONCURRENT_OPS,
enable_security: true,
swap_threshold: 0.0,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
// A QUIC address with a globally routable (TEST-NET-3) IP is dialable.
#[test]
fn is_dialable_accepts_quic_with_routable_ip() {
let quic = MultiAddr::quic("203.0.113.7:9000".parse().unwrap());
assert!(DhtNetworkManager::is_dialable(&quic));
}
// Guards the constant relationship: the revalidation budget must equal
// identity-exchange timeout plus one ping RTT.
#[test]
fn stale_revalidation_budget_covers_identity_exchange_plus_ping_rtt() {
assert_eq!(
STALE_REVALIDATION_BUDGET,
IDENTITY_EXCHANGE_TIMEOUT + STALE_REVALIDATION_PING_RTT,
);
}
// Addresses without a dialable socket form (BLE, TCP, LoRa) are rejected.
#[test]
fn is_dialable_rejects_non_quic_transports() {
let ble = MultiAddr::new(crate::address::TransportAddr::Ble {
mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
psm: 0x0025,
});
let tcp = MultiAddr::tcp("10.0.0.1:80".parse().unwrap());
let lora = MultiAddr::new(crate::address::TransportAddr::LoRa {
dev_addr: [0xDE, 0xAD, 0xBE, 0xEF],
freq_hz: 868_000_000,
});
assert!(!DhtNetworkManager::is_dialable(&ble));
assert!(!DhtNetworkManager::is_dialable(&tcp));
assert!(!DhtNetworkManager::is_dialable(&lora));
}
// 0.0.0.0 can never identify a remote endpoint and must be rejected.
#[test]
fn is_dialable_rejects_unspecified_ip() {
let unspecified = MultiAddr::quic("0.0.0.0:9000".parse().unwrap());
assert!(!DhtNetworkManager::is_dialable(&unspecified));
}
// Loopback stays dialable so local/test deployments work.
#[test]
fn is_dialable_accepts_loopback() {
let loopback = MultiAddr::quic("127.0.0.1:9000".parse().unwrap());
assert!(DhtNetworkManager::is_dialable(&loopback));
}
// Test helper: build a DHTNode with peer id [seed; 32] and the given
// (address-string, type) entries.
fn dht_node(seed: u8, entries: Vec<(&str, AddressType)>) -> DHTNode {
let (addresses, address_types): (Vec<MultiAddr>, Vec<AddressType>) = entries
.into_iter()
.map(|(s, t)| (s.parse().unwrap(), t))
.unzip();
DHTNode {
peer_id: PeerId::from_bytes([seed; 32]),
addresses,
address_types,
distance: None,
reliability: 1.0,
}
}
// Merging a new Relay address into a node with an Unverified one keeps
// both, with the Relay entry ordered first.
#[test]
fn merge_from_adds_relay_entry_to_existing_unverified() {
let mut existing = dht_node(
1,
vec![("/ip4/192.0.2.10/udp/10003/quic", AddressType::Unverified)],
);
let incoming = dht_node(
1,
vec![("/ip4/198.51.100.7/udp/44100/quic", AddressType::Relay)],
);
existing.merge_from(incoming);
assert_eq!(existing.addresses.len(), 2);
assert_eq!(existing.address_types[0], AddressType::Relay);
assert_eq!(existing.address_types[1], AddressType::Unverified);
assert_eq!(
existing.addresses[0],
"/ip4/198.51.100.7/udp/44100/quic"
.parse::<MultiAddr>()
.unwrap()
);
}
// Address-type tags are upgrade-only: Relay is not demoted to Unverified,
// but Unverified is promoted to Direct.
#[test]
fn merge_from_upgrades_existing_tag_but_never_demotes() {
let addr = "/ip4/198.51.100.7/udp/44100/quic";
let mut existing = dht_node(1, vec![(addr, AddressType::Relay)]);
let incoming_demotion = dht_node(1, vec![(addr, AddressType::Unverified)]);
existing.merge_from(incoming_demotion);
assert_eq!(existing.addresses.len(), 1);
assert_eq!(existing.address_types[0], AddressType::Relay);
let mut existing = dht_node(1, vec![(addr, AddressType::Unverified)]);
let incoming_promotion = dht_node(1, vec![(addr, AddressType::Direct)]);
existing.merge_from(incoming_promotion);
assert_eq!(existing.addresses.len(), 1);
assert_eq!(existing.address_types[0], AddressType::Direct);
}
// Merging an identical (address, type) pair must not create a duplicate.
#[test]
fn merge_from_dedupes_identical_relay_entry() {
let addr = "/ip4/198.51.100.7/udp/44100/quic";
let mut existing = dht_node(1, vec![(addr, AddressType::Relay)]);
let incoming = dht_node(1, vec![(addr, AddressType::Relay)]);
existing.merge_from(incoming);
assert_eq!(existing.addresses.len(), 1);
assert_eq!(existing.address_types[0], AddressType::Relay);
}
/// Peer id whose first byte is `byte` and whose remaining 31 bytes are
/// zero — handy for controlling relative XOR distance in winner tests.
fn peer_with_leading(byte: u8) -> PeerId {
    let mut id = [0u8; 32];
    id[0] = byte;
    PeerId::from_bytes(id)
}
fn report(subject_byte: u8, addr_str: &str, ty: AddressType) -> DHTNode {
DHTNode {
peer_id: peer_with_leading(subject_byte),
addresses: vec![addr_str.parse().unwrap()],
address_types: vec![ty],
distance: None,
reliability: 1.0,
}
}
#[test]
fn winner_empty_reports_returns_none() {
    // With no reports there is nothing to pick a winner from.
    let empty = SubjectReports::new();
    assert!(compute_winner(&peer_with_leading(0x01), &empty).is_none());
}
#[test]
fn winner_self_report_locks_in() {
    // A report made by the subject about itself always wins, even when a
    // XOR-closer third party disagrees.
    let subject = peer_with_leading(0x01);
    let neighbour = peer_with_leading(0x02);
    let mut reports = SubjectReports::new();
    reports.insert(
        subject,
        report(0x01, "/ip4/10.0.0.1/udp/9000/quic", AddressType::Direct),
    );
    reports.insert(
        neighbour,
        report(0x01, "/ip4/198.51.100.1/udp/9000/quic", AddressType::Relay),
    );
    let (winner, _) = compute_winner(&subject, &reports).unwrap();
    assert_eq!(winner, subject);
}
#[test]
fn winner_single_third_party_wins_by_default() {
    // A lone third-party report wins when the subject has not reported itself.
    let subject = peer_with_leading(0xF0);
    let responder = peer_with_leading(0x10);
    let mut reports = SubjectReports::new();
    reports.insert(
        responder,
        report(0xF0, "/ip4/1.1.1.1/udp/9000/quic", AddressType::Direct),
    );
    let (winner, _) = compute_winner(&subject, &reports).unwrap();
    assert_eq!(winner, responder);
}
#[test]
fn winner_quorum_consensus_beats_closer_dissenter() {
    // Two matching third-party reports form a quorum that must beat a single
    // dissenter, even though the dissenter is XOR-closer to the subject.
    let subject = peer_with_leading(0xF0);
    let dissenter = peer_with_leading(0xF1);
    let agree_1 = peer_with_leading(0xF2);
    let agree_2 = peer_with_leading(0xF3);
    let consensus_addr = "/ip4/1.1.1.1/udp/9000/quic";
    let mut reports = SubjectReports::new();
    reports.insert(
        dissenter,
        report(0xF0, "/ip4/6.6.6.6/udp/9000/quic", AddressType::Direct),
    );
    reports.insert(agree_1, report(0xF0, consensus_addr, AddressType::Direct));
    reports.insert(agree_2, report(0xF0, consensus_addr, AddressType::Direct));
    let (winner, winning_node) = compute_winner(&subject, &reports).unwrap();
    // Either member of the agreeing pair is an acceptable winner.
    assert!(winner == agree_1 || winner == agree_2);
    assert_eq!(
        winning_node.addresses[0],
        consensus_addr.parse::<MultiAddr>().unwrap()
    );
}
#[test]
fn winner_no_quorum_falls_back_to_closest_xor() {
    // Three mutually disagreeing reports can never form a quorum, so the
    // report from the XOR-closest responder must win.
    let subject = peer_with_leading(0xF0);
    let closest = peer_with_leading(0xF1);
    let mid = peer_with_leading(0xF4);
    let far = peer_with_leading(0xFF);
    let mut reports = SubjectReports::new();
    for (responder, ip) in [(closest, "1.1.1.1"), (mid, "2.2.2.2"), (far, "3.3.3.3")] {
        reports.insert(
            responder,
            report(
                0xF0,
                &format!("/ip4/{ip}/udp/9000/quic"),
                AddressType::Direct,
            ),
        );
    }
    let (winner, _) = compute_winner(&subject, &reports).unwrap();
    assert_eq!(winner, closest);
}
#[test]
fn winner_outlier_quorum_of_two_from_three() {
    // Two of three responders agree on one address; that 2-of-3 quorum must
    // win over the lone outlier.
    let subject = peer_with_leading(0xF0);
    let r_a = peer_with_leading(0xF1);
    let r_b = peer_with_leading(0xF2);
    let r_c = peer_with_leading(0xF3);
    let agree = "/ip4/9.9.9.9/udp/9000/quic";
    let mut reports = SubjectReports::new();
    reports.insert(r_a, report(0xF0, agree, AddressType::Direct));
    reports.insert(
        r_b,
        report(0xF0, "/ip4/8.8.8.8/udp/9000/quic", AddressType::Direct),
    );
    reports.insert(r_c, report(0xF0, agree, AddressType::Direct));
    let (winner, winning_node) = compute_winner(&subject, &reports).unwrap();
    // Either quorum member is an acceptable winner.
    assert!(winner == r_a || winner == r_c);
    assert_eq!(
        winning_node.addresses[0],
        agree.parse::<MultiAddr>().unwrap()
    );
}
#[test]
fn best_tier_priority_picks_strongest_tag() {
    // With all three tags present, the node's best tier resolves to the
    // Relay entry's priority.
    let entries = vec![
        ("/ip4/10.0.0.1/udp/9000/quic", AddressType::Unverified),
        ("/ip4/203.0.113.7/udp/9001/quic", AddressType::Relay),
        ("/ip4/192.0.2.9/udp/9002/quic", AddressType::Direct),
    ];
    let node = dht_node(1, entries);
    assert_eq!(best_tier_priority(&node), AddressType::Relay.priority());
}
#[test]
fn best_tier_priority_empty_node_returns_max() {
    // A node with no addresses has no tier at all; the sentinel is u8::MAX.
    let empty = DHTNode {
        peer_id: PeerId::from_bytes([1u8; 32]),
        addresses: Vec::new(),
        address_types: Vec::new(),
        distance: None,
        reliability: 1.0,
    };
    assert_eq!(best_tier_priority(&empty), u8::MAX);
}
#[test]
fn first_direct_dialable_picks_direct_over_relay() {
    // Relay entries never qualify; the Direct entry must be picked even
    // though it is listed second.
    let direct = "/ip4/203.0.113.7/udp/9001/quic";
    let node = dht_node(
        1,
        vec![
            ("/ip4/10.0.0.1/udp/9000/quic", AddressType::Relay),
            (direct, AddressType::Direct),
        ],
    );
    let picked = DhtNetworkManager::first_direct_dialable(&node).unwrap();
    assert_eq!(picked, direct.parse::<MultiAddr>().unwrap());
}
#[test]
fn first_direct_dialable_returns_none_when_only_relay() {
    // A node reachable only via relay has no directly dialable address.
    let relay_only = dht_node(1, vec![("/ip4/10.0.0.1/udp/9000/quic", AddressType::Relay)]);
    assert_eq!(DhtNetworkManager::first_direct_dialable(&relay_only), None);
}
#[test]
fn first_direct_dialable_skips_wildcard_direct() {
    // A Direct tag on a wildcard (0.0.0.0) address is still undialable and
    // must be skipped in favour of the concrete Direct address.
    let concrete = "/ip4/203.0.113.7/udp/9001/quic";
    let node = dht_node(
        1,
        vec![
            ("/ip4/0.0.0.0/udp/9000/quic", AddressType::Direct),
            (concrete, AddressType::Direct),
        ],
    );
    let picked = DhtNetworkManager::first_direct_dialable(&node).unwrap();
    assert_eq!(picked, concrete.parse::<MultiAddr>().unwrap());
}
#[test]
fn first_direct_dialable_returns_none_for_empty_node() {
    // No addresses at all: nothing can be dialable.
    let empty = DHTNode {
        peer_id: PeerId::from_bytes([1u8; 32]),
        addresses: Vec::new(),
        address_types: Vec::new(),
        distance: None,
        reliability: 1.0,
    };
    assert_eq!(DhtNetworkManager::first_direct_dialable(&empty), None);
}
#[test]
fn first_direct_dialable_skips_natted() {
    // NATted entries cannot be dialed directly; the Direct entry must win.
    let direct = "/ip4/203.0.113.7/udp/9001/quic";
    let node = dht_node(
        1,
        vec![
            ("/ip4/10.0.0.1/udp/9000/quic", AddressType::NATted),
            (direct, AddressType::Direct),
        ],
    );
    let picked = DhtNetworkManager::first_direct_dialable(&node).unwrap();
    assert_eq!(picked, direct.parse::<MultiAddr>().unwrap());
}
#[test]
fn first_direct_dialable_skips_unverified() {
    // Unverified entries are not trusted as direct; the Direct entry must win.
    let direct = "/ip4/203.0.113.7/udp/9001/quic";
    let node = dht_node(
        1,
        vec![
            ("/ip4/10.0.0.1/udp/9000/quic", AddressType::Unverified),
            (direct, AddressType::Direct),
        ],
    );
    let picked = DhtNetworkManager::first_direct_dialable(&node).unwrap();
    assert_eq!(picked, direct.parse::<MultiAddr>().unwrap());
}
#[test]
fn first_direct_dialable_returns_none_when_only_unverified() {
    // Unverified entries alone never qualify as directly dialable.
    let unverified_only = dht_node(
        1,
        vec![("/ip4/10.0.0.1/udp/9000/quic", AddressType::Unverified)],
    );
    assert_eq!(
        DhtNetworkManager::first_direct_dialable(&unverified_only),
        None
    );
}
#[test]
fn first_direct_dialable_rejects_legacy_untagged_addresses() {
    // An address with no corresponding entry in `address_types` (legacy wire
    // format) carries no tag and therefore must not be treated as Direct.
    let legacy = DHTNode {
        peer_id: PeerId::from_bytes([1u8; 32]),
        addresses: vec!["/ip4/203.0.113.7/udp/9001/quic".parse().unwrap()],
        address_types: Vec::new(),
        distance: None,
        reliability: 1.0,
    };
    assert_eq!(DhtNetworkManager::first_direct_dialable(&legacy), None);
}
/// Parse `(multiaddr-string, tag)` pairs into typed dial candidates.
fn typed(entries: Vec<(&str, AddressType)>) -> Vec<(MultiAddr, AddressType)> {
    let mut out = Vec::with_capacity(entries.len());
    for (raw, tag) in entries {
        out.push((raw.parse().unwrap(), tag));
    }
    out
}
#[test]
fn select_dial_candidates_returns_empty_for_empty_input() {
    // No candidate addresses yields no dial picks.
    assert!(DhtNetworkManager::select_dial_candidates(&[]).is_empty());
}
#[test]
fn select_dial_candidates_relay_plus_direct_gives_two() {
    // With both a Relay and a Direct entry present, exactly those two are
    // selected (Relay first) and the weaker tiers are dropped.
    let candidates = typed(vec![
        ("/ip4/198.51.100.1/udp/9000/quic", AddressType::Relay),
        ("/ip4/203.0.113.7/udp/9001/quic", AddressType::Direct),
        ("/ip4/192.0.2.9/udp/9002/quic", AddressType::Unverified),
        ("/ip4/192.0.2.10/udp/9003/quic", AddressType::NATted),
    ]);
    let picks = DhtNetworkManager::select_dial_candidates(&candidates);
    assert_eq!(picks.len(), 2);
    assert_eq!(picks[0].1, AddressType::Relay);
    assert_eq!(picks[1].1, AddressType::Direct);
}
#[test]
fn select_dial_candidates_relay_only_without_direct_is_one() {
    // A Relay entry with no Direct companion is selected alone.
    let candidates = typed(vec![
        ("/ip4/198.51.100.1/udp/9000/quic", AddressType::Relay),
        ("/ip4/192.0.2.9/udp/9002/quic", AddressType::Unverified),
        ("/ip4/192.0.2.10/udp/9003/quic", AddressType::NATted),
    ]);
    let picks = DhtNetworkManager::select_dial_candidates(&candidates);
    assert_eq!(picks.len(), 1);
    assert_eq!(picks[0].1, AddressType::Relay);
}
#[test]
fn select_dial_candidates_direct_plus_unverified_when_no_relay() {
    // Without a Relay entry, Direct is paired with the Unverified fallback
    // and NATted is ignored.
    let candidates = typed(vec![
        ("/ip4/203.0.113.7/udp/9001/quic", AddressType::Direct),
        ("/ip4/192.0.2.9/udp/9002/quic", AddressType::Unverified),
        ("/ip4/192.0.2.10/udp/9003/quic", AddressType::NATted),
    ]);
    let picks = DhtNetworkManager::select_dial_candidates(&candidates);
    assert_eq!(picks.len(), 2);
    assert_eq!(picks[0].1, AddressType::Direct);
    assert_eq!(picks[1].1, AddressType::Unverified);
}
#[test]
fn select_dial_candidates_direct_only_is_one() {
    // A single Direct entry is selected on its own.
    let candidates = typed(vec![("/ip4/203.0.113.7/udp/9001/quic", AddressType::Direct)]);
    let picks = DhtNetworkManager::select_dial_candidates(&candidates);
    assert_eq!(picks.len(), 1);
    assert_eq!(picks[0].1, AddressType::Direct);
}
#[test]
fn select_dial_candidates_only_unverified_is_one() {
    // With only Unverified and NATted entries, a single Unverified pick is made.
    let candidates = typed(vec![
        ("/ip4/192.0.2.9/udp/9002/quic", AddressType::Unverified),
        ("/ip4/192.0.2.11/udp/9004/quic", AddressType::Unverified),
        ("/ip4/192.0.2.10/udp/9003/quic", AddressType::NATted),
    ]);
    let picks = DhtNetworkManager::select_dial_candidates(&candidates);
    assert_eq!(picks.len(), 1);
    assert_eq!(picks[0].1, AddressType::Unverified);
}
#[test]
fn select_dial_candidates_only_natted_is_one() {
    // A NATted entry is used as the last-resort single pick.
    let candidates = typed(vec![("/ip4/192.0.2.10/udp/9003/quic", AddressType::NATted)]);
    let picks = DhtNetworkManager::select_dial_candidates(&candidates);
    assert_eq!(picks.len(), 1);
    assert_eq!(picks[0].1, AddressType::NATted);
}
#[test]
fn select_dial_candidates_filters_undialable_wildcard() {
    // The wildcard 0.0.0.0 entry is undialable and must be filtered out,
    // leaving only the concrete Direct address.
    let concrete = "/ip4/203.0.113.7/udp/9001/quic";
    let candidates = typed(vec![
        ("/ip4/0.0.0.0/udp/9000/quic", AddressType::Direct),
        (concrete, AddressType::Direct),
    ]);
    let picks = DhtNetworkManager::select_dial_candidates(&candidates);
    assert_eq!(picks.len(), 1);
    assert_eq!(picks[0].1, AddressType::Direct);
    assert_eq!(picks[0].0, concrete.parse::<MultiAddr>().unwrap());
}
#[test]
fn select_dial_candidates_prefers_unverified_over_natted() {
    // When both weak tiers are present, Unverified is the preferred fallback
    // companion to Direct; NATted is dropped.
    let candidates = typed(vec![
        ("/ip4/203.0.113.7/udp/9001/quic", AddressType::Direct),
        ("/ip4/192.0.2.10/udp/9003/quic", AddressType::NATted),
        ("/ip4/192.0.2.9/udp/9002/quic", AddressType::Unverified),
    ]);
    let picks = DhtNetworkManager::select_dial_candidates(&candidates);
    assert_eq!(picks.len(), 2);
    assert_eq!(picks[0].1, AddressType::Direct);
    assert_eq!(picks[1].1, AddressType::Unverified);
}
// Parse a socket-address literal; panics on malformed test input.
fn sock(s: &str) -> SocketAddr {
s.parse().unwrap()
}
#[test]
fn dial_failure_cache_records_and_checks() {
    // A freshly recorded failure must be reported until its TTL lapses.
    let cache = DialFailureCache::new();
    let target = sock("203.0.113.7:9001");
    assert!(!cache.is_failed(&target), "empty cache never reports failed");
    cache.record_failure(target);
    assert!(
        cache.is_failed(&target),
        "recorded address must be treated as failed within the TTL"
    );
}
#[test]
fn dial_failure_cache_clear_removes_entry() {
    // clear() must fully remove a recorded failure.
    let cache = DialFailureCache::new();
    let target = sock("203.0.113.7:9001");
    cache.record_failure(target);
    cache.clear(&target);
    assert!(
        !cache.is_failed(&target),
        "clear() must drop the entry so a subsequent dial is allowed"
    );
}
#[test]
fn dial_failure_cache_expires_stale_entries_on_read() {
    // Plant an entry whose recorded instant lies beyond the TTL and verify
    // that reading it both ignores and evicts it.
    let cache = DialFailureCache::new();
    let target = sock("203.0.113.7:9001");
    let rewind = DIAL_FAILURE_CACHE_TTL + Duration::from_secs(1);
    let Some(stale) = Instant::now().checked_sub(rewind) else {
        // On a very fresh runner, Instant cannot be rewound far enough.
        eprintln!(
            "skipping: runner Instant is fresher than DIAL_FAILURE_CACHE_TTL ({DIAL_FAILURE_CACHE_TTL:?})"
        );
        return;
    };
    cache.entries.insert(target, stale);
    assert!(
        !cache.is_failed(&target),
        "stale entry must not suppress a fresh dial"
    );
    assert!(
        cache.entries.get(&target).is_none(),
        "stale entry must be evicted lazily on read"
    );
}
#[test]
fn dial_failure_cache_independent_keys_do_not_collide() {
    // Failures are keyed per SocketAddr; one address's failure must not
    // leak onto another.
    let cache = DialFailureCache::new();
    let first = sock("203.0.113.7:9001");
    let second = sock("203.0.113.8:9001");
    cache.record_failure(first);
    assert!(cache.is_failed(&first));
    assert!(!cache.is_failed(&second), "different SocketAddr must not hit");
}
// Peer id with all 32 bytes set to `byte`.
fn pid(byte: u8) -> PeerId {
PeerId::from_bytes([byte; 32])
}
#[test]
fn identity_failure_cache_records_and_checks() {
    // A freshly recorded identity failure must be reported until its TTL lapses.
    let cache = IdentityFailureCache::new();
    let subject = pid(7);
    assert!(!cache.is_failed(&subject), "empty cache never reports failed");
    cache.record_failure(subject);
    assert!(
        cache.is_failed(&subject),
        "recorded peer must be treated as failed within the TTL"
    );
}
#[test]
fn identity_failure_cache_clear_removes_entry() {
    // clear() must fully remove a recorded identity failure.
    let cache = IdentityFailureCache::new();
    let subject = pid(7);
    cache.record_failure(subject);
    cache.clear(&subject);
    assert!(
        !cache.is_failed(&subject),
        "clear() must drop the entry so a successful re-handshake is not suppressed"
    );
}
#[test]
fn identity_failure_cache_expires_stale_entries_on_read() {
    // Plant an entry whose stored instant is already in the past and verify
    // that reading it both ignores and evicts it.
    let cache = IdentityFailureCache::new();
    let subject = pid(7);
    let Some(expired) = Instant::now().checked_sub(Duration::from_secs(1)) else {
        // On a very fresh runner, Instant cannot be rewound at all.
        eprintln!("skipping: runner Instant epoch is too fresh to subtract from");
        return;
    };
    cache.entries.insert(subject, expired);
    assert!(
        !cache.is_failed(&subject),
        "expired entry must not suppress a fresh dial"
    );
    assert!(
        cache.entries.get(&subject).is_none(),
        "expired entry must be evicted lazily on read"
    );
}
#[test]
fn identity_failure_cache_independent_keys_do_not_collide() {
    // Failures are keyed per PeerId; one peer's failure must not leak onto another.
    let cache = IdentityFailureCache::new();
    let first = pid(7);
    let second = pid(8);
    cache.record_failure(first);
    assert!(cache.is_failed(&first));
    assert!(!cache.is_failed(&second), "different PeerId must not hit");
}
#[test]
fn identity_failure_cache_mismatch_uses_longer_ttl_than_failure() {
    // record_mismatch must install a later expiry than record_failure so a
    // peer caught with a mismatched identity stays suppressed for longer.
    let cache = IdentityFailureCache::new();
    let failed_peer = pid(7);
    let mismatched_peer = pid(8);
    cache.record_failure(failed_peer);
    cache.record_mismatch(mismatched_peer);
    // Inspect the raw stored expiries rather than polling is_failed().
    let failure_expiry = *cache
        .entries
        .get(&failed_peer)
        .expect("record_failure must insert an entry")
        .value();
    let mismatch_expiry = *cache
        .entries
        .get(&mismatched_peer)
        .expect("record_mismatch must insert an entry")
        .value();
    assert!(
        mismatch_expiry > failure_expiry,
        "mismatch suppression must outlast a plain identity failure"
    );
    // Both entries are still live right after being recorded.
    assert!(cache.is_failed(&failed_peer));
    assert!(cache.is_failed(&mismatched_peer));
}
#[test]
fn test_peer_rejected_round_trips_through_serialization() {
    // PeerRejected must survive a postcard encode/decode round trip.
    let original = DhtNetworkResult::PeerRejected;
    let bytes = postcard::to_stdvec(&original).expect("serialization should succeed");
    let deserialized: DhtNetworkResult =
        postcard::from_bytes(&bytes).expect("deserialization should succeed");
    assert!(
        matches!(deserialized, DhtNetworkResult::PeerRejected),
        "round-tripped result should be PeerRejected, got: {deserialized:?}"
    );
}
#[test]
fn test_routing_table_ready_event_construction() {
    // The variant must carry the exact peer count it was built with.
    let event = DhtNetworkEvent::RoutingTableReady { num_peers: 42 };
    let carries_count = matches!(event, DhtNetworkEvent::RoutingTableReady { num_peers: 42 });
    assert!(
        carries_count,
        "RoutingTableReady event should carry the peer count"
    );
}
#[test]
fn test_bootstrap_complete_event_construction() {
    // The variant must carry the exact peer count it was built with.
    let event = DhtNetworkEvent::BootstrapComplete { num_peers: 42 };
    let carries_count = matches!(event, DhtNetworkEvent::BootstrapComplete { num_peers: 42 });
    assert!(
        carries_count,
        "BootstrapComplete event should carry the peer count"
    );
}
#[test]
fn test_k_closest_changed_event_uses_old_new_naming() {
    // The variant's fields are named `old`/`new`; destructuring must hand
    // back exactly the vectors the event was built from.
    let old = vec![PeerId::random(), PeerId::random()];
    let new = vec![PeerId::random()];
    let event = DhtNetworkEvent::KClosestPeersChanged {
        old: old.clone(),
        new: new.clone(),
    };
    let DhtNetworkEvent::KClosestPeersChanged {
        old: got_old,
        new: got_new,
    } = event
    else {
        panic!("expected KClosestPeersChanged");
    };
    assert_eq!(got_old, old);
    assert_eq!(got_new, new);
}
#[test]
// A PeerRejected response must echo the request's message id and payload,
// decrement the TTL, bump the hop count, and survive a postcard encode/decode
// round trip intact.
fn test_peer_rejected_response_message_preserves_request_payload() {
// A minimal inbound Ping request to respond to.
let request = DhtNetworkMessage {
message_id: "test-123".to_string(),
source: PeerId::random(),
target: Some(PeerId::random()),
message_type: DhtMessageType::Request,
payload: DhtNetworkOperation::Ping,
result: None,
timestamp: 0,
ttl: 10,
hop_count: 0,
};
// The rejection response mirrors the request: same message_id, payload
// echoed back, targeted at the requester, TTL-1 and hop_count+1 using
// saturating arithmetic so edge values cannot wrap.
let response = DhtNetworkMessage {
message_id: request.message_id.clone(),
source: PeerId::random(),
target: Some(request.source),
message_type: DhtMessageType::Response,
payload: request.payload.clone(),
result: Some(DhtNetworkResult::PeerRejected),
timestamp: 0,
ttl: request.ttl.saturating_sub(1),
hop_count: request.hop_count.saturating_add(1),
};
// Round-trip through the wire format and check the two fields that
// encode the rejection semantics.
let bytes = postcard::to_stdvec(&response).expect("serialize response");
let decoded: DhtNetworkMessage =
postcard::from_bytes(&bytes).expect("deserialize response");
assert!(
matches!(decoded.result, Some(DhtNetworkResult::PeerRejected)),
"response result should be PeerRejected"
);
assert!(
matches!(decoded.payload, DhtNetworkOperation::Ping),
"response should echo the request's Ping payload"
);
}
}