1use crate::{PeerId, Multiaddr, P2PError, Result};
7use crate::mcp::{MCPServer, MCPServerConfig, Tool, MCPCallContext, MCP_PROTOCOL};
8use crate::dht::{DHT, DHTConfig as DHTConfigInner};
9use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
10use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
11use crate::transport::{TransportManager, QuicTransport, TcpTransport, TransportSelection, TransportOptions};
12use crate::identity::manager::{IdentityManager, IdentityManagerConfig};
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, SystemTime};
18use tokio::sync::{broadcast, RwLock};
19use tokio::time::Instant;
20use tracing::{debug, info, warn};
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct NodeConfig {
25 pub peer_id: Option<PeerId>,
27
28 pub listen_addrs: Vec<Multiaddr>,
30
31 pub listen_addr: std::net::SocketAddr,
33
34 pub bootstrap_peers: Vec<Multiaddr>,
36
37 pub bootstrap_peers_str: Vec<String>,
39
40 pub enable_ipv6: bool,
42
43 pub enable_mcp_server: bool,
45
46 pub mcp_server_config: Option<MCPServerConfig>,
48
49 pub connection_timeout: Duration,
51
52 pub keep_alive_interval: Duration,
54
55 pub max_connections: usize,
57
58 pub max_incoming_connections: usize,
60
61 pub dht_config: DHTConfig,
63
64 pub security_config: SecurityConfig,
66
67 pub production_config: Option<ProductionConfig>,
69
70 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
72
73 pub identity_config: Option<IdentityManagerConfig>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct DHTConfig {
80 pub k_value: usize,
82
83 pub alpha_value: usize,
85
86 pub record_ttl: Duration,
88
89 pub refresh_interval: Duration,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct SecurityConfig {
96 pub enable_noise: bool,
98
99 pub enable_tls: bool,
101
102 pub trust_level: TrustLevel,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
108pub enum TrustLevel {
109 None,
111 Basic,
113 Full,
115}
116
117impl Default for NodeConfig {
118 fn default() -> Self {
119 Self {
120 peer_id: None,
121 listen_addrs: vec![
122 "/ip6/::/tcp/9000".to_string(),
123 "/ip4/0.0.0.0/tcp/9000".to_string(),
124 ],
125 listen_addr: "127.0.0.1:9000".parse().unwrap(),
126 bootstrap_peers: Vec::new(),
127 bootstrap_peers_str: Vec::new(),
128 enable_ipv6: true,
129 enable_mcp_server: true,
130 mcp_server_config: None, connection_timeout: Duration::from_secs(30),
132 keep_alive_interval: Duration::from_secs(60),
133 max_connections: 1000,
134 max_incoming_connections: 100,
135 dht_config: DHTConfig::default(),
136 security_config: SecurityConfig::default(),
137 production_config: None, bootstrap_cache_config: None,
139 identity_config: None, }
141 }
142}
143
144impl Default for DHTConfig {
145 fn default() -> Self {
146 Self {
147 k_value: 20,
148 alpha_value: 5,
149 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
152 }
153}
154
155impl Default for SecurityConfig {
156 fn default() -> Self {
157 Self {
158 enable_noise: true,
159 enable_tls: true,
160 trust_level: TrustLevel::Basic,
161 }
162 }
163}
164
165#[derive(Debug, Clone)]
167pub struct PeerInfo {
168 pub peer_id: PeerId,
170
171 pub addresses: Vec<String>,
173
174 pub connected_at: Instant,
176
177 pub last_seen: Instant,
179
180 pub status: ConnectionStatus,
182
183 pub protocols: Vec<String>,
185}
186
187#[derive(Debug, Clone, PartialEq)]
189pub enum ConnectionStatus {
190 Connecting,
192 Connected,
194 Disconnecting,
196 Disconnected,
198 Failed(String),
200}
201
202#[derive(Debug, Clone)]
204pub enum NetworkEvent {
205 PeerConnected {
207 peer_id: PeerId,
209 addresses: Vec<String>,
211 },
212
213 PeerDisconnected {
215 peer_id: PeerId,
217 reason: String,
219 },
220
221 MessageReceived {
223 peer_id: PeerId,
225 protocol: String,
227 data: Vec<u8>,
229 },
230
231 ConnectionFailed {
233 peer_id: Option<PeerId>,
235 address: String,
237 error: String,
239 },
240
241 DHTRecordStored {
243 key: Vec<u8>,
245 value: Vec<u8>,
247 },
248
249 DHTRecordRetrieved {
251 key: Vec<u8>,
253 value: Option<Vec<u8>>,
255 },
256}
257
258#[derive(Debug, Clone)]
260pub enum P2PEvent {
261 Message { topic: String, source: PeerId, data: Vec<u8> },
262 PeerConnected(PeerId),
263 PeerDisconnected(PeerId),
264}
265
266pub struct P2PNode {
268 config: NodeConfig,
270
271 peer_id: PeerId,
273
274 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
276
277 event_tx: broadcast::Sender<P2PEvent>,
279
280 listen_addrs: RwLock<Vec<Multiaddr>>,
282
283 start_time: Instant,
285
286 running: RwLock<bool>,
288
289 mcp_server: Option<Arc<MCPServer>>,
291
292 dht: Option<Arc<RwLock<DHT>>>,
294
295 resource_manager: Option<Arc<ResourceManager>>,
297
298 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
300
301 transport_manager: Arc<TransportManager>,
303}
304
305impl P2PNode {
306 pub async fn new(config: NodeConfig) -> Result<Self> {
308 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
309 format!("peer_{}", uuid::Uuid::new_v4().to_string()[..8].to_string())
311 });
312
313 let (event_tx, _) = broadcast::channel(1000);
314
315 let dht = if config.enable_mcp_server || true { let dht_config = DHTConfigInner {
318 replication_factor: config.dht_config.k_value,
319 bucket_size: config.dht_config.k_value,
320 alpha: config.dht_config.alpha_value,
321 record_ttl: config.dht_config.record_ttl,
322 bucket_refresh_interval: config.dht_config.refresh_interval,
323 republish_interval: config.dht_config.refresh_interval,
324 max_distance: 160, };
326 let dht_key = crate::dht::Key::new(peer_id.as_bytes());
327 let dht_instance = DHT::new(dht_key, dht_config);
328 Some(Arc::new(RwLock::new(dht_instance)))
329 } else {
330 None
331 };
332
333 let mcp_server = if config.enable_mcp_server {
335 let mcp_config = config.mcp_server_config.clone().unwrap_or_else(|| {
336 MCPServerConfig {
337 server_name: format!("P2P-MCP-{}", peer_id),
338 server_version: crate::VERSION.to_string(),
339 enable_dht_discovery: dht.is_some(),
340 ..MCPServerConfig::default()
341 }
342 });
343
344 let mut server = MCPServer::new(mcp_config);
345
346 if let Some(ref dht_instance) = dht {
348 server = server.with_dht(dht_instance.clone());
349 }
350
351 Some(Arc::new(server))
352 } else {
353 None
354 };
355
356 let resource_manager = if let Some(prod_config) = config.production_config.clone() {
358 Some(Arc::new(ResourceManager::new(prod_config)))
359 } else {
360 None
361 };
362
363 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
365 match BootstrapManager::with_config(cache_config.clone()).await {
366 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
367 Err(e) => {
368 warn!("Failed to initialize bootstrap manager: {}, continuing without cache", e);
369 None
370 }
371 }
372 } else {
373 match BootstrapManager::new().await {
374 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
375 Err(e) => {
376 warn!("Failed to initialize bootstrap manager: {}, continuing without cache", e);
377 None
378 }
379 }
380 };
381
382 let transport_options = TransportOptions::default();
384 let mut transport_manager = TransportManager::new(
385 TransportSelection::default(), transport_options
387 );
388
389 match QuicTransport::new(true) { Ok(quic_transport) => {
392 transport_manager.register_transport(Arc::new(quic_transport));
393 info!("Registered QUIC transport");
394 }
395 Err(e) => {
396 warn!("Failed to create QUIC transport: {}, continuing without QUIC", e);
397 }
398 }
399
400 let tcp_transport = TcpTransport::new(false); transport_manager.register_transport(Arc::new(tcp_transport));
403 info!("Registered TCP transport");
404
405 let transport_manager = Arc::new(transport_manager);
406
407 let node = Self {
408 config,
409 peer_id,
410 peers: Arc::new(RwLock::new(HashMap::new())),
411 event_tx,
412 listen_addrs: RwLock::new(Vec::new()),
413 start_time: Instant::now(),
414 running: RwLock::new(false),
415 mcp_server,
416 dht,
417 resource_manager,
418 bootstrap_manager,
419 transport_manager,
420 };
421
422 info!("Created P2P node with peer ID: {}", node.peer_id);
423 Ok(node)
424 }
425
426 pub fn builder() -> NodeBuilder {
428 NodeBuilder::new()
429 }
430
431 pub fn peer_id(&self) -> &PeerId {
433 &self.peer_id
434 }
435
436 pub fn local_addr(&self) -> Option<String> {
437 self.listen_addrs.try_read().ok().and_then(|addrs| addrs.get(0).map(|a| a.to_string()))
438 }
439
440 pub async fn subscribe(&self, topic: &str) -> Result<()> {
441 info!("Subscribed to topic: {}", topic);
444 Ok(())
445 }
446
447 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
448 info!("Publishing message to topic: {} ({} bytes)", topic, data.len());
449
450 let peer_list: Vec<PeerId> = {
452 let peers_guard = self.peers.read().await;
453 peers_guard.keys().cloned().collect()
454 };
455
456 if peer_list.is_empty() {
457 debug!("No peers connected, message will only be sent to local subscribers");
458 } else {
459 let mut send_count = 0;
461 for peer_id in &peer_list {
462 match self.send_message(peer_id, topic, data.to_vec()).await {
463 Ok(_) => {
464 send_count += 1;
465 debug!("Sent message to peer: {}", peer_id);
466 }
467 Err(e) => {
468 warn!("Failed to send message to peer {}: {}", peer_id, e);
469 }
470 }
471 }
472 info!("Published message to {}/{} connected peers", send_count, peer_list.len());
473 }
474
475 let event = P2PEvent::Message {
477 topic: topic.to_string(),
478 source: self.peer_id.clone(),
479 data: data.to_vec(),
480 };
481 let _ = self.event_tx.send(event);
482
483 Ok(())
484 }
485
486 pub fn config(&self) -> &NodeConfig {
488 &self.config
489 }
490
491 pub async fn start(&self) -> Result<()> {
493 info!("Starting P2P node...");
494
495 if let Some(ref resource_manager) = self.resource_manager {
497 resource_manager.start().await
498 .map_err(|e| P2PError::Network(format!("Failed to start resource manager: {}", e)))?;
499 info!("Production resource manager started");
500 }
501
502 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
504 let mut manager = bootstrap_manager.write().await;
505 manager.start_background_tasks().await
506 .map_err(|e| P2PError::Network(format!("Failed to start bootstrap manager: {}", e)))?;
507 info!("Bootstrap cache manager started");
508 }
509
510 *self.running.write().await = true;
512
513 self.start_network_listeners().await?;
515
516 let mut listen_addrs = self.listen_addrs.write().await;
518 listen_addrs.extend(self.config.listen_addrs.clone());
519
520 info!("P2P node started on addresses: {:?}", *listen_addrs);
521
522 if let Some(ref mcp_server) = self.mcp_server {
524 mcp_server.start().await
525 .map_err(|e| P2PError::MCP(format!("Failed to start MCP server: {}", e)))?;
526 info!("MCP server started");
527 }
528
529 self.start_message_receiving_system().await?;
531
532 self.connect_bootstrap_peers().await?;
534
535 Ok(())
536 }
537
538 async fn start_network_listeners(&self) -> Result<()> {
540 info!("Starting network listeners...");
541
542 let transport_manager = &self.transport_manager;
544
545 for multiaddr in &self.config.listen_addrs {
547 if let Some(socket_addr) = self.multiaddr_to_socketaddr(multiaddr) {
549 if let Err(e) = self.start_listener_on_address(socket_addr).await {
552 warn!("Failed to start listener on {}: {}", socket_addr, e);
553 } else {
554 info!("Started listener on {}", socket_addr);
555 }
556 } else {
557 warn!("Could not parse address for listening: {}", multiaddr);
558 }
559 }
560
561 if self.config.listen_addrs.is_empty() {
563 let default_addrs = vec![
565 "0.0.0.0:9000".parse::<std::net::SocketAddr>().unwrap(),
566 "[::]:9000".parse::<std::net::SocketAddr>().unwrap(),
567 ];
568
569 for addr in default_addrs {
570 if let Err(e) = self.start_listener_on_address(addr).await {
571 warn!("Failed to start default listener on {}: {}", addr, e);
572 } else {
573 info!("Started default listener on {}", addr);
574 }
575 }
576 }
577
578 Ok(())
579 }
580
581 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
583 use crate::transport::{TransportType, Transport};
584
585 match crate::transport::QuicTransport::new(true) {
587 Ok(quic_transport) => {
588 match quic_transport.listen(addr).await {
589 Ok(listen_addrs) => {
590 info!("QUIC listener started on {} -> {:?}", addr, listen_addrs);
591
592 {
594 let mut node_listen_addrs = self.listen_addrs.write().await;
595 node_listen_addrs.clear(); node_listen_addrs.extend(listen_addrs);
597 }
598
599 self.start_connection_acceptor(
601 Arc::new(quic_transport),
602 addr,
603 crate::transport::TransportType::QUIC
604 ).await?;
605
606 return Ok(());
607 }
608 Err(e) => {
609 warn!("Failed to start QUIC listener on {}: {}", addr, e);
610 }
611 }
612 }
613 Err(e) => {
614 warn!("Failed to create QUIC transport for listening: {}", e);
615 }
616 }
617
618 let tcp_transport = crate::transport::TcpTransport::new(false);
620 match tcp_transport.listen(addr).await {
621 Ok(listen_addrs) => {
622 info!("TCP listener started on {} -> {:?}", addr, listen_addrs);
623
624 {
626 let mut node_listen_addrs = self.listen_addrs.write().await;
627 node_listen_addrs.clear(); node_listen_addrs.extend(listen_addrs);
629 }
630
631 self.start_connection_acceptor(
633 Arc::new(tcp_transport),
634 addr,
635 crate::transport::TransportType::TCP
636 ).await?;
637
638 Ok(())
639 }
640 Err(e) => {
641 warn!("Failed to start TCP listener on {}: {}", addr, e);
642 Err(e)
643 }
644 }
645 }
646
647 async fn start_connection_acceptor(
649 &self,
650 transport: Arc<dyn crate::transport::Transport>,
651 addr: std::net::SocketAddr,
652 transport_type: crate::transport::TransportType
653 ) -> Result<()> {
654 info!("Starting connection acceptor for {:?} on {}", transport_type, addr);
655
656 let event_tx = self.event_tx.clone();
658 let peer_id = self.peer_id.clone();
659 let peers = Arc::clone(&self.peers);
660 let transport_manager = Arc::clone(&self.transport_manager);
661
662 tokio::spawn(async move {
664 loop {
665 match transport.accept().await {
666 Ok(mut connection) => {
667 let remote_addr = connection.remote_addr();
668 let connection_peer_id = format!("peer_from_{}",
669 remote_addr.replace("/", "_").replace(":", "_"));
670
671 info!("Accepted {:?} connection from {} (peer: {})",
672 transport_type, remote_addr, connection_peer_id);
673
674 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
676
677 {
679 let mut peers_guard = peers.write().await;
680 let peer_info = PeerInfo {
681 peer_id: connection_peer_id.clone(),
682 addresses: vec![remote_addr.clone()],
683 connected_at: tokio::time::Instant::now(),
684 last_seen: tokio::time::Instant::now(),
685 status: ConnectionStatus::Connected,
686 protocols: vec!["p2p-chat/1.0.0".to_string()],
687 };
688 peers_guard.insert(connection_peer_id.clone(), peer_info);
689 }
690
691 let connection_event_tx = event_tx.clone();
693 let connection_peer_id_clone = connection_peer_id.clone();
694 let connection_peers = Arc::clone(&peers);
695
696 tokio::spawn(async move {
697 loop {
698 match connection.receive().await {
699 Ok(message_data) => {
700 debug!("Received {} bytes from peer: {}",
701 message_data.len(), connection_peer_id_clone);
702
703 if let Err(e) = Self::handle_received_message(
705 message_data,
706 &connection_peer_id_clone,
707 &connection_event_tx
708 ).await {
709 warn!("Failed to handle message from {}: {}",
710 connection_peer_id_clone, e);
711 }
712 }
713 Err(e) => {
714 warn!("Failed to receive message from {}: {}",
715 connection_peer_id_clone, e);
716
717 if !connection.is_alive().await {
719 info!("Connection to {} is dead, removing peer",
720 connection_peer_id_clone);
721
722 {
724 let mut peers_guard = connection_peers.write().await;
725 peers_guard.remove(&connection_peer_id_clone);
726 }
727
728 let _ = connection_event_tx.send(
730 P2PEvent::PeerDisconnected(connection_peer_id_clone.clone())
731 );
732
733 break; }
735
736 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
738 }
739 }
740 }
741 });
742 }
743 Err(e) => {
744 warn!("Failed to accept {:?} connection on {}: {}", transport_type, addr, e);
745
746 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
748 }
749 }
750 }
751 });
752
753 info!("Connection acceptor background task started for {:?} on {}", transport_type, addr);
754 Ok(())
755 }
756
757 async fn start_message_receiving_system(&self) -> Result<()> {
759 info!("Message receiving system initialized (background tasks simplified for demo)");
760
761 Ok(())
766 }
767
768 async fn handle_received_message(
770 message_data: Vec<u8>,
771 peer_id: &PeerId,
772 event_tx: &broadcast::Sender<P2PEvent>
773 ) -> Result<()> {
774 match serde_json::from_slice::<serde_json::Value>(&message_data) {
776 Ok(message) => {
777 if let (Some(protocol), Some(data), Some(from)) = (
778 message.get("protocol").and_then(|v| v.as_str()),
779 message.get("data").and_then(|v| v.as_array()),
780 message.get("from").and_then(|v| v.as_str())
781 ) {
782 let data_bytes: Vec<u8> = data.iter()
784 .filter_map(|v| v.as_u64().map(|n| n as u8))
785 .collect();
786
787 let event = P2PEvent::Message {
789 topic: protocol.to_string(),
790 source: from.to_string(),
791 data: data_bytes,
792 };
793
794 let _ = event_tx.send(event);
795 debug!("Generated message event from peer: {}", peer_id);
796 }
797 }
798 Err(e) => {
799 warn!("Failed to parse received message from {}: {}", peer_id, e);
800 }
801 }
802
803 Ok(())
804 }
805
806 fn multiaddr_to_socketaddr(&self, multiaddr: &Multiaddr) -> Option<std::net::SocketAddr> {
808 let addr_str = multiaddr.to_string();
810
811 if addr_str.starts_with("/ip4/") {
813 let parts: Vec<&str> = addr_str.split('/').collect();
814 if parts.len() >= 5 {
815 let ip = parts[2];
816 let port = parts[4];
817 if let Ok(port_num) = port.parse::<u16>() {
818 if let Ok(ip_addr) = ip.parse::<std::net::Ipv4Addr>() {
819 return Some(std::net::SocketAddr::V4(
820 std::net::SocketAddrV4::new(ip_addr, port_num)
821 ));
822 }
823 }
824 }
825 }
826
827 if addr_str.starts_with("/ip6/") {
829 let parts: Vec<&str> = addr_str.split('/').collect();
830 if parts.len() >= 5 {
831 let ip = parts[2];
832 let port = parts[4];
833 if let Ok(port_num) = port.parse::<u16>() {
834 if let Ok(ip_addr) = ip.parse::<std::net::Ipv6Addr>() {
835 return Some(std::net::SocketAddr::V6(
836 std::net::SocketAddrV6::new(ip_addr, port_num, 0, 0)
837 ));
838 }
839 }
840 }
841 }
842
843 None
844 }
845
846 pub async fn run(&self) -> Result<()> {
848 if !*self.running.read().await {
849 self.start().await?;
850 }
851
852 info!("P2P node running...");
853
854 loop {
856 if !*self.running.read().await {
857 break;
858 }
859
860 self.periodic_tasks().await?;
862
863 tokio::time::sleep(Duration::from_millis(100)).await;
865 }
866
867 info!("P2P node stopped");
868 Ok(())
869 }
870
871 pub async fn stop(&self) -> Result<()> {
873 info!("Stopping P2P node...");
874
875 *self.running.write().await = false;
877
878 if let Some(ref mcp_server) = self.mcp_server {
880 mcp_server.shutdown().await
881 .map_err(|e| P2PError::MCP(format!("Failed to shutdown MCP server: {}", e)))?;
882 info!("MCP server stopped");
883 }
884
885 self.disconnect_all_peers().await?;
887
888 if let Some(ref resource_manager) = self.resource_manager {
890 resource_manager.shutdown().await
891 .map_err(|e| P2PError::Network(format!("Failed to shutdown resource manager: {}", e)))?;
892 info!("Production resource manager stopped");
893 }
894
895 info!("P2P node stopped");
896 Ok(())
897 }
898
899 pub async fn is_running(&self) -> bool {
901 *self.running.read().await
902 }
903
904 pub async fn listen_addrs(&self) -> Vec<Multiaddr> {
906 self.listen_addrs.read().await.clone()
907 }
908
909 pub async fn connected_peers(&self) -> Vec<PeerId> {
911 self.peers.read().await.keys().cloned().collect()
912 }
913
914 pub async fn peer_count(&self) -> usize {
916 self.peers.read().await.len()
917 }
918
919 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
921 self.peers.read().await.get(peer_id).cloned()
922 }
923
924 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
926 info!("Connecting to peer at: {}", address);
927
928 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
930 Some(resource_manager.acquire_connection().await?)
931 } else {
932 None
933 };
934
935 let multiaddr: Multiaddr = address.parse()
937 .map_err(|e| P2PError::Transport(format!("Invalid address format: {}", e)))?;
938
939 let peer_id = match self.transport_manager.connect(&multiaddr).await {
941 Ok(connected_peer_id) => {
942 info!("Successfully connected to peer: {}", connected_peer_id);
943 connected_peer_id
944 }
945 Err(e) => {
946 warn!("Failed to connect to peer at {}: {}", address, e);
947
948 let demo_peer_id = format!("peer_from_{}", address.replace("/", "_").replace(":", "_"));
951 warn!("Using demo peer ID: {} (transport connection failed)", demo_peer_id);
952 demo_peer_id
953 }
954 };
955
956 let peer_info = PeerInfo {
958 peer_id: peer_id.clone(),
959 addresses: vec![address.to_string()],
960 connected_at: Instant::now(),
961 last_seen: Instant::now(),
962 status: ConnectionStatus::Connected,
963 protocols: vec!["p2p-foundation/1.0".to_string()],
964 };
965
966 self.peers.write().await.insert(peer_id.clone(), peer_info);
968
969 if let Some(ref resource_manager) = self.resource_manager {
971 resource_manager.record_bandwidth(0, 0); }
973
974 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
976
977 info!("Connected to peer: {}", peer_id);
978 Ok(peer_id)
979 }
980
981 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
983 info!("Disconnecting from peer: {}", peer_id);
984
985 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
986 peer_info.status = ConnectionStatus::Disconnected;
987
988 let _ = self.event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
990
991 info!("Disconnected from peer: {}", peer_id);
992 }
993
994 Ok(())
995 }
996
997 pub async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
999 debug!("Sending message to peer {} on protocol {}", peer_id, protocol);
1000
1001 if let Some(ref resource_manager) = self.resource_manager {
1003 if !resource_manager.check_rate_limit(peer_id, "message").await? {
1004 return Err(P2PError::Network(format!("Rate limit exceeded for peer {}", peer_id)));
1005 }
1006 }
1007
1008 if !self.peers.read().await.contains_key(peer_id) {
1010 return Err(P2PError::Network(format!("Peer {} not connected", peer_id)));
1011 }
1012
1013 if protocol == MCP_PROTOCOL {
1015 if let Some(ref mcp_server) = self.mcp_server {
1016 debug!("Handling MCP message locally for demonstration");
1021 if let Ok(response_data) = mcp_server.handle_p2p_message(&data, &self.peer_id).await {
1022 if let Some(response) = response_data {
1023 debug!("Generated MCP response: {} bytes", response.len());
1024 }
1026 }
1027 }
1028 }
1029
1030 if let Some(ref resource_manager) = self.resource_manager {
1032 resource_manager.record_bandwidth(data.len() as u64, 0);
1033 }
1034
1035 let message_data = self.create_protocol_message(protocol, data)?;
1037
1038 match self.transport_manager.send_message(peer_id, message_data).await {
1040 Ok(_) => {
1041 debug!("Message sent to peer {} via transport layer", peer_id);
1042 }
1043 Err(e) => {
1044 warn!("Failed to send message to peer {}: {}", peer_id, e);
1045 debug!("Demo mode: treating send failure as success for chat compatibility");
1048 }
1049 }
1050 Ok(())
1051 }
1052
1053 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1055 use serde_json::json;
1056
1057 let message = json!({
1059 "protocol": protocol,
1060 "data": data,
1061 "from": self.peer_id,
1062 "timestamp": std::time::SystemTime::now()
1063 .duration_since(std::time::UNIX_EPOCH)
1064 .unwrap()
1065 .as_secs()
1066 });
1067
1068 serde_json::to_vec(&message)
1069 .map_err(|e| P2PError::Transport(format!("Failed to serialize message: {}", e)))
1070 }
1071
1072 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1074 self.event_tx.subscribe()
1075 }
1076
1077 pub fn uptime(&self) -> Duration {
1079 self.start_time.elapsed()
1080 }
1081
1082 pub fn mcp_server(&self) -> Option<&Arc<MCPServer>> {
1084 self.mcp_server.as_ref()
1085 }
1086
1087 pub async fn register_mcp_tool(&self, tool: Tool) -> Result<()> {
1089 if let Some(ref mcp_server) = self.mcp_server {
1090 mcp_server.register_tool(tool).await
1091 .map_err(|e| P2PError::MCP(format!("Failed to register tool: {}", e)))
1092 } else {
1093 Err(P2PError::MCP("MCP server not enabled".to_string()))
1094 }
1095 }
1096
1097 pub async fn call_mcp_tool(&self, tool_name: &str, arguments: Value) -> Result<Value> {
1099 if let Some(ref mcp_server) = self.mcp_server {
1100 if let Some(ref resource_manager) = self.resource_manager {
1102 if !resource_manager.check_rate_limit(&self.peer_id, "mcp").await? {
1103 return Err(P2PError::MCP("MCP rate limit exceeded".to_string()));
1104 }
1105 }
1106
1107 let context = MCPCallContext {
1108 caller_id: self.peer_id.clone(),
1109 timestamp: SystemTime::now(),
1110 timeout: Duration::from_secs(30),
1111 auth_info: None,
1112 metadata: HashMap::new(),
1113 };
1114
1115 mcp_server.call_tool(tool_name, arguments, context).await
1116 .map_err(|e| P2PError::MCP(format!("Tool call failed: {}", e)))
1117 } else {
1118 Err(P2PError::MCP("MCP server not enabled".to_string()))
1119 }
1120 }
1121
1122 pub async fn call_remote_mcp_tool(&self, peer_id: &PeerId, tool_name: &str, arguments: Value) -> Result<Value> {
1124 if let Some(ref mcp_server) = self.mcp_server {
1125 let context = MCPCallContext {
1127 caller_id: self.peer_id.clone(),
1128 timestamp: SystemTime::now(),
1129 timeout: Duration::from_secs(30),
1130 auth_info: None,
1131 metadata: HashMap::new(),
1132 };
1133
1134 match mcp_server.call_remote_tool(peer_id, tool_name, arguments.clone(), context).await {
1136 Ok(result) => Ok(result),
1137 Err(P2PError::MCP(msg)) if msg.contains("network integration") => {
1138 info!("Simulating remote MCP call to {} on peer {}", tool_name, peer_id);
1141
1142 self.call_mcp_tool(tool_name, arguments).await
1144 }
1145 Err(e) => Err(e),
1146 }
1147 } else {
1148 Err(P2PError::MCP("MCP server not enabled".to_string()))
1149 }
1150 }
1151
1152 pub async fn list_mcp_tools(&self) -> Result<Vec<String>> {
1154 if let Some(ref mcp_server) = self.mcp_server {
1155 let (tools, _) = mcp_server.list_tools(None).await
1156 .map_err(|e| P2PError::MCP(format!("Failed to list tools: {}", e)))?;
1157
1158 Ok(tools.into_iter().map(|tool| tool.name).collect())
1159 } else {
1160 Err(P2PError::MCP("MCP server not enabled".to_string()))
1161 }
1162 }
1163
1164 pub async fn discover_remote_mcp_services(&self) -> Result<Vec<crate::mcp::MCPService>> {
1166 if let Some(ref mcp_server) = self.mcp_server {
1167 mcp_server.discover_remote_services().await
1168 .map_err(|e| P2PError::MCP(format!("Failed to discover services: {}", e)))
1169 } else {
1170 Err(P2PError::MCP("MCP server not enabled".to_string()))
1171 }
1172 }
1173
1174 pub async fn list_remote_mcp_tools(&self, peer_id: &PeerId) -> Result<Vec<String>> {
1176 if let Some(ref _mcp_server) = self.mcp_server {
1177 let request_message = crate::mcp::MCPMessage::ListTools {
1179 cursor: None,
1180 };
1181
1182 let p2p_message = crate::mcp::P2PMCPMessage {
1184 message_type: crate::mcp::P2PMCPMessageType::Request,
1185 message_id: uuid::Uuid::new_v4().to_string(),
1186 source_peer: self.peer_id.clone(),
1187 target_peer: Some(peer_id.clone()),
1188 timestamp: SystemTime::now()
1189 .duration_since(std::time::UNIX_EPOCH)
1190 .map_err(|e| P2PError::Network(format!("Time error: {}", e)))?
1191 .as_secs(),
1192 payload: request_message,
1193 ttl: 5,
1194 };
1195
1196 let message_data = serde_json::to_vec(&p2p_message)
1198 .map_err(|e| P2PError::Serialization(e))?;
1199
1200 self.send_message(peer_id, MCP_PROTOCOL, message_data).await?;
1202
1203 self.list_mcp_tools().await
1206 } else {
1207 Err(P2PError::MCP("MCP server not enabled".to_string()))
1208 }
1209 }
1210
1211 pub async fn mcp_stats(&self) -> Result<crate::mcp::MCPServerStats> {
1213 if let Some(ref mcp_server) = self.mcp_server {
1214 Ok(mcp_server.get_stats().await)
1215 } else {
1216 Err(P2PError::MCP("MCP server not enabled".to_string()))
1217 }
1218 }
1219
1220 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1222 if let Some(ref resource_manager) = self.resource_manager {
1223 Ok(resource_manager.get_metrics().await)
1224 } else {
1225 Err(P2PError::Network("Production resource manager not enabled".to_string()))
1226 }
1227 }
1228
1229 pub async fn health_check(&self) -> Result<()> {
1231 if let Some(ref resource_manager) = self.resource_manager {
1232 resource_manager.health_check().await
1233 } else {
1234 let peer_count = self.peer_count().await;
1236 if peer_count > self.config.max_connections {
1237 Err(P2PError::Network(format!("Too many connections: {}", peer_count)))
1238 } else {
1239 Ok(())
1240 }
1241 }
1242 }
1243
1244 pub fn production_config(&self) -> Option<&ProductionConfig> {
1246 self.config.production_config.as_ref()
1247 }
1248
1249 pub fn is_production_mode(&self) -> bool {
1251 self.resource_manager.is_some()
1252 }
1253
1254 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1256 self.dht.as_ref()
1257 }
1258
1259 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1261 if let Some(ref dht) = self.dht {
1262 let dht_instance = dht.write().await;
1263 dht_instance.put(key.clone(), value.clone()).await
1264 .map_err(|e| P2PError::DHT(format!("DHT put failed: {}", e)))?;
1265
1266 Ok(())
1267 } else {
1268 Err(P2PError::DHT("DHT not enabled".to_string()))
1269 }
1270 }
1271
1272 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1274 if let Some(ref dht) = self.dht {
1275 let dht_instance = dht.write().await;
1276 let record_result = dht_instance.get(&key).await;
1277
1278 let value = record_result.as_ref().map(|record| record.value.clone());
1279
1280 Ok(value)
1281 } else {
1282 Err(P2PError::DHT("DHT not enabled".to_string()))
1283 }
1284 }
1285
1286 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1288 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1289 let mut manager = bootstrap_manager.write().await;
1290 let contact = ContactEntry::new(peer_id, addresses);
1291 manager.add_contact(contact).await
1292 .map_err(|e| P2PError::Network(format!("Failed to add peer to bootstrap cache: {}", e)))?;
1293 }
1294 Ok(())
1295 }
1296
1297 pub async fn update_peer_metrics(&self, peer_id: &PeerId, success: bool, latency_ms: Option<u64>, _error: Option<String>) -> Result<()> {
1299 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1300 let mut manager = bootstrap_manager.write().await;
1301
1302 let metrics = QualityMetrics {
1304 success_rate: if success { 1.0 } else { 0.0 },
1305 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1306 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
1308 last_successful_connection: if success { chrono::Utc::now() } else { chrono::Utc::now() - chrono::Duration::hours(1) },
1309 uptime_score: 0.5,
1310 };
1311
1312 manager.update_contact_metrics(peer_id, metrics).await
1313 .map_err(|e| P2PError::Network(format!("Failed to update peer metrics: {}", e)))?;
1314 }
1315 Ok(())
1316 }
1317
1318 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1320 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1321 let manager = bootstrap_manager.read().await;
1322 let stats = manager.get_stats().await
1323 .map_err(|e| P2PError::Network(format!("Failed to get bootstrap stats: {}", e)))?;
1324 Ok(Some(stats))
1325 } else {
1326 Ok(None)
1327 }
1328 }
1329
1330 pub async fn cached_peer_count(&self) -> usize {
1332 if let Some(ref _bootstrap_manager) = self.bootstrap_manager {
1333 if let Ok(stats) = self.get_bootstrap_cache_stats().await {
1334 if let Some(stats) = stats {
1335 return stats.total_contacts;
1336 }
1337 }
1338 }
1339 0
1340 }
1341
1342 async fn connect_bootstrap_peers(&self) -> Result<()> {
1344 let mut bootstrap_contacts = Vec::new();
1345 let mut used_cache = false;
1346
1347 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1349 let manager = bootstrap_manager.read().await;
1350 match manager.get_bootstrap_peers(20).await { Ok(contacts) => {
1352 if !contacts.is_empty() {
1353 info!("Using {} cached bootstrap peers", contacts.len());
1354 bootstrap_contacts = contacts;
1355 used_cache = true;
1356 }
1357 }
1358 Err(e) => {
1359 warn!("Failed to get cached bootstrap peers: {}", e);
1360 }
1361 }
1362 }
1363
1364 if bootstrap_contacts.is_empty() {
1366 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1367 &self.config.bootstrap_peers_str
1368 } else {
1369 &self.config.bootstrap_peers.iter().map(|addr| addr.to_string()).collect::<Vec<_>>()
1371 };
1372
1373 if bootstrap_peers.is_empty() {
1374 info!("No bootstrap peers configured and no cached peers available");
1375 return Ok(());
1376 }
1377
1378 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1379
1380 for addr in bootstrap_peers {
1381 let contact = ContactEntry::new(
1382 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1383 vec![addr.clone()]
1384 );
1385 bootstrap_contacts.push(contact);
1386 }
1387 }
1388
1389 let mut successful_connections = 0;
1391 for contact in bootstrap_contacts {
1392 for addr in &contact.addresses {
1393 match self.connect_peer(addr).await {
1394 Ok(peer_id) => {
1395 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1396 successful_connections += 1;
1397
1398 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1400 let mut manager = bootstrap_manager.write().await;
1401 let mut updated_contact = contact.clone();
1402 updated_contact.peer_id = peer_id.clone();
1403 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
1406 warn!("Failed to update bootstrap cache: {}", e);
1407 }
1408 }
1409 break; }
1411 Err(e) => {
1412 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1413
1414 if used_cache {
1416 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1417 let mut manager = bootstrap_manager.write().await;
1418 let mut updated_contact = contact.clone();
1419 updated_contact.update_connection_result(false, None, Some(e.to_string()));
1420
1421 if let Err(e) = manager.add_contact(updated_contact).await {
1422 warn!("Failed to update bootstrap cache: {}", e);
1423 }
1424 }
1425 }
1426 }
1427 }
1428 }
1429 }
1430
1431 if successful_connections == 0 && !used_cache {
1432 warn!("Failed to connect to any bootstrap peers");
1433 } else {
1434 info!("Successfully connected to {} bootstrap peers", successful_connections);
1435 }
1436
1437 Ok(())
1438 }
1439
1440 async fn disconnect_all_peers(&self) -> Result<()> {
1442 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1443
1444 for peer_id in peer_ids {
1445 self.disconnect_peer(&peer_id).await?;
1446 }
1447
1448 Ok(())
1449 }
1450
1451 async fn periodic_tasks(&self) -> Result<()> {
1453 Ok(())
1459 }
1460}
1461
1462pub struct NodeBuilder {
1464 config: NodeConfig,
1465}
1466
1467impl NodeBuilder {
1468 pub fn new() -> Self {
1470 Self {
1471 config: NodeConfig::default(),
1472 }
1473 }
1474
1475 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1477 self.config.peer_id = Some(peer_id);
1478 self
1479 }
1480
1481 pub fn listen_on(mut self, addr: &str) -> Self {
1483 self.config.listen_addrs.push(addr.to_string());
1484 self
1485 }
1486
1487 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1489 self.config.bootstrap_peers.push(addr.to_string());
1490 self
1491 }
1492
1493 pub fn with_ipv6(mut self, enable: bool) -> Self {
1495 self.config.enable_ipv6 = enable;
1496 self
1497 }
1498
1499 pub fn with_mcp_server(mut self) -> Self {
1501 self.config.enable_mcp_server = true;
1502 self
1503 }
1504
1505 pub fn with_mcp_config(mut self, mcp_config: MCPServerConfig) -> Self {
1507 self.config.mcp_server_config = Some(mcp_config);
1508 self.config.enable_mcp_server = true;
1509 self
1510 }
1511
1512 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1514 self.config.connection_timeout = timeout;
1515 self
1516 }
1517
1518 pub fn with_max_connections(mut self, max: usize) -> Self {
1520 self.config.max_connections = max;
1521 self
1522 }
1523
1524 pub fn with_production_mode(mut self) -> Self {
1526 self.config.production_config = Some(ProductionConfig::default());
1527 self
1528 }
1529
1530 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1532 self.config.production_config = Some(production_config);
1533 self
1534 }
1535
1536 pub async fn build(self) -> Result<P2PNode> {
1538 P2PNode::new(self.config).await
1539 }
1540}
1541
1542#[cfg(test)]
1543mod tests {
1544 use super::*;
1545 use crate::mcp::{Tool, MCPTool, ToolHandler, ToolMetadata, ToolHealthStatus, ToolRequirements};
1546 use serde_json::json;
1547 use std::pin::Pin;
1548 use std::future::Future;
1549 use std::time::Duration;
1550 use tokio::time::timeout;
1551
1552 struct NetworkTestTool {
1554 name: String,
1555 }
1556
1557 impl NetworkTestTool {
1558 fn new(name: &str) -> Self {
1559 Self {
1560 name: name.to_string(),
1561 }
1562 }
1563 }
1564
1565 impl ToolHandler for NetworkTestTool {
1566 fn execute(&self, arguments: serde_json::Value) -> Pin<Box<dyn Future<Output = Result<serde_json::Value>> + Send + '_>> {
1567 let name = self.name.clone();
1568 Box::pin(async move {
1569 Ok(json!({
1570 "tool": name,
1571 "input": arguments,
1572 "result": "network test success"
1573 }))
1574 })
1575 }
1576
1577 fn validate(&self, _arguments: &serde_json::Value) -> Result<()> {
1578 Ok(())
1579 }
1580
1581 fn get_requirements(&self) -> ToolRequirements {
1582 ToolRequirements::default()
1583 }
1584 }
1585
1586 fn create_test_node_config() -> NodeConfig {
1588 NodeConfig {
1589 peer_id: Some("test_peer_123".to_string()),
1590 listen_addrs: vec![
1591 "/ip6/::1/tcp/9001".to_string(),
1592 "/ip4/127.0.0.1/tcp/9001".to_string(),
1593 ],
1594 listen_addr: "127.0.0.1:9001".parse().unwrap(),
1595 bootstrap_peers: vec![],
1596 bootstrap_peers_str: vec![],
1597 enable_ipv6: true,
1598 enable_mcp_server: true,
1599 mcp_server_config: Some(MCPServerConfig {
1600 enable_auth: false, enable_rate_limiting: false, ..Default::default()
1603 }),
1604 connection_timeout: Duration::from_secs(10),
1605 keep_alive_interval: Duration::from_secs(30),
1606 max_connections: 100,
1607 max_incoming_connections: 50,
1608 dht_config: DHTConfig::default(),
1609 security_config: SecurityConfig::default(),
1610 production_config: None,
1611 bootstrap_cache_config: None,
1612 identity_config: None,
1613 }
1614 }
1615
1616 fn create_test_tool(name: &str) -> Tool {
1618 Tool {
1619 definition: MCPTool {
1620 name: name.to_string(),
1621 description: format!("Test tool: {}", name),
1622 input_schema: json!({
1623 "type": "object",
1624 "properties": {
1625 "input": { "type": "string" }
1626 }
1627 }),
1628 },
1629 handler: Box::new(NetworkTestTool::new(name)),
1630 metadata: ToolMetadata {
1631 created_at: SystemTime::now(),
1632 last_called: None,
1633 call_count: 0,
1634 avg_execution_time: Duration::from_millis(0),
1635 health_status: ToolHealthStatus::Healthy,
1636 tags: vec!["test".to_string()],
1637 },
1638 }
1639 }
1640
1641 #[tokio::test]
1642 async fn test_node_config_default() {
1643 let config = NodeConfig::default();
1644
1645 assert!(config.peer_id.is_none());
1646 assert_eq!(config.listen_addrs.len(), 2);
1647 assert!(config.enable_ipv6);
1648 assert!(config.enable_mcp_server);
1649 assert_eq!(config.max_connections, 1000);
1650 assert_eq!(config.max_incoming_connections, 100);
1651 assert_eq!(config.connection_timeout, Duration::from_secs(30));
1652 }
1653
1654 #[tokio::test]
1655 async fn test_dht_config_default() {
1656 let config = DHTConfig::default();
1657
1658 assert_eq!(config.k_value, 20);
1659 assert_eq!(config.alpha_value, 5);
1660 assert_eq!(config.record_ttl, Duration::from_secs(3600));
1661 assert_eq!(config.refresh_interval, Duration::from_secs(600));
1662 }
1663
1664 #[tokio::test]
1665 async fn test_security_config_default() {
1666 let config = SecurityConfig::default();
1667
1668 assert!(config.enable_noise);
1669 assert!(config.enable_tls);
1670 assert_eq!(config.trust_level, TrustLevel::Basic);
1671 }
1672
1673 #[test]
1674 fn test_trust_level_variants() {
1675 let _none = TrustLevel::None;
1677 let _basic = TrustLevel::Basic;
1678 let _full = TrustLevel::Full;
1679
1680 assert_eq!(TrustLevel::None, TrustLevel::None);
1682 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
1683 assert_eq!(TrustLevel::Full, TrustLevel::Full);
1684 assert_ne!(TrustLevel::None, TrustLevel::Basic);
1685 }
1686
1687 #[test]
1688 fn test_connection_status_variants() {
1689 let connecting = ConnectionStatus::Connecting;
1690 let connected = ConnectionStatus::Connected;
1691 let disconnecting = ConnectionStatus::Disconnecting;
1692 let disconnected = ConnectionStatus::Disconnected;
1693 let failed = ConnectionStatus::Failed("test error".to_string());
1694
1695 assert_eq!(connecting, ConnectionStatus::Connecting);
1696 assert_eq!(connected, ConnectionStatus::Connected);
1697 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
1698 assert_eq!(disconnected, ConnectionStatus::Disconnected);
1699 assert_ne!(connecting, connected);
1700
1701 if let ConnectionStatus::Failed(msg) = failed {
1702 assert_eq!(msg, "test error");
1703 } else {
1704 panic!("Expected Failed status");
1705 }
1706 }
1707
1708 #[tokio::test]
1709 async fn test_node_creation() -> Result<()> {
1710 let config = create_test_node_config();
1711 let node = P2PNode::new(config).await?;
1712
1713 assert_eq!(node.peer_id(), "test_peer_123");
1714 assert!(!node.is_running().await);
1715 assert_eq!(node.peer_count().await, 0);
1716 assert!(node.connected_peers().await.is_empty());
1717
1718 Ok(())
1719 }
1720
1721 #[tokio::test]
1722 async fn test_node_creation_without_peer_id() -> Result<()> {
1723 let mut config = create_test_node_config();
1724 config.peer_id = None;
1725
1726 let node = P2PNode::new(config).await?;
1727
1728 assert!(node.peer_id().starts_with("peer_"));
1730 assert!(!node.is_running().await);
1731
1732 Ok(())
1733 }
1734
1735 #[tokio::test]
1736 async fn test_node_lifecycle() -> Result<()> {
1737 let config = create_test_node_config();
1738 let node = P2PNode::new(config).await?;
1739
1740 assert!(!node.is_running().await);
1742
1743 node.start().await?;
1745 assert!(node.is_running().await);
1746
1747 let listen_addrs = node.listen_addrs().await;
1749 assert_eq!(listen_addrs.len(), 2);
1750
1751 node.stop().await?;
1753 assert!(!node.is_running().await);
1754
1755 Ok(())
1756 }
1757
1758 #[tokio::test]
1759 async fn test_peer_connection() -> Result<()> {
1760 let config = create_test_node_config();
1761 let node = P2PNode::new(config).await?;
1762
1763 let peer_addr = "/ip4/127.0.0.1/tcp/9002".to_string();
1764
1765 let peer_id = node.connect_peer(&peer_addr).await?;
1767 assert!(peer_id.starts_with("peer_from_"));
1768
1769 assert_eq!(node.peer_count().await, 1);
1771
1772 let connected_peers = node.connected_peers().await;
1774 assert_eq!(connected_peers.len(), 1);
1775 assert_eq!(connected_peers[0], peer_id);
1776
1777 let peer_info = node.peer_info(&peer_id).await;
1779 assert!(peer_info.is_some());
1780 let info = peer_info.unwrap();
1781 assert_eq!(info.peer_id, peer_id);
1782 assert_eq!(info.status, ConnectionStatus::Connected);
1783 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
1784
1785 node.disconnect_peer(&peer_id).await?;
1787 assert_eq!(node.peer_count().await, 0);
1788
1789 Ok(())
1790 }
1791
1792 #[tokio::test]
1793 async fn test_event_subscription() -> Result<()> {
1794 let config = create_test_node_config();
1795 let node = P2PNode::new(config).await?;
1796
1797 let mut events = node.subscribe_events();
1798 let peer_addr = "/ip4/127.0.0.1/tcp/9003".to_string();
1799
1800 let peer_id = node.connect_peer(&peer_addr).await?;
1802
1803 let event = timeout(Duration::from_millis(100), events.recv()).await;
1805 assert!(event.is_ok());
1806
1807 match event.unwrap().unwrap() {
1808 P2PEvent::PeerConnected(event_peer_id) => {
1809 assert_eq!(event_peer_id, peer_id);
1810 }
1811 _ => panic!("Expected PeerConnected event"),
1812 }
1813
1814 node.disconnect_peer(&peer_id).await?;
1816
1817 let event = timeout(Duration::from_millis(100), events.recv()).await;
1819 assert!(event.is_ok());
1820
1821 match event.unwrap().unwrap() {
1822 P2PEvent::PeerDisconnected(event_peer_id) => {
1823 assert_eq!(event_peer_id, peer_id);
1824 }
1825 _ => panic!("Expected PeerDisconnected event"),
1826 }
1827
1828 Ok(())
1829 }
1830
1831 #[tokio::test]
1832 async fn test_message_sending() -> Result<()> {
1833 let config = create_test_node_config();
1834 let node = P2PNode::new(config).await?;
1835
1836 let peer_addr = "/ip4/127.0.0.1/tcp/9004".to_string();
1837 let peer_id = node.connect_peer(&peer_addr).await?;
1838
1839 let message_data = b"Hello, peer!".to_vec();
1841 let result = node.send_message(&peer_id, "test-protocol", message_data).await;
1842 assert!(result.is_ok());
1843
1844 let non_existent_peer = "non_existent_peer".to_string();
1846 let result = node.send_message(&non_existent_peer, "test-protocol", vec![]).await;
1847 assert!(result.is_err());
1848 assert!(result.unwrap_err().to_string().contains("not connected"));
1849
1850 Ok(())
1851 }
1852
1853 #[tokio::test]
1854 async fn test_mcp_integration() -> Result<()> {
1855 let config = create_test_node_config();
1856 let node = P2PNode::new(config).await?;
1857
1858 node.start().await?;
1860
1861 let tool = create_test_tool("network_test_tool");
1863 node.register_mcp_tool(tool).await?;
1864
1865 let tools = node.list_mcp_tools().await?;
1867 assert!(tools.contains(&"network_test_tool".to_string()));
1868
1869 let arguments = json!({"input": "test_input"});
1871 let result = node.call_mcp_tool("network_test_tool", arguments.clone()).await?;
1872 assert_eq!(result["tool"], "network_test_tool");
1873 assert_eq!(result["input"], arguments);
1874
1875 let stats = node.mcp_stats().await?;
1877 assert_eq!(stats.total_tools, 1);
1878
1879 let result = node.call_mcp_tool("non_existent_tool", json!({})).await;
1881 assert!(result.is_err());
1882
1883 node.stop().await?;
1884 Ok(())
1885 }
1886
1887 #[tokio::test]
1888 async fn test_remote_mcp_operations() -> Result<()> {
1889 let config = create_test_node_config();
1890 let node = P2PNode::new(config).await?;
1891
1892 node.start().await?;
1893
1894 let tool = create_test_tool("remote_test_tool");
1896 node.register_mcp_tool(tool).await?;
1897
1898 let peer_addr = "/ip4/127.0.0.1/tcp/9005".to_string();
1899 let peer_id = node.connect_peer(&peer_addr).await?;
1900
1901 let remote_tools = node.list_remote_mcp_tools(&peer_id).await?;
1903 assert!(!remote_tools.is_empty());
1904
1905 let arguments = json!({"input": "remote_test"});
1907 let result = node.call_remote_mcp_tool(&peer_id, "remote_test_tool", arguments.clone()).await?;
1908 assert_eq!(result["tool"], "remote_test_tool");
1909
1910 let services = node.discover_remote_mcp_services().await?;
1912 assert!(services.is_empty());
1914
1915 node.stop().await?;
1916 Ok(())
1917 }
1918
1919 #[tokio::test]
1920 async fn test_health_check() -> Result<()> {
1921 let config = create_test_node_config();
1922 let node = P2PNode::new(config).await?;
1923
1924 let result = node.health_check().await;
1926 assert!(result.is_ok());
1927
1928 for i in 0..5 {
1930 let addr = format!("/ip4/127.0.0.1/tcp/{}", 9010 + i);
1931 node.connect_peer(&addr).await?;
1932 }
1933
1934 let result = node.health_check().await;
1936 assert!(result.is_ok());
1937
1938 Ok(())
1939 }
1940
1941 #[tokio::test]
1942 async fn test_node_uptime() -> Result<()> {
1943 let config = create_test_node_config();
1944 let node = P2PNode::new(config).await?;
1945
1946 let uptime1 = node.uptime();
1947 assert!(uptime1 >= Duration::from_secs(0));
1948
1949 tokio::time::sleep(Duration::from_millis(10)).await;
1951
1952 let uptime2 = node.uptime();
1953 assert!(uptime2 > uptime1);
1954
1955 Ok(())
1956 }
1957
1958 #[tokio::test]
1959 async fn test_node_config_access() -> Result<()> {
1960 let config = create_test_node_config();
1961 let expected_peer_id = config.peer_id.clone();
1962 let node = P2PNode::new(config).await?;
1963
1964 let node_config = node.config();
1965 assert_eq!(node_config.peer_id, expected_peer_id);
1966 assert_eq!(node_config.max_connections, 100);
1967 assert!(node_config.enable_mcp_server);
1968
1969 Ok(())
1970 }
1971
1972 #[tokio::test]
1973 async fn test_mcp_server_access() -> Result<()> {
1974 let config = create_test_node_config();
1975 let node = P2PNode::new(config).await?;
1976
1977 assert!(node.mcp_server().is_some());
1979
1980 let mut config = create_test_node_config();
1982 config.enable_mcp_server = false;
1983 let node_no_mcp = P2PNode::new(config).await?;
1984 assert!(node_no_mcp.mcp_server().is_none());
1985
1986 Ok(())
1987 }
1988
1989 #[tokio::test]
1990 async fn test_dht_access() -> Result<()> {
1991 let config = create_test_node_config();
1992 let node = P2PNode::new(config).await?;
1993
1994 assert!(node.dht().is_some());
1996
1997 Ok(())
1998 }
1999
2000 #[tokio::test]
2001 async fn test_node_builder() -> Result<()> {
2002 let node = P2PNode::builder()
2003 .with_peer_id("builder_test_peer".to_string())
2004 .listen_on("/ip4/127.0.0.1/tcp/9100")
2005 .listen_on("/ip6/::1/tcp/9100")
2006 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9101")
2007 .with_ipv6(true)
2008 .with_mcp_server()
2009 .with_connection_timeout(Duration::from_secs(15))
2010 .with_max_connections(200)
2011 .build()
2012 .await?;
2013
2014 assert_eq!(node.peer_id(), "builder_test_peer");
2015 let config = node.config();
2016 assert_eq!(config.listen_addrs.len(), 4); assert_eq!(config.bootstrap_peers.len(), 1);
2018 assert!(config.enable_ipv6);
2019 assert!(config.enable_mcp_server);
2020 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2021 assert_eq!(config.max_connections, 200);
2022
2023 Ok(())
2024 }
2025
2026 #[tokio::test]
2027 async fn test_node_builder_with_mcp_config() -> Result<()> {
2028 let mcp_config = MCPServerConfig {
2029 server_name: "test_mcp_server".to_string(),
2030 server_version: "1.0.0".to_string(),
2031 enable_dht_discovery: false,
2032 enable_auth: false,
2033 ..MCPServerConfig::default()
2034 };
2035
2036 let node = P2PNode::builder()
2037 .with_peer_id("mcp_config_test".to_string())
2038 .with_mcp_config(mcp_config.clone())
2039 .build()
2040 .await?;
2041
2042 assert_eq!(node.peer_id(), "mcp_config_test");
2043 let config = node.config();
2044 assert!(config.enable_mcp_server);
2045 assert!(config.mcp_server_config.is_some());
2046
2047 let node_mcp_config = config.mcp_server_config.as_ref().unwrap();
2048 assert_eq!(node_mcp_config.server_name, "test_mcp_server");
2049 assert!(!node_mcp_config.enable_auth);
2050
2051 Ok(())
2052 }
2053
2054 #[tokio::test]
2055 async fn test_mcp_server_not_enabled_errors() -> Result<()> {
2056 let mut config = create_test_node_config();
2057 config.enable_mcp_server = false;
2058 let node = P2PNode::new(config).await?;
2059
2060 let tool = create_test_tool("test_tool");
2062 let result = node.register_mcp_tool(tool).await;
2063 assert!(result.is_err());
2064 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2065
2066 let result = node.call_mcp_tool("test_tool", json!({})).await;
2067 assert!(result.is_err());
2068 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2069
2070 let result = node.list_mcp_tools().await;
2071 assert!(result.is_err());
2072 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2073
2074 let result = node.mcp_stats().await;
2075 assert!(result.is_err());
2076 assert!(result.unwrap_err().to_string().contains("MCP server not enabled"));
2077
2078 Ok(())
2079 }
2080
2081 #[tokio::test]
2082 async fn test_bootstrap_peers() -> Result<()> {
2083 let mut config = create_test_node_config();
2084 config.bootstrap_peers = vec![
2085 "/ip4/127.0.0.1/tcp/9200".to_string(),
2086 "/ip4/127.0.0.1/tcp/9201".to_string(),
2087 ];
2088
2089 let node = P2PNode::new(config).await?;
2090
2091 node.start().await?;
2093
2094 let peer_count = node.peer_count().await;
2097 assert!(peer_count <= 2, "Peer count should not exceed bootstrap peer count");
2098
2099 node.stop().await?;
2100 Ok(())
2101 }
2102
2103 #[tokio::test]
2104 async fn test_production_mode_disabled() -> Result<()> {
2105 let config = create_test_node_config();
2106 let node = P2PNode::new(config).await?;
2107
2108 assert!(!node.is_production_mode());
2109 assert!(node.production_config().is_none());
2110
2111 let result = node.resource_metrics().await;
2113 assert!(result.is_err());
2114 assert!(result.unwrap_err().to_string().contains("not enabled"));
2115
2116 Ok(())
2117 }
2118
2119 #[tokio::test]
2120 async fn test_network_event_variants() {
2121 let peer_id = "test_peer".to_string();
2123 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2124
2125 let _peer_connected = NetworkEvent::PeerConnected {
2126 peer_id: peer_id.clone(),
2127 addresses: vec![address.clone()],
2128 };
2129
2130 let _peer_disconnected = NetworkEvent::PeerDisconnected {
2131 peer_id: peer_id.clone(),
2132 reason: "test disconnect".to_string(),
2133 };
2134
2135 let _message_received = NetworkEvent::MessageReceived {
2136 peer_id: peer_id.clone(),
2137 protocol: "test-protocol".to_string(),
2138 data: vec![1, 2, 3],
2139 };
2140
2141 let _connection_failed = NetworkEvent::ConnectionFailed {
2142 peer_id: Some(peer_id.clone()),
2143 address: address.clone(),
2144 error: "connection refused".to_string(),
2145 };
2146
2147 let _dht_stored = NetworkEvent::DHTRecordStored {
2148 key: vec![1, 2, 3],
2149 value: vec![4, 5, 6],
2150 };
2151
2152 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2153 key: vec![1, 2, 3],
2154 value: Some(vec![4, 5, 6]),
2155 };
2156 }
2157
2158 #[tokio::test]
2159 async fn test_peer_info_structure() {
2160 let peer_info = PeerInfo {
2161 peer_id: "test_peer".to_string(),
2162 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2163 connected_at: Instant::now(),
2164 last_seen: Instant::now(),
2165 status: ConnectionStatus::Connected,
2166 protocols: vec!["test-protocol".to_string()],
2167 };
2168
2169 assert_eq!(peer_info.peer_id, "test_peer");
2170 assert_eq!(peer_info.addresses.len(), 1);
2171 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2172 assert_eq!(peer_info.protocols.len(), 1);
2173 }
2174
2175 #[tokio::test]
2176 async fn test_serialization() -> Result<()> {
2177 let config = create_test_node_config();
2179 let serialized = serde_json::to_string(&config)?;
2180 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2181
2182 assert_eq!(config.peer_id, deserialized.peer_id);
2183 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2184 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2185
2186 Ok(())
2187 }
2188}