1use std::{collections::HashMap, fmt, net::SocketAddr, sync::Arc, time::Duration};
8
9use tracing::{debug, error, info, warn};
10
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use tokio::{
14 net::UdpSocket,
15 sync::mpsc,
16 time::{sleep, timeout},
17};
18
19#[cfg(feature = "runtime-tokio")]
20use crate::high_level::TokioRuntime;
21
22use crate::{
23 VarInt,
24 candidate_discovery::{CandidateDiscoveryManager, DiscoveryConfig, DiscoveryEvent},
25 connection::nat_traversal::{CandidateSource, CandidateState, NatTraversalRole},
26};
27
28use crate::{
29 ClientConfig, ConnectionError, EndpointConfig, ServerConfig, TransportConfig,
30 crypto::rustls::QuicClientConfig,
31 crypto::rustls::QuicServerConfig,
32 high_level::{Connection as QuinnConnection, Endpoint as QuinnEndpoint},
33};
34
35use crate::config::validation::{ConfigValidator, ValidationResult};
36
37use crate::crypto::certificate_manager::{CertificateConfig, CertificateManager};
38
/// Endpoint combining a Quinn QUIC endpoint with NAT traversal state: bootstrap
/// nodes, per-peer traversal sessions, candidate discovery and background tasks.
pub struct NatTraversalEndpoint {
    /// Underlying Quinn endpoint; the constructor stores `Some(..)` here.
    quinn_endpoint: Option<QuinnEndpoint>,
    /// Configuration this endpoint was created from.
    config: NatTraversalConfig,
    /// Known bootstrap/coordinator nodes, seeded from `config.bootstrap_nodes`.
    bootstrap_nodes: Arc<std::sync::RwLock<Vec<BootstrapNode>>>,
    /// Traversal sessions keyed by remote peer id.
    active_sessions: Arc<std::sync::RwLock<HashMap<PeerId, NatTraversalSession>>>,
    /// Candidate discovery state. This is a `std::sync::Mutex`: never hold its
    /// guard across an `.await`.
    discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
    /// Optional user callback, invoked synchronously for selected events.
    event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
    /// Shared stop flag handed to the background tasks; session polling exits
    /// once it is set.
    shutdown: Arc<AtomicBool>,
    /// Event channel sender; clones are handed to the background tasks.
    event_tx: Option<mpsc::UnboundedSender<NatTraversalEvent>>,
    /// QUIC connections keyed by peer id, shared with the connection-accept task.
    connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
    /// This node's own peer id, generated at construction.
    local_peer_id: PeerId,
    /// Timeout settings (a copy of `config.timeouts`) used by session polling.
    timeout_config: crate::config::nat_timeouts::TimeoutConfig,
}
66
/// Configuration for a [`NatTraversalEndpoint`].
///
/// The endpoint constructor checks it with [`ConfigValidator::validate`] before use.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NatTraversalConfig {
    /// Role this endpoint plays.
    pub role: EndpointRole,
    /// Bootstrap node addresses. Validation requires at least one for `Client` and
    /// for `Server { can_coordinate: true }`.
    pub bootstrap_nodes: Vec<SocketAddr>,
    /// Candidate limit handed to discovery; validated with `validate_range(.., 1, 256, ..)`.
    pub max_candidates: usize,
    /// Used as the discovery `total_timeout`; validated between 100 ms and 300 s.
    pub coordination_timeout: Duration,
    /// Forwarded to discovery as `enable_symmetric_prediction`.
    pub enable_symmetric_nat: bool,
    /// Relay fallback switch; validation rejects `true` for the `Bootstrap` role.
    pub enable_relay_fallback: bool,
    /// Concurrency limit advertised in the NAT traversal transport parameter for
    /// server/bootstrap roles; validated with `validate_range(.., 1, 16, ..)` and
    /// must not exceed `max_candidates`.
    pub max_concurrent_attempts: usize,
    /// Local bind address; `None` binds `0.0.0.0:0`.
    pub bind_addr: Option<SocketAddr>,
    /// NOTE(review): not read in the surrounding code; presumably selects the
    /// RFC-style NAT traversal mechanism — confirm.
    pub prefer_rfc_nat_traversal: bool,
    /// Timeout settings used by traversal and session polling.
    pub timeouts: crate::config::nat_timeouts::TimeoutConfig,
}
92
/// Role a NAT traversal endpoint plays.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum EndpointRole {
    /// Gets no server config and no accept task; validation requires at least one
    /// bootstrap node.
    Client,
    /// Accepts incoming connections.
    Server {
        /// Mapped to `NatTraversalRole::Server { can_relay }`. When `true`,
        /// validation requires bootstrap nodes.
        can_coordinate: bool,
    },
    /// Bootstrap node. Accepts incoming connections; validation rejects
    /// `enable_relay_fallback` for this role.
    Bootstrap,
}
106
107impl EndpointRole {
108 pub fn name(&self) -> &'static str {
110 match self {
111 Self::Client => "client",
112 Self::Server { .. } => "server",
113 Self::Bootstrap => "bootstrap",
114 }
115 }
116}
117
/// Fixed-size 32-byte peer identifier.
///
/// Used as the key of the endpoint's session and connection maps.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
)]
pub struct PeerId(pub [u8; 32]);
123
/// A known bootstrap/coordinator node and what is tracked about it.
#[derive(Debug, Clone)]
pub struct BootstrapNode {
    /// Network address of the node.
    pub address: SocketAddr,
    /// When the node was last seen; initialised to the creation time.
    pub last_seen: std::time::Instant,
    /// Whether the node may be used for coordination (defaults to `true`).
    pub can_coordinate: bool,
    /// Measured round-trip time, if any. `get_statistics` averages these values.
    pub rtt: Option<Duration>,
    /// Coordination counter; `get_statistics` sums it into `successful_coordinations`.
    pub coordination_count: u32,
}
138
139impl BootstrapNode {
140 pub fn new(address: SocketAddr) -> Self {
142 Self {
143 address,
144 last_seen: std::time::Instant::now(),
145 can_coordinate: true,
146 rtt: None,
147 coordination_count: 0,
148 }
149 }
150}
151
/// A pairing of a local and a remote address candidate.
///
/// NOTE(review): not constructed in the surrounding code; the priority scheme is
/// presumably derived from the two candidates' priorities — confirm.
#[derive(Debug, Clone)]
pub struct CandidatePair {
    /// Local side of the pair.
    pub local_candidate: CandidateAddress,
    /// Remote side of the pair.
    pub remote_candidate: CandidateAddress,
    /// Pair priority (semantics defined by whoever builds pairs — confirm).
    pub priority: u64,
    /// Check progress for this pair.
    pub state: CandidatePairState,
}
164
/// Progress of a candidate pair check.
///
/// NOTE(review): not driven in the surrounding code; the meanings below follow the
/// variant names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidatePairState {
    /// Not yet attempted.
    Waiting,
    /// Check currently running.
    InProgress,
    /// Check succeeded.
    Succeeded,
    /// Check failed.
    Failed,
    /// Check abandoned before completion.
    Cancelled,
}
179
/// Internal per-peer traversal session stored in `active_sessions`.
#[derive(Debug)]
struct NatTraversalSession {
    /// Remote peer this session targets.
    peer_id: PeerId,
    /// Coordinator address passed to `initiate_nat_traversal`.
    coordinator: SocketAddr,
    /// Attempt number; starts at 1 and is incremented when session polling retries.
    attempt: u32,
    /// When the session was created.
    started_at: std::time::Instant,
    /// Current traversal phase; starts at `TraversalPhase::Discovery`.
    phase: TraversalPhase,
    /// Candidates gathered for this peer; starts empty.
    candidates: Vec<CandidateAddress>,
    /// Connection-level state machine and metrics.
    session_state: SessionState,
}
198
/// Connection-level state of a traversal session.
#[derive(Debug, Clone)]
pub struct SessionState {
    /// Current connection state.
    pub state: ConnectionState,
    /// When `state` last changed; session timeouts are measured from here.
    pub last_transition: std::time::Instant,
    /// Established Quinn connection, if any. A `Connecting` session that has one
    /// is promoted to `Connected` by `poll_sessions`.
    pub connection: Option<QuinnConnection>,
    /// (address, start time) of in-flight attempts.
    /// NOTE(review): not populated in the surrounding code — confirm.
    pub active_attempts: Vec<(SocketAddr, std::time::Instant)>,
    /// Observed connection metrics.
    pub metrics: ConnectionMetrics,
}
213
/// Connection state of a traversal session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Not connecting. Session polling retries (→ `Connecting`) after
    /// `server_reflexive_cache_ttl`.
    Idle,
    /// Initial state set by `initiate_nat_traversal`; closes after
    /// `connection_establishment_timeout`.
    Connecting,
    /// A connection is established.
    Connected,
    /// Path migration in progress; resolved by the session polling timeouts.
    Migrating,
    /// Closed; session polling removes the session after `interface_cache_ttl`.
    Closed,
}
228
/// Metrics tracked for a session's connection.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// Latest path RTT reported by the Quinn connection stats.
    pub rtt: Option<Duration>,
    /// `lost_packets / max(sent_packets, 1)` from the Quinn path stats.
    pub loss_rate: f64,
    /// Bytes sent. NOTE(review): not updated in the surrounding code — confirm.
    pub bytes_sent: u64,
    /// Bytes received. NOTE(review): not updated in the surrounding code — confirm.
    pub bytes_received: u64,
    /// Set to the poll time whenever `poll_sessions` sees the session `Connected`.
    pub last_activity: Option<std::time::Instant>,
}
243
/// A state transition reported by `poll_sessions`.
#[derive(Debug, Clone)]
pub struct SessionStateUpdate {
    /// Peer whose session changed.
    pub peer_id: PeerId,
    /// State before the transition.
    pub old_state: ConnectionState,
    /// State after the transition.
    pub new_state: ConnectionState,
    /// Why the transition happened.
    pub reason: StateChangeReason,
}
256
/// Reason attached to a [`SessionStateUpdate`].
///
/// `poll_sessions` produces `Timeout`, `ConnectionEstablished`, `MigrationComplete`
/// and `MigrationFailed`. The other variants are not produced in the surrounding
/// code; their notes follow the variant names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateChangeReason {
    /// Connection establishment timed out.
    Timeout,
    /// A connection became available.
    ConnectionEstablished,
    /// The connection was closed.
    ConnectionClosed,
    /// Migration finished with a connection still present.
    MigrationComplete,
    /// Migration finished without a connection.
    MigrationFailed,
    /// A network error occurred.
    NetworkError,
    /// Closed at the user's request.
    UserClosed,
}
275
/// Phase of a NAT traversal session.
///
/// New sessions start in `Discovery`. The later phases are not advanced in the
/// surrounding code; their notes follow the variant names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversalPhase {
    /// Gathering address candidates.
    Discovery,
    /// Exchanging candidates via a coordinator.
    Coordination,
    /// Synchronizing the hole-punching round.
    Synchronization,
    /// Sending hole-punching packets.
    Punching,
    /// Validating punched paths.
    Validation,
    /// Traversal completed with a connection.
    Connected,
    /// Traversal failed.
    Failed,
}
294
/// Maintenance action chosen by the session polling task for one session.
#[derive(Debug, Clone, Copy)]
enum SessionUpdate {
    /// `Connecting` for longer than `connection_establishment_timeout` → `Closed`.
    Timeout,
    /// `Connected`, but the connection reports a close reason → `Closed`, connection dropped.
    Disconnected,
    /// `Connected` and live → refresh RTT / loss-rate metrics.
    UpdateMetrics,
    /// `Connected` without a connection object → `Closed`.
    InvalidState,
    /// `Idle` for longer than `server_reflexive_cache_ttl` → `Connecting`, attempt + 1.
    Retry,
    /// `Migrating` for longer than `probe_timeout` → `Closed`.
    MigrationTimeout,
    /// `Closed` for longer than `interface_cache_ttl` → session removed.
    Remove,
}
313
/// An address candidate for NAT traversal.
///
/// Build it with `CandidateAddress::new`, which validates the address.
#[derive(Debug, Clone)]
pub struct CandidateAddress {
    /// Candidate socket address.
    pub address: SocketAddr,
    /// Base priority; adjusted by `effective_priority` according to `state`.
    pub priority: u32,
    /// Where the candidate came from.
    pub source: CandidateSource,
    /// Validation state; starts at `CandidateState::New`.
    pub state: CandidateState,
}
326
327impl CandidateAddress {
328 pub fn new(
330 address: SocketAddr,
331 priority: u32,
332 source: CandidateSource,
333 ) -> Result<Self, CandidateValidationError> {
334 Self::validate_address(&address)?;
335 Ok(Self {
336 address,
337 priority,
338 source,
339 state: CandidateState::New,
340 })
341 }
342
343 pub fn validate_address(addr: &SocketAddr) -> Result<(), CandidateValidationError> {
345 if addr.port() == 0 {
347 return Err(CandidateValidationError::InvalidPort(0));
348 }
349
350 #[cfg(not(test))]
352 if addr.port() < 1024 {
353 return Err(CandidateValidationError::PrivilegedPort(addr.port()));
354 }
355
356 match addr.ip() {
357 std::net::IpAddr::V4(ipv4) => {
358 if ipv4.is_unspecified() {
360 return Err(CandidateValidationError::UnspecifiedAddress);
361 }
362 if ipv4.is_broadcast() {
363 return Err(CandidateValidationError::BroadcastAddress);
364 }
365 if ipv4.is_multicast() {
366 return Err(CandidateValidationError::MulticastAddress);
367 }
368 if ipv4.octets()[0] == 0 {
370 return Err(CandidateValidationError::ReservedAddress);
371 }
372 if ipv4.octets()[0] >= 240 {
374 return Err(CandidateValidationError::ReservedAddress);
375 }
376 }
377 std::net::IpAddr::V6(ipv6) => {
378 if ipv6.is_unspecified() {
380 return Err(CandidateValidationError::UnspecifiedAddress);
381 }
382 if ipv6.is_multicast() {
383 return Err(CandidateValidationError::MulticastAddress);
384 }
385 let segments = ipv6.segments();
387 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
388 return Err(CandidateValidationError::DocumentationAddress);
389 }
390 if ipv6.to_ipv4_mapped().is_some() {
392 return Err(CandidateValidationError::IPv4MappedAddress);
393 }
394 }
395 }
396
397 Ok(())
398 }
399
400 pub fn is_suitable_for_nat_traversal(&self) -> bool {
402 match self.address.ip() {
403 std::net::IpAddr::V4(ipv4) => {
404 #[cfg(test)]
409 if ipv4.is_loopback() {
410 return true;
411 }
412 !ipv4.is_loopback()
413 && !ipv4.is_link_local()
414 && !ipv4.is_multicast()
415 && !ipv4.is_broadcast()
416 }
417 std::net::IpAddr::V6(ipv6) => {
418 #[cfg(test)]
424 if ipv6.is_loopback() {
425 return true;
426 }
427 let segments = ipv6.segments();
428 let is_link_local = (segments[0] & 0xffc0) == 0xfe80;
429 let is_unique_local = (segments[0] & 0xfe00) == 0xfc00;
430
431 !ipv6.is_loopback() && !is_link_local && !is_unique_local && !ipv6.is_multicast()
432 }
433 }
434 }
435
436 pub fn effective_priority(&self) -> u32 {
438 match self.state {
439 CandidateState::Valid => self.priority,
440 CandidateState::New => self.priority.saturating_sub(10),
441 CandidateState::Validating => self.priority.saturating_sub(5),
442 CandidateState::Failed => 0,
443 CandidateState::Removed => 0,
444 }
445 }
446}
447
/// Reasons `CandidateAddress::validate_address` rejects an address.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum CandidateValidationError {
    /// Port is 0.
    #[error("invalid port number: {0}")]
    InvalidPort(u16),
    /// Port is below 1024 (only checked in non-test builds).
    #[error("privileged port not allowed: {0}")]
    PrivilegedPort(u16),
    /// `0.0.0.0` or `::`.
    #[error("unspecified address not allowed")]
    UnspecifiedAddress,
    /// IPv4 limited broadcast address.
    #[error("broadcast address not allowed")]
    BroadcastAddress,
    /// IPv4 or IPv6 multicast address.
    #[error("multicast address not allowed")]
    MulticastAddress,
    /// IPv4 address whose first octet is 0 or >= 240.
    #[error("reserved address not allowed")]
    ReservedAddress,
    /// IPv6 documentation prefix `2001:db8::/32`.
    #[error("documentation address not allowed")]
    DocumentationAddress,
    /// IPv4-mapped IPv6 address (`::ffff:a.b.c.d`).
    #[error("IPv4-mapped IPv6 address not allowed")]
    IPv4MappedAddress,
}
476
/// Events reported by the NAT traversal endpoint.
///
/// In the surrounding code:
/// - `initiate_nat_traversal` passes `CoordinationRequested` to the event callback;
/// - `poll_sessions` passes `SessionStateChanged` to the event callback;
/// - `inject_observed_address` sends `CandidateValidated` on the event channel.
///
/// The other variants are produced elsewhere; their notes follow the variant and
/// field names and should be confirmed against their producers.
#[derive(Debug, Clone)]
pub enum NatTraversalEvent {
    /// A new address candidate was discovered.
    CandidateDiscovered {
        peer_id: PeerId,
        candidate: CandidateAddress,
    },
    /// Traversal to `peer_id` was requested via `coordinator`.
    CoordinationRequested {
        peer_id: PeerId,
        coordinator: SocketAddr,
    },
    /// Coordination round `round_id` was synchronized with `peer_id`.
    CoordinationSynchronized { peer_id: PeerId, round_id: VarInt },
    /// Hole punching towards `targets` started.
    HolePunchingStarted {
        peer_id: PeerId,
        targets: Vec<SocketAddr>,
    },
    /// A path to `address` was validated with round-trip time `rtt`.
    PathValidated {
        peer_id: PeerId,
        address: SocketAddr,
        rtt: Duration,
    },
    /// `candidate_address` was validated. `inject_observed_address` emits it with the
    /// local peer id for an accepted observed address.
    CandidateValidated {
        peer_id: PeerId,
        candidate_address: SocketAddr,
    },
    /// Traversal succeeded on `final_address` after `total_time`.
    TraversalSucceeded {
        peer_id: PeerId,
        final_address: SocketAddr,
        total_time: Duration,
    },
    /// A connection to `remote_address` was established.
    ConnectionEstablished {
        peer_id: PeerId,
        remote_address: SocketAddr,
    },
    /// Traversal failed with `error`; `fallback_available` says whether a fallback exists.
    TraversalFailed {
        peer_id: PeerId,
        error: NatTraversalError,
        fallback_available: bool,
    },
    /// The connection to `peer_id` was lost, with a textual `reason`.
    ConnectionLost { peer_id: PeerId, reason: String },
    /// A session moved from `from_phase` to `to_phase`.
    PhaseTransition {
        peer_id: PeerId,
        from_phase: TraversalPhase,
        to_phase: TraversalPhase,
    },
    /// A session's connection state changed to `new_state`.
    SessionStateChanged {
        peer_id: PeerId,
        new_state: ConnectionState,
    },
}
543
/// Errors returned by NAT traversal operations.
///
/// In the surrounding code:
/// - `ConfigError` comes from validation and endpoint/certificate setup;
/// - `ProtocolError` comes from poisoned locks;
/// - `CandidateDiscoveryFailed` comes from the discovery manager;
/// - `NetworkError` comes from UDP socket setup.
///
/// Notes on the other variants follow their names — confirm against their producers.
#[derive(Debug, Clone)]
pub enum NatTraversalError {
    /// No bootstrap nodes are available.
    NoBootstrapNodes,
    /// Discovery produced no candidates.
    NoCandidatesFound,
    /// Candidate discovery failed (message from the discovery manager).
    CandidateDiscoveryFailed(String),
    /// Coordination failed.
    CoordinationFailed(String),
    /// Hole punching failed.
    HolePunchingFailed,
    /// Hole punching failed, with detail.
    PunchingFailed(String),
    /// Path validation failed.
    ValidationFailed(String),
    /// Path validation timed out.
    ValidationTimeout,
    /// Network/socket error.
    NetworkError(String),
    /// Invalid configuration or endpoint/crypto setup failure.
    ConfigError(String),
    /// Internal protocol/state error (including poisoned locks).
    ProtocolError(String),
    /// Operation timed out.
    Timeout,
    /// Connection attempt failed.
    ConnectionFailed(String),
    /// Traversal failed.
    TraversalFailed(String),
    /// The peer is not connected.
    PeerNotConnected,
}
578
579impl Default for NatTraversalConfig {
580 fn default() -> Self {
581 Self {
582 role: EndpointRole::Client,
583 bootstrap_nodes: Vec::new(),
584 max_candidates: 8,
585 coordination_timeout: Duration::from_secs(10),
586 enable_symmetric_nat: true,
587 enable_relay_fallback: true,
588 max_concurrent_attempts: 3,
589 bind_addr: None,
590 prefer_rfc_nat_traversal: true, timeouts: crate::config::nat_timeouts::TimeoutConfig::default(),
592 }
593 }
594}
595
596impl ConfigValidator for NatTraversalConfig {
597 fn validate(&self) -> ValidationResult<()> {
598 use crate::config::validation::*;
599
600 match self.role {
602 EndpointRole::Client => {
603 if self.bootstrap_nodes.is_empty() {
604 return Err(ConfigValidationError::InvalidRole(
605 "Client endpoints require at least one bootstrap node".to_string(),
606 ));
607 }
608 }
609 EndpointRole::Server { can_coordinate } => {
610 if can_coordinate && self.bootstrap_nodes.is_empty() {
611 return Err(ConfigValidationError::InvalidRole(
612 "Server endpoints with coordination capability require bootstrap nodes"
613 .to_string(),
614 ));
615 }
616 }
617 EndpointRole::Bootstrap => {
618 }
620 }
621
622 if !self.bootstrap_nodes.is_empty() {
624 validate_bootstrap_nodes(&self.bootstrap_nodes)?;
625 }
626
627 validate_range(self.max_candidates, 1, 256, "max_candidates")?;
629
630 validate_duration(
632 self.coordination_timeout,
633 Duration::from_millis(100),
634 Duration::from_secs(300),
635 "coordination_timeout",
636 )?;
637
638 validate_range(
640 self.max_concurrent_attempts,
641 1,
642 16,
643 "max_concurrent_attempts",
644 )?;
645
646 if self.max_concurrent_attempts > self.max_candidates {
648 return Err(ConfigValidationError::IncompatibleConfiguration(
649 "max_concurrent_attempts cannot exceed max_candidates".to_string(),
650 ));
651 }
652
653 if self.role == EndpointRole::Bootstrap && self.enable_relay_fallback {
654 return Err(ConfigValidationError::IncompatibleConfiguration(
655 "Bootstrap nodes should not enable relay fallback".to_string(),
656 ));
657 }
658
659 Ok(())
660 }
661}
662
663impl NatTraversalEndpoint {
664 pub async fn new(
666 config: NatTraversalConfig,
667 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
668 ) -> Result<Self, NatTraversalError> {
669 Self::new_impl(config, event_callback).await
670 }
671
672 async fn new_impl(
674 config: NatTraversalConfig,
675 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
676 ) -> Result<Self, NatTraversalError> {
677 Self::new_common(config, event_callback).await
678 }
679
680 async fn new_common(
682 config: NatTraversalConfig,
683 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
684 ) -> Result<Self, NatTraversalError> {
685 Self::new_shared_logic(config, event_callback).await
687 }
688
689 async fn new_shared_logic(
691 config: NatTraversalConfig,
692 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
693 ) -> Result<Self, NatTraversalError> {
694 {
697 config
698 .validate()
699 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
700 }
701
702 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
706 config
707 .bootstrap_nodes
708 .iter()
709 .map(|&address| BootstrapNode {
710 address,
711 last_seen: std::time::Instant::now(),
712 can_coordinate: true, rtt: None,
714 coordination_count: 0,
715 })
716 .collect(),
717 ));
718
719 let discovery_config = DiscoveryConfig {
721 total_timeout: config.coordination_timeout,
722 max_candidates: config.max_candidates,
723 enable_symmetric_prediction: config.enable_symmetric_nat,
724 bound_address: config.bind_addr, ..DiscoveryConfig::default()
726 };
727
728 let nat_traversal_role = match config.role {
729 EndpointRole::Client => NatTraversalRole::Client,
730 EndpointRole::Server { can_coordinate } => NatTraversalRole::Server {
731 can_relay: can_coordinate,
732 },
733 EndpointRole::Bootstrap => NatTraversalRole::Bootstrap,
734 };
735
736 let discovery_manager = Arc::new(std::sync::Mutex::new(CandidateDiscoveryManager::new(
737 discovery_config,
738 )));
739
740 let (quinn_endpoint, event_tx, local_addr) =
743 Self::create_quinn_endpoint(&config, nat_traversal_role).await?;
744
745 {
747 let mut discovery = discovery_manager.lock().map_err(|_| {
748 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
749 })?;
750 discovery.set_bound_address(local_addr);
751 info!(
752 "Updated discovery manager with bound address: {}",
753 local_addr
754 );
755 }
756
757 let endpoint = Self {
758 quinn_endpoint: Some(quinn_endpoint.clone()),
759 config: config.clone(),
760 bootstrap_nodes,
761 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
762 discovery_manager,
763 event_callback,
764 shutdown: Arc::new(AtomicBool::new(false)),
765 event_tx: Some(event_tx.clone()),
766 connections: Arc::new(std::sync::RwLock::new(HashMap::new())),
767 local_peer_id: Self::generate_local_peer_id(),
768 timeout_config: config.timeouts.clone(),
769 };
770
771 if matches!(
773 config.role,
774 EndpointRole::Bootstrap | EndpointRole::Server { .. }
775 ) {
776 let endpoint_clone = quinn_endpoint.clone();
777 let shutdown_clone = endpoint.shutdown.clone();
778 let event_tx_clone = event_tx.clone();
779 let connections_clone = endpoint.connections.clone();
780
781 tokio::spawn(async move {
782 Self::accept_connections(
783 endpoint_clone,
784 shutdown_clone,
785 event_tx_clone,
786 connections_clone,
787 )
788 .await;
789 });
790
791 info!("Started accepting connections for {:?} role", config.role);
792 }
793
794 let discovery_manager_clone = endpoint.discovery_manager.clone();
796 let shutdown_clone = endpoint.shutdown.clone();
797 let event_tx_clone = event_tx;
798
799 tokio::spawn(async move {
800 Self::poll_discovery(discovery_manager_clone, shutdown_clone, event_tx_clone).await;
801 });
802
803 info!("Started discovery polling task");
804
805 {
807 let mut discovery = endpoint.discovery_manager.lock().map_err(|_| {
808 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
809 })?;
810
811 let local_peer_id = endpoint.local_peer_id;
813 let bootstrap_nodes = {
814 let nodes = endpoint.bootstrap_nodes.read().map_err(|_| {
815 NatTraversalError::ProtocolError("Bootstrap nodes lock poisoned".to_string())
816 })?;
817 nodes.clone()
818 };
819
820 discovery
821 .start_discovery(local_peer_id, bootstrap_nodes)
822 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
823
824 info!(
825 "Started local candidate discovery for peer {:?}",
826 local_peer_id
827 );
828 }
829
830 Ok(endpoint)
831 }
832
833 pub fn get_quinn_endpoint(&self) -> Option<&crate::high_level::Endpoint> {
835 self.quinn_endpoint.as_ref()
836 }
837
838 pub fn get_event_callback(&self) -> Option<&Box<dyn Fn(NatTraversalEvent) + Send + Sync>> {
840 self.event_callback.as_ref()
841 }
842
843 pub fn initiate_nat_traversal(
845 &self,
846 peer_id: PeerId,
847 coordinator: SocketAddr,
848 ) -> Result<(), NatTraversalError> {
849 info!(
850 "Starting NAT traversal to peer {:?} via coordinator {}",
851 peer_id, coordinator
852 );
853
854 let session = NatTraversalSession {
856 peer_id,
857 coordinator,
858 attempt: 1,
859 started_at: std::time::Instant::now(),
860 phase: TraversalPhase::Discovery,
861 candidates: Vec::new(),
862 session_state: SessionState {
863 state: ConnectionState::Connecting,
864 last_transition: std::time::Instant::now(),
865
866 connection: None,
867 active_attempts: Vec::new(),
868 metrics: ConnectionMetrics::default(),
869 },
870 };
871
872 {
874 let mut sessions = self
875 .active_sessions
876 .write()
877 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
878 sessions.insert(peer_id, session);
879 }
880
881 let bootstrap_nodes_vec = {
883 let bootstrap_nodes = self
884 .bootstrap_nodes
885 .read()
886 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
887 bootstrap_nodes.clone()
888 };
889
890 {
891 let mut discovery = self.discovery_manager.lock().map_err(|_| {
892 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
893 })?;
894
895 discovery
896 .start_discovery(peer_id, bootstrap_nodes_vec)
897 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
898 }
899
900 if let Some(ref callback) = self.event_callback {
902 callback(NatTraversalEvent::CoordinationRequested {
903 peer_id,
904 coordinator,
905 });
906 }
907
908 Ok(())
910 }
911
912 pub fn poll_sessions(&self) -> Result<Vec<SessionStateUpdate>, NatTraversalError> {
914 let mut updates = Vec::new();
915 let now = std::time::Instant::now();
916
917 let mut sessions = self
918 .active_sessions
919 .write()
920 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
921
922 for (peer_id, session) in sessions.iter_mut() {
923 let mut state_changed = false;
924
925 match session.session_state.state {
926 ConnectionState::Connecting => {
927 let elapsed = now.duration_since(session.session_state.last_transition);
929 if elapsed
930 > self
931 .timeout_config
932 .nat_traversal
933 .connection_establishment_timeout
934 {
935 session.session_state.state = ConnectionState::Closed;
936 session.session_state.last_transition = now;
937 state_changed = true;
938
939 updates.push(SessionStateUpdate {
940 peer_id: *peer_id,
941 old_state: ConnectionState::Connecting,
942 new_state: ConnectionState::Closed,
943 reason: StateChangeReason::Timeout,
944 });
945 }
946
947 if let Some(ref _connection) = session.session_state.connection {
950 session.session_state.state = ConnectionState::Connected;
951 session.session_state.last_transition = now;
952 state_changed = true;
953
954 updates.push(SessionStateUpdate {
955 peer_id: *peer_id,
956 old_state: ConnectionState::Connecting,
957 new_state: ConnectionState::Connected,
958 reason: StateChangeReason::ConnectionEstablished,
959 });
960 }
961 }
962 ConnectionState::Connected => {
963 {
966 }
969
970 session.session_state.metrics.last_activity = Some(now);
972 }
973 ConnectionState::Migrating => {
974 let elapsed = now.duration_since(session.session_state.last_transition);
976 if elapsed > Duration::from_secs(10) {
977 if session.session_state.connection.is_some() {
980 session.session_state.state = ConnectionState::Connected;
981 state_changed = true;
982
983 updates.push(SessionStateUpdate {
984 peer_id: *peer_id,
985 old_state: ConnectionState::Migrating,
986 new_state: ConnectionState::Connected,
987 reason: StateChangeReason::MigrationComplete,
988 });
989 } else {
990 session.session_state.state = ConnectionState::Closed;
991 state_changed = true;
992
993 updates.push(SessionStateUpdate {
994 peer_id: *peer_id,
995 old_state: ConnectionState::Migrating,
996 new_state: ConnectionState::Closed,
997 reason: StateChangeReason::MigrationFailed,
998 });
999 }
1000
1001 session.session_state.last_transition = now;
1002 }
1003 }
1004 _ => {}
1005 }
1006
1007 if state_changed {
1009 if let Some(ref callback) = self.event_callback {
1010 callback(NatTraversalEvent::SessionStateChanged {
1011 peer_id: *peer_id,
1012 new_state: session.session_state.state,
1013 });
1014 }
1015 }
1016 }
1017
1018 Ok(updates)
1019 }
1020
1021 pub fn start_session_polling(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
1023 let sessions = self.active_sessions.clone();
1024 let shutdown = self.shutdown.clone();
1025 let timeout_config = self.timeout_config.clone();
1026
1027 tokio::spawn(async move {
1028 let mut ticker = tokio::time::interval(interval);
1029
1030 loop {
1031 ticker.tick().await;
1032
1033 if shutdown.load(Ordering::Relaxed) {
1034 break;
1035 }
1036
1037 let sessions_to_update = {
1039 match sessions.read() {
1040 Ok(sessions_guard) => {
1041 sessions_guard
1042 .iter()
1043 .filter_map(|(peer_id, session)| {
1044 let now = std::time::Instant::now();
1045 let elapsed =
1046 now.duration_since(session.session_state.last_transition);
1047
1048 match session.session_state.state {
1049 ConnectionState::Connecting => {
1050 if elapsed
1052 > timeout_config
1053 .nat_traversal
1054 .connection_establishment_timeout
1055 {
1056 Some((*peer_id, SessionUpdate::Timeout))
1057 } else {
1058 None
1059 }
1060 }
1061 ConnectionState::Connected => {
1062 if let Some(ref conn) = session.session_state.connection
1064 {
1065 if conn.close_reason().is_some() {
1066 Some((*peer_id, SessionUpdate::Disconnected))
1067 } else {
1068 Some((*peer_id, SessionUpdate::UpdateMetrics))
1070 }
1071 } else {
1072 Some((*peer_id, SessionUpdate::InvalidState))
1073 }
1074 }
1075 ConnectionState::Idle => {
1076 if elapsed
1078 > timeout_config
1079 .discovery
1080 .server_reflexive_cache_ttl
1081 {
1082 Some((*peer_id, SessionUpdate::Retry))
1083 } else {
1084 None
1085 }
1086 }
1087 ConnectionState::Migrating => {
1088 if elapsed > timeout_config.nat_traversal.probe_timeout
1090 {
1091 Some((*peer_id, SessionUpdate::MigrationTimeout))
1092 } else {
1093 None
1094 }
1095 }
1096 ConnectionState::Closed => {
1097 if elapsed
1099 > timeout_config.discovery.interface_cache_ttl
1100 {
1101 Some((*peer_id, SessionUpdate::Remove))
1102 } else {
1103 None
1104 }
1105 }
1106 }
1107 })
1108 .collect::<Vec<_>>()
1109 }
1110 _ => {
1111 vec![]
1112 }
1113 }
1114 };
1115
1116 if !sessions_to_update.is_empty() {
1118 if let Ok(mut sessions_guard) = sessions.write() {
1119 for (peer_id, update) in sessions_to_update {
1120 match update {
1121 SessionUpdate::Timeout => {
1122 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1123 session.session_state.state = ConnectionState::Closed;
1124 session.session_state.last_transition =
1125 std::time::Instant::now();
1126 tracing::warn!("Connection to {:?} timed out", peer_id);
1127 }
1128 }
1129 SessionUpdate::Disconnected => {
1130 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1131 session.session_state.state = ConnectionState::Closed;
1132 session.session_state.last_transition =
1133 std::time::Instant::now();
1134 session.session_state.connection = None;
1135 tracing::info!("Connection to {:?} closed", peer_id);
1136 }
1137 }
1138 SessionUpdate::UpdateMetrics => {
1139 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1140 if let Some(ref conn) = session.session_state.connection {
1141 let stats = conn.stats();
1143 session.session_state.metrics.rtt =
1144 Some(stats.path.rtt);
1145 session.session_state.metrics.loss_rate =
1146 stats.path.lost_packets as f64
1147 / stats.path.sent_packets.max(1) as f64;
1148 }
1149 }
1150 }
1151 SessionUpdate::InvalidState => {
1152 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1153 session.session_state.state = ConnectionState::Closed;
1154 session.session_state.last_transition =
1155 std::time::Instant::now();
1156 tracing::error!("Session {:?} in invalid state", peer_id);
1157 }
1158 }
1159 SessionUpdate::Retry => {
1160 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1161 session.session_state.state = ConnectionState::Connecting;
1162 session.session_state.last_transition =
1163 std::time::Instant::now();
1164 session.attempt += 1;
1165 tracing::info!(
1166 "Retrying connection to {:?} (attempt {})",
1167 peer_id,
1168 session.attempt
1169 );
1170 }
1171 }
1172 SessionUpdate::MigrationTimeout => {
1173 if let Some(session) = sessions_guard.get_mut(&peer_id) {
1174 session.session_state.state = ConnectionState::Closed;
1175 session.session_state.last_transition =
1176 std::time::Instant::now();
1177 tracing::warn!("Migration timeout for {:?}", peer_id);
1178 }
1179 }
1180 SessionUpdate::Remove => {
1181 sessions_guard.remove(&peer_id);
1182 tracing::debug!("Removed old session for {:?}", peer_id);
1183 }
1184 }
1185 }
1186 }
1187 }
1188 }
1189 })
1190 }
1191
1192 pub fn inject_observed_address(
1195 &self,
1196 observed_address: SocketAddr,
1197 _from_peer: PeerId,
1198 ) -> Result<(), NatTraversalError> {
1199 info!("Injecting observed address {}", observed_address);
1200
1201 let mut discovery = self.discovery_manager.lock().map_err(|_| {
1203 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
1204 })?;
1205
1206 let our_peer_id = self.local_peer_id;
1208
1209 match discovery.accept_quic_discovered_address(our_peer_id, observed_address) {
1211 Ok(()) => {
1212 info!(
1213 "Successfully accepted observed address: {}",
1214 observed_address
1215 );
1216
1217 if let Some(ref event_tx) = self.event_tx {
1219 let _ = event_tx.send(NatTraversalEvent::CandidateValidated {
1220 peer_id: our_peer_id,
1221 candidate_address: observed_address,
1222 });
1223 }
1224
1225 Ok(())
1226 }
1227 Err(e) => {
1228 warn!(
1229 "Failed to accept observed address {}: {}",
1230 observed_address, e
1231 );
1232 Err(NatTraversalError::CandidateDiscoveryFailed(e.to_string()))
1233 }
1234 }
1235 }
1236
1237 pub fn get_statistics(&self) -> Result<NatTraversalStatistics, NatTraversalError> {
1239 let sessions = self
1240 .active_sessions
1241 .read()
1242 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1243 let bootstrap_nodes = self
1244 .bootstrap_nodes
1245 .read()
1246 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1247
1248 let avg_coordination_time = {
1250 let rtts: Vec<Duration> = bootstrap_nodes.iter().filter_map(|b| b.rtt).collect();
1251
1252 if rtts.is_empty() {
1253 Duration::from_millis(500) } else {
1255 let total_millis: u64 = rtts.iter().map(|d| d.as_millis() as u64).sum();
1256 Duration::from_millis(total_millis / rtts.len() as u64 * 2) }
1258 };
1259
1260 Ok(NatTraversalStatistics {
1261 active_sessions: sessions.len(),
1262 total_bootstrap_nodes: bootstrap_nodes.len(),
1263 successful_coordinations: bootstrap_nodes.iter().map(|b| b.coordination_count).sum(),
1264 average_coordination_time: avg_coordination_time,
1265 total_attempts: 0,
1266 successful_connections: 0,
1267 direct_connections: 0,
1268 relayed_connections: 0,
1269 })
1270 }
1271
1272 pub fn add_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1274 let mut bootstrap_nodes = self
1275 .bootstrap_nodes
1276 .write()
1277 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1278
1279 if !bootstrap_nodes.iter().any(|b| b.address == address) {
1281 bootstrap_nodes.push(BootstrapNode {
1282 address,
1283 last_seen: std::time::Instant::now(),
1284 can_coordinate: true,
1285 rtt: None,
1286 coordination_count: 0,
1287 });
1288 info!("Added bootstrap node: {}", address);
1289 }
1290 Ok(())
1291 }
1292
1293 pub fn remove_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
1295 let mut bootstrap_nodes = self
1296 .bootstrap_nodes
1297 .write()
1298 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1299 bootstrap_nodes.retain(|b| b.address != address);
1300 info!("Removed bootstrap node: {}", address);
1301 Ok(())
1302 }
1303
1304 async fn create_quinn_endpoint(
1308 config: &NatTraversalConfig,
1309 _nat_role: NatTraversalRole,
1310 ) -> Result<
1311 (
1312 QuinnEndpoint,
1313 mpsc::UnboundedSender<NatTraversalEvent>,
1314 SocketAddr,
1315 ),
1316 NatTraversalError,
1317 > {
1318 use std::sync::Arc;
1319
1320 let server_config = match config.role {
1322 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1323 let cert_config = CertificateConfig {
1325 common_name: format!("ant-quic-{}", config.role.name()),
1326 subject_alt_names: vec!["localhost".to_string(), "ant-quic-node".to_string()],
1327 self_signed: true, ..CertificateConfig::default()
1329 };
1330
1331 let cert_manager = CertificateManager::new(cert_config).map_err(|e| {
1332 NatTraversalError::ConfigError(format!(
1333 "Certificate manager creation failed: {e}"
1334 ))
1335 })?;
1336
1337 let cert_bundle = cert_manager.generate_certificate().map_err(|e| {
1338 NatTraversalError::ConfigError(format!("Certificate generation failed: {e}"))
1339 })?;
1340
1341 let rustls_config =
1342 cert_manager
1343 .create_server_config(&cert_bundle)
1344 .map_err(|e| {
1345 NatTraversalError::ConfigError(format!(
1346 "Server config creation failed: {e}"
1347 ))
1348 })?;
1349
1350 let server_crypto = QuicServerConfig::try_from(rustls_config.as_ref().clone())
1351 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1352
1353 let mut server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
1354
1355 let mut transport_config = TransportConfig::default();
1357 transport_config
1358 .keep_alive_interval(Some(config.timeouts.nat_traversal.retry_interval));
1359 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1360
1361 let nat_config = match config.role {
1366 EndpointRole::Client => {
1367 crate::transport_parameters::NatTraversalConfig::ClientSupport
1368 }
1369 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1370 crate::transport_parameters::NatTraversalConfig::ServerSupport {
1371 concurrency_limit: VarInt::from_u32(
1372 config.max_concurrent_attempts as u32,
1373 ),
1374 }
1375 }
1376 };
1377 transport_config.nat_traversal_config(Some(nat_config));
1378
1379 server_config.transport_config(Arc::new(transport_config));
1380
1381 Some(server_config)
1382 }
1383 _ => None,
1384 };
1385
1386 let client_config = {
1388 let cert_config = CertificateConfig {
1389 common_name: format!("ant-quic-{}", config.role.name()),
1390 subject_alt_names: vec!["localhost".to_string(), "ant-quic-node".to_string()],
1391 self_signed: true,
1392 ..CertificateConfig::default()
1393 };
1394
1395 let cert_manager = CertificateManager::new(cert_config).map_err(|e| {
1396 NatTraversalError::ConfigError(format!("Certificate manager creation failed: {e}"))
1397 })?;
1398
1399 let _cert_bundle = cert_manager.generate_certificate().map_err(|e| {
1400 NatTraversalError::ConfigError(format!("Certificate generation failed: {e}"))
1401 })?;
1402
1403 let rustls_config = cert_manager.create_client_config().map_err(|e| {
1404 NatTraversalError::ConfigError(format!("Client config creation failed: {e}"))
1405 })?;
1406
1407 let client_crypto = QuicClientConfig::try_from(rustls_config.as_ref().clone())
1408 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1409
1410 let mut client_config = ClientConfig::new(Arc::new(client_crypto));
1411
1412 let mut transport_config = TransportConfig::default();
1414 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
1415 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1416
1417 let nat_config = match config.role {
1422 EndpointRole::Client => {
1423 crate::transport_parameters::NatTraversalConfig::ClientSupport
1424 }
1425 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
1426 crate::transport_parameters::NatTraversalConfig::ServerSupport {
1427 concurrency_limit: VarInt::from_u32(config.max_concurrent_attempts as u32),
1428 }
1429 }
1430 };
1431 transport_config.nat_traversal_config(Some(nat_config));
1432
1433 client_config.transport_config(Arc::new(transport_config));
1434
1435 client_config
1436 };
1437
1438 let bind_addr = config.bind_addr.unwrap_or("0.0.0.0:0".parse().unwrap());
1440 let socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1441 NatTraversalError::NetworkError(format!("Failed to bind UDP socket: {e}"))
1442 })?;
1443
1444 info!("Binding endpoint to {}", bind_addr);
1445
1446 let std_socket = socket.into_std().map_err(|e| {
1448 NatTraversalError::NetworkError(format!("Failed to convert socket: {e}"))
1449 })?;
1450
1451 let mut endpoint = QuinnEndpoint::new(
1453 EndpointConfig::default(),
1454 server_config,
1455 std_socket,
1456 Arc::new(TokioRuntime),
1457 )
1458 .map_err(|e| {
1459 NatTraversalError::ConfigError(format!("Failed to create Quinn endpoint: {e}"))
1460 })?;
1461
1462 endpoint.set_default_client_config(client_config);
1464
1465 let local_addr = endpoint.local_addr().map_err(|e| {
1467 NatTraversalError::NetworkError(format!("Failed to get local address: {e}"))
1468 })?;
1469
1470 info!("Endpoint bound to actual address: {}", local_addr);
1471
1472 let (event_tx, _event_rx) = mpsc::unbounded_channel();
1474
1475 Ok((endpoint, event_tx, local_addr))
1476 }
1477
1478 pub async fn start_listening(&self, bind_addr: SocketAddr) -> Result<(), NatTraversalError> {
1480 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1481 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1482 })?;
1483
1484 let _socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1486 NatTraversalError::NetworkError(format!("Failed to bind to {bind_addr}: {e}"))
1487 })?;
1488
1489 info!("Started listening on {}", bind_addr);
1490
1491 let endpoint_clone = endpoint.clone();
1493 let shutdown_clone = self.shutdown.clone();
1494 let event_tx = self.event_tx.as_ref().unwrap().clone();
1495 let connections_clone = self.connections.clone();
1496
1497 tokio::spawn(async move {
1498 Self::accept_connections(endpoint_clone, shutdown_clone, event_tx, connections_clone)
1499 .await;
1500 });
1501
1502 Ok(())
1503 }
1504
1505 async fn accept_connections(
1507 endpoint: QuinnEndpoint,
1508 shutdown: Arc<AtomicBool>,
1509 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1510 connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
1511 ) {
1512 while !shutdown.load(Ordering::Relaxed) {
1513 match endpoint.accept().await {
1514 Some(connecting) => {
1515 let event_tx = event_tx.clone();
1516 let connections = connections.clone();
1517 tokio::spawn(async move {
1518 match connecting.await {
1519 Ok(connection) => {
1520 info!("Accepted connection from {}", connection.remote_address());
1521
1522 let peer_id = Self::generate_peer_id_from_address(
1524 connection.remote_address(),
1525 );
1526
1527 if let Ok(mut conns) = connections.write() {
1529 conns.insert(peer_id, connection.clone());
1530 }
1531
1532 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1533 peer_id,
1534 remote_address: connection.remote_address(),
1535 });
1536
1537 Self::handle_connection(connection, event_tx).await;
1539 }
1540 Err(e) => {
1541 debug!("Connection failed: {}", e);
1542 }
1543 }
1544 });
1545 }
1546 None => {
1547 break;
1549 }
1550 }
1551 }
1552 }
1553
1554 async fn poll_discovery(
1556 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
1557 shutdown: Arc<AtomicBool>,
1558 _event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1559 ) {
1560 use tokio::time::{Duration, interval};
1561
1562 let mut poll_interval = interval(Duration::from_millis(100));
1563
1564 while !shutdown.load(Ordering::Relaxed) {
1565 poll_interval.tick().await;
1566
1567 let events = match discovery_manager.lock() {
1569 Ok(mut discovery) => discovery.poll(std::time::Instant::now()),
1570 Err(e) => {
1571 error!("Failed to lock discovery manager: {}", e);
1572 continue;
1573 }
1574 };
1575
1576 for event in events {
1578 match event {
1579 DiscoveryEvent::DiscoveryStarted {
1580 peer_id,
1581 bootstrap_count,
1582 } => {
1583 debug!(
1584 "Discovery started for peer {:?} with {} bootstrap nodes",
1585 peer_id, bootstrap_count
1586 );
1587 }
1588 DiscoveryEvent::LocalScanningStarted => {
1589 debug!("Local interface scanning started");
1590 }
1591 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1592 debug!("Discovered local candidate: {}", candidate.address);
1593 }
1596 DiscoveryEvent::LocalScanningCompleted {
1597 candidate_count,
1598 duration,
1599 } => {
1600 debug!(
1601 "Local interface scanning completed: {} candidates in {:?}",
1602 candidate_count, duration
1603 );
1604 }
1605 DiscoveryEvent::ServerReflexiveDiscoveryStarted { bootstrap_count } => {
1606 debug!(
1607 "Server reflexive discovery started with {} bootstrap nodes",
1608 bootstrap_count
1609 );
1610 }
1611 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1612 candidate,
1613 bootstrap_node,
1614 } => {
1615 debug!(
1616 "Discovered server-reflexive candidate {} via bootstrap {}",
1617 candidate.address, bootstrap_node
1618 );
1619 }
1621 DiscoveryEvent::BootstrapQueryFailed {
1622 bootstrap_node,
1623 error,
1624 } => {
1625 debug!("Bootstrap query failed for {}: {}", bootstrap_node, error);
1626 }
1627 DiscoveryEvent::SymmetricPredictionStarted { base_address } => {
1628 debug!(
1629 "Symmetric NAT prediction started from base address {}",
1630 base_address
1631 );
1632 }
1633 DiscoveryEvent::PredictedCandidateGenerated {
1634 candidate,
1635 confidence,
1636 } => {
1637 debug!(
1638 "Predicted symmetric NAT candidate {} with confidence {}",
1639 candidate.address, confidence
1640 );
1641 }
1643 DiscoveryEvent::PortAllocationDetected {
1644 port,
1645 source_address,
1646 bootstrap_node,
1647 timestamp,
1648 } => {
1649 debug!(
1650 "Port allocation detected: port {} from {} via bootstrap {:?} at {:?}",
1651 port, source_address, bootstrap_node, timestamp
1652 );
1653 }
1654 DiscoveryEvent::DiscoveryCompleted {
1655 candidate_count,
1656 total_duration,
1657 success_rate,
1658 } => {
1659 info!(
1660 "Discovery completed with {} candidates in {:?} (success rate: {:.2}%)",
1661 candidate_count,
1662 total_duration,
1663 success_rate * 100.0
1664 );
1665 }
1668 DiscoveryEvent::DiscoveryFailed {
1669 error,
1670 partial_results,
1671 } => {
1672 warn!(
1673 "Discovery failed: {} (found {} partial candidates)",
1674 error,
1675 partial_results.len()
1676 );
1677
1678 }
1683 DiscoveryEvent::PathValidationRequested {
1684 candidate_id,
1685 candidate_address,
1686 challenge_token,
1687 } => {
1688 debug!(
1689 "PATH_CHALLENGE requested for candidate {} at {} with token {:08x}",
1690 candidate_id.0, candidate_address, challenge_token
1691 );
1692 }
1695 DiscoveryEvent::PathValidationResponse {
1696 candidate_id,
1697 candidate_address,
1698 challenge_token: _,
1699 rtt,
1700 } => {
1701 debug!(
1702 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1703 candidate_id.0, candidate_address, rtt
1704 );
1705 }
1707 }
1708 }
1709 }
1710
1711 info!("Discovery polling task shutting down");
1712 }
1713
1714 async fn handle_connection(
1716 connection: QuinnConnection,
1717 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1718 ) {
1719 let peer_id = Self::generate_peer_id_from_address(connection.remote_address());
1720 let remote_address = connection.remote_address();
1721
1722 debug!(
1723 "Handling connection from peer {:?} at {}",
1724 peer_id, remote_address
1725 );
1726
1727 loop {
1729 tokio::select! {
1730 stream = connection.accept_bi() => {
1731 match stream {
1732 Ok((send, recv)) => {
1733 tokio::spawn(async move {
1734 Self::handle_bi_stream(send, recv).await;
1735 });
1736 }
1737 Err(e) => {
1738 debug!("Error accepting bidirectional stream: {}", e);
1739 let _ = event_tx.send(NatTraversalEvent::ConnectionLost {
1740 peer_id,
1741 reason: format!("Stream error: {e}"),
1742 });
1743 break;
1744 }
1745 }
1746 }
1747 stream = connection.accept_uni() => {
1748 match stream {
1749 Ok(recv) => {
1750 tokio::spawn(async move {
1751 Self::handle_uni_stream(recv).await;
1752 });
1753 }
1754 Err(e) => {
1755 debug!("Error accepting unidirectional stream: {}", e);
1756 let _ = event_tx.send(NatTraversalEvent::ConnectionLost {
1757 peer_id,
1758 reason: format!("Stream error: {e}"),
1759 });
1760 break;
1761 }
1762 }
1763 }
1764 }
1765 }
1766 }
1767
    /// Handler for an accepted bidirectional stream.
    ///
    /// Currently a no-op: nothing is read from or written to the stream, and both
    /// halves are dropped when this function returns.
    // NOTE(review): stub — presumably application-level stream handling is meant to go here.
    async fn handle_bi_stream(
        _send: crate::high_level::SendStream,
        _recv: crate::high_level::RecvStream,
    ) {
    }
