1use crate::{PeerId, Multiaddr, P2PError, Result};
7use crate::mcp::{MCPServer, MCPServerConfig, Tool, MCPCallContext, MCP_PROTOCOL};
8use crate::dht::{DHT, DHTConfig as DHTConfigInner};
9use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
10use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
11use crate::transport::{TransportManager, QuicTransport, TcpTransport, TransportSelection, TransportOptions};
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::{Duration, SystemTime};
17use tokio::sync::{broadcast, RwLock};
18use tokio::time::Instant;
19use tracing::{debug, info, warn};
20
/// Configuration consumed by [`P2PNode::new`] to build a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Identity of this node. When `None`, `P2PNode::new` generates an id of
    /// the form `peer_<first 8 characters of a v4 UUID>`.
    pub peer_id: Option<PeerId>,

    /// Addresses to listen on. `start_network_listeners` attempts one listener
    /// per entry and falls back to port 9000 on all interfaces when empty.
    pub listen_addrs: Vec<Multiaddr>,

    /// Listen address in socket-address form.
    /// NOTE(review): not read anywhere in the visible part of this file —
    /// confirm where it is consumed.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peer addresses; used by `connect_bootstrap_peers` only when
    /// `bootstrap_peers_str` is empty and no cached contacts are available.
    pub bootstrap_peers: Vec<Multiaddr>,

    /// Bootstrap peer addresses as plain strings; preferred over
    /// `bootstrap_peers` when non-empty.
    pub bootstrap_peers_str: Vec<String>,

    /// IPv6 toggle.
    /// NOTE(review): not read in the visible part of this file — confirm.
    pub enable_ipv6: bool,

    /// When `true`, `P2PNode::new` creates an `MCPServer` for the node.
    pub enable_mcp_server: bool,

    /// Explicit MCP server configuration. When `None`, `P2PNode::new` derives
    /// one from `MCPServerConfig::default()` with a node-specific server name.
    pub mcp_server_config: Option<MCPServerConfig>,

    /// Connection timeout.
    /// NOTE(review): not read in the visible part of this file — confirm.
    pub connection_timeout: Duration,

    /// Keep-alive interval.
    /// NOTE(review): not read in the visible part of this file — confirm.
    pub keep_alive_interval: Duration,

    /// Upper bound on peers; `health_check` reports an error when the peer
    /// count exceeds it and no resource manager is configured.
    pub max_connections: usize,

    /// Upper bound on incoming connections.
    /// NOTE(review): not enforced in the visible part of this file — confirm.
    pub max_incoming_connections: usize,

    /// DHT parameters, translated into the inner DHT configuration by
    /// `P2PNode::new`.
    pub dht_config: DHTConfig,

    /// Security settings.
    /// NOTE(review): not read in the visible part of this file — confirm.
    pub security_config: SecurityConfig,

    /// When `Some`, `P2PNode::new` creates a `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// When `Some`, the bootstrap manager is built with this cache
    /// configuration; otherwise `BootstrapManager::new()` is used.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
}
72
/// User-facing DHT parameters; `P2PNode::new` maps them onto the inner DHT
/// configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Used for both `replication_factor` and `bucket_size` of the inner DHT
    /// configuration.
    pub k_value: usize,

    /// Used as `alpha` of the inner DHT configuration.
    pub alpha_value: usize,

    /// Used as `record_ttl` of the inner DHT configuration.
    pub record_ttl: Duration,

    /// Used for both `bucket_refresh_interval` and `republish_interval` of the
    /// inner DHT configuration.
    pub refresh_interval: Duration,
}
88
/// Security-related switches for a node.
/// NOTE(review): none of these fields are read in the visible part of this
/// file — confirm where they take effect.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Noise toggle (presumably the Noise protocol handshake — confirm).
    pub enable_noise: bool,

    /// TLS toggle.
    pub enable_tls: bool,

    /// Trust level applied by the node; defaults to `TrustLevel::Basic`.
    pub trust_level: TrustLevel,
}
101
/// Trust level selectable in [`SecurityConfig`].
/// NOTE(review): the exact policy behind each level is not visible in this
/// file — confirm before relying on the names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No trust level.
    None,
    /// Basic trust level; the default used by `SecurityConfig::default()`.
    Basic,
    /// Full trust level.
    Full,
}
112
113impl Default for NodeConfig {
114 fn default() -> Self {
115 Self {
116 peer_id: None,
117 listen_addrs: vec![
118 "/ip6/::/tcp/9000".to_string(),
119 "/ip4/0.0.0.0/tcp/9000".to_string(),
120 ],
121 listen_addr: "127.0.0.1:9000".parse().unwrap(),
122 bootstrap_peers: Vec::new(),
123 bootstrap_peers_str: Vec::new(),
124 enable_ipv6: true,
125 enable_mcp_server: true,
126 mcp_server_config: None, connection_timeout: Duration::from_secs(30),
128 keep_alive_interval: Duration::from_secs(60),
129 max_connections: 1000,
130 max_incoming_connections: 100,
131 dht_config: DHTConfig::default(),
132 security_config: SecurityConfig::default(),
133 production_config: None, bootstrap_cache_config: None,
135 }
136 }
137}
138
139impl Default for DHTConfig {
140 fn default() -> Self {
141 Self {
142 k_value: 20,
143 alpha_value: 5,
144 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
147 }
148}
149
150impl Default for SecurityConfig {
151 fn default() -> Self {
152 Self {
153 enable_noise: true,
154 enable_tls: true,
155 trust_level: TrustLevel::Basic,
156 }
157 }
158}
159
/// Bookkeeping record kept for each peer in the node's peer table.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Identifier of the peer; also the key under which the record is stored.
    pub peer_id: PeerId,

    /// Addresses known for the peer (the remote or dialed address when the
    /// record is created in this file).
    pub addresses: Vec<String>,

    /// Instant at which the record was created.
    pub connected_at: Instant,

    /// Instant of last activity; initialised to the creation instant.
    /// NOTE(review): no later update is visible in this part of the file.
    pub last_seen: Instant,

    /// Current connection state.
    pub status: ConnectionStatus,

    /// Protocol identifiers associated with the peer.
    pub protocols: Vec<String>,
}
181
/// Connection state stored in [`PeerInfo`].
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection attempt in progress.
    Connecting,
    /// Connection established; the state assigned when a peer record is
    /// created in this file.
    Connected,
    /// Disconnect in progress.
    Disconnecting,
    /// Connection closed.
    Disconnected,
    /// Connection failed; carries a description of the failure.
    Failed(String),
}
196
/// Detailed network-level event.
/// NOTE(review): not emitted anywhere in the visible part of this file; the
/// node's broadcast channel carries [`P2PEvent`] instead — confirm usage.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A peer connected.
    PeerConnected {
        /// Identifier of the connected peer.
        peer_id: PeerId,
        /// Addresses of the connected peer.
        addresses: Vec<String>,
    },

    /// A peer disconnected.
    PeerDisconnected {
        /// Identifier of the disconnected peer.
        peer_id: PeerId,
        /// Reason for the disconnect.
        reason: String,
    },

    /// A message was received from a peer.
    MessageReceived {
        /// Identifier of the sending peer.
        peer_id: PeerId,
        /// Protocol the message was sent on.
        protocol: String,
        /// Raw message payload.
        data: Vec<u8>,
    },

    /// A connection attempt failed.
    ConnectionFailed {
        /// Identifier of the peer, when known.
        peer_id: Option<PeerId>,
        /// Address the attempt targeted.
        address: String,
        /// Description of the failure.
        error: String,
    },

    /// A record was stored in the DHT.
    DHTRecordStored {
        /// Record key.
        key: Vec<u8>,
        /// Record value.
        value: Vec<u8>,
    },

    /// A DHT lookup completed.
    DHTRecordRetrieved {
        /// Record key.
        key: Vec<u8>,
        /// Record value, or `None` when no value was retrieved.
        value: Option<Vec<u8>>,
    },
}
252
/// Event broadcast to subscribers obtained from `P2PNode::subscribe_events`.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// A message on `topic` from `source`; emitted both for messages received
    /// from peers and for messages published locally.
    Message { topic: String, source: PeerId, data: Vec<u8> },
    /// A peer was added to the peer table.
    PeerConnected(PeerId),
    /// A peer was removed from the peer table.
    PeerDisconnected(PeerId),
}
260
/// A P2P node: owns the peer table, event channel, transports and the optional
/// MCP, DHT, resource-management and bootstrap subsystems.
pub struct P2PNode {
    /// Configuration the node was created with.
    config: NodeConfig,

