use crate::PeerId;
use crate::adaptive::trust::{TrustRecord, TrustSnapshot};
use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
use crate::bootstrap::cache::{CachedCloseGroupPeer, CloseGroupCache};
use crate::bootstrap::{BootstrapConfig, BootstrapManager};
use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
use crate::MultiAddr;
use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
use parking_lot::Mutex as ParkingMutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct WireMessage {
pub(crate) protocol: String,
pub(crate) data: Vec<u8>,
pub(crate) from: PeerId,
pub(crate) timestamp: u64,
#[serde(default)]
pub(crate) user_agent: String,
#[serde(default)]
pub(crate) public_key: Vec<u8>,
#[serde(default)]
pub(crate) signature: Vec<u8>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum NodeMode {
#[default]
Node,
Client,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ListenMode {
Public,
Local,
}
pub fn user_agent_for_mode(mode: NodeMode) -> String {
let prefix = match mode {
NodeMode::Node => "node",
NodeMode::Client => "client",
};
format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
}
pub fn is_dht_participant(user_agent: &str) -> bool {
user_agent.starts_with("node/")
}
pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
const DEFAULT_LISTEN_PORT: u16 = 9000;
const DEFAULT_MAX_CONNECTIONS: usize = 10_000;
const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 90;
const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;
const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 10;
const fn default_true() -> bool {
true
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
#[serde(default)]
pub local: bool,
#[serde(default)]
pub port: u16,
#[serde(default = "default_true")]
pub ipv6: bool,
pub bootstrap_peers: Vec<crate::MultiAddr>,
pub connection_timeout: Duration,
pub max_connections: usize,
pub dht_config: DHTConfig,
pub bootstrap_cache_config: Option<BootstrapConfig>,
pub diversity_config: Option<crate::security::IPDiversityConfig>,
#[serde(default)]
pub max_message_size: Option<usize>,
#[serde(skip)]
pub node_identity: Option<Arc<NodeIdentity>>,
#[serde(default)]
pub mode: NodeMode,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub custom_user_agent: Option<String>,
#[serde(default)]
pub allow_loopback: bool,
#[serde(default)]
pub adaptive_dht_config: AdaptiveDhtConfig,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub close_group_cache_dir: Option<PathBuf>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
pub k_value: usize,
pub alpha_value: usize,
pub refresh_interval: Duration,
}
#[inline]
fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
let (v4, v6) = match mode {
ListenMode::Public => (
std::net::Ipv4Addr::UNSPECIFIED,
std::net::Ipv6Addr::UNSPECIFIED,
),
ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
};
if ipv6_enabled {
addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
std::net::IpAddr::V6(v6),
port,
)));
}
addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
std::net::IpAddr::V4(v4),
port,
)));
addrs
}
impl NodeConfig {
pub fn user_agent(&self) -> String {
self.custom_user_agent
.clone()
.unwrap_or_else(|| user_agent_for_mode(self.mode))
}
pub fn listen_addrs(&self) -> Vec<MultiAddr> {
let mode = if self.local {
ListenMode::Local
} else {
ListenMode::Public
};
build_listen_addrs(self.port, self.ipv6, mode)
}
pub fn new() -> Result<Self> {
Ok(Self::default())
}
pub fn builder() -> NodeConfigBuilder {
NodeConfigBuilder::default()
}
}
#[derive(Debug, Clone)]
pub struct NodeConfigBuilder {
port: u16,
ipv6: bool,
local: bool,
bootstrap_peers: Vec<crate::MultiAddr>,
max_connections: Option<usize>,
connection_timeout: Option<Duration>,
dht_config: Option<DHTConfig>,
max_message_size: Option<usize>,
mode: NodeMode,
custom_user_agent: Option<String>,
allow_loopback: Option<bool>,
adaptive_dht_config: Option<AdaptiveDhtConfig>,
close_group_cache_dir: Option<PathBuf>,
}
impl Default for NodeConfigBuilder {
fn default() -> Self {
Self {
port: 0,
ipv6: true,
local: false,
bootstrap_peers: Vec::new(),
max_connections: None,
connection_timeout: None,
dht_config: None,
max_message_size: None,
mode: NodeMode::default(),
custom_user_agent: None,
allow_loopback: None,
adaptive_dht_config: None,
close_group_cache_dir: None,
}
}
}
impl NodeConfigBuilder {
pub fn port(mut self, port: u16) -> Self {
self.port = port;
self
}
pub fn ipv6(mut self, enabled: bool) -> Self {
self.ipv6 = enabled;
self
}
pub fn local(mut self, local: bool) -> Self {
self.local = local;
self
}
pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
self.bootstrap_peers.push(addr);
self
}
pub fn max_connections(mut self, max: usize) -> Self {
self.max_connections = Some(max);
self
}
pub fn connection_timeout(mut self, timeout: Duration) -> Self {
self.connection_timeout = Some(timeout);
self
}
pub fn dht_config(mut self, config: DHTConfig) -> Self {
self.dht_config = Some(config);
self
}
pub fn max_message_size(mut self, max_message_size: usize) -> Self {
self.max_message_size = Some(max_message_size);
self
}
pub fn mode(mut self, mode: NodeMode) -> Self {
self.mode = mode;
self
}
pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
self.custom_user_agent = Some(user_agent.into());
self
}
pub fn allow_loopback(mut self, allow: bool) -> Self {
self.allow_loopback = Some(allow);
self
}
pub fn trust_enforcement(mut self, enabled: bool) -> Self {
let threshold = if enabled {
AdaptiveDhtConfig::default().block_threshold
} else {
0.0
};
self.adaptive_dht_config = Some(AdaptiveDhtConfig {
block_threshold: threshold,
});
self
}
pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
self.adaptive_dht_config = Some(config);
self
}
pub fn close_group_cache_dir(mut self, path: impl Into<PathBuf>) -> Self {
self.close_group_cache_dir = Some(path.into());
self
}
pub fn build(self) -> Result<NodeConfig> {
let allow_loopback = self.allow_loopback.unwrap_or(self.local);
Ok(NodeConfig {
local: self.local,
port: self.port,
ipv6: self.ipv6,
bootstrap_peers: self.bootstrap_peers,
connection_timeout: self
.connection_timeout
.unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
dht_config: self.dht_config.unwrap_or_default(),
bootstrap_cache_config: None,
diversity_config: None,
max_message_size: self.max_message_size,
node_identity: None,
mode: self.mode,
custom_user_agent: self.custom_user_agent,
allow_loopback,
adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
close_group_cache_dir: self.close_group_cache_dir,
})
}
}
impl Default for NodeConfig {
fn default() -> Self {
Self {
local: false,
port: DEFAULT_LISTEN_PORT,
ipv6: true,
bootstrap_peers: Vec::new(),
connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
max_connections: DEFAULT_MAX_CONNECTIONS,
dht_config: DHTConfig::default(),
bootstrap_cache_config: None,
diversity_config: None,
max_message_size: None,
node_identity: None,
mode: NodeMode::default(),
custom_user_agent: None,
allow_loopback: false,
adaptive_dht_config: AdaptiveDhtConfig::default(),
close_group_cache_dir: None,
}
}
}
impl DHTConfig {
pub const DEFAULT_K_VALUE: usize = 20;
const DEFAULT_ALPHA_VALUE: usize = 3;
const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
const MIN_K_VALUE: usize = 4;
pub fn validate(&self) -> Result<()> {
if self.k_value < Self::MIN_K_VALUE {
return Err(P2PError::Validation(
format!(
"k_value must be >= {} (got {}), values below {} produce degenerate behavior",
Self::MIN_K_VALUE,
self.k_value,
Self::MIN_K_VALUE,
)
.into(),
));
}
if self.alpha_value < 1 {
return Err(P2PError::Validation(
format!("alpha_value must be >= 1 (got {})", self.alpha_value).into(),
));
}
if self.refresh_interval.is_zero() {
return Err(P2PError::Validation("refresh_interval must be > 0".into()));
}
Ok(())
}
}
impl Default for DHTConfig {
fn default() -> Self {
Self {
k_value: Self::DEFAULT_K_VALUE,
alpha_value: Self::DEFAULT_ALPHA_VALUE,
refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
}
}
}
#[derive(Debug, Clone)]
pub struct PeerInfo {
#[allow(dead_code)]
pub(crate) channel_id: String,
pub addresses: Vec<MultiAddr>,
pub connected_at: Instant,
pub last_seen: Instant,
pub status: ConnectionStatus,
pub protocols: Vec<String>,
pub heartbeat_count: u64,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
Connecting,
Connected,
Disconnecting,
Disconnected,
Failed(String),
}
#[derive(Debug, Clone)]
pub enum P2PEvent {
Message {
topic: String,
source: Option<PeerId>,
data: Vec<u8>,
},
PeerConnected(PeerId, String),
PeerDisconnected(PeerId),
}
#[derive(Debug, Clone)]
pub struct PeerResponse {
pub peer_id: PeerId,
pub data: Vec<u8>,
pub latency: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RequestResponseEnvelope {
pub(crate) message_id: String,
pub(crate) is_response: bool,
pub(crate) payload: Vec<u8>,
}
pub(crate) struct PendingRequest {
pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
pub(crate) expected_peer: PeerId,
}
const RECONNECT_IDENTITY_TIMEOUT: Duration = Duration::from_secs(5);
const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
pub struct P2PNode {
config: NodeConfig,
peer_id: PeerId,
transport: Arc<crate::transport_handle::TransportHandle>,
start_time: Instant,
shutdown: CancellationToken,
adaptive_dht: AdaptiveDHT,
bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
is_bootstrapped: Arc<AtomicBool>,
is_started: Arc<AtomicBool>,
reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,
}
pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
if addr.ip().is_unspecified() {
let loopback_ip = match addr {
std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
std::net::SocketAddr::new(loopback_ip, addr.port())
} else {
addr
}
}
impl P2PNode {
pub async fn new(config: NodeConfig) -> Result<Self> {
let node_identity = match config.node_identity.clone() {
Some(identity) => identity,
None => Arc::new(NodeIdentity::generate()?),
};
let peer_id = *node_identity.peer_id();
config.dht_config.validate()?;
if let Some(ref diversity) = config.diversity_config {
diversity
.validate()
.map_err(|e| P2PError::Validation(format!("IP diversity config: {e}").into()))?;
}
let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
let bootstrap_manager =
match BootstrapManager::with_node_config(bootstrap_config, &config).await {
Ok(manager) => Some(Arc::new(RwLock::new(manager))),
Err(e) => {
warn!("Failed to initialize bootstrap manager: {e}, continuing without cache");
None
}
};
let transport_config = crate::transport_handle::TransportConfig::from_node_config(
&config,
crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
node_identity.clone(),
);
let transport =
Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
let dht_manager_config = DhtNetworkConfig {
peer_id,
node_config: config.clone(),
request_timeout: config.connection_timeout,
max_concurrent_operations: MAX_ACTIVE_REQUESTS,
enable_security: true,
block_threshold: 0.0, };
let adaptive_dht = AdaptiveDHT::new(
transport.clone(),
dht_manager_config,
config.adaptive_dht_config.clone(),
)
.await?;
let node = Self {
config,
peer_id,
transport,
start_time: Instant::now(),
shutdown: CancellationToken::new(),
adaptive_dht,
bootstrap_manager,
is_bootstrapped: Arc::new(AtomicBool::new(false)),
is_started: Arc::new(AtomicBool::new(false)),
reconnect_locks: ParkingMutex::new(HashMap::new()),
};
info!(
"Created P2P node with peer ID: {} (call start() to begin networking)",
node.peer_id
);
Ok(node)
}
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
&self.transport
}
pub fn local_addr(&self) -> Option<MultiAddr> {
self.transport.local_addr()
}
pub fn is_bootstrapped(&self) -> bool {
self.is_bootstrapped.load(Ordering::SeqCst)
}
pub async fn re_bootstrap(&self) -> Result<()> {
self.is_bootstrapped.store(false, Ordering::SeqCst);
self.connect_bootstrap_peers(None).await
}
pub fn trust_engine(&self) -> Arc<TrustEngine> {
self.adaptive_dht.trust_engine().clone()
}
pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
self.adaptive_dht.report_trust_event(peer_id, event).await;
}
pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
self.adaptive_dht.peer_trust(peer_id)
}
pub fn adaptive_dht(&self) -> &AdaptiveDHT {
&self.adaptive_dht
}
pub async fn send_request(
&self,
peer_id: &PeerId,
protocol: &str,
data: Vec<u8>,
timeout: Duration,
) -> Result<PeerResponse> {
if self.adaptive_dht.peer_trust(peer_id) < self.adaptive_dht.config().block_threshold {
return Err(P2PError::Network(crate::error::NetworkError::PeerBlocked(
*peer_id,
)));
}
match self
.transport
.send_request(peer_id, protocol, data, timeout)
.await
{
Ok(resp) => {
self.report_trust_event(peer_id, TrustEvent::SuccessfulResponse)
.await;
Ok(resp)
}
Err(e) => {
let event = if matches!(&e, P2PError::Timeout(_)) {
TrustEvent::ConnectionTimeout
} else {
TrustEvent::ConnectionFailed
};
self.report_trust_event(peer_id, event).await;
Err(e)
}
}
}
pub async fn send_response(
&self,
peer_id: &PeerId,
protocol: &str,
message_id: &str,
data: Vec<u8>,
) -> Result<()> {
self.transport
.send_response(peer_id, protocol, message_id, data)
.await
}
pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
crate::transport_handle::TransportHandle::parse_request_envelope(data)
}
pub async fn subscribe(&self, topic: &str) -> Result<()> {
self.transport.subscribe(topic).await
}
pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
self.transport.publish(topic, data).await
}
pub fn config(&self) -> &NodeConfig {
&self.config
}
pub async fn start(&self) -> Result<()> {
info!("Starting P2P node...");
if let Some(ref bootstrap_manager) = self.bootstrap_manager {
let mut manager = bootstrap_manager.write().await;
manager
.start_maintenance()
.map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
info!("Bootstrap cache manager started");
}
self.transport.start_network_listeners().await?;
self.adaptive_dht.start().await?;
let listen_addrs = self.transport.listen_addrs().await;
info!("P2P node started on addresses: {:?}", listen_addrs);
let close_group_cache = if let Some(ref dir) = self.config.close_group_cache_dir {
match CloseGroupCache::load_from_dir(dir).await {
Ok(Some(cache)) => {
let original_count = cache.peers.len();
let cache = CloseGroupCache {
peers: cache
.peers
.into_iter()
.filter(|p| p.trust.score.is_finite())
.collect(),
..cache
};
let filtered_count = original_count - cache.peers.len();
if filtered_count > 0 {
warn!(
"Filtered {filtered_count} peers with non-finite trust scores from close group cache"
);
}
let trust_snapshot = TrustSnapshot {
peers: cache
.peers
.iter()
.map(|p| (p.peer_id, p.trust.clone()))
.collect(),
};
self.adaptive_dht
.trust_engine()
.import_snapshot(&trust_snapshot);
info!(
"Loaded {} peers from close group cache (trust scores imported)",
cache.peers.len()
);
Some(cache)
}
Ok(None) => {
debug!(
"No close group cache found in {}, fresh start",
dir.display()
);
None
}
Err(e) => {
warn!(
"Failed to load close group cache from {}: {e}",
dir.display()
);
None
}
}
} else {
None
};
self.connect_bootstrap_peers(close_group_cache.as_ref())
.await?;
self.is_started
.store(true, std::sync::atomic::Ordering::Release);
Ok(())
}
pub async fn run(&self) -> Result<()> {
if !self.is_running() {
self.start().await?;
}
info!("P2P node running...");
self.shutdown.cancelled().await;
info!("P2P node stopped");
Ok(())
}
pub async fn stop(&self) -> Result<()> {
info!("Stopping P2P node...");
if let Some(ref dir) = self.config.close_group_cache_dir
&& let Err(e) = self.save_close_group_cache(dir).await
{
warn!("Failed to save close group cache on shutdown: {e}");
}
self.shutdown.cancel();
self.adaptive_dht.stop().await?;
self.transport.stop().await?;
self.is_started
.store(false, std::sync::atomic::Ordering::Release);
info!("P2P node stopped");
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
self.stop().await
}
pub fn is_running(&self) -> bool {
self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
}
pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
self.transport.listen_addrs().await
}
pub async fn connected_peers(&self) -> Vec<PeerId> {
self.transport.connected_peers().await
}
pub async fn peer_count(&self) -> usize {
self.transport.peer_count().await
}
pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
self.transport.peer_info(peer_id).await
}
#[allow(dead_code)]
pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
self.transport.get_channel_id_by_address(addr).await
}
#[allow(dead_code)]
pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
self.transport.list_active_connections().await
}
#[allow(dead_code)]
pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
self.transport.remove_channel(channel_id).await
}
pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
self.transport.disconnect_channel(channel_id).await;
}
pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
self.transport.is_peer_connected(peer_id).await
}
pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
self.transport.connect_peer(address).await
}
pub async fn wait_for_peer_identity(
&self,
channel_id: &str,
timeout: Duration,
) -> Result<PeerId> {
self.transport
.wait_for_peer_identity(channel_id, timeout)
.await
}
pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
self.transport.disconnect_peer(peer_id).await
}
#[allow(dead_code)]
pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
self.transport.is_connection_active(channel_id).await
}
pub async fn send_message(
&self,
peer_id: &PeerId,
protocol: &str,
data: Vec<u8>,
addrs: &[MultiAddr],
) -> Result<()> {
let existing_channels = self.transport.channels_for_peer(peer_id).await;
if existing_channels.is_empty() {
let lock = self.reconnect_lock_for(peer_id);
let _guard = lock.lock().await;
if self.transport.is_peer_connected(peer_id).await {
return self.transport.send_message(peer_id, protocol, data).await;
}
return self
.reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
.await;
}
let saved_addrs: Vec<MultiAddr> = self
.transport
.peer_info(peer_id)
.await
.map(|info| info.addresses)
.unwrap_or_default();
let retry_data = data.clone();
match self.transport.send_message(peer_id, protocol, data).await {
Ok(()) => return Ok(()),
Err(e) => {
debug!(
peer = %peer_id.to_hex(),
error = %e,
"send failed, attempting reconnect",
);
}
}
let lock = self.reconnect_lock_for(peer_id);
let _guard = lock.lock().await;
if self.transport.is_peer_connected(peer_id).await {
for channel_id in &existing_channels {
self.transport.disconnect_channel(channel_id).await;
}
return self
.transport
.send_message(peer_id, protocol, retry_data)
.await;
}
self.reconnect_and_send(
peer_id,
protocol,
retry_data,
addrs,
&saved_addrs,
&existing_channels,
)
.await
}
async fn reconnect_and_send(
&self,
peer_id: &PeerId,
protocol: &str,
data: Vec<u8>,
addrs: &[MultiAddr],
saved_addrs: &[MultiAddr],
stale_channels: &[String],
) -> Result<()> {
let address = self
.resolve_dial_address(peer_id, addrs, saved_addrs)
.await
.ok_or_else(|| {
P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
})?;
if !stale_channels.is_empty() {
for channel_id in stale_channels {
self.transport.disconnect_channel(channel_id).await;
}
tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
}
let channel_id = self.transport.connect_peer(&address).await?;
let authenticated = match self
.transport
.wait_for_peer_identity(&channel_id, RECONNECT_IDENTITY_TIMEOUT)
.await
{
Ok(peer) => peer,
Err(e) => {
self.transport.disconnect_channel(&channel_id).await;
return Err(e);
}
};
if &authenticated != peer_id {
self.transport.disconnect_channel(&channel_id).await;
return Err(P2PError::Identity(IdentityError::IdentityMismatch {
expected: peer_id.to_hex().into(),
actual: authenticated.to_hex().into(),
}));
}
self.transport.send_message(peer_id, protocol, data).await
}
async fn resolve_dial_address(
&self,
peer_id: &PeerId,
caller_addrs: &[MultiAddr],
saved_addrs: &[MultiAddr],
) -> Option<MultiAddr> {
if let Some(addr) = Self::first_dialable(caller_addrs) {
return Some(addr);
}
if let Some(addr) = Self::first_dialable(saved_addrs) {
return Some(addr);
}
let dht_addrs = self.adaptive_dht.peer_addresses_for_dial(peer_id).await;
Self::first_dialable(&dht_addrs)
}
fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
addrs
.iter()
.find(|a| {
let dialable = a
.dialable_socket_addr()
.is_some_and(|sa| !sa.ip().is_unspecified());
if !dialable {
trace!(address = %a, "skipping non-dialable address");
}
dialable
})
.cloned()
}
fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
self.reconnect_locks
.lock()
.entry(*peer_id)
.or_insert_with(|| Arc::new(TokioMutex::new(())))
.clone()
}
}
const MAX_MESSAGE_AGE_SECS: u64 = 300;
const MAX_FUTURE_SECS: u64 = 30;
fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
}
pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
if let Err(e) = tx.send(event) {
tracing::trace!("Event broadcast has no receivers: {e}");
}
}
pub(crate) struct ParsedMessage {
pub(crate) event: P2PEvent,
pub(crate) authenticated_node_id: Option<PeerId>,
pub(crate) user_agent: String,
}
pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
let message: WireMessage = postcard::from_bytes(bytes).ok()?;
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
tracing::warn!(
"Rejecting stale message from {} (timestamp {} is {} seconds old)",
source,
message.timestamp,
now.saturating_sub(message.timestamp)
);
return None;
}
if message.timestamp > now + MAX_FUTURE_SECS {
tracing::warn!(
"Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
source,
message.timestamp,
message.timestamp.saturating_sub(now)
);
return None;
}
let authenticated_node_id = if !message.signature.is_empty() {
match verify_message_signature(&message) {
Ok(peer_id) => {
debug!(
"Message from {} authenticated as app-level NodeId {}",
source, peer_id
);
Some(peer_id)
}
Err(e) => {
warn!(
"Rejecting message from {}: signature verification failed: {}",
source, e
);
return None;
}
}
} else {
None
};
debug!(
"Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
message.protocol,
authenticated_node_id,
source,
message.from,
message.data.len()
);
Some(ParsedMessage {
event: P2PEvent::Message {
topic: message.protocol,
source: authenticated_node_id,
data: message.data,
},
authenticated_node_id,
user_agent: message.user_agent,
})
}
fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
.map_err(|e| format!("invalid public key: {e:?}"))?;
let peer_id = peer_id_from_public_key(&pubkey);
if message.from != peer_id {
return Err(format!(
"from field mismatch: message claims '{}' but public key derives '{}'",
message.from, peer_id
));
}
let signable = postcard::to_stdvec(&(
&message.protocol,
&message.data as &[u8],
&message.from,
message.timestamp,
&message.user_agent,
))
.map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
let sig = MlDsaSignature::from_bytes(&message.signature)
.map_err(|e| format!("invalid signature: {e:?}"))?;
let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
.map_err(|e| format!("verification error: {e}"))?;
if valid {
Ok(peer_id)
} else {
Err("signature is invalid".to_string())
}
}
impl P2PNode {
pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
self.transport.subscribe_events()
}
pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
self.subscribe_events()
}
pub fn uptime(&self) -> Duration {
self.start_time.elapsed()
}
pub async fn health_check(&self) -> Result<()> {
let peer_count = self.peer_count().await;
if peer_count > self.config.max_connections {
Err(protocol_error(format!(
"Too many connections: {peer_count}"
)))
} else {
Ok(())
}
}
pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
self.adaptive_dht.dht_manager()
}
pub fn dht(&self) -> &Arc<DhtNetworkManager> {
self.dht_manager()
}
pub async fn add_discovered_peer(
&self,
_peer_id: PeerId,
addresses: Vec<MultiAddr>,
) -> Result<()> {
if let Some(ref bootstrap_manager) = self.bootstrap_manager {
let manager = bootstrap_manager.read().await;
let socket_addresses: Vec<std::net::SocketAddr> = addresses
.iter()
.filter_map(|addr| addr.socket_addr())
.collect();
if let Some(&primary) = socket_addresses.first() {
manager
.add_peer(&primary, socket_addresses)
.await
.map_err(|e| {
protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
})?;
}
}
Ok(())
}
pub async fn update_peer_metrics(
&self,
addr: &MultiAddr,
success: bool,
latency_ms: Option<u64>,
_error: Option<String>,
) -> Result<()> {
if let Some(ref bootstrap_manager) = self.bootstrap_manager
&& let Some(sa) = addr.socket_addr()
{
let manager = bootstrap_manager.read().await;
if success {
let rtt_ms = latency_ms.unwrap_or(0) as u32;
manager.record_success(&sa, rtt_ms).await;
} else {
manager.record_failure(&sa).await;
}
}
Ok(())
}
pub async fn get_bootstrap_cache_stats(
&self,
) -> Result<Option<crate::bootstrap::BootstrapStats>> {
if let Some(ref bootstrap_manager) = self.bootstrap_manager {
let manager = bootstrap_manager.read().await;
Ok(Some(manager.stats().await))
} else {
Ok(None)
}
}
pub async fn cached_peer_count(&self) -> usize {
if let Some(ref _bootstrap_manager) = self.bootstrap_manager
&& let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
{
return stats.total_peers;
}
0
}
async fn connect_bootstrap_peers(
&self,
close_group_cache: Option<&CloseGroupCache>,
) -> Result<()> {
let mut bootstrap_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
let mut used_cache = false;
let mut seen_addresses = std::collections::HashSet::new();
if let Some(cache) = close_group_cache {
let mut sorted_peers: Vec<&CachedCloseGroupPeer> = cache.peers.iter().collect();
sorted_peers.sort_by(|a, b| {
let score_ord = match b.trust.score.partial_cmp(&a.trust.score) {
Some(ord) => ord,
None => {
if a.trust.score.is_nan() {
std::cmp::Ordering::Greater } else {
std::cmp::Ordering::Less }
}
};
score_ord.then_with(|| {
let da = self.peer_id.xor_distance(&a.peer_id);
let db = self.peer_id.xor_distance(&b.peer_id);
da.cmp(&db)
})
});
let mut added_from_close_group = 0usize;
for peer in &sorted_peers {
let new_addresses: Vec<MultiAddr> = peer
.addresses
.iter()
.filter(|a| {
a.dialable_socket_addr()
.is_some_and(|sa| !seen_addresses.contains(&sa))
})
.cloned()
.collect();
if !new_addresses.is_empty() {
for addr in &new_addresses {
if let Some(sa) = addr.socket_addr() {
seen_addresses.insert(sa);
}
}
bootstrap_addr_sets.push(new_addresses);
added_from_close_group += 1;
}
}
if added_from_close_group > 0 {
info!(
"Added {} close group cache peers (highest trust first)",
added_from_close_group
);
}
}
if !self.config.bootstrap_peers.is_empty() {
info!(
"Using {} configured bootstrap peers (priority)",
self.config.bootstrap_peers.len()
);
for multiaddr in &self.config.bootstrap_peers {
let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
continue;
};
seen_addresses.insert(socket_addr);
bootstrap_addr_sets.push(vec![multiaddr.clone()]);
}
}
if let Some(ref bootstrap_manager) = self.bootstrap_manager {
let manager = bootstrap_manager.read().await;
let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
if !cached_peers.is_empty() {
let mut added_from_cache = 0;
for cached in cached_peers {
let mut addrs = vec![cached.primary_address];
addrs.extend(cached.addresses);
let new_addresses: Vec<MultiAddr> = addrs
.into_iter()
.filter(|a| !seen_addresses.contains(a))
.map(MultiAddr::quic)
.collect();
if !new_addresses.is_empty() {
for addr in &new_addresses {
if let Some(sa) = addr.socket_addr() {
seen_addresses.insert(sa);
}
}
bootstrap_addr_sets.push(new_addresses);
added_from_cache += 1;
}
}
if added_from_cache > 0 {
info!(
"Added {} cached bootstrap peers (supplementing CLI peers)",
added_from_cache
);
used_cache = true;
}
}
}
if bootstrap_addr_sets.is_empty() {
info!("No bootstrap peers configured and no cached peers available");
return Ok(());
}
let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
let mut successful_connections = 0;
let mut connected_peer_ids: Vec<PeerId> = Vec::new();
for addrs in &bootstrap_addr_sets {
for addr in addrs {
match self.connect_peer(addr).await {
Ok(channel_id) => {
match self
.transport
.wait_for_peer_identity(&channel_id, identity_timeout)
.await
{
Ok(real_peer_id) => {
successful_connections += 1;
connected_peer_ids.push(real_peer_id);
if let Some(ref bootstrap_manager) = self.bootstrap_manager {
let manager = bootstrap_manager.read().await;
if let Some(sa) = addr.socket_addr() {
manager.record_success(&sa, 100).await;
}
}
break; }
Err(e) => {
warn!(
"Timeout waiting for identity from bootstrap peer {}: {}, \
closing channel {}",
addr, e, channel_id
);
self.disconnect_channel(&channel_id).await;
}
}
}
Err(e) => {
warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
let manager = bootstrap_manager.read().await;
if let Some(sa) = addr.socket_addr() {
manager.record_failure(&sa).await;
}
}
}
}
}
}
if successful_connections == 0 {
if !used_cache {
warn!("Failed to connect to any bootstrap peers");
}
return Ok(());
}
info!(
"Successfully connected to {} bootstrap peers",
successful_connections
);
match self
.dht_manager()
.bootstrap_from_peers(&connected_peer_ids)
.await
{
Ok(count) => info!("DHT peer discovery found {} peers", count),
Err(e) => warn!("DHT peer discovery failed: {}", e),
}
const SELF_LOOKUP_ROUNDS: u8 = 2;
for i in 1..=SELF_LOOKUP_ROUNDS {
if let Err(e) = self.dht_manager().trigger_self_lookup().await {
warn!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} failed: {e}");
} else {
debug!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} completed");
}
}
self.is_bootstrapped.store(true, Ordering::SeqCst);
info!(
"Bootstrap complete: connected to {} peers, initiated {} discovery requests",
successful_connections,
connected_peer_ids.len()
);
if let Some(ref dir) = self.config.close_group_cache_dir
&& let Err(e) = self.save_close_group_cache(dir).await
{
warn!("Failed to save close group cache after bootstrap: {e}");
}
Ok(())
}
async fn save_close_group_cache(&self, dir: &Path) -> anyhow::Result<()> {
let key: crate::dht::Key = *self.peer_id.as_bytes();
let k_value = self.config.dht_config.k_value;
let close_group = self
.dht_manager()
.find_closest_nodes_local(&key, k_value)
.await;
if close_group.is_empty() {
debug!("No close group peers to save");
return Ok(());
}
let trust_engine = self.adaptive_dht.trust_engine();
let now_epoch = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let peers: Vec<CachedCloseGroupPeer> = close_group
.into_iter()
.filter_map(|dht_node| {
let score = trust_engine.score(&dht_node.peer_id);
if !score.is_finite() {
return None;
}
Some(CachedCloseGroupPeer {
peer_id: dht_node.peer_id,
addresses: dht_node.addresses,
trust: TrustRecord {
score,
last_updated_epoch_secs: now_epoch,
},
})
})
.collect();
let peer_count = peers.len();
let cache = CloseGroupCache {
peers,
saved_at_epoch_secs: now_epoch,
};
cache.save_to_dir(dir).await?;
info!(
"Saved {} close group peers to cache in {}",
peer_count,
dir.display()
);
Ok(())
}
}
#[async_trait::async_trait]
#[allow(dead_code)]
pub trait NetworkSender: Send + Sync {
async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
fn local_peer_id(&self) -> PeerId;
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
use super::*;
use crate::security::IPDiversityConfig;
async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
let temp_dir = tempfile::TempDir::new().expect("temp dir");
let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
bootstrap_config.cache_dir = temp_dir.path().to_path_buf();
BootstrapManager::with_node_config(bootstrap_config, config)
.await
.expect("bootstrap manager")
}
#[tokio::test]
async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
let config = NodeConfig {
diversity_config: Some(IPDiversityConfig::testnet()),
..Default::default()
};
let manager = build_bootstrap_manager_like_prod(&config).await;
assert_eq!(manager.diversity_config().max_per_ip, Some(usize::MAX));
assert_eq!(manager.diversity_config().max_per_subnet, Some(usize::MAX));
}
}
pub(crate) async fn register_new_channel(
peers: &Arc<RwLock<HashMap<String, PeerInfo>>>,
channel_id: &str,
remote_addr: &MultiAddr,
) {
let mut peers_guard = peers.write().await;
let peer_info = PeerInfo {
channel_id: channel_id.to_owned(),
addresses: vec![remote_addr.clone()],
connected_at: tokio::time::Instant::now(),
last_seen: tokio::time::Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["p2p-core/1.0.0".to_string()],
heartbeat_count: 0,
};
peers_guard.insert(channel_id.to_owned(), peer_info);
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tokio::time::timeout;
const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
fn create_test_node_config() -> NodeConfig {
NodeConfig {
local: true,
port: 0,
ipv6: true,
bootstrap_peers: vec![],
connection_timeout: Duration::from_secs(2),
max_connections: 100,
dht_config: DHTConfig::default(),
bootstrap_cache_config: None,
diversity_config: None,
max_message_size: None,
node_identity: None,
mode: NodeMode::default(),
custom_user_agent: None,
allow_loopback: true,
adaptive_dht_config: AdaptiveDhtConfig::default(),
close_group_cache_dir: None,
}
}
#[tokio::test]
async fn test_node_config_default() {
let config = NodeConfig::default();
assert_eq!(config.listen_addrs().len(), 2); assert_eq!(config.max_connections, 10000);
assert_eq!(config.connection_timeout, Duration::from_secs(90));
}
#[tokio::test]
async fn test_dht_config_default() {
let config = DHTConfig::default();
assert_eq!(config.k_value, 20);
assert_eq!(config.alpha_value, 3);
assert_eq!(config.refresh_interval, Duration::from_secs(600));
}
#[test]
fn test_connection_status_variants() {
let connecting = ConnectionStatus::Connecting;
let connected = ConnectionStatus::Connected;
let disconnecting = ConnectionStatus::Disconnecting;
let disconnected = ConnectionStatus::Disconnected;
let failed = ConnectionStatus::Failed("test error".to_string());
assert_eq!(connecting, ConnectionStatus::Connecting);
assert_eq!(connected, ConnectionStatus::Connected);
assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
assert_eq!(disconnected, ConnectionStatus::Disconnected);
assert_ne!(connecting, connected);
if let ConnectionStatus::Failed(msg) = failed {
assert_eq!(msg, "test error");
} else {
panic!("Expected Failed status");
}
}
#[tokio::test]
async fn test_node_creation() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
assert_eq!(node.peer_id().to_hex().len(), 64);
assert!(!node.is_running());
assert_eq!(node.peer_count().await, 0);
assert!(node.connected_peers().await.is_empty());
Ok(())
}
#[tokio::test]
async fn test_node_lifecycle() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
assert!(!node.is_running());
node.start().await?;
assert!(node.is_running());
let listen_addrs = node.listen_addrs().await;
assert!(
!listen_addrs.is_empty(),
"Expected at least one listening address"
);
node.stop().await?;
assert!(!node.is_running());
Ok(())
}
#[tokio::test]
async fn test_peer_connection() -> Result<()> {
let config1 = create_test_node_config();
let config2 = create_test_node_config();
let node1 = P2PNode::new(config1).await?;
let node2 = P2PNode::new(config2).await?;
node1.start().await?;
node2.start().await?;
let node2_addr = node2
.listen_addrs()
.await
.into_iter()
.find(|a| a.is_ipv4())
.ok_or_else(|| {
P2PError::Network(crate::error::NetworkError::InvalidAddress(
"Node 2 did not expose an IPv4 listen address".into(),
))
})?;
let channel_id = node1.connect_peer(&node2_addr).await?;
assert!(node1.is_connection_active(&channel_id).await);
let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
assert!(peer_info.is_some());
let info = peer_info.expect("Peer info should exist after connect");
assert_eq!(info.channel_id, channel_id);
assert_eq!(info.status, ConnectionStatus::Connected);
assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
node1.remove_channel(&channel_id).await;
assert!(!node1.is_connection_active(&channel_id).await);
node1.stop().await?;
node2.stop().await?;
Ok(())
}
#[tokio::test]
async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
let result = node.connect_peer(&tcp_addr).await;
assert!(
matches!(
result,
Err(P2PError::Network(
crate::error::NetworkError::InvalidAddress(_)
))
),
"TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
result
);
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test]
async fn test_event_subscription() -> Result<()> {
let identity1 =
Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
let identity2 =
Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
let mut config1 = create_test_node_config();
config1.ipv6 = false;
config1.node_identity = Some(identity1);
let node2_peer_id = *identity2.peer_id();
let mut config2 = create_test_node_config();
config2.ipv6 = false;
config2.node_identity = Some(identity2);
let node1 = P2PNode::new(config1).await?;
let node2 = P2PNode::new(config2).await?;
node1.start().await?;
node2.start().await?;
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let mut events = node2.subscribe_events();
let node2_addr = node2.local_addr().ok_or_else(|| {
P2PError::Network(crate::error::NetworkError::ProtocolError(
"No listening address".to_string().into(),
))
})?;
let mut channel_id = None;
for attempt in 0..3 {
if attempt > 0 {
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
Ok(Ok(id)) => {
channel_id = Some(id);
break;
}
Ok(Err(_)) | Err(_) => continue,
}
}
let channel_id = channel_id.expect("Failed to connect after 3 attempts");
let target_peer_id = node1
.wait_for_peer_identity(&channel_id, Duration::from_secs(2))
.await?;
assert_eq!(target_peer_id, node2_peer_id);
node1
.send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
.await?;
let event = timeout(Duration::from_secs(2), async {
loop {
match events.recv().await {
Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
Ok(P2PEvent::Message { .. }) => continue, Ok(_) => continue,
Err(e) => return Err(e),
}
}
})
.await;
assert!(event.is_ok(), "Should receive PeerConnected event");
let connected_peer_id = event.expect("Timed out").expect("Channel error");
assert!(
connected_peer_id.0.iter().any(|&b| b != 0),
"PeerConnected should carry a non-zero peer ID"
);
node1.stop().await?;
node2.stop().await?;
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test]
async fn test_message_sending() -> Result<()> {
let mut config1 = create_test_node_config();
config1.ipv6 = false;
let node1 = P2PNode::new(config1).await?;
node1.start().await?;
let mut config2 = create_test_node_config();
config2.ipv6 = false;
let node2 = P2PNode::new(config2).await?;
node2.start().await?;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let node2_addr = node2.local_addr().ok_or_else(|| {
P2PError::Network(crate::error::NetworkError::ProtocolError(
"No listening address".to_string().into(),
))
})?;
let channel_id =
match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
Ok(res) => res?,
Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
};
let target_peer_id = node1
.wait_for_peer_identity(&channel_id, Duration::from_secs(2))
.await?;
assert_eq!(target_peer_id, node2.peer_id().clone());
let message_data = b"Hello, peer!".to_vec();
let result = match timeout(
Duration::from_millis(500),
node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
)
.await
{
Ok(res) => res,
Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
};
if let Err(e) = &result {
assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
}
let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
let result = node1
.send_message(&non_existent_peer, "test-protocol", vec![], &[])
.await;
assert!(result.is_err(), "Sending to non-existent peer should fail");
node1.stop().await?;
node2.stop().await?;
Ok(())
}
#[tokio::test]
async fn test_remote_mcp_operations() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
node.start().await?;
node.stop().await?;
Ok(())
}
#[tokio::test]
async fn test_health_check() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let result = node.health_check().await;
assert!(result.is_ok());
Ok(())
}
#[tokio::test]
async fn test_node_uptime() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let uptime1 = node.uptime();
assert!(uptime1 >= Duration::from_secs(0));
tokio::time::sleep(Duration::from_millis(10)).await;
let uptime2 = node.uptime();
assert!(uptime2 > uptime1);
Ok(())
}
#[tokio::test]
async fn test_node_config_access() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let node_config = node.config();
assert_eq!(node_config.max_connections, 100);
Ok(())
}
#[tokio::test]
async fn test_mcp_server_access() -> Result<()> {
let config = create_test_node_config();
let _node = P2PNode::new(config).await?;
Ok(())
}
#[tokio::test]
async fn test_dht_access() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let _dht = node.dht();
Ok(())
}
#[tokio::test]
async fn test_node_config_builder() -> Result<()> {
let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
let config = NodeConfig::builder()
.local(true)
.ipv6(true)
.bootstrap_peer(bootstrap)
.connection_timeout(Duration::from_secs(15))
.max_connections(200)
.max_message_size(TEST_MAX_MESSAGE_SIZE)
.build()?;
assert_eq!(config.listen_addrs().len(), 2); assert!(config.local);
assert!(config.ipv6);
assert_eq!(config.bootstrap_peers.len(), 1);
assert_eq!(config.connection_timeout, Duration::from_secs(15));
assert_eq!(config.max_connections, 200);
assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
assert!(config.allow_loopback);
Ok(())
}
#[tokio::test]
async fn test_bootstrap_peers() -> Result<()> {
let mut config = create_test_node_config();
config.bootstrap_peers = vec![
crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
];
let node = P2PNode::new(config).await?;
node.start().await?;
let _peer_count = node.peer_count().await;
node.stop().await?;
Ok(())
}
#[tokio::test]
async fn test_peer_info_structure() {
let peer_info = PeerInfo {
channel_id: "test_peer".to_string(),
addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
connected_at: Instant::now(),
last_seen: Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["test-protocol".to_string()],
heartbeat_count: 0,
};
assert_eq!(peer_info.channel_id, "test_peer");
assert_eq!(peer_info.addresses.len(), 1);
assert_eq!(peer_info.status, ConnectionStatus::Connected);
assert_eq!(peer_info.protocols.len(), 1);
}
#[tokio::test]
async fn test_serialization() -> Result<()> {
let config = create_test_node_config();
let serialized = serde_json::to_string(&config)?;
let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
assert_eq!(config.local, deserialized.local);
assert_eq!(config.port, deserialized.port);
assert_eq!(config.ipv6, deserialized.ipv6);
assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
Ok(())
}
#[tokio::test]
async fn test_get_channel_id_by_address_found() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let test_channel_id = "peer_test_123".to_string();
let test_address = "192.168.1.100:9000";
let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
let peer_info = PeerInfo {
channel_id: test_channel_id.clone(),
addresses: vec![test_multiaddr],
connected_at: Instant::now(),
last_seen: Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["test-protocol".to_string()],
heartbeat_count: 0,
};
node.transport
.inject_peer(test_channel_id.clone(), peer_info)
.await;
let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
assert_eq!(found_channel_id, Some(test_channel_id));
Ok(())
}
#[tokio::test]
async fn test_get_channel_id_by_address_not_found() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
let result = node.get_channel_id_by_address(&unknown_addr).await;
assert_eq!(result, None);
Ok(())
}
#[tokio::test]
async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
psm: 0x0025,
});
let result = node.get_channel_id_by_address(&ble_addr).await;
assert_eq!(result, None);
Ok(())
}
#[tokio::test]
async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let peer1_id = "peer_1".to_string();
let peer1_addr_str = "192.168.1.101:9001";
let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
let peer2_id = "peer_2".to_string();
let peer2_addr_str = "192.168.1.102:9002";
let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
let peer1_info = PeerInfo {
channel_id: peer1_id.clone(),
addresses: vec![peer1_multiaddr],
connected_at: Instant::now(),
last_seen: Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["test-protocol".to_string()],
heartbeat_count: 0,
};
let peer2_info = PeerInfo {
channel_id: peer2_id.clone(),
addresses: vec![peer2_multiaddr],
connected_at: Instant::now(),
last_seen: Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["test-protocol".to_string()],
heartbeat_count: 0,
};
node.transport
.inject_peer(peer1_id.clone(), peer1_info)
.await;
node.transport
.inject_peer(peer2_id.clone(), peer2_info)
.await;
let found_peer1 = node
.get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
.await;
let found_peer2 = node
.get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
.await;
assert_eq!(found_peer1, Some(peer1_id));
assert_eq!(found_peer2, Some(peer2_id));
Ok(())
}
#[tokio::test]
async fn test_list_active_connections_empty() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let connections = node.list_active_connections().await;
assert!(connections.is_empty());
Ok(())
}
#[tokio::test]
async fn test_list_active_connections_with_peers() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let peer1_id = "peer_1".to_string();
let peer1_addrs = vec![
MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
];
let peer2_id = "peer_2".to_string();
let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
let peer1_info = PeerInfo {
channel_id: peer1_id.clone(),
addresses: peer1_addrs.clone(),
connected_at: Instant::now(),
last_seen: Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["test-protocol".to_string()],
heartbeat_count: 0,
};
let peer2_info = PeerInfo {
channel_id: peer2_id.clone(),
addresses: peer2_addrs.clone(),
connected_at: Instant::now(),
last_seen: Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["test-protocol".to_string()],
heartbeat_count: 0,
};
node.transport
.inject_peer(peer1_id.clone(), peer1_info)
.await;
node.transport
.inject_peer(peer2_id.clone(), peer2_info)
.await;
node.transport
.inject_active_connection(peer1_id.clone())
.await;
node.transport
.inject_active_connection(peer2_id.clone())
.await;
let connections = node.list_active_connections().await;
assert_eq!(connections.len(), 2);
let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
assert!(peer1_conn.is_some());
assert!(peer2_conn.is_some());
assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
Ok(())
}
#[tokio::test]
async fn test_remove_channel_success() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let channel_id = "peer_to_remove".to_string();
let channel_peer_id = PeerId::from_name(&channel_id);
let peer_info = PeerInfo {
channel_id: channel_id.clone(),
addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
connected_at: Instant::now(),
last_seen: Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["test-protocol".to_string()],
heartbeat_count: 0,
};
node.transport
.inject_peer(channel_id.clone(), peer_info)
.await;
node.transport
.inject_peer_to_channel(channel_peer_id, channel_id.clone())
.await;
assert!(node.is_peer_connected(&channel_peer_id).await);
let removed = node.remove_channel(&channel_id).await;
assert!(removed);
assert!(!node.is_peer_connected(&channel_peer_id).await);
Ok(())
}
#[tokio::test]
async fn test_remove_channel_nonexistent() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let removed = node.remove_channel("nonexistent_peer").await;
assert!(!removed);
Ok(())
}
#[tokio::test]
async fn test_is_peer_connected() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
let channel_id = "test_peer".to_string();
let channel_peer_id = PeerId::from_name(&channel_id);
assert!(!node.is_peer_connected(&channel_peer_id).await);
let peer_info = PeerInfo {
channel_id: channel_id.clone(),
addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
connected_at: Instant::now(),
last_seen: Instant::now(),
status: ConnectionStatus::Connected,
protocols: vec!["test-protocol".to_string()],
heartbeat_count: 0,
};
node.transport
.inject_peer(channel_id.clone(), peer_info)
.await;
node.transport
.inject_peer_to_channel(channel_peer_id, channel_id.clone())
.await;
assert!(node.is_peer_connected(&channel_peer_id).await);
node.remove_channel(&channel_id).await;
assert!(!node.is_peer_connected(&channel_peer_id).await);
Ok(())
}
#[test]
fn test_normalize_ipv6_wildcard() {
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
let normalized = normalize_wildcard_to_loopback(wildcard);
assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
assert_eq!(normalized.port(), 8080);
}
#[test]
fn test_normalize_ipv4_wildcard() {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
let normalized = normalize_wildcard_to_loopback(wildcard);
assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
assert_eq!(normalized.port(), 9000);
}
#[test]
fn test_normalize_specific_address_unchanged() {
let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
let normalized = normalize_wildcard_to_loopback(specific);
assert_eq!(normalized, specific);
}
#[test]
fn test_normalize_loopback_unchanged() {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
assert_eq!(normalized_v6, loopback_v6);
let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
assert_eq!(normalized_v4, loopback_v4);
}
fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}
fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
let msg = WireMessage {
protocol: protocol.to_string(),
data,
from: PeerId::from_name(from),
timestamp,
user_agent: String::new(),
public_key: Vec::new(),
signature: Vec::new(),
};
postcard::to_stdvec(&msg).unwrap()
}
#[test]
fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
let transport_id = "abcdef0123456789";
let logical_id = "spoofed-logical-id";
let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
let parsed =
parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
assert!(parsed.authenticated_node_id.is_none());
match parsed.event {
P2PEvent::Message {
topic,
source,
data,
} => {
assert!(source.is_none(), "unsigned message source must be None");
assert_eq!(topic, "test/v1");
assert_eq!(data, vec![1u8, 2, 3]);
}
other => panic!("expected P2PEvent::Message, got {:?}", other),
}
}
#[test]
fn test_parse_protocol_message_rejects_invalid_bytes() {
assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
}
#[test]
fn test_parse_protocol_message_rejects_truncated_message() {
let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
let truncated = &full_bytes[..full_bytes.len() / 2];
assert!(parse_protocol_message(truncated, "peer-id").is_none());
}
#[test]
fn test_parse_protocol_message_empty_payload() {
let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
let parsed = parse_protocol_message(&bytes, "transport-peer")
.expect("valid message with empty data should parse");
match parsed.event {
P2PEvent::Message { data, .. } => assert!(data.is_empty()),
other => panic!("expected P2PEvent::Message, got {:?}", other),
}
}
#[test]
fn test_parse_protocol_message_preserves_binary_payload() {
let payload: Vec<u8> = (0..=255).collect();
let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
let parsed = parse_protocol_message(&bytes, "peer-id")
.expect("valid message with full byte range should parse");
match parsed.event {
P2PEvent::Message { data, topic, .. } => {
assert_eq!(topic, "binary/v1");
assert_eq!(
data, payload,
"payload must survive bincode round-trip exactly"
);
}
other => panic!("expected P2PEvent::Message, got {:?}", other),
}
}
#[test]
fn test_parse_signed_message_verifies_and_uses_node_id() {
let identity = NodeIdentity::generate().expect("should generate identity");
let protocol = "test/signed";
let data: Vec<u8> = vec![10, 20, 30];
let from = *identity.peer_id();
let timestamp = current_timestamp();
let user_agent = "test/1.0";
let signable =
postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
.unwrap();
let sig = identity.sign(&signable).expect("signing should succeed");
let msg = WireMessage {
protocol: protocol.to_string(),
data: data.clone(),
from,
timestamp,
user_agent: user_agent.to_string(),
public_key: identity.public_key().as_bytes().to_vec(),
signature: sig.as_bytes().to_vec(),
};
let bytes = postcard::to_stdvec(&msg).unwrap();
let parsed =
parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
let expected_peer_id = *identity.peer_id();
assert_eq!(
parsed.authenticated_node_id.as_ref(),
Some(&expected_peer_id)
);
match parsed.event {
P2PEvent::Message { source, .. } => {
assert_eq!(
source.as_ref(),
Some(&expected_peer_id),
"source should be the verified PeerId"
);
}
other => panic!("expected P2PEvent::Message, got {:?}", other),
}
}
#[test]
fn test_parse_message_with_bad_signature_is_rejected() {
let identity = NodeIdentity::generate().expect("should generate identity");
let protocol = "test/bad-sig";
let data: Vec<u8> = vec![1, 2, 3];
let from = *identity.peer_id();
let timestamp = current_timestamp();
let user_agent = "test/1.0";
let signable =
postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
.unwrap();
let sig = identity.sign(&signable).expect("signing should succeed");
let msg = WireMessage {
protocol: protocol.to_string(),
data: vec![99, 99, 99],
from,
timestamp,
user_agent: user_agent.to_string(),
public_key: identity.public_key().as_bytes().to_vec(),
signature: sig.as_bytes().to_vec(),
};
let bytes = postcard::to_stdvec(&msg).unwrap();
assert!(
parse_protocol_message(&bytes, "transport-xyz").is_none(),
"message with bad signature should be rejected"
);
}
#[test]
fn test_parse_message_with_mismatched_from_is_rejected() {
let identity = NodeIdentity::generate().expect("should generate identity");
let protocol = "test/from-mismatch";
let data: Vec<u8> = vec![1, 2, 3];
let fake_from = PeerId::from_bytes([0xDE; 32]);
let timestamp = current_timestamp();
let user_agent = "test/1.0";
let signable =
postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
.unwrap();
let sig = identity.sign(&signable).expect("signing should succeed");
let msg = WireMessage {
protocol: protocol.to_string(),
data,
from: fake_from,
timestamp,
user_agent: user_agent.to_string(),
public_key: identity.public_key().as_bytes().to_vec(),
signature: sig.as_bytes().to_vec(),
};
let bytes = postcard::to_stdvec(&msg).unwrap();
assert!(
parse_protocol_message(&bytes, "transport-xyz").is_none(),
"message with mismatched from field should be rejected"
);
}
}