1#![allow(missing_docs)]
8
9use std::{collections::HashMap, fmt, net::SocketAddr, sync::Arc, time::Duration};
16
/// Returns the wildcard IPv4 bind address `0.0.0.0:0`.
///
/// Port `0` asks the operating system to choose a free port when the socket is bound.
/// The address is assembled from its components rather than parsed from a string, so
/// this function has no failure path and cannot panic.
fn create_random_port_bind_addr() -> SocketAddr {
    SocketAddr::from(([0, 0, 0, 0], 0))
}
41
/// Extracts the raw 32-byte Ed25519 public key from a DER-encoded SubjectPublicKeyInfo.
///
/// The input must be exactly 44 bytes: a fixed 12-byte prefix (the outer SEQUENCE, the
/// Ed25519 algorithm identifier, and the BIT STRING header with zero unused bits)
/// followed by the 32 key bytes. Any other length or prefix yields `None`.
fn extract_ed25519_from_spki(spki: &[u8]) -> Option<[u8; 32]> {
    /// Total length of an Ed25519 SPKI structure.
    const SPKI_LEN: usize = 44;
    /// SEQUENCE + AlgorithmIdentifier (bytes 0..9) followed by the BIT STRING header (bytes 9..12).
    const SPKI_PREFIX: [u8; 12] = [
        0x30, 0x2a, 0x30, 0x05, 0x06, 0x03, 0x2b, 0x65, 0x70, 0x03, 0x21, 0x00,
    ];

    if spki.len() != SPKI_LEN {
        return None;
    }

    let (prefix, key_bytes) = spki.split_at(SPKI_PREFIX.len());
    if prefix != &SPKI_PREFIX[..] {
        return None;
    }

    // `key_bytes` is exactly 32 bytes here because the total length was checked above.
    let mut key = [0u8; 32];
    key.copy_from_slice(key_bytes);
    Some(key)
}
76
77use tracing::{debug, error, info, warn};
78
79use std::sync::atomic::{AtomicBool, Ordering};
80
81use tokio::{
82 net::UdpSocket,
83 sync::{mpsc, mpsc::error::TryRecvError},
84 time::{sleep, timeout},
85};
86
87use crate::high_level::default_runtime;
88
89use crate::{
90 VarInt,
91 candidate_discovery::{CandidateDiscoveryManager, DiscoveryConfig, DiscoveryEvent},
92 connection::nat_traversal::{CandidateSource, CandidateState},
94};
95
96use crate::{
97 ClientConfig, ConnectionError, EndpointConfig, ServerConfig, TransportConfig,
98 high_level::{Connection as InnerConnection, Endpoint as InnerEndpoint},
99};
100
101#[cfg(feature = "rustls-aws-lc-rs")]
102use crate::{crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig};
103
104use crate::config::validation::{ConfigValidator, ValidationResult};
105
106#[cfg(feature = "rustls-aws-lc-rs")]
107use crate::crypto::{pqc::PqcConfig, raw_public_keys::RawPublicKeyConfigBuilder};
108
/// NAT traversal endpoint wrapping a QUIC endpoint together with the session,
/// bootstrap-node, discovery and event state needed to coordinate traversal.
pub struct NatTraversalEndpoint {
    /// Underlying QUIC endpoint; populated during construction.
    inner_endpoint: Option<InnerEndpoint>,
    /// Configuration the endpoint was created with.
    config: NatTraversalConfig,
    /// Known bootstrap nodes, initially seeded from `config.known_peers`.
    bootstrap_nodes: Arc<std::sync::RwLock<Vec<BootstrapNode>>>,
    /// Traversal sessions keyed by remote peer.
    active_sessions: Arc<std::sync::RwLock<HashMap<PeerId, NatTraversalSession>>>,
    /// Candidate discovery manager, shared with the background discovery polling task.
    discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
    /// Optional user callback invoked synchronously with traversal events.
    event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
    /// Flag read by background tasks to decide when to stop.
    shutdown: Arc<AtomicBool>,
    /// Sending half of the internal event channel.
    event_tx: Option<mpsc::UnboundedSender<NatTraversalEvent>>,
    /// Receiving half of the internal event channel.
    event_rx: std::sync::Mutex<mpsc::UnboundedReceiver<NatTraversalEvent>>,
    /// Established connections keyed by remote peer.
    connections: Arc<std::sync::RwLock<HashMap<PeerId, InnerConnection>>>,
    /// Identifier of this node.
    local_peer_id: PeerId,
    /// Timeout settings, copied from `config.timeouts` at construction.
    timeout_config: crate::config::nat_timeouts::TimeoutConfig,
    /// Set of peers shared with the accept task.
    // NOTE(review): presumably used to avoid emitting `ConnectionEstablished` twice per peer - confirm.
    emitted_established_events: Arc<std::sync::RwLock<std::collections::HashSet<PeerId>>>,
}
141
/// Configuration for a [`NatTraversalEndpoint`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NatTraversalConfig {
    /// Addresses of known peers; these seed the endpoint's bootstrap node list.
    pub known_peers: Vec<SocketAddr>,
    /// Maximum number of candidates, forwarded to candidate discovery
    /// (validated against the bounds 1 and 256).
    pub max_candidates: usize,
    /// Coordination timeout, also used as the total discovery timeout
    /// (validated against the bounds 100 ms and 300 s).
    pub coordination_timeout: Duration,
    /// Enables symmetric NAT prediction in candidate discovery.
    pub enable_symmetric_nat: bool,
    /// Whether relay fallback is enabled.
    pub enable_relay_fallback: bool,
    /// Concurrency limit advertised in the NAT traversal transport parameters
    /// (validated against the bounds 1 and 16; must not exceed `max_candidates`).
    pub max_concurrent_attempts: usize,
    /// Local bind address; when `None` the endpoint binds to `0.0.0.0:0`.
    pub bind_addr: Option<SocketAddr>,
    /// Whether the RFC-style NAT traversal format is preferred.
    pub prefer_rfc_nat_traversal: bool,
    /// Optional post-quantum crypto settings passed to the raw-public-key TLS builder.
    pub pqc: Option<PqcConfig>,
    /// Timeout settings used by the endpoint and its sessions.
    pub timeouts: crate::config::nat_timeouts::TimeoutConfig,
    /// Ed25519 identity key used for TLS authentication; never (de)serialized.
    /// When `None`, a key is generated when the endpoint is created.
    #[serde(skip)]
    pub identity_key: Option<ed25519_dalek::SigningKey>,
}
230
/// 32-byte peer identifier, used as the key for sessions and connections.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
)]
pub struct PeerId(pub [u8; 32]);
240
/// A bootstrap node together with the bookkeeping kept about it.
#[derive(Debug, Clone)]
pub struct BootstrapNode {
    /// Network address of the node.
    pub address: SocketAddr,
    /// Timestamp recorded for the node; set to the creation time by [`BootstrapNode::new`].
    pub last_seen: std::time::Instant,
    /// Whether the node may be used as a coordinator.
    pub can_coordinate: bool,
    /// Measured round-trip time, if any.
    pub rtt: Option<Duration>,
    /// Number of coordinations counted for this node.
    pub coordination_count: u32,
}