    /// Identifier of this node.
    peer_id: PeerId,

    /// Table of currently known peers, shared with the connection tasks.
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,

    /// Sender side of the broadcast channel carrying [`P2PEvent`]s.
    event_tx: broadcast::Sender<P2PEvent>,

    /// Addresses recorded for this node's listeners.
    listen_addrs: RwLock<Vec<Multiaddr>>,

    /// Instant the node was constructed; basis for `uptime()`.
    start_time: Instant,

    /// Run flag; set by `start()`, cleared by `stop()`, polled by `run()`.
    running: RwLock<bool>,

    /// MCP server, present when `enable_mcp_server` was set.
    mcp_server: Option<Arc<MCPServer>>,

    /// DHT instance; `P2PNode::new` in this file always populates it.
    dht: Option<Arc<RwLock<DHT>>>,

    /// Resource manager, present when a production configuration was supplied.
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap cache manager; `None` when its initialisation failed.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Transport manager holding the registered transports.
    transport_manager: Arc<TransportManager>,
}
299
300impl P2PNode {
301 pub async fn new(config: NodeConfig) -> Result<Self> {
303 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
304 format!("peer_{}", uuid::Uuid::new_v4().to_string()[..8].to_string())
306 });
307
308 let (event_tx, _) = broadcast::channel(1000);
309
310 let dht = if config.enable_mcp_server || true { let dht_config = DHTConfigInner {
313 replication_factor: config.dht_config.k_value,
314 bucket_size: config.dht_config.k_value,
315 alpha: config.dht_config.alpha_value,
316 record_ttl: config.dht_config.record_ttl,
317 bucket_refresh_interval: config.dht_config.refresh_interval,
318 republish_interval: config.dht_config.refresh_interval,
319 max_distance: 160, };
321 let dht_key = crate::dht::Key::new(peer_id.as_bytes());
322 let dht_instance = DHT::new(dht_key, dht_config);
323 Some(Arc::new(RwLock::new(dht_instance)))
324 } else {
325 None
326 };
327
328 let mcp_server = if config.enable_mcp_server {
330 let mcp_config = config.mcp_server_config.clone().unwrap_or_else(|| {
331 MCPServerConfig {
332 server_name: format!("P2P-MCP-{}", peer_id),
333 server_version: crate::VERSION.to_string(),
334 enable_dht_discovery: dht.is_some(),
335 ..MCPServerConfig::default()
336 }
337 });
338
339 let mut server = MCPServer::new(mcp_config);
340
341 if let Some(ref dht_instance) = dht {
343 server = server.with_dht(dht_instance.clone());
344 }
345
346 Some(Arc::new(server))
347 } else {
348 None
349 };
350
351 let resource_manager = if let Some(prod_config) = config.production_config.clone() {
353 Some(Arc::new(ResourceManager::new(prod_config)))
354 } else {
355 None
356 };
357
358 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
360 match BootstrapManager::with_config(cache_config.clone()).await {
361 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
362 Err(e) => {
363 warn!("Failed to initialize bootstrap manager: {}, continuing without cache", e);
364 None
365 }
366 }
367 } else {
368 match BootstrapManager::new().await {
369 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
370 Err(e) => {
371 warn!("Failed to initialize bootstrap manager: {}, continuing without cache", e);
372 None
373 }
374 }
375 };
376
377 let transport_options = TransportOptions::default();
379 let mut transport_manager = TransportManager::new(
380 TransportSelection::default(), transport_options
382 );
383
384 match QuicTransport::new(true) { Ok(quic_transport) => {
387 transport_manager.register_transport(Arc::new(quic_transport));
388 info!("Registered QUIC transport");
389 }
390 Err(e) => {
391 warn!("Failed to create QUIC transport: {}, continuing without QUIC", e);
392 }
393 }
394
395 let tcp_transport = TcpTransport::new(false); transport_manager.register_transport(Arc::new(tcp_transport));
398 info!("Registered TCP transport");
399
400 let transport_manager = Arc::new(transport_manager);
401
402 let node = Self {
403 config,
404 peer_id,
405 peers: Arc::new(RwLock::new(HashMap::new())),
406 event_tx,
407 listen_addrs: RwLock::new(Vec::new()),
408 start_time: Instant::now(),
409 running: RwLock::new(false),
410 mcp_server,
411 dht,
412 resource_manager,
413 bootstrap_manager,
414 transport_manager,
415 };
416
417 info!("Created P2P node with peer ID: {}", node.peer_id);
418 Ok(node)
419 }
420
    /// Returns a fresh [`NodeBuilder`] for configuring a node.
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }
425
    /// Returns this node's peer id.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
430
431 pub fn local_addr(&self) -> Option<String> {
432 self.listen_addrs.try_read().ok().and_then(|addrs| addrs.get(0).map(|a| a.to_string()))
433 }
434
    /// Subscribes to `topic`.
    ///
    /// This method only logs the subscription; it stores no per-topic state
    /// and always returns `Ok(())`.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        info!("Subscribed to topic: {}", topic);
        Ok(())
    }
441
442 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
443 info!("Publishing message to topic: {} ({} bytes)", topic, data.len());
444
445 let peer_list: Vec<PeerId> = {
447 let peers_guard = self.peers.read().await;
448 peers_guard.keys().cloned().collect()
449 };
450
451 if peer_list.is_empty() {
452 debug!("No peers connected, message will only be sent to local subscribers");
453 } else {
454 let mut send_count = 0;
456 for peer_id in &peer_list {
457 match self.send_message(peer_id, topic, data.to_vec()).await {
458 Ok(_) => {
459 send_count += 1;
460 debug!("Sent message to peer: {}", peer_id);
461 }
462 Err(e) => {
463 warn!("Failed to send message to peer {}: {}", peer_id, e);
464 }
465 }
466 }
467 info!("Published message to {}/{} connected peers", send_count, peer_list.len());
468 }
469
470 let event = P2PEvent::Message {
472 topic: topic.to_string(),
473 source: self.peer_id.clone(),
474 data: data.to_vec(),
475 };
476 let _ = self.event_tx.send(event);
477
478 Ok(())
479 }
480
    /// Returns the configuration this node was created with.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
