1use crate::adaptive::{
20 EigenTrustEngine, NodeId, NodeId as AdaptiveNodeId, NodeStatisticsUpdate, TrustProvider,
21};
22use crate::bootstrap::{BootstrapManager, ContactEntry, QualityMetrics};
23use crate::config::Config;
24use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
25use crate::error::{NetworkError, P2PError, P2pResult as Result, PeerFailureReason};
26
27use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics};
28use crate::{NetworkAddress, PeerId};
29use serde::{Deserialize, Serialize};
30use std::collections::{HashMap, HashSet};
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::time::Duration;
34use tokio::sync::{RwLock, broadcast};
35use tokio::time::Instant;
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, info, warn};
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
45pub(crate) struct WireMessage {
46 pub(crate) protocol: String,
48 pub(crate) data: Vec<u8>,
50 pub(crate) from: String,
52 pub(crate) timestamp: u64,
54}
55
56pub(crate) const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive";
58
59pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
61
62pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
64
65pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
67
68const DEFAULT_LISTEN_PORT: u16 = 9000;
70
71const DHT_MAX_DISTANCE: u8 = 160;
73
74const RUN_LOOP_TICK_INTERVAL_MS: u64 = 100;
76
77const BOOTSTRAP_QUALITY_SCORE_SUCCESS: f64 = 0.8;
79const BOOTSTRAP_QUALITY_SCORE_FAILURE: f64 = 0.2;
81const BOOTSTRAP_DEFAULT_UPTIME_SCORE: f64 = 0.5;
83const BOOTSTRAP_FAILURE_PENALTY_HOURS: i64 = 1;
85
86const DEFAULT_NEUTRAL_TRUST: f64 = 0.5;
88
89const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct NodeConfig {
95 pub peer_id: Option<PeerId>,
97
98 pub listen_addrs: Vec<std::net::SocketAddr>,
100
101 pub listen_addr: std::net::SocketAddr,
103
104 pub bootstrap_peers: Vec<std::net::SocketAddr>,
106
107 pub bootstrap_peers_str: Vec<String>,
109
110 pub enable_ipv6: bool,
112
113 pub connection_timeout: Duration,
116
117 pub keep_alive_interval: Duration,
119
120 pub max_connections: usize,
122
123 pub max_incoming_connections: usize,
125
126 pub dht_config: DHTConfig,
128
129 pub security_config: SecurityConfig,
131
132 pub production_config: Option<ProductionConfig>,
134
135 pub bootstrap_cache_config: Option<crate::bootstrap::CacheConfig>,
137
138 pub diversity_config: Option<crate::security::IPDiversityConfig>,
143
144 #[serde(default = "default_stale_peer_threshold")]
147 pub stale_peer_threshold: Duration,
148
149 #[serde(default)]
153 pub max_message_size: Option<usize>,
154}
155
156fn default_stale_peer_threshold() -> Duration {
158 Duration::from_secs(60)
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct DHTConfig {
164 pub k_value: usize,
166
167 pub alpha_value: usize,
169
170 pub record_ttl: Duration,
172
173 pub refresh_interval: Duration,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct SecurityConfig {
180 pub enable_noise: bool,
182
183 pub enable_tls: bool,
185
186 pub trust_level: TrustLevel,
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
192pub enum TrustLevel {
193 None,
195 Basic,
197 Full,
199}
200
201#[inline]
209fn build_listen_addrs(port: u16, ipv6_enabled: bool) -> Vec<std::net::SocketAddr> {
210 let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
211
212 if ipv6_enabled {
213 addrs.push(std::net::SocketAddr::new(
214 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
215 port,
216 ));
217 }
218
219 addrs.push(std::net::SocketAddr::new(
220 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
221 port,
222 ));
223
224 addrs
225}
226
227impl NodeConfig {
228 pub fn new() -> Result<Self> {
234 let config = Config::default();
235 let listen_addr = config.listen_socket_addr()?;
236
237 Ok(Self {
238 peer_id: None,
239 listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
240 listen_addr,
241 bootstrap_peers: Vec::new(),
242 bootstrap_peers_str: config.network.bootstrap_nodes.clone(),
243 enable_ipv6: config.network.ipv6_enabled,
244 connection_timeout: Duration::from_secs(config.network.connection_timeout),
245 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
246 max_connections: config.network.max_connections,
247 max_incoming_connections: config.security.connection_limit as usize,
248 dht_config: DHTConfig::default(),
249 security_config: SecurityConfig::default(),
250 production_config: None,
251 bootstrap_cache_config: None,
252 diversity_config: None,
253 stale_peer_threshold: default_stale_peer_threshold(),
254 max_message_size: None,
255 })
256 }
257
258 pub fn builder() -> NodeConfigBuilder {
260 NodeConfigBuilder::default()
261 }
262}
263
264#[derive(Debug, Clone, Default)]
270pub struct NodeConfigBuilder {
271 peer_id: Option<PeerId>,
272 listen_port: Option<u16>,
273 enable_ipv6: Option<bool>,
274 bootstrap_peers: Vec<std::net::SocketAddr>,
275 max_connections: Option<usize>,
276 connection_timeout: Option<Duration>,
277 keep_alive_interval: Option<Duration>,
278 dht_config: Option<DHTConfig>,
279 security_config: Option<SecurityConfig>,
280 production_config: Option<ProductionConfig>,
281}
282
283impl NodeConfigBuilder {
284 pub fn peer_id(mut self, peer_id: PeerId) -> Self {
286 self.peer_id = Some(peer_id);
287 self
288 }
289
290 pub fn listen_port(mut self, port: u16) -> Self {
292 self.listen_port = Some(port);
293 self
294 }
295
296 pub fn ipv6(mut self, enabled: bool) -> Self {
298 self.enable_ipv6 = Some(enabled);
299 self
300 }
301
302 pub fn bootstrap_peer(mut self, addr: std::net::SocketAddr) -> Self {
304 self.bootstrap_peers.push(addr);
305 self
306 }
307
308 pub fn max_connections(mut self, max: usize) -> Self {
310 self.max_connections = Some(max);
311 self
312 }
313
314 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
316 self.connection_timeout = Some(timeout);
317 self
318 }
319
320 pub fn keep_alive_interval(mut self, interval: Duration) -> Self {
322 self.keep_alive_interval = Some(interval);
323 self
324 }
325
326 pub fn dht_config(mut self, config: DHTConfig) -> Self {
328 self.dht_config = Some(config);
329 self
330 }
331
332 pub fn security_config(mut self, config: SecurityConfig) -> Self {
334 self.security_config = Some(config);
335 self
336 }
337
338 pub fn production_config(mut self, config: ProductionConfig) -> Self {
340 self.production_config = Some(config);
341 self
342 }
343
344 pub fn build(self) -> Result<NodeConfig> {
350 let base_config = Config::default();
351 let default_port = base_config
352 .listen_socket_addr()
353 .map(|addr| addr.port())
354 .unwrap_or(DEFAULT_LISTEN_PORT);
355 let port = self.listen_port.unwrap_or(default_port);
356 let ipv6_enabled = self.enable_ipv6.unwrap_or(base_config.network.ipv6_enabled);
357
358 let listen_addr =
359 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), port);
360
361 Ok(NodeConfig {
362 peer_id: self.peer_id,
363 listen_addrs: build_listen_addrs(port, ipv6_enabled),
364 listen_addr,
365 bootstrap_peers: self.bootstrap_peers.clone(),
366 bootstrap_peers_str: self.bootstrap_peers.iter().map(|a| a.to_string()).collect(),
367 enable_ipv6: ipv6_enabled,
368 connection_timeout: self
369 .connection_timeout
370 .unwrap_or(Duration::from_secs(base_config.network.connection_timeout)),
371 keep_alive_interval: self
372 .keep_alive_interval
373 .unwrap_or(Duration::from_secs(base_config.network.keepalive_interval)),
374 max_connections: self
375 .max_connections
376 .unwrap_or(base_config.network.max_connections),
377 max_incoming_connections: base_config.security.connection_limit as usize,
378 dht_config: self.dht_config.unwrap_or_default(),
379 security_config: self.security_config.unwrap_or_default(),
380 production_config: self.production_config,
381 bootstrap_cache_config: None,
382 diversity_config: None,
383 stale_peer_threshold: default_stale_peer_threshold(),
384 max_message_size: None,
385 })
386 }
387}
388
389impl Default for NodeConfig {
390 fn default() -> Self {
391 let config = Config::default();
392 let listen_addr = config.listen_socket_addr().unwrap_or_else(|_| {
393 std::net::SocketAddr::new(
394 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
395 DEFAULT_LISTEN_PORT,
396 )
397 });
398
399 Self {
400 peer_id: None,
401 listen_addrs: build_listen_addrs(listen_addr.port(), config.network.ipv6_enabled),
402 listen_addr,
403 bootstrap_peers: Vec::new(),
404 bootstrap_peers_str: Vec::new(),
405 enable_ipv6: config.network.ipv6_enabled,
406 connection_timeout: Duration::from_secs(config.network.connection_timeout),
407 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
408 max_connections: config.network.max_connections,
409 max_incoming_connections: config.security.connection_limit as usize,
410 dht_config: DHTConfig::default(),
411 security_config: SecurityConfig::default(),
412 production_config: None,
413 bootstrap_cache_config: None,
414 diversity_config: None,
415 stale_peer_threshold: default_stale_peer_threshold(),
416 max_message_size: None,
417 }
418 }
419}
420
421impl NodeConfig {
422 pub fn from_config(config: &Config) -> Result<Self> {
424 let listen_addr = config.listen_socket_addr()?;
425 let bootstrap_addrs = config.bootstrap_addrs()?;
426
427 let mut node_config = Self {
428 peer_id: None,
429 listen_addrs: vec![listen_addr],
430 listen_addr,
431 bootstrap_peers: bootstrap_addrs
432 .iter()
433 .map(|addr| addr.socket_addr())
434 .collect(),
435 bootstrap_peers_str: config
436 .network
437 .bootstrap_nodes
438 .iter()
439 .map(|addr| addr.to_string())
440 .collect(),
441 enable_ipv6: config.network.ipv6_enabled,
442
443 connection_timeout: Duration::from_secs(config.network.connection_timeout),
444 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
445 max_connections: config.network.max_connections,
446 max_incoming_connections: config.security.connection_limit as usize,
447 dht_config: DHTConfig {
448 k_value: 20,
449 alpha_value: 3,
450 record_ttl: Duration::from_secs(3600),
451 refresh_interval: Duration::from_secs(900),
452 },
453 security_config: SecurityConfig {
454 enable_noise: true,
455 enable_tls: true,
456 trust_level: TrustLevel::Basic,
457 },
458 production_config: Some(ProductionConfig {
459 max_connections: config.network.max_connections,
460 max_memory_bytes: 0, max_bandwidth_bps: 0, connection_timeout: Duration::from_secs(config.network.connection_timeout),
463 keep_alive_interval: Duration::from_secs(config.network.keepalive_interval),
464 health_check_interval: Duration::from_secs(30),
465 metrics_interval: Duration::from_secs(60),
466 enable_performance_tracking: true,
467 enable_auto_cleanup: true,
468 shutdown_timeout: Duration::from_secs(30),
469 rate_limits: crate::production::RateLimitConfig::default(),
470 }),
471 bootstrap_cache_config: None,
472 diversity_config: None,
473 stale_peer_threshold: default_stale_peer_threshold(),
474 max_message_size: None,
475 };
476
477 if config.network.ipv6_enabled {
479 node_config.listen_addrs.push(std::net::SocketAddr::new(
480 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
481 listen_addr.port(),
482 ));
483 }
484
485 Ok(node_config)
486 }
487
488 pub fn with_listen_addr(addr: &str) -> Result<Self> {
490 let listen_addr: std::net::SocketAddr = addr
491 .parse()
492 .map_err(|e: std::net::AddrParseError| {
493 NetworkError::InvalidAddress(e.to_string().into())
494 })
495 .map_err(P2PError::Network)?;
496 let cfg = NodeConfig {
497 listen_addr,
498 listen_addrs: vec![listen_addr],
499 diversity_config: None,
500 ..Default::default()
501 };
502 Ok(cfg)
503 }
504}
505
506impl DHTConfig {
507 const DEFAULT_K_VALUE: usize = 20;
508 const DEFAULT_ALPHA_VALUE: usize = 5;
509 const DEFAULT_RECORD_TTL_SECS: u64 = 3600;
510 const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
511}
512
513impl Default for DHTConfig {
514 fn default() -> Self {
515 Self {
516 k_value: Self::DEFAULT_K_VALUE,
517 alpha_value: Self::DEFAULT_ALPHA_VALUE,
518 record_ttl: Duration::from_secs(Self::DEFAULT_RECORD_TTL_SECS),
519 refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
520 }
521 }
522}
523
524impl Default for SecurityConfig {
525 fn default() -> Self {
526 Self {
527 enable_noise: true,
528 enable_tls: true,
529 trust_level: TrustLevel::Basic,
530 }
531 }
532}
533
534#[derive(Debug, Clone)]
536pub struct PeerInfo {
537 pub peer_id: PeerId,
539
540 pub addresses: Vec<String>,
542
543 pub connected_at: Instant,
545
546 pub last_seen: Instant,
548
549 pub status: ConnectionStatus,
551
552 pub protocols: Vec<String>,
554
555 pub heartbeat_count: u64,
557}
558
559#[derive(Debug, Clone, PartialEq)]
561pub enum ConnectionStatus {
562 Connecting,
564 Connected,
566 Disconnecting,
568 Disconnected,
570 Failed(String),
572}
573
574#[derive(Debug, Clone)]
576pub enum NetworkEvent {
577 PeerConnected {
579 peer_id: PeerId,
581 addresses: Vec<String>,
583 },
584
585 PeerDisconnected {
587 peer_id: PeerId,
589 reason: String,
591 },
592
593 MessageReceived {
595 peer_id: PeerId,
597 protocol: String,
599 data: Vec<u8>,
601 },
602
603 ConnectionFailed {
605 peer_id: Option<PeerId>,
607 address: String,
609 error: String,
611 },
612
613 DHTRecordStored {
615 key: Vec<u8>,
617 value: Vec<u8>,
619 },
620
621 DHTRecordRetrieved {
623 key: Vec<u8>,
625 value: Option<Vec<u8>>,
627 },
628}
629
630#[derive(Debug, Clone)]
635pub enum P2PEvent {
636 Message {
638 topic: String,
640 source: PeerId,
642 data: Vec<u8>,
644 },
645 PeerConnected(PeerId),
647 PeerDisconnected(PeerId),
649}
650
651#[derive(Debug, Clone)]
656pub struct PeerResponse {
657 pub peer_id: PeerId,
659 pub data: Vec<u8>,
661 pub latency: Duration,
663}
664
665#[derive(Debug, Clone, Serialize, Deserialize)]
670pub(crate) struct RequestResponseEnvelope {
671 pub(crate) message_id: String,
673 pub(crate) is_response: bool,
675 pub(crate) payload: Vec<u8>,
677}
678
679pub(crate) struct PendingRequest {
681 pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
683 pub(crate) expected_peer: String,
685}
686
687pub struct P2PNode {
698 config: NodeConfig,
700
701 peer_id: PeerId,
703
704 transport: Arc<crate::transport_handle::TransportHandle>,
706
707 start_time: Instant,
709
710 shutdown: CancellationToken,
712
713 dht_manager: Arc<DhtNetworkManager>,
715
716 resource_manager: Option<Arc<ResourceManager>>,
718
719 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
721
722 pub security_dashboard: Option<Arc<crate::dht::metrics::SecurityDashboard>>,
724
725 is_bootstrapped: Arc<AtomicBool>,
727
728 is_started: Arc<AtomicBool>,
730
731 trust_engine: Option<Arc<EigenTrustEngine>>,
737}
738
739pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
755 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
756
757 if addr.ip().is_unspecified() {
758 let loopback_ip = match addr {
760 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
763 std::net::SocketAddr::new(loopback_ip, addr.port())
764 } else {
765 addr
767 }
768}
769
770impl P2PNode {
771 pub async fn new(config: NodeConfig) -> Result<Self> {
773 let peer_id = config.peer_id.clone().unwrap_or_else(|| {
774 let uuid_str = uuid::Uuid::new_v4().to_string();
777 format!("peer_{}", &uuid_str[..8])
778 });
779
780 {
783 let nid = crate::dht::derive_dht_key_from_peer_id(&peer_id);
784 let _twdht = std::sync::Arc::new(crate::dht::TrustWeightedKademlia::new(
785 crate::identity::node_identity::NodeId::from_bytes(nid),
786 ));
787 }
790
791 let resource_manager = config
793 .production_config
794 .clone()
795 .map(|prod_config| Arc::new(ResourceManager::new(prod_config)));
796
797 let diversity_config = config.diversity_config.clone().unwrap_or_default();
799 let bootstrap_manager = if let Some(ref cache_config) = config.bootstrap_cache_config {
800 match BootstrapManager::with_full_config(
801 cache_config.clone(),
802 crate::rate_limit::JoinRateLimiterConfig::default(),
803 diversity_config.clone(),
804 )
805 .await
806 {
807 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
808 Err(e) => {
809 warn!(
810 "Failed to initialize bootstrap manager: {}, continuing without cache",
811 e
812 );
813 None
814 }
815 }
816 } else {
817 match BootstrapManager::with_full_config(
818 crate::bootstrap::CacheConfig::default(),
819 crate::rate_limit::JoinRateLimiterConfig::default(),
820 diversity_config,
821 )
822 .await
823 {
824 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
825 Err(e) => {
826 warn!(
827 "Failed to initialize bootstrap manager: {}, continuing without cache",
828 e
829 );
830 None
831 }
832 }
833 };
834
835 let trust_engine = {
837 let mut pre_trusted = HashSet::new();
838 for bootstrap_peer in &config.bootstrap_peers_str {
839 let node_id_bytes = crate::dht::derive_dht_key_from_peer_id(bootstrap_peer);
840 pre_trusted.insert(NodeId::from_bytes(node_id_bytes));
841 }
842
843 let engine = Arc::new(EigenTrustEngine::new(pre_trusted));
844 engine.clone().start_background_updates();
845 Some(engine)
846 };
847
848 let transport_config = crate::transport_handle::TransportConfig {
850 peer_id: peer_id.clone(),
851 listen_addr: config.listen_addr,
852 enable_ipv6: config.enable_ipv6,
853 connection_timeout: config.connection_timeout,
854 stale_peer_threshold: config.stale_peer_threshold,
855 max_connections: config.max_connections,
856 production_config: config.production_config.clone(),
857 event_channel_capacity: crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
858 max_message_size: config.max_message_size,
859 };
860 let transport =
861 Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
862
863 let manager_dht_config = crate::dht::DHTConfig {
865 replication_factor: config.dht_config.k_value,
866 bucket_size: config.dht_config.k_value,
867 alpha: config.dht_config.alpha_value,
868 record_ttl: config.dht_config.record_ttl,
869 bucket_refresh_interval: config.dht_config.refresh_interval,
870 republish_interval: config.dht_config.refresh_interval,
871 max_distance: DHT_MAX_DISTANCE,
872 };
873 let dht_manager_config = DhtNetworkConfig {
874 local_peer_id: peer_id.clone(),
875 dht_config: manager_dht_config,
876 node_config: config.clone(),
877 request_timeout: config.connection_timeout,
878 max_concurrent_operations: MAX_ACTIVE_REQUESTS,
879 replication_factor: config.dht_config.k_value,
880 enable_security: true,
881 };
882 let dht_manager = Arc::new(
883 DhtNetworkManager::new(transport.clone(), trust_engine.clone(), dht_manager_config)
884 .await?,
885 );
886
887 let security_metrics = dht_manager.security_metrics().await;
888 let security_dashboard = Some(Arc::new(crate::dht::metrics::SecurityDashboard::new(
889 security_metrics,
890 Arc::new(crate::dht::metrics::DhtMetricsCollector::new()),
891 Arc::new(crate::dht::metrics::TrustMetricsCollector::new()),
892 Arc::new(crate::dht::metrics::PlacementMetricsCollector::new()),
893 )));
894
895 let node = Self {
896 config,
897 peer_id,
898 transport,
899 start_time: Instant::now(),
900 shutdown: CancellationToken::new(),
901 dht_manager,
902 resource_manager,
903 bootstrap_manager,
904 security_dashboard,
905 is_bootstrapped: Arc::new(AtomicBool::new(false)),
906 is_started: Arc::new(AtomicBool::new(false)),
907 trust_engine,
908 };
909 info!(
910 "Created P2P node with peer ID: {} (call start() to begin networking)",
911 node.peer_id
912 );
913
914 Ok(node)
915 }
916
917 pub fn builder() -> NodeBuilder {
919 NodeBuilder::new()
920 }
921
922 pub fn peer_id(&self) -> &PeerId {
924 &self.peer_id
925 }
926
927 pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
929 &self.transport
930 }
931
932 pub fn transport_peer_id(&self) -> Option<String> {
934 self.transport.transport_peer_id()
935 }
936
937 pub fn local_addr(&self) -> Option<String> {
938 self.transport.local_addr()
939 }
940
941 pub fn is_bootstrapped(&self) -> bool {
946 self.is_bootstrapped.load(Ordering::SeqCst)
947 }
948
949 pub async fn re_bootstrap(&self) -> Result<()> {
954 self.is_bootstrapped.store(false, Ordering::SeqCst);
955 self.connect_bootstrap_peers().await
956 }
957
958 pub fn trust_engine(&self) -> Option<Arc<EigenTrustEngine>> {
981 self.trust_engine.clone()
982 }
983
984 fn peer_id_to_trust_node_id(peer_id: &str) -> AdaptiveNodeId {
988 crate::network::peer_id_to_trust_node_id(peer_id)
989 }
990
991 pub async fn report_peer_success(&self, peer_id: &str) -> Result<()> {
1010 if let Some(ref engine) = self.trust_engine {
1011 let node_id = Self::peer_id_to_trust_node_id(peer_id);
1012
1013 engine
1014 .update_node_stats(&node_id, NodeStatisticsUpdate::CorrectResponse)
1015 .await;
1016 Ok(())
1017 } else {
1018 Ok(())
1020 }
1021 }
1022
1023 pub async fn report_peer_failure(&self, peer_id: &str) -> Result<()> {
1043 self.report_peer_failure_with_reason(peer_id, PeerFailureReason::ConnectionFailed)
1045 .await
1046 }
1047
1048 pub async fn report_peer_failure_with_reason(
1076 &self,
1077 peer_id: &str,
1078 reason: PeerFailureReason,
1079 ) -> Result<()> {
1080 if let Some(ref engine) = self.trust_engine {
1081 let node_id = Self::peer_id_to_trust_node_id(peer_id);
1082
1083 let update = match reason {
1084 PeerFailureReason::Timeout | PeerFailureReason::ConnectionFailed => {
1085 NodeStatisticsUpdate::FailedResponse
1086 }
1087 PeerFailureReason::DataUnavailable => NodeStatisticsUpdate::DataUnavailable,
1088 PeerFailureReason::CorruptedData => NodeStatisticsUpdate::CorruptedData,
1089 PeerFailureReason::ProtocolError => NodeStatisticsUpdate::ProtocolViolation,
1090 PeerFailureReason::Refused => NodeStatisticsUpdate::FailedResponse,
1091 };
1092
1093 engine.update_node_stats(&node_id, update).await;
1094 Ok(())
1095 } else {
1096 Ok(())
1098 }
1099 }
1100
1101 pub fn peer_trust(&self, peer_id: &str) -> f64 {
1120 if let Some(ref engine) = self.trust_engine {
1121 let node_id = Self::peer_id_to_trust_node_id(peer_id);
1122
1123 engine.get_trust(&node_id)
1124 } else {
1125 DEFAULT_NEUTRAL_TRUST
1127 }
1128 }
1129
1130 pub async fn send_request(
1163 &self,
1164 peer_id: &PeerId,
1165 protocol: &str,
1166 data: Vec<u8>,
1167 timeout: Duration,
1168 ) -> Result<PeerResponse> {
1169 match self
1170 .transport
1171 .send_request(peer_id, protocol, data, timeout)
1172 .await
1173 {
1174 Ok(resp) => {
1175 let _ = self.report_peer_success(peer_id).await;
1176 Ok(resp)
1177 }
1178 Err(e) => {
1179 let reason = if matches!(&e, P2PError::Timeout(_)) {
1181 PeerFailureReason::Timeout
1182 } else {
1183 PeerFailureReason::ConnectionFailed
1184 };
1185 let _ = self.report_peer_failure_with_reason(peer_id, reason).await;
1186 Err(e)
1187 }
1188 }
1189 }
1190
1191 pub async fn send_response(
1192 &self,
1193 peer_id: &PeerId,
1194 protocol: &str,
1195 message_id: &str,
1196 data: Vec<u8>,
1197 ) -> Result<()> {
1198 self.transport
1199 .send_response(peer_id, protocol, message_id, data)
1200 .await
1201 }
1202
1203 pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
1204 crate::transport_handle::TransportHandle::parse_request_envelope(data)
1205 }
1206
1207 pub async fn subscribe(&self, topic: &str) -> Result<()> {
1208 self.transport.subscribe(topic).await
1209 }
1210
1211 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1212 self.transport.publish(topic, data).await
1213 }
1214
1215 pub fn config(&self) -> &NodeConfig {
1217 &self.config
1218 }
1219
1220 pub async fn start(&self) -> Result<()> {
1222 info!("Starting P2P node...");
1223
1224 if let Some(ref resource_manager) = self.resource_manager {
1226 resource_manager
1227 .start()
1228 .await
1229 .map_err(|e| protocol_error(format!("Failed to start resource manager: {e}")))?;
1230 info!("Production resource manager started");
1231 }
1232
1233 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1235 let mut manager = bootstrap_manager.write().await;
1236 manager
1237 .start_background_tasks()
1238 .await
1239 .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
1240 info!("Bootstrap cache manager started");
1241 }
1242
1243 self.transport.start_network_listeners().await?;
1245
1246 Arc::clone(&self.dht_manager).start().await?;
1248
1249 let listen_addrs = self.transport.listen_addrs().await;
1251 info!("P2P node started on addresses: {:?}", listen_addrs);
1252
1253 self.connect_bootstrap_peers().await?;
1259
1260 self.is_started
1261 .store(true, std::sync::atomic::Ordering::Release);
1262
1263 Ok(())
1264 }
1265
1266 pub async fn run(&self) -> Result<()> {
1271 if !self.is_running() {
1272 self.start().await?;
1273 }
1274
1275 info!("P2P node running...");
1276
1277 let mut interval = tokio::time::interval(Duration::from_millis(RUN_LOOP_TICK_INTERVAL_MS));
1278 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1279
1280 loop {
1282 tokio::select! {
1283 _ = interval.tick() => {
1284 self.transport.maintenance_tick().await?;
1285 }
1286 () = self.shutdown.cancelled() => {
1287 break;
1288 }
1289 }
1290 }
1291
1292 info!("P2P node stopped");
1293 Ok(())
1294 }
1295
1296 pub async fn stop(&self) -> Result<()> {
1298 info!("Stopping P2P node...");
1299
1300 self.shutdown.cancel();
1302
1303 self.dht_manager.stop().await?;
1305
1306 self.transport.stop().await?;
1308
1309 if let Some(ref resource_manager) = self.resource_manager {
1311 resource_manager
1312 .shutdown()
1313 .await
1314 .map_err(|e| protocol_error(format!("Failed to shutdown resource manager: {e}")))?;
1315 info!("Production resource manager stopped");
1316 }
1317
1318 self.is_started
1319 .store(false, std::sync::atomic::Ordering::Release);
1320
1321 info!("P2P node stopped");
1322 Ok(())
1323 }
1324
1325 pub async fn shutdown(&self) -> Result<()> {
1327 self.stop().await
1328 }
1329
1330 pub fn is_running(&self) -> bool {
1332 self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1333 }
1334
1335 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
1337 self.transport.listen_addrs().await
1338 }
1339
1340 pub async fn connected_peers(&self) -> Vec<PeerId> {
1342 self.transport.connected_peers().await
1343 }
1344
1345 pub async fn peer_count(&self) -> usize {
1347 self.transport.peer_count().await
1348 }
1349
1350 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1352 self.transport.peer_info(peer_id).await
1353 }
1354
1355 pub async fn get_peer_id_by_address(&self, addr: &str) -> Option<PeerId> {
1357 self.transport.get_peer_id_by_address(addr).await
1358 }
1359
1360 pub async fn list_active_connections(&self) -> Vec<(PeerId, Vec<String>)> {
1362 self.transport.list_active_connections().await
1363 }
1364
1365 pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
1367 self.transport.remove_peer(peer_id).await
1368 }
1369
1370 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1372 self.transport.is_peer_connected(peer_id).await
1373 }
1374
1375 pub async fn connect_peer(&self, address: &str) -> Result<PeerId> {
1377 self.transport.connect_peer(address).await
1378 }
1379
1380 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1382 self.transport.disconnect_peer(peer_id).await
1383 }
1384
1385 pub async fn is_connection_active(&self, peer_id: &str) -> bool {
1387 self.transport.is_connection_active(peer_id).await
1388 }
1389
1390 pub async fn send_message(
1392 &self,
1393 peer_id: &PeerId,
1394 protocol: &str,
1395 data: Vec<u8>,
1396 ) -> Result<()> {
1397 self.transport.send_message(peer_id, protocol, data).await
1398 }
1399}
1400
1401const MAX_MESSAGE_AGE_SECS: u64 = 300;
1416const MAX_FUTURE_SECS: u64 = 30;
1418
1419pub fn peer_id_to_trust_node_id(peer_id: &str) -> AdaptiveNodeId {
1425 if let Ok(bytes) = hex::decode(peer_id)
1426 && bytes.len() == 32
1427 {
1428 let mut arr = [0u8; 32];
1429 arr.copy_from_slice(&bytes);
1430 return AdaptiveNodeId::from_bytes(arr);
1431 }
1432 let hash = crate::dht::derive_dht_key_from_peer_id(peer_id);
1434 AdaptiveNodeId::from_bytes(hash)
1435}
1436
1437fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1439 P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1440}
1441
1442pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1444 if let Err(e) = tx.send(event) {
1445 tracing::trace!("Event broadcast has no receivers: {e}");
1446 }
1447}
1448
1449pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<P2PEvent> {
1450 let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1451
1452 let now = std::time::SystemTime::now()
1454 .duration_since(std::time::UNIX_EPOCH)
1455 .map(|d| d.as_secs())
1456 .unwrap_or(0);
1457
1458 if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1460 tracing::warn!(
1461 "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1462 source,
1463 message.timestamp,
1464 now.saturating_sub(message.timestamp)
1465 );
1466 return None;
1467 }
1468
1469 if message.timestamp > now + MAX_FUTURE_SECS {
1471 tracing::warn!(
1472 "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1473 source,
1474 message.timestamp,
1475 message.timestamp.saturating_sub(now)
1476 );
1477 return None;
1478 }
1479
1480 debug!(
1481 "Parsed P2PEvent::Message - topic: {}, source: {} (logical: {}), payload_len: {}",
1482 message.protocol,
1483 source,
1484 message.from,
1485 message.data.len()
1486 );
1487
1488 Some(P2PEvent::Message {
1489 topic: message.protocol,
1490 source: source.to_string(),
1491 data: message.data,
1492 })
1493}
1494
1495impl P2PNode {
1496 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1498 self.transport.subscribe_events()
1499 }
1500
1501 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1503 self.subscribe_events()
1504 }
1505
1506 pub fn uptime(&self) -> Duration {
1508 self.start_time.elapsed()
1509 }
1510
1511 pub async fn resource_metrics(&self) -> Result<ResourceMetrics> {
1521 if let Some(ref resource_manager) = self.resource_manager {
1522 Ok(resource_manager.get_metrics().await)
1523 } else {
1524 Err(protocol_error("Production resource manager not enabled"))
1525 }
1526 }
1527
1528 pub async fn health_check(&self) -> Result<()> {
1533 if let Some(ref resource_manager) = self.resource_manager {
1534 resource_manager.health_check().await
1535 } else {
1536 let peer_count = self.peer_count().await;
1538 if peer_count > self.config.max_connections {
1539 Err(protocol_error(format!(
1540 "Too many connections: {peer_count}"
1541 )))
1542 } else {
1543 Ok(())
1544 }
1545 }
1546 }
1547
1548 pub fn production_config(&self) -> Option<&ProductionConfig> {
1550 self.config.production_config.as_ref()
1551 }
1552
1553 pub fn is_production_mode(&self) -> bool {
1555 self.resource_manager.is_some()
1556 }
1557
1558 pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1560 &self.dht_manager
1561 }
1562
1563 pub fn dht(&self) -> &Arc<DhtNetworkManager> {
1565 self.dht_manager()
1566 }
1567
1568 pub async fn dht_put(&self, key: crate::dht::Key, value: Vec<u8>) -> Result<()> {
1573 self.dht_manager.store_local(key, value).await
1574 }
1575
1576 pub async fn dht_get(&self, key: crate::dht::Key) -> Result<Option<Vec<u8>>> {
1581 self.dht_manager.get_local(&key).await
1582 }
1583
1584 pub async fn add_discovered_peer(&self, peer_id: PeerId, addresses: Vec<String>) -> Result<()> {
1586 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1587 let manager = bootstrap_manager.write().await;
1588 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1589 .iter()
1590 .filter_map(|addr| addr.parse().ok())
1591 .collect();
1592 let contact = ContactEntry::new(peer_id, socket_addresses);
1593 manager.add_contact(contact).await.map_err(|e| {
1594 protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1595 })?;
1596 }
1597 Ok(())
1598 }
1599
1600 pub async fn update_peer_metrics(
1602 &self,
1603 peer_id: &PeerId,
1604 success: bool,
1605 latency_ms: Option<u64>,
1606 _error: Option<String>,
1607 ) -> Result<()> {
1608 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1609 let manager = bootstrap_manager.write().await;
1610
1611 let metrics = QualityMetrics {
1613 success_rate: if success { 1.0 } else { 0.0 },
1614 avg_latency_ms: latency_ms.unwrap_or(0) as f64,
1615 quality_score: if success {
1616 BOOTSTRAP_QUALITY_SCORE_SUCCESS
1617 } else {
1618 BOOTSTRAP_QUALITY_SCORE_FAILURE
1619 },
1620 last_connection_attempt: chrono::Utc::now(),
1621 last_successful_connection: if success {
1622 chrono::Utc::now()
1623 } else {
1624 chrono::Utc::now() - chrono::Duration::hours(BOOTSTRAP_FAILURE_PENALTY_HOURS)
1625 },
1626 uptime_score: BOOTSTRAP_DEFAULT_UPTIME_SCORE,
1627 };
1628
1629 manager
1630 .update_contact_metrics(peer_id, metrics)
1631 .await
1632 .map_err(|e| protocol_error(format!("Failed to update peer metrics: {e}")))?;
1633 }
1634 Ok(())
1635 }
1636
1637 pub async fn get_bootstrap_cache_stats(&self) -> Result<Option<crate::bootstrap::CacheStats>> {
1639 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1640 let manager = bootstrap_manager.read().await;
1641 let stats = manager
1642 .get_stats()
1643 .await
1644 .map_err(|e| protocol_error(format!("Failed to get bootstrap stats: {e}")))?;
1645 Ok(Some(stats))
1646 } else {
1647 Ok(None)
1648 }
1649 }
1650
1651 pub async fn cached_peer_count(&self) -> usize {
1653 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1654 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1655 {
1656 return stats.total_contacts;
1657 }
1658 0
1659 }
1660
1661 async fn connect_bootstrap_peers(&self) -> Result<()> {
1663 let mut bootstrap_contacts = Vec::new();
1664 let mut used_cache = false;
1665 let mut seen_addresses = std::collections::HashSet::new();
1666
1667 let cli_bootstrap_peers = if !self.config.bootstrap_peers_str.is_empty() {
1669 self.config.bootstrap_peers_str.clone()
1670 } else {
1671 self.config
1673 .bootstrap_peers
1674 .iter()
1675 .map(|addr| addr.to_string())
1676 .collect::<Vec<_>>()
1677 };
1678
1679 if !cli_bootstrap_peers.is_empty() {
1680 info!(
1681 "Using {} CLI-provided bootstrap peers (priority)",
1682 cli_bootstrap_peers.len()
1683 );
1684 for addr in &cli_bootstrap_peers {
1685 if let Ok(socket_addr) = addr.parse::<std::net::SocketAddr>() {
1686 seen_addresses.insert(socket_addr);
1687 let contact = ContactEntry::new(
1688 format!("cli_peer_{}", addr.chars().take(8).collect::<String>()),
1689 vec![socket_addr],
1690 );
1691 bootstrap_contacts.push(contact);
1692 } else {
1693 warn!("Invalid bootstrap address format: {}", addr);
1694 }
1695 }
1696 }
1697
1698 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1701 let manager = bootstrap_manager.read().await;
1702 match manager
1703 .get_quic_bootstrap_peers(BOOTSTRAP_PEER_BATCH_SIZE)
1704 .await
1705 {
1706 Ok(contacts) => {
1708 if !contacts.is_empty() {
1709 let mut added_from_cache = 0;
1710 for contact in contacts {
1711 let new_addresses: Vec<_> = contact
1713 .addresses
1714 .iter()
1715 .filter(|addr| !seen_addresses.contains(addr))
1716 .copied()
1717 .collect();
1718
1719 if !new_addresses.is_empty() {
1720 for addr in &new_addresses {
1721 seen_addresses.insert(*addr);
1722 }
1723 let mut contact = contact.clone();
1724 contact.addresses = new_addresses;
1725 bootstrap_contacts.push(contact);
1726 added_from_cache += 1;
1727 }
1728 }
1729 if added_from_cache > 0 {
1730 info!(
1731 "Added {} cached bootstrap peers (supplementing CLI peers)",
1732 added_from_cache
1733 );
1734 used_cache = true;
1735 }
1736 }
1737 }
1738 Err(e) => {
1739 warn!("Failed to get cached bootstrap peers: {}", e);
1740 }
1741 }
1742 }
1743
1744 if bootstrap_contacts.is_empty() {
1745 info!("No bootstrap peers configured and no cached peers available");
1746 return Ok(());
1747 }
1748
1749 let mut successful_connections = 0;
1751 let mut connected_peer_ids: Vec<PeerId> = Vec::new();
1752
1753 for contact in bootstrap_contacts.iter() {
1754 for addr in &contact.addresses {
1755 match self.connect_peer(&addr.to_string()).await {
1756 Ok(peer_id) => {
1757 successful_connections += 1;
1758 connected_peer_ids.push(peer_id.clone());
1759
1760 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1762 let manager = bootstrap_manager.write().await;
1763 let mut updated_contact = contact.clone();
1764 updated_contact.peer_id = peer_id.clone();
1765 updated_contact.update_connection_result(true, Some(100), None); if let Err(e) = manager.add_contact(updated_contact).await {
1768 warn!("Failed to update bootstrap cache: {}", e);
1769 }
1770 }
1771 break; }
1773 Err(e) => {
1774 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1775
1776 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1778 let manager = bootstrap_manager.write().await;
1779 let mut updated_contact = contact.clone();
1780 updated_contact.update_connection_result(
1781 false,
1782 None,
1783 Some(e.to_string()),
1784 );
1785
1786 if let Err(e) = manager.add_contact(updated_contact).await {
1787 warn!("Failed to update bootstrap cache: {}", e);
1788 }
1789 }
1790 }
1791 }
1792 }
1793 }
1794
1795 if successful_connections == 0 {
1796 if !used_cache {
1797 warn!("Failed to connect to any bootstrap peers");
1798 }
1799 return Ok(());
1802 }
1803
1804 info!(
1805 "Successfully connected to {} bootstrap peers",
1806 successful_connections
1807 );
1808
1809 match self
1812 .dht_manager
1813 .bootstrap_from_peers(&connected_peer_ids)
1814 .await
1815 {
1816 Ok(count) => info!("DHT peer discovery found {} peers", count),
1817 Err(e) => warn!("DHT peer discovery failed: {}", e),
1818 }
1819
1820 self.is_bootstrapped.store(true, Ordering::SeqCst);
1823 info!(
1824 "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
1825 successful_connections,
1826 connected_peer_ids.len()
1827 );
1828
1829 Ok(())
1830 }
1831
1832 }
1834
1835#[async_trait::async_trait]
1837pub trait NetworkSender: Send + Sync {
1838 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
1840
1841 fn local_peer_id(&self) -> &PeerId;
1843}
1844
1845pub struct NodeBuilder {
1849 config: NodeConfig,
1850}
1851
1852impl Default for NodeBuilder {
1853 fn default() -> Self {
1854 Self::new()
1855 }
1856}
1857
1858impl NodeBuilder {
1859 pub fn new() -> Self {
1861 Self {
1862 config: NodeConfig::default(),
1863 }
1864 }
1865
1866 pub fn with_peer_id(mut self, peer_id: PeerId) -> Self {
1868 self.config.peer_id = Some(peer_id);
1869 self
1870 }
1871
1872 pub fn listen_on(mut self, addr: &str) -> Self {
1874 if let Ok(multiaddr) = addr.parse() {
1875 self.config.listen_addrs.push(multiaddr);
1876 }
1877 self
1878 }
1879
1880 pub fn with_bootstrap_peer(mut self, addr: &str) -> Self {
1882 if let Ok(multiaddr) = addr.parse() {
1883 self.config.bootstrap_peers.push(multiaddr);
1884 }
1885 self.config.bootstrap_peers_str.push(addr.to_string());
1886 self
1887 }
1888
1889 pub fn with_ipv6(mut self, enable: bool) -> Self {
1891 self.config.enable_ipv6 = enable;
1892 self
1893 }
1894
1895 pub fn with_connection_timeout(mut self, timeout: Duration) -> Self {
1899 self.config.connection_timeout = timeout;
1900 self
1901 }
1902
1903 pub fn with_max_connections(mut self, max: usize) -> Self {
1905 self.config.max_connections = max;
1906 self
1907 }
1908
1909 pub fn with_production_mode(mut self) -> Self {
1911 self.config.production_config = Some(ProductionConfig::default());
1912 self
1913 }
1914
1915 pub fn with_production_config(mut self, production_config: ProductionConfig) -> Self {
1917 self.config.production_config = Some(production_config);
1918 self
1919 }
1920
1921 pub fn with_diversity_config(
1923 mut self,
1924 diversity_config: crate::security::IPDiversityConfig,
1925 ) -> Self {
1926 self.config.diversity_config = Some(diversity_config);
1927 self
1928 }
1929
1930 pub fn with_dht(mut self, dht_config: DHTConfig) -> Self {
1932 self.config.dht_config = dht_config;
1933 self
1934 }
1935
1936 pub fn with_default_dht(mut self) -> Self {
1938 self.config.dht_config = DHTConfig::default();
1939 self
1940 }
1941
1942 pub async fn build(self) -> Result<P2PNode> {
1944 P2PNode::new(self.config).await
1945 }
1946}
1947
1948#[cfg(test)]
1949#[allow(clippy::unwrap_used, clippy::expect_used)]
1950mod diversity_tests {
1951 use super::*;
1952 use crate::security::IPDiversityConfig;
1953
1954 async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
1955 let diversity_config = config.diversity_config.clone().unwrap_or_default();
1956 let temp_dir = tempfile::TempDir::new().expect("temp dir");
1958 let mut cache_config = config.bootstrap_cache_config.clone().unwrap_or_default();
1959 cache_config.cache_dir = temp_dir.path().to_path_buf();
1960
1961 BootstrapManager::with_full_config(
1962 cache_config,
1963 crate::rate_limit::JoinRateLimiterConfig::default(),
1964 diversity_config,
1965 )
1966 .await
1967 .expect("bootstrap manager")
1968 }
1969
1970 #[tokio::test]
1971 async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
1972 let config = NodeConfig {
1973 diversity_config: Some(IPDiversityConfig::testnet()),
1974 ..Default::default()
1975 };
1976
1977 let manager = build_bootstrap_manager_like_prod(&config).await;
1978 assert!(manager.diversity_config().is_relaxed());
1979 assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
1980 }
1981}
1982
1983pub(crate) async fn register_new_peer(
1985 peers: &Arc<RwLock<HashMap<PeerId, PeerInfo>>>,
1986 peer_id: &PeerId,
1987 remote_addr: &NetworkAddress,
1988) {
1989 let mut peers_guard = peers.write().await;
1990 let peer_info = PeerInfo {
1991 peer_id: peer_id.clone(),
1992 addresses: vec![remote_addr.to_string()],
1993 connected_at: tokio::time::Instant::now(),
1994 last_seen: tokio::time::Instant::now(),
1995 status: ConnectionStatus::Connected,
1996 protocols: vec!["p2p-core/1.0.0".to_string()],
1997 heartbeat_count: 0,
1998 };
1999 peers_guard.insert(peer_id.clone(), peer_info);
2000}
2001
2002#[cfg(test)]
2003mod tests {
2004 use super::*;
2005 use std::time::Duration;
2007 use tokio::time::timeout;
2008
2009 fn create_test_node_config() -> NodeConfig {
2015 NodeConfig {
2016 peer_id: Some("test_peer_123".to_string()),
2017 listen_addrs: vec![
2018 std::net::SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST), 0),
2019 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0),
2020 ],
2021 listen_addr: std::net::SocketAddr::new(
2022 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2023 0,
2024 ),
2025 bootstrap_peers: vec![],
2026 bootstrap_peers_str: vec![],
2027 enable_ipv6: true,
2028
2029 connection_timeout: Duration::from_secs(2),
2030 keep_alive_interval: Duration::from_secs(30),
2031 max_connections: 100,
2032 max_incoming_connections: 50,
2033 dht_config: DHTConfig::default(),
2034 security_config: SecurityConfig::default(),
2035 production_config: None,
2036 bootstrap_cache_config: None,
2037 diversity_config: None,
2038 stale_peer_threshold: default_stale_peer_threshold(),
2039 max_message_size: None,
2040 }
2041 }
2042
2043 #[tokio::test]
2047 async fn test_node_config_default() {
2048 let config = NodeConfig::default();
2049
2050 assert!(config.peer_id.is_none());
2051 assert_eq!(config.listen_addrs.len(), 2);
2052 assert!(config.enable_ipv6);
2053 assert_eq!(config.max_connections, 10000); assert_eq!(config.max_incoming_connections, 100);
2055 assert_eq!(config.connection_timeout, Duration::from_secs(30));
2056 }
2057
2058 #[tokio::test]
2059 async fn test_dht_config_default() {
2060 let config = DHTConfig::default();
2061
2062 assert_eq!(config.k_value, 20);
2063 assert_eq!(config.alpha_value, 5);
2064 assert_eq!(config.record_ttl, Duration::from_secs(3600));
2065 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2066 }
2067
2068 #[tokio::test]
2069 async fn test_security_config_default() {
2070 let config = SecurityConfig::default();
2071
2072 assert!(config.enable_noise);
2073 assert!(config.enable_tls);
2074 assert_eq!(config.trust_level, TrustLevel::Basic);
2075 }
2076
2077 #[test]
2078 fn test_trust_level_variants() {
2079 let _none = TrustLevel::None;
2081 let _basic = TrustLevel::Basic;
2082 let _full = TrustLevel::Full;
2083
2084 assert_eq!(TrustLevel::None, TrustLevel::None);
2086 assert_eq!(TrustLevel::Basic, TrustLevel::Basic);
2087 assert_eq!(TrustLevel::Full, TrustLevel::Full);
2088 assert_ne!(TrustLevel::None, TrustLevel::Basic);
2089 }
2090
2091 #[test]
2092 fn test_connection_status_variants() {
2093 let connecting = ConnectionStatus::Connecting;
2094 let connected = ConnectionStatus::Connected;
2095 let disconnecting = ConnectionStatus::Disconnecting;
2096 let disconnected = ConnectionStatus::Disconnected;
2097 let failed = ConnectionStatus::Failed("test error".to_string());
2098
2099 assert_eq!(connecting, ConnectionStatus::Connecting);
2100 assert_eq!(connected, ConnectionStatus::Connected);
2101 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2102 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2103 assert_ne!(connecting, connected);
2104
2105 if let ConnectionStatus::Failed(msg) = failed {
2106 assert_eq!(msg, "test error");
2107 } else {
2108 panic!("Expected Failed status");
2109 }
2110 }
2111
2112 #[tokio::test]
2113 async fn test_node_creation() -> Result<()> {
2114 let config = create_test_node_config();
2115 let node = P2PNode::new(config).await?;
2116
2117 assert_eq!(node.peer_id(), "test_peer_123");
2118 assert!(!node.is_running());
2119 assert_eq!(node.peer_count().await, 0);
2120 assert!(node.connected_peers().await.is_empty());
2121
2122 Ok(())
2123 }
2124
2125 #[tokio::test]
2126 async fn test_node_creation_without_peer_id() -> Result<()> {
2127 let mut config = create_test_node_config();
2128 config.peer_id = None;
2129
2130 let node = P2PNode::new(config).await?;
2131
2132 assert!(node.peer_id().starts_with("peer_"));
2134 assert!(!node.is_running());
2135
2136 Ok(())
2137 }
2138
2139 #[tokio::test]
2140 async fn test_node_lifecycle() -> Result<()> {
2141 let config = create_test_node_config();
2142 let node = P2PNode::new(config).await?;
2143
2144 assert!(!node.is_running());
2146
2147 node.start().await?;
2149 assert!(node.is_running());
2150
2151 let listen_addrs = node.listen_addrs().await;
2153 assert!(
2154 !listen_addrs.is_empty(),
2155 "Expected at least one listening address"
2156 );
2157
2158 node.stop().await?;
2160 assert!(!node.is_running());
2161
2162 Ok(())
2163 }
2164
2165 #[tokio::test]
2166 async fn test_peer_connection() -> Result<()> {
2167 let config1 = create_test_node_config();
2168 let mut config2 = create_test_node_config();
2169 config2.peer_id = Some("test_peer_456".to_string());
2170
2171 let node1 = P2PNode::new(config1).await?;
2172 let node2 = P2PNode::new(config2).await?;
2173
2174 node1.start().await?;
2175 node2.start().await?;
2176
2177 let node2_addr = node2
2178 .listen_addrs()
2179 .await
2180 .into_iter()
2181 .find(|a| a.ip().is_ipv4())
2182 .ok_or_else(|| {
2183 P2PError::Network(crate::error::NetworkError::InvalidAddress(
2184 "Node 2 did not expose an IPv4 listen address".into(),
2185 ))
2186 })?;
2187
2188 let peer_id = node1.connect_peer(&node2_addr.to_string()).await?;
2190
2191 assert_eq!(node1.peer_count().await, 1);
2193
2194 let connected_peers = node1.connected_peers().await;
2196 assert_eq!(connected_peers.len(), 1);
2197 assert_eq!(connected_peers[0], peer_id);
2198
2199 let peer_info = node1.peer_info(&peer_id).await;
2201 assert!(peer_info.is_some());
2202 let info = peer_info.expect("Peer info should exist after adding peer");
2203 assert_eq!(info.peer_id, peer_id);
2204 assert_eq!(info.status, ConnectionStatus::Connected);
2205 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2206
2207 node1.disconnect_peer(&peer_id).await?;
2209 assert_eq!(node1.peer_count().await, 0);
2210
2211 node1.stop().await?;
2212 node2.stop().await?;
2213
2214 Ok(())
2215 }
2216
2217 #[cfg_attr(target_os = "windows", ignore)]
2224 #[tokio::test]
2225 async fn test_event_subscription() -> Result<()> {
2226 let ipv4_localhost =
2231 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2232
2233 let mut config1 = create_test_node_config();
2234 config1.listen_addr = ipv4_localhost;
2235 config1.listen_addrs = vec![ipv4_localhost];
2236 config1.enable_ipv6 = false;
2237
2238 let mut config2 = create_test_node_config();
2239 config2.peer_id = Some("test_peer_456".to_string());
2240 config2.listen_addr = ipv4_localhost;
2241 config2.listen_addrs = vec![ipv4_localhost];
2242 config2.enable_ipv6 = false;
2243
2244 let node1 = P2PNode::new(config1).await?;
2245 let node2 = P2PNode::new(config2).await?;
2246
2247 node1.start().await?;
2248 node2.start().await?;
2249
2250 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2253
2254 let mut events = node1.subscribe_events();
2255
2256 let node2_addr = node2.local_addr().ok_or_else(|| {
2258 P2PError::Network(crate::error::NetworkError::ProtocolError(
2259 "No listening address".to_string().into(),
2260 ))
2261 })?;
2262
2263 let mut peer_id = None;
2266 for attempt in 0..3 {
2267 if attempt > 0 {
2268 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2269 }
2270 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
2271 Ok(Ok(id)) => {
2272 peer_id = Some(id);
2273 break;
2274 }
2275 Ok(Err(_)) | Err(_) => continue,
2276 }
2277 }
2278 let peer_id = peer_id.ok_or_else(|| {
2279 P2PError::Network(crate::error::NetworkError::ProtocolError(
2280 "Failed to connect after 3 attempts".to_string().into(),
2281 ))
2282 })?;
2283
2284 let event = timeout(Duration::from_secs(2), events.recv()).await;
2286 assert!(event.is_ok());
2287
2288 let event_result = event
2289 .expect("Should receive event")
2290 .expect("Event should not be error");
2291 match event_result {
2292 P2PEvent::PeerConnected(event_peer_id) => {
2293 assert_eq!(event_peer_id, peer_id);
2294 }
2295 _ => panic!("Expected PeerConnected event"),
2296 }
2297
2298 node1.disconnect_peer(&peer_id).await?;
2300
2301 let event = timeout(Duration::from_secs(2), events.recv()).await;
2303 assert!(event.is_ok());
2304
2305 let event_result = event
2306 .expect("Should receive event")
2307 .expect("Event should not be error");
2308 match event_result {
2309 P2PEvent::PeerDisconnected(event_peer_id) => {
2310 assert_eq!(event_peer_id, peer_id);
2311 }
2312 _ => panic!("Expected PeerDisconnected event"),
2313 }
2314
2315 node1.stop().await?;
2316 node2.stop().await?;
2317
2318 Ok(())
2319 }
2320
2321 #[cfg_attr(target_os = "windows", ignore)]
2323 #[tokio::test]
2324 async fn test_message_sending() -> Result<()> {
2325 let mut config1 = create_test_node_config();
2327 config1.listen_addr =
2328 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2329 let node1 = P2PNode::new(config1).await?;
2330 node1.start().await?;
2331
2332 let mut config2 = create_test_node_config();
2333 config2.listen_addr =
2334 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
2335 let node2 = P2PNode::new(config2).await?;
2336 node2.start().await?;
2337
2338 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2340
2341 let node2_addr = node2.local_addr().ok_or_else(|| {
2343 P2PError::Network(crate::error::NetworkError::ProtocolError(
2344 "No listening address".to_string().into(),
2345 ))
2346 })?;
2347
2348 let peer_id =
2350 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2351 Ok(res) => res?,
2352 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2353 };
2354
2355 tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
2357
2358 let message_data = b"Hello, peer!".to_vec();
2360 let result = match timeout(
2361 Duration::from_millis(500),
2362 node1.send_message(&peer_id, "test-protocol", message_data),
2363 )
2364 .await
2365 {
2366 Ok(res) => res,
2367 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2368 };
2369 if let Err(e) = &result {
2372 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2373 }
2374
2375 let non_existent_peer = "non_existent_peer".to_string();
2377 let result = node1
2378 .send_message(&non_existent_peer, "test-protocol", vec![])
2379 .await;
2380 assert!(result.is_err(), "Sending to non-existent peer should fail");
2381
2382 Ok(())
2383 }
2384
2385 #[tokio::test]
2386 async fn test_remote_mcp_operations() -> Result<()> {
2387 let config = create_test_node_config();
2388 let node = P2PNode::new(config).await?;
2389
2390 node.start().await?;
2392 node.stop().await?;
2393 Ok(())
2394 }
2395
2396 #[tokio::test]
2397 async fn test_health_check() -> Result<()> {
2398 let config = create_test_node_config();
2399 let node = P2PNode::new(config).await?;
2400
2401 let result = node.health_check().await;
2403 assert!(result.is_ok());
2404
2405 Ok(())
2410 }
2411
2412 #[tokio::test]
2413 async fn test_node_uptime() -> Result<()> {
2414 let config = create_test_node_config();
2415 let node = P2PNode::new(config).await?;
2416
2417 let uptime1 = node.uptime();
2418 assert!(uptime1 >= Duration::from_secs(0));
2419
2420 tokio::time::sleep(Duration::from_millis(10)).await;
2422
2423 let uptime2 = node.uptime();
2424 assert!(uptime2 > uptime1);
2425
2426 Ok(())
2427 }
2428
2429 #[tokio::test]
2430 async fn test_node_config_access() -> Result<()> {
2431 let config = create_test_node_config();
2432 let expected_peer_id = config.peer_id.clone();
2433 let node = P2PNode::new(config).await?;
2434
2435 let node_config = node.config();
2436 assert_eq!(node_config.peer_id, expected_peer_id);
2437 assert_eq!(node_config.max_connections, 100);
2438 Ok(())
2441 }
2442
2443 #[tokio::test]
2444 async fn test_mcp_server_access() -> Result<()> {
2445 let config = create_test_node_config();
2446 let _node = P2PNode::new(config).await?;
2447
2448 Ok(())
2450 }
2451
2452 #[tokio::test]
2453 async fn test_dht_access() -> Result<()> {
2454 let config = create_test_node_config();
2455 let node = P2PNode::new(config).await?;
2456
2457 let _dht = node.dht();
2459
2460 Ok(())
2461 }
2462
2463 #[tokio::test]
2464 async fn test_node_builder() -> Result<()> {
2465 let builder = P2PNode::builder()
2467 .with_peer_id("builder_test_peer".to_string())
2468 .listen_on("/ip4/127.0.0.1/tcp/0")
2469 .listen_on("/ip6/::1/tcp/0")
2470 .with_bootstrap_peer("/ip4/127.0.0.1/tcp/9000") .with_ipv6(true)
2472 .with_connection_timeout(Duration::from_secs(15))
2473 .with_max_connections(200);
2474
2475 let config = builder.config;
2477 assert_eq!(config.peer_id, Some("builder_test_peer".to_string()));
2478 assert_eq!(config.listen_addrs.len(), 2); assert_eq!(config.bootstrap_peers_str.len(), 1); assert!(config.enable_ipv6);
2481 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2482 assert_eq!(config.max_connections, 200);
2483
2484 Ok(())
2485 }
2486
2487 #[tokio::test]
2488 async fn test_bootstrap_peers() -> Result<()> {
2489 let mut config = create_test_node_config();
2490 config.bootstrap_peers = vec![
2491 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9200),
2492 std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 9201),
2493 ];
2494
2495 let node = P2PNode::new(config).await?;
2496
2497 node.start().await?;
2499
2500 let _peer_count = node.peer_count().await;
2504
2505 node.stop().await?;
2506 Ok(())
2507 }
2508
2509 #[tokio::test]
2510 async fn test_production_mode_disabled() -> Result<()> {
2511 let config = create_test_node_config();
2512 let node = P2PNode::new(config).await?;
2513
2514 assert!(!node.is_production_mode());
2515 assert!(node.production_config().is_none());
2516
2517 let result = node.resource_metrics().await;
2519 assert!(result.is_err());
2520 assert!(result.unwrap_err().to_string().contains("not enabled"));
2521
2522 Ok(())
2523 }
2524
2525 #[tokio::test]
2526 async fn test_network_event_variants() {
2527 let peer_id = "test_peer".to_string();
2529 let address = "/ip4/127.0.0.1/tcp/9000".to_string();
2530
2531 let _peer_connected = NetworkEvent::PeerConnected {
2532 peer_id: peer_id.clone(),
2533 addresses: vec![address.clone()],
2534 };
2535
2536 let _peer_disconnected = NetworkEvent::PeerDisconnected {
2537 peer_id: peer_id.clone(),
2538 reason: "test disconnect".to_string(),
2539 };
2540
2541 let _message_received = NetworkEvent::MessageReceived {
2542 peer_id: peer_id.clone(),
2543 protocol: "test-protocol".to_string(),
2544 data: vec![1, 2, 3],
2545 };
2546
2547 let _connection_failed = NetworkEvent::ConnectionFailed {
2548 peer_id: Some(peer_id.clone()),
2549 address: address.clone(),
2550 error: "connection refused".to_string(),
2551 };
2552
2553 let _dht_stored = NetworkEvent::DHTRecordStored {
2554 key: vec![1, 2, 3],
2555 value: vec![4, 5, 6],
2556 };
2557
2558 let _dht_retrieved = NetworkEvent::DHTRecordRetrieved {
2559 key: vec![1, 2, 3],
2560 value: Some(vec![4, 5, 6]),
2561 };
2562 }
2563
2564 #[tokio::test]
2565 async fn test_peer_info_structure() {
2566 let peer_info = PeerInfo {
2567 peer_id: "test_peer".to_string(),
2568 addresses: vec!["/ip4/127.0.0.1/tcp/9000".to_string()],
2569 connected_at: Instant::now(),
2570 last_seen: Instant::now(),
2571 status: ConnectionStatus::Connected,
2572 protocols: vec!["test-protocol".to_string()],
2573 heartbeat_count: 0,
2574 };
2575
2576 assert_eq!(peer_info.peer_id, "test_peer");
2577 assert_eq!(peer_info.addresses.len(), 1);
2578 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2579 assert_eq!(peer_info.protocols.len(), 1);
2580 }
2581
2582 #[tokio::test]
2583 async fn test_serialization() -> Result<()> {
2584 let config = create_test_node_config();
2586 let serialized = serde_json::to_string(&config)?;
2587 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2588
2589 assert_eq!(config.peer_id, deserialized.peer_id);
2590 assert_eq!(config.listen_addrs, deserialized.listen_addrs);
2591 assert_eq!(config.enable_ipv6, deserialized.enable_ipv6);
2592
2593 Ok(())
2594 }
2595
2596 #[tokio::test]
2597 async fn test_get_peer_id_by_address_found() -> Result<()> {
2598 let config = create_test_node_config();
2599 let node = P2PNode::new(config).await?;
2600
2601 let test_peer_id = "peer_test_123".to_string();
2603 let test_address = "192.168.1.100:9000".to_string();
2604
2605 let peer_info = PeerInfo {
2606 peer_id: test_peer_id.clone(),
2607 addresses: vec![test_address.clone()],
2608 connected_at: Instant::now(),
2609 last_seen: Instant::now(),
2610 status: ConnectionStatus::Connected,
2611 protocols: vec!["test-protocol".to_string()],
2612 heartbeat_count: 0,
2613 };
2614
2615 node.transport
2616 .inject_peer(test_peer_id.clone(), peer_info)
2617 .await;
2618
2619 let found_peer_id = node.get_peer_id_by_address(&test_address).await;
2621 assert_eq!(found_peer_id, Some(test_peer_id));
2622
2623 Ok(())
2624 }
2625
2626 #[tokio::test]
2627 async fn test_get_peer_id_by_address_not_found() -> Result<()> {
2628 let config = create_test_node_config();
2629 let node = P2PNode::new(config).await?;
2630
2631 let result = node.get_peer_id_by_address("192.168.1.200:9000").await;
2633 assert_eq!(result, None);
2634
2635 Ok(())
2636 }
2637
2638 #[tokio::test]
2639 async fn test_get_peer_id_by_address_invalid_format() -> Result<()> {
2640 let config = create_test_node_config();
2641 let node = P2PNode::new(config).await?;
2642
2643 let result = node.get_peer_id_by_address("invalid-address").await;
2645 assert_eq!(result, None);
2646
2647 Ok(())
2648 }
2649
2650 #[tokio::test]
2651 async fn test_get_peer_id_by_address_multiple_peers() -> Result<()> {
2652 let config = create_test_node_config();
2653 let node = P2PNode::new(config).await?;
2654
2655 let peer1_id = "peer_1".to_string();
2657 let peer1_addr = "192.168.1.101:9001".to_string();
2658
2659 let peer2_id = "peer_2".to_string();
2660 let peer2_addr = "192.168.1.102:9002".to_string();
2661
2662 let peer1_info = PeerInfo {
2663 peer_id: peer1_id.clone(),
2664 addresses: vec![peer1_addr.clone()],
2665 connected_at: Instant::now(),
2666 last_seen: Instant::now(),
2667 status: ConnectionStatus::Connected,
2668 protocols: vec!["test-protocol".to_string()],
2669 heartbeat_count: 0,
2670 };
2671
2672 let peer2_info = PeerInfo {
2673 peer_id: peer2_id.clone(),
2674 addresses: vec![peer2_addr.clone()],
2675 connected_at: Instant::now(),
2676 last_seen: Instant::now(),
2677 status: ConnectionStatus::Connected,
2678 protocols: vec!["test-protocol".to_string()],
2679 heartbeat_count: 0,
2680 };
2681
2682 node.transport
2683 .inject_peer(peer1_id.clone(), peer1_info)
2684 .await;
2685 node.transport
2686 .inject_peer(peer2_id.clone(), peer2_info)
2687 .await;
2688
2689 let found_peer1 = node.get_peer_id_by_address(&peer1_addr).await;
2691 let found_peer2 = node.get_peer_id_by_address(&peer2_addr).await;
2692
2693 assert_eq!(found_peer1, Some(peer1_id));
2694 assert_eq!(found_peer2, Some(peer2_id));
2695
2696 Ok(())
2697 }
2698
2699 #[tokio::test]
2700 async fn test_list_active_connections_empty() -> Result<()> {
2701 let config = create_test_node_config();
2702 let node = P2PNode::new(config).await?;
2703
2704 let connections = node.list_active_connections().await;
2706 assert!(connections.is_empty());
2707
2708 Ok(())
2709 }
2710
2711 #[tokio::test]
2712 async fn test_list_active_connections_with_peers() -> Result<()> {
2713 let config = create_test_node_config();
2714 let node = P2PNode::new(config).await?;
2715
2716 let peer1_id = "peer_1".to_string();
2718 let peer1_addrs = vec![
2719 "192.168.1.101:9001".to_string(),
2720 "192.168.1.101:9002".to_string(),
2721 ];
2722
2723 let peer2_id = "peer_2".to_string();
2724 let peer2_addrs = vec!["192.168.1.102:9003".to_string()];
2725
2726 let peer1_info = PeerInfo {
2727 peer_id: peer1_id.clone(),
2728 addresses: peer1_addrs.clone(),
2729 connected_at: Instant::now(),
2730 last_seen: Instant::now(),
2731 status: ConnectionStatus::Connected,
2732 protocols: vec!["test-protocol".to_string()],
2733 heartbeat_count: 0,
2734 };
2735
2736 let peer2_info = PeerInfo {
2737 peer_id: peer2_id.clone(),
2738 addresses: peer2_addrs.clone(),
2739 connected_at: Instant::now(),
2740 last_seen: Instant::now(),
2741 status: ConnectionStatus::Connected,
2742 protocols: vec!["test-protocol".to_string()],
2743 heartbeat_count: 0,
2744 };
2745
2746 node.transport
2747 .inject_peer(peer1_id.clone(), peer1_info)
2748 .await;
2749 node.transport
2750 .inject_peer(peer2_id.clone(), peer2_info)
2751 .await;
2752
2753 node.transport
2755 .inject_active_connection(peer1_id.clone())
2756 .await;
2757 node.transport
2758 .inject_active_connection(peer2_id.clone())
2759 .await;
2760
2761 let connections = node.list_active_connections().await;
2763 assert_eq!(connections.len(), 2);
2764
2765 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
2767 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
2768
2769 assert!(peer1_conn.is_some());
2770 assert!(peer2_conn.is_some());
2771
2772 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
2774 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
2775
2776 Ok(())
2777 }
2778
2779 #[tokio::test]
2780 async fn test_remove_peer_success() -> Result<()> {
2781 let config = create_test_node_config();
2782 let node = P2PNode::new(config).await?;
2783
2784 let peer_id = "peer_to_remove".to_string();
2786 let peer_info = PeerInfo {
2787 peer_id: peer_id.clone(),
2788 addresses: vec!["192.168.1.100:9000".to_string()],
2789 connected_at: Instant::now(),
2790 last_seen: Instant::now(),
2791 status: ConnectionStatus::Connected,
2792 protocols: vec!["test-protocol".to_string()],
2793 heartbeat_count: 0,
2794 };
2795
2796 node.transport.inject_peer(peer_id.clone(), peer_info).await;
2797
2798 assert!(node.is_peer_connected(&peer_id).await);
2800
2801 let removed = node.remove_peer(&peer_id).await;
2803 assert!(removed);
2804
2805 assert!(!node.is_peer_connected(&peer_id).await);
2807
2808 Ok(())
2809 }
2810
2811 #[tokio::test]
2812 async fn test_remove_peer_nonexistent() -> Result<()> {
2813 let config = create_test_node_config();
2814 let node = P2PNode::new(config).await?;
2815
2816 let removed = node.remove_peer(&"nonexistent_peer".to_string()).await;
2818 assert!(!removed);
2819
2820 Ok(())
2821 }
2822
2823 #[tokio::test]
2824 async fn test_is_peer_connected() -> Result<()> {
2825 let config = create_test_node_config();
2826 let node = P2PNode::new(config).await?;
2827
2828 let peer_id = "test_peer".to_string();
2829
2830 assert!(!node.is_peer_connected(&peer_id).await);
2832
2833 let peer_info = PeerInfo {
2835 peer_id: peer_id.clone(),
2836 addresses: vec!["192.168.1.100:9000".to_string()],
2837 connected_at: Instant::now(),
2838 last_seen: Instant::now(),
2839 status: ConnectionStatus::Connected,
2840 protocols: vec!["test-protocol".to_string()],
2841 heartbeat_count: 0,
2842 };
2843
2844 node.transport.inject_peer(peer_id.clone(), peer_info).await;
2845
2846 assert!(node.is_peer_connected(&peer_id).await);
2848
2849 node.remove_peer(&peer_id).await;
2851
2852 assert!(!node.is_peer_connected(&peer_id).await);
2854
2855 Ok(())
2856 }
2857
2858 #[test]
2859 fn test_normalize_ipv6_wildcard() {
2860 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
2861
2862 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
2863 let normalized = normalize_wildcard_to_loopback(wildcard);
2864
2865 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
2866 assert_eq!(normalized.port(), 8080);
2867 }
2868
2869 #[test]
2870 fn test_normalize_ipv4_wildcard() {
2871 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2872
2873 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
2874 let normalized = normalize_wildcard_to_loopback(wildcard);
2875
2876 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
2877 assert_eq!(normalized.port(), 9000);
2878 }
2879
2880 #[test]
2881 fn test_normalize_specific_address_unchanged() {
2882 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
2883 let normalized = normalize_wildcard_to_loopback(specific);
2884
2885 assert_eq!(normalized, specific);
2886 }
2887
2888 #[test]
2889 fn test_normalize_loopback_unchanged() {
2890 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
2891
2892 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
2893 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
2894 assert_eq!(normalized_v6, loopback_v6);
2895
2896 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
2897 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
2898 assert_eq!(normalized_v4, loopback_v4);
2899 }
2900
2901 fn current_timestamp() -> u64 {
2905 std::time::SystemTime::now()
2906 .duration_since(std::time::UNIX_EPOCH)
2907 .map(|d| d.as_secs())
2908 .unwrap_or(0)
2909 }
2910
2911 fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
2913 let msg = WireMessage {
2914 protocol: protocol.to_string(),
2915 data,
2916 from: from.to_string(),
2917 timestamp,
2918 };
2919 postcard::to_stdvec(&msg).unwrap()
2920 }
2921
2922 #[test]
2923 fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
2924 let transport_id = "abcdef0123456789";
2928 let logical_id = "spoofed-logical-id";
2929 let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
2930
2931 let event =
2932 parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
2933
2934 match event {
2935 P2PEvent::Message {
2936 topic,
2937 source,
2938 data,
2939 } => {
2940 assert_eq!(source, transport_id, "source must be the transport peer ID");
2941 assert_ne!(
2942 source, logical_id,
2943 "source must NOT be the logical 'from' field"
2944 );
2945 assert_eq!(topic, "test/v1");
2946 assert_eq!(data, vec![1u8, 2, 3]);
2947 }
2948 other => panic!("expected P2PEvent::Message, got {:?}", other),
2949 }
2950 }
2951
2952 #[test]
2953 fn test_parse_protocol_message_rejects_invalid_bytes() {
2954 assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
2956 }
2957
2958 #[test]
2959 fn test_parse_protocol_message_rejects_truncated_message() {
2960 let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
2962 let truncated = &full_bytes[..full_bytes.len() / 2];
2963 assert!(parse_protocol_message(truncated, "peer-id").is_none());
2964 }
2965
2966 #[test]
2967 fn test_parse_protocol_message_empty_payload() {
2968 let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
2969
2970 let event = parse_protocol_message(&bytes, "transport-peer")
2971 .expect("valid message with empty data should parse");
2972
2973 match event {
2974 P2PEvent::Message { data, .. } => assert!(data.is_empty()),
2975 other => panic!("expected P2PEvent::Message, got {:?}", other),
2976 }
2977 }
2978
2979 #[test]
2980 fn test_parse_protocol_message_preserves_binary_payload() {
2981 let payload: Vec<u8> = (0..=255).collect();
2983 let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
2984
2985 let event = parse_protocol_message(&bytes, "peer-id")
2986 .expect("valid message with full byte range should parse");
2987
2988 match event {
2989 P2PEvent::Message { data, topic, .. } => {
2990 assert_eq!(topic, "binary/v1");
2991 assert_eq!(
2992 data, payload,
2993 "payload must survive bincode round-trip exactly"
2994 );
2995 }
2996 other => panic!("expected P2PEvent::Message, got {:?}", other),
2997 }
2998 }
2999}