impl BootstrapNode {
    /// Creates a node for `address` that is stamped with the current time, marked as
    /// able to coordinate, and has no RTT sample and a zero coordination count.
    pub fn new(address: SocketAddr) -> Self {
        let created_at = std::time::Instant::now();
        Self {
            coordination_count: 0,
            rtt: None,
            can_coordinate: true,
            last_seen: created_at,
            address,
        }
    }
}
268
/// A pairing of one local and one remote candidate address.
#[derive(Debug, Clone)]
pub struct CandidatePair {
    /// Candidate on the local side.
    pub local_candidate: CandidateAddress,
    /// Candidate on the remote side.
    pub remote_candidate: CandidateAddress,
    /// Priority of the pair.
    pub priority: u64,
    /// Current state of the pair.
    pub state: CandidatePairState,
}
281
/// State of a [`CandidatePair`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidatePairState {
    /// Not yet attempted.
    Waiting,
    /// An attempt is under way.
    InProgress,
    /// The attempt succeeded.
    Succeeded,
    /// The attempt failed.
    Failed,
    /// The attempt was cancelled.
    Cancelled,
}
296
/// Internal record of one traversal session with a remote peer.
#[derive(Debug)]
struct NatTraversalSession {
    /// Peer this session targets.
    peer_id: PeerId,
    /// Coordinator address supplied when the session was started.
    #[allow(dead_code)]
    coordinator: SocketAddr,
    /// Attempt counter; starts at 1 and is incremented on each retry.
    attempt: u32,
    /// Time the session was created.
    started_at: std::time::Instant,
    /// Current traversal phase; new sessions start in `Discovery`.
    phase: TraversalPhase,
    /// Candidate addresses collected for the session.
    candidates: Vec<CandidateAddress>,
    /// Connection-level state tracked for the session.
    session_state: SessionState,
}
316
/// Connection-level state of a traversal session.
#[derive(Debug, Clone)]
pub struct SessionState {
    /// Current connection state.
    pub state: ConnectionState,
    /// Time of the most recent state change; timeouts are measured from this instant.
    pub last_transition: std::time::Instant,
    /// Connection handle, once one is known for the peer.
    pub connection: Option<InnerConnection>,
    /// Attempts in flight, as (address, timestamp) pairs.
    // NOTE(review): the timestamp is presumably the attempt start time - confirm.
    pub active_attempts: Vec<(SocketAddr, std::time::Instant)>,
    /// Metrics collected for the connection.
    pub metrics: ConnectionMetrics,
}
331
/// Connection state of a session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection attempt is active.
    Idle,
    /// A connection is being established; new sessions start here.
    Connecting,
    /// A connection is established.
    Connected,
    /// The connection is migrating.
    Migrating,
    /// The session is closed.
    Closed,
}
346
/// Metrics tracked for a session's connection.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// Path round-trip time taken from the connection statistics, if sampled.
    pub rtt: Option<Duration>,
    /// Lost packets divided by sent packets on the path.
    pub loss_rate: f64,
    /// Bytes sent.
    pub bytes_sent: u64,
    /// Bytes received.
    pub bytes_received: u64,
    /// Time the connection was last observed as connected, if ever.
    pub last_activity: Option<std::time::Instant>,
}
361
/// Describes one state transition of a session, as reported by session polling.
#[derive(Debug, Clone)]
pub struct SessionStateUpdate {
    /// Peer whose session changed.
    pub peer_id: PeerId,
    /// State before the transition.
    pub old_state: ConnectionState,
    /// State after the transition.
    pub new_state: ConnectionState,
    /// Why the transition happened.
    pub reason: StateChangeReason,
}
374
/// Reason attached to a [`SessionStateUpdate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateChangeReason {
    /// A timeout elapsed.
    Timeout,
    /// A connection to the peer was established.
    ConnectionEstablished,
    /// The connection was closed.
    ConnectionClosed,
    /// Migration finished with a connection still present.
    MigrationComplete,
    /// Migration finished without a connection.
    MigrationFailed,
    /// A network error occurred.
    NetworkError,
    /// The user closed the session.
    UserClosed,
}
393
/// Phase of a NAT traversal session.
// NOTE(review): variant descriptions follow the variant names; the transitions between
// phases are not visible in this part of the file - confirm against the state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversalPhase {
    /// Candidate discovery; the phase new sessions start in.
    Discovery,
    /// Coordination phase.
    Coordination,
    /// Synchronization phase.
    Synchronization,
    /// Hole punching phase.
    Punching,
    /// Validation phase.
    Validation,
    /// Traversal finished with a connection.
    Connected,
    /// Traversal failed.
    Failed,
}
412
/// Action chosen for a session by the background session poller.
#[derive(Debug, Clone, Copy)]
enum SessionUpdate {
    /// A `Connecting` session exceeded the connection establishment timeout.
    Timeout,
    /// A `Connected` session's connection reports a close reason.
    Disconnected,
    /// A `Connected` session is healthy; refresh its metrics.
    UpdateMetrics,
    /// A `Connected` session has no connection handle.
    InvalidState,
    /// An `Idle` session has waited long enough to be retried.
    Retry,
    /// A `Migrating` session exceeded the probe timeout.
    MigrationTimeout,
    /// A `Closed` session is old enough to be dropped.
    Remove,
}
431
/// A candidate address for NAT traversal.
#[derive(Debug, Clone)]
pub struct CandidateAddress {
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// Base priority; see `effective_priority` for the state-adjusted value.
    pub priority: u32,
    /// How the candidate was obtained.
    pub source: CandidateSource,
    /// Validation state; candidates built with `new` start as `New`.
    pub state: CandidateState,
}
444
445impl CandidateAddress {
446 pub fn new(
448 address: SocketAddr,
449 priority: u32,
450 source: CandidateSource,
451 ) -> Result<Self, CandidateValidationError> {
452 Self::validate_address(&address)?;
453 Ok(Self {
454 address,
455 priority,
456 source,
457 state: CandidateState::New,
458 })
459 }
460
461 pub fn validate_address(addr: &SocketAddr) -> Result<(), CandidateValidationError> {
463 if addr.port() == 0 {
465 return Err(CandidateValidationError::InvalidPort(0));
466 }
467
468 #[cfg(not(test))]
470 if addr.port() < 1024 {
471 return Err(CandidateValidationError::PrivilegedPort(addr.port()));
472 }
473
474 match addr.ip() {
475 std::net::IpAddr::V4(ipv4) => {
476 if ipv4.is_unspecified() {
478 return Err(CandidateValidationError::UnspecifiedAddress);
479 }
480 if ipv4.is_broadcast() {
481 return Err(CandidateValidationError::BroadcastAddress);
482 }
483 if ipv4.is_multicast() {
484 return Err(CandidateValidationError::MulticastAddress);
485 }
486 if ipv4.octets()[0] == 0 {
488 return Err(CandidateValidationError::ReservedAddress);
489 }
490 if ipv4.octets()[0] >= 240 {
492 return Err(CandidateValidationError::ReservedAddress);
493 }
494 }
495 std::net::IpAddr::V6(ipv6) => {
496 if ipv6.is_unspecified() {
498 return Err(CandidateValidationError::UnspecifiedAddress);
499 }
500 if ipv6.is_multicast() {
501 return Err(CandidateValidationError::MulticastAddress);
502 }
503 let segments = ipv6.segments();
505 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
506 return Err(CandidateValidationError::DocumentationAddress);
507 }
508 if ipv6.to_ipv4_mapped().is_some() {
510 return Err(CandidateValidationError::IPv4MappedAddress);
511 }
512 }
513 }
514
515 Ok(())
516 }
517
518 pub fn is_suitable_for_nat_traversal(&self) -> bool {
520 match self.address.ip() {
521 std::net::IpAddr::V4(ipv4) => {
522 #[cfg(test)]
527 if ipv4.is_loopback() {
528 return true;
529 }
530 !ipv4.is_loopback()
531 && !ipv4.is_link_local()
532 && !ipv4.is_multicast()
533 && !ipv4.is_broadcast()
534 }
535 std::net::IpAddr::V6(ipv6) => {
536 #[cfg(test)]
542 if ipv6.is_loopback() {
543 return true;
544 }
545 let segments = ipv6.segments();
546 let is_link_local = (segments[0] & 0xffc0) == 0xfe80;
547 let is_unique_local = (segments[0] & 0xfe00) == 0xfc00;
548
549 !ipv6.is_loopback() && !is_link_local && !is_unique_local && !ipv6.is_multicast()
550 }
551 }
552 }
553
554 pub fn effective_priority(&self) -> u32 {
556 match self.state {
557 CandidateState::Valid => self.priority,
558 CandidateState::New => self.priority.saturating_sub(10),
559 CandidateState::Validating => self.priority.saturating_sub(5),
560 CandidateState::Failed => 0,
561 CandidateState::Removed => 0,
562 }
563 }
564}
565
/// Reasons a candidate address can be rejected by `CandidateAddress::validate_address`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum CandidateValidationError {
    /// The port is not usable (port 0).
    #[error("invalid port number: {0}")]
    InvalidPort(u16),
    /// The port is below 1024 (only enforced in non-test builds).
    #[error("privileged port not allowed: {0}")]
    PrivilegedPort(u16),
    /// The address is the unspecified address.
    #[error("unspecified address not allowed")]
    UnspecifiedAddress,
    /// The address is the IPv4 broadcast address.
    #[error("broadcast address not allowed")]
    BroadcastAddress,
    /// The address is a multicast address.
    #[error("multicast address not allowed")]
    MulticastAddress,
    /// The IPv4 address has a first octet of 0 or of 240 and above.
    #[error("reserved address not allowed")]
    ReservedAddress,
    /// The IPv6 address lies in the `2001:db8::/32` documentation prefix.
    #[error("documentation address not allowed")]
    DocumentationAddress,
    /// The IPv6 address is an IPv4-mapped address.
    #[error("IPv4-mapped IPv6 address not allowed")]
    IPv4MappedAddress,
}
594
/// Events reported by the NAT traversal endpoint through the event callback and the
/// internal event channel.
#[derive(Debug, Clone)]
pub enum NatTraversalEvent {
    /// A candidate address was discovered.
    CandidateDiscovered {
        /// Peer the candidate belongs to.
        peer_id: PeerId,
        /// The discovered candidate.
        candidate: CandidateAddress,
    },
    /// Traversal towards a peer was requested via a coordinator.
    CoordinationRequested {
        /// Target peer.
        peer_id: PeerId,
        /// Coordinator address used for the request.
        coordinator: SocketAddr,
    },
    /// Coordination with a peer was synchronized.
    CoordinationSynchronized {
        /// Peer involved.
        peer_id: PeerId,
        /// Identifier of the coordination round.
        round_id: VarInt,
    },
    /// Hole punching started.
    HolePunchingStarted {
        /// Peer involved.
        peer_id: PeerId,
        /// Addresses being targeted.
        targets: Vec<SocketAddr>,
    },
    /// A path was validated.
    PathValidated {
        /// Peer involved.
        peer_id: PeerId,
        /// Validated address.
        address: SocketAddr,
        /// Round-trip time reported for the path.
        rtt: Duration,
    },
    /// A candidate was validated.
    CandidateValidated {
        /// Peer involved.
        peer_id: PeerId,
        /// Address of the validated candidate.
        candidate_address: SocketAddr,
    },
    /// Traversal succeeded.
    TraversalSucceeded {
        /// Peer involved.
        peer_id: PeerId,
        /// Address the traversal ended on.
        final_address: SocketAddr,
        /// Total time taken.
        total_time: Duration,
    },
    /// A connection was established.
    ConnectionEstablished {
        /// Peer involved.
        peer_id: PeerId,
        /// Remote address of the connection.
        remote_address: SocketAddr,
    },
    /// Traversal failed.
    TraversalFailed {
        /// Peer involved.
        peer_id: PeerId,
        /// The failure.
        error: NatTraversalError,
        /// Whether a fallback is available.
        fallback_available: bool,
    },
    /// A connection was lost.
    ConnectionLost {
        /// Peer involved.
        peer_id: PeerId,
        /// Textual reason.
        reason: String,
    },
    /// A session moved between traversal phases.
    PhaseTransition {
        /// Peer involved.
        peer_id: PeerId,
        /// Phase before the transition.
        from_phase: TraversalPhase,
        /// Phase after the transition.
        to_phase: TraversalPhase,
    },
    /// A session's connection state changed.
    SessionStateChanged {
        /// Peer involved.
        peer_id: PeerId,
        /// State after the change.
        new_state: ConnectionState,
    },
    /// An external address was reported for this node.
    ExternalAddressDiscovered {
        /// Address of the reporter.
        reported_by: SocketAddr,
        /// The reported external address.
        address: SocketAddr,
    },
}
697
/// Errors produced by NAT traversal operations.
#[derive(Debug, Clone)]
pub enum NatTraversalError {
    /// No bootstrap nodes are available.
    NoBootstrapNodes,
    /// No candidates were found.
    NoCandidatesFound,
    /// Candidate discovery could not be started or failed.
    CandidateDiscoveryFailed(String),
    /// Coordination failed.
    CoordinationFailed(String),
    /// Hole punching failed.
    HolePunchingFailed,
    /// Hole punching failed, with detail.
    PunchingFailed(String),
    /// Validation failed.
    ValidationFailed(String),
    /// Validation timed out.
    ValidationTimeout,
    /// Socket or other network-level failure.
    NetworkError(String),
    /// Invalid configuration, or a failure building the TLS/QUIC configuration.
    ConfigError(String),
    /// Protocol-level failure; also used to report poisoned internal locks.
    ProtocolError(String),
    /// The operation timed out.
    Timeout,
    /// The connection attempt failed.
    ConnectionFailed(String),
    /// Traversal failed, with detail.
    TraversalFailed(String),
    /// The peer is not connected.
    PeerNotConnected,
}
732
733impl Default for NatTraversalConfig {
734 fn default() -> Self {
735 Self {
736 known_peers: Vec::new(),
737 max_candidates: 8,
738 coordination_timeout: Duration::from_secs(10),
739 enable_symmetric_nat: true,
740 enable_relay_fallback: true,
741 max_concurrent_attempts: 3,
742 bind_addr: None,
743 prefer_rfc_nat_traversal: true, pqc: Some(crate::crypto::pqc::PqcConfig::default()),
747 timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
748 identity_key: None, }
750 }
751}
752
753impl ConfigValidator for NatTraversalConfig {
754 fn validate(&self) -> ValidationResult<()> {
755 use crate::config::validation::*;
756
757 if !self.known_peers.is_empty() {
762 validate_bootstrap_nodes(&self.known_peers)?;
763 }
764
765 validate_range(self.max_candidates, 1, 256, "max_candidates")?;
767
768 validate_duration(
770 self.coordination_timeout,
771 Duration::from_millis(100),
772 Duration::from_secs(300),
773 "coordination_timeout",
774 )?;
775
776 validate_range(
778 self.max_concurrent_attempts,
779 1,
780 16,
781 "max_concurrent_attempts",
782 )?;
783
784 if self.max_concurrent_attempts > self.max_candidates {
786 return Err(ConfigValidationError::IncompatibleConfiguration(
787 "max_concurrent_attempts cannot exceed max_candidates".to_string(),
788 ));
789 }
790
791 Ok(())
792 }
793}
794
795impl NatTraversalEndpoint {
    /// Creates a NAT traversal endpoint from `config`, with an optional callback that
    /// receives traversal events.
    ///
    /// Delegates to `new_impl`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `new_impl` reports.
    pub async fn new(
        config: NatTraversalConfig,
        event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
    ) -> Result<Self, NatTraversalError> {
        Self::new_impl(config, event_callback).await
    }
803
    /// Constructor indirection: forwards unchanged to `new_common`.
    async fn new_impl(
        config: NatTraversalConfig,
        event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
    ) -> Result<Self, NatTraversalError> {
        Self::new_common(config, event_callback).await
    }
811
    /// Constructor indirection: forwards unchanged to `new_shared_logic`, which does
    /// the actual construction.
    async fn new_common(
        config: NatTraversalConfig,
        event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
    ) -> Result<Self, NatTraversalError> {
        Self::new_shared_logic(config, event_callback).await
    }
820
821 async fn new_shared_logic(
823 config: NatTraversalConfig,
824 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
825 ) -> Result<Self, NatTraversalError> {
826 config
828 .validate()
829 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
830
831 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
833 config
834 .known_peers
835 .iter()
836 .map(|&address| BootstrapNode {
837 address,
838 last_seen: std::time::Instant::now(),
839 can_coordinate: true, rtt: None,
841 coordination_count: 0,
842 })
843 .collect(),
844 ));
845
846 let discovery_config = DiscoveryConfig {
848 total_timeout: config.coordination_timeout,
849 max_candidates: config.max_candidates,
850 enable_symmetric_prediction: config.enable_symmetric_nat,
851 bound_address: config.bind_addr, ..DiscoveryConfig::default()
853 };
854
855 let discovery_manager = Arc::new(std::sync::Mutex::new(CandidateDiscoveryManager::new(
858 discovery_config,
859 )));
860
861 let (inner_endpoint, event_tx, event_rx, local_addr) =
863 Self::create_inner_endpoint(&config).await?;
864
865 {
867 let mut discovery = discovery_manager.lock().map_err(|_| {
868 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
869 })?;
870 discovery.set_bound_address(local_addr);
871 info!(
872 "Updated discovery manager with bound address: {}",
873 local_addr
874 );
875 }
876
877 let emitted_established_events =
878 Arc::new(std::sync::RwLock::new(std::collections::HashSet::new()));
879
880 let endpoint = Self {
881 inner_endpoint: Some(inner_endpoint.clone()),
882 config: config.clone(),
883 bootstrap_nodes,
884 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
885 discovery_manager,
886 event_callback,
887 shutdown: Arc::new(AtomicBool::new(false)),
888 event_tx: Some(event_tx.clone()),
889 event_rx: std::sync::Mutex::new(event_rx),
890 connections: Arc::new(std::sync::RwLock::new(HashMap::new())),
891 local_peer_id: Self::generate_local_peer_id(),
892 timeout_config: config.timeouts.clone(),
893 emitted_established_events: emitted_established_events.clone(),
894 };
895
896 {
898 let endpoint_clone = inner_endpoint.clone();
899 let shutdown_clone = endpoint.shutdown.clone();
900 let event_tx_clone = event_tx.clone();
901 let connections_clone = endpoint.connections.clone();
902 let emitted_events_clone = emitted_established_events.clone();
903
904 tokio::spawn(async move {
905 Self::accept_connections(
906 endpoint_clone,
907 shutdown_clone,
908 event_tx_clone,
909 connections_clone,
910 emitted_events_clone,
911 )
912 .await;
913 });
914
915 info!("Started accepting connections (symmetric P2P node)");
916 }
917
918 let discovery_manager_clone = endpoint.discovery_manager.clone();
920 let shutdown_clone = endpoint.shutdown.clone();
921 let event_tx_clone = event_tx;
922 let connections_clone = endpoint.connections.clone();
923
924 tokio::spawn(async move {
925 Self::poll_discovery(
926 discovery_manager_clone,
927 shutdown_clone,
928 event_tx_clone,
929 connections_clone,
930 )
931 .await;
932 });
933
934 info!("Started discovery polling task");
935
936 {
938 let mut discovery = endpoint.discovery_manager.lock().map_err(|_| {
939 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
940 })?;
941
942 let local_peer_id = endpoint.local_peer_id;
944 let bootstrap_nodes = {
945 let nodes = endpoint.bootstrap_nodes.read().map_err(|_| {
946 NatTraversalError::ProtocolError("Bootstrap nodes lock poisoned".to_string())
947 })?;
948 nodes.clone()
949 };
950
951 discovery
952 .start_discovery(local_peer_id, bootstrap_nodes)
953 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
954
955 info!(
956 "Started local candidate discovery for peer {:?}",
957 local_peer_id
958 );
959 }
960
961 Ok(endpoint)
962 }
963
    /// Returns a reference to the underlying QUIC endpoint, or `None` if it is not set.
    pub fn get_endpoint(&self) -> Option<&crate::high_level::Endpoint> {
        self.inner_endpoint.as_ref()
    }
968
    /// Returns a reference to the registered event callback, or `None` if the endpoint
    /// was created without one.
    pub fn get_event_callback(&self) -> Option<&Box<dyn Fn(NatTraversalEvent) + Send + Sync>> {
        self.event_callback.as_ref()
    }