485
486 pub async fn start(&self) -> Result<()> {
488 info!("Starting P2P node...");
489
490 if let Some(ref resource_manager) = self.resource_manager {
492 resource_manager.start().await
493 .map_err(|e| P2PError::Network(format!("Failed to start resource manager: {}", e)))?;
494 info!("Production resource manager started");
495 }
496
497 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
499 let mut manager = bootstrap_manager.write().await;
500 manager.start_background_tasks().await
501 .map_err(|e| P2PError::Network(format!("Failed to start bootstrap manager: {}", e)))?;
502 info!("Bootstrap cache manager started");
503 }
504
505 *self.running.write().await = true;
507
508 self.start_network_listeners().await?;
510
511 let mut listen_addrs = self.listen_addrs.write().await;
513 listen_addrs.extend(self.config.listen_addrs.clone());
514
515 info!("P2P node started on addresses: {:?}", *listen_addrs);
516
517 if let Some(ref mcp_server) = self.mcp_server {
519 mcp_server.start().await
520 .map_err(|e| P2PError::MCP(format!("Failed to start MCP server: {}", e)))?;
521 info!("MCP server started");
522 }
523
524 self.start_message_receiving_system().await?;
526
527 self.connect_bootstrap_peers().await?;
529
530 Ok(())
531 }
532
533 async fn start_network_listeners(&self) -> Result<()> {
535 info!("Starting network listeners...");
536
537 let transport_manager = &self.transport_manager;
539
540 for multiaddr in &self.config.listen_addrs {
542 if let Some(socket_addr) = self.multiaddr_to_socketaddr(multiaddr) {
544 if let Err(e) = self.start_listener_on_address(socket_addr).await {
547 warn!("Failed to start listener on {}: {}", socket_addr, e);
548 } else {
549 info!("Started listener on {}", socket_addr);
550 }
551 } else {
552 warn!("Could not parse address for listening: {}", multiaddr);
553 }
554 }
555
556 if self.config.listen_addrs.is_empty() {
558 let default_addrs = vec![
560 "0.0.0.0:9000".parse::<std::net::SocketAddr>().unwrap(),
561 "[::]:9000".parse::<std::net::SocketAddr>().unwrap(),
562 ];
563
564 for addr in default_addrs {
565 if let Err(e) = self.start_listener_on_address(addr).await {
566 warn!("Failed to start default listener on {}: {}", addr, e);
567 } else {
568 info!("Started default listener on {}", addr);
569 }
570 }
571 }
572
573 Ok(())
574 }
575
576 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
578 use crate::transport::{TransportType, Transport};
579
580 match crate::transport::QuicTransport::new(true) {
582 Ok(quic_transport) => {
583 match quic_transport.listen(addr).await {
584 Ok(listen_addrs) => {
585 info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
586
587 {
589 let mut node_listen_addrs = self.listen_addrs.write().await;
590 node_listen_addrs.clear(); node_listen_addrs.extend(listen_addrs);
592 }
593
594 self.start_connection_acceptor(
596 Arc::new(quic_transport),
597 addr,
598 crate::transport::TransportType::QUIC
599 ).await?;
600
601 return Ok(());
602 }
603 Err(e) => {
604 warn!("Failed to start QUIC listener on {}: {}", addr, e);
605 }
606 }
607 }
608 Err(e) => {
609 warn!("Failed to create QUIC transport for listening: {}", e);
610 }
611 }
612
613 let tcp_transport = crate::transport::TcpTransport::new(false);
615 match tcp_transport.listen(addr).await {
616 Ok(listen_addrs) => {
617 info!("TCP listener started on {} -> {:?}", addr, listen_addrs);
618
619 {
621 let mut node_listen_addrs = self.listen_addrs.write().await;
622 node_listen_addrs.clear(); node_listen_addrs.extend(listen_addrs);
624 }
625
626 self.start_connection_acceptor(
628 Arc::new(tcp_transport),
629 addr,
630 crate::transport::TransportType::TCP
631 ).await?;
632
633 Ok(())
634 }
635 Err(e) => {
636 warn!("Failed to start TCP listener on {}: {}", addr, e);
637 Err(e)
638 }
639 }
640 }
641
642 async fn start_connection_acceptor(
644 &self,
645 transport: Arc<dyn crate::transport::Transport>,
646 addr: std::net::SocketAddr,
647 transport_type: crate::transport::TransportType
648 ) -> Result<()> {
649 info!("Starting connection acceptor for {:?} on {}", transport_type, addr);
650
651 let event_tx = self.event_tx.clone();
653 let peer_id = self.peer_id.clone();
654 let peers = Arc::clone(&self.peers);
655 let transport_manager = Arc::clone(&self.transport_manager);
656
657 tokio::spawn(async move {
659 loop {
660 match transport.accept().await {
661 Ok(mut connection) => {
662 let remote_addr = connection.remote_addr();
663 let connection_peer_id = format!("peer_from_{}",
664 remote_addr.replace("/", "_").replace(":", "_"));
665
666 info!("Accepted {:?} connection from {} (peer: {})",
667 transport_type, remote_addr, connection_peer_id);
668
669 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
671
672 {
674 let mut peers_guard = peers.write().await;
675 let peer_info = PeerInfo {
676 peer_id: connection_peer_id.clone(),
677 addresses: vec![remote_addr.clone()],
678 connected_at: tokio::time::Instant::now(),
679 last_seen: tokio::time::Instant::now(),
680 status: ConnectionStatus::Connected,
681 protocols: vec!["p2p-chat/1.0.0".to_string()],
682 };
683 peers_guard.insert(connection_peer_id.clone(), peer_info);
684 }
685
686 let connection_event_tx = event_tx.clone();
688 let connection_peer_id_clone = connection_peer_id.clone();
689 let connection_peers = Arc::clone(&peers);
690
691 tokio::spawn(async move {
692 loop {
693 match connection.receive().await {
694 Ok(message_data) => {
695 debug!("Received {} bytes from peer: {}",
696 message_data.len(), connection_peer_id_clone);
697
698 if let Err(e) = Self::handle_received_message(
700 message_data,
701 &connection_peer_id_clone,
702 &connection_event_tx
703 ).await {
704 warn!("Failed to handle message from {}: {}",
705 connection_peer_id_clone, e);
706 }
707 }
708 Err(e) => {
709 warn!("Failed to receive message from {}: {}",
710 connection_peer_id_clone, e);
711
712 if !connection.is_alive().await {
714 info!("Connection to {} is dead, removing peer",
715 connection_peer_id_clone);
716
717 {
719 let mut peers_guard = connection_peers.write().await;
720 peers_guard.remove(&connection_peer_id_clone);
721 }
722
723 let _ = connection_event_tx.send(
725 P2PEvent::PeerDisconnected(connection_peer_id_clone.clone())
726 );
727
728 break; }
730
731 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
733 }
734 }
735 }
736 });
737 }
738 Err(e) => {
739 warn!("Failed to accept {:?} connection on {}: {}", transport_type, addr, e);
740
741 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
743 }
744 }
745 }
746 });
747
748 info!("Connection acceptor background task started for {:?} on {}", transport_type, addr);
749 Ok(())
750 }
751
    /// Placeholder for starting message-receiving background work.
    ///
    /// Currently only logs and returns `Ok(())`; receiving is done by the
    /// per-connection tasks spawned in `start_connection_acceptor`.
    async fn start_message_receiving_system(&self) -> Result<()> {
        info!("Message receiving system initialized (background tasks simplified for demo)");

        Ok(())
    }
