1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use serde_json::json;
37use std::collections::{HashMap, HashSet};
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::time::Duration;
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::Instant;
43use tracing::{debug, info, trace, warn};
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct NodeConfig {
48 pub peer_id: Option<PeerId>,
50
51 pub listen_addrs: Vec<std::net::SocketAddr>,
53
54 pub listen_addr: std::net::SocketAddr,
56
57 pub bootstrap_peers: Vec<std::net::SocketAddr>,
59
60 pub bootstrap_peers_str: Vec<String>,
62
63 pub enable_ipv6: bool,
65
66 pub connection_timeout: Duration,
69
70 pub keep_alive_interval: Duration,
72
73 pub max_connections: usize,
75
76 pub max_incoming_connections: usize,
78
79 pub dht_config: DHTConfig,
81
82 pub security_config: SecurityConfig,
84
85 pub production_config: Option<ProductionConfig>,
87
88 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
90
91 pub diversity_config: Option<crate::security::IPDiversityConfig>,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct DHTConfig {
101 pub k_value: usize,
103
104 pub alpha_value: usize,
106
107 pub record_ttl: Duration,
109
110 pub refresh_interval: Duration,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct SecurityConfig {
117 pub enable_noise: bool,
119
120 pub enable_tls: bool,
122
123 pub trust_level: TrustLevel,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
129pub enum TrustLevel {
130 None,
132 Basic,
134 Full,
136}
137
138impl NodeConfig {
139 pub fn new() -> Result<Self> {
145 let config = Config::default();
147
148 let listen_addr = config.listen_socket_addr()?;
150
151 let mut listen_addrs = vec![];
153
154 if config.network.ipv6_enabled {
156 let ipv6_addr = std::net::SocketAddr::new(
157 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
158 listen_addr.port(),
159 );
160 listen_addrs.push(ipv6_addr);
161 }
162
163 let ipv4_addr = std::net::SocketAddr::new(
165 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
166 listen_addr.port(),
167 );
168 listen_addrs.push(ipv4_addr);
169
170 Ok(Self {
171 peer_id: None,
172 listen_addrs,
173 listen_addr,
174 bootstrap_peers: Vec::new(),
175 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
176 enable_ipv6: config.network.ipv6_enabled,
177
178 connection_timeout: Duration::from_secs(config.network.connection_timeout),
179 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
180 max_connections: config.network.max_connections,
181 max_incoming_connections: config.security.connection_limit as usize,
182 dht_config: DHTConfig::default(),
183 security_config: SecurityConfig::default(),
184 production_config: None,
185 bootstrap_cache_config: None,
186 diversity_config: None,
187 })
189 }
190}
191
192impl Default for NodeConfig {
193 fn default() -> Self {
194 let config = Config::default();
196
197 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
199 std::net::SocketAddr::new(
200 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
201 9000,
202 )
203 });
204
205 Self {
206 peer_id: None,
207 listen_addrs: vec![
208 std::net::SocketAddr::new(
209 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
210 listen_addr.port(),
211 ),
212 std::net::SocketAddr::new(
213 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
214 listen_addr.port(),
215 ),
216 ],
217 listen_addr,
218 bootstrap_peers: Vec::new(),
219 bootstrap_peers_str: Vec::new(),
220 enable_ipv6: config.network.ipv6_enabled,
221
222 connection_timeout: Duration::from_secs(config.network.connection_timeout),
223 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
224 max_connections: config.network.max_connections,
225 max_incoming_connections: config.security.connection_limit as usize,
226 dht_config: DHTConfig::default(),
227 security_config: SecurityConfig::default(),
228 production_config: None, bootstrap_cache_config: None,
230 diversity_config: None,
231 }
233 }
234}
235
236impl NodeConfig {
237 pub fn from_config(config: &Config) -> Result<Self> {
239 let listen_addr = config.listen_socket_addr()?;
240 let bootstrap_addrs = config.bootstrap_addrs()?;
241
242 let mut node_config = Self {
243 peer_id: None,
244 listen_addrs: vec![listen_addr],
245 listen_addr,
246 bootstrap_peers: bootstrap_addrs
247 .iter()
248 .map(|addr| addr.socket_addr())
249 .collect(),
250 bootstrap_peers_str: config
251 .network
252 .bootstrap_nodes
253 .iter()
254 .map(|addr| addr.to_string())
255 .collect(),
256 enable_ipv6: config.network.ipv6_enabled,
257
258 connection_timeout: Duration::from_secs(config.network.connection_timeout),
259 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
260 max_connections: config.network.max_connections,
261 max_incoming_connections: config.security.connection_limit as usize,
262 dht_config: DHTConfig {
263 k_value: 20,
264 alpha_value: 3,
265 record_ttl: Duration::from_secs(3600),
266 refresh_interval: Duration::from_secs(900),
267 },
268 security_config: SecurityConfig {
269 enable_noise: true,
270 enable_tls: true,
271 trust_level: TrustLevel::Basic,
272 },
273 production_config: Some(ProductionConfig {
274 max_connections: config.network.max_connections,
275 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
278 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
279 health_check_interval: Duration::from_secs(30),
280 metrics_interval: Duration::from_secs(60),
281 enable_performance_tracking: true,
282 enable_auto_cleanup: true,
283 shutdown_timeout: Duration::from_secs(30),
284 rate_limits: crate::production::RateLimitConfig::default(),
285 }),
286 bootstrap_cache_config: None,
287 diversity_config: None,
288 };
293
294 if config.network.ipv6_enabled {
296 node_config.listen_addrs.push(std::net::SocketAddr::new(
297 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
298 listen_addr.port(),
299 ));
300 }
301
302 Ok(node_config)
303 }
304
305 pub fn with_listen_addr(addr: &str) -> Result<Self> {
307 let listen_addr: std::net::SocketAddr = addr
308 .parse()
309 .map_err(|e: std::net::AddrParseError| {
310 NetworkError::InvalidAddress(e.to_string().into())
311 })
312 .map_err(P2PError::Network)?;
313 let cfg = NodeConfig {
314 listen_addr,
315 listen_addrs: vec![listen_addr],
316 diversity_config: None,
317 ..Default::default()
318 };
319 Ok(cfg)
320 }
321}
322
323impl Default for DHTConfig {
324 fn default() -> Self {
325 Self {
326 k_value: 20,
327 alpha_value: 5,
328 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
331 }
332}
333
334impl Default for SecurityConfig {
335 fn default() -> Self {
336 Self {
337 enable_noise: true,
338 enable_tls: true,
339 trust_level: TrustLevel::Basic,
340 }
341 }
342}
343
344#[derive(Debug, Clone)]
346pub struct PeerInfo {
347 pub peer_id: PeerId,
349
350 pub addresses: Vec<String>,
352
353 pub connected_at: Instant,
355
356 pub last_seen: Instant,
358
359 pub status: ConnectionStatus,
361
362 pub protocols: Vec<String>,
364
365 pub heartbeat_count: u64,
367}
368
369#[derive(Debug, Clone, PartialEq)]
371pub enum ConnectionStatus {
372 Connecting,
374 Connected,
376 Disconnecting,
378 Disconnected,
380 Failed(String),
382}
383
384#[derive(Debug, Clone)]
386pub enum NetworkEvent {
387 PeerConnected {
389 peer_id: PeerId,
391 addresses: Vec<String>,
393 },
394
395 PeerDisconnected {
397 peer_id: PeerId,
399 reason: String,
401 },
402
403 MessageReceived {
405 peer_id: PeerId,
407 protocol: String,
409 data: Vec<u8>,
411 },
412
413 ConnectionFailed {
415 peer_id: Option<PeerId>,
417 address: String,
419 error: String,
421 },
422
423 DHTRecordStored {
425 key: Vec<u8>,
427 value: Vec<u8>,
429 },
430
431 DHTRecordRetrieved {
433 key: Vec<u8>,
435 value: Option<Vec<u8>>,
437 },
438}
439
440#[derive(Debug, Clone)]
445pub enum P2PEvent {
446 Message {
448 topic: String,
450 source: PeerId,
452 data: Vec<u8>,
454 },
455 PeerConnected(PeerId),
457 PeerDisconnected(PeerId),
459}
460
461pub struct P2PNode {
471 config: NodeConfig,
473
474 peer_id: PeerId,
476
477 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
479
480 event_tx: broadcast::Sender<P2PEvent>,
482
483 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
485
486 start_time: Instant,
488
489 running: RwLock<bool>,
491
492 dht: Option<Arc<RwLock<DHT>>>,
494
495 resource_manager: Option<Arc<ResourceManager>>,
497
498 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
500
501 dual_node: Arc<DualStackNetworkNode>,
503
504 #[allow(dead_code)]
506 rate_limiter: Arc<RateLimiter>,
507
508 active_connections: Arc<RwLock<HashSet<PeerId>>>,
511
512 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
514
515 #[allow(dead_code)]
517 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
518
519 #[allow(dead_code)]
521 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
522
523 #[allow(dead_code)]
525 shutdown: Arc<AtomicBool>,
526
527 #[allow(dead_code)]
529 geo_provider: Arc<BgpGeoProvider>,
530}
531
532fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
548 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
549
550 if addr.ip().is_unspecified() {
551 let loopback_ip = match addr {
553 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
556 std::net::SocketAddr::new(loopback_ip, addr.port())
557 } else {
558 addr
560 }
561}
562
563impl P2PNode {
564 pub fn new_for_tests() -> Result<Self> {
566 let (event_tx, _) = broadcast::channel(16);
567 Ok(Self {
568 config: NodeConfig::default(),
569 peer_id: "test_peer".to_string(),
570 peers: Arc::new(RwLock::new(HashMap::new())),
571 event_tx,
572 listen_addrs: RwLock::new(Vec::new()),
573 start_time: Instant::now(),
574 running: RwLock::new(false),
575 dht: None,
576 resource_manager: None,
577 bootstrap_manager: None,
578 dual_node: {
579 let v6: Option<std::net::SocketAddr> = "[::1]:0"
581 .parse()
582 .ok()
583 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
584 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
585 let handle = tokio::runtime::Handle::current();
586 let dual_attempt = handle.block_on(
587 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
588 );
589 let dual = match dual_attempt {
590 Ok(d) => d,
591 Err(_e1) => {
592 let fallback = handle.block_on(
594 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
595 None,
596 "127.0.0.1:0".parse().ok(),
597 ),
598 );
599 match fallback {
600 Ok(d) => d,
601 Err(e2) => {
602 return Err(P2PError::Network(NetworkError::BindError(
603 format!("Failed to create dual-stack network node: {}", e2)
604 .into(),
605 )));
606 }
607 }
608 }
609 };
610 Arc::new(dual)
611 },
612 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
613 max_requests: 100,
614 burst_size: 100,
615 window: std::time::Duration::from_secs(1),
616 ..Default::default()
617 })),
618 active_connections: Arc::new(RwLock::new(HashSet::new())),
619 connection_monitor_handle: Arc::new(RwLock::new(None)),
620 keepalive_handle: Arc::new(RwLock::new(None)),
621 shutdown: Arc::new(AtomicBool::new(false)),
622 geo_provider: Arc::new(BgpGeoProvider::new()),
623 security_dashboard: None,
624 })
625 }
626 pub async fn new(config: NodeConfig) -> Result<Self> {
628 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
629 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
631 });
632
633 let (event_tx, _) = broadcast::channel(1000);
634
635 {
638 use blake3::Hasher;
639 let mut hasher = Hasher::new();
640 hasher.update(peer_id.as_bytes());
641 let digest = hasher.finalize();
642 let mut nid = [0u8; 32];
643 nid.copy_from_slice(digest.as_bytes());
644 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
645 crate::identity::node_identity::NodeId::from_bytes(nid),
646 ));
647 }
650
651 let (dht, security_dashboard) = if true {
653 let _dht_config = crate::dht::DHTConfig {
655 replication_factor: config.dht_config.k_value,
656 bucket_size: config.dht_config.k_value,
657 alpha: config.dht_config.alpha_value,
658 record_ttl: config.dht_config.record_ttl,
659 bucket_refresh_interval: config.dht_config.refresh_interval,
660 republish_interval: config.dht_config.refresh_interval,
661 max_distance: 160,
662 };
663 let peer_bytes = peer_id.as_bytes();
665 let mut node_id_bytes = [0u8; 32];
666 let len = peer_bytes.len().min(32);
667 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
668 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
669 let dht_instance = DHT::new(node_id).map_err(|e| {
670 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
671 e.to_string().into(),
672 ))
673 })?;
674 dht_instance.start_maintenance_tasks();
675
676 let security_metrics = dht_instance.security_metrics();
678 let dashboard = crate::dht::metrics::SecurityDashboard::new(
679 security_metrics,
680 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
681 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
682 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
683 );
684
685 (
686 Some(Arc::new(RwLock::new(dht_instance))),
687 Some(Arc::new(dashboard)),
688 )
689 } else {
690 (None, None)
691 };
692
693 let resource_manager = config
697 .production_config
698 .clone()
699 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
700
701 let diversity_config = config.diversity_config.clone().unwrap_or_default();
703 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
704 match BootstrapManager::with_full_config(
705 cache_config.clone(),
706 crate::rate_limit::JoinRateLimiterConfig::default(),
707 diversity_config.clone(),
708 )
709 .await
710 {
711 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
712 Err(e) => {
713 warn!(
714 "Failed to initialize bootstrap manager: {}, continuing without cache",
715 e
716 );
717 None
718 }
719 }
720 } else {
721 match BootstrapManager::with_full_config(
722 crate::bootstrap::CacheConfig::default(),
723 crate::rate_limit::JoinRateLimiterConfig::default(),
724 diversity_config,
725 )
726 .await
727 {
728 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
729 Err(e) => {
730 warn!(
731 "Failed to initialize bootstrap manager: {}, continuing without cache",
732 e
733 );
734 None
735 }
736 }
737 };
738
739 let (v6_opt, v4_opt) = {
742 let port = config.listen_addr.port();
743 let ip = config.listen_addr.ip();
744
745 let v4_addr = if ip.is_ipv4() {
746 Some(std::net::SocketAddr::new(ip, port))
747 } else {
748 Some(std::net::SocketAddr::new(
751 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
752 port,
753 ))
754 };
755
756 let v6_addr = if config.enable_ipv6 {
757 if ip.is_ipv6() {
758 Some(std::net::SocketAddr::new(ip, port))
759 } else {
760 Some(std::net::SocketAddr::new(
761 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
762 port,
763 ))
764 }
765 } else {
766 None
767 };
768 (v6_addr, v4_addr)
769 };
770
771 let dual_node = Arc::new(
772 DualStackNetworkNode::new(v6_opt, v4_opt)
773 .await
774 .map_err(|e| {
775 P2PError::Transport(crate::error::TransportError::SetupFailed(
776 format!("Failed to create dual-stack network nodes: {}", e).into(),
777 ))
778 })?,
779 );
780
781 let rate_limiter = Arc::new(RateLimiter::new(
783 crate::validation::RateLimitConfig::default(),
784 ));
785
786 let active_connections = Arc::new(RwLock::new(HashSet::new()));
788
789 let geo_provider = Arc::new(BgpGeoProvider::new());
791
792 let peers = Arc::new(RwLock::new(HashMap::new()));
794
795 let connection_monitor_handle = {
797 let active_conns = Arc::clone(&active_connections);
798 let peers_map = Arc::clone(&peers);
799 let event_tx_clone = event_tx.clone();
800 let dual_node_clone = Arc::clone(&dual_node);
801 let geo_provider_clone = Arc::clone(&geo_provider);
802 let peer_id_clone = peer_id.clone();
803
804 let handle = tokio::spawn(async move {
805 Self::connection_lifecycle_monitor(
806 dual_node_clone,
807 active_conns,
808 peers_map,
809 event_tx_clone,
810 geo_provider_clone,
811 peer_id_clone,
812 )
813 .await;
814 });
815
816 Arc::new(RwLock::new(Some(handle)))
817 };
818
819 let shutdown = Arc::new(AtomicBool::new(false));
821 let keepalive_handle = {
822 let active_conns = Arc::clone(&active_connections);
823 let dual_node_clone = Arc::clone(&dual_node);
824 let shutdown_clone = Arc::clone(&shutdown);
825
826 let handle = tokio::spawn(async move {
827 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
828 });
829
830 Arc::new(RwLock::new(Some(handle)))
831 };
832
833 let node = Self {
834 config,
835 peer_id,
836 peers,
837 event_tx,
838 listen_addrs: RwLock::new(Vec::new()),
839 start_time: Instant::now(),
840 running: RwLock::new(false),
841 dht,
842 resource_manager,
843 bootstrap_manager,
844 dual_node,
845 rate_limiter,
846 active_connections,
847 security_dashboard,
848 connection_monitor_handle,
849 keepalive_handle,
850 shutdown,
851 geo_provider,
852 };
853 info!("Created P2P node with peer ID: {}", node.peer_id);
854
855 node.start_network_listeners().await?;
857
858 node.start_connection_monitor().await;
860
861 Ok(node)
862 }
863
864 pub fn builder() -> NodeBuilder {
866 NodeBuilder::new()
867 }
868
869 pub fn peer_id(&self) -> &PeerId {
871 &self.peer_id
872 }
873
874 pub fn local_addr(&self) -> Option<String> {
875 self.listen_addrs
876 .try_read()
877 .ok()
878 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
879 }
880
881 pub async fn subscribe(&self, topic: &str) -> Result<()> {
882 info!("Subscribed to topic: {}", topic);
885 Ok(())
886 }
887
888 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
889 info!(
890 "Publishing message to topic: {} ({} bytes)",
891 topic,
892 data.len()
893 );
894
895 let peer_list: Vec<PeerId> = {
897 let peers_guard = self.peers.read().await;
898 peers_guard.keys().cloned().collect()
899 };
900
901 if peer_list.is_empty() {
902 debug!("No peers connected, message will only be sent to local subscribers");
903 } else {
904 let mut send_count = 0;
906 for peer_id in &peer_list {
907 match self.send_message(peer_id, topic, data.to_vec()).await {
908 Ok(_) => {
909 send_count += 1;
910 debug!("Sent message to peer: {}", peer_id);
911 }
912 Err(e) => {
913 warn!("Failed to send message to peer {}: {}", peer_id, e);
914 }
915 }
916 }
917 info!(
918 "Published message to {}/{} connected peers",
919 send_count,
920 peer_list.len()
921 );
922 }
923
924 let event = P2PEvent::Message {
926 topic: topic.to_string(),
927 source: self.peer_id.clone(),
928 data: data.to_vec(),
929 };
930 let _ = self.event_tx.send(event);
931
932 Ok(())
933 }
934
935 pub fn config(&self) -> &NodeConfig {
937 &self.config
938 }
939
940 pub async fn start(&self) -> Result<()> {
942 info!("Starting P2P node...");
943
944 if let Some(ref resource_manager) = self.resource_manager {
946 resource_manager.start().await.map_err(|e| {
947 P2PError::Network(crate::error::NetworkError::ProtocolError(
948 format!("Failed to start resource manager: {e}").into(),
949 ))
950 })?;
951 info!("Production resource manager started");
952 }
953
954 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
956 let mut manager = bootstrap_manager.write().await;
957 manager.start_background_tasks().await.map_err(|e| {
958 P2PError::Network(crate::error::NetworkError::ProtocolError(
959 format!("Failed to start bootstrap manager: {e}").into(),
960 ))
961 })?;
962 info!("Bootstrap cache manager started");
963 }
964
965 *self.running.write().await = true;
967
968 self.start_network_listeners().await?;
970
971 let listen_addrs = self.listen_addrs.read().await;
973 info!("P2P node started on addresses: {:?}", *listen_addrs);
974
975 self.start_message_receiving_system().await?;
979
980 self.connect_bootstrap_peers().await?;
982
983 Ok(())
984 }
985
986 async fn start_network_listeners(&self) -> Result<()> {
988 info!("Starting dual-stack listeners (ant-quic)...");
989 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
991 P2PError::Transport(crate::error::TransportError::SetupFailed(
992 format!("Failed to get local addresses: {}", e).into(),
993 ))
994 })?;
995 {
996 let mut la = self.listen_addrs.write().await;
997 *la = addrs.clone();
998 }
999
1000 let event_tx = self.event_tx.clone();
1002 let peers = self.peers.clone();
1003 let active_connections = self.active_connections.clone();
1004 let rate_limiter = self.rate_limiter.clone();
1005 let dual = self.dual_node.clone();
1006 tokio::spawn(async move {
1007 loop {
1008 match dual.accept_any().await {
1009 Ok((ant_peer_id, remote_sock)) => {
1010 let peer_id =
1011 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1012 let remote_addr = NetworkAddress::from(remote_sock);
1013 let _ = rate_limiter.check_ip(&remote_sock.ip());
1015 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1016 register_new_peer(&peers, &peer_id, &remote_addr).await;
1017 active_connections.write().await.insert(peer_id);
1018 }
1019 Err(e) => {
1020 warn!("Accept failed: {}", e);
1021 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1022 }
1023 }
1024 }
1025 });
1026
1027 info!("Dual-stack listeners active on: {:?}", addrs);
1028 Ok(())
1029 }
1030
1031 #[allow(dead_code)]
1033 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1034 warn!("QUIC transport temporarily disabled during ant-quic migration");
1073 Err(crate::P2PError::Transport(
1075 crate::error::TransportError::SetupFailed(
1076 format!(
1077 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1078 )
1079 .into(),
1080 ),
1081 ))
1082 }
1083
1084 #[allow(dead_code)] async fn start_connection_acceptor(
1087 &self,
1088 transport: Arc<dyn crate::transport::Transport>,
1089 addr: std::net::SocketAddr,
1090 transport_type: crate::transport::TransportType,
1091 ) -> Result<()> {
1092 info!(
1093 "Starting connection acceptor for {:?} on {}",
1094 transport_type, addr
1095 );
1096
1097 let event_tx = self.event_tx.clone();
1099 let _peer_id = self.peer_id.clone();
1100 let peers = Arc::clone(&self.peers);
1101 let rate_limiter = Arc::clone(&self.rate_limiter);
1104
1105 tokio::spawn(async move {
1107 loop {
1108 match transport.accept().await {
1109 Ok(connection) => {
1110 let remote_addr = connection.remote_addr();
1111 let connection_peer_id =
1112 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1113
1114 let socket_addr = remote_addr.socket_addr();
1116 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1117 continue;
1119 }
1120
1121 info!(
1122 "Accepted {:?} connection from {} (peer: {})",
1123 transport_type, remote_addr, connection_peer_id
1124 );
1125
1126 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1128
1129 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1131
1132 spawn_connection_handler(
1134 connection,
1135 connection_peer_id,
1136 event_tx.clone(),
1137 Arc::clone(&peers),
1138 );
1139 }
1140 Err(e) => {
1141 warn!(
1142 "Failed to accept {:?} connection on {}: {}",
1143 transport_type, addr, e
1144 );
1145
1146 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1148 }
1149 }
1150 }
1151 });
1152
1153 info!(
1154 "Connection acceptor background task started for {:?} on {}",
1155 transport_type, addr
1156 );
1157 Ok(())
1158 }
1159
1160 async fn start_message_receiving_system(&self) -> Result<()> {
1162 info!("Starting message receiving system");
1163 let dual = self.dual_node.clone();
1164 let event_tx = self.event_tx.clone();
1165
1166 tokio::spawn(async move {
1167 loop {
1168 match dual.receive_any().await {
1169 Ok((_peer_id, bytes)) => {
1170 #[allow(clippy::collapsible_if)]
1172 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1173 if let (Some(protocol), Some(data), Some(from)) = (
1174 value.get("protocol").and_then(|v| v.as_str()),
1175 value.get("data").and_then(|v| v.as_array()),
1176 value.get("from").and_then(|v| v.as_str()),
1177 ) {
1178 let payload: Vec<u8> = data
1179 .iter()
1180 .filter_map(|v| v.as_u64().map(|n| n as u8))
1181 .collect();
1182 let _ = event_tx.send(P2PEvent::Message {
1183 topic: protocol.to_string(),
1184 source: from.to_string(),
1185 data: payload,
1186 });
1187 }
1188 }
1189 }
1190 Err(e) => {
1191 warn!("Receive error: {}", e);
1192 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1193 }
1194 }
1195 }
1196 });
1197
1198 Ok(())
1199 }
1200
1201 #[allow(dead_code)]
1203 async fn handle_received_message(
1204 &self,
1205 message_data: Vec<u8>,
1206 peer_id: &PeerId,
1207 _protocol: &str,
1208 event_tx: &broadcast::Sender<P2PEvent>,
1209 ) -> Result<()> {
1210 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1214 Ok(message) => {
1215 if let (Some(protocol), Some(data), Some(from)) = (
1216 message.get("protocol").and_then(|v| v.as_str()),
1217 message.get("data").and_then(|v| v.as_array()),
1218 message.get("from").and_then(|v| v.as_str()),
1219 ) {
1220 let data_bytes: Vec<u8> = data
1222 .iter()
1223 .filter_map(|v| v.as_u64().map(|n| n as u8))
1224 .collect();
1225
1226 let event = P2PEvent::Message {
1228 topic: protocol.to_string(),
1229 source: from.to_string(),
1230 data: data_bytes,
1231 };
1232
1233 let _ = event_tx.send(event);
1234 debug!("Generated message event from peer: {}", peer_id);
1235 }
1236 }
1237 Err(e) => {
1238 warn!("Failed to parse received message from {}: {}", peer_id, e);
1239 }
1240 }
1241
1242 Ok(())
1243 }
1244
1245 pub async fn run(&self) -> Result<()> {
1251 if !*self.running.read().await {
1252 self.start().await?;
1253 }
1254
1255 info!("P2P node running...");
1256
1257 loop {
1259 if !*self.running.read().await {
1260 break;
1261 }
1262
1263 self.periodic_tasks().await?;
1265
1266 tokio::time::sleep(Duration::from_millis(100)).await;
1268 }
1269
1270 info!("P2P node stopped");
1271 Ok(())
1272 }
1273
1274 pub async fn stop(&self) -> Result<()> {
1276 info!("Stopping P2P node...");
1277
1278 *self.running.write().await = false;
1280
1281 self.disconnect_all_peers().await?;
1283
1284 if let Some(ref resource_manager) = self.resource_manager {
1286 resource_manager.shutdown().await.map_err(|e| {
1287 P2PError::Network(crate::error::NetworkError::ProtocolError(
1288 format!("Failed to shutdown resource manager: {e}").into(),
1289 ))
1290 })?;
1291 info!("Production resource manager stopped");
1292 }
1293
1294 info!("P2P node stopped");
1295 Ok(())
1296 }
1297
1298 pub async fn shutdown(&self) -> Result<()> {
1300 self.stop().await
1301 }
1302
1303 pub async fn is_running(&self) -> bool {
1305 *self.running.read().await
1306 }
1307
1308 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1310 self.listen_addrs.read().await.clone()
1311 }
1312
1313 pub async fn connected_peers(&self) -> Vec<PeerId> {
1315 self.peers.read().await.keys().cloned().collect()
1316 }
1317
1318 pub async fn peer_count(&self) -> usize {
1320 self.peers.read().await.len()
1321 }
1322
1323 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1325 self.peers.read().await.get(peer_id).cloned()
1326 }
1327
1328 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1340 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1342
1343 let peers = self.peers.read().await;
1344
1345 for (peer_id, peer_info) in peers.iter() {
1347 for peer_addr in &peer_info.addresses {
1349 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1350 && peer_socket == socket_addr
1351 {
1352 return Some(peer_id.clone());
1353 }
1354 }
1355 }
1356
1357 None
1358 }
1359
1360 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1366 let peers = self.peers.read().await;
1367
1368 peers
1369 .iter()
1370 .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1371 .collect()
1372 }
1373
1374 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1386 self.active_connections.write().await.remove(peer_id);
1388 self.peers.write().await.remove(peer_id).is_some()
1390 }
1391
1392 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1405 self.peers.read().await.contains_key(peer_id)
1406 }
1407
1408 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1410 info!("Connecting to peer at: {}", address);
1411
1412 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1414 Some(resource_manager.acquire_connection().await?)
1415 } else {
1416 None
1417 };
1418
1419 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1421 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1422 format!("{}: {}", address, e).into(),
1423 ))
1424 })?;
1425
1426 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1429 if normalized_addr != socket_addr {
1430 info!(
1431 "Normalized wildcard address {} to loopback {}",
1432 socket_addr, normalized_addr
1433 );
1434 }
1435
1436 let addr_list = vec![normalized_addr];
1438 let peer_id = match tokio::time::timeout(
1439 self.config.connection_timeout,
1440 self.dual_node.connect_happy_eyeballs(&addr_list),
1441 )
1442 .await
1443 {
1444 Ok(Ok(peer)) => {
1445 let connected_peer_id =
1446 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1447 info!("Successfully connected to peer: {}", connected_peer_id);
1448 connected_peer_id
1449 }
1450 Ok(Err(e)) => {
1451 warn!("Failed to connect to peer at {}: {}", address, e);
1452 let sanitized_address = address.replace(['/', ':'], "_");
1453 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1454 warn!(
1455 "Using demo peer ID: {} (transport connection failed)",
1456 demo_peer_id
1457 );
1458 demo_peer_id
1459 }
1460 Err(_) => {
1461 warn!(
1462 "Timed out connecting to peer at {} after {:?}",
1463 address, self.config.connection_timeout
1464 );
1465 let sanitized_address = address.replace(['/', ':'], "_");
1466 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1467 demo_peer_id
1468 }
1469 };
1470
1471 let peer_info = PeerInfo {
1473 peer_id: peer_id.clone(),
1474 addresses: vec![address.to_string()],
1475 connected_at: Instant::now(),
1476 last_seen: Instant::now(),
1477 status: ConnectionStatus::Connected,
1478 protocols: vec!["p2p-foundation/1.0".to_string()],
1479 heartbeat_count: 0,
1480 };
1481
1482 self.peers.write().await.insert(peer_id.clone(), peer_info);
1484
1485 self.active_connections
1488 .write()
1489 .await
1490 .insert(peer_id.clone());
1491
1492 if let Some(ref resource_manager) = self.resource_manager {
1494 resource_manager.record_bandwidth(0, 0); }
1496
1497 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1499
1500 info!("Connected to peer: {}", peer_id);
1501 Ok(peer_id)
1502 }
1503
1504 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1506 info!("Disconnecting from peer: {}", peer_id);
1507
1508 self.active_connections.write().await.remove(peer_id);
1510
1511 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1512 peer_info.status = ConnectionStatus::Disconnected;
1513
1514 let _ = self
1516 .event_tx
1517 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1518
1519 info!("Disconnected from peer: {}", peer_id);
1520 }
1521
1522 Ok(())
1523 }
1524
1525 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1527 self.active_connections.read().await.contains(peer_id)
1528 }
1529
1530 pub async fn send_message(
1532 &self,
1533 peer_id: &PeerId,
1534 protocol: &str,
1535 data: Vec<u8>,
1536 ) -> Result<()> {
1537 debug!(
1538 "Sending message to peer {} on protocol {}",
1539 peer_id, protocol
1540 );
1541
1542 if let Some(ref resource_manager) = self.resource_manager
1544 && !resource_manager
1545 .check_rate_limit(peer_id, "message")
1546 .await?
1547 {
1548 return Err(P2PError::ResourceExhausted(
1549 format!("Rate limit exceeded for peer {}", peer_id).into(),
1550 ));
1551 }
1552
1553 if !self.peers.read().await.contains_key(peer_id) {
1555 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1556 peer_id.to_string().into(),
1557 )));
1558 }
1559
1560 if !self.is_connection_active(peer_id).await {
1563 debug!(
1564 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1565 peer_id
1566 );
1567
1568 self.remove_peer(peer_id).await;
1570
1571 return Err(P2PError::Network(
1572 crate::error::NetworkError::ConnectionClosed {
1573 peer_id: peer_id.to_string().into(),
1574 },
1575 ));
1576 }
1577
1578 if let Some(ref resource_manager) = self.resource_manager {
1582 resource_manager.record_bandwidth(data.len() as u64, 0);
1583 }
1584
1585 let _message_data = self.create_protocol_message(protocol, data)?;
1587
1588 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1590 tokio::time::timeout(self.config.connection_timeout, send_fut)
1591 .await
1592 .map_err(|_| {
1593 P2PError::Transport(crate::error::TransportError::StreamError(
1594 "Timed out sending message".into(),
1595 ))
1596 })?
1597 .map_err(|e| {
1598 P2PError::Transport(crate::error::TransportError::StreamError(
1599 e.to_string().into(),
1600 ))
1601 })
1602 }
1603
1604 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1606 use serde_json::json;
1607
1608 let timestamp = std::time::SystemTime::now()
1609 .duration_since(std::time::UNIX_EPOCH)
1610 .map_err(|e| {
1611 P2PError::Network(NetworkError::ProtocolError(
1612 format!("System time error: {}", e).into(),
1613 ))
1614 })?
1615 .as_secs();
1616
1617 let message = json!({
1619 "protocol": protocol,
1620 "data": data,
1621 "from": self.peer_id,
1622 "timestamp": timestamp
1623 });
1624
1625 serde_json::to_vec(&message).map_err(|e| {
1626 P2PError::Transport(crate::error::TransportError::StreamError(
1627 format!("Failed to serialize message: {e}").into(),
1628 ))
1629 })
1630 }
1631
1632 }
1634
1635#[allow(dead_code)]
1637fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1638 use serde_json::json;
1639
1640 let timestamp = std::time::SystemTime::now()
1641 .duration_since(std::time::UNIX_EPOCH)
1642 .map_err(|e| {
1643 P2PError::Network(NetworkError::ProtocolError(
1644 format!("System time error: {}", e).into(),
1645 ))
1646 })?
1647 .as_secs();
1648
1649 let message = json!({
1651 "protocol": protocol,
1652 "data": data,
1653 "timestamp": timestamp
1654 });
1655
1656 serde_json::to_vec(&message).map_err(|e| {
1657 P2PError::Transport(crate::error::TransportError::StreamError(
1658 format!("Failed to serialize message: {e}").into(),
1659 ))
1660 })
1661}
1662
1663impl P2PNode {
1664 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1666 self.event_tx.subscribe()
1667 }
1668
1669 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1671 self.subscribe_events()
1672 }
1673
1674 pub fn uptime(&self) -> Duration {
1676 self.start_time.elapsed()
1677 }
1678
1679 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1689 if let Some(ref resource_manager) = self.resource_manager {
1690 Ok(resource_manager.get_metrics().await)
1691 } else {
1692 Err(P2PError::Network(
1693 crate::error::NetworkError::ProtocolError(
1694 "Production resource manager not enabled".to_string().into(),
1695 ),
1696 ))
1697 }
1698 }
1699
1700 async fn connection_lifecycle_monitor(
1703 dual_node: Arc<DualStackNetworkNode>,
1704 active_connections: Arc<RwLock<HashSet<String>>>,
1705 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1706 event_tx: broadcast::Sender<P2PEvent>,
1707 geo_provider: Arc<BgpGeoProvider>,
1708 local_peer_id: String,
1709 ) {
1710 use crate::transport::ant_quic_adapter::ConnectionEvent;
1711
1712 let mut event_rx = dual_node.subscribe_connection_events();
1713
1714 info!("Connection lifecycle monitor started");
1715
1716 loop {
1717 match event_rx.recv().await {
1718 Ok(event) => {
1719 match event {
1720 ConnectionEvent::Established {
1721 peer_id,
1722 remote_address,
1723 } => {
1724 let peer_id_str =
1725 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1726 debug!(
1727 "Connection established: peer={}, addr={}",
1728 peer_id_str, remote_address
1729 );
1730
1731 let ip = remote_address.ip();
1734 let is_rejected = match ip {
1735 std::net::IpAddr::V4(v4) => {
1736 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1738 geo_provider.is_hosting_asn(asn)
1739 || geo_provider.is_vpn_asn(asn)
1740 } else {
1741 false
1742 }
1743 }
1744 std::net::IpAddr::V6(v6) => {
1745 let info = geo_provider.lookup(v6);
1746 info.is_hosting_provider || info.is_vpn_provider
1747 }
1748 };
1749
1750 if is_rejected {
1751 info!(
1752 "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1753 peer_id_str, remote_address
1754 );
1755
1756 let rejection = RejectionMessage {
1758 reason: RejectionReason::GeoIpPolicy,
1759 message:
1760 "Connection rejected: Hosting/VPN providers not allowed"
1761 .to_string(),
1762 suggested_target: None, };
1764
1765 if let Ok(data) = serde_json::to_vec(&rejection) {
1767 let timestamp = std::time::SystemTime::now()
1769 .duration_since(std::time::UNIX_EPOCH)
1770 .unwrap_or_default()
1771 .as_secs();
1772
1773 let message = serde_json::json!({
1774 "protocol": "control",
1775 "data": data,
1776 "from": local_peer_id,
1777 "timestamp": timestamp
1778 });
1779
1780 if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1781 let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1785
1786 tokio::task::yield_now().await;
1789 }
1790 }
1791
1792 continue;
1796 }
1797
1798 active_connections.write().await.insert(peer_id_str.clone());
1800
1801 let mut peers_lock = peers.write().await;
1803 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1804 peer_info.status = ConnectionStatus::Connected;
1805 peer_info.connected_at = Instant::now();
1806 } else {
1807 debug!("Registering new incoming peer: {}", peer_id_str);
1809 peers_lock.insert(
1810 peer_id_str.clone(),
1811 PeerInfo {
1812 peer_id: peer_id_str.clone(),
1813 addresses: vec![remote_address.to_string()],
1814 status: ConnectionStatus::Connected,
1815 last_seen: Instant::now(),
1816 connected_at: Instant::now(),
1817 protocols: Vec::new(),
1818 heartbeat_count: 0,
1819 },
1820 );
1821 }
1822
1823 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1825 }
1826 ConnectionEvent::Lost { peer_id, reason } => {
1827 let peer_id_str =
1828 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1829 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1830
1831 active_connections.write().await.remove(&peer_id_str);
1833
1834 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1836 peer_info.status = ConnectionStatus::Disconnected;
1837 peer_info.last_seen = Instant::now();
1838 }
1839
1840 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1842 }
1843 ConnectionEvent::Failed { peer_id, reason } => {
1844 let peer_id_str =
1845 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1846 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1847
1848 active_connections.write().await.remove(&peer_id_str);
1850
1851 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1853 peer_info.status = ConnectionStatus::Failed(reason.clone());
1854 }
1855
1856 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1858 }
1859 }
1860 }
1861 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1862 warn!(
1863 "Connection event monitor lagged, skipped {} events",
1864 skipped
1865 );
1866 continue;
1867 }
1868 Err(broadcast::error::RecvError::Closed) => {
1869 info!("Connection event channel closed, stopping monitor");
1870 break;
1871 }
1872 }
1873 }
1874
1875 info!("Connection lifecycle monitor stopped");
1876 }
1877
1878 async fn start_connection_monitor(&self) {
1880 debug!("Connection monitor already running from initialization");
1884 }
1885
1886 async fn keepalive_task(
1892 active_connections: Arc<RwLock<HashSet<String>>>,
1893 dual_node: Arc<DualStackNetworkNode>,
1894 shutdown: Arc<AtomicBool>,
1895 ) {
1896 use tokio::time::{Duration, interval};
1897
1898 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1902 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1903
1904 info!(
1905 "Keepalive task started (interval: {}s)",
1906 KEEPALIVE_INTERVAL_SECS
1907 );
1908
1909 loop {
1910 if shutdown.load(Ordering::Relaxed) {
1912 info!("Keepalive task shutting down");
1913 break;
1914 }
1915
1916 interval.tick().await;
1917
1918 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1920
1921 if peers.is_empty() {
1922 trace!("Keepalive: no active connections");
1923 continue;
1924 }
1925
1926 debug!("Sending keepalive to {} active connections", peers.len());
1927
1928 for peer_id in peers {
1930 match dual_node
1931 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1932 .await
1933 {
1934 Ok(_) => {
1935 trace!("Keepalive sent to peer: {}", peer_id);
1936 }
1937 Err(e) => {
1938 debug!(
1939 "Failed to send keepalive to peer {}: {} (connection may have closed)",
1940 peer_id, e
1941 );
1942 }
1944 }
1945 }
1946 }
1947
1948 info!("Keepalive task stopped");
1949 }
1950
1951 pub async fn health_check(&self) -> Result<()> {
1953 if let Some(ref resource_manager) = self.resource_manager {
1954 resource_manager.health_check().await
1955 } else {
1956 let peer_count = self.peer_count().await;
1958 if peer_count > self.config.max_connections {
1959 Err(P2PError::Network(
1960 crate::error::NetworkError::ProtocolError(
1961 format!("Too many connections: {peer_count}").into(),
1962 ),
1963 ))
1964 } else {
1965 Ok(())
1966 }
1967 }
1968 }
1969
1970 pub fn production_config(&self) -> Option<&ProductionConfig> {
1972 self.config.production_config.as_ref()
1973 }
1974
1975 pub fn is_production_mode(&self) -> bool {
1977 self.resource_manager.is_some()
1978 }
1979
1980 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1982 self.dht.as_ref()
1983 }
1984
1985 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1987 if let Some(ref dht) = self.dht {
1988 let mut dht_instance = dht.write().await;
1989 let dht_key = crate::dht::DhtKey::from_bytes(key);
1990 dht_instance
1991 .store(&dht_key, value.clone())
1992 .await
1993 .map_err(|e| {
1994 P2PError::Dht(crate::error::DhtError::StoreFailed(
1995 format!("{:?}: {e}", key).into(),
1996 ))
1997 })?;
1998
1999 Ok(())
2000 } else {
2001 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2002 "DHT not enabled".to_string().into(),
2003 )))
2004 }
2005 }
2006
2007 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2009 if let Some(ref dht) = self.dht {
2010 let dht_instance = dht.read().await;
2011 let dht_key = crate::dht::DhtKey::from_bytes(key);
2012 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2013 P2PError::Dht(crate::error::DhtError::StoreFailed(
2014 format!("Retrieve failed: {e}").into(),
2015 ))
2016 })?;
2017
2018 Ok(record_result)
2019 } else {
2020 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2021 "DHT not enabled".to_string().into(),
2022 )))
2023 }
2024 }
2025
2026 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2028 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2029 let mut manager = bootstrap_manager.write().await;
2030 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2031 .iter()
2032 .filter_map(|addr| addr.parse().ok())
2033 .collect();
2034 let contact = ContactEntry::new(peer_id, socket_addresses);
2035 manager.add_contact(contact).await.map_err(|e| {
2036 P2PError::Network(crate::error::NetworkError::ProtocolError(
2037 format!("Failed to add peer to bootstrap cache: {e}").into(),
2038 ))
2039 })?;
2040 }
2041 Ok(())
2042 }
2043
2044 pub async fn update_peer_metrics(
2046 &self,
2047 peer_id: &PeerId,
2048 success: bool,
2049 latency_ms: Option<u64>,
2050 _error: Option<String>,
2051 ) -> Result<()> {
2052 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2053 let mut manager = bootstrap_manager.write().await;
2054
2055 let metrics = QualityMetrics {
2057 success_rate: if success { 1.0 } else { 0.0 },
2058 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2059 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2061 last_successful_connection: if success {
2062 chrono::Utc::now()
2063 } else {
2064 chrono::Utc::now() - chrono::Duration::hours(1)
2065 },
2066 uptime_score: 0.5,
2067 };
2068
2069 manager
2070 .update_contact_metrics(peer_id, metrics)
2071 .await
2072 .map_err(|e| {
2073 P2PError::Network(crate::error::NetworkError::ProtocolError(
2074 format!("Failed to update peer metrics: {e}").into(),
2075 ))
2076 })?;
2077 }
2078 Ok(())
2079 }
2080
2081 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2083 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2084 let manager = bootstrap_manager.read().await;
2085 let stats = manager.get_stats().await.map_err(|e| {
2086 P2PError::Network(crate::error::NetworkError::ProtocolError(
2087 format!("Failed to get bootstrap stats: {e}").into(),
2088 ))
2089 })?;
2090 Ok(Some(stats))
2091 } else {
2092 Ok(None)
2093 }
2094 }
2095
2096 pub async fn cached_peer_count(&self) -> usize {
2098 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2099 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2100 {
2101 return stats.total_contacts;
2102 }
2103 0
2104 }
2105
2106 async fn connect_bootstrap_peers(&self) -> Result<()> {
2108 let mut bootstrap_contacts = Vec::new();
2109 let mut used_cache = false;
2110 let mut seen_addresses = std::collections::HashSet::new();
2111
2112 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2114 self.config.bootstrap_peers_str.clone()
2115 } else {
2116 self.config
2118 .bootstrap_peers
2119 .iter()
2120 .map(|addr| addr.to_string())
2121 .collect::<Vec<_>>()
2122 };
2123
2124 if !cli_bootstrap_peers.is_empty() {
2125 info!(
2126 "Using {} CLI-provided bootstrap peers (priority)",
2127 cli_bootstrap_peers.len()
2128 );
2129 for addr in &cli_bootstrap_peers {
2130 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2131 seen_addresses.insert(socket_addr);
2132 let contact = ContactEntry::new(
2133 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2134 vec![socket_addr],
2135 );
2136 bootstrap_contacts.push(contact);
2137 } else {
2138 warn!("Invalid bootstrap address format: {}", addr);
2139 }
2140 }
2141 }
2142
2143 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2145 let manager = bootstrap_manager.read().await;
2146 match manager.get_bootstrap_peers(20).await {
2147 Ok(contacts) => {
2149 if !contacts.is_empty() {
2150 let mut added_from_cache = 0;
2151 for contact in contacts {
2152 let new_addresses: Vec<_> = contact
2154 .addresses
2155 .iter()
2156 .filter(|addr| !seen_addresses.contains(addr))
2157 .copied()
2158 .collect();
2159
2160 if !new_addresses.is_empty() {
2161 for addr in &new_addresses {
2162 seen_addresses.insert(*addr);
2163 }
2164 let mut contact = contact.clone();
2165 contact.addresses = new_addresses;
2166 bootstrap_contacts.push(contact);
2167 added_from_cache += 1;
2168 }
2169 }
2170 if added_from_cache > 0 {
2171 info!(
2172 "Added {} cached bootstrap peers (supplementing CLI peers)",
2173 added_from_cache
2174 );
2175 used_cache = true;
2176 }
2177 }
2178 }
2179 Err(e) => {
2180 warn!("Failed to get cached bootstrap peers: {}", e);
2181 }
2182 }
2183 }
2184
2185 if bootstrap_contacts.is_empty() {
2186 info!("No bootstrap peers configured and no cached peers available");
2187 return Ok(());
2188 }
2189
2190 let mut successful_connections = 0;
2192 for contact in bootstrap_contacts {
2193 for addr in &contact.addresses {
2194 match self.connect_peer(&addr.to_string()).await {
2195 Ok(peer_id) => {
2196 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2197 successful_connections += 1;
2198
2199 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2201 let mut manager = bootstrap_manager.write().await;
2202 let mut updated_contact = contact.clone();
2203 updated_contact.peer_id = peer_id.clone();
2204 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2207 warn!("Failed to update bootstrap cache: {}", e);
2208 }
2209 }
2210 break; }
2212 Err(e) => {
2213 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2214
2215 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2217 let mut manager = bootstrap_manager.write().await;
2218 let mut updated_contact = contact.clone();
2219 updated_contact.update_connection_result(
2220 false,
2221 None,
2222 Some(e.to_string()),
2223 );
2224
2225 if let Err(e) = manager.add_contact(updated_contact).await {
2226 warn!("Failed to update bootstrap cache: {}", e);
2227 }
2228 }
2229 }
2230 }
2231 }
2232 }
2233
2234 if successful_connections == 0 {
2235 if !used_cache {
2236 warn!("Failed to connect to any bootstrap peers");
2237 }
2238 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2239 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2241 }));
2242 }
2243 info!(
2244 "Successfully connected to {} bootstrap peers",
2245 successful_connections
2246 );
2247
2248 Ok(())
2249 }
2250
2251 async fn disconnect_all_peers(&self) -> Result<()> {
2253 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2254
2255 for peer_id in peer_ids {
2256 self.disconnect_peer(&peer_id).await?;
2257 }
2258
2259 Ok(())
2260 }
2261
2262 async fn periodic_tasks(&self) -> Result<()> {
2264 Ok(())
2270 }
2271}
2272
2273#[async_trait::async_trait]
2275pub trait NetworkSender: Send + Sync {
2276 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2278
2279 fn local_peer_id(&self) -> &PeerId;
2281}
2282
2283#[derive(Clone)]
2285pub struct P2PNetworkSender {
2286 peer_id: PeerId,
2287 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2289}
2290
2291impl P2PNetworkSender {
2292 pub fn new(
2293 peer_id: PeerId,
2294 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2295 ) -> Self {
2296 Self { peer_id, send_tx }
2297 }
2298}
2299
2300#[async_trait::async_trait]
2302impl NetworkSender for P2PNetworkSender {
2303 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2305 self.send_tx
2306 .send((peer_id.clone(), protocol.to_string(), data))
2307 .map_err(|_| {
2308 P2PError::Network(crate::error::NetworkError::ProtocolError(
2309 "Failed to send message via channel".to_string().into(),
2310 ))
2311 })?;
2312 Ok(())
2313 }
2314
2315 fn local_peer_id(&self) -> &PeerId {
2317 &self.peer_id
2318 }
2319}
2320
2321pub struct NodeBuilder {
2323 config: NodeConfig,
2324}
2325
2326impl Default for NodeBuilder {
2327 fn default() -> Self {
2328 Self::new()
2329 }
2330}
2331
2332impl NodeBuilder {
2333 pub fn new() -> Self {
2335 Self {
2336 config: NodeConfig::default(),
2337 }
2338 }
2339
2340 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2342 self.config.peer_id = Some(peer_id);
2343 self
2344 }
2345
2346 pub fn listen_on(mut self, addr: &str) -> Self {
2348 if let Ok(multiaddr) = addr.parse() {
2349 self.config.listen_addrs.push(multiaddr);
2350 }
2351 self
2352 }
2353
2354 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2356 if let Ok(multiaddr) = addr.parse() {
2357 self.config.bootstrap_peers.push(multiaddr);
2358 }
2359 self.config.bootstrap_peers_str.push(addr.to_string());
2360 self
2361 }
2362
2363 pub fn with_ipv6(mut self, enable: bool) -> Self {
2365 self.config.enable_ipv6 = enable;
2366 self
2367 }
2368
2369 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2373 self.config.connection_timeout = timeout;
2374 self
2375 }
2376
2377 pub fn with_max_connections(mut self, max: usize) -> Self {
2379 self.config.max_connections = max;
2380 self
2381 }
2382
2383 pub fn with_production_mode(mut self) -> Self {
2385 self.config.production_config = Some(ProductionConfig::default());
2386 self
2387 }
2388
2389 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2391 self.config.production_config = Some(production_config);
2392 self
2393 }
2394
2395 pub fn with_diversity_config(
2397 mut self,
2398 diversity_config: crate::security::IPDiversityConfig,
2399 ) -> Self {
2400 self.config.diversity_config = Some(diversity_config);
2401 self
2402 }
2403
2404 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2406 self.config.dht_config = dht_config;
2407 self
2408 }
2409
2410 pub fn with_default_dht(mut self) -> Self {
2412 self.config.dht_config = DHTConfig::default();
2413 self
2414 }
2415
2416 pub async fn build(self) -> Result<P2PNode> {
2418 P2PNode::new(self.config).await
2419 }
2420}
2421
2422#[cfg(test)]
2423#[allow(clippy::unwrap_used, clippy::expect_used)]
2424mod diversity_tests {
2425 use super::*;
2426 use crate::security::IPDiversityConfig;
2427
2428 async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
2429 let diversity_config = config.diversity_config.clone().unwrap_or_default();
2430 if let Some(ref cache_config) = config.bootstrap_cache_config {
2431 BootstrapManager::with_full_config(
2432 cache_config.clone(),
2433 crate::rate_limit::JoinRateLimiterConfig::default(),
2434 diversity_config,
2435 )
2436 .await
2437 .expect("bootstrap manager")
2438 } else {
2439 BootstrapManager::with_full_config(
2440 crate::bootstrap::CacheConfig::default(),
2441 crate::rate_limit::JoinRateLimiterConfig::default(),
2442 diversity_config,
2443 )
2444 .await
2445 .expect("bootstrap manager")
2446 }
2447 }
2448
2449 #[tokio::test]
2450 async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
2451 let config = NodeConfig {
2452 diversity_config: Some(IPDiversityConfig::testnet()),
2453 ..Default::default()
2454 };
2455
2456 let manager = build_bootstrap_manager_like_prod(&config).await;
2457 assert!(manager.diversity_config().is_relaxed());
2458 assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
2459 }
2460}
2461
2462#[allow(dead_code)] async fn handle_received_message_standalone(
2465 message_data: Vec<u8>,
2466 peer_id: &PeerId,
2467 _protocol: &str,
2468 event_tx: &broadcast::Sender<P2PEvent>,
2469) -> Result<()> {
2470 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2472 Ok(message) => {
2473 if let (Some(protocol), Some(data), Some(from)) = (
2474 message.get("protocol").and_then(|v| v.as_str()),
2475 message.get("data").and_then(|v| v.as_array()),
2476 message.get("from").and_then(|v| v.as_str()),
2477 ) {
2478 let data_bytes: Vec<u8> = data
2480 .iter()
2481 .filter_map(|v| v.as_u64().map(|n| n as u8))
2482 .collect();
2483
2484 let event = P2PEvent::Message {
2486 topic: protocol.to_string(),
2487 source: from.to_string(),
2488 data: data_bytes,
2489 };
2490
2491 let _ = event_tx.send(event);
2492 debug!("Generated message event from peer: {}", peer_id);
2493 }
2494 }
2495 Err(e) => {
2496 warn!("Failed to parse received message from {}: {}", peer_id, e);
2497 }
2498 }
2499
2500 Ok(())
2501}
2502
2503#[allow(dead_code)]
2507fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2508 match create_protocol_message_static(protocol, data) {
2509 Ok(msg) => Some(msg),
2510 Err(e) => {
2511 warn!("Failed to create protocol message: {}", e);
2512 None
2513 }
2514 }
2515}
2516
2517#[allow(dead_code)]
2519async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2520 match result {
2521 Ok(_) => {
2522 debug!("Message sent to peer {} via transport layer", peer_id);
2523 }
2524 Err(e) => {
2525 warn!("Failed to send message to peer {}: {}", peer_id, e);
2526 }
2527 }
2528}
2529
2530#[allow(dead_code)] fn check_rate_limit(
2533 rate_limiter: &RateLimiter,
2534 socket_addr: &std::net::SocketAddr,
2535 remote_addr: &NetworkAddress,
2536) -> Result<()> {
2537 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2538 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2539 e
2540 })
2541}
2542
2543#[allow(dead_code)] async fn register_new_peer(
2546 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2547 peer_id: &PeerId,
2548 remote_addr: &NetworkAddress,
2549) {
2550 let mut peers_guard = peers.write().await;
2551 let peer_info = PeerInfo {
2552 peer_id: peer_id.clone(),
2553 addresses: vec![remote_addr.to_string()],
2554 connected_at: tokio::time::Instant::now(),
2555 last_seen: tokio::time::Instant::now(),
2556 status: ConnectionStatus::Connected,
2557 protocols: vec!["p2p-chat/1.0.0".to_string()],
2558 heartbeat_count: 0,
2559 };
2560 peers_guard.insert(peer_id.clone(), peer_info);
2561}
2562
2563#[allow(dead_code)] fn spawn_connection_handler(
2566 connection: Box<dyn crate::transport::Connection>,
2567 peer_id: PeerId,
2568 event_tx: broadcast::Sender<P2PEvent>,
2569 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2570) {
2571 tokio::spawn(async move {
2572 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2573 });
2574}
2575
2576#[allow(dead_code)] async fn handle_peer_connection(
2579 mut connection: Box<dyn crate::transport::Connection>,
2580 peer_id: PeerId,
2581 event_tx: broadcast::Sender<P2PEvent>,
2582 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2583) {
2584 loop {
2585 match connection.receive().await {
2586 Ok(message_data) => {
2587 debug!(
2588 "Received {} bytes from peer: {}",
2589 message_data.len(),
2590 peer_id
2591 );
2592
2593 if let Err(e) = handle_received_message_standalone(
2595 message_data,
2596 &peer_id,
2597 "unknown", &event_tx,
2599 )
2600 .await
2601 {
2602 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2603 }
2604 }
2605 Err(e) => {
2606 warn!("Failed to receive message from {}: {}", peer_id, e);
2607
2608 if !connection.is_alive().await {
2610 info!("Connection to {} is dead, removing peer", peer_id);
2611
2612 remove_peer(&peers, &peer_id).await;
2614
2615 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2617
2618 break; }
2620
2621 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2623 }
2624 }
2625 }
2626}
2627
2628#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2631 let mut peers_guard = peers.write().await;
2632 peers_guard.remove(peer_id);
2633}
2634
2635#[allow(dead_code)]
2637async fn update_peer_heartbeat(
2638 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2639 peer_id: &PeerId,
2640) -> Result<()> {
2641 let mut peers_guard = peers.write().await;
2642 match peers_guard.get_mut(peer_id) {
2643 Some(peer_info) => {
2644 peer_info.last_seen = Instant::now();
2645 peer_info.heartbeat_count += 1;
2646 Ok(())
2647 }
2648 None => {
2649 warn!("Received heartbeat from unknown peer: {}", peer_id);
2650 Err(P2PError::Network(NetworkError::PeerNotFound(
2651 format!("Peer {} not found", peer_id).into(),
2652 )))
2653 }
2654 }
2655}
2656
2657#[allow(dead_code)]
2659async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2660 if let Some(manager) = resource_manager {
2661 let metrics = manager.get_metrics().await;
2662 (metrics.memory_used, metrics.cpu_usage)
2663 } else {
2664 (0, 0.0)
2665 }
2666}
2667
2668#[cfg(test)]
2669mod tests {
2670 use super::*;
2671 use std::time::Duration;
2673 use tokio::time::timeout;
2674
2675 fn create_test_node_config() -> NodeConfig {
2681 NodeConfig {
2682 peer_id: Some("test_peer_123".to_string()),
2683 listen_addrs: vec![
2684 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2685 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2686 ],
2687 listen_addr: std::net::SocketAddr::new(
2688 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2689 0,
2690 ),
2691 bootstrap_peers: vec![],
2692 bootstrap_peers_str: vec![],
2693 enable_ipv6: true,
2694
2695 connection_timeout: Duration::from_millis(300),
2696 keep_alive_interval: Duration::from_secs(30),
2697 max_connections: 100,
2698 max_incoming_connections: 50,
2699 dht_config: DHTConfig::default(),
2700 security_config: SecurityConfig::default(),
2701 production_config: None,
2702 bootstrap_cache_config: None,
2703 diversity_config: None,
2704 }
2706 }
2707
2708 #[tokio::test]
2712 async fn test_node_config_default() {
2713 let config = NodeConfig::default();
2714
2715 assert!(config.peer_id.is_none());
2716 assert_eq!(config.listen_addrs.len(), 2);
2717 assert!(config.enable_ipv6);
2718 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2720 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2721 }
2722
2723 #[tokio::test]
2724 async fn test_dht_config_default() {
2725 let config = DHTConfig::default();
2726
2727 assert_eq!(config.k_value, 20);
2728 assert_eq!(config.alpha_value, 5);
2729 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2730 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2731 }
2732
2733 #[tokio::test]
2734 async fn test_security_config_default() {
2735 let config = SecurityConfig::default();
2736
2737 assert!(config.enable_noise);
2738 assert!(config.enable_tls);
2739 assert_eq!(config.trust_level, TrustLevel::Basic);
2740 }
2741
2742 #[test]
2743 fn test_trust_level_variants() {
2744 let _none = TrustLevel::None;
2746 let _basic = TrustLevel::Basic;
2747 let _full = TrustLevel::Full;
2748
2749 assert_eq!(TrustLevel::None, TrustLevel::None);
2751 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2752 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2753 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2754 }
2755
2756 #[test]
2757 fn test_connection_status_variants() {
2758 let connecting = ConnectionStatus::Connecting;
2759 let connected = ConnectionStatus::Connected;
2760 let disconnecting = ConnectionStatus::Disconnecting;
2761 let disconnected = ConnectionStatus::Disconnected;
2762 let failed = ConnectionStatus::Failed("test error".to_string());
2763
2764 assert_eq!(connecting, ConnectionStatus::Connecting);
2765 assert_eq!(connected, ConnectionStatus::Connected);
2766 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2767 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2768 assert_ne!(connecting, connected);
2769
2770 if let ConnectionStatus::Failed(msg) = failed {
2771 assert_eq!(msg, "test error");
2772 } else {
2773 panic!("Expected Failed status");
2774 }
2775 }
2776
2777 #[tokio::test]
2778 async fn test_node_creation() -> Result<()> {
2779 let config = create_test_node_config();
2780 let node = P2PNode::new(config).await?;
2781
2782 assert_eq!(node.peer_id(), "test_peer_123");
2783 assert!(!node.is_running().await);
2784 assert_eq!(node.peer_count().await, 0);
2785 assert!(node.connected_peers().await.is_empty());
2786
2787 Ok(())
2788 }
2789
2790 #[tokio::test]
2791 async fn test_node_creation_without_peer_id() -> Result<()> {
2792 let mut config = create_test_node_config();
2793 config.peer_id = None;
2794
2795 let node = P2PNode::new(config).await?;
2796
2797 assert!(node.peer_id().starts_with("peer_"));
2799 assert!(!node.is_running().await);
2800
2801 Ok(())
2802 }
2803
2804 #[tokio::test]
2805 async fn test_node_lifecycle() -> Result<()> {
2806 let config = create_test_node_config();
2807 let node = P2PNode::new(config).await?;
2808
2809 assert!(!node.is_running().await);
2811
2812 node.start().await?;
2814 assert!(node.is_running().await);
2815
2816 let listen_addrs = node.listen_addrs().await;
2818 assert!(
2819 !listen_addrs.is_empty(),
2820 "Expected at least one listening address"
2821 );
2822
2823 node.stop().await?;
2825 assert!(!node.is_running().await);
2826
2827 Ok(())
2828 }
2829
2830 #[tokio::test]
2831 async fn test_peer_connection() -> Result<()> {
2832 let config = create_test_node_config();
2833 let node = P2PNode::new(config).await?;
2834
2835 let peer_addr = "127.0.0.1:0";
2836
2837 let peer_id = node.connect_peer(peer_addr).await?;
2839 assert!(peer_id.starts_with("peer_from_"));
2840
2841 assert_eq!(node.peer_count().await, 1);
2843
2844 let connected_peers = node.connected_peers().await;
2846 assert_eq!(connected_peers.len(), 1);
2847 assert_eq!(connected_peers[0], peer_id);
2848
2849 let peer_info = node.peer_info(&peer_id).await;
2851 assert!(peer_info.is_some());
2852 let info = peer_info.expect("Peer info should exist after adding peer");
2853 assert_eq!(info.peer_id, peer_id);
2854 assert_eq!(info.status, ConnectionStatus::Connected);
2855 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2856
2857 node.disconnect_peer(&peer_id).await?;
2859 assert_eq!(node.peer_count().await, 0);
2860
2861 Ok(())
2862 }
2863
2864 #[tokio::test]
2865 async fn test_event_subscription() -> Result<()> {
2866 let config = create_test_node_config();
2867 let node = P2PNode::new(config).await?;
2868
2869 let mut events = node.subscribe_events();
2870 let peer_addr = "127.0.0.1:0";
2871
2872 let peer_id = node.connect_peer(peer_addr).await?;
2874
2875 let event = timeout(Duration::from_millis(100), events.recv()).await;
2877 assert!(event.is_ok());
2878
2879 let event_result = event
2880 .expect("Should receive event")
2881 .expect("Event should not be error");
2882 match event_result {
2883 P2PEvent::PeerConnected(event_peer_id) => {
2884 assert_eq!(event_peer_id, peer_id);
2885 }
2886 _ => panic!("Expected PeerConnected event"),
2887 }
2888
2889 node.disconnect_peer(&peer_id).await?;
2891
2892 let event = timeout(Duration::from_millis(100), events.recv()).await;
2894 assert!(event.is_ok());
2895
2896 let event_result = event
2897 .expect("Should receive event")
2898 .expect("Event should not be error");
2899 match event_result {
2900 P2PEvent::PeerDisconnected(event_peer_id) => {
2901 assert_eq!(event_peer_id, peer_id);
2902 }
2903 _ => panic!("Expected PeerDisconnected event"),
2904 }
2905
2906 Ok(())
2907 }
2908
2909 #[tokio::test]
2910 async fn test_message_sending() -> Result<()> {
2911 let mut config1 = create_test_node_config();
2913 config1.listen_addr =
2914 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2915 let node1 = P2PNode::new(config1).await?;
2916 node1.start().await?;
2917
2918 let mut config2 = create_test_node_config();
2919 config2.listen_addr =
2920 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2921 let node2 = P2PNode::new(config2).await?;
2922 node2.start().await?;
2923
2924 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2926
2927 let node2_addr = node2.local_addr().ok_or_else(|| {
2929 P2PError::Network(crate::error::NetworkError::ProtocolError(
2930 "No listening address".to_string().into(),
2931 ))
2932 })?;
2933
2934 let peer_id =
2936 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2937 Ok(res) => res?,
2938 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2939 };
2940
2941 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2943
2944 let message_data = b"Hello, peer!".to_vec();
2946 let result = match timeout(
2947 Duration::from_millis(500),
2948 node1.send_message(&peer_id, "test-protocol", message_data),
2949 )
2950 .await
2951 {
2952 Ok(res) => res,
2953 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2954 };
2955 if let Err(e) = &result {
2958 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2959 }
2960
2961 let non_existent_peer = "non_existent_peer".to_string();
2963 let result = node1
2964 .send_message(&non_existent_peer, "test-protocol", vec![])
2965 .await;
2966 assert!(result.is_err(), "Sending to non-existent peer should fail");
2967
2968 Ok(())
2969 }
2970
2971 #[tokio::test]
2972 async fn test_remote_mcp_operations() -> Result<()> {
2973 let config = create_test_node_config();
2974 let node = P2PNode::new(config).await?;
2975
2976 node.start().await?;
2978 node.stop().await?;
2979 Ok(())
2980 }
2981
2982 #[tokio::test]
2983 async fn test_health_check() -> Result<()> {
2984 let config = create_test_node_config();
2985 let node = P2PNode::new(config).await?;
2986
2987 let result = node.health_check().await;
2989 assert!(result.is_ok());
2990
2991 Ok(())
2996 }
2997
2998 #[tokio::test]
2999 async fn test_node_uptime() -> Result<()> {
3000 let config = create_test_node_config();
3001 let node = P2PNode::new(config).await?;
3002
3003 let uptime1 = node.uptime();
3004 assert!(uptime1 >= Duration::from_secs(0));
3005
3006 tokio::time::sleep(Duration::from_millis(10)).await;
3008
3009 let uptime2 = node.uptime();
3010 assert!(uptime2 > uptime1);
3011
3012 Ok(())
3013 }
3014
3015 #[tokio::test]
3016 async fn test_node_config_access() -> Result<()> {
3017 let config = create_test_node_config();
3018 let expected_peer_id = config.peer_id.clone();
3019 let node = P2PNode::new(config).await?;
3020
3021 let node_config = node.config();
3022 assert_eq!(node_config.peer_id, expected_peer_id);
3023 assert_eq!(node_config.max_connections, 100);
3024 Ok(())
3027 }
3028
3029 #[tokio::test]
3030 async fn test_mcp_server_access() -> Result<()> {
3031 let config = create_test_node_config();
3032 let _node = P2PNode::new(config).await?;
3033
3034 Ok(())
3036 }
3037
3038 #[tokio::test]
3039 async fn test_dht_access() -> Result<()> {
3040 let config = create_test_node_config();
3041 let node = P2PNode::new(config).await?;
3042
3043 assert!(node.dht().is_some());
3045
3046 Ok(())
3047 }
3048
3049 #[tokio::test]
3050 async fn test_node_builder() -> Result<()> {
3051 let builder = P2PNode::builder()
3053 .with_peer_id("builder_test_peer".to_string())
3054 .listen_on("/ip4/127.0.0.1/tcp/0")
3055 .listen_on("/ip6/::1/tcp/0")
3056 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
3058 .with_connection_timeout(Duration::from_secs(15))
3059 .with_max_connections(200);
3060
3061 let config = builder.config;
3063 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3064 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
3067 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3068 assert_eq!(config.max_connections, 200);
3069
3070 Ok(())
3071 }
3072
3073 #[tokio::test]
3074 async fn test_bootstrap_peers() -> Result<()> {
3075 let mut config = create_test_node_config();
3076 config.bootstrap_peers = vec![
3077 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3078 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3079 ];
3080
3081 let node = P2PNode::new(config).await?;
3082
3083 node.start().await?;
3085
3086 let _peer_count = node.peer_count().await;
3090
3091 node.stop().await?;
3092 Ok(())
3093 }
3094
3095 #[tokio::test]
3096 async fn test_production_mode_disabled() -> Result<()> {
3097 let config = create_test_node_config();
3098 let node = P2PNode::new(config).await?;
3099
3100 assert!(!node.is_production_mode());
3101 assert!(node.production_config().is_none());
3102
3103 let result = node.resource_metrics().await;
3105 assert!(result.is_err());
3106 assert!(result.unwrap_err().to_string().contains("not enabled"));
3107
3108 Ok(())
3109 }
3110
3111 #[tokio::test]
3112 async fn test_network_event_variants() {
3113 let peer_id = "test_peer".to_string();
3115 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3116
3117 let _peer_connected = NetworkEvent::PeerConnected {
3118 peer_id: peer_id.clone(),
3119 addresses: vec![address.clone()],
3120 };
3121
3122 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3123 peer_id: peer_id.clone(),
3124 reason: "test disconnect".to_string(),
3125 };
3126
3127 let _message_received = NetworkEvent::MessageReceived {
3128 peer_id: peer_id.clone(),
3129 protocol: "test-protocol".to_string(),
3130 data: vec![1, 2, 3],
3131 };
3132
3133 let _connection_failed = NetworkEvent::ConnectionFailed {
3134 peer_id: Some(peer_id.clone()),
3135 address: address.clone(),
3136 error: "connection refused".to_string(),
3137 };
3138
3139 let _dht_stored = NetworkEvent::DHTRecordStored {
3140 key: vec![1, 2, 3],
3141 value: vec![4, 5, 6],
3142 };
3143
3144 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3145 key: vec![1, 2, 3],
3146 value: Some(vec![4, 5, 6]),
3147 };
3148 }
3149
3150 #[tokio::test]
3151 async fn test_peer_info_structure() {
3152 let peer_info = PeerInfo {
3153 peer_id: "test_peer".to_string(),
3154 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3155 connected_at: Instant::now(),
3156 last_seen: Instant::now(),
3157 status: ConnectionStatus::Connected,
3158 protocols: vec!["test-protocol".to_string()],
3159 heartbeat_count: 0,
3160 };
3161
3162 assert_eq!(peer_info.peer_id, "test_peer");
3163 assert_eq!(peer_info.addresses.len(), 1);
3164 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3165 assert_eq!(peer_info.protocols.len(), 1);
3166 }
3167
3168 #[tokio::test]
3169 async fn test_serialization() -> Result<()> {
3170 let config = create_test_node_config();
3172 let serialized = serde_json::to_string(&config)?;
3173 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3174
3175 assert_eq!(config.peer_id, deserialized.peer_id);
3176 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3177 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3178
3179 Ok(())
3180 }
3181
3182 #[tokio::test]
3183 async fn test_get_peer_id_by_address_found() -> Result<()> {
3184 let config = create_test_node_config();
3185 let node = P2PNode::new(config).await?;
3186
3187 let test_peer_id = "peer_test_123".to_string();
3189 let test_address = "192.168.1.100:9000".to_string();
3190
3191 let peer_info = PeerInfo {
3192 peer_id: test_peer_id.clone(),
3193 addresses: vec![test_address.clone()],
3194 connected_at: Instant::now(),
3195 last_seen: Instant::now(),
3196 status: ConnectionStatus::Connected,
3197 protocols: vec!["test-protocol".to_string()],
3198 heartbeat_count: 0,
3199 };
3200
3201 node.peers
3202 .write()
3203 .await
3204 .insert(test_peer_id.clone(), peer_info);
3205
3206 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3208 assert_eq!(found_peer_id, Some(test_peer_id));
3209
3210 Ok(())
3211 }
3212
3213 #[tokio::test]
3214 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3215 let config = create_test_node_config();
3216 let node = P2PNode::new(config).await?;
3217
3218 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3220 assert_eq!(result, None);
3221
3222 Ok(())
3223 }
3224
3225 #[tokio::test]
3226 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3227 let config = create_test_node_config();
3228 let node = P2PNode::new(config).await?;
3229
3230 let result = node.get_peer_id_by_address("invalid-address").await;
3232 assert_eq!(result, None);
3233
3234 Ok(())
3235 }
3236
3237 #[tokio::test]
3238 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3239 let config = create_test_node_config();
3240 let node = P2PNode::new(config).await?;
3241
3242 let peer1_id = "peer_1".to_string();
3244 let peer1_addr = "192.168.1.101:9001".to_string();
3245
3246 let peer2_id = "peer_2".to_string();
3247 let peer2_addr = "192.168.1.102:9002".to_string();
3248
3249 let peer1_info = PeerInfo {
3250 peer_id: peer1_id.clone(),
3251 addresses: vec![peer1_addr.clone()],
3252 connected_at: Instant::now(),
3253 last_seen: Instant::now(),
3254 status: ConnectionStatus::Connected,
3255 protocols: vec!["test-protocol".to_string()],
3256 heartbeat_count: 0,
3257 };
3258
3259 let peer2_info = PeerInfo {
3260 peer_id: peer2_id.clone(),
3261 addresses: vec![peer2_addr.clone()],
3262 connected_at: Instant::now(),
3263 last_seen: Instant::now(),
3264 status: ConnectionStatus::Connected,
3265 protocols: vec!["test-protocol".to_string()],
3266 heartbeat_count: 0,
3267 };
3268
3269 node.peers
3270 .write()
3271 .await
3272 .insert(peer1_id.clone(), peer1_info);
3273 node.peers
3274 .write()
3275 .await
3276 .insert(peer2_id.clone(), peer2_info);
3277
3278 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3280 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3281
3282 assert_eq!(found_peer1, Some(peer1_id));
3283 assert_eq!(found_peer2, Some(peer2_id));
3284
3285 Ok(())
3286 }
3287
3288 #[tokio::test]
3289 async fn test_list_active_connections_empty() -> Result<()> {
3290 let config = create_test_node_config();
3291 let node = P2PNode::new(config).await?;
3292
3293 let connections = node.list_active_connections().await;
3295 assert!(connections.is_empty());
3296
3297 Ok(())
3298 }
3299
3300 #[tokio::test]
3301 async fn test_list_active_connections_with_peers() -> Result<()> {
3302 let config = create_test_node_config();
3303 let node = P2PNode::new(config).await?;
3304
3305 let peer1_id = "peer_1".to_string();
3307 let peer1_addrs = vec![
3308 "192.168.1.101:9001".to_string(),
3309 "192.168.1.101:9002".to_string(),
3310 ];
3311
3312 let peer2_id = "peer_2".to_string();
3313 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3314
3315 let peer1_info = PeerInfo {
3316 peer_id: peer1_id.clone(),
3317 addresses: peer1_addrs.clone(),
3318 connected_at: Instant::now(),
3319 last_seen: Instant::now(),
3320 status: ConnectionStatus::Connected,
3321 protocols: vec!["test-protocol".to_string()],
3322 heartbeat_count: 0,
3323 };
3324
3325 let peer2_info = PeerInfo {
3326 peer_id: peer2_id.clone(),
3327 addresses: peer2_addrs.clone(),
3328 connected_at: Instant::now(),
3329 last_seen: Instant::now(),
3330 status: ConnectionStatus::Connected,
3331 protocols: vec!["test-protocol".to_string()],
3332 heartbeat_count: 0,
3333 };
3334
3335 node.peers
3336 .write()
3337 .await
3338 .insert(peer1_id.clone(), peer1_info);
3339 node.peers
3340 .write()
3341 .await
3342 .insert(peer2_id.clone(), peer2_info);
3343
3344 let connections = node.list_active_connections().await;
3346 assert_eq!(connections.len(), 2);
3347
3348 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3350 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3351
3352 assert!(peer1_conn.is_some());
3353 assert!(peer2_conn.is_some());
3354
3355 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3357 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3358
3359 Ok(())
3360 }
3361
3362 #[tokio::test]
3363 async fn test_remove_peer_success() -> Result<()> {
3364 let config = create_test_node_config();
3365 let node = P2PNode::new(config).await?;
3366
3367 let peer_id = "peer_to_remove".to_string();
3369 let peer_info = PeerInfo {
3370 peer_id: peer_id.clone(),
3371 addresses: vec!["192.168.1.100:9000".to_string()],
3372 connected_at: Instant::now(),
3373 last_seen: Instant::now(),
3374 status: ConnectionStatus::Connected,
3375 protocols: vec!["test-protocol".to_string()],
3376 heartbeat_count: 0,
3377 };
3378
3379 node.peers.write().await.insert(peer_id.clone(), peer_info);
3380
3381 assert!(node.is_peer_connected(&peer_id).await);
3383
3384 let removed = node.remove_peer(&peer_id).await;
3386 assert!(removed);
3387
3388 assert!(!node.is_peer_connected(&peer_id).await);
3390
3391 Ok(())
3392 }
3393
3394 #[tokio::test]
3395 async fn test_remove_peer_nonexistent() -> Result<()> {
3396 let config = create_test_node_config();
3397 let node = P2PNode::new(config).await?;
3398
3399 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3401 assert!(!removed);
3402
3403 Ok(())
3404 }
3405
3406 #[tokio::test]
3407 async fn test_is_peer_connected() -> Result<()> {
3408 let config = create_test_node_config();
3409 let node = P2PNode::new(config).await?;
3410
3411 let peer_id = "test_peer".to_string();
3412
3413 assert!(!node.is_peer_connected(&peer_id).await);
3415
3416 let peer_info = PeerInfo {
3418 peer_id: peer_id.clone(),
3419 addresses: vec!["192.168.1.100:9000".to_string()],
3420 connected_at: Instant::now(),
3421 last_seen: Instant::now(),
3422 status: ConnectionStatus::Connected,
3423 protocols: vec!["test-protocol".to_string()],
3424 heartbeat_count: 0,
3425 };
3426
3427 node.peers.write().await.insert(peer_id.clone(), peer_info);
3428
3429 assert!(node.is_peer_connected(&peer_id).await);
3431
3432 node.remove_peer(&peer_id).await;
3434
3435 assert!(!node.is_peer_connected(&peer_id).await);
3437
3438 Ok(())
3439 }
3440
3441 #[test]
3442 fn test_normalize_ipv6_wildcard() {
3443 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3444
3445 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3446 let normalized = normalize_wildcard_to_loopback(wildcard);
3447
3448 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3449 assert_eq!(normalized.port(), 8080);
3450 }
3451
3452 #[test]
3453 fn test_normalize_ipv4_wildcard() {
3454 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3455
3456 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3457 let normalized = normalize_wildcard_to_loopback(wildcard);
3458
3459 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3460 assert_eq!(normalized.port(), 9000);
3461 }
3462
3463 #[test]
3464 fn test_normalize_specific_address_unchanged() {
3465 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3466 let normalized = normalize_wildcard_to_loopback(specific);
3467
3468 assert_eq!(normalized, specific);
3469 }
3470
3471 #[test]
3472 fn test_normalize_loopback_unchanged() {
3473 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3474
3475 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3476 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3477 assert_eq!(normalized_v6, loopback_v6);
3478
3479 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3480 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3481 assert_eq!(normalized_v4, loopback_v4);
3482 }
3483}