#![allow(missing_docs)]
use crate::{
P2PError, PeerId, Result,
adaptive::TrustEngine,
adaptive::trust::DEFAULT_NEUTRAL_TRUST,
address::MultiAddr,
dht::core_engine::{AtomicInstant, NodeInfo},
dht::{AdmissionResult, DhtCoreEngine, DhtKey, Key, RoutingTableEvent},
error::{DhtError, IdentityError, NetworkError},
network::NodeConfig,
};
use anyhow::Context as _;
use futures::stream::{FuturesUnordered, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::{RwLock, Semaphore, broadcast, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use uuid::Uuid;
// --- Tuning constants -------------------------------------------------------

/// Floor for the message-handler semaphore; config values below this are raised.
const MIN_CONCURRENT_OPERATIONS: usize = 10;
/// Upper bound on the candidate queue during iterative network lookups.
const MAX_CANDIDATE_NODES: usize = 200;
/// Maximum DHT message size in bytes (enforcement site not visible in this chunk).
const MAX_MESSAGE_SIZE: usize = 64 * 1024;
/// Request timeout constant (requests in this chunk use `config.request_timeout`).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Reliability score assigned to our own node record.
const SELF_RELIABILITY_SCORE: f64 = 1.0;
/// Cap on waiting for identity confirmation after dialing a new channel.
const IDENTITY_EXCHANGE_TIMEOUT: Duration = Duration::from_secs(15);
/// Timeout for revalidation pings (used by revalidation code outside this chunk).
const STALE_REVALIDATION_TIMEOUT: Duration = Duration::from_secs(1);
/// Permits on the revalidation semaphore.
const MAX_CONCURRENT_REVALIDATIONS: usize = 8;
/// Per-pass ping concurrency during revalidation (usage not visible here).
const MAX_CONCURRENT_REVALIDATION_PINGS: usize = 4;
/// A bucket untouched for this long counts as stale.
const STALE_BUCKET_THRESHOLD: Duration = Duration::from_secs(3600);
// Periodic self-lookup interval bounds; randomised to avoid synchronised
// network-wide lookups.
const SELF_LOOKUP_INTERVAL_MIN: Duration = Duration::from_secs(300);
const SELF_LOOKUP_INTERVAL_MAX: Duration = Duration::from_secs(600);
/// Fixed period of the stale-bucket refresh task.
const BUCKET_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
// Coordinator-hint republish interval bounds (randomised).
const HINT_REPUBLISH_INTERVAL_MIN: Duration = Duration::from_secs(90);
const HINT_REPUBLISH_INTERVAL_MAX: Duration = Duration::from_secs(150);
/// Re-bootstrap automatically when the routing table falls below this size.
const AUTO_REBOOTSTRAP_THRESHOLD: usize = 3;
/// Minimum spacing between automatic re-bootstrap attempts.
const REBOOTSTRAP_COOLDOWN: Duration = Duration::from_secs(300);
/// A node record as exchanged in DHT messages: identity, known addresses,
/// and a reliability score.
///
/// Serialized with postcard (positional), so field order is part of the
/// wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTNode {
    /// The node's peer identity.
    pub peer_id: PeerId,
    /// Known multiaddresses. Entries tagged with a *different* peer ID act
    /// as coordinator hints for reaching this node behind NAT (see
    /// `extract_coordinator_hints`).
    pub addresses: Vec<MultiAddr>,
    /// Optional precomputed distance bytes; always `None` in this chunk.
    pub distance: Option<Vec<u8>>,
    /// Reliability score (`SELF_RELIABILITY_SCORE` = 1.0 for the local node).
    pub reliability: f64,
}
/// Alias kept for backwards compatibility: `DHTNode` already derives serde.
pub type SerializableDHTNode = DHTNode;
/// Configuration for [`DhtNetworkManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhtNetworkConfig {
    /// Our peer identity. An all-zero value is replaced by the transport's
    /// peer ID in [`DhtNetworkManager::new`]; a non-zero mismatch wins but
    /// is logged.
    pub peer_id: PeerId,
    pub node_config: NodeConfig,
    /// Per-request timeout; operation contexts older than twice this are swept.
    pub request_timeout: Duration,
    /// Permit count for the message-handler semaphore (floored at
    /// `MIN_CONCURRENT_OPERATIONS`).
    pub max_concurrent_operations: usize,
    pub enable_security: bool,
    /// Forwarded to `DhtCoreEngine::new` — semantics live in the core engine.
    pub swap_threshold: f64,
}
/// Request payloads carried in a [`DhtNetworkMessage`].
///
/// Serialized with postcard (positional), so variant order is part of the
/// wire format — append new variants, never reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DhtNetworkOperation {
    /// Ask the receiver for its closest known nodes to `key`.
    FindNode { key: Key },
    /// Liveness probe, answered with [`DhtNetworkResult::PongReceived`].
    Ping,
    Join,
    Leave,
    /// Advertise the sender's current addresses to the receiver.
    PublishAddress { addresses: Vec<crate::MultiAddr> },
}
/// Response payloads for [`DhtNetworkOperation`] requests.
///
/// Serialized with postcard (positional): variant order is wire-sensitive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DhtNetworkResult {
    /// Answer to `FindNode`: the responder's closest known nodes to `key`.
    NodesFound {
        key: Key,
        nodes: Vec<SerializableDHTNode>,
    },
    /// Answer to `Ping`; `latency` is re-measured on the requester side.
    PongReceived {
        responder: PeerId,
        latency: Duration,
    },
    /// Successful `Join`.
    JoinSuccess {
        assigned_key: Key,
        bootstrap_peers: usize,
    },
    /// Successful `Leave`.
    LeaveSuccess,
    /// The responder refused us; lookups evict the peer on receipt.
    PeerRejected,
    /// Acknowledgement of a `PublishAddress`.
    PublishAddressAck,
    /// Operation-level failure report.
    Error { operation: String, error: String },
}
/// Envelope for all DHT wire traffic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhtNetworkMessage {
    /// UUID string correlating a response with its request.
    pub message_id: String,
    pub source: PeerId,
    /// Intended recipient; presumably `None` for broadcasts — confirm at
    /// the receive path (not in this chunk).
    pub target: Option<PeerId>,
    pub message_type: DhtMessageType,
    pub payload: DhtNetworkOperation,
    /// Populated on responses only; `None` on requests.
    pub result: Option<DhtNetworkResult>,
    /// Seconds since the Unix epoch at send time.
    pub timestamp: u64,
    /// Hop budget (set to 10 by `send_dht_request`).
    pub ttl: u8,
    pub hop_count: u8,
}
/// Role of a [`DhtNetworkMessage`] in the request/response protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DhtMessageType {
    Request,
    Response,
    Broadcast,
    Error,
}
/// Networked DHT manager: owns the core routing engine and drives it over
/// an attached transport, with optional trust scoring and background
/// maintenance tasks (self-lookup, bucket refresh, hint republish).
pub struct DhtNetworkManager {
    /// Core Kademlia engine (routing table and key-space logic).
    dht: Arc<RwLock<DhtCoreEngine>>,
    /// Attached transport used for dialing and messaging.
    transport: Arc<crate::transport_handle::TransportHandle>,
    /// Optional trust engine fed with per-peer failure stats.
    trust_engine: Option<Arc<TrustEngine>>,
    config: DhtNetworkConfig,
    /// In-flight request contexts keyed by message id; swept of expired
    /// entries before each new request.
    active_operations: Arc<Mutex<HashMap<String, DhtOperationContext>>>,
    /// Broadcast channel for [`DhtNetworkEvent`]s (sent only when there
    /// are receivers).
    event_tx: broadcast::Sender<DhtNetworkEvent>,
    stats: Arc<RwLock<DhtNetworkStats>>,
    /// Caps concurrent inbound message handling.
    message_handler_semaphore: Arc<Semaphore>,
    /// Caps concurrent stale-peer revalidations.
    revalidation_semaphore: Arc<Semaphore>,
    /// Bucket indices currently being revalidated; entries are released by
    /// `BucketRevalidationGuard::drop`.
    bucket_revalidation_active: Arc<parking_lot::Mutex<HashSet<usize>>>,
    /// Cancels all background tasks during `stop()`.
    shutdown: CancellationToken,
    // Handles of the spawned background tasks, joined during `stop()`.
    event_handler_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    self_lookup_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    bucket_refresh_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    hint_republish_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// When the last automatic re-bootstrap ran (cooldown tracking).
    last_rebootstrap: tokio::sync::Mutex<Option<Instant>>,
}
/// Bookkeeping for one in-flight DHT request.
struct DhtOperationContext {
    operation: DhtNetworkOperation,
    /// Peer the request was sent to.
    peer_id: PeerId,
    started_at: Instant,
    /// Per-request timeout; contexts older than `2 * timeout` are swept.
    timeout: Duration,
    contacted_nodes: Vec<PeerId>,
    /// Completion channel for the matching response, if still awaited.
    response_tx: Option<oneshot::Sender<(PeerId, DhtNetworkResult)>>,
}
impl std::fmt::Debug for DhtOperationContext {
    /// Manual `Debug`: the oneshot sender in `response_tx` has no `Debug`
    /// impl, so it is rendered as a presence flag instead.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("DhtOperationContext");
        dbg.field("operation", &self.operation);
        dbg.field("peer_id", &self.peer_id);
        dbg.field("started_at", &self.started_at);
        dbg.field("timeout", &self.timeout);
        dbg.field("contacted_nodes", &self.contacted_nodes);
        dbg.field("response_tx", &self.response_tx.is_some());
        dbg.finish()
    }
}
/// Events broadcast on the manager's event channel.
#[derive(Debug, Clone)]
pub enum DhtNetworkEvent {
    PeerDiscovered { peer_id: PeerId, dht_key: Key },
    PeerDisconnected { peer_id: PeerId },
    /// The set of k-closest peers to ourselves changed.
    KClosestPeersChanged {
        old: Vec<PeerId>,
        new: Vec<PeerId>,
    },
    PeerAdded { peer_id: PeerId },
    PeerRemoved { peer_id: PeerId },
    /// Bootstrap finished; `num_peers` is the routing-table size afterwards.
    BootstrapComplete { num_peers: usize },
    /// A DHT operation finished (success or failure) with its duration.
    OperationCompleted {
        operation: String,
        success: bool,
        duration: Duration,
    },
    NetworkStatusChanged {
        connected_peers: usize,
        routing_table_size: usize,
    },
    Error { error: String },
}
/// Aggregate counters for DHT network activity.
#[derive(Debug, Clone, Default)]
pub struct DhtNetworkStats {
    pub total_operations: u64,
    pub successful_operations: u64,
    pub failed_operations: u64,
    /// Average operation latency (update rule not visible in this chunk).
    pub avg_operation_latency: Duration,
    pub bytes_sent: u64,
    pub bytes_received: u64,
    pub connected_peers: usize,
    pub routing_table_size: usize,
}
/// RAII guard marking a bucket as currently under revalidation; the slot
/// in the shared set is released on drop.
struct BucketRevalidationGuard {
    /// Shared set of bucket indices with an active revalidation pass.
    active: Arc<parking_lot::Mutex<HashSet<usize>>>,
    bucket_idx: usize,
}
impl Drop for BucketRevalidationGuard {
    // Releasing the guard frees this bucket's revalidation slot so a later
    // pass may claim it again.
    fn drop(&mut self) {
        let mut active = self.active.lock();
        active.remove(&self.bucket_idx);
    }
}
impl DhtNetworkManager {
/// Build a manager from pre-constructed transport/trust components.
///
/// Creates the DHT core engine, applies the optional IP-diversity config,
/// and wires up channels, semaphores, and (initially empty) task handles.
fn new_from_components(
    transport: Arc<crate::transport_handle::TransportHandle>,
    trust_engine: Option<Arc<TrustEngine>>,
    config: DhtNetworkConfig,
) -> Result<Self> {
    let mut dht_instance = DhtCoreEngine::new(
        config.peer_id,
        config.node_config.dht_config.k_value,
        config.node_config.allow_loopback,
        config.swap_threshold,
    )
    .map_err(|e| P2PError::Dht(DhtError::OperationFailed(e.to_string().into())))?;
    if let Some(diversity) = &config.node_config.diversity_config {
        dht_instance.set_ip_diversity_config(diversity.clone());
    }
    let dht = Arc::new(RwLock::new(dht_instance));
    let (event_tx, _) = broadcast::channel(crate::DEFAULT_EVENT_CHANNEL_CAPACITY);
    // Never allow fewer than MIN_CONCURRENT_OPERATIONS handler permits,
    // regardless of what the config asks for.
    let message_handler_semaphore = Arc::new(Semaphore::new(
        config
            .max_concurrent_operations
            .max(MIN_CONCURRENT_OPERATIONS),
    ));
    Ok(Self {
        dht,
        transport,
        trust_engine,
        config,
        active_operations: Arc::new(Mutex::new(HashMap::new())),
        event_tx,
        stats: Arc::new(RwLock::new(DhtNetworkStats::default())),
        message_handler_semaphore,
        revalidation_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_REVALIDATIONS)),
        bucket_revalidation_active: Arc::new(parking_lot::Mutex::new(HashSet::new())),
        shutdown: CancellationToken::new(),
        event_handler_handle: Arc::new(RwLock::new(None)),
        self_lookup_handle: Arc::new(RwLock::new(None)),
        bucket_refresh_handle: Arc::new(RwLock::new(None)),
        hint_republish_handle: Arc::new(RwLock::new(None)),
        last_rebootstrap: tokio::sync::Mutex::new(None),
    })
}
/// Kademlia `k` (bucket size / lookup fan-out) from this node's DHT config.
pub fn k_value(&self) -> usize {
    let dht_config = &self.config.node_config.dht_config;
    dht_config.k_value
}
/// Serve an incoming FIND_NODE request: return our k closest known nodes
/// to `key`, excluding the requester itself.
async fn handle_find_node_request(
    &self,
    key: &Key,
    requester: &PeerId,
) -> Result<DhtNetworkResult> {
    trace!(
        "FIND_NODE: resolving closer nodes for key {}",
        hex::encode(key)
    );
    let candidate_nodes = self.find_closest_nodes_local(key, self.k_value()).await;
    let closer_nodes = Self::filter_response_nodes(candidate_nodes, requester);
    // Diagnostic logging only: report coordinator hints and addresses per node.
    for node in &closer_nodes {
        let hint_count = Self::extract_coordinator_hints(node).len();
        let addrs: Vec<String> = node.addresses.iter().map(|a| format!("{}", a)).collect();
        if hint_count > 0 {
            debug!(
                "FindNode response includes {} coordinator hint(s) for peer {}",
                hint_count,
                hex::encode(&node.peer_id.to_bytes()[..8])
            );
        }
        debug!(
            "FIND_NODE response: peer={} addresses={:?}",
            node.peer_id.to_hex(),
            addrs
        );
    }
    Ok(DhtNetworkResult::NodesFound {
        key: *key,
        nodes: closer_nodes,
    })
}
/// Create a manager attached to an existing transport.
///
/// An all-zero `config.peer_id` is replaced by the transport's peer ID; a
/// non-zero mismatch is only logged and the config value wins.
pub async fn new(
    transport: Arc<crate::transport_handle::TransportHandle>,
    trust_engine: Option<Arc<TrustEngine>>,
    mut config: DhtNetworkConfig,
) -> Result<Self> {
    let transport_app_peer_id = transport.peer_id();
    if config.peer_id == PeerId::from_bytes([0u8; 32]) {
        config.peer_id = transport_app_peer_id;
    } else if config.peer_id != transport_app_peer_id {
        warn!(
            "DHT config peer_id ({}) differs from transport peer_id ({}); using config value",
            config.peer_id.to_hex(),
            transport_app_peer_id.to_hex()
        );
    }
    info!(
        "Creating attached DHT Network Manager for peer: {}",
        config.peer_id.to_hex()
    );
    let manager = Self::new_from_components(transport, trust_engine, config)?;
    info!("Attached DHT Network Manager created successfully");
    Ok(manager)
}
/// Start background processing: the network event handler, connected-peer
/// reconciliation, and the periodic self-lookup / bucket-refresh /
/// hint-republish tasks.
pub async fn start(self: &Arc<Self>) -> Result<()> {
    info!("Starting DHT Network Manager...");
    self.start_network_event_handler(Arc::clone(self)).await?;
    self.reconcile_connected_peers().await;
    self.spawn_self_lookup_task().await;
    self.spawn_bucket_refresh_task().await;
    self.spawn_hint_republish_task().await;
    info!("DHT Network Manager started successfully");
    Ok(())
}
/// Spawn the periodic self-lookup loop (randomised 5-10 minute interval).
///
/// Each tick runs a self-lookup, revalidates stale k-closest peers, and
/// re-bootstraps if the routing table has collapsed. The task exits when
/// the shutdown token fires.
async fn spawn_self_lookup_task(self: &Arc<Self>) {
    let this = Arc::clone(self);
    let shutdown = self.shutdown.clone();
    let handle_slot = Arc::clone(&self.self_lookup_handle);
    let handle = tokio::spawn(async move {
        loop {
            let interval =
                Self::randomised_interval(SELF_LOOKUP_INTERVAL_MIN, SELF_LOOKUP_INTERVAL_MAX);
            // Sleep first so startup does not trigger an immediate lookup.
            tokio::select! {
                () = tokio::time::sleep(interval) => {}
                () = shutdown.cancelled() => break,
            }
            // The work itself also races against shutdown.
            tokio::select! {
                () = shutdown.cancelled() => break,
                _ = async {
                    if let Err(e) = this.trigger_self_lookup().await {
                        warn!("Periodic self-lookup failed: {e}");
                    }
                    this.revalidate_stale_k_closest().await;
                    this.maybe_rebootstrap().await;
                } => {}
            }
        }
    });
    *handle_slot.write().await = Some(handle);
}
/// Spawn the periodic bucket-refresh loop (fixed `BUCKET_REFRESH_INTERVAL`).
///
/// Each tick finds buckets untouched for longer than
/// `STALE_BUCKET_THRESHOLD`, runs a network lookup for a random key in
/// each, dials every peer discovered, and finally re-bootstraps if needed.
async fn spawn_bucket_refresh_task(self: &Arc<Self>) {
    let this = Arc::clone(self);
    let shutdown = self.shutdown.clone();
    let handle_slot = Arc::clone(&self.bucket_refresh_handle);
    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                () = tokio::time::sleep(BUCKET_REFRESH_INTERVAL) => {}
                () = shutdown.cancelled() => break,
            }
            // Borrow for the async block below while `shutdown` itself
            // remains usable in the enclosing select.
            let shutdown_ref = &shutdown;
            tokio::select! {
                () = shutdown.cancelled() => break,
                _ = async {
                    let stale_indices = this
                        .dht
                        .read()
                        .await
                        .stale_bucket_indices(STALE_BUCKET_THRESHOLD)
                        .await;
                    if stale_indices.is_empty() {
                        trace!("Bucket refresh: no stale buckets");
                        return;
                    }
                    debug!("Bucket refresh: {} stale buckets", stale_indices.len());
                    let k = this.k_value();
                    for bucket_idx in stale_indices {
                        // Bail out early if shutdown began mid-pass.
                        if shutdown_ref.is_cancelled() {
                            break;
                        }
                        // A random key inside this bucket's key range.
                        let random_key = {
                            let dht = this.dht.read().await;
                            dht.generate_random_key_for_bucket(bucket_idx)
                        };
                        let Some(key) = random_key else {
                            continue;
                        };
                        let key_bytes: Key = *key.as_bytes();
                        match this.find_closest_nodes_network(&key_bytes, k).await {
                            Ok(nodes) => {
                                trace!(
                                    "Bucket refresh[{bucket_idx}]: discovered {} peers",
                                    nodes.len()
                                );
                                for dht_node in nodes {
                                    if dht_node.peer_id == this.config.peer_id {
                                        continue;
                                    }
                                    this.dial_addresses(&dht_node.peer_id, &dht_node.addresses, None)
                                        .await;
                                }
                            }
                            Err(e) => {
                                debug!("Bucket refresh[{bucket_idx}] lookup failed: {e}");
                            }
                        }
                    }
                    this.maybe_rebootstrap().await;
                } => {}
            }
        }
    });
    *handle_slot.write().await = Some(handle);
}
/// Spawn the coordinator-hint republish loop (randomised 90-150s interval).
///
/// When our own record carries coordinator hints, push our addresses to
/// the k closest locally-known peers so NAT-reachability info stays fresh.
async fn spawn_hint_republish_task(self: &Arc<Self>) {
    let this = Arc::clone(self);
    let shutdown = self.shutdown.clone();
    let handle_slot = Arc::clone(&self.hint_republish_handle);
    let handle = tokio::spawn(async move {
        loop {
            let interval = Self::randomised_interval(
                HINT_REPUBLISH_INTERVAL_MIN,
                HINT_REPUBLISH_INTERVAL_MAX,
            );
            tokio::select! {
                () = tokio::time::sleep(interval) => {}
                () = shutdown.cancelled() => break,
            }
            tokio::select! {
                () = shutdown.cancelled() => break,
                _ = async {
                    let local_node = this.local_dht_node().await;
                    // Nothing to publish without addresses or hints.
                    if local_node.addresses.is_empty() {
                        return;
                    }
                    let hint_count = Self::extract_coordinator_hints(&local_node).len();
                    if hint_count == 0 {
                        trace!("Hint republish: no coordinator hints to publish");
                        return;
                    }
                    let k = this.k_value();
                    let self_key: Key = *this.config.peer_id.as_bytes();
                    let closest = this.find_closest_nodes_local(&self_key, k).await;
                    let peer_count = closest.len();
                    // Hints are exactly the addresses tagged with a foreign peer ID.
                    let hint_addrs: Vec<String> = local_node
                        .addresses
                        .iter()
                        .filter(|a| {
                            a.peer_id()
                                .is_some_and(|pid| *pid != local_node.peer_id)
                        })
                        .map(|a| format!("{}", a))
                        .collect();
                    info!(
                        "Publishing {} coordinator hint(s) to {} connected peers: {:?}",
                        hint_count, peer_count, hint_addrs
                    );
                    this.publish_address_to_peers(local_node.addresses, &closest)
                        .await;
                } => {}
            }
        }
    });
    *handle_slot.write().await = Some(handle);
}
/// Run a Kademlia lookup for our own peer ID and dial every peer it
/// discovers, keeping the routing-table neighbourhood fresh.
///
/// # Errors
/// Propagates the failure from the underlying network lookup.
pub async fn trigger_self_lookup(&self) -> Result<()> {
    let self_id = self.config.peer_id;
    let self_key: Key = *self_id.as_bytes();
    let k = self.k_value();
    let nodes = match self.find_closest_nodes_network(&self_key, k).await {
        Ok(nodes) => nodes,
        Err(e) => {
            debug!("Self-lookup failed: {e}");
            return Err(e);
        }
    };
    debug!("Self-lookup discovered {} peers", nodes.len());
    for dht_node in nodes {
        // Never dial ourselves.
        if dht_node.peer_id == self_id {
            continue;
        }
        self.dial_addresses(&dht_node.peer_id, &dht_node.addresses, None)
            .await;
    }
    Ok(())
}
/// Re-bootstrap from currently connected peers when the routing table has
/// shrunk below `AUTO_REBOOTSTRAP_THRESHOLD`, rate-limited by
/// `REBOOTSTRAP_COOLDOWN`.
async fn maybe_rebootstrap(&self) {
    let rt_size = self.get_routing_table_size().await;
    if rt_size >= AUTO_REBOOTSTRAP_THRESHOLD {
        return;
    }
    {
        // Claim the cooldown slot atomically: check-and-set under the lock
        // so concurrent callers cannot both pass.
        let mut guard = self.last_rebootstrap.lock().await;
        if let Some(last) = *guard
            && last.elapsed() < REBOOTSTRAP_COOLDOWN
        {
            trace!(
                "Auto re-bootstrap skipped: cooldown ({:?} remaining)",
                REBOOTSTRAP_COOLDOWN.saturating_sub(last.elapsed())
            );
            return;
        }
        *guard = Some(Instant::now());
    }
    info!(
        "Auto re-bootstrap: routing table size ({rt_size}) below threshold ({})",
        AUTO_REBOOTSTRAP_THRESHOLD
    );
    let connected = self.transport.connected_peers().await;
    if connected.is_empty() {
        debug!("Auto re-bootstrap: no connected peers to bootstrap from");
        return;
    }
    match self.bootstrap_from_peers(&connected).await {
        Ok(discovered) => {
            info!("Auto re-bootstrap discovered {discovered} peers");
        }
        Err(e) => {
            warn!("Auto re-bootstrap failed: {e}");
        }
    }
}
/// Pick a uniformly random duration in `[min, max]` at whole-second
/// granularity, using `PeerId::random()` as the entropy source so no
/// separate RNG dependency is needed.
fn randomised_interval(min: Duration, max: Duration) -> Duration {
    let span_secs = max.as_secs().saturating_sub(min.as_secs());
    if span_secs == 0 {
        return min;
    }
    let entropy = PeerId::random().to_bytes();
    let mut word = [0u8; 8];
    word.copy_from_slice(&entropy[..8]);
    let roll = u64::from_le_bytes(word);
    min + Duration::from_secs(roll % (span_secs + 1))
}
/// Bootstrap the routing table: ask each given peer for nodes close to our
/// own ID, pre-load coordinator hints, then dial every discovered peer
/// concurrently.
///
/// Returns the number of unique dialable peers discovered (not the number
/// successfully connected).
pub async fn bootstrap_from_peers(&self, peers: &[PeerId]) -> Result<usize> {
    let key = *self.config.peer_id.as_bytes();
    let mut seen = HashSet::new();
    // A discovered node plus the bootstrap peer's address that referred it
    // (later used as a hole-punch referrer when dialing).
    struct DiscoveredNode {
        node: DHTNode,
        referrer: Option<std::net::SocketAddr>,
    }
    let mut discovered: Vec<DiscoveredNode> = Vec::new();
    // Phase 1: FIND_NODE(self) against every bootstrap peer in parallel.
    let find_node_futures = peers.iter().map(|peer_id| async {
        let bootstrap_addr = self
            .peer_addresses_for_dial(peer_id)
            .await
            .first()
            .and_then(|a| a.dialable_socket_addr());
        let op = DhtNetworkOperation::FindNode { key };
        let result = self.send_dht_request(peer_id, op, None).await;
        (*peer_id, bootstrap_addr, result)
    });
    let responses = futures::future::join_all(find_node_futures).await;
    for (peer_id, bootstrap_addr, result) in responses {
        match result {
            Ok(DhtNetworkResult::NodesFound { nodes, .. }) => {
                for node in nodes {
                    let dialable = Self::dialable_addresses(&node.addresses);
                    debug!(
                        "DHT bootstrap: peer={} num_addresses={} dialable={}",
                        node.peer_id.to_hex(),
                        node.addresses.len(),
                        dialable.len()
                    );
                    // Deduplicate; keep only nodes we could actually dial.
                    if seen.insert(node.peer_id) && !dialable.is_empty() {
                        discovered.push(DiscoveredNode {
                            node,
                            referrer: bootstrap_addr,
                        });
                    }
                }
            }
            Ok(_) => {}
            Err(e) => {
                warn!("Bootstrap FIND_NODE to {} failed: {}", peer_id.to_hex(), e);
            }
        }
    }
    info!(
        "Bootstrap collected {} unique peers from {} bootstrap nodes, extracting hints before dialing",
        discovered.len(),
        peers.len()
    );
    // Phase 2: pre-load coordinator hints so hole punching can use them
    // before any dial attempt is made.
    for entry in &discovered {
        let hint_addrs = Self::extract_coordinator_hints(&entry.node);
        if !hint_addrs.is_empty() {
            let direct = Self::direct_addresses_only(&entry.node);
            if let Some(target_addr) =
                Self::first_dialable_address(&direct).and_then(|a| a.dialable_socket_addr())
            {
                debug!(
                    "Setting {} coordinator hint(s) for NAT node {} from DHT record",
                    hint_addrs.len(),
                    hex::encode(&entry.node.peer_id.to_bytes()[..8])
                );
                self.transport
                    .set_hole_punch_preferred_coordinators(target_addr, hint_addrs)
                    .await;
            }
            // Hint multiaddrs carry a peer ID different from the node's own.
            let hint_multiaddrs: Vec<crate::MultiAddr> = entry
                .node
                .addresses
                .iter()
                .filter(|addr| addr.peer_id().is_some_and(|pid| *pid != entry.node.peer_id))
                .cloned()
                .collect();
            let stored = {
                let dht = self.dht.read().await;
                dht.merge_coordinator_hints(&entry.node.peer_id, hint_multiaddrs)
                    .await
            };
            if stored > 0 {
                debug!(
                    "Merged {} coordinator hint(s) into routing table for peer {}",
                    stored,
                    hex::encode(&entry.node.peer_id.to_bytes()[..8])
                );
            }
        }
    }
    info!(
        "Bootstrap dialing {} peers concurrently (hints pre-loaded)",
        discovered.len()
    );
    // Phase 3: dial everything concurrently.
    let dial_futures: Vec<_> = discovered
        .iter()
        .map(|entry| {
            self.dial_addresses(&entry.node.peer_id, &entry.node.addresses, entry.referrer)
        })
        .collect();
    futures::future::join_all(dial_futures).await;
    let rt_size = self.get_routing_table_size().await;
    if self.event_tx.receiver_count() > 0 {
        let _ = self
            .event_tx
            .send(DhtNetworkEvent::BootstrapComplete { num_peers: rt_size });
    }
    info!("Bootstrap complete: routing table has {rt_size} peers");
    Ok(seen.len())
}
/// Stop the manager: leave the network, cancel all background tasks, and
/// join each one with a 10-second grace period (aborting on timeout).
pub async fn stop(&self) -> Result<()> {
    info!("Stopping DHT Network Manager...");
    self.leave_network().await?;
    self.shutdown.cancel();
    self.dht.read().await.signal_shutdown();
    // Join one stored task handle, logging how it ended.
    async fn join_task(name: &str, slot: &RwLock<Option<tokio::task::JoinHandle<()>>>) {
        if let Some(mut handle) = slot.write().await.take() {
            match tokio::time::timeout(std::time::Duration::from_secs(10), &mut handle).await {
                Ok(Ok(())) => debug!("{name} task stopped cleanly"),
                Ok(Err(e)) if e.is_cancelled() => debug!("{name} task was cancelled"),
                Ok(Err(e)) => warn!("{name} task panicked: {e}"),
                Err(_) => {
                    warn!("{name} task did not stop within 10s, aborting");
                    handle.abort();
                }
            }
        }
    }
    join_task("event handler", &self.event_handler_handle).await;
    join_task("self-lookup", &self.self_lookup_handle).await;
    join_task("bucket refresh", &self.bucket_refresh_handle).await;
    join_task("hint republish", &self.hint_republish_handle).await;
    info!("DHT Network Manager stopped");
    Ok(())
}
/// Public entry point for a closest-nodes query; delegates to the
/// iterative network lookup.
pub async fn find_closest_nodes(&self, key: &Key, count: usize) -> Result<Vec<DHTNode>> {
    let nodes = self.find_closest_nodes_network(key, count).await?;
    Ok(nodes)
}
/// Resolve the k closest nodes to `key` via the iterative network lookup
/// and wrap them in a [`DhtNetworkResult::NodesFound`].
///
/// # Errors
/// Propagates any failure from the underlying network lookup.
pub async fn find_node(&self, key: &Key) -> Result<DhtNetworkResult> {
    info!("Finding nodes closest to key: {}", hex::encode(key));
    // `SerializableDHTNode` is a plain alias for `DHTNode`, so the lookup
    // result is used directly; the previous `into_iter().collect()`
    // round-trip was a no-op.
    let nodes = self.find_closest_nodes_network(key, self.k_value()).await?;
    info!(
        "Found {} nodes closest to key: {}",
        nodes.len(),
        hex::encode(key)
    );
    Ok(DhtNetworkResult::NodesFound { key: *key, nodes })
}
/// Probe `peer_id` for liveness and measure the round-trip latency.
///
/// The latency reported by the remote side is discarded and replaced with
/// our locally measured elapsed time.
///
/// # Errors
/// Returns a routing error for unexpected response variants, or the
/// underlying transport error on failure.
pub async fn ping(&self, peer_id: &PeerId) -> Result<DhtNetworkResult> {
    info!("Pinging peer: {}", peer_id.to_hex());
    let start_time = Instant::now();
    let outcome = self
        .send_dht_request(peer_id, DhtNetworkOperation::Ping, None)
        .await;
    match outcome {
        Ok(DhtNetworkResult::PongReceived { responder, .. }) => {
            let latency = start_time.elapsed();
            info!("Received pong from {} in {:?}", responder, latency);
            Ok(DhtNetworkResult::PongReceived { responder, latency })
        }
        Ok(result) => {
            warn!("Unexpected ping result: {:?}", result);
            Err(P2PError::Dht(crate::error::DhtError::RoutingError(
                "Unexpected ping response".to_string().into(),
            )))
        }
        Err(e) => {
            warn!("Ping failed to {}: {}", peer_id.to_hex(), e);
            Err(e)
        }
    }
}
/// Graceful network departure; currently a no-op that always succeeds.
/// Called from [`Self::stop`] before cancellation begins.
async fn leave_network(&self) -> Result<()> {
    Ok(())
}
/// Query only the local routing table for the `count` closest nodes to
/// `key`, excluding ourselves. Lookup failures are logged and yield an
/// empty result rather than an error.
pub async fn find_closest_nodes_local(&self, key: &Key, count: usize) -> Vec<DHTNode> {
    debug!(
        "[LOCAL] Finding {} closest nodes to key: {}",
        count,
        hex::encode(key)
    );
    let engine = self.dht.read().await;
    let lookup = engine.find_nodes(&DhtKey::from_bytes(*key), count).await;
    match lookup {
        Ok(found) => {
            let mut out = Vec::with_capacity(found.len());
            for info in found {
                // Never report ourselves as a remote node.
                if self.is_local_peer_id(&info.id) {
                    continue;
                }
                out.push(DHTNode {
                    peer_id: info.id,
                    addresses: info.addresses,
                    distance: None,
                    reliability: SELF_RELIABILITY_SCORE,
                });
            }
            out
        }
        Err(e) => {
            warn!("find_nodes failed for key {}: {e}", hex::encode(key));
            Vec::new()
        }
    }
}
/// Like [`Self::find_closest_nodes_local`], but also considers this node
/// itself as a candidate, returning the `count` closest overall.
pub async fn find_closest_nodes_local_with_self(
    &self,
    key: &Key,
    count: usize,
) -> Vec<DHTNode> {
    let mut nodes = self.find_closest_nodes_local(key, count).await;
    nodes.push(self.local_dht_node().await);
    // Reinterpret the raw key bytes as a peer ID so distances can be taken.
    let key_peer = PeerId::from_bytes(*key);
    // NOTE(review): this sorts by `PeerId::xor_distance` while the other
    // lookups use `PeerId::distance(&DhtKey)` — presumably the same
    // ordering; confirm the two comparators agree.
    nodes.sort_by(|a, b| {
        let da = a.peer_id.xor_distance(&key_peer);
        let db = b.peer_id.xor_distance(&key_peer);
        da.cmp(&db)
    });
    nodes.truncate(count);
    nodes
}
/// Iterative Kademlia lookup: starting from local candidates, repeatedly
/// query the `ALPHA` closest unqueried nodes for `key` until the top-K set
/// converges, the candidate pool empties, or `MAX_ITERATIONS` is reached.
///
/// Side effects along the way: dials discovered peers, records referrer
/// addresses for hole punching, merges coordinator hints into the routing
/// table, and evicts peers that answer `PeerRejected`.
pub async fn find_closest_nodes_network(
    &self,
    key: &Key,
    count: usize,
) -> Result<Vec<DHTNode>> {
    const MAX_ITERATIONS: usize = 20;
    // Kademlia parallelism factor: nodes queried per iteration.
    const ALPHA: usize = 3;
    debug!(
        "[NETWORK] Finding {} closest nodes to key: {}",
        count,
        hex::encode(key)
    );
    let target_key = DhtKey::from_bytes(*key);
    let mut queried_nodes: HashSet<PeerId> = HashSet::new();
    let mut best_nodes: Vec<DHTNode> = Vec::new();
    // peer -> address of the node that told us about it (hole-punch referrer).
    let mut referrers: std::collections::HashMap<PeerId, std::net::SocketAddr> =
        std::collections::HashMap::new();
    best_nodes.push(self.local_dht_node().await);
    self.mark_self_queried(&mut queried_nodes);
    // Candidate queue ordered by (distance, peer id): first entry = closest.
    let mut candidates: BTreeMap<(Key, PeerId), DHTNode> = BTreeMap::new();
    let initial = self.find_closest_nodes_local(key, count).await;
    for node in initial {
        if !queried_nodes.contains(&node.peer_id) {
            let dist = node.peer_id.distance(&target_key);
            candidates.entry((dist, node.peer_id)).or_insert(node);
        }
    }
    let mut previous_top_k: Vec<PeerId> = Vec::new();
    for iteration in 0..MAX_ITERATIONS {
        if candidates.is_empty() {
            debug!(
                "[NETWORK] No more candidates after {} iterations",
                iteration
            );
            break;
        }
        // Pull the next ALPHA closest unqueried candidates.
        let mut batch: Vec<DHTNode> = Vec::new();
        while batch.len() < ALPHA {
            let Some(entry) = candidates.first_entry() else {
                break;
            };
            let node = entry.remove();
            if queried_nodes.contains(&node.peer_id) {
                continue;
            }
            batch.push(node);
        }
        if batch.is_empty() {
            debug!(
                "[NETWORK] All candidates queried after {} iterations",
                iteration
            );
            break;
        }
        info!(
            "[NETWORK] Iteration {}: querying {} nodes",
            iteration,
            batch.len()
        );
        // Dial (if needed) and send FIND_NODE to the whole batch concurrently.
        let query_stream: FuturesUnordered<_> = batch
            .iter()
            .map(|node| {
                let peer_id = node.peer_id;
                let addresses = node.addresses.clone();
                let referrer = referrers.get(&peer_id).copied();
                let op = DhtNetworkOperation::FindNode { key: *key };
                async move {
                    self.dial_addresses(&peer_id, &addresses, referrer).await;
                    let address_hint = Self::first_dialable_address(&addresses);
                    (
                        peer_id,
                        self.send_dht_request(&peer_id, op, address_hint.as_ref())
                            .await,
                    )
                }
            })
            .collect();
        let results = Self::collect_iteration_results(query_stream).await;
        for (peer_id, result) in results {
            queried_nodes.insert(peer_id);
            match result {
                Ok(DhtNetworkResult::NodesFound { mut nodes, .. }) => {
                    // The responder itself is live: keep it as a result.
                    if let Some(queried_node) = batch.iter().find(|n| n.peer_id == peer_id) {
                        best_nodes.push(queried_node.clone());
                    }
                    let referrer_addr = batch
                        .iter()
                        .find(|n| n.peer_id == peer_id)
                        .and_then(|n| Self::first_dialable_address(&n.addresses))
                        .and_then(|a| a.dialable_socket_addr());
                    nodes.sort_by(|a, b| Self::compare_node_distance(a, b, key));
                    nodes.truncate(self.k_value());
                    for node in nodes {
                        if queried_nodes.contains(&node.peer_id)
                            || self.is_local_peer_id(&node.peer_id)
                        {
                            continue;
                        }
                        // Remember who referred this peer (first referrer wins).
                        if let Some(ref_addr) = referrer_addr
                            && let std::collections::hash_map::Entry::Vacant(e) =
                                referrers.entry(node.peer_id)
                        {
                            info!(
                                "find_closest_nodes_network: peer {} referred by {} ({})",
                                hex::encode(&node.peer_id.to_bytes()[..8]),
                                hex::encode(&peer_id.to_bytes()[..8]),
                                ref_addr
                            );
                            e.insert(ref_addr);
                        }
                        // Pre-load coordinator hints for NATed nodes.
                        let hint_addrs = Self::extract_coordinator_hints(&node);
                        if !hint_addrs.is_empty() {
                            let direct = Self::direct_addresses_only(&node);
                            if let Some(target_addr) = Self::first_dialable_address(&direct)
                                .and_then(|a| a.dialable_socket_addr())
                            {
                                debug!(
                                    "Setting {} coordinator hint(s) for NAT node {} from DHT record",
                                    hint_addrs.len(),
                                    hex::encode(&node.peer_id.to_bytes()[..8])
                                );
                                self.transport
                                    .set_hole_punch_preferred_coordinators(
                                        target_addr,
                                        hint_addrs.clone(),
                                    )
                                    .await;
                            }
                            // NOTE(review): the enumerate()/discard-index pair
                            // below is equivalent to a plain filter + map.
                            let hint_multiaddrs: Vec<crate::MultiAddr> = node
                                .addresses
                                .iter()
                                .enumerate()
                                .filter(|(_, addr)| {
                                    addr.peer_id().is_some_and(|pid| *pid != node.peer_id)
                                })
                                .map(|(_, addr)| addr.clone())
                                .collect();
                            let stored = {
                                let dht = self.dht.read().await;
                                dht.merge_coordinator_hints(&node.peer_id, hint_multiaddrs)
                                    .await
                            };
                            if stored > 0 {
                                debug!(
                                    "Merged {} coordinator hint(s) into routing table for peer {}",
                                    stored,
                                    hex::encode(&node.peer_id.to_bytes()[..8])
                                );
                            }
                        }
                        // Insert into the bounded candidate queue, evicting
                        // the farthest entry when full and this one is closer.
                        let dist = node.peer_id.distance(&target_key);
                        let cand_key = (dist, node.peer_id);
                        if candidates.contains_key(&cand_key) {
                            continue;
                        }
                        if candidates.len() >= MAX_CANDIDATE_NODES {
                            let farthest_key = candidates.keys().next_back().copied();
                            match farthest_key {
                                Some(fk) if cand_key < fk => {
                                    candidates.remove(&fk);
                                }
                                _ => {
                                    trace!(
                                        "[NETWORK] Candidate queue at capacity ({}), dropping {}",
                                        MAX_CANDIDATE_NODES,
                                        node.peer_id.to_hex()
                                    );
                                    continue;
                                }
                            }
                        }
                        candidates.insert(cand_key, node);
                    }
                }
                Ok(DhtNetworkResult::PeerRejected) => {
                    info!(
                        "[NETWORK] Peer {} rejected us — removing from routing table",
                        peer_id.to_hex()
                    );
                    let mut dht = self.dht.write().await;
                    let rt_events = dht.remove_node_by_id(&peer_id).await;
                    drop(dht);
                    self.broadcast_routing_events(&rt_events);
                    let _ = self.transport.disconnect_peer(&peer_id).await;
                }
                Ok(_) => {
                    // Any other successful response still proves liveness.
                    if let Some(queried_node) = batch.iter().find(|n| n.peer_id == peer_id) {
                        best_nodes.push(queried_node.clone());
                    }
                }
                Err(e) => {
                    trace!("[NETWORK] Query to {} failed: {}", peer_id.to_hex(), e);
                }
            }
        }
        // Convergence check on the current top-K.
        best_nodes.sort_by(|a, b| Self::compare_node_distance(a, b, key));
        best_nodes.dedup_by_key(|n| n.peer_id);
        best_nodes.truncate(count);
        let current_top_k: Vec<PeerId> = best_nodes.iter().map(|n| n.peer_id).collect();
        if current_top_k == previous_top_k {
            // Not converged while short of `count` with candidates remaining.
            if best_nodes.len() < count && !candidates.is_empty() {
                previous_top_k = current_top_k;
                continue;
            }
            // Converged unless some candidate is closer than our worst result.
            let has_promising_candidate = best_nodes.last().is_some_and(|worst| {
                let worst_dist = worst.peer_id.distance(&target_key);
                candidates
                    .keys()
                    .next()
                    .is_some_and(|(dist, _)| *dist < worst_dist)
            });
            if !has_promising_candidate {
                info!(
                    "[NETWORK] {}: Top-K converged after {} iterations",
                    self.config.peer_id.to_hex(),
                    iteration + 1
                );
                break;
            }
        }
        previous_top_k = current_top_k;
    }
    best_nodes.sort_by(|a, b| Self::compare_node_distance(a, b, key));
    best_nodes.dedup_by_key(|n| n.peer_id);
    best_nodes.truncate(count);
    info!(
        "[NETWORK] Found {} closest nodes: {:?}",
        best_nodes.len(),
        best_nodes
            .iter()
            .map(|n| {
                let h = n.peer_id.to_hex();
                h[..8.min(h.len())].to_string()
            })
            .collect::<Vec<_>>()
    );
    Ok(best_nodes)
}
/// Ordering of two nodes by their peer IDs' distance to `key`
/// (closest first).
fn compare_node_distance(a: &DHTNode, b: &DHTNode, key: &Key) -> std::cmp::Ordering {
    let target = DhtKey::from_bytes(*key);
    let dist_a = a.peer_id.distance(&target);
    let dist_b = b.peer_id.distance(&target);
    dist_a.cmp(&dist_b)
}
/// Drain one lookup iteration's response stream: block until the first
/// item arrives (or the stream ends), then give the remaining responders a
/// bounded grace window before moving on without them.
async fn collect_iteration_results<S>(mut stream: S) -> Vec<(PeerId, Result<DhtNetworkResult>)>
where
    S: futures::Stream<Item = (PeerId, Result<DhtNetworkResult>)> + Unpin,
{
    let mut collected = Vec::new();
    let Some(first) = stream.next().await else {
        return collected;
    };
    collected.push(first);
    let grace = Duration::from_secs(ITERATION_GRACE_TIMEOUT_SECS);
    let drain_rest = async {
        while let Some(item) = stream.next().await {
            collected.push(item);
        }
    };
    // Timing out here is fine: slow responders are simply dropped.
    let _ = tokio::time::timeout(grace, drain_rest).await;
    collected
}
/// Drop the requester itself from a FIND_NODE response candidate set,
/// preserving the order of the remaining nodes.
fn filter_response_nodes(
    candidate_nodes: Vec<DHTNode>,
    requester_peer_id: &PeerId,
) -> Vec<DHTNode> {
    let mut kept = candidate_nodes;
    kept.retain(|node| node.peer_id != *requester_peer_id);
    kept
}
/// Build this node's own DHT record: externally observed addresses,
/// concrete listen addresses, and up to five connected-peer addresses
/// embedded as coordinator hints (each tagged with that peer's ID).
async fn local_dht_node(&self) -> DHTNode {
    let mut addresses: Vec<MultiAddr> = Vec::new();
    // Externally observed addresses first.
    for observed in self.transport.observed_external_addresses() {
        let resolved = MultiAddr::quic(observed);
        if !addresses.contains(&resolved) {
            addresses.push(resolved);
        }
    }
    // Then concrete listen addresses (skip port 0 and unspecified IPs).
    for la in self.transport.listen_addrs().await {
        let Some(sa) = la.dialable_socket_addr() else {
            continue;
        };
        if sa.port() == 0 || sa.ip().is_unspecified() {
            continue;
        }
        let resolved = MultiAddr::quic(sa);
        if !addresses.contains(&resolved) {
            addresses.push(resolved);
        }
    }
    {
        // Up to 5 connected peers become coordinator hints: their address
        // tagged with THEIR peer ID, which distinguishes hints from our
        // own direct addresses.
        let connected = self.transport.connected_peer_addresses(5).await;
        if !connected.is_empty() {
            let hint_addrs: Vec<String> =
                connected.iter().map(|(sa, _)| sa.to_string()).collect();
            debug!(
                "local_dht_node: including {} coordinator hint(s): {:?}",
                connected.len(),
                hint_addrs
            );
            for (socket_addr, peer_id) in connected {
                let hint = MultiAddr::quic(socket_addr).with_peer_id(peer_id);
                if !addresses.contains(&hint) {
                    addresses.push(hint);
                }
            }
        }
    }
    DHTNode {
        peer_id: self.config.peer_id,
        addresses,
        distance: None,
        reliability: SELF_RELIABILITY_SCORE,
    }
}
/// Socket addresses in `node`'s record tagged with a *different* peer ID —
/// i.e. coordinator hints for reaching a NATed node.
fn extract_coordinator_hints(node: &DHTNode) -> Vec<SocketAddr> {
    let mut hints = Vec::new();
    for addr in &node.addresses {
        // Untagged addresses are direct, not hints.
        let Some(hint_peer_id) = addr.peer_id() else {
            continue;
        };
        if *hint_peer_id == node.peer_id {
            continue;
        }
        if let Some(sa) = addr.dialable_socket_addr() {
            hints.push(sa);
        }
    }
    hints
}
/// Addresses that point at `node` itself (untagged, or tagged with its own
/// peer ID) — everything except coordinator hints.
fn direct_addresses_only(node: &DHTNode) -> Vec<MultiAddr> {
    let mut direct = Vec::new();
    for addr in &node.addresses {
        let is_direct = addr.peer_id().is_none_or(|pid| *pid == node.peer_id);
        if is_direct {
            direct.push(addr.clone());
        }
    }
    direct
}
/// Record our own peer ID as already queried so lookups never contact us.
fn mark_self_queried(&self, queried: &mut HashSet<PeerId>) {
    let local = self.config.peer_id;
    queried.insert(local);
}
/// Keep only addresses that resolve to a usable socket address.
///
/// Unspecified IPs (0.0.0.0 / ::) are rejected; loopback is accepted to
/// support local and test deployments.
fn dialable_addresses(addresses: &[MultiAddr]) -> Vec<MultiAddr> {
    let mut usable = Vec::new();
    for addr in addresses {
        let Some(sa) = addr.dialable_socket_addr() else {
            trace!("Skipping non-dialable address: {addr}");
            continue;
        };
        if sa.ip().is_unspecified() {
            warn!("Rejecting unspecified address: {addr}");
            continue;
        }
        if sa.ip().is_loopback() {
            trace!("Accepting loopback address (local/test): {addr}");
        }
        usable.push(addr.clone());
    }
    usable
}
/// First address from `addresses` that passes the dialability filter.
fn first_dialable_address(addresses: &[MultiAddr]) -> Option<MultiAddr> {
    let mut usable = Self::dialable_addresses(addresses);
    if usable.is_empty() {
        None
    } else {
        Some(usable.remove(0))
    }
}
/// Try to open a channel to `peer_id` using its DIRECT addresses
/// (coordinator hints tagged with other peer IDs are skipped), returning
/// the channel id of the first successful dial, or `None` if every
/// address fails.
async fn dial_addresses(
    &self,
    peer_id: &PeerId,
    addresses: &[MultiAddr],
    referrer: Option<SocketAddr>,
) -> Option<String> {
    // Only addresses that actually belong to the target peer are dialable
    // here; hint entries carry a different peer ID.
    let mut direct_only: Vec<MultiAddr> = Vec::new();
    for addr in addresses {
        let belongs_to_peer = match addr.peer_id() {
            None => true,
            Some(pid) => *pid == *peer_id,
        };
        if belongs_to_peer {
            direct_only.push(addr.clone());
        }
    }
    let dialable = Self::dialable_addresses(&direct_only);
    if dialable.is_empty() {
        debug!(
            "dial_addresses: no dialable addresses for {}",
            peer_id.to_hex()
        );
        return None;
    }
    // First successful candidate wins.
    for addr in &dialable {
        let attempt = self.dial_candidate(peer_id, addr, referrer).await;
        if attempt.is_some() {
            return attempt;
        }
    }
    debug!(
        "dial_addresses: all {} address(es) failed for {}",
        dialable.len(),
        peer_id.to_hex()
    );
    None
}
async fn record_peer_failure(&self, peer_id: &PeerId) {
if let Some(ref engine) = self.trust_engine {
engine.update_node_stats(
peer_id,
crate::adaptive::NodeStatisticsUpdate::FailedResponse,
);
}
}
/// Drop operation contexts that have outlived twice their own timeout.
///
/// Called before each new request so `active_operations` cannot grow
/// without bound when responses never arrive. A poisoned mutex is
/// recovered rather than propagated: the map stays usable even if a
/// holder panicked.
fn sweep_expired_operations(&self) {
    // Use the bare `warn!` imported at the top of the file (the previous
    // `tracing::warn!` spelling was inconsistent with the rest of this
    // module) and recover from poisoning with the idiomatic
    // `unwrap_or_else(PoisonError::into_inner)` form.
    let mut ops = self.active_operations.lock().unwrap_or_else(|poisoned| {
        warn!("active_operations mutex poisoned in sweep_expired_operations, recovering");
        poisoned.into_inner()
    });
    let now = Instant::now();
    ops.retain(|id, ctx| {
        let expired = now.duration_since(ctx.started_at) > ctx.timeout * 2;
        if expired {
            warn!(
                "Sweeping expired DHT operation {id} (age {:?}, timeout {:?})",
                now.duration_since(ctx.started_at),
                ctx.timeout
            );
        }
        !expired
    });
}
/// Sends a single request to `peer_id` and awaits its response.
///
/// When no transport channel is open, the peer is dialed first — trying
/// `address_hint` (if any) before routing-table/transport addresses — and
/// the remote identity is verified before the request goes out. Every
/// failure path removes the pending operation context and records a
/// trust penalty for the peer.
///
/// Fix: all accesses to `active_operations` now recover from a poisoned
/// mutex (`PoisonError::into_inner`), consistent with
/// `sweep_expired_operations`. Previously `if let Ok(...)` silently
/// skipped the insert (making the response undeliverable) or the remove
/// (leaking the context until the expiry sweep reclaimed it).
async fn send_dht_request(
    &self,
    peer_id: &PeerId,
    operation: DhtNetworkOperation,
    address_hint: Option<&MultiAddr>,
) -> Result<DhtNetworkResult> {
    // Reclaim contexts of long-dead operations before adding a new one.
    self.sweep_expired_operations();
    let message_id = Uuid::new_v4().to_string();
    let message = DhtNetworkMessage {
        message_id: message_id.clone(),
        source: self.config.peer_id,
        target: Some(*peer_id),
        message_type: DhtMessageType::Request,
        payload: operation,
        result: None,
        timestamp: SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map_err(|_| {
                P2PError::Network(NetworkError::ProtocolError(
                    "System clock error: unable to get current timestamp".into(),
                ))
            })?
            .as_secs(),
        ttl: 10,
        hop_count: 0,
    };
    let message_data = postcard::to_stdvec(&message)
        .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
    let (response_tx, response_rx) = oneshot::channel();
    let contacted_nodes = vec![*peer_id];
    let operation_context = DhtOperationContext {
        operation: message.payload.clone(),
        peer_id: *peer_id,
        started_at: Instant::now(),
        timeout: self.config.request_timeout,
        contacted_nodes,
        response_tx: Some(response_tx),
    };
    // Poison-safe insert: a skipped insert would make the response
    // undeliverable to `wait_for_response`.
    self.active_operations
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .insert(message_id.clone(), operation_context);
    let peer_hex = peer_id.to_hex();
    let local_hex = self.config.peer_id.to_hex();
    debug!(
        "[STEP 1] {} -> {}: Sending {:?} request (msg_id: {})",
        local_hex, peer_hex, message.payload, message_id
    );
    // Only gather dial candidates when there is no open channel.
    let candidate_addresses: Vec<MultiAddr> = if self.transport.is_peer_connected(peer_id).await
    {
        Vec::new()
    } else {
        let mut addrs = Vec::new();
        if let Some(hint) = address_hint {
            addrs.push(hint.clone());
        }
        for addr in self.peer_addresses_for_dial(peer_id).await {
            if !addrs.contains(&addr) {
                addrs.push(addr);
            }
        }
        addrs
    };
    if !candidate_addresses.is_empty() {
        info!(
            "[STEP 1b] {} -> {}: No open channel, trying {} dialable address(es)",
            local_hex,
            peer_hex,
            candidate_addresses.len()
        );
        if let Some(channel_id) = self
            .dial_addresses(peer_id, &candidate_addresses, None)
            .await
        {
            // Verify the remote identity before trusting the new channel.
            let identity_timeout = self.config.request_timeout.min(IDENTITY_EXCHANGE_TIMEOUT);
            match self
                .transport
                .wait_for_peer_identity(&channel_id, identity_timeout)
                .await
            {
                Ok(authenticated) => {
                    if &authenticated != peer_id {
                        warn!(
                            "[STEP 1b] {} -> {}: identity MISMATCH — authenticated as {}. \
                             Routing table entry may be stale.",
                            local_hex,
                            peer_hex,
                            authenticated.to_hex()
                        );
                        self.active_operations
                            .lock()
                            .unwrap_or_else(std::sync::PoisonError::into_inner)
                            .remove(&message_id);
                        return Err(P2PError::Identity(IdentityError::IdentityMismatch {
                            expected: peer_hex.into(),
                            actual: authenticated.to_hex().into(),
                        }));
                    }
                    debug!(
                        "[STEP 1b] {} -> {}: identity confirmed ({})",
                        local_hex,
                        peer_hex,
                        authenticated.to_hex()
                    );
                }
                Err(e) => {
                    warn!(
                        "[STEP 1b] {} -> {}: identity exchange failed, disconnecting channel: {}",
                        local_hex, peer_hex, e
                    );
                    self.transport.disconnect_channel(&channel_id).await;
                    self.active_operations
                        .lock()
                        .unwrap_or_else(std::sync::PoisonError::into_inner)
                        .remove(&message_id);
                    self.record_peer_failure(peer_id).await;
                    return Err(P2PError::Network(NetworkError::ProtocolError(
                        format!("identity exchange with {} failed: {}", peer_hex, e).into(),
                    )));
                }
            }
        } else {
            warn!(
                "[STEP 1b] {} -> {}: dial failed for all {} candidate address(es)",
                local_hex,
                peer_hex,
                candidate_addresses.len()
            );
            self.active_operations
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner)
                .remove(&message_id);
            self.record_peer_failure(peer_id).await;
            return Err(P2PError::Network(NetworkError::PeerNotFound(
                format!(
                    "failed to dial {} at any of {} candidate address(es)",
                    peer_hex,
                    candidate_addresses.len()
                )
                .into(),
            )));
        }
    }
    // Channel is (now) available: send the request and await the response.
    let result = match self
        .transport
        .send_message(peer_id, "/dht/1.0.0", message_data)
        .await
    {
        Ok(_) => {
            debug!(
                "[STEP 2] {} -> {}: Message sent successfully, waiting for response...",
                local_hex, peer_hex
            );
            let result = self
                .wait_for_response(&message_id, response_rx, peer_id)
                .await;
            match &result {
                Ok(r) => debug!(
                    "[STEP 6] {} <- {}: Got response: {:?}",
                    local_hex,
                    peer_hex,
                    std::mem::discriminant(r)
                ),
                Err(e) => warn!(
                    "[STEP 6 FAILED] {} <- {}: Response error: {}",
                    local_hex, peer_hex, e
                ),
            }
            result
        }
        Err(e) => {
            warn!(
                "[STEP 1 FAILED] Failed to send DHT request to {}: {}",
                peer_hex, e
            );
            Err(e)
        }
    };
    // Always clear the context; on failure also penalize the peer.
    self.active_operations
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .remove(&message_id);
    if result.is_err() {
        self.record_peer_failure(peer_id).await;
    }
    result
}
/// Returns `true` when `peer_id` is this node's own application peer id.
fn is_local_peer_id(&self, peer_id: &PeerId) -> bool {
    self.config.peer_id == *peer_id
}
/// Resolves `peer_id` to its canonical application-level identity.
///
/// Accepts the id when the transport either already knows it as an app
/// peer id or currently has a connection to it; returns `None` when the
/// peer has no verified identity yet.
async fn canonical_app_peer_id(&self, peer_id: &PeerId) -> Option<PeerId> {
    let recognized = self.transport.is_known_app_peer_id(peer_id).await
        || self.transport.is_peer_connected(peer_id).await;
    recognized.then_some(*peer_id)
}
/// Attempts to open a transport channel to `peer_id` at `address`.
///
/// Before dialing, primes the transport's hole-punch state: the target
/// peer id for the socket address, and — when `referrer` is given — the
/// coordinator that referred this address. Returns the new channel id on
/// success, or `None` when the peer is already connected, the address is
/// unusable, or the dial fails/times out.
async fn dial_candidate(
&self,
peer_id: &PeerId,
address: &MultiAddr,
referrer: Option<std::net::SocketAddr>,
) -> Option<String> {
let peer_hex = peer_id.to_hex();
// Already connected: nothing to dial; caller should reuse the channel.
if self.transport.is_peer_connected(peer_id).await {
debug!("dial_candidate: peer {} already connected", peer_hex);
return None;
}
// Unspecified IPs (0.0.0.0 / ::) are listen addresses, never dialable.
if address.ip().is_some_and(|ip| ip.is_unspecified()) {
debug!(
"dial_candidate: rejecting unspecified address for {}: {}",
peer_hex, address
);
return None;
}
// Tell the hole-punch machinery which peer we expect at this socket,
// before the connection attempt begins.
if let Some(socket_addr) = address.dialable_socket_addr() {
let pid_bytes = *peer_id.to_bytes();
info!(
"dial_candidate: setting hole_punch_target_peer_id for {} = {}",
socket_addr,
hex::encode(&pid_bytes[..8])
);
self.transport
.set_hole_punch_target_peer_id(socket_addr, pid_bytes)
.await;
}
// When a DHT referrer is known, prefer it as hole-punch coordinator.
if let Some(coordinator_addr) = referrer
&& let Some(socket_addr) = address.dialable_socket_addr()
{
info!(
"dial_candidate: setting preferred coordinator for {} = {} (DHT referrer)",
socket_addr, coordinator_addr
);
self.transport
.set_hole_punch_preferred_coordinator(socket_addr, coordinator_addr)
.await;
}
// Bound the dial by the smaller of the transport's own connection
// timeout and the configured request timeout.
let dial_timeout = self
.transport
.connection_timeout()
.min(self.config.request_timeout);
match tokio::time::timeout(dial_timeout, self.transport.connect_peer(address)).await {
Ok(Ok(channel_id)) => {
debug!(
"dial_candidate: connected to {} at {} (channel {})",
peer_hex, address, channel_id
);
Some(channel_id)
}
Ok(Err(e)) => {
debug!(
"dial_candidate: failed to connect to {} at {}: {}",
peer_hex, address, e
);
None
}
Err(_) => {
debug!(
"dial_candidate: timeout connecting to {} at {} (>{:?})",
peer_hex, address, dial_timeout
);
None
}
}
}
/// Collects dialable addresses for `peer_id`: routing-table addresses
/// first, falling back to transport-layer peer info when the table has
/// none.
pub(crate) async fn peer_addresses_for_dial(&self, peer_id: &PeerId) -> Vec<MultiAddr> {
    let from_table = {
        let dht = self.dht.read().await;
        dht.get_node_addresses(peer_id).await
    };
    let dialable = Self::dialable_addresses(&from_table);
    if !dialable.is_empty() {
        return dialable;
    }
    match self.transport.peer_info(peer_id).await {
        Some(info) => Self::dialable_addresses(&info.addresses),
        None => Vec::new(),
    }
}
/// Awaits the oneshot response for an in-flight request, bounded by the
/// configured request timeout.
async fn wait_for_response(
    &self,
    _message_id: &str,
    response_rx: oneshot::Receiver<(PeerId, DhtNetworkResult)>,
    _peer_id: &PeerId,
) -> Result<DhtNetworkResult> {
    let outcome = tokio::time::timeout(self.config.request_timeout, response_rx).await;
    match outcome {
        // Sender id was already verified on the receive path; drop it.
        Ok(Ok((_source, result))) => Ok(result),
        Ok(Err(_)) => Err(P2PError::Network(NetworkError::ProtocolError(
            "Response channel closed unexpectedly".into(),
        ))),
        Err(_) => Err(P2PError::Network(NetworkError::Timeout)),
    }
}
/// Entry point for raw `/dht/1.0.0` payloads received from the network.
///
/// Enforces the size cap, decodes the message, refreshes routing-table
/// liveness for the sender, then dispatches by message type. Returns the
/// encoded response for requests; `None` for responses, broadcasts, and
/// error messages.
pub async fn handle_dht_message(
&self,
data: &[u8],
sender: &PeerId,
) -> Result<Option<Vec<u8>>> {
// Size check comes before deserialization so oversized payloads cost
// nothing to reject.
if data.len() > MAX_MESSAGE_SIZE {
warn!(
"Rejecting oversized DHT message from {sender}: {} bytes (max: {MAX_MESSAGE_SIZE})",
data.len()
);
return Err(P2PError::Validation(
format!(
"Message size {} bytes exceeds maximum allowed size of {MAX_MESSAGE_SIZE} bytes",
data.len()
)
.into(),
));
}
let message: DhtNetworkMessage = postcard::from_bytes(data)
.map_err(|e| P2PError::Serialization(e.to_string().into()))?;
debug!(
"[STEP 3] {}: Received {:?} from {} (msg_id: {})",
self.config.peer_id.to_hex(),
message.message_type,
sender,
message.message_id
);
// Any decodable message counts as liveness evidence for the sender.
self.update_peer_info(*sender, &message).await;
match message.message_type {
DhtMessageType::Request => {
debug!(
"[STEP 3a] {}: Processing {:?} request from {}",
self.config.peer_id.to_hex(),
message.payload,
sender
);
let result = self.handle_dht_request(&message, sender).await?;
debug!(
"[STEP 4] {}: Sending response {:?} back to {} (msg_id: {})",
self.config.peer_id.to_hex(),
std::mem::discriminant(&result),
sender,
message.message_id
);
let response = self.create_response_message(&message, result)?;
Ok(Some(postcard::to_stdvec(&response).map_err(|e| {
P2PError::Serialization(e.to_string().into())
})?))
}
DhtMessageType::Response => {
debug!(
"[STEP 5] {}: Received response from {} (msg_id: {})",
self.config.peer_id.to_hex(),
sender,
message.message_id
);
self.handle_dht_response(&message, sender).await?;
Ok(None)
}
DhtMessageType::Broadcast => {
self.handle_dht_broadcast(&message).await?;
Ok(None)
}
DhtMessageType::Error => {
warn!("Received DHT error message: {:?}", message);
Ok(None)
}
}
}
/// Dispatches a decoded DHT request to its operation-specific handler and
/// produces the result to send back.
///
/// `authenticated_sender` is the transport-verified identity of the
/// requester; handlers trust it rather than any id embedded in the
/// message body.
async fn handle_dht_request(
&self,
message: &DhtNetworkMessage,
authenticated_sender: &PeerId,
) -> Result<DhtNetworkResult> {
match &message.payload {
DhtNetworkOperation::FindNode { key } => {
debug!("Handling FIND_NODE request for key: {}", hex::encode(key));
self.handle_find_node_request(key, authenticated_sender)
.await
}
DhtNetworkOperation::Ping => {
debug!("Handling PING request from: {}", authenticated_sender);
// Latency is measured on the requester's side; zero here.
Ok(DhtNetworkResult::PongReceived {
responder: self.config.peer_id,
latency: Duration::from_millis(0), })
}
DhtNetworkOperation::Join => {
debug!("Handling JOIN request from: {}", authenticated_sender);
// The DHT key is derived directly from the sender's peer id.
let dht_key = *authenticated_sender.as_bytes();
debug!("Node {} joined the network", authenticated_sender);
Ok(DhtNetworkResult::JoinSuccess {
assigned_key: dht_key,
bootstrap_peers: 1,
})
}
DhtNetworkOperation::Leave => {
debug!("Handling LEAVE request from: {}", authenticated_sender);
Ok(DhtNetworkResult::LeaveSuccess)
}
DhtNetworkOperation::PublishAddress { addresses } => {
// Split published addresses: those embedding a peer id other
// than the sender's are coordinator hints; the rest are the
// sender's own relay/direct addresses.
let mut hints = Vec::new();
let mut relay_addrs = Vec::new();
for addr in addresses {
if addr
.peer_id()
.is_some_and(|pid| *pid != *authenticated_sender)
{
hints.push(addr.clone());
} else {
relay_addrs.push(addr);
}
}
if !relay_addrs.is_empty() {
info!(
"Handling PUBLISH_ADDRESS from {}: {} relay/direct addresses",
authenticated_sender,
relay_addrs.len()
);
// Record each published address as a relay-type entry.
let dht = self.dht.read().await;
for addr in &relay_addrs {
dht.touch_node_typed(
authenticated_sender,
Some(addr),
crate::dht::AddressType::Relay,
)
.await;
}
}
if !hints.is_empty() {
debug!(
"Received {} coordinator hint(s) for peer {} via publish",
hints.len(),
authenticated_sender
);
let dht = self.dht.read().await;
let stored = dht
.merge_coordinator_hints(authenticated_sender, hints)
.await;
if stored > 0 {
debug!(
"Merged {} coordinator hint(s) into routing table for peer {}",
stored, authenticated_sender
);
}
}
Ok(DhtNetworkResult::PublishAddressAck)
}
}
}
/// Public convenience wrapper: sends `operation` to `peer_id` without an
/// address hint (the peer must be reachable via known addresses).
#[allow(dead_code)]
pub async fn send_request(
    &self,
    peer_id: &PeerId,
    operation: DhtNetworkOperation,
) -> Result<DhtNetworkResult> {
    self.send_dht_request(peer_id, operation, None).await
}
/// Routes an incoming response to the in-flight operation that issued the
/// matching request, after verifying the sender is one of the peers that
/// operation actually contacted.
///
/// Fix: a poisoned `active_operations` mutex is now recovered with
/// `PoisonError::into_inner` (consistent with `sweep_expired_operations`)
/// instead of silently dropping the response, which stranded the waiting
/// request until its timeout.
async fn handle_dht_response(
    &self,
    message: &DhtNetworkMessage,
    sender: &PeerId,
) -> Result<()> {
    let message_id = &message.message_id;
    debug!("Handling DHT response for message_id: {message_id}");
    let result = match &message.result {
        Some(r) => r.clone(),
        None => {
            warn!("DHT response message {message_id} has no result field");
            return Ok(());
        }
    };
    // Only accept responses from peers with an authenticated app identity.
    let Some(sender_app_id) = self.canonical_app_peer_id(sender).await else {
        warn!(
            "Rejecting DHT response for {message_id}: sender {} has no authenticated app identity",
            sender
        );
        return Ok(());
    };
    let mut ops = self
        .active_operations
        .lock()
        .unwrap_or_else(|poisoned| {
            warn!("active_operations mutex poisoned, recovering");
            poisoned.into_inner()
        });
    if let Some(context) = ops.get_mut(message_id) {
        // The responder must be the peer we asked, or one of the peers
        // this operation contacted along the way.
        let source_authorized = context.peer_id == sender_app_id
            || context.contacted_nodes.contains(&sender_app_id);
        if !source_authorized {
            warn!(
                "Rejecting DHT response for {message_id}: sender app_id {} \
                 (transport={}) not in contacted peers (expected {} or one of {:?})",
                sender_app_id.to_hex(),
                sender,
                context.peer_id.to_hex(),
                context
                    .contacted_nodes
                    .iter()
                    .map(PeerId::to_hex)
                    .collect::<Vec<_>>()
            );
            return Ok(());
        }
        // `take()` ensures at most one delivery per operation.
        if let Some(tx) = context.response_tx.take() {
            debug!(
                "[STEP 5a] {}: Delivering response for msg_id {} to waiting request",
                self.config.peer_id.to_hex(),
                message_id
            );
            if tx.send((sender_app_id, result)).is_err() {
                warn!(
                    "[STEP 5a FAILED] {}: Response channel closed for msg_id {} (receiver timed out)",
                    self.config.peer_id.to_hex(),
                    message_id
                );
            }
        } else {
            debug!(
                "Response already delivered for message_id: {message_id}, ignoring duplicate"
            );
        }
    } else {
        warn!(
            "[STEP 5 FAILED] {}: No active operation found for msg_id {} (may have timed out)",
            self.config.peer_id.to_hex(),
            message_id
        );
    }
    Ok(())
}
/// Placeholder for broadcast handling: logs and succeeds without acting
/// on the message.
async fn handle_dht_broadcast(&self, _message: &DhtNetworkMessage) -> Result<()> {
    debug!("DHT broadcast handling not fully implemented yet");
    Ok(())
}
/// Builds the response message for `request`, echoing its message id and
/// deriving the response payload from the result variant.
///
/// Returns an error for `DhtNetworkResult::Error`, which has no
/// well-formed response payload.
fn create_response_message(
    &self,
    request: &DhtNetworkMessage,
    result: DhtNetworkResult,
) -> Result<DhtNetworkMessage> {
    let payload = match &result {
        DhtNetworkResult::NodesFound { key, .. } => DhtNetworkOperation::FindNode { key: *key },
        // Pong and publish acks both echo a Ping payload.
        DhtNetworkResult::PongReceived { .. } | DhtNetworkResult::PublishAddressAck => {
            DhtNetworkOperation::Ping
        }
        DhtNetworkResult::JoinSuccess { .. } => DhtNetworkOperation::Join,
        DhtNetworkResult::LeaveSuccess => DhtNetworkOperation::Leave,
        // Rejections echo whatever the request asked for.
        DhtNetworkResult::PeerRejected => request.payload.clone(),
        DhtNetworkResult::Error { .. } => {
            return Err(P2PError::Dht(crate::error::DhtError::RoutingError(
                "Cannot create response for error result".to_string().into(),
            )));
        }
    };
    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_err(|_| {
            P2PError::Network(NetworkError::ProtocolError(
                "System clock error: unable to get current timestamp".into(),
            ))
        })?
        .as_secs();
    Ok(DhtNetworkMessage {
        message_id: request.message_id.clone(),
        source: self.config.peer_id,
        target: Some(request.source),
        message_type: DhtMessageType::Response,
        payload,
        result: Some(result),
        timestamp,
        // Decrement TTL and bump hop count, both saturating.
        ttl: request.ttl.saturating_sub(1),
        hop_count: request.hop_count.saturating_add(1),
    })
}
/// Refreshes routing-table liveness for a peer that just sent a DHT
/// message. Peers whose transport identity has no authenticated app
/// identity are ignored.
async fn update_peer_info(&self, peer_id: PeerId, _message: &DhtNetworkMessage) {
    let app_peer_id = match self.canonical_app_peer_id(&peer_id).await {
        Some(id) => id,
        None => {
            debug!(
                "Ignoring DHT peer update for unauthenticated transport peer {}",
                peer_id
            );
            return;
        }
    };
    // Best dialable address the transport currently knows, if any.
    let transport_addr = self
        .transport
        .peer_info(&app_peer_id)
        .await
        .and_then(|info| Self::first_dialable_address(&info.addresses));
    let touched = self
        .dht
        .read()
        .await
        .touch_node_typed(
            &app_peer_id,
            transport_addr.as_ref(),
            crate::dht::AddressType::Direct,
        )
        .await;
    if touched {
        trace!("Touched routing table entry for {}", app_peer_id.to_hex());
    }
}
/// Replays `handle_peer_connected` for peers that connected before this
/// manager subscribed to transport events, so DHT state catches up.
async fn reconcile_connected_peers(&self) {
    let connected = self.transport.connected_peers().await;
    if connected.is_empty() {
        return;
    }
    info!(
        "Reconciling {} already-connected peers for DHT state",
        connected.len()
    );
    let mut skipped = 0u32;
    for peer_id in connected {
        match self.transport.peer_user_agent(&peer_id).await {
            Some(ua) => self.handle_peer_connected(peer_id, &ua).await,
            None => {
                // Without a user agent the peer cannot be classified yet;
                // the normal PeerConnected event will cover it later.
                skipped += 1;
                debug!(
                    "Skipping reconciliation for peer {} — user agent not yet known",
                    peer_id.to_hex()
                );
            }
        }
    }
    if skipped > 0 {
        info!(
            "Skipped {} peers during reconciliation (user agent unknown, will arrive via PeerConnected)",
            skipped
        );
    }
}
/// Handles a newly-connected (or reconciled) peer: adds it to the DHT
/// routing table when it is a DHT participant with dialable addresses,
/// running stale-peer revalidation when the bucket is full, and emits a
/// `PeerDiscovered` event.
async fn handle_peer_connected(&self, node_id: PeerId, user_agent: &str) {
let app_peer_id_hex = node_id.to_hex();
info!(
"DHT peer connected: app_id={}, user_agent={}",
app_peer_id_hex, user_agent
);
// DHT key is derived directly from the peer id bytes.
let dht_key = *node_id.as_bytes();
let addresses = if let Some(info) = self.transport.peer_info(&node_id).await {
Self::dialable_addresses(&info.addresses)
} else {
warn!("peer_info unavailable for app_peer_id {}", app_peer_id_hex);
Vec::new()
};
// Without any dialable address the peer is useless as a routing entry.
if addresses.is_empty() {
warn!(
"Peer {} has no valid addresses, skipping DHT routing table addition",
app_peer_id_hex
);
return;
}
// Ephemeral (non-participant) peers still get the PeerDiscovered event
// below, but never enter the routing table.
if !crate::network::is_dht_participant(user_agent) {
info!(
"Skipping DHT routing table for ephemeral peer {} (user_agent={})",
app_peer_id_hex, user_agent
);
} else {
let address_types = vec![crate::dht::AddressType::Direct; addresses.len()];
let node_info = NodeInfo {
id: node_id,
addresses,
address_types,
last_seen: AtomicInstant::now(),
};
// Trust score feeds the admission decision; neutral when no engine.
let trust_fn = |peer_id: &PeerId| -> f64 {
self.trust_engine
.as_ref()
.map(|engine| engine.score(peer_id))
.unwrap_or(DEFAULT_NEUTRAL_TRUST)
};
let add_result = self.dht.write().await.add_node(node_info, &trust_fn).await;
match add_result {
Ok(AdmissionResult::Admitted(rt_events)) => {
info!("Added peer {} to DHT routing table", app_peer_id_hex);
self.broadcast_routing_events(&rt_events);
}
// Bucket is full of possibly-dead peers: ping them, evict the
// non-responders, then retry admission.
Ok(AdmissionResult::StaleRevalidationNeeded {
candidate,
candidate_ips,
candidate_bucket_idx,
stale_peers,
}) => {
debug!(
"Peer {} admission deferred: {} stale peers need revalidation",
app_peer_id_hex,
stale_peers.len()
);
match self
.revalidate_and_retry_admission(
candidate,
candidate_ips,
candidate_bucket_idx,
stale_peers,
&trust_fn,
)
.await
{
Ok(rt_events) => {
info!(
"Added peer {} to DHT routing table after stale revalidation",
app_peer_id_hex
);
self.broadcast_routing_events(&rt_events);
}
Err(e) => {
warn!(
"Stale revalidation for peer {} failed: {}",
app_peer_id_hex, e
);
}
}
}
Err(e) => {
warn!(
"Failed to add peer {} to DHT routing table: {}",
app_peer_id_hex, e
);
}
}
}
// Announce discovery regardless of routing-table admission.
if self.event_tx.receiver_count() > 0 {
let _ = self.event_tx.send(DhtNetworkEvent::PeerDiscovered {
peer_id: node_id,
dht_key,
});
}
}
/// Spawns the long-lived task that consumes transport events and drives
/// the DHT: peer connect/disconnect bookkeeping and `/dht/1.0.0` message
/// handling. The task exits on shutdown cancellation or when the event
/// channel closes; its handle is stored for later joining.
async fn start_network_event_handler(&self, self_arc: Arc<Self>) -> Result<()> {
info!("Starting network event handler...");
let mut events = self.transport.subscribe_events();
let shutdown = self.shutdown.clone();
let handle = tokio::spawn(async move {
loop {
tokio::select! {
() = shutdown.cancelled() => {
info!("Network event handler shutting down");
break;
}
recv = events.recv() => {
match recv {
Ok(event) => match event {
crate::network::P2PEvent::PeerConnected(peer_id, ref user_agent) => {
self_arc.handle_peer_connected(peer_id, user_agent).await;
}
crate::network::P2PEvent::PeerDisconnected(peer_id) => {
info!(
"DHT peer fully disconnected: app_id={}",
peer_id.to_hex()
);
if self_arc.event_tx.receiver_count() > 0
&& let Err(e) = self_arc
.event_tx
.send(DhtNetworkEvent::PeerDisconnected {
peer_id,
})
{
warn!(
"Failed to send PeerDisconnected event: {}",
e
);
}
}
crate::network::P2PEvent::Message {
topic,
source,
data,
} => {
trace!(
" [EVENT] Message received: topic={}, source={:?}, {} bytes",
topic,
source,
data.len()
);
if topic == "/dht/1.0.0" {
// Only authenticated (signed) messages are processed.
let Some(source_peer) = source else {
warn!("Ignoring unsigned DHT message");
continue;
};
trace!(" [EVENT] Processing DHT message from {}", source_peer);
// Handle each message on its own task so a slow
// handler cannot stall the event loop; the
// semaphore bounds concurrent handlers.
let manager_clone = Arc::clone(&self_arc);
let semaphore = Arc::clone(&self_arc.message_handler_semaphore);
tokio::spawn(async move {
let _permit = match semaphore.acquire().await {
Ok(permit) => permit,
Err(_) => {
warn!("Message handler semaphore closed");
return;
}
};
// Hard cap on per-message processing time.
match tokio::time::timeout(
REQUEST_TIMEOUT,
manager_clone.handle_dht_message(&data, &source_peer),
)
.await
{
Ok(Ok(Some(response))) => {
if let Err(e) = manager_clone
.transport
.send_message(&source_peer, "/dht/1.0.0", response)
.await
{
warn!(
"Failed to send DHT response to {}: {}",
source_peer, e
);
}
}
Ok(Ok(None)) => {
// No response required (e.g. it was a response).
}
Ok(Err(e)) => {
warn!(
"Failed to handle DHT message from {}: {}",
source_peer, e
);
}
Err(_) => {
warn!(
"DHT message handler timed out after {:?} for peer {}: potential DoS attempt or slow processing",
REQUEST_TIMEOUT, source_peer
);
}
}
});
}
}
},
Err(broadcast::error::RecvError::Lagged(skipped)) => {
// Events were dropped; peers affected will be picked up
// by later events or reconciliation.
warn!("Network event handler lagged, skipped {} events", skipped);
}
Err(broadcast::error::RecvError::Closed) => {
info!("Network event channel closed, stopping event handler");
break;
}
}
}
}
}
});
*self.event_handler_handle.write().await = Some(handle);
Ok(())
}
/// Pings the stale peers blocking a full bucket; evicts non-responders
/// and retries admission of `candidate`.
///
/// Guarded twice: a global semaphore caps concurrent revalidations, and a
/// per-bucket set prevents two revalidations of the same bucket (the
/// RAII guard clears the set entry on any exit path). Fails when all
/// stale peers respond (no room) or admission is re-rejected.
async fn revalidate_and_retry_admission(
&self,
candidate: NodeInfo,
candidate_ips: Vec<IpAddr>,
bucket_idx: usize,
stale_peers: Vec<(PeerId, usize)>,
trust_fn: &impl Fn(&PeerId) -> f64,
) -> anyhow::Result<Vec<RoutingTableEvent>> {
if stale_peers.is_empty() {
return Err(anyhow::anyhow!("no stale peers to revalidate"));
}
// Non-blocking acquire: if the global limit is hit, give up now.
let _permit = self
.revalidation_semaphore
.clone()
.try_acquire_owned()
.map_err(|_| anyhow::anyhow!("global revalidation limit reached"))?;
{
let mut active = self.bucket_revalidation_active.lock();
if active.contains(&bucket_idx) {
return Err(anyhow::anyhow!(
"revalidation already in progress for bucket {bucket_idx}"
));
}
active.insert(bucket_idx);
}
// Clears the bucket marker when dropped, on success or failure.
let _bucket_guard = BucketRevalidationGuard {
active: self.bucket_revalidation_active.clone(),
bucket_idx,
};
let mut evicted_peers = Vec::new();
let mut retained_peers = Vec::new();
// Ping in bounded-concurrency chunks; a short timeout counts as dead.
for chunk in stale_peers.chunks(MAX_CONCURRENT_REVALIDATION_PINGS) {
let results = futures::future::join_all(chunk.iter().map(|(peer_id, _)| async {
let responded =
tokio::time::timeout(STALE_REVALIDATION_TIMEOUT, self.ping_peer(peer_id))
.await
.is_ok_and(|r| r.is_ok());
(*peer_id, responded)
}))
.await;
for (peer_id, responded) in results {
if responded {
retained_peers.push(peer_id);
} else {
evicted_peers.push(peer_id);
}
}
}
if evicted_peers.is_empty() {
return Err(anyhow::anyhow!(
"all stale peers responded — no room for candidate"
));
}
// Remove the dead peers, then re-run the admission decision for the
// candidate under the same trust function.
let mut dht = self.dht.write().await;
let mut all_events = Vec::new();
for peer_id in &evicted_peers {
let removal_events = dht.remove_node_by_id(peer_id).await;
all_events.extend(removal_events);
}
let admission_events = dht
.re_evaluate_admission(candidate, &candidate_ips, trust_fn)
.await?;
all_events.extend(admission_events);
Ok(all_events)
}
/// Sends a DHT PING to `peer_id`, discarding the pong payload.
async fn ping_peer(&self, peer_id: &PeerId) -> anyhow::Result<()> {
    let _pong = self
        .send_dht_request(peer_id, DhtNetworkOperation::Ping, None)
        .await
        .context("ping failed")?;
    Ok(())
}
/// Periodic maintenance: pings the stale peers among our K-closest set
/// and evicts any that do not respond within the short revalidation
/// timeout, broadcasting the resulting routing-table events.
async fn revalidate_stale_k_closest(&self) {
// Snapshot the stale set under a read lock, then release it before
// pinging.
let stale_peers = {
let dht = self.dht.read().await;
dht.stale_k_closest().await
};
if stale_peers.is_empty() {
return;
}
debug!("Revalidating {} stale K-closest peer(s)", stale_peers.len());
let mut non_responders = Vec::new();
// Ping in bounded-concurrency chunks; timeout or error counts as dead.
for chunk in stale_peers.chunks(MAX_CONCURRENT_REVALIDATION_PINGS) {
let results = futures::future::join_all(chunk.iter().map(|peer_id| async {
let responded =
tokio::time::timeout(STALE_REVALIDATION_TIMEOUT, self.ping_peer(peer_id))
.await
.is_ok_and(|r| r.is_ok());
(*peer_id, responded)
}))
.await;
for (peer_id, responded) in results {
if !responded {
non_responders.push(peer_id);
}
}
}
if non_responders.is_empty() {
debug!("All stale K-closest peers responded — no evictions");
return;
}
// Evict all non-responders under a single write lock.
let all_events = {
let mut dht = self.dht.write().await;
let mut events = Vec::new();
for peer_id in &non_responders {
events.extend(dht.remove_node_by_id(peer_id).await);
}
events
};
self.broadcast_routing_events(&all_events);
info!("Evicted {} offline K-closest peer(s)", non_responders.len());
}
/// Translates routing-table events into `DhtNetworkEvent`s on the
/// broadcast channel. Skips all work when nobody is subscribed.
fn broadcast_routing_events(&self, events: &[RoutingTableEvent]) {
    if self.event_tx.receiver_count() == 0 {
        return;
    }
    for event in events {
        let mapped = match event {
            RoutingTableEvent::PeerAdded(id) => DhtNetworkEvent::PeerAdded { peer_id: *id },
            RoutingTableEvent::PeerRemoved(id) => DhtNetworkEvent::PeerRemoved { peer_id: *id },
            RoutingTableEvent::KClosestPeersChanged { old, new } => {
                DhtNetworkEvent::KClosestPeersChanged {
                    old: old.clone(),
                    new: new.clone(),
                }
            }
        };
        // Send failures (no receivers left) are intentionally ignored.
        let _ = self.event_tx.send(mapped);
    }
}
/// Marks `peer_id` as freshly seen in the routing table, optionally
/// updating its address. Returns whether an entry was touched.
pub async fn touch_node(&self, peer_id: &PeerId, address: Option<&MultiAddr>) -> bool {
    self.dht.read().await.touch_node(peer_id, address).await
}
/// Like `touch_node`, but records the address with an explicit type.
pub async fn touch_node_typed(
    &self,
    peer_id: &PeerId,
    address: Option<&MultiAddr>,
    addr_type: crate::dht::AddressType,
) -> bool {
    self.dht
        .read()
        .await
        .touch_node_typed(peer_id, address, addr_type)
        .await
}
/// Returns a snapshot of the current network statistics.
pub async fn get_stats(&self) -> DhtNetworkStats {
    let stats = self.stats.read().await;
    stats.clone()
}
/// Creates a new subscription to DHT network events.
pub fn subscribe_events(&self) -> broadcast::Receiver<DhtNetworkEvent> {
    self.event_tx.subscribe()
}
/// Lists the peers currently connected at the transport layer.
pub async fn get_connected_peers(&self) -> Vec<PeerId> {
    self.transport.connected_peers().await
}
/// Returns the number of entries in the DHT routing table.
pub async fn get_routing_table_size(&self) -> usize {
    let dht = self.dht.read().await;
    dht.routing_table_size().await
}
/// Checks whether `peer_id` currently has a routing-table entry.
pub async fn is_in_routing_table(&self, peer_id: &PeerId) -> bool {
    self.dht.read().await.has_node(peer_id).await
}
/// Snapshot of the routing table as `DHTNode`s, with reliability taken
/// from the trust engine (neutral default when none is configured).
pub async fn routing_table_peers(&self) -> Vec<DHTNode> {
    // Take the snapshot and release the read lock before mapping.
    let nodes = {
        let guard = self.dht.read().await;
        guard.all_nodes().await
    };
    nodes
        .into_iter()
        .map(|node| DHTNode {
            reliability: self
                .trust_engine
                .as_ref()
                .map_or(DEFAULT_NEUTRAL_TRUST, |engine| engine.score(&node.id)),
            peer_id: node.id,
            addresses: node.addresses,
            distance: None,
        })
        .collect()
}
/// This node's own application peer id.
pub fn peer_id(&self) -> &PeerId {
    let config = &self.config;
    &config.peer_id
}
/// Publishes `addresses` to each peer in `peers` via a PUBLISH_ADDRESS
/// request, skipping ourselves. Per-peer failures are logged and do not
/// abort the loop.
///
/// Fix: the owned `addresses` vector is moved into the operation instead
/// of being cloned — it was never used again after the clone.
pub async fn publish_address_to_peers(
    &self,
    addresses: Vec<crate::MultiAddr>,
    peers: &[DHTNode],
) {
    let op = DhtNetworkOperation::PublishAddress { addresses };
    for peer in peers {
        // Never publish to ourselves.
        if peer.peer_id == self.config.peer_id {
            continue;
        }
        match self
            .send_dht_request(
                &peer.peer_id,
                op.clone(),
                Self::first_dialable_address(&peer.addresses).as_ref(),
            )
            .await
        {
            Ok(_) => {
                info!("Published address to peer {}", peer.peer_id.to_hex());
            }
            Err(e) => {
                debug!(
                    "Failed to publish address to peer {}: {}",
                    peer.peer_id.to_hex(),
                    e
                );
            }
        }
    }
}
/// The transport's local listen address, when bound.
pub fn local_addr(&self) -> Option<MultiAddr> {
    self.transport.local_addr()
}
/// Opens a transport connection to `address`, returning the channel id.
pub async fn connect_to_peer(&self, address: &MultiAddr) -> Result<String> {
    self.transport.connect_peer(address).await
}
/// Shared handle to the underlying transport.
pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
    &self.transport
}
/// The configured trust engine, if any (cheap `Arc` clone).
pub fn trust_engine(&self) -> Option<Arc<TrustEngine>> {
    self.trust_engine.clone()
}
}
// Default per-request timeout used by `DhtNetworkConfig::default` (seconds).
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 15;
// Grace period constant — presumably extra time allowed for an in-flight
// iteration to finish; usage is outside this view. TODO(review): confirm.
const ITERATION_GRACE_TIMEOUT_SECS: u64 = 5;
// Default cap on simultaneously active DHT operations.
const DEFAULT_MAX_CONCURRENT_OPS: usize = 100;
impl Default for DhtNetworkConfig {
fn default() -> Self {
Self {
peer_id: PeerId::from_bytes([0u8; 32]),
node_config: NodeConfig::default(),
request_timeout: Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS),
max_concurrent_operations: DEFAULT_MAX_CONCURRENT_OPS,
enable_security: true,
swap_threshold: 0.0,
}
}
}
// Unit tests for address selection and message/event serialization.
#[cfg(test)]
mod tests {
use super::*;
// A non-IP (BLE) address ahead of a QUIC one must not win selection.
#[test]
fn test_first_dialable_address_skips_non_ip_when_ip_address_exists() {
let ble = MultiAddr::new(crate::address::TransportAddr::Ble {
mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
psm: 0x0025,
});
let quic = MultiAddr::quic("127.0.0.1:9000".parse().unwrap());
let selected = DhtNetworkManager::first_dialable_address(&[ble, quic.clone()]);
assert_eq!(
selected,
Some(quic),
"address selection should prefer a dialable IP transport over a preceding non-IP entry"
);
}
// BLE, TCP, and LoRa entries alone yield no dialable address.
#[test]
fn test_first_dialable_address_returns_none_for_all_non_dialable() {
let ble = MultiAddr::new(crate::address::TransportAddr::Ble {
mac: [0x01, 0x02, 0x03, 0x04, 0x05, 0x06],
psm: 128,
});
let tcp = MultiAddr::tcp("10.0.0.1:80".parse().unwrap());
let lora = MultiAddr::new(crate::address::TransportAddr::LoRa {
dev_addr: [0xDE, 0xAD, 0xBE, 0xEF],
freq_hz: 868_000_000,
});
assert_eq!(
DhtNetworkManager::first_dialable_address(&[ble, tcp, lora]),
None,
"should return None when no QUIC address is present"
);
}
// 0.0.0.0 is a listen address, never a dial target.
#[test]
fn test_first_dialable_address_rejects_unspecified_ip() {
let unspecified = MultiAddr::quic("0.0.0.0:9000".parse().unwrap());
assert_eq!(
DhtNetworkManager::first_dialable_address(&[unspecified]),
None,
"should reject unspecified (0.0.0.0) addresses"
);
}
// Degenerate input: empty list.
#[test]
fn test_first_dialable_address_returns_none_for_empty_slice() {
assert_eq!(
DhtNetworkManager::first_dialable_address(&[]),
None,
"should return None for empty address list"
);
}
// The unit variant must survive a postcard round trip.
#[test]
fn test_peer_rejected_round_trips_through_serialization() {
let result = DhtNetworkResult::PeerRejected;
let bytes = postcard::to_stdvec(&result).expect("serialization should succeed");
let deserialized: DhtNetworkResult =
postcard::from_bytes(&bytes).expect("deserialization should succeed");
assert!(
matches!(deserialized, DhtNetworkResult::PeerRejected),
"round-tripped result should be PeerRejected, got: {deserialized:?}"
);
}
// Event struct carries its field through construction.
#[test]
fn test_bootstrap_complete_event_construction() {
let event = DhtNetworkEvent::BootstrapComplete { num_peers: 42 };
assert!(
matches!(event, DhtNetworkEvent::BootstrapComplete { num_peers: 42 }),
"BootstrapComplete event should carry the peer count"
);
}
// Field names `old`/`new` are part of the event's public shape.
#[test]
fn test_k_closest_changed_event_uses_old_new_naming() {
let old = vec![PeerId::random(), PeerId::random()];
let new = vec![PeerId::random()];
let event = DhtNetworkEvent::KClosestPeersChanged {
old: old.clone(),
new: new.clone(),
};
match event {
DhtNetworkEvent::KClosestPeersChanged {
old: got_old,
new: got_new,
} => {
assert_eq!(got_old, old);
assert_eq!(got_new, new);
}
_ => panic!("expected KClosestPeersChanged"),
}
}
// A PeerRejected response must echo the request payload and keep the
// result variant intact through serialization.
#[test]
fn test_peer_rejected_response_message_preserves_request_payload() {
let request = DhtNetworkMessage {
message_id: "test-123".to_string(),
source: PeerId::random(),
target: Some(PeerId::random()),
message_type: DhtMessageType::Request,
payload: DhtNetworkOperation::Ping,
result: None,
timestamp: 0,
ttl: 10,
hop_count: 0,
};
let response = DhtNetworkMessage {
message_id: request.message_id.clone(),
source: PeerId::random(),
target: Some(request.source),
message_type: DhtMessageType::Response,
payload: request.payload.clone(),
result: Some(DhtNetworkResult::PeerRejected),
timestamp: 0,
ttl: request.ttl.saturating_sub(1),
hop_count: request.hop_count.saturating_add(1),
};
let bytes = postcard::to_stdvec(&response).expect("serialize response");
let decoded: DhtNetworkMessage =
postcard::from_bytes(&bytes).expect("deserialize response");
assert!(
matches!(decoded.result, Some(DhtNetworkResult::PeerRejected)),
"response result should be PeerRejected"
);
assert!(
matches!(decoded.payload, DhtNetworkOperation::Ping),
"response should echo the request's Ping payload"
);
}
}