1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::{RwLock, broadcast};
37use tokio::time::Instant;
38use tracing::{debug, info, warn};
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct NodeConfig {
43 pub peer_id: Option<PeerId>,
45
46 pub listen_addrs: Vec<std::net::SocketAddr>,
48
49 pub listen_addr: std::net::SocketAddr,
51
52 pub bootstrap_peers: Vec<std::net::SocketAddr>,
54
55 pub bootstrap_peers_str: Vec<String>,
57
58 pub enable_ipv6: bool,
60
61 pub connection_timeout: Duration,
64
65 pub keep_alive_interval: Duration,
67
68 pub max_connections: usize,
70
71 pub max_incoming_connections: usize,
73
74 pub dht_config: DHTConfig,
76
77 pub security_config: SecurityConfig,
79
80 pub production_config: Option<ProductionConfig>,
82
83 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct DHTConfig {
90 pub k_value: usize,
92
93 pub alpha_value: usize,
95
96 pub record_ttl: Duration,
98
99 pub refresh_interval: Duration,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct SecurityConfig {
106 pub enable_noise: bool,
108
109 pub enable_tls: bool,
111
112 pub trust_level: TrustLevel,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub enum TrustLevel {
119 None,
121 Basic,
123 Full,
125}
126
127impl NodeConfig {
128 pub fn new() -> Result<Self> {
134 let config = Config::default();
136
137 let listen_addr = config.listen_socket_addr()?;
139
140 let mut listen_addrs = vec![];
142
143 if config.network.ipv6_enabled {
145 let ipv6_addr = std::net::SocketAddr::new(
146 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
147 listen_addr.port(),
148 );
149 listen_addrs.push(ipv6_addr);
150 }
151
152 let ipv4_addr = std::net::SocketAddr::new(
154 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
155 listen_addr.port(),
156 );
157 listen_addrs.push(ipv4_addr);
158
159 Ok(Self {
160 peer_id: None,
161 listen_addrs,
162 listen_addr,
163 bootstrap_peers: Vec::new(),
164 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
165 enable_ipv6: config.network.ipv6_enabled,
166
167 connection_timeout: Duration::from_secs(config.network.connection_timeout),
168 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
169 max_connections: config.network.max_connections,
170 max_incoming_connections: config.security.connection_limit as usize,
171 dht_config: DHTConfig::default(),
172 security_config: SecurityConfig::default(),
173 production_config: None,
174 bootstrap_cache_config: None,
175 })
177 }
178}
179
180impl Default for NodeConfig {
181 fn default() -> Self {
182 let config = Config::default();
184
185 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
187 std::net::SocketAddr::new(
188 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
189 9000,
190 )
191 });
192
193 Self {
194 peer_id: None,
195 listen_addrs: vec![
196 std::net::SocketAddr::new(
197 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
198 listen_addr.port(),
199 ),
200 std::net::SocketAddr::new(
201 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
202 listen_addr.port(),
203 ),
204 ],
205 listen_addr,
206 bootstrap_peers: Vec::new(),
207 bootstrap_peers_str: Vec::new(),
208 enable_ipv6: config.network.ipv6_enabled,
209
210 connection_timeout: Duration::from_secs(config.network.connection_timeout),
211 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
212 max_connections: config.network.max_connections,
213 max_incoming_connections: config.security.connection_limit as usize,
214 dht_config: DHTConfig::default(),
215 security_config: SecurityConfig::default(),
216 production_config: None, bootstrap_cache_config: None,
218 }
220 }
221}
222
223impl NodeConfig {
224 pub fn from_config(config: &Config) -> Result<Self> {
226 let listen_addr = config.listen_socket_addr()?;
227 let bootstrap_addrs = config.bootstrap_addrs()?;
228
229 let mut node_config = Self {
230 peer_id: None,
231 listen_addrs: vec![listen_addr],
232 listen_addr,
233 bootstrap_peers: bootstrap_addrs
234 .iter()
235 .map(|addr| addr.socket_addr())
236 .collect(),
237 bootstrap_peers_str: config
238 .network
239 .bootstrap_nodes
240 .iter()
241 .map(|addr| addr.to_string())
242 .collect(),
243 enable_ipv6: config.network.ipv6_enabled,
244
245 connection_timeout: Duration::from_secs(config.network.connection_timeout),
246 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
247 max_connections: config.network.max_connections,
248 max_incoming_connections: config.security.connection_limit as usize,
249 dht_config: DHTConfig {
250 k_value: 20,
251 alpha_value: 3,
252 record_ttl: Duration::from_secs(3600),
253 refresh_interval: Duration::from_secs(900),
254 },
255 security_config: SecurityConfig {
256 enable_noise: true,
257 enable_tls: true,
258 trust_level: TrustLevel::Basic,
259 },
260 production_config: Some(ProductionConfig {
261 max_connections: config.network.max_connections,
262 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
265 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266 health_check_interval: Duration::from_secs(30),
267 metrics_interval: Duration::from_secs(60),
268 enable_performance_tracking: true,
269 enable_auto_cleanup: true,
270 shutdown_timeout: Duration::from_secs(30),
271 rate_limits: crate::production::RateLimitConfig::default(),
272 }),
273 bootstrap_cache_config: None,
274 };
279
280 if config.network.ipv6_enabled {
282 node_config.listen_addrs.push(std::net::SocketAddr::new(
283 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
284 listen_addr.port(),
285 ));
286 }
287
288 Ok(node_config)
289 }
290
291 pub fn with_listen_addr(addr: &str) -> Result<Self> {
293 let listen_addr: std::net::SocketAddr = addr
294 .parse()
295 .map_err(|e: std::net::AddrParseError| {
296 NetworkError::InvalidAddress(e.to_string().into())
297 })
298 .map_err(P2PError::Network)?;
299 let cfg = NodeConfig {
300 listen_addr,
301 listen_addrs: vec![listen_addr],
302 ..Default::default()
303 };
304 Ok(cfg)
305 }
306}
307
308impl Default for DHTConfig {
309 fn default() -> Self {
310 Self {
311 k_value: 20,
312 alpha_value: 5,
313 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
316 }
317}
318
319impl Default for SecurityConfig {
320 fn default() -> Self {
321 Self {
322 enable_noise: true,
323 enable_tls: true,
324 trust_level: TrustLevel::Basic,
325 }
326 }
327}
328
329#[derive(Debug, Clone)]
331pub struct PeerInfo {
332 pub peer_id: PeerId,
334
335 pub addresses: Vec<String>,
337
338 pub connected_at: Instant,
340
341 pub last_seen: Instant,
343
344 pub status: ConnectionStatus,
346
347 pub protocols: Vec<String>,
349
350 pub heartbeat_count: u64,
352}
353
/// Connection state of a peer (see [`PeerInfo::status`]).
///
/// `Eq` is derived alongside `PartialEq`: every variant payload (`String`)
/// has total equality, so the type can be used wherever `Eq` is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is being closed.
    Disconnecting,
    /// The connection is closed.
    Disconnected,
    /// The connection failed; the payload carries a description.
    Failed(String),
}
368
369#[derive(Debug, Clone)]
371pub enum NetworkEvent {
372 PeerConnected {
374 peer_id: PeerId,
376 addresses: Vec<String>,
378 },
379
380 PeerDisconnected {
382 peer_id: PeerId,
384 reason: String,
386 },
387
388 MessageReceived {
390 peer_id: PeerId,
392 protocol: String,
394 data: Vec<u8>,
396 },
397
398 ConnectionFailed {
400 peer_id: Option<PeerId>,
402 address: String,
404 error: String,
406 },
407
408 DHTRecordStored {
410 key: Vec<u8>,
412 value: Vec<u8>,
414 },
415
416 DHTRecordRetrieved {
418 key: Vec<u8>,
420 value: Option<Vec<u8>>,
422 },
423}
424
425#[derive(Debug, Clone)]
430pub enum P2PEvent {
431 Message {
433 topic: String,
435 source: PeerId,
437 data: Vec<u8>,
439 },
440 PeerConnected(PeerId),
442 PeerDisconnected(PeerId),
444}
445
446pub struct P2PNode {
456 config: NodeConfig,
458
459 peer_id: PeerId,
461
462 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
464
465 event_tx: broadcast::Sender<P2PEvent>,
467
468 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
470
471 start_time: Instant,
473
474 running: RwLock<bool>,
476
477 dht: Option<Arc<RwLock<DHT>>>,
479
480 resource_manager: Option<Arc<ResourceManager>>,
482
483 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
485
486 dual_node: Arc<DualStackNetworkNode>,
488
489 #[allow(dead_code)]
491 rate_limiter: Arc<RateLimiter>,
492}
493
494impl P2PNode {
495 pub fn new_for_tests() -> Result<Self> {
497 let (event_tx, _) = broadcast::channel(16);
498 Ok(Self {
499 config: NodeConfig::default(),
500 peer_id: "test_peer".to_string(),
501 peers: Arc::new(RwLock::new(HashMap::new())),
502 event_tx,
503 listen_addrs: RwLock::new(Vec::new()),
504 start_time: Instant::now(),
505 running: RwLock::new(false),
506 dht: None,
507 resource_manager: None,
508 bootstrap_manager: None,
509 dual_node: {
510 let v6: Option<std::net::SocketAddr> = "[::1]:0"
512 .parse()
513 .ok()
514 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
515 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
516 let handle = tokio::runtime::Handle::current();
517 let dual_attempt = handle.block_on(
518 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
519 );
520 let dual = match dual_attempt {
521 Ok(d) => d,
522 Err(_e1) => {
523 let fallback = handle.block_on(
525 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
526 None,
527 "127.0.0.1:0".parse().ok(),
528 ),
529 );
530 match fallback {
531 Ok(d) => d,
532 Err(e2) => {
533 return Err(P2PError::Network(NetworkError::BindError(
534 format!("Failed to create dual-stack network node: {}", e2)
535 .into(),
536 )));
537 }
538 }
539 }
540 };
541 Arc::new(dual)
542 },
543 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
544 max_requests: 100,
545 burst_size: 100,
546 window: std::time::Duration::from_secs(1),
547 ..Default::default()
548 })),
549 })
550 }
551 pub async fn new(config: NodeConfig) -> Result<Self> {
553 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
554 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
556 });
557
558 let (event_tx, _) = broadcast::channel(1000);
559
560 {
563 use blake3::Hasher;
564 let mut hasher = Hasher::new();
565 hasher.update(peer_id.as_bytes());
566 let digest = hasher.finalize();
567 let mut nid = [0u8; 32];
568 nid.copy_from_slice(digest.as_bytes());
569 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
570 crate::identity::node_identity::NodeId::from_bytes(nid),
571 ));
572 }
575
576 let dht = if true {
578 let _dht_config = crate::dht::DHTConfig {
580 replication_factor: config.dht_config.k_value,
581 bucket_size: config.dht_config.k_value,
582 alpha: config.dht_config.alpha_value,
583 record_ttl: config.dht_config.record_ttl,
584 bucket_refresh_interval: config.dht_config.refresh_interval,
585 republish_interval: config.dht_config.refresh_interval,
586 max_distance: 160, };
588 let peer_bytes = peer_id.as_bytes();
590 let mut node_id_bytes = [0u8; 32];
591 let len = peer_bytes.len().min(32);
592 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
593 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
594 let dht_instance = DHT::new(node_id).map_err(|e| {
595 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
596 e.to_string().into(),
597 ))
598 })?;
599 Some(Arc::new(RwLock::new(dht_instance)))
600 } else {
601 None
602 };
603
604 let resource_manager = config
608 .production_config
609 .clone()
610 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
611
612 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
614 match BootstrapManager::with_config(cache_config.clone()).await {
615 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
616 Err(e) => {
617 warn!(
618 "Failed to initialize bootstrap manager: {}, continuing without cache",
619 e
620 );
621 None
622 }
623 }
624 } else {
625 match BootstrapManager::new().await {
626 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
627 Err(e) => {
628 warn!(
629 "Failed to initialize bootstrap manager: {}, continuing without cache",
630 e
631 );
632 None
633 }
634 }
635 };
636
637 let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
639 let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
640 let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
641 (v6_addr, v4_addr)
642 } else {
643 let v4_addr = Some(std::net::SocketAddr::new(
645 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
646 config.listen_addr.port(),
647 ));
648 let v6_addr = if config.enable_ipv6 {
649 Some(std::net::SocketAddr::new(
650 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
651 config.listen_addr.port(),
652 ))
653 } else {
654 None
655 };
656 (v6_addr, v4_addr)
657 };
658
659 let dual_node = Arc::new(
660 DualStackNetworkNode::new(v6_opt, v4_opt)
661 .await
662 .map_err(|e| {
663 P2PError::Transport(crate::error::TransportError::SetupFailed(
664 format!("Failed to create dual-stack network nodes: {}", e).into(),
665 ))
666 })?,
667 );
668
669 let rate_limiter = Arc::new(RateLimiter::new(
671 crate::validation::RateLimitConfig::default(),
672 ));
673
674 let node = Self {
675 config,
676 peer_id,
677 peers: Arc::new(RwLock::new(HashMap::new())),
678 event_tx,
679 listen_addrs: RwLock::new(Vec::new()),
680 start_time: Instant::now(),
681 running: RwLock::new(false),
682 dht,
683 resource_manager,
684 bootstrap_manager,
685 dual_node,
686 rate_limiter,
687 };
688 info!("Created P2P node with peer ID: {}", node.peer_id);
689
690 Ok(node)
691 }
692
693 pub fn builder() -> NodeBuilder {
695 NodeBuilder::new()
696 }
697
698 pub fn peer_id(&self) -> &PeerId {
700 &self.peer_id
701 }
702
703 pub fn local_addr(&self) -> Option<String> {
704 self.listen_addrs
705 .try_read()
706 .ok()
707 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
708 }
709
710 pub async fn subscribe(&self, topic: &str) -> Result<()> {
711 info!("Subscribed to topic: {}", topic);
714 Ok(())
715 }
716
717 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
718 info!(
719 "Publishing message to topic: {} ({} bytes)",
720 topic,
721 data.len()
722 );
723
724 let peer_list: Vec<PeerId> = {
726 let peers_guard = self.peers.read().await;
727 peers_guard.keys().cloned().collect()
728 };
729
730 if peer_list.is_empty() {
731 debug!("No peers connected, message will only be sent to local subscribers");
732 } else {
733 let mut send_count = 0;
735 for peer_id in &peer_list {
736 match self.send_message(peer_id, topic, data.to_vec()).await {
737 Ok(_) => {
738 send_count += 1;
739 debug!("Sent message to peer: {}", peer_id);
740 }
741 Err(e) => {
742 warn!("Failed to send message to peer {}: {}", peer_id, e);
743 }
744 }
745 }
746 info!(
747 "Published message to {}/{} connected peers",
748 send_count,
749 peer_list.len()
750 );
751 }
752
753 let event = P2PEvent::Message {
755 topic: topic.to_string(),
756 source: self.peer_id.clone(),
757 data: data.to_vec(),
758 };
759 let _ = self.event_tx.send(event);
760
761 Ok(())
762 }
763
764 pub fn config(&self) -> &NodeConfig {
766 &self.config
767 }
768
769 pub async fn start(&self) -> Result<()> {
771 info!("Starting P2P node...");
772
773 if let Some(ref resource_manager) = self.resource_manager {
775 resource_manager.start().await.map_err(|e| {
776 P2PError::Network(crate::error::NetworkError::ProtocolError(
777 format!("Failed to start resource manager: {e}").into(),
778 ))
779 })?;
780 info!("Production resource manager started");
781 }
782
783 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
785 let mut manager = bootstrap_manager.write().await;
786 manager.start_background_tasks().await.map_err(|e| {
787 P2PError::Network(crate::error::NetworkError::ProtocolError(
788 format!("Failed to start bootstrap manager: {e}").into(),
789 ))
790 })?;
791 info!("Bootstrap cache manager started");
792 }
793
794 *self.running.write().await = true;
796
797 self.start_network_listeners().await?;
799
800 let listen_addrs = self.listen_addrs.read().await;
802 info!("P2P node started on addresses: {:?}", *listen_addrs);
803
804 self.start_message_receiving_system().await?;
808
809 self.connect_bootstrap_peers().await?;
811
812 Ok(())
813 }
814
815 async fn start_network_listeners(&self) -> Result<()> {
817 info!("Starting dual-stack listeners (ant-quic)...");
818 let addrs = self.dual_node.local_addrs();
820 {
821 let mut la = self.listen_addrs.write().await;
822 *la = addrs.clone();
823 }
824
825 let event_tx = self.event_tx.clone();
827 let peers = self.peers.clone();
828 let rate_limiter = self.rate_limiter.clone();
829 let dual = self.dual_node.clone();
830 tokio::spawn(async move {
831 loop {
832 match dual.accept_any().await {
833 Ok((ant_peer_id, remote_sock)) => {
834 let peer_id =
835 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
836 let remote_addr = NetworkAddress::from(remote_sock);
837 let _ = rate_limiter.check_ip(&remote_sock.ip());
839 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
840 register_new_peer(&peers, &peer_id, &remote_addr).await;
841 }
842 Err(e) => {
843 warn!("Accept failed: {}", e);
844 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
845 }
846 }
847 }
848 });
849
850 info!("Dual-stack listeners active on: {:?}", addrs);
851 Ok(())
852 }
853
854 #[allow(dead_code)]
856 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
857 warn!("QUIC transport temporarily disabled during ant-quic migration");
896 Err(crate::P2PError::Transport(
898 crate::error::TransportError::SetupFailed(
899 format!(
900 "Failed to start QUIC listener on {addr} - transport disabled during migration"
901 )
902 .into(),
903 ),
904 ))
905 }
906
907 #[allow(dead_code)] async fn start_connection_acceptor(
910 &self,
911 transport: Arc<dyn crate::transport::Transport>,
912 addr: std::net::SocketAddr,
913 transport_type: crate::transport::TransportType,
914 ) -> Result<()> {
915 info!(
916 "Starting connection acceptor for {:?} on {}",
917 transport_type, addr
918 );
919
920 let event_tx = self.event_tx.clone();
922 let _peer_id = self.peer_id.clone();
923 let peers = Arc::clone(&self.peers);
924 let rate_limiter = Arc::clone(&self.rate_limiter);
927
928 tokio::spawn(async move {
930 loop {
931 match transport.accept().await {
932 Ok(connection) => {
933 let remote_addr = connection.remote_addr();
934 let connection_peer_id =
935 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
936
937 let socket_addr = remote_addr.socket_addr();
939 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
940 continue;
942 }
943
944 info!(
945 "Accepted {:?} connection from {} (peer: {})",
946 transport_type, remote_addr, connection_peer_id
947 );
948
949 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
951
952 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
954
955 spawn_connection_handler(
957 connection,
958 connection_peer_id,
959 event_tx.clone(),
960 Arc::clone(&peers),
961 );
962 }
963 Err(e) => {
964 warn!(
965 "Failed to accept {:?} connection on {}: {}",
966 transport_type, addr, e
967 );
968
969 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
971 }
972 }
973 }
974 });
975
976 info!(
977 "Connection acceptor background task started for {:?} on {}",
978 transport_type, addr
979 );
980 Ok(())
981 }
982
983 async fn start_message_receiving_system(&self) -> Result<()> {
985 info!("Starting message receiving system");
986 let dual = self.dual_node.clone();
987 let event_tx = self.event_tx.clone();
988
989 tokio::spawn(async move {
990 loop {
991 match dual.receive_any().await {
992 Ok((_peer_id, bytes)) => {
993 #[allow(clippy::collapsible_if)]
995 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
996 if let (Some(protocol), Some(data), Some(from)) = (
997 value.get("protocol").and_then(|v| v.as_str()),
998 value.get("data").and_then(|v| v.as_array()),
999 value.get("from").and_then(|v| v.as_str()),
1000 ) {
1001 let payload: Vec<u8> = data
1002 .iter()
1003 .filter_map(|v| v.as_u64().map(|n| n as u8))
1004 .collect();
1005 let _ = event_tx.send(P2PEvent::Message {
1006 topic: protocol.to_string(),
1007 source: from.to_string(),
1008 data: payload,
1009 });
1010 }
1011 }
1012 }
1013 Err(e) => {
1014 warn!("Receive error: {}", e);
1015 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1016 }
1017 }
1018 }
1019 });
1020
1021 Ok(())
1022 }
1023
1024 #[allow(dead_code)]
1026 async fn handle_received_message(
1027 &self,
1028 message_data: Vec<u8>,
1029 peer_id: &PeerId,
1030 _protocol: &str,
1031 event_tx: &broadcast::Sender<P2PEvent>,
1032 ) -> Result<()> {
1033 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1037 Ok(message) => {
1038 if let (Some(protocol), Some(data), Some(from)) = (
1039 message.get("protocol").and_then(|v| v.as_str()),
1040 message.get("data").and_then(|v| v.as_array()),
1041 message.get("from").and_then(|v| v.as_str()),
1042 ) {
1043 let data_bytes: Vec<u8> = data
1045 .iter()
1046 .filter_map(|v| v.as_u64().map(|n| n as u8))
1047 .collect();
1048
1049 let event = P2PEvent::Message {
1051 topic: protocol.to_string(),
1052 source: from.to_string(),
1053 data: data_bytes,
1054 };
1055
1056 let _ = event_tx.send(event);
1057 debug!("Generated message event from peer: {}", peer_id);
1058 }
1059 }
1060 Err(e) => {
1061 warn!("Failed to parse received message from {}: {}", peer_id, e);
1062 }
1063 }
1064
1065 Ok(())
1066 }
1067
1068 pub async fn run(&self) -> Result<()> {
1074 if !*self.running.read().await {
1075 self.start().await?;
1076 }
1077
1078 info!("P2P node running...");
1079
1080 loop {
1082 if !*self.running.read().await {
1083 break;
1084 }
1085
1086 self.periodic_tasks().await?;
1088
1089 tokio::time::sleep(Duration::from_millis(100)).await;
1091 }
1092
1093 info!("P2P node stopped");
1094 Ok(())
1095 }
1096
1097 pub async fn stop(&self) -> Result<()> {
1099 info!("Stopping P2P node...");
1100
1101 *self.running.write().await = false;
1103
1104 self.disconnect_all_peers().await?;
1106
1107 if let Some(ref resource_manager) = self.resource_manager {
1109 resource_manager.shutdown().await.map_err(|e| {
1110 P2PError::Network(crate::error::NetworkError::ProtocolError(
1111 format!("Failed to shutdown resource manager: {e}").into(),
1112 ))
1113 })?;
1114 info!("Production resource manager stopped");
1115 }
1116
1117 info!("P2P node stopped");
1118 Ok(())
1119 }
1120
1121 pub async fn shutdown(&self) -> Result<()> {
1123 self.stop().await
1124 }
1125
1126 pub async fn is_running(&self) -> bool {
1128 *self.running.read().await
1129 }
1130
1131 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1133 self.listen_addrs.read().await.clone()
1134 }
1135
1136 pub async fn connected_peers(&self) -> Vec<PeerId> {
1138 self.peers.read().await.keys().cloned().collect()
1139 }
1140
1141 pub async fn peer_count(&self) -> usize {
1143 self.peers.read().await.len()
1144 }
1145
1146 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1148 self.peers.read().await.get(peer_id).cloned()
1149 }
1150
1151 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1153 info!("Connecting to peer at: {}", address);
1154
1155 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1157 Some(resource_manager.acquire_connection().await?)
1158 } else {
1159 None
1160 };
1161
1162 let _socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1164 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1165 format!("{}: {}", address, e).into(),
1166 ))
1167 })?;
1168
1169 let addr_list = vec![_socket_addr];
1171 let peer_id = match tokio::time::timeout(
1172 self.config.connection_timeout,
1173 self.dual_node.connect_happy_eyeballs(&addr_list),
1174 )
1175 .await
1176 {
1177 Ok(Ok(peer)) => {
1178 let connected_peer_id =
1179 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1180 info!("Successfully connected to peer: {}", connected_peer_id);
1181 connected_peer_id
1182 }
1183 Ok(Err(e)) => {
1184 warn!("Failed to connect to peer at {}: {}", address, e);
1185 let demo_peer_id =
1186 format!("peer_from_{}", address.replace('/', "_").replace(':', "_"));
1187 warn!(
1188 "Using demo peer ID: {} (transport connection failed)",
1189 demo_peer_id
1190 );
1191 demo_peer_id
1192 }
1193 Err(_) => {
1194 warn!(
1195 "Timed out connecting to peer at {} after {:?}",
1196 address, self.config.connection_timeout
1197 );
1198 let demo_peer_id =
1199 format!("peer_from_{}", address.replace('/', "_").replace(':', "_"));
1200 demo_peer_id
1201 }
1202 };
1203
1204 let peer_info = PeerInfo {
1206 peer_id: peer_id.clone(),
1207 addresses: vec![address.to_string()],
1208 connected_at: Instant::now(),
1209 last_seen: Instant::now(),
1210 status: ConnectionStatus::Connected,
1211 protocols: vec!["p2p-foundation/1.0".to_string()],
1212 heartbeat_count: 0,
1213 };
1214
1215 self.peers.write().await.insert(peer_id.clone(), peer_info);
1217
1218 if let Some(ref resource_manager) = self.resource_manager {
1220 resource_manager.record_bandwidth(0, 0); }
1222
1223 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1225
1226 info!("Connected to peer: {}", peer_id);
1227 Ok(peer_id)
1228 }
1229
1230 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1232 info!("Disconnecting from peer: {}", peer_id);
1233
1234 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1235 peer_info.status = ConnectionStatus::Disconnected;
1236
1237 let _ = self
1239 .event_tx
1240 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1241
1242 info!("Disconnected from peer: {}", peer_id);
1243 }
1244
1245 Ok(())
1246 }
1247
1248 pub async fn send_message(
1250 &self,
1251 peer_id: &PeerId,
1252 protocol: &str,
1253 data: Vec<u8>,
1254 ) -> Result<()> {
1255 debug!(
1256 "Sending message to peer {} on protocol {}",
1257 peer_id, protocol
1258 );
1259
1260 if let Some(ref resource_manager) = self.resource_manager
1262 && !resource_manager
1263 .check_rate_limit(peer_id, "message")
1264 .await?
1265 {
1266 return Err(P2PError::ResourceExhausted(
1267 format!("Rate limit exceeded for peer {}", peer_id).into(),
1268 ));
1269 }
1270
1271 if !self.peers.read().await.contains_key(peer_id) {
1273 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1274 peer_id.to_string().into(),
1275 )));
1276 }
1277
1278 if let Some(ref resource_manager) = self.resource_manager {
1282 resource_manager.record_bandwidth(data.len() as u64, 0);
1283 }
1284
1285 let _message_data = self.create_protocol_message(protocol, data)?;
1287
1288 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1290 tokio::time::timeout(self.config.connection_timeout, send_fut)
1291 .await
1292 .map_err(|_| {
1293 P2PError::Transport(crate::error::TransportError::StreamError(
1294 "Timed out sending message".into(),
1295 ))
1296 })?
1297 .map_err(|e| {
1298 P2PError::Transport(crate::error::TransportError::StreamError(
1299 e.to_string().into(),
1300 ))
1301 })
1302 }
1303
1304 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1306 use serde_json::json;
1307
1308 let timestamp = std::time::SystemTime::now()
1309 .duration_since(std::time::UNIX_EPOCH)
1310 .map_err(|e| {
1311 P2PError::Network(NetworkError::ProtocolError(
1312 format!("System time error: {}", e).into(),
1313 ))
1314 })?
1315 .as_secs();
1316
1317 let message = json!({
1319 "protocol": protocol,
1320 "data": data,
1321 "from": self.peer_id,
1322 "timestamp": timestamp
1323 });
1324
1325 serde_json::to_vec(&message).map_err(|e| {
1326 P2PError::Transport(crate::error::TransportError::StreamError(
1327 format!("Failed to serialize message: {e}").into(),
1328 ))
1329 })
1330 }
1331
1332 }
1334
1335#[allow(dead_code)]
1337fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1338 use serde_json::json;
1339
1340 let timestamp = std::time::SystemTime::now()
1341 .duration_since(std::time::UNIX_EPOCH)
1342 .map_err(|e| {
1343 P2PError::Network(NetworkError::ProtocolError(
1344 format!("System time error: {}", e).into(),
1345 ))
1346 })?
1347 .as_secs();
1348
1349 let message = json!({
1351 "protocol": protocol,
1352 "data": data,
1353 "timestamp": timestamp
1354 });
1355
1356 serde_json::to_vec(&message).map_err(|e| {
1357 P2PError::Transport(crate::error::TransportError::StreamError(
1358 format!("Failed to serialize message: {e}").into(),
1359 ))
1360 })
1361}
1362
1363impl P2PNode {
1364 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1366 self.event_tx.subscribe()
1367 }
1368
1369 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1371 self.subscribe_events()
1372 }
1373
1374 pub fn uptime(&self) -> Duration {
1376 self.start_time.elapsed()
1377 }
1378
1379 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1389 if let Some(ref resource_manager) = self.resource_manager {
1390 Ok(resource_manager.get_metrics().await)
1391 } else {
1392 Err(P2PError::Network(
1393 crate::error::NetworkError::ProtocolError(
1394 "Production resource manager not enabled".to_string().into(),
1395 ),
1396 ))
1397 }
1398 }
1399
1400 pub async fn health_check(&self) -> Result<()> {
1402 if let Some(ref resource_manager) = self.resource_manager {
1403 resource_manager.health_check().await
1404 } else {
1405 let peer_count = self.peer_count().await;
1407 if peer_count > self.config.max_connections {
1408 Err(P2PError::Network(
1409 crate::error::NetworkError::ProtocolError(
1410 format!("Too many connections: {peer_count}").into(),
1411 ),
1412 ))
1413 } else {
1414 Ok(())
1415 }
1416 }
1417 }
1418
1419 pub fn production_config(&self) -> Option<&ProductionConfig> {
1421 self.config.production_config.as_ref()
1422 }
1423
1424 pub fn is_production_mode(&self) -> bool {
1426 self.resource_manager.is_some()
1427 }
1428
1429 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1431 self.dht.as_ref()
1432 }
1433
1434 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1436 if let Some(ref dht) = self.dht {
1437 let mut dht_instance = dht.write().await;
1438 let dht_key = crate::dht::DhtKey::from_bytes(key);
1439 dht_instance
1440 .store(&dht_key, value.clone())
1441 .await
1442 .map_err(|e| {
1443 P2PError::Dht(crate::error::DhtError::StoreFailed(
1444 format!("{:?}: {e}", key).into(),
1445 ))
1446 })?;
1447
1448 Ok(())
1449 } else {
1450 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1451 "DHT not enabled".to_string().into(),
1452 )))
1453 }
1454 }
1455
1456 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1458 if let Some(ref dht) = self.dht {
1459 let dht_instance = dht.read().await;
1460 let dht_key = crate::dht::DhtKey::from_bytes(key);
1461 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1462 P2PError::Dht(crate::error::DhtError::StoreFailed(
1463 format!("Retrieve failed: {e}").into(),
1464 ))
1465 })?;
1466
1467 Ok(record_result)
1468 } else {
1469 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1470 "DHT not enabled".to_string().into(),
1471 )))
1472 }
1473 }
1474
1475 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1477 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1478 let mut manager = bootstrap_manager.write().await;
1479 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1480 .iter()
1481 .filter_map(|addr| addr.parse().ok())
1482 .collect();
1483 let contact = ContactEntry::new(peer_id, socket_addresses);
1484 manager.add_contact(contact).await.map_err(|e| {
1485 P2PError::Network(crate::error::NetworkError::ProtocolError(
1486 format!("Failed to add peer to bootstrap cache: {e}").into(),
1487 ))
1488 })?;
1489 }
1490 Ok(())
1491 }
1492
1493 pub async fn update_peer_metrics(
1495 &self,
1496 peer_id: &PeerId,
1497 success: bool,
1498 latency_ms: Option<u64>,
1499 _error: Option<String>,
1500 ) -> Result<()> {
1501 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1502 let mut manager = bootstrap_manager.write().await;
1503
1504 let metrics = QualityMetrics {
1506 success_rate: if success { 1.0 } else { 0.0 },
1507 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1508 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
1510 last_successful_connection: if success {
1511 chrono::Utc::now()
1512 } else {
1513 chrono::Utc::now() - chrono::Duration::hours(1)
1514 },
1515 uptime_score: 0.5,
1516 };
1517
1518 manager
1519 .update_contact_metrics(peer_id, metrics)
1520 .await
1521 .map_err(|e| {
1522 P2PError::Network(crate::error::NetworkError::ProtocolError(
1523 format!("Failed to update peer metrics: {e}").into(),
1524 ))
1525 })?;
1526 }
1527 Ok(())
1528 }
1529
1530 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1532 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1533 let manager = bootstrap_manager.read().await;
1534 let stats = manager.get_stats().await.map_err(|e| {
1535 P2PError::Network(crate::error::NetworkError::ProtocolError(
1536 format!("Failed to get bootstrap stats: {e}").into(),
1537 ))
1538 })?;
1539 Ok(Some(stats))
1540 } else {
1541 Ok(None)
1542 }
1543 }
1544
1545 pub async fn cached_peer_count(&self) -> usize {
1547 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1548 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1549 {
1550 return stats.total_contacts;
1551 }
1552 0
1553 }
1554
1555 async fn connect_bootstrap_peers(&self) -> Result<()> {
1557 let mut bootstrap_contacts = Vec::new();
1558 let mut used_cache = false;
1559
1560 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1562 let manager = bootstrap_manager.read().await;
1563 match manager.get_bootstrap_peers(20).await {
1564 Ok(contacts) => {
1566 if !contacts.is_empty() {
1567 info!("Using {} cached bootstrap peers", contacts.len());
1568 bootstrap_contacts = contacts;
1569 used_cache = true;
1570 }
1571 }
1572 Err(e) => {
1573 warn!("Failed to get cached bootstrap peers: {}", e);
1574 }
1575 }
1576 }
1577
1578 if bootstrap_contacts.is_empty() {
1580 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1581 &self.config.bootstrap_peers_str
1582 } else {
1583 &self
1585 .config
1586 .bootstrap_peers
1587 .iter()
1588 .map(|addr| addr.to_string())
1589 .collect::<Vec<_>>()
1590 };
1591
1592 if bootstrap_peers.is_empty() {
1593 info!("No bootstrap peers configured and no cached peers available");
1594 return Ok(());
1595 }
1596
1597 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1598
1599 for addr in bootstrap_peers {
1600 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1601 let contact = ContactEntry::new(
1602 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1603 vec![socket_addr],
1604 );
1605 bootstrap_contacts.push(contact);
1606 } else {
1607 warn!("Invalid bootstrap address format: {}", addr);
1608 }
1609 }
1610 }
1611
1612 let mut successful_connections = 0;
1614 for contact in bootstrap_contacts {
1615 for addr in &contact.addresses {
1616 match self.connect_peer(&addr.to_string()).await {
1617 Ok(peer_id) => {
1618 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1619 successful_connections += 1;
1620
1621 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1623 let mut manager = bootstrap_manager.write().await;
1624 let mut updated_contact = contact.clone();
1625 updated_contact.peer_id = peer_id.clone();
1626 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
1629 warn!("Failed to update bootstrap cache: {}", e);
1630 }
1631 }
1632 break; }
1634 Err(e) => {
1635 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1636
1637 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1639 let mut manager = bootstrap_manager.write().await;
1640 let mut updated_contact = contact.clone();
1641 updated_contact.update_connection_result(
1642 false,
1643 None,
1644 Some(e.to_string()),
1645 );
1646
1647 if let Err(e) = manager.add_contact(updated_contact).await {
1648 warn!("Failed to update bootstrap cache: {}", e);
1649 }
1650 }
1651 }
1652 }
1653 }
1654 }
1655
1656 if successful_connections == 0 {
1657 if !used_cache {
1658 warn!("Failed to connect to any bootstrap peers");
1659 }
1660 return Err(P2PError::Network(NetworkError::ConnectionFailed {
1661 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
1663 }));
1664 }
1665 info!(
1666 "Successfully connected to {} bootstrap peers",
1667 successful_connections
1668 );
1669
1670 Ok(())
1671 }
1672
1673 async fn disconnect_all_peers(&self) -> Result<()> {
1675 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
1676
1677 for peer_id in peer_ids {
1678 self.disconnect_peer(&peer_id).await?;
1679 }
1680
1681 Ok(())
1682 }
1683
1684 async fn periodic_tasks(&self) -> Result<()> {
1686 Ok(())
1692 }
1693}
1694
1695#[async_trait::async_trait]
1697pub trait NetworkSender: Send + Sync {
1698 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
1700
1701 fn local_peer_id(&self) -> &PeerId;
1703}
1704
1705#[derive(Clone)]
1707pub struct P2PNetworkSender {
1708 peer_id: PeerId,
1709 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1711}
1712
1713impl P2PNetworkSender {
1714 pub fn new(
1715 peer_id: PeerId,
1716 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
1717 ) -> Self {
1718 Self { peer_id, send_tx }
1719 }
1720}
1721
1722#[async_trait::async_trait]
1724impl NetworkSender for P2PNetworkSender {
1725 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
1727 self.send_tx
1728 .send((peer_id.clone(), protocol.to_string(), data))
1729 .map_err(|_| {
1730 P2PError::Network(crate::error::NetworkError::ProtocolError(
1731 "Failed to send message via channel".to_string().into(),
1732 ))
1733 })?;
1734 Ok(())
1735 }
1736
1737 fn local_peer_id(&self) -> &PeerId {
1739 &self.peer_id
1740 }
1741}
1742
1743pub struct NodeBuilder {
1745 config: NodeConfig,
1746}
1747
1748impl Default for NodeBuilder {
1749 fn default() -> Self {
1750 Self::new()
1751 }
1752}
1753
1754impl NodeBuilder {
1755 pub fn new() -> Self {
1757 Self {
1758 config: NodeConfig::default(),
1759 }
1760 }
1761
1762 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1764 self.config.peer_id = Some(peer_id);
1765 self
1766 }
1767
1768 pub fn listen_on(mut self, addr: &str) -> Self {
1770 if let Ok(multiaddr) = addr.parse() {
1771 self.config.listen_addrs.push(multiaddr);
1772 }
1773 self
1774 }
1775
1776 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1778 if let Ok(multiaddr) = addr.parse() {
1779 self.config.bootstrap_peers.push(multiaddr);
1780 }
1781 self.config.bootstrap_peers_str.push(addr.to_string());
1782 self
1783 }
1784
1785 pub fn with_ipv6(mut self, enable: bool) -> Self {
1787 self.config.enable_ipv6 = enable;
1788 self
1789 }
1790
1791 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1795 self.config.connection_timeout = timeout;
1796 self
1797 }
1798
1799 pub fn with_max_connections(mut self, max: usize) -> Self {
1801 self.config.max_connections = max;
1802 self
1803 }
1804
1805 pub fn with_production_mode(mut self) -> Self {
1807 self.config.production_config = Some(ProductionConfig::default());
1808 self
1809 }
1810
1811 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1813 self.config.production_config = Some(production_config);
1814 self
1815 }
1816
1817 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
1819 self.config.dht_config = dht_config;
1820 self
1821 }
1822
1823 pub fn with_default_dht(mut self) -> Self {
1825 self.config.dht_config = DHTConfig::default();
1826 self
1827 }
1828
1829 pub async fn build(self) -> Result<P2PNode> {
1831 P2PNode::new(self.config).await
1832 }
1833}
1834
1835#[allow(dead_code)] async fn handle_received_message_standalone(
1838 message_data: Vec<u8>,
1839 peer_id: &PeerId,
1840 _protocol: &str,
1841 event_tx: &broadcast::Sender<P2PEvent>,
1842) -> Result<()> {
1843 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1845 Ok(message) => {
1846 if let (Some(protocol), Some(data), Some(from)) = (
1847 message.get("protocol").and_then(|v| v.as_str()),
1848 message.get("data").and_then(|v| v.as_array()),
1849 message.get("from").and_then(|v| v.as_str()),
1850 ) {
1851 let data_bytes: Vec<u8> = data
1853 .iter()
1854 .filter_map(|v| v.as_u64().map(|n| n as u8))
1855 .collect();
1856
1857 let event = P2PEvent::Message {
1859 topic: protocol.to_string(),
1860 source: from.to_string(),
1861 data: data_bytes,
1862 };
1863
1864 let _ = event_tx.send(event);
1865 debug!("Generated message event from peer: {}", peer_id);
1866 }
1867 }
1868 Err(e) => {
1869 warn!("Failed to parse received message from {}: {}", peer_id, e);
1870 }
1871 }
1872
1873 Ok(())
1874}
1875
1876#[allow(dead_code)]
1880fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
1881 match create_protocol_message_static(protocol, data) {
1882 Ok(msg) => Some(msg),
1883 Err(e) => {
1884 warn!("Failed to create protocol message: {}", e);
1885 None
1886 }
1887 }
1888}
1889
1890#[allow(dead_code)]
1892async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
1893 match result {
1894 Ok(_) => {
1895 debug!("Message sent to peer {} via transport layer", peer_id);
1896 }
1897 Err(e) => {
1898 warn!("Failed to send message to peer {}: {}", peer_id, e);
1899 }
1900 }
1901}
1902
1903#[allow(dead_code)] fn check_rate_limit(
1906 rate_limiter: &RateLimiter,
1907 socket_addr: &std::net::SocketAddr,
1908 remote_addr: &NetworkAddress,
1909) -> Result<()> {
1910 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
1911 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
1912 e
1913 })
1914}
1915
1916#[allow(dead_code)] async fn register_new_peer(
1919 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1920 peer_id: &PeerId,
1921 remote_addr: &NetworkAddress,
1922) {
1923 let mut peers_guard = peers.write().await;
1924 let peer_info = PeerInfo {
1925 peer_id: peer_id.clone(),
1926 addresses: vec![remote_addr.to_string()],
1927 connected_at: tokio::time::Instant::now(),
1928 last_seen: tokio::time::Instant::now(),
1929 status: ConnectionStatus::Connected,
1930 protocols: vec!["p2p-chat/1.0.0".to_string()],
1931 heartbeat_count: 0,
1932 };
1933 peers_guard.insert(peer_id.clone(), peer_info);
1934}
1935
1936#[allow(dead_code)] fn spawn_connection_handler(
1939 connection: Box<dyn crate::transport::Connection>,
1940 peer_id: PeerId,
1941 event_tx: broadcast::Sender<P2PEvent>,
1942 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1943) {
1944 tokio::spawn(async move {
1945 handle_peer_connection(connection, peer_id, event_tx, peers).await;
1946 });
1947}
1948
1949#[allow(dead_code)] async fn handle_peer_connection(
1952 mut connection: Box<dyn crate::transport::Connection>,
1953 peer_id: PeerId,
1954 event_tx: broadcast::Sender<P2PEvent>,
1955 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1956) {
1957 loop {
1958 match connection.receive().await {
1959 Ok(message_data) => {
1960 debug!(
1961 "Received {} bytes from peer: {}",
1962 message_data.len(),
1963 peer_id
1964 );
1965
1966 if let Err(e) = handle_received_message_standalone(
1968 message_data,
1969 &peer_id,
1970 "unknown", &event_tx,
1972 )
1973 .await
1974 {
1975 warn!("Failed to handle message from peer {}: {}", peer_id, e);
1976 }
1977 }
1978 Err(e) => {
1979 warn!("Failed to receive message from {}: {}", peer_id, e);
1980
1981 if !connection.is_alive().await {
1983 info!("Connection to {} is dead, removing peer", peer_id);
1984
1985 remove_peer(&peers, &peer_id).await;
1987
1988 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
1990
1991 break; }
1993
1994 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1996 }
1997 }
1998 }
1999}
2000
2001#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2004 let mut peers_guard = peers.write().await;
2005 peers_guard.remove(peer_id);
2006}
2007
2008#[allow(dead_code)]
2010async fn update_peer_heartbeat(
2011 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2012 peer_id: &PeerId,
2013) -> Result<()> {
2014 let mut peers_guard = peers.write().await;
2015 match peers_guard.get_mut(peer_id) {
2016 Some(peer_info) => {
2017 peer_info.last_seen = Instant::now();
2018 peer_info.heartbeat_count += 1;
2019 Ok(())
2020 }
2021 None => {
2022 warn!("Received heartbeat from unknown peer: {}", peer_id);
2023 Err(P2PError::Network(NetworkError::PeerNotFound(
2024 format!("Peer {} not found", peer_id).into(),
2025 )))
2026 }
2027 }
2028}
2029
2030#[allow(dead_code)]
2032async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2033 if let Some(manager) = resource_manager {
2034 let metrics = manager.get_metrics().await;
2035 (metrics.memory_used, metrics.cpu_usage)
2036 } else {
2037 (0, 0.0)
2038 }
2039}
2040
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
    use std::time::Duration;
    use tokio::time::timeout;

    /// IPv4 loopback socket address on the given port.
    fn loopback_v4(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Node configuration shared by the tests: fixed peer id, loopback
    /// listeners on port 0, no bootstrap peers, and a short connection
    /// timeout.
    fn create_test_node_config() -> NodeConfig {
        let v6_listener = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0);

        NodeConfig {
            peer_id: Some(String::from("test_peer_123")),
            listen_addrs: vec![v6_listener, loopback_v4(0)],
            listen_addr: loopback_v4(0),
            bootstrap_peers: Vec::new(),
            bootstrap_peers_str: Vec::new(),
            enable_ipv6: true,

            connection_timeout: Duration::from_millis(300),
            keep_alive_interval: Duration::from_secs(30),
            max_connections: 100,
            max_incoming_connections: 50,
            dht_config: DHTConfig::default(),
            security_config: SecurityConfig::default(),
            production_config: None,
            bootstrap_cache_config: None,
        }
    }

    /// Pins the documented defaults of `NodeConfig`.
    #[tokio::test]
    async fn test_node_config_default() {
        let defaults = NodeConfig::default();

        assert!(defaults.peer_id.is_none());
        assert_eq!(defaults.listen_addrs.len(), 2);
        assert!(defaults.enable_ipv6);
        assert_eq!(defaults.max_connections, 10000);
        assert_eq!(defaults.max_incoming_connections, 100);
        assert_eq!(defaults.connection_timeout, Duration::from_secs(30));
    }

    /// Pins the defaults of `DHTConfig`.
    #[tokio::test]
    async fn test_dht_config_default() {
        let defaults = DHTConfig::default();

        assert_eq!(defaults.k_value, 20);
        assert_eq!(defaults.alpha_value, 5);
        assert_eq!(defaults.record_ttl, Duration::from_secs(3600));
        assert_eq!(defaults.refresh_interval, Duration::from_secs(600));
    }

    /// Pins the defaults of `SecurityConfig`.
    #[tokio::test]
    async fn test_security_config_default() {
        let defaults = SecurityConfig::default();

        assert!(defaults.enable_noise);
        assert!(defaults.enable_tls);
        assert_eq!(defaults.trust_level, TrustLevel::Basic);
    }

    /// Every `TrustLevel` variant can be constructed and compared.
    #[test]
    fn test_trust_level_variants() {
        let _none = TrustLevel::None;
        let _basic = TrustLevel::Basic;
        let _full = TrustLevel::Full;

        assert_eq!(TrustLevel::None, TrustLevel::None);
        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
        assert_eq!(TrustLevel::Full, TrustLevel::Full);
        assert_ne!(TrustLevel::None, TrustLevel::Basic);
    }

    /// Every `ConnectionStatus` variant can be constructed and compared, and
    /// `Failed` carries its message.
    #[test]
    fn test_connection_status_variants() {
        let connecting = ConnectionStatus::Connecting;
        let connected = ConnectionStatus::Connected;
        let disconnecting = ConnectionStatus::Disconnecting;
        let disconnected = ConnectionStatus::Disconnected;
        let failed = ConnectionStatus::Failed("test error".to_string());

        assert_eq!(connecting, ConnectionStatus::Connecting);
        assert_eq!(connected, ConnectionStatus::Connected);
        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
        assert_eq!(disconnected, ConnectionStatus::Disconnected);
        assert_ne!(connecting, connected);

        match failed {
            ConnectionStatus::Failed(msg) => assert_eq!(msg, "test error"),
            _ => panic!("Expected Failed status"),
        }
    }

    /// A freshly created node keeps the configured id, is stopped, and has
    /// no peers.
    #[tokio::test]
    async fn test_node_creation() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert_eq!(node.peer_id(), "test_peer_123");
        assert!(!node.is_running().await);
        assert_eq!(node.peer_count().await, 0);
        assert!(node.connected_peers().await.is_empty());

        Ok(())
    }

    /// Without a configured peer id, the node generates one with the
    /// `peer_` prefix.
    #[tokio::test]
    async fn test_node_creation_without_peer_id() -> Result<()> {
        let config = NodeConfig {
            peer_id: None,
            ..create_test_node_config()
        };

        let node = P2PNode::new(config).await?;

        assert!(node.peer_id().starts_with("peer_"));
        assert!(!node.is_running().await);

        Ok(())
    }

    /// Start and stop toggle the running flag, and a started node reports at
    /// least one listen address.
    #[tokio::test]
    async fn test_node_lifecycle() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(!node.is_running().await);

        node.start().await?;
        assert!(node.is_running().await);

        let listen_addrs = node.listen_addrs().await;
        assert!(
            !listen_addrs.is_empty(),
            "Expected at least one listening address"
        );

        node.stop().await?;
        assert!(!node.is_running().await);

        Ok(())
    }

    /// Connecting registers the peer with the expected metadata, and
    /// disconnecting removes it again.
    #[tokio::test]
    async fn test_peer_connection() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let peer_addr = "127.0.0.1:0";
        let peer_id = node.connect_peer(peer_addr).await?;
        assert!(peer_id.starts_with("peer_from_"));

        assert_eq!(node.peer_count().await, 1);

        let connected_peers = node.connected_peers().await;
        assert_eq!(connected_peers.len(), 1);
        assert_eq!(connected_peers[0], peer_id);

        let peer_info = node.peer_info(&peer_id).await;
        assert!(peer_info.is_some());
        let info = peer_info.expect("Peer info should exist after adding peer");
        assert_eq!(info.peer_id, peer_id);
        assert_eq!(info.status, ConnectionStatus::Connected);
        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));

        node.disconnect_peer(&peer_id).await?;
        assert_eq!(node.peer_count().await, 0);

        Ok(())
    }

    /// Subscribers observe `PeerConnected` and then `PeerDisconnected` for
    /// the same peer.
    #[tokio::test]
    async fn test_event_subscription() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let mut events = node.subscribe_events();
        let peer_addr = "127.0.0.1:0";

        let peer_id = node.connect_peer(peer_addr).await?;

        let connected = timeout(Duration::from_millis(100), events.recv()).await;
        assert!(connected.is_ok());
        let connected = connected
            .expect("Should receive event")
            .expect("Event should not be error");
        match connected {
            P2PEvent::PeerConnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerConnected event"),
        }

        node.disconnect_peer(&peer_id).await?;

        let disconnected = timeout(Duration::from_millis(100), events.recv()).await;
        assert!(disconnected.is_ok());
        let disconnected = disconnected
            .expect("Should receive event")
            .expect("Event should not be error");
        match disconnected {
            P2PEvent::PeerDisconnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerDisconnected event"),
        }

        Ok(())
    }

    /// Sending to a connected peer must not fail with "not connected";
    /// sending to an unknown peer must fail.
    #[tokio::test]
    async fn test_message_sending() -> Result<()> {
        let mut config1 = create_test_node_config();
        config1.listen_addr = loopback_v4(0);
        let node1 = P2PNode::new(config1).await?;
        node1.start().await?;

        let mut config2 = create_test_node_config();
        config2.listen_addr = loopback_v4(0);
        let node2 = P2PNode::new(config2).await?;
        node2.start().await?;

        // Give both nodes a moment after start-up.
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        let node2_addr = node2.local_addr().ok_or_else(|| {
            P2PError::Network(crate::error::NetworkError::ProtocolError(
                "No listening address".to_string().into(),
            ))
        })?;

        // Outer `?` maps an elapsed timer to `Timeout`; inner `?` propagates
        // the connect error itself.
        let peer_id = timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr))
            .await
            .map_err(|_| P2PError::Network(NetworkError::Timeout))??;

        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;

        let message_data = b"Hello, peer!".to_vec();
        let send_outcome = timeout(
            Duration::from_millis(500),
            node1.send_message(&peer_id, "test-protocol", message_data),
        )
        .await
        .map_err(|_| P2PError::Network(NetworkError::Timeout))?;
        if let Err(e) = &send_outcome {
            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
        }

        let non_existent_peer = "non_existent_peer".to_string();
        let unknown_outcome = node1
            .send_message(&non_existent_peer, "test-protocol", vec![])
            .await;
        assert!(
            unknown_outcome.is_err(),
            "Sending to non-existent peer should fail"
        );

        Ok(())
    }

    /// Smoke test: the node starts and stops cleanly.
    #[tokio::test]
    async fn test_remote_mcp_operations() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        node.start().await?;
        node.stop().await?;
        Ok(())
    }

    /// A fresh node without a resource manager passes the health check.
    #[tokio::test]
    async fn test_health_check() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let verdict = node.health_check().await;
        assert!(verdict.is_ok());

        Ok(())
    }

    /// Uptime grows over time.
    #[tokio::test]
    async fn test_node_uptime() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let uptime1 = node.uptime();
        assert!(uptime1 >= Duration::from_secs(0));

        tokio::time::sleep(Duration::from_millis(10)).await;

        let uptime2 = node.uptime();
        assert!(uptime2 > uptime1);

        Ok(())
    }

    /// The node exposes the configuration it was created with.
    #[tokio::test]
    async fn test_node_config_access() -> Result<()> {
        let config = create_test_node_config();
        let expected_peer_id = config.peer_id.clone();
        let node = P2PNode::new(config).await?;

        let node_config = node.config();
        assert_eq!(node_config.peer_id, expected_peer_id);
        assert_eq!(node_config.max_connections, 100);
        Ok(())
    }

    /// Smoke test: node construction succeeds.
    #[tokio::test]
    async fn test_mcp_server_access() -> Result<()> {
        let _node = P2PNode::new(create_test_node_config()).await?;

        Ok(())
    }

    /// A node built from the test configuration has a DHT.
    #[tokio::test]
    async fn test_dht_access() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(node.dht().is_some());

        Ok(())
    }

    /// Builder methods are reflected in the accumulated configuration.
    ///
    /// NOTE(review): the `/ip4/...` and `/ip6/...` strings passed to
    /// `listen_on` are not `SocketAddr` syntax, so the expected count of 2
    /// listen addresses appears to come from the `NodeConfig` defaults rather
    /// than from these calls. Confirm this is intended.
    #[tokio::test]
    async fn test_node_builder() -> Result<()> {
        let builder = P2PNode::builder()
            .with_peer_id("builder_test_peer".to_string())
            .listen_on("/ip4/127.0.0.1/tcp/0")
            .listen_on("/ip6/::1/tcp/0")
            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000")
            .with_ipv6(true)
            .with_connection_timeout(Duration::from_secs(15))
            .with_max_connections(200);

        let built = builder.config;
        assert_eq!(built.peer_id, Some("builder_test_peer".to_string()));
        assert_eq!(built.listen_addrs.len(), 2);
        assert_eq!(built.bootstrap_peers_str.len(), 1);
        assert!(built.enable_ipv6);
        assert_eq!(built.connection_timeout, Duration::from_secs(15));
        assert_eq!(built.max_connections, 200);

        Ok(())
    }

    /// Starting with two configured bootstrap peers never yields more than
    /// two connected peers.
    #[tokio::test]
    async fn test_bootstrap_peers() -> Result<()> {
        let mut config = create_test_node_config();
        config.bootstrap_peers = vec![loopback_v4(9200), loopback_v4(9201)];

        let node = P2PNode::new(config).await?;

        node.start().await?;

        let peer_count = node.peer_count().await;
        assert!(
            peer_count <= 2,
            "Peer count should not exceed bootstrap peer count"
        );

        node.stop().await?;
        Ok(())
    }

    /// Without a production config the node is not in production mode and
    /// resource metrics are unavailable.
    #[tokio::test]
    async fn test_production_mode_disabled() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(!node.is_production_mode());
        assert!(node.production_config().is_none());

        let metrics = node.resource_metrics().await;
        assert!(metrics.is_err());
        assert!(metrics.unwrap_err().to_string().contains("not enabled"));

        Ok(())
    }

    /// Every `NetworkEvent` variant can be constructed.
    #[tokio::test]
    async fn test_network_event_variants() {
        let peer_id = "test_peer".to_string();
        let address = "/ip4/127.0.0.1/tcp/9000".to_string();

        let _peer_connected = NetworkEvent::PeerConnected {
            peer_id: peer_id.clone(),
            addresses: vec![address.clone()],
        };

        let _peer_disconnected = NetworkEvent::PeerDisconnected {
            peer_id: peer_id.clone(),
            reason: "test disconnect".to_string(),
        };

        let _message_received = NetworkEvent::MessageReceived {
            peer_id: peer_id.clone(),
            protocol: "test-protocol".to_string(),
            data: vec![1, 2, 3],
        };

        let _connection_failed = NetworkEvent::ConnectionFailed {
            peer_id: Some(peer_id.clone()),
            address: address.clone(),
            error: "connection refused".to_string(),
        };

        let _dht_stored = NetworkEvent::DHTRecordStored {
            key: vec![1, 2, 3],
            value: vec![4, 5, 6],
        };

        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
            key: vec![1, 2, 3],
            value: Some(vec![4, 5, 6]),
        };
    }

    /// `PeerInfo` can be constructed by hand and exposes its fields.
    #[tokio::test]
    async fn test_peer_info_structure() {
        let peer_info = PeerInfo {
            peer_id: "test_peer".to_string(),
            addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
            connected_at: Instant::now(),
            last_seen: Instant::now(),
            status: ConnectionStatus::Connected,
            protocols: vec!["test-protocol".to_string()],
            heartbeat_count: 0,
        };

        assert_eq!(peer_info.peer_id, "test_peer");
        assert_eq!(peer_info.addresses.len(), 1);
        assert_eq!(peer_info.status, ConnectionStatus::Connected);
        assert_eq!(peer_info.protocols.len(), 1);
    }

    /// `NodeConfig` survives a JSON round trip.
    #[tokio::test]
    async fn test_serialization() -> Result<()> {
        let original = create_test_node_config();
        let serialized = serde_json::to_string(&original)?;
        let restored: NodeConfig = serde_json::from_str(&serialized)?;

        assert_eq!(original.peer_id, restored.peer_id);
        assert_eq!(original.listen_addrs, restored.listen_addrs);
        assert_eq!(original.enable_ipv6, restored.enable_ipv6);

        Ok(())
    }
}