1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23use crate::identity::manager::IdentityManagerConfig;
24use crate::mcp::{
25 HealthMonitorConfig, MCP_PROTOCOL, MCPCallContext, MCPServer, MCPServerConfig, NetworkSender,
26 Tool,
27};
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37use std::collections::HashMap;
38use std::sync::Arc;
39use std::time::{Duration, SystemTime};
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, info, warn};
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47 pub peer_id: Option<PeerId>,
49
50 pub listen_addrs: Vec<std::net::SocketAddr>,
52
53 pub listen_addr: std::net::SocketAddr,
55
56 pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59 pub bootstrap_peers_str: Vec<String>,
61
62 pub enable_ipv6: bool,
64
65 pub enable_mcp_server: bool,
67
68 pub mcp_server_config: Option<MCPServerConfig>,
70
71 pub connection_timeout: Duration,
73
74 pub keep_alive_interval: Duration,
76
77 pub max_connections: usize,
79
80 pub max_incoming_connections: usize,
82
83 pub dht_config: DHTConfig,
85
86 pub security_config: SecurityConfig,
88
89 pub production_config: Option<ProductionConfig>,
91
92 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
94
95 pub identity_config: Option<IdentityManagerConfig>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct DHTConfig {
102 pub k_value: usize,
104
105 pub alpha_value: usize,
107
108 pub record_ttl: Duration,
110
111 pub refresh_interval: Duration,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SecurityConfig {
118 pub enable_noise: bool,
120
121 pub enable_tls: bool,
123
124 pub trust_level: TrustLevel,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
130pub enum TrustLevel {
131 None,
133 Basic,
135 Full,
137}
138
139impl NodeConfig {
140 pub fn new() -> Result<Self> {
146 let config = Config::default();
148
149 let listen_addr = config.listen_socket_addr()?;
151
152 let mut listen_addrs = vec![];
154
155 if config.network.ipv6_enabled {
157 let ipv6_addr = std::net::SocketAddr::new(
158 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
159 listen_addr.port(),
160 );
161 listen_addrs.push(ipv6_addr);
162 }
163
164 let ipv4_addr = std::net::SocketAddr::new(
166 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
167 listen_addr.port(),
168 );
169 listen_addrs.push(ipv4_addr);
170
171 Ok(Self {
172 peer_id: None,
173 listen_addrs,
174 listen_addr,
175 bootstrap_peers: Vec::new(),
176 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
177 enable_ipv6: config.network.ipv6_enabled,
178 enable_mcp_server: config.mcp.enabled,
179 mcp_server_config: None, connection_timeout: Duration::from_secs(config.network.connection_timeout),
181 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
182 max_connections: config.network.max_connections,
183 max_incoming_connections: config.security.connection_limit as usize,
184 dht_config: DHTConfig::default(),
185 security_config: SecurityConfig::default(),
186 production_config: None,
187 bootstrap_cache_config: None,
188 identity_config: None,
189 })
190 }
191}
192
193impl Default for NodeConfig {
194 fn default() -> Self {
195 let config = Config::default();
197
198 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
200 std::net::SocketAddr::new(
201 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
202 9000,
203 )
204 });
205
206 Self {
207 peer_id: None,
208 listen_addrs: vec![
209 std::net::SocketAddr::new(
210 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
211 listen_addr.port(),
212 ),
213 std::net::SocketAddr::new(
214 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
215 listen_addr.port(),
216 ),
217 ],
218 listen_addr,
219 bootstrap_peers: Vec::new(),
220 bootstrap_peers_str: Vec::new(),
221 enable_ipv6: config.network.ipv6_enabled,
222 enable_mcp_server: config.mcp.enabled,
223 mcp_server_config: None, connection_timeout: Duration::from_secs(config.network.connection_timeout),
225 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
226 max_connections: config.network.max_connections,
227 max_incoming_connections: config.security.connection_limit as usize,
228 dht_config: DHTConfig::default(),
229 security_config: SecurityConfig::default(),
230 production_config: None, bootstrap_cache_config: None,
232 identity_config: None, }
234 }
235}
236
237impl NodeConfig {
238 pub fn from_config(config: &Config) -> Result<Self> {
240 let listen_addr = config.listen_socket_addr()?;
241 let bootstrap_addrs = config.bootstrap_addrs()?;
242
243 let mut node_config = Self {
244 peer_id: None,
245 listen_addrs: vec![listen_addr],
246 listen_addr,
247 bootstrap_peers: bootstrap_addrs
248 .iter()
249 .map(|addr| addr.socket_addr())
250 .collect(),
251 bootstrap_peers_str: config
252 .network
253 .bootstrap_nodes
254 .iter()
255 .map(|addr| addr.to_string())
256 .collect(),
257 enable_ipv6: config.network.ipv6_enabled,
258 enable_mcp_server: config.mcp.enabled,
259 mcp_server_config: Some(MCPServerConfig {
260 server_name: "P2P-MCP-Server".to_string(),
261 server_version: "1.0.0".to_string(),
262 enable_dht_discovery: true,
263 max_concurrent_requests: 100,
264 request_timeout: Duration::from_secs(30),
265 enable_auth: false,
266 enable_rate_limiting: true,
267 rate_limit_rpm: 60,
268 enable_logging: true,
269 max_tool_execution_time: Duration::from_secs(60),
270 tool_memory_limit: 1024 * 1024 * 1024, health_monitor: HealthMonitorConfig::default(),
272 }),
273 connection_timeout: Duration::from_secs(config.network.connection_timeout),
274 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
275 max_connections: config.network.max_connections,
276 max_incoming_connections: config.security.connection_limit as usize,
277 dht_config: DHTConfig {
278 k_value: 20,
279 alpha_value: 3,
280 record_ttl: Duration::from_secs(3600),
281 refresh_interval: Duration::from_secs(900),
282 },
283 security_config: SecurityConfig {
284 enable_noise: true,
285 enable_tls: true,
286 trust_level: TrustLevel::Basic,
287 },
288 production_config: Some(ProductionConfig {
289 max_connections: config.network.max_connections,
290 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
293 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
294 health_check_interval: Duration::from_secs(30),
295 metrics_interval: Duration::from_secs(60),
296 enable_performance_tracking: true,
297 enable_auto_cleanup: true,
298 shutdown_timeout: Duration::from_secs(30),
299 rate_limits: crate::production::RateLimitConfig::default(),
300 }),
301 bootstrap_cache_config: None,
302 identity_config: Some(IdentityManagerConfig {
303 cache_ttl: Duration::from_secs(3600),
304 challenge_timeout: Duration::from_secs(30),
305 }),
306 };
307
308 if config.network.ipv6_enabled {
310 node_config.listen_addrs.push(std::net::SocketAddr::new(
311 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
312 listen_addr.port(),
313 ));
314 }
315
316 Ok(node_config)
317 }
318
319 pub fn with_listen_addr(addr: &str) -> Result<Self> {
321 let listen_addr: std::net::SocketAddr = addr
322 .parse()
323 .map_err(|e: std::net::AddrParseError| {
324 NetworkError::InvalidAddress(e.to_string().into())
325 })
326 .map_err(P2PError::Network)?;
327 let cfg = NodeConfig {
328 listen_addr,
329 listen_addrs: vec![listen_addr],
330 ..Default::default()
331 };
332 Ok(cfg)
333 }
334}
335
336impl Default for DHTConfig {
337 fn default() -> Self {
338 Self {
339 k_value: 20,
340 alpha_value: 5,
341 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
344 }
345}
346
347impl Default for SecurityConfig {
348 fn default() -> Self {
349 Self {
350 enable_noise: true,
351 enable_tls: true,
352 trust_level: TrustLevel::Basic,
353 }
354 }
355}
356
357#[derive(Debug, Clone)]
359pub struct PeerInfo {
360 pub peer_id: PeerId,
362
363 pub addresses: Vec<String>,
365
366 pub connected_at: Instant,
368
369 pub last_seen: Instant,
371
372 pub status: ConnectionStatus,
374
375 pub protocols: Vec<String>,
377
378 pub heartbeat_count: u64,
380}
381
/// Connection state of a peer.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is being torn down.
    Disconnecting,
    /// No connection exists.
    Disconnected,
    /// The connection failed; the payload carries a description.
    Failed(String),
}
396
397#[derive(Debug, Clone)]
399pub enum NetworkEvent {
400 PeerConnected {
402 peer_id: PeerId,
404 addresses: Vec<String>,
406 },
407
408 PeerDisconnected {
410 peer_id: PeerId,
412 reason: String,
414 },
415
416 MessageReceived {
418 peer_id: PeerId,
420 protocol: String,
422 data: Vec<u8>,
424 },
425
426 ConnectionFailed {
428 peer_id: Option<PeerId>,
430 address: String,
432 error: String,
434 },
435
436 DHTRecordStored {
438 key: Vec<u8>,
440 value: Vec<u8>,
442 },
443
444 DHTRecordRetrieved {
446 key: Vec<u8>,
448 value: Option<Vec<u8>>,
450 },
451}
452
453#[derive(Debug, Clone)]
458pub enum P2PEvent {
459 Message {
461 topic: String,
463 source: PeerId,
465 data: Vec<u8>,
467 },
468 PeerConnected(PeerId),
470 PeerDisconnected(PeerId),
472}
473
474pub struct P2PNode {
476 config: NodeConfig,
478
479 peer_id: PeerId,
481
482 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
484
485 event_tx: broadcast::Sender<P2PEvent>,
487
488 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
490
491 start_time: Instant,
493
494 running: RwLock<bool>,
496
497 mcp_server: Option<Arc<MCPServer>>,
499
500 dht: Option<Arc<RwLock<DHT>>>,
502
503 resource_manager: Option<Arc<ResourceManager>>,
505
506 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
508
509 dual_node: Arc<DualStackNetworkNode>,
511
512 #[allow(dead_code)]
514 rate_limiter: Arc<RateLimiter>,
515}
516
517impl P2PNode {
518 pub fn new_for_tests() -> Self {
520 let (event_tx, _) = broadcast::channel(16);
521 Self {
522 config: NodeConfig::default(),
523 peer_id: "test_peer".to_string(),
524 peers: Arc::new(RwLock::new(HashMap::new())),
525 event_tx,
526 listen_addrs: RwLock::new(Vec::new()),
527 start_time: Instant::now(),
528 running: RwLock::new(false),
529 mcp_server: None,
530 dht: None,
531 resource_manager: None,
532 bootstrap_manager: None,
533 dual_node: {
534 let v6: Option<std::net::SocketAddr> = Some("[::1]:0".parse().unwrap_or(std::net::SocketAddr::from(([0,0,0,0],0))));
536 let v4: Option<std::net::SocketAddr> = Some("127.0.0.1:0".parse().unwrap_or(std::net::SocketAddr::from(([127,0,0,1],0))));
537 let dual = tokio::runtime::Handle::current()
538 .block_on(crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4))
539 .unwrap_or_else(|_| {
540 tokio::runtime::Handle::current()
541 .block_on(crate::transport::ant_quic_adapter::DualStackNetworkNode::new(None, Some("127.0.0.1:0".parse().unwrap())))
542 .expect("dual-stack fallback creation failed")
543 });
544 Arc::new(dual)
545 },
546 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
547 max_requests: 100,
548 burst_size: 100,
549 window: std::time::Duration::from_secs(1),
550 ..Default::default()
551 })),
552 }
553 }
554 pub async fn new(config: NodeConfig) -> Result<Self> {
556 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
557 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
559 });
560
561 let (event_tx, _) = broadcast::channel(1000);
562
563 let dht = if true {
565 let _dht_config = crate::dht::DHTConfig {
567 replication_factor: config.dht_config.k_value,
568 bucket_size: config.dht_config.k_value,
569 alpha: config.dht_config.alpha_value,
570 record_ttl: config.dht_config.record_ttl,
571 bucket_refresh_interval: config.dht_config.refresh_interval,
572 republish_interval: config.dht_config.refresh_interval,
573 max_distance: 160, };
575 let peer_bytes = peer_id.as_bytes();
577 let mut node_id_bytes = [0u8; 32];
578 let len = peer_bytes.len().min(32);
579 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
580 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
581 let dht_instance = DHT::new(node_id).map_err(|e| {
582 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
583 e.to_string().into(),
584 ))
585 })?;
586 Some(Arc::new(RwLock::new(dht_instance)))
587 } else {
588 None
589 };
590
591 let mcp_server = if config.enable_mcp_server {
593 let mcp_config = config
594 .mcp_server_config
595 .clone()
596 .unwrap_or_else(|| MCPServerConfig {
597 server_name: format!("P2P-MCP-{peer_id}"),
598 server_version: crate::VERSION.to_string(),
599 enable_dht_discovery: dht.is_some(),
600 ..MCPServerConfig::default()
601 });
602
603 let mut server = MCPServer::new(mcp_config);
604
605 if let Some(ref dht_instance) = dht {
607 server = server.with_dht(dht_instance.clone());
608 }
609
610 Some(Arc::new(server))
611 } else {
612 None
613 };
614
615 let resource_manager = config
617 .production_config
618 .clone()
619 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
620
621 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
623 match BootstrapManager::with_config(cache_config.clone()).await {
624 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
625 Err(e) => {
626 warn!(
627 "Failed to initialize bootstrap manager: {}, continuing without cache",
628 e
629 );
630 None
631 }
632 }
633 } else {
634 match BootstrapManager::new().await {
635 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
636 Err(e) => {
637 warn!(
638 "Failed to initialize bootstrap manager: {}, continuing without cache",
639 e
640 );
641 None
642 }
643 }
644 };
645
646 let (mut v6_opt, mut v4_opt) = (None, None);
648 if !config.listen_addrs.is_empty() {
649 v6_opt = config
650 .listen_addrs
651 .iter()
652 .find(|a| a.is_ipv6())
653 .cloned();
654 v4_opt = config
655 .listen_addrs
656 .iter()
657 .find(|a| a.is_ipv4())
658 .cloned();
659 } else {
660 v4_opt = Some(std::net::SocketAddr::new(
662 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
663 config.listen_addr.port(),
664 ));
665 if config.enable_ipv6 {
666 v6_opt = Some(std::net::SocketAddr::new(
667 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
668 config.listen_addr.port(),
669 ));
670 }
671 }
672
673 let dual_node = Arc::new(
674 DualStackNetworkNode::new(v6_opt, v4_opt)
675 .await
676 .map_err(|e| {
677 P2PError::Transport(crate::error::TransportError::SetupFailed(
678 format!("Failed to create dual-stack network nodes: {}", e).into(),
679 ))
680 })?,
681 );
682
683 let rate_limiter = Arc::new(RateLimiter::new(
685 crate::validation::RateLimitConfig::default(),
686 ));
687
688 let node = Self {
689 config,
690 peer_id,
691 peers: Arc::new(RwLock::new(HashMap::new())),
692 event_tx,
693 listen_addrs: RwLock::new(Vec::new()),
694 start_time: Instant::now(),
695 running: RwLock::new(false),
696 mcp_server,
697 dht,
698 resource_manager,
699 bootstrap_manager,
700 dual_node,
701 rate_limiter,
702 };
703 info!("Created P2P node with peer ID: {}", node.peer_id);
704
705 Ok(node)
710 }
711
712 pub fn builder() -> NodeBuilder {
714 NodeBuilder::new()
715 }
716
717 pub fn peer_id(&self) -> &PeerId {
719 &self.peer_id
720 }
721
722 pub async fn initialize_mcp_network(&self) -> Result<()> {
725 if let Some(ref _mcp_server) = self.mcp_server {
726 let (send_tx, mut send_rx) =
728 tokio::sync::mpsc::unbounded_channel::<(PeerId, String, Vec<u8>)>();
729
730 let network_sender = P2PNetworkSender::new(self.peer_id.clone(), send_tx);
732
733 _mcp_server
735 .set_network_sender(Arc::new(network_sender))
736 .await;
737
738 tokio::spawn(async move {
741 while let Some((peer_id, protocol, data)) = send_rx.recv().await {
742 debug!(
743 "Sending network message to {}: {} bytes on protocol {}",
744 peer_id,
745 data.len(),
746 protocol
747 );
748
749 let message_data = match handle_protocol_message_creation(&protocol, data) {
751 Some(msg) => msg,
752 None => continue,
753 };
754
755 handle_message_send_result(Ok(()), &peer_id).await;
757 }
758 });
759
760 info!(
761 "MCP network integration initialized for peer {}",
762 self.peer_id
763 );
764 }
765 Ok(())
766 }
767
768 pub fn local_addr(&self) -> Option<String> {
769 self.listen_addrs
770 .try_read()
771 .ok()
772 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
773 }
774
775 pub async fn subscribe(&self, topic: &str) -> Result<()> {
776 info!("Subscribed to topic: {}", topic);
779 Ok(())
780 }
781
782 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
783 info!(
784 "Publishing message to topic: {} ({} bytes)",
785 topic,
786 data.len()
787 );
788
789 let peer_list: Vec<PeerId> = {
791 let peers_guard = self.peers.read().await;
792 peers_guard.keys().cloned().collect()
793 };
794
795 if peer_list.is_empty() {
796 debug!("No peers connected, message will only be sent to local subscribers");
797 } else {
798 let mut send_count = 0;
800 for peer_id in &peer_list {
801 match self.send_message(peer_id, topic, data.to_vec()).await {
802 Ok(_) => {
803 send_count += 1;
804 debug!("Sent message to peer: {}", peer_id);
805 }
806 Err(e) => {
807 warn!("Failed to send message to peer {}: {}", peer_id, e);
808 }
809 }
810 }
811 info!(
812 "Published message to {}/{} connected peers",
813 send_count,
814 peer_list.len()
815 );
816 }
817
818 let event = P2PEvent::Message {
820 topic: topic.to_string(),
821 source: self.peer_id.clone(),
822 data: data.to_vec(),
823 };
824 let _ = self.event_tx.send(event);
825
826 Ok(())
827 }
828
829 pub fn config(&self) -> &NodeConfig {
831 &self.config
832 }
833
834 pub async fn start(&self) -> Result<()> {
836 info!("Starting P2P node...");
837
838 if let Some(ref resource_manager) = self.resource_manager {
840 resource_manager.start().await.map_err(|e| {
841 P2PError::Network(crate::error::NetworkError::ProtocolError(
842 format!("Failed to start resource manager: {e}").into(),
843 ))
844 })?;
845 info!("Production resource manager started");
846 }
847
848 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
850 let mut manager = bootstrap_manager.write().await;
851 manager.start_background_tasks().await.map_err(|e| {
852 P2PError::Network(crate::error::NetworkError::ProtocolError(
853 format!("Failed to start bootstrap manager: {e}").into(),
854 ))
855 })?;
856 info!("Bootstrap cache manager started");
857 }
858
859 *self.running.write().await = true;
861
862 self.start_network_listeners().await?;
864
865 let listen_addrs = self.listen_addrs.read().await;
867 info!("P2P node started on addresses: {:?}", *listen_addrs);
868
869 self.initialize_mcp_network().await?;
871
872 if let Some(ref _mcp_server) = self.mcp_server {
874 _mcp_server.start().await.map_err(|e| {
875 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
876 format!("Failed to start MCP server: {e}").into(),
877 ))
878 })?;
879 info!("MCP server started with network integration");
880 }
881
882 self.start_message_receiving_system().await?;
884
885 self.connect_bootstrap_peers().await?;
887
888 Ok(())
889 }
890
891 async fn start_network_listeners(&self) -> Result<()> {
893 info!("Starting dual-stack listeners (ant-quic)...");
894 let addrs = self.dual_node.local_addrs();
896 {
897 let mut la = self.listen_addrs.write().await;
898 *la = addrs.clone();
899 }
900
901 let event_tx = self.event_tx.clone();
903 let peers = self.peers.clone();
904 let rate_limiter = self.rate_limiter.clone();
905 let dual = self.dual_node.clone();
906 tokio::spawn(async move {
907 loop {
908 match dual.accept_any().await {
909 Ok((ant_peer_id, remote_sock)) => {
910 let peer_id = crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
911 let remote_addr = NetworkAddress::from(remote_sock);
912 let _ = rate_limiter.check_ip(&remote_sock.ip());
914 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
915 register_new_peer(&peers, &peer_id, &remote_addr).await;
916 }
917 Err(e) => {
918 warn!("Accept failed: {}", e);
919 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
920 }
921 }
922 }
923 });
924
925 info!("Dual-stack listeners active on: {:?}", addrs);
926 Ok(())
927 }
928
929 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
931 warn!("QUIC transport temporarily disabled during ant-quic migration");
970 Err(crate::P2PError::Transport(
972 crate::error::TransportError::SetupFailed(
973 format!(
974 "Failed to start QUIC listener on {addr} - transport disabled during migration"
975 )
976 .into(),
977 ),
978 ))
979 }
980
981 #[allow(dead_code)] async fn start_connection_acceptor(
984 &self,
985 transport: Arc<dyn crate::transport::Transport>,
986 addr: std::net::SocketAddr,
987 transport_type: crate::transport::TransportType,
988 ) -> Result<()> {
989 info!(
990 "Starting connection acceptor for {:?} on {}",
991 transport_type, addr
992 );
993
994 let event_tx = self.event_tx.clone();
996 let _peer_id = self.peer_id.clone();
997 let peers = Arc::clone(&self.peers);
998 let mcp_server = self.mcp_server.clone();
1000 let rate_limiter = Arc::clone(&self.rate_limiter);
1001
1002 tokio::spawn(async move {
1004 loop {
1005 match transport.accept().await {
1006 Ok(connection) => {
1007 let remote_addr = connection.remote_addr();
1008 let connection_peer_id =
1009 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1010
1011 let socket_addr = remote_addr.socket_addr();
1013 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1014 continue;
1016 }
1017
1018 info!(
1019 "Accepted {:?} connection from {} (peer: {})",
1020 transport_type, remote_addr, connection_peer_id
1021 );
1022
1023 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1025
1026 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1028
1029 spawn_connection_handler(
1031 connection,
1032 connection_peer_id,
1033 event_tx.clone(),
1034 Arc::clone(&peers),
1035 mcp_server.clone(),
1036 );
1037 }
1038 Err(e) => {
1039 warn!(
1040 "Failed to accept {:?} connection on {}: {}",
1041 transport_type, addr, e
1042 );
1043
1044 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1046 }
1047 }
1048 }
1049 });
1050
1051 info!(
1052 "Connection acceptor background task started for {:?} on {}",
1053 transport_type, addr
1054 );
1055 Ok(())
1056 }
1057
1058 async fn start_message_receiving_system(&self) -> Result<()> {
1060 info!("Message receiving system initialized (background tasks simplified for demo)");
1061
1062 Ok(())
1067 }
1068
1069 #[allow(dead_code)]
1071 async fn handle_received_message(
1072 &self,
1073 message_data: Vec<u8>,
1074 peer_id: &PeerId,
1075 protocol: &str,
1076 event_tx: &broadcast::Sender<P2PEvent>,
1077 ) -> Result<()> {
1078 if protocol == MCP_PROTOCOL {
1080 return self.handle_mcp_message(message_data, peer_id).await;
1081 }
1082
1083 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1085 Ok(message) => {
1086 if let (Some(protocol), Some(data), Some(from)) = (
1087 message.get("protocol").and_then(|v| v.as_str()),
1088 message.get("data").and_then(|v| v.as_array()),
1089 message.get("from").and_then(|v| v.as_str()),
1090 ) {
1091 let data_bytes: Vec<u8> = data
1093 .iter()
1094 .filter_map(|v| v.as_u64().map(|n| n as u8))
1095 .collect();
1096
1097 let event = P2PEvent::Message {
1099 topic: protocol.to_string(),
1100 source: from.to_string(),
1101 data: data_bytes,
1102 };
1103
1104 let _ = event_tx.send(event);
1105 debug!("Generated message event from peer: {}", peer_id);
1106 }
1107 }
1108 Err(e) => {
1109 warn!("Failed to parse received message from {}: {}", peer_id, e);
1110 }
1111 }
1112
1113 Ok(())
1114 }
1115
1116 #[allow(dead_code)]
1118 async fn handle_mcp_message(&self, message_data: Vec<u8>, peer_id: &PeerId) -> Result<()> {
1119 if let Some(ref _mcp_server) = self.mcp_server {
1120 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
1122 Ok(p2p_mcp_message) => {
1123 debug!(
1124 "Received MCP message from peer {}: {:?}",
1125 peer_id, p2p_mcp_message.message_type
1126 );
1127
1128 match p2p_mcp_message.message_type {
1130 crate::mcp::P2PMCPMessageType::Request => {
1131 self.handle_mcp_tool_request(p2p_mcp_message, peer_id)
1133 .await?;
1134 }
1135 crate::mcp::P2PMCPMessageType::Response => {
1136 self.handle_mcp_tool_response(p2p_mcp_message).await?;
1138 }
1139 crate::mcp::P2PMCPMessageType::ServiceAdvertisement => {
1140 self.handle_mcp_service_advertisement(p2p_mcp_message, peer_id)
1142 .await?;
1143 }
1144 crate::mcp::P2PMCPMessageType::ServiceDiscovery => {
1145 self.handle_mcp_service_discovery(p2p_mcp_message, peer_id)
1147 .await?;
1148 }
1149 crate::mcp::P2PMCPMessageType::Heartbeat => {
1150 debug!("Received heartbeat from peer {}", peer_id);
1152
1153 let _ =
1155 update_peer_heartbeat(&self.peers, peer_id)
1156 .await
1157 .map_err(|e| {
1158 debug!(
1159 "Failed to update heartbeat for peer {}: {}",
1160 peer_id, e
1161 )
1162 });
1163
1164 let timestamp = std::time::SystemTime::now()
1166 .duration_since(std::time::UNIX_EPOCH)
1167 .map_err(|e| {
1168 P2PError::Network(NetworkError::ProtocolError(
1169 format!("System time error: {}", e).into(),
1170 ))
1171 })?
1172 .as_secs();
1173
1174 let ack_data = serde_json::to_vec(&serde_json::json!({
1175 "type": "heartbeat_ack",
1176 "timestamp": timestamp
1177 }))
1178 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1179
1180 let _ = self
1181 .send_message(peer_id, MCP_PROTOCOL, ack_data)
1182 .await
1183 .map_err(|e| {
1184 warn!("Failed to send heartbeat ack to {}: {}", peer_id, e)
1185 });
1186 }
1187 crate::mcp::P2PMCPMessageType::HealthCheck => {
1188 debug!("Received health check from peer {}", peer_id);
1190
1191 let peers_count = self.peers.read().await.len();
1193 let uptime = self.start_time.elapsed();
1194
1195 let (memory_usage, cpu_usage) =
1197 get_resource_metrics(&self.resource_manager).await;
1198
1199 let timestamp = std::time::SystemTime::now()
1201 .duration_since(std::time::UNIX_EPOCH)
1202 .map_err(|e| {
1203 P2PError::Network(NetworkError::ProtocolError(
1204 format!("System time error: {}", e).into(),
1205 ))
1206 })?
1207 .as_secs();
1208
1209 let health_response = serde_json::json!({
1210 "type": "health_check_response",
1211 "status": "healthy",
1212 "peer_id": self.peer_id,
1213 "peers_count": peers_count,
1214 "uptime_secs": uptime.as_secs(),
1215 "memory_usage_bytes": memory_usage,
1216 "cpu_usage_percent": cpu_usage,
1217 "timestamp": timestamp
1218 });
1219
1220 let response_data = serde_json::to_vec(&health_response)
1221 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1222
1223 if let Err(e) = self
1225 .send_message(peer_id, MCP_PROTOCOL, response_data)
1226 .await
1227 {
1228 warn!("Failed to send health check response to {}: {}", peer_id, e);
1229 }
1230 }
1231 }
1232 }
1233 Err(e) => {
1234 warn!(
1235 "Failed to deserialize MCP message from peer {}: {}",
1236 peer_id, e
1237 );
1238 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1239 format!("Invalid MCP message: {e}").into(),
1240 )));
1241 }
1242 }
1243 } else {
1244 warn!("Received MCP message but MCP server is not enabled");
1245 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1246 "MCP server not enabled".to_string().into(),
1247 )));
1248 }
1249
1250 Ok(())
1251 }
1252
1253 #[allow(dead_code)]
1255 async fn handle_mcp_tool_request(
1256 &self,
1257 message: crate::mcp::P2PMCPMessage,
1258 peer_id: &PeerId,
1259 ) -> Result<()> {
1260 if let Some(ref _mcp_server) = self.mcp_server {
1261 if let crate::mcp::MCPMessage::CallTool { name, arguments } = message.payload {
1263 debug!(
1264 "Handling MCP tool request for '{}' from peer {}",
1265 name, peer_id
1266 );
1267
1268 let context = MCPCallContext {
1270 caller_id: peer_id.clone(),
1271 timestamp: std::time::SystemTime::now(),
1272 timeout: Duration::from_secs(30),
1273 auth_info: None,
1274 metadata: std::collections::HashMap::new(),
1275 };
1276
1277 match _mcp_server.call_tool(&name, arguments, context).await {
1279 Ok(result) => {
1280 let response_message = crate::mcp::P2PMCPMessage {
1282 message_type: crate::mcp::P2PMCPMessageType::Response,
1283 message_id: message.message_id,
1284 source_peer: self.peer_id.clone(),
1285 target_peer: Some(peer_id.clone()),
1286 timestamp: std::time::SystemTime::now()
1287 .duration_since(std::time::UNIX_EPOCH)
1288 .unwrap_or_default()
1289 .as_secs(),
1290 payload: crate::mcp::MCPMessage::CallToolResult {
1291 content: vec![crate::mcp::MCPContent::Text {
1292 text: serde_json::to_string(&result).unwrap_or_default(),
1293 }],
1294 is_error: false,
1295 },
1296 ttl: 5,
1297 };
1298
1299 let response_data = serde_json::to_vec(&response_message)
1301 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1302
1303 self.send_message(peer_id, MCP_PROTOCOL, response_data)
1304 .await?;
1305 debug!("Sent MCP tool response to peer {}", peer_id);
1306 }
1307 Err(e) => {
1308 let error_message = crate::mcp::P2PMCPMessage {
1310 message_type: crate::mcp::P2PMCPMessageType::Response,
1311 message_id: message.message_id,
1312 source_peer: self.peer_id.clone(),
1313 target_peer: Some(peer_id.clone()),
1314 timestamp: std::time::SystemTime::now()
1315 .duration_since(std::time::UNIX_EPOCH)
1316 .unwrap_or_default()
1317 .as_secs(),
1318 payload: crate::mcp::MCPMessage::CallToolResult {
1319 content: vec![crate::mcp::MCPContent::Text {
1320 text: format!("Error: {e}"),
1321 }],
1322 is_error: true,
1323 },
1324 ttl: 5,
1325 };
1326
1327 let error_data = serde_json::to_vec(&error_message)
1328 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1329
1330 self.send_message(peer_id, MCP_PROTOCOL, error_data).await?;
1331 warn!("Sent MCP error response to peer {}: {}", peer_id, e);
1332 }
1333 }
1334 }
1335 }
1336
1337 Ok(())
1338 }
1339
1340 #[allow(dead_code)]
1342 async fn handle_mcp_tool_response(&self, message: crate::mcp::P2PMCPMessage) -> Result<()> {
1343 if let Some(ref _mcp_server) = self.mcp_server {
1344 debug!("Received MCP tool response: {}", message.message_id);
1346 }
1349
1350 Ok(())
1351 }
1352
1353 #[allow(dead_code)]
1355 async fn handle_mcp_service_advertisement(
1356 &self,
1357 message: crate::mcp::P2PMCPMessage,
1358 peer_id: &PeerId,
1359 ) -> Result<()> {
1360 debug!("Received MCP service advertisement from peer {}", peer_id);
1361
1362 if let Some(ref _mcp_server) = self.mcp_server {
1363 _mcp_server.handle_service_advertisement(message).await?;
1365 debug!("Processed service advertisement from peer {}", peer_id);
1366 } else {
1367 warn!("Received MCP service advertisement but MCP server is not enabled");
1368 }
1369
1370 Ok(())
1371 }
1372
1373 #[allow(dead_code)]
1375 async fn handle_mcp_service_discovery(
1376 &self,
1377 message: crate::mcp::P2PMCPMessage,
1378 peer_id: &PeerId,
1379 ) -> Result<()> {
1380 debug!("Received MCP service discovery query from peer {}", peer_id);
1381
1382 if let Some(ref _mcp_server) = self.mcp_server {
1383 if let Ok(Some(response_data)) = _mcp_server.handle_service_discovery(message).await {
1385 self.send_message(peer_id, MCP_PROTOCOL, response_data)
1387 .await?;
1388 debug!("Sent service discovery response to peer {}", peer_id);
1389 }
1390 } else {
1391 warn!("Received MCP service discovery query but MCP server is not enabled");
1392 }
1393
1394 Ok(())
1395 }
1396
1397 pub async fn run(&self) -> Result<()> {
1399 if !*self.running.read().await {
1400 self.start().await?;
1401 }
1402
1403 info!("P2P node running...");
1404
1405 loop {
1407 if !*self.running.read().await {
1408 break;
1409 }
1410
1411 self.periodic_tasks().await?;
1413
1414 tokio::time::sleep(Duration::from_millis(100)).await;
1416 }
1417
1418 info!("P2P node stopped");
1419 Ok(())
1420 }
1421
1422 pub async fn stop(&self) -> Result<()> {
1424 info!("Stopping P2P node...");
1425
1426 *self.running.write().await = false;
1428
1429 if let Some(ref _mcp_server) = self.mcp_server {
1431 _mcp_server.shutdown().await.map_err(|e| {
1432 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1433 format!("Failed to shutdown MCP server: {e}").into(),
1434 ))
1435 })?;
1436 info!("MCP server stopped");
1437 }
1438
1439 self.disconnect_all_peers().await?;
1441
1442 if let Some(ref resource_manager) = self.resource_manager {
1444 resource_manager.shutdown().await.map_err(|e| {
1445 P2PError::Network(crate::error::NetworkError::ProtocolError(
1446 format!("Failed to shutdown resource manager: {e}").into(),
1447 ))
1448 })?;
1449 info!("Production resource manager stopped");
1450 }
1451
1452 info!("P2P node stopped");
1453 Ok(())
1454 }
1455
1456 pub async fn shutdown(&self) -> Result<()> {
1458 self.stop().await
1459 }
1460
1461 pub async fn is_running(&self) -> bool {
1463 *self.running.read().await
1464 }
1465
1466 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1468 self.listen_addrs.read().await.clone()
1469 }
1470
1471 pub async fn connected_peers(&self) -> Vec<PeerId> {
1473 self.peers.read().await.keys().cloned().collect()
1474 }
1475
1476 pub async fn peer_count(&self) -> usize {
1478 self.peers.read().await.len()
1479 }
1480
1481 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1483 self.peers.read().await.get(peer_id).cloned()
1484 }
1485
1486 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1488 info!("Connecting to peer at: {}", address);
1489
1490 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1492 Some(resource_manager.acquire_connection().await?)
1493 } else {
1494 None
1495 };
1496
1497 let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1499 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1500 format!("{}: {}", address, e).into(),
1501 ))
1502 })?;
1503
1504 let peer_id = {
1506 match self
1507 .dual_node
1508 .connect_happy_eyeballs(&[_socket_addr])
1509 .await
1510 .map(|p| crate::transport::ant_quic_adapter::ant_peer_id_to_string(&p))
1511 {
1512 Ok(connected_peer_id) => {
1513 info!("Successfully connected to peer: {}", connected_peer_id);
1514 connected_peer_id
1515 }
1516 Err(e) => {
1517 warn!("Failed to connect to peer at {}: {}", address, e);
1518
1519 let demo_peer_id =
1522 format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
1523 warn!(
1524 "Using demo peer ID: {} (transport connection failed)",
1525 demo_peer_id
1526 );
1527 demo_peer_id
1528 }
1529 }
1530 };
1531
1532 let peer_info = PeerInfo {
1534 peer_id: peer_id.clone(),
1535 addresses: vec![address.to_string()],
1536 connected_at: Instant::now(),
1537 last_seen: Instant::now(),
1538 status: ConnectionStatus::Connected,
1539 protocols: vec!["p2p-foundation/1.0".to_string()],
1540 heartbeat_count: 0,
1541 };
1542
1543 self.peers.write().await.insert(peer_id.clone(), peer_info);
1545
1546 if let Some(ref resource_manager) = self.resource_manager {
1548 resource_manager.record_bandwidth(0, 0); }
1550
1551 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1553
1554 info!("Connected to peer: {}", peer_id);
1555 Ok(peer_id)
1556 }
1557
1558 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1560 info!("Disconnecting from peer: {}", peer_id);
1561
1562 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1563 peer_info.status = ConnectionStatus::Disconnected;
1564
1565 let _ = self
1567 .event_tx
1568 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1569
1570 info!("Disconnected from peer: {}", peer_id);
1571 }
1572
1573 Ok(())
1574 }
1575
1576 pub async fn send_message(
1578 &self,
1579 peer_id: &PeerId,
1580 protocol: &str,
1581 data: Vec<u8>,
1582 ) -> Result<()> {
1583 debug!(
1584 "Sending message to peer {} on protocol {}",
1585 peer_id, protocol
1586 );
1587
1588 if let Some(ref resource_manager) = self.resource_manager
1590 && !resource_manager
1591 .check_rate_limit(peer_id, "message")
1592 .await?
1593 {
1594 return Err(P2PError::ResourceExhausted(
1595 format!("Rate limit exceeded for peer {}", peer_id).into(),
1596 ));
1597 }
1598
1599 if !self.peers.read().await.contains_key(peer_id) {
1601 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1602 peer_id.to_string().into(),
1603 )));
1604 }
1605
1606 if protocol == MCP_PROTOCOL {
1608 if data.len() < 4 {
1610 return Err(P2PError::Network(
1611 crate::error::NetworkError::ProtocolError(
1612 "Invalid MCP message: too short".to_string().into(),
1613 ),
1614 ));
1615 }
1616
1617 let message_type = data.first().unwrap_or(&0);
1619 if *message_type > 10 {
1620 return Err(P2PError::Network(
1622 crate::error::NetworkError::ProtocolError(
1623 "Invalid MCP message type".to_string().into(),
1624 ),
1625 ));
1626 }
1627
1628 debug!("Validated MCP message for network transmission");
1629 }
1630
1631 if let Some(ref resource_manager) = self.resource_manager {
1633 resource_manager.record_bandwidth(data.len() as u64, 0);
1634 }
1635
1636 let _message_data = self.create_protocol_message(protocol, data)?;
1638
1639 self.dual_node
1641 .send_to_peer_string(peer_id, &_message_data)
1642 .await
1643 .map_err(|e| {
1644 P2PError::Transport(crate::error::TransportError::StreamError(
1645 e.to_string().into(),
1646 ))
1647 })
1648 }
1649
1650 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1652 use serde_json::json;
1653
1654 let timestamp = std::time::SystemTime::now()
1655 .duration_since(std::time::UNIX_EPOCH)
1656 .map_err(|e| {
1657 P2PError::Network(NetworkError::ProtocolError(
1658 format!("System time error: {}", e).into(),
1659 ))
1660 })?
1661 .as_secs();
1662
1663 let message = json!({
1665 "protocol": protocol,
1666 "data": data,
1667 "from": self.peer_id,
1668 "timestamp": timestamp
1669 });
1670
1671 serde_json::to_vec(&message).map_err(|e| {
1672 P2PError::Transport(crate::error::TransportError::StreamError(
1673 format!("Failed to serialize message: {e}").into(),
1674 ))
1675 })
1676 }
1677
1678 }
1680
1681#[allow(dead_code)]
1683fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1684 use serde_json::json;
1685
1686 let timestamp = std::time::SystemTime::now()
1687 .duration_since(std::time::UNIX_EPOCH)
1688 .map_err(|e| {
1689 P2PError::Network(NetworkError::ProtocolError(
1690 format!("System time error: {}", e).into(),
1691 ))
1692 })?
1693 .as_secs();
1694
1695 let message = json!({
1697 "protocol": protocol,
1698 "data": data,
1699 "timestamp": timestamp
1700 });
1701
1702 serde_json::to_vec(&message).map_err(|e| {
1703 P2PError::Transport(crate::error::TransportError::StreamError(
1704 format!("Failed to serialize message: {e}").into(),
1705 ))
1706 })
1707}
1708
1709impl P2PNode {
1710 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1712 self.event_tx.subscribe()
1713 }
1714
1715 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1717 self.subscribe_events()
1718 }
1719
1720 pub fn uptime(&self) -> Duration {
1722 self.start_time.elapsed()
1723 }
1724
1725 pub fn mcp_server(&self) -> Option<&Arc<MCPServer>> {
1727 self.mcp_server.as_ref()
1728 }
1729
1730 pub async fn register_mcp_tool(&self, tool: Tool) -> Result<()> {
1732 if let Some(ref _mcp_server) = self.mcp_server {
1733 let tool_name = tool.definition.name.clone();
1734 _mcp_server.register_tool(tool).await.map_err(|e| {
1735 P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
1736 format!("{}: Registration failed: {e}", tool_name).into(),
1737 ))
1738 })
1739 } else {
1740 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1741 "MCP server not enabled".to_string().into(),
1742 )))
1743 }
1744 }
1745
1746 pub async fn call_mcp_tool(&self, tool_name: &str, arguments: Value) -> Result<Value> {
1748 if let Some(ref _mcp_server) = self.mcp_server {
1749 if let Some(ref resource_manager) = self.resource_manager
1751 && !resource_manager
1752 .check_rate_limit(&self.peer_id, "mcp")
1753 .await?
1754 {
1755 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1756 "MCP rate limit exceeded".to_string().into(),
1757 )));
1758 }
1759
1760 let context = MCPCallContext {
1761 caller_id: self.peer_id.clone(),
1762 timestamp: SystemTime::now(),
1763 timeout: Duration::from_secs(30),
1764 auth_info: None,
1765 metadata: HashMap::new(),
1766 };
1767
1768 _mcp_server
1769 .call_tool(tool_name, arguments, context)
1770 .await
1771 .map_err(|e| {
1772 P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
1773 format!("{}: Execution failed: {e}", tool_name).into(),
1774 ))
1775 })
1776 } else {
1777 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1778 "MCP server not enabled".to_string().into(),
1779 )))
1780 }
1781 }
1782
1783 pub async fn call_remote_mcp_tool(
1785 &self,
1786 peer_id: &PeerId,
1787 tool_name: &str,
1788 arguments: Value,
1789 ) -> Result<Value> {
1790 if let Some(ref _mcp_server) = self.mcp_server {
1791 if peer_id == &self.peer_id {
1793 let context = MCPCallContext {
1795 caller_id: self.peer_id.clone(),
1796 timestamp: SystemTime::now(),
1797 timeout: Duration::from_secs(30),
1798 auth_info: None,
1799 metadata: HashMap::new(),
1800 };
1801
1802 return _mcp_server.call_tool(tool_name, arguments, context).await;
1804 }
1805
1806 let context = MCPCallContext {
1810 caller_id: self.peer_id.clone(),
1811 timestamp: SystemTime::now(),
1812 timeout: Duration::from_secs(30),
1813 auth_info: None,
1814 metadata: HashMap::new(),
1815 };
1816
1817 match _mcp_server
1819 .call_tool(tool_name, arguments.clone(), context)
1820 .await
1821 {
1822 Ok(mut result) => {
1823 if let Value::Object(ref mut map) = result {
1825 map.insert("tool".to_string(), Value::String(tool_name.to_string()));
1826 }
1827 Ok(result)
1828 }
1829 Err(e) => Err(e),
1830 }
1831 } else {
1832 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1833 "MCP server not enabled".to_string().into(),
1834 )))
1835 }
1836 }
1837
1838 #[allow(dead_code)]
1840 async fn handle_mcp_remote_tool_call(
1841 &self,
1842 peer_id: &PeerId,
1843 tool_name: &str,
1844 arguments: Value,
1845 context: MCPCallContext,
1846 ) -> Result<Value> {
1847 let request_id = uuid::Uuid::new_v4().to_string();
1848
1849 let mcp_message = crate::mcp::MCPMessage::CallTool {
1851 name: tool_name.to_string(),
1852 arguments,
1853 };
1854
1855 let p2p_message = crate::mcp::P2PMCPMessage {
1857 message_type: crate::mcp::P2PMCPMessageType::Request,
1858 message_id: request_id.clone(),
1859 source_peer: context.caller_id.clone(),
1860 target_peer: Some(peer_id.clone()),
1861 timestamp: context
1862 .timestamp
1863 .duration_since(std::time::UNIX_EPOCH)
1864 .map_err(|e| {
1865 P2PError::Network(crate::error::NetworkError::ProtocolError(
1866 format!("Time error: {e}").into(),
1867 ))
1868 })?
1869 .as_secs(),
1870 payload: mcp_message,
1871 ttl: 5, };
1873
1874 let message_data = serde_json::to_vec(&p2p_message)
1876 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1877
1878 if message_data.len() > crate::mcp::MAX_MESSAGE_SIZE {
1879 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1880 "Message too large".to_string().into(),
1881 )));
1882 }
1883
1884 self.send_message(peer_id, MCP_PROTOCOL, message_data)
1886 .await?;
1887
1888 info!(
1890 "MCP remote tool call sent to peer {}, tool: {}",
1891 peer_id, tool_name
1892 );
1893
1894 Ok(serde_json::json!({
1897 "status": "sent",
1898 "message": "Remote tool call sent successfully",
1899 "peer_id": peer_id,
1900 "tool": tool_name, "request_id": request_id
1902 }))
1903 }
1904
1905 pub async fn list_mcp_tools(&self) -> Result<Vec<String>> {
1907 if let Some(ref _mcp_server) = self.mcp_server {
1908 let (tools, _) = _mcp_server.list_tools(None).await.map_err(|e| {
1909 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1910 format!("Failed to list tools: {e}").into(),
1911 ))
1912 })?;
1913
1914 Ok(tools.into_iter().map(|tool| tool.name).collect())
1915 } else {
1916 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1917 "MCP server not enabled".to_string().into(),
1918 )))
1919 }
1920 }
1921
1922 pub async fn discover_remote_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1924 if let Some(ref _mcp_server) = self.mcp_server {
1925 _mcp_server.discover_remote_services().await.map_err(|e| {
1926 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1927 format!("Failed to discover services: {e}").into(),
1928 ))
1929 })
1930 } else {
1931 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1932 "MCP server not enabled".to_string().into(),
1933 )))
1934 }
1935 }
1936
1937 pub async fn list_remote_mcp_tools(&self, peer_id: &PeerId) -> Result<Vec<String>> {
1939 if let Some(ref _mcp_server) = self.mcp_server {
1940 if peer_id == &self.peer_id {
1942 return self.list_mcp_tools().await;
1943 }
1944
1945 self.list_mcp_tools().await
1949 } else {
1950 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1951 "MCP server not enabled".to_string().into(),
1952 )))
1953 }
1954 }
1955
1956 pub async fn mcp_stats(&self) -> Result<crate::mcp::MCPServerStats> {
1958 if let Some(ref _mcp_server) = self.mcp_server {
1959 Ok(_mcp_server.get_stats().await)
1960 } else {
1961 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1962 "MCP server not enabled".to_string().into(),
1963 )))
1964 }
1965 }
1966
1967 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1969 if let Some(ref resource_manager) = self.resource_manager {
1970 Ok(resource_manager.get_metrics().await)
1971 } else {
1972 Err(P2PError::Network(
1973 crate::error::NetworkError::ProtocolError(
1974 "Production resource manager not enabled".to_string().into(),
1975 ),
1976 ))
1977 }
1978 }
1979
1980 pub async fn health_check(&self) -> Result<()> {
1982 if let Some(ref resource_manager) = self.resource_manager {
1983 resource_manager.health_check().await
1984 } else {
1985 let peer_count = self.peer_count().await;
1987 if peer_count > self.config.max_connections {
1988 Err(P2PError::Network(
1989 crate::error::NetworkError::ProtocolError(
1990 format!("Too many connections: {peer_count}").into(),
1991 ),
1992 ))
1993 } else {
1994 Ok(())
1995 }
1996 }
1997 }
1998
1999 pub fn production_config(&self) -> Option<&ProductionConfig> {
2001 self.config.production_config.as_ref()
2002 }
2003
2004 pub fn is_production_mode(&self) -> bool {
2006 self.resource_manager.is_some()
2007 }
2008
2009 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2011 self.dht.as_ref()
2012 }
2013
2014 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2016 if let Some(ref dht) = self.dht {
2017 let mut dht_instance = dht.write().await;
2018 let dht_key = crate::dht::DhtKey::from_bytes(key);
2019 dht_instance
2020 .store(&dht_key, value.clone())
2021 .await
2022 .map_err(|e| {
2023 P2PError::Dht(crate::error::DhtError::StoreFailed(
2024 format!("{:?}: {e}", key).into(),
2025 ))
2026 })?;
2027
2028 Ok(())
2029 } else {
2030 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2031 "DHT not enabled".to_string().into(),
2032 )))
2033 }
2034 }
2035
2036 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2038 if let Some(ref dht) = self.dht {
2039 let dht_instance = dht.read().await;
2040 let dht_key = crate::dht::DhtKey::from_bytes(key);
2041 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2042 P2PError::Dht(crate::error::DhtError::StoreFailed(
2043 format!("Retrieve failed: {e}").into(),
2044 ))
2045 })?;
2046
2047 Ok(record_result)
2048 } else {
2049 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2050 "DHT not enabled".to_string().into(),
2051 )))
2052 }
2053 }
2054
2055 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2057 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2058 let mut manager = bootstrap_manager.write().await;
2059 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2060 .iter()
2061 .filter_map(|addr| addr.parse().ok())
2062 .collect();
2063 let contact = ContactEntry::new(peer_id, socket_addresses);
2064 manager.add_contact(contact).await.map_err(|e| {
2065 P2PError::Network(crate::error::NetworkError::ProtocolError(
2066 format!("Failed to add peer to bootstrap cache: {e}").into(),
2067 ))
2068 })?;
2069 }
2070 Ok(())
2071 }
2072
2073 pub async fn update_peer_metrics(
2075 &self,
2076 peer_id: &PeerId,
2077 success: bool,
2078 latency_ms: Option<u64>,
2079 _error: Option<String>,
2080 ) -> Result<()> {
2081 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2082 let mut manager = bootstrap_manager.write().await;
2083
2084 let metrics = QualityMetrics {
2086 success_rate: if success { 1.0 } else { 0.0 },
2087 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2088 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2090 last_successful_connection: if success {
2091 chrono::Utc::now()
2092 } else {
2093 chrono::Utc::now() - chrono::Duration::hours(1)
2094 },
2095 uptime_score: 0.5,
2096 };
2097
2098 manager
2099 .update_contact_metrics(peer_id, metrics)
2100 .await
2101 .map_err(|e| {
2102 P2PError::Network(crate::error::NetworkError::ProtocolError(
2103 format!("Failed to update peer metrics: {e}").into(),
2104 ))
2105 })?;
2106 }
2107 Ok(())
2108 }
2109
2110 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2112 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2113 let manager = bootstrap_manager.read().await;
2114 let stats = manager.get_stats().await.map_err(|e| {
2115 P2PError::Network(crate::error::NetworkError::ProtocolError(
2116 format!("Failed to get bootstrap stats: {e}").into(),
2117 ))
2118 })?;
2119 Ok(Some(stats))
2120 } else {
2121 Ok(None)
2122 }
2123 }
2124
2125 pub async fn cached_peer_count(&self) -> usize {
2127 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2128 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2129 {
2130 return stats.total_contacts;
2131 }
2132 0
2133 }
2134
2135 async fn connect_bootstrap_peers(&self) -> Result<()> {
2137 let mut bootstrap_contacts = Vec::new();
2138 let mut used_cache = false;
2139
2140 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2142 let manager = bootstrap_manager.read().await;
2143 match manager.get_bootstrap_peers(20).await {
2144 Ok(contacts) => {
2146 if !contacts.is_empty() {
2147 info!("Using {} cached bootstrap peers", contacts.len());
2148 bootstrap_contacts = contacts;
2149 used_cache = true;
2150 }
2151 }
2152 Err(e) => {
2153 warn!("Failed to get cached bootstrap peers: {}", e);
2154 }
2155 }
2156 }
2157
2158 if bootstrap_contacts.is_empty() {
2160 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2161 &self.config.bootstrap_peers_str
2162 } else {
2163 &self
2165 .config
2166 .bootstrap_peers
2167 .iter()
2168 .map(|addr| addr.to_string())
2169 .collect::<Vec<_>>()
2170 };
2171
2172 if bootstrap_peers.is_empty() {
2173 info!("No bootstrap peers configured and no cached peers available");
2174 return Ok(());
2175 }
2176
2177 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
2178
2179 for addr in bootstrap_peers {
2180 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2181 let contact = ContactEntry::new(
2182 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
2183 vec![socket_addr],
2184 );
2185 bootstrap_contacts.push(contact);
2186 } else {
2187 warn!("Invalid bootstrap address format: {}", addr);
2188 }
2189 }
2190 }
2191
2192 let mut successful_connections = 0;
2194 for contact in bootstrap_contacts {
2195 for addr in &contact.addresses {
2196 match self.connect_peer(&addr.to_string()).await {
2197 Ok(peer_id) => {
2198 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2199 successful_connections += 1;
2200
2201 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2203 let mut manager = bootstrap_manager.write().await;
2204 let mut updated_contact = contact.clone();
2205 updated_contact.peer_id = peer_id.clone();
2206 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2209 warn!("Failed to update bootstrap cache: {}", e);
2210 }
2211 }
2212 break; }
2214 Err(e) => {
2215 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2216
2217 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2219 let mut manager = bootstrap_manager.write().await;
2220 let mut updated_contact = contact.clone();
2221 updated_contact.update_connection_result(
2222 false,
2223 None,
2224 Some(e.to_string()),
2225 );
2226
2227 if let Err(e) = manager.add_contact(updated_contact).await {
2228 warn!("Failed to update bootstrap cache: {}", e);
2229 }
2230 }
2231 }
2232 }
2233 }
2234 }
2235
2236 if successful_connections == 0 {
2237 if !used_cache {
2238 warn!("Failed to connect to any bootstrap peers");
2239 }
2240 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2241 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2243 }));
2244 }
2245 info!(
2246 "Successfully connected to {} bootstrap peers",
2247 successful_connections
2248 );
2249
2250 Ok(())
2251 }
2252
2253 async fn disconnect_all_peers(&self) -> Result<()> {
2255 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2256
2257 for peer_id in peer_ids {
2258 self.disconnect_peer(&peer_id).await?;
2259 }
2260
2261 Ok(())
2262 }
2263
2264 async fn periodic_tasks(&self) -> Result<()> {
2266 Ok(())
2272 }
2273
2274 pub async fn discover_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
2276 if let Some(ref _mcp_server) = self.mcp_server {
2277 _mcp_server.discover_remote_services().await
2278 } else {
2279 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2280 "MCP server not enabled".to_string().into(),
2281 )))
2282 }
2283 }
2284
2285 pub async fn get_all_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
2287 if let Some(ref _mcp_server) = self.mcp_server {
2288 _mcp_server.get_all_services().await
2289 } else {
2290 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2291 "MCP server not enabled".to_string().into(),
2292 )))
2293 }
2294 }
2295
2296 pub async fn find_mcp_services_with_tool(
2298 &self,
2299 tool_name: &str,
2300 ) -> Result<Vec<crate::mcp::MCPService>> {
2301 if let Some(ref _mcp_server) = self.mcp_server {
2302 _mcp_server.find_services_with_tool(tool_name).await
2303 } else {
2304 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2305 "MCP server not enabled".to_string().into(),
2306 )))
2307 }
2308 }
2309
2310 pub async fn announce_mcp_services(&self) -> Result<()> {
2312 if let Some(ref _mcp_server) = self.mcp_server {
2313 _mcp_server.announce_local_services().await
2314 } else {
2315 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2316 "MCP server not enabled".to_string().into(),
2317 )))
2318 }
2319 }
2320
2321 pub async fn refresh_mcp_service_discovery(&self) -> Result<()> {
2323 if let Some(ref _mcp_server) = self.mcp_server {
2324 _mcp_server.refresh_service_discovery().await
2325 } else {
2326 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2327 "MCP server not enabled".to_string().into(),
2328 )))
2329 }
2330 }
2331
2332 pub async fn query_peer_mcp_services(&self, peer_id: &PeerId) -> Result<()> {
2334 if self.mcp_server.is_none() {
2335 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2336 "MCP server not enabled".to_string().into(),
2337 )));
2338 }
2339
2340 let discovery_query = crate::mcp::P2PMCPMessage {
2341 message_type: crate::mcp::P2PMCPMessageType::ServiceDiscovery,
2342 message_id: uuid::Uuid::new_v4().to_string(),
2343 source_peer: self.peer_id.clone(),
2344 target_peer: Some(peer_id.clone()),
2345 timestamp: std::time::SystemTime::now()
2346 .duration_since(std::time::UNIX_EPOCH)
2347 .unwrap_or_default()
2348 .as_secs(),
2349 payload: crate::mcp::MCPMessage::ListTools { cursor: None },
2350 ttl: 3,
2351 };
2352
2353 let query_data = serde_json::to_vec(&discovery_query)
2354 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2355
2356 self.send_message(peer_id, MCP_PROTOCOL, query_data).await?;
2357 debug!("Sent MCP service discovery query to peer {}", peer_id);
2358
2359 Ok(())
2360 }
2361
2362 pub async fn broadcast_mcp_service_discovery(&self) -> Result<()> {
2364 if self.mcp_server.is_none() {
2365 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2366 "MCP server not enabled".to_string().into(),
2367 )));
2368 }
2369
2370 let peer_list: Vec<PeerId> = {
2372 let peers_guard = self.peers.read().await;
2373 peers_guard.keys().cloned().collect()
2374 };
2375
2376 if peer_list.is_empty() {
2377 debug!("No peers connected for MCP service discovery broadcast");
2378 return Ok(());
2379 }
2380
2381 let mut successful_queries = 0;
2383 for peer_id in &peer_list {
2384 match self.query_peer_mcp_services(peer_id).await {
2385 Ok(_) => {
2386 successful_queries += 1;
2387 debug!("Sent MCP service discovery query to peer: {}", peer_id);
2388 }
2389 Err(e) => {
2390 warn!(
2391 "Failed to send MCP service discovery query to peer {}: {}",
2392 peer_id, e
2393 );
2394 }
2395 }
2396 }
2397
2398 info!(
2399 "Broadcast MCP service discovery to {}/{} connected peers",
2400 successful_queries,
2401 peer_list.len()
2402 );
2403
2404 Ok(())
2405 }
2406}
2407
2408#[derive(Clone)]
2410pub struct P2PNetworkSender {
2411 peer_id: PeerId,
2412 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2414}
2415
2416impl P2PNetworkSender {
2417 pub fn new(
2418 peer_id: PeerId,
2419 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2420 ) -> Self {
2421 Self { peer_id, send_tx }
2422 }
2423}
2424
2425#[async_trait::async_trait]
2427impl NetworkSender for P2PNetworkSender {
2428 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2430 self.send_tx
2431 .send((peer_id.clone(), protocol.to_string(), data))
2432 .map_err(|_| {
2433 P2PError::Network(crate::error::NetworkError::ProtocolError(
2434 "Failed to send message via channel".to_string().into(),
2435 ))
2436 })?;
2437 Ok(())
2438 }
2439
2440 fn local_peer_id(&self) -> &PeerId {
2442 &self.peer_id
2443 }
2444}
2445
2446pub struct NodeBuilder {
2448 config: NodeConfig,
2449}
2450
2451impl Default for NodeBuilder {
2452 fn default() -> Self {
2453 Self::new()
2454 }
2455}
2456
2457impl NodeBuilder {
2458 pub fn new() -> Self {
2460 Self {
2461 config: NodeConfig::default(),
2462 }
2463 }
2464
2465 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2467 self.config.peer_id = Some(peer_id);
2468 self
2469 }
2470
2471 pub fn listen_on(mut self, addr: &str) -> Self {
2473 if let Ok(multiaddr) = addr.parse() {
2474 self.config.listen_addrs.push(multiaddr);
2475 }
2476 self
2477 }
2478
2479 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2481 if let Ok(multiaddr) = addr.parse() {
2482 self.config.bootstrap_peers.push(multiaddr);
2483 }
2484 self.config.bootstrap_peers_str.push(addr.to_string());
2485 self
2486 }
2487
2488 pub fn with_ipv6(mut self, enable: bool) -> Self {
2490 self.config.enable_ipv6 = enable;
2491 self
2492 }
2493
2494 pub fn with_mcp_server(mut self) -> Self {
2496 self.config.enable_mcp_server = true;
2497 self
2498 }
2499
2500 pub fn with_mcp_config(mut self, mcp_config: MCPServerConfig) -> Self {
2502 self.config.mcp_server_config = Some(mcp_config);
2503 self.config.enable_mcp_server = true;
2504 self
2505 }
2506
2507 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2509 self.config.connection_timeout = timeout;
2510 self
2511 }
2512
2513 pub fn with_max_connections(mut self, max: usize) -> Self {
2515 self.config.max_connections = max;
2516 self
2517 }
2518
2519 pub fn with_production_mode(mut self) -> Self {
2521 self.config.production_config = Some(ProductionConfig::default());
2522 self
2523 }
2524
2525 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2527 self.config.production_config = Some(production_config);
2528 self
2529 }
2530
2531 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2533 self.config.dht_config = dht_config;
2534 self
2535 }
2536
2537 pub fn with_default_dht(mut self) -> Self {
2539 self.config.dht_config = DHTConfig::default();
2540 self
2541 }
2542
2543 pub async fn build(self) -> Result<P2PNode> {
2545 P2PNode::new(self.config).await
2546 }
2547}
2548
2549#[allow(dead_code)] async fn handle_received_message_standalone(
2552 message_data: Vec<u8>,
2553 peer_id: &PeerId,
2554 protocol: &str,
2555 event_tx: &broadcast::Sender<P2PEvent>,
2556 mcp_server: &Option<Arc<crate::mcp::MCPServer>>,
2557) -> Result<()> {
2558 if protocol == MCP_PROTOCOL {
2560 return handle_mcp_message_standalone(message_data, peer_id, mcp_server).await;
2561 }
2562
2563 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2565 Ok(message) => {
2566 if let (Some(protocol), Some(data), Some(from)) = (
2567 message.get("protocol").and_then(|v| v.as_str()),
2568 message.get("data").and_then(|v| v.as_array()),
2569 message.get("from").and_then(|v| v.as_str()),
2570 ) {
2571 let data_bytes: Vec<u8> = data
2573 .iter()
2574 .filter_map(|v| v.as_u64().map(|n| n as u8))
2575 .collect();
2576
2577 let event = P2PEvent::Message {
2579 topic: protocol.to_string(),
2580 source: from.to_string(),
2581 data: data_bytes,
2582 };
2583
2584 let _ = event_tx.send(event);
2585 debug!("Generated message event from peer: {}", peer_id);
2586 }
2587 }
2588 Err(e) => {
2589 warn!("Failed to parse received message from {}: {}", peer_id, e);
2590 }
2591 }
2592
2593 Ok(())
2594}
2595
2596#[allow(dead_code)] async fn handle_mcp_message_standalone(
2599 message_data: Vec<u8>,
2600 peer_id: &PeerId,
2601 mcp_server: &Option<Arc<crate::mcp::MCPServer>>,
2602) -> Result<()> {
2603 if let Some(_mcp_server) = mcp_server {
2604 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
2606 Ok(p2p_mcp_message) => {
2607 use crate::mcp::P2PMCPMessageType;
2609 match p2p_mcp_message.message_type {
2610 P2PMCPMessageType::Request => {
2611 debug!("Received MCP request from peer {}", peer_id);
2612 }
2615 P2PMCPMessageType::Response => {
2616 debug!("Received MCP response from peer {}", peer_id);
2617 }
2619 P2PMCPMessageType::ServiceAdvertisement => {
2620 debug!("Received service advertisement from peer {}", peer_id);
2621 }
2623 P2PMCPMessageType::ServiceDiscovery => {
2624 debug!("Received service discovery query from peer {}", peer_id);
2625 }
2627 P2PMCPMessageType::Heartbeat => {
2628 debug!("Received heartbeat from peer {}", peer_id);
2629 }
2631 P2PMCPMessageType::HealthCheck => {
2632 debug!("Received health check from peer {}", peer_id);
2633 }
2635 }
2636 }
2637 Err(e) => {
2638 warn!(
2639 "Failed to deserialize MCP message from peer {}: {}",
2640 peer_id, e
2641 );
2642 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
2643 format!("Invalid MCP message: {e}").into(),
2644 )));
2645 }
2646 }
2647 } else {
2648 warn!("Received MCP message but MCP server is not enabled");
2649 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2650 "MCP server not enabled".to_string().into(),
2651 )));
2652 }
2653
2654 Ok(())
2655}
2656
2657fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2659 match create_protocol_message_static(protocol, data) {
2660 Ok(msg) => Some(msg),
2661 Err(e) => {
2662 warn!("Failed to create protocol message: {}", e);
2663 None
2664 }
2665 }
2666}
2667
2668async fn handle_message_send_result(result: Result<()>, peer_id: &PeerId) {
2670 match result {
2671 Ok(_) => {
2672 debug!("Message sent to peer {} via transport layer", peer_id);
2673 }
2674 Err(e) => {
2675 warn!("Failed to send message to peer {}: {}", peer_id, e);
2676 }
2677 }
2678}
2679
2680#[allow(dead_code)] fn check_rate_limit(
2683 rate_limiter: &RateLimiter,
2684 socket_addr: &std::net::SocketAddr,
2685 remote_addr: &NetworkAddress,
2686) -> Result<()> {
2687 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2688 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2689 e
2690 })
2691}
2692
2693#[allow(dead_code)] async fn register_new_peer(
2696 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2697 peer_id: &PeerId,
2698 remote_addr: &NetworkAddress,
2699) {
2700 let mut peers_guard = peers.write().await;
2701 let peer_info = PeerInfo {
2702 peer_id: peer_id.clone(),
2703 addresses: vec![remote_addr.to_string()],
2704 connected_at: tokio::time::Instant::now(),
2705 last_seen: tokio::time::Instant::now(),
2706 status: ConnectionStatus::Connected,
2707 protocols: vec!["p2p-chat/1.0.0".to_string()],
2708 heartbeat_count: 0,
2709 };
2710 peers_guard.insert(peer_id.clone(), peer_info);
2711}
2712
2713#[allow(dead_code)] fn spawn_connection_handler(
2716 connection: Box<dyn crate::transport::Connection>,
2717 peer_id: PeerId,
2718 event_tx: broadcast::Sender<P2PEvent>,
2719 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2720 mcp_server: Option<Arc<MCPServer>>,
2721) {
2722 tokio::spawn(async move {
2723 handle_peer_connection(connection, peer_id, event_tx, peers, mcp_server).await;
2724 });
2725}
2726
2727#[allow(dead_code)] async fn handle_peer_connection(
2730 mut connection: Box<dyn crate::transport::Connection>,
2731 peer_id: PeerId,
2732 event_tx: broadcast::Sender<P2PEvent>,
2733 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2734 mcp_server: Option<Arc<MCPServer>>,
2735) {
2736 loop {
2737 match connection.receive().await {
2738 Ok(message_data) => {
2739 debug!(
2740 "Received {} bytes from peer: {}",
2741 message_data.len(),
2742 peer_id
2743 );
2744
2745 if let Err(e) = handle_received_message_standalone(
2747 message_data,
2748 &peer_id,
2749 "unknown", &event_tx,
2751 &mcp_server,
2752 )
2753 .await
2754 {
2755 warn!("Failed to handle message from {}: {}", peer_id, e);
2756 }
2757 }
2758 Err(e) => {
2759 warn!("Failed to receive message from {}: {}", peer_id, e);
2760
2761 if !connection.is_alive().await {
2763 info!("Connection to {} is dead, removing peer", peer_id);
2764
2765 remove_peer(&peers, &peer_id).await;
2767
2768 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2770
2771 break; }
2773
2774 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2776 }
2777 }
2778 }
2779}
2780
2781#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2784 let mut peers_guard = peers.write().await;
2785 peers_guard.remove(peer_id);
2786}
2787
2788async fn update_peer_heartbeat(
2790 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2791 peer_id: &PeerId,
2792) -> Result<()> {
2793 let mut peers_guard = peers.write().await;
2794 match peers_guard.get_mut(peer_id) {
2795 Some(peer_info) => {
2796 peer_info.last_seen = Instant::now();
2797 peer_info.heartbeat_count += 1;
2798 Ok(())
2799 }
2800 None => {
2801 warn!("Received heartbeat from unknown peer: {}", peer_id);
2802 Err(P2PError::Network(NetworkError::PeerNotFound(
2803 format!("Peer {} not found", peer_id).into(),
2804 )))
2805 }
2806 }
2807}
2808
2809async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f32) {
2811 if let Some(manager) = resource_manager {
2812 let metrics = manager.get_metrics().await;
2813 (metrics.memory_used, metrics.cpu_usage as f32)
2814 } else {
2815 (0, 0.0)
2816 }
2817}
2818
2819#[cfg(test)]
2820mod tests {
2821 use super::*;
2822 use crate::mcp::{
2823 MCPTool, Tool, ToolHandler, ToolHealthStatus, ToolMetadata, ToolRequirements,
2824 };
2825 use serde_json::json;
2826 use std::future::Future;
2827 use std::pin::Pin;
2828 use std::time::Duration;
2829 use tokio::time::timeout;
2830
2831 struct NetworkTestTool {
2833 name: String,
2834 }
2835
2836 impl NetworkTestTool {
2837 fn new(name: &str) -> Self {
2838 Self {
2839 name: name.to_string(),
2840 }
2841 }
2842 }
2843
2844 impl ToolHandler for NetworkTestTool {
2845 fn execute(
2846 &self,
2847 arguments: serde_json::Value,
2848 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + '_>> {
2849 let name = self.name.clone();
2850 Box::pin(async move {
2851 Ok(json!({
2852 "tool": name,
2853 "input": arguments,
2854 "result": "network test success"
2855 }))
2856 })
2857 }
2858
2859 fn validate(&self, _arguments: &serde_json::Value) -> Result<()> {
2860 Ok(())
2861 }
2862
2863 fn get_requirements(&self) -> ToolRequirements {
2864 ToolRequirements::default()
2865 }
2866 }
2867
2868 fn create_test_node_config() -> NodeConfig {
2870 NodeConfig {
2871 peer_id: Some("test_peer_123".to_string()),
2872 listen_addrs: vec![
2873 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2874 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2875 ],
2876 listen_addr: std::net::SocketAddr::new(
2877 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2878 0,
2879 ),
2880 bootstrap_peers: vec![],
2881 bootstrap_peers_str: vec![],
2882 enable_ipv6: true,
2883 enable_mcp_server: true,
2884 mcp_server_config: Some(MCPServerConfig {
2885 enable_auth: false, enable_rate_limiting: false, ..Default::default()
2888 }),
2889 connection_timeout: Duration::from_secs(10),
2890 keep_alive_interval: Duration::from_secs(30),
2891 max_connections: 100,
2892 max_incoming_connections: 50,
2893 dht_config: DHTConfig::default(),
2894 security_config: SecurityConfig::default(),
2895 production_config: None,
2896 bootstrap_cache_config: None,
2897 identity_config: None,
2898 }
2899 }
2900
2901 fn create_test_tool(name: &str) -> Tool {
2903 Tool {
2904 definition: MCPTool {
2905 name: name.to_string(),
2906 description: format!("Test tool: {}", name).into(),
2907 input_schema: json!({
2908 "type": "object",
2909 "properties": {
2910 "input": { "type": "string" }
2911 }
2912 }),
2913 },
2914 handler: Box::new(NetworkTestTool::new(name)),
2915 metadata: ToolMetadata {
2916 created_at: SystemTime::now(),
2917 last_called: None,
2918 call_count: 0,
2919 avg_execution_time: Duration::from_millis(0),
2920 health_status: ToolHealthStatus::Healthy,
2921 tags: vec!["test".to_string()],
2922 },
2923 }
2924 }
2925
2926 #[tokio::test]
2927 async fn test_node_config_default() {
2928 let config = NodeConfig::default();
2929
2930 assert!(config.peer_id.is_none());
2931 assert_eq!(config.listen_addrs.len(), 2);
2932 assert!(config.enable_ipv6);
2933 assert!(config.enable_mcp_server);
2934 assert_eq!(config.max_connections, 1000);
2935 assert_eq!(config.max_incoming_connections, 100);
2936 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2937 }
2938
2939 #[tokio::test]
2940 async fn test_dht_config_default() {
2941 let config = DHTConfig::default();
2942
2943 assert_eq!(config.k_value, 20);
2944 assert_eq!(config.alpha_value, 5);
2945 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2946 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2947 }
2948
2949 #[tokio::test]
2950 async fn test_security_config_default() {
2951 let config = SecurityConfig::default();
2952
2953 assert!(config.enable_noise);
2954 assert!(config.enable_tls);
2955 assert_eq!(config.trust_level, TrustLevel::Basic);
2956 }
2957
2958 #[test]
2959 fn test_trust_level_variants() {
2960 let _none = TrustLevel::None;
2962 let _basic = TrustLevel::Basic;
2963 let _full = TrustLevel::Full;
2964
2965 assert_eq!(TrustLevel::None, TrustLevel::None);
2967 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2968 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2969 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2970 }
2971
2972 #[test]
2973 fn test_connection_status_variants() {
2974 let connecting = ConnectionStatus::Connecting;
2975 let connected = ConnectionStatus::Connected;
2976 let disconnecting = ConnectionStatus::Disconnecting;
2977 let disconnected = ConnectionStatus::Disconnected;
2978 let failed = ConnectionStatus::Failed("test error".to_string());
2979
2980 assert_eq!(connecting, ConnectionStatus::Connecting);
2981 assert_eq!(connected, ConnectionStatus::Connected);
2982 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2983 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2984 assert_ne!(connecting, connected);
2985
2986 if let ConnectionStatus::Failed(msg) = failed {
2987 assert_eq!(msg, "test error");
2988 } else {
2989 panic!("Expected Failed status");
2990 }
2991 }
2992
2993 #[tokio::test]
2994 async fn test_node_creation() -> Result<()> {
2995 let config = create_test_node_config();
2996 let node = P2PNode::new(config).await?;
2997
2998 assert_eq!(node.peer_id(), "test_peer_123");
2999 assert!(!node.is_running().await);
3000 assert_eq!(node.peer_count().await, 0);
3001 assert!(node.connected_peers().await.is_empty());
3002
3003 Ok(())
3004 }
3005
3006 #[tokio::test]
3007 async fn test_node_creation_without_peer_id() -> Result<()> {
3008 let mut config = create_test_node_config();
3009 config.peer_id = None;
3010
3011 let node = P2PNode::new(config).await?;
3012
3013 assert!(node.peer_id().starts_with("peer_"));
3015 assert!(!node.is_running().await);
3016
3017 Ok(())
3018 }
3019
3020 #[tokio::test]
3021 async fn test_node_lifecycle() -> Result<()> {
3022 let config = create_test_node_config();
3023 let node = P2PNode::new(config).await?;
3024
3025 assert!(!node.is_running().await);
3027
3028 node.start().await?;
3030 assert!(node.is_running().await);
3031
3032 let listen_addrs = node.listen_addrs().await;
3034 assert!(
3035 !listen_addrs.is_empty(),
3036 "Expected at least one listening address"
3037 );
3038
3039 node.stop().await?;
3041 assert!(!node.is_running().await);
3042
3043 Ok(())
3044 }
3045
3046 #[tokio::test]
3047 async fn test_peer_connection() -> Result<()> {
3048 let config = create_test_node_config();
3049 let node = P2PNode::new(config).await?;
3050
3051 let peer_addr = "/ip4/127.0.0.1/tcp/0".to_string();
3052
3053 let peer_id = node.connect_peer(&peer_addr).await?;
3055 assert!(peer_id.starts_with("peer_from_"));
3056
3057 assert_eq!(node.peer_count().await, 1);
3059
3060 let connected_peers = node.connected_peers().await;
3062 assert_eq!(connected_peers.len(), 1);
3063 assert_eq!(connected_peers[0], peer_id);
3064
3065 let peer_info = node.peer_info(&peer_id).await;
3067 assert!(peer_info.is_some());
3068 let info = peer_info.expect("Peer info should exist after adding peer");
3069 assert_eq!(info.peer_id, peer_id);
3070 assert_eq!(info.status, ConnectionStatus::Connected);
3071 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3072
3073 node.disconnect_peer(&peer_id).await?;
3075 assert_eq!(node.peer_count().await, 0);
3076
3077 Ok(())
3078 }
3079
3080 #[tokio::test]
3081 async fn test_event_subscription() -> Result<()> {
3082 let config = create_test_node_config();
3083 let node = P2PNode::new(config).await?;
3084
3085 let mut events = node.subscribe_events();
3086 let peer_addr = "/ip4/127.0.0.1/tcp/0".to_string();
3087
3088 let peer_id = node.connect_peer(&peer_addr).await?;
3090
3091 let event = timeout(Duration::from_millis(100), events.recv()).await;
3093 assert!(event.is_ok());
3094
3095 let event_result = event
3096 .expect("Should receive event")
3097 .expect("Event should not be error");
3098 match event_result {
3099 P2PEvent::PeerConnected(event_peer_id) => {
3100 assert_eq!(event_peer_id, peer_id);
3101 }
3102 _ => panic!("Expected PeerConnected event"),
3103 }
3104
3105 node.disconnect_peer(&peer_id).await?;
3107
3108 let event = timeout(Duration::from_millis(100), events.recv()).await;
3110 assert!(event.is_ok());
3111
3112 let event_result = event
3113 .expect("Should receive event")
3114 .expect("Event should not be error");
3115 match event_result {
3116 P2PEvent::PeerDisconnected(event_peer_id) => {
3117 assert_eq!(event_peer_id, peer_id);
3118 }
3119 _ => panic!("Expected PeerDisconnected event"),
3120 }
3121
3122 Ok(())
3123 }
3124
3125 #[tokio::test]
3126 async fn test_message_sending() -> Result<()> {
3127 let mut config1 = create_test_node_config();
3129 config1.listen_addr =
3130 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3131 let node1 = P2PNode::new(config1).await?;
3132 node1.start().await?;
3133
3134 let mut config2 = create_test_node_config();
3135 config2.listen_addr =
3136 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3137 let node2 = P2PNode::new(config2).await?;
3138 node2.start().await?;
3139
3140 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3142
3143 let node2_addr = node2.local_addr().ok_or_else(|| {
3145 P2PError::Network(crate::error::NetworkError::ProtocolError(
3146 "No listening address".to_string().into(),
3147 ))
3148 })?;
3149
3150 let peer_id = node1.connect_peer(&node2_addr).await?;
3152
3153 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
3155
3156 let message_data = b"Hello, peer!".to_vec();
3158 let result = node1
3159 .send_message(&peer_id, "test-protocol", message_data)
3160 .await;
3161 if let Err(e) = &result {
3164 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3165 }
3166
3167 let non_existent_peer = "non_existent_peer".to_string();
3169 let result = node1
3170 .send_message(&non_existent_peer, "test-protocol", vec![])
3171 .await;
3172 assert!(result.is_err());
3173 assert!(result.unwrap_err().to_string().contains("not connected"));
3174
3175 Ok(())
3176 }
3177
3178 #[tokio::test]
3179 async fn test_mcp_integration() -> Result<()> {
3180 let config = create_test_node_config();
3181 let node = P2PNode::new(config).await?;
3182
3183 node.start().await?;
3185
3186 let tool = create_test_tool("network_test_tool");
3188 node.register_mcp_tool(tool).await?;
3189
3190 let tools = node.list_mcp_tools().await?;
3192 assert!(tools.contains(&"network_test_tool".to_string()));
3193
3194 let arguments = json!({"input": "test_input"});
3196 let result = node
3197 .call_mcp_tool("network_test_tool", arguments.clone())
3198 .await?;
3199 assert_eq!(result["tool"], "network_test_tool");
3200 assert_eq!(result["input"], arguments);
3201
3202 let stats = node.mcp_stats().await?;
3204 assert_eq!(stats.total_tools, 1);
3205
3206 let result = node.call_mcp_tool("non_existent_tool", json!({})).await;
3208 assert!(result.is_err());
3209
3210 node.stop().await?;
3211 Ok(())
3212 }
3213
3214 #[tokio::test]
3215 async fn test_remote_mcp_operations() -> Result<()> {
3216 let config = create_test_node_config();
3217 let node = P2PNode::new(config).await?;
3218
3219 node.start().await?;
3220
3221 let tool = create_test_tool("remote_test_tool");
3223 node.register_mcp_tool(tool).await?;
3224
3225 let peer_addr = "/ip4/127.0.0.1/tcp/9005".to_string();
3226 let peer_id = node.connect_peer(&peer_addr).await?;
3227
3228 let remote_tools = node.list_remote_mcp_tools(&peer_id).await?;
3230 assert!(!remote_tools.is_empty());
3231
3232 let arguments = json!({"input": "remote_test"});
3234 let result = node
3235 .call_remote_mcp_tool(&peer_id, "remote_test_tool", arguments.clone())
3236 .await?;
3237 assert_eq!(result["tool"], "remote_test_tool");
3238
3239 let services = node.discover_remote_mcp_services().await?;
3241 assert!(services.is_empty());
3243
3244 node.stop().await?;
3245 Ok(())
3246 }
3247
3248 #[tokio::test]
3249 async fn test_health_check() -> Result<()> {
3250 let config = create_test_node_config();
3251 let node = P2PNode::new(config).await?;
3252
3253 let result = node.health_check().await;
3255 assert!(result.is_ok());
3256
3257 for i in 0..5 {
3259 let addr = format!("/ip4/127.0.0.1/tcp/{}", 9010 + i);
3260 node.connect_peer(&addr).await?;
3261 }
3262
3263 let result = node.health_check().await;
3265 assert!(result.is_ok());
3266
3267 Ok(())
3268 }
3269
3270 #[tokio::test]
3271 async fn test_node_uptime() -> Result<()> {
3272 let config = create_test_node_config();
3273 let node = P2PNode::new(config).await?;
3274
3275 let uptime1 = node.uptime();
3276 assert!(uptime1 >= Duration::from_secs(0));
3277
3278 tokio::time::sleep(Duration::from_millis(10)).await;
3280
3281 let uptime2 = node.uptime();
3282 assert!(uptime2 > uptime1);
3283
3284 Ok(())
3285 }
3286
3287 #[tokio::test]
3288 async fn test_node_config_access() -> Result<()> {
3289 let config = create_test_node_config();
3290 let expected_peer_id = config.peer_id.clone();
3291 let node = P2PNode::new(config).await?;
3292
3293 let node_config = node.config();
3294 assert_eq!(node_config.peer_id, expected_peer_id);
3295 assert_eq!(node_config.max_connections, 100);
3296 assert!(node_config.enable_mcp_server);
3297
3298 Ok(())
3299 }
3300
3301 #[tokio::test]
3302 async fn test_mcp_server_access() -> Result<()> {
3303 let config = create_test_node_config();
3304 let node = P2PNode::new(config).await?;
3305
3306 assert!(node.mcp_server().is_some());
3308
3309 let mut config = create_test_node_config();
3311 config.enable_mcp_server = false;
3312 let node_no_mcp = P2PNode::new(config).await?;
3313 assert!(node_no_mcp.mcp_server().is_none());
3314
3315 Ok(())
3316 }
3317
3318 #[tokio::test]
3319 async fn test_dht_access() -> Result<()> {
3320 let config = create_test_node_config();
3321 let node = P2PNode::new(config).await?;
3322
3323 assert!(node.dht().is_some());
3325
3326 Ok(())
3327 }
3328
3329 #[tokio::test]
3330 async fn test_node_builder() -> Result<()> {
3331 let node = P2PNode::builder()
3332 .with_peer_id("builder_test_peer".to_string())
3333 .listen_on("/ip4/127.0.0.1/tcp/9100")
3334 .listen_on("/ip6/::1/tcp/9100")
3335 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9101")
3336 .with_ipv6(true)
3337 .with_mcp_server()
3338 .with_connection_timeout(Duration::from_secs(15))
3339 .with_max_connections(200)
3340 .build()
3341 .await?;
3342
3343 assert_eq!(node.peer_id(), "builder_test_peer");
3344 let config = node.config();
3345 assert_eq!(config.listen_addrs.len(), 4); assert_eq!(config.bootstrap_peers.len(), 1);
3347 assert!(config.enable_ipv6);
3348 assert!(config.enable_mcp_server);
3349 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3350 assert_eq!(config.max_connections, 200);
3351
3352 Ok(())
3353 }
3354
3355 #[tokio::test]
3356 async fn test_node_builder_with_mcp_config() -> Result<()> {
3357 let mcp_config = MCPServerConfig {
3358 server_name: "test_mcp_server".to_string(),
3359 server_version: "1.0.0".to_string(),
3360 enable_dht_discovery: false,
3361 enable_auth: false,
3362 ..MCPServerConfig::default()
3363 };
3364
3365 let node = P2PNode::builder()
3366 .with_peer_id("mcp_config_test".to_string())
3367 .with_mcp_config(mcp_config.clone())
3368 .build()
3369 .await?;
3370
3371 assert_eq!(node.peer_id(), "mcp_config_test");
3372 let config = node.config();
3373 assert!(config.enable_mcp_server);
3374 assert!(config.mcp_server_config.is_some());
3375
3376 let node_mcp_config = config
3377 .mcp_server_config
3378 .as_ref()
3379 .expect("MCP server config should be present in test config");
3380 assert_eq!(node_mcp_config.server_name, "test_mcp_server");
3381 assert!(!node_mcp_config.enable_auth);
3382
3383 Ok(())
3384 }
3385
3386 #[tokio::test]
3387 async fn test_mcp_server_not_enabled_errors() -> Result<()> {
3388 let mut config = create_test_node_config();
3389 config.enable_mcp_server = false;
3390 let node = P2PNode::new(config).await?;
3391
3392 let tool = create_test_tool("test_tool");
3394 let result = node.register_mcp_tool(tool).await;
3395 assert!(result.is_err());
3396 assert!(
3397 result
3398 .unwrap_err()
3399 .to_string()
3400 .contains("MCP server not enabled")
3401 );
3402
3403 let result = node.call_mcp_tool("test_tool", json!({})).await;
3404 assert!(result.is_err());
3405 assert!(
3406 result
3407 .unwrap_err()
3408 .to_string()
3409 .contains("MCP server not enabled")
3410 );
3411
3412 let result = node.list_mcp_tools().await;
3413 assert!(result.is_err());
3414 assert!(
3415 result
3416 .unwrap_err()
3417 .to_string()
3418 .contains("MCP server not enabled")
3419 );
3420
3421 let result = node.mcp_stats().await;
3422 assert!(result.is_err());
3423 assert!(
3424 result
3425 .unwrap_err()
3426 .to_string()
3427 .contains("MCP server not enabled")
3428 );
3429
3430 Ok(())
3431 }
3432
3433 #[tokio::test]
3434 async fn test_bootstrap_peers() -> Result<()> {
3435 let mut config = create_test_node_config();
3436 config.bootstrap_peers = vec![
3437 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3438 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3439 ];
3440
3441 let node = P2PNode::new(config).await?;
3442
3443 node.start().await?;
3445
3446 let peer_count = node.peer_count().await;
3449 assert!(
3450 peer_count <= 2,
3451 "Peer count should not exceed bootstrap peer count"
3452 );
3453
3454 node.stop().await?;
3455 Ok(())
3456 }
3457
3458 #[tokio::test]
3459 async fn test_production_mode_disabled() -> Result<()> {
3460 let config = create_test_node_config();
3461 let node = P2PNode::new(config).await?;
3462
3463 assert!(!node.is_production_mode());
3464 assert!(node.production_config().is_none());
3465
3466 let result = node.resource_metrics().await;
3468 assert!(result.is_err());
3469 assert!(result.unwrap_err().to_string().contains("not enabled"));
3470
3471 Ok(())
3472 }
3473
3474 #[tokio::test]
3475 async fn test_network_event_variants() {
3476 let peer_id = "test_peer".to_string();
3478 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3479
3480 let _peer_connected = NetworkEvent::PeerConnected {
3481 peer_id: peer_id.clone(),
3482 addresses: vec![address.clone()],
3483 };
3484
3485 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3486 peer_id: peer_id.clone(),
3487 reason: "test disconnect".to_string(),
3488 };
3489
3490 let _message_received = NetworkEvent::MessageReceived {
3491 peer_id: peer_id.clone(),
3492 protocol: "test-protocol".to_string(),
3493 data: vec![1, 2, 3],
3494 };
3495
3496 let _connection_failed = NetworkEvent::ConnectionFailed {
3497 peer_id: Some(peer_id.clone()),
3498 address: address.clone(),
3499 error: "connection refused".to_string(),
3500 };
3501
3502 let _dht_stored = NetworkEvent::DHTRecordStored {
3503 key: vec![1, 2, 3],
3504 value: vec![4, 5, 6],
3505 };
3506
3507 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3508 key: vec![1, 2, 3],
3509 value: Some(vec![4, 5, 6]),
3510 };
3511 }
3512
3513 #[tokio::test]
3514 async fn test_peer_info_structure() {
3515 let peer_info = PeerInfo {
3516 peer_id: "test_peer".to_string(),
3517 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3518 connected_at: Instant::now(),
3519 last_seen: Instant::now(),
3520 status: ConnectionStatus::Connected,
3521 protocols: vec!["test-protocol".to_string()],
3522 heartbeat_count: 0,
3523 };
3524
3525 assert_eq!(peer_info.peer_id, "test_peer");
3526 assert_eq!(peer_info.addresses.len(), 1);
3527 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3528 assert_eq!(peer_info.protocols.len(), 1);
3529 }
3530
3531 #[tokio::test]
3532 async fn test_serialization() -> Result<()> {
3533 let config = create_test_node_config();
3535 let serialized = serde_json::to_string(&config)?;
3536 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3537
3538 assert_eq!(config.peer_id, deserialized.peer_id);
3539 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3540 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3541
3542 Ok(())
3543 }
3544}