1use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
20use crate::config::Config;
21use crate::dht::DHT;
22use crate::error::{NetworkError, P2PError, P2pResult as Result};
23
24use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
25use crate::transport::ant_quic_adapter::DualStackNetworkNode;
26#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
28use crate::validation::RateLimitConfig;
29use crate::validation::RateLimiter;
30use crate::{NetworkAddress, PeerId};
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use std::collections::{HashMap, HashSet};
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::sync::{RwLock, broadcast};
38use tokio::time::Instant;
39use tracing::{debug, info, trace, warn};
40
/// Settings used to construct a `P2PNode`.
///
/// Derives `Serialize`/`Deserialize`, so the whole configuration can be round-tripped
/// through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Identifier for this node. When `None`, `P2PNode::new` generates
    /// `peer_` followed by the first eight characters of a random v4 UUID.
    pub peer_id: Option<PeerId>,

    /// Addresses to listen on. `P2PNode::new` uses the first IPv6 entry and the first
    /// IPv4 entry it finds here.
    pub listen_addrs: Vec<std::net::SocketAddr>,

    /// Primary listen address. Its port is used for the wildcard fallback when
    /// `listen_addrs` is empty.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peers as socket addresses (filled by `from_config`).
    pub bootstrap_peers: Vec<std::net::SocketAddr>,

    /// Bootstrap peers in textual form, copied from the network configuration.
    pub bootstrap_peers_str: Vec<String>,

    /// Whether an IPv6 wildcard listener is added when `listen_addrs` is empty.
    pub enable_ipv6: bool,

    /// Upper bound applied to outbound connection attempts and to message sends.
    pub connection_timeout: Duration,

    /// Keep-alive interval. NOTE(review): the consumer is outside this section —
    /// presumably the keep-alive task; confirm.
    pub keep_alive_interval: Duration,

    /// Maximum number of connections, taken from the network configuration.
    pub max_connections: usize,

    /// Maximum number of incoming connections, taken from the security
    /// configuration's `connection_limit`.
    pub max_incoming_connections: usize,

    /// DHT tuning parameters.
    pub dht_config: DHTConfig,

    /// Transport security settings.
    pub security_config: SecurityConfig,

    /// When `Some`, `P2PNode::new` creates a `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// Settings for the bootstrap cache. When `None`, `P2PNode::new` falls back to
    /// `BootstrapManager::new()`.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
}
87
/// Tuning parameters for the node's DHT.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// The `k` parameter. NOTE(review): presumably the Kademlia bucket size /
    /// replication factor — confirm against the DHT implementation.
    pub k_value: usize,

    /// The `alpha` parameter. NOTE(review): presumably lookup parallelism — confirm.
    pub alpha_value: usize,

    /// Time-to-live of a stored record.
    pub record_ttl: Duration,

    /// Interval between refreshes.
    pub refresh_interval: Duration,
}
103
/// Transport security switches for a node.
///
/// NOTE(review): nothing in this section reads these fields; their effect is defined
/// by code elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Enables the Noise option (defaults to `true`).
    pub enable_noise: bool,

    /// Enables the TLS option (defaults to `true`).
    pub enable_tls: bool,

    /// Trust level applied to peers (defaults to `TrustLevel::Basic`).
    pub trust_level: TrustLevel,
}
116
/// Trust level selectable in `SecurityConfig`.
///
/// NOTE(review): the precise meaning of each level is defined outside this section.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No trust.
    None,
    /// Basic trust; the default used by `SecurityConfig::default`.
    Basic,
    /// Full trust.
    Full,
}
127
128impl NodeConfig {
129 pub fn new() -> Result<Self> {
135 let config = Config::default();
137
138 let listen_addr = config.listen_socket_addr()?;
140
141 let mut listen_addrs = vec![];
143
144 if config.network.ipv6_enabled {
146 let ipv6_addr = std::net::SocketAddr::new(
147 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
148 listen_addr.port(),
149 );
150 listen_addrs.push(ipv6_addr);
151 }
152
153 let ipv4_addr = std::net::SocketAddr::new(
155 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
156 listen_addr.port(),
157 );
158 listen_addrs.push(ipv4_addr);
159
160 Ok(Self {
161 peer_id: None,
162 listen_addrs,
163 listen_addr,
164 bootstrap_peers: Vec::new(),
165 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
166 enable_ipv6: config.network.ipv6_enabled,
167
168 connection_timeout: Duration::from_secs(config.network.connection_timeout),
169 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
170 max_connections: config.network.max_connections,
171 max_incoming_connections: config.security.connection_limit as usize,
172 dht_config: DHTConfig::default(),
173 security_config: SecurityConfig::default(),
174 production_config: None,
175 bootstrap_cache_config: None,
176 })
178 }
179}
180
181impl Default for NodeConfig {
182 fn default() -> Self {
183 let config = Config::default();
185
186 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
188 std::net::SocketAddr::new(
189 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
190 9000,
191 )
192 });
193
194 Self {
195 peer_id: None,
196 listen_addrs: vec![
197 std::net::SocketAddr::new(
198 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
199 listen_addr.port(),
200 ),
201 std::net::SocketAddr::new(
202 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
203 listen_addr.port(),
204 ),
205 ],
206 listen_addr,
207 bootstrap_peers: Vec::new(),
208 bootstrap_peers_str: Vec::new(),
209 enable_ipv6: config.network.ipv6_enabled,
210
211 connection_timeout: Duration::from_secs(config.network.connection_timeout),
212 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
213 max_connections: config.network.max_connections,
214 max_incoming_connections: config.security.connection_limit as usize,
215 dht_config: DHTConfig::default(),
216 security_config: SecurityConfig::default(),
217 production_config: None, bootstrap_cache_config: None,
219 }
221 }
222}
223
224impl NodeConfig {
225 pub fn from_config(config: &Config) -> Result<Self> {
227 let listen_addr = config.listen_socket_addr()?;
228 let bootstrap_addrs = config.bootstrap_addrs()?;
229
230 let mut node_config = Self {
231 peer_id: None,
232 listen_addrs: vec![listen_addr],
233 listen_addr,
234 bootstrap_peers: bootstrap_addrs
235 .iter()
236 .map(|addr| addr.socket_addr())
237 .collect(),
238 bootstrap_peers_str: config
239 .network
240 .bootstrap_nodes
241 .iter()
242 .map(|addr| addr.to_string())
243 .collect(),
244 enable_ipv6: config.network.ipv6_enabled,
245
246 connection_timeout: Duration::from_secs(config.network.connection_timeout),
247 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
248 max_connections: config.network.max_connections,
249 max_incoming_connections: config.security.connection_limit as usize,
250 dht_config: DHTConfig {
251 k_value: 20,
252 alpha_value: 3,
253 record_ttl: Duration::from_secs(3600),
254 refresh_interval: Duration::from_secs(900),
255 },
256 security_config: SecurityConfig {
257 enable_noise: true,
258 enable_tls: true,
259 trust_level: TrustLevel::Basic,
260 },
261 production_config: Some(ProductionConfig {
262 max_connections: config.network.max_connections,
263 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
266 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
267 health_check_interval: Duration::from_secs(30),
268 metrics_interval: Duration::from_secs(60),
269 enable_performance_tracking: true,
270 enable_auto_cleanup: true,
271 shutdown_timeout: Duration::from_secs(30),
272 rate_limits: crate::production::RateLimitConfig::default(),
273 }),
274 bootstrap_cache_config: None,
275 };
280
281 if config.network.ipv6_enabled {
283 node_config.listen_addrs.push(std::net::SocketAddr::new(
284 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
285 listen_addr.port(),
286 ));
287 }
288
289 Ok(node_config)
290 }
291
292 pub fn with_listen_addr(addr: &str) -> Result<Self> {
294 let listen_addr: std::net::SocketAddr = addr
295 .parse()
296 .map_err(|e: std::net::AddrParseError| {
297 NetworkError::InvalidAddress(e.to_string().into())
298 })
299 .map_err(P2PError::Network)?;
300 let cfg = NodeConfig {
301 listen_addr,
302 listen_addrs: vec![listen_addr],
303 ..Default::default()
304 };
305 Ok(cfg)
306 }
307}
308
309impl Default for DHTConfig {
310 fn default() -> Self {
311 Self {
312 k_value: 20,
313 alpha_value: 5,
314 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
317 }
318}
319
320impl Default for SecurityConfig {
321 fn default() -> Self {
322 Self {
323 enable_noise: true,
324 enable_tls: true,
325 trust_level: TrustLevel::Basic,
326 }
327 }
328}
329
/// Bookkeeping record the node keeps for each peer in its peers map.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Identifier of the peer.
    pub peer_id: PeerId,

    /// Addresses of the peer in textual form; `connect_peer` stores the address string
    /// it was called with.
    pub addresses: Vec<String>,

    /// When the record was created for this connection.
    pub connected_at: Instant,

    /// When the peer was last seen.
    pub last_seen: Instant,

    /// Current connection state.
    pub status: ConnectionStatus,

    /// Protocol identifiers associated with the peer; `connect_peer` records
    /// `p2p-foundation/1.0`.
    pub protocols: Vec<String>,

    /// Heartbeat counter; starts at 0 in `connect_peer`.
    pub heartbeat_count: u64,
}
354
/// Connection state recorded in `PeerInfo::status`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The peer is connected.
    Connected,
    /// The connection is being torn down.
    Disconnecting,
    /// The peer is not connected.
    Disconnected,
    /// The connection failed; the string carries the reason.
    Failed(String),
}
369
/// Network-level events.
///
/// NOTE(review): nothing in this section emits these; the node's broadcast channel
/// carries `P2PEvent` instead.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A peer connected.
    PeerConnected {
        /// Identifier of the peer.
        peer_id: PeerId,
        /// Addresses of the peer in textual form.
        addresses: Vec<String>,
    },

    /// A peer disconnected.
    PeerDisconnected {
        /// Identifier of the peer.
        peer_id: PeerId,
        /// Human-readable reason for the disconnect.
        reason: String,
    },

    /// A message arrived from a peer.
    MessageReceived {
        /// Identifier of the sending peer.
        peer_id: PeerId,
        /// Protocol the message was sent on.
        protocol: String,
        /// Raw message payload.
        data: Vec<u8>,
    },

    /// A connection attempt failed.
    ConnectionFailed {
        /// Identifier of the peer, when known.
        peer_id: Option<PeerId>,
        /// Address the attempt targeted.
        address: String,
        /// Human-readable error description.
        error: String,
    },

    /// A DHT record was stored.
    DHTRecordStored {
        /// Record key.
        key: Vec<u8>,
        /// Record value.
        value: Vec<u8>,
    },

    /// A DHT lookup completed.
    DHTRecordRetrieved {
        /// Record key.
        key: Vec<u8>,
        /// Record value, or `None` when absent.
        value: Option<Vec<u8>>,
    },
}
425
/// Events broadcast by `P2PNode` to its subscribers.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// A message was received from a peer or published locally.
    Message {
        /// Topic (protocol name) the message belongs to.
        topic: String,
        /// Peer ID the message claims to originate from.
        source: PeerId,
        /// Message payload.
        data: Vec<u8>,
    },
    /// A peer connected.
    PeerConnected(PeerId),
    /// A peer disconnected.
    PeerDisconnected(PeerId),
}
446
/// A peer-to-peer node: owns the dual-stack transport, the peer table, the event
/// channel and the handles of its background tasks.
pub struct P2PNode {
    /// Configuration the node was created with.
    config: NodeConfig,