973
974 pub fn initiate_nat_traversal(
976 &self,
977 peer_id: PeerId,
978 coordinator: SocketAddr,
979 ) -> Result<(), NatTraversalError> {
980 info!(
981 "Starting NAT traversal to peer {:?} via coordinator {}",
982 peer_id, coordinator
983 );
984
985 let session = NatTraversalSession {
987 peer_id,
988 coordinator,
989 attempt: 1,
990 started_at: std::time::Instant::now(),
991 phase: TraversalPhase::Discovery,
992 candidates: Vec::new(),
993 session_state: SessionState {
994 state: ConnectionState::Connecting,
995 last_transition: std::time::Instant::now(),
996
997 connection: None,
998 active_attempts: Vec::new(),
999 metrics: ConnectionMetrics::default(),
1000 },
1001 };
1002
1003 {
1005 let mut sessions = self
1006 .active_sessions
1007 .write()
1008 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1009 sessions.insert(peer_id, session);
1010 }
1011
1012 let bootstrap_nodes_vec = {
1014 let bootstrap_nodes = self
1015 .bootstrap_nodes
1016 .read()
1017 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1018 bootstrap_nodes.clone()
1019 };
1020
1021 {
1022 let mut discovery = self.discovery_manager.lock().map_err(|_| {
1023 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
1024 })?;
1025
1026 discovery
1027 .start_discovery(peer_id, bootstrap_nodes_vec)
1028 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
1029 }
1030
1031 if let Some(ref callback) = self.event_callback {
1033 callback(NatTraversalEvent::CoordinationRequested {
1034 peer_id,
1035 coordinator,
1036 });
1037 }
1038
1039 Ok(())
1041 }
1042
1043 pub fn poll_sessions(&self) -> Result<Vec<SessionStateUpdate>, NatTraversalError> {
1045 let mut updates = Vec::new();
1046 let now = std::time::Instant::now();
1047
1048 let mut sessions = self
1049 .active_sessions
1050 .write()
1051 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
1052
1053 for (peer_id, session) in sessions.iter_mut() {
1054 let mut state_changed = false;
1055
1056 match session.session_state.state {
1057 ConnectionState::Connecting => {
1058 let elapsed = now.duration_since(session.session_state.last_transition);
1060 if elapsed
1061 > self
1062 .timeout_config
1063 .nat_traversal
1064 .connection_establishment_timeout
1065 {
1066 session.session_state.state = ConnectionState::Closed;
1067 session.session_state.last_transition = now;
1068 state_changed = true;
1069
1070 updates.push(SessionStateUpdate {
1071 peer_id: *peer_id,
1072 old_state: ConnectionState::Connecting,
1073 new_state: ConnectionState::Closed,
1074 reason: StateChangeReason::Timeout,
1075 });
1076 }
1077
1078 let has_connection = if let Ok(conns) = self.connections.read() {
1081 conns.contains_key(peer_id)
1082 } else {
1083 false
1084 };
1085
1086 if has_connection || session.session_state.connection.is_some() {
1087 if session.session_state.connection.is_none() {
1089 if let Ok(conns) = self.connections.read() {
1090 if let Some(conn) = conns.get(peer_id) {
1091 session.session_state.connection = Some(conn.clone());
1092 }
1093 }
1094 }
1095
1096 session.session_state.state = ConnectionState::Connected;
1097 session.session_state.last_transition = now;
1098 state_changed = true;
1099
1100 updates.push(SessionStateUpdate {
1101 peer_id: *peer_id,
1102 old_state: ConnectionState::Connecting,
1103 new_state: ConnectionState::Connected,
1104 reason: StateChangeReason::ConnectionEstablished,
1105 });
1106 }
1107 }
1108 ConnectionState::Connected => {
1109 {
1112 }
1115
1116 session.session_state.metrics.last_activity = Some(now);
1118 }
1119 ConnectionState::Migrating => {
1120 let elapsed = now.duration_since(session.session_state.last_transition);
1122 if elapsed > Duration::from_secs(10) {
1123 if session.session_state.connection.is_some() {
1126 session.session_state.state = ConnectionState::Connected;
1127 state_changed = true;
1128
1129 updates.push(SessionStateUpdate {
1130 peer_id: *peer_id,
1131 old_state: ConnectionState::Migrating,
1132 new_state: ConnectionState::Connected,
1133 reason: StateChangeReason::MigrationComplete,
1134 });
1135 } else {
1136 session.session_state.state = ConnectionState::Closed;
1137 state_changed = true;
1138
1139 updates.push(SessionStateUpdate {
1140 peer_id: *peer_id,
1141 old_state: ConnectionState::Migrating,
1142 new_state: ConnectionState::Closed,
1143 reason: StateChangeReason::MigrationFailed,
1144 });
1145 }
1146
1147 session.session_state.last_transition = now;
1148 }
1149 }
1150 _ => {}
1151 }
1152
1153 if state_changed {
1155 if let Some(ref callback) = self.event_callback {
1156 callback(NatTraversalEvent::SessionStateChanged {
1157 peer_id: *peer_id,
1158 new_state: session.session_state.state,
1159 });
1160 }
1161 }
1162 }
1163
1164 Ok(updates)
1165 }
1166
1167 pub fn start_session_polling(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
1169 let sessions = self.active_sessions.clone();
1170 let shutdown = self.shutdown.clone();
1171 let timeout_config = self.timeout_config.clone();
1172
1173 tokio::spawn(async move {
1174 let mut ticker = tokio::time::interval(interval);
1175
1176 loop {
1177 ticker.tick().await;
1178
1179 if shutdown.load(Ordering::Relaxed) {
1180 break;
1181 }
1182
1183 let sessions_to_update = {
1185 match sessions.read() {
1186 Ok(sessions_guard) => {
1187 sessions_guard
1188 .iter()
1189 .filter_map(|(peer_id, session)| {
1190 let now = std::time::Instant::now();
1191 let elapsed =
1192 now.duration_since(session.session_state.last_transition);
1193
1194 match session.session_state.state {
1195 ConnectionState::Connecting => {
1196 if elapsed
1198 > timeout_config
1199 .nat_traversal
1200 .connection_establishment_timeout
1201 {
1202 Some((*peer_id, SessionUpdate::Timeout))
1203 } else {
1204 None
1205 }
1206 }
1207 ConnectionState::Connected => {
1208 if let Some(ref conn) = session.session_state.connection
1210 {
1211 if conn.close_reason().is_some() {
1212 Some((*peer_id, SessionUpdate::Disconnected))
1213 } else {
1214 Some((*peer_id, SessionUpdate::UpdateMetrics))
1216 }
1217 } else {
1218 Some((*peer_id, SessionUpdate::InvalidState))
1219 }
1220 }
1221 ConnectionState::Idle => {
1222 if elapsed
1224 > timeout_config
1225 .discovery
1226 .server_reflexive_cache_ttl
1227 {
1228 Some((*peer_id, SessionUpdate::Retry))
1229 } else {
1230 None
1231 }
1232 }
1233 ConnectionState::Migrating => {
1234 if elapsed > timeout_config.nat_traversal.probe_timeout
1236 {
1237 Some((*peer_id, SessionUpdate::MigrationTimeout))
1238 } else {
1239 None
1240 }
1241 }
1242 ConnectionState::Closed => {
1243 if elapsed
1245 > timeout_config.discovery.interface_cache_ttl
1246 {
1247 Some((*peer_id, SessionUpdate::Remove))
1248 } else {
1249 None
1250 }
1251 }
1252 }
1253 })
1254 .collect::<Vec<_>>()
1255 }
1256 _ => {
1257 vec![]
1258 }
1259 }
1260 };
1261
1262 if !sessions_to_update.is_empty() {
1264 if let Ok(mut sessions_guard) = sessions.write() {
1265 for (peer_id, update) in sessions_to_update {
1266 match update {
1267 SessionUpdate::Timeout => {
1268 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1269 session.session_state.state = ConnectionState::Closed;
1270 session.session_state.last_transition =
1271 std::time::Instant::now();
1272 tracing::warn!("Connection to {:?} timed out", peer_id);
1273 }
1274 }
1275 SessionUpdate::Disconnected => {
1276 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1277 session.session_state.state = ConnectionState::Closed;
1278 session.session_state.last_transition =
1279 std::time::Instant::now();
1280 session.session_state.connection = None;
1281 tracing::info!("Connection to {:?} closed", peer_id);
1282 }
1283 }
1284 SessionUpdate::UpdateMetrics => {
1285 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1286 if let Some(ref conn) = session.session_state.connection {
1287 let stats = conn.stats();
1289 session.session_state.metrics.rtt =
1290 Some(stats.path.rtt);
1291 session.session_state.metrics.loss_rate =
1292 stats.path.lost_packets as f64
1293 / stats.path.sent_packets.max(1) as f64;
1294 }
1295 }
1296 }
1297 SessionUpdate::InvalidState => {
1298 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1299 session.session_state.state = ConnectionState::Closed;
1300 session.session_state.last_transition =
1301 std::time::Instant::now();
1302 tracing::error!("Session {:?} in invalid state", peer_id);
1303 }
1304 }
1305 SessionUpdate::Retry => {
1306 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1307 session.session_state.state = ConnectionState::Connecting;
1308 session.session_state.last_transition =
1309 std::time::Instant::now();
1310 session.attempt += 1;
1311 tracing::info!(
1312 "Retrying connection to {:?} (attempt {})",
1313 peer_id,
1314 session.attempt
1315 );
1316 }
1317 }
1318 SessionUpdate::MigrationTimeout => {
1319 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1320 session.session_state.state = ConnectionState::Closed;
1321 session.session_state.last_transition =
1322 std::time::Instant::now();
1323 tracing::warn!("Migration timeout for {:?}", peer_id);
1324 }
1325 }
1326 SessionUpdate::Remove => {
1327 sessions_guard.remove(&peer_id);
1328 tracing::debug!("Removed old session for {:?}", peer_id);
1329 }
1330 }
1331 }
1332 }
1333 }
1334 }
1335 })
1336 }
1337
1338 pub fn get_statistics(&self) -> Result<NatTraversalStatistics, NatTraversalError> {
1342 let sessions = self
1343 .active_sessions
1344 .read()
1345 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1346 let bootstrap_nodes = self
1347 .bootstrap_nodes
1348 .read()
1349 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1350
1351 let avg_coordination_time = {
1353 let rtts: Vec<Duration> = bootstrap_nodes.iter().filter_map(|b| b.rtt).collect();
1354
1355 if rtts.is_empty() {
1356 Duration::from_millis(500) } else {
1358 let total_millis: u64 = rtts.iter().map(|d| d.as_millis() as u64).sum();
1359 Duration::from_millis(total_millis / rtts.len() as u64 * 2) }
1361 };
1362
1363 Ok(NatTraversalStatistics {
1364 active_sessions: sessions.len(),
1365 total_bootstrap_nodes: bootstrap_nodes.len(),
1366 successful_coordinations: bootstrap_nodes.iter().map(|b| b.coordination_count).sum(),
1367 average_coordination_time: avg_coordination_time,
1368 total_attempts: 0,
1369 successful_connections: 0,
1370 direct_connections: 0,
1371 relayed_connections: 0,
1372 })
1373 }
1374
1375 pub fn add_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1377 let mut bootstrap_nodes = self
1378 .bootstrap_nodes
1379 .write()
1380 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1381
1382 if !bootstrap_nodes.iter().any(|b| b.address == address) {
1384 bootstrap_nodes.push(BootstrapNode {
1385 address,
1386 last_seen: std::time::Instant::now(),
1387 can_coordinate: true,
1388 rtt: None,
1389 coordination_count: 0,
1390 });
1391 info!("Added bootstrap node: {}", address);
1392 }
1393 Ok(())
1394 }
1395
1396 pub fn remove_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1398 let mut bootstrap_nodes = self
1399 .bootstrap_nodes
1400 .write()
1401 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1402 bootstrap_nodes.retain(|b| b.address != address);
1403 info!("Removed bootstrap node: {}", address);
1404 Ok(())
1405 }
1406
1407 async fn create_inner_endpoint(
1413 config: &NatTraversalConfig,
1414 ) -> Result<
1415 (
1416 InnerEndpoint,
1417 mpsc::UnboundedSender<NatTraversalEvent>,
1418 mpsc::UnboundedReceiver<NatTraversalEvent>,
1419 SocketAddr,
1420 ),
1421 NatTraversalError,
1422 > {
1423 use std::sync::Arc;
1424
1425 let server_config = {
1427 info!("Creating server config using Raw Public Keys (RFC 7250) for symmetric P2P node");
1428
1429 let server_key = if let Some(ref key) = config.identity_key {
1433 debug!("Using provided identity key for TLS authentication");
1434 key.clone()
1435 } else {
1436 debug!(
1437 "No identity key provided - generating new keypair (identity mismatch warning)"
1438 );
1439 let (key, _public_key) =
1440 crate::crypto::raw_public_keys::key_utils::generate_ed25519_keypair();
1441 key
1442 };
1443
1444 let mut rpk_builder = RawPublicKeyConfigBuilder::new()
1446 .with_server_key(server_key)
1447 .allow_any_key(); if let Some(ref pqc) = config.pqc {
1450 rpk_builder = rpk_builder.with_pqc(pqc.clone());
1451 }
1452
1453 let rpk_config = rpk_builder.build_rfc7250_server_config().map_err(|e| {
1454 NatTraversalError::ConfigError(format!("RPK server config failed: {e}"))
1455 })?;
1456
1457 let server_crypto = QuicServerConfig::try_from(rpk_config.inner().as_ref().clone())
1458 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1459
1460 let mut server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
1461
1462 let mut transport_config = TransportConfig::default();
1464 transport_config.enable_address_discovery(true);
1465 transport_config
1466 .keep_alive_interval(Some(config.timeouts.nat_traversal.retry_interval));
1467 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1468
1469 let nat_config = crate::transport_parameters::NatTraversalConfig::ServerSupport {
1472 concurrency_limit: VarInt::from_u32(config.max_concurrent_attempts as u32),
1473 };
1474 transport_config.nat_traversal_config(Some(nat_config));
1475
1476 server_config.transport_config(Arc::new(transport_config));
1477
1478 Some(server_config)
1479 };
1480
1481 let client_config = {
1483 info!("Creating client config using Raw Public Keys (RFC 7250)");
1484
1485 let client_key = if let Some(ref key) = config.identity_key {
1488 debug!("Using provided identity key for client TLS authentication");
1489 key.clone()
1490 } else {
1491 debug!("No identity key provided for client - generating new keypair");
1492 let (key, _public_key) =
1493 crate::crypto::raw_public_keys::key_utils::generate_ed25519_keypair();
1494 key
1495 };
1496
1497 let mut rpk_builder = RawPublicKeyConfigBuilder::new()
1500 .with_client_key(client_key) .allow_any_key(); if let Some(ref pqc) = config.pqc {
1504 rpk_builder = rpk_builder.with_pqc(pqc.clone());
1505 }
1506
1507 let rpk_config = rpk_builder.build_rfc7250_client_config().map_err(|e| {
1508 NatTraversalError::ConfigError(format!("RPK client config failed: {e}"))
1509 })?;
1510
1511 let client_crypto = QuicClientConfig::try_from(rpk_config.inner().as_ref().clone())
1512 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1513
1514 let mut client_config = ClientConfig::new(Arc::new(client_crypto));
1515
1516 let mut transport_config = TransportConfig::default();
1518 transport_config.enable_address_discovery(true);
1519 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
1520 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1521
1522 let nat_config = crate::transport_parameters::NatTraversalConfig::ServerSupport {
1525 concurrency_limit: VarInt::from_u32(config.max_concurrent_attempts as u32),
1526 };
1527 transport_config.nat_traversal_config(Some(nat_config));
1528
1529 client_config.transport_config(Arc::new(transport_config));
1530
1531 client_config
1532 };
1533
1534 let bind_addr = config
1536 .bind_addr
1537 .unwrap_or_else(create_random_port_bind_addr);
1538 let socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1539 NatTraversalError::NetworkError(format!("Failed to bind UDP socket: {e}"))
1540 })?;
1541
1542 info!("Binding endpoint to {}", bind_addr);
1543
1544 let std_socket = socket.into_std().map_err(|e| {
1546 NatTraversalError::NetworkError(format!("Failed to convert socket: {e}"))
1547 })?;
1548
1549 let runtime = default_runtime().ok_or_else(|| {
1551 NatTraversalError::ConfigError("No compatible async runtime found".to_string())
1552 })?;
1553
1554 let mut endpoint = InnerEndpoint::new(
1555 EndpointConfig::default(),
1556 server_config,
1557 std_socket,
1558 runtime,
1559 )
1560 .map_err(|e| {
1561 NatTraversalError::ConfigError(format!("Failed to create QUIC endpoint: {e}"))
1562 })?;
1563
1564 endpoint.set_default_client_config(client_config);
1566
1567 let local_addr = endpoint.local_addr().map_err(|e| {
1569 NatTraversalError::NetworkError(format!("Failed to get local address: {e}"))
1570 })?;
1571
1572 info!("Endpoint bound to actual address: {}", local_addr);
1573
1574 let (event_tx, event_rx) = mpsc::unbounded_channel();
1576
1577 Ok((endpoint, event_tx, event_rx, local_addr))
1578 }
1579
1580 #[allow(clippy::panic)]
1582 pub async fn start_listening(&self, bind_addr: SocketAddr) -> Result<(), NatTraversalError> {
1583 let endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
1584 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
1585 })?;
1586
1587 let _socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1589 NatTraversalError::NetworkError(format!("Failed to bind to {bind_addr}: {e}"))
1590 })?;
1591
1592 info!("Started listening on {}", bind_addr);
1593
1594 let endpoint_clone = endpoint.clone();
1596 let shutdown_clone = self.shutdown.clone();
1597 let event_tx = self
1598 .event_tx
1599 .as_ref()
1600 .unwrap_or_else(|| panic!("event transmitter should be initialized"))
1601 .clone();
1602 let connections_clone = self.connections.clone();
1603 let emitted_events_clone = self.emitted_established_events.clone();
1604
1605 tokio::spawn(async move {
1606 Self::accept_connections(
1607 endpoint_clone,
1608 shutdown_clone,
1609 event_tx,
1610 connections_clone,
1611 emitted_events_clone,
1612 )
1613 .await;
1614 });
1615
1616 Ok(())
1617 }
1618
1619 async fn accept_connections(
1621 endpoint: InnerEndpoint,
1622 shutdown: Arc<AtomicBool>,
1623 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1624 connections: Arc<std::sync::RwLock<HashMap<PeerId, InnerConnection>>>,
1625 emitted_events: Arc<std::sync::RwLock<std::collections::HashSet<PeerId>>>,
1626 ) {
1627 while !shutdown.load(Ordering::Relaxed) {
1628 match endpoint.accept().await {
1629 Some(connecting) => {
1630 let event_tx = event_tx.clone();
1631 let connections = connections.clone();
1632 let emitted_events = emitted_events.clone();
1633 tokio::spawn(async move {
1634 match connecting.await {
1635 Ok(connection) => {
1636 info!("Accepted connection from {}", connection.remote_address());
1637
1638 let peer_id = Self::derive_peer_id_from_connection(&connection)
1640 .unwrap_or_else(|| {
1641 Self::generate_peer_id_from_address(
1642 connection.remote_address(),
1643 )
1644 });
1645
1646 if let Ok(mut conns) = connections.write() {
1648 conns.insert(peer_id, connection.clone());
1649 }
1650
1651 let should_emit = if let Ok(mut emitted) = emitted_events.write() {
1653 emitted.insert(peer_id) } else {
1655 true };
1657
1658 if should_emit {
1659 let _ =
1660 event_tx.send(NatTraversalEvent::ConnectionEstablished {
1661 peer_id,
1662 remote_address: connection.remote_address(),
1663 });
1664 }
1665
1666 Self::handle_connection(peer_id, connection, event_tx).await;
1668 }
1669 Err(e) => {
1670 debug!("Connection failed: {}", e);
1671 }
1672 }
1673 });
1674 }
1675 None => {
1676 break;
1678 }
1679 }
1680 }
1681 }
1682
    /// Background task that drives candidate discovery.
    ///
    /// On every 100 ms tick, until `shutdown` is set, it:
    ///
    /// 1. scans `connections` for peers that report an observed address, emits
    ///    `ExternalAddressDiscovered` once per `(peer, address)` pair, and hands
    ///    the address to the discovery manager, then
    /// 2. polls the discovery manager and logs each `DiscoveryEvent`; only
    ///    `ServerReflexiveCandidateDiscovered` is forwarded to `event_tx`.
    ///
    /// Send failures on `event_tx` are ignored.
    async fn poll_discovery(
        discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
        shutdown: Arc<AtomicBool>,
        event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
        connections: Arc<std::sync::RwLock<HashMap<PeerId, InnerConnection>>>,
    ) {
        use tokio::time::{Duration, interval};

        let mut poll_interval = interval(Duration::from_millis(100));
        // (peer, address) pairs already announced, so each is emitted only once.
        let mut emitted_discovery = std::collections::HashSet::new();

        while !shutdown.load(Ordering::Relaxed) {
            poll_interval.tick().await;

            // A poisoned connections lock skips the scan for this tick.
            if let Ok(conns) = connections.read() {
                if !conns.is_empty() {
                    debug!("Polling {} connections for observed addresses", conns.len());
                }
                for (peer_id, conn) in conns.iter() {
                    if let Some(observed_addr) = conn.observed_address() {
                        debug!(
                            "Found observed address {} for peer {:?}",
                            observed_addr, peer_id
                        );

                        // `insert` returns true only for a pair not seen before.
                        if emitted_discovery.insert((*peer_id, observed_addr)) {
                            debug!("Emitting ExternalAddressDiscovered for peer {:?}", peer_id);
                            let _ = event_tx.send(NatTraversalEvent::ExternalAddressDiscovered {
                                reported_by: conn.remote_address(),
                                address: observed_addr,
                            });
                        }

                        // The address is passed to the discovery manager on every tick,
                        // not just the first; its result is ignored.
                        if let Ok(mut discovery) = discovery_manager.lock() {
                            let _ =
                                discovery.accept_quic_discovered_address(*peer_id, observed_addr);
                        }
                    }
                }
            }

            // Collect events inside the match so the mutex guard is released
            // before they are processed.
            let events = match discovery_manager.lock() {
                Ok(mut discovery) => discovery.poll(std::time::Instant::now()),
                Err(e) => {
                    error!("Failed to lock discovery manager: {}", e);
                    continue;
                }
            };

            for event in events {
                match event {
                    DiscoveryEvent::DiscoveryStarted {
                        peer_id,
                        bootstrap_count,
                    } => {
                        debug!(
                            "Discovery started for peer {:?} with {} bootstrap nodes",
                            peer_id, bootstrap_count
                        );
                    }
                    DiscoveryEvent::LocalScanningStarted => {
                        debug!("Local interface scanning started");
                    }
                    DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
                        debug!("Discovered local candidate: {}", candidate.address);
                    }
                    DiscoveryEvent::LocalScanningCompleted {
                        candidate_count,
                        duration,
                    } => {
                        debug!(
                            "Local interface scanning completed: {} candidates in {:?}",
                            candidate_count, duration
                        );
                    }
                    DiscoveryEvent::ServerReflexiveDiscoveryStarted { bootstrap_count } => {
                        debug!(
                            "Server reflexive discovery started with {} bootstrap nodes",
                            bootstrap_count
                        );
                    }
                    DiscoveryEvent::ServerReflexiveCandidateDiscovered {
                        candidate,
                        bootstrap_node,
                    } => {
                        debug!(
                            "Discovered server-reflexive candidate {} via bootstrap {}",
                            candidate.address, bootstrap_node
                        );

                        // The only discovery event that is surfaced to consumers.
                        let _ = event_tx.send(NatTraversalEvent::ExternalAddressDiscovered {
                            reported_by: bootstrap_node,
                            address: candidate.address,
                        });
                    }
                    DiscoveryEvent::BootstrapQueryFailed {
                        bootstrap_node,
                        error,
                    } => {
                        debug!("Bootstrap query failed for {}: {}", bootstrap_node, error);
                    }
                    DiscoveryEvent::PortAllocationDetected {
                        port,
                        source_address,
                        bootstrap_node,
                        timestamp,
                    } => {
                        debug!(
                            "Port allocation detected: port {} from {} via bootstrap {:?} at {:?}",
                            port, source_address, bootstrap_node, timestamp
                        );
                    }
                    DiscoveryEvent::DiscoveryCompleted {
                        candidate_count,
                        total_duration,
                        success_rate,
                    } => {
                        info!(
                            "Discovery completed with {} candidates in {:?} (success rate: {:.2}%)",
                            candidate_count,
                            total_duration,
                            success_rate * 100.0
                        );
                    }
                    DiscoveryEvent::DiscoveryFailed {
                        error,
                        partial_results,
                    } => {
                        warn!(
                            "Discovery failed: {} (found {} partial candidates)",
                            error,
                            partial_results.len()
                        );

                    }
                    DiscoveryEvent::PathValidationRequested {
                        candidate_id,
                        candidate_address,
                        challenge_token,
                    } => {
                        debug!(
                            "PATH_CHALLENGE requested for candidate {} at {} with token {:08x}",
                            candidate_id.0, candidate_address, challenge_token
                        );
                    }
                    DiscoveryEvent::PathValidationResponse {
                        candidate_id,
                        candidate_address,
                        challenge_token: _,
                        rtt,
                    } => {
                        debug!(
                            "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
                            candidate_id.0, candidate_address, rtt
                        );
                    }
                }
            }
        }

        info!("Discovery polling task shutting down");
    }
