1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23use crate::identity::manager::IdentityManagerConfig;
24use crate::mcp::{
25 HealthMonitorConfig, MCP_PROTOCOL, MCPCallContext, MCPServer, MCPServerConfig, NetworkSender,
26 Tool,
27};
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::P2PNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37use std::collections::HashMap;
38use std::sync::Arc;
39use std::time::{Duration, SystemTime};
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, info, warn};
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47 pub peer_id: Option<PeerId>,
49
50 pub listen_addrs: Vec<std::net::SocketAddr>,
52
53 pub listen_addr: std::net::SocketAddr,
55
56 pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59 pub bootstrap_peers_str: Vec<String>,
61
62 pub enable_ipv6: bool,
64
65 pub enable_mcp_server: bool,
67
68 pub mcp_server_config: Option<MCPServerConfig>,
70
71 pub connection_timeout: Duration,
73
74 pub keep_alive_interval: Duration,
76
77 pub max_connections: usize,
79
80 pub max_incoming_connections: usize,
82
83 pub dht_config: DHTConfig,
85
86 pub security_config: SecurityConfig,
88
89 pub production_config: Option<ProductionConfig>,
91
92 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
94
95 pub identity_config: Option<IdentityManagerConfig>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct DHTConfig {
102 pub k_value: usize,
104
105 pub alpha_value: usize,
107
108 pub record_ttl: Duration,
110
111 pub refresh_interval: Duration,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SecurityConfig {
118 pub enable_noise: bool,
120
121 pub enable_tls: bool,
123
124 pub trust_level: TrustLevel,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
130pub enum TrustLevel {
131 None,
133 Basic,
135 Full,
137}
138
139impl NodeConfig {
140 pub fn new() -> Result<Self> {
146 let config = Config::default();
148
149 let listen_addr = config.listen_socket_addr()?;
151
152 let mut listen_addrs = vec![];
154
155 if config.network.ipv6_enabled {
157 let ipv6_addr = std::net::SocketAddr::new(
158 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
159 listen_addr.port(),
160 );
161 listen_addrs.push(ipv6_addr);
162 }
163
164 let ipv4_addr = std::net::SocketAddr::new(
166 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
167 listen_addr.port(),
168 );
169 listen_addrs.push(ipv4_addr);
170
171 Ok(Self {
172 peer_id: None,
173 listen_addrs,
174 listen_addr,
175 bootstrap_peers: Vec::new(),
176 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
177 enable_ipv6: config.network.ipv6_enabled,
178 enable_mcp_server: config.mcp.enabled,
179 mcp_server_config: None, connection_timeout: Duration::from_secs(config.network.connection_timeout),
181 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
182 max_connections: config.network.max_connections,
183 max_incoming_connections: config.security.connection_limit as usize,
184 dht_config: DHTConfig::default(),
185 security_config: SecurityConfig::default(),
186 production_config: None,
187 bootstrap_cache_config: None,
188 identity_config: None,
189 })
190 }
191}
192
193impl Default for NodeConfig {
194 fn default() -> Self {
195 let config = Config::default();
197
198 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
200 std::net::SocketAddr::new(
201 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
202 9000,
203 )
204 });
205
206 Self {
207 peer_id: None,
208 listen_addrs: vec![
209 std::net::SocketAddr::new(
210 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
211 listen_addr.port(),
212 ),
213 std::net::SocketAddr::new(
214 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
215 listen_addr.port(),
216 ),
217 ],
218 listen_addr,
219 bootstrap_peers: Vec::new(),
220 bootstrap_peers_str: Vec::new(),
221 enable_ipv6: config.network.ipv6_enabled,
222 enable_mcp_server: config.mcp.enabled,
223 mcp_server_config: None, connection_timeout: Duration::from_secs(config.network.connection_timeout),
225 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
226 max_connections: config.network.max_connections,
227 max_incoming_connections: config.security.connection_limit as usize,
228 dht_config: DHTConfig::default(),
229 security_config: SecurityConfig::default(),
230 production_config: None, bootstrap_cache_config: None,
232 identity_config: None, }
234 }
235}
236
237impl NodeConfig {
238 pub fn from_config(config: &Config) -> Result<Self> {
240 let listen_addr = config.listen_socket_addr()?;
241 let bootstrap_addrs = config.bootstrap_addrs()?;
242
243 let mut node_config = Self {
244 peer_id: None,
245 listen_addrs: vec![listen_addr],
246 listen_addr,
247 bootstrap_peers: bootstrap_addrs
248 .iter()
249 .map(|addr| addr.socket_addr())
250 .collect(),
251 bootstrap_peers_str: config
252 .network
253 .bootstrap_nodes
254 .iter()
255 .map(|addr| addr.to_string())
256 .collect(),
257 enable_ipv6: config.network.ipv6_enabled,
258 enable_mcp_server: config.mcp.enabled,
259 mcp_server_config: Some(MCPServerConfig {
260 server_name: "P2P-MCP-Server".to_string(),
261 server_version: "1.0.0".to_string(),
262 enable_dht_discovery: true,
263 max_concurrent_requests: 100,
264 request_timeout: Duration::from_secs(30),
265 enable_auth: false,
266 enable_rate_limiting: true,
267 rate_limit_rpm: 60,
268 enable_logging: true,
269 max_tool_execution_time: Duration::from_secs(60),
270 tool_memory_limit: 1024 * 1024 * 1024, health_monitor: HealthMonitorConfig::default(),
272 }),
273 connection_timeout: Duration::from_secs(config.network.connection_timeout),
274 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
275 max_connections: config.network.max_connections,
276 max_incoming_connections: config.security.connection_limit as usize,
277 dht_config: DHTConfig {
278 k_value: 20,
279 alpha_value: 3,
280 record_ttl: Duration::from_secs(3600),
281 refresh_interval: Duration::from_secs(900),
282 },
283 security_config: SecurityConfig {
284 enable_noise: true,
285 enable_tls: true,
286 trust_level: TrustLevel::Basic,
287 },
288 production_config: Some(ProductionConfig {
289 max_connections: config.network.max_connections,
290 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
293 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
294 health_check_interval: Duration::from_secs(30),
295 metrics_interval: Duration::from_secs(60),
296 enable_performance_tracking: true,
297 enable_auto_cleanup: true,
298 shutdown_timeout: Duration::from_secs(30),
299 rate_limits: crate::production::RateLimitConfig::default(),
300 }),
301 bootstrap_cache_config: None,
302 identity_config: Some(IdentityManagerConfig {
303 cache_ttl: Duration::from_secs(3600),
304 challenge_timeout: Duration::from_secs(30),
305 }),
306 };
307
308 if config.network.ipv6_enabled {
310 node_config.listen_addrs.push(std::net::SocketAddr::new(
311 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
312 listen_addr.port(),
313 ));
314 }
315
316 Ok(node_config)
317 }
318
319 pub fn with_listen_addr(addr: &str) -> Result<Self> {
321 let listen_addr: std::net::SocketAddr = addr
322 .parse()
323 .map_err(|e: std::net::AddrParseError| {
324 NetworkError::InvalidAddress(e.to_string().into())
325 })
326 .map_err(P2PError::Network)?;
327 let cfg = NodeConfig {
328 listen_addr,
329 listen_addrs: vec![listen_addr],
330 ..Default::default()
331 };
332 Ok(cfg)
333 }
334}
335
336impl Default for DHTConfig {
337 fn default() -> Self {
338 Self {
339 k_value: 20,
340 alpha_value: 5,
341 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
344 }
345}
346
347impl Default for SecurityConfig {
348 fn default() -> Self {
349 Self {
350 enable_noise: true,
351 enable_tls: true,
352 trust_level: TrustLevel::Basic,
353 }
354 }
355}
356
357#[derive(Debug, Clone)]
359pub struct PeerInfo {
360 pub peer_id: PeerId,
362
363 pub addresses: Vec<String>,
365
366 pub connected_at: Instant,
368
369 pub last_seen: Instant,
371
372 pub status: ConnectionStatus,
374
375 pub protocols: Vec<String>,
377
378 pub heartbeat_count: u64,
380}
381
/// Connection state of a peer, as stored in `PeerInfo::status`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is being closed.
    Disconnecting,
    /// The connection is closed.
    Disconnected,
    /// The connection failed; carries a message (presumably the failure reason).
    Failed(String),
}
396
397#[derive(Debug, Clone)]
399pub enum NetworkEvent {
400 PeerConnected {
402 peer_id: PeerId,
404 addresses: Vec<String>,
406 },
407
408 PeerDisconnected {
410 peer_id: PeerId,
412 reason: String,
414 },
415
416 MessageReceived {
418 peer_id: PeerId,
420 protocol: String,
422 data: Vec<u8>,
424 },
425
426 ConnectionFailed {
428 peer_id: Option<PeerId>,
430 address: String,
432 error: String,
434 },
435
436 DHTRecordStored {
438 key: Vec<u8>,
440 value: Vec<u8>,
442 },
443
444 DHTRecordRetrieved {
446 key: Vec<u8>,
448 value: Option<Vec<u8>>,
450 },
451}
452
453#[derive(Debug, Clone)]
458pub enum P2PEvent {
459 Message {
461 topic: String,
463 source: PeerId,
465 data: Vec<u8>,
467 },
468 PeerConnected(PeerId),
470 PeerDisconnected(PeerId),
472}
473
474pub struct P2PNode {
476 config: NodeConfig,
478
479 peer_id: PeerId,
481
482 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
484
485 event_tx: broadcast::Sender<P2PEvent>,
487
488 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
490
491 start_time: Instant,
493
494 running: RwLock<bool>,
496
497 mcp_server: Option<Arc<MCPServer>>,
499
500 dht: Option<Arc<RwLock<DHT>>>,
502
503 resource_manager: Option<Arc<ResourceManager>>,
505
506 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
508
509 network_node: Arc<P2PNetworkNode>,
511
512 #[allow(dead_code)]
514 rate_limiter: Arc<RateLimiter>,
515}
516
517impl P2PNode {
518 pub fn new_for_tests() -> Self {
520 let (event_tx, _) = broadcast::channel(16);
521 Self {
522 config: NodeConfig::default(),
523 peer_id: "test_peer".to_string(),
524 peers: Arc::new(RwLock::new(HashMap::new())),
525 event_tx,
526 listen_addrs: RwLock::new(Vec::new()),
527 start_time: Instant::now(),
528 running: RwLock::new(false),
529 mcp_server: None,
530 dht: None,
531 resource_manager: None,
532 bootstrap_manager: None,
533 network_node: {
534 let bind: std::net::SocketAddr = "127.0.0.1:0"
536 .parse()
537 .unwrap_or(std::net::SocketAddr::from(([127, 0, 0, 1], 0)));
538 let cfg = ant_quic::QuicNodeConfig {
539 role: ant_quic::EndpointRole::Client,
540 bootstrap_nodes: vec![],
541 enable_coordinator: false,
542 max_connections: 1,
543 connection_timeout: std::time::Duration::from_millis(1),
544 stats_interval: std::time::Duration::from_millis(1),
545 auth_config: ant_quic::auth::AuthConfig::default(),
546 bind_addr: Some(bind),
547 };
548 let node = tokio::runtime::Handle::current()
549 .block_on(
550 crate::transport::ant_quic_adapter::P2PNetworkNode::new_with_config(
551 bind, cfg,
552 ),
553 )
554 .unwrap_or_else(|_| {
555 let fallback_bind = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
557 #[allow(clippy::expect_used)]
558 {
559 tokio::runtime::Handle::current()
560 .block_on(crate::transport::ant_quic_adapter::P2PNetworkNode::new(
561 fallback_bind,
562 ))
563 .expect("ant-quic fallback creation failed")
564 }
565 });
566 Arc::new(node)
567 },
568 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
569 max_requests: 100,
570 burst_size: 100,
571 window: std::time::Duration::from_secs(1),
572 ..Default::default()
573 })),
574 }
575 }
576 pub async fn new(config: NodeConfig) -> Result<Self> {
578 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
579 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
581 });
582
583 let (event_tx, _) = broadcast::channel(1000);
584
585 let dht = if true {
587 let dht_config = crate::dht::DHTConfig {
589 replication_factor: config.dht_config.k_value,
590 bucket_size: config.dht_config.k_value,
591 alpha: config.dht_config.alpha_value,
592 record_ttl: config.dht_config.record_ttl,
593 bucket_refresh_interval: config.dht_config.refresh_interval,
594 republish_interval: config.dht_config.refresh_interval,
595 max_distance: 160, };
597 let dht_key = crate::dht::Key::new(peer_id.as_bytes());
598 let dht_instance = DHT::new(dht_key, dht_config);
599 Some(Arc::new(RwLock::new(dht_instance)))
600 } else {
601 None
602 };
603
604 let mcp_server = if config.enable_mcp_server {
606 let mcp_config = config
607 .mcp_server_config
608 .clone()
609 .unwrap_or_else(|| MCPServerConfig {
610 server_name: format!("P2P-MCP-{peer_id}"),
611 server_version: crate::VERSION.to_string(),
612 enable_dht_discovery: dht.is_some(),
613 ..MCPServerConfig::default()
614 });
615
616 let mut server = MCPServer::new(mcp_config);
617
618 if let Some(ref dht_instance) = dht {
620 server = server.with_dht(dht_instance.clone());
621 }
622
623 Some(Arc::new(server))
624 } else {
625 None
626 };
627
628 let resource_manager = config
630 .production_config
631 .clone()
632 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
633
634 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
636 match BootstrapManager::with_config(cache_config.clone()).await {
637 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
638 Err(e) => {
639 warn!(
640 "Failed to initialize bootstrap manager: {}, continuing without cache",
641 e
642 );
643 None
644 }
645 }
646 } else {
647 match BootstrapManager::new().await {
648 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
649 Err(e) => {
650 warn!(
651 "Failed to initialize bootstrap manager: {}, continuing without cache",
652 e
653 );
654 None
655 }
656 }
657 };
658
659 let bind_addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_addr.port()));
662
663 let bootstrap_nodes: Vec<std::net::SocketAddr> = config
665 .bootstrap_peers_str
666 .iter()
667 .filter_map(|addr_str| addr_str.parse().ok())
668 .collect();
669
670 let quic_config = ant_quic::QuicNodeConfig {
671 role: if bootstrap_nodes.is_empty() {
672 ant_quic::EndpointRole::Server {
673 can_coordinate: false,
674 }
675 } else {
676 ant_quic::EndpointRole::Client
677 },
678 bootstrap_nodes,
679 enable_coordinator: false,
680 max_connections: config.max_connections.min(1000),
681 connection_timeout: config.connection_timeout,
682 stats_interval: std::time::Duration::from_secs(60),
683 auth_config: ant_quic::auth::AuthConfig::default(),
684 bind_addr: Some(bind_addr),
685 };
686
687 let network_node = Arc::new(
688 P2PNetworkNode::new_with_config(bind_addr, quic_config)
689 .await
690 .map_err(|e| {
691 P2PError::Transport(crate::error::TransportError::SetupFailed(
692 format!("Failed to create P2P network node: {}", e).into(),
693 ))
694 })?,
695 );
696
697 let rate_limiter = Arc::new(RateLimiter::new(
699 crate::validation::RateLimitConfig::default(),
700 ));
701
702 let node = Self {
703 config,
704 peer_id,
705 peers: Arc::new(RwLock::new(HashMap::new())),
706 event_tx,
707 listen_addrs: RwLock::new(Vec::new()),
708 start_time: Instant::now(),
709 running: RwLock::new(false),
710 mcp_server,
711 dht,
712 resource_manager,
713 bootstrap_manager,
714 network_node,
715 rate_limiter,
716 };
717 info!("Created P2P node with peer ID: {}", node.peer_id);
718
719 Ok(node)
724 }
725
726 pub fn builder() -> NodeBuilder {
728 NodeBuilder::new()
729 }
730
731 pub fn peer_id(&self) -> &PeerId {
733 &self.peer_id
734 }
735
736 pub async fn initialize_mcp_network(&self) -> Result<()> {
739 if let Some(ref _mcp_server) = self.mcp_server {
740 let (send_tx, mut send_rx) =
742 tokio::sync::mpsc::unbounded_channel::<(PeerId, String, Vec<u8>)>();
743
744 let network_sender = P2PNetworkSender::new(self.peer_id.clone(), send_tx);
746
747 _mcp_server
749 .set_network_sender(Arc::new(network_sender))
750 .await;
751
752 let network_node: Arc<crate::transport::ant_quic_adapter::P2PNetworkNode> =
754 Arc::clone(&self.network_node);
755 let _peer_id_for_task = self.peer_id.clone();
756 tokio::spawn(async move {
757 while let Some((peer_id, protocol, data)) = send_rx.recv().await {
758 debug!(
759 "Sending network message to {}: {} bytes on protocol {}",
760 peer_id,
761 data.len(),
762 protocol
763 );
764
765 let message_data = match handle_protocol_message_creation(&protocol, data) {
767 Some(msg) => msg,
768 None => continue,
769 };
770
771 let send_result = network_node
773 .send_to_peer_string(&peer_id, &message_data)
774 .await;
775 handle_message_send_result(
776 send_result.map_err(|e| {
777 P2PError::Transport(crate::error::TransportError::StreamError(
778 e.to_string().into(),
779 ))
780 }),
781 &peer_id,
782 )
783 .await;
784 }
785 });
786
787 info!(
788 "MCP network integration initialized for peer {}",
789 self.peer_id
790 );
791 }
792 Ok(())
793 }
794
795 pub fn local_addr(&self) -> Option<String> {
796 self.listen_addrs
797 .try_read()
798 .ok()
799 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
800 }
801
802 pub async fn subscribe(&self, topic: &str) -> Result<()> {
803 info!("Subscribed to topic: {}", topic);
806 Ok(())
807 }
808
809 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
810 info!(
811 "Publishing message to topic: {} ({} bytes)",
812 topic,
813 data.len()
814 );
815
816 let peer_list: Vec<PeerId> = {
818 let peers_guard = self.peers.read().await;
819 peers_guard.keys().cloned().collect()
820 };
821
822 if peer_list.is_empty() {
823 debug!("No peers connected, message will only be sent to local subscribers");
824 } else {
825 let mut send_count = 0;
827 for peer_id in &peer_list {
828 match self.send_message(peer_id, topic, data.to_vec()).await {
829 Ok(_) => {
830 send_count += 1;
831 debug!("Sent message to peer: {}", peer_id);
832 }
833 Err(e) => {
834 warn!("Failed to send message to peer {}: {}", peer_id, e);
835 }
836 }
837 }
838 info!(
839 "Published message to {}/{} connected peers",
840 send_count,
841 peer_list.len()
842 );
843 }
844
845 let event = P2PEvent::Message {
847 topic: topic.to_string(),
848 source: self.peer_id.clone(),
849 data: data.to_vec(),
850 };
851 let _ = self.event_tx.send(event);
852
853 Ok(())
854 }
855
856 pub fn config(&self) -> &NodeConfig {
858 &self.config
859 }
860
861 pub async fn start(&self) -> Result<()> {
863 info!("Starting P2P node...");
864
865 if let Some(ref resource_manager) = self.resource_manager {
867 resource_manager.start().await.map_err(|e| {
868 P2PError::Network(crate::error::NetworkError::ProtocolError(
869 format!("Failed to start resource manager: {e}").into(),
870 ))
871 })?;
872 info!("Production resource manager started");
873 }
874
875 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
877 let mut manager = bootstrap_manager.write().await;
878 manager.start_background_tasks().await.map_err(|e| {
879 P2PError::Network(crate::error::NetworkError::ProtocolError(
880 format!("Failed to start bootstrap manager: {e}").into(),
881 ))
882 })?;
883 info!("Bootstrap cache manager started");
884 }
885
886 *self.running.write().await = true;
888
889 self.start_network_listeners().await?;
891
892 let listen_addrs = self.listen_addrs.read().await;
894 info!("P2P node started on addresses: {:?}", *listen_addrs);
895
896 self.initialize_mcp_network().await?;
898
899 if let Some(ref _mcp_server) = self.mcp_server {
901 _mcp_server.start().await.map_err(|e| {
902 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
903 format!("Failed to start MCP server: {e}").into(),
904 ))
905 })?;
906 info!("MCP server started with network integration");
907 }
908
909 self.start_message_receiving_system().await?;
911
912 self.connect_bootstrap_peers().await?;
914
915 Ok(())
916 }
917
918 async fn start_network_listeners(&self) -> Result<()> {
920 info!("Starting network listeners...");
921
922 let _network_node = &self.network_node;
924
925 for &socket_addr in &self.config.listen_addrs {
927 if let Err(e) = self.start_listener_on_address(socket_addr).await {
930 warn!("Failed to start listener on {}: {}", socket_addr, e);
931 } else {
932 info!("Started listener on {}", socket_addr);
933 }
934 }
935
936 if self.config.listen_addrs.is_empty() {
938 let mut default_addrs = vec![std::net::SocketAddr::new(
940 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
941 9000,
942 )];
943
944 if self.config.enable_ipv6 {
946 default_addrs.push(std::net::SocketAddr::new(
947 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
948 9000,
949 ));
950 }
951
952 for addr in default_addrs {
953 if let Err(e) = self.start_listener_on_address(addr).await {
954 warn!("Failed to start default listener on {}: {}", addr, e);
955 } else {
956 info!("Started default listener on {}", addr);
957 }
958 }
959 }
960
961 Ok(())
962 }
963
964 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
966 warn!("QUIC transport temporarily disabled during ant-quic migration");
1005 Err(crate::P2PError::Transport(
1007 crate::error::TransportError::SetupFailed(
1008 format!(
1009 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1010 )
1011 .into(),
1012 ),
1013 ))
1014 }
1015
1016 #[allow(dead_code)] async fn start_connection_acceptor(
1019 &self,
1020 transport: Arc<dyn crate::transport::Transport>,
1021 addr: std::net::SocketAddr,
1022 transport_type: crate::transport::TransportType,
1023 ) -> Result<()> {
1024 info!(
1025 "Starting connection acceptor for {:?} on {}",
1026 transport_type, addr
1027 );
1028
1029 let event_tx = self.event_tx.clone();
1031 let _peer_id = self.peer_id.clone();
1032 let peers = Arc::clone(&self.peers);
1033 let _network_node = Arc::clone(&self.network_node);
1034 let mcp_server = self.mcp_server.clone();
1035 let rate_limiter = Arc::clone(&self.rate_limiter);
1036
1037 tokio::spawn(async move {
1039 loop {
1040 match transport.accept().await {
1041 Ok(connection) => {
1042 let remote_addr = connection.remote_addr();
1043 let connection_peer_id =
1044 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1045
1046 let socket_addr = remote_addr.socket_addr();
1048 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1049 continue;
1051 }
1052
1053 info!(
1054 "Accepted {:?} connection from {} (peer: {})",
1055 transport_type, remote_addr, connection_peer_id
1056 );
1057
1058 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1060
1061 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1063
1064 spawn_connection_handler(
1066 connection,
1067 connection_peer_id,
1068 event_tx.clone(),
1069 Arc::clone(&peers),
1070 mcp_server.clone(),
1071 );
1072 }
1073 Err(e) => {
1074 warn!(
1075 "Failed to accept {:?} connection on {}: {}",
1076 transport_type, addr, e
1077 );
1078
1079 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1081 }
1082 }
1083 }
1084 });
1085
1086 info!(
1087 "Connection acceptor background task started for {:?} on {}",
1088 transport_type, addr
1089 );
1090 Ok(())
1091 }
1092
1093 async fn start_message_receiving_system(&self) -> Result<()> {
1095 info!("Message receiving system initialized (background tasks simplified for demo)");
1096
1097 Ok(())
1102 }
1103
1104 #[allow(dead_code)]
1106 async fn handle_received_message(
1107 &self,
1108 message_data: Vec<u8>,
1109 peer_id: &PeerId,
1110 protocol: &str,
1111 event_tx: &broadcast::Sender<P2PEvent>,
1112 ) -> Result<()> {
1113 if protocol == MCP_PROTOCOL {
1115 return self.handle_mcp_message(message_data, peer_id).await;
1116 }
1117
1118 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1120 Ok(message) => {
1121 if let (Some(protocol), Some(data), Some(from)) = (
1122 message.get("protocol").and_then(|v| v.as_str()),
1123 message.get("data").and_then(|v| v.as_array()),
1124 message.get("from").and_then(|v| v.as_str()),
1125 ) {
1126 let data_bytes: Vec<u8> = data
1128 .iter()
1129 .filter_map(|v| v.as_u64().map(|n| n as u8))
1130 .collect();
1131
1132 let event = P2PEvent::Message {
1134 topic: protocol.to_string(),
1135 source: from.to_string(),
1136 data: data_bytes,
1137 };
1138
1139 let _ = event_tx.send(event);
1140 debug!("Generated message event from peer: {}", peer_id);
1141 }
1142 }
1143 Err(e) => {
1144 warn!("Failed to parse received message from {}: {}", peer_id, e);
1145 }
1146 }
1147
1148 Ok(())
1149 }
1150
1151 #[allow(dead_code)]
1153 async fn handle_mcp_message(&self, message_data: Vec<u8>, peer_id: &PeerId) -> Result<()> {
1154 if let Some(ref _mcp_server) = self.mcp_server {
1155 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
1157 Ok(p2p_mcp_message) => {
1158 debug!(
1159 "Received MCP message from peer {}: {:?}",
1160 peer_id, p2p_mcp_message.message_type
1161 );
1162
1163 match p2p_mcp_message.message_type {
1165 crate::mcp::P2PMCPMessageType::Request => {
1166 self.handle_mcp_tool_request(p2p_mcp_message, peer_id)
1168 .await?;
1169 }
1170 crate::mcp::P2PMCPMessageType::Response => {
1171 self.handle_mcp_tool_response(p2p_mcp_message).await?;
1173 }
1174 crate::mcp::P2PMCPMessageType::ServiceAdvertisement => {
1175 self.handle_mcp_service_advertisement(p2p_mcp_message, peer_id)
1177 .await?;
1178 }
1179 crate::mcp::P2PMCPMessageType::ServiceDiscovery => {
1180 self.handle_mcp_service_discovery(p2p_mcp_message, peer_id)
1182 .await?;
1183 }
1184 crate::mcp::P2PMCPMessageType::Heartbeat => {
1185 debug!("Received heartbeat from peer {}", peer_id);
1187
1188 let _ =
1190 update_peer_heartbeat(&self.peers, peer_id)
1191 .await
1192 .map_err(|e| {
1193 debug!(
1194 "Failed to update heartbeat for peer {}: {}",
1195 peer_id, e
1196 )
1197 });
1198
1199 let timestamp = std::time::SystemTime::now()
1201 .duration_since(std::time::UNIX_EPOCH)
1202 .map_err(|e| {
1203 P2PError::Network(NetworkError::ProtocolError(
1204 format!("System time error: {}", e).into(),
1205 ))
1206 })?
1207 .as_secs();
1208
1209 let ack_data = serde_json::to_vec(&serde_json::json!({
1210 "type": "heartbeat_ack",
1211 "timestamp": timestamp
1212 }))
1213 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1214
1215 let _ = self
1216 .send_message(peer_id, MCP_PROTOCOL, ack_data)
1217 .await
1218 .map_err(|e| {
1219 warn!("Failed to send heartbeat ack to {}: {}", peer_id, e)
1220 });
1221 }
1222 crate::mcp::P2PMCPMessageType::HealthCheck => {
1223 debug!("Received health check from peer {}", peer_id);
1225
1226 let peers_count = self.peers.read().await.len();
1228 let uptime = self.start_time.elapsed();
1229
1230 let (memory_usage, cpu_usage) =
1232 get_resource_metrics(&self.resource_manager).await;
1233
1234 let timestamp = std::time::SystemTime::now()
1236 .duration_since(std::time::UNIX_EPOCH)
1237 .map_err(|e| {
1238 P2PError::Network(NetworkError::ProtocolError(
1239 format!("System time error: {}", e).into(),
1240 ))
1241 })?
1242 .as_secs();
1243
1244 let health_response = serde_json::json!({
1245 "type": "health_check_response",
1246 "status": "healthy",
1247 "peer_id": self.peer_id,
1248 "peers_count": peers_count,
1249 "uptime_secs": uptime.as_secs(),
1250 "memory_usage_bytes": memory_usage,
1251 "cpu_usage_percent": cpu_usage,
1252 "timestamp": timestamp
1253 });
1254
1255 let response_data = serde_json::to_vec(&health_response)
1256 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1257
1258 if let Err(e) = self
1260 .send_message(peer_id, MCP_PROTOCOL, response_data)
1261 .await
1262 {
1263 warn!("Failed to send health check response to {}: {}", peer_id, e);
1264 }
1265 }
1266 }
1267 }
1268 Err(e) => {
1269 warn!(
1270 "Failed to deserialize MCP message from peer {}: {}",
1271 peer_id, e
1272 );
1273 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1274 format!("Invalid MCP message: {e}").into(),
1275 )));
1276 }
1277 }
1278 } else {
1279 warn!("Received MCP message but MCP server is not enabled");
1280 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1281 "MCP server not enabled".to_string().into(),
1282 )));
1283 }
1284
1285 Ok(())
1286 }
1287
1288 #[allow(dead_code)]
1290 async fn handle_mcp_tool_request(
1291 &self,
1292 message: crate::mcp::P2PMCPMessage,
1293 peer_id: &PeerId,
1294 ) -> Result<()> {
1295 if let Some(ref _mcp_server) = self.mcp_server {
1296 if let crate::mcp::MCPMessage::CallTool { name, arguments } = message.payload {
1298 debug!(
1299 "Handling MCP tool request for '{}' from peer {}",
1300 name, peer_id
1301 );
1302
1303 let context = MCPCallContext {
1305 caller_id: peer_id.clone(),
1306 timestamp: std::time::SystemTime::now(),
1307 timeout: Duration::from_secs(30),
1308 auth_info: None,
1309 metadata: std::collections::HashMap::new(),
1310 };
1311
1312 match _mcp_server.call_tool(&name, arguments, context).await {
1314 Ok(result) => {
1315 let response_message = crate::mcp::P2PMCPMessage {
1317 message_type: crate::mcp::P2PMCPMessageType::Response,
1318 message_id: message.message_id,
1319 source_peer: self.peer_id.clone(),
1320 target_peer: Some(peer_id.clone()),
1321 timestamp: std::time::SystemTime::now()
1322 .duration_since(std::time::UNIX_EPOCH)
1323 .unwrap_or_default()
1324 .as_secs(),
1325 payload: crate::mcp::MCPMessage::CallToolResult {
1326 content: vec![crate::mcp::MCPContent::Text {
1327 text: serde_json::to_string(&result).unwrap_or_default(),
1328 }],
1329 is_error: false,
1330 },
1331 ttl: 5,
1332 };
1333
1334 let response_data = serde_json::to_vec(&response_message)
1336 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1337
1338 self.send_message(peer_id, MCP_PROTOCOL, response_data)
1339 .await?;
1340 debug!("Sent MCP tool response to peer {}", peer_id);
1341 }
1342 Err(e) => {
1343 let error_message = crate::mcp::P2PMCPMessage {
1345 message_type: crate::mcp::P2PMCPMessageType::Response,
1346 message_id: message.message_id,
1347 source_peer: self.peer_id.clone(),
1348 target_peer: Some(peer_id.clone()),
1349 timestamp: std::time::SystemTime::now()
1350 .duration_since(std::time::UNIX_EPOCH)
1351 .unwrap_or_default()
1352 .as_secs(),
1353 payload: crate::mcp::MCPMessage::CallToolResult {
1354 content: vec![crate::mcp::MCPContent::Text {
1355 text: format!("Error: {e}"),
1356 }],
1357 is_error: true,
1358 },
1359 ttl: 5,
1360 };
1361
1362 let error_data = serde_json::to_vec(&error_message)
1363 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1364
1365 self.send_message(peer_id, MCP_PROTOCOL, error_data).await?;
1366 warn!("Sent MCP error response to peer {}: {}", peer_id, e);
1367 }
1368 }
1369 }
1370 }
1371
1372 Ok(())
1373 }
1374
1375 #[allow(dead_code)]
1377 async fn handle_mcp_tool_response(&self, message: crate::mcp::P2PMCPMessage) -> Result<()> {
1378 if let Some(ref _mcp_server) = self.mcp_server {
1379 debug!("Received MCP tool response: {}", message.message_id);
1381 }
1384
1385 Ok(())
1386 }
1387
1388 #[allow(dead_code)]
1390 async fn handle_mcp_service_advertisement(
1391 &self,
1392 message: crate::mcp::P2PMCPMessage,
1393 peer_id: &PeerId,
1394 ) -> Result<()> {
1395 debug!("Received MCP service advertisement from peer {}", peer_id);
1396
1397 if let Some(ref _mcp_server) = self.mcp_server {
1398 _mcp_server.handle_service_advertisement(message).await?;
1400 debug!("Processed service advertisement from peer {}", peer_id);
1401 } else {
1402 warn!("Received MCP service advertisement but MCP server is not enabled");
1403 }
1404
1405 Ok(())
1406 }
1407
1408 #[allow(dead_code)]
1410 async fn handle_mcp_service_discovery(
1411 &self,
1412 message: crate::mcp::P2PMCPMessage,
1413 peer_id: &PeerId,
1414 ) -> Result<()> {
1415 debug!("Received MCP service discovery query from peer {}", peer_id);
1416
1417 if let Some(ref _mcp_server) = self.mcp_server {
1418 if let Ok(Some(response_data)) = _mcp_server.handle_service_discovery(message).await {
1420 self.send_message(peer_id, MCP_PROTOCOL, response_data)
1422 .await?;
1423 debug!("Sent service discovery response to peer {}", peer_id);
1424 }
1425 } else {
1426 warn!("Received MCP service discovery query but MCP server is not enabled");
1427 }
1428
1429 Ok(())
1430 }
1431
1432 pub async fn run(&self) -> Result<()> {
1434 if !*self.running.read().await {
1435 self.start().await?;
1436 }
1437
1438 info!("P2P node running...");
1439
1440 loop {
1442 if !*self.running.read().await {
1443 break;
1444 }
1445
1446 self.periodic_tasks().await?;
1448
1449 tokio::time::sleep(Duration::from_millis(100)).await;
1451 }
1452
1453 info!("P2P node stopped");
1454 Ok(())
1455 }
1456
1457 pub async fn stop(&self) -> Result<()> {
1459 info!("Stopping P2P node...");
1460
1461 *self.running.write().await = false;
1463
1464 if let Some(ref _mcp_server) = self.mcp_server {
1466 _mcp_server.shutdown().await.map_err(|e| {
1467 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1468 format!("Failed to shutdown MCP server: {e}").into(),
1469 ))
1470 })?;
1471 info!("MCP server stopped");
1472 }
1473
1474 self.disconnect_all_peers().await?;
1476
1477 if let Some(ref resource_manager) = self.resource_manager {
1479 resource_manager.shutdown().await.map_err(|e| {
1480 P2PError::Network(crate::error::NetworkError::ProtocolError(
1481 format!("Failed to shutdown resource manager: {e}").into(),
1482 ))
1483 })?;
1484 info!("Production resource manager stopped");
1485 }
1486
1487 info!("P2P node stopped");
1488 Ok(())
1489 }
1490
1491 pub async fn shutdown(&self) -> Result<()> {
1493 self.stop().await
1494 }
1495
1496 pub async fn is_running(&self) -> bool {
1498 *self.running.read().await
1499 }
1500
1501 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1503 self.listen_addrs.read().await.clone()
1504 }
1505
1506 pub async fn connected_peers(&self) -> Vec<PeerId> {
1508 self.peers.read().await.keys().cloned().collect()
1509 }
1510
1511 pub async fn peer_count(&self) -> usize {
1513 self.peers.read().await.len()
1514 }
1515
1516 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1518 self.peers.read().await.get(peer_id).cloned()
1519 }
1520
1521 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1523 info!("Connecting to peer at: {}", address);
1524
1525 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1527 Some(resource_manager.acquire_connection().await?)
1528 } else {
1529 None
1530 };
1531
1532 let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1534 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1535 format!("{}: {}", address, e).into(),
1536 ))
1537 })?;
1538
1539 let peer_id = {
1541 match self.network_node.connect_to_peer_string(_socket_addr).await {
1542 Ok(connected_peer_id) => {
1543 info!("Successfully connected to peer: {}", connected_peer_id);
1544 connected_peer_id
1545 }
1546 Err(e) => {
1547 warn!("Failed to connect to peer at {}: {}", address, e);
1548
1549 let demo_peer_id =
1552 format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
1553 warn!(
1554 "Using demo peer ID: {} (transport connection failed)",
1555 demo_peer_id
1556 );
1557 demo_peer_id
1558 }
1559 }
1560 };
1561
1562 let peer_info = PeerInfo {
1564 peer_id: peer_id.clone(),
1565 addresses: vec![address.to_string()],
1566 connected_at: Instant::now(),
1567 last_seen: Instant::now(),
1568 status: ConnectionStatus::Connected,
1569 protocols: vec!["p2p-foundation/1.0".to_string()],
1570 heartbeat_count: 0,
1571 };
1572
1573 self.peers.write().await.insert(peer_id.clone(), peer_info);
1575
1576 if let Some(ref resource_manager) = self.resource_manager {
1578 resource_manager.record_bandwidth(0, 0); }
1580
1581 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1583
1584 info!("Connected to peer: {}", peer_id);
1585 Ok(peer_id)
1586 }
1587
1588 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1590 info!("Disconnecting from peer: {}", peer_id);
1591
1592 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1593 peer_info.status = ConnectionStatus::Disconnected;
1594
1595 let _ = self
1597 .event_tx
1598 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1599
1600 info!("Disconnected from peer: {}", peer_id);
1601 }
1602
1603 Ok(())
1604 }
1605
1606 pub async fn send_message(
1608 &self,
1609 peer_id: &PeerId,
1610 protocol: &str,
1611 data: Vec<u8>,
1612 ) -> Result<()> {
1613 debug!(
1614 "Sending message to peer {} on protocol {}",
1615 peer_id, protocol
1616 );
1617
1618 if let Some(ref resource_manager) = self.resource_manager
1620 && !resource_manager
1621 .check_rate_limit(peer_id, "message")
1622 .await?
1623 {
1624 return Err(P2PError::ResourceExhausted(
1625 format!("Rate limit exceeded for peer {}", peer_id).into(),
1626 ));
1627 }
1628
1629 if !self.peers.read().await.contains_key(peer_id) {
1631 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1632 peer_id.to_string().into(),
1633 )));
1634 }
1635
1636 if protocol == MCP_PROTOCOL {
1638 if data.len() < 4 {
1640 return Err(P2PError::Network(
1641 crate::error::NetworkError::ProtocolError(
1642 "Invalid MCP message: too short".to_string().into(),
1643 ),
1644 ));
1645 }
1646
1647 let message_type = data.first().unwrap_or(&0);
1649 if *message_type > 10 {
1650 return Err(P2PError::Network(
1652 crate::error::NetworkError::ProtocolError(
1653 "Invalid MCP message type".to_string().into(),
1654 ),
1655 ));
1656 }
1657
1658 debug!("Validated MCP message for network transmission");
1659 }
1660
1661 if let Some(ref resource_manager) = self.resource_manager {
1663 resource_manager.record_bandwidth(data.len() as u64, 0);
1664 }
1665
1666 let _message_data = self.create_protocol_message(protocol, data)?;
1668
1669 {
1671 match self.network_node.send_message(peer_id, _message_data).await {
1672 Ok(_) => {
1673 debug!("Message sent to peer {} via transport layer", peer_id);
1674 }
1675 Err(e) => {
1676 warn!("Failed to send message to peer {}: {}", peer_id, e);
1677 return Err(P2PError::Network(
1678 crate::error::NetworkError::ProtocolError(
1679 format!("Message send failed: {e}").into(),
1680 ),
1681 ));
1682 }
1683 }
1684 Ok(())
1685 }
1686 }
1687
1688 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1690 use serde_json::json;
1691
1692 let timestamp = std::time::SystemTime::now()
1693 .duration_since(std::time::UNIX_EPOCH)
1694 .map_err(|e| {
1695 P2PError::Network(NetworkError::ProtocolError(
1696 format!("System time error: {}", e).into(),
1697 ))
1698 })?
1699 .as_secs();
1700
1701 let message = json!({
1703 "protocol": protocol,
1704 "data": data,
1705 "from": self.peer_id,
1706 "timestamp": timestamp
1707 });
1708
1709 serde_json::to_vec(&message).map_err(|e| {
1710 P2PError::Transport(crate::error::TransportError::StreamError(
1711 format!("Failed to serialize message: {e}").into(),
1712 ))
1713 })
1714 }
1715
1716 }
1718
1719#[allow(dead_code)]
1721fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1722 use serde_json::json;
1723
1724 let timestamp = std::time::SystemTime::now()
1725 .duration_since(std::time::UNIX_EPOCH)
1726 .map_err(|e| {
1727 P2PError::Network(NetworkError::ProtocolError(
1728 format!("System time error: {}", e).into(),
1729 ))
1730 })?
1731 .as_secs();
1732
1733 let message = json!({
1735 "protocol": protocol,
1736 "data": data,
1737 "timestamp": timestamp
1738 });
1739
1740 serde_json::to_vec(&message).map_err(|e| {
1741 P2PError::Transport(crate::error::TransportError::StreamError(
1742 format!("Failed to serialize message: {e}").into(),
1743 ))
1744 })
1745}
1746
1747impl P2PNode {
1748 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1750 self.event_tx.subscribe()
1751 }
1752
1753 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1755 self.subscribe_events()
1756 }
1757
1758 pub fn uptime(&self) -> Duration {
1760 self.start_time.elapsed()
1761 }
1762
1763 pub fn mcp_server(&self) -> Option<&Arc<MCPServer>> {
1765 self.mcp_server.as_ref()
1766 }
1767
1768 pub async fn register_mcp_tool(&self, tool: Tool) -> Result<()> {
1770 if let Some(ref _mcp_server) = self.mcp_server {
1771 let tool_name = tool.definition.name.clone();
1772 _mcp_server.register_tool(tool).await.map_err(|e| {
1773 P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
1774 format!("{}: Registration failed: {e}", tool_name).into(),
1775 ))
1776 })
1777 } else {
1778 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1779 "MCP server not enabled".to_string().into(),
1780 )))
1781 }
1782 }
1783
1784 pub async fn call_mcp_tool(&self, tool_name: &str, arguments: Value) -> Result<Value> {
1786 if let Some(ref _mcp_server) = self.mcp_server {
1787 if let Some(ref resource_manager) = self.resource_manager
1789 && !resource_manager
1790 .check_rate_limit(&self.peer_id, "mcp")
1791 .await?
1792 {
1793 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1794 "MCP rate limit exceeded".to_string().into(),
1795 )));
1796 }
1797
1798 let context = MCPCallContext {
1799 caller_id: self.peer_id.clone(),
1800 timestamp: SystemTime::now(),
1801 timeout: Duration::from_secs(30),
1802 auth_info: None,
1803 metadata: HashMap::new(),
1804 };
1805
1806 _mcp_server
1807 .call_tool(tool_name, arguments, context)
1808 .await
1809 .map_err(|e| {
1810 P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
1811 format!("{}: Execution failed: {e}", tool_name).into(),
1812 ))
1813 })
1814 } else {
1815 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1816 "MCP server not enabled".to_string().into(),
1817 )))
1818 }
1819 }
1820
1821 pub async fn call_remote_mcp_tool(
1823 &self,
1824 peer_id: &PeerId,
1825 tool_name: &str,
1826 arguments: Value,
1827 ) -> Result<Value> {
1828 if let Some(ref _mcp_server) = self.mcp_server {
1829 if peer_id == &self.peer_id {
1831 let context = MCPCallContext {
1833 caller_id: self.peer_id.clone(),
1834 timestamp: SystemTime::now(),
1835 timeout: Duration::from_secs(30),
1836 auth_info: None,
1837 metadata: HashMap::new(),
1838 };
1839
1840 return _mcp_server.call_tool(tool_name, arguments, context).await;
1842 }
1843
1844 let context = MCPCallContext {
1848 caller_id: self.peer_id.clone(),
1849 timestamp: SystemTime::now(),
1850 timeout: Duration::from_secs(30),
1851 auth_info: None,
1852 metadata: HashMap::new(),
1853 };
1854
1855 match _mcp_server
1857 .call_tool(tool_name, arguments.clone(), context)
1858 .await
1859 {
1860 Ok(mut result) => {
1861 if let Value::Object(ref mut map) = result {
1863 map.insert("tool".to_string(), Value::String(tool_name.to_string()));
1864 }
1865 Ok(result)
1866 }
1867 Err(e) => Err(e),
1868 }
1869 } else {
1870 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1871 "MCP server not enabled".to_string().into(),
1872 )))
1873 }
1874 }
1875
1876 #[allow(dead_code)]
1878 async fn handle_mcp_remote_tool_call(
1879 &self,
1880 peer_id: &PeerId,
1881 tool_name: &str,
1882 arguments: Value,
1883 context: MCPCallContext,
1884 ) -> Result<Value> {
1885 let request_id = uuid::Uuid::new_v4().to_string();
1886
1887 let mcp_message = crate::mcp::MCPMessage::CallTool {
1889 name: tool_name.to_string(),
1890 arguments,
1891 };
1892
1893 let p2p_message = crate::mcp::P2PMCPMessage {
1895 message_type: crate::mcp::P2PMCPMessageType::Request,
1896 message_id: request_id.clone(),
1897 source_peer: context.caller_id.clone(),
1898 target_peer: Some(peer_id.clone()),
1899 timestamp: context
1900 .timestamp
1901 .duration_since(std::time::UNIX_EPOCH)
1902 .map_err(|e| {
1903 P2PError::Network(crate::error::NetworkError::ProtocolError(
1904 format!("Time error: {e}").into(),
1905 ))
1906 })?
1907 .as_secs(),
1908 payload: mcp_message,
1909 ttl: 5, };
1911
1912 let message_data = serde_json::to_vec(&p2p_message)
1914 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1915
1916 if message_data.len() > crate::mcp::MAX_MESSAGE_SIZE {
1917 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1918 "Message too large".to_string().into(),
1919 )));
1920 }
1921
1922 self.send_message(peer_id, MCP_PROTOCOL, message_data)
1924 .await?;
1925
1926 info!(
1928 "MCP remote tool call sent to peer {}, tool: {}",
1929 peer_id, tool_name
1930 );
1931
1932 Ok(serde_json::json!({
1935 "status": "sent",
1936 "message": "Remote tool call sent successfully",
1937 "peer_id": peer_id,
1938 "tool": tool_name, "request_id": request_id
1940 }))
1941 }
1942
1943 pub async fn list_mcp_tools(&self) -> Result<Vec<String>> {
1945 if let Some(ref _mcp_server) = self.mcp_server {
1946 let (tools, _) = _mcp_server.list_tools(None).await.map_err(|e| {
1947 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1948 format!("Failed to list tools: {e}").into(),
1949 ))
1950 })?;
1951
1952 Ok(tools.into_iter().map(|tool| tool.name).collect())
1953 } else {
1954 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1955 "MCP server not enabled".to_string().into(),
1956 )))
1957 }
1958 }
1959
1960 pub async fn discover_remote_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1962 if let Some(ref _mcp_server) = self.mcp_server {
1963 _mcp_server.discover_remote_services().await.map_err(|e| {
1964 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1965 format!("Failed to discover services: {e}").into(),
1966 ))
1967 })
1968 } else {
1969 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1970 "MCP server not enabled".to_string().into(),
1971 )))
1972 }
1973 }
1974
1975 pub async fn list_remote_mcp_tools(&self, peer_id: &PeerId) -> Result<Vec<String>> {
1977 if let Some(ref _mcp_server) = self.mcp_server {
1978 if peer_id == &self.peer_id {
1980 return self.list_mcp_tools().await;
1981 }
1982
1983 self.list_mcp_tools().await
1987 } else {
1988 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1989 "MCP server not enabled".to_string().into(),
1990 )))
1991 }
1992 }
1993
1994 pub async fn mcp_stats(&self) -> Result<crate::mcp::MCPServerStats> {
1996 if let Some(ref _mcp_server) = self.mcp_server {
1997 Ok(_mcp_server.get_stats().await)
1998 } else {
1999 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2000 "MCP server not enabled".to_string().into(),
2001 )))
2002 }
2003 }
2004
2005 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2007 if let Some(ref resource_manager) = self.resource_manager {
2008 Ok(resource_manager.get_metrics().await)
2009 } else {
2010 Err(P2PError::Network(
2011 crate::error::NetworkError::ProtocolError(
2012 "Production resource manager not enabled".to_string().into(),
2013 ),
2014 ))
2015 }
2016 }
2017
2018 pub async fn health_check(&self) -> Result<()> {
2020 if let Some(ref resource_manager) = self.resource_manager {
2021 resource_manager.health_check().await
2022 } else {
2023 let peer_count = self.peer_count().await;
2025 if peer_count > self.config.max_connections {
2026 Err(P2PError::Network(
2027 crate::error::NetworkError::ProtocolError(
2028 format!("Too many connections: {peer_count}").into(),
2029 ),
2030 ))
2031 } else {
2032 Ok(())
2033 }
2034 }
2035 }
2036
2037 pub fn production_config(&self) -> Option<&ProductionConfig> {
2039 self.config.production_config.as_ref()
2040 }
2041
2042 pub fn is_production_mode(&self) -> bool {
2044 self.resource_manager.is_some()
2045 }
2046
2047 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2049 self.dht.as_ref()
2050 }
2051
2052 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2054 if let Some(ref dht) = self.dht {
2055 let dht_instance = dht.write().await;
2056 dht_instance
2057 .put(key.clone(), value.clone())
2058 .await
2059 .map_err(|e| {
2060 P2PError::Dht(crate::error::DhtError::StoreFailed(
2061 format!("{}: {e}", key).into(),
2062 ))
2063 })?;
2064
2065 Ok(())
2066 } else {
2067 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2068 "DHT not enabled".to_string().into(),
2069 )))
2070 }
2071 }
2072
2073 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2075 if let Some(ref dht) = self.dht {
2076 let dht_instance = dht.write().await;
2077 let record_result = dht_instance.get(&key).await;
2078
2079 let value = record_result.as_ref().map(|record| record.value.clone());
2080
2081 Ok(value)
2082 } else {
2083 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2084 "DHT not enabled".to_string().into(),
2085 )))
2086 }
2087 }
2088
2089 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2091 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2092 let mut manager = bootstrap_manager.write().await;
2093 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2094 .iter()
2095 .filter_map(|addr| addr.parse().ok())
2096 .collect();
2097 let contact = ContactEntry::new(peer_id, socket_addresses);
2098 manager.add_contact(contact).await.map_err(|e| {
2099 P2PError::Network(crate::error::NetworkError::ProtocolError(
2100 format!("Failed to add peer to bootstrap cache: {e}").into(),
2101 ))
2102 })?;
2103 }
2104 Ok(())
2105 }
2106
2107 pub async fn update_peer_metrics(
2109 &self,
2110 peer_id: &PeerId,
2111 success: bool,
2112 latency_ms: Option<u64>,
2113 _error: Option<String>,
2114 ) -> Result<()> {
2115 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2116 let mut manager = bootstrap_manager.write().await;
2117
2118 let metrics = QualityMetrics {
2120 success_rate: if success { 1.0 } else { 0.0 },
2121 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2122 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2124 last_successful_connection: if success {
2125 chrono::Utc::now()
2126 } else {
2127 chrono::Utc::now() - chrono::Duration::hours(1)
2128 },
2129 uptime_score: 0.5,
2130 };
2131
2132 manager
2133 .update_contact_metrics(peer_id, metrics)
2134 .await
2135 .map_err(|e| {
2136 P2PError::Network(crate::error::NetworkError::ProtocolError(
2137 format!("Failed to update peer metrics: {e}").into(),
2138 ))
2139 })?;
2140 }
2141 Ok(())
2142 }
2143
2144 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2146 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2147 let manager = bootstrap_manager.read().await;
2148 let stats = manager.get_stats().await.map_err(|e| {
2149 P2PError::Network(crate::error::NetworkError::ProtocolError(
2150 format!("Failed to get bootstrap stats: {e}").into(),
2151 ))
2152 })?;
2153 Ok(Some(stats))
2154 } else {
2155 Ok(None)
2156 }
2157 }
2158
2159 pub async fn cached_peer_count(&self) -> usize {
2161 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2162 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2163 {
2164 return stats.total_contacts;
2165 }
2166 0
2167 }
2168
2169 async fn connect_bootstrap_peers(&self) -> Result<()> {
2171 let mut bootstrap_contacts = Vec::new();
2172 let mut used_cache = false;
2173
2174 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2176 let manager = bootstrap_manager.read().await;
2177 match manager.get_bootstrap_peers(20).await {
2178 Ok(contacts) => {
2180 if !contacts.is_empty() {
2181 info!("Using {} cached bootstrap peers", contacts.len());
2182 bootstrap_contacts = contacts;
2183 used_cache = true;
2184 }
2185 }
2186 Err(e) => {
2187 warn!("Failed to get cached bootstrap peers: {}", e);
2188 }
2189 }
2190 }
2191
2192 if bootstrap_contacts.is_empty() {
2194 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2195 &self.config.bootstrap_peers_str
2196 } else {
2197 &self
2199 .config
2200 .bootstrap_peers
2201 .iter()
2202 .map(|addr| addr.to_string())
2203 .collect::<Vec<_>>()
2204 };
2205
2206 if bootstrap_peers.is_empty() {
2207 info!("No bootstrap peers configured and no cached peers available");
2208 return Ok(());
2209 }
2210
2211 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
2212
2213 for addr in bootstrap_peers {
2214 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2215 let contact = ContactEntry::new(
2216 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
2217 vec![socket_addr],
2218 );
2219 bootstrap_contacts.push(contact);
2220 } else {
2221 warn!("Invalid bootstrap address format: {}", addr);
2222 }
2223 }
2224 }
2225
2226 let mut successful_connections = 0;
2228 for contact in bootstrap_contacts {
2229 for addr in &contact.addresses {
2230 match self.connect_peer(&addr.to_string()).await {
2231 Ok(peer_id) => {
2232 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2233 successful_connections += 1;
2234
2235 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2237 let mut manager = bootstrap_manager.write().await;
2238 let mut updated_contact = contact.clone();
2239 updated_contact.peer_id = peer_id.clone();
2240 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2243 warn!("Failed to update bootstrap cache: {}", e);
2244 }
2245 }
2246 break; }
2248 Err(e) => {
2249 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2250
2251 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2253 let mut manager = bootstrap_manager.write().await;
2254 let mut updated_contact = contact.clone();
2255 updated_contact.update_connection_result(
2256 false,
2257 None,
2258 Some(e.to_string()),
2259 );
2260
2261 if let Err(e) = manager.add_contact(updated_contact).await {
2262 warn!("Failed to update bootstrap cache: {}", e);
2263 }
2264 }
2265 }
2266 }
2267 }
2268 }
2269
2270 if successful_connections == 0 {
2271 if !used_cache {
2272 warn!("Failed to connect to any bootstrap peers");
2273 }
2274 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2275 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2277 }));
2278 } else {
2279 info!(
2280 "Successfully connected to {} bootstrap peers",
2281 successful_connections
2282 );
2283 }
2284
2285 Ok(())
2286 }
2287
2288 async fn disconnect_all_peers(&self) -> Result<()> {
2290 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2291
2292 for peer_id in peer_ids {
2293 self.disconnect_peer(&peer_id).await?;
2294 }
2295
2296 Ok(())
2297 }
2298
2299 async fn periodic_tasks(&self) -> Result<()> {
2301 Ok(())
2307 }
2308
2309 pub async fn discover_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
2311 if let Some(ref _mcp_server) = self.mcp_server {
2312 _mcp_server.discover_remote_services().await
2313 } else {
2314 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2315 "MCP server not enabled".to_string().into(),
2316 )))
2317 }
2318 }
2319
2320 pub async fn get_all_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
2322 if let Some(ref _mcp_server) = self.mcp_server {
2323 _mcp_server.get_all_services().await
2324 } else {
2325 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2326 "MCP server not enabled".to_string().into(),
2327 )))
2328 }
2329 }
2330
2331 pub async fn find_mcp_services_with_tool(
2333 &self,
2334 tool_name: &str,
2335 ) -> Result<Vec<crate::mcp::MCPService>> {
2336 if let Some(ref _mcp_server) = self.mcp_server {
2337 _mcp_server.find_services_with_tool(tool_name).await
2338 } else {
2339 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2340 "MCP server not enabled".to_string().into(),
2341 )))
2342 }
2343 }
2344
2345 pub async fn announce_mcp_services(&self) -> Result<()> {
2347 if let Some(ref _mcp_server) = self.mcp_server {
2348 _mcp_server.announce_local_services().await
2349 } else {
2350 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2351 "MCP server not enabled".to_string().into(),
2352 )))
2353 }
2354 }
2355
2356 pub async fn refresh_mcp_service_discovery(&self) -> Result<()> {
2358 if let Some(ref _mcp_server) = self.mcp_server {
2359 _mcp_server.refresh_service_discovery().await
2360 } else {
2361 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2362 "MCP server not enabled".to_string().into(),
2363 )))
2364 }
2365 }
2366
2367 pub async fn query_peer_mcp_services(&self, peer_id: &PeerId) -> Result<()> {
2369 if self.mcp_server.is_none() {
2370 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2371 "MCP server not enabled".to_string().into(),
2372 )));
2373 }
2374
2375 let discovery_query = crate::mcp::P2PMCPMessage {
2376 message_type: crate::mcp::P2PMCPMessageType::ServiceDiscovery,
2377 message_id: uuid::Uuid::new_v4().to_string(),
2378 source_peer: self.peer_id.clone(),
2379 target_peer: Some(peer_id.clone()),
2380 timestamp: std::time::SystemTime::now()
2381 .duration_since(std::time::UNIX_EPOCH)
2382 .unwrap_or_default()
2383 .as_secs(),
2384 payload: crate::mcp::MCPMessage::ListTools { cursor: None },
2385 ttl: 3,
2386 };
2387
2388 let query_data = serde_json::to_vec(&discovery_query)
2389 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2390
2391 self.send_message(peer_id, MCP_PROTOCOL, query_data).await?;
2392 debug!("Sent MCP service discovery query to peer {}", peer_id);
2393
2394 Ok(())
2395 }
2396
2397 pub async fn broadcast_mcp_service_discovery(&self) -> Result<()> {
2399 if self.mcp_server.is_none() {
2400 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2401 "MCP server not enabled".to_string().into(),
2402 )));
2403 }
2404
2405 let peer_list: Vec<PeerId> = {
2407 let peers_guard = self.peers.read().await;
2408 peers_guard.keys().cloned().collect()
2409 };
2410
2411 if peer_list.is_empty() {
2412 debug!("No peers connected for MCP service discovery broadcast");
2413 return Ok(());
2414 }
2415
2416 let mut successful_queries = 0;
2418 for peer_id in &peer_list {
2419 match self.query_peer_mcp_services(peer_id).await {
2420 Ok(_) => {
2421 successful_queries += 1;
2422 debug!("Sent MCP service discovery query to peer: {}", peer_id);
2423 }
2424 Err(e) => {
2425 warn!(
2426 "Failed to send MCP service discovery query to peer {}: {}",
2427 peer_id, e
2428 );
2429 }
2430 }
2431 }
2432
2433 info!(
2434 "Broadcast MCP service discovery to {}/{} connected peers",
2435 successful_queries,
2436 peer_list.len()
2437 );
2438
2439 Ok(())
2440 }
2441}
2442
2443#[derive(Clone)]
2445pub struct P2PNetworkSender {
2446 peer_id: PeerId,
2447 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2449}
2450
2451impl P2PNetworkSender {
2452 pub fn new(
2453 peer_id: PeerId,
2454 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2455 ) -> Self {
2456 Self { peer_id, send_tx }
2457 }
2458}
2459
2460#[async_trait::async_trait]
2462impl NetworkSender for P2PNetworkSender {
2463 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2465 self.send_tx
2466 .send((peer_id.clone(), protocol.to_string(), data))
2467 .map_err(|_| {
2468 P2PError::Network(crate::error::NetworkError::ProtocolError(
2469 "Failed to send message via channel".to_string().into(),
2470 ))
2471 })?;
2472 Ok(())
2473 }
2474
2475 fn local_peer_id(&self) -> &PeerId {
2477 &self.peer_id
2478 }
2479}
2480
2481pub struct NodeBuilder {
2483 config: NodeConfig,
2484}
2485
2486impl Default for NodeBuilder {
2487 fn default() -> Self {
2488 Self::new()
2489 }
2490}
2491
2492impl NodeBuilder {
2493 pub fn new() -> Self {
2495 Self {
2496 config: NodeConfig::default(),
2497 }
2498 }
2499
2500 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2502 self.config.peer_id = Some(peer_id);
2503 self
2504 }
2505
2506 pub fn listen_on(mut self, addr: &str) -> Self {
2508 if let Ok(multiaddr) = addr.parse() {
2509 self.config.listen_addrs.push(multiaddr);
2510 }
2511 self
2512 }
2513
2514 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2516 if let Ok(multiaddr) = addr.parse() {
2517 self.config.bootstrap_peers.push(multiaddr);
2518 }
2519 self.config.bootstrap_peers_str.push(addr.to_string());
2520 self
2521 }
2522
2523 pub fn with_ipv6(mut self, enable: bool) -> Self {
2525 self.config.enable_ipv6 = enable;
2526 self
2527 }
2528
2529 pub fn with_mcp_server(mut self) -> Self {
2531 self.config.enable_mcp_server = true;
2532 self
2533 }
2534
2535 pub fn with_mcp_config(mut self, mcp_config: MCPServerConfig) -> Self {
2537 self.config.mcp_server_config = Some(mcp_config);
2538 self.config.enable_mcp_server = true;
2539 self
2540 }
2541
2542 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2544 self.config.connection_timeout = timeout;
2545 self
2546 }
2547
2548 pub fn with_max_connections(mut self, max: usize) -> Self {
2550 self.config.max_connections = max;
2551 self
2552 }
2553
2554 pub fn with_production_mode(mut self) -> Self {
2556 self.config.production_config = Some(ProductionConfig::default());
2557 self
2558 }
2559
2560 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2562 self.config.production_config = Some(production_config);
2563 self
2564 }
2565
2566 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2568 self.config.dht_config = dht_config;
2569 self
2570 }
2571
2572 pub fn with_default_dht(mut self) -> Self {
2574 self.config.dht_config = DHTConfig::default();
2575 self
2576 }
2577
2578 pub async fn build(self) -> Result<P2PNode> {
2580 P2PNode::new(self.config).await
2581 }
2582}
2583
2584#[allow(dead_code)] async fn handle_received_message_standalone(
2587 message_data: Vec<u8>,
2588 peer_id: &PeerId,
2589 protocol: &str,
2590 event_tx: &broadcast::Sender<P2PEvent>,
2591 mcp_server: &Option<Arc<crate::mcp::MCPServer>>,
2592) -> Result<()> {
2593 if protocol == MCP_PROTOCOL {
2595 return handle_mcp_message_standalone(message_data, peer_id, mcp_server).await;
2596 }
2597
2598 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2600 Ok(message) => {
2601 if let (Some(protocol), Some(data), Some(from)) = (
2602 message.get("protocol").and_then(|v| v.as_str()),
2603 message.get("data").and_then(|v| v.as_array()),
2604 message.get("from").and_then(|v| v.as_str()),
2605 ) {
2606 let data_bytes: Vec<u8> = data
2608 .iter()
2609 .filter_map(|v| v.as_u64().map(|n| n as u8))
2610 .collect();
2611
2612 let event = P2PEvent::Message {
2614 topic: protocol.to_string(),
2615 source: from.to_string(),
2616 data: data_bytes,
2617 };
2618
2619 let _ = event_tx.send(event);
2620 debug!("Generated message event from peer: {}", peer_id);
2621 }
2622 }
2623 Err(e) => {
2624 warn!("Failed to parse received message from {}: {}", peer_id, e);
2625 }
2626 }
2627
2628 Ok(())
2629}
2630
2631#[allow(dead_code)] async fn handle_mcp_message_standalone(
2634 message_data: Vec<u8>,
2635 peer_id: &PeerId,
2636 mcp_server: &Option<Arc<crate::mcp::MCPServer>>,
2637) -> Result<()> {
2638 if let Some(_mcp_server) = mcp_server {
2639 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
2641 Ok(p2p_mcp_message) => {
2642 use crate::mcp::P2PMCPMessageType;
2644 match p2p_mcp_message.message_type {
2645 P2PMCPMessageType::Request => {
2646 debug!("Received MCP request from peer {}", peer_id);
2647 }
2650 P2PMCPMessageType::Response => {
2651 debug!("Received MCP response from peer {}", peer_id);
2652 }
2654 P2PMCPMessageType::ServiceAdvertisement => {
2655 debug!("Received service advertisement from peer {}", peer_id);
2656 }
2658 P2PMCPMessageType::ServiceDiscovery => {
2659 debug!("Received service discovery query from peer {}", peer_id);
2660 }
2662 P2PMCPMessageType::Heartbeat => {
2663 debug!("Received heartbeat from peer {}", peer_id);
2664 }
2666 P2PMCPMessageType::HealthCheck => {
2667 debug!("Received health check from peer {}", peer_id);
2668 }
2670 }
2671 }
2672 Err(e) => {
2673 warn!(
2674 "Failed to deserialize MCP message from peer {}: {}",
2675 peer_id, e
2676 );
2677 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
2678 format!("Invalid MCP message: {e}").into(),
2679 )));
2680 }
2681 }
2682 } else {
2683 warn!("Received MCP message but MCP server is not enabled");
2684 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2685 "MCP server not enabled".to_string().into(),
2686 )));
2687 }
2688
2689 Ok(())
2690}
2691
2692fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2694 match create_protocol_message_static(protocol, data) {
2695 Ok(msg) => Some(msg),
2696 Err(e) => {
2697 warn!("Failed to create protocol message: {}", e);
2698 None
2699 }
2700 }
2701}
2702
2703async fn handle_message_send_result(result: Result<()>, peer_id: &PeerId) {
2705 match result {
2706 Ok(_) => {
2707 debug!("Message sent to peer {} via transport layer", peer_id);
2708 }
2709 Err(e) => {
2710 warn!("Failed to send message to peer {}: {}", peer_id, e);
2711 }
2712 }
2713}
2714
2715#[allow(dead_code)] fn check_rate_limit(
2718 rate_limiter: &RateLimiter,
2719 socket_addr: &std::net::SocketAddr,
2720 remote_addr: &NetworkAddress,
2721) -> Result<()> {
2722 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2723 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2724 e
2725 })
2726}
2727
2728#[allow(dead_code)] async fn register_new_peer(
2731 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2732 peer_id: &PeerId,
2733 remote_addr: &NetworkAddress,
2734) {
2735 let mut peers_guard = peers.write().await;
2736 let peer_info = PeerInfo {
2737 peer_id: peer_id.clone(),
2738 addresses: vec![remote_addr.to_string()],
2739 connected_at: tokio::time::Instant::now(),
2740 last_seen: tokio::time::Instant::now(),
2741 status: ConnectionStatus::Connected,
2742 protocols: vec!["p2p-chat/1.0.0".to_string()],
2743 heartbeat_count: 0,
2744 };
2745 peers_guard.insert(peer_id.clone(), peer_info);
2746}
2747
2748#[allow(dead_code)] fn spawn_connection_handler(
2751 connection: Box<dyn crate::transport::Connection>,
2752 peer_id: PeerId,
2753 event_tx: broadcast::Sender<P2PEvent>,
2754 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2755 mcp_server: Option<Arc<MCPServer>>,
2756) {
2757 tokio::spawn(async move {
2758 handle_peer_connection(connection, peer_id, event_tx, peers, mcp_server).await;
2759 });
2760}
2761
2762#[allow(dead_code)] async fn handle_peer_connection(
2765 mut connection: Box<dyn crate::transport::Connection>,
2766 peer_id: PeerId,
2767 event_tx: broadcast::Sender<P2PEvent>,
2768 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2769 mcp_server: Option<Arc<MCPServer>>,
2770) {
2771 loop {
2772 match connection.receive().await {
2773 Ok(message_data) => {
2774 debug!(
2775 "Received {} bytes from peer: {}",
2776 message_data.len(),
2777 peer_id
2778 );
2779
2780 if let Err(e) = handle_received_message_standalone(
2782 message_data,
2783 &peer_id,
2784 "unknown", &event_tx,
2786 &mcp_server,
2787 )
2788 .await
2789 {
2790 warn!("Failed to handle message from {}: {}", peer_id, e);
2791 }
2792 }
2793 Err(e) => {
2794 warn!("Failed to receive message from {}: {}", peer_id, e);
2795
2796 if !connection.is_alive().await {
2798 info!("Connection to {} is dead, removing peer", peer_id);
2799
2800 remove_peer(&peers, &peer_id).await;
2802
2803 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2805
2806 break; }
2808
2809 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2811 }
2812 }
2813 }
2814}
2815
2816#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2819 let mut peers_guard = peers.write().await;
2820 peers_guard.remove(peer_id);
2821}
2822
2823async fn update_peer_heartbeat(
2825 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2826 peer_id: &PeerId,
2827) -> Result<()> {
2828 let mut peers_guard = peers.write().await;
2829 match peers_guard.get_mut(peer_id) {
2830 Some(peer_info) => {
2831 peer_info.last_seen = Instant::now();
2832 peer_info.heartbeat_count += 1;
2833 Ok(())
2834 }
2835 None => {
2836 warn!("Received heartbeat from unknown peer: {}", peer_id);
2837 Err(P2PError::Network(NetworkError::PeerNotFound(
2838 format!("Peer {} not found", peer_id).into(),
2839 )))
2840 }
2841 }
2842}
2843
2844async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f32) {
2846 if let Some(manager) = resource_manager {
2847 let metrics = manager.get_metrics().await;
2848 (metrics.memory_used, metrics.cpu_usage as f32)
2849 } else {
2850 (0, 0.0)
2851 }
2852}
2853
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mcp::{
        MCPTool, Tool, ToolHandler, ToolHealthStatus, ToolMetadata, ToolRequirements,
    };
    use serde_json::json;
    use std::future::Future;
    use std::pin::Pin;
    use std::time::Duration;
    use tokio::time::timeout;

    /// IPv4 loopback socket address on `port`.
    fn loopback_v4(port: u16) -> std::net::SocketAddr {
        std::net::SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, port))
    }

    /// IPv6 loopback socket address on `port`.
    fn loopback_v6(port: u16) -> std::net::SocketAddr {
        std::net::SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, port))
    }

    /// Asserts that `outcome` failed with an error mentioning that the MCP
    /// server is not enabled.
    #[track_caller]
    fn expect_mcp_disabled<T: std::fmt::Debug>(outcome: Result<T>) {
        assert!(outcome.is_err());
        assert!(
            outcome
                .unwrap_err()
                .to_string()
                .contains("MCP server not enabled")
        );
    }

    /// Minimal tool handler that echoes its arguments back to the caller.
    struct NetworkTestTool {
        name: String,
    }

    impl NetworkTestTool {
        fn new(name: &str) -> Self {
            Self {
                name: name.to_string(),
            }
        }
    }

    impl ToolHandler for NetworkTestTool {
        fn execute(
            &self,
            arguments: serde_json::Value,
        ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + '_>> {
            // Clone the name so the future owns everything it needs.
            let tool_name = self.name.clone();
            Box::pin(async move {
                let reply = json!({
                    "tool": tool_name,
                    "input": arguments,
                    "result": "network test success"
                });
                Ok(reply)
            })
        }

        fn validate(&self, _arguments: &serde_json::Value) -> Result<()> {
            Ok(())
        }

        fn get_requirements(&self) -> ToolRequirements {
            ToolRequirements::default()
        }
    }

    /// Node configuration shared by the tests: loopback listeners on
    /// OS-assigned ports, MCP enabled with auth and rate limiting switched off.
    fn create_test_node_config() -> NodeConfig {
        let mcp_settings = MCPServerConfig {
            enable_auth: false,
            enable_rate_limiting: false,
            ..Default::default()
        };

        NodeConfig {
            peer_id: Some("test_peer_123".to_string()),
            listen_addrs: vec![loopback_v6(0), loopback_v4(0)],
            listen_addr: loopback_v4(0),
            bootstrap_peers: vec![],
            bootstrap_peers_str: vec![],
            enable_ipv6: true,
            enable_mcp_server: true,
            mcp_server_config: Some(mcp_settings),
            connection_timeout: Duration::from_secs(10),
            keep_alive_interval: Duration::from_secs(30),
            max_connections: 100,
            max_incoming_connections: 50,
            dht_config: DHTConfig::default(),
            security_config: SecurityConfig::default(),
            production_config: None,
            bootstrap_cache_config: None,
            identity_config: None,
        }
    }

    /// Builds a healthy, never-called tool backed by `NetworkTestTool`.
    fn create_test_tool(name: &str) -> Tool {
        let definition = MCPTool {
            name: name.to_string(),
            description: format!("Test tool: {}", name).into(),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "input": { "type": "string" }
                }
            }),
        };

        let metadata = ToolMetadata {
            created_at: SystemTime::now(),
            last_called: None,
            call_count: 0,
            avg_execution_time: Duration::from_millis(0),
            health_status: ToolHealthStatus::Healthy,
            tags: vec!["test".to_string()],
        };

        Tool {
            definition,
            handler: Box::new(NetworkTestTool::new(name)),
            metadata,
        }
    }

    /// `NodeConfig::default()` exposes the expected baseline values.
    #[tokio::test]
    async fn test_node_config_default() {
        let defaults = NodeConfig::default();

        assert!(defaults.peer_id.is_none());
        assert_eq!(defaults.listen_addrs.len(), 2);
        assert!(defaults.enable_ipv6);
        assert!(defaults.enable_mcp_server);
        assert_eq!(defaults.max_connections, 1000);
        assert_eq!(defaults.max_incoming_connections, 100);
        assert_eq!(defaults.connection_timeout, Duration::from_secs(30));
    }

    /// `DHTConfig::default()` exposes the expected baseline values.
    #[tokio::test]
    async fn test_dht_config_default() {
        let defaults = DHTConfig::default();

        assert_eq!(defaults.k_value, 20);
        assert_eq!(defaults.alpha_value, 5);
        assert_eq!(defaults.record_ttl, Duration::from_secs(3600));
        assert_eq!(defaults.refresh_interval, Duration::from_secs(600));
    }

    /// `SecurityConfig::default()` exposes the expected baseline values.
    #[tokio::test]
    async fn test_security_config_default() {
        let defaults = SecurityConfig::default();

        assert!(defaults.enable_noise);
        assert!(defaults.enable_tls);
        assert_eq!(defaults.trust_level, TrustLevel::Basic);
    }

    /// Every `TrustLevel` variant can be constructed and compared.
    #[test]
    fn test_trust_level_variants() {
        let _none = TrustLevel::None;
        let _basic = TrustLevel::Basic;
        let _full = TrustLevel::Full;

        assert_eq!(TrustLevel::None, TrustLevel::None);
        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
        assert_eq!(TrustLevel::Full, TrustLevel::Full);
        assert_ne!(TrustLevel::None, TrustLevel::Basic);
    }

    /// Every `ConnectionStatus` variant can be constructed and compared.
    #[test]
    fn test_connection_status_variants() {
        let connecting = ConnectionStatus::Connecting;
        let connected = ConnectionStatus::Connected;
        let disconnecting = ConnectionStatus::Disconnecting;
        let disconnected = ConnectionStatus::Disconnected;
        let failed = ConnectionStatus::Failed("test error".to_string());

        assert_eq!(connecting, ConnectionStatus::Connecting);
        assert_eq!(connected, ConnectionStatus::Connected);
        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
        assert_eq!(disconnected, ConnectionStatus::Disconnected);
        assert_ne!(connecting, connected);

        match failed {
            ConnectionStatus::Failed(reason) => assert_eq!(reason, "test error"),
            _ => panic!("Expected Failed status"),
        }
    }

    /// A new node reports its configured id and starts idle with no peers.
    #[tokio::test]
    async fn test_node_creation() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert_eq!(node.peer_id(), "test_peer_123");
        assert!(!node.is_running().await);
        assert_eq!(node.peer_count().await, 0);
        assert!(node.connected_peers().await.is_empty());

        Ok(())
    }

    /// Without a configured id the node generates one with a `peer_` prefix.
    #[tokio::test]
    async fn test_node_creation_without_peer_id() -> Result<()> {
        let anonymous = NodeConfig {
            peer_id: None,
            ..create_test_node_config()
        };

        let node = P2PNode::new(anonymous).await?;

        assert!(node.peer_id().starts_with("peer_"));
        assert!(!node.is_running().await);

        Ok(())
    }

    /// Start and stop toggle the running flag and expose listen addresses.
    #[tokio::test]
    async fn test_node_lifecycle() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(!node.is_running().await);

        node.start().await?;
        assert!(node.is_running().await);

        let bound = node.listen_addrs().await;
        assert!(
            !bound.is_empty(),
            "Expected at least one listening address"
        );

        node.stop().await?;
        assert!(!node.is_running().await);

        Ok(())
    }

    /// Connecting registers a peer record; disconnecting removes it again.
    #[tokio::test]
    async fn test_peer_connection() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let target = String::from("/ip4/127.0.0.1/tcp/0");

        let peer_id = node.connect_peer(&target).await?;
        assert!(peer_id.starts_with("peer_from_"));

        assert_eq!(node.peer_count().await, 1);

        let connected = node.connected_peers().await;
        assert_eq!(connected.len(), 1);
        assert_eq!(connected[0], peer_id);

        let lookup = node.peer_info(&peer_id).await;
        assert!(lookup.is_some());
        let record = lookup.expect("Peer info should exist after adding peer");
        assert_eq!(record.peer_id, peer_id);
        assert_eq!(record.status, ConnectionStatus::Connected);
        assert!(record.protocols.iter().any(|p| p == "p2p-foundation/1.0"));

        node.disconnect_peer(&peer_id).await?;
        assert_eq!(node.peer_count().await, 0);

        Ok(())
    }

    /// Subscribers observe connect and disconnect events in order.
    #[tokio::test]
    async fn test_event_subscription() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let mut events = node.subscribe_events();
        let target = String::from("/ip4/127.0.0.1/tcp/0");

        let peer_id = node.connect_peer(&target).await?;

        // The first event must be the connection notification.
        let first = timeout(Duration::from_millis(100), events.recv()).await;
        assert!(first.is_ok());

        let first = first
            .expect("Should receive event")
            .expect("Event should not be error");
        match first {
            P2PEvent::PeerConnected(seen) => {
                assert_eq!(seen, peer_id);
            }
            _ => panic!("Expected PeerConnected event"),
        }

        node.disconnect_peer(&peer_id).await?;

        // The second event must be the disconnection notification.
        let second = timeout(Duration::from_millis(100), events.recv()).await;
        assert!(second.is_ok());

        let second = second
            .expect("Should receive event")
            .expect("Event should not be error");
        match second {
            P2PEvent::PeerDisconnected(seen) => {
                assert_eq!(seen, peer_id);
            }
            _ => panic!("Expected PeerDisconnected event"),
        }

        Ok(())
    }

    /// Sending to a connected peer must not fail as "not connected", while
    /// sending to an unknown peer must.
    #[tokio::test]
    async fn test_message_sending() -> Result<()> {
        let sender_config = NodeConfig {
            listen_addr: loopback_v4(0),
            ..create_test_node_config()
        };
        let node1 = P2PNode::new(sender_config).await?;
        node1.start().await?;

        let receiver_config = NodeConfig {
            listen_addr: loopback_v4(0),
            ..create_test_node_config()
        };
        let node2 = P2PNode::new(receiver_config).await?;
        node2.start().await?;

        // Give both nodes a moment to finish binding their listeners.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let node2_addr = node2.local_addr().ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "No listening address".to_string().into(),
            ))
        })?;

        let peer_id = node1.connect_peer(&node2_addr).await?;

        // Allow the connection to settle before sending.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let payload = b"Hello, peer!".to_vec();
        let sent = node1.send_message(&peer_id, "test-protocol", payload).await;
        if let Err(err) = &sent {
            assert!(
                !err.to_string().contains("not connected"),
                "Got error: {}",
                err
            );
        }

        let stranger = "non_existent_peer".to_string();
        let rejected = node1
            .send_message(&stranger, "test-protocol", vec![])
            .await;
        assert!(rejected.is_err());
        assert!(rejected.unwrap_err().to_string().contains("not connected"));

        Ok(())
    }

    /// A registered tool can be listed, called and counted in the stats.
    #[tokio::test]
    async fn test_mcp_integration() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        node.start().await?;

        node.register_mcp_tool(create_test_tool("network_test_tool"))
            .await?;

        let tools = node.list_mcp_tools().await?;
        assert!(tools.contains(&"network_test_tool".to_string()));

        let arguments = json!({"input": "test_input"});
        let reply = node
            .call_mcp_tool("network_test_tool", arguments.clone())
            .await?;
        assert_eq!(reply["tool"], "network_test_tool");
        assert_eq!(reply["input"], arguments);

        let stats = node.mcp_stats().await?;
        assert_eq!(stats.total_tools, 1);

        let missing = node.call_mcp_tool("non_existent_tool", json!({})).await;
        assert!(missing.is_err());

        node.stop().await?;
        Ok(())
    }

    /// Remote tool listing, remote calls and service discovery.
    #[tokio::test]
    async fn test_remote_mcp_operations() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        node.start().await?;

        node.register_mcp_tool(create_test_tool("remote_test_tool"))
            .await?;

        let target = String::from("/ip4/127.0.0.1/tcp/9005");
        let peer_id = node.connect_peer(&target).await?;

        let remote_tools = node.list_remote_mcp_tools(&peer_id).await?;
        assert!(!remote_tools.is_empty());

        let arguments = json!({"input": "remote_test"});
        let reply = node
            .call_remote_mcp_tool(&peer_id, "remote_test_tool", arguments.clone())
            .await?;
        assert_eq!(reply["tool"], "remote_test_tool");

        let services = node.discover_remote_mcp_services().await?;
        assert!(services.is_empty());

        node.stop().await?;
        Ok(())
    }

    /// The health check passes both with no peers and with several peers.
    #[tokio::test]
    async fn test_health_check() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let idle = node.health_check().await;
        assert!(idle.is_ok());

        // Connect five peers on consecutive ports starting at 9010.
        for port in 9010..9015 {
            let addr = format!("/ip4/127.0.0.1/tcp/{}", port);
            node.connect_peer(&addr).await?;
        }

        let busy = node.health_check().await;
        assert!(busy.is_ok());

        Ok(())
    }

    /// Uptime grows as time passes.
    #[tokio::test]
    async fn test_node_uptime() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let earlier = node.uptime();
        assert!(earlier >= Duration::from_secs(0));

        tokio::time::sleep(Duration::from_millis(10)).await;

        let later = node.uptime();
        assert!(later > earlier);

        Ok(())
    }

    /// The node exposes the configuration it was built from.
    #[tokio::test]
    async fn test_node_config_access() -> Result<()> {
        let config = create_test_node_config();
        let expected_peer_id = config.peer_id.clone();
        let node = P2PNode::new(config).await?;

        let stored = node.config();
        assert_eq!(stored.peer_id, expected_peer_id);
        assert_eq!(stored.max_connections, 100);
        assert!(stored.enable_mcp_server);

        Ok(())
    }

    /// The MCP server handle is present only when MCP is enabled.
    #[tokio::test]
    async fn test_mcp_server_access() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(node.mcp_server().is_some());

        let without_mcp = NodeConfig {
            enable_mcp_server: false,
            ..create_test_node_config()
        };
        let node_no_mcp = P2PNode::new(without_mcp).await?;
        assert!(node_no_mcp.mcp_server().is_none());

        Ok(())
    }

    /// A DHT handle is available on a node built from the test configuration.
    #[tokio::test]
    async fn test_dht_access() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(node.dht().is_some());

        Ok(())
    }

    /// Builder options end up in the resulting node configuration.
    #[tokio::test]
    async fn test_node_builder() -> Result<()> {
        let node = P2PNode::builder()
            .with_peer_id("builder_test_peer".to_string())
            .listen_on("/ip4/127.0.0.1/tcp/9100")
            .listen_on("/ip6/::1/tcp/9100")
            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9101")
            .with_ipv6(true)
            .with_mcp_server()
            .with_connection_timeout(Duration::from_secs(15))
            .with_max_connections(200)
            .build()
            .await?;

        assert_eq!(node.peer_id(), "builder_test_peer");

        let built = node.config();
        assert_eq!(built.listen_addrs.len(), 4);
        assert_eq!(built.bootstrap_peers.len(), 1);
        assert!(built.enable_ipv6);
        assert!(built.enable_mcp_server);
        assert_eq!(built.connection_timeout, Duration::from_secs(15));
        assert_eq!(built.max_connections, 200);

        Ok(())
    }

    /// A custom MCP configuration passed to the builder is kept by the node.
    #[tokio::test]
    async fn test_node_builder_with_mcp_config() -> Result<()> {
        let mcp_config = MCPServerConfig {
            server_name: "test_mcp_server".to_string(),
            server_version: "1.0.0".to_string(),
            enable_dht_discovery: false,
            enable_auth: false,
            ..MCPServerConfig::default()
        };

        let node = P2PNode::builder()
            .with_peer_id("mcp_config_test".to_string())
            .with_mcp_config(mcp_config.clone())
            .build()
            .await?;

        assert_eq!(node.peer_id(), "mcp_config_test");

        let built = node.config();
        assert!(built.enable_mcp_server);
        assert!(built.mcp_server_config.is_some());

        let stored_mcp = built
            .mcp_server_config
            .as_ref()
            .expect("MCP server config should be present in test config");
        assert_eq!(stored_mcp.server_name, "test_mcp_server");
        assert!(!stored_mcp.enable_auth);

        Ok(())
    }

    /// Every local MCP operation fails clearly when MCP is disabled.
    #[tokio::test]
    async fn test_mcp_server_not_enabled_errors() -> Result<()> {
        let without_mcp = NodeConfig {
            enable_mcp_server: false,
            ..create_test_node_config()
        };
        let node = P2PNode::new(without_mcp).await?;

        let tool = create_test_tool("test_tool");
        expect_mcp_disabled(node.register_mcp_tool(tool).await);
        expect_mcp_disabled(node.call_mcp_tool("test_tool", json!({})).await);
        expect_mcp_disabled(node.list_mcp_tools().await);
        expect_mcp_disabled(node.mcp_stats().await);

        Ok(())
    }

    /// Starting with bootstrap peers never yields more peers than configured.
    #[tokio::test]
    async fn test_bootstrap_peers() -> Result<()> {
        let with_bootstrap = NodeConfig {
            bootstrap_peers: vec![loopback_v4(9200), loopback_v4(9201)],
            ..create_test_node_config()
        };

        let node = P2PNode::new(with_bootstrap).await?;

        node.start().await?;

        let peer_count = node.peer_count().await;
        assert!(
            peer_count <= 2,
            "Peer count should not exceed bootstrap peer count"
        );

        node.stop().await?;
        Ok(())
    }

    /// Without a production config, production features report as disabled.
    #[tokio::test]
    async fn test_production_mode_disabled() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(!node.is_production_mode());
        assert!(node.production_config().is_none());

        let metrics = node.resource_metrics().await;
        assert!(metrics.is_err());
        assert!(metrics.unwrap_err().to_string().contains("not enabled"));

        Ok(())
    }

    /// Every `NetworkEvent` variant can be constructed.
    #[tokio::test]
    async fn test_network_event_variants() {
        let peer_id = "test_peer".to_string();
        let address = "/ip4/127.0.0.1/tcp/9000".to_string();

        let _peer_connected = NetworkEvent::PeerConnected {
            peer_id: peer_id.clone(),
            addresses: vec![address.clone()],
        };

        let _peer_disconnected = NetworkEvent::PeerDisconnected {
            peer_id: peer_id.clone(),
            reason: "test disconnect".to_string(),
        };

        let _message_received = NetworkEvent::MessageReceived {
            peer_id: peer_id.clone(),
            protocol: "test-protocol".to_string(),
            data: vec![1, 2, 3],
        };

        let _connection_failed = NetworkEvent::ConnectionFailed {
            peer_id: Some(peer_id.clone()),
            address: address.clone(),
            error: "connection refused".to_string(),
        };

        let _dht_stored = NetworkEvent::DHTRecordStored {
            key: vec![1, 2, 3],
            value: vec![4, 5, 6],
        };

        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
            key: vec![1, 2, 3],
            value: Some(vec![4, 5, 6]),
        };
    }

    /// `PeerInfo` stores the values it was constructed with.
    #[tokio::test]
    async fn test_peer_info_structure() {
        let record = PeerInfo {
            peer_id: "test_peer".to_string(),
            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
            connected_at: Instant::now(),
            last_seen: Instant::now(),
            status: ConnectionStatus::Connected,
            protocols: vec!["test-protocol".to_string()],
            heartbeat_count: 0,
        };

        assert_eq!(record.peer_id, "test_peer");
        assert_eq!(record.addresses.len(), 1);
        assert_eq!(record.status, ConnectionStatus::Connected);
        assert_eq!(record.protocols.len(), 1);
    }

    /// `NodeConfig` survives a JSON round trip for the checked fields.
    #[tokio::test]
    async fn test_serialization() -> Result<()> {
        let original = create_test_node_config();
        let encoded = serde_json::to_string(&original)?;
        let decoded: NodeConfig = serde_json::from_str(&encoded)?;

        assert_eq!(original.peer_id, decoded.peer_id);
        assert_eq!(original.listen_addrs, decoded.listen_addrs);
        assert_eq!(original.enable_ipv6, decoded.enable_ipv6);

        Ok(())
    }
}