1use crate::PeerId;
20use crate::adaptive::trust::{TrustRecord, TrustSnapshot};
21use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
22use crate::bootstrap::cache::{CachedCloseGroupPeer, CloseGroupCache};
23use crate::bootstrap::{BootstrapConfig, BootstrapManager};
24use crate::dht::core_engine::AddressType;
25use crate::dht_network_manager::{
26 DhtNetworkConfig, DhtNetworkEvent, DhtNetworkManager, IDENTITY_EXCHANGE_TIMEOUT,
27};
28use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
29use crate::reachability::spawn_acquisition_driver;
30
31use crate::MultiAddr;
32use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
33use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
34use dashmap::DashMap;
35use futures::StreamExt;
36use parking_lot::Mutex as ParkingMutex;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::path::{Path, PathBuf};
41use std::sync::Arc;
42use std::sync::atomic::{AtomicBool, Ordering};
43use std::time::{Duration, SystemTime, UNIX_EPOCH};
44use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
45use tokio::time::Instant;
46use tokio_util::sync::CancellationToken;
47use tracing::{debug, info, trace, warn};
48
/// Envelope for one protocol message exchanged between peers.
///
/// `parse_protocol_message` decodes this type with `postcard`, so the field
/// order is presumably part of the wire format — confirm before reordering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct WireMessage {
    /// Protocol/topic name; surfaced as `P2PEvent::Message::topic`.
    pub(crate) protocol: String,
    /// Opaque payload bytes.
    pub(crate) data: Vec<u8>,
    /// Peer ID the sender claims; for signed messages it must match the ID
    /// derived from `public_key`.
    pub(crate) from: PeerId,
    /// Sender timestamp; compared against Unix-epoch seconds on receipt.
    pub(crate) timestamp: u64,
    /// Sender's user agent; falls back to empty if missing when deserialized.
    #[serde(default)]
    pub(crate) user_agent: String,
    /// Sender's public key bytes; falls back to empty if missing when deserialized.
    #[serde(default)]
    pub(crate) public_key: Vec<u8>,
    /// Signature bytes; an empty value means the message is handled as unsigned.
    #[serde(default)]
    pub(crate) signature: Vec<u8>,
}
76
/// Role a node plays on the network.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum NodeMode {
    /// Default mode; advertised with a `node/` user agent prefix.
    #[default]
    Node,
    /// Advertised with a `client/` user agent prefix; `P2PNode::start` skips
    /// the relay acquisition driver in this mode.
    Client,
}
90
/// Selects which interface family `build_listen_addrs` binds to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ListenMode {
    /// Bind to the unspecified (wildcard) addresses.
    Public,
    /// Bind to the loopback addresses only.
    Local,
}
99
100pub fn user_agent_for_mode(mode: NodeMode) -> String {
105 let prefix = match mode {
106 NodeMode::Node => "node",
107 NodeMode::Client => "client",
108 };
109 format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
110}
111
/// Returns `true` when `user_agent` begins with the `node/` prefix, i.e. the
/// prefix `user_agent_for_mode` produces for `NodeMode::Node`.
pub fn is_dht_participant(user_agent: &str) -> bool {
    user_agent.strip_prefix("node/").is_some()
}
116
/// Capacity of the message receive channel (its consumer is outside this section).
pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;

/// Limit on active requests; `P2PNode::new` also passes it to the DHT manager
/// as `max_concurrent_operations`.
pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;

/// Upper bound for a request timeout (300 seconds); enforcement is outside this section.
pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);

/// Port used by `NodeConfig::default()`.
const DEFAULT_LISTEN_PORT: u16 = 9000;

/// Default for `NodeConfig::max_connections`.
const DEFAULT_MAX_CONNECTIONS: usize = 10_000;

/// Default for `NodeConfig::connection_timeout`, in seconds.
const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 25;

/// Bootstrap peer batch size — presumably peers dialed per round; usage is
/// outside this section, TODO confirm.
const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;

/// Bootstrap identity-exchange timeout in seconds — usage is outside this section.
const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 3;

/// Cap on simultaneous bootstrap dials — usage is outside this section.
const MAX_CONCURRENT_BOOTSTRAP_DIALS: usize = 4;

/// Bootstrap target for client-mode nodes — presumably a connection count;
/// usage is outside this section, TODO confirm.
const CLIENT_BOOTSTRAP_TARGET: usize = 6;
175
/// Serde default helper: always yields `true` (used for `NodeConfig::ipv6`).
const fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
180
/// Configuration for a [`P2PNode`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// When `true`, `listen_addrs` binds to loopback instead of the wildcard address.
    #[serde(default)]
    pub local: bool,

    /// Port used for every listen address.
    #[serde(default)]
    pub port: u16,

    /// When `true`, an IPv6 listen address is added alongside IPv4.
    /// Deserializes to `true` when omitted.
    #[serde(default = "default_true")]
    pub ipv6: bool,

    /// Addresses of peers to contact when bootstrapping.
    pub bootstrap_peers: Vec<crate::MultiAddr>,

    /// Connection timeout; `P2PNode::new` also uses it as the DHT request timeout.
    pub connection_timeout: Duration,

    /// Connection ceiling; `health_check` fails when the peer count exceeds it.
    pub max_connections: usize,

    /// Kademlia parameters; validated by `P2PNode::new`.
    pub dht_config: DHTConfig,

    /// Bootstrap cache settings; `P2PNode::new` falls back to the default when `None`.
    pub bootstrap_cache_config: Option<BootstrapConfig>,

    /// Optional IP diversity settings; validated by `P2PNode::new` when present.
    pub diversity_config: Option<crate::security::IPDiversityConfig>,

    /// Optional maximum message size — consumed outside this section; units
    /// are presumably bytes, TODO confirm.
    #[serde(default)]
    pub max_message_size: Option<usize>,

    /// Pre-built identity; never (de)serialized. `P2PNode::new` generates a
    /// fresh identity when this is `None`.
    #[serde(skip)]
    pub node_identity: Option<Arc<NodeIdentity>>,

    /// Node role; selects the default user agent prefix.
    #[serde(default)]
    pub mode: NodeMode,

    /// Overrides the mode-derived user agent when set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub custom_user_agent: Option<String>,

    /// Loopback allowance flag — consumed outside this section. The builder
    /// defaults it to the value of `local`.
    #[serde(default)]
    pub allow_loopback: bool,

    /// Settings handed to `AdaptiveDHT::new`.
    #[serde(default)]
    pub adaptive_dht_config: AdaptiveDhtConfig,

    /// Directory of the close group cache: loaded in `P2PNode::start`, saved
    /// in `P2PNode::stop`. No cache is used when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub close_group_cache_dir: Option<PathBuf>,
}
286
/// DHT tuning parameters; see `DHTConfig::validate` for the accepted ranges.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Kademlia `k` parameter; must be at least `MIN_K_VALUE`.
    pub k_value: usize,

    /// Kademlia `alpha` parameter; must be at least 1.
    pub alpha_value: usize,

    /// Refresh interval; must be non-zero.
    pub refresh_interval: Duration,
}
299
300#[inline]
313fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
314 let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
315
316 let (v4, v6) = match mode {
317 ListenMode::Public => (
318 std::net::Ipv4Addr::UNSPECIFIED,
319 std::net::Ipv6Addr::UNSPECIFIED,
320 ),
321 ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
322 };
323
324 if ipv6_enabled {
325 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
326 std::net::IpAddr::V6(v6),
327 port,
328 )));
329 }
330
331 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
332 std::net::IpAddr::V4(v4),
333 port,
334 )));
335
336 addrs
337}
338
339impl NodeConfig {
340 pub fn user_agent(&self) -> String {
345 self.custom_user_agent
346 .clone()
347 .unwrap_or_else(|| user_agent_for_mode(self.mode))
348 }
349
350 pub fn listen_addrs(&self) -> Vec<MultiAddr> {
355 let mode = if self.local {
356 ListenMode::Local
357 } else {
358 ListenMode::Public
359 };
360 build_listen_addrs(self.port, self.ipv6, mode)
361 }
362
363 pub fn new() -> Result<Self> {
369 Ok(Self::default())
370 }
371
372 pub fn builder() -> NodeConfigBuilder {
374 NodeConfigBuilder::default()
375 }
376}
377
/// Fluent builder for [`NodeConfig`].
///
/// `Option` fields record "not set by the caller"; `build` substitutes the
/// corresponding default for each of them.
#[derive(Debug, Clone)]
pub struct NodeConfigBuilder {
    // Listen port (builder default is 0).
    port: u16,
    // Whether to add an IPv6 listen address.
    ipv6: bool,
    // Loopback-only listening; also the fallback for `allow_loopback`.
    local: bool,
    // Bootstrap peers in insertion order.
    bootstrap_peers: Vec<crate::MultiAddr>,
    max_connections: Option<usize>,
    connection_timeout: Option<Duration>,
    dht_config: Option<DHTConfig>,
    max_message_size: Option<usize>,
    mode: NodeMode,
    custom_user_agent: Option<String>,
    allow_loopback: Option<bool>,
    adaptive_dht_config: Option<AdaptiveDhtConfig>,
    close_group_cache_dir: Option<PathBuf>,
}
416
impl Default for NodeConfigBuilder {
    /// Builder defaults: port 0, IPv6 enabled, non-local, no bootstrap peers,
    /// default mode, and every optional setting unset.
    ///
    /// NOTE(review): the port here is 0, whereas `NodeConfig::default()` uses
    /// `DEFAULT_LISTEN_PORT` — presumably intentional (OS-assigned port); confirm.
    fn default() -> Self {
        Self {
            port: 0,
            ipv6: true,
            local: false,
            bootstrap_peers: Vec::new(),
            max_connections: None,
            connection_timeout: None,
            dht_config: None,
            max_message_size: None,
            mode: NodeMode::default(),
            custom_user_agent: None,
            allow_loopback: None,
            adaptive_dht_config: None,
            close_group_cache_dir: None,
        }
    }
}
436
437impl NodeConfigBuilder {
438 pub fn port(mut self, port: u16) -> Self {
440 self.port = port;
441 self
442 }
443
444 pub fn ipv6(mut self, enabled: bool) -> Self {
446 self.ipv6 = enabled;
447 self
448 }
449
450 pub fn local(mut self, local: bool) -> Self {
457 self.local = local;
458 self
459 }
460
461 pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
463 self.bootstrap_peers.push(addr);
464 self
465 }
466
467 pub fn max_connections(mut self, max: usize) -> Self {
469 self.max_connections = Some(max);
470 self
471 }
472
473 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
475 self.connection_timeout = Some(timeout);
476 self
477 }
478
479 pub fn dht_config(mut self, config: DHTConfig) -> Self {
481 self.dht_config = Some(config);
482 self
483 }
484
485 pub fn max_message_size(mut self, max_message_size: usize) -> Self {
489 self.max_message_size = Some(max_message_size);
490 self
491 }
492
493 pub fn mode(mut self, mode: NodeMode) -> Self {
495 self.mode = mode;
496 self
497 }
498
499 pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
501 self.custom_user_agent = Some(user_agent.into());
502 self
503 }
504
505 pub fn allow_loopback(mut self, allow: bool) -> Self {
509 self.allow_loopback = Some(allow);
510 self
511 }
512
513 pub fn trust_enforcement(mut self, enabled: bool) -> Self {
526 let threshold = if enabled {
527 AdaptiveDhtConfig::default().swap_threshold
528 } else {
529 0.0
530 };
531 self.adaptive_dht_config = Some(AdaptiveDhtConfig {
532 swap_threshold: threshold,
533 });
534 self
535 }
536
537 pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
541 self.adaptive_dht_config = Some(config);
542 self
543 }
544
545 pub fn close_group_cache_dir(mut self, path: impl Into<PathBuf>) -> Self {
550 self.close_group_cache_dir = Some(path.into());
551 self
552 }
553
554 pub fn build(self) -> Result<NodeConfig> {
560 let allow_loopback = self.allow_loopback.unwrap_or(self.local);
562
563 Ok(NodeConfig {
564 local: self.local,
565 port: self.port,
566 ipv6: self.ipv6,
567 bootstrap_peers: self.bootstrap_peers,
568 connection_timeout: self
569 .connection_timeout
570 .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
571 max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
572 dht_config: self.dht_config.unwrap_or_default(),
573 bootstrap_cache_config: None,
574 diversity_config: None,
575 max_message_size: self.max_message_size,
576 node_identity: None,
577 mode: self.mode,
578 custom_user_agent: self.custom_user_agent,
579 allow_loopback,
580 adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
581 close_group_cache_dir: self.close_group_cache_dir,
582 })
583 }
584}
585
impl Default for NodeConfig {
    /// Default configuration: public listening on `DEFAULT_LISTEN_PORT` with
    /// IPv6 enabled, no bootstrap peers, default timeouts/limits, and all
    /// optional settings unset.
    fn default() -> Self {
        Self {
            local: false,
            port: DEFAULT_LISTEN_PORT,
            ipv6: true,
            bootstrap_peers: Vec::new(),
            connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
            max_connections: DEFAULT_MAX_CONNECTIONS,
            dht_config: DHTConfig::default(),
            bootstrap_cache_config: None,
            diversity_config: None,
            max_message_size: None,
            node_identity: None,
            mode: NodeMode::default(),
            custom_user_agent: None,
            allow_loopback: false,
            adaptive_dht_config: AdaptiveDhtConfig::default(),
            close_group_cache_dir: None,
        }
    }
}
608
609impl DHTConfig {
610 pub const DEFAULT_K_VALUE: usize = 20;
612 const DEFAULT_ALPHA_VALUE: usize = 3;
613 const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
614 const MIN_K_VALUE: usize = 4;
616
617 pub fn validate(&self) -> Result<()> {
621 if self.k_value < Self::MIN_K_VALUE {
622 return Err(P2PError::Validation(
623 format!(
624 "k_value must be >= {} (got {}), values below {} produce degenerate behavior",
625 Self::MIN_K_VALUE,
626 self.k_value,
627 Self::MIN_K_VALUE,
628 )
629 .into(),
630 ));
631 }
632 if self.alpha_value < 1 {
633 return Err(P2PError::Validation(
634 format!("alpha_value must be >= 1 (got {})", self.alpha_value).into(),
635 ));
636 }
637 if self.refresh_interval.is_zero() {
638 return Err(P2PError::Validation("refresh_interval must be > 0".into()));
639 }
640 Ok(())
641 }
642}
643
impl Default for DHTConfig {
    /// Defaults: `k_value` 20, `alpha_value` 3, refresh every 600 seconds.
    fn default() -> Self {
        Self {
            k_value: Self::DEFAULT_K_VALUE,
            alpha_value: Self::DEFAULT_ALPHA_VALUE,
            refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
        }
    }
}
653
/// Snapshot of what the transport knows about a peer.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Transport channel identifier for this peer.
    #[allow(dead_code)]
    pub(crate) channel_id: String,

    /// Addresses recorded for the peer; `P2PNode::send_message` reuses them
    /// as reconnect candidates.
    pub addresses: Vec<MultiAddr>,

    /// When the connection was established (tokio `Instant`).
    pub connected_at: Instant,

    /// When the peer was last seen (tokio `Instant`).
    pub last_seen: Instant,

    /// Current connection state.
    pub status: ConnectionStatus,

    /// Protocol names associated with the peer.
    pub protocols: Vec<String>,

    /// Heartbeat counter — maintained outside this section.
    pub heartbeat_count: u64,
}
679
/// Lifecycle state of a peer connection.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection attempt in progress.
    Connecting,
    /// Connection established.
    Connected,
    /// Disconnect in progress.
    Disconnecting,
    /// Connection closed.
    Disconnected,
    /// Connection failed; the string carries a failure description.
    Failed(String),
}
694
/// Events broadcast to subscribers of a [`P2PNode`].
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// A protocol message was received.
    Message {
        /// Protocol/topic name from the wire message.
        topic: String,
        /// Signature-verified sender ID; `None` for unsigned messages.
        source: Option<PeerId>,
        /// Transport-level source address, when it parses as a socket address.
        transport_source: Option<MultiAddr>,
        /// Message payload.
        data: Vec<u8>,
    },
    /// A peer connected. The string is presumably the peer's user agent — TODO confirm.
    PeerConnected(PeerId, String),
    /// A peer disconnected.
    PeerDisconnected(PeerId),
}
721
/// Result of a successful `P2PNode::send_request`.
#[derive(Debug, Clone)]
pub struct PeerResponse {
    /// Peer the response is attributed to.
    pub peer_id: PeerId,
    /// Response payload.
    pub data: Vec<u8>,
    /// Latency measured for the request — presumably round-trip time; TODO confirm.
    pub latency: Duration,
}
735
/// Serialized wrapper that pairs a request with its response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RequestResponseEnvelope {
    /// Identifier shared by a request and its response.
    pub(crate) message_id: String,
    /// `true` for a response, `false` for a request.
    pub(crate) is_response: bool,
    /// Wrapped payload bytes.
    pub(crate) payload: Vec<u8>,
}
749
/// Bookkeeping for a request that is awaiting its response.
pub(crate) struct PendingRequest {
    /// One-shot channel on which the response payload is delivered.
    pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
    /// Peer the response is expected from.
    pub(crate) expected_peer: PeerId,
}
757
/// Pause inserted by `reconnect_and_send` after disconnecting stale channels
/// and before dialing again.
const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
764
/// A P2P node: owns the transport, the adaptive DHT and lifecycle state.
pub struct P2PNode {
    /// Configuration the node was created with.
    config: NodeConfig,

