1use crate::PeerId;
20use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
21use crate::bootstrap::{BootstrapConfig, BootstrapManager};
22use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
23use crate::error::{IdentityError, NetworkError, P2PError, P2pResult as Result};
24
25use crate::MultiAddr;
26use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
27use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
28use parking_lot::Mutex as ParkingMutex;
29use serde::{Deserialize, Serialize};
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::time::Duration;
34use tokio::sync::{Mutex as TokioMutex, RwLock, broadcast};
35use tokio::time::Instant;
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, info, trace, warn};
38
/// Envelope for a single protocol message as it is serialized on the wire.
///
/// `parse_protocol_message` decodes this type with `postcard`; the field order
/// is therefore part of the encoding and must not be rearranged.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct WireMessage {
    /// Protocol / topic name; surfaced as `P2PEvent::Message::topic`.
    pub(crate) protocol: String,
    /// Opaque payload bytes.
    pub(crate) data: Vec<u8>,
    /// Peer ID the sender claims; for signed messages it must match the ID
    /// derived from `public_key`.
    pub(crate) from: PeerId,
    /// Sender timestamp in seconds since the Unix epoch (it is compared with
    /// the receiver's wall clock on receipt).
    pub(crate) timestamp: u64,
    /// Sender's user agent string; defaults to empty when absent.
    #[serde(default)]
    pub(crate) user_agent: String,
    /// Sender's ML-DSA public key bytes; defaults to empty when absent.
    #[serde(default)]
    pub(crate) public_key: Vec<u8>,
    /// ML-DSA signature over `(protocol, data, from, timestamp, user_agent)`;
    /// an empty vector means the message is unsigned.
    #[serde(default)]
    pub(crate) signature: Vec<u8>,
}
66
/// Role a node plays in the network; selects the default user agent prefix
/// (see `user_agent_for_mode`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum NodeMode {
    /// Default mode. Advertises a `node/…` user agent, which
    /// `is_dht_participant` accepts.
    #[default]
    Node,
    /// Advertises a `client/…` user agent, which `is_dht_participant` rejects.
    Client,
}
80
/// Which interface family `build_listen_addrs` binds to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ListenMode {
    /// Bind to the unspecified (wildcard) addresses.
    Public,
    /// Bind to the loopback addresses only.
    Local,
}
89
/// Builds the default user agent string for `mode`.
///
/// The result has the form `<prefix>/<crate version>`, where the prefix is
/// `node` or `client` and the version comes from `CARGO_PKG_VERSION` at
/// compile time.
pub fn user_agent_for_mode(mode: NodeMode) -> String {
    let prefix = match mode {
        NodeMode::Node => "node",
        NodeMode::Client => "client",
    };
    format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
}
101
/// Returns `true` when `user_agent` identifies a full node, i.e. when it
/// begins with the `node/` prefix produced by `user_agent_for_mode`.
///
/// The check is case-sensitive and requires the trailing slash.
pub fn is_dht_participant(user_agent: &str) -> bool {
    const NODE_PREFIX: &str = "node/";
    user_agent.strip_prefix(NODE_PREFIX).is_some()
}
106
/// Capacity of the message receive channel.
// NOTE(review): not referenced in this part of the file; presumably consumed by
// the transport layer — confirm.
pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;

/// Upper bound on concurrent requests; `P2PNode::new` passes it to the DHT
/// manager as `max_concurrent_operations`.
pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;

/// Longest request timeout (300 seconds).
// NOTE(review): not referenced in this part of the file; presumably enforced by
// the transport layer — confirm.
pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);

/// Listen port used by `NodeConfig::default()`.
const DEFAULT_LISTEN_PORT: u16 = 9000;

/// Default value of `NodeConfig::max_connections`.
const DEFAULT_MAX_CONNECTIONS: usize = 10_000;

/// Default value of `NodeConfig::connection_timeout`, in seconds.
const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;

/// `max_distance` handed to the DHT manager configuration in `P2PNode::new`.
const DHT_MAX_DISTANCE: u8 = 160;

/// Number of cached peers requested from the bootstrap cache per bootstrap run.
const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;

/// How long, in seconds, bootstrap waits for a freshly dialed peer to present
/// its identity.
const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 10;
133
/// Serde default helper: always yields `true`.
///
/// Referenced by name from `#[serde(default = "default_true")]`, so the
/// function name must stay as is.
const fn default_true() -> bool {
    true
}
138
/// Configuration for a `P2PNode`.
///
/// Construct it with `NodeConfig::default()`, `NodeConfig::new()` or
/// `NodeConfig::builder()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// When `true`, `listen_addrs` binds to loopback instead of the wildcard
    /// addresses.
    #[serde(default)]
    pub local: bool,

    /// Port used for every listen address. Note the serde default is `0`,
    /// whereas `NodeConfig::default()` uses `DEFAULT_LISTEN_PORT`.
    #[serde(default)]
    pub port: u16,

    /// When `true` (the default), an IPv6 listen address is added alongside
    /// the IPv4 one.
    #[serde(default = "default_true")]
    pub ipv6: bool,

    /// Explicitly configured bootstrap peers; dialed before cached peers.
    pub bootstrap_peers: Vec<crate::MultiAddr>,

    /// Connection timeout; also used as the DHT manager's request timeout.
    pub connection_timeout: Duration,

    /// Connection ceiling checked by `P2PNode::health_check`.
    pub max_connections: usize,

    /// Kademlia-style DHT parameters.
    pub dht_config: DHTConfig,

    /// Bootstrap cache settings; `None` falls back to the cache's defaults.
    pub bootstrap_cache_config: Option<BootstrapConfig>,

    /// Optional IP diversity settings.
    // NOTE(review): not consumed in this part of the file — confirm where it
    // is applied.
    pub diversity_config: Option<crate::security::IPDiversityConfig>,

    /// Optional cap on message size.
    // NOTE(review): presumably in bytes and enforced by the transport —
    // confirm.
    #[serde(default)]
    pub max_message_size: Option<usize>,

    /// Pre-built identity for the node. Never (de)serialized; when `None`,
    /// `P2PNode::new` generates a fresh identity.
    #[serde(skip)]
    pub node_identity: Option<Arc<NodeIdentity>>,

    /// Node role; determines the default user agent.
    #[serde(default)]
    pub mode: NodeMode,

    /// Overrides the mode-derived user agent when set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub custom_user_agent: Option<String>,

    /// Whether loopback addresses are permitted. The builder defaults this to
    /// the value of `local`.
    // NOTE(review): exact enforcement point is outside this part of the file.
    #[serde(default)]
    pub allow_loopback: bool,

    /// Settings forwarded to `AdaptiveDHT::new` (includes the trust
    /// `block_threshold`).
    #[serde(default)]
    pub adaptive_dht_config: AdaptiveDhtConfig,
}
231
/// User-facing DHT parameters; `P2PNode::new` maps them onto
/// `crate::dht::DHTConfig`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DHTConfig {
    /// Mapped to the DHT's `bucket_size`.
    pub k_value: usize,

    /// Mapped to the DHT's `alpha`.
    pub alpha_value: usize,

    /// Mapped to the DHT's `bucket_refresh_interval`.
    pub refresh_interval: Duration,
}
244
245#[inline]
258fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
259 let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
260
261 let (v4, v6) = match mode {
262 ListenMode::Public => (
263 std::net::Ipv4Addr::UNSPECIFIED,
264 std::net::Ipv6Addr::UNSPECIFIED,
265 ),
266 ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
267 };
268
269 if ipv6_enabled {
270 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
271 std::net::IpAddr::V6(v6),
272 port,
273 )));
274 }
275
276 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
277 std::net::IpAddr::V4(v4),
278 port,
279 )));
280
281 addrs
282}
283
284impl NodeConfig {
285 pub fn user_agent(&self) -> String {
290 self.custom_user_agent
291 .clone()
292 .unwrap_or_else(|| user_agent_for_mode(self.mode))
293 }
294
295 pub fn listen_addrs(&self) -> Vec<MultiAddr> {
300 let mode = if self.local {
301 ListenMode::Local
302 } else {
303 ListenMode::Public
304 };
305 build_listen_addrs(self.port, self.ipv6, mode)
306 }
307
308 pub fn new() -> Result<Self> {
314 Ok(Self::default())
315 }
316
317 pub fn builder() -> NodeConfigBuilder {
319 NodeConfigBuilder::default()
320 }
321}
322
/// Fluent builder for `NodeConfig`.
///
/// `Option` fields record "not set"; `build` substitutes the corresponding
/// default for each of them.
#[derive(Debug, Clone)]
pub struct NodeConfigBuilder {
    /// Listen port (builder default: `0`).
    port: u16,
    /// Whether to add an IPv6 listen address (builder default: `true`).
    ipv6: bool,
    /// Whether to bind to loopback only (builder default: `false`).
    local: bool,
    /// Bootstrap peers in the order they were added.
    bootstrap_peers: Vec<crate::MultiAddr>,
    /// Connection ceiling override.
    max_connections: Option<usize>,
    /// Connection timeout override.
    connection_timeout: Option<Duration>,
    /// DHT parameter override.
    dht_config: Option<DHTConfig>,
    /// Optional message size cap; passed through unchanged.
    max_message_size: Option<usize>,
    /// Node role.
    mode: NodeMode,
    /// Custom user agent; passed through unchanged.
    custom_user_agent: Option<String>,
    /// Explicit loopback policy; when unset, `build` uses the value of `local`.
    allow_loopback: Option<bool>,
    /// Adaptive DHT settings override.
    adaptive_dht_config: Option<AdaptiveDhtConfig>,
}
360
361impl Default for NodeConfigBuilder {
362 fn default() -> Self {
363 Self {
364 port: 0,
365 ipv6: true,
366 local: false,
367 bootstrap_peers: Vec::new(),
368 max_connections: None,
369 connection_timeout: None,
370 dht_config: None,
371 max_message_size: None,
372 mode: NodeMode::default(),
373 custom_user_agent: None,
374 allow_loopback: None,
375 adaptive_dht_config: None,
376 }
377 }
378}
379
380impl NodeConfigBuilder {
381 pub fn port(mut self, port: u16) -> Self {
383 self.port = port;
384 self
385 }
386
387 pub fn ipv6(mut self, enabled: bool) -> Self {
389 self.ipv6 = enabled;
390 self
391 }
392
393 pub fn local(mut self, local: bool) -> Self {
400 self.local = local;
401 self
402 }
403
404 pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
406 self.bootstrap_peers.push(addr);
407 self
408 }
409
410 pub fn max_connections(mut self, max: usize) -> Self {
412 self.max_connections = Some(max);
413 self
414 }
415
416 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
418 self.connection_timeout = Some(timeout);
419 self
420 }
421
422 pub fn dht_config(mut self, config: DHTConfig) -> Self {
424 self.dht_config = Some(config);
425 self
426 }
427
428 pub fn max_message_size(mut self, max_message_size: usize) -> Self {
432 self.max_message_size = Some(max_message_size);
433 self
434 }
435
436 pub fn mode(mut self, mode: NodeMode) -> Self {
438 self.mode = mode;
439 self
440 }
441
442 pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
444 self.custom_user_agent = Some(user_agent.into());
445 self
446 }
447
448 pub fn allow_loopback(mut self, allow: bool) -> Self {
452 self.allow_loopback = Some(allow);
453 self
454 }
455
456 pub fn trust_enforcement(mut self, enabled: bool) -> Self {
468 let threshold = if enabled {
469 AdaptiveDhtConfig::default().block_threshold
470 } else {
471 0.0
472 };
473 self.adaptive_dht_config = Some(AdaptiveDhtConfig {
474 block_threshold: threshold,
475 });
476 self
477 }
478
479 pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
483 self.adaptive_dht_config = Some(config);
484 self
485 }
486
487 pub fn build(self) -> Result<NodeConfig> {
493 let allow_loopback = self.allow_loopback.unwrap_or(self.local);
495
496 Ok(NodeConfig {
497 local: self.local,
498 port: self.port,
499 ipv6: self.ipv6,
500 bootstrap_peers: self.bootstrap_peers,
501 connection_timeout: self
502 .connection_timeout
503 .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
504 max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
505 dht_config: self.dht_config.unwrap_or_default(),
506 bootstrap_cache_config: None,
507 diversity_config: None,
508 max_message_size: self.max_message_size,
509 node_identity: None,
510 mode: self.mode,
511 custom_user_agent: self.custom_user_agent,
512 allow_loopback,
513 adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
514 })
515 }
516}
517
518impl Default for NodeConfig {
519 fn default() -> Self {
520 Self {
521 local: false,
522 port: DEFAULT_LISTEN_PORT,
523 ipv6: true,
524 bootstrap_peers: Vec::new(),
525 connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
526 max_connections: DEFAULT_MAX_CONNECTIONS,
527 dht_config: DHTConfig::default(),
528 bootstrap_cache_config: None,
529 diversity_config: None,
530 max_message_size: None,
531 node_identity: None,
532 mode: NodeMode::default(),
533 custom_user_agent: None,
534 allow_loopback: false,
535 adaptive_dht_config: AdaptiveDhtConfig::default(),
536 }
537 }
538}
539
impl DHTConfig {
    /// Default for `k_value`.
    const DEFAULT_K_VALUE: usize = 20;
    /// Default for `alpha_value`.
    const DEFAULT_ALPHA_VALUE: usize = 5;
    /// Default for `refresh_interval`, in seconds (10 minutes).
    const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
}
545
546impl Default for DHTConfig {
547 fn default() -> Self {
548 Self {
549 k_value: Self::DEFAULT_K_VALUE,
550 alpha_value: Self::DEFAULT_ALPHA_VALUE,
551 refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
552 }
553 }
554}
555
/// Snapshot of what the transport knows about a peer, as returned by
/// `P2PNode::peer_info`.
#[derive(Debug, Clone)]
pub struct PeerInfo {
    /// Identifier of the transport channel associated with this peer.
    #[allow(dead_code)]
    pub(crate) channel_id: String,