    /// This node's peer ID.
    peer_id: PeerId,

    /// Known peers keyed by peer ID; filled by `connect_peer` and by the accept loop.
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,

    /// Sender side of the broadcast channel carrying `P2PEvent`s.
    event_tx: broadcast::Sender<P2PEvent>,

    /// Local addresses reported by the transport; refreshed by
    /// `start_network_listeners`.
    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,

    /// Instant captured when the node value was built.
    start_time: Instant,

    /// Set to `true` by `start` and to `false` by `stop`.
    running: RwLock<bool>,

    /// DHT instance, when one was created.
    dht: Option<Arc<RwLock<DHT>>>,

    /// Present when `NodeConfig::production_config` is set.
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap cache manager; `None` when its initialisation failed.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Dual-stack (IPv6/IPv4) transport node.
    dual_node: Arc<DualStackNetworkNode>,

    /// Rate limiter consulted by the accept loop.
    #[allow(dead_code)]
    rate_limiter: Arc<RateLimiter>,

    /// Peer IDs with a connection considered active; `send_message` refuses peers
    /// missing from this set.
    active_connections: Arc<RwLock<HashSet<PeerId>>>,

    /// Join handle of the connection lifecycle monitor task.
    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Join handle of the keep-alive task.
    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Shutdown flag handed to the keep-alive task.
    shutdown: Arc<AtomicBool>,
}
507
/// Maps a wildcard ("unspecified") socket address to the loopback address of the same
/// family, keeping the port; every other address is returned unchanged.
///
/// `0.0.0.0:p` becomes `127.0.0.1:p` and `[::]:p` becomes `[::1]:p`. A wildcard address
/// is valid for binding but not as a connection target, hence the rewrite.
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let port = addr.port();
    match addr.ip() {
        IpAddr::V4(ip) if ip.is_unspecified() => {
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
        }
        IpAddr::V6(ip) if ip.is_unspecified() => {
            SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port)
        }
        _ => addr,
    }
}
538
539impl P2PNode {
540 pub fn new_for_tests() -> Result<Self> {
542 let (event_tx, _) = broadcast::channel(16);
543 Ok(Self {
544 config: NodeConfig::default(),
545 peer_id: "test_peer".to_string(),
546 peers: Arc::new(RwLock::new(HashMap::new())),
547 event_tx,
548 listen_addrs: RwLock::new(Vec::new()),
549 start_time: Instant::now(),
550 running: RwLock::new(false),
551 dht: None,
552 resource_manager: None,
553 bootstrap_manager: None,
554 dual_node: {
555 let v6: Option<std::net::SocketAddr> = "[::1]:0"
557 .parse()
558 .ok()
559 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
560 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
561 let handle = tokio::runtime::Handle::current();
562 let dual_attempt = handle.block_on(
563 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
564 );
565 let dual = match dual_attempt {
566 Ok(d) => d,
567 Err(_e1) => {
568 let fallback = handle.block_on(
570 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
571 None,
572 "127.0.0.1:0".parse().ok(),
573 ),
574 );
575 match fallback {
576 Ok(d) => d,
577 Err(e2) => {
578 return Err(P2PError::Network(NetworkError::BindError(
579 format!("Failed to create dual-stack network node: {}", e2)
580 .into(),
581 )));
582 }
583 }
584 }
585 };
586 Arc::new(dual)
587 },
588 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
589 max_requests: 100,
590 burst_size: 100,
591 window: std::time::Duration::from_secs(1),
592 ..Default::default()
593 })),
594 active_connections: Arc::new(RwLock::new(HashSet::new())),
595 connection_monitor_handle: Arc::new(RwLock::new(None)),
596 keepalive_handle: Arc::new(RwLock::new(None)),
597 shutdown: Arc::new(AtomicBool::new(false)),
598 })
599 }
600 pub async fn new(config: NodeConfig) -> Result<Self> {
602 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
603 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
605 });
606
607 let (event_tx, _) = broadcast::channel(1000);
608
609 {
612 use blake3::Hasher;
613 let mut hasher = Hasher::new();
614 hasher.update(peer_id.as_bytes());
615 let digest = hasher.finalize();
616 let mut nid = [0u8; 32];
617 nid.copy_from_slice(digest.as_bytes());
618 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
619 crate::identity::node_identity::NodeId::from_bytes(nid),
620 ));
621 }
624
625 let dht = if true {
627 let _dht_config = crate::dht::DHTConfig {
629 replication_factor: config.dht_config.k_value,
630 bucket_size: config.dht_config.k_value,
631 alpha: config.dht_config.alpha_value,
632 record_ttl: config.dht_config.record_ttl,
633 bucket_refresh_interval: config.dht_config.refresh_interval,
634 republish_interval: config.dht_config.refresh_interval,
635 max_distance: 160, };
637 let peer_bytes = peer_id.as_bytes();
639 let mut node_id_bytes = [0u8; 32];
640 let len = peer_bytes.len().min(32);
641 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
642 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
643 let dht_instance = DHT::new(node_id).map_err(|e| {
644 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
645 e.to_string().into(),
646 ))
647 })?;
648 Some(Arc::new(RwLock::new(dht_instance)))
649 } else {
650 None
651 };
652
653 let resource_manager = config
657 .production_config
658 .clone()
659 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
660
661 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
663 match BootstrapManager::with_config(cache_config.clone()).await {
664 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
665 Err(e) => {
666 warn!(
667 "Failed to initialize bootstrap manager: {}, continuing without cache",
668 e
669 );
670 None
671 }
672 }
673 } else {
674 match BootstrapManager::new().await {
675 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
676 Err(e) => {
677 warn!(
678 "Failed to initialize bootstrap manager: {}, continuing without cache",
679 e
680 );
681 None
682 }
683 }
684 };
685
686 let (v6_opt, v4_opt) = if !config.listen_addrs.is_empty() {
688 let v6_addr = config.listen_addrs.iter().find(|a| a.is_ipv6()).cloned();
689 let v4_addr = config.listen_addrs.iter().find(|a| a.is_ipv4()).cloned();
690 (v6_addr, v4_addr)
691 } else {
692 let v4_addr = Some(std::net::SocketAddr::new(
694 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
695 config.listen_addr.port(),
696 ));
697 let v6_addr = if config.enable_ipv6 {
698 Some(std::net::SocketAddr::new(
699 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
700 config.listen_addr.port(),
701 ))
702 } else {
703 None
704 };
705 (v6_addr, v4_addr)
706 };
707
708 let dual_node = Arc::new(
709 DualStackNetworkNode::new(v6_opt, v4_opt)
710 .await
711 .map_err(|e| {
712 P2PError::Transport(crate::error::TransportError::SetupFailed(
713 format!("Failed to create dual-stack network nodes: {}", e).into(),
714 ))
715 })?,
716 );
717
718 let rate_limiter = Arc::new(RateLimiter::new(
720 crate::validation::RateLimitConfig::default(),
721 ));
722
723 let active_connections = Arc::new(RwLock::new(HashSet::new()));
725
726 let connection_monitor_handle = {
728 let active_conns = Arc::clone(&active_connections);
729 let peers_map = Arc::new(RwLock::new(HashMap::new())); let event_tx_clone = event_tx.clone();
731 let dual_node_clone = Arc::clone(&dual_node);
732
733 let handle = tokio::spawn(async move {
734 Self::connection_lifecycle_monitor(
735 dual_node_clone,
736 active_conns,
737 peers_map,
738 event_tx_clone,
739 ).await;
740 });
741
742 Arc::new(RwLock::new(Some(handle)))
743 };
744
745 let shutdown = Arc::new(AtomicBool::new(false));
747 let keepalive_handle = {
748 let active_conns = Arc::clone(&active_connections);
749 let dual_node_clone = Arc::clone(&dual_node);
750 let shutdown_clone = Arc::clone(&shutdown);
751
752 let handle = tokio::spawn(async move {
753 Self::keepalive_task(
754 active_conns,
755 dual_node_clone,
756 shutdown_clone,
757 ).await;
758 });
759
760 Arc::new(RwLock::new(Some(handle)))
761 };
762
763 let node = Self {
764 config,
765 peer_id,
766 peers: Arc::new(RwLock::new(HashMap::new())),
767 event_tx,
768 listen_addrs: RwLock::new(Vec::new()),
769 start_time: Instant::now(),
770 running: RwLock::new(false),
771 dht,
772 resource_manager,
773 bootstrap_manager,
774 dual_node,
775 rate_limiter,
776 active_connections,
777 connection_monitor_handle,
778 keepalive_handle,
779 shutdown,
780 };
781 info!("Created P2P node with peer ID: {}", node.peer_id);
782
783 node.start_network_listeners().await?;
785
786 node.start_connection_monitor().await;
788
789 Ok(node)
790 }
791
    /// Returns a fresh `NodeBuilder` for configuring and constructing a node.
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }
796
    /// Returns this node's peer ID.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
