1use crate::PeerId;
20use crate::adaptive::trust::{TrustRecord, TrustSnapshot};
21use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
22use crate::bootstrap::cache::{CachedCloseGroupPeer, CloseGroupCache};
23use crate::bootstrap::{BootstrapConfig, BootstrapManager};
24use crate::dht::core_engine::AddressType;
25use crate::dht_network_manager::{
26 DhtNetworkConfig, DhtNetworkEvent, DhtNetworkManager, IDENTITY_EXCHANGE_TIMEOUT,
27};
28use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
29use crate::reachability::spawn_acquisition_driver;
30
31use crate::MultiAddr;
32use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
33use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
34use dashmap::DashMap;
35use futures::StreamExt;
36use parking_lot::Mutex as ParkingMutex;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::path::{Path, PathBuf};
41use std::sync::Arc;
42use std::sync::atomic::{AtomicBool, Ordering};
43use std::time::{Duration, SystemTime, UNIX_EPOCH};
44use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
45use tokio::time::Instant;
46use tokio_util::sync::CancellationToken;
47use tracing::{debug, info, trace, warn};
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
53pub(crate) struct WireMessage {
54 pub(crate) protocol: String,
56 pub(crate) data: Vec<u8>,
58 pub(crate) from: PeerId,
60 pub(crate) timestamp: u64,
62 #[serde(default)]
68 pub(crate) user_agent: String,
69 #[serde(default)]
71 pub(crate) public_key: Vec<u8>,
72 #[serde(default)]
74 pub(crate) signature: Vec<u8>,
75}
76
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
83pub enum NodeMode {
84 #[default]
86 Node,
87 Client,
89}
90
91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
93enum ListenMode {
94 Public,
96 Local,
98}
99
100pub fn user_agent_for_mode(mode: NodeMode) -> String {
105 let prefix = match mode {
106 NodeMode::Node => "node",
107 NodeMode::Client => "client",
108 };
109 format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
110}
111
112pub fn is_dht_participant(user_agent: &str) -> bool {
114 user_agent.starts_with("node/")
115}
116
117pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
119
120pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
122
123pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
125
126const DEFAULT_LISTEN_PORT: u16 = 9000;
128
129const DEFAULT_MAX_CONNECTIONS: usize = 10_000;
131
132const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 25;
139
140const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;
142
143const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 3;
157
158const MAX_CONCURRENT_BOOTSTRAP_DIALS: usize = 4;
165
166const CLIENT_BOOTSTRAP_TARGET: usize = 6;
175
176const fn default_true() -> bool {
178 true
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct NodeConfig {
184 #[serde(default)]
190 pub local: bool,
191
192 #[serde(default)]
194 pub port: u16,
195
196 #[serde(default = "default_true")]
201 pub ipv6: bool,
202
203 pub bootstrap_peers: Vec<crate::MultiAddr>,
205
206 pub connection_timeout: Duration,
209
210 pub max_connections: usize,
212
213 pub dht_config: DHTConfig,
215
216 pub bootstrap_cache_config: Option<BootstrapConfig>,
218
219 pub diversity_config: Option<crate::security::IPDiversityConfig>,
224
225 #[serde(default)]
229 pub max_message_size: Option<usize>,
230
231 #[serde(skip)]
236 pub node_identity: Option<Arc<NodeIdentity>>,
237
238 #[serde(default)]
244 pub mode: NodeMode,
245
246 #[serde(default, skip_serializing_if = "Option::is_none")]
251 pub custom_user_agent: Option<String>,
252
253 #[serde(default)]
261 pub allow_loopback: bool,
262
263 #[serde(default)]
271 pub adaptive_dht_config: AdaptiveDhtConfig,
272
273 #[serde(default, skip_serializing_if = "Option::is_none")]
284 pub close_group_cache_dir: Option<PathBuf>,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct DHTConfig {
290 pub k_value: usize,
292
293 pub alpha_value: usize,
295
296 pub refresh_interval: Duration,
298}
299
300#[inline]
313fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
314 let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
315
316 let (v4, v6) = match mode {
317 ListenMode::Public => (
318 std::net::Ipv4Addr::UNSPECIFIED,
319 std::net::Ipv6Addr::UNSPECIFIED,
320 ),
321 ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
322 };
323
324 if ipv6_enabled {
325 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
326 std::net::IpAddr::V6(v6),
327 port,
328 )));
329 }
330
331 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
332 std::net::IpAddr::V4(v4),
333 port,
334 )));
335
336 addrs
337}
338
339impl NodeConfig {
340 pub fn user_agent(&self) -> String {
345 self.custom_user_agent
346 .clone()
347 .unwrap_or_else(|| user_agent_for_mode(self.mode))
348 }
349
350 pub fn listen_addrs(&self) -> Vec<MultiAddr> {
355 let mode = if self.local {
356 ListenMode::Local
357 } else {
358 ListenMode::Public
359 };
360 build_listen_addrs(self.port, self.ipv6, mode)
361 }
362
363 pub fn new() -> Result<Self> {
369 Ok(Self::default())
370 }
371
372 pub fn builder() -> NodeConfigBuilder {
374 NodeConfigBuilder::default()
375 }
376}
377
378#[derive(Debug, Clone)]
401pub struct NodeConfigBuilder {
402 port: u16,
403 ipv6: bool,
404 local: bool,
405 bootstrap_peers: Vec<crate::MultiAddr>,
406 max_connections: Option<usize>,
407 connection_timeout: Option<Duration>,
408 dht_config: Option<DHTConfig>,
409 max_message_size: Option<usize>,
410 mode: NodeMode,
411 custom_user_agent: Option<String>,
412 allow_loopback: Option<bool>,
413 adaptive_dht_config: Option<AdaptiveDhtConfig>,
414 close_group_cache_dir: Option<PathBuf>,
415}
416
417impl Default for NodeConfigBuilder {
418 fn default() -> Self {
419 Self {
420 port: 0,
421 ipv6: true,
422 local: false,
423 bootstrap_peers: Vec::new(),
424 max_connections: None,
425 connection_timeout: None,
426 dht_config: None,
427 max_message_size: None,
428 mode: NodeMode::default(),
429 custom_user_agent: None,
430 allow_loopback: None,
431 adaptive_dht_config: None,
432 close_group_cache_dir: None,
433 }
434 }
435}
436
437impl NodeConfigBuilder {
438 pub fn port(mut self, port: u16) -> Self {
440 self.port = port;
441 self
442 }
443
444 pub fn ipv6(mut self, enabled: bool) -> Self {
446 self.ipv6 = enabled;
447 self
448 }
449
450 pub fn local(mut self, local: bool) -> Self {
457 self.local = local;
458 self
459 }
460
461 pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
463 self.bootstrap_peers.push(addr);
464 self
465 }
466
467 pub fn max_connections(mut self, max: usize) -> Self {
469 self.max_connections = Some(max);
470 self
471 }
472
473 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
475 self.connection_timeout = Some(timeout);
476 self
477 }
478
479 pub fn dht_config(mut self, config: DHTConfig) -> Self {
481 self.dht_config = Some(config);
482 self
483 }
484
485 pub fn max_message_size(mut self, max_message_size: usize) -> Self {
489 self.max_message_size = Some(max_message_size);
490 self
491 }
492
493 pub fn mode(mut self, mode: NodeMode) -> Self {
495 self.mode = mode;
496 self
497 }
498
499 pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
501 self.custom_user_agent = Some(user_agent.into());
502 self
503 }
504
505 pub fn allow_loopback(mut self, allow: bool) -> Self {
509 self.allow_loopback = Some(allow);
510 self
511 }
512
513 pub fn trust_enforcement(mut self, enabled: bool) -> Self {
526 let threshold = if enabled {
527 AdaptiveDhtConfig::default().swap_threshold
528 } else {
529 0.0
530 };
531 self.adaptive_dht_config = Some(AdaptiveDhtConfig {
532 swap_threshold: threshold,
533 });
534 self
535 }
536
537 pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
541 self.adaptive_dht_config = Some(config);
542 self
543 }
544
545 pub fn close_group_cache_dir(mut self, path: impl Into<PathBuf>) -> Self {
550 self.close_group_cache_dir = Some(path.into());
551 self
552 }
553
554 pub fn build(self) -> Result<NodeConfig> {
560 let allow_loopback = self.allow_loopback.unwrap_or(self.local);
562
563 Ok(NodeConfig {
564 local: self.local,
565 port: self.port,
566 ipv6: self.ipv6,
567 bootstrap_peers: self.bootstrap_peers,
568 connection_timeout: self
569 .connection_timeout
570 .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
571 max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
572 dht_config: self.dht_config.unwrap_or_default(),
573 bootstrap_cache_config: None,
574 diversity_config: None,
575 max_message_size: self.max_message_size,
576 node_identity: None,
577 mode: self.mode,
578 custom_user_agent: self.custom_user_agent,
579 allow_loopback,
580 adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
581 close_group_cache_dir: self.close_group_cache_dir,
582 })
583 }
584}
585
586impl Default for NodeConfig {
587 fn default() -> Self {
588 Self {
589 local: false,
590 port: DEFAULT_LISTEN_PORT,
591 ipv6: true,
592 bootstrap_peers: Vec::new(),
593 connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
594 max_connections: DEFAULT_MAX_CONNECTIONS,
595 dht_config: DHTConfig::default(),
596 bootstrap_cache_config: None,
597 diversity_config: None,
598 max_message_size: None,
599 node_identity: None,
600 mode: NodeMode::default(),
601 custom_user_agent: None,
602 allow_loopback: false,
603 adaptive_dht_config: AdaptiveDhtConfig::default(),
604 close_group_cache_dir: None,
605 }
606 }
607}
608
609impl DHTConfig {
610 pub const DEFAULT_K_VALUE: usize = 20;
612 const DEFAULT_ALPHA_VALUE: usize = 3;
613 const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
614 const MIN_K_VALUE: usize = 4;
616
617 pub fn validate(&self) -> Result<()> {
621 if self.k_value < Self::MIN_K_VALUE {
622 return Err(P2PError::Validation(
623 format!(
624 "k_value must be >= {} (got {}), values below {} produce degenerate behavior",
625 Self::MIN_K_VALUE,
626 self.k_value,
627 Self::MIN_K_VALUE,
628 )
629 .into(),
630 ));
631 }
632 if self.alpha_value < 1 {
633 return Err(P2PError::Validation(
634 format!("alpha_value must be >= 1 (got {})", self.alpha_value).into(),
635 ));
636 }
637 if self.refresh_interval.is_zero() {
638 return Err(P2PError::Validation("refresh_interval must be > 0".into()));
639 }
640 Ok(())
641 }
642}
643
644impl Default for DHTConfig {
645 fn default() -> Self {
646 Self {
647 k_value: Self::DEFAULT_K_VALUE,
648 alpha_value: Self::DEFAULT_ALPHA_VALUE,
649 refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
650 }
651 }
652}
653
654#[derive(Debug, Clone)]
656pub struct PeerInfo {
657 #[allow(dead_code)]
659 pub(crate) channel_id: String,
660
661 pub addresses: Vec<MultiAddr>,
663
664 pub connected_at: Instant,
666
667 pub last_seen: Instant,
669
670 pub status: ConnectionStatus,
672
673 pub protocols: Vec<String>,
675
676 pub heartbeat_count: u64,
678}
679
680#[derive(Debug, Clone, PartialEq)]
682pub enum ConnectionStatus {
683 Connecting,
685 Connected,
687 Disconnecting,
689 Disconnected,
691 Failed(String),
693}
694
695#[derive(Debug, Clone)]
700pub enum P2PEvent {
701 Message {
703 topic: String,
705 source: Option<PeerId>,
708 data: Vec<u8>,
710 },
711 PeerConnected(PeerId, String),
714 PeerDisconnected(PeerId),
716}
717
718#[derive(Debug, Clone)]
723pub struct PeerResponse {
724 pub peer_id: PeerId,
726 pub data: Vec<u8>,
728 pub latency: Duration,
730}
731
732#[derive(Debug, Clone, Serialize, Deserialize)]
737pub(crate) struct RequestResponseEnvelope {
738 pub(crate) message_id: String,
740 pub(crate) is_response: bool,
742 pub(crate) payload: Vec<u8>,
744}
745
746pub(crate) struct PendingRequest {
748 pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
750 pub(crate) expected_peer: PeerId,
752}
753
754const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
760
761pub struct P2PNode {
772 config: NodeConfig,
774
775 peer_id: PeerId,
777
778 transport: Arc<crate::transport_handle::TransportHandle>,
780
781 start_time: Instant,
783
784 shutdown: CancellationToken,
786
787 adaptive_dht: AdaptiveDHT,
790
791 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
793
794 is_bootstrapped: Arc<AtomicBool>,
796
797 is_started: Arc<AtomicBool>,
799
800 reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,
804
805 relayer_peer_id: Arc<RwLock<Option<PeerId>>>,
814
815 relay_address: Arc<RwLock<Option<SocketAddr>>>,
822}
823
824pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
840 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
841
842 if addr.ip().is_unspecified() {
843 let loopback_ip = match addr {
845 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
848 std::net::SocketAddr::new(loopback_ip, addr.port())
849 } else {
850 addr
852 }
853}
854
855impl P2PNode {
856 pub async fn new(config: NodeConfig) -> Result<Self> {
858 let node_identity = match config.node_identity.clone() {
860 Some(identity) => identity,
861 None => Arc::new(NodeIdentity::generate()?),
862 };
863
864 let peer_id = *node_identity.peer_id();
866
867 config.dht_config.validate()?;
870 if let Some(ref diversity) = config.diversity_config {
871 diversity
872 .validate()
873 .map_err(|e| P2PError::Validation(format!("IP diversity config: {e}").into()))?;
874 }
875
876 let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
878 let bootstrap_manager =
879 match BootstrapManager::with_node_config(bootstrap_config, &config).await {
880 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
881 Err(e) => {
882 warn!("Failed to initialize bootstrap manager: {e}, continuing without cache");
883 None
884 }
885 };
886
887 let transport_config = crate::transport_handle::TransportConfig::from_node_config(
889 &config,
890 crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
891 node_identity.clone(),
892 );
893 let transport =
894 Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
895
896 let dht_manager_config = DhtNetworkConfig {
898 peer_id,
899 node_config: config.clone(),
900 request_timeout: config.connection_timeout,
901 max_concurrent_operations: MAX_ACTIVE_REQUESTS,
902 enable_security: true,
903 swap_threshold: 0.0, };
905 let adaptive_dht = AdaptiveDHT::new(
906 transport.clone(),
907 dht_manager_config,
908 config.adaptive_dht_config.clone(),
909 )
910 .await?;
911
912 let node = Self {
913 config,
914 peer_id,
915 transport,
916 start_time: Instant::now(),
917 shutdown: CancellationToken::new(),
918 adaptive_dht,
919 bootstrap_manager,
920 is_bootstrapped: Arc::new(AtomicBool::new(false)),
921 is_started: Arc::new(AtomicBool::new(false)),
922 reconnect_locks: ParkingMutex::new(HashMap::new()),
923 relayer_peer_id: Arc::new(RwLock::new(None)),
924 relay_address: Arc::new(RwLock::new(None)),
925 };
926 info!(
927 "Created P2P node with peer ID: {} (call start() to begin networking)",
928 node.peer_id
929 );
930
931 Ok(node)
932 }
933
934 pub fn peer_id(&self) -> &PeerId {
936 &self.peer_id
937 }
938
939 pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
941 &self.transport
942 }
943
944 pub async fn relay_address(&self) -> Option<SocketAddr> {
951 *self.relay_address.read().await
952 }
953
954 pub fn local_addr(&self) -> Option<MultiAddr> {
955 self.transport.local_addr()
956 }
957
958 pub fn is_bootstrapped(&self) -> bool {
963 self.is_bootstrapped.load(Ordering::SeqCst)
964 }
965
966 pub async fn re_bootstrap(&self) -> Result<()> {
971 self.is_bootstrapped.store(false, Ordering::SeqCst);
972 self.connect_bootstrap_peers(None).await
973 }
974
975 pub fn trust_engine(&self) -> Arc<TrustEngine> {
981 self.adaptive_dht.trust_engine().clone()
982 }
983
984 pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
998 self.adaptive_dht.report_trust_event(peer_id, event).await;
999 }
1000
1001 pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
1005 self.adaptive_dht.peer_trust(peer_id)
1006 }
1007
1008 pub fn adaptive_dht(&self) -> &AdaptiveDHT {
1010 &self.adaptive_dht
1011 }
1012
1013 pub async fn send_request(
1046 &self,
1047 peer_id: &PeerId,
1048 protocol: &str,
1049 data: Vec<u8>,
1050 timeout: Duration,
1051 ) -> Result<PeerResponse> {
1052 match self
1053 .transport
1054 .send_request(peer_id, protocol, data, timeout)
1055 .await
1056 {
1057 Ok(resp) => Ok(resp),
1058 Err(e) => {
1059 let event = if matches!(&e, P2PError::Timeout(_)) {
1060 TrustEvent::ConnectionTimeout
1061 } else {
1062 TrustEvent::ConnectionFailed
1063 };
1064 self.report_trust_event(peer_id, event).await;
1065 Err(e)
1066 }
1067 }
1068 }
1069
1070 pub async fn send_response(
1071 &self,
1072 peer_id: &PeerId,
1073 protocol: &str,
1074 message_id: &str,
1075 data: Vec<u8>,
1076 ) -> Result<()> {
1077 self.transport
1078 .send_response(peer_id, protocol, message_id, data)
1079 .await
1080 }
1081
1082 pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
1083 crate::transport_handle::TransportHandle::parse_request_envelope(data)
1084 }
1085
1086 pub async fn subscribe(&self, topic: &str) -> Result<()> {
1087 self.transport.subscribe(topic).await
1088 }
1089
1090 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
1091 self.transport.publish(topic, data).await
1092 }
1093
1094 pub fn config(&self) -> &NodeConfig {
1096 &self.config
1097 }
1098
1099 pub async fn start(&self) -> Result<()> {
1101 info!("Starting P2P node...");
1102
1103 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1105 let mut manager = bootstrap_manager.write().await;
1106 manager
1107 .start_maintenance()
1108 .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
1109 info!("Bootstrap cache manager started");
1110 }
1111
1112 self.transport.start_network_listeners().await?;
1114
1115 self.adaptive_dht.start().await?;
1117
1118 let listen_addrs = self.transport.listen_addrs().await;
1120 info!("P2P node started on addresses: {:?}", listen_addrs);
1121
1122 let close_group_cache = if let Some(ref dir) = self.config.close_group_cache_dir {
1129 match CloseGroupCache::load_from_dir(dir).await {
1130 Ok(Some(cache)) => {
1131 let original_count = cache.peers.len();
1134 let cache = CloseGroupCache {
1135 peers: cache
1136 .peers
1137 .into_iter()
1138 .filter(|p| p.trust.score.is_finite())
1139 .collect(),
1140 ..cache
1141 };
1142 let filtered_count = original_count - cache.peers.len();
1143 if filtered_count > 0 {
1144 warn!(
1145 "Filtered {filtered_count} peers with non-finite trust scores from close group cache"
1146 );
1147 }
1148
1149 let trust_snapshot = TrustSnapshot {
1150 peers: cache
1151 .peers
1152 .iter()
1153 .map(|p| (p.peer_id, p.trust.clone()))
1154 .collect(),
1155 };
1156 self.adaptive_dht
1157 .trust_engine()
1158 .import_snapshot(&trust_snapshot);
1159 info!(
1160 "Loaded {} peers from close group cache (trust scores imported)",
1161 cache.peers.len()
1162 );
1163 Some(cache)
1164 }
1165 Ok(None) => {
1166 debug!(
1167 "No close group cache found in {}, fresh start",
1168 dir.display()
1169 );
1170 None
1171 }
1172 Err(e) => {
1173 warn!(
1174 "Failed to load close group cache from {}: {e}",
1175 dir.display()
1176 );
1177 None
1178 }
1179 }
1180 } else {
1181 None
1182 };
1183
1184 self.connect_bootstrap_peers(close_group_cache.as_ref())
1186 .await?;
1187
1188 {
1195 let dht = self.adaptive_dht.dht_manager();
1196 let rt_size = dht.get_routing_table_size().await;
1197 dht.emit_event(DhtNetworkEvent::BootstrapComplete { num_peers: rt_size });
1198 }
1199
1200 if self.config.mode != NodeMode::Client {
1214 spawn_acquisition_driver(
1215 self.adaptive_dht.dht_manager().clone(),
1216 Arc::clone(&self.transport),
1217 Arc::clone(&self.relayer_peer_id),
1218 Arc::clone(&self.relay_address),
1219 self.shutdown.clone(),
1220 );
1221 } else {
1222 info!("client mode — skipping relay acquisition driver");
1223 }
1224
1225 {
1266 let transport = Arc::clone(&self.transport);
1267 let dht = self.adaptive_dht.dht_manager().clone();
1268 let shutdown = self.shutdown.clone();
1269 tokio::spawn(async move {
1270 loop {
1271 tokio::select! {
1272 biased;
1273 _ = shutdown.cancelled() => break,
1274 update = transport.recv_peer_address_update() => {
1275 let Some((peer_addr, advertised_addr)) = update else { break };
1276 let normalized_peer =
1277 saorsa_transport::shared::normalize_socket_addr(peer_addr);
1278 let normalized_adv =
1279 saorsa_transport::shared::normalize_socket_addr(advertised_addr);
1280 if normalized_peer.ip() == normalized_adv.ip() {
1285 debug!(
1286 "DHT_BRIDGE: dropping same-IP update peer={} addr={}",
1287 normalized_peer,
1288 normalized_adv
1289 );
1290 continue;
1291 }
1292 info!(
1293 "DHT_BRIDGE: processing relay update peer={} addr={}",
1294 normalized_peer,
1295 normalized_adv
1296 );
1297 if let Some(peer_id) = transport.peer_id_for_addr(&normalized_peer).await {
1302 let multi_addr = MultiAddr::quic(normalized_adv);
1303 info!(
1304 "Updating DHT: peer {} relay address {} (connection was {})",
1305 peer_id, advertised_addr, peer_addr
1306 );
1307 dht.touch_node_typed(
1308 &peer_id,
1309 Some(&multi_addr),
1310 AddressType::Relay,
1311 )
1312 .await;
1313 }
1314 }
1315 }
1316 }
1317 });
1318 }
1319
1320 self.is_started
1321 .store(true, std::sync::atomic::Ordering::Release);
1322
1323 Ok(())
1324 }
1325
1326 pub async fn run(&self) -> Result<()> {
1331 if !self.is_running() {
1332 self.start().await?;
1333 }
1334
1335 info!("P2P node running...");
1336
1337 self.shutdown.cancelled().await;
1340
1341 info!("P2P node stopped");
1342 Ok(())
1343 }
1344
1345 pub async fn stop(&self) -> Result<()> {
1347 info!("Stopping P2P node...");
1348
1349 if let Some(ref dir) = self.config.close_group_cache_dir
1351 && let Err(e) = self.save_close_group_cache(dir).await
1352 {
1353 warn!("Failed to save close group cache on shutdown: {e}");
1354 }
1355
1356 self.shutdown.cancel();
1358
1359 self.adaptive_dht.stop().await?;
1361
1362 self.transport.stop().await?;
1364
1365 self.is_started
1366 .store(false, std::sync::atomic::Ordering::Release);
1367
1368 info!("P2P node stopped");
1369 Ok(())
1370 }
1371
1372 pub async fn shutdown(&self) -> Result<()> {
1374 self.stop().await
1375 }
1376
1377 pub fn is_running(&self) -> bool {
1379 self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1380 }
1381
1382 pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
1384 self.transport.listen_addrs().await
1385 }
1386
1387 pub async fn connected_peers(&self) -> Vec<PeerId> {
1389 self.transport.connected_peers().await
1390 }
1391
1392 pub async fn peer_count(&self) -> usize {
1394 self.transport.peer_count().await
1395 }
1396
1397 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1399 self.transport.peer_info(peer_id).await
1400 }
1401
1402 #[allow(dead_code)]
1404 pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
1405 self.transport.get_channel_id_by_address(addr).await
1406 }
1407
1408 #[allow(dead_code)]
1410 pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
1411 self.transport.list_active_connections().await
1412 }
1413
1414 #[allow(dead_code)]
1416 pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
1417 self.transport.remove_channel(channel_id).await
1418 }
1419
1420 pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
1425 self.transport.disconnect_channel(channel_id).await;
1426 }
1427
1428 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1430 self.transport.is_peer_connected(peer_id).await
1431 }
1432
1433 pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
1444 self.transport.connect_peer(address).await
1445 }
1446
1447 pub async fn connect_peer_typed(
1454 &self,
1455 address: &MultiAddr,
1456 kind: AddressType,
1457 ) -> Result<String> {
1458 self.transport.connect_peer_typed(address, kind).await
1459 }
1460
1461 pub async fn wait_for_peer_identity(
1468 &self,
1469 channel_id: &str,
1470 timeout: Duration,
1471 ) -> Result<PeerId> {
1472 self.transport
1473 .wait_for_peer_identity(channel_id, timeout)
1474 .await
1475 }
1476
1477 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1479 self.transport.disconnect_peer(peer_id).await
1480 }
1481
1482 #[allow(dead_code)]
1484 pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
1485 self.transport.is_connection_active(channel_id).await
1486 }
1487
1488 pub async fn send_message(
1502 &self,
1503 peer_id: &PeerId,
1504 protocol: &str,
1505 data: Vec<u8>,
1506 addrs: &[MultiAddr],
1507 ) -> Result<()> {
1508 let existing_channels = self.transport.channels_for_peer(peer_id).await;
1513
1514 if existing_channels.is_empty() {
1517 let lock = self.reconnect_lock_for(peer_id);
1518 let _guard = lock.lock().await;
1519
1520 if self.transport.is_peer_connected(peer_id).await {
1522 return self.transport.send_message(peer_id, protocol, data).await;
1523 }
1524
1525 return self
1526 .reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
1527 .await;
1528 }
1529
1530 let saved_addrs: Vec<MultiAddr> = self
1533 .transport
1534 .peer_info(peer_id)
1535 .await
1536 .map(|info| info.addresses)
1537 .unwrap_or_default();
1538
1539 let retry_data = data.clone();
1542
1543 match self.transport.send_message(peer_id, protocol, data).await {
1545 Ok(()) => return Ok(()),
1546 Err(e) => {
1547 if !e.is_stale_channel_send_failure() {
1548 debug!(
1549 peer = %peer_id.to_hex(),
1550 error = %e,
1551 "send failed during active channel use, not reconnecting",
1552 );
1553 return Err(e);
1554 }
1555
1556 debug!(
1557 peer = %peer_id.to_hex(),
1558 error = %e,
1559 "stale channel send failed, attempting reconnect",
1560 );
1561 }
1562 }
1563
1564 let lock = self.reconnect_lock_for(peer_id);
1567 let _guard = lock.lock().await;
1568
1569 if self.transport.is_peer_connected(peer_id).await {
1571 for channel_id in &existing_channels {
1575 self.transport.disconnect_channel(channel_id).await;
1576 }
1577 return self
1578 .transport
1579 .send_message(peer_id, protocol, retry_data)
1580 .await;
1581 }
1582
1583 self.reconnect_and_send(
1584 peer_id,
1585 protocol,
1586 retry_data,
1587 addrs,
1588 &saved_addrs,
1589 &existing_channels,
1590 )
1591 .await
1592 }
1593
1594 async fn reconnect_and_send(
1596 &self,
1597 peer_id: &PeerId,
1598 protocol: &str,
1599 data: Vec<u8>,
1600 addrs: &[MultiAddr],
1601 saved_addrs: &[MultiAddr],
1602 stale_channels: &[String],
1603 ) -> Result<()> {
1604 let (address, kind) = self
1606 .resolve_dial_address(peer_id, addrs, saved_addrs)
1607 .await
1608 .ok_or_else(|| {
1609 P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
1610 })?;
1611
1612 if !stale_channels.is_empty() {
1618 for channel_id in stale_channels {
1619 self.transport.disconnect_channel(channel_id).await;
1620 }
1621 tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
1622 }
1623
1624 let channel_id = self.transport.connect_peer_typed(&address, kind).await?;
1626 let authenticated = match self
1627 .transport
1628 .wait_for_peer_identity(&channel_id, IDENTITY_EXCHANGE_TIMEOUT)
1629 .await
1630 {
1631 Ok(peer) => peer,
1632 Err(e) => {
1633 self.transport.disconnect_channel(&channel_id).await;
1636 return Err(e);
1637 }
1638 };
1639
1640 if &authenticated != peer_id {
1641 self.transport.disconnect_channel(&channel_id).await;
1642 return Err(P2PError::Identity(IdentityError::IdentityMismatch {
1643 expected: peer_id.to_hex().into(),
1644 actual: authenticated.to_hex().into(),
1645 }));
1646 }
1647
1648 self.transport.send_message(peer_id, protocol, data).await
1650 }
1651
1652 async fn resolve_dial_address(
1663 &self,
1664 peer_id: &PeerId,
1665 caller_addrs: &[MultiAddr],
1666 saved_addrs: &[MultiAddr],
1667 ) -> Option<(MultiAddr, AddressType)> {
1668 if let Some(addr) = Self::first_dialable(caller_addrs) {
1675 return Some((addr, AddressType::Unverified));
1676 }
1677 if let Some(addr) = Self::first_dialable(saved_addrs) {
1678 return Some((addr, AddressType::Unverified));
1679 }
1680
1681 self.adaptive_dht
1682 .peer_addresses_for_dial_typed(peer_id)
1683 .await
1684 .into_iter()
1685 .find(|(a, _)| {
1686 a.dialable_socket_addr()
1687 .is_some_and(|sa| !sa.ip().is_unspecified())
1688 })
1689 }
1690
1691 fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
1694 addrs
1695 .iter()
1696 .find(|a| {
1697 let dialable = a
1698 .dialable_socket_addr()
1699 .is_some_and(|sa| !sa.ip().is_unspecified());
1700 if !dialable {
1701 trace!(address = %a, "skipping non-dialable address");
1702 }
1703 dialable
1704 })
1705 .cloned()
1706 }
1707
1708 fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
1710 self.reconnect_locks
1711 .lock()
1712 .entry(*peer_id)
1713 .or_insert_with(|| Arc::new(TokioMutex::new(())))
1714 .clone()
1715 }
1716}
1717
1718const MAX_MESSAGE_AGE_SECS: u64 = 300;
1740const MAX_FUTURE_SECS: u64 = 300;
1742
1743fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1745 P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1746}
1747
1748pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1750 if let Err(e) = tx.send(event) {
1751 tracing::trace!("Event broadcast has no receivers: {e}");
1752 }
1753}
1754
1755pub(crate) struct ParsedMessage {
1757 pub(crate) event: P2PEvent,
1759 pub(crate) authenticated_node_id: Option<PeerId>,
1761 pub(crate) user_agent: String,
1763}
1764
1765pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
1766 let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1767
1768 let now = std::time::SystemTime::now()
1770 .duration_since(std::time::UNIX_EPOCH)
1771 .map(|d| d.as_secs())
1772 .unwrap_or(0);
1773
1774 if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1776 tracing::warn!(
1777 "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1778 source,
1779 message.timestamp,
1780 now.saturating_sub(message.timestamp)
1781 );
1782 return None;
1783 }
1784
1785 if message.timestamp > now + MAX_FUTURE_SECS {
1787 tracing::warn!(
1788 "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1789 source,
1790 message.timestamp,
1791 message.timestamp.saturating_sub(now)
1792 );
1793 return None;
1794 }
1795
1796 let authenticated_node_id = if !message.signature.is_empty() {
1798 match verify_message_signature(&message) {
1799 Ok(peer_id) => {
1800 debug!(
1801 "Message from {} authenticated as app-level NodeId {}",
1802 source, peer_id
1803 );
1804 Some(peer_id)
1805 }
1806 Err(e) => {
1807 warn!(
1808 "Rejecting message from {}: signature verification failed: {}",
1809 source, e
1810 );
1811 return None;
1812 }
1813 }
1814 } else {
1815 None
1816 };
1817
1818 debug!(
1819 "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1820 message.protocol,
1821 authenticated_node_id,
1822 source,
1823 message.from,
1824 message.data.len()
1825 );
1826
1827 Some(ParsedMessage {
1828 event: P2PEvent::Message {
1829 topic: message.protocol,
1830 source: authenticated_node_id,
1831 data: message.data,
1832 },
1833 authenticated_node_id,
1834 user_agent: message.user_agent,
1835 })
1836}
1837
1838fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1845 let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1846 .map_err(|e| format!("invalid public key: {e:?}"))?;
1847
1848 let peer_id = peer_id_from_public_key(&pubkey);
1849
1850 if message.from != peer_id {
1852 return Err(format!(
1853 "from field mismatch: message claims '{}' but public key derives '{}'",
1854 message.from, peer_id
1855 ));
1856 }
1857
1858 let signable = postcard::to_stdvec(&(
1859 &message.protocol,
1860 &message.data as &[u8],
1861 &message.from,
1862 message.timestamp,
1863 &message.user_agent,
1864 ))
1865 .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1866
1867 let sig = MlDsaSignature::from_bytes(&message.signature)
1868 .map_err(|e| format!("invalid signature: {e:?}"))?;
1869
1870 let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1871 .map_err(|e| format!("verification error: {e}"))?;
1872
1873 if valid {
1874 Ok(peer_id)
1875 } else {
1876 Err("signature is invalid".to_string())
1877 }
1878}
1879
1880impl P2PNode {
1881 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1883 self.transport.subscribe_events()
1884 }
1885
1886 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1888 self.subscribe_events()
1889 }
1890
1891 pub fn uptime(&self) -> Duration {
1893 self.start_time.elapsed()
1894 }
1895
1896 pub async fn health_check(&self) -> Result<()> {
1909 let peer_count = self.peer_count().await;
1910 if peer_count > self.config.max_connections {
1911 Err(protocol_error(format!(
1912 "Too many connections: {peer_count}"
1913 )))
1914 } else {
1915 Ok(())
1916 }
1917 }
1918
1919 pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1921 self.adaptive_dht.dht_manager()
1922 }
1923
1924 pub fn dht(&self) -> &Arc<DhtNetworkManager> {
1926 self.dht_manager()
1927 }
1928
1929 pub async fn add_discovered_peer(
1931 &self,
1932 _peer_id: PeerId,
1933 addresses: Vec<MultiAddr>,
1934 ) -> Result<()> {
1935 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1936 let manager = bootstrap_manager.read().await;
1937 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1938 .iter()
1939 .filter_map(|addr| addr.socket_addr())
1940 .collect();
1941 if let Some(&primary) = socket_addresses.first() {
1942 manager
1943 .add_peer(&primary, socket_addresses)
1944 .await
1945 .map_err(|e| {
1946 protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1947 })?;
1948 }
1949 }
1950 Ok(())
1951 }
1952
1953 pub async fn update_peer_metrics(
1955 &self,
1956 addr: &MultiAddr,
1957 success: bool,
1958 latency_ms: Option<u64>,
1959 _error: Option<String>,
1960 ) -> Result<()> {
1961 if let Some(ref bootstrap_manager) = self.bootstrap_manager
1962 && let Some(sa) = addr.socket_addr()
1963 {
1964 let manager = bootstrap_manager.read().await;
1965 if success {
1966 let rtt_ms = latency_ms.unwrap_or(0) as u32;
1967 manager.record_success(&sa, rtt_ms).await;
1968 } else {
1969 manager.record_failure(&sa).await;
1970 }
1971 }
1972 Ok(())
1973 }
1974
1975 pub async fn get_bootstrap_cache_stats(
1977 &self,
1978 ) -> Result<Option<crate::bootstrap::BootstrapStats>> {
1979 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1980 let manager = bootstrap_manager.read().await;
1981 Ok(Some(manager.stats().await))
1982 } else {
1983 Ok(None)
1984 }
1985 }
1986
1987 pub async fn cached_peer_count(&self) -> usize {
1989 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1990 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1991 {
1992 return stats.total_peers;
1993 }
1994 0
1995 }
1996
1997 async fn connect_bootstrap_peers(
2004 &self,
2005 close_group_cache: Option<&CloseGroupCache>,
2006 ) -> Result<()> {
2007 let mut serial_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2012 let mut parallel_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
2013 let mut used_cache = false;
2014 let mut seen_addresses = std::collections::HashSet::new();
2015
2016 if let Some(cache) = close_group_cache {
2022 let mut sorted_peers: Vec<&CachedCloseGroupPeer> = cache.peers.iter().collect();
2023 sorted_peers.sort_by(|a, b| {
2024 let score_ord = match b.trust.score.partial_cmp(&a.trust.score) {
2028 Some(ord) => ord,
2029 None => {
2030 if a.trust.score.is_nan() {
2031 std::cmp::Ordering::Greater } else {
2033 std::cmp::Ordering::Less }
2035 }
2036 };
2037 score_ord.then_with(|| {
2038 let da = self.peer_id.xor_distance(&a.peer_id);
2039 let db = self.peer_id.xor_distance(&b.peer_id);
2040 da.cmp(&db)
2041 })
2042 });
2043
2044 let mut added_from_close_group = 0usize;
2045 for peer in &sorted_peers {
2046 let new_addresses: Vec<MultiAddr> = peer
2047 .addresses
2048 .iter()
2049 .filter(|a| {
2050 a.dialable_socket_addr()
2051 .is_some_and(|sa| !seen_addresses.contains(&sa))
2052 })
2053 .cloned()
2054 .collect();
2055
2056 if !new_addresses.is_empty() {
2057 for addr in &new_addresses {
2058 if let Some(sa) = addr.socket_addr() {
2059 seen_addresses.insert(sa);
2060 }
2061 }
2062 serial_addr_sets.push(new_addresses);
2063 added_from_close_group += 1;
2064 }
2065 }
2066 if added_from_close_group > 0 {
2067 info!(
2068 "Added {} close group cache peers (highest trust first)",
2069 added_from_close_group
2070 );
2071 }
2072 }
2073
2074 if !self.config.bootstrap_peers.is_empty() {
2076 info!(
2077 "Using {} configured bootstrap peers (priority)",
2078 self.config.bootstrap_peers.len()
2079 );
2080 for multiaddr in &self.config.bootstrap_peers {
2081 let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
2082 warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
2083 continue;
2084 };
2085 seen_addresses.insert(socket_addr);
2086 parallel_addr_sets.push(vec![multiaddr.clone()]);
2087 }
2088 }
2089
2090 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2092 let manager = bootstrap_manager.read().await;
2093 let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
2094 if !cached_peers.is_empty() {
2095 let mut added_from_cache = 0;
2096 for cached in cached_peers {
2097 let mut addrs = vec![cached.primary_address];
2098 addrs.extend(cached.addresses);
2099 let new_addresses: Vec<MultiAddr> = addrs
2101 .into_iter()
2102 .filter(|a| !seen_addresses.contains(a))
2103 .map(MultiAddr::quic)
2104 .collect();
2105
2106 if !new_addresses.is_empty() {
2107 for addr in &new_addresses {
2108 if let Some(sa) = addr.socket_addr() {
2109 seen_addresses.insert(sa);
2110 }
2111 }
2112 parallel_addr_sets.push(new_addresses);
2113 added_from_cache += 1;
2114 }
2115 }
2116 if added_from_cache > 0 {
2117 info!(
2118 "Added {} cached bootstrap peers (supplementing CLI peers)",
2119 added_from_cache
2120 );
2121 used_cache = true;
2122 }
2123 }
2124 }
2125
2126 if serial_addr_sets.is_empty() && parallel_addr_sets.is_empty() {
2127 info!("No bootstrap peers configured and no cached peers available");
2128 return Ok(());
2129 }
2130
2131 let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
2134 let mut successful_connections = 0;
2135 let mut connected_peer_ids: Vec<PeerId> = Vec::new();
2136
2137 let client_mode = matches!(self.config.mode, NodeMode::Client);
2139 for addrs in &serial_addr_sets {
2140 if let Some(peer_id) = self
2141 .dial_bootstrap_addr_set(addrs, used_cache, identity_timeout)
2142 .await
2143 {
2144 successful_connections += 1;
2145 connected_peer_ids.push(peer_id);
2146 if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2147 debug!(
2148 "Client bootstrap target reached ({successful_connections} peers) — skipping remaining serial dials"
2149 );
2150 break;
2151 }
2152 }
2153 }
2154
2155 if !client_mode || successful_connections < CLIENT_BOOTSTRAP_TARGET {
2160 let mut parallel_stream =
2161 futures::stream::iter(parallel_addr_sets.into_iter().map(|addrs| async move {
2162 self.dial_bootstrap_addr_set(&addrs, used_cache, identity_timeout)
2163 .await
2164 }))
2165 .buffer_unordered(MAX_CONCURRENT_BOOTSTRAP_DIALS);
2166 while let Some(result) = parallel_stream.next().await {
2167 if let Some(peer_id) = result {
2168 successful_connections += 1;
2169 connected_peer_ids.push(peer_id);
2170 if client_mode && successful_connections >= CLIENT_BOOTSTRAP_TARGET {
2171 debug!(
2172 "Client bootstrap target reached ({successful_connections} peers) — cancelling pending dials"
2173 );
2174 break;
2175 }
2176 }
2177 }
2178 }
2182
2183 if successful_connections == 0 {
2184 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
2188 let transport_peers = self.transport.connected_peers().await;
2189 if !transport_peers.is_empty() {
2190 info!(
2191 "No outbound bootstrap succeeded, but {} inbound peer(s) connected — proceeding with DHT bootstrap",
2192 transport_peers.len()
2193 );
2194 connected_peer_ids = transport_peers;
2195 successful_connections = connected_peer_ids.len();
2196 } else {
2197 if !used_cache {
2198 warn!("Failed to connect to any bootstrap peers");
2199 }
2200 return Ok(());
2203 }
2204 }
2205
2206 info!(
2207 "Successfully connected to {} bootstrap peers",
2208 successful_connections
2209 );
2210
2211 match self
2213 .dht_manager()
2214 .bootstrap_from_peers(&connected_peer_ids)
2215 .await
2216 {
2217 Ok(count) => info!("DHT peer discovery found {} peers", count),
2218 Err(e) => warn!("DHT peer discovery failed: {}", e),
2219 }
2220
2221 if matches!(self.config.mode, NodeMode::Node) {
2231 const SELF_LOOKUP_ROUNDS: u8 = 2;
2232 for i in 1..=SELF_LOOKUP_ROUNDS {
2233 if let Err(e) = self.dht_manager().trigger_self_lookup().await {
2234 warn!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} failed: {e}");
2235 } else {
2236 debug!("Post-bootstrap self-lookup {i}/{SELF_LOOKUP_ROUNDS} completed");
2237 }
2238 }
2239 } else {
2240 debug!("Skipping post-bootstrap self-lookups (client mode)");
2241 }
2242
2243 self.is_bootstrapped.store(true, Ordering::SeqCst);
2246 info!(
2247 "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
2248 successful_connections,
2249 connected_peer_ids.len()
2250 );
2251
2252 if let Some(ref dir) = self.config.close_group_cache_dir
2255 && let Err(e) = self.save_close_group_cache(dir).await
2256 {
2257 warn!("Failed to save close group cache after bootstrap: {e}");
2258 }
2259
2260 Ok(())
2261 }
2262
2263 async fn dial_bootstrap_addr_set(
2268 &self,
2269 addrs: &[MultiAddr],
2270 used_cache: bool,
2271 identity_timeout: Duration,
2272 ) -> Option<PeerId> {
2273 for addr in addrs {
2274 match self
2279 .transport
2280 .connect_peer_typed(addr, AddressType::Unverified)
2281 .await
2282 {
2283 Ok(channel_id) => match self
2284 .transport
2285 .wait_for_peer_identity(&channel_id, identity_timeout)
2286 .await
2287 {
2288 Ok(real_peer_id) => {
2289 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
2290 let manager = bootstrap_manager.read().await;
2291 if let Some(sa) = addr.socket_addr() {
2292 manager.record_success(&sa, 100).await;
2293 }
2294 }
2295 return Some(real_peer_id);
2296 }
2297 Err(e) => {
2298 warn!(
2299 "Timeout waiting for identity from bootstrap peer {}: {}, \
2300 closing channel {}",
2301 addr, e, channel_id
2302 );
2303 self.disconnect_channel(&channel_id).await;
2304 }
2305 },
2306 Err(e) => {
2307 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
2308 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
2309 let manager = bootstrap_manager.read().await;
2310 if let Some(sa) = addr.socket_addr() {
2311 manager.record_failure(&sa).await;
2312 }
2313 }
2314 }
2315 }
2316 }
2317 None
2318 }
2319
2320 async fn save_close_group_cache(&self, dir: &Path) -> anyhow::Result<()> {
2322 let key: crate::dht::Key = *self.peer_id.as_bytes();
2323 let k_value = self.config.dht_config.k_value;
2324 let close_group = self
2325 .dht_manager()
2326 .find_closest_nodes_local(&key, k_value)
2327 .await;
2328
2329 if close_group.is_empty() {
2330 debug!("No close group peers to save");
2331 return Ok(());
2332 }
2333
2334 let trust_engine = self.adaptive_dht.trust_engine();
2335 let now_epoch = SystemTime::now()
2336 .duration_since(UNIX_EPOCH)
2337 .map(|d| d.as_secs())
2338 .unwrap_or(0);
2339
2340 let peers: Vec<CachedCloseGroupPeer> = close_group
2341 .into_iter()
2342 .filter_map(|dht_node| {
2343 let score = trust_engine.score(&dht_node.peer_id);
2344 if !score.is_finite() {
2347 return None;
2348 }
2349 Some(CachedCloseGroupPeer {
2350 peer_id: dht_node.peer_id,
2351 addresses: dht_node.addresses,
2352 trust: TrustRecord {
2353 score,
2354 last_updated_epoch_secs: now_epoch,
2355 },
2356 })
2357 })
2358 .collect();
2359
2360 let peer_count = peers.len();
2361 let cache = CloseGroupCache {
2362 peers,
2363 saved_at_epoch_secs: now_epoch,
2364 };
2365
2366 cache.save_to_dir(dir).await?;
2367 info!(
2368 "Saved {} close group peers to cache in {}",
2369 peer_count,
2370 dir.display()
2371 );
2372 Ok(())
2373 }
2374
2375 }
2377
2378#[async_trait::async_trait]
2380#[allow(dead_code)]
2381pub trait NetworkSender: Send + Sync {
2382 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
2384
2385 fn local_peer_id(&self) -> PeerId;
2387}
2388
2389#[cfg(test)]
2393#[allow(clippy::unwrap_used, clippy::expect_used)]
2394mod diversity_tests {
2395 use super::*;
2396 use crate::security::IPDiversityConfig;
2397
2398 async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
2399 let temp_dir = tempfile::TempDir::new().expect("temp dir");
2401 let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
2402 bootstrap_config.cache_dir = temp_dir.path().to_path_buf();
2403
2404 BootstrapManager::with_node_config(bootstrap_config, config)
2405 .await
2406 .expect("bootstrap manager")
2407 }
2408
2409 #[tokio::test]
2410 async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
2411 let config = NodeConfig {
2412 diversity_config: Some(IPDiversityConfig::testnet()),
2413 ..Default::default()
2414 };
2415
2416 let manager = build_bootstrap_manager_like_prod(&config).await;
2417 assert_eq!(manager.diversity_config().max_per_ip, Some(usize::MAX));
2419 assert_eq!(manager.diversity_config().max_per_subnet, Some(usize::MAX));
2420 }
2421}
2422
2423pub(crate) fn register_new_channel(
2430 peers: &DashMap<String, PeerInfo>,
2431 channel_id: &str,
2432 remote_addr: &MultiAddr,
2433) {
2434 let peer_info = PeerInfo {
2435 channel_id: channel_id.to_owned(),
2436 addresses: vec![remote_addr.clone()],
2437 connected_at: tokio::time::Instant::now(),
2438 last_seen: tokio::time::Instant::now(),
2439 status: ConnectionStatus::Connected,
2440 protocols: vec!["p2p-core/1.0.0".to_string()],
2441 heartbeat_count: 0,
2442 };
2443 peers.insert(channel_id.to_owned(), peer_info);
2444}
2445
2446#[cfg(test)]
2447mod tests {
2448 use super::*;
2449 use std::time::Duration;
2451 use tokio::time::timeout;
2452
2453 const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
2455
2456 fn create_test_node_config() -> NodeConfig {
2462 NodeConfig {
2463 local: true,
2464 port: 0,
2465 ipv6: true,
2466 bootstrap_peers: vec![],
2467 connection_timeout: Duration::from_secs(2),
2468 max_connections: 100,
2469 dht_config: DHTConfig::default(),
2470 bootstrap_cache_config: None,
2471 diversity_config: None,
2472 max_message_size: None,
2473 node_identity: None,
2474 mode: NodeMode::default(),
2475 custom_user_agent: None,
2476 allow_loopback: true,
2477 adaptive_dht_config: AdaptiveDhtConfig::default(),
2478 close_group_cache_dir: None,
2479 }
2480 }
2481
2482 #[tokio::test]
2486 async fn test_node_config_default() {
2487 let config = NodeConfig::default();
2488
2489 assert_eq!(config.listen_addrs().len(), 2); assert_eq!(config.max_connections, 10000);
2491 assert_eq!(config.connection_timeout, Duration::from_secs(25));
2492 }
2493
2494 #[tokio::test]
2495 async fn test_dht_config_default() {
2496 let config = DHTConfig::default();
2497
2498 assert_eq!(config.k_value, 20);
2499 assert_eq!(config.alpha_value, 3);
2500 assert_eq!(config.refresh_interval, Duration::from_secs(600));
2501 }
2502
2503 #[test]
2504 fn test_connection_status_variants() {
2505 let connecting = ConnectionStatus::Connecting;
2506 let connected = ConnectionStatus::Connected;
2507 let disconnecting = ConnectionStatus::Disconnecting;
2508 let disconnected = ConnectionStatus::Disconnected;
2509 let failed = ConnectionStatus::Failed("test error".to_string());
2510
2511 assert_eq!(connecting, ConnectionStatus::Connecting);
2512 assert_eq!(connected, ConnectionStatus::Connected);
2513 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
2514 assert_eq!(disconnected, ConnectionStatus::Disconnected);
2515 assert_ne!(connecting, connected);
2516
2517 if let ConnectionStatus::Failed(msg) = failed {
2518 assert_eq!(msg, "test error");
2519 } else {
2520 panic!("Expected Failed status");
2521 }
2522 }
2523
2524 #[tokio::test]
2525 async fn test_node_creation() -> Result<()> {
2526 let config = create_test_node_config();
2527 let node = P2PNode::new(config).await?;
2528
2529 assert_eq!(node.peer_id().to_hex().len(), 64);
2531 assert!(!node.is_running());
2532 assert_eq!(node.peer_count().await, 0);
2533 assert!(node.connected_peers().await.is_empty());
2534
2535 Ok(())
2536 }
2537
2538 #[tokio::test]
2539 async fn test_node_lifecycle() -> Result<()> {
2540 let config = create_test_node_config();
2541 let node = P2PNode::new(config).await?;
2542
2543 assert!(!node.is_running());
2545
2546 node.start().await?;
2548 assert!(node.is_running());
2549
2550 let listen_addrs = node.listen_addrs().await;
2552 assert!(
2553 !listen_addrs.is_empty(),
2554 "Expected at least one listening address"
2555 );
2556
2557 node.stop().await?;
2559 assert!(!node.is_running());
2560
2561 Ok(())
2562 }
2563
2564 #[tokio::test]
2565 async fn test_peer_connection() -> Result<()> {
2566 let config1 = create_test_node_config();
2567 let config2 = create_test_node_config();
2568
2569 let node1 = P2PNode::new(config1).await?;
2570 let node2 = P2PNode::new(config2).await?;
2571
2572 node1.start().await?;
2573 node2.start().await?;
2574
2575 let node2_addr = node2
2576 .listen_addrs()
2577 .await
2578 .into_iter()
2579 .find(|a| a.is_ipv4())
2580 .ok_or_else(|| {
2581 P2PError::Network(crate::error::NetworkError::InvalidAddress(
2582 "Node 2 did not expose an IPv4 listen address".into(),
2583 ))
2584 })?;
2585
2586 let channel_id = node1.connect_peer(&node2_addr).await?;
2589
2590 assert!(node1.is_connection_active(&channel_id).await);
2593
2594 let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
2596 assert!(peer_info.is_some());
2597 let info = peer_info.expect("Peer info should exist after connect");
2598 assert_eq!(info.channel_id, channel_id);
2599 assert_eq!(info.status, ConnectionStatus::Connected);
2600 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2601
2602 node1.remove_channel(&channel_id).await;
2604 assert!(!node1.is_connection_active(&channel_id).await);
2605
2606 node1.stop().await?;
2607 node2.stop().await?;
2608
2609 Ok(())
2610 }
2611
2612 #[tokio::test]
2613 async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
2614 let config = create_test_node_config();
2615 let node = P2PNode::new(config).await?;
2616
2617 let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
2618 let result = node.connect_peer(&tcp_addr).await;
2619
2620 assert!(
2621 matches!(
2622 result,
2623 Err(P2PError::Network(
2624 crate::error::NetworkError::InvalidAddress(_)
2625 ))
2626 ),
2627 "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
2628 result
2629 );
2630
2631 Ok(())
2632 }
2633
2634 #[cfg_attr(target_os = "windows", ignore)]
2641 #[tokio::test]
2642 async fn test_event_subscription() -> Result<()> {
2643 let identity1 =
2647 Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
2648 let identity2 =
2649 Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
2650
2651 let mut config1 = create_test_node_config();
2652 config1.ipv6 = false;
2653 config1.node_identity = Some(identity1);
2654
2655 let node2_peer_id = *identity2.peer_id();
2656 let mut config2 = create_test_node_config();
2657 config2.ipv6 = false;
2658 config2.node_identity = Some(identity2);
2659
2660 let node1 = P2PNode::new(config1).await?;
2661 let node2 = P2PNode::new(config2).await?;
2662
2663 node1.start().await?;
2664 node2.start().await?;
2665
2666 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2667
2668 let mut events = node2.subscribe_events();
2670
2671 let node2_addr = node2.local_addr().ok_or_else(|| {
2672 P2PError::Network(crate::error::NetworkError::ProtocolError(
2673 "No listening address".to_string().into(),
2674 ))
2675 })?;
2676
2677 let mut channel_id = None;
2679 for attempt in 0..3 {
2680 if attempt > 0 {
2681 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2682 }
2683 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
2684 Ok(Ok(id)) => {
2685 channel_id = Some(id);
2686 break;
2687 }
2688 Ok(Err(_)) | Err(_) => continue,
2689 }
2690 }
2691 let channel_id = channel_id.expect("Failed to connect after 3 attempts");
2692
2693 let target_peer_id = node1
2695 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2696 .await?;
2697 assert_eq!(target_peer_id, node2_peer_id);
2698
2699 node1
2701 .send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
2702 .await?;
2703
2704 let event = timeout(Duration::from_secs(2), async {
2706 loop {
2707 match events.recv().await {
2708 Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
2709 Ok(P2PEvent::Message { .. }) => continue, Ok(_) => continue,
2711 Err(e) => return Err(e),
2712 }
2713 }
2714 })
2715 .await;
2716 assert!(event.is_ok(), "Should receive PeerConnected event");
2717 let connected_peer_id = event.expect("Timed out").expect("Channel error");
2718 assert!(
2720 connected_peer_id.0.iter().any(|&b| b != 0),
2721 "PeerConnected should carry a non-zero peer ID"
2722 );
2723
2724 node1.stop().await?;
2725 node2.stop().await?;
2726
2727 Ok(())
2728 }
2729
2730 #[cfg_attr(target_os = "windows", ignore)]
2732 #[tokio::test]
2733 async fn test_message_sending() -> Result<()> {
2734 let mut config1 = create_test_node_config();
2736 config1.ipv6 = false;
2737 let node1 = P2PNode::new(config1).await?;
2738 node1.start().await?;
2739
2740 let mut config2 = create_test_node_config();
2741 config2.ipv6 = false;
2742 let node2 = P2PNode::new(config2).await?;
2743 node2.start().await?;
2744
2745 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2747
2748 let node2_addr = node2.local_addr().ok_or_else(|| {
2750 P2PError::Network(crate::error::NetworkError::ProtocolError(
2751 "No listening address".to_string().into(),
2752 ))
2753 })?;
2754
2755 let channel_id =
2757 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2758 Ok(res) => res?,
2759 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2760 };
2761
2762 let target_peer_id = node1
2764 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2765 .await?;
2766 assert_eq!(target_peer_id, node2.peer_id().clone());
2767
2768 let message_data = b"Hello, peer!".to_vec();
2770 let result = match timeout(
2771 Duration::from_millis(500),
2772 node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
2773 )
2774 .await
2775 {
2776 Ok(res) => res,
2777 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2778 };
2779 if let Err(e) = &result {
2782 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2783 }
2784
2785 let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
2787 let result = node1
2788 .send_message(&non_existent_peer, "test-protocol", vec![], &[])
2789 .await;
2790 assert!(result.is_err(), "Sending to non-existent peer should fail");
2791
2792 node1.stop().await?;
2793 node2.stop().await?;
2794
2795 Ok(())
2796 }
2797
2798 #[tokio::test]
2799 async fn test_remote_mcp_operations() -> Result<()> {
2800 let config = create_test_node_config();
2801 let node = P2PNode::new(config).await?;
2802
2803 node.start().await?;
2805 node.stop().await?;
2806 Ok(())
2807 }
2808
2809 #[tokio::test]
2810 async fn test_health_check() -> Result<()> {
2811 let config = create_test_node_config();
2812 let node = P2PNode::new(config).await?;
2813
2814 let result = node.health_check().await;
2816 assert!(result.is_ok());
2817
2818 Ok(())
2823 }
2824
2825 #[tokio::test]
2826 async fn test_node_uptime() -> Result<()> {
2827 let config = create_test_node_config();
2828 let node = P2PNode::new(config).await?;
2829
2830 let uptime1 = node.uptime();
2831 assert!(uptime1 >= Duration::from_secs(0));
2832
2833 tokio::time::sleep(Duration::from_millis(10)).await;
2835
2836 let uptime2 = node.uptime();
2837 assert!(uptime2 > uptime1);
2838
2839 Ok(())
2840 }
2841
2842 #[tokio::test]
2843 async fn test_node_config_access() -> Result<()> {
2844 let config = create_test_node_config();
2845 let node = P2PNode::new(config).await?;
2846
2847 let node_config = node.config();
2848 assert_eq!(node_config.max_connections, 100);
2849 Ok(())
2852 }
2853
2854 #[tokio::test]
2855 async fn test_mcp_server_access() -> Result<()> {
2856 let config = create_test_node_config();
2857 let _node = P2PNode::new(config).await?;
2858
2859 Ok(())
2861 }
2862
2863 #[tokio::test]
2864 async fn test_dht_access() -> Result<()> {
2865 let config = create_test_node_config();
2866 let node = P2PNode::new(config).await?;
2867
2868 let _dht = node.dht();
2870
2871 Ok(())
2872 }
2873
2874 #[tokio::test]
2875 async fn test_node_config_builder() -> Result<()> {
2876 let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2877
2878 let config = NodeConfig::builder()
2879 .local(true)
2880 .ipv6(true)
2881 .bootstrap_peer(bootstrap)
2882 .connection_timeout(Duration::from_secs(15))
2883 .max_connections(200)
2884 .max_message_size(TEST_MAX_MESSAGE_SIZE)
2885 .build()?;
2886
2887 assert_eq!(config.listen_addrs().len(), 2); assert!(config.local);
2889 assert!(config.ipv6);
2890 assert_eq!(config.bootstrap_peers.len(), 1);
2891 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2892 assert_eq!(config.max_connections, 200);
2893 assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2894 assert!(config.allow_loopback); Ok(())
2897 }
2898
2899 #[tokio::test]
2900 async fn test_bootstrap_peers() -> Result<()> {
2901 let mut config = create_test_node_config();
2902 config.bootstrap_peers = vec![
2903 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2904 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2905 ];
2906
2907 let node = P2PNode::new(config).await?;
2908
2909 node.start().await?;
2911
2912 let _peer_count = node.peer_count().await;
2916
2917 node.stop().await?;
2918 Ok(())
2919 }
2920
2921 #[tokio::test]
2922 async fn test_peer_info_structure() {
2923 let peer_info = PeerInfo {
2924 channel_id: "test_peer".to_string(),
2925 addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2926 connected_at: Instant::now(),
2927 last_seen: Instant::now(),
2928 status: ConnectionStatus::Connected,
2929 protocols: vec!["test-protocol".to_string()],
2930 heartbeat_count: 0,
2931 };
2932
2933 assert_eq!(peer_info.channel_id, "test_peer");
2934 assert_eq!(peer_info.addresses.len(), 1);
2935 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2936 assert_eq!(peer_info.protocols.len(), 1);
2937 }
2938
2939 #[tokio::test]
2940 async fn test_serialization() -> Result<()> {
2941 let config = create_test_node_config();
2943 let serialized = serde_json::to_string(&config)?;
2944 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2945
2946 assert_eq!(config.local, deserialized.local);
2947 assert_eq!(config.port, deserialized.port);
2948 assert_eq!(config.ipv6, deserialized.ipv6);
2949 assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2950
2951 Ok(())
2952 }
2953
2954 #[tokio::test]
2955 async fn test_get_channel_id_by_address_found() -> Result<()> {
2956 let config = create_test_node_config();
2957 let node = P2PNode::new(config).await?;
2958
2959 let test_channel_id = "peer_test_123".to_string();
2961 let test_address = "192.168.1.100:9000";
2962 let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2963
2964 let peer_info = PeerInfo {
2965 channel_id: test_channel_id.clone(),
2966 addresses: vec![test_multiaddr],
2967 connected_at: Instant::now(),
2968 last_seen: Instant::now(),
2969 status: ConnectionStatus::Connected,
2970 protocols: vec!["test-protocol".to_string()],
2971 heartbeat_count: 0,
2972 };
2973
2974 node.transport
2975 .inject_peer(test_channel_id.clone(), peer_info)
2976 .await;
2977
2978 let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2980 let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2981 assert_eq!(found_channel_id, Some(test_channel_id));
2982
2983 Ok(())
2984 }
2985
2986 #[tokio::test]
2987 async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2988 let config = create_test_node_config();
2989 let node = P2PNode::new(config).await?;
2990
2991 let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
2993 let result = node.get_channel_id_by_address(&unknown_addr).await;
2994 assert_eq!(result, None);
2995
2996 Ok(())
2997 }
2998
2999 #[tokio::test]
3000 async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
3001 let config = create_test_node_config();
3002 let node = P2PNode::new(config).await?;
3003
3004 let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
3006 mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
3007 psm: 0x0025,
3008 });
3009 let result = node.get_channel_id_by_address(&ble_addr).await;
3010 assert_eq!(result, None);
3011
3012 Ok(())
3013 }
3014
3015 #[tokio::test]
3016 async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
3017 let config = create_test_node_config();
3018 let node = P2PNode::new(config).await?;
3019
3020 let peer1_id = "peer_1".to_string();
3022 let peer1_addr_str = "192.168.1.101:9001";
3023 let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
3024
3025 let peer2_id = "peer_2".to_string();
3026 let peer2_addr_str = "192.168.1.102:9002";
3027 let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
3028
3029 let peer1_info = PeerInfo {
3030 channel_id: peer1_id.clone(),
3031 addresses: vec![peer1_multiaddr],
3032 connected_at: Instant::now(),
3033 last_seen: Instant::now(),
3034 status: ConnectionStatus::Connected,
3035 protocols: vec!["test-protocol".to_string()],
3036 heartbeat_count: 0,
3037 };
3038
3039 let peer2_info = PeerInfo {
3040 channel_id: peer2_id.clone(),
3041 addresses: vec![peer2_multiaddr],
3042 connected_at: Instant::now(),
3043 last_seen: Instant::now(),
3044 status: ConnectionStatus::Connected,
3045 protocols: vec!["test-protocol".to_string()],
3046 heartbeat_count: 0,
3047 };
3048
3049 node.transport
3050 .inject_peer(peer1_id.clone(), peer1_info)
3051 .await;
3052 node.transport
3053 .inject_peer(peer2_id.clone(), peer2_info)
3054 .await;
3055
3056 let found_peer1 = node
3058 .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
3059 .await;
3060 let found_peer2 = node
3061 .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
3062 .await;
3063
3064 assert_eq!(found_peer1, Some(peer1_id));
3065 assert_eq!(found_peer2, Some(peer2_id));
3066
3067 Ok(())
3068 }
3069
3070 #[tokio::test]
3071 async fn test_list_active_connections_empty() -> Result<()> {
3072 let config = create_test_node_config();
3073 let node = P2PNode::new(config).await?;
3074
3075 let connections = node.list_active_connections().await;
3077 assert!(connections.is_empty());
3078
3079 Ok(())
3080 }
3081
3082 #[tokio::test]
3083 async fn test_list_active_connections_with_peers() -> Result<()> {
3084 let config = create_test_node_config();
3085 let node = P2PNode::new(config).await?;
3086
3087 let peer1_id = "peer_1".to_string();
3089 let peer1_addrs = vec![
3090 MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
3091 MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
3092 ];
3093
3094 let peer2_id = "peer_2".to_string();
3095 let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
3096
3097 let peer1_info = PeerInfo {
3098 channel_id: peer1_id.clone(),
3099 addresses: peer1_addrs.clone(),
3100 connected_at: Instant::now(),
3101 last_seen: Instant::now(),
3102 status: ConnectionStatus::Connected,
3103 protocols: vec!["test-protocol".to_string()],
3104 heartbeat_count: 0,
3105 };
3106
3107 let peer2_info = PeerInfo {
3108 channel_id: peer2_id.clone(),
3109 addresses: peer2_addrs.clone(),
3110 connected_at: Instant::now(),
3111 last_seen: Instant::now(),
3112 status: ConnectionStatus::Connected,
3113 protocols: vec!["test-protocol".to_string()],
3114 heartbeat_count: 0,
3115 };
3116
3117 node.transport
3118 .inject_peer(peer1_id.clone(), peer1_info)
3119 .await;
3120 node.transport
3121 .inject_peer(peer2_id.clone(), peer2_info)
3122 .await;
3123
3124 node.transport
3126 .inject_active_connection(peer1_id.clone())
3127 .await;
3128 node.transport
3129 .inject_active_connection(peer2_id.clone())
3130 .await;
3131
3132 let connections = node.list_active_connections().await;
3134 assert_eq!(connections.len(), 2);
3135
3136 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
3138 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
3139
3140 assert!(peer1_conn.is_some());
3141 assert!(peer2_conn.is_some());
3142
3143 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
3145 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
3146
3147 Ok(())
3148 }
3149
3150 #[tokio::test]
3151 async fn test_remove_channel_success() -> Result<()> {
3152 let config = create_test_node_config();
3153 let node = P2PNode::new(config).await?;
3154
3155 let channel_id = "peer_to_remove".to_string();
3157 let channel_peer_id = PeerId::from_name(&channel_id);
3158 let peer_info = PeerInfo {
3159 channel_id: channel_id.clone(),
3160 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3161 connected_at: Instant::now(),
3162 last_seen: Instant::now(),
3163 status: ConnectionStatus::Connected,
3164 protocols: vec!["test-protocol".to_string()],
3165 heartbeat_count: 0,
3166 };
3167
3168 node.transport
3169 .inject_peer(channel_id.clone(), peer_info)
3170 .await;
3171 node.transport
3172 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3173 .await;
3174
3175 assert!(node.is_peer_connected(&channel_peer_id).await);
3177
3178 let removed = node.remove_channel(&channel_id).await;
3180 assert!(removed);
3181
3182 assert!(!node.is_peer_connected(&channel_peer_id).await);
3184
3185 Ok(())
3186 }
3187
3188 #[tokio::test]
3189 async fn test_remove_channel_nonexistent() -> Result<()> {
3190 let config = create_test_node_config();
3191 let node = P2PNode::new(config).await?;
3192
3193 let removed = node.remove_channel("nonexistent_peer").await;
3195 assert!(!removed);
3196
3197 Ok(())
3198 }
3199
3200 #[tokio::test]
3201 async fn test_is_peer_connected() -> Result<()> {
3202 let config = create_test_node_config();
3203 let node = P2PNode::new(config).await?;
3204
3205 let channel_id = "test_peer".to_string();
3206 let channel_peer_id = PeerId::from_name(&channel_id);
3207
3208 assert!(!node.is_peer_connected(&channel_peer_id).await);
3210
3211 let peer_info = PeerInfo {
3213 channel_id: channel_id.clone(),
3214 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
3215 connected_at: Instant::now(),
3216 last_seen: Instant::now(),
3217 status: ConnectionStatus::Connected,
3218 protocols: vec!["test-protocol".to_string()],
3219 heartbeat_count: 0,
3220 };
3221
3222 node.transport
3223 .inject_peer(channel_id.clone(), peer_info)
3224 .await;
3225 node.transport
3226 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
3227 .await;
3228
3229 assert!(node.is_peer_connected(&channel_peer_id).await);
3231
3232 node.remove_channel(&channel_id).await;
3234
3235 assert!(!node.is_peer_connected(&channel_peer_id).await);
3237
3238 Ok(())
3239 }
3240
3241 #[test]
3242 fn test_normalize_ipv6_wildcard() {
3243 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3244
3245 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
3246 let normalized = normalize_wildcard_to_loopback(wildcard);
3247
3248 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
3249 assert_eq!(normalized.port(), 8080);
3250 }
3251
3252 #[test]
3253 fn test_normalize_ipv4_wildcard() {
3254 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
3255
3256 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
3257 let normalized = normalize_wildcard_to_loopback(wildcard);
3258
3259 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
3260 assert_eq!(normalized.port(), 9000);
3261 }
3262
3263 #[test]
3264 fn test_normalize_specific_address_unchanged() {
3265 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
3266 let normalized = normalize_wildcard_to_loopback(specific);
3267
3268 assert_eq!(normalized, specific);
3269 }
3270
3271 #[test]
3272 fn test_normalize_loopback_unchanged() {
3273 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
3274
3275 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
3276 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
3277 assert_eq!(normalized_v6, loopback_v6);
3278
3279 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
3280 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
3281 assert_eq!(normalized_v4, loopback_v4);
3282 }
3283
3284 fn current_timestamp() -> u64 {
3288 std::time::SystemTime::now()
3289 .duration_since(std::time::UNIX_EPOCH)
3290 .map(|d| d.as_secs())
3291 .unwrap_or(0)
3292 }
3293
3294 fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
3296 let msg = WireMessage {
3297 protocol: protocol.to_string(),
3298 data,
3299 from: PeerId::from_name(from),
3300 timestamp,
3301 user_agent: String::new(),
3302 public_key: Vec::new(),
3303 signature: Vec::new(),
3304 };
3305 postcard::to_stdvec(&msg).unwrap()
3306 }
3307
3308 #[test]
3309 fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
3310 let transport_id = "abcdef0123456789";
3313 let logical_id = "spoofed-logical-id";
3314 let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
3315
3316 let parsed =
3317 parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
3318
3319 assert!(parsed.authenticated_node_id.is_none());
3321
3322 match parsed.event {
3323 P2PEvent::Message {
3324 topic,
3325 source,
3326 data,
3327 } => {
3328 assert!(source.is_none(), "unsigned message source must be None");
3329 assert_eq!(topic, "test/v1");
3330 assert_eq!(data, vec![1u8, 2, 3]);
3331 }
3332 other => panic!("expected P2PEvent::Message, got {:?}", other),
3333 }
3334 }
3335
3336 #[test]
3337 fn test_parse_protocol_message_rejects_invalid_bytes() {
3338 assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
3340 }
3341
3342 #[test]
3343 fn test_parse_protocol_message_rejects_truncated_message() {
3344 let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
3346 let truncated = &full_bytes[..full_bytes.len() / 2];
3347 assert!(parse_protocol_message(truncated, "peer-id").is_none());
3348 }
3349
3350 #[test]
3351 fn test_parse_protocol_message_empty_payload() {
3352 let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
3353
3354 let parsed = parse_protocol_message(&bytes, "transport-peer")
3355 .expect("valid message with empty data should parse");
3356
3357 match parsed.event {
3358 P2PEvent::Message { data, .. } => assert!(data.is_empty()),
3359 other => panic!("expected P2PEvent::Message, got {:?}", other),
3360 }
3361 }
3362
3363 #[test]
3364 fn test_parse_protocol_message_preserves_binary_payload() {
3365 let payload: Vec<u8> = (0..=255).collect();
3367 let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
3368
3369 let parsed = parse_protocol_message(&bytes, "peer-id")
3370 .expect("valid message with full byte range should parse");
3371
3372 match parsed.event {
3373 P2PEvent::Message { data, topic, .. } => {
3374 assert_eq!(topic, "binary/v1");
3375 assert_eq!(
3376 data, payload,
3377 "payload must survive bincode round-trip exactly"
3378 );
3379 }
3380 other => panic!("expected P2PEvent::Message, got {:?}", other),
3381 }
3382 }
3383
3384 #[test]
3385 fn test_parse_signed_message_verifies_and_uses_node_id() {
3386 let identity = NodeIdentity::generate().expect("should generate identity");
3387 let protocol = "test/signed";
3388 let data: Vec<u8> = vec![10, 20, 30];
3389 let from = *identity.peer_id();
3391 let timestamp = current_timestamp();
3392 let user_agent = "test/1.0";
3393
3394 let signable =
3396 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3397 .unwrap();
3398 let sig = identity.sign(&signable).expect("signing should succeed");
3399
3400 let msg = WireMessage {
3401 protocol: protocol.to_string(),
3402 data: data.clone(),
3403 from,
3404 timestamp,
3405 user_agent: user_agent.to_string(),
3406 public_key: identity.public_key().as_bytes().to_vec(),
3407 signature: sig.as_bytes().to_vec(),
3408 };
3409 let bytes = postcard::to_stdvec(&msg).unwrap();
3410
3411 let parsed =
3412 parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
3413
3414 let expected_peer_id = *identity.peer_id();
3415 assert_eq!(
3416 parsed.authenticated_node_id.as_ref(),
3417 Some(&expected_peer_id)
3418 );
3419
3420 match parsed.event {
3421 P2PEvent::Message { source, .. } => {
3422 assert_eq!(
3423 source.as_ref(),
3424 Some(&expected_peer_id),
3425 "source should be the verified PeerId"
3426 );
3427 }
3428 other => panic!("expected P2PEvent::Message, got {:?}", other),
3429 }
3430 }
3431
3432 #[test]
3433 fn test_parse_message_with_bad_signature_is_rejected() {
3434 let identity = NodeIdentity::generate().expect("should generate identity");
3435 let protocol = "test/bad-sig";
3436 let data: Vec<u8> = vec![1, 2, 3];
3437 let from = *identity.peer_id();
3438 let timestamp = current_timestamp();
3439 let user_agent = "test/1.0";
3440
3441 let signable =
3443 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
3444 .unwrap();
3445 let sig = identity.sign(&signable).expect("signing should succeed");
3446
3447 let msg = WireMessage {
3449 protocol: protocol.to_string(),
3450 data: vec![99, 99, 99],
3451 from,
3452 timestamp,
3453 user_agent: user_agent.to_string(),
3454 public_key: identity.public_key().as_bytes().to_vec(),
3455 signature: sig.as_bytes().to_vec(),
3456 };
3457 let bytes = postcard::to_stdvec(&msg).unwrap();
3458
3459 assert!(
3460 parse_protocol_message(&bytes, "transport-xyz").is_none(),
3461 "message with bad signature should be rejected"
3462 );
3463 }
3464
3465 #[test]
3466 fn test_parse_message_with_mismatched_from_is_rejected() {
3467 let identity = NodeIdentity::generate().expect("should generate identity");
3468 let protocol = "test/from-mismatch";
3469 let data: Vec<u8> = vec![1, 2, 3];
3470 let fake_from = PeerId::from_bytes([0xDE; 32]);
3472 let timestamp = current_timestamp();
3473 let user_agent = "test/1.0";
3474
3475 let signable =
3476 postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
3477 .unwrap();
3478 let sig = identity.sign(&signable).expect("signing should succeed");
3479
3480 let msg = WireMessage {
3481 protocol: protocol.to_string(),
3482 data,
3483 from: fake_from,
3484 timestamp,
3485 user_agent: user_agent.to_string(),
3486 public_key: identity.public_key().as_bytes().to_vec(),
3487 signature: sig.as_bytes().to_vec(),
3488 };
3489 let bytes = postcard::to_stdvec(&msg).unwrap();
3490
3491 assert!(
3492 parse_protocol_message(&bytes, "transport-xyz").is_none(),
3493 "message with mismatched from field should be rejected"
3494 );
3495 }
3496}