1864
1865 async fn handle_connection(
1867 peer_id: PeerId,
1868 connection: InnerConnection,
1869 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1870 ) {
1871 let remote_address = connection.remote_address();
1872 let closed = connection.closed();
1873 tokio::pin!(closed);
1874
1875 debug!(
1876 "Handling connection from peer {:?} at {}",
1877 peer_id, remote_address
1878 );
1879
1880 closed.await;
1884
1885 let reason = connection
1886 .close_reason()
1887 .map(|reason| format!("Connection closed: {reason}"))
1888 .unwrap_or_else(|| "Connection closed".to_string());
1889 let _ = event_tx.send(NatTraversalEvent::ConnectionLost { peer_id, reason });
1890 }
1891
    /// Handler for an accepted bidirectional stream.
    ///
    /// Currently a no-op: neither stream half is read from or written to, and
    /// both are dropped when the function returns.
    async fn handle_bi_stream(
        _send: crate::high_level::SendStream,
        _recv: crate::high_level::RecvStream,
    ) {
    }
1926
1927 async fn handle_uni_stream(mut recv: crate::high_level::RecvStream) {
1929 let mut buffer = vec![0u8; 1024];
1930
1931 loop {
1932 match recv.read(&mut buffer).await {
1933 Ok(Some(size)) => {
1934 debug!("Received {} bytes on unidirectional stream", size);
1935 }
1937 Ok(None) => {
1938 debug!("Unidirectional stream closed by peer");
1939 break;
1940 }
1941 Err(e) => {
1942 debug!("Error reading from unidirectional stream: {}", e);
1943 break;
1944 }
1945 }
1946 }
1947 }
1948
1949 pub async fn connect_to_peer(
1951 &self,
1952 peer_id: PeerId,
1953 server_name: &str,
1954 remote_addr: SocketAddr,
1955 ) -> Result<InnerConnection, NatTraversalError> {
1956 let endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
1957 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
1958 })?;
1959
1960 info!("Connecting to peer {:?} at {}", peer_id, remote_addr);
1961
1962 let connecting = endpoint.connect(remote_addr, server_name).map_err(|e| {
1964 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
1965 })?;
1966
1967 let connection = timeout(
1968 self.timeout_config
1969 .nat_traversal
1970 .connection_establishment_timeout,
1971 connecting,
1972 )
1973 .await
1974 .map_err(|_| NatTraversalError::Timeout)?
1975 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
1976
1977 info!(
1978 "Successfully connected to peer {:?} at {}",
1979 peer_id, remote_addr
1980 );
1981
1982 if let Some(ref event_tx) = self.event_tx {
1984 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1985 peer_id,
1986 remote_address: remote_addr,
1987 });
1988 }
1989
1990 Ok(connection)
1991 }
1992
1993 pub async fn accept_connection(&self) -> Result<(PeerId, InnerConnection), NatTraversalError> {
1995 debug!("Waiting for incoming connection via event channel...");
1996
1997 let timeout_duration = self
1998 .timeout_config
1999 .nat_traversal
2000 .connection_establishment_timeout;
2001 let start = std::time::Instant::now();
2002
2003 loop {
2004 if self.shutdown.load(Ordering::Relaxed) {
2006 return Err(NatTraversalError::NetworkError(
2007 "Endpoint shutting down".to_string(),
2008 ));
2009 }
2010
2011 if start.elapsed() > timeout_duration {
2013 warn!("accept_connection() timed out after {:?}", timeout_duration);
2014 return Err(NatTraversalError::Timeout);
2015 }
2016
2017 {
2019 let mut event_rx = self.event_rx.lock().map_err(|_| {
2020 NatTraversalError::ProtocolError("Event channel lock poisoned".to_string())
2021 })?;
2022
2023 match event_rx.try_recv() {
2024 Ok(NatTraversalEvent::ConnectionEstablished {
2025 peer_id,
2026 remote_address,
2027 }) => {
2028 info!(
2029 "Received ConnectionEstablished event for peer {:?} at {}",
2030 peer_id, remote_address
2031 );
2032
2033 let connection = {
2036 let connections = self.connections.read().map_err(|_| {
2037 NatTraversalError::ProtocolError(
2038 "Connections lock poisoned".to_string(),
2039 )
2040 })?;
2041 connections.get(&peer_id).cloned().ok_or_else(|| {
2042 NatTraversalError::ConnectionFailed(format!(
2043 "Connection for peer {:?} not found in storage",
2044 peer_id
2045 ))
2046 })?
2047 };
2048
2049 info!(
2050 "Retrieved accepted connection from peer {:?} at {}",
2051 peer_id, remote_address
2052 );
2053 return Ok((peer_id, connection));
2054 }
2055 Ok(event) => {
2056 debug!(
2058 "Ignoring non-connection event while waiting for accept: {:?}",
2059 event
2060 );
2061 }
2062 Err(mpsc::error::TryRecvError::Empty) => {
2063 }
2065 Err(mpsc::error::TryRecvError::Disconnected) => {
2066 return Err(NatTraversalError::NetworkError(
2067 "Event channel closed".to_string(),
2068 ));
2069 }
2070 }
2071 } tokio::time::sleep(Duration::from_millis(10)).await;
2075 }
2076 }
2077
    /// Returns this endpoint's own peer ID (the stored `local_peer_id` field).
    pub fn local_peer_id(&self) -> PeerId {
        self.local_peer_id
    }
