1#![allow(missing_docs)]
8
9use std::{collections::HashMap, fmt, net::SocketAddr, sync::Arc, time::Duration};
16
17#[allow(clippy::panic)]
36fn create_random_port_bind_addr() -> SocketAddr {
37 "0.0.0.0:0"
38 .parse()
39 .unwrap_or_else(|_| panic!("Random port bind address format is always valid"))
40}
41
42fn extract_ml_dsa_from_spki(spki: &[u8]) -> Option<crate::crypto::pqc::types::MlDsaPublicKey> {
49 crate::crypto::raw_public_keys::pqc::extract_public_key_from_spki(spki).ok()
50}
51
52use tracing::{debug, error, info, warn};
53
54use std::sync::atomic::{AtomicBool, Ordering};
55
56use tokio::{
57 net::UdpSocket,
58 sync::{mpsc, mpsc::error::TryRecvError},
59 time::{sleep, timeout},
60};
61
62use crate::high_level::default_runtime;
63
64use crate::{
65 VarInt,
66 candidate_discovery::{CandidateDiscoveryManager, DiscoveryConfig, DiscoveryEvent},
67 connection::nat_traversal::{CandidateSource, CandidateState},
69 masque::integration::{RelayManager, RelayManagerConfig},
70};
71
72use crate::{
73 ClientConfig, ConnectionError, EndpointConfig, ServerConfig, TransportConfig,
74 high_level::{Connection as InnerConnection, Endpoint as InnerEndpoint},
75};
76
77#[cfg(feature = "rustls-aws-lc-rs")]
78use crate::{crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig};
79
80use crate::config::validation::{ConfigValidator, ValidationResult};
81
82#[cfg(feature = "rustls-aws-lc-rs")]
83use crate::crypto::{pqc::PqcConfig, raw_public_keys::RawPublicKeyConfigBuilder};
84
85pub struct NatTraversalEndpoint {
87 inner_endpoint: Option<InnerEndpoint>,
89 config: NatTraversalConfig,
93 bootstrap_nodes: Arc<std::sync::RwLock<Vec<BootstrapNode>>>,
95 active_sessions: Arc<std::sync::RwLock<HashMap<PeerId, NatTraversalSession>>>,
97 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
99 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
101 shutdown: Arc<AtomicBool>,
103 event_tx: Option<mpsc::UnboundedSender<NatTraversalEvent>>,
105 event_rx: std::sync::Mutex<mpsc::UnboundedReceiver<NatTraversalEvent>>,
107 connections: Arc<std::sync::RwLock<HashMap<PeerId, InnerConnection>>>,
109 local_peer_id: PeerId,
111 timeout_config: crate::config::nat_timeouts::TimeoutConfig,
113 emitted_established_events: Arc<std::sync::RwLock<std::collections::HashSet<PeerId>>>,
116 relay_manager: Option<Arc<RelayManager>>,
118}
119
120#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
160pub struct NatTraversalConfig {
161 pub known_peers: Vec<SocketAddr>,
165 pub max_candidates: usize,
167 pub coordination_timeout: Duration,
169 pub enable_symmetric_nat: bool,
171 pub enable_relay_fallback: bool,
173 pub relay_nodes: Vec<SocketAddr>,
176 pub max_concurrent_attempts: usize,
178 pub bind_addr: Option<SocketAddr>,
195 pub prefer_rfc_nat_traversal: bool,
198 pub pqc: Option<PqcConfig>,
200 pub timeouts: crate::config::nat_timeouts::TimeoutConfig,
202 #[serde(skip)]
210 pub identity_key: Option<(
211 crate::crypto::pqc::types::MlDsaPublicKey,
212 crate::crypto::pqc::types::MlDsaSecretKey,
213 )>,
214}
215
216#[derive(
222 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
223)]
224pub struct PeerId(pub [u8; 32]);
225
226#[derive(Debug, Clone)]
228pub struct BootstrapNode {
229 pub address: SocketAddr,
231 pub last_seen: std::time::Instant,
233 pub can_coordinate: bool,
235 pub rtt: Option<Duration>,
237 pub coordination_count: u32,
239}
240
241impl BootstrapNode {
242 pub fn new(address: SocketAddr) -> Self {
244 Self {
245 address,
246 last_seen: std::time::Instant::now(),
247 can_coordinate: true,
248 rtt: None,
249 coordination_count: 0,
250 }
251 }
252}
253
254#[derive(Debug, Clone)]
256pub struct CandidatePair {
257 pub local_candidate: CandidateAddress,
259 pub remote_candidate: CandidateAddress,
261 pub priority: u64,
263 pub state: CandidatePairState,
265}
266
267#[derive(Debug, Clone, Copy, PartialEq, Eq)]
269pub enum CandidatePairState {
270 Waiting,
272 InProgress,
274 Succeeded,
276 Failed,
278 Cancelled,
280}
281
282#[derive(Debug)]
284struct NatTraversalSession {
285 peer_id: PeerId,
287 #[allow(dead_code)]
289 coordinator: SocketAddr,
290 attempt: u32,
292 started_at: std::time::Instant,
294 phase: TraversalPhase,
296 candidates: Vec<CandidateAddress>,
298 session_state: SessionState,
300}
301
302#[derive(Debug, Clone)]
304pub struct SessionState {
305 pub state: ConnectionState,
307 pub last_transition: std::time::Instant,
309 pub connection: Option<InnerConnection>,
311 pub active_attempts: Vec<(SocketAddr, std::time::Instant)>,
313 pub metrics: ConnectionMetrics,
315}
316
317#[derive(Debug, Clone, Copy, PartialEq, Eq)]
319pub enum ConnectionState {
320 Idle,
322 Connecting,
324 Connected,
326 Migrating,
328 Closed,
330}
331
332#[derive(Debug, Clone, Default)]
334pub struct ConnectionMetrics {
335 pub rtt: Option<Duration>,
337 pub loss_rate: f64,
339 pub bytes_sent: u64,
341 pub bytes_received: u64,
343 pub last_activity: Option<std::time::Instant>,
345}
346
347#[derive(Debug, Clone)]
349pub struct SessionStateUpdate {
350 pub peer_id: PeerId,
352 pub old_state: ConnectionState,
354 pub new_state: ConnectionState,
356 pub reason: StateChangeReason,
358}
359
360#[derive(Debug, Clone, Copy, PartialEq, Eq)]
362pub enum StateChangeReason {
363 Timeout,
365 ConnectionEstablished,
367 ConnectionClosed,
369 MigrationComplete,
371 MigrationFailed,
373 NetworkError,
375 UserClosed,
377}
378
379#[derive(Debug, Clone, Copy, PartialEq, Eq)]
381pub enum TraversalPhase {
382 Discovery,
384 Coordination,
386 Synchronization,
388 Punching,
390 Validation,
392 Connected,
394 Failed,
396}
397
398#[derive(Debug, Clone, Copy)]
400enum SessionUpdate {
401 Timeout,
403 Disconnected,
405 UpdateMetrics,
407 InvalidState,
409 Retry,
411 MigrationTimeout,
413 Remove,
415}
416
417#[derive(Debug, Clone)]
419pub struct CandidateAddress {
420 pub address: SocketAddr,
422 pub priority: u32,
424 pub source: CandidateSource,
426 pub state: CandidateState,
428}
429
430impl CandidateAddress {
431 pub fn new(
433 address: SocketAddr,
434 priority: u32,
435 source: CandidateSource,
436 ) -> Result<Self, CandidateValidationError> {
437 Self::validate_address(&address)?;
438 Ok(Self {
439 address,
440 priority,
441 source,
442 state: CandidateState::New,
443 })
444 }
445
446 pub fn validate_address(addr: &SocketAddr) -> Result<(), CandidateValidationError> {
448 if addr.port() == 0 {
450 return Err(CandidateValidationError::InvalidPort(0));
451 }
452
453 #[cfg(not(test))]
455 if addr.port() < 1024 {
456 return Err(CandidateValidationError::PrivilegedPort(addr.port()));
457 }
458
459 match addr.ip() {
460 std::net::IpAddr::V4(ipv4) => {
461 if ipv4.is_unspecified() {
463 return Err(CandidateValidationError::UnspecifiedAddress);
464 }
465 if ipv4.is_broadcast() {
466 return Err(CandidateValidationError::BroadcastAddress);
467 }
468 if ipv4.is_multicast() {
469 return Err(CandidateValidationError::MulticastAddress);
470 }
471 if ipv4.octets()[0] == 0 {
473 return Err(CandidateValidationError::ReservedAddress);
474 }
475 if ipv4.octets()[0] >= 240 {
477 return Err(CandidateValidationError::ReservedAddress);
478 }
479 }
480 std::net::IpAddr::V6(ipv6) => {
481 if ipv6.is_unspecified() {
483 return Err(CandidateValidationError::UnspecifiedAddress);
484 }
485 if ipv6.is_multicast() {
486 return Err(CandidateValidationError::MulticastAddress);
487 }
488 let segments = ipv6.segments();
490 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
491 return Err(CandidateValidationError::DocumentationAddress);
492 }
493 if ipv6.to_ipv4_mapped().is_some() {
495 return Err(CandidateValidationError::IPv4MappedAddress);
496 }
497 }
498 }
499
500 Ok(())
501 }
502
503 pub fn is_suitable_for_nat_traversal(&self) -> bool {
505 match self.address.ip() {
506 std::net::IpAddr::V4(ipv4) => {
507 #[cfg(test)]
512 if ipv4.is_loopback() {
513 return true;
514 }
515 !ipv4.is_loopback()
516 && !ipv4.is_link_local()
517 && !ipv4.is_multicast()
518 && !ipv4.is_broadcast()
519 }
520 std::net::IpAddr::V6(ipv6) => {
521 #[cfg(test)]
527 if ipv6.is_loopback() {
528 return true;
529 }
530 let segments = ipv6.segments();
531 let is_link_local = (segments[0] & 0xffc0) == 0xfe80;
532 let is_unique_local = (segments[0] & 0xfe00) == 0xfc00;
533
534 !ipv6.is_loopback() && !is_link_local && !is_unique_local && !ipv6.is_multicast()
535 }
536 }
537 }
538
539 pub fn effective_priority(&self) -> u32 {
541 match self.state {
542 CandidateState::Valid => self.priority,
543 CandidateState::New => self.priority.saturating_sub(10),
544 CandidateState::Validating => self.priority.saturating_sub(5),
545 CandidateState::Failed => 0,
546 CandidateState::Removed => 0,
547 }
548 }
549}
550
551#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
553pub enum CandidateValidationError {
554 #[error("invalid port number: {0}")]
556 InvalidPort(u16),
557 #[error("privileged port not allowed: {0}")]
559 PrivilegedPort(u16),
560 #[error("unspecified address not allowed")]
562 UnspecifiedAddress,
563 #[error("broadcast address not allowed")]
565 BroadcastAddress,
566 #[error("multicast address not allowed")]
568 MulticastAddress,
569 #[error("reserved address not allowed")]
571 ReservedAddress,
572 #[error("documentation address not allowed")]
574 DocumentationAddress,
575 #[error("IPv4-mapped IPv6 address not allowed")]
577 IPv4MappedAddress,
578}
579
580#[derive(Debug, Clone)]
582pub enum NatTraversalEvent {
583 CandidateDiscovered {
585 peer_id: PeerId,
587 candidate: CandidateAddress,
589 },
590 CoordinationRequested {
592 peer_id: PeerId,
594 coordinator: SocketAddr,
596 },
597 CoordinationSynchronized {
599 peer_id: PeerId,
601 round_id: VarInt,
603 },
604 HolePunchingStarted {
606 peer_id: PeerId,
608 targets: Vec<SocketAddr>,
610 },
611 PathValidated {
613 peer_id: PeerId,
615 address: SocketAddr,
617 rtt: Duration,
619 },
620 CandidateValidated {
622 peer_id: PeerId,
624 candidate_address: SocketAddr,
626 },
627 TraversalSucceeded {
629 peer_id: PeerId,
631 final_address: SocketAddr,
633 total_time: Duration,
635 },
636 ConnectionEstablished {
638 peer_id: PeerId,
639 remote_address: SocketAddr,
641 },
642 TraversalFailed {
644 peer_id: PeerId,
646 error: NatTraversalError,
648 fallback_available: bool,
650 },
651 ConnectionLost {
653 peer_id: PeerId,
655 reason: String,
657 },
658 PhaseTransition {
660 peer_id: PeerId,
662 from_phase: TraversalPhase,
664 to_phase: TraversalPhase,
666 },
667 SessionStateChanged {
669 peer_id: PeerId,
671 new_state: ConnectionState,
673 },
674 ExternalAddressDiscovered {
676 reported_by: SocketAddr,
678 address: SocketAddr,
680 },
681}
682
683#[derive(Debug, Clone)]
685pub enum NatTraversalError {
686 NoBootstrapNodes,
688 NoCandidatesFound,
690 CandidateDiscoveryFailed(String),
692 CoordinationFailed(String),
694 HolePunchingFailed,
696 PunchingFailed(String),
698 ValidationFailed(String),
700 ValidationTimeout,
702 NetworkError(String),
704 ConfigError(String),
706 ProtocolError(String),
708 Timeout,
710 ConnectionFailed(String),
712 TraversalFailed(String),
714 PeerNotConnected,
716}
717
718impl Default for NatTraversalConfig {
719 fn default() -> Self {
720 Self {
721 known_peers: Vec::new(),
722 max_candidates: 8,
723 coordination_timeout: Duration::from_secs(10),
724 enable_symmetric_nat: true,
725 enable_relay_fallback: true,
726 relay_nodes: Vec::new(),
727 max_concurrent_attempts: 3,
728 bind_addr: None,
729 prefer_rfc_nat_traversal: true, pqc: Some(crate::crypto::pqc::PqcConfig::default()),
733 timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
734 identity_key: None, }
736 }
737}
738
739impl ConfigValidator for NatTraversalConfig {
740 fn validate(&self) -> ValidationResult<()> {
741 use crate::config::validation::*;
742
743 if !self.known_peers.is_empty() {
748 validate_bootstrap_nodes(&self.known_peers)?;
749 }
750
751 validate_range(self.max_candidates, 1, 256, "max_candidates")?;
753
754 validate_duration(
756 self.coordination_timeout,
757 Duration::from_millis(100),
758 Duration::from_secs(300),
759 "coordination_timeout",
760 )?;
761
762 validate_range(
764 self.max_concurrent_attempts,
765 1,
766 16,
767 "max_concurrent_attempts",
768 )?;
769
770 if self.max_concurrent_attempts > self.max_candidates {
772 return Err(ConfigValidationError::IncompatibleConfiguration(
773 "max_concurrent_attempts cannot exceed max_candidates".to_string(),
774 ));
775 }
776
777 Ok(())
778 }
779}
780
781impl NatTraversalEndpoint {
782 pub async fn new(
784 config: NatTraversalConfig,
785 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
786 ) -> Result<Self, NatTraversalError> {
787 Self::new_impl(config, event_callback).await
788 }
789
790 async fn new_impl(
792 config: NatTraversalConfig,
793 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
794 ) -> Result<Self, NatTraversalError> {
795 Self::new_common(config, event_callback).await
796 }
797
798 async fn new_common(
800 config: NatTraversalConfig,
801 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
802 ) -> Result<Self, NatTraversalError> {
803 Self::new_shared_logic(config, event_callback).await
805 }
806
807 async fn new_shared_logic(
809 config: NatTraversalConfig,
810 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
811 ) -> Result<Self, NatTraversalError> {
812 config
814 .validate()
815 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
816
817 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
819 config
820 .known_peers
821 .iter()
822 .map(|&address| BootstrapNode {
823 address,
824 last_seen: std::time::Instant::now(),
825 can_coordinate: true, rtt: None,
827 coordination_count: 0,
828 })
829 .collect(),
830 ));
831
832 let discovery_config = DiscoveryConfig {
834 total_timeout: config.coordination_timeout,
835 max_candidates: config.max_candidates,
836 enable_symmetric_prediction: config.enable_symmetric_nat,
837 bound_address: config.bind_addr, ..DiscoveryConfig::default()
839 };
840
841 let discovery_manager = Arc::new(std::sync::Mutex::new(CandidateDiscoveryManager::new(
844 discovery_config,
845 )));
846
847 let (inner_endpoint, event_tx, event_rx, local_addr) =
849 Self::create_inner_endpoint(&config).await?;
850
851 {
853 let mut discovery = discovery_manager.lock().map_err(|_| {
854 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
855 })?;
856 discovery.set_bound_address(local_addr);
857 info!(
858 "Updated discovery manager with bound address: {}",
859 local_addr
860 );
861 }
862
863 let emitted_established_events =
864 Arc::new(std::sync::RwLock::new(std::collections::HashSet::new()));
865
866 let relay_manager = if config.enable_relay_fallback && !config.relay_nodes.is_empty() {
868 let relay_config = RelayManagerConfig {
869 max_relays: config.relay_nodes.len().min(5), connect_timeout: config.coordination_timeout,
871 ..RelayManagerConfig::default()
872 };
873 let manager = RelayManager::new(relay_config);
874 for relay_addr in &config.relay_nodes {
876 manager.add_relay_node(*relay_addr).await;
877 }
878 Some(Arc::new(manager))
879 } else {
880 None
881 };
882
883 let endpoint = Self {
884 inner_endpoint: Some(inner_endpoint.clone()),
885 config: config.clone(),
886 bootstrap_nodes,
887 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
888 discovery_manager,
889 event_callback,
890 shutdown: Arc::new(AtomicBool::new(false)),
891 event_tx: Some(event_tx.clone()),
892 event_rx: std::sync::Mutex::new(event_rx),
893 connections: Arc::new(std::sync::RwLock::new(HashMap::new())),
894 local_peer_id: Self::generate_local_peer_id(),
895 timeout_config: config.timeouts.clone(),
896 emitted_established_events: emitted_established_events.clone(),
897 relay_manager,
898 };
899
900 {
902 let endpoint_clone = inner_endpoint.clone();
903 let shutdown_clone = endpoint.shutdown.clone();
904 let event_tx_clone = event_tx.clone();
905 let connections_clone = endpoint.connections.clone();
906 let emitted_events_clone = emitted_established_events.clone();
907
908 tokio::spawn(async move {
909 Self::accept_connections(
910 endpoint_clone,
911 shutdown_clone,
912 event_tx_clone,
913 connections_clone,
914 emitted_events_clone,
915 )
916 .await;
917 });
918
919 info!("Started accepting connections (symmetric P2P node)");
920 }
921
922 let discovery_manager_clone = endpoint.discovery_manager.clone();
924 let shutdown_clone = endpoint.shutdown.clone();
925 let event_tx_clone = event_tx;
926 let connections_clone = endpoint.connections.clone();
927
928 tokio::spawn(async move {
929 Self::poll_discovery(
930 discovery_manager_clone,
931 shutdown_clone,
932 event_tx_clone,
933 connections_clone,
934 )
935 .await;
936 });
937
938 info!("Started discovery polling task");
939
940 {
942 let mut discovery = endpoint.discovery_manager.lock().map_err(|_| {
943 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
944 })?;
945
946 let local_peer_id = endpoint.local_peer_id;
948 let bootstrap_nodes = {
949 let nodes = endpoint.bootstrap_nodes.read().map_err(|_| {
950 NatTraversalError::ProtocolError("Bootstrap nodes lock poisoned".to_string())
951 })?;
952 nodes.clone()
953 };
954
955 discovery
956 .start_discovery(local_peer_id, bootstrap_nodes)
957 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
958
959 info!(
960 "Started local candidate discovery for peer {:?}",
961 local_peer_id
962 );
963 }
964
965 Ok(endpoint)
966 }
967
968 pub fn get_endpoint(&self) -> Option<&crate::high_level::Endpoint> {
970 self.inner_endpoint.as_ref()
971 }
972
973 pub fn get_event_callback(&self) -> Option<&Box<dyn Fn(NatTraversalEvent) + Send + Sync>> {
975 self.event_callback.as_ref()
976 }
977
978 pub fn initiate_nat_traversal(
980 &self,
981 peer_id: PeerId,
982 coordinator: SocketAddr,
983 ) -> Result<(), NatTraversalError> {
984 info!(
985 "Starting NAT traversal to peer {:?} via coordinator {}",
986 peer_id, coordinator
987 );
988
989 let session = NatTraversalSession {
991 peer_id,
992 coordinator,
993 attempt: 1,
994 started_at: std::time::Instant::now(),
995 phase: TraversalPhase::Discovery,
996 candidates: Vec::new(),
997 session_state: SessionState {
998 state: ConnectionState::Connecting,
999 last_transition: std::time::Instant::now(),
1000
1001 connection: None,
1002 active_attempts: Vec::new(),
1003 metrics: ConnectionMetrics::default(),
1004 },
1005 };
1006
1007 {
1009 let mut sessions = self
1010 .active_sessions
1011 .write()
1012 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1013 sessions.insert(peer_id, session);
1014 }
1015
1016 let bootstrap_nodes_vec = {
1018 let bootstrap_nodes = self
1019 .bootstrap_nodes
1020 .read()
1021 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1022 bootstrap_nodes.clone()
1023 };
1024
1025 {
1026 let mut discovery = self.discovery_manager.lock().map_err(|_| {
1027 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
1028 })?;
1029
1030 discovery
1031 .start_discovery(peer_id, bootstrap_nodes_vec)
1032 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
1033 }
1034
1035 if let Some(ref callback) = self.event_callback {
1037 callback(NatTraversalEvent::CoordinationRequested {
1038 peer_id,
1039 coordinator,
1040 });
1041 }
1042
1043 Ok(())
1045 }
1046
1047 pub fn poll_sessions(&self) -> Result<Vec<SessionStateUpdate>, NatTraversalError> {
1049 let mut updates = Vec::new();
1050 let now = std::time::Instant::now();
1051
1052 let mut sessions = self
1053 .active_sessions
1054 .write()
1055 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
1056
1057 for (peer_id, session) in sessions.iter_mut() {
1058 let mut state_changed = false;
1059
1060 match session.session_state.state {
1061 ConnectionState::Connecting => {
1062 let elapsed = now.duration_since(session.session_state.last_transition);
1064 if elapsed
1065 > self
1066 .timeout_config
1067 .nat_traversal
1068 .connection_establishment_timeout
1069 {
1070 session.session_state.state = ConnectionState::Closed;
1071 session.session_state.last_transition = now;
1072 state_changed = true;
1073
1074 updates.push(SessionStateUpdate {
1075 peer_id: *peer_id,
1076 old_state: ConnectionState::Connecting,
1077 new_state: ConnectionState::Closed,
1078 reason: StateChangeReason::Timeout,
1079 });
1080 }
1081
1082 let has_connection = if let Ok(conns) = self.connections.read() {
1085 conns.contains_key(peer_id)
1086 } else {
1087 false
1088 };
1089
1090 if has_connection || session.session_state.connection.is_some() {
1091 if session.session_state.connection.is_none() {
1093 if let Ok(conns) = self.connections.read() {
1094 if let Some(conn) = conns.get(peer_id) {
1095 session.session_state.connection = Some(conn.clone());
1096 }
1097 }
1098 }
1099
1100 session.session_state.state = ConnectionState::Connected;
1101 session.session_state.last_transition = now;
1102 state_changed = true;
1103
1104 updates.push(SessionStateUpdate {
1105 peer_id: *peer_id,
1106 old_state: ConnectionState::Connecting,
1107 new_state: ConnectionState::Connected,
1108 reason: StateChangeReason::ConnectionEstablished,
1109 });
1110 }
1111 }
1112 ConnectionState::Connected => {
1113 {
1116 }
1119
1120 session.session_state.metrics.last_activity = Some(now);
1122 }
1123 ConnectionState::Migrating => {
1124 let elapsed = now.duration_since(session.session_state.last_transition);
1126 if elapsed > Duration::from_secs(10) {
1127 if session.session_state.connection.is_some() {
1130 session.session_state.state = ConnectionState::Connected;
1131 state_changed = true;
1132
1133 updates.push(SessionStateUpdate {
1134 peer_id: *peer_id,
1135 old_state: ConnectionState::Migrating,
1136 new_state: ConnectionState::Connected,
1137 reason: StateChangeReason::MigrationComplete,
1138 });
1139 } else {
1140 session.session_state.state = ConnectionState::Closed;
1141 state_changed = true;
1142
1143 updates.push(SessionStateUpdate {
1144 peer_id: *peer_id,
1145 old_state: ConnectionState::Migrating,
1146 new_state: ConnectionState::Closed,
1147 reason: StateChangeReason::MigrationFailed,
1148 });
1149 }
1150
1151 session.session_state.last_transition = now;
1152 }
1153 }
1154 _ => {}
1155 }
1156
1157 if state_changed {
1159 if let Some(ref callback) = self.event_callback {
1160 callback(NatTraversalEvent::SessionStateChanged {
1161 peer_id: *peer_id,
1162 new_state: session.session_state.state,
1163 });
1164 }
1165 }
1166 }
1167
1168 Ok(updates)
1169 }
1170
1171 pub fn start_session_polling(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
1173 let sessions = self.active_sessions.clone();
1174 let shutdown = self.shutdown.clone();
1175 let timeout_config = self.timeout_config.clone();
1176
1177 tokio::spawn(async move {
1178 let mut ticker = tokio::time::interval(interval);
1179
1180 loop {
1181 ticker.tick().await;
1182
1183 if shutdown.load(Ordering::Relaxed) {
1184 break;
1185 }
1186
1187 let sessions_to_update = {
1189 match sessions.read() {
1190 Ok(sessions_guard) => {
1191 sessions_guard
1192 .iter()
1193 .filter_map(|(peer_id, session)| {
1194 let now = std::time::Instant::now();
1195 let elapsed =
1196 now.duration_since(session.session_state.last_transition);
1197
1198 match session.session_state.state {
1199 ConnectionState::Connecting => {
1200 if elapsed
1202 > timeout_config
1203 .nat_traversal
1204 .connection_establishment_timeout
1205 {
1206 Some((*peer_id, SessionUpdate::Timeout))
1207 } else {
1208 None
1209 }
1210 }
1211 ConnectionState::Connected => {
1212 if let Some(ref conn) = session.session_state.connection
1214 {
1215 if conn.close_reason().is_some() {
1216 Some((*peer_id, SessionUpdate::Disconnected))
1217 } else {
1218 Some((*peer_id, SessionUpdate::UpdateMetrics))
1220 }
1221 } else {
1222 Some((*peer_id, SessionUpdate::InvalidState))
1223 }
1224 }
1225 ConnectionState::Idle => {
1226 if elapsed
1228 > timeout_config
1229 .discovery
1230 .server_reflexive_cache_ttl
1231 {
1232 Some((*peer_id, SessionUpdate::Retry))
1233 } else {
1234 None
1235 }
1236 }
1237 ConnectionState::Migrating => {
1238 if elapsed > timeout_config.nat_traversal.probe_timeout
1240 {
1241 Some((*peer_id, SessionUpdate::MigrationTimeout))
1242 } else {
1243 None
1244 }
1245 }
1246 ConnectionState::Closed => {
1247 if elapsed
1249 > timeout_config.discovery.interface_cache_ttl
1250 {
1251 Some((*peer_id, SessionUpdate::Remove))
1252 } else {
1253 None
1254 }
1255 }
1256 }
1257 })
1258 .collect::<Vec<_>>()
1259 }
1260 _ => {
1261 vec![]
1262 }
1263 }
1264 };
1265
1266 if !sessions_to_update.is_empty() {
1268 if let Ok(mut sessions_guard) = sessions.write() {
1269 for (peer_id, update) in sessions_to_update {
1270 match update {
1271 SessionUpdate::Timeout => {
1272 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1273 session.session_state.state = ConnectionState::Closed;
1274 session.session_state.last_transition =
1275 std::time::Instant::now();
1276 tracing::warn!("Connection to {:?} timed out", peer_id);
1277 }
1278 }
1279 SessionUpdate::Disconnected => {
1280 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1281 session.session_state.state = ConnectionState::Closed;
1282 session.session_state.last_transition =
1283 std::time::Instant::now();
1284 session.session_state.connection = None;
1285 tracing::info!("Connection to {:?} closed", peer_id);
1286 }
1287 }
1288 SessionUpdate::UpdateMetrics => {
1289 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1290 if let Some(ref conn) = session.session_state.connection {
1291 let stats = conn.stats();
1293 session.session_state.metrics.rtt =
1294 Some(stats.path.rtt);
1295 session.session_state.metrics.loss_rate =
1296 stats.path.lost_packets as f64
1297 / stats.path.sent_packets.max(1) as f64;
1298 }
1299 }
1300 }
1301 SessionUpdate::InvalidState => {
1302 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1303 session.session_state.state = ConnectionState::Closed;
1304 session.session_state.last_transition =
1305 std::time::Instant::now();
1306 tracing::error!("Session {:?} in invalid state", peer_id);
1307 }
1308 }
1309 SessionUpdate::Retry => {
1310 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1311 session.session_state.state = ConnectionState::Connecting;
1312 session.session_state.last_transition =
1313 std::time::Instant::now();
1314 session.attempt += 1;
1315 tracing::info!(
1316 "Retrying connection to {:?} (attempt {})",
1317 peer_id,
1318 session.attempt
1319 );
1320 }
1321 }
1322 SessionUpdate::MigrationTimeout => {
1323 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1324 session.session_state.state = ConnectionState::Closed;
1325 session.session_state.last_transition =
1326 std::time::Instant::now();
1327 tracing::warn!("Migration timeout for {:?}", peer_id);
1328 }
1329 }
1330 SessionUpdate::Remove => {
1331 sessions_guard.remove(&peer_id);
1332 tracing::debug!("Removed old session for {:?}", peer_id);
1333 }
1334 }
1335 }
1336 }
1337 }
1338 }
1339 })
1340 }
1341
1342 pub fn get_statistics(&self) -> Result<NatTraversalStatistics, NatTraversalError> {
1346 let sessions = self
1347 .active_sessions
1348 .read()
1349 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1350 let bootstrap_nodes = self
1351 .bootstrap_nodes
1352 .read()
1353 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1354
1355 let avg_coordination_time = {
1357 let rtts: Vec<Duration> = bootstrap_nodes.iter().filter_map(|b| b.rtt).collect();
1358
1359 if rtts.is_empty() {
1360 Duration::from_millis(500) } else {
1362 let total_millis: u64 = rtts.iter().map(|d| d.as_millis() as u64).sum();
1363 Duration::from_millis(total_millis / rtts.len() as u64 * 2) }
1365 };
1366
1367 Ok(NatTraversalStatistics {
1368 active_sessions: sessions.len(),
1369 total_bootstrap_nodes: bootstrap_nodes.len(),
1370 successful_coordinations: bootstrap_nodes.iter().map(|b| b.coordination_count).sum(),
1371 average_coordination_time: avg_coordination_time,
1372 total_attempts: 0,
1373 successful_connections: 0,
1374 direct_connections: 0,
1375 relayed_connections: 0,
1376 })
1377 }
1378
1379 pub fn add_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1381 let mut bootstrap_nodes = self
1382 .bootstrap_nodes
1383 .write()
1384 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1385
1386 if !bootstrap_nodes.iter().any(|b| b.address == address) {
1388 bootstrap_nodes.push(BootstrapNode {
1389 address,
1390 last_seen: std::time::Instant::now(),
1391 can_coordinate: true,
1392 rtt: None,
1393 coordination_count: 0,
1394 });
1395 info!("Added bootstrap node: {}", address);
1396 }
1397 Ok(())
1398 }
1399
1400 pub fn remove_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1402 let mut bootstrap_nodes = self
1403 .bootstrap_nodes
1404 .write()
1405 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1406 bootstrap_nodes.retain(|b| b.address != address);
1407 info!("Removed bootstrap node: {}", address);
1408 Ok(())
1409 }
1410
1411 async fn create_inner_endpoint(
1417 config: &NatTraversalConfig,
1418 ) -> Result<
1419 (
1420 InnerEndpoint,
1421 mpsc::UnboundedSender<NatTraversalEvent>,
1422 mpsc::UnboundedReceiver<NatTraversalEvent>,
1423 SocketAddr,
1424 ),
1425 NatTraversalError,
1426 > {
1427 use std::sync::Arc;
1428
1429 let server_config = {
1431 info!("Creating server config using Raw Public Keys (RFC 7250) for symmetric P2P node");
1432
1433 let (server_pub_key, server_sec_key) = match config.identity_key.clone() {
1437 Some(key) => {
1438 debug!("Using provided identity key for TLS authentication");
1439 key
1440 }
1441 None => {
1442 debug!(
1443 "No identity key provided - generating new keypair (identity mismatch warning)"
1444 );
1445 crate::crypto::raw_public_keys::key_utils::generate_ml_dsa_keypair().map_err(
1446 |e| {
1447 NatTraversalError::ConfigError(format!(
1448 "ML-DSA-65 keygen failed: {e:?}"
1449 ))
1450 },
1451 )?
1452 }
1453 };
1454
1455 let mut rpk_builder = RawPublicKeyConfigBuilder::new()
1457 .with_server_key(server_pub_key, server_sec_key)
1458 .allow_any_key(); if let Some(ref pqc) = config.pqc {
1461 rpk_builder = rpk_builder.with_pqc(pqc.clone());
1462 }
1463
1464 let rpk_config = rpk_builder.build_rfc7250_server_config().map_err(|e| {
1465 NatTraversalError::ConfigError(format!("RPK server config failed: {e}"))
1466 })?;
1467
1468 let server_crypto = QuicServerConfig::try_from(rpk_config.inner().as_ref().clone())
1469 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1470
1471 let mut server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
1472
1473 let mut transport_config = TransportConfig::default();
1475 transport_config.enable_address_discovery(true);
1476 transport_config
1477 .keep_alive_interval(Some(config.timeouts.nat_traversal.retry_interval));
1478 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1479
1480 let nat_config = crate::transport_parameters::NatTraversalConfig::ServerSupport {
1483 concurrency_limit: VarInt::from_u32(config.max_concurrent_attempts as u32),
1484 };
1485 transport_config.nat_traversal_config(Some(nat_config));
1486
1487 server_config.transport_config(Arc::new(transport_config));
1488
1489 Some(server_config)
1490 };
1491
1492 let client_config = {
1494 info!("Creating client config using Raw Public Keys (RFC 7250)");
1495
1496 let (client_pub_key, client_sec_key) = match config.identity_key.clone() {
1499 Some(key) => {
1500 debug!("Using provided identity key for client TLS authentication");
1501 key
1502 }
1503 None => {
1504 debug!("No identity key provided for client - generating new keypair");
1505 crate::crypto::raw_public_keys::key_utils::generate_ml_dsa_keypair().map_err(
1506 |e| {
1507 NatTraversalError::ConfigError(format!(
1508 "ML-DSA-65 keygen failed: {e:?}"
1509 ))
1510 },
1511 )?
1512 }
1513 };
1514
1515 let mut rpk_builder = RawPublicKeyConfigBuilder::new()
1518 .with_client_key(client_pub_key, client_sec_key) .allow_any_key(); if let Some(ref pqc) = config.pqc {
1522 rpk_builder = rpk_builder.with_pqc(pqc.clone());
1523 }
1524
1525 let rpk_config = rpk_builder.build_rfc7250_client_config().map_err(|e| {
1526 NatTraversalError::ConfigError(format!("RPK client config failed: {e}"))
1527 })?;
1528
1529 let client_crypto = QuicClientConfig::try_from(rpk_config.inner().as_ref().clone())
1530 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1531
1532 let mut client_config = ClientConfig::new(Arc::new(client_crypto));
1533
1534 let mut transport_config = TransportConfig::default();
1536 transport_config.enable_address_discovery(true);
1537 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
1538 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1539
1540 let nat_config = crate::transport_parameters::NatTraversalConfig::ServerSupport {
1543 concurrency_limit: VarInt::from_u32(config.max_concurrent_attempts as u32),
1544 };
1545 transport_config.nat_traversal_config(Some(nat_config));
1546
1547 client_config.transport_config(Arc::new(transport_config));
1548
1549 client_config
1550 };
1551
1552 let bind_addr = config
1554 .bind_addr
1555 .unwrap_or_else(create_random_port_bind_addr);
1556 let socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1557 NatTraversalError::NetworkError(format!("Failed to bind UDP socket: {e}"))
1558 })?;
1559
1560 info!("Binding endpoint to {}", bind_addr);
1561
1562 let std_socket = socket.into_std().map_err(|e| {
1564 NatTraversalError::NetworkError(format!("Failed to convert socket: {e}"))
1565 })?;
1566
1567 let runtime = default_runtime().ok_or_else(|| {
1569 NatTraversalError::ConfigError("No compatible async runtime found".to_string())
1570 })?;
1571
1572 let mut endpoint = InnerEndpoint::new(
1573 EndpointConfig::default(),
1574 server_config,
1575 std_socket,
1576 runtime,
1577 )
1578 .map_err(|e| {
1579 NatTraversalError::ConfigError(format!("Failed to create QUIC endpoint: {e}"))
1580 })?;
1581
1582 endpoint.set_default_client_config(client_config);
1584
1585 let local_addr = endpoint.local_addr().map_err(|e| {
1587 NatTraversalError::NetworkError(format!("Failed to get local address: {e}"))
1588 })?;
1589
1590 info!("Endpoint bound to actual address: {}", local_addr);
1591
1592 let (event_tx, event_rx) = mpsc::unbounded_channel();
1594
1595 Ok((endpoint, event_tx, event_rx, local_addr))
1596 }
1597
1598 #[allow(clippy::panic)]
1600 pub async fn start_listening(&self, bind_addr: SocketAddr) -> Result<(), NatTraversalError> {
1601 let endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
1602 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
1603 })?;
1604
1605 let _socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1607 NatTraversalError::NetworkError(format!("Failed to bind to {bind_addr}: {e}"))
1608 })?;
1609
1610 info!("Started listening on {}", bind_addr);
1611
1612 let endpoint_clone = endpoint.clone();
1614 let shutdown_clone = self.shutdown.clone();
1615 let event_tx = self
1616 .event_tx
1617 .as_ref()
1618 .unwrap_or_else(|| panic!("event transmitter should be initialized"))
1619 .clone();
1620 let connections_clone = self.connections.clone();
1621 let emitted_events_clone = self.emitted_established_events.clone();
1622
1623 tokio::spawn(async move {
1624 Self::accept_connections(
1625 endpoint_clone,
1626 shutdown_clone,
1627 event_tx,
1628 connections_clone,
1629 emitted_events_clone,
1630 )
1631 .await;
1632 });
1633
1634 Ok(())
1635 }
1636
1637 async fn accept_connections(
1639 endpoint: InnerEndpoint,
1640 shutdown: Arc<AtomicBool>,
1641 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1642 connections: Arc<std::sync::RwLock<HashMap<PeerId, InnerConnection>>>,
1643 emitted_events: Arc<std::sync::RwLock<std::collections::HashSet<PeerId>>>,
1644 ) {
1645 while !shutdown.load(Ordering::Relaxed) {
1646 match endpoint.accept().await {
1647 Some(connecting) => {
1648 let event_tx = event_tx.clone();
1649 let connections = connections.clone();
1650 let emitted_events = emitted_events.clone();
1651 tokio::spawn(async move {
1652 match connecting.await {
1653 Ok(connection) => {
1654 info!("Accepted connection from {}", connection.remote_address());
1655
1656 let peer_id = Self::derive_peer_id_from_connection(&connection)
1658 .unwrap_or_else(|| {
1659 Self::generate_peer_id_from_address(
1660 connection.remote_address(),
1661 )
1662 });
1663
1664 if let Ok(mut conns) = connections.write() {
1666 conns.insert(peer_id, connection.clone());
1667 }
1668
1669 let should_emit = if let Ok(mut emitted) = emitted_events.write() {
1671 emitted.insert(peer_id) } else {
1673 true };
1675
1676 if should_emit {
1677 let _ =
1678 event_tx.send(NatTraversalEvent::ConnectionEstablished {
1679 peer_id,
1680 remote_address: connection.remote_address(),
1681 });
1682 }
1683
1684 Self::handle_connection(peer_id, connection, event_tx).await;
1686 }
1687 Err(e) => {
1688 debug!("Connection failed: {}", e);
1689 }
1690 }
1691 });
1692 }
1693 None => {
1694 break;
1696 }
1697 }
1698 }
1699 }
1700
1701 async fn poll_discovery(
1703 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
1704 shutdown: Arc<AtomicBool>,
1705 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1706 connections: Arc<std::sync::RwLock<HashMap<PeerId, InnerConnection>>>,
1707 ) {
1708 use tokio::time::{Duration, interval};
1709
1710 let mut poll_interval = interval(Duration::from_millis(100));
1711 let mut emitted_discovery = std::collections::HashSet::new();
1712
1713 while !shutdown.load(Ordering::Relaxed) {
1714 poll_interval.tick().await;
1715
1716 if let Ok(conns) = connections.read() {
1718 if !conns.is_empty() {
1719 debug!("Polling {} connections for observed addresses", conns.len());
1720 }
1721 for (peer_id, conn) in conns.iter() {
1722 if let Some(observed_addr) = conn.observed_address() {
1723 debug!(
1724 "Found observed address {} for peer {:?}",
1725 observed_addr, peer_id
1726 );
1727
1728 if emitted_discovery.insert((*peer_id, observed_addr)) {
1730 debug!("Emitting ExternalAddressDiscovered for peer {:?}", peer_id);
1731 let _ = event_tx.send(NatTraversalEvent::ExternalAddressDiscovered {
1732 reported_by: conn.remote_address(),
1733 address: observed_addr,
1734 });
1735 }
1736
1737 if let Ok(mut discovery) = discovery_manager.lock() {
1739 let _ =
1740 discovery.accept_quic_discovered_address(*peer_id, observed_addr);
1741 }
1742 }
1743 }
1744 }
1745
1746 let events = match discovery_manager.lock() {
1748 Ok(mut discovery) => discovery.poll(std::time::Instant::now()),
1749 Err(e) => {
1750 error!("Failed to lock discovery manager: {}", e);
1751 continue;
1752 }
1753 };
1754
1755 for event in events {
1757 match event {
1758 DiscoveryEvent::DiscoveryStarted {
1759 peer_id,
1760 bootstrap_count,
1761 } => {
1762 debug!(
1763 "Discovery started for peer {:?} with {} bootstrap nodes",
1764 peer_id, bootstrap_count
1765 );
1766 }
1767 DiscoveryEvent::LocalScanningStarted => {
1768 debug!("Local interface scanning started");
1769 }
1770 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1771 debug!("Discovered local candidate: {}", candidate.address);
1772 }
1775 DiscoveryEvent::LocalScanningCompleted {
1776 candidate_count,
1777 duration,
1778 } => {
1779 debug!(
1780 "Local interface scanning completed: {} candidates in {:?}",
1781 candidate_count, duration
1782 );
1783 }
1784 DiscoveryEvent::ServerReflexiveDiscoveryStarted { bootstrap_count } => {
1785 debug!(
1786 "Server reflexive discovery started with {} bootstrap nodes",
1787 bootstrap_count
1788 );
1789 }
1790 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1791 candidate,
1792 bootstrap_node,
1793 } => {
1794 debug!(
1795 "Discovered server-reflexive candidate {} via bootstrap {}",
1796 candidate.address, bootstrap_node
1797 );
1798
1799 let _ = event_tx.send(NatTraversalEvent::ExternalAddressDiscovered {
1801 reported_by: bootstrap_node,
1802 address: candidate.address,
1803 });
1804 }
1805 DiscoveryEvent::BootstrapQueryFailed {
1806 bootstrap_node,
1807 error,
1808 } => {
1809 debug!("Bootstrap query failed for {}: {}", bootstrap_node, error);
1810 }
1811 DiscoveryEvent::PortAllocationDetected {
1813 port,
1814 source_address,
1815 bootstrap_node,
1816 timestamp,
1817 } => {
1818 debug!(
1819 "Port allocation detected: port {} from {} via bootstrap {:?} at {:?}",
1820 port, source_address, bootstrap_node, timestamp
1821 );
1822 }
1823 DiscoveryEvent::DiscoveryCompleted {
1824 candidate_count,
1825 total_duration,
1826 success_rate,
1827 } => {
1828 info!(
1829 "Discovery completed with {} candidates in {:?} (success rate: {:.2}%)",
1830 candidate_count,
1831 total_duration,
1832 success_rate * 100.0
1833 );
1834 }
1837 DiscoveryEvent::DiscoveryFailed {
1838 error,
1839 partial_results,
1840 } => {
1841 warn!(
1842 "Discovery failed: {} (found {} partial candidates)",
1843 error,
1844 partial_results.len()
1845 );
1846
1847 }
1852 DiscoveryEvent::PathValidationRequested {
1853 candidate_id,
1854 candidate_address,
1855 challenge_token,
1856 } => {
1857 debug!(
1858 "PATH_CHALLENGE requested for candidate {} at {} with token {:08x}",
1859 candidate_id.0, candidate_address, challenge_token
1860 );
1861 }
1864 DiscoveryEvent::PathValidationResponse {
1865 candidate_id,
1866 candidate_address,
1867 challenge_token: _,
1868 rtt,
1869 } => {
1870 debug!(
1871 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1872 candidate_id.0, candidate_address, rtt
1873 );
1874 }
1876 }
1877 }
1878 }
1879
1880 info!("Discovery polling task shutting down");
1881 }
1882
1883 async fn handle_connection(
1885 peer_id: PeerId,
1886 connection: InnerConnection,
1887 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1888 ) {
1889 let remote_address = connection.remote_address();
1890 let closed = connection.closed();
1891 tokio::pin!(closed);
1892
1893 debug!(
1894 "Handling connection from peer {:?} at {}",
1895 peer_id, remote_address
1896 );
1897
1898 closed.await;
1902
1903 let reason = connection
1904 .close_reason()
1905 .map(|reason| format!("Connection closed: {reason}"))
1906 .unwrap_or_else(|| "Connection closed".to_string());
1907 let _ = event_tx.send(NatTraversalEvent::ConnectionLost { peer_id, reason });
1908 }
1909
1910 async fn handle_bi_stream(
1912 _send: crate::high_level::SendStream,
1913 _recv: crate::high_level::RecvStream,
1914 ) {
1915 }
1944
1945 async fn handle_uni_stream(mut recv: crate::high_level::RecvStream) {
1947 let mut buffer = vec![0u8; 1024];
1948
1949 loop {
1950 match recv.read(&mut buffer).await {
1951 Ok(Some(size)) => {
1952 debug!("Received {} bytes on unidirectional stream", size);
1953 }
1955 Ok(None) => {
1956 debug!("Unidirectional stream closed by peer");
1957 break;
1958 }
1959 Err(e) => {
1960 debug!("Error reading from unidirectional stream: {}", e);
1961 break;
1962 }
1963 }
1964 }
1965 }
1966
1967 pub async fn connect_to_peer(
1969 &self,
1970 peer_id: PeerId,
1971 server_name: &str,
1972 remote_addr: SocketAddr,
1973 ) -> Result<InnerConnection, NatTraversalError> {
1974 let endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
1975 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
1976 })?;
1977
1978 info!("Connecting to peer {:?} at {}", peer_id, remote_addr);
1979
1980 let connecting = endpoint.connect(remote_addr, server_name).map_err(|e| {
1982 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
1983 })?;
1984
1985 let connection = timeout(
1986 self.timeout_config
1987 .nat_traversal
1988 .connection_establishment_timeout,
1989 connecting,
1990 )
1991 .await
1992 .map_err(|_| NatTraversalError::Timeout)?
1993 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
1994
1995 info!(
1996 "Successfully connected to peer {:?} at {}",
1997 peer_id, remote_addr
1998 );
1999
2000 if let Some(ref event_tx) = self.event_tx {
2002 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
2003 peer_id,
2004 remote_address: remote_addr,
2005 });
2006 }
2007
2008 Ok(connection)
2009 }
2010
2011 pub async fn connect_with_fallback(
2016 &self,
2017 peer_id: PeerId,
2018 server_name: &str,
2019 remote_addr: SocketAddr,
2020 ) -> Result<InnerConnection, NatTraversalError> {
2021 match self
2023 .connect_to_peer(peer_id, server_name, remote_addr)
2024 .await
2025 {
2026 Ok(conn) => return Ok(conn),
2027 Err(e) if self.relay_manager.is_some() => {
2028 info!(
2029 "Direct connection to {:?} failed ({:?}), attempting relay fallback",
2030 peer_id, e
2031 );
2032 }
2033 Err(e) => return Err(e),
2034 }
2035
2036 let relay_manager = self.relay_manager.as_ref().ok_or_else(|| {
2038 NatTraversalError::ConnectionFailed("No relay nodes configured".to_string())
2039 })?;
2040
2041 let available_relays = relay_manager.available_relays().await;
2043 if available_relays.is_empty() {
2044 return Err(NatTraversalError::ConnectionFailed(
2045 "No available relay nodes".to_string(),
2046 ));
2047 }
2048
2049 for relay_addr in available_relays {
2051 info!("Attempting connection via relay: {}", relay_addr);
2052
2053 let request = relay_manager.create_connect_request();
2055
2056 debug!(
2063 "Relay fallback for {} via {} - request: {:?}",
2064 remote_addr, relay_addr, request
2065 );
2066 }
2067
2068 Err(NatTraversalError::ConnectionFailed(
2069 "All relay attempts failed".to_string(),
2070 ))
2071 }
2072
2073 pub fn relay_manager(&self) -> Option<Arc<RelayManager>> {
2077 self.relay_manager.clone()
2078 }
2079
2080 pub async fn has_relay_fallback(&self) -> bool {
2082 match &self.relay_manager {
2083 Some(manager) => manager.has_available_relay().await,
2084 None => false,
2085 }
2086 }
2087
2088 pub async fn accept_connection(&self) -> Result<(PeerId, InnerConnection), NatTraversalError> {
2090 debug!("Waiting for incoming connection via event channel...");
2091
2092 let timeout_duration = self
2093 .timeout_config
2094 .nat_traversal
2095 .connection_establishment_timeout;
2096 let start = std::time::Instant::now();
2097
2098 loop {
2099 if self.shutdown.load(Ordering::Relaxed) {
2101 return Err(NatTraversalError::NetworkError(
2102 "Endpoint shutting down".to_string(),
2103 ));
2104 }
2105
2106 if start.elapsed() > timeout_duration {
2108 warn!("accept_connection() timed out after {:?}", timeout_duration);
2109 return Err(NatTraversalError::Timeout);
2110 }
2111
2112 {
2114 let mut event_rx = self.event_rx.lock().map_err(|_| {
2115 NatTraversalError::ProtocolError("Event channel lock poisoned".to_string())
2116 })?;
2117
2118 match event_rx.try_recv() {
2119 Ok(NatTraversalEvent::ConnectionEstablished {
2120 peer_id,
2121 remote_address,
2122 }) => {
2123 info!(
2124 "Received ConnectionEstablished event for peer {:?} at {}",
2125 peer_id, remote_address
2126 );
2127
2128 let connection = {
2131 let connections = self.connections.read().map_err(|_| {
2132 NatTraversalError::ProtocolError(
2133 "Connections lock poisoned".to_string(),
2134 )
2135 })?;
2136 connections.get(&peer_id).cloned().ok_or_else(|| {
2137 NatTraversalError::ConnectionFailed(format!(
2138 "Connection for peer {:?} not found in storage",
2139 peer_id
2140 ))
2141 })?
2142 };
2143
2144 info!(
2145 "Retrieved accepted connection from peer {:?} at {}",
2146 peer_id, remote_address
2147 );
2148 return Ok((peer_id, connection));
2149 }
2150 Ok(event) => {
2151 debug!(
2153 "Ignoring non-connection event while waiting for accept: {:?}",
2154 event
2155 );
2156 }
2157 Err(mpsc::error::TryRecvError::Empty) => {
2158 }
2160 Err(mpsc::error::TryRecvError::Disconnected) => {
2161 return Err(NatTraversalError::NetworkError(
2162 "Event channel closed".to_string(),
2163 ));
2164 }
2165 }
2166 } tokio::time::sleep(Duration::from_millis(10)).await;
2170 }
2171 }
2172
2173 pub fn local_peer_id(&self) -> PeerId {
2175 self.local_peer_id
2176 }
2177
2178 pub fn get_connection(
2180 &self,
2181 peer_id: &PeerId,
2182 ) -> Result<Option<InnerConnection>, NatTraversalError> {
2183 let connections = self.connections.read().map_err(|_| {
2184 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2185 })?;
2186 Ok(connections.get(peer_id).cloned())
2187 }
2188
2189 pub fn add_connection(
2191 &self,
2192 peer_id: PeerId,
2193 connection: InnerConnection,
2194 ) -> Result<(), NatTraversalError> {
2195 let mut connections = self.connections.write().map_err(|_| {
2196 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2197 })?;
2198 connections.insert(peer_id, connection);
2199 Ok(())
2200 }
2201
2202 pub fn spawn_connection_handler(
2204 &self,
2205 peer_id: PeerId,
2206 connection: InnerConnection,
2207 ) -> Result<(), NatTraversalError> {
2208 let event_tx = self.event_tx.as_ref().cloned().ok_or_else(|| {
2209 NatTraversalError::ConfigError("NAT traversal event channel not configured".to_string())
2210 })?;
2211
2212 let remote_address = connection.remote_address();
2213
2214 let should_emit = if let Ok(mut emitted) = self.emitted_established_events.write() {
2216 emitted.insert(peer_id) } else {
2218 true };
2220
2221 if should_emit {
2222 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
2223 peer_id,
2224 remote_address,
2225 });
2226 }
2227
2228 tokio::spawn(async move {
2230 Self::handle_connection(peer_id, connection, event_tx).await;
2231 });
2232
2233 Ok(())
2234 }
2235
2236 pub fn remove_connection(
2238 &self,
2239 peer_id: &PeerId,
2240 ) -> Result<Option<InnerConnection>, NatTraversalError> {
2241 if let Ok(mut emitted) = self.emitted_established_events.write() {
2243 emitted.remove(peer_id);
2244 }
2245
2246 let mut connections = self.connections.write().map_err(|_| {
2247 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2248 })?;
2249 Ok(connections.remove(peer_id))
2250 }
2251
2252 pub fn list_connections(&self) -> Result<Vec<(PeerId, SocketAddr)>, NatTraversalError> {
2254 let connections = self.connections.read().map_err(|_| {
2255 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2256 })?;
2257 let mut result = Vec::new();
2258 for (peer_id, connection) in connections.iter() {
2259 result.push((*peer_id, connection.remote_address()));
2260 }
2261 Ok(result)
2262 }
2263
2264 pub fn get_observed_external_address(&self) -> Result<Option<SocketAddr>, NatTraversalError> {
2276 let connections = self.connections.read().map_err(|_| {
2277 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2278 })?;
2279
2280 let known_peer_addrs: std::collections::HashSet<_> =
2283 self.config.known_peers.iter().copied().collect();
2284
2285 for (_peer_id, connection) in connections.iter() {
2287 if known_peer_addrs.contains(&connection.remote_address()) {
2288 if let Some(addr) = connection.observed_address() {
2289 debug!(
2290 "Found observed external address {} from known peer connection",
2291 addr
2292 );
2293 return Ok(Some(addr));
2294 }
2295 }
2296 }
2297
2298 for (_peer_id, connection) in connections.iter() {
2300 if let Some(addr) = connection.observed_address() {
2301 debug!(
2302 "Found observed external address {} from peer connection",
2303 addr
2304 );
2305 return Ok(Some(addr));
2306 }
2307 }
2308
2309 debug!("No observed external address available from any connection");
2310 Ok(None)
2311 }
2312
2313 pub async fn handle_connection_data(
2315 &self,
2316 peer_id: PeerId,
2317 connection: &InnerConnection,
2318 ) -> Result<(), NatTraversalError> {
2319 info!("Handling connection data from peer {:?}", peer_id);
2320
2321 let connection_clone = connection.clone();
2323 let peer_id_clone = peer_id;
2324 tokio::spawn(async move {
2325 loop {
2326 match connection_clone.accept_bi().await {
2327 Ok((send, recv)) => {
2328 debug!(
2329 "Accepted bidirectional stream from peer {:?}",
2330 peer_id_clone
2331 );
2332 tokio::spawn(Self::handle_bi_stream(send, recv));
2333 }
2334 Err(ConnectionError::ApplicationClosed(_)) => {
2335 debug!("Connection closed by peer {:?}", peer_id_clone);
2336 break;
2337 }
2338 Err(e) => {
2339 debug!(
2340 "Error accepting bidirectional stream from peer {:?}: {}",
2341 peer_id_clone, e
2342 );
2343 break;
2344 }
2345 }
2346 }
2347 });
2348
2349 let connection_clone = connection.clone();
2351 let peer_id_clone = peer_id;
2352 tokio::spawn(async move {
2353 loop {
2354 match connection_clone.accept_uni().await {
2355 Ok(recv) => {
2356 debug!(
2357 "Accepted unidirectional stream from peer {:?}",
2358 peer_id_clone
2359 );
2360 tokio::spawn(Self::handle_uni_stream(recv));
2361 }
2362 Err(ConnectionError::ApplicationClosed(_)) => {
2363 debug!("Connection closed by peer {:?}", peer_id_clone);
2364 break;
2365 }
2366 Err(e) => {
2367 debug!(
2368 "Error accepting unidirectional stream from peer {:?}: {}",
2369 peer_id_clone, e
2370 );
2371 break;
2372 }
2373 }
2374 }
2375 });
2376
2377 Ok(())
2378 }
2379
2380 fn generate_local_peer_id() -> PeerId {
2382 use std::collections::hash_map::DefaultHasher;
2383 use std::hash::{Hash, Hasher};
2384 use std::time::SystemTime;
2385
2386 let mut hasher = DefaultHasher::new();
2387 SystemTime::now().hash(&mut hasher);
2388 std::process::id().hash(&mut hasher);
2389
2390 let hash = hasher.finish();
2391 let mut peer_id = [0u8; 32];
2392 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2393
2394 for i in 8..32 {
2396 peer_id[i] = rand::random();
2397 }
2398
2399 PeerId(peer_id)
2400 }
2401
2402 fn generate_peer_id_from_address(addr: SocketAddr) -> PeerId {
2408 use std::collections::hash_map::DefaultHasher;
2409 use std::hash::{Hash, Hasher};
2410
2411 let mut hasher = DefaultHasher::new();
2412 addr.hash(&mut hasher);
2413
2414 let hash = hasher.finish();
2415 let mut peer_id = [0u8; 32];
2416 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2417
2418 for i in 8..32 {
2421 peer_id[i] = rand::random();
2422 }
2423
2424 warn!(
2425 "Generated temporary peer ID from address {}. This ID is not persistent!",
2426 addr
2427 );
2428 PeerId(peer_id)
2429 }
2430
2431 fn derive_peer_id_from_connection(connection: &InnerConnection) -> Option<PeerId> {
2437 if let Some(identity) = connection.peer_identity() {
2438 if let Some(certs) =
2440 identity.downcast_ref::<Vec<rustls::pki_types::CertificateDer<'static>>>()
2441 {
2442 if let Some(cert) = certs.first() {
2443 let spki = cert.as_ref();
2445 if let Some(public_key) = extract_ml_dsa_from_spki(spki) {
2446 let peer_id =
2447 crate::crypto::raw_public_keys::pqc::derive_peer_id_from_public_key(
2448 &public_key,
2449 );
2450 debug!("Derived peer ID from ML-DSA-65 public key in SPKI");
2451 return Some(peer_id);
2452 } else {
2453 debug!(
2454 "Certificate is not ML-DSA-65 SPKI format (len={})",
2455 spki.len()
2456 );
2457 }
2458 }
2459 }
2460 }
2461
2462 None
2463 }
2464
2465 pub async fn extract_peer_id_from_connection(
2472 &self,
2473 connection: &InnerConnection,
2474 ) -> Option<PeerId> {
2475 Self::derive_peer_id_from_connection(connection)
2477 }
2478
2479 pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
2481 self.shutdown.store(true, Ordering::Relaxed);
2483
2484 {
2486 let mut connections = self.connections.write().map_err(|_| {
2487 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2488 })?;
2489 for (peer_id, connection) in connections.drain() {
2490 info!("Closing connection to peer {:?}", peer_id);
2491 connection.close(crate::VarInt::from_u32(0), b"Shutdown");
2492 }
2493 }
2494
2495 if let Some(ref endpoint) = self.inner_endpoint {
2497 endpoint.wait_idle().await;
2498 }
2499
2500 info!("NAT traversal endpoint shutdown completed");
2501 Ok(())
2502 }
2503
2504 pub async fn discover_candidates(
2506 &self,
2507 peer_id: PeerId,
2508 ) -> Result<Vec<CandidateAddress>, NatTraversalError> {
2509 debug!("Discovering address candidates for peer {:?}", peer_id);
2510
2511 let mut candidates = Vec::new();
2512
2513 let bootstrap_nodes = {
2515 let nodes = self
2516 .bootstrap_nodes
2517 .read()
2518 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2519 nodes.clone()
2520 };
2521
2522 {
2524 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2525 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2526 })?;
2527
2528 discovery
2529 .start_discovery(peer_id, bootstrap_nodes)
2530 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
2531 }
2532
2533 let timeout_duration = self.config.coordination_timeout;
2535 let start_time = std::time::Instant::now();
2536
2537 while start_time.elapsed() < timeout_duration {
2538 let discovery_events = {
2539 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2540 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2541 })?;
2542 discovery.poll(std::time::Instant::now())
2543 };
2544
2545 for event in discovery_events {
2546 match event {
2547 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
2548 candidates.push(candidate.clone());
2549
2550 self.send_candidate_advertisement(peer_id, &candidate)
2552 .await
2553 .unwrap_or_else(|e| {
2554 debug!("Failed to send candidate advertisement: {}", e)
2555 });
2556 }
2557 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
2558 candidates.push(candidate.clone());
2559
2560 self.send_candidate_advertisement(peer_id, &candidate)
2562 .await
2563 .unwrap_or_else(|e| {
2564 debug!("Failed to send candidate advertisement: {}", e)
2565 });
2566 }
2567 DiscoveryEvent::DiscoveryCompleted { .. } => {
2569 return Ok(candidates);
2571 }
2572 DiscoveryEvent::DiscoveryFailed {
2573 error,
2574 partial_results,
2575 } => {
2576 candidates.extend(partial_results);
2578 if candidates.is_empty() {
2579 return Err(NatTraversalError::CandidateDiscoveryFailed(
2580 error.to_string(),
2581 ));
2582 }
2583 return Ok(candidates);
2584 }
2585 _ => {}
2586 }
2587 }
2588
2589 sleep(Duration::from_millis(10)).await;
2591 }
2592
2593 if candidates.is_empty() {
2594 Err(NatTraversalError::NoCandidatesFound)
2595 } else {
2596 Ok(candidates)
2597 }
2598 }
2599
2600 #[allow(dead_code)]
2602 fn create_punch_me_now_frame(&self, peer_id: PeerId) -> Result<Vec<u8>, NatTraversalError> {
2603 let mut frame = Vec::new();
2611
2612 frame.push(0x41);
2614
2615 frame.extend_from_slice(&peer_id.0);
2617
2618 let timestamp = std::time::SystemTime::now()
2620 .duration_since(std::time::UNIX_EPOCH)
2621 .unwrap_or_default()
2622 .as_millis() as u64;
2623 frame.extend_from_slice(×tamp.to_be_bytes());
2624
2625 let mut token = [0u8; 16];
2627 for byte in &mut token {
2628 *byte = rand::random();
2629 }
2630 frame.extend_from_slice(&token);
2631
2632 Ok(frame)
2633 }
2634
2635 #[allow(dead_code)]
2636 fn attempt_hole_punching(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
2637 debug!("Attempting hole punching for peer {:?}", peer_id);
2638
2639 let candidate_pairs = self.get_candidate_pairs_for_peer(peer_id)?;
2641
2642 if candidate_pairs.is_empty() {
2643 return Err(NatTraversalError::NoCandidatesFound);
2644 }
2645
2646 info!(
2647 "Generated {} candidate pairs for hole punching with peer {:?}",
2648 candidate_pairs.len(),
2649 peer_id
2650 );
2651
2652 self.attempt_quic_hole_punching(peer_id, candidate_pairs)
2655 }
2656
2657 #[allow(dead_code)]
2659 fn get_candidate_pairs_for_peer(
2660 &self,
2661 peer_id: PeerId,
2662 ) -> Result<Vec<CandidatePair>, NatTraversalError> {
2663 let discovery_candidates = {
2665 let discovery = self.discovery_manager.lock().map_err(|_| {
2666 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2667 })?;
2668
2669 discovery.get_candidates_for_peer(peer_id)
2670 };
2671
2672 if discovery_candidates.is_empty() {
2673 return Err(NatTraversalError::NoCandidatesFound);
2674 }
2675
2676 let mut candidate_pairs = Vec::new();
2678 let local_candidates = discovery_candidates
2679 .iter()
2680 .filter(|c| matches!(c.source, CandidateSource::Local))
2681 .collect::<Vec<_>>();
2682 let remote_candidates = discovery_candidates
2683 .iter()
2684 .filter(|c| !matches!(c.source, CandidateSource::Local))
2685 .collect::<Vec<_>>();
2686
2687 for local in &local_candidates {
2689 for remote in &remote_candidates {
2690 let pair_priority = self.calculate_candidate_pair_priority(local, remote);
2691 candidate_pairs.push(CandidatePair {
2692 local_candidate: (*local).clone(),
2693 remote_candidate: (*remote).clone(),
2694 priority: pair_priority,
2695 state: CandidatePairState::Waiting,
2696 });
2697 }
2698 }
2699
2700 candidate_pairs.sort_by(|a, b| b.priority.cmp(&a.priority));
2702
2703 candidate_pairs.truncate(8);
2705
2706 Ok(candidate_pairs)
2707 }
2708
2709 #[allow(dead_code)]
2711 fn calculate_candidate_pair_priority(
2712 &self,
2713 local: &CandidateAddress,
2714 remote: &CandidateAddress,
2715 ) -> u64 {
2716 let local_type_preference = match local.source {
2720 CandidateSource::Local => 126,
2721 CandidateSource::Observed { .. } => 100,
2722 CandidateSource::Predicted => 75,
2723 CandidateSource::Peer => 50,
2724 };
2725
2726 let remote_type_preference = match remote.source {
2727 CandidateSource::Local => 126,
2728 CandidateSource::Observed { .. } => 100,
2729 CandidateSource::Predicted => 75,
2730 CandidateSource::Peer => 50,
2731 };
2732
2733 let local_priority = (local_type_preference as u64) << 8 | local.priority as u64;
2735 let remote_priority = (remote_type_preference as u64) << 8 | remote.priority as u64;
2736
2737 let min_priority = local_priority.min(remote_priority);
2738 let max_priority = local_priority.max(remote_priority);
2739
2740 (min_priority << 32)
2741 | (max_priority << 1)
2742 | if local_priority > remote_priority {
2743 1
2744 } else {
2745 0
2746 }
2747 }
2748
2749 #[allow(dead_code)]
2751 fn attempt_quic_hole_punching(
2752 &self,
2753 peer_id: PeerId,
2754 candidate_pairs: Vec<CandidatePair>,
2755 ) -> Result<(), NatTraversalError> {
2756 let _endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
2757 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
2758 })?;
2759
2760 for pair in candidate_pairs {
2761 debug!(
2762 "Attempting hole punch with candidate pair: {} -> {}",
2763 pair.local_candidate.address, pair.remote_candidate.address
2764 );
2765
2766 let mut challenge_data = [0u8; 8];
2768 for byte in &mut challenge_data {
2769 *byte = rand::random();
2770 }
2771
2772 let local_socket =
2774 std::net::UdpSocket::bind(pair.local_candidate.address).map_err(|e| {
2775 NatTraversalError::NetworkError(format!(
2776 "Failed to bind to local candidate: {e}"
2777 ))
2778 })?;
2779
2780 let path_challenge_packet = self.create_path_challenge_packet(challenge_data)?;
2782
2783 match local_socket.send_to(&path_challenge_packet, pair.remote_candidate.address) {
2785 Ok(bytes_sent) => {
2786 debug!(
2787 "Sent {} bytes for hole punch from {} to {}",
2788 bytes_sent, pair.local_candidate.address, pair.remote_candidate.address
2789 );
2790
2791 local_socket
2793 .set_read_timeout(Some(Duration::from_millis(100)))
2794 .map_err(|e| {
2795 NatTraversalError::NetworkError(format!("Failed to set timeout: {e}"))
2796 })?;
2797
2798 let mut response_buffer = [0u8; 1024];
2800 match local_socket.recv_from(&mut response_buffer) {
2801 Ok((_bytes_received, response_addr)) => {
2802 if response_addr == pair.remote_candidate.address {
2803 info!(
2804 "Hole punch succeeded for peer {:?}: {} <-> {}",
2805 peer_id,
2806 pair.local_candidate.address,
2807 pair.remote_candidate.address
2808 );
2809
2810 self.store_successful_candidate_pair(peer_id, pair)?;
2812 return Ok(());
2813 } else {
2814 debug!(
2815 "Received response from unexpected address: {}",
2816 response_addr
2817 );
2818 }
2819 }
2820 Err(e)
2821 if e.kind() == std::io::ErrorKind::WouldBlock
2822 || e.kind() == std::io::ErrorKind::TimedOut =>
2823 {
2824 debug!("No response received for hole punch attempt");
2825 }
2826 Err(e) => {
2827 debug!("Error receiving hole punch response: {}", e);
2828 }
2829 }
2830 }
2831 Err(e) => {
2832 debug!("Failed to send hole punch packet: {}", e);
2833 }
2834 }
2835 }
2836
2837 Err(NatTraversalError::HolePunchingFailed)
2839 }
2840
2841 fn create_path_challenge_packet(
2843 &self,
2844 challenge_data: [u8; 8],
2845 ) -> Result<Vec<u8>, NatTraversalError> {
2846 let mut packet = Vec::new();
2849
2850 packet.push(0x40); packet.extend_from_slice(&[0, 0, 0, 1]); packet.push(0x1a); packet.extend_from_slice(&challenge_data); Ok(packet)
2859 }
2860
2861 fn store_successful_candidate_pair(
2863 &self,
2864 peer_id: PeerId,
2865 pair: CandidatePair,
2866 ) -> Result<(), NatTraversalError> {
2867 debug!(
2868 "Storing successful candidate pair for peer {:?}: {} <-> {}",
2869 peer_id, pair.local_candidate.address, pair.remote_candidate.address
2870 );
2871
2872 if let Some(ref callback) = self.event_callback {
2877 callback(NatTraversalEvent::PathValidated {
2878 peer_id,
2879 address: pair.remote_candidate.address,
2880 rtt: Duration::from_millis(50), });
2882
2883 callback(NatTraversalEvent::TraversalSucceeded {
2884 peer_id,
2885 final_address: pair.remote_candidate.address,
2886 total_time: Duration::from_secs(1), });
2888 }
2889
2890 Ok(())
2891 }
2892
2893 fn attempt_connection_to_candidate(
2895 &self,
2896 peer_id: PeerId,
2897 candidate: &CandidateAddress,
2898 ) -> Result<(), NatTraversalError> {
2899 {
2900 let endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
2901 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
2902 })?;
2903
2904 let server_name = format!("peer-{:x}", peer_id.0[0] as u32);
2906
2907 debug!(
2908 "Attempting QUIC connection to candidate {} for peer {:?}",
2909 candidate.address, peer_id
2910 );
2911
2912 match endpoint.connect(candidate.address, &server_name) {
2914 Ok(connecting) => {
2915 info!(
2916 "Connection attempt initiated to {} for peer {:?}",
2917 candidate.address, peer_id
2918 );
2919
2920 if let Some(event_tx) = &self.event_tx {
2922 let event_tx = event_tx.clone();
2923 let connections = self.connections.clone();
2924 let peer_id_clone = peer_id;
2925 let address = candidate.address;
2926
2927 tokio::spawn(async move {
2928 match connecting.await {
2929 Ok(connection) => {
2930 info!(
2931 "Successfully connected to {} for peer {:?}",
2932 address, peer_id_clone
2933 );
2934
2935 if let Ok(mut conns) = connections.write() {
2937 conns.insert(peer_id_clone, connection.clone());
2938 }
2939
2940 let _ =
2942 event_tx.send(NatTraversalEvent::ConnectionEstablished {
2943 peer_id: peer_id_clone,
2944 remote_address: address,
2945 });
2946
2947 Self::handle_connection(peer_id_clone, connection, event_tx)
2949 .await;
2950 }
2951 Err(e) => {
2952 warn!("Connection to {} failed: {}", address, e);
2953 }
2954 }
2955 });
2956 }
2957
2958 Ok(())
2959 }
2960 Err(e) => {
2961 warn!(
2962 "Failed to initiate connection to {}: {}",
2963 candidate.address, e
2964 );
2965 Err(NatTraversalError::ConnectionFailed(format!(
2966 "Failed to connect to {}: {}",
2967 candidate.address, e
2968 )))
2969 }
2970 }
2971 }
2972 }
2973
2974 pub fn poll(
2976 &self,
2977 now: std::time::Instant,
2978 ) -> Result<Vec<NatTraversalEvent>, NatTraversalError> {
2979 let mut events = Vec::new();
2980
2981 {
2983 let mut event_rx = self.event_rx.lock().map_err(|_| {
2984 NatTraversalError::ProtocolError("Event channel lock poisoned".to_string())
2985 })?;
2986
2987 loop {
2988 match event_rx.try_recv() {
2989 Ok(event) => {
2990 if let Some(ref callback) = self.event_callback {
2991 callback(event.clone());
2992 }
2993 events.push(event);
2994 }
2995 Err(TryRecvError::Empty) => break,
2996 Err(TryRecvError::Disconnected) => break,
2997 }
2998 }
2999 }
3000
3001 let mut closed_connections = Vec::new();
3003 {
3004 let connections = self.connections.read().map_err(|_| {
3005 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3006 })?;
3007
3008 for (peer_id, connection) in connections.iter() {
3009 if let Some(reason) = connection.close_reason() {
3010 closed_connections.push((*peer_id, reason.clone()));
3011 }
3012 }
3013 }
3014
3015 if !closed_connections.is_empty() {
3016 let mut connections = self.connections.write().map_err(|_| {
3017 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3018 })?;
3019
3020 for (peer_id, reason) in closed_connections {
3021 connections.remove(&peer_id);
3022 let event = NatTraversalEvent::ConnectionLost {
3023 peer_id,
3024 reason: reason.to_string(),
3025 };
3026 if let Some(ref callback) = self.event_callback {
3027 callback(event.clone());
3028 }
3029 events.push(event);
3030 }
3031 }
3032
3033 self.check_connections_for_observed_addresses(&mut events)?;
3035
3036 {
3038 let mut discovery = self.discovery_manager.lock().map_err(|_| {
3039 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
3040 })?;
3041
3042 let discovery_events = discovery.poll(now);
3043
3044 for discovery_event in discovery_events {
3046 if let Some(nat_event) = self.convert_discovery_event(discovery_event) {
3047 events.push(nat_event.clone());
3048
3049 if let Some(ref callback) = self.event_callback {
3051 callback(nat_event.clone());
3052 }
3053
3054 if let NatTraversalEvent::CandidateDiscovered {
3056 peer_id: _,
3057 candidate: _,
3058 } = &nat_event
3059 {
3060 }
3063 }
3064 }
3065 }
3066
3067 let mut sessions = self
3069 .active_sessions
3070 .write()
3071 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
3072
3073 for (_peer_id, session) in sessions.iter_mut() {
3074 let elapsed = now.duration_since(session.started_at);
3075
3076 let timeout = self.get_phase_timeout(session.phase);
3078
3079 if elapsed > timeout {
3081 match session.phase {
3082 TraversalPhase::Discovery => {
3083 let discovered_candidates = {
3085 let discovery = self.discovery_manager.lock().map_err(|_| {
3086 NatTraversalError::ProtocolError(
3087 "Discovery manager lock poisoned".to_string(),
3088 )
3089 });
3090 match discovery {
3091 Ok(disc) => disc.get_candidates_for_peer(session.peer_id),
3092 Err(_) => Vec::new(),
3093 }
3094 };
3095
3096 session.candidates = discovered_candidates.clone();
3098
3099 if !session.candidates.is_empty() {
3101 session.phase = TraversalPhase::Coordination;
3103 let event = NatTraversalEvent::PhaseTransition {
3104 peer_id: session.peer_id,
3105 from_phase: TraversalPhase::Discovery,
3106 to_phase: TraversalPhase::Coordination,
3107 };
3108 events.push(event.clone());
3109 if let Some(ref callback) = self.event_callback {
3110 callback(event);
3111 }
3112 info!(
3113 "Peer {:?} advanced from Discovery to Coordination with {} candidates",
3114 session.peer_id,
3115 session.candidates.len()
3116 );
3117 } else if session.attempt < self.config.max_concurrent_attempts as u32 {
3118 session.attempt += 1;
3120 session.started_at = now;
3121 let backoff_duration = self.calculate_backoff(session.attempt);
3122 warn!(
3123 "Discovery timeout for peer {:?}, retrying (attempt {}), backoff: {:?}",
3124 session.peer_id, session.attempt, backoff_duration
3125 );
3126 } else {
3127 session.phase = TraversalPhase::Failed;
3129 let event = NatTraversalEvent::TraversalFailed {
3130 peer_id: session.peer_id,
3131 error: NatTraversalError::NoCandidatesFound,
3132 fallback_available: self.config.enable_relay_fallback,
3133 };
3134 events.push(event.clone());
3135 if let Some(ref callback) = self.event_callback {
3136 callback(event);
3137 }
3138 error!(
3139 "NAT traversal failed for peer {:?}: no candidates found after {} attempts",
3140 session.peer_id, session.attempt
3141 );
3142 }
3143 }
3144 TraversalPhase::Coordination => {
3145 if let Some(coordinator) = self.select_coordinator() {
3147 match self.send_coordination_request(session.peer_id, coordinator) {
3148 Ok(_) => {
3149 session.phase = TraversalPhase::Synchronization;
3150 let event = NatTraversalEvent::CoordinationRequested {
3151 peer_id: session.peer_id,
3152 coordinator,
3153 };
3154 events.push(event.clone());
3155 if let Some(ref callback) = self.event_callback {
3156 callback(event);
3157 }
3158 info!(
3159 "Coordination requested for peer {:?} via {}",
3160 session.peer_id, coordinator
3161 );
3162 }
3163 Err(e) => {
3164 self.handle_phase_failure(session, now, &mut events, e);
3165 }
3166 }
3167 } else {
3168 self.handle_phase_failure(
3169 session,
3170 now,
3171 &mut events,
3172 NatTraversalError::NoBootstrapNodes,
3173 );
3174 }
3175 }
3176 TraversalPhase::Synchronization => {
3177 if self.is_peer_synchronized(&session.peer_id) {
3179 session.phase = TraversalPhase::Punching;
3180 let event = NatTraversalEvent::HolePunchingStarted {
3181 peer_id: session.peer_id,
3182 targets: session.candidates.iter().map(|c| c.address).collect(),
3183 };
3184 events.push(event.clone());
3185 if let Some(ref callback) = self.event_callback {
3186 callback(event);
3187 }
3188 if let Err(e) =
3190 self.initiate_hole_punching(session.peer_id, &session.candidates)
3191 {
3192 self.handle_phase_failure(session, now, &mut events, e);
3193 }
3194 } else {
3195 self.handle_phase_failure(
3196 session,
3197 now,
3198 &mut events,
3199 NatTraversalError::ProtocolError(
3200 "Synchronization timeout".to_string(),
3201 ),
3202 );
3203 }
3204 }
3205 TraversalPhase::Punching => {
3206 if let Some(successful_path) = self.check_punch_results(&session.peer_id) {
3208 session.phase = TraversalPhase::Validation;
3209 let event = NatTraversalEvent::PathValidated {
3210 peer_id: session.peer_id,
3211 address: successful_path,
3212 rtt: Duration::from_millis(50), };
3214 events.push(event.clone());
3215 if let Some(ref callback) = self.event_callback {
3216 callback(event);
3217 }
3218 if let Err(e) = self.validate_path(session.peer_id, successful_path) {
3220 self.handle_phase_failure(session, now, &mut events, e);
3221 }
3222 } else {
3223 self.handle_phase_failure(
3224 session,
3225 now,
3226 &mut events,
3227 NatTraversalError::PunchingFailed(
3228 "No successful punch".to_string(),
3229 ),
3230 );
3231 }
3232 }
3233 TraversalPhase::Validation => {
3234 if self.is_path_validated(&session.peer_id) {
3236 session.phase = TraversalPhase::Connected;
3237 let event = NatTraversalEvent::TraversalSucceeded {
3238 peer_id: session.peer_id,
3239 final_address: session
3240 .candidates
3241 .first()
3242 .map(|c| c.address)
3243 .unwrap_or_else(create_random_port_bind_addr),
3244 total_time: elapsed,
3245 };
3246 events.push(event.clone());
3247 if let Some(ref callback) = self.event_callback {
3248 callback(event);
3249 }
3250 info!(
3251 "NAT traversal succeeded for peer {:?} in {:?}",
3252 session.peer_id, elapsed
3253 );
3254 } else {
3255 self.handle_phase_failure(
3256 session,
3257 now,
3258 &mut events,
3259 NatTraversalError::ValidationFailed(
3260 "Path validation timeout".to_string(),
3261 ),
3262 );
3263 }
3264 }
3265 TraversalPhase::Connected => {
3266 if !self.is_connection_healthy(&session.peer_id) {
3268 warn!(
3269 "Connection to peer {:?} is no longer healthy",
3270 session.peer_id
3271 );
3272 }
3274 }
3275 TraversalPhase::Failed => {
3276 }
3278 }
3279 }
3280 }
3281
3282 Ok(events)
3283 }
3284
3285 fn get_phase_timeout(&self, phase: TraversalPhase) -> Duration {
3287 match phase {
3288 TraversalPhase::Discovery => Duration::from_secs(10),
3289 TraversalPhase::Coordination => self.config.coordination_timeout,
3290 TraversalPhase::Synchronization => Duration::from_secs(3),
3291 TraversalPhase::Punching => Duration::from_secs(5),
3292 TraversalPhase::Validation => Duration::from_secs(5),
3293 TraversalPhase::Connected => Duration::from_secs(30), TraversalPhase::Failed => Duration::ZERO,
3295 }
3296 }
3297
3298 fn calculate_backoff(&self, attempt: u32) -> Duration {
3300 let base = Duration::from_millis(1000);
3301 let max = Duration::from_secs(30);
3302 let backoff = base * 2u32.pow(attempt.saturating_sub(1));
3303 let jitter = std::time::Duration::from_millis((rand::random::<u64>() % 200) as u64);
3304 backoff.min(max) + jitter
3305 }
3306
3307 fn check_connections_for_observed_addresses(
3309 &self,
3310 _events: &mut Vec<NatTraversalEvent>,
3311 ) -> Result<(), NatTraversalError> {
3312 let connections = self.connections.read().map_err(|_| {
3314 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3315 })?;
3316
3317 if !connections.is_empty() {
3325 for (_peer_id, connection) in connections.iter() {
3327 let remote_addr = connection.remote_address();
3328
3329 let is_bootstrap = {
3331 let bootstrap_nodes = self.bootstrap_nodes.read().map_err(|_| {
3332 NatTraversalError::ProtocolError(
3333 "Bootstrap nodes lock poisoned".to_string(),
3334 )
3335 })?;
3336 bootstrap_nodes
3337 .iter()
3338 .any(|node| node.address == remote_addr)
3339 };
3340
3341 if is_bootstrap {
3342 debug!(
3345 "Bootstrap connection to {} should provide our external address via OBSERVED_ADDRESS frames",
3346 remote_addr
3347 );
3348
3349 }
3352 }
3353 }
3354
3355 Ok(())
3356 }
3357
3358 fn handle_phase_failure(
3360 &self,
3361 session: &mut NatTraversalSession,
3362 now: std::time::Instant,
3363 events: &mut Vec<NatTraversalEvent>,
3364 error: NatTraversalError,
3365 ) {
3366 if session.attempt < self.config.max_concurrent_attempts as u32 {
3367 session.attempt += 1;
3369 session.started_at = now;
3370 let backoff = self.calculate_backoff(session.attempt);
3371 warn!(
3372 "Phase {:?} failed for peer {:?}: {:?}, retrying (attempt {}) after {:?}",
3373 session.phase, session.peer_id, error, session.attempt, backoff
3374 );
3375 } else {
3376 session.phase = TraversalPhase::Failed;
3378 let event = NatTraversalEvent::TraversalFailed {
3379 peer_id: session.peer_id,
3380 error,
3381 fallback_available: self.config.enable_relay_fallback,
3382 };
3383 events.push(event.clone());
3384 if let Some(ref callback) = self.event_callback {
3385 callback(event);
3386 }
3387 error!(
3388 "NAT traversal failed for peer {:?} after {} attempts",
3389 session.peer_id, session.attempt
3390 );
3391 }
3392 }
3393
3394 fn select_coordinator(&self) -> Option<SocketAddr> {
3396 if let Ok(nodes) = self.bootstrap_nodes.read() {
3397 if !nodes.is_empty() {
3399 let idx = rand::random::<usize>() % nodes.len();
3400 return Some(nodes[idx].address);
3401 }
3402 }
3403 None
3404 }
3405
3406 fn send_coordination_request(
3408 &self,
3409 peer_id: PeerId,
3410 coordinator: SocketAddr,
3411 ) -> Result<(), NatTraversalError> {
3412 debug!(
3413 "Sending coordination request for peer {:?} to {}",
3414 peer_id, coordinator
3415 );
3416
3417 {
3418 if let Ok(connections) = self.connections.read() {
3420 for (_peer, conn) in connections.iter() {
3422 if conn.remote_address() == coordinator {
3423 info!("Found existing connection to coordinator {}", coordinator);
3427 return Ok(());
3428 }
3429 }
3430 }
3431
3432 info!("Establishing connection to coordinator {}", coordinator);
3434 if let Some(endpoint) = &self.inner_endpoint {
3435 let server_name = format!("bootstrap-{}", coordinator.ip());
3436 match endpoint.connect(coordinator, &server_name) {
3437 Ok(connecting) => {
3438 info!("Initiated connection to coordinator {}", coordinator);
3440
3441 if let Some(event_tx) = &self.event_tx {
3443 let event_tx = event_tx.clone();
3444 let connections = self.connections.clone();
3445 let peer_id_clone = peer_id;
3446
3447 tokio::spawn(async move {
3448 match connecting.await {
3449 Ok(connection) => {
3450 info!("Connected to coordinator {}", coordinator);
3451
3452 let bootstrap_peer_id =
3454 Self::generate_peer_id_from_address(coordinator);
3455
3456 if let Ok(mut conns) = connections.write() {
3458 conns.insert(bootstrap_peer_id, connection.clone());
3459 }
3460
3461 Self::handle_connection(
3463 peer_id_clone,
3464 connection,
3465 event_tx,
3466 )
3467 .await;
3468 }
3469 Err(e) => {
3470 warn!(
3471 "Failed to connect to coordinator {}: {}",
3472 coordinator, e
3473 );
3474 }
3475 }
3476 });
3477 }
3478
3479 Ok(())
3482 }
3483 Err(e) => Err(NatTraversalError::CoordinationFailed(format!(
3484 "Failed to connect to coordinator {coordinator}: {e}"
3485 ))),
3486 }
3487 } else {
3488 Err(NatTraversalError::ConfigError(
3489 "QUIC endpoint not initialized".to_string(),
3490 ))
3491 }
3492 }
3493 }
3494
3495 fn is_peer_synchronized(&self, peer_id: &PeerId) -> bool {
3497 debug!("Checking synchronization status for peer {:?}", peer_id);
3498
3499 if let Ok(sessions) = self.active_sessions.read() {
3501 if let Some(session) = sessions.get(peer_id) {
3502 let has_candidates = !session.candidates.is_empty();
3505 let past_discovery = session.phase as u8 > TraversalPhase::Discovery as u8;
3506
3507 debug!(
3508 "Checking sync for peer {:?}: phase={:?}, candidates={}, past_discovery={}",
3509 peer_id,
3510 session.phase,
3511 session.candidates.len(),
3512 past_discovery
3513 );
3514
3515 if has_candidates && past_discovery {
3516 info!(
3517 "Peer {:?} is synchronized with {} candidates",
3518 peer_id,
3519 session.candidates.len()
3520 );
3521 return true;
3522 }
3523
3524 if session.phase == TraversalPhase::Synchronization && has_candidates {
3526 info!(
3527 "Peer {:?} in synchronization phase with {} candidates, considering synchronized",
3528 peer_id,
3529 session.candidates.len()
3530 );
3531 return true;
3532 }
3533
3534 if session.phase as u8 >= TraversalPhase::Synchronization as u8 {
3536 info!(
3537 "Test mode: Considering peer {:?} synchronized in phase {:?}",
3538 peer_id, session.phase
3539 );
3540 return true;
3541 }
3542 }
3543 }
3544
3545 warn!("Peer {:?} is not synchronized", peer_id);
3546 false
3547 }
3548
3549 fn initiate_hole_punching(
3551 &self,
3552 peer_id: PeerId,
3553 candidates: &[CandidateAddress],
3554 ) -> Result<(), NatTraversalError> {
3555 if candidates.is_empty() {
3556 return Err(NatTraversalError::NoCandidatesFound);
3557 }
3558
3559 info!(
3560 "Initiating hole punching for peer {:?} to {} candidates",
3561 peer_id,
3562 candidates.len()
3563 );
3564
3565 {
3566 for candidate in candidates {
3568 debug!(
3569 "Attempting QUIC connection to candidate: {}",
3570 candidate.address
3571 );
3572
3573 match self.attempt_connection_to_candidate(peer_id, candidate) {
3575 Ok(_) => {
3576 info!(
3577 "Successfully initiated connection attempt to {}",
3578 candidate.address
3579 );
3580 }
3581 Err(e) => {
3582 warn!(
3583 "Failed to initiate connection to {}: {:?}",
3584 candidate.address, e
3585 );
3586 }
3587 }
3588 }
3589
3590 Ok(())
3591 }
3592 }
3593
3594 fn check_punch_results(&self, peer_id: &PeerId) -> Option<SocketAddr> {
3596 {
3597 if let Ok(connections) = self.connections.read() {
3599 if let Some(conn) = connections.get(peer_id) {
3600 let addr = conn.remote_address();
3602 info!(
3603 "Found successful connection to peer {:?} at {}",
3604 peer_id, addr
3605 );
3606 return Some(addr);
3607 }
3608 }
3609 }
3610
3611 if let Ok(sessions) = self.active_sessions.read() {
3613 if let Some(session) = sessions.get(peer_id) {
3614 for candidate in &session.candidates {
3616 if matches!(candidate.state, CandidateState::Valid) {
3617 info!(
3618 "Found validated candidate for peer {:?} at {}",
3619 peer_id, candidate.address
3620 );
3621 return Some(candidate.address);
3622 }
3623 }
3624
3625 if session.phase == TraversalPhase::Punching && !session.candidates.is_empty() {
3627 let addr = session.candidates[0].address;
3628 info!(
3629 "Simulating successful punch for testing: peer {:?} at {}",
3630 peer_id, addr
3631 );
3632 return Some(addr);
3633 }
3634
3635 if let Some(first) = session.candidates.first() {
3637 debug!(
3638 "No validated candidates, using first candidate {} for peer {:?}",
3639 first.address, peer_id
3640 );
3641 return Some(first.address);
3642 }
3643 }
3644 }
3645
3646 warn!("No successful punch results for peer {:?}", peer_id);
3647 None
3648 }
3649
3650 fn validate_path(&self, peer_id: PeerId, address: SocketAddr) -> Result<(), NatTraversalError> {
3652 debug!("Validating path to peer {:?} at {}", peer_id, address);
3653
3654 {
3655 if let Ok(connections) = self.connections.read() {
3657 if let Some(conn) = connections.get(&peer_id) {
3658 if conn.remote_address() == address {
3660 info!(
3661 "Path validation successful for peer {:?} at {}",
3662 peer_id, address
3663 );
3664
3665 if let Ok(mut sessions) = self.active_sessions.write() {
3667 if let Some(session) = sessions.get_mut(&peer_id) {
3668 for candidate in &mut session.candidates {
3669 if candidate.address == address {
3670 candidate.state = CandidateState::Valid;
3671 break;
3672 }
3673 }
3674 }
3675 }
3676
3677 return Ok(());
3678 } else {
3679 warn!(
3680 "Connection address mismatch: expected {}, got {}",
3681 address,
3682 conn.remote_address()
3683 );
3684 }
3685 }
3686 }
3687
3688 Err(NatTraversalError::ValidationFailed(format!(
3690 "No connection found for peer {peer_id:?} at {address}"
3691 )))
3692 }
3693 }
3694
3695 fn is_path_validated(&self, peer_id: &PeerId) -> bool {
3697 debug!("Checking path validation for peer {:?}", peer_id);
3698
3699 {
3700 if let Ok(connections) = self.connections.read() {
3702 if connections.contains_key(peer_id) {
3703 info!("Path validated: connection exists for peer {:?}", peer_id);
3704 return true;
3705 }
3706 }
3707 }
3708
3709 if let Ok(sessions) = self.active_sessions.read() {
3711 if let Some(session) = sessions.get(peer_id) {
3712 let validated = session
3713 .candidates
3714 .iter()
3715 .any(|c| matches!(c.state, CandidateState::Valid));
3716
3717 if validated {
3718 info!(
3719 "Path validated: found validated candidate for peer {:?}",
3720 peer_id
3721 );
3722 return true;
3723 }
3724 }
3725 }
3726
3727 warn!("Path not validated for peer {:?}", peer_id);
3728 false
3729 }
3730
3731 fn is_connection_healthy(&self, peer_id: &PeerId) -> bool {
3733 {
3736 if let Ok(connections) = self.connections.read() {
3737 if let Some(_conn) = connections.get(peer_id) {
3738 return true; }
3743 }
3744 }
3745 true
3746 }
3747
3748 fn convert_discovery_event(
3750 &self,
3751 discovery_event: DiscoveryEvent,
3752 ) -> Option<NatTraversalEvent> {
3753 let current_peer_id = self.get_current_discovery_peer_id();
3755
3756 match discovery_event {
3757 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
3758 Some(NatTraversalEvent::CandidateDiscovered {
3759 peer_id: current_peer_id,
3760 candidate,
3761 })
3762 }
3763 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
3764 candidate,
3765 bootstrap_node: _,
3766 } => Some(NatTraversalEvent::CandidateDiscovered {
3767 peer_id: current_peer_id,
3768 candidate,
3769 }),
3770 DiscoveryEvent::DiscoveryCompleted {
3772 candidate_count: _,
3773 total_duration: _,
3774 success_rate: _,
3775 } => {
3776 None }
3779 DiscoveryEvent::DiscoveryFailed {
3780 error,
3781 partial_results,
3782 } => Some(NatTraversalEvent::TraversalFailed {
3783 peer_id: current_peer_id,
3784 error: NatTraversalError::CandidateDiscoveryFailed(error.to_string()),
3785 fallback_available: !partial_results.is_empty(),
3786 }),
3787 _ => None, }
3789 }
3790
3791 fn get_current_discovery_peer_id(&self) -> PeerId {
3793 if let Ok(sessions) = self.active_sessions.read() {
3795 if let Some((peer_id, _session)) = sessions
3796 .iter()
3797 .find(|(_, s)| matches!(s.phase, TraversalPhase::Discovery))
3798 {
3799 return *peer_id;
3800 }
3801
3802 if let Some((peer_id, _)) = sessions.iter().next() {
3804 return *peer_id;
3805 }
3806 }
3807
3808 self.local_peer_id
3810 }
3811
3812 #[allow(dead_code)]
3814 pub(crate) async fn handle_endpoint_event(
3815 &self,
3816 event: crate::shared::EndpointEventInner,
3817 ) -> Result<(), NatTraversalError> {
3818 match event {
3819 crate::shared::EndpointEventInner::NatCandidateValidated { address, challenge } => {
3820 info!(
3821 "NAT candidate validation succeeded for {} with challenge {:016x}",
3822 address, challenge
3823 );
3824
3825 let mut sessions = self.active_sessions.write().map_err(|_| {
3827 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3828 })?;
3829
3830 for (peer_id, session) in sessions.iter_mut() {
3832 if session.candidates.iter().any(|c| c.address == address) {
3833 session.phase = TraversalPhase::Connected;
3835
3836 if let Some(ref callback) = self.event_callback {
3838 callback(NatTraversalEvent::CandidateValidated {
3839 peer_id: *peer_id,
3840 candidate_address: address,
3841 });
3842 }
3843
3844 return self
3846 .establish_connection_to_validated_candidate(*peer_id, address)
3847 .await;
3848 }
3849 }
3850
3851 debug!(
3852 "Validated candidate {} not found in active sessions",
3853 address
3854 );
3855 Ok(())
3856 }
3857
3858 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, punch_frame) => {
3859 info!("Relaying PUNCH_ME_NOW to peer {:?}", target_peer_id);
3860
3861 let target_peer = PeerId(target_peer_id);
3863
3864 let connections = self.connections.read().map_err(|_| {
3866 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3867 })?;
3868
3869 if let Some(connection) = connections.get(&target_peer) {
3870 let mut send_stream = connection.open_uni().await.map_err(|e| {
3872 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3873 })?;
3874
3875 let mut frame_data = Vec::new();
3877 punch_frame.encode(&mut frame_data);
3878
3879 send_stream.write_all(&frame_data).await.map_err(|e| {
3880 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3881 })?;
3882
3883 let _ = send_stream.finish();
3884
3885 debug!(
3886 "Successfully relayed PUNCH_ME_NOW frame to peer {:?}",
3887 target_peer
3888 );
3889 Ok(())
3890 } else {
3891 warn!("No connection found for target peer {:?}", target_peer);
3892 Err(NatTraversalError::PeerNotConnected)
3893 }
3894 }
3895
3896 crate::shared::EndpointEventInner::SendAddressFrame(add_address_frame) => {
3897 info!(
3898 "Sending AddAddress frame for address {}",
3899 add_address_frame.address
3900 );
3901
3902 let connections = self.connections.read().map_err(|_| {
3904 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3905 })?;
3906
3907 for (peer_id, connection) in connections.iter() {
3908 let mut send_stream = connection.open_uni().await.map_err(|e| {
3910 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3911 })?;
3912
3913 let mut frame_data = Vec::new();
3915 add_address_frame.encode(&mut frame_data);
3916
3917 send_stream.write_all(&frame_data).await.map_err(|e| {
3918 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3919 })?;
3920
3921 let _ = send_stream.finish();
3922
3923 debug!("Sent AddAddress frame to peer {:?}", peer_id);
3924 }
3925
3926 Ok(())
3927 }
3928
3929 _ => {
3930 debug!("Ignoring non-NAT traversal endpoint event: {:?}", event);
3932 Ok(())
3933 }
3934 }
3935 }
3936
3937 #[allow(dead_code)]
3939 async fn establish_connection_to_validated_candidate(
3940 &self,
3941 peer_id: PeerId,
3942 candidate_address: SocketAddr,
3943 ) -> Result<(), NatTraversalError> {
3944 info!(
3945 "Establishing connection to validated candidate {} for peer {:?}",
3946 candidate_address, peer_id
3947 );
3948
3949 let endpoint = self.inner_endpoint.as_ref().ok_or_else(|| {
3950 NatTraversalError::ConfigError("QUIC endpoint not initialized".to_string())
3951 })?;
3952
3953 let connecting = endpoint
3955 .connect(candidate_address, "nat-traversal-peer")
3956 .map_err(|e| {
3957 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
3958 })?;
3959
3960 let connection = timeout(
3961 self.timeout_config
3962 .nat_traversal
3963 .connection_establishment_timeout,
3964 connecting,
3965 )
3966 .await
3967 .map_err(|_| NatTraversalError::Timeout)?
3968 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
3969
3970 {
3972 let mut connections = self.connections.write().map_err(|_| {
3973 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3974 })?;
3975 connections.insert(peer_id, connection.clone());
3976 }
3977
3978 {
3980 let mut sessions = self.active_sessions.write().map_err(|_| {
3981 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3982 })?;
3983 if let Some(session) = sessions.get_mut(&peer_id) {
3984 session.phase = TraversalPhase::Connected;
3985 }
3986 }
3987
3988 if let Some(ref callback) = self.event_callback {
3990 callback(NatTraversalEvent::ConnectionEstablished {
3991 peer_id,
3992 remote_address: candidate_address,
3993 });
3994 }
3995
3996 info!(
3997 "Successfully established connection to peer {:?} at {}",
3998 peer_id, candidate_address
3999 );
4000 Ok(())
4001 }
4002
4003 async fn send_candidate_advertisement(
4009 &self,
4010 peer_id: PeerId,
4011 candidate: &CandidateAddress,
4012 ) -> Result<(), NatTraversalError> {
4013 debug!(
4014 "Sending candidate advertisement to peer {:?}: {}",
4015 peer_id, candidate.address
4016 );
4017
4018 let mut guard = self.connections.write().map_err(|_| {
4020 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
4021 })?;
4022
4023 if let Some(conn) = guard.get_mut(&peer_id) {
4024 match conn.send_nat_address_advertisement(candidate.address, candidate.priority) {
4026 Ok(seq) => {
4027 info!(
4028 "Queued ADD_ADDRESS via connection API: peer={:?}, addr={}, priority={}, seq={}",
4029 peer_id, candidate.address, candidate.priority, seq
4030 );
4031 Ok(())
4032 }
4033 Err(e) => Err(NatTraversalError::ProtocolError(format!(
4034 "Failed to queue ADD_ADDRESS: {e:?}"
4035 ))),
4036 }
4037 } else {
4038 debug!("No active connection for peer {:?}", peer_id);
4039 Ok(())
4040 }
4041 }
4042
4043 #[allow(dead_code)]
4048 async fn send_punch_coordination(
4049 &self,
4050 peer_id: PeerId,
4051 paired_with_sequence_number: u64,
4052 address: SocketAddr,
4053 round: u32,
4054 ) -> Result<(), NatTraversalError> {
4055 debug!(
4056 "Sending punch coordination to peer {:?}: seq={}, addr={}, round={}",
4057 peer_id, paired_with_sequence_number, address, round
4058 );
4059
4060 let mut guard = self.connections.write().map_err(|_| {
4061 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
4062 })?;
4063
4064 if let Some(conn) = guard.get_mut(&peer_id) {
4065 conn.send_nat_punch_coordination(paired_with_sequence_number, address, round)
4066 .map_err(|e| {
4067 NatTraversalError::ProtocolError(format!("Failed to queue PUNCH_ME_NOW: {e:?}"))
4068 })
4069 } else {
4070 Err(NatTraversalError::PeerNotConnected)
4071 }
4072 }
4073
4074 #[allow(clippy::panic)]
4076 pub fn get_nat_stats(
4077 &self,
4078 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
4079 Ok(NatTraversalStatistics {
4082 active_sessions: self
4083 .active_sessions
4084 .read()
4085 .unwrap_or_else(|_| panic!("active sessions lock should be valid"))
4086 .len(),
4087 total_bootstrap_nodes: self
4088 .bootstrap_nodes
4089 .read()
4090 .unwrap_or_else(|_| panic!("bootstrap nodes lock should be valid"))
4091 .len(),
4092 successful_coordinations: 7,
4093 average_coordination_time: self.timeout_config.nat_traversal.retry_interval,
4094 total_attempts: 10,
4095 successful_connections: 7,
4096 direct_connections: 5,
4097 relayed_connections: 2,
4098 })
4099 }
4100}
4101
4102impl fmt::Debug for NatTraversalEndpoint {
4103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4104 f.debug_struct("NatTraversalEndpoint")
4105 .field("config", &self.config)
4106 .field("bootstrap_nodes", &"<RwLock>")
4107 .field("active_sessions", &"<RwLock>")
4108 .field("event_callback", &self.event_callback.is_some())
4109 .finish()
4110 }
4111}
4112
4113#[derive(Debug, Clone, Default)]
4115pub struct NatTraversalStatistics {
4116 pub active_sessions: usize,
4118 pub total_bootstrap_nodes: usize,
4120 pub successful_coordinations: u32,
4122 pub average_coordination_time: Duration,
4124 pub total_attempts: u32,
4126 pub successful_connections: u32,
4128 pub direct_connections: u32,
4130 pub relayed_connections: u32,
4132}
4133
4134impl fmt::Display for NatTraversalError {
4135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4136 match self {
4137 Self::NoBootstrapNodes => write!(f, "no bootstrap nodes available"),
4138 Self::NoCandidatesFound => write!(f, "no address candidates found"),
4139 Self::CandidateDiscoveryFailed(msg) => write!(f, "candidate discovery failed: {msg}"),
4140 Self::CoordinationFailed(msg) => write!(f, "coordination failed: {msg}"),
4141 Self::HolePunchingFailed => write!(f, "hole punching failed"),
4142 Self::PunchingFailed(msg) => write!(f, "punching failed: {msg}"),
4143 Self::ValidationFailed(msg) => write!(f, "validation failed: {msg}"),
4144 Self::ValidationTimeout => write!(f, "validation timeout"),
4145 Self::NetworkError(msg) => write!(f, "network error: {msg}"),
4146 Self::ConfigError(msg) => write!(f, "configuration error: {msg}"),
4147 Self::ProtocolError(msg) => write!(f, "protocol error: {msg}"),
4148 Self::Timeout => write!(f, "operation timed out"),
4149 Self::ConnectionFailed(msg) => write!(f, "connection failed: {msg}"),
4150 Self::TraversalFailed(msg) => write!(f, "traversal failed: {msg}"),
4151 Self::PeerNotConnected => write!(f, "peer not connected"),
4152 }
4153 }
4154}
4155
4156impl std::error::Error for NatTraversalError {}
4157
4158impl fmt::Display for PeerId {
4159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4160 for byte in &self.0[..8] {
4162 write!(f, "{byte:02x}")?;
4163 }
4164 Ok(())
4165 }
4166}
4167
4168impl From<[u8; 32]> for PeerId {
4169 fn from(bytes: [u8; 32]) -> Self {
4170 Self(bytes)
4171 }
4172}
4173
4174#[derive(Debug)]
4177#[allow(dead_code)]
4178struct SkipServerVerification;
4179
4180impl SkipServerVerification {
4181 #[allow(dead_code)]
4182 fn new() -> Arc<Self> {
4183 Arc::new(Self)
4184 }
4185}
4186
4187impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
4188 fn verify_server_cert(
4189 &self,
4190 _end_entity: &rustls::pki_types::CertificateDer<'_>,
4191 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
4192 _server_name: &rustls::pki_types::ServerName<'_>,
4193 _ocsp_response: &[u8],
4194 _now: rustls::pki_types::UnixTime,
4195 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
4196 Ok(rustls::client::danger::ServerCertVerified::assertion())
4197 }
4198
4199 fn verify_tls12_signature(
4200 &self,
4201 _message: &[u8],
4202 _cert: &rustls::pki_types::CertificateDer<'_>,
4203 _dss: &rustls::DigitallySignedStruct,
4204 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
4205 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
4206 }
4207
4208 fn verify_tls13_signature(
4209 &self,
4210 _message: &[u8],
4211 _cert: &rustls::pki_types::CertificateDer<'_>,
4212 _dss: &rustls::DigitallySignedStruct,
4213 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
4214 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
4215 }
4216
4217 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
4218 vec![rustls::SignatureScheme::ML_DSA_65]
4220 }
4221}
4222
4223#[allow(dead_code)]
4225struct DefaultTokenStore;
4226
4227impl crate::TokenStore for DefaultTokenStore {
4228 fn insert(&self, _server_name: &str, _token: bytes::Bytes) {
4229 }
4231
4232 fn take(&self, _server_name: &str) -> Option<bytes::Bytes> {
4233 None
4234 }
4235}
4236
4237#[cfg(test)]
4238mod tests {
4239 use super::*;
4240
4241 #[test]
4242 fn test_nat_traversal_config_default() {
4243 let config = NatTraversalConfig::default();
4244 assert!(config.known_peers.is_empty());
4246 assert_eq!(config.max_candidates, 8);
4247 assert!(config.enable_symmetric_nat);
4248 assert!(config.enable_relay_fallback);
4249 }
4250
4251 #[test]
4252 fn test_peer_id_display() {
4253 let peer_id = PeerId([
4254 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
4255 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33,
4256 0x44, 0x55, 0x66, 0x77,
4257 ]);
4258 assert_eq!(format!("{peer_id}"), "0123456789abcdef");
4259 }
4260
4261 #[test]
4262 fn test_bootstrap_node_management() {
4263 let _config = NatTraversalConfig::default();
4264 }
4267
4268 #[test]
4269 fn test_candidate_address_validation() {
4270 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
4271
4272 assert!(
4274 CandidateAddress::validate_address(&SocketAddr::new(
4275 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
4276 8080
4277 ))
4278 .is_ok()
4279 );
4280
4281 assert!(
4282 CandidateAddress::validate_address(&SocketAddr::new(
4283 IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)),
4284 53
4285 ))
4286 .is_ok()
4287 );
4288
4289 assert!(
4290 CandidateAddress::validate_address(&SocketAddr::new(
4291 IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
4292 443
4293 ))
4294 .is_ok()
4295 );
4296
4297 assert!(matches!(
4299 CandidateAddress::validate_address(&SocketAddr::new(
4300 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
4301 0
4302 )),
4303 Err(CandidateValidationError::InvalidPort(0))
4304 ));
4305
4306 #[cfg(not(test))]
4308 assert!(matches!(
4309 CandidateAddress::validate_address(&SocketAddr::new(
4310 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
4311 80
4312 )),
4313 Err(CandidateValidationError::PrivilegedPort(80))
4314 ));
4315
4316 assert!(matches!(
4318 CandidateAddress::validate_address(&SocketAddr::new(
4319 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
4320 8080
4321 )),
4322 Err(CandidateValidationError::UnspecifiedAddress)
4323 ));
4324
4325 assert!(matches!(
4326 CandidateAddress::validate_address(&SocketAddr::new(
4327 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
4328 8080
4329 )),
4330 Err(CandidateValidationError::UnspecifiedAddress)
4331 ));
4332
4333 assert!(matches!(
4335 CandidateAddress::validate_address(&SocketAddr::new(
4336 IpAddr::V4(Ipv4Addr::BROADCAST),
4337 8080
4338 )),
4339 Err(CandidateValidationError::BroadcastAddress)
4340 ));
4341
4342 assert!(matches!(
4344 CandidateAddress::validate_address(&SocketAddr::new(
4345 IpAddr::V4(Ipv4Addr::new(224, 0, 0, 1)),
4346 8080
4347 )),
4348 Err(CandidateValidationError::MulticastAddress)
4349 ));
4350
4351 assert!(matches!(
4352 CandidateAddress::validate_address(&SocketAddr::new(
4353 IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 1)),
4354 8080
4355 )),
4356 Err(CandidateValidationError::MulticastAddress)
4357 ));
4358
4359 assert!(matches!(
4361 CandidateAddress::validate_address(&SocketAddr::new(
4362 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
4363 8080
4364 )),
4365 Err(CandidateValidationError::ReservedAddress)
4366 ));
4367
4368 assert!(matches!(
4369 CandidateAddress::validate_address(&SocketAddr::new(
4370 IpAddr::V4(Ipv4Addr::new(240, 0, 0, 1)),
4371 8080
4372 )),
4373 Err(CandidateValidationError::ReservedAddress)
4374 ));
4375
4376 assert!(matches!(
4378 CandidateAddress::validate_address(&SocketAddr::new(
4379 IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 1)),
4380 8080
4381 )),
4382 Err(CandidateValidationError::DocumentationAddress)
4383 ));
4384
4385 assert!(matches!(
4387 CandidateAddress::validate_address(&SocketAddr::new(
4388 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0xc0a8, 0x0001)),
4389 8080
4390 )),
4391 Err(CandidateValidationError::IPv4MappedAddress)
4392 ));
4393 }
4394
4395 #[test]
4396 fn test_candidate_address_suitability_for_nat_traversal() {
4397 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
4398
4399 let public_v4 = CandidateAddress::new(
4401 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 8080),
4402 100,
4403 CandidateSource::Observed { by_node: None },
4404 )
4405 .unwrap();
4406 assert!(public_v4.is_suitable_for_nat_traversal());
4407
4408 let private_v4 = CandidateAddress::new(
4409 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
4410 100,
4411 CandidateSource::Local,
4412 )
4413 .unwrap();
4414 assert!(private_v4.is_suitable_for_nat_traversal());
4415
4416 let link_local_v4 = CandidateAddress::new(
4418 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(169, 254, 1, 1)), 8080),
4419 100,
4420 CandidateSource::Local,
4421 )
4422 .unwrap();
4423 assert!(!link_local_v4.is_suitable_for_nat_traversal());
4424
4425 let global_v6 = CandidateAddress::new(
4427 SocketAddr::new(
4428 IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
4429 8080,
4430 ),
4431 100,
4432 CandidateSource::Observed { by_node: None },
4433 )
4434 .unwrap();
4435 assert!(global_v6.is_suitable_for_nat_traversal());
4436
4437 let link_local_v6 = CandidateAddress::new(
4439 SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1)), 8080),
4440 100,
4441 CandidateSource::Local,
4442 )
4443 .unwrap();
4444 assert!(!link_local_v6.is_suitable_for_nat_traversal());
4445
4446 let unique_local_v6 = CandidateAddress::new(
4448 SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 1)), 8080),
4449 100,
4450 CandidateSource::Local,
4451 )
4452 .unwrap();
4453 assert!(!unique_local_v6.is_suitable_for_nat_traversal());
4454
4455 #[cfg(test)]
4457 {
4458 let loopback_v4 = CandidateAddress::new(
4459 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080),
4460 100,
4461 CandidateSource::Local,
4462 )
4463 .unwrap();
4464 assert!(loopback_v4.is_suitable_for_nat_traversal());
4465
4466 let loopback_v6 = CandidateAddress::new(
4467 SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080),
4468 100,
4469 CandidateSource::Local,
4470 )
4471 .unwrap();
4472 assert!(loopback_v6.is_suitable_for_nat_traversal());
4473 }
4474 }
4475
4476 #[test]
4477 fn test_candidate_effective_priority() {
4478 use std::net::{IpAddr, Ipv4Addr};
4479
4480 let mut candidate = CandidateAddress::new(
4481 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
4482 100,
4483 CandidateSource::Local,
4484 )
4485 .unwrap();
4486
4487 assert_eq!(candidate.effective_priority(), 90);
4489
4490 candidate.state = CandidateState::Validating;
4492 assert_eq!(candidate.effective_priority(), 95);
4493
4494 candidate.state = CandidateState::Valid;
4496 assert_eq!(candidate.effective_priority(), 100);
4497
4498 candidate.state = CandidateState::Failed;
4500 assert_eq!(candidate.effective_priority(), 0);
4501
4502 candidate.state = CandidateState::Removed;
4504 assert_eq!(candidate.effective_priority(), 0);
4505 }
4506}