1802
1803 async fn handle_uni_stream(mut recv: crate::high_level::RecvStream) {
1805 let mut buffer = vec![0u8; 1024];
1806
1807 loop {
1808 match recv.read(&mut buffer).await {
1809 Ok(Some(size)) => {
1810 debug!("Received {} bytes on unidirectional stream", size);
1811 }
1813 Ok(None) => {
1814 debug!("Unidirectional stream closed by peer");
1815 break;
1816 }
1817 Err(e) => {
1818 debug!("Error reading from unidirectional stream: {}", e);
1819 break;
1820 }
1821 }
1822 }
1823 }
1824
1825 pub async fn connect_to_peer(
1827 &self,
1828 peer_id: PeerId,
1829 server_name: &str,
1830 remote_addr: SocketAddr,
1831 ) -> Result<QuinnConnection, NatTraversalError> {
1832 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1833 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1834 })?;
1835
1836 info!("Connecting to peer {:?} at {}", peer_id, remote_addr);
1837
1838 let connecting = endpoint.connect(remote_addr, server_name).map_err(|e| {
1840 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
1841 })?;
1842
1843 let connection = timeout(
1844 self.timeout_config
1845 .nat_traversal
1846 .connection_establishment_timeout,
1847 connecting,
1848 )
1849 .await
1850 .map_err(|_| NatTraversalError::Timeout)?
1851 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
1852
1853 info!(
1854 "Successfully connected to peer {:?} at {}",
1855 peer_id, remote_addr
1856 );
1857
1858 if let Some(ref event_tx) = self.event_tx {
1860 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1861 peer_id,
1862 remote_address: remote_addr,
1863 });
1864 }
1865
1866 Ok(connection)
1867 }
1868
1869 pub async fn accept_connection(&self) -> Result<(PeerId, QuinnConnection), NatTraversalError> {
1871 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1872 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1873 })?;
1874
1875 let incoming = endpoint
1877 .accept()
1878 .await
1879 .ok_or_else(|| NatTraversalError::NetworkError("Endpoint closed".to_string()))?;
1880
1881 let remote_addr = incoming.remote_address();
1882 info!("Accepting connection from {}", remote_addr);
1883
1884 let connection = incoming.await.map_err(|e| {
1886 NatTraversalError::ConnectionFailed(format!("Failed to accept connection: {e}"))
1887 })?;
1888
1889 let peer_id = self
1891 .extract_peer_id_from_connection(&connection)
1892 .await
1893 .unwrap_or_else(|| Self::generate_peer_id_from_address(remote_addr));
1894
1895 {
1897 let mut connections = self.connections.write().map_err(|_| {
1898 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1899 })?;
1900 connections.insert(peer_id, connection.clone());
1901 }
1902
1903 info!(
1904 "Connection accepted from peer {:?} at {}",
1905 peer_id, remote_addr
1906 );
1907
1908 if let Some(ref event_tx) = self.event_tx {
1910 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1911 peer_id,
1912 remote_address: remote_addr,
1913 });
1914 }
1915
1916 Ok((peer_id, connection))
1917 }
1918
1919 pub fn local_peer_id(&self) -> PeerId {
1921 self.local_peer_id
1922 }
1923
1924 pub fn get_connection(
1926 &self,
1927 peer_id: &PeerId,
1928 ) -> Result<Option<QuinnConnection>, NatTraversalError> {
1929 let connections = self.connections.read().map_err(|_| {
1930 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1931 })?;
1932 Ok(connections.get(peer_id).cloned())
1933 }
1934
1935 pub fn remove_connection(
1937 &self,
1938 peer_id: &PeerId,
1939 ) -> Result<Option<QuinnConnection>, NatTraversalError> {
1940 let mut connections = self.connections.write().map_err(|_| {
1941 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1942 })?;
1943 Ok(connections.remove(peer_id))
1944 }
1945
1946 pub fn list_connections(&self) -> Result<Vec<(PeerId, SocketAddr)>, NatTraversalError> {
1948 let connections = self.connections.read().map_err(|_| {
1949 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1950 })?;
1951 let mut result = Vec::new();
1952 for (peer_id, connection) in connections.iter() {
1953 result.push((*peer_id, connection.remote_address()));
1954 }
1955 Ok(result)
1956 }
1957
1958 pub async fn handle_connection_data(
1960 &self,
1961 peer_id: PeerId,
1962 connection: &QuinnConnection,
1963 ) -> Result<(), NatTraversalError> {
1964 info!("Handling connection data from peer {:?}", peer_id);
1965
1966 let connection_clone = connection.clone();
1968 let peer_id_clone = peer_id;
1969 tokio::spawn(async move {
1970 loop {
1971 match connection_clone.accept_bi().await {
1972 Ok((send, recv)) => {
1973 debug!(
1974 "Accepted bidirectional stream from peer {:?}",
1975 peer_id_clone
1976 );
1977 tokio::spawn(Self::handle_bi_stream(send, recv));
1978 }
1979 Err(ConnectionError::ApplicationClosed(_)) => {
1980 debug!("Connection closed by peer {:?}", peer_id_clone);
1981 break;
1982 }
1983 Err(e) => {
1984 debug!(
1985 "Error accepting bidirectional stream from peer {:?}: {}",
1986 peer_id_clone, e
1987 );
1988 break;
1989 }
1990 }
1991 }
1992 });
1993
1994 let connection_clone = connection.clone();
1996 let peer_id_clone = peer_id;
1997 tokio::spawn(async move {
1998 loop {
1999 match connection_clone.accept_uni().await {
2000 Ok(recv) => {
2001 debug!(
2002 "Accepted unidirectional stream from peer {:?}",
2003 peer_id_clone
2004 );
2005 tokio::spawn(Self::handle_uni_stream(recv));
2006 }
2007 Err(ConnectionError::ApplicationClosed(_)) => {
2008 debug!("Connection closed by peer {:?}", peer_id_clone);
2009 break;
2010 }
2011 Err(e) => {
2012 debug!(
2013 "Error accepting unidirectional stream from peer {:?}: {}",
2014 peer_id_clone, e
2015 );
2016 break;
2017 }
2018 }
2019 }
2020 });
2021
2022 Ok(())
2023 }
2024
2025 fn generate_local_peer_id() -> PeerId {
2027 use std::collections::hash_map::DefaultHasher;
2028 use std::hash::{Hash, Hasher};
2029 use std::time::SystemTime;
2030
2031 let mut hasher = DefaultHasher::new();
2032 SystemTime::now().hash(&mut hasher);
2033 std::process::id().hash(&mut hasher);
2034
2035 let hash = hasher.finish();
2036 let mut peer_id = [0u8; 32];
2037 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2038
2039 for i in 8..32 {
2041 peer_id[i] = rand::random();
2042 }
2043
2044 PeerId(peer_id)
2045 }
2046
2047 fn generate_peer_id_from_address(addr: SocketAddr) -> PeerId {
2053 use std::collections::hash_map::DefaultHasher;
2054 use std::hash::{Hash, Hasher};
2055
2056 let mut hasher = DefaultHasher::new();
2057 addr.hash(&mut hasher);
2058
2059 let hash = hasher.finish();
2060 let mut peer_id = [0u8; 32];
2061 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
2062
2063 for i in 8..32 {
2066 peer_id[i] = rand::random();
2067 }
2068
2069 warn!(
2070 "Generated temporary peer ID from address {}. This ID is not persistent!",
2071 addr
2072 );
2073 PeerId(peer_id)
2074 }
2075
2076 async fn extract_peer_id_from_connection(
2078 &self,
2079 connection: &QuinnConnection,
2080 ) -> Option<PeerId> {
2081 if let Some(identity) = connection.peer_identity() {
2083 if let Some(public_key_bytes) = identity.downcast_ref::<[u8; 32]>() {
2085 match crate::derive_peer_id_from_key_bytes(public_key_bytes) {
2087 Ok(peer_id) => {
2088 debug!("Derived peer ID from Ed25519 public key");
2089 return Some(peer_id);
2090 }
2091 Err(e) => {
2092 warn!("Failed to derive peer ID from public key: {}", e);
2093 }
2094 }
2095 }
2096 }
2098
2099 None
2100 }
2101
2102 pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
2104 self.shutdown.store(true, Ordering::Relaxed);
2106
2107 {
2109 let mut connections = self.connections.write().map_err(|_| {
2110 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2111 })?;
2112 for (peer_id, connection) in connections.drain() {
2113 info!("Closing connection to peer {:?}", peer_id);
2114 connection.close(crate::VarInt::from_u32(0), b"Shutdown");
2115 }
2116 }
2117
2118 if let Some(ref endpoint) = self.quinn_endpoint {
2120 endpoint.wait_idle().await;
2121 }
2122
2123 info!("NAT traversal endpoint shutdown completed");
2124 Ok(())
2125 }
2126
2127 pub async fn discover_candidates(
2129 &self,
2130 peer_id: PeerId,
2131 ) -> Result<Vec<CandidateAddress>, NatTraversalError> {
2132 debug!("Discovering address candidates for peer {:?}", peer_id);
2133
2134 let mut candidates = Vec::new();
2135
2136 let bootstrap_nodes = {
2138 let nodes = self
2139 .bootstrap_nodes
2140 .read()
2141 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2142 nodes.clone()
2143 };
2144
2145 {
2147 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2148 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2149 })?;
2150
2151 discovery
2152 .start_discovery(peer_id, bootstrap_nodes)
2153 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
2154 }
2155
2156 let timeout_duration = self.config.coordination_timeout;
2158 let start_time = std::time::Instant::now();
2159
2160 while start_time.elapsed() < timeout_duration {
2161 let discovery_events = {
2162 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2163 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2164 })?;
2165 discovery.poll(std::time::Instant::now())
2166 };
2167
2168 for event in discovery_events {
2169 match event {
2170 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
2171 candidates.push(candidate.clone());
2172
2173 self.send_candidate_advertisement(peer_id, &candidate)
2175 .await
2176 .unwrap_or_else(|e| {
2177 debug!("Failed to send candidate advertisement: {}", e)
2178 });
2179 }
2180 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
2181 candidates.push(candidate.clone());
2182
2183 self.send_candidate_advertisement(peer_id, &candidate)
2185 .await
2186 .unwrap_or_else(|e| {
2187 debug!("Failed to send candidate advertisement: {}", e)
2188 });
2189 }
2190 DiscoveryEvent::PredictedCandidateGenerated { candidate, .. } => {
2191 candidates.push(candidate.clone());
2192
2193 self.send_candidate_advertisement(peer_id, &candidate)
2195 .await
2196 .unwrap_or_else(|e| {
2197 debug!("Failed to send candidate advertisement: {}", e)
2198 });
2199 }
2200 DiscoveryEvent::DiscoveryCompleted { .. } => {
2201 return Ok(candidates);
2203 }
2204 DiscoveryEvent::DiscoveryFailed {
2205 error,
2206 partial_results,
2207 } => {
2208 candidates.extend(partial_results);
2210 if candidates.is_empty() {
2211 return Err(NatTraversalError::CandidateDiscoveryFailed(
2212 error.to_string(),
2213 ));
2214 }
2215 return Ok(candidates);
2216 }
2217 _ => {}
2218 }
2219 }
2220
2221 sleep(Duration::from_millis(10)).await;
2223 }
2224
2225 if candidates.is_empty() {
2226 Err(NatTraversalError::NoCandidatesFound)
2227 } else {
2228 Ok(candidates)
2229 }
2230 }
2231
2232 fn create_punch_me_now_frame(&self, peer_id: PeerId) -> Result<Vec<u8>, NatTraversalError> {
2234 let mut frame = Vec::new();
2242
2243 frame.push(0x41);
2245
2246 frame.extend_from_slice(&peer_id.0);
2248
2249 let timestamp = std::time::SystemTime::now()
2251 .duration_since(std::time::UNIX_EPOCH)
2252 .unwrap_or_default()
2253 .as_millis() as u64;
2254 frame.extend_from_slice(×tamp.to_be_bytes());
2255
2256 let mut token = [0u8; 16];
2258 for byte in &mut token {
2259 *byte = rand::random();
2260 }
2261 frame.extend_from_slice(&token);
2262
2263 Ok(frame)
2264 }
2265
2266 fn attempt_hole_punching(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
2267 debug!("Attempting hole punching for peer {:?}", peer_id);
2268
2269 let candidate_pairs = self.get_candidate_pairs_for_peer(peer_id)?;
2271
2272 if candidate_pairs.is_empty() {
2273 return Err(NatTraversalError::NoCandidatesFound);
2274 }
2275
2276 info!(
2277 "Generated {} candidate pairs for hole punching with peer {:?}",
2278 candidate_pairs.len(),
2279 peer_id
2280 );
2281
2282 self.attempt_quinn_hole_punching(peer_id, candidate_pairs)
2285 }
2286
2287 fn get_candidate_pairs_for_peer(
2289 &self,
2290 peer_id: PeerId,
2291 ) -> Result<Vec<CandidatePair>, NatTraversalError> {
2292 let discovery_candidates = {
2294 let discovery = self.discovery_manager.lock().map_err(|_| {
2295 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2296 })?;
2297
2298 discovery.get_candidates_for_peer(peer_id)
2299 };
2300
2301 if discovery_candidates.is_empty() {
2302 return Err(NatTraversalError::NoCandidatesFound);
2303 }
2304
2305 let mut candidate_pairs = Vec::new();
2307 let local_candidates = discovery_candidates
2308 .iter()
2309 .filter(|c| matches!(c.source, CandidateSource::Local))
2310 .collect::<Vec<_>>();
2311 let remote_candidates = discovery_candidates
2312 .iter()
2313 .filter(|c| !matches!(c.source, CandidateSource::Local))
2314 .collect::<Vec<_>>();
2315
2316 for local in &local_candidates {
2318 for remote in &remote_candidates {
2319 let pair_priority = self.calculate_candidate_pair_priority(local, remote);
2320 candidate_pairs.push(CandidatePair {
2321 local_candidate: (*local).clone(),
2322 remote_candidate: (*remote).clone(),
2323 priority: pair_priority,
2324 state: CandidatePairState::Waiting,
2325 });
2326 }
2327 }
2328
2329 candidate_pairs.sort_by(|a, b| b.priority.cmp(&a.priority));
2331
2332 candidate_pairs.truncate(8);
2334
2335 Ok(candidate_pairs)
2336 }
2337
2338 fn calculate_candidate_pair_priority(
2340 &self,
2341 local: &CandidateAddress,
2342 remote: &CandidateAddress,
2343 ) -> u64 {
2344 let local_type_preference = match local.source {
2348 CandidateSource::Local => 126,
2349 CandidateSource::Observed { .. } => 100,
2350 CandidateSource::Predicted => 75,
2351 CandidateSource::Peer => 50,
2352 };
2353
2354 let remote_type_preference = match remote.source {
2355 CandidateSource::Local => 126,
2356 CandidateSource::Observed { .. } => 100,
2357 CandidateSource::Predicted => 75,
2358 CandidateSource::Peer => 50,
2359 };
2360
2361 let local_priority = (local_type_preference as u64) << 8 | local.priority as u64;
2363 let remote_priority = (remote_type_preference as u64) << 8 | remote.priority as u64;
2364
2365 let min_priority = local_priority.min(remote_priority);
2366 let max_priority = local_priority.max(remote_priority);
2367
2368 (min_priority << 32)
2369 | (max_priority << 1)
2370 | if local_priority > remote_priority {
2371 1
2372 } else {
2373 0
2374 }
2375 }
2376
2377 fn attempt_quinn_hole_punching(
2379 &self,
2380 peer_id: PeerId,
2381 candidate_pairs: Vec<CandidatePair>,
2382 ) -> Result<(), NatTraversalError> {
2383 let _endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
2384 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
2385 })?;
2386
2387 for pair in candidate_pairs {
2388 debug!(
2389 "Attempting hole punch with candidate pair: {} -> {}",
2390 pair.local_candidate.address, pair.remote_candidate.address
2391 );
2392
2393 let mut challenge_data = [0u8; 8];
2395 for byte in &mut challenge_data {
2396 *byte = rand::random();
2397 }
2398
2399 let local_socket =
2401 std::net::UdpSocket::bind(pair.local_candidate.address).map_err(|e| {
2402 NatTraversalError::NetworkError(format!(
2403 "Failed to bind to local candidate: {e}"
2404 ))
2405 })?;
2406
2407 let path_challenge_packet = self.create_path_challenge_packet(challenge_data)?;
2409
2410 match local_socket.send_to(&path_challenge_packet, pair.remote_candidate.address) {
2412 Ok(bytes_sent) => {
2413 debug!(
2414 "Sent {} bytes for hole punch from {} to {}",
2415 bytes_sent, pair.local_candidate.address, pair.remote_candidate.address
2416 );
2417
2418 local_socket
2420 .set_read_timeout(Some(Duration::from_millis(100)))
2421 .map_err(|e| {
2422 NatTraversalError::NetworkError(format!("Failed to set timeout: {e}"))
2423 })?;
2424
2425 let mut response_buffer = [0u8; 1024];
2427 match local_socket.recv_from(&mut response_buffer) {
2428 Ok((_bytes_received, response_addr)) => {
2429 if response_addr == pair.remote_candidate.address {
2430 info!(
2431 "Hole punch succeeded for peer {:?}: {} <-> {}",
2432 peer_id,
2433 pair.local_candidate.address,
2434 pair.remote_candidate.address
2435 );
2436
2437 self.store_successful_candidate_pair(peer_id, pair)?;
2439 return Ok(());
2440 } else {
2441 debug!(
2442 "Received response from unexpected address: {}",
2443 response_addr
2444 );
2445 }
2446 }
2447 Err(e)
2448 if e.kind() == std::io::ErrorKind::WouldBlock
2449 || e.kind() == std::io::ErrorKind::TimedOut =>
2450 {
2451 debug!("No response received for hole punch attempt");
2452 }
2453 Err(e) => {
2454 debug!("Error receiving hole punch response: {}", e);
2455 }
2456 }
2457 }
2458 Err(e) => {
2459 debug!("Failed to send hole punch packet: {}", e);
2460 }
2461 }
2462 }
2463
2464 Err(NatTraversalError::HolePunchingFailed)
2466 }
2467
2468 fn create_path_challenge_packet(
2470 &self,
2471 challenge_data: [u8; 8],
2472 ) -> Result<Vec<u8>, NatTraversalError> {
2473 let mut packet = Vec::new();
2476
2477 packet.push(0x40); packet.extend_from_slice(&[0, 0, 0, 1]); packet.push(0x1a); packet.extend_from_slice(&challenge_data); Ok(packet)
2486 }
2487
2488 fn store_successful_candidate_pair(
2490 &self,
2491 peer_id: PeerId,
2492 pair: CandidatePair,
2493 ) -> Result<(), NatTraversalError> {
2494 debug!(
2495 "Storing successful candidate pair for peer {:?}: {} <-> {}",
2496 peer_id, pair.local_candidate.address, pair.remote_candidate.address
2497 );
2498
2499 if let Some(ref callback) = self.event_callback {
2504 callback(NatTraversalEvent::PathValidated {
2505 peer_id,
2506 address: pair.remote_candidate.address,
2507 rtt: Duration::from_millis(50), });
2509
2510 callback(NatTraversalEvent::TraversalSucceeded {
2511 peer_id,
2512 final_address: pair.remote_candidate.address,
2513 total_time: Duration::from_secs(1), });
2515 }
2516
2517 Ok(())
2518 }
2519
2520 fn attempt_connection_to_candidate(
2522 &self,
2523 peer_id: PeerId,
2524 candidate: &CandidateAddress,
2525 ) -> Result<(), NatTraversalError> {
2526 {
2527 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
2528 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
2529 })?;
2530
2531 let server_name = format!("peer-{:x}", peer_id.0[0] as u32);
2533
2534 debug!(
2535 "Attempting Quinn connection to candidate {} for peer {:?}",
2536 candidate.address, peer_id
2537 );
2538
2539 match endpoint.connect(candidate.address, &server_name) {
2541 Ok(connecting) => {
2542 info!(
2543 "Connection attempt initiated to {} for peer {:?}",
2544 candidate.address, peer_id
2545 );
2546
2547 if let Some(event_tx) = &self.event_tx {
2549 let event_tx = event_tx.clone();
2550 let connections = self.connections.clone();
2551 let peer_id_clone = peer_id;
2552 let address = candidate.address;
2553
2554 tokio::spawn(async move {
2555 match connecting.await {
2556 Ok(connection) => {
2557 info!(
2558 "Successfully connected to {} for peer {:?}",
2559 address, peer_id_clone
2560 );
2561
2562 if let Ok(mut conns) = connections.write() {
2564 conns.insert(peer_id_clone, connection.clone());
2565 }
2566
2567 let _ =
2569 event_tx.send(NatTraversalEvent::ConnectionEstablished {
2570 peer_id: peer_id_clone,
2571 remote_address: address,
2572 });
2573
2574 Self::handle_connection(connection, event_tx).await;
2576 }
2577 Err(e) => {
2578 warn!("Connection to {} failed: {}", address, e);
2579 }
2580 }
2581 });
2582 }
2583
2584 Ok(())
2585 }
2586 Err(e) => {
2587 warn!(
2588 "Failed to initiate connection to {}: {}",
2589 candidate.address, e
2590 );
2591 Err(NatTraversalError::ConnectionFailed(format!(
2592 "Failed to connect to {}: {}",
2593 candidate.address, e
2594 )))
2595 }
2596 }
2597 }
2598 }
2599
2600 pub fn poll(
2602 &self,
2603 now: std::time::Instant,
2604 ) -> Result<Vec<NatTraversalEvent>, NatTraversalError> {
2605 let mut events = Vec::new();
2606
2607 self.check_connections_for_observed_addresses(&mut events)?;
2609
2610 {
2612 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2613 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2614 })?;
2615
2616 let discovery_events = discovery.poll(now);
2617
2618 for discovery_event in discovery_events {
2620 if let Some(nat_event) = self.convert_discovery_event(discovery_event) {
2621 events.push(nat_event.clone());
2622
2623 if let Some(ref callback) = self.event_callback {
2625 callback(nat_event.clone());
2626 }
2627
2628 if let NatTraversalEvent::CandidateDiscovered {
2630 peer_id: _,
2631 candidate: _,
2632 } = &nat_event
2633 {
2634 }
2637 }
2638 }
2639 }
2640
2641 let mut sessions = self
2643 .active_sessions
2644 .write()
2645 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2646
2647 for (_peer_id, session) in sessions.iter_mut() {
2648 let elapsed = now.duration_since(session.started_at);
2649
2650 let timeout = self.get_phase_timeout(session.phase);
2652
2653 if elapsed > timeout {
2655 match session.phase {
2656 TraversalPhase::Discovery => {
2657 let discovered_candidates = {
2659 let discovery = self.discovery_manager.lock().map_err(|_| {
2660 NatTraversalError::ProtocolError(
2661 "Discovery manager lock poisoned".to_string(),
2662 )
2663 });
2664 match discovery {
2665 Ok(disc) => disc.get_candidates_for_peer(session.peer_id),
2666 Err(_) => Vec::new(),
2667 }
2668 };
2669
2670 session.candidates = discovered_candidates.clone();
2672
2673 if !session.candidates.is_empty() {
2675 session.phase = TraversalPhase::Coordination;
2677 let event = NatTraversalEvent::PhaseTransition {
2678 peer_id: session.peer_id,
2679 from_phase: TraversalPhase::Discovery,
2680 to_phase: TraversalPhase::Coordination,
2681 };
2682 events.push(event.clone());
2683 if let Some(ref callback) = self.event_callback {
2684 callback(event);
2685 }
2686 info!(
2687 "Peer {:?} advanced from Discovery to Coordination with {} candidates",
2688 session.peer_id,
2689 session.candidates.len()
2690 );
2691 } else if session.attempt < self.config.max_concurrent_attempts as u32 {
2692 session.attempt += 1;
2694 session.started_at = now;
2695 let backoff_duration = self.calculate_backoff(session.attempt);
2696 warn!(
2697 "Discovery timeout for peer {:?}, retrying (attempt {}), backoff: {:?}",
2698 session.peer_id, session.attempt, backoff_duration
2699 );
2700 } else {
2701 session.phase = TraversalPhase::Failed;
2703 let event = NatTraversalEvent::TraversalFailed {
2704 peer_id: session.peer_id,
2705 error: NatTraversalError::NoCandidatesFound,
2706 fallback_available: self.config.enable_relay_fallback,
2707 };
2708 events.push(event.clone());
2709 if let Some(ref callback) = self.event_callback {
2710 callback(event);
2711 }
2712 error!(
2713 "NAT traversal failed for peer {:?}: no candidates found after {} attempts",
2714 session.peer_id, session.attempt
2715 );
2716 }
2717 }
2718 TraversalPhase::Coordination => {
2719 if let Some(coordinator) = self.select_coordinator() {
2721 match self.send_coordination_request(session.peer_id, coordinator) {
2722 Ok(_) => {
2723 session.phase = TraversalPhase::Synchronization;
2724 let event = NatTraversalEvent::CoordinationRequested {
2725 peer_id: session.peer_id,
2726 coordinator,
2727 };
2728 events.push(event.clone());
2729 if let Some(ref callback) = self.event_callback {
2730 callback(event);
2731 }
2732 info!(
2733 "Coordination requested for peer {:?} via {}",
2734 session.peer_id, coordinator
2735 );
2736 }
2737 Err(e) => {
2738 self.handle_phase_failure(session, now, &mut events, e);
2739 }
2740 }
2741 } else {
2742 self.handle_phase_failure(
2743 session,
2744 now,
2745 &mut events,
2746 NatTraversalError::NoBootstrapNodes,
2747 );
2748 }
2749 }
2750 TraversalPhase::Synchronization => {
2751 if self.is_peer_synchronized(&session.peer_id) {
2753 session.phase = TraversalPhase::Punching;
2754 let event = NatTraversalEvent::HolePunchingStarted {
2755 peer_id: session.peer_id,
2756 targets: session.candidates.iter().map(|c| c.address).collect(),
2757 };
2758 events.push(event.clone());
2759 if let Some(ref callback) = self.event_callback {
2760 callback(event);
2761 }
2762 if let Err(e) =
2764 self.initiate_hole_punching(session.peer_id, &session.candidates)
2765 {
2766 self.handle_phase_failure(session, now, &mut events, e);
2767 }
2768 } else {
2769 self.handle_phase_failure(
2770 session,
2771 now,
2772 &mut events,
2773 NatTraversalError::ProtocolError(
2774 "Synchronization timeout".to_string(),
2775 ),
2776 );
2777 }
2778 }
2779 TraversalPhase::Punching => {
2780 if let Some(successful_path) = self.check_punch_results(&session.peer_id) {
2782 session.phase = TraversalPhase::Validation;
2783 let event = NatTraversalEvent::PathValidated {
2784 peer_id: session.peer_id,
2785 address: successful_path,
2786 rtt: Duration::from_millis(50), };
2788 events.push(event.clone());
2789 if let Some(ref callback) = self.event_callback {
2790 callback(event);
2791 }
2792 if let Err(e) = self.validate_path(session.peer_id, successful_path) {
2794 self.handle_phase_failure(session, now, &mut events, e);
2795 }
2796 } else {
2797 self.handle_phase_failure(
2798 session,
2799 now,
2800 &mut events,
2801 NatTraversalError::PunchingFailed(
2802 "No successful punch".to_string(),
2803 ),
2804 );
2805 }
2806 }
2807 TraversalPhase::Validation => {
2808 if self.is_path_validated(&session.peer_id) {
2810 session.phase = TraversalPhase::Connected;
2811 let event = NatTraversalEvent::TraversalSucceeded {
2812 peer_id: session.peer_id,
2813 final_address: session
2814 .candidates
2815 .first()
2816 .map(|c| c.address)
2817 .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()),
2818 total_time: elapsed,
2819 };
2820 events.push(event.clone());
2821 if let Some(ref callback) = self.event_callback {
2822 callback(event);
2823 }
2824 info!(
2825 "NAT traversal succeeded for peer {:?} in {:?}",
2826 session.peer_id, elapsed
2827 );
2828 } else {
2829 self.handle_phase_failure(
2830 session,
2831 now,
2832 &mut events,
2833 NatTraversalError::ValidationFailed(
2834 "Path validation timeout".to_string(),
2835 ),
2836 );
2837 }
2838 }
2839 TraversalPhase::Connected => {
2840 if !self.is_connection_healthy(&session.peer_id) {
2842 warn!(
2843 "Connection to peer {:?} is no longer healthy",
2844 session.peer_id
2845 );
2846 }
2848 }
2849 TraversalPhase::Failed => {
2850 }
2852 }
2853 }
2854 }
2855
2856 Ok(events)
2857 }
2858
2859 fn get_phase_timeout(&self, phase: TraversalPhase) -> Duration {
2861 match phase {
2862 TraversalPhase::Discovery => Duration::from_secs(10),
2863 TraversalPhase::Coordination => self.config.coordination_timeout,
2864 TraversalPhase::Synchronization => Duration::from_secs(3),
2865 TraversalPhase::Punching => Duration::from_secs(5),
2866 TraversalPhase::Validation => Duration::from_secs(5),
2867 TraversalPhase::Connected => Duration::from_secs(30), TraversalPhase::Failed => Duration::ZERO,
2869 }
2870 }
2871
2872 fn calculate_backoff(&self, attempt: u32) -> Duration {
2874 let base = Duration::from_millis(1000);
2875 let max = Duration::from_secs(30);
2876 let backoff = base * 2u32.pow(attempt.saturating_sub(1));
2877 let jitter = std::time::Duration::from_millis((rand::random::<u64>() % 200) as u64);
2878 backoff.min(max) + jitter
2879 }
2880
2881 fn check_connections_for_observed_addresses(
2883 &self,
2884 _events: &mut Vec<NatTraversalEvent>,
2885 ) -> Result<(), NatTraversalError> {
2886 let connections = self.connections.read().map_err(|_| {
2888 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
2889 })?;
2890
2891 if !connections.is_empty() && self.config.role == EndpointRole::Client {
2898 for (_peer_id, connection) in connections.iter() {
2900 let remote_addr = connection.remote_address();
2901
2902 let is_bootstrap = {
2904 let bootstrap_nodes = self.bootstrap_nodes.read().map_err(|_| {
2905 NatTraversalError::ProtocolError(
2906 "Bootstrap nodes lock poisoned".to_string(),
2907 )
2908 })?;
2909 bootstrap_nodes
2910 .iter()
2911 .any(|node| node.address == remote_addr)
2912 };
2913
2914 if is_bootstrap {
2915 debug!(
2918 "Bootstrap connection to {} should provide our external address via OBSERVED_ADDRESS frames",
2919 remote_addr
2920 );
2921
2922 }
2925 }
2926 }
2927
2928 Ok(())
2929 }
2930
2931 fn handle_phase_failure(
2933 &self,
2934 session: &mut NatTraversalSession,
2935 now: std::time::Instant,
2936 events: &mut Vec<NatTraversalEvent>,
2937 error: NatTraversalError,
2938 ) {
2939 if session.attempt < self.config.max_concurrent_attempts as u32 {
2940 session.attempt += 1;
2942 session.started_at = now;
2943 let backoff = self.calculate_backoff(session.attempt);
2944 warn!(
2945 "Phase {:?} failed for peer {:?}: {:?}, retrying (attempt {}) after {:?}",
2946 session.phase, session.peer_id, error, session.attempt, backoff
2947 );
2948 } else {
2949 session.phase = TraversalPhase::Failed;
2951 let event = NatTraversalEvent::TraversalFailed {
2952 peer_id: session.peer_id,
2953 error,
2954 fallback_available: self.config.enable_relay_fallback,
2955 };
2956 events.push(event.clone());
2957 if let Some(ref callback) = self.event_callback {
2958 callback(event);
2959 }
2960 error!(
2961 "NAT traversal failed for peer {:?} after {} attempts",
2962 session.peer_id, session.attempt
2963 );
2964 }
2965 }
2966
2967 fn select_coordinator(&self) -> Option<SocketAddr> {
2969 if let Ok(nodes) = self.bootstrap_nodes.read() {
2970 if !nodes.is_empty() {
2972 let idx = rand::random::<usize>() % nodes.len();
2973 return Some(nodes[idx].address);
2974 }
2975 }
2976 None
2977 }
2978
2979 fn send_coordination_request(
2981 &self,
2982 peer_id: PeerId,
2983 coordinator: SocketAddr,
2984 ) -> Result<(), NatTraversalError> {
2985 debug!(
2986 "Sending coordination request for peer {:?} to {}",
2987 peer_id, coordinator
2988 );
2989
2990 {
2991 if let Ok(connections) = self.connections.read() {
2993 for (_peer, conn) in connections.iter() {
2995 if conn.remote_address() == coordinator {
2996 info!("Found existing connection to coordinator {}", coordinator);
3000 return Ok(());
3001 }
3002 }
3003 }
3004
3005 info!("Establishing connection to coordinator {}", coordinator);
3007 if let Some(endpoint) = &self.quinn_endpoint {
3008 let server_name = format!("bootstrap-{}", coordinator.ip());
3009 match endpoint.connect(coordinator, &server_name) {
3010 Ok(connecting) => {
3011 info!("Initiated connection to coordinator {}", coordinator);
3013
3014 if let Some(event_tx) = &self.event_tx {
3016 let event_tx = event_tx.clone();
3017 let connections = self.connections.clone();
3018
3019 tokio::spawn(async move {
3020 match connecting.await {
3021 Ok(connection) => {
3022 info!("Connected to coordinator {}", coordinator);
3023
3024 let bootstrap_peer_id =
3026 Self::generate_peer_id_from_address(coordinator);
3027
3028 if let Ok(mut conns) = connections.write() {
3030 conns.insert(bootstrap_peer_id, connection.clone());
3031 }
3032
3033 Self::handle_connection(connection, event_tx).await;
3035 }
3036 Err(e) => {
3037 warn!(
3038 "Failed to connect to coordinator {}: {}",
3039 coordinator, e
3040 );
3041 }
3042 }
3043 });
3044 }
3045
3046 Ok(())
3049 }
3050 Err(e) => Err(NatTraversalError::CoordinationFailed(format!(
3051 "Failed to connect to coordinator {coordinator}: {e}"
3052 ))),
3053 }
3054 } else {
3055 Err(NatTraversalError::ConfigError(
3056 "Quinn endpoint not initialized".to_string(),
3057 ))
3058 }
3059 }
3060 }
3061
3062 fn is_peer_synchronized(&self, peer_id: &PeerId) -> bool {
3064 debug!("Checking synchronization status for peer {:?}", peer_id);
3065
3066 if let Ok(sessions) = self.active_sessions.read() {
3068 if let Some(session) = sessions.get(peer_id) {
3069 let has_candidates = !session.candidates.is_empty();
3072 let past_discovery = session.phase as u8 > TraversalPhase::Discovery as u8;
3073
3074 debug!(
3075 "Checking sync for peer {:?}: phase={:?}, candidates={}, past_discovery={}",
3076 peer_id,
3077 session.phase,
3078 session.candidates.len(),
3079 past_discovery
3080 );
3081
3082 if has_candidates && past_discovery {
3083 info!(
3084 "Peer {:?} is synchronized with {} candidates",
3085 peer_id,
3086 session.candidates.len()
3087 );
3088 return true;
3089 }
3090
3091 if session.phase == TraversalPhase::Synchronization && has_candidates {
3093 info!(
3094 "Peer {:?} in synchronization phase with {} candidates, considering synchronized",
3095 peer_id,
3096 session.candidates.len()
3097 );
3098 return true;
3099 }
3100
3101 if session.phase as u8 >= TraversalPhase::Synchronization as u8 {
3103 info!(
3104 "Test mode: Considering peer {:?} synchronized in phase {:?}",
3105 peer_id, session.phase
3106 );
3107 return true;
3108 }
3109 }
3110 }
3111
3112 warn!("Peer {:?} is not synchronized", peer_id);
3113 false
3114 }
3115
3116 fn initiate_hole_punching(
3118 &self,
3119 peer_id: PeerId,
3120 candidates: &[CandidateAddress],
3121 ) -> Result<(), NatTraversalError> {
3122 if candidates.is_empty() {
3123 return Err(NatTraversalError::NoCandidatesFound);
3124 }
3125
3126 info!(
3127 "Initiating hole punching for peer {:?} to {} candidates",
3128 peer_id,
3129 candidates.len()
3130 );
3131
3132 {
3133 for candidate in candidates {
3135 debug!(
3136 "Attempting QUIC connection to candidate: {}",
3137 candidate.address
3138 );
3139
3140 match self.attempt_connection_to_candidate(peer_id, candidate) {
3142 Ok(_) => {
3143 info!(
3144 "Successfully initiated connection attempt to {}",
3145 candidate.address
3146 );
3147 }
3148 Err(e) => {
3149 warn!(
3150 "Failed to initiate connection to {}: {:?}",
3151 candidate.address, e
3152 );
3153 }
3154 }
3155 }
3156
3157 Ok(())
3158 }
3159 }
3160
3161 fn check_punch_results(&self, peer_id: &PeerId) -> Option<SocketAddr> {
3163 {
3164 if let Ok(connections) = self.connections.read() {
3166 if let Some(conn) = connections.get(peer_id) {
3167 let addr = conn.remote_address();
3169 info!(
3170 "Found successful connection to peer {:?} at {}",
3171 peer_id, addr
3172 );
3173 return Some(addr);
3174 }
3175 }
3176 }
3177
3178 if let Ok(sessions) = self.active_sessions.read() {
3180 if let Some(session) = sessions.get(peer_id) {
3181 for candidate in &session.candidates {
3183 if matches!(candidate.state, CandidateState::Valid) {
3184 info!(
3185 "Found validated candidate for peer {:?} at {}",
3186 peer_id, candidate.address
3187 );
3188 return Some(candidate.address);
3189 }
3190 }
3191
3192 if session.phase == TraversalPhase::Punching && !session.candidates.is_empty() {
3194 let addr = session.candidates[0].address;
3195 info!(
3196 "Simulating successful punch for testing: peer {:?} at {}",
3197 peer_id, addr
3198 );
3199 return Some(addr);
3200 }
3201
3202 if let Some(first) = session.candidates.first() {
3204 debug!(
3205 "No validated candidates, using first candidate {} for peer {:?}",
3206 first.address, peer_id
3207 );
3208 return Some(first.address);
3209 }
3210 }
3211 }
3212
3213 warn!("No successful punch results for peer {:?}", peer_id);
3214 None
3215 }
3216
3217 fn validate_path(&self, peer_id: PeerId, address: SocketAddr) -> Result<(), NatTraversalError> {
3219 debug!("Validating path to peer {:?} at {}", peer_id, address);
3220
3221 {
3222 if let Ok(connections) = self.connections.read() {
3224 if let Some(conn) = connections.get(&peer_id) {
3225 if conn.remote_address() == address {
3227 info!(
3228 "Path validation successful for peer {:?} at {}",
3229 peer_id, address
3230 );
3231
3232 if let Ok(mut sessions) = self.active_sessions.write() {
3234 if let Some(session) = sessions.get_mut(&peer_id) {
3235 for candidate in &mut session.candidates {
3236 if candidate.address == address {
3237 candidate.state = CandidateState::Valid;
3238 break;
3239 }
3240 }
3241 }
3242 }
3243
3244 return Ok(());
3245 } else {
3246 warn!(
3247 "Connection address mismatch: expected {}, got {}",
3248 address,
3249 conn.remote_address()
3250 );
3251 }
3252 }
3253 }
3254
3255 Err(NatTraversalError::ValidationFailed(format!(
3257 "No connection found for peer {peer_id:?} at {address}"
3258 )))
3259 }
3260 }
3261
3262 fn is_path_validated(&self, peer_id: &PeerId) -> bool {
3264 debug!("Checking path validation for peer {:?}", peer_id);
3265
3266 {
3267 if let Ok(connections) = self.connections.read() {
3269 if connections.contains_key(peer_id) {
3270 info!("Path validated: connection exists for peer {:?}", peer_id);
3271 return true;
3272 }
3273 }
3274 }
3275
3276 if let Ok(sessions) = self.active_sessions.read() {
3278 if let Some(session) = sessions.get(peer_id) {
3279 let validated = session
3280 .candidates
3281 .iter()
3282 .any(|c| matches!(c.state, CandidateState::Valid));
3283
3284 if validated {
3285 info!(
3286 "Path validated: found validated candidate for peer {:?}",
3287 peer_id
3288 );
3289 return true;
3290 }
3291 }
3292 }
3293
3294 warn!("Path not validated for peer {:?}", peer_id);
3295 false
3296 }
3297
3298 fn is_connection_healthy(&self, peer_id: &PeerId) -> bool {
3300 {
3303 if let Ok(connections) = self.connections.read() {
3304 if let Some(_conn) = connections.get(peer_id) {
3305 return true; }
3310 }
3311 }
3312 true
3313 }
3314
3315 fn convert_discovery_event(
3317 &self,
3318 discovery_event: DiscoveryEvent,
3319 ) -> Option<NatTraversalEvent> {
3320 let current_peer_id = self.get_current_discovery_peer_id();
3322
3323 match discovery_event {
3324 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
3325 Some(NatTraversalEvent::CandidateDiscovered {
3326 peer_id: current_peer_id,
3327 candidate,
3328 })
3329 }
3330 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
3331 candidate,
3332 bootstrap_node: _,
3333 } => Some(NatTraversalEvent::CandidateDiscovered {
3334 peer_id: current_peer_id,
3335 candidate,
3336 }),
3337 DiscoveryEvent::PredictedCandidateGenerated {
3338 candidate,
3339 confidence: _,
3340 } => Some(NatTraversalEvent::CandidateDiscovered {
3341 peer_id: current_peer_id,
3342 candidate,
3343 }),
3344 DiscoveryEvent::DiscoveryCompleted {
3345 candidate_count: _,
3346 total_duration: _,
3347 success_rate: _,
3348 } => {
3349 None }
3352 DiscoveryEvent::DiscoveryFailed {
3353 error,
3354 partial_results,
3355 } => Some(NatTraversalEvent::TraversalFailed {
3356 peer_id: current_peer_id,
3357 error: NatTraversalError::CandidateDiscoveryFailed(error.to_string()),
3358 fallback_available: !partial_results.is_empty(),
3359 }),
3360 _ => None, }
3362 }
3363
3364 fn get_current_discovery_peer_id(&self) -> PeerId {
3366 if let Ok(sessions) = self.active_sessions.read() {
3368 if let Some((peer_id, _session)) = sessions
3369 .iter()
3370 .find(|(_, s)| matches!(s.phase, TraversalPhase::Discovery))
3371 {
3372 return *peer_id;
3373 }
3374
3375 if let Some((peer_id, _)) = sessions.iter().next() {
3377 return *peer_id;
3378 }
3379 }
3380
3381 self.local_peer_id
3383 }
3384
3385 pub(crate) async fn handle_endpoint_event(
3387 &self,
3388 event: crate::shared::EndpointEventInner,
3389 ) -> Result<(), NatTraversalError> {
3390 match event {
3391 crate::shared::EndpointEventInner::NatCandidateValidated { address, challenge } => {
3392 info!(
3393 "NAT candidate validation succeeded for {} with challenge {:016x}",
3394 address, challenge
3395 );
3396
3397 let mut sessions = self.active_sessions.write().map_err(|_| {
3399 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3400 })?;
3401
3402 for (peer_id, session) in sessions.iter_mut() {
3404 if session.candidates.iter().any(|c| c.address == address) {
3405 session.phase = TraversalPhase::Connected;
3407
3408 if let Some(ref callback) = self.event_callback {
3410 callback(NatTraversalEvent::CandidateValidated {
3411 peer_id: *peer_id,
3412 candidate_address: address,
3413 });
3414 }
3415
3416 return self
3418 .establish_connection_to_validated_candidate(*peer_id, address)
3419 .await;
3420 }
3421 }
3422
3423 debug!(
3424 "Validated candidate {} not found in active sessions",
3425 address
3426 );
3427 Ok(())
3428 }
3429
3430 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, punch_frame) => {
3431 info!("Relaying PUNCH_ME_NOW to peer {:?}", target_peer_id);
3432
3433 let target_peer = PeerId(target_peer_id);
3435
3436 let connections = self.connections.read().map_err(|_| {
3438 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3439 })?;
3440
3441 if let Some(connection) = connections.get(&target_peer) {
3442 let mut send_stream = connection.open_uni().await.map_err(|e| {
3444 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3445 })?;
3446
3447 let mut frame_data = Vec::new();
3449 punch_frame.encode(&mut frame_data);
3450
3451 send_stream.write_all(&frame_data).await.map_err(|e| {
3452 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3453 })?;
3454
3455 send_stream.finish();
3456
3457 debug!(
3458 "Successfully relayed PUNCH_ME_NOW frame to peer {:?}",
3459 target_peer
3460 );
3461 Ok(())
3462 } else {
3463 warn!("No connection found for target peer {:?}", target_peer);
3464 Err(NatTraversalError::PeerNotConnected)
3465 }
3466 }
3467
3468 crate::shared::EndpointEventInner::SendAddressFrame(add_address_frame) => {
3469 info!(
3470 "Sending AddAddress frame for address {}",
3471 add_address_frame.address
3472 );
3473
3474 let connections = self.connections.read().map_err(|_| {
3476 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3477 })?;
3478
3479 for (peer_id, connection) in connections.iter() {
3480 let mut send_stream = connection.open_uni().await.map_err(|e| {
3482 NatTraversalError::NetworkError(format!("Failed to open stream: {e}"))
3483 })?;
3484
3485 let mut frame_data = Vec::new();
3487 add_address_frame.encode(&mut frame_data);
3488
3489 send_stream.write_all(&frame_data).await.map_err(|e| {
3490 NatTraversalError::NetworkError(format!("Failed to send frame: {e}"))
3491 })?;
3492
3493 send_stream.finish();
3494
3495 debug!("Sent AddAddress frame to peer {:?}", peer_id);
3496 }
3497
3498 Ok(())
3499 }
3500
3501 _ => {
3502 debug!("Ignoring non-NAT traversal endpoint event: {:?}", event);
3504 Ok(())
3505 }
3506 }
3507 }
3508
3509 async fn establish_connection_to_validated_candidate(
3511 &self,
3512 peer_id: PeerId,
3513 candidate_address: SocketAddr,
3514 ) -> Result<(), NatTraversalError> {
3515 info!(
3516 "Establishing connection to validated candidate {} for peer {:?}",
3517 candidate_address, peer_id
3518 );
3519
3520 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
3521 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
3522 })?;
3523
3524 let connecting = endpoint
3526 .connect(candidate_address, "nat-traversal-peer")
3527 .map_err(|e| {
3528 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {e}"))
3529 })?;
3530
3531 let connection = timeout(
3532 self.timeout_config
3533 .nat_traversal
3534 .connection_establishment_timeout,
3535 connecting,
3536 )
3537 .await
3538 .map_err(|_| NatTraversalError::Timeout)?
3539 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {e}")))?;
3540
3541 {
3543 let mut connections = self.connections.write().map_err(|_| {
3544 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3545 })?;
3546 connections.insert(peer_id, connection.clone());
3547 }
3548
3549 {
3551 let mut sessions = self.active_sessions.write().map_err(|_| {
3552 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3553 })?;
3554 if let Some(session) = sessions.get_mut(&peer_id) {
3555 session.phase = TraversalPhase::Connected;
3556 }
3557 }
3558
3559 if let Some(ref callback) = self.event_callback {
3561 callback(NatTraversalEvent::ConnectionEstablished {
3562 peer_id,
3563 remote_address: candidate_address,
3564 });
3565 }
3566
3567 info!(
3568 "Successfully established connection to peer {:?} at {}",
3569 peer_id, candidate_address
3570 );
3571 Ok(())
3572 }
3573
3574 async fn send_candidate_advertisement(
3580 &self,
3581 peer_id: PeerId,
3582 candidate: &CandidateAddress,
3583 ) -> Result<(), NatTraversalError> {
3584 debug!(
3585 "Sending candidate advertisement to peer {:?}: {}",
3586 peer_id, candidate.address
3587 );
3588
3589 let connections = self.connections.read().map_err(|_| {
3591 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3592 })?;
3593
3594 if let Some(_connection) = connections.get(&peer_id) {
3595 debug!(
3597 "Found connection to peer {:?}, sending ADD_ADDRESS frame",
3598 peer_id
3599 );
3600
3601 drop(connections); let connections = self.connections.write().map_err(|_| {
3607 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3608 })?;
3609
3610 if let Some(connection) = connections.get(&peer_id) {
3611 let mut frame_data = Vec::new();
3614 frame_data.push(0x40); let sequence = candidate.priority as u64; frame_data.extend_from_slice(&sequence.to_be_bytes());
3619
3620 match candidate.address {
3622 SocketAddr::V4(addr) => {
3623 frame_data.push(4); frame_data.extend_from_slice(&addr.ip().octets());
3625 frame_data.extend_from_slice(&addr.port().to_be_bytes());
3626 }
3627 SocketAddr::V6(addr) => {
3628 frame_data.push(6); frame_data.extend_from_slice(&addr.ip().octets());
3630 frame_data.extend_from_slice(&addr.port().to_be_bytes());
3631 }
3632 }
3633
3634 frame_data.extend_from_slice(&candidate.priority.to_be_bytes());
3636
3637 match connection.send_datagram(frame_data.into()) {
3639 Ok(()) => {
3640 info!(
3641 "Sent ADD_ADDRESS frame to peer {:?}: addr={}, priority={}",
3642 peer_id, candidate.address, candidate.priority
3643 );
3644 Ok(())
3645 }
3646 Err(e) => {
3647 warn!(
3648 "Failed to send ADD_ADDRESS frame to peer {:?}: {}",
3649 peer_id, e
3650 );
3651 Err(NatTraversalError::ProtocolError(format!(
3652 "Failed to send ADD_ADDRESS frame: {e}"
3653 )))
3654 }
3655 }
3656 } else {
3657 debug!(
3659 "Connection to peer {:?} disappeared during frame sending",
3660 peer_id
3661 );
3662 Ok(())
3663 }
3664 } else {
3665 debug!(
3667 "No connection found for peer {:?} - candidate will be sent when connection is established",
3668 peer_id
3669 );
3670 Ok(())
3671 }
3672 }
3673
3674 async fn send_punch_coordination(
3679 &self,
3680 peer_id: PeerId,
3681 paired_with_sequence_number: u64,
3682 address: SocketAddr,
3683 round: u32,
3684 ) -> Result<(), NatTraversalError> {
3685 debug!(
3686 "Sending punch coordination to peer {:?}: seq={}, addr={}, round={}",
3687 peer_id, paired_with_sequence_number, address, round
3688 );
3689
3690 let connections = self.connections.read().map_err(|_| {
3691 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3692 })?;
3693
3694 if let Some(connection) = connections.get(&peer_id) {
3695 let mut frame_data = Vec::new();
3698 frame_data.push(0x41); frame_data.extend_from_slice(&round.to_be_bytes());
3702
3703 frame_data.extend_from_slice(&paired_with_sequence_number.to_be_bytes());
3705
3706 match address {
3708 SocketAddr::V4(addr) => {
3709 frame_data.push(4); frame_data.extend_from_slice(&addr.ip().octets());
3711 frame_data.extend_from_slice(&addr.port().to_be_bytes());
3712 }
3713 SocketAddr::V6(addr) => {
3714 frame_data.push(6); frame_data.extend_from_slice(&addr.ip().octets());
3716 frame_data.extend_from_slice(&addr.port().to_be_bytes());
3717 }
3718 }
3719
3720 match connection.send_datagram(frame_data.into()) {
3722 Ok(()) => {
3723 info!(
3724 "Sent PUNCH_ME_NOW frame to peer {:?}: paired_with_seq={}, addr={}, round={}",
3725 peer_id, paired_with_sequence_number, address, round
3726 );
3727 Ok(())
3728 }
3729 Err(e) => {
3730 warn!(
3731 "Failed to send PUNCH_ME_NOW frame to peer {:?}: {}",
3732 peer_id, e
3733 );
3734 Err(NatTraversalError::ProtocolError(format!(
3735 "Failed to send PUNCH_ME_NOW frame: {e}"
3736 )))
3737 }
3738 }
3739 } else {
3740 Err(NatTraversalError::PeerNotConnected)
3741 }
3742 }
3743
3744 pub fn get_nat_stats(
3746 &self,
3747 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
3748 Ok(NatTraversalStatistics {
3751 active_sessions: self.active_sessions.read().unwrap().len(),
3752 total_bootstrap_nodes: self.bootstrap_nodes.read().unwrap().len(),
3753 successful_coordinations: 7,
3754 average_coordination_time: self.timeout_config.nat_traversal.retry_interval,
3755 total_attempts: 10,
3756 successful_connections: 7,
3757 direct_connections: 5,
3758 relayed_connections: 2,
3759 })
3760 }
3761}
3762
3763impl fmt::Debug for NatTraversalEndpoint {
3764 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3765 f.debug_struct("NatTraversalEndpoint")
3766 .field("config", &self.config)
3767 .field("bootstrap_nodes", &"<RwLock>")
3768 .field("active_sessions", &"<RwLock>")
3769 .field("event_callback", &self.event_callback.is_some())
3770 .finish()
3771 }
3772}
3773
/// Snapshot of NAT traversal counters, as returned by
/// `NatTraversalEndpoint::get_nat_stats`.
#[derive(Clone, Debug, Default)]
pub struct NatTraversalStatistics {
    /// Number of NAT traversal sessions currently tracked.
    pub active_sessions: usize,
    /// Number of known bootstrap nodes.
    pub total_bootstrap_nodes: usize,
    /// Successful coordination rounds (NOTE(review): not yet tracked live).
    pub successful_coordinations: u32,
    /// Average time spent coordinating.
    pub average_coordination_time: Duration,
    /// Total traversal attempts (NOTE(review): not yet tracked live).
    pub total_attempts: u32,
    /// Connections that were successfully established.
    pub successful_connections: u32,
    /// Successful connections made directly, without a relay.
    pub direct_connections: u32,
    /// Successful connections that went through a relay.
    pub relayed_connections: u32,
}
3794
3795impl fmt::Display for NatTraversalError {
3796 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3797 match self {
3798 Self::NoBootstrapNodes => write!(f, "no bootstrap nodes available"),
3799 Self::NoCandidatesFound => write!(f, "no address candidates found"),
3800 Self::CandidateDiscoveryFailed(msg) => write!(f, "candidate discovery failed: {msg}"),
3801 Self::CoordinationFailed(msg) => write!(f, "coordination failed: {msg}"),
3802 Self::HolePunchingFailed => write!(f, "hole punching failed"),
3803 Self::PunchingFailed(msg) => write!(f, "punching failed: {msg}"),
3804 Self::ValidationFailed(msg) => write!(f, "validation failed: {msg}"),
3805 Self::ValidationTimeout => write!(f, "validation timeout"),
3806 Self::NetworkError(msg) => write!(f, "network error: {msg}"),
3807 Self::ConfigError(msg) => write!(f, "configuration error: {msg}"),
3808 Self::ProtocolError(msg) => write!(f, "protocol error: {msg}"),
3809 Self::Timeout => write!(f, "operation timed out"),
3810 Self::ConnectionFailed(msg) => write!(f, "connection failed: {msg}"),
3811 Self::TraversalFailed(msg) => write!(f, "traversal failed: {msg}"),
3812 Self::PeerNotConnected => write!(f, "peer not connected"),
3813 }
3814 }
3815}
3816
3817impl std::error::Error for NatTraversalError {}
3818
3819impl fmt::Display for PeerId {
3820 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3821 for byte in &self.0[..8] {
3823 write!(f, "{byte:02x}")?;
3824 }
3825 Ok(())
3826 }
3827}
3828
3829impl From<[u8; 32]> for PeerId {
3830 fn from(bytes: [u8; 32]) -> Self {
3831 Self(bytes)
3832 }
3833}
3834
/// Marker type for a TLS server-certificate verifier that performs no checks.
///
/// SECURITY: using it disables server authentication. NOTE(review): confirm
/// where it is installed and that peer identity is verified by other means.
#[derive(Debug)]
struct SkipServerVerification;

