1use std::{
8 collections::HashMap,
9 fmt,
10 net::SocketAddr,
11 sync::Arc,
12 time::Duration,
13};
14
15use tracing::{debug, info, warn, error};
16
17use std::sync::atomic::{AtomicBool, Ordering};
18
19use tokio::{
20 net::UdpSocket,
21 sync::mpsc,
22 time::{sleep, timeout},
23};
24
25#[cfg(feature = "runtime-tokio")]
26use crate::quinn_high_level::TokioRuntime;
27
28use crate::{
29 candidate_discovery::{CandidateDiscoveryManager, DiscoveryConfig, DiscoveryEvent},
30 connection::nat_traversal::{CandidateSource, CandidateState, NatTraversalRole},
31 VarInt,
32};
33
34use crate::{
35 quinn_high_level::{Endpoint as QuinnEndpoint, Connection as QuinnConnection},
36 EndpointConfig,
37 ServerConfig,
38 ClientConfig,
39 ConnectionError,
40 TransportConfig,
41 crypto::rustls::QuicServerConfig,
42 crypto::rustls::QuicClientConfig,
43};
44
45use crate::config::validation::{ConfigValidator, ValidationResult};
46
47use crate::crypto::certificate_manager::{CertificateManager, CertificateConfig};
48
/// A NAT traversal endpoint.
///
/// Wraps a QUIC endpoint together with the shared state that the public API
/// and the background tasks (connection acceptance, discovery polling) use.
pub struct NatTraversalEndpoint {
    /// The underlying QUIC endpoint. Constructors set this to `Some`; methods
    /// return `ConfigError("Quinn endpoint not initialized")` when it is `None`.
    quinn_endpoint: Option<QuinnEndpoint>,
    /// Configuration the endpoint was created with.
    config: NatTraversalConfig,
    /// Known bootstrap (coordinator) nodes, shared behind a lock.
    bootstrap_nodes: Arc<std::sync::RwLock<Vec<BootstrapNode>>>,
    /// In-progress traversal sessions, keyed by the remote peer id.
    active_sessions: Arc<std::sync::RwLock<HashMap<PeerId, NatTraversalSession>>>,
    /// Candidate discovery state, polled periodically by a background task.
    discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
    /// Optional user callback, invoked synchronously for selected events.
    event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
    /// Shutdown flag that background loops check between iterations.
    shutdown: Arc<AtomicBool>,
    /// Sender for internal events.
    // NOTE(review): the receiver created alongside this sender in
    // `create_quinn_endpoint` is dropped immediately, so sends currently fail
    // silently — TODO confirm and wire up a consumer.
    event_tx: Option<mpsc::UnboundedSender<NatTraversalEvent>>,
    /// Established QUIC connections, keyed by peer id.
    connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
    /// This endpoint's own peer id.
    local_peer_id: PeerId,
}
78
/// Configuration for a [`NatTraversalEndpoint`].
///
/// `ConfigValidator::validate` checks it before an endpoint is built.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NatTraversalConfig {
    /// Role this endpoint plays. Client endpoints, and servers that can
    /// coordinate, must list at least one bootstrap node.
    pub role: EndpointRole,
    /// Addresses of bootstrap nodes used for coordination and discovery.
    pub bootstrap_nodes: Vec<SocketAddr>,
    /// Maximum number of candidate addresses; validated against 1..256.
    pub max_candidates: usize,
    /// Coordination timeout, also used as the discovery total timeout.
    /// Validated against the range 100 ms .. 300 s.
    pub coordination_timeout: Duration,
    /// Enables symmetric-NAT port prediction during candidate discovery.
    pub enable_symmetric_nat: bool,
    /// Whether relay fallback is allowed; must be `false` for the bootstrap role.
    pub enable_relay_fallback: bool,
    /// Maximum concurrent traversal attempts; validated against 1..16 and
    /// must not exceed `max_candidates`.
    pub max_concurrent_attempts: usize,
    /// Local address to bind; `None` binds to `0.0.0.0:0` (any interface,
    /// OS-chosen port).
    pub bind_addr: Option<SocketAddr>,
}
99
/// The role a NAT traversal endpoint plays in the network.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum EndpointRole {
    /// Outbound-only endpoint; requires at least one bootstrap node.
    Client,
    /// Endpoint that accepts connections. When `can_coordinate` is true it
    /// requires bootstrap nodes and is advertised as able to relay.
    Server { can_coordinate: bool },
    /// Bootstrap/coordinator node; accepts connections and must not enable
    /// relay fallback.
    Bootstrap,
}
110
111impl EndpointRole {
112 pub fn name(&self) -> &'static str {
114 match self {
115 EndpointRole::Client => "client",
116 EndpointRole::Server { .. } => "server",
117 EndpointRole::Bootstrap => "bootstrap",
118 }
119 }
120}
121
/// Opaque 32-byte peer identifier; totally ordered and hashable, so it can be
/// used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub struct PeerId(pub [u8; 32]);
125
/// A known bootstrap (coordinator) node and what we have observed about it.
#[derive(Debug, Clone)]
pub struct BootstrapNode {
    /// Network address of the node.
    pub address: SocketAddr,
    /// When the node was last seen (creation time for fresh entries).
    pub last_seen: std::time::Instant,
    /// Whether the node is believed able to coordinate traversals.
    pub can_coordinate: bool,
    /// Last measured round-trip time, if any.
    pub rtt: Option<Duration>,
    /// Number of coordinations performed through this node.
    pub coordination_count: u32,
}

impl BootstrapNode {
    /// Builds a fresh entry for `address`: assumed coordination-capable, last
    /// seen "now", with no RTT sample and a zero coordination count.
    pub fn new(address: SocketAddr) -> Self {
        let last_seen = std::time::Instant::now();
        Self {
            address,
            last_seen,
            can_coordinate: true,
            rtt: None,
            coordination_count: 0,
        }
    }
}
153
/// A local/remote candidate pairing considered for connectivity checks.
#[derive(Debug, Clone)]
pub struct CandidatePair {
    /// Our side of the pair.
    pub local_candidate: CandidateAddress,
    /// The peer's side of the pair.
    pub remote_candidate: CandidateAddress,
    /// Pair priority (presumably higher is tried first — TODO confirm).
    pub priority: u64,
    /// Current check state of the pair.
    pub state: CandidatePairState,
}

/// Lifecycle of a candidate pair check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidatePairState {
    /// Not yet attempted.
    Waiting,
    /// Check currently running.
    InProgress,
    /// Check completed successfully.
    Succeeded,
    /// Check failed.
    Failed,
    /// Check abandoned before completion.
    Cancelled,
}
181
/// Bookkeeping for one NAT traversal attempt towards a remote peer.
#[derive(Debug)]
struct NatTraversalSession {
    /// Remote peer being traversed to.
    peer_id: PeerId,
    /// Coordinator address supplied when the traversal was initiated.
    coordinator: SocketAddr,
    /// Attempt number (new sessions start at 1).
    attempt: u32,
    /// When this session was created.
    started_at: std::time::Instant,
    /// Current traversal phase (new sessions start in `Discovery`).
    phase: TraversalPhase,
    /// Candidate addresses gathered for this session (initially empty).
    candidates: Vec<CandidateAddress>,
    /// Connection-level state driven by `poll_sessions`.
    session_state: SessionState,
}
200
/// Connection-level state of a traversal session.
#[derive(Debug, Clone)]
pub struct SessionState {
    /// Current connection state.
    pub state: ConnectionState,
    /// When `state` last changed; used for timeouts in `poll_sessions`.
    pub last_transition: std::time::Instant,
    /// The established QUIC connection, once there is one.
    pub connection: Option<QuinnConnection>,
    /// Outstanding attempts: target address and when each attempt started.
    pub active_attempts: Vec<(SocketAddr, std::time::Instant)>,
    /// Collected connection metrics.
    pub metrics: ConnectionMetrics,
}

/// Connection state of a session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection activity.
    Idle,
    /// Connection being established.
    Connecting,
    /// Connection established.
    Connected,
    /// Connection path migration in progress.
    Migrating,
    /// Connection closed or abandoned.
    Closed,
}

/// Per-session connection metrics.
#[derive(Debug, Clone, Default)]
pub struct ConnectionMetrics {
    /// Round-trip time, if measured.
    pub rtt: Option<Duration>,
    /// Packet loss rate (assumed 0.0..=1.0 — TODO confirm).
    pub loss_rate: f64,
    /// Bytes sent on the connection.
    pub bytes_sent: u64,
    /// Bytes received on the connection.
    pub bytes_received: u64,
    /// Last time activity was recorded.
    pub last_activity: Option<std::time::Instant>,
}

/// A state transition reported by `poll_sessions`.
#[derive(Debug, Clone)]
pub struct SessionStateUpdate {
    /// Peer whose session changed.
    pub peer_id: PeerId,
    /// State before the transition.
    pub old_state: ConnectionState,
    /// State after the transition.
    pub new_state: ConnectionState,
    /// Why the transition happened.
    pub reason: StateChangeReason,
}

/// Reason attached to a session state transition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StateChangeReason {
    /// A state lasted longer than its allowed time.
    Timeout,
    /// A connection became available.
    ConnectionEstablished,
    /// The connection closed.
    ConnectionClosed,
    /// Migration finished with a usable connection.
    MigrationComplete,
    /// Migration finished without a usable connection.
    MigrationFailed,
    /// A network error occurred.
    NetworkError,
    /// The user closed the connection.
    UserClosed,
}
278
/// Phase of a NAT traversal session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversalPhase {
    /// Gathering candidate addresses (initial phase of new sessions).
    Discovery,
    /// Exchanging candidates via a coordinator.
    Coordination,
    /// Synchronizing hole-punching timing with the peer.
    Synchronization,
    /// Sending hole-punching packets.
    Punching,
    /// Validating discovered paths.
    Validation,
    /// Traversal finished with a connection.
    Connected,
    /// Traversal failed.
    Failed,
}

