use crate::{PeerId, Multiaddr, P2PError, Result};
use crate::mcp::{MCPServer, MCPServerConfig, Tool, MCPCallContext, MCP_PROTOCOL};
use crate::dht::{DHT, DHTConfig as DHTConfigInner};
use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
use crate::transport::{TransportManager, QuicTransport, TcpTransport, TransportSelection, TransportOptions};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{broadcast, RwLock};
use tokio::time::Instant;
use tracing::{debug, info, warn};
/// Configuration consumed by `P2PNode::new` and assembled by `NodeBuilder`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
/// Fixed identity for the node. When `None`, `P2PNode::new` generates
/// `peer_<first 8 characters of a v4 UUID>`.
pub peer_id: Option<PeerId>,
/// Addresses the node tries to listen on at start-up. Only entries of the
/// form `/ip4/<ip>/<proto>/<port>` or `/ip6/<ip>/<proto>/<port>` are understood
/// by `P2PNode::multiaddr_to_socketaddr`; others are logged and skipped.
pub listen_addrs: Vec<Multiaddr>,
/// Single socket address.
/// NOTE(review): not read by the code visible in this file — confirm the consumer.
pub listen_addr: std::net::SocketAddr,
/// Bootstrap peers; used only when `bootstrap_peers_str` is empty.
pub bootstrap_peers: Vec<Multiaddr>,
/// Bootstrap peers as plain strings; takes precedence over `bootstrap_peers`
/// when non-empty.
pub bootstrap_peers_str: Vec<String>,
/// NOTE(review): not consulted by the code visible in this file.
pub enable_ipv6: bool,
/// Whether `P2PNode::new` creates an MCP server.
pub enable_mcp_server: bool,
/// Explicit MCP server configuration. When `None` (and MCP is enabled) a
/// default configuration named `P2P-MCP-<peer id>` is built.
pub mcp_server_config: Option<MCPServerConfig>,
/// NOTE(review): not consulted by the code visible in this file.
pub connection_timeout: Duration,
/// NOTE(review): not consulted by the code visible in this file.
pub keep_alive_interval: Duration,
/// Upper bound on peers checked by `P2PNode::health_check` when no
/// resource manager is configured.
pub max_connections: usize,
/// NOTE(review): not consulted by the code visible in this file.
pub max_incoming_connections: usize,
/// DHT tuning values; translated into the inner DHT configuration in `P2PNode::new`.
pub dht_config: DHTConfig,
/// NOTE(review): not consulted by the code visible in this file.
pub security_config: SecurityConfig,
/// When `Some`, `P2PNode::new` creates a `ResourceManager` from it
/// ("production mode").
pub production_config: Option<ProductionConfig>,
/// When `Some`, the bootstrap manager is created with this cache configuration;
/// otherwise `BootstrapManager::new()` is used.
pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
}
/// User-facing DHT settings, mapped onto `crate::dht::DHTConfig` by `P2PNode::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
/// Used for both `replication_factor` and `bucket_size` of the inner DHT config.
pub k_value: usize,
/// Used as `alpha` of the inner DHT config.
pub alpha_value: usize,
/// Passed through as the inner `record_ttl`.
pub record_ttl: Duration,
/// Used for both `bucket_refresh_interval` and `republish_interval`.
pub refresh_interval: Duration,
}
/// Security-related switches carried in `NodeConfig`.
///
/// NOTE(review): none of these fields is read by the code visible in this file;
/// confirm where they take effect.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
/// Defaults to `true`.
pub enable_noise: bool,
/// Defaults to `true`.
pub enable_tls: bool,
/// Defaults to `TrustLevel::Basic`.
pub trust_level: TrustLevel,
}
/// Trust setting stored in `SecurityConfig::trust_level`.
///
/// NOTE(review): the meaning of each level is not established by the code
/// visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
None,
// Default value used by `SecurityConfig::default()`.
Basic,
Full,
}
impl Default for NodeConfig {
fn default() -> Self {
Self {
peer_id: None,
listen_addrs: vec![
"/ip6/::/tcp/9000".to_string(),
"/ip4/0.0.0.0/tcp/9000".to_string(),
],
listen_addr: "127.0.0.1:9000".parse().unwrap(),
bootstrap_peers: Vec::new(),
bootstrap_peers_str: Vec::new(),
enable_ipv6: true,
enable_mcp_server: true,
mcp_server_config: None, connection_timeout: Duration::from_secs(30),
keep_alive_interval: Duration::from_secs(60),
max_connections: 1000,
max_incoming_connections: 100,
dht_config: DHTConfig::default(),
security_config: SecurityConfig::default(),
production_config: None, bootstrap_cache_config: None,
}
}
}
impl Default for DHTConfig {
    /// Defaults: `k = 20`, `alpha = 5`, one-hour record TTL, ten-minute refresh.
    fn default() -> Self {
        const K_VALUE: usize = 20;
        const ALPHA_VALUE: usize = 5;
        const RECORD_TTL_SECS: u64 = 3600;
        const REFRESH_INTERVAL_SECS: u64 = 600;

        Self {
            k_value: K_VALUE,
            alpha_value: ALPHA_VALUE,
            record_ttl: Duration::from_secs(RECORD_TTL_SECS),
            refresh_interval: Duration::from_secs(REFRESH_INTERVAL_SECS),
        }
    }
}
impl Default for SecurityConfig {
    /// Defaults: Noise and TLS both enabled, `TrustLevel::Basic`.
    fn default() -> Self {
        let trust_level = TrustLevel::Basic;
        Self {
            enable_noise: true,
            enable_tls: true,
            trust_level,
        }
    }
}
/// Bookkeeping entry stored per peer in `P2PNode::peers`.
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// Key under which this entry is stored in the peer map.
pub peer_id: PeerId,
/// Address(es) recorded when the entry was created (the dialed address or the
/// remote address of an accepted connection).
pub addresses: Vec<String>,
/// Set to `Instant::now()` when the entry is created.
pub connected_at: Instant,
/// Set to `Instant::now()` when the entry is created.
/// NOTE(review): no visible code updates it afterwards.
pub last_seen: Instant,
pub status: ConnectionStatus,
/// Protocol identifiers recorded for the peer (hard-coded strings in this file).
pub protocols: Vec<String>,
}
/// Connection state recorded in `PeerInfo::status`.
///
/// Only `Connected` and `Disconnected` are assigned by the code visible in this file.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
Connecting,
Connected,
Disconnecting,
Disconnected,
// Carries a `String`; presumably an error description — TODO confirm.
Failed(String),
}
/// Detailed network-level event type.
///
/// NOTE(review): not constructed or sent anywhere in the visible portion of this
/// file; the node's broadcast channel carries `P2PEvent` instead.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
PeerConnected {
peer_id: PeerId,
addresses: Vec<String>,
},
PeerDisconnected {
peer_id: PeerId,
reason: String,
},
MessageReceived {
peer_id: PeerId,
protocol: String,
data: Vec<u8>,
},
ConnectionFailed {
// Optional — presumably unknown when the dial fails before identification; TODO confirm.
peer_id: Option<PeerId>,
address: String,
error: String,
},
DHTRecordStored {
key: Vec<u8>,
value: Vec<u8>,
},
DHTRecordRetrieved {
key: Vec<u8>,
value: Option<Vec<u8>>,
},
}
/// Events delivered to subscribers of `P2PNode::subscribe_events`.
#[derive(Debug, Clone)]
pub enum P2PEvent {
/// Emitted by `publish` (with the local peer id as `source`) and by the
/// receive path (with the sender-supplied `from` field as `source`).
Message { topic: String, source: PeerId, data: Vec<u8> },
/// Emitted after an outbound `connect_peer` or an accepted inbound connection.
PeerConnected(PeerId),
/// Emitted by `disconnect_peer` and when an accepted connection is found dead.
PeerDisconnected(PeerId),
}
/// A peer-to-peer node: peer table, event channel, transports and optional
/// MCP / DHT / resource-management / bootstrap-cache components.
pub struct P2PNode {
// Configuration the node was created with.
config: NodeConfig,
// Identity of this node (configured or generated in `new`).
peer_id: PeerId,
// Peer table; shared (`Arc`) with the spawned connection-acceptor tasks.
peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
// Broadcast sender for `P2PEvent`; created with capacity 1000 in `new`.
event_tx: broadcast::Sender<P2PEvent>,
// Addresses reported as listening; written by `start` and the listener setup.
listen_addrs: RwLock<Vec<Multiaddr>>,
// Creation time, used by `uptime`.
start_time: Instant,
// Set by `start`, cleared by `stop`; polled by `run`.
running: RwLock<bool>,
// Present only when `config.enable_mcp_server` is true.
mcp_server: Option<Arc<MCPServer>>,
// DHT handle created in `new`.
dht: Option<Arc<RwLock<DHT>>>,
// Present only when `config.production_config` is `Some`.
resource_manager: Option<Arc<ResourceManager>>,
// `None` when the bootstrap manager failed to initialise.
bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
// Used for outbound connects and sends.
transport_manager: Arc<TransportManager>,
}
impl P2PNode {
/// Builds a node from `config` without starting any listeners.
///
/// Creates, in order: the peer identity (generated when not configured), the
/// event channel, the DHT, the optional MCP server, the optional resource
/// manager, the bootstrap manager and the transport manager (QUIC if it can be
/// created, then TCP).
///
/// Failures to create the bootstrap manager or the QUIC transport are logged
/// and tolerated; as written this function always returns `Ok`.
pub async fn new(config: NodeConfig) -> Result<Self> {
    // Fall back to a random `peer_<8 chars>` identity when none is configured.
    let peer_id = config.peer_id.clone().unwrap_or_else(|| {
        let uuid = uuid::Uuid::new_v4().to_string();
        format!("peer_{}", &uuid[..8])
    });
    let (event_tx, _) = broadcast::channel(1000);

    // The DHT is always created, independently of `enable_mcp_server`.
    let dht_config = DHTConfigInner {
        replication_factor: config.dht_config.k_value,
        bucket_size: config.dht_config.k_value,
        alpha: config.dht_config.alpha_value,
        record_ttl: config.dht_config.record_ttl,
        bucket_refresh_interval: config.dht_config.refresh_interval,
        republish_interval: config.dht_config.refresh_interval,
        max_distance: 160,
    };
    let dht_key = crate::dht::Key::new(peer_id.as_bytes());
    let dht = Arc::new(RwLock::new(DHT::new(dht_key, dht_config)));

    let mcp_server = if config.enable_mcp_server {
        let mcp_config = config.mcp_server_config.clone().unwrap_or_else(|| {
            MCPServerConfig {
                server_name: format!("P2P-MCP-{}", peer_id),
                server_version: crate::VERSION.to_string(),
                enable_dht_discovery: true,
                ..MCPServerConfig::default()
            }
        });
        let server = MCPServer::new(mcp_config).with_dht(Arc::clone(&dht));
        Some(Arc::new(server))
    } else {
        None
    };

    let resource_manager = config
        .production_config
        .clone()
        .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));

    // A bootstrap-manager failure is not fatal: the node runs without a cache.
    let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
        match BootstrapManager::with_config(cache_config.clone()).await {
            Ok(manager) => Some(Arc::new(RwLock::new(manager))),
            Err(e) => {
                warn!("Failed to initialize bootstrap manager: {}, continuing without cache", e);
                None
            }
        }
    } else {
        match BootstrapManager::new().await {
            Ok(manager) => Some(Arc::new(RwLock::new(manager))),
            Err(e) => {
                warn!("Failed to initialize bootstrap manager: {}, continuing without cache", e);
                None
            }
        }
    };

    let transport_options = TransportOptions::default();
    let mut transport_manager = TransportManager::new(
        TransportSelection::default(),
        transport_options,
    );
    // QUIC is optional; TCP is always registered.
    match QuicTransport::new(true) {
        Ok(quic_transport) => {
            transport_manager.register_transport(Arc::new(quic_transport));
            info!("Registered QUIC transport");
        }
        Err(e) => {
            warn!("Failed to create QUIC transport: {}, continuing without QUIC", e);
        }
    }
    let tcp_transport = TcpTransport::new(false);
    transport_manager.register_transport(Arc::new(tcp_transport));
    info!("Registered TCP transport");
    let transport_manager = Arc::new(transport_manager);

    let node = Self {
        config,
        peer_id,
        peers: Arc::new(RwLock::new(HashMap::new())),
        event_tx,
        listen_addrs: RwLock::new(Vec::new()),
        start_time: Instant::now(),
        running: RwLock::new(false),
        mcp_server,
        dht: Some(dht),
        resource_manager,
        bootstrap_manager,
        transport_manager,
    };
    info!("Created P2P node with peer ID: {}", node.peer_id);
    Ok(node)
}
/// Returns a `NodeBuilder` (equivalent to `NodeBuilder::new()`).
pub fn builder() -> NodeBuilder {
NodeBuilder::new()
}
/// Returns this node's peer id.
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
/// Returns the first recorded listen address, if any.
///
/// Non-blocking: uses `try_read`, so `None` is also returned while the address
/// list is locked for writing.
pub fn local_addr(&self) -> Option<String> {
    let addrs = self.listen_addrs.try_read().ok()?;
    addrs.first().map(|addr| addr.to_string())
}
/// Logs a subscription to `topic` and returns `Ok(())`.
///
/// As written, the topic is not recorded anywhere; events are received through
/// `subscribe_events` regardless of topic.
pub async fn subscribe(&self, topic: &str) -> Result<()> {
info!("Subscribed to topic: {}", topic);
Ok(())
}
/// Sends `data` under `topic` to every currently known peer and then emits a
/// local `P2PEvent::Message` with this node as the source.
///
/// Per-peer send failures are logged and do not abort the publish; the function
/// always returns `Ok(())`.
pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
    info!("Publishing message to topic: {} ({} bytes)", topic, data.len());

    // Snapshot the peer ids so the peer-table lock is not held while sending.
    let recipients: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();

    if !recipients.is_empty() {
        let mut delivered = 0;
        for recipient in recipients.iter() {
            match self.send_message(recipient, topic, data.to_vec()).await {
                Err(e) => {
                    warn!("Failed to send message to peer {}: {}", recipient, e);
                }
                Ok(_) => {
                    delivered += 1;
                    debug!("Sent message to peer: {}", recipient);
                }
            }
        }
        info!("Published message to {}/{} connected peers", delivered, recipients.len());
    } else {
        debug!("No peers connected, message will only be sent to local subscribers");
    }

    // Local subscribers always see the message; a send error (no receivers) is ignored.
    let _ = self.event_tx.send(P2PEvent::Message {
        topic: topic.to_string(),
        source: self.peer_id.clone(),
        data: data.to_vec(),
    });
    Ok(())
}
/// Returns the configuration this node was created with.
pub fn config(&self) -> &NodeConfig {
&self.config
}
/// Starts the node: resource manager, bootstrap cache tasks, network listeners,
/// MCP server, message receiving and finally the bootstrap connections.
///
/// `running` is set to `true` before the listeners are started and is not reset
/// if a later step fails.
///
/// # Errors
/// Returns `P2PError::Network` if the resource manager or bootstrap manager
/// fails to start, `P2PError::MCP` if the MCP server fails to start, and
/// propagates errors from the later start-up steps.
pub async fn start(&self) -> Result<()> {
    info!("Starting P2P node...");
    if let Some(ref resource_manager) = self.resource_manager {
        resource_manager.start().await
            .map_err(|e| P2PError::Network(format!("Failed to start resource manager: {}", e)))?;
        info!("Production resource manager started");
    }
    if let Some(ref bootstrap_manager) = self.bootstrap_manager {
        let mut manager = bootstrap_manager.write().await;
        manager.start_background_tasks().await
            .map_err(|e| P2PError::Network(format!("Failed to start bootstrap manager: {}", e)))?;
        info!("Bootstrap cache manager started");
    }
    *self.running.write().await = true;
    self.start_network_listeners().await?;
    {
        // Keep the write guard local to this block so that `listen_addrs()` /
        // `local_addr()` are not blocked while the MCP server starts and the
        // bootstrap peers are dialed below.
        let mut listen_addrs = self.listen_addrs.write().await;
        listen_addrs.extend(self.config.listen_addrs.iter().cloned());
        info!("P2P node started on addresses: {:?}", *listen_addrs);
    }
    if let Some(ref mcp_server) = self.mcp_server {
        mcp_server.start().await
            .map_err(|e| P2PError::MCP(format!("Failed to start MCP server: {}", e)))?;
        info!("MCP server started");
    }
    self.start_message_receiving_system().await?;
    self.connect_bootstrap_peers().await?;
    Ok(())
}
/// Starts one listener per configured listen address, or on the wildcard IPv4
/// and IPv6 addresses (port 9000) when none is configured.
///
/// Addresses that cannot be converted or bound are logged and skipped; the
/// function itself always returns `Ok(())`.
async fn start_network_listeners(&self) -> Result<()> {
    info!("Starting network listeners...");
    for multiaddr in &self.config.listen_addrs {
        if let Some(socket_addr) = self.multiaddr_to_socketaddr(multiaddr) {
            if let Err(e) = self.start_listener_on_address(socket_addr).await {
                warn!("Failed to start listener on {}: {}", socket_addr, e);
            } else {
                info!("Started listener on {}", socket_addr);
            }
        } else {
            warn!("Could not parse address for listening: {}", multiaddr);
        }
    }
    if self.config.listen_addrs.is_empty() {
        // Same values as parsing "0.0.0.0:9000" and "[::]:9000", built without
        // a fallible parse.
        let default_addrs = [
            std::net::SocketAddr::from(([0, 0, 0, 0], 9000)),
            std::net::SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, 9000)),
        ];
        for addr in default_addrs.iter().copied() {
            if let Err(e) = self.start_listener_on_address(addr).await {
                warn!("Failed to start default listener on {}: {}", addr, e);
            } else {
                info!("Started default listener on {}", addr);
            }
        }
    }
    Ok(())
}
/// Binds a listener on `addr`, preferring QUIC and falling back to TCP.
///
/// On success the node's listen-address list is *replaced* by the addresses
/// the transport reports, and a connection acceptor task is started.
///
/// # Errors
/// Returns the TCP listen error when both QUIC and TCP fail, or any error from
/// starting the acceptor.
async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
    use crate::transport::{TransportType, Transport};

    // First choice: QUIC. Any failure here falls through to TCP below.
    match crate::transport::QuicTransport::new(true) {
        Err(e) => {
            warn!("Failed to create QUIC transport for listening: {}", e);
        }
        Ok(quic_transport) => match quic_transport.listen(addr).await {
            Err(e) => {
                warn!("Failed to start QUIC listener on {}: {}", addr, e);
            }
            Ok(bound) => {
                info!("QUIC listener started on {} -> {:?}", addr, bound);
                {
                    // Scoped so the lock is released before the acceptor starts.
                    let mut recorded = self.listen_addrs.write().await;
                    recorded.clear();
                    recorded.extend(bound);
                }
                self.start_connection_acceptor(
                    Arc::new(quic_transport),
                    addr,
                    TransportType::QUIC,
                )
                .await?;
                return Ok(());
            }
        },
    }

    // Fallback: TCP.
    let tcp_transport = crate::transport::TcpTransport::new(false);
    let bound = match tcp_transport.listen(addr).await {
        Ok(bound) => bound,
        Err(e) => {
            warn!("Failed to start TCP listener on {}: {}", addr, e);
            return Err(e);
        }
    };
    info!("TCP listener started on {} -> {:?}", addr, bound);
    {
        let mut recorded = self.listen_addrs.write().await;
        recorded.clear();
        recorded.extend(bound);
    }
    self.start_connection_acceptor(Arc::new(tcp_transport), addr, TransportType::TCP)
        .await?;
    Ok(())
}
/// Spawns a background task that accepts connections on `transport` forever.
///
/// For every accepted connection the task:
/// 1. derives a peer id `peer_from_<remote addr with '/' and ':' replaced by '_'>`,
/// 2. emits `P2PEvent::PeerConnected` and inserts a `PeerInfo` into the peer table,
/// 3. spawns a per-connection receive loop that forwards messages to
///    `handle_received_message` and removes the peer (emitting
///    `P2PEvent::PeerDisconnected`) once the connection is no longer alive.
///
/// Accept errors are logged and retried after one second. Returns `Ok(())`
/// as soon as the task has been spawned.
async fn start_connection_acceptor(
    &self,
    transport: Arc<dyn crate::transport::Transport>,
    addr: std::net::SocketAddr,
    transport_type: crate::transport::TransportType
) -> Result<()> {
    info!("Starting connection acceptor for {:?} on {}", transport_type, addr);
    let event_tx = self.event_tx.clone();
    let peers = Arc::clone(&self.peers);
    tokio::spawn(async move {
        loop {
            match transport.accept().await {
                Ok(mut connection) => {
                    let remote_addr = connection.remote_addr();
                    let connection_peer_id = format!("peer_from_{}",
                        remote_addr.replace("/", "_").replace(":", "_"));
                    info!("Accepted {:?} connection from {} (peer: {})",
                        transport_type, remote_addr, connection_peer_id);
                    let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
                    {
                        // Scoped so the peer-table lock is released before the
                        // receive task is spawned.
                        let mut peers_guard = peers.write().await;
                        let peer_info = PeerInfo {
                            peer_id: connection_peer_id.clone(),
                            addresses: vec![remote_addr.clone()],
                            connected_at: tokio::time::Instant::now(),
                            last_seen: tokio::time::Instant::now(),
                            status: ConnectionStatus::Connected,
                            protocols: vec!["p2p-chat/1.0.0".to_string()],
                        };
                        peers_guard.insert(connection_peer_id.clone(), peer_info);
                    }
                    let connection_event_tx = event_tx.clone();
                    let connection_peer_id_clone = connection_peer_id.clone();
                    let connection_peers = Arc::clone(&peers);
                    // Per-connection receive loop.
                    tokio::spawn(async move {
                        loop {
                            match connection.receive().await {
                                Ok(message_data) => {
                                    debug!("Received {} bytes from peer: {}",
                                        message_data.len(), connection_peer_id_clone);
                                    if let Err(e) = Self::handle_received_message(
                                        message_data,
                                        &connection_peer_id_clone,
                                        &connection_event_tx
                                    ).await {
                                        warn!("Failed to handle message from {}: {}",
                                            connection_peer_id_clone, e);
                                    }
                                }
                                Err(e) => {
                                    warn!("Failed to receive message from {}: {}",
                                        connection_peer_id_clone, e);
                                    if !connection.is_alive().await {
                                        info!("Connection to {} is dead, removing peer",
                                            connection_peer_id_clone);
                                        {
                                            let mut peers_guard = connection_peers.write().await;
                                            peers_guard.remove(&connection_peer_id_clone);
                                        }
                                        let _ = connection_event_tx.send(
                                            P2PEvent::PeerDisconnected(connection_peer_id_clone.clone())
                                        );
                                        break;
                                    }
                                    // Connection still alive: back off briefly, then retry.
                                    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                                }
                            }
                        }
                    });
                }
                Err(e) => {
                    warn!("Failed to accept {:?} connection on {}: {}", transport_type, addr, e);
                    tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
                }
            }
        }
    });
    info!("Connection acceptor background task started for {:?} on {}", transport_type, addr);
    Ok(())
}
/// Placeholder start-up step: only logs a message and returns `Ok(())`.
/// Message receiving itself is done by the tasks spawned in
/// `start_connection_acceptor`.
async fn start_message_receiving_system(&self) -> Result<()> {
info!("Message receiving system initialized (background tasks simplified for demo)");
Ok(())
}
/// Decodes a raw message received from a peer and, when well-formed, emits a
/// `P2PEvent::Message` on `event_tx`.
///
/// The payload must be a JSON object with a string `protocol`, an array `data`
/// and a string `from` (the shape produced by `create_protocol_message`).
/// Messages that are not valid JSON are logged; messages lacking any of the
/// three fields are ignored. Elements of `data` that are not integers in
/// `0..=255` are dropped.
///
/// `peer_id` (the connection-level id) is used only for logging; the event's
/// `source` is the sender-supplied `from` field.
///
/// Always returns `Ok(())`.
async fn handle_received_message(
    message_data: Vec<u8>,
    peer_id: &PeerId,
    event_tx: &broadcast::Sender<P2PEvent>
) -> Result<()> {
    match serde_json::from_slice::<serde_json::Value>(&message_data) {
        Ok(message) => {
            if let (Some(protocol), Some(data), Some(from)) = (
                message.get("protocol").and_then(|v| v.as_str()),
                message.get("data").and_then(|v| v.as_array()),
                message.get("from").and_then(|v| v.as_str())
            ) {
                // The input is untrusted: only accept values that really fit in
                // a byte instead of letting `as u8` wrap e.g. 300 into 44.
                let data_bytes: Vec<u8> = data.iter()
                    .filter_map(|v| {
                        v.as_u64()
                            .filter(|n| *n <= u64::from(u8::MAX))
                            .map(|n| n as u8)
                    })
                    .collect();
                let event = P2PEvent::Message {
                    topic: protocol.to_string(),
                    source: from.to_string(),
                    data: data_bytes,
                };
                let _ = event_tx.send(event);
                debug!("Generated message event from peer: {}", peer_id);
            }
        }
        Err(e) => {
            warn!("Failed to parse received message from {}: {}", peer_id, e);
        }
    }
    Ok(())
}
/// Converts a `/ip4/<ip>/<proto>/<port>` or `/ip6/<ip>/<proto>/<port>` address
/// into a `SocketAddr`.
///
/// The protocol segment is not inspected. Returns `None` for any other prefix,
/// for fewer than four segments after the leading slash, or when the IP or
/// port does not parse.
fn multiaddr_to_socketaddr(&self, multiaddr: &Multiaddr) -> Option<std::net::SocketAddr> {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let text = multiaddr.to_string();
    let is_v4 = text.starts_with("/ip4/");
    if !is_v4 && !text.starts_with("/ip6/") {
        return None;
    }

    // Splitting "/ip4/1.2.3.4/tcp/9000" on '/' yields
    // ["", "ip4", "1.2.3.4", "tcp", "9000"]: host is index 2, port is index 4.
    let mut segments = text.split('/');
    let host = segments.nth(2)?;
    let port = segments.nth(1)?.parse::<u16>().ok()?;

    let ip = if is_v4 {
        IpAddr::V4(host.parse::<Ipv4Addr>().ok()?)
    } else {
        IpAddr::V6(host.parse::<Ipv6Addr>().ok()?)
    };
    Some(SocketAddr::new(ip, port))
}
/// Runs the node until `stop` clears the running flag.
///
/// Starts the node first if it is not already running, then executes
/// `periodic_tasks` roughly every 100 ms.
///
/// # Errors
/// Propagates errors from `start` and `periodic_tasks`.
pub async fn run(&self) -> Result<()> {
    let already_running = *self.running.read().await;
    if !already_running {
        self.start().await?;
    }
    info!("P2P node running...");
    while *self.running.read().await {
        self.periodic_tasks().await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    info!("P2P node stopped");
    Ok(())
}
/// Stops the node: clears the running flag, shuts down the MCP server,
/// disconnects all peers and shuts down the resource manager, in that order.
///
/// # Errors
/// Returns `P2PError::MCP` / `P2PError::Network` when the respective shutdown
/// fails; later steps are then skipped.
pub async fn stop(&self) -> Result<()> {
    info!("Stopping P2P node...");
    {
        let mut running = self.running.write().await;
        *running = false;
    }

    if let Some(server) = self.mcp_server.as_ref() {
        server.shutdown().await
            .map_err(|e| P2PError::MCP(format!("Failed to shutdown MCP server: {}", e)))?;
        info!("MCP server stopped");
    }

    self.disconnect_all_peers().await?;

    if let Some(manager) = self.resource_manager.as_ref() {
        manager.shutdown().await
            .map_err(|e| P2PError::Network(format!("Failed to shutdown resource manager: {}", e)))?;
        info!("Production resource manager stopped");
    }

    info!("P2P node stopped");
    Ok(())
}
/// Returns the current value of the running flag (set by `start`, cleared by `stop`).
pub async fn is_running(&self) -> bool {
*self.running.read().await
}
/// Returns a copy of the recorded listen addresses.
pub async fn listen_addrs(&self) -> Vec<Multiaddr> {
self.listen_addrs.read().await.clone()
}
/// Returns the ids of all peers currently in the peer table (in map iteration order).
pub async fn connected_peers(&self) -> Vec<PeerId> {
self.peers.read().await.keys().cloned().collect()
}
/// Returns the number of entries in the peer table.
pub async fn peer_count(&self) -> usize {
self.peers.read().await.len()
}
/// Returns a clone of the stored `PeerInfo` for `peer_id`, or `None` if unknown.
pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
self.peers.read().await.get(peer_id).cloned()
}
/// Dials `address`, registers the peer in the peer table and emits
/// `P2PEvent::PeerConnected`.
///
/// When the transport-level connect fails, a placeholder id
/// `peer_from_<address with '/' and ':' replaced by '_'>` is used and the peer
/// is still registered as connected ("demo" behaviour).
///
/// # Errors
/// Fails if a connection slot cannot be acquired from the resource manager or
/// if `address` does not parse as a `Multiaddr`.
///
/// NOTE(review): the acquired connection guard is a local and is released when
/// this function returns, not when the peer disconnects — confirm intent.
pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
    info!("Connecting to peer at: {}", address);

    let _connection_guard = match self.resource_manager {
        Some(ref manager) => Some(manager.acquire_connection().await?),
        None => None,
    };

    let multiaddr: Multiaddr = address
        .parse()
        .map_err(|e| P2PError::Transport(format!("Invalid address format: {}", e)))?;

    let peer_id = match self.transport_manager.connect(&multiaddr).await {
        Err(e) => {
            warn!("Failed to connect to peer at {}: {}", address, e);
            let demo_peer_id = format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
            warn!("Using demo peer ID: {} (transport connection failed)", demo_peer_id);
            demo_peer_id
        }
        Ok(connected_peer_id) => {
            info!("Successfully connected to peer: {}", connected_peer_id);
            connected_peer_id
        }
    };

    let entry = PeerInfo {
        peer_id: peer_id.clone(),
        addresses: vec![address.to_string()],
        connected_at: Instant::now(),
        last_seen: Instant::now(),
        status: ConnectionStatus::Connected,
        protocols: vec!["p2p-foundation/1.0".to_string()],
    };
    self.peers.write().await.insert(peer_id.clone(), entry);

    if let Some(manager) = self.resource_manager.as_ref() {
        manager.record_bandwidth(0, 0);
    }

    let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
    info!("Connected to peer: {}", peer_id);
    Ok(peer_id)
}
/// Removes `peer_id` from the peer table and, if it was present, emits
/// `P2PEvent::PeerDisconnected`. Unknown peers are ignored.
///
/// Always returns `Ok(())`.
pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
    info!("Disconnecting from peer: {}", peer_id);
    match self.peers.write().await.remove(peer_id) {
        Some(mut removed) => {
            removed.status = ConnectionStatus::Disconnected;
            let _ = self.event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
            info!("Disconnected from peer: {}", peer_id);
        }
        None => {}
    }
    Ok(())
}
/// Sends `data` to `peer_id` under `protocol`.
///
/// Steps: rate-limit check (production mode only), connected check, local MCP
/// handling when `protocol` is `MCP_PROTOCOL`, bandwidth accounting, JSON
/// framing via `create_protocol_message`, and the transport send.
///
/// A transport-level send failure is logged and treated as success.
///
/// # Errors
/// Returns `P2PError::Network` when the rate limit is exceeded or the peer is
/// not in the peer table, and propagates framing errors.
pub async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
    debug!("Sending message to peer {} on protocol {}", peer_id, protocol);

    if let Some(manager) = self.resource_manager.as_ref() {
        let allowed = manager.check_rate_limit(peer_id, "message").await?;
        if !allowed {
            return Err(P2PError::Network(format!("Rate limit exceeded for peer {}", peer_id)));
        }
    }

    let is_connected = self.peers.read().await.contains_key(peer_id);
    if !is_connected {
        return Err(P2PError::Network(format!("Peer {} not connected", peer_id)));
    }

    if protocol == MCP_PROTOCOL {
        if let Some(server) = self.mcp_server.as_ref() {
            debug!("Handling MCP message locally for demonstration");
            // The generated response is only logged, not sent anywhere.
            if let Ok(Some(response)) = server.handle_p2p_message(&data, &self.peer_id).await {
                debug!("Generated MCP response: {} bytes", response.len());
            }
        }
    }

    if let Some(manager) = self.resource_manager.as_ref() {
        manager.record_bandwidth(data.len() as u64, 0);
    }

    let framed = self.create_protocol_message(protocol, data)?;
    if let Err(e) = self.transport_manager.send_message(peer_id, framed).await {
        warn!("Failed to send message to peer {}: {}", peer_id, e);
        debug!("Demo mode: treating send failure as success for chat compatibility");
    } else {
        debug!("Message sent to peer {} via transport layer", peer_id);
    }
    Ok(())
}
fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
use serde_json::json;
let message = json!({
"protocol": protocol,
"data": data,
"from": self.peer_id,
"timestamp": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
});
serde_json::to_vec(&message)
.map_err(|e| P2PError::Transport(format!("Failed to serialize message: {}", e)))
}
/// Returns a new receiver on the node's `P2PEvent` broadcast channel.
pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
self.event_tx.subscribe()
}
/// Time elapsed since the node was constructed (`start_time` is set in `new`).
pub fn uptime(&self) -> Duration {
self.start_time.elapsed()
}
/// Returns the MCP server handle, or `None` when MCP is disabled.
pub fn mcp_server(&self) -> Option<&Arc<MCPServer>> {
self.mcp_server.as_ref()
}
/// Registers `tool` with the local MCP server.
///
/// # Errors
/// Returns `P2PError::MCP` when the MCP server is disabled or registration fails.
pub async fn register_mcp_tool(&self, tool: Tool) -> Result<()> {
    match self.mcp_server.as_ref() {
        None => Err(P2PError::MCP("MCP server not enabled".to_string())),
        Some(server) => server
            .register_tool(tool)
            .await
            .map_err(|e| P2PError::MCP(format!("Failed to register tool: {}", e))),
    }
}
/// Calls a tool on the local MCP server with this node as the caller and a
/// 30-second timeout in the call context.
///
/// # Errors
/// Returns `P2PError::MCP` when the MCP server is disabled, when the "mcp" rate
/// limit is exceeded (production mode only), or when the tool call fails.
pub async fn call_mcp_tool(&self, tool_name: &str, arguments: Value) -> Result<Value> {
    let server = match self.mcp_server.as_ref() {
        Some(server) => server,
        None => return Err(P2PError::MCP("MCP server not enabled".to_string())),
    };

    if let Some(manager) = self.resource_manager.as_ref() {
        let allowed = manager.check_rate_limit(&self.peer_id, "mcp").await?;
        if !allowed {
            return Err(P2PError::MCP("MCP rate limit exceeded".to_string()));
        }
    }

    let context = MCPCallContext {
        caller_id: self.peer_id.clone(),
        timestamp: SystemTime::now(),
        timeout: Duration::from_secs(30),
        auth_info: None,
        metadata: HashMap::new(),
    };
    server
        .call_tool(tool_name, arguments, context)
        .await
        .map_err(|e| P2PError::MCP(format!("Tool call failed: {}", e)))
}
/// Calls `tool_name` on the remote peer `peer_id` through the MCP server.
///
/// When the server reports a `P2PError::MCP` whose message contains
/// `"network integration"`, the call is simulated by invoking the tool on the
/// local server instead.
///
/// # Errors
/// Returns `P2PError::MCP` when the MCP server is disabled; otherwise
/// propagates the error of the remote (or simulated local) call.
pub async fn call_remote_mcp_tool(&self, peer_id: &PeerId, tool_name: &str, arguments: Value) -> Result<Value> {
    let server = match self.mcp_server.as_ref() {
        Some(server) => server,
        None => return Err(P2PError::MCP("MCP server not enabled".to_string())),
    };

    let context = MCPCallContext {
        caller_id: self.peer_id.clone(),
        timestamp: SystemTime::now(),
        timeout: Duration::from_secs(30),
        auth_info: None,
        metadata: HashMap::new(),
    };

    let outcome = server
        .call_remote_tool(peer_id, tool_name, arguments.clone(), context)
        .await;
    match outcome {
        Err(P2PError::MCP(msg)) if msg.contains("network integration") => {
            info!("Simulating remote MCP call to {} on peer {}", tool_name, peer_id);
            self.call_mcp_tool(tool_name, arguments).await
        }
        Err(e) => Err(e),
        Ok(result) => Ok(result),
    }
}
/// Returns the names of the tools reported by the local MCP server's
/// `list_tools(None)`.
///
/// # Errors
/// Returns `P2PError::MCP` when the MCP server is disabled or listing fails.
pub async fn list_mcp_tools(&self) -> Result<Vec<String>> {
    let server = match self.mcp_server.as_ref() {
        Some(server) => server,
        None => return Err(P2PError::MCP("MCP server not enabled".to_string())),
    };
    let (tools, _) = server
        .list_tools(None)
        .await
        .map_err(|e| P2PError::MCP(format!("Failed to list tools: {}", e)))?;
    let names = tools.into_iter().map(|tool| tool.name).collect();
    Ok(names)
}
/// Asks the local MCP server to discover remote MCP services.
///
/// # Errors
/// Returns `P2PError::MCP` when the MCP server is disabled or discovery fails.
pub async fn discover_remote_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
    match self.mcp_server.as_ref() {
        None => Err(P2PError::MCP("MCP server not enabled".to_string())),
        Some(server) => server
            .discover_remote_services()
            .await
            .map_err(|e| P2PError::MCP(format!("Failed to discover services: {}", e))),
    }
}
/// Sends a `ListTools` MCP request to `peer_id` and returns a tool list.
///
/// NOTE(review): after sending the request, the returned list is the *local*
/// tool list (`list_mcp_tools`); no remote response is awaited here.
///
/// # Errors
/// Returns `P2PError::MCP` when the MCP server is disabled, `P2PError::Network`
/// on a clock error, `P2PError::Serialization` when the request cannot be
/// encoded, and propagates errors from `send_message` / `list_mcp_tools`.
pub async fn list_remote_mcp_tools(&self, peer_id: &PeerId) -> Result<Vec<String>> {
    if self.mcp_server.is_none() {
        return Err(P2PError::MCP("MCP server not enabled".to_string()));
    }

    let payload = crate::mcp::MCPMessage::ListTools { cursor: None };
    let envelope = crate::mcp::P2PMCPMessage {
        message_type: crate::mcp::P2PMCPMessageType::Request,
        message_id: uuid::Uuid::new_v4().to_string(),
        source_peer: self.peer_id.clone(),
        target_peer: Some(peer_id.clone()),
        timestamp: SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
            .as_secs(),
        payload,
        ttl: 5,
    };
    let encoded = serde_json::to_vec(&envelope)
        .map_err(|e| P2PError::Serialization(e))?;

    self.send_message(peer_id, MCP_PROTOCOL, encoded).await?;
    self.list_mcp_tools().await
}
/// Returns the MCP server statistics.
///
/// # Errors
/// Returns `P2PError::MCP` when the MCP server is disabled.
pub async fn mcp_stats(&self) -> Result<crate::mcp::MCPServerStats> {
    match self.mcp_server.as_ref() {
        Some(server) => Ok(server.get_stats().await),
        None => Err(P2PError::MCP("MCP server not enabled".to_string())),
    }
}
/// Returns the resource manager's current metrics.
///
/// # Errors
/// Returns `P2PError::Network` when production mode is not enabled.
pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
    match self.resource_manager.as_ref() {
        Some(manager) => Ok(manager.get_metrics().await),
        None => Err(P2PError::Network("Production resource manager not enabled".to_string())),
    }
}
/// Health check: delegates to the resource manager in production mode;
/// otherwise only verifies that the peer count does not exceed
/// `config.max_connections`.
///
/// # Errors
/// Returns the resource manager's error, or `P2PError::Network` when there are
/// too many connections.
pub async fn health_check(&self) -> Result<()> {
    match self.resource_manager.as_ref() {
        Some(manager) => manager.health_check().await,
        None => {
            let peer_count = self.peer_count().await;
            if peer_count <= self.config.max_connections {
                Ok(())
            } else {
                Err(P2PError::Network(format!("Too many connections: {}", peer_count)))
            }
        }
    }
}
/// Returns the production configuration, if one was supplied.
pub fn production_config(&self) -> Option<&ProductionConfig> {
self.config.production_config.as_ref()
}
/// `true` when a resource manager exists, i.e. a production config was supplied
/// at construction.
pub fn is_production_mode(&self) -> bool {
self.resource_manager.is_some()
}
/// Returns the shared DHT handle, if present.
pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
self.dht.as_ref()
}
/// Stores `value` under `key` in the DHT.
///
/// The DHT lock is taken in write mode for the duration of the call.
///
/// # Errors
/// Returns `P2PError::DHT` when the DHT is not available or the put fails.
pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
    if let Some(ref dht) = self.dht {
        let dht_instance = dht.write().await;
        // `key` and `value` are owned and not used afterwards, so they are
        // moved into the call instead of being cloned.
        dht_instance.put(key, value).await
            .map_err(|e| P2PError::DHT(format!("DHT put failed: {}", e)))?;
        Ok(())
    } else {
        Err(P2PError::DHT("DHT not enabled".to_string()))
    }
}
/// Looks up `key` in the DHT and returns a copy of the record's value, or
/// `None` when no record is found.
///
/// NOTE(review): the DHT lock is taken in write mode even for this lookup.
///
/// # Errors
/// Returns `P2PError::DHT` when the DHT is not available.
pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
    let dht = match self.dht.as_ref() {
        Some(dht) => dht,
        None => return Err(P2PError::DHT("DHT not enabled".to_string())),
    };
    let guard = dht.write().await;
    let found = guard.get(&key).await;
    let value = found.as_ref().map(|record| record.value.clone());
    Ok(value)
}
/// Adds a peer to the bootstrap cache. A no-op returning `Ok(())` when no
/// bootstrap manager is available.
///
/// # Errors
/// Returns `P2PError::Network` when the cache rejects the contact.
pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
    let bootstrap_manager = match self.bootstrap_manager.as_ref() {
        Some(bootstrap_manager) => bootstrap_manager,
        None => return Ok(()),
    };
    let mut manager = bootstrap_manager.write().await;
    let contact = ContactEntry::new(peer_id, addresses);
    manager.add_contact(contact).await
        .map_err(|e| P2PError::Network(format!("Failed to add peer to bootstrap cache: {}", e)))?;
    Ok(())
}
/// Replaces the cached quality metrics of `peer_id` with values derived from a
/// single connection attempt. A no-op when no bootstrap manager is available.
///
/// The metrics are fixed per outcome: success gives rate 1.0 / score 0.8,
/// failure gives rate 0.0 / score 0.2 and a "last successful connection" one
/// hour in the past. A missing latency is recorded as 0. `_error` is unused.
///
/// # Errors
/// Returns `P2PError::Network` when the cache update fails.
pub async fn update_peer_metrics(&self, peer_id: &PeerId, success: bool, latency_ms: Option<u64>, _error: Option<String>) -> Result<()> {
    let bootstrap_manager = match self.bootstrap_manager.as_ref() {
        Some(bootstrap_manager) => bootstrap_manager,
        None => return Ok(()),
    };
    let mut manager = bootstrap_manager.write().await;
    let (success_rate, quality_score) = if success { (1.0, 0.8) } else { (0.0, 0.2) };
    let metrics = QualityMetrics {
        success_rate,
        avg_latency_ms: latency_ms.unwrap_or(0) as f64,
        quality_score,
        last_connection_attempt: chrono::Utc::now(),
        last_successful_connection: if success {
            chrono::Utc::now()
        } else {
            chrono::Utc::now() - chrono::Duration::hours(1)
        },
        uptime_score: 0.5,
    };
    manager.update_contact_metrics(peer_id, metrics).await
        .map_err(|e| P2PError::Network(format!("Failed to update peer metrics: {}", e)))?;
    Ok(())
}
/// Returns bootstrap cache statistics, or `Ok(None)` when no bootstrap manager
/// is available.
///
/// # Errors
/// Returns `P2PError::Network` when the statistics cannot be obtained.
pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
    let bootstrap_manager = match self.bootstrap_manager.as_ref() {
        Some(bootstrap_manager) => bootstrap_manager,
        None => return Ok(None),
    };
    let manager = bootstrap_manager.read().await;
    let stats = manager.get_stats().await
        .map_err(|e| P2PError::Network(format!("Failed to get bootstrap stats: {}", e)))?;
    Ok(Some(stats))
}
/// Number of contacts in the bootstrap cache; `0` when the cache is
/// unavailable or its statistics cannot be read.
pub async fn cached_peer_count(&self) -> usize {
    if self.bootstrap_manager.is_none() {
        return 0;
    }
    match self.get_bootstrap_cache_stats().await {
        Ok(Some(stats)) => stats.total_contacts,
        _ => 0,
    }
}
/// Connects to bootstrap peers, preferring up to 20 cached contacts and falling
/// back to the configured peers (`bootstrap_peers_str` first, then
/// `bootstrap_peers`).
///
/// For each contact the addresses are tried in order until one connects. On
/// success the contact is written to the bootstrap cache under the peer id
/// returned by `connect_peer`; failures of cached contacts are recorded in the
/// cache as well. Individual failures are logged only: the function always
/// returns `Ok(())`.
async fn connect_bootstrap_peers(&self) -> Result<()> {
    let mut bootstrap_contacts = Vec::new();
    let mut used_cache = false;
    if let Some(ref bootstrap_manager) = self.bootstrap_manager {
        // Read guard is scoped to this block; write locks are taken below.
        let manager = bootstrap_manager.read().await;
        match manager.get_bootstrap_peers(20).await {
            Ok(contacts) => {
                if !contacts.is_empty() {
                    info!("Using {} cached bootstrap peers", contacts.len());
                    bootstrap_contacts = contacts;
                    used_cache = true;
                }
            }
            Err(e) => {
                warn!("Failed to get cached bootstrap peers: {}", e);
            }
        }
    }
    if bootstrap_contacts.is_empty() {
        // Own the list so no borrow of a temporary is needed.
        let bootstrap_peers: Vec<String> = if !self.config.bootstrap_peers_str.is_empty() {
            self.config.bootstrap_peers_str.clone()
        } else {
            self.config.bootstrap_peers.iter().map(|addr| addr.to_string()).collect()
        };
        if bootstrap_peers.is_empty() {
            info!("No bootstrap peers configured and no cached peers available");
            return Ok(());
        }
        info!("Using {} configured bootstrap peers", bootstrap_peers.len());
        for addr in bootstrap_peers {
            // The real peer id is unknown until connected; use a placeholder
            // built from the first 8 characters of the address.
            let placeholder_id = format!("unknown_peer_{}", addr.chars().take(8).collect::<String>());
            bootstrap_contacts.push(ContactEntry::new(placeholder_id, vec![addr]));
        }
    }
    let mut successful_connections = 0;
    for contact in bootstrap_contacts {
        for addr in &contact.addresses {
            match self.connect_peer(addr).await {
                Ok(peer_id) => {
                    info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
                    successful_connections += 1;
                    if let Some(ref bootstrap_manager) = self.bootstrap_manager {
                        let mut manager = bootstrap_manager.write().await;
                        let mut updated_contact = contact.clone();
                        updated_contact.peer_id = peer_id.clone();
                        updated_contact.update_connection_result(true, Some(100), None);
                        if let Err(e) = manager.add_contact(updated_contact).await {
                            warn!("Failed to update bootstrap cache: {}", e);
                        }
                    }
                    // One successful address per contact is enough.
                    break;
                }
                Err(e) => {
                    warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
                    if used_cache {
                        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
                            let mut manager = bootstrap_manager.write().await;
                            let mut updated_contact = contact.clone();
                            updated_contact.update_connection_result(false, None, Some(e.to_string()));
                            if let Err(e) = manager.add_contact(updated_contact).await {
                                warn!("Failed to update bootstrap cache: {}", e);
                            }
                        }
                    }
                }
            }
        }
    }
    // Zero successes is always a warning, whether or not the cache was used.
    if successful_connections == 0 {
        warn!("Failed to connect to any bootstrap peers");
    } else {
        info!("Successfully connected to {} bootstrap peers", successful_connections);
    }
    Ok(())
}
/// Disconnects every peer currently in the peer table.
///
/// The ids are snapshotted first so the read lock is released before
/// `disconnect_peer` takes the write lock.
async fn disconnect_all_peers(&self) -> Result<()> {
    let snapshot: Vec<PeerId> = {
        let table = self.peers.read().await;
        table.keys().cloned().collect()
    };
    for id in snapshot.iter() {
        self.disconnect_peer(id).await?;
    }
    Ok(())
}
/// Hook invoked on every iteration of `run`; currently does nothing and
/// returns `Ok(())`.
async fn periodic_tasks(&self) -> Result<()> {
Ok(())
}
}
/// Fluent builder that accumulates a `NodeConfig` and creates a `P2PNode`.
pub struct NodeBuilder {
// Configuration being assembled; starts from `NodeConfig::default()`.
config: NodeConfig,
}
impl NodeBuilder {
/// Creates a builder holding `NodeConfig::default()`.
pub fn new() -> Self {
Self {
config: NodeConfig::default(),
}
}
/// Sets a fixed peer id instead of letting the node generate one.
pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
self.config.peer_id = Some(peer_id);
self
}
/// Appends `addr` to the listen addresses (the default addresses are kept).
pub fn listen_on(mut self, addr: &str) -> Self {
self.config.listen_addrs.push(addr.to_string());
self
}
/// Appends `addr` to `bootstrap_peers` (not to `bootstrap_peers_str`).
pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
self.config.bootstrap_peers.push(addr.to_string());
self
}
/// Sets the `enable_ipv6` flag.
pub fn with_ipv6(mut self, enable: bool) -> Self {
self.config.enable_ipv6 = enable;
self
}
/// Enables the MCP server (already the default).
pub fn with_mcp_server(mut self) -> Self {
self.config.enable_mcp_server = true;
self
}
/// Supplies an explicit MCP server configuration and enables the MCP server.
pub fn with_mcp_config(mut self, mcp_config: MCPServerConfig) -> Self {
self.config.mcp_server_config = Some(mcp_config);
self.config.enable_mcp_server = true;
self
}
/// Sets `connection_timeout`.
pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
self.config.connection_timeout = timeout;
self
}
/// Sets `max_connections`.
pub fn with_max_connections(mut self, max: usize) -> Self {
self.config.max_connections = max;
self
}
/// Enables production mode with `ProductionConfig::default()`.
pub fn with_production_mode(mut self) -> Self {
self.config.production_config = Some(ProductionConfig::default());
self
}
/// Enables production mode with the given configuration.
pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
self.config.production_config = Some(production_config);
self
}
/// Consumes the builder and constructs the node via `P2PNode::new`.
pub async fn build(self) -> Result<P2PNode> {
P2PNode::new(self.config).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mcp::{Tool, MCPTool, ToolHandler, ToolMetadata, ToolHealthStatus, ToolRequirements};
use serde_json::json;
use std::pin::Pin;
use std::future::Future;
use std::time::Duration;
use tokio::time::timeout;
/// Tool handler used by the tests in this module: it echoes its arguments
/// back together with its own name and a fixed result string.
struct NetworkTestTool {
    name: String,
}

impl NetworkTestTool {
    /// Builds a handler that reports `name` in its responses.
    fn new(name: &str) -> Self {
        let name = String::from(name);
        Self { name }
    }
}

impl ToolHandler for NetworkTestTool {
    /// Always succeeds, returning the tool name, the received arguments and
    /// a constant result marker.
    fn execute(&self, arguments: serde_json::Value) -> Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + '_>> {
        // Clone up front so the future owns everything it needs.
        let tool_name = self.name.clone();
        Box::pin(async move {
            let payload = json!({
                "tool": tool_name,
                "input": arguments,
                "result": "network test success"
            });
            Ok(payload)
        })
    }

    /// Accepts any arguments.
    fn validate(&self, _arguments: &serde_json::Value) -> Result<()> {
        Ok(())
    }

    /// Uses the default requirements.
    fn get_requirements(&self) -> ToolRequirements {
        ToolRequirements::default()
    }
}
/// Builds the `NodeConfig` shared by the tests: fixed peer id, loopback
/// listeners on port 9001, MCP enabled with auth and rate limiting off, and
/// no production or bootstrap-cache configuration.
fn create_test_node_config() -> NodeConfig {
    let mcp_server_config = MCPServerConfig {
        enable_auth: false,
        enable_rate_limiting: false,
        ..Default::default()
    };
    let listen_addrs = ["/ip6/::1/tcp/9001", "/ip4/127.0.0.1/tcp/9001"]
        .iter()
        .map(|addr| addr.to_string())
        .collect();
    // Same value as parsing "127.0.0.1:9001".
    let listen_addr = std::net::SocketAddr::from(([127, 0, 0, 1], 9001));

    NodeConfig {
        peer_id: Some(String::from("test_peer_123")),
        listen_addrs,
        listen_addr,
        bootstrap_peers: Vec::new(),
        bootstrap_peers_str: Vec::new(),
        enable_ipv6: true,
        enable_mcp_server: true,
        mcp_server_config: Some(mcp_server_config),
        connection_timeout: Duration::from_secs(10),
        keep_alive_interval: Duration::from_secs(30),
        max_connections: 100,
        max_incoming_connections: 50,
        dht_config: DHTConfig::default(),
        security_config: SecurityConfig::default(),
        production_config: None,
        bootstrap_cache_config: None,
    }
}
/// Builds a `Tool` named `name` that is backed by `NetworkTestTool` and
/// starts with fresh metadata (no calls yet, healthy, tagged "test").
fn create_test_tool(name: &str) -> Tool {
    let input_schema = json!({
        "type": "object",
        "properties": {
            "input": { "type": "string" }
        }
    });
    let definition = MCPTool {
        name: name.to_string(),
        description: format!("Test tool: {}", name),
        input_schema,
    };
    let metadata = ToolMetadata {
        created_at: SystemTime::now(),
        last_called: None,
        call_count: 0,
        avg_execution_time: Duration::from_millis(0),
        health_status: ToolHealthStatus::Healthy,
        tags: vec![String::from("test")],
    };

    Tool {
        definition,
        handler: Box::new(NetworkTestTool::new(name)),
        metadata,
    }
}
#[tokio::test]
async fn test_node_config_default() {
let config = NodeConfig::default();
assert!(config.peer_id.is_none());
assert_eq!(config.listen_addrs.len(), 2);
assert!(config.enable_ipv6);
assert!(config.enable_mcp_server);
assert_eq!(config.max_connections, 1000);
assert_eq!(config.max_incoming_connections, 100);
assert_eq!(config.connection_timeout, Duration::from_secs(30));
}
#[tokio::test]
async fn test_dht_config_default() {
let config = DHTConfig::default();
assert_eq!(config.k_value, 20);
assert_eq!(config.alpha_value, 5);
assert_eq!(config.record_ttl, Duration::from_secs(3600));
assert_eq!(config.refresh_interval, Duration::from_secs(600));
}
#[tokio::test]
async fn test_security_config_default() {
let config = SecurityConfig::default();
assert!(config.enable_noise);
assert!(config.enable_tls);
assert_eq!(config.trust_level, TrustLevel::Basic);
}
/// Every `TrustLevel` variant can be constructed, equals itself, and
/// `None` differs from `Basic`.
#[test]
fn test_trust_level_variants() {
    // Construction check for all three variants.
    let _variants = [TrustLevel::None, TrustLevel::Basic, TrustLevel::Full];

    // Reflexive equality, variant by variant.
    let same_pairs = [
        (TrustLevel::None, TrustLevel::None),
        (TrustLevel::Basic, TrustLevel::Basic),
        (TrustLevel::Full, TrustLevel::Full),
    ];
    for (left, right) in same_pairs.iter() {
        assert_eq!(left, right);
    }

    assert_ne!(TrustLevel::None, TrustLevel::Basic);
}
/// Exercises construction and equality of the `ConnectionStatus` variants,
/// including the message carried by `Failed`.
#[test]
fn test_connection_status_variants() {
    let connecting = ConnectionStatus::Connecting;
    let connected = ConnectionStatus::Connected;
    let disconnecting = ConnectionStatus::Disconnecting;
    let disconnected = ConnectionStatus::Disconnected;
    let failed = ConnectionStatus::Failed(String::from("test error"));

    assert_eq!(connecting, ConnectionStatus::Connecting);
    assert_eq!(connected, ConnectionStatus::Connected);
    assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
    assert_eq!(disconnected, ConnectionStatus::Disconnected);
    assert_ne!(connecting, connected);

    // The payload of `Failed` must be the message it was built with.
    match failed {
        ConnectionStatus::Failed(message) => assert_eq!(message, "test error"),
        _ => panic!("Expected Failed status"),
    }
}
/// A freshly created node keeps the configured peer id, is not running and
/// has no peers.
#[tokio::test]
async fn test_node_creation() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;

    assert_eq!(node.peer_id(), "test_peer_123");
    assert!(!node.is_running().await);
    assert_eq!(node.peer_count().await, 0);
    assert!(node.connected_peers().await.is_empty());
    Ok(())
}
#[tokio::test]
async fn test_node_creation_without_peer_id() -> Result<()> {
let mut config = create_test_node_config();
config.peer_id = None;
let node = P2PNode::new(config).await?;
assert!(node.peer_id().starts_with("peer_"));
assert!(!node.is_running().await);
Ok(())
}
/// Start/stop cycle: the running flag follows `start` and `stop`, and two
/// listen addresses are reported while running.
#[tokio::test]
async fn test_node_lifecycle() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;
    assert!(!node.is_running().await);

    node.start().await?;
    assert!(node.is_running().await);
    let bound = node.listen_addrs().await;
    assert_eq!(bound.len(), 2);

    node.stop().await?;
    assert!(!node.is_running().await);
    Ok(())
}
/// Connecting to a peer registers it with the expected info; disconnecting
/// removes it again.
#[tokio::test]
async fn test_peer_connection() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;

    let target = String::from("/ip4/127.0.0.1/tcp/9002");
    let peer_id = node.connect_peer(&target).await?;
    assert!(peer_id.starts_with("peer_from_"));
    assert_eq!(node.peer_count().await, 1);

    let peers = node.connected_peers().await;
    assert_eq!(peers.len(), 1);
    assert_eq!(peers[0], peer_id);

    let lookup = node.peer_info(&peer_id).await;
    assert!(lookup.is_some());
    let info = lookup.unwrap();
    assert_eq!(info.peer_id, peer_id);
    assert_eq!(info.status, ConnectionStatus::Connected);
    let expected_protocol = String::from("p2p-foundation/1.0");
    assert!(info.protocols.contains(&expected_protocol));

    node.disconnect_peer(&peer_id).await?;
    assert_eq!(node.peer_count().await, 0);
    Ok(())
}
/// A subscriber sees `PeerConnected` after a connect and `PeerDisconnected`
/// after the matching disconnect, each within 100 ms.
#[tokio::test]
async fn test_event_subscription() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;
    let mut events = node.subscribe_events();
    let wait = Duration::from_millis(100);

    let target = String::from("/ip4/127.0.0.1/tcp/9003");
    let peer_id = node.connect_peer(&target).await?;

    let on_connect = timeout(wait, events.recv()).await;
    assert!(on_connect.is_ok());
    match on_connect.unwrap().unwrap() {
        P2PEvent::PeerConnected(seen) => assert_eq!(seen, peer_id),
        _ => panic!("Expected PeerConnected event"),
    }

    node.disconnect_peer(&peer_id).await?;

    let on_disconnect = timeout(wait, events.recv()).await;
    assert!(on_disconnect.is_ok());
    match on_disconnect.unwrap().unwrap() {
        P2PEvent::PeerDisconnected(seen) => assert_eq!(seen, peer_id),
        _ => panic!("Expected PeerDisconnected event"),
    }
    Ok(())
}
/// Sending to a connected peer succeeds; sending to an unknown peer fails
/// with an error mentioning "not connected".
#[tokio::test]
async fn test_message_sending() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;
    let target = String::from("/ip4/127.0.0.1/tcp/9004");
    let peer_id = node.connect_peer(&target).await?;

    // Happy path: the peer is connected.
    let payload = b"Hello, peer!".to_vec();
    let sent = node.send_message(&peer_id, "test-protocol", payload).await;
    assert!(sent.is_ok());

    // Failure path: nobody by that id is connected.
    let stranger = String::from("non_existent_peer");
    let rejected = node.send_message(&stranger, "test-protocol", Vec::new()).await;
    assert!(rejected.is_err());
    let reason = rejected.unwrap_err().to_string();
    assert!(reason.contains("not connected"));
    Ok(())
}
/// Local MCP round trip: register a tool, list it, call it, read the stats,
/// and check that calling an unknown tool fails.
#[tokio::test]
async fn test_mcp_integration() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;
    node.start().await?;

    node.register_mcp_tool(create_test_tool("network_test_tool")).await?;

    let listed = node.list_mcp_tools().await?;
    let wanted = String::from("network_test_tool");
    assert!(listed.contains(&wanted));

    let arguments = json!({"input": "test_input"});
    let reply = node.call_mcp_tool("network_test_tool", arguments.clone()).await?;
    assert_eq!(reply["tool"], "network_test_tool");
    assert_eq!(reply["input"], arguments);

    let stats = node.mcp_stats().await?;
    assert_eq!(stats.total_tools, 1);

    let missing = node.call_mcp_tool("non_existent_tool", json!({})).await;
    assert!(missing.is_err());

    node.stop().await?;
    Ok(())
}
/// Remote MCP calls against a connected peer: listing returns something,
/// calling the registered tool echoes its name, and service discovery
/// returns nothing.
#[tokio::test]
async fn test_remote_mcp_operations() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;
    node.start().await?;
    node.register_mcp_tool(create_test_tool("remote_test_tool")).await?;

    let target = String::from("/ip4/127.0.0.1/tcp/9005");
    let peer_id = node.connect_peer(&target).await?;

    let advertised = node.list_remote_mcp_tools(&peer_id).await?;
    assert!(!advertised.is_empty());

    let arguments = json!({"input": "remote_test"});
    let reply = node
        .call_remote_mcp_tool(&peer_id, "remote_test_tool", arguments.clone())
        .await?;
    assert_eq!(reply["tool"], "remote_test_tool");

    let discovered = node.discover_remote_mcp_services().await?;
    assert!(discovered.is_empty());

    node.stop().await?;
    Ok(())
}
/// The health check passes both with no peers and after five peers have
/// been connected.
#[tokio::test]
async fn test_health_check() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;
    assert!(node.health_check().await.is_ok());

    // Ports 9010 through 9014, one peer each.
    for port in 9010..9015 {
        let target = format!("/ip4/127.0.0.1/tcp/{}", port);
        node.connect_peer(&target).await?;
    }

    assert!(node.health_check().await.is_ok());
    Ok(())
}
/// Uptime is non-negative and strictly grows across a 10 ms sleep.
#[tokio::test]
async fn test_node_uptime() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;

    let before = node.uptime();
    assert!(before >= Duration::from_secs(0));

    let pause = Duration::from_millis(10);
    tokio::time::sleep(pause).await;

    let after = node.uptime();
    assert!(after > before);
    Ok(())
}
/// `config()` exposes the configuration the node was created with.
#[tokio::test]
async fn test_node_config_access() -> Result<()> {
    let supplied = create_test_node_config();
    // Keep a copy of the id: `supplied` is moved into the node below.
    let wanted_peer_id = supplied.peer_id.clone();
    let node = P2PNode::new(supplied).await?;

    let seen = node.config();
    assert_eq!(seen.peer_id, wanted_peer_id);
    assert_eq!(seen.max_connections, 100);
    assert!(seen.enable_mcp_server);
    Ok(())
}
#[tokio::test]
async fn test_mcp_server_access() -> Result<()> {
let config = create_test_node_config();
let node = P2PNode::new(config).await?;
assert!(node.mcp_server().is_some());
let mut config = create_test_node_config();
config.enable_mcp_server = false;
let node_no_mcp = P2PNode::new(config).await?;
assert!(node_no_mcp.mcp_server().is_none());
Ok(())
}
/// A node built from the test config exposes a DHT handle.
#[tokio::test]
async fn test_dht_access() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;
    let dht = node.dht();
    assert!(dht.is_some());
    Ok(())
}
/// Every builder setter used here must be reflected in the built node's
/// configuration.
#[tokio::test]
async fn test_node_builder() -> Result<()> {
    let builder = P2PNode::builder()
        .with_peer_id(String::from("builder_test_peer"))
        .listen_on("/ip4/127.0.0.1/tcp/9100")
        .listen_on("/ip6/::1/tcp/9100")
        .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9101")
        .with_ipv6(true)
        .with_mcp_server()
        .with_connection_timeout(Duration::from_secs(15))
        .with_max_connections(200);
    let node = builder.build().await?;

    assert_eq!(node.peer_id(), "builder_test_peer");

    let built = node.config();
    // Four entries: the two `listen_on` calls append to the existing list.
    assert_eq!(built.listen_addrs.len(), 4);
    assert_eq!(built.bootstrap_peers.len(), 1);
    assert!(built.enable_ipv6);
    assert!(built.enable_mcp_server);
    assert_eq!(built.connection_timeout, Duration::from_secs(15));
    assert_eq!(built.max_connections, 200);
    Ok(())
}
/// `with_mcp_config` stores the supplied MCP configuration and turns the
/// MCP server on.
#[tokio::test]
async fn test_node_builder_with_mcp_config() -> Result<()> {
    let supplied = MCPServerConfig {
        server_name: String::from("test_mcp_server"),
        server_version: String::from("1.0.0"),
        enable_dht_discovery: false,
        enable_auth: false,
        ..MCPServerConfig::default()
    };

    let node = P2PNode::builder()
        .with_peer_id(String::from("mcp_config_test"))
        .with_mcp_config(supplied)
        .build()
        .await?;
    assert_eq!(node.peer_id(), "mcp_config_test");

    let built = node.config();
    assert!(built.enable_mcp_server);
    assert!(built.mcp_server_config.is_some());

    let stored = built.mcp_server_config.as_ref().unwrap();
    assert_eq!(stored.server_name, "test_mcp_server");
    assert!(!stored.enable_auth);
    Ok(())
}
#[tokio::test]
async fn test_mcp_server_not_enabled_errors() -> Result<()> {
let mut config = create_test_node_config();
config.enable_mcp_server = false;
let node = P2PNode::new(config).await?;
let tool = create_test_tool("test_tool");
let result = node.register_mcp_tool(tool).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
let result = node.call_mcp_tool("test_tool", json!({})).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
let result = node.list_mcp_tools().await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
let result = node.mcp_stats().await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
Ok(())
}
#[tokio::test]
async fn test_bootstrap_peers() -> Result<()> {
let mut config = create_test_node_config();
config.bootstrap_peers = vec![
"/ip4/127.0.0.1/tcp/9200".to_string(),
"/ip4/127.0.0.1/tcp/9201".to_string(),
];
let node = P2PNode::new(config).await?;
node.start().await?;
let peer_count = node.peer_count().await;
assert!(peer_count <= 2, "Peer count should not exceed bootstrap peer count");
node.stop().await?;
Ok(())
}
/// Without a production config the node reports production mode as off and
/// resource metrics are unavailable.
#[tokio::test]
async fn test_production_mode_disabled() -> Result<()> {
    let node = P2PNode::new(create_test_node_config()).await?;

    assert!(!node.is_production_mode());
    assert!(node.production_config().is_none());

    let metrics = node.resource_metrics().await;
    assert!(metrics.is_err());
    let reason = metrics.unwrap_err().to_string();
    assert!(reason.contains("not enabled"));
    Ok(())
}
/// Construction check: every `NetworkEvent` variant listed here can be
/// built with its fields. Nothing is asserted beyond that.
#[tokio::test]
async fn test_network_event_variants() {
    let peer = String::from("test_peer");
    let addr = String::from("/ip4/127.0.0.1/tcp/9000");

    let _events = [
        NetworkEvent::PeerConnected {
            peer_id: peer.clone(),
            addresses: vec![addr.clone()],
        },
        NetworkEvent::PeerDisconnected {
            peer_id: peer.clone(),
            reason: String::from("test disconnect"),
        },
        NetworkEvent::MessageReceived {
            peer_id: peer.clone(),
            protocol: String::from("test-protocol"),
            data: vec![1, 2, 3],
        },
        NetworkEvent::ConnectionFailed {
            peer_id: Some(peer.clone()),
            address: addr.clone(),
            error: String::from("connection refused"),
        },
        NetworkEvent::DHTRecordStored {
            key: vec![1, 2, 3],
            value: vec![4, 5, 6],
        },
        NetworkEvent::DHTRecordRetrieved {
            key: vec![1, 2, 3],
            value: Some(vec![4, 5, 6]),
        },
    ];
}
/// A hand-built `PeerInfo` exposes the values it was constructed with.
#[tokio::test]
async fn test_peer_info_structure() {
    let addresses = vec![String::from("/ip4/127.0.0.1/tcp/9000")];
    let protocols = vec![String::from("test-protocol")];

    let info = PeerInfo {
        peer_id: String::from("test_peer"),
        addresses,
        connected_at: Instant::now(),
        last_seen: Instant::now(),
        status: ConnectionStatus::Connected,
        protocols,
    };

    assert_eq!(info.peer_id, "test_peer");
    assert_eq!(info.addresses.len(), 1);
    assert_eq!(info.status, ConnectionStatus::Connected);
    assert_eq!(info.protocols.len(), 1);
}
/// A `NodeConfig` survives a JSON round trip for the fields checked here.
#[tokio::test]
async fn test_serialization() -> Result<()> {
    let original = create_test_node_config();

    let encoded = serde_json::to_string(&original)?;
    let restored: NodeConfig = serde_json::from_str(&encoded)?;

    assert_eq!(original.peer_id, restored.peer_id);
    assert_eq!(original.listen_addrs, restored.listen_addrs);
    assert_eq!(original.enable_ipv6, restored.enable_ipv6);
    Ok(())
}
}