1use std::{collections::HashMap, fmt, net::SocketAddr, sync::Arc, time::Duration};
8
9use tracing::{debug, error, info, warn};
10
11use std::sync::atomic::{AtomicBool, Ordering};
12
13use tokio::{
14 net::UdpSocket,
15 sync::mpsc,
16 time::{sleep, timeout},
17};
18
19#[cfg(feature = "runtime-tokio")]
20use crate::quinn_high_level::TokioRuntime;
21
22use crate::{
23 VarInt,
24 candidate_discovery::{CandidateDiscoveryManager, DiscoveryConfig, DiscoveryEvent},
25 connection::nat_traversal::{CandidateSource, CandidateState, NatTraversalRole},
26};
27
28use crate::{
29 ClientConfig, ConnectionError, EndpointConfig, ServerConfig, TransportConfig,
30 crypto::rustls::QuicClientConfig,
31 crypto::rustls::QuicServerConfig,
32 quinn_high_level::{Connection as QuinnConnection, Endpoint as QuinnEndpoint},
33};
34
35use crate::config::validation::{ConfigValidator, ValidationResult};
36
37use crate::crypto::certificate_manager::{CertificateConfig, CertificateManager};
38
39pub struct NatTraversalEndpoint {
41 quinn_endpoint: Option<QuinnEndpoint>,
43 config: NatTraversalConfig,
47 bootstrap_nodes: Arc<std::sync::RwLock<Vec<BootstrapNode>>>,
49 active_sessions: Arc<std::sync::RwLock<HashMap<PeerId, NatTraversalSession>>>,
51 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
53 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
55 shutdown: Arc<AtomicBool>,
57 event_tx: Option<mpsc::UnboundedSender<NatTraversalEvent>>,
59 connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
61 local_peer_id: PeerId,
63}
64
65#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
67pub struct NatTraversalConfig {
68 pub role: EndpointRole,
70 pub bootstrap_nodes: Vec<SocketAddr>,
72 pub max_candidates: usize,
74 pub coordination_timeout: Duration,
76 pub enable_symmetric_nat: bool,
78 pub enable_relay_fallback: bool,
80 pub max_concurrent_attempts: usize,
82 pub bind_addr: Option<SocketAddr>,
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
88pub enum EndpointRole {
89 Client,
91 Server { can_coordinate: bool },
93 Bootstrap,
95}
96
97impl EndpointRole {
98 pub fn name(&self) -> &'static str {
100 match self {
101 EndpointRole::Client => "client",
102 EndpointRole::Server { .. } => "server",
103 EndpointRole::Bootstrap => "bootstrap",
104 }
105 }
106}
107
108#[derive(
110 Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
111)]
112pub struct PeerId(pub [u8; 32]);
113
/// Bookkeeping record for a single bootstrap node.
#[derive(Debug, Clone)]
pub struct BootstrapNode {
    /// Network address of the node.
    pub address: SocketAddr,
    /// Instant recorded for this node; initialised to the creation time.
    pub last_seen: std::time::Instant,
    /// Whether the node is considered able to coordinate (defaults to `true`).
    pub can_coordinate: bool,
    /// Round-trip time to the node, if one has been recorded.
    pub rtt: Option<Duration>,
    /// Coordination counter; summed into the endpoint statistics.
    pub coordination_count: u32,
}

