1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, info, trace, warn};
43
/// Configuration used to construct a [`P2PNode`].
///
/// Instances are produced by [`NodeConfig::new`], [`NodeConfig::from_config`],
/// [`NodeConfig::with_listen_addr`] or [`Default::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Explicit peer identifier. When `None`, `P2PNode::new` generates a
    /// `peer_<8 chars of a v4 UUID>` identifier.
    pub peer_id: Option<PeerId>,

    /// Listen addresses recorded by the constructors.
    // NOTE(review): `P2PNode::new` derives its bind addresses from
    // `listen_addr` and `enable_ipv6`, not from this list — confirm intent.
    pub listen_addrs: Vec<std::net::SocketAddr>,

    /// Primary listen address; `P2PNode::new` takes the IP and port to bind
    /// from this value.
    pub listen_addr: std::net::SocketAddr,

    /// Bootstrap peers as parsed socket addresses.
    pub bootstrap_peers: Vec<std::net::SocketAddr>,

    /// Bootstrap peers in their textual form.
    pub bootstrap_peers_str: Vec<String>,

    /// When `true`, `P2PNode::new` also binds an IPv6 socket.
    pub enable_ipv6: bool,

    /// Timeout applied by `P2PNode` to outbound connection attempts and sends.
    pub connection_timeout: Duration,

    /// Keep-alive interval (copied from the base configuration by the constructors).
    pub keep_alive_interval: Duration,

    /// Maximum number of connections (copied from the base configuration).
    pub max_connections: usize,

    /// Maximum number of incoming connections (taken from the security
    /// section's connection limit by the constructors).
    pub max_incoming_connections: usize,

    /// DHT parameters.
    pub dht_config: DHTConfig,

    /// Transport security settings.
    pub security_config: SecurityConfig,

    /// When present, `P2PNode::new` creates a `ResourceManager` from it.
    pub production_config: Option<ProductionConfig>,

    /// Bootstrap cache settings handed to `BootstrapManager`; the cache's
    /// default configuration is used when `None`.
    pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,

    /// IP diversity settings handed to `BootstrapManager`; the default is used
    /// when `None`.
    pub diversity_config: Option<crate::security::IPDiversityConfig>,
}
96
/// Node-level DHT parameters.
///
/// `P2PNode::new` maps these onto the fields of `crate::dht::DHTConfig`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Mapped to both the replication factor and the bucket size.
    pub k_value: usize,

    /// Mapped to the `alpha` parameter of the DHT configuration.
    pub alpha_value: usize,

    /// Time-to-live for stored records.
    pub record_ttl: Duration,

    /// Mapped to both the bucket refresh interval and the republish interval.
    pub refresh_interval: Duration,
}
112
/// Transport security settings carried by [`NodeConfig`].
// NOTE(review): none of these fields is read in the visible part of this
// file; the per-field descriptions below are inferred from the names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
    /// Presumably enables the Noise protocol — TODO confirm against consumers.
    pub enable_noise: bool,

    /// Presumably enables TLS — TODO confirm against consumers.
    pub enable_tls: bool,

    /// Trust level to apply.
    pub trust_level: TrustLevel,
}
125
/// Trust level selectable in [`SecurityConfig`].
// NOTE(review): the exact semantics of each level are not visible in this
// file; the variant descriptions are inferred from the names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TrustLevel {
    /// No trust requirements (presumed).
    None,
    /// Basic trust level; the value used by `SecurityConfig::default`.
    Basic,
    /// Full trust level (presumed strictest).
    Full,
}
136
137impl NodeConfig {
138 pub fn new() -> Result<Self> {
144 let config = Config::default();
146
147 let listen_addr = config.listen_socket_addr()?;
149
150 let mut listen_addrs = vec![];
152
153 if config.network.ipv6_enabled {
155 let ipv6_addr = std::net::SocketAddr::new(
156 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
157 listen_addr.port(),
158 );
159 listen_addrs.push(ipv6_addr);
160 }
161
162 let ipv4_addr = std::net::SocketAddr::new(
164 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
165 listen_addr.port(),
166 );
167 listen_addrs.push(ipv4_addr);
168
169 Ok(Self {
170 peer_id: None,
171 listen_addrs,
172 listen_addr,
173 bootstrap_peers: Vec::new(),
174 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
175 enable_ipv6: config.network.ipv6_enabled,
176
177 connection_timeout: Duration::from_secs(config.network.connection_timeout),
178 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
179 max_connections: config.network.max_connections,
180 max_incoming_connections: config.security.connection_limit as usize,
181 dht_config: DHTConfig::default(),
182 security_config: SecurityConfig::default(),
183 production_config: None,
184 bootstrap_cache_config: None,
185 diversity_config: None,
186 })
188 }
189}
190
191impl Default for NodeConfig {
192 fn default() -> Self {
193 let config = Config::default();
195
196 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
198 std::net::SocketAddr::new(
199 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
200 9000,
201 )
202 });
203
204 Self {
205 peer_id: None,
206 listen_addrs: vec![
207 std::net::SocketAddr::new(
208 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
209 listen_addr.port(),
210 ),
211 std::net::SocketAddr::new(
212 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
213 listen_addr.port(),
214 ),
215 ],
216 listen_addr,
217 bootstrap_peers: Vec::new(),
218 bootstrap_peers_str: Vec::new(),
219 enable_ipv6: config.network.ipv6_enabled,
220
221 connection_timeout: Duration::from_secs(config.network.connection_timeout),
222 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
223 max_connections: config.network.max_connections,
224 max_incoming_connections: config.security.connection_limit as usize,
225 dht_config: DHTConfig::default(),
226 security_config: SecurityConfig::default(),
227 production_config: None, bootstrap_cache_config: None,
229 diversity_config: None,
230 }
232 }
233}
234
235impl NodeConfig {
236 pub fn from_config(config: &Config) -> Result<Self> {
238 let listen_addr = config.listen_socket_addr()?;
239 let bootstrap_addrs = config.bootstrap_addrs()?;
240
241 let mut node_config = Self {
242 peer_id: None,
243 listen_addrs: vec![listen_addr],
244 listen_addr,
245 bootstrap_peers: bootstrap_addrs
246 .iter()
247 .map(|addr| addr.socket_addr())
248 .collect(),
249 bootstrap_peers_str: config
250 .network
251 .bootstrap_nodes
252 .iter()
253 .map(|addr| addr.to_string())
254 .collect(),
255 enable_ipv6: config.network.ipv6_enabled,
256
257 connection_timeout: Duration::from_secs(config.network.connection_timeout),
258 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
259 max_connections: config.network.max_connections,
260 max_incoming_connections: config.security.connection_limit as usize,
261 dht_config: DHTConfig {
262 k_value: 20,
263 alpha_value: 3,
264 record_ttl: Duration::from_secs(3600),
265 refresh_interval: Duration::from_secs(900),
266 },
267 security_config: SecurityConfig {
268 enable_noise: true,
269 enable_tls: true,
270 trust_level: TrustLevel::Basic,
271 },
272 production_config: Some(ProductionConfig {
273 max_connections: config.network.max_connections,
274 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
277 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
278 health_check_interval: Duration::from_secs(30),
279 metrics_interval: Duration::from_secs(60),
280 enable_performance_tracking: true,
281 enable_auto_cleanup: true,
282 shutdown_timeout: Duration::from_secs(30),
283 rate_limits: crate::production::RateLimitConfig::default(),
284 }),
285 bootstrap_cache_config: None,
286 diversity_config: None,
287 };
292
293 if config.network.ipv6_enabled {
295 node_config.listen_addrs.push(std::net::SocketAddr::new(
296 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
297 listen_addr.port(),
298 ));
299 }
300
301 Ok(node_config)
302 }
303
304 pub fn with_listen_addr(addr: &str) -> Result<Self> {
306 let listen_addr: std::net::SocketAddr = addr
307 .parse()
308 .map_err(|e: std::net::AddrParseError| {
309 NetworkError::InvalidAddress(e.to_string().into())
310 })
311 .map_err(P2PError::Network)?;
312 let cfg = NodeConfig {
313 listen_addr,
314 listen_addrs: vec![listen_addr],
315 diversity_config: None,
316 ..Default::default()
317 };
318 Ok(cfg)
319 }
320}
321
322impl Default for DHTConfig {
323 fn default() -> Self {
324 Self {
325 k_value: 20,
326 alpha_value: 5,
327 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
330 }
331}
332
333impl Default for SecurityConfig {
334 fn default() -> Self {
335 Self {
336 enable_noise: true,
337 enable_tls: true,
338 trust_level: TrustLevel::Basic,
339 }
340 }
341}
342
/// Bookkeeping record the node keeps for each known peer.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Identifier of the peer.
    pub peer_id: PeerId,

    /// Addresses of the peer in textual form; `P2PNode::get_peer_id_by_address`
    /// parses these as socket addresses when searching.
    pub addresses: Vec<String>,

    /// Instant at which the record was created for the connection.
    pub connected_at: Instant,

    /// Instant at which the peer was last seen.
    pub last_seen: Instant,

    /// Current connection status.
    pub status: ConnectionStatus,

    /// Protocol identifiers associated with the peer.
    pub protocols: Vec<String>,

    /// Heartbeat counter; starts at `0` for peers added by `connect_peer`.
    pub heartbeat_count: u64,
}
367
/// Lifecycle state of a connection to a peer.
///
/// Equality is total (`Eq`): every variant either carries no data or a
/// `String`, so values can be compared and used wherever `Eq` is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is being torn down.
    Disconnecting,
    /// The connection has been closed.
    Disconnected,
    /// The connection failed; the payload carries a description of the failure.
    Failed(String),
}
382
/// Network-level events.
// NOTE(review): nothing in the visible part of this file emits these; the
// node's broadcast channel carries `P2PEvent` instead. Descriptions below
// follow the variant and field names.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
    /// A peer connected.
    PeerConnected {
        /// Identifier of the peer.
        peer_id: PeerId,
        /// Addresses of the peer in textual form.
        addresses: Vec<String>,
    },

    /// A peer disconnected.
    PeerDisconnected {
        /// Identifier of the peer.
        peer_id: PeerId,
        /// Reason for the disconnection.
        reason: String,
    },

    /// A message was received from a peer.
    MessageReceived {
        /// Identifier of the sending peer.
        peer_id: PeerId,
        /// Protocol the message belongs to.
        protocol: String,
        /// Raw message payload.
        data: Vec<u8>,
    },

    /// A connection attempt failed.
    ConnectionFailed {
        /// Identifier of the peer, when known.
        peer_id: Option<PeerId>,
        /// Address the attempt was made to.
        address: String,
        /// Description of the error.
        error: String,
    },

    /// A DHT record was stored.
    DHTRecordStored {
        /// Record key.
        key: Vec<u8>,
        /// Record value.
        value: Vec<u8>,
    },

    /// A DHT record lookup completed.
    DHTRecordRetrieved {
        /// Record key.
        key: Vec<u8>,
        /// Record value, or `None` (presumably when no record was found).
        value: Option<Vec<u8>>,
    },
}
438
/// Events broadcast by [`P2PNode`] on its event channel.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// A message, either published locally via `P2PNode::publish` or decoded
    /// from bytes received from the transport.
    Message {
        /// Topic (protocol name) of the message.
        topic: String,
        /// Peer the message is attributed to.
        source: PeerId,
        /// Message payload.
        data: Vec<u8>,
    },
    /// A peer was accepted or dialled.
    PeerConnected(PeerId),
    /// A peer was removed via `P2PNode::disconnect_peer`.
    PeerDisconnected(PeerId),
}
459
/// A peer-to-peer node built on the dual-stack ant-quic transport.
pub struct P2PNode {
    /// Configuration the node was created with.
    config: NodeConfig,