2082
2083 pub fn get_connection(
2085 &self,
2086 peer_id: &PeerId,
2087 ) -> Result<Option<InnerConnection>, NatTraversalError> {
2088 let connections = self.connections.read().map_err(|_| {
2089 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2090 })?;
2091 Ok(connections.get(peer_id).cloned())
2092 }
2093
2094 pub fn add_connection(
2096 &self,
2097 peer_id: PeerId,
2098 connection: InnerConnection,
2099 ) -> Result<(), NatTraversalError> {
2100 let mut connections = self.connections.write().map_err(|_| {
2101 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2102 })?;
2103 connections.insert(peer_id, connection);
2104 Ok(())
2105 }
2106
2107 pub fn spawn_connection_handler(
2109 &self,
2110 peer_id: PeerId,
2111 connection: InnerConnection,
2112 ) -> Result<(), NatTraversalError> {
2113 let event_tx = self.event_tx.as_ref().cloned().ok_or_else(|| {
2114 NatTraversalError::ConfigError("NAT traversal event channel not configured".to_string())
2115 })?;
2116
2117 let remote_address = connection.remote_address();
2118
2119 let should_emit = if let Ok(mut emitted) = self.emitted_established_events.write() {
2121 emitted.insert(peer_id) } else {
2123 true };
2125
2126 if should_emit {
2127 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
2128 peer_id,
2129 remote_address,
2130 });
2131 }
2132
2133 tokio::spawn(async move {
2135 Self::handle_connection(peer_id, connection, event_tx).await;
2136 });
2137
2138 Ok(())
2139 }
2140
2141 pub fn remove_connection(
2143 &self,
2144 peer_id: &PeerId,
2145 ) -> Result<Option<InnerConnection>, NatTraversalError> {
2146 if let Ok(mut emitted) = self.emitted_established_events.write() {
2148 emitted.remove(peer_id);
2149 }
2150
2151 let mut connections = self.connections.write().map_err(|_| {
2152 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2153 })?;
2154 Ok(connections.remove(peer_id))
2155 }
2156
2157 pub fn list_connections(&self) -> Result<Vec<(PeerId, SocketAddr)>, NatTraversalError> {
2159 let connections = self.connections.read().map_err(|_| {
2160 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2161 })?;
2162 let mut result = Vec::new();
2163 for (peer_id, connection) in connections.iter() {
2164 result.push((*peer_id, connection.remote_address()));
2165 }
2166 Ok(result)
2167 }
2168
2169 pub fn get_observed_external_address(&self) -> Result<Option<SocketAddr>, NatTraversalError> {
2181 let connections = self.connections.read().map_err(|_| {
2182 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2183 })?;
2184
2185 let known_peer_addrs: std::collections::HashSet<_> =
2188 self.config.known_peers.iter().copied().collect();
2189
2190 for (_peer_id, connection) in connections.iter() {
2192 if known_peer_addrs.contains(&connection.remote_address()) {
2193 if let Some(addr) = connection.observed_address() {
2194 debug!(
2195 "Found observed external address {} from known peer connection",
2196 addr
2197 );
2198 return Ok(Some(addr));
2199 }
2200 }
2201 }
2202
2203 for (_peer_id, connection) in connections.iter() {
2205 if let Some(addr) = connection.observed_address() {
2206 debug!(
2207 "Found observed external address {} from peer connection",
2208 addr
2209 );
2210 return Ok(Some(addr));
2211 }
2212 }
2213
2214 debug!("No observed external address available from any connection");
2215 Ok(None)
2216 }
2217
2218 pub async fn handle_connection_data(
2220 &self,
2221 peer_id: PeerId,
2222 connection: &InnerConnection,
2223 ) -> Result<(), NatTraversalError> {
2224 info!("Handling connection data from peer {:?}", peer_id);
2225
2226 let connection_clone = connection.clone();
2228 let peer_id_clone = peer_id;
2229 tokio::spawn(async move {
2230 loop {
2231 match connection_clone.accept_bi().await {
2232 Ok((send, recv)) => {
2233 debug!(
2234 "Accepted bidirectional stream from peer {:?}",
2235 peer_id_clone
2236 );
2237 tokio::spawn(Self::handle_bi_stream(send, recv));
2238 }
2239 Err(ConnectionError::ApplicationClosed(_)) => {
2240 debug!("Connection closed by peer {:?}", peer_id_clone);
2241 break;
2242 }
2243 Err(e) => {
2244 debug!(
2245 "Error accepting bidirectional stream from peer {:?}: {}",
2246 peer_id_clone, e
2247 );
2248 break;
2249 }
2250 }
2251 }
2252 });
2253
2254 let connection_clone = connection.clone();
2256 let peer_id_clone = peer_id;
2257 tokio::spawn(async move {
2258 loop {
2259 match connection_clone.accept_uni().await {
2260 Ok(recv) => {
2261 debug!(
2262 "Accepted unidirectional stream from peer {:?}",
2263 peer_id_clone
2264 );
2265 tokio::spawn(Self::handle_uni_stream(recv));
2266 }
2267 Err(ConnectionError::ApplicationClosed(_)) => {
2268 debug!("Connection closed by peer {:?}", peer_id_clone);
2269 break;
2270 }
2271 Err(e) => {
2272 debug!(
2273 "Error accepting unidirectional stream from peer {:?}: {}",
2274 peer_id_clone, e
2275 );
2276 break;
2277 }
2278 }
2279 }
2280 });
2281
2282 Ok(())
2283 }
2284
2285 fn generate_local_peer_id() -> PeerId {
2287 use std::collections::hash_map::DefaultHasher;
2288 use std::hash::{Hash, Hasher};
2289 use std::time::SystemTime;
2290
2291 let mut hasher = DefaultHasher::new();
2292 SystemTime::now().hash(&mut hasher);
2293 std::process::id().hash(&mut hasher);
2294
2295 let hash = hasher.finish();
2296 let mut peer_id = [0u8; 32];
2297 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2298
2299 for i in 8..32 {
2301 peer_id[i] = rand::random();
2302 }
2303
2304 PeerId(peer_id)
2305 }
2306
2307 fn generate_peer_id_from_address(addr: SocketAddr) -> PeerId {
2313 use std::collections::hash_map::DefaultHasher;
2314 use std::hash::{Hash, Hasher};
2315
2316 let mut hasher = DefaultHasher::new();
2317 addr.hash(&mut hasher);
2318
2319 let hash = hasher.finish();
2320 let mut peer_id = [0u8; 32];
2321 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2322
2323 for i in 8..32 {
2326 peer_id[i] = rand::random();
2327 }
2328
2329 warn!(
2330 "Generated temporary peer ID from address {}. This ID is not persistent!",
2331 addr
2332 );
2333 PeerId(peer_id)
2334 }
2335
2336 fn derive_peer_id_from_connection(connection: &InnerConnection) -> Option<PeerId> {
2342 if let Some(identity) = connection.peer_identity() {
2343 if let Some(certs) =
2345 identity.downcast_ref::<Vec<rustls::pki_types::CertificateDer<'static>>>()
2346 {
2347 if let Some(cert) = certs.first() {
2348 let spki = cert.as_ref();
2350 if let Some(public_key) = extract_ed25519_from_spki(spki) {
2351 match crate::derive_peer_id_from_key_bytes(&public_key) {
2352 Ok(peer_id) => {
2353 debug!("Derived peer ID from Ed25519 public key in SPKI");
2354 return Some(peer_id);
2355 }
2356 Err(e) => {
2357 warn!("Failed to derive peer ID from public key: {}", e);
2358 }
2359 }
2360 } else {
2361 debug!(
2362 "Certificate is not Ed25519 SPKI format (len={})",
2363 spki.len()
2364 );
2365 }
2366 }
2367 } else {
2368 if let Some(public_key_bytes) = identity.downcast_ref::<[u8; 32]>() {
2370 match crate::derive_peer_id_from_key_bytes(public_key_bytes) {
2371 Ok(peer_id) => {
2372 debug!("Derived peer ID from raw Ed25519 public key");
2373 return Some(peer_id);
2374 }
2375 Err(e) => {
2376 warn!("Failed to derive peer ID from public key: {}", e);
2377 }
2378 }
2379 }
2380 }
2381 }
2382
2383 None
2384 }
2385
    /// Public wrapper around `derive_peer_id_from_connection`.
    ///
    /// Returns the peer ID derived from the connection's identity, or `None`
    /// when derivation is not possible. Although declared `async`, the body
    /// awaits nothing.
    pub async fn extract_peer_id_from_connection(
        &self,
        connection: &InnerConnection,
    ) -> Option<PeerId> {
        Self::derive_peer_id_from_connection(connection)
    }
