1#![allow(missing_docs)]
8
9use std::{collections::HashMap, fmt, net::SocketAddr, sync::Arc, time::Duration};
16
/// Returns the wildcard IPv4 bind address `0.0.0.0:0`.
///
/// Port `0` asks the operating system to choose a free port when the socket
/// is bound. The address is assembled from constants instead of being parsed
/// from a string, so this function has no failure (panic) path.
fn create_random_port_bind_addr() -> SocketAddr {
    SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0)
}
41
42use tracing::{debug, error, info, warn};
43
44use std::sync::atomic::{AtomicBool, Ordering};
45
46use tokio::{
47 net::UdpSocket,
48 sync::mpsc,
49 time::{sleep, timeout},
50};
51
52use crate::high_level::default_runtime;
53
54use crate::{
55 VarInt,
56 candidate_discovery::{CandidateDiscoveryManager, DiscoveryConfig, DiscoveryEvent},
57 connection::nat_traversal::{CandidateSource, CandidateState, NatTraversalRole},
58};
59
60use crate::{
61 ClientConfig, ConnectionError, EndpointConfig, ServerConfig, TransportConfig,
62 high_level::{Connection as QuinnConnection, Endpoint as QuinnEndpoint},
63};
64
65#[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
66use crate::{crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig};
67
68use crate::config::validation::{ConfigValidator, ValidationResult};
69
70#[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
71use crate::crypto::certificate_manager::{CertificateConfig, CertificateManager};
72
/// NAT traversal endpoint layered on top of a Quinn endpoint.
///
/// Shared collections are wrapped in `Arc` so the background tasks spawned by
/// this type can hold their own handles to them.
pub struct NatTraversalEndpoint {
    /// Underlying Quinn endpoint; set by the constructor.
    quinn_endpoint: Option<QuinnEndpoint>,
    /// Configuration the endpoint was created with.
    config: NatTraversalConfig,
    /// Bootstrap nodes, seeded from `config.bootstrap_nodes` and editable via
    /// `add_bootstrap_node` / `remove_bootstrap_node`.
    bootstrap_nodes: Arc<std::sync::RwLock<Vec<BootstrapNode>>>,
    /// Traversal sessions keyed by the remote peer's ID.
    active_sessions: Arc<std::sync::RwLock<HashMap<PeerId, NatTraversalSession>>>,
    /// Candidate discovery manager, shared with the discovery polling task.
    discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
    /// Optional callback invoked synchronously with events raised by this endpoint.
    event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
    /// Flag read by background tasks to decide when to stop.
    shutdown: Arc<AtomicBool>,
    /// Sender half of the internal event channel.
    event_tx: Option<mpsc::UnboundedSender<NatTraversalEvent>>,
    /// Connections keyed by peer ID; shared with the connection-accept task.
    connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
    /// Peer ID generated for this endpoint at construction time.
    local_peer_id: PeerId,
    /// Copy of `config.timeouts`, consulted when polling sessions.
    timeout_config: crate::config::nat_timeouts::TimeoutConfig,
}
100
/// Configuration for a [`NatTraversalEndpoint`].
///
/// Checked by the `ConfigValidator` implementation before an endpoint is built.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NatTraversalConfig {
    /// Role of this endpoint; determines which validation rules apply and
    /// whether a server configuration and accept task are created.
    pub role: EndpointRole,
    /// Addresses of bootstrap nodes. Required for `Client` and for
    /// `Server { can_coordinate: true }`.
    pub bootstrap_nodes: Vec<SocketAddr>,
    /// Candidate limit forwarded to discovery; validated against the bounds 1 and 256.
    pub max_candidates: usize,
    /// Forwarded to discovery as its total timeout; validated against the
    /// bounds 100 ms and 300 s.
    pub coordination_timeout: Duration,
    /// Forwarded to discovery as `enable_symmetric_prediction`.
    pub enable_symmetric_nat: bool,
    /// Relay fallback switch; validation rejects `true` for the `Bootstrap` role.
    pub enable_relay_fallback: bool,
    /// Validated against the bounds 1 and 16 and must not exceed
    /// `max_candidates`; also used as the transport concurrency limit for
    /// server-capable roles.
    pub max_concurrent_attempts: usize,
    /// Address to bind the UDP socket to; `None` binds to `0.0.0.0:0`.
    pub bind_addr: Option<SocketAddr>,
    /// NOTE(review): not read by the code visible here — presumably selects
    /// the RFC-style NAT traversal frames; confirm against its consumer.
    pub prefer_rfc_nat_traversal: bool,
    /// Timeout settings copied into the endpoint as `timeout_config`.
    pub timeouts: crate::config::nat_timeouts::TimeoutConfig,
}
171
/// Role an endpoint plays in NAT traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum EndpointRole {
    /// Client role; validation requires at least one bootstrap node.
    Client,
    /// Server role; the endpoint accepts incoming connections.
    Server {
        /// When `true`, validation requires bootstrap nodes to be configured.
        can_coordinate: bool,
    },
    /// Bootstrap role; the endpoint accepts incoming connections and
    /// validation rejects `enable_relay_fallback`.
    Bootstrap,
}
185
186impl EndpointRole {
187 pub fn name(&self) -> &'static str {
189 match self {
190 Self::Client => "client",
191 Self::Server { .. } => "server",
192 Self::Bootstrap => "bootstrap",
193 }
194 }
195}
196
/// 32-byte peer identifier, used as the key of the session and connection maps.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
)]
pub struct PeerId(pub [u8; 32]);
202
/// A bootstrap node known to an endpoint, together with bookkeeping about it.
#[derive(Debug, Clone)]
pub struct BootstrapNode {
    /// Socket address of the node.
    pub address: SocketAddr,
    /// Timestamp associated with the node; initialised to the creation time.
    pub last_seen: std::time::Instant,
    /// Whether the node may be used as a coordinator; `true` for new nodes.
    pub can_coordinate: bool,
    /// Round-trip time to the node, if one has been recorded.
    pub rtt: Option<Duration>,
    /// Coordination counter; starts at zero.
    pub coordination_count: u32,
}

