1use crate::bgp_geo_provider::BgpGeoProvider;
20use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
21use crate::config::Config;
22use crate::dht::DHT;
23use crate::error::{NetworkError, P2PError, P2pResult as Result};
24use crate::security::GeoProvider;
25
26use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
27use crate::transport::ant_quic_adapter::{DualStackNetworkNode, ant_peer_id_to_string};
28#[allow(unused_imports)] use crate::transport::{TransportOptions, TransportType};
30use crate::validation::RateLimitConfig;
31use crate::validation::RateLimiter;
32use crate::{NetworkAddress, PeerId};
33use serde::{Deserialize, Serialize};
34use std::collections::{HashMap, HashSet};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::Duration;
38use tokio::sync::{RwLock, broadcast};
39use tokio::time::{Instant, interval};
40use tracing::{debug, error, info, trace, warn};
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
48struct WireMessage {
49 protocol: String,
51 data: Vec<u8>,
53 from: String,
55 timestamp: u64,
57}
58
59const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive";
61
62const RECV_CHANNEL_CAPACITY: usize = 256;
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct NodeConfig {
68 pub peer_id: Option<PeerId>,
70
71 pub listen_addrs: Vec<std::net::SocketAddr>,
73
74 pub listen_addr: std::net::SocketAddr,
76
77 pub bootstrap_peers: Vec<std::net::SocketAddr>,
79
80 pub bootstrap_peers_str: Vec<String>,
82
83 pub enable_ipv6: bool,
85
86 pub connection_timeout: Duration,
89
90 pub keep_alive_interval: Duration,
92
93 pub max_connections: usize,
95
96 pub max_incoming_connections: usize,
98
99 pub dht_config: DHTConfig,
101
102 pub security_config: SecurityConfig,
104
105 pub production_config: Option<ProductionConfig>,
107
108 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
110
111 pub diversity_config: Option<crate::security::IPDiversityConfig>,
116
117 #[serde(default)]
122 pub attestation_config: crate::attestation::AttestationConfig,
123
124 #[serde(default = "default_stale_peer_threshold")]
127 pub stale_peer_threshold: Duration,
128}
129
130fn default_stale_peer_threshold() -> Duration {
132 Duration::from_secs(60)
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct DHTConfig {
138 pub k_value: usize,
140
141 pub alpha_value: usize,
143
144 pub record_ttl: Duration,
146
147 pub refresh_interval: Duration,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
153pub struct SecurityConfig {
154 pub enable_noise: bool,
156
157 pub enable_tls: bool,
159
160 pub trust_level: TrustLevel,
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
166pub enum TrustLevel {
167 None,
169 Basic,
171 Full,
173}
174
175#[inline]
183fn build_listen_addrs(port: u16, ipv6_enabled: bool) -> Vec<std::net::SocketAddr> {
184 let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
185
186 if ipv6_enabled {
187 addrs.push(std::net::SocketAddr::new(
188 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
189 port,
190 ));
191 }
192
193 addrs.push(std::net::SocketAddr::new(
194 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
195 port,
196 ));
197
198 addrs
199}
200
201impl NodeConfig {
202 pub fn new() -> Result<Self> {
208 let config = Config::default();
209 let listen_addr = config.listen_socket_addr()?;
210
211 Ok(Self {
212 peer_id: None,
213 listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
214 listen_addr,
215 bootstrap_peers: Vec::new(),
216 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
217 enable_ipv6: config.network.ipv6_enabled,
218 connection_timeout: Duration::from_secs(config.network.connection_timeout),
219 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
220 max_connections: config.network.max_connections,
221 max_incoming_connections: config.security.connection_limit as usize,
222 dht_config: DHTConfig::default(),
223 security_config: SecurityConfig::default(),
224 production_config: None,
225 bootstrap_cache_config: None,
226 diversity_config: None,
227 attestation_config: config.attestation.clone(),
228 stale_peer_threshold: default_stale_peer_threshold(),
229 })
230 }
231
232 pub fn builder() -> NodeConfigBuilder {
234 NodeConfigBuilder::default()
235 }
236}
237
238#[derive(Debug, Clone, Default)]
244pub struct NodeConfigBuilder {
245 peer_id: Option<PeerId>,
246 listen_port: Option<u16>,
247 enable_ipv6: Option<bool>,
248 bootstrap_peers: Vec<std::net::SocketAddr>,
249 max_connections: Option<usize>,
250 connection_timeout: Option<Duration>,
251 keep_alive_interval: Option<Duration>,
252 dht_config: Option<DHTConfig>,
253 security_config: Option<SecurityConfig>,
254 production_config: Option<ProductionConfig>,
255}
256
257impl NodeConfigBuilder {
258 pub fn peer_id(mut self, peer_id: PeerId) -> Self {
260 self.peer_id = Some(peer_id);
261 self
262 }
263
264 pub fn listen_port(mut self, port: u16) -> Self {
266 self.listen_port = Some(port);
267 self
268 }
269
270 pub fn ipv6(mut self, enabled: bool) -> Self {
272 self.enable_ipv6 = Some(enabled);
273 self
274 }
275
276 pub fn bootstrap_peer(mut self, addr: std::net::SocketAddr) -> Self {
278 self.bootstrap_peers.push(addr);
279 self
280 }
281
282 pub fn max_connections(mut self, max: usize) -> Self {
284 self.max_connections = Some(max);
285 self
286 }
287
288 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
290 self.connection_timeout = Some(timeout);
291 self
292 }
293
294 pub fn keep_alive_interval(mut self, interval: Duration) -> Self {
296 self.keep_alive_interval = Some(interval);
297 self
298 }
299
300 pub fn dht_config(mut self, config: DHTConfig) -> Self {
302 self.dht_config = Some(config);
303 self
304 }
305
306 pub fn security_config(mut self, config: SecurityConfig) -> Self {
308 self.security_config = Some(config);
309 self
310 }
311
312 pub fn production_config(mut self, config: ProductionConfig) -> Self {
314 self.production_config = Some(config);
315 self
316 }
317
318 pub fn build(self) -> Result<NodeConfig> {
324 let base_config = Config::default();
325 let default_port = base_config
326 .listen_socket_addr()
327 .map(|addr| addr.port())
328 .unwrap_or(9000);
329 let port = self.listen_port.unwrap_or(default_port);
330 let ipv6_enabled = self.enable_ipv6.unwrap_or(base_config.network.ipv6_enabled);
331
332 let listen_addr =
333 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port);
334
335 Ok(NodeConfig {
336 peer_id: self.peer_id,
337 listen_addrs: build_listen_addrs(port, ipv6_enabled),
338 listen_addr,
339 bootstrap_peers: self.bootstrap_peers.clone(),
340 bootstrap_peers_str: self.bootstrap_peers.iter().map(|a| a.to_string()).collect(),
341 enable_ipv6: ipv6_enabled,
342 connection_timeout: self
343 .connection_timeout
344 .unwrap_or(Duration::from_secs(base_config.network.connection_timeout)),
345 keep_alive_interval: self
346 .keep_alive_interval
347 .unwrap_or(Duration::from_secs(base_config.network.keepalive_interval)),
348 max_connections: self
349 .max_connections
350 .unwrap_or(base_config.network.max_connections),
351 max_incoming_connections: base_config.security.connection_limit as usize,
352 dht_config: self.dht_config.unwrap_or_default(),
353 security_config: self.security_config.unwrap_or_default(),
354 production_config: self.production_config,
355 bootstrap_cache_config: None,
356 diversity_config: None,
357 attestation_config: base_config.attestation.clone(),
358 stale_peer_threshold: default_stale_peer_threshold(),
359 })
360 }
361}
362
363impl Default for NodeConfig {
364 fn default() -> Self {
365 let config = Config::default();
366 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
367 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 9000)
368 });
369
370 Self {
371 peer_id: None,
372 listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
373 listen_addr,
374 bootstrap_peers: Vec::new(),
375 bootstrap_peers_str: Vec::new(),
376 enable_ipv6: config.network.ipv6_enabled,
377 connection_timeout: Duration::from_secs(config.network.connection_timeout),
378 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
379 max_connections: config.network.max_connections,
380 max_incoming_connections: config.security.connection_limit as usize,
381 dht_config: DHTConfig::default(),
382 security_config: SecurityConfig::default(),
383 production_config: None,
384 bootstrap_cache_config: None,
385 diversity_config: None,
386 attestation_config: config.attestation.clone(),
387 stale_peer_threshold: default_stale_peer_threshold(),
388 }
389 }
390}
391
392impl NodeConfig {
393 pub fn from_config(config: &Config) -> Result<Self> {
395 let listen_addr = config.listen_socket_addr()?;
396 let bootstrap_addrs = config.bootstrap_addrs()?;
397
398 let mut node_config = Self {
399 peer_id: None,
400 listen_addrs: vec![listen_addr],
401 listen_addr,
402 bootstrap_peers: bootstrap_addrs
403 .iter()
404 .map(|addr| addr.socket_addr())
405 .collect(),
406 bootstrap_peers_str: config
407 .network
408 .bootstrap_nodes
409 .iter()
410 .map(|addr| addr.to_string())
411 .collect(),
412 enable_ipv6: config.network.ipv6_enabled,
413
414 connection_timeout: Duration::from_secs(config.network.connection_timeout),
415 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
416 max_connections: config.network.max_connections,
417 max_incoming_connections: config.security.connection_limit as usize,
418 dht_config: DHTConfig {
419 k_value: 20,
420 alpha_value: 3,
421 record_ttl: Duration::from_secs(3600),
422 refresh_interval: Duration::from_secs(900),
423 },
424 security_config: SecurityConfig {
425 enable_noise: true,
426 enable_tls: true,
427 trust_level: TrustLevel::Basic,
428 },
429 production_config: Some(ProductionConfig {
430 max_connections: config.network.max_connections,
431 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
434 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
435 health_check_interval: Duration::from_secs(30),
436 metrics_interval: Duration::from_secs(60),
437 enable_performance_tracking: true,
438 enable_auto_cleanup: true,
439 shutdown_timeout: Duration::from_secs(30),
440 rate_limits: crate::production::RateLimitConfig::default(),
441 }),
442 bootstrap_cache_config: None,
443 diversity_config: None,
444 attestation_config: config.attestation.clone(),
445 stale_peer_threshold: default_stale_peer_threshold(),
446 };
447
448 if config.network.ipv6_enabled {
450 node_config.listen_addrs.push(std::net::SocketAddr::new(
451 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
452 listen_addr.port(),
453 ));
454 }
455
456 Ok(node_config)
457 }
458
459 pub fn with_listen_addr(addr: &str) -> Result<Self> {
461 let listen_addr: std::net::SocketAddr = addr
462 .parse()
463 .map_err(|e: std::net::AddrParseError| {
464 NetworkError::InvalidAddress(e.to_string().into())
465 })
466 .map_err(P2PError::Network)?;
467 let cfg = NodeConfig {
468 listen_addr,
469 listen_addrs: vec![listen_addr],
470 diversity_config: None,
471 ..Default::default()
472 };
473 Ok(cfg)
474 }
475}
476
477impl Default for DHTConfig {
478 fn default() -> Self {
479 Self {
480 k_value: 20,
481 alpha_value: 5,
482 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
485 }
486}
487
488impl Default for SecurityConfig {
489 fn default() -> Self {
490 Self {
491 enable_noise: true,
492 enable_tls: true,
493 trust_level: TrustLevel::Basic,
494 }
495 }
496}
497
498#[derive(Debug, Clone)]
500pub struct PeerInfo {
501 pub peer_id: PeerId,
503
504 pub addresses: Vec<String>,
506
507 pub connected_at: Instant,
509
510 pub last_seen: Instant,
512
513 pub status: ConnectionStatus,
515
516 pub protocols: Vec<String>,
518
519 pub heartbeat_count: u64,
521}
522
523#[derive(Debug, Clone, PartialEq)]
525pub enum ConnectionStatus {
526 Connecting,
528 Connected,
530 Disconnecting,
532 Disconnected,
534 Failed(String),
536}
537
538#[derive(Debug, Clone)]
540pub enum NetworkEvent {
541 PeerConnected {
543 peer_id: PeerId,
545 addresses: Vec<String>,
547 },
548
549 PeerDisconnected {
551 peer_id: PeerId,
553 reason: String,
555 },
556
557 MessageReceived {
559 peer_id: PeerId,
561 protocol: String,
563 data: Vec<u8>,
565 },
566
567 ConnectionFailed {
569 peer_id: Option<PeerId>,
571 address: String,
573 error: String,
575 },
576
577 DHTRecordStored {
579 key: Vec<u8>,
581 value: Vec<u8>,
583 },
584
585 DHTRecordRetrieved {
587 key: Vec<u8>,
589 value: Option<Vec<u8>>,
591 },
592}
593
594#[derive(Debug, Clone)]
599pub enum P2PEvent {
600 Message {
602 topic: String,
604 source: PeerId,
606 data: Vec<u8>,
608 },
609 PeerConnected(PeerId),
611 PeerDisconnected(PeerId),
613}
614
615pub struct P2PNode {
625 config: NodeConfig,
627
628 peer_id: PeerId,
630
631 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
633
634 event_tx: broadcast::Sender<P2PEvent>,
636
637 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
639
640 start_time: Instant,
642
643 running: RwLock<bool>,
645
646 dht: Option<Arc<RwLock<DHT>>>,
648
649 resource_manager: Option<Arc<ResourceManager>>,
651
652 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
654
655 dual_node: Arc<DualStackNetworkNode>,
657
658 #[allow(dead_code)]
660 rate_limiter: Arc<RateLimiter>,
661
662 active_connections: Arc<RwLock<HashSet<PeerId>>>,
665
666 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
668
669 #[allow(dead_code)]
671 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
672
673 #[allow(dead_code)]
675 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
676
677 #[allow(dead_code)]
679 periodic_tasks_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
680
681 shutdown: Arc<AtomicBool>,
683
684 recv_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
686
687 listener_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
689
690 #[allow(dead_code)]
692 geo_provider: Arc<BgpGeoProvider>,
693
694 entangled_id: Option<crate::attestation::EntangledId>,
697
698 binary_hash: [u8; 32],
701}
702
703fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
719 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
720
721 if addr.ip().is_unspecified() {
722 let loopback_ip = match addr {
724 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
727 std::net::SocketAddr::new(loopback_ip, addr.port())
728 } else {
729 addr
731 }
732}
733
734impl P2PNode {
735 pub fn new_for_tests() -> Result<Self> {
737 let (event_tx, _) = broadcast::channel(16);
738 Ok(Self {
739 config: NodeConfig::default(),
740 peer_id: "test_peer".to_string(),
741 peers: Arc::new(RwLock::new(HashMap::new())),
742 event_tx,
743 listen_addrs: RwLock::new(Vec::new()),
744 start_time: Instant::now(),
745 running: RwLock::new(false),
746 dht: None,
747 resource_manager: None,
748 bootstrap_manager: None,
749 dual_node: {
750 let v6: Option<std::net::SocketAddr> = "[::1]:0"
752 .parse()
753 .ok()
754 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
755 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
756 let handle = tokio::runtime::Handle::current();
757 let dual_attempt = handle.block_on(
758 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
759 );
760 let dual = match dual_attempt {
761 Ok(d) => d,
762 Err(_e1) => {
763 let fallback = handle.block_on(
765 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
766 None,
767 "127.0.0.1:0".parse().ok(),
768 ),
769 );
770 match fallback {
771 Ok(d) => d,
772 Err(e2) => {
773 return Err(P2PError::Network(NetworkError::BindError(
774 format!("Failed to create dual-stack network node: {}", e2)
775 .into(),
776 )));
777 }
778 }
779 }
780 };
781 Arc::new(dual)
782 },
783 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
784 max_requests: 100,
785 burst_size: 100,
786 window: std::time::Duration::from_secs(1),
787 ..Default::default()
788 })),
789 active_connections: Arc::new(RwLock::new(HashSet::new())),
790 connection_monitor_handle: Arc::new(RwLock::new(None)),
791 keepalive_handle: Arc::new(RwLock::new(None)),
792 periodic_tasks_handle: Arc::new(RwLock::new(None)),
793 shutdown: Arc::new(AtomicBool::new(false)),
794 recv_handles: Arc::new(RwLock::new(Vec::new())),
795 listener_handle: Arc::new(RwLock::new(None)),
796 geo_provider: Arc::new(BgpGeoProvider::new()),
797 security_dashboard: None,
798 entangled_id: None,
800 binary_hash: [0u8; 32],
801 })
802 }
803 pub async fn new(config: NodeConfig) -> Result<Self> {
805 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
806 format!("peer_{}", &uuid::Uuid::new_v4().to_string()[..8])
808 });
809
810 let (event_tx, _) = broadcast::channel(1000);
811
812 {
815 use blake3::Hasher;
816 let mut hasher = Hasher::new();
817 hasher.update(peer_id.as_bytes());
818 let digest = hasher.finalize();
819 let mut nid = [0u8; 32];
820 nid.copy_from_slice(digest.as_bytes());
821 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
822 crate::identity::node_identity::NodeId::from_bytes(nid),
823 ));
824 }
827
828 let (dht, security_dashboard) = if true {
830 let _dht_config = crate::dht::DHTConfig {
832 replication_factor: config.dht_config.k_value,
833 bucket_size: config.dht_config.k_value,
834 alpha: config.dht_config.alpha_value,
835 record_ttl: config.dht_config.record_ttl,
836 bucket_refresh_interval: config.dht_config.refresh_interval,
837 republish_interval: config.dht_config.refresh_interval,
838 max_distance: 160,
839 };
840 let peer_bytes = peer_id.as_bytes();
842 let mut node_id_bytes = [0u8; 32];
843 let len = peer_bytes.len().min(32);
844 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
845 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
846 let dht_instance = DHT::new(node_id).map_err(|e| {
847 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
848 e.to_string().into(),
849 ))
850 })?;
851 dht_instance.start_maintenance_tasks();
852
853 let security_metrics = dht_instance.security_metrics();
855 let dashboard = crate::dht::metrics::SecurityDashboard::new(
856 security_metrics,
857 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
858 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
859 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
860 );
861
862 (
863 Some(Arc::new(RwLock::new(dht_instance))),
864 Some(Arc::new(dashboard)),
865 )
866 } else {
867 (None, None)
868 };
869
870 let resource_manager = config
874 .production_config
875 .clone()
876 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
877
878 let diversity_config = config.diversity_config.clone().unwrap_or_default();
880 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
881 match BootstrapManager::with_full_config(
882 cache_config.clone(),
883 crate::rate_limit::JoinRateLimiterConfig::default(),
884 diversity_config.clone(),
885 )
886 .await
887 {
888 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
889 Err(e) => {
890 warn!(
891 "Failed to initialize bootstrap manager: {}, continuing without cache",
892 e
893 );
894 None
895 }
896 }
897 } else {
898 match BootstrapManager::with_full_config(
899 crate::bootstrap::CacheConfig::default(),
900 crate::rate_limit::JoinRateLimiterConfig::default(),
901 diversity_config,
902 )
903 .await
904 {
905 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
906 Err(e) => {
907 warn!(
908 "Failed to initialize bootstrap manager: {}, continuing without cache",
909 e
910 );
911 None
912 }
913 }
914 };
915
916 let (v6_opt, v4_opt) = {
919 let port = config.listen_addr.port();
920 let ip = config.listen_addr.ip();
921
922 let v4_addr = if ip.is_ipv4() {
923 Some(std::net::SocketAddr::new(ip, port))
924 } else {
925 Some(std::net::SocketAddr::new(
928 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
929 port,
930 ))
931 };
932
933 let v6_addr = if config.enable_ipv6 {
934 if ip.is_ipv6() {
935 Some(std::net::SocketAddr::new(ip, port))
936 } else {
937 Some(std::net::SocketAddr::new(
938 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
939 port,
940 ))
941 }
942 } else {
943 None
944 };
945 (v6_addr, v4_addr)
946 };
947
948 let dual_node = Arc::new(
949 DualStackNetworkNode::new_with_max_connections(v6_opt, v4_opt, config.max_connections)
950 .await
951 .map_err(|e| {
952 P2PError::Transport(crate::error::TransportError::SetupFailed(
953 format!("Failed to create dual-stack network nodes: {}", e).into(),
954 ))
955 })?,
956 );
957
958 let rate_limiter = Arc::new(RateLimiter::new(
960 crate::validation::RateLimitConfig::default(),
961 ));
962
963 let active_connections = Arc::new(RwLock::new(HashSet::new()));
965
966 let geo_provider = Arc::new(BgpGeoProvider::new());
968
969 let peers = Arc::new(RwLock::new(HashMap::new()));
971
972 let connection_event_rx = dual_node.subscribe_connection_events();
976
977 let connection_monitor_handle = {
978 let active_conns = Arc::clone(&active_connections);
979 let peers_map = Arc::clone(&peers);
980 let event_tx_clone = event_tx.clone();
981 let dual_node_clone = Arc::clone(&dual_node);
982 let geo_provider_clone = Arc::clone(&geo_provider);
983 let peer_id_clone = peer_id.clone();
984
985 let handle = tokio::spawn(async move {
986 Self::connection_lifecycle_monitor_with_rx(
987 dual_node_clone,
988 connection_event_rx,
989 active_conns,
990 peers_map,
991 event_tx_clone,
992 geo_provider_clone,
993 peer_id_clone,
994 )
995 .await;
996 });
997
998 Arc::new(RwLock::new(Some(handle)))
999 };
1000
1001 let shutdown = Arc::new(AtomicBool::new(false));
1003 let keepalive_handle = {
1004 let active_conns = Arc::clone(&active_connections);
1005 let dual_node_clone = Arc::clone(&dual_node);
1006 let shutdown_clone = Arc::clone(&shutdown);
1007
1008 let handle = tokio::spawn(async move {
1009 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
1010 });
1011
1012 Arc::new(RwLock::new(Some(handle)))
1013 };
1014
1015 let periodic_tasks_handle = {
1017 let peers_clone = Arc::clone(&peers);
1018 let active_conns_clone = Arc::clone(&active_connections);
1019 let event_tx_clone = event_tx.clone();
1020 let stale_threshold = config.stale_peer_threshold;
1021 let shutdown_clone = Arc::clone(&shutdown);
1022
1023 let handle = tokio::spawn(async move {
1024 Self::periodic_maintenance_task(
1025 peers_clone,
1026 active_conns_clone,
1027 event_tx_clone,
1028 stale_threshold,
1029 shutdown_clone,
1030 )
1031 .await;
1032 });
1033
1034 Arc::new(RwLock::new(Some(handle)))
1035 };
1036
1037 let binary_hash = Self::compute_binary_hash();
1040
1041 let node = Self {
1042 config,
1043 peer_id,
1044 peers,
1045 event_tx,
1046 listen_addrs: RwLock::new(Vec::new()),
1047 start_time: Instant::now(),
1048 running: RwLock::new(false),
1049 dht,
1050 resource_manager,
1051 bootstrap_manager,
1052 dual_node,
1053 rate_limiter,
1054 active_connections,
1055 security_dashboard,
1056 connection_monitor_handle,
1057 keepalive_handle,
1058 periodic_tasks_handle,
1059 shutdown,
1060 recv_handles: Arc::new(RwLock::new(Vec::new())),
1061 listener_handle: Arc::new(RwLock::new(None)),
1062 geo_provider,
1063 entangled_id: None,
1065 binary_hash,
1066 };
1067 info!(
1068 "Created P2P node with peer ID: {} (call start() to begin networking)",
1069 node.peer_id
1070 );
1071
1072 Ok(node)
1073 }
1074
1075 pub fn builder() -> NodeBuilder {
1077 NodeBuilder::new()
1078 }
1079
1080 pub fn peer_id(&self) -> &PeerId {
1082 &self.peer_id
1083 }
1084
1085 pub fn transport_peer_id(&self) -> Option<String> {
1092 if let Some(ref v4) = self.dual_node.v4 {
1093 return Some(ant_peer_id_to_string(&v4.our_peer_id()));
1094 }
1095 if let Some(ref v6) = self.dual_node.v6 {
1096 return Some(ant_peer_id_to_string(&v6.our_peer_id()));
1097 }
1098 None
1099 }
1100
1101 pub fn local_addr(&self) -> Option<String> {
1102 self.listen_addrs
1103 .try_read()
1104 .ok()
1105 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
1106 }
1107
1108 pub async fn subscribe(&self, topic: &str) -> Result<()> {
1109 info!("Subscribed to topic: {}", topic);
1112 Ok(())
1113 }
1114
1115 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1116 info!(
1117 "Publishing message to topic: {} ({} bytes)",
1118 topic,
1119 data.len()
1120 );
1121
1122 let peer_list: Vec<PeerId> = {
1124 let peers_guard = self.peers.read().await;
1125 peers_guard.keys().cloned().collect()
1126 };
1127
1128 if peer_list.is_empty() {
1129 debug!("No peers connected, message will only be sent to local subscribers");
1130 } else {
1131 let mut send_count = 0;
1133 for peer_id in &peer_list {
1134 match self.send_message(peer_id, topic, data.to_vec()).await {
1135 Ok(_) => {
1136 send_count += 1;
1137 debug!("Sent message to peer: {}", peer_id);
1138 }
1139 Err(e) => {
1140 warn!("Failed to send message to peer {}: {}", peer_id, e);
1141 }
1142 }
1143 }
1144 info!(
1145 "Published message to {}/{} connected peers",
1146 send_count,
1147 peer_list.len()
1148 );
1149 }
1150
1151 let event = P2PEvent::Message {
1153 topic: topic.to_string(),
1154 source: self.peer_id.clone(),
1155 data: data.to_vec(),
1156 };
1157 let _ = self.event_tx.send(event);
1158
1159 Ok(())
1160 }
1161
1162 pub fn config(&self) -> &NodeConfig {
1164 &self.config
1165 }
1166
1167 pub async fn start(&self) -> Result<()> {
1169 info!("Starting P2P node...");
1170
1171 if let Some(ref resource_manager) = self.resource_manager {
1173 resource_manager.start().await.map_err(|e| {
1174 P2PError::Network(crate::error::NetworkError::ProtocolError(
1175 format!("Failed to start resource manager: {e}").into(),
1176 ))
1177 })?;
1178 info!("Production resource manager started");
1179 }
1180
1181 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1183 let mut manager = bootstrap_manager.write().await;
1184 manager.start_background_tasks().await.map_err(|e| {
1185 P2PError::Network(crate::error::NetworkError::ProtocolError(
1186 format!("Failed to start bootstrap manager: {e}").into(),
1187 ))
1188 })?;
1189 info!("Bootstrap cache manager started");
1190 }
1191
1192 *self.running.write().await = true;
1194
1195 self.start_network_listeners().await?;
1197
1198 self.start_connection_monitor().await;
1200
1201 let listen_addrs = self.listen_addrs.read().await;
1203 info!("P2P node started on addresses: {:?}", *listen_addrs);
1204
1205 self.start_message_receiving_system().await?;
1207
1208 self.connect_bootstrap_peers().await?;
1210
1211 Ok(())
1212 }
1213
1214 async fn start_network_listeners(&self) -> Result<()> {
1216 info!("Starting dual-stack listeners (ant-quic)...");
1217 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1219 P2PError::Transport(crate::error::TransportError::SetupFailed(
1220 format!("Failed to get local addresses: {}", e).into(),
1221 ))
1222 })?;
1223 {
1224 let mut la = self.listen_addrs.write().await;
1225 *la = addrs.clone();
1226 }
1227
1228 let event_tx = self.event_tx.clone();
1230 let peers = self.peers.clone();
1231 let active_connections = self.active_connections.clone();
1232 let rate_limiter = self.rate_limiter.clone();
1233 let dual = self.dual_node.clone();
1234 let shutdown = Arc::clone(&self.shutdown);
1235 let handle = tokio::spawn(async move {
1236 loop {
1237 if shutdown.load(Ordering::Relaxed) {
1238 break;
1239 }
1240 match dual.accept_any().await {
1241 Ok((ant_peer_id, remote_sock)) => {
1242 if let Err(e) = rate_limiter.check_ip(&remote_sock.ip()) {
1244 warn!(
1245 "Rate-limited incoming connection from {}: {}",
1246 remote_sock, e
1247 );
1248 continue;
1249 }
1250
1251 debug!(
1252 "Accepted connection from {} (protocol validation pending ant-quic implementation)",
1253 remote_sock
1254 );
1255
1256 let peer_id = ant_peer_id_to_string(&ant_peer_id);
1257 let remote_addr = NetworkAddress::from(remote_sock);
1258 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1259 register_new_peer(&peers, &peer_id, &remote_addr).await;
1260 active_connections.write().await.insert(peer_id);
1261 }
1262 Err(e) => {
1263 warn!("Accept failed: {}", e);
1264 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1265 }
1266 }
1267 }
1268 });
1269 *self.listener_handle.write().await = Some(handle);
1270
1271 info!("Dual-stack listeners active on: {:?}", addrs);
1272 Ok(())
1273 }
1274
1275 async fn start_message_receiving_system(&self) -> Result<()> {
1280 info!("Starting message receiving system");
1281
1282 let (tx, mut rx) = tokio::sync::mpsc::channel(RECV_CHANNEL_CAPACITY);
1283 let shutdown = Arc::clone(&self.shutdown);
1284
1285 let mut handles = Vec::new();
1286
1287 if let Some(v6) = self.dual_node.v6.as_ref() {
1289 handles.push(v6.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
1290 }
1291 if let Some(v4) = self.dual_node.v4.as_ref() {
1292 handles.push(v4.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
1293 }
1294 drop(tx); let event_tx = self.event_tx.clone();
1297 handles.push(tokio::spawn(async move {
1298 info!("Message receive loop started");
1299 while let Some((peer_id, bytes)) = rx.recv().await {
1300 let transport_peer_id = ant_peer_id_to_string(&peer_id);
1301 info!(
1302 "Received {} bytes from peer {}",
1303 bytes.len(),
1304 transport_peer_id
1305 );
1306
1307 if bytes == KEEPALIVE_PAYLOAD {
1308 trace!("Received keepalive from {}", transport_peer_id);
1309 continue;
1310 }
1311
1312 match parse_protocol_message(&bytes, &transport_peer_id) {
1313 Some(event) => {
1314 let _ = event_tx.send(event);
1315 }
1316 None => {
1317 warn!("Failed to parse protocol message ({} bytes)", bytes.len());
1318 }
1319 }
1320 }
1321 info!("Message receive loop ended — channel closed");
1322 }));
1323
1324 *self.recv_handles.write().await = handles;
1325
1326 Ok(())
1327 }
1328
1329 pub async fn run(&self) -> Result<()> {
1335 if !*self.running.read().await {
1336 self.start().await?;
1337 }
1338
1339 info!("P2P node running...");
1340
1341 loop {
1343 if !*self.running.read().await {
1344 break;
1345 }
1346
1347 self.periodic_tasks().await?;
1349
1350 tokio::time::sleep(Duration::from_millis(100)).await;
1352 }
1353
1354 info!("P2P node stopped");
1355 Ok(())
1356 }
1357
1358 pub async fn stop(&self) -> Result<()> {
1360 info!("Stopping P2P node...");
1361
1362 self.shutdown.store(true, Ordering::Relaxed);
1364
1365 *self.running.write().await = false;
1367
1368 self.dual_node.shutdown_endpoints().await;
1375
1376 let handles: Vec<_> = self.recv_handles.write().await.drain(..).collect();
1378 for handle in handles {
1379 let _ = handle.await;
1380 }
1381
1382 if let Some(handle) = self.listener_handle.write().await.take() {
1384 let _ = handle.await;
1385 }
1386
1387 self.disconnect_all_peers().await?;
1389
1390 if let Some(ref resource_manager) = self.resource_manager {
1392 resource_manager.shutdown().await.map_err(|e| {
1393 P2PError::Network(crate::error::NetworkError::ProtocolError(
1394 format!("Failed to shutdown resource manager: {e}").into(),
1395 ))
1396 })?;
1397 info!("Production resource manager stopped");
1398 }
1399
1400 info!("P2P node stopped");
1401 Ok(())
1402 }
1403
1404 pub async fn shutdown(&self) -> Result<()> {
1406 self.stop().await
1407 }
1408
1409 pub async fn is_running(&self) -> bool {
1411 *self.running.read().await
1412 }
1413
1414 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1416 self.listen_addrs.read().await.clone()
1417 }
1418
1419 pub async fn connected_peers(&self) -> Vec<PeerId> {
1421 self.active_connections
1424 .read()
1425 .await
1426 .iter()
1427 .cloned()
1428 .collect()
1429 }
1430
1431 pub async fn peer_count(&self) -> usize {
1433 self.active_connections.read().await.len()
1434 }
1435
1436 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1438 self.peers.read().await.get(peer_id).cloned()
1439 }
1440
1441 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1453 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1455
1456 let peers = self.peers.read().await;
1457
1458 for (peer_id, peer_info) in peers.iter() {
1460 for peer_addr in &peer_info.addresses {
1462 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1463 && peer_socket == socket_addr
1464 {
1465 return Some(peer_id.clone());
1466 }
1467 }
1468 }
1469
1470 None
1471 }
1472
1473 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1479 let active = self.active_connections.read().await;
1480 let peers = self.peers.read().await;
1481
1482 active
1483 .iter()
1484 .map(|peer_id| {
1485 let addresses = peers
1486 .get(peer_id)
1487 .map(|info| info.addresses.clone())
1488 .unwrap_or_default();
1489 (peer_id.clone(), addresses)
1490 })
1491 .collect()
1492 }
1493
1494 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1506 self.active_connections.write().await.remove(peer_id);
1508 self.peers.write().await.remove(peer_id).is_some()
1510 }
1511
1512 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1525 self.peers.read().await.contains_key(peer_id)
1526 }
1527
1528 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1530 info!("Connecting to peer at: {}", address);
1531
1532 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1534 Some(resource_manager.acquire_connection().await?)
1535 } else {
1536 None
1537 };
1538
1539 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1541 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1542 format!("{}: {}", address, e).into(),
1543 ))
1544 })?;
1545
1546 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1549 if normalized_addr != socket_addr {
1550 info!(
1551 "Normalized wildcard address {} to loopback {}",
1552 socket_addr, normalized_addr
1553 );
1554 }
1555
1556 let addr_list = vec![normalized_addr];
1558 let peer_id = match tokio::time::timeout(
1559 self.config.connection_timeout,
1560 self.dual_node.connect_happy_eyeballs(&addr_list),
1561 )
1562 .await
1563 {
1564 Ok(Ok(peer)) => {
1565 let connected_peer_id = ant_peer_id_to_string(&peer);
1566 info!("Successfully connected to peer: {}", connected_peer_id);
1567
1568 if connected_peer_id == self.peer_id {
1570 warn!(
1571 "Detected self-connection to own address {} (peer_id: {}), rejecting",
1572 address, connected_peer_id
1573 );
1574 self.dual_node.disconnect_peer(&peer).await;
1577 return Err(P2PError::Network(
1578 crate::error::NetworkError::InvalidAddress(
1579 format!("Cannot connect to self ({})", address).into(),
1580 ),
1581 ));
1582 }
1583
1584 connected_peer_id
1585 }
1586 Ok(Err(e)) => {
1587 warn!("Failed to connect to peer at {}: {}", address, e);
1588 return Err(P2PError::Transport(
1589 crate::error::TransportError::ConnectionFailed {
1590 addr: normalized_addr,
1591 reason: e.to_string().into(),
1592 },
1593 ));
1594 }
1595 Err(_) => {
1596 warn!(
1597 "Timed out connecting to peer at {} after {:?}",
1598 address, self.config.connection_timeout
1599 );
1600 return Err(P2PError::Timeout(self.config.connection_timeout));
1601 }
1602 };
1603
1604 let peer_info = PeerInfo {
1606 peer_id: peer_id.clone(),
1607 addresses: vec![address.to_string()],
1608 connected_at: Instant::now(),
1609 last_seen: Instant::now(),
1610 status: ConnectionStatus::Connected,
1611 protocols: vec!["p2p-foundation/1.0".to_string()],
1612 heartbeat_count: 0,
1613 };
1614
1615 self.peers.write().await.insert(peer_id.clone(), peer_info);
1617
1618 self.active_connections
1621 .write()
1622 .await
1623 .insert(peer_id.clone());
1624
1625 if let Some(ref resource_manager) = self.resource_manager {
1627 resource_manager.record_bandwidth(0, 0); }
1629
1630 let _ = self.event_tx.send(P2PEvent::PeerConnected(peer_id.clone()));
1632
1633 info!("Connected to peer: {}", peer_id);
1634 Ok(peer_id)
1635 }
1636
1637 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1643 info!("Disconnecting from peer: {}", peer_id);
1644
1645 self.dual_node.disconnect_peer_string(peer_id).await.ok();
1648
1649 self.active_connections.write().await.remove(peer_id);
1651
1652 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1653 peer_info.status = ConnectionStatus::Disconnected;
1654
1655 let _ = self
1657 .event_tx
1658 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1659
1660 info!("Disconnected from peer: {}", peer_id);
1661 }
1662
1663 Ok(())
1664 }
1665
1666 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1668 self.active_connections.read().await.contains(peer_id)
1669 }
1670
1671 pub async fn send_message(
1673 &self,
1674 peer_id: &PeerId,
1675 protocol: &str,
1676 data: Vec<u8>,
1677 ) -> Result<()> {
1678 debug!(
1679 "Sending message to peer {} on protocol {}",
1680 peer_id, protocol
1681 );
1682
1683 if let Some(ref resource_manager) = self.resource_manager
1685 && !resource_manager
1686 .check_rate_limit(peer_id, "message")
1687 .await?
1688 {
1689 return Err(P2PError::ResourceExhausted(
1690 format!("Rate limit exceeded for peer {}", peer_id).into(),
1691 ));
1692 }
1693
1694 if !self.peers.read().await.contains_key(peer_id) {
1696 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1697 peer_id.to_string().into(),
1698 )));
1699 }
1700
1701 if !self.is_connection_active(peer_id).await {
1704 debug!(
1705 "Connection to peer {} exists in peers map but ant-quic connection is closed",
1706 peer_id
1707 );
1708
1709 self.remove_peer(peer_id).await;
1711
1712 return Err(P2PError::Network(
1713 crate::error::NetworkError::ConnectionClosed {
1714 peer_id: peer_id.to_string().into(),
1715 },
1716 ));
1717 }
1718
1719 if let Some(ref resource_manager) = self.resource_manager {
1723 resource_manager.record_bandwidth(data.len() as u64, 0);
1724 }
1725
1726 let raw_data_len = data.len();
1728 let _message_data = self.create_protocol_message(protocol, data)?;
1729 info!(
1730 "Sending {} bytes to peer {} on protocol {} (raw data: {} bytes)",
1731 _message_data.len(),
1732 peer_id,
1733 protocol,
1734 raw_data_len
1735 );
1736
1737 let send_fut = self
1741 .dual_node
1742 .send_to_peer_string_optimized(peer_id, &_message_data);
1743 let result = tokio::time::timeout(self.config.connection_timeout, send_fut)
1744 .await
1745 .map_err(|_| {
1746 P2PError::Transport(crate::error::TransportError::StreamError(
1747 "Timed out sending message".into(),
1748 ))
1749 })?
1750 .map_err(|e| {
1751 P2PError::Transport(crate::error::TransportError::StreamError(
1752 e.to_string().into(),
1753 ))
1754 });
1755
1756 if result.is_ok() {
1757 info!(
1758 "Successfully sent {} bytes to peer {}",
1759 _message_data.len(),
1760 peer_id
1761 );
1762 } else {
1763 warn!("Failed to send message to peer {}", peer_id);
1764 }
1765
1766 result
1767 }
1768
1769 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1771 let timestamp = std::time::SystemTime::now()
1772 .duration_since(std::time::UNIX_EPOCH)
1773 .map_err(|e| {
1774 P2PError::Network(NetworkError::ProtocolError(
1775 format!("System time error: {}", e).into(),
1776 ))
1777 })?
1778 .as_secs();
1779
1780 let message = WireMessage {
1781 protocol: protocol.to_string(),
1782 data,
1783 from: self.peer_id.clone(),
1784 timestamp,
1785 };
1786
1787 postcard::to_stdvec(&message).map_err(|e| {
1788 P2PError::Transport(crate::error::TransportError::StreamError(
1789 format!("Failed to serialize message: {e}").into(),
1790 ))
1791 })
1792 }
1793
1794 }
1796
1797fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<P2PEvent> {
1807 let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1808
1809 debug!(
1810 "Parsed P2PEvent::Message - topic: {}, source: {} (logical: {}), payload_len: {}",
1811 message.protocol,
1812 source,
1813 message.from,
1814 message.data.len()
1815 );
1816
1817 Some(P2PEvent::Message {
1818 topic: message.protocol,
1819 source: source.to_string(),
1820 data: message.data,
1821 })
1822}
1823
1824impl P2PNode {
1825 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1827 self.event_tx.subscribe()
1828 }
1829
1830 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1832 self.subscribe_events()
1833 }
1834
1835 pub fn uptime(&self) -> Duration {
1837 self.start_time.elapsed()
1838 }
1839
1840 fn compute_binary_hash() -> [u8; 32] {
1849 if let Some(hash) = std::env::current_exe()
1851 .ok()
1852 .and_then(|exe_path| std::fs::read(&exe_path).ok())
1853 .map(|binary_data| blake3::hash(&binary_data))
1854 {
1855 return *hash.as_bytes();
1856 }
1857 let placeholder = format!(
1860 "saorsa-core-v{}-{}",
1861 env!("CARGO_PKG_VERSION"),
1862 std::env::consts::ARCH
1863 );
1864 let hash = blake3::hash(placeholder.as_bytes());
1865 *hash.as_bytes()
1866 }
1867
1868 #[must_use]
1870 pub fn binary_hash(&self) -> &[u8; 32] {
1871 &self.binary_hash
1872 }
1873
1874 #[must_use]
1876 pub fn entangled_id(&self) -> Option<&crate::attestation::EntangledId> {
1877 self.entangled_id.as_ref()
1878 }
1879
1880 pub fn set_entangled_id(&mut self, entangled_id: crate::attestation::EntangledId) {
1885 self.entangled_id = Some(entangled_id);
1886 }
1887
1888 pub fn verify_peer_attestation(
1917 &self,
1918 peer_id: &str,
1919 peer_entangled_id: &crate::attestation::EntangledId,
1920 peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
1921 ) -> crate::attestation::EnforcementDecision {
1922 use crate::attestation::{
1923 AttestationRejection, AttestationRejectionReason, EnforcementDecision, EnforcementMode,
1924 };
1925
1926 let config = &self.config.attestation_config;
1927
1928 if !config.enabled {
1930 return EnforcementDecision::Skipped;
1931 }
1932
1933 let id_valid = peer_entangled_id.verify(peer_public_key);
1935
1936 let binary_hash = *peer_entangled_id.binary_hash();
1938 let binary_allowed = config.is_binary_allowed(&binary_hash);
1939
1940 match config.enforcement_mode {
1941 EnforcementMode::Off => EnforcementDecision::Skipped,
1942
1943 EnforcementMode::Soft => {
1944 if !id_valid {
1946 warn!(
1947 peer = %peer_id,
1948 binary_hash = %hex::encode(&binary_hash[..8]),
1949 "Peer attestation verification failed: Invalid entangled ID (soft mode - allowing)"
1950 );
1951 return EnforcementDecision::AllowWithWarning {
1952 reason: AttestationRejectionReason::IdentityMismatch,
1953 };
1954 }
1955 if !binary_allowed {
1956 warn!(
1957 peer = %peer_id,
1958 binary_hash = %hex::encode(binary_hash),
1959 "Peer attestation verification failed: Binary not in allowlist (soft mode - allowing)"
1960 );
1961 return EnforcementDecision::AllowWithWarning {
1962 reason: AttestationRejectionReason::BinaryNotAllowed { hash: binary_hash },
1963 };
1964 }
1965 EnforcementDecision::Allow
1966 }
1967
1968 EnforcementMode::Hard => {
1969 if !id_valid {
1971 error!(
1972 peer = %peer_id,
1973 binary_hash = %hex::encode(&binary_hash[..8]),
1974 "REJECTING peer: Invalid entangled ID derivation"
1975 );
1976 return EnforcementDecision::Reject {
1977 rejection: AttestationRejection::identity_mismatch(),
1978 };
1979 }
1980 if !binary_allowed {
1981 error!(
1982 peer = %peer_id,
1983 binary_hash = %hex::encode(binary_hash),
1984 "REJECTING peer: Binary not in allowlist"
1985 );
1986 return EnforcementDecision::Reject {
1987 rejection: AttestationRejection::binary_not_allowed(binary_hash),
1988 };
1989 }
1990
1991 info!(
1992 peer = %peer_id,
1993 entangled_id = %hex::encode(&peer_entangled_id.id()[..8]),
1994 "Peer attestation verified successfully (hard mode)"
1995 );
1996 EnforcementDecision::Allow
1997 }
1998 }
1999 }
2000
2001 #[must_use]
2009 pub fn verify_peer_attestation_simple(
2010 &self,
2011 peer_id: &str,
2012 peer_entangled_id: &crate::attestation::EntangledId,
2013 peer_public_key: &crate::quantum_crypto::ant_quic_integration::MlDsaPublicKey,
2014 ) -> bool {
2015 self.verify_peer_attestation(peer_id, peer_entangled_id, peer_public_key)
2016 .should_allow()
2017 }
2018
2019 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2029 if let Some(ref resource_manager) = self.resource_manager {
2030 Ok(resource_manager.get_metrics().await)
2031 } else {
2032 Err(P2PError::Network(
2033 crate::error::NetworkError::ProtocolError(
2034 "Production resource manager not enabled".to_string().into(),
2035 ),
2036 ))
2037 }
2038 }
2039
2040 #[allow(clippy::too_many_arguments)]
2046 async fn connection_lifecycle_monitor_with_rx(
2047 dual_node: Arc<DualStackNetworkNode>,
2048 mut event_rx: broadcast::Receiver<crate::transport::ant_quic_adapter::ConnectionEvent>,
2049 active_connections: Arc<RwLock<HashSet<String>>>,
2050 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2051 event_tx: broadcast::Sender<P2PEvent>,
2052 geo_provider: Arc<BgpGeoProvider>,
2053 _local_peer_id: String,
2054 ) {
2055 use crate::transport::ant_quic_adapter::ConnectionEvent;
2056
2057 info!("Connection lifecycle monitor started (pre-subscribed receiver)");
2058
2059 loop {
2060 match event_rx.recv().await {
2061 Ok(event) => {
2062 match event {
2063 ConnectionEvent::Established {
2064 peer_id,
2065 remote_address,
2066 } => {
2067 let peer_id_str = ant_peer_id_to_string(&peer_id);
2068 debug!(
2069 "Connection established: peer={}, addr={}",
2070 peer_id_str, remote_address
2071 );
2072
2073 let ip = remote_address.ip();
2075 let is_rejected = match ip {
2076 std::net::IpAddr::V4(v4) => {
2077 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2078 geo_provider.is_hosting_asn(asn)
2079 || geo_provider.is_vpn_asn(asn)
2080 } else {
2081 false
2082 }
2083 }
2084 std::net::IpAddr::V6(v6) => {
2085 let info = geo_provider.lookup(v6);
2086 info.is_hosting_provider || info.is_vpn_provider
2087 }
2088 };
2089
2090 if is_rejected {
2091 info!(
2092 "Rejecting connection from {} ({}) due to GeoIP policy",
2093 peer_id_str, remote_address
2094 );
2095 dual_node.disconnect_peer(&peer_id).await;
2098 continue;
2099 }
2100
2101 active_connections.write().await.insert(peer_id_str.clone());
2103
2104 let mut peers_lock = peers.write().await;
2106 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2107 peer_info.status = ConnectionStatus::Connected;
2108 peer_info.connected_at = Instant::now();
2109 } else {
2110 debug!("Registering new incoming peer: {}", peer_id_str);
2111 peers_lock.insert(
2112 peer_id_str.clone(),
2113 PeerInfo {
2114 peer_id: peer_id_str.clone(),
2115 addresses: vec![remote_address.to_string()],
2116 status: ConnectionStatus::Connected,
2117 last_seen: Instant::now(),
2118 connected_at: Instant::now(),
2119 protocols: Vec::new(),
2120 heartbeat_count: 0,
2121 },
2122 );
2123 }
2124
2125 let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str));
2127 }
2128 ConnectionEvent::Lost { peer_id, reason } => {
2129 let peer_id_str = ant_peer_id_to_string(&peer_id);
2130 debug!("Connection lost: peer={}, reason={}", peer_id_str, reason);
2131
2132 active_connections.write().await.remove(&peer_id_str);
2134
2135 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2137 peer_info.status = ConnectionStatus::Disconnected;
2138 peer_info.last_seen = Instant::now();
2139 }
2140
2141 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2143 }
2144 ConnectionEvent::Failed { peer_id, reason } => {
2145 let peer_id_str = ant_peer_id_to_string(&peer_id);
2146 debug!("Connection failed: peer={}, reason={}", peer_id_str, reason);
2147
2148 active_connections.write().await.remove(&peer_id_str);
2150
2151 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2153 peer_info.status = ConnectionStatus::Disconnected;
2154 peer_info.last_seen = Instant::now();
2155 }
2156
2157 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str));
2159 }
2160 }
2161 }
2162 Err(broadcast::error::RecvError::Lagged(skipped)) => {
2163 warn!(
2164 "Connection event receiver lagged, skipped {} events",
2165 skipped
2166 );
2167 }
2168 Err(broadcast::error::RecvError::Closed) => {
2169 info!("Connection event channel closed, stopping lifecycle monitor");
2170 break;
2171 }
2172 }
2173 }
2174 }
2175
2176 async fn start_connection_monitor(&self) {
2178 debug!("Connection monitor already running from initialization");
2182 }
2183
2184 async fn keepalive_task(
2190 active_connections: Arc<RwLock<HashSet<String>>>,
2191 dual_node: Arc<DualStackNetworkNode>,
2192 shutdown: Arc<AtomicBool>,
2193 ) {
2194 const KEEPALIVE_INTERVAL_SECS: u64 = 15; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2197 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2198
2199 info!(
2200 "Keepalive task started (interval: {}s)",
2201 KEEPALIVE_INTERVAL_SECS
2202 );
2203
2204 loop {
2205 if shutdown.load(Ordering::Relaxed) {
2207 info!("Keepalive task shutting down");
2208 break;
2209 }
2210
2211 interval.tick().await;
2212
2213 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2215
2216 if peers.is_empty() {
2217 trace!("Keepalive: no active connections");
2218 continue;
2219 }
2220
2221 debug!("Sending keepalive to {} active connections", peers.len());
2222
2223 let futs: Vec<_> = peers
2226 .into_iter()
2227 .map(|peer_id| {
2228 let node = Arc::clone(&dual_node);
2229 async move {
2230 if let Err(e) = node
2231 .send_to_peer_string_optimized(&peer_id, KEEPALIVE_PAYLOAD)
2232 .await
2233 {
2234 debug!(
2235 "Failed to send keepalive to peer {}: {} (connection may have closed)",
2236 peer_id, e
2237 );
2238 } else {
2239 trace!("Keepalive sent to peer: {}", peer_id);
2240 }
2241 }
2242 })
2243 .collect();
2244 futures::future::join_all(futs).await;
2245 }
2246
2247 info!("Keepalive task stopped");
2248 }
2249
2250 async fn periodic_maintenance_task(
2257 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2258 active_connections: Arc<RwLock<HashSet<PeerId>>>,
2259 event_tx: broadcast::Sender<P2PEvent>,
2260 stale_threshold: Duration,
2261 shutdown: Arc<AtomicBool>,
2262 ) {
2263 let cleanup_threshold = stale_threshold * 2;
2264 let mut interval = interval(Duration::from_millis(100));
2265 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2266
2267 info!(
2268 "Periodic maintenance task started (stale threshold: {:?})",
2269 stale_threshold
2270 );
2271
2272 loop {
2273 interval.tick().await;
2274
2275 if shutdown.load(Ordering::SeqCst) {
2276 break;
2277 }
2278
2279 let now = Instant::now();
2280 let mut peers_to_remove: Vec<PeerId> = Vec::new();
2281 let mut peers_to_mark_disconnected: Vec<PeerId> = Vec::new();
2282
2283 {
2285 let peers_lock = peers.read().await;
2286 for (peer_id, peer_info) in peers_lock.iter() {
2287 let elapsed = now.duration_since(peer_info.last_seen);
2288
2289 match &peer_info.status {
2290 ConnectionStatus::Connected => {
2291 if elapsed > stale_threshold {
2292 debug!(
2293 peer_id = %peer_id,
2294 elapsed_secs = elapsed.as_secs(),
2295 "Peer went stale - marking for disconnection"
2296 );
2297 peers_to_mark_disconnected.push(peer_id.clone());
2298 }
2299 }
2300 ConnectionStatus::Disconnected | ConnectionStatus::Failed(_) => {
2301 if elapsed > cleanup_threshold {
2302 trace!(
2303 peer_id = %peer_id,
2304 elapsed_secs = elapsed.as_secs(),
2305 "Removing disconnected peer from tracking"
2306 );
2307 peers_to_remove.push(peer_id.clone());
2308 }
2309 }
2310 ConnectionStatus::Connecting | ConnectionStatus::Disconnecting => {
2311 if elapsed > stale_threshold {
2312 debug!(
2313 peer_id = %peer_id,
2314 status = ?peer_info.status,
2315 "Connection timed out in transitional state"
2316 );
2317 peers_to_mark_disconnected.push(peer_id.clone());
2318 }
2319 }
2320 }
2321 }
2322 }
2323
2324 if !peers_to_mark_disconnected.is_empty() {
2329 let mut peers_lock = peers.write().await;
2330 for peer_id in &peers_to_mark_disconnected {
2331 if let Some(peer_info) = peers_lock.get_mut(peer_id) {
2332 peer_info.status = ConnectionStatus::Disconnected;
2333 }
2334 }
2335 }
2336
2337 for peer_id in &peers_to_mark_disconnected {
2339 active_connections.write().await.remove(peer_id);
2340 let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id.clone()));
2341 info!(peer_id = %peer_id, "Stale peer disconnected");
2342 }
2343
2344 if !peers_to_remove.is_empty() {
2346 let mut peers_lock = peers.write().await;
2347 for peer_id in &peers_to_remove {
2348 peers_lock.remove(peer_id);
2349 trace!(peer_id = %peer_id, "Peer removed from tracking");
2350 }
2351 }
2352 }
2353
2354 info!("Periodic maintenance task stopped");
2355 }
2356
2357 pub async fn health_check(&self) -> Result<()> {
2359 if let Some(ref resource_manager) = self.resource_manager {
2360 resource_manager.health_check().await
2361 } else {
2362 let peer_count = self.peer_count().await;
2364 if peer_count > self.config.max_connections {
2365 Err(P2PError::Network(
2366 crate::error::NetworkError::ProtocolError(
2367 format!("Too many connections: {peer_count}").into(),
2368 ),
2369 ))
2370 } else {
2371 Ok(())
2372 }
2373 }
2374 }
2375
2376 pub fn production_config(&self) -> Option<&ProductionConfig> {
2378 self.config.production_config.as_ref()
2379 }
2380
2381 pub fn is_production_mode(&self) -> bool {
2383 self.resource_manager.is_some()
2384 }
2385
2386 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2388 self.dht.as_ref()
2389 }
2390
2391 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2393 if let Some(ref dht) = self.dht {
2394 let mut dht_instance = dht.write().await;
2395 let dht_key = crate::dht::DhtKey::from_bytes(key);
2396 dht_instance
2397 .store(&dht_key, value.clone())
2398 .await
2399 .map_err(|e| {
2400 P2PError::Dht(crate::error::DhtError::StoreFailed(
2401 format!("{:?}: {e}", key).into(),
2402 ))
2403 })?;
2404
2405 Ok(())
2406 } else {
2407 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2408 "DHT not enabled".to_string().into(),
2409 )))
2410 }
2411 }
2412
2413 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2415 if let Some(ref dht) = self.dht {
2416 let dht_instance = dht.read().await;
2417 let dht_key = crate::dht::DhtKey::from_bytes(key);
2418 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2419 P2PError::Dht(crate::error::DhtError::StoreFailed(
2420 format!("Retrieve failed: {e}").into(),
2421 ))
2422 })?;
2423
2424 Ok(record_result)
2425 } else {
2426 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2427 "DHT not enabled".to_string().into(),
2428 )))
2429 }
2430 }
2431
2432 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2434 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2435 let manager = bootstrap_manager.write().await;
2436 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2437 .iter()
2438 .filter_map(|addr| addr.parse().ok())
2439 .collect();
2440 let contact = ContactEntry::new(peer_id, socket_addresses);
2441 manager.add_contact(contact).await.map_err(|e| {
2442 P2PError::Network(crate::error::NetworkError::ProtocolError(
2443 format!("Failed to add peer to bootstrap cache: {e}").into(),
2444 ))
2445 })?;
2446 }
2447 Ok(())
2448 }
2449
2450 pub async fn update_peer_metrics(
2452 &self,
2453 peer_id: &PeerId,
2454 success: bool,
2455 latency_ms: Option<u64>,
2456 _error: Option<String>,
2457 ) -> Result<()> {
2458 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2459 let manager = bootstrap_manager.write().await;
2460
2461 let metrics = QualityMetrics {
2463 success_rate: if success { 1.0 } else { 0.0 },
2464 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2465 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2467 last_successful_connection: if success {
2468 chrono::Utc::now()
2469 } else {
2470 chrono::Utc::now() - chrono::Duration::hours(1)
2471 },
2472 uptime_score: 0.5,
2473 };
2474
2475 manager
2476 .update_contact_metrics(peer_id, metrics)
2477 .await
2478 .map_err(|e| {
2479 P2PError::Network(crate::error::NetworkError::ProtocolError(
2480 format!("Failed to update peer metrics: {e}").into(),
2481 ))
2482 })?;
2483 }
2484 Ok(())
2485 }
2486
2487 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2489 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2490 let manager = bootstrap_manager.read().await;
2491 let stats = manager.get_stats().await.map_err(|e| {
2492 P2PError::Network(crate::error::NetworkError::ProtocolError(
2493 format!("Failed to get bootstrap stats: {e}").into(),
2494 ))
2495 })?;
2496 Ok(Some(stats))
2497 } else {
2498 Ok(None)
2499 }
2500 }
2501
2502 pub async fn cached_peer_count(&self) -> usize {
2504 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2505 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2506 {
2507 return stats.total_contacts;
2508 }
2509 0
2510 }
2511
2512 async fn connect_bootstrap_peers(&self) -> Result<()> {
2514 let mut bootstrap_contacts = Vec::new();
2515 let mut used_cache = false;
2516 let mut seen_addresses = std::collections::HashSet::new();
2517
2518 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2520 self.config.bootstrap_peers_str.clone()
2521 } else {
2522 self.config
2524 .bootstrap_peers
2525 .iter()
2526 .map(|addr| addr.to_string())
2527 .collect::<Vec<_>>()
2528 };
2529
2530 if !cli_bootstrap_peers.is_empty() {
2531 info!(
2532 "Using {} CLI-provided bootstrap peers (priority)",
2533 cli_bootstrap_peers.len()
2534 );
2535 for addr in &cli_bootstrap_peers {
2536 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2537 seen_addresses.insert(socket_addr);
2538 let contact = ContactEntry::new(
2539 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2540 vec![socket_addr],
2541 );
2542 bootstrap_contacts.push(contact);
2543 } else {
2544 warn!("Invalid bootstrap address format: {}", addr);
2545 }
2546 }
2547 }
2548
2549 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2552 let manager = bootstrap_manager.read().await;
2553 match manager.get_quic_bootstrap_peers(20).await {
2554 Ok(contacts) => {
2556 if !contacts.is_empty() {
2557 let mut added_from_cache = 0;
2558 for contact in contacts {
2559 let new_addresses: Vec<_> = contact
2561 .addresses
2562 .iter()
2563 .filter(|addr| !seen_addresses.contains(addr))
2564 .copied()
2565 .collect();
2566
2567 if !new_addresses.is_empty() {
2568 for addr in &new_addresses {
2569 seen_addresses.insert(*addr);
2570 }
2571 let mut contact = contact.clone();
2572 contact.addresses = new_addresses;
2573 bootstrap_contacts.push(contact);
2574 added_from_cache += 1;
2575 }
2576 }
2577 if added_from_cache > 0 {
2578 info!(
2579 "Added {} cached bootstrap peers (supplementing CLI peers)",
2580 added_from_cache
2581 );
2582 used_cache = true;
2583 }
2584 }
2585 }
2586 Err(e) => {
2587 warn!("Failed to get cached bootstrap peers: {}", e);
2588 }
2589 }
2590 }
2591
2592 if bootstrap_contacts.is_empty() {
2593 info!("No bootstrap peers configured and no cached peers available");
2594 return Ok(());
2595 }
2596
2597 let mut successful_connections = 0;
2599 for contact in bootstrap_contacts {
2600 for addr in &contact.addresses {
2601 match self.connect_peer(&addr.to_string()).await {
2602 Ok(peer_id) => {
2603 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2604 successful_connections += 1;
2605
2606 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2608 let manager = bootstrap_manager.write().await;
2609 let mut updated_contact = contact.clone();
2610 updated_contact.peer_id = peer_id.clone();
2611 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2614 warn!("Failed to update bootstrap cache: {}", e);
2615 }
2616 }
2617 break; }
2619 Err(e) => {
2620 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2621
2622 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2624 let manager = bootstrap_manager.write().await;
2625 let mut updated_contact = contact.clone();
2626 updated_contact.update_connection_result(
2627 false,
2628 None,
2629 Some(e.to_string()),
2630 );
2631
2632 if let Err(e) = manager.add_contact(updated_contact).await {
2633 warn!("Failed to update bootstrap cache: {}", e);
2634 }
2635 }
2636 }
2637 }
2638 }
2639 }
2640
2641 if successful_connections == 0 {
2642 if !used_cache {
2643 warn!("Failed to connect to any bootstrap peers");
2644 }
2645 return Ok(());
2648 }
2649 info!(
2650 "Successfully connected to {} bootstrap peers",
2651 successful_connections
2652 );
2653
2654 Ok(())
2655 }
2656
2657 async fn disconnect_all_peers(&self) -> Result<()> {
2659 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2660
2661 for peer_id in peer_ids {
2662 self.disconnect_peer(&peer_id).await?;
2663 }
2664
2665 Ok(())
2666 }
2667
2668 async fn periodic_tasks(&self) -> Result<()> {
2675 let stale_threshold = self.config.stale_peer_threshold;
2677 let cleanup_threshold = stale_threshold * 2;
2679
2680 let now = Instant::now();
2681 let mut peers_to_remove: Vec<PeerId> = Vec::new();
2682 let mut peers_to_mark_disconnected: Vec<PeerId> = Vec::new();
2683
2684 {
2686 let peers = self.peers.read().await;
2687 for (peer_id, peer_info) in peers.iter() {
2688 let elapsed = now.duration_since(peer_info.last_seen);
2689
2690 match &peer_info.status {
2691 ConnectionStatus::Connected => {
2692 if elapsed > stale_threshold {
2694 debug!(
2695 peer_id = %peer_id,
2696 elapsed_secs = elapsed.as_secs(),
2697 "Peer went stale - marking for disconnection"
2698 );
2699 peers_to_mark_disconnected.push(peer_id.clone());
2700 }
2701 }
2702 ConnectionStatus::Disconnected | ConnectionStatus::Failed(_) => {
2703 if elapsed > cleanup_threshold {
2705 trace!(
2706 peer_id = %peer_id,
2707 elapsed_secs = elapsed.as_secs(),
2708 "Removing disconnected peer from tracking"
2709 );
2710 peers_to_remove.push(peer_id.clone());
2711 }
2712 }
2713 ConnectionStatus::Connecting | ConnectionStatus::Disconnecting => {
2714 if elapsed > stale_threshold {
2716 debug!(
2717 peer_id = %peer_id,
2718 status = ?peer_info.status,
2719 "Connection timed out in transitional state"
2720 );
2721 peers_to_mark_disconnected.push(peer_id.clone());
2722 }
2723 }
2724 }
2725 }
2726 }
2727
2728 if !peers_to_mark_disconnected.is_empty() {
2730 let mut peers = self.peers.write().await;
2731 for peer_id in &peers_to_mark_disconnected {
2732 if let Some(peer_info) = peers.get_mut(peer_id) {
2733 peer_info.status = ConnectionStatus::Disconnected;
2734 peer_info.last_seen = now; }
2736 }
2737 }
2738
2739 for peer_id in &peers_to_mark_disconnected {
2741 self.active_connections.write().await.remove(peer_id);
2743
2744 let _ = self
2746 .event_tx
2747 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
2748
2749 info!(
2750 peer_id = %peer_id,
2751 "Stale peer disconnected"
2752 );
2753 }
2754
2755 if !peers_to_remove.is_empty() {
2757 let mut peers = self.peers.write().await;
2758 for peer_id in &peers_to_remove {
2759 peers.remove(peer_id);
2760 trace!(peer_id = %peer_id, "Peer removed from tracking");
2761 }
2762 }
2763
2764 Ok(())
2765 }
2766}
2767
2768#[async_trait::async_trait]
2770pub trait NetworkSender: Send + Sync {
2771 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2773
2774 fn local_peer_id(&self) -> &PeerId;
2776}
2777
2778#[derive(Clone)]
2780pub struct P2PNetworkSender {
2781 peer_id: PeerId,
2782 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2784}
2785
2786impl P2PNetworkSender {
2787 pub fn new(
2788 peer_id: PeerId,
2789 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2790 ) -> Self {
2791 Self { peer_id, send_tx }
2792 }
2793}
2794
2795#[async_trait::async_trait]
2797impl NetworkSender for P2PNetworkSender {
2798 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2800 self.send_tx
2801 .send((peer_id.clone(), protocol.to_string(), data))
2802 .map_err(|_| {
2803 P2PError::Network(crate::error::NetworkError::ProtocolError(
2804 "Failed to send message via channel".to_string().into(),
2805 ))
2806 })?;
2807 Ok(())
2808 }
2809
2810 fn local_peer_id(&self) -> &PeerId {
2812 &self.peer_id
2813 }
2814}
2815
2816pub struct NodeBuilder {
2818 config: NodeConfig,
2819}
2820
2821impl Default for NodeBuilder {
2822 fn default() -> Self {
2823 Self::new()
2824 }
2825}
2826
2827impl NodeBuilder {
2828 pub fn new() -> Self {
2830 Self {
2831 config: NodeConfig::default(),
2832 }
2833 }
2834
2835 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2837 self.config.peer_id = Some(peer_id);
2838 self
2839 }
2840
2841 pub fn listen_on(mut self, addr: &str) -> Self {
2843 if let Ok(multiaddr) = addr.parse() {
2844 self.config.listen_addrs.push(multiaddr);
2845 }
2846 self
2847 }
2848
2849 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
2851 if let Ok(multiaddr) = addr.parse() {
2852 self.config.bootstrap_peers.push(multiaddr);
2853 }
2854 self.config.bootstrap_peers_str.push(addr.to_string());
2855 self
2856 }
2857
2858 pub fn with_ipv6(mut self, enable: bool) -> Self {
2860 self.config.enable_ipv6 = enable;
2861 self
2862 }
2863
2864 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
2868 self.config.connection_timeout = timeout;
2869 self
2870 }
2871
2872 pub fn with_max_connections(mut self, max: usize) -> Self {
2874 self.config.max_connections = max;
2875 self
2876 }
2877
2878 pub fn with_production_mode(mut self) -> Self {
2880 self.config.production_config = Some(ProductionConfig::default());
2881 self
2882 }
2883
2884 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
2886 self.config.production_config = Some(production_config);
2887 self
2888 }
2889
2890 pub fn with_diversity_config(
2892 mut self,
2893 diversity_config: crate::security::IPDiversityConfig,
2894 ) -> Self {
2895 self.config.diversity_config = Some(diversity_config);
2896 self
2897 }
2898
2899 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
2901 self.config.dht_config = dht_config;
2902 self
2903 }
2904
2905 pub fn with_default_dht(mut self) -> Self {
2907 self.config.dht_config = DHTConfig::default();
2908 self
2909 }
2910
2911 pub async fn build(self) -> Result<P2PNode> {
2913 P2PNode::new(self.config).await
2914 }
2915}
2916
2917#[cfg(test)]
2918#[allow(clippy::unwrap_used, clippy::expect_used)]
2919mod diversity_tests {
2920 use super::*;
2921 use crate::security::IPDiversityConfig;
2922
2923 async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
2924 let diversity_config = config.diversity_config.clone().unwrap_or_default();
2925 let temp_dir = tempfile::TempDir::new().expect("temp dir");
2927 let mut cache_config = config.bootstrap_cache_config.clone().unwrap_or_default();
2928 cache_config.cache_dir = temp_dir.path().to_path_buf();
2929
2930 BootstrapManager::with_full_config(
2931 cache_config,
2932 crate::rate_limit::JoinRateLimiterConfig::default(),
2933 diversity_config,
2934 )
2935 .await
2936 .expect("bootstrap manager")
2937 }
2938
2939 #[tokio::test]
2940 async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
2941 let config = NodeConfig {
2942 diversity_config: Some(IPDiversityConfig::testnet()),
2943 ..Default::default()
2944 };
2945
2946 let manager = build_bootstrap_manager_like_prod(&config).await;
2947 assert!(manager.diversity_config().is_relaxed());
2948 assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
2949 }
2950}
2951
2952async fn register_new_peer(
2954 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2955 peer_id: &PeerId,
2956 remote_addr: &NetworkAddress,
2957) {
2958 let mut peers_guard = peers.write().await;
2959 let peer_info = PeerInfo {
2960 peer_id: peer_id.clone(),
2961 addresses: vec![remote_addr.to_string()],
2962 connected_at: tokio::time::Instant::now(),
2963 last_seen: tokio::time::Instant::now(),
2964 status: ConnectionStatus::Connected,
2965 protocols: vec!["p2p-chat/1.0.0".to_string()],
2966 heartbeat_count: 0,
2967 };
2968 peers_guard.insert(peer_id.clone(), peer_info);
2969}
2970
2971#[cfg(test)]
2972mod tests {
2973 use super::*;
2974 use std::time::Duration;
2976 use tokio::time::timeout;
2977
2978 fn create_test_node_config() -> NodeConfig {
2984 NodeConfig {
2985 peer_id: Some("test_peer_123".to_string()),
2986 listen_addrs: vec![
2987 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2988 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2989 ],
2990 listen_addr: std::net::SocketAddr::new(
2991 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2992 0,
2993 ),
2994 bootstrap_peers: vec![],
2995 bootstrap_peers_str: vec![],
2996 enable_ipv6: true,
2997
2998 connection_timeout: Duration::from_secs(2),
2999 keep_alive_interval: Duration::from_secs(30),
3000 max_connections: 100,
3001 max_incoming_connections: 50,
3002 dht_config: DHTConfig::default(),
3003 security_config: SecurityConfig::default(),
3004 production_config: None,
3005 bootstrap_cache_config: None,
3006 diversity_config: None,
3007 attestation_config: crate::attestation::AttestationConfig::default(),
3008 stale_peer_threshold: default_stale_peer_threshold(),
3009 }
3010 }
3011
3012 #[tokio::test]
3016 async fn test_node_config_default() {
3017 let config = NodeConfig::default();
3018
3019 assert!(config.peer_id.is_none());
3020 assert_eq!(config.listen_addrs.len(), 2);
3021 assert!(config.enable_ipv6);
3022 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
3024 assert_eq!(config.connection_timeout, Duration::from_secs(30));
3025 }
3026
3027 #[tokio::test]
3028 async fn test_dht_config_default() {
3029 let config = DHTConfig::default();
3030
3031 assert_eq!(config.k_value, 20);
3032 assert_eq!(config.alpha_value, 5);
3033 assert_eq!(config.record_ttl, Duration::from_secs(3600));
3034 assert_eq!(config.refresh_interval, Duration::from_secs(600));
3035 }
3036
3037 #[tokio::test]
3038 async fn test_security_config_default() {
3039 let config = SecurityConfig::default();
3040
3041 assert!(config.enable_noise);
3042 assert!(config.enable_tls);
3043 assert_eq!(config.trust_level, TrustLevel::Basic);
3044 }
3045
3046 #[test]
3047 fn test_trust_level_variants() {
3048 let _none = TrustLevel::None;
3050 let _basic = TrustLevel::Basic;
3051 let _full = TrustLevel::Full;
3052
3053 assert_eq!(TrustLevel::None, TrustLevel::None);
3055 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
3056 assert_eq!(TrustLevel::Full, TrustLevel::Full);
3057 assert_ne!(TrustLevel::None, TrustLevel::Basic);
3058 }
3059
3060 #[test]
3061 fn test_connection_status_variants() {
3062 let connecting = ConnectionStatus::Connecting;
3063 let connected = ConnectionStatus::Connected;
3064 let disconnecting = ConnectionStatus::Disconnecting;
3065 let disconnected = ConnectionStatus::Disconnected;
3066 let failed = ConnectionStatus::Failed("test error".to_string());
3067
3068 assert_eq!(connecting, ConnectionStatus::Connecting);
3069 assert_eq!(connected, ConnectionStatus::Connected);
3070 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
3071 assert_eq!(disconnected, ConnectionStatus::Disconnected);
3072 assert_ne!(connecting, connected);
3073
3074 if let ConnectionStatus::Failed(msg) = failed {
3075 assert_eq!(msg, "test error");
3076 } else {
3077 panic!("Expected Failed status");
3078 }
3079 }
3080
3081 #[tokio::test]
3082 async fn test_node_creation() -> Result<()> {
3083 let config = create_test_node_config();
3084 let node = P2PNode::new(config).await?;
3085
3086 assert_eq!(node.peer_id(), "test_peer_123");
3087 assert!(!node.is_running().await);
3088 assert_eq!(node.peer_count().await, 0);
3089 assert!(node.connected_peers().await.is_empty());
3090
3091 Ok(())
3092 }
3093
3094 #[tokio::test]
3095 async fn test_node_creation_without_peer_id() -> Result<()> {
3096 let mut config = create_test_node_config();
3097 config.peer_id = None;
3098
3099 let node = P2PNode::new(config).await?;
3100
3101 assert!(node.peer_id().starts_with("peer_"));
3103 assert!(!node.is_running().await);
3104
3105 Ok(())
3106 }
3107
3108 #[tokio::test]
3109 async fn test_node_lifecycle() -> Result<()> {
3110 let config = create_test_node_config();
3111 let node = P2PNode::new(config).await?;
3112
3113 assert!(!node.is_running().await);
3115
3116 node.start().await?;
3118 assert!(node.is_running().await);
3119
3120 let listen_addrs = node.listen_addrs().await;
3122 assert!(
3123 !listen_addrs.is_empty(),
3124 "Expected at least one listening address"
3125 );
3126
3127 node.stop().await?;
3129 assert!(!node.is_running().await);
3130
3131 Ok(())
3132 }
3133
3134 #[tokio::test]
3135 async fn test_peer_connection() -> Result<()> {
3136 let config1 = create_test_node_config();
3137 let mut config2 = create_test_node_config();
3138 config2.peer_id = Some("test_peer_456".to_string());
3139
3140 let node1 = P2PNode::new(config1).await?;
3141 let node2 = P2PNode::new(config2).await?;
3142
3143 node1.start().await?;
3144 node2.start().await?;
3145
3146 let node2_addr = node2
3147 .listen_addrs()
3148 .await
3149 .into_iter()
3150 .find(|a| a.ip().is_ipv4())
3151 .ok_or_else(|| {
3152 P2PError::Network(crate::error::NetworkError::InvalidAddress(
3153 "Node 2 did not expose an IPv4 listen address".into(),
3154 ))
3155 })?;
3156
3157 let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3159
3160 assert_eq!(node1.peer_count().await, 1);
3162
3163 let connected_peers = node1.connected_peers().await;
3165 assert_eq!(connected_peers.len(), 1);
3166 assert_eq!(connected_peers[0], peer_id);
3167
3168 let peer_info = node1.peer_info(&peer_id).await;
3170 assert!(peer_info.is_some());
3171 let info = peer_info.expect("Peer info should exist after adding peer");
3172 assert_eq!(info.peer_id, peer_id);
3173 assert_eq!(info.status, ConnectionStatus::Connected);
3174 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3175
3176 node1.disconnect_peer(&peer_id).await?;
3178 assert_eq!(node1.peer_count().await, 0);
3179
3180 node1.stop().await?;
3181 node2.stop().await?;
3182
3183 Ok(())
3184 }
3185
3186 #[cfg_attr(target_os = "windows", ignore)]
3193 #[tokio::test]
3194 async fn test_event_subscription() -> Result<()> {
3195 let ipv4_localhost =
3200 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3201
3202 let mut config1 = create_test_node_config();
3203 config1.listen_addr = ipv4_localhost;
3204 config1.listen_addrs = vec![ipv4_localhost];
3205 config1.enable_ipv6 = false;
3206
3207 let mut config2 = create_test_node_config();
3208 config2.peer_id = Some("test_peer_456".to_string());
3209 config2.listen_addr = ipv4_localhost;
3210 config2.listen_addrs = vec![ipv4_localhost];
3211 config2.enable_ipv6 = false;
3212
3213 let node1 = P2PNode::new(config1).await?;
3214 let node2 = P2PNode::new(config2).await?;
3215
3216 node1.start().await?;
3217 node2.start().await?;
3218
3219 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
3222
3223 let mut events = node1.subscribe_events();
3224
3225 let node2_addr = node2.local_addr().ok_or_else(|| {
3227 P2PError::Network(crate::error::NetworkError::ProtocolError(
3228 "No listening address".to_string().into(),
3229 ))
3230 })?;
3231
3232 let mut peer_id = None;
3235 for attempt in 0..3 {
3236 if attempt > 0 {
3237 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3238 }
3239 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
3240 Ok(Ok(id)) => {
3241 peer_id = Some(id);
3242 break;
3243 }
3244 Ok(Err(_)) | Err(_) => continue,
3245 }
3246 }
3247 let peer_id = peer_id.ok_or_else(|| {
3248 P2PError::Network(crate::error::NetworkError::ProtocolError(
3249 "Failed to connect after 3 attempts".to_string().into(),
3250 ))
3251 })?;
3252
3253 let event = timeout(Duration::from_secs(2), events.recv()).await;
3255 assert!(event.is_ok());
3256
3257 let event_result = event
3258 .expect("Should receive event")
3259 .expect("Event should not be error");
3260 match event_result {
3261 P2PEvent::PeerConnected(event_peer_id) => {
3262 assert_eq!(event_peer_id, peer_id);
3263 }
3264 _ => panic!("Expected PeerConnected event"),
3265 }
3266
3267 node1.disconnect_peer(&peer_id).await?;
3269
3270 let event = timeout(Duration::from_secs(2), events.recv()).await;
3272 assert!(event.is_ok());
3273
3274 let event_result = event
3275 .expect("Should receive event")
3276 .expect("Event should not be error");
3277 match event_result {
3278 P2PEvent::PeerDisconnected(event_peer_id) => {
3279 assert_eq!(event_peer_id, peer_id);
3280 }
3281 _ => panic!("Expected PeerDisconnected event"),
3282 }
3283
3284 node1.stop().await?;
3285 node2.stop().await?;
3286
3287 Ok(())
3288 }
3289
3290 #[cfg_attr(target_os = "windows", ignore)]
3292 #[tokio::test]
3293 async fn test_message_sending() -> Result<()> {
3294 let mut config1 = create_test_node_config();
3296 config1.listen_addr =
3297 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3298 let node1 = P2PNode::new(config1).await?;
3299 node1.start().await?;
3300
3301 let mut config2 = create_test_node_config();
3302 config2.listen_addr =
3303 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3304 let node2 = P2PNode::new(config2).await?;
3305 node2.start().await?;
3306
3307 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3309
3310 let node2_addr = node2.local_addr().ok_or_else(|| {
3312 P2PError::Network(crate::error::NetworkError::ProtocolError(
3313 "No listening address".to_string().into(),
3314 ))
3315 })?;
3316
3317 let peer_id =
3319 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3320 Ok(res) => res?,
3321 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3322 };
3323
3324 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3326
3327 let message_data = b"Hello, peer!".to_vec();
3329 let result = match timeout(
3330 Duration::from_millis(500),
3331 node1.send_message(&peer_id, "test-protocol", message_data),
3332 )
3333 .await
3334 {
3335 Ok(res) => res,
3336 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3337 };
3338 if let Err(e) = &result {
3341 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3342 }
3343
3344 let non_existent_peer = "non_existent_peer".to_string();
3346 let result = node1
3347 .send_message(&non_existent_peer, "test-protocol", vec![])
3348 .await;
3349 assert!(result.is_err(), "Sending to non-existent peer should fail");
3350
3351 Ok(())
3352 }
3353
3354 #[tokio::test]
3355 async fn test_remote_mcp_operations() -> Result<()> {
3356 let config = create_test_node_config();
3357 let node = P2PNode::new(config).await?;
3358
3359 node.start().await?;
3361 node.stop().await?;
3362 Ok(())
3363 }
3364
3365 #[tokio::test]
3366 async fn test_health_check() -> Result<()> {
3367 let config = create_test_node_config();
3368 let node = P2PNode::new(config).await?;
3369
3370 let result = node.health_check().await;
3372 assert!(result.is_ok());
3373
3374 Ok(())
3379 }
3380
3381 #[tokio::test]
3382 async fn test_node_uptime() -> Result<()> {
3383 let config = create_test_node_config();
3384 let node = P2PNode::new(config).await?;
3385
3386 let uptime1 = node.uptime();
3387 assert!(uptime1 >= Duration::from_secs(0));
3388
3389 tokio::time::sleep(Duration::from_millis(10)).await;
3391
3392 let uptime2 = node.uptime();
3393 assert!(uptime2 > uptime1);
3394
3395 Ok(())
3396 }
3397
3398 #[tokio::test]
3399 async fn test_node_config_access() -> Result<()> {
3400 let config = create_test_node_config();
3401 let expected_peer_id = config.peer_id.clone();
3402 let node = P2PNode::new(config).await?;
3403
3404 let node_config = node.config();
3405 assert_eq!(node_config.peer_id, expected_peer_id);
3406 assert_eq!(node_config.max_connections, 100);
3407 Ok(())
3410 }
3411
3412 #[tokio::test]
3413 async fn test_mcp_server_access() -> Result<()> {
3414 let config = create_test_node_config();
3415 let _node = P2PNode::new(config).await?;
3416
3417 Ok(())
3419 }
3420
3421 #[tokio::test]
3422 async fn test_dht_access() -> Result<()> {
3423 let config = create_test_node_config();
3424 let node = P2PNode::new(config).await?;
3425
3426 assert!(node.dht().is_some());
3428
3429 Ok(())
3430 }
3431
3432 #[tokio::test]
3433 async fn test_node_builder() -> Result<()> {
3434 let builder = P2PNode::builder()
3436 .with_peer_id("builder_test_peer".to_string())
3437 .listen_on("/ip4/127.0.0.1/tcp/0")
3438 .listen_on("/ip6/::1/tcp/0")
3439 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
3441 .with_connection_timeout(Duration::from_secs(15))
3442 .with_max_connections(200);
3443
3444 let config = builder.config;
3446 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3447 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
3450 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3451 assert_eq!(config.max_connections, 200);
3452
3453 Ok(())
3454 }
3455
3456 #[tokio::test]
3457 async fn test_bootstrap_peers() -> Result<()> {
3458 let mut config = create_test_node_config();
3459 config.bootstrap_peers = vec![
3460 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3461 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3462 ];
3463
3464 let node = P2PNode::new(config).await?;
3465
3466 node.start().await?;
3468
3469 let _peer_count = node.peer_count().await;
3473
3474 node.stop().await?;
3475 Ok(())
3476 }
3477
3478 #[tokio::test]
3479 async fn test_production_mode_disabled() -> Result<()> {
3480 let config = create_test_node_config();
3481 let node = P2PNode::new(config).await?;
3482
3483 assert!(!node.is_production_mode());
3484 assert!(node.production_config().is_none());
3485
3486 let result = node.resource_metrics().await;
3488 assert!(result.is_err());
3489 assert!(result.unwrap_err().to_string().contains("not enabled"));
3490
3491 Ok(())
3492 }
3493
3494 #[tokio::test]
3495 async fn test_network_event_variants() {
3496 let peer_id = "test_peer".to_string();
3498 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3499
3500 let _peer_connected = NetworkEvent::PeerConnected {
3501 peer_id: peer_id.clone(),
3502 addresses: vec![address.clone()],
3503 };
3504
3505 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3506 peer_id: peer_id.clone(),
3507 reason: "test disconnect".to_string(),
3508 };
3509
3510 let _message_received = NetworkEvent::MessageReceived {
3511 peer_id: peer_id.clone(),
3512 protocol: "test-protocol".to_string(),
3513 data: vec![1, 2, 3],
3514 };
3515
3516 let _connection_failed = NetworkEvent::ConnectionFailed {
3517 peer_id: Some(peer_id.clone()),
3518 address: address.clone(),
3519 error: "connection refused".to_string(),
3520 };
3521
3522 let _dht_stored = NetworkEvent::DHTRecordStored {
3523 key: vec![1, 2, 3],
3524 value: vec![4, 5, 6],
3525 };
3526
3527 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3528 key: vec![1, 2, 3],
3529 value: Some(vec![4, 5, 6]),
3530 };
3531 }
3532
3533 #[tokio::test]
3534 async fn test_peer_info_structure() {
3535 let peer_info = PeerInfo {
3536 peer_id: "test_peer".to_string(),
3537 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3538 connected_at: Instant::now(),
3539 last_seen: Instant::now(),
3540 status: ConnectionStatus::Connected,
3541 protocols: vec!["test-protocol".to_string()],
3542 heartbeat_count: 0,
3543 };
3544
3545 assert_eq!(peer_info.peer_id, "test_peer");
3546 assert_eq!(peer_info.addresses.len(), 1);
3547 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3548 assert_eq!(peer_info.protocols.len(), 1);
3549 }
3550
3551 #[tokio::test]
3552 async fn test_serialization() -> Result<()> {
3553 let config = create_test_node_config();
3555 let serialized = serde_json::to_string(&config)?;
3556 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3557
3558 assert_eq!(config.peer_id, deserialized.peer_id);
3559 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3560 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3561
3562 Ok(())
3563 }
3564
3565 #[tokio::test]
3566 async fn test_get_peer_id_by_address_found() -> Result<()> {
3567 let config = create_test_node_config();
3568 let node = P2PNode::new(config).await?;
3569
3570 let test_peer_id = "peer_test_123".to_string();
3572 let test_address = "192.168.1.100:9000".to_string();
3573
3574 let peer_info = PeerInfo {
3575 peer_id: test_peer_id.clone(),
3576 addresses: vec![test_address.clone()],
3577 connected_at: Instant::now(),
3578 last_seen: Instant::now(),
3579 status: ConnectionStatus::Connected,
3580 protocols: vec!["test-protocol".to_string()],
3581 heartbeat_count: 0,
3582 };
3583
3584 node.peers
3585 .write()
3586 .await
3587 .insert(test_peer_id.clone(), peer_info);
3588
3589 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3591 assert_eq!(found_peer_id, Some(test_peer_id));
3592
3593 Ok(())
3594 }
3595
3596 #[tokio::test]
3597 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3598 let config = create_test_node_config();
3599 let node = P2PNode::new(config).await?;
3600
3601 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3603 assert_eq!(result, None);
3604
3605 Ok(())
3606 }
3607
3608 #[tokio::test]
3609 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3610 let config = create_test_node_config();
3611 let node = P2PNode::new(config).await?;
3612
3613 let result = node.get_peer_id_by_address("invalid-address").await;
3615 assert_eq!(result, None);
3616
3617 Ok(())
3618 }
3619
3620 #[tokio::test]
3621 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3622 let config = create_test_node_config();
3623 let node = P2PNode::new(config).await?;
3624
3625 let peer1_id = "peer_1".to_string();
3627 let peer1_addr = "192.168.1.101:9001".to_string();
3628
3629 let peer2_id = "peer_2".to_string();
3630 let peer2_addr = "192.168.1.102:9002".to_string();
3631
3632 let peer1_info = PeerInfo {
3633 peer_id: peer1_id.clone(),
3634 addresses: vec![peer1_addr.clone()],
3635 connected_at: Instant::now(),
3636 last_seen: Instant::now(),
3637 status: ConnectionStatus::Connected,
3638 protocols: vec!["test-protocol".to_string()],
3639 heartbeat_count: 0,
3640 };
3641
3642 let peer2_info = PeerInfo {
3643 peer_id: peer2_id.clone(),
3644 addresses: vec![peer2_addr.clone()],
3645 connected_at: Instant::now(),
3646 last_seen: Instant::now(),
3647 status: ConnectionStatus::Connected,
3648 protocols: vec!["test-protocol".to_string()],
3649 heartbeat_count: 0,
3650 };
3651
3652 node.peers
3653 .write()
3654 .await
3655 .insert(peer1_id.clone(), peer1_info);
3656 node.peers
3657 .write()
3658 .await
3659 .insert(peer2_id.clone(), peer2_info);
3660
3661 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3663 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3664
3665 assert_eq!(found_peer1, Some(peer1_id));
3666 assert_eq!(found_peer2, Some(peer2_id));
3667
3668 Ok(())
3669 }
3670
3671 #[tokio::test]
3672 async fn test_list_active_connections_empty() -> Result<()> {
3673 let config = create_test_node_config();
3674 let node = P2PNode::new(config).await?;
3675
3676 let connections = node.list_active_connections().await;
3678 assert!(connections.is_empty());
3679
3680 Ok(())
3681 }
3682
3683 #[tokio::test]
3684 async fn test_list_active_connections_with_peers() -> Result<()> {
3685 let config = create_test_node_config();
3686 let node = P2PNode::new(config).await?;
3687
3688 let peer1_id = "peer_1".to_string();
3690 let peer1_addrs = vec![
3691 "192.168.1.101:9001".to_string(),
3692 "192.168.1.101:9002".to_string(),
3693 ];
3694
3695 let peer2_id = "peer_2".to_string();
3696 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3697
3698 let peer1_info = PeerInfo {
3699 peer_id: peer1_id.clone(),
3700 addresses: peer1_addrs.clone(),
3701 connected_at: Instant::now(),
3702 last_seen: Instant::now(),
3703 status: ConnectionStatus::Connected,
3704 protocols: vec!["test-protocol".to_string()],
3705 heartbeat_count: 0,
3706 };
3707
3708 let peer2_info = PeerInfo {
3709 peer_id: peer2_id.clone(),
3710 addresses: peer2_addrs.clone(),
3711 connected_at: Instant::now(),
3712 last_seen: Instant::now(),
3713 status: ConnectionStatus::Connected,
3714 protocols: vec!["test-protocol".to_string()],
3715 heartbeat_count: 0,
3716 };
3717
3718 node.peers
3719 .write()
3720 .await
3721 .insert(peer1_id.clone(), peer1_info);
3722 node.peers
3723 .write()
3724 .await
3725 .insert(peer2_id.clone(), peer2_info);
3726
3727 node.active_connections
3729 .write()
3730 .await
3731 .insert(peer1_id.clone());
3732 node.active_connections
3733 .write()
3734 .await
3735 .insert(peer2_id.clone());
3736
3737 let connections = node.list_active_connections().await;
3739 assert_eq!(connections.len(), 2);
3740
3741 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3743 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3744
3745 assert!(peer1_conn.is_some());
3746 assert!(peer2_conn.is_some());
3747
3748 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3750 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3751
3752 Ok(())
3753 }
3754
3755 #[tokio::test]
3756 async fn test_remove_peer_success() -> Result<()> {
3757 let config = create_test_node_config();
3758 let node = P2PNode::new(config).await?;
3759
3760 let peer_id = "peer_to_remove".to_string();
3762 let peer_info = PeerInfo {
3763 peer_id: peer_id.clone(),
3764 addresses: vec!["192.168.1.100:9000".to_string()],
3765 connected_at: Instant::now(),
3766 last_seen: Instant::now(),
3767 status: ConnectionStatus::Connected,
3768 protocols: vec!["test-protocol".to_string()],
3769 heartbeat_count: 0,
3770 };
3771
3772 node.peers.write().await.insert(peer_id.clone(), peer_info);
3773
3774 assert!(node.is_peer_connected(&peer_id).await);
3776
3777 let removed = node.remove_peer(&peer_id).await;
3779 assert!(removed);
3780
3781 assert!(!node.is_peer_connected(&peer_id).await);
3783
3784 Ok(())
3785 }
3786
3787 #[tokio::test]
3788 async fn test_remove_peer_nonexistent() -> Result<()> {
3789 let config = create_test_node_config();
3790 let node = P2PNode::new(config).await?;
3791
3792 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3794 assert!(!removed);
3795
3796 Ok(())
3797 }
3798
3799 #[tokio::test]
3800 async fn test_is_peer_connected() -> Result<()> {
3801 let config = create_test_node_config();
3802 let node = P2PNode::new(config).await?;
3803
3804 let peer_id = "test_peer".to_string();
3805
3806 assert!(!node.is_peer_connected(&peer_id).await);
3808
3809 let peer_info = PeerInfo {
3811 peer_id: peer_id.clone(),
3812 addresses: vec!["192.168.1.100:9000".to_string()],
3813 connected_at: Instant::now(),
3814 last_seen: Instant::now(),
3815 status: ConnectionStatus::Connected,
3816 protocols: vec!["test-protocol".to_string()],
3817 heartbeat_count: 0,
3818 };
3819
3820 node.peers.write().await.insert(peer_id.clone(), peer_info);
3821
3822 assert!(node.is_peer_connected(&peer_id).await);
3824
3825 node.remove_peer(&peer_id).await;
3827
3828 assert!(!node.is_peer_connected(&peer_id).await);
3830
3831 Ok(())
3832 }
3833
3834 #[test]
3835 fn test_normalize_ipv6_wildcard() {
3836 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3837
3838 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3839 let normalized = normalize_wildcard_to_loopback(wildcard);
3840
3841 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3842 assert_eq!(normalized.port(), 8080);
3843 }
3844
3845 #[test]
3846 fn test_normalize_ipv4_wildcard() {
3847 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3848
3849 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3850 let normalized = normalize_wildcard_to_loopback(wildcard);
3851
3852 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3853 assert_eq!(normalized.port(), 9000);
3854 }
3855
3856 #[test]
3857 fn test_normalize_specific_address_unchanged() {
3858 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3859 let normalized = normalize_wildcard_to_loopback(specific);
3860
3861 assert_eq!(normalized, specific);
3862 }
3863
3864 #[test]
3865 fn test_normalize_loopback_unchanged() {
3866 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3867
3868 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3869 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3870 assert_eq!(normalized_v6, loopback_v6);
3871
3872 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3873 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3874 assert_eq!(normalized_v4, loopback_v4);
3875 }
3876
3877 fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
3881 let msg = WireMessage {
3882 protocol: protocol.to_string(),
3883 data,
3884 from: from.to_string(),
3885 timestamp,
3886 };
3887 postcard::to_stdvec(&msg).unwrap()
3888 }
3889
3890 #[test]
3891 fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
3892 let transport_id = "abcdef0123456789";
3896 let logical_id = "spoofed-logical-id";
3897 let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, 1000);
3898
3899 let event =
3900 parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
3901
3902 match event {
3903 P2PEvent::Message {
3904 topic,
3905 source,
3906 data,
3907 } => {
3908 assert_eq!(source, transport_id, "source must be the transport peer ID");
3909 assert_ne!(
3910 source, logical_id,
3911 "source must NOT be the logical 'from' field"
3912 );
3913 assert_eq!(topic, "test/v1");
3914 assert_eq!(data, vec![1u8, 2, 3]);
3915 }
3916 other => panic!("expected P2PEvent::Message, got {:?}", other),
3917 }
3918 }
3919
3920 #[test]
3921 fn test_parse_protocol_message_rejects_invalid_bytes() {
3922 assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
3924 }
3925
3926 #[test]
3927 fn test_parse_protocol_message_rejects_truncated_message() {
3928 let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", 1000);
3930 let truncated = &full_bytes[..full_bytes.len() / 2];
3931 assert!(parse_protocol_message(truncated, "peer-id").is_none());
3932 }
3933
3934 #[test]
3935 fn test_parse_protocol_message_empty_payload() {
3936 let bytes = make_wire_bytes("ping", vec![], "sender", 1000);
3937
3938 let event = parse_protocol_message(&bytes, "transport-peer")
3939 .expect("valid message with empty data should parse");
3940
3941 match event {
3942 P2PEvent::Message { data, .. } => assert!(data.is_empty()),
3943 other => panic!("expected P2PEvent::Message, got {:?}", other),
3944 }
3945 }
3946
3947 #[test]
3948 fn test_parse_protocol_message_preserves_binary_payload() {
3949 let payload: Vec<u8> = (0..=255).collect();
3951 let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", 42);
3952
3953 let event = parse_protocol_message(&bytes, "peer-id")
3954 .expect("valid message with full byte range should parse");
3955
3956 match event {
3957 P2PEvent::Message { data, topic, .. } => {
3958 assert_eq!(topic, "binary/v1");
3959 assert_eq!(
3960 data, payload,
3961 "payload must survive bincode round-trip exactly"
3962 );
3963 }
3964 other => panic!("expected P2PEvent::Message, got {:?}", other),
3965 }
3966 }
3967}