/// A candidate address together with its priority, origin and check state.
#[derive(Debug, Clone)]
pub struct CandidateAddress {
    /// The candidate socket address.
    pub address: SocketAddr,
    /// Candidate priority.
    pub priority: u32,
    /// How the candidate was obtained.
    pub source: CandidateSource,
    /// Validation state of the candidate.
    pub state: CandidateState,
}
310
/// Events reported by the NAT traversal endpoint.
#[derive(Debug, Clone)]
pub enum NatTraversalEvent {
    /// A candidate address was discovered for `peer_id`.
    CandidateDiscovered {
        peer_id: PeerId,
        candidate: CandidateAddress,
    },
    /// Traversal to `peer_id` was requested via `coordinator`
    /// (emitted to the callback by `initiate_nat_traversal`).
    CoordinationRequested {
        peer_id: PeerId,
        coordinator: SocketAddr,
    },
    /// Coordination round `round_id` was synchronized with `peer_id`.
    CoordinationSynchronized {
        peer_id: PeerId,
        round_id: VarInt,
    },
    /// Hole punching towards `targets` began.
    HolePunchingStarted {
        peer_id: PeerId,
        targets: Vec<SocketAddr>,
    },
    /// A path to `address` was validated with the measured `rtt`.
    PathValidated {
        peer_id: PeerId,
        address: SocketAddr,
        rtt: Duration,
    },
    /// A candidate address was validated.
    CandidateValidated {
        peer_id: PeerId,
        candidate_address: SocketAddr,
    },
    /// Traversal succeeded on `final_address` after `total_time`.
    TraversalSucceeded {
        peer_id: PeerId,
        final_address: SocketAddr,
        total_time: Duration,
    },
    /// A QUIC connection was established (inbound accept or outbound connect).
    ConnectionEstablished {
        peer_id: PeerId,
        remote_address: SocketAddr,
    },
    /// Traversal failed; `fallback_available` says whether another route exists.
    TraversalFailed {
        peer_id: PeerId,
        error: NatTraversalError,
        fallback_available: bool,
    },
    /// An established connection was lost, with a human-readable reason.
    ConnectionLost {
        peer_id: PeerId,
        reason: String,
    },
    /// A session moved between traversal phases.
    PhaseTransition {
        peer_id: PeerId,
        from_phase: TraversalPhase,
        to_phase: TraversalPhase,
    },
    /// A session's connection state changed (emitted by `poll_sessions`).
    SessionStateChanged {
        peer_id: PeerId,
        new_state: ConnectionState,
    },
}
380
/// Errors produced by NAT traversal operations.
#[derive(Debug, Clone)]
pub enum NatTraversalError {
    /// No bootstrap nodes are available.
    NoBootstrapNodes,
    /// Discovery produced no candidates.
    NoCandidatesFound,
    /// Candidate discovery could not be started or failed.
    CandidateDiscoveryFailed(String),
    /// Coordination with the peer failed.
    CoordinationFailed(String),
    /// Hole punching failed.
    HolePunchingFailed,
    /// Hole punching failed, with detail.
    PunchingFailed(String),
    /// Path validation failed.
    ValidationFailed(String),
    /// Path validation timed out.
    ValidationTimeout,
    /// Socket-level failure (bind, conversion, address lookup).
    NetworkError(String),
    /// Invalid configuration or failed endpoint/TLS setup.
    ConfigError(String),
    /// Internal protocol error, including poisoned locks.
    ProtocolError(String),
    /// An operation timed out.
    Timeout,
    /// Establishing a connection failed.
    ConnectionFailed(String),
    /// Traversal failed, with detail.
    TraversalFailed(String),
    /// The peer is not connected.
    PeerNotConnected,
}
415
416impl Default for NatTraversalConfig {
417 fn default() -> Self {
418 Self {
419 role: EndpointRole::Client,
420 bootstrap_nodes: Vec::new(),
421 max_candidates: 8,
422 coordination_timeout: Duration::from_secs(10),
423 enable_symmetric_nat: true,
424 enable_relay_fallback: true,
425 max_concurrent_attempts: 3,
426 bind_addr: None,
427 }
428 }
429}
430
431impl ConfigValidator for NatTraversalConfig {
432 fn validate(&self) -> ValidationResult<()> {
433 use crate::config::validation::*;
434
435 match self.role {
437 EndpointRole::Client => {
438 if self.bootstrap_nodes.is_empty() {
439 return Err(ConfigValidationError::InvalidRole(
440 "Client endpoints require at least one bootstrap node".to_string()
441 ));
442 }
443 }
444 EndpointRole::Server { can_coordinate } => {
445 if can_coordinate && self.bootstrap_nodes.is_empty() {
446 return Err(ConfigValidationError::InvalidRole(
447 "Server endpoints with coordination capability require bootstrap nodes".to_string()
448 ));
449 }
450 }
451 EndpointRole::Bootstrap => {
452 }
454 }
455
456 if !self.bootstrap_nodes.is_empty() {
458 validate_bootstrap_nodes(&self.bootstrap_nodes)?;
459 }
460
461 validate_range(
463 self.max_candidates,
464 1,
465 256,
466 "max_candidates"
467 )?;
468
469 validate_duration(
471 self.coordination_timeout,
472 Duration::from_millis(100),
473 Duration::from_secs(300),
474 "coordination_timeout"
475 )?;
476
477 validate_range(
479 self.max_concurrent_attempts,
480 1,
481 16,
482 "max_concurrent_attempts"
483 )?;
484
485 if self.max_concurrent_attempts > self.max_candidates {
487 return Err(ConfigValidationError::IncompatibleConfiguration(
488 "max_concurrent_attempts cannot exceed max_candidates".to_string()
489 ));
490 }
491
492 if self.role == EndpointRole::Bootstrap && self.enable_relay_fallback {
493 return Err(ConfigValidationError::IncompatibleConfiguration(
494 "Bootstrap nodes should not enable relay fallback".to_string()
495 ));
496 }
497
498 Ok(())
499 }
500}
501
502impl NatTraversalEndpoint {
503 pub async fn new(
505 config: NatTraversalConfig,
506 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
507 ) -> Result<Self, NatTraversalError> {
508
509 {
510 Self::new_impl(config, event_callback).await
511 }
512
513 }
514
515 async fn new_impl(
518 config: NatTraversalConfig,
519 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
520 ) -> Result<Self, NatTraversalError> {
521 Self::new_common(config, event_callback).await
522 }
523
524 async fn new_common(
527 config: NatTraversalConfig,
528 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
529 ) -> Result<Self, NatTraversalError> {
530 Self::new_shared_logic(config, event_callback).await
532 }
533
534 async fn new_shared_logic(
537 config: NatTraversalConfig,
538 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
539 ) -> Result<Self, NatTraversalError> {
540 {
543 config.validate()
544 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
545 }
546
547 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
551 config
552 .bootstrap_nodes
553 .iter()
554 .map(|&address| BootstrapNode {
555 address,
556 last_seen: std::time::Instant::now(),
557 can_coordinate: true, rtt: None,
559 coordination_count: 0,
560 })
561 .collect(),
562 ));
563
564 let discovery_config = DiscoveryConfig {
566 total_timeout: config.coordination_timeout,
567 max_candidates: config.max_candidates,
568 enable_symmetric_prediction: config.enable_symmetric_nat,
569 bound_address: config.bind_addr, ..DiscoveryConfig::default()
571 };
572
573 let nat_traversal_role = match config.role {
574 EndpointRole::Client => NatTraversalRole::Client,
575 EndpointRole::Server { can_coordinate } => NatTraversalRole::Server { can_relay: can_coordinate },
576 EndpointRole::Bootstrap => NatTraversalRole::Bootstrap,
577 };
578
579 let discovery_manager = Arc::new(std::sync::Mutex::new(
580 CandidateDiscoveryManager::new(discovery_config)
581 ));
582
583 let (quinn_endpoint, event_tx, local_addr) = Self::create_quinn_endpoint(&config, nat_traversal_role).await?;
586
587 {
589 let mut discovery = discovery_manager.lock()
590 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
591 discovery.set_bound_address(local_addr);
592 info!("Updated discovery manager with bound address: {}", local_addr);
593 }
594
595 let endpoint = Self {
596 quinn_endpoint: Some(quinn_endpoint.clone()),
597 config: config.clone(),
598 bootstrap_nodes,
599 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
600 discovery_manager,
601 event_callback,
602 shutdown: Arc::new(AtomicBool::new(false)),
603 event_tx: Some(event_tx.clone()),
604 connections: Arc::new(std::sync::RwLock::new(HashMap::new())),
605 local_peer_id: Self::generate_local_peer_id(),
606 };
607
608 if matches!(config.role, EndpointRole::Bootstrap | EndpointRole::Server { .. }) {
610 let endpoint_clone = quinn_endpoint.clone();
611 let shutdown_clone = endpoint.shutdown.clone();
612 let event_tx_clone = event_tx.clone();
613 let connections_clone = endpoint.connections.clone();
614
615 tokio::spawn(async move {
616 Self::accept_connections(endpoint_clone, shutdown_clone, event_tx_clone, connections_clone).await;
617 });
618
619 info!("Started accepting connections for {:?} role", config.role);
620 }
621
622 let discovery_manager_clone = endpoint.discovery_manager.clone();
624 let shutdown_clone = endpoint.shutdown.clone();
625 let event_tx_clone = event_tx;
626
627 tokio::spawn(async move {
628 Self::poll_discovery(discovery_manager_clone, shutdown_clone, event_tx_clone).await;
629 });
630
631 info!("Started discovery polling task");
632
633 {
635 let mut discovery = endpoint.discovery_manager.lock()
636 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
637
638 let local_peer_id = endpoint.local_peer_id;
640 let bootstrap_nodes = {
641 let nodes = endpoint.bootstrap_nodes.read()
642 .map_err(|_| NatTraversalError::ProtocolError("Bootstrap nodes lock poisoned".to_string()))?;
643 nodes.clone()
644 };
645
646 discovery.start_discovery(local_peer_id, bootstrap_nodes)
647 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
648
649 info!("Started local candidate discovery for peer {:?}", local_peer_id);
650 }
651
652 Ok(endpoint)
653 }
654
655 pub fn get_quinn_endpoint(&self) -> Option<&crate::quinn_high_level::Endpoint> {
657 self.quinn_endpoint.as_ref()
658 }
659
660 pub fn get_event_callback(&self) -> Option<&Box<dyn Fn(NatTraversalEvent) + Send + Sync>> {
662 self.event_callback.as_ref()
663 }
664
665 pub fn initiate_nat_traversal(
667 &self,
668 peer_id: PeerId,
669 coordinator: SocketAddr,
670 ) -> Result<(), NatTraversalError> {
671 info!("Starting NAT traversal to peer {:?} via coordinator {}", peer_id, coordinator);
672
673 let session = NatTraversalSession {
675 peer_id,
676 coordinator,
677 attempt: 1,
678 started_at: std::time::Instant::now(),
679 phase: TraversalPhase::Discovery,
680 candidates: Vec::new(),
681 session_state: SessionState {
682 state: ConnectionState::Connecting,
683 last_transition: std::time::Instant::now(),
684
685 connection: None,
686 active_attempts: Vec::new(),
687 metrics: ConnectionMetrics::default(),
688 },
689 };
690
691 {
693 let mut sessions = self.active_sessions.write()
694 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
695 sessions.insert(peer_id, session);
696 }
697
698 let bootstrap_nodes_vec = {
700 let bootstrap_nodes = self.bootstrap_nodes.read()
701 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
702 bootstrap_nodes.clone()
703 };
704
705 {
706 let mut discovery = self.discovery_manager.lock()
707 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
708
709 discovery.start_discovery(peer_id, bootstrap_nodes_vec)
710 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
711 }
712
713 if let Some(ref callback) = self.event_callback {
715 callback(NatTraversalEvent::CoordinationRequested {
716 peer_id,
717 coordinator,
718 });
719 }
720
721 Ok(())
723 }
724
725 pub fn poll_sessions(&self) -> Result<Vec<SessionStateUpdate>, NatTraversalError> {
727 let mut updates = Vec::new();
728 let now = std::time::Instant::now();
729
730 let mut sessions = self.active_sessions.write()
731 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
732
733 for (peer_id, session) in sessions.iter_mut() {
734 let mut state_changed = false;
735
736 match session.session_state.state {
737 ConnectionState::Connecting => {
738 let elapsed = now.duration_since(session.session_state.last_transition);
740 if elapsed > Duration::from_secs(30) {
741 session.session_state.state = ConnectionState::Closed;
742 session.session_state.last_transition = now;
743 state_changed = true;
744
745 updates.push(SessionStateUpdate {
746 peer_id: *peer_id,
747 old_state: ConnectionState::Connecting,
748 new_state: ConnectionState::Closed,
749 reason: StateChangeReason::Timeout,
750 });
751 }
752
753 if let Some(ref connection) = session.session_state.connection {
756 session.session_state.state = ConnectionState::Connected;
757 session.session_state.last_transition = now;
758 state_changed = true;
759
760 updates.push(SessionStateUpdate {
761 peer_id: *peer_id,
762 old_state: ConnectionState::Connecting,
763 new_state: ConnectionState::Connected,
764 reason: StateChangeReason::ConnectionEstablished,
765 });
766 }
767 }
768 ConnectionState::Connected => {
769 {
772 }
775
776 session.session_state.metrics.last_activity = Some(now);
778 }
779 ConnectionState::Migrating => {
780 let elapsed = now.duration_since(session.session_state.last_transition);
782 if elapsed > Duration::from_secs(10) {
783 if session.session_state.connection.is_some() {
786 session.session_state.state = ConnectionState::Connected;
787 state_changed = true;
788
789 updates.push(SessionStateUpdate {
790 peer_id: *peer_id,
791 old_state: ConnectionState::Migrating,
792 new_state: ConnectionState::Connected,
793 reason: StateChangeReason::MigrationComplete,
794 });
795 } else {
796 session.session_state.state = ConnectionState::Closed;
797 state_changed = true;
798
799 updates.push(SessionStateUpdate {
800 peer_id: *peer_id,
801 old_state: ConnectionState::Migrating,
802 new_state: ConnectionState::Closed,
803 reason: StateChangeReason::MigrationFailed,
804 });
805 }
806
807 session.session_state.last_transition = now;
808 }
809 }
810 _ => {}
811 }
812
813 if state_changed {
815 if let Some(ref callback) = self.event_callback {
816 callback(NatTraversalEvent::SessionStateChanged {
817 peer_id: *peer_id,
818 new_state: session.session_state.state,
819 });
820 }
821 }
822 }
823
824 Ok(updates)
825 }
826
827 pub fn start_session_polling(&self, interval: Duration) -> tokio::task::JoinHandle<()> {
830 let sessions = self.active_sessions.clone();
831 let shutdown = self.shutdown.clone();
832
833 tokio::spawn(async move {
834 let mut ticker = tokio::time::interval(interval);
835
836 loop {
837 ticker.tick().await;
838
839 if shutdown.load(Ordering::Relaxed) {
840 break;
841 }
842
843 if let Ok(sessions_guard) = sessions.read() {
845 for (_peer_id, _session) in sessions_guard.iter() {
846 }
849 }
850 }
851 })
852 }
853
854 pub fn get_statistics(&self) -> Result<NatTraversalStatistics, NatTraversalError> {
856 let sessions = self.active_sessions.read()
857 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
858 let bootstrap_nodes = self.bootstrap_nodes.read()
859 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
860
861 Ok(NatTraversalStatistics {
862 active_sessions: sessions.len(),
863 total_bootstrap_nodes: bootstrap_nodes.len(),
864 successful_coordinations: bootstrap_nodes.iter().map(|b| b.coordination_count).sum(),
865 average_coordination_time: Duration::from_millis(500), total_attempts: 0,
867 successful_connections: 0,
868 direct_connections: 0,
869 relayed_connections: 0,
870 })
871 }
872
873 pub fn add_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
875 let mut bootstrap_nodes = self.bootstrap_nodes.write()
876 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
877
878 if !bootstrap_nodes.iter().any(|b| b.address == address) {
880 bootstrap_nodes.push(BootstrapNode {
881 address,
882 last_seen: std::time::Instant::now(),
883 can_coordinate: true,
884 rtt: None,
885 coordination_count: 0,
886 });
887 info!("Added bootstrap node: {}", address);
888 }
889 Ok(())
890 }
891
892 pub fn remove_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
894 let mut bootstrap_nodes = self.bootstrap_nodes.write()
895 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
896 bootstrap_nodes.retain(|b| b.address != address);
897 info!("Removed bootstrap node: {}", address);
898 Ok(())
899 }
900
901 async fn create_quinn_endpoint(
906 config: &NatTraversalConfig,
907 _nat_role: NatTraversalRole,
908 ) -> Result<(QuinnEndpoint, mpsc::UnboundedSender<NatTraversalEvent>, SocketAddr), NatTraversalError> {
909 use std::sync::Arc;
910
911 let server_config = match config.role {
913 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
914 let cert_config = CertificateConfig {
916 common_name: format!("ant-quic-{}", config.role.name()),
917 subject_alt_names: vec![
918 "localhost".to_string(),
919 "ant-quic-node".to_string(),
920 ],
921 self_signed: true, ..CertificateConfig::default()
923 };
924
925 let cert_manager = CertificateManager::new(cert_config)
926 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate manager creation failed: {}", e)))?;
927
928 let cert_bundle = cert_manager.generate_certificate()
929 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate generation failed: {}", e)))?;
930
931 let rustls_config = cert_manager.create_server_config(&cert_bundle)
932 .map_err(|e| NatTraversalError::ConfigError(format!("Server config creation failed: {}", e)))?;
933
934 let server_crypto = QuicServerConfig::try_from(rustls_config.as_ref().clone())
935 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
936
937 let mut server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
938
939 let mut transport_config = TransportConfig::default();
941 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
942 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
943
944 let nat_config = crate::transport_parameters::NatTraversalConfig {
946 role: match config.role {
947 EndpointRole::Bootstrap => crate::transport_parameters::NatTraversalRole::Bootstrap,
948 EndpointRole::Server { can_coordinate } => crate::transport_parameters::NatTraversalRole::Server { can_relay: can_coordinate },
949 EndpointRole::Client => crate::transport_parameters::NatTraversalRole::Client,
950 },
951 max_candidates: VarInt::from_u32(config.max_candidates as u32),
952 coordination_timeout: VarInt::from_u64(config.coordination_timeout.as_millis() as u64).unwrap(),
953 max_concurrent_attempts: VarInt::from_u32(config.max_concurrent_attempts as u32),
954 peer_id: None, };
956 transport_config.nat_traversal_config(Some(nat_config));
957
958 server_config.transport_config(Arc::new(transport_config));
959
960 Some(server_config)
961 }
962 _ => None,
963 };
964
965 let client_config = {
967 let cert_config = CertificateConfig {
968 common_name: format!("ant-quic-{}", config.role.name()),
969 subject_alt_names: vec![
970 "localhost".to_string(),
971 "ant-quic-node".to_string(),
972 ],
973 self_signed: true,
974 ..CertificateConfig::default()
975 };
976
977 let cert_manager = CertificateManager::new(cert_config)
978 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate manager creation failed: {}", e)))?;
979
980 let _cert_bundle = cert_manager.generate_certificate()
981 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate generation failed: {}", e)))?;
982
983 let rustls_config = cert_manager.create_client_config()
984 .map_err(|e| NatTraversalError::ConfigError(format!("Client config creation failed: {}", e)))?;
985
986 let client_crypto = QuicClientConfig::try_from(rustls_config.as_ref().clone())
987 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
988
989 let mut client_config = ClientConfig::new(Arc::new(client_crypto));
990
991 let mut transport_config = TransportConfig::default();
993 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
994 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
995
996 let nat_config = crate::transport_parameters::NatTraversalConfig {
998 role: match config.role {
999 EndpointRole::Bootstrap => crate::transport_parameters::NatTraversalRole::Bootstrap,
1000 EndpointRole::Server { can_coordinate } => crate::transport_parameters::NatTraversalRole::Server { can_relay: can_coordinate },
1001 EndpointRole::Client => crate::transport_parameters::NatTraversalRole::Client,
1002 },
1003 max_candidates: VarInt::from_u32(config.max_candidates as u32),
1004 coordination_timeout: VarInt::from_u64(config.coordination_timeout.as_millis() as u64).unwrap(),
1005 max_concurrent_attempts: VarInt::from_u32(config.max_concurrent_attempts as u32),
1006 peer_id: None, };
1008 transport_config.nat_traversal_config(Some(nat_config));
1009
1010 client_config.transport_config(Arc::new(transport_config));
1011
1012 client_config
1013 };
1014
1015 let bind_addr = config.bind_addr.unwrap_or("0.0.0.0:0".parse().unwrap());
1017 let socket = UdpSocket::bind(bind_addr).await
1018 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to bind UDP socket: {}", e)))?;
1019
1020 info!("Binding endpoint to {}", bind_addr);
1021
1022 let std_socket = socket.into_std()
1024 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to convert socket: {}", e)))?;
1025
1026 let mut endpoint = QuinnEndpoint::new(
1028 EndpointConfig::default(),
1029 server_config,
1030 std_socket,
1031 Arc::new(TokioRuntime),
1032 ).map_err(|e| NatTraversalError::ConfigError(format!("Failed to create Quinn endpoint: {}", e)))?;
1033
1034 endpoint.set_default_client_config(client_config);
1036
1037 let local_addr = endpoint.local_addr()
1039 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to get local address: {}", e)))?;
1040
1041 info!("Endpoint bound to actual address: {}", local_addr);
1042
1043 let (event_tx, _event_rx) = mpsc::unbounded_channel();
1045
1046 Ok((endpoint, event_tx, local_addr))
1047 }
1048
1049 pub async fn start_listening(&self, bind_addr: SocketAddr) -> Result<(), NatTraversalError> {
1054 let endpoint = self.quinn_endpoint.as_ref()
1055 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1056
1057 let _socket = UdpSocket::bind(bind_addr).await
1059 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to bind to {}: {}", bind_addr, e)))?;
1060
1061 info!("Started listening on {}", bind_addr);
1062
1063 let endpoint_clone = endpoint.clone();
1065 let shutdown_clone = self.shutdown.clone();
1066 let event_tx = self.event_tx.as_ref().unwrap().clone();
1067 let connections_clone = self.connections.clone();
1068
1069 tokio::spawn(async move {
1070 Self::accept_connections(endpoint_clone, shutdown_clone, event_tx, connections_clone).await;
1071 });
1072
1073 Ok(())
1074 }
1075
1076 async fn accept_connections(
1079 endpoint: QuinnEndpoint,
1080 shutdown: Arc<AtomicBool>,
1081 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1082 connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
1083 ) {
1084 while !shutdown.load(Ordering::Relaxed) {
1085 match endpoint.accept().await {
1086 Some(connecting) => {
1087 let event_tx = event_tx.clone();
1088 let connections = connections.clone();
1089 tokio::spawn(async move {
1090 match connecting.await {
1091 Ok(connection) => {
1092 info!("Accepted connection from {}", connection.remote_address());
1093
1094 let peer_id = Self::generate_peer_id_from_address(connection.remote_address());
1096
1097 if let Ok(mut conns) = connections.write() {
1099 conns.insert(peer_id, connection.clone());
1100 }
1101
1102 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1103 peer_id,
1104 remote_address: connection.remote_address(),
1105 });
1106
1107 Self::handle_connection(connection, event_tx).await;
1109 }
1110 Err(e) => {
1111 debug!("Connection failed: {}", e);
1112 }
1113 }
1114 });
1115 }
1116 None => {
1117 break;
1119 }
1120 }
1121 }
1122 }
1123
1124 async fn poll_discovery(
1127 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
1128 shutdown: Arc<AtomicBool>,
1129 _event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1130 ) {
1131 use tokio::time::{interval, Duration};
1132
1133 let mut poll_interval = interval(Duration::from_millis(100));
1134
1135 while !shutdown.load(Ordering::Relaxed) {
1136 poll_interval.tick().await;
1137
1138 let events = match discovery_manager.lock() {
1140 Ok(mut discovery) => {
1141 discovery.poll(std::time::Instant::now())
1142 }
1143 Err(e) => {
1144 error!("Failed to lock discovery manager: {}", e);
1145 continue;
1146 }
1147 };
1148
1149 for event in events {
1151 match event {
1152 DiscoveryEvent::DiscoveryStarted { peer_id, bootstrap_count } => {
1153 debug!("Discovery started for peer {:?} with {} bootstrap nodes",
1154 peer_id, bootstrap_count);
1155 }
1156 DiscoveryEvent::LocalScanningStarted => {
1157 debug!("Local interface scanning started");
1158 }
1159 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1160 debug!("Discovered local candidate: {}", candidate.address);
1161 }
1164 DiscoveryEvent::LocalScanningCompleted { candidate_count, duration } => {
1165 debug!("Local interface scanning completed: {} candidates in {:?}",
1166 candidate_count, duration);
1167 }
1168 DiscoveryEvent::ServerReflexiveDiscoveryStarted { bootstrap_count } => {
1169 debug!("Server reflexive discovery started with {} bootstrap nodes",
1170 bootstrap_count);
1171 }
1172 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, bootstrap_node } => {
1173 debug!("Discovered server-reflexive candidate {} via bootstrap {}",
1174 candidate.address, bootstrap_node);
1175 }
1177 DiscoveryEvent::BootstrapQueryFailed { bootstrap_node, error } => {
1178 debug!("Bootstrap query failed for {}: {}", bootstrap_node, error);
1179 }
1180 DiscoveryEvent::SymmetricPredictionStarted { base_address } => {
1181 debug!("Symmetric NAT prediction started from base address {}", base_address);
1182 }
1183 DiscoveryEvent::PredictedCandidateGenerated { candidate, confidence } => {
1184 debug!("Predicted symmetric NAT candidate {} with confidence {}",
1185 candidate.address, confidence);
1186 }
1188 DiscoveryEvent::PortAllocationDetected { port, source_address, bootstrap_node, timestamp } => {
1189 debug!("Port allocation detected: port {} from {} via bootstrap {:?} at {:?}",
1190 port, source_address, bootstrap_node, timestamp);
1191 }
1192 DiscoveryEvent::DiscoveryCompleted { candidate_count, total_duration, success_rate } => {
1193 info!("Discovery completed with {} candidates in {:?} (success rate: {:.2}%)",
1194 candidate_count, total_duration, success_rate * 100.0);
1195 }
1198 DiscoveryEvent::DiscoveryFailed { error, partial_results } => {
1199 warn!("Discovery failed: {} (found {} partial candidates)",
1200 error, partial_results.len());
1201
1202 }
1207 DiscoveryEvent::PathValidationRequested { candidate_id, candidate_address, challenge_token } => {
1208 debug!("PATH_CHALLENGE requested for candidate {} at {} with token {:08x}",
1209 candidate_id.0, candidate_address, challenge_token);
1210 }
1213 DiscoveryEvent::PathValidationResponse { candidate_id, candidate_address, challenge_token: _, rtt } => {
1214 debug!("PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1215 candidate_id.0, candidate_address, rtt);
1216 }
1218 }
1219 }
1220 }
1221
1222 info!("Discovery polling task shutting down");
1223 }
1224
1225 async fn handle_connection(
1228 connection: QuinnConnection,
1229 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1230 ) {
1231 let peer_id = Self::generate_peer_id_from_address(connection.remote_address());
1232 let remote_address = connection.remote_address();
1233
1234 debug!("Handling connection from peer {:?} at {}", peer_id, remote_address);
1235
1236 loop {
1238 tokio::select! {
1239 stream = connection.accept_bi() => {
1240 match stream {
1241 Ok((send, recv)) => {
1242 tokio::spawn(async move {
1243 Self::handle_bi_stream(send, recv).await;
1244 });
1245 }
1246 Err(e) => {
1247 debug!("Error accepting bidirectional stream: {}", e);
1248 let _ = event_tx.send(NatTraversalEvent::ConnectionLost {
1249 peer_id,
1250 reason: format!("Stream error: {}", e),
1251 });
1252 break;
1253 }
1254 }
1255 }
1256 stream = connection.accept_uni() => {
1257 match stream {
1258 Ok(recv) => {
1259 tokio::spawn(async move {
1260 Self::handle_uni_stream(recv).await;
1261 });
1262 }
1263 Err(e) => {
1264 debug!("Error accepting unidirectional stream: {}", e);
1265 let _ = event_tx.send(NatTraversalEvent::ConnectionLost {
1266 peer_id,
1267 reason: format!("Stream error: {}", e),
1268 });
1269 break;
1270 }
1271 }
1272 }
1273 }
1274 }
1275 }
1276
/// Placeholder handler for an accepted bidirectional stream.
///
/// NOTE(review): the body is empty. Nothing is read or written, and both stream
/// halves are simply dropped when this returns. Whether dropping the unfinished
/// send half finishes or resets it depends on the `quinn_high_level` stream
/// `Drop` implementation; confirm before relying on either behaviour.
async fn handle_bi_stream(
    _send: crate::quinn_high_level::SendStream,
    _recv: crate::quinn_high_level::RecvStream,
) {
}
1312
1313 async fn handle_uni_stream(mut recv: crate::quinn_high_level::RecvStream) {
1316 let mut buffer = vec![0u8; 1024];
1317
1318 loop {
1319 match recv.read(&mut buffer).await {
1320 Ok(Some(size)) => {
1321 debug!("Received {} bytes on unidirectional stream", size);
1322 }
1324 Ok(None) => {
1325 debug!("Unidirectional stream closed by peer");
1326 break;
1327 }
1328 Err(e) => {
1329 debug!("Error reading from unidirectional stream: {}", e);
1330 break;
1331 }
1332 }
1333 }
1334 }
1335
1336 pub async fn connect_to_peer(
1339 &self,
1340 peer_id: PeerId,
1341 server_name: &str,
1342 remote_addr: SocketAddr,
1343 ) -> Result<QuinnConnection, NatTraversalError> {
1344 let endpoint = self.quinn_endpoint.as_ref()
1345 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1346
1347 info!("Connecting to peer {:?} at {}", peer_id, remote_addr);
1348
1349 let connecting = endpoint.connect(remote_addr, server_name)
1351 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {}", e)))?;
1352
1353 let connection = timeout(Duration::from_secs(10), connecting)
1354 .await
1355 .map_err(|_| NatTraversalError::Timeout)?
1356 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {}", e)))?;
1357
1358 info!("Successfully connected to peer {:?} at {}", peer_id, remote_addr);
1359
1360 if let Some(ref event_tx) = self.event_tx {
1362 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1363 peer_id,
1364 remote_address: remote_addr,
1365 });
1366 }
1367
1368 Ok(connection)
1369 }
1370
1371 pub async fn accept_connection(&self) -> Result<(PeerId, QuinnConnection), NatTraversalError> {
1374 let endpoint = self.quinn_endpoint.as_ref()
1375 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1376
1377 let incoming = endpoint.accept().await
1379 .ok_or_else(|| NatTraversalError::NetworkError("Endpoint closed".to_string()))?;
1380
1381 let remote_addr = incoming.remote_address();
1382 info!("Accepting connection from {}", remote_addr);
1383
1384 let connection = incoming.await
1386 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Failed to accept connection: {}", e)))?;
1387
1388 let peer_id = self.extract_peer_id_from_connection(&connection).await
1390 .unwrap_or_else(|| Self::generate_peer_id_from_address(remote_addr));
1391
1392 {
1394 let mut connections = self.connections.write()
1395 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1396 connections.insert(peer_id, connection.clone());
1397 }
1398
1399 info!("Connection accepted from peer {:?} at {}", peer_id, remote_addr);
1400
1401 if let Some(ref event_tx) = self.event_tx {
1403 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1404 peer_id,
1405 remote_address: remote_addr,
1406 });
1407 }
1408
1409 Ok((peer_id, connection))
1410 }
1411
1412 pub fn local_peer_id(&self) -> PeerId {
1414 self.local_peer_id
1415 }
1416
1417 pub fn get_connection(&self, peer_id: &PeerId) -> Result<Option<QuinnConnection>, NatTraversalError> {
1420 let connections = self.connections.read()
1421 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1422 Ok(connections.get(peer_id).cloned())
1423 }
1424
1425 pub fn remove_connection(&self, peer_id: &PeerId) -> Result<Option<QuinnConnection>, NatTraversalError> {
1428 let mut connections = self.connections.write()
1429 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1430 Ok(connections.remove(peer_id))
1431 }
1432
1433 pub fn list_connections(&self) -> Result<Vec<(PeerId, SocketAddr)>, NatTraversalError> {
1436 let connections = self.connections.read()
1437 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1438 let mut result = Vec::new();
1439 for (peer_id, connection) in connections.iter() {
1440 result.push((*peer_id, connection.remote_address()));
1441 }
1442 Ok(result)
1443 }
1444
1445 pub async fn handle_connection_data(
1448 &self,
1449 peer_id: PeerId,
1450 connection: &QuinnConnection,
1451 ) -> Result<(), NatTraversalError> {
1452 info!("Handling connection data from peer {:?}", peer_id);
1453
1454 let connection_clone = connection.clone();
1456 let peer_id_clone = peer_id;
1457 tokio::spawn(async move {
1458 loop {
1459 match connection_clone.accept_bi().await {
1460 Ok((send, recv)) => {
1461 debug!("Accepted bidirectional stream from peer {:?}", peer_id_clone);
1462 tokio::spawn(Self::handle_bi_stream(send, recv));
1463 }
1464 Err(ConnectionError::ApplicationClosed(_)) => {
1465 debug!("Connection closed by peer {:?}", peer_id_clone);
1466 break;
1467 }
1468 Err(e) => {
1469 debug!("Error accepting bidirectional stream from peer {:?}: {}", peer_id_clone, e);
1470 break;
1471 }
1472 }
1473 }
1474 });
1475
1476 let connection_clone = connection.clone();
1478 let peer_id_clone = peer_id;
1479 tokio::spawn(async move {
1480 loop {
1481 match connection_clone.accept_uni().await {
1482 Ok(recv) => {
1483 debug!("Accepted unidirectional stream from peer {:?}", peer_id_clone);
1484 tokio::spawn(Self::handle_uni_stream(recv));
1485 }
1486 Err(ConnectionError::ApplicationClosed(_)) => {
1487 debug!("Connection closed by peer {:?}", peer_id_clone);
1488 break;
1489 }
1490 Err(e) => {
1491 debug!("Error accepting unidirectional stream from peer {:?}: {}", peer_id_clone, e);
1492 break;
1493 }
1494 }
1495 }
1496 });
1497
1498 Ok(())
1499 }
1500
1501 fn generate_local_peer_id() -> PeerId {
1503 use std::time::SystemTime;
1504 use std::collections::hash_map::DefaultHasher;
1505 use std::hash::{Hash, Hasher};
1506
1507 let mut hasher = DefaultHasher::new();
1508 SystemTime::now().hash(&mut hasher);
1509 std::process::id().hash(&mut hasher);
1510
1511 let hash = hasher.finish();
1512 let mut peer_id = [0u8; 32];
1513 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
1514
1515 for i in 8..32 {
1517 peer_id[i] = rand::random();
1518 }
1519
1520 PeerId(peer_id)
1521 }
1522
1523 fn generate_peer_id_from_address(addr: SocketAddr) -> PeerId {
1529 use std::collections::hash_map::DefaultHasher;
1530 use std::hash::{Hash, Hasher};
1531
1532 let mut hasher = DefaultHasher::new();
1533 addr.hash(&mut hasher);
1534
1535 let hash = hasher.finish();
1536 let mut peer_id = [0u8; 32];
1537 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
1538
1539 for i in 8..32 {
1542 peer_id[i] = rand::random();
1543 }
1544
1545 warn!("Generated temporary peer ID from address {}. This ID is not persistent!", addr);
1546 PeerId(peer_id)
1547 }
1548
1549 async fn extract_peer_id_from_connection(&self, connection: &QuinnConnection) -> Option<PeerId> {
1552 if let Some(identity) = connection.peer_identity() {
1554 if let Some(public_key_bytes) = identity.downcast_ref::<[u8; 32]>() {
1556 match crate::derive_peer_id_from_key_bytes(public_key_bytes) {
1558 Ok(peer_id) => {
1559 debug!("Derived peer ID from Ed25519 public key");
1560 return Some(peer_id);
1561 }
1562 Err(e) => {
1563 warn!("Failed to derive peer ID from public key: {}", e);
1564 }
1565 }
1566 }
1567 }
1569
1570 None
1571 }
1572
1573 pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
1576 self.shutdown.store(true, Ordering::Relaxed);
1578
1579 {
1581 let mut connections = self.connections.write()
1582 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1583 for (peer_id, connection) in connections.drain() {
1584 info!("Closing connection to peer {:?}", peer_id);
1585 connection.close(crate::VarInt::from_u32(0), b"Shutdown");
1586 }
1587 }
1588
1589 if let Some(ref endpoint) = self.quinn_endpoint {
1591 endpoint.wait_idle().await;
1592 }
1593
1594 info!("NAT traversal endpoint shutdown completed");
1595 Ok(())
1596 }
1597
1598 pub async fn discover_candidates(&self, peer_id: PeerId) -> Result<Vec<CandidateAddress>, NatTraversalError> {
1601 debug!("Discovering address candidates for peer {:?}", peer_id);
1602
1603 let mut candidates = Vec::new();
1604
1605 let bootstrap_nodes = {
1607 let nodes = self.bootstrap_nodes.read()
1608 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1609 nodes.clone()
1610 };
1611
1612 {
1614 let mut discovery = self.discovery_manager.lock()
1615 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1616
1617 discovery.start_discovery(peer_id, bootstrap_nodes)
1618 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
1619 }
1620
1621 let timeout_duration = self.config.coordination_timeout;
1623 let start_time = std::time::Instant::now();
1624
1625 while start_time.elapsed() < timeout_duration {
1626 let discovery_events = {
1627 let mut discovery = self.discovery_manager.lock()
1628 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1629 discovery.poll(std::time::Instant::now())
1630 };
1631
1632 for event in discovery_events {
1633 match event {
1634 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1635 candidates.push(candidate.clone());
1636
1637 self.send_candidate_advertisement(peer_id, &candidate).await
1639 .unwrap_or_else(|e| debug!("Failed to send candidate advertisement: {}", e));
1640 }
1641 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
1642 candidates.push(candidate.clone());
1643
1644 self.send_candidate_advertisement(peer_id, &candidate).await
1646 .unwrap_or_else(|e| debug!("Failed to send candidate advertisement: {}", e));
1647 }
1648 DiscoveryEvent::PredictedCandidateGenerated { candidate, .. } => {
1649 candidates.push(candidate.clone());
1650
1651 self.send_candidate_advertisement(peer_id, &candidate).await
1653 .unwrap_or_else(|e| debug!("Failed to send candidate advertisement: {}", e));
1654 }
1655 DiscoveryEvent::DiscoveryCompleted { .. } => {
1656 return Ok(candidates);
1658 }
1659 DiscoveryEvent::DiscoveryFailed { error, partial_results } => {
1660 candidates.extend(partial_results);
1662 if candidates.is_empty() {
1663 return Err(NatTraversalError::CandidateDiscoveryFailed(error.to_string()));
1664 }
1665 return Ok(candidates);
1666 }
1667 _ => {}
1668 }
1669 }
1670
1671 sleep(Duration::from_millis(10)).await;
1673 }
1674
1675 if candidates.is_empty() {
1676 Err(NatTraversalError::NoCandidatesFound)
1677 } else {
1678 Ok(candidates)
1679 }
1680 }
1681
1682 fn create_punch_me_now_frame(&self, peer_id: PeerId) -> Result<Vec<u8>, NatTraversalError> {
1684 let mut frame = Vec::new();
1692
1693 frame.push(0x41);
1695
1696 frame.extend_from_slice(&peer_id.0);
1698
1699 let timestamp = std::time::SystemTime::now()
1701 .duration_since(std::time::UNIX_EPOCH)
1702 .unwrap_or_default()
1703 .as_millis() as u64;
1704 frame.extend_from_slice(×tamp.to_be_bytes());
1705
1706 let mut token = [0u8; 16];
1708 for byte in &mut token {
1709 *byte = rand::random();
1710 }
1711 frame.extend_from_slice(&token);
1712
1713 Ok(frame)
1714 }
1715
1716 fn attempt_hole_punching(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
1717 debug!("Attempting hole punching for peer {:?}", peer_id);
1718
1719 let candidate_pairs = self.get_candidate_pairs_for_peer(peer_id)?;
1721
1722 if candidate_pairs.is_empty() {
1723 return Err(NatTraversalError::NoCandidatesFound);
1724 }
1725
1726 info!("Generated {} candidate pairs for hole punching with peer {:?}",
1727 candidate_pairs.len(), peer_id);
1728
1729 {
1732 self.attempt_quinn_hole_punching(peer_id, candidate_pairs)
1733 }
1734
1735 }
1736
1737 fn get_candidate_pairs_for_peer(&self, peer_id: PeerId) -> Result<Vec<CandidatePair>, NatTraversalError> {
1739 let discovery_candidates = {
1741 let discovery = self.discovery_manager.lock()
1742 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1743
1744 discovery.get_candidates_for_peer(peer_id)
1745 };
1746
1747 if discovery_candidates.is_empty() {
1748 return Err(NatTraversalError::NoCandidatesFound);
1749 }
1750
1751 let mut candidate_pairs = Vec::new();
1753 let local_candidates = discovery_candidates.iter()
1754 .filter(|c| matches!(c.source, CandidateSource::Local))
1755 .collect::<Vec<_>>();
1756 let remote_candidates = discovery_candidates.iter()
1757 .filter(|c| !matches!(c.source, CandidateSource::Local))
1758 .collect::<Vec<_>>();
1759
1760 for local in &local_candidates {
1762 for remote in &remote_candidates {
1763 let pair_priority = self.calculate_candidate_pair_priority(local, remote);
1764 candidate_pairs.push(CandidatePair {
1765 local_candidate: (*local).clone(),
1766 remote_candidate: (*remote).clone(),
1767 priority: pair_priority,
1768 state: CandidatePairState::Waiting,
1769 });
1770 }
1771 }
1772
1773 candidate_pairs.sort_by(|a, b| b.priority.cmp(&a.priority));
1775
1776 candidate_pairs.truncate(8);
1778
1779 Ok(candidate_pairs)
1780 }
1781
1782 fn calculate_candidate_pair_priority(&self, local: &CandidateAddress, remote: &CandidateAddress) -> u64 {
1784 let local_type_preference = match local.source {
1788 CandidateSource::Local => 126,
1789 CandidateSource::Observed { .. } => 100,
1790 CandidateSource::Predicted => 75,
1791 CandidateSource::Peer => 50,
1792 };
1793
1794 let remote_type_preference = match remote.source {
1795 CandidateSource::Local => 126,
1796 CandidateSource::Observed { .. } => 100,
1797 CandidateSource::Predicted => 75,
1798 CandidateSource::Peer => 50,
1799 };
1800
1801 let local_priority = (local_type_preference as u64) << 8 | local.priority as u64;
1803 let remote_priority = (remote_type_preference as u64) << 8 | remote.priority as u64;
1804
1805 let min_priority = local_priority.min(remote_priority);
1806 let max_priority = local_priority.max(remote_priority);
1807
1808 (min_priority << 32) | (max_priority << 1) | if local_priority > remote_priority { 1 } else { 0 }
1809 }
1810
1811 fn attempt_quinn_hole_punching(&self, peer_id: PeerId, candidate_pairs: Vec<CandidatePair>) -> Result<(), NatTraversalError> {
1814
1815 let _endpoint = self.quinn_endpoint.as_ref()
1816 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1817
1818 for pair in candidate_pairs {
1819 debug!("Attempting hole punch with candidate pair: {} -> {}",
1820 pair.local_candidate.address, pair.remote_candidate.address);
1821
1822 let mut challenge_data = [0u8; 8];
1824 for byte in &mut challenge_data {
1825 *byte = rand::random();
1826 }
1827
1828 let local_socket = std::net::UdpSocket::bind(pair.local_candidate.address)
1830 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to bind to local candidate: {}", e)))?;
1831
1832 let path_challenge_packet = self.create_path_challenge_packet(challenge_data)?;
1834
1835 match local_socket.send_to(&path_challenge_packet, pair.remote_candidate.address) {
1837 Ok(bytes_sent) => {
1838 debug!("Sent {} bytes for hole punch from {} to {}",
1839 bytes_sent, pair.local_candidate.address, pair.remote_candidate.address);
1840
1841 local_socket.set_read_timeout(Some(Duration::from_millis(100)))
1843 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to set timeout: {}", e)))?;
1844
1845 let mut response_buffer = [0u8; 1024];
1847 match local_socket.recv_from(&mut response_buffer) {
1848 Ok((_bytes_received, response_addr)) => {
1849 if response_addr == pair.remote_candidate.address {
1850 info!("Hole punch succeeded for peer {:?}: {} <-> {}",
1851 peer_id, pair.local_candidate.address, pair.remote_candidate.address);
1852
1853 self.store_successful_candidate_pair(peer_id, pair)?;
1855 return Ok(());
1856 } else {
1857 debug!("Received response from unexpected address: {}", response_addr);
1858 }
1859 }
1860 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => {
1861 debug!("No response received for hole punch attempt");
1862 }
1863 Err(e) => {
1864 debug!("Error receiving hole punch response: {}", e);
1865 }
1866 }
1867 }
1868 Err(e) => {
1869 debug!("Failed to send hole punch packet: {}", e);
1870 }
1871 }
1872 }
1873
1874 Err(NatTraversalError::HolePunchingFailed)
1876 }
1877
1878 fn create_path_challenge_packet(&self, challenge_data: [u8; 8]) -> Result<Vec<u8>, NatTraversalError> {
1880 let mut packet = Vec::new();
1883
1884 packet.push(0x40); packet.extend_from_slice(&[0, 0, 0, 1]); packet.push(0x1a); packet.extend_from_slice(&challenge_data); Ok(packet)
1893 }
1894
1895 fn store_successful_candidate_pair(&self, peer_id: PeerId, pair: CandidatePair) -> Result<(), NatTraversalError> {
1897 debug!("Storing successful candidate pair for peer {:?}: {} <-> {}",
1898 peer_id, pair.local_candidate.address, pair.remote_candidate.address);
1899
1900 if let Some(ref callback) = self.event_callback {
1905 callback(NatTraversalEvent::PathValidated {
1906 peer_id,
1907 address: pair.remote_candidate.address,
1908 rtt: Duration::from_millis(50), });
1910
1911 callback(NatTraversalEvent::TraversalSucceeded {
1912 peer_id,
1913 final_address: pair.remote_candidate.address,
1914 total_time: Duration::from_secs(1), });
1916 }
1917
1918 Ok(())
1919 }
1920
1921
1922 fn attempt_connection_to_candidate(
1924 &self,
1925 peer_id: PeerId,
1926 candidate: &CandidateAddress,
1927 ) -> Result<(), NatTraversalError> {
1928
1929 {
1930 let endpoint = self.quinn_endpoint.as_ref()
1931 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1932
1933 let server_name = format!("peer-{:x}", peer_id.0[0] as u32);
1935
1936 debug!("Attempting Quinn connection to candidate {} for peer {:?}",
1937 candidate.address, peer_id);
1938
1939 match endpoint.connect(candidate.address, &server_name) {
1941 Ok(connecting) => {
1942 info!("Connection attempt initiated to {} for peer {:?}",
1943 candidate.address, peer_id);
1944
1945 if let Some(event_tx) = &self.event_tx {
1947 let event_tx = event_tx.clone();
1948 let connections = self.connections.clone();
1949 let peer_id_clone = peer_id;
1950 let address = candidate.address;
1951
1952 tokio::spawn(async move {
1953 match connecting.await {
1954 Ok(connection) => {
1955 info!("Successfully connected to {} for peer {:?}",
1956 address, peer_id_clone);
1957
1958 if let Ok(mut conns) = connections.write() {
1960 conns.insert(peer_id_clone, connection.clone());
1961 }
1962
1963 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1965 peer_id: peer_id_clone,
1966 remote_address: address,
1967 });
1968
1969 Self::handle_connection(connection, event_tx).await;
1971 }
1972 Err(e) => {
1973 warn!("Connection to {} failed: {}", address, e);
1974 }
1975 }
1976 });
1977 }
1978
1979 Ok(())
1980 }
1981 Err(e) => {
1982 warn!("Failed to initiate connection to {}: {}", candidate.address, e);
1983 Err(NatTraversalError::ConnectionFailed(
1984 format!("Failed to connect to {}: {}", candidate.address, e)
1985 ))
1986 }
1987 }
1988 }
1989 }
1990
1991 pub fn poll(&self, now: std::time::Instant) -> Result<Vec<NatTraversalEvent>, NatTraversalError> {
1993 let mut events = Vec::new();
1994
1995 {
1997 let mut discovery = self.discovery_manager.lock()
1998 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1999
2000 let discovery_events = discovery.poll(now);
2001
2002 for discovery_event in discovery_events {
2004 if let Some(nat_event) = self.convert_discovery_event(discovery_event) {
2005 events.push(nat_event.clone());
2006
2007 if let Some(ref callback) = self.event_callback {
2009 callback(nat_event.clone());
2010 }
2011
2012 if let NatTraversalEvent::CandidateDiscovered { peer_id: _, candidate: _ } = &nat_event {
2014 }
2017 }
2018 }
2019 }
2020
2021 let mut sessions = self.active_sessions.write()
2023 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2024
2025 for (_peer_id, session) in sessions.iter_mut() {
2026 let elapsed = now.duration_since(session.started_at);
2027
2028 let timeout = self.get_phase_timeout(session.phase);
2030
2031 if elapsed > timeout {
2033 match session.phase {
2034 TraversalPhase::Discovery => {
2035 let discovered_candidates = {
2037 let discovery = self.discovery_manager.lock()
2038 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()));
2039 match discovery {
2040 Ok(disc) => disc.get_candidates_for_peer(session.peer_id),
2041 Err(_) => Vec::new()
2042 }
2043 };
2044
2045 session.candidates = discovered_candidates.clone();
2047
2048 if !session.candidates.is_empty() {
2050 session.phase = TraversalPhase::Coordination;
2052 let event = NatTraversalEvent::PhaseTransition {
2053 peer_id: session.peer_id,
2054 from_phase: TraversalPhase::Discovery,
2055 to_phase: TraversalPhase::Coordination,
2056 };
2057 events.push(event.clone());
2058 if let Some(ref callback) = self.event_callback {
2059 callback(event);
2060 }
2061 info!("Peer {:?} advanced from Discovery to Coordination with {} candidates",
2062 session.peer_id, session.candidates.len());
2063 } else if session.attempt < self.config.max_concurrent_attempts as u32 {
2064 session.attempt += 1;
2066 session.started_at = now;
2067 let backoff_duration = self.calculate_backoff(session.attempt);
2068 warn!("Discovery timeout for peer {:?}, retrying (attempt {}), backoff: {:?}",
2069 session.peer_id, session.attempt, backoff_duration);
2070 } else {
2071 session.phase = TraversalPhase::Failed;
2073 let event = NatTraversalEvent::TraversalFailed {
2074 peer_id: session.peer_id,
2075 error: NatTraversalError::NoCandidatesFound,
2076 fallback_available: self.config.enable_relay_fallback,
2077 };
2078 events.push(event.clone());
2079 if let Some(ref callback) = self.event_callback {
2080 callback(event);
2081 }
2082 error!("NAT traversal failed for peer {:?}: no candidates found after {} attempts",
2083 session.peer_id, session.attempt);
2084 }
2085 }
2086 TraversalPhase::Coordination => {
2087 if let Some(coordinator) = self.select_coordinator() {
2089 match self.send_coordination_request(session.peer_id, coordinator) {
2090 Ok(_) => {
2091 session.phase = TraversalPhase::Synchronization;
2092 let event = NatTraversalEvent::CoordinationRequested {
2093 peer_id: session.peer_id,
2094 coordinator,
2095 };
2096 events.push(event.clone());
2097 if let Some(ref callback) = self.event_callback {
2098 callback(event);
2099 }
2100 info!("Coordination requested for peer {:?} via {}",
2101 session.peer_id, coordinator);
2102 }
2103 Err(e) => {
2104 self.handle_phase_failure(session, now, &mut events, e);
2105 }
2106 }
2107 } else {
2108 self.handle_phase_failure(session, now, &mut events,
2109 NatTraversalError::NoBootstrapNodes);
2110 }
2111 }
2112 TraversalPhase::Synchronization => {
2113 if self.is_peer_synchronized(&session.peer_id) {
2115 session.phase = TraversalPhase::Punching;
2116 let event = NatTraversalEvent::HolePunchingStarted {
2117 peer_id: session.peer_id,
2118 targets: session.candidates.iter().map(|c| c.address).collect(),
2119 };
2120 events.push(event.clone());
2121 if let Some(ref callback) = self.event_callback {
2122 callback(event);
2123 }
2124 if let Err(e) = self.initiate_hole_punching(session.peer_id, &session.candidates) {
2126 self.handle_phase_failure(session, now, &mut events, e);
2127 }
2128 } else {
2129 self.handle_phase_failure(session, now, &mut events,
2130 NatTraversalError::ProtocolError("Synchronization timeout".to_string()));
2131 }
2132 }
2133 TraversalPhase::Punching => {
2134 if let Some(successful_path) = self.check_punch_results(&session.peer_id) {
2136 session.phase = TraversalPhase::Validation;
2137 let event = NatTraversalEvent::PathValidated {
2138 peer_id: session.peer_id,
2139 address: successful_path,
2140 rtt: Duration::from_millis(50), };
2142 events.push(event.clone());
2143 if let Some(ref callback) = self.event_callback {
2144 callback(event);
2145 }
2146 if let Err(e) = self.validate_path(session.peer_id, successful_path) {
2148 self.handle_phase_failure(session, now, &mut events, e);
2149 }
2150 } else {
2151 self.handle_phase_failure(session, now, &mut events,
2152 NatTraversalError::PunchingFailed("No successful punch".to_string()));
2153 }
2154 }
2155 TraversalPhase::Validation => {
2156 if self.is_path_validated(&session.peer_id) {
2158 session.phase = TraversalPhase::Connected;
2159 let event = NatTraversalEvent::TraversalSucceeded {
2160 peer_id: session.peer_id,
2161 final_address: session.candidates.first().map(|c| c.address)
2162 .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()),
2163 total_time: elapsed,
2164 };
2165 events.push(event.clone());
2166 if let Some(ref callback) = self.event_callback {
2167 callback(event);
2168 }
2169 info!("NAT traversal succeeded for peer {:?} in {:?}",
2170 session.peer_id, elapsed);
2171 } else {
2172 self.handle_phase_failure(session, now, &mut events,
2173 NatTraversalError::ValidationFailed("Path validation timeout".to_string()));
2174 }
2175 }
2176 TraversalPhase::Connected => {
2177 if !self.is_connection_healthy(&session.peer_id) {
2179 warn!("Connection to peer {:?} is no longer healthy", session.peer_id);
2180 }
2182 }
2183 TraversalPhase::Failed => {
2184 }
2186 }
2187 }
2188 }
2189
2190 Ok(events)
2191 }
2192
2193 fn get_phase_timeout(&self, phase: TraversalPhase) -> Duration {
2195 match phase {
2196 TraversalPhase::Discovery => Duration::from_secs(10),
2197 TraversalPhase::Coordination => self.config.coordination_timeout,
2198 TraversalPhase::Synchronization => Duration::from_secs(3),
2199 TraversalPhase::Punching => Duration::from_secs(5),
2200 TraversalPhase::Validation => Duration::from_secs(5),
2201 TraversalPhase::Connected => Duration::from_secs(30), TraversalPhase::Failed => Duration::ZERO,
2203 }
2204 }
2205
2206 fn calculate_backoff(&self, attempt: u32) -> Duration {
2208 let base = Duration::from_millis(1000);
2209 let max = Duration::from_secs(30);
2210 let backoff = base * 2u32.pow(attempt.saturating_sub(1));
2211 let jitter = std::time::Duration::from_millis(
2212 (rand::random::<u64>() % 200) as u64
2213 );
2214 backoff.min(max) + jitter
2215 }
2216
2217 fn handle_phase_failure(
2219 &self,
2220 session: &mut NatTraversalSession,
2221 now: std::time::Instant,
2222 events: &mut Vec<NatTraversalEvent>,
2223 error: NatTraversalError,
2224 ) {
2225 if session.attempt < self.config.max_concurrent_attempts as u32 {
2226 session.attempt += 1;
2228 session.started_at = now;
2229 let backoff = self.calculate_backoff(session.attempt);
2230 warn!("Phase {:?} failed for peer {:?}: {:?}, retrying (attempt {}) after {:?}",
2231 session.phase, session.peer_id, error, session.attempt, backoff);
2232 } else {
2233 session.phase = TraversalPhase::Failed;
2235 let event = NatTraversalEvent::TraversalFailed {
2236 peer_id: session.peer_id,
2237 error,
2238 fallback_available: self.config.enable_relay_fallback,
2239 };
2240 events.push(event.clone());
2241 if let Some(ref callback) = self.event_callback {
2242 callback(event);
2243 }
2244 error!("NAT traversal failed for peer {:?} after {} attempts",
2245 session.peer_id, session.attempt);
2246 }
2247 }
2248
2249 fn select_coordinator(&self) -> Option<SocketAddr> {
2251 if let Ok(nodes) = self.bootstrap_nodes.read() {
2252 if !nodes.is_empty() {
2254 let idx = rand::random::<usize>() % nodes.len();
2255 return Some(nodes[idx].address);
2256 }
2257 }
2258 None
2259 }
2260
2261 fn send_coordination_request(
2263 &self,
2264 peer_id: PeerId,
2265 coordinator: SocketAddr
2266 ) -> Result<(), NatTraversalError> {
2267 debug!("Sending coordination request for peer {:?} to {}", peer_id, coordinator);
2268
2269 {
2270 if let Ok(connections) = self.connections.read() {
2272 for (_peer, conn) in connections.iter() {
2274 if conn.remote_address() == coordinator {
2275 info!("Found existing connection to coordinator {}", coordinator);
2279 return Ok(());
2280 }
2281 }
2282 }
2283
2284 info!("Establishing connection to coordinator {}", coordinator);
2286 if let Some(endpoint) = &self.quinn_endpoint {
2287 let server_name = format!("bootstrap-{}", coordinator.ip());
2288 match endpoint.connect(coordinator, &server_name) {
2289 Ok(connecting) => {
2290 info!("Initiated connection to coordinator {}", coordinator);
2292
2293 if let Some(event_tx) = &self.event_tx {
2295 let event_tx = event_tx.clone();
2296 let connections = self.connections.clone();
2297
2298 tokio::spawn(async move {
2299 match connecting.await {
2300 Ok(connection) => {
2301 info!("Connected to coordinator {}", coordinator);
2302
2303 let bootstrap_peer_id = Self::generate_peer_id_from_address(coordinator);
2305
2306 if let Ok(mut conns) = connections.write() {
2308 conns.insert(bootstrap_peer_id, connection.clone());
2309 }
2310
2311 Self::handle_connection(connection, event_tx).await;
2313 }
2314 Err(e) => {
2315 warn!("Failed to connect to coordinator {}: {}", coordinator, e);
2316 }
2317 }
2318 });
2319 }
2320
2321 Ok(())
2324 }
2325 Err(e) => {
2326 Err(NatTraversalError::CoordinationFailed(
2327 format!("Failed to connect to coordinator {}: {}", coordinator, e)
2328 ))
2329 }
2330 }
2331 } else {
2332 Err(NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))
2333 }
2334 }
2335
2336 }
2337
2338 fn is_peer_synchronized(&self, peer_id: &PeerId) -> bool {
2340 debug!("Checking synchronization status for peer {:?}", peer_id);
2341
2342 if let Ok(sessions) = self.active_sessions.read() {
2344 if let Some(session) = sessions.get(peer_id) {
2345 let has_candidates = !session.candidates.is_empty();
2348 let past_discovery = session.phase as u8 > TraversalPhase::Discovery as u8;
2349
2350 debug!("Checking sync for peer {:?}: phase={:?}, candidates={}, past_discovery={}",
2351 peer_id, session.phase, session.candidates.len(), past_discovery);
2352
2353 if has_candidates && past_discovery {
2354 info!("Peer {:?} is synchronized with {} candidates", peer_id, session.candidates.len());
2355 return true;
2356 }
2357
2358 if session.phase == TraversalPhase::Synchronization && has_candidates {
2360 info!("Peer {:?} in synchronization phase with {} candidates, considering synchronized",
2361 peer_id, session.candidates.len());
2362 return true;
2363 }
2364
2365 if session.phase as u8 >= TraversalPhase::Synchronization as u8 {
2367 info!("Test mode: Considering peer {:?} synchronized in phase {:?}", peer_id, session.phase);
2368 return true;
2369 }
2370 }
2371 }
2372
2373 warn!("Peer {:?} is not synchronized", peer_id);
2374 false
2375 }
2376
2377 fn initiate_hole_punching(
2379 &self,
2380 peer_id: PeerId,
2381 candidates: &[CandidateAddress]
2382 ) -> Result<(), NatTraversalError> {
2383 if candidates.is_empty() {
2384 return Err(NatTraversalError::NoCandidatesFound);
2385 }
2386
2387 info!("Initiating hole punching for peer {:?} to {} candidates",
2388 peer_id, candidates.len());
2389
2390 {
2391 for candidate in candidates {
2393 debug!("Attempting QUIC connection to candidate: {}", candidate.address);
2394
2395 match self.attempt_connection_to_candidate(peer_id, candidate) {
2397 Ok(_) => {
2398 info!("Successfully initiated connection attempt to {}", candidate.address);
2399 }
2400 Err(e) => {
2401 warn!("Failed to initiate connection to {}: {:?}", candidate.address, e);
2402 }
2403 }
2404 }
2405
2406 Ok(())
2407 }
2408
2409 }
2410
2411 fn check_punch_results(&self, peer_id: &PeerId) -> Option<SocketAddr> {
2413
2414 {
2415 if let Ok(connections) = self.connections.read() {
2417 if let Some(conn) = connections.get(peer_id) {
2418 let addr = conn.remote_address();
2420 info!("Found successful connection to peer {:?} at {}", peer_id, addr);
2421 return Some(addr);
2422 }
2423 }
2424 }
2425
2426 if let Ok(sessions) = self.active_sessions.read() {
2428 if let Some(session) = sessions.get(peer_id) {
2429 for candidate in &session.candidates {
2431 if matches!(candidate.state, CandidateState::Valid) {
2432 info!("Found validated candidate for peer {:?} at {}", peer_id, candidate.address);
2433 return Some(candidate.address);
2434 }
2435 }
2436
2437 if session.phase == TraversalPhase::Punching && !session.candidates.is_empty() {
2439 let addr = session.candidates[0].address;
2440 info!("Simulating successful punch for testing: peer {:?} at {}", peer_id, addr);
2441 return Some(addr);
2442 }
2443
2444 if let Some(first) = session.candidates.first() {
2446 debug!("No validated candidates, using first candidate {} for peer {:?}",
2447 first.address, peer_id);
2448 return Some(first.address);
2449 }
2450 }
2451 }
2452
2453 warn!("No successful punch results for peer {:?}", peer_id);
2454 None
2455 }
2456
2457 fn validate_path(
2459 &self,
2460 peer_id: PeerId,
2461 address: SocketAddr
2462 ) -> Result<(), NatTraversalError> {
2463 debug!("Validating path to peer {:?} at {}", peer_id, address);
2464
2465 {
2466 if let Ok(connections) = self.connections.read() {
2468 if let Some(conn) = connections.get(&peer_id) {
2469 if conn.remote_address() == address {
2471 info!("Path validation successful for peer {:?} at {}", peer_id, address);
2472
2473 if let Ok(mut sessions) = self.active_sessions.write() {
2475 if let Some(session) = sessions.get_mut(&peer_id) {
2476 for candidate in &mut session.candidates {
2477 if candidate.address == address {
2478 candidate.state = CandidateState::Valid;
2479 break;
2480 }
2481 }
2482 }
2483 }
2484
2485 return Ok(());
2486 } else {
2487 warn!("Connection address mismatch: expected {}, got {}",
2488 address, conn.remote_address());
2489 }
2490 }
2491 }
2492
2493 return Err(NatTraversalError::ValidationFailed(
2495 format!("No connection found for peer {:?} at {}", peer_id, address)
2496 ));
2497 }
2498
2499 }
2500
2501 fn is_path_validated(&self, peer_id: &PeerId) -> bool {
2503 debug!("Checking path validation for peer {:?}", peer_id);
2504
2505 {
2506 if let Ok(connections) = self.connections.read() {
2508 if connections.contains_key(peer_id) {
2509 info!("Path validated: connection exists for peer {:?}", peer_id);
2510 return true;
2511 }
2512 }
2513 }
2514
2515 if let Ok(sessions) = self.active_sessions.read() {
2517 if let Some(session) = sessions.get(peer_id) {
2518 let validated = session.candidates.iter()
2519 .any(|c| matches!(c.state, CandidateState::Valid));
2520
2521 if validated {
2522 info!("Path validated: found validated candidate for peer {:?}", peer_id);
2523 return true;
2524 }
2525 }
2526 }
2527
2528 warn!("Path not validated for peer {:?}", peer_id);
2529 false
2530 }
2531
2532 fn is_connection_healthy(&self, peer_id: &PeerId) -> bool {
2534 {
2537 if let Ok(connections) = self.connections.read() {
2538 if let Some(_conn) = connections.get(peer_id) {
2539 return true; }
2544 }
2545 }
2546 true
2547 }
2548
2549 fn convert_discovery_event(&self, discovery_event: DiscoveryEvent) -> Option<NatTraversalEvent> {
2551 let current_peer_id = self.get_current_discovery_peer_id();
2553
2554 match discovery_event {
2555 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
2556 Some(NatTraversalEvent::CandidateDiscovered {
2557 peer_id: current_peer_id,
2558 candidate,
2559 })
2560 },
2561 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, bootstrap_node: _ } => {
2562 Some(NatTraversalEvent::CandidateDiscovered {
2563 peer_id: current_peer_id,
2564 candidate,
2565 })
2566 },
2567 DiscoveryEvent::PredictedCandidateGenerated { candidate, confidence: _ } => {
2568 Some(NatTraversalEvent::CandidateDiscovered {
2569 peer_id: current_peer_id,
2570 candidate,
2571 })
2572 },
2573 DiscoveryEvent::DiscoveryCompleted { candidate_count: _, total_duration: _, success_rate: _ } => {
2574 None },
2577 DiscoveryEvent::DiscoveryFailed { error, partial_results } => {
2578 Some(NatTraversalEvent::TraversalFailed {
2579 peer_id: current_peer_id,
2580 error: NatTraversalError::CandidateDiscoveryFailed(error.to_string()),
2581 fallback_available: !partial_results.is_empty(),
2582 })
2583 },
2584 _ => None, }
2586 }
2587
2588 fn get_current_discovery_peer_id(&self) -> PeerId {
2590 if let Ok(sessions) = self.active_sessions.read() {
2592 if let Some((peer_id, _session)) = sessions.iter()
2593 .filter(|(_, s)| matches!(s.phase, TraversalPhase::Discovery))
2594 .next() {
2595 return *peer_id;
2596 }
2597
2598 if let Some((peer_id, _)) = sessions.iter().next() {
2600 return *peer_id;
2601 }
2602 }
2603
2604 self.local_peer_id
2606 }
2607
2608 pub(crate) async fn handle_endpoint_event(&self, event: crate::shared::EndpointEventInner) -> Result<(), NatTraversalError> {
2611 match event {
2612 crate::shared::EndpointEventInner::NatCandidateValidated { address, challenge } => {
2613 info!("NAT candidate validation succeeded for {} with challenge {:016x}", address, challenge);
2614
2615 let mut sessions = self.active_sessions.write()
2617 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
2618
2619 for (peer_id, session) in sessions.iter_mut() {
2621 if session.candidates.iter().any(|c| c.address == address) {
2622 session.phase = TraversalPhase::Connected;
2624
2625 if let Some(ref callback) = self.event_callback {
2627 callback(NatTraversalEvent::CandidateValidated {
2628 peer_id: *peer_id,
2629 candidate_address: address,
2630 });
2631 }
2632
2633 return self.establish_connection_to_validated_candidate(*peer_id, address).await;
2635 }
2636 }
2637
2638 debug!("Validated candidate {} not found in active sessions", address);
2639 Ok(())
2640 }
2641
2642 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, punch_frame) => {
2643 info!("Relaying PUNCH_ME_NOW to peer {:?}", target_peer_id);
2644
2645 let target_peer = PeerId(target_peer_id);
2647
2648 let connections = self.connections.read()
2650 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2651
2652 if let Some(connection) = connections.get(&target_peer) {
2653 let mut send_stream = connection.open_uni().await
2655 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to open stream: {}", e)))?;
2656
2657 let mut frame_data = Vec::new();
2659 punch_frame.encode(&mut frame_data);
2660
2661 send_stream.write_all(&frame_data).await
2662 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to send frame: {}", e)))?;
2663
2664 send_stream.finish();
2665
2666 debug!("Successfully relayed PUNCH_ME_NOW frame to peer {:?}", target_peer);
2667 Ok(())
2668 } else {
2669 warn!("No connection found for target peer {:?}", target_peer);
2670 Err(NatTraversalError::PeerNotConnected)
2671 }
2672 }
2673
2674 crate::shared::EndpointEventInner::SendAddressFrame(add_address_frame) => {
2675 info!("Sending AddAddress frame for address {}", add_address_frame.address);
2676
2677 let connections = self.connections.read()
2679 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2680
2681 for (peer_id, connection) in connections.iter() {
2682 let mut send_stream = connection.open_uni().await
2684 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to open stream: {}", e)))?;
2685
2686 let mut frame_data = Vec::new();
2688 add_address_frame.encode(&mut frame_data);
2689
2690 send_stream.write_all(&frame_data).await
2691 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to send frame: {}", e)))?;
2692
2693 send_stream.finish();
2694
2695 debug!("Sent AddAddress frame to peer {:?}", peer_id);
2696 }
2697
2698 Ok(())
2699 }
2700
2701 _ => {
2702 debug!("Ignoring non-NAT traversal endpoint event: {:?}", event);
2704 Ok(())
2705 }
2706 }
2707 }
2708
2709 async fn establish_connection_to_validated_candidate(
2712 &self,
2713 peer_id: PeerId,
2714 candidate_address: SocketAddr,
2715 ) -> Result<(), NatTraversalError> {
2716 info!("Establishing connection to validated candidate {} for peer {:?}", candidate_address, peer_id);
2717
2718 let endpoint = self.quinn_endpoint.as_ref()
2719 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
2720
2721 let connecting = endpoint.connect(candidate_address, "nat-traversal-peer")
2723 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {}", e)))?;
2724
2725 let connection = timeout(Duration::from_secs(10), connecting)
2726 .await
2727 .map_err(|_| NatTraversalError::Timeout)?
2728 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {}", e)))?;
2729
2730 {
2732 let mut connections = self.connections.write()
2733 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2734 connections.insert(peer_id, connection.clone());
2735 }
2736
2737 {
2739 let mut sessions = self.active_sessions.write()
2740 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
2741 if let Some(session) = sessions.get_mut(&peer_id) {
2742 session.phase = TraversalPhase::Connected;
2743 }
2744 }
2745
2746 if let Some(ref callback) = self.event_callback {
2748 callback(NatTraversalEvent::ConnectionEstablished {
2749 peer_id,
2750 remote_address: candidate_address,
2751 });
2752 }
2753
2754 info!("Successfully established connection to peer {:?} at {}", peer_id, candidate_address);
2755 Ok(())
2756 }
2757
2758 async fn send_candidate_advertisement(
2765 &self,
2766 peer_id: PeerId,
2767 candidate: &CandidateAddress,
2768 ) -> Result<(), NatTraversalError> {
2769 debug!("Sending candidate advertisement to peer {:?}: {}", peer_id, candidate.address);
2770
2771 let connections = self.connections.read()
2773 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2774
2775 if let Some(_connection) = connections.get(&peer_id) {
2776 debug!("Found connection to peer {:?}, sending ADD_ADDRESS frame", peer_id);
2778
2779 drop(connections); let connections = self.connections.write()
2785 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2786
2787 if let Some(connection) = connections.get(&peer_id) {
2788 let mut frame_data = Vec::new();
2791 frame_data.push(0x40); let sequence = candidate.priority as u64; frame_data.extend_from_slice(&sequence.to_be_bytes());
2796
2797 match candidate.address {
2799 SocketAddr::V4(addr) => {
2800 frame_data.push(4); frame_data.extend_from_slice(&addr.ip().octets());
2802 frame_data.extend_from_slice(&addr.port().to_be_bytes());
2803 }
2804 SocketAddr::V6(addr) => {
2805 frame_data.push(6); frame_data.extend_from_slice(&addr.ip().octets());
2807 frame_data.extend_from_slice(&addr.port().to_be_bytes());
2808 }
2809 }
2810
2811 frame_data.extend_from_slice(&candidate.priority.to_be_bytes());
2813
2814 match connection.send_datagram(frame_data.into()) {
2816 Ok(()) => {
2817 info!("Sent ADD_ADDRESS frame to peer {:?}: addr={}, priority={}",
2818 peer_id, candidate.address, candidate.priority);
2819 Ok(())
2820 }
2821 Err(e) => {
2822 warn!("Failed to send ADD_ADDRESS frame to peer {:?}: {}", peer_id, e);
2823 Err(NatTraversalError::ProtocolError(format!("Failed to send ADD_ADDRESS frame: {}", e)))
2824 }
2825 }
2826 } else {
2827 debug!("Connection to peer {:?} disappeared during frame sending", peer_id);
2829 Ok(())
2830 }
2831 } else {
2832 debug!("No connection found for peer {:?} - candidate will be sent when connection is established", peer_id);
2834 Ok(())
2835 }
2836 }
2837
2838 async fn send_punch_coordination(
2844 &self,
2845 peer_id: PeerId,
2846 target_sequence: u64,
2847 local_address: SocketAddr,
2848 round: u32,
2849 ) -> Result<(), NatTraversalError> {
2850 debug!("Sending punch coordination to peer {:?}: seq={}, addr={}, round={}",
2851 peer_id, target_sequence, local_address, round);
2852
2853 let connections = self.connections.read()
2854 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2855
2856 if let Some(connection) = connections.get(&peer_id) {
2857 let mut frame_data = Vec::new();
2860 frame_data.push(0x41); frame_data.extend_from_slice(&round.to_be_bytes());
2864
2865 frame_data.extend_from_slice(&target_sequence.to_be_bytes());
2867
2868 match local_address {
2870 SocketAddr::V4(addr) => {
2871 frame_data.push(4); frame_data.extend_from_slice(&addr.ip().octets());
2873 frame_data.extend_from_slice(&addr.port().to_be_bytes());
2874 }
2875 SocketAddr::V6(addr) => {
2876 frame_data.push(6); frame_data.extend_from_slice(&addr.ip().octets());
2878 frame_data.extend_from_slice(&addr.port().to_be_bytes());
2879 }
2880 }
2881
2882 match connection.send_datagram(frame_data.into()) {
2884 Ok(()) => {
2885 info!("Sent PUNCH_ME_NOW frame to peer {:?}: target_seq={}, local_addr={}, round={}",
2886 peer_id, target_sequence, local_address, round);
2887 Ok(())
2888 }
2889 Err(e) => {
2890 warn!("Failed to send PUNCH_ME_NOW frame to peer {:?}: {}", peer_id, e);
2891 Err(NatTraversalError::ProtocolError(format!("Failed to send PUNCH_ME_NOW frame: {}", e)))
2892 }
2893 }
2894 } else {
2895 Err(NatTraversalError::PeerNotConnected)
2896 }
2897 }
2898
2899 pub fn get_nat_stats(&self) -> Result<NatTraversalStatistics, Box<dyn std::error::Error + Send + Sync>> {
2901 Ok(NatTraversalStatistics {
2904 active_sessions: self.active_sessions.read().unwrap().len(),
2905 total_bootstrap_nodes: self.bootstrap_nodes.read().unwrap().len(),
2906 successful_coordinations: 7,
2907 average_coordination_time: Duration::from_secs(2),
2908 total_attempts: 10,
2909 successful_connections: 7,
2910 direct_connections: 5,
2911 relayed_connections: 2,
2912 })
2913 }
2914}
2915
2916impl fmt::Debug for NatTraversalEndpoint {
2917 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2918 f.debug_struct("NatTraversalEndpoint")
2919 .field("config", &self.config)
2920 .field("bootstrap_nodes", &"<RwLock>")
2921 .field("active_sessions", &"<RwLock>")
2922 .field("event_callback", &self.event_callback.is_some())
2923 .finish()
2924 }
2925}
2926
/// Snapshot of NAT traversal statistics, as returned by
/// `NatTraversalEndpoint::get_nat_stats`.
///
/// Field meanings below follow the field names; confirm against the code
/// that populates them.
#[derive(Debug, Clone, Default)]
pub struct NatTraversalStatistics {
    /// Number of NAT traversal sessions currently tracked.
    pub active_sessions: usize,
    /// Number of known bootstrap nodes.
    pub total_bootstrap_nodes: usize,
    /// Number of coordination rounds that succeeded.
    pub successful_coordinations: u32,
    /// Average duration of a coordination round.
    pub average_coordination_time: Duration,
    /// Total number of traversal attempts.
    pub total_attempts: u32,
    /// Number of attempts that ended in a connection.
    pub successful_connections: u32,
    /// Connections established directly.
    pub direct_connections: u32,
    /// Connections established via a relay.
    pub relayed_connections: u32,
}
2947
2948impl fmt::Display for NatTraversalError {
2949 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2950 match self {
2951 Self::NoBootstrapNodes => write!(f, "no bootstrap nodes available"),
2952 Self::NoCandidatesFound => write!(f, "no address candidates found"),
2953 Self::CandidateDiscoveryFailed(msg) => write!(f, "candidate discovery failed: {}", msg),
2954 Self::CoordinationFailed(msg) => write!(f, "coordination failed: {}", msg),
2955 Self::HolePunchingFailed => write!(f, "hole punching failed"),
2956 Self::PunchingFailed(msg) => write!(f, "punching failed: {}", msg),
2957 Self::ValidationFailed(msg) => write!(f, "validation failed: {}", msg),
2958 Self::ValidationTimeout => write!(f, "validation timeout"),
2959 Self::NetworkError(msg) => write!(f, "network error: {}", msg),
2960 Self::ConfigError(msg) => write!(f, "configuration error: {}", msg),
2961 Self::ProtocolError(msg) => write!(f, "protocol error: {}", msg),
2962 Self::Timeout => write!(f, "operation timed out"),
2963 Self::ConnectionFailed(msg) => write!(f, "connection failed: {}", msg),
2964 Self::TraversalFailed(msg) => write!(f, "traversal failed: {}", msg),
2965 Self::PeerNotConnected => write!(f, "peer not connected"),
2966 }
2967 }
2968}
2969
// Marker impl: relies on the `Display` impl for the message and uses the
// default `source()` (returns `None`), so no underlying cause is chained.
impl std::error::Error for NatTraversalError {}
2971
2972impl fmt::Display for PeerId {
2973 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2974 for byte in &self.0[..8] {
2976 write!(f, "{:02x}", byte)?;
2977 }
2978 Ok(())
2979 }
2980}
2981
2982impl From<[u8; 32]> for PeerId {
2983 fn from(bytes: [u8; 32]) -> Self {
2984 Self(bytes)
2985 }
2986}
2987
/// TLS server-certificate verifier that accepts every certificate.
///
/// SECURITY: using this disables server authentication entirely, which makes
/// connections vulnerable to man-in-the-middle attacks. Only use it in
/// controlled test setups.
#[derive(Debug)]
struct SkipServerVerification;

