1#[cfg(feature = "adaptive-ml")]
20use crate::adaptive::{EigenTrustEngine, NodeId as AdaptiveNodeId, NodeStatisticsUpdate};
21use crate::bgp_geo_provider::BgpGeoProvider;
22use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
23use crate::config::Config;
24use crate::dht::DHT;
25use crate::error::{NetworkError, P2PError, P2pResult as Result};
26use crate::security::GeoProvider;
27
28use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
29use crate::transport::ant_quic_adapter::{DualStackNetworkNode, ant_peer_id_to_string};
30use crate::validation::RateLimitConfig;
31use crate::validation::RateLimiter;
32use crate::{NetworkAddress, PeerId};
33use serde::{Deserialize, Serialize};
34use std::collections::{HashMap, HashSet};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::Duration;
38use tokio::sync::{RwLock, broadcast};
39use tokio::time::{Instant, interval};
40use tracing::{debug, info, trace, warn};
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
48struct WireMessage {
49 protocol: String,
51 data: Vec<u8>,
53 from: String,
55 timestamp: u64,
57}
58
/// Payload that marks a transport-level keepalive frame.
///
/// The message receive loop drops any incoming frame whose bytes equal this
/// value instead of parsing it as a protocol message.
const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive";

/// Capacity of the bounded mpsc channel that carries received frames from the
/// per-stack receive tasks to the message receive loop.
const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct NodeConfig {
68 pub peer_id: Option<PeerId>,
70
71 pub listen_addrs: Vec<std::net::SocketAddr>,
73
74 pub listen_addr: std::net::SocketAddr,
76
77 pub bootstrap_peers: Vec<std::net::SocketAddr>,
79
80 pub bootstrap_peers_str: Vec<String>,
82
83 pub enable_ipv6: bool,
85
86 pub connection_timeout: Duration,
89
90 pub keep_alive_interval: Duration,
92
93 pub max_connections: usize,
95
96 pub max_incoming_connections: usize,
98
99 pub dht_config: DHTConfig,
101
102 pub security_config: SecurityConfig,
104
105 pub production_config: Option<ProductionConfig>,
107
108 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
110
111 pub diversity_config: Option<crate::security::IPDiversityConfig>,
116
117 #[serde(default = "default_stale_peer_threshold")]
120 pub stale_peer_threshold: Duration,
121}
122
/// Default for `NodeConfig::stale_peer_threshold` (60 seconds); also used as
/// the serde fallback when the field is missing from serialized input.
fn default_stale_peer_threshold() -> Duration {
    const STALE_PEER_THRESHOLD_SECS: u64 = 60;
    Duration::from_secs(STALE_PEER_THRESHOLD_SECS)
}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct DHTConfig {
131 pub k_value: usize,
133
134 pub alpha_value: usize,
136
137 pub record_ttl: Duration,
139
140 pub refresh_interval: Duration,
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct SecurityConfig {
147 pub enable_noise: bool,
149
150 pub enable_tls: bool,
152
153 pub trust_level: TrustLevel,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
159pub enum TrustLevel {
160 None,
162 Basic,
164 Full,
166}
167
/// Builds the wildcard listen addresses for `port`.
///
/// Returns the IPv4 wildcard (`0.0.0.0:port`), preceded by the IPv6 wildcard
/// (`[::]:port`) when `ipv6_enabled` is true.
#[inline]
fn build_listen_addrs(port: u16, ipv6_enabled: bool) -> Vec<std::net::SocketAddr> {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let any_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);

    if ipv6_enabled {
        // IPv6 first, then IPv4 — callers see the same ordering as before.
        let any_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port);
        vec![any_v6, any_v4]
    } else {
        vec![any_v4]
    }
}
193
194impl NodeConfig {
195 pub fn new() -> Result<Self> {
201 let config = Config::default();
202 let listen_addr = config.listen_socket_addr()?;
203
204 Ok(Self {
205 peer_id: None,
206 listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
207 listen_addr,
208 bootstrap_peers: Vec::new(),
209 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
210 enable_ipv6: config.network.ipv6_enabled,
211 connection_timeout: Duration::from_secs(config.network.connection_timeout),
212 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
213 max_connections: config.network.max_connections,
214 max_incoming_connections: config.security.connection_limit as usize,
215 dht_config: DHTConfig::default(),
216 security_config: SecurityConfig::default(),
217 production_config: None,
218 bootstrap_cache_config: None,
219 diversity_config: None,
220 stale_peer_threshold: default_stale_peer_threshold(),
221 })
222 }
223
224 pub fn builder() -> NodeConfigBuilder {
226 NodeConfigBuilder::default()
227 }
228}
229
230#[derive(Debug, Clone, Default)]
236pub struct NodeConfigBuilder {
237 peer_id: Option<PeerId>,
238 listen_port: Option<u16>,
239 enable_ipv6: Option<bool>,
240 bootstrap_peers: Vec<std::net::SocketAddr>,
241 max_connections: Option<usize>,
242 connection_timeout: Option<Duration>,
243 keep_alive_interval: Option<Duration>,
244 dht_config: Option<DHTConfig>,
245 security_config: Option<SecurityConfig>,
246 production_config: Option<ProductionConfig>,
247}
248
249impl NodeConfigBuilder {
250 pub fn peer_id(mut self, peer_id: PeerId) -> Self {
252 self.peer_id = Some(peer_id);
253 self
254 }
255
256 pub fn listen_port(mut self, port: u16) -> Self {
258 self.listen_port = Some(port);
259 self
260 }
261
262 pub fn ipv6(mut self, enabled: bool) -> Self {
264 self.enable_ipv6 = Some(enabled);
265 self
266 }
267
268 pub fn bootstrap_peer(mut self, addr: std::net::SocketAddr) -> Self {
270 self.bootstrap_peers.push(addr);
271 self
272 }
273
274 pub fn max_connections(mut self, max: usize) -> Self {
276 self.max_connections = Some(max);
277 self
278 }
279
280 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
282 self.connection_timeout = Some(timeout);
283 self
284 }
285
286 pub fn keep_alive_interval(mut self, interval: Duration) -> Self {
288 self.keep_alive_interval = Some(interval);
289 self
290 }
291
292 pub fn dht_config(mut self, config: DHTConfig) -> Self {
294 self.dht_config = Some(config);
295 self
296 }
297
298 pub fn security_config(mut self, config: SecurityConfig) -> Self {
300 self.security_config = Some(config);
301 self
302 }
303
304 pub fn production_config(mut self, config: ProductionConfig) -> Self {
306 self.production_config = Some(config);
307 self
308 }
309
310 pub fn build(self) -> Result<NodeConfig> {
316 let base_config = Config::default();
317 let default_port = base_config
318 .listen_socket_addr()
319 .map(|addr| addr.port())
320 .unwrap_or(9000);
321 let port = self.listen_port.unwrap_or(default_port);
322 let ipv6_enabled = self.enable_ipv6.unwrap_or(base_config.network.ipv6_enabled);
323
324 let listen_addr =
325 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port);
326
327 Ok(NodeConfig {
328 peer_id: self.peer_id,
329 listen_addrs: build_listen_addrs(port, ipv6_enabled),
330 listen_addr,
331 bootstrap_peers: self.bootstrap_peers.clone(),
332 bootstrap_peers_str: self.bootstrap_peers.iter().map(|a| a.to_string()).collect(),
333 enable_ipv6: ipv6_enabled,
334 connection_timeout: self
335 .connection_timeout
336 .unwrap_or(Duration::from_secs(base_config.network.connection_timeout)),
337 keep_alive_interval: self
338 .keep_alive_interval
339 .unwrap_or(Duration::from_secs(base_config.network.keepalive_interval)),
340 max_connections: self
341 .max_connections
342 .unwrap_or(base_config.network.max_connections),
343 max_incoming_connections: base_config.security.connection_limit as usize,
344 dht_config: self.dht_config.unwrap_or_default(),
345 security_config: self.security_config.unwrap_or_default(),
346 production_config: self.production_config,
347 bootstrap_cache_config: None,
348 diversity_config: None,
349 stale_peer_threshold: default_stale_peer_threshold(),
350 })
351 }
352}
353
354impl Default for NodeConfig {
355 fn default() -> Self {
356 let config = Config::default();
357 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
358 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 9000)
359 });
360
361 Self {
362 peer_id: None,
363 listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
364 listen_addr,
365 bootstrap_peers: Vec::new(),
366 bootstrap_peers_str: Vec::new(),
367 enable_ipv6: config.network.ipv6_enabled,
368 connection_timeout: Duration::from_secs(config.network.connection_timeout),
369 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
370 max_connections: config.network.max_connections,
371 max_incoming_connections: config.security.connection_limit as usize,
372 dht_config: DHTConfig::default(),
373 security_config: SecurityConfig::default(),
374 production_config: None,
375 bootstrap_cache_config: None,
376 diversity_config: None,
377 stale_peer_threshold: default_stale_peer_threshold(),
378 }
379 }
380}
381
382impl NodeConfig {
383 pub fn from_config(config: &Config) -> Result<Self> {
385 let listen_addr = config.listen_socket_addr()?;
386 let bootstrap_addrs = config.bootstrap_addrs()?;
387
388 let mut node_config = Self {
389 peer_id: None,
390 listen_addrs: vec![listen_addr],
391 listen_addr,
392 bootstrap_peers: bootstrap_addrs
393 .iter()
394 .map(|addr| addr.socket_addr())
395 .collect(),
396 bootstrap_peers_str: config
397 .network
398 .bootstrap_nodes
399 .iter()
400 .map(|addr| addr.to_string())
401 .collect(),
402 enable_ipv6: config.network.ipv6_enabled,
403
404 connection_timeout: Duration::from_secs(config.network.connection_timeout),
405 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
406 max_connections: config.network.max_connections,
407 max_incoming_connections: config.security.connection_limit as usize,
408 dht_config: DHTConfig {
409 k_value: 20,
410 alpha_value: 3,
411 record_ttl: Duration::from_secs(3600),
412 refresh_interval: Duration::from_secs(900),
413 },
414 security_config: SecurityConfig {
415 enable_noise: true,
416 enable_tls: true,
417 trust_level: TrustLevel::Basic,
418 },
419 production_config: Some(ProductionConfig {
420 max_connections: config.network.max_connections,
421 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
424 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
425 health_check_interval: Duration::from_secs(30),
426 metrics_interval: Duration::from_secs(60),
427 enable_performance_tracking: true,
428 enable_auto_cleanup: true,
429 shutdown_timeout: Duration::from_secs(30),
430 rate_limits: crate::production::RateLimitConfig::default(),
431 }),
432 bootstrap_cache_config: None,
433 diversity_config: None,
434 stale_peer_threshold: default_stale_peer_threshold(),
435 };
436
437 if config.network.ipv6_enabled {
439 node_config.listen_addrs.push(std::net::SocketAddr::new(
440 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
441 listen_addr.port(),
442 ));
443 }
444
445 Ok(node_config)
446 }
447
448 pub fn with_listen_addr(addr: &str) -> Result<Self> {
450 let listen_addr: std::net::SocketAddr = addr
451 .parse()
452 .map_err(|e: std::net::AddrParseError| {
453 NetworkError::InvalidAddress(e.to_string().into())
454 })
455 .map_err(P2PError::Network)?;
456 let cfg = NodeConfig {
457 listen_addr,
458 listen_addrs: vec![listen_addr],
459 diversity_config: None,
460 ..Default::default()
461 };
462 Ok(cfg)
463 }
464}
465
466impl Default for DHTConfig {
467 fn default() -> Self {
468 Self {
469 k_value: 20,
470 alpha_value: 5,
471 record_ttl: Duration::from_secs(3600), refresh_interval: Duration::from_secs(600), }
474 }
475}
476
477impl Default for SecurityConfig {
478 fn default() -> Self {
479 Self {
480 enable_noise: true,
481 enable_tls: true,
482 trust_level: TrustLevel::Basic,
483 }
484 }
485}
486
487#[derive(Debug, Clone)]
489pub struct PeerInfo {
490 pub peer_id: PeerId,
492
493 pub addresses: Vec<String>,
495
496 pub connected_at: Instant,
498
499 pub last_seen: Instant,
501
502 pub status: ConnectionStatus,
504
505 pub protocols: Vec<String>,
507
508 pub heartbeat_count: u64,
510}
511
/// Connection state of a peer.
///
/// `Eq` is derived alongside `PartialEq`: every variant (including the
/// `String` payload of `Failed`) has total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// A connection attempt is in progress.
    Connecting,
    /// The connection is established.
    Connected,
    /// The connection is being torn down.
    Disconnecting,
    /// There is no connection.
    Disconnected,
    /// The connection failed; the payload describes why.
    Failed(String),
}
526
527#[derive(Debug, Clone)]
529pub enum NetworkEvent {
530 PeerConnected {
532 peer_id: PeerId,
534 addresses: Vec<String>,
536 },
537
538 PeerDisconnected {
540 peer_id: PeerId,
542 reason: String,
544 },
545
546 MessageReceived {
548 peer_id: PeerId,
550 protocol: String,
552 data: Vec<u8>,
554 },
555
556 ConnectionFailed {
558 peer_id: Option<PeerId>,
560 address: String,
562 error: String,
564 },
565
566 DHTRecordStored {
568 key: Vec<u8>,
570 value: Vec<u8>,
572 },
573
574 DHTRecordRetrieved {
576 key: Vec<u8>,
578 value: Option<Vec<u8>>,
580 },
581}
582
583#[derive(Debug, Clone)]
588pub enum P2PEvent {
589 Message {
591 topic: String,
593 source: PeerId,
595 data: Vec<u8>,
597 },
598 PeerConnected(PeerId),
600 PeerDisconnected(PeerId),
602}
603
604pub struct P2PNode {
614 config: NodeConfig,
616
617 peer_id: PeerId,
619
620 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
622
623 event_tx: broadcast::Sender<P2PEvent>,
625
626 listen_addrs: RwLock<Vec<std::net::SocketAddr>>,
628
629 start_time: Instant,
631
632 running: RwLock<bool>,
634
635 dht: Option<Arc<RwLock<DHT>>>,
637
638 resource_manager: Option<Arc<ResourceManager>>,
640
641 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
643
644 dual_node: Arc<DualStackNetworkNode>,
646
647 rate_limiter: Arc<RateLimiter>,
649
650 active_connections: Arc<RwLock<HashSet<PeerId>>>,
653
654 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
656
657 #[allow(dead_code)]
659 connection_monitor_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
660
661 #[allow(dead_code)]
663 keepalive_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
664
665 #[allow(dead_code)]
667 periodic_tasks_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
668
669 shutdown: Arc<AtomicBool>,
671
672 recv_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
674
675 listener_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
677
678 #[allow(dead_code)]
680 geo_provider: Arc<BgpGeoProvider>,
681
682 is_bootstrapped: Arc<AtomicBool>,
684
685 #[cfg(feature = "adaptive-ml")]
691 trust_engine: Option<Arc<EigenTrustEngine>>,
692}
693
/// Rewrites a wildcard ("unspecified") address to the loopback address of the
/// same family, keeping the port.
///
/// `0.0.0.0:p` becomes `127.0.0.1:p`, `[::]:p` becomes `[::1]:p`; every other
/// address is returned unchanged.
fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    match addr.ip() {
        IpAddr::V4(ip) if ip.is_unspecified() => {
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), addr.port())
        }
        IpAddr::V6(ip) if ip.is_unspecified() => {
            SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), addr.port())
        }
        _ => addr,
    }
}
724
725impl P2PNode {
726 pub fn new_for_tests() -> Result<Self> {
728 let (event_tx, _) = broadcast::channel(16);
729 Ok(Self {
730 config: NodeConfig::default(),
731 peer_id: "test_peer".to_string(),
732 peers: Arc::new(RwLock::new(HashMap::new())),
733 event_tx,
734 listen_addrs: RwLock::new(Vec::new()),
735 start_time: Instant::now(),
736 running: RwLock::new(false),
737 dht: None,
738 resource_manager: None,
739 bootstrap_manager: None,
740 dual_node: {
741 let v6: Option<std::net::SocketAddr> = "[::1]:0"
743 .parse()
744 .ok()
745 .or(Some(std::net::SocketAddr::from(([0, 0, 0, 0], 0))));
746 let v4: Option<std::net::SocketAddr> = "127.0.0.1:0".parse().ok();
747 let handle = tokio::runtime::Handle::current();
748 let dual_attempt = handle.block_on(
749 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(v6, v4),
750 );
751 let dual = match dual_attempt {
752 Ok(d) => d,
753 Err(_e1) => {
754 let fallback = handle.block_on(
756 crate::transport::ant_quic_adapter::DualStackNetworkNode::new(
757 None,
758 "127.0.0.1:0".parse().ok(),
759 ),
760 );
761 match fallback {
762 Ok(d) => d,
763 Err(e2) => {
764 return Err(P2PError::Network(NetworkError::BindError(
765 format!("Failed to create dual-stack network node: {}", e2)
766 .into(),
767 )));
768 }
769 }
770 }
771 };
772 Arc::new(dual)
773 },
774 rate_limiter: Arc::new(RateLimiter::new(RateLimitConfig {
775 max_requests: 100,
776 burst_size: 100,
777 window: std::time::Duration::from_secs(1),
778 ..Default::default()
779 })),
780 active_connections: Arc::new(RwLock::new(HashSet::new())),
781 connection_monitor_handle: Arc::new(RwLock::new(None)),
782 keepalive_handle: Arc::new(RwLock::new(None)),
783 periodic_tasks_handle: Arc::new(RwLock::new(None)),
784 shutdown: Arc::new(AtomicBool::new(false)),
785 recv_handles: Arc::new(RwLock::new(Vec::new())),
786 listener_handle: Arc::new(RwLock::new(None)),
787 geo_provider: Arc::new(BgpGeoProvider::new()),
788 security_dashboard: None,
789 is_bootstrapped: Arc::new(AtomicBool::new(false)),
790 #[cfg(feature = "adaptive-ml")]
791 trust_engine: None,
792 })
793 }
794 pub async fn new(config: NodeConfig) -> Result<Self> {
796 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
797 let uuid_str = uuid::Uuid::new_v4().to_string();
800 format!("peer_{}", &uuid_str[..8])
801 });
802
803 let (event_tx, _) = broadcast::channel(1000);
804
805 {
808 use blake3::Hasher;
809 let mut hasher = Hasher::new();
810 hasher.update(peer_id.as_bytes());
811 let digest = hasher.finalize();
812 let mut nid = [0u8; 32];
813 nid.copy_from_slice(digest.as_bytes());
814 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
815 crate::identity::node_identity::NodeId::from_bytes(nid),
816 ));
817 }
820
821 let (dht, security_dashboard) = if true {
823 let _dht_config = crate::dht::DHTConfig {
825 replication_factor: config.dht_config.k_value,
826 bucket_size: config.dht_config.k_value,
827 alpha: config.dht_config.alpha_value,
828 record_ttl: config.dht_config.record_ttl,
829 bucket_refresh_interval: config.dht_config.refresh_interval,
830 republish_interval: config.dht_config.refresh_interval,
831 max_distance: 160,
832 };
833 let peer_bytes = peer_id.as_bytes();
835 let mut node_id_bytes = [0u8; 32];
836 let len = peer_bytes.len().min(32);
837 node_id_bytes[..len].copy_from_slice(&peer_bytes[..len]);
838 let node_id = crate::dht::core_engine::NodeId::from_bytes(node_id_bytes);
839 let dht_instance = DHT::new(node_id).map_err(|e| {
840 crate::error::P2PError::Dht(crate::error::DhtError::StoreFailed(
841 e.to_string().into(),
842 ))
843 })?;
844 dht_instance.start_maintenance_tasks();
845
846 let security_metrics = dht_instance.security_metrics();
848 let dashboard = crate::dht::metrics::SecurityDashboard::new(
849 security_metrics,
850 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
851 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
852 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
853 );
854
855 (
856 Some(Arc::new(RwLock::new(dht_instance))),
857 Some(Arc::new(dashboard)),
858 )
859 } else {
860 (None, None)
861 };
862
863 let resource_manager = config
867 .production_config
868 .clone()
869 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
870
871 let diversity_config = config.diversity_config.clone().unwrap_or_default();
873 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
874 match BootstrapManager::with_full_config(
875 cache_config.clone(),
876 crate::rate_limit::JoinRateLimiterConfig::default(),
877 diversity_config.clone(),
878 )
879 .await
880 {
881 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
882 Err(e) => {
883 warn!(
884 "Failed to initialize bootstrap manager: {}, continuing without cache",
885 e
886 );
887 None
888 }
889 }
890 } else {
891 match BootstrapManager::with_full_config(
892 crate::bootstrap::CacheConfig::default(),
893 crate::rate_limit::JoinRateLimiterConfig::default(),
894 diversity_config,
895 )
896 .await
897 {
898 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
899 Err(e) => {
900 warn!(
901 "Failed to initialize bootstrap manager: {}, continuing without cache",
902 e
903 );
904 None
905 }
906 }
907 };
908
909 #[cfg(feature = "adaptive-ml")]
912 let trust_engine = {
913 use crate::adaptive::NodeId;
914 use std::collections::HashSet;
915
916 let mut pre_trusted = HashSet::new();
923 for bootstrap_peer in &config.bootstrap_peers_str {
924 let hash = blake3::hash(bootstrap_peer.as_bytes());
926 let mut node_id_bytes = [0u8; 32];
927 node_id_bytes.copy_from_slice(hash.as_bytes());
928 pre_trusted.insert(NodeId::from_bytes(node_id_bytes));
929 }
930
931 let engine = Arc::new(EigenTrustEngine::new(pre_trusted));
932 engine.clone().start_background_updates();
934 Some(engine)
935 };
936
937 let (v6_opt, v4_opt) = {
940 let port = config.listen_addr.port();
941 let ip = config.listen_addr.ip();
942
943 let v4_addr = if ip.is_ipv4() {
944 Some(std::net::SocketAddr::new(ip, port))
945 } else {
946 Some(std::net::SocketAddr::new(
949 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
950 port,
951 ))
952 };
953
954 let v6_addr = if config.enable_ipv6 {
955 if ip.is_ipv6() {
956 Some(std::net::SocketAddr::new(ip, port))
957 } else {
958 Some(std::net::SocketAddr::new(
959 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
960 port,
961 ))
962 }
963 } else {
964 None
965 };
966 (v6_addr, v4_addr)
967 };
968
969 let dual_node = Arc::new(
970 DualStackNetworkNode::new_with_max_connections(v6_opt, v4_opt, config.max_connections)
971 .await
972 .map_err(|e| {
973 P2PError::Transport(crate::error::TransportError::SetupFailed(
974 format!("Failed to create dual-stack network nodes: {}", e).into(),
975 ))
976 })?,
977 );
978
979 let rate_limiter = Arc::new(RateLimiter::new(
981 crate::validation::RateLimitConfig::default(),
982 ));
983
984 let active_connections = Arc::new(RwLock::new(HashSet::new()));
986
987 let geo_provider = Arc::new(BgpGeoProvider::new());
989
990 let peers = Arc::new(RwLock::new(HashMap::new()));
992
993 let connection_event_rx = dual_node.subscribe_connection_events();
997
998 let connection_monitor_handle = {
999 let active_conns = Arc::clone(&active_connections);
1000 let peers_map = Arc::clone(&peers);
1001 let event_tx_clone = event_tx.clone();
1002 let dual_node_clone = Arc::clone(&dual_node);
1003 let geo_provider_clone = Arc::clone(&geo_provider);
1004 let peer_id_clone = peer_id.clone();
1005
1006 let handle = tokio::spawn(async move {
1007 Self::connection_lifecycle_monitor_with_rx(
1008 dual_node_clone,
1009 connection_event_rx,
1010 active_conns,
1011 peers_map,
1012 event_tx_clone,
1013 geo_provider_clone,
1014 peer_id_clone,
1015 )
1016 .await;
1017 });
1018
1019 Arc::new(RwLock::new(Some(handle)))
1020 };
1021
1022 let shutdown = Arc::new(AtomicBool::new(false));
1024 let keepalive_handle = {
1025 let active_conns = Arc::clone(&active_connections);
1026 let dual_node_clone = Arc::clone(&dual_node);
1027 let shutdown_clone = Arc::clone(&shutdown);
1028
1029 let handle = tokio::spawn(async move {
1030 Self::keepalive_task(active_conns, dual_node_clone, shutdown_clone).await;
1031 });
1032
1033 Arc::new(RwLock::new(Some(handle)))
1034 };
1035
1036 let periodic_tasks_handle = {
1038 let peers_clone = Arc::clone(&peers);
1039 let active_conns_clone = Arc::clone(&active_connections);
1040 let event_tx_clone = event_tx.clone();
1041 let stale_threshold = config.stale_peer_threshold;
1042 let shutdown_clone = Arc::clone(&shutdown);
1043
1044 let handle = tokio::spawn(async move {
1045 Self::periodic_maintenance_task(
1046 peers_clone,
1047 active_conns_clone,
1048 event_tx_clone,
1049 stale_threshold,
1050 shutdown_clone,
1051 )
1052 .await;
1053 });
1054
1055 Arc::new(RwLock::new(Some(handle)))
1056 };
1057
1058 let node = Self {
1059 config,
1060 peer_id,
1061 peers,
1062 event_tx,
1063 listen_addrs: RwLock::new(Vec::new()),
1064 start_time: Instant::now(),
1065 running: RwLock::new(false),
1066 dht,
1067 resource_manager,
1068 bootstrap_manager,
1069 dual_node,
1070 rate_limiter,
1071 active_connections,
1072 security_dashboard,
1073 connection_monitor_handle,
1074 keepalive_handle,
1075 periodic_tasks_handle,
1076 shutdown,
1077 recv_handles: Arc::new(RwLock::new(Vec::new())),
1078 listener_handle: Arc::new(RwLock::new(None)),
1079 geo_provider,
1080 is_bootstrapped: Arc::new(AtomicBool::new(false)),
1081 #[cfg(feature = "adaptive-ml")]
1082 trust_engine,
1083 };
1084 info!(
1085 "Created P2P node with peer ID: {} (call start() to begin networking)",
1086 node.peer_id
1087 );
1088
1089 Ok(node)
1090 }
1091
1092 pub fn builder() -> NodeBuilder {
1094 NodeBuilder::new()
1095 }
1096
1097 pub fn peer_id(&self) -> &PeerId {
1099 &self.peer_id
1100 }
1101
1102 pub fn transport_peer_id(&self) -> Option<String> {
1109 if let Some(ref v4) = self.dual_node.v4 {
1110 return Some(ant_peer_id_to_string(&v4.our_peer_id()));
1111 }
1112 if let Some(ref v6) = self.dual_node.v6 {
1113 return Some(ant_peer_id_to_string(&v6.our_peer_id()));
1114 }
1115 None
1116 }
1117
1118 pub fn local_addr(&self) -> Option<String> {
1119 self.listen_addrs
1120 .try_read()
1121 .ok()
1122 .and_then(|addrs| addrs.first().map(|a| a.to_string()))
1123 }
1124
1125 pub fn is_bootstrapped(&self) -> bool {
1130 self.is_bootstrapped.load(Ordering::SeqCst)
1131 }
1132
1133 pub async fn re_bootstrap(&self) -> Result<()> {
1138 self.is_bootstrapped.store(false, Ordering::SeqCst);
1139 self.connect_bootstrap_peers().await
1140 }
1141
1142 #[cfg(feature = "adaptive-ml")]
1166 pub fn trust_engine(&self) -> Option<Arc<EigenTrustEngine>> {
1167 self.trust_engine.clone()
1168 }
1169
1170 #[cfg(feature = "adaptive-ml")]
1176 fn peer_id_to_trust_node_id(peer_id: &str) -> AdaptiveNodeId {
1177 if let Ok(bytes) = hex::decode(peer_id)
1178 && bytes.len() == 32
1179 {
1180 let mut arr = [0u8; 32];
1181 arr.copy_from_slice(&bytes);
1182 return AdaptiveNodeId::from_bytes(arr);
1183 }
1184 let hash = blake3::hash(peer_id.as_bytes());
1186 AdaptiveNodeId::from_bytes(*hash.as_bytes())
1187 }
1188
1189 #[cfg(feature = "adaptive-ml")]
1209 pub async fn report_peer_success(&self, peer_id: &str) -> Result<()> {
1210 if let Some(ref engine) = self.trust_engine {
1211 let node_id = Self::peer_id_to_trust_node_id(peer_id);
1212
1213 engine
1214 .update_node_stats(&node_id, NodeStatisticsUpdate::CorrectResponse)
1215 .await;
1216 Ok(())
1217 } else {
1218 Ok(())
1220 }
1221 }
1222
1223 #[cfg(feature = "adaptive-ml")]
1244 pub async fn report_peer_failure(&self, peer_id: &str) -> Result<()> {
1245 if let Some(ref engine) = self.trust_engine {
1246 let node_id = Self::peer_id_to_trust_node_id(peer_id);
1247
1248 engine
1249 .update_node_stats(&node_id, NodeStatisticsUpdate::FailedResponse)
1250 .await;
1251 Ok(())
1252 } else {
1253 Ok(())
1255 }
1256 }
1257
1258 #[cfg(feature = "adaptive-ml")]
1278 pub fn peer_trust(&self, peer_id: &str) -> f64 {
1279 if let Some(ref engine) = self.trust_engine {
1280 let node_id = Self::peer_id_to_trust_node_id(peer_id);
1281
1282 use crate::adaptive::TrustProvider;
1283 engine.get_trust(&node_id)
1284 } else {
1285 0.5
1287 }
1288 }
1289
1290 pub async fn subscribe(&self, topic: &str) -> Result<()> {
1291 info!("Subscribed to topic: {}", topic);
1294 Ok(())
1295 }
1296
1297 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1298 info!(
1299 "Publishing message to topic: {} ({} bytes)",
1300 topic,
1301 data.len()
1302 );
1303
1304 let peer_list: Vec<PeerId> = {
1306 let peers_guard = self.peers.read().await;
1307 peers_guard.keys().cloned().collect()
1308 };
1309
1310 if peer_list.is_empty() {
1311 debug!("No peers connected, message will only be sent to local subscribers");
1312 } else {
1313 let mut send_count = 0;
1315 for peer_id in &peer_list {
1316 match self.send_message(peer_id, topic, data.to_vec()).await {
1317 Ok(_) => {
1318 send_count += 1;
1319 debug!("Sent message to peer: {}", peer_id);
1320 }
1321 Err(e) => {
1322 warn!("Failed to send message to peer {}: {}", peer_id, e);
1323 }
1324 }
1325 }
1326 info!(
1327 "Published message to {}/{} connected peers",
1328 send_count,
1329 peer_list.len()
1330 );
1331 }
1332
1333 self.send_event(P2PEvent::Message {
1335 topic: topic.to_string(),
1336 source: self.peer_id.clone(),
1337 data: data.to_vec(),
1338 });
1339
1340 Ok(())
1341 }
1342
1343 pub fn config(&self) -> &NodeConfig {
1345 &self.config
1346 }
1347
1348 pub async fn start(&self) -> Result<()> {
1350 info!("Starting P2P node...");
1351
1352 if let Some(ref resource_manager) = self.resource_manager {
1354 resource_manager.start().await.map_err(|e| {
1355 P2PError::Network(crate::error::NetworkError::ProtocolError(
1356 format!("Failed to start resource manager: {e}").into(),
1357 ))
1358 })?;
1359 info!("Production resource manager started");
1360 }
1361
1362 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1364 let mut manager = bootstrap_manager.write().await;
1365 manager.start_background_tasks().await.map_err(|e| {
1366 P2PError::Network(crate::error::NetworkError::ProtocolError(
1367 format!("Failed to start bootstrap manager: {e}").into(),
1368 ))
1369 })?;
1370 info!("Bootstrap cache manager started");
1371 }
1372
1373 *self.running.write().await = true;
1375
1376 self.start_network_listeners().await?;
1378
1379 self.start_connection_monitor().await;
1381
1382 let listen_addrs = self.listen_addrs.read().await;
1384 info!("P2P node started on addresses: {:?}", *listen_addrs);
1385
1386 self.connect_bootstrap_peers().await?;
1392
1393 Ok(())
1394 }
1395
1396 async fn start_network_listeners(&self) -> Result<()> {
1398 info!("Starting dual-stack listeners (ant-quic)...");
1399 let addrs = self.dual_node.local_addrs().await.map_err(|e| {
1401 P2PError::Transport(crate::error::TransportError::SetupFailed(
1402 format!("Failed to get local addresses: {}", e).into(),
1403 ))
1404 })?;
1405 {
1406 let mut la = self.listen_addrs.write().await;
1407 *la = addrs.clone();
1408 }
1409
1410 let event_tx = self.event_tx.clone();
1414 let peers = self.peers.clone();
1415 let active_connections = self.active_connections.clone();
1416 let rate_limiter = self.rate_limiter.clone();
1417 let dual = self.dual_node.clone();
1418 let shutdown = Arc::clone(&self.shutdown);
1419 let handle = tokio::spawn(async move {
1420 loop {
1421 if shutdown.load(Ordering::Relaxed) {
1422 break;
1423 }
1424 match dual.accept_any().await {
1425 Ok((ant_peer_id, remote_sock)) => {
1426 if let Err(e) = rate_limiter.check_ip(&remote_sock.ip()) {
1428 warn!(
1429 "Rate-limited incoming connection from {}: {}",
1430 remote_sock, e
1431 );
1432 continue;
1433 }
1434
1435 let peer_id =
1436 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id);
1437 let remote_addr = NetworkAddress::from(remote_sock);
1438 broadcast_event(&event_tx, P2PEvent::PeerConnected(peer_id.clone()));
1439 register_new_peer(&peers, &peer_id, &remote_addr).await;
1440 active_connections.write().await.insert(peer_id);
1441 }
1442 Err(e) => {
1443 warn!("Accept failed: {}", e);
1444 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1445 }
1446 }
1447 }
1448 });
1449 *self.listener_handle.write().await = Some(handle);
1450
1451 self.start_message_receiving_system().await?;
1456
1457 info!("Dual-stack listeners active on: {:?}", addrs);
1458 Ok(())
1459 }
1460
1461 async fn start_message_receiving_system(&self) -> Result<()> {
1464 info!("Starting message receiving system");
1465
1466 let (tx, mut rx) = tokio::sync::mpsc::channel(MESSAGE_RECV_CHANNEL_CAPACITY);
1467 let shutdown = Arc::clone(&self.shutdown);
1468
1469 let mut handles = Vec::new();
1470
1471 if let Some(v6) = self.dual_node.v6.as_ref() {
1473 handles.push(v6.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
1474 }
1475 if let Some(v4) = self.dual_node.v4.as_ref() {
1476 handles.push(v4.spawn_recv_task(tx.clone(), Arc::clone(&shutdown)));
1477 }
1478 drop(tx); let event_tx = self.event_tx.clone();
1481 handles.push(tokio::spawn(async move {
1482 info!("Message receive loop started");
1483 while let Some((peer_id, bytes)) = rx.recv().await {
1484 let transport_peer_id =
1485 crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id);
1486 trace!(
1487 "Received {} bytes from peer {}",
1488 bytes.len(),
1489 transport_peer_id
1490 );
1491
1492 if bytes == KEEPALIVE_PAYLOAD {
1493 trace!("Received keepalive from {}", transport_peer_id);
1494 continue;
1495 }
1496
1497 match parse_protocol_message(&bytes, &transport_peer_id) {
1498 Some(event) => broadcast_event(&event_tx, event),
1499 None => {
1500 warn!("Failed to parse protocol message ({} bytes)", bytes.len());
1501 }
1502 }
1503 }
1504 info!("Message receive loop ended — channel closed");
1505 }));
1506
1507 *self.recv_handles.write().await = handles;
1508
1509 Ok(())
1510 }
1511
1512 pub async fn run(&self) -> Result<()> {
1514 if !*self.running.read().await {
1515 self.start().await?;
1516 }
1517
1518 info!("P2P node running...");
1519
1520 loop {
1522 if !*self.running.read().await {
1523 break;
1524 }
1525
1526 self.periodic_tasks().await?;
1528
1529 tokio::time::sleep(Duration::from_millis(100)).await;
1531 }
1532
1533 info!("P2P node stopped");
1534 Ok(())
1535 }
1536
1537 pub async fn stop(&self) -> Result<()> {
1539 info!("Stopping P2P node...");
1540
1541 self.shutdown.store(true, Ordering::Relaxed);
1543
1544 *self.running.write().await = false;
1546
1547 self.dual_node.shutdown_endpoints().await;
1554
1555 let handles: Vec<_> = self.recv_handles.write().await.drain(..).collect();
1557 for handle in handles {
1558 match handle.await {
1559 Ok(()) => {}
1560 Err(e) if e.is_cancelled() => {
1561 tracing::debug!("Recv task was cancelled during shutdown");
1562 }
1563 Err(e) if e.is_panic() => {
1564 tracing::error!("Recv task panicked during shutdown: {:?}", e);
1565 }
1566 Err(e) => {
1567 tracing::warn!("Recv task join error during shutdown: {:?}", e);
1568 }
1569 }
1570 }
1571
1572 if let Some(handle) = self.listener_handle.write().await.take() {
1574 match handle.await {
1575 Ok(()) => {}
1576 Err(e) if e.is_cancelled() => {
1577 tracing::debug!("Listener task was cancelled during shutdown");
1578 }
1579 Err(e) if e.is_panic() => {
1580 tracing::error!("Listener task panicked during shutdown: {:?}", e);
1581 }
1582 Err(e) => {
1583 tracing::warn!("Listener task join error during shutdown: {:?}", e);
1584 }
1585 }
1586 }
1587
1588 self.disconnect_all_peers().await?;
1590
1591 if let Some(ref resource_manager) = self.resource_manager {
1593 resource_manager.shutdown().await.map_err(|e| {
1594 P2PError::Network(crate::error::NetworkError::ProtocolError(
1595 format!("Failed to shutdown resource manager: {e}").into(),
1596 ))
1597 })?;
1598 info!("Production resource manager stopped");
1599 }
1600
1601 info!("P2P node stopped");
1602 Ok(())
1603 }
1604
1605 pub async fn shutdown(&self) -> Result<()> {
1607 self.stop().await
1608 }
1609
1610 pub async fn is_running(&self) -> bool {
1612 *self.running.read().await
1613 }
1614
1615 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1617 self.listen_addrs.read().await.clone()
1618 }
1619
1620 pub async fn connected_peers(&self) -> Vec<PeerId> {
1622 self.active_connections
1625 .read()
1626 .await
1627 .iter()
1628 .cloned()
1629 .collect()
1630 }
1631
1632 pub async fn peer_count(&self) -> usize {
1634 self.active_connections.read().await.len()
1635 }
1636
1637 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1639 self.peers.read().await.get(peer_id).cloned()
1640 }
1641
1642 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1654 let socket_addr: std::net::SocketAddr = addr.parse().ok()?;
1656
1657 let peers = self.peers.read().await;
1658
1659 for (peer_id, peer_info) in peers.iter() {
1661 for peer_addr in &peer_info.addresses {
1663 if let Ok(peer_socket) = peer_addr.parse::<std::net::SocketAddr>()
1664 && peer_socket == socket_addr
1665 {
1666 return Some(peer_id.clone());
1667 }
1668 }
1669 }
1670
1671 None
1672 }
1673
1674 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1680 let active = self.active_connections.read().await;
1681 let peers = self.peers.read().await;
1682
1683 active
1684 .iter()
1685 .map(|peer_id| {
1686 let addresses = peers
1687 .get(peer_id)
1688 .map(|info| info.addresses.clone())
1689 .unwrap_or_default();
1690 (peer_id.clone(), addresses)
1691 })
1692 .collect()
1693 }
1694
1695 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1707 self.active_connections.write().await.remove(peer_id);
1709 self.peers.write().await.remove(peer_id).is_some()
1711 }
1712
1713 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1726 self.peers.read().await.contains_key(peer_id)
1727 }
1728
1729 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1731 info!("Connecting to peer at: {}", address);
1732
1733 let _connection_guard = if let Some(ref resource_manager) = self.resource_manager {
1735 Some(resource_manager.acquire_connection().await?)
1736 } else {
1737 None
1738 };
1739
1740 let socket_addr: std::net::SocketAddr = address.parse().map_err(|e| {
1742 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1743 format!("{}: {}", address, e).into(),
1744 ))
1745 })?;
1746
1747 let normalized_addr = normalize_wildcard_to_loopback(socket_addr);
1750 if normalized_addr != socket_addr {
1751 info!(
1752 "Normalized wildcard address {} to loopback {}",
1753 socket_addr, normalized_addr
1754 );
1755 }
1756
1757 let addr_list = vec![normalized_addr];
1759 let peer_id = match tokio::time::timeout(
1760 self.config.connection_timeout,
1761 self.dual_node.connect_happy_eyeballs(&addr_list),
1762 )
1763 .await
1764 {
1765 Ok(Ok(peer)) => {
1766 let connected_peer_id = ant_peer_id_to_string(&peer);
1767 info!("Successfully connected to peer: {}", connected_peer_id);
1768
1769 if connected_peer_id == self.peer_id {
1771 warn!(
1772 "Detected self-connection to own address {} (peer_id: {}), rejecting",
1773 address, connected_peer_id
1774 );
1775 self.dual_node.disconnect_peer(&peer).await;
1778 return Err(P2PError::Network(
1779 crate::error::NetworkError::InvalidAddress(
1780 format!("Cannot connect to self ({})", address).into(),
1781 ),
1782 ));
1783 }
1784
1785 connected_peer_id
1786 }
1787 Ok(Err(e)) => {
1788 warn!("Failed to connect to peer at {}: {}", address, e);
1789 return Err(P2PError::Transport(
1790 crate::error::TransportError::ConnectionFailed {
1791 addr: normalized_addr,
1792 reason: e.to_string().into(),
1793 },
1794 ));
1795 }
1796 Err(_) => {
1797 warn!(
1798 "Timed out connecting to peer at {} after {:?}",
1799 address, self.config.connection_timeout
1800 );
1801 return Err(P2PError::Timeout(self.config.connection_timeout));
1802 }
1803 };
1804
1805 let peer_info = PeerInfo {
1807 peer_id: peer_id.clone(),
1808 addresses: vec![address.to_string()],
1809 connected_at: Instant::now(),
1810 last_seen: Instant::now(),
1811 status: ConnectionStatus::Connected,
1812 protocols: vec!["p2p-foundation/1.0".to_string()],
1813 heartbeat_count: 0,
1814 };
1815
1816 self.peers.write().await.insert(peer_id.clone(), peer_info);
1818
1819 self.active_connections
1822 .write()
1823 .await
1824 .insert(peer_id.clone());
1825
1826 if let Some(ref resource_manager) = self.resource_manager {
1828 resource_manager.record_bandwidth(0, 0); }
1830
1831 self.send_event(P2PEvent::PeerConnected(peer_id.clone()));
1833
1834 info!("Connected to peer: {}", peer_id);
1835 Ok(peer_id)
1836 }
1837
1838 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1844 info!("Disconnecting from peer: {}", peer_id);
1845
1846 self.dual_node.disconnect_peer_string(peer_id).await.ok();
1849
1850 self.active_connections.write().await.remove(peer_id);
1852
1853 if let Some(mut peer_info) = self.peers.write().await.remove(peer_id) {
1854 peer_info.status = ConnectionStatus::Disconnected;
1855
1856 let _ = self
1858 .event_tx
1859 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
1860
1861 info!("Disconnected from peer: {}", peer_id);
1862 }
1863
1864 Ok(())
1865 }
1866
1867 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1869 self.active_connections.read().await.contains(peer_id)
1870 }
1871
1872 pub async fn send_message(
1874 &self,
1875 peer_id: &PeerId,
1876 protocol: &str,
1877 data: Vec<u8>,
1878 ) -> Result<()> {
1879 debug!(
1880 "Sending message to peer {} on protocol {}",
1881 peer_id, protocol
1882 );
1883
1884 if let Some(ref resource_manager) = self.resource_manager
1886 && !resource_manager
1887 .check_rate_limit(peer_id, "message")
1888 .await?
1889 {
1890 return Err(P2PError::ResourceExhausted(
1891 format!("Rate limit exceeded for peer {}", peer_id).into(),
1892 ));
1893 }
1894
1895 if !self.peers.read().await.contains_key(peer_id) {
1897 return Err(P2PError::Network(crate::error::NetworkError::PeerNotFound(
1898 peer_id.to_string().into(),
1899 )));
1900 }
1901
1902 if !self.is_connection_active(peer_id).await {
1905 self.remove_peer(peer_id).await;
1907
1908 return Err(P2PError::Network(
1909 crate::error::NetworkError::ConnectionClosed {
1910 peer_id: peer_id.to_string().into(),
1911 },
1912 ));
1913 }
1914
1915 if let Some(ref resource_manager) = self.resource_manager {
1919 resource_manager.record_bandwidth(data.len() as u64, 0);
1920 }
1921
1922 let raw_data_len = data.len();
1924 let _message_data = self.create_protocol_message(protocol, data)?;
1925 info!(
1926 "Sending {} bytes to peer {} on protocol {} (raw data: {} bytes)",
1927 _message_data.len(),
1928 peer_id,
1929 protocol,
1930 raw_data_len
1931 );
1932
1933 let send_fut = self
1937 .dual_node
1938 .send_to_peer_string_optimized(peer_id, &_message_data);
1939 let result = tokio::time::timeout(self.config.connection_timeout, send_fut)
1940 .await
1941 .map_err(|_| {
1942 P2PError::Transport(crate::error::TransportError::StreamError(
1943 "Timed out sending message".into(),
1944 ))
1945 })?
1946 .map_err(|e| {
1947 P2PError::Transport(crate::error::TransportError::StreamError(
1948 e.to_string().into(),
1949 ))
1950 });
1951
1952 if result.is_ok() {
1953 info!(
1954 "Successfully sent {} bytes to peer {}",
1955 _message_data.len(),
1956 peer_id
1957 );
1958 } else {
1959 warn!("Failed to send message to peer {}", peer_id);
1960 }
1961
1962 result
1963 }
1964
1965 fn create_protocol_message(&self, protocol: &str, data: Vec<u8>) -> Result<Vec<u8>> {
1967 let timestamp = std::time::SystemTime::now()
1968 .duration_since(std::time::UNIX_EPOCH)
1969 .map_err(|e| {
1970 P2PError::Network(NetworkError::ProtocolError(
1971 format!("System time error: {}", e).into(),
1972 ))
1973 })?
1974 .as_secs();
1975
1976 let message = WireMessage {
1977 protocol: protocol.to_string(),
1978 data,
1979 from: self.peer_id.clone(),
1980 timestamp,
1981 };
1982
1983 postcard::to_stdvec(&message).map_err(|e| {
1984 P2PError::Transport(crate::error::TransportError::StreamError(
1985 format!("Failed to serialize message: {e}").into(),
1986 ))
1987 })
1988 }
1989
1990 }
1992
/// Oldest accepted message age in seconds (five minutes); messages whose
/// timestamp lies further in the past are rejected by `parse_protocol_message`.
const MAX_MESSAGE_AGE_SECS: u64 = 5 * 60;
/// How many seconds ahead of the local clock a message timestamp may be
/// before `parse_protocol_message` rejects it as future-dated.
const MAX_FUTURE_SECS: u64 = 30;
2010
2011fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
2013 if let Err(e) = tx.send(event) {
2014 tracing::trace!("Event broadcast has no receivers: {e}");
2015 }
2016}
2017
2018fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<P2PEvent> {
2019 let message: WireMessage = postcard::from_bytes(bytes).ok()?;
2020
2021 let now = std::time::SystemTime::now()
2023 .duration_since(std::time::UNIX_EPOCH)
2024 .map(|d| d.as_secs())
2025 .unwrap_or(0);
2026
2027 if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
2029 tracing::warn!(
2030 "Rejecting stale message from {} (timestamp {} is {} seconds old)",
2031 source,
2032 message.timestamp,
2033 now.saturating_sub(message.timestamp)
2034 );
2035 return None;
2036 }
2037
2038 if message.timestamp > now + MAX_FUTURE_SECS {
2040 tracing::warn!(
2041 "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
2042 source,
2043 message.timestamp,
2044 message.timestamp.saturating_sub(now)
2045 );
2046 return None;
2047 }
2048
2049 debug!(
2050 "Parsed P2PEvent::Message - topic: {}, source: {} (logical: {}), payload_len: {}",
2051 message.protocol,
2052 source,
2053 message.from,
2054 message.data.len()
2055 );
2056
2057 Some(P2PEvent::Message {
2058 topic: message.protocol,
2059 source: source.to_string(),
2060 data: message.data,
2061 })
2062}
2063
2064impl P2PNode {
2065 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
2067 self.event_tx.subscribe()
2068 }
2069
2070 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
2072 self.subscribe_events()
2073 }
2074
2075 fn send_event(&self, event: P2PEvent) {
2077 if let Err(e) = self.event_tx.send(event) {
2078 tracing::trace!("Event broadcast has no receivers: {e}");
2079 }
2080 }
2081
2082 pub fn uptime(&self) -> Duration {
2084 self.start_time.elapsed()
2085 }
2086
2087 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
2097 if let Some(ref resource_manager) = self.resource_manager {
2098 Ok(resource_manager.get_metrics().await)
2099 } else {
2100 Err(P2PError::Network(
2101 crate::error::NetworkError::ProtocolError(
2102 "Production resource manager not enabled".to_string().into(),
2103 ),
2104 ))
2105 }
2106 }
2107
2108 #[allow(clippy::too_many_arguments)]
2114 async fn connection_lifecycle_monitor_with_rx(
2115 dual_node: Arc<DualStackNetworkNode>,
2116 mut event_rx: broadcast::Receiver<crate::transport::ant_quic_adapter::ConnectionEvent>,
2117 active_connections: Arc<RwLock<HashSet<String>>>,
2118 peers: Arc<RwLock<HashMap<String, PeerInfo>>>,
2119 event_tx: broadcast::Sender<P2PEvent>,
2120 geo_provider: Arc<BgpGeoProvider>,
2121 _local_peer_id: String,
2122 ) {
2123 use crate::transport::ant_quic_adapter::ConnectionEvent;
2124
2125 info!("Connection lifecycle monitor started (pre-subscribed receiver)");
2126
2127 loop {
2128 match event_rx.recv().await {
2129 Ok(event) => {
2130 match event {
2131 ConnectionEvent::Established {
2132 peer_id,
2133 remote_address,
2134 } => {
2135 let peer_id_str = ant_peer_id_to_string(&peer_id);
2136 debug!(
2137 "Connection established: peer={}, addr={}",
2138 peer_id_str, remote_address
2139 );
2140
2141 let ip = remote_address.ip();
2143 let is_rejected = match ip {
2144 std::net::IpAddr::V4(v4) => {
2145 if let Some(asn) = geo_provider.lookup_ipv4_asn(v4) {
2146 geo_provider.is_hosting_asn(asn)
2147 || geo_provider.is_vpn_asn(asn)
2148 } else {
2149 false
2150 }
2151 }
2152 std::net::IpAddr::V6(v6) => {
2153 let info = geo_provider.lookup(v6);
2154 info.is_hosting_provider || info.is_vpn_provider
2155 }
2156 };
2157
2158 if is_rejected {
2159 info!(
2160 "Rejecting connection from {} ({}) due to GeoIP policy",
2161 peer_id_str, remote_address
2162 );
2163 dual_node.disconnect_peer(&peer_id).await;
2166 continue;
2167 }
2168
2169 active_connections.write().await.insert(peer_id_str.clone());
2171
2172 let mut peers_lock = peers.write().await;
2174 if let Some(peer_info) = peers_lock.get_mut(&peer_id_str) {
2175 peer_info.status = ConnectionStatus::Connected;
2176 peer_info.connected_at = Instant::now();
2177 } else {
2178 debug!("Registering new incoming peer: {}", peer_id_str);
2179 peers_lock.insert(
2180 peer_id_str.clone(),
2181 PeerInfo {
2182 peer_id: peer_id_str.clone(),
2183 addresses: vec![remote_address.to_string()],
2184 status: ConnectionStatus::Connected,
2185 last_seen: Instant::now(),
2186 connected_at: Instant::now(),
2187 protocols: Vec::new(),
2188 heartbeat_count: 0,
2189 },
2190 );
2191 }
2192
2193 broadcast_event(&event_tx, P2PEvent::PeerConnected(peer_id_str));
2195 }
2196 ConnectionEvent::Lost { peer_id, reason } => {
2197 let peer_id_str = ant_peer_id_to_string(&peer_id);
2198 debug!("Connection lost: peer={peer_id_str}, reason={reason}");
2199
2200 active_connections.write().await.remove(&peer_id_str);
2202
2203 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2205 peer_info.status = ConnectionStatus::Disconnected;
2206 peer_info.last_seen = Instant::now();
2207 }
2208
2209 broadcast_event(&event_tx, P2PEvent::PeerDisconnected(peer_id_str));
2211 }
2212 ConnectionEvent::Failed { peer_id, reason } => {
2213 let peer_id_str = ant_peer_id_to_string(&peer_id);
2214 debug!("Connection failed: peer={peer_id_str}, reason={reason}");
2215
2216 active_connections.write().await.remove(&peer_id_str);
2218
2219 if let Some(peer_info) = peers.write().await.get_mut(&peer_id_str) {
2221 peer_info.status = ConnectionStatus::Disconnected;
2222 peer_info.last_seen = Instant::now();
2223 }
2224
2225 broadcast_event(&event_tx, P2PEvent::PeerDisconnected(peer_id_str));
2227 }
2228 }
2229 }
2230 Err(broadcast::error::RecvError::Lagged(skipped)) => {
2231 warn!(
2232 "Connection event receiver lagged, skipped {} events",
2233 skipped
2234 );
2235 }
2236 Err(broadcast::error::RecvError::Closed) => {
2237 info!("Connection event channel closed, stopping lifecycle monitor");
2238 break;
2239 }
2240 }
2241 }
2242 }
2243
    /// No-op: only logs that the connection monitor is already running.
    async fn start_connection_monitor(&self) {
        debug!("Connection monitor already running from initialization");
    }
