1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23use crate::identity::manager::IdentityManagerConfig;
24use crate::mcp::{
25 HealthMonitorConfig, MCP_PROTOCOL, MCPCallContext, MCPServer, MCPServerConfig, NetworkSender,
26 Tool,
27};
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::P2PNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37use std::collections::HashMap;
38use std::sync::Arc;
39use std::time::{Duration, SystemTime};
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, info, warn};
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47 pub peer_id: Option<PeerId>,
49
50 pub listen_addrs: Vec<std::net::SocketAddr>,
52
53 pub listen_addr: std::net::SocketAddr,
55
56 pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59 pub bootstrap_peers_str: Vec<String>,
61
62 pub enable_ipv6: bool,
64
65 pub enable_mcp_server: bool,
67
68 pub mcp_server_config: Option<MCPServerConfig>,
70
71 pub connection_timeout: Duration,
73
74 pub keep_alive_interval: Duration,
76
77 pub max_connections: usize,
79
80 pub max_incoming_connections: usize,
82
83 pub dht_config: DHTConfig,
85
86 pub security_config: SecurityConfig,
88
89 pub production_config: Option<ProductionConfig>,
91
92 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
94
95 pub identity_config: Option<IdentityManagerConfig>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct DHTConfig {
102 pub k_value: usize,
104
105 pub alpha_value: usize,
107
108 pub record_ttl: Duration,
110
111 pub refresh_interval: Duration,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SecurityConfig {
118 pub enable_noise: bool,
120
121 pub enable_tls: bool,
123
124 pub trust_level: TrustLevel,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
130pub enum TrustLevel {
131 None,
133 Basic,
135 Full,
137}
138
139impl NodeConfig {
140 pub fn new() -> Result<Self> {
146 let config = Config::default();
148
149 let listen_addr = config.listen_socket_addr()?;
151
152 let mut listen_addrs = vec![];
154
155 if config.network.ipv6_enabled {
157 let ipv6_addr = std::net::SocketAddr::new(
158 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
159 listen_addr.port(),
160 );
161 listen_addrs.push(ipv6_addr);
162 }
163
164 let ipv4_addr = std::net::SocketAddr::new(
166 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
167 listen_addr.port(),
168 );
169 listen_addrs.push(ipv4_addr);
170
171 Ok(Self {
172 peer_id: None,
173 listen_addrs,
174 listen_addr,
175 bootstrap_peers: Vec::new(),
176 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
177 enable_ipv6: config.network.ipv6_enabled,
178 enable_mcp_server: config.mcp.enabled,
179 mcp_server_config: None, connection_timeout: Duration::from_secs(config.network.connection_timeout),
181 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
182 max_connections: config.network.max_connections,
183 max_incoming_connections: config.security.connection_limit as usize,
184 dht_config: DHTConfig::default(),
185 security_config: SecurityConfig::default(),
186 production_config: None,
187 bootstrap_cache_config: None,
188 identity_config: None,
189 })
190 }
191}
192
193impl Default for NodeConfig {
194 fn default() -> Self {
195 let config = Config::default();
197
198 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
200 std::net::SocketAddr::new(
201 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
202 9000,
203 )
204 });
205
206 Self {
207 peer_id: None,
208 listen_addrs: vec![
209 std::net::SocketAddr::new(
210 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
211 listen_addr.port(),
212 ),
213 std::net::SocketAddr::new(
214 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
215 listen_addr.port(),
216 ),
217 ],
218 listen_addr,
219 bootstrap_peers: Vec::new(),
220 bootstrap_peers_str: Vec::new(),
221 enable_ipv6: config.network.ipv6_enabled,
222 enable_mcp_server: config.mcp.enabled,
223 mcp_server_config: None, connection_timeout: Duration::from_secs(config.network.connection_timeout),
225 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
226 max_connections: config.network.max_connections,
227 max_incoming_connections: config.security.connection_limit as usize,
228 dht_config: DHTConfig::default(),
229 security_config: SecurityConfig::default(),
230 production_config: None, bootstrap_cache_config: None,
232 identity_config: None, }
234 }
235}
236
237impl NodeConfig {
238 pub fn from_config(config: &Config) -> Result<Self> {
240 let listen_addr = config.listen_socket_addr()?;
241 let bootstrap_addrs = config.bootstrap_addrs()?;
242
243 let mut node_config = Self {
244 peer_id: None,
245 listen_addrs: vec![listen_addr],
246 listen_addr,
247 bootstrap_peers: bootstrap_addrs
248 .iter()
249 .map(|addr| addr.socket_addr())
250 .collect(),
251 bootstrap_peers_str: config
252 .network
253 .bootstrap_nodes
254 .iter()
255 .map(|addr| addr.to_string())
256 .collect(),
257 enable_ipv6: config.network.ipv6_enabled,
258 enable_mcp_server: config.mcp.enabled,
259 mcp_server_config: Some(MCPServerConfig {
260 server_name: "P2P-MCP-Server".to_string(),
261 server_version: "1.0.0".to_string(),
262 enable_dht_discovery: true,
263 max_concurrent_requests: 100,
264 request_timeout: Duration::from_secs(30),
265 enable_auth: false,
266 enable_rate_limiting: true,
267 rate_limit_rpm: 60,
268 enable_logging: true,
269 max_tool_execution_time: Duration::from_secs(60),
270 tool_memory_limit: 1024 * 1024 * 1024, health_monitor: HealthMonitorConfig::default(),
272 }),
273 connection_timeout: Duration::from_secs(config.network.connection_timeout),
274 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
275 max_connections: config.network.max_connections,
276 max_incoming_connections: config.security.connection_limit as usize,
277 dht_config: DHTConfig {
278 k_value: 20,
279 alpha_value: 3,
280 record_ttl: Duration::from_secs(3600),
281 refresh_interval: Duration::from_secs(900),
282 },
283 security_config: SecurityConfig {
284 enable_noise: true,
285 enable_tls: true,
286 trust_level: TrustLevel::Basic,
287 },
288 production_config: Some(ProductionConfig {
289 max_connections: config.network.max_connections,
290 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
293 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
294 health_check_interval: Duration::from_secs(30),
295 metrics_interval: Duration::from_secs(60),
296 enable_performance_tracking: true,
297 enable_auto_cleanup: true,
298 shutdown_timeout: Duration::from_secs(30),
299 rate_limits: crate::production::RateLimitConfig::default(),
300 }),
301 bootstrap_cache_config: None,
302 identity_config: Some(IdentityManagerConfig {
303 cache_ttl: Duration::from_secs(3600),
304 challenge_timeout: Duration::from_secs(30),
305 }),
306 };
307
308 if config.network.ipv6_enabled {
310 node_config.listen_addrs.push(std::net::SocketAddr::new(
311 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
312 listen_addr.port(),
313 ));
314 }
315
316 Ok(node_config)
317 }
318
319 pub fn with_listen_addr(addr: &str) -> Result<Self> {
321 let listen_addr: std::net::SocketAddr = addr
322 .parse()
323 .map_err(|e: std::net::AddrParseError| {
324 NetworkError::InvalidAddress(e.to_string().into())
325 })
326 .map_err(P2PError::Network)?;
327 let mut cfg = NodeConfig::default();
328 cfg.listen_addr = listen_addr;
329 cfg.listen_addrs = vec![listen_addr];
330 Ok(cfg)
331 }
332}
333
334impl Default for DHTConfig {
335 fn default() -> Self {
336 Self {
337 k_value: 20,
338 alpha_value: 5,
339 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
342 }
343}
344
345impl Default for SecurityConfig {
346 fn default() -> Self {
347 Self {
348 enable_noise: true,
349 enable_tls: true,
350 trust_level: TrustLevel::Basic,
351 }
352 }
353}
354
355#[derive(Debug, Clone)]
357pub struct PeerInfo {
358 pub peer_id: PeerId,
360
361 pub addresses: Vec<String>,
363
364 pub connected_at: Instant,
366
367 pub last_seen: Instant,
369
370 pub status: ConnectionStatus,
372
373 pub protocols: Vec<String>,
375
376 pub heartbeat_count: u64,
378}
379
380#[derive(Debug, Clone, PartialEq)]
382pub enum ConnectionStatus {
383 Connecting,
385 Connected,
387 Disconnecting,
389 Disconnected,
391 Failed(String),
393}
394
395#[derive(Debug, Clone)]
397pub enum NetworkEvent {
398 PeerConnected {
400 peer_id: PeerId,
402 addresses: Vec<String>,
404 },
405
406 PeerDisconnected {
408 peer_id: PeerId,
410 reason: String,
412 },
413
414 MessageReceived {
416 peer_id: PeerId,
418 protocol: String,
420 data: Vec<u8>,
422 },
423
424 ConnectionFailed {
426 peer_id: Option<PeerId>,
428 address: String,
430 error: String,
432 },
433
434 DHTRecordStored {
436 key: Vec<u8>,
438 value: Vec<u8>,
440 },
441
442 DHTRecordRetrieved {
444 key: Vec<u8>,
446 value: Option<Vec<u8>>,
448 },
449}
450
451#[derive(Debug, Clone)]
456pub enum P2PEvent {
457 Message {
459 topic: String,
461 source: PeerId,
463 data: Vec<u8>,
465 },
466 PeerConnected(PeerId),
468 PeerDisconnected(PeerId),
470}
471
472pub struct P2PNode {
474 config: NodeConfig,
476
477 peer_id: PeerId,
479
480 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
482
483 event_tx: broadcast::Sender<P2PEvent>,
485
486 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
488
489 start_time: Instant,
491
492 running: RwLock<bool>,
494
495 mcp_server: Option<Arc<MCPServer>>,
497
498 dht: Option<Arc<RwLock<DHT>>>,
500
501 resource_manager: Option<Arc<ResourceManager>>,
503
504 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
506
507 network_node: Arc<P2PNetworkNode>,
509
510 #[allow(dead_code)]
512 rate_limiter: Arc<RateLimiter>,
513}
514
515impl P2PNode {
516 pub fn new_for_tests() -> Self {
518 let (event_tx, _) = broadcast::channel(16);
519 Self {
520 config: NodeConfig::default(),
521 peer_id: "test_peer".to_string(),
522 peers: Arc::new(RwLock::new(HashMap::new())),
523 event_tx,
524 listen_addrs: RwLock::new(Vec::new()),
525 start_time: Instant::now(),
526 running: RwLock::new(false),
527 mcp_server: None,
528 dht: None,
529 resource_manager: None,
530 bootstrap_manager: None,
531 network_node: {
532 let bind: std::net::SocketAddr = "127.0.0.1:0"
534 .parse()
535 .unwrap_or(std::net::SocketAddr::from(([127, 0, 0, 1], 0)));
536 let cfg = ant_quic::QuicNodeConfig {
537 role: ant_quic::EndpointRole::Client,
538 bootstrap_nodes: vec![],
539 enable_coordinator: false,
540 max_connections: 1,
541 connection_timeout: std::time::Duration::from_millis(1),
542 stats_interval: std::time::Duration::from_millis(1),
543 auth_config: ant_quic::auth::AuthConfig::default(),
544 bind_addr: Some(bind),
545 };
546 let node = tokio::runtime::Handle::current()
547 .block_on(
548 crate::transport::ant_quic_adapter::P2PNetworkNode::new_with_config(
549 bind, cfg,
550 ),
551 )
552 .unwrap_or_else(|_| {
553 let fallback_bind = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
555 #[allow(clippy::expect_used)]
556 {
557 tokio::runtime::Handle::current()
558 .block_on(crate::transport::ant_quic_adapter::P2PNetworkNode::new(
559 fallback_bind,
560 ))
561 .expect("ant-quic fallback creation failed")
562 }
563 });
564 Arc::new(node)
565 },
566 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
567 max_requests: 100,
568 burst_size: 100,
569 window: std::time::Duration::from_secs(1),
570 ..Default::default()
571 })),
572 }
573 }
574 pub async fn new(config: NodeConfig) -> Result<Self> {
576 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
577 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
579 });
580
581 let (event_tx, _) = broadcast::channel(1000);
582
583 let dht = if true {
585 let dht_config = crate::dht::DHTConfig {
587 replication_factor: config.dht_config.k_value,
588 bucket_size: config.dht_config.k_value,
589 alpha: config.dht_config.alpha_value,
590 record_ttl: config.dht_config.record_ttl,
591 bucket_refresh_interval: config.dht_config.refresh_interval,
592 republish_interval: config.dht_config.refresh_interval,
593 max_distance: 160, };
595 let dht_key = crate::dht::Key::new(peer_id.as_bytes());
596 let dht_instance = DHT::new(dht_key, dht_config);
597 Some(Arc::new(RwLock::new(dht_instance)))
598 } else {
599 None
600 };
601
602 let mcp_server = if config.enable_mcp_server {
604 let mcp_config = config
605 .mcp_server_config
606 .clone()
607 .unwrap_or_else(|| MCPServerConfig {
608 server_name: format!("P2P-MCP-{peer_id}"),
609 server_version: crate::VERSION.to_string(),
610 enable_dht_discovery: dht.is_some(),
611 ..MCPServerConfig::default()
612 });
613
614 let mut server = MCPServer::new(mcp_config);
615
616 if let Some(ref dht_instance) = dht {
618 server = server.with_dht(dht_instance.clone());
619 }
620
621 Some(Arc::new(server))
622 } else {
623 None
624 };
625
626 let resource_manager = config
628 .production_config
629 .clone()
630 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
631
632 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
634 match BootstrapManager::with_config(cache_config.clone()).await {
635 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
636 Err(e) => {
637 warn!(
638 "Failed to initialize bootstrap manager: {}, continuing without cache",
639 e
640 );
641 None
642 }
643 }
644 } else {
645 match BootstrapManager::new().await {
646 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
647 Err(e) => {
648 warn!(
649 "Failed to initialize bootstrap manager: {}, continuing without cache",
650 e
651 );
652 None
653 }
654 }
655 };
656
657 let bind_addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.listen_addr.port()));
660
661 let bootstrap_nodes: Vec<std::net::SocketAddr> = config
663 .bootstrap_peers_str
664 .iter()
665 .filter_map(|addr_str| addr_str.parse().ok())
666 .collect();
667
668 let quic_config = ant_quic::QuicNodeConfig {
669 role: if bootstrap_nodes.is_empty() {
670 ant_quic::EndpointRole::Server {
671 can_coordinate: false,
672 }
673 } else {
674 ant_quic::EndpointRole::Client
675 },
676 bootstrap_nodes,
677 enable_coordinator: false,
678 max_connections: config.max_connections.min(1000),
679 connection_timeout: config.connection_timeout,
680 stats_interval: std::time::Duration::from_secs(60),
681 auth_config: ant_quic::auth::AuthConfig::default(),
682 bind_addr: Some(bind_addr),
683 };
684
685 let network_node = Arc::new(
686 P2PNetworkNode::new_with_config(bind_addr, quic_config)
687 .await
688 .map_err(|e| {
689 P2PError::Transport(crate::error::TransportError::SetupFailed(
690 format!("Failed to create P2P network node: {}", e).into(),
691 ))
692 })?,
693 );
694
695 let rate_limiter = Arc::new(RateLimiter::new(
697 crate::validation::RateLimitConfig::default(),
698 ));
699
700 let node = Self {
701 config,
702 peer_id,
703 peers: Arc::new(RwLock::new(HashMap::new())),
704 event_tx,
705 listen_addrs: RwLock::new(Vec::new()),
706 start_time: Instant::now(),
707 running: RwLock::new(false),
708 mcp_server,
709 dht,
710 resource_manager,
711 bootstrap_manager,
712 network_node,
713 rate_limiter,
714 };
715 info!("Created P2P node with peer ID: {}", node.peer_id);
716
717 Ok(node)
722 }
723
724 pub fn builder() -> NodeBuilder {
726 NodeBuilder::new()
727 }
728
729 pub fn peer_id(&self) -> &PeerId {
731 &self.peer_id
732 }
733
734 pub async fn initialize_mcp_network(&self) -> Result<()> {
737 if let Some(ref _mcp_server) = self.mcp_server {
738 let (send_tx, mut send_rx) =
740 tokio::sync::mpsc::unbounded_channel::<(PeerId, String, Vec<u8>)>();
741
742 let network_sender = P2PNetworkSender::new(self.peer_id.clone(), send_tx);
744
745 _mcp_server
747 .set_network_sender(Arc::new(network_sender))
748 .await;
749
750 let network_node: Arc<crate::transport::ant_quic_adapter::P2PNetworkNode> =
752 Arc::clone(&self.network_node);
753 let _peer_id_for_task = self.peer_id.clone();
754 tokio::spawn(async move {
755 while let Some((peer_id, protocol, data)) = send_rx.recv().await {
756 debug!(
757 "Sending network message to {}: {} bytes on protocol {}",
758 peer_id,
759 data.len(),
760 protocol
761 );
762
763 let message_data = match handle_protocol_message_creation(&protocol, data) {
765 Some(msg) => msg,
766 None => continue,
767 };
768
769 let send_result = network_node
771 .send_to_peer_string(&peer_id, &message_data)
772 .await;
773 handle_message_send_result(
774 send_result.map_err(|e| {
775 P2PError::Transport(crate::error::TransportError::StreamError(
776 e.to_string().into(),
777 ))
778 }),
779 &peer_id,
780 )
781 .await;
782 }
783 });
784
785 info!(
786 "MCP network integration initialized for peer {}",
787 self.peer_id
788 );
789 }
790 Ok(())
791 }
792
793 pub fn local_addr(&self) -> Option<String> {
794 self.listen_addrs
795 .try_read()
796 .ok()
797 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
798 }
799
800 pub async fn subscribe(&self, topic: &str) -> Result<()> {
801 info!("Subscribed to topic: {}", topic);
804 Ok(())
805 }
806
807 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
808 info!(
809 "Publishing message to topic: {} ({} bytes)",
810 topic,
811 data.len()
812 );
813
814 let peer_list: Vec<PeerId> = {
816 let peers_guard = self.peers.read().await;
817 peers_guard.keys().cloned().collect()
818 };
819
820 if peer_list.is_empty() {
821 debug!("No peers connected, message will only be sent to local subscribers");
822 } else {
823 let mut send_count = 0;
825 for peer_id in &peer_list {
826 match self.send_message(peer_id, topic, data.to_vec()).await {
827 Ok(_) => {
828 send_count += 1;
829 debug!("Sent message to peer: {}", peer_id);
830 }
831 Err(e) => {
832 warn!("Failed to send message to peer {}: {}", peer_id, e);
833 }
834 }
835 }
836 info!(
837 "Published message to {}/{} connected peers",
838 send_count,
839 peer_list.len()
840 );
841 }
842
843 let event = P2PEvent::Message {
845 topic: topic.to_string(),
846 source: self.peer_id.clone(),
847 data: data.to_vec(),
848 };
849 let _ = self.event_tx.send(event);
850
851 Ok(())
852 }
853
854 pub fn config(&self) -> &NodeConfig {
856 &self.config
857 }
858
859 pub async fn start(&self) -> Result<()> {
861 info!("Starting P2P node...");
862
863 if let Some(ref resource_manager) = self.resource_manager {
865 resource_manager.start().await.map_err(|e| {
866 P2PError::Network(crate::error::NetworkError::ProtocolError(
867 format!("Failed to start resource manager: {e}").into(),
868 ))
869 })?;
870 info!("Production resource manager started");
871 }
872
873 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
875 let mut manager = bootstrap_manager.write().await;
876 manager.start_background_tasks().await.map_err(|e| {
877 P2PError::Network(crate::error::NetworkError::ProtocolError(
878 format!("Failed to start bootstrap manager: {e}").into(),
879 ))
880 })?;
881 info!("Bootstrap cache manager started");
882 }
883
884 *self.running.write().await = true;
886
887 self.start_network_listeners().await?;
889
890 let listen_addrs = self.listen_addrs.read().await;
892 info!("P2P node started on addresses: {:?}", *listen_addrs);
893
894 self.initialize_mcp_network().await?;
896
897 if let Some(ref _mcp_server) = self.mcp_server {
899 _mcp_server.start().await.map_err(|e| {
900 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
901 format!("Failed to start MCP server: {e}").into(),
902 ))
903 })?;
904 info!("MCP server started with network integration");
905 }
906
907 self.start_message_receiving_system().await?;
909
910 self.connect_bootstrap_peers().await?;
912
913 Ok(())
914 }
915
916 async fn start_network_listeners(&self) -> Result<()> {
918 info!("Starting network listeners...");
919
920 let _network_node = &self.network_node;
922
923 for &socket_addr in &self.config.listen_addrs {
925 if let Err(e) = self.start_listener_on_address(socket_addr).await {
928 warn!("Failed to start listener on {}: {}", socket_addr, e);
929 } else {
930 info!("Started listener on {}", socket_addr);
931 }
932 }
933
934 if self.config.listen_addrs.is_empty() {
936 let mut default_addrs = vec![std::net::SocketAddr::new(
938 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
939 9000,
940 )];
941
942 if self.config.enable_ipv6 {
944 default_addrs.push(std::net::SocketAddr::new(
945 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
946 9000,
947 ));
948 }
949
950 for addr in default_addrs {
951 if let Err(e) = self.start_listener_on_address(addr).await {
952 warn!("Failed to start default listener on {}: {}", addr, e);
953 } else {
954 info!("Started default listener on {}", addr);
955 }
956 }
957 }
958
959 Ok(())
960 }
961
962 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
964 warn!("QUIC transport temporarily disabled during ant-quic migration");
1003 Err(crate::P2PError::Transport(
1005 crate::error::TransportError::SetupFailed(
1006 format!(
1007 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1008 )
1009 .into(),
1010 ),
1011 ))
1012 }
1013
1014 #[allow(dead_code)] async fn start_connection_acceptor(
1017 &self,
1018 transport: Arc<dyn crate::transport::Transport>,
1019 addr: std::net::SocketAddr,
1020 transport_type: crate::transport::TransportType,
1021 ) -> Result<()> {
1022 info!(
1023 "Starting connection acceptor for {:?} on {}",
1024 transport_type, addr
1025 );
1026
1027 let event_tx = self.event_tx.clone();
1029 let _peer_id = self.peer_id.clone();
1030 let peers = Arc::clone(&self.peers);
1031 let _network_node = Arc::clone(&self.network_node);
1032 let mcp_server = self.mcp_server.clone();
1033 let rate_limiter = Arc::clone(&self.rate_limiter);
1034
1035 tokio::spawn(async move {
1037 loop {
1038 match transport.accept().await {
1039 Ok(connection) => {
1040 let remote_addr = connection.remote_addr();
1041 let connection_peer_id =
1042 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1043
1044 let socket_addr = remote_addr.socket_addr();
1046 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1047 continue;
1049 }
1050
1051 info!(
1052 "Accepted {:?} connection from {} (peer: {})",
1053 transport_type, remote_addr, connection_peer_id
1054 );
1055
1056 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1058
1059 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1061
1062 spawn_connection_handler(
1064 connection,
1065 connection_peer_id,
1066 event_tx.clone(),
1067 Arc::clone(&peers),
1068 mcp_server.clone(),
1069 );
1070 }
1071 Err(e) => {
1072 warn!(
1073 "Failed to accept {:?} connection on {}: {}",
1074 transport_type, addr, e
1075 );
1076
1077 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1079 }
1080 }
1081 }
1082 });
1083
1084 info!(
1085 "Connection acceptor background task started for {:?} on {}",
1086 transport_type, addr
1087 );
1088 Ok(())
1089 }
1090
1091 async fn start_message_receiving_system(&self) -> Result<()> {
1093 info!("Message receiving system initialized (background tasks simplified for demo)");
1094
1095 Ok(())
1100 }
1101
1102 #[allow(dead_code)]
1104 async fn handle_received_message(
1105 &self,
1106 message_data: Vec<u8>,
1107 peer_id: &PeerId,
1108 protocol: &str,
1109 event_tx: &broadcast::Sender<P2PEvent>,
1110 ) -> Result<()> {
1111 if protocol == MCP_PROTOCOL {
1113 return self.handle_mcp_message(message_data, peer_id).await;
1114 }
1115
1116 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1118 Ok(message) => {
1119 if let (Some(protocol), Some(data), Some(from)) = (
1120 message.get("protocol").and_then(|v| v.as_str()),
1121 message.get("data").and_then(|v| v.as_array()),
1122 message.get("from").and_then(|v| v.as_str()),
1123 ) {
1124 let data_bytes: Vec<u8> = data
1126 .iter()
1127 .filter_map(|v| v.as_u64().map(|n| n as u8))
1128 .collect();
1129
1130 let event = P2PEvent::Message {
1132 topic: protocol.to_string(),
1133 source: from.to_string(),
1134 data: data_bytes,
1135 };
1136
1137 let _ = event_tx.send(event);
1138 debug!("Generated message event from peer: {}", peer_id);
1139 }
1140 }
1141 Err(e) => {
1142 warn!("Failed to parse received message from {}: {}", peer_id, e);
1143 }
1144 }
1145
1146 Ok(())
1147 }
1148
1149 #[allow(dead_code)]
1151 async fn handle_mcp_message(&self, message_data: Vec<u8>, peer_id: &PeerId) -> Result<()> {
1152 if let Some(ref _mcp_server) = self.mcp_server {
1153 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
1155 Ok(p2p_mcp_message) => {
1156 debug!(
1157 "Received MCP message from peer {}: {:?}",
1158 peer_id, p2p_mcp_message.message_type
1159 );
1160
1161 match p2p_mcp_message.message_type {
1163 crate::mcp::P2PMCPMessageType::Request => {
1164 self.handle_mcp_tool_request(p2p_mcp_message, peer_id)
1166 .await?;
1167 }
1168 crate::mcp::P2PMCPMessageType::Response => {
1169 self.handle_mcp_tool_response(p2p_mcp_message).await?;
1171 }
1172 crate::mcp::P2PMCPMessageType::ServiceAdvertisement => {
1173 self.handle_mcp_service_advertisement(p2p_mcp_message, peer_id)
1175 .await?;
1176 }
1177 crate::mcp::P2PMCPMessageType::ServiceDiscovery => {
1178 self.handle_mcp_service_discovery(p2p_mcp_message, peer_id)
1180 .await?;
1181 }
1182 crate::mcp::P2PMCPMessageType::Heartbeat => {
1183 debug!("Received heartbeat from peer {}", peer_id);
1185
1186 let _ =
1188 update_peer_heartbeat(&self.peers, peer_id)
1189 .await
1190 .map_err(|e| {
1191 debug!(
1192 "Failed to update heartbeat for peer {}: {}",
1193 peer_id, e
1194 )
1195 });
1196
1197 let timestamp = std::time::SystemTime::now()
1199 .duration_since(std::time::UNIX_EPOCH)
1200 .map_err(|e| {
1201 P2PError::Network(NetworkError::ProtocolError(
1202 format!("System time error: {}", e).into(),
1203 ))
1204 })?
1205 .as_secs();
1206
1207 let ack_data = serde_json::to_vec(&serde_json::json!({
1208 "type": "heartbeat_ack",
1209 "timestamp": timestamp
1210 }))
1211 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1212
1213 let _ = self
1214 .send_message(peer_id, MCP_PROTOCOL, ack_data)
1215 .await
1216 .map_err(|e| {
1217 warn!("Failed to send heartbeat ack to {}: {}", peer_id, e)
1218 });
1219 }
1220 crate::mcp::P2PMCPMessageType::HealthCheck => {
1221 debug!("Received health check from peer {}", peer_id);
1223
1224 let peers_count = self.peers.read().await.len();
1226 let uptime = self.start_time.elapsed();
1227
1228 let (memory_usage, cpu_usage) =
1230 get_resource_metrics(&self.resource_manager).await;
1231
1232 let timestamp = std::time::SystemTime::now()
1234 .duration_since(std::time::UNIX_EPOCH)
1235 .map_err(|e| {
1236 P2PError::Network(NetworkError::ProtocolError(
1237 format!("System time error: {}", e).into(),
1238 ))
1239 })?
1240 .as_secs();
1241
1242 let health_response = serde_json::json!({
1243 "type": "health_check_response",
1244 "status": "healthy",
1245 "peer_id": self.peer_id,
1246 "peers_count": peers_count,
1247 "uptime_secs": uptime.as_secs(),
1248 "memory_usage_bytes": memory_usage,
1249 "cpu_usage_percent": cpu_usage,
1250 "timestamp": timestamp
1251 });
1252
1253 let response_data = serde_json::to_vec(&health_response)
1254 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1255
1256 if let Err(e) = self
1258 .send_message(peer_id, MCP_PROTOCOL, response_data)
1259 .await
1260 {
1261 warn!("Failed to send health check response to {}: {}", peer_id, e);
1262 }
1263 }
1264 }
1265 }
1266 Err(e) => {
1267 warn!(
1268 "Failed to deserialize MCP message from peer {}: {}",
1269 peer_id, e
1270 );
1271 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1272 format!("Invalid MCP message: {e}").into(),
1273 )));
1274 }
1275 }
1276 } else {
1277 warn!("Received MCP message but MCP server is not enabled");
1278 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1279 "MCP server not enabled".to_string().into(),
1280 )));
1281 }
1282
1283 Ok(())
1284 }
1285
1286 #[allow(dead_code)]
1288 async fn handle_mcp_tool_request(
1289 &self,
1290 message: crate::mcp::P2PMCPMessage,
1291 peer_id: &PeerId,
1292 ) -> Result<()> {
1293 if let Some(ref _mcp_server) = self.mcp_server {
1294 if let crate::mcp::MCPMessage::CallTool { name, arguments } = message.payload {
1296 debug!(
1297 "Handling MCP tool request for '{}' from peer {}",
1298 name, peer_id
1299 );
1300
1301 let context = MCPCallContext {
1303 caller_id: peer_id.clone(),
1304 timestamp: std::time::SystemTime::now(),
1305 timeout: Duration::from_secs(30),
1306 auth_info: None,
1307 metadata: std::collections::HashMap::new(),
1308 };
1309
1310 match _mcp_server.call_tool(&name, arguments, context).await {
1312 Ok(result) => {
1313 let response_message = crate::mcp::P2PMCPMessage {
1315 message_type: crate::mcp::P2PMCPMessageType::Response,
1316 message_id: message.message_id,
1317 source_peer: self.peer_id.clone(),
1318 target_peer: Some(peer_id.clone()),
1319 timestamp: std::time::SystemTime::now()
1320 .duration_since(std::time::UNIX_EPOCH)
1321 .unwrap_or_default()
1322 .as_secs(),
1323 payload: crate::mcp::MCPMessage::CallToolResult {
1324 content: vec![crate::mcp::MCPContent::Text {
1325 text: serde_json::to_string(&result).unwrap_or_default(),
1326 }],
1327 is_error: false,
1328 },
1329 ttl: 5,
1330 };
1331
1332 let response_data = serde_json::to_vec(&response_message)
1334 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1335
1336 self.send_message(peer_id, MCP_PROTOCOL, response_data)
1337 .await?;
1338 debug!("Sent MCP tool response to peer {}", peer_id);
1339 }
1340 Err(e) => {
1341 let error_message = crate::mcp::P2PMCPMessage {
1343 message_type: crate::mcp::P2PMCPMessageType::Response,
1344 message_id: message.message_id,
1345 source_peer: self.peer_id.clone(),
1346 target_peer: Some(peer_id.clone()),
1347 timestamp: std::time::SystemTime::now()
1348 .duration_since(std::time::UNIX_EPOCH)
1349 .unwrap_or_default()
1350 .as_secs(),
1351 payload: crate::mcp::MCPMessage::CallToolResult {
1352 content: vec![crate::mcp::MCPContent::Text {
1353 text: format!("Error: {e}"),
1354 }],
1355 is_error: true,
1356 },
1357 ttl: 5,
1358 };
1359
1360 let error_data = serde_json::to_vec(&error_message)
1361 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1362
1363 self.send_message(peer_id, MCP_PROTOCOL, error_data).await?;
1364 warn!("Sent MCP error response to peer {}: {}", peer_id, e);
1365 }
1366 }
1367 }
1368 }
1369
1370 Ok(())
1371 }
1372
1373 #[allow(dead_code)]
1375 async fn handle_mcp_tool_response(&self, message: crate::mcp::P2PMCPMessage) -> Result<()> {
1376 if let Some(ref _mcp_server) = self.mcp_server {
1377 debug!("Received MCP tool response: {}", message.message_id);
1379 }
1382
1383 Ok(())
1384 }
1385
1386 #[allow(dead_code)]
1388 async fn handle_mcp_service_advertisement(
1389 &self,
1390 message: crate::mcp::P2PMCPMessage,
1391 peer_id: &PeerId,
1392 ) -> Result<()> {
1393 debug!("Received MCP service advertisement from peer {}", peer_id);
1394
1395 if let Some(ref _mcp_server) = self.mcp_server {
1396 _mcp_server.handle_service_advertisement(message).await?;
1398 debug!("Processed service advertisement from peer {}", peer_id);
1399 } else {
1400 warn!("Received MCP service advertisement but MCP server is not enabled");
1401 }
1402
1403 Ok(())
1404 }
1405
1406 #[allow(dead_code)]
1408 async fn handle_mcp_service_discovery(
1409 &self,
1410 message: crate::mcp::P2PMCPMessage,
1411 peer_id: &PeerId,
1412 ) -> Result<()> {
1413 debug!("Received MCP service discovery query from peer {}", peer_id);
1414
1415 if let Some(ref _mcp_server) = self.mcp_server {
1416 if let Ok(Some(response_data)) = _mcp_server.handle_service_discovery(message).await {
1418 self.send_message(peer_id, MCP_PROTOCOL, response_data)
1420 .await?;
1421 debug!("Sent service discovery response to peer {}", peer_id);
1422 }
1423 } else {
1424 warn!("Received MCP service discovery query but MCP server is not enabled");
1425 }
1426
1427 Ok(())
1428 }
1429
1430 pub async fn run(&self) -> Result<()> {
1432 if !*self.running.read().await {
1433 self.start().await?;
1434 }
1435
1436 info!("P2P node running...");
1437
1438 loop {
1440 if !*self.running.read().await {
1441 break;
1442 }
1443
1444 self.periodic_tasks().await?;
1446
1447 tokio::time::sleep(Duration::from_millis(100)).await;
1449 }
1450
1451 info!("P2P node stopped");
1452 Ok(())
1453 }
1454
1455 pub async fn stop(&self) -> Result<()> {
1457 info!("Stopping P2P node...");
1458
1459 *self.running.write().await = false;
1461
1462 if let Some(ref _mcp_server) = self.mcp_server {
1464 _mcp_server.shutdown().await.map_err(|e| {
1465 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1466 format!("Failed to shutdown MCP server: {e}").into(),
1467 ))
1468 })?;
1469 info!("MCP server stopped");
1470 }
1471
1472 self.disconnect_all_peers().await?;
1474
1475 if let Some(ref resource_manager) = self.resource_manager {
1477 resource_manager.shutdown().await.map_err(|e| {
1478 P2PError::Network(crate::error::NetworkError::ProtocolError(
1479 format!("Failed to shutdown resource manager: {e}").into(),
1480 ))
1481 })?;
1482 info!("Production resource manager stopped");
1483 }
1484
1485 info!("P2P node stopped");
1486 Ok(())
1487 }
1488
1489 pub async fn shutdown(&self) -> Result<()> {
1491 self.stop().await
1492 }
1493
1494 pub async fn is_running(&self) -> bool {
1496 *self.running.read().await
1497 }
1498
1499 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1501 self.listen_addrs.read().await.clone()
1502 }
1503
1504 pub async fn connected_peers(&self) -> Vec<PeerId> {
1506 self.peers.read().await.keys().cloned().collect()
1507 }
1508
1509 pub async fn peer_count(&self) -> usize {
1511 self.peers.read().await.len()
1512 }
1513
1514 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1516 self.peers.read().await.get(peer_id).cloned()
1517 }
1518
1519 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1521 info!("Connecting to peer at: {}", address);
1522
1523 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1525 Some(resource_manager.acquire_connection().await?)
1526 } else {
1527 None
1528 };
1529
1530 let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1532 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1533 format!("{}: {}", address, e).into(),
1534 ))
1535 })?;
1536
1537 let peer_id = {
1539 match self.network_node.connect_to_peer_string(_socket_addr).await {
1540 Ok(connected_peer_id) => {
1541 info!("Successfully connected to peer: {}", connected_peer_id);
1542 connected_peer_id
1543 }
1544 Err(e) => {
1545 warn!("Failed to connect to peer at {}: {}", address, e);
1546
1547 let demo_peer_id =
1550 format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
1551 warn!(
1552 "Using demo peer ID: {} (transport connection failed)",
1553 demo_peer_id
1554 );
1555 demo_peer_id
1556 }
1557 }
1558 };
1559
1560 let peer_info = PeerInfo {
1562 peer_id: peer_id.clone(),
1563 addresses: vec![address.to_string()],
1564 connected_at: Instant::now(),
1565 last_seen: Instant::now(),
1566 status: ConnectionStatus::Connected,
1567 protocols: vec!["p2p-foundation/1.0".to_string()],
1568 heartbeat_count: 0,
1569 };
1570
1571 self.peers.write().await.insert(peer_id.clone(), peer_info);
1573
1574 if let Some(ref resource_manager) = self.resource_manager {
1576 resource_manager.record_bandwidth(0, 0); }
1578
1579 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1581
1582 info!("Connected to peer: {}", peer_id);
1583 Ok(peer_id)
1584 }
1585
1586 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1588 info!("Disconnecting from peer: {}", peer_id);
1589
1590 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1591 peer_info.status = ConnectionStatus::Disconnected;
1592
1593 let _ = self
1595 .event_tx
1596 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1597
1598 info!("Disconnected from peer: {}", peer_id);
1599 }
1600
1601 Ok(())
1602 }
1603
1604 pub async fn send_message(
1606 &self,
1607 peer_id: &PeerId,
1608 protocol: &str,
1609 data: Vec<u8>,
1610 ) -> Result<()> {
1611 debug!(
1612 "Sending message to peer {} on protocol {}",
1613 peer_id, protocol
1614 );
1615
1616 if let Some(ref resource_manager) = self.resource_manager
1618 && !resource_manager
1619 .check_rate_limit(peer_id, "message")
1620 .await?
1621 {
1622 return Err(P2PError::ResourceExhausted(
1623 format!("Rate limit exceeded for peer {}", peer_id).into(),
1624 ));
1625 }
1626
1627 if !self.peers.read().await.contains_key(peer_id) {
1629 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1630 peer_id.to_string().into(),
1631 )));
1632 }
1633
1634 if protocol == MCP_PROTOCOL {
1636 if data.len() < 4 {
1638 return Err(P2PError::Network(
1639 crate::error::NetworkError::ProtocolError(
1640 "Invalid MCP message: too short".to_string().into(),
1641 ),
1642 ));
1643 }
1644
1645 let message_type = data.first().unwrap_or(&0);
1647 if *message_type > 10 {
1648 return Err(P2PError::Network(
1650 crate::error::NetworkError::ProtocolError(
1651 "Invalid MCP message type".to_string().into(),
1652 ),
1653 ));
1654 }
1655
1656 debug!("Validated MCP message for network transmission");
1657 }
1658
1659 if let Some(ref resource_manager) = self.resource_manager {
1661 resource_manager.record_bandwidth(data.len() as u64, 0);
1662 }
1663
1664 let _message_data = self.create_protocol_message(protocol, data)?;
1666
1667 {
1669 match self.network_node.send_message(peer_id, _message_data).await {
1670 Ok(_) => {
1671 debug!("Message sent to peer {} via transport layer", peer_id);
1672 }
1673 Err(e) => {
1674 warn!("Failed to send message to peer {}: {}", peer_id, e);
1675 return Err(P2PError::Network(
1676 crate::error::NetworkError::ProtocolError(
1677 format!("Message send failed: {e}").into(),
1678 ),
1679 ));
1680 }
1681 }
1682 Ok(())
1683 }
1684 }
1685
1686 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1688 use serde_json::json;
1689
1690 let timestamp = std::time::SystemTime::now()
1691 .duration_since(std::time::UNIX_EPOCH)
1692 .map_err(|e| {
1693 P2PError::Network(NetworkError::ProtocolError(
1694 format!("System time error: {}", e).into(),
1695 ))
1696 })?
1697 .as_secs();
1698
1699 let message = json!({
1701 "protocol": protocol,
1702 "data": data,
1703 "from": self.peer_id,
1704 "timestamp": timestamp
1705 });
1706
1707 serde_json::to_vec(&message).map_err(|e| {
1708 P2PError::Transport(crate::error::TransportError::StreamError(
1709 format!("Failed to serialize message: {e}").into(),
1710 ))
1711 })
1712 }
1713
1714 }
1716
1717#[allow(dead_code)]
1719fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1720 use serde_json::json;
1721
1722 let timestamp = std::time::SystemTime::now()
1723 .duration_since(std::time::UNIX_EPOCH)
1724 .map_err(|e| {
1725 P2PError::Network(NetworkError::ProtocolError(
1726 format!("System time error: {}", e).into(),
1727 ))
1728 })?
1729 .as_secs();
1730
1731 let message = json!({
1733 "protocol": protocol,
1734 "data": data,
1735 "timestamp": timestamp
1736 });
1737
1738 serde_json::to_vec(&message).map_err(|e| {
1739 P2PError::Transport(crate::error::TransportError::StreamError(
1740 format!("Failed to serialize message: {e}").into(),
1741 ))
1742 })
1743}
1744
1745impl P2PNode {
1746 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1748 self.event_tx.subscribe()
1749 }
1750
1751 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1753 self.subscribe_events()
1754 }
1755
1756 pub fn uptime(&self) -> Duration {
1758 self.start_time.elapsed()
1759 }
1760
1761 pub fn mcp_server(&self) -> Option<&Arc<MCPServer>> {
1763 self.mcp_server.as_ref()
1764 }
1765
1766 pub async fn register_mcp_tool(&self, tool: Tool) -> Result<()> {
1768 if let Some(ref _mcp_server) = self.mcp_server {
1769 let tool_name = tool.definition.name.clone();
1770 _mcp_server.register_tool(tool).await.map_err(|e| {
1771 P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
1772 format!("{}: Registration failed: {e}", tool_name).into(),
1773 ))
1774 })
1775 } else {
1776 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1777 "MCP server not enabled".to_string().into(),
1778 )))
1779 }
1780 }
1781
1782 pub async fn call_mcp_tool(&self, tool_name: &str, arguments: Value) -> Result<Value> {
1784 if let Some(ref _mcp_server) = self.mcp_server {
1785 if let Some(ref resource_manager) = self.resource_manager
1787 && !resource_manager
1788 .check_rate_limit(&self.peer_id, "mcp")
1789 .await?
1790 {
1791 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1792 "MCP rate limit exceeded".to_string().into(),
1793 )));
1794 }
1795
1796 let context = MCPCallContext {
1797 caller_id: self.peer_id.clone(),
1798 timestamp: SystemTime::now(),
1799 timeout: Duration::from_secs(30),
1800 auth_info: None,
1801 metadata: HashMap::new(),
1802 };
1803
1804 _mcp_server
1805 .call_tool(tool_name, arguments, context)
1806 .await
1807 .map_err(|e| {
1808 P2PError::Mcp(crate::error::McpError::ToolExecutionFailed(
1809 format!("{}: Execution failed: {e}", tool_name).into(),
1810 ))
1811 })
1812 } else {
1813 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1814 "MCP server not enabled".to_string().into(),
1815 )))
1816 }
1817 }
1818
1819 pub async fn call_remote_mcp_tool(
1821 &self,
1822 peer_id: &PeerId,
1823 tool_name: &str,
1824 arguments: Value,
1825 ) -> Result<Value> {
1826 if let Some(ref _mcp_server) = self.mcp_server {
1827 if peer_id == &self.peer_id {
1829 let context = MCPCallContext {
1831 caller_id: self.peer_id.clone(),
1832 timestamp: SystemTime::now(),
1833 timeout: Duration::from_secs(30),
1834 auth_info: None,
1835 metadata: HashMap::new(),
1836 };
1837
1838 return _mcp_server.call_tool(tool_name, arguments, context).await;
1840 }
1841
1842 let context = MCPCallContext {
1846 caller_id: self.peer_id.clone(),
1847 timestamp: SystemTime::now(),
1848 timeout: Duration::from_secs(30),
1849 auth_info: None,
1850 metadata: HashMap::new(),
1851 };
1852
1853 match _mcp_server
1855 .call_tool(tool_name, arguments.clone(), context)
1856 .await
1857 {
1858 Ok(mut result) => {
1859 if let Value::Object(ref mut map) = result {
1861 map.insert("tool".to_string(), Value::String(tool_name.to_string()));
1862 }
1863 Ok(result)
1864 }
1865 Err(e) => Err(e),
1866 }
1867 } else {
1868 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1869 "MCP server not enabled".to_string().into(),
1870 )))
1871 }
1872 }
1873
1874 #[allow(dead_code)]
1876 async fn handle_mcp_remote_tool_call(
1877 &self,
1878 peer_id: &PeerId,
1879 tool_name: &str,
1880 arguments: Value,
1881 context: MCPCallContext,
1882 ) -> Result<Value> {
1883 let request_id = uuid::Uuid::new_v4().to_string();
1884
1885 let mcp_message = crate::mcp::MCPMessage::CallTool {
1887 name: tool_name.to_string(),
1888 arguments,
1889 };
1890
1891 let p2p_message = crate::mcp::P2PMCPMessage {
1893 message_type: crate::mcp::P2PMCPMessageType::Request,
1894 message_id: request_id.clone(),
1895 source_peer: context.caller_id.clone(),
1896 target_peer: Some(peer_id.clone()),
1897 timestamp: context
1898 .timestamp
1899 .duration_since(std::time::UNIX_EPOCH)
1900 .map_err(|e| {
1901 P2PError::Network(crate::error::NetworkError::ProtocolError(
1902 format!("Time error: {e}").into(),
1903 ))
1904 })?
1905 .as_secs(),
1906 payload: mcp_message,
1907 ttl: 5, };
1909
1910 let message_data = serde_json::to_vec(&p2p_message)
1912 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
1913
1914 if message_data.len() > crate::mcp::MAX_MESSAGE_SIZE {
1915 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
1916 "Message too large".to_string().into(),
1917 )));
1918 }
1919
1920 self.send_message(peer_id, MCP_PROTOCOL, message_data)
1922 .await?;
1923
1924 info!(
1926 "MCP remote tool call sent to peer {}, tool: {}",
1927 peer_id, tool_name
1928 );
1929
1930 Ok(serde_json::json!({
1933 "status": "sent",
1934 "message": "Remote tool call sent successfully",
1935 "peer_id": peer_id,
1936 "tool": tool_name, "request_id": request_id
1938 }))
1939 }
1940
1941 pub async fn list_mcp_tools(&self) -> Result<Vec<String>> {
1943 if let Some(ref _mcp_server) = self.mcp_server {
1944 let (tools, _) = _mcp_server.list_tools(None).await.map_err(|e| {
1945 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1946 format!("Failed to list tools: {e}").into(),
1947 ))
1948 })?;
1949
1950 Ok(tools.into_iter().map(|tool| tool.name).collect())
1951 } else {
1952 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1953 "MCP server not enabled".to_string().into(),
1954 )))
1955 }
1956 }
1957
1958 pub async fn discover_remote_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1960 if let Some(ref _mcp_server) = self.mcp_server {
1961 _mcp_server.discover_remote_services().await.map_err(|e| {
1962 P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1963 format!("Failed to discover services: {e}").into(),
1964 ))
1965 })
1966 } else {
1967 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1968 "MCP server not enabled".to_string().into(),
1969 )))
1970 }
1971 }
1972
1973 pub async fn list_remote_mcp_tools(&self, peer_id: &PeerId) -> Result<Vec<String>> {
1975 if let Some(ref _mcp_server) = self.mcp_server {
1976 if peer_id == &self.peer_id {
1978 return self.list_mcp_tools().await;
1979 }
1980
1981 self.list_mcp_tools().await
1985 } else {
1986 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1987 "MCP server not enabled".to_string().into(),
1988 )))
1989 }
1990 }
1991
1992 pub async fn mcp_stats(&self) -> Result<crate::mcp::MCPServerStats> {
1994 if let Some(ref _mcp_server) = self.mcp_server {
1995 Ok(_mcp_server.get_stats().await)
1996 } else {
1997 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
1998 "MCP server not enabled".to_string().into(),
1999 )))
2000 }
2001 }
2002
2003 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2005 if let Some(ref resource_manager) = self.resource_manager {
2006 Ok(resource_manager.get_metrics().await)
2007 } else {
2008 Err(P2PError::Network(
2009 crate::error::NetworkError::ProtocolError(
2010 "Production resource manager not enabled".to_string().into(),
2011 ),
2012 ))
2013 }
2014 }
2015
2016 pub async fn health_check(&self) -> Result<()> {
2018 if let Some(ref resource_manager) = self.resource_manager {
2019 resource_manager.health_check().await
2020 } else {
2021 let peer_count = self.peer_count().await;
2023 if peer_count > self.config.max_connections {
2024 Err(P2PError::Network(
2025 crate::error::NetworkError::ProtocolError(
2026 format!("Too many connections: {peer_count}").into(),
2027 ),
2028 ))
2029 } else {
2030 Ok(())
2031 }
2032 }
2033 }
2034
2035 pub fn production_config(&self) -> Option<&ProductionConfig> {
2037 self.config.production_config.as_ref()
2038 }
2039
2040 pub fn is_production_mode(&self) -> bool {
2042 self.resource_manager.is_some()
2043 }
2044
2045 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2047 self.dht.as_ref()
2048 }
2049
2050 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2052 if let Some(ref dht) = self.dht {
2053 let dht_instance = dht.write().await;
2054 dht_instance
2055 .put(key.clone(), value.clone())
2056 .await
2057 .map_err(|e| {
2058 P2PError::Dht(crate::error::DhtError::StoreFailed(
2059 format!("{}: {e}", key).into(),
2060 ))
2061 })?;
2062
2063 Ok(())
2064 } else {
2065 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2066 "DHT not enabled".to_string().into(),
2067 )))
2068 }
2069 }
2070
2071 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2073 if let Some(ref dht) = self.dht {
2074 let dht_instance = dht.write().await;
2075 let record_result = dht_instance.get(&key).await;
2076
2077 let value = record_result.as_ref().map(|record| record.value.clone());
2078
2079 Ok(value)
2080 } else {
2081 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2082 "DHT not enabled".to_string().into(),
2083 )))
2084 }
2085 }
2086
2087 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2089 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2090 let mut manager = bootstrap_manager.write().await;
2091 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2092 .iter()
2093 .filter_map(|addr| addr.parse().ok())
2094 .collect();
2095 let contact = ContactEntry::new(peer_id, socket_addresses);
2096 manager.add_contact(contact).await.map_err(|e| {
2097 P2PError::Network(crate::error::NetworkError::ProtocolError(
2098 format!("Failed to add peer to bootstrap cache: {e}").into(),
2099 ))
2100 })?;
2101 }
2102 Ok(())
2103 }
2104
2105 pub async fn update_peer_metrics(
2107 &self,
2108 peer_id: &PeerId,
2109 success: bool,
2110 latency_ms: Option<u64>,
2111 _error: Option<String>,
2112 ) -> Result<()> {
2113 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2114 let mut manager = bootstrap_manager.write().await;
2115
2116 let metrics = QualityMetrics {
2118 success_rate: if success { 1.0 } else { 0.0 },
2119 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2120 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2122 last_successful_connection: if success {
2123 chrono::Utc::now()
2124 } else {
2125 chrono::Utc::now() - chrono::Duration::hours(1)
2126 },
2127 uptime_score: 0.5,
2128 };
2129
2130 manager
2131 .update_contact_metrics(peer_id, metrics)
2132 .await
2133 .map_err(|e| {
2134 P2PError::Network(crate::error::NetworkError::ProtocolError(
2135 format!("Failed to update peer metrics: {e}").into(),
2136 ))
2137 })?;
2138 }
2139 Ok(())
2140 }
2141
2142 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2144 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2145 let manager = bootstrap_manager.read().await;
2146 let stats = manager.get_stats().await.map_err(|e| {
2147 P2PError::Network(crate::error::NetworkError::ProtocolError(
2148 format!("Failed to get bootstrap stats: {e}").into(),
2149 ))
2150 })?;
2151 Ok(Some(stats))
2152 } else {
2153 Ok(None)
2154 }
2155 }
2156
2157 pub async fn cached_peer_count(&self) -> usize {
2159 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2160 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2161 {
2162 return stats.total_contacts;
2163 }
2164 0
2165 }
2166
2167 async fn connect_bootstrap_peers(&self) -> Result<()> {
2169 let mut bootstrap_contacts = Vec::new();
2170 let mut used_cache = false;
2171
2172 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2174 let manager = bootstrap_manager.read().await;
2175 match manager.get_bootstrap_peers(20).await {
2176 Ok(contacts) => {
2178 if !contacts.is_empty() {
2179 info!("Using {} cached bootstrap peers", contacts.len());
2180 bootstrap_contacts = contacts;
2181 used_cache = true;
2182 }
2183 }
2184 Err(e) => {
2185 warn!("Failed to get cached bootstrap peers: {}", e);
2186 }
2187 }
2188 }
2189
2190 if bootstrap_contacts.is_empty() {
2192 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2193 &self.config.bootstrap_peers_str
2194 } else {
2195 &self
2197 .config
2198 .bootstrap_peers
2199 .iter()
2200 .map(|addr| addr.to_string())
2201 .collect::<Vec<_>>()
2202 };
2203
2204 if bootstrap_peers.is_empty() {
2205 info!("No bootstrap peers configured and no cached peers available");
2206 return Ok(());
2207 }
2208
2209 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
2210
2211 for addr in bootstrap_peers {
2212 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2213 let contact = ContactEntry::new(
2214 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
2215 vec![socket_addr],
2216 );
2217 bootstrap_contacts.push(contact);
2218 } else {
2219 warn!("Invalid bootstrap address format: {}", addr);
2220 }
2221 }
2222 }
2223
2224 let mut successful_connections = 0;
2226 for contact in bootstrap_contacts {
2227 for addr in &contact.addresses {
2228 match self.connect_peer(&addr.to_string()).await {
2229 Ok(peer_id) => {
2230 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2231 successful_connections += 1;
2232
2233 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2235 let mut manager = bootstrap_manager.write().await;
2236 let mut updated_contact = contact.clone();
2237 updated_contact.peer_id = peer_id.clone();
2238 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2241 warn!("Failed to update bootstrap cache: {}", e);
2242 }
2243 }
2244 break; }
2246 Err(e) => {
2247 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2248
2249 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2251 let mut manager = bootstrap_manager.write().await;
2252 let mut updated_contact = contact.clone();
2253 updated_contact.update_connection_result(
2254 false,
2255 None,
2256 Some(e.to_string()),
2257 );
2258
2259 if let Err(e) = manager.add_contact(updated_contact).await {
2260 warn!("Failed to update bootstrap cache: {}", e);
2261 }
2262 }
2263 }
2264 }
2265 }
2266 }
2267
2268 if successful_connections == 0 {
2269 if !used_cache {
2270 warn!("Failed to connect to any bootstrap peers");
2271 }
2272 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2273 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2275 }));
2276 } else {
2277 info!(
2278 "Successfully connected to {} bootstrap peers",
2279 successful_connections
2280 );
2281 }
2282
2283 Ok(())
2284 }
2285
2286 async fn disconnect_all_peers(&self) -> Result<()> {
2288 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2289
2290 for peer_id in peer_ids {
2291 self.disconnect_peer(&peer_id).await?;
2292 }
2293
2294 Ok(())
2295 }
2296
2297 async fn periodic_tasks(&self) -> Result<()> {
2299 Ok(())
2305 }
2306
2307 pub async fn discover_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
2309 if let Some(ref _mcp_server) = self.mcp_server {
2310 _mcp_server.discover_remote_services().await
2311 } else {
2312 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2313 "MCP server not enabled".to_string().into(),
2314 )))
2315 }
2316 }
2317
2318 pub async fn get_all_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
2320 if let Some(ref _mcp_server) = self.mcp_server {
2321 _mcp_server.get_all_services().await
2322 } else {
2323 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2324 "MCP server not enabled".to_string().into(),
2325 )))
2326 }
2327 }
2328
2329 pub async fn find_mcp_services_with_tool(
2331 &self,
2332 tool_name: &str,
2333 ) -> Result<Vec<crate::mcp::MCPService>> {
2334 if let Some(ref _mcp_server) = self.mcp_server {
2335 _mcp_server.find_services_with_tool(tool_name).await
2336 } else {
2337 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2338 "MCP server not enabled".to_string().into(),
2339 )))
2340 }
2341 }
2342
2343 pub async fn announce_mcp_services(&self) -> Result<()> {
2345 if let Some(ref _mcp_server) = self.mcp_server {
2346 _mcp_server.announce_local_services().await
2347 } else {
2348 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2349 "MCP server not enabled".to_string().into(),
2350 )))
2351 }
2352 }
2353
2354 pub async fn refresh_mcp_service_discovery(&self) -> Result<()> {
2356 if let Some(ref _mcp_server) = self.mcp_server {
2357 _mcp_server.refresh_service_discovery().await
2358 } else {
2359 Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2360 "MCP server not enabled".to_string().into(),
2361 )))
2362 }
2363 }
2364
2365 pub async fn query_peer_mcp_services(&self, peer_id: &PeerId) -> Result<()> {
2367 if self.mcp_server.is_none() {
2368 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2369 "MCP server not enabled".to_string().into(),
2370 )));
2371 }
2372
2373 let discovery_query = crate::mcp::P2PMCPMessage {
2374 message_type: crate::mcp::P2PMCPMessageType::ServiceDiscovery,
2375 message_id: uuid::Uuid::new_v4().to_string(),
2376 source_peer: self.peer_id.clone(),
2377 target_peer: Some(peer_id.clone()),
2378 timestamp: std::time::SystemTime::now()
2379 .duration_since(std::time::UNIX_EPOCH)
2380 .unwrap_or_default()
2381 .as_secs(),
2382 payload: crate::mcp::MCPMessage::ListTools { cursor: None },
2383 ttl: 3,
2384 };
2385
2386 let query_data = serde_json::to_vec(&discovery_query)
2387 .map_err(|e| P2PError::Serialization(e.to_string().into()))?;
2388
2389 self.send_message(peer_id, MCP_PROTOCOL, query_data).await?;
2390 debug!("Sent MCP service discovery query to peer {}", peer_id);
2391
2392 Ok(())
2393 }
2394
2395 pub async fn broadcast_mcp_service_discovery(&self) -> Result<()> {
2397 if self.mcp_server.is_none() {
2398 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2399 "MCP server not enabled".to_string().into(),
2400 )));
2401 }
2402
2403 let peer_list: Vec<PeerId> = {
2405 let peers_guard = self.peers.read().await;
2406 peers_guard.keys().cloned().collect()
2407 };
2408
2409 if peer_list.is_empty() {
2410 debug!("No peers connected for MCP service discovery broadcast");
2411 return Ok(());
2412 }
2413
2414 let mut successful_queries = 0;
2416 for peer_id in &peer_list {
2417 match self.query_peer_mcp_services(peer_id).await {
2418 Ok(_) => {
2419 successful_queries += 1;
2420 debug!("Sent MCP service discovery query to peer: {}", peer_id);
2421 }
2422 Err(e) => {
2423 warn!(
2424 "Failed to send MCP service discovery query to peer {}: {}",
2425 peer_id, e
2426 );
2427 }
2428 }
2429 }
2430
2431 info!(
2432 "Broadcast MCP service discovery to {}/{} connected peers",
2433 successful_queries,
2434 peer_list.len()
2435 );
2436
2437 Ok(())
2438 }
2439}
2440
2441#[derive(Clone)]
2443pub struct P2PNetworkSender {
2444 peer_id: PeerId,
2445 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2447}
2448
2449impl P2PNetworkSender {
2450 pub fn new(
2451 peer_id: PeerId,
2452 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2453 ) -> Self {
2454 Self { peer_id, send_tx }
2455 }
2456}
2457
2458#[async_trait::async_trait]
2460impl NetworkSender for P2PNetworkSender {
2461 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2463 self.send_tx
2464 .send((peer_id.clone(), protocol.to_string(), data))
2465 .map_err(|_| {
2466 P2PError::Network(crate::error::NetworkError::ProtocolError(
2467 "Failed to send message via channel".to_string().into(),
2468 ))
2469 })?;
2470 Ok(())
2471 }
2472
2473 fn local_peer_id(&self) -> &PeerId {
2475 &self.peer_id
2476 }
2477}
2478
2479pub struct NodeBuilder {
2481 config: NodeConfig,
2482}
2483
2484impl Default for NodeBuilder {
2485 fn default() -> Self {
2486 Self::new()
2487 }
2488}
2489
2490impl NodeBuilder {
2491 pub fn new() -> Self {
2493 Self {
2494 config: NodeConfig::default(),
2495 }
2496 }
2497
2498 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2500 self.config.peer_id = Some(peer_id);
2501 self
2502 }
2503
2504 pub fn listen_on(mut self, addr: &str) -> Self {
2506 if let Ok(multiaddr) = addr.parse() {
2507 self.config.listen_addrs.push(multiaddr);
2508 }
2509 self
2510 }
2511
2512 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2514 if let Ok(multiaddr) = addr.parse() {
2515 self.config.bootstrap_peers.push(multiaddr);
2516 }
2517 self.config.bootstrap_peers_str.push(addr.to_string());
2518 self
2519 }
2520
2521 pub fn with_ipv6(mut self, enable: bool) -> Self {
2523 self.config.enable_ipv6 = enable;
2524 self
2525 }
2526
2527 pub fn with_mcp_server(mut self) -> Self {
2529 self.config.enable_mcp_server = true;
2530 self
2531 }
2532
2533 pub fn with_mcp_config(mut self, mcp_config: MCPServerConfig) -> Self {
2535 self.config.mcp_server_config = Some(mcp_config);
2536 self.config.enable_mcp_server = true;
2537 self
2538 }
2539
2540 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2542 self.config.connection_timeout = timeout;
2543 self
2544 }
2545
2546 pub fn with_max_connections(mut self, max: usize) -> Self {
2548 self.config.max_connections = max;
2549 self
2550 }
2551
2552 pub fn with_production_mode(mut self) -> Self {
2554 self.config.production_config = Some(ProductionConfig::default());
2555 self
2556 }
2557
2558 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2560 self.config.production_config = Some(production_config);
2561 self
2562 }
2563
2564 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2566 self.config.dht_config = dht_config;
2567 self
2568 }
2569
2570 pub fn with_default_dht(mut self) -> Self {
2572 self.config.dht_config = DHTConfig::default();
2573 self
2574 }
2575
2576 pub async fn build(self) -> Result<P2PNode> {
2578 P2PNode::new(self.config).await
2579 }
2580}
2581
2582#[allow(dead_code)] async fn handle_received_message_standalone(
2585 message_data: Vec<u8>,
2586 peer_id: &PeerId,
2587 protocol: &str,
2588 event_tx: &broadcast::Sender<P2PEvent>,
2589 mcp_server: &Option<Arc<crate::mcp::MCPServer>>,
2590) -> Result<()> {
2591 if protocol == MCP_PROTOCOL {
2593 return handle_mcp_message_standalone(message_data, peer_id, mcp_server).await;
2594 }
2595
2596 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2598 Ok(message) => {
2599 if let (Some(protocol), Some(data), Some(from)) = (
2600 message.get("protocol").and_then(|v| v.as_str()),
2601 message.get("data").and_then(|v| v.as_array()),
2602 message.get("from").and_then(|v| v.as_str()),
2603 ) {
2604 let data_bytes: Vec<u8> = data
2606 .iter()
2607 .filter_map(|v| v.as_u64().map(|n| n as u8))
2608 .collect();
2609
2610 let event = P2PEvent::Message {
2612 topic: protocol.to_string(),
2613 source: from.to_string(),
2614 data: data_bytes,
2615 };
2616
2617 let _ = event_tx.send(event);
2618 debug!("Generated message event from peer: {}", peer_id);
2619 }
2620 }
2621 Err(e) => {
2622 warn!("Failed to parse received message from {}: {}", peer_id, e);
2623 }
2624 }
2625
2626 Ok(())
2627}
2628
2629#[allow(dead_code)] async fn handle_mcp_message_standalone(
2632 message_data: Vec<u8>,
2633 peer_id: &PeerId,
2634 mcp_server: &Option<Arc<crate::mcp::MCPServer>>,
2635) -> Result<()> {
2636 if let Some(_mcp_server) = mcp_server {
2637 match serde_json::from_slice::<crate::mcp::P2PMCPMessage>(&message_data) {
2639 Ok(p2p_mcp_message) => {
2640 use crate::mcp::P2PMCPMessageType;
2642 match p2p_mcp_message.message_type {
2643 P2PMCPMessageType::Request => {
2644 debug!("Received MCP request from peer {}", peer_id);
2645 }
2648 P2PMCPMessageType::Response => {
2649 debug!("Received MCP response from peer {}", peer_id);
2650 }
2652 P2PMCPMessageType::ServiceAdvertisement => {
2653 debug!("Received service advertisement from peer {}", peer_id);
2654 }
2656 P2PMCPMessageType::ServiceDiscovery => {
2657 debug!("Received service discovery query from peer {}", peer_id);
2658 }
2660 P2PMCPMessageType::Heartbeat => {
2661 debug!("Received heartbeat from peer {}", peer_id);
2662 }
2664 P2PMCPMessageType::HealthCheck => {
2665 debug!("Received health check from peer {}", peer_id);
2666 }
2668 }
2669 }
2670 Err(e) => {
2671 warn!(
2672 "Failed to deserialize MCP message from peer {}: {}",
2673 peer_id, e
2674 );
2675 return Err(P2PError::Mcp(crate::error::McpError::InvalidRequest(
2676 format!("Invalid MCP message: {e}").into(),
2677 )));
2678 }
2679 }
2680 } else {
2681 warn!("Received MCP message but MCP server is not enabled");
2682 return Err(P2PError::Mcp(crate::error::McpError::ServerUnavailable(
2683 "MCP server not enabled".to_string().into(),
2684 )));
2685 }
2686
2687 Ok(())
2688}
2689
2690fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2692 match create_protocol_message_static(protocol, data) {
2693 Ok(msg) => Some(msg),
2694 Err(e) => {
2695 warn!("Failed to create protocol message: {}", e);
2696 None
2697 }
2698 }
2699}
2700
2701async fn handle_message_send_result(result: Result<()>, peer_id: &PeerId) {
2703 match result {
2704 Ok(_) => {
2705 debug!("Message sent to peer {} via transport layer", peer_id);
2706 }
2707 Err(e) => {
2708 warn!("Failed to send message to peer {}: {}", peer_id, e);
2709 }
2710 }
2711}
2712
2713#[allow(dead_code)] fn check_rate_limit(
2716 rate_limiter: &RateLimiter,
2717 socket_addr: &std::net::SocketAddr,
2718 remote_addr: &NetworkAddress,
2719) -> Result<()> {
2720 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2721 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2722 e
2723 })
2724}
2725
2726#[allow(dead_code)] async fn register_new_peer(
2729 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2730 peer_id: &PeerId,
2731 remote_addr: &NetworkAddress,
2732) {
2733 let mut peers_guard = peers.write().await;
2734 let peer_info = PeerInfo {
2735 peer_id: peer_id.clone(),
2736 addresses: vec![remote_addr.to_string()],
2737 connected_at: tokio::time::Instant::now(),
2738 last_seen: tokio::time::Instant::now(),
2739 status: ConnectionStatus::Connected,
2740 protocols: vec!["p2p-chat/1.0.0".to_string()],
2741 heartbeat_count: 0,
2742 };
2743 peers_guard.insert(peer_id.clone(), peer_info);
2744}
2745
2746#[allow(dead_code)] fn spawn_connection_handler(
2749 connection: Box<dyn crate::transport::Connection>,
2750 peer_id: PeerId,
2751 event_tx: broadcast::Sender<P2PEvent>,
2752 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2753 mcp_server: Option<Arc<MCPServer>>,
2754) {
2755 tokio::spawn(async move {
2756 handle_peer_connection(connection, peer_id, event_tx, peers, mcp_server).await;
2757 });
2758}
2759
2760#[allow(dead_code)] async fn handle_peer_connection(
2763 mut connection: Box<dyn crate::transport::Connection>,
2764 peer_id: PeerId,
2765 event_tx: broadcast::Sender<P2PEvent>,
2766 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2767 mcp_server: Option<Arc<MCPServer>>,
2768) {
2769 loop {
2770 match connection.receive().await {
2771 Ok(message_data) => {
2772 debug!(
2773 "Received {} bytes from peer: {}",
2774 message_data.len(),
2775 peer_id
2776 );
2777
2778 if let Err(e) = handle_received_message_standalone(
2780 message_data,
2781 &peer_id,
2782 "unknown", &event_tx,
2784 &mcp_server,
2785 )
2786 .await
2787 {
2788 warn!("Failed to handle message from {}: {}", peer_id, e);
2789 }
2790 }
2791 Err(e) => {
2792 warn!("Failed to receive message from {}: {}", peer_id, e);
2793
2794 if !connection.is_alive().await {
2796 info!("Connection to {} is dead, removing peer", peer_id);
2797
2798 remove_peer(&peers, &peer_id).await;
2800
2801 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2803
2804 break; }
2806
2807 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2809 }
2810 }
2811 }
2812}
2813
2814#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2817 let mut peers_guard = peers.write().await;
2818 peers_guard.remove(peer_id);
2819}
2820
2821async fn update_peer_heartbeat(
2823 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2824 peer_id: &PeerId,
2825) -> Result<()> {
2826 let mut peers_guard = peers.write().await;
2827 match peers_guard.get_mut(peer_id) {
2828 Some(peer_info) => {
2829 peer_info.last_seen = Instant::now();
2830 peer_info.heartbeat_count += 1;
2831 Ok(())
2832 }
2833 None => {
2834 warn!("Received heartbeat from unknown peer: {}", peer_id);
2835 Err(P2PError::Network(NetworkError::PeerNotFound(
2836 format!("Peer {} not found", peer_id).into(),
2837 )))
2838 }
2839 }
2840}
2841
2842async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f32) {
2844 if let Some(manager) = resource_manager {
2845 let metrics = manager.get_metrics().await;
2846 (metrics.memory_used, metrics.cpu_usage as f32)
2847 } else {
2848 (0, 0.0)
2849 }
2850}
2851
2852#[cfg(test)]
2853mod tests {
2854 use super::*;
2855 use crate::mcp::{
2856 MCPTool, Tool, ToolHandler, ToolHealthStatus, ToolMetadata, ToolRequirements,
2857 };
2858 use serde_json::json;
2859 use std::future::Future;
2860 use std::pin::Pin;
2861 use std::time::Duration;
2862 use tokio::time::timeout;
2863
2864 struct NetworkTestTool {
2866 name: String,
2867 }
2868
2869 impl NetworkTestTool {
2870 fn new(name: &str) -> Self {
2871 Self {
2872 name: name.to_string(),
2873 }
2874 }
2875 }
2876
2877 impl ToolHandler for NetworkTestTool {
2878 fn execute(
2879 &self,
2880 arguments: serde_json::Value,
2881 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + '_>> {
2882 let name = self.name.clone();
2883 Box::pin(async move {
2884 Ok(json!({
2885 "tool": name,
2886 "input": arguments,
2887 "result": "network test success"
2888 }))
2889 })
2890 }
2891
2892 fn validate(&self, _arguments: &serde_json::Value) -> Result<()> {
2893 Ok(())
2894 }
2895
2896 fn get_requirements(&self) -> ToolRequirements {
2897 ToolRequirements::default()
2898 }
2899 }
2900
2901 fn create_test_node_config() -> NodeConfig {
2903 NodeConfig {
2904 peer_id: Some("test_peer_123".to_string()),
2905 listen_addrs: vec![
2906 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2907 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2908 ],
2909 listen_addr: std::net::SocketAddr::new(
2910 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2911 0,
2912 ),
2913 bootstrap_peers: vec![],
2914 bootstrap_peers_str: vec![],
2915 enable_ipv6: true,
2916 enable_mcp_server: true,
2917 mcp_server_config: Some(MCPServerConfig {
2918 enable_auth: false, enable_rate_limiting: false, ..Default::default()
2921 }),
2922 connection_timeout: Duration::from_secs(10),
2923 keep_alive_interval: Duration::from_secs(30),
2924 max_connections: 100,
2925 max_incoming_connections: 50,
2926 dht_config: DHTConfig::default(),
2927 security_config: SecurityConfig::default(),
2928 production_config: None,
2929 bootstrap_cache_config: None,
2930 identity_config: None,
2931 }
2932 }
2933
2934 fn create_test_tool(name: &str) -> Tool {
2936 Tool {
2937 definition: MCPTool {
2938 name: name.to_string(),
2939 description: format!("Test tool: {}", name).into(),
2940 input_schema: json!({
2941 "type": "object",
2942 "properties": {
2943 "input": { "type": "string" }
2944 }
2945 }),
2946 },
2947 handler: Box::new(NetworkTestTool::new(name)),
2948 metadata: ToolMetadata {
2949 created_at: SystemTime::now(),
2950 last_called: None,
2951 call_count: 0,
2952 avg_execution_time: Duration::from_millis(0),
2953 health_status: ToolHealthStatus::Healthy,
2954 tags: vec!["test".to_string()],
2955 },
2956 }
2957 }
2958
2959 #[tokio::test]
2960 async fn test_node_config_default() {
2961 let config = NodeConfig::default();
2962
2963 assert!(config.peer_id.is_none());
2964 assert_eq!(config.listen_addrs.len(), 2);
2965 assert!(config.enable_ipv6);
2966 assert!(config.enable_mcp_server);
2967 assert_eq!(config.max_connections, 1000);
2968 assert_eq!(config.max_incoming_connections, 100);
2969 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2970 }
2971
2972 #[tokio::test]
2973 async fn test_dht_config_default() {
2974 let config = DHTConfig::default();
2975
2976 assert_eq!(config.k_value, 20);
2977 assert_eq!(config.alpha_value, 5);
2978 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2979 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2980 }
2981
2982 #[tokio::test]
2983 async fn test_security_config_default() {
2984 let config = SecurityConfig::default();
2985
2986 assert!(config.enable_noise);
2987 assert!(config.enable_tls);
2988 assert_eq!(config.trust_level, TrustLevel::Basic);
2989 }
2990
2991 #[test]
2992 fn test_trust_level_variants() {
2993 let _none = TrustLevel::None;
2995 let _basic = TrustLevel::Basic;
2996 let _full = TrustLevel::Full;
2997
2998 assert_eq!(TrustLevel::None, TrustLevel::None);
3000 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
3001 assert_eq!(TrustLevel::Full, TrustLevel::Full);
3002 assert_ne!(TrustLevel::None, TrustLevel::Basic);
3003 }
3004
3005 #[test]
3006 fn test_connection_status_variants() {
3007 let connecting = ConnectionStatus::Connecting;
3008 let connected = ConnectionStatus::Connected;
3009 let disconnecting = ConnectionStatus::Disconnecting;
3010 let disconnected = ConnectionStatus::Disconnected;
3011 let failed = ConnectionStatus::Failed("test error".to_string());
3012
3013 assert_eq!(connecting, ConnectionStatus::Connecting);
3014 assert_eq!(connected, ConnectionStatus::Connected);
3015 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
3016 assert_eq!(disconnected, ConnectionStatus::Disconnected);
3017 assert_ne!(connecting, connected);
3018
3019 if let ConnectionStatus::Failed(msg) = failed {
3020 assert_eq!(msg, "test error");
3021 } else {
3022 panic!("Expected Failed status");
3023 }
3024 }
3025
3026 #[tokio::test]
3027 async fn test_node_creation() -> Result<()> {
3028 let config = create_test_node_config();
3029 let node = P2PNode::new(config).await?;
3030
3031 assert_eq!(node.peer_id(), "test_peer_123");
3032 assert!(!node.is_running().await);
3033 assert_eq!(node.peer_count().await, 0);
3034 assert!(node.connected_peers().await.is_empty());
3035
3036 Ok(())
3037 }
3038
3039 #[tokio::test]
3040 async fn test_node_creation_without_peer_id() -> Result<()> {
3041 let mut config = create_test_node_config();
3042 config.peer_id = None;
3043
3044 let node = P2PNode::new(config).await?;
3045
3046 assert!(node.peer_id().starts_with("peer_"));
3048 assert!(!node.is_running().await);
3049
3050 Ok(())
3051 }
3052
3053 #[tokio::test]
3054 async fn test_node_lifecycle() -> Result<()> {
3055 let config = create_test_node_config();
3056 let node = P2PNode::new(config).await?;
3057
3058 assert!(!node.is_running().await);
3060
3061 node.start().await?;
3063 assert!(node.is_running().await);
3064
3065 let listen_addrs = node.listen_addrs().await;
3067 assert!(
3068 !listen_addrs.is_empty(),
3069 "Expected at least one listening address"
3070 );
3071
3072 node.stop().await?;
3074 assert!(!node.is_running().await);
3075
3076 Ok(())
3077 }
3078
3079 #[tokio::test]
3080 async fn test_peer_connection() -> Result<()> {
3081 let config = create_test_node_config();
3082 let node = P2PNode::new(config).await?;
3083
3084 let peer_addr = "/ip4/127.0.0.1/tcp/0".to_string();
3085
3086 let peer_id = node.connect_peer(&peer_addr).await?;
3088 assert!(peer_id.starts_with("peer_from_"));
3089
3090 assert_eq!(node.peer_count().await, 1);
3092
3093 let connected_peers = node.connected_peers().await;
3095 assert_eq!(connected_peers.len(), 1);
3096 assert_eq!(connected_peers[0], peer_id);
3097
3098 let peer_info = node.peer_info(&peer_id).await;
3100 assert!(peer_info.is_some());
3101 let info = peer_info.expect("Peer info should exist after adding peer");
3102 assert_eq!(info.peer_id, peer_id);
3103 assert_eq!(info.status, ConnectionStatus::Connected);
3104 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3105
3106 node.disconnect_peer(&peer_id).await?;
3108 assert_eq!(node.peer_count().await, 0);
3109
3110 Ok(())
3111 }
3112
3113 #[tokio::test]
3114 async fn test_event_subscription() -> Result<()> {
3115 let config = create_test_node_config();
3116 let node = P2PNode::new(config).await?;
3117
3118 let mut events = node.subscribe_events();
3119 let peer_addr = "/ip4/127.0.0.1/tcp/0".to_string();
3120
3121 let peer_id = node.connect_peer(&peer_addr).await?;
3123
3124 let event = timeout(Duration::from_millis(100), events.recv()).await;
3126 assert!(event.is_ok());
3127
3128 let event_result = event
3129 .expect("Should receive event")
3130 .expect("Event should not be error");
3131 match event_result {
3132 P2PEvent::PeerConnected(event_peer_id) => {
3133 assert_eq!(event_peer_id, peer_id);
3134 }
3135 _ => panic!("Expected PeerConnected event"),
3136 }
3137
3138 node.disconnect_peer(&peer_id).await?;
3140
3141 let event = timeout(Duration::from_millis(100), events.recv()).await;
3143 assert!(event.is_ok());
3144
3145 let event_result = event
3146 .expect("Should receive event")
3147 .expect("Event should not be error");
3148 match event_result {
3149 P2PEvent::PeerDisconnected(event_peer_id) => {
3150 assert_eq!(event_peer_id, peer_id);
3151 }
3152 _ => panic!("Expected PeerDisconnected event"),
3153 }
3154
3155 Ok(())
3156 }
3157
3158 #[tokio::test]
3159 async fn test_message_sending() -> Result<()> {
3160 let mut config1 = create_test_node_config();
3162 config1.listen_addr =
3163 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3164 let node1 = P2PNode::new(config1).await?;
3165 node1.start().await?;
3166
3167 let mut config2 = create_test_node_config();
3168 config2.listen_addr =
3169 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3170 let node2 = P2PNode::new(config2).await?;
3171 node2.start().await?;
3172
3173 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3175
3176 let node2_addr = node2.local_addr().ok_or_else(|| {
3178 P2PError::Network(crate::error::NetworkError::ProtocolError(
3179 "No listening address".to_string().into(),
3180 ))
3181 })?;
3182
3183 let peer_id = node1.connect_peer(&node2_addr).await?;
3185
3186 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
3188
3189 let message_data = b"Hello, peer!".to_vec();
3191 let result = node1
3192 .send_message(&peer_id, "test-protocol", message_data)
3193 .await;
3194 if let Err(e) = &result {
3197 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3198 }
3199
3200 let non_existent_peer = "non_existent_peer".to_string();
3202 let result = node1
3203 .send_message(&non_existent_peer, "test-protocol", vec![])
3204 .await;
3205 assert!(result.is_err());
3206 assert!(result.unwrap_err().to_string().contains("not connected"));
3207
3208 Ok(())
3209 }
3210
3211 #[tokio::test]
3212 async fn test_mcp_integration() -> Result<()> {
3213 let config = create_test_node_config();
3214 let node = P2PNode::new(config).await?;
3215
3216 node.start().await?;
3218
3219 let tool = create_test_tool("network_test_tool");
3221 node.register_mcp_tool(tool).await?;
3222
3223 let tools = node.list_mcp_tools().await?;
3225 assert!(tools.contains(&"network_test_tool".to_string()));
3226
3227 let arguments = json!({"input": "test_input"});
3229 let result = node
3230 .call_mcp_tool("network_test_tool", arguments.clone())
3231 .await?;
3232 assert_eq!(result["tool"], "network_test_tool");
3233 assert_eq!(result["input"], arguments);
3234
3235 let stats = node.mcp_stats().await?;
3237 assert_eq!(stats.total_tools, 1);
3238
3239 let result = node.call_mcp_tool("non_existent_tool", json!({})).await;
3241 assert!(result.is_err());
3242
3243 node.stop().await?;
3244 Ok(())
3245 }
3246
3247 #[tokio::test]
3248 async fn test_remote_mcp_operations() -> Result<()> {
3249 let config = create_test_node_config();
3250 let node = P2PNode::new(config).await?;
3251
3252 node.start().await?;
3253
3254 let tool = create_test_tool("remote_test_tool");
3256 node.register_mcp_tool(tool).await?;
3257
3258 let peer_addr = "/ip4/127.0.0.1/tcp/9005".to_string();
3259 let peer_id = node.connect_peer(&peer_addr).await?;
3260
3261 let remote_tools = node.list_remote_mcp_tools(&peer_id).await?;
3263 assert!(!remote_tools.is_empty());
3264
3265 let arguments = json!({"input": "remote_test"});
3267 let result = node
3268 .call_remote_mcp_tool(&peer_id, "remote_test_tool", arguments.clone())
3269 .await?;
3270 assert_eq!(result["tool"], "remote_test_tool");
3271
3272 let services = node.discover_remote_mcp_services().await?;
3274 assert!(services.is_empty());
3276
3277 node.stop().await?;
3278 Ok(())
3279 }
3280
3281 #[tokio::test]
3282 async fn test_health_check() -> Result<()> {
3283 let config = create_test_node_config();
3284 let node = P2PNode::new(config).await?;
3285
3286 let result = node.health_check().await;
3288 assert!(result.is_ok());
3289
3290 for i in 0..5 {
3292 let addr = format!("/ip4/127.0.0.1/tcp/{}", 9010 + i);
3293 node.connect_peer(&addr).await?;
3294 }
3295
3296 let result = node.health_check().await;
3298 assert!(result.is_ok());
3299
3300 Ok(())
3301 }
3302
3303 #[tokio::test]
3304 async fn test_node_uptime() -> Result<()> {
3305 let config = create_test_node_config();
3306 let node = P2PNode::new(config).await?;
3307
3308 let uptime1 = node.uptime();
3309 assert!(uptime1 >= Duration::from_secs(0));
3310
3311 tokio::time::sleep(Duration::from_millis(10)).await;
3313
3314 let uptime2 = node.uptime();
3315 assert!(uptime2 > uptime1);
3316
3317 Ok(())
3318 }
3319
3320 #[tokio::test]
3321 async fn test_node_config_access() -> Result<()> {
3322 let config = create_test_node_config();
3323 let expected_peer_id = config.peer_id.clone();
3324 let node = P2PNode::new(config).await?;
3325
3326 let node_config = node.config();
3327 assert_eq!(node_config.peer_id, expected_peer_id);
3328 assert_eq!(node_config.max_connections, 100);
3329 assert!(node_config.enable_mcp_server);
3330
3331 Ok(())
3332 }
3333
3334 #[tokio::test]
3335 async fn test_mcp_server_access() -> Result<()> {
3336 let config = create_test_node_config();
3337 let node = P2PNode::new(config).await?;
3338
3339 assert!(node.mcp_server().is_some());
3341
3342 let mut config = create_test_node_config();
3344 config.enable_mcp_server = false;
3345 let node_no_mcp = P2PNode::new(config).await?;
3346 assert!(node_no_mcp.mcp_server().is_none());
3347
3348 Ok(())
3349 }
3350
3351 #[tokio::test]
3352 async fn test_dht_access() -> Result<()> {
3353 let config = create_test_node_config();
3354 let node = P2PNode::new(config).await?;
3355
3356 assert!(node.dht().is_some());
3358
3359 Ok(())
3360 }
3361
3362 #[tokio::test]
3363 async fn test_node_builder() -> Result<()> {
3364 let node = P2PNode::builder()
3365 .with_peer_id("builder_test_peer".to_string())
3366 .listen_on("/ip4/127.0.0.1/tcp/9100")
3367 .listen_on("/ip6/::1/tcp/9100")
3368 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9101")
3369 .with_ipv6(true)
3370 .with_mcp_server()
3371 .with_connection_timeout(Duration::from_secs(15))
3372 .with_max_connections(200)
3373 .build()
3374 .await?;
3375
3376 assert_eq!(node.peer_id(), "builder_test_peer");
3377 let config = node.config();
3378 assert_eq!(config.listen_addrs.len(), 4); assert_eq!(config.bootstrap_peers.len(), 1);
3380 assert!(config.enable_ipv6);
3381 assert!(config.enable_mcp_server);
3382 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3383 assert_eq!(config.max_connections, 200);
3384
3385 Ok(())
3386 }
3387
3388 #[tokio::test]
3389 async fn test_node_builder_with_mcp_config() -> Result<()> {
3390 let mcp_config = MCPServerConfig {
3391 server_name: "test_mcp_server".to_string(),
3392 server_version: "1.0.0".to_string(),
3393 enable_dht_discovery: false,
3394 enable_auth: false,
3395 ..MCPServerConfig::default()
3396 };
3397
3398 let node = P2PNode::builder()
3399 .with_peer_id("mcp_config_test".to_string())
3400 .with_mcp_config(mcp_config.clone())
3401 .build()
3402 .await?;
3403
3404 assert_eq!(node.peer_id(), "mcp_config_test");
3405 let config = node.config();
3406 assert!(config.enable_mcp_server);
3407 assert!(config.mcp_server_config.is_some());
3408
3409 let node_mcp_config = config
3410 .mcp_server_config
3411 .as_ref()
3412 .expect("MCP server config should be present in test config");
3413 assert_eq!(node_mcp_config.server_name, "test_mcp_server");
3414 assert!(!node_mcp_config.enable_auth);
3415
3416 Ok(())
3417 }
3418
3419 #[tokio::test]
3420 async fn test_mcp_server_not_enabled_errors() -> Result<()> {
3421 let mut config = create_test_node_config();
3422 config.enable_mcp_server = false;
3423 let node = P2PNode::new(config).await?;
3424
3425 let tool = create_test_tool("test_tool");
3427 let result = node.register_mcp_tool(tool).await;
3428 assert!(result.is_err());
3429 assert!(
3430 result
3431 .unwrap_err()
3432 .to_string()
3433 .contains("MCP server not enabled")
3434 );
3435
3436 let result = node.call_mcp_tool("test_tool", json!({})).await;
3437 assert!(result.is_err());
3438 assert!(
3439 result
3440 .unwrap_err()
3441 .to_string()
3442 .contains("MCP server not enabled")
3443 );
3444
3445 let result = node.list_mcp_tools().await;
3446 assert!(result.is_err());
3447 assert!(
3448 result
3449 .unwrap_err()
3450 .to_string()
3451 .contains("MCP server not enabled")
3452 );
3453
3454 let result = node.mcp_stats().await;
3455 assert!(result.is_err());
3456 assert!(
3457 result
3458 .unwrap_err()
3459 .to_string()
3460 .contains("MCP server not enabled")
3461 );
3462
3463 Ok(())
3464 }
3465
3466 #[tokio::test]
3467 async fn test_bootstrap_peers() -> Result<()> {
3468 let mut config = create_test_node_config();
3469 config.bootstrap_peers = vec![
3470 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3471 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3472 ];
3473
3474 let node = P2PNode::new(config).await?;
3475
3476 node.start().await?;
3478
3479 let peer_count = node.peer_count().await;
3482 assert!(
3483 peer_count <= 2,
3484 "Peer count should not exceed bootstrap peer count"
3485 );
3486
3487 node.stop().await?;
3488 Ok(())
3489 }
3490
3491 #[tokio::test]
3492 async fn test_production_mode_disabled() -> Result<()> {
3493 let config = create_test_node_config();
3494 let node = P2PNode::new(config).await?;
3495
3496 assert!(!node.is_production_mode());
3497 assert!(node.production_config().is_none());
3498
3499 let result = node.resource_metrics().await;
3501 assert!(result.is_err());
3502 assert!(result.unwrap_err().to_string().contains("not enabled"));
3503
3504 Ok(())
3505 }
3506
3507 #[tokio::test]
3508 async fn test_network_event_variants() {
3509 let peer_id = "test_peer".to_string();
3511 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3512
3513 let _peer_connected = NetworkEvent::PeerConnected {
3514 peer_id: peer_id.clone(),
3515 addresses: vec![address.clone()],
3516 };
3517
3518 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3519 peer_id: peer_id.clone(),
3520 reason: "test disconnect".to_string(),
3521 };
3522
3523 let _message_received = NetworkEvent::MessageReceived {
3524 peer_id: peer_id.clone(),
3525 protocol: "test-protocol".to_string(),
3526 data: vec![1, 2, 3],
3527 };
3528
3529 let _connection_failed = NetworkEvent::ConnectionFailed {
3530 peer_id: Some(peer_id.clone()),
3531 address: address.clone(),
3532 error: "connection refused".to_string(),
3533 };
3534
3535 let _dht_stored = NetworkEvent::DHTRecordStored {
3536 key: vec![1, 2, 3],
3537 value: vec![4, 5, 6],
3538 };
3539
3540 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3541 key: vec![1, 2, 3],
3542 value: Some(vec![4, 5, 6]),
3543 };
3544 }
3545
3546 #[tokio::test]
3547 async fn test_peer_info_structure() {
3548 let peer_info = PeerInfo {
3549 peer_id: "test_peer".to_string(),
3550 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3551 connected_at: Instant::now(),
3552 last_seen: Instant::now(),
3553 status: ConnectionStatus::Connected,
3554 protocols: vec!["test-protocol".to_string()],
3555 heartbeat_count: 0,
3556 };
3557
3558 assert_eq!(peer_info.peer_id, "test_peer");
3559 assert_eq!(peer_info.addresses.len(), 1);
3560 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3561 assert_eq!(peer_info.protocols.len(), 1);
3562 }
3563
3564 #[tokio::test]
3565 async fn test_serialization() -> Result<()> {
3566 let config = create_test_node_config();
3568 let serialized = serde_json::to_string(&config)?;
3569 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3570
3571 assert_eq!(config.peer_id, deserialized.peer_id);
3572 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3573 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3574
3575 Ok(())
3576 }
3577}