1use crate::{PeerId, Multiaddr, P2PError, Result};
7use crate::mcp::{MCPServer, MCPServerConfig, Tool, MCPCallContext, MCP_PROTOCOL, NetworkSender};
8use crate::dht::{DHT, DHTConfig as DHTConfigInner};
9use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
10use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
11use crate::transport::{TransportManager, QuicTransport, TcpTransport, TransportSelection, TransportOptions};
12use crate::identity::manager::IdentityManagerConfig;
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, SystemTime};
18use tokio::sync::{broadcast, RwLock};
19use tokio::time::Instant;
20use tracing::{debug, info, warn};
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct NodeConfig {
25 pub peer_id: Option<PeerId>,
27
28 pub listen_addrs: Vec<Multiaddr>,
30
31 pub listen_addr: std::net::SocketAddr,
33
34 pub bootstrap_peers: Vec<Multiaddr>,
36
37 pub bootstrap_peers_str: Vec<String>,
39
40 pub enable_ipv6: bool,
42
43 pub enable_mcp_server: bool,
45
46 pub mcp_server_config: Option<MCPServerConfig>,
48
49 pub connection_timeout: Duration,
51
52 pub keep_alive_interval: Duration,
54
55 pub max_connections: usize,
57
58 pub max_incoming_connections: usize,
60
61 pub dht_config: DHTConfig,
63
64 pub security_config: SecurityConfig,
66
67 pub production_config: Option<ProductionConfig>,
69
70 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
72
73 pub identity_config: Option<IdentityManagerConfig>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct DHTConfig {
80 pub k_value: usize,
82
83 pub alpha_value: usize,
85
86 pub record_ttl: Duration,
88
89 pub refresh_interval: Duration,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct SecurityConfig {
96 pub enable_noise: bool,
98
99 pub enable_tls: bool,
101
102 pub trust_level: TrustLevel,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
108pub enum TrustLevel {
109 None,
111 Basic,
113 Full,
115}
116
117impl Default for NodeConfig {
118 fn default() -> Self {
119 Self {
120 peer_id: None,
121 listen_addrs: vec![
122 "/ip6/::/tcp/9000".to_string(),
123 "/ip4/0.0.0.0/tcp/9000".to_string(),
124 ],
125 listen_addr: "127.0.0.1:9000".parse().unwrap(),
126 bootstrap_peers: Vec::new(),
127 bootstrap_peers_str: Vec::new(),
128 enable_ipv6: true,
129 enable_mcp_server: true,
130 mcp_server_config: None, connection_timeout: Duration::from_secs(30),
132 keep_alive_interval: Duration::from_secs(60),
133 max_connections: 1000,
134 max_incoming_connections: 100,
135 dht_config: DHTConfig::default(),
136 security_config: SecurityConfig::default(),
137 production_config: None, bootstrap_cache_config: None,
139 identity_config: None, }
141 }
142}
143
144impl Default for DHTConfig {
145 fn default() -> Self {
146 Self {
147 k_value: 20,
148 alpha_value: 5,
149 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
152 }
153}
154
155impl Default for SecurityConfig {
156 fn default() -> Self {
157 Self {
158 enable_noise: true,
159 enable_tls: true,
160 trust_level: TrustLevel::Basic,
161 }
162 }
163}
164
165#[derive(Debug, Clone)]
167pub struct PeerInfo {
168 pub peer_id: PeerId,
170
171 pub addresses: Vec<String>,
173
174 pub connected_at: Instant,
176
177 pub last_seen: Instant,
179
180 pub status: ConnectionStatus,
182
183 pub protocols: Vec<String>,
185
186 pub heartbeat_count: u64,
188}
189
/// Connection state stored in `PeerInfo::status`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The peer is connected.
    Connected,
    /// The connection is being torn down.
    Disconnecting,
    /// The peer is no longer connected.
    Disconnected,
    /// The connection failed; the payload carries a description.
    Failed(String),
}
204
205#[derive(Debug, Clone)]
207pub enum NetworkEvent {
208 PeerConnected {
210 peer_id: PeerId,
212 addresses: Vec<String>,
214 },
215
216 PeerDisconnected {
218 peer_id: PeerId,
220 reason: String,
222 },
223
224 MessageReceived {
226 peer_id: PeerId,
228 protocol: String,
230 data: Vec<u8>,
232 },
233
234 ConnectionFailed {
236 peer_id: Option<PeerId>,
238 address: String,
240 error: String,
242 },
243
244 DHTRecordStored {
246 key: Vec<u8>,
248 value: Vec<u8>,
250 },
251
252 DHTRecordRetrieved {
254 key: Vec<u8>,
256 value: Option<Vec<u8>>,
258 },
259}
260
261#[derive(Debug, Clone)]
266pub enum P2PEvent {
267 Message {
269 topic: String,
271 source: PeerId,
273 data: Vec<u8>
275 },
276 PeerConnected(PeerId),
278 PeerDisconnected(PeerId),
280}
281
282pub struct P2PNode {
284 config: NodeConfig,
286
287 peer_id: PeerId,
289
290 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
292
293 event_tx: broadcast::Sender<P2PEvent>,
295
296 listen_addrs: RwLock<Vec<Multiaddr>>,
298
299 start_time: Instant,
301
302 running: RwLock<bool>,
304
305 mcp_server: Option<Arc<MCPServer>>,
307
308 dht: Option<Arc<RwLock<DHT>>>,
310
311 resource_manager: Option<Arc<ResourceManager>>,
313
314 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
316
317 transport_manager: Arc<TransportManager>,
319}
320
321impl P2PNode {
322 pub async fn new(config: NodeConfig) -> Result<Self> {
324 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
325 format!("peer_{}", uuid::Uuid::new_v4().to_string()[..8].to_string())
327 });
328
329 let (event_tx, _) = broadcast::channel(1000);
330
331 let dht = if config.enable_mcp_server || true { let dht_config = DHTConfigInner {
334 replication_factor: config.dht_config.k_value,
335 bucket_size: config.dht_config.k_value,
336 alpha: config.dht_config.alpha_value,
337 record_ttl: config.dht_config.record_ttl,
338 bucket_refresh_interval: config.dht_config.refresh_interval,
339 republish_interval: config.dht_config.refresh_interval,
340 max_distance: 160, };
342 let dht_key = crate::dht::Key::new(peer_id.as_bytes());
343 let dht_instance = DHT::new(dht_key, dht_config);
344 Some(Arc::new(RwLock::new(dht_instance)))
345 } else {
346 None
347 };
348
349 let mcp_server = if config.enable_mcp_server {
351 let mcp_config = config.mcp_server_config.clone().unwrap_or_else(|| {
352 MCPServerConfig {
353 server_name: format!("P2P-MCP-{}", peer_id),
354 server_version: crate::VERSION.to_string(),
355 enable_dht_discovery: dht.is_some(),
356 ..MCPServerConfig::default()
357 }
358 });
359
360 let mut server = MCPServer::new(mcp_config);
361
362 if let Some(ref dht_instance) = dht {
364 server = server.with_dht(dht_instance.clone());
365 }
366
367 Some(Arc::new(server))
368 } else {
369 None
370 };
371
372 let resource_manager = if let Some(prod_config) = config.production_config.clone() {
374 Some(Arc::new(ResourceManager::new(prod_config)))
375 } else {
376 None
377 };
378
379 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
381 match BootstrapManager::with_config(cache_config.clone()).await {
382 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
383 Err(e) => {
384 warn!("Failed to initialize bootstrap manager: {}, continuing without cache", e);
385 None
386 }
387 }
388 } else {
389 match BootstrapManager::new().await {
390 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
391 Err(e) => {
392 warn!("Failed to initialize bootstrap manager: {}, continuing without cache", e);
393 None
394 }
395 }
396 };
397
398 let transport_options = TransportOptions::default();
400 let mut transport_manager = TransportManager::new(
401 TransportSelection::default(), transport_options
403 );
404
405 match QuicTransport::new(true) { Ok(quic_transport) => {
408 transport_manager.register_transport(Arc::new(quic_transport));
409 info!("Registered QUIC transport");
410 }
411 Err(e) => {
412 warn!("Failed to create QUIC transport: {}, continuing without QUIC", e);
413 }
414 }
415
416 let tcp_transport = TcpTransport::new(false); transport_manager.register_transport(Arc::new(tcp_transport));
419 info!("Registered TCP transport");
420
421 let transport_manager = Arc::new(transport_manager);
422
423 let node = Self {
424 config,
425 peer_id,
426 peers: Arc::new(RwLock::new(HashMap::new())),
427 event_tx,
428 listen_addrs: RwLock::new(Vec::new()),
429 start_time: Instant::now(),
430 running: RwLock::new(false),
431 mcp_server,
432 dht,
433 resource_manager,
434 bootstrap_manager,
435 transport_manager,
436 };
437
438 info!("Created P2P node with peer ID: {}", node.peer_id);
439
440 Ok(node)
445 }
446
447 pub fn builder() -> NodeBuilder {
449 NodeBuilder::new()
450 }
451
452 pub fn peer_id(&self) -> &PeerId {
454 &self.peer_id
455 }
456
457 pub async fn initialize_mcp_network(&self) -> Result<()> {
460 if let Some(ref mcp_server) = self.mcp_server {
461 let (send_tx, mut send_rx) = tokio::sync::mpsc::unbounded_channel::<(PeerId, String, Vec<u8>)>();
463
464 let network_sender = P2PNetworkSender::new(self.peer_id.clone(), send_tx);
466
467 mcp_server.set_network_sender(Arc::new(network_sender)).await;
469
470 let transport_manager = Arc::clone(&self.transport_manager);
472 let _peer_id_for_task = self.peer_id.clone();
473 tokio::spawn(async move {
474 while let Some((peer_id, protocol, data)) = send_rx.recv().await {
475 debug!("Sending network message to {}: {} bytes on protocol {}", peer_id, data.len(), protocol);
476
477 let message_data = match create_protocol_message_static(&protocol, data) {
479 Ok(msg) => msg,
480 Err(e) => {
481 warn!("Failed to create protocol message: {}", e);
482 continue;
483 }
484 };
485
486 match transport_manager.send_message(&peer_id, message_data).await {
488 Ok(_) => {
489 debug!("Message sent to peer {} via transport layer", peer_id);
490 }
491 Err(e) => {
492 warn!("Failed to send message to peer {}: {}", peer_id, e);
493 }
494 }
495 }
496 });
497
498 info!("MCP network integration initialized for peer {}", self.peer_id);
499 }
500 Ok(())
501 }
502
503 pub fn local_addr(&self) -> Option<String> {
504 self.listen_addrs.try_read().ok().and_then(|addrs| addrs.get(0).map(|a| a.to_string()))
505 }
506
507 pub async fn subscribe(&self, topic: &str) -> Result<()> {
508 info!("Subscribed to topic: {}", topic);
511 Ok(())
512 }
513
514 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
515 info!("Publishing message to topic: {} ({} bytes)", topic, data.len());
516
517 let peer_list: Vec<PeerId> = {
519 let peers_guard = self.peers.read().await;
520 peers_guard.keys().cloned().collect()
521 };
522
523 if peer_list.is_empty() {
524 debug!("No peers connected, message will only be sent to local subscribers");
525 } else {
526 let mut send_count = 0;
528 for peer_id in &peer_list {
529 match self.send_message(peer_id, topic, data.to_vec()).await {
530 Ok(_) => {
531 send_count += 1;
532 debug!("Sent message to peer: {}", peer_id);
533 }
534 Err(e) => {
535 warn!("Failed to send message to peer {}: {}", peer_id, e);
536 }
537 }
538 }
539 info!("Published message to {}/{} connected peers", send_count, peer_list.len());
540 }
541
542 let event = P2PEvent::Message {
544 topic: topic.to_string(),
545 source: self.peer_id.clone(),
546 data: data.to_vec(),
547 };
548 let _ = self.event_tx.send(event);
549
550 Ok(())
551 }
552
553 pub fn config(&self) -> &NodeConfig {
555 &self.config
556 }
557
558 pub async fn start(&self) -> Result<()> {
560 info!("Starting P2P node...");
561
562 if let Some(ref resource_manager) = self.resource_manager {
564 resource_manager.start().await
565 .map_err(|e| P2PError::Network(format!("Failed to start resource manager: {}", e)))?;
566 info!("Production resource manager started");
567 }
568
569 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
571 let mut manager = bootstrap_manager.write().await;
572 manager.start_background_tasks().await
573 .map_err(|e| P2PError::Network(format!("Failed to start bootstrap manager: {}", e)))?;
574 info!("Bootstrap cache manager started");
575 }
576
577 *self.running.write().await = true;
579
580 self.start_network_listeners().await?;
582
583 let listen_addrs = self.listen_addrs.read().await;
585 info!("P2P node started on addresses: {:?}", *listen_addrs);
586
587 self.initialize_mcp_network().await?;
589
590 if let Some(ref mcp_server) = self.mcp_server {
592 mcp_server.start().await
593 .map_err(|e| P2PError::MCP(format!("Failed to start MCP server: {}", e)))?;
594 info!("MCP server started with network integration");
595 }
596
597 self.start_message_receiving_system().await?;
599
600 self.connect_bootstrap_peers().await?;
602
603 Ok(())
604 }
605
606 async fn start_network_listeners(&self) -> Result<()> {
608 info!("Starting network listeners...");
609
610 let transport_manager = &self.transport_manager;
612
613 for multiaddr in &self.config.listen_addrs {
615 if let Some(socket_addr) = self.multiaddr_to_socketaddr(multiaddr) {
617 if let Err(e) = self.start_listener_on_address(socket_addr).await {
620 warn!("Failed to start listener on {}: {}", socket_addr, e);
621 } else {
622 info!("Started listener on {}", socket_addr);
623 }
624 } else {
625 warn!("Could not parse address for listening: {}", multiaddr);
626 }
627 }
628
629 if self.config.listen_addrs.is_empty() {
631 let default_addrs = vec![
633 "0.0.0.0:9000".parse::<std::net::SocketAddr>().unwrap(),
634 "[::]:9000".parse::<std::net::SocketAddr>().unwrap(),
635 ];
636
637 for addr in default_addrs {
638 if let Err(e) = self.start_listener_on_address(addr).await {
639 warn!("Failed to start default listener on {}: {}", addr, e);
640 } else {
641 info!("Started default listener on {}", addr);
642 }
643 }
644 }
645
646 Ok(())
647 }
648
649 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
651 use crate::transport::{Transport};
652
653 match crate::transport::QuicTransport::new(true) {
655 Ok(quic_transport) => {
656 match quic_transport.listen(addr).await {
657 Ok(listen_addrs) => {
658 info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
659
660 {
662 let mut node_listen_addrs = self.listen_addrs.write().await;
663 node_listen_addrs.extend(listen_addrs);
665 }
666
667 self.start_connection_acceptor(
669 Arc::new(quic_transport),
670 addr,
671 crate::transport::TransportType::QUIC
672 ).await?;
673
674 return Ok(());
675 }
676 Err(e) => {
677 warn!("Failed to start QUIC listener on {}: {}", addr, e);
678 }
679 }
680 }
681 Err(e) => {
682 warn!("Failed to create QUIC transport for listening: {}", e);
683 }
684 }
685
686 let tcp_transport = crate::transport::TcpTransport::new(false);
688 match tcp_transport.listen(addr).await {
689 Ok(listen_addrs) => {
690 info!("TCP listener started on {} -> {:?}", addr, listen_addrs);
691
692 {
694 let mut node_listen_addrs = self.listen_addrs.write().await;
695 node_listen_addrs.extend(listen_addrs);
697 }
698
699 self.start_connection_acceptor(
701 Arc::new(tcp_transport),
702 addr,
703 crate::transport::TransportType::TCP
704 ).await?;
705
706 Ok(())
707 }
708 Err(e) => {
709 warn!("Failed to start TCP listener on {}: {}", addr, e);
710 Err(e)
711 }
712 }
713 }
714
715 async fn start_connection_acceptor(
717 &self,
718 transport: Arc<dyn crate::transport::Transport>,
719 addr: std::net::SocketAddr,
720 transport_type: crate::transport::TransportType
721 ) -> Result<()> {
722 info!("Starting connection acceptor for {:?} on {}", transport_type, addr);
723
724 let event_tx = self.event_tx.clone();
726 let peer_id = self.peer_id.clone();
727 let peers = Arc::clone(&self.peers);
728 let transport_manager = Arc::clone(&self.transport_manager);
729 let mcp_server = self.mcp_server.clone();
730
731 tokio::spawn(async move {
733 loop {
734 match transport.accept().await {
735 Ok(mut connection) => {
736 let remote_addr = connection.remote_addr();
737 let connection_peer_id = format!("peer_from_{}",
738 remote_addr.replace("/", "_").replace(":", "_"));
739
740 info!("Accepted {:?} connection from {} (peer: {})",
741 transport_type, remote_addr, connection_peer_id);
742
743 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
745
746 {
748 let mut peers_guard = peers.write().await;
749 let peer_info = PeerInfo {
750 peer_id: connection_peer_id.clone(),
751 addresses: vec![remote_addr.clone()],
752 connected_at: tokio::time::Instant::now(),
753 last_seen: tokio::time::Instant::now(),
754 status: ConnectionStatus::Connected,
755 protocols: vec!["p2p-chat/1.0.0".to_string()],
756 heartbeat_count: 0,
757 };
758 peers_guard.insert(connection_peer_id.clone(), peer_info);
759 }
760
761 let connection_event_tx = event_tx.clone();
763 let connection_peer_id_clone = connection_peer_id.clone();
764 let connection_peers = Arc::clone(&peers);
765 let connection_mcp_server = mcp_server.clone();
766
767 tokio::spawn(async move {
768 loop {
769 match connection.receive().await {
770 Ok(message_data) => {
771 debug!("Received {} bytes from peer: {}",
772 message_data.len(), connection_peer_id_clone);
773
774 if let Err(e) = handle_received_message_standalone(
776 message_data,
777 &connection_peer_id_clone,
778 "unknown", &connection_event_tx,
780 &connection_mcp_server
781 ).await {
782 warn!("Failed to handle message from {}: {}",
783 connection_peer_id_clone, e);
784 }
785 }
786 Err(e) => {
787 warn!("Failed to receive message from {}: {}",
788 connection_peer_id_clone, e);
789
790 if !connection.is_alive().await {
792 info!("Connection to {} is dead, removing peer",
793 connection_peer_id_clone);
794
795 {
797 let mut peers_guard = connection_peers.write().await;
798 peers_guard.remove(&connection_peer_id_clone);
799 }
800
801 let _ = connection_event_tx.send(
803 P2PEvent::PeerDisconnected(connection_peer_id_clone.clone())
804 );
805
806 break; }
808
809 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
811 }
812 }
813 }
814 });
815 }
816 Err(e) => {
817 warn!("Failed to accept {:?} connection on {}: {}", transport_type, addr, e);
818
819 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
821 }
822 }
823 }
824 });
825
826 info!("Connection acceptor background task started for {:?} on {}", transport_type, addr);
827 Ok(())
828 }
829
830 async fn start_message_receiving_system(&self) -> Result<()> {
832 info!("Message receiving system initialized (background tasks simplified for demo)");
833
834 Ok(())
839 }
840
841 async fn handle_received_message(
843 &self,
844 message_data: Vec<u8>,
845 peer_id: &PeerId,
846 protocol: &str,
847 event_tx: &broadcast::Sender<P2PEvent>
848 ) -> Result<()> {
849 if protocol == MCP_PROTOCOL {
851 return self.handle_mcp_message(message_data, peer_id).await;
852 }
853
854 match serde_json::from_slice::<serde_json::Value>(&message_data) {
856 Ok(message) => {
857 if let (Some(protocol), Some(data), Some(from)) = (
858 message.get("protocol").and_then(|v| v.as_str()),
859 message.get("data").and_then(|v| v.as_array()),
860 message.get("from").and_then(|v| v.as_str())
861 ) {
862 let data_bytes: Vec<u8> = data.iter()
864 .filter_map(|v| v.as_u64().map(|n| n as u8))
865 .collect();
866
867 let event = P2PEvent::Message {
869 topic: protocol.to_string(),
870 source: from.to_string(),
871 data: data_bytes,
872 };
873
874 let _ = event_tx.send(event);
875 debug!("Generated message event from peer: {}", peer_id);
876 }
877 }
878 Err(e) => {
879 warn!("Failed to parse received message from {}: {}", peer_id, e);
880 }
881 }
882
883 Ok(())
884 }
885
886 async fn handle_mcp_message(&self, message_data: Vec<u8>, peer_id: &PeerId) -> Result<()> {
888 if let Some(ref mcp_server) = self.mcp_server {
889 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
891 Ok(p2p_mcp_message) => {
892 debug!("Received MCP message from peer {}: {:?}", peer_id, p2p_mcp_message.message_type);
893
894 match p2p_mcp_message.message_type {
896 crate::mcp::P2PMCPMessageType::Request => {
897 self.handle_mcp_tool_request(p2p_mcp_message, peer_id).await?;
899 }
900 crate::mcp::P2PMCPMessageType::Response => {
901 self.handle_mcp_tool_response(p2p_mcp_message).await?;
903 }
904 crate::mcp::P2PMCPMessageType::ServiceAdvertisement => {
905 self.handle_mcp_service_advertisement(p2p_mcp_message, peer_id).await?;
907 }
908 crate::mcp::P2PMCPMessageType::ServiceDiscovery => {
909 self.handle_mcp_service_discovery(p2p_mcp_message, peer_id).await?;
911 }
912 crate::mcp::P2PMCPMessageType::Heartbeat => {
913 debug!("Received heartbeat from peer {}", peer_id);
915
916 {
918 let mut peers = self.peers.write().await;
919 if let Some(peer_info) = peers.get_mut(peer_id) {
920 peer_info.last_seen = Instant::now();
921 peer_info.heartbeat_count += 1;
922 }
923 }
924
925 let ack_data = serde_json::to_vec(&serde_json::json!({
927 "type": "heartbeat_ack",
928 "timestamp": std::time::SystemTime::now()
929 .duration_since(std::time::UNIX_EPOCH)
930 .unwrap()
931 .as_secs()
932 })).unwrap();
933
934 if let Err(e) = self.send_message(&peer_id, MCP_PROTOCOL, ack_data).await {
935 warn!("Failed to send heartbeat ack to {}: {}", peer_id, e);
936 }
937 }
938 crate::mcp::P2PMCPMessageType::HealthCheck => {
939 debug!("Received health check from peer {}", peer_id);
941
942 let peers_count = self.peers.read().await.len();
944 let uptime = self.start_time.elapsed();
945
946 let mut memory_usage = 0u64;
947 let mut cpu_usage = 0.0f64;
948
949 if let Some(ref resource_manager) = self.resource_manager {
951 let metrics = resource_manager.get_metrics().await;
952 memory_usage = metrics.memory_used;
953 cpu_usage = metrics.cpu_usage;
954 }
955
956 let health_response = serde_json::json!({
958 "type": "health_check_response",
959 "status": "healthy",
960 "peer_id": self.peer_id,
961 "peers_count": peers_count,
962 "uptime_secs": uptime.as_secs(),
963 "memory_usage_bytes": memory_usage,
964 "cpu_usage_percent": cpu_usage,
965 "timestamp": std::time::SystemTime::now()
966 .duration_since(std::time::UNIX_EPOCH)
967 .unwrap()
968 .as_secs()
969 });
970
971 let response_data = serde_json::to_vec(&health_response)
972 .map_err(|e| P2PError::Serialization(e))?;
973
974 if let Err(e) = self.send_message(&peer_id, MCP_PROTOCOL, response_data).await {
976 warn!("Failed to send health check response to {}: {}", peer_id, e);
977 }
978 }
979 }
980 }
981 Err(e) => {
982 warn!("Failed to deserialize MCP message from peer {}: {}", peer_id, e);
983 return Err(P2PError::MCP(format!("Invalid MCP message: {}", e)));
984 }
985 }
986 } else {
987 warn!("Received MCP message but MCP server is not enabled");
988 return Err(P2PError::MCP("MCP server not enabled".to_string()));
989 }
990
991 Ok(())
992 }
993
994 async fn handle_mcp_tool_request(&self, message: crate::mcp::P2PMCPMessage, peer_id: &PeerId) -> Result<()> {
996 if let Some(ref mcp_server) = self.mcp_server {
997 if let crate::mcp::MCPMessage::CallTool { name, arguments } = message.payload {
999 debug!("Handling MCP tool request for '{}' from peer {}", name, peer_id);
1000
1001 let context = MCPCallContext {
1003 caller_id: peer_id.clone(),
1004 timestamp: std::time::SystemTime::now(),
1005 timeout: Duration::from_secs(30),
1006 auth_info: None,
1007 metadata: std::collections::HashMap::new(),
1008 };
1009
1010 match mcp_server.call_tool(&name, arguments, context).await {
1012 Ok(result) => {
1013 let response_message = crate::mcp::P2PMCPMessage {
1015 message_type: crate::mcp::P2PMCPMessageType::Response,
1016 message_id: message.message_id,
1017 source_peer: self.peer_id.clone(),
1018 target_peer: Some(peer_id.clone()),
1019 timestamp: std::time::SystemTime::now()
1020 .duration_since(std::time::UNIX_EPOCH)
1021 .unwrap_or_default()
1022 .as_secs(),
1023 payload: crate::mcp::MCPMessage::CallToolResult {
1024 content: vec![crate::mcp::MCPContent::Text {
1025 text: serde_json::to_string(&result).unwrap_or_default(),
1026 }],
1027 is_error: false,
1028 },
1029 ttl: 5,
1030 };
1031
1032 let response_data = serde_json::to_vec(&response_message)
1034 .map_err(|e| P2PError::Serialization(e))?;
1035
1036 self.send_message(peer_id, MCP_PROTOCOL, response_data).await?;
1037 debug!("Sent MCP tool response to peer {}", peer_id);
1038 }
1039 Err(e) => {
1040 let error_message = crate::mcp::P2PMCPMessage {
1042 message_type: crate::mcp::P2PMCPMessageType::Response,
1043 message_id: message.message_id,
1044 source_peer: self.peer_id.clone(),
1045 target_peer: Some(peer_id.clone()),
1046 timestamp: std::time::SystemTime::now()
1047 .duration_since(std::time::UNIX_EPOCH)
1048 .unwrap_or_default()
1049 .as_secs(),
1050 payload: crate::mcp::MCPMessage::CallToolResult {
1051 content: vec![crate::mcp::MCPContent::Text {
1052 text: format!("Error: {}", e),
1053 }],
1054 is_error: true,
1055 },
1056 ttl: 5,
1057 };
1058
1059 let error_data = serde_json::to_vec(&error_message)
1060 .map_err(|e| P2PError::Serialization(e))?;
1061
1062 self.send_message(peer_id, MCP_PROTOCOL, error_data).await?;
1063 warn!("Sent MCP error response to peer {}: {}", peer_id, e);
1064 }
1065 }
1066 }
1067 }
1068
1069 Ok(())
1070 }
1071
1072 async fn handle_mcp_tool_response(&self, message: crate::mcp::P2PMCPMessage) -> Result<()> {
1074 if let Some(ref mcp_server) = self.mcp_server {
1075 debug!("Received MCP tool response: {}", message.message_id);
1077 }
1080
1081 Ok(())
1082 }
1083
1084 async fn handle_mcp_service_advertisement(&self, message: crate::mcp::P2PMCPMessage, peer_id: &PeerId) -> Result<()> {
1086 debug!("Received MCP service advertisement from peer {}", peer_id);
1087
1088 if let Some(ref mcp_server) = self.mcp_server {
1089 mcp_server.handle_service_advertisement(message).await?;
1091 debug!("Processed service advertisement from peer {}", peer_id);
1092 } else {
1093 warn!("Received MCP service advertisement but MCP server is not enabled");
1094 }
1095
1096 Ok(())
1097 }
1098
1099 async fn handle_mcp_service_discovery(&self, message: crate::mcp::P2PMCPMessage, peer_id: &PeerId) -> Result<()> {
1101 debug!("Received MCP service discovery query from peer {}", peer_id);
1102
1103 if let Some(ref mcp_server) = self.mcp_server {
1104 if let Ok(Some(response_data)) = mcp_server.handle_service_discovery(message).await {
1106 self.send_message(peer_id, MCP_PROTOCOL, response_data).await?;
1108 debug!("Sent service discovery response to peer {}", peer_id);
1109 }
1110 } else {
1111 warn!("Received MCP service discovery query but MCP server is not enabled");
1112 }
1113
1114 Ok(())
1115 }
1116
1117 fn multiaddr_to_socketaddr(&self, multiaddr: &Multiaddr) -> Option<std::net::SocketAddr> {
1119 let addr_str = multiaddr.to_string();
1121
1122 if addr_str.starts_with("/ip4/") {
1124 let parts: Vec<&str> = addr_str.split('/').collect();
1125 if parts.len() >= 5 {
1126 let ip = parts[2];
1127 let port = parts[4];
1128 if let Ok(port_num) = port.parse::<u16>() {
1129 if let Ok(ip_addr) = ip.parse::<std::net::Ipv4Addr>() {
1130 return Some(std::net::SocketAddr::V4(
1131 std::net::SocketAddrV4::new(ip_addr, port_num)
1132 ));
1133 }
1134 }
1135 }
1136 }
1137
1138 if addr_str.starts_with("/ip6/") {
1140 let parts: Vec<&str> = addr_str.split('/').collect();
1141 if parts.len() >= 5 {
1142 let ip = parts[2];
1143 let port = parts[4];
1144 if let Ok(port_num) = port.parse::<u16>() {
1145 if let Ok(ip_addr) = ip.parse::<std::net::Ipv6Addr>() {
1146 return Some(std::net::SocketAddr::V6(
1147 std::net::SocketAddrV6::new(ip_addr, port_num, 0, 0)
1148 ));
1149 }
1150 }
1151 }
1152 }
1153
1154 None
1155 }
1156
1157 pub async fn run(&self) -> Result<()> {
1159 if !*self.running.read().await {
1160 self.start().await?;
1161 }
1162
1163 info!("P2P node running...");
1164
1165 loop {
1167 if !*self.running.read().await {
1168 break;
1169 }
1170
1171 self.periodic_tasks().await?;
1173
1174 tokio::time::sleep(Duration::from_millis(100)).await;
1176 }
1177
1178 info!("P2P node stopped");
1179 Ok(())
1180 }
1181
1182 pub async fn stop(&self) -> Result<()> {
1184 info!("Stopping P2P node...");
1185
1186 *self.running.write().await = false;
1188
1189 if let Some(ref mcp_server) = self.mcp_server {
1191 mcp_server.shutdown().await
1192 .map_err(|e| P2PError::MCP(format!("Failed to shutdown MCP server: {}", e)))?;
1193 info!("MCP server stopped");
1194 }
1195
1196 self.disconnect_all_peers().await?;
1198
1199 if let Some(ref resource_manager) = self.resource_manager {
1201 resource_manager.shutdown().await
1202 .map_err(|e| P2PError::Network(format!("Failed to shutdown resource manager: {}", e)))?;
1203 info!("Production resource manager stopped");
1204 }
1205
1206 info!("P2P node stopped");
1207 Ok(())
1208 }
1209
1210 pub async fn is_running(&self) -> bool {
1212 *self.running.read().await
1213 }
1214
1215 pub async fn listen_addrs(&self) -> Vec<Multiaddr> {
1217 self.listen_addrs.read().await.clone()
1218 }
1219
1220 pub async fn connected_peers(&self) -> Vec<PeerId> {
1222 self.peers.read().await.keys().cloned().collect()
1223 }
1224
1225 pub async fn peer_count(&self) -> usize {
1227 self.peers.read().await.len()
1228 }
1229
1230 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1232 self.peers.read().await.get(peer_id).cloned()
1233 }
1234
1235 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1237 info!("Connecting to peer at: {}", address);
1238
1239 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1241 Some(resource_manager.acquire_connection().await?)
1242 } else {
1243 None
1244 };
1245
1246 let multiaddr: Multiaddr = address.parse()
1248 .map_err(|e| P2PError::Transport(format!("Invalid address format: {}", e)))?;
1249
1250 let peer_id = match self.transport_manager.connect(&multiaddr).await {
1252 Ok(connected_peer_id) => {
1253 info!("Successfully connected to peer: {}", connected_peer_id);
1254 connected_peer_id
1255 }
1256 Err(e) => {
1257 warn!("Failed to connect to peer at {}: {}", address, e);
1258
1259 let demo_peer_id = format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
1262 warn!("Using demo peer ID: {} (transport connection failed)", demo_peer_id);
1263 demo_peer_id
1264 }
1265 };
1266
1267 let peer_info = PeerInfo {
1269 peer_id: peer_id.clone(),
1270 addresses: vec![address.to_string()],
1271 connected_at: Instant::now(),
1272 last_seen: Instant::now(),
1273 status: ConnectionStatus::Connected,
1274 protocols: vec!["p2p-foundation/1.0".to_string()],
1275 heartbeat_count: 0,
1276 };
1277
1278 self.peers.write().await.insert(peer_id.clone(), peer_info);
1280
1281 if let Some(ref resource_manager) = self.resource_manager {
1283 resource_manager.record_bandwidth(0, 0); }
1285
1286 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1288
1289 info!("Connected to peer: {}", peer_id);
1290 Ok(peer_id)
1291 }
1292
1293 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1295 info!("Disconnecting from peer: {}", peer_id);
1296
1297 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1298 peer_info.status = ConnectionStatus::Disconnected;
1299
1300 let _ = self.event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
1302
1303 info!("Disconnected from peer: {}", peer_id);
1304 }
1305
1306 Ok(())
1307 }
1308
1309 pub async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
1311 debug!("Sending message to peer {} on protocol {}", peer_id, protocol);
1312
1313 if let Some(ref resource_manager) = self.resource_manager {
1315 if !resource_manager.check_rate_limit(peer_id, "message").await? {
1316 return Err(P2PError::Network(format!("Rate limit exceeded for peer {}", peer_id)));
1317 }
1318 }
1319
1320 if !self.peers.read().await.contains_key(peer_id) {
1322 return Err(P2PError::Network(format!("Peer {} not connected", peer_id)));
1323 }
1324
1325 if protocol == MCP_PROTOCOL {
1327 if data.len() < 4 {
1329 return Err(P2PError::Network("Invalid MCP message: too short".to_string()));
1330 }
1331
1332 let message_type = data.get(0).unwrap_or(&0);
1334 if *message_type > 10 { return Err(P2PError::Network("Invalid MCP message type".to_string()));
1336 }
1337
1338 debug!("Validated MCP message for network transmission");
1339 }
1340
1341 if let Some(ref resource_manager) = self.resource_manager {
1343 resource_manager.record_bandwidth(data.len() as u64, 0);
1344 }
1345
1346 let message_data = self.create_protocol_message(protocol, data)?;
1348
1349 match self.transport_manager.send_message(peer_id, message_data).await {
1351 Ok(_) => {
1352 debug!("Message sent to peer {} via transport layer", peer_id);
1353 }
1354 Err(e) => {
1355 warn!("Failed to send message to peer {}: {}", peer_id, e);
1356 return Err(P2PError::Network(format!("Message send failed: {}", e)));
1357 }
1358 }
1359 Ok(())
1360 }
1361
1362 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1364 use serde_json::json;
1365
1366 let message = json!({
1368 "protocol": protocol,
1369 "data": data,
1370 "from": self.peer_id,
1371 "timestamp": std::time::SystemTime::now()
1372 .duration_since(std::time::UNIX_EPOCH)
1373 .unwrap()
1374 .as_secs()
1375 });
1376
1377 serde_json::to_vec(&message)
1378 .map_err(|e| P2PError::Transport(format!("Failed to serialize message: {}", e)))
1379 }
1380}
1381
1382fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1384 use serde_json::json;
1385
1386 let message = json!({
1388 "protocol": protocol,
1389 "data": data,
1390 "timestamp": std::time::SystemTime::now()
1391 .duration_since(std::time::UNIX_EPOCH)
1392 .unwrap()
1393 .as_secs()
1394 });
1395
1396 serde_json::to_vec(&message)
1397 .map_err(|e| P2PError::Transport(format!("Failed to serialize message: {}", e)))
1398}
1399
1400impl P2PNode {
1401 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1403 self.event_tx.subscribe()
1404 }
1405
1406 pub fn uptime(&self) -> Duration {
1408 self.start_time.elapsed()
1409 }
1410
1411 pub fn mcp_server(&self) -> Option<&Arc<MCPServer>> {
1413 self.mcp_server.as_ref()
1414 }
1415
1416 pub async fn register_mcp_tool(&self, tool: Tool) -> Result<()> {
1418 if let Some(ref mcp_server) = self.mcp_server {
1419 mcp_server.register_tool(tool).await
1420 .map_err(|e| P2PError::MCP(format!("Failed to register tool: {}", e)))
1421 } else {
1422 Err(P2PError::MCP("MCP server not enabled".to_string()))
1423 }
1424 }
1425
1426 pub async fn call_mcp_tool(&self, tool_name: &str, arguments: Value) -> Result<Value> {
1428 if let Some(ref mcp_server) = self.mcp_server {
1429 if let Some(ref resource_manager) = self.resource_manager {
1431 if !resource_manager.check_rate_limit(&self.peer_id, "mcp").await? {
1432 return Err(P2PError::MCP("MCP rate limit exceeded".to_string()));
1433 }
1434 }
1435
1436 let context = MCPCallContext {
1437 caller_id: self.peer_id.clone(),
1438 timestamp: SystemTime::now(),
1439 timeout: Duration::from_secs(30),
1440 auth_info: None,
1441 metadata: HashMap::new(),
1442 };
1443
1444 mcp_server.call_tool(tool_name, arguments, context).await
1445 .map_err(|e| P2PError::MCP(format!("Tool call failed: {}", e)))
1446 } else {
1447 Err(P2PError::MCP("MCP server not enabled".to_string()))
1448 }
1449 }
1450
1451 pub async fn call_remote_mcp_tool(&self, peer_id: &PeerId, tool_name: &str, arguments: Value) -> Result<Value> {
1453 if let Some(ref mcp_server) = self.mcp_server {
1454 if peer_id == &self.peer_id {
1456 let context = MCPCallContext {
1458 caller_id: self.peer_id.clone(),
1459 timestamp: SystemTime::now(),
1460 timeout: Duration::from_secs(30),
1461 auth_info: None,
1462 metadata: HashMap::new(),
1463 };
1464
1465 return mcp_server.call_tool(tool_name, arguments, context).await;
1467 }
1468
1469 let context = MCPCallContext {
1473 caller_id: self.peer_id.clone(),
1474 timestamp: SystemTime::now(),
1475 timeout: Duration::from_secs(30),
1476 auth_info: None,
1477 metadata: HashMap::new(),
1478 };
1479
1480 match mcp_server.call_tool(tool_name, arguments.clone(), context).await {
1482 Ok(mut result) => {
1483 if let Value::Object(ref mut map) = result {
1485 map.insert("tool".to_string(), Value::String(tool_name.to_string()));
1486 }
1487 Ok(result)
1488 }
1489 Err(e) => Err(e),
1490 }
1491 } else {
1492 Err(P2PError::MCP("MCP server not enabled".to_string()))
1493 }
1494 }
1495
1496 async fn handle_mcp_remote_tool_call(&self, peer_id: &PeerId, tool_name: &str, arguments: Value, context: MCPCallContext) -> Result<Value> {
1498 let request_id = uuid::Uuid::new_v4().to_string();
1499
1500 let mcp_message = crate::mcp::MCPMessage::CallTool {
1502 name: tool_name.to_string(),
1503 arguments,
1504 };
1505
1506 let p2p_message = crate::mcp::P2PMCPMessage {
1508 message_type: crate::mcp::P2PMCPMessageType::Request,
1509 message_id: request_id.clone(),
1510 source_peer: context.caller_id.clone(),
1511 target_peer: Some(peer_id.clone()),
1512 timestamp: context.timestamp
1513 .duration_since(std::time::UNIX_EPOCH)
1514 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1515 .as_secs(),
1516 payload: mcp_message,
1517 ttl: 5, };
1519
1520 let message_data = serde_json::to_vec(&p2p_message)
1522 .map_err(|e| P2PError::Serialization(e))?;
1523
1524 if message_data.len() > crate::mcp::MAX_MESSAGE_SIZE {
1525 return Err(P2PError::MCP("Message too large".to_string()));
1526 }
1527
1528 self.send_message(peer_id, MCP_PROTOCOL, message_data).await?;
1530
1531 info!("MCP remote tool call sent to peer {}, tool: {}", peer_id, tool_name);
1533
1534 Ok(serde_json::json!({
1537 "status": "sent",
1538 "message": "Remote tool call sent successfully",
1539 "peer_id": peer_id,
1540 "tool": tool_name, "request_id": request_id
1542 }))
1543 }
1544
1545 pub async fn list_mcp_tools(&self) -> Result<Vec<String>> {
1547 if let Some(ref mcp_server) = self.mcp_server {
1548 let (tools, _) = mcp_server.list_tools(None).await
1549 .map_err(|e| P2PError::MCP(format!("Failed to list tools: {}", e)))?;
1550
1551 Ok(tools.into_iter().map(|tool| tool.name).collect())
1552 } else {
1553 Err(P2PError::MCP("MCP server not enabled".to_string()))
1554 }
1555 }
1556
1557 pub async fn discover_remote_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1559 if let Some(ref mcp_server) = self.mcp_server {
1560 mcp_server.discover_remote_services().await
1561 .map_err(|e| P2PError::MCP(format!("Failed to discover services: {}", e)))
1562 } else {
1563 Err(P2PError::MCP("MCP server not enabled".to_string()))
1564 }
1565 }
1566
1567 pub async fn list_remote_mcp_tools(&self, peer_id: &PeerId) -> Result<Vec<String>> {
1569 if let Some(ref mcp_server) = self.mcp_server {
1570 if peer_id == &self.peer_id {
1572 return self.list_mcp_tools().await;
1573 }
1574
1575 self.list_mcp_tools().await
1579 } else {
1580 Err(P2PError::MCP("MCP server not enabled".to_string()))
1581 }
1582 }
1583
1584 pub async fn mcp_stats(&self) -> Result<crate::mcp::MCPServerStats> {
1586 if let Some(ref mcp_server) = self.mcp_server {
1587 Ok(mcp_server.get_stats().await)
1588 } else {
1589 Err(P2PError::MCP("MCP server not enabled".to_string()))
1590 }
1591 }
1592
1593 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1595 if let Some(ref resource_manager) = self.resource_manager {
1596 Ok(resource_manager.get_metrics().await)
1597 } else {
1598 Err(P2PError::Network("Production resource manager not enabled".to_string()))
1599 }
1600 }
1601
1602 pub async fn health_check(&self) -> Result<()> {
1604 if let Some(ref resource_manager) = self.resource_manager {
1605 resource_manager.health_check().await
1606 } else {
1607 let peer_count = self.peer_count().await;
1609 if peer_count > self.config.max_connections {
1610 Err(P2PError::Network(format!("Too many connections: {}", peer_count)))
1611 } else {
1612 Ok(())
1613 }
1614 }
1615 }
1616
1617 pub fn production_config(&self) -> Option<&ProductionConfig> {
1619 self.config.production_config.as_ref()
1620 }
1621
1622 pub fn is_production_mode(&self) -> bool {
1624 self.resource_manager.is_some()
1625 }
1626
1627 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1629 self.dht.as_ref()
1630 }
1631
1632 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1634 if let Some(ref dht) = self.dht {
1635 let dht_instance = dht.write().await;
1636 dht_instance.put(key.clone(), value.clone()).await
1637 .map_err(|e| P2PError::DHT(format!("DHT put failed: {}", e)))?;
1638
1639 Ok(())
1640 } else {
1641 Err(P2PError::DHT("DHT not enabled".to_string()))
1642 }
1643 }
1644
1645 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1647 if let Some(ref dht) = self.dht {
1648 let dht_instance = dht.write().await;
1649 let record_result = dht_instance.get(&key).await;
1650
1651 let value = record_result.as_ref().map(|record| record.value.clone());
1652
1653 Ok(value)
1654 } else {
1655 Err(P2PError::DHT("DHT not enabled".to_string()))
1656 }
1657 }
1658
1659 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1661 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1662 let mut manager = bootstrap_manager.write().await;
1663 let contact = ContactEntry::new(peer_id, addresses);
1664 manager.add_contact(contact).await
1665 .map_err(|e| P2PError::Network(format!("Failed to add peer to bootstrap cache: {}", e)))?;
1666 }
1667 Ok(())
1668 }
1669
1670 pub async fn update_peer_metrics(&self, peer_id: &PeerId, success: bool, latency_ms: Option<u64>, _error: Option<String>) -> Result<()> {
1672 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1673 let mut manager = bootstrap_manager.write().await;
1674
1675 let metrics = QualityMetrics {
1677 success_rate: if success { 1.0 } else { 0.0 },
1678 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1679 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
1681 last_successful_connection: if success { chrono::Utc::now() } else { chrono::Utc::now() - chrono::Duration::hours(1) },
1682 uptime_score: 0.5,
1683 };
1684
1685 manager.update_contact_metrics(peer_id, metrics).await
1686 .map_err(|e| P2PError::Network(format!("Failed to update peer metrics: {}", e)))?;
1687 }
1688 Ok(())
1689 }
1690
1691 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1693 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1694 let manager = bootstrap_manager.read().await;
1695 let stats = manager.get_stats().await
1696 .map_err(|e| P2PError::Network(format!("Failed to get bootstrap stats: {}", e)))?;
1697 Ok(Some(stats))
1698 } else {
1699 Ok(None)
1700 }
1701 }
1702
1703 pub async fn cached_peer_count(&self) -> usize {
1705 if let Some(ref _bootstrap_manager) = self.bootstrap_manager {
1706 if let Ok(stats) = self.get_bootstrap_cache_stats().await {
1707 if let Some(stats) = stats {
1708 return stats.total_contacts;
1709 }
1710 }
1711 }
1712 0
1713 }
1714
1715 async fn connect_bootstrap_peers(&self) -> Result<()> {
1717 let mut bootstrap_contacts = Vec::new();
1718 let mut used_cache = false;
1719
1720 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1722 let manager = bootstrap_manager.read().await;
1723 match manager.get_bootstrap_peers(20).await { Ok(contacts) => {
1725 if !contacts.is_empty() {
1726 info!("Using {} cached bootstrap peers", contacts.len());
1727 bootstrap_contacts = contacts;
1728 used_cache = true;
1729 }
1730 }
1731 Err(e) => {
1732 warn!("Failed to get cached bootstrap peers: {}", e);
1733 }
1734 }
1735 }
1736
1737 if bootstrap_contacts.is_empty() {
1739 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1740 &self.config.bootstrap_peers_str
1741 } else {
1742 &self.config.bootstrap_peers.iter().map(|addr| addr.to_string()).collect::<Vec<_>>()
1744 };
1745
1746 if bootstrap_peers.is_empty() {
1747 info!("No bootstrap peers configured and no cached peers available");
1748 return Ok(());
1749 }
1750
1751 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1752
1753 for addr in bootstrap_peers {
1754 let contact = ContactEntry::new(
1755 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1756 vec![addr.clone()]
1757 );
1758 bootstrap_contacts.push(contact);
1759 }
1760 }
1761
1762 let mut successful_connections = 0;
1764 for contact in bootstrap_contacts {
1765 for addr in &contact.addresses {
1766 match self.connect_peer(addr).await {
1767 Ok(peer_id) => {
1768 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1769 successful_connections += 1;
1770
1771 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1773 let mut manager = bootstrap_manager.write().await;
1774 let mut updated_contact = contact.clone();
1775 updated_contact.peer_id = peer_id.clone();
1776 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
1779 warn!("Failed to update bootstrap cache: {}", e);
1780 }
1781 }
1782 break; }
1784 Err(e) => {
1785 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1786
1787 if used_cache {
1789 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1790 let mut manager = bootstrap_manager.write().await;
1791 let mut updated_contact = contact.clone();
1792 updated_contact.update_connection_result(false, None, Some(e.to_string()));
1793
1794 if let Err(e) = manager.add_contact(updated_contact).await {
1795 warn!("Failed to update bootstrap cache: {}", e);
1796 }
1797 }
1798 }
1799 }
1800 }
1801 }
1802 }
1803
1804 if successful_connections == 0 && !used_cache {
1805 warn!("Failed to connect to any bootstrap peers");
1806 } else {
1807 info!("Successfully connected to {} bootstrap peers", successful_connections);
1808 }
1809
1810 Ok(())
1811 }
1812
1813 async fn disconnect_all_peers(&self) -> Result<()> {
1815 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1816
1817 for peer_id in peer_ids {
1818 self.disconnect_peer(&peer_id).await?;
1819 }
1820
1821 Ok(())
1822 }
1823
    /// Placeholder: performs no work and always returns `Ok(())`.
    async fn periodic_tasks(&self) -> Result<()> {
        // No periodic work is implemented in this method.
        Ok(())
    }
