1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, error, info, trace, warn};
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47 pub peer_id: Option<PeerId>,
49
50 pub listen_addrs: Vec<std::net::SocketAddr>,
52
53 pub listen_addr: std::net::SocketAddr,
55
56 pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59 pub bootstrap_peers_str: Vec<String>,
61
62 pub enable_ipv6: bool,
64
65 pub connection_timeout: Duration,
68
69 pub keep_alive_interval: Duration,
71
72 pub max_connections: usize,
74
75 pub max_incoming_connections: usize,
77
78 pub dht_config: DHTConfig,
80
81 pub security_config: SecurityConfig,
83
84 pub production_config: Option<ProductionConfig>,
86
87 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
89
90 pub diversity_config: Option<crate::security::IPDiversityConfig>,
95
96 #[serde(default)]
101 pub attestation_config: crate::attestation::AttestationConfig,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct DHTConfig {
107 pub k_value: usize,
109
110 pub alpha_value: usize,
112
113 pub record_ttl: Duration,
115
116 pub refresh_interval: Duration,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SecurityConfig {
123 pub enable_noise: bool,
125
126 pub enable_tls: bool,
128
129 pub trust_level: TrustLevel,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
135pub enum TrustLevel {
136 None,
138 Basic,
140 Full,
142}
143
144impl NodeConfig {
145 pub fn new() -> Result<Self> {
151 let config = Config::default();
153
154 let listen_addr = config.listen_socket_addr()?;
156
157 let mut listen_addrs = vec![];
159
160 if config.network.ipv6_enabled {
162 let ipv6_addr = std::net::SocketAddr::new(
163 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
164 listen_addr.port(),
165 );
166 listen_addrs.push(ipv6_addr);
167 }
168
169 let ipv4_addr = std::net::SocketAddr::new(
171 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
172 listen_addr.port(),
173 );
174 listen_addrs.push(ipv4_addr);
175
176 Ok(Self {
177 peer_id: None,
178 listen_addrs,
179 listen_addr,
180 bootstrap_peers: Vec::new(),
181 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
182 enable_ipv6: config.network.ipv6_enabled,
183
184 connection_timeout: Duration::from_secs(config.network.connection_timeout),
185 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
186 max_connections: config.network.max_connections,
187 max_incoming_connections: config.security.connection_limit as usize,
188 dht_config: DHTConfig::default(),
189 security_config: SecurityConfig::default(),
190 production_config: None,
191 bootstrap_cache_config: None,
192 diversity_config: None,
193 attestation_config: config.attestation.clone(),
194 })
195 }
196}
197
198impl Default for NodeConfig {
199 fn default() -> Self {
200 let config = Config::default();
202
203 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
205 std::net::SocketAddr::new(
206 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
207 9000,
208 )
209 });
210
211 Self {
212 peer_id: None,
213 listen_addrs: vec![
214 std::net::SocketAddr::new(
215 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
216 listen_addr.port(),
217 ),
218 std::net::SocketAddr::new(
219 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
220 listen_addr.port(),
221 ),
222 ],
223 listen_addr,
224 bootstrap_peers: Vec::new(),
225 bootstrap_peers_str: Vec::new(),
226 enable_ipv6: config.network.ipv6_enabled,
227
228 connection_timeout: Duration::from_secs(config.network.connection_timeout),
229 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
230 max_connections: config.network.max_connections,
231 max_incoming_connections: config.security.connection_limit as usize,
232 dht_config: DHTConfig::default(),
233 security_config: SecurityConfig::default(),
234 production_config: None, bootstrap_cache_config: None,
236 diversity_config: None,
237 attestation_config: config.attestation.clone(),
238 }
239 }
240}
241
242impl NodeConfig {
243 pub fn from_config(config: &Config) -> Result<Self> {
245 let listen_addr = config.listen_socket_addr()?;
246 let bootstrap_addrs = config.bootstrap_addrs()?;
247
248 let mut node_config = Self {
249 peer_id: None,
250 listen_addrs: vec![listen_addr],
251 listen_addr,
252 bootstrap_peers: bootstrap_addrs
253 .iter()
254 .map(|addr| addr.socket_addr())
255 .collect(),
256 bootstrap_peers_str: config
257 .network
258 .bootstrap_nodes
259 .iter()
260 .map(|addr| addr.to_string())
261 .collect(),
262 enable_ipv6: config.network.ipv6_enabled,
263
264 connection_timeout: Duration::from_secs(config.network.connection_timeout),
265 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266 max_connections: config.network.max_connections,
267 max_incoming_connections: config.security.connection_limit as usize,
268 dht_config: DHTConfig {
269 k_value: 20,
270 alpha_value: 3,
271 record_ttl: Duration::from_secs(3600),
272 refresh_interval: Duration::from_secs(900),
273 },
274 security_config: SecurityConfig {
275 enable_noise: true,
276 enable_tls: true,
277 trust_level: TrustLevel::Basic,
278 },
279 production_config: Some(ProductionConfig {
280 max_connections: config.network.max_connections,
281 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
284 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
285 health_check_interval: Duration::from_secs(30),
286 metrics_interval: Duration::from_secs(60),
287 enable_performance_tracking: true,
288 enable_auto_cleanup: true,
289 shutdown_timeout: Duration::from_secs(30),
290 rate_limits: crate::production::RateLimitConfig::default(),
291 }),
292 bootstrap_cache_config: None,
293 diversity_config: None,
294 attestation_config: config.attestation.clone(),
295 };
296
297 if config.network.ipv6_enabled {
299 node_config.listen_addrs.push(std::net::SocketAddr::new(
300 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
301 listen_addr.port(),
302 ));
303 }
304
305 Ok(node_config)
306 }
307
308 pub fn with_listen_addr(addr: &str) -> Result<Self> {
310 let listen_addr: std::net::SocketAddr = addr
311 .parse()
312 .map_err(|e: std::net::AddrParseError| {
313 NetworkError::InvalidAddress(e.to_string().into())
314 })
315 .map_err(P2PError::Network)?;
316 let cfg = NodeConfig {
317 listen_addr,
318 listen_addrs: vec![listen_addr],
319 diversity_config: None,
320 ..Default::default()
321 };
322 Ok(cfg)
323 }
324}
325
326impl Default for DHTConfig {
327 fn default() -> Self {
328 Self {
329 k_value: 20,
330 alpha_value: 5,
331 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
334 }
335}
336
337impl Default for SecurityConfig {
338 fn default() -> Self {
339 Self {
340 enable_noise: true,
341 enable_tls: true,
342 trust_level: TrustLevel::Basic,
343 }
344 }
345}
346
347#[derive(Debug, Clone)]
349pub struct PeerInfo {
350 pub peer_id: PeerId,
352
353 pub addresses: Vec<String>,
355
356 pub connected_at: Instant,
358
359 pub last_seen: Instant,
361
362 pub status: ConnectionStatus,
364
365 pub protocols: Vec<String>,
367
368 pub heartbeat_count: u64,
370}
371
372#[derive(Debug, Clone, PartialEq)]
374pub enum ConnectionStatus {
375 Connecting,
377 Connected,
379 Disconnecting,
381 Disconnected,
383 Failed(String),
385}
386
387#[derive(Debug, Clone)]
389pub enum NetworkEvent {
390 PeerConnected {
392 peer_id: PeerId,
394 addresses: Vec<String>,
396 },
397
398 PeerDisconnected {
400 peer_id: PeerId,
402 reason: String,
404 },
405
406 MessageReceived {
408 peer_id: PeerId,
410 protocol: String,
412 data: Vec<u8>,
414 },
415
416 ConnectionFailed {
418 peer_id: Option<PeerId>,
420 address: String,
422 error: String,
424 },
425
426 DHTRecordStored {
428 key: Vec<u8>,
430 value: Vec<u8>,
432 },
433
434 DHTRecordRetrieved {
436 key: Vec<u8>,
438 value: Option<Vec<u8>>,
440 },
441}
442
443#[derive(Debug, Clone)]
448pub enum P2PEvent {
449 Message {
451 topic: String,
453 source: PeerId,
455 data: Vec<u8>,
457 },
458 PeerConnected(PeerId),
460 PeerDisconnected(PeerId),
462}
463
464pub struct P2PNode {
474 config: NodeConfig,
476
477 peer_id: PeerId,
479
480 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
482
483 event_tx: broadcast::Sender<P2PEvent>,
485
486 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
488
489 start_time: Instant,
491
492 running: RwLock<bool>,
494
495 dht: Option<Arc<RwLock<DHT>>>,
497
498 resource_manager: Option<Arc<ResourceManager>>,
500
501 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
503
504 dual_node: Arc<DualStackNetworkNode>,
506
507 #[allow(dead_code)]
509 rate_limiter: Arc<RateLimiter>,
510
511 active_connections: Arc<RwLock<HashSet<PeerId>>>,
514
515 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
517
518 #[allow(dead_code)]
520 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
521
522 #[allow(dead_code)]
524 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
525
526 #[allow(dead_code)]
528 shutdown: Arc<AtomicBool>,
529
530 #[allow(dead_code)]
532 geo_provider: Arc<BgpGeoProvider>,
533
534 entangled_id: Option<crate::attestation::EntangledId>,
537
538 binary_hash: [u8; 32],
541}
542
543fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
559 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
560
561 if addr.ip().is_unspecified() {
562 let loopback_ip = match addr {
564 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
567 std::net::SocketAddr::new(loopback_ip, addr.port())
568 } else {
569 addr
571 }
572}
573
574impl P2PNode {
575 pub fn new_for_tests() -> Result<Self> {
577 let (event_tx, _) = broadcast::channel(16);
578 Ok(Self {
579 config: NodeConfig::default(),
580 peer_id: "test_peer".to_string(),
581 peers: Arc::new(RwLock::new(HashMap::new())),
582 event_tx,
583 listen_addrs: RwLock::new(Vec::new()),
584 start_time: Instant::now(),
585 running: RwLock::new(false),
586 dht: None,
587 resource_manager: None,
588 bootstrap_manager: None,
589 dual_node: {
590 let v6: Option<std::net::SocketAddr> = "[::1]:0"
592 .parse()
593 .ok()
594 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
595 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
596 let handle = tokio::runtime::Handle::current();
597 let dual_attempt = handle.block_on(
598 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
599 );
600 let dual = match dual_attempt {
601 Ok(d) => d,
602 Err(_e1) => {
603 let fallback = handle.block_on(
605 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
606 None,
607 "127.0.0.1:0".parse().ok(),
608 ),
609 );
610 match fallback {
611 Ok(d) => d,
612 Err(e2) => {
613 return Err(P2PError::Network(NetworkError::BindError(
614 format!("Failed to create dual-stack network node: {}", e2)
615 .into(),
616 )));
617 }
618 }
619 }
620 };
621 Arc::new(dual)
622 },
623 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
624 max_requests: 100,
625 burst_size: 100,
626 window: std::time::Duration::from_secs(1),
627 ..Default::default()
628 })),
629 active_connections: Arc::new(RwLock::new(HashSet::new())),
630 connection_monitor_handle: Arc::new(RwLock::new(None)),
631 keepalive_handle: Arc::new(RwLock::new(None)),
632 shutdown: Arc::new(AtomicBool::new(false)),
633 geo_provider: Arc::new(BgpGeoProvider::new()),
634 security_dashboard: None,
635 entangled_id: None,
637 binary_hash: [0u8; 32],
638 })
639 }
640 pub async fn new(config: NodeConfig) -> Result<Self> {
642 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
643 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
645 });
646
647 let (event_tx, _) = broadcast::channel(1000);
648
649 {
652 use blake3::Hasher;
653 let mut hasher = Hasher::new();
654 hasher.update(peer_id.as_bytes());
655 let digest = hasher.finalize();
656 let mut nid = [0u8; 32];
657 nid.copy_from_slice(digest.as_bytes());
658 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
659 crate::identity::node_identity::NodeId::from_bytes(nid),
660 ));
661 }
664
665 let (dht, security_dashboard) = if true {
667 let _dht_config = crate::dht::DHTConfig {
669 replication_factor: config.dht_config.k_value,
670 bucket_size: config.dht_config.k_value,
671 alpha: config.dht_config.alpha_value,
672 record_ttl: config.dht_config.record_ttl,
673 bucket_refresh_interval: config.dht_config.refresh_interval,
674 republish_interval: config.dht_config.refresh_interval,
675 max_distance: 160,
676 };
677 let peer_bytes = peer_id.as_bytes();
679 let mut node_id_bytes = [0u8; 32];
680 let len = peer_bytes.len().min(32);
681 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
682 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
683 let dht_instance = DHT::new(node_id).map_err(|e| {
684 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
685 e.to_string().into(),
686 ))
687 })?;
688 dht_instance.start_maintenance_tasks();
689
690 let security_metrics = dht_instance.security_metrics();
692 let dashboard = crate::dht::metrics::SecurityDashboard::new(
693 security_metrics,
694 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
695 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
696 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
697 );
698
699 (
700 Some(Arc::new(RwLock::new(dht_instance))),
701 Some(Arc::new(dashboard)),
702 )
703 } else {
704 (None, None)
705 };
706
707 let resource_manager = config
711 .production_config
712 .clone()
713 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
714
715 let diversity_config = config.diversity_config.clone().unwrap_or_default();
717 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
718 match BootstrapManager::with_full_config(
719 cache_config.clone(),
720 crate::rate_limit::JoinRateLimiterConfig::default(),
721 diversity_config.clone(),
722 )
723 .await
724 {
725 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
726 Err(e) => {
727 warn!(
728 "Failed to initialize bootstrap manager: {}, continuing without cache",
729 e
730 );
731 None
732 }
733 }
734 } else {
735 match BootstrapManager::with_full_config(
736 crate::bootstrap::CacheConfig::default(),
737 crate::rate_limit::JoinRateLimiterConfig::default(),
738 diversity_config,
739 )
740 .await
741 {
742 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
743 Err(e) => {
744 warn!(
745 "Failed to initialize bootstrap manager: {}, continuing without cache",
746 e
747 );
748 None
749 }
750 }
751 };
752
753 let (v6_opt, v4_opt) = {
756 let port = config.listen_addr.port();
757 let ip = config.listen_addr.ip();
758
759 let v4_addr = if ip.is_ipv4() {
760 Some(std::net::SocketAddr::new(ip, port))
761 } else {
762 Some(std::net::SocketAddr::new(
765 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
766 port,
767 ))
768 };
769
770 let v6_addr = if config.enable_ipv6 {
771 if ip.is_ipv6() {
772 Some(std::net::SocketAddr::new(ip, port))
773 } else {
774 Some(std::net::SocketAddr::new(
775 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
776 port,
777 ))
778 }
779 } else {
780 None
781 };
782 (v6_addr, v4_addr)
783 };
784
785 let dual_node = Arc::new(
786 DualStackNetworkNode::new(v6_opt, v4_opt)
787 .await
788 .map_err(|e| {
789 P2PError::Transport(crate::error::TransportError::SetupFailed(
790 format!("Failed to create dual-stack network nodes: {}", e).into(),
791 ))
792 })?,
793 );
794
795 let rate_limiter = Arc::new(RateLimiter::new(
797 crate::validation::RateLimitConfig::default(),
798 ));
799
800 let active_connections = Arc::new(RwLock::new(HashSet::new()));
802
803 let geo_provider = Arc::new(BgpGeoProvider::new());
805
806 let peers = Arc::new(RwLock::new(HashMap::new()));
808
809 let connection_monitor_handle = {
811 let active_conns = Arc::clone(&active_connections);
812 let peers_map = Arc::clone(&peers);
813 let event_tx_clone = event_tx.clone();
814 let dual_node_clone = Arc::clone(&dual_node);
815 let geo_provider_clone = Arc::clone(&geo_provider);
816 let peer_id_clone = peer_id.clone();
817
818 let handle = tokio::spawn(async move {
819 Self::connection_lifecycle_monitor(
820 dual_node_clone,
821 active_conns,
822 peers_map,
823 event_tx_clone,
824 geo_provider_clone,
825 peer_id_clone,
826 )
827 .await;
828 });
829
830 Arc::new(RwLock::new(Some(handle)))
831 };
832
833 let shutdown = Arc::new(AtomicBool::new(false));
835 let keepalive_handle = {
836 let active_conns = Arc::clone(&active_connections);
837 let dual_node_clone = Arc::clone(&dual_node);
838 let shutdown_clone = Arc::clone(&shutdown);
839
840 let handle = tokio::spawn(async move {
841 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
842 });
843
844 Arc::new(RwLock::new(Some(handle)))
845 };
846
847 let binary_hash = Self::compute_binary_hash();
850
851 let node = Self {
852 config,
853 peer_id,
854 peers,
855 event_tx,
856 listen_addrs: RwLock::new(Vec::new()),
857 start_time: Instant::now(),
858 running: RwLock::new(false),
859 dht,
860 resource_manager,
861 bootstrap_manager,
862 dual_node,
863 rate_limiter,
864 active_connections,
865 security_dashboard,
866 connection_monitor_handle,
867 keepalive_handle,
868 shutdown,
869 geo_provider,
870 entangled_id: None,
872 binary_hash,
873 };
874 info!("Created P2P node with peer ID: {}", node.peer_id);
875
876 node.start_network_listeners().await?;
878
879 node.start_connection_monitor().await;
881
882 Ok(node)
883 }
884
885 pub fn builder() -> NodeBuilder {
887 NodeBuilder::new()
888 }
889
890 pub fn peer_id(&self) -> &PeerId {
892 &self.peer_id
893 }
894
895 pub fn local_addr(&self) -> Option<String> {
896 self.listen_addrs
897 .try_read()
898 .ok()
899 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
900 }
901
902 pub async fn subscribe(&self, topic: &str) -> Result<()> {
903 info!("Subscribed to topic: {}", topic);
906 Ok(())
907 }
908
909 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
910 info!(
911 "Publishing message to topic: {} ({} bytes)",
912 topic,
913 data.len()
914 );
915
916 let peer_list: Vec<PeerId> = {
918 let peers_guard = self.peers.read().await;
919 peers_guard.keys().cloned().collect()
920 };
921
922 if peer_list.is_empty() {
923 debug!("No peers connected, message will only be sent to local subscribers");
924 } else {
925 let mut send_count = 0;
927 for peer_id in &peer_list {
928 match self.send_message(peer_id, topic, data.to_vec()).await {
929 Ok(_) => {
930 send_count += 1;
931 debug!("Sent message to peer: {}", peer_id);
932 }
933 Err(e) => {
934 warn!("Failed to send message to peer {}: {}", peer_id, e);
935 }
936 }
937 }
938 info!(
939 "Published message to {}/{} connected peers",
940 send_count,
941 peer_list.len()
942 );
943 }
944
945 let event = P2PEvent::Message {
947 topic: topic.to_string(),
948 source: self.peer_id.clone(),
949 data: data.to_vec(),
950 };
951 let _ = self.event_tx.send(event);
952
953 Ok(())
954 }
955
956 pub fn config(&self) -> &NodeConfig {
958 &self.config
959 }
960
961 pub async fn start(&self) -> Result<()> {
963 info!("Starting P2P node...");
964
965 if let Some(ref resource_manager) = self.resource_manager {
967 resource_manager.start().await.map_err(|e| {
968 P2PError::Network(crate::error::NetworkError::ProtocolError(
969 format!("Failed to start resource manager: {e}").into(),
970 ))
971 })?;
972 info!("Production resource manager started");
973 }
974
975 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
977 let mut manager = bootstrap_manager.write().await;
978 manager.start_background_tasks().await.map_err(|e| {
979 P2PError::Network(crate::error::NetworkError::ProtocolError(
980 format!("Failed to start bootstrap manager: {e}").into(),
981 ))
982 })?;
983 info!("Bootstrap cache manager started");
984 }
985
986 *self.running.write().await = true;
988
989 self.start_network_listeners().await?;
991
992 let listen_addrs = self.listen_addrs.read().await;
994 info!("P2P node started on addresses: {:?}", *listen_addrs);
995
996 self.start_message_receiving_system().await?;
1000
1001 self.connect_bootstrap_peers().await?;
1003
1004 Ok(())
1005 }
1006
1007 async fn start_network_listeners(&self) -> Result<()> {
1009 info!("Starting dual-stack listeners (ant-quic)...");
1010 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1012 P2PError::Transport(crate::error::TransportError::SetupFailed(
1013 format!("Failed to get local addresses: {}", e).into(),
1014 ))
1015 })?;
1016 {
1017 let mut la = self.listen_addrs.write().await;
1018 *la = addrs.clone();
1019 }
1020
1021 let event_tx = self.event_tx.clone();
1023 let peers = self.peers.clone();
1024 let active_connections = self.active_connections.clone();
1025 let rate_limiter = self.rate_limiter.clone();
1026 let dual = self.dual_node.clone();
1027 tokio::spawn(async move {
1028 loop {
1029 match dual.accept_any().await {
1030 Ok((ant_peer_id, remote_sock)) => {
1031 let peer_id =
1032 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1033 let remote_addr = NetworkAddress::from(remote_sock);
1034 let _ = rate_limiter.check_ip(&remote_sock.ip());
1036 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1037 register_new_peer(&peers, &peer_id, &remote_addr).await;
1038 active_connections.write().await.insert(peer_id);
1039 }
1040 Err(e) => {
1041 warn!("Accept failed: {}", e);
1042 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1043 }
1044 }
1045 }
1046 });
1047
1048 info!("Dual-stack listeners active on: {:?}", addrs);
1049 Ok(())
1050 }
1051
1052 #[allow(dead_code)]
1054 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1055 warn!("QUIC transport temporarily disabled during ant-quic migration");
1094 Err(crate::P2PError::Transport(
1096 crate::error::TransportError::SetupFailed(
1097 format!(
1098 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1099 )
1100 .into(),
1101 ),
1102 ))
1103 }
1104
1105 #[allow(dead_code)] async fn start_connection_acceptor(
1108 &self,
1109 transport: Arc<dyn crate::transport::Transport>,
1110 addr: std::net::SocketAddr,
1111 transport_type: crate::transport::TransportType,
1112 ) -> Result<()> {
1113 info!(
1114 "Starting connection acceptor for {:?} on {}",
1115 transport_type, addr
1116 );
1117
1118 let event_tx = self.event_tx.clone();
1120 let _peer_id = self.peer_id.clone();
1121 let peers = Arc::clone(&self.peers);
1122 let rate_limiter = Arc::clone(&self.rate_limiter);
1125
1126 tokio::spawn(async move {
1128 loop {
1129 match transport.accept().await {
1130 Ok(connection) => {
1131 let remote_addr = connection.remote_addr();
1132 let connection_peer_id =
1133 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1134
1135 let socket_addr = remote_addr.socket_addr();
1137 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1138 continue;
1140 }
1141
1142 info!(
1143 "Accepted {:?} connection from {} (peer: {})",
1144 transport_type, remote_addr, connection_peer_id
1145 );
1146
1147 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1149
1150 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1152
1153 spawn_connection_handler(
1155 connection,
1156 connection_peer_id,
1157 event_tx.clone(),
1158 Arc::clone(&peers),
1159 );
1160 }
1161 Err(e) => {
1162 warn!(
1163 "Failed to accept {:?} connection on {}: {}",
1164 transport_type, addr, e
1165 );
1166
1167 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1169 }
1170 }
1171 }
1172 });
1173
1174 info!(
1175 "Connection acceptor background task started for {:?} on {}",
1176 transport_type, addr
1177 );
1178 Ok(())
1179 }
1180
1181 async fn start_message_receiving_system(&self) -> Result<()> {
1183 info!("Starting message receiving system");
1184 let dual = self.dual_node.clone();
1185 let event_tx = self.event_tx.clone();
1186
1187 tokio::spawn(async move {
1188 loop {
1189 match dual.receive_any().await {
1190 Ok((_peer_id, bytes)) => {
1191 #[allow(clippy::collapsible_if)]
1193 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1194 if let (Some(protocol), Some(data), Some(from)) = (
1195 value.get("protocol").and_then(|v| v.as_str()),
1196 value.get("data").and_then(|v| v.as_array()),
1197 value.get("from").and_then(|v| v.as_str()),
1198 ) {
1199 let payload: Vec<u8> = data
1200 .iter()
1201 .filter_map(|v| v.as_u64().map(|n| n as u8))
1202 .collect();
1203 let _ = event_tx.send(P2PEvent::Message {
1204 topic: protocol.to_string(),
1205 source: from.to_string(),
1206 data: payload,
1207 });
1208 }
1209 }
1210 }
1211 Err(e) => {
1212 warn!("Receive error: {}", e);
1213 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1214 }
1215 }
1216 }
1217 });
1218
1219 Ok(())
1220 }
1221
1222 #[allow(dead_code)]
1224 async fn handle_received_message(
1225 &self,
1226 message_data: Vec<u8>,
1227 peer_id: &PeerId,
1228 _protocol: &str,
1229 event_tx: &broadcast::Sender<P2PEvent>,
1230 ) -> Result<()> {
1231 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1235 Ok(message) => {
1236 if let (Some(protocol), Some(data), Some(from)) = (
1237 message.get("protocol").and_then(|v| v.as_str()),
1238 message.get("data").and_then(|v| v.as_array()),
1239 message.get("from").and_then(|v| v.as_str()),
1240 ) {
1241 let data_bytes: Vec<u8> = data
1243 .iter()
1244 .filter_map(|v| v.as_u64().map(|n| n as u8))
1245 .collect();
1246
1247 let event = P2PEvent::Message {
1249 topic: protocol.to_string(),
1250 source: from.to_string(),
1251 data: data_bytes,
1252 };
1253
1254 let _ = event_tx.send(event);
1255 debug!("Generated message event from peer: {}", peer_id);
1256 }
1257 }
1258 Err(e) => {
1259 warn!("Failed to parse received message from {}: {}", peer_id, e);
1260 }
1261 }
1262
1263 Ok(())
1264 }
1265
1266 pub async fn run(&self) -> Result<()> {
1272 if !*self.running.read().await {
1273 self.start().await?;
1274 }
1275
1276 info!("P2P node running...");
1277
1278 loop {
1280 if !*self.running.read().await {
1281 break;
1282 }
1283
1284 self.periodic_tasks().await?;
1286
1287 tokio::time::sleep(Duration::from_millis(100)).await;
1289 }
1290
1291 info!("P2P node stopped");
1292 Ok(())
1293 }
1294
1295 pub async fn stop(&self) -> Result<()> {
1297 info!("Stopping P2P node...");
1298
1299 *self.running.write().await = false;
1301
1302 self.disconnect_all_peers().await?;
1304
1305 if let Some(ref resource_manager) = self.resource_manager {
1307 resource_manager.shutdown().await.map_err(|e| {
1308 P2PError::Network(crate::error::NetworkError::ProtocolError(
1309 format!("Failed to shutdown resource manager: {e}").into(),
1310 ))
1311 })?;
1312 info!("Production resource manager stopped");
1313 }
1314
1315 info!("P2P node stopped");
1316 Ok(())
1317 }
1318
1319 pub async fn shutdown(&self) -> Result<()> {
1321 self.stop().await
1322 }
1323
1324 pub async fn is_running(&self) -> bool {
1326 *self.running.read().await
1327 }
1328
1329 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1331 self.listen_addrs.read().await.clone()
1332 }
1333
1334 pub async fn connected_peers(&self) -> Vec<PeerId> {
1336 self.active_connections
1339 .read()
1340 .await
1341 .iter()
1342 .cloned()
1343 .collect()
1344 }
1345
1346 pub async fn peer_count(&self) -> usize {
1348 self.active_connections.read().await.len()
1349 }
1350
1351 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1353 self.peers.read().await.get(peer_id).cloned()
1354 }
1355
1356 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1368 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1370
1371 let peers = self.peers.read().await;
1372
1373 for (peer_id, peer_info) in peers.iter() {
1375 for peer_addr in &peer_info.addresses {
1377 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1378 && peer_socket == socket_addr
1379 {
1380 return Some(peer_id.clone());
1381 }
1382 }
1383 }
1384
1385 None
1386 }
1387
1388 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1394 let active = self.active_connections.read().await;
1395 let peers = self.peers.read().await;
1396
1397 active
1398 .iter()
1399 .map(|peer_id| {
1400 let addresses = peers
1401 .get(peer_id)
1402 .map(|info| info.addresses.clone())
1403 .unwrap_or_default();
1404 (peer_id.clone(), addresses)
1405 })
1406 .collect()
1407 }
1408
1409 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1421 self.active_connections.write().await.remove(peer_id);
1423 self.peers.write().await.remove(peer_id).is_some()
1425 }
1426
1427 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1440 self.peers.read().await.contains_key(peer_id)
1441 }
1442
1443 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1445 info!("Connecting to peer at: {}", address);
1446
1447 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1449 Some(resource_manager.acquire_connection().await?)
1450 } else {
1451 None
1452 };
1453
1454 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1456 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1457 format!("{}: {}", address, e).into(),
1458 ))
1459 })?;
1460
1461 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1464 if normalized_addr != socket_addr {
1465 info!(
1466 "Normalized wildcard address {} to loopback {}",
1467 socket_addr, normalized_addr
1468 );
1469 }
1470
1471 let addr_list = vec![normalized_addr];
1473 let peer_id = match tokio::time::timeout(
1474 self.config.connection_timeout,
1475 self.dual_node.connect_happy_eyeballs(&addr_list),
1476 )
1477 .await
1478 {
1479 Ok(Ok(peer)) => {
1480 let connected_peer_id =
1481 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1482 info!("Successfully connected to peer: {}", connected_peer_id);
1483 connected_peer_id
1484 }
1485 Ok(Err(e)) => {
1486 warn!("Failed to connect to peer at {}: {}", address, e);
1487 return Err(P2PError::Transport(
1488 crate::error::TransportError::ConnectionFailed {
1489 addr: normalized_addr,
1490 reason: e.to_string().into(),
1491 },
1492 ));
1493 }
1494 Err(_) => {
1495 warn!(
1496 "Timed out connecting to peer at {} after {:?}",
1497 address, self.config.connection_timeout
1498 );
1499 return Err(P2PError::Timeout(self.config.connection_timeout));
1500 }
1501 };
1502
1503 let peer_info = PeerInfo {
1505 peer_id: peer_id.clone(),
1506 addresses: vec![address.to_string()],
1507 connected_at: Instant::now(),
1508 last_seen: Instant::now(),
1509 status: ConnectionStatus::Connected,
1510 protocols: vec!["p2p-foundation/1.0".to_string()],
1511 heartbeat_count: 0,
1512 };
1513
1514 self.peers.write().await.insert(peer_id.clone(), peer_info);
1516
1517 self.active_connections
1520 .write()
1521 .await
1522 .insert(peer_id.clone());
1523
1524 if let Some(ref resource_manager) = self.resource_manager {
1526 resource_manager.record_bandwidth(0, 0); }
1528
1529 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1531
1532 info!("Connected to peer: {}", peer_id);
1533 Ok(peer_id)
1534 }
1535
1536 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1538 info!("Disconnecting from peer: {}", peer_id);
1539
1540 self.active_connections.write().await.remove(peer_id);
1542
1543 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1544 peer_info.status = ConnectionStatus::Disconnected;
1545
1546 let _ = self
1548 .event_tx
1549 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1550
1551 info!("Disconnected from peer: {}", peer_id);
1552 }
1553
1554 Ok(())
1555 }
1556
1557 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1559 self.active_connections.read().await.contains(peer_id)
1560 }
1561
1562 pub async fn send_message(
1564 &self,
1565 peer_id: &PeerId,
1566 protocol: &str,
1567 data: Vec<u8>,
1568 ) -> Result<()> {
1569 debug!(
1570 "Sending message to peer {} on protocol {}",
1571 peer_id, protocol
1572 );
1573
1574 if let Some(ref resource_manager) = self.resource_manager
1576 && !resource_manager
1577 .check_rate_limit(peer_id, "message")
1578 .await?
1579 {
1580 return Err(P2PError::ResourceExhausted(
1581 format!("Rate limit exceeded for peer {}", peer_id).into(),
1582 ));
1583 }
1584
1585 if !self.peers.read().await.contains_key(peer_id) {
1587 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1588 peer_id.to_string().into(),
1589 )));
1590 }
1591
1592 if !self.is_connection_active(peer_id).await {
1595 debug!(
1596 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1597 peer_id
1598 );
1599
1600 self.remove_peer(peer_id).await;
1602
1603 return Err(P2PError::Network(
1604 crate::error::NetworkError::ConnectionClosed {
1605 peer_id: peer_id.to_string().into(),
1606 },
1607 ));
1608 }
1609
1610 if let Some(ref resource_manager) = self.resource_manager {
1614 resource_manager.record_bandwidth(data.len() as u64, 0);
1615 }
1616
1617 let _message_data = self.create_protocol_message(protocol, data)?;
1619
1620 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1622 tokio::time::timeout(self.config.connection_timeout, send_fut)
1623 .await
1624 .map_err(|_| {
1625 P2PError::Transport(crate::error::TransportError::StreamError(
1626 "Timed out sending message".into(),
1627 ))
1628 })?
1629 .map_err(|e| {
1630 P2PError::Transport(crate::error::TransportError::StreamError(
1631 e.to_string().into(),
1632 ))
1633 })
1634 }
1635
1636 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1638 use serde_json::json;
1639
1640 let timestamp = std::time::SystemTime::now()
1641 .duration_since(std::time::UNIX_EPOCH)
1642 .map_err(|e| {
1643 P2PError::Network(NetworkError::ProtocolError(
1644 format!("System time error: {}", e).into(),
1645 ))
1646 })?
1647 .as_secs();
1648
1649 let message = json!({
1651 "protocol": protocol,
1652 "data": data,
1653 "from": self.peer_id,
1654 "timestamp": timestamp
1655 });
1656
1657 serde_json::to_vec(&message).map_err(|e| {
1658 P2PError::Transport(crate::error::TransportError::StreamError(
1659 format!("Failed to serialize message: {e}").into(),
1660 ))
1661 })
1662 }
1663
1664 }
1666
1667#[allow(dead_code)]
1669fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1670 use serde_json::json;
1671
1672 let timestamp = std::time::SystemTime::now()
1673 .duration_since(std::time::UNIX_EPOCH)
1674 .map_err(|e| {
1675 P2PError::Network(NetworkError::ProtocolError(
1676 format!("System time error: {}", e).into(),
1677 ))
1678 })?
1679 .as_secs();
1680
1681 let message = json!({
1683 "protocol": protocol,
1684 "data": data,
1685 "timestamp": timestamp
1686 });
1687
1688 serde_json::to_vec(&message).map_err(|e| {
1689 P2PError::Transport(crate::error::TransportError::StreamError(
1690 format!("Failed to serialize message: {e}").into(),
1691 ))
1692 })
1693}
1694
1695impl P2PNode {
1696 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1698 self.event_tx.subscribe()
1699 }
1700
1701 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1703 self.subscribe_events()
1704 }
1705
1706 pub fn uptime(&self) -> Duration {
1708 self.start_time.elapsed()
1709 }
1710
1711 fn compute_binary_hash() -> [u8; 32] {
1720 if let Some(hash) = std::env::current_exe()
1722 .ok()
1723 .and_then(|exe_path| std::fs::read(&exe_path).ok())
1724 .map(|binary_data| blake3::hash(&binary_data))
1725 {
1726 return *hash.as_bytes();
1727 }
1728 let placeholder = format!(
1731 "saorsa-core-v{}-{}",
1732 env!("CARGO_PKG_VERSION"),
1733 std::env::consts::ARCH
1734 );
1735 let hash = blake3::hash(placeholder.as_bytes());
1736 *hash.as_bytes()
1737 }
1738
1739 #[must_use]
1741 pub fn binary_hash(&self) -> &[u8; 32] {
1742 &self.binary_hash
1743 }
1744
1745 #[must_use]
1747 pub fn entangled_id(&self) -> Option<&crate::attestation::EntangledId> {
1748 self.entangled_id.as_ref()
1749 }
1750
1751 pub fn set_entangled_id(&mut self, entangled_id: crate::attestation::EntangledId) {
1756 self.entangled_id = Some(entangled_id);
1757 }
1758
1759 pub fn verify_peer_attestation(
1788 &self,
1789 peer_id: &str,
1790 peer_entangled_id: &crate::attestation::EntangledId,
1791 peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1792 ) -> crate::attestation::EnforcementDecision {
1793 use crate::attestation::{
1794 AttestationRejection, AttestationRejectionReason, EnforcementDecision, EnforcementMode,
1795 };
1796
1797 let config = &self.config.attestation_config;
1798
1799 if !config.enabled {
1801 return EnforcementDecision::Skipped;
1802 }
1803
1804 let id_valid = peer_entangled_id.verify(peer_public_key);
1806
1807 let binary_hash = *peer_entangled_id.binary_hash();
1809 let binary_allowed = config.is_binary_allowed(&binary_hash);
1810
1811 match config.enforcement_mode {
1812 EnforcementMode::Off => EnforcementDecision::Skipped,
1813
1814 EnforcementMode::Soft => {
1815 if !id_valid {
1817 warn!(
1818 peer = %peer_id,
1819 binary_hash = %hex::encode(&binary_hash[..8]),
1820 "Peer attestation verification failed: Invalid entangled ID (soft mode - allowing)"
1821 );
1822 return EnforcementDecision::AllowWithWarning {
1823 reason: AttestationRejectionReason::IdentityMismatch,
1824 };
1825 }
1826 if !binary_allowed {
1827 warn!(
1828 peer = %peer_id,
1829 binary_hash = %hex::encode(binary_hash),
1830 "Peer attestation verification failed: Binary not in allowlist (soft mode - allowing)"
1831 );
1832 return EnforcementDecision::AllowWithWarning {
1833 reason: AttestationRejectionReason::BinaryNotAllowed { hash: binary_hash },
1834 };
1835 }
1836 EnforcementDecision::Allow
1837 }
1838
1839 EnforcementMode::Hard => {
1840 if !id_valid {
1842 error!(
1843 peer = %peer_id,
1844 binary_hash = %hex::encode(&binary_hash[..8]),
1845 "REJECTING peer: Invalid entangled ID derivation"
1846 );
1847 return EnforcementDecision::Reject {
1848 rejection: AttestationRejection::identity_mismatch(),
1849 };
1850 }
1851 if !binary_allowed {
1852 error!(
1853 peer = %peer_id,
1854 binary_hash = %hex::encode(binary_hash),
1855 "REJECTING peer: Binary not in allowlist"
1856 );
1857 return EnforcementDecision::Reject {
1858 rejection: AttestationRejection::binary_not_allowed(binary_hash),
1859 };
1860 }
1861
1862 info!(
1863 peer = %peer_id,
1864 entangled_id = %hex::encode(&peer_entangled_id.id()[..8]),
1865 "Peer attestation verified successfully (hard mode)"
1866 );
1867 EnforcementDecision::Allow
1868 }
1869 }
1870 }
1871
1872 #[must_use]
1880 pub fn verify_peer_attestation_simple(
1881 &self,
1882 peer_id: &str,
1883 peer_entangled_id: &crate::attestation::EntangledId,
1884 peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1885 ) -> bool {
1886 self.verify_peer_attestation(peer_id, peer_entangled_id, peer_public_key)
1887 .should_allow()
1888 }
1889
1890 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1900 if let Some(ref resource_manager) = self.resource_manager {
1901 Ok(resource_manager.get_metrics().await)
1902 } else {
1903 Err(P2PError::Network(
1904 crate::error::NetworkError::ProtocolError(
1905 "Production resource manager not enabled".to_string().into(),
1906 ),
1907 ))
1908 }
1909 }
1910
1911 async fn connection_lifecycle_monitor(
1914 dual_node: Arc<DualStackNetworkNode>,
1915 active_connections: Arc<RwLock<HashSet<String>>>,
1916 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1917 event_tx: broadcast::Sender<P2PEvent>,
1918 geo_provider: Arc<BgpGeoProvider>,
1919 local_peer_id: String,
1920 ) {
1921 use crate::transport::ant_quic_adapter::ConnectionEvent;
1922
1923 let mut event_rx = dual_node.subscribe_connection_events();
1924
1925 info!("Connection lifecycle monitor started");
1926
1927 loop {
1928 match event_rx.recv().await {
1929 Ok(event) => {
1930 match event {
1931 ConnectionEvent::Established {
1932 peer_id,
1933 remote_address,
1934 } => {
1935 let peer_id_str =
1936 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1937 debug!(
1938 "Connection established: peer={}, addr={}",
1939 peer_id_str, remote_address
1940 );
1941
1942 let ip = remote_address.ip();
1945 let is_rejected = match ip {
1946 std::net::IpAddr::V4(v4) => {
1947 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1949 geo_provider.is_hosting_asn(asn)
1950 || geo_provider.is_vpn_asn(asn)
1951 } else {
1952 false
1953 }
1954 }
1955 std::net::IpAddr::V6(v6) => {
1956 let info = geo_provider.lookup(v6);
1957 info.is_hosting_provider || info.is_vpn_provider
1958 }
1959 };
1960
1961 if is_rejected {
1962 info!(
1963 "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
1964 peer_id_str, remote_address
1965 );
1966
1967 let rejection = RejectionMessage {
1969 reason: RejectionReason::GeoIpPolicy,
1970 message:
1971 "Connection rejected: Hosting/VPN providers not allowed"
1972 .to_string(),
1973 suggested_target: None, };
1975
1976 if let Ok(data) = serde_json::to_vec(&rejection) {
1978 let timestamp = std::time::SystemTime::now()
1980 .duration_since(std::time::UNIX_EPOCH)
1981 .unwrap_or_default()
1982 .as_secs();
1983
1984 let message = serde_json::json!({
1985 "protocol": "control",
1986 "data": data,
1987 "from": local_peer_id,
1988 "timestamp": timestamp
1989 });
1990
1991 if let Ok(msg_bytes) = serde_json::to_vec(&message) {
1992 let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
1996
1997 tokio::task::yield_now().await;
2000 }
2001 }
2002
2003 continue;
2007 }
2008
2009 active_connections.write().await.insert(peer_id_str.clone());
2011
2012 let mut peers_lock = peers.write().await;
2014 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2015 peer_info.status = ConnectionStatus::Connected;
2016 peer_info.connected_at = Instant::now();
2017 } else {
2018 debug!("Registering new incoming peer: {}", peer_id_str);
2020 peers_lock.insert(
2021 peer_id_str.clone(),
2022 PeerInfo {
2023 peer_id: peer_id_str.clone(),
2024 addresses: vec![remote_address.to_string()],
2025 status: ConnectionStatus::Connected,
2026 last_seen: Instant::now(),
2027 connected_at: Instant::now(),
2028 protocols: Vec::new(),
2029 heartbeat_count: 0,
2030 },
2031 );
2032 }
2033
2034 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2036 }
2037 ConnectionEvent::Lost { peer_id, reason } => {
2038 let peer_id_str =
2039 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2040 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2041
2042 active_connections.write().await.remove(&peer_id_str);
2044
2045 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2047 peer_info.status = ConnectionStatus::Disconnected;
2048 peer_info.last_seen = Instant::now();
2049 }
2050
2051 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2053 }
2054 ConnectionEvent::Failed { peer_id, reason } => {
2055 let peer_id_str =
2056 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2057 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2058
2059 active_connections.write().await.remove(&peer_id_str);
2061
2062 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2064 peer_info.status = ConnectionStatus::Failed(reason.clone());
2065 }
2066
2067 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2069 }
2070 }
2071 }
2072 Err(broadcast::error::RecvError::Lagged(skipped)) => {
2073 warn!(
2074 "Connection event monitor lagged, skipped {} events",
2075 skipped
2076 );
2077 continue;
2078 }
2079 Err(broadcast::error::RecvError::Closed) => {
2080 info!("Connection event channel closed, stopping monitor");
2081 break;
2082 }
2083 }
2084 }
2085
2086 info!("Connection lifecycle monitor stopped");
2087 }
2088
2089 async fn start_connection_monitor(&self) {
2091 debug!("Connection monitor already running from initialization");
2095 }
2096
2097 async fn keepalive_task(
2103 active_connections: Arc<RwLock<HashSet<String>>>,
2104 dual_node: Arc<DualStackNetworkNode>,
2105 shutdown: Arc<AtomicBool>,
2106 ) {
2107 use tokio::time::{Duration, interval};
2108
2109 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2113 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2114
2115 info!(
2116 "Keepalive task started (interval: {}s)",
2117 KEEPALIVE_INTERVAL_SECS
2118 );
2119
2120 loop {
2121 if shutdown.load(Ordering::Relaxed) {
2123 info!("Keepalive task shutting down");
2124 break;
2125 }
2126
2127 interval.tick().await;
2128
2129 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2131
2132 if peers.is_empty() {
2133 trace!("Keepalive: no active connections");
2134 continue;
2135 }
2136
2137 debug!("Sending keepalive to {} active connections", peers.len());
2138
2139 for peer_id in peers {
2141 match dual_node
2142 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
2143 .await
2144 {
2145 Ok(_) => {
2146 trace!("Keepalive sent to peer: {}", peer_id);
2147 }
2148 Err(e) => {
2149 debug!(
2150 "Failed to send keepalive to peer {}: {} (connection may have closed)",
2151 peer_id, e
2152 );
2153 }
2155 }
2156 }
2157 }
2158
2159 info!("Keepalive task stopped");
2160 }
2161
2162 pub async fn health_check(&self) -> Result<()> {
2164 if let Some(ref resource_manager) = self.resource_manager {
2165 resource_manager.health_check().await
2166 } else {
2167 let peer_count = self.peer_count().await;
2169 if peer_count > self.config.max_connections {
2170 Err(P2PError::Network(
2171 crate::error::NetworkError::ProtocolError(
2172 format!("Too many connections: {peer_count}").into(),
2173 ),
2174 ))
2175 } else {
2176 Ok(())
2177 }
2178 }
2179 }
2180
2181 pub fn production_config(&self) -> Option<&ProductionConfig> {
2183 self.config.production_config.as_ref()
2184 }
2185
2186 pub fn is_production_mode(&self) -> bool {
2188 self.resource_manager.is_some()
2189 }
2190
2191 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2193 self.dht.as_ref()
2194 }
2195
2196 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2198 if let Some(ref dht) = self.dht {
2199 let mut dht_instance = dht.write().await;
2200 let dht_key = crate::dht::DhtKey::from_bytes(key);
2201 dht_instance
2202 .store(&dht_key, value.clone())
2203 .await
2204 .map_err(|e| {
2205 P2PError::Dht(crate::error::DhtError::StoreFailed(
2206 format!("{:?}: {e}", key).into(),
2207 ))
2208 })?;
2209
2210 Ok(())
2211 } else {
2212 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2213 "DHT not enabled".to_string().into(),
2214 )))
2215 }
2216 }
2217
2218 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2220 if let Some(ref dht) = self.dht {
2221 let dht_instance = dht.read().await;
2222 let dht_key = crate::dht::DhtKey::from_bytes(key);
2223 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2224 P2PError::Dht(crate::error::DhtError::StoreFailed(
2225 format!("Retrieve failed: {e}").into(),
2226 ))
2227 })?;
2228
2229 Ok(record_result)
2230 } else {
2231 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2232 "DHT not enabled".to_string().into(),
2233 )))
2234 }
2235 }
2236
2237 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2239 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2240 let mut manager = bootstrap_manager.write().await;
2241 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2242 .iter()
2243 .filter_map(|addr| addr.parse().ok())
2244 .collect();
2245 let contact = ContactEntry::new(peer_id, socket_addresses);
2246 manager.add_contact(contact).await.map_err(|e| {
2247 P2PError::Network(crate::error::NetworkError::ProtocolError(
2248 format!("Failed to add peer to bootstrap cache: {e}").into(),
2249 ))
2250 })?;
2251 }
2252 Ok(())
2253 }
2254
2255 pub async fn update_peer_metrics(
2257 &self,
2258 peer_id: &PeerId,
2259 success: bool,
2260 latency_ms: Option<u64>,
2261 _error: Option<String>,
2262 ) -> Result<()> {
2263 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2264 let mut manager = bootstrap_manager.write().await;
2265
2266 let metrics = QualityMetrics {
2268 success_rate: if success { 1.0 } else { 0.0 },
2269 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2270 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2272 last_successful_connection: if success {
2273 chrono::Utc::now()
2274 } else {
2275 chrono::Utc::now() - chrono::Duration::hours(1)
2276 },
2277 uptime_score: 0.5,
2278 };
2279
2280 manager
2281 .update_contact_metrics(peer_id, metrics)
2282 .await
2283 .map_err(|e| {
2284 P2PError::Network(crate::error::NetworkError::ProtocolError(
2285 format!("Failed to update peer metrics: {e}").into(),
2286 ))
2287 })?;
2288 }
2289 Ok(())
2290 }
2291
2292 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2294 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2295 let manager = bootstrap_manager.read().await;
2296 let stats = manager.get_stats().await.map_err(|e| {
2297 P2PError::Network(crate::error::NetworkError::ProtocolError(
2298 format!("Failed to get bootstrap stats: {e}").into(),
2299 ))
2300 })?;
2301 Ok(Some(stats))
2302 } else {
2303 Ok(None)
2304 }
2305 }
2306
2307 pub async fn cached_peer_count(&self) -> usize {
2309 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2310 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2311 {
2312 return stats.total_contacts;
2313 }
2314 0
2315 }
2316
2317 async fn connect_bootstrap_peers(&self) -> Result<()> {
2319 let mut bootstrap_contacts = Vec::new();
2320 let mut used_cache = false;
2321 let mut seen_addresses = std::collections::HashSet::new();
2322
2323 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2325 self.config.bootstrap_peers_str.clone()
2326 } else {
2327 self.config
2329 .bootstrap_peers
2330 .iter()
2331 .map(|addr| addr.to_string())
2332 .collect::<Vec<_>>()
2333 };
2334
2335 if !cli_bootstrap_peers.is_empty() {
2336 info!(
2337 "Using {} CLI-provided bootstrap peers (priority)",
2338 cli_bootstrap_peers.len()
2339 );
2340 for addr in &cli_bootstrap_peers {
2341 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2342 seen_addresses.insert(socket_addr);
2343 let contact = ContactEntry::new(
2344 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2345 vec![socket_addr],
2346 );
2347 bootstrap_contacts.push(contact);
2348 } else {
2349 warn!("Invalid bootstrap address format: {}", addr);
2350 }
2351 }
2352 }
2353
2354 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2356 let manager = bootstrap_manager.read().await;
2357 match manager.get_bootstrap_peers(20).await {
2358 Ok(contacts) => {
2360 if !contacts.is_empty() {
2361 let mut added_from_cache = 0;
2362 for contact in contacts {
2363 let new_addresses: Vec<_> = contact
2365 .addresses
2366 .iter()
2367 .filter(|addr| !seen_addresses.contains(addr))
2368 .copied()
2369 .collect();
2370
2371 if !new_addresses.is_empty() {
2372 for addr in &new_addresses {
2373 seen_addresses.insert(*addr);
2374 }
2375 let mut contact = contact.clone();
2376 contact.addresses = new_addresses;
2377 bootstrap_contacts.push(contact);
2378 added_from_cache += 1;
2379 }
2380 }
2381 if added_from_cache > 0 {
2382 info!(
2383 "Added {} cached bootstrap peers (supplementing CLI peers)",
2384 added_from_cache
2385 );
2386 used_cache = true;
2387 }
2388 }
2389 }
2390 Err(e) => {
2391 warn!("Failed to get cached bootstrap peers: {}", e);
2392 }
2393 }
2394 }
2395
2396 if bootstrap_contacts.is_empty() {
2397 info!("No bootstrap peers configured and no cached peers available");
2398 return Ok(());
2399 }
2400
2401 let mut successful_connections = 0;
2403 for contact in bootstrap_contacts {
2404 for addr in &contact.addresses {
2405 match self.connect_peer(&addr.to_string()).await {
2406 Ok(peer_id) => {
2407 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2408 successful_connections += 1;
2409
2410 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2412 let mut manager = bootstrap_manager.write().await;
2413 let mut updated_contact = contact.clone();
2414 updated_contact.peer_id = peer_id.clone();
2415 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2418 warn!("Failed to update bootstrap cache: {}", e);
2419 }
2420 }
2421 break; }
2423 Err(e) => {
2424 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2425
2426 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2428 let mut manager = bootstrap_manager.write().await;
2429 let mut updated_contact = contact.clone();
2430 updated_contact.update_connection_result(
2431 false,
2432 None,
2433 Some(e.to_string()),
2434 );
2435
2436 if let Err(e) = manager.add_contact(updated_contact).await {
2437 warn!("Failed to update bootstrap cache: {}", e);
2438 }
2439 }
2440 }
2441 }
2442 }
2443 }
2444
2445 if successful_connections == 0 {
2446 if !used_cache {
2447 warn!("Failed to connect to any bootstrap peers");
2448 }
2449 return Ok(());
2452 }
2453 info!(
2454 "Successfully connected to {} bootstrap peers",
2455 successful_connections
2456 );
2457
2458 Ok(())
2459 }
2460
2461 async fn disconnect_all_peers(&self) -> Result<()> {
2463 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2464
2465 for peer_id in peer_ids {
2466 self.disconnect_peer(&peer_id).await?;
2467 }
2468
2469 Ok(())
2470 }
2471
2472 async fn periodic_tasks(&self) -> Result<()> {
2474 Ok(())
2480 }
2481}
2482
2483#[async_trait::async_trait]
2485pub trait NetworkSender: Send + Sync {
2486 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2488
2489 fn local_peer_id(&self) -> &PeerId;
2491}
2492
2493#[derive(Clone)]
2495pub struct P2PNetworkSender {
2496 peer_id: PeerId,
2497 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2499}
2500
2501impl P2PNetworkSender {
2502 pub fn new(
2503 peer_id: PeerId,
2504 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2505 ) -> Self {
2506 Self { peer_id, send_tx }
2507 }
2508}
2509
2510#[async_trait::async_trait]
2512impl NetworkSender for P2PNetworkSender {
2513 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2515 self.send_tx
2516 .send((peer_id.clone(), protocol.to_string(), data))
2517 .map_err(|_| {
2518 P2PError::Network(crate::error::NetworkError::ProtocolError(
2519 "Failed to send message via channel".to_string().into(),
2520 ))
2521 })?;
2522 Ok(())
2523 }
2524
2525 fn local_peer_id(&self) -> &PeerId {
2527 &self.peer_id
2528 }
2529}
2530
2531pub struct NodeBuilder {
2533 config: NodeConfig,
2534}
2535
2536impl Default for NodeBuilder {
2537 fn default() -> Self {
2538 Self::new()
2539 }
2540}
2541
2542impl NodeBuilder {
2543 pub fn new() -> Self {
2545 Self {
2546 config: NodeConfig::default(),
2547 }
2548 }
2549
2550 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2552 self.config.peer_id = Some(peer_id);
2553 self
2554 }
2555
2556 pub fn listen_on(mut self, addr: &str) -> Self {
2558 if let Ok(multiaddr) = addr.parse() {
2559 self.config.listen_addrs.push(multiaddr);
2560 }
2561 self
2562 }
2563
2564 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2566 if let Ok(multiaddr) = addr.parse() {
2567 self.config.bootstrap_peers.push(multiaddr);
2568 }
2569 self.config.bootstrap_peers_str.push(addr.to_string());
2570 self
2571 }
2572
2573 pub fn with_ipv6(mut self, enable: bool) -> Self {
2575 self.config.enable_ipv6 = enable;
2576 self
2577 }
2578
2579 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2583 self.config.connection_timeout = timeout;
2584 self
2585 }
2586
2587 pub fn with_max_connections(mut self, max: usize) -> Self {
2589 self.config.max_connections = max;
2590 self
2591 }
2592
2593 pub fn with_production_mode(mut self) -> Self {
2595 self.config.production_config = Some(ProductionConfig::default());
2596 self
2597 }
2598
2599 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2601 self.config.production_config = Some(production_config);
2602 self
2603 }
2604
2605 pub fn with_diversity_config(
2607 mut self,
2608 diversity_config: crate::security::IPDiversityConfig,
2609 ) -> Self {
2610 self.config.diversity_config = Some(diversity_config);
2611 self
2612 }
2613
2614 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2616 self.config.dht_config = dht_config;
2617 self
2618 }
2619
2620 pub fn with_default_dht(mut self) -> Self {
2622 self.config.dht_config = DHTConfig::default();
2623 self
2624 }
2625
2626 pub async fn build(self) -> Result<P2PNode> {
2628 P2PNode::new(self.config).await
2629 }
2630}
2631
2632#[cfg(test)]
2633#[allow(clippy::unwrap_used, clippy::expect_used)]
2634mod diversity_tests {
2635 use super::*;
2636 use crate::security::IPDiversityConfig;
2637
2638 async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
2639 let diversity_config = config.diversity_config.clone().unwrap_or_default();
2640 if let Some(ref cache_config) = config.bootstrap_cache_config {
2641 BootstrapManager::with_full_config(
2642 cache_config.clone(),
2643 crate::rate_limit::JoinRateLimiterConfig::default(),
2644 diversity_config,
2645 )
2646 .await
2647 .expect("bootstrap manager")
2648 } else {
2649 BootstrapManager::with_full_config(
2650 crate::bootstrap::CacheConfig::default(),
2651 crate::rate_limit::JoinRateLimiterConfig::default(),
2652 diversity_config,
2653 )
2654 .await
2655 .expect("bootstrap manager")
2656 }
2657 }
2658
2659 #[tokio::test]
2660 async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
2661 let config = NodeConfig {
2662 diversity_config: Some(IPDiversityConfig::testnet()),
2663 ..Default::default()
2664 };
2665
2666 let manager = build_bootstrap_manager_like_prod(&config).await;
2667 assert!(manager.diversity_config().is_relaxed());
2668 assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
2669 }
2670}
2671
2672#[allow(dead_code)] async fn handle_received_message_standalone(
2675 message_data: Vec<u8>,
2676 peer_id: &PeerId,
2677 _protocol: &str,
2678 event_tx: &broadcast::Sender<P2PEvent>,
2679) -> Result<()> {
2680 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2682 Ok(message) => {
2683 if let (Some(protocol), Some(data), Some(from)) = (
2684 message.get("protocol").and_then(|v| v.as_str()),
2685 message.get("data").and_then(|v| v.as_array()),
2686 message.get("from").and_then(|v| v.as_str()),
2687 ) {
2688 let data_bytes: Vec<u8> = data
2690 .iter()
2691 .filter_map(|v| v.as_u64().map(|n| n as u8))
2692 .collect();
2693
2694 let event = P2PEvent::Message {
2696 topic: protocol.to_string(),
2697 source: from.to_string(),
2698 data: data_bytes,
2699 };
2700
2701 let _ = event_tx.send(event);
2702 debug!("Generated message event from peer: {}", peer_id);
2703 }
2704 }
2705 Err(e) => {
2706 warn!("Failed to parse received message from {}: {}", peer_id, e);
2707 }
2708 }
2709
2710 Ok(())
2711}
2712
2713#[allow(dead_code)]
2717fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2718 match create_protocol_message_static(protocol, data) {
2719 Ok(msg) => Some(msg),
2720 Err(e) => {
2721 warn!("Failed to create protocol message: {}", e);
2722 None
2723 }
2724 }
2725}
2726
2727#[allow(dead_code)]
2729async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2730 match result {
2731 Ok(_) => {
2732 debug!("Message sent to peer {} via transport layer", peer_id);
2733 }
2734 Err(e) => {
2735 warn!("Failed to send message to peer {}: {}", peer_id, e);
2736 }
2737 }
2738}
2739
2740#[allow(dead_code)] fn check_rate_limit(
2743 rate_limiter: &RateLimiter,
2744 socket_addr: &std::net::SocketAddr,
2745 remote_addr: &NetworkAddress,
2746) -> Result<()> {
2747 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2748 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2749 e
2750 })
2751}
2752
2753#[allow(dead_code)] async fn register_new_peer(
2756 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2757 peer_id: &PeerId,
2758 remote_addr: &NetworkAddress,
2759) {
2760 let mut peers_guard = peers.write().await;
2761 let peer_info = PeerInfo {
2762 peer_id: peer_id.clone(),
2763 addresses: vec![remote_addr.to_string()],
2764 connected_at: tokio::time::Instant::now(),
2765 last_seen: tokio::time::Instant::now(),
2766 status: ConnectionStatus::Connected,
2767 protocols: vec!["p2p-chat/1.0.0".to_string()],
2768 heartbeat_count: 0,
2769 };
2770 peers_guard.insert(peer_id.clone(), peer_info);
2771}
2772
2773#[allow(dead_code)] fn spawn_connection_handler(
2776 connection: Box<dyn crate::transport::Connection>,
2777 peer_id: PeerId,
2778 event_tx: broadcast::Sender<P2PEvent>,
2779 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2780) {
2781 tokio::spawn(async move {
2782 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2783 });
2784}
2785
2786#[allow(dead_code)] async fn handle_peer_connection(
2789 mut connection: Box<dyn crate::transport::Connection>,
2790 peer_id: PeerId,
2791 event_tx: broadcast::Sender<P2PEvent>,
2792 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2793) {
2794 loop {
2795 match connection.receive().await {
2796 Ok(message_data) => {
2797 debug!(
2798 "Received {} bytes from peer: {}",
2799 message_data.len(),
2800 peer_id
2801 );
2802
2803 if let Err(e) = handle_received_message_standalone(
2805 message_data,
2806 &peer_id,
2807 "unknown", &event_tx,
2809 )
2810 .await
2811 {
2812 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2813 }
2814 }
2815 Err(e) => {
2816 warn!("Failed to receive message from {}: {}", peer_id, e);
2817
2818 if !connection.is_alive().await {
2820 info!("Connection to {} is dead, removing peer", peer_id);
2821
2822 remove_peer(&peers, &peer_id).await;
2824
2825 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2827
2828 break; }
2830
2831 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2833 }
2834 }
2835 }
2836}
2837
2838#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2841 let mut peers_guard = peers.write().await;
2842 peers_guard.remove(peer_id);
2843}
2844
2845#[allow(dead_code)]
2847async fn update_peer_heartbeat(
2848 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2849 peer_id: &PeerId,
2850) -> Result<()> {
2851 let mut peers_guard = peers.write().await;
2852 match peers_guard.get_mut(peer_id) {
2853 Some(peer_info) => {
2854 peer_info.last_seen = Instant::now();
2855 peer_info.heartbeat_count += 1;
2856 Ok(())
2857 }
2858 None => {
2859 warn!("Received heartbeat from unknown peer: {}", peer_id);
2860 Err(P2PError::Network(NetworkError::PeerNotFound(
2861 format!("Peer {} not found", peer_id).into(),
2862 )))
2863 }
2864 }
2865}
2866
2867#[allow(dead_code)]
2869async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
2870 if let Some(manager) = resource_manager {
2871 let metrics = manager.get_metrics().await;
2872 (metrics.memory_used, metrics.cpu_usage)
2873 } else {
2874 (0, 0.0)
2875 }
2876}
2877
2878#[cfg(test)]
2879mod tests {
2880 use super::*;
2881 use std::time::Duration;
2883 use tokio::time::timeout;
2884
2885 fn create_test_node_config() -> NodeConfig {
2891 NodeConfig {
2892 peer_id: Some("test_peer_123".to_string()),
2893 listen_addrs: vec![
2894 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2895 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2896 ],
2897 listen_addr: std::net::SocketAddr::new(
2898 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2899 0,
2900 ),
2901 bootstrap_peers: vec![],
2902 bootstrap_peers_str: vec![],
2903 enable_ipv6: true,
2904
2905 connection_timeout: Duration::from_secs(2),
2906 keep_alive_interval: Duration::from_secs(30),
2907 max_connections: 100,
2908 max_incoming_connections: 50,
2909 dht_config: DHTConfig::default(),
2910 security_config: SecurityConfig::default(),
2911 production_config: None,
2912 bootstrap_cache_config: None,
2913 diversity_config: None,
2914 attestation_config: crate::attestation::AttestationConfig::default(),
2915 }
2916 }
2917
2918 #[tokio::test]
2922 async fn test_node_config_default() {
2923 let config = NodeConfig::default();
2924
2925 assert!(config.peer_id.is_none());
2926 assert_eq!(config.listen_addrs.len(), 2);
2927 assert!(config.enable_ipv6);
2928 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2930 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2931 }
2932
2933 #[tokio::test]
2934 async fn test_dht_config_default() {
2935 let config = DHTConfig::default();
2936
2937 assert_eq!(config.k_value, 20);
2938 assert_eq!(config.alpha_value, 5);
2939 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2940 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2941 }
2942
2943 #[tokio::test]
2944 async fn test_security_config_default() {
2945 let config = SecurityConfig::default();
2946
2947 assert!(config.enable_noise);
2948 assert!(config.enable_tls);
2949 assert_eq!(config.trust_level, TrustLevel::Basic);
2950 }
2951
2952 #[test]
2953 fn test_trust_level_variants() {
2954 let _none = TrustLevel::None;
2956 let _basic = TrustLevel::Basic;
2957 let _full = TrustLevel::Full;
2958
2959 assert_eq!(TrustLevel::None, TrustLevel::None);
2961 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2962 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2963 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2964 }
2965
2966 #[test]
2967 fn test_connection_status_variants() {
2968 let connecting = ConnectionStatus::Connecting;
2969 let connected = ConnectionStatus::Connected;
2970 let disconnecting = ConnectionStatus::Disconnecting;
2971 let disconnected = ConnectionStatus::Disconnected;
2972 let failed = ConnectionStatus::Failed("test error".to_string());
2973
2974 assert_eq!(connecting, ConnectionStatus::Connecting);
2975 assert_eq!(connected, ConnectionStatus::Connected);
2976 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2977 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2978 assert_ne!(connecting, connected);
2979
2980 if let ConnectionStatus::Failed(msg) = failed {
2981 assert_eq!(msg, "test error");
2982 } else {
2983 panic!("Expected Failed status");
2984 }
2985 }
2986
2987 #[tokio::test]
2988 async fn test_node_creation() -> Result<()> {
2989 let config = create_test_node_config();
2990 let node = P2PNode::new(config).await?;
2991
2992 assert_eq!(node.peer_id(), "test_peer_123");
2993 assert!(!node.is_running().await);
2994 assert_eq!(node.peer_count().await, 0);
2995 assert!(node.connected_peers().await.is_empty());
2996
2997 Ok(())
2998 }
2999
3000 #[tokio::test]
3001 async fn test_node_creation_without_peer_id() -> Result<()> {
3002 let mut config = create_test_node_config();
3003 config.peer_id = None;
3004
3005 let node = P2PNode::new(config).await?;
3006
3007 assert!(node.peer_id().starts_with("peer_"));
3009 assert!(!node.is_running().await);
3010
3011 Ok(())
3012 }
3013
3014 #[tokio::test]
3015 async fn test_node_lifecycle() -> Result<()> {
3016 let config = create_test_node_config();
3017 let node = P2PNode::new(config).await?;
3018
3019 assert!(!node.is_running().await);
3021
3022 node.start().await?;
3024 assert!(node.is_running().await);
3025
3026 let listen_addrs = node.listen_addrs().await;
3028 assert!(
3029 !listen_addrs.is_empty(),
3030 "Expected at least one listening address"
3031 );
3032
3033 node.stop().await?;
3035 assert!(!node.is_running().await);
3036
3037 Ok(())
3038 }
3039
3040 #[tokio::test]
3041 async fn test_peer_connection() -> Result<()> {
3042 let config1 = create_test_node_config();
3043 let mut config2 = create_test_node_config();
3044 config2.peer_id = Some("test_peer_456".to_string());
3045
3046 let node1 = P2PNode::new(config1).await?;
3047 let node2 = P2PNode::new(config2).await?;
3048
3049 node1.start().await?;
3050 node2.start().await?;
3051
3052 let node2_addr = node2
3053 .listen_addrs()
3054 .await
3055 .into_iter()
3056 .find(|a| a.ip().is_ipv4())
3057 .ok_or_else(|| {
3058 P2PError::Network(crate::error::NetworkError::InvalidAddress(
3059 "Node 2 did not expose an IPv4 listen address".into(),
3060 ))
3061 })?;
3062
3063 let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3065
3066 assert_eq!(node1.peer_count().await, 1);
3068
3069 let connected_peers = node1.connected_peers().await;
3071 assert_eq!(connected_peers.len(), 1);
3072 assert_eq!(connected_peers[0], peer_id);
3073
3074 let peer_info = node1.peer_info(&peer_id).await;
3076 assert!(peer_info.is_some());
3077 let info = peer_info.expect("Peer info should exist after adding peer");
3078 assert_eq!(info.peer_id, peer_id);
3079 assert_eq!(info.status, ConnectionStatus::Connected);
3080 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3081
3082 node1.disconnect_peer(&peer_id).await?;
3084 assert_eq!(node1.peer_count().await, 0);
3085
3086 node1.stop().await?;
3087 node2.stop().await?;
3088
3089 Ok(())
3090 }
3091
3092 #[cfg_attr(target_os = "windows", ignore)]
3099 #[tokio::test]
3100 async fn test_event_subscription() -> Result<()> {
3101 let ipv4_localhost =
3106 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3107
3108 let mut config1 = create_test_node_config();
3109 config1.listen_addr = ipv4_localhost;
3110 config1.listen_addrs = vec![ipv4_localhost];
3111 config1.enable_ipv6 = false;
3112
3113 let mut config2 = create_test_node_config();
3114 config2.peer_id = Some("test_peer_456".to_string());
3115 config2.listen_addr = ipv4_localhost;
3116 config2.listen_addrs = vec![ipv4_localhost];
3117 config2.enable_ipv6 = false;
3118
3119 let node1 = P2PNode::new(config1).await?;
3120 let node2 = P2PNode::new(config2).await?;
3121
3122 node1.start().await?;
3123 node2.start().await?;
3124
3125 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
3128
3129 let mut events = node1.subscribe_events();
3130
3131 let node2_addr = node2.local_addr().ok_or_else(|| {
3133 P2PError::Network(crate::error::NetworkError::ProtocolError(
3134 "No listening address".to_string().into(),
3135 ))
3136 })?;
3137
3138 let mut peer_id = None;
3141 for attempt in 0..3 {
3142 if attempt > 0 {
3143 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3144 }
3145 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
3146 Ok(Ok(id)) => {
3147 peer_id = Some(id);
3148 break;
3149 }
3150 Ok(Err(_)) | Err(_) => continue,
3151 }
3152 }
3153 let peer_id = peer_id.ok_or_else(|| {
3154 P2PError::Network(crate::error::NetworkError::ProtocolError(
3155 "Failed to connect after 3 attempts".to_string().into(),
3156 ))
3157 })?;
3158
3159 let event = timeout(Duration::from_secs(2), events.recv()).await;
3161 assert!(event.is_ok());
3162
3163 let event_result = event
3164 .expect("Should receive event")
3165 .expect("Event should not be error");
3166 match event_result {
3167 P2PEvent::PeerConnected(event_peer_id) => {
3168 assert_eq!(event_peer_id, peer_id);
3169 }
3170 _ => panic!("Expected PeerConnected event"),
3171 }
3172
3173 node1.disconnect_peer(&peer_id).await?;
3175
3176 let event = timeout(Duration::from_secs(2), events.recv()).await;
3178 assert!(event.is_ok());
3179
3180 let event_result = event
3181 .expect("Should receive event")
3182 .expect("Event should not be error");
3183 match event_result {
3184 P2PEvent::PeerDisconnected(event_peer_id) => {
3185 assert_eq!(event_peer_id, peer_id);
3186 }
3187 _ => panic!("Expected PeerDisconnected event"),
3188 }
3189
3190 node1.stop().await?;
3191 node2.stop().await?;
3192
3193 Ok(())
3194 }
3195
3196 #[cfg_attr(target_os = "windows", ignore)]
3198 #[tokio::test]
3199 async fn test_message_sending() -> Result<()> {
3200 let mut config1 = create_test_node_config();
3202 config1.listen_addr =
3203 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3204 let node1 = P2PNode::new(config1).await?;
3205 node1.start().await?;
3206
3207 let mut config2 = create_test_node_config();
3208 config2.listen_addr =
3209 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3210 let node2 = P2PNode::new(config2).await?;
3211 node2.start().await?;
3212
3213 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3215
3216 let node2_addr = node2.local_addr().ok_or_else(|| {
3218 P2PError::Network(crate::error::NetworkError::ProtocolError(
3219 "No listening address".to_string().into(),
3220 ))
3221 })?;
3222
3223 let peer_id =
3225 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3226 Ok(res) => res?,
3227 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3228 };
3229
3230 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3232
3233 let message_data = b"Hello, peer!".to_vec();
3235 let result = match timeout(
3236 Duration::from_millis(500),
3237 node1.send_message(&peer_id, "test-protocol", message_data),
3238 )
3239 .await
3240 {
3241 Ok(res) => res,
3242 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3243 };
3244 if let Err(e) = &result {
3247 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3248 }
3249
3250 let non_existent_peer = "non_existent_peer".to_string();
3252 let result = node1
3253 .send_message(&non_existent_peer, "test-protocol", vec![])
3254 .await;
3255 assert!(result.is_err(), "Sending to non-existent peer should fail");
3256
3257 Ok(())
3258 }
3259
3260 #[tokio::test]
3261 async fn test_remote_mcp_operations() -> Result<()> {
3262 let config = create_test_node_config();
3263 let node = P2PNode::new(config).await?;
3264
3265 node.start().await?;
3267 node.stop().await?;
3268 Ok(())
3269 }
3270
3271 #[tokio::test]
3272 async fn test_health_check() -> Result<()> {
3273 let config = create_test_node_config();
3274 let node = P2PNode::new(config).await?;
3275
3276 let result = node.health_check().await;
3278 assert!(result.is_ok());
3279
3280 Ok(())
3285 }
3286
3287 #[tokio::test]
3288 async fn test_node_uptime() -> Result<()> {
3289 let config = create_test_node_config();
3290 let node = P2PNode::new(config).await?;
3291
3292 let uptime1 = node.uptime();
3293 assert!(uptime1 >= Duration::from_secs(0));
3294
3295 tokio::time::sleep(Duration::from_millis(10)).await;
3297
3298 let uptime2 = node.uptime();
3299 assert!(uptime2 > uptime1);
3300
3301 Ok(())
3302 }
3303
3304 #[tokio::test]
3305 async fn test_node_config_access() -> Result<()> {
3306 let config = create_test_node_config();
3307 let expected_peer_id = config.peer_id.clone();
3308 let node = P2PNode::new(config).await?;
3309
3310 let node_config = node.config();
3311 assert_eq!(node_config.peer_id, expected_peer_id);
3312 assert_eq!(node_config.max_connections, 100);
3313 Ok(())
3316 }
3317
3318 #[tokio::test]
3319 async fn test_mcp_server_access() -> Result<()> {
3320 let config = create_test_node_config();
3321 let _node = P2PNode::new(config).await?;
3322
3323 Ok(())
3325 }
3326
3327 #[tokio::test]
3328 async fn test_dht_access() -> Result<()> {
3329 let config = create_test_node_config();
3330 let node = P2PNode::new(config).await?;
3331
3332 assert!(node.dht().is_some());
3334
3335 Ok(())
3336 }
3337
3338 #[tokio::test]
3339 async fn test_node_builder() -> Result<()> {
3340 let builder = P2PNode::builder()
3342 .with_peer_id("builder_test_peer".to_string())
3343 .listen_on("/ip4/127.0.0.1/tcp/0")
3344 .listen_on("/ip6/::1/tcp/0")
3345 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
3347 .with_connection_timeout(Duration::from_secs(15))
3348 .with_max_connections(200);
3349
3350 let config = builder.config;
3352 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3353 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
3356 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3357 assert_eq!(config.max_connections, 200);
3358
3359 Ok(())
3360 }
3361
3362 #[tokio::test]
3363 async fn test_bootstrap_peers() -> Result<()> {
3364 let mut config = create_test_node_config();
3365 config.bootstrap_peers = vec![
3366 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3367 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3368 ];
3369
3370 let node = P2PNode::new(config).await?;
3371
3372 node.start().await?;
3374
3375 let _peer_count = node.peer_count().await;
3379
3380 node.stop().await?;
3381 Ok(())
3382 }
3383
3384 #[tokio::test]
3385 async fn test_production_mode_disabled() -> Result<()> {
3386 let config = create_test_node_config();
3387 let node = P2PNode::new(config).await?;
3388
3389 assert!(!node.is_production_mode());
3390 assert!(node.production_config().is_none());
3391
3392 let result = node.resource_metrics().await;
3394 assert!(result.is_err());
3395 assert!(result.unwrap_err().to_string().contains("not enabled"));
3396
3397 Ok(())
3398 }
3399
3400 #[tokio::test]
3401 async fn test_network_event_variants() {
3402 let peer_id = "test_peer".to_string();
3404 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3405
3406 let _peer_connected = NetworkEvent::PeerConnected {
3407 peer_id: peer_id.clone(),
3408 addresses: vec![address.clone()],
3409 };
3410
3411 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3412 peer_id: peer_id.clone(),
3413 reason: "test disconnect".to_string(),
3414 };
3415
3416 let _message_received = NetworkEvent::MessageReceived {
3417 peer_id: peer_id.clone(),
3418 protocol: "test-protocol".to_string(),
3419 data: vec![1, 2, 3],
3420 };
3421
3422 let _connection_failed = NetworkEvent::ConnectionFailed {
3423 peer_id: Some(peer_id.clone()),
3424 address: address.clone(),
3425 error: "connection refused".to_string(),
3426 };
3427
3428 let _dht_stored = NetworkEvent::DHTRecordStored {
3429 key: vec![1, 2, 3],
3430 value: vec![4, 5, 6],
3431 };
3432
3433 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3434 key: vec![1, 2, 3],
3435 value: Some(vec![4, 5, 6]),
3436 };
3437 }
3438
3439 #[tokio::test]
3440 async fn test_peer_info_structure() {
3441 let peer_info = PeerInfo {
3442 peer_id: "test_peer".to_string(),
3443 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3444 connected_at: Instant::now(),
3445 last_seen: Instant::now(),
3446 status: ConnectionStatus::Connected,
3447 protocols: vec!["test-protocol".to_string()],
3448 heartbeat_count: 0,
3449 };
3450
3451 assert_eq!(peer_info.peer_id, "test_peer");
3452 assert_eq!(peer_info.addresses.len(), 1);
3453 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3454 assert_eq!(peer_info.protocols.len(), 1);
3455 }
3456
3457 #[tokio::test]
3458 async fn test_serialization() -> Result<()> {
3459 let config = create_test_node_config();
3461 let serialized = serde_json::to_string(&config)?;
3462 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3463
3464 assert_eq!(config.peer_id, deserialized.peer_id);
3465 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3466 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3467
3468 Ok(())
3469 }
3470
3471 #[tokio::test]
3472 async fn test_get_peer_id_by_address_found() -> Result<()> {
3473 let config = create_test_node_config();
3474 let node = P2PNode::new(config).await?;
3475
3476 let test_peer_id = "peer_test_123".to_string();
3478 let test_address = "192.168.1.100:9000".to_string();
3479
3480 let peer_info = PeerInfo {
3481 peer_id: test_peer_id.clone(),
3482 addresses: vec![test_address.clone()],
3483 connected_at: Instant::now(),
3484 last_seen: Instant::now(),
3485 status: ConnectionStatus::Connected,
3486 protocols: vec!["test-protocol".to_string()],
3487 heartbeat_count: 0,
3488 };
3489
3490 node.peers
3491 .write()
3492 .await
3493 .insert(test_peer_id.clone(), peer_info);
3494
3495 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3497 assert_eq!(found_peer_id, Some(test_peer_id));
3498
3499 Ok(())
3500 }
3501
3502 #[tokio::test]
3503 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3504 let config = create_test_node_config();
3505 let node = P2PNode::new(config).await?;
3506
3507 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3509 assert_eq!(result, None);
3510
3511 Ok(())
3512 }
3513
3514 #[tokio::test]
3515 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3516 let config = create_test_node_config();
3517 let node = P2PNode::new(config).await?;
3518
3519 let result = node.get_peer_id_by_address("invalid-address").await;
3521 assert_eq!(result, None);
3522
3523 Ok(())
3524 }
3525
3526 #[tokio::test]
3527 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3528 let config = create_test_node_config();
3529 let node = P2PNode::new(config).await?;
3530
3531 let peer1_id = "peer_1".to_string();
3533 let peer1_addr = "192.168.1.101:9001".to_string();
3534
3535 let peer2_id = "peer_2".to_string();
3536 let peer2_addr = "192.168.1.102:9002".to_string();
3537
3538 let peer1_info = PeerInfo {
3539 peer_id: peer1_id.clone(),
3540 addresses: vec![peer1_addr.clone()],
3541 connected_at: Instant::now(),
3542 last_seen: Instant::now(),
3543 status: ConnectionStatus::Connected,
3544 protocols: vec!["test-protocol".to_string()],
3545 heartbeat_count: 0,
3546 };
3547
3548 let peer2_info = PeerInfo {
3549 peer_id: peer2_id.clone(),
3550 addresses: vec![peer2_addr.clone()],
3551 connected_at: Instant::now(),
3552 last_seen: Instant::now(),
3553 status: ConnectionStatus::Connected,
3554 protocols: vec!["test-protocol".to_string()],
3555 heartbeat_count: 0,
3556 };
3557
3558 node.peers
3559 .write()
3560 .await
3561 .insert(peer1_id.clone(), peer1_info);
3562 node.peers
3563 .write()
3564 .await
3565 .insert(peer2_id.clone(), peer2_info);
3566
3567 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3569 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3570
3571 assert_eq!(found_peer1, Some(peer1_id));
3572 assert_eq!(found_peer2, Some(peer2_id));
3573
3574 Ok(())
3575 }
3576
3577 #[tokio::test]
3578 async fn test_list_active_connections_empty() -> Result<()> {
3579 let config = create_test_node_config();
3580 let node = P2PNode::new(config).await?;
3581
3582 let connections = node.list_active_connections().await;
3584 assert!(connections.is_empty());
3585
3586 Ok(())
3587 }
3588
3589 #[tokio::test]
3590 async fn test_list_active_connections_with_peers() -> Result<()> {
3591 let config = create_test_node_config();
3592 let node = P2PNode::new(config).await?;
3593
3594 let peer1_id = "peer_1".to_string();
3596 let peer1_addrs = vec![
3597 "192.168.1.101:9001".to_string(),
3598 "192.168.1.101:9002".to_string(),
3599 ];
3600
3601 let peer2_id = "peer_2".to_string();
3602 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3603
3604 let peer1_info = PeerInfo {
3605 peer_id: peer1_id.clone(),
3606 addresses: peer1_addrs.clone(),
3607 connected_at: Instant::now(),
3608 last_seen: Instant::now(),
3609 status: ConnectionStatus::Connected,
3610 protocols: vec!["test-protocol".to_string()],
3611 heartbeat_count: 0,
3612 };
3613
3614 let peer2_info = PeerInfo {
3615 peer_id: peer2_id.clone(),
3616 addresses: peer2_addrs.clone(),
3617 connected_at: Instant::now(),
3618 last_seen: Instant::now(),
3619 status: ConnectionStatus::Connected,
3620 protocols: vec!["test-protocol".to_string()],
3621 heartbeat_count: 0,
3622 };
3623
3624 node.peers
3625 .write()
3626 .await
3627 .insert(peer1_id.clone(), peer1_info);
3628 node.peers
3629 .write()
3630 .await
3631 .insert(peer2_id.clone(), peer2_info);
3632
3633 node.active_connections
3635 .write()
3636 .await
3637 .insert(peer1_id.clone());
3638 node.active_connections
3639 .write()
3640 .await
3641 .insert(peer2_id.clone());
3642
3643 let connections = node.list_active_connections().await;
3645 assert_eq!(connections.len(), 2);
3646
3647 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3649 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3650
3651 assert!(peer1_conn.is_some());
3652 assert!(peer2_conn.is_some());
3653
3654 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3656 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3657
3658 Ok(())
3659 }
3660
3661 #[tokio::test]
3662 async fn test_remove_peer_success() -> Result<()> {
3663 let config = create_test_node_config();
3664 let node = P2PNode::new(config).await?;
3665
3666 let peer_id = "peer_to_remove".to_string();
3668 let peer_info = PeerInfo {
3669 peer_id: peer_id.clone(),
3670 addresses: vec!["192.168.1.100:9000".to_string()],
3671 connected_at: Instant::now(),
3672 last_seen: Instant::now(),
3673 status: ConnectionStatus::Connected,
3674 protocols: vec!["test-protocol".to_string()],
3675 heartbeat_count: 0,
3676 };
3677
3678 node.peers.write().await.insert(peer_id.clone(), peer_info);
3679
3680 assert!(node.is_peer_connected(&peer_id).await);
3682
3683 let removed = node.remove_peer(&peer_id).await;
3685 assert!(removed);
3686
3687 assert!(!node.is_peer_connected(&peer_id).await);
3689
3690 Ok(())
3691 }
3692
3693 #[tokio::test]
3694 async fn test_remove_peer_nonexistent() -> Result<()> {
3695 let config = create_test_node_config();
3696 let node = P2PNode::new(config).await?;
3697
3698 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3700 assert!(!removed);
3701
3702 Ok(())
3703 }
3704
3705 #[tokio::test]
3706 async fn test_is_peer_connected() -> Result<()> {
3707 let config = create_test_node_config();
3708 let node = P2PNode::new(config).await?;
3709
3710 let peer_id = "test_peer".to_string();
3711
3712 assert!(!node.is_peer_connected(&peer_id).await);
3714
3715 let peer_info = PeerInfo {
3717 peer_id: peer_id.clone(),
3718 addresses: vec!["192.168.1.100:9000".to_string()],
3719 connected_at: Instant::now(),
3720 last_seen: Instant::now(),
3721 status: ConnectionStatus::Connected,
3722 protocols: vec!["test-protocol".to_string()],
3723 heartbeat_count: 0,
3724 };
3725
3726 node.peers.write().await.insert(peer_id.clone(), peer_info);
3727
3728 assert!(node.is_peer_connected(&peer_id).await);
3730
3731 node.remove_peer(&peer_id).await;
3733
3734 assert!(!node.is_peer_connected(&peer_id).await);
3736
3737 Ok(())
3738 }
3739
3740 #[test]
3741 fn test_normalize_ipv6_wildcard() {
3742 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3743
3744 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3745 let normalized = normalize_wildcard_to_loopback(wildcard);
3746
3747 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3748 assert_eq!(normalized.port(), 8080);
3749 }
3750
3751 #[test]
3752 fn test_normalize_ipv4_wildcard() {
3753 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3754
3755 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3756 let normalized = normalize_wildcard_to_loopback(wildcard);
3757
3758 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3759 assert_eq!(normalized.port(), 9000);
3760 }
3761
3762 #[test]
3763 fn test_normalize_specific_address_unchanged() {
3764 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3765 let normalized = normalize_wildcard_to_loopback(specific);
3766
3767 assert_eq!(normalized, specific);
3768 }
3769
3770 #[test]
3771 fn test_normalize_loopback_unchanged() {
3772 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3773
3774 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3775 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3776 assert_eq!(normalized_v6, loopback_v6);
3777
3778 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3779 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3780 assert_eq!(normalized_v4, loopback_v4);
3781 }
3782}