    /// This node's peer ID, derived from its identity.
    peer_id: PeerId,

    /// Shared transport handle used for all network I/O.
    transport: Arc<crate::transport_handle::TransportHandle>,

    /// Creation time; basis for `uptime`.
    start_time: Instant,

    /// Cancelled by `stop`; background tasks watch it to exit.
    shutdown: CancellationToken,

    /// Adaptive DHT layer (trust engine plus DHT manager).
    adaptive_dht: AdaptiveDHT,

    /// Bootstrap cache manager; `None` when its initialization failed.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Bootstrap-completed flag; cleared by `re_bootstrap`.
    is_bootstrapped: Arc<AtomicBool>,

    /// Set at the end of `start`, cleared by `stop`.
    is_started: Arc<AtomicBool>,

    /// Per-peer async locks serializing reconnect attempts in `send_message`.
    reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,

    /// Shared with the relay acquisition driver — presumably the peer
    /// currently acting as relay; TODO confirm.
    relayer_peer_id: Arc<RwLock<Option<PeerId>>>,

    /// Shared with the relay acquisition driver; exposed via `relay_address`.
    relay_address: Arc<RwLock<Option<SocketAddr>>>,
}
827
/// Replaces an unspecified (wildcard) IP with the loopback address of the
/// same family, keeping the port. Any other address is returned unchanged.
pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    let port = addr.port();
    match addr.ip() {
        IpAddr::V4(ip) if ip.is_unspecified() => {
            std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
        }
        IpAddr::V6(ip) if ip.is_unspecified() => {
            std::net::SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port)
        }
        _ => addr,
    }
}
858
859impl P2PNode {
    /// Creates a node from `config` without starting any networking; call
    /// `start` (or `run`) afterwards.
    ///
    /// # Errors
    ///
    /// Fails when identity generation fails, when the DHT or IP diversity
    /// configuration is invalid, or when the transport or adaptive DHT cannot
    /// be constructed. A bootstrap manager failure is only logged.
    pub async fn new(config: NodeConfig) -> Result<Self> {
        // Use the caller-supplied identity, or generate a fresh one.
        let node_identity = match config.node_identity.clone() {
            Some(identity) => identity,
            None => Arc::new(NodeIdentity::generate()?),
        };

        let peer_id = *node_identity.peer_id();

        // Reject invalid configuration before any resources are created.
        config.dht_config.validate()?;
        if let Some(ref diversity) = config.diversity_config {
            diversity
                .validate()
                .map_err(|e| P2PError::Validation(format!("IP diversity config: {e}").into()))?;
        }

        // The bootstrap cache is best-effort: on failure the node runs without it.
        let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        let bootstrap_manager =
            match BootstrapManager::with_node_config(bootstrap_config, &config).await {
                Ok(manager) => Some(Arc::new(RwLock::new(manager))),
                Err(e) => {
                    warn!("Failed to initialize bootstrap manager: {e}, continuing without cache");
                    None
                }
            };

        let transport_config = crate::transport_handle::TransportConfig::from_node_config(
            &config,
            crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
            node_identity.clone(),
        );
        let transport =
            Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);

        // NOTE(review): `swap_threshold` is 0.0 here while the configured value
        // travels separately in `adaptive_dht_config` — presumably
        // `AdaptiveDHT::new` applies the latter; confirm.
        let dht_manager_config = DhtNetworkConfig {
            peer_id,
            node_config: config.clone(),
            request_timeout: config.connection_timeout,
            max_concurrent_operations: MAX_ACTIVE_REQUESTS,
            enable_security: true,
            swap_threshold: 0.0,
        };
        let adaptive_dht = AdaptiveDHT::new(
            transport.clone(),
            dht_manager_config,
            config.adaptive_dht_config.clone(),
        )
        .await?;

        let node = Self {
            config,
            peer_id,
            transport,
            start_time: Instant::now(),
            shutdown: CancellationToken::new(),
            adaptive_dht,
            bootstrap_manager,
            is_bootstrapped: Arc::new(AtomicBool::new(false)),
            is_started: Arc::new(AtomicBool::new(false)),
            reconnect_locks: ParkingMutex::new(HashMap::new()),
            relayer_peer_id: Arc::new(RwLock::new(None)),
            relay_address: Arc::new(RwLock::new(None)),
        };
        info!(
            "Created P2P node with peer ID: {} (call start() to begin networking)",
            node.peer_id
        );

        Ok(node)
    }
937
    /// Returns this node's peer ID.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
942
    /// Returns the shared transport handle.
    pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
        &self.transport
    }
947
    /// Returns the currently stored relay address, if any. The slot is shared
    /// with the relay acquisition driver spawned in `start`.
    pub async fn relay_address(&self) -> Option<SocketAddr> {
        *self.relay_address.read().await
    }
957
    /// Returns the transport's local address, if it has one.
    pub fn local_addr(&self) -> Option<MultiAddr> {
        self.transport.local_addr()
    }
961
    /// Reports whether the bootstrapped flag is currently set.
    pub fn is_bootstrapped(&self) -> bool {
        self.is_bootstrapped.load(Ordering::SeqCst)
    }
969
    /// Clears the bootstrapped flag and reconnects to the bootstrap peers,
    /// without using a close group cache.
    pub async fn re_bootstrap(&self) -> Result<()> {
        self.is_bootstrapped.store(false, Ordering::SeqCst);
        self.connect_bootstrap_peers(None).await
    }