    /// Addresses recorded for the peer; `send_message` reuses them as
    /// reconnect candidates.
    pub addresses: Vec<MultiAddr>,

    /// When the connection was established — presumably; set outside this
    /// part of the file.
    pub connected_at: Instant,

    /// When the peer was last observed — presumably; set outside this part of
    /// the file.
    pub last_seen: Instant,

    /// Current connection state.
    pub status: ConnectionStatus,

    /// Protocol names associated with the peer.
    pub protocols: Vec<String>,

    /// Heartbeat counter; maintained outside this part of the file.
    pub heartbeat_count: u64,
}
581
/// Lifecycle state of a peer connection.
#[derive(Debug, Clone, PartialEq)]
pub enum ConnectionStatus {
    /// Connection attempt in progress.
    Connecting,
    /// Connection is established.
    Connected,
    /// Connection is being torn down.
    Disconnecting,
    /// Connection is closed.
    Disconnected,
    /// Connection failed; the string carries a description of the failure.
    Failed(String),
}
596
/// Events delivered to subscribers of `P2PNode::subscribe_events`.
#[derive(Debug, Clone)]
pub enum P2PEvent {
    /// A protocol message was received.
    Message {
        /// Protocol / topic name taken from the wire message.
        topic: String,
        /// Signature-verified sender ID, or `None` for unsigned messages.
        source: Option<PeerId>,
        /// Message payload.
        data: Vec<u8>,
    },
    /// A peer connected.
    // NOTE(review): the `String` is presumably the peer's user agent — confirm
    // at the emit site.
    PeerConnected(PeerId, String),
    /// A peer disconnected.
    PeerDisconnected(PeerId),
}
619
/// Successful result of `P2PNode::send_request`.
#[derive(Debug, Clone)]
pub struct PeerResponse {
    /// Peer associated with the response.
    pub peer_id: PeerId,
    /// Response payload.
    pub data: Vec<u8>,
    /// Latency reported for the exchange (measured by the transport layer,
    /// outside this part of the file).
    pub latency: Duration,
}
633
/// Serialized wrapper that tags a payload as a request or a response.
// NOTE(review): the three fields appear to correspond to the tuple returned by
// `parse_request_envelope` — confirm in the transport module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RequestResponseEnvelope {
    /// Identifier shared by a request and its response.
    pub(crate) message_id: String,
    /// `true` for a response, `false` for a request.
    pub(crate) is_response: bool,
    /// Application payload.
    pub(crate) payload: Vec<u8>,
}
647
/// Bookkeeping for a request that is awaiting its response.
pub(crate) struct PendingRequest {
    /// One-shot channel on which the response payload is delivered.
    pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
    /// Peer the response is expected from.
    pub(crate) expected_peer: PeerId,
}
655
/// How long `reconnect_and_send` waits for a re-dialed peer to present its
/// identity.
const RECONNECT_IDENTITY_TIMEOUT: Duration = Duration::from_secs(5);

/// Pause inserted after disconnecting stale channels and before re-dialing.
const QUIC_TEARDOWN_GRACE: Duration = Duration::from_millis(100);
665
/// A peer-to-peer node: transport, adaptive DHT and bootstrap cache bundled
/// behind one handle.
pub struct P2PNode {
    /// Configuration the node was created with.
    config: NodeConfig,

    /// This node's peer ID, taken from its identity.
    peer_id: PeerId,

    /// Shared transport handle; most networking calls delegate to it.
    transport: Arc<crate::transport_handle::TransportHandle>,

    /// Instant at which the node was constructed (not when `start` ran);
    /// basis for `uptime`.
    start_time: Instant,

    /// Cancelled by `stop`; `run` waits on it.
    shutdown: CancellationToken,

    /// Adaptive DHT layer, including the trust engine.
    adaptive_dht: AdaptiveDHT,

    /// Bootstrap peer cache; `None` when its initialization failed.
    bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,

    /// Set once a bootstrap run has connected to at least one peer.
    is_bootstrapped: Arc<AtomicBool>,

    /// Set by `start`, cleared by `stop`.
    is_started: Arc<AtomicBool>,