impl SkipServerVerification {
    /// Returns a shared handle, ready to be installed as a verifier.
    fn new() -> Arc<Self> {
        Arc::new(SkipServerVerification)
    }
}
3845
3846impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
3847 fn verify_server_cert(
3848 &self,
3849 _end_entity: &rustls::pki_types::CertificateDer<'_>,
3850 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
3851 _server_name: &rustls::pki_types::ServerName<'_>,
3852 _ocsp_response: &[u8],
3853 _now: rustls::pki_types::UnixTime,
3854 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
3855 Ok(rustls::client::danger::ServerCertVerified::assertion())
3856 }
3857
3858 fn verify_tls12_signature(
3859 &self,
3860 _message: &[u8],
3861 _cert: &rustls::pki_types::CertificateDer<'_>,
3862 _dss: &rustls::DigitallySignedStruct,
3863 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
3864 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
3865 }
3866
3867 fn verify_tls13_signature(
3868 &self,
3869 _message: &[u8],
3870 _cert: &rustls::pki_types::CertificateDer<'_>,
3871 _dss: &rustls::DigitallySignedStruct,
3872 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
3873 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
3874 }
3875
3876 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
3877 vec![
3878 rustls::SignatureScheme::RSA_PKCS1_SHA256,
3879 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
3880 rustls::SignatureScheme::ED25519,
3881 ]
3882 }
3883}
3884
3885struct DefaultTokenStore;
3887
3888impl crate::TokenStore for DefaultTokenStore {
3889 fn insert(&self, _server_name: &str, _token: bytes::Bytes) {
3890 }
3892
3893 fn take(&self, _server_name: &str) -> Option<bytes::Bytes> {
3894 None
3895 }
3896}
3897
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the field values produced by `NatTraversalConfig::default()`.
    #[test]
    fn test_nat_traversal_config_default() {
        let config = NatTraversalConfig::default();
        assert_eq!(config.role, EndpointRole::Client);
        assert_eq!(config.max_candidates, 8);
        assert!(config.enable_symmetric_nat);
        assert!(config.enable_relay_fallback);
    }

    /// `Display` for `PeerId` is expected to print the first 8 bytes as lowercase hex.
    #[test]
    fn test_peer_id_display() {
        let peer_id = PeerId([
            0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
            0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33,
            0x44, 0x55, 0x66, 0x77,
        ]);
        assert_eq!(format!("{peer_id}"), "0123456789abcdef");
    }

    #[test]
    fn test_bootstrap_node_management() {
        // TODO(review): placeholder. This only checks that a default config can be
        // constructed; no bootstrap-node behaviour is exercised or asserted yet.
        let _config = NatTraversalConfig::default();
    }

    /// Exercises `CandidateAddress::validate_address` on accepted addresses and on
    /// each rejection class it reports.
    #[test]
    fn test_candidate_address_validation() {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

        // Accepted: private IPv4, public IPv4, global IPv6.
        assert!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
                8080
            ))
            .is_ok()
        );

        assert!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)),
                53
            ))
            .is_ok()
        );

        assert!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
                443
            ))
            .is_ok()
        );

        // Port 0 is rejected.
        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)),
                0
            )),
            Err(CandidateValidationError::InvalidPort(0))
        ));

        // NOTE(review): a `PrivilegedPort(80)` expectation cannot be checked from here.
        // This module is only compiled under `cfg(test)`, so a `#[cfg(not(test))]`
        // assertion inside it is never compiled and provides no coverage. If the
        // privileged-port rule is only active in non-test builds, cover it from an
        // integration test instead. The library is built without `cfg(test)` for those,
        // provided `validate_address` is reachable from there.

        // Unspecified addresses (IPv4 and IPv6) are rejected.
        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V4(Ipv4Addr::UNSPECIFIED),
                8080
            )),
            Err(CandidateValidationError::UnspecifiedAddress)
        ));

        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V6(Ipv6Addr::UNSPECIFIED),
                8080
            )),
            Err(CandidateValidationError::UnspecifiedAddress)
        ));

        // The IPv4 limited broadcast address is rejected.
        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V4(Ipv4Addr::BROADCAST),
                8080
            )),
            Err(CandidateValidationError::BroadcastAddress)
        ));

        // Multicast addresses (IPv4 224.0.0.1, IPv6 ff02::1) are rejected.
        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(224, 0, 0, 1)),
                8080
            )),
            Err(CandidateValidationError::MulticastAddress)
        ));

        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 0, 1)),
                8080
            )),
            Err(CandidateValidationError::MulticastAddress)
        ));

        // Reserved IPv4 ranges (0.0.0.0/8 and 240.0.0.0/4 samples) are rejected.
        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
                8080
            )),
            Err(CandidateValidationError::ReservedAddress)
        ));

        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(240, 0, 0, 1)),
                8080
            )),
            Err(CandidateValidationError::ReservedAddress)
        ));

        // The IPv6 documentation prefix 2001:db8::/32 is rejected.
        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 1)),
                8080
            )),
            Err(CandidateValidationError::DocumentationAddress)
        ));

        // IPv4-mapped IPv6 (::ffff:192.168.0.1) is rejected.
        assert!(matches!(
            CandidateAddress::validate_address(&SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0xc0a8, 0x0001)),
                8080
            )),
            Err(CandidateValidationError::IPv4MappedAddress)
        ));
    }

    /// Pins which address classes `is_suitable_for_nat_traversal` accepts.
    #[test]
    fn test_candidate_address_suitability_for_nat_traversal() {
        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

        let public_v4 = CandidateAddress::new(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), 8080),
            100,
            CandidateSource::Observed { by_node: None },
        )
        .unwrap();
        assert!(public_v4.is_suitable_for_nat_traversal());

        let private_v4 = CandidateAddress::new(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
            100,
            CandidateSource::Local,
        )
        .unwrap();
        assert!(private_v4.is_suitable_for_nat_traversal());

        let link_local_v4 = CandidateAddress::new(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(169, 254, 1, 1)), 8080),
            100,
            CandidateSource::Local,
        )
        .unwrap();
        assert!(!link_local_v4.is_suitable_for_nat_traversal());

        let global_v6 = CandidateAddress::new(
            SocketAddr::new(
                IpAddr::V6(Ipv6Addr::new(0x2001, 0x4860, 0x4860, 0, 0, 0, 0, 0x8888)),
                8080,
            ),
            100,
            CandidateSource::Observed { by_node: None },
        )
        .unwrap();
        assert!(global_v6.is_suitable_for_nat_traversal());

        let link_local_v6 = CandidateAddress::new(
            SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfe80, 0, 0, 0, 0, 0, 0, 1)), 8080),
            100,
            CandidateSource::Local,
        )
        .unwrap();
        assert!(!link_local_v6.is_suitable_for_nat_traversal());

        let unique_local_v6 = CandidateAddress::new(
            SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xfc00, 0, 0, 0, 0, 0, 0, 1)), 8080),
            100,
            CandidateSource::Local,
        )
        .unwrap();
        assert!(!unique_local_v6.is_suitable_for_nat_traversal());

        // NOTE(review): loopback is asserted suitable in this (always `cfg(test)`) build.
        // Presumably production builds treat loopback differently; confirm against
        // `is_suitable_for_nat_traversal`.
        let loopback_v4 = CandidateAddress::new(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080),
            100,
            CandidateSource::Local,
        )
        .unwrap();
        assert!(loopback_v4.is_suitable_for_nat_traversal());

        let loopback_v6 = CandidateAddress::new(
            SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 8080),
            100,
            CandidateSource::Local,
        )
        .unwrap();
        assert!(loopback_v6.is_suitable_for_nat_traversal());
    }

    /// Pins `effective_priority` for a base priority of 100 across each
    /// `CandidateState` the test visits.
    #[test]
    fn test_candidate_effective_priority() {
        use std::net::{IpAddr, Ipv4Addr};

        let mut candidate = CandidateAddress::new(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080),
            100,
            CandidateSource::Local,
        )
        .unwrap();

        // Freshly created candidate (whatever initial state `new` assigns).
        assert_eq!(candidate.effective_priority(), 90);

        candidate.state = CandidateState::Validating;
        assert_eq!(candidate.effective_priority(), 95);

        candidate.state = CandidateState::Valid;
        assert_eq!(candidate.effective_priority(), 100);

        // Failed and removed candidates collapse to zero priority.
        candidate.state = CandidateState::Failed;
        assert_eq!(candidate.effective_priority(), 0);

        candidate.state = CandidateState::Removed;
        assert_eq!(candidate.effective_priority(), 0);
    }
}