2398
2399 pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
2401 self.shutdown.store(true, Ordering::Relaxed);
2403
2404 {
2406 let mut connections = self.connections.write().map_err(|_| {
2407 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2408 })?;
2409 for (peer_id, connection) in connections.drain() {
2410 info!("Closing connection to peer {:?}", peer_id);
2411 connection.close(crate::VarInt::from_u32(0), b"Shutdown");
2412 }
2413 }
2414
2415 if let Some(ref endpoint) = self.inner_endpoint {
2417 endpoint.wait_idle().await;
2418 }
2419
2420 info!("NAT traversal endpoint shutdown completed");
2421 Ok(())
2422 }
2423
2424 pub async fn discover_candidates(
2426 &self,
2427 peer_id: PeerId,
2428 ) -> Result<Vec<CandidateAddress>, NatTraversalError> {
2429 debug!("Discovering address candidates for peer {:?}", peer_id);
2430
2431 let mut candidates = Vec::new();
2432
2433 let bootstrap_nodes = {
2435 let nodes = self
2436 .bootstrap_nodes
2437 .read()
2438 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2439 nodes.clone()
2440 };
2441
2442 {
2444 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2445 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2446 })?;
2447
2448 discovery
2449 .start_discovery(peer_id, bootstrap_nodes)
2450 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
2451 }
2452
2453 let timeout_duration = self.config.coordination_timeout;
2455 let start_time = std::time::Instant::now();
2456
2457 while start_time.elapsed() < timeout_duration {
2458 let discovery_events = {
2459 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2460 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2461 })?;
2462 discovery.poll(std::time::Instant::now())
2463 };
2464
2465 for event in discovery_events {
2466 match event {
2467 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
2468 candidates.push(candidate.clone());
2469
2470 self.send_candidate_advertisement(peer_id, &candidate)
2472 .await
2473 .unwrap_or_else(|e| {
2474 debug!("Failed to send candidate advertisement: {}", e)
2475 });
2476 }
2477 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
2478 candidates.push(candidate.clone());
2479
2480 self.send_candidate_advertisement(peer_id, &candidate)
2482 .await
2483 .unwrap_or_else(|e| {
2484 debug!("Failed to send candidate advertisement: {}", e)
2485 });
2486 }
2487 DiscoveryEvent::DiscoveryCompleted { .. } => {
2489 return Ok(candidates);
2491 }
2492 DiscoveryEvent::DiscoveryFailed {
2493 error,
2494 partial_results,
2495 } => {
2496 candidates.extend(partial_results);
2498 if candidates.is_empty() {
2499 return Err(NatTraversalError::CandidateDiscoveryFailed(
2500 error.to_string(),
2501 ));
2502 }
2503 return Ok(candidates);
2504 }
2505 _ => {}
2506 }
2507 }
2508
2509 sleep(Duration::from_millis(10)).await;
2511 }
2512
2513 if candidates.is_empty() {
2514 Err(NatTraversalError::NoCandidatesFound)
2515 } else {
2516 Ok(candidates)
2517 }
2518 }
2519
2520 #[allow(dead_code)]
2522 fn create_punch_me_now_frame(&self, peer_id: PeerId) -> Result<Vec<u8>, NatTraversalError> {
2523 let mut frame = Vec::new();
2531
2532 frame.push(0x41);
2534
2535 frame.extend_from_slice(&peer_id.0);
2537
2538 let timestamp = std::time::SystemTime::now()
2540 .duration_since(std::time::UNIX_EPOCH)
2541 .unwrap_or_default()
2542 .as_millis() as u64;
2543 frame.extend_from_slice(×tamp.to_be_bytes());
2544
2545 let mut token = [0u8; 16];
2547 for byte in &mut token {
2548 *byte = rand::random();
2549 }
2550 frame.extend_from_slice(&token);
2551
2552 Ok(frame)
2553 }
2554
2555 #[allow(dead_code)]
2556 fn attempt_hole_punching(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
2557 debug!("Attempting hole punching for peer {:?}", peer_id);
2558
2559 let candidate_pairs = self.get_candidate_pairs_for_peer(peer_id)?;
2561
2562 if candidate_pairs.is_empty() {
2563 return Err(NatTraversalError::NoCandidatesFound);
2564 }
2565
2566 info!(
2567 "Generated {} candidate pairs for hole punching with peer {:?}",
2568 candidate_pairs.len(),
2569 peer_id
2570 );
2571
2572 self.attempt_quic_hole_punching(peer_id, candidate_pairs)
2575 }
2576
2577 #[allow(dead_code)]
2579 fn get_candidate_pairs_for_peer(
2580 &self,
2581 peer_id: PeerId,
2582 ) -> Result<Vec<CandidatePair>, NatTraversalError> {
2583 let discovery_candidates = {
2585 let discovery = self.discovery_manager.lock().map_err(|_| {
2586 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2587 })?;
2588
2589 discovery.get_candidates_for_peer(peer_id)
2590 };
2591
2592 if discovery_candidates.is_empty() {
2593 return Err(NatTraversalError::NoCandidatesFound);
2594 }
2595
2596 let mut candidate_pairs = Vec::new();
2598 let local_candidates = discovery_candidates
2599 .iter()
2600 .filter(|c| matches!(c.source, CandidateSource::Local))
2601 .collect::<Vec<_>>();
2602 let remote_candidates = discovery_candidates
2603 .iter()
2604 .filter(|c| !matches!(c.source, CandidateSource::Local))
2605 .collect::<Vec<_>>();
2606
2607 for local in &local_candidates {
2609 for remote in &remote_candidates {
2610 let pair_priority = self.calculate_candidate_pair_priority(local, remote);
2611 candidate_pairs.push(CandidatePair {
2612 local_candidate: (*local).clone(),
2613 remote_candidate: (*remote).clone(),
2614 priority: pair_priority,
2615 state: CandidatePairState::Waiting,
2616 });
2617 }
2618 }
2619
2620 candidate_pairs.sort_by(|a, b| b.priority.cmp(&a.priority));
2622
2623 candidate_pairs.truncate(8);
2625
2626 Ok(candidate_pairs)
2627 }
2628
2629 #[allow(dead_code)]
2631 fn calculate_candidate_pair_priority(
2632 &self,
2633 local: &CandidateAddress,
2634 remote: &CandidateAddress,
2635 ) -> u64 {
2636 let local_type_preference = match local.source {
2640 CandidateSource::Local => 126,
2641 CandidateSource::Observed { .. } => 100,
2642 CandidateSource::Predicted => 75,
2643 CandidateSource::Peer => 50,
2644 };
2645
2646 let remote_type_preference = match remote.source {
2647 CandidateSource::Local => 126,
2648 CandidateSource::Observed { .. } => 100,
2649 CandidateSource::Predicted => 75,
2650 CandidateSource::Peer => 50,
2651 };
2652
2653 let local_priority = (local_type_preference as u64) << 8 | local.priority as u64;
2655 let remote_priority = (remote_type_preference as u64) << 8 | remote.priority as u64;
2656
2657 let min_priority = local_priority.min(remote_priority);
2658 let max_priority = local_priority.max(remote_priority);
2659
2660 (min_priority << 32)
2661 | (max_priority << 1)
2662 | if local_priority > remote_priority {
2663 1
2664 } else {
2665 0
2666 }
2667 }
2668
2669 #[allow(dead_code)]
2671 fn attempt_quic_hole_punching(
2672 &self,
2673 peer_id: PeerId,
2674 candidate_pairs: Vec<CandidatePair>,
2675 ) -> Result<(), NatTraversalError> {
2676 let _endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
2677 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
2678 })?;
2679
2680 for pair in candidate_pairs {
2681 debug!(
2682 "Attempting hole punch with candidate pair: {} -> {}",
2683 pair.local_candidate.address, pair.remote_candidate.address
2684 );
2685
2686 let mut challenge_data = [0u8; 8];
2688 for byte in &mut challenge_data {
2689 *byte = rand::random();
2690 }
2691
2692 let local_socket =
2694 std::net::UdpSocket::bind(pair.local_candidate.address).map_err(|e| {
2695 NatTraversalError::NetworkError(format!(
2696 "Failed to bind to local candidate: {e}"
2697 ))
2698 })?;
2699
2700 let path_challenge_packet = self.create_path_challenge_packet(challenge_data)?;
2702
2703 match local_socket.send_to(&path_challenge_packet, pair.remote_candidate.address) {
2705 Ok(bytes_sent) => {
2706 debug!(
2707 "Sent {} bytes for hole punch from {} to {}",
2708 bytes_sent, pair.local_candidate.address, pair.remote_candidate.address
2709 );
2710
2711 local_socket
2713 .set_read_timeout(Some(Duration::from_millis(100)))
2714 .map_err(|e| {
2715 NatTraversalError::NetworkError(format!("Failed to set timeout: {e}"))
2716 })?;
2717
2718 let mut response_buffer = [0u8; 1024];
2720 match local_socket.recv_from(&mut response_buffer) {
2721 Ok((_bytes_received, response_addr)) => {
2722 if response_addr == pair.remote_candidate.address {
2723 info!(
2724 "Hole punch succeeded for peer {:?}: {} <-> {}",
2725 peer_id,
2726 pair.local_candidate.address,
2727 pair.remote_candidate.address
2728 );
2729
2730 self.store_successful_candidate_pair(peer_id, pair)?;
2732 return Ok(());
2733 } else {
2734 debug!(
2735 "Received response from unexpected address: {}",
2736 response_addr
2737 );
2738 }
2739 }
2740 Err(e)
2741 if e.kind() == std::io::ErrorKind::WouldBlock
2742 || e.kind() == std::io::ErrorKind::TimedOut =>
2743 {
2744 debug!("No response received for hole punch attempt");
2745 }
2746 Err(e) => {
2747 debug!("Error receiving hole punch response: {}", e);
2748 }
2749 }
2750 }
2751 Err(e) => {
2752 debug!("Failed to send hole punch packet: {}", e);
2753 }
2754 }
2755 }
2756
2757 Err(NatTraversalError::HolePunchingFailed)
2759 }
2760
2761 fn create_path_challenge_packet(
2763 &self,
2764 challenge_data: [u8; 8],
2765 ) -> Result<Vec<u8>, NatTraversalError> {
2766 let mut packet = Vec::new();
2769
2770 packet.push(0x40); packet.extend_from_slice(&[0, 0, 0, 1]); packet.push(0x1a); packet.extend_from_slice(&challenge_data); Ok(packet)
2779 }
2780
2781 fn store_successful_candidate_pair(
2783 &self,
2784 peer_id: PeerId,
2785 pair: CandidatePair,
2786 ) -> Result<(), NatTraversalError> {
2787 debug!(
2788 "Storing successful candidate pair for peer {:?}: {} <-> {}",
2789 peer_id, pair.local_candidate.address, pair.remote_candidate.address
2790 );
2791
2792 if let Some(ref callback) = self.event_callback {
2797 callback(NatTraversalEvent::PathValidated {
2798 peer_id,
2799 address: pair.remote_candidate.address,
2800 rtt: Duration::from_millis(50), });
2802
2803 callback(NatTraversalEvent::TraversalSucceeded {
2804 peer_id,
2805 final_address: pair.remote_candidate.address,
2806 total_time: Duration::from_secs(1), });
2808 }
2809
2810 Ok(())
2811 }
2812
2813 fn attempt_connection_to_candidate(
2815 &self,
2816 peer_id: PeerId,
2817 candidate: &CandidateAddress,
2818 ) -> Result<(), NatTraversalError> {
2819 {
2820 let endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
2821 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
2822 })?;
2823
2824 let server_name = format!("peer-{:x}", peer_id.0[0] as u32);
2826
2827 debug!(
2828 "Attempting QUIC connection to candidate {} for peer {:?}",
2829 candidate.address, peer_id
2830 );
2831
2832 match endpoint.connect(candidate.address, &server_name) {
2834 Ok(connecting) => {
2835 info!(
2836 "Connection attempt initiated to {} for peer {:?}",
2837 candidate.address, peer_id
2838 );
2839
2840 if let Some(event_tx) = &self.event_tx {
2842 let event_tx = event_tx.clone();
2843 let connections = self.connections.clone();
2844 let peer_id_clone = peer_id;
2845 let address = candidate.address;
2846
2847 tokio::spawn(async move {
2848 match connecting.await {
2849 Ok(connection) => {
2850 info!(
2851 "Successfully connected to {} for peer {:?}",
2852 address, peer_id_clone
2853 );
2854
2855 if let Ok(mut conns) = connections.write() {
2857 conns.insert(peer_id_clone, connection.clone());
2858 }
2859
2860 let _ =
2862 event_tx.send(NatTraversalEvent::ConnectionEstablished {
2863 peer_id: peer_id_clone,
2864 remote_address: address,
2865 });
2866
2867 Self::handle_connection(peer_id_clone, connection, event_tx)
2869 .await;
2870 }
2871 Err(e) => {
2872 warn!("Connection to {} failed: {}", address, e);
2873 }
2874 }
2875 });
2876 }
2877
2878 Ok(())
2879 }
2880 Err(e) => {
2881 warn!(
2882 "Failed to initiate connection to {}: {}",
2883 candidate.address, e
2884 );
2885 Err(NatTraversalError::ConnectionFailed(format!(
2886 "Failed to connect to {}: {}",
2887 candidate.address, e
2888 )))
2889 }
2890 }
2891 }
2892 }
2893
2894 pub fn poll(
2896 &self,
2897 now: std::time::Instant,
2898 ) -> Result<Vec<NatTraversalEvent>, NatTraversalError> {
2899 let mut events = Vec::new();
2900
2901 {
2903 let mut event_rx = self.event_rx.lock().map_err(|_| {
2904 NatTraversalError::ProtocolError("Event channel lock poisoned".to_string())
2905 })?;
2906
2907 loop {
2908 match event_rx.try_recv() {
2909 Ok(event) => {
2910 if let Some(ref callback) = self.event_callback {
2911 callback(event.clone());
2912 }
2913 events.push(event);
2914 }
2915 Err(TryRecvError::Empty) => break,
2916 Err(TryRecvError::Disconnected) => break,
2917 }
2918 }
2919 }
2920
2921 let mut closed_connections = Vec::new();
2923 {
2924 let connections = self.connections.read().map_err(|_| {
2925 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2926 })?;
2927
2928 for (peer_id, connection) in connections.iter() {
2929 if let Some(reason) = connection.close_reason() {
2930 closed_connections.push((*peer_id, reason.clone()));
2931 }
2932 }
2933 }
2934
2935 if !closed_connections.is_empty() {
2936 let mut connections = self.connections.write().map_err(|_| {
2937 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2938 })?;
2939
2940 for (peer_id, reason) in closed_connections {
2941 connections.remove(&peer_id);
2942 let event = NatTraversalEvent::ConnectionLost {
2943 peer_id,
2944 reason: reason.to_string(),
2945 };
2946 if let Some(ref callback) = self.event_callback {
2947 callback(event.clone());
2948 }
2949 events.push(event);
2950 }
2951 }
2952
2953 self.check_connections_for_observed_addresses(&mut events)?;
2955
2956 {
2958 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2959 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2960 })?;
2961
2962 let discovery_events = discovery.poll(now);
2963
2964 for discovery_event in discovery_events {
2966 if let Some(nat_event) = self.convert_discovery_event(discovery_event) {
2967 events.push(nat_event.clone());
2968
2969 if let Some(ref callback) = self.event_callback {
2971 callback(nat_event.clone());
2972 }
2973
2974 if let NatTraversalEvent::CandidateDiscovered {
2976 peer_id: _,
2977 candidate: _,
2978 } = &nat_event
2979 {
2980 }
2983 }
2984 }
2985 }
2986
2987 let mut sessions = self
2989 .active_sessions
2990 .write()
2991 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2992
2993 for (_peer_id, session) in sessions.iter_mut() {
2994 let elapsed = now.duration_since(session.started_at);
2995
2996 let timeout = self.get_phase_timeout(session.phase);
2998
2999 if elapsed > timeout {
3001 match session.phase {
3002 TraversalPhase::Discovery => {
3003 let discovered_candidates = {
3005 let discovery = self.discovery_manager.lock().map_err(|_| {
3006 NatTraversalError::ProtocolError(
3007 "Discovery manager lock poisoned".to_string(),
3008 )
3009 });
3010 match discovery {
3011 Ok(disc) => disc.get_candidates_for_peer(session.peer_id),
3012 Err(_) => Vec::new(),
3013 }
3014 };
3015
3016 session.candidates = discovered_candidates.clone();
3018
3019 if !session.candidates.is_empty() {
3021 session.phase = TraversalPhase::Coordination;
3023 let event = NatTraversalEvent::PhaseTransition {
3024 peer_id: session.peer_id,
3025 from_phase: TraversalPhase::Discovery,
3026 to_phase: TraversalPhase::Coordination,
3027 };
3028 events.push(event.clone());
3029 if let Some(ref callback) = self.event_callback {
3030 callback(event);
3031 }
3032 info!(
3033 "Peer {:?} advanced from Discovery to Coordination with {} candidates",
3034 session.peer_id,
3035 session.candidates.len()
3036 );
3037 } else if session.attempt < self.config.max_concurrent_attempts as u32 {
3038 session.attempt += 1;
3040 session.started_at = now;
3041 let backoff_duration = self.calculate_backoff(session.attempt);
3042 warn!(
3043 "Discovery timeout for peer {:?}, retrying (attempt {}), backoff: {:?}",
3044 session.peer_id, session.attempt, backoff_duration
3045 );
3046 } else {
3047 session.phase = TraversalPhase::Failed;
3049 let event = NatTraversalEvent::TraversalFailed {
3050 peer_id: session.peer_id,
3051 error: NatTraversalError::NoCandidatesFound,
3052 fallback_available: self.config.enable_relay_fallback,
3053 };
3054 events.push(event.clone());
3055 if let Some(ref callback) = self.event_callback {
3056 callback(event);
3057 }
3058 error!(
3059 "NAT traversal failed for peer {:?}: no candidates found after {} attempts",
3060 session.peer_id, session.attempt
3061 );
3062 }
3063 }
3064 TraversalPhase::Coordination => {
3065 if let Some(coordinator) = self.select_coordinator() {
3067 match self.send_coordination_request(session.peer_id, coordinator) {
3068 Ok(_) => {
3069 session.phase = TraversalPhase::Synchronization;
3070 let event = NatTraversalEvent::CoordinationRequested {
3071 peer_id: session.peer_id,
3072 coordinator,
3073 };
3074 events.push(event.clone());
3075 if let Some(ref callback) = self.event_callback {
3076 callback(event);
3077 }
3078 info!(
3079 "Coordination requested for peer {:?} via {}",
3080 session.peer_id, coordinator
3081 );
3082 }
3083 Err(e) => {
3084 self.handle_phase_failure(session, now, &mut events, e);
3085 }
3086 }
3087 } else {
3088 self.handle_phase_failure(
3089 session,
3090 now,
3091 &mut events,
3092 NatTraversalError::NoBootstrapNodes,
3093 );
3094 }
3095 }
3096 TraversalPhase::Synchronization => {
3097 if self.is_peer_synchronized(&session.peer_id) {
3099 session.phase = TraversalPhase::Punching;
3100 let event = NatTraversalEvent::HolePunchingStarted {
3101 peer_id: session.peer_id,
3102 targets: session.candidates.iter().map(|c| c.address).collect(),
3103 };
3104 events.push(event.clone());
3105 if let Some(ref callback) = self.event_callback {
3106 callback(event);
3107 }
3108 if let Err(e) =
3110 self.initiate_hole_punching(session.peer_id, &session.candidates)
3111 {
3112 self.handle_phase_failure(session, now, &mut events, e);
3113 }
3114 } else {
3115 self.handle_phase_failure(
3116 session,
3117 now,
3118 &mut events,
3119 NatTraversalError::ProtocolError(
3120 "Synchronization timeout".to_string(),
3121 ),
3122 );
3123 }
3124 }
3125 TraversalPhase::Punching => {
3126 if let Some(successful_path) = self.check_punch_results(&session.peer_id) {
3128 session.phase = TraversalPhase::Validation;
3129 let event = NatTraversalEvent::PathValidated {
3130 peer_id: session.peer_id,
3131 address: successful_path,
3132 rtt: Duration::from_millis(50), };
3134 events.push(event.clone());
3135 if let Some(ref callback) = self.event_callback {
3136 callback(event);
3137 }
3138 if let Err(e) = self.validate_path(session.peer_id, successful_path) {
3140 self.handle_phase_failure(session, now, &mut events, e);
3141 }
3142 } else {
3143 self.handle_phase_failure(
3144 session,
3145 now,
3146 &mut events,
3147 NatTraversalError::PunchingFailed(
3148 "No successful punch".to_string(),
3149 ),
3150 );
3151 }
3152 }
3153 TraversalPhase::Validation => {
3154 if self.is_path_validated(&session.peer_id) {
3156 session.phase = TraversalPhase::Connected;
3157 let event = NatTraversalEvent::TraversalSucceeded {
3158 peer_id: session.peer_id,
3159 final_address: session
3160 .candidates
3161 .first()
3162 .map(|c| c.address)
3163 .unwrap_or_else(create_random_port_bind_addr),
3164 total_time: elapsed,
3165 };
3166 events.push(event.clone());
3167 if let Some(ref callback) = self.event_callback {
3168 callback(event);
3169 }
3170 info!(
3171 "NAT traversal succeeded for peer {:?} in {:?}",
3172 session.peer_id, elapsed
3173 );
3174 } else {
3175 self.handle_phase_failure(
3176 session,
3177 now,
3178 &mut events,
3179 NatTraversalError::ValidationFailed(
3180 "Path validation timeout".to_string(),
3181 ),
3182 );
3183 }
3184 }
3185 TraversalPhase::Connected => {
3186 if !self.is_connection_healthy(&session.peer_id) {
3188 warn!(
3189 "Connection to peer {:?} is no longer healthy",
3190 session.peer_id
3191 );
3192 }
3194 }
3195 TraversalPhase::Failed => {
3196 }
3198 }
3199 }
3200 }
3201
3202 Ok(events)
3203 }
3204
3205 fn get_phase_timeout(&self, phase: TraversalPhase) -> Duration {
3207 match phase {
3208 TraversalPhase::Discovery => Duration::from_secs(10),
3209 TraversalPhase::Coordination => self.config.coordination_timeout,
3210 TraversalPhase::Synchronization => Duration::from_secs(3),
3211 TraversalPhase::Punching => Duration::from_secs(5),
3212 TraversalPhase::Validation => Duration::from_secs(5),
3213 TraversalPhase::Connected => Duration::from_secs(30), TraversalPhase::Failed => Duration::ZERO,
3215 }
3216 }
3217
3218 fn calculate_backoff(&self, attempt: u32) -> Duration {
3220 let base = Duration::from_millis(1000);
3221 let max = Duration::from_secs(30);
3222 let backoff = base * 2u32.pow(attempt.saturating_sub(1));
3223 let jitter = std::time::Duration::from_millis((rand::random::<u64>() % 200) as u64);
3224 backoff.min(max) + jitter
3225 }
3226
3227 fn check_connections_for_observed_addresses(
3229 &self,
3230 _events: &mut Vec<NatTraversalEvent>,
3231 ) -> Result<(), NatTraversalError> {
3232 let connections = self.connections.read().map_err(|_| {
3234 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3235 })?;
3236
3237 if !connections.is_empty() {
3245 for (_peer_id, connection) in connections.iter() {
3247 let remote_addr = connection.remote_address();
3248
3249 let is_bootstrap = {
3251 let bootstrap_nodes = self.bootstrap_nodes.read().map_err(|_| {
3252 NatTraversalError::ProtocolError(
3253 "Bootstrap nodes lock poisoned".to_string(),
3254 )
3255 })?;
3256 bootstrap_nodes
3257 .iter()
3258 .any(|node| node.address == remote_addr)
3259 };
3260
3261 if is_bootstrap {
3262 debug!(
3265 "Bootstrap connection to {} should provide our external address via OBSERVED_ADDRESS frames",
3266 remote_addr
3267 );
3268
3269 }
3272 }
3273 }
3274
3275 Ok(())
3276 }
3277
3278 fn handle_phase_failure(
3280 &self,
3281 session: &mut NatTraversalSession,
3282 now: std::time::Instant,
3283 events: &mut Vec<NatTraversalEvent>,
3284 error: NatTraversalError,
3285 ) {
3286 if session.attempt < self.config.max_concurrent_attempts as u32 {
3287 session.attempt += 1;
3289 session.started_at = now;
3290 let backoff = self.calculate_backoff(session.attempt);
3291 warn!(
3292 "Phase {:?} failed for peer {:?}: {:?}, retrying (attempt {}) after {:?}",
3293 session.phase, session.peer_id, error, session.attempt, backoff
3294 );
3295 } else {
3296 session.phase = TraversalPhase::Failed;
3298 let event = NatTraversalEvent::TraversalFailed {
3299 peer_id: session.peer_id,
3300 error,
3301 fallback_available: self.config.enable_relay_fallback,
3302 };
3303 events.push(event.clone());
3304 if let Some(ref callback) = self.event_callback {
3305 callback(event);
3306 }
3307 error!(
3308 "NAT traversal failed for peer {:?} after {} attempts",
3309 session.peer_id, session.attempt
3310 );
3311 }
3312 }
3313
3314 fn select_coordinator(&self) -> Option<SocketAddr> {
3316 if let Ok(nodes) = self.bootstrap_nodes.read() {
3317 if !nodes.is_empty() {
3319 let idx = rand::random::<usize>() % nodes.len();
3320 return Some(nodes[idx].address);
3321 }
3322 }
3323 None
3324 }
3325
3326 fn send_coordination_request(
3328 &self,
3329 peer_id: PeerId,
3330 coordinator: SocketAddr,
3331 ) -> Result<(), NatTraversalError> {
3332 debug!(
3333 "Sending coordination request for peer {:?} to {}",
3334 peer_id, coordinator
3335 );
3336
3337 {
3338 if let Ok(connections) = self.connections.read() {
3340 for (_peer, conn) in connections.iter() {
3342 if conn.remote_address() == coordinator {
3343 info!("Found existing connection to coordinator {}", coordinator);
3347 return Ok(());
3348 }
3349 }
3350 }
3351
3352 info!("Establishing connection to coordinator {}", coordinator);
3354 if let Some(endpoint) = &self.inner_endpoint {
3355 let server_name = format!("bootstrap-{}", coordinator.ip());
3356 match endpoint.connect(coordinator, &server_name) {
3357 Ok(connecting) => {
3358 info!("Initiated connection to coordinator {}", coordinator);
3360
3361 if let Some(event_tx) = &self.event_tx {
3363 let event_tx = event_tx.clone();
3364 let connections = self.connections.clone();
3365 let peer_id_clone = peer_id;
3366
3367 tokio::spawn(async move {
3368 match connecting.await {
3369 Ok(connection) => {
3370 info!("Connected to coordinator {}", coordinator);
3371
3372 let bootstrap_peer_id =
3374 Self::generate_peer_id_from_address(coordinator);
3375
3376 if let Ok(mut conns) = connections.write() {
3378 conns.insert(bootstrap_peer_id, connection.clone());
3379 }
3380
3381 Self::handle_connection(
3383 peer_id_clone,
3384 connection,
3385 event_tx,
3386 )
3387 .await;
3388 }
3389 Err(e) => {
3390 warn!(
3391 "Failed to connect to coordinator {}: {}",
3392 coordinator, e
3393 );
3394 }
3395 }
3396 });
3397 }
3398
3399 Ok(())
3402 }
3403 Err(e) => Err(NatTraversalError::CoordinationFailed(format!(
3404 "Failed to connect to coordinator {coordinator}: {e}"
3405 ))),
3406 }
3407 } else {
3408 Err(NatTraversalError::ConfigError(
3409 "QUIC endpoint not initialized".to_string(),
3410 ))
3411 }
3412 }
3413 }
3414
3415 fn is_peer_synchronized(&self, peer_id: &PeerId) -> bool {
3417 debug!("Checking synchronization status for peer {:?}", peer_id);
3418
3419 if let Ok(sessions) = self.active_sessions.read() {
3421 if let Some(session) = sessions.get(peer_id) {
3422 let has_candidates = !session.candidates.is_empty();
3425 let past_discovery = session.phase as u8 > TraversalPhase::Discovery as u8;
3426
3427 debug!(
3428 "Checking sync for peer {:?}: phase={:?}, candidates={}, past_discovery={}",
3429 peer_id,
3430 session.phase,
3431 session.candidates.len(),
3432 past_discovery
3433 );
3434
3435 if has_candidates && past_discovery {
3436 info!(
3437 "Peer {:?} is synchronized with {} candidates",
3438 peer_id,
3439 session.candidates.len()
3440 );
3441 return true;
3442 }
3443
3444 if session.phase == TraversalPhase::Synchronization && has_candidates {
3446 info!(
3447 "Peer {:?} in synchronization phase with {} candidates, considering synchronized",
3448 peer_id,
3449 session.candidates.len()
3450 );
3451 return true;
3452 }
3453
3454 if session.phase as u8 >= TraversalPhase::Synchronization as u8 {
3456 info!(
3457 "Test mode: Considering peer {:?} synchronized in phase {:?}",
3458 peer_id, session.phase
3459 );
3460 return true;
3461 }
3462 }
3463 }
3464
3465 warn!("Peer {:?} is not synchronized", peer_id);
3466 false
3467 }
3468
3469 fn initiate_hole_punching(
3471 &self,
3472 peer_id: PeerId,
3473 candidates: &[CandidateAddress],
3474 ) -> Result<(), NatTraversalError> {
3475 if candidates.is_empty() {
3476 return Err(NatTraversalError::NoCandidatesFound);
3477 }
3478
3479 info!(
3480 "Initiating hole punching for peer {:?} to {} candidates",
3481 peer_id,
3482 candidates.len()
3483 );
3484
3485 {
3486 for candidate in candidates {
3488 debug!(
3489 "Attempting QUIC connection to candidate: {}",
3490 candidate.address
3491 );
3492
3493 match self.attempt_connection_to_candidate(peer_id, candidate) {
3495 Ok(_) => {
3496 info!(
3497 "Successfully initiated connection attempt to {}",
3498 candidate.address
3499 );
3500 }
3501 Err(e) => {
3502 warn!(
3503 "Failed to initiate connection to {}: {:?}",
3504 candidate.address, e
3505 );
3506 }
3507 }
3508 }
3509
3510 Ok(())
3511 }
3512 }
3513
3514 fn check_punch_results(&self, peer_id: &PeerId) -> Option<SocketAddr> {
3516 {
3517 if let Ok(connections) = self.connections.read() {
3519 if let Some(conn) = connections.get(peer_id) {
3520 let addr = conn.remote_address();
3522 info!(
3523 "Found successful connection to peer {:?} at {}",
3524 peer_id, addr
3525 );
3526 return Some(addr);
3527 }
3528 }
3529 }
3530
3531 if let Ok(sessions) = self.active_sessions.read() {
3533 if let Some(session) = sessions.get(peer_id) {
3534 for candidate in &session.candidates {
3536 if matches!(candidate.state, CandidateState::Valid) {
3537 info!(
3538 "Found validated candidate for peer {:?} at {}",
3539 peer_id, candidate.address
3540 );
3541 return Some(candidate.address);
3542 }
3543 }
3544
3545 if session.phase == TraversalPhase::Punching && !session.candidates.is_empty() {
3547 let addr = session.candidates[0].address;
3548 info!(
3549 "Simulating successful punch for testing: peer {:?} at {}",
3550 peer_id, addr
3551 );
3552 return Some(addr);
3553 }
3554
3555 if let Some(first) = session.candidates.first() {
3557 debug!(
3558 "No validated candidates, using first candidate {} for peer {:?}",
3559 first.address, peer_id
3560 );
3561 return Some(first.address);
3562 }
3563 }
3564 }
3565
3566 warn!("No successful punch results for peer {:?}", peer_id);
3567 None
3568 }
3569
3570 fn validate_path(&self, peer_id: PeerId, address: SocketAddr) -> Result<(), NatTraversalError> {
3572 debug!("Validating path to peer {:?} at {}", peer_id, address);
3573
3574 {
3575 if let Ok(connections) = self.connections.read() {
3577 if let Some(conn) = connections.get(&peer_id) {
3578 if conn.remote_address() == address {
3580 info!(
3581 "Path validation successful for peer {:?} at {}",
3582 peer_id, address
3583 );
3584
3585 if let Ok(mut sessions) = self.active_sessions.write() {
3587 if let Some(session) = sessions.get_mut(&peer_id) {
3588 for candidate in &mut session.candidates {
3589 if candidate.address == address {
3590 candidate.state = CandidateState::Valid;
3591 break;
3592 }
3593 }
3594 }
3595 }
3596
3597 return Ok(());
3598 } else {
3599 warn!(
3600 "Connection address mismatch: expected {}, got {}",
3601 address,
3602 conn.remote_address()
3603 );
3604 }
3605 }
3606 }
3607
3608 Err(NatTraversalError::ValidationFailed(format!(
3610 "No connection found for peer {peer_id:?} at {address}"
3611 )))
3612 }
3613 }
3614
3615 fn is_path_validated(&self, peer_id: &PeerId) -> bool {
3617 debug!("Checking path validation for peer {:?}", peer_id);
3618
3619 {
3620 if let Ok(connections) = self.connections.read() {
3622 if connections.contains_key(peer_id) {
3623 info!("Path validated: connection exists for peer {:?}", peer_id);
3624 return true;
3625 }
3626 }
3627 }
3628
3629 if let Ok(sessions) = self.active_sessions.read() {
3631 if let Some(session) = sessions.get(peer_id) {
3632 let validated = session
3633 .candidates
3634 .iter()
3635 .any(|c| matches!(c.state, CandidateState::Valid));
3636
3637 if validated {
3638 info!(
3639 "Path validated: found validated candidate for peer {:?}",
3640 peer_id
3641 );
3642 return true;
3643 }
3644 }
3645 }
3646
3647 warn!("Path not validated for peer {:?}", peer_id);
3648 false
3649 }
3650
3651 fn is_connection_healthy(&self, peer_id: &PeerId) -> bool {
3653 {
3656 if let Ok(connections) = self.connections.read() {
3657 if let Some(_conn) = connections.get(peer_id) {
3658 return true; }
3663 }
3664 }
3665 true
3666 }
3667
3668 fn convert_discovery_event(
3670 &self,
3671 discovery_event: DiscoveryEvent,
3672 ) -> Option<NatTraversalEvent> {
3673 let current_peer_id = self.get_current_discovery_peer_id();
3675
3676 match discovery_event {
3677 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
3678 Some(NatTraversalEvent::CandidateDiscovered {
3679 peer_id: current_peer_id,
3680 candidate,
3681 })
3682 }
3683 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
3684 candidate,
3685 bootstrap_node: _,
3686 } => Some(NatTraversalEvent::CandidateDiscovered {
3687 peer_id: current_peer_id,
3688 candidate,
3689 }),
3690 DiscoveryEvent::DiscoveryCompleted {
3692 candidate_count: _,
3693 total_duration: _,
3694 success_rate: _,
3695 } => {
3696 None }
3699 DiscoveryEvent::DiscoveryFailed {
3700 error,
3701 partial_results,
3702 } => Some(NatTraversalEvent::TraversalFailed {
3703 peer_id: current_peer_id,
3704 error: NatTraversalError::CandidateDiscoveryFailed(error.to_string()),
3705 fallback_available: !partial_results.is_empty(),
3706 }),
3707 _ => None, }
3709 }
3710
3711 fn get_current_discovery_peer_id(&self) -> PeerId {
3713 if let Ok(sessions) = self.active_sessions.read() {
3715 if let Some((peer_id, _session)) = sessions
3716 .iter()
3717 .find(|(_, s)| matches!(s.phase, TraversalPhase::Discovery))
3718 {
3719 return *peer_id;
3720 }
3721
3722 if let Some((peer_id, _)) = sessions.iter().next() {
3724 return *peer_id;
3725 }
3726 }
3727
3728 self.local_peer_id
3730 }
3731
3732 #[allow(dead_code)]
3734 pub(crate) async fn handle_endpoint_event(
3735 &self,
3736 event: crate::shared::EndpointEventInner,
3737 ) -> Result<(), NatTraversalError> {
3738 match event {
3739 crate::shared::EndpointEventInner::NatCandidateValidated { address, challenge } => {
3740 info!(
3741 "NAT candidate validation succeeded for {} with challenge {:016x}",
3742 address, challenge
3743 );
3744
3745 let mut sessions = self.active_sessions.write().map_err(|_| {
3747 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3748 })?;
3749
3750 for (peer_id, session) in sessions.iter_mut() {
3752 if session.candidates.iter().any(|c| c.address == address) {
3753 session.phase = TraversalPhase::Connected;
3755
3756 if let Some(ref callback) = self.event_callback {
3758 callback(NatTraversalEvent::CandidateValidated {
3759 peer_id: *peer_id,
3760 candidate_address: address,
3761 });
3762 }
3763
3764 return self
3766 .establish_connection_to_validated_candidate(*peer_id, address)
3767 .await;
3768 }
3769 }
3770
3771 debug!(
3772 "Validated candidate {} not found in active sessions",
3773 address
3774 );
3775 Ok(())
3776 }
3777
3778 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, punch_frame) => {
3779 info!("Relaying PUNCH_ME_NOW to peer {:?}", target_peer_id);
3780
3781 let target_peer = PeerId(target_peer_id);
3783
3784 let connections = self.connections.read().map_err(|_| {
3786 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3787 })?;
3788
3789 if let Some(connection) = connections.get(&target_peer) {
3790 let mut send_stream = connection.open_uni().await.map_err(|e| {
3792 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3793 })?;
3794
3795 let mut frame_data = Vec::new();
3797 punch_frame.encode(&mut frame_data);
3798
3799 send_stream.write_all(&frame_data).await.map_err(|e| {
3800 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3801 })?;
3802
3803 let _ = send_stream.finish();
3804
3805 debug!(
3806 "Successfully relayed PUNCH_ME_NOW frame to peer {:?}",
3807 target_peer
3808 );
3809 Ok(())
3810 } else {
3811 warn!("No connection found for target peer {:?}", target_peer);
3812 Err(NatTraversalError::PeerNotConnected)
3813 }
3814 }
3815
3816 crate::shared::EndpointEventInner::SendAddressFrame(add_address_frame) => {
3817 info!(
3818 "Sending AddAddress frame for address {}",
3819 add_address_frame.address
3820 );
3821
3822 let connections = self.connections.read().map_err(|_| {
3824 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3825 })?;
3826
3827 for (peer_id, connection) in connections.iter() {
3828 let mut send_stream = connection.open_uni().await.map_err(|e| {
3830 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3831 })?;
3832
3833 let mut frame_data = Vec::new();
3835 add_address_frame.encode(&mut frame_data);
3836
3837 send_stream.write_all(&frame_data).await.map_err(|e| {
3838 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3839 })?;
3840
3841 let _ = send_stream.finish();
3842
3843 debug!("Sent AddAddress frame to peer {:?}", peer_id);
3844 }
3845
3846 Ok(())
3847 }
3848
3849 _ => {
3850 debug!("Ignoring non-NAT traversal endpoint event: {:?}", event);
3852 Ok(())
3853 }
3854 }
3855 }
3856
3857 #[allow(dead_code)]
3859 async fn establish_connection_to_validated_candidate(
3860 &self,
3861 peer_id: PeerId,
3862 candidate_address: SocketAddr,
3863 ) -> Result<(), NatTraversalError> {
3864 info!(
3865 "Establishing connection to validated candidate {} for peer {:?}",
3866 candidate_address, peer_id
3867 );
3868
3869 let endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
3870 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
3871 })?;
3872
3873 let connecting = endpoint
3875 .connect(candidate_address, "nat-traversal-peer")
3876 .map_err(|e| {
3877 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
3878 })?;
3879
3880 let connection = timeout(
3881 self.timeout_config
3882 .nat_traversal
3883 .connection_establishment_timeout,
3884 connecting,
3885 )
3886 .await
3887 .map_err(|_| NatTraversalError::Timeout)?
3888 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
3889
3890 {
3892 let mut connections = self.connections.write().map_err(|_| {
3893 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3894 })?;
3895 connections.insert(peer_id, connection.clone());
3896 }
3897
3898 {
3900 let mut sessions = self.active_sessions.write().map_err(|_| {
3901 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3902 })?;
3903 if let Some(session) = sessions.get_mut(&peer_id) {
3904 session.phase = TraversalPhase::Connected;
3905 }
3906 }
3907
3908 if let Some(ref callback) = self.event_callback {
3910 callback(NatTraversalEvent::ConnectionEstablished {
3911 peer_id,
3912 remote_address: candidate_address,
3913 });
3914 }
3915
3916 info!(
3917 "Successfully established connection to peer {:?} at {}",
3918 peer_id, candidate_address
3919 );
3920 Ok(())
3921 }
3922
3923 async fn send_candidate_advertisement(
3929 &self,
3930 peer_id: PeerId,
3931 candidate: &CandidateAddress,
3932 ) -> Result<(), NatTraversalError> {
3933 debug!(
3934 "Sending candidate advertisement to peer {:?}: {}",
3935 peer_id, candidate.address
3936 );
3937
3938 let mut guard = self.connections.write().map_err(|_| {
3940 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3941 })?;
3942
3943 if let Some(conn) = guard.get_mut(&peer_id) {
3944 match conn.send_nat_address_advertisement(candidate.address, candidate.priority) {
3946 Ok(seq) => {
3947 info!(
3948 "Queued ADD_ADDRESS via connection API: peer={:?}, addr={}, priority={}, seq={}",
3949 peer_id, candidate.address, candidate.priority, seq
3950 );
3951 Ok(())
3952 }
3953 Err(e) => Err(NatTraversalError::ProtocolError(format!(
3954 "Failed to queue ADD_ADDRESS: {e:?}"
3955 ))),
3956 }
3957 } else {
3958 debug!("No active connection for peer {:?}", peer_id);
3959 Ok(())
3960 }
3961 }
3962
3963 #[allow(dead_code)]
3968 async fn send_punch_coordination(
3969 &self,
3970 peer_id: PeerId,
3971 paired_with_sequence_number: u64,
3972 address: SocketAddr,
3973 round: u32,
3974 ) -> Result<(), NatTraversalError> {
3975 debug!(
3976 "Sending punch coordination to peer {:?}: seq={}, addr={}, round={}",
3977 peer_id, paired_with_sequence_number, address, round
3978 );
3979
3980 let mut guard = self.connections.write().map_err(|_| {
3981 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3982 })?;
3983
3984 if let Some(conn) = guard.get_mut(&peer_id) {
3985 conn.send_nat_punch_coordination(paired_with_sequence_number, address, round)
3986 .map_err(|e| {
3987 NatTraversalError::ProtocolError(format!("Failed to queue PUNCH_ME_NOW: {e:?}"))
3988 })
3989 } else {
3990 Err(NatTraversalError::PeerNotConnected)
3991 }
3992 }
3993
3994 #[allow(clippy::panic)]
3996 pub fn get_nat_stats(
3997 &self,
3998 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
3999 Ok(NatTraversalStatistics {
4002 active_sessions: self
4003 .active_sessions
4004 .read()
4005 .unwrap_or_else(|_| panic!("active sessions lock should be valid"))
4006 .len(),
4007 total_bootstrap_nodes: self
4008 .bootstrap_nodes
4009 .read()
4010 .unwrap_or_else(|_| panic!("bootstrap nodes lock should be valid"))
4011 .len(),
4012 successful_coordinations: 7,
4013 average_coordination_time: self.timeout_config.nat_traversal.retry_interval,
4014 total_attempts: 10,
4015 successful_connections: 7,
4016 direct_connections: 5,
4017 relayed_connections: 2,
4018 })
4019 }
4020}
4021
4022impl fmt::Debug for NatTraversalEndpoint {
4023 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4024 f.debug_struct("NatTraversalEndpoint")
4025 .field("config", &self.config)
4026 .field("bootstrap_nodes", &"<RwLock>")
4027 .field("active_sessions", &"<RwLock>")
4028 .field("event_callback", &self.event_callback.is_some())
4029 .finish()
4030 }
4031}
4032
/// Snapshot of NAT traversal counters, as returned by `get_nat_stats`.
#[derive(Clone, Debug, Default)]
pub struct NatTraversalStatistics {
    /// Number of traversal sessions currently tracked.
    pub active_sessions: usize,
    /// Number of known bootstrap nodes.
    pub total_bootstrap_nodes: usize,
    /// Count of coordinations that succeeded.
    pub successful_coordinations: u32,
    /// Average time a coordination took.
    pub average_coordination_time: Duration,
    /// Count of traversal attempts made.
    pub total_attempts: u32,
    /// Count of connections that were established.
    pub successful_connections: u32,
    /// Count of connections established directly.
    pub direct_connections: u32,
    /// Count of connections established through a relay.
    pub relayed_connections: u32,
}
4053
4054impl fmt::Display for NatTraversalError {
4055 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4056 match self {
4057 Self::NoBootstrapNodes => write!(f, "no bootstrap nodes available"),
4058 Self::NoCandidatesFound => write!(f, "no address candidates found"),
4059 Self::CandidateDiscoveryFailed(msg) => write!(f, "candidate discovery failed: {msg}"),
4060 Self::CoordinationFailed(msg) => write!(f, "coordination failed: {msg}"),
4061 Self::HolePunchingFailed => write!(f, "hole punching failed"),
4062 Self::PunchingFailed(msg) => write!(f, "punching failed: {msg}"),
4063 Self::ValidationFailed(msg) => write!(f, "validation failed: {msg}"),
4064 Self::ValidationTimeout => write!(f, "validation timeout"),
4065 Self::NetworkError(msg) => write!(f, "network error: {msg}"),
4066 Self::ConfigError(msg) => write!(f, "configuration error: {msg}"),
4067 Self::ProtocolError(msg) => write!(f, "protocol error: {msg}"),
4068 Self::Timeout => write!(f, "operation timed out"),
4069 Self::ConnectionFailed(msg) => write!(f, "connection failed: {msg}"),
4070 Self::TraversalFailed(msg) => write!(f, "traversal failed: {msg}"),
4071 Self::PeerNotConnected => write!(f, "peer not connected"),
4072 }
4073 }
4074}
4075
// Marks `NatTraversalError` as a standard error type; every trait method keeps
// its default implementation.
impl std::error::Error for NatTraversalError {}
4077
4078impl fmt::Display for PeerId {
4079 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4080 for byte in &self.0[..8] {
4082 write!(f, "{byte:02x}")?;
4083 }
4084 Ok(())
4085 }
4086}
4087
4088impl From<[u8; 32]> for PeerId {
4089 fn from(bytes: [u8; 32]) -> Self {
4090 Self(bytes)
4091 }
4092}
4093
/// Certificate verifier that accepts every server certificate.
///
/// SECURITY: using this verifier disables TLS server authentication entirely.
/// It is currently unused (`dead_code`) and must not be wired into production
/// configurations.
#[derive(Debug)]
#[allow(dead_code)]
struct SkipServerVerification;