impl BootstrapNode {
    /// Creates a record for `address` with default bookkeeping: seen "now",
    /// able to coordinate, no RTT sample and a zero coordination count.
    pub fn new(address: SocketAddr) -> Self {
        let created_at = std::time::Instant::now();
        BootstrapNode {
            address,
            last_seen: created_at,
            can_coordinate: true,
            rtt: None,
            coordination_count: 0,
        }
    }
}
141
142#[derive(Debug, Clone)]
144pub struct CandidatePair {
145 pub local_candidate: CandidateAddress,
147 pub remote_candidate: CandidateAddress,
149 pub priority: u64,
151 pub state: CandidatePairState,
153}
154
/// State of a [`CandidatePair`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidatePairState {
    /// Pair has not been processed yet.
    Waiting,
    /// Pair is currently being processed.
    InProgress,
    /// Processing of the pair succeeded.
    Succeeded,
    /// Processing of the pair failed.
    Failed,
    /// Processing of the pair was cancelled.
    Cancelled,
}
169
170#[derive(Debug)]
172struct NatTraversalSession {
173 peer_id: PeerId,
175 coordinator: SocketAddr,
177 attempt: u32,
179 started_at: std::time::Instant,
181 phase: TraversalPhase,
183 candidates: Vec<CandidateAddress>,
185 session_state: SessionState,
187}
188
189#[derive(Debug, Clone)]
191pub struct SessionState {
192 pub state: ConnectionState,
194 pub last_transition: std::time::Instant,
196 pub connection: Option<QuinnConnection>,
198 pub active_attempts: Vec<(SocketAddr, std::time::Instant)>,
200 pub metrics: ConnectionMetrics,
202}
203
/// Connection state tracked per session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection activity.
    Idle,
    /// A connection is being established; the initial state of new sessions.
    Connecting,
    /// A connection is established.
    Connected,
    /// The connection is migrating.
    Migrating,
    /// The session has been closed.
    Closed,
}
218
/// Metrics recorded for a session; every field starts at its `Default` value.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// Round-trip time, when one has been recorded.
    pub rtt: Option<Duration>,
    /// Loss rate (unit/range not shown here — TODO confirm it is a 0.0–1.0 fraction).
    pub loss_rate: f64,
    /// Bytes sent counter.
    pub bytes_sent: u64,
    /// Bytes received counter.
    pub bytes_received: u64,
    /// Instant of the last recorded activity, if any.
    pub last_activity: Option<std::time::Instant>,
}
233
234#[derive(Debug, Clone)]
236pub struct SessionStateUpdate {
237 pub peer_id: PeerId,
239 pub old_state: ConnectionState,
241 pub new_state: ConnectionState,
243 pub reason: StateChangeReason,
245}
246
/// Reason attached to a [`SessionStateUpdate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateChangeReason {
    /// A time limit elapsed.
    Timeout,
    /// A connection became available for the session.
    ConnectionEstablished,
    /// The connection was closed.
    ConnectionClosed,
    /// A migration finished with a connection in place.
    MigrationComplete,
    /// A migration finished without a connection.
    MigrationFailed,
    /// A network-level error occurred.
    NetworkError,
    /// The user closed the session.
    UserClosed,
}
265
/// Phase of a NAT traversal session.
///
/// New sessions start in [`TraversalPhase::Discovery`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversalPhase {
    /// Candidate discovery.
    Discovery,
    /// Coordination phase.
    Coordination,
    /// Synchronization phase.
    Synchronization,
    /// Hole punching phase.
    Punching,
    /// Validation phase.
    Validation,
    /// Traversal finished with a connection.
    Connected,
    /// Traversal failed.
    Failed,
}
284
285#[derive(Debug, Clone)]
287pub struct CandidateAddress {
288 pub address: SocketAddr,
290 pub priority: u32,
292 pub source: CandidateSource,
294 pub state: CandidateState,
296}
297
298#[derive(Debug, Clone)]
300pub enum NatTraversalEvent {
301 CandidateDiscovered {
303 peer_id: PeerId,
304 candidate: CandidateAddress,
305 },
306 CoordinationRequested {
308 peer_id: PeerId,
309 coordinator: SocketAddr,
310 },
311 CoordinationSynchronized { peer_id: PeerId, round_id: VarInt },
313 HolePunchingStarted {
315 peer_id: PeerId,
316 targets: Vec<SocketAddr>,
317 },
318 PathValidated {
320 peer_id: PeerId,
321 address: SocketAddr,
322 rtt: Duration,
323 },
324 CandidateValidated {
326 peer_id: PeerId,
327 candidate_address: SocketAddr,
328 },
329 TraversalSucceeded {
331 peer_id: PeerId,
332 final_address: SocketAddr,
333 total_time: Duration,
334 },
335 ConnectionEstablished {
337 peer_id: PeerId,
338 remote_address: SocketAddr,
340 },
341 TraversalFailed {
343 peer_id: PeerId,
344 error: NatTraversalError,
345 fallback_available: bool,
346 },
347 ConnectionLost { peer_id: PeerId, reason: String },
349 PhaseTransition {
351 peer_id: PeerId,
352 from_phase: TraversalPhase,
353 to_phase: TraversalPhase,
354 },
355 SessionStateChanged {
357 peer_id: PeerId,
358 new_state: ConnectionState,
359 },
360}
361
/// Errors reported by the NAT traversal endpoint.
#[derive(Debug, Clone)]
pub enum NatTraversalError {
    /// No bootstrap nodes are available.
    NoBootstrapNodes,
    /// No candidates were found.
    NoCandidatesFound,
    /// Candidate discovery could not be started or failed; carries the
    /// underlying error text.
    CandidateDiscoveryFailed(String),
    /// Coordination failed.
    CoordinationFailed(String),
    /// Hole punching failed.
    HolePunchingFailed,
    /// Hole punching failed, with details.
    PunchingFailed(String),
    /// Validation failed.
    ValidationFailed(String),
    /// Validation timed out.
    ValidationTimeout,
    /// Socket or address level failure (for example a failed bind).
    NetworkError(String),
    /// Invalid configuration or a failure while building the endpoint.
    ConfigError(String),
    /// Protocol-level failure; also used to report poisoned locks.
    ProtocolError(String),
    /// An operation timed out.
    Timeout,
    /// Establishing a connection failed.
    ConnectionFailed(String),
    /// The traversal as a whole failed.
    TraversalFailed(String),
    /// The peer is not connected.
    PeerNotConnected,
}
396
397impl Default for NatTraversalConfig {
398 fn default() -> Self {
399 Self {
400 role: EndpointRole::Client,
401 bootstrap_nodes: Vec::new(),
402 max_candidates: 8,
403 coordination_timeout: Duration::from_secs(10),
404 enable_symmetric_nat: true,
405 enable_relay_fallback: true,
406 max_concurrent_attempts: 3,
407 bind_addr: None,
408 }
409 }
410}
411
412impl ConfigValidator for NatTraversalConfig {
413 fn validate(&self) -> ValidationResult<()> {
414 use crate::config::validation::*;
415
416 match self.role {
418 EndpointRole::Client => {
419 if self.bootstrap_nodes.is_empty() {
420 return Err(ConfigValidationError::InvalidRole(
421 "Client endpoints require at least one bootstrap node".to_string(),
422 ));
423 }
424 }
425 EndpointRole::Server { can_coordinate } => {
426 if can_coordinate && self.bootstrap_nodes.is_empty() {
427 return Err(ConfigValidationError::InvalidRole(
428 "Server endpoints with coordination capability require bootstrap nodes"
429 .to_string(),
430 ));
431 }
432 }
433 EndpointRole::Bootstrap => {
434 }
436 }
437
438 if !self.bootstrap_nodes.is_empty() {
440 validate_bootstrap_nodes(&self.bootstrap_nodes)?;
441 }
442
443 validate_range(self.max_candidates, 1, 256, "max_candidates")?;
445
446 validate_duration(
448 self.coordination_timeout,
449 Duration::from_millis(100),
450 Duration::from_secs(300),
451 "coordination_timeout",
452 )?;
453
454 validate_range(
456 self.max_concurrent_attempts,
457 1,
458 16,
459 "max_concurrent_attempts",
460 )?;
461
462 if self.max_concurrent_attempts > self.max_candidates {
464 return Err(ConfigValidationError::IncompatibleConfiguration(
465 "max_concurrent_attempts cannot exceed max_candidates".to_string(),
466 ));
467 }
468
469 if self.role == EndpointRole::Bootstrap && self.enable_relay_fallback {
470 return Err(ConfigValidationError::IncompatibleConfiguration(
471 "Bootstrap nodes should not enable relay fallback".to_string(),
472 ));
473 }
474
475 Ok(())
476 }
477}
478
479impl NatTraversalEndpoint {
480 pub async fn new(
482 config: NatTraversalConfig,
483 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
484 ) -> Result<Self, NatTraversalError> {
485 Self::new_impl(config, event_callback).await
486 }
487
488 async fn new_impl(
490 config: NatTraversalConfig,
491 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
492 ) -> Result<Self, NatTraversalError> {
493 Self::new_common(config, event_callback).await
494 }
495
496 async fn new_common(
498 config: NatTraversalConfig,
499 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
500 ) -> Result<Self, NatTraversalError> {
501 Self::new_shared_logic(config, event_callback).await
503 }
504
505 async fn new_shared_logic(
507 config: NatTraversalConfig,
508 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
509 ) -> Result<Self, NatTraversalError> {
510 {
513 config
514 .validate()
515 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
516 }
517
518 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
522 config
523 .bootstrap_nodes
524 .iter()
525 .map(|&address| BootstrapNode {
526 address,
527 last_seen: std::time::Instant::now(),
528 can_coordinate: true, rtt: None,
530 coordination_count: 0,
531 })
532 .collect(),
533 ));
534
535 let discovery_config = DiscoveryConfig {
537 total_timeout: config.coordination_timeout,
538 max_candidates: config.max_candidates,
539 enable_symmetric_prediction: config.enable_symmetric_nat,
540 bound_address: config.bind_addr, ..DiscoveryConfig::default()
542 };
543
544 let nat_traversal_role = match config.role {
545 EndpointRole::Client => NatTraversalRole::Client,
546 EndpointRole::Server { can_coordinate } => NatTraversalRole::Server {
547 can_relay: can_coordinate,
548 },
549 EndpointRole::Bootstrap => NatTraversalRole::Bootstrap,
550 };
551
552 let discovery_manager = Arc::new(std::sync::Mutex::new(CandidateDiscoveryManager::new(
553 discovery_config,
554 )));
555
556 let (quinn_endpoint, event_tx, local_addr) =
559 Self::create_quinn_endpoint(&config, nat_traversal_role).await?;
560
561 {
563 let mut discovery = discovery_manager.lock().map_err(|_| {
564 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
565 })?;
566 discovery.set_bound_address(local_addr);
567 info!(
568 "Updated discovery manager with bound address: {}",
569 local_addr
570 );
571 }
572
573 let endpoint = Self {
574 quinn_endpoint: Some(quinn_endpoint.clone()),
575 config: config.clone(),
576 bootstrap_nodes,
577 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
578 discovery_manager,
579 event_callback,
580 shutdown: Arc::new(AtomicBool::new(false)),
581 event_tx: Some(event_tx.clone()),
582 connections: Arc::new(std::sync::RwLock::new(HashMap::new())),
583 local_peer_id: Self::generate_local_peer_id(),
584 };
585
586 if matches!(
588 config.role,
589 EndpointRole::Bootstrap | EndpointRole::Server { .. }
590 ) {
591 let endpoint_clone = quinn_endpoint.clone();
592 let shutdown_clone = endpoint.shutdown.clone();
593 let event_tx_clone = event_tx.clone();
594 let connections_clone = endpoint.connections.clone();
595
596 tokio::spawn(async move {
597 Self::accept_connections(
598 endpoint_clone,
599 shutdown_clone,
600 event_tx_clone,
601 connections_clone,
602 )
603 .await;
604 });
605
606 info!("Started accepting connections for {:?} role", config.role);
607 }
608
609 let discovery_manager_clone = endpoint.discovery_manager.clone();
611 let shutdown_clone = endpoint.shutdown.clone();
612 let event_tx_clone = event_tx;
613
614 tokio::spawn(async move {
615 Self::poll_discovery(discovery_manager_clone, shutdown_clone, event_tx_clone).await;
616 });
617
618 info!("Started discovery polling task");
619
620 {
622 let mut discovery = endpoint.discovery_manager.lock().map_err(|_| {
623 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
624 })?;
625
626 let local_peer_id = endpoint.local_peer_id;
628 let bootstrap_nodes = {
629 let nodes = endpoint.bootstrap_nodes.read().map_err(|_| {
630 NatTraversalError::ProtocolError("Bootstrap nodes lock poisoned".to_string())
631 })?;
632 nodes.clone()
633 };
634
635 discovery
636 .start_discovery(local_peer_id, bootstrap_nodes)
637 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
638
639 info!(
640 "Started local candidate discovery for peer {:?}",
641 local_peer_id
642 );
643 }
644
645 Ok(endpoint)
646 }
647
648 pub fn get_quinn_endpoint(&self) -> Option<&crate::quinn_high_level::Endpoint> {
650 self.quinn_endpoint.as_ref()
651 }
652
653 pub fn get_event_callback(&self) -> Option<&Box<dyn Fn(NatTraversalEvent) + Send + Sync>> {
655 self.event_callback.as_ref()
656 }
657
658 pub fn initiate_nat_traversal(
660 &self,
661 peer_id: PeerId,
662 coordinator: SocketAddr,
663 ) -> Result<(), NatTraversalError> {
664 info!(
665 "Starting NAT traversal to peer {:?} via coordinator {}",
666 peer_id, coordinator
667 );
668
669 let session = NatTraversalSession {
671 peer_id,
672 coordinator,
673 attempt: 1,
674 started_at: std::time::Instant::now(),
675 phase: TraversalPhase::Discovery,
676 candidates: Vec::new(),
677 session_state: SessionState {
678 state: ConnectionState::Connecting,
679 last_transition: std::time::Instant::now(),
680
681 connection: None,
682 active_attempts: Vec::new(),
683 metrics: ConnectionMetrics::default(),
684 },
685 };
686
687 {
689 let mut sessions = self
690 .active_sessions
691 .write()
692 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
693 sessions.insert(peer_id, session);
694 }
695
696 let bootstrap_nodes_vec = {
698 let bootstrap_nodes = self
699 .bootstrap_nodes
700 .read()
701 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
702 bootstrap_nodes.clone()
703 };
704
705 {
706 let mut discovery = self.discovery_manager.lock().map_err(|_| {
707 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
708 })?;
709
710 discovery
711 .start_discovery(peer_id, bootstrap_nodes_vec)
712 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
713 }
714
715 if let Some(ref callback) = self.event_callback {
717 callback(NatTraversalEvent::CoordinationRequested {
718 peer_id,
719 coordinator,
720 });
721 }
722
723 Ok(())
725 }
726
727 pub fn poll_sessions(&self) -> Result<Vec<SessionStateUpdate>, NatTraversalError> {
729 let mut updates = Vec::new();
730 let now = std::time::Instant::now();
731
732 let mut sessions = self
733 .active_sessions
734 .write()
735 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
736
737 for (peer_id, session) in sessions.iter_mut() {
738 let mut state_changed = false;
739
740 match session.session_state.state {
741 ConnectionState::Connecting => {
742 let elapsed = now.duration_since(session.session_state.last_transition);
744 if elapsed > Duration::from_secs(30) {
745 session.session_state.state = ConnectionState::Closed;
746 session.session_state.last_transition = now;
747 state_changed = true;
748
749 updates.push(SessionStateUpdate {
750 peer_id: *peer_id,
751 old_state: ConnectionState::Connecting,
752 new_state: ConnectionState::Closed,
753 reason: StateChangeReason::Timeout,
754 });
755 }
756
757 if let Some(ref _connection) = session.session_state.connection {
760 session.session_state.state = ConnectionState::Connected;
761 session.session_state.last_transition = now;
762 state_changed = true;
763
764 updates.push(SessionStateUpdate {
765 peer_id: *peer_id,
766 old_state: ConnectionState::Connecting,
767 new_state: ConnectionState::Connected,
768 reason: StateChangeReason::ConnectionEstablished,
769 });
770 }
771 }
772 ConnectionState::Connected => {
773 {
776 }
779
780 session.session_state.metrics.last_activity = Some(now);
782 }
783 ConnectionState::Migrating => {
784 let elapsed = now.duration_since(session.session_state.last_transition);
786 if elapsed > Duration::from_secs(10) {
787 if session.session_state.connection.is_some() {
790 session.session_state.state = ConnectionState::Connected;
791 state_changed = true;
792
793 updates.push(SessionStateUpdate {
794 peer_id: *peer_id,
795 old_state: ConnectionState::Migrating,
796 new_state: ConnectionState::Connected,
797 reason: StateChangeReason::MigrationComplete,
798 });
799 } else {
800 session.session_state.state = ConnectionState::Closed;
801 state_changed = true;
802
803 updates.push(SessionStateUpdate {
804 peer_id: *peer_id,
805 old_state: ConnectionState::Migrating,
806 new_state: ConnectionState::Closed,
807 reason: StateChangeReason::MigrationFailed,
808 });
809 }
810
811 session.session_state.last_transition = now;
812 }
813 }
814 _ => {}
815 }
816
817 if state_changed {
819 if let Some(ref callback) = self.event_callback {
820 callback(NatTraversalEvent::SessionStateChanged {
821 peer_id: *peer_id,
822 new_state: session.session_state.state,
823 });
824 }
825 }
826 }
827
828 Ok(updates)
829 }
830
831 pub fn start_session_polling(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
833 let sessions = self.active_sessions.clone();
834 let shutdown = self.shutdown.clone();
835
836 tokio::spawn(async move {
837 let mut ticker = tokio::time::interval(interval);
838
839 loop {
840 ticker.tick().await;
841
842 if shutdown.load(Ordering::Relaxed) {
843 break;
844 }
845
846 if let Ok(sessions_guard) = sessions.read() {
848 for (_peer_id, _session) in sessions_guard.iter() {
849 }
852 }
853 }
854 })
855 }
856
857 pub fn get_statistics(&self) -> Result<NatTraversalStatistics, NatTraversalError> {
859 let sessions = self
860 .active_sessions
861 .read()
862 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
863 let bootstrap_nodes = self
864 .bootstrap_nodes
865 .read()
866 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
867
868 Ok(NatTraversalStatistics {
869 active_sessions: sessions.len(),
870 total_bootstrap_nodes: bootstrap_nodes.len(),
871 successful_coordinations: bootstrap_nodes.iter().map(|b| b.coordination_count).sum(),
872 average_coordination_time: Duration::from_millis(500), total_attempts: 0,
874 successful_connections: 0,
875 direct_connections: 0,
876 relayed_connections: 0,
877 })
878 }
879
880 pub fn add_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
882 let mut bootstrap_nodes = self
883 .bootstrap_nodes
884 .write()
885 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
886
887 if !bootstrap_nodes.iter().any(|b| b.address == address) {
889 bootstrap_nodes.push(BootstrapNode {
890 address,
891 last_seen: std::time::Instant::now(),
892 can_coordinate: true,
893 rtt: None,
894 coordination_count: 0,
895 });
896 info!("Added bootstrap node: {}", address);
897 }
898 Ok(())
899 }
900
901 pub fn remove_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
903 let mut bootstrap_nodes = self
904 .bootstrap_nodes
905 .write()
906 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
907 bootstrap_nodes.retain(|b| b.address != address);
908 info!("Removed bootstrap node: {}", address);
909 Ok(())
910 }
911
912 async fn create_quinn_endpoint(
916 config: &NatTraversalConfig,
917 _nat_role: NatTraversalRole,
918 ) -> Result<
919 (
920 QuinnEndpoint,
921 mpsc::UnboundedSender<NatTraversalEvent>,
922 SocketAddr,
923 ),
924 NatTraversalError,
925 > {
926 use std::sync::Arc;
927
928 let server_config = match config.role {
930 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
931 let cert_config = CertificateConfig {
933 common_name: format!("ant-quic-{}", config.role.name()),
934 subject_alt_names: vec!["localhost".to_string(), "ant-quic-node".to_string()],
935 self_signed: true, ..CertificateConfig::default()
937 };
938
939 let cert_manager = CertificateManager::new(cert_config).map_err(|e| {
940 NatTraversalError::ConfigError(format!(
941 "Certificate manager creation failed: {}",
942 e
943 ))
944 })?;
945
946 let cert_bundle = cert_manager.generate_certificate().map_err(|e| {
947 NatTraversalError::ConfigError(format!("Certificate generation failed: {}", e))
948 })?;
949
950 let rustls_config =
951 cert_manager
952 .create_server_config(&cert_bundle)
953 .map_err(|e| {
954 NatTraversalError::ConfigError(format!(
955 "Server config creation failed: {}",
956 e
957 ))
958 })?;
959
960 let server_crypto = QuicServerConfig::try_from(rustls_config.as_ref().clone())
961 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
962
963 let mut server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
964
965 let mut transport_config = TransportConfig::default();
967 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
968 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
969
970 let nat_config = crate::transport_parameters::NatTraversalConfig {
972 role: match config.role {
973 EndpointRole::Bootstrap => {
974 crate::transport_parameters::NatTraversalRole::Bootstrap
975 }
976 EndpointRole::Server { can_coordinate } => {
977 crate::transport_parameters::NatTraversalRole::Server {
978 can_relay: can_coordinate,
979 }
980 }
981 EndpointRole::Client => {
982 crate::transport_parameters::NatTraversalRole::Client
983 }
984 },
985 max_candidates: VarInt::from_u32(config.max_candidates as u32),
986 coordination_timeout: VarInt::from_u64(
987 config.coordination_timeout.as_millis() as u64
988 )
989 .unwrap(),
990 max_concurrent_attempts: VarInt::from_u32(
991 config.max_concurrent_attempts as u32,
992 ),
993 peer_id: None, };
995 transport_config.nat_traversal_config(Some(nat_config));
996
997 server_config.transport_config(Arc::new(transport_config));
998
999 Some(server_config)
1000 }
1001 _ => None,
1002 };
1003
1004 let client_config = {
1006 let cert_config = CertificateConfig {
1007 common_name: format!("ant-quic-{}", config.role.name()),
1008 subject_alt_names: vec!["localhost".to_string(), "ant-quic-node".to_string()],
1009 self_signed: true,
1010 ..CertificateConfig::default()
1011 };
1012
1013 let cert_manager = CertificateManager::new(cert_config).map_err(|e| {
1014 NatTraversalError::ConfigError(format!(
1015 "Certificate manager creation failed: {}",
1016 e
1017 ))
1018 })?;
1019
1020 let _cert_bundle = cert_manager.generate_certificate().map_err(|e| {
1021 NatTraversalError::ConfigError(format!("Certificate generation failed: {}", e))
1022 })?;
1023
1024 let rustls_config = cert_manager.create_client_config().map_err(|e| {
1025 NatTraversalError::ConfigError(format!("Client config creation failed: {}", e))
1026 })?;
1027
1028 let client_crypto = QuicClientConfig::try_from(rustls_config.as_ref().clone())
1029 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
1030
1031 let mut client_config = ClientConfig::new(Arc::new(client_crypto));
1032
1033 let mut transport_config = TransportConfig::default();
1035 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
1036 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
1037
1038 let nat_config = crate::transport_parameters::NatTraversalConfig {
1040 role: match config.role {
1041 EndpointRole::Bootstrap => {
1042 crate::transport_parameters::NatTraversalRole::Bootstrap
1043 }
1044 EndpointRole::Server { can_coordinate } => {
1045 crate::transport_parameters::NatTraversalRole::Server {
1046 can_relay: can_coordinate,
1047 }
1048 }
1049 EndpointRole::Client => crate::transport_parameters::NatTraversalRole::Client,
1050 },
1051 max_candidates: VarInt::from_u32(config.max_candidates as u32),
1052 coordination_timeout: VarInt::from_u64(
1053 config.coordination_timeout.as_millis() as u64
1054 )
1055 .unwrap(),
1056 max_concurrent_attempts: VarInt::from_u32(config.max_concurrent_attempts as u32),
1057 peer_id: None, };
1059 transport_config.nat_traversal_config(Some(nat_config));
1060
1061 client_config.transport_config(Arc::new(transport_config));
1062
1063 client_config
1064 };
1065
1066 let bind_addr = config.bind_addr.unwrap_or("0.0.0.0:0".parse().unwrap());
1068 let socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1069 NatTraversalError::NetworkError(format!("Failed to bind UDP socket: {}", e))
1070 })?;
1071
1072 info!("Binding endpoint to {}", bind_addr);
1073
1074 let std_socket = socket.into_std().map_err(|e| {
1076 NatTraversalError::NetworkError(format!("Failed to convert socket: {}", e))
1077 })?;
1078
1079 let mut endpoint = QuinnEndpoint::new(
1081 EndpointConfig::default(),
1082 server_config,
1083 std_socket,
1084 Arc::new(TokioRuntime),
1085 )
1086 .map_err(|e| {
1087 NatTraversalError::ConfigError(format!("Failed to create Quinn endpoint: {}", e))
1088 })?;
1089
1090 endpoint.set_default_client_config(client_config);
1092
1093 let local_addr = endpoint.local_addr().map_err(|e| {
1095 NatTraversalError::NetworkError(format!("Failed to get local address: {}", e))
1096 })?;
1097
1098 info!("Endpoint bound to actual address: {}", local_addr);
1099
1100 let (event_tx, _event_rx) = mpsc::unbounded_channel();
1102
1103 Ok((endpoint, event_tx, local_addr))
1104 }
1105
1106 pub async fn start_listening(&self, bind_addr: SocketAddr) -> Result<(), NatTraversalError> {
1108 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1109 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1110 })?;
1111
1112 let _socket = UdpSocket::bind(bind_addr).await.map_err(|e| {
1114 NatTraversalError::NetworkError(format!("Failed to bind to {}: {}", bind_addr, e))
1115 })?;
1116
1117 info!("Started listening on {}", bind_addr);
1118
1119 let endpoint_clone = endpoint.clone();
1121 let shutdown_clone = self.shutdown.clone();
1122 let event_tx = self.event_tx.as_ref().unwrap().clone();
1123 let connections_clone = self.connections.clone();
1124
1125 tokio::spawn(async move {
1126 Self::accept_connections(endpoint_clone, shutdown_clone, event_tx, connections_clone)
1127 .await;
1128 });
1129
1130 Ok(())
1131 }
1132
1133 async fn accept_connections(
1135 endpoint: QuinnEndpoint,
1136 shutdown: Arc<AtomicBool>,
1137 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1138 connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
1139 ) {
1140 while !shutdown.load(Ordering::Relaxed) {
1141 match endpoint.accept().await {
1142 Some(connecting) => {
1143 let event_tx = event_tx.clone();
1144 let connections = connections.clone();
1145 tokio::spawn(async move {
1146 match connecting.await {
1147 Ok(connection) => {
1148 info!("Accepted connection from {}", connection.remote_address());
1149
1150 let peer_id = Self::generate_peer_id_from_address(
1152 connection.remote_address(),
1153 );
1154
1155 if let Ok(mut conns) = connections.write() {
1157 conns.insert(peer_id, connection.clone());
1158 }
1159
1160 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1161 peer_id,
1162 remote_address: connection.remote_address(),
1163 });
1164
1165 Self::handle_connection(connection, event_tx).await;
1167 }
1168 Err(e) => {
1169 debug!("Connection failed: {}", e);
1170 }
1171 }
1172 });
1173 }
1174 None => {
1175 break;
1177 }
1178 }
1179 }
1180 }
1181
1182 async fn poll_discovery(
1184 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
1185 shutdown: Arc<AtomicBool>,
1186 _event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1187 ) {
1188 use tokio::time::{Duration, interval};
1189
1190 let mut poll_interval = interval(Duration::from_millis(100));
1191
1192 while !shutdown.load(Ordering::Relaxed) {
1193 poll_interval.tick().await;
1194
1195 let events = match discovery_manager.lock() {
1197 Ok(mut discovery) => discovery.poll(std::time::Instant::now()),
1198 Err(e) => {
1199 error!("Failed to lock discovery manager: {}", e);
1200 continue;
1201 }
1202 };
1203
1204 for event in events {
1206 match event {
1207 DiscoveryEvent::DiscoveryStarted {
1208 peer_id,
1209 bootstrap_count,
1210 } => {
1211 debug!(
1212 "Discovery started for peer {:?} with {} bootstrap nodes",
1213 peer_id, bootstrap_count
1214 );
1215 }
1216 DiscoveryEvent::LocalScanningStarted => {
1217 debug!("Local interface scanning started");
1218 }
1219 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1220 debug!("Discovered local candidate: {}", candidate.address);
1221 }
1224 DiscoveryEvent::LocalScanningCompleted {
1225 candidate_count,
1226 duration,
1227 } => {
1228 debug!(
1229 "Local interface scanning completed: {} candidates in {:?}",
1230 candidate_count, duration
1231 );
1232 }
1233 DiscoveryEvent::ServerReflexiveDiscoveryStarted { bootstrap_count } => {
1234 debug!(
1235 "Server reflexive discovery started with {} bootstrap nodes",
1236 bootstrap_count
1237 );
1238 }
1239 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1240 candidate,
1241 bootstrap_node,
1242 } => {
1243 debug!(
1244 "Discovered server-reflexive candidate {} via bootstrap {}",
1245 candidate.address, bootstrap_node
1246 );
1247 }
1249 DiscoveryEvent::BootstrapQueryFailed {
1250 bootstrap_node,
1251 error,
1252 } => {
1253 debug!("Bootstrap query failed for {}: {}", bootstrap_node, error);
1254 }
1255 DiscoveryEvent::SymmetricPredictionStarted { base_address } => {
1256 debug!(
1257 "Symmetric NAT prediction started from base address {}",
1258 base_address
1259 );
1260 }
1261 DiscoveryEvent::PredictedCandidateGenerated {
1262 candidate,
1263 confidence,
1264 } => {
1265 debug!(
1266 "Predicted symmetric NAT candidate {} with confidence {}",
1267 candidate.address, confidence
1268 );
1269 }
1271 DiscoveryEvent::PortAllocationDetected {
1272 port,
1273 source_address,
1274 bootstrap_node,
1275 timestamp,
1276 } => {
1277 debug!(
1278 "Port allocation detected: port {} from {} via bootstrap {:?} at {:?}",
1279 port, source_address, bootstrap_node, timestamp
1280 );
1281 }
1282 DiscoveryEvent::DiscoveryCompleted {
1283 candidate_count,
1284 total_duration,
1285 success_rate,
1286 } => {
1287 info!(
1288 "Discovery completed with {} candidates in {:?} (success rate: {:.2}%)",
1289 candidate_count,
1290 total_duration,
1291 success_rate * 100.0
1292 );
1293 }
1296 DiscoveryEvent::DiscoveryFailed {
1297 error,
1298 partial_results,
1299 } => {
1300 warn!(
1301 "Discovery failed: {} (found {} partial candidates)",
1302 error,
1303 partial_results.len()
1304 );
1305
1306 }
1311 DiscoveryEvent::PathValidationRequested {
1312 candidate_id,
1313 candidate_address,
1314 challenge_token,
1315 } => {
1316 debug!(
1317 "PATH_CHALLENGE requested for candidate {} at {} with token {:08x}",
1318 candidate_id.0, candidate_address, challenge_token
1319 );
1320 }
1323 DiscoveryEvent::PathValidationResponse {
1324 candidate_id,
1325 candidate_address,
1326 challenge_token: _,
1327 rtt,
1328 } => {
1329 debug!(
1330 "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1331 candidate_id.0, candidate_address, rtt
1332 );
1333 }
1335 }
1336 }
1337 }
1338
1339 info!("Discovery polling task shutting down");
1340 }
1341
1342 async fn handle_connection(
1344 connection: QuinnConnection,
1345 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1346 ) {
1347 let peer_id = Self::generate_peer_id_from_address(connection.remote_address());
1348 let remote_address = connection.remote_address();
1349
1350 debug!(
1351 "Handling connection from peer {:?} at {}",
1352 peer_id, remote_address
1353 );
1354
1355 loop {
1357 tokio::select! {
1358 stream = connection.accept_bi() => {
1359 match stream {
1360 Ok((send, recv)) => {
1361 tokio::spawn(async move {
1362 Self::handle_bi_stream(send, recv).await;
1363 });
1364 }
1365 Err(e) => {
1366 debug!("Error accepting bidirectional stream: {}", e);
1367 let _ = event_tx.send(NatTraversalEvent::ConnectionLost {
1368 peer_id,
1369 reason: format!("Stream error: {}", e),
1370 });
1371 break;
1372 }
1373 }
1374 }
1375 stream = connection.accept_uni() => {
1376 match stream {
1377 Ok(recv) => {
1378 tokio::spawn(async move {
1379 Self::handle_uni_stream(recv).await;
1380 });
1381 }
1382 Err(e) => {
1383 debug!("Error accepting unidirectional stream: {}", e);
1384 let _ = event_tx.send(NatTraversalEvent::ConnectionLost {
1385 peer_id,
1386 reason: format!("Stream error: {}", e),
1387 });
1388 break;
1389 }
1390 }
1391 }
1392 }
1393 }
1394 }
1395
/// Handler for an accepted bidirectional stream.
///
/// Currently a no-op: both stream halves are taken by value and dropped when
/// the function returns, without reading or writing any data.
async fn handle_bi_stream(
    _send: crate::quinn_high_level::SendStream,
    _recv: crate::quinn_high_level::RecvStream,
) {
}
1430
1431 async fn handle_uni_stream(mut recv: crate::quinn_high_level::RecvStream) {
1433 let mut buffer = vec![0u8; 1024];
1434
1435 loop {
1436 match recv.read(&mut buffer).await {
1437 Ok(Some(size)) => {
1438 debug!("Received {} bytes on unidirectional stream", size);
1439 }
1441 Ok(None) => {
1442 debug!("Unidirectional stream closed by peer");
1443 break;
1444 }
1445 Err(e) => {
1446 debug!("Error reading from unidirectional stream: {}", e);
1447 break;
1448 }
1449 }
1450 }
1451 }
1452
1453 pub async fn connect_to_peer(
1455 &self,
1456 peer_id: PeerId,
1457 server_name: &str,
1458 remote_addr: SocketAddr,
1459 ) -> Result<QuinnConnection, NatTraversalError> {
1460 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1461 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1462 })?;
1463
1464 info!("Connecting to peer {:?} at {}", peer_id, remote_addr);
1465
1466 let connecting = endpoint.connect(remote_addr, server_name).map_err(|e| {
1468 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {}", e))
1469 })?;
1470
1471 let connection = timeout(Duration::from_secs(10), connecting)
1472 .await
1473 .map_err(|_| NatTraversalError::Timeout)?
1474 .map_err(|e| {
1475 NatTraversalError::ConnectionFailed(format!("Connection failed: {}", e))
1476 })?;
1477
1478 info!(
1479 "Successfully connected to peer {:?} at {}",
1480 peer_id, remote_addr
1481 );
1482
1483 if let Some(ref event_tx) = self.event_tx {
1485 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1486 peer_id,
1487 remote_address: remote_addr,
1488 });
1489 }
1490
1491 Ok(connection)
1492 }
1493
1494 pub async fn accept_connection(&self) -> Result<(PeerId, QuinnConnection), NatTraversalError> {
1496 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
1497 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
1498 })?;
1499
1500 let incoming = endpoint
1502 .accept()
1503 .await
1504 .ok_or_else(|| NatTraversalError::NetworkError("Endpoint closed".to_string()))?;
1505
1506 let remote_addr = incoming.remote_address();
1507 info!("Accepting connection from {}", remote_addr);
1508
1509 let connection = incoming.await.map_err(|e| {
1511 NatTraversalError::ConnectionFailed(format!("Failed to accept connection: {}", e))
1512 })?;
1513
1514 let peer_id = self
1516 .extract_peer_id_from_connection(&connection)
1517 .await
1518 .unwrap_or_else(|| Self::generate_peer_id_from_address(remote_addr));
1519
1520 {
1522 let mut connections = self.connections.write().map_err(|_| {
1523 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1524 })?;
1525 connections.insert(peer_id, connection.clone());
1526 }
1527
1528 info!(
1529 "Connection accepted from peer {:?} at {}",
1530 peer_id, remote_addr
1531 );
1532
1533 if let Some(ref event_tx) = self.event_tx {
1535 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1536 peer_id,
1537 remote_address: remote_addr,
1538 });
1539 }
1540
1541 Ok((peer_id, connection))
1542 }
1543
/// Returns this endpoint's own peer ID (a copy of the `local_peer_id` field).
pub fn local_peer_id(&self) -> PeerId {
    self.local_peer_id
}
1548
1549 pub fn get_connection(
1551 &self,
1552 peer_id: &PeerId,
1553 ) -> Result<Option<QuinnConnection>, NatTraversalError> {
1554 let connections = self.connections.read().map_err(|_| {
1555 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1556 })?;
1557 Ok(connections.get(peer_id).cloned())
1558 }
1559
1560 pub fn remove_connection(
1562 &self,
1563 peer_id: &PeerId,
1564 ) -> Result<Option<QuinnConnection>, NatTraversalError> {
1565 let mut connections = self.connections.write().map_err(|_| {
1566 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1567 })?;
1568 Ok(connections.remove(peer_id))
1569 }
1570
1571 pub fn list_connections(&self) -> Result<Vec<(PeerId, SocketAddr)>, NatTraversalError> {
1573 let connections = self.connections.read().map_err(|_| {
1574 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1575 })?;
1576 let mut result = Vec::new();
1577 for (peer_id, connection) in connections.iter() {
1578 result.push((*peer_id, connection.remote_address()));
1579 }
1580 Ok(result)
1581 }
1582
1583 pub async fn handle_connection_data(
1585 &self,
1586 peer_id: PeerId,
1587 connection: &QuinnConnection,
1588 ) -> Result<(), NatTraversalError> {
1589 info!("Handling connection data from peer {:?}", peer_id);
1590
1591 let connection_clone = connection.clone();
1593 let peer_id_clone = peer_id;
1594 tokio::spawn(async move {
1595 loop {
1596 match connection_clone.accept_bi().await {
1597 Ok((send, recv)) => {
1598 debug!(
1599 "Accepted bidirectional stream from peer {:?}",
1600 peer_id_clone
1601 );
1602 tokio::spawn(Self::handle_bi_stream(send, recv));
1603 }
1604 Err(ConnectionError::ApplicationClosed(_)) => {
1605 debug!("Connection closed by peer {:?}", peer_id_clone);
1606 break;
1607 }
1608 Err(e) => {
1609 debug!(
1610 "Error accepting bidirectional stream from peer {:?}: {}",
1611 peer_id_clone, e
1612 );
1613 break;
1614 }
1615 }
1616 }
1617 });
1618
1619 let connection_clone = connection.clone();
1621 let peer_id_clone = peer_id;
1622 tokio::spawn(async move {
1623 loop {
1624 match connection_clone.accept_uni().await {
1625 Ok(recv) => {
1626 debug!(
1627 "Accepted unidirectional stream from peer {:?}",
1628 peer_id_clone
1629 );
1630 tokio::spawn(Self::handle_uni_stream(recv));
1631 }
1632 Err(ConnectionError::ApplicationClosed(_)) => {
1633 debug!("Connection closed by peer {:?}", peer_id_clone);
1634 break;
1635 }
1636 Err(e) => {
1637 debug!(
1638 "Error accepting unidirectional stream from peer {:?}: {}",
1639 peer_id_clone, e
1640 );
1641 break;
1642 }
1643 }
1644 }
1645 });
1646
1647 Ok(())
1648 }
1649
1650 fn generate_local_peer_id() -> PeerId {
1652 use std::collections::hash_map::DefaultHasher;
1653 use std::hash::{Hash, Hasher};
1654 use std::time::SystemTime;
1655
1656 let mut hasher = DefaultHasher::new();
1657 SystemTime::now().hash(&mut hasher);
1658 std::process::id().hash(&mut hasher);
1659
1660 let hash = hasher.finish();
1661 let mut peer_id = [0u8; 32];
1662 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
1663
1664 for i in 8..32 {
1666 peer_id[i] = rand::random();
1667 }
1668
1669 PeerId(peer_id)
1670 }
1671
1672 fn generate_peer_id_from_address(addr: SocketAddr) -> PeerId {
1678 use std::collections::hash_map::DefaultHasher;
1679 use std::hash::{Hash, Hasher};
1680
1681 let mut hasher = DefaultHasher::new();
1682 addr.hash(&mut hasher);
1683
1684 let hash = hasher.finish();
1685 let mut peer_id = [0u8; 32];
1686 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
1687
1688 for i in 8..32 {
1691 peer_id[i] = rand::random();
1692 }
1693
1694 warn!(
1695 "Generated temporary peer ID from address {}. This ID is not persistent!",
1696 addr
1697 );
1698 PeerId(peer_id)
1699 }
1700
1701 async fn extract_peer_id_from_connection(
1703 &self,
1704 connection: &QuinnConnection,
1705 ) -> Option<PeerId> {
1706 if let Some(identity) = connection.peer_identity() {
1708 if let Some(public_key_bytes) = identity.downcast_ref::<[u8; 32]>() {
1710 match crate::derive_peer_id_from_key_bytes(public_key_bytes) {
1712 Ok(peer_id) => {
1713 debug!("Derived peer ID from Ed25519 public key");
1714 return Some(peer_id);
1715 }
1716 Err(e) => {
1717 warn!("Failed to derive peer ID from public key: {}", e);
1718 }
1719 }
1720 }
1721 }
1723
1724 None
1725 }
1726
1727 pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
1729 self.shutdown.store(true, Ordering::Relaxed);
1731
1732 {
1734 let mut connections = self.connections.write().map_err(|_| {
1735 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
1736 })?;
1737 for (peer_id, connection) in connections.drain() {
1738 info!("Closing connection to peer {:?}", peer_id);
1739 connection.close(crate::VarInt::from_u32(0), b"Shutdown");
1740 }
1741 }
1742
1743 if let Some(ref endpoint) = self.quinn_endpoint {
1745 endpoint.wait_idle().await;
1746 }
1747
1748 info!("NAT traversal endpoint shutdown completed");
1749 Ok(())
1750 }
1751
1752 pub async fn discover_candidates(
1754 &self,
1755 peer_id: PeerId,
1756 ) -> Result<Vec<CandidateAddress>, NatTraversalError> {
1757 debug!("Discovering address candidates for peer {:?}", peer_id);
1758
1759 let mut candidates = Vec::new();
1760
1761 let bootstrap_nodes = {
1763 let nodes = self
1764 .bootstrap_nodes
1765 .read()
1766 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1767 nodes.clone()
1768 };
1769
1770 {
1772 let mut discovery = self.discovery_manager.lock().map_err(|_| {
1773 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
1774 })?;
1775
1776 discovery
1777 .start_discovery(peer_id, bootstrap_nodes)
1778 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
1779 }
1780
1781 let timeout_duration = self.config.coordination_timeout;
1783 let start_time = std::time::Instant::now();
1784
1785 while start_time.elapsed() < timeout_duration {
1786 let discovery_events = {
1787 let mut discovery = self.discovery_manager.lock().map_err(|_| {
1788 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
1789 })?;
1790 discovery.poll(std::time::Instant::now())
1791 };
1792
1793 for event in discovery_events {
1794 match event {
1795 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1796 candidates.push(candidate.clone());
1797
1798 self.send_candidate_advertisement(peer_id, &candidate)
1800 .await
1801 .unwrap_or_else(|e| {
1802 debug!("Failed to send candidate advertisement: {}", e)
1803 });
1804 }
1805 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
1806 candidates.push(candidate.clone());
1807
1808 self.send_candidate_advertisement(peer_id, &candidate)
1810 .await
1811 .unwrap_or_else(|e| {
1812 debug!("Failed to send candidate advertisement: {}", e)
1813 });
1814 }
1815 DiscoveryEvent::PredictedCandidateGenerated { candidate, .. } => {
1816 candidates.push(candidate.clone());
1817
1818 self.send_candidate_advertisement(peer_id, &candidate)
1820 .await
1821 .unwrap_or_else(|e| {
1822 debug!("Failed to send candidate advertisement: {}", e)
1823 });
1824 }
1825 DiscoveryEvent::DiscoveryCompleted { .. } => {
1826 return Ok(candidates);
1828 }
1829 DiscoveryEvent::DiscoveryFailed {
1830 error,
1831 partial_results,
1832 } => {
1833 candidates.extend(partial_results);
1835 if candidates.is_empty() {
1836 return Err(NatTraversalError::CandidateDiscoveryFailed(
1837 error.to_string(),
1838 ));
1839 }
1840 return Ok(candidates);
1841 }
1842 _ => {}
1843 }
1844 }
1845
1846 sleep(Duration::from_millis(10)).await;
1848 }
1849
1850 if candidates.is_empty() {
1851 Err(NatTraversalError::NoCandidatesFound)
1852 } else {
1853 Ok(candidates)
1854 }
1855 }
1856
1857 fn create_punch_me_now_frame(&self, peer_id: PeerId) -> Result<Vec<u8>, NatTraversalError> {
1859 let mut frame = Vec::new();
1867
1868 frame.push(0x41);
1870
1871 frame.extend_from_slice(&peer_id.0);
1873
1874 let timestamp = std::time::SystemTime::now()
1876 .duration_since(std::time::UNIX_EPOCH)
1877 .unwrap_or_default()
1878 .as_millis() as u64;
1879 frame.extend_from_slice(×tamp.to_be_bytes());
1880
1881 let mut token = [0u8; 16];
1883 for byte in &mut token {
1884 *byte = rand::random();
1885 }
1886 frame.extend_from_slice(&token);
1887
1888 Ok(frame)
1889 }
1890
1891 fn attempt_hole_punching(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
1892 debug!("Attempting hole punching for peer {:?}", peer_id);
1893
1894 let candidate_pairs = self.get_candidate_pairs_for_peer(peer_id)?;
1896
1897 if candidate_pairs.is_empty() {
1898 return Err(NatTraversalError::NoCandidatesFound);
1899 }
1900
1901 info!(
1902 "Generated {} candidate pairs for hole punching with peer {:?}",
1903 candidate_pairs.len(),
1904 peer_id
1905 );
1906
1907 self.attempt_quinn_hole_punching(peer_id, candidate_pairs)
1910 }
1911
1912 fn get_candidate_pairs_for_peer(
1914 &self,
1915 peer_id: PeerId,
1916 ) -> Result<Vec<CandidatePair>, NatTraversalError> {
1917 let discovery_candidates = {
1919 let discovery = self.discovery_manager.lock().map_err(|_| {
1920 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
1921 })?;
1922
1923 discovery.get_candidates_for_peer(peer_id)
1924 };
1925
1926 if discovery_candidates.is_empty() {
1927 return Err(NatTraversalError::NoCandidatesFound);
1928 }
1929
1930 let mut candidate_pairs = Vec::new();
1932 let local_candidates = discovery_candidates
1933 .iter()
1934 .filter(|c| matches!(c.source, CandidateSource::Local))
1935 .collect::<Vec<_>>();
1936 let remote_candidates = discovery_candidates
1937 .iter()
1938 .filter(|c| !matches!(c.source, CandidateSource::Local))
1939 .collect::<Vec<_>>();
1940
1941 for local in &local_candidates {
1943 for remote in &remote_candidates {
1944 let pair_priority = self.calculate_candidate_pair_priority(local, remote);
1945 candidate_pairs.push(CandidatePair {
1946 local_candidate: (*local).clone(),
1947 remote_candidate: (*remote).clone(),
1948 priority: pair_priority,
1949 state: CandidatePairState::Waiting,
1950 });
1951 }
1952 }
1953
1954 candidate_pairs.sort_by(|a, b| b.priority.cmp(&a.priority));
1956
1957 candidate_pairs.truncate(8);
1959
1960 Ok(candidate_pairs)
1961 }
1962
1963 fn calculate_candidate_pair_priority(
1965 &self,
1966 local: &CandidateAddress,
1967 remote: &CandidateAddress,
1968 ) -> u64 {
1969 let local_type_preference = match local.source {
1973 CandidateSource::Local => 126,
1974 CandidateSource::Observed { .. } => 100,
1975 CandidateSource::Predicted => 75,
1976 CandidateSource::Peer => 50,
1977 };
1978
1979 let remote_type_preference = match remote.source {
1980 CandidateSource::Local => 126,
1981 CandidateSource::Observed { .. } => 100,
1982 CandidateSource::Predicted => 75,
1983 CandidateSource::Peer => 50,
1984 };
1985
1986 let local_priority = (local_type_preference as u64) << 8 | local.priority as u64;
1988 let remote_priority = (remote_type_preference as u64) << 8 | remote.priority as u64;
1989
1990 let min_priority = local_priority.min(remote_priority);
1991 let max_priority = local_priority.max(remote_priority);
1992
1993 (min_priority << 32)
1994 | (max_priority << 1)
1995 | if local_priority > remote_priority {
1996 1
1997 } else {
1998 0
1999 }
2000 }
2001
/// Try each candidate pair in order until one gets a reply from its remote address.
///
/// For every pair a fresh `std::net::UdpSocket` is bound to the local
/// candidate address, a path-challenge packet carrying 8 random bytes is sent
/// to the remote candidate, and up to 100 ms is spent waiting for a datagram.
/// The first pair whose reply comes from the expected remote address is
/// recorded through `store_successful_candidate_pair` and ends the search.
///
/// NOTE(review): the socket calls here are blocking; confirm that callers do
/// not invoke this directly on an async executor thread.
///
/// # Errors
///
/// * `ConfigError` if the Quinn endpoint has not been initialized.
/// * `NetworkError` if binding the local socket or setting its read timeout
///   fails; this aborts the whole search rather than moving to the next pair.
/// * `HolePunchingFailed` if no pair produced a matching reply.
fn attempt_quinn_hole_punching(
    &self,
    peer_id: PeerId,
    candidate_pairs: Vec<CandidatePair>,
) -> Result<(), NatTraversalError> {
    // Only checks that an endpoint exists; the endpoint itself is not used below.
    let _endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
        NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
    })?;

    for pair in candidate_pairs {
        debug!(
            "Attempting hole punch with candidate pair: {} -> {}",
            pair.local_candidate.address, pair.remote_candidate.address
        );

        // Random 8-byte challenge payload for this attempt.
        let mut challenge_data = [0u8; 8];
        for byte in &mut challenge_data {
            *byte = rand::random();
        }

        // A bind failure returns immediately via `?`.
        let local_socket =
            std::net::UdpSocket::bind(pair.local_candidate.address).map_err(|e| {
                NatTraversalError::NetworkError(format!(
                    "Failed to bind to local candidate: {}",
                    e
                ))
            })?;

        let path_challenge_packet = self.create_path_challenge_packet(challenge_data)?;

        match local_socket.send_to(&path_challenge_packet, pair.remote_candidate.address) {
            Ok(bytes_sent) => {
                debug!(
                    "Sent {} bytes for hole punch from {} to {}",
                    bytes_sent, pair.local_candidate.address, pair.remote_candidate.address
                );

                // Bound the blocking receive below to 100 ms.
                local_socket
                    .set_read_timeout(Some(Duration::from_millis(100)))
                    .map_err(|e| {
                        NatTraversalError::NetworkError(format!("Failed to set timeout: {}", e))
                    })?;

                let mut response_buffer = [0u8; 1024];
                match local_socket.recv_from(&mut response_buffer) {
                    Ok((_bytes_received, response_addr)) => {
                        // Only the sender address is checked; the reply
                        // payload is not inspected.
                        if response_addr == pair.remote_candidate.address {
                            info!(
                                "Hole punch succeeded for peer {:?}: {} <-> {}",
                                peer_id,
                                pair.local_candidate.address,
                                pair.remote_candidate.address
                            );

                            self.store_successful_candidate_pair(peer_id, pair)?;
                            return Ok(());
                        } else {
                            debug!(
                                "Received response from unexpected address: {}",
                                response_addr
                            );
                        }
                    }
                    // A read timeout surfaces as WouldBlock or TimedOut
                    // depending on the platform.
                    Err(e)
                        if e.kind() == std::io::ErrorKind::WouldBlock
                            || e.kind() == std::io::ErrorKind::TimedOut =>
                    {
                        debug!("No response received for hole punch attempt");
                    }
                    Err(e) => {
                        debug!("Error receiving hole punch response: {}", e);
                    }
                }
            }
            Err(e) => {
                // Send failures are not fatal; move on to the next pair.
                debug!("Failed to send hole punch packet: {}", e);
            }
        }
    }

    Err(NatTraversalError::HolePunchingFailed)
}
2093
2094 fn create_path_challenge_packet(
2096 &self,
2097 challenge_data: [u8; 8],
2098 ) -> Result<Vec<u8>, NatTraversalError> {
2099 let mut packet = Vec::new();
2102
2103 packet.push(0x40); packet.extend_from_slice(&[0, 0, 0, 1]); packet.push(0x1a); packet.extend_from_slice(&challenge_data); Ok(packet)
2112 }
2113
2114 fn store_successful_candidate_pair(
2116 &self,
2117 peer_id: PeerId,
2118 pair: CandidatePair,
2119 ) -> Result<(), NatTraversalError> {
2120 debug!(
2121 "Storing successful candidate pair for peer {:?}: {} <-> {}",
2122 peer_id, pair.local_candidate.address, pair.remote_candidate.address
2123 );
2124
2125 if let Some(ref callback) = self.event_callback {
2130 callback(NatTraversalEvent::PathValidated {
2131 peer_id,
2132 address: pair.remote_candidate.address,
2133 rtt: Duration::from_millis(50), });
2135
2136 callback(NatTraversalEvent::TraversalSucceeded {
2137 peer_id,
2138 final_address: pair.remote_candidate.address,
2139 total_time: Duration::from_secs(1), });
2141 }
2142
2143 Ok(())
2144 }
2145
2146 fn attempt_connection_to_candidate(
2148 &self,
2149 peer_id: PeerId,
2150 candidate: &CandidateAddress,
2151 ) -> Result<(), NatTraversalError> {
2152 {
2153 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
2154 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
2155 })?;
2156
2157 let server_name = format!("peer-{:x}", peer_id.0[0] as u32);
2159
2160 debug!(
2161 "Attempting Quinn connection to candidate {} for peer {:?}",
2162 candidate.address, peer_id
2163 );
2164
2165 match endpoint.connect(candidate.address, &server_name) {
2167 Ok(connecting) => {
2168 info!(
2169 "Connection attempt initiated to {} for peer {:?}",
2170 candidate.address, peer_id
2171 );
2172
2173 if let Some(event_tx) = &self.event_tx {
2175 let event_tx = event_tx.clone();
2176 let connections = self.connections.clone();
2177 let peer_id_clone = peer_id;
2178 let address = candidate.address;
2179
2180 tokio::spawn(async move {
2181 match connecting.await {
2182 Ok(connection) => {
2183 info!(
2184 "Successfully connected to {} for peer {:?}",
2185 address, peer_id_clone
2186 );
2187
2188 if let Ok(mut conns) = connections.write() {
2190 conns.insert(peer_id_clone, connection.clone());
2191 }
2192
2193 let _ =
2195 event_tx.send(NatTraversalEvent::ConnectionEstablished {
2196 peer_id: peer_id_clone,
2197 remote_address: address,
2198 });
2199
2200 Self::handle_connection(connection, event_tx).await;
2202 }
2203 Err(e) => {
2204 warn!("Connection to {} failed: {}", address, e);
2205 }
2206 }
2207 });
2208 }
2209
2210 Ok(())
2211 }
2212 Err(e) => {
2213 warn!(
2214 "Failed to initiate connection to {}: {}",
2215 candidate.address, e
2216 );
2217 Err(NatTraversalError::ConnectionFailed(format!(
2218 "Failed to connect to {}: {}",
2219 candidate.address, e
2220 )))
2221 }
2222 }
2223 }
2224 }
2225
2226 pub fn poll(
2228 &self,
2229 now: std::time::Instant,
2230 ) -> Result<Vec<NatTraversalEvent>, NatTraversalError> {
2231 let mut events = Vec::new();
2232
2233 {
2235 let mut discovery = self.discovery_manager.lock().map_err(|_| {
2236 NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string())
2237 })?;
2238
2239 let discovery_events = discovery.poll(now);
2240
2241 for discovery_event in discovery_events {
2243 if let Some(nat_event) = self.convert_discovery_event(discovery_event) {
2244 events.push(nat_event.clone());
2245
2246 if let Some(ref callback) = self.event_callback {
2248 callback(nat_event.clone());
2249 }
2250
2251 if let NatTraversalEvent::CandidateDiscovered {
2253 peer_id: _,
2254 candidate: _,
2255 } = &nat_event
2256 {
2257 }
2260 }
2261 }
2262 }
2263
2264 let mut sessions = self
2266 .active_sessions
2267 .write()
2268 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2269
2270 for (_peer_id, session) in sessions.iter_mut() {
2271 let elapsed = now.duration_since(session.started_at);
2272
2273 let timeout = self.get_phase_timeout(session.phase);
2275
2276 if elapsed > timeout {
2278 match session.phase {
2279 TraversalPhase::Discovery => {
2280 let discovered_candidates = {
2282 let discovery = self.discovery_manager.lock().map_err(|_| {
2283 NatTraversalError::ProtocolError(
2284 "Discovery manager lock poisoned".to_string(),
2285 )
2286 });
2287 match discovery {
2288 Ok(disc) => disc.get_candidates_for_peer(session.peer_id),
2289 Err(_) => Vec::new(),
2290 }
2291 };
2292
2293 session.candidates = discovered_candidates.clone();
2295
2296 if !session.candidates.is_empty() {
2298 session.phase = TraversalPhase::Coordination;
2300 let event = NatTraversalEvent::PhaseTransition {
2301 peer_id: session.peer_id,
2302 from_phase: TraversalPhase::Discovery,
2303 to_phase: TraversalPhase::Coordination,
2304 };
2305 events.push(event.clone());
2306 if let Some(ref callback) = self.event_callback {
2307 callback(event);
2308 }
2309 info!(
2310 "Peer {:?} advanced from Discovery to Coordination with {} candidates",
2311 session.peer_id,
2312 session.candidates.len()
2313 );
2314 } else if session.attempt < self.config.max_concurrent_attempts as u32 {
2315 session.attempt += 1;
2317 session.started_at = now;
2318 let backoff_duration = self.calculate_backoff(session.attempt);
2319 warn!(
2320 "Discovery timeout for peer {:?}, retrying (attempt {}), backoff: {:?}",
2321 session.peer_id, session.attempt, backoff_duration
2322 );
2323 } else {
2324 session.phase = TraversalPhase::Failed;
2326 let event = NatTraversalEvent::TraversalFailed {
2327 peer_id: session.peer_id,
2328 error: NatTraversalError::NoCandidatesFound,
2329 fallback_available: self.config.enable_relay_fallback,
2330 };
2331 events.push(event.clone());
2332 if let Some(ref callback) = self.event_callback {
2333 callback(event);
2334 }
2335 error!(
2336 "NAT traversal failed for peer {:?}: no candidates found after {} attempts",
2337 session.peer_id, session.attempt
2338 );
2339 }
2340 }
2341 TraversalPhase::Coordination => {
2342 if let Some(coordinator) = self.select_coordinator() {
2344 match self.send_coordination_request(session.peer_id, coordinator) {
2345 Ok(_) => {
2346 session.phase = TraversalPhase::Synchronization;
2347 let event = NatTraversalEvent::CoordinationRequested {
2348 peer_id: session.peer_id,
2349 coordinator,
2350 };
2351 events.push(event.clone());
2352 if let Some(ref callback) = self.event_callback {
2353 callback(event);
2354 }
2355 info!(
2356 "Coordination requested for peer {:?} via {}",
2357 session.peer_id, coordinator
2358 );
2359 }
2360 Err(e) => {
2361 self.handle_phase_failure(session, now, &mut events, e);
2362 }
2363 }
2364 } else {
2365 self.handle_phase_failure(
2366 session,
2367 now,
2368 &mut events,
2369 NatTraversalError::NoBootstrapNodes,
2370 );
2371 }
2372 }
2373 TraversalPhase::Synchronization => {
2374 if self.is_peer_synchronized(&session.peer_id) {
2376 session.phase = TraversalPhase::Punching;
2377 let event = NatTraversalEvent::HolePunchingStarted {
2378 peer_id: session.peer_id,
2379 targets: session.candidates.iter().map(|c| c.address).collect(),
2380 };
2381 events.push(event.clone());
2382 if let Some(ref callback) = self.event_callback {
2383 callback(event);
2384 }
2385 if let Err(e) =
2387 self.initiate_hole_punching(session.peer_id, &session.candidates)
2388 {
2389 self.handle_phase_failure(session, now, &mut events, e);
2390 }
2391 } else {
2392 self.handle_phase_failure(
2393 session,
2394 now,
2395 &mut events,
2396 NatTraversalError::ProtocolError(
2397 "Synchronization timeout".to_string(),
2398 ),
2399 );
2400 }
2401 }
2402 TraversalPhase::Punching => {
2403 if let Some(successful_path) = self.check_punch_results(&session.peer_id) {
2405 session.phase = TraversalPhase::Validation;
2406 let event = NatTraversalEvent::PathValidated {
2407 peer_id: session.peer_id,
2408 address: successful_path,
2409 rtt: Duration::from_millis(50), };
2411 events.push(event.clone());
2412 if let Some(ref callback) = self.event_callback {
2413 callback(event);
2414 }
2415 if let Err(e) = self.validate_path(session.peer_id, successful_path) {
2417 self.handle_phase_failure(session, now, &mut events, e);
2418 }
2419 } else {
2420 self.handle_phase_failure(
2421 session,
2422 now,
2423 &mut events,
2424 NatTraversalError::PunchingFailed(
2425 "No successful punch".to_string(),
2426 ),
2427 );
2428 }
2429 }
2430 TraversalPhase::Validation => {
2431 if self.is_path_validated(&session.peer_id) {
2433 session.phase = TraversalPhase::Connected;
2434 let event = NatTraversalEvent::TraversalSucceeded {
2435 peer_id: session.peer_id,
2436 final_address: session
2437 .candidates
2438 .first()
2439 .map(|c| c.address)
2440 .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()),
2441 total_time: elapsed,
2442 };
2443 events.push(event.clone());
2444 if let Some(ref callback) = self.event_callback {
2445 callback(event);
2446 }
2447 info!(
2448 "NAT traversal succeeded for peer {:?} in {:?}",
2449 session.peer_id, elapsed
2450 );
2451 } else {
2452 self.handle_phase_failure(
2453 session,
2454 now,
2455 &mut events,
2456 NatTraversalError::ValidationFailed(
2457 "Path validation timeout".to_string(),
2458 ),
2459 );
2460 }
2461 }
2462 TraversalPhase::Connected => {
2463 if !self.is_connection_healthy(&session.peer_id) {
2465 warn!(
2466 "Connection to peer {:?} is no longer healthy",
2467 session.peer_id
2468 );
2469 }
2471 }
2472 TraversalPhase::Failed => {
2473 }
2475 }
2476 }
2477 }
2478
2479 Ok(events)
2480 }
2481
2482 fn get_phase_timeout(&self, phase: TraversalPhase) -> Duration {
2484 match phase {
2485 TraversalPhase::Discovery => Duration::from_secs(10),
2486 TraversalPhase::Coordination => self.config.coordination_timeout,
2487 TraversalPhase::Synchronization => Duration::from_secs(3),
2488 TraversalPhase::Punching => Duration::from_secs(5),
2489 TraversalPhase::Validation => Duration::from_secs(5),
2490 TraversalPhase::Connected => Duration::from_secs(30), TraversalPhase::Failed => Duration::ZERO,
2492 }
2493 }
2494
2495 fn calculate_backoff(&self, attempt: u32) -> Duration {
2497 let base = Duration::from_millis(1000);
2498 let max = Duration::from_secs(30);
2499 let backoff = base * 2u32.pow(attempt.saturating_sub(1));
2500 let jitter = std::time::Duration::from_millis((rand::random::<u64>() % 200) as u64);
2501 backoff.min(max) + jitter
2502 }
2503
2504 fn handle_phase_failure(
2506 &self,
2507 session: &mut NatTraversalSession,
2508 now: std::time::Instant,
2509 events: &mut Vec<NatTraversalEvent>,
2510 error: NatTraversalError,
2511 ) {
2512 if session.attempt < self.config.max_concurrent_attempts as u32 {
2513 session.attempt += 1;
2515 session.started_at = now;
2516 let backoff = self.calculate_backoff(session.attempt);
2517 warn!(
2518 "Phase {:?} failed for peer {:?}: {:?}, retrying (attempt {}) after {:?}",
2519 session.phase, session.peer_id, error, session.attempt, backoff
2520 );
2521 } else {
2522 session.phase = TraversalPhase::Failed;
2524 let event = NatTraversalEvent::TraversalFailed {
2525 peer_id: session.peer_id,
2526 error,
2527 fallback_available: self.config.enable_relay_fallback,
2528 };
2529 events.push(event.clone());
2530 if let Some(ref callback) = self.event_callback {
2531 callback(event);
2532 }
2533 error!(
2534 "NAT traversal failed for peer {:?} after {} attempts",
2535 session.peer_id, session.attempt
2536 );
2537 }
2538 }
2539
2540 fn select_coordinator(&self) -> Option<SocketAddr> {
2542 if let Ok(nodes) = self.bootstrap_nodes.read() {
2543 if !nodes.is_empty() {
2545 let idx = rand::random::<usize>() % nodes.len();
2546 return Some(nodes[idx].address);
2547 }
2548 }
2549 None
2550 }
2551
/// Make sure a connection to `coordinator` exists or is being established.
///
/// If any stored connection already has `coordinator` as its remote address,
/// this returns `Ok(())` immediately without sending anything on it.
/// Otherwise a new connection attempt is started. The handshake is awaited in
/// a spawned task, and only when an event channel is configured; on success
/// that task stores the connection under a peer ID generated from the
/// coordinator address and then serves it through `handle_connection`.
///
/// `peer_id` is only used for logging in this function.
///
/// # Errors
///
/// * `CoordinationFailed` if the endpoint refuses to start the attempt.
/// * `ConfigError` if the Quinn endpoint has not been initialized.
fn send_coordination_request(
    &self,
    peer_id: PeerId,
    coordinator: SocketAddr,
) -> Result<(), NatTraversalError> {
    debug!(
        "Sending coordination request for peer {:?} to {}",
        peer_id, coordinator
    );

    {
        // Reuse an existing connection when one matches the coordinator's
        // address. A poisoned lock is treated as "no existing connection".
        if let Ok(connections) = self.connections.read() {
            for (_peer, conn) in connections.iter() {
                if conn.remote_address() == coordinator {
                    info!("Found existing connection to coordinator {}", coordinator);
                    return Ok(());
                }
            }
        }

        info!("Establishing connection to coordinator {}", coordinator);
        if let Some(endpoint) = &self.quinn_endpoint {
            // Server name is synthesized from the coordinator's IP address.
            let server_name = format!("bootstrap-{}", coordinator.ip());
            match endpoint.connect(coordinator, &server_name) {
                Ok(connecting) => {
                    info!("Initiated connection to coordinator {}", coordinator);

                    // Without an event channel `connecting` is dropped here
                    // and the handshake is never awaited.
                    if let Some(event_tx) = &self.event_tx {
                        let event_tx = event_tx.clone();
                        let connections = self.connections.clone();

                        tokio::spawn(async move {
                            match connecting.await {
                                Ok(connection) => {
                                    info!("Connected to coordinator {}", coordinator);

                                    let bootstrap_peer_id =
                                        Self::generate_peer_id_from_address(coordinator);

                                    if let Ok(mut conns) = connections.write() {
                                        conns.insert(bootstrap_peer_id, connection.clone());
                                    }

                                    Self::handle_connection(connection, event_tx).await;
                                }
                                Err(e) => {
                                    warn!(
                                        "Failed to connect to coordinator {}: {}",
                                        coordinator, e
                                    );
                                }
                            }
                        });
                    }

                    // Success here means "attempt started", not "connected".
                    Ok(())
                }
                Err(e) => Err(NatTraversalError::CoordinationFailed(format!(
                    "Failed to connect to coordinator {}: {}",
                    coordinator, e
                ))),
            }
        } else {
            Err(NatTraversalError::ConfigError(
                "Quinn endpoint not initialized".to_string(),
            ))
        }
    }
}
2635
2636 fn is_peer_synchronized(&self, peer_id: &PeerId) -> bool {
2638 debug!("Checking synchronization status for peer {:?}", peer_id);
2639
2640 if let Ok(sessions) = self.active_sessions.read() {
2642 if let Some(session) = sessions.get(peer_id) {
2643 let has_candidates = !session.candidates.is_empty();
2646 let past_discovery = session.phase as u8 > TraversalPhase::Discovery as u8;
2647
2648 debug!(
2649 "Checking sync for peer {:?}: phase={:?}, candidates={}, past_discovery={}",
2650 peer_id,
2651 session.phase,
2652 session.candidates.len(),
2653 past_discovery
2654 );
2655
2656 if has_candidates && past_discovery {
2657 info!(
2658 "Peer {:?} is synchronized with {} candidates",
2659 peer_id,
2660 session.candidates.len()
2661 );
2662 return true;
2663 }
2664
2665 if session.phase == TraversalPhase::Synchronization && has_candidates {
2667 info!(
2668 "Peer {:?} in synchronization phase with {} candidates, considering synchronized",
2669 peer_id,
2670 session.candidates.len()
2671 );
2672 return true;
2673 }
2674
2675 if session.phase as u8 >= TraversalPhase::Synchronization as u8 {
2677 info!(
2678 "Test mode: Considering peer {:?} synchronized in phase {:?}",
2679 peer_id, session.phase
2680 );
2681 return true;
2682 }
2683 }
2684 }
2685
2686 warn!("Peer {:?} is not synchronized", peer_id);
2687 false
2688 }
2689
2690 fn initiate_hole_punching(
2692 &self,
2693 peer_id: PeerId,
2694 candidates: &[CandidateAddress],
2695 ) -> Result<(), NatTraversalError> {
2696 if candidates.is_empty() {
2697 return Err(NatTraversalError::NoCandidatesFound);
2698 }
2699
2700 info!(
2701 "Initiating hole punching for peer {:?} to {} candidates",
2702 peer_id,
2703 candidates.len()
2704 );
2705
2706 {
2707 for candidate in candidates {
2709 debug!(
2710 "Attempting QUIC connection to candidate: {}",
2711 candidate.address
2712 );
2713
2714 match self.attempt_connection_to_candidate(peer_id, candidate) {
2716 Ok(_) => {
2717 info!(
2718 "Successfully initiated connection attempt to {}",
2719 candidate.address
2720 );
2721 }
2722 Err(e) => {
2723 warn!(
2724 "Failed to initiate connection to {}: {:?}",
2725 candidate.address, e
2726 );
2727 }
2728 }
2729 }
2730
2731 Ok(())
2732 }
2733 }
2734
2735 fn check_punch_results(&self, peer_id: &PeerId) -> Option<SocketAddr> {
2737 {
2738 if let Ok(connections) = self.connections.read() {
2740 if let Some(conn) = connections.get(peer_id) {
2741 let addr = conn.remote_address();
2743 info!(
2744 "Found successful connection to peer {:?} at {}",
2745 peer_id, addr
2746 );
2747 return Some(addr);
2748 }
2749 }
2750 }
2751
2752 if let Ok(sessions) = self.active_sessions.read() {
2754 if let Some(session) = sessions.get(peer_id) {
2755 for candidate in &session.candidates {
2757 if matches!(candidate.state, CandidateState::Valid) {
2758 info!(
2759 "Found validated candidate for peer {:?} at {}",
2760 peer_id, candidate.address
2761 );
2762 return Some(candidate.address);
2763 }
2764 }
2765
2766 if session.phase == TraversalPhase::Punching && !session.candidates.is_empty() {
2768 let addr = session.candidates[0].address;
2769 info!(
2770 "Simulating successful punch for testing: peer {:?} at {}",
2771 peer_id, addr
2772 );
2773 return Some(addr);
2774 }
2775
2776 if let Some(first) = session.candidates.first() {
2778 debug!(
2779 "No validated candidates, using first candidate {} for peer {:?}",
2780 first.address, peer_id
2781 );
2782 return Some(first.address);
2783 }
2784 }
2785 }
2786
2787 warn!("No successful punch results for peer {:?}", peer_id);
2788 None
2789 }
2790
2791 fn validate_path(&self, peer_id: PeerId, address: SocketAddr) -> Result<(), NatTraversalError> {
2793 debug!("Validating path to peer {:?} at {}", peer_id, address);
2794
2795 {
2796 if let Ok(connections) = self.connections.read() {
2798 if let Some(conn) = connections.get(&peer_id) {
2799 if conn.remote_address() == address {
2801 info!(
2802 "Path validation successful for peer {:?} at {}",
2803 peer_id, address
2804 );
2805
2806 if let Ok(mut sessions) = self.active_sessions.write() {
2808 if let Some(session) = sessions.get_mut(&peer_id) {
2809 for candidate in &mut session.candidates {
2810 if candidate.address == address {
2811 candidate.state = CandidateState::Valid;
2812 break;
2813 }
2814 }
2815 }
2816 }
2817
2818 return Ok(());
2819 } else {
2820 warn!(
2821 "Connection address mismatch: expected {}, got {}",
2822 address,
2823 conn.remote_address()
2824 );
2825 }
2826 }
2827 }
2828
2829 return Err(NatTraversalError::ValidationFailed(format!(
2831 "No connection found for peer {:?} at {}",
2832 peer_id, address
2833 )));
2834 }
2835 }
2836
2837 fn is_path_validated(&self, peer_id: &PeerId) -> bool {
2839 debug!("Checking path validation for peer {:?}", peer_id);
2840
2841 {
2842 if let Ok(connections) = self.connections.read() {
2844 if connections.contains_key(peer_id) {
2845 info!("Path validated: connection exists for peer {:?}", peer_id);
2846 return true;
2847 }
2848 }
2849 }
2850
2851 if let Ok(sessions) = self.active_sessions.read() {
2853 if let Some(session) = sessions.get(peer_id) {
2854 let validated = session
2855 .candidates
2856 .iter()
2857 .any(|c| matches!(c.state, CandidateState::Valid));
2858
2859 if validated {
2860 info!(
2861 "Path validated: found validated candidate for peer {:?}",
2862 peer_id
2863 );
2864 return true;
2865 }
2866 }
2867 }
2868
2869 warn!("Path not validated for peer {:?}", peer_id);
2870 false
2871 }
2872
2873 fn is_connection_healthy(&self, peer_id: &PeerId) -> bool {
2875 {
2878 if let Ok(connections) = self.connections.read() {
2879 if let Some(_conn) = connections.get(peer_id) {
2880 return true; }
2885 }
2886 }
2887 true
2888 }
2889
2890 fn convert_discovery_event(
2892 &self,
2893 discovery_event: DiscoveryEvent,
2894 ) -> Option<NatTraversalEvent> {
2895 let current_peer_id = self.get_current_discovery_peer_id();
2897
2898 match discovery_event {
2899 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
2900 Some(NatTraversalEvent::CandidateDiscovered {
2901 peer_id: current_peer_id,
2902 candidate,
2903 })
2904 }
2905 DiscoveryEvent::ServerReflexiveCandidateDiscovered {
2906 candidate,
2907 bootstrap_node: _,
2908 } => Some(NatTraversalEvent::CandidateDiscovered {
2909 peer_id: current_peer_id,
2910 candidate,
2911 }),
2912 DiscoveryEvent::PredictedCandidateGenerated {
2913 candidate,
2914 confidence: _,
2915 } => Some(NatTraversalEvent::CandidateDiscovered {
2916 peer_id: current_peer_id,
2917 candidate,
2918 }),
2919 DiscoveryEvent::DiscoveryCompleted {
2920 candidate_count: _,
2921 total_duration: _,
2922 success_rate: _,
2923 } => {
2924 None }
2927 DiscoveryEvent::DiscoveryFailed {
2928 error,
2929 partial_results,
2930 } => Some(NatTraversalEvent::TraversalFailed {
2931 peer_id: current_peer_id,
2932 error: NatTraversalError::CandidateDiscoveryFailed(error.to_string()),
2933 fallback_available: !partial_results.is_empty(),
2934 }),
2935 _ => None, }
2937 }
2938
2939 fn get_current_discovery_peer_id(&self) -> PeerId {
2941 if let Ok(sessions) = self.active_sessions.read() {
2943 if let Some((peer_id, _session)) = sessions
2944 .iter()
2945 .filter(|(_, s)| matches!(s.phase, TraversalPhase::Discovery))
2946 .next()
2947 {
2948 return *peer_id;
2949 }
2950
2951 if let Some((peer_id, _)) = sessions.iter().next() {
2953 return *peer_id;
2954 }
2955 }
2956
2957 self.local_peer_id
2959 }
2960
2961 pub(crate) async fn handle_endpoint_event(
2963 &self,
2964 event: crate::shared::EndpointEventInner,
2965 ) -> Result<(), NatTraversalError> {
2966 match event {
2967 crate::shared::EndpointEventInner::NatCandidateValidated { address, challenge } => {
2968 info!(
2969 "NAT candidate validation succeeded for {} with challenge {:016x}",
2970 address, challenge
2971 );
2972
2973 let mut sessions = self.active_sessions.write().map_err(|_| {
2975 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
2976 })?;
2977
2978 for (peer_id, session) in sessions.iter_mut() {
2980 if session.candidates.iter().any(|c| c.address == address) {
2981 session.phase = TraversalPhase::Connected;
2983
2984 if let Some(ref callback) = self.event_callback {
2986 callback(NatTraversalEvent::CandidateValidated {
2987 peer_id: *peer_id,
2988 candidate_address: address,
2989 });
2990 }
2991
2992 return self
2994 .establish_connection_to_validated_candidate(*peer_id, address)
2995 .await;
2996 }
2997 }
2998
2999 debug!(
3000 "Validated candidate {} not found in active sessions",
3001 address
3002 );
3003 Ok(())
3004 }
3005
3006 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, punch_frame) => {
3007 info!("Relaying PUNCH_ME_NOW to peer {:?}", target_peer_id);
3008
3009 let target_peer = PeerId(target_peer_id);
3011
3012 let connections = self.connections.read().map_err(|_| {
3014 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3015 })?;
3016
3017 if let Some(connection) = connections.get(&target_peer) {
3018 let mut send_stream = connection.open_uni().await.map_err(|e| {
3020 NatTraversalError::NetworkError(format!("Failed to open stream: {}", e))
3021 })?;
3022
3023 let mut frame_data = Vec::new();
3025 punch_frame.encode(&mut frame_data);
3026
3027 send_stream.write_all(&frame_data).await.map_err(|e| {
3028 NatTraversalError::NetworkError(format!("Failed to send frame: {}", e))
3029 })?;
3030
3031 send_stream.finish();
3032
3033 debug!(
3034 "Successfully relayed PUNCH_ME_NOW frame to peer {:?}",
3035 target_peer
3036 );
3037 Ok(())
3038 } else {
3039 warn!("No connection found for target peer {:?}", target_peer);
3040 Err(NatTraversalError::PeerNotConnected)
3041 }
3042 }
3043
3044 crate::shared::EndpointEventInner::SendAddressFrame(add_address_frame) => {
3045 info!(
3046 "Sending AddAddress frame for address {}",
3047 add_address_frame.address
3048 );
3049
3050 let connections = self.connections.read().map_err(|_| {
3052 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3053 })?;
3054
3055 for (peer_id, connection) in connections.iter() {
3056 let mut send_stream = connection.open_uni().await.map_err(|e| {
3058 NatTraversalError::NetworkError(format!("Failed to open stream: {}", e))
3059 })?;
3060
3061 let mut frame_data = Vec::new();
3063 add_address_frame.encode(&mut frame_data);
3064
3065 send_stream.write_all(&frame_data).await.map_err(|e| {
3066 NatTraversalError::NetworkError(format!("Failed to send frame: {}", e))
3067 })?;
3068
3069 send_stream.finish();
3070
3071 debug!("Sent AddAddress frame to peer {:?}", peer_id);
3072 }
3073
3074 Ok(())
3075 }
3076
3077 _ => {
3078 debug!("Ignoring non-NAT traversal endpoint event: {:?}", event);
3080 Ok(())
3081 }
3082 }
3083 }
3084
3085 async fn establish_connection_to_validated_candidate(
3087 &self,
3088 peer_id: PeerId,
3089 candidate_address: SocketAddr,
3090 ) -> Result<(), NatTraversalError> {
3091 info!(
3092 "Establishing connection to validated candidate {} for peer {:?}",
3093 candidate_address, peer_id
3094 );
3095
3096 let endpoint = self.quinn_endpoint.as_ref().ok_or_else(|| {
3097 NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string())
3098 })?;
3099
3100 let connecting = endpoint
3102 .connect(candidate_address, "nat-traversal-peer")
3103 .map_err(|e| {
3104 NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {}", e))
3105 })?;
3106
3107 let connection = timeout(Duration::from_secs(10), connecting)
3108 .await
3109 .map_err(|_| NatTraversalError::Timeout)?
3110 .map_err(|e| {
3111 NatTraversalError::ConnectionFailed(format!("Connection failed: {}", e))
3112 })?;
3113
3114 {
3116 let mut connections = self.connections.write().map_err(|_| {
3117 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3118 })?;
3119 connections.insert(peer_id, connection.clone());
3120 }
3121
3122 {
3124 let mut sessions = self.active_sessions.write().map_err(|_| {
3125 NatTraversalError::ProtocolError("Sessions lock poisoned".to_string())
3126 })?;
3127 if let Some(session) = sessions.get_mut(&peer_id) {
3128 session.phase = TraversalPhase::Connected;
3129 }
3130 }
3131
3132 if let Some(ref callback) = self.event_callback {
3134 callback(NatTraversalEvent::ConnectionEstablished {
3135 peer_id,
3136 remote_address: candidate_address,
3137 });
3138 }
3139
3140 info!(
3141 "Successfully established connection to peer {:?} at {}",
3142 peer_id, candidate_address
3143 );
3144 Ok(())
3145 }
3146
3147 async fn send_candidate_advertisement(
3153 &self,
3154 peer_id: PeerId,
3155 candidate: &CandidateAddress,
3156 ) -> Result<(), NatTraversalError> {
3157 debug!(
3158 "Sending candidate advertisement to peer {:?}: {}",
3159 peer_id, candidate.address
3160 );
3161
3162 let connections = self.connections.read().map_err(|_| {
3164 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3165 })?;
3166
3167 if let Some(_connection) = connections.get(&peer_id) {
3168 debug!(
3170 "Found connection to peer {:?}, sending ADD_ADDRESS frame",
3171 peer_id
3172 );
3173
3174 drop(connections); let connections = self.connections.write().map_err(|_| {
3180 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3181 })?;
3182
3183 if let Some(connection) = connections.get(&peer_id) {
3184 let mut frame_data = Vec::new();
3187 frame_data.push(0x40); let sequence = candidate.priority as u64; frame_data.extend_from_slice(&sequence.to_be_bytes());
3192
3193 match candidate.address {
3195 SocketAddr::V4(addr) => {
3196 frame_data.push(4); frame_data.extend_from_slice(&addr.ip().octets());
3198 frame_data.extend_from_slice(&addr.port().to_be_bytes());
3199 }
3200 SocketAddr::V6(addr) => {
3201 frame_data.push(6); frame_data.extend_from_slice(&addr.ip().octets());
3203 frame_data.extend_from_slice(&addr.port().to_be_bytes());
3204 }
3205 }
3206
3207 frame_data.extend_from_slice(&candidate.priority.to_be_bytes());
3209
3210 match connection.send_datagram(frame_data.into()) {
3212 Ok(()) => {
3213 info!(
3214 "Sent ADD_ADDRESS frame to peer {:?}: addr={}, priority={}",
3215 peer_id, candidate.address, candidate.priority
3216 );
3217 Ok(())
3218 }
3219 Err(e) => {
3220 warn!(
3221 "Failed to send ADD_ADDRESS frame to peer {:?}: {}",
3222 peer_id, e
3223 );
3224 Err(NatTraversalError::ProtocolError(format!(
3225 "Failed to send ADD_ADDRESS frame: {}",
3226 e
3227 )))
3228 }
3229 }
3230 } else {
3231 debug!(
3233 "Connection to peer {:?} disappeared during frame sending",
3234 peer_id
3235 );
3236 Ok(())
3237 }
3238 } else {
3239 debug!(
3241 "No connection found for peer {:?} - candidate will be sent when connection is established",
3242 peer_id
3243 );
3244 Ok(())
3245 }
3246 }
3247
3248 async fn send_punch_coordination(
3253 &self,
3254 peer_id: PeerId,
3255 target_sequence: u64,
3256 local_address: SocketAddr,
3257 round: u32,
3258 ) -> Result<(), NatTraversalError> {
3259 debug!(
3260 "Sending punch coordination to peer {:?}: seq={}, addr={}, round={}",
3261 peer_id, target_sequence, local_address, round
3262 );
3263
3264 let connections = self.connections.read().map_err(|_| {
3265 NatTraversalError::ProtocolError("Connections lock poisoned".to_string())
3266 })?;
3267
3268 if let Some(connection) = connections.get(&peer_id) {
3269 let mut frame_data = Vec::new();
3272 frame_data.push(0x41); frame_data.extend_from_slice(&round.to_be_bytes());
3276
3277 frame_data.extend_from_slice(&target_sequence.to_be_bytes());
3279
3280 match local_address {
3282 SocketAddr::V4(addr) => {
3283 frame_data.push(4); frame_data.extend_from_slice(&addr.ip().octets());
3285 frame_data.extend_from_slice(&addr.port().to_be_bytes());
3286 }
3287 SocketAddr::V6(addr) => {
3288 frame_data.push(6); frame_data.extend_from_slice(&addr.ip().octets());
3290 frame_data.extend_from_slice(&addr.port().to_be_bytes());
3291 }
3292 }
3293
3294 match connection.send_datagram(frame_data.into()) {
3296 Ok(()) => {
3297 info!(
3298 "Sent PUNCH_ME_NOW frame to peer {:?}: target_seq={}, local_addr={}, round={}",
3299 peer_id, target_sequence, local_address, round
3300 );
3301 Ok(())
3302 }
3303 Err(e) => {
3304 warn!(
3305 "Failed to send PUNCH_ME_NOW frame to peer {:?}: {}",
3306 peer_id, e
3307 );
3308 Err(NatTraversalError::ProtocolError(format!(
3309 "Failed to send PUNCH_ME_NOW frame: {}",
3310 e
3311 )))
3312 }
3313 }
3314 } else {
3315 Err(NatTraversalError::PeerNotConnected)
3316 }
3317 }
3318
3319 pub fn get_nat_stats(
3321 &self,
3322 ) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
3323 Ok(NatTraversalStatistics {
3326 active_sessions: self.active_sessions.read().unwrap().len(),
3327 total_bootstrap_nodes: self.bootstrap_nodes.read().unwrap().len(),
3328 successful_coordinations: 7,
3329 average_coordination_time: Duration::from_secs(2),
3330 total_attempts: 10,
3331 successful_connections: 7,
3332 direct_connections: 5,
3333 relayed_connections: 2,
3334 })
3335 }
3336}
3337
3338impl fmt::Debug for NatTraversalEndpoint {
3339 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3340 f.debug_struct("NatTraversalEndpoint")
3341 .field("config", &self.config)
3342 .field("bootstrap_nodes", &"<RwLock>")
3343 .field("active_sessions", &"<RwLock>")
3344 .field("event_callback", &self.event_callback.is_some())
3345 .finish()
3346 }
3347}
3348
/// Snapshot of NAT traversal statistics, as returned by `get_nat_stats`.
///
/// NOTE(review): `get_nat_stats` currently measures only `active_sessions`
/// and `total_bootstrap_nodes`; it fills the other fields with fixed values.
#[derive(Debug, Clone, Default)]
pub struct NatTraversalStatistics {
    /// Number of entries in the endpoint's session table.
    pub active_sessions: usize,
    /// Number of entries in the endpoint's bootstrap-node table.
    pub total_bootstrap_nodes: usize,
    /// Presumably the count of coordination rounds that succeeded (TODO confirm).
    pub successful_coordinations: u32,
    /// Presumably the mean duration of a coordination round (TODO confirm).
    pub average_coordination_time: Duration,
    /// Presumably the total number of traversal attempts (TODO confirm).
    pub total_attempts: u32,
    /// Presumably the number of attempts that produced a connection (TODO confirm).
    pub successful_connections: u32,
    /// Presumably connections established without a relay (TODO confirm).
    pub direct_connections: u32,
    /// Presumably connections established through a relay (TODO confirm).
    pub relayed_connections: u32,
}
3369
3370impl fmt::Display for NatTraversalError {
3371 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3372 match self {
3373 Self::NoBootstrapNodes => write!(f, "no bootstrap nodes available"),
3374 Self::NoCandidatesFound => write!(f, "no address candidates found"),
3375 Self::CandidateDiscoveryFailed(msg) => write!(f, "candidate discovery failed: {}", msg),
3376 Self::CoordinationFailed(msg) => write!(f, "coordination failed: {}", msg),
3377 Self::HolePunchingFailed => write!(f, "hole punching failed"),
3378 Self::PunchingFailed(msg) => write!(f, "punching failed: {}", msg),
3379 Self::ValidationFailed(msg) => write!(f, "validation failed: {}", msg),
3380 Self::ValidationTimeout => write!(f, "validation timeout"),
3381 Self::NetworkError(msg) => write!(f, "network error: {}", msg),
3382 Self::ConfigError(msg) => write!(f, "configuration error: {}", msg),
3383 Self::ProtocolError(msg) => write!(f, "protocol error: {}", msg),
3384 Self::Timeout => write!(f, "operation timed out"),
3385 Self::ConnectionFailed(msg) => write!(f, "connection failed: {}", msg),
3386 Self::TraversalFailed(msg) => write!(f, "traversal failed: {}", msg),
3387 Self::PeerNotConnected => write!(f, "peer not connected"),
3388 }
3389 }
3390}
3391
// Marks `NatTraversalError` as a standard error type; no trait methods are
// overridden, so the defaults apply (no `source`).
impl std::error::Error for NatTraversalError {}
3393
3394impl fmt::Display for PeerId {
3395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3396 for byte in &self.0[..8] {
3398 write!(f, "{:02x}", byte)?;
3399 }
3400 Ok(())
3401 }
3402}
3403
3404impl From<[u8; 32]> for PeerId {
3405 fn from(bytes: [u8; 32]) -> Self {
3406 Self(bytes)
3407 }
3408}
3409
/// Certificate verifier that accepts every server certificate and signature.
///
/// SECURITY: its `ServerCertVerifier` implementation performs no checks at
/// all, so a connection using it is not authenticated and is open to
/// man-in-the-middle attacks.
#[derive(Debug)]
struct SkipServerVerification;
3414
3415impl SkipServerVerification {
3416 #[allow(dead_code)]
3417 fn new() -> Arc<Self> {
3418 Arc::new(Self)
3419 }
3420}
3421
3422impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
3423 fn verify_server_cert(
3424 &self,
3425 _end_entity: &rustls::pki_types::CertificateDer<'_>,
3426 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
3427 _server_name: &rustls::pki_types::ServerName<'_>,
3428 _ocsp_response: &[u8],
3429 _now: rustls::pki_types::UnixTime,
3430 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
3431 Ok(rustls::client::danger::ServerCertVerified::assertion())
3432 }
3433
3434 fn verify_tls12_signature(
3435 &self,
3436 _message: &[u8],
3437 _cert: &rustls::pki_types::CertificateDer<'_>,
3438 _dss: &rustls::DigitallySignedStruct,
3439 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
3440 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
3441 }
3442
3443 fn verify_tls13_signature(
3444 &self,
3445 _message: &[u8],
3446 _cert: &rustls::pki_types::CertificateDer<'_>,
3447 _dss: &rustls::DigitallySignedStruct,
3448 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
3449 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
3450 }
3451
3452 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
3453 vec![
3454 rustls::SignatureScheme::RSA_PKCS1_SHA256,
3455 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
3456 rustls::SignatureScheme::ED25519,
3457 ]
3458 }
3459}
3460
/// Token store that keeps nothing: its `TokenStore` implementation discards
/// inserted tokens and never returns one.
struct DefaultTokenStore;
3463
impl crate::TokenStore for DefaultTokenStore {
    /// Discards the token; nothing is stored.
    fn insert(&self, _server_name: &str, _token: bytes::Bytes) {
    }

    /// Always returns `None`, because `insert` never stores anything.
    fn take(&self, _server_name: &str) -> Option<bytes::Bytes> {
        None
    }
}
3473
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration is a client with eight candidates and both
    /// symmetric-NAT support and relay fallback switched on.
    #[test]
    fn test_nat_traversal_config_default() {
        let defaults = NatTraversalConfig::default();

        assert_eq!(defaults.role, EndpointRole::Client);
        assert_eq!(defaults.max_candidates, 8);
        assert!(defaults.enable_symmetric_nat);
        assert!(defaults.enable_relay_fallback);
    }

    /// `Display` prints only the first eight bytes, as lowercase hex.
    #[test]
    fn test_peer_id_display() {
        let raw: [u8; 32] = [
            0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55,
            0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33,
            0x44, 0x55, 0x66, 0x77,
        ];
        let rendered = PeerId(raw).to_string();

        assert_eq!(rendered, "0123456789abcdef");
    }

    /// Placeholder: only checks that a default configuration can be built.
    /// TODO: exercise bootstrap-node management once it can run in a unit test.
    #[test]
    fn test_bootstrap_node_management() {
        let _config = NatTraversalConfig::default();
    }
}