801
802 pub fn local_addr(&self) -> Option<String> {
803 self.listen_addrs
804 .try_read()
805 .ok()
806 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
807 }
808
    /// Records a subscription to `topic`.
    ///
    /// Currently this only logs the topic and always returns `Ok(())`; no subscription
    /// state is stored.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        info!("Subscribed to topic: {}", topic);
        Ok(())
    }
815
816 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
817 info!(
818 "Publishing message to topic: {} ({} bytes)",
819 topic,
820 data.len()
821 );
822
823 let peer_list: Vec<PeerId> = {
825 let peers_guard = self.peers.read().await;
826 peers_guard.keys().cloned().collect()
827 };
828
829 if peer_list.is_empty() {
830 debug!("No peers connected, message will only be sent to local subscribers");
831 } else {
832 let mut send_count = 0;
834 for peer_id in &peer_list {
835 match self.send_message(peer_id, topic, data.to_vec()).await {
836 Ok(_) => {
837 send_count += 1;
838 debug!("Sent message to peer: {}", peer_id);
839 }
840 Err(e) => {
841 warn!("Failed to send message to peer {}: {}", peer_id, e);
842 }
843 }
844 }
845 info!(
846 "Published message to {}/{} connected peers",
847 send_count,
848 peer_list.len()
849 );
850 }
851
852 let event = P2PEvent::Message {
854 topic: topic.to_string(),
855 source: self.peer_id.clone(),
856 data: data.to_vec(),
857 };
858 let _ = self.event_tx.send(event);
859
860 Ok(())
861 }
862
    /// Returns the configuration this node was created with.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
867
868 pub async fn start(&self) -> Result<()> {
870 info!("Starting P2P node...");
871
872 if let Some(ref resource_manager) = self.resource_manager {
874 resource_manager.start().await.map_err(|e| {
875 P2PError::Network(crate::error::NetworkError::ProtocolError(
876 format!("Failed to start resource manager: {e}").into(),
877 ))
878 })?;
879 info!("Production resource manager started");
880 }
881
882 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
884 let mut manager = bootstrap_manager.write().await;
885 manager.start_background_tasks().await.map_err(|e| {
886 P2PError::Network(crate::error::NetworkError::ProtocolError(
887 format!("Failed to start bootstrap manager: {e}").into(),
888 ))
889 })?;
890 info!("Bootstrap cache manager started");
891 }
892
893 *self.running.write().await = true;
895
896 self.start_network_listeners().await?;
898
899 let listen_addrs = self.listen_addrs.read().await;
901 info!("P2P node started on addresses: {:?}", *listen_addrs);
902
903 self.start_message_receiving_system().await?;
907
908 self.connect_bootstrap_peers().await?;
910
911 Ok(())
912 }
913
914 async fn start_network_listeners(&self) -> Result<()> {
916 info!("Starting dual-stack listeners (ant-quic)...");
917 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
919 P2PError::Transport(crate::error::TransportError::SetupFailed(
920 format!("Failed to get local addresses: {}", e).into(),
921 ))
922 })?;
923 {
924 let mut la = self.listen_addrs.write().await;
925 *la = addrs.clone();
926 }
927
928 let event_tx = self.event_tx.clone();
930 let peers = self.peers.clone();
931 let rate_limiter = self.rate_limiter.clone();
932 let dual = self.dual_node.clone();
933 tokio::spawn(async move {
934 loop {
935 match dual.accept_any().await {
936 Ok((ant_peer_id, remote_sock)) => {
937 let peer_id =
938 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
939 let remote_addr = NetworkAddress::from(remote_sock);
940 let _ = rate_limiter.check_ip(&remote_sock.ip());
942 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
943 register_new_peer(&peers, &peer_id, &remote_addr).await;
944 }
945 Err(e) => {
946 warn!("Accept failed: {}", e);
947 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
948 }
949 }
950 }
951 });
952
953 info!("Dual-stack listeners active on: {:?}", addrs);
954 Ok(())
955 }
956
957 #[allow(dead_code)]
959 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
960 warn!("QUIC transport temporarily disabled during ant-quic migration");
999 Err(crate::P2PError::Transport(
1001 crate::error::TransportError::SetupFailed(
1002 format!(
1003 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1004 )
1005 .into(),
1006 ),
1007 ))
1008 }
1009
1010 #[allow(dead_code)] async fn start_connection_acceptor(
1013 &self,
1014 transport: Arc<dyn crate::transport::Transport>,
1015 addr: std::net::SocketAddr,
1016 transport_type: crate::transport::TransportType,
1017 ) -> Result<()> {
1018 info!(
1019 "Starting connection acceptor for {:?} on {}",
1020 transport_type, addr
1021 );
1022
1023 let event_tx = self.event_tx.clone();
1025 let _peer_id = self.peer_id.clone();
1026 let peers = Arc::clone(&self.peers);
1027 let rate_limiter = Arc::clone(&self.rate_limiter);
1030
1031 tokio::spawn(async move {
1033 loop {
1034 match transport.accept().await {
1035 Ok(connection) => {
1036 let remote_addr = connection.remote_addr();
1037 let connection_peer_id =
1038 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1039
1040 let socket_addr = remote_addr.socket_addr();
1042 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1043 continue;
1045 }
1046
1047 info!(
1048 "Accepted {:?} connection from {} (peer: {})",
1049 transport_type, remote_addr, connection_peer_id
1050 );
1051
1052 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1054
1055 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1057
1058 spawn_connection_handler(
1060 connection,
1061 connection_peer_id,
1062 event_tx.clone(),
1063 Arc::clone(&peers),
1064 );
1065 }
1066 Err(e) => {
1067 warn!(
1068 "Failed to accept {:?} connection on {}: {}",
1069 transport_type, addr, e
1070 );
1071
1072 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1074 }
1075 }
1076 }
1077 });
1078
1079 info!(
1080 "Connection acceptor background task started for {:?} on {}",
1081 transport_type, addr
1082 );
1083 Ok(())
1084 }
1085
1086 async fn start_message_receiving_system(&self) -> Result<()> {
1088 info!("Starting message receiving system");
1089 let dual = self.dual_node.clone();
1090 let event_tx = self.event_tx.clone();
1091
1092 tokio::spawn(async move {
1093 loop {
1094 match dual.receive_any().await {
1095 Ok((_peer_id, bytes)) => {
1096 #[allow(clippy::collapsible_if)]
1098 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1099 if let (Some(protocol), Some(data), Some(from)) = (
1100 value.get("protocol").and_then(|v| v.as_str()),
1101 value.get("data").and_then(|v| v.as_array()),
1102 value.get("from").and_then(|v| v.as_str()),
1103 ) {
1104 let payload: Vec<u8> = data
1105 .iter()
1106 .filter_map(|v| v.as_u64().map(|n| n as u8))
1107 .collect();
1108 let _ = event_tx.send(P2PEvent::Message {
1109 topic: protocol.to_string(),
1110 source: from.to_string(),
1111 data: payload,
1112 });
1113 }
1114 }
1115 }
1116 Err(e) => {
1117 warn!("Receive error: {}", e);
1118 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1119 }
1120 }
1121 }
1122 });
1123
1124 Ok(())
1125 }
1126
1127 #[allow(dead_code)]
1129 async fn handle_received_message(
1130 &self,
1131 message_data: Vec<u8>,
1132 peer_id: &PeerId,
1133 _protocol: &str,
1134 event_tx: &broadcast::Sender<P2PEvent>,
1135 ) -> Result<()> {
1136 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1140 Ok(message) => {
1141 if let (Some(protocol), Some(data), Some(from)) = (
1142 message.get("protocol").and_then(|v| v.as_str()),
1143 message.get("data").and_then(|v| v.as_array()),
1144 message.get("from").and_then(|v| v.as_str()),
1145 ) {
1146 let data_bytes: Vec<u8> = data
1148 .iter()
1149 .filter_map(|v| v.as_u64().map(|n| n as u8))
1150 .collect();
1151
1152 let event = P2PEvent::Message {
1154 topic: protocol.to_string(),
1155 source: from.to_string(),
1156 data: data_bytes,
1157 };
1158
1159 let _ = event_tx.send(event);
1160 debug!("Generated message event from peer: {}", peer_id);
1161 }
1162 }
1163 Err(e) => {
1164 warn!("Failed to parse received message from {}: {}", peer_id, e);
1165 }
1166 }
1167
1168 Ok(())
1169 }
1170
1171 pub async fn run(&self) -> Result<()> {
1177 if !*self.running.read().await {
1178 self.start().await?;
1179 }
1180
1181 info!("P2P node running...");
1182
1183 loop {
1185 if !*self.running.read().await {
1186 break;
1187 }
1188
1189 self.periodic_tasks().await?;
1191
1192 tokio::time::sleep(Duration::from_millis(100)).await;
1194 }
1195
1196 info!("P2P node stopped");
1197 Ok(())
1198 }
1199
1200 pub async fn stop(&self) -> Result<()> {
1202 info!("Stopping P2P node...");
1203
1204 *self.running.write().await = false;
1206
1207 self.disconnect_all_peers().await?;
1209
1210 if let Some(ref resource_manager) = self.resource_manager {
1212 resource_manager.shutdown().await.map_err(|e| {
1213 P2PError::Network(crate::error::NetworkError::ProtocolError(
1214 format!("Failed to shutdown resource manager: {e}").into(),
1215 ))
1216 })?;
1217 info!("Production resource manager stopped");
1218 }
1219
1220 info!("P2P node stopped");
1221 Ok(())
1222 }
1223
    /// Alias for `stop`.
    ///
    /// # Errors
    ///
    /// Propagates whatever `stop` returns.
    pub async fn shutdown(&self) -> Result<()> {
        self.stop().await
    }