    /// Identifier of the local peer.
    peer_id: PeerId,

    /// Known peers, keyed by peer ID.
    peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,

    /// Sender side of the broadcast channel carrying [`P2PEvent`]s.
    event_tx: broadcast::Sender<P2PEvent>,

    /// Local addresses reported by the transport; filled in by
    /// `start_network_listeners`.
    listen_addrs: RwLock<Vec<std::net::SocketAddr>>,

    /// Instant at which the node was constructed.
    start_time: Instant,

    /// Running flag: set by `start`, cleared by `stop`.
    running: RwLock<bool>,

    /// DHT instance, when one was created.
    dht: Option<Arc<RwLock<DHT>>>,

    /// Resource manager; present when the configuration carries a
    /// `production_config`.
    resource_manager: Option<Arc<ResourceManager>>,

    /// Bootstrap manager; `None` when its initialisation failed.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Dual-stack (IPv6/IPv4) ant-quic transport node.
    dual_node: Arc<DualStackNetworkNode>,

    /// Rate limiter consulted by the accept loops.
    #[allow(dead_code)]
    rate_limiter: Arc<RateLimiter>,

    /// Peer IDs whose transport connection is considered active;
    /// `send_message` refuses peers that are absent from this set.
    active_connections: Arc<RwLock<HashSet<PeerId>>>,

    /// Security dashboard built from the DHT's security metrics.
    pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,

    /// Handle of the connection lifecycle monitor task spawned in `new`.
    #[allow(dead_code)]
    connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Handle of the keepalive task spawned in `new`.
    #[allow(dead_code)]
    keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,

    /// Shutdown flag shared with the keepalive task.
    #[allow(dead_code)]
    shutdown: Arc<AtomicBool>,

    /// Geo provider shared with the connection lifecycle monitor.
    #[allow(dead_code)]
    geo_provider: Arc<BgpGeoProvider>,
}
530
/// Maps a wildcard ("unspecified") socket address to the loopback address of
/// the same IP family, keeping the port.
///
/// `0.0.0.0:p` becomes `127.0.0.1:p` and `[::]:p` becomes `[::1]:p`; every
/// other address is returned unchanged.
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

    match addr {
        SocketAddr::V4(v4) if v4.ip().is_unspecified() => {
            SocketAddr::from((Ipv4Addr::LOCALHOST, v4.port()))
        }
        SocketAddr::V6(v6) if v6.ip().is_unspecified() => {
            SocketAddr::from((Ipv6Addr::LOCALHOST, v6.port()))
        }
        concrete => concrete,
    }
}
561
562impl P2PNode {
563 pub fn new_for_tests() -> Result<Self> {
565 let (event_tx, _) = broadcast::channel(16);
566 Ok(Self {
567 config: NodeConfig::default(),
568 peer_id: "test_peer".to_string(),
569 peers: Arc::new(RwLock::new(HashMap::new())),
570 event_tx,
571 listen_addrs: RwLock::new(Vec::new()),
572 start_time: Instant::now(),
573 running: RwLock::new(false),
574 dht: None,
575 resource_manager: None,
576 bootstrap_manager: None,
577 dual_node: {
578 let v6: Option<std::net::SocketAddr> = "[::1]:0"
580 .parse()
581 .ok()
582 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
583 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
584 let handle = tokio::runtime::Handle::current();
585 let dual_attempt = handle.block_on(
586 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
587 );
588 let dual = match dual_attempt {
589 Ok(d) => d,
590 Err(_e1) => {
591 let fallback = handle.block_on(
593 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
594 None,
595 "127.0.0.1:0".parse().ok(),
596 ),
597 );
598 match fallback {
599 Ok(d) => d,
600 Err(e2) => {
601 return Err(P2PError::Network(NetworkError::BindError(
602 format!("Failed to create dual-stack network node: {}", e2)
603 .into(),
604 )));
605 }
606 }
607 }
608 };
609 Arc::new(dual)
610 },
611 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
612 max_requests: 100,
613 burst_size: 100,
614 window: std::time::Duration::from_secs(1),
615 ..Default::default()
616 })),
617 active_connections: Arc::new(RwLock::new(HashSet::new())),
618 connection_monitor_handle: Arc::new(RwLock::new(None)),
619 keepalive_handle: Arc::new(RwLock::new(None)),
620 shutdown: Arc::new(AtomicBool::new(false)),
621 geo_provider: Arc::new(BgpGeoProvider::new()),
622 security_dashboard: None,
623 })
624 }
625 pub async fn new(config: NodeConfig) -> Result<Self> {
627 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
628 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
630 });
631
632 let (event_tx, _) = broadcast::channel(1000);
633
634 {
637 use blake3::Hasher;
638 let mut hasher = Hasher::new();
639 hasher.update(peer_id.as_bytes());
640 let digest = hasher.finalize();
641 let mut nid = [0u8; 32];
642 nid.copy_from_slice(digest.as_bytes());
643 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
644 crate::identity::node_identity::NodeId::from_bytes(nid),
645 ));
646 }
649
650 let (dht, security_dashboard) = if true {
652 let _dht_config = crate::dht::DHTConfig {
654 replication_factor: config.dht_config.k_value,
655 bucket_size: config.dht_config.k_value,
656 alpha: config.dht_config.alpha_value,
657 record_ttl: config.dht_config.record_ttl,
658 bucket_refresh_interval: config.dht_config.refresh_interval,
659 republish_interval: config.dht_config.refresh_interval,
660 max_distance: 160,
661 };
662 let peer_bytes = peer_id.as_bytes();
664 let mut node_id_bytes = [0u8; 32];
665 let len = peer_bytes.len().min(32);
666 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
667 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
668 let dht_instance = DHT::new(node_id).map_err(|e| {
669 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
670 e.to_string().into(),
671 ))
672 })?;
673 dht_instance.start_maintenance_tasks();
674
675 let security_metrics = dht_instance.security_metrics();
677 let dashboard = crate::dht::metrics::SecurityDashboard::new(
678 security_metrics,
679 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
680 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
681 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
682 );
683
684 (
685 Some(Arc::new(RwLock::new(dht_instance))),
686 Some(Arc::new(dashboard)),
687 )
688 } else {
689 (None, None)
690 };
691
692 let resource_manager = config
696 .production_config
697 .clone()
698 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
699
700 let diversity_config = config.diversity_config.clone().unwrap_or_default();
702 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
703 match BootstrapManager::with_full_config(
704 cache_config.clone(),
705 crate::rate_limit::JoinRateLimiterConfig::default(),
706 diversity_config.clone(),
707 )
708 .await
709 {
710 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
711 Err(e) => {
712 warn!(
713 "Failed to initialize bootstrap manager: {}, continuing without cache",
714 e
715 );
716 None
717 }
718 }
719 } else {
720 match BootstrapManager::with_full_config(
721 crate::bootstrap::CacheConfig::default(),
722 crate::rate_limit::JoinRateLimiterConfig::default(),
723 diversity_config,
724 )
725 .await
726 {
727 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
728 Err(e) => {
729 warn!(
730 "Failed to initialize bootstrap manager: {}, continuing without cache",
731 e
732 );
733 None
734 }
735 }
736 };
737
738 let (v6_opt, v4_opt) = {
741 let port = config.listen_addr.port();
742 let ip = config.listen_addr.ip();
743
744 let v4_addr = if ip.is_ipv4() {
745 Some(std::net::SocketAddr::new(ip, port))
746 } else {
747 Some(std::net::SocketAddr::new(
750 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
751 port,
752 ))
753 };
754
755 let v6_addr = if config.enable_ipv6 {
756 if ip.is_ipv6() {
757 Some(std::net::SocketAddr::new(ip, port))
758 } else {
759 Some(std::net::SocketAddr::new(
760 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
761 port,
762 ))
763 }
764 } else {
765 None
766 };
767 (v6_addr, v4_addr)
768 };
769
770 let dual_node = Arc::new(
771 DualStackNetworkNode::new(v6_opt, v4_opt)
772 .await
773 .map_err(|e| {
774 P2PError::Transport(crate::error::TransportError::SetupFailed(
775 format!("Failed to create dual-stack network nodes: {}", e).into(),
776 ))
777 })?,
778 );
779
780 let rate_limiter = Arc::new(RateLimiter::new(
782 crate::validation::RateLimitConfig::default(),
783 ));
784
785 let active_connections = Arc::new(RwLock::new(HashSet::new()));
787
788 let geo_provider = Arc::new(BgpGeoProvider::new());
790
791 let peers = Arc::new(RwLock::new(HashMap::new()));
793
794 let connection_monitor_handle = {
796 let active_conns = Arc::clone(&active_connections);
797 let peers_map = Arc::clone(&peers);
798 let event_tx_clone = event_tx.clone();
799 let dual_node_clone = Arc::clone(&dual_node);
800 let geo_provider_clone = Arc::clone(&geo_provider);
801 let peer_id_clone = peer_id.clone();
802
803 let handle = tokio::spawn(async move {
804 Self::connection_lifecycle_monitor(
805 dual_node_clone,
806 active_conns,
807 peers_map,
808 event_tx_clone,
809 geo_provider_clone,
810 peer_id_clone,
811 )
812 .await;
813 });
814
815 Arc::new(RwLock::new(Some(handle)))
816 };
817
818 let shutdown = Arc::new(AtomicBool::new(false));
820 let keepalive_handle = {
821 let active_conns = Arc::clone(&active_connections);
822 let dual_node_clone = Arc::clone(&dual_node);
823 let shutdown_clone = Arc::clone(&shutdown);
824
825 let handle = tokio::spawn(async move {
826 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
827 });
828
829 Arc::new(RwLock::new(Some(handle)))
830 };
831
832 let node = Self {
833 config,
834 peer_id,
835 peers,
836 event_tx,
837 listen_addrs: RwLock::new(Vec::new()),
838 start_time: Instant::now(),
839 running: RwLock::new(false),
840 dht,
841 resource_manager,
842 bootstrap_manager,
843 dual_node,
844 rate_limiter,
845 active_connections,
846 security_dashboard,
847 connection_monitor_handle,
848 keepalive_handle,
849 shutdown,
850 geo_provider,
851 };
852 info!("Created P2P node with peer ID: {}", node.peer_id);
853
854 node.start_network_listeners().await?;
856
857 node.start_connection_monitor().await;
859
860 Ok(node)
861 }
862
    /// Returns a fresh [`NodeBuilder`].
    pub fn builder() -> NodeBuilder {
        NodeBuilder::new()
    }