    /// Per-peer async locks that serialize reconnect attempts in
    /// `send_message`.
    reconnect_locks: ParkingMutex<HashMap<PeerId, Arc<TokioMutex<()>>>>,
}
710
/// Replaces an unspecified (wildcard) IP with the loopback address of the
/// same family, keeping the port.
///
/// Addresses that are not unspecified are returned unchanged.
pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

    if !addr.ip().is_unspecified() {
        return addr;
    }

    let loopback = match addr.ip() {
        IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST),
        IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST),
    };
    SocketAddr::new(loopback, addr.port())
}
741
742impl P2PNode {
743 pub async fn new(config: NodeConfig) -> Result<Self> {
745 let node_identity = match config.node_identity.clone() {
747 Some(identity) => identity,
748 None => Arc::new(NodeIdentity::generate()?),
749 };
750
751 let peer_id = *node_identity.peer_id();
753
754 let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
756 let bootstrap_manager =
757 match BootstrapManager::with_node_config(bootstrap_config, &config).await {
758 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
759 Err(e) => {
760 warn!("Failed to initialize bootstrap manager: {e}, continuing without cache");
761 None
762 }
763 };
764
765 let transport_config = crate::transport_handle::TransportConfig::from_node_config(
767 &config,
768 crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
769 node_identity.clone(),
770 );
771 let transport =
772 Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
773
774 let manager_dht_config = crate::dht::DHTConfig {
776 bucket_size: config.dht_config.k_value,
777 alpha: config.dht_config.alpha_value,
778 bucket_refresh_interval: config.dht_config.refresh_interval,
779 max_distance: DHT_MAX_DISTANCE,
780 };
781 let dht_manager_config = DhtNetworkConfig {
782 peer_id,
783 dht_config: manager_dht_config,
784 node_config: config.clone(),
785 request_timeout: config.connection_timeout,
786 max_concurrent_operations: MAX_ACTIVE_REQUESTS,
787 enable_security: true,
788 block_threshold: 0.0, };
790 let adaptive_dht = AdaptiveDHT::new(
791 transport.clone(),
792 dht_manager_config,
793 config.adaptive_dht_config.clone(),
794 )
795 .await?;
796
797 let node = Self {
798 config,
799 peer_id,
800 transport,
801 start_time: Instant::now(),
802 shutdown: CancellationToken::new(),
803 adaptive_dht,
804 bootstrap_manager,
805 is_bootstrapped: Arc::new(AtomicBool::new(false)),
806 is_started: Arc::new(AtomicBool::new(false)),
807 reconnect_locks: ParkingMutex::new(HashMap::new()),
808 };
809 info!(
810 "Created P2P node with peer ID: {} (call start() to begin networking)",
811 node.peer_id
812 );
813
814 Ok(node)
815 }
816
    /// Returns this node's peer ID.
    pub fn peer_id(&self) -> &PeerId {
        &self.peer_id
    }
821
    /// Returns the shared transport handle.
    pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
        &self.transport
    }
826
    /// Returns the transport's local address, if it has one.
    pub fn local_addr(&self) -> Option<MultiAddr> {
        self.transport.local_addr()
    }
830
    /// Reports whether a bootstrap run has connected to at least one peer.
    pub fn is_bootstrapped(&self) -> bool {
        self.is_bootstrapped.load(Ordering::SeqCst)
    }
838
    /// Clears the bootstrapped flag and runs the bootstrap procedure again.
    ///
    /// The flag stays `false` if no bootstrap peer can be reached.
    pub async fn re_bootstrap(&self) -> Result<()> {
        self.is_bootstrapped.store(false, Ordering::SeqCst);
        self.connect_bootstrap_peers().await
    }
847
    /// Returns a shared handle to the adaptive DHT's trust engine.
    pub fn trust_engine(&self) -> Arc<TrustEngine> {
        self.adaptive_dht.trust_engine().clone()
    }
856
    /// Forwards a trust event about `peer_id` to the adaptive DHT.
    pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
        self.adaptive_dht.report_trust_event(peer_id, event).await;
    }
874
    /// Returns the adaptive DHT's current trust score for `peer_id`.
    pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
        self.adaptive_dht.peer_trust(peer_id)
    }
881
    /// Returns the adaptive DHT layer.
    pub fn adaptive_dht(&self) -> &AdaptiveDHT {
        &self.adaptive_dht
    }
886
887 pub async fn send_request(
920 &self,
921 peer_id: &PeerId,
922 protocol: &str,
923 data: Vec<u8>,
924 timeout: Duration,
925 ) -> Result<PeerResponse> {
926 if self.adaptive_dht.peer_trust(peer_id) < self.adaptive_dht.config().block_threshold {
928 return Err(P2PError::Network(crate::error::NetworkError::PeerBlocked(
929 *peer_id,
930 )));
931 }
932
933 match self
934 .transport
935 .send_request(peer_id, protocol, data, timeout)
936 .await
937 {
938 Ok(resp) => {
939 self.report_trust_event(peer_id, TrustEvent::SuccessfulResponse)
940 .await;
941 Ok(resp)
942 }
943 Err(e) => {
944 let event = if matches!(&e, P2PError::Timeout(_)) {
945 TrustEvent::ConnectionTimeout
946 } else {
947 TrustEvent::ConnectionFailed
948 };
949 self.report_trust_event(peer_id, event).await;
950 Err(e)
951 }
952 }
953 }
954
    /// Sends `data` to `peer_id` as the response to the request identified by
    /// `message_id`. Delegates to the transport.
    pub async fn send_response(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        message_id: &str,
        data: Vec<u8>,
    ) -> Result<()> {
        self.transport
            .send_response(peer_id, protocol, message_id, data)
            .await
    }
966
    /// Parses a request/response envelope from raw bytes; delegates to
    /// `TransportHandle::parse_request_envelope`.
    // NOTE(review): the tuple is presumably `(message_id, is_response,
    // payload)` — confirm against the transport implementation.
    pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
        crate::transport_handle::TransportHandle::parse_request_envelope(data)
    }
970
    /// Subscribes to `topic` via the transport.
    pub async fn subscribe(&self, topic: &str) -> Result<()> {
        self.transport.subscribe(topic).await
    }
974
    /// Publishes `data` on `topic` via the transport.
    pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
        self.transport.publish(topic, data).await
    }
978
    /// Returns the configuration the node was created with.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
983
984 pub async fn start(&self) -> Result<()> {
986 info!("Starting P2P node...");
987
988 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
990 let mut manager = bootstrap_manager.write().await;
991 manager
992 .start_maintenance()
993 .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
994 info!("Bootstrap cache manager started");
995 }
996
997 self.transport.start_network_listeners().await?;
999
1000 self.adaptive_dht.start().await?;
1002
1003 let listen_addrs = self.transport.listen_addrs().await;
1005 info!("P2P node started on addresses: {:?}", listen_addrs);
1006
1007 self.connect_bootstrap_peers().await?;
1013
1014 self.is_started
1015 .store(true, std::sync::atomic::Ordering::Release);
1016
1017 Ok(())
1018 }
1019
    /// Runs the node until `stop` is called.
    ///
    /// Starts the node first when it is not currently running, then waits for
    /// the shutdown token to be cancelled.
    pub async fn run(&self) -> Result<()> {
        if !self.is_running() {
            self.start().await?;
        }

        info!("P2P node running...");

        // Resolves once `stop` cancels the token.
        self.shutdown.cancelled().await;

        info!("P2P node stopped");
        Ok(())
    }
1038
1039 pub async fn stop(&self) -> Result<()> {
1041 info!("Stopping P2P node...");
1042
1043 self.shutdown.cancel();
1045
1046 self.adaptive_dht.stop().await?;
1048
1049 self.transport.stop().await?;
1051
1052 self.is_started
1053 .store(false, std::sync::atomic::Ordering::Release);
1054
1055 info!("P2P node stopped");
1056 Ok(())
1057 }
1058
    /// Alias for `stop`.
    pub async fn shutdown(&self) -> Result<()> {
        self.stop().await
    }
1063
1064 pub fn is_running(&self) -> bool {
1066 self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1067 }
1068
    /// Returns the addresses the transport is listening on.
    pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
        self.transport.listen_addrs().await
    }
1073
    /// Returns the IDs of the peers the transport reports as connected.
    pub async fn connected_peers(&self) -> Vec<PeerId> {
        self.transport.connected_peers().await
    }
1078
    /// Returns the transport's peer count.
    pub async fn peer_count(&self) -> usize {
        self.transport.peer_count().await
    }
1083
    /// Returns the transport's information about `peer_id`, if it has any.
    pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
        self.transport.peer_info(peer_id).await
    }
1088
    /// Looks up the channel ID associated with `addr`; delegates to the
    /// transport.
    #[allow(dead_code)]
    pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
        self.transport.get_channel_id_by_address(addr).await
    }
1094
    /// Lists active connections as `(channel ID, addresses)` pairs; delegates
    /// to the transport.
    #[allow(dead_code)]
    pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
        self.transport.list_active_connections().await
    }
1100
    /// Removes `channel_id` from the transport and returns the transport's
    /// result flag.
    // NOTE(review): presumably `true` means a channel was actually removed —
    // confirm in the transport.
    #[allow(dead_code)]
    pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
        self.transport.remove_channel(channel_id).await
    }
1106
    /// Disconnects the transport channel `channel_id`.
    pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
        self.transport.disconnect_channel(channel_id).await;
    }
1114
    /// Reports whether the transport considers `peer_id` connected.
    pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
        self.transport.is_peer_connected(peer_id).await
    }
1119
    /// Dials `address` and returns the ID of the resulting transport channel.
    ///
    /// The peer's identity is not known yet at this point; use
    /// `wait_for_peer_identity` with the returned channel ID to obtain it.
    pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
        self.transport.connect_peer(address).await
    }
1129
    /// Waits up to `timeout` for the peer on `channel_id` to present its
    /// identity and returns the resulting peer ID.
    pub async fn wait_for_peer_identity(
        &self,
        channel_id: &str,
        timeout: Duration,
    ) -> Result<PeerId> {
        self.transport
            .wait_for_peer_identity(channel_id, timeout)
            .await
    }
1145
    /// Disconnects from `peer_id` via the transport.
    pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
        self.transport.disconnect_peer(peer_id).await
    }
1150
    /// Reports whether the transport considers `channel_id` active.
    #[allow(dead_code)]
    pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
        self.transport.is_connection_active(channel_id).await
    }