1228
1229 pub async fn is_running(&self) -> bool {
1231 *self.running.read().await
1232 }
1233
1234 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1236 self.listen_addrs.read().await.clone()
1237 }
1238
1239 pub async fn connected_peers(&self) -> Vec<PeerId> {
1241 self.peers.read().await.keys().cloned().collect()
1242 }
1243
1244 pub async fn peer_count(&self) -> usize {
1246 self.peers.read().await.len()
1247 }
1248
1249 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1251 self.peers.read().await.get(peer_id).cloned()
1252 }
1253
1254 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1266 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1268
1269 let peers = self.peers.read().await;
1270
1271 for (peer_id, peer_info) in peers.iter() {
1273 for peer_addr in &peer_info.addresses {
1275 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1276 && peer_socket == socket_addr
1277 {
1278 return Some(peer_id.clone());
1279 }
1280 }
1281 }
1282
1283 None
1284 }
1285
1286 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1292 let peers = self.peers.read().await;
1293
1294 peers
1295 .iter()
1296 .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1297 .collect()
1298 }
1299
1300 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1312 self.active_connections.write().await.remove(peer_id);
1314 self.peers.write().await.remove(peer_id).is_some()
1316 }
1317
1318 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1331 self.peers.read().await.contains_key(peer_id)
1332 }
1333
1334 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1336 info!("Connecting to peer at: {}", address);
1337
1338 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1340 Some(resource_manager.acquire_connection().await?)
1341 } else {
1342 None
1343 };
1344
1345 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1347 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1348 format!("{}: {}", address, e).into(),
1349 ))
1350 })?;
1351
1352 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1355 if normalized_addr != socket_addr {
1356 info!(
1357 "Normalized wildcard address {} to loopback {}",
1358 socket_addr, normalized_addr
1359 );
1360 }
1361
1362 let addr_list = vec![normalized_addr];
1364 let peer_id = match tokio::time::timeout(
1365 self.config.connection_timeout,
1366 self.dual_node.connect_happy_eyeballs(&addr_list),
1367 )
1368 .await
1369 {
1370 Ok(Ok(peer)) => {
1371 let connected_peer_id =
1372 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1373 info!("Successfully connected to peer: {}", connected_peer_id);
1374 connected_peer_id
1375 }
1376 Ok(Err(e)) => {
1377 warn!("Failed to connect to peer at {}: {}", address, e);
1378 let sanitized_address = address.replace(['/', ':'], "_");
1379 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1380 warn!(
1381 "Using demo peer ID: {} (transport connection failed)",
1382 demo_peer_id
1383 );
1384 demo_peer_id
1385 }
1386 Err(_) => {
1387 warn!(
1388 "Timed out connecting to peer at {} after {:?}",
1389 address, self.config.connection_timeout
1390 );
1391 let sanitized_address = address.replace(['/', ':'], "_");
1392 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1393 demo_peer_id
1394 }
1395 };
1396
1397 let peer_info = PeerInfo {
1399 peer_id: peer_id.clone(),
1400 addresses: vec![address.to_string()],
1401 connected_at: Instant::now(),
1402 last_seen: Instant::now(),
1403 status: ConnectionStatus::Connected,
1404 protocols: vec!["p2p-foundation/1.0".to_string()],
1405 heartbeat_count: 0,
1406 };
1407
1408 self.peers.write().await.insert(peer_id.clone(), peer_info);
1410
1411 if let Some(ref resource_manager) = self.resource_manager {
1413 resource_manager.record_bandwidth(0, 0); }
1415
1416 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1418
1419 info!("Connected to peer: {}", peer_id);
1420 Ok(peer_id)
1421 }
1422
1423 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1425 info!("Disconnecting from peer: {}", peer_id);
1426
1427 self.active_connections.write().await.remove(peer_id);
1429
1430 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1431 peer_info.status = ConnectionStatus::Disconnected;
1432
1433 let _ = self
1435 .event_tx
1436 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1437
1438 info!("Disconnected from peer: {}", peer_id);
1439 }
1440
1441 Ok(())
1442 }
1443
1444 pub async fn is_connection_active(&self, peer_id: &PeerId) -> bool {
1446 self.active_connections.read().await.contains(peer_id)
1447 }
1448
1449 pub async fn send_message(
1451 &self,
1452 peer_id: &PeerId,
1453 protocol: &str,
1454 data: Vec<u8>,
1455 ) -> Result<()> {
1456 debug!(
1457 "Sending message to peer {} on protocol {}",
1458 peer_id, protocol
1459 );
1460
1461 if let Some(ref resource_manager) = self.resource_manager
1463 && !resource_manager
1464 .check_rate_limit(peer_id, "message")
1465 .await?
1466 {
1467 return Err(P2PError::ResourceExhausted(
1468 format!("Rate limit exceeded for peer {}", peer_id).into(),
1469 ));
1470 }
1471
1472 if !self.peers.read().await.contains_key(peer_id) {
1474 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1475 peer_id.to_string().into(),
1476 )));
1477 }
1478
1479 if !self.is_connection_active(peer_id).await {
1482 debug!(
1483 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1484 peer_id
1485 );
1486
1487 self.remove_peer(peer_id).await;
1489
1490 return Err(P2PError::Network(crate::error::NetworkError::ConnectionClosed {
1491 peer_id: peer_id.to_string().into(),
1492 }));
1493 }
1494
1495 if let Some(ref resource_manager) = self.resource_manager {
1499 resource_manager.record_bandwidth(data.len() as u64, 0);
1500 }
1501
1502 let _message_data = self.create_protocol_message(protocol, data)?;
1504
1505 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1507 tokio::time::timeout(self.config.connection_timeout, send_fut)
1508 .await
1509 .map_err(|_| {
1510 P2PError::Transport(crate::error::TransportError::StreamError(
1511 "Timed out sending message".into(),
1512 ))
1513 })?
1514 .map_err(|e| {
1515 P2PError::Transport(crate::error::TransportError::StreamError(
1516 e.to_string().into(),
1517 ))
1518 })
1519 }
1520
1521 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1523 use serde_json::json;
1524
1525 let timestamp = std::time::SystemTime::now()
1526 .duration_since(std::time::UNIX_EPOCH)
1527 .map_err(|e| {
1528 P2PError::Network(NetworkError::ProtocolError(
1529 format!("System time error: {}", e).into(),
1530 ))
1531 })?
1532 .as_secs();
1533
1534 let message = json!({
1536 "protocol": protocol,
1537 "data": data,
1538 "from": self.peer_id,
1539 "timestamp": timestamp
1540 });
1541
1542 serde_json::to_vec(&message).map_err(|e| {
1543 P2PError::Transport(crate::error::TransportError::StreamError(
1544 format!("Failed to serialize message: {e}").into(),
1545 ))
1546 })
1547 }
1548
1549 }
1551
1552#[allow(dead_code)]
1554fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1555 use serde_json::json;
1556
1557 let timestamp = std::time::SystemTime::now()
1558 .duration_since(std::time::UNIX_EPOCH)
1559 .map_err(|e| {
1560 P2PError::Network(NetworkError::ProtocolError(
1561 format!("System time error: {}", e).into(),
1562 ))
1563 })?
1564 .as_secs();
1565
1566 let message = json!({
1568 "protocol": protocol,
1569 "data": data,
1570 "timestamp": timestamp
1571 });
1572
1573 serde_json::to_vec(&message).map_err(|e| {
1574 P2PError::Transport(crate::error::TransportError::StreamError(
1575 format!("Failed to serialize message: {e}").into(),
1576 ))
1577 })
1578}
1579
1580impl P2PNode {
    /// Returns a new receiver subscribed to this node's `P2PEvent` broadcast channel.
    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
        self.event_tx.subscribe()
    }
1585
    /// Alias for [`Self::subscribe_events`]; returns a fresh event receiver.
    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
        self.subscribe_events()
    }
1590
    /// Returns the time elapsed since the node's recorded `start_time`.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }
1595
1596 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1606 if let Some(ref resource_manager) = self.resource_manager {
1607 Ok(resource_manager.get_metrics().await)
1608 } else {
1609 Err(P2PError::Network(
1610 crate::error::NetworkError::ProtocolError(
1611 "Production resource manager not enabled".to_string().into(),
1612 ),
1613 ))
1614 }
1615 }
1616
1617 async fn connection_lifecycle_monitor(
1620 dual_node: Arc<DualStackNetworkNode>,
1621 active_connections: Arc<RwLock<HashSet<String>>>,
1622 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1623 event_tx: broadcast::Sender<P2PEvent>,
1624 ) {
1625 use crate::transport::ant_quic_adapter::ConnectionEvent;
1626
1627 let mut event_rx = dual_node.subscribe_connection_events();
1628
1629 info!("Connection lifecycle monitor started");
1630
1631 loop {
1632 match event_rx.recv().await {
1633 Ok(event) => {
1634 match event {
1635 ConnectionEvent::Established { peer_id, remote_address } => {
1636 let peer_id_str = crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1637 debug!("Connection established: peer={}, addr={}", peer_id_str, remote_address);
1638
1639 active_connections.write().await.insert(peer_id_str.clone());
1641
1642 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1644 peer_info.status = ConnectionStatus::Connected;
1645 peer_info.connected_at = Instant::now();
1646 }
1647
1648 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1650 }
1651 ConnectionEvent::Lost { peer_id, reason } => {
1652 let peer_id_str = crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1653 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1654
1655 active_connections.write().await.remove(&peer_id_str);
1657
1658 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1660 peer_info.status = ConnectionStatus::Disconnected;
1661 peer_info.last_seen = Instant::now();
1662 }
1663
1664 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1666 }
1667 ConnectionEvent::Failed { peer_id, reason } => {
1668 let peer_id_str = crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1669 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1670
1671 active_connections.write().await.remove(&peer_id_str);
1673
1674 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1676 peer_info.status = ConnectionStatus::Failed(reason.clone());
1677 }
1678
1679 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1681 }
1682 }
1683 }
1684 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1685 warn!("Connection event monitor lagged, skipped {} events", skipped);
1686 continue;
1687 }
1688 Err(broadcast::error::RecvError::Closed) => {
1689 info!("Connection event channel closed, stopping monitor");
1690 break;
1691 }
1692 }
1693 }
1694
1695 info!("Connection lifecycle monitor stopped");
1696 }
1697
    /// Logs that the connection monitor is already running; performs no other work.
    async fn start_connection_monitor(&self) {
        debug!("Connection monitor already running from initialization");
    }
