1use crate::PeerId;
20use crate::adaptive::trust::{TrustRecord, TrustSnapshot};
21use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
22use crate::bootstrap::cache::{CachedCloseGroupPeer, CloseGroupCache};
23use crate::dht::core_engine::AddressType;
24use crate::dht_network_manager::{
25 DhtNetworkConfig, DhtNetworkEvent, DhtNetworkManager, IDENTITY_EXCHANGE_TIMEOUT,
26};
27use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
28use crate::reachability::spawn_acquisition_driver;
29
30use crate::MultiAddr;
31use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
32use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
33use dashmap::DashMap;
34use futures::StreamExt;
35use parking_lot::Mutex as ParkingMutex;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::net::SocketAddr;
39use std::path::{Path, PathBuf};
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, Ordering};
42use std::time::{Duration, SystemTime, UNIX_EPOCH};
43use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
44use tokio::time::Instant;
45use tokio_util::sync::CancellationToken;
46use tracing::{debug, info, trace, warn};
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
52pub(crate) struct WireMessage {
53 pub(crate) protocol: String,
55 pub(crate) data: Vec<u8>,
57 pub(crate) from: PeerId,
59 pub(crate) timestamp: u64,
61 #[serde(default)]
67 pub(crate) user_agent: String,
68 #[serde(default)]
70 pub(crate) public_key: Vec<u8>,
71 #[serde(default)]
73 pub(crate) signature: Vec<u8>,
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
82pub enum NodeMode {
83 #[default]
85 Node,
86 Client,
88}
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92enum ListenMode {
93 Public,
95 Local,
97}
98
99pub fn user_agent_for_mode(mode: NodeMode) -> String {
104 let prefix = match mode {
105 NodeMode::Node => "node",
106 NodeMode::Client => "client",
107 };
108 format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
109}
110
111pub fn is_dht_participant(user_agent: &str) -> bool {
113 user_agent.starts_with("node/")
114}
115
116pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
118
119pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
121
122pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
124
125const DEFAULT_LISTEN_PORT: u16 = 9000;
127
128const DEFAULT_MAX_CONNECTIONS: usize = 10_000;
130
131const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 25;
138
139const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 3;
152
153const MAX_CONCURRENT_BOOTSTRAP_DIALS: usize = 4;
160
161const CLIENT_BOOTSTRAP_TARGET: usize = 6;
170
171const fn default_true() -> bool {
173 true
174}
175
176#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct NodeConfig {
179 #[serde(default)]
185 pub local: bool,
186
187 #[serde(default)]
189 pub port: u16,
190
191 #[serde(default = "default_true")]
196 pub ipv6: bool,
197
198 pub bootstrap_peers: Vec<crate::MultiAddr>,
200
201 pub connection_timeout: Duration,
204
205 pub max_connections: usize,
207
208 pub dht_config: DHTConfig,
210
211 pub diversity_config: Option<crate::security::IPDiversityConfig>,
216
217 #[serde(default)]
221 pub max_message_size: Option<usize>,
222
223 #[serde(skip)]
228 pub node_identity: Option<Arc<NodeIdentity>>,
229
230 #[serde(default)]
236 pub mode: NodeMode,
237
238 #[serde(default, skip_serializing_if = "Option::is_none")]
243 pub custom_user_agent: Option<String>,
244
245 #[serde(default)]
253 pub allow_loopback: bool,
254
255 #[serde(default)]
263 pub adaptive_dht_config: AdaptiveDhtConfig,
264
265 #[serde(default, skip_serializing_if = "Option::is_none")]
276 pub close_group_cache_dir: Option<PathBuf>,
277}
278
279#[derive(Debug, Clone, Serialize, Deserialize)]
281pub struct DHTConfig {
282 pub k_value: usize,
284
285 pub alpha_value: usize,
287
288 pub refresh_interval: Duration,
290}
291
292#[inline]
305fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
306 let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
307
308 let (v4, v6) = match mode {
309 ListenMode::Public => (
310 std::net::Ipv4Addr::UNSPECIFIED,
311 std::net::Ipv6Addr::UNSPECIFIED,
312 ),
313 ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
314 };
315
316 if ipv6_enabled {
317 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
318 std::net::IpAddr::V6(v6),
319 port,
320 )));
321 }
322
323 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
324 std::net::IpAddr::V4(v4),
325 port,
326 )));
327
328 addrs
329}
330
331impl NodeConfig {
332 pub fn user_agent(&self) -> String {
337 self.custom_user_agent
338 .clone()
339 .unwrap_or_else(|| user_agent_for_mode(self.mode))
340 }
341
342 pub fn listen_addrs(&self) -> Vec<MultiAddr> {
347 let mode = if self.local {
348 ListenMode::Local
349 } else {
350 ListenMode::Public
351 };
352 build_listen_addrs(self.port, self.ipv6, mode)
353 }
354
355 pub fn new() -> Result<Self> {
361 Ok(Self::default())
362 }
363
364 pub fn builder() -> NodeConfigBuilder {
366 NodeConfigBuilder::default()
367 }
368}
369
370#[derive(Debug, Clone)]
393pub struct NodeConfigBuilder {
394 port: u16,
395 ipv6: bool,
396 local: bool,
397 bootstrap_peers: Vec<crate::MultiAddr>,
398 max_connections: Option<usize>,
399 connection_timeout: Option<Duration>,
400 dht_config: Option<DHTConfig>,
401 max_message_size: Option<usize>,
402 mode: NodeMode,
403 custom_user_agent: Option<String>,
404 allow_loopback: Option<bool>,
405 adaptive_dht_config: Option<AdaptiveDhtConfig>,
406 close_group_cache_dir: Option<PathBuf>,
407}
408
409impl Default for NodeConfigBuilder {
410 fn default() -> Self {
411 Self {
412 port: 0,
413 ipv6: true,
414 local: false,
415 bootstrap_peers: Vec::new(),
416 max_connections: None,
417 connection_timeout: None,
418 dht_config: None,
419 max_message_size: None,
420 mode: NodeMode::default(),
421 custom_user_agent: None,
422 allow_loopback: None,
423 adaptive_dht_config: None,
424 close_group_cache_dir: None,
425 }
426 }
427}
428
429impl NodeConfigBuilder {
430 pub fn port(mut self, port: u16) -> Self {
432 self.port = port;
433 self
434 }
435
436 pub fn ipv6(mut self, enabled: bool) -> Self {
438 self.ipv6 = enabled;
439 self
440 }
441
442 pub fn local(mut self, local: bool) -> Self {
449 self.local = local;
450 self
451 }
452
453 pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
455 self.bootstrap_peers.push(addr);
456 self
457 }
458
459 pub fn max_connections(mut self, max: usize) -> Self {
461 self.max_connections = Some(max);
462 self
463 }
464
465 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
467 self.connection_timeout = Some(timeout);
468 self
469 }
470
471 pub fn dht_config(mut self, config: DHTConfig) -> Self {
473 self.dht_config = Some(config);
474 self
475 }
476
477 pub fn max_message_size(mut self, max_message_size: usize) -> Self {
481 self.max_message_size = Some(max_message_size);
482 self
483 }
484
485 pub fn mode(mut self, mode: NodeMode) -> Self {
487 self.mode = mode;
488 self
489 }
490
491 pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
493 self.custom_user_agent = Some(user_agent.into());
494 self
495 }
496
497 pub fn allow_loopback(mut self, allow: bool) -> Self {
501 self.allow_loopback = Some(allow);
502 self
503 }
504
505 pub fn trust_enforcement(mut self, enabled: bool) -> Self {
518 let threshold = if enabled {
519 AdaptiveDhtConfig::default().swap_threshold
520 } else {
521 0.0
522 };
523 self.adaptive_dht_config = Some(AdaptiveDhtConfig {
524 swap_threshold: threshold,
525 });
526 self
527 }
528
529 pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
533 self.adaptive_dht_config = Some(config);
534 self
535 }
536
537 pub fn close_group_cache_dir(mut self, path: impl Into<PathBuf>) -> Self {
542 self.close_group_cache_dir = Some(path.into());
543 self
544 }
545
546 pub fn build(self) -> Result<NodeConfig> {
552 let allow_loopback = self.allow_loopback.unwrap_or(self.local);
554
555 Ok(NodeConfig {
556 local: self.local,
557 port: self.port,
558 ipv6: self.ipv6,
559 bootstrap_peers: self.bootstrap_peers,
560 connection_timeout: self
561 .connection_timeout
562 .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
563 max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
564 dht_config: self.dht_config.unwrap_or_default(),
565 diversity_config: None,
566 max_message_size: self.max_message_size,
567 node_identity: None,
568 mode: self.mode,
569 custom_user_agent: self.custom_user_agent,
570 allow_loopback,
571 adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
572 close_group_cache_dir: self.close_group_cache_dir,
573 })
574 }
575}
576
577impl Default for NodeConfig {
578 fn default() -> Self {
579 Self {
580 local: false,
581 port: DEFAULT_LISTEN_PORT,
582 ipv6: true,
583 bootstrap_peers: Vec::new(),
584 connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
585 max_connections: DEFAULT_MAX_CONNECTIONS,
586 dht_config: DHTConfig::default(),
587 diversity_config: None,
588 max_message_size: None,
589 node_identity: None,
590 mode: NodeMode::default(),
591 custom_user_agent: None,
592 allow_loopback: false,
593 adaptive_dht_config: AdaptiveDhtConfig::default(),
594 close_group_cache_dir: None,
595 }
596 }
597}
598
599impl DHTConfig {
600 pub const DEFAULT_K_VALUE: usize = 20;
602 const DEFAULT_ALPHA_VALUE: usize = 3;
603 const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
604 const MIN_K_VALUE: usize = 4;
606
607 pub fn validate(&self) -> Result<()> {
611 if self.k_value < Self::MIN_K_VALUE {
612 return Err(P2PError::Validation(
613 format!(
614 "k_value must be >= {} (got {}), values below {} produce degenerate behavior",
615 Self::MIN_K_VALUE,
616 self.k_value,
617 Self::MIN_K_VALUE,
618 )
619 .into(),
620 ));
621 }
622 if self.alpha_value < 1 {
623 return Err(P2PError::Validation(
624 format!("alpha_value must be >= 1 (got {})", self.alpha_value).into(),
625 ));
626 }
627 if self.refresh_interval.is_zero() {
628 return Err(P2PError::Validation("refresh_interval must be > 0".into()));
629 }
630 Ok(())
631 }
632}
633
634impl Default for DHTConfig {
635 fn default() -> Self {
636 Self {
637 k_value: Self::DEFAULT_K_VALUE,
638 alpha_value: Self::DEFAULT_ALPHA_VALUE,
639 refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
640 }
641 }
642}
643
644#[derive(Debug, Clone)]
646pub struct PeerInfo {
647 #[allow(dead_code)]
649 pub(crate) channel_id: String,
650
651 pub addresses: Vec<MultiAddr>,
653
654 pub connected_at: Instant,
656
657 pub last_seen: Instant,
659
660 pub status: ConnectionStatus,
662
663 pub protocols: Vec<String>,
665
666 pub heartbeat_count: u64,
668}
669
670#[derive(Debug, Clone, PartialEq)]
672pub enum ConnectionStatus {
673 Connecting,
675 Connected,
677 Disconnecting,
679 Disconnected,
681 Failed(String),
683}
684
685#[derive(Debug, Clone)]
690pub enum P2PEvent {
691 Message {
693 topic: String,
695 source: Option<PeerId>,
698 transport_source: Option<MultiAddr>,
702 timestamp: u64,
710 data: Vec<u8>,
712 },
713 PeerConnected(PeerId, String),
716 PeerDisconnected(PeerId),
718}
719
720#[derive(Debug, Clone)]
725pub struct PeerResponse {
726 pub peer_id: PeerId,
728 pub data: Vec<u8>,
730 pub latency: Duration,
732}
733
734#[derive(Debug, Clone, Serialize, Deserialize)]
739pub(crate) struct RequestResponseEnvelope {
740 pub(crate) message_id: String,
742 pub(crate) is_response: bool,
744 pub(crate) payload: Vec<u8>,
746}
747
748pub(crate) struct PendingRequest {
750 pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
752 pub(crate) expected_peer: PeerId,
754}
755
756const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
762
763pub struct P2PNode {
774 config: NodeConfig,
776
777 peer_id: PeerId,
779
780 transport: Arc<crate::transport_handle::TransportHandle>,
782
783 start_time: Instant,
785
786 shutdown: CancellationToken,
788
789 adaptive_dht: AdaptiveDHT,
792
793 is_bootstrapped: Arc<AtomicBool>,
795
796 is_started: Arc<AtomicBool>,
798
799 reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,
803
804 relayer_peer_id: Arc<RwLock<Option<PeerId>>>,
813
814 relay_address: Arc<RwLock<Option<SocketAddr>>>,
821}
822
823pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
839 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
840
841 if addr.ip().is_unspecified() {
842 let loopback_ip = match addr {
844 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
847 std::net::SocketAddr::new(loopback_ip, addr.port())
848 } else {
849 addr
851 }
852}
853
854impl P2PNode {
855 pub async fn new(config: NodeConfig) -> Result<Self> {
857 let node_identity = match config.node_identity.clone() {
859 Some(identity) => identity,
860 None => Arc::new(NodeIdentity::generate()?),
861 };
862
863 let peer_id = *node_identity.peer_id();
865
866 config.dht_config.validate()?;
869 if let Some(ref diversity) = config.diversity_config {
870 diversity
871 .validate()
872 .map_err(|e| P2PError::Validation(format!("IP diversity config: {e}").into()))?;
873 }
874
875 let transport_config = crate::transport_handle::TransportConfig::from_node_config(
877 &config,
878 crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
879 node_identity.clone(),
880 );
881 let transport =
882 Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
883
884 let dht_manager_config = DhtNetworkConfig {
886 peer_id,
887 node_config: config.clone(),
888 request_timeout: config.connection_timeout,
889 max_concurrent_operations: MAX_ACTIVE_REQUESTS,
890 enable_security: true,
891 swap_threshold: 0.0, };
893 let adaptive_dht = AdaptiveDHT::new(
894 transport.clone(),
895 dht_manager_config,
896 config.adaptive_dht_config.clone(),
897 )
898 .await?;
899
900 let node = Self {
901 config,
902 peer_id,
903 transport,
904 start_time: Instant::now(),
905 shutdown: CancellationToken::new(),
906 adaptive_dht,
907 is_bootstrapped: Arc::new(AtomicBool::new(false)),
908 is_started: Arc::new(AtomicBool::new(false)),
909 reconnect_locks: ParkingMutex::new(HashMap::new()),
910 relayer_peer_id: Arc::new(RwLock::new(None)),
911 relay_address: Arc::new(RwLock::new(None)),
912 };
913 info!(
914 "Created P2P node with peer ID: {} (call start() to begin networking)",
915 node.peer_id
916 );
917
918 Ok(node)
919 }
920
921 pub fn peer_id(&self) -> &PeerId {
923 &self.peer_id
924 }
925
926 pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
928 &self.transport
929 }
930
931 pub async fn relay_address(&self) -> Option<SocketAddr> {
938 *self.relay_address.read().await
939 }
940
941 pub fn local_addr(&self) -> Option<MultiAddr> {
942 self.transport.local_addr()
943 }
944
945 pub fn is_bootstrapped(&self) -> bool {
950 self.is_bootstrapped.load(Ordering::SeqCst)
951 }
952
953 pub async fn re_bootstrap(&self) -> Result<()> {
958 self.is_bootstrapped.store(false, Ordering::SeqCst);
959 self.connect_bootstrap_peers(None).await
960 }
961
962 pub fn trust_engine(&self) -> Arc<TrustEngine> {
968 self.adaptive_dht.trust_engine().clone()
969 }
970
971 pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
985 self.adaptive_dht.report_trust_event(peer_id, event).await;
986 }
987
988 pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
992 self.adaptive_dht.peer_trust(peer_id)
993 }
994
995 pub fn adaptive_dht(&self) -> &AdaptiveDHT {
997 &self.adaptive_dht
998 }
999
1000 pub async fn send_request(
1033 &self,
1034 peer_id: &PeerId,
1035 protocol: &str,
1036 data: Vec<u8>,
1037 timeout: Duration,
1038 ) -> Result<PeerResponse> {
1039 let result = self
1040 .send_request_reconnecting(peer_id, protocol, data, timeout)
1041 .await;
1042 if let Err(ref e) = result {
1043 let event = if matches!(e, P2PError::Timeout(_)) {
1044 TrustEvent::ConnectionTimeout
1045 } else {
1046 TrustEvent::ConnectionFailed
1047 };
1048 self.report_trust_event(peer_id, event).await;
1049 }
1050 result
1051 }
1052
1053 async fn send_request_reconnecting(
1070 &self,
1071 peer_id: &PeerId,
1072 protocol: &str,
1073 data: Vec<u8>,
1074 timeout: Duration,
1075 ) -> Result<PeerResponse> {
1076 let existing_channels = self.transport.channels_for_peer(peer_id).await;
1081
1082 if existing_channels.is_empty() {
1085 {
1090 let lock = self.reconnect_lock_for(peer_id);
1091 let _guard = lock.lock().await;
1092 if !self.transport.is_peer_connected(peer_id).await {
1094 self.ensure_channel(peer_id, &[], &[], &[]).await?;
1095 }
1096 }
1097 return self
1098 .transport
1099 .send_request(peer_id, protocol, data, timeout)
1100 .await;
1101 }
1102
1103 let saved_addrs: Vec<MultiAddr> = self
1106 .transport
1107 .peer_info(peer_id)
1108 .await
1109 .map(|info| info.addresses)
1110 .unwrap_or_default();
1111
1112 let retry_data = data.clone();
1115
1116 match self
1118 .transport
1119 .send_request(peer_id, protocol, data, timeout)
1120 .await
1121 {
1122 Ok(resp) => return Ok(resp),
1123 Err(e) => {
1124 if !e.is_stale_channel_send_failure() {
1128 return Err(e);
1129 }
1130 debug!(
1131 peer = %peer_id.to_hex(),
1132 error = %e,
1133 "stale channel request failed, attempting reconnect",
1134 );
1135 }
1136 }
1137
1138 {
1143 let lock = self.reconnect_lock_for(peer_id);
1144 let _guard = lock.lock().await;
1145
1146 if self.transport.is_peer_connected(peer_id).await {
1148 for channel_id in &existing_channels {
1151 self.transport.disconnect_channel(channel_id).await;
1152 }
1153 } else {
1154 self.ensure_channel(peer_id, &[], &saved_addrs, &existing_channels)
1155 .await?;
1156 }
1157 }
1158 self.transport
1159 .send_request(peer_id, protocol, retry_data, timeout)
1160 .await
1161 }
1162
1163 pub async fn send_response(
1164 &self,
1165 peer_id: &PeerId,
1166 protocol: &str,
1167 message_id: &str,
1168 data: Vec<u8>,
1169 ) -> Result<()> {
1170 self.transport
1171 .send_response(peer_id, protocol, message_id, data)
1172 .await
1173 }
1174
1175 pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
1176 crate::transport_handle::TransportHandle::parse_request_envelope(data)
1177 }
1178
1179 pub async fn subscribe(&self, topic: &str) -> Result<()> {
1180 self.transport.subscribe(topic).await
1181 }
1182
1183 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1184 self.transport.publish(topic, data).await
1185 }
1186
1187 pub fn config(&self) -> &NodeConfig {
1189 &self.config
1190 }
1191
1192 pub async fn start(&self) -> Result<()> {
1194 info!("Starting P2P node...");
1195
1196 self.transport.start_network_listeners().await?;
1198
1199 self.adaptive_dht.start().await?;
1201
1202 let listen_addrs = self.transport.listen_addrs().await;
1204 info!("P2P node started on addresses: {:?}", listen_addrs);
1205
1206 let close_group_cache = if let Some(ref dir) = self.config.close_group_cache_dir {
1213 match CloseGroupCache::load_from_dir(dir).await {
1214 Ok(Some(cache)) => {
1215 let original_count = cache.peers.len();
1218 let cache = CloseGroupCache {
1219 peers: cache
1220 .peers
1221 .into_iter()
1222 .filter(|p| p.trust.score.is_finite())
1223 .collect(),
1224 ..cache
1225 };
1226 let filtered_count = original_count - cache.peers.len();
1227 if filtered_count > 0 {
1228 warn!(
1229 "Filtered {filtered_count} peers with non-finite trust scores from close group cache"
1230 );
1231 }
1232
1233 let trust_snapshot = TrustSnapshot {
1234 peers: cache
1235 .peers
1236 .iter()
1237 .map(|p| (p.peer_id, p.trust.clone()))
1238 .collect(),
1239 };
1240 self.adaptive_dht
1241 .trust_engine()
1242 .import_snapshot(&trust_snapshot);
1243 info!(
1244 "Loaded {} peers from close group cache (trust scores imported)",
1245 cache.peers.len()
1246 );
1247 Some(cache)
1248 }
1249 Ok(None) => {
1250 debug!(
1251 "No close group cache found in {}, fresh start",
1252 dir.display()
1253 );
1254 None
1255 }
1256 Err(e) => {
1257 warn!(
1258 "Failed to load close group cache from {}: {e}",
1259 dir.display()
1260 );
1261 None
1262 }
1263 }
1264 } else {
1265 None
1266 };
1267
1268 self.connect_bootstrap_peers(close_group_cache.as_ref())
1270 .await?;
1271
1272 {
1279 let dht = self.adaptive_dht.dht_manager();
1280 let rt_size = dht.get_routing_table_size().await;
1281 dht.emit_event(DhtNetworkEvent::BootstrapComplete { num_peers: rt_size });
1282 }
1283
1284 if self.config.mode != NodeMode::Client {
1298 spawn_acquisition_driver(
1299 self.adaptive_dht.dht_manager().clone(),
1300 Arc::clone(&self.transport),
1301 Arc::clone(&self.relayer_peer_id),
1302 Arc::clone(&self.relay_address),
1303 self.shutdown.clone(),
1304 );
1305 } else {
1306 info!("client mode — skipping relay acquisition driver");
1307 }
1308
1309 {
1350 let transport = Arc::clone(&self.transport);
1351 let dht = self.adaptive_dht.dht_manager().clone();
1352 let shutdown = self.shutdown.clone();
1353 tokio::spawn(async move {
1354 loop {
1355 tokio::select! {
1356 biased;
1357 _ = shutdown.cancelled() => break,
1358 update = transport.recv_peer_address_update() => {
1359 let Some((peer_addr, advertised_addr)) = update else { break };
1360 let normalized_peer =
1361 saorsa_transport::shared::normalize_socket_addr(peer_addr);
1362 let normalized_adv =
1363 saorsa_transport::shared::normalize_socket_addr(advertised_addr);
1364 if normalized_peer.ip() == normalized_adv.ip() {
1369 debug!(
1370 "DHT_BRIDGE: dropping same-IP update peer={} addr={}",
1371 normalized_peer,
1372 normalized_adv
1373 );
1374 continue;
1375 }
1376 info!(
1377 "DHT_BRIDGE: processing relay update peer={} addr={}",
1378 normalized_peer,
1379 normalized_adv
1380 );
1381 if let Some(peer_id) = transport.peer_id_for_addr(&normalized_peer).await {
1386 let multi_addr = MultiAddr::quic(normalized_adv);
1387 info!(
1388 "Updating DHT: peer {} relay address {} (connection was {})",
1389 peer_id, advertised_addr, peer_addr
1390 );
1391 if !dht
1392 .touch_legacy_relay_hint_if_unsequenced(&peer_id, &multi_addr)
1393 .await
1394 {
1395 debug!(
1396 "DHT_BRIDGE: ignored legacy relay hint for sequenced peer {} addr {}",
1397 peer_id, advertised_addr
1398 );
1399 }
1400 }
1401 }
1402 }
1403 }
1404 });
1405 }
1406
1407 self.is_started
1408 .store(true, std::sync::atomic::Ordering::Release);
1409
1410 Ok(())
1411 }
1412
1413 pub async fn run(&self) -> Result<()> {
1418 if !self.is_running() {
1419 self.start().await?;
1420 }
1421
1422 info!("P2P node running...");
1423
1424 self.shutdown.cancelled().await;
1427
1428 info!("P2P node stopped");
1429 Ok(())
1430 }
1431
1432 pub async fn stop(&self) -> Result<()> {
1434 info!("Stopping P2P node...");
1435
1436 if let Some(ref dir) = self.config.close_group_cache_dir
1438 && let Err(e) = self.save_close_group_cache(dir).await
1439 {
1440 warn!("Failed to save close group cache on shutdown: {e}");
1441 }
1442
1443 self.shutdown.cancel();
1445
1446 self.adaptive_dht.stop().await?;
1448
1449 self.transport.stop().await?;
1451
1452 self.is_started
1453 .store(false, std::sync::atomic::Ordering::Release);
1454
1455 info!("P2P node stopped");
1456 Ok(())
1457 }
1458
1459 pub async fn shutdown(&self) -> Result<()> {
1461 self.stop().await
1462 }
1463
1464 pub fn is_running(&self) -> bool {
1466 self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1467 }
1468
1469 pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
1471 self.transport.listen_addrs().await
1472 }
1473
1474 pub async fn connected_peers(&self) -> Vec<PeerId> {
1476 self.transport.connected_peers().await
1477 }
1478
1479 pub async fn peer_count(&self) -> usize {
1481 self.transport.peer_count().await
1482 }
1483
1484 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1486 self.transport.peer_info(peer_id).await
1487 }
1488
1489 #[allow(dead_code)]
1491 pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
1492 self.transport.get_channel_id_by_address(addr).await
1493 }
1494
1495 #[allow(dead_code)]
1497 pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
1498 self.transport.list_active_connections().await
1499 }
1500
1501 #[allow(dead_code)]
1503 pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
1504 self.transport.remove_channel(channel_id).await
1505 }
1506
1507 pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
1512 self.transport.disconnect_channel(channel_id).await;
1513 }
1514
1515 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1517 self.transport.is_peer_connected(peer_id).await
1518 }
1519
1520 pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
1531 self.transport.connect_peer(address).await
1532 }
1533
1534 pub async fn connect_peer_typed(
1541 &self,
1542 address: &MultiAddr,
1543 kind: AddressType,
1544 ) -> Result<String> {
1545 self.transport.connect_peer_typed(address, kind).await
1546 }
1547
1548 pub async fn wait_for_peer_identity(
1555 &self,
1556 channel_id: &str,
1557 timeout: Duration,
1558 ) -> Result<PeerId> {
1559 self.transport
1560 .wait_for_peer_identity(channel_id, timeout)
1561 .await
1562 }
1563
1564 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1566 self.transport.disconnect_peer(peer_id).await
1567 }
1568
1569 #[allow(dead_code)]
1571 pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
1572 self.transport.is_connection_active(channel_id).await
1573 }
1574
1575 pub async fn send_message(
1589 &self,
1590 peer_id: &PeerId,
1591 protocol: &str,
1592 data: Vec<u8>,
1593 addrs: &[MultiAddr],
1594 ) -> Result<()> {
1595 let existing_channels = self.transport.channels_for_peer(peer_id).await;
1600
1601 if existing_channels.is_empty() {
1604 let lock = self.reconnect_lock_for(peer_id);
1605 let _guard = lock.lock().await;
1606
1607 if self.transport.is_peer_connected(peer_id).await {
1609 return self.transport.send_message(peer_id, protocol, data).await;
1610 }
1611
1612 return self
1613 .reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
1614 .await;
1615 }
1616
1617 let saved_addrs: Vec<MultiAddr> = self
1620 .transport
1621 .peer_info(peer_id)
1622 .await
1623 .map(|info| info.addresses)
1624 .unwrap_or_default();
1625
1626 let retry_data = data.clone();
1629
1630 let send_result = self.transport.send_message(peer_id, protocol, data).await;
1632 match send_result {
1633 Ok(()) => return Ok(()),
1634 Err(e) => {
1635 if !e.is_stale_channel_send_failure() {
1636 debug!(
1637 peer = %peer_id.to_hex(),
1638 error = %e,
1639 "send failed during active channel use, not reconnecting",
1640 );
1641 return Err(e);
1642 }
1643
1644 debug!(
1645 peer = %peer_id.to_hex(),
1646 error = %e,
1647 "stale channel send failed, attempting reconnect",
1648 );
1649 }
1650 }
1651
1652 let lock = self.reconnect_lock_for(peer_id);
1655 let _guard = lock.lock().await;
1656
1657 if self.transport.is_peer_connected(peer_id).await {
1659 for channel_id in &existing_channels {
1663 self.transport.disconnect_channel(channel_id).await;
1664 }
1665 return self
1666 .transport
1667 .send_message(peer_id, protocol, retry_data)
1668 .await;
1669 }
1670
1671 self.reconnect_and_send(
1672 peer_id,
1673 protocol,
1674 retry_data,
1675 addrs,
1676 &saved_addrs,
1677 &existing_channels,
1678 )
1679 .await
1680 }
1681
1682 async fn ensure_channel(
1697 &self,
1698 peer_id: &PeerId,
1699 addrs: &[MultiAddr],
1700 saved_addrs: &[MultiAddr],
1701 stale_channels: &[String],
1702 ) -> Result<()> {
1703 let (address, kind) = self
1705 .resolve_dial_address(peer_id, addrs, saved_addrs)
1706 .await
1707 .ok_or_else(|| {
1708 P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
1709 })?;
1710
1711 if !stale_channels.is_empty() {
1717 for channel_id in stale_channels {
1718 self.transport.disconnect_channel(channel_id).await;
1719 }
1720 tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
1721 }
1722
1723 let channel_id = self.transport.connect_peer_typed(&address, kind).await?;
1725 let authenticated = match self
1726 .transport
1727 .wait_for_peer_identity(&channel_id, IDENTITY_EXCHANGE_TIMEOUT)
1728 .await
1729 {
1730 Ok(peer) => peer,
1731 Err(e) => {
1732 self.transport.disconnect_channel(&channel_id).await;
1735 return Err(e);
1736 }
1737 };
1738
1739 if &authenticated != peer_id {
1740 self.transport.disconnect_channel(&channel_id).await;
1741 return Err(P2PError::Identity(IdentityError::IdentityMismatch {
1742 expected: peer_id.to_hex().into(),
1743 actual: authenticated.to_hex().into(),
1744 }));
1745 }
1746
1747 Ok(())
1748 }
1749
1750 async fn reconnect_and_send(
1752 &self,
1753 peer_id: &PeerId,
1754 protocol: &str,
1755 data: Vec<u8>,
1756 addrs: &[MultiAddr],
1757 saved_addrs: &[MultiAddr],
1758 stale_channels: &[String],
1759 ) -> Result<()> {
1760 self.ensure_channel(peer_id, addrs, saved_addrs, stale_channels)
1761 .await?;
1762 self.transport.send_message(peer_id, protocol, data).await
1764 }
1765
1766 async fn resolve_dial_address(
1777 &self,
1778 peer_id: &PeerId,
1779 caller_addrs: &[MultiAddr],
1780 saved_addrs: &[MultiAddr],
1781 ) -> Option<(MultiAddr, AddressType)> {
1782 if let Some(addr) = Self::first_dialable(caller_addrs) {
1789 return Some((addr, AddressType::Unverified));
1790 }
1791 if let Some(addr) = Self::first_dialable(saved_addrs) {
1792 return Some((addr, AddressType::Unverified));
1793 }
1794
1795 self.adaptive_dht
1796 .peer_addresses_for_dial_typed(peer_id)
1797 .await
1798 .into_iter()
1799 .find(|(a, _)| {
1800 a.dialable_socket_addr()
1801 .is_some_and(|sa| !sa.ip().is_unspecified())
1802 })
1803 }
1804
1805 fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
1808 addrs
1809 .iter()
1810 .find(|a| {
1811 let dialable = a
1812 .dialable_socket_addr()
1813 .is_some_and(|sa| !sa.ip().is_unspecified());
1814 if !dialable {
1815 trace!(address = %a, "skipping non-dialable address");
1816 }
1817 dialable
1818 })
1819 .cloned()
1820 }
1821
1822 fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
1824 self.reconnect_locks
1825 .lock()
1826 .entry(*peer_id)
1827 .or_insert_with(|| Arc::new(TokioMutex::new(())))
1828 .clone()
1829 }
1830}
1831
1832fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1834 P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1835}
1836
1837pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1839 if let Err(e) = tx.send(event) {
1840 tracing::trace!("Event broadcast has no receivers: {e}");
1841 }
1842}
1843
1844pub(crate) struct ParsedMessage {
1850 pub(crate) event: P2PEvent,
1852 pub(crate) authenticated_node_id: Option<PeerId>,
1854 pub(crate) user_agent: String,
1856}
1857
1858pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
1868 let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1869 let transport_source = source.parse::<SocketAddr>().ok().map(MultiAddr::quic);
1870
1871 let authenticated_node_id = if !message.signature.is_empty() {
1873 match verify_message_signature(&message) {
1874 Ok(peer_id) => {
1875 debug!(
1876 "Message from {} authenticated as app-level NodeId {}",
1877 source, peer_id
1878 );
1879 Some(peer_id)
1880 }
1881 Err(e) => {
1882 warn!(
1883 "Rejecting message from {}: signature verification failed: {}",
1884 source, e
1885 );
1886 return None;
1887 }
1888 }
1889 } else {
1890 None
1891 };
1892
1893 debug!(
1894 "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1895 message.protocol,
1896 authenticated_node_id,
1897 source,
1898 message.from,
1899 message.data.len()
1900 );
1901
1902 Some(ParsedMessage {
1903 event: P2PEvent::Message {
1904 topic: message.protocol,
1905 source: authenticated_node_id,
1906 transport_source,
1907 timestamp: message.timestamp,
1908 data: message.data,
1909 },
1910 authenticated_node_id,
1911 user_agent: message.user_agent,
1912 })
1913}
1914
1915fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1922 let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1923 .map_err(|e| format!("invalid public key: {e:?}"))?;
1924
1925 let peer_id = peer_id_from_public_key(&pubkey);
1926
1927 if message.from != peer_id {
1929 return Err(format!(
1930 "from field mismatch: message claims '{}' but public key derives '{}'",
1931 message.from, peer_id
1932 ));
1933 }
1934
1935 let signable = postcard::to_stdvec(&(
1936 &message.protocol,
1937 &message.data as &[u8],
1938 &message.from,
1939 message.timestamp,
1940 &message.user_agent,
1941 ))
1942 .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1943
1944 let sig = MlDsaSignature::from_bytes(&message.signature)
1945 .map_err(|e| format!("invalid signature: {e:?}"))?;
1946
1947 let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1948 .map_err(|e| format!("verification error: {e}"))?;
1949
1950 if valid {
1951 Ok(peer_id)
1952 } else {
1953 Err("signature is invalid".to_string())
1954 }
1955}
1956
1957impl P2PNode {
1958 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1960 self.transport.subscribe_events()
1961 }
1962
1963 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1965 self.subscribe_events()
1966 }
1967
1968 pub fn uptime(&self) -> Duration {
1970 self.start_time.elapsed()
1971 }
1972
1973 pub async fn health_check(&self) -> Result<()> {
1986 let peer_count = self.peer_count().await;
1987 if peer_count > self.config.max_connections {
1988 Err(protocol_error(format!(
1989 "Too many connections: {peer_count}"
1990 )))
1991 } else {
1992 Ok(())
1993 }
1994 }
1995
1996 pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1998 self.adaptive_dht.dht_manager()
1999 }
2000
2001 pub fn dht(&self) -> &Arc<DhtNetworkManager> {
2003 self.dht_manager()
2004 }
2005
2006 async fn connect_bootstrap_peers(
2013 &self,
2014 close_group_cache: Option<&CloseGroupCache>,
2015 ) -> Result<()> {
2016 let mut serial_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2021 let mut parallel_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2022 let mut seen_addresses = std::collections::HashSet::new();
2023
2024 if let Some(cache) = close_group_cache {
2030 let mut sorted_peers: Vec<&CachedCloseGroupPeer> = cache.peers.iter().collect();
2031 sorted_peers.sort_by(|a, b| {
2032 let score_ord = match b.trust.score.partial_cmp(&a.trust.score) {
2036 Some(ord) => ord,
2037 None => {
2038 if a.trust.score.is_nan() {
2039 std::cmp::Ordering::Greater } else {
2041 std::cmp::Ordering::Less }
2043 }
2044 };
2045 score_ord.then_with(|| {
2046 let da = self.peer_id.xor_distance(&a.peer_id);
2047 let db = self.peer_id.xor_distance(&b.peer_id);
2048 da.cmp(&db)
2049 })
2050 });
2051
2052 let mut added_from_close_group = 0usize;
2053 for peer in &sorted_peers {
2054 let new_addresses: Vec<MultiAddr> = peer
2055 .addresses
2056 .iter()
2057 .filter(|a| {
2058 a.dialable_socket_addr()
2059 .is_some_and(|sa| !seen_addresses.contains(&sa))
2060 })
2061 .cloned()
2062 .collect();
2063
2064 if !new_addresses.is_empty() {
2065 for addr in &new_addresses {
2066 if let Some(sa) = addr.socket_addr() {
2067 seen_addresses.insert(sa);
2068 }
2069 }
2070 serial_addr_sets.push(new_addresses);
2071 added_from_close_group += 1;
2072 }
2073 }
2074 if added_from_close_group > 0 {
2075 info!(
2076 "Added {} close group cache peers (highest trust first)",
2077 added_from_close_group
2078 );
2079 }
2080 }
2081
2082 if !self.config.bootstrap_peers.is_empty() {
2084 info!(
2085 "Using {} configured bootstrap peers (priority)",
2086 self.config.bootstrap_peers.len()
2087 );
2088 for multiaddr in &self.config.bootstrap_peers {
2089 let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
2090 warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
2091 continue;
2092 };
2093 seen_addresses.insert(socket_addr);
2094 parallel_addr_sets.push(vec![multiaddr.clone()]);
2095 }
2096 }
2097
2098 if serial_addr_sets.is_empty() && parallel_addr_sets.is_empty() {
2099 info!("No bootstrap peers configured");
2100 return Ok(());
2101 }
2102
2103 let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
2106 let mut successful_connections = 0;
2107 let mut connected_peer_ids: Vec<PeerId> = Vec::new();
2108
2109 let client_mode = matches!(self.config.mode, NodeMode::Client);
2111 for addrs in &serial_addr_sets {
2112 if let Some(peer_id) = self.dial_bootstrap_addr_set(addrs, identity_timeout).await {
2113 successful_connections += 1;
2114 connected_peer_ids.push(peer_id);
2115 if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2116 debug!(
2117 "Client bootstrap target reached ({successful_connections} peers) — skipping remaining serial dials"
2118 );
2119 break;
2120 }
2121 }
2122 }
2123
2124 if !client_mode || successful_connections < CLIENT_BOOTSTRAP_TARGET {
2129 let mut parallel_stream =
2130 futures::stream::iter(parallel_addr_sets.into_iter().map(|addrs| async move {
2131 self.dial_bootstrap_addr_set(&addrs, identity_timeout).await
2132 }))
2133 .buffer_unordered(MAX_CONCURRENT_BOOTSTRAP_DIALS);
2134 while let Some(result) = parallel_stream.next().await {
2135 if let Some(peer_id) = result {
2136 successful_connections += 1;
2137 connected_peer_ids.push(peer_id);
2138 if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2139 debug!(
2140 "Client bootstrap target reached ({successful_connections} peers) — cancelling pending dials"
2141 );
2142 break;
2143 }
2144 }
2145 }
2146 }
2150
2151 if successful_connections == 0 {
2152 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2156 let transport_peers = self.transport.connected_peers().await;
2157 if !transport_peers.is_empty() {
2158 info!(
2159 "No outbound bootstrap succeeded, but {} inbound peer(s) connected — proceeding with DHT bootstrap",
2160 transport_peers.len()
2161 );
2162 connected_peer_ids = transport_peers;
2163 successful_connections = connected_peer_ids.len();
2164 } else {
2165 warn!("Failed to connect to any bootstrap peers");
2166 return Ok(());
2169 }
2170 }
2171
2172 info!(
2173 "Successfully connected to {} bootstrap peers",
2174 successful_connections
2175 );
2176
2177 match self
2179 .dht_manager()
2180 .bootstrap_from_peers(&connected_peer_ids)
2181 .await
2182 {
2183 Ok(count) => info!("DHT peer discovery found {} peers", count),
2184 Err(e) => warn!("DHT peer discovery failed: {}", e),
2185 }
2186
2187 if matches!(self.config.mode, NodeMode::Node) {
2197 const SELF_LOOKUP_ROUNDS: u8 = 2;
2198 for i in 1..=SELF_LOOKUP_ROUNDS {
2199 if let Err(e) = self.dht_manager().trigger_self_lookup().await {
2200 warn!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} failed: {e}");
2201 } else {
2202 debug!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} completed");
2203 }
2204 }
2205 } else {
2206 debug!("Skipping post-bootstrap self-lookups (client mode)");
2207 }
2208
2209 self.is_bootstrapped.store(true, Ordering::SeqCst);
2212 info!(
2213 "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
2214 successful_connections,
2215 connected_peer_ids.len()
2216 );
2217
2218 if let Some(ref dir) = self.config.close_group_cache_dir
2221 && let Err(e) = self.save_close_group_cache(dir).await
2222 {
2223 warn!("Failed to save close group cache after bootstrap: {e}");
2224 }
2225
2226 Ok(())
2227 }
2228
2229 async fn dial_bootstrap_addr_set(
2234 &self,
2235 addrs: &[MultiAddr],
2236 identity_timeout: Duration,
2237 ) -> Option<PeerId> {
2238 for addr in addrs {
2239 match self
2244 .transport
2245 .connect_peer_typed(addr, AddressType::Unverified)
2246 .await
2247 {
2248 Ok(channel_id) => match self
2249 .transport
2250 .wait_for_peer_identity(&channel_id, identity_timeout)
2251 .await
2252 {
2253 Ok(real_peer_id) => return Some(real_peer_id),
2254 Err(e) => {
2255 warn!(
2256 "Timeout waiting for identity from bootstrap peer {}: {}, \
2257 closing channel {}",
2258 addr, e, channel_id
2259 );
2260 self.disconnect_channel(&channel_id).await;
2261 }
2262 },
2263 Err(e) => {
2264 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2265 }
2266 }
2267 }
2268 None
2269 }
2270
2271 async fn save_close_group_cache(&self, dir: &Path) -> anyhow::Result<()> {
2273 let key: crate::dht::Key = *self.peer_id.as_bytes();
2274 let k_value = self.config.dht_config.k_value;
2275 let close_group = self
2276 .dht_manager()
2277 .find_closest_nodes_local(&key, k_value)
2278 .await;
2279
2280 if close_group.is_empty() {
2281 debug!("No close group peers to save");
2282 return Ok(());
2283 }
2284
2285 let trust_engine = self.adaptive_dht.trust_engine();
2286 let now_epoch = SystemTime::now()
2287 .duration_since(UNIX_EPOCH)
2288 .map(|d| d.as_secs())
2289 .unwrap_or(0);
2290
2291 let peers: Vec<CachedCloseGroupPeer> = close_group
2292 .into_iter()
2293 .filter_map(|dht_node| {
2294 let score = trust_engine.score(&dht_node.peer_id);
2295 if !score.is_finite() {
2298 return None;
2299 }
2300 Some(CachedCloseGroupPeer {
2301 peer_id: dht_node.peer_id,
2302 addresses: dht_node.addresses,
2303 trust: TrustRecord {
2304 score,
2305 last_updated_epoch_secs: now_epoch,
2306 },
2307 })
2308 })
2309 .collect();
2310
2311 let peer_count = peers.len();
2312 let cache = CloseGroupCache {
2313 peers,
2314 saved_at_epoch_secs: now_epoch,
2315 };
2316
2317 cache.save_to_dir(dir).await?;
2318 info!(
2319 "Saved {} close group peers to cache in {}",
2320 peer_count,
2321 dir.display()
2322 );
2323 Ok(())
2324 }
2325
2326 }
2328
2329#[async_trait::async_trait]
2331#[allow(dead_code)]
2332pub trait NetworkSender: Send + Sync {
2333 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2335
2336 fn local_peer_id(&self) -> PeerId;
2338}
2339
2340pub(crate) fn register_new_channel(
2350 peers: &DashMap<String, PeerInfo>,
2351 channel_id: &str,
2352 remote_addr: &MultiAddr,
2353) {
2354 let peer_info = PeerInfo {
2355 channel_id: channel_id.to_owned(),
2356 addresses: vec![remote_addr.clone()],
2357 connected_at: tokio::time::Instant::now(),
2358 last_seen: tokio::time::Instant::now(),
2359 status: ConnectionStatus::Connected,
2360 protocols: vec!["p2p-core/1.0.0".to_string()],
2361 heartbeat_count: 0,
2362 };
2363 peers.insert(channel_id.to_owned(), peer_info);
2364}
2365
2366#[cfg(test)]
2367mod tests {
2368 use super::*;
2369 use std::time::Duration;
2371 use tokio::time::timeout;
2372
2373 const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
2375
2376 fn create_test_node_config() -> NodeConfig {
2382 NodeConfig {
2383 local: true,
2384 port: 0,
2385 ipv6: true,
2386 bootstrap_peers: vec![],
2387 connection_timeout: Duration::from_secs(2),
2388 max_connections: 100,
2389 dht_config: DHTConfig::default(),
2390 diversity_config: None,
2391 max_message_size: None,
2392 node_identity: None,
2393 mode: NodeMode::default(),
2394 custom_user_agent: None,
2395 allow_loopback: true,
2396 adaptive_dht_config: AdaptiveDhtConfig::default(),
2397 close_group_cache_dir: None,
2398 }
2399 }
2400
2401 #[tokio::test]
2405 async fn test_node_config_default() {
2406 let config = NodeConfig::default();
2407
2408 assert_eq!(config.listen_addrs().len(), 2); assert_eq!(config.max_connections, 10000);
2410 assert_eq!(config.connection_timeout, Duration::from_secs(25));
2411 }
2412
2413 #[tokio::test]
2414 async fn test_dht_config_default() {
2415 let config = DHTConfig::default();
2416
2417 assert_eq!(config.k_value, 20);
2418 assert_eq!(config.alpha_value, 3);
2419 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2420 }
2421
2422 #[test]
2423 fn test_connection_status_variants() {
2424 let connecting = ConnectionStatus::Connecting;
2425 let connected = ConnectionStatus::Connected;
2426 let disconnecting = ConnectionStatus::Disconnecting;
2427 let disconnected = ConnectionStatus::Disconnected;
2428 let failed = ConnectionStatus::Failed("test error".to_string());
2429
2430 assert_eq!(connecting, ConnectionStatus::Connecting);
2431 assert_eq!(connected, ConnectionStatus::Connected);
2432 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2433 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2434 assert_ne!(connecting, connected);
2435
2436 if let ConnectionStatus::Failed(msg) = failed {
2437 assert_eq!(msg, "test error");
2438 } else {
2439 panic!("Expected Failed status");
2440 }
2441 }
2442
2443 #[tokio::test]
2444 async fn test_node_creation() -> Result<()> {
2445 let config = create_test_node_config();
2446 let node = P2PNode::new(config).await?;
2447
2448 assert_eq!(node.peer_id().to_hex().len(), 64);
2450 assert!(!node.is_running());
2451 assert_eq!(node.peer_count().await, 0);
2452 assert!(node.connected_peers().await.is_empty());
2453
2454 Ok(())
2455 }
2456
2457 #[tokio::test]
2458 async fn test_node_lifecycle() -> Result<()> {
2459 let config = create_test_node_config();
2460 let node = P2PNode::new(config).await?;
2461
2462 assert!(!node.is_running());
2464
2465 node.start().await?;
2467 assert!(node.is_running());
2468
2469 let listen_addrs = node.listen_addrs().await;
2471 assert!(
2472 !listen_addrs.is_empty(),
2473 "Expected at least one listening address"
2474 );
2475
2476 node.stop().await?;
2478 assert!(!node.is_running());
2479
2480 Ok(())
2481 }
2482
2483 #[tokio::test]
2484 async fn test_peer_connection() -> Result<()> {
2485 let config1 = create_test_node_config();
2486 let config2 = create_test_node_config();
2487
2488 let node1 = P2PNode::new(config1).await?;
2489 let node2 = P2PNode::new(config2).await?;
2490
2491 node1.start().await?;
2492 node2.start().await?;
2493
2494 let node2_addr = node2
2495 .listen_addrs()
2496 .await
2497 .into_iter()
2498 .find(|a| a.is_ipv4())
2499 .ok_or_else(|| {
2500 P2PError::Network(crate::error::NetworkError::InvalidAddress(
2501 "Node 2 did not expose an IPv4 listen address".into(),
2502 ))
2503 })?;
2504
2505 let channel_id = node1.connect_peer(&node2_addr).await?;
2508
2509 assert!(node1.is_connection_active(&channel_id).await);
2512
2513 let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
2515 assert!(peer_info.is_some());
2516 let info = peer_info.expect("Peer info should exist after connect");
2517 assert_eq!(info.channel_id, channel_id);
2518 assert_eq!(info.status, ConnectionStatus::Connected);
2519 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2520
2521 node1.remove_channel(&channel_id).await;
2523 assert!(!node1.is_connection_active(&channel_id).await);
2524
2525 node1.stop().await?;
2526 node2.stop().await?;
2527
2528 Ok(())
2529 }
2530
2531 #[tokio::test]
2532 async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
2533 let config = create_test_node_config();
2534 let node = P2PNode::new(config).await?;
2535
2536 let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
2537 let result = node.connect_peer(&tcp_addr).await;
2538
2539 assert!(
2540 matches!(
2541 result,
2542 Err(P2PError::Network(
2543 crate::error::NetworkError::InvalidAddress(_)
2544 ))
2545 ),
2546 "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
2547 result
2548 );
2549
2550 Ok(())
2551 }
2552
2553 #[cfg_attr(target_os = "windows", ignore)]
2560 #[tokio::test]
2561 async fn test_event_subscription() -> Result<()> {
2562 let identity1 =
2566 Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
2567 let identity2 =
2568 Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
2569
2570 let mut config1 = create_test_node_config();
2571 config1.ipv6 = false;
2572 config1.node_identity = Some(identity1);
2573
2574 let node2_peer_id = *identity2.peer_id();
2575 let mut config2 = create_test_node_config();
2576 config2.ipv6 = false;
2577 config2.node_identity = Some(identity2);
2578
2579 let node1 = P2PNode::new(config1).await?;
2580 let node2 = P2PNode::new(config2).await?;
2581
2582 node1.start().await?;
2583 node2.start().await?;
2584
2585 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2586
2587 let mut events = node2.subscribe_events();
2589
2590 let node2_addr = node2.local_addr().ok_or_else(|| {
2591 P2PError::Network(crate::error::NetworkError::ProtocolError(
2592 "No listening address".to_string().into(),
2593 ))
2594 })?;
2595
2596 let mut channel_id = None;
2598 for attempt in 0..3 {
2599 if attempt > 0 {
2600 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2601 }
2602 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
2603 Ok(Ok(id)) => {
2604 channel_id = Some(id);
2605 break;
2606 }
2607 Ok(Err(_)) | Err(_) => continue,
2608 }
2609 }
2610 let channel_id = channel_id.expect("Failed to connect after 3 attempts");
2611
2612 let target_peer_id = node1
2614 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2615 .await?;
2616 assert_eq!(target_peer_id, node2_peer_id);
2617
2618 node1
2620 .send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
2621 .await?;
2622
2623 let event = timeout(Duration::from_secs(2), async {
2625 loop {
2626 match events.recv().await {
2627 Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
2628 Ok(P2PEvent::Message { .. }) => continue, Ok(_) => continue,
2630 Err(e) => return Err(e),
2631 }
2632 }
2633 })
2634 .await;
2635 assert!(event.is_ok(), "Should receive PeerConnected event");
2636 let connected_peer_id = event.expect("Timed out").expect("Channel error");
2637 assert!(
2639 connected_peer_id.0.iter().any(|&b| b != 0),
2640 "PeerConnected should carry a non-zero peer ID"
2641 );
2642
2643 node1.stop().await?;
2644 node2.stop().await?;
2645
2646 Ok(())
2647 }
2648
2649 #[cfg_attr(target_os = "windows", ignore)]
2651 #[tokio::test]
2652 async fn test_message_sending() -> Result<()> {
2653 let mut config1 = create_test_node_config();
2655 config1.ipv6 = false;
2656 let node1 = P2PNode::new(config1).await?;
2657 node1.start().await?;
2658
2659 let mut config2 = create_test_node_config();
2660 config2.ipv6 = false;
2661 let node2 = P2PNode::new(config2).await?;
2662 node2.start().await?;
2663
2664 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2666
2667 let node2_addr = node2.local_addr().ok_or_else(|| {
2669 P2PError::Network(crate::error::NetworkError::ProtocolError(
2670 "No listening address".to_string().into(),
2671 ))
2672 })?;
2673
2674 let channel_id =
2676 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2677 Ok(res) => res?,
2678 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2679 };
2680
2681 let target_peer_id = node1
2683 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2684 .await?;
2685 assert_eq!(target_peer_id, node2.peer_id().clone());
2686
2687 let message_data = b"Hello, peer!".to_vec();
2689 let result = match timeout(
2690 Duration::from_millis(500),
2691 node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
2692 )
2693 .await
2694 {
2695 Ok(res) => res,
2696 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2697 };
2698 if let Err(e) = &result {
2701 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2702 }
2703
2704 let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
2706 let result = node1
2707 .send_message(&non_existent_peer, "test-protocol", vec![], &[])
2708 .await;
2709 assert!(result.is_err(), "Sending to non-existent peer should fail");
2710
2711 node1.stop().await?;
2712 node2.stop().await?;
2713
2714 Ok(())
2715 }
2716
2717 #[tokio::test]
2718 async fn test_remote_mcp_operations() -> Result<()> {
2719 let config = create_test_node_config();
2720 let node = P2PNode::new(config).await?;
2721
2722 node.start().await?;
2724 node.stop().await?;
2725 Ok(())
2726 }
2727
2728 #[tokio::test]
2729 async fn test_health_check() -> Result<()> {
2730 let config = create_test_node_config();
2731 let node = P2PNode::new(config).await?;
2732
2733 let result = node.health_check().await;
2735 assert!(result.is_ok());
2736
2737 Ok(())
2742 }
2743
2744 #[tokio::test]
2745 async fn test_node_uptime() -> Result<()> {
2746 let config = create_test_node_config();
2747 let node = P2PNode::new(config).await?;
2748
2749 let uptime1 = node.uptime();
2750 assert!(uptime1 >= Duration::from_secs(0));
2751
2752 tokio::time::sleep(Duration::from_millis(10)).await;
2754
2755 let uptime2 = node.uptime();
2756 assert!(uptime2 > uptime1);
2757
2758 Ok(())
2759 }
2760
2761 #[tokio::test]
2762 async fn test_node_config_access() -> Result<()> {
2763 let config = create_test_node_config();
2764 let node = P2PNode::new(config).await?;
2765
2766 let node_config = node.config();
2767 assert_eq!(node_config.max_connections, 100);
2768 Ok(())
2771 }
2772
2773 #[tokio::test]
2774 async fn test_mcp_server_access() -> Result<()> {
2775 let config = create_test_node_config();
2776 let _node = P2PNode::new(config).await?;
2777
2778 Ok(())
2780 }
2781
2782 #[tokio::test]
2783 async fn test_dht_access() -> Result<()> {
2784 let config = create_test_node_config();
2785 let node = P2PNode::new(config).await?;
2786
2787 let _dht = node.dht();
2789
2790 Ok(())
2791 }
2792
2793 #[tokio::test]
2794 async fn test_node_config_builder() -> Result<()> {
2795 let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2796
2797 let config = NodeConfig::builder()
2798 .local(true)
2799 .ipv6(true)
2800 .bootstrap_peer(bootstrap)
2801 .connection_timeout(Duration::from_secs(15))
2802 .max_connections(200)
2803 .max_message_size(TEST_MAX_MESSAGE_SIZE)
2804 .build()?;
2805
2806 assert_eq!(config.listen_addrs().len(), 2); assert!(config.local);
2808 assert!(config.ipv6);
2809 assert_eq!(config.bootstrap_peers.len(), 1);
2810 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2811 assert_eq!(config.max_connections, 200);
2812 assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2813 assert!(config.allow_loopback); Ok(())
2816 }
2817
2818 #[tokio::test]
2819 async fn test_bootstrap_peers() -> Result<()> {
2820 let mut config = create_test_node_config();
2821 config.bootstrap_peers = vec![
2822 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2823 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2824 ];
2825
2826 let node = P2PNode::new(config).await?;
2827
2828 node.start().await?;
2830
2831 let _peer_count = node.peer_count().await;
2835
2836 node.stop().await?;
2837 Ok(())
2838 }
2839
2840 #[tokio::test]
2841 async fn test_peer_info_structure() {
2842 let peer_info = PeerInfo {
2843 channel_id: "test_peer".to_string(),
2844 addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2845 connected_at: Instant::now(),
2846 last_seen: Instant::now(),
2847 status: ConnectionStatus::Connected,
2848 protocols: vec!["test-protocol".to_string()],
2849 heartbeat_count: 0,
2850 };
2851
2852 assert_eq!(peer_info.channel_id, "test_peer");
2853 assert_eq!(peer_info.addresses.len(), 1);
2854 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2855 assert_eq!(peer_info.protocols.len(), 1);
2856 }
2857
2858 #[tokio::test]
2859 async fn test_serialization() -> Result<()> {
2860 let config = create_test_node_config();
2862 let serialized = serde_json::to_string(&config)?;
2863 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2864
2865 assert_eq!(config.local, deserialized.local);
2866 assert_eq!(config.port, deserialized.port);
2867 assert_eq!(config.ipv6, deserialized.ipv6);
2868 assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2869
2870 Ok(())
2871 }
2872
2873 #[tokio::test]
2874 async fn test_get_channel_id_by_address_found() -> Result<()> {
2875 let config = create_test_node_config();
2876 let node = P2PNode::new(config).await?;
2877
2878 let test_channel_id = "peer_test_123".to_string();
2880 let test_address = "192.168.1.100:9000";
2881 let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2882
2883 let peer_info = PeerInfo {
2884 channel_id: test_channel_id.clone(),
2885 addresses: vec![test_multiaddr],
2886 connected_at: Instant::now(),
2887 last_seen: Instant::now(),
2888 status: ConnectionStatus::Connected,
2889 protocols: vec!["test-protocol".to_string()],
2890 heartbeat_count: 0,
2891 };
2892
2893 node.transport
2894 .inject_peer(test_channel_id.clone(), peer_info)
2895 .await;
2896
2897 let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2899 let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2900 assert_eq!(found_channel_id, Some(test_channel_id));
2901
2902 Ok(())
2903 }
2904
2905 #[tokio::test]
2906 async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2907 let config = create_test_node_config();
2908 let node = P2PNode::new(config).await?;
2909
2910 let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
2912 let result = node.get_channel_id_by_address(&unknown_addr).await;
2913 assert_eq!(result, None);
2914
2915 Ok(())
2916 }
2917
2918 #[tokio::test]
2919 async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
2920 let config = create_test_node_config();
2921 let node = P2PNode::new(config).await?;
2922
2923 let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
2925 mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
2926 psm: 0x0025,
2927 });
2928 let result = node.get_channel_id_by_address(&ble_addr).await;
2929 assert_eq!(result, None);
2930
2931 Ok(())
2932 }
2933
2934 #[tokio::test]
2935 async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
2936 let config = create_test_node_config();
2937 let node = P2PNode::new(config).await?;
2938
2939 let peer1_id = "peer_1".to_string();
2941 let peer1_addr_str = "192.168.1.101:9001";
2942 let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
2943
2944 let peer2_id = "peer_2".to_string();
2945 let peer2_addr_str = "192.168.1.102:9002";
2946 let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
2947
2948 let peer1_info = PeerInfo {
2949 channel_id: peer1_id.clone(),
2950 addresses: vec![peer1_multiaddr],
2951 connected_at: Instant::now(),
2952 last_seen: Instant::now(),
2953 status: ConnectionStatus::Connected,
2954 protocols: vec!["test-protocol".to_string()],
2955 heartbeat_count: 0,
2956 };
2957
2958 let peer2_info = PeerInfo {
2959 channel_id: peer2_id.clone(),
2960 addresses: vec![peer2_multiaddr],
2961 connected_at: Instant::now(),
2962 last_seen: Instant::now(),
2963 status: ConnectionStatus::Connected,
2964 protocols: vec!["test-protocol".to_string()],
2965 heartbeat_count: 0,
2966 };
2967
2968 node.transport
2969 .inject_peer(peer1_id.clone(), peer1_info)
2970 .await;
2971 node.transport
2972 .inject_peer(peer2_id.clone(), peer2_info)
2973 .await;
2974
2975 let found_peer1 = node
2977 .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
2978 .await;
2979 let found_peer2 = node
2980 .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
2981 .await;
2982
2983 assert_eq!(found_peer1, Some(peer1_id));
2984 assert_eq!(found_peer2, Some(peer2_id));
2985
2986 Ok(())
2987 }
2988
2989 #[tokio::test]
2990 async fn test_list_active_connections_empty() -> Result<()> {
2991 let config = create_test_node_config();
2992 let node = P2PNode::new(config).await?;
2993
2994 let connections = node.list_active_connections().await;
2996 assert!(connections.is_empty());
2997
2998 Ok(())
2999 }
3000
3001 #[tokio::test]
3002 async fn test_list_active_connections_with_peers() -> Result<()> {
3003 let config = create_test_node_config();
3004 let node = P2PNode::new(config).await?;
3005
3006 let peer1_id = "peer_1".to_string();
3008 let peer1_addrs = vec![
3009 MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
3010 MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
3011 ];
3012
3013 let peer2_id = "peer_2".to_string();
3014 let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
3015
3016 let peer1_info = PeerInfo {
3017 channel_id: peer1_id.clone(),
3018 addresses: peer1_addrs.clone(),
3019 connected_at: Instant::now(),
3020 last_seen: Instant::now(),
3021 status: ConnectionStatus::Connected,
3022 protocols: vec!["test-protocol".to_string()],
3023 heartbeat_count: 0,
3024 };
3025
3026 let peer2_info = PeerInfo {
3027 channel_id: peer2_id.clone(),
3028 addresses: peer2_addrs.clone(),
3029 connected_at: Instant::now(),
3030 last_seen: Instant::now(),
3031 status: ConnectionStatus::Connected,
3032 protocols: vec!["test-protocol".to_string()],
3033 heartbeat_count: 0,
3034 };
3035
3036 node.transport
3037 .inject_peer(peer1_id.clone(), peer1_info)
3038 .await;
3039 node.transport
3040 .inject_peer(peer2_id.clone(), peer2_info)
3041 .await;
3042
3043 node.transport
3045 .inject_active_connection(peer1_id.clone())
3046 .await;
3047 node.transport
3048 .inject_active_connection(peer2_id.clone())
3049 .await;
3050
3051 let connections = node.list_active_connections().await;
3053 assert_eq!(connections.len(), 2);
3054
3055 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3057 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3058
3059 assert!(peer1_conn.is_some());
3060 assert!(peer2_conn.is_some());
3061
3062 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3064 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3065
3066 Ok(())
3067 }
3068
3069 #[tokio::test]
3070 async fn test_remove_channel_success() -> Result<()> {
3071 let config = create_test_node_config();
3072 let node = P2PNode::new(config).await?;
3073
3074 let channel_id = "peer_to_remove".to_string();
3076 let channel_peer_id = PeerId::from_name(&channel_id);
3077 let peer_info = PeerInfo {
3078 channel_id: channel_id.clone(),
3079 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3080 connected_at: Instant::now(),
3081 last_seen: Instant::now(),
3082 status: ConnectionStatus::Connected,
3083 protocols: vec!["test-protocol".to_string()],
3084 heartbeat_count: 0,
3085 };
3086
3087 node.transport
3088 .inject_peer(channel_id.clone(), peer_info)
3089 .await;
3090 node.transport
3091 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3092 .await;
3093
3094 assert!(node.is_peer_connected(&channel_peer_id).await);
3096
3097 let removed = node.remove_channel(&channel_id).await;
3099 assert!(removed);
3100
3101 assert!(!node.is_peer_connected(&channel_peer_id).await);
3103
3104 Ok(())
3105 }
3106
3107 #[tokio::test]
3108 async fn test_remove_channel_nonexistent() -> Result<()> {
3109 let config = create_test_node_config();
3110 let node = P2PNode::new(config).await?;
3111
3112 let removed = node.remove_channel("nonexistent_peer").await;
3114 assert!(!removed);
3115
3116 Ok(())
3117 }
3118
3119 #[tokio::test]
3120 async fn test_is_peer_connected() -> Result<()> {
3121 let config = create_test_node_config();
3122 let node = P2PNode::new(config).await?;
3123
3124 let channel_id = "test_peer".to_string();
3125 let channel_peer_id = PeerId::from_name(&channel_id);
3126
3127 assert!(!node.is_peer_connected(&channel_peer_id).await);
3129
3130 let peer_info = PeerInfo {
3132 channel_id: channel_id.clone(),
3133 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3134 connected_at: Instant::now(),
3135 last_seen: Instant::now(),
3136 status: ConnectionStatus::Connected,
3137 protocols: vec!["test-protocol".to_string()],
3138 heartbeat_count: 0,
3139 };
3140
3141 node.transport
3142 .inject_peer(channel_id.clone(), peer_info)
3143 .await;
3144 node.transport
3145 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3146 .await;
3147
3148 assert!(node.is_peer_connected(&channel_peer_id).await);
3150
3151 node.remove_channel(&channel_id).await;
3153
3154 assert!(!node.is_peer_connected(&channel_peer_id).await);
3156
3157 Ok(())
3158 }
3159
3160 #[test]
3161 fn test_normalize_ipv6_wildcard() {
3162 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3163
3164 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3165 let normalized = normalize_wildcard_to_loopback(wildcard);
3166
3167 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3168 assert_eq!(normalized.port(), 8080);
3169 }
3170
3171 #[test]
3172 fn test_normalize_ipv4_wildcard() {
3173 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3174
3175 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3176 let normalized = normalize_wildcard_to_loopback(wildcard);
3177
3178 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3179 assert_eq!(normalized.port(), 9000);
3180 }
3181
3182 #[test]
3183 fn test_normalize_specific_address_unchanged() {
3184 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3185 let normalized = normalize_wildcard_to_loopback(specific);
3186
3187 assert_eq!(normalized, specific);
3188 }
3189
3190 #[test]
3191 fn test_normalize_loopback_unchanged() {
3192 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3193
3194 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3195 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3196 assert_eq!(normalized_v6, loopback_v6);
3197
3198 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3199 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3200 assert_eq!(normalized_v4, loopback_v4);
3201 }
3202
3203 fn current_timestamp() -> u64 {
3207 std::time::SystemTime::now()
3208 .duration_since(std::time::UNIX_EPOCH)
3209 .map(|d| d.as_secs())
3210 .unwrap_or(0)
3211 }
3212
3213 fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
3215 let msg = WireMessage {
3216 protocol: protocol.to_string(),
3217 data,
3218 from: PeerId::from_name(from),
3219 timestamp,
3220 user_agent: String::new(),
3221 public_key: Vec::new(),
3222 signature: Vec::new(),
3223 };
3224 postcard::to_stdvec(&msg).unwrap()
3225 }
3226
3227 fn make_signed_wire_bytes(
3229 identity: &NodeIdentity,
3230 protocol: &str,
3231 data: Vec<u8>,
3232 timestamp: u64,
3233 ) -> Vec<u8> {
3234 let from = *identity.peer_id();
3235 let user_agent = "test/1.0";
3236 let signable =
3237 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3238 .unwrap();
3239 let sig = identity.sign(&signable).expect("signing should succeed");
3240 let msg = WireMessage {
3241 protocol: protocol.to_string(),
3242 data,
3243 from,
3244 timestamp,
3245 user_agent: user_agent.to_string(),
3246 public_key: identity.public_key().as_bytes().to_vec(),
3247 signature: sig.as_bytes().to_vec(),
3248 };
3249 postcard::to_stdvec(&msg).unwrap()
3250 }
3251
3252 #[test]
3253 fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
3254 let transport_id = "abcdef0123456789";
3257 let logical_id = "spoofed-logical-id";
3258 let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
3259
3260 let parsed =
3261 parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
3262
3263 assert!(parsed.authenticated_node_id.is_none());
3265
3266 match parsed.event {
3267 P2PEvent::Message {
3268 topic,
3269 source,
3270 transport_source,
3271 timestamp: _,
3272 data,
3273 } => {
3274 assert!(source.is_none(), "unsigned message source must be None");
3275 assert!(
3276 transport_source.is_none(),
3277 "non-socket transport source should not produce an IP transport address"
3278 );
3279 assert_eq!(topic, "test/v1");
3280 assert_eq!(data, vec![1u8, 2, 3]);
3281 }
3282 other => panic!("expected P2PEvent::Message, got {:?}", other),
3283 }
3284 }
3285
3286 #[test]
3287 fn test_parse_protocol_message_rejects_invalid_bytes() {
3288 assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
3290 }
3291
3292 #[test]
3293 fn test_parse_protocol_message_rejects_truncated_message() {
3294 let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
3296 let truncated = &full_bytes[..full_bytes.len() / 2];
3297 assert!(parse_protocol_message(truncated, "peer-id").is_none());
3298 }
3299
3300 #[test]
3301 fn test_parse_protocol_message_empty_payload() {
3302 let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
3303
3304 let parsed = parse_protocol_message(&bytes, "transport-peer")
3305 .expect("valid message with empty data should parse");
3306
3307 match parsed.event {
3308 P2PEvent::Message { data, .. } => assert!(data.is_empty()),
3309 other => panic!("expected P2PEvent::Message, got {:?}", other),
3310 }
3311 }
3312
3313 #[test]
3314 fn test_parse_protocol_message_records_ip_transport_source() {
3315 let bytes = make_wire_bytes("ping", vec![1], "sender", current_timestamp());
3316
3317 let parsed =
3318 parse_protocol_message(&bytes, "192.168.1.2:4567").expect("valid message should parse");
3319
3320 match parsed.event {
3321 P2PEvent::Message {
3322 transport_source, ..
3323 } => {
3324 assert_eq!(
3325 transport_source,
3326 Some(MultiAddr::quic("192.168.1.2:4567".parse().unwrap()))
3327 );
3328 }
3329 other => panic!("expected P2PEvent::Message, got {:?}", other),
3330 }
3331 }
3332
3333 #[test]
3334 fn test_parse_protocol_message_preserves_binary_payload() {
3335 let payload: Vec<u8> = (0..=255).collect();
3337 let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
3338
3339 let parsed = parse_protocol_message(&bytes, "peer-id")
3340 .expect("valid message with full byte range should parse");
3341
3342 match parsed.event {
3343 P2PEvent::Message { data, topic, .. } => {
3344 assert_eq!(topic, "binary/v1");
3345 assert_eq!(
3346 data, payload,
3347 "payload must survive bincode round-trip exactly"
3348 );
3349 }
3350 other => panic!("expected P2PEvent::Message, got {:?}", other),
3351 }
3352 }
3353
3354 #[test]
3355 fn test_parse_signed_message_verifies_and_uses_node_id() {
3356 let identity = NodeIdentity::generate().expect("should generate identity");
3357 let protocol = "test/signed";
3358 let data: Vec<u8> = vec![10, 20, 30];
3359 let from = *identity.peer_id();
3361 let timestamp = current_timestamp();
3362 let user_agent = "test/1.0";
3363
3364 let signable =
3366 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3367 .unwrap();
3368 let sig = identity.sign(&signable).expect("signing should succeed");
3369
3370 let msg = WireMessage {
3371 protocol: protocol.to_string(),
3372 data: data.clone(),
3373 from,
3374 timestamp,
3375 user_agent: user_agent.to_string(),
3376 public_key: identity.public_key().as_bytes().to_vec(),
3377 signature: sig.as_bytes().to_vec(),
3378 };
3379 let bytes = postcard::to_stdvec(&msg).unwrap();
3380
3381 let parsed =
3382 parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
3383
3384 let expected_peer_id = *identity.peer_id();
3385 assert_eq!(
3386 parsed.authenticated_node_id.as_ref(),
3387 Some(&expected_peer_id)
3388 );
3389
3390 match parsed.event {
3391 P2PEvent::Message { source, .. } => {
3392 assert_eq!(
3393 source.as_ref(),
3394 Some(&expected_peer_id),
3395 "source should be the verified PeerId"
3396 );
3397 }
3398 other => panic!("expected P2PEvent::Message, got {:?}", other),
3399 }
3400 }
3401
3402 #[test]
3403 fn test_parse_message_with_bad_signature_is_rejected() {
3404 let identity = NodeIdentity::generate().expect("should generate identity");
3405 let protocol = "test/bad-sig";
3406 let data: Vec<u8> = vec![1, 2, 3];
3407 let from = *identity.peer_id();
3408 let timestamp = current_timestamp();
3409 let user_agent = "test/1.0";
3410
3411 let signable =
3413 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3414 .unwrap();
3415 let sig = identity.sign(&signable).expect("signing should succeed");
3416
3417 let msg = WireMessage {
3419 protocol: protocol.to_string(),
3420 data: vec![99, 99, 99],
3421 from,
3422 timestamp,
3423 user_agent: user_agent.to_string(),
3424 public_key: identity.public_key().as_bytes().to_vec(),
3425 signature: sig.as_bytes().to_vec(),
3426 };
3427 let bytes = postcard::to_stdvec(&msg).unwrap();
3428
3429 assert!(
3430 parse_protocol_message(&bytes, "transport-xyz").is_none(),
3431 "message with bad signature should be rejected"
3432 );
3433 }
3434
3435 #[test]
3436 fn test_parse_message_with_mismatched_from_is_rejected() {
3437 let identity = NodeIdentity::generate().expect("should generate identity");
3438 let protocol = "test/from-mismatch";
3439 let data: Vec<u8> = vec![1, 2, 3];
3440 let fake_from = PeerId::from_bytes([0xDE; 32]);
3442 let timestamp = current_timestamp();
3443 let user_agent = "test/1.0";
3444
3445 let signable =
3446 postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
3447 .unwrap();
3448 let sig = identity.sign(&signable).expect("signing should succeed");
3449
3450 let msg = WireMessage {
3451 protocol: protocol.to_string(),
3452 data,
3453 from: fake_from,
3454 timestamp,
3455 user_agent: user_agent.to_string(),
3456 public_key: identity.public_key().as_bytes().to_vec(),
3457 signature: sig.as_bytes().to_vec(),
3458 };
3459 let bytes = postcard::to_stdvec(&msg).unwrap();
3460
3461 assert!(
3462 parse_protocol_message(&bytes, "transport-xyz").is_none(),
3463 "message with mismatched from field should be rejected"
3464 );
3465 }
3466
3467 #[test]
3468 fn test_parse_protocol_message_accepts_arbitrary_timestamps() {
3469 let payload = vec![1, 2, 3];
3473
3474 let old_ts = current_timestamp().saturating_sub(36_000);
3476 let old_bytes = make_wire_bytes("test/old", payload.clone(), "sender", old_ts);
3477 assert!(
3478 parse_protocol_message(&old_bytes, "peer-id").is_some(),
3479 "should accept unsigned message with timestamp 10h in the past"
3480 );
3481
3482 let future_ts = current_timestamp().saturating_add(36_000);
3484 let future_bytes = make_wire_bytes("test/future", payload.clone(), "sender", future_ts);
3485 assert!(
3486 parse_protocol_message(&future_bytes, "peer-id").is_some(),
3487 "should accept unsigned message with timestamp 10h in the future"
3488 );
3489
3490 let identity = NodeIdentity::generate().expect("should generate identity");
3493 let signed_old =
3494 make_signed_wire_bytes(&identity, "test/signed-old", payload.clone(), old_ts);
3495 assert!(
3496 parse_protocol_message(&signed_old, "transport-xyz").is_some(),
3497 "should accept signed message with timestamp 10h in the past"
3498 );
3499
3500 let signed_future =
3501 make_signed_wire_bytes(&identity, "test/signed-future", payload, future_ts);
3502 assert!(
3503 parse_protocol_message(&signed_future, "transport-xyz").is_some(),
3504 "should accept signed message with timestamp 10h in the future"
3505 );
3506 }
3507
3508 #[test]
3509 fn test_parse_protocol_message_exposes_timestamp_on_event() {
3510 let ts: u64 = 1_234_567_890;
3514 let bytes = make_wire_bytes("test/ts", vec![9, 9, 9], "sender", ts);
3515 let parsed = parse_protocol_message(&bytes, "peer-id").expect("valid message should parse");
3516 match parsed.event {
3517 P2PEvent::Message { timestamp, .. } => {
3518 assert_eq!(timestamp, ts, "P2PEvent::Message.timestamp must round-trip");
3519 }
3520 other => panic!("expected P2PEvent::Message, got {:?}", other),
3521 }
3522 }
3523
3524 #[test]
3525 fn test_signed_message_timestamp_is_signature_covered() {
3526 let identity = NodeIdentity::generate().expect("should generate identity");
3530 let ts: u64 = 1_700_000_000;
3531 let signed = make_signed_wire_bytes(&identity, "test/sig", vec![1, 2, 3], ts);
3532
3533 let parsed = parse_protocol_message(&signed, "transport-xyz")
3535 .expect("unmodified signed message should parse");
3536 assert!(parsed.authenticated_node_id.is_some());
3537
3538 let mut tampered: WireMessage =
3540 postcard::from_bytes(&signed).expect("signed bytes must deserialize");
3541 tampered.timestamp = ts.wrapping_add(1);
3542 let tampered_bytes = postcard::to_stdvec(&tampered).expect("re-serialize");
3543
3544 assert!(
3545 parse_protocol_message(&tampered_bytes, "transport-xyz").is_none(),
3546 "timestamp-only mutation on a signed message must fail signature verification"
3547 );
3548 }
3549}