978
    /// Returns a shared handle to the adaptive DHT's trust engine.
    pub fn trust_engine(&self) -> Arc<TrustEngine> {
        self.adaptive_dht.trust_engine().clone()
    }
987
    /// Forwards a trust `event` about `peer_id` to the adaptive DHT.
    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
        self.adaptive_dht.report_trust_event(peer_id, event).await;
    }
1004
    /// Returns the adaptive DHT's trust value for `peer_id`.
    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
        self.adaptive_dht.peer_trust(peer_id)
    }
1011
    /// Returns the adaptive DHT layer.
    pub fn adaptive_dht(&self) -> &AdaptiveDHT {
        &self.adaptive_dht
    }
1016
1017 pub async fn send_request(
1050 &self,
1051 peer_id: &PeerId,
1052 protocol: &str,
1053 data: Vec<u8>,
1054 timeout: Duration,
1055 ) -> Result<PeerResponse> {
1056 match self
1057 .transport
1058 .send_request(peer_id, protocol, data, timeout)
1059 .await
1060 {
1061 Ok(resp) => Ok(resp),
1062 Err(e) => {
1063 let event = if matches!(&e, P2PError::Timeout(_)) {
1064 TrustEvent::ConnectionTimeout
1065 } else {
1066 TrustEvent::ConnectionFailed
1067 };
1068 self.report_trust_event(peer_id, event).await;
1069 Err(e)
1070 }
1071 }
1072 }
1073
    /// Sends `data` to `peer_id` as the response to request `message_id` on
    /// `protocol`. Delegates to the transport.
    pub async fn send_response(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        message_id: &str,
        data: Vec<u8>,
    ) -> Result<()> {
        self.transport
            .send_response(peer_id, protocol, message_id, data)
            .await
    }
1085
    /// Parses a request/response envelope from `data` via the transport's
    /// parser. The tuple presumably mirrors `RequestResponseEnvelope`
    /// (`message_id`, `is_response`, `payload`) — TODO confirm.
    pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
        crate::transport_handle::TransportHandle::parse_request_envelope(data)
    }
1089
    /// Subscribes to `topic` on the transport.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        self.transport.subscribe(topic).await
    }
1093
    /// Publishes `data` to `topic` on the transport.
    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
        self.transport.publish(topic, data).await
    }
1097
    /// Returns the configuration this node was created with.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
1102
    /// Brings the node online.
    ///
    /// Steps, in order: start bootstrap cache maintenance, start the network
    /// listeners, start the adaptive DHT, load the close group cache (if
    /// configured) and import its trust scores, connect to bootstrap peers,
    /// emit `BootstrapComplete`, spawn the relay acquisition driver (skipped
    /// in client mode), spawn the address-update bridge task, and finally
    /// mark the node as started.
    ///
    /// NOTE(review): this method does not check `is_started`; calling it
    /// twice spawns the background tasks twice.
    ///
    /// # Errors
    ///
    /// Propagates failures from the bootstrap manager, the transport
    /// listeners, the adaptive DHT and the bootstrap connection step. Close
    /// group cache load failures are logged and ignored.
    pub async fn start(&self) -> Result<()> {
        info!("Starting P2P node...");

        if let Some(ref bootstrap_manager) = self.bootstrap_manager {
            let mut manager = bootstrap_manager.write().await;
            manager
                .start_maintenance()
                .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
            info!("Bootstrap cache manager started");
        }

        self.transport.start_network_listeners().await?;

        self.adaptive_dht.start().await?;

        let listen_addrs = self.transport.listen_addrs().await;
        info!("P2P node started on addresses: {:?}", listen_addrs);

        // Load the persisted close group, if a cache directory is configured.
        let close_group_cache = if let Some(ref dir) = self.config.close_group_cache_dir {
            match CloseGroupCache::load_from_dir(dir).await {
                Ok(Some(cache)) => {
                    let original_count = cache.peers.len();
                    // Drop entries whose trust score is NaN or infinite
                    // before they reach the trust engine.
                    let cache = CloseGroupCache {
                        peers: cache
                            .peers
                            .into_iter()
                            .filter(|p| p.trust.score.is_finite())
                            .collect(),
                        ..cache
                    };
                    let filtered_count = original_count - cache.peers.len();
                    if filtered_count > 0 {
                        warn!(
                            "Filtered {filtered_count} peers with non-finite trust scores from close group cache"
                        );
                    }

                    // Seed the trust engine with the cached scores.
                    let trust_snapshot = TrustSnapshot {
                        peers: cache
                            .peers
                            .iter()
                            .map(|p| (p.peer_id, p.trust.clone()))
                            .collect(),
                    };
                    self.adaptive_dht
                        .trust_engine()
                        .import_snapshot(&trust_snapshot);
                    info!(
                        "Loaded {} peers from close group cache (trust scores imported)",
                        cache.peers.len()
                    );
                    Some(cache)
                }
                Ok(None) => {
                    debug!(
                        "No close group cache found in {}, fresh start",
                        dir.display()
                    );
                    None
                }
                Err(e) => {
                    // A broken cache must not prevent startup.
                    warn!(
                        "Failed to load close group cache from {}: {e}",
                        dir.display()
                    );
                    None
                }
            }
        } else {
            None
        };

        self.connect_bootstrap_peers(close_group_cache.as_ref())
            .await?;

        {
            // Announce bootstrap completion with the current routing table size.
            let dht = self.adaptive_dht.dht_manager();
            let rt_size = dht.get_routing_table_size().await;
            dht.emit_event(DhtNetworkEvent::BootstrapComplete { num_peers: rt_size });
        }

        if self.config.mode != NodeMode::Client {
            spawn_acquisition_driver(
                self.adaptive_dht.dht_manager().clone(),
                Arc::clone(&self.transport),
                Arc::clone(&self.relayer_peer_id),
                Arc::clone(&self.relay_address),
                self.shutdown.clone(),
            );
        } else {
            info!("client mode — skipping relay acquisition driver");
        }

        {
            // Bridge task: forwards peer address updates from the transport
            // into the DHT until shutdown or until the update stream ends.
            let transport = Arc::clone(&self.transport);
            let dht = self.adaptive_dht.dht_manager().clone();
            let shutdown = self.shutdown.clone();
            tokio::spawn(async move {
                loop {
                    tokio::select! {
                        // `biased` polls the shutdown branch first.
                        biased;
                        _ = shutdown.cancelled() => break,
                        update = transport.recv_peer_address_update() => {
                            let Some((peer_addr, advertised_addr)) = update else { break };
                            let normalized_peer =
                                saorsa_transport::shared::normalize_socket_addr(peer_addr);
                            let normalized_adv =
                                saorsa_transport::shared::normalize_socket_addr(advertised_addr);
                            // An advertised address on the connection's own IP
                            // is dropped; only a different IP is processed as
                            // a relay update.
                            if normalized_peer.ip() == normalized_adv.ip() {
                                debug!(
                                    "DHT_BRIDGE: dropping same-IP update peer={} addr={}",
                                    normalized_peer,
                                    normalized_adv
                                );
                                continue;
                            }
                            info!(
                                "DHT_BRIDGE: processing relay update peer={} addr={}",
                                normalized_peer,
                                normalized_adv
                            );
                            // Updates from addresses with no known peer ID are ignored.
                            if let Some(peer_id) = transport.peer_id_for_addr(&normalized_peer).await {
                                let multi_addr = MultiAddr::quic(normalized_adv);
                                info!(
                                    "Updating DHT: peer {} relay address {} (connection was {})",
                                    peer_id, advertised_addr, peer_addr
                                );
                                if !dht
                                    .touch_legacy_relay_hint_if_unsequenced(&peer_id, &multi_addr)
                                    .await
                                {
                                    debug!(
                                        "DHT_BRIDGE: ignored legacy relay hint for sequenced peer {} addr {}",
                                        peer_id, advertised_addr
                                    );
                                }
                            }
                        }
                    }
                }
            });
        }

        self.is_started
            .store(true, std::sync::atomic::Ordering::Release);

        Ok(())
    }
1332
    /// Starts the node if it is not already running, then waits until the
    /// shutdown token is cancelled (for example by `stop`).
    pub async fn run(&self) -> Result<()> {
        if !self.is_running() {
            self.start().await?;
        }

        info!("P2P node running...");

        // Blocks here for the lifetime of the node.
        self.shutdown.cancelled().await;

        info!("P2P node stopped");
        Ok(())
    }
1351
1352 pub async fn stop(&self) -> Result<()> {
1354 info!("Stopping P2P node...");
1355
1356 if let Some(ref dir) = self.config.close_group_cache_dir
1358 && let Err(e) = self.save_close_group_cache(dir).await
1359 {
1360 warn!("Failed to save close group cache on shutdown: {e}");
1361 }
1362
1363 self.shutdown.cancel();
1365
1366 self.adaptive_dht.stop().await?;
1368
1369 self.transport.stop().await?;
1371
1372 self.is_started
1373 .store(false, std::sync::atomic::Ordering::Release);
1374
1375 info!("P2P node stopped");
1376 Ok(())
1377 }
1378
    /// Alias for `stop`.
    pub async fn shutdown(&self) -> Result<()> {
        self.stop().await
    }
1383
    /// Returns `true` when `start` has completed and shutdown has not been requested.
    pub fn is_running(&self) -> bool {
        self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
    }
1388
    /// Returns the addresses the transport is listening on.
    pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
        self.transport.listen_addrs().await
    }
1393
    /// Returns the peers the transport reports as connected.
    pub async fn connected_peers(&self) -> Vec<PeerId> {
        self.transport.connected_peers().await
    }
1398
    /// Returns the transport's current peer count.
    pub async fn peer_count(&self) -> usize {
        self.transport.peer_count().await
    }
1403
    /// Returns the transport's information about `peer_id`, if any.
    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
        self.transport.peer_info(peer_id).await
    }
1408
    /// Looks up the channel ID associated with `addr` in the transport.
    #[allow(dead_code)]
    pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
        self.transport.get_channel_id_by_address(addr).await
    }
1414
    /// Lists active connections as reported by the transport — presumably
    /// `(channel_id, addresses)` pairs; TODO confirm.
    #[allow(dead_code)]
    pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
        self.transport.list_active_connections().await
    }
1420
    /// Asks the transport to remove `channel_id`; returns the transport's result flag.
    #[allow(dead_code)]
    pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
        self.transport.remove_channel(channel_id).await
    }
1426
    /// Asks the transport to disconnect `channel_id`.
    pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
        self.transport.disconnect_channel(channel_id).await;
    }
1434
    /// Reports whether the transport considers `peer_id` connected.
    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
        self.transport.is_peer_connected(peer_id).await
    }
1439
    /// Dials `address` through the transport. The returned string is
    /// presumably the new channel ID, as with `connect_peer_typed` — TODO confirm.
    pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
        self.transport.connect_peer(address).await
    }
1453
    /// Dials `address` through the transport, tagging the address with `kind`.
    /// Returns the channel ID of the new connection.
    pub async fn connect_peer_typed(
        &self,
        address: &MultiAddr,
        kind: AddressType,
    ) -> Result<String> {
        self.transport.connect_peer_typed(address, kind).await
    }
1467
    /// Waits up to `timeout` for the peer on `channel_id` to complete its
    /// identity exchange and returns the authenticated peer ID.
    pub async fn wait_for_peer_identity(
        &self,
        channel_id: &str,
        timeout: Duration,
    ) -> Result<PeerId> {
        self.transport
            .wait_for_peer_identity(channel_id, timeout)
            .await
    }