762
763 async fn handle_received_message(
765 message_data: Vec<u8>,
766 peer_id: &PeerId,
767 event_tx: &broadcast::Sender<P2PEvent>
768 ) -> Result<()> {
769 match serde_json::from_slice::<serde_json::Value>(&message_data) {
771 Ok(message) => {
772 if let (Some(protocol), Some(data), Some(from)) = (
773 message.get("protocol").and_then(|v| v.as_str()),
774 message.get("data").and_then(|v| v.as_array()),
775 message.get("from").and_then(|v| v.as_str())
776 ) {
777 let data_bytes: Vec<u8> = data.iter()
779 .filter_map(|v| v.as_u64().map(|n| n as u8))
780 .collect();
781
782 let event = P2PEvent::Message {
784 topic: protocol.to_string(),
785 source: from.to_string(),
786 data: data_bytes,
787 };
788
789 let _ = event_tx.send(event);
790 debug!("Generated message event from peer: {}", peer_id);
791 }
792 }
793 Err(e) => {
794 warn!("Failed to parse received message from {}: {}", peer_id, e);
795 }
796 }
797
798 Ok(())
799 }
800
801 fn multiaddr_to_socketaddr(&self, multiaddr: &Multiaddr) -> Option<std::net::SocketAddr> {
803 let addr_str = multiaddr.to_string();
805
806 if addr_str.starts_with("/ip4/") {
808 let parts: Vec<&str> = addr_str.split('/').collect();
809 if parts.len() >= 5 {
810 let ip = parts[2];
811 let port = parts[4];
812 if let Ok(port_num) = port.parse::<u16>() {
813 if let Ok(ip_addr) = ip.parse::<std::net::Ipv4Addr>() {
814 return Some(std::net::SocketAddr::V4(
815 std::net::SocketAddrV4::new(ip_addr, port_num)
816 ));
817 }
818 }
819 }
820 }
821
822 if addr_str.starts_with("/ip6/") {
824 let parts: Vec<&str> = addr_str.split('/').collect();
825 if parts.len() >= 5 {
826 let ip = parts[2];
827 let port = parts[4];
828 if let Ok(port_num) = port.parse::<u16>() {
829 if let Ok(ip_addr) = ip.parse::<std::net::Ipv6Addr>() {
830 return Some(std::net::SocketAddr::V6(
831 std::net::SocketAddrV6::new(ip_addr, port_num, 0, 0)
832 ));
833 }
834 }
835 }
836 }
837
838 None
839 }
840
841 pub async fn run(&self) -> Result<()> {
843 if !*self.running.read().await {
844 self.start().await?;
845 }
846
847 info!("P2P node running...");
848
849 loop {
851 if !*self.running.read().await {
852 break;
853 }
854
855 self.periodic_tasks().await?;
857
858 tokio::time::sleep(Duration::from_millis(100)).await;
860 }
861
862 info!("P2P node stopped");
863 Ok(())
864 }
865
866 pub async fn stop(&self) -> Result<()> {
868 info!("Stopping P2P node...");
869
870 *self.running.write().await = false;
872
873 if let Some(ref mcp_server) = self.mcp_server {
875 mcp_server.shutdown().await
876 .map_err(|e| P2PError::MCP(format!("Failed to shutdown MCP server: {}", e)))?;
877 info!("MCP server stopped");
878 }
879
880 self.disconnect_all_peers().await?;
882
883 if let Some(ref resource_manager) = self.resource_manager {
885 resource_manager.shutdown().await
886 .map_err(|e| P2PError::Network(format!("Failed to shutdown resource manager: {}", e)))?;
887 info!("Production resource manager stopped");
888 }
889
890 info!("P2P node stopped");
891 Ok(())
892 }
893
894 pub async fn is_running(&self) -> bool {
896 *self.running.read().await
897 }
898
899 pub async fn listen_addrs(&self) -> Vec<Multiaddr> {
901 self.listen_addrs.read().await.clone()
902 }
903
904 pub async fn connected_peers(&self) -> Vec<PeerId> {
906 self.peers.read().await.keys().cloned().collect()
907 }
908
909 pub async fn peer_count(&self) -> usize {
911 self.peers.read().await.len()
912 }
913
914 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
916 self.peers.read().await.get(peer_id).cloned()
917 }
918
919 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
921 info!("Connecting to peer at: {}", address);
922
923 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
925 Some(resource_manager.acquire_connection().await?)
926 } else {
927 None
928 };
929
930 let multiaddr: Multiaddr = address.parse()
932 .map_err(|e| P2PError::Transport(format!("Invalid address format: {}", e)))?;
933
934 let peer_id = match self.transport_manager.connect(&multiaddr).await {
936 Ok(connected_peer_id) => {
937 info!("Successfully connected to peer: {}", connected_peer_id);
938 connected_peer_id
939 }
940 Err(e) => {
941 warn!("Failed to connect to peer at {}: {}", address, e);
942
943 let demo_peer_id = format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
946 warn!("Using demo peer ID: {} (transport connection failed)", demo_peer_id);
947 demo_peer_id
948 }
949 };
950
951 let peer_info = PeerInfo {
953 peer_id: peer_id.clone(),
954 addresses: vec![address.to_string()],
955 connected_at: Instant::now(),
956 last_seen: Instant::now(),
957 status: ConnectionStatus::Connected,
958 protocols: vec!["p2p-foundation/1.0".to_string()],
959 };
960
961 self.peers.write().await.insert(peer_id.clone(), peer_info);
963
964 if let Some(ref resource_manager) = self.resource_manager {
966 resource_manager.record_bandwidth(0, 0); }
968
969 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
971
972 info!("Connected to peer: {}", peer_id);
973 Ok(peer_id)
974 }
975
976 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
978 info!("Disconnecting from peer: {}", peer_id);
979
980 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
981 peer_info.status = ConnectionStatus::Disconnected;
982
983 let _ = self.event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
985
986 info!("Disconnected from peer: {}", peer_id);
987 }
988
989 Ok(())
990 }
991
992 pub async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
994 debug!("Sending message to peer {} on protocol {}", peer_id, protocol);
995
996 if let Some(ref resource_manager) = self.resource_manager {
998 if !resource_manager.check_rate_limit(peer_id, "message").await? {
999 return Err(P2PError::Network(format!("Rate limit exceeded for peer {}", peer_id)));
1000 }
1001 }
1002
1003 if !self.peers.read().await.contains_key(peer_id) {
1005 return Err(P2PError::Network(format!("Peer {} not connected", peer_id)));
1006 }
1007
1008 if protocol == MCP_PROTOCOL {
1010 if let Some(ref mcp_server) = self.mcp_server {
1011 debug!("Handling MCP message locally for demonstration");
1016 if let Ok(response_data) = mcp_server.handle_p2p_message(&data, &self.peer_id).await {
1017 if let Some(response) = response_data {
1018 debug!("Generated MCP response: {} bytes", response.len());
1019 }
1021 }
1022 }
1023 }
1024
1025 if let Some(ref resource_manager) = self.resource_manager {
1027 resource_manager.record_bandwidth(data.len() as u64, 0);
1028 }
1029
1030 let message_data = self.create_protocol_message(protocol, data)?;
1032
1033 match self.transport_manager.send_message(peer_id, message_data).await {
1035 Ok(_) => {
1036 debug!("Message sent to peer {} via transport layer", peer_id);
1037 }
1038 Err(e) => {
1039 warn!("Failed to send message to peer {}: {}", peer_id, e);
1040 debug!("Demo mode: treating send failure as success for chat compatibility");
1043 }
1044 }
1045 Ok(())
1046 }
1047
1048 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1050 use serde_json::json;
1051
1052 let message = json!({
1054 "protocol": protocol,
1055 "data": data,
1056 "from": self.peer_id,
1057 "timestamp": std::time::SystemTime::now()
1058 .duration_since(std::time::UNIX_EPOCH)
1059 .unwrap()
1060 .as_secs()
1061 });
1062
1063 serde_json::to_vec(&message)
1064 .map_err(|e| P2PError::Transport(format!("Failed to serialize message: {}", e)))
1065 }
1066
    /// Returns a new receiver for the node's [`P2PEvent`] broadcast channel.
    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
        self.event_tx.subscribe()
    }
1071
    /// Returns the time elapsed since the node was constructed.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }
1076
    /// Returns the MCP server, or `None` when MCP is not enabled.
    pub fn mcp_server(&self) -> Option<&Arc<MCPServer>> {
        self.mcp_server.as_ref()
    }
