1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23use crate::identity::manager::IdentityManagerConfig;
24use crate::mcp::{
25 HealthMonitorConfig, MCP_PROTOCOL, MCPCallContext, MCPServer, MCPServerConfig, NetworkSender,
26 Tool,
27};
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::P2PNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37use std::collections::HashMap;
38use std::sync::Arc;
39use std::time::{Duration, SystemTime};
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, info, warn};
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47 pub peer_id: Option<PeerId>,
49
50 pub listen_addrs: Vec<std::net::SocketAddr>,
52
53 pub listen_addr: std::net::SocketAddr,
55
56 pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59 pub bootstrap_peers_str: Vec<String>,
61
62 pub enable_ipv6: bool,
64
65 pub enable_mcp_server: bool,
67
68 pub mcp_server_config: Option<MCPServerConfig>,
70
71 pub connection_timeout: Duration,
73
74 pub keep_alive_interval: Duration,
76
77 pub max_connections: usize,
79
80 pub max_incoming_connections: usize,
82
83 pub dht_config: DHTConfig,
85
86 pub security_config: SecurityConfig,
88
89 pub production_config: Option<ProductionConfig>,
91
92 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
94
95 pub identity_config: Option<IdentityManagerConfig>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct DHTConfig {
102 pub k_value: usize,
104
105 pub alpha_value: usize,
107
108 pub record_ttl: Duration,
110
111 pub refresh_interval: Duration,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SecurityConfig {
118 pub enable_noise: bool,
120
121 pub enable_tls: bool,
123
124 pub trust_level: TrustLevel,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
130pub enum TrustLevel {
131 None,
133 Basic,
135 Full,
137}
138
139impl NodeConfig {
140 pub fn new() -> Result<Self> {
146 let config = Config::default();
148
149 let listen_addr = config.listen_socket_addr()?;
151
152 let mut listen_addrs = vec![];
154
155 if config.network.ipv6_enabled {
157 let ipv6_addr = std::net::SocketAddr::new(
158 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
159 listen_addr.port(),
160 );
161 listen_addrs.push(ipv6_addr);
162 }
163
164 let ipv4_addr = std::net::SocketAddr::new(
166 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
167 listen_addr.port(),
168 );
169 listen_addrs.push(ipv4_addr);
170
171 Ok(Self {
172 peer_id: None,
173 listen_addrs,
174 listen_addr,
175 bootstrap_peers: Vec::new(),
176 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
177 enable_ipv6: config.network.ipv6_enabled,
178 enable_mcp_server: config.mcp.enabled,
179 mcp_server_config: None, connection_timeout: Duration::from_secs(config.network.connection_timeout),
181 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
182 max_connections: config.network.max_connections,
183 max_incoming_connections: config.security.connection_limit as usize,
184 dht_config: DHTConfig::default(),
185 security_config: SecurityConfig::default(),
186 production_config: None,
187 bootstrap_cache_config: None,
188 identity_config: None,
189 })
190 }
191}
192
193impl Default for NodeConfig {
194 fn default() -> Self {
195 let config = Config::default();
197
198 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
200 std::net::SocketAddr::new(
201 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
202 9000,
203 )
204 });
205
206 Self {
207 peer_id: None,
208 listen_addrs: vec![
209 std::net::SocketAddr::new(
210 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
211 listen_addr.port(),
212 ),
213 std::net::SocketAddr::new(
214 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
215 listen_addr.port(),
216 ),
217 ],
218 listen_addr,
219 bootstrap_peers: Vec::new(),
220 bootstrap_peers_str: Vec::new(),
221 enable_ipv6: config.network.ipv6_enabled,
222 enable_mcp_server: config.mcp.enabled,
223 mcp_server_config: None, connection_timeout: Duration::from_secs(config.network.connection_timeout),
225 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
226 max_connections: config.network.max_connections,
227 max_incoming_connections: config.security.connection_limit as usize,
228 dht_config: DHTConfig::default(),
229 security_config: SecurityConfig::default(),
230 production_config: None, bootstrap_cache_config: None,
232 identity_config: None, }
234 }
235}
236
237impl NodeConfig {
238 pub fn from_config(config: &Config) -> Result<Self> {
240 let listen_addr = config.listen_socket_addr()?;
241 let bootstrap_addrs = config.bootstrap_addrs()?;
242
243 let mut node_config = Self {
244 peer_id: None,
245 listen_addrs: vec![listen_addr],
246 listen_addr,
247 bootstrap_peers: bootstrap_addrs
248 .iter()
249 .map(|addr| addr.socket_addr())
250 .collect(),
251 bootstrap_peers_str: config
252 .network
253 .bootstrap_nodes
254 .iter()
255 .map(|addr| addr.to_string())
256 .collect(),
257 enable_ipv6: config.network.ipv6_enabled,
258 enable_mcp_server: config.mcp.enabled,
259 mcp_server_config: Some(MCPServerConfig {
260 server_name: "P2P-MCP-Server".to_string(),
261 server_version: "1.0.0".to_string(),
262 enable_dht_discovery: true,
263 max_concurrent_requests: 100,
264 request_timeout: Duration::from_secs(30),
265 enable_auth: false,
266 enable_rate_limiting: true,
267 rate_limit_rpm: 60,
268 enable_logging: true,
269 max_tool_execution_time: Duration::from_secs(60),
270 tool_memory_limit: 1024 * 1024 * 1024, health_monitor: HealthMonitorConfig::default(),
272 }),
273 connection_timeout: Duration::from_secs(config.network.connection_timeout),
274 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
275 max_connections: config.network.max_connections,
276 max_incoming_connections: config.security.connection_limit as usize,
277 dht_config: DHTConfig {
278 k_value: 20,
279 alpha_value: 3,
280 record_ttl: Duration::from_secs(3600),
281 refresh_interval: Duration::from_secs(900),
282 },
283 security_config: SecurityConfig {
284 enable_noise: true,
285 enable_tls: true,
286 trust_level: TrustLevel::Basic,
287 },
288 production_config: Some(ProductionConfig {
289 max_connections: config.network.max_connections,
290 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
293 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
294 health_check_interval: Duration::from_secs(30),
295 metrics_interval: Duration::from_secs(60),
296 enable_performance_tracking: true,
297 enable_auto_cleanup: true,
298 shutdown_timeout: Duration::from_secs(30),
299 rate_limits: crate::production::RateLimitConfig::default(),
300 }),
301 bootstrap_cache_config: None,
302 identity_config: Some(IdentityManagerConfig {
303 cache_ttl: Duration::from_secs(3600),
304 challenge_timeout: Duration::from_secs(30),
305 }),
306 };
307
308 if config.network.ipv6_enabled {
310 node_config.listen_addrs.push(std::net::SocketAddr::new(
311 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
312 listen_addr.port(),
313 ));
314 }
315
316 Ok(node_config)
317 }
318
319 pub fn with_listen_addr(addr: &str) -> Result<Self> {
321 let listen_addr: std::net::SocketAddr = addr
322 .parse()
323 .map_err(|e: std::net::AddrParseError| NetworkError::InvalidAddress(e.to_string().into()))
324 .map_err(P2PError::Network)?;
325 let mut cfg = NodeConfig::default();
326 cfg.listen_addr = listen_addr;
327 cfg.listen_addrs = vec![listen_addr];
328 Ok(cfg)
329 }
330}
331
332impl Default for DHTConfig {
333 fn default() -> Self {
334 Self {
335 k_value: 20,
336 alpha_value: 5,
337 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
340 }
341}
342
343impl Default for SecurityConfig {
344 fn default() -> Self {
345 Self {
346 enable_noise: true,
347 enable_tls: true,
348 trust_level: TrustLevel::Basic,
349 }
350 }
351}
352
353#[derive(Debug, Clone)]
355pub struct PeerInfo {
356 pub peer_id: PeerId,
358
359 pub addresses: Vec<String>,
361
362 pub connected_at: Instant,
364
365 pub last_seen: Instant,
367
368 pub status: ConnectionStatus,
370
371 pub protocols: Vec<String>,
373
374 pub heartbeat_count: u64,
376}
377
/// Connection state of a peer.
///
/// `Eq` is derived alongside `PartialEq`: every variant payload (`String`)
/// has total equality, so the type can be used wherever `Eq` is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The peer is connected.
    Connected,
    /// The connection is being closed.
    Disconnecting,
    /// The peer is not connected.
    Disconnected,
    /// The connection failed; the payload carries a description.
    Failed(String),
}
392
393#[derive(Debug, Clone)]
395pub enum NetworkEvent {
396 PeerConnected {
398 peer_id: PeerId,
400 addresses: Vec<String>,
402 },
403
404 PeerDisconnected {
406 peer_id: PeerId,
408 reason: String,
410 },
411
412 MessageReceived {
414 peer_id: PeerId,
416 protocol: String,
418 data: Vec<u8>,
420 },
421
422 ConnectionFailed {
424 peer_id: Option<PeerId>,
426 address: String,
428 error: String,
430 },
431
432 DHTRecordStored {
434 key: Vec<u8>,
436 value: Vec<u8>,
438 },
439
440 DHTRecordRetrieved {
442 key: Vec<u8>,
444 value: Option<Vec<u8>>,
446 },
447}
448
449#[derive(Debug, Clone)]
454pub enum P2PEvent {
455 Message {
457 topic: String,
459 source: PeerId,
461 data: Vec<u8>,
463 },
464 PeerConnected(PeerId),
466 PeerDisconnected(PeerId),
468}
469
470pub struct P2PNode {
472 config: NodeConfig,
474
475 peer_id: PeerId,
477
478 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
480
481 event_tx: broadcast::Sender<P2PEvent>,
483
484 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
486
487 start_time: Instant,
489
490 running: RwLock<bool>,
492
493 mcp_server: Option<Arc<MCPServer>>,
495
496 dht: Option<Arc<RwLock<DHT>>>,
498
499 resource_manager: Option<Arc<ResourceManager>>,
501
502 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
504
505 network_node: Arc<P2PNetworkNode>,
507
508 #[allow(dead_code)]
510 rate_limiter: Arc<RateLimiter>,
511}
512
513impl P2PNode {
514 pub fn new_for_tests() -> Self {
516 let (event_tx, _) = broadcast::channel(16);
517 Self {
518 config: NodeConfig::default(),
519 peer_id: "test_peer".to_string(),
520 peers: Arc::new(RwLock::new(HashMap::new())),
521 event_tx,
522 listen_addrs: RwLock::new(Vec::new()),
523 start_time: Instant::now(),
524 running: RwLock::new(false),
525 mcp_server: None,
526 dht: None,
527 resource_manager: None,
528 bootstrap_manager: None,
529 network_node: {
530 let bind: std::net::SocketAddr = "127.0.0.1:0"
532 .parse()
533 .unwrap_or(std::net::SocketAddr::from(([127, 0, 0, 1], 0)));
534 let cfg = ant_quic::QuicNodeConfig {
535 role: ant_quic::EndpointRole::Client,
536 bootstrap_nodes: vec![],
537 enable_coordinator: false,
538 max_connections: 1,
539 connection_timeout: std::time::Duration::from_millis(1),
540 stats_interval: std::time::Duration::from_millis(1),
541 auth_config: ant_quic::auth::AuthConfig::default(),
542 bind_addr: Some(bind),
543 };
544 let node = tokio::runtime::Handle::current()
545 .block_on(
546 crate::transport::ant_quic_adapter::P2PNetworkNode::new_with_config(
547 bind, cfg,
548 ),
549 )
550 .unwrap_or_else(|_| {
551 let fallback_bind = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
553 #[allow(clippy::expect_used)]
554 {
555 tokio::runtime::Handle::current()
556 .block_on(crate::transport::ant_quic_adapter::P2PNetworkNode::new(
557 fallback_bind,
558 ))
559 .expect("ant-quic fallback creation failed")
560 }
561 });
562 Arc::new(node)
563 },
564 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
565 max_requests: 100,
566 burst_size: 100,
567 window: std::time::Duration::from_secs(1),
568 ..Default::default()
569 })),
570 }
571 }
572 pub async fn new(config: NodeConfig) -> Result<Self> {
574 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
575 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
577 });
578
579 let (event_tx, _) = broadcast::channel(1000);
580
581 let dht = if true {
583 let dht_config = crate::dht::DHTConfig {
585 replication_factor: config.dht_config.k_value,
586 bucket_size: config.dht_config.k_value,
587 alpha: config.dht_config.alpha_value,
588 record_ttl: config.dht_config.record_ttl,
589 bucket_refresh_interval: config.dht_config.refresh_interval,
590 republish_interval: config.dht_config.refresh_interval,
591 max_distance: 160, };
593 let dht_key = crate::dht::Key::new(peer_id.as_bytes());
594 let dht_instance = DHT::new(dht_key, dht_config);
595 Some(Arc::new(RwLock::new(dht_instance)))
596 } else {
597 None
598 };
599
600 let mcp_server = if config.enable_mcp_server {
602 let mcp_config = config
603 .mcp_server_config
604 .clone()
605 .unwrap_or_else(|| MCPServerConfig {
606 server_name: format!("P2P-MCP-{peer_id}"),
607 server_version: crate::VERSION.to_string(),
608 enable_dht_discovery: dht.is_some(),
609 ..MCPServerConfig::default()
610 });
611
612 let mut server = MCPServer::new(mcp_config);
613
614 if let Some(ref dht_instance) = dht {
616 server = server.with_dht(dht_instance.clone());
617 }
618
619 Some(Arc::new(server))
620 } else {
621 None
622 };
623
624 let resource_manager = config
626 .production_config
627 .clone()
628 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
629
630 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
632 match BootstrapManager::with_config(cache_config.clone()).await {
633 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
634 Err(e) => {
635 warn!(
636 "Failed to initialize bootstrap manager: {}, continuing without cache",
637 e
638 );
639 None
640 }
641 }
642 } else {
643 match BootstrapManager::new().await {
644 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
645 Err(e) => {
646 warn!(
647 "Failed to initialize bootstrap manager: {}, continuing without cache",
648 e
649 );
650 None
651 }
652 }
653 };
654
655 let bind_addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_addr.port()));
658
659 let bootstrap_nodes: Vec<std::net::SocketAddr> = config
661 .bootstrap_peers_str
662 .iter()
663 .filter_map(|addr_str| addr_str.parse().ok())
664 .collect();
665
666 let quic_config = ant_quic::QuicNodeConfig {
667 role: if bootstrap_nodes.is_empty() {
668 ant_quic::EndpointRole::Server {
669 can_coordinate: false,
670 }
671 } else {
672 ant_quic::EndpointRole::Client
673 },
674 bootstrap_nodes,
675 enable_coordinator: false,
676 max_connections: config.max_connections.min(1000),
677 connection_timeout: config.connection_timeout,
678 stats_interval: std::time::Duration::from_secs(60),
679 auth_config: ant_quic::auth::AuthConfig::default(),
680 bind_addr: Some(bind_addr),
681 };
682
683 let network_node = Arc::new(
684 P2PNetworkNode::new_with_config(bind_addr, quic_config)
685 .await
686 .map_err(|e| {
687 P2PError::Transport(crate::error::TransportError::SetupFailed(
688 format!("Failed to create P2P network node: {}", e).into(),
689 ))
690 })?,
691 );
692
693 let rate_limiter = Arc::new(RateLimiter::new(
695 crate::validation::RateLimitConfig::default(),
696 ));
697
698 let node = Self {
699 config,
700 peer_id,
701 peers: Arc::new(RwLock::new(HashMap::new())),
702 event_tx,
703 listen_addrs: RwLock::new(Vec::new()),
704 start_time: Instant::now(),
705 running: RwLock::new(false),
706 mcp_server,
707 dht,
708 resource_manager,
709 bootstrap_manager,
710 network_node,
711 rate_limiter,
712 };
713 info!("Created P2P node with peer ID: {}", node.peer_id);
714
715 Ok(node)
720 }
721
722 pub fn builder() -> NodeBuilder {
724 NodeBuilder::new()
725 }
726
727 pub fn peer_id(&self) -> &PeerId {
729 &self.peer_id
730 }
731
732 pub async fn initialize_mcp_network(&self) -> Result<()> {
735 if let Some(ref _mcp_server) = self.mcp_server {
736 let (send_tx, mut send_rx) =
738 tokio::sync::mpsc::unbounded_channel::<(PeerId, String, Vec<u8>)>();
739
740 let network_sender = P2PNetworkSender::new(self.peer_id.clone(), send_tx);
742
743 _mcp_server
745 .set_network_sender(Arc::new(network_sender))
746 .await;
747
748 let network_node: Arc<crate::transport::ant_quic_adapter::P2PNetworkNode> =
750 Arc::clone(&self.network_node);
751 let _peer_id_for_task = self.peer_id.clone();
752 tokio::spawn(async move {
753 while let Some((peer_id, protocol, data)) = send_rx.recv().await {
754 debug!(
755 "Sending network message to {}: {} bytes on protocol {}",
756 peer_id,
757 data.len(),
758 protocol
759 );
760
761 let message_data = match handle_protocol_message_creation(&protocol, data) {
763 Some(msg) => msg,
764 None => continue,
765 };
766
767 let send_result = network_node
769 .send_to_peer_string(&peer_id, &message_data)
770 .await;
771 handle_message_send_result(
772 send_result.map_err(|e| {
773 P2PError::Transport(crate::error::TransportError::StreamError(
774 e.to_string().into(),
775 ))
776 }),
777 &peer_id,
778 )
779 .await;
780 }
781 });
782
783 info!(
784 "MCP network integration initialized for peer {}",
785 self.peer_id
786 );
787 }
788 Ok(())
789 }
790
791
792 pub fn local_addr(&self) -> Option<String> {
793 self.listen_addrs
794 .try_read()
795 .ok()
796 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
797 }
798
799 pub async fn subscribe(&self, topic: &str) -> Result<()> {
800 info!("Subscribed to topic: {}", topic);
803 Ok(())
804 }
805
806 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
807 info!(
808 "Publishing message to topic: {} ({} bytes)",
809 topic,
810 data.len()
811 );
812
813 let peer_list: Vec<PeerId> = {
815 let peers_guard = self.peers.read().await;
816 peers_guard.keys().cloned().collect()
817 };
818
819 if peer_list.is_empty() {
820 debug!("No peers connected, message will only be sent to local subscribers");
821 } else {
822 let mut send_count = 0;
824 for peer_id in &peer_list {
825 match self.send_message(peer_id, topic, data.to_vec()).await {
826 Ok(_) => {
827 send_count += 1;
828 debug!("Sent message to peer: {}", peer_id);
829 }
830 Err(e) => {
831 warn!("Failed to send message to peer {}: {}", peer_id, e);
832 }
833 }
834 }
835 info!(
836 "Published message to {}/{} connected peers",
837 send_count,
838 peer_list.len()
839 );
840 }
841
842 let event = P2PEvent::Message {
844 topic: topic.to_string(),
845 source: self.peer_id.clone(),
846 data: data.to_vec(),
847 };
848 let _ = self.event_tx.send(event);
849
850 Ok(())
851 }
852
853 pub fn config(&self) -> &NodeConfig {
855 &self.config
856 }
857
858 pub async fn start(&self) -> Result<()> {
860 info!("Starting P2P node...");
861
862 if let Some(ref resource_manager) = self.resource_manager {
864 resource_manager.start().await.map_err(|e| {
865 P2PError::Network(crate::error::NetworkError::ProtocolError(
866 format!("Failed to start resource manager: {e}").into(),
867 ))
868 })?;
869 info!("Production resource manager started");
870 }
871
872 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
874 let mut manager = bootstrap_manager.write().await;
875 manager.start_background_tasks().await.map_err(|e| {
876 P2PError::Network(crate::error::NetworkError::ProtocolError(
877 format!("Failed to start bootstrap manager: {e}").into(),
878 ))
879 })?;
880 info!("Bootstrap cache manager started");
881 }
882
883 *self.running.write().await = true;
885
886 self.start_network_listeners().await?;
888
889 let listen_addrs = self.listen_addrs.read().await;
891 info!("P2P node started on addresses: {:?}", *listen_addrs);
892
893 self.initialize_mcp_network().await?;
895
896 if let Some(ref _mcp_server) = self.mcp_server {
898 _mcp_server.start().await.map_err(|e| {
899 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
900 format!("Failed to start MCP server: {e}").into(),
901 ))
902 })?;
903 info!("MCP server started with network integration");
904 }
905
906 self.start_message_receiving_system().await?;
908
909 self.connect_bootstrap_peers().await?;
911
912 Ok(())
913 }
914
915 async fn start_network_listeners(&self) -> Result<()> {
917 info!("Starting network listeners...");
918
919 let _network_node = &self.network_node;
921
922 for &socket_addr in &self.config.listen_addrs {
924 if let Err(e) = self.start_listener_on_address(socket_addr).await {
927 warn!("Failed to start listener on {}: {}", socket_addr, e);
928 } else {
929 info!("Started listener on {}", socket_addr);
930 }
931 }
932
933 if self.config.listen_addrs.is_empty() {
935 let mut default_addrs = vec![std::net::SocketAddr::new(
937 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
938 9000,
939 )];
940
941 if self.config.enable_ipv6 {
943 default_addrs.push(std::net::SocketAddr::new(
944 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
945 9000,
946 ));
947 }
948
949 for addr in default_addrs {
950 if let Err(e) = self.start_listener_on_address(addr).await {
951 warn!("Failed to start default listener on {}: {}", addr, e);
952 } else {
953 info!("Started default listener on {}", addr);
954 }
955 }
956 }
957
958 Ok(())
959 }
960
961 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
963 warn!("QUIC transport temporarily disabled during ant-quic migration");
1002 Err(crate::P2PError::Transport(
1004 crate::error::TransportError::SetupFailed(
1005 format!(
1006 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1007 )
1008 .into(),
1009 ),
1010 ))
1011 }
1012
1013 #[allow(dead_code)] async fn start_connection_acceptor(
1016 &self,
1017 transport: Arc<dyn crate::transport::Transport>,
1018 addr: std::net::SocketAddr,
1019 transport_type: crate::transport::TransportType,
1020 ) -> Result<()> {
1021 info!(
1022 "Starting connection acceptor for {:?} on {}",
1023 transport_type, addr
1024 );
1025
1026 let event_tx = self.event_tx.clone();
1028 let _peer_id = self.peer_id.clone();
1029 let peers = Arc::clone(&self.peers);
1030 let _network_node = Arc::clone(&self.network_node);
1031 let mcp_server = self.mcp_server.clone();
1032 let rate_limiter = Arc::clone(&self.rate_limiter);
1033
1034 tokio::spawn(async move {
1036 loop {
1037 match transport.accept().await {
1038 Ok(connection) => {
1039 let remote_addr = connection.remote_addr();
1040 let connection_peer_id =
1041 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1042
1043 let socket_addr = remote_addr.socket_addr();
1045 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1046 continue;
1048 }
1049
1050 info!(
1051 "Accepted {:?} connection from {} (peer: {})",
1052 transport_type, remote_addr, connection_peer_id
1053 );
1054
1055 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1057
1058 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1060
1061 spawn_connection_handler(
1063 connection,
1064 connection_peer_id,
1065 event_tx.clone(),
1066 Arc::clone(&peers),
1067 mcp_server.clone(),
1068 );
1069 }
1070 Err(e) => {
1071 warn!(
1072 "Failed to accept {:?} connection on {}: {}",
1073 transport_type, addr, e
1074 );
1075
1076 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1078 }
1079 }
1080 }
1081 });
1082
1083 info!(
1084 "Connection acceptor background task started for {:?} on {}",
1085 transport_type, addr
1086 );
1087 Ok(())
1088 }
1089
1090 async fn start_message_receiving_system(&self) -> Result<()> {
1092 info!("Message receiving system initialized (background tasks simplified for demo)");
1093
1094 Ok(())
1099 }
1100
1101 #[allow(dead_code)]
1103 async fn handle_received_message(
1104 &self,
1105 message_data: Vec<u8>,
1106 peer_id: &PeerId,
1107 protocol: &str,
1108 event_tx: &broadcast::Sender<P2PEvent>,
1109 ) -> Result<()> {
1110 if protocol == MCP_PROTOCOL {
1112 return self.handle_mcp_message(message_data, peer_id).await;
1113 }
1114
1115 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1117 Ok(message) => {
1118 if let (Some(protocol), Some(data), Some(from)) = (
1119 message.get("protocol").and_then(|v| v.as_str()),
1120 message.get("data").and_then(|v| v.as_array()),
1121 message.get("from").and_then(|v| v.as_str()),
1122 ) {
1123 let data_bytes: Vec<u8> = data
1125 .iter()
1126 .filter_map(|v| v.as_u64().map(|n| n as u8))
1127 .collect();
1128
1129 let event = P2PEvent::Message {
1131 topic: protocol.to_string(),
1132 source: from.to_string(),
1133 data: data_bytes,
1134 };
1135
1136 let _ = event_tx.send(event);
1137 debug!("Generated message event from peer: {}", peer_id);
1138 }
1139 }
1140 Err(e) => {
1141 warn!("Failed to parse received message from {}: {}", peer_id, e);
1142 }
1143 }
1144
1145 Ok(())
1146 }
1147
1148 #[allow(dead_code)]
1150 async fn handle_mcp_message(&self, message_data: Vec<u8>, peer_id: &PeerId) -> Result<()> {
1151 if let Some(ref _mcp_server) = self.mcp_server {
1152 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
1154 Ok(p2p_mcp_message) => {
1155 debug!(
1156 "Received MCP message from peer {}: {:?}",
1157 peer_id, p2p_mcp_message.message_type
1158 );
1159
1160 match p2p_mcp_message.message_type {
1162 crate::mcp::P2PMCPMessageType::Request => {
1163 self.handle_mcp_tool_request(p2p_mcp_message, peer_id)
1165 .await?;
1166 }
1167 crate::mcp::P2PMCPMessageType::Response => {
1168 self.handle_mcp_tool_response(p2p_mcp_message).await?;
1170 }
1171 crate::mcp::P2PMCPMessageType::ServiceAdvertisement => {
1172 self.handle_mcp_service_advertisement(p2p_mcp_message, peer_id)
1174 .await?;
1175 }
1176 crate::mcp::P2PMCPMessageType::ServiceDiscovery => {
1177 self.handle_mcp_service_discovery(p2p_mcp_message, peer_id)
1179 .await?;
1180 }
1181 crate::mcp::P2PMCPMessageType::Heartbeat => {
1182 debug!("Received heartbeat from peer {}", peer_id);
1184
1185 let _ =
1187 update_peer_heartbeat(&self.peers, peer_id)
1188 .await
1189 .map_err(|e| {
1190 debug!(
1191 "Failed to update heartbeat for peer {}: {}",
1192 peer_id, e
1193 )
1194 });
1195
1196 let timestamp = std::time::SystemTime::now()
1198 .duration_since(std::time::UNIX_EPOCH)
1199 .map_err(|e| {
1200 P2PError::Network(NetworkError::ProtocolError(
1201 format!("System time error: {}", e).into(),
1202 ))
1203 })?
1204 .as_secs();
1205
1206 let ack_data = serde_json::to_vec(&serde_json::json!({
1207 "type": "heartbeat_ack",
1208 "timestamp": timestamp
1209 }))
1210 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1211
1212 let _ = self
1213 .send_message(peer_id, MCP_PROTOCOL, ack_data)
1214 .await
1215 .map_err(|e| {
1216 warn!("Failed to send heartbeat ack to {}: {}", peer_id, e)
1217 });
1218 }
1219 crate::mcp::P2PMCPMessageType::HealthCheck => {
1220 debug!("Received health check from peer {}", peer_id);
1222
1223 let peers_count = self.peers.read().await.len();
1225 let uptime = self.start_time.elapsed();
1226
1227 let (memory_usage, cpu_usage) =
1229 get_resource_metrics(&self.resource_manager).await;
1230
1231 let timestamp = std::time::SystemTime::now()
1233 .duration_since(std::time::UNIX_EPOCH)
1234 .map_err(|e| {
1235 P2PError::Network(NetworkError::ProtocolError(
1236 format!("System time error: {}", e).into(),
1237 ))
1238 })?
1239 .as_secs();
1240
1241 let health_response = serde_json::json!({
1242 "type": "health_check_response",
1243 "status": "healthy",
1244 "peer_id": self.peer_id,
1245 "peers_count": peers_count,
1246 "uptime_secs": uptime.as_secs(),
1247 "memory_usage_bytes": memory_usage,
1248 "cpu_usage_percent": cpu_usage,
1249 "timestamp": timestamp
1250 });
1251
1252 let response_data = serde_json::to_vec(&health_response)
1253 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1254
1255 if let Err(e) = self
1257 .send_message(peer_id, MCP_PROTOCOL, response_data)
1258 .await
1259 {
1260 warn!("Failed to send health check response to {}: {}", peer_id, e);
1261 }
1262 }
1263 }
1264 }
1265 Err(e) => {
1266 warn!(
1267 "Failed to deserialize MCP message from peer {}: {}",
1268 peer_id, e
1269 );
1270 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1271 format!("Invalid MCP message: {e}").into(),
1272 )));
1273 }
1274 }
1275 } else {
1276 warn!("Received MCP message but MCP server is not enabled");
1277 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1278 "MCP server not enabled".to_string().into(),
1279 )));
1280 }
1281
1282 Ok(())
1283 }
1284
1285 #[allow(dead_code)]
1287 async fn handle_mcp_tool_request(
1288 &self,
1289 message: crate::mcp::P2PMCPMessage,
1290 peer_id: &PeerId,
1291 ) -> Result<()> {
1292 if let Some(ref _mcp_server) = self.mcp_server {
1293 if let crate::mcp::MCPMessage::CallTool { name, arguments } = message.payload {
1295 debug!(
1296 "Handling MCP tool request for '{}' from peer {}",
1297 name, peer_id
1298 );
1299
1300 let context = MCPCallContext {
1302 caller_id: peer_id.clone(),
1303 timestamp: std::time::SystemTime::now(),
1304 timeout: Duration::from_secs(30),
1305 auth_info: None,
1306 metadata: std::collections::HashMap::new(),
1307 };
1308
1309 match _mcp_server.call_tool(&name, arguments, context).await {
1311 Ok(result) => {
1312 let response_message = crate::mcp::P2PMCPMessage {
1314 message_type: crate::mcp::P2PMCPMessageType::Response,
1315 message_id: message.message_id,
1316 source_peer: self.peer_id.clone(),
1317 target_peer: Some(peer_id.clone()),
1318 timestamp: std::time::SystemTime::now()
1319 .duration_since(std::time::UNIX_EPOCH)
1320 .unwrap_or_default()
1321 .as_secs(),
1322 payload: crate::mcp::MCPMessage::CallToolResult {
1323 content: vec![crate::mcp::MCPContent::Text {
1324 text: serde_json::to_string(&result).unwrap_or_default(),
1325 }],
1326 is_error: false,
1327 },
1328 ttl: 5,
1329 };
1330
1331 let response_data = serde_json::to_vec(&response_message)
1333 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1334
1335 self.send_message(peer_id, MCP_PROTOCOL, response_data)
1336 .await?;
1337 debug!("Sent MCP tool response to peer {}", peer_id);
1338 }
1339 Err(e) => {
1340 let error_message = crate::mcp::P2PMCPMessage {
1342 message_type: crate::mcp::P2PMCPMessageType::Response,
1343 message_id: message.message_id,
1344 source_peer: self.peer_id.clone(),
1345 target_peer: Some(peer_id.clone()),
1346 timestamp: std::time::SystemTime::now()
1347 .duration_since(std::time::UNIX_EPOCH)
1348 .unwrap_or_default()
1349 .as_secs(),
1350 payload: crate::mcp::MCPMessage::CallToolResult {
1351 content: vec![crate::mcp::MCPContent::Text {
1352 text: format!("Error: {e}"),
1353 }],
1354 is_error: true,
1355 },
1356 ttl: 5,
1357 };
1358
1359 let error_data = serde_json::to_vec(&error_message)
1360 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1361
1362 self.send_message(peer_id, MCP_PROTOCOL, error_data).await?;
1363 warn!("Sent MCP error response to peer {}: {}", peer_id, e);
1364 }
1365 }
1366 }
1367 }
1368
1369 Ok(())
1370 }
1371
1372 #[allow(dead_code)]
1374 async fn handle_mcp_tool_response(&self, message: crate::mcp::P2PMCPMessage) -> Result<()> {
1375 if let Some(ref _mcp_server) = self.mcp_server {
1376 debug!("Received MCP tool response: {}", message.message_id);
1378 }
1381
1382 Ok(())
1383 }
1384
1385 #[allow(dead_code)]
1387 async fn handle_mcp_service_advertisement(
1388 &self,
1389 message: crate::mcp::P2PMCPMessage,
1390 peer_id: &PeerId,
1391 ) -> Result<()> {
1392 debug!("Received MCP service advertisement from peer {}", peer_id);
1393
1394 if let Some(ref _mcp_server) = self.mcp_server {
1395 _mcp_server.handle_service_advertisement(message).await?;
1397 debug!("Processed service advertisement from peer {}", peer_id);
1398 } else {
1399 warn!("Received MCP service advertisement but MCP server is not enabled");
1400 }
1401
1402 Ok(())
1403 }
1404
1405 #[allow(dead_code)]
1407 async fn handle_mcp_service_discovery(
1408 &self,
1409 message: crate::mcp::P2PMCPMessage,
1410 peer_id: &PeerId,
1411 ) -> Result<()> {
1412 debug!("Received MCP service discovery query from peer {}", peer_id);
1413
1414 if let Some(ref _mcp_server) = self.mcp_server {
1415 if let Ok(Some(response_data)) = _mcp_server.handle_service_discovery(message).await {
1417 self.send_message(peer_id, MCP_PROTOCOL, response_data)
1419 .await?;
1420 debug!("Sent service discovery response to peer {}", peer_id);
1421 }
1422 } else {
1423 warn!("Received MCP service discovery query but MCP server is not enabled");
1424 }
1425
1426 Ok(())
1427 }
1428
1429 pub async fn run(&self) -> Result<()> {
1431 if !*self.running.read().await {
1432 self.start().await?;
1433 }
1434
1435 info!("P2P node running...");
1436
1437 loop {
1439 if !*self.running.read().await {
1440 break;
1441 }
1442
1443 self.periodic_tasks().await?;
1445
1446 tokio::time::sleep(Duration::from_millis(100)).await;
1448 }
1449
1450 info!("P2P node stopped");
1451 Ok(())
1452 }
1453
1454 pub async fn stop(&self) -> Result<()> {
1456 info!("Stopping P2P node...");
1457
1458 *self.running.write().await = false;
1460
1461 if let Some(ref _mcp_server) = self.mcp_server {
1463 _mcp_server.shutdown().await.map_err(|e| {
1464 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1465 format!("Failed to shutdown MCP server: {e}").into(),
1466 ))
1467 })?;
1468 info!("MCP server stopped");
1469 }
1470
1471 self.disconnect_all_peers().await?;
1473
1474 if let Some(ref resource_manager) = self.resource_manager {
1476 resource_manager.shutdown().await.map_err(|e| {
1477 P2PError::Network(crate::error::NetworkError::ProtocolError(
1478 format!("Failed to shutdown resource manager: {e}").into(),
1479 ))
1480 })?;
1481 info!("Production resource manager stopped");
1482 }
1483
1484 info!("P2P node stopped");
1485 Ok(())
1486 }
1487
1488 pub async fn shutdown(&self) -> Result<()> {
1490 self.stop().await
1491 }
1492
1493 pub async fn is_running(&self) -> bool {
1495 *self.running.read().await
1496 }
1497
1498 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1500 self.listen_addrs.read().await.clone()
1501 }
1502
1503 pub async fn connected_peers(&self) -> Vec<PeerId> {
1505 self.peers.read().await.keys().cloned().collect()
1506 }
1507
1508 pub async fn peer_count(&self) -> usize {
1510 self.peers.read().await.len()
1511 }
1512
1513 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1515 self.peers.read().await.get(peer_id).cloned()
1516 }
1517
1518 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1520 info!("Connecting to peer at: {}", address);
1521
1522 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1524 Some(resource_manager.acquire_connection().await?)
1525 } else {
1526 None
1527 };
1528
1529 let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1531 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1532 format!("{}: {}", address, e).into(),
1533 ))
1534 })?;
1535
1536 let peer_id = {
1538 match self.network_node.connect_to_peer_string(_socket_addr).await {
1539 Ok(connected_peer_id) => {
1540 info!("Successfully connected to peer: {}", connected_peer_id);
1541 connected_peer_id
1542 }
1543 Err(e) => {
1544 warn!("Failed to connect to peer at {}: {}", address, e);
1545
1546 let demo_peer_id =
1549 format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
1550 warn!(
1551 "Using demo peer ID: {} (transport connection failed)",
1552 demo_peer_id
1553 );
1554 demo_peer_id
1555 }
1556 }
1557 };
1558
1559 let peer_info = PeerInfo {
1561 peer_id: peer_id.clone(),
1562 addresses: vec![address.to_string()],
1563 connected_at: Instant::now(),
1564 last_seen: Instant::now(),
1565 status: ConnectionStatus::Connected,
1566 protocols: vec!["p2p-foundation/1.0".to_string()],
1567 heartbeat_count: 0,
1568 };
1569
1570 self.peers.write().await.insert(peer_id.clone(), peer_info);
1572
1573 if let Some(ref resource_manager) = self.resource_manager {
1575 resource_manager.record_bandwidth(0, 0); }
1577
1578 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1580
1581 info!("Connected to peer: {}", peer_id);
1582 Ok(peer_id)
1583 }
1584
1585 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1587 info!("Disconnecting from peer: {}", peer_id);
1588
1589 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1590 peer_info.status = ConnectionStatus::Disconnected;
1591
1592 let _ = self
1594 .event_tx
1595 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1596
1597 info!("Disconnected from peer: {}", peer_id);
1598 }
1599
1600 Ok(())
1601 }
1602
1603 pub async fn send_message(
1605 &self,
1606 peer_id: &PeerId,
1607 protocol: &str,
1608 data: Vec<u8>,
1609 ) -> Result<()> {
1610 debug!(
1611 "Sending message to peer {} on protocol {}",
1612 peer_id, protocol
1613 );
1614
1615 if let Some(ref resource_manager) = self.resource_manager
1617 && !resource_manager
1618 .check_rate_limit(peer_id, "message")
1619 .await?
1620 {
1621 return Err(P2PError::ResourceExhausted(
1622 format!("Rate limit exceeded for peer {}", peer_id).into(),
1623 ));
1624 }
1625
1626 if !self.peers.read().await.contains_key(peer_id) {
1628 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1629 peer_id.to_string().into(),
1630 )));
1631 }
1632
1633 if protocol == MCP_PROTOCOL {
1635 if data.len() < 4 {
1637 return Err(P2PError::Network(
1638 crate::error::NetworkError::ProtocolError(
1639 "Invalid MCP message: too short".to_string().into(),
1640 ),
1641 ));
1642 }
1643
1644 let message_type = data.first().unwrap_or(&0);
1646 if *message_type > 10 {
1647 return Err(P2PError::Network(
1649 crate::error::NetworkError::ProtocolError(
1650 "Invalid MCP message type".to_string().into(),
1651 ),
1652 ));
1653 }
1654
1655 debug!("Validated MCP message for network transmission");
1656 }
1657
1658 if let Some(ref resource_manager) = self.resource_manager {
1660 resource_manager.record_bandwidth(data.len() as u64, 0);
1661 }
1662
1663 let _message_data = self.create_protocol_message(protocol, data)?;
1665
1666 {
1668 match self.network_node.send_message(peer_id, _message_data).await {
1669 Ok(_) => {
1670 debug!("Message sent to peer {} via transport layer", peer_id);
1671 }
1672 Err(e) => {
1673 warn!("Failed to send message to peer {}: {}", peer_id, e);
1674 return Err(P2PError::Network(
1675 crate::error::NetworkError::ProtocolError(
1676 format!("Message send failed: {e}").into(),
1677 ),
1678 ));
1679 }
1680 }
1681 Ok(())
1682 }
1683 }
1684
1685 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1687 use serde_json::json;
1688
1689 let timestamp = std::time::SystemTime::now()
1690 .duration_since(std::time::UNIX_EPOCH)
1691 .map_err(|e| {
1692 P2PError::Network(NetworkError::ProtocolError(
1693 format!("System time error: {}", e).into(),
1694 ))
1695 })?
1696 .as_secs();
1697
1698 let message = json!({
1700 "protocol": protocol,
1701 "data": data,
1702 "from": self.peer_id,
1703 "timestamp": timestamp
1704 });
1705
1706 serde_json::to_vec(&message).map_err(|e| {
1707 P2PError::Transport(crate::error::TransportError::StreamError(
1708 format!("Failed to serialize message: {e}").into(),
1709 ))
1710 })
1711 }
1712
1713 }
1715
1716#[allow(dead_code)]
1718fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1719 use serde_json::json;
1720
1721 let timestamp = std::time::SystemTime::now()
1722 .duration_since(std::time::UNIX_EPOCH)
1723 .map_err(|e| {
1724 P2PError::Network(NetworkError::ProtocolError(
1725 format!("System time error: {}", e).into(),
1726 ))
1727 })?
1728 .as_secs();
1729
1730 let message = json!({
1732 "protocol": protocol,
1733 "data": data,
1734 "timestamp": timestamp
1735 });
1736
1737 serde_json::to_vec(&message).map_err(|e| {
1738 P2PError::Transport(crate::error::TransportError::StreamError(
1739 format!("Failed to serialize message: {e}").into(),
1740 ))
1741 })
1742}
1743
1744impl P2PNode {
1745 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1747 self.event_tx.subscribe()
1748 }
1749
1750 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1752 self.subscribe_events()
1753 }
1754
1755 pub fn uptime(&self) -> Duration {
1757 self.start_time.elapsed()
1758 }
1759
1760 pub fn mcp_server(&self) -> Option<&Arc<MCPServer>> {
1762 self.mcp_server.as_ref()
1763 }
1764
1765 pub async fn register_mcp_tool(&self, tool: Tool) -> Result<()> {
1767 if let Some(ref _mcp_server) = self.mcp_server {
1768 let tool_name = tool.definition.name.clone();
1769 _mcp_server.register_tool(tool).await.map_err(|e| {
1770 P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
1771 format!("{}: Registration failed: {e}", tool_name).into(),
1772 ))
1773 })
1774 } else {
1775 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1776 "MCP server not enabled".to_string().into(),
1777 )))
1778 }
1779 }
1780
1781 pub async fn call_mcp_tool(&self, tool_name: &str, arguments: Value) -> Result<Value> {
1783 if let Some(ref _mcp_server) = self.mcp_server {
1784 if let Some(ref resource_manager) = self.resource_manager
1786 && !resource_manager
1787 .check_rate_limit(&self.peer_id, "mcp")
1788 .await?
1789 {
1790 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1791 "MCP rate limit exceeded".to_string().into(),
1792 )));
1793 }
1794
1795 let context = MCPCallContext {
1796 caller_id: self.peer_id.clone(),
1797 timestamp: SystemTime::now(),
1798 timeout: Duration::from_secs(30),
1799 auth_info: None,
1800 metadata: HashMap::new(),
1801 };
1802
1803 _mcp_server
1804 .call_tool(tool_name, arguments, context)
1805 .await
1806 .map_err(|e| {
1807 P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
1808 format!("{}: Execution failed: {e}", tool_name).into(),
1809 ))
1810 })
1811 } else {
1812 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1813 "MCP server not enabled".to_string().into(),
1814 )))
1815 }
1816 }
1817
1818 pub async fn call_remote_mcp_tool(
1820 &self,
1821 peer_id: &PeerId,
1822 tool_name: &str,
1823 arguments: Value,
1824 ) -> Result<Value> {
1825 if let Some(ref _mcp_server) = self.mcp_server {
1826 if peer_id == &self.peer_id {
1828 let context = MCPCallContext {
1830 caller_id: self.peer_id.clone(),
1831 timestamp: SystemTime::now(),
1832 timeout: Duration::from_secs(30),
1833 auth_info: None,
1834 metadata: HashMap::new(),
1835 };
1836
1837 return _mcp_server.call_tool(tool_name, arguments, context).await;
1839 }
1840
1841 let context = MCPCallContext {
1845 caller_id: self.peer_id.clone(),
1846 timestamp: SystemTime::now(),
1847 timeout: Duration::from_secs(30),
1848 auth_info: None,
1849 metadata: HashMap::new(),
1850 };
1851
1852 match _mcp_server
1854 .call_tool(tool_name, arguments.clone(), context)
1855 .await
1856 {
1857 Ok(mut result) => {
1858 if let Value::Object(ref mut map) = result {
1860 map.insert("tool".to_string(), Value::String(tool_name.to_string()));
1861 }
1862 Ok(result)
1863 }
1864 Err(e) => Err(e),
1865 }
1866 } else {
1867 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1868 "MCP server not enabled".to_string().into(),
1869 )))
1870 }
1871 }
1872
1873 #[allow(dead_code)]
1875 async fn handle_mcp_remote_tool_call(
1876 &self,
1877 peer_id: &PeerId,
1878 tool_name: &str,
1879 arguments: Value,
1880 context: MCPCallContext,
1881 ) -> Result<Value> {
1882 let request_id = uuid::Uuid::new_v4().to_string();
1883
1884 let mcp_message = crate::mcp::MCPMessage::CallTool {
1886 name: tool_name.to_string(),
1887 arguments,
1888 };
1889
1890 let p2p_message = crate::mcp::P2PMCPMessage {
1892 message_type: crate::mcp::P2PMCPMessageType::Request,
1893 message_id: request_id.clone(),
1894 source_peer: context.caller_id.clone(),
1895 target_peer: Some(peer_id.clone()),
1896 timestamp: context
1897 .timestamp
1898 .duration_since(std::time::UNIX_EPOCH)
1899 .map_err(|e| {
1900 P2PError::Network(crate::error::NetworkError::ProtocolError(
1901 format!("Time error: {e}").into(),
1902 ))
1903 })?
1904 .as_secs(),
1905 payload: mcp_message,
1906 ttl: 5, };
1908
1909 let message_data = serde_json::to_vec(&p2p_message)
1911 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1912
1913 if message_data.len() > crate::mcp::MAX_MESSAGE_SIZE {
1914 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1915 "Message too large".to_string().into(),
1916 )));
1917 }
1918
1919 self.send_message(peer_id, MCP_PROTOCOL, message_data)
1921 .await?;
1922
1923 info!(
1925 "MCP remote tool call sent to peer {}, tool: {}",
1926 peer_id, tool_name
1927 );
1928
1929 Ok(serde_json::json!({
1932 "status": "sent",
1933 "message": "Remote tool call sent successfully",
1934 "peer_id": peer_id,
1935 "tool": tool_name, "request_id": request_id
1937 }))
1938 }
1939
1940 pub async fn list_mcp_tools(&self) -> Result<Vec<String>> {
1942 if let Some(ref _mcp_server) = self.mcp_server {
1943 let (tools, _) = _mcp_server.list_tools(None).await.map_err(|e| {
1944 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1945 format!("Failed to list tools: {e}").into(),
1946 ))
1947 })?;
1948
1949 Ok(tools.into_iter().map(|tool| tool.name).collect())
1950 } else {
1951 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1952 "MCP server not enabled".to_string().into(),
1953 )))
1954 }
1955 }
1956
1957 pub async fn discover_remote_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1959 if let Some(ref _mcp_server) = self.mcp_server {
1960 _mcp_server.discover_remote_services().await.map_err(|e| {
1961 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1962 format!("Failed to discover services: {e}").into(),
1963 ))
1964 })
1965 } else {
1966 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1967 "MCP server not enabled".to_string().into(),
1968 )))
1969 }
1970 }
1971
1972 pub async fn list_remote_mcp_tools(&self, peer_id: &PeerId) -> Result<Vec<String>> {
1974 if let Some(ref _mcp_server) = self.mcp_server {
1975 if peer_id == &self.peer_id {
1977 return self.list_mcp_tools().await;
1978 }
1979
1980 self.list_mcp_tools().await
1984 } else {
1985 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1986 "MCP server not enabled".to_string().into(),
1987 )))
1988 }
1989 }
1990
1991 pub async fn mcp_stats(&self) -> Result<crate::mcp::MCPServerStats> {
1993 if let Some(ref _mcp_server) = self.mcp_server {
1994 Ok(_mcp_server.get_stats().await)
1995 } else {
1996 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1997 "MCP server not enabled".to_string().into(),
1998 )))
1999 }
2000 }
2001
2002 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2004 if let Some(ref resource_manager) = self.resource_manager {
2005 Ok(resource_manager.get_metrics().await)
2006 } else {
2007 Err(P2PError::Network(
2008 crate::error::NetworkError::ProtocolError(
2009 "Production resource manager not enabled".to_string().into(),
2010 ),
2011 ))
2012 }
2013 }
2014
2015 pub async fn health_check(&self) -> Result<()> {
2017 if let Some(ref resource_manager) = self.resource_manager {
2018 resource_manager.health_check().await
2019 } else {
2020 let peer_count = self.peer_count().await;
2022 if peer_count > self.config.max_connections {
2023 Err(P2PError::Network(
2024 crate::error::NetworkError::ProtocolError(
2025 format!("Too many connections: {peer_count}").into(),
2026 ),
2027 ))
2028 } else {
2029 Ok(())
2030 }
2031 }
2032 }
2033
2034 pub fn production_config(&self) -> Option<&ProductionConfig> {
2036 self.config.production_config.as_ref()
2037 }
2038
2039 pub fn is_production_mode(&self) -> bool {
2041 self.resource_manager.is_some()
2042 }
2043
2044 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2046 self.dht.as_ref()
2047 }
2048
2049 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2051 if let Some(ref dht) = self.dht {
2052 let dht_instance = dht.write().await;
2053 dht_instance
2054 .put(key.clone(), value.clone())
2055 .await
2056 .map_err(|e| {
2057 P2PError::Dht(crate::error::DhtError::StoreFailed(
2058 format!("{}: {e}", key).into(),
2059 ))
2060 })?;
2061
2062 Ok(())
2063 } else {
2064 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2065 "DHT not enabled".to_string().into(),
2066 )))
2067 }
2068 }
2069
2070 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2072 if let Some(ref dht) = self.dht {
2073 let dht_instance = dht.write().await;
2074 let record_result = dht_instance.get(&key).await;
2075
2076 let value = record_result.as_ref().map(|record| record.value.clone());
2077
2078 Ok(value)
2079 } else {
2080 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2081 "DHT not enabled".to_string().into(),
2082 )))
2083 }
2084 }
2085
2086 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2088 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2089 let mut manager = bootstrap_manager.write().await;
2090 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2091 .iter()
2092 .filter_map(|addr| addr.parse().ok())
2093 .collect();
2094 let contact = ContactEntry::new(peer_id, socket_addresses);
2095 manager.add_contact(contact).await.map_err(|e| {
2096 P2PError::Network(crate::error::NetworkError::ProtocolError(
2097 format!("Failed to add peer to bootstrap cache: {e}").into(),
2098 ))
2099 })?;
2100 }
2101 Ok(())
2102 }
2103
2104 pub async fn update_peer_metrics(
2106 &self,
2107 peer_id: &PeerId,
2108 success: bool,
2109 latency_ms: Option<u64>,
2110 _error: Option<String>,
2111 ) -> Result<()> {
2112 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2113 let mut manager = bootstrap_manager.write().await;
2114
2115 let metrics = QualityMetrics {
2117 success_rate: if success { 1.0 } else { 0.0 },
2118 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2119 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2121 last_successful_connection: if success {
2122 chrono::Utc::now()
2123 } else {
2124 chrono::Utc::now() - chrono::Duration::hours(1)
2125 },
2126 uptime_score: 0.5,
2127 };
2128
2129 manager
2130 .update_contact_metrics(peer_id, metrics)
2131 .await
2132 .map_err(|e| {
2133 P2PError::Network(crate::error::NetworkError::ProtocolError(
2134 format!("Failed to update peer metrics: {e}").into(),
2135 ))
2136 })?;
2137 }
2138 Ok(())
2139 }
2140
2141 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2143 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2144 let manager = bootstrap_manager.read().await;
2145 let stats = manager.get_stats().await.map_err(|e| {
2146 P2PError::Network(crate::error::NetworkError::ProtocolError(
2147 format!("Failed to get bootstrap stats: {e}").into(),
2148 ))
2149 })?;
2150 Ok(Some(stats))
2151 } else {
2152 Ok(None)
2153 }
2154 }
2155
2156 pub async fn cached_peer_count(&self) -> usize {
2158 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2159 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2160 {
2161 return stats.total_contacts;
2162 }
2163 0
2164 }
2165
2166 async fn connect_bootstrap_peers(&self) -> Result<()> {
2168 let mut bootstrap_contacts = Vec::new();
2169 let mut used_cache = false;
2170
2171 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2173 let manager = bootstrap_manager.read().await;
2174 match manager.get_bootstrap_peers(20).await {
2175 Ok(contacts) => {
2177 if !contacts.is_empty() {
2178 info!("Using {} cached bootstrap peers", contacts.len());
2179 bootstrap_contacts = contacts;
2180 used_cache = true;
2181 }
2182 }
2183 Err(e) => {
2184 warn!("Failed to get cached bootstrap peers: {}", e);
2185 }
2186 }
2187 }
2188
2189 if bootstrap_contacts.is_empty() {
2191 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2192 &self.config.bootstrap_peers_str
2193 } else {
2194 &self
2196 .config
2197 .bootstrap_peers
2198 .iter()
2199 .map(|addr| addr.to_string())
2200 .collect::<Vec<_>>()
2201 };
2202
2203 if bootstrap_peers.is_empty() {
2204 info!("No bootstrap peers configured and no cached peers available");
2205 return Ok(());
2206 }
2207
2208 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
2209
2210 for addr in bootstrap_peers {
2211 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2212 let contact = ContactEntry::new(
2213 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
2214 vec![socket_addr],
2215 );
2216 bootstrap_contacts.push(contact);
2217 } else {
2218 warn!("Invalid bootstrap address format: {}", addr);
2219 }
2220 }
2221 }
2222
2223 let mut successful_connections = 0;
2225 for contact in bootstrap_contacts {
2226 for addr in &contact.addresses {
2227 match self.connect_peer(&addr.to_string()).await {
2228 Ok(peer_id) => {
2229 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2230 successful_connections += 1;
2231
2232 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2234 let mut manager = bootstrap_manager.write().await;
2235 let mut updated_contact = contact.clone();
2236 updated_contact.peer_id = peer_id.clone();
2237 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2240 warn!("Failed to update bootstrap cache: {}", e);
2241 }
2242 }
2243 break; }
2245 Err(e) => {
2246 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2247
2248 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2250 let mut manager = bootstrap_manager.write().await;
2251 let mut updated_contact = contact.clone();
2252 updated_contact.update_connection_result(
2253 false,
2254 None,
2255 Some(e.to_string()),
2256 );
2257
2258 if let Err(e) = manager.add_contact(updated_contact).await {
2259 warn!("Failed to update bootstrap cache: {}", e);
2260 }
2261 }
2262 }
2263 }
2264 }
2265 }
2266
2267 if successful_connections == 0 {
2268 if !used_cache {
2269 warn!("Failed to connect to any bootstrap peers");
2270 }
2271 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2272 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2274 }));
2275 } else {
2276 info!(
2277 "Successfully connected to {} bootstrap peers",
2278 successful_connections
2279 );
2280 }
2281
2282 Ok(())
2283 }
2284
2285 async fn disconnect_all_peers(&self) -> Result<()> {
2287 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2288
2289 for peer_id in peer_ids {
2290 self.disconnect_peer(&peer_id).await?;
2291 }
2292
2293 Ok(())
2294 }
2295
2296 async fn periodic_tasks(&self) -> Result<()> {
2298 Ok(())
2304 }
2305
2306 pub async fn discover_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
2308 if let Some(ref _mcp_server) = self.mcp_server {
2309 _mcp_server.discover_remote_services().await
2310 } else {
2311 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2312 "MCP server not enabled".to_string().into(),
2313 )))
2314 }
2315 }
2316
2317 pub async fn get_all_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
2319 if let Some(ref _mcp_server) = self.mcp_server {
2320 _mcp_server.get_all_services().await
2321 } else {
2322 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2323 "MCP server not enabled".to_string().into(),
2324 )))
2325 }
2326 }
2327
2328 pub async fn find_mcp_services_with_tool(
2330 &self,
2331 tool_name: &str,
2332 ) -> Result<Vec<crate::mcp::MCPService>> {
2333 if let Some(ref _mcp_server) = self.mcp_server {
2334 _mcp_server.find_services_with_tool(tool_name).await
2335 } else {
2336 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2337 "MCP server not enabled".to_string().into(),
2338 )))
2339 }
2340 }
2341
2342 pub async fn announce_mcp_services(&self) -> Result<()> {
2344 if let Some(ref _mcp_server) = self.mcp_server {
2345 _mcp_server.announce_local_services().await
2346 } else {
2347 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2348 "MCP server not enabled".to_string().into(),
2349 )))
2350 }
2351 }
2352
2353 pub async fn refresh_mcp_service_discovery(&self) -> Result<()> {
2355 if let Some(ref _mcp_server) = self.mcp_server {
2356 _mcp_server.refresh_service_discovery().await
2357 } else {
2358 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2359 "MCP server not enabled".to_string().into(),
2360 )))
2361 }
2362 }
2363
2364 pub async fn query_peer_mcp_services(&self, peer_id: &PeerId) -> Result<()> {
2366 if self.mcp_server.is_none() {
2367 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2368 "MCP server not enabled".to_string().into(),
2369 )));
2370 }
2371
2372 let discovery_query = crate::mcp::P2PMCPMessage {
2373 message_type: crate::mcp::P2PMCPMessageType::ServiceDiscovery,
2374 message_id: uuid::Uuid::new_v4().to_string(),
2375 source_peer: self.peer_id.clone(),
2376 target_peer: Some(peer_id.clone()),
2377 timestamp: std::time::SystemTime::now()
2378 .duration_since(std::time::UNIX_EPOCH)
2379 .unwrap_or_default()
2380 .as_secs(),
2381 payload: crate::mcp::MCPMessage::ListTools { cursor: None },
2382 ttl: 3,
2383 };
2384
2385 let query_data = serde_json::to_vec(&discovery_query)
2386 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2387
2388 self.send_message(peer_id, MCP_PROTOCOL, query_data).await?;
2389 debug!("Sent MCP service discovery query to peer {}", peer_id);
2390
2391 Ok(())
2392 }
2393
2394 pub async fn broadcast_mcp_service_discovery(&self) -> Result<()> {
2396 if self.mcp_server.is_none() {
2397 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2398 "MCP server not enabled".to_string().into(),
2399 )));
2400 }
2401
2402 let peer_list: Vec<PeerId> = {
2404 let peers_guard = self.peers.read().await;
2405 peers_guard.keys().cloned().collect()
2406 };
2407
2408 if peer_list.is_empty() {
2409 debug!("No peers connected for MCP service discovery broadcast");
2410 return Ok(());
2411 }
2412
2413 let mut successful_queries = 0;
2415 for peer_id in &peer_list {
2416 match self.query_peer_mcp_services(peer_id).await {
2417 Ok(_) => {
2418 successful_queries += 1;
2419 debug!("Sent MCP service discovery query to peer: {}", peer_id);
2420 }
2421 Err(e) => {
2422 warn!(
2423 "Failed to send MCP service discovery query to peer {}: {}",
2424 peer_id, e
2425 );
2426 }
2427 }
2428 }
2429
2430 info!(
2431 "Broadcast MCP service discovery to {}/{} connected peers",
2432 successful_queries,
2433 peer_list.len()
2434 );
2435
2436 Ok(())
2437 }
2438}
2439
2440#[derive(Clone)]
2442pub struct P2PNetworkSender {
2443 peer_id: PeerId,
2444 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2446}
2447
2448impl P2PNetworkSender {
2449 pub fn new(
2450 peer_id: PeerId,
2451 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2452 ) -> Self {
2453 Self { peer_id, send_tx }
2454 }
2455}
2456
2457#[async_trait::async_trait]
2459impl NetworkSender for P2PNetworkSender {
2460 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2462 self.send_tx
2463 .send((peer_id.clone(), protocol.to_string(), data))
2464 .map_err(|_| {
2465 P2PError::Network(crate::error::NetworkError::ProtocolError(
2466 "Failed to send message via channel".to_string().into(),
2467 ))
2468 })?;
2469 Ok(())
2470 }
2471
2472 fn local_peer_id(&self) -> &PeerId {
2474 &self.peer_id
2475 }
2476}
2477
2478pub struct NodeBuilder {
2480 config: NodeConfig,
2481}
2482
2483impl Default for NodeBuilder {
2484 fn default() -> Self {
2485 Self::new()
2486 }
2487}
2488
2489impl NodeBuilder {
2490 pub fn new() -> Self {
2492 Self {
2493 config: NodeConfig::default(),
2494 }
2495 }
2496
2497 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2499 self.config.peer_id = Some(peer_id);
2500 self
2501 }
2502
2503 pub fn listen_on(mut self, addr: &str) -> Self {
2505 if let Ok(multiaddr) = addr.parse() {
2506 self.config.listen_addrs.push(multiaddr);
2507 }
2508 self
2509 }
2510
2511 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2513 if let Ok(multiaddr) = addr.parse() {
2514 self.config.bootstrap_peers.push(multiaddr);
2515 }
2516 self.config.bootstrap_peers_str.push(addr.to_string());
2517 self
2518 }
2519
2520 pub fn with_ipv6(mut self, enable: bool) -> Self {
2522 self.config.enable_ipv6 = enable;
2523 self
2524 }
2525
2526 pub fn with_mcp_server(mut self) -> Self {
2528 self.config.enable_mcp_server = true;
2529 self
2530 }
2531
2532 pub fn with_mcp_config(mut self, mcp_config: MCPServerConfig) -> Self {
2534 self.config.mcp_server_config = Some(mcp_config);
2535 self.config.enable_mcp_server = true;
2536 self
2537 }
2538
2539 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2541 self.config.connection_timeout = timeout;
2542 self
2543 }
2544
2545 pub fn with_max_connections(mut self, max: usize) -> Self {
2547 self.config.max_connections = max;
2548 self
2549 }
2550
2551 pub fn with_production_mode(mut self) -> Self {
2553 self.config.production_config = Some(ProductionConfig::default());
2554 self
2555 }
2556
2557 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2559 self.config.production_config = Some(production_config);
2560 self
2561 }
2562
2563 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2565 self.config.dht_config = dht_config;
2566 self
2567 }
2568
2569 pub fn with_default_dht(mut self) -> Self {
2571 self.config.dht_config = DHTConfig::default();
2572 self
2573 }
2574
2575 pub async fn build(self) -> Result<P2PNode> {
2577 P2PNode::new(self.config).await
2578 }
2579}
2580
2581#[allow(dead_code)] async fn handle_received_message_standalone(
2584 message_data: Vec<u8>,
2585 peer_id: &PeerId,
2586 protocol: &str,
2587 event_tx: &broadcast::Sender<P2PEvent>,
2588 mcp_server: &Option<Arc<crate::mcp::MCPServer>>,
2589) -> Result<()> {
2590 if protocol == MCP_PROTOCOL {
2592 return handle_mcp_message_standalone(message_data, peer_id, mcp_server).await;
2593 }
2594
2595 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2597 Ok(message) => {
2598 if let (Some(protocol), Some(data), Some(from)) = (
2599 message.get("protocol").and_then(|v| v.as_str()),
2600 message.get("data").and_then(|v| v.as_array()),
2601 message.get("from").and_then(|v| v.as_str()),
2602 ) {
2603 let data_bytes: Vec<u8> = data
2605 .iter()
2606 .filter_map(|v| v.as_u64().map(|n| n as u8))
2607 .collect();
2608
2609 let event = P2PEvent::Message {
2611 topic: protocol.to_string(),
2612 source: from.to_string(),
2613 data: data_bytes,
2614 };
2615
2616 let _ = event_tx.send(event);
2617 debug!("Generated message event from peer: {}", peer_id);
2618 }
2619 }
2620 Err(e) => {
2621 warn!("Failed to parse received message from {}: {}", peer_id, e);
2622 }
2623 }
2624
2625 Ok(())
2626}
2627
2628#[allow(dead_code)] async fn handle_mcp_message_standalone(
2631 message_data: Vec<u8>,
2632 peer_id: &PeerId,
2633 mcp_server: &Option<Arc<crate::mcp::MCPServer>>,
2634) -> Result<()> {
2635 if let Some(_mcp_server) = mcp_server {
2636 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
2638 Ok(p2p_mcp_message) => {
2639 use crate::mcp::P2PMCPMessageType;
2641 match p2p_mcp_message.message_type {
2642 P2PMCPMessageType::Request => {
2643 debug!("Received MCP request from peer {}", peer_id);
2644 }
2647 P2PMCPMessageType::Response => {
2648 debug!("Received MCP response from peer {}", peer_id);
2649 }
2651 P2PMCPMessageType::ServiceAdvertisement => {
2652 debug!("Received service advertisement from peer {}", peer_id);
2653 }
2655 P2PMCPMessageType::ServiceDiscovery => {
2656 debug!("Received service discovery query from peer {}", peer_id);
2657 }
2659 P2PMCPMessageType::Heartbeat => {
2660 debug!("Received heartbeat from peer {}", peer_id);
2661 }
2663 P2PMCPMessageType::HealthCheck => {
2664 debug!("Received health check from peer {}", peer_id);
2665 }
2667 }
2668 }
2669 Err(e) => {
2670 warn!(
2671 "Failed to deserialize MCP message from peer {}: {}",
2672 peer_id, e
2673 );
2674 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
2675 format!("Invalid MCP message: {e}").into(),
2676 )));
2677 }
2678 }
2679 } else {
2680 warn!("Received MCP message but MCP server is not enabled");
2681 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2682 "MCP server not enabled".to_string().into(),
2683 )));
2684 }
2685
2686 Ok(())
2687}
2688
2689fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2691 match create_protocol_message_static(protocol, data) {
2692 Ok(msg) => Some(msg),
2693 Err(e) => {
2694 warn!("Failed to create protocol message: {}", e);
2695 None
2696 }
2697 }
2698}
2699
2700async fn handle_message_send_result(result: Result<()>, peer_id: &PeerId) {
2702 match result {
2703 Ok(_) => {
2704 debug!("Message sent to peer {} via transport layer", peer_id);
2705 }
2706 Err(e) => {
2707 warn!("Failed to send message to peer {}: {}", peer_id, e);
2708 }
2709 }
2710}
2711
2712#[allow(dead_code)] fn check_rate_limit(
2715 rate_limiter: &RateLimiter,
2716 socket_addr: &std::net::SocketAddr,
2717 remote_addr: &NetworkAddress,
2718) -> Result<()> {
2719 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2720 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2721 e
2722 })
2723}
2724
2725#[allow(dead_code)] async fn register_new_peer(
2728 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2729 peer_id: &PeerId,
2730 remote_addr: &NetworkAddress,
2731) {
2732 let mut peers_guard = peers.write().await;
2733 let peer_info = PeerInfo {
2734 peer_id: peer_id.clone(),
2735 addresses: vec![remote_addr.to_string()],
2736 connected_at: tokio::time::Instant::now(),
2737 last_seen: tokio::time::Instant::now(),
2738 status: ConnectionStatus::Connected,
2739 protocols: vec!["p2p-chat/1.0.0".to_string()],
2740 heartbeat_count: 0,
2741 };
2742 peers_guard.insert(peer_id.clone(), peer_info);
2743}
2744
2745#[allow(dead_code)] fn spawn_connection_handler(
2748 connection: Box<dyn crate::transport::Connection>,
2749 peer_id: PeerId,
2750 event_tx: broadcast::Sender<P2PEvent>,
2751 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2752 mcp_server: Option<Arc<MCPServer>>,
2753) {
2754 tokio::spawn(async move {
2755 handle_peer_connection(connection, peer_id, event_tx, peers, mcp_server).await;
2756 });
2757}
2758
2759#[allow(dead_code)] async fn handle_peer_connection(
2762 mut connection: Box<dyn crate::transport::Connection>,
2763 peer_id: PeerId,
2764 event_tx: broadcast::Sender<P2PEvent>,
2765 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2766 mcp_server: Option<Arc<MCPServer>>,
2767) {
2768 loop {
2769 match connection.receive().await {
2770 Ok(message_data) => {
2771 debug!(
2772 "Received {} bytes from peer: {}",
2773 message_data.len(),
2774 peer_id
2775 );
2776
2777 if let Err(e) = handle_received_message_standalone(
2779 message_data,
2780 &peer_id,
2781 "unknown", &event_tx,
2783 &mcp_server,
2784 )
2785 .await
2786 {
2787 warn!("Failed to handle message from {}: {}", peer_id, e);
2788 }
2789 }
2790 Err(e) => {
2791 warn!("Failed to receive message from {}: {}", peer_id, e);
2792
2793 if !connection.is_alive().await {
2795 info!("Connection to {} is dead, removing peer", peer_id);
2796
2797 remove_peer(&peers, &peer_id).await;
2799
2800 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2802
2803 break; }
2805
2806 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2808 }
2809 }
2810 }
2811}
2812
2813#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2816 let mut peers_guard = peers.write().await;
2817 peers_guard.remove(peer_id);
2818}
2819
2820async fn update_peer_heartbeat(
2822 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2823 peer_id: &PeerId,
2824) -> Result<()> {
2825 let mut peers_guard = peers.write().await;
2826 match peers_guard.get_mut(peer_id) {
2827 Some(peer_info) => {
2828 peer_info.last_seen = Instant::now();
2829 peer_info.heartbeat_count += 1;
2830 Ok(())
2831 }
2832 None => {
2833 warn!("Received heartbeat from unknown peer: {}", peer_id);
2834 Err(P2PError::Network(NetworkError::PeerNotFound(
2835 format!("Peer {} not found", peer_id).into(),
2836 )))
2837 }
2838 }
2839}
2840
2841async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f32) {
2843 if let Some(manager) = resource_manager {
2844 let metrics = manager.get_metrics().await;
2845 (metrics.memory_used, metrics.cpu_usage as f32)
2846 } else {
2847 (0, 0.0)
2848 }
2849}
2850
2851#[cfg(test)]
2852mod tests {
2853 use super::*;
2854 use crate::mcp::{
2855 MCPTool, Tool, ToolHandler, ToolHealthStatus, ToolMetadata, ToolRequirements,
2856 };
2857 use serde_json::json;
2858 use std::future::Future;
2859 use std::pin::Pin;
2860 use std::time::Duration;
2861 use tokio::time::timeout;
2862
/// Minimal tool handler fixture that only remembers its own name.
struct NetworkTestTool {
    /// Label given to the tool at construction time.
    name: String,
}

impl NetworkTestTool {
    /// Creates a fixture labelled with `name`.
    fn new(name: &str) -> Self {
        let name = name.to_owned();
        Self { name }
    }
}
2875
2876 impl ToolHandler for NetworkTestTool {
2877 fn execute(
2878 &self,
2879 arguments: serde_json::Value,
2880 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + '_>> {
2881 let name = self.name.clone();
2882 Box::pin(async move {
2883 Ok(json!({
2884 "tool": name,
2885 "input": arguments,
2886 "result": "network test success"
2887 }))
2888 })
2889 }
2890
2891 fn validate(&self, _arguments: &serde_json::Value) -> Result<()> {
2892 Ok(())
2893 }
2894
2895 fn get_requirements(&self) -> ToolRequirements {
2896 ToolRequirements::default()
2897 }
2898 }
2899
2900 fn create_test_node_config() -> NodeConfig {
2902 NodeConfig {
2903 peer_id: Some("test_peer_123".to_string()),
2904 listen_addrs: vec![
2905 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2906 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2907 ],
2908 listen_addr: std::net::SocketAddr::new(
2909 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2910 0,
2911 ),
2912 bootstrap_peers: vec![],
2913 bootstrap_peers_str: vec![],
2914 enable_ipv6: true,
2915 enable_mcp_server: true,
2916 mcp_server_config: Some(MCPServerConfig {
2917 enable_auth: false, enable_rate_limiting: false, ..Default::default()
2920 }),
2921 connection_timeout: Duration::from_secs(10),
2922 keep_alive_interval: Duration::from_secs(30),
2923 max_connections: 100,
2924 max_incoming_connections: 50,
2925 dht_config: DHTConfig::default(),
2926 security_config: SecurityConfig::default(),
2927 production_config: None,
2928 bootstrap_cache_config: None,
2929 identity_config: None,
2930 }
2931 }
2932
2933 fn create_test_tool(name: &str) -> Tool {
2935 Tool {
2936 definition: MCPTool {
2937 name: name.to_string(),
2938 description: format!("Test tool: {}", name).into(),
2939 input_schema: json!({
2940 "type": "object",
2941 "properties": {
2942 "input": { "type": "string" }
2943 }
2944 }),
2945 },
2946 handler: Box::new(NetworkTestTool::new(name)),
2947 metadata: ToolMetadata {
2948 created_at: SystemTime::now(),
2949 last_called: None,
2950 call_count: 0,
2951 avg_execution_time: Duration::from_millis(0),
2952 health_status: ToolHealthStatus::Healthy,
2953 tags: vec!["test".to_string()],
2954 },
2955 }
2956 }
2957
2958 #[tokio::test]
2959 async fn test_node_config_default() {
2960 let config = NodeConfig::default();
2961
2962 assert!(config.peer_id.is_none());
2963 assert_eq!(config.listen_addrs.len(), 2);
2964 assert!(config.enable_ipv6);
2965 assert!(config.enable_mcp_server);
2966 assert_eq!(config.max_connections, 1000);
2967 assert_eq!(config.max_incoming_connections, 100);
2968 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2969 }
2970
2971 #[tokio::test]
2972 async fn test_dht_config_default() {
2973 let config = DHTConfig::default();
2974
2975 assert_eq!(config.k_value, 20);
2976 assert_eq!(config.alpha_value, 5);
2977 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2978 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2979 }
2980
2981 #[tokio::test]
2982 async fn test_security_config_default() {
2983 let config = SecurityConfig::default();
2984
2985 assert!(config.enable_noise);
2986 assert!(config.enable_tls);
2987 assert_eq!(config.trust_level, TrustLevel::Basic);
2988 }
2989
/// Checks that every `TrustLevel` variant exists and compares as expected.
#[test]
fn test_trust_level_variants() {
    let (none, basic, full) = (TrustLevel::None, TrustLevel::Basic, TrustLevel::Full);

    // Each variant equals itself...
    assert_eq!(none, TrustLevel::None);
    assert_eq!(basic, TrustLevel::Basic);
    assert_eq!(full, TrustLevel::Full);
    // ...and distinct variants differ.
    assert_ne!(none, basic);
}
3003
/// Checks equality between `ConnectionStatus` variants and the payload of `Failed`.
#[test]
fn test_connection_status_variants() {
    let connecting = ConnectionStatus::Connecting;
    let connected = ConnectionStatus::Connected;
    let disconnecting = ConnectionStatus::Disconnecting;
    let disconnected = ConnectionStatus::Disconnected;
    let failed = ConnectionStatus::Failed(String::from("test error"));

    assert_eq!(connecting, ConnectionStatus::Connecting);
    assert_eq!(connected, ConnectionStatus::Connected);
    assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
    assert_eq!(disconnected, ConnectionStatus::Disconnected);
    assert_ne!(connecting, connected);

    // The failure reason must survive the round trip through the variant.
    match failed {
        ConnectionStatus::Failed(reason) => assert_eq!(reason, "test error"),
        _ => panic!("Expected Failed status"),
    }
}
3024
3025 #[tokio::test]
3026 async fn test_node_creation() -> Result<()> {
3027 let config = create_test_node_config();
3028 let node = P2PNode::new(config).await?;
3029
3030 assert_eq!(node.peer_id(), "test_peer_123");
3031 assert!(!node.is_running().await);
3032 assert_eq!(node.peer_count().await, 0);
3033 assert!(node.connected_peers().await.is_empty());
3034
3035 Ok(())
3036 }
3037
3038 #[tokio::test]
3039 async fn test_node_creation_without_peer_id() -> Result<()> {
3040 let mut config = create_test_node_config();
3041 config.peer_id = None;
3042
3043 let node = P2PNode::new(config).await?;
3044
3045 assert!(node.peer_id().starts_with("peer_"));
3047 assert!(!node.is_running().await);
3048
3049 Ok(())
3050 }
3051
3052 #[tokio::test]
3053 async fn test_node_lifecycle() -> Result<()> {
3054 let config = create_test_node_config();
3055 let node = P2PNode::new(config).await?;
3056
3057 assert!(!node.is_running().await);
3059
3060 node.start().await?;
3062 assert!(node.is_running().await);
3063
3064 let listen_addrs = node.listen_addrs().await;
3066 assert!(
3067 !listen_addrs.is_empty(),
3068 "Expected at least one listening address"
3069 );
3070
3071 node.stop().await?;
3073 assert!(!node.is_running().await);
3074
3075 Ok(())
3076 }
3077
3078 #[tokio::test]
3079 async fn test_peer_connection() -> Result<()> {
3080 let config = create_test_node_config();
3081 let node = P2PNode::new(config).await?;
3082
3083 let peer_addr = "/ip4/127.0.0.1/tcp/0".to_string();
3084
3085 let peer_id = node.connect_peer(&peer_addr).await?;
3087 assert!(peer_id.starts_with("peer_from_"));
3088
3089 assert_eq!(node.peer_count().await, 1);
3091
3092 let connected_peers = node.connected_peers().await;
3094 assert_eq!(connected_peers.len(), 1);
3095 assert_eq!(connected_peers[0], peer_id);
3096
3097 let peer_info = node.peer_info(&peer_id).await;
3099 assert!(peer_info.is_some());
3100 let info = peer_info.expect("Peer info should exist after adding peer");
3101 assert_eq!(info.peer_id, peer_id);
3102 assert_eq!(info.status, ConnectionStatus::Connected);
3103 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3104
3105 node.disconnect_peer(&peer_id).await?;
3107 assert_eq!(node.peer_count().await, 0);
3108
3109 Ok(())
3110 }
3111
3112 #[tokio::test]
3113 async fn test_event_subscription() -> Result<()> {
3114 let config = create_test_node_config();
3115 let node = P2PNode::new(config).await?;
3116
3117 let mut events = node.subscribe_events();
3118 let peer_addr = "/ip4/127.0.0.1/tcp/0".to_string();
3119
3120 let peer_id = node.connect_peer(&peer_addr).await?;
3122
3123 let event = timeout(Duration::from_millis(100), events.recv()).await;
3125 assert!(event.is_ok());
3126
3127 let event_result = event
3128 .expect("Should receive event")
3129 .expect("Event should not be error");
3130 match event_result {
3131 P2PEvent::PeerConnected(event_peer_id) => {
3132 assert_eq!(event_peer_id, peer_id);
3133 }
3134 _ => panic!("Expected PeerConnected event"),
3135 }
3136
3137 node.disconnect_peer(&peer_id).await?;
3139
3140 let event = timeout(Duration::from_millis(100), events.recv()).await;
3142 assert!(event.is_ok());
3143
3144 let event_result = event
3145 .expect("Should receive event")
3146 .expect("Event should not be error");
3147 match event_result {
3148 P2PEvent::PeerDisconnected(event_peer_id) => {
3149 assert_eq!(event_peer_id, peer_id);
3150 }
3151 _ => panic!("Expected PeerDisconnected event"),
3152 }
3153
3154 Ok(())
3155 }
3156
3157 #[tokio::test]
3158 async fn test_message_sending() -> Result<()> {
3159 let mut config1 = create_test_node_config();
3161 config1.listen_addr =
3162 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3163 let node1 = P2PNode::new(config1).await?;
3164 node1.start().await?;
3165
3166 let mut config2 = create_test_node_config();
3167 config2.listen_addr =
3168 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3169 let node2 = P2PNode::new(config2).await?;
3170 node2.start().await?;
3171
3172 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3174
3175 let node2_addr = node2.local_addr().ok_or_else(|| {
3177 P2PError::Network(crate::error::NetworkError::ProtocolError(
3178 "No listening address".to_string().into(),
3179 ))
3180 })?;
3181
3182 let peer_id = node1.connect_peer(&node2_addr).await?;
3184
3185 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
3187
3188 let message_data = b"Hello, peer!".to_vec();
3190 let result = node1
3191 .send_message(&peer_id, "test-protocol", message_data)
3192 .await;
3193 if let Err(e) = &result {
3196 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3197 }
3198
3199 let non_existent_peer = "non_existent_peer".to_string();
3201 let result = node1
3202 .send_message(&non_existent_peer, "test-protocol", vec![])
3203 .await;
3204 assert!(result.is_err());
3205 assert!(result.unwrap_err().to_string().contains("not connected"));
3206
3207 Ok(())
3208 }
3209
3210 #[tokio::test]
3211 async fn test_mcp_integration() -> Result<()> {
3212 let config = create_test_node_config();
3213 let node = P2PNode::new(config).await?;
3214
3215 node.start().await?;
3217
3218 let tool = create_test_tool("network_test_tool");
3220 node.register_mcp_tool(tool).await?;
3221
3222 let tools = node.list_mcp_tools().await?;
3224 assert!(tools.contains(&"network_test_tool".to_string()));
3225
3226 let arguments = json!({"input": "test_input"});
3228 let result = node
3229 .call_mcp_tool("network_test_tool", arguments.clone())
3230 .await?;
3231 assert_eq!(result["tool"], "network_test_tool");
3232 assert_eq!(result["input"], arguments);
3233
3234 let stats = node.mcp_stats().await?;
3236 assert_eq!(stats.total_tools, 1);
3237
3238 let result = node.call_mcp_tool("non_existent_tool", json!({})).await;
3240 assert!(result.is_err());
3241
3242 node.stop().await?;
3243 Ok(())
3244 }
3245
3246 #[tokio::test]
3247 async fn test_remote_mcp_operations() -> Result<()> {
3248 let config = create_test_node_config();
3249 let node = P2PNode::new(config).await?;
3250
3251 node.start().await?;
3252
3253 let tool = create_test_tool("remote_test_tool");
3255 node.register_mcp_tool(tool).await?;
3256
3257 let peer_addr = "/ip4/127.0.0.1/tcp/9005".to_string();
3258 let peer_id = node.connect_peer(&peer_addr).await?;
3259
3260 let remote_tools = node.list_remote_mcp_tools(&peer_id).await?;
3262 assert!(!remote_tools.is_empty());
3263
3264 let arguments = json!({"input": "remote_test"});
3266 let result = node
3267 .call_remote_mcp_tool(&peer_id, "remote_test_tool", arguments.clone())
3268 .await?;
3269 assert_eq!(result["tool"], "remote_test_tool");
3270
3271 let services = node.discover_remote_mcp_services().await?;
3273 assert!(services.is_empty());
3275
3276 node.stop().await?;
3277 Ok(())
3278 }
3279
3280 #[tokio::test]
3281 async fn test_health_check() -> Result<()> {
3282 let config = create_test_node_config();
3283 let node = P2PNode::new(config).await?;
3284
3285 let result = node.health_check().await;
3287 assert!(result.is_ok());
3288
3289 for i in 0..5 {
3291 let addr = format!("/ip4/127.0.0.1/tcp/{}", 9010 + i);
3292 node.connect_peer(&addr).await?;
3293 }
3294
3295 let result = node.health_check().await;
3297 assert!(result.is_ok());
3298
3299 Ok(())
3300 }
3301
3302 #[tokio::test]
3303 async fn test_node_uptime() -> Result<()> {
3304 let config = create_test_node_config();
3305 let node = P2PNode::new(config).await?;
3306
3307 let uptime1 = node.uptime();
3308 assert!(uptime1 >= Duration::from_secs(0));
3309
3310 tokio::time::sleep(Duration::from_millis(10)).await;
3312
3313 let uptime2 = node.uptime();
3314 assert!(uptime2 > uptime1);
3315
3316 Ok(())
3317 }
3318
3319 #[tokio::test]
3320 async fn test_node_config_access() -> Result<()> {
3321 let config = create_test_node_config();
3322 let expected_peer_id = config.peer_id.clone();
3323 let node = P2PNode::new(config).await?;
3324
3325 let node_config = node.config();
3326 assert_eq!(node_config.peer_id, expected_peer_id);
3327 assert_eq!(node_config.max_connections, 100);
3328 assert!(node_config.enable_mcp_server);
3329
3330 Ok(())
3331 }
3332
3333 #[tokio::test]
3334 async fn test_mcp_server_access() -> Result<()> {
3335 let config = create_test_node_config();
3336 let node = P2PNode::new(config).await?;
3337
3338 assert!(node.mcp_server().is_some());
3340
3341 let mut config = create_test_node_config();
3343 config.enable_mcp_server = false;
3344 let node_no_mcp = P2PNode::new(config).await?;
3345 assert!(node_no_mcp.mcp_server().is_none());
3346
3347 Ok(())
3348 }
3349
3350 #[tokio::test]
3351 async fn test_dht_access() -> Result<()> {
3352 let config = create_test_node_config();
3353 let node = P2PNode::new(config).await?;
3354
3355 assert!(node.dht().is_some());
3357
3358 Ok(())
3359 }
3360
3361 #[tokio::test]
3362 async fn test_node_builder() -> Result<()> {
3363 let node = P2PNode::builder()
3364 .with_peer_id("builder_test_peer".to_string())
3365 .listen_on("/ip4/127.0.0.1/tcp/9100")
3366 .listen_on("/ip6/::1/tcp/9100")
3367 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9101")
3368 .with_ipv6(true)
3369 .with_mcp_server()
3370 .with_connection_timeout(Duration::from_secs(15))
3371 .with_max_connections(200)
3372 .build()
3373 .await?;
3374
3375 assert_eq!(node.peer_id(), "builder_test_peer");
3376 let config = node.config();
3377 assert_eq!(config.listen_addrs.len(), 4); assert_eq!(config.bootstrap_peers.len(), 1);
3379 assert!(config.enable_ipv6);
3380 assert!(config.enable_mcp_server);
3381 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3382 assert_eq!(config.max_connections, 200);
3383
3384 Ok(())
3385 }
3386
3387 #[tokio::test]
3388 async fn test_node_builder_with_mcp_config() -> Result<()> {
3389 let mcp_config = MCPServerConfig {
3390 server_name: "test_mcp_server".to_string(),
3391 server_version: "1.0.0".to_string(),
3392 enable_dht_discovery: false,
3393 enable_auth: false,
3394 ..MCPServerConfig::default()
3395 };
3396
3397 let node = P2PNode::builder()
3398 .with_peer_id("mcp_config_test".to_string())
3399 .with_mcp_config(mcp_config.clone())
3400 .build()
3401 .await?;
3402
3403 assert_eq!(node.peer_id(), "mcp_config_test");
3404 let config = node.config();
3405 assert!(config.enable_mcp_server);
3406 assert!(config.mcp_server_config.is_some());
3407
3408 let node_mcp_config = config
3409 .mcp_server_config
3410 .as_ref()
3411 .expect("MCP server config should be present in test config");
3412 assert_eq!(node_mcp_config.server_name, "test_mcp_server");
3413 assert!(!node_mcp_config.enable_auth);
3414
3415 Ok(())
3416 }
3417
3418 #[tokio::test]
3419 async fn test_mcp_server_not_enabled_errors() -> Result<()> {
3420 let mut config = create_test_node_config();
3421 config.enable_mcp_server = false;
3422 let node = P2PNode::new(config).await?;
3423
3424 let tool = create_test_tool("test_tool");
3426 let result = node.register_mcp_tool(tool).await;
3427 assert!(result.is_err());
3428 assert!(
3429 result
3430 .unwrap_err()
3431 .to_string()
3432 .contains("MCP server not enabled")
3433 );
3434
3435 let result = node.call_mcp_tool("test_tool", json!({})).await;
3436 assert!(result.is_err());
3437 assert!(
3438 result
3439 .unwrap_err()
3440 .to_string()
3441 .contains("MCP server not enabled")
3442 );
3443
3444 let result = node.list_mcp_tools().await;
3445 assert!(result.is_err());
3446 assert!(
3447 result
3448 .unwrap_err()
3449 .to_string()
3450 .contains("MCP server not enabled")
3451 );
3452
3453 let result = node.mcp_stats().await;
3454 assert!(result.is_err());
3455 assert!(
3456 result
3457 .unwrap_err()
3458 .to_string()
3459 .contains("MCP server not enabled")
3460 );
3461
3462 Ok(())
3463 }
3464
3465 #[tokio::test]
3466 async fn test_bootstrap_peers() -> Result<()> {
3467 let mut config = create_test_node_config();
3468 config.bootstrap_peers = vec![
3469 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3470 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3471 ];
3472
3473 let node = P2PNode::new(config).await?;
3474
3475 node.start().await?;
3477
3478 let peer_count = node.peer_count().await;
3481 assert!(
3482 peer_count <= 2,
3483 "Peer count should not exceed bootstrap peer count"
3484 );
3485
3486 node.stop().await?;
3487 Ok(())
3488 }
3489
3490 #[tokio::test]
3491 async fn test_production_mode_disabled() -> Result<()> {
3492 let config = create_test_node_config();
3493 let node = P2PNode::new(config).await?;
3494
3495 assert!(!node.is_production_mode());
3496 assert!(node.production_config().is_none());
3497
3498 let result = node.resource_metrics().await;
3500 assert!(result.is_err());
3501 assert!(result.unwrap_err().to_string().contains("not enabled"));
3502
3503 Ok(())
3504 }
3505
3506 #[tokio::test]
3507 async fn test_network_event_variants() {
3508 let peer_id = "test_peer".to_string();
3510 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3511
3512 let _peer_connected = NetworkEvent::PeerConnected {
3513 peer_id: peer_id.clone(),
3514 addresses: vec![address.clone()],
3515 };
3516
3517 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3518 peer_id: peer_id.clone(),
3519 reason: "test disconnect".to_string(),
3520 };
3521
3522 let _message_received = NetworkEvent::MessageReceived {
3523 peer_id: peer_id.clone(),
3524 protocol: "test-protocol".to_string(),
3525 data: vec![1, 2, 3],
3526 };
3527
3528 let _connection_failed = NetworkEvent::ConnectionFailed {
3529 peer_id: Some(peer_id.clone()),
3530 address: address.clone(),
3531 error: "connection refused".to_string(),
3532 };
3533
3534 let _dht_stored = NetworkEvent::DHTRecordStored {
3535 key: vec![1, 2, 3],
3536 value: vec![4, 5, 6],
3537 };
3538
3539 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3540 key: vec![1, 2, 3],
3541 value: Some(vec![4, 5, 6]),
3542 };
3543 }
3544
3545 #[tokio::test]
3546 async fn test_peer_info_structure() {
3547 let peer_info = PeerInfo {
3548 peer_id: "test_peer".to_string(),
3549 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3550 connected_at: Instant::now(),
3551 last_seen: Instant::now(),
3552 status: ConnectionStatus::Connected,
3553 protocols: vec!["test-protocol".to_string()],
3554 heartbeat_count: 0,
3555 };
3556
3557 assert_eq!(peer_info.peer_id, "test_peer");
3558 assert_eq!(peer_info.addresses.len(), 1);
3559 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3560 assert_eq!(peer_info.protocols.len(), 1);
3561 }
3562
3563 #[tokio::test]
3564 async fn test_serialization() -> Result<()> {
3565 let config = create_test_node_config();
3567 let serialized = serde_json::to_string(&config)?;
3568 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3569
3570 assert_eq!(config.peer_id, deserialized.peer_id);
3571 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3572 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3573
3574 Ok(())
3575 }
3576}