1#![allow(missing_docs)]
8
9use std::{collections::HashMap, fmt, net::SocketAddr, sync::Arc, time::Duration};
16
/// Returns the IPv4 wildcard bind address `0.0.0.0:0`.
///
/// Port `0` lets the operating system choose an ephemeral port. The address is
/// built directly from its components, so — unlike parsing a string literal —
/// there is no failure path and no panic.
fn create_random_port_bind_addr() -> SocketAddr {
    SocketAddr::from(([0, 0, 0, 0], 0))
}
41
42use tracing::{debug, error, info, warn};
43
44use std::sync::atomic::{AtomicBool, Ordering};
45
46use tokio::{
47 net::UdpSocket,
48 sync::{mpsc, mpsc::error::TryRecvError},
49 time::{sleep, timeout},
50};
51
52use crate::high_level::default_runtime;
53
54use crate::{
55 VarInt,
56 candidate_discovery::{CandidateDiscoveryManager, DiscoveryConfig, DiscoveryEvent},
57 connection::nat_traversal::{CandidateSource, CandidateState, NatTraversalRole},
58};
59
60use crate::{
61 ClientConfig, ConnectionError, EndpointConfig, ServerConfig, TransportConfig,
62 high_level::{Connection as QuinnConnection, Endpoint as QuinnEndpoint},
63};
64
65#[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
66use crate::{crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig};
67
68use crate::config::validation::{ConfigValidator, ValidationResult};
69
70#[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
71use crate::crypto::raw_public_keys::RawPublicKeyConfigBuilder;
72
/// NAT traversal endpoint layered on a QUIC (`high_level`) endpoint.
///
/// Owns the QUIC endpoint, the candidate discovery manager, the bootstrap node
/// list and the per-peer traversal sessions. Shared state sits behind `Arc` plus
/// `std::sync` locks so the background tasks spawned by the constructor and by
/// `start_session_polling` can reach it.
pub struct NatTraversalEndpoint {
    /// Underlying QUIC endpoint; the constructor always stores `Some`.
    quinn_endpoint: Option<QuinnEndpoint>,
    /// Configuration this endpoint was created with.
    config: NatTraversalConfig,
    /// Known bootstrap/coordinator nodes, seeded from `config.bootstrap_nodes`
    /// and edited by `add_bootstrap_node` / `remove_bootstrap_node`.
    bootstrap_nodes: Arc<std::sync::RwLock<Vec<BootstrapNode>>>,
    /// Per-peer traversal sessions, created by `initiate_nat_traversal` and
    /// advanced by `poll_sessions` / `start_session_polling`.
    active_sessions: Arc<std::sync::RwLock<HashMap<PeerId, NatTraversalSession>>>,
    /// Candidate discovery manager, shared with the discovery polling task.
    discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
    /// Optional user callback that receives traversal events.
    event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
    /// Shutdown flag shared with background tasks.
    shutdown: Arc<AtomicBool>,
    /// Sending half of the internal event channel; clones go to the accept and
    /// discovery-polling tasks.
    event_tx: Option<mpsc::UnboundedSender<NatTraversalEvent>>,
    /// Receiving half of the internal event channel.
    event_rx: std::sync::Mutex<mpsc::UnboundedReceiver<NatTraversalEvent>>,
    /// QUIC connections keyed by peer; shared with the accept task
    /// (presumably populated there — TODO confirm).
    connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
    /// This endpoint's own peer id, generated at construction.
    local_peer_id: PeerId,
    /// Copy of `config.timeouts`, read by the session state machines.
    timeout_config: crate::config::nat_timeouts::TimeoutConfig,
}
102
/// Configuration for a [`NatTraversalEndpoint`], checked by its
/// `ConfigValidator` implementation before the endpoint is built.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NatTraversalConfig {
    /// Role of this endpoint; decides whether a server config is created and
    /// whether incoming connections are accepted.
    pub role: EndpointRole,
    /// Bootstrap node addresses; required for `Client` and for coordinating
    /// `Server` roles.
    pub bootstrap_nodes: Vec<SocketAddr>,
    /// Candidate limit passed to discovery; validated with bounds 1 and 256.
    pub max_candidates: usize,
    /// Used as the discovery `total_timeout`; validated between 100 ms and 300 s.
    pub coordination_timeout: Duration,
    /// Forwarded to discovery as `enable_symmetric_prediction`.
    pub enable_symmetric_nat: bool,
    /// Relay fallback switch; must be `false` for the `Bootstrap` role.
    pub enable_relay_fallback: bool,
    /// Validated with bounds 1 and 16, must not exceed `max_candidates`; advertised
    /// as the NAT traversal `concurrency_limit` by servers/bootstrap nodes.
    pub max_concurrent_attempts: usize,
    /// Local bind address; `None` binds `0.0.0.0:0`.
    pub bind_addr: Option<SocketAddr>,
    /// Preference flag; not read in this section of the file — TODO confirm usage.
    pub prefer_rfc_nat_traversal: bool,
    /// Timeout settings copied into the endpoint.
    pub timeouts: crate::config::nat_timeouts::TimeoutConfig,
}
173
/// Role an endpoint plays in NAT traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum EndpointRole {
    /// Outgoing-only endpoint; needs at least one bootstrap node.
    Client,
    /// Endpoint that also accepts connections.
    Server {
        /// Mapped to `NatTraversalRole::Server { can_relay }`; when `true`,
        /// bootstrap nodes are required.
        can_coordinate: bool,
    },
    /// Bootstrap/coordinator node; accepts connections and must not enable
    /// relay fallback.
    Bootstrap,
}
187
188impl EndpointRole {
189 pub fn name(&self) -> &'static str {
191 match self {
192 Self::Client => "client",
193 Self::Server { .. } => "server",
194 Self::Bootstrap => "bootstrap",
195 }
196 }
197}
198
/// 32-byte peer identifier; used as the key for sessions and connections.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
)]
pub struct PeerId(pub [u8; 32]);
204
/// Runtime record for one bootstrap node.
#[derive(Debug, Clone)]
pub struct BootstrapNode {
    /// Node address.
    pub address: SocketAddr,
    /// Set to the creation time when the record is made.
    pub last_seen: std::time::Instant,
    /// Whether the node can coordinate; newly created records use `true`.
    pub can_coordinate: bool,
    /// Measured RTT, if any; averaged by `get_statistics`.
    pub rtt: Option<Duration>,
    /// Coordination counter; summed into `successful_coordinations` statistics.
    pub coordination_count: u32,
}
219
220impl BootstrapNode {
221 pub fn new(address: SocketAddr) -> Self {
223 Self {
224 address,
225 last_seen: std::time::Instant::now(),
226 can_coordinate: true,
227 rtt: None,
228 coordination_count: 0,
229 }
230 }
231}
232
/// A local/remote candidate pairing with a priority and check state.
#[derive(Debug, Clone)]
pub struct CandidatePair {
    /// Local side of the pair.
    pub local_candidate: CandidateAddress,
    /// Remote side of the pair.
    pub remote_candidate: CandidateAddress,
    /// Pair priority (ordering semantics not shown here — TODO confirm).
    pub priority: u64,
    /// Current check state of the pair.
    pub state: CandidatePairState,
}
245
/// Check state of a [`CandidatePair`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidatePairState {
    /// Not yet checked.
    Waiting,
    /// Check in progress.
    InProgress,
    /// Check succeeded.
    Succeeded,
    /// Check failed.
    Failed,
    /// Check cancelled.
    Cancelled,
}
260
/// Per-peer traversal session stored in `NatTraversalEndpoint::active_sessions`.
#[derive(Debug)]
struct NatTraversalSession {
    /// Peer this session targets.
    peer_id: PeerId,
    /// Coordinator address passed to `initiate_nat_traversal`.
    // Recorded for completeness; the allow suggests it is never read.
    #[allow(dead_code)]
    coordinator: SocketAddr,
    /// Attempt counter; starts at 1 and is incremented on each retry.
    attempt: u32,
    /// When the session was created.
    started_at: std::time::Instant,
    /// Current traversal phase; new sessions start in `Discovery`.
    phase: TraversalPhase,
    /// Candidates gathered for this peer.
    candidates: Vec<CandidateAddress>,
    /// Connection-level state machine driven by the polling functions.
    session_state: SessionState,
}
280
/// Connection-level state of a traversal session.
#[derive(Debug, Clone)]
pub struct SessionState {
    /// Current connection state.
    pub state: ConnectionState,
    /// Time of the last state transition; timeouts are measured from here.
    pub last_transition: std::time::Instant,
    /// Established connection, if any; its presence moves `Connecting` to `Connected`.
    pub connection: Option<QuinnConnection>,
    /// In-flight attempts as (address, start time).
    pub active_attempts: Vec<(SocketAddr, std::time::Instant)>,
    /// Metrics refreshed by the polling functions.
    pub metrics: ConnectionMetrics,
}
295
/// Connection state of a session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Not connecting; the background poller retries after a TTL.
    Idle,
    /// Connection attempt in progress (initial state of new sessions).
    Connecting,
    /// Connection established.
    Connected,
    /// Connection migration in progress.
    Migrating,
    /// Session finished; the background poller eventually removes it.
    Closed,
}
310
/// Per-session connection metrics.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// Path RTT, copied from connection stats by the background poller.
    pub rtt: Option<Duration>,
    /// `lost_packets / max(sent_packets, 1)` from connection stats.
    pub loss_rate: f64,
    /// Bytes sent (not updated in this section of the file).
    pub bytes_sent: u64,
    /// Bytes received (not updated in this section of the file).
    pub bytes_received: u64,
    /// Stamped by `poll_sessions` for connected sessions.
    pub last_activity: Option<std::time::Instant>,
}
325
/// One state transition reported by `poll_sessions`.
#[derive(Debug, Clone)]
pub struct SessionStateUpdate {
    /// Peer whose session changed.
    pub peer_id: PeerId,
    /// State before the transition.
    pub old_state: ConnectionState,
    /// State after the transition.
    pub new_state: ConnectionState,
    /// Why the transition happened.
    pub reason: StateChangeReason,
}
338
/// Reason attached to a [`SessionStateUpdate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateChangeReason {
    /// A configured timeout elapsed.
    Timeout,
    /// A connection became available.
    ConnectionEstablished,
    /// The connection closed.
    ConnectionClosed,
    /// Migration finished with a connection still present.
    MigrationComplete,
    /// Migration finished without a connection.
    MigrationFailed,
    /// A network error occurred.
    NetworkError,
    /// Closed by the user.
    UserClosed,
}
357
/// Phase of a NAT traversal attempt; new sessions start in `Discovery`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversalPhase {
    /// Candidate discovery.
    Discovery,
    /// Coordination via a coordinator node.
    Coordination,
    /// Synchronisation with the peer.
    Synchronization,
    /// Hole punching.
    Punching,
    /// Path validation.
    Validation,
    /// Traversal succeeded.
    Connected,
    /// Traversal failed.
    Failed,
}
376
/// Maintenance action chosen for a session by `start_session_polling`.
#[derive(Debug, Clone, Copy)]
enum SessionUpdate {
    /// `Connecting` exceeded the establishment timeout; close it.
    Timeout,
    /// The connection reports a close reason; close and drop it.
    Disconnected,
    /// Connection is open; refresh RTT and loss metrics.
    UpdateMetrics,
    /// `Connected` without a connection; close it.
    InvalidState,
    /// `Idle` long enough; move back to `Connecting`.
    Retry,
    /// `Migrating` exceeded the probe timeout; close it.
    MigrationTimeout,
    /// `Closed` long enough; drop the session.
    Remove,
}
395
/// A candidate address for NAT traversal.
#[derive(Debug, Clone)]
pub struct CandidateAddress {
    /// The socket address.
    pub address: SocketAddr,
    /// Base priority; adjusted by `effective_priority` according to `state`.
    pub priority: u32,
    /// Where the candidate came from.
    pub source: CandidateSource,
    /// Validation state; `New` when built via `CandidateAddress::new`.
    pub state: CandidateState,
}
408
409impl CandidateAddress {
410 pub fn new(
412 address: SocketAddr,
413 priority: u32,
414 source: CandidateSource,
415 ) -> Result<Self, CandidateValidationError> {
416 Self::validate_address(&address)?;
417 Ok(Self {
418 address,
419 priority,
420 source,
421 state: CandidateState::New,
422 })
423 }
424
425 pub fn validate_address(addr: &SocketAddr) -> Result<(), CandidateValidationError> {
427 if addr.port() == 0 {
429 return Err(CandidateValidationError::InvalidPort(0));
430 }
431
432 #[cfg(not(test))]
434 if addr.port() < 1024 {
435 return Err(CandidateValidationError::PrivilegedPort(addr.port()));
436 }
437
438 match addr.ip() {
439 std::net::IpAddr::V4(ipv4) => {
440 if ipv4.is_unspecified() {
442 return Err(CandidateValidationError::UnspecifiedAddress);
443 }
444 if ipv4.is_broadcast() {
445 return Err(CandidateValidationError::BroadcastAddress);
446 }
447 if ipv4.is_multicast() {
448 return Err(CandidateValidationError::MulticastAddress);
449 }
450 if ipv4.octets()[0] == 0 {
452 return Err(CandidateValidationError::ReservedAddress);
453 }
454 if ipv4.octets()[0] >= 240 {
456 return Err(CandidateValidationError::ReservedAddress);
457 }
458 }
459 std::net::IpAddr::V6(ipv6) => {
460 if ipv6.is_unspecified() {
462 return Err(CandidateValidationError::UnspecifiedAddress);
463 }
464 if ipv6.is_multicast() {
465 return Err(CandidateValidationError::MulticastAddress);
466 }
467 let segments = ipv6.segments();
469 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
470 return Err(CandidateValidationError::DocumentationAddress);
471 }
472 if ipv6.to_ipv4_mapped().is_some() {
474 return Err(CandidateValidationError::IPv4MappedAddress);
475 }
476 }
477 }
478
479 Ok(())
480 }
481
482 pub fn is_suitable_for_nat_traversal(&self) -> bool {
484 match self.address.ip() {
485 std::net::IpAddr::V4(ipv4) => {
486 #[cfg(test)]
491 if ipv4.is_loopback() {
492 return true;
493 }
494 !ipv4.is_loopback()
495 && !ipv4.is_link_local()
496 && !ipv4.is_multicast()
497 && !ipv4.is_broadcast()
498 }
499 std::net::IpAddr::V6(ipv6) => {
500 #[cfg(test)]
506 if ipv6.is_loopback() {
507 return true;
508 }
509 let segments = ipv6.segments();
510 let is_link_local = (segments[0] & 0xffc0) == 0xfe80;
511 let is_unique_local = (segments[0] & 0xfe00) == 0xfc00;
512
513 !ipv6.is_loopback() && !is_link_local && !is_unique_local && !ipv6.is_multicast()
514 }
515 }
516 }
517
518 pub fn effective_priority(&self) -> u32 {
520 match self.state {
521 CandidateState::Valid => self.priority,
522 CandidateState::New => self.priority.saturating_sub(10),
523 CandidateState::Validating => self.priority.saturating_sub(5),
524 CandidateState::Failed => 0,
525 CandidateState::Removed => 0,
526 }
527 }
528}
529
/// Reasons `CandidateAddress::validate_address` rejects an address.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum CandidateValidationError {
    /// Port 0.
    #[error("invalid port number: {0}")]
    InvalidPort(u16),
    /// Port below 1024 (rejected outside test builds).
    #[error("privileged port not allowed: {0}")]
    PrivilegedPort(u16),
    /// `0.0.0.0` or `::`.
    #[error("unspecified address not allowed")]
    UnspecifiedAddress,
    /// IPv4 broadcast.
    #[error("broadcast address not allowed")]
    BroadcastAddress,
    /// IPv4 or IPv6 multicast.
    #[error("multicast address not allowed")]
    MulticastAddress,
    /// IPv4 with first octet 0 or >= 240.
    #[error("reserved address not allowed")]
    ReservedAddress,
    /// IPv6 in `2001:db8::/32`.
    #[error("documentation address not allowed")]
    DocumentationAddress,
    /// IPv4-mapped IPv6 address.
    #[error("IPv4-mapped IPv6 address not allowed")]
    IPv4MappedAddress,
}
558
/// Events delivered to the user callback and the internal event channel.
#[derive(Debug, Clone)]
pub enum NatTraversalEvent {
    /// A candidate was discovered for `peer_id`.
    CandidateDiscovered {
        peer_id: PeerId,
        candidate: CandidateAddress,
    },
    /// Emitted by `initiate_nat_traversal` after discovery for the peer starts.
    CoordinationRequested {
        peer_id: PeerId,
        coordinator: SocketAddr,
    },
    /// Coordination round `round_id` was synchronised for `peer_id`.
    CoordinationSynchronized {
        peer_id: PeerId,
        round_id: VarInt,
    },
    /// Hole punching began toward `targets`.
    HolePunchingStarted {
        peer_id: PeerId,
        targets: Vec<SocketAddr>,
    },
    /// A path to `address` was validated with the given RTT.
    PathValidated {
        peer_id: PeerId,
        address: SocketAddr,
        rtt: Duration,
    },
    /// A candidate address was validated.
    CandidateValidated {
        peer_id: PeerId,
        candidate_address: SocketAddr,
    },
    /// Traversal finished successfully at `final_address` after `total_time`.
    TraversalSucceeded {
        peer_id: PeerId,
        final_address: SocketAddr,
        total_time: Duration,
    },
    /// A connection to `remote_address` was established.
    ConnectionEstablished {
        peer_id: PeerId,
        remote_address: SocketAddr,
    },
    /// Traversal failed; `fallback_available` indicates whether a fallback exists.
    TraversalFailed {
        peer_id: PeerId,
        error: NatTraversalError,
        fallback_available: bool,
    },
    /// A connection was lost for `reason`.
    ConnectionLost {
        peer_id: PeerId,
        reason: String,
    },
    /// The traversal phase changed.
    PhaseTransition {
        peer_id: PeerId,
        from_phase: TraversalPhase,
        to_phase: TraversalPhase,
    },
    /// Emitted by `poll_sessions` when a session's connection state changes.
    SessionStateChanged {
        peer_id: PeerId,
        new_state: ConnectionState,
    },
}
654
/// Errors returned by [`NatTraversalEndpoint`] operations.
#[derive(Debug, Clone)]
pub enum NatTraversalError {
    /// No bootstrap nodes available.
    NoBootstrapNodes,
    /// No candidates were found.
    NoCandidatesFound,
    /// Starting or running candidate discovery failed.
    CandidateDiscoveryFailed(String),
    /// Coordination failed.
    CoordinationFailed(String),
    /// Hole punching failed.
    HolePunchingFailed,
    /// Hole punching failed with details.
    PunchingFailed(String),
    /// Validation failed with details.
    ValidationFailed(String),
    /// Validation timed out.
    ValidationTimeout,
    /// Socket-level failure (bind, conversion, local-address lookup, ...).
    NetworkError(String),
    /// Invalid configuration or failure to build the QUIC/crypto configuration.
    ConfigError(String),
    /// Internal protocol failure, including poisoned locks.
    ProtocolError(String),
    /// Generic timeout.
    Timeout,
    /// Connection attempt failed.
    ConnectionFailed(String),
    /// Traversal failed with details.
    TraversalFailed(String),
    /// The peer is not connected.
    PeerNotConnected,
}
689
690impl Default for NatTraversalConfig {
691 fn default() -> Self {
692 Self {
693 role: EndpointRole::Client,
694 bootstrap_nodes: Vec::new(),
695 max_candidates: 8,
696 coordination_timeout: Duration::from_secs(10),
697 enable_symmetric_nat: true,
698 enable_relay_fallback: true,
699 max_concurrent_attempts: 3,
700 bind_addr: None,
701 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
703 }
704 }
705}
706
707impl ConfigValidator for NatTraversalConfig {
708 fn validate(&self) -> ValidationResult<()> {
709 use crate::config::validation::*;
710
711 match self.role {
713 EndpointRole::Client => {
714 if self.bootstrap_nodes.is_empty() {
715 return Err(ConfigValidationError::InvalidRole(
716 "Client endpoints require at least one bootstrap node".to_string(),
717 ));
718 }
719 }
720 EndpointRole::Server { can_coordinate } => {
721 if can_coordinate && self.bootstrap_nodes.is_empty() {
722 return Err(ConfigValidationError::InvalidRole(
723 "Server endpoints with coordination capability require bootstrap nodes"
724 .to_string(),
725 ));
726 }
727 }
728 EndpointRole::Bootstrap => {
729 }
731 }
732
733 if !self.bootstrap_nodes.is_empty() {
735 validate_bootstrap_nodes(&self.bootstrap_nodes)?;
736 }
737
738 validate_range(self.max_candidates, 1, 256, "max_candidates")?;
740
741 validate_duration(
743 self.coordination_timeout,
744 Duration::from_millis(100),
745 Duration::from_secs(300),
746 "coordination_timeout",
747 )?;
748
749 validate_range(
751 self.max_concurrent_attempts,
752 1,
753 16,
754 "max_concurrent_attempts",
755 )?;
756
757 if self.max_concurrent_attempts > self.max_candidates {
759 return Err(ConfigValidationError::IncompatibleConfiguration(
760 "max_concurrent_attempts cannot exceed max_candidates".to_string(),
761 ));
762 }
763
764 if self.role == EndpointRole::Bootstrap && self.enable_relay_fallback {
765 return Err(ConfigValidationError::IncompatibleConfiguration(
766 "Bootstrap nodes should not enable relay fallback".to_string(),
767 ));
768 }
769
770 Ok(())
771 }
772}
773
774impl NatTraversalEndpoint {
775 pub async fn new(
777 config: NatTraversalConfig,
778 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
779 ) -> Result<Self, NatTraversalError> {
780 Self::new_impl(config, event_callback).await
781 }
782
783 async fn new_impl(
785 config: NatTraversalConfig,
786 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
787 ) -> Result<Self, NatTraversalError> {
788 Self::new_common(config, event_callback).await
789 }
790
791 async fn new_common(
793 config: NatTraversalConfig,
794 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
795 ) -> Result<Self, NatTraversalError> {
796 Self::new_shared_logic(config, event_callback).await
798 }
799
800 async fn new_shared_logic(
802 config: NatTraversalConfig,
803 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
804 ) -> Result<Self, NatTraversalError> {
805 {
808 config
809 .validate()
810 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
811 }
812
813 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
817 config
818 .bootstrap_nodes
819 .iter()
820 .map(|&address| BootstrapNode {
821 address,
822 last_seen: std::time::Instant::now(),
823 can_coordinate: true, rtt: None,
825 coordination_count: 0,
826 })
827 .collect(),
828 ));
829
830 let discovery_config = DiscoveryConfig {
832 total_timeout: config.coordination_timeout,
833 max_candidates: config.max_candidates,
834 enable_symmetric_prediction: config.enable_symmetric_nat,
835 bound_address: config.bind_addr, ..DiscoveryConfig::default()
837 };
838
839 let nat_traversal_role = match config.role {
840 EndpointRole::Client => NatTraversalRole::Client,
841 EndpointRole::Server { can_coordinate } => NatTraversalRole::Server {
842 can_relay: can_coordinate,
843 },
844 EndpointRole::Bootstrap => NatTraversalRole::Bootstrap,
845 };
846
847 let discovery_manager = Arc::new(std::sync::Mutex::new(CandidateDiscoveryManager::new(
848 discovery_config,
849 )));
850
851 let (quinn_endpoint, event_tx, event_rx, local_addr) =
854 Self::create_quinn_endpoint(&config, nat_traversal_role).await?;
855
856 {
858 let mut discovery = discovery_manager.lock().map_err(|_| {
859 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
860 })?;
861 discovery.set_bound_address(local_addr);
862 info!(
863 "Updated discovery manager with bound address: {}",
864 local_addr
865 );
866 }
867
868 let endpoint = Self {
869 quinn_endpoint: Some(quinn_endpoint.clone()),
870 config: config.clone(),
871 bootstrap_nodes,
872 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
873 discovery_manager,
874 event_callback,
875 shutdown: Arc::new(AtomicBool::new(false)),
876 event_tx: Some(event_tx.clone()),
877 event_rx: std::sync::Mutex::new(event_rx),
878 connections: Arc::new(std::sync::RwLock::new(HashMap::new())),
879 local_peer_id: Self::generate_local_peer_id(),
880 timeout_config: config.timeouts.clone(),
881 };
882
883 if matches!(
885 config.role,
886 EndpointRole::Bootstrap | EndpointRole::Server { .. }
887 ) {
888 let endpoint_clone = quinn_endpoint.clone();
889 let shutdown_clone = endpoint.shutdown.clone();
890 let event_tx_clone = event_tx.clone();
891 let connections_clone = endpoint.connections.clone();
892
893 tokio::spawn(async move {
894 Self::accept_connections(
895 endpoint_clone,
896 shutdown_clone,
897 event_tx_clone,
898 connections_clone,
899 )
900 .await;
901 });
902
903 info!("Started accepting connections for {:?} role", config.role);
904 }
905
906 let discovery_manager_clone = endpoint.discovery_manager.clone();
908 let shutdown_clone = endpoint.shutdown.clone();
909 let event_tx_clone = event_tx;
910
911 tokio::spawn(async move {
912 Self::poll_discovery(discovery_manager_clone, shutdown_clone, event_tx_clone).await;
913 });
914
915 info!("Started discovery polling task");
916
917 {
919 let mut discovery = endpoint.discovery_manager.lock().map_err(|_| {
920 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
921 })?;
922
923 let local_peer_id = endpoint.local_peer_id;
925 let bootstrap_nodes = {
926 let nodes = endpoint.bootstrap_nodes.read().map_err(|_| {
927 NatTraversalError::ProtocolError("Bootstrap nodes lock poisoned".to_string())
928 })?;
929 nodes.clone()
930 };
931
932 discovery
933 .start_discovery(local_peer_id, bootstrap_nodes)
934 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
935
936 info!(
937 "Started local candidate discovery for peer {:?}",
938 local_peer_id
939 );
940 }
941
942 Ok(endpoint)
943 }
944
945 pub fn get_quinn_endpoint(&self) -> Option<&crate::high_level::Endpoint> {
947 self.quinn_endpoint.as_ref()
948 }
949
950 pub fn get_event_callback(&self) -> Option<&Box<dyn Fn(NatTraversalEvent) + Send + Sync>> {
952 self.event_callback.as_ref()
953 }
954
955 pub fn initiate_nat_traversal(
957 &self,
958 peer_id: PeerId,
959 coordinator: SocketAddr,
960 ) -> Result<(), NatTraversalError> {
961 info!(
962 "Starting NAT traversal to peer {:?} via coordinator {}",
963 peer_id, coordinator
964 );
965
966 let session = NatTraversalSession {
968 peer_id,
969 coordinator,
970 attempt: 1,
971 started_at: std::time::Instant::now(),
972 phase: TraversalPhase::Discovery,
973 candidates: Vec::new(),
974 session_state: SessionState {
975 state: ConnectionState::Connecting,
976 last_transition: std::time::Instant::now(),
977
978 connection: None,
979 active_attempts: Vec::new(),
980 metrics: ConnectionMetrics::default(),
981 },
982 };
983
984 {
986 let mut sessions = self
987 .active_sessions
988 .write()
989 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
990 sessions.insert(peer_id, session);
991 }
992
993 let bootstrap_nodes_vec = {
995 let bootstrap_nodes = self
996 .bootstrap_nodes
997 .read()
998 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
999 bootstrap_nodes.clone()
1000 };
1001
1002 {
1003 let mut discovery = self.discovery_manager.lock().map_err(|_| {
1004 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
1005 })?;
1006
1007 discovery
1008 .start_discovery(peer_id, bootstrap_nodes_vec)
1009 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
1010 }
1011
1012 if let Some(ref callback) = self.event_callback {
1014 callback(NatTraversalEvent::CoordinationRequested {
1015 peer_id,
1016 coordinator,
1017 });
1018 }
1019
1020 Ok(())
1022 }
1023
1024 pub fn poll_sessions(&self) -> Result<Vec<SessionStateUpdate>, NatTraversalError> {
1026 let mut updates = Vec::new();
1027 let now = std::time::Instant::now();
1028
1029 let mut sessions = self
1030 .active_sessions
1031 .write()
1032 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
1033
1034 for (peer_id, session) in sessions.iter_mut() {
1035 let mut state_changed = false;
1036
1037 match session.session_state.state {
1038 ConnectionState::Connecting => {
1039 let elapsed = now.duration_since(session.session_state.last_transition);
1041 if elapsed
1042 > self
1043 .timeout_config
1044 .nat_traversal
1045 .connection_establishment_timeout
1046 {
1047 session.session_state.state = ConnectionState::Closed;
1048 session.session_state.last_transition = now;
1049 state_changed = true;
1050
1051 updates.push(SessionStateUpdate {
1052 peer_id: *peer_id,
1053 old_state: ConnectionState::Connecting,
1054 new_state: ConnectionState::Closed,
1055 reason: StateChangeReason::Timeout,
1056 });
1057 }
1058
1059 if let Some(ref _connection) = session.session_state.connection {
1062 session.session_state.state = ConnectionState::Connected;
1063 session.session_state.last_transition = now;
1064 state_changed = true;
1065
1066 updates.push(SessionStateUpdate {
1067 peer_id: *peer_id,
1068 old_state: ConnectionState::Connecting,
1069 new_state: ConnectionState::Connected,
1070 reason: StateChangeReason::ConnectionEstablished,
1071 });
1072 }
1073 }
1074 ConnectionState::Connected => {
1075 {
1078 }
1081
1082 session.session_state.metrics.last_activity = Some(now);
1084 }
1085 ConnectionState::Migrating => {
1086 let elapsed = now.duration_since(session.session_state.last_transition);
1088 if elapsed > Duration::from_secs(10) {
1089 if session.session_state.connection.is_some() {
1092 session.session_state.state = ConnectionState::Connected;
1093 state_changed = true;
1094
1095 updates.push(SessionStateUpdate {
1096 peer_id: *peer_id,
1097 old_state: ConnectionState::Migrating,
1098 new_state: ConnectionState::Connected,
1099 reason: StateChangeReason::MigrationComplete,
1100 });
1101 } else {
1102 session.session_state.state = ConnectionState::Closed;
1103 state_changed = true;
1104
1105 updates.push(SessionStateUpdate {
1106 peer_id: *peer_id,
1107 old_state: ConnectionState::Migrating,
1108 new_state: ConnectionState::Closed,
1109 reason: StateChangeReason::MigrationFailed,
1110 });
1111 }
1112
1113 session.session_state.last_transition = now;
1114 }
1115 }
1116 _ => {}
1117 }
1118
1119 if state_changed {
1121 if let Some(ref callback) = self.event_callback {
1122 callback(NatTraversalEvent::SessionStateChanged {
1123 peer_id: *peer_id,
1124 new_state: session.session_state.state,
1125 });
1126 }
1127 }
1128 }
1129
1130 Ok(updates)
1131 }
1132
1133 pub fn start_session_polling(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
1135 let sessions = self.active_sessions.clone();
1136 let shutdown = self.shutdown.clone();
1137 let timeout_config = self.timeout_config.clone();
1138
1139 tokio::spawn(async move {
1140 let mut ticker = tokio::time::interval(interval);
1141
1142 loop {
1143 ticker.tick().await;
1144
1145 if shutdown.load(Ordering::Relaxed) {
1146 break;
1147 }
1148
1149 let sessions_to_update = {
1151 match sessions.read() {
1152 Ok(sessions_guard) => {
1153 sessions_guard
1154 .iter()
1155 .filter_map(|(peer_id, session)| {
1156 let now = std::time::Instant::now();
1157 let elapsed =
1158 now.duration_since(session.session_state.last_transition);
1159
1160 match session.session_state.state {
1161 ConnectionState::Connecting => {
1162 if elapsed
1164 > timeout_config
1165 .nat_traversal
1166 .connection_establishment_timeout
1167 {
1168 Some((*peer_id, SessionUpdate::Timeout))
1169 } else {
1170 None
1171 }
1172 }
1173 ConnectionState::Connected => {
1174 if let Some(ref conn) = session.session_state.connection
1176 {
1177 if conn.close_reason().is_some() {
1178 Some((*peer_id, SessionUpdate::Disconnected))
1179 } else {
1180 Some((*peer_id, SessionUpdate::UpdateMetrics))
1182 }
1183 } else {
1184 Some((*peer_id, SessionUpdate::InvalidState))
1185 }
1186 }
1187 ConnectionState::Idle => {
1188 if elapsed
1190 > timeout_config
1191 .discovery
1192 .server_reflexive_cache_ttl
1193 {
1194 Some((*peer_id, SessionUpdate::Retry))
1195 } else {
1196 None
1197 }
1198 }
1199 ConnectionState::Migrating => {
1200 if elapsed > timeout_config.nat_traversal.probe_timeout
1202 {
1203 Some((*peer_id, SessionUpdate::MigrationTimeout))
1204 } else {
1205 None
1206 }
1207 }
1208 ConnectionState::Closed => {
1209 if elapsed
1211 > timeout_config.discovery.interface_cache_ttl
1212 {
1213 Some((*peer_id, SessionUpdate::Remove))
1214 } else {
1215 None
1216 }
1217 }
1218 }
1219 })
1220 .collect::<Vec<_>>()
1221 }
1222 _ => {
1223 vec![]
1224 }
1225 }
1226 };
1227
1228 if !sessions_to_update.is_empty() {
1230 if let Ok(mut sessions_guard) = sessions.write() {
1231 for (peer_id, update) in sessions_to_update {
1232 match update {
1233 SessionUpdate::Timeout => {
1234 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1235 session.session_state.state = ConnectionState::Closed;
1236 session.session_state.last_transition =
1237 std::time::Instant::now();
1238 tracing::warn!("Connection to {:?} timed out", peer_id);
1239 }
1240 }
1241 SessionUpdate::Disconnected => {
1242 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1243 session.session_state.state = ConnectionState::Closed;
1244 session.session_state.last_transition =
1245 std::time::Instant::now();
1246 session.session_state.connection = None;
1247 tracing::info!("Connection to {:?} closed", peer_id);
1248 }
1249 }
1250 SessionUpdate::UpdateMetrics => {
1251 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1252 if let Some(ref conn) = session.session_state.connection {
1253 let stats = conn.stats();
1255 session.session_state.metrics.rtt =
1256 Some(stats.path.rtt);
1257 session.session_state.metrics.loss_rate =
1258 stats.path.lost_packets as f64
1259 / stats.path.sent_packets.max(1) as f64;
1260 }
1261 }
1262 }
1263 SessionUpdate::InvalidState => {
1264 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1265 session.session_state.state = ConnectionState::Closed;
1266 session.session_state.last_transition =
1267 std::time::Instant::now();
1268 tracing::error!("Session {:?} in invalid state", peer_id);
1269 }
1270 }
1271 SessionUpdate::Retry => {
1272 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1273 session.session_state.state = ConnectionState::Connecting;
1274 session.session_state.last_transition =
1275 std::time::Instant::now();
1276 session.attempt += 1;
1277 tracing::info!(
1278 "Retrying connection to {:?} (attempt {})",
1279 peer_id,
1280 session.attempt
1281 );
1282 }
1283 }
1284 SessionUpdate::MigrationTimeout => {
1285 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1286 session.session_state.state = ConnectionState::Closed;
1287 session.session_state.last_transition =
1288 std::time::Instant::now();
1289 tracing::warn!("Migration timeout for {:?}", peer_id);
1290 }
1291 }
1292 SessionUpdate::Remove => {
1293 sessions_guard.remove(&peer_id);
1294 tracing::debug!("Removed old session for {:?}", peer_id);
1295 }
1296 }
1297 }
1298 }
1299 }
1300 }
1301 })
1302 }
1303
1304 pub fn get_statistics(&self) -> Result<NatTraversalStatistics, NatTraversalError> {
1308 let sessions = self
1309 .active_sessions
1310 .read()
1311 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1312 let bootstrap_nodes = self
1313 .bootstrap_nodes
1314 .read()
1315 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1316
1317 let avg_coordination_time = {
1319 let rtts: Vec<Duration> = bootstrap_nodes.iter().filter_map(|b| b.rtt).collect();
1320
1321 if rtts.is_empty() {
1322 Duration::from_millis(500) } else {
1324 let total_millis: u64 = rtts.iter().map(|d| d.as_millis() as u64).sum();
1325 Duration::from_millis(total_millis / rtts.len() as u64 * 2) }
1327 };
1328
1329 Ok(NatTraversalStatistics {
1330 active_sessions: sessions.len(),
1331 total_bootstrap_nodes: bootstrap_nodes.len(),
1332 successful_coordinations: bootstrap_nodes.iter().map(|b| b.coordination_count).sum(),
1333 average_coordination_time: avg_coordination_time,
1334 total_attempts: 0,
1335 successful_connections: 0,
1336 direct_connections: 0,
1337 relayed_connections: 0,
1338 })
1339 }
1340
1341 pub fn add_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1343 let mut bootstrap_nodes = self
1344 .bootstrap_nodes
1345 .write()
1346 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1347
1348 if !bootstrap_nodes.iter().any(|b| b.address == address) {
1350 bootstrap_nodes.push(BootstrapNode {
1351 address,
1352 last_seen: std::time::Instant::now(),
1353 can_coordinate: true,
1354 rtt: None,
1355 coordination_count: 0,
1356 });
1357 info!("Added bootstrap node: {}", address);
1358 }
1359 Ok(())
1360 }
1361
1362 pub fn remove_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1364 let mut bootstrap_nodes = self
1365 .bootstrap_nodes
1366 .write()
1367 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1368 bootstrap_nodes.retain(|b| b.address != address);
1369 info!("Removed bootstrap node: {}", address);
1370 Ok(())
1371 }
1372
1373 async fn create_quinn_endpoint(
1377 config: &NatTraversalConfig,
1378 _nat_role: NatTraversalRole,
1379 ) -> Result<
1380 (
1381 QuinnEndpoint,
1382 mpsc::UnboundedSender<NatTraversalEvent>,
1383 mpsc::UnboundedReceiver<NatTraversalEvent>,
1384 SocketAddr,
1385 ),
1386 NatTraversalError,
1387 > {
1388 use std::sync::Arc;
1389
1390 let server_config = match config.role {
1392 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1393 info!("Creating server config for role: {:?} using Raw Public Keys (RFC 7250)", config.role);
1394
1395 let (server_key, _public_key) = crate::crypto::raw_public_keys::key_utils::generate_ed25519_keypair();
1397
1398 let rpk_config = RawPublicKeyConfigBuilder::new()
1400 .with_server_key(server_key)
1401 .allow_any_key() .build_rfc7250_server_config()
1403 .map_err(|e| NatTraversalError::ConfigError(format!("RPK server config failed: {e}")))?;
1404
1405 let server_crypto = QuicServerConfig::try_from(rpk_config.inner().as_ref().clone())
1406 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1407
1408 let mut server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
1409
1410 let mut transport_config = TransportConfig::default();
1412 transport_config
1413 .keep_alive_interval(Some(config.timeouts.nat_traversal.retry_interval));
1414 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1415
1416 let nat_config = match config.role {
1421 EndpointRole::Client => {
1422 crate::transport_parameters::NatTraversalConfig::ClientSupport
1423 }
1424 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1425 crate::transport_parameters::NatTraversalConfig::ServerSupport {
1426 concurrency_limit: VarInt::from_u32(
1427 config.max_concurrent_attempts as u32,
1428 ),
1429 }
1430 }
1431 };
1432 transport_config.nat_traversal_config(Some(nat_config));
1433
1434 server_config.transport_config(Arc::new(transport_config));
1435
1436 Some(server_config)
1437 }
1438 _ => None,
1439 };
1440
1441 let client_config = {
1443 info!("Creating client config using Raw Public Keys (RFC 7250)");
1444
1445 let rpk_config = RawPublicKeyConfigBuilder::new()
1447 .allow_any_key() .build_rfc7250_client_config()
1449 .map_err(|e| NatTraversalError::ConfigError(format!("RPK client config failed: {e}")))?;
1450
1451 let client_crypto = QuicClientConfig::try_from(rpk_config.inner().as_ref().clone())
1452 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1453
1454 let mut client_config = ClientConfig::new(Arc::new(client_crypto));
1455
1456 let mut transport_config = TransportConfig::default();
1458 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
1459 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1460
1461 let nat_config = match config.role {
1466 EndpointRole::Client => {
1467 crate::transport_parameters::NatTraversalConfig::ClientSupport
1468 }
1469 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1470 crate::transport_parameters::NatTraversalConfig::ServerSupport {
1471 concurrency_limit: VarInt::from_u32(config.max_concurrent_attempts as u32),
1472 }
1473 }
1474 };
1475 transport_config.nat_traversal_config(Some(nat_config));
1476
1477 client_config.transport_config(Arc::new(transport_config));
1478
1479 client_config
1480 };
1481
1482 let bind_addr = config
1484 .bind_addr
1485 .unwrap_or_else(create_random_port_bind_addr);
1486 let socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1487 NatTraversalError::NetworkError(format!("Failed to bind UDP socket: {e}"))
1488 })?;
1489
1490 info!("Binding endpoint to {}", bind_addr);
1491
1492 let std_socket = socket.into_std().map_err(|e| {
1494 NatTraversalError::NetworkError(format!("Failed to convert socket: {e}"))
1495 })?;
1496
1497 let runtime = default_runtime().ok_or_else(|| {
1499 NatTraversalError::ConfigError("No compatible async runtime found".to_string())
1500 })?;
1501
1502 let mut endpoint = QuinnEndpoint::new(
1503 EndpointConfig::default(),
1504 server_config,
1505 std_socket,
1506 runtime,
1507 )
1508 .map_err(|e| {
1509 NatTraversalError::ConfigError(format!("Failed to create Quinn endpoint: {e}"))
1510 })?;
1511
1512 endpoint.set_default_client_config(client_config);
1514
1515 let local_addr = endpoint.local_addr().map_err(|e| {
1517 NatTraversalError::NetworkError(format!("Failed to get local address: {e}"))
1518 })?;
1519
1520 info!("Endpoint bound to actual address: {}", local_addr);
1521
1522 let (event_tx, event_rx) = mpsc::unbounded_channel();
1524
1525 Ok((endpoint, event_tx, event_rx, local_addr))
1526 }
1527
1528 #[allow(clippy::panic)]
1530 pub async fn start_listening(&self, bind_addr: SocketAddr) -> Result<(), NatTraversalError> {
1531 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1532 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1533 })?;
1534
1535 let _socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1537 NatTraversalError::NetworkError(format!("Failed to bind to {bind_addr}: {e}"))
1538 })?;
1539
1540 info!("Started listening on {}", bind_addr);
1541
1542 let endpoint_clone = endpoint.clone();
1544 let shutdown_clone = self.shutdown.clone();
1545 let event_tx = self
1546 .event_tx
1547 .as_ref()
1548 .unwrap_or_else(|| panic!("event transmitter should be initialized"))
1549 .clone();
1550 let connections_clone = self.connections.clone();
1551
1552 tokio::spawn(async move {
1553 Self::accept_connections(endpoint_clone, shutdown_clone, event_tx, connections_clone)
1554 .await;
1555 });
1556
1557 Ok(())
1558 }
1559
1560 async fn accept_connections(
1562 endpoint: QuinnEndpoint,
1563 shutdown: Arc<AtomicBool>,
1564 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1565 connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
1566 ) {
1567 while !shutdown.load(Ordering::Relaxed) {
1568 match endpoint.accept().await {
1569 Some(connecting) => {
1570 let event_tx = event_tx.clone();
1571 let connections = connections.clone();
1572 tokio::spawn(async move {
1573 match connecting.await {
1574 Ok(connection) => {
1575 info!("Accepted connection from {}", connection.remote_address());
1576
1577 let peer_id = Self::generate_peer_id_from_address(
1579 connection.remote_address(),
1580 );
1581
1582 if let Ok(mut conns) = connections.write() {
1584 conns.insert(peer_id, connection.clone());
1585 }
1586
1587 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1588 peer_id,
1589 remote_address: connection.remote_address(),
1590 });
1591
1592 Self::handle_connection(peer_id, connection, event_tx).await;
1594 }
1595 Err(e) => {
1596 debug!("Connection failed: {}", e);
1597 }
1598 }
1599 });
1600 }
1601 None => {
1602 break;
1604 }
1605 }
1606 }
1607 }
1608
1609 async fn poll_discovery(
1611 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
1612 shutdown: Arc<AtomicBool>,
1613 _event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1614 ) {
1615 use tokio::time::{Duration, interval};
1616
1617 let mut poll_interval = interval(Duration::from_millis(100));
1618
1619 while !shutdown.load(Ordering::Relaxed) {
1620 poll_interval.tick().await;
1621
1622 let events = match discovery_manager.lock() {
1624 Ok(mut discovery) => discovery.poll(std::time::Instant::now()),
1625 Err(e) => {
1626 error!("Failed to lock discovery manager: {}", e);
1627 continue;
1628 }
1629 };
1630
1631 for event in events {
1633 match event {
1634 DiscoveryEvent::DiscoveryStarted {
1635 peer_id,
1636 bootstrap_count,
1637 } => {
1638 debug!(
1639 "Discovery started for peer {:?} with {} bootstrap nodes",
1640 peer_id, bootstrap_count
1641 );
1642 }
1643 DiscoveryEvent::LocalScanningStarted => {
1644 debug!("Local interface scanning started");
1645 }
1646 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1647 debug!("Discovered local candidate: {}", candidate.address);
1648 }
1651 DiscoveryEvent::LocalScanningCompleted {
1652 candidate_count,
1653 duration,
1654 } => {
1655 debug!(
1656 "Local interface scanning completed: {} candidates in {:?}",
1657 candidate_count, duration
1658 );
1659 }
1660 DiscoveryEvent::ServerReflexiveDiscoveryStarted { bootstrap_count } => {
1661 debug!(
1662 "Server reflexive discovery started with {} bootstrap nodes",
1663 bootstrap_count
1664 );
1665 }
1666 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1667 candidate,
1668 bootstrap_node,
1669 } => {
1670 debug!(
1671 "Discovered server-reflexive candidate {} via bootstrap {}",
1672 candidate.address, bootstrap_node
1673 );
1674 }
1676 DiscoveryEvent::BootstrapQueryFailed {
1677 bootstrap_node,
1678 error,
1679 } => {
1680 debug!("Bootstrap query failed for {}: {}", bootstrap_node, error);
1681 }
1682 DiscoveryEvent::PortAllocationDetected {
1684 port,
1685 source_address,
1686 bootstrap_node,
1687 timestamp,
1688 } => {
1689 debug!(
1690 "Port allocation detected: port {} from {} via bootstrap {:?} at {:?}",
1691 port, source_address, bootstrap_node, timestamp
1692 );
1693 }
1694 DiscoveryEvent::DiscoveryCompleted {
1695 candidate_count,
1696 total_duration,
1697 success_rate,
1698 } => {
1699 info!(
1700 "Discovery completed with {} candidates in {:?} (success rate: {:.2}%)",
1701 candidate_count,
1702 total_duration,
1703 success_rate * 100.0
1704 );
1705 }
1708 DiscoveryEvent::DiscoveryFailed {
1709 error,
1710 partial_results,
1711 } => {
1712 warn!(
1713 "Discovery failed: {} (found {} partial candidates)",
1714 error,
1715 partial_results.len()
1716 );
1717
1718 }
1723 DiscoveryEvent::PathValidationRequested {
1724 candidate_id,
1725 candidate_address,
1726 challenge_token,
1727 } => {
1728 debug!(
1729 "PATH_CHALLENGE requested for candidate {} at {} with token {:08x}",
1730 candidate_id.0, candidate_address, challenge_token
1731 );
1732 }
1735 DiscoveryEvent::PathValidationResponse {
1736 candidate_id,
1737 candidate_address,
1738 challenge_token: _,
1739 rtt,
1740 } => {
1741 debug!(
1742 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1743 candidate_id.0, candidate_address, rtt
1744 );
1745 }
1747 }
1748 }
1749 }
1750
1751 info!("Discovery polling task shutting down");
1752 }
1753
1754 async fn handle_connection(
1756 peer_id: PeerId,
1757 connection: QuinnConnection,
1758 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1759 ) {
1760 let remote_address = connection.remote_address();
1761 let closed = connection.closed();
1762 tokio::pin!(closed);
1763
1764 debug!(
1765 "Handling connection from peer {:?} at {}",
1766 peer_id, remote_address
1767 );
1768
1769 closed.await;
1773
1774 let reason = connection
1775 .close_reason()
1776 .map(|reason| format!("Connection closed: {reason}"))
1777 .unwrap_or_else(|| "Connection closed".to_string());
1778 let _ = event_tx.send(NatTraversalEvent::ConnectionLost { peer_id, reason });
1779 }
1780
1781 async fn handle_bi_stream(
1783 _send: crate::high_level::SendStream,
1784 _recv: crate::high_level::RecvStream,
1785 ) {
1786 }
1815
1816 async fn handle_uni_stream(mut recv: crate::high_level::RecvStream) {
1818 let mut buffer = vec![0u8; 1024];
1819
1820 loop {
1821 match recv.read(&mut buffer).await {
1822 Ok(Some(size)) => {
1823 debug!("Received {} bytes on unidirectional stream", size);
1824 }
1826 Ok(None) => {
1827 debug!("Unidirectional stream closed by peer");
1828 break;
1829 }
1830 Err(e) => {
1831 debug!("Error reading from unidirectional stream: {}", e);
1832 break;
1833 }
1834 }
1835 }
1836 }
1837
1838 pub async fn connect_to_peer(
1840 &self,
1841 peer_id: PeerId,
1842 server_name: &str,
1843 remote_addr: SocketAddr,
1844 ) -> Result<QuinnConnection, NatTraversalError> {
1845 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1846 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1847 })?;
1848
1849 info!("Connecting to peer {:?} at {}", peer_id, remote_addr);
1850
1851 let connecting = endpoint.connect(remote_addr, server_name).map_err(|e| {
1853 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
1854 })?;
1855
1856 let connection = timeout(
1857 self.timeout_config
1858 .nat_traversal
1859 .connection_establishment_timeout,
1860 connecting,
1861 )
1862 .await
1863 .map_err(|_| NatTraversalError::Timeout)?
1864 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
1865
1866 info!(
1867 "Successfully connected to peer {:?} at {}",
1868 peer_id, remote_addr
1869 );
1870
1871 if let Some(ref event_tx) = self.event_tx {
1873 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1874 peer_id,
1875 remote_address: remote_addr,
1876 });
1877 }
1878
1879 Ok(connection)
1880 }
1881
1882 pub async fn accept_connection(&self) -> Result<(PeerId, QuinnConnection), NatTraversalError> {
1884 info!("Waiting for incoming connection via event channel...");
1885
1886 let timeout_duration = self.timeout_config.nat_traversal.connection_establishment_timeout;
1887 let start = std::time::Instant::now();
1888
1889 loop {
1890 if self.shutdown.load(Ordering::Relaxed) {
1892 return Err(NatTraversalError::NetworkError("Endpoint shutting down".to_string()));
1893 }
1894
1895 if start.elapsed() > timeout_duration {
1897 warn!("accept_connection() timed out after {:?}", timeout_duration);
1898 return Err(NatTraversalError::Timeout);
1899 }
1900
1901 {
1903 let mut event_rx = self.event_rx.lock().map_err(|_| {
1904 NatTraversalError::ProtocolError("Event channel lock poisoned".to_string())
1905 })?;
1906
1907 match event_rx.try_recv() {
1908 Ok(NatTraversalEvent::ConnectionEstablished { peer_id, remote_address }) => {
1909 info!("Received ConnectionEstablished event for peer {:?} at {}", peer_id, remote_address);
1910
1911 let connection = {
1914 let connections = self.connections.read().map_err(|_| {
1915 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1916 })?;
1917 connections.get(&peer_id).cloned().ok_or_else(|| {
1918 NatTraversalError::ConnectionFailed(format!(
1919 "Connection for peer {:?} not found in storage", peer_id
1920 ))
1921 })?
1922 };
1923
1924 info!("Retrieved accepted connection from peer {:?} at {}", peer_id, remote_address);
1925 return Ok((peer_id, connection));
1926 }
1927 Ok(event) => {
1928 debug!("Ignoring non-connection event while waiting for accept: {:?}", event);
1930 }
1931 Err(mpsc::error::TryRecvError::Empty) => {
1932 }
1934 Err(mpsc::error::TryRecvError::Disconnected) => {
1935 return Err(NatTraversalError::NetworkError("Event channel closed".to_string()));
1936 }
1937 }
1938 } tokio::time::sleep(Duration::from_millis(10)).await;
1942 }
1943 }
1944
1945 pub fn local_peer_id(&self) -> PeerId {
1947 self.local_peer_id
1948 }
1949
1950 pub fn get_connection(
1952 &self,
1953 peer_id: &PeerId,
1954 ) -> Result<Option<QuinnConnection>, NatTraversalError> {
1955 let connections = self.connections.read().map_err(|_| {
1956 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1957 })?;
1958 Ok(connections.get(peer_id).cloned())
1959 }
1960
1961 pub fn add_connection(
1963 &self,
1964 peer_id: PeerId,
1965 connection: QuinnConnection,
1966 ) -> Result<(), NatTraversalError> {
1967 let mut connections = self.connections.write().map_err(|_| {
1968 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1969 })?;
1970 connections.insert(peer_id, connection);
1971 Ok(())
1972 }
1973
1974 pub fn spawn_connection_handler(
1976 &self,
1977 peer_id: PeerId,
1978 connection: QuinnConnection,
1979 ) -> Result<(), NatTraversalError> {
1980 let event_tx = self.event_tx.as_ref().cloned().ok_or_else(|| {
1981 NatTraversalError::ConfigError("NAT traversal event channel not configured".to_string())
1982 })?;
1983
1984 let remote_address = connection.remote_address();
1985
1986 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1988 peer_id,
1989 remote_address,
1990 });
1991
1992 tokio::spawn(async move {
1994 Self::handle_connection(peer_id, connection, event_tx).await;
1995 });
1996
1997 Ok(())
1998 }
1999
2000 pub fn remove_connection(
2002 &self,
2003 peer_id: &PeerId,
2004 ) -> Result<Option<QuinnConnection>, NatTraversalError> {
2005 let mut connections = self.connections.write().map_err(|_| {
2006 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2007 })?;
2008 Ok(connections.remove(peer_id))
2009 }
2010
2011 pub fn list_connections(&self) -> Result<Vec<(PeerId, SocketAddr)>, NatTraversalError> {
2013 let connections = self.connections.read().map_err(|_| {
2014 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2015 })?;
2016 let mut result = Vec::new();
2017 for (peer_id, connection) in connections.iter() {
2018 result.push((*peer_id, connection.remote_address()));
2019 }
2020 Ok(result)
2021 }
2022
2023 pub async fn handle_connection_data(
2025 &self,
2026 peer_id: PeerId,
2027 connection: &QuinnConnection,
2028 ) -> Result<(), NatTraversalError> {
2029 info!("Handling connection data from peer {:?}", peer_id);
2030
2031 let connection_clone = connection.clone();
2033 let peer_id_clone = peer_id;
2034 tokio::spawn(async move {
2035 loop {
2036 match connection_clone.accept_bi().await {
2037 Ok((send, recv)) => {
2038 debug!(
2039 "Accepted bidirectional stream from peer {:?}",
2040 peer_id_clone
2041 );
2042 tokio::spawn(Self::handle_bi_stream(send, recv));
2043 }
2044 Err(ConnectionError::ApplicationClosed(_)) => {
2045 debug!("Connection closed by peer {:?}", peer_id_clone);
2046 break;
2047 }
2048 Err(e) => {
2049 debug!(
2050 "Error accepting bidirectional stream from peer {:?}: {}",
2051 peer_id_clone, e
2052 );
2053 break;
2054 }
2055 }
2056 }
2057 });
2058
2059 let connection_clone = connection.clone();
2061 let peer_id_clone = peer_id;
2062 tokio::spawn(async move {
2063 loop {
2064 match connection_clone.accept_uni().await {
2065 Ok(recv) => {
2066 debug!(
2067 "Accepted unidirectional stream from peer {:?}",
2068 peer_id_clone
2069 );
2070 tokio::spawn(Self::handle_uni_stream(recv));
2071 }
2072 Err(ConnectionError::ApplicationClosed(_)) => {
2073 debug!("Connection closed by peer {:?}", peer_id_clone);
2074 break;
2075 }
2076 Err(e) => {
2077 debug!(
2078 "Error accepting unidirectional stream from peer {:?}: {}",
2079 peer_id_clone, e
2080 );
2081 break;
2082 }
2083 }
2084 }
2085 });
2086
2087 Ok(())
2088 }
2089
2090 fn generate_local_peer_id() -> PeerId {
2092 use std::collections::hash_map::DefaultHasher;
2093 use std::hash::{Hash, Hasher};
2094 use std::time::SystemTime;
2095
2096 let mut hasher = DefaultHasher::new();
2097 SystemTime::now().hash(&mut hasher);
2098 std::process::id().hash(&mut hasher);
2099
2100 let hash = hasher.finish();
2101 let mut peer_id = [0u8; 32];
2102 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2103
2104 for i in 8..32 {
2106 peer_id[i] = rand::random();
2107 }
2108
2109 PeerId(peer_id)
2110 }
2111
2112 fn generate_peer_id_from_address(addr: SocketAddr) -> PeerId {
2118 use std::collections::hash_map::DefaultHasher;
2119 use std::hash::{Hash, Hasher};
2120
2121 let mut hasher = DefaultHasher::new();
2122 addr.hash(&mut hasher);
2123
2124 let hash = hasher.finish();
2125 let mut peer_id = [0u8; 32];
2126 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2127
2128 for i in 8..32 {
2131 peer_id[i] = rand::random();
2132 }
2133
2134 warn!(
2135 "Generated temporary peer ID from address {}. This ID is not persistent!",
2136 addr
2137 );
2138 PeerId(peer_id)
2139 }
2140
2141 async fn extract_peer_id_from_connection(
2143 &self,
2144 connection: &QuinnConnection,
2145 ) -> Option<PeerId> {
2146 if let Some(identity) = connection.peer_identity() {
2148 if let Some(public_key_bytes) = identity.downcast_ref::<[u8; 32]>() {
2150 match crate::derive_peer_id_from_key_bytes(public_key_bytes) {
2152 Ok(peer_id) => {
2153 debug!("Derived peer ID from Ed25519 public key");
2154 return Some(peer_id);
2155 }
2156 Err(e) => {
2157 warn!("Failed to derive peer ID from public key: {}", e);
2158 }
2159 }
2160 }
2161 }
2163
2164 None
2165 }
2166
2167 pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
2169 self.shutdown.store(true, Ordering::Relaxed);
2171
2172 {
2174 let mut connections = self.connections.write().map_err(|_| {
2175 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2176 })?;
2177 for (peer_id, connection) in connections.drain() {
2178 info!("Closing connection to peer {:?}", peer_id);
2179 connection.close(crate::VarInt::from_u32(0), b"Shutdown");
2180 }
2181 }
2182
2183 if let Some(ref endpoint) = self.quinn_endpoint {
2185 endpoint.wait_idle().await;
2186 }
2187
2188 info!("NAT traversal endpoint shutdown completed");
2189 Ok(())
2190 }
2191
2192 pub async fn discover_candidates(
2194 &self,
2195 peer_id: PeerId,
2196 ) -> Result<Vec<CandidateAddress>, NatTraversalError> {
2197 debug!("Discovering address candidates for peer {:?}", peer_id);
2198
2199 let mut candidates = Vec::new();
2200
2201 let bootstrap_nodes = {
2203 let nodes = self
2204 .bootstrap_nodes
2205 .read()
2206 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2207 nodes.clone()
2208 };
2209
2210 {
2212 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2213 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2214 })?;
2215
2216 discovery
2217 .start_discovery(peer_id, bootstrap_nodes)
2218 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
2219 }
2220
2221 let timeout_duration = self.config.coordination_timeout;
2223 let start_time = std::time::Instant::now();
2224
2225 while start_time.elapsed() < timeout_duration {
2226 let discovery_events = {
2227 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2228 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2229 })?;
2230 discovery.poll(std::time::Instant::now())
2231 };
2232
2233 for event in discovery_events {
2234 match event {
2235 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
2236 candidates.push(candidate.clone());
2237
2238 self.send_candidate_advertisement(peer_id, &candidate)
2240 .await
2241 .unwrap_or_else(|e| {
2242 debug!("Failed to send candidate advertisement: {}", e)
2243 });
2244 }
2245 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
2246 candidates.push(candidate.clone());
2247
2248 self.send_candidate_advertisement(peer_id, &candidate)
2250 .await
2251 .unwrap_or_else(|e| {
2252 debug!("Failed to send candidate advertisement: {}", e)
2253 });
2254 }
2255 DiscoveryEvent::DiscoveryCompleted { .. } => {
2257 return Ok(candidates);
2259 }
2260 DiscoveryEvent::DiscoveryFailed {
2261 error,
2262 partial_results,
2263 } => {
2264 candidates.extend(partial_results);
2266 if candidates.is_empty() {
2267 return Err(NatTraversalError::CandidateDiscoveryFailed(
2268 error.to_string(),
2269 ));
2270 }
2271 return Ok(candidates);
2272 }
2273 _ => {}
2274 }
2275 }
2276
2277 sleep(Duration::from_millis(10)).await;
2279 }
2280
2281 if candidates.is_empty() {
2282 Err(NatTraversalError::NoCandidatesFound)
2283 } else {
2284 Ok(candidates)
2285 }
2286 }
2287
2288 #[allow(dead_code)]
2290 fn create_punch_me_now_frame(&self, peer_id: PeerId) -> Result<Vec<u8>, NatTraversalError> {
2291 let mut frame = Vec::new();
2299
2300 frame.push(0x41);
2302
2303 frame.extend_from_slice(&peer_id.0);
2305
2306 let timestamp = std::time::SystemTime::now()
2308 .duration_since(std::time::UNIX_EPOCH)
2309 .unwrap_or_default()
2310 .as_millis() as u64;
2311 frame.extend_from_slice(×tamp.to_be_bytes());
2312
2313 let mut token = [0u8; 16];
2315 for byte in &mut token {
2316 *byte = rand::random();
2317 }
2318 frame.extend_from_slice(&token);
2319
2320 Ok(frame)
2321 }
2322
2323 #[allow(dead_code)]
2324 fn attempt_hole_punching(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
2325 debug!("Attempting hole punching for peer {:?}", peer_id);
2326
2327 let candidate_pairs = self.get_candidate_pairs_for_peer(peer_id)?;
2329
2330 if candidate_pairs.is_empty() {
2331 return Err(NatTraversalError::NoCandidatesFound);
2332 }
2333
2334 info!(
2335 "Generated {} candidate pairs for hole punching with peer {:?}",
2336 candidate_pairs.len(),
2337 peer_id
2338 );
2339
2340 self.attempt_quinn_hole_punching(peer_id, candidate_pairs)
2343 }
2344
2345 #[allow(dead_code)]
2347 fn get_candidate_pairs_for_peer(
2348 &self,
2349 peer_id: PeerId,
2350 ) -> Result<Vec<CandidatePair>, NatTraversalError> {
2351 let discovery_candidates = {
2353 let discovery = self.discovery_manager.lock().map_err(|_| {
2354 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2355 })?;
2356
2357 discovery.get_candidates_for_peer(peer_id)
2358 };
2359
2360 if discovery_candidates.is_empty() {
2361 return Err(NatTraversalError::NoCandidatesFound);
2362 }
2363
2364 let mut candidate_pairs = Vec::new();
2366 let local_candidates = discovery_candidates
2367 .iter()
2368 .filter(|c| matches!(c.source, CandidateSource::Local))
2369 .collect::<Vec<_>>();
2370 let remote_candidates = discovery_candidates
2371 .iter()
2372 .filter(|c| !matches!(c.source, CandidateSource::Local))
2373 .collect::<Vec<_>>();
2374
2375 for local in &local_candidates {
2377 for remote in &remote_candidates {
2378 let pair_priority = self.calculate_candidate_pair_priority(local, remote);
2379 candidate_pairs.push(CandidatePair {
2380 local_candidate: (*local).clone(),
2381 remote_candidate: (*remote).clone(),
2382 priority: pair_priority,
2383 state: CandidatePairState::Waiting,
2384 });
2385 }
2386 }
2387
2388 candidate_pairs.sort_by(|a, b| b.priority.cmp(&a.priority));
2390
2391 candidate_pairs.truncate(8);
2393
2394 Ok(candidate_pairs)
2395 }
2396
2397 #[allow(dead_code)]
2399 fn calculate_candidate_pair_priority(
2400 &self,
2401 local: &CandidateAddress,
2402 remote: &CandidateAddress,
2403 ) -> u64 {
2404 let local_type_preference = match local.source {
2408 CandidateSource::Local => 126,
2409 CandidateSource::Observed { .. } => 100,
2410 CandidateSource::Predicted => 75,
2411 CandidateSource::Peer => 50,
2412 };
2413
2414 let remote_type_preference = match remote.source {
2415 CandidateSource::Local => 126,
2416 CandidateSource::Observed { .. } => 100,
2417 CandidateSource::Predicted => 75,
2418 CandidateSource::Peer => 50,
2419 };
2420
2421 let local_priority = (local_type_preference as u64) << 8 | local.priority as u64;
2423 let remote_priority = (remote_type_preference as u64) << 8 | remote.priority as u64;
2424
2425 let min_priority = local_priority.min(remote_priority);
2426 let max_priority = local_priority.max(remote_priority);
2427
2428 (min_priority << 32)
2429 | (max_priority << 1)
2430 | if local_priority > remote_priority {
2431 1
2432 } else {
2433 0
2434 }
2435 }
2436
2437 #[allow(dead_code)]
2439 fn attempt_quinn_hole_punching(
2440 &self,
2441 peer_id: PeerId,
2442 candidate_pairs: Vec<CandidatePair>,
2443 ) -> Result<(), NatTraversalError> {
2444 let _endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
2445 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
2446 })?;
2447
2448 for pair in candidate_pairs {
2449 debug!(
2450 "Attempting hole punch with candidate pair: {} -> {}",
2451 pair.local_candidate.address, pair.remote_candidate.address
2452 );
2453
2454 let mut challenge_data = [0u8; 8];
2456 for byte in &mut challenge_data {
2457 *byte = rand::random();
2458 }
2459
2460 let local_socket =
2462 std::net::UdpSocket::bind(pair.local_candidate.address).map_err(|e| {
2463 NatTraversalError::NetworkError(format!(
2464 "Failed to bind to local candidate: {e}"
2465 ))
2466 })?;
2467
2468 let path_challenge_packet = self.create_path_challenge_packet(challenge_data)?;
2470
2471 match local_socket.send_to(&path_challenge_packet, pair.remote_candidate.address) {
2473 Ok(bytes_sent) => {
2474 debug!(
2475 "Sent {} bytes for hole punch from {} to {}",
2476 bytes_sent, pair.local_candidate.address, pair.remote_candidate.address
2477 );
2478
2479 local_socket
2481 .set_read_timeout(Some(Duration::from_millis(100)))
2482 .map_err(|e| {
2483 NatTraversalError::NetworkError(format!("Failed to set timeout: {e}"))
2484 })?;
2485
2486 let mut response_buffer = [0u8; 1024];
2488 match local_socket.recv_from(&mut response_buffer) {
2489 Ok((_bytes_received, response_addr)) => {
2490 if response_addr == pair.remote_candidate.address {
2491 info!(
2492 "Hole punch succeeded for peer {:?}: {} <-> {}",
2493 peer_id,
2494 pair.local_candidate.address,
2495 pair.remote_candidate.address
2496 );
2497
2498 self.store_successful_candidate_pair(peer_id, pair)?;
2500 return Ok(());
2501 } else {
2502 debug!(
2503 "Received response from unexpected address: {}",
2504 response_addr
2505 );
2506 }
2507 }
2508 Err(e)
2509 if e.kind() == std::io::ErrorKind::WouldBlock
2510 || e.kind() == std::io::ErrorKind::TimedOut =>
2511 {
2512 debug!("No response received for hole punch attempt");
2513 }
2514 Err(e) => {
2515 debug!("Error receiving hole punch response: {}", e);
2516 }
2517 }
2518 }
2519 Err(e) => {
2520 debug!("Failed to send hole punch packet: {}", e);
2521 }
2522 }
2523 }
2524
2525 Err(NatTraversalError::HolePunchingFailed)
2527 }
2528
2529 fn create_path_challenge_packet(
2531 &self,
2532 challenge_data: [u8; 8],
2533 ) -> Result<Vec<u8>, NatTraversalError> {
2534 let mut packet = Vec::new();
2537
2538 packet.push(0x40); packet.extend_from_slice(&[0, 0, 0, 1]); packet.push(0x1a); packet.extend_from_slice(&challenge_data); Ok(packet)
2547 }
2548
2549 fn store_successful_candidate_pair(
2551 &self,
2552 peer_id: PeerId,
2553 pair: CandidatePair,
2554 ) -> Result<(), NatTraversalError> {
2555 debug!(
2556 "Storing successful candidate pair for peer {:?}: {} <-> {}",
2557 peer_id, pair.local_candidate.address, pair.remote_candidate.address
2558 );
2559
2560 if let Some(ref callback) = self.event_callback {
2565 callback(NatTraversalEvent::PathValidated {
2566 peer_id,
2567 address: pair.remote_candidate.address,
2568 rtt: Duration::from_millis(50), });
2570
2571 callback(NatTraversalEvent::TraversalSucceeded {
2572 peer_id,
2573 final_address: pair.remote_candidate.address,
2574 total_time: Duration::from_secs(1), });
2576 }
2577
2578 Ok(())
2579 }
2580
2581 fn attempt_connection_to_candidate(
2583 &self,
2584 peer_id: PeerId,
2585 candidate: &CandidateAddress,
2586 ) -> Result<(), NatTraversalError> {
2587 {
2588 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
2589 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
2590 })?;
2591
2592 let server_name = format!("peer-{:x}", peer_id.0[0] as u32);
2594
2595 debug!(
2596 "Attempting Quinn connection to candidate {} for peer {:?}",
2597 candidate.address, peer_id
2598 );
2599
2600 match endpoint.connect(candidate.address, &server_name) {
2602 Ok(connecting) => {
2603 info!(
2604 "Connection attempt initiated to {} for peer {:?}",
2605 candidate.address, peer_id
2606 );
2607
2608 if let Some(event_tx) = &self.event_tx {
2610 let event_tx = event_tx.clone();
2611 let connections = self.connections.clone();
2612 let peer_id_clone = peer_id;
2613 let address = candidate.address;
2614
2615 tokio::spawn(async move {
2616 match connecting.await {
2617 Ok(connection) => {
2618 info!(
2619 "Successfully connected to {} for peer {:?}",
2620 address, peer_id_clone
2621 );
2622
2623 if let Ok(mut conns) = connections.write() {
2625 conns.insert(peer_id_clone, connection.clone());
2626 }
2627
2628 let _ =
2630 event_tx.send(NatTraversalEvent::ConnectionEstablished {
2631 peer_id: peer_id_clone,
2632 remote_address: address,
2633 });
2634
2635 Self::handle_connection(peer_id_clone, connection, event_tx)
2637 .await;
2638 }
2639 Err(e) => {
2640 warn!("Connection to {} failed: {}", address, e);
2641 }
2642 }
2643 });
2644 }
2645
2646 Ok(())
2647 }
2648 Err(e) => {
2649 warn!(
2650 "Failed to initiate connection to {}: {}",
2651 candidate.address, e
2652 );
2653 Err(NatTraversalError::ConnectionFailed(format!(
2654 "Failed to connect to {}: {}",
2655 candidate.address, e
2656 )))
2657 }
2658 }
2659 }
2660 }
2661
2662 pub fn poll(
2664 &self,
2665 now: std::time::Instant,
2666 ) -> Result<Vec<NatTraversalEvent>, NatTraversalError> {
2667 let mut events = Vec::new();
2668
2669 {
2671 let mut event_rx = self.event_rx.lock().map_err(|_| {
2672 NatTraversalError::ProtocolError("Event channel lock poisoned".to_string())
2673 })?;
2674
2675 loop {
2676 match event_rx.try_recv() {
2677 Ok(event) => {
2678 if let Some(ref callback) = self.event_callback {
2679 callback(event.clone());
2680 }
2681 events.push(event);
2682 }
2683 Err(TryRecvError::Empty) => break,
2684 Err(TryRecvError::Disconnected) => break,
2685 }
2686 }
2687 }
2688
2689 let mut closed_connections = Vec::new();
2691 {
2692 let connections = self.connections.read().map_err(|_| {
2693 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2694 })?;
2695
2696 for (peer_id, connection) in connections.iter() {
2697 if let Some(reason) = connection.close_reason() {
2698 closed_connections.push((*peer_id, reason.clone()));
2699 }
2700 }
2701 }
2702
2703 if !closed_connections.is_empty() {
2704 let mut connections = self.connections.write().map_err(|_| {
2705 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2706 })?;
2707
2708 for (peer_id, reason) in closed_connections {
2709 connections.remove(&peer_id);
2710 let event = NatTraversalEvent::ConnectionLost {
2711 peer_id,
2712 reason: reason.to_string(),
2713 };
2714 if let Some(ref callback) = self.event_callback {
2715 callback(event.clone());
2716 }
2717 events.push(event);
2718 }
2719 }
2720
2721 self.check_connections_for_observed_addresses(&mut events)?;
2723
2724 {
2726 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2727 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2728 })?;
2729
2730 let discovery_events = discovery.poll(now);
2731
2732 for discovery_event in discovery_events {
2734 if let Some(nat_event) = self.convert_discovery_event(discovery_event) {
2735 events.push(nat_event.clone());
2736
2737 if let Some(ref callback) = self.event_callback {
2739 callback(nat_event.clone());
2740 }
2741
2742 if let NatTraversalEvent::CandidateDiscovered {
2744 peer_id: _,
2745 candidate: _,
2746 } = &nat_event
2747 {
2748 }
2751 }
2752 }
2753 }
2754
2755 let mut sessions = self
2757 .active_sessions
2758 .write()
2759 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2760
2761 for (_peer_id, session) in sessions.iter_mut() {
2762 let elapsed = now.duration_since(session.started_at);
2763
2764 let timeout = self.get_phase_timeout(session.phase);
2766
2767 if elapsed > timeout {
2769 match session.phase {
2770 TraversalPhase::Discovery => {
2771 let discovered_candidates = {
2773 let discovery = self.discovery_manager.lock().map_err(|_| {
2774 NatTraversalError::ProtocolError(
2775 "Discovery manager lock poisoned".to_string(),
2776 )
2777 });
2778 match discovery {
2779 Ok(disc) => disc.get_candidates_for_peer(session.peer_id),
2780 Err(_) => Vec::new(),
2781 }
2782 };
2783
2784 session.candidates = discovered_candidates.clone();
2786
2787 if !session.candidates.is_empty() {
2789 session.phase = TraversalPhase::Coordination;
2791 let event = NatTraversalEvent::PhaseTransition {
2792 peer_id: session.peer_id,
2793 from_phase: TraversalPhase::Discovery,
2794 to_phase: TraversalPhase::Coordination,
2795 };
2796 events.push(event.clone());
2797 if let Some(ref callback) = self.event_callback {
2798 callback(event);
2799 }
2800 info!(
2801 "Peer {:?} advanced from Discovery to Coordination with {} candidates",
2802 session.peer_id,
2803 session.candidates.len()
2804 );
2805 } else if session.attempt < self.config.max_concurrent_attempts as u32 {
2806 session.attempt += 1;
2808 session.started_at = now;
2809 let backoff_duration = self.calculate_backoff(session.attempt);
2810 warn!(
2811 "Discovery timeout for peer {:?}, retrying (attempt {}), backoff: {:?}",
2812 session.peer_id, session.attempt, backoff_duration
2813 );
2814 } else {
2815 session.phase = TraversalPhase::Failed;
2817 let event = NatTraversalEvent::TraversalFailed {
2818 peer_id: session.peer_id,
2819 error: NatTraversalError::NoCandidatesFound,
2820 fallback_available: self.config.enable_relay_fallback,
2821 };
2822 events.push(event.clone());
2823 if let Some(ref callback) = self.event_callback {
2824 callback(event);
2825 }
2826 error!(
2827 "NAT traversal failed for peer {:?}: no candidates found after {} attempts",
2828 session.peer_id, session.attempt
2829 );
2830 }
2831 }
2832 TraversalPhase::Coordination => {
2833 if let Some(coordinator) = self.select_coordinator() {
2835 match self.send_coordination_request(session.peer_id, coordinator) {
2836 Ok(_) => {
2837 session.phase = TraversalPhase::Synchronization;
2838 let event = NatTraversalEvent::CoordinationRequested {
2839 peer_id: session.peer_id,
2840 coordinator,
2841 };
2842 events.push(event.clone());
2843 if let Some(ref callback) = self.event_callback {
2844 callback(event);
2845 }
2846 info!(
2847 "Coordination requested for peer {:?} via {}",
2848 session.peer_id, coordinator
2849 );
2850 }
2851 Err(e) => {
2852 self.handle_phase_failure(session, now, &mut events, e);
2853 }
2854 }
2855 } else {
2856 self.handle_phase_failure(
2857 session,
2858 now,
2859 &mut events,
2860 NatTraversalError::NoBootstrapNodes,
2861 );
2862 }
2863 }
2864 TraversalPhase::Synchronization => {
2865 if self.is_peer_synchronized(&session.peer_id) {
2867 session.phase = TraversalPhase::Punching;
2868 let event = NatTraversalEvent::HolePunchingStarted {
2869 peer_id: session.peer_id,
2870 targets: session.candidates.iter().map(|c| c.address).collect(),
2871 };
2872 events.push(event.clone());
2873 if let Some(ref callback) = self.event_callback {
2874 callback(event);
2875 }
2876 if let Err(e) =
2878 self.initiate_hole_punching(session.peer_id, &session.candidates)
2879 {
2880 self.handle_phase_failure(session, now, &mut events, e);
2881 }
2882 } else {
2883 self.handle_phase_failure(
2884 session,
2885 now,
2886 &mut events,
2887 NatTraversalError::ProtocolError(
2888 "Synchronization timeout".to_string(),
2889 ),
2890 );
2891 }
2892 }
2893 TraversalPhase::Punching => {
2894 if let Some(successful_path) = self.check_punch_results(&session.peer_id) {
2896 session.phase = TraversalPhase::Validation;
2897 let event = NatTraversalEvent::PathValidated {
2898 peer_id: session.peer_id,
2899 address: successful_path,
2900 rtt: Duration::from_millis(50), };
2902 events.push(event.clone());
2903 if let Some(ref callback) = self.event_callback {
2904 callback(event);
2905 }
2906 if let Err(e) = self.validate_path(session.peer_id, successful_path) {
2908 self.handle_phase_failure(session, now, &mut events, e);
2909 }
2910 } else {
2911 self.handle_phase_failure(
2912 session,
2913 now,
2914 &mut events,
2915 NatTraversalError::PunchingFailed(
2916 "No successful punch".to_string(),
2917 ),
2918 );
2919 }
2920 }
2921 TraversalPhase::Validation => {
2922 if self.is_path_validated(&session.peer_id) {
2924 session.phase = TraversalPhase::Connected;
2925 let event = NatTraversalEvent::TraversalSucceeded {
2926 peer_id: session.peer_id,
2927 final_address: session
2928 .candidates
2929 .first()
2930 .map(|c| c.address)
2931 .unwrap_or_else(create_random_port_bind_addr),
2932 total_time: elapsed,
2933 };
2934 events.push(event.clone());
2935 if let Some(ref callback) = self.event_callback {
2936 callback(event);
2937 }
2938 info!(
2939 "NAT traversal succeeded for peer {:?} in {:?}",
2940 session.peer_id, elapsed
2941 );
2942 } else {
2943 self.handle_phase_failure(
2944 session,
2945 now,
2946 &mut events,
2947 NatTraversalError::ValidationFailed(
2948 "Path validation timeout".to_string(),
2949 ),
2950 );
2951 }
2952 }
2953 TraversalPhase::Connected => {
2954 if !self.is_connection_healthy(&session.peer_id) {
2956 warn!(
2957 "Connection to peer {:?} is no longer healthy",
2958 session.peer_id
2959 );
2960 }
2962 }
2963 TraversalPhase::Failed => {
2964 }
2966 }
2967 }
2968 }
2969
2970 Ok(events)
2971 }
2972
2973 fn get_phase_timeout(&self, phase: TraversalPhase) -> Duration {
2975 match phase {
2976 TraversalPhase::Discovery => Duration::from_secs(10),
2977 TraversalPhase::Coordination => self.config.coordination_timeout,
2978 TraversalPhase::Synchronization => Duration::from_secs(3),
2979 TraversalPhase::Punching => Duration::from_secs(5),
2980 TraversalPhase::Validation => Duration::from_secs(5),
2981 TraversalPhase::Connected => Duration::from_secs(30), TraversalPhase::Failed => Duration::ZERO,
2983 }
2984 }
2985
2986 fn calculate_backoff(&self, attempt: u32) -> Duration {
2988 let base = Duration::from_millis(1000);
2989 let max = Duration::from_secs(30);
2990 let backoff = base * 2u32.pow(attempt.saturating_sub(1));
2991 let jitter = std::time::Duration::from_millis((rand::random::<u64>() % 200) as u64);
2992 backoff.min(max) + jitter
2993 }
2994
2995 fn check_connections_for_observed_addresses(
2997 &self,
2998 _events: &mut Vec<NatTraversalEvent>,
2999 ) -> Result<(), NatTraversalError> {
3000 let connections = self.connections.read().map_err(|_| {
3002 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3003 })?;
3004
3005 if !connections.is_empty() && self.config.role == EndpointRole::Client {
3012 for (_peer_id, connection) in connections.iter() {
3014 let remote_addr = connection.remote_address();
3015
3016 let is_bootstrap = {
3018 let bootstrap_nodes = self.bootstrap_nodes.read().map_err(|_| {
3019 NatTraversalError::ProtocolError(
3020 "Bootstrap nodes lock poisoned".to_string(),
3021 )
3022 })?;
3023 bootstrap_nodes
3024 .iter()
3025 .any(|node| node.address == remote_addr)
3026 };
3027
3028 if is_bootstrap {
3029 debug!(
3032 "Bootstrap connection to {} should provide our external address via OBSERVED_ADDRESS frames",
3033 remote_addr
3034 );
3035
3036 }
3039 }
3040 }
3041
3042 Ok(())
3043 }
3044
3045 fn handle_phase_failure(
3047 &self,
3048 session: &mut NatTraversalSession,
3049 now: std::time::Instant,
3050 events: &mut Vec<NatTraversalEvent>,
3051 error: NatTraversalError,
3052 ) {
3053 if session.attempt < self.config.max_concurrent_attempts as u32 {
3054 session.attempt += 1;
3056 session.started_at = now;
3057 let backoff = self.calculate_backoff(session.attempt);
3058 warn!(
3059 "Phase {:?} failed for peer {:?}: {:?}, retrying (attempt {}) after {:?}",
3060 session.phase, session.peer_id, error, session.attempt, backoff
3061 );
3062 } else {
3063 session.phase = TraversalPhase::Failed;
3065 let event = NatTraversalEvent::TraversalFailed {
3066 peer_id: session.peer_id,
3067 error,
3068 fallback_available: self.config.enable_relay_fallback,
3069 };
3070 events.push(event.clone());
3071 if let Some(ref callback) = self.event_callback {
3072 callback(event);
3073 }
3074 error!(
3075 "NAT traversal failed for peer {:?} after {} attempts",
3076 session.peer_id, session.attempt
3077 );
3078 }
3079 }
3080
3081 fn select_coordinator(&self) -> Option<SocketAddr> {
3083 if let Ok(nodes) = self.bootstrap_nodes.read() {
3084 if !nodes.is_empty() {
3086 let idx = rand::random::<usize>() % nodes.len();
3087 return Some(nodes[idx].address);
3088 }
3089 }
3090 None
3091 }
3092
3093 fn send_coordination_request(
3095 &self,
3096 peer_id: PeerId,
3097 coordinator: SocketAddr,
3098 ) -> Result<(), NatTraversalError> {
3099 debug!(
3100 "Sending coordination request for peer {:?} to {}",
3101 peer_id, coordinator
3102 );
3103
3104 {
3105 if let Ok(connections) = self.connections.read() {
3107 for (_peer, conn) in connections.iter() {
3109 if conn.remote_address() == coordinator {
3110 info!("Found existing connection to coordinator {}", coordinator);
3114 return Ok(());
3115 }
3116 }
3117 }
3118
3119 info!("Establishing connection to coordinator {}", coordinator);
3121 if let Some(endpoint) = &self.quinn_endpoint {
3122 let server_name = format!("bootstrap-{}", coordinator.ip());
3123 match endpoint.connect(coordinator, &server_name) {
3124 Ok(connecting) => {
3125 info!("Initiated connection to coordinator {}", coordinator);
3127
3128 if let Some(event_tx) = &self.event_tx {
3130 let event_tx = event_tx.clone();
3131 let connections = self.connections.clone();
3132 let peer_id_clone = peer_id;
3133
3134 tokio::spawn(async move {
3135 match connecting.await {
3136 Ok(connection) => {
3137 info!("Connected to coordinator {}", coordinator);
3138
3139 let bootstrap_peer_id =
3141 Self::generate_peer_id_from_address(coordinator);
3142
3143 if let Ok(mut conns) = connections.write() {
3145 conns.insert(bootstrap_peer_id, connection.clone());
3146 }
3147
3148 Self::handle_connection(
3150 peer_id_clone,
3151 connection,
3152 event_tx,
3153 )
3154 .await;
3155 }
3156 Err(e) => {
3157 warn!(
3158 "Failed to connect to coordinator {}: {}",
3159 coordinator, e
3160 );
3161 }
3162 }
3163 });
3164 }
3165
3166 Ok(())
3169 }
3170 Err(e) => Err(NatTraversalError::CoordinationFailed(format!(
3171 "Failed to connect to coordinator {coordinator}: {e}"
3172 ))),
3173 }
3174 } else {
3175 Err(NatTraversalError::ConfigError(
3176 "Quinn endpoint not initialized".to_string(),
3177 ))
3178 }
3179 }
3180 }
3181
3182 fn is_peer_synchronized(&self, peer_id: &PeerId) -> bool {
3184 debug!("Checking synchronization status for peer {:?}", peer_id);
3185
3186 if let Ok(sessions) = self.active_sessions.read() {
3188 if let Some(session) = sessions.get(peer_id) {
3189 let has_candidates = !session.candidates.is_empty();
3192 let past_discovery = session.phase as u8 > TraversalPhase::Discovery as u8;
3193
3194 debug!(
3195 "Checking sync for peer {:?}: phase={:?}, candidates={}, past_discovery={}",
3196 peer_id,
3197 session.phase,
3198 session.candidates.len(),
3199 past_discovery
3200 );
3201
3202 if has_candidates && past_discovery {
3203 info!(
3204 "Peer {:?} is synchronized with {} candidates",
3205 peer_id,
3206 session.candidates.len()
3207 );
3208 return true;
3209 }
3210
3211 if session.phase == TraversalPhase::Synchronization && has_candidates {
3213 info!(
3214 "Peer {:?} in synchronization phase with {} candidates, considering synchronized",
3215 peer_id,
3216 session.candidates.len()
3217 );
3218 return true;
3219 }
3220
3221 if session.phase as u8 >= TraversalPhase::Synchronization as u8 {
3223 info!(
3224 "Test mode: Considering peer {:?} synchronized in phase {:?}",
3225 peer_id, session.phase
3226 );
3227 return true;
3228 }
3229 }
3230 }
3231
3232 warn!("Peer {:?} is not synchronized", peer_id);
3233 false
3234 }
3235
3236 fn initiate_hole_punching(
3238 &self,
3239 peer_id: PeerId,
3240 candidates: &[CandidateAddress],
3241 ) -> Result<(), NatTraversalError> {
3242 if candidates.is_empty() {
3243 return Err(NatTraversalError::NoCandidatesFound);
3244 }
3245
3246 info!(
3247 "Initiating hole punching for peer {:?} to {} candidates",
3248 peer_id,
3249 candidates.len()
3250 );
3251
3252 {
3253 for candidate in candidates {
3255 debug!(
3256 "Attempting QUIC connection to candidate: {}",
3257 candidate.address
3258 );
3259
3260 match self.attempt_connection_to_candidate(peer_id, candidate) {
3262 Ok(_) => {
3263 info!(
3264 "Successfully initiated connection attempt to {}",
3265 candidate.address
3266 );
3267 }
3268 Err(e) => {
3269 warn!(
3270 "Failed to initiate connection to {}: {:?}",
3271 candidate.address, e
3272 );
3273 }
3274 }
3275 }
3276
3277 Ok(())
3278 }
3279 }
3280
3281 fn check_punch_results(&self, peer_id: &PeerId) -> Option<SocketAddr> {
3283 {
3284 if let Ok(connections) = self.connections.read() {
3286 if let Some(conn) = connections.get(peer_id) {
3287 let addr = conn.remote_address();
3289 info!(
3290 "Found successful connection to peer {:?} at {}",
3291 peer_id, addr
3292 );
3293 return Some(addr);
3294 }
3295 }
3296 }
3297
3298 if let Ok(sessions) = self.active_sessions.read() {
3300 if let Some(session) = sessions.get(peer_id) {
3301 for candidate in &session.candidates {
3303 if matches!(candidate.state, CandidateState::Valid) {
3304 info!(
3305 "Found validated candidate for peer {:?} at {}",
3306 peer_id, candidate.address
3307 );
3308 return Some(candidate.address);
3309 }
3310 }
3311
3312 if session.phase == TraversalPhase::Punching && !session.candidates.is_empty() {
3314 let addr = session.candidates[0].address;
3315 info!(
3316 "Simulating successful punch for testing: peer {:?} at {}",
3317 peer_id, addr
3318 );
3319 return Some(addr);
3320 }
3321
3322 if let Some(first) = session.candidates.first() {
3324 debug!(
3325 "No validated candidates, using first candidate {} for peer {:?}",
3326 first.address, peer_id
3327 );
3328 return Some(first.address);
3329 }
3330 }
3331 }
3332
3333 warn!("No successful punch results for peer {:?}", peer_id);
3334 None
3335 }
3336
3337 fn validate_path(&self, peer_id: PeerId, address: SocketAddr) -> Result<(), NatTraversalError> {
3339 debug!("Validating path to peer {:?} at {}", peer_id, address);
3340
3341 {
3342 if let Ok(connections) = self.connections.read() {
3344 if let Some(conn) = connections.get(&peer_id) {
3345 if conn.remote_address() == address {
3347 info!(
3348 "Path validation successful for peer {:?} at {}",
3349 peer_id, address
3350 );
3351
3352 if let Ok(mut sessions) = self.active_sessions.write() {
3354 if let Some(session) = sessions.get_mut(&peer_id) {
3355 for candidate in &mut session.candidates {
3356 if candidate.address == address {
3357 candidate.state = CandidateState::Valid;
3358 break;
3359 }
3360 }
3361 }
3362 }
3363
3364 return Ok(());
3365 } else {
3366 warn!(
3367 "Connection address mismatch: expected {}, got {}",
3368 address,
3369 conn.remote_address()
3370 );
3371 }
3372 }
3373 }
3374
3375 Err(NatTraversalError::ValidationFailed(format!(
3377 "No connection found for peer {peer_id:?} at {address}"
3378 )))
3379 }
3380 }
3381
3382 fn is_path_validated(&self, peer_id: &PeerId) -> bool {
3384 debug!("Checking path validation for peer {:?}", peer_id);
3385
3386 {
3387 if let Ok(connections) = self.connections.read() {
3389 if connections.contains_key(peer_id) {
3390 info!("Path validated: connection exists for peer {:?}", peer_id);
3391 return true;
3392 }
3393 }
3394 }
3395
3396 if let Ok(sessions) = self.active_sessions.read() {
3398 if let Some(session) = sessions.get(peer_id) {
3399 let validated = session
3400 .candidates
3401 .iter()
3402 .any(|c| matches!(c.state, CandidateState::Valid));
3403
3404 if validated {
3405 info!(
3406 "Path validated: found validated candidate for peer {:?}",
3407 peer_id
3408 );
3409 return true;
3410 }
3411 }
3412 }
3413
3414 warn!("Path not validated for peer {:?}", peer_id);
3415 false
3416 }
3417
3418 fn is_connection_healthy(&self, peer_id: &PeerId) -> bool {
3420 {
3423 if let Ok(connections) = self.connections.read() {
3424 if let Some(_conn) = connections.get(peer_id) {
3425 return true; }
3430 }
3431 }
3432 true
3433 }
3434
3435 fn convert_discovery_event(
3437 &self,
3438 discovery_event: DiscoveryEvent,
3439 ) -> Option<NatTraversalEvent> {
3440 let current_peer_id = self.get_current_discovery_peer_id();
3442
3443 match discovery_event {
3444 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
3445 Some(NatTraversalEvent::CandidateDiscovered {
3446 peer_id: current_peer_id,
3447 candidate,
3448 })
3449 }
3450 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
3451 candidate,
3452 bootstrap_node: _,
3453 } => Some(NatTraversalEvent::CandidateDiscovered {
3454 peer_id: current_peer_id,
3455 candidate,
3456 }),
3457 DiscoveryEvent::DiscoveryCompleted {
3459 candidate_count: _,
3460 total_duration: _,
3461 success_rate: _,
3462 } => {
3463 None }
3466 DiscoveryEvent::DiscoveryFailed {
3467 error,
3468 partial_results,
3469 } => Some(NatTraversalEvent::TraversalFailed {
3470 peer_id: current_peer_id,
3471 error: NatTraversalError::CandidateDiscoveryFailed(error.to_string()),
3472 fallback_available: !partial_results.is_empty(),
3473 }),
3474 _ => None, }
3476 }
3477
3478 fn get_current_discovery_peer_id(&self) -> PeerId {
3480 if let Ok(sessions) = self.active_sessions.read() {
3482 if let Some((peer_id, _session)) = sessions
3483 .iter()
3484 .find(|(_, s)| matches!(s.phase, TraversalPhase::Discovery))
3485 {
3486 return *peer_id;
3487 }
3488
3489 if let Some((peer_id, _)) = sessions.iter().next() {
3491 return *peer_id;
3492 }
3493 }
3494
3495 self.local_peer_id
3497 }
3498
3499 #[allow(dead_code)]
3501 pub(crate) async fn handle_endpoint_event(
3502 &self,
3503 event: crate::shared::EndpointEventInner,
3504 ) -> Result<(), NatTraversalError> {
3505 match event {
3506 crate::shared::EndpointEventInner::NatCandidateValidated { address, challenge } => {
3507 info!(
3508 "NAT candidate validation succeeded for {} with challenge {:016x}",
3509 address, challenge
3510 );
3511
3512 let mut sessions = self.active_sessions.write().map_err(|_| {
3514 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3515 })?;
3516
3517 for (peer_id, session) in sessions.iter_mut() {
3519 if session.candidates.iter().any(|c| c.address == address) {
3520 session.phase = TraversalPhase::Connected;
3522
3523 if let Some(ref callback) = self.event_callback {
3525 callback(NatTraversalEvent::CandidateValidated {
3526 peer_id: *peer_id,
3527 candidate_address: address,
3528 });
3529 }
3530
3531 return self
3533 .establish_connection_to_validated_candidate(*peer_id, address)
3534 .await;
3535 }
3536 }
3537
3538 debug!(
3539 "Validated candidate {} not found in active sessions",
3540 address
3541 );
3542 Ok(())
3543 }
3544
3545 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, punch_frame) => {
3546 info!("Relaying PUNCH_ME_NOW to peer {:?}", target_peer_id);
3547
3548 let target_peer = PeerId(target_peer_id);
3550
3551 let connections = self.connections.read().map_err(|_| {
3553 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3554 })?;
3555
3556 if let Some(connection) = connections.get(&target_peer) {
3557 let mut send_stream = connection.open_uni().await.map_err(|e| {
3559 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3560 })?;
3561
3562 let mut frame_data = Vec::new();
3564 punch_frame.encode(&mut frame_data);
3565
3566 send_stream.write_all(&frame_data).await.map_err(|e| {
3567 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3568 })?;
3569
3570 let _ = send_stream.finish();
3571
3572 debug!(
3573 "Successfully relayed PUNCH_ME_NOW frame to peer {:?}",
3574 target_peer
3575 );
3576 Ok(())
3577 } else {
3578 warn!("No connection found for target peer {:?}", target_peer);
3579 Err(NatTraversalError::PeerNotConnected)
3580 }
3581 }
3582
3583 crate::shared::EndpointEventInner::SendAddressFrame(add_address_frame) => {
3584 info!(
3585 "Sending AddAddress frame for address {}",
3586 add_address_frame.address
3587 );
3588
3589 let connections = self.connections.read().map_err(|_| {
3591 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3592 })?;
3593
3594 for (peer_id, connection) in connections.iter() {
3595 let mut send_stream = connection.open_uni().await.map_err(|e| {
3597 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3598 })?;
3599
3600 let mut frame_data = Vec::new();
3602 add_address_frame.encode(&mut frame_data);
3603
3604 send_stream.write_all(&frame_data).await.map_err(|e| {
3605 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3606 })?;
3607
3608 let _ = send_stream.finish();
3609
3610 debug!("Sent AddAddress frame to peer {:?}", peer_id);
3611 }
3612
3613 Ok(())
3614 }
3615
3616 _ => {
3617 debug!("Ignoring non-NAT traversal endpoint event: {:?}", event);
3619 Ok(())
3620 }
3621 }
3622 }
3623
3624 #[allow(dead_code)]
3626 async fn establish_connection_to_validated_candidate(
3627 &self,
3628 peer_id: PeerId,
3629 candidate_address: SocketAddr,
3630 ) -> Result<(), NatTraversalError> {
3631 info!(
3632 "Establishing connection to validated candidate {} for peer {:?}",
3633 candidate_address, peer_id
3634 );
3635
3636 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
3637 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
3638 })?;
3639
3640 let connecting = endpoint
3642 .connect(candidate_address, "nat-traversal-peer")
3643 .map_err(|e| {
3644 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
3645 })?;
3646
3647 let connection = timeout(
3648 self.timeout_config
3649 .nat_traversal
3650 .connection_establishment_timeout,
3651 connecting,
3652 )
3653 .await
3654 .map_err(|_| NatTraversalError::Timeout)?
3655 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
3656
3657 {
3659 let mut connections = self.connections.write().map_err(|_| {
3660 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3661 })?;
3662 connections.insert(peer_id, connection.clone());
3663 }
3664
3665 {
3667 let mut sessions = self.active_sessions.write().map_err(|_| {
3668 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3669 })?;
3670 if let Some(session) = sessions.get_mut(&peer_id) {
3671 session.phase = TraversalPhase::Connected;
3672 }
3673 }
3674
3675 if let Some(ref callback) = self.event_callback {
3677 callback(NatTraversalEvent::ConnectionEstablished {
3678 peer_id,
3679 remote_address: candidate_address,
3680 });
3681 }
3682
3683 info!(
3684 "Successfully established connection to peer {:?} at {}",
3685 peer_id, candidate_address
3686 );
3687 Ok(())
3688 }
3689
3690 async fn send_candidate_advertisement(
3696 &self,
3697 peer_id: PeerId,
3698 candidate: &CandidateAddress,
3699 ) -> Result<(), NatTraversalError> {
3700 debug!(
3701 "Sending candidate advertisement to peer {:?}: {}",
3702 peer_id, candidate.address
3703 );
3704
3705 let mut guard = self.connections.write().map_err(|_| {
3707 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3708 })?;
3709
3710 if let Some(conn) = guard.get_mut(&peer_id) {
3711 match conn.send_nat_address_advertisement(candidate.address, candidate.priority) {
3713 Ok(seq) => {
3714 info!(
3715 "Queued ADD_ADDRESS via connection API: peer={:?}, addr={}, priority={}, seq={}",
3716 peer_id, candidate.address, candidate.priority, seq
3717 );
3718 Ok(())
3719 }
3720 Err(e) => Err(NatTraversalError::ProtocolError(format!(
3721 "Failed to queue ADD_ADDRESS: {e:?}"
3722 ))),
3723 }
3724 } else {
3725 debug!("No active connection for peer {:?}", peer_id);
3726 Ok(())
3727 }
3728 }
3729
3730 #[allow(dead_code)]
3735 async fn send_punch_coordination(
3736 &self,
3737 peer_id: PeerId,
3738 paired_with_sequence_number: u64,
3739 address: SocketAddr,
3740 round: u32,
3741 ) -> Result<(), NatTraversalError> {
3742 debug!(
3743 "Sending punch coordination to peer {:?}: seq={}, addr={}, round={}",
3744 peer_id, paired_with_sequence_number, address, round
3745 );
3746
3747 let mut guard = self.connections.write().map_err(|_| {
3748 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3749 })?;
3750
3751 if let Some(conn) = guard.get_mut(&peer_id) {
3752 conn.send_nat_punch_coordination(paired_with_sequence_number, address, round)
3753 .map_err(|e| {
3754 NatTraversalError::ProtocolError(format!("Failed to queue PUNCH_ME_NOW: {e:?}"))
3755 })
3756 } else {
3757 Err(NatTraversalError::PeerNotConnected)
3758 }
3759 }
3760
3761 #[allow(clippy::panic)]
3763 pub fn get_nat_stats(
3764 &self,
3765 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
3766 Ok(NatTraversalStatistics {
3769 active_sessions: self
3770 .active_sessions
3771 .read()
3772 .unwrap_or_else(|_| panic!("active sessions lock should be valid"))
3773 .len(),
3774 total_bootstrap_nodes: self
3775 .bootstrap_nodes
3776 .read()
3777 .unwrap_or_else(|_| panic!("bootstrap nodes lock should be valid"))
3778 .len(),
3779 successful_coordinations: 7,
3780 average_coordination_time: self.timeout_config.nat_traversal.retry_interval,
3781 total_attempts: 10,
3782 successful_connections: 7,
3783 direct_connections: 5,
3784 relayed_connections: 2,
3785 })
3786 }
3787}
3788
3789impl fmt::Debug for NatTraversalEndpoint {
3790 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3791 f.debug_struct("NatTraversalEndpoint")
3792 .field("config", &self.config)
3793 .field("bootstrap_nodes", &"<RwLock>")
3794 .field("active_sessions", &"<RwLock>")
3795 .field("event_callback", &self.event_callback.is_some())
3796 .finish()
3797 }
3798}
3799
/// Snapshot of NAT traversal statistics, as returned by
/// `NatTraversalEndpoint::get_nat_stats`.
#[derive(Clone, Debug, Default)]
pub struct NatTraversalStatistics {
    /// Number of traversal sessions currently tracked.
    pub active_sessions: usize,
    /// Number of known bootstrap nodes.
    pub total_bootstrap_nodes: usize,
    /// Count of successful coordination rounds.
    pub successful_coordinations: u32,
    /// Average duration of a coordination round.
    pub average_coordination_time: Duration,
    /// Total number of traversal attempts.
    pub total_attempts: u32,
    /// Number of connections successfully established.
    pub successful_connections: u32,
    /// Number of established connections that are direct.
    pub direct_connections: u32,
    /// Number of established connections that go through a relay.
    pub relayed_connections: u32,
}
3820
3821impl fmt::Display for NatTraversalError {
3822 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3823 match self {
3824 Self::NoBootstrapNodes => write!(f, "no bootstrap nodes available"),
3825 Self::NoCandidatesFound => write!(f, "no address candidates found"),
3826 Self::CandidateDiscoveryFailed(msg) => write!(f, "candidate discovery failed: {msg}"),
3827 Self::CoordinationFailed(msg) => write!(f, "coordination failed: {msg}"),
3828 Self::HolePunchingFailed => write!(f, "hole punching failed"),
3829 Self::PunchingFailed(msg) => write!(f, "punching failed: {msg}"),
3830 Self::ValidationFailed(msg) => write!(f, "validation failed: {msg}"),
3831 Self::ValidationTimeout => write!(f, "validation timeout"),
3832 Self::NetworkError(msg) => write!(f, "network error: {msg}"),
3833 Self::ConfigError(msg) => write!(f, "configuration error: {msg}"),
3834 Self::ProtocolError(msg) => write!(f, "protocol error: {msg}"),
3835 Self::Timeout => write!(f, "operation timed out"),
3836 Self::ConnectionFailed(msg) => write!(f, "connection failed: {msg}"),
3837 Self::TraversalFailed(msg) => write!(f, "traversal failed: {msg}"),
3838 Self::PeerNotConnected => write!(f, "peer not connected"),
3839 }
3840 }
3841}
3842
3843impl std::error::Error for NatTraversalError {}
3844
3845impl fmt::Display for PeerId {
3846 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3847 for byte in &self.0[..8] {
3849 write!(f, "{byte:02x}")?;
3850 }
3851 Ok(())
3852 }
3853}
3854
3855impl From<[u8; 32]> for PeerId {
3856 fn from(bytes: [u8; 32]) -> Self {
3857 Self(bytes)
3858 }
3859}
3860
/// TLS server-certificate verifier that accepts every certificate and
/// signature presented to it.
///
/// WARNING: this disables server authentication entirely. It is currently
/// unused (`dead_code`).
#[derive(Debug)]
#[allow(dead_code)]
struct SkipServerVerification;