1483
    /// Asks the transport to disconnect `peer_id`.
    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
        self.transport.disconnect_peer(peer_id).await
    }
1488
    /// Reports whether the transport considers `channel_id` active.
    #[allow(dead_code)]
    pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
        self.transport.is_connection_active(channel_id).await
    }
1494
    /// Sends `data` to `peer_id` on `protocol`, reconnecting when needed.
    ///
    /// `addrs` are caller-supplied dial candidates for a reconnect. Three
    /// paths exist:
    /// 1. No existing channel: take the per-peer reconnect lock, re-check
    ///    connectivity, then send directly or dial and send.
    /// 2. Existing channel and the send succeeds: done.
    /// 3. Existing channel and the send fails with a stale-channel error: take
    ///    the reconnect lock, then either retry over the current connection
    ///    or dial again. Any other send error is returned without reconnecting.
    pub async fn send_message(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        data: Vec<u8>,
        addrs: &[MultiAddr],
    ) -> Result<()> {
        let existing_channels = self.transport.channels_for_peer(peer_id).await;

        if existing_channels.is_empty() {
            // Serialize reconnects per peer so concurrent senders do not
            // all dial at once.
            let lock = self.reconnect_lock_for(peer_id);
            let _guard = lock.lock().await;

            // Another task may have connected while we waited for the lock.
            if self.transport.is_peer_connected(peer_id).await {
                return self.transport.send_message(peer_id, protocol, data).await;
            }

            return self
                .reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
                .await;
        }

        // Capture the peer's known addresses before the send attempt so they
        // remain available as reconnect candidates.
        let saved_addrs: Vec<MultiAddr> = self
            .transport
            .peer_info(peer_id)
            .await
            .map(|info| info.addresses)
            .unwrap_or_default();

        // The send consumes `data`; keep a copy for a possible retry.
        let retry_data = data.clone();

        let send_result = self.transport.send_message(peer_id, protocol, data).await;
        match send_result {
            Ok(()) => return Ok(()),
            Err(e) => {
                if !e.is_stale_channel_send_failure() {
                    debug!(
                        peer = %peer_id.to_hex(),
                        error = %e,
                        "send failed during active channel use, not reconnecting",
                    );
                    return Err(e);
                }

                debug!(
                    peer = %peer_id.to_hex(),
                    error = %e,
                    "stale channel send failed, attempting reconnect",
                );
            }
        }

        let lock = self.reconnect_lock_for(peer_id);
        let _guard = lock.lock().await;

        if self.transport.is_peer_connected(peer_id).await {
            // Peer is reported connected: drop the channels observed before
            // the failed send, then retry over whatever connection remains.
            for channel_id in &existing_channels {
                self.transport.disconnect_channel(channel_id).await;
            }
            return self
                .transport
                .send_message(peer_id, protocol, retry_data)
                .await;
        }

        self.reconnect_and_send(
            peer_id,
            protocol,
            retry_data,
            addrs,
            &saved_addrs,
            &existing_channels,
        )
        .await
    }