1156
    /// Sends `data` to `peer_id` on `protocol`, reconnecting when necessary.
    ///
    /// `addrs` are caller-supplied dial candidates used if a (re)connect is
    /// needed. Reconnect attempts for the same peer are serialized through a
    /// per-peer lock so concurrent senders do not dial the peer in parallel.
    ///
    /// # Errors
    /// Returns the transport's send error, or the error from the reconnect
    /// attempt (no dialable address, dial failure, identity timeout or
    /// mismatch).
    pub async fn send_message(
        &self,
        peer_id: &PeerId,
        protocol: &str,
        data: Vec<u8>,
        addrs: &[MultiAddr],
    ) -> Result<()> {
        // Snapshot taken before sending: if the send fails, these are the
        // channels treated as stale.
        let existing_channels = self.transport.channels_for_peer(peer_id).await;

        if existing_channels.is_empty() {
            // No channel at all: go straight to the reconnect path.
            let lock = self.reconnect_lock_for(peer_id);
            let _guard = lock.lock().await;

            // Another task may have connected while we waited for the lock.
            if self.transport.is_peer_connected(peer_id).await {
                return self.transport.send_message(peer_id, protocol, data).await;
            }

            return self
                .reconnect_and_send(peer_id, protocol, data, addrs, &[], &[])
                .await;
        }

        // Remember the peer's known addresses now, before any teardown, so
        // they remain available as dial candidates.
        let saved_addrs: Vec<MultiAddr> = self
            .transport
            .peer_info(peer_id)
            .await
            .map(|info| info.addresses)
            .unwrap_or_default();

        // `send_message` consumes `data`; keep a copy for the retry.
        let retry_data = data.clone();

        match self.transport.send_message(peer_id, protocol, data).await {
            Ok(()) => return Ok(()),
            Err(e) => {
                debug!(
                    peer = %peer_id.to_hex(),
                    error = %e,
                    "send failed, attempting reconnect",
                );
            }
        }

        let lock = self.reconnect_lock_for(peer_id);
        let _guard = lock.lock().await;

        // NOTE(review): this branch assumes "still connected" means another
        // task re-established the connection while we waited; if the peer is
        // only connected through the stale channels, the retry below runs
        // after they have been disconnected — confirm transport semantics.
        if self.transport.is_peer_connected(peer_id).await {
            for channel_id in &existing_channels {
                self.transport.disconnect_channel(channel_id).await;
            }
            return self
                .transport
                .send_message(peer_id, protocol, retry_data)
                .await;
        }

        self.reconnect_and_send(
            peer_id,
            protocol,
            retry_data,
            addrs,
            &saved_addrs,
            &existing_channels,
        )
        .await
    }
1253
1254 async fn reconnect_and_send(
1256 &self,
1257 peer_id: &PeerId,
1258 protocol: &str,
1259 data: Vec<u8>,
1260 addrs: &[MultiAddr],
1261 saved_addrs: &[MultiAddr],
1262 stale_channels: &[String],
1263 ) -> Result<()> {
1264 let address = self
1266 .resolve_dial_address(peer_id, addrs, saved_addrs)
1267 .await
1268 .ok_or_else(|| {
1269 P2PError::Network(NetworkError::PeerNotFound(peer_id.to_hex().into()))
1270 })?;
1271
1272 if !stale_channels.is_empty() {
1278 for channel_id in stale_channels {
1279 self.transport.disconnect_channel(channel_id).await;
1280 }
1281 tokio::time::sleep(QUIC_TEARDOWN_GRACE).await;
1282 }
1283
1284 let channel_id = self.transport.connect_peer(&address).await?;
1286 let authenticated = match self
1287 .transport
1288 .wait_for_peer_identity(&channel_id, RECONNECT_IDENTITY_TIMEOUT)
1289 .await
1290 {
1291 Ok(peer) => peer,
1292 Err(e) => {
1293 self.transport.disconnect_channel(&channel_id).await;
1296 return Err(e);
1297 }
1298 };
1299
1300 if &authenticated != peer_id {
1301 self.transport.disconnect_channel(&channel_id).await;
1302 return Err(P2PError::Identity(IdentityError::IdentityMismatch {
1303 expected: peer_id.to_hex().into(),
1304 actual: authenticated.to_hex().into(),
1305 }));
1306 }
1307
1308 self.transport.send_message(peer_id, protocol, data).await
1310 }
1311
1312 async fn resolve_dial_address(
1318 &self,
1319 peer_id: &PeerId,
1320 caller_addrs: &[MultiAddr],
1321 saved_addrs: &[MultiAddr],
1322 ) -> Option<MultiAddr> {
1323 if let Some(addr) = Self::first_dialable(caller_addrs) {
1325 return Some(addr);
1326 }
1327
1328 if let Some(addr) = Self::first_dialable(saved_addrs) {
1331 return Some(addr);
1332 }
1333
1334 let dht_addrs = self.adaptive_dht.peer_addresses_for_dial(peer_id).await;
1336 Self::first_dialable(&dht_addrs)
1337 }
1338
1339 fn first_dialable(addrs: &[MultiAddr]) -> Option<MultiAddr> {
1342 addrs
1343 .iter()
1344 .find(|a| {
1345 let dialable = a
1346 .dialable_socket_addr()
1347 .is_some_and(|sa| !sa.ip().is_unspecified());
1348 if !dialable {
1349 trace!(address = %a, "skipping non-dialable address");
1350 }
1351 dialable
1352 })
1353 .cloned()
1354 }
1355
1356 fn reconnect_lock_for(&self, peer_id: &PeerId) -> Arc<TokioMutex<()>> {
1358 self.reconnect_locks
1359 .lock()
1360 .entry(*peer_id)
1361 .or_insert_with(|| Arc::new(TokioMutex::new(())))
1362 .clone()
1363 }
1364}
1365
/// Messages whose timestamp is more than this many seconds in the past are
/// rejected by `parse_protocol_message`.
const MAX_MESSAGE_AGE_SECS: u64 = 300;
/// Messages whose timestamp is more than this many seconds in the future are
/// rejected by `parse_protocol_message`.
const MAX_FUTURE_SECS: u64 = 30;
1383
/// Wraps a displayable message in `P2PError::Network(NetworkError::ProtocolError(..))`.
fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
    P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
}
1388
1389pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1391 if let Err(e) = tx.send(event) {
1392 tracing::trace!("Event broadcast has no receivers: {e}");
1393 }
1394}
1395
/// Result of successfully parsing a wire message.
pub(crate) struct ParsedMessage {
    /// The `P2PEvent::Message` built from the wire message.
    pub(crate) event: P2PEvent,
    /// Signature-verified sender ID; `None` for unsigned messages.
    pub(crate) authenticated_node_id: Option<PeerId>,
    /// User agent string carried by the message.
    pub(crate) user_agent: String,
}
1405
1406pub(crate) fn parse_protocol_message(bytes: &[u8], source: &str) -> Option<ParsedMessage> {
1407 let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1408
1409 let now = std::time::SystemTime::now()
1411 .duration_since(std::time::UNIX_EPOCH)
1412 .map(|d| d.as_secs())
1413 .unwrap_or(0);
1414
1415 if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1417 tracing::warn!(
1418 "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1419 source,
1420 message.timestamp,
1421 now.saturating_sub(message.timestamp)
1422 );
1423 return None;
1424 }
1425
1426 if message.timestamp > now + MAX_FUTURE_SECS {
1428 tracing::warn!(
1429 "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1430 source,
1431 message.timestamp,
1432 message.timestamp.saturating_sub(now)
1433 );
1434 return None;
1435 }
1436
1437 let authenticated_node_id = if !message.signature.is_empty() {
1439 match verify_message_signature(&message) {
1440 Ok(peer_id) => {
1441 debug!(
1442 "Message from {} authenticated as app-level NodeId {}",
1443 source, peer_id
1444 );
1445 Some(peer_id)
1446 }
1447 Err(e) => {
1448 warn!(
1449 "Rejecting message from {}: signature verification failed: {}",
1450 source, e
1451 );
1452 return None;
1453 }
1454 }
1455 } else {
1456 None
1457 };
1458
1459 debug!(
1460 "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1461 message.protocol,
1462 authenticated_node_id,
1463 source,
1464 message.from,
1465 message.data.len()
1466 );
1467
1468 Some(ParsedMessage {
1469 event: P2PEvent::Message {
1470 topic: message.protocol,
1471 source: authenticated_node_id,
1472 data: message.data,
1473 },
1474 authenticated_node_id,
1475 user_agent: message.user_agent,
1476 })
1477}
1478
1479fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1486 let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1487 .map_err(|e| format!("invalid public key: {e:?}"))?;
1488
1489 let peer_id = peer_id_from_public_key(&pubkey);
1490
1491 if message.from != peer_id {
1493 return Err(format!(
1494 "from field mismatch: message claims '{}' but public key derives '{}'",
1495 message.from, peer_id
1496 ));
1497 }
1498
1499 let signable = postcard::to_stdvec(&(
1500 &message.protocol,
1501 &message.data as &[u8],
1502 &message.from,
1503 message.timestamp,
1504 &message.user_agent,
1505 ))
1506 .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1507
1508 let sig = MlDsaSignature::from_bytes(&message.signature)
1509 .map_err(|e| format!("invalid signature: {e:?}"))?;
1510
1511 let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1512 .map_err(|e| format!("verification error: {e}"))?;
1513
1514 if valid {
1515 Ok(peer_id)
1516 } else {
1517 Err("signature is invalid".to_string())
1518 }
1519}
1520
1521impl P2PNode {
    /// Returns a new receiver for the transport's event broadcast.
    pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
        self.transport.subscribe_events()
    }
1526
    /// Alias for `subscribe_events`.
    pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
        self.subscribe_events()
    }
1531
    /// Returns the time elapsed since the node was constructed.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }
1536
1537 pub async fn health_check(&self) -> Result<()> {
1550 let peer_count = self.peer_count().await;
1551 if peer_count > self.config.max_connections {
1552 Err(protocol_error(format!(
1553 "Too many connections: {peer_count}"
1554 )))
1555 } else {
1556 Ok(())
1557 }
1558 }
1559
    /// Returns the DHT network manager owned by the adaptive DHT.
    pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
        self.adaptive_dht.dht_manager()
    }