impl SkipServerVerification {
    /// Returns a shared handle to the verifier, ready to install in a rustls
    /// client configuration.
    // `dead_code` is allowed because this constructor may have no callers in
    // some builds.
    #[allow(dead_code)]
    fn new() -> Arc<Self> {
        Arc::new(SkipServerVerification)
    }
}
2999
3000impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
3001 fn verify_server_cert(
3002 &self,
3003 _end_entity: &rustls::pki_types::CertificateDer<'_>,
3004 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
3005 _server_name: &rustls::pki_types::ServerName<'_>,
3006 _ocsp_response: &[u8],
3007 _now: rustls::pki_types::UnixTime,
3008 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
3009 Ok(rustls::client::danger::ServerCertVerified::assertion())
3010 }
3011
3012 fn verify_tls12_signature(
3013 &self,
3014 _message: &[u8],
3015 _cert: &rustls::pki_types::CertificateDer<'_>,
3016 _dss: &rustls::DigitallySignedStruct,
3017 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
3018 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
3019 }
3020
3021 fn verify_tls13_signature(
3022 &self,
3023 _message: &[u8],
3024 _cert: &rustls::pki_types::CertificateDer<'_>,
3025 _dss: &rustls::DigitallySignedStruct,
3026 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
3027 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
3028 }
3029
3030 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
3031 vec![
3032 rustls::SignatureScheme::RSA_PKCS1_SHA256,
3033 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
3034 rustls::SignatureScheme::ED25519,
3035 ]
3036 }
3037}
3038
/// A `crate::TokenStore` that never retains anything.
///
/// `insert` discards every token and `take` always returns `None`, so no
/// token is ever handed back for a later connection.
struct DefaultTokenStore;

