1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::{RwLock, broadcast};
37use tokio::time::Instant;
38use tracing::{debug, info, warn};
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct NodeConfig {
43 pub peer_id: Option<PeerId>,
45
46 pub listen_addrs: Vec<std::net::SocketAddr>,
48
49 pub listen_addr: std::net::SocketAddr,
51
52 pub bootstrap_peers: Vec<std::net::SocketAddr>,
54
55 pub bootstrap_peers_str: Vec<String>,
57
58 pub enable_ipv6: bool,
60
61 pub connection_timeout: Duration,
64
65 pub keep_alive_interval: Duration,
67
68 pub max_connections: usize,
70
71 pub max_incoming_connections: usize,
73
74 pub dht_config: DHTConfig,
76
77 pub security_config: SecurityConfig,
79
80 pub production_config: Option<ProductionConfig>,
82
83 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct DHTConfig {
90 pub k_value: usize,
92
93 pub alpha_value: usize,
95
96 pub record_ttl: Duration,
98
99 pub refresh_interval: Duration,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct SecurityConfig {
106 pub enable_noise: bool,
108
109 pub enable_tls: bool,
111
112 pub trust_level: TrustLevel,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub enum TrustLevel {
119 None,
121 Basic,
123 Full,
125}
126
127impl NodeConfig {
128 pub fn new() -> Result<Self> {
134 let config = Config::default();
136
137 let listen_addr = config.listen_socket_addr()?;
139
140 let mut listen_addrs = vec![];
142
143 if config.network.ipv6_enabled {
145 let ipv6_addr = std::net::SocketAddr::new(
146 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
147 listen_addr.port(),
148 );
149 listen_addrs.push(ipv6_addr);
150 }
151
152 let ipv4_addr = std::net::SocketAddr::new(
154 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
155 listen_addr.port(),
156 );
157 listen_addrs.push(ipv4_addr);
158
159 Ok(Self {
160 peer_id: None,
161 listen_addrs,
162 listen_addr,
163 bootstrap_peers: Vec::new(),
164 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
165 enable_ipv6: config.network.ipv6_enabled,
166
167 connection_timeout: Duration::from_secs(config.network.connection_timeout),
168 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
169 max_connections: config.network.max_connections,
170 max_incoming_connections: config.security.connection_limit as usize,
171 dht_config: DHTConfig::default(),
172 security_config: SecurityConfig::default(),
173 production_config: None,
174 bootstrap_cache_config: None,
175 })
177 }
178}
179
180impl Default for NodeConfig {
181 fn default() -> Self {
182 let config = Config::default();
184
185 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
187 std::net::SocketAddr::new(
188 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
189 9000,
190 )
191 });
192
193 Self {
194 peer_id: None,
195 listen_addrs: vec![
196 std::net::SocketAddr::new(
197 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
198 listen_addr.port(),
199 ),
200 std::net::SocketAddr::new(
201 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
202 listen_addr.port(),
203 ),
204 ],
205 listen_addr,
206 bootstrap_peers: Vec::new(),
207 bootstrap_peers_str: Vec::new(),
208 enable_ipv6: config.network.ipv6_enabled,
209
210 connection_timeout: Duration::from_secs(config.network.connection_timeout),
211 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
212 max_connections: config.network.max_connections,
213 max_incoming_connections: config.security.connection_limit as usize,
214 dht_config: DHTConfig::default(),
215 security_config: SecurityConfig::default(),
216 production_config: None, bootstrap_cache_config: None,
218 }
220 }
221}
222
223impl NodeConfig {
224 pub fn from_config(config: &Config) -> Result<Self> {
226 let listen_addr = config.listen_socket_addr()?;
227 let bootstrap_addrs = config.bootstrap_addrs()?;
228
229 let mut node_config = Self {
230 peer_id: None,
231 listen_addrs: vec![listen_addr],
232 listen_addr,
233 bootstrap_peers: bootstrap_addrs
234 .iter()
235 .map(|addr| addr.socket_addr())
236 .collect(),
237 bootstrap_peers_str: config
238 .network
239 .bootstrap_nodes
240 .iter()
241 .map(|addr| addr.to_string())
242 .collect(),
243 enable_ipv6: config.network.ipv6_enabled,
244
245 connection_timeout: Duration::from_secs(config.network.connection_timeout),
246 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
247 max_connections: config.network.max_connections,
248 max_incoming_connections: config.security.connection_limit as usize,
249 dht_config: DHTConfig {
250 k_value: 20,
251 alpha_value: 3,
252 record_ttl: Duration::from_secs(3600),
253 refresh_interval: Duration::from_secs(900),
254 },
255 security_config: SecurityConfig {
256 enable_noise: true,
257 enable_tls: true,
258 trust_level: TrustLevel::Basic,
259 },
260 production_config: Some(ProductionConfig {
261 max_connections: config.network.max_connections,
262 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
265 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266 health_check_interval: Duration::from_secs(30),
267 metrics_interval: Duration::from_secs(60),
268 enable_performance_tracking: true,
269 enable_auto_cleanup: true,
270 shutdown_timeout: Duration::from_secs(30),
271 rate_limits: crate::production::RateLimitConfig::default(),
272 }),
273 bootstrap_cache_config: None,
274 };
279
280 if config.network.ipv6_enabled {
282 node_config.listen_addrs.push(std::net::SocketAddr::new(
283 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
284 listen_addr.port(),
285 ));
286 }
287
288 Ok(node_config)
289 }
290
291 pub fn with_listen_addr(addr: &str) -> Result<Self> {
293 let listen_addr: std::net::SocketAddr = addr
294 .parse()
295 .map_err(|e: std::net::AddrParseError| {
296 NetworkError::InvalidAddress(e.to_string().into())
297 })
298 .map_err(P2PError::Network)?;
299 let cfg = NodeConfig {
300 listen_addr,
301 listen_addrs: vec![listen_addr],
302 ..Default::default()
303 };
304 Ok(cfg)
305 }
306}
307
308impl Default for DHTConfig {
309 fn default() -> Self {
310 Self {
311 k_value: 20,
312 alpha_value: 5,
313 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
316 }
317}
318
319impl Default for SecurityConfig {
320 fn default() -> Self {
321 Self {
322 enable_noise: true,
323 enable_tls: true,
324 trust_level: TrustLevel::Basic,
325 }
326 }
327}
328
329#[derive(Debug, Clone)]
331pub struct PeerInfo {
332 pub peer_id: PeerId,
334
335 pub addresses: Vec<String>,
337
338 pub connected_at: Instant,
340
341 pub last_seen: Instant,
343
344 pub status: ConnectionStatus,
346
347 pub protocols: Vec<String>,
349
350 pub heartbeat_count: u64,
352}
353
/// Connection state recorded in [`PeerInfo`].
///
/// Every variant compares by value, so the enum derives `Eq` in addition to
/// `PartialEq`; this is additive and lets the type be used wherever total
/// equality is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The peer is connected.
    Connected,
    /// The connection is being torn down.
    Disconnecting,
    /// The peer is no longer connected.
    Disconnected,
    /// The connection failed; the payload carries a description.
    Failed(String),
}
368
369#[derive(Debug, Clone)]
371pub enum NetworkEvent {
372 PeerConnected {
374 peer_id: PeerId,
376 addresses: Vec<String>,
378 },
379
380 PeerDisconnected {
382 peer_id: PeerId,
384 reason: String,
386 },
387
388 MessageReceived {
390 peer_id: PeerId,
392 protocol: String,
394 data: Vec<u8>,
396 },
397
398 ConnectionFailed {
400 peer_id: Option<PeerId>,
402 address: String,
404 error: String,
406 },
407
408 DHTRecordStored {
410 key: Vec<u8>,
412 value: Vec<u8>,
414 },
415
416 DHTRecordRetrieved {
418 key: Vec<u8>,
420 value: Option<Vec<u8>>,
422 },
423}
424
425#[derive(Debug, Clone)]
430pub enum P2PEvent {
431 Message {
433 topic: String,
435 source: PeerId,
437 data: Vec<u8>,
439 },
440 PeerConnected(PeerId),
442 PeerDisconnected(PeerId),
444}
445
446pub struct P2PNode {
456 config: NodeConfig,
458
459 peer_id: PeerId,
461
462 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
464
465 event_tx: broadcast::Sender<P2PEvent>,
467
468 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
470
471 start_time: Instant,
473
474 running: RwLock<bool>,
476
477 dht: Option<Arc<RwLock<DHT>>>,
479
480 resource_manager: Option<Arc<ResourceManager>>,
482
483 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
485
486 dual_node: Arc<DualStackNetworkNode>,
488
489 #[allow(dead_code)]
491 rate_limiter: Arc<RateLimiter>,
492}
493
494impl P2PNode {
495 pub fn new_for_tests() -> Result<Self> {
497 let (event_tx, _) = broadcast::channel(16);
498 Ok(Self {
499 config: NodeConfig::default(),
500 peer_id: "test_peer".to_string(),
501 peers: Arc::new(RwLock::new(HashMap::new())),
502 event_tx,
503 listen_addrs: RwLock::new(Vec::new()),
504 start_time: Instant::now(),
505 running: RwLock::new(false),
506 dht: None,
507 resource_manager: None,
508 bootstrap_manager: None,
509 dual_node: {
510 let v6: Option<std::net::SocketAddr> = "[::1]:0"
512 .parse()
513 .ok()
514 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
515 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
516 let handle = tokio::runtime::Handle::current();
517 let dual_attempt = handle.block_on(
518 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
519 );
520 let dual = match dual_attempt {
521 Ok(d) => d,
522 Err(_e1) => {
523 let fallback = handle.block_on(
525 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
526 None,
527 "127.0.0.1:0".parse().ok(),
528 ),
529 );
530 match fallback {
531 Ok(d) => d,
532 Err(e2) => {
533 return Err(P2PError::Network(NetworkError::BindError(
534 format!("Failed to create dual-stack network node: {}", e2)
535 .into(),
536 )));
537 }
538 }
539 }
540 };
541 Arc::new(dual)
542 },
543 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
544 max_requests: 100,
545 burst_size: 100,
546 window: std::time::Duration::from_secs(1),
547 ..Default::default()
548 })),
549 })
550 }
551 pub async fn new(config: NodeConfig) -> Result<Self> {
553 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
554 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
556 });
557
558 let (event_tx, _) = broadcast::channel(1000);
559
560 {
563 use blake3::Hasher;
564 let mut hasher = Hasher::new();
565 hasher.update(peer_id.as_bytes());
566 let digest = hasher.finalize();
567 let mut nid = [0u8; 32];
568 nid.copy_from_slice(digest.as_bytes());
569 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
570 crate::identity::node_identity::NodeId::from_bytes(nid),
571 ));
572 }
575
576 let dht = if true {
578 let _dht_config = crate::dht::DHTConfig {
580 replication_factor: config.dht_config.k_value,
581 bucket_size: config.dht_config.k_value,
582 alpha: config.dht_config.alpha_value,
583 record_ttl: config.dht_config.record_ttl,
584 bucket_refresh_interval: config.dht_config.refresh_interval,
585 republish_interval: config.dht_config.refresh_interval,
586 max_distance: 160, };
588 let peer_bytes = peer_id.as_bytes();
590 let mut node_id_bytes = [0u8; 32];
591 let len = peer_bytes.len().min(32);
592 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
593 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
594 let dht_instance = DHT::new(node_id).map_err(|e| {
595 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
596 e.to_string().into(),
597 ))
598 })?;
599 Some(Arc::new(RwLock::new(dht_instance)))
600 } else {
601 None
602 };
603
604 let resource_manager = config
608 .production_config
609 .clone()
610 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
611
612 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
614 match BootstrapManager::with_config(cache_config.clone()).await {
615 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
616 Err(e) => {
617 warn!(
618 "Failed to initialize bootstrap manager: {}, continuing without cache",
619 e
620 );
621 None
622 }
623 }
624 } else {
625 match BootstrapManager::new().await {
626 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
627 Err(e) => {
628 warn!(
629 "Failed to initialize bootstrap manager: {}, continuing without cache",
630 e
631 );
632 None
633 }
634 }
635 };
636
637 let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
639 let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
640 let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
641 (v6_addr, v4_addr)
642 } else {
643 let v4_addr = Some(std::net::SocketAddr::new(
645 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
646 config.listen_addr.port(),
647 ));
648 let v6_addr = if config.enable_ipv6 {
649 Some(std::net::SocketAddr::new(
650 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
651 config.listen_addr.port(),
652 ))
653 } else {
654 None
655 };
656 (v6_addr, v4_addr)
657 };
658
659 let dual_node = Arc::new(
660 DualStackNetworkNode::new(v6_opt, v4_opt)
661 .await
662 .map_err(|e| {
663 P2PError::Transport(crate::error::TransportError::SetupFailed(
664 format!("Failed to create dual-stack network nodes: {}", e).into(),
665 ))
666 })?,
667 );
668
669 let rate_limiter = Arc::new(RateLimiter::new(
671 crate::validation::RateLimitConfig::default(),
672 ));
673
674 let node = Self {
675 config,
676 peer_id,
677 peers: Arc::new(RwLock::new(HashMap::new())),
678 event_tx,
679 listen_addrs: RwLock::new(Vec::new()),
680 start_time: Instant::now(),
681 running: RwLock::new(false),
682 dht,
683 resource_manager,
684 bootstrap_manager,
685 dual_node,
686 rate_limiter,
687 };
688 info!("Created P2P node with peer ID: {}", node.peer_id);
689
690 node.start_network_listeners().await?;
692
693 Ok(node)
694 }
695
696 pub fn builder() -> NodeBuilder {
698 NodeBuilder::new()
699 }
700
701 pub fn peer_id(&self) -> &PeerId {
703 &self.peer_id
704 }
705
706 pub fn local_addr(&self) -> Option<String> {
707 self.listen_addrs
708 .try_read()
709 .ok()
710 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
711 }
712
713 pub async fn subscribe(&self, topic: &str) -> Result<()> {
714 info!("Subscribed to topic: {}", topic);
717 Ok(())
718 }
719
720 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
721 info!(
722 "Publishing message to topic: {} ({} bytes)",
723 topic,
724 data.len()
725 );
726
727 let peer_list: Vec<PeerId> = {
729 let peers_guard = self.peers.read().await;
730 peers_guard.keys().cloned().collect()
731 };
732
733 if peer_list.is_empty() {
734 debug!("No peers connected, message will only be sent to local subscribers");
735 } else {
736 let mut send_count = 0;
738 for peer_id in &peer_list {
739 match self.send_message(peer_id, topic, data.to_vec()).await {
740 Ok(_) => {
741 send_count += 1;
742 debug!("Sent message to peer: {}", peer_id);
743 }
744 Err(e) => {
745 warn!("Failed to send message to peer {}: {}", peer_id, e);
746 }
747 }
748 }
749 info!(
750 "Published message to {}/{} connected peers",
751 send_count,
752 peer_list.len()
753 );
754 }
755
756 let event = P2PEvent::Message {
758 topic: topic.to_string(),
759 source: self.peer_id.clone(),
760 data: data.to_vec(),
761 };
762 let _ = self.event_tx.send(event);
763
764 Ok(())
765 }
766
767 pub fn config(&self) -> &NodeConfig {
769 &self.config
770 }
771
772 pub async fn start(&self) -> Result<()> {
774 info!("Starting P2P node...");
775
776 if let Some(ref resource_manager) = self.resource_manager {
778 resource_manager.start().await.map_err(|e| {
779 P2PError::Network(crate::error::NetworkError::ProtocolError(
780 format!("Failed to start resource manager: {e}").into(),
781 ))
782 })?;
783 info!("Production resource manager started");
784 }
785
786 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
788 let mut manager = bootstrap_manager.write().await;
789 manager.start_background_tasks().await.map_err(|e| {
790 P2PError::Network(crate::error::NetworkError::ProtocolError(
791 format!("Failed to start bootstrap manager: {e}").into(),
792 ))
793 })?;
794 info!("Bootstrap cache manager started");
795 }
796
797 *self.running.write().await = true;
799
800 self.start_network_listeners().await?;
802
803 let listen_addrs = self.listen_addrs.read().await;
805 info!("P2P node started on addresses: {:?}", *listen_addrs);
806
807 self.start_message_receiving_system().await?;
811
812 self.connect_bootstrap_peers().await?;
814
815 Ok(())
816 }
817
818 async fn start_network_listeners(&self) -> Result<()> {
820 info!("Starting dual-stack listeners (ant-quic)...");
821 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
823 P2PError::Transport(crate::error::TransportError::SetupFailed(
824 format!("Failed to get local addresses: {}", e).into(),
825 ))
826 })?;
827 {
828 let mut la = self.listen_addrs.write().await;
829 *la = addrs.clone();
830 }
831
832 let event_tx = self.event_tx.clone();
834 let peers = self.peers.clone();
835 let rate_limiter = self.rate_limiter.clone();
836 let dual = self.dual_node.clone();
837 tokio::spawn(async move {
838 loop {
839 match dual.accept_any().await {
840 Ok((ant_peer_id, remote_sock)) => {
841 let peer_id =
842 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
843 let remote_addr = NetworkAddress::from(remote_sock);
844 let _ = rate_limiter.check_ip(&remote_sock.ip());
846 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
847 register_new_peer(&peers, &peer_id, &remote_addr).await;
848 }
849 Err(e) => {
850 warn!("Accept failed: {}", e);
851 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
852 }
853 }
854 }
855 });
856
857 info!("Dual-stack listeners active on: {:?}", addrs);
858 Ok(())
859 }
860
861 #[allow(dead_code)]
863 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
864 warn!("QUIC transport temporarily disabled during ant-quic migration");
903 Err(crate::P2PError::Transport(
905 crate::error::TransportError::SetupFailed(
906 format!(
907 "Failed to start QUIC listener on {addr} - transport disabled during migration"
908 )
909 .into(),
910 ),
911 ))
912 }
913
914 #[allow(dead_code)] async fn start_connection_acceptor(
917 &self,
918 transport: Arc<dyn crate::transport::Transport>,
919 addr: std::net::SocketAddr,
920 transport_type: crate::transport::TransportType,
921 ) -> Result<()> {
922 info!(
923 "Starting connection acceptor for {:?} on {}",
924 transport_type, addr
925 );
926
927 let event_tx = self.event_tx.clone();
929 let _peer_id = self.peer_id.clone();
930 let peers = Arc::clone(&self.peers);
931 let rate_limiter = Arc::clone(&self.rate_limiter);
934
935 tokio::spawn(async move {
937 loop {
938 match transport.accept().await {
939 Ok(connection) => {
940 let remote_addr = connection.remote_addr();
941 let connection_peer_id =
942 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
943
944 let socket_addr = remote_addr.socket_addr();
946 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
947 continue;
949 }
950
951 info!(
952 "Accepted {:?} connection from {} (peer: {})",
953 transport_type, remote_addr, connection_peer_id
954 );
955
956 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
958
959 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
961
962 spawn_connection_handler(
964 connection,
965 connection_peer_id,
966 event_tx.clone(),
967 Arc::clone(&peers),
968 );
969 }
970 Err(e) => {
971 warn!(
972 "Failed to accept {:?} connection on {}: {}",
973 transport_type, addr, e
974 );
975
976 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
978 }
979 }
980 }
981 });
982
983 info!(
984 "Connection acceptor background task started for {:?} on {}",
985 transport_type, addr
986 );
987 Ok(())
988 }
989
990 async fn start_message_receiving_system(&self) -> Result<()> {
992 info!("Starting message receiving system");
993 let dual = self.dual_node.clone();
994 let event_tx = self.event_tx.clone();
995
996 tokio::spawn(async move {
997 loop {
998 match dual.receive_any().await {
999 Ok((_peer_id, bytes)) => {
1000 #[allow(clippy::collapsible_if)]
1002 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1003 if let (Some(protocol), Some(data), Some(from)) = (
1004 value.get("protocol").and_then(|v| v.as_str()),
1005 value.get("data").and_then(|v| v.as_array()),
1006 value.get("from").and_then(|v| v.as_str()),
1007 ) {
1008 let payload: Vec<u8> = data
1009 .iter()
1010 .filter_map(|v| v.as_u64().map(|n| n as u8))
1011 .collect();
1012 let _ = event_tx.send(P2PEvent::Message {
1013 topic: protocol.to_string(),
1014 source: from.to_string(),
1015 data: payload,
1016 });
1017 }
1018 }
1019 }
1020 Err(e) => {
1021 warn!("Receive error: {}", e);
1022 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1023 }
1024 }
1025 }
1026 });
1027
1028 Ok(())
1029 }
1030
1031 #[allow(dead_code)]
1033 async fn handle_received_message(
1034 &self,
1035 message_data: Vec<u8>,
1036 peer_id: &PeerId,
1037 _protocol: &str,
1038 event_tx: &broadcast::Sender<P2PEvent>,
1039 ) -> Result<()> {
1040 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1044 Ok(message) => {
1045 if let (Some(protocol), Some(data), Some(from)) = (
1046 message.get("protocol").and_then(|v| v.as_str()),
1047 message.get("data").and_then(|v| v.as_array()),
1048 message.get("from").and_then(|v| v.as_str()),
1049 ) {
1050 let data_bytes: Vec<u8> = data
1052 .iter()
1053 .filter_map(|v| v.as_u64().map(|n| n as u8))
1054 .collect();
1055
1056 let event = P2PEvent::Message {
1058 topic: protocol.to_string(),
1059 source: from.to_string(),
1060 data: data_bytes,
1061 };
1062
1063 let _ = event_tx.send(event);
1064 debug!("Generated message event from peer: {}", peer_id);
1065 }
1066 }
1067 Err(e) => {
1068 warn!("Failed to parse received message from {}: {}", peer_id, e);
1069 }
1070 }
1071
1072 Ok(())
1073 }
1074
1075 pub async fn run(&self) -> Result<()> {
1081 if !*self.running.read().await {
1082 self.start().await?;
1083 }
1084
1085 info!("P2P node running...");
1086
1087 loop {
1089 if !*self.running.read().await {
1090 break;
1091 }
1092
1093 self.periodic_tasks().await?;
1095
1096 tokio::time::sleep(Duration::from_millis(100)).await;
1098 }
1099
1100 info!("P2P node stopped");
1101 Ok(())
1102 }
1103
1104 pub async fn stop(&self) -> Result<()> {
1106 info!("Stopping P2P node...");
1107
1108 *self.running.write().await = false;
1110
1111 self.disconnect_all_peers().await?;
1113
1114 if let Some(ref resource_manager) = self.resource_manager {
1116 resource_manager.shutdown().await.map_err(|e| {
1117 P2PError::Network(crate::error::NetworkError::ProtocolError(
1118 format!("Failed to shutdown resource manager: {e}").into(),
1119 ))
1120 })?;
1121 info!("Production resource manager stopped");
1122 }
1123
1124 info!("P2P node stopped");
1125 Ok(())
1126 }
1127
1128 pub async fn shutdown(&self) -> Result<()> {
1130 self.stop().await
1131 }
1132
1133 pub async fn is_running(&self) -> bool {
1135 *self.running.read().await
1136 }
1137
1138 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1140 self.listen_addrs.read().await.clone()
1141 }
1142
1143 pub async fn connected_peers(&self) -> Vec<PeerId> {
1145 self.peers.read().await.keys().cloned().collect()
1146 }
1147
1148 pub async fn peer_count(&self) -> usize {
1150 self.peers.read().await.len()
1151 }
1152
1153 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1155 self.peers.read().await.get(peer_id).cloned()
1156 }
1157
1158 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1160 info!("Connecting to peer at: {}", address);
1161
1162 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1164 Some(resource_manager.acquire_connection().await?)
1165 } else {
1166 None
1167 };
1168
1169 let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1171 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1172 format!("{}: {}", address, e).into(),
1173 ))
1174 })?;
1175
1176 let addr_list = vec![_socket_addr];
1178 let peer_id = match tokio::time::timeout(
1179 self.config.connection_timeout,
1180 self.dual_node.connect_happy_eyeballs(&addr_list),
1181 )
1182 .await
1183 {
1184 Ok(Ok(peer)) => {
1185 let connected_peer_id =
1186 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1187 info!("Successfully connected to peer: {}", connected_peer_id);
1188 connected_peer_id
1189 }
1190 Ok(Err(e)) => {
1191 warn!("Failed to connect to peer at {}: {}", address, e);
1192 let sanitized_address = address.replace(['/', ':'], "_");
1193 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1194 warn!(
1195 "Using demo peer ID: {} (transport connection failed)",
1196 demo_peer_id
1197 );
1198 demo_peer_id
1199 }
1200 Err(_) => {
1201 warn!(
1202 "Timed out connecting to peer at {} after {:?}",
1203 address, self.config.connection_timeout
1204 );
1205 let sanitized_address = address.replace(['/', ':'], "_");
1206 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1207 demo_peer_id
1208 }
1209 };
1210
1211 let peer_info = PeerInfo {
1213 peer_id: peer_id.clone(),
1214 addresses: vec![address.to_string()],
1215 connected_at: Instant::now(),
1216 last_seen: Instant::now(),
1217 status: ConnectionStatus::Connected,
1218 protocols: vec!["p2p-foundation/1.0".to_string()],
1219 heartbeat_count: 0,
1220 };
1221
1222 self.peers.write().await.insert(peer_id.clone(), peer_info);
1224
1225 if let Some(ref resource_manager) = self.resource_manager {
1227 resource_manager.record_bandwidth(0, 0); }
1229
1230 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1232
1233 info!("Connected to peer: {}", peer_id);
1234 Ok(peer_id)
1235 }
1236
1237 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1239 info!("Disconnecting from peer: {}", peer_id);
1240
1241 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1242 peer_info.status = ConnectionStatus::Disconnected;
1243
1244 let _ = self
1246 .event_tx
1247 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1248
1249 info!("Disconnected from peer: {}", peer_id);
1250 }
1251
1252 Ok(())
1253 }
1254
1255 pub async fn send_message(
1257 &self,
1258 peer_id: &PeerId,
1259 protocol: &str,
1260 data: Vec<u8>,
1261 ) -> Result<()> {
1262 debug!(
1263 "Sending message to peer {} on protocol {}",
1264 peer_id, protocol
1265 );
1266
1267 if let Some(ref resource_manager) = self.resource_manager
1269 && !resource_manager
1270 .check_rate_limit(peer_id, "message")
1271 .await?
1272 {
1273 return Err(P2PError::ResourceExhausted(
1274 format!("Rate limit exceeded for peer {}", peer_id).into(),
1275 ));
1276 }
1277
1278 if !self.peers.read().await.contains_key(peer_id) {
1280 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1281 peer_id.to_string().into(),
1282 )));
1283 }
1284
1285 if let Some(ref resource_manager) = self.resource_manager {
1289 resource_manager.record_bandwidth(data.len() as u64, 0);
1290 }
1291
1292 let _message_data = self.create_protocol_message(protocol, data)?;
1294
1295 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1297 tokio::time::timeout(self.config.connection_timeout, send_fut)
1298 .await
1299 .map_err(|_| {
1300 P2PError::Transport(crate::error::TransportError::StreamError(
1301 "Timed out sending message".into(),
1302 ))
1303 })?
1304 .map_err(|e| {
1305 P2PError::Transport(crate::error::TransportError::StreamError(
1306 e.to_string().into(),
1307 ))
1308 })
1309 }
1310
1311 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1313 use serde_json::json;
1314
1315 let timestamp = std::time::SystemTime::now()
1316 .duration_since(std::time::UNIX_EPOCH)
1317 .map_err(|e| {
1318 P2PError::Network(NetworkError::ProtocolError(
1319 format!("System time error: {}", e).into(),
1320 ))
1321 })?
1322 .as_secs();
1323
1324 let message = json!({
1326 "protocol": protocol,
1327 "data": data,
1328 "from": self.peer_id,
1329 "timestamp": timestamp
1330 });
1331
1332 serde_json::to_vec(&message).map_err(|e| {
1333 P2PError::Transport(crate::error::TransportError::StreamError(
1334 format!("Failed to serialize message: {e}").into(),
1335 ))
1336 })
1337 }
1338
1339 }
1341
1342#[allow(dead_code)]
1344fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1345 use serde_json::json;
1346
1347 let timestamp = std::time::SystemTime::now()
1348 .duration_since(std::time::UNIX_EPOCH)
1349 .map_err(|e| {
1350 P2PError::Network(NetworkError::ProtocolError(
1351 format!("System time error: {}", e).into(),
1352 ))
1353 })?
1354 .as_secs();
1355
1356 let message = json!({
1358 "protocol": protocol,
1359 "data": data,
1360 "timestamp": timestamp
1361 });
1362
1363 serde_json::to_vec(&message).map_err(|e| {
1364 P2PError::Transport(crate::error::TransportError::StreamError(
1365 format!("Failed to serialize message: {e}").into(),
1366 ))
1367 })
1368}
1369
1370impl P2PNode {
1371 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1373 self.event_tx.subscribe()
1374 }
1375
1376 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1378 self.subscribe_events()
1379 }
1380
1381 pub fn uptime(&self) -> Duration {
1383 self.start_time.elapsed()
1384 }
1385
1386 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1396 if let Some(ref resource_manager) = self.resource_manager {
1397 Ok(resource_manager.get_metrics().await)
1398 } else {
1399 Err(P2PError::Network(
1400 crate::error::NetworkError::ProtocolError(
1401 "Production resource manager not enabled".to_string().into(),
1402 ),
1403 ))
1404 }
1405 }
1406
1407 pub async fn health_check(&self) -> Result<()> {
1409 if let Some(ref resource_manager) = self.resource_manager {
1410 resource_manager.health_check().await
1411 } else {
1412 let peer_count = self.peer_count().await;
1414 if peer_count > self.config.max_connections {
1415 Err(P2PError::Network(
1416 crate::error::NetworkError::ProtocolError(
1417 format!("Too many connections: {peer_count}").into(),
1418 ),
1419 ))
1420 } else {
1421 Ok(())
1422 }
1423 }
1424 }
1425
1426 pub fn production_config(&self) -> Option<&ProductionConfig> {
1428 self.config.production_config.as_ref()
1429 }
1430
1431 pub fn is_production_mode(&self) -> bool {
1433 self.resource_manager.is_some()
1434 }
1435
1436 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1438 self.dht.as_ref()
1439 }
1440
1441 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1443 if let Some(ref dht) = self.dht {
1444 let mut dht_instance = dht.write().await;
1445 let dht_key = crate::dht::DhtKey::from_bytes(key);
1446 dht_instance
1447 .store(&dht_key, value.clone())
1448 .await
1449 .map_err(|e| {
1450 P2PError::Dht(crate::error::DhtError::StoreFailed(
1451 format!("{:?}: {e}", key).into(),
1452 ))
1453 })?;
1454
1455 Ok(())
1456 } else {
1457 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1458 "DHT not enabled".to_string().into(),
1459 )))
1460 }
1461 }
1462
1463 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1465 if let Some(ref dht) = self.dht {
1466 let dht_instance = dht.read().await;
1467 let dht_key = crate::dht::DhtKey::from_bytes(key);
1468 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1469 P2PError::Dht(crate::error::DhtError::StoreFailed(
1470 format!("Retrieve failed: {e}").into(),
1471 ))
1472 })?;
1473
1474 Ok(record_result)
1475 } else {
1476 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1477 "DHT not enabled".to_string().into(),
1478 )))
1479 }
1480 }
1481
1482 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1484 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1485 let mut manager = bootstrap_manager.write().await;
1486 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1487 .iter()
1488 .filter_map(|addr| addr.parse().ok())
1489 .collect();
1490 let contact = ContactEntry::new(peer_id, socket_addresses);
1491 manager.add_contact(contact).await.map_err(|e| {
1492 P2PError::Network(crate::error::NetworkError::ProtocolError(
1493 format!("Failed to add peer to bootstrap cache: {e}").into(),
1494 ))
1495 })?;
1496 }
1497 Ok(())
1498 }
1499
1500 pub async fn update_peer_metrics(
1502 &self,
1503 peer_id: &PeerId,
1504 success: bool,
1505 latency_ms: Option<u64>,
1506 _error: Option<String>,
1507 ) -> Result<()> {
1508 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1509 let mut manager = bootstrap_manager.write().await;
1510
1511 let metrics = QualityMetrics {
1513 success_rate: if success { 1.0 } else { 0.0 },
1514 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1515 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
1517 last_successful_connection: if success {
1518 chrono::Utc::now()
1519 } else {
1520 chrono::Utc::now() - chrono::Duration::hours(1)
1521 },
1522 uptime_score: 0.5,
1523 };
1524
1525 manager
1526 .update_contact_metrics(peer_id, metrics)
1527 .await
1528 .map_err(|e| {
1529 P2PError::Network(crate::error::NetworkError::ProtocolError(
1530 format!("Failed to update peer metrics: {e}").into(),
1531 ))
1532 })?;
1533 }
1534 Ok(())
1535 }
1536
1537 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1539 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1540 let manager = bootstrap_manager.read().await;
1541 let stats = manager.get_stats().await.map_err(|e| {
1542 P2PError::Network(crate::error::NetworkError::ProtocolError(
1543 format!("Failed to get bootstrap stats: {e}").into(),
1544 ))
1545 })?;
1546 Ok(Some(stats))
1547 } else {
1548 Ok(None)
1549 }
1550 }
1551
1552 pub async fn cached_peer_count(&self) -> usize {
1554 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1555 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1556 {
1557 return stats.total_contacts;
1558 }
1559 0
1560 }
1561
/// Connects to bootstrap peers, preferring cached contacts over configured addresses.
///
/// Source selection:
/// 1. If a bootstrap manager exists and returns a non-empty contact list
///    (up to 20 are requested), those cached contacts are used.
/// 2. Otherwise the configured addresses are used: `bootstrap_peers_str` when
///    non-empty, else the stringified `bootstrap_peers`.
///
/// For each contact, addresses are tried in order and the first successful
/// connection ends the attempts for that contact. Connection outcomes are
/// written back to the bootstrap cache (failures only when the contacts came
/// from the cache).
///
/// # Errors
///
/// Returns `NetworkError::ConnectionFailed` when at least one candidate source
/// was available but no connection succeeded. Returns `Ok(())` without
/// connecting when there are neither cached nor configured peers.
async fn connect_bootstrap_peers(&self) -> Result<()> {
    let mut bootstrap_contacts = Vec::new();
    let mut used_cache = false;

    // First choice: contacts from the bootstrap cache. The read guard lives
    // only for this `if let` block.
    if let Some(ref bootstrap_manager) = self.bootstrap_manager {
        let manager = bootstrap_manager.read().await;
        match manager.get_bootstrap_peers(20).await {
            Ok(contacts) => {
                if !contacts.is_empty() {
                    info!("Using {} cached bootstrap peers", contacts.len());
                    bootstrap_contacts = contacts;
                    used_cache = true;
                }
            }
            Err(e) => {
                // A cache failure is not fatal; fall through to configured peers.
                warn!("Failed to get cached bootstrap peers: {}", e);
            }
        }
    }

    // Fallback: build contacts from the statically configured addresses.
    if bootstrap_contacts.is_empty() {
        let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
            &self.config.bootstrap_peers_str
        } else {
            &self
                .config
                .bootstrap_peers
                .iter()
                .map(|addr| addr.to_string())
                .collect::<Vec<_>>()
        };

        if bootstrap_peers.is_empty() {
            info!("No bootstrap peers configured and no cached peers available");
            return Ok(());
        }

        info!("Using {} configured bootstrap peers", bootstrap_peers.len());

        for addr in bootstrap_peers {
            if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
                // The real peer id is unknown until a connection succeeds, so a
                // placeholder id is derived from the first 8 characters of the address.
                let contact = ContactEntry::new(
                    format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
                    vec![socket_addr],
                );
                bootstrap_contacts.push(contact);
            } else {
                warn!("Invalid bootstrap address format: {}", addr);
            }
        }
    }

    let mut successful_connections = 0;
    for contact in bootstrap_contacts {
        for addr in &contact.addresses {
            match self.connect_peer(&addr.to_string()).await {
                Ok(peer_id) => {
                    info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
                    successful_connections += 1;

                    // Record the success under the peer id returned by `connect_peer`.
                    if let Some(ref bootstrap_manager) = self.bootstrap_manager {
                        let mut manager = bootstrap_manager.write().await;
                        let mut updated_contact = contact.clone();
                        updated_contact.peer_id = peer_id.clone();
                        // NOTE(review): `Some(100)` looks like a placeholder latency
                        // value — confirm against `update_connection_result`.
                        updated_contact.update_connection_result(true, Some(100), None);
                        if let Err(e) = manager.add_contact(updated_contact).await {
                            warn!("Failed to update bootstrap cache: {}", e);
                        }
                    }
                    // One working address per contact is enough.
                    break;
                }
                Err(e) => {
                    warn!("Failed to connect to bootstrap peer {}: {}", addr, e);

                    // Failures are written back only for contacts that came from the
                    // cache; configured peers with placeholder ids are not recorded.
                    if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
                        let mut manager = bootstrap_manager.write().await;
                        let mut updated_contact = contact.clone();
                        updated_contact.update_connection_result(
                            false,
                            None,
                            Some(e.to_string()),
                        );

                        if let Err(e) = manager.add_contact(updated_contact).await {
                            warn!("Failed to update bootstrap cache: {}", e);
                        }
                    }
                }
            }
        }
    }

    if successful_connections == 0 {
        if !used_cache {
            warn!("Failed to connect to any bootstrap peers");
        }
        // No single address applies to this aggregate failure, so the
        // unspecified address 0.0.0.0:0 is reported.
        return Err(P2PError::Network(NetworkError::ConnectionFailed {
            addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)),
            reason: "Failed to connect to any bootstrap peers".into(),
        }));
    }
    info!(
        "Successfully connected to {} bootstrap peers",
        successful_connections
    );

    Ok(())
}
1679
1680 async fn disconnect_all_peers(&self) -> Result<()> {
1682 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1683
1684 for peer_id in peer_ids {
1685 self.disconnect_peer(&peer_id).await?;
1686 }
1687
1688 Ok(())
1689 }
1690
/// Placeholder for periodic maintenance work.
///
/// Currently performs no work and always returns `Ok(())`.
async fn periodic_tasks(&self) -> Result<()> {
    Ok(())
}
1700}
1701
/// Abstraction over something that can send protocol messages to peers.
///
/// Implementors must be `Send + Sync` so they can be shared across tasks.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Sends `data` to the peer identified by `peer_id`, tagged with `protocol`.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the peer id of the local node.
    fn local_peer_id(&self) -> &PeerId;
}
1711
/// Cloneable message sender backed by an unbounded tokio mpsc channel.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Peer id of the local node.
    peer_id: PeerId,
    /// Channel carrying `(destination peer, protocol, payload)` tuples.
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
1719
1720impl P2PNetworkSender {
1721 pub fn new(
1722 peer_id: PeerId,
1723 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1724 ) -> Self {
1725 Self { peer_id, send_tx }
1726 }
1727}
1728
1729#[async_trait::async_trait]
1731impl NetworkSender for P2PNetworkSender {
1732 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
1734 self.send_tx
1735 .send((peer_id.clone(), protocol.to_string(), data))
1736 .map_err(|_| {
1737 P2PError::Network(crate::error::NetworkError::ProtocolError(
1738 "Failed to send message via channel".to_string().into(),
1739 ))
1740 })?;
1741 Ok(())
1742 }
1743
1744 fn local_peer_id(&self) -> &PeerId {
1746 &self.peer_id
1747 }
1748}
1749
/// Builder that accumulates a `NodeConfig` and creates a `P2PNode` from it.
pub struct NodeBuilder {
    /// Configuration being assembled by the builder methods.
    config: NodeConfig,
}
1754
1755impl Default for NodeBuilder {
1756 fn default() -> Self {
1757 Self::new()
1758 }
1759}
1760
1761impl NodeBuilder {
1762 pub fn new() -> Self {
1764 Self {
1765 config: NodeConfig::default(),
1766 }
1767 }
1768
1769 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1771 self.config.peer_id = Some(peer_id);
1772 self
1773 }
1774
1775 pub fn listen_on(mut self, addr: &str) -> Self {
1777 if let Ok(multiaddr) = addr.parse() {
1778 self.config.listen_addrs.push(multiaddr);
1779 }
1780 self
1781 }
1782
1783 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1785 if let Ok(multiaddr) = addr.parse() {
1786 self.config.bootstrap_peers.push(multiaddr);
1787 }
1788 self.config.bootstrap_peers_str.push(addr.to_string());
1789 self
1790 }
1791
1792 pub fn with_ipv6(mut self, enable: bool) -> Self {
1794 self.config.enable_ipv6 = enable;
1795 self
1796 }
1797
1798 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1802 self.config.connection_timeout = timeout;
1803 self
1804 }
1805
1806 pub fn with_max_connections(mut self, max: usize) -> Self {
1808 self.config.max_connections = max;
1809 self
1810 }
1811
1812 pub fn with_production_mode(mut self) -> Self {
1814 self.config.production_config = Some(ProductionConfig::default());
1815 self
1816 }
1817
1818 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1820 self.config.production_config = Some(production_config);
1821 self
1822 }
1823
1824 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
1826 self.config.dht_config = dht_config;
1827 self
1828 }
1829
1830 pub fn with_default_dht(mut self) -> Self {
1832 self.config.dht_config = DHTConfig::default();
1833 self
1834 }
1835
1836 pub async fn build(self) -> Result<P2PNode> {
1838 P2PNode::new(self.config).await
1839 }
1840}
1841
1842#[allow(dead_code)] async fn handle_received_message_standalone(
1845 message_data: Vec<u8>,
1846 peer_id: &PeerId,
1847 _protocol: &str,
1848 event_tx: &broadcast::Sender<P2PEvent>,
1849) -> Result<()> {
1850 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1852 Ok(message) => {
1853 if let (Some(protocol), Some(data), Some(from)) = (
1854 message.get("protocol").and_then(|v| v.as_str()),
1855 message.get("data").and_then(|v| v.as_array()),
1856 message.get("from").and_then(|v| v.as_str()),
1857 ) {
1858 let data_bytes: Vec<u8> = data
1860 .iter()
1861 .filter_map(|v| v.as_u64().map(|n| n as u8))
1862 .collect();
1863
1864 let event = P2PEvent::Message {
1866 topic: protocol.to_string(),
1867 source: from.to_string(),
1868 data: data_bytes,
1869 };
1870
1871 let _ = event_tx.send(event);
1872 debug!("Generated message event from peer: {}", peer_id);
1873 }
1874 }
1875 Err(e) => {
1876 warn!("Failed to parse received message from {}: {}", peer_id, e);
1877 }
1878 }
1879
1880 Ok(())
1881}
1882
1883#[allow(dead_code)]
1887fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
1888 match create_protocol_message_static(protocol, data) {
1889 Ok(msg) => Some(msg),
1890 Err(e) => {
1891 warn!("Failed to create protocol message: {}", e);
1892 None
1893 }
1894 }
1895}
1896
1897#[allow(dead_code)]
1899async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
1900 match result {
1901 Ok(_) => {
1902 debug!("Message sent to peer {} via transport layer", peer_id);
1903 }
1904 Err(e) => {
1905 warn!("Failed to send message to peer {}: {}", peer_id, e);
1906 }
1907 }
1908}
1909
1910#[allow(dead_code)] fn check_rate_limit(
1913 rate_limiter: &RateLimiter,
1914 socket_addr: &std::net::SocketAddr,
1915 remote_addr: &NetworkAddress,
1916) -> Result<()> {
1917 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
1918 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
1919 e
1920 })
1921}
1922
1923#[allow(dead_code)] async fn register_new_peer(
1926 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1927 peer_id: &PeerId,
1928 remote_addr: &NetworkAddress,
1929) {
1930 let mut peers_guard = peers.write().await;
1931 let peer_info = PeerInfo {
1932 peer_id: peer_id.clone(),
1933 addresses: vec![remote_addr.to_string()],
1934 connected_at: tokio::time::Instant::now(),
1935 last_seen: tokio::time::Instant::now(),
1936 status: ConnectionStatus::Connected,
1937 protocols: vec!["p2p-chat/1.0.0".to_string()],
1938 heartbeat_count: 0,
1939 };
1940 peers_guard.insert(peer_id.clone(), peer_info);
1941}
1942
1943#[allow(dead_code)] fn spawn_connection_handler(
1946 connection: Box<dyn crate::transport::Connection>,
1947 peer_id: PeerId,
1948 event_tx: broadcast::Sender<P2PEvent>,
1949 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1950) {
1951 tokio::spawn(async move {
1952 handle_peer_connection(connection, peer_id, event_tx, peers).await;
1953 });
1954}
1955
1956#[allow(dead_code)] async fn handle_peer_connection(
1959 mut connection: Box<dyn crate::transport::Connection>,
1960 peer_id: PeerId,
1961 event_tx: broadcast::Sender<P2PEvent>,
1962 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1963) {
1964 loop {
1965 match connection.receive().await {
1966 Ok(message_data) => {
1967 debug!(
1968 "Received {} bytes from peer: {}",
1969 message_data.len(),
1970 peer_id
1971 );
1972
1973 if let Err(e) = handle_received_message_standalone(
1975 message_data,
1976 &peer_id,
1977 "unknown", &event_tx,
1979 )
1980 .await
1981 {
1982 warn!("Failed to handle message from peer {}: {}", peer_id, e);
1983 }
1984 }
1985 Err(e) => {
1986 warn!("Failed to receive message from {}: {}", peer_id, e);
1987
1988 if !connection.is_alive().await {
1990 info!("Connection to {} is dead, removing peer", peer_id);
1991
1992 remove_peer(&peers, &peer_id).await;
1994
1995 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
1997
1998 break; }
2000
2001 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2003 }
2004 }
2005 }
2006}
2007
2008#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2011 let mut peers_guard = peers.write().await;
2012 peers_guard.remove(peer_id);
2013}
2014
2015#[allow(dead_code)]
2017async fn update_peer_heartbeat(
2018 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2019 peer_id: &PeerId,
2020) -> Result<()> {
2021 let mut peers_guard = peers.write().await;
2022 match peers_guard.get_mut(peer_id) {
2023 Some(peer_info) => {
2024 peer_info.last_seen = Instant::now();
2025 peer_info.heartbeat_count += 1;
2026 Ok(())
2027 }
2028 None => {
2029 warn!("Received heartbeat from unknown peer: {}", peer_id);
2030 Err(P2PError::Network(NetworkError::PeerNotFound(
2031 format!("Peer {} not found", peer_id).into(),
2032 )))
2033 }
2034 }
2035}
2036
2037#[allow(dead_code)]
2039async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2040 if let Some(manager) = resource_manager {
2041 let metrics = manager.get_metrics().await;
2042 (metrics.memory_used, metrics.cpu_usage)
2043 } else {
2044 (0, 0.0)
2045 }
2046}
2047
// Unit tests for node configuration defaults, node lifecycle, peer
// connections, events, the builder, and basic type construction.
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::timeout;

    /// Builds a config with a fixed peer id, loopback listen addresses on
    /// port 0, no bootstrap peers, and production mode disabled.
    fn create_test_node_config() -> NodeConfig {
        NodeConfig {
            peer_id: Some("test_peer_123".to_string()),
            listen_addrs: vec![
                std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
                std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
            ],
            listen_addr: std::net::SocketAddr::new(
                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
                0,
            ),
            bootstrap_peers: vec![],
            bootstrap_peers_str: vec![],
            enable_ipv6: true,

            connection_timeout: Duration::from_millis(300),
            keep_alive_interval: Duration::from_secs(30),
            max_connections: 100,
            max_incoming_connections: 50,
            dht_config: DHTConfig::default(),
            security_config: SecurityConfig::default(),
            production_config: None,
            bootstrap_cache_config: None,
        }
    }

    /// Pins the values produced by `NodeConfig::default()`.
    #[tokio::test]
    async fn test_node_config_default() {
        let config = NodeConfig::default();

        assert!(config.peer_id.is_none());
        assert_eq!(config.listen_addrs.len(), 2);
        assert!(config.enable_ipv6);
        assert_eq!(config.max_connections, 10000);
        assert_eq!(config.max_incoming_connections, 100);
        assert_eq!(config.connection_timeout, Duration::from_secs(30));
    }

    /// Pins the values produced by `DHTConfig::default()`.
    #[tokio::test]
    async fn test_dht_config_default() {
        let config = DHTConfig::default();

        assert_eq!(config.k_value, 20);
        assert_eq!(config.alpha_value, 5);
        assert_eq!(config.record_ttl, Duration::from_secs(3600));
        assert_eq!(config.refresh_interval, Duration::from_secs(600));
    }

    /// Pins the values produced by `SecurityConfig::default()`.
    #[tokio::test]
    async fn test_security_config_default() {
        let config = SecurityConfig::default();

        assert!(config.enable_noise);
        assert!(config.enable_tls);
        assert_eq!(config.trust_level, TrustLevel::Basic);
    }

    /// Checks that every `TrustLevel` variant can be constructed and compared.
    #[test]
    fn test_trust_level_variants() {
        let _none = TrustLevel::None;
        let _basic = TrustLevel::Basic;
        let _full = TrustLevel::Full;

        assert_eq!(TrustLevel::None, TrustLevel::None);
        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
        assert_eq!(TrustLevel::Full, TrustLevel::Full);
        assert_ne!(TrustLevel::None, TrustLevel::Basic);
    }

    /// Checks equality of `ConnectionStatus` variants and the payload of `Failed`.
    #[test]
    fn test_connection_status_variants() {
        let connecting = ConnectionStatus::Connecting;
        let connected = ConnectionStatus::Connected;
        let disconnecting = ConnectionStatus::Disconnecting;
        let disconnected = ConnectionStatus::Disconnected;
        let failed = ConnectionStatus::Failed("test error".to_string());

        assert_eq!(connecting, ConnectionStatus::Connecting);
        assert_eq!(connected, ConnectionStatus::Connected);
        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
        assert_eq!(disconnected, ConnectionStatus::Disconnected);
        assert_ne!(connecting, connected);

        if let ConnectionStatus::Failed(msg) = failed {
            assert_eq!(msg, "test error");
        } else {
            panic!("Expected Failed status");
        }
    }

    /// A freshly created node keeps the configured peer id, is not running,
    /// and has no peers.
    #[tokio::test]
    async fn test_node_creation() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        assert_eq!(node.peer_id(), "test_peer_123");
        assert!(!node.is_running().await);
        assert_eq!(node.peer_count().await, 0);
        assert!(node.connected_peers().await.is_empty());

        Ok(())
    }

    /// Without a configured peer id the node is expected to generate one
    /// starting with `peer_`.
    #[tokio::test]
    async fn test_node_creation_without_peer_id() -> Result<()> {
        let mut config = create_test_node_config();
        config.peer_id = None;

        let node = P2PNode::new(config).await?;

        assert!(node.peer_id().starts_with("peer_"));
        assert!(!node.is_running().await);

        Ok(())
    }

    /// Start/stop toggles `is_running`, and a started node reports at least
    /// one listen address.
    #[tokio::test]
    async fn test_node_lifecycle() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        assert!(!node.is_running().await);

        node.start().await?;
        assert!(node.is_running().await);

        let listen_addrs = node.listen_addrs().await;
        assert!(
            !listen_addrs.is_empty(),
            "Expected at least one listening address"
        );

        node.stop().await?;
        assert!(!node.is_running().await);

        Ok(())
    }

    /// Connecting registers the peer with the expected id prefix, status and
    /// protocol; disconnecting removes it again.
    #[tokio::test]
    async fn test_peer_connection() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        let peer_addr = "127.0.0.1:0";

        let peer_id = node.connect_peer(peer_addr).await?;
        assert!(peer_id.starts_with("peer_from_"));

        assert_eq!(node.peer_count().await, 1);

        let connected_peers = node.connected_peers().await;
        assert_eq!(connected_peers.len(), 1);
        assert_eq!(connected_peers[0], peer_id);

        let peer_info = node.peer_info(&peer_id).await;
        assert!(peer_info.is_some());
        let info = peer_info.expect("Peer info should exist after adding peer");
        assert_eq!(info.peer_id, peer_id);
        assert_eq!(info.status, ConnectionStatus::Connected);
        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));

        node.disconnect_peer(&peer_id).await?;
        assert_eq!(node.peer_count().await, 0);

        Ok(())
    }

    /// Subscribers receive `PeerConnected` then `PeerDisconnected`, each
    /// within 100 ms of the triggering call.
    #[tokio::test]
    async fn test_event_subscription() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        let mut events = node.subscribe_events();
        let peer_addr = "127.0.0.1:0";

        let peer_id = node.connect_peer(peer_addr).await?;

        let event = timeout(Duration::from_millis(100), events.recv()).await;
        assert!(event.is_ok());

        let event_result = event
            .expect("Should receive event")
            .expect("Event should not be error");
        match event_result {
            P2PEvent::PeerConnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerConnected event"),
        }

        node.disconnect_peer(&peer_id).await?;

        let event = timeout(Duration::from_millis(100), events.recv()).await;
        assert!(event.is_ok());

        let event_result = event
            .expect("Should receive event")
            .expect("Event should not be error");
        match event_result {
            P2PEvent::PeerDisconnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerDisconnected event"),
        }

        Ok(())
    }

    /// Starts two nodes, connects the first to the second and sends a message.
    ///
    /// The send to the connected peer is only required not to fail with a
    /// "not connected" error; a send to an unknown peer must fail.
    #[tokio::test]
    async fn test_message_sending() -> Result<()> {
        let mut config1 = create_test_node_config();
        config1.listen_addr =
            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let node1 = P2PNode::new(config1).await?;
        node1.start().await?;

        let mut config2 = create_test_node_config();
        config2.listen_addr =
            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let node2 = P2PNode::new(config2).await?;
        node2.start().await?;

        // Short pause after starting both nodes before looking up the address.
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        let node2_addr = node2.local_addr().ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "No listening address".to_string().into(),
            ))
        })?;

        // Bound the connect attempt so a hang becomes a timeout error.
        let peer_id =
            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
                Ok(res) => res?,
                Err(_) => return Err(P2PError::Network(NetworkError::Timeout).into()),
            };

        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;

        let message_data = b"Hello, peer!".to_vec();
        let result = match timeout(
            Duration::from_millis(500),
            node1.send_message(&peer_id, "test-protocol", message_data),
        )
        .await
        {
            Ok(res) => res,
            Err(_) => return Err(P2PError::Network(NetworkError::Timeout).into()),
        };
        // Errors other than "not connected" are tolerated here.
        if let Err(e) = &result {
            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
        }

        let non_existent_peer = "non_existent_peer".to_string();
        let result = node1
            .send_message(&non_existent_peer, "test-protocol", vec![])
            .await;
        assert!(result.is_err(), "Sending to non-existent peer should fail");

        Ok(())
    }

    /// Only exercises start and stop; no remote operations are invoked here.
    #[tokio::test]
    async fn test_remote_mcp_operations() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        node.start().await?;
        node.stop().await?;
        Ok(())
    }

    /// A fresh node with no peers passes the health check.
    #[tokio::test]
    async fn test_health_check() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        let result = node.health_check().await;
        assert!(result.is_ok());

        Ok(())
    }

    /// Uptime increases over time.
    #[tokio::test]
    async fn test_node_uptime() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        let uptime1 = node.uptime();
        // Trivially true for a `Duration`; the meaningful check is the
        // comparison after the sleep below.
        assert!(uptime1 >= Duration::from_secs(0));

        tokio::time::sleep(Duration::from_millis(10)).await;

        let uptime2 = node.uptime();
        assert!(uptime2 > uptime1);

        Ok(())
    }

    /// `config()` exposes the configuration the node was created with.
    #[tokio::test]
    async fn test_node_config_access() -> Result<()> {
        let config = create_test_node_config();
        let expected_peer_id = config.peer_id.clone();
        let node = P2PNode::new(config).await?;

        let node_config = node.config();
        assert_eq!(node_config.peer_id, expected_peer_id);
        assert_eq!(node_config.max_connections, 100);
        Ok(())
    }

    /// Only verifies that node construction succeeds.
    #[tokio::test]
    async fn test_mcp_server_access() -> Result<()> {
        let config = create_test_node_config();
        let _node = P2PNode::new(config).await?;

        Ok(())
    }

    /// A node built from the test config exposes a DHT handle.
    #[tokio::test]
    async fn test_dht_access() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        assert!(node.dht().is_some());

        Ok(())
    }

    /// Builder setters are reflected in the resulting config.
    ///
    /// The multiaddr-style strings passed to `listen_on` do not parse as
    /// `SocketAddr`, so they are skipped; the expected count of 2 listen
    /// addresses matches the default config pinned in
    /// `test_node_config_default`. The bootstrap peer string is still recorded
    /// in `bootstrap_peers_str`, which is pushed to unconditionally.
    #[tokio::test]
    async fn test_node_builder() -> Result<()> {
        let builder = P2PNode::builder()
            .with_peer_id("builder_test_peer".to_string())
            .listen_on("/ip4/127.0.0.1/tcp/0")
            .listen_on("/ip6/::1/tcp/0")
            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000")
            .with_ipv6(true)
            .with_connection_timeout(Duration::from_secs(15))
            .with_max_connections(200);

        let config = builder.config;
        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
        assert_eq!(config.listen_addrs.len(), 2);
        assert_eq!(config.bootstrap_peers_str.len(), 1);
        assert!(config.enable_ipv6);
        assert_eq!(config.connection_timeout, Duration::from_secs(15));
        assert_eq!(config.max_connections, 200);

        Ok(())
    }

    /// Starting with two configured bootstrap peers must succeed and never
    /// yield more than two peers.
    #[tokio::test]
    async fn test_bootstrap_peers() -> Result<()> {
        let mut config = create_test_node_config();
        config.bootstrap_peers = vec![
            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
            std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
        ];

        let node = P2PNode::new(config).await?;

        node.start().await?;

        let peer_count = node.peer_count().await;
        assert!(
            peer_count <= 2,
            "Peer count should not exceed bootstrap peer count"
        );

        node.stop().await?;
        Ok(())
    }

    /// Without a production config the node reports non-production mode and
    /// `resource_metrics` fails with a "not enabled" error.
    #[tokio::test]
    async fn test_production_mode_disabled() -> Result<()> {
        let config = create_test_node_config();
        let node = P2PNode::new(config).await?;

        assert!(!node.is_production_mode());
        assert!(node.production_config().is_none());

        let result = node.resource_metrics().await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not enabled"));

        Ok(())
    }

    /// Construction-only check: each `NetworkEvent` variant used here can be
    /// built with the expected fields.
    #[tokio::test]
    async fn test_network_event_variants() {
        let peer_id = "test_peer".to_string();
        let address = "/ip4/127.0.0.1/tcp/9000".to_string();

        let _peer_connected = NetworkEvent::PeerConnected {
            peer_id: peer_id.clone(),
            addresses: vec![address.clone()],
        };

        let _peer_disconnected = NetworkEvent::PeerDisconnected {
            peer_id: peer_id.clone(),
            reason: "test disconnect".to_string(),
        };

        let _message_received = NetworkEvent::MessageReceived {
            peer_id: peer_id.clone(),
            protocol: "test-protocol".to_string(),
            data: vec![1, 2, 3],
        };

        let _connection_failed = NetworkEvent::ConnectionFailed {
            peer_id: Some(peer_id.clone()),
            address: address.clone(),
            error: "connection refused".to_string(),
        };

        let _dht_stored = NetworkEvent::DHTRecordStored {
            key: vec![1, 2, 3],
            value: vec![4, 5, 6],
        };

        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
            key: vec![1, 2, 3],
            value: Some(vec![4, 5, 6]),
        };
    }

    /// `PeerInfo` can be constructed directly and its fields read back.
    #[tokio::test]
    async fn test_peer_info_structure() {
        let peer_info = PeerInfo {
            peer_id: "test_peer".to_string(),
            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
            connected_at: Instant::now(),
            last_seen: Instant::now(),
            status: ConnectionStatus::Connected,
            protocols: vec!["test-protocol".to_string()],
            heartbeat_count: 0,
        };

        assert_eq!(peer_info.peer_id, "test_peer");
        assert_eq!(peer_info.addresses.len(), 1);
        assert_eq!(peer_info.status, ConnectionStatus::Connected);
        assert_eq!(peer_info.protocols.len(), 1);
    }

    /// `NodeConfig` survives a JSON round trip for the fields compared here.
    #[tokio::test]
    async fn test_serialization() -> Result<()> {
        let config = create_test_node_config();
        let serialized = serde_json::to_string(&config)?;
        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;

        assert_eq!(config.peer_id, deserialized.peer_id);
        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);

        Ok(())
    }
}