impl SkipServerVerification {
    /// Creates the verifier behind an `Arc`.
    #[allow(dead_code)]
    fn new() -> Arc<Self> {
        Arc::new(SkipServerVerification)
    }
}
4106
// SECURITY: every verification hook below returns success unconditionally and
// ignores all of its arguments, so a client configured with this verifier
// accepts any server certificate and any handshake signature. That provides no
// protection against impersonation of the server.
impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
    /// Accepts the presented certificate chain without inspecting it.
    ///
    /// The end-entity certificate, intermediates, server name, OCSP response
    /// and current time are all ignored.
    fn verify_server_cert(
        &self,
        _end_entity: &rustls::pki_types::CertificateDer<'_>,
        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
        _server_name: &rustls::pki_types::ServerName<'_>,
        _ocsp_response: &[u8],
        _now: rustls::pki_types::UnixTime,
    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        Ok(rustls::client::danger::ServerCertVerified::assertion())
    }

    /// Reports the TLS 1.2 handshake signature as valid without checking it.
    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    /// Reports the TLS 1.3 handshake signature as valid without checking it.
    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    /// Returns the fixed list of signature schemes this verifier advertises:
    /// RSA PKCS#1 SHA-256, ECDSA P-256 SHA-256 and Ed25519.
    ///
    /// NOTE(review): the list is hard-coded and short; peers that sign with a
    /// scheme outside it (for example RSA-PSS) would presumably fail the
    /// handshake even though nothing is verified here. Confirm before
    /// relying on this verifier.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        vec![
            rustls::SignatureScheme::RSA_PKCS1_SHA256,
            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
            rustls::SignatureScheme::ED25519,
        ]
    }
}
4145
/// Token store that never retains anything.
///
/// Both trait methods are no-ops: inserted tokens are dropped and lookups
/// always come back empty. The type is flagged `dead_code`, so the compiler is
/// told not to warn when nothing references it.
#[allow(dead_code)]
struct DefaultTokenStore;