1705
1706 async fn keepalive_task(
1712 active_connections: Arc<RwLock<HashSet<String>>>,
1713 dual_node: Arc<DualStackNetworkNode>,
1714 shutdown: Arc<AtomicBool>,
1715 ) {
1716 use tokio::time::{interval, Duration};
1717
1718 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1722 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1723
1724 info!("Keepalive task started (interval: {}s)", KEEPALIVE_INTERVAL_SECS);
1725
1726 loop {
1727 if shutdown.load(Ordering::Relaxed) {
1729 info!("Keepalive task shutting down");
1730 break;
1731 }
1732
1733 interval.tick().await;
1734
1735 let peers: Vec<String> = {
1737 active_connections.read().await.iter().cloned().collect()
1738 };
1739
1740 if peers.is_empty() {
1741 trace!("Keepalive: no active connections");
1742 continue;
1743 }
1744
1745 debug!("Sending keepalive to {} active connections", peers.len());
1746
1747 for peer_id in peers {
1749 match dual_node.send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD).await {
1750 Ok(_) => {
1751 trace!("Keepalive sent to peer: {}", peer_id);
1752 }
1753 Err(e) => {
1754 debug!("Failed to send keepalive to peer {}: {} (connection may have closed)", peer_id, e);
1755 }
1757 }
1758 }
1759 }
1760
1761 info!("Keepalive task stopped");
1762 }
1763
1764 pub async fn health_check(&self) -> Result<()> {
1766 if let Some(ref resource_manager) = self.resource_manager {
1767 resource_manager.health_check().await
1768 } else {
1769 let peer_count = self.peer_count().await;
1771 if peer_count > self.config.max_connections {
1772 Err(P2PError::Network(
1773 crate::error::NetworkError::ProtocolError(
1774 format!("Too many connections: {peer_count}").into(),
1775 ),
1776 ))
1777 } else {
1778 Ok(())
1779 }
1780 }
1781 }
1782
    /// Returns the production configuration from the node config, if one was set.
    pub fn production_config(&self) -> Option<&ProductionConfig> {
        self.config.production_config.as_ref()
    }
1787
    /// Returns `true` when the node has a resource manager.
    pub fn is_production_mode(&self) -> bool {
        self.resource_manager.is_some()
    }
1792
    /// Returns a reference to the shared DHT handle, or `None` when the node has no DHT.
    pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
        self.dht.as_ref()
    }
1797
1798 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1800 if let Some(ref dht) = self.dht {
1801 let mut dht_instance = dht.write().await;
1802 let dht_key = crate::dht::DhtKey::from_bytes(key);
1803 dht_instance
1804 .store(&dht_key, value.clone())
1805 .await
1806 .map_err(|e| {
1807 P2PError::Dht(crate::error::DhtError::StoreFailed(
1808 format!("{:?}: {e}", key).into(),
1809 ))
1810 })?;
1811
1812 Ok(())
1813 } else {
1814 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1815 "DHT not enabled".to_string().into(),
1816 )))
1817 }
1818 }
1819
1820 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1822 if let Some(ref dht) = self.dht {
1823 let dht_instance = dht.read().await;
1824 let dht_key = crate::dht::DhtKey::from_bytes(key);
1825 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
1826 P2PError::Dht(crate::error::DhtError::StoreFailed(
1827 format!("Retrieve failed: {e}").into(),
1828 ))
1829 })?;
1830
1831 Ok(record_result)
1832 } else {
1833 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
1834 "DHT not enabled".to_string().into(),
1835 )))
1836 }
1837 }
1838
1839 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1841 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1842 let mut manager = bootstrap_manager.write().await;
1843 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1844 .iter()
1845 .filter_map(|addr| addr.parse().ok())
1846 .collect();
1847 let contact = ContactEntry::new(peer_id, socket_addresses);
1848 manager.add_contact(contact).await.map_err(|e| {
1849 P2PError::Network(crate::error::NetworkError::ProtocolError(
1850 format!("Failed to add peer to bootstrap cache: {e}").into(),
1851 ))
1852 })?;
1853 }
1854 Ok(())
1855 }
1856
1857 pub async fn update_peer_metrics(
1859 &self,
1860 peer_id: &PeerId,
1861 success: bool,
1862 latency_ms: Option<u64>,
1863 _error: Option<String>,
1864 ) -> Result<()> {
1865 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1866 let mut manager = bootstrap_manager.write().await;
1867
1868 let metrics = QualityMetrics {
1870 success_rate: if success { 1.0 } else { 0.0 },
1871 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1872 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
1874 last_successful_connection: if success {
1875 chrono::Utc::now()
1876 } else {
1877 chrono::Utc::now() - chrono::Duration::hours(1)
1878 },
1879 uptime_score: 0.5,
1880 };
1881
1882 manager
1883 .update_contact_metrics(peer_id, metrics)
1884 .await
1885 .map_err(|e| {
1886 P2PError::Network(crate::error::NetworkError::ProtocolError(
1887 format!("Failed to update peer metrics: {e}").into(),
1888 ))
1889 })?;
1890 }
1891 Ok(())
1892 }
1893
1894 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1896 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1897 let manager = bootstrap_manager.read().await;
1898 let stats = manager.get_stats().await.map_err(|e| {
1899 P2PError::Network(crate::error::NetworkError::ProtocolError(
1900 format!("Failed to get bootstrap stats: {e}").into(),
1901 ))
1902 })?;
1903 Ok(Some(stats))
1904 } else {
1905 Ok(None)
1906 }
1907 }
1908
1909 pub async fn cached_peer_count(&self) -> usize {
1911 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1912 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1913 {
1914 return stats.total_contacts;
1915 }
1916 0
1917 }
1918
1919 async fn connect_bootstrap_peers(&self) -> Result<()> {
1921 let mut bootstrap_contacts = Vec::new();
1922 let mut used_cache = false;
1923
1924 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1926 let manager = bootstrap_manager.read().await;
1927 match manager.get_bootstrap_peers(20).await {
1928 Ok(contacts) => {
1930 if !contacts.is_empty() {
1931 info!("Using {} cached bootstrap peers", contacts.len());
1932 bootstrap_contacts = contacts;
1933 used_cache = true;
1934 }
1935 }
1936 Err(e) => {
1937 warn!("Failed to get cached bootstrap peers: {}", e);
1938 }
1939 }
1940 }
1941
1942 if bootstrap_contacts.is_empty() {
1944 let bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1945 &self.config.bootstrap_peers_str
1946 } else {
1947 &self
1949 .config
1950 .bootstrap_peers
1951 .iter()
1952 .map(|addr| addr.to_string())
1953 .collect::<Vec<_>>()
1954 };
1955
1956 if bootstrap_peers.is_empty() {
1957 info!("No bootstrap peers configured and no cached peers available");
1958 return Ok(());
1959 }
1960
1961 info!("Using {} configured bootstrap peers", bootstrap_peers.len());
1962
1963 for addr in bootstrap_peers {
1964 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1965 let contact = ContactEntry::new(
1966 format!("unknown_peer_{}", addr.chars().take(8).collect::<String>()),
1967 vec![socket_addr],
1968 );
1969 bootstrap_contacts.push(contact);
1970 } else {
1971 warn!("Invalid bootstrap address format: {}", addr);
1972 }
1973 }
1974 }
1975
1976 let mut successful_connections = 0;
1978 for contact in bootstrap_contacts {
1979 for addr in &contact.addresses {
1980 match self.connect_peer(&addr.to_string()).await {
1981 Ok(peer_id) => {
1982 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
1983 successful_connections += 1;
1984
1985 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1987 let mut manager = bootstrap_manager.write().await;
1988 let mut updated_contact = contact.clone();
1989 updated_contact.peer_id = peer_id.clone();
1990 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
1993 warn!("Failed to update bootstrap cache: {}", e);
1994 }
1995 }
1996 break; }
1998 Err(e) => {
1999 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2000
2001 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2003 let mut manager = bootstrap_manager.write().await;
2004 let mut updated_contact = contact.clone();
2005 updated_contact.update_connection_result(
2006 false,
2007 None,
2008 Some(e.to_string()),
2009 );
2010
2011 if let Err(e) = manager.add_contact(updated_contact).await {
2012 warn!("Failed to update bootstrap cache: {}", e);
2013 }
2014 }
2015 }
2016 }
2017 }
2018 }
2019
2020 if successful_connections == 0 {
2021 if !used_cache {
2022 warn!("Failed to connect to any bootstrap peers");
2023 }
2024 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2025 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2027 }));
2028 }
2029 info!(
2030 "Successfully connected to {} bootstrap peers",
2031 successful_connections
2032 );
2033
2034 Ok(())
2035 }
2036
2037 async fn disconnect_all_peers(&self) -> Result<()> {
2039 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2040
2041 for peer_id in peer_ids {
2042 self.disconnect_peer(&peer_id).await?;
2043 }
2044
2045 Ok(())
2046 }
2047
    /// Placeholder for periodic maintenance; currently does nothing and returns `Ok(())`.
    async fn periodic_tasks(&self) -> Result<()> {
        Ok(())
    }