1601
1602 async fn reconnect_and_send(
1604 &self,
1605 peer_id: &PeerId,
1606 protocol: &str,
1607 data: Vec<u8>,
1608 addrs: &[MultiAddr],
1609 saved_addrs: &[MultiAddr],
1610 stale_channels: &[String],
1611 ) -> Result<()> {
1612 let (address, kind) = self
1614 .resolve_dial_address(peer_id, addrs, saved_addrs)
1615 .await
1616 .ok_or_else(|| {
1617 P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
1618 })?;
1619
1620 if !stale_channels.is_empty() {
1626 for channel_id in stale_channels {
1627 self.transport.disconnect_channel(channel_id).await;
1628 }
1629 tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
1630 }
1631
1632 let channel_id = self.transport.connect_peer_typed(&address, kind).await?;
1634 let authenticated = match self
1635 .transport
1636 .wait_for_peer_identity(&channel_id, IDENTITY_EXCHANGE_TIMEOUT)
1637 .await
1638 {
1639 Ok(peer) => peer,
1640 Err(e) => {
1641 self.transport.disconnect_channel(&channel_id).await;
1644 return Err(e);
1645 }
1646 };
1647
1648 if &authenticated != peer_id {
1649 self.transport.disconnect_channel(&channel_id).await;
1650 return Err(P2PError::Identity(IdentityError::IdentityMismatch {
1651 expected: peer_id.to_hex().into(),
1652 actual: authenticated.to_hex().into(),
1653 }));
1654 }
1655
1656 self.transport.send_message(peer_id, protocol, data).await
1658 }
1659
1660 async fn resolve_dial_address(
1671 &self,
1672 peer_id: &PeerId,
1673 caller_addrs: &[MultiAddr],
1674 saved_addrs: &[MultiAddr],
1675 ) -> Option<(MultiAddr, AddressType)> {
1676 if let Some(addr) = Self::first_dialable(caller_addrs) {
1683 return Some((addr, AddressType::Unverified));
1684 }
1685 if let Some(addr) = Self::first_dialable(saved_addrs) {
1686 return Some((addr, AddressType::Unverified));
1687 }
1688
1689 self.adaptive_dht
1690 .peer_addresses_for_dial_typed(peer_id)
1691 .await
1692 .into_iter()
1693 .find(|(a, _)| {
1694 a.dialable_socket_addr()
1695 .is_some_and(|sa| !sa.ip().is_unspecified())
1696 })
1697 }
1698
1699 fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
1702 addrs
1703 .iter()
1704 .find(|a| {
1705 let dialable = a
1706 .dialable_socket_addr()
1707 .is_some_and(|sa| !sa.ip().is_unspecified());
1708 if !dialable {
1709 trace!(address = %a, "skipping non-dialable address");
1710 }
1711 dialable
1712 })
1713 .cloned()
1714 }
1715
1716 fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
1718 self.reconnect_locks
1719 .lock()
1720 .entry(*peer_id)
1721 .or_insert_with(|| Arc::new(TokioMutex::new(())))
1722 .clone()
1723 }
1724}
1725
/// Messages whose timestamp is more than this many seconds in the past are
/// rejected by `parse_protocol_message`.
const MAX_MESSAGE_AGE_SECS: u64 = 300;
/// Messages whose timestamp is more than this many seconds in the future are
/// rejected by `parse_protocol_message`.
const MAX_FUTURE_SECS: u64 = 300;
1750
1751fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1753 P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1754}
1755
1756pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1758 if let Err(e) = tx.send(event) {
1759 tracing::trace!("Event broadcast has no receivers: {e}");
1760 }
1761}
1762
/// Result of successfully parsing an incoming wire message.
pub(crate) struct ParsedMessage {
    /// The `P2PEvent::Message` built from the wire message.
    pub(crate) event: P2PEvent,
    /// Signature-verified sender ID; `None` when the message was unsigned.
    pub(crate) authenticated_node_id: Option<PeerId>,
    /// User agent string carried by the message.
    pub(crate) user_agent: String,
}
1772
1773pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
1774 let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1775 let transport_source = source.parse::<SocketAddr>().ok().map(MultiAddr::quic);
1776
1777 let now = std::time::SystemTime::now()
1779 .duration_since(std::time::UNIX_EPOCH)
1780 .map(|d| d.as_secs())
1781 .unwrap_or(0);
1782
1783 if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1785 tracing::warn!(
1786 "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1787 source,
1788 message.timestamp,
1789 now.saturating_sub(message.timestamp)
1790 );
1791 return None;
1792 }
1793
1794 if message.timestamp > now + MAX_FUTURE_SECS {
1796 tracing::warn!(
1797 "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1798 source,
1799 message.timestamp,
1800 message.timestamp.saturating_sub(now)
1801 );
1802 return None;
1803 }
1804
1805 let authenticated_node_id = if !message.signature.is_empty() {
1807 match verify_message_signature(&message) {
1808 Ok(peer_id) => {
1809 debug!(
1810 "Message from {} authenticated as app-level NodeId {}",
1811 source, peer_id
1812 );
1813 Some(peer_id)
1814 }
1815 Err(e) => {
1816 warn!(
1817 "Rejecting message from {}: signature verification failed: {}",
1818 source, e
1819 );
1820 return None;
1821 }
1822 }
1823 } else {
1824 None
1825 };
1826
1827 debug!(
1828 "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1829 message.protocol,
1830 authenticated_node_id,
1831 source,
1832 message.from,
1833 message.data.len()
1834 );
1835
1836 Some(ParsedMessage {
1837 event: P2PEvent::Message {
1838 topic: message.protocol,
1839 source: authenticated_node_id,
1840 transport_source,
1841 data: message.data,
1842 },
1843 authenticated_node_id,
1844 user_agent: message.user_agent,
1845 })
1846}
1847
1848fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1855 let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1856 .map_err(|e| format!("invalid public key: {e:?}"))?;
1857
1858 let peer_id = peer_id_from_public_key(&pubkey);
1859
1860 if message.from != peer_id {
1862 return Err(format!(
1863 "from field mismatch: message claims '{}' but public key derives '{}'",
1864 message.from, peer_id
1865 ));
1866 }
1867
1868 let signable = postcard::to_stdvec(&(
1869 &message.protocol,
1870 &message.data as &[u8],
1871 &message.from,
1872 message.timestamp,
1873 &message.user_agent,
1874 ))
1875 .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1876
1877 let sig = MlDsaSignature::from_bytes(&message.signature)
1878 .map_err(|e| format!("invalid signature: {e:?}"))?;
1879
1880 let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1881 .map_err(|e| format!("verification error: {e}"))?;
1882
1883 if valid {
1884 Ok(peer_id)
1885 } else {
1886 Err("signature is invalid".to_string())
1887 }
1888}
1889
1890impl P2PNode {
1891 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1893 self.transport.subscribe_events()
1894 }
1895
1896 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1898 self.subscribe_events()
1899 }
1900
1901 pub fn uptime(&self) -> Duration {
1903 self.start_time.elapsed()
1904 }
1905
1906 pub async fn health_check(&self) -> Result<()> {
1919 let peer_count = self.peer_count().await;
1920 if peer_count > self.config.max_connections {
1921 Err(protocol_error(format!(
1922 "Too many connections: {peer_count}"
1923 )))
1924 } else {
1925 Ok(())
1926 }
1927 }
1928
1929 pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1931 self.adaptive_dht.dht_manager()
1932 }
1933
1934 pub fn dht(&self) -> &Arc<DhtNetworkManager> {
1936 self.dht_manager()
1937 }
1938
1939 pub async fn add_discovered_peer(
1941 &self,
1942 _peer_id: PeerId,
1943 addresses: Vec<MultiAddr>,
1944 ) -> Result<()> {
1945 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1946 let manager = bootstrap_manager.read().await;
1947 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1948 .iter()
1949 .filter_map(|addr| addr.socket_addr())
1950 .collect();
1951 if let Some(&primary) = socket_addresses.first() {
1952 manager
1953 .add_peer(&primary, socket_addresses)
1954 .await
1955 .map_err(|e| {
1956 protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1957 })?;
1958 }
1959 }
1960 Ok(())
1961 }
1962
1963 pub async fn update_peer_metrics(
1965 &self,
1966 addr: &MultiAddr,
1967 success: bool,
1968 latency_ms: Option<u64>,
1969 _error: Option<String>,
1970 ) -> Result<()> {
1971 if let Some(ref bootstrap_manager) = self.bootstrap_manager
1972 && let Some(sa) = addr.socket_addr()
1973 {
1974 let manager = bootstrap_manager.read().await;
1975 if success {
1976 let rtt_ms = latency_ms.unwrap_or(0) as u32;
1977 manager.record_success(&sa, rtt_ms).await;
1978 } else {
1979 manager.record_failure(&sa).await;
1980 }
1981 }
1982 Ok(())
1983 }
1984
1985 pub async fn get_bootstrap_cache_stats(
1987 &self,
1988 ) -> Result<Option<crate::bootstrap::BootstrapStats>> {
1989 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1990 let manager = bootstrap_manager.read().await;
1991 Ok(Some(manager.stats().await))
1992 } else {
1993 Ok(None)
1994 }
1995 }
1996
1997 pub async fn cached_peer_count(&self) -> usize {
1999 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
2000 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
2001 {
2002 return stats.total_peers;
2003 }
2004 0
2005 }
2006
2007 async fn connect_bootstrap_peers(
2014 &self,
2015 close_group_cache: Option<&CloseGroupCache>,
2016 ) -> Result<()> {
2017 let mut serial_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2022 let mut parallel_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2023 let mut used_cache = false;
2024 let mut seen_addresses = std::collections::HashSet::new();
2025
2026 if let Some(cache) = close_group_cache {
2032 let mut sorted_peers: Vec<&CachedCloseGroupPeer> = cache.peers.iter().collect();
2033 sorted_peers.sort_by(|a, b| {
2034 let score_ord = match b.trust.score.partial_cmp(&a.trust.score) {
2038 Some(ord) => ord,
2039 None => {
2040 if a.trust.score.is_nan() {
2041 std::cmp::Ordering::Greater } else {
2043 std::cmp::Ordering::Less }
2045 }
2046 };
2047 score_ord.then_with(|| {
2048 let da = self.peer_id.xor_distance(&a.peer_id);
2049 let db = self.peer_id.xor_distance(&b.peer_id);
2050 da.cmp(&db)
2051 })
2052 });
2053
2054 let mut added_from_close_group = 0usize;
2055 for peer in &sorted_peers {
2056 let new_addresses: Vec<MultiAddr> = peer
2057 .addresses
2058 .iter()
2059 .filter(|a| {
2060 a.dialable_socket_addr()
2061 .is_some_and(|sa| !seen_addresses.contains(&sa))
2062 })
2063 .cloned()
2064 .collect();
2065
2066 if !new_addresses.is_empty() {
2067 for addr in &new_addresses {
2068 if let Some(sa) = addr.socket_addr() {
2069 seen_addresses.insert(sa);
2070 }
2071 }
2072 serial_addr_sets.push(new_addresses);
2073 added_from_close_group += 1;
2074 }
2075 }
2076 if added_from_close_group > 0 {
2077 info!(
2078 "Added {} close group cache peers (highest trust first)",
2079 added_from_close_group
2080 );
2081 }
2082 }
2083
2084 if !self.config.bootstrap_peers.is_empty() {
2086 info!(
2087 "Using {} configured bootstrap peers (priority)",
2088 self.config.bootstrap_peers.len()
2089 );
2090 for multiaddr in &self.config.bootstrap_peers {
2091 let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
2092 warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
2093 continue;
2094 };
2095 seen_addresses.insert(socket_addr);
2096 parallel_addr_sets.push(vec![multiaddr.clone()]);
2097 }
2098 }
2099
2100 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2102 let manager = bootstrap_manager.read().await;
2103 let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
2104 if !cached_peers.is_empty() {
2105 let mut added_from_cache = 0;
2106 for cached in cached_peers {
2107 let mut addrs = vec![cached.primary_address];
2108 addrs.extend(cached.addresses);
2109 let new_addresses: Vec<MultiAddr> = addrs
2111 .into_iter()
2112 .filter(|a| !seen_addresses.contains(a))
2113 .map(MultiAddr::quic)
2114 .collect();
2115
2116 if !new_addresses.is_empty() {
2117 for addr in &new_addresses {
2118 if let Some(sa) = addr.socket_addr() {
2119 seen_addresses.insert(sa);
2120 }
2121 }
2122 parallel_addr_sets.push(new_addresses);
2123 added_from_cache += 1;
2124 }
2125 }
2126 if added_from_cache > 0 {
2127 info!(
2128 "Added {} cached bootstrap peers (supplementing CLI peers)",
2129 added_from_cache
2130 );
2131 used_cache = true;
2132 }
2133 }
2134 }
2135
2136 if serial_addr_sets.is_empty() && parallel_addr_sets.is_empty() {
2137 info!("No bootstrap peers configured and no cached peers available");
2138 return Ok(());
2139 }
2140
2141 let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
2144 let mut successful_connections = 0;
2145 let mut connected_peer_ids: Vec<PeerId> = Vec::new();
2146
2147 let client_mode = matches!(self.config.mode, NodeMode::Client);
2149 for addrs in &serial_addr_sets {
2150 if let Some(peer_id) = self
2151 .dial_bootstrap_addr_set(addrs, used_cache, identity_timeout)
2152 .await
2153 {
2154 successful_connections += 1;
2155 connected_peer_ids.push(peer_id);
2156 if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2157 debug!(
2158 "Client bootstrap target reached ({successful_connections} peers) — skipping remaining serial dials"
2159 );
2160 break;
2161 }
2162 }
2163 }
2164
2165 if !client_mode || successful_connections < CLIENT_BOOTSTRAP_TARGET {
2170 let mut parallel_stream =
2171 futures::stream::iter(parallel_addr_sets.into_iter().map(|addrs| async move {
2172 self.dial_bootstrap_addr_set(&addrs, used_cache, identity_timeout)
2173 .await
2174 }))
2175 .buffer_unordered(MAX_CONCURRENT_BOOTSTRAP_DIALS);
2176 while let Some(result) = parallel_stream.next().await {
2177 if let Some(peer_id) = result {
2178 successful_connections += 1;
2179 connected_peer_ids.push(peer_id);
2180 if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2181 debug!(
2182 "Client bootstrap target reached ({successful_connections} peers) — cancelling pending dials"
2183 );
2184 break;
2185 }
2186 }
2187 }
2188 }
2192
2193 if successful_connections == 0 {
2194 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2198 let transport_peers = self.transport.connected_peers().await;
2199 if !transport_peers.is_empty() {
2200 info!(
2201 "No outbound bootstrap succeeded, but {} inbound peer(s) connected — proceeding with DHT bootstrap",
2202 transport_peers.len()
2203 );
2204 connected_peer_ids = transport_peers;
2205 successful_connections = connected_peer_ids.len();
2206 } else {
2207 if !used_cache {
2208 warn!("Failed to connect to any bootstrap peers");
2209 }
2210 return Ok(());
2213 }
2214 }
2215
2216 info!(
2217 "Successfully connected to {} bootstrap peers",
2218 successful_connections
2219 );
2220
2221 match self
2223 .dht_manager()
2224 .bootstrap_from_peers(&connected_peer_ids)
2225 .await
2226 {
2227 Ok(count) => info!("DHT peer discovery found {} peers", count),
2228 Err(e) => warn!("DHT peer discovery failed: {}", e),
2229 }
2230
2231 if matches!(self.config.mode, NodeMode::Node) {
2241 const SELF_LOOKUP_ROUNDS: u8 = 2;
2242 for i in 1..=SELF_LOOKUP_ROUNDS {
2243 if let Err(e) = self.dht_manager().trigger_self_lookup().await {
2244 warn!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} failed: {e}");
2245 } else {
2246 debug!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} completed");
2247 }
2248 }
2249 } else {
2250 debug!("Skipping post-bootstrap self-lookups (client mode)");
2251 }
2252
2253 self.is_bootstrapped.store(true, Ordering::SeqCst);
2256 info!(
2257 "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
2258 successful_connections,
2259 connected_peer_ids.len()
2260 );
2261
2262 if let Some(ref dir) = self.config.close_group_cache_dir
2265 && let Err(e) = self.save_close_group_cache(dir).await
2266 {
2267 warn!("Failed to save close group cache after bootstrap: {e}");
2268 }
2269
2270 Ok(())
2271 }
2272
2273 async fn dial_bootstrap_addr_set(
2278 &self,
2279 addrs: &[MultiAddr],
2280 used_cache: bool,
2281 identity_timeout: Duration,
2282 ) -> Option<PeerId> {
2283 for addr in addrs {
2284 match self
2289 .transport
2290 .connect_peer_typed(addr, AddressType::Unverified)
2291 .await
2292 {
2293 Ok(channel_id) => match self
2294 .transport
2295 .wait_for_peer_identity(&channel_id, identity_timeout)
2296 .await
2297 {
2298 Ok(real_peer_id) => {
2299 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2300 let manager = bootstrap_manager.read().await;
2301 if let Some(sa) = addr.socket_addr() {
2302 manager.record_success(&sa, 100).await;
2303 }
2304 }
2305 return Some(real_peer_id);
2306 }
2307 Err(e) => {
2308 warn!(
2309 "Timeout waiting for identity from bootstrap peer {}: {}, \
2310 closing channel {}",
2311 addr, e, channel_id
2312 );
2313 self.disconnect_channel(&channel_id).await;
2314 }
2315 },
2316 Err(e) => {
2317 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2318 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2319 let manager = bootstrap_manager.read().await;
2320 if let Some(sa) = addr.socket_addr() {
2321 manager.record_failure(&sa).await;
2322 }
2323 }
2324 }
2325 }
2326 }
2327 None
2328 }
2329
2330 async fn save_close_group_cache(&self, dir: &Path) -> anyhow::Result<()> {
2332 let key: crate::dht::Key = *self.peer_id.as_bytes();
2333 let k_value = self.config.dht_config.k_value;
2334 let close_group = self
2335 .dht_manager()
2336 .find_closest_nodes_local(&key, k_value)
2337 .await;
2338
2339 if close_group.is_empty() {
2340 debug!("No close group peers to save");
2341 return Ok(());
2342 }
2343
2344 let trust_engine = self.adaptive_dht.trust_engine();
2345 let now_epoch = SystemTime::now()
2346 .duration_since(UNIX_EPOCH)
2347 .map(|d| d.as_secs())
2348 .unwrap_or(0);
2349
2350 let peers: Vec<CachedCloseGroupPeer> = close_group
2351 .into_iter()
2352 .filter_map(|dht_node| {
2353 let score = trust_engine.score(&dht_node.peer_id);
2354 if !score.is_finite() {
2357 return None;
2358 }
2359 Some(CachedCloseGroupPeer {
2360 peer_id: dht_node.peer_id,
2361 addresses: dht_node.addresses,
2362 trust: TrustRecord {
2363 score,
2364 last_updated_epoch_secs: now_epoch,
2365 },
2366 })
2367 })
2368 .collect();
2369
2370 let peer_count = peers.len();
2371 let cache = CloseGroupCache {
2372 peers,
2373 saved_at_epoch_secs: now_epoch,
2374 };
2375
2376 cache.save_to_dir(dir).await?;
2377 info!(
2378 "Saved {} close group peers to cache in {}",
2379 peer_count,
2380 dir.display()
2381 );
2382 Ok(())
2383 }
2384
2385 }
2387
2388#[async_trait::async_trait]
2390#[allow(dead_code)]
2391pub trait NetworkSender: Send + Sync {
2392 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2394
2395 fn local_peer_id(&self) -> PeerId;
2397}
2398
#[cfg(test)]
// Tests may unwrap/expect freely: a panic is the failure signal.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Builds a [`BootstrapManager`] from `config`, using the node's bootstrap
    /// cache settings (or their defaults) with the cache directory pointed at
    /// a fresh temporary directory.
    async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
        // NOTE(review): the `TempDir` guard is dropped when this helper
        // returns, which removes the directory — fine for tests that never
        // touch the on-disk cache; confirm before reusing this elsewhere.
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let cache_dir = temp_dir.path().to_path_buf();

        let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        bootstrap_config.cache_dir = cache_dir;

        let built = BootstrapManager::with_node_config(bootstrap_config, config).await;
        built.expect("bootstrap manager")
    }

    /// The diversity settings on `NodeConfig` must reach the bootstrap manager.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };
        let manager = build_bootstrap_manager_like_prod(&config).await;

        let per_ip_limit = manager.diversity_config().max_per_ip;
        let per_subnet_limit = manager.diversity_config().max_per_subnet;
        assert_eq!(per_ip_limit, Some(usize::MAX));
        assert_eq!(per_subnet_limit, Some(usize::MAX));
    }
}
2432
2433pub(crate) fn register_new_channel(
2440 peers: &DashMap<String, PeerInfo>,
2441 channel_id: &str,
2442 remote_addr: &MultiAddr,
2443) {
2444 let peer_info = PeerInfo {
2445 channel_id: channel_id.to_owned(),
2446 addresses: vec![remote_addr.clone()],
2447 connected_at: tokio::time::Instant::now(),
2448 last_seen: tokio::time::Instant::now(),
2449 status: ConnectionStatus::Connected,
2450 protocols: vec!["p2p-core/1.0.0".to_string()],
2451 heartbeat_count: 0,
2452 };
2453 peers.insert(channel_id.to_owned(), peer_info);
2454}
2455
2456#[cfg(test)]
2457mod tests {
2458 use super::*;
2459 use std::time::Duration;
2461 use tokio::time::timeout;
2462
/// Maximum message size used by the builder test: 2 MiB.
const TEST_MAX_MESSAGE_SIZE: usize = 2 << 20;
2465
2466 fn create_test_node_config() -> NodeConfig {
2472 NodeConfig {
2473 local: true,
2474 port: 0,
2475 ipv6: true,
2476 bootstrap_peers: vec![],
2477 connection_timeout: Duration::from_secs(2),
2478 max_connections: 100,
2479 dht_config: DHTConfig::default(),
2480 bootstrap_cache_config: None,
2481 diversity_config: None,
2482 max_message_size: None,
2483 node_identity: None,
2484 mode: NodeMode::default(),
2485 custom_user_agent: None,
2486 allow_loopback: true,
2487 adaptive_dht_config: AdaptiveDhtConfig::default(),
2488 close_group_cache_dir: None,
2489 }
2490 }
2491
2492 #[tokio::test]
2496 async fn test_node_config_default() {
2497 let config = NodeConfig::default();
2498
2499 assert_eq!(config.listen_addrs().len(), 2); assert_eq!(config.max_connections, 10000);
2501 assert_eq!(config.connection_timeout, Duration::from_secs(25));
2502 }
2503
2504 #[tokio::test]
2505 async fn test_dht_config_default() {
2506 let config = DHTConfig::default();
2507
2508 assert_eq!(config.k_value, 20);
2509 assert_eq!(config.alpha_value, 3);
2510 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2511 }
2512
/// Checks equality between `ConnectionStatus` variants and that `Failed`
/// carries its message.
#[test]
fn test_connection_status_variants() {
    let connecting = ConnectionStatus::Connecting;
    let connected = ConnectionStatus::Connected;
    let disconnecting = ConnectionStatus::Disconnecting;
    let disconnected = ConnectionStatus::Disconnected;
    let failed = ConnectionStatus::Failed("test error".to_string());

    // Each variant equals itself ...
    assert_eq!(connecting, ConnectionStatus::Connecting);
    assert_eq!(connected, ConnectionStatus::Connected);
    assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
    assert_eq!(disconnected, ConnectionStatus::Disconnected);
    // ... and differs from another variant.
    assert_ne!(connecting, connected);

    match failed {
        ConnectionStatus::Failed(msg) => assert_eq!(msg, "test error"),
        _ => panic!("Expected Failed status"),
    }
}
2533
2534 #[tokio::test]
2535 async fn test_node_creation() -> Result<()> {
2536 let config = create_test_node_config();
2537 let node = P2PNode::new(config).await?;
2538
2539 assert_eq!(node.peer_id().to_hex().len(), 64);
2541 assert!(!node.is_running());
2542 assert_eq!(node.peer_count().await, 0);
2543 assert!(node.connected_peers().await.is_empty());
2544
2545 Ok(())
2546 }
2547
2548 #[tokio::test]
2549 async fn test_node_lifecycle() -> Result<()> {
2550 let config = create_test_node_config();
2551 let node = P2PNode::new(config).await?;
2552
2553 assert!(!node.is_running());
2555
2556 node.start().await?;
2558 assert!(node.is_running());
2559
2560 let listen_addrs = node.listen_addrs().await;
2562 assert!(
2563 !listen_addrs.is_empty(),
2564 "Expected at least one listening address"
2565 );
2566
2567 node.stop().await?;
2569 assert!(!node.is_running());
2570
2571 Ok(())
2572 }
2573
2574 #[tokio::test]
2575 async fn test_peer_connection() -> Result<()> {
2576 let config1 = create_test_node_config();
2577 let config2 = create_test_node_config();
2578
2579 let node1 = P2PNode::new(config1).await?;
2580 let node2 = P2PNode::new(config2).await?;
2581
2582 node1.start().await?;
2583 node2.start().await?;
2584
2585 let node2_addr = node2
2586 .listen_addrs()
2587 .await
2588 .into_iter()
2589 .find(|a| a.is_ipv4())
2590 .ok_or_else(|| {
2591 P2PError::Network(crate::error::NetworkError::InvalidAddress(
2592 "Node 2 did not expose an IPv4 listen address".into(),
2593 ))
2594 })?;
2595
2596 let channel_id = node1.connect_peer(&node2_addr).await?;
2599
2600 assert!(node1.is_connection_active(&channel_id).await);
2603
2604 let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
2606 assert!(peer_info.is_some());
2607 let info = peer_info.expect("Peer info should exist after connect");
2608 assert_eq!(info.channel_id, channel_id);
2609 assert_eq!(info.status, ConnectionStatus::Connected);
2610 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2611
2612 node1.remove_channel(&channel_id).await;
2614 assert!(!node1.is_connection_active(&channel_id).await);
2615
2616 node1.stop().await?;
2617 node2.stop().await?;
2618
2619 Ok(())
2620 }
2621
2622 #[tokio::test]
2623 async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
2624 let config = create_test_node_config();
2625 let node = P2PNode::new(config).await?;
2626
2627 let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
2628 let result = node.connect_peer(&tcp_addr).await;
2629
2630 assert!(
2631 matches!(
2632 result,
2633 Err(P2PError::Network(
2634 crate::error::NetworkError::InvalidAddress(_)
2635 ))
2636 ),
2637 "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
2638 result
2639 );
2640
2641 Ok(())
2642 }
2643
2644 #[cfg_attr(target_os = "windows", ignore)]
2651 #[tokio::test]
2652 async fn test_event_subscription() -> Result<()> {
2653 let identity1 =
2657 Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
2658 let identity2 =
2659 Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
2660
2661 let mut config1 = create_test_node_config();
2662 config1.ipv6 = false;
2663 config1.node_identity = Some(identity1);
2664
2665 let node2_peer_id = *identity2.peer_id();
2666 let mut config2 = create_test_node_config();
2667 config2.ipv6 = false;
2668 config2.node_identity = Some(identity2);
2669
2670 let node1 = P2PNode::new(config1).await?;
2671 let node2 = P2PNode::new(config2).await?;
2672
2673 node1.start().await?;
2674 node2.start().await?;
2675
2676 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2677
2678 let mut events = node2.subscribe_events();
2680
2681 let node2_addr = node2.local_addr().ok_or_else(|| {
2682 P2PError::Network(crate::error::NetworkError::ProtocolError(
2683 "No listening address".to_string().into(),
2684 ))
2685 })?;
2686
2687 let mut channel_id = None;
2689 for attempt in 0..3 {
2690 if attempt > 0 {
2691 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2692 }
2693 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
2694 Ok(Ok(id)) => {
2695 channel_id = Some(id);
2696 break;
2697 }
2698 Ok(Err(_)) | Err(_) => continue,
2699 }
2700 }
2701 let channel_id = channel_id.expect("Failed to connect after 3 attempts");
2702
2703 let target_peer_id = node1
2705 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2706 .await?;
2707 assert_eq!(target_peer_id, node2_peer_id);
2708
2709 node1
2711 .send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
2712 .await?;
2713
2714 let event = timeout(Duration::from_secs(2), async {
2716 loop {
2717 match events.recv().await {
2718 Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
2719 Ok(P2PEvent::Message { .. }) => continue, Ok(_) => continue,
2721 Err(e) => return Err(e),
2722 }
2723 }
2724 })
2725 .await;
2726 assert!(event.is_ok(), "Should receive PeerConnected event");
2727 let connected_peer_id = event.expect("Timed out").expect("Channel error");
2728 assert!(
2730 connected_peer_id.0.iter().any(|&b| b != 0),
2731 "PeerConnected should carry a non-zero peer ID"
2732 );
2733
2734 node1.stop().await?;
2735 node2.stop().await?;
2736
2737 Ok(())
2738 }
2739
2740 #[cfg_attr(target_os = "windows", ignore)]
2742 #[tokio::test]
2743 async fn test_message_sending() -> Result<()> {
2744 let mut config1 = create_test_node_config();
2746 config1.ipv6 = false;
2747 let node1 = P2PNode::new(config1).await?;
2748 node1.start().await?;
2749
2750 let mut config2 = create_test_node_config();
2751 config2.ipv6 = false;
2752 let node2 = P2PNode::new(config2).await?;
2753 node2.start().await?;
2754
2755 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2757
2758 let node2_addr = node2.local_addr().ok_or_else(|| {
2760 P2PError::Network(crate::error::NetworkError::ProtocolError(
2761 "No listening address".to_string().into(),
2762 ))
2763 })?;
2764
2765 let channel_id =
2767 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2768 Ok(res) => res?,
2769 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2770 };
2771
2772 let target_peer_id = node1
2774 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2775 .await?;
2776 assert_eq!(target_peer_id, node2.peer_id().clone());
2777
2778 let message_data = b"Hello, peer!".to_vec();
2780 let result = match timeout(
2781 Duration::from_millis(500),
2782 node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
2783 )
2784 .await
2785 {
2786 Ok(res) => res,
2787 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2788 };
2789 if let Err(e) = &result {
2792 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2793 }
2794
2795 let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
2797 let result = node1
2798 .send_message(&non_existent_peer, "test-protocol", vec![], &[])
2799 .await;
2800 assert!(result.is_err(), "Sending to non-existent peer should fail");
2801
2802 node1.stop().await?;
2803 node2.stop().await?;
2804
2805 Ok(())
2806 }
2807
2808 #[tokio::test]
2809 async fn test_remote_mcp_operations() -> Result<()> {
2810 let config = create_test_node_config();
2811 let node = P2PNode::new(config).await?;
2812
2813 node.start().await?;
2815 node.stop().await?;
2816 Ok(())
2817 }
2818
2819 #[tokio::test]
2820 async fn test_health_check() -> Result<()> {
2821 let config = create_test_node_config();
2822 let node = P2PNode::new(config).await?;
2823
2824 let result = node.health_check().await;
2826 assert!(result.is_ok());
2827
2828 Ok(())
2833 }
2834
2835 #[tokio::test]
2836 async fn test_node_uptime() -> Result<()> {
2837 let config = create_test_node_config();
2838 let node = P2PNode::new(config).await?;
2839
2840 let uptime1 = node.uptime();
2841 assert!(uptime1 >= Duration::from_secs(0));
2842
2843 tokio::time::sleep(Duration::from_millis(10)).await;
2845
2846 let uptime2 = node.uptime();
2847 assert!(uptime2 > uptime1);
2848
2849 Ok(())
2850 }
2851
2852 #[tokio::test]
2853 async fn test_node_config_access() -> Result<()> {
2854 let config = create_test_node_config();
2855 let node = P2PNode::new(config).await?;
2856
2857 let node_config = node.config();
2858 assert_eq!(node_config.max_connections, 100);
2859 Ok(())
2862 }
2863
2864 #[tokio::test]
2865 async fn test_mcp_server_access() -> Result<()> {
2866 let config = create_test_node_config();
2867 let _node = P2PNode::new(config).await?;
2868
2869 Ok(())
2871 }
2872
2873 #[tokio::test]
2874 async fn test_dht_access() -> Result<()> {
2875 let config = create_test_node_config();
2876 let node = P2PNode::new(config).await?;
2877
2878 let _dht = node.dht();
2880
2881 Ok(())
2882 }
2883
2884 #[tokio::test]
2885 async fn test_node_config_builder() -> Result<()> {
2886 let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2887
2888 let config = NodeConfig::builder()
2889 .local(true)
2890 .ipv6(true)
2891 .bootstrap_peer(bootstrap)
2892 .connection_timeout(Duration::from_secs(15))
2893 .max_connections(200)
2894 .max_message_size(TEST_MAX_MESSAGE_SIZE)
2895 .build()?;
2896
2897 assert_eq!(config.listen_addrs().len(), 2); assert!(config.local);
2899 assert!(config.ipv6);
2900 assert_eq!(config.bootstrap_peers.len(), 1);
2901 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2902 assert_eq!(config.max_connections, 200);
2903 assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2904 assert!(config.allow_loopback); Ok(())
2907 }
2908
2909 #[tokio::test]
2910 async fn test_bootstrap_peers() -> Result<()> {
2911 let mut config = create_test_node_config();
2912 config.bootstrap_peers = vec![
2913 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2914 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2915 ];
2916
2917 let node = P2PNode::new(config).await?;
2918
2919 node.start().await?;
2921
2922 let _peer_count = node.peer_count().await;
2926
2927 node.stop().await?;
2928 Ok(())
2929 }
2930
2931 #[tokio::test]
2932 async fn test_peer_info_structure() {
2933 let peer_info = PeerInfo {
2934 channel_id: "test_peer".to_string(),
2935 addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2936 connected_at: Instant::now(),
2937 last_seen: Instant::now(),
2938 status: ConnectionStatus::Connected,
2939 protocols: vec!["test-protocol".to_string()],
2940 heartbeat_count: 0,
2941 };
2942
2943 assert_eq!(peer_info.channel_id, "test_peer");
2944 assert_eq!(peer_info.addresses.len(), 1);
2945 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2946 assert_eq!(peer_info.protocols.len(), 1);
2947 }
2948
2949 #[tokio::test]
2950 async fn test_serialization() -> Result<()> {
2951 let config = create_test_node_config();
2953 let serialized = serde_json::to_string(&config)?;
2954 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2955
2956 assert_eq!(config.local, deserialized.local);
2957 assert_eq!(config.port, deserialized.port);
2958 assert_eq!(config.ipv6, deserialized.ipv6);
2959 assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2960
2961 Ok(())
2962 }
2963
2964 #[tokio::test]
2965 async fn test_get_channel_id_by_address_found() -> Result<()> {
2966 let config = create_test_node_config();
2967 let node = P2PNode::new(config).await?;
2968
2969 let test_channel_id = "peer_test_123".to_string();
2971 let test_address = "192.168.1.100:9000";
2972 let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2973
2974 let peer_info = PeerInfo {
2975 channel_id: test_channel_id.clone(),
2976 addresses: vec![test_multiaddr],
2977 connected_at: Instant::now(),
2978 last_seen: Instant::now(),
2979 status: ConnectionStatus::Connected,
2980 protocols: vec!["test-protocol".to_string()],
2981 heartbeat_count: 0,
2982 };
2983
2984 node.transport
2985 .inject_peer(test_channel_id.clone(), peer_info)
2986 .await;
2987
2988 let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2990 let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2991 assert_eq!(found_channel_id, Some(test_channel_id));
2992
2993 Ok(())
2994 }
2995
2996 #[tokio::test]
2997 async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2998 let config = create_test_node_config();
2999 let node = P2PNode::new(config).await?;
3000
3001 let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
3003 let result = node.get_channel_id_by_address(&unknown_addr).await;
3004 assert_eq!(result, None);
3005
3006 Ok(())
3007 }
3008
3009 #[tokio::test]
3010 async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
3011 let config = create_test_node_config();
3012 let node = P2PNode::new(config).await?;
3013
3014 let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
3016 mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
3017 psm: 0x0025,
3018 });
3019 let result = node.get_channel_id_by_address(&ble_addr).await;
3020 assert_eq!(result, None);
3021
3022 Ok(())
3023 }
3024
3025 #[tokio::test]
3026 async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
3027 let config = create_test_node_config();
3028 let node = P2PNode::new(config).await?;
3029
3030 let peer1_id = "peer_1".to_string();
3032 let peer1_addr_str = "192.168.1.101:9001";
3033 let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
3034
3035 let peer2_id = "peer_2".to_string();
3036 let peer2_addr_str = "192.168.1.102:9002";
3037 let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
3038
3039 let peer1_info = PeerInfo {
3040 channel_id: peer1_id.clone(),
3041 addresses: vec![peer1_multiaddr],
3042 connected_at: Instant::now(),
3043 last_seen: Instant::now(),
3044 status: ConnectionStatus::Connected,
3045 protocols: vec!["test-protocol".to_string()],
3046 heartbeat_count: 0,
3047 };
3048
3049 let peer2_info = PeerInfo {
3050 channel_id: peer2_id.clone(),
3051 addresses: vec![peer2_multiaddr],
3052 connected_at: Instant::now(),
3053 last_seen: Instant::now(),
3054 status: ConnectionStatus::Connected,
3055 protocols: vec!["test-protocol".to_string()],
3056 heartbeat_count: 0,
3057 };
3058
3059 node.transport
3060 .inject_peer(peer1_id.clone(), peer1_info)
3061 .await;
3062 node.transport
3063 .inject_peer(peer2_id.clone(), peer2_info)
3064 .await;
3065
3066 let found_peer1 = node
3068 .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
3069 .await;
3070 let found_peer2 = node
3071 .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
3072 .await;
3073
3074 assert_eq!(found_peer1, Some(peer1_id));
3075 assert_eq!(found_peer2, Some(peer2_id));
3076
3077 Ok(())
3078 }
3079
3080 #[tokio::test]
3081 async fn test_list_active_connections_empty() -> Result<()> {
3082 let config = create_test_node_config();
3083 let node = P2PNode::new(config).await?;
3084
3085 let connections = node.list_active_connections().await;
3087 assert!(connections.is_empty());
3088
3089 Ok(())
3090 }
3091
3092 #[tokio::test]
3093 async fn test_list_active_connections_with_peers() -> Result<()> {
3094 let config = create_test_node_config();
3095 let node = P2PNode::new(config).await?;
3096
3097 let peer1_id = "peer_1".to_string();
3099 let peer1_addrs = vec![
3100 MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
3101 MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
3102 ];
3103
3104 let peer2_id = "peer_2".to_string();
3105 let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
3106
3107 let peer1_info = PeerInfo {
3108 channel_id: peer1_id.clone(),
3109 addresses: peer1_addrs.clone(),
3110 connected_at: Instant::now(),
3111 last_seen: Instant::now(),
3112 status: ConnectionStatus::Connected,
3113 protocols: vec!["test-protocol".to_string()],
3114 heartbeat_count: 0,
3115 };
3116
3117 let peer2_info = PeerInfo {
3118 channel_id: peer2_id.clone(),
3119 addresses: peer2_addrs.clone(),
3120 connected_at: Instant::now(),
3121 last_seen: Instant::now(),
3122 status: ConnectionStatus::Connected,
3123 protocols: vec!["test-protocol".to_string()],
3124 heartbeat_count: 0,
3125 };
3126
3127 node.transport
3128 .inject_peer(peer1_id.clone(), peer1_info)
3129 .await;
3130 node.transport
3131 .inject_peer(peer2_id.clone(), peer2_info)
3132 .await;
3133
3134 node.transport
3136 .inject_active_connection(peer1_id.clone())
3137 .await;
3138 node.transport
3139 .inject_active_connection(peer2_id.clone())
3140 .await;
3141
3142 let connections = node.list_active_connections().await;
3144 assert_eq!(connections.len(), 2);
3145
3146 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3148 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3149
3150 assert!(peer1_conn.is_some());
3151 assert!(peer2_conn.is_some());
3152
3153 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3155 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3156
3157 Ok(())
3158 }
3159
3160 #[tokio::test]
3161 async fn test_remove_channel_success() -> Result<()> {
3162 let config = create_test_node_config();
3163 let node = P2PNode::new(config).await?;
3164
3165 let channel_id = "peer_to_remove".to_string();
3167 let channel_peer_id = PeerId::from_name(&channel_id);
3168 let peer_info = PeerInfo {
3169 channel_id: channel_id.clone(),
3170 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3171 connected_at: Instant::now(),
3172 last_seen: Instant::now(),
3173 status: ConnectionStatus::Connected,
3174 protocols: vec!["test-protocol".to_string()],
3175 heartbeat_count: 0,
3176 };
3177
3178 node.transport
3179 .inject_peer(channel_id.clone(), peer_info)
3180 .await;
3181 node.transport
3182 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3183 .await;
3184
3185 assert!(node.is_peer_connected(&channel_peer_id).await);
3187
3188 let removed = node.remove_channel(&channel_id).await;
3190 assert!(removed);
3191
3192 assert!(!node.is_peer_connected(&channel_peer_id).await);
3194
3195 Ok(())
3196 }
3197
3198 #[tokio::test]
3199 async fn test_remove_channel_nonexistent() -> Result<()> {
3200 let config = create_test_node_config();
3201 let node = P2PNode::new(config).await?;
3202
3203 let removed = node.remove_channel("nonexistent_peer").await;
3205 assert!(!removed);
3206
3207 Ok(())
3208 }
3209
3210 #[tokio::test]
3211 async fn test_is_peer_connected() -> Result<()> {
3212 let config = create_test_node_config();
3213 let node = P2PNode::new(config).await?;
3214
3215 let channel_id = "test_peer".to_string();
3216 let channel_peer_id = PeerId::from_name(&channel_id);
3217
3218 assert!(!node.is_peer_connected(&channel_peer_id).await);
3220
3221 let peer_info = PeerInfo {
3223 channel_id: channel_id.clone(),
3224 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3225 connected_at: Instant::now(),
3226 last_seen: Instant::now(),
3227 status: ConnectionStatus::Connected,
3228 protocols: vec!["test-protocol".to_string()],
3229 heartbeat_count: 0,
3230 };
3231
3232 node.transport
3233 .inject_peer(channel_id.clone(), peer_info)
3234 .await;
3235 node.transport
3236 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3237 .await;
3238
3239 assert!(node.is_peer_connected(&channel_peer_id).await);
3241
3242 node.remove_channel(&channel_id).await;
3244
3245 assert!(!node.is_peer_connected(&channel_peer_id).await);
3247
3248 Ok(())
3249 }
3250
/// The IPv6 unspecified address is rewritten to the IPv6 loopback address
/// and the port is carried over unchanged.
#[test]
fn test_normalize_ipv6_wildcard() {
    use std::net::{IpAddr, Ipv6Addr, SocketAddr};

    let port: u16 = 8080;
    let input = SocketAddr::from((Ipv6Addr::UNSPECIFIED, port));

    let output = normalize_wildcard_to_loopback(input);

    assert_eq!(output.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
    assert_eq!(output.port(), port);
}
3261
/// The IPv4 unspecified address is rewritten to the IPv4 loopback address
/// and the port is carried over unchanged.
#[test]
fn test_normalize_ipv4_wildcard() {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    let port: u16 = 9000;
    let input = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));

    let output = normalize_wildcard_to_loopback(input);

    assert_eq!(output.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
    assert_eq!(output.port(), port);
}
3272
/// A concrete (non-wildcard) address passes through normalization untouched.
#[test]
fn test_normalize_specific_address_unchanged() {
    let input = "192.168.1.100:3000"
        .parse::<std::net::SocketAddr>()
        .unwrap();

    let output = normalize_wildcard_to_loopback(input);

    assert_eq!(output, input);
}
3280
/// Loopback addresses of either family are already normalized, so the
/// function must return them as-is.
#[test]
fn test_normalize_loopback_unchanged() {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    // Same order as before: IPv6 loopback first, then IPv4 loopback.
    let loopbacks = [
        IpAddr::V6(Ipv6Addr::LOCALHOST),
        IpAddr::V4(Ipv4Addr::LOCALHOST),
    ];

    for ip in loopbacks {
        let input = SocketAddr::new(ip, 5000);
        let output = normalize_wildcard_to_loopback(input);
        assert_eq!(output, input);
    }
}
3293
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_secs())
}
3303
3304 fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
3306 let msg = WireMessage {
3307 protocol: protocol.to_string(),
3308 data,
3309 from: PeerId::from_name(from),
3310 timestamp,
3311 user_agent: String::new(),
3312 public_key: Vec::new(),
3313 signature: Vec::new(),
3314 };
3315 postcard::to_stdvec(&msg).unwrap()
3316 }
3317
/// An unsigned wire message parses, but nothing it claims about its sender
/// is trusted: there is no authenticated node id, `source` is `None`, and a
/// transport id that is not a socket address yields no `transport_source`.
#[test]
fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
    let channel = "abcdef0123456789";
    // Identity claimed inside the message body; it differs from the
    // transport-level id on purpose.
    let claimed_sender = "spoofed-logical-id";
    let payload = vec![1u8, 2, 3];
    let wire = make_wire_bytes(
        "test/v1",
        payload.clone(),
        claimed_sender,
        current_timestamp(),
    );

    let parsed = parse_protocol_message(&wire, channel).expect("valid message should parse");

    assert!(parsed.authenticated_node_id.is_none());

    let event = parsed.event;
    match event {
        P2PEvent::Message {
            topic: protocol,
            source: sender,
            transport_source: remote,
            data: body,
        } => {
            assert!(sender.is_none(), "unsigned message source must be None");
            assert!(
                remote.is_none(),
                "non-socket transport source should not produce an IP transport address"
            );
            assert_eq!(protocol, "test/v1");
            assert_eq!(body, payload);
        }
        other => panic!("expected P2PEvent::Message, got {:?}", other),
    }
}
3350
/// Bytes that are not a serialized wire message must be rejected.
#[test]
fn test_parse_protocol_message_rejects_invalid_bytes() {
    // Arbitrary text standing in for a corrupt frame. The literal mentions
    // bincode, while the helpers in this module encode with postcard.
    let garbage: &[u8] = b"not valid bincode";

    let outcome = parse_protocol_message(garbage, "peer-id");

    assert!(outcome.is_none());
}
3356
/// A valid encoding cut off at its midpoint must fail to parse.
#[test]
fn test_parse_protocol_message_rejects_truncated_message() {
    let complete = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());

    // Keep only the first half of the encoded message.
    let cut = complete.len() / 2;
    let partial = &complete[..cut];

    let outcome = parse_protocol_message(partial, "peer-id");
    assert!(outcome.is_none());
}
3364
/// A message whose data field is empty still parses, and the resulting
/// event carries an empty payload.
#[test]
fn test_parse_protocol_message_empty_payload() {
    let wire = make_wire_bytes("ping", Vec::new(), "sender", current_timestamp());

    let event = parse_protocol_message(&wire, "transport-peer")
        .expect("valid message with empty data should parse")
        .event;

    match event {
        P2PEvent::Message { data: body, .. } => {
            assert!(body.is_empty());
        }
        other => panic!("expected P2PEvent::Message, got {:?}", other),
    }
}
3377
/// When the transport id is an `ip:port` string, the parsed event exposes it
/// as a QUIC `MultiAddr` in `transport_source`.
#[test]
fn test_parse_protocol_message_records_ip_transport_source() {
    let remote = "192.168.1.2:4567";
    let wire = make_wire_bytes("ping", vec![1], "sender", current_timestamp());

    let parsed = parse_protocol_message(&wire, remote).expect("valid message should parse");

    match parsed.event {
        P2PEvent::Message {
            transport_source: observed,
            ..
        } => {
            let expected = MultiAddr::quic(remote.parse().unwrap());
            assert_eq!(observed, Some(expected));
        }
        other => panic!("expected P2PEvent::Message, got {:?}", other),
    }
}
3397
/// A payload containing every byte value `0..=255` must come back from
/// parsing unchanged, together with the original protocol name as the topic.
#[test]
fn test_parse_protocol_message_preserves_binary_payload() {
    // One of each possible byte value, in ascending order.
    let payload: Vec<u8> = (0..=255).collect();
    let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());

    let parsed = parse_protocol_message(&bytes, "peer-id")
        .expect("valid message with full byte range should parse");

    match parsed.event {
        P2PEvent::Message { data, topic, .. } => {
            assert_eq!(topic, "binary/v1");
            // The wire bytes are produced with postcard (see `make_wire_bytes`),
            // so the failure message names postcard rather than bincode.
            assert_eq!(
                data, payload,
                "payload must survive postcard round-trip exactly"
            );
        }
        other => panic!("expected P2PEvent::Message, got {:?}", other),
    }
}
3418
/// A message signed by a freshly generated identity over the postcard
/// encoding of `(protocol, data, from, timestamp, user_agent)` is accepted,
/// and the signer's `PeerId` is reported both as the authenticated node id
/// and as the event source.
#[test]
fn test_parse_signed_message_verifies_and_uses_node_id() {
    let signer = NodeIdentity::generate().expect("should generate identity");
    let signer_id = *signer.peer_id();

    let protocol = "test/signed";
    let user_agent = "test/1.0";
    let payload: Vec<u8> = vec![10, 20, 30];
    let timestamp = current_timestamp();

    // Bytes covered by the signature: the tuple of message fields below.
    let to_sign = postcard::to_stdvec(&(
        protocol,
        payload.as_slice(),
        &signer_id,
        timestamp,
        user_agent,
    ))
    .unwrap();
    let signature = signer.sign(&to_sign).expect("signing should succeed");

    let signed = WireMessage {
        protocol: protocol.to_owned(),
        data: payload.clone(),
        from: signer_id,
        timestamp,
        user_agent: user_agent.to_owned(),
        public_key: signer.public_key().as_bytes().to_vec(),
        signature: signature.as_bytes().to_vec(),
    };
    let wire = postcard::to_stdvec(&signed).unwrap();

    let parsed =
        parse_protocol_message(&wire, "transport-xyz").expect("signed message should parse");

    let expected = *signer.peer_id();
    assert_eq!(parsed.authenticated_node_id.as_ref(), Some(&expected));

    match parsed.event {
        P2PEvent::Message { source: sender, .. } => {
            assert_eq!(
                sender.as_ref(),
                Some(&expected),
                "source should be the verified PeerId"
            );
        }
        other => panic!("expected P2PEvent::Message, got {:?}", other),
    }
}
3466
/// The signature is computed over one payload but the wire message carries a
/// different one, so parsing must reject the message.
#[test]
fn test_parse_message_with_bad_signature_is_rejected() {
    let signer = NodeIdentity::generate().expect("should generate identity");
    let signer_id = *signer.peer_id();

    let protocol = "test/bad-sig";
    let user_agent = "test/1.0";
    let timestamp = current_timestamp();

    // What was signed versus what is actually sent.
    let signed_payload: Vec<u8> = vec![1, 2, 3];
    let tampered_payload: Vec<u8> = vec![99, 99, 99];

    let to_sign = postcard::to_stdvec(&(
        protocol,
        signed_payload.as_slice(),
        &signer_id,
        timestamp,
        user_agent,
    ))
    .unwrap();
    let signature = signer.sign(&to_sign).expect("signing should succeed");

    let tampered = WireMessage {
        protocol: protocol.to_owned(),
        data: tampered_payload,
        from: signer_id,
        timestamp,
        user_agent: user_agent.to_owned(),
        public_key: signer.public_key().as_bytes().to_vec(),
        signature: signature.as_bytes().to_vec(),
    };
    let wire = postcard::to_stdvec(&tampered).unwrap();

    let outcome = parse_protocol_message(&wire, "transport-xyz");
    assert!(
        outcome.is_none(),
        "message with bad signature should be rejected"
    );
}
3499
/// The signature is valid for the message contents, but the `from` field is
/// a fixed `PeerId` that does not belong to the signing identity, so parsing
/// must reject the message.
#[test]
fn test_parse_message_with_mismatched_from_is_rejected() {
    let signer = NodeIdentity::generate().expect("should generate identity");

    let protocol = "test/from-mismatch";
    let user_agent = "test/1.0";
    let payload: Vec<u8> = vec![1, 2, 3];
    let timestamp = current_timestamp();

    // A sender id that is not taken from `signer`.
    let forged_sender = PeerId::from_bytes([0xDE; 32]);

    // The forged id is included in the signed bytes, so the signature itself
    // matches the message as sent.
    let to_sign = postcard::to_stdvec(&(
        protocol,
        payload.as_slice(),
        &forged_sender,
        timestamp,
        user_agent,
    ))
    .unwrap();
    let signature = signer.sign(&to_sign).expect("signing should succeed");

    let forged = WireMessage {
        protocol: protocol.to_owned(),
        data: payload,
        from: forged_sender,
        timestamp,
        user_agent: user_agent.to_owned(),
        public_key: signer.public_key().as_bytes().to_vec(),
        signature: signature.as_bytes().to_vec(),
    };
    let wire = postcard::to_stdvec(&forged).unwrap();

    let outcome = parse_protocol_message(&wire, "transport-xyz");
    assert!(
        outcome.is_none(),
        "message with mismatched from field should be rejected"
    );
}
3531}