1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::control::RejectionMessage;
23use crate::dht::DHT;
24use crate::error::{NetworkError, P2PError, P2pResult as Result};
25use crate::identity::rejection::RejectionReason;
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::DualStackNetworkNode;
30#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
32use crate::validation::RateLimitConfig;
33use crate::validation::RateLimiter;
34use crate::{NetworkAddress, PeerId};
35use serde::{Deserialize, Serialize};
36use std::collections::{HashMap, HashSet};
37use std::sync::Arc;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::time::Duration;
40use tokio::sync::{RwLock, broadcast};
41use tokio::time::Instant;
42use tracing::{debug, error, info, trace, warn};
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct NodeConfig {
47 pub peer_id: Option<PeerId>,
49
50 pub listen_addrs: Vec<std::net::SocketAddr>,
52
53 pub listen_addr: std::net::SocketAddr,
55
56 pub bootstrap_peers: Vec<std::net::SocketAddr>,
58
59 pub bootstrap_peers_str: Vec<String>,
61
62 pub enable_ipv6: bool,
64
65 pub connection_timeout: Duration,
68
69 pub keep_alive_interval: Duration,
71
72 pub max_connections: usize,
74
75 pub max_incoming_connections: usize,
77
78 pub dht_config: DHTConfig,
80
81 pub security_config: SecurityConfig,
83
84 pub production_config: Option<ProductionConfig>,
86
87 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
89
90 pub diversity_config: Option<crate::security::IPDiversityConfig>,
95
96 #[serde(default)]
101 pub attestation_config: crate::attestation::AttestationConfig,
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct DHTConfig {
107 pub k_value: usize,
109
110 pub alpha_value: usize,
112
113 pub record_ttl: Duration,
115
116 pub refresh_interval: Duration,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct SecurityConfig {
123 pub enable_noise: bool,
125
126 pub enable_tls: bool,
128
129 pub trust_level: TrustLevel,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
135pub enum TrustLevel {
136 None,
138 Basic,
140 Full,
142}
143
144#[inline]
152fn build_listen_addrs(port: u16, ipv6_enabled: bool) -> Vec<std::net::SocketAddr> {
153 let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
154
155 if ipv6_enabled {
156 addrs.push(std::net::SocketAddr::new(
157 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
158 port,
159 ));
160 }
161
162 addrs.push(std::net::SocketAddr::new(
163 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
164 port,
165 ));
166
167 addrs
168}
169
170impl NodeConfig {
171 pub fn new() -> Result<Self> {
177 let config = Config::default();
178 let listen_addr = config.listen_socket_addr()?;
179
180 Ok(Self {
181 peer_id: None,
182 listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
183 listen_addr,
184 bootstrap_peers: Vec::new(),
185 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
186 enable_ipv6: config.network.ipv6_enabled,
187 connection_timeout: Duration::from_secs(config.network.connection_timeout),
188 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
189 max_connections: config.network.max_connections,
190 max_incoming_connections: config.security.connection_limit as usize,
191 dht_config: DHTConfig::default(),
192 security_config: SecurityConfig::default(),
193 production_config: None,
194 bootstrap_cache_config: None,
195 diversity_config: None,
196 attestation_config: config.attestation.clone(),
197 })
198 }
199
200 pub fn builder() -> NodeConfigBuilder {
202 NodeConfigBuilder::default()
203 }
204}
205
206#[derive(Debug, Clone, Default)]
212pub struct NodeConfigBuilder {
213 peer_id: Option<PeerId>,
214 listen_port: Option<u16>,
215 enable_ipv6: Option<bool>,
216 bootstrap_peers: Vec<std::net::SocketAddr>,
217 max_connections: Option<usize>,
218 connection_timeout: Option<Duration>,
219 keep_alive_interval: Option<Duration>,
220 dht_config: Option<DHTConfig>,
221 security_config: Option<SecurityConfig>,
222 production_config: Option<ProductionConfig>,
223}
224
225impl NodeConfigBuilder {
226 pub fn peer_id(mut self, peer_id: PeerId) -> Self {
228 self.peer_id = Some(peer_id);
229 self
230 }
231
232 pub fn listen_port(mut self, port: u16) -> Self {
234 self.listen_port = Some(port);
235 self
236 }
237
238 pub fn ipv6(mut self, enabled: bool) -> Self {
240 self.enable_ipv6 = Some(enabled);
241 self
242 }
243
244 pub fn bootstrap_peer(mut self, addr: std::net::SocketAddr) -> Self {
246 self.bootstrap_peers.push(addr);
247 self
248 }
249
250 pub fn max_connections(mut self, max: usize) -> Self {
252 self.max_connections = Some(max);
253 self
254 }
255
256 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
258 self.connection_timeout = Some(timeout);
259 self
260 }
261
262 pub fn keep_alive_interval(mut self, interval: Duration) -> Self {
264 self.keep_alive_interval = Some(interval);
265 self
266 }
267
268 pub fn dht_config(mut self, config: DHTConfig) -> Self {
270 self.dht_config = Some(config);
271 self
272 }
273
274 pub fn security_config(mut self, config: SecurityConfig) -> Self {
276 self.security_config = Some(config);
277 self
278 }
279
280 pub fn production_config(mut self, config: ProductionConfig) -> Self {
282 self.production_config = Some(config);
283 self
284 }
285
286 pub fn build(self) -> Result<NodeConfig> {
292 let base_config = Config::default();
293 let default_port = base_config
294 .listen_socket_addr()
295 .map(|addr| addr.port())
296 .unwrap_or(9000);
297 let port = self.listen_port.unwrap_or(default_port);
298 let ipv6_enabled = self.enable_ipv6.unwrap_or(base_config.network.ipv6_enabled);
299
300 let listen_addr =
301 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port);
302
303 Ok(NodeConfig {
304 peer_id: self.peer_id,
305 listen_addrs: build_listen_addrs(port, ipv6_enabled),
306 listen_addr,
307 bootstrap_peers: self.bootstrap_peers.clone(),
308 bootstrap_peers_str: self.bootstrap_peers.iter().map(|a| a.to_string()).collect(),
309 enable_ipv6: ipv6_enabled,
310 connection_timeout: self
311 .connection_timeout
312 .unwrap_or(Duration::from_secs(base_config.network.connection_timeout)),
313 keep_alive_interval: self
314 .keep_alive_interval
315 .unwrap_or(Duration::from_secs(base_config.network.keepalive_interval)),
316 max_connections: self
317 .max_connections
318 .unwrap_or(base_config.network.max_connections),
319 max_incoming_connections: base_config.security.connection_limit as usize,
320 dht_config: self.dht_config.unwrap_or_default(),
321 security_config: self.security_config.unwrap_or_default(),
322 production_config: self.production_config,
323 bootstrap_cache_config: None,
324 diversity_config: None,
325 attestation_config: base_config.attestation.clone(),
326 })
327 }
328}
329
330impl Default for NodeConfig {
331 fn default() -> Self {
332 let config = Config::default();
333 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
334 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 9000)
335 });
336
337 Self {
338 peer_id: None,
339 listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
340 listen_addr,
341 bootstrap_peers: Vec::new(),
342 bootstrap_peers_str: Vec::new(),
343 enable_ipv6: config.network.ipv6_enabled,
344 connection_timeout: Duration::from_secs(config.network.connection_timeout),
345 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
346 max_connections: config.network.max_connections,
347 max_incoming_connections: config.security.connection_limit as usize,
348 dht_config: DHTConfig::default(),
349 security_config: SecurityConfig::default(),
350 production_config: None,
351 bootstrap_cache_config: None,
352 diversity_config: None,
353 attestation_config: config.attestation.clone(),
354 }
355 }
356}
357
358impl NodeConfig {
359 pub fn from_config(config: &Config) -> Result<Self> {
361 let listen_addr = config.listen_socket_addr()?;
362 let bootstrap_addrs = config.bootstrap_addrs()?;
363
364 let mut node_config = Self {
365 peer_id: None,
366 listen_addrs: vec![listen_addr],
367 listen_addr,
368 bootstrap_peers: bootstrap_addrs
369 .iter()
370 .map(|addr| addr.socket_addr())
371 .collect(),
372 bootstrap_peers_str: config
373 .network
374 .bootstrap_nodes
375 .iter()
376 .map(|addr| addr.to_string())
377 .collect(),
378 enable_ipv6: config.network.ipv6_enabled,
379
380 connection_timeout: Duration::from_secs(config.network.connection_timeout),
381 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
382 max_connections: config.network.max_connections,
383 max_incoming_connections: config.security.connection_limit as usize,
384 dht_config: DHTConfig {
385 k_value: 20,
386 alpha_value: 3,
387 record_ttl: Duration::from_secs(3600),
388 refresh_interval: Duration::from_secs(900),
389 },
390 security_config: SecurityConfig {
391 enable_noise: true,
392 enable_tls: true,
393 trust_level: TrustLevel::Basic,
394 },
395 production_config: Some(ProductionConfig {
396 max_connections: config.network.max_connections,
397 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
400 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
401 health_check_interval: Duration::from_secs(30),
402 metrics_interval: Duration::from_secs(60),
403 enable_performance_tracking: true,
404 enable_auto_cleanup: true,
405 shutdown_timeout: Duration::from_secs(30),
406 rate_limits: crate::production::RateLimitConfig::default(),
407 }),
408 bootstrap_cache_config: None,
409 diversity_config: None,
410 attestation_config: config.attestation.clone(),
411 };
412
413 if config.network.ipv6_enabled {
415 node_config.listen_addrs.push(std::net::SocketAddr::new(
416 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
417 listen_addr.port(),
418 ));
419 }
420
421 Ok(node_config)
422 }
423
424 pub fn with_listen_addr(addr: &str) -> Result<Self> {
426 let listen_addr: std::net::SocketAddr = addr
427 .parse()
428 .map_err(|e: std::net::AddrParseError| {
429 NetworkError::InvalidAddress(e.to_string().into())
430 })
431 .map_err(P2PError::Network)?;
432 let cfg = NodeConfig {
433 listen_addr,
434 listen_addrs: vec![listen_addr],
435 diversity_config: None,
436 ..Default::default()
437 };
438 Ok(cfg)
439 }
440}
441
442impl Default for DHTConfig {
443 fn default() -> Self {
444 Self {
445 k_value: 20,
446 alpha_value: 5,
447 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
450 }
451}
452
453impl Default for SecurityConfig {
454 fn default() -> Self {
455 Self {
456 enable_noise: true,
457 enable_tls: true,
458 trust_level: TrustLevel::Basic,
459 }
460 }
461}
462
463#[derive(Debug, Clone)]
465pub struct PeerInfo {
466 pub peer_id: PeerId,
468
469 pub addresses: Vec<String>,
471
472 pub connected_at: Instant,
474
475 pub last_seen: Instant,
477
478 pub status: ConnectionStatus,
480
481 pub protocols: Vec<String>,
483
484 pub heartbeat_count: u64,
486}
487
488#[derive(Debug, Clone, PartialEq)]
490pub enum ConnectionStatus {
491 Connecting,
493 Connected,
495 Disconnecting,
497 Disconnected,
499 Failed(String),
501}
502
503#[derive(Debug, Clone)]
505pub enum NetworkEvent {
506 PeerConnected {
508 peer_id: PeerId,
510 addresses: Vec<String>,
512 },
513
514 PeerDisconnected {
516 peer_id: PeerId,
518 reason: String,
520 },
521
522 MessageReceived {
524 peer_id: PeerId,
526 protocol: String,
528 data: Vec<u8>,
530 },
531
532 ConnectionFailed {
534 peer_id: Option<PeerId>,
536 address: String,
538 error: String,
540 },
541
542 DHTRecordStored {
544 key: Vec<u8>,
546 value: Vec<u8>,
548 },
549
550 DHTRecordRetrieved {
552 key: Vec<u8>,
554 value: Option<Vec<u8>>,
556 },
557}
558
559#[derive(Debug, Clone)]
564pub enum P2PEvent {
565 Message {
567 topic: String,
569 source: PeerId,
571 data: Vec<u8>,
573 },
574 PeerConnected(PeerId),
576 PeerDisconnected(PeerId),
578}
579
580pub struct P2PNode {
590 config: NodeConfig,
592
593 peer_id: PeerId,
595
596 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
598
599 event_tx: broadcast::Sender<P2PEvent>,
601
602 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
604
605 start_time: Instant,
607
608 running: RwLock<bool>,
610
611 dht: Option<Arc<RwLock<DHT>>>,
613
614 resource_manager: Option<Arc<ResourceManager>>,
616
617 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
619
620 dual_node: Arc<DualStackNetworkNode>,
622
623 #[allow(dead_code)]
625 rate_limiter: Arc<RateLimiter>,
626
627 active_connections: Arc<RwLock<HashSet<PeerId>>>,
630
631 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
633
634 #[allow(dead_code)]
636 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
637
638 #[allow(dead_code)]
640 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
641
642 #[allow(dead_code)]
644 shutdown: Arc<AtomicBool>,
645
646 #[allow(dead_code)]
648 geo_provider: Arc<BgpGeoProvider>,
649
650 entangled_id: Option<crate::attestation::EntangledId>,
653
654 binary_hash: [u8; 32],
657}
658
659fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
675 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
676
677 if addr.ip().is_unspecified() {
678 let loopback_ip = match addr {
680 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
683 std::net::SocketAddr::new(loopback_ip, addr.port())
684 } else {
685 addr
687 }
688}
689
690impl P2PNode {
691 pub fn new_for_tests() -> Result<Self> {
693 let (event_tx, _) = broadcast::channel(16);
694 Ok(Self {
695 config: NodeConfig::default(),
696 peer_id: "test_peer".to_string(),
697 peers: Arc::new(RwLock::new(HashMap::new())),
698 event_tx,
699 listen_addrs: RwLock::new(Vec::new()),
700 start_time: Instant::now(),
701 running: RwLock::new(false),
702 dht: None,
703 resource_manager: None,
704 bootstrap_manager: None,
705 dual_node: {
706 let v6: Option<std::net::SocketAddr> = "[::1]:0"
708 .parse()
709 .ok()
710 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
711 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
712 let handle = tokio::runtime::Handle::current();
713 let dual_attempt = handle.block_on(
714 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
715 );
716 let dual = match dual_attempt {
717 Ok(d) => d,
718 Err(_e1) => {
719 let fallback = handle.block_on(
721 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
722 None,
723 "127.0.0.1:0".parse().ok(),
724 ),
725 );
726 match fallback {
727 Ok(d) => d,
728 Err(e2) => {
729 return Err(P2PError::Network(NetworkError::BindError(
730 format!("Failed to create dual-stack network node: {}", e2)
731 .into(),
732 )));
733 }
734 }
735 }
736 };
737 Arc::new(dual)
738 },
739 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
740 max_requests: 100,
741 burst_size: 100,
742 window: std::time::Duration::from_secs(1),
743 ..Default::default()
744 })),
745 active_connections: Arc::new(RwLock::new(HashSet::new())),
746 connection_monitor_handle: Arc::new(RwLock::new(None)),
747 keepalive_handle: Arc::new(RwLock::new(None)),
748 shutdown: Arc::new(AtomicBool::new(false)),
749 geo_provider: Arc::new(BgpGeoProvider::new()),
750 security_dashboard: None,
751 entangled_id: None,
753 binary_hash: [0u8; 32],
754 })
755 }
756 pub async fn new(config: NodeConfig) -> Result<Self> {
758 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
759 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
761 });
762
763 let (event_tx, _) = broadcast::channel(1000);
764
765 {
768 use blake3::Hasher;
769 let mut hasher = Hasher::new();
770 hasher.update(peer_id.as_bytes());
771 let digest = hasher.finalize();
772 let mut nid = [0u8; 32];
773 nid.copy_from_slice(digest.as_bytes());
774 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
775 crate::identity::node_identity::NodeId::from_bytes(nid),
776 ));
777 }
780
781 let (dht, security_dashboard) = if true {
783 let _dht_config = crate::dht::DHTConfig {
785 replication_factor: config.dht_config.k_value,
786 bucket_size: config.dht_config.k_value,
787 alpha: config.dht_config.alpha_value,
788 record_ttl: config.dht_config.record_ttl,
789 bucket_refresh_interval: config.dht_config.refresh_interval,
790 republish_interval: config.dht_config.refresh_interval,
791 max_distance: 160,
792 };
793 let peer_bytes = peer_id.as_bytes();
795 let mut node_id_bytes = [0u8; 32];
796 let len = peer_bytes.len().min(32);
797 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
798 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
799 let dht_instance = DHT::new(node_id).map_err(|e| {
800 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
801 e.to_string().into(),
802 ))
803 })?;
804 dht_instance.start_maintenance_tasks();
805
806 let security_metrics = dht_instance.security_metrics();
808 let dashboard = crate::dht::metrics::SecurityDashboard::new(
809 security_metrics,
810 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
811 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
812 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
813 );
814
815 (
816 Some(Arc::new(RwLock::new(dht_instance))),
817 Some(Arc::new(dashboard)),
818 )
819 } else {
820 (None, None)
821 };
822
823 let resource_manager = config
827 .production_config
828 .clone()
829 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
830
831 let diversity_config = config.diversity_config.clone().unwrap_or_default();
833 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
834 match BootstrapManager::with_full_config(
835 cache_config.clone(),
836 crate::rate_limit::JoinRateLimiterConfig::default(),
837 diversity_config.clone(),
838 )
839 .await
840 {
841 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
842 Err(e) => {
843 warn!(
844 "Failed to initialize bootstrap manager: {}, continuing without cache",
845 e
846 );
847 None
848 }
849 }
850 } else {
851 match BootstrapManager::with_full_config(
852 crate::bootstrap::CacheConfig::default(),
853 crate::rate_limit::JoinRateLimiterConfig::default(),
854 diversity_config,
855 )
856 .await
857 {
858 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
859 Err(e) => {
860 warn!(
861 "Failed to initialize bootstrap manager: {}, continuing without cache",
862 e
863 );
864 None
865 }
866 }
867 };
868
869 let (v6_opt, v4_opt) = {
872 let port = config.listen_addr.port();
873 let ip = config.listen_addr.ip();
874
875 let v4_addr = if ip.is_ipv4() {
876 Some(std::net::SocketAddr::new(ip, port))
877 } else {
878 Some(std::net::SocketAddr::new(
881 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
882 port,
883 ))
884 };
885
886 let v6_addr = if config.enable_ipv6 {
887 if ip.is_ipv6() {
888 Some(std::net::SocketAddr::new(ip, port))
889 } else {
890 Some(std::net::SocketAddr::new(
891 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
892 port,
893 ))
894 }
895 } else {
896 None
897 };
898 (v6_addr, v4_addr)
899 };
900
901 let dual_node = Arc::new(
902 DualStackNetworkNode::new(v6_opt, v4_opt)
903 .await
904 .map_err(|e| {
905 P2PError::Transport(crate::error::TransportError::SetupFailed(
906 format!("Failed to create dual-stack network nodes: {}", e).into(),
907 ))
908 })?,
909 );
910
911 let rate_limiter = Arc::new(RateLimiter::new(
913 crate::validation::RateLimitConfig::default(),
914 ));
915
916 let active_connections = Arc::new(RwLock::new(HashSet::new()));
918
919 let geo_provider = Arc::new(BgpGeoProvider::new());
921
922 let peers = Arc::new(RwLock::new(HashMap::new()));
924
925 let connection_event_rx = dual_node.subscribe_connection_events();
929
930 let connection_monitor_handle = {
931 let active_conns = Arc::clone(&active_connections);
932 let peers_map = Arc::clone(&peers);
933 let event_tx_clone = event_tx.clone();
934 let dual_node_clone = Arc::clone(&dual_node);
935 let geo_provider_clone = Arc::clone(&geo_provider);
936 let peer_id_clone = peer_id.clone();
937
938 let handle = tokio::spawn(async move {
939 Self::connection_lifecycle_monitor_with_rx(
940 dual_node_clone,
941 connection_event_rx,
942 active_conns,
943 peers_map,
944 event_tx_clone,
945 geo_provider_clone,
946 peer_id_clone,
947 )
948 .await;
949 });
950
951 Arc::new(RwLock::new(Some(handle)))
952 };
953
954 let shutdown = Arc::new(AtomicBool::new(false));
956 let keepalive_handle = {
957 let active_conns = Arc::clone(&active_connections);
958 let dual_node_clone = Arc::clone(&dual_node);
959 let shutdown_clone = Arc::clone(&shutdown);
960
961 let handle = tokio::spawn(async move {
962 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
963 });
964
965 Arc::new(RwLock::new(Some(handle)))
966 };
967
968 let binary_hash = Self::compute_binary_hash();
971
972 let node = Self {
973 config,
974 peer_id,
975 peers,
976 event_tx,
977 listen_addrs: RwLock::new(Vec::new()),
978 start_time: Instant::now(),
979 running: RwLock::new(false),
980 dht,
981 resource_manager,
982 bootstrap_manager,
983 dual_node,
984 rate_limiter,
985 active_connections,
986 security_dashboard,
987 connection_monitor_handle,
988 keepalive_handle,
989 shutdown,
990 geo_provider,
991 entangled_id: None,
993 binary_hash,
994 };
995 info!("Created P2P node with peer ID: {}", node.peer_id);
996
997 node.start_network_listeners().await?;
999
1000 node.start_connection_monitor().await;
1002
1003 Ok(node)
1004 }
1005
1006 pub fn builder() -> NodeBuilder {
1008 NodeBuilder::new()
1009 }
1010
1011 pub fn peer_id(&self) -> &PeerId {
1013 &self.peer_id
1014 }
1015
1016 pub fn local_addr(&self) -> Option<String> {
1017 self.listen_addrs
1018 .try_read()
1019 .ok()
1020 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
1021 }
1022
1023 pub async fn subscribe(&self, topic: &str) -> Result<()> {
1024 info!("Subscribed to topic: {}", topic);
1027 Ok(())
1028 }
1029
1030 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1031 info!(
1032 "Publishing message to topic: {} ({} bytes)",
1033 topic,
1034 data.len()
1035 );
1036
1037 let peer_list: Vec<PeerId> = {
1039 let peers_guard = self.peers.read().await;
1040 peers_guard.keys().cloned().collect()
1041 };
1042
1043 if peer_list.is_empty() {
1044 debug!("No peers connected, message will only be sent to local subscribers");
1045 } else {
1046 let mut send_count = 0;
1048 for peer_id in &peer_list {
1049 match self.send_message(peer_id, topic, data.to_vec()).await {
1050 Ok(_) => {
1051 send_count += 1;
1052 debug!("Sent message to peer: {}", peer_id);
1053 }
1054 Err(e) => {
1055 warn!("Failed to send message to peer {}: {}", peer_id, e);
1056 }
1057 }
1058 }
1059 info!(
1060 "Published message to {}/{} connected peers",
1061 send_count,
1062 peer_list.len()
1063 );
1064 }
1065
1066 let event = P2PEvent::Message {
1068 topic: topic.to_string(),
1069 source: self.peer_id.clone(),
1070 data: data.to_vec(),
1071 };
1072 let _ = self.event_tx.send(event);
1073
1074 Ok(())
1075 }
1076
1077 pub fn config(&self) -> &NodeConfig {
1079 &self.config
1080 }
1081
1082 pub async fn start(&self) -> Result<()> {
1084 info!("Starting P2P node...");
1085
1086 if let Some(ref resource_manager) = self.resource_manager {
1088 resource_manager.start().await.map_err(|e| {
1089 P2PError::Network(crate::error::NetworkError::ProtocolError(
1090 format!("Failed to start resource manager: {e}").into(),
1091 ))
1092 })?;
1093 info!("Production resource manager started");
1094 }
1095
1096 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1098 let mut manager = bootstrap_manager.write().await;
1099 manager.start_background_tasks().await.map_err(|e| {
1100 P2PError::Network(crate::error::NetworkError::ProtocolError(
1101 format!("Failed to start bootstrap manager: {e}").into(),
1102 ))
1103 })?;
1104 info!("Bootstrap cache manager started");
1105 }
1106
1107 *self.running.write().await = true;
1109
1110 self.start_network_listeners().await?;
1112
1113 let listen_addrs = self.listen_addrs.read().await;
1115 info!("P2P node started on addresses: {:?}", *listen_addrs);
1116
1117 self.start_message_receiving_system().await?;
1121
1122 self.connect_bootstrap_peers().await?;
1124
1125 Ok(())
1126 }
1127
1128 async fn start_network_listeners(&self) -> Result<()> {
1130 info!("Starting dual-stack listeners (ant-quic)...");
1131 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1133 P2PError::Transport(crate::error::TransportError::SetupFailed(
1134 format!("Failed to get local addresses: {}", e).into(),
1135 ))
1136 })?;
1137 {
1138 let mut la = self.listen_addrs.write().await;
1139 *la = addrs.clone();
1140 }
1141
1142 let event_tx = self.event_tx.clone();
1144 let peers = self.peers.clone();
1145 let active_connections = self.active_connections.clone();
1146 let rate_limiter = self.rate_limiter.clone();
1147 let dual = self.dual_node.clone();
1148 tokio::spawn(async move {
1149 loop {
1150 match dual.accept_any().await {
1151 Ok((ant_peer_id, remote_sock)) => {
1152 let peer_id =
1153 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1154 let remote_addr = NetworkAddress::from(remote_sock);
1155 let _ = rate_limiter.check_ip(&remote_sock.ip());
1157 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1158 register_new_peer(&peers, &peer_id, &remote_addr).await;
1159 active_connections.write().await.insert(peer_id);
1160 }
1161 Err(e) => {
1162 warn!("Accept failed: {}", e);
1163 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1164 }
1165 }
1166 }
1167 });
1168
1169 info!("Dual-stack listeners active on: {:?}", addrs);
1170 Ok(())
1171 }
1172
1173 #[allow(dead_code)]
1175 async fn start_listener_on_address(&self, addr: std::net::SocketAddr) -> Result<()> {
1176 warn!("QUIC transport temporarily disabled during ant-quic migration");
1215 Err(crate::P2PError::Transport(
1217 crate::error::TransportError::SetupFailed(
1218 format!(
1219 "Failed to start QUIC listener on {addr} - transport disabled during migration"
1220 )
1221 .into(),
1222 ),
1223 ))
1224 }
1225
1226 #[allow(dead_code)] async fn start_connection_acceptor(
1229 &self,
1230 transport: Arc<dyn crate::transport::Transport>,
1231 addr: std::net::SocketAddr,
1232 transport_type: crate::transport::TransportType,
1233 ) -> Result<()> {
1234 info!(
1235 "Starting connection acceptor for {:?} on {}",
1236 transport_type, addr
1237 );
1238
1239 let event_tx = self.event_tx.clone();
1241 let _peer_id = self.peer_id.clone();
1242 let peers = Arc::clone(&self.peers);
1243 let rate_limiter = Arc::clone(&self.rate_limiter);
1246
1247 tokio::spawn(async move {
1249 loop {
1250 match transport.accept().await {
1251 Ok(connection) => {
1252 let remote_addr = connection.remote_addr();
1253 let connection_peer_id =
1254 format!("peer_from_{}", remote_addr.to_string().replace(":", "_"));
1255
1256 let socket_addr = remote_addr.socket_addr();
1258 if check_rate_limit(&rate_limiter, &socket_addr, &remote_addr).is_err() {
1259 continue;
1261 }
1262
1263 info!(
1264 "Accepted {:?} connection from {} (peer: {})",
1265 transport_type, remote_addr, connection_peer_id
1266 );
1267
1268 let _ = event_tx.send(P2PEvent::PeerConnected(connection_peer_id.clone()));
1270
1271 register_new_peer(&peers, &connection_peer_id, &remote_addr).await;
1273
1274 spawn_connection_handler(
1276 connection,
1277 connection_peer_id,
1278 event_tx.clone(),
1279 Arc::clone(&peers),
1280 );
1281 }
1282 Err(e) => {
1283 warn!(
1284 "Failed to accept {:?} connection on {}: {}",
1285 transport_type, addr, e
1286 );
1287
1288 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
1290 }
1291 }
1292 }
1293 });
1294
1295 info!(
1296 "Connection acceptor background task started for {:?} on {}",
1297 transport_type, addr
1298 );
1299 Ok(())
1300 }
1301
1302 async fn start_message_receiving_system(&self) -> Result<()> {
1304 info!("Starting message receiving system");
1305 let dual = self.dual_node.clone();
1306 let event_tx = self.event_tx.clone();
1307
1308 tokio::spawn(async move {
1309 loop {
1310 match dual.receive_any().await {
1311 Ok((_peer_id, bytes)) => {
1312 #[allow(clippy::collapsible_if)]
1314 if let Ok(value) = serde_json::from_slice::<serde_json::Value>(&bytes) {
1315 if let (Some(protocol), Some(data), Some(from)) = (
1316 value.get("protocol").and_then(|v| v.as_str()),
1317 value.get("data").and_then(|v| v.as_array()),
1318 value.get("from").and_then(|v| v.as_str()),
1319 ) {
1320 let payload: Vec<u8> = data
1321 .iter()
1322 .filter_map(|v| v.as_u64().map(|n| n as u8))
1323 .collect();
1324 let _ = event_tx.send(P2PEvent::Message {
1325 topic: protocol.to_string(),
1326 source: from.to_string(),
1327 data: payload,
1328 });
1329 }
1330 }
1331 }
1332 Err(e) => {
1333 warn!("Receive error: {}", e);
1334 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1335 }
1336 }
1337 }
1338 });
1339
1340 Ok(())
1341 }
1342
1343 #[allow(dead_code)]
1345 async fn handle_received_message(
1346 &self,
1347 message_data: Vec<u8>,
1348 peer_id: &PeerId,
1349 _protocol: &str,
1350 event_tx: &broadcast::Sender<P2PEvent>,
1351 ) -> Result<()> {
1352 match serde_json::from_slice::<serde_json::Value>(&message_data) {
1356 Ok(message) => {
1357 if let (Some(protocol), Some(data), Some(from)) = (
1358 message.get("protocol").and_then(|v| v.as_str()),
1359 message.get("data").and_then(|v| v.as_array()),
1360 message.get("from").and_then(|v| v.as_str()),
1361 ) {
1362 let data_bytes: Vec<u8> = data
1364 .iter()
1365 .filter_map(|v| v.as_u64().map(|n| n as u8))
1366 .collect();
1367
1368 let event = P2PEvent::Message {
1370 topic: protocol.to_string(),
1371 source: from.to_string(),
1372 data: data_bytes,
1373 };
1374
1375 let _ = event_tx.send(event);
1376 debug!("Generated message event from peer: {}", peer_id);
1377 }
1378 }
1379 Err(e) => {
1380 warn!("Failed to parse received message from {}: {}", peer_id, e);
1381 }
1382 }
1383
1384 Ok(())
1385 }
1386
1387 pub async fn run(&self) -> Result<()> {
1393 if !*self.running.read().await {
1394 self.start().await?;
1395 }
1396
1397 info!("P2P node running...");
1398
1399 loop {
1401 if !*self.running.read().await {
1402 break;
1403 }
1404
1405 self.periodic_tasks().await?;
1407
1408 tokio::time::sleep(Duration::from_millis(100)).await;
1410 }
1411
1412 info!("P2P node stopped");
1413 Ok(())
1414 }
1415
1416 pub async fn stop(&self) -> Result<()> {
1418 info!("Stopping P2P node...");
1419
1420 *self.running.write().await = false;
1422
1423 self.disconnect_all_peers().await?;
1425
1426 if let Some(ref resource_manager) = self.resource_manager {
1428 resource_manager.shutdown().await.map_err(|e| {
1429 P2PError::Network(crate::error::NetworkError::ProtocolError(
1430 format!("Failed to shutdown resource manager: {e}").into(),
1431 ))
1432 })?;
1433 info!("Production resource manager stopped");
1434 }
1435
1436 info!("P2P node stopped");
1437 Ok(())
1438 }
1439
1440 pub async fn shutdown(&self) -> Result<()> {
1442 self.stop().await
1443 }
1444
1445 pub async fn is_running(&self) -> bool {
1447 *self.running.read().await
1448 }
1449
1450 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1452 self.listen_addrs.read().await.clone()
1453 }
1454
1455 pub async fn connected_peers(&self) -> Vec<PeerId> {
1457 self.active_connections
1460 .read()
1461 .await
1462 .iter()
1463 .cloned()
1464 .collect()
1465 }
1466
1467 pub async fn peer_count(&self) -> usize {
1469 self.active_connections.read().await.len()
1470 }
1471
1472 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1474 self.peers.read().await.get(peer_id).cloned()
1475 }
1476
1477 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1489 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1491
1492 let peers = self.peers.read().await;
1493
1494 for (peer_id, peer_info) in peers.iter() {
1496 for peer_addr in &peer_info.addresses {
1498 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1499 && peer_socket == socket_addr
1500 {
1501 return Some(peer_id.clone());
1502 }
1503 }
1504 }
1505
1506 None
1507 }
1508
1509 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1515 let active = self.active_connections.read().await;
1516 let peers = self.peers.read().await;
1517
1518 active
1519 .iter()
1520 .map(|peer_id| {
1521 let addresses = peers
1522 .get(peer_id)
1523 .map(|info| info.addresses.clone())
1524 .unwrap_or_default();
1525 (peer_id.clone(), addresses)
1526 })
1527 .collect()
1528 }
1529
1530 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1542 self.active_connections.write().await.remove(peer_id);
1544 self.peers.write().await.remove(peer_id).is_some()
1546 }
1547
1548 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1561 self.peers.read().await.contains_key(peer_id)
1562 }
1563
1564 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1566 info!("Connecting to peer at: {}", address);
1567
1568 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1570 Some(resource_manager.acquire_connection().await?)
1571 } else {
1572 None
1573 };
1574
1575 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1577 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1578 format!("{}: {}", address, e).into(),
1579 ))
1580 })?;
1581
1582 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1585 if normalized_addr != socket_addr {
1586 info!(
1587 "Normalized wildcard address {} to loopback {}",
1588 socket_addr, normalized_addr
1589 );
1590 }
1591
1592 let addr_list = vec![normalized_addr];
1594 let peer_id = match tokio::time::timeout(
1595 self.config.connection_timeout,
1596 self.dual_node.connect_happy_eyeballs(&addr_list),
1597 )
1598 .await
1599 {
1600 Ok(Ok(peer)) => {
1601 let connected_peer_id =
1602 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer);
1603 info!("Successfully connected to peer: {}", connected_peer_id);
1604
1605 if connected_peer_id == self.peer_id {
1607 warn!(
1608 "Detected self-connection to own address {} (peer_id: {}), rejecting",
1609 address, connected_peer_id
1610 );
1611 return Err(P2PError::Network(
1614 crate::error::NetworkError::InvalidAddress(
1615 format!("Cannot connect to self ({})", address).into(),
1616 ),
1617 ));
1618 }
1619
1620 connected_peer_id
1621 }
1622 Ok(Err(e)) => {
1623 warn!("Failed to connect to peer at {}: {}", address, e);
1624 return Err(P2PError::Transport(
1625 crate::error::TransportError::ConnectionFailed {
1626 addr: normalized_addr,
1627 reason: e.to_string().into(),
1628 },
1629 ));
1630 }
1631 Err(_) => {
1632 warn!(
1633 "Timed out connecting to peer at {} after {:?}",
1634 address, self.config.connection_timeout
1635 );
1636 return Err(P2PError::Timeout(self.config.connection_timeout));
1637 }
1638 };
1639
1640 let peer_info = PeerInfo {
1642 peer_id: peer_id.clone(),
1643 addresses: vec![address.to_string()],
1644 connected_at: Instant::now(),
1645 last_seen: Instant::now(),
1646 status: ConnectionStatus::Connected,
1647 protocols: vec!["p2p-foundation/1.0".to_string()],
1648 heartbeat_count: 0,
1649 };
1650
1651 self.peers.write().await.insert(peer_id.clone(), peer_info);
1653
1654 self.active_connections
1657 .write()
1658 .await
1659 .insert(peer_id.clone());
1660
1661 if let Some(ref resource_manager) = self.resource_manager {
1663 resource_manager.record_bandwidth(0, 0); }
1665
1666 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1668
1669 info!("Connected to peer: {}", peer_id);
1670 Ok(peer_id)
1671 }
1672
1673 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1675 info!("Disconnecting from peer: {}", peer_id);
1676
1677 self.active_connections.write().await.remove(peer_id);
1679
1680 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1681 peer_info.status = ConnectionStatus::Disconnected;
1682
1683 let _ = self
1685 .event_tx
1686 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1687
1688 info!("Disconnected from peer: {}", peer_id);
1689 }
1690
1691 Ok(())
1692 }
1693
1694 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1696 self.active_connections.read().await.contains(peer_id)
1697 }
1698
1699 pub async fn send_message(
1701 &self,
1702 peer_id: &PeerId,
1703 protocol: &str,
1704 data: Vec<u8>,
1705 ) -> Result<()> {
1706 debug!(
1707 "Sending message to peer {} on protocol {}",
1708 peer_id, protocol
1709 );
1710
1711 if let Some(ref resource_manager) = self.resource_manager
1713 && !resource_manager
1714 .check_rate_limit(peer_id, "message")
1715 .await?
1716 {
1717 return Err(P2PError::ResourceExhausted(
1718 format!("Rate limit exceeded for peer {}", peer_id).into(),
1719 ));
1720 }
1721
1722 if !self.peers.read().await.contains_key(peer_id) {
1724 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1725 peer_id.to_string().into(),
1726 )));
1727 }
1728
1729 if !self.is_connection_active(peer_id).await {
1732 debug!(
1733 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1734 peer_id
1735 );
1736
1737 self.remove_peer(peer_id).await;
1739
1740 return Err(P2PError::Network(
1741 crate::error::NetworkError::ConnectionClosed {
1742 peer_id: peer_id.to_string().into(),
1743 },
1744 ));
1745 }
1746
1747 if let Some(ref resource_manager) = self.resource_manager {
1751 resource_manager.record_bandwidth(data.len() as u64, 0);
1752 }
1753
1754 let _message_data = self.create_protocol_message(protocol, data)?;
1756
1757 let send_fut = self.dual_node.send_to_peer_string(peer_id, &_message_data);
1759 tokio::time::timeout(self.config.connection_timeout, send_fut)
1760 .await
1761 .map_err(|_| {
1762 P2PError::Transport(crate::error::TransportError::StreamError(
1763 "Timed out sending message".into(),
1764 ))
1765 })?
1766 .map_err(|e| {
1767 P2PError::Transport(crate::error::TransportError::StreamError(
1768 e.to_string().into(),
1769 ))
1770 })
1771 }
1772
1773 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1775 use serde_json::json;
1776
1777 let timestamp = std::time::SystemTime::now()
1778 .duration_since(std::time::UNIX_EPOCH)
1779 .map_err(|e| {
1780 P2PError::Network(NetworkError::ProtocolError(
1781 format!("System time error: {}", e).into(),
1782 ))
1783 })?
1784 .as_secs();
1785
1786 let message = json!({
1788 "protocol": protocol,
1789 "data": data,
1790 "from": self.peer_id,
1791 "timestamp": timestamp
1792 });
1793
1794 serde_json::to_vec(&message).map_err(|e| {
1795 P2PError::Transport(crate::error::TransportError::StreamError(
1796 format!("Failed to serialize message: {e}").into(),
1797 ))
1798 })
1799 }
1800
1801 }
1803
1804#[allow(dead_code)]
1806fn create_protocol_message_static(protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1807 use serde_json::json;
1808
1809 let timestamp = std::time::SystemTime::now()
1810 .duration_since(std::time::UNIX_EPOCH)
1811 .map_err(|e| {
1812 P2PError::Network(NetworkError::ProtocolError(
1813 format!("System time error: {}", e).into(),
1814 ))
1815 })?
1816 .as_secs();
1817
1818 let message = json!({
1820 "protocol": protocol,
1821 "data": data,
1822 "timestamp": timestamp
1823 });
1824
1825 serde_json::to_vec(&message).map_err(|e| {
1826 P2PError::Transport(crate::error::TransportError::StreamError(
1827 format!("Failed to serialize message: {e}").into(),
1828 ))
1829 })
1830}
1831
1832impl P2PNode {
1833 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1835 self.event_tx.subscribe()
1836 }
1837
1838 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1840 self.subscribe_events()
1841 }
1842
1843 pub fn uptime(&self) -> Duration {
1845 self.start_time.elapsed()
1846 }
1847
1848 fn compute_binary_hash() -> [u8; 32] {
1857 if let Some(hash) = std::env::current_exe()
1859 .ok()
1860 .and_then(|exe_path| std::fs::read(&exe_path).ok())
1861 .map(|binary_data| blake3::hash(&binary_data))
1862 {
1863 return *hash.as_bytes();
1864 }
1865 let placeholder = format!(
1868 "saorsa-core-v{}-{}",
1869 env!("CARGO_PKG_VERSION"),
1870 std::env::consts::ARCH
1871 );
1872 let hash = blake3::hash(placeholder.as_bytes());
1873 *hash.as_bytes()
1874 }
1875
1876 #[must_use]
1878 pub fn binary_hash(&self) -> &[u8; 32] {
1879 &self.binary_hash
1880 }
1881
1882 #[must_use]
1884 pub fn entangled_id(&self) -> Option<&crate::attestation::EntangledId> {
1885 self.entangled_id.as_ref()
1886 }
1887
1888 pub fn set_entangled_id(&mut self, entangled_id: crate::attestation::EntangledId) {
1893 self.entangled_id = Some(entangled_id);
1894 }
1895
1896 pub fn verify_peer_attestation(
1925 &self,
1926 peer_id: &str,
1927 peer_entangled_id: &crate::attestation::EntangledId,
1928 peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1929 ) -> crate::attestation::EnforcementDecision {
1930 use crate::attestation::{
1931 AttestationRejection, AttestationRejectionReason, EnforcementDecision, EnforcementMode,
1932 };
1933
1934 let config = &self.config.attestation_config;
1935
1936 if !config.enabled {
1938 return EnforcementDecision::Skipped;
1939 }
1940
1941 let id_valid = peer_entangled_id.verify(peer_public_key);
1943
1944 let binary_hash = *peer_entangled_id.binary_hash();
1946 let binary_allowed = config.is_binary_allowed(&binary_hash);
1947
1948 match config.enforcement_mode {
1949 EnforcementMode::Off => EnforcementDecision::Skipped,
1950
1951 EnforcementMode::Soft => {
1952 if !id_valid {
1954 warn!(
1955 peer = %peer_id,
1956 binary_hash = %hex::encode(&binary_hash[..8]),
1957 "Peer attestation verification failed: Invalid entangled ID (soft mode - allowing)"
1958 );
1959 return EnforcementDecision::AllowWithWarning {
1960 reason: AttestationRejectionReason::IdentityMismatch,
1961 };
1962 }
1963 if !binary_allowed {
1964 warn!(
1965 peer = %peer_id,
1966 binary_hash = %hex::encode(binary_hash),
1967 "Peer attestation verification failed: Binary not in allowlist (soft mode - allowing)"
1968 );
1969 return EnforcementDecision::AllowWithWarning {
1970 reason: AttestationRejectionReason::BinaryNotAllowed { hash: binary_hash },
1971 };
1972 }
1973 EnforcementDecision::Allow
1974 }
1975
1976 EnforcementMode::Hard => {
1977 if !id_valid {
1979 error!(
1980 peer = %peer_id,
1981 binary_hash = %hex::encode(&binary_hash[..8]),
1982 "REJECTING peer: Invalid entangled ID derivation"
1983 );
1984 return EnforcementDecision::Reject {
1985 rejection: AttestationRejection::identity_mismatch(),
1986 };
1987 }
1988 if !binary_allowed {
1989 error!(
1990 peer = %peer_id,
1991 binary_hash = %hex::encode(binary_hash),
1992 "REJECTING peer: Binary not in allowlist"
1993 );
1994 return EnforcementDecision::Reject {
1995 rejection: AttestationRejection::binary_not_allowed(binary_hash),
1996 };
1997 }
1998
1999 info!(
2000 peer = %peer_id,
2001 entangled_id = %hex::encode(&peer_entangled_id.id()[..8]),
2002 "Peer attestation verified successfully (hard mode)"
2003 );
2004 EnforcementDecision::Allow
2005 }
2006 }
2007 }
2008
2009 #[must_use]
2017 pub fn verify_peer_attestation_simple(
2018 &self,
2019 peer_id: &str,
2020 peer_entangled_id: &crate::attestation::EntangledId,
2021 peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
2022 ) -> bool {
2023 self.verify_peer_attestation(peer_id, peer_entangled_id, peer_public_key)
2024 .should_allow()
2025 }
2026
2027 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2037 if let Some(ref resource_manager) = self.resource_manager {
2038 Ok(resource_manager.get_metrics().await)
2039 } else {
2040 Err(P2PError::Network(
2041 crate::error::NetworkError::ProtocolError(
2042 "Production resource manager not enabled".to_string().into(),
2043 ),
2044 ))
2045 }
2046 }
2047
2048 #[allow(clippy::too_many_arguments)]
2054 async fn connection_lifecycle_monitor_with_rx(
2055 _dual_node: Arc<DualStackNetworkNode>,
2056 mut event_rx: broadcast::Receiver<crate::transport::ant_quic_adapter::ConnectionEvent>,
2057 active_connections: Arc<RwLock<HashSet<String>>>,
2058 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2059 event_tx: broadcast::Sender<P2PEvent>,
2060 geo_provider: Arc<BgpGeoProvider>,
2061 _local_peer_id: String,
2062 ) {
2063 use crate::transport::ant_quic_adapter::ConnectionEvent;
2064
2065 info!("Connection lifecycle monitor started (pre-subscribed receiver)");
2066
2067 loop {
2068 match event_rx.recv().await {
2069 Ok(event) => {
2070 match event {
2071 ConnectionEvent::Established {
2072 peer_id,
2073 remote_address,
2074 } => {
2075 let peer_id_str =
2076 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2077 debug!(
2078 "Connection established: peer={}, addr={}",
2079 peer_id_str, remote_address
2080 );
2081
2082 let ip = remote_address.ip();
2084 let is_rejected = match ip {
2085 std::net::IpAddr::V4(v4) => {
2086 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2087 geo_provider.is_hosting_asn(asn)
2088 || geo_provider.is_vpn_asn(asn)
2089 } else {
2090 false
2091 }
2092 }
2093 std::net::IpAddr::V6(v6) => {
2094 let info = geo_provider.lookup(v6);
2095 info.is_hosting_provider || info.is_vpn_provider
2096 }
2097 };
2098
2099 if is_rejected {
2100 info!(
2101 "Rejecting connection from {} ({}) due to GeoIP policy",
2102 peer_id_str, remote_address
2103 );
2104 continue;
2105 }
2106
2107 active_connections.write().await.insert(peer_id_str.clone());
2109
2110 let mut peers_lock = peers.write().await;
2112 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2113 peer_info.status = ConnectionStatus::Connected;
2114 peer_info.connected_at = Instant::now();
2115 } else {
2116 debug!("Registering new incoming peer: {}", peer_id_str);
2117 peers_lock.insert(
2118 peer_id_str.clone(),
2119 PeerInfo {
2120 peer_id: peer_id_str.clone(),
2121 addresses: vec![remote_address.to_string()],
2122 status: ConnectionStatus::Connected,
2123 last_seen: Instant::now(),
2124 connected_at: Instant::now(),
2125 protocols: Vec::new(),
2126 heartbeat_count: 0,
2127 },
2128 );
2129 }
2130
2131 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2133 }
2134 ConnectionEvent::Lost { peer_id, reason } => {
2135 let peer_id_str =
2136 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2137 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2138
2139 active_connections.write().await.remove(&peer_id_str);
2141
2142 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2144 peer_info.status = ConnectionStatus::Disconnected;
2145 peer_info.last_seen = Instant::now();
2146 }
2147
2148 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2150 }
2151 ConnectionEvent::Failed { peer_id, reason } => {
2152 let peer_id_str =
2153 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2154 debug!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2155
2156 active_connections.write().await.remove(&peer_id_str);
2158
2159 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2161 peer_info.status = ConnectionStatus::Disconnected;
2162 peer_info.last_seen = Instant::now();
2163 }
2164
2165 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2167 }
2168 }
2169 }
2170 Err(broadcast::error::RecvError::Lagged(skipped)) => {
2171 warn!(
2172 "Connection event receiver lagged, skipped {} events",
2173 skipped
2174 );
2175 }
2176 Err(broadcast::error::RecvError::Closed) => {
2177 info!("Connection event channel closed, stopping lifecycle monitor");
2178 break;
2179 }
2180 }
2181 }
2182 }
2183
2184 #[allow(dead_code)]
2189 async fn connection_lifecycle_monitor(
2190 dual_node: Arc<DualStackNetworkNode>,
2191 active_connections: Arc<RwLock<HashSet<String>>>,
2192 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2193 event_tx: broadcast::Sender<P2PEvent>,
2194 geo_provider: Arc<BgpGeoProvider>,
2195 local_peer_id: String,
2196 ) {
2197 use crate::transport::ant_quic_adapter::ConnectionEvent;
2198
2199 let mut event_rx = dual_node.subscribe_connection_events();
2200
2201 info!("Connection lifecycle monitor started");
2202
2203 loop {
2204 match event_rx.recv().await {
2205 Ok(event) => {
2206 match event {
2207 ConnectionEvent::Established {
2208 peer_id,
2209 remote_address,
2210 } => {
2211 let peer_id_str =
2212 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2213 debug!(
2214 "Connection established: peer={}, addr={}",
2215 peer_id_str, remote_address
2216 );
2217
2218 let ip = remote_address.ip();
2221 let is_rejected = match ip {
2222 std::net::IpAddr::V4(v4) => {
2223 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2225 geo_provider.is_hosting_asn(asn)
2226 || geo_provider.is_vpn_asn(asn)
2227 } else {
2228 false
2229 }
2230 }
2231 std::net::IpAddr::V6(v6) => {
2232 let info = geo_provider.lookup(v6);
2233 info.is_hosting_provider || info.is_vpn_provider
2234 }
2235 };
2236
2237 if is_rejected {
2238 info!(
2239 "Rejecting connection from {} ({}) due to GeoIP policy (Hosting/VPN)",
2240 peer_id_str, remote_address
2241 );
2242
2243 let rejection = RejectionMessage {
2245 reason: RejectionReason::GeoIpPolicy,
2246 message:
2247 "Connection rejected: Hosting/VPN providers not allowed"
2248 .to_string(),
2249 suggested_target: None, };
2251
2252 if let Ok(data) = serde_json::to_vec(&rejection) {
2254 let timestamp = std::time::SystemTime::now()
2256 .duration_since(std::time::UNIX_EPOCH)
2257 .unwrap_or_default()
2258 .as_secs();
2259
2260 let message = serde_json::json!({
2261 "protocol": "control",
2262 "data": data,
2263 "from": local_peer_id,
2264 "timestamp": timestamp
2265 });
2266
2267 if let Ok(msg_bytes) = serde_json::to_vec(&message) {
2268 let _ = dual_node.send_to_peer(&peer_id, &msg_bytes).await;
2272
2273 tokio::task::yield_now().await;
2276 }
2277 }
2278
2279 continue;
2283 }
2284
2285 active_connections.write().await.insert(peer_id_str.clone());
2287
2288 let mut peers_lock = peers.write().await;
2290 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2291 peer_info.status = ConnectionStatus::Connected;
2292 peer_info.connected_at = Instant::now();
2293 } else {
2294 debug!("Registering new incoming peer: {}", peer_id_str);
2296 peers_lock.insert(
2297 peer_id_str.clone(),
2298 PeerInfo {
2299 peer_id: peer_id_str.clone(),
2300 addresses: vec![remote_address.to_string()],
2301 status: ConnectionStatus::Connected,
2302 last_seen: Instant::now(),
2303 connected_at: Instant::now(),
2304 protocols: Vec::new(),
2305 heartbeat_count: 0,
2306 },
2307 );
2308 }
2309
2310 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2312 }
2313 ConnectionEvent::Lost { peer_id, reason } => {
2314 let peer_id_str =
2315 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2316 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2317
2318 active_connections.write().await.remove(&peer_id_str);
2320
2321 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2323 peer_info.status = ConnectionStatus::Disconnected;
2324 peer_info.last_seen = Instant::now();
2325 }
2326
2327 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2329 }
2330 ConnectionEvent::Failed { peer_id, reason } => {
2331 let peer_id_str =
2332 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
2333 warn!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2334
2335 active_connections.write().await.remove(&peer_id_str);
2337
2338 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2340 peer_info.status = ConnectionStatus::Failed(reason.clone());
2341 }
2342
2343 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2345 }
2346 }
2347 }
2348 Err(broadcast::error::RecvError::Lagged(skipped)) => {
2349 warn!(
2350 "Connection event monitor lagged, skipped {} events",
2351 skipped
2352 );
2353 continue;
2354 }
2355 Err(broadcast::error::RecvError::Closed) => {
2356 info!("Connection event channel closed, stopping monitor");
2357 break;
2358 }
2359 }
2360 }
2361
2362 info!("Connection lifecycle monitor stopped");
2363 }
2364
2365 async fn start_connection_monitor(&self) {
2367 debug!("Connection monitor already running from initialization");
2371 }
2372
2373 async fn keepalive_task(
2379 active_connections: Arc<RwLock<HashSet<String>>>,
2380 dual_node: Arc<DualStackNetworkNode>,
2381 shutdown: Arc<AtomicBool>,
2382 ) {
2383 use tokio::time::{Duration, interval};
2384
2385 const KEEPALIVE_INTERVAL_SECS: u64 = 15; const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2389 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2390
2391 info!(
2392 "Keepalive task started (interval: {}s)",
2393 KEEPALIVE_INTERVAL_SECS
2394 );
2395
2396 loop {
2397 if shutdown.load(Ordering::Relaxed) {
2399 info!("Keepalive task shutting down");
2400 break;
2401 }
2402
2403 interval.tick().await;
2404
2405 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2407
2408 if peers.is_empty() {
2409 trace!("Keepalive: no active connections");
2410 continue;
2411 }
2412
2413 debug!("Sending keepalive to {} active connections", peers.len());
2414
2415 for peer_id in peers {
2417 match dual_node
2418 .send_to_peer_string(&peer_id, KEEPALIVE_PAYLOAD)
2419 .await
2420 {
2421 Ok(_) => {
2422 trace!("Keepalive sent to peer: {}", peer_id);
2423 }
2424 Err(e) => {
2425 debug!(
2426 "Failed to send keepalive to peer {}: {} (connection may have closed)",
2427 peer_id, e
2428 );
2429 }
2431 }
2432 }
2433 }
2434
2435 info!("Keepalive task stopped");
2436 }
2437
2438 pub async fn health_check(&self) -> Result<()> {
2440 if let Some(ref resource_manager) = self.resource_manager {
2441 resource_manager.health_check().await
2442 } else {
2443 let peer_count = self.peer_count().await;
2445 if peer_count > self.config.max_connections {
2446 Err(P2PError::Network(
2447 crate::error::NetworkError::ProtocolError(
2448 format!("Too many connections: {peer_count}").into(),
2449 ),
2450 ))
2451 } else {
2452 Ok(())
2453 }
2454 }
2455 }
2456
2457 pub fn production_config(&self) -> Option<&ProductionConfig> {
2459 self.config.production_config.as_ref()
2460 }
2461
2462 pub fn is_production_mode(&self) -> bool {
2464 self.resource_manager.is_some()
2465 }
2466
2467 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2469 self.dht.as_ref()
2470 }
2471
2472 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2474 if let Some(ref dht) = self.dht {
2475 let mut dht_instance = dht.write().await;
2476 let dht_key = crate::dht::DhtKey::from_bytes(key);
2477 dht_instance
2478 .store(&dht_key, value.clone())
2479 .await
2480 .map_err(|e| {
2481 P2PError::Dht(crate::error::DhtError::StoreFailed(
2482 format!("{:?}: {e}", key).into(),
2483 ))
2484 })?;
2485
2486 Ok(())
2487 } else {
2488 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2489 "DHT not enabled".to_string().into(),
2490 )))
2491 }
2492 }
2493
2494 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2496 if let Some(ref dht) = self.dht {
2497 let dht_instance = dht.read().await;
2498 let dht_key = crate::dht::DhtKey::from_bytes(key);
2499 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2500 P2PError::Dht(crate::error::DhtError::StoreFailed(
2501 format!("Retrieve failed: {e}").into(),
2502 ))
2503 })?;
2504
2505 Ok(record_result)
2506 } else {
2507 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2508 "DHT not enabled".to_string().into(),
2509 )))
2510 }
2511 }
2512
2513 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2515 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2516 let manager = bootstrap_manager.write().await;
2517 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2518 .iter()
2519 .filter_map(|addr| addr.parse().ok())
2520 .collect();
2521 let contact = ContactEntry::new(peer_id, socket_addresses);
2522 manager.add_contact(contact).await.map_err(|e| {
2523 P2PError::Network(crate::error::NetworkError::ProtocolError(
2524 format!("Failed to add peer to bootstrap cache: {e}").into(),
2525 ))
2526 })?;
2527 }
2528 Ok(())
2529 }
2530
2531 pub async fn update_peer_metrics(
2533 &self,
2534 peer_id: &PeerId,
2535 success: bool,
2536 latency_ms: Option<u64>,
2537 _error: Option<String>,
2538 ) -> Result<()> {
2539 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2540 let manager = bootstrap_manager.write().await;
2541
2542 let metrics = QualityMetrics {
2544 success_rate: if success { 1.0 } else { 0.0 },
2545 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2546 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2548 last_successful_connection: if success {
2549 chrono::Utc::now()
2550 } else {
2551 chrono::Utc::now() - chrono::Duration::hours(1)
2552 },
2553 uptime_score: 0.5,
2554 };
2555
2556 manager
2557 .update_contact_metrics(peer_id, metrics)
2558 .await
2559 .map_err(|e| {
2560 P2PError::Network(crate::error::NetworkError::ProtocolError(
2561 format!("Failed to update peer metrics: {e}").into(),
2562 ))
2563 })?;
2564 }
2565 Ok(())
2566 }
2567
2568 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2570 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2571 let manager = bootstrap_manager.read().await;
2572 let stats = manager.get_stats().await.map_err(|e| {
2573 P2PError::Network(crate::error::NetworkError::ProtocolError(
2574 format!("Failed to get bootstrap stats: {e}").into(),
2575 ))
2576 })?;
2577 Ok(Some(stats))
2578 } else {
2579 Ok(None)
2580 }
2581 }
2582
2583 pub async fn cached_peer_count(&self) -> usize {
2585 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2586 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2587 {
2588 return stats.total_contacts;
2589 }
2590 0
2591 }
2592
2593 async fn connect_bootstrap_peers(&self) -> Result<()> {
2595 let mut bootstrap_contacts = Vec::new();
2596 let mut used_cache = false;
2597 let mut seen_addresses = std::collections::HashSet::new();
2598
2599 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2601 self.config.bootstrap_peers_str.clone()
2602 } else {
2603 self.config
2605 .bootstrap_peers
2606 .iter()
2607 .map(|addr| addr.to_string())
2608 .collect::<Vec<_>>()
2609 };
2610
2611 if !cli_bootstrap_peers.is_empty() {
2612 info!(
2613 "Using {} CLI-provided bootstrap peers (priority)",
2614 cli_bootstrap_peers.len()
2615 );
2616 for addr in &cli_bootstrap_peers {
2617 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2618 seen_addresses.insert(socket_addr);
2619 let contact = ContactEntry::new(
2620 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2621 vec![socket_addr],
2622 );
2623 bootstrap_contacts.push(contact);
2624 } else {
2625 warn!("Invalid bootstrap address format: {}", addr);
2626 }
2627 }
2628 }
2629
2630 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2633 let manager = bootstrap_manager.read().await;
2634 match manager.get_quic_bootstrap_peers(20).await {
2635 Ok(contacts) => {
2637 if !contacts.is_empty() {
2638 let mut added_from_cache = 0;
2639 for contact in contacts {
2640 let new_addresses: Vec<_> = contact
2642 .addresses
2643 .iter()
2644 .filter(|addr| !seen_addresses.contains(addr))
2645 .copied()
2646 .collect();
2647
2648 if !new_addresses.is_empty() {
2649 for addr in &new_addresses {
2650 seen_addresses.insert(*addr);
2651 }
2652 let mut contact = contact.clone();
2653 contact.addresses = new_addresses;
2654 bootstrap_contacts.push(contact);
2655 added_from_cache += 1;
2656 }
2657 }
2658 if added_from_cache > 0 {
2659 info!(
2660 "Added {} cached bootstrap peers (supplementing CLI peers)",
2661 added_from_cache
2662 );
2663 used_cache = true;
2664 }
2665 }
2666 }
2667 Err(e) => {
2668 warn!("Failed to get cached bootstrap peers: {}", e);
2669 }
2670 }
2671 }
2672
2673 if bootstrap_contacts.is_empty() {
2674 info!("No bootstrap peers configured and no cached peers available");
2675 return Ok(());
2676 }
2677
2678 let mut successful_connections = 0;
2680 for contact in bootstrap_contacts {
2681 for addr in &contact.addresses {
2682 match self.connect_peer(&addr.to_string()).await {
2683 Ok(peer_id) => {
2684 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2685 successful_connections += 1;
2686
2687 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2689 let manager = bootstrap_manager.write().await;
2690 let mut updated_contact = contact.clone();
2691 updated_contact.peer_id = peer_id.clone();
2692 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2695 warn!("Failed to update bootstrap cache: {}", e);
2696 }
2697 }
2698 break; }
2700 Err(e) => {
2701 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2702
2703 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2705 let manager = bootstrap_manager.write().await;
2706 let mut updated_contact = contact.clone();
2707 updated_contact.update_connection_result(
2708 false,
2709 None,
2710 Some(e.to_string()),
2711 );
2712
2713 if let Err(e) = manager.add_contact(updated_contact).await {
2714 warn!("Failed to update bootstrap cache: {}", e);
2715 }
2716 }
2717 }
2718 }
2719 }
2720 }
2721
2722 if successful_connections == 0 {
2723 if !used_cache {
2724 warn!("Failed to connect to any bootstrap peers");
2725 }
2726 return Ok(());
2729 }
2730 info!(
2731 "Successfully connected to {} bootstrap peers",
2732 successful_connections
2733 );
2734
2735 Ok(())
2736 }
2737
2738 async fn disconnect_all_peers(&self) -> Result<()> {
2740 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2741
2742 for peer_id in peer_ids {
2743 self.disconnect_peer(&peer_id).await?;
2744 }
2745
2746 Ok(())
2747 }
2748
2749 async fn periodic_tasks(&self) -> Result<()> {
2751 Ok(())
2757 }
2758}
2759
2760#[async_trait::async_trait]
2762pub trait NetworkSender: Send + Sync {
2763 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2765
2766 fn local_peer_id(&self) -> &PeerId;
2768}
2769
2770#[derive(Clone)]
2772pub struct P2PNetworkSender {
2773 peer_id: PeerId,
2774 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2776}
2777
2778impl P2PNetworkSender {
2779 pub fn new(
2780 peer_id: PeerId,
2781 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2782 ) -> Self {
2783 Self { peer_id, send_tx }
2784 }
2785}
2786
2787#[async_trait::async_trait]
2789impl NetworkSender for P2PNetworkSender {
2790 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2792 self.send_tx
2793 .send((peer_id.clone(), protocol.to_string(), data))
2794 .map_err(|_| {
2795 P2PError::Network(crate::error::NetworkError::ProtocolError(
2796 "Failed to send message via channel".to_string().into(),
2797 ))
2798 })?;
2799 Ok(())
2800 }
2801
2802 fn local_peer_id(&self) -> &PeerId {
2804 &self.peer_id
2805 }
2806}
2807
2808pub struct NodeBuilder {
2810 config: NodeConfig,
2811}
2812
2813impl Default for NodeBuilder {
2814 fn default() -> Self {
2815 Self::new()
2816 }
2817}
2818
2819impl NodeBuilder {
2820 pub fn new() -> Self {
2822 Self {
2823 config: NodeConfig::default(),
2824 }
2825 }
2826
2827 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2829 self.config.peer_id = Some(peer_id);
2830 self
2831 }
2832
2833 pub fn listen_on(mut self, addr: &str) -> Self {
2835 if let Ok(multiaddr) = addr.parse() {
2836 self.config.listen_addrs.push(multiaddr);
2837 }
2838 self
2839 }
2840
2841 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2843 if let Ok(multiaddr) = addr.parse() {
2844 self.config.bootstrap_peers.push(multiaddr);
2845 }
2846 self.config.bootstrap_peers_str.push(addr.to_string());
2847 self
2848 }
2849
2850 pub fn with_ipv6(mut self, enable: bool) -> Self {
2852 self.config.enable_ipv6 = enable;
2853 self
2854 }
2855
2856 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2860 self.config.connection_timeout = timeout;
2861 self
2862 }
2863
2864 pub fn with_max_connections(mut self, max: usize) -> Self {
2866 self.config.max_connections = max;
2867 self
2868 }
2869
2870 pub fn with_production_mode(mut self) -> Self {
2872 self.config.production_config = Some(ProductionConfig::default());
2873 self
2874 }
2875
2876 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2878 self.config.production_config = Some(production_config);
2879 self
2880 }
2881
2882 pub fn with_diversity_config(
2884 mut self,
2885 diversity_config: crate::security::IPDiversityConfig,
2886 ) -> Self {
2887 self.config.diversity_config = Some(diversity_config);
2888 self
2889 }
2890
2891 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2893 self.config.dht_config = dht_config;
2894 self
2895 }
2896
2897 pub fn with_default_dht(mut self) -> Self {
2899 self.config.dht_config = DHTConfig::default();
2900 self
2901 }
2902
2903 pub async fn build(self) -> Result<P2PNode> {
2905 P2PNode::new(self.config).await
2906 }
2907}
2908
2909#[cfg(test)]
2910#[allow(clippy::unwrap_used, clippy::expect_used)]
2911mod diversity_tests {
2912 use super::*;
2913 use crate::security::IPDiversityConfig;
2914
2915 async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
2916 let diversity_config = config.diversity_config.clone().unwrap_or_default();
2917 let temp_dir = tempfile::TempDir::new().expect("temp dir");
2919 let mut cache_config = config
2920 .bootstrap_cache_config
2921 .clone()
2922 .unwrap_or_else(crate::bootstrap::CacheConfig::default);
2923 cache_config.cache_dir = temp_dir.path().to_path_buf();
2924
2925 BootstrapManager::with_full_config(
2926 cache_config,
2927 crate::rate_limit::JoinRateLimiterConfig::default(),
2928 diversity_config,
2929 )
2930 .await
2931 .expect("bootstrap manager")
2932 }
2933
2934 #[tokio::test]
2935 async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
2936 let config = NodeConfig {
2937 diversity_config: Some(IPDiversityConfig::testnet()),
2938 ..Default::default()
2939 };
2940
2941 let manager = build_bootstrap_manager_like_prod(&config).await;
2942 assert!(manager.diversity_config().is_relaxed());
2943 assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
2944 }
2945}
2946
2947#[allow(dead_code)] async fn handle_received_message_standalone(
2950 message_data: Vec<u8>,
2951 peer_id: &PeerId,
2952 _protocol: &str,
2953 event_tx: &broadcast::Sender<P2PEvent>,
2954) -> Result<()> {
2955 match serde_json::from_slice::<serde_json::Value>(&message_data) {
2957 Ok(message) => {
2958 if let (Some(protocol), Some(data), Some(from)) = (
2959 message.get("protocol").and_then(|v| v.as_str()),
2960 message.get("data").and_then(|v| v.as_array()),
2961 message.get("from").and_then(|v| v.as_str()),
2962 ) {
2963 let data_bytes: Vec<u8> = data
2965 .iter()
2966 .filter_map(|v| v.as_u64().map(|n| n as u8))
2967 .collect();
2968
2969 let event = P2PEvent::Message {
2971 topic: protocol.to_string(),
2972 source: from.to_string(),
2973 data: data_bytes,
2974 };
2975
2976 let _ = event_tx.send(event);
2977 debug!("Generated message event from peer: {}", peer_id);
2978 }
2979 }
2980 Err(e) => {
2981 warn!("Failed to parse received message from {}: {}", peer_id, e);
2982 }
2983 }
2984
2985 Ok(())
2986}
2987
2988#[allow(dead_code)]
2992fn handle_protocol_message_creation(protocol: &str, data: Vec<u8>) -> Option<Vec<u8>> {
2993 match create_protocol_message_static(protocol, data) {
2994 Ok(msg) => Some(msg),
2995 Err(e) => {
2996 warn!("Failed to create protocol message: {}", e);
2997 None
2998 }
2999 }
3000}
3001
3002#[allow(dead_code)]
3004async fn handle_message_send_result(result: crate::error::P2pResult<()>, peer_id: &PeerId) {
3005 match result {
3006 Ok(_) => {
3007 debug!("Message sent to peer {} via transport layer", peer_id);
3008 }
3009 Err(e) => {
3010 warn!("Failed to send message to peer {}: {}", peer_id, e);
3011 }
3012 }
3013}
3014
3015#[allow(dead_code)] fn check_rate_limit(
3018 rate_limiter: &RateLimiter,
3019 socket_addr: &std::net::SocketAddr,
3020 remote_addr: &NetworkAddress,
3021) -> Result<()> {
3022 rate_limiter.check_ip(&socket_addr.ip()).map_err(|e| {
3023 warn!("Rate limit exceeded for {}: {}", remote_addr, e);
3024 e
3025 })
3026}
3027
3028#[allow(dead_code)] async fn register_new_peer(
3031 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3032 peer_id: &PeerId,
3033 remote_addr: &NetworkAddress,
3034) {
3035 let mut peers_guard = peers.write().await;
3036 let peer_info = PeerInfo {
3037 peer_id: peer_id.clone(),
3038 addresses: vec![remote_addr.to_string()],
3039 connected_at: tokio::time::Instant::now(),
3040 last_seen: tokio::time::Instant::now(),
3041 status: ConnectionStatus::Connected,
3042 protocols: vec!["p2p-chat/1.0.0".to_string()],
3043 heartbeat_count: 0,
3044 };
3045 peers_guard.insert(peer_id.clone(), peer_info);
3046}
3047
3048#[allow(dead_code)] fn spawn_connection_handler(
3051 connection: Box<dyn crate::transport::Connection>,
3052 peer_id: PeerId,
3053 event_tx: broadcast::Sender<P2PEvent>,
3054 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3055) {
3056 tokio::spawn(async move {
3057 handle_peer_connection(connection, peer_id, event_tx, peers).await;
3058 });
3059}
3060
3061#[allow(dead_code)] async fn handle_peer_connection(
3064 mut connection: Box<dyn crate::transport::Connection>,
3065 peer_id: PeerId,
3066 event_tx: broadcast::Sender<P2PEvent>,
3067 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3068) {
3069 loop {
3070 match connection.receive().await {
3071 Ok(message_data) => {
3072 debug!(
3073 "Received {} bytes from peer: {}",
3074 message_data.len(),
3075 peer_id
3076 );
3077
3078 if let Err(e) = handle_received_message_standalone(
3080 message_data,
3081 &peer_id,
3082 "unknown", &event_tx,
3084 )
3085 .await
3086 {
3087 warn!("Failed to handle message from peer {}: {}", peer_id, e);
3088 }
3089 }
3090 Err(e) => {
3091 warn!("Failed to receive message from {}: {}", peer_id, e);
3092
3093 if !connection.is_alive().await {
3095 info!("Connection to {} is dead, removing peer", peer_id);
3096
3097 remove_peer(&peers, &peer_id).await;
3099
3100 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
3102
3103 break; }
3105
3106 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
3108 }
3109 }
3110 }
3111}
3112
3113#[allow(dead_code)] async fn remove_peer(peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>, peer_id: &PeerId) {
3116 let mut peers_guard = peers.write().await;
3117 peers_guard.remove(peer_id);
3118}
3119
3120#[allow(dead_code)]
3122async fn update_peer_heartbeat(
3123 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3124 peer_id: &PeerId,
3125) -> Result<()> {
3126 let mut peers_guard = peers.write().await;
3127 match peers_guard.get_mut(peer_id) {
3128 Some(peer_info) => {
3129 peer_info.last_seen = Instant::now();
3130 peer_info.heartbeat_count += 1;
3131 Ok(())
3132 }
3133 None => {
3134 warn!("Received heartbeat from unknown peer: {}", peer_id);
3135 Err(P2PError::Network(NetworkError::PeerNotFound(
3136 format!("Peer {} not found", peer_id).into(),
3137 )))
3138 }
3139 }
3140}
3141
3142#[allow(dead_code)]
3144async fn get_resource_metrics(resource_manager: &Option<Arc<ResourceManager>>) -> (u64, f64) {
3145 if let Some(manager) = resource_manager {
3146 let metrics = manager.get_metrics().await;
3147 (metrics.memory_used, metrics.cpu_usage)
3148 } else {
3149 (0, 0.0)
3150 }
3151}
3152
3153#[cfg(test)]
3154mod tests {
3155 use super::*;
3156 use std::time::Duration;
3158 use tokio::time::timeout;
3159
3160 fn create_test_node_config() -> NodeConfig {
3166 NodeConfig {
3167 peer_id: Some("test_peer_123".to_string()),
3168 listen_addrs: vec![
3169 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
3170 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
3171 ],
3172 listen_addr: std::net::SocketAddr::new(
3173 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3174 0,
3175 ),
3176 bootstrap_peers: vec![],
3177 bootstrap_peers_str: vec![],
3178 enable_ipv6: true,
3179
3180 connection_timeout: Duration::from_secs(2),
3181 keep_alive_interval: Duration::from_secs(30),
3182 max_connections: 100,
3183 max_incoming_connections: 50,
3184 dht_config: DHTConfig::default(),
3185 security_config: SecurityConfig::default(),
3186 production_config: None,
3187 bootstrap_cache_config: None,
3188 diversity_config: None,
3189 attestation_config: crate::attestation::AttestationConfig::default(),
3190 }
3191 }
3192
3193 #[tokio::test]
3197 async fn test_node_config_default() {
3198 let config = NodeConfig::default();
3199
3200 assert!(config.peer_id.is_none());
3201 assert_eq!(config.listen_addrs.len(), 2);
3202 assert!(config.enable_ipv6);
3203 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
3205 assert_eq!(config.connection_timeout, Duration::from_secs(30));
3206 }
3207
3208 #[tokio::test]
3209 async fn test_dht_config_default() {
3210 let config = DHTConfig::default();
3211
3212 assert_eq!(config.k_value, 20);
3213 assert_eq!(config.alpha_value, 5);
3214 assert_eq!(config.record_ttl, Duration::from_secs(3600));
3215 assert_eq!(config.refresh_interval, Duration::from_secs(600));
3216 }
3217
3218 #[tokio::test]
3219 async fn test_security_config_default() {
3220 let config = SecurityConfig::default();
3221
3222 assert!(config.enable_noise);
3223 assert!(config.enable_tls);
3224 assert_eq!(config.trust_level, TrustLevel::Basic);
3225 }
3226
3227 #[test]
3228 fn test_trust_level_variants() {
3229 let _none = TrustLevel::None;
3231 let _basic = TrustLevel::Basic;
3232 let _full = TrustLevel::Full;
3233
3234 assert_eq!(TrustLevel::None, TrustLevel::None);
3236 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
3237 assert_eq!(TrustLevel::Full, TrustLevel::Full);
3238 assert_ne!(TrustLevel::None, TrustLevel::Basic);
3239 }
3240
3241 #[test]
3242 fn test_connection_status_variants() {
3243 let connecting = ConnectionStatus::Connecting;
3244 let connected = ConnectionStatus::Connected;
3245 let disconnecting = ConnectionStatus::Disconnecting;
3246 let disconnected = ConnectionStatus::Disconnected;
3247 let failed = ConnectionStatus::Failed("test error".to_string());
3248
3249 assert_eq!(connecting, ConnectionStatus::Connecting);
3250 assert_eq!(connected, ConnectionStatus::Connected);
3251 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
3252 assert_eq!(disconnected, ConnectionStatus::Disconnected);
3253 assert_ne!(connecting, connected);
3254
3255 if let ConnectionStatus::Failed(msg) = failed {
3256 assert_eq!(msg, "test error");
3257 } else {
3258 panic!("Expected Failed status");
3259 }
3260 }
3261
3262 #[tokio::test]
3263 async fn test_node_creation() -> Result<()> {
3264 let config = create_test_node_config();
3265 let node = P2PNode::new(config).await?;
3266
3267 assert_eq!(node.peer_id(), "test_peer_123");
3268 assert!(!node.is_running().await);
3269 assert_eq!(node.peer_count().await, 0);
3270 assert!(node.connected_peers().await.is_empty());
3271
3272 Ok(())
3273 }
3274
3275 #[tokio::test]
3276 async fn test_node_creation_without_peer_id() -> Result<()> {
3277 let mut config = create_test_node_config();
3278 config.peer_id = None;
3279
3280 let node = P2PNode::new(config).await?;
3281
3282 assert!(node.peer_id().starts_with("peer_"));
3284 assert!(!node.is_running().await);
3285
3286 Ok(())
3287 }
3288
3289 #[tokio::test]
3290 async fn test_node_lifecycle() -> Result<()> {
3291 let config = create_test_node_config();
3292 let node = P2PNode::new(config).await?;
3293
3294 assert!(!node.is_running().await);
3296
3297 node.start().await?;
3299 assert!(node.is_running().await);
3300
3301 let listen_addrs = node.listen_addrs().await;
3303 assert!(
3304 !listen_addrs.is_empty(),
3305 "Expected at least one listening address"
3306 );
3307
3308 node.stop().await?;
3310 assert!(!node.is_running().await);
3311
3312 Ok(())
3313 }
3314
3315 #[tokio::test]
3316 async fn test_peer_connection() -> Result<()> {
3317 let config1 = create_test_node_config();
3318 let mut config2 = create_test_node_config();
3319 config2.peer_id = Some("test_peer_456".to_string());
3320
3321 let node1 = P2PNode::new(config1).await?;
3322 let node2 = P2PNode::new(config2).await?;
3323
3324 node1.start().await?;
3325 node2.start().await?;
3326
3327 let node2_addr = node2
3328 .listen_addrs()
3329 .await
3330 .into_iter()
3331 .find(|a| a.ip().is_ipv4())
3332 .ok_or_else(|| {
3333 P2PError::Network(crate::error::NetworkError::InvalidAddress(
3334 "Node 2 did not expose an IPv4 listen address".into(),
3335 ))
3336 })?;
3337
3338 let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3340
3341 assert_eq!(node1.peer_count().await, 1);
3343
3344 let connected_peers = node1.connected_peers().await;
3346 assert_eq!(connected_peers.len(), 1);
3347 assert_eq!(connected_peers[0], peer_id);
3348
3349 let peer_info = node1.peer_info(&peer_id).await;
3351 assert!(peer_info.is_some());
3352 let info = peer_info.expect("Peer info should exist after adding peer");
3353 assert_eq!(info.peer_id, peer_id);
3354 assert_eq!(info.status, ConnectionStatus::Connected);
3355 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3356
3357 node1.disconnect_peer(&peer_id).await?;
3359 assert_eq!(node1.peer_count().await, 0);
3360
3361 node1.stop().await?;
3362 node2.stop().await?;
3363
3364 Ok(())
3365 }
3366
3367 #[cfg_attr(target_os = "windows", ignore)]
3374 #[tokio::test]
3375 async fn test_event_subscription() -> Result<()> {
3376 let ipv4_localhost =
3381 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3382
3383 let mut config1 = create_test_node_config();
3384 config1.listen_addr = ipv4_localhost;
3385 config1.listen_addrs = vec![ipv4_localhost];
3386 config1.enable_ipv6 = false;
3387
3388 let mut config2 = create_test_node_config();
3389 config2.peer_id = Some("test_peer_456".to_string());
3390 config2.listen_addr = ipv4_localhost;
3391 config2.listen_addrs = vec![ipv4_localhost];
3392 config2.enable_ipv6 = false;
3393
3394 let node1 = P2PNode::new(config1).await?;
3395 let node2 = P2PNode::new(config2).await?;
3396
3397 node1.start().await?;
3398 node2.start().await?;
3399
3400 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
3403
3404 let mut events = node1.subscribe_events();
3405
3406 let node2_addr = node2.local_addr().ok_or_else(|| {
3408 P2PError::Network(crate::error::NetworkError::ProtocolError(
3409 "No listening address".to_string().into(),
3410 ))
3411 })?;
3412
3413 let mut peer_id = None;
3416 for attempt in 0..3 {
3417 if attempt > 0 {
3418 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3419 }
3420 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
3421 Ok(Ok(id)) => {
3422 peer_id = Some(id);
3423 break;
3424 }
3425 Ok(Err(_)) | Err(_) => continue,
3426 }
3427 }
3428 let peer_id = peer_id.ok_or_else(|| {
3429 P2PError::Network(crate::error::NetworkError::ProtocolError(
3430 "Failed to connect after 3 attempts".to_string().into(),
3431 ))
3432 })?;
3433
3434 let event = timeout(Duration::from_secs(2), events.recv()).await;
3436 assert!(event.is_ok());
3437
3438 let event_result = event
3439 .expect("Should receive event")
3440 .expect("Event should not be error");
3441 match event_result {
3442 P2PEvent::PeerConnected(event_peer_id) => {
3443 assert_eq!(event_peer_id, peer_id);
3444 }
3445 _ => panic!("Expected PeerConnected event"),
3446 }
3447
3448 node1.disconnect_peer(&peer_id).await?;
3450
3451 let event = timeout(Duration::from_secs(2), events.recv()).await;
3453 assert!(event.is_ok());
3454
3455 let event_result = event
3456 .expect("Should receive event")
3457 .expect("Event should not be error");
3458 match event_result {
3459 P2PEvent::PeerDisconnected(event_peer_id) => {
3460 assert_eq!(event_peer_id, peer_id);
3461 }
3462 _ => panic!("Expected PeerDisconnected event"),
3463 }
3464
3465 node1.stop().await?;
3466 node2.stop().await?;
3467
3468 Ok(())
3469 }
3470
3471 #[cfg_attr(target_os = "windows", ignore)]
3473 #[tokio::test]
3474 async fn test_message_sending() -> Result<()> {
3475 let mut config1 = create_test_node_config();
3477 config1.listen_addr =
3478 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3479 let node1 = P2PNode::new(config1).await?;
3480 node1.start().await?;
3481
3482 let mut config2 = create_test_node_config();
3483 config2.listen_addr =
3484 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3485 let node2 = P2PNode::new(config2).await?;
3486 node2.start().await?;
3487
3488 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3490
3491 let node2_addr = node2.local_addr().ok_or_else(|| {
3493 P2PError::Network(crate::error::NetworkError::ProtocolError(
3494 "No listening address".to_string().into(),
3495 ))
3496 })?;
3497
3498 let peer_id =
3500 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3501 Ok(res) => res?,
3502 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3503 };
3504
3505 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3507
3508 let message_data = b"Hello, peer!".to_vec();
3510 let result = match timeout(
3511 Duration::from_millis(500),
3512 node1.send_message(&peer_id, "test-protocol", message_data),
3513 )
3514 .await
3515 {
3516 Ok(res) => res,
3517 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3518 };
3519 if let Err(e) = &result {
3522 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3523 }
3524
3525 let non_existent_peer = "non_existent_peer".to_string();
3527 let result = node1
3528 .send_message(&non_existent_peer, "test-protocol", vec![])
3529 .await;
3530 assert!(result.is_err(), "Sending to non-existent peer should fail");
3531
3532 Ok(())
3533 }
3534
3535 #[tokio::test]
3536 async fn test_remote_mcp_operations() -> Result<()> {
3537 let config = create_test_node_config();
3538 let node = P2PNode::new(config).await?;
3539
3540 node.start().await?;
3542 node.stop().await?;
3543 Ok(())
3544 }
3545
3546 #[tokio::test]
3547 async fn test_health_check() -> Result<()> {
3548 let config = create_test_node_config();
3549 let node = P2PNode::new(config).await?;
3550
3551 let result = node.health_check().await;
3553 assert!(result.is_ok());
3554
3555 Ok(())
3560 }
3561
3562 #[tokio::test]
3563 async fn test_node_uptime() -> Result<()> {
3564 let config = create_test_node_config();
3565 let node = P2PNode::new(config).await?;
3566
3567 let uptime1 = node.uptime();
3568 assert!(uptime1 >= Duration::from_secs(0));
3569
3570 tokio::time::sleep(Duration::from_millis(10)).await;
3572
3573 let uptime2 = node.uptime();
3574 assert!(uptime2 > uptime1);
3575
3576 Ok(())
3577 }
3578
3579 #[tokio::test]
3580 async fn test_node_config_access() -> Result<()> {
3581 let config = create_test_node_config();
3582 let expected_peer_id = config.peer_id.clone();
3583 let node = P2PNode::new(config).await?;
3584
3585 let node_config = node.config();
3586 assert_eq!(node_config.peer_id, expected_peer_id);
3587 assert_eq!(node_config.max_connections, 100);
3588 Ok(())
3591 }
3592
3593 #[tokio::test]
3594 async fn test_mcp_server_access() -> Result<()> {
3595 let config = create_test_node_config();
3596 let _node = P2PNode::new(config).await?;
3597
3598 Ok(())
3600 }
3601
3602 #[tokio::test]
3603 async fn test_dht_access() -> Result<()> {
3604 let config = create_test_node_config();
3605 let node = P2PNode::new(config).await?;
3606
3607 assert!(node.dht().is_some());
3609
3610 Ok(())
3611 }
3612
3613 #[tokio::test]
3614 async fn test_node_builder() -> Result<()> {
3615 let builder = P2PNode::builder()
3617 .with_peer_id("builder_test_peer".to_string())
3618 .listen_on("/ip4/127.0.0.1/tcp/0")
3619 .listen_on("/ip6/::1/tcp/0")
3620 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
3622 .with_connection_timeout(Duration::from_secs(15))
3623 .with_max_connections(200);
3624
3625 let config = builder.config;
3627 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3628 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
3631 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3632 assert_eq!(config.max_connections, 200);
3633
3634 Ok(())
3635 }
3636
3637 #[tokio::test]
3638 async fn test_bootstrap_peers() -> Result<()> {
3639 let mut config = create_test_node_config();
3640 config.bootstrap_peers = vec![
3641 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3642 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3643 ];
3644
3645 let node = P2PNode::new(config).await?;
3646
3647 node.start().await?;
3649
3650 let _peer_count = node.peer_count().await;
3654
3655 node.stop().await?;
3656 Ok(())
3657 }
3658
3659 #[tokio::test]
3660 async fn test_production_mode_disabled() -> Result<()> {
3661 let config = create_test_node_config();
3662 let node = P2PNode::new(config).await?;
3663
3664 assert!(!node.is_production_mode());
3665 assert!(node.production_config().is_none());
3666
3667 let result = node.resource_metrics().await;
3669 assert!(result.is_err());
3670 assert!(result.unwrap_err().to_string().contains("not enabled"));
3671
3672 Ok(())
3673 }
3674
3675 #[tokio::test]
3676 async fn test_network_event_variants() {
3677 let peer_id = "test_peer".to_string();
3679 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3680
3681 let _peer_connected = NetworkEvent::PeerConnected {
3682 peer_id: peer_id.clone(),
3683 addresses: vec![address.clone()],
3684 };
3685
3686 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3687 peer_id: peer_id.clone(),
3688 reason: "test disconnect".to_string(),
3689 };
3690
3691 let _message_received = NetworkEvent::MessageReceived {
3692 peer_id: peer_id.clone(),
3693 protocol: "test-protocol".to_string(),
3694 data: vec![1, 2, 3],
3695 };
3696
3697 let _connection_failed = NetworkEvent::ConnectionFailed {
3698 peer_id: Some(peer_id.clone()),
3699 address: address.clone(),
3700 error: "connection refused".to_string(),
3701 };
3702
3703 let _dht_stored = NetworkEvent::DHTRecordStored {
3704 key: vec![1, 2, 3],
3705 value: vec![4, 5, 6],
3706 };
3707
3708 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3709 key: vec![1, 2, 3],
3710 value: Some(vec![4, 5, 6]),
3711 };
3712 }
3713
3714 #[tokio::test]
3715 async fn test_peer_info_structure() {
3716 let peer_info = PeerInfo {
3717 peer_id: "test_peer".to_string(),
3718 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3719 connected_at: Instant::now(),
3720 last_seen: Instant::now(),
3721 status: ConnectionStatus::Connected,
3722 protocols: vec!["test-protocol".to_string()],
3723 heartbeat_count: 0,
3724 };
3725
3726 assert_eq!(peer_info.peer_id, "test_peer");
3727 assert_eq!(peer_info.addresses.len(), 1);
3728 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3729 assert_eq!(peer_info.protocols.len(), 1);
3730 }
3731
3732 #[tokio::test]
3733 async fn test_serialization() -> Result<()> {
3734 let config = create_test_node_config();
3736 let serialized = serde_json::to_string(&config)?;
3737 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3738
3739 assert_eq!(config.peer_id, deserialized.peer_id);
3740 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3741 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3742
3743 Ok(())
3744 }
3745
3746 #[tokio::test]
3747 async fn test_get_peer_id_by_address_found() -> Result<()> {
3748 let config = create_test_node_config();
3749 let node = P2PNode::new(config).await?;
3750
3751 let test_peer_id = "peer_test_123".to_string();
3753 let test_address = "192.168.1.100:9000".to_string();
3754
3755 let peer_info = PeerInfo {
3756 peer_id: test_peer_id.clone(),
3757 addresses: vec![test_address.clone()],
3758 connected_at: Instant::now(),
3759 last_seen: Instant::now(),
3760 status: ConnectionStatus::Connected,
3761 protocols: vec!["test-protocol".to_string()],
3762 heartbeat_count: 0,
3763 };
3764
3765 node.peers
3766 .write()
3767 .await
3768 .insert(test_peer_id.clone(), peer_info);
3769
3770 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3772 assert_eq!(found_peer_id, Some(test_peer_id));
3773
3774 Ok(())
3775 }
3776
3777 #[tokio::test]
3778 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3779 let config = create_test_node_config();
3780 let node = P2PNode::new(config).await?;
3781
3782 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3784 assert_eq!(result, None);
3785
3786 Ok(())
3787 }
3788
3789 #[tokio::test]
3790 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3791 let config = create_test_node_config();
3792 let node = P2PNode::new(config).await?;
3793
3794 let result = node.get_peer_id_by_address("invalid-address").await;
3796 assert_eq!(result, None);
3797
3798 Ok(())
3799 }
3800
3801 #[tokio::test]
3802 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3803 let config = create_test_node_config();
3804 let node = P2PNode::new(config).await?;
3805
3806 let peer1_id = "peer_1".to_string();
3808 let peer1_addr = "192.168.1.101:9001".to_string();
3809
3810 let peer2_id = "peer_2".to_string();
3811 let peer2_addr = "192.168.1.102:9002".to_string();
3812
3813 let peer1_info = PeerInfo {
3814 peer_id: peer1_id.clone(),
3815 addresses: vec![peer1_addr.clone()],
3816 connected_at: Instant::now(),
3817 last_seen: Instant::now(),
3818 status: ConnectionStatus::Connected,
3819 protocols: vec!["test-protocol".to_string()],
3820 heartbeat_count: 0,
3821 };
3822
3823 let peer2_info = PeerInfo {
3824 peer_id: peer2_id.clone(),
3825 addresses: vec![peer2_addr.clone()],
3826 connected_at: Instant::now(),
3827 last_seen: Instant::now(),
3828 status: ConnectionStatus::Connected,
3829 protocols: vec!["test-protocol".to_string()],
3830 heartbeat_count: 0,
3831 };
3832
3833 node.peers
3834 .write()
3835 .await
3836 .insert(peer1_id.clone(), peer1_info);
3837 node.peers
3838 .write()
3839 .await
3840 .insert(peer2_id.clone(), peer2_info);
3841
3842 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3844 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3845
3846 assert_eq!(found_peer1, Some(peer1_id));
3847 assert_eq!(found_peer2, Some(peer2_id));
3848
3849 Ok(())
3850 }
3851
3852 #[tokio::test]
3853 async fn test_list_active_connections_empty() -> Result<()> {
3854 let config = create_test_node_config();
3855 let node = P2PNode::new(config).await?;
3856
3857 let connections = node.list_active_connections().await;
3859 assert!(connections.is_empty());
3860
3861 Ok(())
3862 }
3863
3864 #[tokio::test]
3865 async fn test_list_active_connections_with_peers() -> Result<()> {
3866 let config = create_test_node_config();
3867 let node = P2PNode::new(config).await?;
3868
3869 let peer1_id = "peer_1".to_string();
3871 let peer1_addrs = vec![
3872 "192.168.1.101:9001".to_string(),
3873 "192.168.1.101:9002".to_string(),
3874 ];
3875
3876 let peer2_id = "peer_2".to_string();
3877 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3878
3879 let peer1_info = PeerInfo {
3880 peer_id: peer1_id.clone(),
3881 addresses: peer1_addrs.clone(),
3882 connected_at: Instant::now(),
3883 last_seen: Instant::now(),
3884 status: ConnectionStatus::Connected,
3885 protocols: vec!["test-protocol".to_string()],
3886 heartbeat_count: 0,
3887 };
3888
3889 let peer2_info = PeerInfo {
3890 peer_id: peer2_id.clone(),
3891 addresses: peer2_addrs.clone(),
3892 connected_at: Instant::now(),
3893 last_seen: Instant::now(),
3894 status: ConnectionStatus::Connected,
3895 protocols: vec!["test-protocol".to_string()],
3896 heartbeat_count: 0,
3897 };
3898
3899 node.peers
3900 .write()
3901 .await
3902 .insert(peer1_id.clone(), peer1_info);
3903 node.peers
3904 .write()
3905 .await
3906 .insert(peer2_id.clone(), peer2_info);
3907
3908 node.active_connections
3910 .write()
3911 .await
3912 .insert(peer1_id.clone());
3913 node.active_connections
3914 .write()
3915 .await
3916 .insert(peer2_id.clone());
3917
3918 let connections = node.list_active_connections().await;
3920 assert_eq!(connections.len(), 2);
3921
3922 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3924 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3925
3926 assert!(peer1_conn.is_some());
3927 assert!(peer2_conn.is_some());
3928
3929 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3931 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3932
3933 Ok(())
3934 }
3935
3936 #[tokio::test]
3937 async fn test_remove_peer_success() -> Result<()> {
3938 let config = create_test_node_config();
3939 let node = P2PNode::new(config).await?;
3940
3941 let peer_id = "peer_to_remove".to_string();
3943 let peer_info = PeerInfo {
3944 peer_id: peer_id.clone(),
3945 addresses: vec!["192.168.1.100:9000".to_string()],
3946 connected_at: Instant::now(),
3947 last_seen: Instant::now(),
3948 status: ConnectionStatus::Connected,
3949 protocols: vec!["test-protocol".to_string()],
3950 heartbeat_count: 0,
3951 };
3952
3953 node.peers.write().await.insert(peer_id.clone(), peer_info);
3954
3955 assert!(node.is_peer_connected(&peer_id).await);
3957
3958 let removed = node.remove_peer(&peer_id).await;
3960 assert!(removed);
3961
3962 assert!(!node.is_peer_connected(&peer_id).await);
3964
3965 Ok(())
3966 }
3967
3968 #[tokio::test]
3969 async fn test_remove_peer_nonexistent() -> Result<()> {
3970 let config = create_test_node_config();
3971 let node = P2PNode::new(config).await?;
3972
3973 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3975 assert!(!removed);
3976
3977 Ok(())
3978 }
3979
3980 #[tokio::test]
3981 async fn test_is_peer_connected() -> Result<()> {
3982 let config = create_test_node_config();
3983 let node = P2PNode::new(config).await?;
3984
3985 let peer_id = "test_peer".to_string();
3986
3987 assert!(!node.is_peer_connected(&peer_id).await);
3989
3990 let peer_info = PeerInfo {
3992 peer_id: peer_id.clone(),
3993 addresses: vec!["192.168.1.100:9000".to_string()],
3994 connected_at: Instant::now(),
3995 last_seen: Instant::now(),
3996 status: ConnectionStatus::Connected,
3997 protocols: vec!["test-protocol".to_string()],
3998 heartbeat_count: 0,
3999 };
4000
4001 node.peers.write().await.insert(peer_id.clone(), peer_info);
4002
4003 assert!(node.is_peer_connected(&peer_id).await);
4005
4006 node.remove_peer(&peer_id).await;
4008
4009 assert!(!node.is_peer_connected(&peer_id).await);
4011
4012 Ok(())
4013 }
4014
4015 #[test]
4016 fn test_normalize_ipv6_wildcard() {
4017 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
4018
4019 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
4020 let normalized = normalize_wildcard_to_loopback(wildcard);
4021
4022 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
4023 assert_eq!(normalized.port(), 8080);
4024 }
4025
4026 #[test]
4027 fn test_normalize_ipv4_wildcard() {
4028 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
4029
4030 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
4031 let normalized = normalize_wildcard_to_loopback(wildcard);
4032
4033 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
4034 assert_eq!(normalized.port(), 9000);
4035 }
4036
4037 #[test]
4038 fn test_normalize_specific_address_unchanged() {
4039 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
4040 let normalized = normalize_wildcard_to_loopback(specific);
4041
4042 assert_eq!(normalized, specific);
4043 }
4044
4045 #[test]
4046 fn test_normalize_loopback_unchanged() {
4047 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
4048
4049 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
4050 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
4051 assert_eq!(normalized_v6, loopback_v6);
4052
4053 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
4054 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
4055 assert_eq!(normalized_v4, loopback_v4);
4056 }
4057}