2057}
2058
/// Abstraction over something that can send protocol messages to peers.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Sends `data` to `peer_id` on the given `protocol`.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the peer ID of the local node.
    fn local_peer_id(&self) -> &PeerId;
}
2068
/// `NetworkSender` implementation that forwards send requests over an unbounded channel.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Peer ID reported by `local_peer_id`.
    peer_id: PeerId,
    /// Channel carrying `(peer_id, protocol, data)` send requests.
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
2076
2077impl P2PNetworkSender {
2078 pub fn new(
2079 peer_id: PeerId,
2080 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2081 ) -> Self {
2082 Self { peer_id, send_tx }
2083 }
2084}
2085
2086#[async_trait::async_trait]
2088impl NetworkSender for P2PNetworkSender {
2089 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2091 self.send_tx
2092 .send((peer_id.clone(), protocol.to_string(), data))
2093 .map_err(|_| {
2094 P2PError::Network(crate::error::NetworkError::ProtocolError(
2095 "Failed to send message via channel".to_string().into(),
2096 ))
2097 })?;
2098 Ok(())
2099 }
2100
2101 fn local_peer_id(&self) -> &PeerId {
2103 &self.peer_id
2104 }
2105}
2106
/// Builder that accumulates a `NodeConfig` and then constructs a `P2PNode` from it.
pub struct NodeBuilder {
    /// Configuration being assembled by the builder methods.
    config: NodeConfig,
}
2111
2112impl Default for NodeBuilder {
2113 fn default() -> Self {
2114 Self::new()
2115 }
2116}
2117
2118impl NodeBuilder {
2119 pub fn new() -> Self {
2121 Self {
2122 config: NodeConfig::default(),
2123 }
2124 }
2125
2126 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2128 self.config.peer_id = Some(peer_id);
2129 self
2130 }
2131
2132 pub fn listen_on(mut self, addr: &str) -> Self {
2134 if let Ok(multiaddr) = addr.parse() {
2135 self.config.listen_addrs.push(multiaddr);
2136 }
2137 self
2138 }
2139
2140 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2142 if let Ok(multiaddr) = addr.parse() {
2143 self.config.bootstrap_peers.push(multiaddr);
2144 }
2145 self.config.bootstrap_peers_str.push(addr.to_string());
2146 self
2147 }
2148
2149 pub fn with_ipv6(mut self, enable: bool) -> Self {
2151 self.config.enable_ipv6 = enable;
2152 self
2153 }
2154
2155 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2159 self.config.connection_timeout = timeout;
2160 self
2161 }
2162
2163 pub fn with_max_connections(mut self, max: usize) -> Self {
2165 self.config.max_connections = max;
2166 self
2167 }
2168
2169 pub fn with_production_mode(mut self) -> Self {
2171 self.config.production_config = Some(ProductionConfig::default());
2172 self
2173 }
2174
2175 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2177 self.config.production_config = Some(production_config);
2178 self
2179 }
2180
2181 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2183 self.config.dht_config = dht_config;
2184 self
2185 }
2186
2187 pub fn with_default_dht(mut self) -> Self {
2189 self.config.dht_config = DHTConfig::default();
2190 self
2191 }
2192
2193 pub async fn build(self) -> Result<P2PNode> {
2195 P2PNode::new(self.config).await
2196 }
2197}
2198
2199#[allow(dead_code)] async fn handle_received_message_standalone(
2202 message_data: Vec<u8>,
2203 peer_id: &PeerId,
2204 _protocol: &str,
2205 event_tx: &broadcast::Sender<P2PEvent>,
2206) -> Result<()> {
2207 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2209 Ok(message) => {
2210 if let (Some(protocol), Some(data), Some(from)) = (
2211 message.get("protocol").and_then(|v| v.as_str()),
2212 message.get("data").and_then(|v| v.as_array()),
2213 message.get("from").and_then(|v| v.as_str()),
2214 ) {
2215 let data_bytes: Vec<u8> = data
2217 .iter()
2218 .filter_map(|v| v.as_u64().map(|n| n as u8))
2219 .collect();
2220
2221 let event = P2PEvent::Message {
2223 topic: protocol.to_string(),
2224 source: from.to_string(),
2225 data: data_bytes,
2226 };
2227
2228 let _ = event_tx.send(event);
2229 debug!("Generated message event from peer: {}", peer_id);
2230 }
2231 }
2232 Err(e) => {
2233 warn!("Failed to parse received message from {}: {}", peer_id, e);
2234 }
2235 }
2236
2237 Ok(())
2238}
2239
2240#[allow(dead_code)]
2244fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2245 match create_protocol_message_static(protocol, data) {
2246 Ok(msg) => Some(msg),
2247 Err(e) => {
2248 warn!("Failed to create protocol message: {}", e);
2249 None
2250 }
2251 }
2252}
2253
2254#[allow(dead_code)]
2256async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2257 match result {
2258 Ok(_) => {
2259 debug!("Message sent to peer {} via transport layer", peer_id);
2260 }
2261 Err(e) => {
2262 warn!("Failed to send message to peer {}: {}", peer_id, e);
2263 }
2264 }
2265}
2266
2267#[allow(dead_code)] fn check_rate_limit(
2270 rate_limiter: &RateLimiter,
2271 socket_addr: &std::net::SocketAddr,
2272 remote_addr: &NetworkAddress,
2273) -> Result<()> {
2274 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2275 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2276 e
2277 })
2278}
2279
2280#[allow(dead_code)] async fn register_new_peer(
2283 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2284 peer_id: &PeerId,
2285 remote_addr: &NetworkAddress,
2286) {
2287 let mut peers_guard = peers.write().await;
2288 let peer_info = PeerInfo {
2289 peer_id: peer_id.clone(),
2290 addresses: vec![remote_addr.to_string()],
2291 connected_at: tokio::time::Instant::now(),
2292 last_seen: tokio::time::Instant::now(),
2293 status: ConnectionStatus::Connected,
2294 protocols: vec!["p2p-chat/1.0.0".to_string()],
2295 heartbeat_count: 0,
2296 };
2297 peers_guard.insert(peer_id.clone(), peer_info);
2298}
2299
2300#[allow(dead_code)] fn spawn_connection_handler(
2303 connection: Box<dyn crate::transport::Connection>,
2304 peer_id: PeerId,
2305 event_tx: broadcast::Sender<P2PEvent>,
2306 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2307) {
2308 tokio::spawn(async move {
2309 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2310 });
2311}
2312
2313#[allow(dead_code)] async fn handle_peer_connection(
2316 mut connection: Box<dyn crate::transport::Connection>,
2317 peer_id: PeerId,
2318 event_tx: broadcast::Sender<P2PEvent>,
2319 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2320) {
2321 loop {
2322 match connection.receive().await {
2323 Ok(message_data) => {
2324 debug!(
2325 "Received {} bytes from peer: {}",
2326 message_data.len(),
2327 peer_id
2328 );
2329
2330 if let Err(e) = handle_received_message_standalone(
2332 message_data,
2333 &peer_id,
2334 "unknown", &event_tx,
2336 )
2337 .await
2338 {
2339 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2340 }
2341 }
2342 Err(e) => {
2343 warn!("Failed to receive message from {}: {}", peer_id, e);
2344
2345 if !connection.is_alive().await {
2347 info!("Connection to {} is dead, removing peer", peer_id);
2348
2349 remove_peer(&peers, &peer_id).await;
2351
2352 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2354
2355 break; }
2357
2358 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2360 }
2361 }
2362 }
2363}
2364
2365#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2368 let mut peers_guard = peers.write().await;
2369 peers_guard.remove(peer_id);
2370}
2371
2372#[allow(dead_code)]
2374async fn update_peer_heartbeat(
2375 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2376 peer_id: &PeerId,
2377) -> Result<()> {
2378 let mut peers_guard = peers.write().await;
2379 match peers_guard.get_mut(peer_id) {
2380 Some(peer_info) => {
2381 peer_info.last_seen = Instant::now();
2382 peer_info.heartbeat_count += 1;
2383 Ok(())
2384 }
2385 None => {
2386 warn!("Received heartbeat from unknown peer: {}", peer_id);
2387 Err(P2PError::Network(NetworkError::PeerNotFound(
2388 format!("Peer {} not found", peer_id).into(),
2389 )))
2390 }
2391 }
2392}
2393
2394#[allow(dead_code)]
2396async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2397 if let Some(manager) = resource_manager {
2398 let metrics = manager.get_metrics().await;
2399 (metrics.memory_used, metrics.cpu_usage)
2400 } else {
2401 (0, 0.0)
2402 }
2403}
2404
2405#[cfg(test)]
2406mod tests {
2407 use super::*;
2408 use std::time::Duration;
2410 use tokio::time::timeout;
2411
2412 fn create_test_node_config() -> NodeConfig {
2418 NodeConfig {
2419 peer_id: Some("test_peer_123".to_string()),
2420 listen_addrs: vec![
2421 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2422 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2423 ],
2424 listen_addr: std::net::SocketAddr::new(
2425 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2426 0,
2427 ),
2428 bootstrap_peers: vec![],
2429 bootstrap_peers_str: vec![],
2430 enable_ipv6: true,
2431
2432 connection_timeout: Duration::from_millis(300),
2433 keep_alive_interval: Duration::from_secs(30),
2434 max_connections: 100,
2435 max_incoming_connections: 50,
2436 dht_config: DHTConfig::default(),
2437 security_config: SecurityConfig::default(),
2438 production_config: None,
2439 bootstrap_cache_config: None,
2440 }
2442 }
2443
2444 #[tokio::test]
2448 async fn test_node_config_default() {
2449 let config = NodeConfig::default();
2450
2451 assert!(config.peer_id.is_none());
2452 assert_eq!(config.listen_addrs.len(), 2);
2453 assert!(config.enable_ipv6);
2454 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2456 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2457 }
2458
2459 #[tokio::test]
2460 async fn test_dht_config_default() {
2461 let config = DHTConfig::default();
2462
2463 assert_eq!(config.k_value, 20);
2464 assert_eq!(config.alpha_value, 5);
2465 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2466 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2467 }
2468
2469 #[tokio::test]
2470 async fn test_security_config_default() {
2471 let config = SecurityConfig::default();
2472
2473 assert!(config.enable_noise);
2474 assert!(config.enable_tls);
2475 assert_eq!(config.trust_level, TrustLevel::Basic);
2476 }
2477
2478 #[test]
2479 fn test_trust_level_variants() {
2480 let _none = TrustLevel::None;
2482 let _basic = TrustLevel::Basic;
2483 let _full = TrustLevel::Full;
2484
2485 assert_eq!(TrustLevel::None, TrustLevel::None);
2487 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2488 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2489 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2490 }
2491
2492 #[test]
2493 fn test_connection_status_variants() {
2494 let connecting = ConnectionStatus::Connecting;
2495 let connected = ConnectionStatus::Connected;
2496 let disconnecting = ConnectionStatus::Disconnecting;
2497 let disconnected = ConnectionStatus::Disconnected;
2498 let failed = ConnectionStatus::Failed("test error".to_string());
2499
2500 assert_eq!(connecting, ConnectionStatus::Connecting);
2501 assert_eq!(connected, ConnectionStatus::Connected);
2502 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2503 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2504 assert_ne!(connecting, connected);
2505
2506 if let ConnectionStatus::Failed(msg) = failed {
2507 assert_eq!(msg, "test error");
2508 } else {
2509 panic!("Expected Failed status");
2510 }
2511 }
2512
2513 #[tokio::test]
2514 async fn test_node_creation() -> Result<()> {
2515 let config = create_test_node_config();
2516 let node = P2PNode::new(config).await?;
2517
2518 assert_eq!(node.peer_id(), "test_peer_123");
2519 assert!(!node.is_running().await);
2520 assert_eq!(node.peer_count().await, 0);
2521 assert!(node.connected_peers().await.is_empty());
2522
2523 Ok(())
2524 }
2525
2526 #[tokio::test]
2527 async fn test_node_creation_without_peer_id() -> Result<()> {
2528 let mut config = create_test_node_config();
2529 config.peer_id = None;
2530
2531 let node = P2PNode::new(config).await?;
2532
2533 assert!(node.peer_id().starts_with("peer_"));
2535 assert!(!node.is_running().await);
2536
2537 Ok(())
2538 }
2539
2540 #[tokio::test]
2541 async fn test_node_lifecycle() -> Result<()> {
2542 let config = create_test_node_config();
2543 let node = P2PNode::new(config).await?;
2544
2545 assert!(!node.is_running().await);
2547
2548 node.start().await?;
2550 assert!(node.is_running().await);
2551
2552 let listen_addrs = node.listen_addrs().await;
2554 assert!(
2555 !listen_addrs.is_empty(),
2556 "Expected at least one listening address"
2557 );
2558
2559 node.stop().await?;
2561 assert!(!node.is_running().await);
2562
2563 Ok(())
2564 }
2565
2566 #[tokio::test]
2567 async fn test_peer_connection() -> Result<()> {
2568 let config = create_test_node_config();
2569 let node = P2PNode::new(config).await?;
2570
2571 let peer_addr = "127.0.0.1:0";
2572
2573 let peer_id = node.connect_peer(peer_addr).await?;
2575 assert!(peer_id.starts_with("peer_from_"));
2576
2577 assert_eq!(node.peer_count().await, 1);
2579
2580 let connected_peers = node.connected_peers().await;
2582 assert_eq!(connected_peers.len(), 1);
2583 assert_eq!(connected_peers[0], peer_id);
2584
2585 let peer_info = node.peer_info(&peer_id).await;
2587 assert!(peer_info.is_some());
2588 let info = peer_info.expect("Peer info should exist after adding peer");
2589 assert_eq!(info.peer_id, peer_id);
2590 assert_eq!(info.status, ConnectionStatus::Connected);
2591 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2592
2593 node.disconnect_peer(&peer_id).await?;
2595 assert_eq!(node.peer_count().await, 0);
2596
2597 Ok(())
2598 }
2599
2600 #[tokio::test]
2601 async fn test_event_subscription() -> Result<()> {
2602 let config = create_test_node_config();
2603 let node = P2PNode::new(config).await?;
2604
2605 let mut events = node.subscribe_events();
2606 let peer_addr = "127.0.0.1:0";
2607
2608 let peer_id = node.connect_peer(peer_addr).await?;
2610
2611 let event = timeout(Duration::from_millis(100), events.recv()).await;
2613 assert!(event.is_ok());
2614
2615 let event_result = event
2616 .expect("Should receive event")
2617 .expect("Event should not be error");
2618 match event_result {
2619 P2PEvent::PeerConnected(event_peer_id) => {
2620 assert_eq!(event_peer_id, peer_id);
2621 }
2622 _ => panic!("Expected PeerConnected event"),
2623 }
2624
2625 node.disconnect_peer(&peer_id).await?;
2627
2628 let event = timeout(Duration::from_millis(100), events.recv()).await;
2630 assert!(event.is_ok());
2631
2632 let event_result = event
2633 .expect("Should receive event")
2634 .expect("Event should not be error");
2635 match event_result {
2636 P2PEvent::PeerDisconnected(event_peer_id) => {
2637 assert_eq!(event_peer_id, peer_id);
2638 }
2639 _ => panic!("Expected PeerDisconnected event"),
2640 }
2641
2642 Ok(())
2643 }
2644
2645 #[tokio::test]
2646 async fn test_message_sending() -> Result<()> {
2647 let mut config1 = create_test_node_config();
2649 config1.listen_addr =
2650 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2651 let node1 = P2PNode::new(config1).await?;
2652 node1.start().await?;
2653
2654 let mut config2 = create_test_node_config();
2655 config2.listen_addr =
2656 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2657 let node2 = P2PNode::new(config2).await?;
2658 node2.start().await?;
2659
2660 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2662
2663 let node2_addr = node2.local_addr().ok_or_else(|| {
2665 P2PError::Network(crate::error::NetworkError::ProtocolError(
2666 "No listening address".to_string().into(),
2667 ))
2668 })?;
2669
2670 let peer_id =
2672 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2673 Ok(res) => res?,
2674 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2675 };
2676
2677 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2679
2680 let message_data = b"Hello, peer!".to_vec();
2682 let result = match timeout(
2683 Duration::from_millis(500),
2684 node1.send_message(&peer_id, "test-protocol", message_data),
2685 )
2686 .await
2687 {
2688 Ok(res) => res,
2689 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2690 };
2691 if let Err(e) = &result {
2694 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2695 }
2696
2697 let non_existent_peer = "non_existent_peer".to_string();
2699 let result = node1
2700 .send_message(&non_existent_peer, "test-protocol", vec![])
2701 .await;
2702 assert!(result.is_err(), "Sending to non-existent peer should fail");
2703
2704 Ok(())
2705 }
2706
2707 #[tokio::test]
2708 async fn test_remote_mcp_operations() -> Result<()> {
2709 let config = create_test_node_config();
2710 let node = P2PNode::new(config).await?;
2711
2712 node.start().await?;
2714 node.stop().await?;
2715 Ok(())
2716 }
2717
2718 #[tokio::test]
2719 async fn test_health_check() -> Result<()> {
2720 let config = create_test_node_config();
2721 let node = P2PNode::new(config).await?;
2722
2723 let result = node.health_check().await;
2725 assert!(result.is_ok());
2726
2727 Ok(())
2732 }
2733
2734 #[tokio::test]
2735 async fn test_node_uptime() -> Result<()> {
2736 let config = create_test_node_config();
2737 let node = P2PNode::new(config).await?;
2738
2739 let uptime1 = node.uptime();
2740 assert!(uptime1 >= Duration::from_secs(0));
2741
2742 tokio::time::sleep(Duration::from_millis(10)).await;
2744
2745 let uptime2 = node.uptime();
2746 assert!(uptime2 > uptime1);
2747
2748 Ok(())
2749 }
2750
2751 #[tokio::test]
2752 async fn test_node_config_access() -> Result<()> {
2753 let config = create_test_node_config();
2754 let expected_peer_id = config.peer_id.clone();
2755 let node = P2PNode::new(config).await?;
2756
2757 let node_config = node.config();
2758 assert_eq!(node_config.peer_id, expected_peer_id);
2759 assert_eq!(node_config.max_connections, 100);
2760 Ok(())
2763 }
2764
2765 #[tokio::test]
2766 async fn test_mcp_server_access() -> Result<()> {
2767 let config = create_test_node_config();
2768 let _node = P2PNode::new(config).await?;
2769
2770 Ok(())
2772 }
2773
2774 #[tokio::test]
2775 async fn test_dht_access() -> Result<()> {
2776 let config = create_test_node_config();
2777 let node = P2PNode::new(config).await?;
2778
2779 assert!(node.dht().is_some());
2781
2782 Ok(())
2783 }
2784
2785 #[tokio::test]
2786 async fn test_node_builder() -> Result<()> {
2787 let builder = P2PNode::builder()
2789 .with_peer_id("builder_test_peer".to_string())
2790 .listen_on("/ip4/127.0.0.1/tcp/0")
2791 .listen_on("/ip6/::1/tcp/0")
2792 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
2794 .with_connection_timeout(Duration::from_secs(15))
2795 .with_max_connections(200);
2796
2797 let config = builder.config;
2799 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2800 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
2803 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2804 assert_eq!(config.max_connections, 200);
2805
2806 Ok(())
2807 }
2808
2809 #[tokio::test]
2810 async fn test_bootstrap_peers() -> Result<()> {
2811 let mut config = create_test_node_config();
2812 config.bootstrap_peers = vec![
2813 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2814 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2815 ];
2816
2817 let node = P2PNode::new(config).await?;
2818
2819 node.start().await?;
2821
2822 let peer_count = node.peer_count().await;
2825 assert!(
2826 peer_count <= 2,
2827 "Peer count should not exceed bootstrap peer count"
2828 );
2829
2830 node.stop().await?;
2831 Ok(())
2832 }
2833
2834 #[tokio::test]
2835 async fn test_production_mode_disabled() -> Result<()> {
2836 let config = create_test_node_config();
2837 let node = P2PNode::new(config).await?;
2838
2839 assert!(!node.is_production_mode());
2840 assert!(node.production_config().is_none());
2841
2842 let result = node.resource_metrics().await;
2844 assert!(result.is_err());
2845 assert!(result.unwrap_err().to_string().contains("not enabled"));
2846
2847 Ok(())
2848 }
2849
2850 #[tokio::test]
2851 async fn test_network_event_variants() {
2852 let peer_id = "test_peer".to_string();
2854 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2855
2856 let _peer_connected = NetworkEvent::PeerConnected {
2857 peer_id: peer_id.clone(),
2858 addresses: vec![address.clone()],
2859 };
2860
2861 let _peer_disconnected = NetworkEvent::PeerDisconnected {
2862 peer_id: peer_id.clone(),
2863 reason: "test disconnect".to_string(),
2864 };
2865
2866 let _message_received = NetworkEvent::MessageReceived {
2867 peer_id: peer_id.clone(),
2868 protocol: "test-protocol".to_string(),
2869 data: vec![1, 2, 3],
2870 };
2871
2872 let _connection_failed = NetworkEvent::ConnectionFailed {
2873 peer_id: Some(peer_id.clone()),
2874 address: address.clone(),
2875 error: "connection refused".to_string(),
2876 };
2877
2878 let _dht_stored = NetworkEvent::DHTRecordStored {
2879 key: vec![1, 2, 3],
2880 value: vec![4, 5, 6],
2881 };
2882
2883 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2884 key: vec![1, 2, 3],
2885 value: Some(vec![4, 5, 6]),
2886 };
2887 }
2888
2889 #[tokio::test]
2890 async fn test_peer_info_structure() {
2891 let peer_info = PeerInfo {
2892 peer_id: "test_peer".to_string(),
2893 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2894 connected_at: Instant::now(),
2895 last_seen: Instant::now(),
2896 status: ConnectionStatus::Connected,
2897 protocols: vec!["test-protocol".to_string()],
2898 heartbeat_count: 0,
2899 };
2900
2901 assert_eq!(peer_info.peer_id, "test_peer");
2902 assert_eq!(peer_info.addresses.len(), 1);
2903 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2904 assert_eq!(peer_info.protocols.len(), 1);
2905 }
2906
2907 #[tokio::test]
2908 async fn test_serialization() -> Result<()> {
2909 let config = create_test_node_config();
2911 let serialized = serde_json::to_string(&config)?;
2912 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2913
2914 assert_eq!(config.peer_id, deserialized.peer_id);
2915 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2916 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2917
2918 Ok(())
2919 }
2920
2921 #[tokio::test]
2922 async fn test_get_peer_id_by_address_found() -> Result<()> {
2923 let config = create_test_node_config();
2924 let node = P2PNode::new(config).await?;
2925
2926 let test_peer_id = "peer_test_123".to_string();
2928 let test_address = "192.168.1.100:9000".to_string();
2929
2930 let peer_info = PeerInfo {
2931 peer_id: test_peer_id.clone(),
2932 addresses: vec![test_address.clone()],
2933 connected_at: Instant::now(),
2934 last_seen: Instant::now(),
2935 status: ConnectionStatus::Connected,
2936 protocols: vec!["test-protocol".to_string()],
2937 heartbeat_count: 0,
2938 };
2939
2940 node.peers
2941 .write()
2942 .await
2943 .insert(test_peer_id.clone(), peer_info);
2944
2945 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
2947 assert_eq!(found_peer_id, Some(test_peer_id));
2948
2949 Ok(())
2950 }
2951
2952 #[tokio::test]
2953 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
2954 let config = create_test_node_config();
2955 let node = P2PNode::new(config).await?;
2956
2957 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
2959 assert_eq!(result, None);
2960
2961 Ok(())
2962 }
2963
2964 #[tokio::test]
2965 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
2966 let config = create_test_node_config();
2967 let node = P2PNode::new(config).await?;
2968
2969 let result = node.get_peer_id_by_address("invalid-address").await;
2971 assert_eq!(result, None);
2972
2973 Ok(())
2974 }
2975
2976 #[tokio::test]
2977 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
2978 let config = create_test_node_config();
2979 let node = P2PNode::new(config).await?;
2980
2981 let peer1_id = "peer_1".to_string();
2983 let peer1_addr = "192.168.1.101:9001".to_string();
2984
2985 let peer2_id = "peer_2".to_string();
2986 let peer2_addr = "192.168.1.102:9002".to_string();
2987
2988 let peer1_info = PeerInfo {
2989 peer_id: peer1_id.clone(),
2990 addresses: vec![peer1_addr.clone()],
2991 connected_at: Instant::now(),
2992 last_seen: Instant::now(),
2993 status: ConnectionStatus::Connected,
2994 protocols: vec!["test-protocol".to_string()],
2995 heartbeat_count: 0,
2996 };
2997
2998 let peer2_info = PeerInfo {
2999 peer_id: peer2_id.clone(),
3000 addresses: vec![peer2_addr.clone()],
3001 connected_at: Instant::now(),
3002 last_seen: Instant::now(),
3003 status: ConnectionStatus::Connected,
3004 protocols: vec!["test-protocol".to_string()],
3005 heartbeat_count: 0,
3006 };
3007
3008 node.peers
3009 .write()
3010 .await
3011 .insert(peer1_id.clone(), peer1_info);
3012 node.peers
3013 .write()
3014 .await
3015 .insert(peer2_id.clone(), peer2_info);
3016
3017 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3019 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3020
3021 assert_eq!(found_peer1, Some(peer1_id));
3022 assert_eq!(found_peer2, Some(peer2_id));
3023
3024 Ok(())
3025 }
3026
3027 #[tokio::test]
3028 async fn test_list_active_connections_empty() -> Result<()> {
3029 let config = create_test_node_config();
3030 let node = P2PNode::new(config).await?;
3031
3032 let connections = node.list_active_connections().await;
3034 assert!(connections.is_empty());
3035
3036 Ok(())
3037 }
3038
3039 #[tokio::test]
3040 async fn test_list_active_connections_with_peers() -> Result<()> {
3041 let config = create_test_node_config();
3042 let node = P2PNode::new(config).await?;
3043
3044 let peer1_id = "peer_1".to_string();
3046 let peer1_addrs = vec![
3047 "192.168.1.101:9001".to_string(),
3048 "192.168.1.101:9002".to_string(),
3049 ];
3050
3051 let peer2_id = "peer_2".to_string();
3052 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3053
3054 let peer1_info = PeerInfo {
3055 peer_id: peer1_id.clone(),
3056 addresses: peer1_addrs.clone(),
3057 connected_at: Instant::now(),
3058 last_seen: Instant::now(),
3059 status: ConnectionStatus::Connected,
3060 protocols: vec!["test-protocol".to_string()],
3061 heartbeat_count: 0,
3062 };
3063
3064 let peer2_info = PeerInfo {
3065 peer_id: peer2_id.clone(),
3066 addresses: peer2_addrs.clone(),
3067 connected_at: Instant::now(),
3068 last_seen: Instant::now(),
3069 status: ConnectionStatus::Connected,
3070 protocols: vec!["test-protocol".to_string()],
3071 heartbeat_count: 0,
3072 };
3073
3074 node.peers
3075 .write()
3076 .await
3077 .insert(peer1_id.clone(), peer1_info);
3078 node.peers
3079 .write()
3080 .await
3081 .insert(peer2_id.clone(), peer2_info);
3082
3083 let connections = node.list_active_connections().await;
3085 assert_eq!(connections.len(), 2);
3086
3087 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3089 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3090
3091 assert!(peer1_conn.is_some());
3092 assert!(peer2_conn.is_some());
3093
3094 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3096 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3097
3098 Ok(())
3099 }
3100
3101 #[tokio::test]
3102 async fn test_remove_peer_success() -> Result<()> {
3103 let config = create_test_node_config();
3104 let node = P2PNode::new(config).await?;
3105
3106 let peer_id = "peer_to_remove".to_string();
3108 let peer_info = PeerInfo {
3109 peer_id: peer_id.clone(),
3110 addresses: vec!["192.168.1.100:9000".to_string()],
3111 connected_at: Instant::now(),
3112 last_seen: Instant::now(),
3113 status: ConnectionStatus::Connected,
3114 protocols: vec!["test-protocol".to_string()],
3115 heartbeat_count: 0,
3116 };
3117
3118 node.peers.write().await.insert(peer_id.clone(), peer_info);
3119
3120 assert!(node.is_peer_connected(&peer_id).await);
3122
3123 let removed = node.remove_peer(&peer_id).await;
3125 assert!(removed);
3126
3127 assert!(!node.is_peer_connected(&peer_id).await);
3129
3130 Ok(())
3131 }
3132
3133 #[tokio::test]
3134 async fn test_remove_peer_nonexistent() -> Result<()> {
3135 let config = create_test_node_config();
3136 let node = P2PNode::new(config).await?;
3137
3138 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3140 assert!(!removed);
3141
3142 Ok(())
3143 }
3144
3145 #[tokio::test]
3146 async fn test_is_peer_connected() -> Result<()> {
3147 let config = create_test_node_config();
3148 let node = P2PNode::new(config).await?;
3149
3150 let peer_id = "test_peer".to_string();
3151
3152 assert!(!node.is_peer_connected(&peer_id).await);
3154
3155 let peer_info = PeerInfo {
3157 peer_id: peer_id.clone(),
3158 addresses: vec!["192.168.1.100:9000".to_string()],
3159 connected_at: Instant::now(),
3160 last_seen: Instant::now(),
3161 status: ConnectionStatus::Connected,
3162 protocols: vec!["test-protocol".to_string()],
3163 heartbeat_count: 0,
3164 };
3165
3166 node.peers.write().await.insert(peer_id.clone(), peer_info);
3167
3168 assert!(node.is_peer_connected(&peer_id).await);
3170
3171 node.remove_peer(&peer_id).await;
3173
3174 assert!(!node.is_peer_connected(&peer_id).await);
3176
3177 Ok(())
3178 }
3179
/// The IPv6 unspecified address (`::`) is mapped to the IPv6 loopback
/// address and the port is carried over unchanged.
#[test]
fn test_normalize_ipv6_wildcard() {
    use std::net::{IpAddr, Ipv6Addr, SocketAddr};

    const PORT: u16 = 8080;

    let input = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), PORT);
    let output = normalize_wildcard_to_loopback(input);

    assert_eq!(output.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
    assert_eq!(output.port(), PORT);
}
3190
/// The IPv4 unspecified address (`0.0.0.0`) is mapped to the IPv4 loopback
/// address and the port is carried over unchanged.
#[test]
fn test_normalize_ipv4_wildcard() {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    const PORT: u16 = 9000;

    let input = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), PORT);
    let output = normalize_wildcard_to_loopback(input);

    assert_eq!(output.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
    assert_eq!(output.port(), PORT);
}
3201
/// An address that is neither a wildcard nor loopback is returned as-is.
#[test]
fn test_normalize_specific_address_unchanged() {
    use std::net::SocketAddr;

    let input = "192.168.1.100:3000".parse::<SocketAddr>().unwrap();
    let output = normalize_wildcard_to_loopback(input);

    assert_eq!(output, input);
}
3209
/// Loopback addresses of either family pass through normalization
/// unchanged.
#[test]
fn test_normalize_loopback_unchanged() {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    const PORT: u16 = 5000;

    // IPv6 is checked first, then IPv4.
    let loopbacks = [
        SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), PORT),
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), PORT),
    ];

    for loopback in loopbacks {
        let normalized = normalize_wildcard_to_loopback(loopback);
        assert_eq!(normalized, loopback);
    }
}
3222}