867
    /// Returns the local peer ID.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
872
873 pub fn local_addr(&self) -> Option<String> {
874 self.listen_addrs
875 .try_read()
876 .ok()
877 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
878 }
879
    /// Subscribes to `topic`.
    ///
    /// Currently this only logs the subscription and returns `Ok(())`; no
    /// subscription state is recorded by this method.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        info!("Subscribed to topic: {}", topic);
        Ok(())
    }
886
887 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
888 info!(
889 "Publishing message to topic: {} ({} bytes)",
890 topic,
891 data.len()
892 );
893
894 let peer_list: Vec<PeerId> = {
896 let peers_guard = self.peers.read().await;
897 peers_guard.keys().cloned().collect()
898 };
899
900 if peer_list.is_empty() {
901 debug!("No peers connected, message will only be sent to local subscribers");
902 } else {
903 let mut send_count = 0;
905 for peer_id in &peer_list {
906 match self.send_message(peer_id, topic, data.to_vec()).await {
907 Ok(_) => {
908 send_count += 1;
909 debug!("Sent message to peer: {}", peer_id);
910 }
911 Err(e) => {
912 warn!("Failed to send message to peer {}: {}", peer_id, e);
913 }
914 }
915 }
916 info!(
917 "Published message to {}/{} connected peers",
918 send_count,
919 peer_list.len()
920 );
921 }
922
923 let event = P2PEvent::Message {
925 topic: topic.to_string(),
926 source: self.peer_id.clone(),
927 data: data.to_vec(),
928 };
929 let _ = self.event_tx.send(event);
930
931 Ok(())
932 }
933
    /// Returns the configuration the node was created with.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
938
939 pub async fn start(&self) -> Result<()> {
941 info!("Starting P2P node...");
942
943 if let Some(ref resource_manager) = self.resource_manager {
945 resource_manager.start().await.map_err(|e| {
946 P2PError::Network(crate::error::NetworkError::ProtocolError(
947 format!("Failed to start resource manager: {e}").into(),
948 ))
949 })?;
950 info!("Production resource manager started");
951 }
952
953 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
955 let mut manager = bootstrap_manager.write().await;
956 manager.start_background_tasks().await.map_err(|e| {
957 P2PError::Network(crate::error::NetworkError::ProtocolError(
958 format!("Failed to start bootstrap manager: {e}").into(),
959 ))
960 })?;
961 info!("Bootstrap cache manager started");
962 }
963
964 *self.running.write().await = true;
966
967 self.start_network_listeners().await?;
969
970 let listen_addrs = self.listen_addrs.read().await;
972 info!("P2P node started on addresses: {:?}", *listen_addrs);
973
974 self.start_message_receiving_system().await?;
978
979 self.connect_bootstrap_peers().await?;
981
982 Ok(())
983 }
984
985 async fn start_network_listeners(&self) -> Result<()> {
987 info!("Starting dual-stack listeners (ant-quic)...");
988 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
990 P2PError::Transport(crate::error::TransportError::SetupFailed(
991 format!("Failed to get local addresses: {}", e).into(),
992 ))
993 })?;
994 {
995 let mut la = self.listen_addrs.write().await;
996 *la = addrs.clone();
997 }
998
999 let event_tx = self.event_tx.clone();
1001 let peers = self.peers.clone();
1002 let active_connections = self.active_connections.clone();
1003 let rate_limiter = self.rate_limiter.clone();
1004 let dual = self.dual_node.clone();
1005 tokio::spawn(async move {
1006 loop {
1007 match dual.accept_any().await {
1008 Ok((ant_peer_id, remote_sock)) => {
1009 let peer_id =
1010 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1011 let remote_addr = NetworkAddress::from(remote_sock);
1012 let _ = rate_limiter.check_ip(&remote_sock.ip());
1014 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1015 register_new_peer(&peers, &peer_id, &remote_addr).await;
1016 active_connections.write().await.insert(peer_id);
1017 }
1018 Err(e) => {
1019 warn!("Accept failed: {}", e);
1020 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1021 }
1022 }
1023 }
1024 });
1025
1026 info!("Dual-stack listeners active on: {:?}", addrs);
1027 Ok(())
1028 }
1029
1030 #[allow(dead_code)]
1032 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1033 warn!("QUIC transport temporarily disabled during ant-quic migration");
1072 Err(crate::P2PError::Transport(
1074 crate::error::TransportError::SetupFailed(
1075 format!(
1076 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1077 )
1078 .into(),
1079 ),
1080 ))
1081 }
1082
1083 #[allow(dead_code)] async fn start_connection_acceptor(
1086 &self,
1087 transport: Arc<dyn crate::transport::Transport>,
1088 addr: std::net::SocketAddr,
1089 transport_type: crate::transport::TransportType,
1090 ) -> Result<()> {
1091 info!(
1092 "Starting connection acceptor for {:?} on {}",
1093 transport_type, addr
1094 );
1095
1096 let event_tx = self.event_tx.clone();
1098 let _peer_id = self.peer_id.clone();
1099 let peers = Arc::clone(&self.peers);
1100 let rate_limiter = Arc::clone(&self.rate_limiter);
1103
1104 tokio::spawn(async move {
1106 loop {
1107 match transport.accept().await {
1108 Ok(connection) => {
1109 let remote_addr = connection.remote_addr();
1110 let connection_peer_id =
1111 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1112
1113 let socket_addr = remote_addr.socket_addr();
1115 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1116 continue;
1118 }
1119
1120 info!(
1121 "Accepted {:?} connection from {} (peer: {})",
1122 transport_type, remote_addr, connection_peer_id
1123 );
1124
1125 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1127
1128 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1130
1131 spawn_connection_handler(
1133 connection,
1134 connection_peer_id,
1135 event_tx.clone(),
1136 Arc::clone(&peers),
1137 );
1138 }
1139 Err(e) => {
1140 warn!(
1141 "Failed to accept {:?} connection on {}: {}",
1142 transport_type, addr, e
1143 );
1144
1145 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1147 }
1148 }
1149 }
1150 });
1151
1152 info!(
1153 "Connection acceptor background task started for {:?} on {}",
1154 transport_type, addr
1155 );
1156 Ok(())
1157 }
1158
1159 async fn start_message_receiving_system(&self) -> Result<()> {
1161 info!("Starting message receiving system");
1162 let dual = self.dual_node.clone();
1163 let event_tx = self.event_tx.clone();
1164
1165 tokio::spawn(async move {
1166 loop {
1167 match dual.receive_any().await {
1168 Ok((_peer_id, bytes)) => {
1169 #[allow(clippy::collapsible_if)]
1171 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1172 if let (Some(protocol), Some(data), Some(from)) = (
1173 value.get("protocol").and_then(|v| v.as_str()),
1174 value.get("data").and_then(|v| v.as_array()),
1175 value.get("from").and_then(|v| v.as_str()),
1176 ) {
1177 let payload: Vec<u8> = data
1178 .iter()
1179 .filter_map(|v| v.as_u64().map(|n| n as u8))
1180 .collect();
1181 let _ = event_tx.send(P2PEvent::Message {
1182 topic: protocol.to_string(),
1183 source: from.to_string(),
1184 data: payload,
1185 });
1186 }
1187 }
1188 }
1189 Err(e) => {
1190 warn!("Receive error: {}", e);
1191 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1192 }
1193 }
1194 }
1195 });
1196
1197 Ok(())
1198 }
1199
1200 #[allow(dead_code)]
1202 async fn handle_received_message(
1203 &self,
1204 message_data: Vec<u8>,
1205 peer_id: &PeerId,
1206 _protocol: &str,
1207 event_tx: &broadcast::Sender<P2PEvent>,
1208 ) -> Result<()> {
1209 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1213 Ok(message) => {
1214 if let (Some(protocol), Some(data), Some(from)) = (
1215 message.get("protocol").and_then(|v| v.as_str()),
1216 message.get("data").and_then(|v| v.as_array()),
1217 message.get("from").and_then(|v| v.as_str()),
1218 ) {
1219 let data_bytes: Vec<u8> = data
1221 .iter()
1222 .filter_map(|v| v.as_u64().map(|n| n as u8))
1223 .collect();
1224
1225 let event = P2PEvent::Message {
1227 topic: protocol.to_string(),
1228 source: from.to_string(),
1229 data: data_bytes,
1230 };
1231
1232 let _ = event_tx.send(event);
1233 debug!("Generated message event from peer: {}", peer_id);
1234 }
1235 }
1236 Err(e) => {
1237 warn!("Failed to parse received message from {}: {}", peer_id, e);
1238 }
1239 }
1240
1241 Ok(())
1242 }
1243
1244 pub async fn run(&self) -> Result<()> {
1250 if !*self.running.read().await {
1251 self.start().await?;
1252 }
1253
1254 info!("P2P node running...");
1255
1256 loop {
1258 if !*self.running.read().await {
1259 break;
1260 }
1261
1262 self.periodic_tasks().await?;
1264
1265 tokio::time::sleep(Duration::from_millis(100)).await;
1267 }
1268
1269 info!("P2P node stopped");
1270 Ok(())
1271 }
1272
1273 pub async fn stop(&self) -> Result<()> {
1275 info!("Stopping P2P node...");
1276
1277 *self.running.write().await = false;
1279
1280 self.disconnect_all_peers().await?;
1282
1283 if let Some(ref resource_manager) = self.resource_manager {
1285 resource_manager.shutdown().await.map_err(|e| {
1286 P2PError::Network(crate::error::NetworkError::ProtocolError(
1287 format!("Failed to shutdown resource manager: {e}").into(),
1288 ))
1289 })?;
1290 info!("Production resource manager stopped");
1291 }
1292
1293 info!("P2P node stopped");
1294 Ok(())
1295 }
1296
    /// Alias for [`P2PNode::stop`]; returns whatever `stop` returns.
    pub async fn shutdown(&self) -> Result<()> {
        self.stop().await
    }