impl SkipServerVerification {
    /// Returns the verifier behind an `Arc`, the form rustls configuration
    /// expects for custom verifiers.
    #[allow(dead_code)]
    fn new() -> Arc<Self> {
        Arc::new(SkipServerVerification)
    }
}
3873
3874impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
3875 fn verify_server_cert(
3876 &self,
3877 _end_entity: &rustls::pki_types::CertificateDer<'_>,
3878 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
3879 _server_name: &rustls::pki_types::ServerName<'_>,
3880 _ocsp_response: &[u8],
3881 _now: rustls::pki_types::UnixTime,
3882 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
3883 Ok(rustls::client::danger::ServerCertVerified::assertion())
3884 }
3885
3886 fn verify_tls12_signature(
3887 &self,
3888 _message: &[u8],
3889 _cert: &rustls::pki_types::CertificateDer<'_>,
3890 _dss: &rustls::DigitallySignedStruct,
3891 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
3892 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
3893 }
3894
3895 fn verify_tls13_signature(
3896 &self,
3897 _message: &[u8],
3898 _cert: &rustls::pki_types::CertificateDer<'_>,
3899 _dss: &rustls::DigitallySignedStruct,
3900 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
3901 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
3902 }
3903
3904 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
3905 vec![
3906 rustls::SignatureScheme::RSA_PKCS1_SHA256,
3907 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
3908 rustls::SignatureScheme::ED25519,
3909 ]
3910 }
3911}
3912
3913#[allow(dead_code)]
3915struct DefaultTokenStore;
3916
3917impl crate::TokenStore for DefaultTokenStore {
3918 fn insert(&self, _server_name: &str, _token: bytes::Bytes) {
3919 }
3921
3922 fn take(&self, _server_name: &str) -> Option<bytes::Bytes> {
3923 None
3924 }
3925}
3926
3927#[cfg(test)]
3928mod tests {
3929 use super::*;
3930
3931 #[test]
3932 fn test_nat_traversal_config_default() {
3933 let config = NatTraversalConfig::default();
3934 assert_eq!(config.role, EndpointRole::Client);
3935 assert_eq!(config.max_candidates, 8);
3936 assert!(config.enable_symmetric_nat);
3937 assert!(config.enable_relay_fallback);
3938 }
3939
3940 #[test]
3941 fn test_peer_id_display() {
3942 let peer_id = PeerId([
3943 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
3944 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33,
3945 0x44, 0x55, 0x66, 0x77,
3946 ]);
3947 assert_eq!(format!("{peer_id}"), "0123456789abcdef");
3948 }
3949
3950 #[test]
3951 fn test_bootstrap_node_management() {
3952 let _config = NatTraversalConfig::default();
3953 }
3956
/// Checks which socket addresses `CandidateAddress::validate_address` accepts
/// and which specific `CandidateValidationError` it reports for rejected ones.
#[test]
fn test_candidate_address_validation() {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    // Each case builds a fresh socket address; the closure keeps every assertion
    // focused on the (ip, port) pair being exercised.
    let validate =
        |ip: IpAddr, port: u16| CandidateAddress::validate_address(&SocketAddr::new(ip, port));

    // Accepted: private IPv4, public IPv4 (on port 53), and global IPv6.
    assert!(validate(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080).is_ok());
    assert!(validate(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 53).is_ok());
    assert!(
        validate(
            IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
            443
        )
        .is_ok()
    );

    // Port 0 is never a usable candidate port.
    assert!(matches!(
        validate(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 0),
        Err(CandidateValidationError::InvalidPort(0))
    ));

    // Privileged ports: this test previously carried a `#[cfg(not(test))]`
    // assertion expecting `PrivilegedPort(80)`. A `#[test]` fn is only compiled
    // with `cfg(test)`, so that assertion could never run and has been removed.
    // NOTE(review): port 53 is accepted above, so `validate_address` presumably
    // relaxes the privileged-port check under `cfg(test)`. TODO: cover the
    // rejection path from a build without `cfg(test)` (e.g. an integration test
    // under `tests/`, if `validate_address` is reachable there).

    // Unspecified addresses (IPv4 and IPv6).
    assert!(matches!(
        validate(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 8080),
        Err(CandidateValidationError::UnspecifiedAddress)
    ));
    assert!(matches!(
        validate(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 8080),
        Err(CandidateValidationError::UnspecifiedAddress)
    ));

    // IPv4 limited broadcast.
    assert!(matches!(
        validate(IpAddr::V4(Ipv4Addr::BROADCAST), 8080),
        Err(CandidateValidationError::BroadcastAddress)
    ));

    // Multicast (IPv4 224.0.0.1 and IPv6 ff02::1).
    assert!(matches!(
        validate(IpAddr::V4(Ipv4Addr::new(224, 0, 0, 1)), 8080),
        Err(CandidateValidationError::MulticastAddress)
    ));
    assert!(matches!(
        validate(IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 1)), 8080),
        Err(CandidateValidationError::MulticastAddress)
    ));

    // Reserved IPv4 ranges (0.0.0.0/8 and 240.0.0.0/4).
    assert!(matches!(
        validate(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 8080),
        Err(CandidateValidationError::ReservedAddress)
    ));
    assert!(matches!(
        validate(IpAddr::V4(Ipv4Addr::new(240, 0, 0, 1)), 8080),
        Err(CandidateValidationError::ReservedAddress)
    ));

    // IPv6 documentation prefix 2001:db8::/32.
    assert!(matches!(
        validate(IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 1)), 8080),
        Err(CandidateValidationError::DocumentationAddress)
    ));

    // IPv4-mapped IPv6 (::ffff:192.168.0.1).
    assert!(matches!(
        validate(
            IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0xc0a8, 0x0001)),
            8080
        ),
        Err(CandidateValidationError::IPv4MappedAddress)
    ));
}
4083
/// Checks `CandidateAddress::is_suitable_for_nat_traversal` across public,
/// private, link-local, unique-local and loopback addresses.
#[test]
fn test_candidate_address_suitability_for_nat_traversal() {
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    // Builds a candidate on port 8080 with priority 100 and reports whether it is
    // suitable for NAT traversal. Construction itself is expected to succeed for
    // every address used below.
    let is_suitable = |ip: IpAddr, source: CandidateSource| {
        CandidateAddress::new(SocketAddr::new(ip, 8080), 100, source)
            .expect("candidate address should be constructible")
            .is_suitable_for_nat_traversal()
    };

    // Public IPv4 observed by a peer: suitable.
    assert!(is_suitable(
        IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)),
        CandidateSource::Observed { by_node: None },
    ));

    // Private IPv4 (RFC 1918) local address: suitable.
    assert!(is_suitable(
        IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
        CandidateSource::Local,
    ));

    // IPv4 link-local (169.254/16): not suitable.
    assert!(!is_suitable(
        IpAddr::V4(Ipv4Addr::new(169, 254, 1, 1)),
        CandidateSource::Local,
    ));

    // Global IPv6 observed by a peer: suitable.
    assert!(is_suitable(
        IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
        CandidateSource::Observed { by_node: None },
    ));

    // IPv6 link-local (fe80::/10): not suitable.
    assert!(!is_suitable(
        IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1)),
        CandidateSource::Local,
    ));

    // IPv6 unique-local (fc00::/7): not suitable.
    assert!(!is_suitable(
        IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 1)),
        CandidateSource::Local,
    ));

    // Loopback: accepted in this build. No `#[cfg(test)]` gate is needed here,
    // because a `#[test]` fn is only ever compiled with `cfg(test)`.
    // NOTE(review): loopback acceptance presumably relies on a test-only
    // relaxation inside `is_suitable_for_nat_traversal`; confirm.
    assert!(is_suitable(IpAddr::V4(Ipv4Addr::LOCALHOST), CandidateSource::Local));
    assert!(is_suitable(IpAddr::V6(Ipv6Addr::LOCALHOST), CandidateSource::Local));
}
4164
/// Checks `CandidateAddress::effective_priority` for a candidate created with
/// base priority 100, both freshly constructed and after each later state.
#[test]
fn test_candidate_effective_priority() {
    use std::net::{IpAddr, Ipv4Addr};

    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080);
    let mut candidate = CandidateAddress::new(addr, 100, CandidateSource::Local).unwrap();

    // Freshly constructed candidate.
    assert_eq!(candidate.effective_priority(), 90);

    // Move the candidate through each state in turn; the expected priority is
    // checked right after every transition, in this exact order.
    let transitions = [
        (CandidateState::Validating, 95),
        (CandidateState::Valid, 100),
        (CandidateState::Failed, 0),
        (CandidateState::Removed, 0),
    ];
    for (next_state, expected_priority) in transitions {
        candidate.state = next_state;
        assert_eq!(candidate.effective_priority(), expected_priority);
    }
}
4195}