1564
    /// Alias for `dht_manager`.
    pub fn dht(&self) -> &Arc<DhtNetworkManager> {
        self.dht_manager()
    }
1569
1570 pub async fn add_discovered_peer(
1572 &self,
1573 _peer_id: PeerId,
1574 addresses: Vec<MultiAddr>,
1575 ) -> Result<()> {
1576 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1577 let manager = bootstrap_manager.read().await;
1578 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1579 .iter()
1580 .filter_map(|addr| addr.socket_addr())
1581 .collect();
1582 if let Some(&primary) = socket_addresses.first() {
1583 manager
1584 .add_peer(&primary, socket_addresses)
1585 .await
1586 .map_err(|e| {
1587 protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1588 })?;
1589 }
1590 }
1591 Ok(())
1592 }
1593
1594 pub async fn update_peer_metrics(
1596 &self,
1597 addr: &MultiAddr,
1598 success: bool,
1599 latency_ms: Option<u64>,
1600 _error: Option<String>,
1601 ) -> Result<()> {
1602 if let Some(ref bootstrap_manager) = self.bootstrap_manager
1603 && let Some(sa) = addr.socket_addr()
1604 {
1605 let manager = bootstrap_manager.read().await;
1606 if success {
1607 let rtt_ms = latency_ms.unwrap_or(0) as u32;
1608 manager.record_success(&sa, rtt_ms).await;
1609 } else {
1610 manager.record_failure(&sa).await;
1611 }
1612 }
1613 Ok(())
1614 }
1615
1616 pub async fn get_bootstrap_cache_stats(
1618 &self,
1619 ) -> Result<Option<crate::bootstrap::BootstrapStats>> {
1620 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1621 let manager = bootstrap_manager.read().await;
1622 Ok(Some(manager.stats().await))
1623 } else {
1624 Ok(None)
1625 }
1626 }
1627
1628 pub async fn cached_peer_count(&self) -> usize {
1630 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1631 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1632 {
1633 return stats.total_peers;
1634 }
1635 0
1636 }
1637
1638 async fn connect_bootstrap_peers(&self) -> Result<()> {
1640 let mut bootstrap_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
1642 let mut used_cache = false;
1643 let mut seen_addresses = std::collections::HashSet::new();
1644
1645 if !self.config.bootstrap_peers.is_empty() {
1647 info!(
1648 "Using {} configured bootstrap peers (priority)",
1649 self.config.bootstrap_peers.len()
1650 );
1651 for multiaddr in &self.config.bootstrap_peers {
1652 let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
1653 warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
1654 continue;
1655 };
1656 seen_addresses.insert(socket_addr);
1657 bootstrap_addr_sets.push(vec![multiaddr.clone()]);
1658 }
1659 }
1660
1661 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1663 let manager = bootstrap_manager.read().await;
1664 let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
1665 if !cached_peers.is_empty() {
1666 let mut added_from_cache = 0;
1667 for cached in cached_peers {
1668 let mut addrs = vec![cached.primary_address];
1669 addrs.extend(cached.addresses);
1670 let new_addresses: Vec<MultiAddr> = addrs
1672 .into_iter()
1673 .filter(|a| !seen_addresses.contains(a))
1674 .map(MultiAddr::quic)
1675 .collect();
1676
1677 if !new_addresses.is_empty() {
1678 for addr in &new_addresses {
1679 if let Some(sa) = addr.socket_addr() {
1680 seen_addresses.insert(sa);
1681 }
1682 }
1683 bootstrap_addr_sets.push(new_addresses);
1684 added_from_cache += 1;
1685 }
1686 }
1687 if added_from_cache > 0 {
1688 info!(
1689 "Added {} cached bootstrap peers (supplementing CLI peers)",
1690 added_from_cache
1691 );
1692 used_cache = true;
1693 }
1694 }
1695 }
1696
1697 if bootstrap_addr_sets.is_empty() {
1698 info!("No bootstrap peers configured and no cached peers available");
1699 return Ok(());
1700 }
1701
1702 let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
1705 let mut successful_connections = 0;
1706 let mut connected_peer_ids: Vec<PeerId> = Vec::new();
1707
1708 for addrs in &bootstrap_addr_sets {
1709 for addr in addrs {
1710 match self.connect_peer(addr).await {
1711 Ok(channel_id) => {
1712 match self
1715 .transport
1716 .wait_for_peer_identity(&channel_id, identity_timeout)
1717 .await
1718 {
1719 Ok(real_peer_id) => {
1720 successful_connections += 1;
1721 connected_peer_ids.push(real_peer_id);
1722
1723 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1725 let manager = bootstrap_manager.read().await;
1726 if let Some(sa) = addr.socket_addr() {
1727 manager.record_success(&sa, 100).await;
1728 }
1729 }
1730 break; }
1732 Err(e) => {
1733 warn!(
1734 "Timeout waiting for identity from bootstrap peer {}: {}, \
1735 closing channel {}",
1736 addr, e, channel_id
1737 );
1738 self.disconnect_channel(&channel_id).await;
1739 }
1740 }
1741 }
1742 Err(e) => {
1743 warn!("Failed to connect to bootstrap peer {}: {}", addr, e);
1744
1745 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1747 let manager = bootstrap_manager.read().await;
1748 if let Some(sa) = addr.socket_addr() {
1749 manager.record_failure(&sa).await;
1750 }
1751 }
1752 }
1753 }
1754 }
1755 }
1756
1757 if successful_connections == 0 {
1758 if !used_cache {
1759 warn!("Failed to connect to any bootstrap peers");
1760 }
1761 return Ok(());
1764 }
1765
1766 info!(
1767 "Successfully connected to {} bootstrap peers",
1768 successful_connections
1769 );
1770
1771 match self
1773 .dht_manager()
1774 .bootstrap_from_peers(&connected_peer_ids)
1775 .await
1776 {
1777 Ok(count) => info!("DHT peer discovery found {} peers", count),
1778 Err(e) => warn!("DHT peer discovery failed: {}", e),
1779 }
1780
1781 self.is_bootstrapped.store(true, Ordering::SeqCst);
1784 info!(
1785 "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
1786 successful_connections,
1787 connected_peer_ids.len()
1788 );
1789
1790 Ok(())
1791 }
1792
1793 }
1795
/// Abstraction over a component that can deliver protocol messages to peers.
///
/// Implementors must be `Send + Sync`; `async_trait` is applied so the trait
/// can declare an `async fn`.
#[async_trait::async_trait]
// NOTE(review): `dead_code` is silenced here, presumably because the trait is
// not used in every build configuration — confirm and narrow if possible.
#[allow(dead_code)]
pub trait NetworkSender: Send + Sync {
    /// Sends `data` to `peer_id`, tagged with the `protocol` identifier.
    ///
    /// # Errors
    /// Returns the crate's `Result` error type when the implementor fails to send.
    async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;