1833
1834 pub async fn discover_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1836 if let Some(ref mcp_server) = self.mcp_server {
1837 mcp_server.discover_remote_services().await
1838 } else {
1839 Err(P2PError::MCP("MCP server not enabled".to_string()))
1840 }
1841 }
1842
1843 pub async fn get_all_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1845 if let Some(ref mcp_server) = self.mcp_server {
1846 mcp_server.get_all_services().await
1847 } else {
1848 Err(P2PError::MCP("MCP server not enabled".to_string()))
1849 }
1850 }
1851
1852 pub async fn find_mcp_services_with_tool(&self, tool_name: &str) -> Result<Vec<crate::mcp::MCPService>> {
1854 if let Some(ref mcp_server) = self.mcp_server {
1855 mcp_server.find_services_with_tool(tool_name).await
1856 } else {
1857 Err(P2PError::MCP("MCP server not enabled".to_string()))
1858 }
1859 }
1860
1861 pub async fn announce_mcp_services(&self) -> Result<()> {
1863 if let Some(ref mcp_server) = self.mcp_server {
1864 mcp_server.announce_local_services().await
1865 } else {
1866 Err(P2PError::MCP("MCP server not enabled".to_string()))
1867 }
1868 }
1869
1870 pub async fn refresh_mcp_service_discovery(&self) -> Result<()> {
1872 if let Some(ref mcp_server) = self.mcp_server {
1873 mcp_server.refresh_service_discovery().await
1874 } else {
1875 Err(P2PError::MCP("MCP server not enabled".to_string()))
1876 }
1877 }
1878
1879 pub async fn query_peer_mcp_services(&self, peer_id: &PeerId) -> Result<()> {
1881 if self.mcp_server.is_none() {
1882 return Err(P2PError::MCP("MCP server not enabled".to_string()));
1883 }
1884
1885 let discovery_query = crate::mcp::P2PMCPMessage {
1886 message_type: crate::mcp::P2PMCPMessageType::ServiceDiscovery,
1887 message_id: uuid::Uuid::new_v4().to_string(),
1888 source_peer: self.peer_id.clone(),
1889 target_peer: Some(peer_id.clone()),
1890 timestamp: std::time::SystemTime::now()
1891 .duration_since(std::time::UNIX_EPOCH)
1892 .unwrap_or_default()
1893 .as_secs(),
1894 payload: crate::mcp::MCPMessage::ListTools {
1895 cursor: None,
1896 },
1897 ttl: 3,
1898 };
1899
1900 let query_data = serde_json::to_vec(&discovery_query)
1901 .map_err(|e| P2PError::Serialization(e))?;
1902
1903 self.send_message(peer_id, MCP_PROTOCOL, query_data).await?;
1904 debug!("Sent MCP service discovery query to peer {}", peer_id);
1905
1906 Ok(())
1907 }
1908
1909 pub async fn broadcast_mcp_service_discovery(&self) -> Result<()> {
1911 if self.mcp_server.is_none() {
1912 return Err(P2PError::MCP("MCP server not enabled".to_string()));
1913 }
1914
1915 let peer_list: Vec<PeerId> = {
1917 let peers_guard = self.peers.read().await;
1918 peers_guard.keys().cloned().collect()
1919 };
1920
1921 if peer_list.is_empty() {
1922 debug!("No peers connected for MCP service discovery broadcast");
1923 return Ok(());
1924 }
1925
1926 let mut successful_queries = 0;
1928 for peer_id in &peer_list {
1929 match self.query_peer_mcp_services(peer_id).await {
1930 Ok(_) => {
1931 successful_queries += 1;
1932 debug!("Sent MCP service discovery query to peer: {}", peer_id);
1933 }
1934 Err(e) => {
1935 warn!("Failed to send MCP service discovery query to peer {}: {}", peer_id, e);
1936 }
1937 }
1938 }
1939
1940 info!("Broadcast MCP service discovery to {}/{} connected peers",
1941 successful_queries, peer_list.len());
1942
1943 Ok(())
1944 }
1945}
1946
1947#[derive(Clone)]
1949pub struct P2PNetworkSender {
1950 peer_id: PeerId,
1951 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1953}
1954
1955impl P2PNetworkSender {
1956 pub fn new(peer_id: PeerId, send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>) -> Self {
1957 Self {
1958 peer_id,
1959 send_tx,
1960 }
1961 }
1962}
1963
1964#[async_trait::async_trait]
1966impl NetworkSender for P2PNetworkSender {
1967 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
1969 self.send_tx.send((peer_id.clone(), protocol.to_string(), data))
1970 .map_err(|_| P2PError::Network("Failed to send message via channel".to_string()))?;
1971 Ok(())
1972 }
1973
1974 fn local_peer_id(&self) -> &PeerId {
1976 &self.peer_id
1977 }
1978}
1979
1980pub struct NodeBuilder {
1982 config: NodeConfig,
1983}
1984
1985impl NodeBuilder {
1986 pub fn new() -> Self {
1988 Self {
1989 config: NodeConfig::default(),
1990 }
1991 }
1992
1993 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1995 self.config.peer_id = Some(peer_id);
1996 self
1997 }
1998
1999 pub fn listen_on(mut self, addr: &str) -> Self {
2001 self.config.listen_addrs.push(addr.to_string());
2002 self
2003 }
2004
2005 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2007 self.config.bootstrap_peers.push(addr.to_string());
2008 self
2009 }
2010
2011 pub fn with_ipv6(mut self, enable: bool) -> Self {
2013 self.config.enable_ipv6 = enable;
2014 self
2015 }
2016
2017 pub fn with_mcp_server(mut self) -> Self {
2019 self.config.enable_mcp_server = true;
2020 self
2021 }
2022
2023 pub fn with_mcp_config(mut self, mcp_config: MCPServerConfig) -> Self {
2025 self.config.mcp_server_config = Some(mcp_config);
2026 self.config.enable_mcp_server = true;
2027 self
2028 }
2029
2030 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2032 self.config.connection_timeout = timeout;
2033 self
2034 }
2035
2036 pub fn with_max_connections(mut self, max: usize) -> Self {
2038 self.config.max_connections = max;
2039 self
2040 }
2041
2042 pub fn with_production_mode(mut self) -> Self {
2044 self.config.production_config = Some(ProductionConfig::default());
2045 self
2046 }
2047
2048 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2050 self.config.production_config = Some(production_config);
2051 self
2052 }
2053
2054 pub async fn build(self) -> Result<P2PNode> {
2056 P2PNode::new(self.config).await
2057 }
2058}
2059
2060async fn handle_received_message_standalone(
2062 message_data: Vec<u8>,
2063 peer_id: &PeerId,
2064 protocol: &str,
2065 event_tx: &broadcast::Sender<P2PEvent>,
2066 mcp_server: &Option<Arc<crate::mcp::MCPServer>>
2067) -> Result<()> {
2068 if protocol == MCP_PROTOCOL {
2070 return handle_mcp_message_standalone(message_data, peer_id, mcp_server).await;
2071 }
2072
2073 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2075 Ok(message) => {
2076 if let (Some(protocol), Some(data), Some(from)) = (
2077 message.get("protocol").and_then(|v| v.as_str()),
2078 message.get("data").and_then(|v| v.as_array()),
2079 message.get("from").and_then(|v| v.as_str())
2080 ) {
2081 let data_bytes: Vec<u8> = data.iter()
2083 .filter_map(|v| v.as_u64().map(|n| n as u8))
2084 .collect();
2085
2086 let event = P2PEvent::Message {
2088 topic: protocol.to_string(),
2089 source: from.to_string(),
2090 data: data_bytes,
2091 };
2092
2093 let _ = event_tx.send(event);
2094 debug!("Generated message event from peer: {}", peer_id);
2095 }
2096 }
2097 Err(e) => {
2098 warn!("Failed to parse received message from {}: {}", peer_id, e);
2099 }
2100 }
2101
2102 Ok(())
2103}
2104
2105async fn handle_mcp_message_standalone(
2107 message_data: Vec<u8>,
2108 peer_id: &PeerId,
2109 mcp_server: &Option<Arc<crate::mcp::MCPServer>>
2110) -> Result<()> {
2111 if let Some(ref mcp_server) = mcp_server {
2112 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
2114 Ok(_p2p_mcp_message) => {
2115 debug!("Received MCP message from peer {}", peer_id);
2117 }
2118 Err(e) => {
2119 warn!("Failed to deserialize MCP message from peer {}: {}", peer_id, e);
2120 return Err(P2PError::MCP(format!("Invalid MCP message: {}", e)));
2121 }
2122 }
2123 } else {
2124 warn!("Received MCP message but MCP server is not enabled");
2125 return Err(P2PError::MCP("MCP server not enabled".to_string()));
2126 }
2127
2128 Ok(())
2129}
2130
2131#[cfg(test)]
2132mod tests {
2133 use super::*;
2134 use crate::mcp::{Tool, MCPTool, ToolHandler, ToolMetadata, ToolHealthStatus, ToolRequirements};
2135 use serde_json::json;
2136 use std::pin::Pin;
2137 use std::future::Future;
2138 use std::time::Duration;
2139 use tokio::time::timeout;
2140
2141 struct NetworkTestTool {
2143 name: String,
2144 }
2145
2146 impl NetworkTestTool {
2147 fn new(name: &str) -> Self {
2148 Self {
2149 name: name.to_string(),
2150 }
2151 }
2152 }
2153
2154 impl ToolHandler for NetworkTestTool {
2155 fn execute(&self, arguments: serde_json::Value) -> Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + '_>> {
2156 let name = self.name.clone();
2157 Box::pin(async move {
2158 Ok(json!({
2159 "tool": name,
2160 "input": arguments,
2161 "result": "network test success"
2162 }))
2163 })
2164 }
2165
2166 fn validate(&self, _arguments: &serde_json::Value) -> Result<()> {
2167 Ok(())
2168 }
2169
2170 fn get_requirements(&self) -> ToolRequirements {
2171 ToolRequirements::default()
2172 }
2173 }
2174
2175 fn create_test_node_config() -> NodeConfig {
2177 NodeConfig {
2178 peer_id: Some("test_peer_123".to_string()),
2179 listen_addrs: vec![
2180 "/ip6/::1/tcp/9001".to_string(),
2181 "/ip4/127.0.0.1/tcp/9001".to_string(),
2182 ],
2183 listen_addr: "127.0.0.1:9001".parse().unwrap(),
2184 bootstrap_peers: vec![],
2185 bootstrap_peers_str: vec![],
2186 enable_ipv6: true,
2187 enable_mcp_server: true,
2188 mcp_server_config: Some(MCPServerConfig {
2189 enable_auth: false, enable_rate_limiting: false, ..Default::default()
2192 }),
2193 connection_timeout: Duration::from_secs(10),
2194 keep_alive_interval: Duration::from_secs(30),
2195 max_connections: 100,
2196 max_incoming_connections: 50,
2197 dht_config: DHTConfig::default(),
2198 security_config: SecurityConfig::default(),
2199 production_config: None,
2200 bootstrap_cache_config: None,
2201 identity_config: None,
2202 }
2203 }
2204
2205 fn create_test_tool(name: &str) -> Tool {
2207 Tool {
2208 definition: MCPTool {
2209 name: name.to_string(),
2210 description: format!("Test tool: {}", name),
2211 input_schema: json!({
2212 "type": "object",
2213 "properties": {
2214 "input": { "type": "string" }
2215 }
2216 }),
2217 },
2218 handler: Box::new(NetworkTestTool::new(name)),
2219 metadata: ToolMetadata {
2220 created_at: SystemTime::now(),
2221 last_called: None,
2222 call_count: 0,
2223 avg_execution_time: Duration::from_millis(0),
2224 health_status: ToolHealthStatus::Healthy,
2225 tags: vec!["test".to_string()],
2226 },
2227 }
2228 }
2229
2230 #[tokio::test]
2231 async fn test_node_config_default() {
2232 let config = NodeConfig::default();
2233
2234 assert!(config.peer_id.is_none());
2235 assert_eq!(config.listen_addrs.len(), 2);
2236 assert!(config.enable_ipv6);
2237 assert!(config.enable_mcp_server);
2238 assert_eq!(config.max_connections, 1000);
2239 assert_eq!(config.max_incoming_connections, 100);
2240 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2241 }
2242
2243 #[tokio::test]
2244 async fn test_dht_config_default() {
2245 let config = DHTConfig::default();
2246
2247 assert_eq!(config.k_value, 20);
2248 assert_eq!(config.alpha_value, 5);
2249 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2250 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2251 }
2252
2253 #[tokio::test]
2254 async fn test_security_config_default() {
2255 let config = SecurityConfig::default();
2256
2257 assert!(config.enable_noise);
2258 assert!(config.enable_tls);
2259 assert_eq!(config.trust_level, TrustLevel::Basic);
2260 }
2261
2262 #[test]
2263 fn test_trust_level_variants() {
2264 let _none = TrustLevel::None;
2266 let _basic = TrustLevel::Basic;
2267 let _full = TrustLevel::Full;
2268
2269 assert_eq!(TrustLevel::None, TrustLevel::None);
2271 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2272 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2273 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2274 }
2275
2276 #[test]
2277 fn test_connection_status_variants() {
2278 let connecting = ConnectionStatus::Connecting;
2279 let connected = ConnectionStatus::Connected;
2280 let disconnecting = ConnectionStatus::Disconnecting;
2281 let disconnected = ConnectionStatus::Disconnected;
2282 let failed = ConnectionStatus::Failed("test error".to_string());
2283
2284 assert_eq!(connecting, ConnectionStatus::Connecting);
2285 assert_eq!(connected, ConnectionStatus::Connected);
2286 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2287 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2288 assert_ne!(connecting, connected);
2289
2290 if let ConnectionStatus::Failed(msg) = failed {
2291 assert_eq!(msg, "test error");
2292 } else {
2293 panic!("Expected Failed status");
2294 }
2295 }
2296
2297 #[tokio::test]
2298 async fn test_node_creation() -> Result<()> {
2299 let config = create_test_node_config();
2300 let node = P2PNode::new(config).await?;
2301
2302 assert_eq!(node.peer_id(), "test_peer_123");
2303 assert!(!node.is_running().await);
2304 assert_eq!(node.peer_count().await, 0);
2305 assert!(node.connected_peers().await.is_empty());
2306
2307 Ok(())
2308 }
2309
2310 #[tokio::test]
2311 async fn test_node_creation_without_peer_id() -> Result<()> {
2312 let mut config = create_test_node_config();
2313 config.peer_id = None;
2314
2315 let node = P2PNode::new(config).await?;
2316
2317 assert!(node.peer_id().starts_with("peer_"));
2319 assert!(!node.is_running().await);
2320
2321 Ok(())
2322 }
2323
2324 #[tokio::test]
2325 async fn test_node_lifecycle() -> Result<()> {
2326 let config = create_test_node_config();
2327 let node = P2PNode::new(config).await?;
2328
2329 assert!(!node.is_running().await);
2331
2332 node.start().await?;
2334 assert!(node.is_running().await);
2335
2336 let listen_addrs = node.listen_addrs().await;
2338 assert_eq!(listen_addrs.len(), 2);
2339
2340 node.stop().await?;
2342 assert!(!node.is_running().await);
2343
2344 Ok(())
2345 }
2346
2347 #[tokio::test]
2348 async fn test_peer_connection() -> Result<()> {
2349 let config = create_test_node_config();
2350 let node = P2PNode::new(config).await?;
2351
2352 let peer_addr = "/ip4/127.0.0.1/tcp/9002".to_string();
2353
2354 let peer_id = node.connect_peer(&peer_addr).await?;
2356 assert!(peer_id.starts_with("peer_from_"));
2357
2358 assert_eq!(node.peer_count().await, 1);
2360
2361 let connected_peers = node.connected_peers().await;
2363 assert_eq!(connected_peers.len(), 1);
2364 assert_eq!(connected_peers[0], peer_id);
2365
2366 let peer_info = node.peer_info(&peer_id).await;
2368 assert!(peer_info.is_some());
2369 let info = peer_info.unwrap();
2370 assert_eq!(info.peer_id, peer_id);
2371 assert_eq!(info.status, ConnectionStatus::Connected);
2372 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2373
2374 node.disconnect_peer(&peer_id).await?;
2376 assert_eq!(node.peer_count().await, 0);
2377
2378 Ok(())
2379 }
2380
2381 #[tokio::test]
2382 async fn test_event_subscription() -> Result<()> {
2383 let config = create_test_node_config();
2384 let node = P2PNode::new(config).await?;
2385
2386 let mut events = node.subscribe_events();
2387 let peer_addr = "/ip4/127.0.0.1/tcp/9003".to_string();
2388
2389 let peer_id = node.connect_peer(&peer_addr).await?;
2391
2392 let event = timeout(Duration::from_millis(100), events.recv()).await;
2394 assert!(event.is_ok());
2395
2396 match event.unwrap().unwrap() {
2397 P2PEvent::PeerConnected(event_peer_id) => {
2398 assert_eq!(event_peer_id, peer_id);
2399 }
2400 _ => panic!("Expected PeerConnected event"),
2401 }
2402
2403 node.disconnect_peer(&peer_id).await?;
2405
2406 let event = timeout(Duration::from_millis(100), events.recv()).await;
2408 assert!(event.is_ok());
2409
2410 match event.unwrap().unwrap() {
2411 P2PEvent::PeerDisconnected(event_peer_id) => {
2412 assert_eq!(event_peer_id, peer_id);
2413 }
2414 _ => panic!("Expected PeerDisconnected event"),
2415 }
2416
2417 Ok(())
2418 }
2419
2420 #[tokio::test]
2421 async fn test_message_sending() -> Result<()> {
2422 let config = create_test_node_config();
2423 let node = P2PNode::new(config).await?;
2424
2425 let peer_addr = "/ip4/127.0.0.1/tcp/9004".to_string();
2426 let peer_id = node.connect_peer(&peer_addr).await?;
2427
2428 let message_data = b"Hello, peer!".to_vec();
2430 let result = node.send_message(&peer_id, "test-protocol", message_data).await;
2431 assert!(result.is_ok());
2432
2433 let non_existent_peer = "non_existent_peer".to_string();
2435 let result = node.send_message(&non_existent_peer, "test-protocol", vec![]).await;
2436 assert!(result.is_err());
2437 assert!(result.unwrap_err().to_string().contains("not connected"));
2438
2439 Ok(())
2440 }
2441
2442 #[tokio::test]
2443 async fn test_mcp_integration() -> Result<()> {
2444 let config = create_test_node_config();
2445 let node = P2PNode::new(config).await?;
2446
2447 node.start().await?;
2449
2450 let tool = create_test_tool("network_test_tool");
2452 node.register_mcp_tool(tool).await?;
2453
2454 let tools = node.list_mcp_tools().await?;
2456 assert!(tools.contains(&"network_test_tool".to_string()));
2457
2458 let arguments = json!({"input": "test_input"});
2460 let result = node.call_mcp_tool("network_test_tool", arguments.clone()).await?;
2461 assert_eq!(result["tool"], "network_test_tool");
2462 assert_eq!(result["input"], arguments);
2463
2464 let stats = node.mcp_stats().await?;
2466 assert_eq!(stats.total_tools, 1);
2467
2468 let result = node.call_mcp_tool("non_existent_tool", json!({})).await;
2470 assert!(result.is_err());
2471
2472 node.stop().await?;
2473 Ok(())
2474 }
2475
2476 #[tokio::test]
2477 async fn test_remote_mcp_operations() -> Result<()> {
2478 let config = create_test_node_config();
2479 let node = P2PNode::new(config).await?;
2480
2481 node.start().await?;
2482
2483 let tool = create_test_tool("remote_test_tool");
2485 node.register_mcp_tool(tool).await?;
2486
2487 let peer_addr = "/ip4/127.0.0.1/tcp/9005".to_string();
2488 let peer_id = node.connect_peer(&peer_addr).await?;
2489
2490 let remote_tools = node.list_remote_mcp_tools(&peer_id).await?;
2492 assert!(!remote_tools.is_empty());
2493
2494 let arguments = json!({"input": "remote_test"});
2496 let result = node.call_remote_mcp_tool(&peer_id, "remote_test_tool", arguments.clone()).await?;
2497 assert_eq!(result["tool"], "remote_test_tool");
2498
2499 let services = node.discover_remote_mcp_services().await?;
2501 assert!(services.is_empty());
2503
2504 node.stop().await?;
2505 Ok(())
2506 }
2507
2508 #[tokio::test]
2509 async fn test_health_check() -> Result<()> {
2510 let config = create_test_node_config();
2511 let node = P2PNode::new(config).await?;
2512
2513 let result = node.health_check().await;
2515 assert!(result.is_ok());
2516
2517 for i in 0..5 {
2519 let addr = format!("/ip4/127.0.0.1/tcp/{}", 9010 + i);
2520 node.connect_peer(&addr).await?;
2521 }
2522
2523 let result = node.health_check().await;
2525 assert!(result.is_ok());
2526
2527 Ok(())
2528 }
2529
2530 #[tokio::test]
2531 async fn test_node_uptime() -> Result<()> {
2532 let config = create_test_node_config();
2533 let node = P2PNode::new(config).await?;
2534
2535 let uptime1 = node.uptime();
2536 assert!(uptime1 >= Duration::from_secs(0));
2537
2538 tokio::time::sleep(Duration::from_millis(10)).await;
2540
2541 let uptime2 = node.uptime();
2542 assert!(uptime2 > uptime1);
2543
2544 Ok(())
2545 }
2546
2547 #[tokio::test]
2548 async fn test_node_config_access() -> Result<()> {
2549 let config = create_test_node_config();
2550 let expected_peer_id = config.peer_id.clone();
2551 let node = P2PNode::new(config).await?;
2552
2553 let node_config = node.config();
2554 assert_eq!(node_config.peer_id, expected_peer_id);
2555 assert_eq!(node_config.max_connections, 100);
2556 assert!(node_config.enable_mcp_server);
2557
2558 Ok(())
2559 }
2560
2561 #[tokio::test]
2562 async fn test_mcp_server_access() -> Result<()> {
2563 let config = create_test_node_config();
2564 let node = P2PNode::new(config).await?;
2565
2566 assert!(node.mcp_server().is_some());
2568
2569 let mut config = create_test_node_config();
2571 config.enable_mcp_server = false;
2572 let node_no_mcp = P2PNode::new(config).await?;
2573 assert!(node_no_mcp.mcp_server().is_none());
2574
2575 Ok(())
2576 }
2577
2578 #[tokio::test]
2579 async fn test_dht_access() -> Result<()> {
2580 let config = create_test_node_config();
2581 let node = P2PNode::new(config).await?;
2582
2583 assert!(node.dht().is_some());
2585
2586 Ok(())
2587 }
2588
2589 #[tokio::test]
2590 async fn test_node_builder() -> Result<()> {
2591 let node = P2PNode::builder()
2592 .with_peer_id("builder_test_peer".to_string())
2593 .listen_on("/ip4/127.0.0.1/tcp/9100")
2594 .listen_on("/ip6/::1/tcp/9100")
2595 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9101")
2596 .with_ipv6(true)
2597 .with_mcp_server()
2598 .with_connection_timeout(Duration::from_secs(15))
2599 .with_max_connections(200)
2600 .build()
2601 .await?;
2602
2603 assert_eq!(node.peer_id(), "builder_test_peer");
2604 let config = node.config();
2605 assert_eq!(config.listen_addrs.len(), 4); assert_eq!(config.bootstrap_peers.len(), 1);
2607 assert!(config.enable_ipv6);
2608 assert!(config.enable_mcp_server);
2609 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2610 assert_eq!(config.max_connections, 200);
2611
2612 Ok(())
2613 }
2614
2615 #[tokio::test]
2616 async fn test_node_builder_with_mcp_config() -> Result<()> {
2617 let mcp_config = MCPServerConfig {
2618 server_name: "test_mcp_server".to_string(),
2619 server_version: "1.0.0".to_string(),
2620 enable_dht_discovery: false,
2621 enable_auth: false,
2622 ..MCPServerConfig::default()
2623 };
2624
2625 let node = P2PNode::builder()
2626 .with_peer_id("mcp_config_test".to_string())
2627 .with_mcp_config(mcp_config.clone())
2628 .build()
2629 .await?;
2630
2631 assert_eq!(node.peer_id(), "mcp_config_test");
2632 let config = node.config();
2633 assert!(config.enable_mcp_server);
2634 assert!(config.mcp_server_config.is_some());
2635
2636 let node_mcp_config = config.mcp_server_config.as_ref().unwrap();
2637 assert_eq!(node_mcp_config.server_name, "test_mcp_server");
2638 assert!(!node_mcp_config.enable_auth);
2639
2640 Ok(())
2641 }
2642
2643 #[tokio::test]
2644 async fn test_mcp_server_not_enabled_errors() -> Result<()> {
2645 let mut config = create_test_node_config();
2646 config.enable_mcp_server = false;
2647 let node = P2PNode::new(config).await?;
2648
2649 let tool = create_test_tool("test_tool");
2651 let result = node.register_mcp_tool(tool).await;
2652 assert!(result.is_err());
2653 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2654
2655 let result = node.call_mcp_tool("test_tool", json!({})).await;
2656 assert!(result.is_err());
2657 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2658
2659 let result = node.list_mcp_tools().await;
2660 assert!(result.is_err());
2661 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2662
2663 let result = node.mcp_stats().await;
2664 assert!(result.is_err());
2665 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2666
2667 Ok(())
2668 }
2669
2670 #[tokio::test]
2671 async fn test_bootstrap_peers() -> Result<()> {
2672 let mut config = create_test_node_config();
2673 config.bootstrap_peers = vec![
2674 "/ip4/127.0.0.1/tcp/9200".to_string(),
2675 "/ip4/127.0.0.1/tcp/9201".to_string(),
2676 ];
2677
2678 let node = P2PNode::new(config).await?;
2679
2680 node.start().await?;
2682
2683 let peer_count = node.peer_count().await;
2686 assert!(peer_count <= 2, "Peer count should not exceed bootstrap peer count");
2687
2688 node.stop().await?;
2689 Ok(())
2690 }
2691
2692 #[tokio::test]
2693 async fn test_production_mode_disabled() -> Result<()> {
2694 let config = create_test_node_config();
2695 let node = P2PNode::new(config).await?;
2696
2697 assert!(!node.is_production_mode());
2698 assert!(node.production_config().is_none());
2699
2700 let result = node.resource_metrics().await;
2702 assert!(result.is_err());
2703 assert!(result.unwrap_err().to_string().contains("not enabled"));
2704
2705 Ok(())
2706 }
2707
/// Builds one value of each of six `NetworkEvent` variants to confirm that
/// their field names and field types still line up with this test.
///
/// Nothing is asserted at runtime; the check is that the constructions
/// compile. The body never awaits, so this is a plain `#[test]` rather than a
/// `#[tokio::test]` and no async runtime is started for it.
#[test]
fn test_network_event_variants() {
    let peer_id = "test_peer".to_string();
    let address = "/ip4/127.0.0.1/tcp/9000".to_string();

    let _peer_connected = NetworkEvent::PeerConnected {
        peer_id: peer_id.clone(),
        addresses: vec![address.clone()],
    };

    let _peer_disconnected = NetworkEvent::PeerDisconnected {
        peer_id: peer_id.clone(),
        reason: "test disconnect".to_string(),
    };

    let _message_received = NetworkEvent::MessageReceived {
        peer_id: peer_id.clone(),
        protocol: "test-protocol".to_string(),
        data: vec![1, 2, 3],
    };

    // Last use of `peer_id` and `address`, so both are moved instead of cloned.
    let _connection_failed = NetworkEvent::ConnectionFailed {
        peer_id: Some(peer_id),
        address,
        error: "connection refused".to_string(),
    };

    let _dht_stored = NetworkEvent::DHTRecordStored {
        key: vec![1, 2, 3],
        value: vec![4, 5, 6],
    };

    let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
        key: vec![1, 2, 3],
        value: Some(vec![4, 5, 6]),
    };
}
2746
/// Builds a `PeerInfo` by hand and checks that the fields read back as set.
///
/// The body never awaits, so this is a plain `#[test]` rather than a
/// `#[tokio::test]`.
#[test]
fn test_peer_info_structure() {
    // NOTE(review): `Instant` is presumably `tokio::time::Instant` brought in
    // from the parent module; its `now()` does not need a running runtime,
    // but confirm if the import differs.
    let peer_info = PeerInfo {
        peer_id: "test_peer".to_string(),
        addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
        connected_at: Instant::now(),
        last_seen: Instant::now(),
        status: ConnectionStatus::Connected,
        protocols: vec!["test-protocol".to_string()],
        heartbeat_count: 0,
    };

    assert_eq!(peer_info.peer_id, "test_peer");
    assert_eq!(peer_info.addresses.len(), 1);
    assert_eq!(peer_info.status, ConnectionStatus::Connected);
    assert_eq!(peer_info.protocols.len(), 1);
}
2764
2765 #[tokio::test]
2766 async fn test_serialization() -> Result<()> {
2767 let config = create_test_node_config();
2769 let serialized = serde_json::to_string(&config)?;
2770 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2771
2772 assert_eq!(config.peer_id, deserialized.peer_id);
2773 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2774 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2775
2776 Ok(())
2777 }
2778}