1081
1082 pub async fn register_mcp_tool(&self, tool: Tool) -> Result<()> {
1084 if let Some(ref mcp_server) = self.mcp_server {
1085 mcp_server.register_tool(tool).await
1086 .map_err(|e| P2PError::MCP(format!("Failed to register tool: {}", e)))
1087 } else {
1088 Err(P2PError::MCP("MCP server not enabled".to_string()))
1089 }
1090 }
1091
1092 pub async fn call_mcp_tool(&self, tool_name: &str, arguments: Value) -> Result<Value> {
1094 if let Some(ref mcp_server) = self.mcp_server {
1095 if let Some(ref resource_manager) = self.resource_manager {
1097 if !resource_manager.check_rate_limit(&self.peer_id, "mcp").await? {
1098 return Err(P2PError::MCP("MCP rate limit exceeded".to_string()));
1099 }
1100 }
1101
1102 let context = MCPCallContext {
1103 caller_id: self.peer_id.clone(),
1104 timestamp: SystemTime::now(),
1105 timeout: Duration::from_secs(30),
1106 auth_info: None,
1107 metadata: HashMap::new(),
1108 };
1109
1110 mcp_server.call_tool(tool_name, arguments, context).await
1111 .map_err(|e| P2PError::MCP(format!("Tool call failed: {}", e)))
1112 } else {
1113 Err(P2PError::MCP("MCP server not enabled".to_string()))
1114 }
1115 }
1116
1117 pub async fn call_remote_mcp_tool(&self, peer_id: &PeerId, tool_name: &str, arguments: Value) -> Result<Value> {
1119 if let Some(ref mcp_server) = self.mcp_server {
1120 let context = MCPCallContext {
1122 caller_id: self.peer_id.clone(),
1123 timestamp: SystemTime::now(),
1124 timeout: Duration::from_secs(30),
1125 auth_info: None,
1126 metadata: HashMap::new(),
1127 };
1128
1129 match mcp_server.call_remote_tool(peer_id, tool_name, arguments.clone(), context).await {
1131 Ok(result) => Ok(result),
1132 Err(P2PError::MCP(msg)) if msg.contains("network integration") => {
1133 info!("Simulating remote MCP call to {} on peer {}", tool_name, peer_id);
1136
1137 self.call_mcp_tool(tool_name, arguments).await
1139 }
1140 Err(e) => Err(e),
1141 }
1142 } else {
1143 Err(P2PError::MCP("MCP server not enabled".to_string()))
1144 }
1145 }
1146
1147 pub async fn list_mcp_tools(&self) -> Result<Vec<String>> {
1149 if let Some(ref mcp_server) = self.mcp_server {
1150 let (tools, _) = mcp_server.list_tools(None).await
1151 .map_err(|e| P2PError::MCP(format!("Failed to list tools: {}", e)))?;
1152
1153 Ok(tools.into_iter().map(|tool| tool.name).collect())
1154 } else {
1155 Err(P2PError::MCP("MCP server not enabled".to_string()))
1156 }
1157 }
1158
1159 pub async fn discover_remote_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1161 if let Some(ref mcp_server) = self.mcp_server {
1162 mcp_server.discover_remote_services().await
1163 .map_err(|e| P2PError::MCP(format!("Failed to discover services: {}", e)))
1164 } else {
1165 Err(P2PError::MCP("MCP server not enabled".to_string()))
1166 }
1167 }
1168
1169 pub async fn list_remote_mcp_tools(&self, peer_id: &PeerId) -> Result<Vec<String>> {
1171 if let Some(ref _mcp_server) = self.mcp_server {
1172 let request_message = crate::mcp::MCPMessage::ListTools {
1174 cursor: None,
1175 };
1176
1177 let p2p_message = crate::mcp::P2PMCPMessage {
1179 message_type: crate::mcp::P2PMCPMessageType::Request,
1180 message_id: uuid::Uuid::new_v4().to_string(),
1181 source_peer: self.peer_id.clone(),
1182 target_peer: Some(peer_id.clone()),
1183 timestamp: SystemTime::now()
1184 .duration_since(std::time::UNIX_EPOCH)
1185 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1186 .as_secs(),
1187 payload: request_message,
1188 ttl: 5,
1189 };
1190
1191 let message_data = serde_json::to_vec(&p2p_message)
1193 .map_err(|e| P2PError::Serialization(e))?;
1194
1195 self.send_message(peer_id, MCP_PROTOCOL, message_data).await?;
1197
1198 self.list_mcp_tools().await
1201 } else {
1202 Err(P2PError::MCP("MCP server not enabled".to_string()))
1203 }
1204 }
1205
1206 pub async fn mcp_stats(&self) -> Result<crate::mcp::MCPServerStats> {
1208 if let Some(ref mcp_server) = self.mcp_server {
1209 Ok(mcp_server.get_stats().await)
1210 } else {
1211 Err(P2PError::MCP("MCP server not enabled".to_string()))
1212 }
1213 }
1214
1215 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1217 if let Some(ref resource_manager) = self.resource_manager {
1218 Ok(resource_manager.get_metrics().await)
1219 } else {
1220 Err(P2PError::Network("Production resource manager not enabled".to_string()))
1221 }
1222 }
1223
1224 pub async fn health_check(&self) -> Result<()> {
1226 if let Some(ref resource_manager) = self.resource_manager {
1227 resource_manager.health_check().await
1228 } else {
1229 let peer_count = self.peer_count().await;
1231 if peer_count > self.config.max_connections {
1232 Err(P2PError::Network(format!("Too many connections: {}", peer_count)))
1233 } else {
1234 Ok(())
1235 }
1236 }
1237 }
1238
    /// Returns the production configuration, if one was supplied.
    pub fn production_config(&self) -> Option<&ProductionConfig> {
        self.config.production_config.as_ref()
    }
1243
    /// Returns `true` when a resource manager is present.
    pub fn is_production_mode(&self) -> bool {
        self.resource_manager.is_some()
    }
1248
    /// Returns the shared DHT handle, if the node has one.
    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
        self.dht.as_ref()
    }