    /// Returns the `PeerId` this sender reports as its own.
    fn local_peer_id(&self) -> PeerId;
}
1806
#[cfg(test)]
// Test code: panicking via unwrap/expect is the desired failure mode.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod diversity_tests {
    use super::*;
    use crate::security::IPDiversityConfig;

    /// Builds a `BootstrapManager` from `config`, with its cache directory
    /// redirected into a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the manager: dropping the guard
    /// deletes the directory, so it must outlive every use of the manager.
    /// Bind the guard *before* the manager (`let (_guard, manager) = ...`) so the
    /// manager is dropped first.
    async fn build_bootstrap_manager_like_prod(
        config: &NodeConfig,
    ) -> (tempfile::TempDir, BootstrapManager) {
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
        bootstrap_config.cache_dir = temp_dir.path().to_path_buf();

        let manager = BootstrapManager::with_node_config(bootstrap_config, config)
            .await
            .expect("bootstrap manager");

        (temp_dir, manager)
    }

    /// A `NodeConfig` carrying the testnet diversity config must hand that
    /// config through to the bootstrap manager.
    #[tokio::test]
    async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
        let config = NodeConfig {
            diversity_config: Some(IPDiversityConfig::testnet()),
            ..Default::default()
        };

        // Keep the cache directory alive for as long as the manager is in use.
        let (_cache_dir, manager) = build_bootstrap_manager_like_prod(&config).await;
        assert!(manager.diversity_config().is_relaxed());
        assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
    }
}
1839
1840pub(crate) async fn register_new_channel(
1842 peers: &Arc<RwLock<HashMap<String, PeerInfo>>>,
1843 channel_id: &str,
1844 remote_addr: &MultiAddr,
1845) {
1846 let mut peers_guard = peers.write().await;
1847 let peer_info = PeerInfo {
1848 channel_id: channel_id.to_owned(),
1849 addresses: vec![remote_addr.clone()],
1850 connected_at: tokio::time::Instant::now(),
1851 last_seen: tokio::time::Instant::now(),
1852 status: ConnectionStatus::Connected,
1853 protocols: vec!["p2p-core/1.0.0".to_string()],
1854 heartbeat_count: 0,
1855 };
1856 peers_guard.insert(channel_id.to_owned(), peer_info);
1857}
1858
1859#[cfg(test)]
1860mod tests {
1861 use super::*;
1862 use std::time::Duration;
1864 use tokio::time::timeout;
1865
1866 const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
1868
1869 fn create_test_node_config() -> NodeConfig {
1875 NodeConfig {
1876 local: true,
1877 port: 0,
1878 ipv6: true,
1879 bootstrap_peers: vec![],
1880 connection_timeout: Duration::from_secs(2),
1881 max_connections: 100,
1882 dht_config: DHTConfig::default(),
1883 bootstrap_cache_config: None,
1884 diversity_config: None,
1885 max_message_size: None,
1886 node_identity: None,
1887 mode: NodeMode::default(),
1888 custom_user_agent: None,
1889 allow_loopback: true,
1890 adaptive_dht_config: AdaptiveDhtConfig::default(),
1891 }
1892 }
1893
1894 #[tokio::test]
1898 async fn test_node_config_default() {
1899 let config = NodeConfig::default();
1900
1901 assert_eq!(config.listen_addrs().len(), 2); assert_eq!(config.max_connections, 10000);
1903 assert_eq!(config.connection_timeout, Duration::from_secs(30));
1904 }
1905
1906 #[tokio::test]
1907 async fn test_dht_config_default() {
1908 let config = DHTConfig::default();
1909
1910 assert_eq!(config.k_value, 20);
1911 assert_eq!(config.alpha_value, 5);
1912 assert_eq!(config.refresh_interval, Duration::from_secs(600));
1913 }
1914
1915 #[test]
1916 fn test_connection_status_variants() {
1917 let connecting = ConnectionStatus::Connecting;
1918 let connected = ConnectionStatus::Connected;
1919 let disconnecting = ConnectionStatus::Disconnecting;
1920 let disconnected = ConnectionStatus::Disconnected;
1921 let failed = ConnectionStatus::Failed("test error".to_string());
1922
1923 assert_eq!(connecting, ConnectionStatus::Connecting);
1924 assert_eq!(connected, ConnectionStatus::Connected);
1925 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
1926 assert_eq!(disconnected, ConnectionStatus::Disconnected);
1927 assert_ne!(connecting, connected);
1928
1929 if let ConnectionStatus::Failed(msg) = failed {
1930 assert_eq!(msg, "test error");
1931 } else {
1932 panic!("Expected Failed status");
1933 }
1934 }
1935
1936 #[tokio::test]
1937 async fn test_node_creation() -> Result<()> {
1938 let config = create_test_node_config();
1939 let node = P2PNode::new(config).await?;
1940
1941 assert_eq!(node.peer_id().to_hex().len(), 64);
1943 assert!(!node.is_running());
1944 assert_eq!(node.peer_count().await, 0);
1945 assert!(node.connected_peers().await.is_empty());
1946
1947 Ok(())
1948 }
1949
1950 #[tokio::test]
1951 async fn test_node_lifecycle() -> Result<()> {
1952 let config = create_test_node_config();
1953 let node = P2PNode::new(config).await?;
1954
1955 assert!(!node.is_running());
1957
1958 node.start().await?;
1960 assert!(node.is_running());
1961
1962 let listen_addrs = node.listen_addrs().await;
1964 assert!(
1965 !listen_addrs.is_empty(),
1966 "Expected at least one listening address"
1967 );
1968
1969 node.stop().await?;
1971 assert!(!node.is_running());
1972
1973 Ok(())
1974 }
1975
1976 #[tokio::test]
1977 async fn test_peer_connection() -> Result<()> {
1978 let config1 = create_test_node_config();
1979 let config2 = create_test_node_config();
1980
1981 let node1 = P2PNode::new(config1).await?;
1982 let node2 = P2PNode::new(config2).await?;
1983
1984 node1.start().await?;
1985 node2.start().await?;
1986
1987 let node2_addr = node2
1988 .listen_addrs()
1989 .await
1990 .into_iter()
1991 .find(|a| a.is_ipv4())
1992 .ok_or_else(|| {
1993 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1994 "Node 2 did not expose an IPv4 listen address".into(),
1995 ))
1996 })?;
1997
1998 let channel_id = node1.connect_peer(&node2_addr).await?;
2001
2002 assert!(node1.is_connection_active(&channel_id).await);
2005
2006 let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
2008 assert!(peer_info.is_some());
2009 let info = peer_info.expect("Peer info should exist after connect");
2010 assert_eq!(info.channel_id, channel_id);
2011 assert_eq!(info.status, ConnectionStatus::Connected);
2012 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
2013
2014 node1.remove_channel(&channel_id).await;
2016 assert!(!node1.is_connection_active(&channel_id).await);
2017
2018 node1.stop().await?;
2019 node2.stop().await?;
2020
2021 Ok(())
2022 }
2023
2024 #[tokio::test]
2025 async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
2026 let config = create_test_node_config();
2027 let node = P2PNode::new(config).await?;
2028
2029 let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
2030 let result = node.connect_peer(&tcp_addr).await;
2031
2032 assert!(
2033 matches!(
2034 result,
2035 Err(P2PError::Network(
2036 crate::error::NetworkError::InvalidAddress(_)
2037 ))
2038 ),
2039 "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
2040 result
2041 );
2042
2043 Ok(())
2044 }
2045
2046 #[cfg_attr(target_os = "windows", ignore)]
2053 #[tokio::test]
2054 async fn test_event_subscription() -> Result<()> {
2055 let identity1 =
2059 Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
2060 let identity2 =
2061 Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
2062
2063 let mut config1 = create_test_node_config();
2064 config1.ipv6 = false;
2065 config1.node_identity = Some(identity1);
2066
2067 let node2_peer_id = *identity2.peer_id();
2068 let mut config2 = create_test_node_config();
2069 config2.ipv6 = false;
2070 config2.node_identity = Some(identity2);
2071
2072 let node1 = P2PNode::new(config1).await?;
2073 let node2 = P2PNode::new(config2).await?;
2074
2075 node1.start().await?;
2076 node2.start().await?;
2077
2078 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
2079
2080 let mut events = node2.subscribe_events();
2082
2083 let node2_addr = node2.local_addr().ok_or_else(|| {
2084 P2PError::Network(crate::error::NetworkError::ProtocolError(
2085 "No listening address".to_string().into(),
2086 ))
2087 })?;
2088
2089 let mut channel_id = None;
2091 for attempt in 0..3 {
2092 if attempt > 0 {
2093 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
2094 }
2095 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
2096 Ok(Ok(id)) => {
2097 channel_id = Some(id);
2098 break;
2099 }
2100 Ok(Err(_)) | Err(_) => continue,
2101 }
2102 }
2103 let channel_id = channel_id.expect("Failed to connect after 3 attempts");
2104
2105 let target_peer_id = node1
2107 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2108 .await?;
2109 assert_eq!(target_peer_id, node2_peer_id);
2110
2111 node1
2113 .send_message(&target_peer_id, "test-topic", b"hello".to_vec(), &[])
2114 .await?;
2115
2116 let event = timeout(Duration::from_secs(2), async {
2118 loop {
2119 match events.recv().await {
2120 Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
2121 Ok(P2PEvent::Message { .. }) => continue, Ok(_) => continue,
2123 Err(e) => return Err(e),
2124 }
2125 }
2126 })
2127 .await;
2128 assert!(event.is_ok(), "Should receive PeerConnected event");
2129 let connected_peer_id = event.expect("Timed out").expect("Channel error");
2130 assert!(
2132 connected_peer_id.0.iter().any(|&b| b != 0),
2133 "PeerConnected should carry a non-zero peer ID"
2134 );
2135
2136 node1.stop().await?;
2137 node2.stop().await?;
2138
2139 Ok(())
2140 }
2141
2142 #[cfg_attr(target_os = "windows", ignore)]
2144 #[tokio::test]
2145 async fn test_message_sending() -> Result<()> {
2146 let mut config1 = create_test_node_config();
2148 config1.ipv6 = false;
2149 let node1 = P2PNode::new(config1).await?;
2150 node1.start().await?;
2151
2152 let mut config2 = create_test_node_config();
2153 config2.ipv6 = false;
2154 let node2 = P2PNode::new(config2).await?;
2155 node2.start().await?;
2156
2157 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2159
2160 let node2_addr = node2.local_addr().ok_or_else(|| {
2162 P2PError::Network(crate::error::NetworkError::ProtocolError(
2163 "No listening address".to_string().into(),
2164 ))
2165 })?;
2166
2167 let channel_id =
2169 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
2170 Ok(res) => res?,
2171 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2172 };
2173
2174 let target_peer_id = node1
2176 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
2177 .await?;
2178 assert_eq!(target_peer_id, node2.peer_id().clone());
2179
2180 let message_data = b"Hello, peer!".to_vec();
2182 let result = match timeout(
2183 Duration::from_millis(500),
2184 node1.send_message(&target_peer_id, "test-protocol", message_data, &[]),
2185 )
2186 .await
2187 {
2188 Ok(res) => res,
2189 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
2190 };
2191 if let Err(e) = &result {
2194 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
2195 }
2196
2197 let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
2199 let result = node1
2200 .send_message(&non_existent_peer, "test-protocol", vec![], &[])
2201 .await;
2202 assert!(result.is_err(), "Sending to non-existent peer should fail");
2203
2204 node1.stop().await?;
2205 node2.stop().await?;
2206
2207 Ok(())
2208 }
2209
2210 #[tokio::test]
2211 async fn test_remote_mcp_operations() -> Result<()> {
2212 let config = create_test_node_config();
2213 let node = P2PNode::new(config).await?;
2214
2215 node.start().await?;
2217 node.stop().await?;
2218 Ok(())
2219 }
2220
2221 #[tokio::test]
2222 async fn test_health_check() -> Result<()> {
2223 let config = create_test_node_config();
2224 let node = P2PNode::new(config).await?;
2225
2226 let result = node.health_check().await;
2228 assert!(result.is_ok());
2229
2230 Ok(())
2235 }
2236
2237 #[tokio::test]
2238 async fn test_node_uptime() -> Result<()> {
2239 let config = create_test_node_config();
2240 let node = P2PNode::new(config).await?;
2241
2242 let uptime1 = node.uptime();
2243 assert!(uptime1 >= Duration::from_secs(0));
2244
2245 tokio::time::sleep(Duration::from_millis(10)).await;
2247
2248 let uptime2 = node.uptime();
2249 assert!(uptime2 > uptime1);
2250
2251 Ok(())
2252 }
2253
2254 #[tokio::test]
2255 async fn test_node_config_access() -> Result<()> {
2256 let config = create_test_node_config();
2257 let node = P2PNode::new(config).await?;
2258
2259 let node_config = node.config();
2260 assert_eq!(node_config.max_connections, 100);
2261 Ok(())
2264 }
2265
2266 #[tokio::test]
2267 async fn test_mcp_server_access() -> Result<()> {
2268 let config = create_test_node_config();
2269 let _node = P2PNode::new(config).await?;
2270
2271 Ok(())
2273 }
2274
2275 #[tokio::test]
2276 async fn test_dht_access() -> Result<()> {
2277 let config = create_test_node_config();
2278 let node = P2PNode::new(config).await?;
2279
2280 let _dht = node.dht();
2282
2283 Ok(())
2284 }
2285
2286 #[tokio::test]
2287 async fn test_node_config_builder() -> Result<()> {
2288 let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2289
2290 let config = NodeConfig::builder()
2291 .local(true)
2292 .ipv6(true)
2293 .bootstrap_peer(bootstrap)
2294 .connection_timeout(Duration::from_secs(15))
2295 .max_connections(200)
2296 .max_message_size(TEST_MAX_MESSAGE_SIZE)
2297 .build()?;
2298
2299 assert_eq!(config.listen_addrs().len(), 2); assert!(config.local);
2301 assert!(config.ipv6);
2302 assert_eq!(config.bootstrap_peers.len(), 1);
2303 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2304 assert_eq!(config.max_connections, 200);
2305 assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2306 assert!(config.allow_loopback); Ok(())
2309 }
2310
2311 #[tokio::test]
2312 async fn test_bootstrap_peers() -> Result<()> {
2313 let mut config = create_test_node_config();
2314 config.bootstrap_peers = vec![
2315 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2316 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2317 ];
2318
2319 let node = P2PNode::new(config).await?;
2320
2321 node.start().await?;
2323
2324 let _peer_count = node.peer_count().await;
2328
2329 node.stop().await?;
2330 Ok(())
2331 }
2332
2333 #[tokio::test]
2334 async fn test_peer_info_structure() {
2335 let peer_info = PeerInfo {
2336 channel_id: "test_peer".to_string(),
2337 addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2338 connected_at: Instant::now(),
2339 last_seen: Instant::now(),
2340 status: ConnectionStatus::Connected,
2341 protocols: vec!["test-protocol".to_string()],
2342 heartbeat_count: 0,
2343 };
2344
2345 assert_eq!(peer_info.channel_id, "test_peer");
2346 assert_eq!(peer_info.addresses.len(), 1);
2347 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2348 assert_eq!(peer_info.protocols.len(), 1);
2349 }
2350
2351 #[tokio::test]
2352 async fn test_serialization() -> Result<()> {
2353 let config = create_test_node_config();
2355 let serialized = serde_json::to_string(&config)?;
2356 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2357
2358 assert_eq!(config.local, deserialized.local);
2359 assert_eq!(config.port, deserialized.port);
2360 assert_eq!(config.ipv6, deserialized.ipv6);
2361 assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2362
2363 Ok(())
2364 }
2365
2366 #[tokio::test]
2367 async fn test_get_channel_id_by_address_found() -> Result<()> {
2368 let config = create_test_node_config();
2369 let node = P2PNode::new(config).await?;
2370
2371 let test_channel_id = "peer_test_123".to_string();
2373 let test_address = "192.168.1.100:9000";
2374 let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2375
2376 let peer_info = PeerInfo {
2377 channel_id: test_channel_id.clone(),
2378 addresses: vec![test_multiaddr],
2379 connected_at: Instant::now(),
2380 last_seen: Instant::now(),
2381 status: ConnectionStatus::Connected,
2382 protocols: vec!["test-protocol".to_string()],
2383 heartbeat_count: 0,
2384 };
2385
2386 node.transport
2387 .inject_peer(test_channel_id.clone(), peer_info)
2388 .await;
2389
2390 let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2392 let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2393 assert_eq!(found_channel_id, Some(test_channel_id));
2394
2395 Ok(())
2396 }
2397
2398 #[tokio::test]
2399 async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2400 let config = create_test_node_config();
2401 let node = P2PNode::new(config).await?;
2402
2403 let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
2405 let result = node.get_channel_id_by_address(&unknown_addr).await;
2406 assert_eq!(result, None);
2407
2408 Ok(())
2409 }
2410
2411 #[tokio::test]
2412 async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
2413 let config = create_test_node_config();
2414 let node = P2PNode::new(config).await?;
2415
2416 let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
2418 mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
2419 psm: 0x0025,
2420 });
2421 let result = node.get_channel_id_by_address(&ble_addr).await;
2422 assert_eq!(result, None);
2423
2424 Ok(())
2425 }
2426
2427 #[tokio::test]
2428 async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
2429 let config = create_test_node_config();
2430 let node = P2PNode::new(config).await?;
2431
2432 let peer1_id = "peer_1".to_string();
2434 let peer1_addr_str = "192.168.1.101:9001";
2435 let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
2436
2437 let peer2_id = "peer_2".to_string();
2438 let peer2_addr_str = "192.168.1.102:9002";
2439 let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
2440
2441 let peer1_info = PeerInfo {
2442 channel_id: peer1_id.clone(),
2443 addresses: vec![peer1_multiaddr],
2444 connected_at: Instant::now(),
2445 last_seen: Instant::now(),
2446 status: ConnectionStatus::Connected,
2447 protocols: vec!["test-protocol".to_string()],
2448 heartbeat_count: 0,
2449 };
2450
2451 let peer2_info = PeerInfo {
2452 channel_id: peer2_id.clone(),
2453 addresses: vec![peer2_multiaddr],
2454 connected_at: Instant::now(),
2455 last_seen: Instant::now(),
2456 status: ConnectionStatus::Connected,
2457 protocols: vec!["test-protocol".to_string()],
2458 heartbeat_count: 0,
2459 };
2460
2461 node.transport
2462 .inject_peer(peer1_id.clone(), peer1_info)
2463 .await;
2464 node.transport
2465 .inject_peer(peer2_id.clone(), peer2_info)
2466 .await;
2467
2468 let found_peer1 = node
2470 .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
2471 .await;
2472 let found_peer2 = node
2473 .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
2474 .await;
2475
2476 assert_eq!(found_peer1, Some(peer1_id));
2477 assert_eq!(found_peer2, Some(peer2_id));
2478
2479 Ok(())
2480 }
2481
2482 #[tokio::test]
2483 async fn test_list_active_connections_empty() -> Result<()> {
2484 let config = create_test_node_config();
2485 let node = P2PNode::new(config).await?;
2486
2487 let connections = node.list_active_connections().await;
2489 assert!(connections.is_empty());
2490
2491 Ok(())
2492 }
2493
2494 #[tokio::test]
2495 async fn test_list_active_connections_with_peers() -> Result<()> {
2496 let config = create_test_node_config();
2497 let node = P2PNode::new(config).await?;
2498
2499 let peer1_id = "peer_1".to_string();
2501 let peer1_addrs = vec![
2502 MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
2503 MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
2504 ];
2505
2506 let peer2_id = "peer_2".to_string();
2507 let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
2508
2509 let peer1_info = PeerInfo {
2510 channel_id: peer1_id.clone(),
2511 addresses: peer1_addrs.clone(),
2512 connected_at: Instant::now(),
2513 last_seen: Instant::now(),
2514 status: ConnectionStatus::Connected,
2515 protocols: vec!["test-protocol".to_string()],
2516 heartbeat_count: 0,
2517 };
2518
2519 let peer2_info = PeerInfo {
2520 channel_id: peer2_id.clone(),
2521 addresses: peer2_addrs.clone(),
2522 connected_at: Instant::now(),
2523 last_seen: Instant::now(),
2524 status: ConnectionStatus::Connected,
2525 protocols: vec!["test-protocol".to_string()],
2526 heartbeat_count: 0,
2527 };
2528
2529 node.transport
2530 .inject_peer(peer1_id.clone(), peer1_info)
2531 .await;
2532 node.transport
2533 .inject_peer(peer2_id.clone(), peer2_info)
2534 .await;
2535
2536 node.transport
2538 .inject_active_connection(peer1_id.clone())
2539 .await;
2540 node.transport
2541 .inject_active_connection(peer2_id.clone())
2542 .await;
2543
2544 let connections = node.list_active_connections().await;
2546 assert_eq!(connections.len(), 2);
2547
2548 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
2550 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
2551
2552 assert!(peer1_conn.is_some());
2553 assert!(peer2_conn.is_some());
2554
2555 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
2557 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
2558
2559 Ok(())
2560 }
2561
2562 #[tokio::test]
2563 async fn test_remove_channel_success() -> Result<()> {
2564 let config = create_test_node_config();
2565 let node = P2PNode::new(config).await?;
2566
2567 let channel_id = "peer_to_remove".to_string();
2569 let channel_peer_id = PeerId::from_name(&channel_id);
2570 let peer_info = PeerInfo {
2571 channel_id: channel_id.clone(),
2572 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
2573 connected_at: Instant::now(),
2574 last_seen: Instant::now(),
2575 status: ConnectionStatus::Connected,
2576 protocols: vec!["test-protocol".to_string()],
2577 heartbeat_count: 0,
2578 };
2579
2580 node.transport
2581 .inject_peer(channel_id.clone(), peer_info)
2582 .await;
2583 node.transport
2584 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
2585 .await;
2586
2587 assert!(node.is_peer_connected(&channel_peer_id).await);
2589
2590 let removed = node.remove_channel(&channel_id).await;
2592 assert!(removed);
2593
2594 assert!(!node.is_peer_connected(&channel_peer_id).await);
2596
2597 Ok(())
2598 }
2599
2600 #[tokio::test]
2601 async fn test_remove_channel_nonexistent() -> Result<()> {
2602 let config = create_test_node_config();
2603 let node = P2PNode::new(config).await?;
2604
2605 let removed = node.remove_channel("nonexistent_peer").await;
2607 assert!(!removed);
2608
2609 Ok(())
2610 }
2611
2612 #[tokio::test]
2613 async fn test_is_peer_connected() -> Result<()> {
2614 let config = create_test_node_config();
2615 let node = P2PNode::new(config).await?;
2616
2617 let channel_id = "test_peer".to_string();
2618 let channel_peer_id = PeerId::from_name(&channel_id);
2619
2620 assert!(!node.is_peer_connected(&channel_peer_id).await);
2622
2623 let peer_info = PeerInfo {
2625 channel_id: channel_id.clone(),
2626 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
2627 connected_at: Instant::now(),
2628 last_seen: Instant::now(),
2629 status: ConnectionStatus::Connected,
2630 protocols: vec!["test-protocol".to_string()],
2631 heartbeat_count: 0,
2632 };
2633
2634 node.transport
2635 .inject_peer(channel_id.clone(), peer_info)
2636 .await;
2637 node.transport
2638 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
2639 .await;
2640
2641 assert!(node.is_peer_connected(&channel_peer_id).await);
2643
2644 node.remove_channel(&channel_id).await;
2646
2647 assert!(!node.is_peer_connected(&channel_peer_id).await);
2649
2650 Ok(())
2651 }
2652
2653 #[test]
2654 fn test_normalize_ipv6_wildcard() {
2655 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
2656
2657 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
2658 let normalized = normalize_wildcard_to_loopback(wildcard);
2659
2660 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
2661 assert_eq!(normalized.port(), 8080);
2662 }
2663
2664 #[test]
2665 fn test_normalize_ipv4_wildcard() {
2666 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2667
2668 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
2669 let normalized = normalize_wildcard_to_loopback(wildcard);
2670
2671 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
2672 assert_eq!(normalized.port(), 9000);
2673 }
2674
2675 #[test]
2676 fn test_normalize_specific_address_unchanged() {
2677 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
2678 let normalized = normalize_wildcard_to_loopback(specific);
2679
2680 assert_eq!(normalized, specific);
2681 }
2682
2683 #[test]
2684 fn test_normalize_loopback_unchanged() {
2685 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
2686
2687 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
2688 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
2689 assert_eq!(normalized_v6, loopback_v6);
2690
2691 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
2692 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
2693 assert_eq!(normalized_v4, loopback_v4);
2694 }
2695
2696 fn current_timestamp() -> u64 {
2700 std::time::SystemTime::now()
2701 .duration_since(std::time::UNIX_EPOCH)
2702 .map(|d| d.as_secs())
2703 .unwrap_or(0)
2704 }
2705
2706 fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
2708 let msg = WireMessage {
2709 protocol: protocol.to_string(),
2710 data,
2711 from: PeerId::from_name(from),
2712 timestamp,
2713 user_agent: String::new(),
2714 public_key: Vec::new(),
2715 signature: Vec::new(),
2716 };
2717 postcard::to_stdvec(&msg).unwrap()
2718 }
2719
2720 #[test]
2721 fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
2722 let transport_id = "abcdef0123456789";
2725 let logical_id = "spoofed-logical-id";
2726 let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
2727
2728 let parsed =
2729 parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
2730
2731 assert!(parsed.authenticated_node_id.is_none());
2733
2734 match parsed.event {
2735 P2PEvent::Message {
2736 topic,
2737 source,
2738 data,
2739 } => {
2740 assert!(source.is_none(), "unsigned message source must be None");
2741 assert_eq!(topic, "test/v1");
2742 assert_eq!(data, vec![1u8, 2, 3]);
2743 }
2744 other => panic!("expected P2PEvent::Message, got {:?}", other),
2745 }
2746 }
2747
2748 #[test]
2749 fn test_parse_protocol_message_rejects_invalid_bytes() {
2750 assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
2752 }
2753
2754 #[test]
2755 fn test_parse_protocol_message_rejects_truncated_message() {
2756 let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
2758 let truncated = &full_bytes[..full_bytes.len() / 2];
2759 assert!(parse_protocol_message(truncated, "peer-id").is_none());
2760 }
2761
2762 #[test]
2763 fn test_parse_protocol_message_empty_payload() {
2764 let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
2765
2766 let parsed = parse_protocol_message(&bytes, "transport-peer")
2767 .expect("valid message with empty data should parse");
2768
2769 match parsed.event {
2770 P2PEvent::Message { data, .. } => assert!(data.is_empty()),
2771 other => panic!("expected P2PEvent::Message, got {:?}", other),
2772 }
2773 }
2774
2775 #[test]
2776 fn test_parse_protocol_message_preserves_binary_payload() {
2777 let payload: Vec<u8> = (0..=255).collect();
2779 let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
2780
2781 let parsed = parse_protocol_message(&bytes, "peer-id")
2782 .expect("valid message with full byte range should parse");
2783
2784 match parsed.event {
2785 P2PEvent::Message { data, topic, .. } => {
2786 assert_eq!(topic, "binary/v1");
2787 assert_eq!(
2788 data, payload,
2789 "payload must survive bincode round-trip exactly"
2790 );
2791 }
2792 other => panic!("expected P2PEvent::Message, got {:?}", other),
2793 }
2794 }
2795
2796 #[test]
2797 fn test_parse_signed_message_verifies_and_uses_node_id() {
2798 let identity = NodeIdentity::generate().expect("should generate identity");
2799 let protocol = "test/signed";
2800 let data: Vec<u8> = vec![10, 20, 30];
2801 let from = *identity.peer_id();
2803 let timestamp = current_timestamp();
2804 let user_agent = "test/1.0";
2805
2806 let signable =
2808 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
2809 .unwrap();
2810 let sig = identity.sign(&signable).expect("signing should succeed");
2811
2812 let msg = WireMessage {
2813 protocol: protocol.to_string(),
2814 data: data.clone(),
2815 from,
2816 timestamp,
2817 user_agent: user_agent.to_string(),
2818 public_key: identity.public_key().as_bytes().to_vec(),
2819 signature: sig.as_bytes().to_vec(),
2820 };
2821 let bytes = postcard::to_stdvec(&msg).unwrap();
2822
2823 let parsed =
2824 parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
2825
2826 let expected_peer_id = *identity.peer_id();
2827 assert_eq!(
2828 parsed.authenticated_node_id.as_ref(),
2829 Some(&expected_peer_id)
2830 );
2831
2832 match parsed.event {
2833 P2PEvent::Message { source, .. } => {
2834 assert_eq!(
2835 source.as_ref(),
2836 Some(&expected_peer_id),
2837 "source should be the verified PeerId"
2838 );
2839 }
2840 other => panic!("expected P2PEvent::Message, got {:?}", other),
2841 }
2842 }
2843
2844 #[test]
2845 fn test_parse_message_with_bad_signature_is_rejected() {
2846 let identity = NodeIdentity::generate().expect("should generate identity");
2847 let protocol = "test/bad-sig";
2848 let data: Vec<u8> = vec![1, 2, 3];
2849 let from = *identity.peer_id();
2850 let timestamp = current_timestamp();
2851 let user_agent = "test/1.0";
2852
2853 let signable =
2855 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
2856 .unwrap();
2857 let sig = identity.sign(&signable).expect("signing should succeed");
2858
2859 let msg = WireMessage {
2861 protocol: protocol.to_string(),
2862 data: vec![99, 99, 99],
2863 from,
2864 timestamp,
2865 user_agent: user_agent.to_string(),
2866 public_key: identity.public_key().as_bytes().to_vec(),
2867 signature: sig.as_bytes().to_vec(),
2868 };
2869 let bytes = postcard::to_stdvec(&msg).unwrap();
2870
2871 assert!(
2872 parse_protocol_message(&bytes, "transport-xyz").is_none(),
2873 "message with bad signature should be rejected"
2874 );
2875 }
2876
2877 #[test]
2878 fn test_parse_message_with_mismatched_from_is_rejected() {
2879 let identity = NodeIdentity::generate().expect("should generate identity");
2880 let protocol = "test/from-mismatch";
2881 let data: Vec<u8> = vec![1, 2, 3];
2882 let fake_from = PeerId::from_bytes([0xDE; 32]);
2884 let timestamp = current_timestamp();
2885 let user_agent = "test/1.0";
2886
2887 let signable =
2888 postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
2889 .unwrap();
2890 let sig = identity.sign(&signable).expect("signing should succeed");
2891
2892 let msg = WireMessage {
2893 protocol: protocol.to_string(),
2894 data,
2895 from: fake_from,
2896 timestamp,
2897 user_agent: user_agent.to_string(),
2898 public_key: identity.public_key().as_bytes().to_vec(),
2899 signature: sig.as_bytes().to_vec(),
2900 };
2901 let bytes = postcard::to_stdvec(&msg).unwrap();
2902
2903 assert!(
2904 parse_protocol_message(&bytes, "transport-xyz").is_none(),
2905 "message with mismatched from field should be rejected"
2906 );
2907 }
2908}