2251
2252 async fn keepalive_task(
2258 active_connections: Arc<RwLock<HashSet<String>>>,
2259 dual_node: Arc<DualStackNetworkNode>,
2260 shutdown: Arc<AtomicBool>,
2261 ) {
2262 const KEEPALIVE_INTERVAL_SECS: u64 = 15; let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS));
2265 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2266
2267 info!(
2268 "Keepalive task started (interval: {}s)",
2269 KEEPALIVE_INTERVAL_SECS
2270 );
2271
2272 loop {
2273 if shutdown.load(Ordering::Relaxed) {
2275 info!("Keepalive task shutting down");
2276 break;
2277 }
2278
2279 interval.tick().await;
2280
2281 let peers: Vec<String> = { active_connections.read().await.iter().cloned().collect() };
2283
2284 if peers.is_empty() {
2285 trace!("Keepalive: no active connections");
2286 continue;
2287 }
2288
2289 debug!("Sending keepalive to {} active connections", peers.len());
2290
2291 let futs: Vec<_> = peers
2294 .into_iter()
2295 .map(|peer_id| {
2296 let node = Arc::clone(&dual_node);
2297 async move {
2298 if let Err(e) = node
2299 .send_to_peer_string_optimized(&peer_id, KEEPALIVE_PAYLOAD)
2300 .await
2301 {
2302 debug!(
2303 "Failed to send keepalive to peer {}: {} (connection may have closed)",
2304 peer_id, e
2305 );
2306 } else {
2307 trace!("Keepalive sent to peer: {}", peer_id);
2308 }
2309 }
2310 })
2311 .collect();
2312 futures::future::join_all(futs).await;
2313 }
2314
2315 info!("Keepalive task stopped");
2316 }
2317
2318 async fn periodic_maintenance_task(
2325 peers: Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
2326 active_connections: Arc<RwLock<HashSet<PeerId>>>,
2327 event_tx: broadcast::Sender<P2PEvent>,
2328 stale_threshold: Duration,
2329 shutdown: Arc<AtomicBool>,
2330 ) {
2331 let cleanup_threshold = stale_threshold * 2;
2332 let mut interval = interval(Duration::from_millis(100));
2333 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
2334
2335 info!(
2336 "Periodic maintenance task started (stale threshold: {:?})",
2337 stale_threshold
2338 );
2339
2340 loop {
2341 interval.tick().await;
2342
2343 if shutdown.load(Ordering::SeqCst) {
2344 break;
2345 }
2346
2347 let now = Instant::now();
2348 let mut peers_to_remove: Vec<PeerId> = Vec::new();
2349 let mut peers_to_mark_disconnected: Vec<PeerId> = Vec::new();
2350
2351 {
2353 let peers_lock = peers.read().await;
2354 for (peer_id, peer_info) in peers_lock.iter() {
2355 let elapsed = now.duration_since(peer_info.last_seen);
2356
2357 match &peer_info.status {
2358 ConnectionStatus::Connected => {
2359 if elapsed > stale_threshold {
2360 debug!(
2361 peer_id = %peer_id,
2362 elapsed_secs = elapsed.as_secs(),
2363 "Peer went stale - marking for disconnection"
2364 );
2365 peers_to_mark_disconnected.push(peer_id.clone());
2366 }
2367 }
2368 ConnectionStatus::Disconnected | ConnectionStatus::Failed(_) => {
2369 if elapsed > cleanup_threshold {
2370 trace!(
2371 peer_id = %peer_id,
2372 elapsed_secs = elapsed.as_secs(),
2373 "Removing disconnected peer from tracking"
2374 );
2375 peers_to_remove.push(peer_id.clone());
2376 }
2377 }
2378 ConnectionStatus::Connecting | ConnectionStatus::Disconnecting => {
2379 if elapsed > stale_threshold {
2380 debug!(
2381 peer_id = %peer_id,
2382 status = ?peer_info.status,
2383 "Connection timed out in transitional state"
2384 );
2385 peers_to_mark_disconnected.push(peer_id.clone());
2386 }
2387 }
2388 }
2389 }
2390 }
2391
2392 if !peers_to_mark_disconnected.is_empty() {
2397 let mut peers_lock = peers.write().await;
2398 for peer_id in &peers_to_mark_disconnected {
2399 if let Some(peer_info) = peers_lock.get_mut(peer_id) {
2400 peer_info.status = ConnectionStatus::Disconnected;
2401 }
2402 }
2403 }
2404
2405 for peer_id in &peers_to_mark_disconnected {
2407 active_connections.write().await.remove(peer_id);
2408 broadcast_event(&event_tx, P2PEvent::PeerDisconnected(peer_id.clone()));
2409 info!(peer_id = %peer_id, "Stale peer disconnected");
2410 }
2411
2412 if !peers_to_remove.is_empty() {
2414 let mut peers_lock = peers.write().await;
2415 for peer_id in &peers_to_remove {
2416 peers_lock.remove(peer_id);
2417 trace!(peer_id = %peer_id, "Peer removed from tracking");
2418 }
2419 }
2420 }
2421
2422 info!("Periodic maintenance task stopped");
2423 }
2424
2425 pub async fn health_check(&self) -> Result<()> {
2427 if let Some(ref resource_manager) = self.resource_manager {
2428 resource_manager.health_check().await
2429 } else {
2430 let peer_count = self.peer_count().await;
2432 if peer_count > self.config.max_connections {
2433 Err(P2PError::Network(
2434 crate::error::NetworkError::ProtocolError(
2435 format!("Too many connections: {peer_count}").into(),
2436 ),
2437 ))
2438 } else {
2439 Ok(())
2440 }
2441 }
2442 }
2443
2444 pub fn production_config(&self) -> Option<&ProductionConfig> {
2446 self.config.production_config.as_ref()
2447 }
2448
2449 pub fn is_production_mode(&self) -> bool {
2451 self.resource_manager.is_some()
2452 }
2453
2454 pub fn dht(&self) -> Option<&Arc<RwLock<DHT>>> {
2456 self.dht.as_ref()
2457 }
2458
2459 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
2465 if let Some(ref dht) = self.dht {
2466 let mut dht_instance = dht.write().await;
2467 let dht_key = crate::dht::DhtKey::from_bytes(key);
2468 dht_instance.store(&dht_key, value).await.map_err(|e| {
2469 P2PError::Dht(crate::error::DhtError::StoreFailed(
2470 format!("{:?}: {e}", key).into(),
2471 ))
2472 })?;
2473
2474 Ok(())
2475 } else {
2476 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2477 "DHT not enabled".to_string().into(),
2478 )))
2479 }
2480 }
2481
2482 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
2488 if let Some(ref dht) = self.dht {
2489 let dht_instance = dht.read().await;
2490 let dht_key = crate::dht::DhtKey::from_bytes(key);
2491 let record_result = dht_instance.retrieve(&dht_key).await.map_err(|e| {
2492 P2PError::Dht(crate::error::DhtError::StoreFailed(
2493 format!("Retrieve failed: {e}").into(),
2494 ))
2495 })?;
2496
2497 Ok(record_result)
2498 } else {
2499 Err(P2PError::Dht(crate::error::DhtError::RoutingError(
2500 "DHT not enabled".to_string().into(),
2501 )))
2502 }
2503 }
2504
2505 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
2507 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2508 let manager = bootstrap_manager.write().await;
2509 let socket_addresses: Vec<std::net::SocketAddr> = addresses
2510 .iter()
2511 .filter_map(|addr| addr.parse().ok())
2512 .collect();
2513 let contact = ContactEntry::new(peer_id, socket_addresses);
2514 manager.add_contact(contact).await.map_err(|e| {
2515 P2PError::Network(crate::error::NetworkError::ProtocolError(
2516 format!("Failed to add peer to bootstrap cache: {e}").into(),
2517 ))
2518 })?;
2519 }
2520 Ok(())
2521 }
2522
2523 pub async fn update_peer_metrics(
2525 &self,
2526 peer_id: &PeerId,
2527 success: bool,
2528 latency_ms: Option<u64>,
2529 _error: Option<String>,
2530 ) -> Result<()> {
2531 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2532 let manager = bootstrap_manager.write().await;
2533
2534 let metrics = QualityMetrics {
2536 success_rate: if success { 1.0 } else { 0.0 },
2537 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
2538 quality_score: if success { 0.8 } else { 0.2 }, last_connection_attempt: chrono::Utc::now(),
2540 last_successful_connection: if success {
2541 chrono::Utc::now()
2542 } else {
2543 chrono::Utc::now() - chrono::Duration::hours(1)
2544 },
2545 uptime_score: 0.5,
2546 };
2547
2548 manager
2549 .update_contact_metrics(peer_id, metrics)
2550 .await
2551 .map_err(|e| {
2552 P2PError::Network(crate::error::NetworkError::ProtocolError(
2553 format!("Failed to update peer metrics: {e}").into(),
2554 ))
2555 })?;
2556 }
2557 Ok(())
2558 }
2559
2560 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
2562 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2563 let manager = bootstrap_manager.read().await;
2564 let stats = manager.get_stats().await.map_err(|e| {
2565 P2PError::Network(crate::error::NetworkError::ProtocolError(
2566 format!("Failed to get bootstrap stats: {e}").into(),
2567 ))
2568 })?;
2569 Ok(Some(stats))
2570 } else {
2571 Ok(None)
2572 }
2573 }
2574
2575 pub async fn cached_peer_count(&self) -> usize {
2577 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2578 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2579 {
2580 return stats.total_contacts;
2581 }
2582 0
2583 }
2584
2585 async fn discover_peers_from(&self, peer_id: &PeerId) -> Result<usize> {
2590 use crate::dht::network_integration::DhtMessage;
2591
2592 info!("Discovering peers from bootstrap peer: {}", peer_id);
2593
2594 let our_id_bytes = {
2597 use blake3::Hasher;
2598 let mut hasher = Hasher::new();
2599 hasher.update(self.peer_id.as_bytes());
2600 let digest = hasher.finalize();
2601 let mut bytes = [0u8; 32];
2602 bytes.copy_from_slice(digest.as_bytes());
2603 bytes
2604 };
2605 let target_key = crate::dht::DhtKey::from_bytes(our_id_bytes);
2606
2607 let find_node_msg = DhtMessage::FindNode {
2609 target: target_key,
2610 count: 20, };
2612
2613 let message_bytes = postcard::to_allocvec(&find_node_msg).map_err(|e| {
2615 P2PError::Network(NetworkError::ProtocolError(
2616 format!("Failed to serialize FIND_NODE message: {e}").into(),
2617 ))
2618 })?;
2619
2620 self.send_message(peer_id, "/dht/1.0.0", message_bytes)
2622 .await?;
2623
2624 info!("Sent FIND_NODE request to {} for peer discovery", peer_id);
2633
2634 Ok(0) }
2636
2637 async fn connect_bootstrap_peers(&self) -> Result<()> {
2639 let mut bootstrap_contacts = Vec::new();
2640 let mut used_cache = false;
2641 let mut seen_addresses = std::collections::HashSet::new();
2642
2643 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
2645 self.config.bootstrap_peers_str.clone()
2646 } else {
2647 self.config
2649 .bootstrap_peers
2650 .iter()
2651 .map(|addr| addr.to_string())
2652 .collect::<Vec<_>>()
2653 };
2654
2655 if !cli_bootstrap_peers.is_empty() {
2656 info!(
2657 "Using {} CLI-provided bootstrap peers (priority)",
2658 cli_bootstrap_peers.len()
2659 );
2660 for addr in &cli_bootstrap_peers {
2661 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
2662 seen_addresses.insert(socket_addr);
2663 let contact = ContactEntry::new(
2664 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
2665 vec![socket_addr],
2666 );
2667 bootstrap_contacts.push(contact);
2668 } else {
2669 warn!("Invalid bootstrap address format: {}", addr);
2670 }
2671 }
2672 }
2673
2674 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2677 let manager = bootstrap_manager.read().await;
2678 match manager.get_quic_bootstrap_peers(20).await {
2679 Ok(contacts) => {
2681 if !contacts.is_empty() {
2682 let mut added_from_cache = 0;
2683 for contact in contacts {
2684 let new_addresses: Vec<_> = contact
2686 .addresses
2687 .iter()
2688 .filter(|addr| !seen_addresses.contains(addr))
2689 .copied()
2690 .collect();
2691
2692 if !new_addresses.is_empty() {
2693 for addr in &new_addresses {
2694 seen_addresses.insert(*addr);
2695 }
2696 let mut contact = contact.clone();
2697 contact.addresses = new_addresses;
2698 bootstrap_contacts.push(contact);
2699 added_from_cache += 1;
2700 }
2701 }
2702 if added_from_cache > 0 {
2703 info!(
2704 "Added {} cached bootstrap peers (supplementing CLI peers)",
2705 added_from_cache
2706 );
2707 used_cache = true;
2708 }
2709 }
2710 }
2711 Err(e) => {
2712 warn!("Failed to get cached bootstrap peers: {}", e);
2713 }
2714 }
2715 }
2716
2717 if bootstrap_contacts.is_empty() {
2718 info!("No bootstrap peers configured and no cached peers available");
2719 return Ok(());
2720 }
2721
2722 let mut successful_connections = 0;
2724 let mut connected_peer_ids: Vec<PeerId> = Vec::new();
2725
2726 for contact in bootstrap_contacts {
2727 for addr in &contact.addresses {
2728 match self.connect_peer(&addr.to_string()).await {
2729 Ok(peer_id) => {
2730 info!("Connected to bootstrap peer: {} ({})", peer_id, addr);
2731 successful_connections += 1;
2732 connected_peer_ids.push(peer_id.clone());
2733
2734 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2736 let manager = bootstrap_manager.write().await;
2737 let mut updated_contact = contact.clone();
2738 updated_contact.peer_id = peer_id.clone();
2739 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
2742 warn!("Failed to update bootstrap cache: {}", e);
2743 }
2744 }
2745 break; }
2747 Err(e) => {
2748 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2749
2750 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2752 let manager = bootstrap_manager.write().await;
2753 let mut updated_contact = contact.clone();
2754 updated_contact.update_connection_result(
2755 false,
2756 None,
2757 Some(e.to_string()),
2758 );
2759
2760 if let Err(e) = manager.add_contact(updated_contact).await {
2761 warn!("Failed to update bootstrap cache: {}", e);
2762 }
2763 }
2764 }
2765 }
2766 }
2767 }
2768
2769 if successful_connections == 0 {
2770 if !used_cache {
2771 warn!("Failed to connect to any bootstrap peers");
2772 }
2773 return Ok(());
2776 }
2777
2778 info!(
2779 "Successfully connected to {} bootstrap peers",
2780 successful_connections
2781 );
2782
2783 for peer_id in &connected_peer_ids {
2786 match self.discover_peers_from(peer_id).await {
2787 Ok(_) => {
2788 info!("Peer discovery initiated from bootstrap peer: {}", peer_id);
2789 }
2790 Err(e) => {
2791 warn!("Failed to discover peers from {}: {}", peer_id, e);
2792 }
2793 }
2794 }
2795
2796 self.is_bootstrapped.store(true, Ordering::SeqCst);
2799 info!(
2800 "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
2801 successful_connections,
2802 connected_peer_ids.len()
2803 );
2804
2805 Ok(())
2806 }
2807
2808 async fn disconnect_all_peers(&self) -> Result<()> {
2810 let peer_ids: Vec<PeerId> = self.peers.read().await.keys().cloned().collect();
2811
2812 for peer_id in peer_ids {
2813 self.disconnect_peer(&peer_id).await?;
2814 }
2815
2816 Ok(())
2817 }
2818
2819 async fn periodic_tasks(&self) -> Result<()> {
2826 let stale_threshold = self.config.stale_peer_threshold;
2828 let cleanup_threshold = stale_threshold * 2;
2830
2831 let now = Instant::now();
2832 let mut peers_to_remove: Vec<PeerId> = Vec::new();
2833 let mut peers_to_mark_disconnected: Vec<PeerId> = Vec::new();
2834
2835 {
2837 let peers = self.peers.read().await;
2838 for (peer_id, peer_info) in peers.iter() {
2839 let elapsed = now.duration_since(peer_info.last_seen);
2840
2841 match &peer_info.status {
2842 ConnectionStatus::Connected => {
2843 if elapsed > stale_threshold {
2845 debug!(
2846 peer_id = %peer_id,
2847 elapsed_secs = elapsed.as_secs(),
2848 "Peer went stale - marking for disconnection"
2849 );
2850 peers_to_mark_disconnected.push(peer_id.clone());
2851 }
2852 }
2853 ConnectionStatus::Disconnected | ConnectionStatus::Failed(_) => {
2854 if elapsed > cleanup_threshold {
2856 trace!(
2857 peer_id = %peer_id,
2858 elapsed_secs = elapsed.as_secs(),
2859 "Removing disconnected peer from tracking"
2860 );
2861 peers_to_remove.push(peer_id.clone());
2862 }
2863 }
2864 ConnectionStatus::Connecting | ConnectionStatus::Disconnecting => {
2865 if elapsed > stale_threshold {
2867 debug!(
2868 peer_id = %peer_id,
2869 status = ?peer_info.status,
2870 "Connection timed out in transitional state"
2871 );
2872 peers_to_mark_disconnected.push(peer_id.clone());
2873 }
2874 }
2875 }
2876 }
2877 }
2878
2879 if !peers_to_mark_disconnected.is_empty() {
2881 let mut peers = self.peers.write().await;
2882 for peer_id in &peers_to_mark_disconnected {
2883 if let Some(peer_info) = peers.get_mut(peer_id) {
2884 peer_info.status = ConnectionStatus::Disconnected;
2885 }
2887 }
2888 }
2889
2890 for peer_id in &peers_to_mark_disconnected {
2892 self.active_connections.write().await.remove(peer_id);
2894
2895 let _ = self
2897 .event_tx
2898 .send(P2PEvent::PeerDisconnected(peer_id.clone()));
2899
2900 info!(
2901 peer_id = %peer_id,
2902 "Stale peer disconnected"
2903 );
2904 }
2905
2906 if !peers_to_remove.is_empty() {
2908 let mut peers = self.peers.write().await;
2909 for peer_id in &peers_to_remove {
2910 peers.remove(peer_id);
2911 trace!(peer_id = %peer_id, "Peer removed from tracking");
2912 }
2913 }
2914
2915 Ok(())
2916 }
2917}
2918
/// Minimal outbound-messaging interface: send a payload to a peer and report
/// the identity of the local node.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync {
    /// Sends `data` to `peer_id`, tagged with the `protocol` name.
    ///
    /// # Errors
    ///
    /// Returns an error when the implementation cannot hand the message off.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the peer ID of the local node.
    fn local_peer_id(&self) -> &PeerId;
}
2928
/// Cloneable [`NetworkSender`] that forwards outbound messages over an
/// unbounded channel instead of sending them directly.
#[derive(Clone)]
pub struct P2PNetworkSender {
    /// Local peer ID, returned by `local_peer_id`.
    peer_id: PeerId,
    /// Channel carrying `(destination peer, protocol, payload)` tuples.
    // NOTE(review): the receiving end is not visible here; presumably a node
    // task drains it and performs the actual send — confirm against the caller.
    send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
}
2936
2937impl P2PNetworkSender {
2938 pub fn new(
2939 peer_id: PeerId,
2940 send_tx: tokio::sync::mpsc::UnboundedSender<(PeerId, String, Vec<u8>)>,
2941 ) -> Self {
2942 Self { peer_id, send_tx }
2943 }
2944}
2945
2946#[async_trait::async_trait]
2948impl NetworkSender for P2PNetworkSender {
2949 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()> {
2951 self.send_tx
2952 .send((peer_id.clone(), protocol.to_string(), data))
2953 .map_err(|_| {
2954 P2PError::Network(crate::error::NetworkError::ProtocolError(
2955 "Failed to send message via channel".to_string().into(),
2956 ))
2957 })?;
2958 Ok(())
2959 }
2960
2961 fn local_peer_id(&self) -> &PeerId {
2963 &self.peer_id
2964 }
2965}
2966
/// Fluent builder that accumulates a [`NodeConfig`] and turns it into a
/// `P2PNode` via `build`.
pub struct NodeBuilder {
    /// Configuration being assembled; starts from `NodeConfig::default()`.
    config: NodeConfig,
}
2971
2972impl Default for NodeBuilder {
2973 fn default() -> Self {
2974 Self::new()
2975 }
2976}
2977
2978impl NodeBuilder {
2979 pub fn new() -> Self {
2981 Self {
2982 config: NodeConfig::default(),
2983 }
2984 }
2985
2986 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
2988 self.config.peer_id = Some(peer_id);
2989 self
2990 }
2991
2992 pub fn listen_on(mut self, addr: &str) -> Self {
2994 if let Ok(multiaddr) = addr.parse() {
2995 self.config.listen_addrs.push(multiaddr);
2996 }
2997 self
2998 }
2999
3000 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
3002 if let Ok(multiaddr) = addr.parse() {
3003 self.config.bootstrap_peers.push(multiaddr);
3004 }
3005 self.config.bootstrap_peers_str.push(addr.to_string());
3006 self
3007 }
3008
3009 pub fn with_ipv6(mut self, enable: bool) -> Self {
3011 self.config.enable_ipv6 = enable;
3012 self
3013 }
3014
3015 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
3019 self.config.connection_timeout = timeout;
3020 self
3021 }
3022
3023 pub fn with_max_connections(mut self, max: usize) -> Self {
3025 self.config.max_connections = max;
3026 self
3027 }
3028
3029 pub fn with_production_mode(mut self) -> Self {
3031 self.config.production_config = Some(ProductionConfig::default());
3032 self
3033 }
3034
3035 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
3037 self.config.production_config = Some(production_config);
3038 self
3039 }
3040
3041 pub fn with_diversity_config(
3043 mut self,
3044 diversity_config: crate::security::IPDiversityConfig,
3045 ) -> Self {
3046 self.config.diversity_config = Some(diversity_config);
3047 self
3048 }
3049
3050 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
3052 self.config.dht_config = dht_config;
3053 self
3054 }
3055
3056 pub fn with_default_dht(mut self) -> Self {
3058 self.config.dht_config = DHTConfig::default();
3059 self
3060 }
3061
3062 pub async fn build(self) -> Result<P2PNode> {
3064 P2PNode::new(self.config).await
3065 }
3066}
3067
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Builds a `BootstrapManager` from a `NodeConfig`, taking the diversity
    /// and cache configuration from the config (or their defaults).
    ///
    /// The cache directory is redirected to a fresh temporary directory. The
    /// `TempDir` guard is returned with the manager because dropping it
    /// deletes the directory; the caller must keep it alive for as long as
    /// the manager is in use.
    async fn build_bootstrap_manager_like_prod(
        config: &NodeConfig,
    ) -> (BootstrapManager, tempfile::TempDir) {
        let diversity_config = config.diversity_config.clone().unwrap_or_default();
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let mut cache_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        cache_config.cache_dir = temp_dir.path().to_path_buf();

        let manager = BootstrapManager::with_full_config(
            cache_config,
            crate::rate_limit::JoinRateLimiterConfig::default(),
            diversity_config,
        )
        .await
        .expect("bootstrap manager");

        (manager, temp_dir)
    }

    /// The diversity config set on `NodeConfig` must be the one the bootstrap
    /// manager ends up using.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        // Hold the guard so the cache directory outlives the manager.
        let (manager, _cache_dir) = build_bootstrap_manager_like_prod(&config).await;
        assert!(manager.diversity_config().is_relaxed());
        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
    }
}
3102
3103async fn register_new_peer(
3105 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
3106 peer_id: &PeerId,
3107 remote_addr: &NetworkAddress,
3108) {
3109 let mut peers_guard = peers.write().await;
3110 let peer_info = PeerInfo {
3111 peer_id: peer_id.clone(),
3112 addresses: vec![remote_addr.to_string()],
3113 connected_at: tokio::time::Instant::now(),
3114 last_seen: tokio::time::Instant::now(),
3115 status: ConnectionStatus::Connected,
3116 protocols: vec!["p2p-core/1.0.0".to_string()],
3117 heartbeat_count: 0,
3118 };
3119 peers_guard.insert(peer_id.clone(), peer_info);
3120}
3121
3122#[cfg(test)]
3123mod tests {
3124 use super::*;
3125 use std::time::Duration;
3127 use tokio::time::timeout;
3128
3129 fn create_test_node_config() -> NodeConfig {
3135 NodeConfig {
3136 peer_id: Some("test_peer_123".to_string()),
3137 listen_addrs: vec![
3138 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
3139 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
3140 ],
3141 listen_addr: std::net::SocketAddr::new(
3142 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3143 0,
3144 ),
3145 bootstrap_peers: vec![],
3146 bootstrap_peers_str: vec![],
3147 enable_ipv6: true,
3148
3149 connection_timeout: Duration::from_secs(2),
3150 keep_alive_interval: Duration::from_secs(30),
3151 max_connections: 100,
3152 max_incoming_connections: 50,
3153 dht_config: DHTConfig::default(),
3154 security_config: SecurityConfig::default(),
3155 production_config: None,
3156 bootstrap_cache_config: None,
3157 diversity_config: None,
3158 stale_peer_threshold: default_stale_peer_threshold(),
3159 }
3160 }
3161
3162 #[tokio::test]
3166 async fn test_node_config_default() {
3167 let config = NodeConfig::default();
3168
3169 assert!(config.peer_id.is_none());
3170 assert_eq!(config.listen_addrs.len(), 2);
3171 assert!(config.enable_ipv6);
3172 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
3174 assert_eq!(config.connection_timeout, Duration::from_secs(30));
3175 }
3176
3177 #[tokio::test]
3178 async fn test_dht_config_default() {
3179 let config = DHTConfig::default();
3180
3181 assert_eq!(config.k_value, 20);
3182 assert_eq!(config.alpha_value, 5);
3183 assert_eq!(config.record_ttl, Duration::from_secs(3600));
3184 assert_eq!(config.refresh_interval, Duration::from_secs(600));
3185 }
3186
3187 #[tokio::test]
3188 async fn test_security_config_default() {
3189 let config = SecurityConfig::default();
3190
3191 assert!(config.enable_noise);
3192 assert!(config.enable_tls);
3193 assert_eq!(config.trust_level, TrustLevel::Basic);
3194 }
3195
/// All `TrustLevel` variants can be constructed and compare as expected.
#[test]
fn test_trust_level_variants() {
    // Construction check: every variant must exist.
    let _all_levels = [TrustLevel::None, TrustLevel::Basic, TrustLevel::Full];

    // Each variant equals itself...
    assert_eq!(TrustLevel::None, TrustLevel::None);
    assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
    assert_eq!(TrustLevel::Full, TrustLevel::Full);
    // ...and distinct variants differ.
    assert_ne!(TrustLevel::None, TrustLevel::Basic);
}
3209
/// `ConnectionStatus` variants compare by value, and `Failed` carries its message.
#[test]
fn test_connection_status_variants() {
    let connecting = ConnectionStatus::Connecting;
    let connected = ConnectionStatus::Connected;
    let disconnecting = ConnectionStatus::Disconnecting;
    let disconnected = ConnectionStatus::Disconnected;
    let failed = ConnectionStatus::Failed("test error".to_string());

    assert_eq!(connecting, ConnectionStatus::Connecting);
    assert_eq!(connected, ConnectionStatus::Connected);
    assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
    assert_eq!(disconnected, ConnectionStatus::Disconnected);
    assert_ne!(connecting, connected);

    match failed {
        ConnectionStatus::Failed(msg) => assert_eq!(msg, "test error"),
        _ => panic!("Expected Failed status"),
    }
}
3230
3231 #[tokio::test]
3232 async fn test_node_creation() -> Result<()> {
3233 let config = create_test_node_config();
3234 let node = P2PNode::new(config).await?;
3235
3236 assert_eq!(node.peer_id(), "test_peer_123");
3237 assert!(!node.is_running().await);
3238 assert_eq!(node.peer_count().await, 0);
3239 assert!(node.connected_peers().await.is_empty());
3240
3241 Ok(())
3242 }
3243
3244 #[tokio::test]
3245 async fn test_node_creation_without_peer_id() -> Result<()> {
3246 let mut config = create_test_node_config();
3247 config.peer_id = None;
3248
3249 let node = P2PNode::new(config).await?;
3250
3251 assert!(node.peer_id().starts_with("peer_"));
3253 assert!(!node.is_running().await);
3254
3255 Ok(())
3256 }
3257
3258 #[tokio::test]
3259 async fn test_node_lifecycle() -> Result<()> {
3260 let config = create_test_node_config();
3261 let node = P2PNode::new(config).await?;
3262
3263 assert!(!node.is_running().await);
3265
3266 node.start().await?;
3268 assert!(node.is_running().await);
3269
3270 let listen_addrs = node.listen_addrs().await;
3272 assert!(
3273 !listen_addrs.is_empty(),
3274 "Expected at least one listening address"
3275 );
3276
3277 node.stop().await?;
3279 assert!(!node.is_running().await);
3280
3281 Ok(())
3282 }
3283
3284 #[tokio::test]
3285 async fn test_peer_connection() -> Result<()> {
3286 let config1 = create_test_node_config();
3287 let mut config2 = create_test_node_config();
3288 config2.peer_id = Some("test_peer_456".to_string());
3289
3290 let node1 = P2PNode::new(config1).await?;
3291 let node2 = P2PNode::new(config2).await?;
3292
3293 node1.start().await?;
3294 node2.start().await?;
3295
3296 let node2_addr = node2
3297 .listen_addrs()
3298 .await
3299 .into_iter()
3300 .find(|a| a.ip().is_ipv4())
3301 .ok_or_else(|| {
3302 P2PError::Network(crate::error::NetworkError::InvalidAddress(
3303 "Node 2 did not expose an IPv4 listen address".into(),
3304 ))
3305 })?;
3306
3307 let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
3309
3310 assert_eq!(node1.peer_count().await, 1);
3312
3313 let connected_peers = node1.connected_peers().await;
3315 assert_eq!(connected_peers.len(), 1);
3316 assert_eq!(connected_peers[0], peer_id);
3317
3318 let peer_info = node1.peer_info(&peer_id).await;
3320 assert!(peer_info.is_some());
3321 let info = peer_info.expect("Peer info should exist after adding peer");
3322 assert_eq!(info.peer_id, peer_id);
3323 assert_eq!(info.status, ConnectionStatus::Connected);
3324 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
3325
3326 node1.disconnect_peer(&peer_id).await?;
3328 assert_eq!(node1.peer_count().await, 0);
3329
3330 node1.stop().await?;
3331 node2.stop().await?;
3332
3333 Ok(())
3334 }
3335
3336 #[cfg_attr(target_os = "windows", ignore)]
3343 #[tokio::test]
3344 async fn test_event_subscription() -> Result<()> {
3345 let ipv4_localhost =
3350 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3351
3352 let mut config1 = create_test_node_config();
3353 config1.listen_addr = ipv4_localhost;
3354 config1.listen_addrs = vec![ipv4_localhost];
3355 config1.enable_ipv6 = false;
3356
3357 let mut config2 = create_test_node_config();
3358 config2.peer_id = Some("test_peer_456".to_string());
3359 config2.listen_addr = ipv4_localhost;
3360 config2.listen_addrs = vec![ipv4_localhost];
3361 config2.enable_ipv6 = false;
3362
3363 let node1 = P2PNode::new(config1).await?;
3364 let node2 = P2PNode::new(config2).await?;
3365
3366 node1.start().await?;
3367 node2.start().await?;
3368
3369 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
3372
3373 let mut events = node1.subscribe_events();
3374
3375 let node2_addr = node2.local_addr().ok_or_else(|| {
3377 P2PError::Network(crate::error::NetworkError::ProtocolError(
3378 "No listening address".to_string().into(),
3379 ))
3380 })?;
3381
3382 let mut peer_id = None;
3385 for attempt in 0..3 {
3386 if attempt > 0 {
3387 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
3388 }
3389 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
3390 Ok(Ok(id)) => {
3391 peer_id = Some(id);
3392 break;
3393 }
3394 Ok(Err(_)) | Err(_) => continue,
3395 }
3396 }
3397 let peer_id = peer_id.ok_or_else(|| {
3398 P2PError::Network(crate::error::NetworkError::ProtocolError(
3399 "Failed to connect after 3 attempts".to_string().into(),
3400 ))
3401 })?;
3402
3403 let event = timeout(Duration::from_secs(2), events.recv()).await;
3405 assert!(event.is_ok());
3406
3407 let event_result = event
3408 .expect("Should receive event")
3409 .expect("Event should not be error");
3410 match event_result {
3411 P2PEvent::PeerConnected(event_peer_id) => {
3412 assert_eq!(event_peer_id, peer_id);
3413 }
3414 _ => panic!("Expected PeerConnected event"),
3415 }
3416
3417 node1.disconnect_peer(&peer_id).await?;
3419
3420 let event = timeout(Duration::from_secs(2), events.recv()).await;
3422 assert!(event.is_ok());
3423
3424 let event_result = event
3425 .expect("Should receive event")
3426 .expect("Event should not be error");
3427 match event_result {
3428 P2PEvent::PeerDisconnected(event_peer_id) => {
3429 assert_eq!(event_peer_id, peer_id);
3430 }
3431 _ => panic!("Expected PeerDisconnected event"),
3432 }
3433
3434 node1.stop().await?;
3435 node2.stop().await?;
3436
3437 Ok(())
3438 }
3439
3440 #[cfg_attr(target_os = "windows", ignore)]
3442 #[tokio::test]
3443 async fn test_message_sending() -> Result<()> {
3444 let mut config1 = create_test_node_config();
3446 config1.listen_addr =
3447 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3448 let node1 = P2PNode::new(config1).await?;
3449 node1.start().await?;
3450
3451 let mut config2 = create_test_node_config();
3452 config2.listen_addr =
3453 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
3454 let node2 = P2PNode::new(config2).await?;
3455 node2.start().await?;
3456
3457 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3459
3460 let node2_addr = node2.local_addr().ok_or_else(|| {
3462 P2PError::Network(crate::error::NetworkError::ProtocolError(
3463 "No listening address".to_string().into(),
3464 ))
3465 })?;
3466
3467 let peer_id =
3469 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
3470 Ok(res) => res?,
3471 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3472 };
3473
3474 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
3476
3477 let message_data = b"Hello, peer!".to_vec();
3479 let result = match timeout(
3480 Duration::from_millis(500),
3481 node1.send_message(&peer_id, "test-protocol", message_data),
3482 )
3483 .await
3484 {
3485 Ok(res) => res,
3486 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
3487 };
3488 if let Err(e) = &result {
3491 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
3492 }
3493
3494 let non_existent_peer = "non_existent_peer".to_string();
3496 let result = node1
3497 .send_message(&non_existent_peer, "test-protocol", vec![])
3498 .await;
3499 assert!(result.is_err(), "Sending to non-existent peer should fail");
3500
3501 Ok(())
3502 }
3503
3504 #[tokio::test]
3505 async fn test_remote_mcp_operations() -> Result<()> {
3506 let config = create_test_node_config();
3507 let node = P2PNode::new(config).await?;
3508
3509 node.start().await?;
3511 node.stop().await?;
3512 Ok(())
3513 }
3514
3515 #[tokio::test]
3516 async fn test_health_check() -> Result<()> {
3517 let config = create_test_node_config();
3518 let node = P2PNode::new(config).await?;
3519
3520 let result = node.health_check().await;
3522 assert!(result.is_ok());
3523
3524 Ok(())
3529 }
3530
3531 #[tokio::test]
3532 async fn test_node_uptime() -> Result<()> {
3533 let config = create_test_node_config();
3534 let node = P2PNode::new(config).await?;
3535
3536 let uptime1 = node.uptime();
3537 assert!(uptime1 >= Duration::from_secs(0));
3538
3539 tokio::time::sleep(Duration::from_millis(10)).await;
3541
3542 let uptime2 = node.uptime();
3543 assert!(uptime2 > uptime1);
3544
3545 Ok(())
3546 }
3547
3548 #[tokio::test]
3549 async fn test_node_config_access() -> Result<()> {
3550 let config = create_test_node_config();
3551 let expected_peer_id = config.peer_id.clone();
3552 let node = P2PNode::new(config).await?;
3553
3554 let node_config = node.config();
3555 assert_eq!(node_config.peer_id, expected_peer_id);
3556 assert_eq!(node_config.max_connections, 100);
3557 Ok(())
3560 }
3561
3562 #[tokio::test]
3563 async fn test_mcp_server_access() -> Result<()> {
3564 let config = create_test_node_config();
3565 let _node = P2PNode::new(config).await?;
3566
3567 Ok(())
3569 }
3570
3571 #[tokio::test]
3572 async fn test_dht_access() -> Result<()> {
3573 let config = create_test_node_config();
3574 let node = P2PNode::new(config).await?;
3575
3576 assert!(node.dht().is_some());
3578
3579 Ok(())
3580 }
3581
3582 #[tokio::test]
3583 async fn test_node_builder() -> Result<()> {
3584 let builder = P2PNode::builder()
3586 .with_peer_id("builder_test_peer".to_string())
3587 .listen_on("/ip4/127.0.0.1/tcp/0")
3588 .listen_on("/ip6/::1/tcp/0")
3589 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
3591 .with_connection_timeout(Duration::from_secs(15))
3592 .with_max_connections(200);
3593
3594 let config = builder.config;
3596 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
3597 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
3600 assert_eq!(config.connection_timeout, Duration::from_secs(15));
3601 assert_eq!(config.max_connections, 200);
3602
3603 Ok(())
3604 }
3605
3606 #[tokio::test]
3607 async fn test_bootstrap_peers() -> Result<()> {
3608 let mut config = create_test_node_config();
3609 config.bootstrap_peers = vec![
3610 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
3611 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
3612 ];
3613
3614 let node = P2PNode::new(config).await?;
3615
3616 node.start().await?;
3618
3619 let _peer_count = node.peer_count().await;
3623
3624 node.stop().await?;
3625 Ok(())
3626 }
3627
3628 #[tokio::test]
3629 async fn test_production_mode_disabled() -> Result<()> {
3630 let config = create_test_node_config();
3631 let node = P2PNode::new(config).await?;
3632
3633 assert!(!node.is_production_mode());
3634 assert!(node.production_config().is_none());
3635
3636 let result = node.resource_metrics().await;
3638 assert!(result.is_err());
3639 assert!(result.unwrap_err().to_string().contains("not enabled"));
3640
3641 Ok(())
3642 }
3643
3644 #[tokio::test]
3645 async fn test_network_event_variants() {
3646 let peer_id = "test_peer".to_string();
3648 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
3649
3650 let _peer_connected = NetworkEvent::PeerConnected {
3651 peer_id: peer_id.clone(),
3652 addresses: vec![address.clone()],
3653 };
3654
3655 let _peer_disconnected = NetworkEvent::PeerDisconnected {
3656 peer_id: peer_id.clone(),
3657 reason: "test disconnect".to_string(),
3658 };
3659
3660 let _message_received = NetworkEvent::MessageReceived {
3661 peer_id: peer_id.clone(),
3662 protocol: "test-protocol".to_string(),
3663 data: vec![1, 2, 3],
3664 };
3665
3666 let _connection_failed = NetworkEvent::ConnectionFailed {
3667 peer_id: Some(peer_id.clone()),
3668 address: address.clone(),
3669 error: "connection refused".to_string(),
3670 };
3671
3672 let _dht_stored = NetworkEvent::DHTRecordStored {
3673 key: vec![1, 2, 3],
3674 value: vec![4, 5, 6],
3675 };
3676
3677 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
3678 key: vec![1, 2, 3],
3679 value: Some(vec![4, 5, 6]),
3680 };
3681 }
3682
3683 #[tokio::test]
3684 async fn test_peer_info_structure() {
3685 let peer_info = PeerInfo {
3686 peer_id: "test_peer".to_string(),
3687 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
3688 connected_at: Instant::now(),
3689 last_seen: Instant::now(),
3690 status: ConnectionStatus::Connected,
3691 protocols: vec!["test-protocol".to_string()],
3692 heartbeat_count: 0,
3693 };
3694
3695 assert_eq!(peer_info.peer_id, "test_peer");
3696 assert_eq!(peer_info.addresses.len(), 1);
3697 assert_eq!(peer_info.status, ConnectionStatus::Connected);
3698 assert_eq!(peer_info.protocols.len(), 1);
3699 }
3700
3701 #[tokio::test]
3702 async fn test_serialization() -> Result<()> {
3703 let config = create_test_node_config();
3705 let serialized = serde_json::to_string(&config)?;
3706 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
3707
3708 assert_eq!(config.peer_id, deserialized.peer_id);
3709 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
3710 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
3711
3712 Ok(())
3713 }
3714
3715 #[tokio::test]
3716 async fn test_get_peer_id_by_address_found() -> Result<()> {
3717 let config = create_test_node_config();
3718 let node = P2PNode::new(config).await?;
3719
3720 let test_peer_id = "peer_test_123".to_string();
3722 let test_address = "192.168.1.100:9000".to_string();
3723
3724 let peer_info = PeerInfo {
3725 peer_id: test_peer_id.clone(),
3726 addresses: vec![test_address.clone()],
3727 connected_at: Instant::now(),
3728 last_seen: Instant::now(),
3729 status: ConnectionStatus::Connected,
3730 protocols: vec!["test-protocol".to_string()],
3731 heartbeat_count: 0,
3732 };
3733
3734 node.peers
3735 .write()
3736 .await
3737 .insert(test_peer_id.clone(), peer_info);
3738
3739 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
3741 assert_eq!(found_peer_id, Some(test_peer_id));
3742
3743 Ok(())
3744 }
3745
3746 #[tokio::test]
3747 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
3748 let config = create_test_node_config();
3749 let node = P2PNode::new(config).await?;
3750
3751 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
3753 assert_eq!(result, None);
3754
3755 Ok(())
3756 }
3757
3758 #[tokio::test]
3759 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
3760 let config = create_test_node_config();
3761 let node = P2PNode::new(config).await?;
3762
3763 let result = node.get_peer_id_by_address("invalid-address").await;
3765 assert_eq!(result, None);
3766
3767 Ok(())
3768 }
3769
3770 #[tokio::test]
3771 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
3772 let config = create_test_node_config();
3773 let node = P2PNode::new(config).await?;
3774
3775 let peer1_id = "peer_1".to_string();
3777 let peer1_addr = "192.168.1.101:9001".to_string();
3778
3779 let peer2_id = "peer_2".to_string();
3780 let peer2_addr = "192.168.1.102:9002".to_string();
3781
3782 let peer1_info = PeerInfo {
3783 peer_id: peer1_id.clone(),
3784 addresses: vec![peer1_addr.clone()],
3785 connected_at: Instant::now(),
3786 last_seen: Instant::now(),
3787 status: ConnectionStatus::Connected,
3788 protocols: vec!["test-protocol".to_string()],
3789 heartbeat_count: 0,
3790 };
3791
3792 let peer2_info = PeerInfo {
3793 peer_id: peer2_id.clone(),
3794 addresses: vec![peer2_addr.clone()],
3795 connected_at: Instant::now(),
3796 last_seen: Instant::now(),
3797 status: ConnectionStatus::Connected,
3798 protocols: vec!["test-protocol".to_string()],
3799 heartbeat_count: 0,
3800 };
3801
3802 node.peers
3803 .write()
3804 .await
3805 .insert(peer1_id.clone(), peer1_info);
3806 node.peers
3807 .write()
3808 .await
3809 .insert(peer2_id.clone(), peer2_info);
3810
3811 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
3813 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
3814
3815 assert_eq!(found_peer1, Some(peer1_id));
3816 assert_eq!(found_peer2, Some(peer2_id));
3817
3818 Ok(())
3819 }
3820
3821 #[tokio::test]
3822 async fn test_list_active_connections_empty() -> Result<()> {
3823 let config = create_test_node_config();
3824 let node = P2PNode::new(config).await?;
3825
3826 let connections = node.list_active_connections().await;
3828 assert!(connections.is_empty());
3829
3830 Ok(())
3831 }
3832
3833 #[tokio::test]
3834 async fn test_list_active_connections_with_peers() -> Result<()> {
3835 let config = create_test_node_config();
3836 let node = P2PNode::new(config).await?;
3837
3838 let peer1_id = "peer_1".to_string();
3840 let peer1_addrs = vec![
3841 "192.168.1.101:9001".to_string(),
3842 "192.168.1.101:9002".to_string(),
3843 ];
3844
3845 let peer2_id = "peer_2".to_string();
3846 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
3847
3848 let peer1_info = PeerInfo {
3849 peer_id: peer1_id.clone(),
3850 addresses: peer1_addrs.clone(),
3851 connected_at: Instant::now(),
3852 last_seen: Instant::now(),
3853 status: ConnectionStatus::Connected,
3854 protocols: vec!["test-protocol".to_string()],
3855 heartbeat_count: 0,
3856 };
3857
3858 let peer2_info = PeerInfo {
3859 peer_id: peer2_id.clone(),
3860 addresses: peer2_addrs.clone(),
3861 connected_at: Instant::now(),
3862 last_seen: Instant::now(),
3863 status: ConnectionStatus::Connected,
3864 protocols: vec!["test-protocol".to_string()],
3865 heartbeat_count: 0,
3866 };
3867
3868 node.peers
3869 .write()
3870 .await
3871 .insert(peer1_id.clone(), peer1_info);
3872 node.peers
3873 .write()
3874 .await
3875 .insert(peer2_id.clone(), peer2_info);
3876
3877 node.active_connections
3879 .write()
3880 .await
3881 .insert(peer1_id.clone());
3882 node.active_connections
3883 .write()
3884 .await
3885 .insert(peer2_id.clone());
3886
3887 let connections = node.list_active_connections().await;
3889 assert_eq!(connections.len(), 2);
3890
3891 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3893 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3894
3895 assert!(peer1_conn.is_some());
3896 assert!(peer2_conn.is_some());
3897
3898 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3900 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3901
3902 Ok(())
3903 }
3904
3905 #[tokio::test]
3906 async fn test_remove_peer_success() -> Result<()> {
3907 let config = create_test_node_config();
3908 let node = P2PNode::new(config).await?;
3909
3910 let peer_id = "peer_to_remove".to_string();
3912 let peer_info = PeerInfo {
3913 peer_id: peer_id.clone(),
3914 addresses: vec!["192.168.1.100:9000".to_string()],
3915 connected_at: Instant::now(),
3916 last_seen: Instant::now(),
3917 status: ConnectionStatus::Connected,
3918 protocols: vec!["test-protocol".to_string()],
3919 heartbeat_count: 0,
3920 };
3921
3922 node.peers.write().await.insert(peer_id.clone(), peer_info);
3923
3924 assert!(node.is_peer_connected(&peer_id).await);
3926
3927 let removed = node.remove_peer(&peer_id).await;
3929 assert!(removed);
3930
3931 assert!(!node.is_peer_connected(&peer_id).await);
3933
3934 Ok(())
3935 }
3936
3937 #[tokio::test]
3938 async fn test_remove_peer_nonexistent() -> Result<()> {
3939 let config = create_test_node_config();
3940 let node = P2PNode::new(config).await?;
3941
3942 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
3944 assert!(!removed);
3945
3946 Ok(())
3947 }
3948
3949 #[tokio::test]
3950 async fn test_is_peer_connected() -> Result<()> {
3951 let config = create_test_node_config();
3952 let node = P2PNode::new(config).await?;
3953
3954 let peer_id = "test_peer".to_string();
3955
3956 assert!(!node.is_peer_connected(&peer_id).await);
3958
3959 let peer_info = PeerInfo {
3961 peer_id: peer_id.clone(),
3962 addresses: vec!["192.168.1.100:9000".to_string()],
3963 connected_at: Instant::now(),
3964 last_seen: Instant::now(),
3965 status: ConnectionStatus::Connected,
3966 protocols: vec!["test-protocol".to_string()],
3967 heartbeat_count: 0,
3968 };
3969
3970 node.peers.write().await.insert(peer_id.clone(), peer_info);
3971
3972 assert!(node.is_peer_connected(&peer_id).await);
3974
3975 node.remove_peer(&peer_id).await;
3977
3978 assert!(!node.is_peer_connected(&peer_id).await);
3980
3981 Ok(())
3982 }
3983
/// The IPv6 wildcard `[::]` is rewritten to `[::1]` with the port preserved.
#[test]
fn test_normalize_ipv6_wildcard() {
    use std::net::{IpAddr, Ipv6Addr, SocketAddr};

    let unspecified = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
    let result = normalize_wildcard_to_loopback(unspecified);

    assert_eq!(result.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
    assert_eq!(result.port(), 8080);
}
3994
/// The IPv4 wildcard `0.0.0.0` is rewritten to `127.0.0.1` with the port preserved.
#[test]
fn test_normalize_ipv4_wildcard() {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    let unspecified = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
    let result = normalize_wildcard_to_loopback(unspecified);

    assert_eq!(result.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
    assert_eq!(result.port(), 9000);
}
4005
/// A concrete, non-wildcard address passes through normalization untouched.
#[test]
fn test_normalize_specific_address_unchanged() {
    let concrete: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();

    assert_eq!(normalize_wildcard_to_loopback(concrete), concrete);
}
4013
/// Loopback addresses of either family are already normalized and stay as they are.
#[test]
fn test_normalize_loopback_unchanged() {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    let v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
    assert_eq!(normalize_wildcard_to_loopback(v6), v6);

    let v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
    assert_eq!(normalize_wildcard_to_loopback(v4), v4);
}
4026
/// Returns the current Unix time in whole seconds, or 0 if the system clock
/// reports a time before the Unix epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
4036
4037 fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
4039 let msg = WireMessage {
4040 protocol: protocol.to_string(),
4041 data,
4042 from: from.to_string(),
4043 timestamp,
4044 };
4045 postcard::to_stdvec(&msg).unwrap()
4046 }
4047
/// The event's `source` must be the transport-level peer ID passed to the
/// parser, never the `from` field carried inside the message.
#[test]
fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
    let transport_id = "abcdef0123456789";
    let logical_id = "spoofed-logical-id";
    let payload = vec![1, 2, 3];
    let wire = make_wire_bytes("test/v1", payload, logical_id, current_timestamp());

    let parsed =
        parse_protocol_message(&wire, transport_id).expect("valid message should parse");

    match parsed {
        P2PEvent::Message {
            topic,
            source,
            data,
        } => {
            assert_eq!(source, transport_id, "source must be the transport peer ID");
            assert_ne!(
                source, logical_id,
                "source must NOT be the logical 'from' field"
            );
            assert_eq!(topic, "test/v1");
            assert_eq!(data, vec![1u8, 2, 3]);
        }
        other => panic!("expected P2PEvent::Message, got {:?}", other),
    }
}
4077
4078 #[test]
4079 fn test_parse_protocol_message_rejects_invalid_bytes() {
4080 assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
4082 }
4083
4084 #[test]
4085 fn test_parse_protocol_message_rejects_truncated_message() {
4086 let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
4088 let truncated = &full_bytes[..full_bytes.len() / 2];
4089 assert!(parse_protocol_message(truncated, "peer-id").is_none());
4090 }
4091
4092 #[test]
4093 fn test_parse_protocol_message_empty_payload() {
4094 let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
4095
4096 let event = parse_protocol_message(&bytes, "transport-peer")
4097 .expect("valid message with empty data should parse");
4098
4099 match event {
4100 P2PEvent::Message { data, .. } => assert!(data.is_empty()),
4101 other => panic!("expected P2PEvent::Message, got {:?}", other),
4102 }
4103 }
4104
4105 #[test]
4106 fn test_parse_protocol_message_preserves_binary_payload() {
4107 let payload: Vec<u8> = (0..=255).collect();
4109 let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
4110
4111 let event = parse_protocol_message(&bytes, "peer-id")
4112 .expect("valid message with full byte range should parse");
4113
4114 match event {
4115 P2PEvent::Message { data, topic, .. } => {
4116 assert_eq!(topic, "binary/v1");
4117 assert_eq!(
4118 data, payload,
4119 "payload must survive bincode round-trip exactly"
4120 );
4121 }
4122 other => panic!("expected P2PEvent::Message, got {:?}", other),
4123 }
4124 }
4125}