1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, error, info, trace, warn};
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47 pub peer_id: Option<PeerId>,
49
50 pub listen_addrs: Vec<std::net::SocketAddr>,
52
53 pub listen_addr: std::net::SocketAddr,
55
56 pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59 pub bootstrap_peers_str: Vec<String>,
61
62 pub enable_ipv6: bool,
64
65 pub connection_timeout: Duration,
68
69 pub keep_alive_interval: Duration,
71
72 pub max_connections: usize,
74
75 pub max_incoming_connections: usize,
77
78 pub dht_config: DHTConfig,
80
81 pub security_config: SecurityConfig,
83
84 pub production_config: Option<ProductionConfig>,
86
87 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
89
90 pub diversity_config: Option<crate::security::IPDiversityConfig>,
95
96 #[serde(default)]
101 pub attestation_config: crate::attestation::AttestationConfig,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct DHTConfig {
107 pub k_value: usize,
109
110 pub alpha_value: usize,
112
113 pub record_ttl: Duration,
115
116 pub refresh_interval: Duration,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SecurityConfig {
123 pub enable_noise: bool,
125
126 pub enable_tls: bool,
128
129 pub trust_level: TrustLevel,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
135pub enum TrustLevel {
136 None,
138 Basic,
140 Full,
142}
143
144impl NodeConfig {
145 pub fn new() -> Result<Self> {
151 let config = Config::default();
153
154 let listen_addr = config.listen_socket_addr()?;
156
157 let mut listen_addrs = vec![];
159
160 if config.network.ipv6_enabled {
162 let ipv6_addr = std::net::SocketAddr::new(
163 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
164 listen_addr.port(),
165 );
166 listen_addrs.push(ipv6_addr);
167 }
168
169 let ipv4_addr = std::net::SocketAddr::new(
171 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
172 listen_addr.port(),
173 );
174 listen_addrs.push(ipv4_addr);
175
176 Ok(Self {
177 peer_id: None,
178 listen_addrs,
179 listen_addr,
180 bootstrap_peers: Vec::new(),
181 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
182 enable_ipv6: config.network.ipv6_enabled,
183
184 connection_timeout: Duration::from_secs(config.network.connection_timeout),
185 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
186 max_connections: config.network.max_connections,
187 max_incoming_connections: config.security.connection_limit as usize,
188 dht_config: DHTConfig::default(),
189 security_config: SecurityConfig::default(),
190 production_config: None,
191 bootstrap_cache_config: None,
192 diversity_config: None,
193 attestation_config: config.attestation.clone(),
194 })
195 }
196}
197
198impl Default for NodeConfig {
199 fn default() -> Self {
200 let config = Config::default();
202
203 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
205 std::net::SocketAddr::new(
206 std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
207 9000,
208 )
209 });
210
211 Self {
212 peer_id: None,
213 listen_addrs: vec![
214 std::net::SocketAddr::new(
215 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
216 listen_addr.port(),
217 ),
218 std::net::SocketAddr::new(
219 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
220 listen_addr.port(),
221 ),
222 ],
223 listen_addr,
224 bootstrap_peers: Vec::new(),
225 bootstrap_peers_str: Vec::new(),
226 enable_ipv6: config.network.ipv6_enabled,
227
228 connection_timeout: Duration::from_secs(config.network.connection_timeout),
229 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
230 max_connections: config.network.max_connections,
231 max_incoming_connections: config.security.connection_limit as usize,
232 dht_config: DHTConfig::default(),
233 security_config: SecurityConfig::default(),
234 production_config: None, bootstrap_cache_config: None,
236 diversity_config: None,
237 attestation_config: config.attestation.clone(),
238 }
239 }
240}
241
242impl NodeConfig {
243 pub fn from_config(config: &Config) -> Result<Self> {
245 let listen_addr = config.listen_socket_addr()?;
246 let bootstrap_addrs = config.bootstrap_addrs()?;
247
248 let mut node_config = Self {
249 peer_id: None,
250 listen_addrs: vec![listen_addr],
251 listen_addr,
252 bootstrap_peers: bootstrap_addrs
253 .iter()
254 .map(|addr| addr.socket_addr())
255 .collect(),
256 bootstrap_peers_str: config
257 .network
258 .bootstrap_nodes
259 .iter()
260 .map(|addr| addr.to_string())
261 .collect(),
262 enable_ipv6: config.network.ipv6_enabled,
263
264 connection_timeout: Duration::from_secs(config.network.connection_timeout),
265 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
266 max_connections: config.network.max_connections,
267 max_incoming_connections: config.security.connection_limit as usize,
268 dht_config: DHTConfig {
269 k_value: 20,
270 alpha_value: 3,
271 record_ttl: Duration::from_secs(3600),
272 refresh_interval: Duration::from_secs(900),
273 },
274 security_config: SecurityConfig {
275 enable_noise: true,
276 enable_tls: true,
277 trust_level: TrustLevel::Basic,
278 },
279 production_config: Some(ProductionConfig {
280 max_connections: config.network.max_connections,
281 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
284 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
285 health_check_interval: Duration::from_secs(30),
286 metrics_interval: Duration::from_secs(60),
287 enable_performance_tracking: true,
288 enable_auto_cleanup: true,
289 shutdown_timeout: Duration::from_secs(30),
290 rate_limits: crate::production::RateLimitConfig::default(),
291 }),
292 bootstrap_cache_config: None,
293 diversity_config: None,
294 attestation_config: config.attestation.clone(),
295 };
296
297 if config.network.ipv6_enabled {
299 node_config.listen_addrs.push(std::net::SocketAddr::new(
300 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
301 listen_addr.port(),
302 ));
303 }
304
305 Ok(node_config)
306 }
307
308 pub fn with_listen_addr(addr: &str) -> Result<Self> {
310 let listen_addr: std::net::SocketAddr = addr
311 .parse()
312 .map_err(|e: std::net::AddrParseError| {
313 NetworkError::InvalidAddress(e.to_string().into())
314 })
315 .map_err(P2PError::Network)?;
316 let cfg = NodeConfig {
317 listen_addr,
318 listen_addrs: vec![listen_addr],
319 diversity_config: None,
320 ..Default::default()
321 };
322 Ok(cfg)
323 }
324}
325
326impl Default for DHTConfig {
327 fn default() -> Self {
328 Self {
329 k_value: 20,
330 alpha_value: 5,
331 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
334 }
335}
336
337impl Default for SecurityConfig {
338 fn default() -> Self {
339 Self {
340 enable_noise: true,
341 enable_tls: true,
342 trust_level: TrustLevel::Basic,
343 }
344 }
345}
346
347#[derive(Debug, Clone)]
349pub struct PeerInfo {
350 pub peer_id: PeerId,
352
353 pub addresses: Vec<String>,
355
356 pub connected_at: Instant,
358
359 pub last_seen: Instant,
361
362 pub status: ConnectionStatus,
364
365 pub protocols: Vec<String>,
367
368 pub heartbeat_count: u64,
370}
371
/// Connection state recorded in `PeerInfo::status`.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The peer is connected.
    Connected,
    /// The connection is being torn down.
    Disconnecting,
    /// The peer is not connected.
    Disconnected,
    /// The connection failed; the payload carries a description.
    Failed(String),
}
386
387#[derive(Debug, Clone)]
389pub enum NetworkEvent {
390 PeerConnected {
392 peer_id: PeerId,
394 addresses: Vec<String>,
396 },
397
398 PeerDisconnected {
400 peer_id: PeerId,
402 reason: String,
404 },
405
406 MessageReceived {
408 peer_id: PeerId,
410 protocol: String,
412 data: Vec<u8>,
414 },
415
416 ConnectionFailed {
418 peer_id: Option<PeerId>,
420 address: String,
422 error: String,
424 },
425
426 DHTRecordStored {
428 key: Vec<u8>,
430 value: Vec<u8>,
432 },
433
434 DHTRecordRetrieved {
436 key: Vec<u8>,
438 value: Option<Vec<u8>>,
440 },
441}
442
443#[derive(Debug, Clone)]
448pub enum P2PEvent {
449 Message {
451 topic: String,
453 source: PeerId,
455 data: Vec<u8>,
457 },
458 PeerConnected(PeerId),
460 PeerDisconnected(PeerId),
462}
463
464pub struct P2PNode {
474 config: NodeConfig,
476
477 peer_id: PeerId,
479
480 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
482
483 event_tx: broadcast::Sender<P2PEvent>,
485
486 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
488
489 start_time: Instant,
491
492 running: RwLock<bool>,
494
495 dht: Option<Arc<RwLock<DHT>>>,
497
498 resource_manager: Option<Arc<ResourceManager>>,
500
501 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
503
504 dual_node: Arc<DualStackNetworkNode>,
506
507 #[allow(dead_code)]
509 rate_limiter: Arc<RateLimiter>,
510
511 active_connections: Arc<RwLock<HashSet<PeerId>>>,
514
515 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
517
518 #[allow(dead_code)]
520 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
521
522 #[allow(dead_code)]
524 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
525
526 #[allow(dead_code)]
528 shutdown: Arc<AtomicBool>,
529
530 #[allow(dead_code)]
532 geo_provider: Arc<BgpGeoProvider>,
533
534 entangled_id: Option<crate::attestation::EntangledId>,
537
538 binary_hash: [u8; 32],
541}
542
/// Maps a wildcard ("unspecified") socket address to the loopback address of
/// the same family, keeping the port.
///
/// `0.0.0.0:p` becomes `127.0.0.1:p` and `[::]:p` becomes `[::1]:p`; any other
/// address is returned unchanged.
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let loopback = match addr.ip() {
        IpAddr::V4(ip) if ip.is_unspecified() => IpAddr::V4(Ipv4Addr::LOCALHOST),
        IpAddr::V6(ip) if ip.is_unspecified() => IpAddr::V6(Ipv6Addr::LOCALHOST),
        // Concrete address: nothing to normalize.
        _ => return addr,
    };
    SocketAddr::new(loopback, addr.port())
}
573
574impl P2PNode {
575 pub fn new_for_tests() -> Result<Self> {
577 let (event_tx, _) = broadcast::channel(16);
578 Ok(Self {
579 config: NodeConfig::default(),
580 peer_id: "test_peer".to_string(),
581 peers: Arc::new(RwLock::new(HashMap::new())),
582 event_tx,
583 listen_addrs: RwLock::new(Vec::new()),
584 start_time: Instant::now(),
585 running: RwLock::new(false),
586 dht: None,
587 resource_manager: None,
588 bootstrap_manager: None,
589 dual_node: {
590 let v6: Option<std::net::SocketAddr> = "[::1]:0"
592 .parse()
593 .ok()
594 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
595 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
596 let handle = tokio::runtime::Handle::current();
597 let dual_attempt = handle.block_on(
598 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
599 );
600 let dual = match dual_attempt {
601 Ok(d) => d,
602 Err(_e1) => {
603 let fallback = handle.block_on(
605 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
606 None,
607 "127.0.0.1:0".parse().ok(),
608 ),
609 );
610 match fallback {
611 Ok(d) => d,
612 Err(e2) => {
613 return Err(P2PError::Network(NetworkError::BindError(
614 format!("Failed to create dual-stack network node: {}", e2)
615 .into(),
616 )));
617 }
618 }
619 }
620 };
621 Arc::new(dual)
622 },
623 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
624 max_requests: 100,
625 burst_size: 100,
626 window: std::time::Duration::from_secs(1),
627 ..Default::default()
628 })),
629 active_connections: Arc::new(RwLock::new(HashSet::new())),
630 connection_monitor_handle: Arc::new(RwLock::new(None)),
631 keepalive_handle: Arc::new(RwLock::new(None)),
632 shutdown: Arc::new(AtomicBool::new(false)),
633 geo_provider: Arc::new(BgpGeoProvider::new()),
634 security_dashboard: None,
635 entangled_id: None,
637 binary_hash: [0u8; 32],
638 })
639 }
640 pub async fn new(config: NodeConfig) -> Result<Self> {
642 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
643 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
645 });
646
647 let (event_tx, _) = broadcast::channel(1000);
648
649 {
652 use blake3::Hasher;
653 let mut hasher = Hasher::new();
654 hasher.update(peer_id.as_bytes());
655 let digest = hasher.finalize();
656 let mut nid = [0u8; 32];
657 nid.copy_from_slice(digest.as_bytes());
658 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
659 crate::identity::node_identity::NodeId::from_bytes(nid),
660 ));
661 }
664
665 let (dht, security_dashboard) = if true {
667 let _dht_config = crate::dht::DHTConfig {
669 replication_factor: config.dht_config.k_value,
670 bucket_size: config.dht_config.k_value,
671 alpha: config.dht_config.alpha_value,
672 record_ttl: config.dht_config.record_ttl,
673 bucket_refresh_interval: config.dht_config.refresh_interval,
674 republish_interval: config.dht_config.refresh_interval,
675 max_distance: 160,
676 };
677 let peer_bytes = peer_id.as_bytes();
679 let mut node_id_bytes = [0u8; 32];
680 let len = peer_bytes.len().min(32);
681 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
682 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
683 let dht_instance = DHT::new(node_id).map_err(|e| {
684 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
685 e.to_string().into(),
686 ))
687 })?;
688 dht_instance.start_maintenance_tasks();
689
690 let security_metrics = dht_instance.security_metrics();
692 let dashboard = crate::dht::metrics::SecurityDashboard::new(
693 security_metrics,
694 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
695 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
696 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
697 );
698
699 (
700 Some(Arc::new(RwLock::new(dht_instance))),
701 Some(Arc::new(dashboard)),
702 )
703 } else {
704 (None, None)
705 };
706
707 let resource_manager = config
711 .production_config
712 .clone()
713 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
714
715 let diversity_config = config.diversity_config.clone().unwrap_or_default();
717 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
718 match BootstrapManager::with_full_config(
719 cache_config.clone(),
720 crate::rate_limit::JoinRateLimiterConfig::default(),
721 diversity_config.clone(),
722 )
723 .await
724 {
725 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
726 Err(e) => {
727 warn!(
728 "Failed to initialize bootstrap manager: {}, continuing without cache",
729 e
730 );
731 None
732 }
733 }
734 } else {
735 match BootstrapManager::with_full_config(
736 crate::bootstrap::CacheConfig::default(),
737 crate::rate_limit::JoinRateLimiterConfig::default(),
738 diversity_config,
739 )
740 .await
741 {
742 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
743 Err(e) => {
744 warn!(
745 "Failed to initialize bootstrap manager: {}, continuing without cache",
746 e
747 );
748 None
749 }
750 }
751 };
752
753 let (v6_opt, v4_opt) = {
756 let port = config.listen_addr.port();
757 let ip = config.listen_addr.ip();
758
759 let v4_addr = if ip.is_ipv4() {
760 Some(std::net::SocketAddr::new(ip, port))
761 } else {
762 Some(std::net::SocketAddr::new(
765 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
766 port,
767 ))
768 };
769
770 let v6_addr = if config.enable_ipv6 {
771 if ip.is_ipv6() {
772 Some(std::net::SocketAddr::new(ip, port))
773 } else {
774 Some(std::net::SocketAddr::new(
775 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
776 port,
777 ))
778 }
779 } else {
780 None
781 };
782 (v6_addr, v4_addr)
783 };
784
785 let dual_node = Arc::new(
786 DualStackNetworkNode::new(v6_opt, v4_opt)
787 .await
788 .map_err(|e| {
789 P2PError::Transport(crate::error::TransportError::SetupFailed(
790 format!("Failed to create dual-stack network nodes: {}", e).into(),
791 ))
792 })?,
793 );
794
795 let rate_limiter = Arc::new(RateLimiter::new(
797 crate::validation::RateLimitConfig::default(),
798 ));
799
800 let active_connections = Arc::new(RwLock::new(HashSet::new()));
802
803 let geo_provider = Arc::new(BgpGeoProvider::new());
805
806 let peers = Arc::new(RwLock::new(HashMap::new()));
808
809 let connection_event_rx = dual_node.subscribe_connection_events();
813
814 let connection_monitor_handle = {
815 let active_conns = Arc::clone(&active_connections);
816 let peers_map = Arc::clone(&peers);
817 let event_tx_clone = event_tx.clone();
818 let dual_node_clone = Arc::clone(&dual_node);
819 let geo_provider_clone = Arc::clone(&geo_provider);
820 let peer_id_clone = peer_id.clone();
821
822 let handle = tokio::spawn(async move {
823 Self::connection_lifecycle_monitor_with_rx(
824 dual_node_clone,
825 connection_event_rx,
826 active_conns,
827 peers_map,
828 event_tx_clone,
829 geo_provider_clone,
830 peer_id_clone,
831 )
832 .await;
833 });
834
835 Arc::new(RwLock::new(Some(handle)))
836 };
837
838 let shutdown = Arc::new(AtomicBool::new(false));
840 let keepalive_handle = {
841 let active_conns = Arc::clone(&active_connections);
842 let dual_node_clone = Arc::clone(&dual_node);
843 let shutdown_clone = Arc::clone(&shutdown);
844
845 let handle = tokio::spawn(async move {
846 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
847 });
848
849 Arc::new(RwLock::new(Some(handle)))
850 };
851
852 let binary_hash = Self::compute_binary_hash();
855
856 let node = Self {
857 config,
858 peer_id,
859 peers,
860 event_tx,
861 listen_addrs: RwLock::new(Vec::new()),
862 start_time: Instant::now(),
863 running: RwLock::new(false),
864 dht,
865 resource_manager,
866 bootstrap_manager,
867 dual_node,
868 rate_limiter,
869 active_connections,
870 security_dashboard,
871 connection_monitor_handle,
872 keepalive_handle,
873 shutdown,
874 geo_provider,
875 entangled_id: None,
877 binary_hash,
878 };
879 info!("Created P2P node with peer ID: {}", node.peer_id);
880
881 node.start_network_listeners().await?;
883
884 node.start_connection_monitor().await;
886
887 Ok(node)
888 }
889
890 pub fn builder() -> NodeBuilder {
892 NodeBuilder::new()
893 }
894
895 pub fn peer_id(&self) -> &PeerId {
897 &self.peer_id
898 }
899
900 pub fn local_addr(&self) -> Option<String> {
901 self.listen_addrs
902 .try_read()
903 .ok()
904 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
905 }
906
907 pub async fn subscribe(&self, topic: &str) -> Result<()> {
908 info!("Subscribed to topic: {}", topic);
911 Ok(())
912 }
913
914 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
915 info!(
916 "Publishing message to topic: {} ({} bytes)",
917 topic,
918 data.len()
919 );
920
921 let peer_list: Vec<PeerId> = {
923 let peers_guard = self.peers.read().await;
924 peers_guard.keys().cloned().collect()
925 };
926
927 if peer_list.is_empty() {
928 debug!("No peers connected, message will only be sent to local subscribers");
929 } else {
930 let mut send_count = 0;
932 for peer_id in &peer_list {
933 match self.send_message(peer_id, topic, data.to_vec()).await {
934 Ok(_) => {
935 send_count += 1;
936 debug!("Sent message to peer: {}", peer_id);
937 }
938 Err(e) => {
939 warn!("Failed to send message to peer {}: {}", peer_id, e);
940 }
941 }
942 }
943 info!(
944 "Published message to {}/{} connected peers",
945 send_count,
946 peer_list.len()
947 );
948 }
949
950 let event = P2PEvent::Message {
952 topic: topic.to_string(),
953 source: self.peer_id.clone(),
954 data: data.to_vec(),
955 };
956 let _ = self.event_tx.send(event);
957
958 Ok(())
959 }
960
961 pub fn config(&self) -> &NodeConfig {
963 &self.config
964 }
965
966 pub async fn start(&self) -> Result<()> {
968 info!("Starting P2P node...");
969
970 if let Some(ref resource_manager) = self.resource_manager {
972 resource_manager.start().await.map_err(|e| {
973 P2PError::Network(crate::error::NetworkError::ProtocolError(
974 format!("Failed to start resource manager: {e}").into(),
975 ))
976 })?;
977 info!("Production resource manager started");
978 }
979
980 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
982 let mut manager = bootstrap_manager.write().await;
983 manager.start_background_tasks().await.map_err(|e| {
984 P2PError::Network(crate::error::NetworkError::ProtocolError(
985 format!("Failed to start bootstrap manager: {e}").into(),
986 ))
987 })?;
988 info!("Bootstrap cache manager started");
989 }
990
991 *self.running.write().await = true;
993
994 self.start_network_listeners().await?;
996
997 let listen_addrs = self.listen_addrs.read().await;
999 info!("P2P node started on addresses: {:?}", *listen_addrs);
1000
1001 self.start_message_receiving_system().await?;
1005
1006 self.connect_bootstrap_peers().await?;
1008
1009 Ok(())
1010 }
1011
1012 async fn start_network_listeners(&self) -> Result<()> {
1014 info!("Starting dual-stack listeners (ant-quic)...");
1015 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1017 P2PError::Transport(crate::error::TransportError::SetupFailed(
1018 format!("Failed to get local addresses: {}", e).into(),
1019 ))
1020 })?;
1021 {
1022 let mut la = self.listen_addrs.write().await;
1023 *la = addrs.clone();
1024 }
1025
1026 let event_tx = self.event_tx.clone();
1028 let peers = self.peers.clone();
1029 let active_connections = self.active_connections.clone();
1030 let rate_limiter = self.rate_limiter.clone();
1031 let dual = self.dual_node.clone();
1032 tokio::spawn(async move {
1033 loop {
1034 match dual.accept_any().await {
1035 Ok((ant_peer_id, remote_sock)) => {
1036 let peer_id =
1037 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1038 let remote_addr = NetworkAddress::from(remote_sock);
1039 let _ = rate_limiter.check_ip(&remote_sock.ip());
1041 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1042 register_new_peer(&peers, &peer_id, &remote_addr).await;
1043 active_connections.write().await.insert(peer_id);
1044 }
1045 Err(e) => {
1046 warn!("Accept failed: {}", e);
1047 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1048 }
1049 }
1050 }
1051 });
1052
1053 info!("Dual-stack listeners active on: {:?}", addrs);
1054 Ok(())
1055 }
1056
1057 #[allow(dead_code)]
1059 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1060 warn!("QUIC transport temporarily disabled during ant-quic migration");
1099 Err(crate::P2PError::Transport(
1101 crate::error::TransportError::SetupFailed(
1102 format!(
1103 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1104 )
1105 .into(),
1106 ),
1107 ))
1108 }
1109
1110 #[allow(dead_code)] async fn start_connection_acceptor(
1113 &self,
1114 transport: Arc<dyn crate::transport::Transport>,
1115 addr: std::net::SocketAddr,
1116 transport_type: crate::transport::TransportType,
1117 ) -> Result<()> {
1118 info!(
1119 "Starting connection acceptor for {:?} on {}",
1120 transport_type, addr
1121 );
1122
1123 let event_tx = self.event_tx.clone();
1125 let _peer_id = self.peer_id.clone();
1126 let peers = Arc::clone(&self.peers);
1127 let rate_limiter = Arc::clone(&self.rate_limiter);
1130
1131 tokio::spawn(async move {
1133 loop {
1134 match transport.accept().await {
1135 Ok(connection) => {
1136 let remote_addr = connection.remote_addr();
1137 let connection_peer_id =
1138 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1139
1140 let socket_addr = remote_addr.socket_addr();
1142 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1143 continue;
1145 }
1146
1147 info!(
1148 "Accepted {:?} connection from {} (peer: {})",
1149 transport_type, remote_addr, connection_peer_id
1150 );
1151
1152 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1154
1155 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1157
1158 spawn_connection_handler(
1160 connection,
1161 connection_peer_id,
1162 event_tx.clone(),
1163 Arc::clone(&peers),
1164 );
1165 }
1166 Err(e) => {
1167 warn!(
1168 "Failed to accept {:?} connection on {}: {}",
1169 transport_type, addr, e
1170 );
1171
1172 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1174 }
1175 }
1176 }
1177 });
1178
1179 info!(
1180 "Connection acceptor background task started for {:?} on {}",
1181 transport_type, addr
1182 );
1183 Ok(())
1184 }
1185
1186 async fn start_message_receiving_system(&self) -> Result<()> {
1188 info!("Starting message receiving system");
1189 let dual = self.dual_node.clone();
1190 let event_tx = self.event_tx.clone();
1191
1192 tokio::spawn(async move {
1193 loop {
1194 match dual.receive_any().await {
1195 Ok((_peer_id, bytes)) => {
1196 #[allow(clippy::collapsible_if)]
1198 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1199 if let (Some(protocol), Some(data), Some(from)) = (
1200 value.get("protocol").and_then(|v| v.as_str()),
1201 value.get("data").and_then(|v| v.as_array()),
1202 value.get("from").and_then(|v| v.as_str()),
1203 ) {
1204 let payload: Vec<u8> = data
1205 .iter()
1206 .filter_map(|v| v.as_u64().map(|n| n as u8))
1207 .collect();
1208 let _ = event_tx.send(P2PEvent::Message {
1209 topic: protocol.to_string(),
1210 source: from.to_string(),
1211 data: payload,
1212 });
1213 }
1214 }
1215 }
1216 Err(e) => {
1217 warn!("Receive error: {}", e);
1218 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1219 }
1220 }
1221 }
1222 });
1223
1224 Ok(())
1225 }
1226
1227 #[allow(dead_code)]
1229 async fn handle_received_message(
1230 &self,
1231 message_data: Vec<u8>,
1232 peer_id: &PeerId,
1233 _protocol: &str,
1234 event_tx: &broadcast::Sender<P2PEvent>,
1235 ) -> Result<()> {
1236 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1240 Ok(message) => {
1241 if let (Some(protocol), Some(data), Some(from)) = (
1242 message.get("protocol").and_then(|v| v.as_str()),
1243 message.get("data").and_then(|v| v.as_array()),
1244 message.get("from").and_then(|v| v.as_str()),
1245 ) {
1246 let data_bytes: Vec<u8> = data
1248 .iter()
1249 .filter_map(|v| v.as_u64().map(|n| n as u8))
1250 .collect();
1251
1252 let event = P2PEvent::Message {
1254 topic: protocol.to_string(),
1255 source: from.to_string(),
1256 data: data_bytes,
1257 };
1258
1259 let _ = event_tx.send(event);
1260 debug!("Generated message event from peer: {}", peer_id);
1261 }
1262 }
1263 Err(e) => {
1264 warn!("Failed to parse received message from {}: {}", peer_id, e);
1265 }
1266 }
1267
1268 Ok(())
1269 }
1270
1271 pub async fn run(&self) -> Result<()> {
1277 if !*self.running.read().await {
1278 self.start().await?;
1279 }
1280
1281 info!("P2P node running...");
1282
1283 loop {
1285 if !*self.running.read().await {
1286 break;
1287 }
1288
1289 self.periodic_tasks().await?;
1291
1292 tokio::time::sleep(Duration::from_millis(100)).await;
1294 }
1295
1296 info!("P2P node stopped");
1297 Ok(())
1298 }
1299
1300 pub async fn stop(&self) -> Result<()> {
1302 info!("Stopping P2P node...");
1303
1304 *self.running.write().await = false;
1306
1307 self.disconnect_all_peers().await?;
1309
1310 if let Some(ref resource_manager) = self.resource_manager {
1312 resource_manager.shutdown().await.map_err(|e| {
1313 P2PError::Network(crate::error::NetworkError::ProtocolError(
1314 format!("Failed to shutdown resource manager: {e}").into(),
1315 ))
1316 })?;
1317 info!("Production resource manager stopped");
1318 }
1319
1320 info!("P2P node stopped");
1321 Ok(())
1322 }
1323
1324 pub async fn shutdown(&self) -> Result<()> {
1326 self.stop().await
1327 }
1328
1329 pub async fn is_running(&self) -> bool {
1331 *self.running.read().await
1332 }
1333
1334 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1336 self.listen_addrs.read().await.clone()
1337 }
1338
1339 pub async fn connected_peers(&self) -> Vec<PeerId> {
1341 self.active_connections
1344 .read()
1345 .await
1346 .iter()
1347 .cloned()
1348 .collect()
1349 }
1350
1351 pub async fn peer_count(&self) -> usize {
1353 self.active_connections.read().await.len()
1354 }
1355
1356 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1358 self.peers.read().await.get(peer_id).cloned()
1359 }
1360
1361 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1373 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1375
1376 let peers = self.peers.read().await;
1377
1378 for (peer_id, peer_info) in peers.iter() {
1380 for peer_addr in &peer_info.addresses {
1382 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1383 && peer_socket == socket_addr
1384 {
1385 return Some(peer_id.clone());
1386 }
1387 }
1388 }
1389
1390 None
1391 }
1392
1393 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1399 let active = self.active_connections.read().await;
1400 let peers = self.peers.read().await;
1401
1402 active
1403 .iter()
1404 .map(|peer_id| {
1405 let addresses = peers
1406 .get(peer_id)
1407 .map(|info| info.addresses.clone())
1408 .unwrap_or_default();
1409 (peer_id.clone(), addresses)
1410 })
1411 .collect()
1412 }
1413
1414 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1426 self.active_connections.write().await.remove(peer_id);
1428 self.peers.write().await.remove(peer_id).is_some()
1430 }
1431
1432 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1445 self.peers.read().await.contains_key(peer_id)
1446 }
1447
1448 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1450 info!("Connecting to peer at: {}", address);
1451
1452 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1454 Some(resource_manager.acquire_connection().await?)
1455 } else {
1456 None
1457 };
1458
1459 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1461 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1462 format!("{}: {}", address, e).into(),
1463 ))
1464 })?;
1465
1466 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1469 if normalized_addr != socket_addr {
1470 info!(
1471 "Normalized wildcard address {} to loopback {}",
1472 socket_addr, normalized_addr
1473 );
1474 }
1475
1476 let addr_list = vec![normalized_addr];
1478 let peer_id = match tokio::time::timeout(
1479 self.config.connection_timeout,
1480 self.dual_node.connect_happy_eyeballs(&addr_list),
1481 )
1482 .await
1483 {
1484 Ok(Ok(peer)) => {
1485 let connected_peer_id =
1486 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1487 info!("Successfully connected to peer: {}", connected_peer_id);
1488 connected_peer_id
1489 }
1490 Ok(Err(e)) => {
1491 warn!("Failed to connect to peer at {}: {}", address, e);
1492 return Err(P2PError::Transport(
1493 crate::error::TransportError::ConnectionFailed {
1494 addr: normalized_addr,
1495 reason: e.to_string().into(),
1496 },
1497 ));
1498 }
1499 Err(_) => {
1500 warn!(
1501 "Timed out connecting to peer at {} after {:?}",
1502 address, self.config.connection_timeout
1503 );
1504 return Err(P2PError::Timeout(self.config.connection_timeout));
1505 }
1506 };
1507
1508 let peer_info = PeerInfo {
1510 peer_id: peer_id.clone(),
1511 addresses: vec![address.to_string()],
1512 connected_at: Instant::now(),
1513 last_seen: Instant::now(),
1514 status: ConnectionStatus::Connected,
1515 protocols: vec!["p2p-foundation/1.0".to_string()],
1516 heartbeat_count: 0,
1517 };
1518
1519 self.peers.write().await.insert(peer_id.clone(), peer_info);
1521
1522 self.active_connections
1525 .write()
1526 .await
1527 .insert(peer_id.clone());
1528
1529 if let Some(ref resource_manager) = self.resource_manager {
1531 resource_manager.record_bandwidth(0, 0); }
1533
1534 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1536
1537 info!("Connected to peer: {}", peer_id);
1538 Ok(peer_id)
1539 }
1540
1541 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1543 info!("Disconnecting from peer: {}", peer_id);
1544
1545 self.active_connections.write().await.remove(peer_id);
1547
1548 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1549 peer_info.status = ConnectionStatus::Disconnected;
1550
1551 let _ = self
1553 .event_tx
1554 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1555
1556 info!("Disconnected from peer: {}", peer_id);
1557 }
1558
1559 Ok(())
1560 }
1561
1562 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1564 self.active_connections.read().await.contains(peer_id)
1565 }
1566
1567 pub async fn send_message(
1569 &self,
1570 peer_id: &PeerId,
1571 protocol: &str,
1572 data: Vec<u8>,
1573 ) -> Result<()> {
1574 debug!(
1575 "Sending message to peer {} on protocol {}",
1576 peer_id, protocol
1577 );
1578
1579 if let Some(ref resource_manager) = self.resource_manager
1581 && !resource_manager
1582 .check_rate_limit(peer_id, "message")
1583 .await?
1584 {
1585 return Err(P2PError::ResourceExhausted(
1586 format!("Rate limit exceeded for peer {}", peer_id).into(),
1587 ));
1588 }
1589
1590 if !self.peers.read().await.contains_key(peer_id) {
1592 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1593 peer_id.to_string().into(),
1594 )));
1595 }
1596
1597 if !self.is_connection_active(peer_id).await {
1600 debug!(
1601 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1602 peer_id
1603 );
1604
1605 self.remove_peer(peer_id).await;
1607
1608 return Err(P2PError::Network(
1609 crate::error::NetworkError::ConnectionClosed {
1610 peer_id: peer_id.to_string().into(),
1611 },
1612 ));
1613 }
1614
1615 if let Some(ref resource_manager) = self.resource_manager {
1619 resource_manager.record_bandwidth(data.len() as u64, 0);
1620 }
1621
1622 let _message_data = self.create_protocol_message(protocol, data)?;
1624
1625 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1627 tokio::time::timeout(self.config.connection_timeout, send_fut)
1628 .await
1629 .map_err(|_| {
1630 P2PError::Transport(crate::error::TransportError::StreamError(
1631 "Timed out sending message".into(),
1632 ))
1633 })?
1634 .map_err(|e| {
1635 P2PError::Transport(crate::error::TransportError::StreamError(
1636 e.to_string().into(),
1637 ))
1638 })
1639 }
1640
1641 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1643 use serde_json::json;
1644
1645 let timestamp = std::time::SystemTime::now()
1646 .duration_since(std::time::UNIX_EPOCH)
1647 .map_err(|e| {
1648 P2PError::Network(NetworkError::ProtocolError(
1649 format!("System time error: {}", e).into(),
1650 ))
1651 })?
1652 .as_secs();
1653
1654 let message = json!({
1656 "protocol": protocol,
1657 "data": data,
1658 "from": self.peer_id,
1659 "timestamp": timestamp
1660 });
1661
1662 serde_json::to_vec(&message).map_err(|e| {
1663 P2PError::Transport(crate::error::TransportError::StreamError(
1664 format!("Failed to serialize message: {e}").into(),
1665 ))
1666 })
1667 }
1668
1669 }
1671
1672#[allow(dead_code)]
1674fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1675 use serde_json::json;
1676
1677 let timestamp = std::time::SystemTime::now()
1678 .duration_since(std::time::UNIX_EPOCH)
1679 .map_err(|e| {
1680 P2PError::Network(NetworkError::ProtocolError(
1681 format!("System time error: {}", e).into(),
1682 ))
1683 })?
1684 .as_secs();
1685
1686 let message = json!({
1688 "protocol": protocol,
1689 "data": data,
1690 "timestamp": timestamp
1691 });
1692
1693 serde_json::to_vec(&message).map_err(|e| {
1694 P2PError::Transport(crate::error::TransportError::StreamError(
1695 format!("Failed to serialize message: {e}").into(),
1696 ))
1697 })
1698}
1699
1700impl P2PNode {
1701 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1703 self.event_tx.subscribe()
1704 }
1705
1706 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1708 self.subscribe_events()
1709 }
1710
1711 pub fn uptime(&self) -> Duration {
1713 self.start_time.elapsed()
1714 }
1715
1716 fn compute_binary_hash() -> [u8; 32] {
1725 if let Some(hash) = std::env::current_exe()
1727 .ok()
1728 .and_then(|exe_path| std::fs::read(&exe_path).ok())
1729 .map(|binary_data| blake3::hash(&binary_data))
1730 {
1731 return *hash.as_bytes();
1732 }
1733 let placeholder = format!(
1736 "saorsa-core-v{}-{}",
1737 env!("CARGO_PKG_VERSION"),
1738 std::env::consts::ARCH
1739 );
1740 let hash = blake3::hash(placeholder.as_bytes());
1741 *hash.as_bytes()
1742 }
1743
1744 #[must_use]
1746 pub fn binary_hash(&self) -> &[u8; 32] {
1747 &self.binary_hash
1748 }
1749
1750 #[must_use]
1752 pub fn entangled_id(&self) -> Option<&crate::attestation::EntangledId> {
1753 self.entangled_id.as_ref()
1754 }
1755
1756 pub fn set_entangled_id(&mut self, entangled_id: crate::attestation::EntangledId) {
1761 self.entangled_id = Some(entangled_id);
1762 }
1763
1764 pub fn verify_peer_attestation(
1793 &self,
1794 peer_id: &str,
1795 peer_entangled_id: &crate::attestation::EntangledId,
1796 peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1797 ) -> crate::attestation::EnforcementDecision {
1798 use crate::attestation::{
1799 AttestationRejection, AttestationRejectionReason, EnforcementDecision, EnforcementMode,
1800 };
1801
1802 let config = &self.config.attestation_config;
1803
1804 if !config.enabled {
1806 return EnforcementDecision::Skipped;
1807 }
1808
1809 let id_valid = peer_entangled_id.verify(peer_public_key);
1811
1812 let binary_hash = *peer_entangled_id.binary_hash();
1814 let binary_allowed = config.is_binary_allowed(&binary_hash);
1815
1816 match config.enforcement_mode {
1817 EnforcementMode::Off => EnforcementDecision::Skipped,
1818
1819 EnforcementMode::Soft => {
1820 if !id_valid {
1822 warn!(
1823 peer = %peer_id,
1824 binary_hash = %hex::encode(&binary_hash[..8]),
1825 "Peer attestation verification failed: Invalid entangled ID (soft mode - allowing)"
1826 );
1827 return EnforcementDecision::AllowWithWarning {
1828 reason: AttestationRejectionReason::IdentityMismatch,
1829 };
1830 }
1831 if !binary_allowed {
1832 warn!(
1833 peer = %peer_id,
1834 binary_hash = %hex::encode(binary_hash),
1835 "Peer attestation verification failed: Binary not in allowlist (soft mode - allowing)"
1836 );
1837 return EnforcementDecision::AllowWithWarning {
1838 reason: AttestationRejectionReason::BinaryNotAllowed { hash: binary_hash },
1839 };
1840 }
1841 EnforcementDecision::Allow
1842 }
1843
1844 EnforcementMode::Hard => {
1845 if !id_valid {
1847 error!(
1848 peer = %peer_id,
1849 binary_hash = %hex::encode(&binary_hash[..8]),
1850 "REJECTING peer: Invalid entangled ID derivation"
1851 );
1852 return EnforcementDecision::Reject {
1853 rejection: AttestationRejection::identity_mismatch(),
1854 };
1855 }
1856 if !binary_allowed {
1857 error!(
1858 peer = %peer_id,
1859 binary_hash = %hex::encode(binary_hash),
1860 "REJECTING peer: Binary not in allowlist"
1861 );
1862 return EnforcementDecision::Reject {
1863 rejection: AttestationRejection::binary_not_allowed(binary_hash),
1864 };
1865 }
1866
1867 info!(
1868 peer = %peer_id,
1869 entangled_id = %hex::encode(&peer_entangled_id.id()[..8]),
1870 "Peer attestation verified successfully (hard mode)"
1871 );
1872 EnforcementDecision::Allow
1873 }
1874 }
1875 }
1876
1877 #[must_use]
1885 pub fn verify_peer_attestation_simple(
1886 &self,
1887 peer_id: &str,
1888 peer_entangled_id: &crate::attestation::EntangledId,
1889 peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1890 ) -> bool {
1891 self.verify_peer_attestation(peer_id, peer_entangled_id, peer_public_key)
1892 .should_allow()
1893 }
1894
1895 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1905 if let Some(ref resource_manager) = self.resource_manager {
1906 Ok(resource_manager.get_metrics().await)
1907 } else {
1908 Err(P2PError::Network(
1909 crate::error::NetworkError::ProtocolError(
1910 "Production resource manager not enabled".to_string().into(),
1911 ),
1912 ))
1913 }
1914 }
1915
1916 #[allow(clippy::too_many_arguments)]
1922 async fn connection_lifecycle_monitor_with_rx(
1923 _dual_node: Arc<DualStackNetworkNode>,
1924 mut event_rx: broadcast::Receiver<crate::transport::ant_quic_adapter::ConnectionEvent>,
1925 active_connections: Arc<RwLock<HashSet<String>>>,
1926 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
1927 event_tx: broadcast::Sender<P2PEvent>,
1928 geo_provider: Arc<BgpGeoProvider>,
1929 _local_peer_id: String,
1930 ) {
1931 use crate::transport::ant_quic_adapter::ConnectionEvent;
1932
1933 info!("Connection lifecycle monitor started (pre-subscribed receiver)");
1934
1935 loop {
1936 match event_rx.recv().await {
1937 Ok(event) => {
1938 match event {
1939 ConnectionEvent::Established {
1940 peer_id,
1941 remote_address,
1942 } => {
1943 let peer_id_str =
1944 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1945 debug!(
1946 "Connection established: peer={}, addr={}",
1947 peer_id_str, remote_address
1948 );
1949
1950 let ip = remote_address.ip();
1952 let is_rejected = match ip {
1953 std::net::IpAddr::V4(v4) => {
1954 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
1955 geo_provider.is_hosting_asn(asn)
1956 || geo_provider.is_vpn_asn(asn)
1957 } else {
1958 false
1959 }
1960 }
1961 std::net::IpAddr::V6(v6) => {
1962 let info = geo_provider.lookup(v6);
1963 info.is_hosting_provider || info.is_vpn_provider
1964 }
1965 };
1966
1967 if is_rejected {
1968 info!(
1969 "Rejecting connection from {} ({}) due to GeoIP policy",
1970 peer_id_str, remote_address
1971 );
1972 continue;
1973 }
1974
1975 active_connections.write().await.insert(peer_id_str.clone());
1977
1978 let mut peers_lock = peers.write().await;
1980 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
1981 peer_info.status = ConnectionStatus::Connected;
1982 peer_info.connected_at = Instant::now();
1983 } else {
1984 debug!("Registering new incoming peer: {}", peer_id_str);
1985 peers_lock.insert(
1986 peer_id_str.clone(),
1987 PeerInfo {
1988 peer_id: peer_id_str.clone(),
1989 addresses: vec![remote_address.to_string()],
1990 status: ConnectionStatus::Connected,
1991 last_seen: Instant::now(),
1992 connected_at: Instant::now(),
1993 protocols: Vec::new(),
1994 heartbeat_count: 0,
1995 },
1996 );
1997 }
1998
1999 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2001 }
2002 ConnectionEvent::Lost { peer_id, reason } => {
2003 let peer_id_str =
2004 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2005 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2006
2007 active_connections.write().await.remove(&peer_id_str);
2009
2010 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2012 peer_info.status = ConnectionStatus::Disconnected;
2013 peer_info.last_seen = Instant::now();
2014 }
2015
2016 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2018 }
2019 ConnectionEvent::Failed { peer_id, reason } => {
2020 let peer_id_str =
2021 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2022 debug!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2023
2024 active_connections.write().await.remove(&peer_id_str);
2026
2027 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2029 peer_info.status = ConnectionStatus::Disconnected;
2030 peer_info.last_seen = Instant::now();
2031 }
2032
2033 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2035 }
2036 }
2037 }
2038 Err(broadcast::error::RecvError::Lagged(skipped)) => {
2039 warn!("Connection event receiver lagged, skipped {} events", skipped);
2040 }
2041 Err(broadcast::error::RecvError::Closed) => {
2042 info!("Connection event channel closed, stopping lifecycle monitor");
2043 break;
2044 }
2045 }
2046 }
2047 }
2048
2049 #[allow(dead_code)]
2054 async fn connection_lifecycle_monitor(
2055 dual_node: Arc<DualStackNetworkNode>,
2056 active_connections: Arc<RwLock<HashSet<String>>>,
2057 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2058 event_tx: broadcast::Sender<P2PEvent>,
2059 geo_provider: Arc<BgpGeoProvider>,
2060 local_peer_id: String,
2061 ) {
2062 use crate::transport::ant_quic_adapter::ConnectionEvent;
2063
2064 let mut event_rx = dual_node.subscribe_connection_events();
2065
2066 info!("Connection lifecycle monitor started");
2067
2068 loop {
2069 match event_rx.recv().await {
2070 Ok(event) => {
2071 match event {
2072 ConnectionEvent::Established {
2073 peer_id,
2074 remote_address,
2075 } => {
2076 let peer_id_str =
2077 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2078 debug!(
2079 "Connection established: peer={}, addr={}",
2080 peer_id_str, remote_address
2081 );
2082
2083 let ip = remote_address.ip();
2086 let is_rejected = match ip {
2087 std::net::IpAddr::V4(v4) => {
2088 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2090 geo_provider.is_hosting_asn(asn)
2091 || geo_provider.is_vpn_asn(asn)
2092 } else {
2093 false
2094 }
2095 }
2096 std::net::IpAddr::V6(v6) => {
2097 let info = geo_provider.lookup(v6);
2098 info.is_hosting_provider || info.is_vpn_provider
2099 }
2100 };
2101
2102 if is_rejected {
2103 info!(
2104 "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
2105 peer_id_str, remote_address
2106 );
2107
2108 let rejection = RejectionMessage {
2110 reason: RejectionReason::GeoIpPolicy,
2111 message:
2112 "Connection rejected: Hosting/VPN providers not allowed"
2113 .to_string(),
2114 suggested_target: None, };
2116
2117 if let Ok(data) = serde_json::to_vec(&rejection) {
2119 let timestamp = std::time::SystemTime::now()
2121 .duration_since(std::time::UNIX_EPOCH)
2122 .unwrap_or_default()
2123 .as_secs();
2124
2125 let message = serde_json::json!({
2126 "protocol": "control",
2127 "data": data,
2128 "from": local_peer_id,
2129 "timestamp": timestamp
2130 });
2131
2132 if let Ok(msg_bytes) = serde_json::to_vec(&message) {
2133 let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
2137
2138 tokio::task::yield_now().await;
2141 }
2142 }
2143
2144 continue;
2148 }
2149
2150 active_connections.write().await.insert(peer_id_str.clone());
2152
2153 let mut peers_lock = peers.write().await;
2155 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2156 peer_info.status = ConnectionStatus::Connected;
2157 peer_info.connected_at = Instant::now();
2158 } else {
2159 debug!("Registering new incoming peer: {}", peer_id_str);
2161 peers_lock.insert(
2162 peer_id_str.clone(),
2163 PeerInfo {
2164 peer_id: peer_id_str.clone(),
2165 addresses: vec![remote_address.to_string()],
2166 status: ConnectionStatus::Connected,
2167 last_seen: Instant::now(),
2168 connected_at: Instant::now(),
2169 protocols: Vec::new(),
2170 heartbeat_count: 0,
2171 },
2172 );
2173 }
2174
2175 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2177 }
2178 ConnectionEvent::Lost { peer_id, reason } => {
2179 let peer_id_str =
2180 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2181 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2182
2183 active_connections.write().await.remove(&peer_id_str);
2185
2186 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2188 peer_info.status = ConnectionStatus::Disconnected;
2189 peer_info.last_seen = Instant::now();
2190 }
2191
2192 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2194 }
2195 ConnectionEvent::Failed { peer_id, reason } => {
2196 let peer_id_str =
2197 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2198 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2199
2200 active_connections.write().await.remove(&peer_id_str);
2202
2203 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2205 peer_info.status = ConnectionStatus::Failed(reason.clone());
2206 }
2207
2208 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2210 }
2211 }
2212 }
2213 Err(broadcast::error::RecvError::Lagged(skipped)) => {
2214 warn!(
2215 "Connection event monitor lagged, skipped {} events",
2216 skipped
2217 );
2218 continue;
2219 }
2220 Err(broadcast::error::RecvError::Closed) => {
2221 info!("Connection event channel closed, stopping monitor");
2222 break;
2223 }
2224 }
2225 }
2226
2227 info!("Connection lifecycle monitor stopped");
2228 }
2229
2230 async fn start_connection_monitor(&self) {
2232 debug!("Connection monitor already running from initialization");
2236 }
2237
2238 async fn keepalive_task(
2244 active_connections: Arc<RwLock<HashSet<String>>>,
2245 dual_node: Arc<DualStackNetworkNode>,
2246 shutdown: Arc<AtomicBool>,
2247 ) {
2248 use tokio::time::{Duration, interval};
2249
2250 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2254 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2255
2256 info!(
2257 "Keepalive task started (interval: {}s)",
2258 KEEPALIVE_INTERVAL_SECS
2259 );
2260
2261 loop {
2262 if shutdown.load(Ordering::Relaxed) {
2264 info!("Keepalive task shutting down");
2265 break;
2266 }
2267
2268 interval.tick().await;
2269
2270 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2272
2273 if peers.is_empty() {
2274 trace!("Keepalive: no active connections");
2275 continue;
2276 }
2277
2278 debug!("Sending keepalive to {} active connections", peers.len());
2279
2280 for peer_id in peers {
2282 match dual_node
2283 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
2284 .await
2285 {
2286 Ok(_) => {
2287 trace!("Keepalive sent to peer: {}", peer_id);
2288 }
2289 Err(e) => {
2290 debug!(
2291 "Failed to send keepalive to peer {}: {} (connection may have closed)",
2292 peer_id, e
2293 );
2294 }
2296 }
2297 }
2298 }
2299
2300 info!("Keepalive task stopped");
2301 }
2302
2303 pub async fn health_check(&self) -> Result<()> {
2305 if let Some(ref resource_manager) = self.resource_manager {
2306 resource_manager.health_check().await
2307 } else {
2308 let peer_count = self.peer_count().await;
2310 if peer_count > self.config.max_connections {
2311 Err(P2PError::Network(
2312 crate::error::NetworkError::ProtocolError(
2313 format!("Too many connections: {peer_count}").into(),
2314 ),
2315 ))
2316 } else {
2317 Ok(())
2318 }
2319 }
2320 }
2321
2322 pub fn production_config(&self) -> Option<&ProductionConfig> {
2324 self.config.production_config.as_ref()
2325 }
2326
2327 pub fn is_production_mode(&self) -> bool {
2329 self.resource_manager.is_some()
2330 }
2331
2332 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2334 self.dht.as_ref()
2335 }
2336
2337 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2339 if let Some(ref dht) = self.dht {
2340 let mut dht_instance = dht.write().await;
2341 let dht_key = crate::dht::DhtKey::from_bytes(key);
2342 dht_instance
2343 .store(&dht_key, value.clone())
2344 .await
2345 .map_err(|e| {
2346 P2PError::Dht(crate::error::DhtError::StoreFailed(
2347 format!("{:?}: {e}", key).into(),
2348 ))
2349 })?;
2350
2351 Ok(())
2352 } else {
2353 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2354 "DHT not enabled".to_string().into(),
2355 )))
2356 }
2357 }
2358
2359 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2361 if let Some(ref dht) = self.dht {
2362 let dht_instance = dht.read().await;
2363 let dht_key = crate::dht::DhtKey::from_bytes(key);
2364 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2365 P2PError::Dht(crate::error::DhtError::StoreFailed(
2366 format!("Retrieve failed: {e}").into(),
2367 ))
2368 })?;
2369
2370 Ok(record_result)
2371 } else {
2372 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2373 "DHT not enabled".to_string().into(),
2374 )))
2375 }
2376 }
2377
2378 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2380 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2381 let mut manager = bootstrap_manager.write().await;
2382 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2383 .iter()
2384 .filter_map(|addr| addr.parse().ok())
2385 .collect();
2386 let contact = ContactEntry::new(peer_id, socket_addresses);
2387 manager.add_contact(contact).await.map_err(|e| {
2388 P2PError::Network(crate::error::NetworkError::ProtocolError(
2389 format!("Failed to add peer to bootstrap cache: {e}").into(),
2390 ))
2391 })?;
2392 }
2393 Ok(())
2394 }
2395
2396 pub async fn update_peer_metrics(
2398 &self,
2399 peer_id: &PeerId,
2400 success: bool,
2401 latency_ms: Option<u64>,
2402 _error: Option<String>,
2403 ) -> Result<()> {
2404 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2405 let mut manager = bootstrap_manager.write().await;
2406
2407 let metrics = QualityMetrics {
2409 success_rate: if success { 1.0 } else { 0.0 },
2410 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2411 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2413 last_successful_connection: if success {
2414 chrono::Utc::now()
2415 } else {
2416 chrono::Utc::now() - chrono::Duration::hours(1)
2417 },
2418 uptime_score: 0.5,
2419 };
2420
2421 manager
2422 .update_contact_metrics(peer_id, metrics)
2423 .await
2424 .map_err(|e| {
2425 P2PError::Network(crate::error::NetworkError::ProtocolError(
2426 format!("Failed to update peer metrics: {e}").into(),
2427 ))
2428 })?;
2429 }
2430 Ok(())
2431 }
2432
2433 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2435 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2436 let manager = bootstrap_manager.read().await;
2437 let stats = manager.get_stats().await.map_err(|e| {
2438 P2PError::Network(crate::error::NetworkError::ProtocolError(
2439 format!("Failed to get bootstrap stats: {e}").into(),
2440 ))
2441 })?;
2442 Ok(Some(stats))
2443 } else {
2444 Ok(None)
2445 }
2446 }
2447
2448 pub async fn cached_peer_count(&self) -> usize {
2450 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2451 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2452 {
2453 return stats.total_contacts;
2454 }
2455 0
2456 }
2457
2458 async fn connect_bootstrap_peers(&self) -> Result<()> {
2460 let mut bootstrap_contacts = Vec::new();
2461 let mut used_cache = false;
2462 let mut seen_addresses = std::collections::HashSet::new();
2463
2464 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2466 self.config.bootstrap_peers_str.clone()
2467 } else {
2468 self.config
2470 .bootstrap_peers
2471 .iter()
2472 .map(|addr| addr.to_string())
2473 .collect::<Vec<_>>()
2474 };
2475
2476 if !cli_bootstrap_peers.is_empty() {
2477 info!(
2478 "Using {} CLI-provided bootstrap peers (priority)",
2479 cli_bootstrap_peers.len()
2480 );
2481 for addr in &cli_bootstrap_peers {
2482 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2483 seen_addresses.insert(socket_addr);
2484 let contact = ContactEntry::new(
2485 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2486 vec![socket_addr],
2487 );
2488 bootstrap_contacts.push(contact);
2489 } else {
2490 warn!("Invalid bootstrap address format: {}", addr);
2491 }
2492 }
2493 }
2494
2495 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2497 let manager = bootstrap_manager.read().await;
2498 match manager.get_bootstrap_peers(20).await {
2499 Ok(contacts) => {
2501 if !contacts.is_empty() {
2502 let mut added_from_cache = 0;
2503 for contact in contacts {
2504 let new_addresses: Vec<_> = contact
2506 .addresses
2507 .iter()
2508 .filter(|addr| !seen_addresses.contains(addr))
2509 .copied()
2510 .collect();
2511
2512 if !new_addresses.is_empty() {
2513 for addr in &new_addresses {
2514 seen_addresses.insert(*addr);
2515 }
2516 let mut contact = contact.clone();
2517 contact.addresses = new_addresses;
2518 bootstrap_contacts.push(contact);
2519 added_from_cache += 1;
2520 }
2521 }
2522 if added_from_cache > 0 {
2523 info!(
2524 "Added {} cached bootstrap peers (supplementing CLI peers)",
2525 added_from_cache
2526 );
2527 used_cache = true;
2528 }
2529 }
2530 }
2531 Err(e) => {
2532 warn!("Failed to get cached bootstrap peers: {}", e);
2533 }
2534 }
2535 }
2536
2537 if bootstrap_contacts.is_empty() {
2538 info!("No bootstrap peers configured and no cached peers available");
2539 return Ok(());
2540 }
2541
2542 let mut successful_connections = 0;
2544 for contact in bootstrap_contacts {
2545 for addr in &contact.addresses {
2546 match self.connect_peer(&addr.to_string()).await {
2547 Ok(peer_id) => {
2548 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2549 successful_connections += 1;
2550
2551 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2553 let mut manager = bootstrap_manager.write().await;
2554 let mut updated_contact = contact.clone();
2555 updated_contact.peer_id = peer_id.clone();
2556 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2559 warn!("Failed to update bootstrap cache: {}", e);
2560 }
2561 }
2562 break; }
2564 Err(e) => {
2565 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2566
2567 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2569 let mut manager = bootstrap_manager.write().await;
2570 let mut updated_contact = contact.clone();
2571 updated_contact.update_connection_result(
2572 false,
2573 None,
2574 Some(e.to_string()),
2575 );
2576
2577 if let Err(e) = manager.add_contact(updated_contact).await {
2578 warn!("Failed to update bootstrap cache: {}", e);
2579 }
2580 }
2581 }
2582 }
2583 }
2584 }
2585
2586 if successful_connections == 0 {
2587 if !used_cache {
2588 warn!("Failed to connect to any bootstrap peers");
2589 }
2590 return Ok(());
2593 }
2594 info!(
2595 "Successfully connected to {} bootstrap peers",
2596 successful_connections
2597 );
2598
2599 Ok(())
2600 }
2601
2602 async fn disconnect_all_peers(&self) -> Result<()> {
2604 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2605
2606 for peer_id in peer_ids {
2607 self.disconnect_peer(&peer_id).await?;
2608 }
2609
2610 Ok(())
2611 }
2612
2613 async fn periodic_tasks(&self) -> Result<()> {
2615 Ok(())
2621 }
2622}
2623
2624#[async_trait::async_trait]
2626pub trait NetworkSender: Send + Sync {
2627 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2629
2630 fn local_peer_id(&self) -> &PeerId;
2632}
2633
2634#[derive(Clone)]
2636pub struct P2PNetworkSender {
2637 peer_id: PeerId,
2638 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2640}
2641
2642impl P2PNetworkSender {
2643 pub fn new(
2644 peer_id: PeerId,
2645 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2646 ) -> Self {
2647 Self { peer_id, send_tx }
2648 }
2649}
2650
2651#[async_trait::async_trait]
2653impl NetworkSender for P2PNetworkSender {
2654 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2656 self.send_tx
2657 .send((peer_id.clone(), protocol.to_string(), data))
2658 .map_err(|_| {
2659 P2PError::Network(crate::error::NetworkError::ProtocolError(
2660 "Failed to send message via channel".to_string().into(),
2661 ))
2662 })?;
2663 Ok(())
2664 }
2665
2666 fn local_peer_id(&self) -> &PeerId {
2668 &self.peer_id
2669 }
2670}
2671
2672pub struct NodeBuilder {
2674 config: NodeConfig,
2675}
2676
2677impl Default for NodeBuilder {
2678 fn default() -> Self {
2679 Self::new()
2680 }
2681}
2682
2683impl NodeBuilder {
2684 pub fn new() -> Self {
2686 Self {
2687 config: NodeConfig::default(),
2688 }
2689 }
2690
2691 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2693 self.config.peer_id = Some(peer_id);
2694 self
2695 }
2696
2697 pub fn listen_on(mut self, addr: &str) -> Self {
2699 if let Ok(multiaddr) = addr.parse() {
2700 self.config.listen_addrs.push(multiaddr);
2701 }
2702 self
2703 }
2704
2705 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2707 if let Ok(multiaddr) = addr.parse() {
2708 self.config.bootstrap_peers.push(multiaddr);
2709 }
2710 self.config.bootstrap_peers_str.push(addr.to_string());
2711 self
2712 }
2713
2714 pub fn with_ipv6(mut self, enable: bool) -> Self {
2716 self.config.enable_ipv6 = enable;
2717 self
2718 }
2719
2720 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2724 self.config.connection_timeout = timeout;
2725 self
2726 }
2727
2728 pub fn with_max_connections(mut self, max: usize) -> Self {
2730 self.config.max_connections = max;
2731 self
2732 }
2733
2734 pub fn with_production_mode(mut self) -> Self {
2736 self.config.production_config = Some(ProductionConfig::default());
2737 self
2738 }
2739
2740 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2742 self.config.production_config = Some(production_config);
2743 self
2744 }
2745
2746 pub fn with_diversity_config(
2748 mut self,
2749 diversity_config: crate::security::IPDiversityConfig,
2750 ) -> Self {
2751 self.config.diversity_config = Some(diversity_config);
2752 self
2753 }
2754
2755 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2757 self.config.dht_config = dht_config;
2758 self
2759 }
2760
2761 pub fn with_default_dht(mut self) -> Self {
2763 self.config.dht_config = DHTConfig::default();
2764 self
2765 }
2766
2767 pub async fn build(self) -> Result<P2PNode> {
2769 P2PNode::new(self.config).await
2770 }
2771}
2772
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Builds a `BootstrapManager` from `config`, substituting defaults for any
    /// missing cache or diversity configuration.
    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        let cache_config = config
            .bootstrap_cache_config
            .clone()
            .unwrap_or_else(crate::bootstrap::CacheConfig::default);

        BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager")
    }

    /// The diversity config set on `NodeConfig` must reach the bootstrap manager.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        let manager = build_bootstrap_manager_like_prod(&config).await;
        let applied = manager.diversity_config();
        assert!(applied.is_relaxed());
        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
    }
}
2812
2813#[allow(dead_code)] async fn handle_received_message_standalone(
2816 message_data: Vec<u8>,
2817 peer_id: &PeerId,
2818 _protocol: &str,
2819 event_tx: &broadcast::Sender<P2PEvent>,
2820) -> Result<()> {
2821 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2823 Ok(message) => {
2824 if let (Some(protocol), Some(data), Some(from)) = (
2825 message.get("protocol").and_then(|v| v.as_str()),
2826 message.get("data").and_then(|v| v.as_array()),
2827 message.get("from").and_then(|v| v.as_str()),
2828 ) {
2829 let data_bytes: Vec<u8> = data
2831 .iter()
2832 .filter_map(|v| v.as_u64().map(|n| n as u8))
2833 .collect();
2834
2835 let event = P2PEvent::Message {
2837 topic: protocol.to_string(),
2838 source: from.to_string(),
2839 data: data_bytes,
2840 };
2841
2842 let _ = event_tx.send(event);
2843 debug!("Generated message event from peer: {}", peer_id);
2844 }
2845 }
2846 Err(e) => {
2847 warn!("Failed to parse received message from {}: {}", peer_id, e);
2848 }
2849 }
2850
2851 Ok(())
2852}
2853
2854#[allow(dead_code)]
2858fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2859 match create_protocol_message_static(protocol, data) {
2860 Ok(msg) => Some(msg),
2861 Err(e) => {
2862 warn!("Failed to create protocol message: {}", e);
2863 None
2864 }
2865 }
2866}
2867
2868#[allow(dead_code)]
2870async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
2871 match result {
2872 Ok(_) => {
2873 debug!("Message sent to peer {} via transport layer", peer_id);
2874 }
2875 Err(e) => {
2876 warn!("Failed to send message to peer {}: {}", peer_id, e);
2877 }
2878 }
2879}
2880
2881#[allow(dead_code)] fn check_rate_limit(
2884 rate_limiter: &RateLimiter,
2885 socket_addr: &std::net::SocketAddr,
2886 remote_addr: &NetworkAddress,
2887) -> Result<()> {
2888 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
2889 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
2890 e
2891 })
2892}
2893
2894#[allow(dead_code)] async fn register_new_peer(
2897 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2898 peer_id: &PeerId,
2899 remote_addr: &NetworkAddress,
2900) {
2901 let mut peers_guard = peers.write().await;
2902 let peer_info = PeerInfo {
2903 peer_id: peer_id.clone(),
2904 addresses: vec![remote_addr.to_string()],
2905 connected_at: tokio::time::Instant::now(),
2906 last_seen: tokio::time::Instant::now(),
2907 status: ConnectionStatus::Connected,
2908 protocols: vec!["p2p-chat/1.0.0".to_string()],
2909 heartbeat_count: 0,
2910 };
2911 peers_guard.insert(peer_id.clone(), peer_info);
2912}
2913
2914#[allow(dead_code)] fn spawn_connection_handler(
2917 connection: Box<dyn crate::transport::Connection>,
2918 peer_id: PeerId,
2919 event_tx: broadcast::Sender<P2PEvent>,
2920 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2921) {
2922 tokio::spawn(async move {
2923 handle_peer_connection(connection, peer_id, event_tx, peers).await;
2924 });
2925}
2926
2927#[allow(dead_code)] async fn handle_peer_connection(
2930 mut connection: Box<dyn crate::transport::Connection>,
2931 peer_id: PeerId,
2932 event_tx: broadcast::Sender<P2PEvent>,
2933 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2934) {
2935 loop {
2936 match connection.receive().await {
2937 Ok(message_data) => {
2938 debug!(
2939 "Received {} bytes from peer: {}",
2940 message_data.len(),
2941 peer_id
2942 );
2943
2944 if let Err(e) = handle_received_message_standalone(
2946 message_data,
2947 &peer_id,
2948 "unknown", &event_tx,
2950 )
2951 .await
2952 {
2953 warn!("Failed to handle message from peer {}: {}", peer_id, e);
2954 }
2955 }
2956 Err(e) => {
2957 warn!("Failed to receive message from {}: {}", peer_id, e);
2958
2959 if !connection.is_alive().await {
2961 info!("Connection to {} is dead, removing peer", peer_id);
2962
2963 remove_peer(&peers, &peer_id).await;
2965
2966 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2968
2969 break; }
2971
2972 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2974 }
2975 }
2976 }
2977}
2978
2979#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
2982 let mut peers_guard = peers.write().await;
2983 peers_guard.remove(peer_id);
2984}
2985
2986#[allow(dead_code)]
2988async fn update_peer_heartbeat(
2989 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2990 peer_id: &PeerId,
2991) -> Result<()> {
2992 let mut peers_guard = peers.write().await;
2993 match peers_guard.get_mut(peer_id) {
2994 Some(peer_info) => {
2995 peer_info.last_seen = Instant::now();
2996 peer_info.heartbeat_count += 1;
2997 Ok(())
2998 }
2999 None => {
3000 warn!("Received heartbeat from unknown peer: {}", peer_id);
3001 Err(P2PError::Network(NetworkError::PeerNotFound(
3002 format!("Peer {} not found", peer_id).into(),
3003 )))
3004 }
3005 }
3006}
3007
3008#[allow(dead_code)]
3010async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
3011 if let Some(manager) = resource_manager {
3012 let metrics = manager.get_metrics().await;
3013 (metrics.memory_used, metrics.cpu_usage)
3014 } else {
3015 (0, 0.0)
3016 }
3017}
3018
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
    use std::time::Duration;
    use tokio::time::timeout;

    // ---------------------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------------------

    /// IPv4 loopback address with port 0, i.e. the OS picks a free port.
    fn ipv4_loopback_any_port() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)
    }

    /// Node configuration used by most tests: fixed peer id, loopback listeners on
    /// OS-assigned ports, no bootstrap peers and a short connection timeout.
    fn create_test_node_config() -> NodeConfig {
        NodeConfig {
            peer_id: Some("test_peer_123".to_string()),
            listen_addrs: vec![
                SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0),
                ipv4_loopback_any_port(),
            ],
            listen_addr: ipv4_loopback_any_port(),
            bootstrap_peers: vec![],
            bootstrap_peers_str: vec![],
            enable_ipv6: true,

            connection_timeout: Duration::from_secs(2),
            keep_alive_interval: Duration::from_secs(30),
            max_connections: 100,
            max_incoming_connections: 50,
            dht_config: DHTConfig::default(),
            security_config: SecurityConfig::default(),
            production_config: None,
            bootstrap_cache_config: None,
            diversity_config: None,
            attestation_config: crate::attestation::AttestationConfig::default(),
        }
    }

    /// Builds a `PeerInfo` in the `Connected` state that speaks `test-protocol`.
    fn connected_peer_info(peer_id: &str, addresses: Vec<String>) -> PeerInfo {
        PeerInfo {
            peer_id: peer_id.to_string(),
            addresses,
            connected_at: Instant::now(),
            last_seen: Instant::now(),
            status: ConnectionStatus::Connected,
            protocols: vec!["test-protocol".to_string()],
            heartbeat_count: 0,
        }
    }

    /// Inserts `info` directly into the node's peer table, keyed by its own peer id.
    async fn insert_peer(node: &P2PNode, info: PeerInfo) {
        node.peers.write().await.insert(info.peer_id.clone(), info);
    }

    /// Error returned by tests when a started node exposes no local address.
    fn no_listening_address() -> P2PError {
        P2PError::Network(NetworkError::ProtocolError(
            "No listening address".to_string().into(),
        ))
    }

    // ---------------------------------------------------------------------------------
    // Configuration defaults and plain data types
    // ---------------------------------------------------------------------------------

    #[tokio::test]
    async fn test_node_config_default() {
        let defaults = NodeConfig::default();

        assert!(defaults.peer_id.is_none());
        assert_eq!(defaults.listen_addrs.len(), 2);
        assert!(defaults.enable_ipv6);
        assert_eq!(defaults.max_connections, 10000);
        assert_eq!(defaults.max_incoming_connections, 100);
        assert_eq!(defaults.connection_timeout, Duration::from_secs(30));
    }

    #[tokio::test]
    async fn test_dht_config_default() {
        let defaults = DHTConfig::default();

        assert_eq!(defaults.k_value, 20);
        assert_eq!(defaults.alpha_value, 5);
        assert_eq!(defaults.record_ttl, Duration::from_secs(3600));
        assert_eq!(defaults.refresh_interval, Duration::from_secs(600));
    }

    #[tokio::test]
    async fn test_security_config_default() {
        let defaults = SecurityConfig::default();

        assert!(defaults.enable_noise);
        assert!(defaults.enable_tls);
        assert_eq!(defaults.trust_level, TrustLevel::Basic);
    }

    #[test]
    fn test_trust_level_variants() {
        // Every variant must be constructible.
        let _none = TrustLevel::None;
        let _basic = TrustLevel::Basic;
        let _full = TrustLevel::Full;

        // Equality is reflexive per variant and distinguishes different variants.
        assert_eq!(TrustLevel::None, TrustLevel::None);
        assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
        assert_eq!(TrustLevel::Full, TrustLevel::Full);
        assert_ne!(TrustLevel::None, TrustLevel::Basic);
    }

    #[test]
    fn test_connection_status_variants() {
        let connecting = ConnectionStatus::Connecting;
        let connected = ConnectionStatus::Connected;
        let disconnecting = ConnectionStatus::Disconnecting;
        let disconnected = ConnectionStatus::Disconnected;
        let failed = ConnectionStatus::Failed("test error".to_string());

        assert_eq!(connecting, ConnectionStatus::Connecting);
        assert_eq!(connected, ConnectionStatus::Connected);
        assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
        assert_eq!(disconnected, ConnectionStatus::Disconnected);
        assert_ne!(connecting, connected);

        let ConnectionStatus::Failed(msg) = failed else {
            panic!("Expected Failed status");
        };
        assert_eq!(msg, "test error");
    }

    // ---------------------------------------------------------------------------------
    // Node construction and lifecycle
    // ---------------------------------------------------------------------------------

    #[tokio::test]
    async fn test_node_creation() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert_eq!(node.peer_id(), "test_peer_123");
        assert!(!node.is_running().await);
        assert_eq!(node.peer_count().await, 0);
        assert!(node.connected_peers().await.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_node_creation_without_peer_id() -> Result<()> {
        let mut config = create_test_node_config();
        config.peer_id = None;

        let node = P2PNode::new(config).await?;

        // Without an explicit id the node must generate one with the `peer_` prefix.
        assert!(node.peer_id().starts_with("peer_"));
        assert!(!node.is_running().await);

        Ok(())
    }

    #[tokio::test]
    async fn test_node_lifecycle() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(!node.is_running().await);

        node.start().await?;
        assert!(node.is_running().await);

        let listen_addrs = node.listen_addrs().await;
        assert!(
            !listen_addrs.is_empty(),
            "Expected at least one listening address"
        );

        node.stop().await?;
        assert!(!node.is_running().await);

        Ok(())
    }

    // ---------------------------------------------------------------------------------
    // Peer connections, events and messaging
    // ---------------------------------------------------------------------------------

    #[tokio::test]
    async fn test_peer_connection() -> Result<()> {
        let config1 = create_test_node_config();
        let mut config2 = create_test_node_config();
        config2.peer_id = Some("test_peer_456".to_string());

        let node1 = P2PNode::new(config1).await?;
        let node2 = P2PNode::new(config2).await?;

        node1.start().await?;
        node2.start().await?;

        let node2_addr = node2
            .listen_addrs()
            .await
            .into_iter()
            .find(|a| a.ip().is_ipv4())
            .ok_or_else(|| {
                P2PError::Network(NetworkError::InvalidAddress(
                    "Node 2 did not expose an IPv4 listen address".into(),
                ))
            })?;

        let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;

        assert_eq!(node1.peer_count().await, 1);

        let connected_peers = node1.connected_peers().await;
        assert_eq!(connected_peers.len(), 1);
        assert_eq!(connected_peers[0], peer_id);

        let info = node1
            .peer_info(&peer_id)
            .await
            .expect("Peer info should exist after adding peer");
        assert_eq!(info.peer_id, peer_id);
        assert_eq!(info.status, ConnectionStatus::Connected);
        assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));

        node1.disconnect_peer(&peer_id).await?;
        assert_eq!(node1.peer_count().await, 0);

        node1.stop().await?;
        node2.stop().await?;

        Ok(())
    }

    #[cfg_attr(target_os = "windows", ignore)]
    #[tokio::test]
    async fn test_event_subscription() -> Result<()> {
        let ipv4_localhost = ipv4_loopback_any_port();

        // Both nodes are restricted to IPv4 loopback for this test.
        let mut config1 = create_test_node_config();
        config1.listen_addr = ipv4_localhost;
        config1.listen_addrs = vec![ipv4_localhost];
        config1.enable_ipv6 = false;

        let mut config2 = create_test_node_config();
        config2.peer_id = Some("test_peer_456".to_string());
        config2.listen_addr = ipv4_localhost;
        config2.listen_addrs = vec![ipv4_localhost];
        config2.enable_ipv6 = false;

        let node1 = P2PNode::new(config1).await?;
        let node2 = P2PNode::new(config2).await?;

        node1.start().await?;
        node2.start().await?;

        // Give both listeners time to come up before connecting.
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

        let mut events = node1.subscribe_events();

        let node2_addr = node2.local_addr().ok_or_else(no_listening_address)?;

        // Up to three connection attempts, pausing 200 ms between retries.
        let mut peer_id = None;
        for attempt in 0..3 {
            if attempt > 0 {
                tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
            }
            match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
                Ok(Ok(id)) => {
                    peer_id = Some(id);
                    break;
                }
                Ok(Err(_)) | Err(_) => continue,
            }
        }
        let peer_id = peer_id.ok_or_else(|| {
            P2PError::Network(NetworkError::ProtocolError(
                "Failed to connect after 3 attempts".to_string().into(),
            ))
        })?;

        let connected_event = timeout(Duration::from_secs(2), events.recv())
            .await
            .expect("Should receive event")
            .expect("Event should not be error");
        match connected_event {
            P2PEvent::PeerConnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerConnected event"),
        }

        node1.disconnect_peer(&peer_id).await?;

        let disconnected_event = timeout(Duration::from_secs(2), events.recv())
            .await
            .expect("Should receive event")
            .expect("Event should not be error");
        match disconnected_event {
            P2PEvent::PeerDisconnected(event_peer_id) => {
                assert_eq!(event_peer_id, peer_id);
            }
            _ => panic!("Expected PeerDisconnected event"),
        }

        node1.stop().await?;
        node2.stop().await?;

        Ok(())
    }

    #[cfg_attr(target_os = "windows", ignore)]
    #[tokio::test]
    async fn test_message_sending() -> Result<()> {
        let mut config1 = create_test_node_config();
        config1.listen_addr = ipv4_loopback_any_port();
        let node1 = P2PNode::new(config1).await?;
        node1.start().await?;

        let mut config2 = create_test_node_config();
        config2.listen_addr = ipv4_loopback_any_port();
        let node2 = P2PNode::new(config2).await?;
        node2.start().await?;

        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        let node2_addr = node2.local_addr().ok_or_else(no_listening_address)?;

        let peer_id =
            match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
                Ok(res) => res?,
                Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
            };

        tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;

        let message_data = b"Hello, peer!".to_vec();
        let result = match timeout(
            Duration::from_millis(500),
            node1.send_message(&peer_id, "test-protocol", message_data),
        )
        .await
        {
            Ok(res) => res,
            Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
        };
        // A send may fail for other reasons, but never because the peer is unknown.
        if let Err(e) = &result {
            assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
        }

        let non_existent_peer = "non_existent_peer".to_string();
        let result = node1
            .send_message(&non_existent_peer, "test-protocol", vec![])
            .await;
        assert!(result.is_err(), "Sending to non-existent peer should fail");

        Ok(())
    }

    // ---------------------------------------------------------------------------------
    // Accessors and simple node operations
    // ---------------------------------------------------------------------------------

    #[tokio::test]
    async fn test_remote_mcp_operations() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        node.start().await?;
        node.stop().await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_health_check() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let result = node.health_check().await;
        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_node_uptime() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        // A `Duration` cannot be negative, so only monotonic growth is worth asserting.
        let uptime1 = node.uptime();

        tokio::time::sleep(Duration::from_millis(10)).await;

        let uptime2 = node.uptime();
        assert!(uptime2 > uptime1);

        Ok(())
    }

    #[tokio::test]
    async fn test_node_config_access() -> Result<()> {
        let config = create_test_node_config();
        let expected_peer_id = config.peer_id.clone();
        let node = P2PNode::new(config).await?;

        let node_config = node.config();
        assert_eq!(node_config.peer_id, expected_peer_id);
        assert_eq!(node_config.max_connections, 100);
        Ok(())
    }

    #[tokio::test]
    async fn test_mcp_server_access() -> Result<()> {
        // Only checks that node construction succeeds.
        let _node = P2PNode::new(create_test_node_config()).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_dht_access() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(node.dht().is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_node_builder() -> Result<()> {
        let builder = P2PNode::builder()
            .with_peer_id("builder_test_peer".to_string())
            .listen_on("/ip4/127.0.0.1/tcp/0")
            .listen_on("/ip6/::1/tcp/0")
            .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000")
            .with_ipv6(true)
            .with_connection_timeout(Duration::from_secs(15))
            .with_max_connections(200);

        // Inspect the accumulated configuration without building a node.
        let config = builder.config;
        assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
        assert_eq!(config.listen_addrs.len(), 2);
        assert_eq!(config.bootstrap_peers_str.len(), 1);
        assert!(config.enable_ipv6);
        assert_eq!(config.connection_timeout, Duration::from_secs(15));
        assert_eq!(config.max_connections, 200);

        Ok(())
    }

    #[tokio::test]
    async fn test_bootstrap_peers() -> Result<()> {
        let mut config = create_test_node_config();
        config.bootstrap_peers = vec![
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9200),
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9201),
        ];

        let node = P2PNode::new(config).await?;

        node.start().await?;

        // The count itself is not asserted; the call only has to complete.
        let _peer_count = node.peer_count().await;

        node.stop().await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_production_mode_disabled() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        assert!(!node.is_production_mode());
        assert!(node.production_config().is_none());

        let Err(err) = node.resource_metrics().await else {
            panic!("resource_metrics should fail when production mode is disabled");
        };
        assert!(err.to_string().contains("not enabled"));

        Ok(())
    }

    #[tokio::test]
    async fn test_network_event_variants() {
        let peer_id = "test_peer".to_string();
        let address = "/ip4/127.0.0.1/tcp/9000".to_string();

        // Construction-only checks: each variant must accept these field shapes.
        let _peer_connected = NetworkEvent::PeerConnected {
            peer_id: peer_id.clone(),
            addresses: vec![address.clone()],
        };

        let _peer_disconnected = NetworkEvent::PeerDisconnected {
            peer_id: peer_id.clone(),
            reason: "test disconnect".to_string(),
        };

        let _message_received = NetworkEvent::MessageReceived {
            peer_id: peer_id.clone(),
            protocol: "test-protocol".to_string(),
            data: vec![1, 2, 3],
        };

        let _connection_failed = NetworkEvent::ConnectionFailed {
            peer_id: Some(peer_id.clone()),
            address: address.clone(),
            error: "connection refused".to_string(),
        };

        let _dht_stored = NetworkEvent::DHTRecordStored {
            key: vec![1, 2, 3],
            value: vec![4, 5, 6],
        };

        let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
            key: vec![1, 2, 3],
            value: Some(vec![4, 5, 6]),
        };
    }

    #[tokio::test]
    async fn test_peer_info_structure() {
        let peer_info =
            connected_peer_info("test_peer", vec!["/ip4/127.0.0.1/tcp/9000".to_string()]);

        assert_eq!(peer_info.peer_id, "test_peer");
        assert_eq!(peer_info.addresses.len(), 1);
        assert_eq!(peer_info.status, ConnectionStatus::Connected);
        assert_eq!(peer_info.protocols.len(), 1);
    }

    #[tokio::test]
    async fn test_serialization() -> Result<()> {
        let config = create_test_node_config();
        let serialized = serde_json::to_string(&config)?;
        let deserialized: NodeConfig = serde_json::from_str(&serialized)?;

        assert_eq!(config.peer_id, deserialized.peer_id);
        assert_eq!(config.listen_addrs, deserialized.listen_addrs);
        assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);

        Ok(())
    }

    // ---------------------------------------------------------------------------------
    // Peer table lookups
    // ---------------------------------------------------------------------------------

    #[tokio::test]
    async fn test_get_peer_id_by_address_found() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let test_peer_id = "peer_test_123".to_string();
        let test_address = "192.168.1.100:9000".to_string();

        insert_peer(
            &node,
            connected_peer_info(&test_peer_id, vec![test_address.clone()]),
        )
        .await;

        let found_peer_id = node.get_peer_id_by_address(&test_address).await;
        assert_eq!(found_peer_id, Some(test_peer_id));

        Ok(())
    }

    #[tokio::test]
    async fn test_get_peer_id_by_address_not_found() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
        assert_eq!(result, None);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let result = node.get_peer_id_by_address("invalid-address").await;
        assert_eq!(result, None);

        Ok(())
    }

    #[tokio::test]
    async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let peer1_id = "peer_1".to_string();
        let peer1_addr = "192.168.1.101:9001".to_string();

        let peer2_id = "peer_2".to_string();
        let peer2_addr = "192.168.1.102:9002".to_string();

        insert_peer(
            &node,
            connected_peer_info(&peer1_id, vec![peer1_addr.clone()]),
        )
        .await;
        insert_peer(
            &node,
            connected_peer_info(&peer2_id, vec![peer2_addr.clone()]),
        )
        .await;

        let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
        let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;

        assert_eq!(found_peer1, Some(peer1_id));
        assert_eq!(found_peer2, Some(peer2_id));

        Ok(())
    }

    #[tokio::test]
    async fn test_list_active_connections_empty() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let connections = node.list_active_connections().await;
        assert!(connections.is_empty());

        Ok(())
    }

    #[tokio::test]
    async fn test_list_active_connections_with_peers() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let peer1_id = "peer_1".to_string();
        let peer1_addrs = vec![
            "192.168.1.101:9001".to_string(),
            "192.168.1.101:9002".to_string(),
        ];

        let peer2_id = "peer_2".to_string();
        let peer2_addrs = vec!["192.168.1.102:9003".to_string()];

        insert_peer(&node, connected_peer_info(&peer1_id, peer1_addrs.clone())).await;
        insert_peer(&node, connected_peer_info(&peer2_id, peer2_addrs.clone())).await;

        // Both peers are additionally marked as active connections.
        node.active_connections
            .write()
            .await
            .insert(peer1_id.clone());
        node.active_connections
            .write()
            .await
            .insert(peer2_id.clone());

        let connections = node.list_active_connections().await;
        assert_eq!(connections.len(), 2);

        let peer1_conn = connections
            .iter()
            .find(|(id, _)| id == &peer1_id)
            .expect("peer_1 should be listed as an active connection");
        let peer2_conn = connections
            .iter()
            .find(|(id, _)| id == &peer2_id)
            .expect("peer_2 should be listed as an active connection");

        assert_eq!(peer1_conn.1, peer1_addrs);
        assert_eq!(peer2_conn.1, peer2_addrs);

        Ok(())
    }

    #[tokio::test]
    async fn test_remove_peer_success() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let peer_id = "peer_to_remove".to_string();
        insert_peer(
            &node,
            connected_peer_info(&peer_id, vec!["192.168.1.100:9000".to_string()]),
        )
        .await;

        assert!(node.is_peer_connected(&peer_id).await);

        let removed = node.remove_peer(&peer_id).await;
        assert!(removed);

        assert!(!node.is_peer_connected(&peer_id).await);

        Ok(())
    }

    #[tokio::test]
    async fn test_remove_peer_nonexistent() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
        assert!(!removed);

        Ok(())
    }

    #[tokio::test]
    async fn test_is_peer_connected() -> Result<()> {
        let node = P2PNode::new(create_test_node_config()).await?;

        let peer_id = "test_peer".to_string();

        assert!(!node.is_peer_connected(&peer_id).await);

        insert_peer(
            &node,
            connected_peer_info(&peer_id, vec!["192.168.1.100:9000".to_string()]),
        )
        .await;

        assert!(node.is_peer_connected(&peer_id).await);

        node.remove_peer(&peer_id).await;

        assert!(!node.is_peer_connected(&peer_id).await);

        Ok(())
    }

    // ---------------------------------------------------------------------------------
    // normalize_wildcard_to_loopback
    // ---------------------------------------------------------------------------------

    #[test]
    fn test_normalize_ipv6_wildcard() {
        let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
        let normalized = normalize_wildcard_to_loopback(wildcard);

        assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
        assert_eq!(normalized.port(), 8080);
    }

    #[test]
    fn test_normalize_ipv4_wildcard() {
        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
        let normalized = normalize_wildcard_to_loopback(wildcard);

        assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
        assert_eq!(normalized.port(), 9000);
    }

    #[test]
    fn test_normalize_specific_address_unchanged() {
        // Built directly rather than parsed so the test has no fallible step.
        let specific = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 100)), 3000);
        let normalized = normalize_wildcard_to_loopback(specific);

        assert_eq!(normalized, specific);
    }

    #[test]
    fn test_normalize_loopback_unchanged() {
        let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
        let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
        assert_eq!(normalized_v6, loopback_v6);

        let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
        let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
        assert_eq!(normalized_v4, loopback_v4);
    }
}