impl BootstrapNode {
    /// Creates a node for `address` that is stamped with the current time,
    /// marked as able to coordinate, and has no RTT or coordinations recorded.
    pub fn new(address: SocketAddr) -> Self {
        let created_at = std::time::Instant::now();
        BootstrapNode {
            coordination_count: 0,
            rtt: None,
            can_coordinate: true,
            last_seen: created_at,
            address,
        }
    }
}
230
/// A local candidate paired with a remote candidate.
#[derive(Debug, Clone)]
pub struct CandidatePair {
    /// Candidate address on the local side.
    pub local_candidate: CandidateAddress,
    /// Candidate address on the remote side.
    pub remote_candidate: CandidateAddress,
    /// Priority of the pair. NOTE(review): the formula is not visible here.
    pub priority: u64,
    /// Current state of the pair.
    pub state: CandidatePairState,
}
243
/// State of a [`CandidatePair`].
///
/// NOTE(review): the transitions are not visible here; the variant
/// descriptions below follow the variant names only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidatePairState {
    /// The pair has not been tried yet.
    Waiting,
    /// A check of the pair is under way.
    InProgress,
    /// The check of the pair succeeded.
    Succeeded,
    /// The check of the pair failed.
    Failed,
    /// The check of the pair was cancelled.
    Cancelled,
}
258
/// Per-peer traversal session stored in `NatTraversalEndpoint::active_sessions`.
#[derive(Debug)]
struct NatTraversalSession {
    /// Remote peer this session targets.
    peer_id: PeerId,
    /// Coordinator address passed to `initiate_nat_traversal`.
    #[allow(dead_code)]
    coordinator: SocketAddr,
    /// Attempt counter; starts at 1 and is incremented by the polling task on retry.
    attempt: u32,
    /// When the session was created.
    started_at: std::time::Instant,
    /// Current traversal phase; new sessions start in `Discovery`.
    phase: TraversalPhase,
    /// Candidate addresses for the peer; empty for a new session.
    candidates: Vec<CandidateAddress>,
    /// Connection-level state examined by the session polling code.
    session_state: SessionState,
}
278
/// Connection-level state of a traversal session.
#[derive(Debug, Clone)]
pub struct SessionState {
    /// Current connection state.
    pub state: ConnectionState,
    /// When `state` last changed; timeouts are measured from this instant.
    pub last_transition: std::time::Instant,
    /// Established connection, if any.
    pub connection: Option<QuinnConnection>,
    /// Addresses with the instant recorded for each.
    /// NOTE(review): presumably in-flight connection attempts and their start
    /// times — not written by the code visible here; confirm.
    pub active_attempts: Vec<(SocketAddr, std::time::Instant)>,
    /// Metrics refreshed by the session polling code.
    pub metrics: ConnectionMetrics,
}
293
/// Connection state of a traversal session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No attempt in progress; the polling task moves the session back to
    /// `Connecting` once `discovery.server_reflexive_cache_ttl` has elapsed.
    Idle,
    /// Connection attempt in progress; initial state of a new session. Timed
    /// out after `nat_traversal.connection_establishment_timeout`.
    Connecting,
    /// A connection is established.
    Connected,
    /// The session is migrating; subject to a migration timeout.
    Migrating,
    /// The session is closed; the polling task removes it once
    /// `discovery.interface_cache_ttl` has elapsed.
    Closed,
}
308
/// Metrics tracked for a session's connection. All fields default to
/// `None` / zero.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// Path round-trip time copied from the connection statistics.
    pub rtt: Option<Duration>,
    /// Lost packets divided by sent packets (the divisor is clamped to at least 1).
    pub loss_rate: f64,
    /// Bytes sent. NOTE(review): not updated by the code visible here.
    pub bytes_sent: u64,
    /// Bytes received. NOTE(review): not updated by the code visible here.
    pub bytes_received: u64,
    /// Set to the poll time by `poll_sessions` while the session is `Connected`.
    pub last_activity: Option<std::time::Instant>,
}
323
/// A state transition reported by `NatTraversalEndpoint::poll_sessions`.
#[derive(Debug, Clone)]
pub struct SessionStateUpdate {
    /// Peer whose session changed state.
    pub peer_id: PeerId,
    /// State before the transition.
    pub old_state: ConnectionState,
    /// State after the transition.
    pub new_state: ConnectionState,
    /// Why the transition happened.
    pub reason: StateChangeReason,
}
336
/// Reason attached to a [`SessionStateUpdate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateChangeReason {
    /// A `Connecting` session exceeded the connection establishment timeout.
    Timeout,
    /// A `Connecting` session was found to hold a connection.
    ConnectionEstablished,
    /// The connection was closed. NOTE(review): not produced by the code visible here.
    ConnectionClosed,
    /// A `Migrating` session passed its deadline while still holding a connection.
    MigrationComplete,
    /// A `Migrating` session passed its deadline without a connection.
    MigrationFailed,
    /// A network error occurred. NOTE(review): not produced by the code visible here.
    NetworkError,
    /// The user closed the session. NOTE(review): not produced by the code visible here.
    UserClosed,
}
355
/// Phase of a NAT traversal session.
///
/// New sessions start in `Discovery`. NOTE(review): the remaining transitions
/// are not visible here; the descriptions below follow the variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversalPhase {
    /// Candidate discovery.
    Discovery,
    /// Coordination with a coordinator node.
    Coordination,
    /// Synchronization with the peer.
    Synchronization,
    /// Hole punching.
    Punching,
    /// Validation of the resulting path.
    Validation,
    /// Traversal finished with a connection.
    Connected,
    /// Traversal failed.
    Failed,
}
374
/// Action the session polling task decided to apply to a session.
///
/// The task first classifies sessions under a read lock, then applies these
/// actions under a write lock.
#[derive(Debug, Clone, Copy)]
enum SessionUpdate {
    /// `Connecting` session exceeded the establishment timeout; close it.
    Timeout,
    /// `Connected` session whose connection reports a close reason; close it
    /// and drop the connection.
    Disconnected,
    /// `Connected` session with a live connection; refresh RTT and loss rate.
    UpdateMetrics,
    /// `Connected` session without a connection; close it.
    InvalidState,
    /// `Idle` session past its retry delay; move it back to `Connecting`.
    Retry,
    /// `Migrating` session exceeded the probe timeout; close it.
    MigrationTimeout,
    /// `Closed` session past its retention time; remove it from the map.
    Remove,
}
393
/// A candidate socket address with its priority, source, and state.
///
/// `CandidateAddress::new` validates the address; because the fields are
/// public, a value built with a struct literal bypasses that validation.
#[derive(Debug, Clone)]
pub struct CandidateAddress {
    /// The candidate's socket address.
    pub address: SocketAddr,
    /// Base priority; see `effective_priority` for the state-adjusted value.
    pub priority: u32,
    /// Where the candidate came from.
    pub source: CandidateSource,
    /// Validation state; `CandidateState::New` for freshly created candidates.
    pub state: CandidateState,
}
406
407impl CandidateAddress {
408 pub fn new(
410 address: SocketAddr,
411 priority: u32,
412 source: CandidateSource,
413 ) -> Result<Self, CandidateValidationError> {
414 Self::validate_address(&address)?;
415 Ok(Self {
416 address,
417 priority,
418 source,
419 state: CandidateState::New,
420 })
421 }
422
423 pub fn validate_address(addr: &SocketAddr) -> Result<(), CandidateValidationError> {
425 if addr.port() == 0 {
427 return Err(CandidateValidationError::InvalidPort(0));
428 }
429
430 #[cfg(not(test))]
432 if addr.port() < 1024 {
433 return Err(CandidateValidationError::PrivilegedPort(addr.port()));
434 }
435
436 match addr.ip() {
437 std::net::IpAddr::V4(ipv4) => {
438 if ipv4.is_unspecified() {
440 return Err(CandidateValidationError::UnspecifiedAddress);
441 }
442 if ipv4.is_broadcast() {
443 return Err(CandidateValidationError::BroadcastAddress);
444 }
445 if ipv4.is_multicast() {
446 return Err(CandidateValidationError::MulticastAddress);
447 }
448 if ipv4.octets()[0] == 0 {
450 return Err(CandidateValidationError::ReservedAddress);
451 }
452 if ipv4.octets()[0] >= 240 {
454 return Err(CandidateValidationError::ReservedAddress);
455 }
456 }
457 std::net::IpAddr::V6(ipv6) => {
458 if ipv6.is_unspecified() {
460 return Err(CandidateValidationError::UnspecifiedAddress);
461 }
462 if ipv6.is_multicast() {
463 return Err(CandidateValidationError::MulticastAddress);
464 }
465 let segments = ipv6.segments();
467 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
468 return Err(CandidateValidationError::DocumentationAddress);
469 }
470 if ipv6.to_ipv4_mapped().is_some() {
472 return Err(CandidateValidationError::IPv4MappedAddress);
473 }
474 }
475 }
476
477 Ok(())
478 }
479
480 pub fn is_suitable_for_nat_traversal(&self) -> bool {
482 match self.address.ip() {
483 std::net::IpAddr::V4(ipv4) => {
484 #[cfg(test)]
489 if ipv4.is_loopback() {
490 return true;
491 }
492 !ipv4.is_loopback()
493 && !ipv4.is_link_local()
494 && !ipv4.is_multicast()
495 && !ipv4.is_broadcast()
496 }
497 std::net::IpAddr::V6(ipv6) => {
498 #[cfg(test)]
504 if ipv6.is_loopback() {
505 return true;
506 }
507 let segments = ipv6.segments();
508 let is_link_local = (segments[0] & 0xffc0) == 0xfe80;
509 let is_unique_local = (segments[0] & 0xfe00) == 0xfc00;
510
511 !ipv6.is_loopback() && !is_link_local && !is_unique_local && !ipv6.is_multicast()
512 }
513 }
514 }
515
516 pub fn effective_priority(&self) -> u32 {
518 match self.state {
519 CandidateState::Valid => self.priority,
520 CandidateState::New => self.priority.saturating_sub(10),
521 CandidateState::Validating => self.priority.saturating_sub(5),
522 CandidateState::Failed => 0,
523 CandidateState::Removed => 0,
524 }
525 }
526}
527
/// Reasons `CandidateAddress::validate_address` rejects an address.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum CandidateValidationError {
    /// The port is 0.
    #[error("invalid port number: {0}")]
    InvalidPort(u16),
    /// The port is below 1024 (only checked in non-test builds).
    #[error("privileged port not allowed: {0}")]
    PrivilegedPort(u16),
    /// The IP address is the unspecified address.
    #[error("unspecified address not allowed")]
    UnspecifiedAddress,
    /// The IPv4 address is the broadcast address.
    #[error("broadcast address not allowed")]
    BroadcastAddress,
    /// The IP address is a multicast address.
    #[error("multicast address not allowed")]
    MulticastAddress,
    /// The first IPv4 octet is 0 or at least 240.
    #[error("reserved address not allowed")]
    ReservedAddress,
    /// The IPv6 address lies in the `2001:db8::/32` prefix.
    #[error("documentation address not allowed")]
    DocumentationAddress,
    /// The IPv6 address is an IPv4-mapped address.
    #[error("IPv4-mapped IPv6 address not allowed")]
    IPv4MappedAddress,
}
556
/// Events delivered to the endpoint's event callback and event channel.
///
/// NOTE(review): of these, `CoordinationRequested` and `SessionStateChanged`
/// are raised by `initiate_nat_traversal` and `poll_sessions`; the remaining
/// descriptions follow the variant and field names and should be confirmed
/// against the emitting code.
#[derive(Debug, Clone)]
pub enum NatTraversalEvent {
    /// A candidate address was discovered.
    CandidateDiscovered {
        /// Peer the candidate belongs to.
        peer_id: PeerId,
        /// The discovered candidate.
        candidate: CandidateAddress,
    },
    /// NAT traversal to a peer was initiated through a coordinator.
    CoordinationRequested {
        /// Target peer.
        peer_id: PeerId,
        /// Coordinator address given by the caller.
        coordinator: SocketAddr,
    },
    /// Coordination with the peer was synchronized.
    CoordinationSynchronized {
        /// Peer involved.
        peer_id: PeerId,
        /// Identifier of the coordination round.
        round_id: VarInt,
    },
    /// Hole punching started.
    HolePunchingStarted {
        /// Peer involved.
        peer_id: PeerId,
        /// Addresses being targeted.
        targets: Vec<SocketAddr>,
    },
    /// A path to the peer was validated.
    PathValidated {
        /// Peer involved.
        peer_id: PeerId,
        /// Address of the validated path.
        address: SocketAddr,
        /// Round-trip time reported for the path.
        rtt: Duration,
    },
    /// A candidate address was validated.
    CandidateValidated {
        /// Peer involved.
        peer_id: PeerId,
        /// Address of the validated candidate.
        candidate_address: SocketAddr,
    },
    /// NAT traversal succeeded.
    TraversalSucceeded {
        /// Peer involved.
        peer_id: PeerId,
        /// Address finally used.
        final_address: SocketAddr,
        /// Total time taken.
        total_time: Duration,
    },
    /// A connection was established.
    ConnectionEstablished {
        /// Peer involved.
        peer_id: PeerId,
        /// Remote address of the connection.
        remote_address: SocketAddr,
    },
    /// NAT traversal failed.
    TraversalFailed {
        /// Peer involved.
        peer_id: PeerId,
        /// The failure.
        error: NatTraversalError,
        /// Whether a fallback is available.
        fallback_available: bool,
    },
    /// A connection was lost.
    ConnectionLost {
        /// Peer involved.
        peer_id: PeerId,
        /// Textual reason.
        reason: String,
    },
    /// A session moved from one traversal phase to another.
    PhaseTransition {
        /// Peer involved.
        peer_id: PeerId,
        /// Phase before the transition.
        from_phase: TraversalPhase,
        /// Phase after the transition.
        to_phase: TraversalPhase,
    },
    /// A session's connection state changed during `poll_sessions`.
    SessionStateChanged {
        /// Peer whose session changed.
        peer_id: PeerId,
        /// The state the session is now in.
        new_state: ConnectionState,
    },
}
652
/// Errors returned by the NAT traversal endpoint.
///
/// NOTE(review): variants marked "by name" are not produced by the code
/// visible here; confirm their meaning against their producers.
#[derive(Debug, Clone)]
pub enum NatTraversalError {
    /// No bootstrap nodes are available (by name).
    NoBootstrapNodes,
    /// No candidates were found (by name).
    NoCandidatesFound,
    /// Starting candidate discovery failed; carries the underlying message.
    CandidateDiscoveryFailed(String),
    /// Coordination failed (by name).
    CoordinationFailed(String),
    /// Hole punching failed (by name).
    HolePunchingFailed,
    /// Punching failed with a message (by name).
    PunchingFailed(String),
    /// Validation failed (by name).
    ValidationFailed(String),
    /// Validation timed out (by name).
    ValidationTimeout,
    /// Socket-level failure, such as binding the UDP socket or reading the
    /// local address.
    NetworkError(String),
    /// Invalid configuration, or failure to build certificates, crypto
    /// configuration, or the Quinn endpoint.
    ConfigError(String),
    /// Internal failure such as a poisoned lock.
    ProtocolError(String),
    /// An operation timed out (by name).
    Timeout,
    /// Establishing a connection failed (by name).
    ConnectionFailed(String),
    /// Traversal failed with a message (by name).
    TraversalFailed(String),
    /// The peer is not connected (by name).
    PeerNotConnected,
}
687
688impl Default for NatTraversalConfig {
689 fn default() -> Self {
690 Self {
691 role: EndpointRole::Client,
692 bootstrap_nodes: Vec::new(),
693 max_candidates: 8,
694 coordination_timeout: Duration::from_secs(10),
695 enable_symmetric_nat: true,
696 enable_relay_fallback: true,
697 max_concurrent_attempts: 3,
698 bind_addr: None,
699 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
701 }
702 }
703}
704
705impl ConfigValidator for NatTraversalConfig {
706 fn validate(&self) -> ValidationResult<()> {
707 use crate::config::validation::*;
708
709 match self.role {
711 EndpointRole::Client => {
712 if self.bootstrap_nodes.is_empty() {
713 return Err(ConfigValidationError::InvalidRole(
714 "Client endpoints require at least one bootstrap node".to_string(),
715 ));
716 }
717 }
718 EndpointRole::Server { can_coordinate } => {
719 if can_coordinate && self.bootstrap_nodes.is_empty() {
720 return Err(ConfigValidationError::InvalidRole(
721 "Server endpoints with coordination capability require bootstrap nodes"
722 .to_string(),
723 ));
724 }
725 }
726 EndpointRole::Bootstrap => {
727 }
729 }
730
731 if !self.bootstrap_nodes.is_empty() {
733 validate_bootstrap_nodes(&self.bootstrap_nodes)?;
734 }
735
736 validate_range(self.max_candidates, 1, 256, "max_candidates")?;
738
739 validate_duration(
741 self.coordination_timeout,
742 Duration::from_millis(100),
743 Duration::from_secs(300),
744 "coordination_timeout",
745 )?;
746
747 validate_range(
749 self.max_concurrent_attempts,
750 1,
751 16,
752 "max_concurrent_attempts",
753 )?;
754
755 if self.max_concurrent_attempts > self.max_candidates {
757 return Err(ConfigValidationError::IncompatibleConfiguration(
758 "max_concurrent_attempts cannot exceed max_candidates".to_string(),
759 ));
760 }
761
762 if self.role == EndpointRole::Bootstrap && self.enable_relay_fallback {
763 return Err(ConfigValidationError::IncompatibleConfiguration(
764 "Bootstrap nodes should not enable relay fallback".to_string(),
765 ));
766 }
767
768 Ok(())
769 }
770}
771
772impl NatTraversalEndpoint {
/// Creates a NAT traversal endpoint from `config`.
///
/// `event_callback`, when provided, is stored on the endpoint and invoked
/// with the events it raises. This is the public entry point; the work is
/// delegated to `new_impl`.
///
/// # Errors
/// Returns the `NatTraversalError` produced by the delegated constructor.
pub async fn new(
    config: NatTraversalConfig,
    event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
) -> Result<Self, NatTraversalError> {
    Self::new_impl(config, event_callback).await
}
780
/// Constructor step that forwards unchanged to `new_common`.
async fn new_impl(
    config: NatTraversalConfig,
    event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
) -> Result<Self, NatTraversalError> {
    Self::new_common(config, event_callback).await
}
788
/// Constructor step that forwards unchanged to `new_shared_logic`.
async fn new_common(
    config: NatTraversalConfig,
    event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
) -> Result<Self, NatTraversalError> {
    Self::new_shared_logic(config, event_callback).await
}
797
798 async fn new_shared_logic(
800 config: NatTraversalConfig,
801 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
802 ) -> Result<Self, NatTraversalError> {
803 {
806 config
807 .validate()
808 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
809 }
810
811 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
815 config
816 .bootstrap_nodes
817 .iter()
818 .map(|&address| BootstrapNode {
819 address,
820 last_seen: std::time::Instant::now(),
821 can_coordinate: true, rtt: None,
823 coordination_count: 0,
824 })
825 .collect(),
826 ));
827
828 let discovery_config = DiscoveryConfig {
830 total_timeout: config.coordination_timeout,
831 max_candidates: config.max_candidates,
832 enable_symmetric_prediction: config.enable_symmetric_nat,
833 bound_address: config.bind_addr, ..DiscoveryConfig::default()
835 };
836
837 let nat_traversal_role = match config.role {
838 EndpointRole::Client => NatTraversalRole::Client,
839 EndpointRole::Server { can_coordinate } => NatTraversalRole::Server {
840 can_relay: can_coordinate,
841 },
842 EndpointRole::Bootstrap => NatTraversalRole::Bootstrap,
843 };
844
845 let discovery_manager = Arc::new(std::sync::Mutex::new(CandidateDiscoveryManager::new(
846 discovery_config,
847 )));
848
849 let (quinn_endpoint, event_tx, local_addr) =
852 Self::create_quinn_endpoint(&config, nat_traversal_role).await?;
853
854 {
856 let mut discovery = discovery_manager.lock().map_err(|_| {
857 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
858 })?;
859 discovery.set_bound_address(local_addr);
860 info!(
861 "Updated discovery manager with bound address: {}",
862 local_addr
863 );
864 }
865
866 let endpoint = Self {
867 quinn_endpoint: Some(quinn_endpoint.clone()),
868 config: config.clone(),
869 bootstrap_nodes,
870 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
871 discovery_manager,
872 event_callback,
873 shutdown: Arc::new(AtomicBool::new(false)),
874 event_tx: Some(event_tx.clone()),
875 connections: Arc::new(std::sync::RwLock::new(HashMap::new())),
876 local_peer_id: Self::generate_local_peer_id(),
877 timeout_config: config.timeouts.clone(),
878 };
879
880 if matches!(
882 config.role,
883 EndpointRole::Bootstrap | EndpointRole::Server { .. }
884 ) {
885 let endpoint_clone = quinn_endpoint.clone();
886 let shutdown_clone = endpoint.shutdown.clone();
887 let event_tx_clone = event_tx.clone();
888 let connections_clone = endpoint.connections.clone();
889
890 tokio::spawn(async move {
891 Self::accept_connections(
892 endpoint_clone,
893 shutdown_clone,
894 event_tx_clone,
895 connections_clone,
896 )
897 .await;
898 });
899
900 info!("Started accepting connections for {:?} role", config.role);
901 }
902
903 let discovery_manager_clone = endpoint.discovery_manager.clone();
905 let shutdown_clone = endpoint.shutdown.clone();
906 let event_tx_clone = event_tx;
907
908 tokio::spawn(async move {
909 Self::poll_discovery(discovery_manager_clone, shutdown_clone, event_tx_clone).await;
910 });
911
912 info!("Started discovery polling task");
913
914 {
916 let mut discovery = endpoint.discovery_manager.lock().map_err(|_| {
917 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
918 })?;
919
920 let local_peer_id = endpoint.local_peer_id;
922 let bootstrap_nodes = {
923 let nodes = endpoint.bootstrap_nodes.read().map_err(|_| {
924 NatTraversalError::ProtocolError("Bootstrap nodes lock poisoned".to_string())
925 })?;
926 nodes.clone()
927 };
928
929 discovery
930 .start_discovery(local_peer_id, bootstrap_nodes)
931 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
932
933 info!(
934 "Started local candidate discovery for peer {:?}",
935 local_peer_id
936 );
937 }
938
939 Ok(endpoint)
940 }
941
/// Returns a reference to the underlying Quinn endpoint, or `None` when the
/// `quinn_endpoint` field is unset.
pub fn get_quinn_endpoint(&self) -> Option<&crate::high_level::Endpoint> {
    self.quinn_endpoint.as_ref()
}
946
/// Returns a reference to the event callback supplied at construction, or
/// `None` when no callback was registered.
pub fn get_event_callback(&self) -> Option<&Box<dyn Fn(NatTraversalEvent) + Send + Sync>> {
    self.event_callback.as_ref()
}
951
952 pub fn initiate_nat_traversal(
954 &self,
955 peer_id: PeerId,
956 coordinator: SocketAddr,
957 ) -> Result<(), NatTraversalError> {
958 info!(
959 "Starting NAT traversal to peer {:?} via coordinator {}",
960 peer_id, coordinator
961 );
962
963 let session = NatTraversalSession {
965 peer_id,
966 coordinator,
967 attempt: 1,
968 started_at: std::time::Instant::now(),
969 phase: TraversalPhase::Discovery,
970 candidates: Vec::new(),
971 session_state: SessionState {
972 state: ConnectionState::Connecting,
973 last_transition: std::time::Instant::now(),
974
975 connection: None,
976 active_attempts: Vec::new(),
977 metrics: ConnectionMetrics::default(),
978 },
979 };
980
981 {
983 let mut sessions = self
984 .active_sessions
985 .write()
986 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
987 sessions.insert(peer_id, session);
988 }
989
990 let bootstrap_nodes_vec = {
992 let bootstrap_nodes = self
993 .bootstrap_nodes
994 .read()
995 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
996 bootstrap_nodes.clone()
997 };
998
999 {
1000 let mut discovery = self.discovery_manager.lock().map_err(|_| {
1001 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
1002 })?;
1003
1004 discovery
1005 .start_discovery(peer_id, bootstrap_nodes_vec)
1006 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
1007 }
1008
1009 if let Some(ref callback) = self.event_callback {
1011 callback(NatTraversalEvent::CoordinationRequested {
1012 peer_id,
1013 coordinator,
1014 });
1015 }
1016
1017 Ok(())
1019 }
1020
1021 pub fn poll_sessions(&self) -> Result<Vec<SessionStateUpdate>, NatTraversalError> {
1023 let mut updates = Vec::new();
1024 let now = std::time::Instant::now();
1025
1026 let mut sessions = self
1027 .active_sessions
1028 .write()
1029 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
1030
1031 for (peer_id, session) in sessions.iter_mut() {
1032 let mut state_changed = false;
1033
1034 match session.session_state.state {
1035 ConnectionState::Connecting => {
1036 let elapsed = now.duration_since(session.session_state.last_transition);
1038 if elapsed
1039 > self
1040 .timeout_config
1041 .nat_traversal
1042 .connection_establishment_timeout
1043 {
1044 session.session_state.state = ConnectionState::Closed;
1045 session.session_state.last_transition = now;
1046 state_changed = true;
1047
1048 updates.push(SessionStateUpdate {
1049 peer_id: *peer_id,
1050 old_state: ConnectionState::Connecting,
1051 new_state: ConnectionState::Closed,
1052 reason: StateChangeReason::Timeout,
1053 });
1054 }
1055
1056 if let Some(ref _connection) = session.session_state.connection {
1059 session.session_state.state = ConnectionState::Connected;
1060 session.session_state.last_transition = now;
1061 state_changed = true;
1062
1063 updates.push(SessionStateUpdate {
1064 peer_id: *peer_id,
1065 old_state: ConnectionState::Connecting,
1066 new_state: ConnectionState::Connected,
1067 reason: StateChangeReason::ConnectionEstablished,
1068 });
1069 }
1070 }
1071 ConnectionState::Connected => {
1072 {
1075 }
1078
1079 session.session_state.metrics.last_activity = Some(now);
1081 }
1082 ConnectionState::Migrating => {
1083 let elapsed = now.duration_since(session.session_state.last_transition);
1085 if elapsed > Duration::from_secs(10) {
1086 if session.session_state.connection.is_some() {
1089 session.session_state.state = ConnectionState::Connected;
1090 state_changed = true;
1091
1092 updates.push(SessionStateUpdate {
1093 peer_id: *peer_id,
1094 old_state: ConnectionState::Migrating,
1095 new_state: ConnectionState::Connected,
1096 reason: StateChangeReason::MigrationComplete,
1097 });
1098 } else {
1099 session.session_state.state = ConnectionState::Closed;
1100 state_changed = true;
1101
1102 updates.push(SessionStateUpdate {
1103 peer_id: *peer_id,
1104 old_state: ConnectionState::Migrating,
1105 new_state: ConnectionState::Closed,
1106 reason: StateChangeReason::MigrationFailed,
1107 });
1108 }
1109
1110 session.session_state.last_transition = now;
1111 }
1112 }
1113 _ => {}
1114 }
1115
1116 if state_changed {
1118 if let Some(ref callback) = self.event_callback {
1119 callback(NatTraversalEvent::SessionStateChanged {
1120 peer_id: *peer_id,
1121 new_state: session.session_state.state,
1122 });
1123 }
1124 }
1125 }
1126
1127 Ok(updates)
1128 }
1129
1130 pub fn start_session_polling(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
1132 let sessions = self.active_sessions.clone();
1133 let shutdown = self.shutdown.clone();
1134 let timeout_config = self.timeout_config.clone();
1135
1136 tokio::spawn(async move {
1137 let mut ticker = tokio::time::interval(interval);
1138
1139 loop {
1140 ticker.tick().await;
1141
1142 if shutdown.load(Ordering::Relaxed) {
1143 break;
1144 }
1145
1146 let sessions_to_update = {
1148 match sessions.read() {
1149 Ok(sessions_guard) => {
1150 sessions_guard
1151 .iter()
1152 .filter_map(|(peer_id, session)| {
1153 let now = std::time::Instant::now();
1154 let elapsed =
1155 now.duration_since(session.session_state.last_transition);
1156
1157 match session.session_state.state {
1158 ConnectionState::Connecting => {
1159 if elapsed
1161 > timeout_config
1162 .nat_traversal
1163 .connection_establishment_timeout
1164 {
1165 Some((*peer_id, SessionUpdate::Timeout))
1166 } else {
1167 None
1168 }
1169 }
1170 ConnectionState::Connected => {
1171 if let Some(ref conn) = session.session_state.connection
1173 {
1174 if conn.close_reason().is_some() {
1175 Some((*peer_id, SessionUpdate::Disconnected))
1176 } else {
1177 Some((*peer_id, SessionUpdate::UpdateMetrics))
1179 }
1180 } else {
1181 Some((*peer_id, SessionUpdate::InvalidState))
1182 }
1183 }
1184 ConnectionState::Idle => {
1185 if elapsed
1187 > timeout_config
1188 .discovery
1189 .server_reflexive_cache_ttl
1190 {
1191 Some((*peer_id, SessionUpdate::Retry))
1192 } else {
1193 None
1194 }
1195 }
1196 ConnectionState::Migrating => {
1197 if elapsed > timeout_config.nat_traversal.probe_timeout
1199 {
1200 Some((*peer_id, SessionUpdate::MigrationTimeout))
1201 } else {
1202 None
1203 }
1204 }
1205 ConnectionState::Closed => {
1206 if elapsed
1208 > timeout_config.discovery.interface_cache_ttl
1209 {
1210 Some((*peer_id, SessionUpdate::Remove))
1211 } else {
1212 None
1213 }
1214 }
1215 }
1216 })
1217 .collect::<Vec<_>>()
1218 }
1219 _ => {
1220 vec![]
1221 }
1222 }
1223 };
1224
1225 if !sessions_to_update.is_empty() {
1227 if let Ok(mut sessions_guard) = sessions.write() {
1228 for (peer_id, update) in sessions_to_update {
1229 match update {
1230 SessionUpdate::Timeout => {
1231 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1232 session.session_state.state = ConnectionState::Closed;
1233 session.session_state.last_transition =
1234 std::time::Instant::now();
1235 tracing::warn!("Connection to {:?} timed out", peer_id);
1236 }
1237 }
1238 SessionUpdate::Disconnected => {
1239 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1240 session.session_state.state = ConnectionState::Closed;
1241 session.session_state.last_transition =
1242 std::time::Instant::now();
1243 session.session_state.connection = None;
1244 tracing::info!("Connection to {:?} closed", peer_id);
1245 }
1246 }
1247 SessionUpdate::UpdateMetrics => {
1248 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1249 if let Some(ref conn) = session.session_state.connection {
1250 let stats = conn.stats();
1252 session.session_state.metrics.rtt =
1253 Some(stats.path.rtt);
1254 session.session_state.metrics.loss_rate =
1255 stats.path.lost_packets as f64
1256 / stats.path.sent_packets.max(1) as f64;
1257 }
1258 }
1259 }
1260 SessionUpdate::InvalidState => {
1261 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1262 session.session_state.state = ConnectionState::Closed;
1263 session.session_state.last_transition =
1264 std::time::Instant::now();
1265 tracing::error!("Session {:?} in invalid state", peer_id);
1266 }
1267 }
1268 SessionUpdate::Retry => {
1269 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1270 session.session_state.state = ConnectionState::Connecting;
1271 session.session_state.last_transition =
1272 std::time::Instant::now();
1273 session.attempt += 1;
1274 tracing::info!(
1275 "Retrying connection to {:?} (attempt {})",
1276 peer_id,
1277 session.attempt
1278 );
1279 }
1280 }
1281 SessionUpdate::MigrationTimeout => {
1282 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1283 session.session_state.state = ConnectionState::Closed;
1284 session.session_state.last_transition =
1285 std::time::Instant::now();
1286 tracing::warn!("Migration timeout for {:?}", peer_id);
1287 }
1288 }
1289 SessionUpdate::Remove => {
1290 sessions_guard.remove(&peer_id);
1291 tracing::debug!("Removed old session for {:?}", peer_id);
1292 }
1293 }
1294 }
1295 }
1296 }
1297 }
1298 })
1299 }
1300
1301 pub fn get_statistics(&self) -> Result<NatTraversalStatistics, NatTraversalError> {
1305 let sessions = self
1306 .active_sessions
1307 .read()
1308 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1309 let bootstrap_nodes = self
1310 .bootstrap_nodes
1311 .read()
1312 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1313
1314 let avg_coordination_time = {
1316 let rtts: Vec<Duration> = bootstrap_nodes.iter().filter_map(|b| b.rtt).collect();
1317
1318 if rtts.is_empty() {
1319 Duration::from_millis(500) } else {
1321 let total_millis: u64 = rtts.iter().map(|d| d.as_millis() as u64).sum();
1322 Duration::from_millis(total_millis / rtts.len() as u64 * 2) }
1324 };
1325
1326 Ok(NatTraversalStatistics {
1327 active_sessions: sessions.len(),
1328 total_bootstrap_nodes: bootstrap_nodes.len(),
1329 successful_coordinations: bootstrap_nodes.iter().map(|b| b.coordination_count).sum(),
1330 average_coordination_time: avg_coordination_time,
1331 total_attempts: 0,
1332 successful_connections: 0,
1333 direct_connections: 0,
1334 relayed_connections: 0,
1335 })
1336 }
1337
1338 pub fn add_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1340 let mut bootstrap_nodes = self
1341 .bootstrap_nodes
1342 .write()
1343 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1344
1345 if !bootstrap_nodes.iter().any(|b| b.address == address) {
1347 bootstrap_nodes.push(BootstrapNode {
1348 address,
1349 last_seen: std::time::Instant::now(),
1350 can_coordinate: true,
1351 rtt: None,
1352 coordination_count: 0,
1353 });
1354 info!("Added bootstrap node: {}", address);
1355 }
1356 Ok(())
1357 }
1358
1359 pub fn remove_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1361 let mut bootstrap_nodes = self
1362 .bootstrap_nodes
1363 .write()
1364 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1365 bootstrap_nodes.retain(|b| b.address != address);
1366 info!("Removed bootstrap node: {}", address);
1367 Ok(())
1368 }
1369
1370 async fn create_quinn_endpoint(
1374 config: &NatTraversalConfig,
1375 _nat_role: NatTraversalRole,
1376 ) -> Result<
1377 (
1378 QuinnEndpoint,
1379 mpsc::UnboundedSender<NatTraversalEvent>,
1380 SocketAddr,
1381 ),
1382 NatTraversalError,
1383 > {
1384 use std::sync::Arc;
1385
1386 let server_config = match config.role {
1388 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1389 let cert_config = CertificateConfig {
1391 common_name: format!("ant-quic-{}", config.role.name()),
1392 subject_alt_names: vec!["localhost".to_string(), "ant-quic-node".to_string()],
1393 self_signed: true, ..CertificateConfig::default()
1395 };
1396
1397 let cert_manager = CertificateManager::new(cert_config).map_err(|e| {
1398 NatTraversalError::ConfigError(format!(
1399 "Certificate manager creation failed: {e}"
1400 ))
1401 })?;
1402
1403 let cert_bundle = cert_manager.generate_certificate().map_err(|e| {
1404 NatTraversalError::ConfigError(format!("Certificate generation failed: {e}"))
1405 })?;
1406
1407 let rustls_config =
1408 cert_manager
1409 .create_server_config(&cert_bundle)
1410 .map_err(|e| {
1411 NatTraversalError::ConfigError(format!(
1412 "Server config creation failed: {e}"
1413 ))
1414 })?;
1415
1416 let server_crypto = QuicServerConfig::try_from(rustls_config.as_ref().clone())
1417 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1418
1419 let mut server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
1420
1421 let mut transport_config = TransportConfig::default();
1423 transport_config
1424 .keep_alive_interval(Some(config.timeouts.nat_traversal.retry_interval));
1425 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1426
1427 let nat_config = match config.role {
1432 EndpointRole::Client => {
1433 crate::transport_parameters::NatTraversalConfig::ClientSupport
1434 }
1435 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1436 crate::transport_parameters::NatTraversalConfig::ServerSupport {
1437 concurrency_limit: VarInt::from_u32(
1438 config.max_concurrent_attempts as u32,
1439 ),
1440 }
1441 }
1442 };
1443 transport_config.nat_traversal_config(Some(nat_config));
1444
1445 server_config.transport_config(Arc::new(transport_config));
1446
1447 Some(server_config)
1448 }
1449 _ => None,
1450 };
1451
1452 let client_config = {
1454 let cert_config = CertificateConfig {
1455 common_name: format!("ant-quic-{}", config.role.name()),
1456 subject_alt_names: vec!["localhost".to_string(), "ant-quic-node".to_string()],
1457 self_signed: true,
1458 ..CertificateConfig::default()
1459 };
1460
1461 let cert_manager = CertificateManager::new(cert_config).map_err(|e| {
1462 NatTraversalError::ConfigError(format!("Certificate manager creation failed: {e}"))
1463 })?;
1464
1465 let _cert_bundle = cert_manager.generate_certificate().map_err(|e| {
1466 NatTraversalError::ConfigError(format!("Certificate generation failed: {e}"))
1467 })?;
1468
1469 let rustls_config = cert_manager.create_client_config().map_err(|e| {
1470 NatTraversalError::ConfigError(format!("Client config creation failed: {e}"))
1471 })?;
1472
1473 let client_crypto = QuicClientConfig::try_from(rustls_config.as_ref().clone())
1474 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1475
1476 let mut client_config = ClientConfig::new(Arc::new(client_crypto));
1477
1478 let mut transport_config = TransportConfig::default();
1480 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
1481 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1482
1483 let nat_config = match config.role {
1488 EndpointRole::Client => {
1489 crate::transport_parameters::NatTraversalConfig::ClientSupport
1490 }
1491 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1492 crate::transport_parameters::NatTraversalConfig::ServerSupport {
1493 concurrency_limit: VarInt::from_u32(config.max_concurrent_attempts as u32),
1494 }
1495 }
1496 };
1497 transport_config.nat_traversal_config(Some(nat_config));
1498
1499 client_config.transport_config(Arc::new(transport_config));
1500
1501 client_config
1502 };
1503
1504 let bind_addr = config
1506 .bind_addr
1507 .unwrap_or_else(create_random_port_bind_addr);
1508 let socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1509 NatTraversalError::NetworkError(format!("Failed to bind UDP socket: {e}"))
1510 })?;
1511
1512 info!("Binding endpoint to {}", bind_addr);
1513
1514 let std_socket = socket.into_std().map_err(|e| {
1516 NatTraversalError::NetworkError(format!("Failed to convert socket: {e}"))
1517 })?;
1518
1519 let runtime = default_runtime().ok_or_else(|| {
1521 NatTraversalError::ConfigError("No compatible async runtime found".to_string())
1522 })?;
1523
1524 let mut endpoint = QuinnEndpoint::new(
1525 EndpointConfig::default(),
1526 server_config,
1527 std_socket,
1528 runtime,
1529 )
1530 .map_err(|e| {
1531 NatTraversalError::ConfigError(format!("Failed to create Quinn endpoint: {e}"))
1532 })?;
1533
1534 endpoint.set_default_client_config(client_config);
1536
1537 let local_addr = endpoint.local_addr().map_err(|e| {
1539 NatTraversalError::NetworkError(format!("Failed to get local address: {e}"))
1540 })?;
1541
1542 info!("Endpoint bound to actual address: {}", local_addr);
1543
1544 let (event_tx, _event_rx) = mpsc::unbounded_channel();
1546
1547 Ok((endpoint, event_tx, local_addr))
1548 }
1549
1550 #[allow(clippy::panic)]
1552 pub async fn start_listening(&self, bind_addr: SocketAddr) -> Result<(), NatTraversalError> {
1553 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1554 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1555 })?;
1556
1557 let _socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1559 NatTraversalError::NetworkError(format!("Failed to bind to {bind_addr}: {e}"))
1560 })?;
1561
1562 info!("Started listening on {}", bind_addr);
1563
1564 let endpoint_clone = endpoint.clone();
1566 let shutdown_clone = self.shutdown.clone();
1567 let event_tx = self
1568 .event_tx
1569 .as_ref()
1570 .unwrap_or_else(|| panic!("event transmitter should be initialized"))
1571 .clone();
1572 let connections_clone = self.connections.clone();
1573
1574 tokio::spawn(async move {
1575 Self::accept_connections(endpoint_clone, shutdown_clone, event_tx, connections_clone)
1576 .await;
1577 });
1578
1579 Ok(())
1580 }
1581
1582 async fn accept_connections(
1584 endpoint: QuinnEndpoint,
1585 shutdown: Arc<AtomicBool>,
1586 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1587 connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
1588 ) {
1589 while !shutdown.load(Ordering::Relaxed) {
1590 match endpoint.accept().await {
1591 Some(connecting) => {
1592 let event_tx = event_tx.clone();
1593 let connections = connections.clone();
1594 tokio::spawn(async move {
1595 match connecting.await {
1596 Ok(connection) => {
1597 info!("Accepted connection from {}", connection.remote_address());
1598
1599 let peer_id = Self::generate_peer_id_from_address(
1601 connection.remote_address(),
1602 );
1603
1604 if let Ok(mut conns) = connections.write() {
1606 conns.insert(peer_id, connection.clone());
1607 }
1608
1609 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1610 peer_id,
1611 remote_address: connection.remote_address(),
1612 });
1613
1614 Self::handle_connection(connection, event_tx).await;
1616 }
1617 Err(e) => {
1618 debug!("Connection failed: {}", e);
1619 }
1620 }
1621 });
1622 }
1623 None => {
1624 break;
1626 }
1627 }
1628 }
1629 }
1630
1631 async fn poll_discovery(
1633 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
1634 shutdown: Arc<AtomicBool>,
1635 _event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1636 ) {
1637 use tokio::time::{Duration, interval};
1638
1639 let mut poll_interval = interval(Duration::from_millis(100));
1640
1641 while !shutdown.load(Ordering::Relaxed) {
1642 poll_interval.tick().await;
1643
1644 let events = match discovery_manager.lock() {
1646 Ok(mut discovery) => discovery.poll(std::time::Instant::now()),
1647 Err(e) => {
1648 error!("Failed to lock discovery manager: {}", e);
1649 continue;
1650 }
1651 };
1652
1653 for event in events {
1655 match event {
1656 DiscoveryEvent::DiscoveryStarted {
1657 peer_id,
1658 bootstrap_count,
1659 } => {
1660 debug!(
1661 "Discovery started for peer {:?} with {} bootstrap nodes",
1662 peer_id, bootstrap_count
1663 );
1664 }
1665 DiscoveryEvent::LocalScanningStarted => {
1666 debug!("Local interface scanning started");
1667 }
1668 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1669 debug!("Discovered local candidate: {}", candidate.address);
1670 }
1673 DiscoveryEvent::LocalScanningCompleted {
1674 candidate_count,
1675 duration,
1676 } => {
1677 debug!(
1678 "Local interface scanning completed: {} candidates in {:?}",
1679 candidate_count, duration
1680 );
1681 }
1682 DiscoveryEvent::ServerReflexiveDiscoveryStarted { bootstrap_count } => {
1683 debug!(
1684 "Server reflexive discovery started with {} bootstrap nodes",
1685 bootstrap_count
1686 );
1687 }
1688 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1689 candidate,
1690 bootstrap_node,
1691 } => {
1692 debug!(
1693 "Discovered server-reflexive candidate {} via bootstrap {}",
1694 candidate.address, bootstrap_node
1695 );
1696 }
1698 DiscoveryEvent::BootstrapQueryFailed {
1699 bootstrap_node,
1700 error,
1701 } => {
1702 debug!("Bootstrap query failed for {}: {}", bootstrap_node, error);
1703 }
1704 DiscoveryEvent::PortAllocationDetected {
1706 port,
1707 source_address,
1708 bootstrap_node,
1709 timestamp,
1710 } => {
1711 debug!(
1712 "Port allocation detected: port {} from {} via bootstrap {:?} at {:?}",
1713 port, source_address, bootstrap_node, timestamp
1714 );
1715 }
1716 DiscoveryEvent::DiscoveryCompleted {
1717 candidate_count,
1718 total_duration,
1719 success_rate,
1720 } => {
1721 info!(
1722 "Discovery completed with {} candidates in {:?} (success rate: {:.2}%)",
1723 candidate_count,
1724 total_duration,
1725 success_rate * 100.0
1726 );
1727 }
1730 DiscoveryEvent::DiscoveryFailed {
1731 error,
1732 partial_results,
1733 } => {
1734 warn!(
1735 "Discovery failed: {} (found {} partial candidates)",
1736 error,
1737 partial_results.len()
1738 );
1739
1740 }
1745 DiscoveryEvent::PathValidationRequested {
1746 candidate_id,
1747 candidate_address,
1748 challenge_token,
1749 } => {
1750 debug!(
1751 "PATH_CHALLENGE requested for candidate {} at {} with token {:08x}",
1752 candidate_id.0, candidate_address, challenge_token
1753 );
1754 }
1757 DiscoveryEvent::PathValidationResponse {
1758 candidate_id,
1759 candidate_address,
1760 challenge_token: _,
1761 rtt,
1762 } => {
1763 debug!(
1764 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1765 candidate_id.0, candidate_address, rtt
1766 );
1767 }
1769 }
1770 }
1771 }
1772
1773 info!("Discovery polling task shutting down");
1774 }
1775
1776 async fn handle_connection(
1778 connection: QuinnConnection,
1779 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1780 ) {
1781 let peer_id = Self::generate_peer_id_from_address(connection.remote_address());
1782 let remote_address = connection.remote_address();
1783
1784 debug!(
1785 "Handling connection from peer {:?} at {}",
1786 peer_id, remote_address
1787 );
1788
1789 loop {
1791 tokio::select! {
1792 stream = connection.accept_bi() => {
1793 match stream {
1794 Ok((send, recv)) => {
1795 tokio::spawn(async move {
1796 Self::handle_bi_stream(send, recv).await;
1797 });
1798 }
1799 Err(e) => {
1800 debug!("Error accepting bidirectional stream: {}", e);
1801 let _ = event_tx.send(NatTraversalEvent::ConnectionLost {
1802 peer_id,
1803 reason: format!("Stream error: {e}"),
1804 });
1805 break;
1806 }
1807 }
1808 }
1809 stream = connection.accept_uni() => {
1810 match stream {
1811 Ok(recv) => {
1812 tokio::spawn(async move {
1813 Self::handle_uni_stream(recv).await;
1814 });
1815 }
1816 Err(e) => {
1817 debug!("Error accepting unidirectional stream: {}", e);
1818 let _ = event_tx.send(NatTraversalEvent::ConnectionLost {
1819 peer_id,
1820 reason: format!("Stream error: {e}"),
1821 });
1822 break;
1823 }
1824 }
1825 }
1826 }
1827 }
1828 }
1829
/// Handler for an accepted bidirectional stream.
///
/// The body is currently empty: neither half of the stream is read or
/// written, and both are dropped when the function returns.
async fn handle_bi_stream(
    _send: crate::high_level::SendStream,
    _recv: crate::high_level::RecvStream,
) {
}
1864
1865 async fn handle_uni_stream(mut recv: crate::high_level::RecvStream) {
1867 let mut buffer = vec![0u8; 1024];
1868
1869 loop {
1870 match recv.read(&mut buffer).await {
1871 Ok(Some(size)) => {
1872 debug!("Received {} bytes on unidirectional stream", size);
1873 }
1875 Ok(None) => {
1876 debug!("Unidirectional stream closed by peer");
1877 break;
1878 }
1879 Err(e) => {
1880 debug!("Error reading from unidirectional stream: {}", e);
1881 break;
1882 }
1883 }
1884 }
1885 }
1886
1887 pub async fn connect_to_peer(
1889 &self,
1890 peer_id: PeerId,
1891 server_name: &str,
1892 remote_addr: SocketAddr,
1893 ) -> Result<QuinnConnection, NatTraversalError> {
1894 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1895 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1896 })?;
1897
1898 info!("Connecting to peer {:?} at {}", peer_id, remote_addr);
1899
1900 let connecting = endpoint.connect(remote_addr, server_name).map_err(|e| {
1902 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
1903 })?;
1904
1905 let connection = timeout(
1906 self.timeout_config
1907 .nat_traversal
1908 .connection_establishment_timeout,
1909 connecting,
1910 )
1911 .await
1912 .map_err(|_| NatTraversalError::Timeout)?
1913 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
1914
1915 info!(
1916 "Successfully connected to peer {:?} at {}",
1917 peer_id, remote_addr
1918 );
1919
1920 if let Some(ref event_tx) = self.event_tx {
1922 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1923 peer_id,
1924 remote_address: remote_addr,
1925 });
1926 }
1927
1928 Ok(connection)
1929 }
1930
1931 pub async fn accept_connection(&self) -> Result<(PeerId, QuinnConnection), NatTraversalError> {
1933 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1934 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1935 })?;
1936
1937 let incoming = endpoint
1939 .accept()
1940 .await
1941 .ok_or_else(|| NatTraversalError::NetworkError("Endpoint closed".to_string()))?;
1942
1943 let remote_addr = incoming.remote_address();
1944 info!("Accepting connection from {}", remote_addr);
1945
1946 let connection = incoming.await.map_err(|e| {
1948 NatTraversalError::ConnectionFailed(format!("Failed to accept connection: {e}"))
1949 })?;
1950
1951 let peer_id = self
1953 .extract_peer_id_from_connection(&connection)
1954 .await
1955 .unwrap_or_else(|| Self::generate_peer_id_from_address(remote_addr));
1956
1957 {
1959 let mut connections = self.connections.write().map_err(|_| {
1960 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1961 })?;
1962 connections.insert(peer_id, connection.clone());
1963 }
1964
1965 info!(
1966 "Connection accepted from peer {:?} at {}",
1967 peer_id, remote_addr
1968 );
1969
1970 if let Some(ref event_tx) = self.event_tx {
1972 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1973 peer_id,
1974 remote_address: remote_addr,
1975 });
1976 }
1977
1978 Ok((peer_id, connection))
1979 }
1980
/// Returns this endpoint's own peer ID (a copy of the `local_peer_id` field).
pub fn local_peer_id(&self) -> PeerId {
    self.local_peer_id
}
1985
1986 pub fn get_connection(
1988 &self,
1989 peer_id: &PeerId,
1990 ) -> Result<Option<QuinnConnection>, NatTraversalError> {
1991 let connections = self.connections.read().map_err(|_| {
1992 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1993 })?;
1994 Ok(connections.get(peer_id).cloned())
1995 }
1996
1997 pub fn remove_connection(
1999 &self,
2000 peer_id: &PeerId,
2001 ) -> Result<Option<QuinnConnection>, NatTraversalError> {
2002 let mut connections = self.connections.write().map_err(|_| {
2003 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2004 })?;
2005 Ok(connections.remove(peer_id))
2006 }
2007
2008 pub fn list_connections(&self) -> Result<Vec<(PeerId, SocketAddr)>, NatTraversalError> {
2010 let connections = self.connections.read().map_err(|_| {
2011 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2012 })?;
2013 let mut result = Vec::new();
2014 for (peer_id, connection) in connections.iter() {
2015 result.push((*peer_id, connection.remote_address()));
2016 }
2017 Ok(result)
2018 }
2019
2020 pub async fn handle_connection_data(
2022 &self,
2023 peer_id: PeerId,
2024 connection: &QuinnConnection,
2025 ) -> Result<(), NatTraversalError> {
2026 info!("Handling connection data from peer {:?}", peer_id);
2027
2028 let connection_clone = connection.clone();
2030 let peer_id_clone = peer_id;
2031 tokio::spawn(async move {
2032 loop {
2033 match connection_clone.accept_bi().await {
2034 Ok((send, recv)) => {
2035 debug!(
2036 "Accepted bidirectional stream from peer {:?}",
2037 peer_id_clone
2038 );
2039 tokio::spawn(Self::handle_bi_stream(send, recv));
2040 }
2041 Err(ConnectionError::ApplicationClosed(_)) => {
2042 debug!("Connection closed by peer {:?}", peer_id_clone);
2043 break;
2044 }
2045 Err(e) => {
2046 debug!(
2047 "Error accepting bidirectional stream from peer {:?}: {}",
2048 peer_id_clone, e
2049 );
2050 break;
2051 }
2052 }
2053 }
2054 });
2055
2056 let connection_clone = connection.clone();
2058 let peer_id_clone = peer_id;
2059 tokio::spawn(async move {
2060 loop {
2061 match connection_clone.accept_uni().await {
2062 Ok(recv) => {
2063 debug!(
2064 "Accepted unidirectional stream from peer {:?}",
2065 peer_id_clone
2066 );
2067 tokio::spawn(Self::handle_uni_stream(recv));
2068 }
2069 Err(ConnectionError::ApplicationClosed(_)) => {
2070 debug!("Connection closed by peer {:?}", peer_id_clone);
2071 break;
2072 }
2073 Err(e) => {
2074 debug!(
2075 "Error accepting unidirectional stream from peer {:?}: {}",
2076 peer_id_clone, e
2077 );
2078 break;
2079 }
2080 }
2081 }
2082 });
2083
2084 Ok(())
2085 }
2086
2087 fn generate_local_peer_id() -> PeerId {
2089 use std::collections::hash_map::DefaultHasher;
2090 use std::hash::{Hash, Hasher};
2091 use std::time::SystemTime;
2092
2093 let mut hasher = DefaultHasher::new();
2094 SystemTime::now().hash(&mut hasher);
2095 std::process::id().hash(&mut hasher);
2096
2097 let hash = hasher.finish();
2098 let mut peer_id = [0u8; 32];
2099 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2100
2101 for i in 8..32 {
2103 peer_id[i] = rand::random();
2104 }
2105
2106 PeerId(peer_id)
2107 }
2108
2109 fn generate_peer_id_from_address(addr: SocketAddr) -> PeerId {
2115 use std::collections::hash_map::DefaultHasher;
2116 use std::hash::{Hash, Hasher};
2117
2118 let mut hasher = DefaultHasher::new();
2119 addr.hash(&mut hasher);
2120
2121 let hash = hasher.finish();
2122 let mut peer_id = [0u8; 32];
2123 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2124
2125 for i in 8..32 {
2128 peer_id[i] = rand::random();
2129 }
2130
2131 warn!(
2132 "Generated temporary peer ID from address {}. This ID is not persistent!",
2133 addr
2134 );
2135 PeerId(peer_id)
2136 }
2137
2138 async fn extract_peer_id_from_connection(
2140 &self,
2141 connection: &QuinnConnection,
2142 ) -> Option<PeerId> {
2143 if let Some(identity) = connection.peer_identity() {
2145 if let Some(public_key_bytes) = identity.downcast_ref::<[u8; 32]>() {
2147 match crate::derive_peer_id_from_key_bytes(public_key_bytes) {
2149 Ok(peer_id) => {
2150 debug!("Derived peer ID from Ed25519 public key");
2151 return Some(peer_id);
2152 }
2153 Err(e) => {
2154 warn!("Failed to derive peer ID from public key: {}", e);
2155 }
2156 }
2157 }
2158 }
2160
2161 None
2162 }
2163
2164 pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
2166 self.shutdown.store(true, Ordering::Relaxed);
2168
2169 {
2171 let mut connections = self.connections.write().map_err(|_| {
2172 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2173 })?;
2174 for (peer_id, connection) in connections.drain() {
2175 info!("Closing connection to peer {:?}", peer_id);
2176 connection.close(crate::VarInt::from_u32(0), b"Shutdown");
2177 }
2178 }
2179
2180 if let Some(ref endpoint) = self.quinn_endpoint {
2182 endpoint.wait_idle().await;
2183 }
2184
2185 info!("NAT traversal endpoint shutdown completed");
2186 Ok(())
2187 }
2188
2189 pub async fn discover_candidates(
2191 &self,
2192 peer_id: PeerId,
2193 ) -> Result<Vec<CandidateAddress>, NatTraversalError> {
2194 debug!("Discovering address candidates for peer {:?}", peer_id);
2195
2196 let mut candidates = Vec::new();
2197
2198 let bootstrap_nodes = {
2200 let nodes = self
2201 .bootstrap_nodes
2202 .read()
2203 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2204 nodes.clone()
2205 };
2206
2207 {
2209 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2210 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2211 })?;
2212
2213 discovery
2214 .start_discovery(peer_id, bootstrap_nodes)
2215 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
2216 }
2217
2218 let timeout_duration = self.config.coordination_timeout;
2220 let start_time = std::time::Instant::now();
2221
2222 while start_time.elapsed() < timeout_duration {
2223 let discovery_events = {
2224 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2225 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2226 })?;
2227 discovery.poll(std::time::Instant::now())
2228 };
2229
2230 for event in discovery_events {
2231 match event {
2232 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
2233 candidates.push(candidate.clone());
2234
2235 self.send_candidate_advertisement(peer_id, &candidate)
2237 .await
2238 .unwrap_or_else(|e| {
2239 debug!("Failed to send candidate advertisement: {}", e)
2240 });
2241 }
2242 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
2243 candidates.push(candidate.clone());
2244
2245 self.send_candidate_advertisement(peer_id, &candidate)
2247 .await
2248 .unwrap_or_else(|e| {
2249 debug!("Failed to send candidate advertisement: {}", e)
2250 });
2251 }
2252 DiscoveryEvent::DiscoveryCompleted { .. } => {
2254 return Ok(candidates);
2256 }
2257 DiscoveryEvent::DiscoveryFailed {
2258 error,
2259 partial_results,
2260 } => {
2261 candidates.extend(partial_results);
2263 if candidates.is_empty() {
2264 return Err(NatTraversalError::CandidateDiscoveryFailed(
2265 error.to_string(),
2266 ));
2267 }
2268 return Ok(candidates);
2269 }
2270 _ => {}
2271 }
2272 }
2273
2274 sleep(Duration::from_millis(10)).await;
2276 }
2277
2278 if candidates.is_empty() {
2279 Err(NatTraversalError::NoCandidatesFound)
2280 } else {
2281 Ok(candidates)
2282 }
2283 }
2284
2285 #[allow(dead_code)]
2287 fn create_punch_me_now_frame(&self, peer_id: PeerId) -> Result<Vec<u8>, NatTraversalError> {
2288 let mut frame = Vec::new();
2296
2297 frame.push(0x41);
2299
2300 frame.extend_from_slice(&peer_id.0);
2302
2303 let timestamp = std::time::SystemTime::now()
2305 .duration_since(std::time::UNIX_EPOCH)
2306 .unwrap_or_default()
2307 .as_millis() as u64;
2308 frame.extend_from_slice(×tamp.to_be_bytes());
2309
2310 let mut token = [0u8; 16];
2312 for byte in &mut token {
2313 *byte = rand::random();
2314 }
2315 frame.extend_from_slice(&token);
2316
2317 Ok(frame)
2318 }
2319
2320 #[allow(dead_code)]
2321 fn attempt_hole_punching(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
2322 debug!("Attempting hole punching for peer {:?}", peer_id);
2323
2324 let candidate_pairs = self.get_candidate_pairs_for_peer(peer_id)?;
2326
2327 if candidate_pairs.is_empty() {
2328 return Err(NatTraversalError::NoCandidatesFound);
2329 }
2330
2331 info!(
2332 "Generated {} candidate pairs for hole punching with peer {:?}",
2333 candidate_pairs.len(),
2334 peer_id
2335 );
2336
2337 self.attempt_quinn_hole_punching(peer_id, candidate_pairs)
2340 }
2341
2342 #[allow(dead_code)]
2344 fn get_candidate_pairs_for_peer(
2345 &self,
2346 peer_id: PeerId,
2347 ) -> Result<Vec<CandidatePair>, NatTraversalError> {
2348 let discovery_candidates = {
2350 let discovery = self.discovery_manager.lock().map_err(|_| {
2351 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2352 })?;
2353
2354 discovery.get_candidates_for_peer(peer_id)
2355 };
2356
2357 if discovery_candidates.is_empty() {
2358 return Err(NatTraversalError::NoCandidatesFound);
2359 }
2360
2361 let mut candidate_pairs = Vec::new();
2363 let local_candidates = discovery_candidates
2364 .iter()
2365 .filter(|c| matches!(c.source, CandidateSource::Local))
2366 .collect::<Vec<_>>();
2367 let remote_candidates = discovery_candidates
2368 .iter()
2369 .filter(|c| !matches!(c.source, CandidateSource::Local))
2370 .collect::<Vec<_>>();
2371
2372 for local in &local_candidates {
2374 for remote in &remote_candidates {
2375 let pair_priority = self.calculate_candidate_pair_priority(local, remote);
2376 candidate_pairs.push(CandidatePair {
2377 local_candidate: (*local).clone(),
2378 remote_candidate: (*remote).clone(),
2379 priority: pair_priority,
2380 state: CandidatePairState::Waiting,
2381 });
2382 }
2383 }
2384
2385 candidate_pairs.sort_by(|a, b| b.priority.cmp(&a.priority));
2387
2388 candidate_pairs.truncate(8);
2390
2391 Ok(candidate_pairs)
2392 }
2393
2394 #[allow(dead_code)]
2396 fn calculate_candidate_pair_priority(
2397 &self,
2398 local: &CandidateAddress,
2399 remote: &CandidateAddress,
2400 ) -> u64 {
2401 let local_type_preference = match local.source {
2405 CandidateSource::Local => 126,
2406 CandidateSource::Observed { .. } => 100,
2407 CandidateSource::Predicted => 75,
2408 CandidateSource::Peer => 50,
2409 };
2410
2411 let remote_type_preference = match remote.source {
2412 CandidateSource::Local => 126,
2413 CandidateSource::Observed { .. } => 100,
2414 CandidateSource::Predicted => 75,
2415 CandidateSource::Peer => 50,
2416 };
2417
2418 let local_priority = (local_type_preference as u64) << 8 | local.priority as u64;
2420 let remote_priority = (remote_type_preference as u64) << 8 | remote.priority as u64;
2421
2422 let min_priority = local_priority.min(remote_priority);
2423 let max_priority = local_priority.max(remote_priority);
2424
2425 (min_priority << 32)
2426 | (max_priority << 1)
2427 | if local_priority > remote_priority {
2428 1
2429 } else {
2430 0
2431 }
2432 }
2433
2434 #[allow(dead_code)]
2436 fn attempt_quinn_hole_punching(
2437 &self,
2438 peer_id: PeerId,
2439 candidate_pairs: Vec<CandidatePair>,
2440 ) -> Result<(), NatTraversalError> {
2441 let _endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
2442 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
2443 })?;
2444
2445 for pair in candidate_pairs {
2446 debug!(
2447 "Attempting hole punch with candidate pair: {} -> {}",
2448 pair.local_candidate.address, pair.remote_candidate.address
2449 );
2450
2451 let mut challenge_data = [0u8; 8];
2453 for byte in &mut challenge_data {
2454 *byte = rand::random();
2455 }
2456
2457 let local_socket =
2459 std::net::UdpSocket::bind(pair.local_candidate.address).map_err(|e| {
2460 NatTraversalError::NetworkError(format!(
2461 "Failed to bind to local candidate: {e}"
2462 ))
2463 })?;
2464
2465 let path_challenge_packet = self.create_path_challenge_packet(challenge_data)?;
2467
2468 match local_socket.send_to(&path_challenge_packet, pair.remote_candidate.address) {
2470 Ok(bytes_sent) => {
2471 debug!(
2472 "Sent {} bytes for hole punch from {} to {}",
2473 bytes_sent, pair.local_candidate.address, pair.remote_candidate.address
2474 );
2475
2476 local_socket
2478 .set_read_timeout(Some(Duration::from_millis(100)))
2479 .map_err(|e| {
2480 NatTraversalError::NetworkError(format!("Failed to set timeout: {e}"))
2481 })?;
2482
2483 let mut response_buffer = [0u8; 1024];
2485 match local_socket.recv_from(&mut response_buffer) {
2486 Ok((_bytes_received, response_addr)) => {
2487 if response_addr == pair.remote_candidate.address {
2488 info!(
2489 "Hole punch succeeded for peer {:?}: {} <-> {}",
2490 peer_id,
2491 pair.local_candidate.address,
2492 pair.remote_candidate.address
2493 );
2494
2495 self.store_successful_candidate_pair(peer_id, pair)?;
2497 return Ok(());
2498 } else {
2499 debug!(
2500 "Received response from unexpected address: {}",
2501 response_addr
2502 );
2503 }
2504 }
2505 Err(e)
2506 if e.kind() == std::io::ErrorKind::WouldBlock
2507 || e.kind() == std::io::ErrorKind::TimedOut =>
2508 {
2509 debug!("No response received for hole punch attempt");
2510 }
2511 Err(e) => {
2512 debug!("Error receiving hole punch response: {}", e);
2513 }
2514 }
2515 }
2516 Err(e) => {
2517 debug!("Failed to send hole punch packet: {}", e);
2518 }
2519 }
2520 }
2521
2522 Err(NatTraversalError::HolePunchingFailed)
2524 }
2525
2526 fn create_path_challenge_packet(
2528 &self,
2529 challenge_data: [u8; 8],
2530 ) -> Result<Vec<u8>, NatTraversalError> {
2531 let mut packet = Vec::new();
2534
2535 packet.push(0x40); packet.extend_from_slice(&[0, 0, 0, 1]); packet.push(0x1a); packet.extend_from_slice(&challenge_data); Ok(packet)
2544 }
2545
2546 fn store_successful_candidate_pair(
2548 &self,
2549 peer_id: PeerId,
2550 pair: CandidatePair,
2551 ) -> Result<(), NatTraversalError> {
2552 debug!(
2553 "Storing successful candidate pair for peer {:?}: {} <-> {}",
2554 peer_id, pair.local_candidate.address, pair.remote_candidate.address
2555 );
2556
2557 if let Some(ref callback) = self.event_callback {
2562 callback(NatTraversalEvent::PathValidated {
2563 peer_id,
2564 address: pair.remote_candidate.address,
2565 rtt: Duration::from_millis(50), });
2567
2568 callback(NatTraversalEvent::TraversalSucceeded {
2569 peer_id,
2570 final_address: pair.remote_candidate.address,
2571 total_time: Duration::from_secs(1), });
2573 }
2574
2575 Ok(())
2576 }
2577
2578 fn attempt_connection_to_candidate(
2580 &self,
2581 peer_id: PeerId,
2582 candidate: &CandidateAddress,
2583 ) -> Result<(), NatTraversalError> {
2584 {
2585 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
2586 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
2587 })?;
2588
2589 let server_name = format!("peer-{:x}", peer_id.0[0] as u32);
2591
2592 debug!(
2593 "Attempting Quinn connection to candidate {} for peer {:?}",
2594 candidate.address, peer_id
2595 );
2596
2597 match endpoint.connect(candidate.address, &server_name) {
2599 Ok(connecting) => {
2600 info!(
2601 "Connection attempt initiated to {} for peer {:?}",
2602 candidate.address, peer_id
2603 );
2604
2605 if let Some(event_tx) = &self.event_tx {
2607 let event_tx = event_tx.clone();
2608 let connections = self.connections.clone();
2609 let peer_id_clone = peer_id;
2610 let address = candidate.address;
2611
2612 tokio::spawn(async move {
2613 match connecting.await {
2614 Ok(connection) => {
2615 info!(
2616 "Successfully connected to {} for peer {:?}",
2617 address, peer_id_clone
2618 );
2619
2620 if let Ok(mut conns) = connections.write() {
2622 conns.insert(peer_id_clone, connection.clone());
2623 }
2624
2625 let _ =
2627 event_tx.send(NatTraversalEvent::ConnectionEstablished {
2628 peer_id: peer_id_clone,
2629 remote_address: address,
2630 });
2631
2632 Self::handle_connection(connection, event_tx).await;
2634 }
2635 Err(e) => {
2636 warn!("Connection to {} failed: {}", address, e);
2637 }
2638 }
2639 });
2640 }
2641
2642 Ok(())
2643 }
2644 Err(e) => {
2645 warn!(
2646 "Failed to initiate connection to {}: {}",
2647 candidate.address, e
2648 );
2649 Err(NatTraversalError::ConnectionFailed(format!(
2650 "Failed to connect to {}: {}",
2651 candidate.address, e
2652 )))
2653 }
2654 }
2655 }
2656 }
2657
2658 pub fn poll(
2660 &self,
2661 now: std::time::Instant,
2662 ) -> Result<Vec<NatTraversalEvent>, NatTraversalError> {
2663 let mut events = Vec::new();
2664
2665 self.check_connections_for_observed_addresses(&mut events)?;
2667
2668 {
2670 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2671 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2672 })?;
2673
2674 let discovery_events = discovery.poll(now);
2675
2676 for discovery_event in discovery_events {
2678 if let Some(nat_event) = self.convert_discovery_event(discovery_event) {
2679 events.push(nat_event.clone());
2680
2681 if let Some(ref callback) = self.event_callback {
2683 callback(nat_event.clone());
2684 }
2685
2686 if let NatTraversalEvent::CandidateDiscovered {
2688 peer_id: _,
2689 candidate: _,
2690 } = &nat_event
2691 {
2692 }
2695 }
2696 }
2697 }
2698
2699 let mut sessions = self
2701 .active_sessions
2702 .write()
2703 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2704
2705 for (_peer_id, session) in sessions.iter_mut() {
2706 let elapsed = now.duration_since(session.started_at);
2707
2708 let timeout = self.get_phase_timeout(session.phase);
2710
2711 if elapsed > timeout {
2713 match session.phase {
2714 TraversalPhase::Discovery => {
2715 let discovered_candidates = {
2717 let discovery = self.discovery_manager.lock().map_err(|_| {
2718 NatTraversalError::ProtocolError(
2719 "Discovery manager lock poisoned".to_string(),
2720 )
2721 });
2722 match discovery {
2723 Ok(disc) => disc.get_candidates_for_peer(session.peer_id),
2724 Err(_) => Vec::new(),
2725 }
2726 };
2727
2728 session.candidates = discovered_candidates.clone();
2730
2731 if !session.candidates.is_empty() {
2733 session.phase = TraversalPhase::Coordination;
2735 let event = NatTraversalEvent::PhaseTransition {
2736 peer_id: session.peer_id,
2737 from_phase: TraversalPhase::Discovery,
2738 to_phase: TraversalPhase::Coordination,
2739 };
2740 events.push(event.clone());
2741 if let Some(ref callback) = self.event_callback {
2742 callback(event);
2743 }
2744 info!(
2745 "Peer {:?} advanced from Discovery to Coordination with {} candidates",
2746 session.peer_id,
2747 session.candidates.len()
2748 );
2749 } else if session.attempt < self.config.max_concurrent_attempts as u32 {
2750 session.attempt += 1;
2752 session.started_at = now;
2753 let backoff_duration = self.calculate_backoff(session.attempt);
2754 warn!(
2755 "Discovery timeout for peer {:?}, retrying (attempt {}), backoff: {:?}",
2756 session.peer_id, session.attempt, backoff_duration
2757 );
2758 } else {
2759 session.phase = TraversalPhase::Failed;
2761 let event = NatTraversalEvent::TraversalFailed {
2762 peer_id: session.peer_id,
2763 error: NatTraversalError::NoCandidatesFound,
2764 fallback_available: self.config.enable_relay_fallback,
2765 };
2766 events.push(event.clone());
2767 if let Some(ref callback) = self.event_callback {
2768 callback(event);
2769 }
2770 error!(
2771 "NAT traversal failed for peer {:?}: no candidates found after {} attempts",
2772 session.peer_id, session.attempt
2773 );
2774 }
2775 }
2776 TraversalPhase::Coordination => {
2777 if let Some(coordinator) = self.select_coordinator() {
2779 match self.send_coordination_request(session.peer_id, coordinator) {
2780 Ok(_) => {
2781 session.phase = TraversalPhase::Synchronization;
2782 let event = NatTraversalEvent::CoordinationRequested {
2783 peer_id: session.peer_id,
2784 coordinator,
2785 };
2786 events.push(event.clone());
2787 if let Some(ref callback) = self.event_callback {
2788 callback(event);
2789 }
2790 info!(
2791 "Coordination requested for peer {:?} via {}",
2792 session.peer_id, coordinator
2793 );
2794 }
2795 Err(e) => {
2796 self.handle_phase_failure(session, now, &mut events, e);
2797 }
2798 }
2799 } else {
2800 self.handle_phase_failure(
2801 session,
2802 now,
2803 &mut events,
2804 NatTraversalError::NoBootstrapNodes,
2805 );
2806 }
2807 }
2808 TraversalPhase::Synchronization => {
2809 if self.is_peer_synchronized(&session.peer_id) {
2811 session.phase = TraversalPhase::Punching;
2812 let event = NatTraversalEvent::HolePunchingStarted {
2813 peer_id: session.peer_id,
2814 targets: session.candidates.iter().map(|c| c.address).collect(),
2815 };
2816 events.push(event.clone());
2817 if let Some(ref callback) = self.event_callback {
2818 callback(event);
2819 }
2820 if let Err(e) =
2822 self.initiate_hole_punching(session.peer_id, &session.candidates)
2823 {
2824 self.handle_phase_failure(session, now, &mut events, e);
2825 }
2826 } else {
2827 self.handle_phase_failure(
2828 session,
2829 now,
2830 &mut events,
2831 NatTraversalError::ProtocolError(
2832 "Synchronization timeout".to_string(),
2833 ),
2834 );
2835 }
2836 }
2837 TraversalPhase::Punching => {
2838 if let Some(successful_path) = self.check_punch_results(&session.peer_id) {
2840 session.phase = TraversalPhase::Validation;
2841 let event = NatTraversalEvent::PathValidated {
2842 peer_id: session.peer_id,
2843 address: successful_path,
2844 rtt: Duration::from_millis(50), };
2846 events.push(event.clone());
2847 if let Some(ref callback) = self.event_callback {
2848 callback(event);
2849 }
2850 if let Err(e) = self.validate_path(session.peer_id, successful_path) {
2852 self.handle_phase_failure(session, now, &mut events, e);
2853 }
2854 } else {
2855 self.handle_phase_failure(
2856 session,
2857 now,
2858 &mut events,
2859 NatTraversalError::PunchingFailed(
2860 "No successful punch".to_string(),
2861 ),
2862 );
2863 }
2864 }
2865 TraversalPhase::Validation => {
2866 if self.is_path_validated(&session.peer_id) {
2868 session.phase = TraversalPhase::Connected;
2869 let event = NatTraversalEvent::TraversalSucceeded {
2870 peer_id: session.peer_id,
2871 final_address: session
2872 .candidates
2873 .first()
2874 .map(|c| c.address)
2875 .unwrap_or_else(create_random_port_bind_addr),
2876 total_time: elapsed,
2877 };
2878 events.push(event.clone());
2879 if let Some(ref callback) = self.event_callback {
2880 callback(event);
2881 }
2882 info!(
2883 "NAT traversal succeeded for peer {:?} in {:?}",
2884 session.peer_id, elapsed
2885 );
2886 } else {
2887 self.handle_phase_failure(
2888 session,
2889 now,
2890 &mut events,
2891 NatTraversalError::ValidationFailed(
2892 "Path validation timeout".to_string(),
2893 ),
2894 );
2895 }
2896 }
2897 TraversalPhase::Connected => {
2898 if !self.is_connection_healthy(&session.peer_id) {
2900 warn!(
2901 "Connection to peer {:?} is no longer healthy",
2902 session.peer_id
2903 );
2904 }
2906 }
2907 TraversalPhase::Failed => {
2908 }
2910 }
2911 }
2912 }
2913
2914 Ok(events)
2915 }
2916
2917 fn get_phase_timeout(&self, phase: TraversalPhase) -> Duration {
2919 match phase {
2920 TraversalPhase::Discovery => Duration::from_secs(10),
2921 TraversalPhase::Coordination => self.config.coordination_timeout,
2922 TraversalPhase::Synchronization => Duration::from_secs(3),
2923 TraversalPhase::Punching => Duration::from_secs(5),
2924 TraversalPhase::Validation => Duration::from_secs(5),
2925 TraversalPhase::Connected => Duration::from_secs(30), TraversalPhase::Failed => Duration::ZERO,
2927 }
2928 }
2929
2930 fn calculate_backoff(&self, attempt: u32) -> Duration {
2932 let base = Duration::from_millis(1000);
2933 let max = Duration::from_secs(30);
2934 let backoff = base * 2u32.pow(attempt.saturating_sub(1));
2935 let jitter = std::time::Duration::from_millis((rand::random::<u64>() % 200) as u64);
2936 backoff.min(max) + jitter
2937 }
2938
2939 fn check_connections_for_observed_addresses(
2941 &self,
2942 _events: &mut Vec<NatTraversalEvent>,
2943 ) -> Result<(), NatTraversalError> {
2944 let connections = self.connections.read().map_err(|_| {
2946 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2947 })?;
2948
2949 if !connections.is_empty() && self.config.role == EndpointRole::Client {
2956 for (_peer_id, connection) in connections.iter() {
2958 let remote_addr = connection.remote_address();
2959
2960 let is_bootstrap = {
2962 let bootstrap_nodes = self.bootstrap_nodes.read().map_err(|_| {
2963 NatTraversalError::ProtocolError(
2964 "Bootstrap nodes lock poisoned".to_string(),
2965 )
2966 })?;
2967 bootstrap_nodes
2968 .iter()
2969 .any(|node| node.address == remote_addr)
2970 };
2971
2972 if is_bootstrap {
2973 debug!(
2976 "Bootstrap connection to {} should provide our external address via OBSERVED_ADDRESS frames",
2977 remote_addr
2978 );
2979
2980 }
2983 }
2984 }
2985
2986 Ok(())
2987 }
2988
2989 fn handle_phase_failure(
2991 &self,
2992 session: &mut NatTraversalSession,
2993 now: std::time::Instant,
2994 events: &mut Vec<NatTraversalEvent>,
2995 error: NatTraversalError,
2996 ) {
2997 if session.attempt < self.config.max_concurrent_attempts as u32 {
2998 session.attempt += 1;
3000 session.started_at = now;
3001 let backoff = self.calculate_backoff(session.attempt);
3002 warn!(
3003 "Phase {:?} failed for peer {:?}: {:?}, retrying (attempt {}) after {:?}",
3004 session.phase, session.peer_id, error, session.attempt, backoff
3005 );
3006 } else {
3007 session.phase = TraversalPhase::Failed;
3009 let event = NatTraversalEvent::TraversalFailed {
3010 peer_id: session.peer_id,
3011 error,
3012 fallback_available: self.config.enable_relay_fallback,
3013 };
3014 events.push(event.clone());
3015 if let Some(ref callback) = self.event_callback {
3016 callback(event);
3017 }
3018 error!(
3019 "NAT traversal failed for peer {:?} after {} attempts",
3020 session.peer_id, session.attempt
3021 );
3022 }
3023 }
3024
3025 fn select_coordinator(&self) -> Option<SocketAddr> {
3027 if let Ok(nodes) = self.bootstrap_nodes.read() {
3028 if !nodes.is_empty() {
3030 let idx = rand::random::<usize>() % nodes.len();
3031 return Some(nodes[idx].address);
3032 }
3033 }
3034 None
3035 }
3036
3037 fn send_coordination_request(
3039 &self,
3040 peer_id: PeerId,
3041 coordinator: SocketAddr,
3042 ) -> Result<(), NatTraversalError> {
3043 debug!(
3044 "Sending coordination request for peer {:?} to {}",
3045 peer_id, coordinator
3046 );
3047
3048 {
3049 if let Ok(connections) = self.connections.read() {
3051 for (_peer, conn) in connections.iter() {
3053 if conn.remote_address() == coordinator {
3054 info!("Found existing connection to coordinator {}", coordinator);
3058 return Ok(());
3059 }
3060 }
3061 }
3062
3063 info!("Establishing connection to coordinator {}", coordinator);
3065 if let Some(endpoint) = &self.quinn_endpoint {
3066 let server_name = format!("bootstrap-{}", coordinator.ip());
3067 match endpoint.connect(coordinator, &server_name) {
3068 Ok(connecting) => {
3069 info!("Initiated connection to coordinator {}", coordinator);
3071
3072 if let Some(event_tx) = &self.event_tx {
3074 let event_tx = event_tx.clone();
3075 let connections = self.connections.clone();
3076
3077 tokio::spawn(async move {
3078 match connecting.await {
3079 Ok(connection) => {
3080 info!("Connected to coordinator {}", coordinator);
3081
3082 let bootstrap_peer_id =
3084 Self::generate_peer_id_from_address(coordinator);
3085
3086 if let Ok(mut conns) = connections.write() {
3088 conns.insert(bootstrap_peer_id, connection.clone());
3089 }
3090
3091 Self::handle_connection(connection, event_tx).await;
3093 }
3094 Err(e) => {
3095 warn!(
3096 "Failed to connect to coordinator {}: {}",
3097 coordinator, e
3098 );
3099 }
3100 }
3101 });
3102 }
3103
3104 Ok(())
3107 }
3108 Err(e) => Err(NatTraversalError::CoordinationFailed(format!(
3109 "Failed to connect to coordinator {coordinator}: {e}"
3110 ))),
3111 }
3112 } else {
3113 Err(NatTraversalError::ConfigError(
3114 "Quinn endpoint not initialized".to_string(),
3115 ))
3116 }
3117 }
3118 }
3119
3120 fn is_peer_synchronized(&self, peer_id: &PeerId) -> bool {
3122 debug!("Checking synchronization status for peer {:?}", peer_id);
3123
3124 if let Ok(sessions) = self.active_sessions.read() {
3126 if let Some(session) = sessions.get(peer_id) {
3127 let has_candidates = !session.candidates.is_empty();
3130 let past_discovery = session.phase as u8 > TraversalPhase::Discovery as u8;
3131
3132 debug!(
3133 "Checking sync for peer {:?}: phase={:?}, candidates={}, past_discovery={}",
3134 peer_id,
3135 session.phase,
3136 session.candidates.len(),
3137 past_discovery
3138 );
3139
3140 if has_candidates && past_discovery {
3141 info!(
3142 "Peer {:?} is synchronized with {} candidates",
3143 peer_id,
3144 session.candidates.len()
3145 );
3146 return true;
3147 }
3148
3149 if session.phase == TraversalPhase::Synchronization && has_candidates {
3151 info!(
3152 "Peer {:?} in synchronization phase with {} candidates, considering synchronized",
3153 peer_id,
3154 session.candidates.len()
3155 );
3156 return true;
3157 }
3158
3159 if session.phase as u8 >= TraversalPhase::Synchronization as u8 {
3161 info!(
3162 "Test mode: Considering peer {:?} synchronized in phase {:?}",
3163 peer_id, session.phase
3164 );
3165 return true;
3166 }
3167 }
3168 }
3169
3170 warn!("Peer {:?} is not synchronized", peer_id);
3171 false
3172 }
3173
3174 fn initiate_hole_punching(
3176 &self,
3177 peer_id: PeerId,
3178 candidates: &[CandidateAddress],
3179 ) -> Result<(), NatTraversalError> {
3180 if candidates.is_empty() {
3181 return Err(NatTraversalError::NoCandidatesFound);
3182 }
3183
3184 info!(
3185 "Initiating hole punching for peer {:?} to {} candidates",
3186 peer_id,
3187 candidates.len()
3188 );
3189
3190 {
3191 for candidate in candidates {
3193 debug!(
3194 "Attempting QUIC connection to candidate: {}",
3195 candidate.address
3196 );
3197
3198 match self.attempt_connection_to_candidate(peer_id, candidate) {
3200 Ok(_) => {
3201 info!(
3202 "Successfully initiated connection attempt to {}",
3203 candidate.address
3204 );
3205 }
3206 Err(e) => {
3207 warn!(
3208 "Failed to initiate connection to {}: {:?}",
3209 candidate.address, e
3210 );
3211 }
3212 }
3213 }
3214
3215 Ok(())
3216 }
3217 }
3218
3219 fn check_punch_results(&self, peer_id: &PeerId) -> Option<SocketAddr> {
3221 {
3222 if let Ok(connections) = self.connections.read() {
3224 if let Some(conn) = connections.get(peer_id) {
3225 let addr = conn.remote_address();
3227 info!(
3228 "Found successful connection to peer {:?} at {}",
3229 peer_id, addr
3230 );
3231 return Some(addr);
3232 }
3233 }
3234 }
3235
3236 if let Ok(sessions) = self.active_sessions.read() {
3238 if let Some(session) = sessions.get(peer_id) {
3239 for candidate in &session.candidates {
3241 if matches!(candidate.state, CandidateState::Valid) {
3242 info!(
3243 "Found validated candidate for peer {:?} at {}",
3244 peer_id, candidate.address
3245 );
3246 return Some(candidate.address);
3247 }
3248 }
3249
3250 if session.phase == TraversalPhase::Punching && !session.candidates.is_empty() {
3252 let addr = session.candidates[0].address;
3253 info!(
3254 "Simulating successful punch for testing: peer {:?} at {}",
3255 peer_id, addr
3256 );
3257 return Some(addr);
3258 }
3259
3260 if let Some(first) = session.candidates.first() {
3262 debug!(
3263 "No validated candidates, using first candidate {} for peer {:?}",
3264 first.address, peer_id
3265 );
3266 return Some(first.address);
3267 }
3268 }
3269 }
3270
3271 warn!("No successful punch results for peer {:?}", peer_id);
3272 None
3273 }
3274
3275 fn validate_path(&self, peer_id: PeerId, address: SocketAddr) -> Result<(), NatTraversalError> {
3277 debug!("Validating path to peer {:?} at {}", peer_id, address);
3278
3279 {
3280 if let Ok(connections) = self.connections.read() {
3282 if let Some(conn) = connections.get(&peer_id) {
3283 if conn.remote_address() == address {
3285 info!(
3286 "Path validation successful for peer {:?} at {}",
3287 peer_id, address
3288 );
3289
3290 if let Ok(mut sessions) = self.active_sessions.write() {
3292 if let Some(session) = sessions.get_mut(&peer_id) {
3293 for candidate in &mut session.candidates {
3294 if candidate.address == address {
3295 candidate.state = CandidateState::Valid;
3296 break;
3297 }
3298 }
3299 }
3300 }
3301
3302 return Ok(());
3303 } else {
3304 warn!(
3305 "Connection address mismatch: expected {}, got {}",
3306 address,
3307 conn.remote_address()
3308 );
3309 }
3310 }
3311 }
3312
3313 Err(NatTraversalError::ValidationFailed(format!(
3315 "No connection found for peer {peer_id:?} at {address}"
3316 )))
3317 }
3318 }
3319
3320 fn is_path_validated(&self, peer_id: &PeerId) -> bool {
3322 debug!("Checking path validation for peer {:?}", peer_id);
3323
3324 {
3325 if let Ok(connections) = self.connections.read() {
3327 if connections.contains_key(peer_id) {
3328 info!("Path validated: connection exists for peer {:?}", peer_id);
3329 return true;
3330 }
3331 }
3332 }
3333
3334 if let Ok(sessions) = self.active_sessions.read() {
3336 if let Some(session) = sessions.get(peer_id) {
3337 let validated = session
3338 .candidates
3339 .iter()
3340 .any(|c| matches!(c.state, CandidateState::Valid));
3341
3342 if validated {
3343 info!(
3344 "Path validated: found validated candidate for peer {:?}",
3345 peer_id
3346 );
3347 return true;
3348 }
3349 }
3350 }
3351
3352 warn!("Path not validated for peer {:?}", peer_id);
3353 false
3354 }
3355
3356 fn is_connection_healthy(&self, peer_id: &PeerId) -> bool {
3358 {
3361 if let Ok(connections) = self.connections.read() {
3362 if let Some(_conn) = connections.get(peer_id) {
3363 return true; }
3368 }
3369 }
3370 true
3371 }
3372
3373 fn convert_discovery_event(
3375 &self,
3376 discovery_event: DiscoveryEvent,
3377 ) -> Option<NatTraversalEvent> {
3378 let current_peer_id = self.get_current_discovery_peer_id();
3380
3381 match discovery_event {
3382 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
3383 Some(NatTraversalEvent::CandidateDiscovered {
3384 peer_id: current_peer_id,
3385 candidate,
3386 })
3387 }
3388 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
3389 candidate,
3390 bootstrap_node: _,
3391 } => Some(NatTraversalEvent::CandidateDiscovered {
3392 peer_id: current_peer_id,
3393 candidate,
3394 }),
3395 DiscoveryEvent::DiscoveryCompleted {
3397 candidate_count: _,
3398 total_duration: _,
3399 success_rate: _,
3400 } => {
3401 None }
3404 DiscoveryEvent::DiscoveryFailed {
3405 error,
3406 partial_results,
3407 } => Some(NatTraversalEvent::TraversalFailed {
3408 peer_id: current_peer_id,
3409 error: NatTraversalError::CandidateDiscoveryFailed(error.to_string()),
3410 fallback_available: !partial_results.is_empty(),
3411 }),
3412 _ => None, }
3414 }
3415
3416 fn get_current_discovery_peer_id(&self) -> PeerId {
3418 if let Ok(sessions) = self.active_sessions.read() {
3420 if let Some((peer_id, _session)) = sessions
3421 .iter()
3422 .find(|(_, s)| matches!(s.phase, TraversalPhase::Discovery))
3423 {
3424 return *peer_id;
3425 }
3426
3427 if let Some((peer_id, _)) = sessions.iter().next() {
3429 return *peer_id;
3430 }
3431 }
3432
3433 self.local_peer_id
3435 }
3436
3437 #[allow(dead_code)]
3439 pub(crate) async fn handle_endpoint_event(
3440 &self,
3441 event: crate::shared::EndpointEventInner,
3442 ) -> Result<(), NatTraversalError> {
3443 match event {
3444 crate::shared::EndpointEventInner::NatCandidateValidated { address, challenge } => {
3445 info!(
3446 "NAT candidate validation succeeded for {} with challenge {:016x}",
3447 address, challenge
3448 );
3449
3450 let mut sessions = self.active_sessions.write().map_err(|_| {
3452 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3453 })?;
3454
3455 for (peer_id, session) in sessions.iter_mut() {
3457 if session.candidates.iter().any(|c| c.address == address) {
3458 session.phase = TraversalPhase::Connected;
3460
3461 if let Some(ref callback) = self.event_callback {
3463 callback(NatTraversalEvent::CandidateValidated {
3464 peer_id: *peer_id,
3465 candidate_address: address,
3466 });
3467 }
3468
3469 return self
3471 .establish_connection_to_validated_candidate(*peer_id, address)
3472 .await;
3473 }
3474 }
3475
3476 debug!(
3477 "Validated candidate {} not found in active sessions",
3478 address
3479 );
3480 Ok(())
3481 }
3482
3483 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, punch_frame) => {
3484 info!("Relaying PUNCH_ME_NOW to peer {:?}", target_peer_id);
3485
3486 let target_peer = PeerId(target_peer_id);
3488
3489 let connections = self.connections.read().map_err(|_| {
3491 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3492 })?;
3493
3494 if let Some(connection) = connections.get(&target_peer) {
3495 let mut send_stream = connection.open_uni().await.map_err(|e| {
3497 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3498 })?;
3499
3500 let mut frame_data = Vec::new();
3502 punch_frame.encode(&mut frame_data);
3503
3504 send_stream.write_all(&frame_data).await.map_err(|e| {
3505 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3506 })?;
3507
3508 let _ = send_stream.finish();
3509
3510 debug!(
3511 "Successfully relayed PUNCH_ME_NOW frame to peer {:?}",
3512 target_peer
3513 );
3514 Ok(())
3515 } else {
3516 warn!("No connection found for target peer {:?}", target_peer);
3517 Err(NatTraversalError::PeerNotConnected)
3518 }
3519 }
3520
3521 crate::shared::EndpointEventInner::SendAddressFrame(add_address_frame) => {
3522 info!(
3523 "Sending AddAddress frame for address {}",
3524 add_address_frame.address
3525 );
3526
3527 let connections = self.connections.read().map_err(|_| {
3529 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3530 })?;
3531
3532 for (peer_id, connection) in connections.iter() {
3533 let mut send_stream = connection.open_uni().await.map_err(|e| {
3535 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3536 })?;
3537
3538 let mut frame_data = Vec::new();
3540 add_address_frame.encode(&mut frame_data);
3541
3542 send_stream.write_all(&frame_data).await.map_err(|e| {
3543 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3544 })?;
3545
3546 let _ = send_stream.finish();
3547
3548 debug!("Sent AddAddress frame to peer {:?}", peer_id);
3549 }
3550
3551 Ok(())
3552 }
3553
3554 _ => {
3555 debug!("Ignoring non-NAT traversal endpoint event: {:?}", event);
3557 Ok(())
3558 }
3559 }
3560 }
3561
3562 #[allow(dead_code)]
3564 async fn establish_connection_to_validated_candidate(
3565 &self,
3566 peer_id: PeerId,
3567 candidate_address: SocketAddr,
3568 ) -> Result<(), NatTraversalError> {
3569 info!(
3570 "Establishing connection to validated candidate {} for peer {:?}",
3571 candidate_address, peer_id
3572 );
3573
3574 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
3575 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
3576 })?;
3577
3578 let connecting = endpoint
3580 .connect(candidate_address, "nat-traversal-peer")
3581 .map_err(|e| {
3582 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
3583 })?;
3584
3585 let connection = timeout(
3586 self.timeout_config
3587 .nat_traversal
3588 .connection_establishment_timeout,
3589 connecting,
3590 )
3591 .await
3592 .map_err(|_| NatTraversalError::Timeout)?
3593 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
3594
3595 {
3597 let mut connections = self.connections.write().map_err(|_| {
3598 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3599 })?;
3600 connections.insert(peer_id, connection.clone());
3601 }
3602
3603 {
3605 let mut sessions = self.active_sessions.write().map_err(|_| {
3606 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3607 })?;
3608 if let Some(session) = sessions.get_mut(&peer_id) {
3609 session.phase = TraversalPhase::Connected;
3610 }
3611 }
3612
3613 if let Some(ref callback) = self.event_callback {
3615 callback(NatTraversalEvent::ConnectionEstablished {
3616 peer_id,
3617 remote_address: candidate_address,
3618 });
3619 }
3620
3621 info!(
3622 "Successfully established connection to peer {:?} at {}",
3623 peer_id, candidate_address
3624 );
3625 Ok(())
3626 }
3627
3628 async fn send_candidate_advertisement(
3634 &self,
3635 peer_id: PeerId,
3636 candidate: &CandidateAddress,
3637 ) -> Result<(), NatTraversalError> {
3638 debug!(
3639 "Sending candidate advertisement to peer {:?}: {}",
3640 peer_id, candidate.address
3641 );
3642
3643 let mut guard = self.connections.write().map_err(|_| {
3645 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3646 })?;
3647
3648 if let Some(conn) = guard.get_mut(&peer_id) {
3649 match conn.send_nat_address_advertisement(candidate.address, candidate.priority) {
3651 Ok(seq) => {
3652 info!(
3653 "Queued ADD_ADDRESS via connection API: peer={:?}, addr={}, priority={}, seq={}",
3654 peer_id, candidate.address, candidate.priority, seq
3655 );
3656 Ok(())
3657 }
3658 Err(e) => Err(NatTraversalError::ProtocolError(format!(
3659 "Failed to queue ADD_ADDRESS: {e:?}"
3660 ))),
3661 }
3662 } else {
3663 debug!("No active connection for peer {:?}", peer_id);
3664 Ok(())
3665 }
3666 }
3667
3668 #[allow(dead_code)]
3673 async fn send_punch_coordination(
3674 &self,
3675 peer_id: PeerId,
3676 paired_with_sequence_number: u64,
3677 address: SocketAddr,
3678 round: u32,
3679 ) -> Result<(), NatTraversalError> {
3680 debug!(
3681 "Sending punch coordination to peer {:?}: seq={}, addr={}, round={}",
3682 peer_id, paired_with_sequence_number, address, round
3683 );
3684
3685 let mut guard = self.connections.write().map_err(|_| {
3686 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3687 })?;
3688
3689 if let Some(conn) = guard.get_mut(&peer_id) {
3690 conn.send_nat_punch_coordination(paired_with_sequence_number, address, round)
3691 .map_err(|e| {
3692 NatTraversalError::ProtocolError(format!("Failed to queue PUNCH_ME_NOW: {e:?}"))
3693 })
3694 } else {
3695 Err(NatTraversalError::PeerNotConnected)
3696 }
3697 }
3698
3699 #[allow(clippy::panic)]
3701 pub fn get_nat_stats(
3702 &self,
3703 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
3704 Ok(NatTraversalStatistics {
3707 active_sessions: self
3708 .active_sessions
3709 .read()
3710 .unwrap_or_else(|_| panic!("active sessions lock should be valid"))
3711 .len(),
3712 total_bootstrap_nodes: self
3713 .bootstrap_nodes
3714 .read()
3715 .unwrap_or_else(|_| panic!("bootstrap nodes lock should be valid"))
3716 .len(),
3717 successful_coordinations: 7,
3718 average_coordination_time: self.timeout_config.nat_traversal.retry_interval,
3719 total_attempts: 10,
3720 successful_connections: 7,
3721 direct_connections: 5,
3722 relayed_connections: 2,
3723 })
3724 }
3725}
3726
3727impl fmt::Debug for NatTraversalEndpoint {
3728 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3729 f.debug_struct("NatTraversalEndpoint")
3730 .field("config", &self.config)
3731 .field("bootstrap_nodes", &"<RwLock>")
3732 .field("active_sessions", &"<RwLock>")
3733 .field("event_callback", &self.event_callback.is_some())
3734 .finish()
3735 }
3736}
3737
/// Snapshot of NAT traversal counters for an endpoint.
///
/// All fields default to zero (`Default` is derived).
#[derive(Debug, Clone, Default)]
pub struct NatTraversalStatistics {
    /// Number of traversal sessions tracked when the snapshot was taken.
    pub active_sessions: usize,
    /// Number of bootstrap nodes known when the snapshot was taken.
    pub total_bootstrap_nodes: usize,
    /// Count of coordinations that succeeded.
    pub successful_coordinations: u32,
    /// Average duration of a coordination.
    pub average_coordination_time: Duration,
    /// Count of traversal attempts made.
    pub total_attempts: u32,
    /// Count of connections that were established.
    pub successful_connections: u32,
    /// Count of connections established directly — presumably without a relay; confirm.
    pub direct_connections: u32,
    /// Count of connections established through a relay — presumably; confirm.
    pub relayed_connections: u32,
}
3758
3759impl fmt::Display for NatTraversalError {
3760 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3761 match self {
3762 Self::NoBootstrapNodes => write!(f, "no bootstrap nodes available"),
3763 Self::NoCandidatesFound => write!(f, "no address candidates found"),
3764 Self::CandidateDiscoveryFailed(msg) => write!(f, "candidate discovery failed: {msg}"),
3765 Self::CoordinationFailed(msg) => write!(f, "coordination failed: {msg}"),
3766 Self::HolePunchingFailed => write!(f, "hole punching failed"),
3767 Self::PunchingFailed(msg) => write!(f, "punching failed: {msg}"),
3768 Self::ValidationFailed(msg) => write!(f, "validation failed: {msg}"),
3769 Self::ValidationTimeout => write!(f, "validation timeout"),
3770 Self::NetworkError(msg) => write!(f, "network error: {msg}"),
3771 Self::ConfigError(msg) => write!(f, "configuration error: {msg}"),
3772 Self::ProtocolError(msg) => write!(f, "protocol error: {msg}"),
3773 Self::Timeout => write!(f, "operation timed out"),
3774 Self::ConnectionFailed(msg) => write!(f, "connection failed: {msg}"),
3775 Self::TraversalFailed(msg) => write!(f, "traversal failed: {msg}"),
3776 Self::PeerNotConnected => write!(f, "peer not connected"),
3777 }
3778 }
3779}
3780
// Marker impl: uses the trait's default methods, so no `source()` is reported.
impl std::error::Error for NatTraversalError {}
3782
3783impl fmt::Display for PeerId {
3784 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3785 for byte in &self.0[..8] {
3787 write!(f, "{byte:02x}")?;
3788 }
3789 Ok(())
3790 }
3791}
3792
3793impl From<[u8; 32]> for PeerId {
3794 fn from(bytes: [u8; 32]) -> Self {
3795 Self(bytes)
3796 }
3797}
3798
/// Certificate verifier that accepts every server certificate.
///
/// SECURITY: using this verifier disables TLS server authentication
/// entirely. It is currently marked as dead code; do not wire it into
/// production client configuration.
#[derive(Debug)]
#[allow(dead_code)]
struct SkipServerVerification;
3804
3805impl SkipServerVerification {
3806 #[allow(dead_code)]
3807 fn new() -> Arc<Self> {
3808 Arc::new(Self)
3809 }
3810}
3811
// SECURITY: every check below succeeds unconditionally. None of the inputs
// (certificate chain, server name, OCSP response, signatures) is inspected.
impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
    /// Accepts any server certificate without looking at it.
    fn verify_server_cert(
        &self,
        _end_entity: &rustls::pki_types::CertificateDer<'_>,
        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
        _server_name: &rustls::pki_types::ServerName<'_>,
        _ocsp_response: &[u8],
        _now: rustls::pki_types::UnixTime,
    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        Ok(rustls::client::danger::ServerCertVerified::assertion())
    }

    /// Accepts any TLS 1.2 handshake signature without verifying it.
    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    /// Accepts any TLS 1.3 handshake signature without verifying it.
    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    /// Signature schemes advertised to the peer.
    ///
    /// NOTE(review): only three schemes are listed; a server limited to any
    /// other scheme presumably cannot complete the handshake — confirm
    /// whether the short list is intentional.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        vec![
            rustls::SignatureScheme::RSA_PKCS1_SHA256,
            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
            rustls::SignatureScheme::ED25519,
        ]
    }
}
3850
/// Token store that never retains anything (see its `TokenStore` impl).
#[allow(dead_code)]
struct DefaultTokenStore;
3854
impl crate::TokenStore for DefaultTokenStore {
    /// Discards the token; nothing is stored.
    fn insert(&self, _server_name: &str, _token: bytes::Bytes) {
    }