1301
1302 pub async fn is_running(&self) -> bool {
1304 *self.running.read().await
1305 }
1306
1307 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1309 self.listen_addrs.read().await.clone()
1310 }
1311
1312 pub async fn connected_peers(&self) -> Vec<PeerId> {
1314 self.peers.read().await.keys().cloned().collect()
1315 }
1316
1317 pub async fn peer_count(&self) -> usize {
1319 self.peers.read().await.len()
1320 }
1321
1322 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1324 self.peers.read().await.get(peer_id).cloned()
1325 }
1326
1327 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1339 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1341
1342 let peers = self.peers.read().await;
1343
1344 for (peer_id, peer_info) in peers.iter() {
1346 for peer_addr in &peer_info.addresses {
1348 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1349 && peer_socket == socket_addr
1350 {
1351 return Some(peer_id.clone());
1352 }
1353 }
1354 }
1355
1356 None
1357 }
1358
1359 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1365 let peers = self.peers.read().await;
1366
1367 peers
1368 .iter()
1369 .map(|(peer_id, peer_info)| (peer_id.clone(), peer_info.addresses.clone()))
1370 .collect()
1371 }
1372
1373 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1385 self.active_connections.write().await.remove(peer_id);
1387 self.peers.write().await.remove(peer_id).is_some()
1389 }
1390
1391 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1404 self.peers.read().await.contains_key(peer_id)
1405 }
1406
1407 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1409 info!("Connecting to peer at: {}", address);
1410
1411 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1413 Some(resource_manager.acquire_connection().await?)
1414 } else {
1415 None
1416 };
1417
1418 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1420 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1421 format!("{}: {}", address, e).into(),
1422 ))
1423 })?;
1424
1425 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1428 if normalized_addr != socket_addr {
1429 info!(
1430 "Normalized wildcard address {} to loopback {}",
1431 socket_addr, normalized_addr
1432 );
1433 }
1434
1435 let addr_list = vec![normalized_addr];
1437 let peer_id = match tokio::time::timeout(
1438 self.config.connection_timeout,
1439 self.dual_node.connect_happy_eyeballs(&addr_list),
1440 )
1441 .await
1442 {
1443 Ok(Ok(peer)) => {
1444 let connected_peer_id =
1445 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1446 info!("Successfully connected to peer: {}", connected_peer_id);
1447 connected_peer_id
1448 }
1449 Ok(Err(e)) => {
1450 warn!("Failed to connect to peer at {}: {}", address, e);
1451 let sanitized_address = address.replace(['/', ':'], "_");
1452 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1453 warn!(
1454 "Using demo peer ID: {} (transport connection failed)",
1455 demo_peer_id
1456 );
1457 demo_peer_id
1458 }
1459 Err(_) => {
1460 warn!(
1461 "Timed out connecting to peer at {} after {:?}",
1462 address, self.config.connection_timeout
1463 );
1464 let sanitized_address = address.replace(['/', ':'], "_");
1465 let demo_peer_id = format!("peer_from_{}", sanitized_address);
1466 demo_peer_id
1467 }
1468 };
1469
1470 let peer_info = PeerInfo {
1472 peer_id: peer_id.clone(),
1473 addresses: vec![address.to_string()],
1474 connected_at: Instant::now(),
1475 last_seen: Instant::now(),
1476 status: ConnectionStatus::Connected,
1477 protocols: vec!["p2p-foundation/1.0".to_string()],
1478 heartbeat_count: 0,
1479 };
1480
1481 self.peers.write().await.insert(peer_id.clone(), peer_info);
1483
1484 self.active_connections
1487 .write()
1488 .await
1489 .insert(peer_id.clone());
1490
1491 if let Some(ref resource_manager) = self.resource_manager {
1493 resource_manager.record_bandwidth(0, 0); }
1495
1496 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1498
1499 info!("Connected to peer: {}", peer_id);
1500 Ok(peer_id)
1501 }
1502
1503 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1505 info!("Disconnecting from peer: {}", peer_id);
1506
1507 self.active_connections.write().await.remove(peer_id);
1509
1510 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1511 peer_info.status = ConnectionStatus::Disconnected;
1512
1513 let _ = self
1515 .event_tx
1516 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1517
1518 info!("Disconnected from peer: {}", peer_id);
1519 }
1520
1521 Ok(())
1522 }
1523
1524 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1526 self.active_connections.read().await.contains(peer_id)
1527 }
1528
1529 pub async fn send_message(
1531 &self,
1532 peer_id: &PeerId,
1533 protocol: &str,
1534 data: Vec<u8>,
1535 ) -> Result<()> {
1536 debug!(
1537 "Sending message to peer {} on protocol {}",
1538 peer_id, protocol
1539 );
1540
1541 if let Some(ref resource_manager) = self.resource_manager
1543 && !resource_manager
1544 .check_rate_limit(peer_id, "message")
1545 .await?
1546 {
1547 return Err(P2PError::ResourceExhausted(
1548 format!("Rate limit exceeded for peer {}", peer_id).into(),
1549 ));
1550 }
1551
1552 if !self.peers.read().await.contains_key(peer_id) {
1554 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1555 peer_id.to_string().into(),
1556 )));
1557 }
1558
1559 if !self.is_connection_active(peer_id).await {
1562 debug!(
1563 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1564 peer_id
1565 );
1566
1567 self.remove_peer(peer_id).await;
1569
1570 return Err(P2PError::Network(
1571 crate::error::NetworkError::ConnectionClosed {
1572 peer_id: peer_id.to_string().into(),
1573 },
1574 ));
1575 }
1576
1577 if let Some(ref resource_manager) = self.resource_manager {
1581 resource_manager.record_bandwidth(data.len() as u64, 0);
1582 }
1583
1584 let _message_data = self.create_protocol_message(protocol, data)?;
1586
1587 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1589 tokio::time::timeout(self.config.connection_timeout, send_fut)
1590 .await
1591 .map_err(|_| {
1592 P2PError::Transport(crate::error::TransportError::StreamError(
1593 "Timed out sending message".into(),
1594 ))
1595 })?
1596 .map_err(|e| {
1597 P2PError::Transport(crate::error::TransportError::StreamError(
1598 e.to_string().into(),
1599 ))
1600 })
1601 }
1602
1603 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1605 use serde_json::json;
1606
1607 let timestamp = std::time::SystemTime::now()
1608 .duration_since(std::time::UNIX_EPOCH)
1609 .map_err(|e| {
1610 P2PError::Network(NetworkError::ProtocolError(
1611 format!("System time error: {}", e).into(),
1612 ))
1613 })?
1614 .as_secs();
1615
1616 let message = json!({
1618 "protocol": protocol,
1619 "data": data,
1620 "from": self.peer_id,
1621 "timestamp": timestamp
1622 });
1623
1624 serde_json::to_vec(&message).map_err(|e| {
1625 P2PError::Transport(crate::error::TransportError::StreamError(
1626 format!("Failed to serialize message: {e}").into(),
1627 ))
1628 })
1629 }
1630
1631 }
1633
1634#[allow(dead_code)]
1636fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1637 use serde_json::json;
1638
1639 let timestamp = std::time::SystemTime::now()
1640 .duration_since(std::time::UNIX_EPOCH)
1641 .map_err(|e| {
1642 P2PError::Network(NetworkError::ProtocolError(
1643 format!("System time error: {}", e).into(),
1644 ))
1645 })?
1646 .as_secs();
1647
1648 let message = json!({
1650 "protocol": protocol,
1651 "data": data,
1652 "timestamp": timestamp
1653 });
1654
1655 serde_json::to_vec(&message).map_err(|e| {
1656 P2PError::Transport(crate::error::TransportError::StreamError(
1657 format!("Failed to serialize message: {e}").into(),
1658 ))
1659 })
1660}
1661
1662impl P2PNode {
1663 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1665 self.event_tx.subscribe()
1666 }
1667
1668 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1670 self.subscribe_events()
1671 }
1672
1673 pub fn uptime(&self) -> Duration {
1675 self.start_time.elapsed()
1676 }
1677
1678 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1688 if let Some(ref resource_manager) = self.resource_manager {
1689 Ok(resource_manager.get_metrics().await)
1690 } else {
1691 Err(P2PError::Network(
1692 crate::error::NetworkError::ProtocolError(
1693 "Production resource manager not enabled".to_string().into(),
1694 ),
1695 ))
1696 }
1697 }
1698
1699 async fn connection_lifecycle_monitor(
1702 dual_node: Arc<DualStackNetworkNode>,
1703 active_connections: Arc<RwLock<HashSet<String>>>,
1704 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1705 event_tx: broadcast::Sender<P2PEvent>,
1706 geo_provider: Arc<BgpGeoProvider>,
1707 local_peer_id: String,
1708 ) {
1709 use crate::transport::ant_quic_adapter::ConnectionEvent;
1710
1711 let mut event_rx = dual_node.subscribe_connection_events();
1712
1713 info!("Connection lifecycle monitor started");
1714
1715 loop {
1716 match event_rx.recv().await {
1717 Ok(event) => {
1718 match event {
1719 ConnectionEvent::Established {
1720 peer_id,
1721 remote_address,
1722 } => {
1723 let peer_id_str =
1724 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1725 debug!(
1726 "Connection established: peer={}, addr={}",
1727 peer_id_str, remote_address
1728 );
1729
1730 let ip = remote_address.ip();
1733 let is_rejected = match ip {
1734 std::net::IpAddr::V4(v4) => {
1735 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1737 geo_provider.is_hosting_asn(asn)
1738 || geo_provider.is_vpn_asn(asn)
1739 } else {
1740 false
1741 }
1742 }
1743 std::net::IpAddr::V6(v6) => {
1744 let info = geo_provider.lookup(v6);
1745 info.is_hosting_provider || info.is_vpn_provider
1746 }
1747 };
1748
1749 if is_rejected {
1750 info!(
1751 "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1752 peer_id_str, remote_address
1753 );
1754
1755 let rejection = RejectionMessage {
1757 reason: RejectionReason::GeoIpPolicy,
1758 message:
1759 "Connection rejected: Hosting/VPN providers not allowed"
1760 .to_string(),
1761 suggested_target: None, };
1763
1764 if let Ok(data) = serde_json::to_vec(&rejection) {
1766 let timestamp = std::time::SystemTime::now()
1768 .duration_since(std::time::UNIX_EPOCH)
1769 .unwrap_or_default()
1770 .as_secs();
1771
1772 let message = serde_json::json!({
1773 "protocol": "control",
1774 "data": data,
1775 "from": local_peer_id,
1776 "timestamp": timestamp
1777 });
1778
1779 if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1780 let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1784
1785 tokio::task::yield_now().await;
1788 }
1789 }
1790
1791 continue;
1795 }
1796
1797 active_connections.write().await.insert(peer_id_str.clone());
1799
1800 let mut peers_lock = peers.write().await;
1802 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1803 peer_info.status = ConnectionStatus::Connected;
1804 peer_info.connected_at = Instant::now();
1805 } else {
1806 debug!("Registering new incoming peer: {}", peer_id_str);
1808 peers_lock.insert(
1809 peer_id_str.clone(),
1810 PeerInfo {
1811 peer_id: peer_id_str.clone(),
1812 addresses: vec![remote_address.to_string()],
1813 status: ConnectionStatus::Connected,
1814 last_seen: Instant::now(),
1815 connected_at: Instant::now(),
1816 protocols: Vec::new(),
1817 heartbeat_count: 0,
1818 },
1819 );
1820 }
1821
1822 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
1824 }
1825 ConnectionEvent::Lost { peer_id, reason } => {
1826 let peer_id_str =
1827 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1828 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
1829
1830 active_connections.write().await.remove(&peer_id_str);
1832
1833 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1835 peer_info.status = ConnectionStatus::Disconnected;
1836 peer_info.last_seen = Instant::now();
1837 }
1838
1839 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1841 }
1842 ConnectionEvent::Failed { peer_id, reason } => {
1843 let peer_id_str =
1844 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1845 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
1846
1847 active_connections.write().await.remove(&peer_id_str);
1849
1850 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
1852 peer_info.status = ConnectionStatus::Failed(reason.clone());
1853 }
1854
1855 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
1857 }
1858 }
1859 }
1860 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1861 warn!(
1862 "Connection event monitor lagged, skipped {} events",
1863 skipped
1864 );
1865 continue;
1866 }
1867 Err(broadcast::error::RecvError::Closed) => {
1868 info!("Connection event channel closed, stopping monitor");
1869 break;
1870 }
1871 }
1872 }
1873
1874 info!("Connection lifecycle monitor stopped");
1875 }
1876
1877 async fn start_connection_monitor(&self) {
1879 debug!("Connection monitor already running from initialization");
1883 }
1884
1885 async fn keepalive_task(
1891 active_connections: Arc<RwLock<HashSet<String>>>,
1892 dual_node: Arc<DualStackNetworkNode>,
1893 shutdown: Arc<AtomicBool>,
1894 ) {
1895 use tokio::time::{Duration, interval};
1896
1897 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
1901 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1902
1903 info!(
1904 "Keepalive task started (interval: {}s)",
1905 KEEPALIVE_INTERVAL_SECS
1906 );
1907
1908 loop {
1909 if shutdown.load(Ordering::Relaxed) {
1911 info!("Keepalive task shutting down");
1912 break;
1913 }
1914
1915 interval.tick().await;
1916
1917 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
1919
1920 if peers.is_empty() {
1921 trace!("Keepalive: no active connections");
1922 continue;
1923 }
1924
1925 debug!("Sending keepalive to {} active connections", peers.len());
1926
1927 for peer_id in peers {
1929 match dual_node
1930 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
1931 .await
1932 {
1933 Ok(_) => {
1934 trace!("Keepalive sent to peer: {}", peer_id);
1935 }
1936 Err(e) => {
1937 debug!(
1938 "Failed to send keepalive to peer {}: {} (connection may have closed)",
1939 peer_id, e
1940 );
1941 }
1943 }
1944 }
1945 }
1946
1947 info!("Keepalive task stopped");
1948 }
1949
1950 pub async fn health_check(&self) -> Result<()> {
1952 if let Some(ref resource_manager) = self.resource_manager {
1953 resource_manager.health_check().await
1954 } else {
1955 let peer_count = self.peer_count().await;
1957 if peer_count > self.config.max_connections {
1958 Err(P2PError::Network(
1959 crate::error::NetworkError::ProtocolError(
1960 format!("Too many connections: {peer_count}").into(),
1961 ),
1962 ))
1963 } else {
1964 Ok(())
1965 }
1966 }
1967 }
1968
1969 pub fn production_config(&self) -> Option<&ProductionConfig> {
1971 self.config.production_config.as_ref()
1972 }
1973
1974 pub fn is_production_mode(&self) -> bool {
1976 self.resource_manager.is_some()
1977 }
1978
1979 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
1981 self.dht.as_ref()
1982 }
1983
1984 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1986 if let Some(ref dht) = self.dht {
1987 let mut dht_instance = dht.write().await;
1988 let dht_key = crate::dht::DhtKey::from_bytes(key);
1989 dht_instance
1990 .store(&dht_key, value.clone())
1991 .await
1992 .map_err(|e| {
1993 P2PError::Dht(crate::error::DhtError::StoreFailed(
1994 format!("{:?}: {e}", key).into(),
1995 ))
1996 })?;
1997
1998 Ok(())
1999 } else {
2000 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2001 "DHT not enabled".to_string().into(),
2002 )))
2003 }
2004 }
2005
2006 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2008 if let Some(ref dht) = self.dht {
2009 let dht_instance = dht.read().await;
2010 let dht_key = crate::dht::DhtKey::from_bytes(key);
2011 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2012 P2PError::Dht(crate::error::DhtError::StoreFailed(
2013 format!("Retrieve failed: {e}").into(),
2014 ))
2015 })?;
2016
2017 Ok(record_result)
2018 } else {
2019 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2020 "DHT not enabled".to_string().into(),
2021 )))
2022 }
2023 }
2024
2025 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2027 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2028 let mut manager = bootstrap_manager.write().await;
2029 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2030 .iter()
2031 .filter_map(|addr| addr.parse().ok())
2032 .collect();
2033 let contact = ContactEntry::new(peer_id, socket_addresses);
2034 manager.add_contact(contact).await.map_err(|e| {
2035 P2PError::Network(crate::error::NetworkError::ProtocolError(
2036 format!("Failed to add peer to bootstrap cache: {e}").into(),
2037 ))
2038 })?;
2039 }
2040 Ok(())
2041 }
2042
2043 pub async fn update_peer_metrics(
2045 &self,
2046 peer_id: &PeerId,
2047 success: bool,
2048 latency_ms: Option<u64>,
2049 _error: Option<String>,
2050 ) -> Result<()> {
2051 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2052 let mut manager = bootstrap_manager.write().await;
2053
2054 let metrics = QualityMetrics {
2056 success_rate: if success { 1.0 } else { 0.0 },
2057 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2058 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2060 last_successful_connection: if success {
2061 chrono::Utc::now()
2062 } else {
2063 chrono::Utc::now() - chrono::Duration::hours(1)
2064 },
2065 uptime_score: 0.5,
2066 };
2067
2068 manager
2069 .update_contact_metrics(peer_id, metrics)
2070 .await
2071 .map_err(|e| {
2072 P2PError::Network(crate::error::NetworkError::ProtocolError(
2073 format!("Failed to update peer metrics: {e}").into(),
2074 ))
2075 })?;
2076 }
2077 Ok(())
2078 }
2079
2080 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2082 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2083 let manager = bootstrap_manager.read().await;
2084 let stats = manager.get_stats().await.map_err(|e| {
2085 P2PError::Network(crate::error::NetworkError::ProtocolError(
2086 format!("Failed to get bootstrap stats: {e}").into(),
2087 ))
2088 })?;
2089 Ok(Some(stats))
2090 } else {
2091 Ok(None)
2092 }
2093 }
2094
2095 pub async fn cached_peer_count(&self) -> usize {
2097 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2098 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2099 {
2100 return stats.total_contacts;
2101 }
2102 0
2103 }
2104
2105 async fn connect_bootstrap_peers(&self) -> Result<()> {
2107 let mut bootstrap_contacts = Vec::new();
2108 let mut used_cache = false;
2109 let mut seen_addresses = std::collections::HashSet::new();
2110
2111 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2113 self.config.bootstrap_peers_str.clone()
2114 } else {
2115 self.config
2117 .bootstrap_peers
2118 .iter()
2119 .map(|addr| addr.to_string())
2120 .collect::<Vec<_>>()
2121 };
2122
2123 if !cli_bootstrap_peers.is_empty() {
2124 info!(
2125 "Using {} CLI-provided bootstrap peers (priority)",
2126 cli_bootstrap_peers.len()
2127 );
2128 for addr in &cli_bootstrap_peers {
2129 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2130 seen_addresses.insert(socket_addr);
2131 let contact = ContactEntry::new(
2132 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2133 vec![socket_addr],
2134 );
2135 bootstrap_contacts.push(contact);
2136 } else {
2137 warn!("Invalid bootstrap address format: {}", addr);
2138 }
2139 }
2140 }
2141
2142 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2144 let manager = bootstrap_manager.read().await;
2145 match manager.get_bootstrap_peers(20).await {
2146 Ok(contacts) => {
2148 if !contacts.is_empty() {
2149 let mut added_from_cache = 0;
2150 for contact in contacts {
2151 let new_addresses: Vec<_> = contact
2153 .addresses
2154 .iter()
2155 .filter(|addr| !seen_addresses.contains(addr))
2156 .copied()
2157 .collect();
2158
2159 if !new_addresses.is_empty() {
2160 for addr in &new_addresses {
2161 seen_addresses.insert(*addr);
2162 }
2163 let mut contact = contact.clone();
2164 contact.addresses = new_addresses;
2165 bootstrap_contacts.push(contact);
2166 added_from_cache += 1;
2167 }
2168 }
2169 if added_from_cache > 0 {
2170 info!(
2171 "Added {} cached bootstrap peers (supplementing CLI peers)",
2172 added_from_cache
2173 );
2174 used_cache = true;
2175 }
2176 }
2177 }
2178 Err(e) => {
2179 warn!("Failed to get cached bootstrap peers: {}", e);
2180 }
2181 }
2182 }
2183
2184 if bootstrap_contacts.is_empty() {
2185 info!("No bootstrap peers configured and no cached peers available");
2186 return Ok(());
2187 }
2188
2189 let mut successful_connections = 0;
2191 for contact in bootstrap_contacts {
2192 for addr in &contact.addresses {
2193 match self.connect_peer(&addr.to_string()).await {
2194 Ok(peer_id) => {
2195 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2196 successful_connections += 1;
2197
2198 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2200 let mut manager = bootstrap_manager.write().await;
2201 let mut updated_contact = contact.clone();
2202 updated_contact.peer_id = peer_id.clone();
2203 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2206 warn!("Failed to update bootstrap cache: {}", e);
2207 }
2208 }
2209 break; }
2211 Err(e) => {
2212 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2213
2214 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2216 let mut manager = bootstrap_manager.write().await;
2217 let mut updated_contact = contact.clone();
2218 updated_contact.update_connection_result(
2219 false,
2220 None,
2221 Some(e.to_string()),
2222 );
2223
2224 if let Err(e) = manager.add_contact(updated_contact).await {
2225 warn!("Failed to update bootstrap cache: {}", e);
2226 }
2227 }
2228 }
2229 }
2230 }
2231 }
2232
2233 if successful_connections == 0 {
2234 if !used_cache {
2235 warn!("Failed to connect to any bootstrap peers");
2236 }
2237 return Err(P2PError::Network(NetworkError::ConnectionFailed {
2238 addr: std::net::SocketAddr::from(([0, 0, 0, 0], 0)), reason: "Failed to connect to any bootstrap peers".into(),
2240 }));
2241 }
2242 info!(
2243 "Successfully connected to {} bootstrap peers",
2244 successful_connections
2245 );
2246
2247 Ok(())
2248 }
2249
2250 async fn disconnect_all_peers(&self) -> Result<()> {
2252 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2253
2254 for peer_id in peer_ids {
2255 self.disconnect_peer(&peer_id).await?;
2256 }
2257
2258 Ok(())
2259 }
2260
2261 async fn periodic_tasks(&self) -> Result<()> {
2263 Ok(())
2269 }
2270}
2271
2272#[async_trait::async_trait]
2274pub trait NetworkSender: Send + Sync {
2275 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2277
2278 fn local_peer_id(&self) -> &PeerId;
2280}
2281
2282#[derive(Clone)]
2284pub struct P2PNetworkSender {
2285 peer_id: PeerId,
2286 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2288}
2289
2290impl P2PNetworkSender {
2291 pub fn new(
2292 peer_id: PeerId,
2293 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2294 ) -> Self {
2295 Self { peer_id, send_tx }
2296 }
2297}
2298
2299#[async_trait::async_trait]
2301impl NetworkSender for P2PNetworkSender {
2302 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2304 self.send_tx
2305 .send((peer_id.clone(), protocol.to_string(), data))
2306 .map_err(|_| {
2307 P2PError::Network(crate::error::NetworkError::ProtocolError(
2308 "Failed to send message via channel".to_string().into(),
2309 ))
2310 })?;
2311 Ok(())
2312 }
2313
2314 fn local_peer_id(&self) -> &PeerId {
2316 &self.peer_id
2317 }
2318}
2319
2320pub struct NodeBuilder {
2322 config: NodeConfig,
2323}
2324
2325impl Default for NodeBuilder {
2326 fn default() -> Self {
2327 Self::new()
2328 }
2329}
2330
2331impl NodeBuilder {
2332 pub fn new() -> Self {
2334 Self {
2335 config: NodeConfig::default(),
2336 }
2337 }
2338
2339 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2341 self.config.peer_id = Some(peer_id);
2342 self
2343 }
2344
2345 pub fn listen_on(mut self, addr: &str) -> Self {
2347 if let Ok(multiaddr) = addr.parse() {
2348 self.config.listen_addrs.push(multiaddr);
2349 }
2350 self
2351 }
2352
2353 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2355 if let Ok(multiaddr) = addr.parse() {
2356 self.config.bootstrap_peers.push(multiaddr);
2357 }
2358 self.config.bootstrap_peers_str.push(addr.to_string());
2359 self
2360 }
2361
2362 pub fn with_ipv6(mut self, enable: bool) -> Self {
2364 self.config.enable_ipv6 = enable;
2365 self
2366 }
2367
2368 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2372 self.config.connection_timeout = timeout;
2373 self
2374 }
2375
2376 pub fn with_max_connections(mut self, max: usize) -> Self {
2378 self.config.max_connections = max;
2379 self
2380 }
2381
2382 pub fn with_production_mode(mut self) -> Self {
2384 self.config.production_config = Some(ProductionConfig::default());
2385 self
2386 }
2387
2388 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2390 self.config.production_config = Some(production_config);
2391 self
2392 }
2393
2394 pub fn with_diversity_config(
2396 mut self,
2397 diversity_config: crate::security::IPDiversityConfig,
2398 ) -> Self {
2399 self.config.diversity_config = Some(diversity_config);
2400 self
2401 }
2402
2403 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2405 self.config.dht_config = dht_config;
2406 self
2407 }
2408
2409 pub fn with_default_dht(mut self) -> Self {
2411 self.config.dht_config = DHTConfig::default();
2412 self
2413 }
2414
2415 pub async fn build(self) -> Result<P2PNode> {
2417 P2PNode::new(self.config).await
2418 }
2419}
2420
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Builds a bootstrap manager from the node config, falling back to the
    /// default cache and diversity configs when none are set.
    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        let cache_config = config
            .bootstrap_cache_config
            .clone()
            .unwrap_or_else(crate::bootstrap::CacheConfig::default);

        BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager")
    }

    /// A diversity config set on `NodeConfig` must reach the bootstrap manager.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        let manager = build_bootstrap_manager_like_prod(&config).await;
        let effective = manager.diversity_config();

        assert!(effective.is_relaxed());
        assert_eq!(effective.max_nodes_per_asn, 5000);
    }
}
2460
2461#[allow(dead_code)] async fn handle_received_message_standalone(
2464 message_data: Vec<u8>,
2465 peer_id: &PeerId,
2466 _protocol: &str,
2467 event_tx: &broadcast::Sender<P2PEvent>,
2468) -> Result<()> {
2469 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2471 Ok(message) => {
2472 if let (Some(protocol), Some(data), Some(from)) = (
2473 message.get("protocol").and_then(|v| v.as_str()),
2474 message.get("data").and_then(|v| v.as_array()),
2475 message.get("from").and_then(|v| v.as_str()),
2476 ) {
2477 let data_bytes: Vec<u8> = data
2479 .iter()
2480 .filter_map(|v| v.as_u64().map(|n| n as u8))
2481 .collect();
2482
2483 let event = P2PEvent::Message {
2485 topic: protocol.to_string(),
2486 source: from.to_string(),
2487 data: data_bytes,
2488 };
2489
2490 let _ = event_tx.send(event);
2491 debug!("Generated message event from peer: {}", peer_id);
2492 }
2493 }
2494 Err(e) => {
2495 warn!("Failed to parse received message from {}: {}", peer_id, e);
2496 }
2497 }
2498
2499 Ok(())
2500}
2501
2502#[allow(dead_code)]
2506fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2507 match create_protocol_message_static(protocol, data) {
2508 Ok(msg) => Some(msg),
2509 Err(e) => {
2510 warn!("Failed to create protocol message: {}", e);
2511 None
2512 }
2513 }
2514}
2515
2516#[allow(dead_code)]
2518async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2519 match result {
2520 Ok(_) => {
2521 debug!("Message sent to peer {} via transport layer", peer_id);
2522 }
2523 Err(e) => {
2524 warn!("Failed to send message to peer {}: {}", peer_id, e);
2525 }
2526 }
2527}
2528
2529#[allow(dead_code)] fn check_rate_limit(
2532 rate_limiter: &RateLimiter,
2533 socket_addr: &std::net::SocketAddr,
2534 remote_addr: &NetworkAddress,
2535) -> Result<()> {
2536 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2537 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2538 e
2539 })
2540}
2541
2542#[allow(dead_code)] async fn register_new_peer(
2545 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2546 peer_id: &PeerId,
2547 remote_addr: &NetworkAddress,
2548) {
2549 let mut peers_guard = peers.write().await;
2550 let peer_info = PeerInfo {
2551 peer_id: peer_id.clone(),
2552 addresses: vec![remote_addr.to_string()],
2553 connected_at: tokio::time::Instant::now(),
2554 last_seen: tokio::time::Instant::now(),
2555 status: ConnectionStatus::Connected,
2556 protocols: vec!["p2p-chat/1.0.0".to_string()],
2557 heartbeat_count: 0,
2558 };
2559 peers_guard.insert(peer_id.clone(), peer_info);
2560}
2561
2562#[allow(dead_code)] fn spawn_connection_handler(
2565 connection: Box<dyn crate::transport::Connection>,
2566 peer_id: PeerId,
2567 event_tx: broadcast::Sender<P2PEvent>,
2568 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2569) {
2570 tokio::spawn(async move {
2571 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2572 });
2573}
2574
2575#[allow(dead_code)] async fn handle_peer_connection(
2578 mut connection: Box<dyn crate::transport::Connection>,
2579 peer_id: PeerId,
2580 event_tx: broadcast::Sender<P2PEvent>,
2581 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2582) {
2583 loop {
2584 match connection.receive().await {
2585 Ok(message_data) => {
2586 debug!(
2587 "Received {} bytes from peer: {}",
2588 message_data.len(),
2589 peer_id
2590 );
2591
2592 if let Err(e) = handle_received_message_standalone(
2594 message_data,
2595 &peer_id,
2596 "unknown", &event_tx,
2598 )
2599 .await
2600 {
2601 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2602 }
2603 }
2604 Err(e) => {
2605 warn!("Failed to receive message from {}: {}", peer_id, e);
2606
2607 if !connection.is_alive().await {
2609 info!("Connection to {} is dead, removing peer", peer_id);
2610
2611 remove_peer(&peers, &peer_id).await;
2613
2614 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2616
2617 break; }
2619
2620 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2622 }
2623 }
2624 }
2625}
2626
2627#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2630 let mut peers_guard = peers.write().await;
2631 peers_guard.remove(peer_id);
2632}
2633
2634#[allow(dead_code)]
2636async fn update_peer_heartbeat(
2637 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2638 peer_id: &PeerId,
2639) -> Result<()> {
2640 let mut peers_guard = peers.write().await;
2641 match peers_guard.get_mut(peer_id) {
2642 Some(peer_info) => {
2643 peer_info.last_seen = Instant::now();
2644 peer_info.heartbeat_count += 1;
2645 Ok(())
2646 }
2647 None => {
2648 warn!("Received heartbeat from unknown peer: {}", peer_id);
2649 Err(P2PError::Network(NetworkError::PeerNotFound(
2650 format!("Peer {} not found", peer_id).into(),
2651 )))
2652 }
2653 }
2654}
2655
2656#[allow(dead_code)]
2658async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2659 if let Some(manager) = resource_manager {
2660 let metrics = manager.get_metrics().await;
2661 (metrics.memory_used, metrics.cpu_usage)
2662 } else {
2663 (0, 0.0)
2664 }
2665}
2666
2667#[cfg(test)]
2668mod tests {
2669 use super::*;
2670 use std::time::Duration;
2672 use tokio::time::timeout;
2673
2674 fn create_test_node_config() -> NodeConfig {
2680 NodeConfig {
2681 peer_id: Some("test_peer_123".to_string()),
2682 listen_addrs: vec![
2683 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2684 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2685 ],
2686 listen_addr: std::net::SocketAddr::new(
2687 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2688 0,
2689 ),
2690 bootstrap_peers: vec![],
2691 bootstrap_peers_str: vec![],
2692 enable_ipv6: true,
2693
2694 connection_timeout: Duration::from_millis(300),
2695 keep_alive_interval: Duration::from_secs(30),
2696 max_connections: 100,
2697 max_incoming_connections: 50,
2698 dht_config: DHTConfig::default(),
2699 security_config: SecurityConfig::default(),
2700 production_config: None,
2701 bootstrap_cache_config: None,
2702 diversity_config: None,
2703 }
2705 }
2706
2707 #[tokio::test]
2711 async fn test_node_config_default() {
2712 let config = NodeConfig::default();
2713
2714 assert!(config.peer_id.is_none());
2715 assert_eq!(config.listen_addrs.len(), 2);
2716 assert!(config.enable_ipv6);
2717 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2719 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2720 }
2721
2722 #[tokio::test]
2723 async fn test_dht_config_default() {
2724 let config = DHTConfig::default();
2725
2726 assert_eq!(config.k_value, 20);
2727 assert_eq!(config.alpha_value, 5);
2728 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2729 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2730 }
2731
2732 #[tokio::test]
2733 async fn test_security_config_default() {
2734 let config = SecurityConfig::default();
2735
2736 assert!(config.enable_noise);
2737 assert!(config.enable_tls);
2738 assert_eq!(config.trust_level, TrustLevel::Basic);
2739 }
2740
/// All `TrustLevel` variants can be constructed and compare as expected.
#[test]
fn test_trust_level_variants() {
    // Each variant must be constructible.
    let _none = TrustLevel::None;
    let _basic = TrustLevel::Basic;
    let _full = TrustLevel::Full;

    // Each variant equals itself.
    let same_pairs = [
        (TrustLevel::None, TrustLevel::None),
        (TrustLevel::Basic, TrustLevel::Basic),
        (TrustLevel::Full, TrustLevel::Full),
    ];
    for (left, right) in same_pairs {
        assert_eq!(left, right);
    }

    assert_ne!(TrustLevel::None, TrustLevel::Basic);
}
2754
/// `ConnectionStatus` variants compare by variant, and `Failed` carries its
/// message.
#[test]
fn test_connection_status_variants() {
    let connecting = ConnectionStatus::Connecting;
    let connected = ConnectionStatus::Connected;
    let disconnecting = ConnectionStatus::Disconnecting;
    let disconnected = ConnectionStatus::Disconnected;
    let failed = ConnectionStatus::Failed("test error".to_string());

    assert_eq!(connecting, ConnectionStatus::Connecting);
    assert_eq!(connected, ConnectionStatus::Connected);
    assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
    assert_eq!(disconnected, ConnectionStatus::Disconnected);
    assert_ne!(connecting, connected);

    let ConnectionStatus::Failed(msg) = failed else {
        panic!("Expected Failed status");
    };
    assert_eq!(msg, "test error");
}
2775
2776 #[tokio::test]
2777 async fn test_node_creation() -> Result<()> {
2778 let config = create_test_node_config();
2779 let node = P2PNode::new(config).await?;
2780
2781 assert_eq!(node.peer_id(), "test_peer_123");
2782 assert!(!node.is_running().await);
2783 assert_eq!(node.peer_count().await, 0);
2784 assert!(node.connected_peers().await.is_empty());
2785
2786 Ok(())
2787 }
2788
2789 #[tokio::test]
2790 async fn test_node_creation_without_peer_id() -> Result<()> {
2791 let mut config = create_test_node_config();
2792 config.peer_id = None;
2793
2794 let node = P2PNode::new(config).await?;
2795
2796 assert!(node.peer_id().starts_with("peer_"));
2798 assert!(!node.is_running().await);
2799
2800 Ok(())
2801 }
2802
2803 #[tokio::test]
2804 async fn test_node_lifecycle() -> Result<()> {
2805 let config = create_test_node_config();
2806 let node = P2PNode::new(config).await?;
2807
2808 assert!(!node.is_running().await);
2810
2811 node.start().await?;
2813 assert!(node.is_running().await);
2814
2815 let listen_addrs = node.listen_addrs().await;
2817 assert!(
2818 !listen_addrs.is_empty(),
2819 "Expected at least one listening address"
2820 );
2821
2822 node.stop().await?;
2824 assert!(!node.is_running().await);
2825
2826 Ok(())
2827 }
2828
2829 #[tokio::test]
2830 async fn test_peer_connection() -> Result<()> {
2831 let config = create_test_node_config();
2832 let node = P2PNode::new(config).await?;
2833
2834 let peer_addr = "127.0.0.1:0";
2835
2836 let peer_id = node.connect_peer(peer_addr).await?;
2838 assert!(peer_id.starts_with("peer_from_"));
2839
2840 assert_eq!(node.peer_count().await, 1);
2842
2843 let connected_peers = node.connected_peers().await;
2845 assert_eq!(connected_peers.len(), 1);
2846 assert_eq!(connected_peers[0], peer_id);
2847
2848 let peer_info = node.peer_info(&peer_id).await;
2850 assert!(peer_info.is_some());
2851 let info = peer_info.expect("Peer info should exist after adding peer");
2852 assert_eq!(info.peer_id, peer_id);
2853 assert_eq!(info.status, ConnectionStatus::Connected);
2854 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2855
2856 node.disconnect_peer(&peer_id).await?;
2858 assert_eq!(node.peer_count().await, 0);
2859
2860 Ok(())
2861 }
2862
2863 #[tokio::test]
2864 async fn test_event_subscription() -> Result<()> {
2865 let config = create_test_node_config();
2866 let node = P2PNode::new(config).await?;
2867
2868 let mut events = node.subscribe_events();
2869 let peer_addr = "127.0.0.1:0";
2870
2871 let peer_id = node.connect_peer(peer_addr).await?;
2873
2874 let event = timeout(Duration::from_millis(100), events.recv()).await;
2876 assert!(event.is_ok());
2877
2878 let event_result = event
2879 .expect("Should receive event")
2880 .expect("Event should not be error");
2881 match event_result {
2882 P2PEvent::PeerConnected(event_peer_id) => {
2883 assert_eq!(event_peer_id, peer_id);
2884 }
2885 _ => panic!("Expected PeerConnected event"),
2886 }
2887
2888 node.disconnect_peer(&peer_id).await?;
2890
2891 let event = timeout(Duration::from_millis(100), events.recv()).await;
2893 assert!(event.is_ok());
2894
2895 let event_result = event
2896 .expect("Should receive event")
2897 .expect("Event should not be error");
2898 match event_result {
2899 P2PEvent::PeerDisconnected(event_peer_id) => {
2900 assert_eq!(event_peer_id, peer_id);
2901 }
2902 _ => panic!("Expected PeerDisconnected event"),
2903 }
2904
2905 Ok(())
2906 }
2907
2908 #[tokio::test]
2909 async fn test_message_sending() -> Result<()> {
2910 let mut config1 = create_test_node_config();
2912 config1.listen_addr =
2913 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2914 let node1 = P2PNode::new(config1).await?;
2915 node1.start().await?;
2916
2917 let mut config2 = create_test_node_config();
2918 config2.listen_addr =
2919 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2920 let node2 = P2PNode::new(config2).await?;
2921 node2.start().await?;
2922
2923 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2925
2926 let node2_addr = node2.local_addr().ok_or_else(|| {
2928 P2PError::Network(crate::error::NetworkError::ProtocolError(
2929 "No listening address".to_string().into(),
2930 ))
2931 })?;
2932
2933 let peer_id =
2935 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2936 Ok(res) => res?,
2937 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2938 };
2939
2940 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2942
2943 let message_data = b"Hello, peer!".to_vec();
2945 let result = match timeout(
2946 Duration::from_millis(500),
2947 node1.send_message(&peer_id, "test-protocol", message_data),
2948 )
2949 .await
2950 {
2951 Ok(res) => res,
2952 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2953 };
2954 if let Err(e) = &result {
2957 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2958 }
2959
2960 let non_existent_peer = "non_existent_peer".to_string();
2962 let result = node1
2963 .send_message(&non_existent_peer, "test-protocol", vec![])
2964 .await;
2965 assert!(result.is_err(), "Sending to non-existent peer should fail");
2966
2967 Ok(())
2968 }
2969
2970 #[tokio::test]
2971 async fn test_remote_mcp_operations() -> Result<()> {
2972 let config = create_test_node_config();
2973 let node = P2PNode::new(config).await?;
2974
2975 node.start().await?;
2977 node.stop().await?;
2978 Ok(())
2979 }
2980
2981 #[tokio::test]
2982 async fn test_health_check() -> Result<()> {
2983 let config = create_test_node_config();
2984 let node = P2PNode::new(config).await?;
2985
2986 let result = node.health_check().await;
2988 assert!(result.is_ok());
2989
2990 Ok(())
2995 }
2996
2997 #[tokio::test]
2998 async fn test_node_uptime() -> Result<()> {
2999 let config = create_test_node_config();
3000 let node = P2PNode::new(config).await?;
3001
3002 let uptime1 = node.uptime();
3003 assert!(uptime1 >= Duration::from_secs(0));
3004
3005 tokio::time::sleep(Duration::from_millis(10)).await;
3007
3008 let uptime2 = node.uptime();
3009 assert!(uptime2 > uptime1);
3010
3011 Ok(())
3012 }
3013
3014 #[tokio::test]
3015 async fn test_node_config_access() -> Result<()> {
3016 let config = create_test_node_config();
3017 let expected_peer_id = config.peer_id.clone();
3018 let node = P2PNode::new(config).await?;
3019
3020 let node_config = node.config();
3021 assert_eq!(node_config.peer_id, expected_peer_id);
3022 assert_eq!(node_config.max_connections, 100);
3023 Ok(())
3026 }
3027
3028 #[tokio::test]
3029 async fn test_mcp_server_access() -> Result<()> {
3030 let config = create_test_node_config();
3031 let _node = P2PNode::new(config).await?;
3032
3033 Ok(())
3035 }
3036
3037 #[tokio::test]
3038 async fn test_dht_access() -> Result<()> {
3039 let config = create_test_node_config();
3040 let node = P2PNode::new(config).await?;
3041
3042 assert!(node.dht().is_some());
3044
3045 Ok(())
3046 }
3047
3048 #[tokio::test]
3049 async fn test_node_builder() -> Result<()> {
3050 let builder = P2PNode::builder()
3052 .with_peer_id("builder_test_peer".to_string())
3053 .listen_on("/ip4/127.0.0.1/tcp/0")
3054 .listen_on("/ip6/::1/tcp/0")
3055 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
3057 .with_connection_timeout(Duration::from_secs(15))
3058 .with_max_connections(200);
3059
3060 let config = builder.config;
3062 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3063 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
3066 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3067 assert_eq!(config.max_connections, 200);
3068
3069 Ok(())
3070 }
3071
3072 #[tokio::test]
3073 async fn test_bootstrap_peers() -> Result<()> {
3074 let mut config = create_test_node_config();
3075 config.bootstrap_peers = vec![
3076 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3077 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3078 ];
3079
3080 let node = P2PNode::new(config).await?;
3081
3082 node.start().await?;
3084
3085 let _peer_count = node.peer_count().await;
3089
3090 node.stop().await?;
3091 Ok(())
3092 }
3093
3094 #[tokio::test]
3095 async fn test_production_mode_disabled() -> Result<()> {
3096 let config = create_test_node_config();
3097 let node = P2PNode::new(config).await?;
3098
3099 assert!(!node.is_production_mode());
3100 assert!(node.production_config().is_none());
3101
3102 let result = node.resource_metrics().await;
3104 assert!(result.is_err());
3105 assert!(result.unwrap_err().to_string().contains("not enabled"));
3106
3107 Ok(())
3108 }
3109
/// Constructs every `NetworkEvent` variant used here to make sure each one
/// can be built with the expected field names and types.
///
/// Nothing in this test awaits, so it runs as a plain synchronous `#[test]`
/// like the other non-async tests in this module. The final uses of
/// `peer_id` and `address` move the values instead of cloning them.
#[test]
fn test_network_event_variants() {
    let peer_id = "test_peer".to_string();
    let address = "/ip4/127.0.0.1/tcp/9000".to_string();

    let _peer_connected = NetworkEvent::PeerConnected {
        peer_id: peer_id.clone(),
        addresses: vec![address.clone()],
    };

    let _peer_disconnected = NetworkEvent::PeerDisconnected {
        peer_id: peer_id.clone(),
        reason: "test disconnect".to_string(),
    };

    let _message_received = NetworkEvent::MessageReceived {
        peer_id: peer_id.clone(),
        protocol: "test-protocol".to_string(),
        data: vec![1, 2, 3],
    };

    // Last use of both strings: move them rather than clone.
    let _connection_failed = NetworkEvent::ConnectionFailed {
        peer_id: Some(peer_id),
        address,
        error: "connection refused".to_string(),
    };

    let _dht_stored = NetworkEvent::DHTRecordStored {
        key: vec![1, 2, 3],
        value: vec![4, 5, 6],
    };

    let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
        key: vec![1, 2, 3],
        value: Some(vec![4, 5, 6]),
    };
}
3148
3149 #[tokio::test]
3150 async fn test_peer_info_structure() {
3151 let peer_info = PeerInfo {
3152 peer_id: "test_peer".to_string(),
3153 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3154 connected_at: Instant::now(),
3155 last_seen: Instant::now(),
3156 status: ConnectionStatus::Connected,
3157 protocols: vec!["test-protocol".to_string()],
3158 heartbeat_count: 0,
3159 };
3160
3161 assert_eq!(peer_info.peer_id, "test_peer");
3162 assert_eq!(peer_info.addresses.len(), 1);
3163 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3164 assert_eq!(peer_info.protocols.len(), 1);
3165 }
3166
3167 #[tokio::test]
3168 async fn test_serialization() -> Result<()> {
3169 let config = create_test_node_config();
3171 let serialized = serde_json::to_string(&config)?;
3172 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3173
3174 assert_eq!(config.peer_id, deserialized.peer_id);
3175 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3176 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3177
3178 Ok(())
3179 }
3180
3181 #[tokio::test]
3182 async fn test_get_peer_id_by_address_found() -> Result<()> {
3183 let config = create_test_node_config();
3184 let node = P2PNode::new(config).await?;
3185
3186 let test_peer_id = "peer_test_123".to_string();
3188 let test_address = "192.168.1.100:9000".to_string();
3189
3190 let peer_info = PeerInfo {
3191 peer_id: test_peer_id.clone(),
3192 addresses: vec![test_address.clone()],
3193 connected_at: Instant::now(),
3194 last_seen: Instant::now(),
3195 status: ConnectionStatus::Connected,
3196 protocols: vec!["test-protocol".to_string()],
3197 heartbeat_count: 0,
3198 };
3199
3200 node.peers
3201 .write()
3202 .await
3203 .insert(test_peer_id.clone(), peer_info);
3204
3205 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3207 assert_eq!(found_peer_id, Some(test_peer_id));
3208
3209 Ok(())
3210 }
3211
3212 #[tokio::test]
3213 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3214 let config = create_test_node_config();
3215 let node = P2PNode::new(config).await?;
3216
3217 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3219 assert_eq!(result, None);
3220
3221 Ok(())
3222 }
3223
3224 #[tokio::test]
3225 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3226 let config = create_test_node_config();
3227 let node = P2PNode::new(config).await?;
3228
3229 let result = node.get_peer_id_by_address("invalid-address").await;
3231 assert_eq!(result, None);
3232
3233 Ok(())
3234 }
3235
3236 #[tokio::test]
3237 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3238 let config = create_test_node_config();
3239 let node = P2PNode::new(config).await?;
3240
3241 let peer1_id = "peer_1".to_string();
3243 let peer1_addr = "192.168.1.101:9001".to_string();
3244
3245 let peer2_id = "peer_2".to_string();
3246 let peer2_addr = "192.168.1.102:9002".to_string();
3247
3248 let peer1_info = PeerInfo {
3249 peer_id: peer1_id.clone(),
3250 addresses: vec![peer1_addr.clone()],
3251 connected_at: Instant::now(),
3252 last_seen: Instant::now(),
3253 status: ConnectionStatus::Connected,
3254 protocols: vec!["test-protocol".to_string()],
3255 heartbeat_count: 0,
3256 };
3257
3258 let peer2_info = PeerInfo {
3259 peer_id: peer2_id.clone(),
3260 addresses: vec![peer2_addr.clone()],
3261 connected_at: Instant::now(),
3262 last_seen: Instant::now(),
3263 status: ConnectionStatus::Connected,
3264 protocols: vec!["test-protocol".to_string()],
3265 heartbeat_count: 0,
3266 };
3267
3268 node.peers
3269 .write()
3270 .await
3271 .insert(peer1_id.clone(), peer1_info);
3272 node.peers
3273 .write()
3274 .await
3275 .insert(peer2_id.clone(), peer2_info);
3276
3277 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3279 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3280
3281 assert_eq!(found_peer1, Some(peer1_id));
3282 assert_eq!(found_peer2, Some(peer2_id));
3283
3284 Ok(())
3285 }
3286
3287 #[tokio::test]
3288 async fn test_list_active_connections_empty() -> Result<()> {
3289 let config = create_test_node_config();
3290 let node = P2PNode::new(config).await?;
3291
3292 let connections = node.list_active_connections().await;
3294 assert!(connections.is_empty());
3295
3296 Ok(())
3297 }
3298
3299 #[tokio::test]
3300 async fn test_list_active_connections_with_peers() -> Result<()> {
3301 let config = create_test_node_config();
3302 let node = P2PNode::new(config).await?;
3303
3304 let peer1_id = "peer_1".to_string();
3306 let peer1_addrs = vec![
3307 "192.168.1.101:9001".to_string(),
3308 "192.168.1.101:9002".to_string(),
3309 ];
3310
3311 let peer2_id = "peer_2".to_string();
3312 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3313
3314 let peer1_info = PeerInfo {
3315 peer_id: peer1_id.clone(),
3316 addresses: peer1_addrs.clone(),
3317 connected_at: Instant::now(),
3318 last_seen: Instant::now(),
3319 status: ConnectionStatus::Connected,
3320 protocols: vec!["test-protocol".to_string()],
3321 heartbeat_count: 0,
3322 };
3323
3324 let peer2_info = PeerInfo {
3325 peer_id: peer2_id.clone(),
3326 addresses: peer2_addrs.clone(),
3327 connected_at: Instant::now(),
3328 last_seen: Instant::now(),
3329 status: ConnectionStatus::Connected,
3330 protocols: vec!["test-protocol".to_string()],
3331 heartbeat_count: 0,
3332 };
3333
3334 node.peers
3335 .write()
3336 .await
3337 .insert(peer1_id.clone(), peer1_info);
3338 node.peers
3339 .write()
3340 .await
3341 .insert(peer2_id.clone(), peer2_info);
3342
3343 let connections = node.list_active_connections().await;
3345 assert_eq!(connections.len(), 2);
3346
3347 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3349 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3350
3351 assert!(peer1_conn.is_some());
3352 assert!(peer2_conn.is_some());
3353
3354 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3356 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3357
3358 Ok(())
3359 }
3360
3361 #[tokio::test]
3362 async fn test_remove_peer_success() -> Result<()> {
3363 let config = create_test_node_config();
3364 let node = P2PNode::new(config).await?;
3365
3366 let peer_id = "peer_to_remove".to_string();
3368 let peer_info = PeerInfo {
3369 peer_id: peer_id.clone(),
3370 addresses: vec!["192.168.1.100:9000".to_string()],
3371 connected_at: Instant::now(),
3372 last_seen: Instant::now(),
3373 status: ConnectionStatus::Connected,
3374 protocols: vec!["test-protocol".to_string()],
3375 heartbeat_count: 0,
3376 };
3377
3378 node.peers.write().await.insert(peer_id.clone(), peer_info);
3379
3380 assert!(node.is_peer_connected(&peer_id).await);
3382
3383 let removed = node.remove_peer(&peer_id).await;
3385 assert!(removed);
3386
3387 assert!(!node.is_peer_connected(&peer_id).await);
3389
3390 Ok(())
3391 }
3392
3393 #[tokio::test]
3394 async fn test_remove_peer_nonexistent() -> Result<()> {
3395 let config = create_test_node_config();
3396 let node = P2PNode::new(config).await?;
3397
3398 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3400 assert!(!removed);
3401
3402 Ok(())
3403 }
3404
3405 #[tokio::test]
3406 async fn test_is_peer_connected() -> Result<()> {
3407 let config = create_test_node_config();
3408 let node = P2PNode::new(config).await?;
3409
3410 let peer_id = "test_peer".to_string();
3411
3412 assert!(!node.is_peer_connected(&peer_id).await);
3414
3415 let peer_info = PeerInfo {
3417 peer_id: peer_id.clone(),
3418 addresses: vec!["192.168.1.100:9000".to_string()],
3419 connected_at: Instant::now(),
3420 last_seen: Instant::now(),
3421 status: ConnectionStatus::Connected,
3422 protocols: vec!["test-protocol".to_string()],
3423 heartbeat_count: 0,
3424 };
3425
3426 node.peers.write().await.insert(peer_id.clone(), peer_info);
3427
3428 assert!(node.is_peer_connected(&peer_id).await);
3430
3431 node.remove_peer(&peer_id).await;
3433
3434 assert!(!node.is_peer_connected(&peer_id).await);
3436
3437 Ok(())
3438 }
3439
/// The IPv6 unspecified address (`::`) is rewritten to the IPv6 loopback
/// address and the port is kept.
#[test]
fn test_normalize_ipv6_wildcard() {
    use std::net::{IpAddr, Ipv6Addr, SocketAddr};

    let unspecified_v6 = SocketAddr::from((Ipv6Addr::UNSPECIFIED, 8080));
    let result = normalize_wildcard_to_loopback(unspecified_v6);

    assert_eq!(result.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
    assert_eq!(result.port(), 8080);
}
3450
/// The IPv4 unspecified address (`0.0.0.0`) is rewritten to the IPv4
/// loopback address and the port is kept.
#[test]
fn test_normalize_ipv4_wildcard() {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    let unspecified_v4 = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 9000));
    let result = normalize_wildcard_to_loopback(unspecified_v4);

    assert_eq!(result.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
    assert_eq!(result.port(), 9000);
}
3461
/// A concrete (non-wildcard) address passes through normalization untouched.
#[test]
fn test_normalize_specific_address_unchanged() {
    let concrete = "192.168.1.100:3000"
        .parse::<std::net::SocketAddr>()
        .unwrap();

    assert_eq!(normalize_wildcard_to_loopback(concrete), concrete);
}
3469
/// Loopback addresses (IPv6 first, then IPv4) pass through normalization
/// untouched.
#[test]
fn test_normalize_loopback_unchanged() {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let loopbacks = [
        SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000),
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000),
    ];

    for loopback in loopbacks.iter().copied() {
        assert_eq!(normalize_wildcard_to_loopback(loopback), loopback);
    }
}
3482}