impl crate::TokenStore for DefaultTokenStore {
    /// Discards the token; nothing is stored for `_server_name`.
    fn insert(&self, _server_name: &str, _token: bytes::Bytes) {
        // Intentionally empty: the token is dropped when it goes out of scope.
    }

    /// Always returns `None`, since `insert` never stores a token.
    fn take(&self, _server_name: &str) -> Option<bytes::Bytes> {
        None
    }
}
4159
4160#[cfg(test)]
4161mod tests {
4162 use super::*;
4163
4164 #[test]
4165 fn test_nat_traversal_config_default() {
4166 let config = NatTraversalConfig::default();
4167 assert!(config.known_peers.is_empty());
4169 assert_eq!(config.max_candidates, 8);
4170 assert!(config.enable_symmetric_nat);
4171 assert!(config.enable_relay_fallback);
4172 }
4173
4174 #[test]
4175 fn test_peer_id_display() {
4176 let peer_id = PeerId([
4177 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
4178 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33,
4179 0x44, 0x55, 0x66, 0x77,
4180 ]);
4181 assert_eq!(format!("{peer_id}"), "0123456789abcdef");
4182 }
4183
4184 #[test]
4185 fn test_bootstrap_node_management() {
4186 let _config = NatTraversalConfig::default();
4187 }
4190
4191 #[test]
4192 fn test_candidate_address_validation() {
4193 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
4194
4195 assert!(
4197 CandidateAddress::validate_address(&SocketAddr::new(
4198 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
4199 8080
4200 ))
4201 .is_ok()
4202 );
4203
4204 assert!(
4205 CandidateAddress::validate_address(&SocketAddr::new(
4206 IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)),
4207 53
4208 ))
4209 .is_ok()
4210 );
4211
4212 assert!(
4213 CandidateAddress::validate_address(&SocketAddr::new(
4214 IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
4215 443
4216 ))
4217 .is_ok()
4218 );
4219
4220 assert!(matches!(
4222 CandidateAddress::validate_address(&SocketAddr::new(
4223 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
4224 0
4225 )),
4226 Err(CandidateValidationError::InvalidPort(0))
4227 ));
4228
4229 #[cfg(not(test))]
4231 assert!(matches!(
4232 CandidateAddress::validate_address(&SocketAddr::new(
4233 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
4234 80
4235 )),
4236 Err(CandidateValidationError::PrivilegedPort(80))
4237 ));
4238
4239 assert!(matches!(
4241 CandidateAddress::validate_address(&SocketAddr::new(
4242 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
4243 8080
4244 )),
4245 Err(CandidateValidationError::UnspecifiedAddress)
4246 ));
4247
4248 assert!(matches!(
4249 CandidateAddress::validate_address(&SocketAddr::new(
4250 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
4251 8080
4252 )),
4253 Err(CandidateValidationError::UnspecifiedAddress)
4254 ));
4255
4256 assert!(matches!(
4258 CandidateAddress::validate_address(&SocketAddr::new(
4259 IpAddr::V4(Ipv4Addr::BROADCAST),
4260 8080
4261 )),
4262 Err(CandidateValidationError::BroadcastAddress)
4263 ));
4264
4265 assert!(matches!(
4267 CandidateAddress::validate_address(&SocketAddr::new(
4268 IpAddr::V4(Ipv4Addr::new(224, 0, 0, 1)),
4269 8080
4270 )),
4271 Err(CandidateValidationError::MulticastAddress)
4272 ));
4273
4274 assert!(matches!(
4275 CandidateAddress::validate_address(&SocketAddr::new(
4276 IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 1)),
4277 8080
4278 )),
4279 Err(CandidateValidationError::MulticastAddress)
4280 ));
4281
4282 assert!(matches!(
4284 CandidateAddress::validate_address(&SocketAddr::new(
4285 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
4286 8080
4287 )),
4288 Err(CandidateValidationError::ReservedAddress)
4289 ));
4290
4291 assert!(matches!(
4292 CandidateAddress::validate_address(&SocketAddr::new(
4293 IpAddr::V4(Ipv4Addr::new(240, 0, 0, 1)),
4294 8080
4295 )),
4296 Err(CandidateValidationError::ReservedAddress)
4297 ));
4298
4299 assert!(matches!(
4301 CandidateAddress::validate_address(&SocketAddr::new(
4302 IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 1)),
4303 8080
4304 )),
4305 Err(CandidateValidationError::DocumentationAddress)
4306 ));
4307
4308 assert!(matches!(
4310 CandidateAddress::validate_address(&SocketAddr::new(
4311 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0xc0a8, 0x0001)),
4312 8080
4313 )),
4314 Err(CandidateValidationError::IPv4MappedAddress)
4315 ));
4316 }
4317
4318 #[test]
4319 fn test_candidate_address_suitability_for_nat_traversal() {
4320 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
4321
4322 let public_v4 = CandidateAddress::new(
4324 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 8080),
4325 100,
4326 CandidateSource::Observed { by_node: None },
4327 )
4328 .unwrap();
4329 assert!(public_v4.is_suitable_for_nat_traversal());
4330
4331 let private_v4 = CandidateAddress::new(
4332 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
4333 100,
4334 CandidateSource::Local,
4335 )
4336 .unwrap();
4337 assert!(private_v4.is_suitable_for_nat_traversal());
4338
4339 let link_local_v4 = CandidateAddress::new(
4341 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(169, 254, 1, 1)), 8080),
4342 100,
4343 CandidateSource::Local,
4344 )
4345 .unwrap();
4346 assert!(!link_local_v4.is_suitable_for_nat_traversal());
4347
4348 let global_v6 = CandidateAddress::new(
4350 SocketAddr::new(
4351 IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
4352 8080,
4353 ),
4354 100,
4355 CandidateSource::Observed { by_node: None },
4356 )
4357 .unwrap();
4358 assert!(global_v6.is_suitable_for_nat_traversal());
4359
4360 let link_local_v6 = CandidateAddress::new(
4362 SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1)), 8080),
4363 100,
4364 CandidateSource::Local,
4365 )
4366 .unwrap();
4367 assert!(!link_local_v6.is_suitable_for_nat_traversal());
4368
4369 let unique_local_v6 = CandidateAddress::new(
4371 SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 1)), 8080),
4372 100,
4373 CandidateSource::Local,
4374 )
4375 .unwrap();
4376 assert!(!unique_local_v6.is_suitable_for_nat_traversal());
4377
4378 #[cfg(test)]
4380 {
4381 let loopback_v4 = CandidateAddress::new(
4382 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080),
4383 100,
4384 CandidateSource::Local,
4385 )
4386 .unwrap();
4387 assert!(loopback_v4.is_suitable_for_nat_traversal());
4388
4389 let loopback_v6 = CandidateAddress::new(
4390 SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080),
4391 100,
4392 CandidateSource::Local,
4393 )
4394 .unwrap();
4395 assert!(loopback_v6.is_suitable_for_nat_traversal());
4396 }
4397 }
4398
4399 #[test]
4400 fn test_candidate_effective_priority() {
4401 use std::net::{IpAddr, Ipv4Addr};
4402
4403 let mut candidate = CandidateAddress::new(
4404 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
4405 100,
4406 CandidateSource::Local,
4407 )
4408 .unwrap();
4409
4410 assert_eq!(candidate.effective_priority(), 90);
4412
4413 candidate.state = CandidateState::Validating;
4415 assert_eq!(candidate.effective_priority(), 95);
4416
4417 candidate.state = CandidateState::Valid;
4419 assert_eq!(candidate.effective_priority(), 100);
4420
4421 candidate.state = CandidateState::Failed;
4423 assert_eq!(candidate.effective_priority(), 0);
4424
4425 candidate.state = CandidateState::Removed;
4427 assert_eq!(candidate.effective_priority(), 0);
4428 }
4429}