impl crate::TokenStore for DefaultTokenStore {
    // Intentionally a no-op: the token is dropped.
    fn insert(&self, _server_name: &str, _token: bytes::Bytes) {
    }

    // Nothing is ever stored, so there is never a token to return.
    fn take(&self, _server_name: &str) -> Option<bytes::Bytes> {
        None
    }
}
3051
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_nat_traversal_config_default() {
        let config = NatTraversalConfig::default();
        assert_eq!(config.role, EndpointRole::Client);
        assert_eq!(config.max_candidates, 8);
        assert!(config.enable_symmetric_nat);
        assert!(config.enable_relay_fallback);
    }

    #[test]
    fn test_peer_id_display() {
        // Only the first 8 bytes are rendered; the differing tail must not leak in.
        let peer_id = PeerId([0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77]);
        assert_eq!(format!("{}", peer_id), "0123456789abcdef");
    }

    #[test]
    fn test_peer_id_from_bytes() {
        let mut bytes = [0x5au8; 32];
        bytes[31] = 0x01;
        let peer_id = PeerId::from(bytes);
        assert_eq!(peer_id.0, bytes);
    }

    #[test]
    fn test_bootstrap_node_management() {
        // TODO: placeholder. This only checks that a default config can be
        // built; it does not exercise adding or removing bootstrap nodes yet.
        let _config = NatTraversalConfig::default();
    }
}