1use crate::PeerId;
20use crate::adaptive::{AdaptiveDHT, AdaptiveDhtConfig, TrustEngine, TrustEvent};
21use crate::bootstrap::{BootstrapConfig, BootstrapManager};
22use crate::dht_network_manager::{DhtNetworkConfig, DhtNetworkManager};
23use crate::error::{NetworkError, P2PError, P2pResult as Result};
24
25use crate::MultiAddr;
26use crate::identity::node_identity::{NodeIdentity, peer_id_from_public_key};
27use crate::quantum_crypto::saorsa_transport_integration::{MlDsaPublicKey, MlDsaSignature};
28use serde::{Deserialize, Serialize};
29use std::collections::HashMap;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use std::time::Duration;
33use tokio::sync::{RwLock, broadcast};
34use tokio::time::Instant;
35use tokio_util::sync::CancellationToken;
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
41pub(crate) struct WireMessage {
42 pub(crate) protocol: String,
44 pub(crate) data: Vec<u8>,
46 pub(crate) from: PeerId,
48 pub(crate) timestamp: u64,
50 #[serde(default)]
56 pub(crate) user_agent: String,
57 #[serde(default)]
59 pub(crate) public_key: Vec<u8>,
60 #[serde(default)]
62 pub(crate) signature: Vec<u8>,
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
71pub enum NodeMode {
72 #[default]
74 Node,
75 Client,
77}
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81enum ListenMode {
82 Public,
84 Local,
86}
87
88pub fn user_agent_for_mode(mode: NodeMode) -> String {
93 let prefix = match mode {
94 NodeMode::Node => "node",
95 NodeMode::Client => "client",
96 };
97 format!("{prefix}/{}", env!("CARGO_PKG_VERSION"))
98}
99
100pub fn is_dht_participant(user_agent: &str) -> bool {
102 user_agent.starts_with("node/")
103}
104
105pub(crate) const MESSAGE_RECV_CHANNEL_CAPACITY: usize = 256;
107
108pub(crate) const MAX_ACTIVE_REQUESTS: usize = 256;
110
111pub(crate) const MAX_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);
113
114const DEFAULT_LISTEN_PORT: u16 = 9000;
116
117const DEFAULT_MAX_CONNECTIONS: usize = 10_000;
119
120const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;
122
123const DHT_MAX_DISTANCE: u8 = 160;
125
126const BOOTSTRAP_PEER_BATCH_SIZE: usize = 20;
128
129const BOOTSTRAP_IDENTITY_TIMEOUT_SECS: u64 = 10;
131
132const fn default_true() -> bool {
134 true
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct NodeConfig {
140 #[serde(default)]
146 pub local: bool,
147
148 #[serde(default)]
150 pub port: u16,
151
152 #[serde(default = "default_true")]
157 pub ipv6: bool,
158
159 pub bootstrap_peers: Vec<crate::MultiAddr>,
161
162 pub connection_timeout: Duration,
165
166 pub max_connections: usize,
168
169 pub dht_config: DHTConfig,
171
172 pub bootstrap_cache_config: Option<BootstrapConfig>,
174
175 pub diversity_config: Option<crate::security::IPDiversityConfig>,
180
181 #[serde(default)]
185 pub max_message_size: Option<usize>,
186
187 #[serde(skip)]
192 pub node_identity: Option<Arc<NodeIdentity>>,
193
194 #[serde(default)]
200 pub mode: NodeMode,
201
202 #[serde(default, skip_serializing_if = "Option::is_none")]
207 pub custom_user_agent: Option<String>,
208
209 #[serde(default)]
217 pub allow_loopback: bool,
218
219 #[serde(default)]
227 pub adaptive_dht_config: AdaptiveDhtConfig,
228}
229
230#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct DHTConfig {
233 pub k_value: usize,
235
236 pub alpha_value: usize,
238
239 pub refresh_interval: Duration,
241}
242
243#[inline]
256fn build_listen_addrs(port: u16, ipv6_enabled: bool, mode: ListenMode) -> Vec<MultiAddr> {
257 let mut addrs = Vec::with_capacity(if ipv6_enabled { 2 } else { 1 });
258
259 let (v4, v6) = match mode {
260 ListenMode::Public => (
261 std::net::Ipv4Addr::UNSPECIFIED,
262 std::net::Ipv6Addr::UNSPECIFIED,
263 ),
264 ListenMode::Local => (std::net::Ipv4Addr::LOCALHOST, std::net::Ipv6Addr::LOCALHOST),
265 };
266
267 if ipv6_enabled {
268 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
269 std::net::IpAddr::V6(v6),
270 port,
271 )));
272 }
273
274 addrs.push(MultiAddr::quic(std::net::SocketAddr::new(
275 std::net::IpAddr::V4(v4),
276 port,
277 )));
278
279 addrs
280}
281
282impl NodeConfig {
283 pub fn user_agent(&self) -> String {
288 self.custom_user_agent
289 .clone()
290 .unwrap_or_else(|| user_agent_for_mode(self.mode))
291 }
292
293 pub fn listen_addrs(&self) -> Vec<MultiAddr> {
298 let mode = if self.local {
299 ListenMode::Local
300 } else {
301 ListenMode::Public
302 };
303 build_listen_addrs(self.port, self.ipv6, mode)
304 }
305
306 pub fn new() -> Result<Self> {
312 Ok(Self::default())
313 }
314
315 pub fn builder() -> NodeConfigBuilder {
317 NodeConfigBuilder::default()
318 }
319}
320
321#[derive(Debug, Clone)]
344pub struct NodeConfigBuilder {
345 port: u16,
346 ipv6: bool,
347 local: bool,
348 bootstrap_peers: Vec<crate::MultiAddr>,
349 max_connections: Option<usize>,
350 connection_timeout: Option<Duration>,
351 dht_config: Option<DHTConfig>,
352 max_message_size: Option<usize>,
353 mode: NodeMode,
354 custom_user_agent: Option<String>,
355 allow_loopback: Option<bool>,
356 adaptive_dht_config: Option<AdaptiveDhtConfig>,
357}
358
359impl Default for NodeConfigBuilder {
360 fn default() -> Self {
361 Self {
362 port: 0,
363 ipv6: true,
364 local: false,
365 bootstrap_peers: Vec::new(),
366 max_connections: None,
367 connection_timeout: None,
368 dht_config: None,
369 max_message_size: None,
370 mode: NodeMode::default(),
371 custom_user_agent: None,
372 allow_loopback: None,
373 adaptive_dht_config: None,
374 }
375 }
376}
377
378impl NodeConfigBuilder {
379 pub fn port(mut self, port: u16) -> Self {
381 self.port = port;
382 self
383 }
384
385 pub fn ipv6(mut self, enabled: bool) -> Self {
387 self.ipv6 = enabled;
388 self
389 }
390
391 pub fn local(mut self, local: bool) -> Self {
398 self.local = local;
399 self
400 }
401
402 pub fn bootstrap_peer(mut self, addr: crate::MultiAddr) -> Self {
404 self.bootstrap_peers.push(addr);
405 self
406 }
407
408 pub fn max_connections(mut self, max: usize) -> Self {
410 self.max_connections = Some(max);
411 self
412 }
413
414 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
416 self.connection_timeout = Some(timeout);
417 self
418 }
419
420 pub fn dht_config(mut self, config: DHTConfig) -> Self {
422 self.dht_config = Some(config);
423 self
424 }
425
426 pub fn max_message_size(mut self, max_message_size: usize) -> Self {
430 self.max_message_size = Some(max_message_size);
431 self
432 }
433
434 pub fn mode(mut self, mode: NodeMode) -> Self {
436 self.mode = mode;
437 self
438 }
439
440 pub fn custom_user_agent(mut self, user_agent: impl Into<String>) -> Self {
442 self.custom_user_agent = Some(user_agent.into());
443 self
444 }
445
446 pub fn allow_loopback(mut self, allow: bool) -> Self {
450 self.allow_loopback = Some(allow);
451 self
452 }
453
454 pub fn trust_enforcement(mut self, enabled: bool) -> Self {
466 let threshold = if enabled {
467 AdaptiveDhtConfig::default().block_threshold
468 } else {
469 0.0
470 };
471 self.adaptive_dht_config = Some(AdaptiveDhtConfig {
472 block_threshold: threshold,
473 });
474 self
475 }
476
477 pub fn adaptive_dht_config(mut self, config: AdaptiveDhtConfig) -> Self {
481 self.adaptive_dht_config = Some(config);
482 self
483 }
484
485 pub fn build(self) -> Result<NodeConfig> {
491 let allow_loopback = self.allow_loopback.unwrap_or(self.local);
493
494 Ok(NodeConfig {
495 local: self.local,
496 port: self.port,
497 ipv6: self.ipv6,
498 bootstrap_peers: self.bootstrap_peers,
499 connection_timeout: self
500 .connection_timeout
501 .unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)),
502 max_connections: self.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS),
503 dht_config: self.dht_config.unwrap_or_default(),
504 bootstrap_cache_config: None,
505 diversity_config: None,
506 max_message_size: self.max_message_size,
507 node_identity: None,
508 mode: self.mode,
509 custom_user_agent: self.custom_user_agent,
510 allow_loopback,
511 adaptive_dht_config: self.adaptive_dht_config.unwrap_or_default(),
512 })
513 }
514}
515
516impl Default for NodeConfig {
517 fn default() -> Self {
518 Self {
519 local: false,
520 port: DEFAULT_LISTEN_PORT,
521 ipv6: true,
522 bootstrap_peers: Vec::new(),
523 connection_timeout: Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS),
524 max_connections: DEFAULT_MAX_CONNECTIONS,
525 dht_config: DHTConfig::default(),
526 bootstrap_cache_config: None,
527 diversity_config: None,
528 max_message_size: None,
529 node_identity: None,
530 mode: NodeMode::default(),
531 custom_user_agent: None,
532 allow_loopback: false,
533 adaptive_dht_config: AdaptiveDhtConfig::default(),
534 }
535 }
536}
537
538impl DHTConfig {
539 const DEFAULT_K_VALUE: usize = 20;
540 const DEFAULT_ALPHA_VALUE: usize = 5;
541 const DEFAULT_REFRESH_INTERVAL_SECS: u64 = 600;
542}
543
544impl Default for DHTConfig {
545 fn default() -> Self {
546 Self {
547 k_value: Self::DEFAULT_K_VALUE,
548 alpha_value: Self::DEFAULT_ALPHA_VALUE,
549 refresh_interval: Duration::from_secs(Self::DEFAULT_REFRESH_INTERVAL_SECS),
550 }
551 }
552}
553
554#[derive(Debug, Clone)]
556pub struct PeerInfo {
557 #[allow(dead_code)]
559 pub(crate) channel_id: String,
560
561 pub addresses: Vec<MultiAddr>,
563
564 pub connected_at: Instant,
566
567 pub last_seen: Instant,
569
570 pub status: ConnectionStatus,
572
573 pub protocols: Vec<String>,
575
576 pub heartbeat_count: u64,
578}
579
580#[derive(Debug, Clone, PartialEq)]
582pub enum ConnectionStatus {
583 Connecting,
585 Connected,
587 Disconnecting,
589 Disconnected,
591 Failed(String),
593}
594
595#[derive(Debug, Clone)]
600pub enum P2PEvent {
601 Message {
603 topic: String,
605 source: Option<PeerId>,
608 data: Vec<u8>,
610 },
611 PeerConnected(PeerId, String),
614 PeerDisconnected(PeerId),
616}
617
618#[derive(Debug, Clone)]
623pub struct PeerResponse {
624 pub peer_id: PeerId,
626 pub data: Vec<u8>,
628 pub latency: Duration,
630}
631
632#[derive(Debug, Clone, Serialize, Deserialize)]
637pub(crate) struct RequestResponseEnvelope {
638 pub(crate) message_id: String,
640 pub(crate) is_response: bool,
642 pub(crate) payload: Vec<u8>,
644}
645
646pub(crate) struct PendingRequest {
648 pub(crate) response_tx: tokio::sync::oneshot::Sender<Vec<u8>>,
650 pub(crate) expected_peer: PeerId,
652}
653
654pub struct P2PNode {
665 config: NodeConfig,
667
668 peer_id: PeerId,
670
671 transport: Arc<crate::transport_handle::TransportHandle>,
673
674 start_time: Instant,
676
677 shutdown: CancellationToken,
679
680 adaptive_dht: AdaptiveDHT,
683
684 bootstrap_manager: Option<Arc<RwLock<BootstrapManager>>>,
686
687 is_bootstrapped: Arc<AtomicBool>,
689
690 is_started: Arc<AtomicBool>,
692}
693
694pub(crate) fn normalize_wildcard_to_loopback(addr: std::net::SocketAddr) -> std::net::SocketAddr {
710 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
711
712 if addr.ip().is_unspecified() {
713 let loopback_ip = match addr {
715 std::net::SocketAddr::V6(_) => IpAddr::V6(Ipv6Addr::LOCALHOST), std::net::SocketAddr::V4(_) => IpAddr::V4(Ipv4Addr::LOCALHOST), };
718 std::net::SocketAddr::new(loopback_ip, addr.port())
719 } else {
720 addr
722 }
723}
724
725impl P2PNode {
726 pub async fn new(config: NodeConfig) -> Result<Self> {
728 let node_identity = match config.node_identity.clone() {
730 Some(identity) => identity,
731 None => Arc::new(NodeIdentity::generate()?),
732 };
733
734 let peer_id = *node_identity.peer_id();
736
737 let bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
739 let bootstrap_manager =
740 match BootstrapManager::with_node_config(bootstrap_config, &config).await {
741 Ok(manager) => Some(Arc::new(RwLock::new(manager))),
742 Err(_e) => {
743 warn!("Failed to initialize bootstrap manager: {_e}, continuing without cache");
744 None
745 }
746 };
747
748 let transport_config = crate::transport_handle::TransportConfig::from_node_config(
750 &config,
751 crate::DEFAULT_EVENT_CHANNEL_CAPACITY,
752 node_identity.clone(),
753 );
754 let transport =
755 Arc::new(crate::transport_handle::TransportHandle::new(transport_config).await?);
756
757 let manager_dht_config = crate::dht::DHTConfig {
759 bucket_size: config.dht_config.k_value,
760 alpha: config.dht_config.alpha_value,
761 bucket_refresh_interval: config.dht_config.refresh_interval,
762 max_distance: DHT_MAX_DISTANCE,
763 };
764 let dht_manager_config = DhtNetworkConfig {
765 peer_id,
766 dht_config: manager_dht_config,
767 node_config: config.clone(),
768 request_timeout: config.connection_timeout,
769 max_concurrent_operations: MAX_ACTIVE_REQUESTS,
770 enable_security: true,
771 block_threshold: 0.0, };
773 let adaptive_dht = AdaptiveDHT::new(
774 transport.clone(),
775 dht_manager_config,
776 config.adaptive_dht_config.clone(),
777 )
778 .await?;
779
780 let node = Self {
781 config,
782 peer_id,
783 transport,
784 start_time: Instant::now(),
785 shutdown: CancellationToken::new(),
786 adaptive_dht,
787 bootstrap_manager,
788 is_bootstrapped: Arc::new(AtomicBool::new(false)),
789 is_started: Arc::new(AtomicBool::new(false)),
790 };
791 info!(
792 "Created P2P node with peer ID: {} (call start() to begin networking)",
793 node.peer_id
794 );
795
796 Ok(node)
797 }
798
799 pub fn peer_id(&self) -> &PeerId {
801 &self.peer_id
802 }
803
804 pub fn transport(&self) -> &Arc<crate::transport_handle::TransportHandle> {
806 &self.transport
807 }
808
809 pub fn local_addr(&self) -> Option<MultiAddr> {
810 self.transport.local_addr()
811 }
812
813 pub fn is_bootstrapped(&self) -> bool {
818 self.is_bootstrapped.load(Ordering::SeqCst)
819 }
820
821 pub async fn re_bootstrap(&self) -> Result<()> {
826 self.is_bootstrapped.store(false, Ordering::SeqCst);
827 self.connect_bootstrap_peers().await
828 }
829
830 pub fn trust_engine(&self) -> Arc<TrustEngine> {
836 self.adaptive_dht.trust_engine().clone()
837 }
838
839 pub async fn report_trust_event(&self, peer_id: &PeerId, event: TrustEvent) {
854 self.adaptive_dht.report_trust_event(peer_id, event).await;
855 }
856
857 pub fn peer_trust(&self, peer_id: &PeerId) -> f64 {
861 self.adaptive_dht.peer_trust(peer_id)
862 }
863
864 pub fn adaptive_dht(&self) -> &AdaptiveDHT {
866 &self.adaptive_dht
867 }
868
869 pub async fn send_request(
902 &self,
903 peer_id: &PeerId,
904 protocol: &str,
905 data: Vec<u8>,
906 timeout: Duration,
907 ) -> Result<PeerResponse> {
908 if self.adaptive_dht.peer_trust(peer_id) < self.adaptive_dht.config().block_threshold {
910 return Err(P2PError::Network(crate::error::NetworkError::PeerBlocked(
911 *peer_id,
912 )));
913 }
914
915 match self
916 .transport
917 .send_request(peer_id, protocol, data, timeout)
918 .await
919 {
920 Ok(resp) => {
921 self.report_trust_event(peer_id, TrustEvent::SuccessfulResponse)
922 .await;
923 Ok(resp)
924 }
925 Err(e) => {
926 let event = if matches!(&e, P2PError::Timeout(_)) {
927 TrustEvent::ConnectionTimeout
928 } else {
929 TrustEvent::ConnectionFailed
930 };
931 self.report_trust_event(peer_id, event).await;
932 Err(e)
933 }
934 }
935 }
936
937 pub async fn send_response(
938 &self,
939 peer_id: &PeerId,
940 protocol: &str,
941 message_id: &str,
942 data: Vec<u8>,
943 ) -> Result<()> {
944 self.transport
945 .send_response(peer_id, protocol, message_id, data)
946 .await
947 }
948
949 pub fn parse_request_envelope(data: &[u8]) -> Option<(String, bool, Vec<u8>)> {
950 crate::transport_handle::TransportHandle::parse_request_envelope(data)
951 }
952
953 pub async fn subscribe(&self, topic: &str) -> Result<()> {
954 self.transport.subscribe(topic).await
955 }
956
957 pub async fn publish(&self, topic: &str, data: &[u8]) -> Result<()> {
958 self.transport.publish(topic, data).await
959 }
960
961 pub fn config(&self) -> &NodeConfig {
963 &self.config
964 }
965
966 pub async fn start(&self) -> Result<()> {
968 info!("Starting P2P node...");
969
970 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
972 let mut manager = bootstrap_manager.write().await;
973 manager
974 .start_maintenance()
975 .map_err(|e| protocol_error(format!("Failed to start bootstrap manager: {e}")))?;
976 info!("Bootstrap cache manager started");
977 }
978
979 self.transport.start_network_listeners().await?;
981
982 self.adaptive_dht.start().await?;
984
985 let _listen_addrs = self.transport.listen_addrs().await;
987 info!("P2P node started on addresses: {:?}", _listen_addrs);
988
989 self.connect_bootstrap_peers().await?;
995
996 self.is_started
997 .store(true, std::sync::atomic::Ordering::Release);
998
999 Ok(())
1000 }
1001
1002 pub async fn run(&self) -> Result<()> {
1007 if !self.is_running() {
1008 self.start().await?;
1009 }
1010
1011 info!("P2P node running...");
1012
1013 self.shutdown.cancelled().await;
1016
1017 info!("P2P node stopped");
1018 Ok(())
1019 }
1020
1021 pub async fn stop(&self) -> Result<()> {
1023 info!("Stopping P2P node...");
1024
1025 self.shutdown.cancel();
1027
1028 self.adaptive_dht.stop().await?;
1030
1031 self.transport.stop().await?;
1033
1034 self.is_started
1035 .store(false, std::sync::atomic::Ordering::Release);
1036
1037 info!("P2P node stopped");
1038 Ok(())
1039 }
1040
1041 pub async fn shutdown(&self) -> Result<()> {
1043 self.stop().await
1044 }
1045
1046 pub fn is_running(&self) -> bool {
1048 self.is_started.load(std::sync::atomic::Ordering::Acquire) && !self.shutdown.is_cancelled()
1049 }
1050
1051 pub async fn listen_addrs(&self) -> Vec<MultiAddr> {
1053 self.transport.listen_addrs().await
1054 }
1055
1056 pub async fn connected_peers(&self) -> Vec<PeerId> {
1058 self.transport.connected_peers().await
1059 }
1060
1061 pub async fn peer_count(&self) -> usize {
1063 self.transport.peer_count().await
1064 }
1065
1066 pub async fn peer_info(&self, peer_id: &PeerId) -> Option<PeerInfo> {
1068 self.transport.peer_info(peer_id).await
1069 }
1070
1071 #[allow(dead_code)]
1073 pub(crate) async fn get_channel_id_by_address(&self, addr: &MultiAddr) -> Option<String> {
1074 self.transport.get_channel_id_by_address(addr).await
1075 }
1076
1077 #[allow(dead_code)]
1079 pub(crate) async fn list_active_connections(&self) -> Vec<(String, Vec<MultiAddr>)> {
1080 self.transport.list_active_connections().await
1081 }
1082
1083 #[allow(dead_code)]
1085 pub(crate) async fn remove_channel(&self, channel_id: &str) -> bool {
1086 self.transport.remove_channel(channel_id).await
1087 }
1088
1089 pub(crate) async fn disconnect_channel(&self, channel_id: &str) {
1094 self.transport.disconnect_channel(channel_id).await;
1095 }
1096
1097 pub async fn is_peer_connected(&self, peer_id: &PeerId) -> bool {
1099 self.transport.is_peer_connected(peer_id).await
1100 }
1101
1102 pub async fn connect_peer(&self, address: &MultiAddr) -> Result<String> {
1109 self.transport.connect_peer(address).await
1110 }
1111
1112 pub async fn wait_for_peer_identity(
1119 &self,
1120 channel_id: &str,
1121 timeout: Duration,
1122 ) -> Result<PeerId> {
1123 self.transport
1124 .wait_for_peer_identity(channel_id, timeout)
1125 .await
1126 }
1127
1128 pub async fn disconnect_peer(&self, peer_id: &PeerId) -> Result<()> {
1130 self.transport.disconnect_peer(peer_id).await
1131 }
1132
1133 #[allow(dead_code)]
1135 pub(crate) async fn is_connection_active(&self, channel_id: &str) -> bool {
1136 self.transport.is_connection_active(channel_id).await
1137 }
1138
1139 pub async fn send_message(
1141 &self,
1142 peer_id: &PeerId,
1143 protocol: &str,
1144 data: Vec<u8>,
1145 ) -> Result<()> {
1146 self.transport.send_message(peer_id, protocol, data).await
1147 }
1148}
1149
1150const MAX_MESSAGE_AGE_SECS: u64 = 300;
1165const MAX_FUTURE_SECS: u64 = 30;
1167
1168fn protocol_error(msg: impl std::fmt::Display) -> P2PError {
1170 P2PError::Network(NetworkError::ProtocolError(msg.to_string().into()))
1171}
1172
1173pub(crate) fn broadcast_event(tx: &broadcast::Sender<P2PEvent>, event: P2PEvent) {
1175 if let Err(_e) = tx.send(event) {
1176 trace!("Event broadcast has no receivers: {_e}");
1177 }
1178}
1179
1180pub(crate) struct ParsedMessage {
1182 pub(crate) event: P2PEvent,
1184 pub(crate) authenticated_node_id: Option<PeerId>,
1186 pub(crate) user_agent: String,
1188}
1189
1190pub(crate) fn parse_protocol_message(bytes: &[u8], _source: &str) -> Option<ParsedMessage> {
1191 let message: WireMessage = postcard::from_bytes(bytes).ok()?;
1192
1193 let now = std::time::SystemTime::now()
1195 .duration_since(std::time::UNIX_EPOCH)
1196 .map(|d| d.as_secs())
1197 .unwrap_or(0);
1198
1199 if message.timestamp < now.saturating_sub(MAX_MESSAGE_AGE_SECS) {
1201 warn!(
1202 "Rejecting stale message from {} (timestamp {} is {} seconds old)",
1203 _source,
1204 message.timestamp,
1205 now.saturating_sub(message.timestamp)
1206 );
1207 return None;
1208 }
1209
1210 if message.timestamp > now + MAX_FUTURE_SECS {
1212 warn!(
1213 "Rejecting future-dated message from {} (timestamp {} is {} seconds ahead)",
1214 _source,
1215 message.timestamp,
1216 message.timestamp.saturating_sub(now)
1217 );
1218 return None;
1219 }
1220
1221 let authenticated_node_id = if !message.signature.is_empty() {
1223 match verify_message_signature(&message) {
1224 Ok(peer_id) => {
1225 debug!(
1226 "Message from {} authenticated as app-level NodeId {}",
1227 _source, peer_id
1228 );
1229 Some(peer_id)
1230 }
1231 Err(_e) => {
1232 warn!(
1233 "Rejecting message from {}: signature verification failed: {}",
1234 _source, _e
1235 );
1236 return None;
1237 }
1238 }
1239 } else {
1240 None
1241 };
1242
1243 debug!(
1244 "Parsed P2PEvent::Message - topic: {}, source: {:?} (transport: {}, logical: {}), payload_len: {}",
1245 message.protocol,
1246 authenticated_node_id,
1247 _source,
1248 message.from,
1249 message.data.len()
1250 );
1251
1252 Some(ParsedMessage {
1253 event: P2PEvent::Message {
1254 topic: message.protocol,
1255 source: authenticated_node_id,
1256 data: message.data,
1257 },
1258 authenticated_node_id,
1259 user_agent: message.user_agent,
1260 })
1261}
1262
1263fn verify_message_signature(message: &WireMessage) -> std::result::Result<PeerId, String> {
1270 let pubkey = MlDsaPublicKey::from_bytes(&message.public_key)
1271 .map_err(|e| format!("invalid public key: {e:?}"))?;
1272
1273 let peer_id = peer_id_from_public_key(&pubkey);
1274
1275 if message.from != peer_id {
1277 return Err(format!(
1278 "from field mismatch: message claims '{}' but public key derives '{}'",
1279 message.from, peer_id
1280 ));
1281 }
1282
1283 let signable = postcard::to_stdvec(&(
1284 &message.protocol,
1285 &message.data as &[u8],
1286 &message.from,
1287 message.timestamp,
1288 &message.user_agent,
1289 ))
1290 .map_err(|e| format!("failed to serialize signable bytes: {e}"))?;
1291
1292 let sig = MlDsaSignature::from_bytes(&message.signature)
1293 .map_err(|e| format!("invalid signature: {e:?}"))?;
1294
1295 let valid = crate::quantum_crypto::ml_dsa_verify(&pubkey, &signable, &sig)
1296 .map_err(|e| format!("verification error: {e}"))?;
1297
1298 if valid {
1299 Ok(peer_id)
1300 } else {
1301 Err("signature is invalid".to_string())
1302 }
1303}
1304
1305impl P2PNode {
1306 pub fn subscribe_events(&self) -> broadcast::Receiver<P2PEvent> {
1308 self.transport.subscribe_events()
1309 }
1310
1311 pub fn events(&self) -> broadcast::Receiver<P2PEvent> {
1313 self.subscribe_events()
1314 }
1315
1316 pub fn uptime(&self) -> Duration {
1318 self.start_time.elapsed()
1319 }
1320
1321 pub async fn health_check(&self) -> Result<()> {
1334 let peer_count = self.peer_count().await;
1335 if peer_count > self.config.max_connections {
1336 Err(protocol_error(format!(
1337 "Too many connections: {peer_count}"
1338 )))
1339 } else {
1340 Ok(())
1341 }
1342 }
1343
1344 pub fn dht_manager(&self) -> &Arc<DhtNetworkManager> {
1346 self.adaptive_dht.dht_manager()
1347 }
1348
1349 pub fn dht(&self) -> &Arc<DhtNetworkManager> {
1351 self.dht_manager()
1352 }
1353
1354 pub async fn add_discovered_peer(
1356 &self,
1357 _peer_id: PeerId,
1358 addresses: Vec<MultiAddr>,
1359 ) -> Result<()> {
1360 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1361 let manager = bootstrap_manager.read().await;
1362 let socket_addresses: Vec<std::net::SocketAddr> = addresses
1363 .iter()
1364 .filter_map(|addr| addr.socket_addr())
1365 .collect();
1366 if let Some(&primary) = socket_addresses.first() {
1367 manager
1368 .add_peer(&primary, socket_addresses)
1369 .await
1370 .map_err(|e| {
1371 protocol_error(format!("Failed to add peer to bootstrap cache: {e}"))
1372 })?;
1373 }
1374 }
1375 Ok(())
1376 }
1377
1378 pub async fn update_peer_metrics(
1380 &self,
1381 addr: &MultiAddr,
1382 success: bool,
1383 latency_ms: Option<u64>,
1384 _error: Option<String>,
1385 ) -> Result<()> {
1386 if let Some(ref bootstrap_manager) = self.bootstrap_manager
1387 && let Some(sa) = addr.socket_addr()
1388 {
1389 let manager = bootstrap_manager.read().await;
1390 if success {
1391 let rtt_ms = latency_ms.unwrap_or(0) as u32;
1392 manager.record_success(&sa, rtt_ms).await;
1393 } else {
1394 manager.record_failure(&sa).await;
1395 }
1396 }
1397 Ok(())
1398 }
1399
1400 pub async fn get_bootstrap_cache_stats(
1402 &self,
1403 ) -> Result<Option<crate::bootstrap::BootstrapStats>> {
1404 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1405 let manager = bootstrap_manager.read().await;
1406 Ok(Some(manager.stats().await))
1407 } else {
1408 Ok(None)
1409 }
1410 }
1411
1412 pub async fn cached_peer_count(&self) -> usize {
1414 if let Some(ref _bootstrap_manager) = self.bootstrap_manager
1415 && let Ok(Some(stats)) = self.get_bootstrap_cache_stats().await
1416 {
1417 return stats.total_peers;
1418 }
1419 0
1420 }
1421
1422 async fn connect_bootstrap_peers(&self) -> Result<()> {
1424 let mut bootstrap_addr_sets: Vec<Vec<MultiAddr>> = Vec::new();
1426 let mut used_cache = false;
1427 let mut seen_addresses = std::collections::HashSet::new();
1428
1429 if !self.config.bootstrap_peers.is_empty() {
1431 info!(
1432 "Using {} configured bootstrap peers (priority)",
1433 self.config.bootstrap_peers.len()
1434 );
1435 for multiaddr in &self.config.bootstrap_peers {
1436 let Some(socket_addr) = multiaddr.dialable_socket_addr() else {
1437 warn!("Skipping non-QUIC bootstrap peer: {}", multiaddr);
1438 continue;
1439 };
1440 seen_addresses.insert(socket_addr);
1441 bootstrap_addr_sets.push(vec![multiaddr.clone()]);
1442 }
1443 }
1444
1445 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1447 let manager = bootstrap_manager.read().await;
1448 let cached_peers = manager.select_peers(BOOTSTRAP_PEER_BATCH_SIZE).await;
1449 if !cached_peers.is_empty() {
1450 let mut added_from_cache = 0;
1451 for cached in cached_peers {
1452 let mut addrs = vec![cached.primary_address];
1453 addrs.extend(cached.addresses);
1454 let new_addresses: Vec<MultiAddr> = addrs
1456 .into_iter()
1457 .filter(|a| !seen_addresses.contains(a))
1458 .map(MultiAddr::quic)
1459 .collect();
1460
1461 if !new_addresses.is_empty() {
1462 for addr in &new_addresses {
1463 if let Some(sa) = addr.socket_addr() {
1464 seen_addresses.insert(sa);
1465 }
1466 }
1467 bootstrap_addr_sets.push(new_addresses);
1468 added_from_cache += 1;
1469 }
1470 }
1471 if added_from_cache > 0 {
1472 info!(
1473 "Added {} cached bootstrap peers (supplementing CLI peers)",
1474 added_from_cache
1475 );
1476 used_cache = true;
1477 }
1478 }
1479 }
1480
1481 if bootstrap_addr_sets.is_empty() {
1482 info!("No bootstrap peers configured and no cached peers available");
1483 return Ok(());
1484 }
1485
1486 let identity_timeout = Duration::from_secs(BOOTSTRAP_IDENTITY_TIMEOUT_SECS);
1489 let mut successful_connections = 0;
1490 let mut connected_peer_ids: Vec<PeerId> = Vec::new();
1491
1492 for addrs in &bootstrap_addr_sets {
1493 for addr in addrs {
1494 match self.connect_peer(addr).await {
1495 Ok(channel_id) => {
1496 match self
1499 .transport
1500 .wait_for_peer_identity(&channel_id, identity_timeout)
1501 .await
1502 {
1503 Ok(real_peer_id) => {
1504 successful_connections += 1;
1505 connected_peer_ids.push(real_peer_id);
1506
1507 if let Some(ref bootstrap_manager) = self.bootstrap_manager {
1509 let manager = bootstrap_manager.read().await;
1510 if let Some(sa) = addr.socket_addr() {
1511 manager.record_success(&sa, 100).await;
1512 }
1513 }
1514 break; }
1516 Err(_e) => {
1517 warn!(
1518 "Timeout waiting for identity from bootstrap peer {}: {}, \
1519 closing channel {}",
1520 addr, _e, channel_id
1521 );
1522 self.disconnect_channel(&channel_id).await;
1523 }
1524 }
1525 }
1526 Err(_e) => {
1527 warn!("Failed to connect to bootstrap peer {}: {}", addr, _e);
1528
1529 if used_cache && let Some(ref bootstrap_manager) = self.bootstrap_manager {
1531 let manager = bootstrap_manager.read().await;
1532 if let Some(sa) = addr.socket_addr() {
1533 manager.record_failure(&sa).await;
1534 }
1535 }
1536 }
1537 }
1538 }
1539 }
1540
1541 if successful_connections == 0 {
1542 if !used_cache {
1543 warn!("Failed to connect to any bootstrap peers");
1544 }
1545 return Ok(());
1548 }
1549
1550 info!(
1551 "Successfully connected to {} bootstrap peers",
1552 successful_connections
1553 );
1554
1555 match self
1557 .dht_manager()
1558 .bootstrap_from_peers(&connected_peer_ids)
1559 .await
1560 {
1561 Ok(_count) => info!("DHT peer discovery found {} peers", _count),
1562 Err(_e) => warn!("DHT peer discovery failed: {}", _e),
1563 }
1564
1565 self.is_bootstrapped.store(true, Ordering::SeqCst);
1568 info!(
1569 "Bootstrap complete: connected to {} peers, initiated {} discovery requests",
1570 successful_connections,
1571 connected_peer_ids.len()
1572 );
1573
1574 Ok(())
1575 }
1576
1577 }
1579
1580#[async_trait::async_trait]
1582#[allow(dead_code)]
1583pub trait NetworkSender: Send + Sync {
1584 async fn send_message(&self, peer_id: &PeerId, protocol: &str, data: Vec<u8>) -> Result<()>;
1586
1587 fn local_peer_id(&self) -> PeerId;
1589}
1590
1591#[cfg(test)]
1595#[allow(clippy::unwrap_used, clippy::expect_used)]
1596mod diversity_tests {
1597 use super::*;
1598 use crate::security::IPDiversityConfig;
1599
1600 async fn build_bootstrap_manager_like_prod(config: &NodeConfig) -> BootstrapManager {
1601 let temp_dir = tempfile::TempDir::new().expect("temp dir");
1603 let mut bootstrap_config = config.bootstrap_cache_config.clone().unwrap_or_default();
1604 bootstrap_config.cache_dir = temp_dir.path().to_path_buf();
1605
1606 BootstrapManager::with_node_config(bootstrap_config, config)
1607 .await
1608 .expect("bootstrap manager")
1609 }
1610
1611 #[tokio::test]
1612 async fn test_nodeconfig_diversity_config_used_for_bootstrap() {
1613 let config = NodeConfig {
1614 diversity_config: Some(IPDiversityConfig::testnet()),
1615 ..Default::default()
1616 };
1617
1618 let manager = build_bootstrap_manager_like_prod(&config).await;
1619 assert!(manager.diversity_config().is_relaxed());
1620 assert_eq!(manager.diversity_config().max_nodes_per_asn, 5000);
1621 }
1622}
1623
1624pub(crate) async fn register_new_channel(
1626 peers: &Arc<RwLock<HashMap<String, PeerInfo>>>,
1627 channel_id: &str,
1628 remote_addr: &MultiAddr,
1629) {
1630 let mut peers_guard = peers.write().await;
1631 let peer_info = PeerInfo {
1632 channel_id: channel_id.to_owned(),
1633 addresses: vec![remote_addr.clone()],
1634 connected_at: tokio::time::Instant::now(),
1635 last_seen: tokio::time::Instant::now(),
1636 status: ConnectionStatus::Connected,
1637 protocols: vec!["p2p-core/1.0.0".to_string()],
1638 heartbeat_count: 0,
1639 };
1640 peers_guard.insert(channel_id.to_owned(), peer_info);
1641}
1642
1643#[cfg(test)]
1644mod tests {
1645 use super::*;
1646 use std::time::Duration;
1648 use tokio::time::timeout;
1649
1650 const TEST_MAX_MESSAGE_SIZE: usize = 2 * 1024 * 1024;
1652
1653 fn create_test_node_config() -> NodeConfig {
1659 NodeConfig {
1660 local: true,
1661 port: 0,
1662 ipv6: true,
1663 bootstrap_peers: vec![],
1664 connection_timeout: Duration::from_secs(2),
1665 max_connections: 100,
1666 dht_config: DHTConfig::default(),
1667 bootstrap_cache_config: None,
1668 diversity_config: None,
1669 max_message_size: None,
1670 node_identity: None,
1671 mode: NodeMode::default(),
1672 custom_user_agent: None,
1673 allow_loopback: true,
1674 adaptive_dht_config: AdaptiveDhtConfig::default(),
1675 }
1676 }
1677
1678 #[tokio::test]
1682 async fn test_node_config_default() {
1683 let config = NodeConfig::default();
1684
1685 assert_eq!(config.listen_addrs().len(), 2); assert_eq!(config.max_connections, 10000);
1687 assert_eq!(config.connection_timeout, Duration::from_secs(30));
1688 }
1689
1690 #[tokio::test]
1691 async fn test_dht_config_default() {
1692 let config = DHTConfig::default();
1693
1694 assert_eq!(config.k_value, 20);
1695 assert_eq!(config.alpha_value, 5);
1696 assert_eq!(config.refresh_interval, Duration::from_secs(600));
1697 }
1698
1699 #[test]
1700 fn test_connection_status_variants() {
1701 let connecting = ConnectionStatus::Connecting;
1702 let connected = ConnectionStatus::Connected;
1703 let disconnecting = ConnectionStatus::Disconnecting;
1704 let disconnected = ConnectionStatus::Disconnected;
1705 let failed = ConnectionStatus::Failed("test error".to_string());
1706
1707 assert_eq!(connecting, ConnectionStatus::Connecting);
1708 assert_eq!(connected, ConnectionStatus::Connected);
1709 assert_eq!(disconnecting, ConnectionStatus::Disconnecting);
1710 assert_eq!(disconnected, ConnectionStatus::Disconnected);
1711 assert_ne!(connecting, connected);
1712
1713 if let ConnectionStatus::Failed(msg) = failed {
1714 assert_eq!(msg, "test error");
1715 } else {
1716 panic!("Expected Failed status");
1717 }
1718 }
1719
1720 #[tokio::test]
1721 async fn test_node_creation() -> Result<()> {
1722 let config = create_test_node_config();
1723 let node = P2PNode::new(config).await?;
1724
1725 assert_eq!(node.peer_id().to_hex().len(), 64);
1727 assert!(!node.is_running());
1728 assert_eq!(node.peer_count().await, 0);
1729 assert!(node.connected_peers().await.is_empty());
1730
1731 Ok(())
1732 }
1733
1734 #[tokio::test]
1735 async fn test_node_lifecycle() -> Result<()> {
1736 let config = create_test_node_config();
1737 let node = P2PNode::new(config).await?;
1738
1739 assert!(!node.is_running());
1741
1742 node.start().await?;
1744 assert!(node.is_running());
1745
1746 let listen_addrs = node.listen_addrs().await;
1748 assert!(
1749 !listen_addrs.is_empty(),
1750 "Expected at least one listening address"
1751 );
1752
1753 node.stop().await?;
1755 assert!(!node.is_running());
1756
1757 Ok(())
1758 }
1759
1760 #[tokio::test]
1761 async fn test_peer_connection() -> Result<()> {
1762 let config1 = create_test_node_config();
1763 let config2 = create_test_node_config();
1764
1765 let node1 = P2PNode::new(config1).await?;
1766 let node2 = P2PNode::new(config2).await?;
1767
1768 node1.start().await?;
1769 node2.start().await?;
1770
1771 let node2_addr = node2
1772 .listen_addrs()
1773 .await
1774 .into_iter()
1775 .find(|a| a.is_ipv4())
1776 .ok_or_else(|| {
1777 P2PError::Network(crate::error::NetworkError::InvalidAddress(
1778 "Node 2 did not expose an IPv4 listen address".into(),
1779 ))
1780 })?;
1781
1782 let channel_id = node1.connect_peer(&node2_addr).await?;
1785
1786 assert!(node1.is_connection_active(&channel_id).await);
1789
1790 let peer_info = node1.transport.peer_info_by_channel(&channel_id).await;
1792 assert!(peer_info.is_some());
1793 let info = peer_info.expect("Peer info should exist after connect");
1794 assert_eq!(info.channel_id, channel_id);
1795 assert_eq!(info.status, ConnectionStatus::Connected);
1796 assert!(info.protocols.contains(&"p2p-foundation/1.0".to_string()));
1797
1798 node1.remove_channel(&channel_id).await;
1800 assert!(!node1.is_connection_active(&channel_id).await);
1801
1802 node1.stop().await?;
1803 node2.stop().await?;
1804
1805 Ok(())
1806 }
1807
1808 #[tokio::test]
1809 async fn test_connect_peer_rejects_tcp_multiaddr() -> Result<()> {
1810 let config = create_test_node_config();
1811 let node = P2PNode::new(config).await?;
1812
1813 let tcp_addr: MultiAddr = "/ip4/127.0.0.1/tcp/1".parse().unwrap();
1814 let result = node.connect_peer(&tcp_addr).await;
1815
1816 assert!(
1817 matches!(
1818 result,
1819 Err(P2PError::Network(
1820 crate::error::NetworkError::InvalidAddress(_)
1821 ))
1822 ),
1823 "TCP multiaddrs should be rejected before a QUIC dial is attempted, got: {:?}",
1824 result
1825 );
1826
1827 Ok(())
1828 }
1829
1830 #[cfg_attr(target_os = "windows", ignore)]
1837 #[tokio::test]
1838 async fn test_event_subscription() -> Result<()> {
1839 let identity1 =
1843 Arc::new(NodeIdentity::generate().expect("should generate identity for test node1"));
1844 let identity2 =
1845 Arc::new(NodeIdentity::generate().expect("should generate identity for test node2"));
1846
1847 let mut config1 = create_test_node_config();
1848 config1.ipv6 = false;
1849 config1.node_identity = Some(identity1);
1850
1851 let node2_peer_id = *identity2.peer_id();
1852 let mut config2 = create_test_node_config();
1853 config2.ipv6 = false;
1854 config2.node_identity = Some(identity2);
1855
1856 let node1 = P2PNode::new(config1).await?;
1857 let node2 = P2PNode::new(config2).await?;
1858
1859 node1.start().await?;
1860 node2.start().await?;
1861
1862 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1863
1864 let mut events = node2.subscribe_events();
1866
1867 let node2_addr = node2.local_addr().ok_or_else(|| {
1868 P2PError::Network(crate::error::NetworkError::ProtocolError(
1869 "No listening address".to_string().into(),
1870 ))
1871 })?;
1872
1873 let mut channel_id = None;
1875 for attempt in 0..3 {
1876 if attempt > 0 {
1877 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1878 }
1879 match timeout(Duration::from_secs(2), node1.connect_peer(&node2_addr)).await {
1880 Ok(Ok(id)) => {
1881 channel_id = Some(id);
1882 break;
1883 }
1884 Ok(Err(_)) | Err(_) => continue,
1885 }
1886 }
1887 let channel_id = channel_id.expect("Failed to connect after 3 attempts");
1888
1889 let target_peer_id = node1
1891 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
1892 .await?;
1893 assert_eq!(target_peer_id, node2_peer_id);
1894
1895 node1
1897 .send_message(&target_peer_id, "test-topic", b"hello".to_vec())
1898 .await?;
1899
1900 let event = timeout(Duration::from_secs(2), async {
1902 loop {
1903 match events.recv().await {
1904 Ok(P2PEvent::PeerConnected(id, _)) => return Ok(id),
1905 Ok(P2PEvent::Message { .. }) => continue, Ok(_) => continue,
1907 Err(e) => return Err(e),
1908 }
1909 }
1910 })
1911 .await;
1912 assert!(event.is_ok(), "Should receive PeerConnected event");
1913 let connected_peer_id = event.expect("Timed out").expect("Channel error");
1914 assert!(
1916 connected_peer_id.0.iter().any(|&b| b != 0),
1917 "PeerConnected should carry a non-zero peer ID"
1918 );
1919
1920 node1.stop().await?;
1921 node2.stop().await?;
1922
1923 Ok(())
1924 }
1925
1926 #[cfg_attr(target_os = "windows", ignore)]
1928 #[tokio::test]
1929 async fn test_message_sending() -> Result<()> {
1930 let mut config1 = create_test_node_config();
1932 config1.ipv6 = false;
1933 let node1 = P2PNode::new(config1).await?;
1934 node1.start().await?;
1935
1936 let mut config2 = create_test_node_config();
1937 config2.ipv6 = false;
1938 let node2 = P2PNode::new(config2).await?;
1939 node2.start().await?;
1940
1941 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1943
1944 let node2_addr = node2.local_addr().ok_or_else(|| {
1946 P2PError::Network(crate::error::NetworkError::ProtocolError(
1947 "No listening address".to_string().into(),
1948 ))
1949 })?;
1950
1951 let channel_id =
1953 match timeout(Duration::from_millis(500), node1.connect_peer(&node2_addr)).await {
1954 Ok(res) => res?,
1955 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
1956 };
1957
1958 let target_peer_id = node1
1960 .wait_for_peer_identity(&channel_id, Duration::from_secs(2))
1961 .await?;
1962 assert_eq!(target_peer_id, node2.peer_id().clone());
1963
1964 let message_data = b"Hello, peer!".to_vec();
1966 let result = match timeout(
1967 Duration::from_millis(500),
1968 node1.send_message(&target_peer_id, "test-protocol", message_data),
1969 )
1970 .await
1971 {
1972 Ok(res) => res,
1973 Err(_) => return Err(P2PError::Network(NetworkError::Timeout)),
1974 };
1975 if let Err(e) = &result {
1978 assert!(!e.to_string().contains("not connected"), "Got error: {}", e);
1979 }
1980
1981 let non_existent_peer = PeerId::from_bytes([0xFFu8; 32]);
1983 let result = node1
1984 .send_message(&non_existent_peer, "test-protocol", vec![])
1985 .await;
1986 assert!(result.is_err(), "Sending to non-existent peer should fail");
1987
1988 node1.stop().await?;
1989 node2.stop().await?;
1990
1991 Ok(())
1992 }
1993
1994 #[tokio::test]
1995 async fn test_remote_mcp_operations() -> Result<()> {
1996 let config = create_test_node_config();
1997 let node = P2PNode::new(config).await?;
1998
1999 node.start().await?;
2001 node.stop().await?;
2002 Ok(())
2003 }
2004
2005 #[tokio::test]
2006 async fn test_health_check() -> Result<()> {
2007 let config = create_test_node_config();
2008 let node = P2PNode::new(config).await?;
2009
2010 let result = node.health_check().await;
2012 assert!(result.is_ok());
2013
2014 Ok(())
2019 }
2020
2021 #[tokio::test]
2022 async fn test_node_uptime() -> Result<()> {
2023 let config = create_test_node_config();
2024 let node = P2PNode::new(config).await?;
2025
2026 let uptime1 = node.uptime();
2027 assert!(uptime1 >= Duration::from_secs(0));
2028
2029 tokio::time::sleep(Duration::from_millis(10)).await;
2031
2032 let uptime2 = node.uptime();
2033 assert!(uptime2 > uptime1);
2034
2035 Ok(())
2036 }
2037
2038 #[tokio::test]
2039 async fn test_node_config_access() -> Result<()> {
2040 let config = create_test_node_config();
2041 let node = P2PNode::new(config).await?;
2042
2043 let node_config = node.config();
2044 assert_eq!(node_config.max_connections, 100);
2045 Ok(())
2048 }
2049
2050 #[tokio::test]
2051 async fn test_mcp_server_access() -> Result<()> {
2052 let config = create_test_node_config();
2053 let _node = P2PNode::new(config).await?;
2054
2055 Ok(())
2057 }
2058
2059 #[tokio::test]
2060 async fn test_dht_access() -> Result<()> {
2061 let config = create_test_node_config();
2062 let node = P2PNode::new(config).await?;
2063
2064 let _dht = node.dht();
2066
2067 Ok(())
2068 }
2069
2070 #[tokio::test]
2071 async fn test_node_config_builder() -> Result<()> {
2072 let bootstrap: MultiAddr = "/ip4/127.0.0.1/udp/9000/quic".parse().unwrap();
2073
2074 let config = NodeConfig::builder()
2075 .local(true)
2076 .ipv6(true)
2077 .bootstrap_peer(bootstrap)
2078 .connection_timeout(Duration::from_secs(15))
2079 .max_connections(200)
2080 .max_message_size(TEST_MAX_MESSAGE_SIZE)
2081 .build()?;
2082
2083 assert_eq!(config.listen_addrs().len(), 2); assert!(config.local);
2085 assert!(config.ipv6);
2086 assert_eq!(config.bootstrap_peers.len(), 1);
2087 assert_eq!(config.connection_timeout, Duration::from_secs(15));
2088 assert_eq!(config.max_connections, 200);
2089 assert_eq!(config.max_message_size, Some(TEST_MAX_MESSAGE_SIZE));
2090 assert!(config.allow_loopback); Ok(())
2093 }
2094
2095 #[tokio::test]
2096 async fn test_bootstrap_peers() -> Result<()> {
2097 let mut config = create_test_node_config();
2098 config.bootstrap_peers = vec![
2099 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9200),
2100 crate::MultiAddr::from_ipv4(std::net::Ipv4Addr::LOCALHOST, 9201),
2101 ];
2102
2103 let node = P2PNode::new(config).await?;
2104
2105 node.start().await?;
2107
2108 let _peer_count = node.peer_count().await;
2112
2113 node.stop().await?;
2114 Ok(())
2115 }
2116
2117 #[tokio::test]
2118 async fn test_peer_info_structure() {
2119 let peer_info = PeerInfo {
2120 channel_id: "test_peer".to_string(),
2121 addresses: vec!["/ip4/127.0.0.1/tcp/9000".parse::<MultiAddr>().unwrap()],
2122 connected_at: Instant::now(),
2123 last_seen: Instant::now(),
2124 status: ConnectionStatus::Connected,
2125 protocols: vec!["test-protocol".to_string()],
2126 heartbeat_count: 0,
2127 };
2128
2129 assert_eq!(peer_info.channel_id, "test_peer");
2130 assert_eq!(peer_info.addresses.len(), 1);
2131 assert_eq!(peer_info.status, ConnectionStatus::Connected);
2132 assert_eq!(peer_info.protocols.len(), 1);
2133 }
2134
2135 #[tokio::test]
2136 async fn test_serialization() -> Result<()> {
2137 let config = create_test_node_config();
2139 let serialized = serde_json::to_string(&config)?;
2140 let deserialized: NodeConfig = serde_json::from_str(&serialized)?;
2141
2142 assert_eq!(config.local, deserialized.local);
2143 assert_eq!(config.port, deserialized.port);
2144 assert_eq!(config.ipv6, deserialized.ipv6);
2145 assert_eq!(config.bootstrap_peers, deserialized.bootstrap_peers);
2146
2147 Ok(())
2148 }
2149
2150 #[tokio::test]
2151 async fn test_get_channel_id_by_address_found() -> Result<()> {
2152 let config = create_test_node_config();
2153 let node = P2PNode::new(config).await?;
2154
2155 let test_channel_id = "peer_test_123".to_string();
2157 let test_address = "192.168.1.100:9000";
2158 let test_multiaddr = MultiAddr::quic(test_address.parse().unwrap());
2159
2160 let peer_info = PeerInfo {
2161 channel_id: test_channel_id.clone(),
2162 addresses: vec![test_multiaddr],
2163 connected_at: Instant::now(),
2164 last_seen: Instant::now(),
2165 status: ConnectionStatus::Connected,
2166 protocols: vec!["test-protocol".to_string()],
2167 heartbeat_count: 0,
2168 };
2169
2170 node.transport
2171 .inject_peer(test_channel_id.clone(), peer_info)
2172 .await;
2173
2174 let lookup_addr = MultiAddr::quic(test_address.parse().unwrap());
2176 let found_channel_id = node.get_channel_id_by_address(&lookup_addr).await;
2177 assert_eq!(found_channel_id, Some(test_channel_id));
2178
2179 Ok(())
2180 }
2181
2182 #[tokio::test]
2183 async fn test_get_channel_id_by_address_not_found() -> Result<()> {
2184 let config = create_test_node_config();
2185 let node = P2PNode::new(config).await?;
2186
2187 let unknown_addr = MultiAddr::quic("192.168.1.200:9000".parse().unwrap());
2189 let result = node.get_channel_id_by_address(&unknown_addr).await;
2190 assert_eq!(result, None);
2191
2192 Ok(())
2193 }
2194
2195 #[tokio::test]
2196 async fn test_get_channel_id_by_address_invalid_format() -> Result<()> {
2197 let config = create_test_node_config();
2198 let node = P2PNode::new(config).await?;
2199
2200 let ble_addr = MultiAddr::new(crate::address::TransportAddr::Ble {
2202 mac: [0x02, 0x00, 0x00, 0x00, 0x00, 0x01],
2203 psm: 0x0025,
2204 });
2205 let result = node.get_channel_id_by_address(&ble_addr).await;
2206 assert_eq!(result, None);
2207
2208 Ok(())
2209 }
2210
2211 #[tokio::test]
2212 async fn test_get_channel_id_by_address_multiple_peers() -> Result<()> {
2213 let config = create_test_node_config();
2214 let node = P2PNode::new(config).await?;
2215
2216 let peer1_id = "peer_1".to_string();
2218 let peer1_addr_str = "192.168.1.101:9001";
2219 let peer1_multiaddr = MultiAddr::quic(peer1_addr_str.parse().unwrap());
2220
2221 let peer2_id = "peer_2".to_string();
2222 let peer2_addr_str = "192.168.1.102:9002";
2223 let peer2_multiaddr = MultiAddr::quic(peer2_addr_str.parse().unwrap());
2224
2225 let peer1_info = PeerInfo {
2226 channel_id: peer1_id.clone(),
2227 addresses: vec![peer1_multiaddr],
2228 connected_at: Instant::now(),
2229 last_seen: Instant::now(),
2230 status: ConnectionStatus::Connected,
2231 protocols: vec!["test-protocol".to_string()],
2232 heartbeat_count: 0,
2233 };
2234
2235 let peer2_info = PeerInfo {
2236 channel_id: peer2_id.clone(),
2237 addresses: vec![peer2_multiaddr],
2238 connected_at: Instant::now(),
2239 last_seen: Instant::now(),
2240 status: ConnectionStatus::Connected,
2241 protocols: vec!["test-protocol".to_string()],
2242 heartbeat_count: 0,
2243 };
2244
2245 node.transport
2246 .inject_peer(peer1_id.clone(), peer1_info)
2247 .await;
2248 node.transport
2249 .inject_peer(peer2_id.clone(), peer2_info)
2250 .await;
2251
2252 let found_peer1 = node
2254 .get_channel_id_by_address(&MultiAddr::quic(peer1_addr_str.parse().unwrap()))
2255 .await;
2256 let found_peer2 = node
2257 .get_channel_id_by_address(&MultiAddr::quic(peer2_addr_str.parse().unwrap()))
2258 .await;
2259
2260 assert_eq!(found_peer1, Some(peer1_id));
2261 assert_eq!(found_peer2, Some(peer2_id));
2262
2263 Ok(())
2264 }
2265
2266 #[tokio::test]
2267 async fn test_list_active_connections_empty() -> Result<()> {
2268 let config = create_test_node_config();
2269 let node = P2PNode::new(config).await?;
2270
2271 let connections = node.list_active_connections().await;
2273 assert!(connections.is_empty());
2274
2275 Ok(())
2276 }
2277
2278 #[tokio::test]
2279 async fn test_list_active_connections_with_peers() -> Result<()> {
2280 let config = create_test_node_config();
2281 let node = P2PNode::new(config).await?;
2282
2283 let peer1_id = "peer_1".to_string();
2285 let peer1_addrs = vec![
2286 MultiAddr::quic("192.168.1.101:9001".parse().unwrap()),
2287 MultiAddr::quic("192.168.1.101:9002".parse().unwrap()),
2288 ];
2289
2290 let peer2_id = "peer_2".to_string();
2291 let peer2_addrs = vec![MultiAddr::quic("192.168.1.102:9003".parse().unwrap())];
2292
2293 let peer1_info = PeerInfo {
2294 channel_id: peer1_id.clone(),
2295 addresses: peer1_addrs.clone(),
2296 connected_at: Instant::now(),
2297 last_seen: Instant::now(),
2298 status: ConnectionStatus::Connected,
2299 protocols: vec!["test-protocol".to_string()],
2300 heartbeat_count: 0,
2301 };
2302
2303 let peer2_info = PeerInfo {
2304 channel_id: peer2_id.clone(),
2305 addresses: peer2_addrs.clone(),
2306 connected_at: Instant::now(),
2307 last_seen: Instant::now(),
2308 status: ConnectionStatus::Connected,
2309 protocols: vec!["test-protocol".to_string()],
2310 heartbeat_count: 0,
2311 };
2312
2313 node.transport
2314 .inject_peer(peer1_id.clone(), peer1_info)
2315 .await;
2316 node.transport
2317 .inject_peer(peer2_id.clone(), peer2_info)
2318 .await;
2319
2320 node.transport
2322 .inject_active_connection(peer1_id.clone())
2323 .await;
2324 node.transport
2325 .inject_active_connection(peer2_id.clone())
2326 .await;
2327
2328 let connections = node.list_active_connections().await;
2330 assert_eq!(connections.len(), 2);
2331
2332 let peer1_conn = connections.iter().find(|(id, _)| id == &peer1_id);
2334 let peer2_conn = connections.iter().find(|(id, _)| id == &peer2_id);
2335
2336 assert!(peer1_conn.is_some());
2337 assert!(peer2_conn.is_some());
2338
2339 assert_eq!(peer1_conn.unwrap().1, peer1_addrs);
2341 assert_eq!(peer2_conn.unwrap().1, peer2_addrs);
2342
2343 Ok(())
2344 }
2345
2346 #[tokio::test]
2347 async fn test_remove_channel_success() -> Result<()> {
2348 let config = create_test_node_config();
2349 let node = P2PNode::new(config).await?;
2350
2351 let channel_id = "peer_to_remove".to_string();
2353 let channel_peer_id = PeerId::from_name(&channel_id);
2354 let peer_info = PeerInfo {
2355 channel_id: channel_id.clone(),
2356 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
2357 connected_at: Instant::now(),
2358 last_seen: Instant::now(),
2359 status: ConnectionStatus::Connected,
2360 protocols: vec!["test-protocol".to_string()],
2361 heartbeat_count: 0,
2362 };
2363
2364 node.transport
2365 .inject_peer(channel_id.clone(), peer_info)
2366 .await;
2367 node.transport
2368 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
2369 .await;
2370
2371 assert!(node.is_peer_connected(&channel_peer_id).await);
2373
2374 let removed = node.remove_channel(&channel_id).await;
2376 assert!(removed);
2377
2378 assert!(!node.is_peer_connected(&channel_peer_id).await);
2380
2381 Ok(())
2382 }
2383
2384 #[tokio::test]
2385 async fn test_remove_channel_nonexistent() -> Result<()> {
2386 let config = create_test_node_config();
2387 let node = P2PNode::new(config).await?;
2388
2389 let removed = node.remove_channel("nonexistent_peer").await;
2391 assert!(!removed);
2392
2393 Ok(())
2394 }
2395
2396 #[tokio::test]
2397 async fn test_is_peer_connected() -> Result<()> {
2398 let config = create_test_node_config();
2399 let node = P2PNode::new(config).await?;
2400
2401 let channel_id = "test_peer".to_string();
2402 let channel_peer_id = PeerId::from_name(&channel_id);
2403
2404 assert!(!node.is_peer_connected(&channel_peer_id).await);
2406
2407 let peer_info = PeerInfo {
2409 channel_id: channel_id.clone(),
2410 addresses: vec![MultiAddr::quic("192.168.1.100:9000".parse().unwrap())],
2411 connected_at: Instant::now(),
2412 last_seen: Instant::now(),
2413 status: ConnectionStatus::Connected,
2414 protocols: vec!["test-protocol".to_string()],
2415 heartbeat_count: 0,
2416 };
2417
2418 node.transport
2419 .inject_peer(channel_id.clone(), peer_info)
2420 .await;
2421 node.transport
2422 .inject_peer_to_channel(channel_peer_id, channel_id.clone())
2423 .await;
2424
2425 assert!(node.is_peer_connected(&channel_peer_id).await);
2427
2428 node.remove_channel(&channel_id).await;
2430
2431 assert!(!node.is_peer_connected(&channel_peer_id).await);
2433
2434 Ok(())
2435 }
2436
2437 #[test]
2438 fn test_normalize_ipv6_wildcard() {
2439 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
2440
2441 let wildcard = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080);
2442 let normalized = normalize_wildcard_to_loopback(wildcard);
2443
2444 assert_eq!(normalized.ip(), IpAddr::V6(Ipv6Addr::LOCALHOST));
2445 assert_eq!(normalized.port(), 8080);
2446 }
2447
2448 #[test]
2449 fn test_normalize_ipv4_wildcard() {
2450 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2451
2452 let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9000);
2453 let normalized = normalize_wildcard_to_loopback(wildcard);
2454
2455 assert_eq!(normalized.ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));
2456 assert_eq!(normalized.port(), 9000);
2457 }
2458
2459 #[test]
2460 fn test_normalize_specific_address_unchanged() {
2461 let specific: std::net::SocketAddr = "192.168.1.100:3000".parse().unwrap();
2462 let normalized = normalize_wildcard_to_loopback(specific);
2463
2464 assert_eq!(normalized, specific);
2465 }
2466
2467 #[test]
2468 fn test_normalize_loopback_unchanged() {
2469 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
2470
2471 let loopback_v6 = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 5000);
2472 let normalized_v6 = normalize_wildcard_to_loopback(loopback_v6);
2473 assert_eq!(normalized_v6, loopback_v6);
2474
2475 let loopback_v4 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000);
2476 let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4);
2477 assert_eq!(normalized_v4, loopback_v4);
2478 }
2479
2480 fn current_timestamp() -> u64 {
2484 std::time::SystemTime::now()
2485 .duration_since(std::time::UNIX_EPOCH)
2486 .map(|d| d.as_secs())
2487 .unwrap_or(0)
2488 }
2489
2490 fn make_wire_bytes(protocol: &str, data: Vec<u8>, from: &str, timestamp: u64) -> Vec<u8> {
2492 let msg = WireMessage {
2493 protocol: protocol.to_string(),
2494 data,
2495 from: PeerId::from_name(from),
2496 timestamp,
2497 user_agent: String::new(),
2498 public_key: Vec::new(),
2499 signature: Vec::new(),
2500 };
2501 postcard::to_stdvec(&msg).unwrap()
2502 }
2503
2504 #[test]
2505 fn test_parse_protocol_message_uses_transport_peer_id_as_source() {
2506 let transport_id = "abcdef0123456789";
2509 let logical_id = "spoofed-logical-id";
2510 let bytes = make_wire_bytes("test/v1", vec![1, 2, 3], logical_id, current_timestamp());
2511
2512 let parsed =
2513 parse_protocol_message(&bytes, transport_id).expect("valid message should parse");
2514
2515 assert!(parsed.authenticated_node_id.is_none());
2517
2518 match parsed.event {
2519 P2PEvent::Message {
2520 topic,
2521 source,
2522 data,
2523 } => {
2524 assert!(source.is_none(), "unsigned message source must be None");
2525 assert_eq!(topic, "test/v1");
2526 assert_eq!(data, vec![1u8, 2, 3]);
2527 }
2528 other => panic!("expected P2PEvent::Message, got {:?}", other),
2529 }
2530 }
2531
2532 #[test]
2533 fn test_parse_protocol_message_rejects_invalid_bytes() {
2534 assert!(parse_protocol_message(b"not valid bincode", "peer-id").is_none());
2536 }
2537
2538 #[test]
2539 fn test_parse_protocol_message_rejects_truncated_message() {
2540 let full_bytes = make_wire_bytes("test/v1", vec![1, 2, 3], "sender", current_timestamp());
2542 let truncated = &full_bytes[..full_bytes.len() / 2];
2543 assert!(parse_protocol_message(truncated, "peer-id").is_none());
2544 }
2545
2546 #[test]
2547 fn test_parse_protocol_message_empty_payload() {
2548 let bytes = make_wire_bytes("ping", vec![], "sender", current_timestamp());
2549
2550 let parsed = parse_protocol_message(&bytes, "transport-peer")
2551 .expect("valid message with empty data should parse");
2552
2553 match parsed.event {
2554 P2PEvent::Message { data, .. } => assert!(data.is_empty()),
2555 other => panic!("expected P2PEvent::Message, got {:?}", other),
2556 }
2557 }
2558
2559 #[test]
2560 fn test_parse_protocol_message_preserves_binary_payload() {
2561 let payload: Vec<u8> = (0..=255).collect();
2563 let bytes = make_wire_bytes("binary/v1", payload.clone(), "sender", current_timestamp());
2564
2565 let parsed = parse_protocol_message(&bytes, "peer-id")
2566 .expect("valid message with full byte range should parse");
2567
2568 match parsed.event {
2569 P2PEvent::Message { data, topic, .. } => {
2570 assert_eq!(topic, "binary/v1");
2571 assert_eq!(
2572 data, payload,
2573 "payload must survive bincode round-trip exactly"
2574 );
2575 }
2576 other => panic!("expected P2PEvent::Message, got {:?}", other),
2577 }
2578 }
2579
2580 #[test]
2581 fn test_parse_signed_message_verifies_and_uses_node_id() {
2582 let identity = NodeIdentity::generate().expect("should generate identity");
2583 let protocol = "test/signed";
2584 let data: Vec<u8> = vec![10, 20, 30];
2585 let from = *identity.peer_id();
2587 let timestamp = current_timestamp();
2588 let user_agent = "test/1.0";
2589
2590 let signable =
2592 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
2593 .unwrap();
2594 let sig = identity.sign(&signable).expect("signing should succeed");
2595
2596 let msg = WireMessage {
2597 protocol: protocol.to_string(),
2598 data: data.clone(),
2599 from,
2600 timestamp,
2601 user_agent: user_agent.to_string(),
2602 public_key: identity.public_key().as_bytes().to_vec(),
2603 signature: sig.as_bytes().to_vec(),
2604 };
2605 let bytes = postcard::to_stdvec(&msg).unwrap();
2606
2607 let parsed =
2608 parse_protocol_message(&bytes, "transport-xyz").expect("signed message should parse");
2609
2610 let expected_peer_id = *identity.peer_id();
2611 assert_eq!(
2612 parsed.authenticated_node_id.as_ref(),
2613 Some(&expected_peer_id)
2614 );
2615
2616 match parsed.event {
2617 P2PEvent::Message { source, .. } => {
2618 assert_eq!(
2619 source.as_ref(),
2620 Some(&expected_peer_id),
2621 "source should be the verified PeerId"
2622 );
2623 }
2624 other => panic!("expected P2PEvent::Message, got {:?}", other),
2625 }
2626 }
2627
2628 #[test]
2629 fn test_parse_message_with_bad_signature_is_rejected() {
2630 let identity = NodeIdentity::generate().expect("should generate identity");
2631 let protocol = "test/bad-sig";
2632 let data: Vec<u8> = vec![1, 2, 3];
2633 let from = *identity.peer_id();
2634 let timestamp = current_timestamp();
2635 let user_agent = "test/1.0";
2636
2637 let signable =
2639 postcard::to_stdvec(&(protocol, data.as_slice(), &from, timestamp, user_agent))
2640 .unwrap();
2641 let sig = identity.sign(&signable).expect("signing should succeed");
2642
2643 let msg = WireMessage {
2645 protocol: protocol.to_string(),
2646 data: vec![99, 99, 99],
2647 from,
2648 timestamp,
2649 user_agent: user_agent.to_string(),
2650 public_key: identity.public_key().as_bytes().to_vec(),
2651 signature: sig.as_bytes().to_vec(),
2652 };
2653 let bytes = postcard::to_stdvec(&msg).unwrap();
2654
2655 assert!(
2656 parse_protocol_message(&bytes, "transport-xyz").is_none(),
2657 "message with bad signature should be rejected"
2658 );
2659 }
2660
2661 #[test]
2662 fn test_parse_message_with_mismatched_from_is_rejected() {
2663 let identity = NodeIdentity::generate().expect("should generate identity");
2664 let protocol = "test/from-mismatch";
2665 let data: Vec<u8> = vec![1, 2, 3];
2666 let fake_from = PeerId::from_bytes([0xDE; 32]);
2668 let timestamp = current_timestamp();
2669 let user_agent = "test/1.0";
2670
2671 let signable =
2672 postcard::to_stdvec(&(protocol, data.as_slice(), &fake_from, timestamp, user_agent))
2673 .unwrap();
2674 let sig = identity.sign(&signable).expect("signing should succeed");
2675
2676 let msg = WireMessage {
2677 protocol: protocol.to_string(),
2678 data,
2679 from: fake_from,
2680 timestamp,
2681 user_agent: user_agent.to_string(),
2682 public_key: identity.public_key().as_bytes().to_vec(),
2683 signature: sig.as_bytes().to_vec(),
2684 };
2685 let bytes = postcard::to_stdvec(&msg).unwrap();
2686
2687 assert!(
2688 parse_protocol_message(&bytes, "transport-xyz").is_none(),
2689 "message with mismatched from field should be rejected"
2690 );
2691 }
2692}