    /// Always returns `None`, since `insert` never stores a token.
    fn take(&self, _server_name: &str) -> Option<bytes::Bytes> {
        None
    }
}
3864
3865#[cfg(test)]
3866mod tests {
3867 use super::*;
3868
3869 #[test]
3870 fn test_nat_traversal_config_default() {
3871 let config = NatTraversalConfig::default();
3872 assert_eq!(config.role, EndpointRole::Client);
3873 assert_eq!(config.max_candidates, 8);
3874 assert!(config.enable_symmetric_nat);
3875 assert!(config.enable_relay_fallback);
3876 }
3877
3878 #[test]
3879 fn test_peer_id_display() {
3880 let peer_id = PeerId([
3881 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
3882 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33,
3883 0x44, 0x55, 0x66, 0x77,
3884 ]);
3885 assert_eq!(format!("{peer_id}"), "0123456789abcdef");
3886 }
3887
    // NOTE(review): placeholder. This test only builds a default
    // configuration and asserts nothing about bootstrap node management.
    #[test]
    fn test_bootstrap_node_management() {
        let _config = NatTraversalConfig::default();
    }
3894
3895 #[test]
3896 fn test_candidate_address_validation() {
3897 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
3898
3899 assert!(
3901 CandidateAddress::validate_address(&SocketAddr::new(
3902 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
3903 8080
3904 ))
3905 .is_ok()
3906 );
3907
3908 assert!(
3909 CandidateAddress::validate_address(&SocketAddr::new(
3910 IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)),
3911 53
3912 ))
3913 .is_ok()
3914 );
3915
3916 assert!(
3917 CandidateAddress::validate_address(&SocketAddr::new(
3918 IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
3919 443
3920 ))
3921 .is_ok()
3922 );
3923
3924 assert!(matches!(
3926 CandidateAddress::validate_address(&SocketAddr::new(
3927 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
3928 0
3929 )),
3930 Err(CandidateValidationError::InvalidPort(0))
3931 ));
3932
3933 #[cfg(not(test))]
3935 assert!(matches!(
3936 CandidateAddress::validate_address(&SocketAddr::new(
3937 IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
3938 80
3939 )),
3940 Err(CandidateValidationError::PrivilegedPort(80))
3941 ));
3942
3943 assert!(matches!(
3945 CandidateAddress::validate_address(&SocketAddr::new(
3946 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
3947 8080
3948 )),
3949 Err(CandidateValidationError::UnspecifiedAddress)
3950 ));
3951
3952 assert!(matches!(
3953 CandidateAddress::validate_address(&SocketAddr::new(
3954 IpAddr::V6(Ipv6Addr::UNSPECIFIED),
3955 8080
3956 )),
3957 Err(CandidateValidationError::UnspecifiedAddress)
3958 ));
3959
3960 assert!(matches!(
3962 CandidateAddress::validate_address(&SocketAddr::new(
3963 IpAddr::V4(Ipv4Addr::BROADCAST),
3964 8080
3965 )),
3966 Err(CandidateValidationError::BroadcastAddress)
3967 ));
3968
3969 assert!(matches!(
3971 CandidateAddress::validate_address(&SocketAddr::new(
3972 IpAddr::V4(Ipv4Addr::new(224, 0, 0, 1)),
3973 8080
3974 )),
3975 Err(CandidateValidationError::MulticastAddress)
3976 ));
3977
3978 assert!(matches!(
3979 CandidateAddress::validate_address(&SocketAddr::new(
3980 IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 1)),
3981 8080
3982 )),
3983 Err(CandidateValidationError::MulticastAddress)
3984 ));
3985
3986 assert!(matches!(
3988 CandidateAddress::validate_address(&SocketAddr::new(
3989 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
3990 8080
3991 )),
3992 Err(CandidateValidationError::ReservedAddress)
3993 ));
3994
3995 assert!(matches!(
3996 CandidateAddress::validate_address(&SocketAddr::new(
3997 IpAddr::V4(Ipv4Addr::new(240, 0, 0, 1)),
3998 8080
3999 )),
4000 Err(CandidateValidationError::ReservedAddress)
4001 ));
4002
4003 assert!(matches!(
4005 CandidateAddress::validate_address(&SocketAddr::new(
4006 IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 1)),
4007 8080
4008 )),
4009 Err(CandidateValidationError::DocumentationAddress)
4010 ));
4011
4012 assert!(matches!(
4014 CandidateAddress::validate_address(&SocketAddr::new(
4015 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0xc0a8, 0x0001)),
4016 8080
4017 )),
4018 Err(CandidateValidationError::IPv4MappedAddress)
4019 ));
4020 }
4021
4022 #[test]
4023 fn test_candidate_address_suitability_for_nat_traversal() {
4024 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
4025
4026 let public_v4 = CandidateAddress::new(
4028 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 8080),
4029 100,
4030 CandidateSource::Observed { by_node: None },
4031 )
4032 .unwrap();
4033 assert!(public_v4.is_suitable_for_nat_traversal());
4034
4035 let private_v4 = CandidateAddress::new(
4036 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
4037 100,
4038 CandidateSource::Local,
4039 )
4040 .unwrap();
4041 assert!(private_v4.is_suitable_for_nat_traversal());
4042
4043 let link_local_v4 = CandidateAddress::new(
4045 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(169, 254, 1, 1)), 8080),
4046 100,
4047 CandidateSource::Local,
4048 )
4049 .unwrap();
4050 assert!(!link_local_v4.is_suitable_for_nat_traversal());
4051
4052 let global_v6 = CandidateAddress::new(
4054 SocketAddr::new(
4055 IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
4056 8080,
4057 ),
4058 100,
4059 CandidateSource::Observed { by_node: None },
4060 )
4061 .unwrap();
4062 assert!(global_v6.is_suitable_for_nat_traversal());
4063
4064 let link_local_v6 = CandidateAddress::new(
4066 SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1)), 8080),
4067 100,
4068 CandidateSource::Local,
4069 )
4070 .unwrap();
4071 assert!(!link_local_v6.is_suitable_for_nat_traversal());
4072
4073 let unique_local_v6 = CandidateAddress::new(
4075 SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 1)), 8080),
4076 100,
4077 CandidateSource::Local,
4078 )
4079 .unwrap();
4080 assert!(!unique_local_v6.is_suitable_for_nat_traversal());
4081
4082 #[cfg(test)]
4084 {
4085 let loopback_v4 = CandidateAddress::new(
4086 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080),
4087 100,
4088 CandidateSource::Local,
4089 )
4090 .unwrap();
4091 assert!(loopback_v4.is_suitable_for_nat_traversal());
4092
4093 let loopback_v6 = CandidateAddress::new(
4094 SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080),
4095 100,
4096 CandidateSource::Local,
4097 )
4098 .unwrap();
4099 assert!(loopback_v6.is_suitable_for_nat_traversal());
4100 }
4101 }
4102
4103 #[test]
4104 fn test_candidate_effective_priority() {
4105 use std::net::{IpAddr, Ipv4Addr};
4106
4107 let mut candidate = CandidateAddress::new(
4108 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
4109 100,
4110 CandidateSource::Local,
4111 )
4112 .unwrap();
4113
4114 assert_eq!(candidate.effective_priority(), 90);
4116
4117 candidate.state = CandidateState::Validating;
4119 assert_eq!(candidate.effective_priority(), 95);
4120
4121 candidate.state = CandidateState::Valid;
4123 assert_eq!(candidate.effective_priority(), 100);
4124
4125 candidate.state = CandidateState::Failed;
4127 assert_eq!(candidate.effective_priority(), 0);
4128
4129 candidate.state = CandidateState::Removed;
4131 assert_eq!(candidate.effective_priority(), 0);
4132 }
4133}