1253
1254 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1256 if let Some(ref dht) = self.dht {
1257 let dht_instance = dht.write().await;
1258 dht_instance.put(key.clone(), value.clone()).await
1259 .map_err(|e| P2PError::DHT(format!("DHT put failed: {}", e)))?;
1260
1261 Ok(())
1262 } else {
1263 Err(P2PError::DHT("DHT not enabled".to_string()))
1264 }
1265 }
1266
1267 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1269 if let Some(ref dht) = self.dht {
1270 let dht_instance = dht.write().await;
1271 let record_result = dht_instance.get(&key).await;
1272
1273 let value = record_result.as_ref().map(|record| record.value.clone());
1274
1275 Ok(value)
1276 } else {
1277 Err(P2PError::DHT("DHT not enabled".to_string()))
1278 }
1279 }
1280
1281 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1283 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1284 let mut manager = bootstrap_manager.write().await;
1285 let contact = ContactEntry::new(peer_id, addresses);
1286 manager.add_contact(contact).await
1287 .map_err(|e| P2PError::Network(format!("Failed to add peer to bootstrap cache: {}", e)))?;
1288 }
1289 Ok(())
1290 }
1291
1292 pub async fn update_peer_metrics(&self, peer_id: &PeerId, success: bool, latency_ms: Option<u64>, _error: Option<String>) -> Result<()> {
1294 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1295 let mut manager = bootstrap_manager.write().await;
1296
1297 let metrics = QualityMetrics {
1299 success_rate: if success { 1.0 } else { 0.0 },
1300 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1301 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
1303 last_successful_connection: if success { chrono::Utc::now() } else { chrono::Utc::now() - chrono::Duration::hours(1) },
1304 uptime_score: 0.5,
1305 };
1306
1307 manager.update_contact_metrics(peer_id, metrics).await
1308 .map_err(|e| P2PError::Network(format!("Failed to update peer metrics: {}", e)))?;
1309 }
1310 Ok(())
1311 }
1312
1313 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1315 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1316 let manager = bootstrap_manager.read().await;
1317 let stats = manager.get_stats().await
1318 .map_err(|e| P2PError::Network(format!("Failed to get bootstrap stats: {}", e)))?;
1319 Ok(Some(stats))
1320 } else {
1321 Ok(None)
1322 }
1323 }
1324
1325 pub async fn cached_peer_count(&self) -> usize {
1327 if let Some(ref _bootstrap_manager) = self.bootstrap_manager {
1328 if let Ok(stats) = self.get_bootstrap_cache_stats().await {
1329 if let Some(stats) = stats {
1330 return stats.total_contacts;
1331 }
1332 }
1333 }
1334 0
1335 }
1336
1337 async fn connect_bootstrap_peers(&self) -> Result<()> {
1339 let mut bootstrap_contacts = Vec::new();
1340 let mut used_cache = false;
1341
1342 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1344 let manager = bootstrap_manager.read().await;
1345 match manager.get_bootstrap_peers(20).await { Ok(contacts) => {
1347 if !contacts.is_empty() {
1348 info!("Using {} cached bootstrap peers", contacts.len());
1349 bootstrap_contacts = contacts;
1350 used_cache = true;
1351 }
1352 }
1353 Err(e) => {
1354 warn!("Failed to get cached bootstrap peers: {}", e);
1355 }
1356 }
1357 }
1358
1359 if bootstrap_contacts.is_empty() {
1361 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1362 &self.config.bootstrap_peers_str
1363 } else {
1364 &self.config.bootstrap_peers.iter().map(|addr| addr.to_string()).collect::<Vec<_>>()
1366 };
1367
1368 if bootstrap_peers.is_empty() {
1369 info!("No bootstrap peers configured and no cached peers available");
1370 return Ok(());
1371 }
1372
1373 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1374
1375 for addr in bootstrap_peers {
1376 let contact = ContactEntry::new(
1377 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1378 vec![addr.clone()]
1379 );
1380 bootstrap_contacts.push(contact);
1381 }
1382 }
1383
1384 let mut successful_connections = 0;
1386 for contact in bootstrap_contacts {
1387 for addr in &contact.addresses {
1388 match self.connect_peer(addr).await {
1389 Ok(peer_id) => {
1390 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1391 successful_connections += 1;
1392
1393 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1395 let mut manager = bootstrap_manager.write().await;
1396 let mut updated_contact = contact.clone();
1397 updated_contact.peer_id = peer_id.clone();
1398 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
1401 warn!("Failed to update bootstrap cache: {}", e);
1402 }
1403 }
1404 break; }
1406 Err(e) => {
1407 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1408
1409 if used_cache {
1411 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1412 let mut manager = bootstrap_manager.write().await;
1413 let mut updated_contact = contact.clone();
1414 updated_contact.update_connection_result(false, None, Some(e.to_string()));
1415
1416 if let Err(e) = manager.add_contact(updated_contact).await {
1417 warn!("Failed to update bootstrap cache: {}", e);
1418 }
1419 }
1420 }
1421 }
1422 }
1423 }
1424 }
1425
1426 if successful_connections == 0 && !used_cache {
1427 warn!("Failed to connect to any bootstrap peers");
1428 } else {
1429 info!("Successfully connected to {} bootstrap peers", successful_connections);
1430 }
1431
1432 Ok(())
1433 }
1434
1435 async fn disconnect_all_peers(&self) -> Result<()> {
1437 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1438
1439 for peer_id in peer_ids {
1440 self.disconnect_peer(&peer_id).await?;
1441 }
1442
1443 Ok(())
1444 }
1445
1446 async fn periodic_tasks(&self) -> Result<()> {
1448 Ok(())
1454 }
1455}
1456
1457pub struct NodeBuilder {
1459 config: NodeConfig,
1460}
1461
1462impl NodeBuilder {
1463 pub fn new() -> Self {
1465 Self {
1466 config: NodeConfig::default(),
1467 }
1468 }
1469
1470 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1472 self.config.peer_id = Some(peer_id);
1473 self
1474 }
1475
1476 pub fn listen_on(mut self, addr: &str) -> Self {
1478 self.config.listen_addrs.push(addr.to_string());
1479 self
1480 }
1481
1482 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1484 self.config.bootstrap_peers.push(addr.to_string());
1485 self
1486 }
1487
1488 pub fn with_ipv6(mut self, enable: bool) -> Self {
1490 self.config.enable_ipv6 = enable;
1491 self
1492 }
1493
1494 pub fn with_mcp_server(mut self) -> Self {
1496 self.config.enable_mcp_server = true;
1497 self
1498 }
1499
1500 pub fn with_mcp_config(mut self, mcp_config: MCPServerConfig) -> Self {
1502 self.config.mcp_server_config = Some(mcp_config);
1503 self.config.enable_mcp_server = true;
1504 self
1505 }
1506
1507 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1509 self.config.connection_timeout = timeout;
1510 self
1511 }
1512
1513 pub fn with_max_connections(mut self, max: usize) -> Self {
1515 self.config.max_connections = max;
1516 self
1517 }
1518
1519 pub fn with_production_mode(mut self) -> Self {
1521 self.config.production_config = Some(ProductionConfig::default());
1522 self
1523 }
1524
1525 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1527 self.config.production_config = Some(production_config);
1528 self
1529 }
1530
1531 pub async fn build(self) -> Result<P2PNode> {
1533 P2PNode::new(self.config).await
1534 }
1535}
1536
1537#[cfg(test)]
1538mod tests {
1539 use super::*;
1540 use crate::mcp::{Tool, MCPTool, ToolHandler, ToolMetadata, ToolHealthStatus, ToolRequirements};
1541 use serde_json::json;
1542 use std::pin::Pin;
1543 use std::future::Future;
1544 use std::time::Duration;
1545 use tokio::time::timeout;
1546
/// Minimal tool used by the network tests; it only remembers its own name.
struct NetworkTestTool {
    /// Name stored at construction time.
    name: String,
}

impl NetworkTestTool {
    /// Builds a tool carrying an owned copy of `name`.
    fn new(name: &str) -> Self {
        let name = String::from(name);
        Self { name }
    }
}
1559
1560 impl ToolHandler for NetworkTestTool {
1561 fn execute(&self, arguments: serde_json::Value) -> Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + '_>> {
1562 let name = self.name.clone();
1563 Box::pin(async move {
1564 Ok(json!({
1565 "tool": name,
1566 "input": arguments,
1567 "result": "network test success"
1568 }))
1569 })
1570 }
1571
1572 fn validate(&self, _arguments: &serde_json::Value) -> Result<()> {
1573 Ok(())
1574 }
1575
1576 fn get_requirements(&self) -> ToolRequirements {
1577 ToolRequirements::default()
1578 }
1579 }
1580
1581 fn create_test_node_config() -> NodeConfig {
1583 NodeConfig {
1584 peer_id: Some("test_peer_123".to_string()),
1585 listen_addrs: vec![
1586 "/ip6/::1/tcp/9001".to_string(),
1587 "/ip4/127.0.0.1/tcp/9001".to_string(),
1588 ],
1589 listen_addr: "127.0.0.1:9001".parse().unwrap(),
1590 bootstrap_peers: vec![],
1591 bootstrap_peers_str: vec![],
1592 enable_ipv6: true,
1593 enable_mcp_server: true,
1594 mcp_server_config: Some(MCPServerConfig {
1595 enable_auth: false, enable_rate_limiting: false, ..Default::default()
1598 }),
1599 connection_timeout: Duration::from_secs(10),
1600 keep_alive_interval: Duration::from_secs(30),
1601 max_connections: 100,
1602 max_incoming_connections: 50,
1603 dht_config: DHTConfig::default(),
1604 security_config: SecurityConfig::default(),
1605 production_config: None,
1606 bootstrap_cache_config: None,
1607 }
1608 }
1609
1610 fn create_test_tool(name: &str) -> Tool {
1612 Tool {
1613 definition: MCPTool {
1614 name: name.to_string(),
1615 description: format!("Test tool: {}", name),
1616 input_schema: json!({
1617 "type": "object",
1618 "properties": {
1619 "input": { "type": "string" }
1620 }
1621 }),
1622 },
1623 handler: Box::new(NetworkTestTool::new(name)),
1624 metadata: ToolMetadata {
1625 created_at: SystemTime::now(),
1626 last_called: None,
1627 call_count: 0,
1628 avg_execution_time: Duration::from_millis(0),
1629 health_status: ToolHealthStatus::Healthy,
1630 tags: vec!["test".to_string()],
1631 },
1632 }
1633 }
1634
1635 #[tokio::test]
1636 async fn test_node_config_default() {
1637 let config = NodeConfig::default();
1638
1639 assert!(config.peer_id.is_none());
1640 assert_eq!(config.listen_addrs.len(), 2);
1641 assert!(config.enable_ipv6);
1642 assert!(config.enable_mcp_server);
1643 assert_eq!(config.max_connections, 1000);
1644 assert_eq!(config.max_incoming_connections, 100);
1645 assert_eq!(config.connection_timeout, Duration::from_secs(30));
1646 }
1647
1648 #[tokio::test]
1649 async fn test_dht_config_default() {
1650 let config = DHTConfig::default();
1651
1652 assert_eq!(config.k_value, 20);
1653 assert_eq!(config.alpha_value, 5);
1654 assert_eq!(config.record_ttl, Duration::from_secs(3600));
1655 assert_eq!(config.refresh_interval, Duration::from_secs(600));
1656 }
1657
1658 #[tokio::test]
1659 async fn test_security_config_default() {
1660 let config = SecurityConfig::default();
1661
1662 assert!(config.enable_noise);
1663 assert!(config.enable_tls);
1664 assert_eq!(config.trust_level, TrustLevel::Basic);
1665 }
1666
/// Every `TrustLevel` variant can be constructed and compared.
#[test]
fn test_trust_level_variants() {
    // Construction only.
    let _unverified = TrustLevel::None;
    let _partial = TrustLevel::Basic;
    let _complete = TrustLevel::Full;

    // Each variant equals itself.
    assert_eq!(TrustLevel::None, TrustLevel::None);
    assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
    assert_eq!(TrustLevel::Full, TrustLevel::Full);

    // Distinct variants differ.
    assert_ne!(TrustLevel::None, TrustLevel::Basic);
}
1680
/// `ConnectionStatus` variants compare as expected and `Failed` keeps its message.
#[test]
fn test_connection_status_variants() {
    let st_connecting = ConnectionStatus::Connecting;
    let st_connected = ConnectionStatus::Connected;
    let st_disconnecting = ConnectionStatus::Disconnecting;
    let st_disconnected = ConnectionStatus::Disconnected;
    let st_failed = ConnectionStatus::Failed("test error".to_string());

    assert_eq!(st_connecting, ConnectionStatus::Connecting);
    assert_eq!(st_connected, ConnectionStatus::Connected);
    assert_eq!(st_disconnecting, ConnectionStatus::Disconnecting);
    assert_eq!(st_disconnected, ConnectionStatus::Disconnected);
    assert_ne!(st_connecting, st_connected);

    match st_failed {
        ConnectionStatus::Failed(msg) => assert_eq!(msg, "test error"),
        _ => panic!("Expected Failed status"),
    }
}
1701
1702 #[tokio::test]
1703 async fn test_node_creation() -> Result<()> {
1704 let config = create_test_node_config();
1705 let node = P2PNode::new(config).await?;
1706
1707 assert_eq!(node.peer_id(), "test_peer_123");
1708 assert!(!node.is_running().await);
1709 assert_eq!(node.peer_count().await, 0);
1710 assert!(node.connected_peers().await.is_empty());
1711
1712 Ok(())
1713 }
1714
1715 #[tokio::test]
1716 async fn test_node_creation_without_peer_id() -> Result<()> {
1717 let mut config = create_test_node_config();
1718 config.peer_id = None;
1719
1720 let node = P2PNode::new(config).await?;
1721
1722 assert!(node.peer_id().starts_with("peer_"));
1724 assert!(!node.is_running().await);
1725
1726 Ok(())
1727 }
1728
1729 #[tokio::test]
1730 async fn test_node_lifecycle() -> Result<()> {
1731 let config = create_test_node_config();
1732 let node = P2PNode::new(config).await?;
1733
1734 assert!(!node.is_running().await);
1736
1737 node.start().await?;
1739 assert!(node.is_running().await);
1740
1741 let listen_addrs = node.listen_addrs().await;
1743 assert_eq!(listen_addrs.len(), 2);
1744
1745 node.stop().await?;
1747 assert!(!node.is_running().await);
1748
1749 Ok(())
1750 }
1751
1752 #[tokio::test]
1753 async fn test_peer_connection() -> Result<()> {
1754 let config = create_test_node_config();
1755 let node = P2PNode::new(config).await?;
1756
1757 let peer_addr = "/ip4/127.0.0.1/tcp/9002".to_string();
1758
1759 let peer_id = node.connect_peer(&peer_addr).await?;
1761 assert!(peer_id.starts_with("peer_from_"));
1762
1763 assert_eq!(node.peer_count().await, 1);
1765
1766 let connected_peers = node.connected_peers().await;
1768 assert_eq!(connected_peers.len(), 1);
1769 assert_eq!(connected_peers[0], peer_id);
1770
1771 let peer_info = node.peer_info(&peer_id).await;
1773 assert!(peer_info.is_some());
1774 let info = peer_info.unwrap();
1775 assert_eq!(info.peer_id, peer_id);
1776 assert_eq!(info.status, ConnectionStatus::Connected);
1777 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
1778
1779 node.disconnect_peer(&peer_id).await?;
1781 assert_eq!(node.peer_count().await, 0);
1782
1783 Ok(())
1784 }
1785
1786 #[tokio::test]
1787 async fn test_event_subscription() -> Result<()> {
1788 let config = create_test_node_config();
1789 let node = P2PNode::new(config).await?;
1790
1791 let mut events = node.subscribe_events();
1792 let peer_addr = "/ip4/127.0.0.1/tcp/9003".to_string();
1793
1794 let peer_id = node.connect_peer(&peer_addr).await?;
1796
1797 let event = timeout(Duration::from_millis(100), events.recv()).await;
1799 assert!(event.is_ok());
1800
1801 match event.unwrap().unwrap() {
1802 P2PEvent::PeerConnected(event_peer_id) => {
1803 assert_eq!(event_peer_id, peer_id);
1804 }
1805 _ => panic!("Expected PeerConnected event"),
1806 }
1807
1808 node.disconnect_peer(&peer_id).await?;
1810
1811 let event = timeout(Duration::from_millis(100), events.recv()).await;
1813 assert!(event.is_ok());
1814
1815 match event.unwrap().unwrap() {
1816 P2PEvent::PeerDisconnected(event_peer_id) => {
1817 assert_eq!(event_peer_id, peer_id);
1818 }
1819 _ => panic!("Expected PeerDisconnected event"),
1820 }
1821
1822 Ok(())
1823 }
1824
1825 #[tokio::test]
1826 async fn test_message_sending() -> Result<()> {
1827 let config = create_test_node_config();
1828 let node = P2PNode::new(config).await?;
1829
1830 let peer_addr = "/ip4/127.0.0.1/tcp/9004".to_string();
1831 let peer_id = node.connect_peer(&peer_addr).await?;
1832
1833 let message_data = b"Hello, peer!".to_vec();
1835 let result = node.send_message(&peer_id, "test-protocol", message_data).await;
1836 assert!(result.is_ok());
1837
1838 let non_existent_peer = "non_existent_peer".to_string();
1840 let result = node.send_message(&non_existent_peer, "test-protocol", vec![]).await;
1841 assert!(result.is_err());
1842 assert!(result.unwrap_err().to_string().contains("not connected"));
1843
1844 Ok(())
1845 }
1846
1847 #[tokio::test]
1848 async fn test_mcp_integration() -> Result<()> {
1849 let config = create_test_node_config();
1850 let node = P2PNode::new(config).await?;
1851
1852 node.start().await?;
1854
1855 let tool = create_test_tool("network_test_tool");
1857 node.register_mcp_tool(tool).await?;
1858
1859 let tools = node.list_mcp_tools().await?;
1861 assert!(tools.contains(&"network_test_tool".to_string()));
1862
1863 let arguments = json!({"input": "test_input"});
1865 let result = node.call_mcp_tool("network_test_tool", arguments.clone()).await?;
1866 assert_eq!(result["tool"], "network_test_tool");
1867 assert_eq!(result["input"], arguments);
1868
1869 let stats = node.mcp_stats().await?;
1871 assert_eq!(stats.total_tools, 1);
1872
1873 let result = node.call_mcp_tool("non_existent_tool", json!({})).await;
1875 assert!(result.is_err());
1876
1877 node.stop().await?;
1878 Ok(())
1879 }
1880
1881 #[tokio::test]
1882 async fn test_remote_mcp_operations() -> Result<()> {
1883 let config = create_test_node_config();
1884 let node = P2PNode::new(config).await?;
1885
1886 node.start().await?;
1887
1888 let tool = create_test_tool("remote_test_tool");
1890 node.register_mcp_tool(tool).await?;
1891
1892 let peer_addr = "/ip4/127.0.0.1/tcp/9005".to_string();
1893 let peer_id = node.connect_peer(&peer_addr).await?;
1894
1895 let remote_tools = node.list_remote_mcp_tools(&peer_id).await?;
1897 assert!(!remote_tools.is_empty());
1898
1899 let arguments = json!({"input": "remote_test"});
1901 let result = node.call_remote_mcp_tool(&peer_id, "remote_test_tool", arguments.clone()).await?;
1902 assert_eq!(result["tool"], "remote_test_tool");
1903
1904 let services = node.discover_remote_mcp_services().await?;
1906 assert!(services.is_empty());
1908
1909 node.stop().await?;
1910 Ok(())
1911 }
1912
1913 #[tokio::test]
1914 async fn test_health_check() -> Result<()> {
1915 let config = create_test_node_config();
1916 let node = P2PNode::new(config).await?;
1917
1918 let result = node.health_check().await;
1920 assert!(result.is_ok());
1921
1922 for i in 0..5 {
1924 let addr = format!("/ip4/127.0.0.1/tcp/{}", 9010 + i);
1925 node.connect_peer(&addr).await?;
1926 }
1927
1928 let result = node.health_check().await;
1930 assert!(result.is_ok());
1931
1932 Ok(())
1933 }
1934
1935 #[tokio::test]
1936 async fn test_node_uptime() -> Result<()> {
1937 let config = create_test_node_config();
1938 let node = P2PNode::new(config).await?;
1939
1940 let uptime1 = node.uptime();
1941 assert!(uptime1 >= Duration::from_secs(0));
1942
1943 tokio::time::sleep(Duration::from_millis(10)).await;
1945
1946 let uptime2 = node.uptime();
1947 assert!(uptime2 > uptime1);
1948
1949 Ok(())
1950 }
1951
1952 #[tokio::test]
1953 async fn test_node_config_access() -> Result<()> {
1954 let config = create_test_node_config();
1955 let expected_peer_id = config.peer_id.clone();
1956 let node = P2PNode::new(config).await?;
1957
1958 let node_config = node.config();
1959 assert_eq!(node_config.peer_id, expected_peer_id);
1960 assert_eq!(node_config.max_connections, 100);
1961 assert!(node_config.enable_mcp_server);
1962
1963 Ok(())
1964 }
1965
1966 #[tokio::test]
1967 async fn test_mcp_server_access() -> Result<()> {
1968 let config = create_test_node_config();
1969 let node = P2PNode::new(config).await?;
1970
1971 assert!(node.mcp_server().is_some());
1973
1974 let mut config = create_test_node_config();
1976 config.enable_mcp_server = false;
1977 let node_no_mcp = P2PNode::new(config).await?;
1978 assert!(node_no_mcp.mcp_server().is_none());
1979
1980 Ok(())
1981 }
1982
1983 #[tokio::test]
1984 async fn test_dht_access() -> Result<()> {
1985 let config = create_test_node_config();
1986 let node = P2PNode::new(config).await?;
1987
1988 assert!(node.dht().is_some());
1990
1991 Ok(())
1992 }
1993
1994 #[tokio::test]
1995 async fn test_node_builder() -> Result<()> {
1996 let node = P2PNode::builder()
1997 .with_peer_id("builder_test_peer".to_string())
1998 .listen_on("/ip4/127.0.0.1/tcp/9100")
1999 .listen_on("/ip6/::1/tcp/9100")
2000 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9101")
2001 .with_ipv6(true)
2002 .with_mcp_server()
2003 .with_connection_timeout(Duration::from_secs(15))
2004 .with_max_connections(200)
2005 .build()
2006 .await?;
2007
2008 assert_eq!(node.peer_id(), "builder_test_peer");
2009 let config = node.config();
2010 assert_eq!(config.listen_addrs.len(), 4); assert_eq!(config.bootstrap_peers.len(), 1);
2012 assert!(config.enable_ipv6);
2013 assert!(config.enable_mcp_server);
2014 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2015 assert_eq!(config.max_connections, 200);
2016
2017 Ok(())
2018 }
2019
2020 #[tokio::test]
2021 async fn test_node_builder_with_mcp_config() -> Result<()> {
2022 let mcp_config = MCPServerConfig {
2023 server_name: "test_mcp_server".to_string(),
2024 server_version: "1.0.0".to_string(),
2025 enable_dht_discovery: false,
2026 enable_auth: false,
2027 ..MCPServerConfig::default()
2028 };
2029
2030 let node = P2PNode::builder()
2031 .with_peer_id("mcp_config_test".to_string())
2032 .with_mcp_config(mcp_config.clone())
2033 .build()
2034 .await?;
2035
2036 assert_eq!(node.peer_id(), "mcp_config_test");
2037 let config = node.config();
2038 assert!(config.enable_mcp_server);
2039 assert!(config.mcp_server_config.is_some());
2040
2041 let node_mcp_config = config.mcp_server_config.as_ref().unwrap();
2042 assert_eq!(node_mcp_config.server_name, "test_mcp_server");
2043 assert!(!node_mcp_config.enable_auth);
2044
2045 Ok(())
2046 }
2047
2048 #[tokio::test]
2049 async fn test_mcp_server_not_enabled_errors() -> Result<()> {
2050 let mut config = create_test_node_config();
2051 config.enable_mcp_server = false;
2052 let node = P2PNode::new(config).await?;
2053
2054 let tool = create_test_tool("test_tool");
2056 let result = node.register_mcp_tool(tool).await;
2057 assert!(result.is_err());
2058 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2059
2060 let result = node.call_mcp_tool("test_tool", json!({})).await;
2061 assert!(result.is_err());
2062 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2063
2064 let result = node.list_mcp_tools().await;
2065 assert!(result.is_err());
2066 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2067
2068 let result = node.mcp_stats().await;
2069 assert!(result.is_err());
2070 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2071
2072 Ok(())
2073 }
2074
2075 #[tokio::test]
2076 async fn test_bootstrap_peers() -> Result<()> {
2077 let mut config = create_test_node_config();
2078 config.bootstrap_peers = vec![
2079 "/ip4/127.0.0.1/tcp/9200".to_string(),
2080 "/ip4/127.0.0.1/tcp/9201".to_string(),
2081 ];
2082
2083 let node = P2PNode::new(config).await?;
2084
2085 node.start().await?;
2087
2088 let peer_count = node.peer_count().await;
2091 assert!(peer_count <= 2, "Peer count should not exceed bootstrap peer count");
2092
2093 node.stop().await?;
2094 Ok(())
2095 }
2096
2097 #[tokio::test]
2098 async fn test_production_mode_disabled() -> Result<()> {
2099 let config = create_test_node_config();
2100 let node = P2PNode::new(config).await?;
2101
2102 assert!(!node.is_production_mode());
2103 assert!(node.production_config().is_none());
2104
2105 let result = node.resource_metrics().await;
2107 assert!(result.is_err());
2108 assert!(result.unwrap_err().to_string().contains("not enabled"));
2109
2110 Ok(())
2111 }
2112
2113 #[tokio::test]
2114 async fn test_network_event_variants() {
2115 let peer_id = "test_peer".to_string();
2117 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2118
2119 let _peer_connected = NetworkEvent::PeerConnected {
2120 peer_id: peer_id.clone(),
2121 addresses: vec![address.clone()],
2122 };
2123
2124 let _peer_disconnected = NetworkEvent::PeerDisconnected {
2125 peer_id: peer_id.clone(),
2126 reason: "test disconnect".to_string(),
2127 };
2128
2129 let _message_received = NetworkEvent::MessageReceived {
2130 peer_id: peer_id.clone(),
2131 protocol: "test-protocol".to_string(),
2132 data: vec![1, 2, 3],
2133 };
2134
2135 let _connection_failed = NetworkEvent::ConnectionFailed {
2136 peer_id: Some(peer_id.clone()),
2137 address: address.clone(),
2138 error: "connection refused".to_string(),
2139 };
2140
2141 let _dht_stored = NetworkEvent::DHTRecordStored {
2142 key: vec![1, 2, 3],
2143 value: vec![4, 5, 6],
2144 };
2145
2146 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2147 key: vec![1, 2, 3],
2148 value: Some(vec![4, 5, 6]),
2149 };
2150 }
2151
2152 #[tokio::test]
2153 async fn test_peer_info_structure() {
2154 let peer_info = PeerInfo {
2155 peer_id: "test_peer".to_string(),
2156 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2157 connected_at: Instant::now(),
2158 last_seen: Instant::now(),
2159 status: ConnectionStatus::Connected,
2160 protocols: vec!["test-protocol".to_string()],
2161 };
2162
2163 assert_eq!(peer_info.peer_id, "test_peer");
2164 assert_eq!(peer_info.addresses.len(), 1);
2165 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2166 assert_eq!(peer_info.protocols.len(), 1);
2167 }
2168
2169 #[tokio::test]
2170 async fn test_serialization() -> Result<()> {
2171 let config = create_test_node_config();
2173 let serialized = serde_json::to_string(&config)?;
2174 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2175
2176 assert_eq!(config.peer_id, deserialized.peer_id);
2177 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2178 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2179
2180 Ok(())
2181 }
2182}