1use std::{
8 collections::HashMap,
9 fmt,
10 net::SocketAddr,
11 sync::Arc,
12 time::Duration,
13};
14
15use tracing::{debug, info, warn};
16
17#[cfg(feature = "production-ready")]
18use std::sync::atomic::{AtomicBool, Ordering};
19
20#[cfg(feature = "production-ready")]
21use tokio::{
22 net::UdpSocket,
23 sync::mpsc,
24 time::{sleep, timeout},
25};
26
27#[cfg(feature = "runtime-tokio")]
28use crate::quinn_high_level::TokioRuntime;
29
30use crate::{
31 candidate_discovery::{CandidateDiscoveryManager, DiscoveryConfig, DiscoveryEvent},
32 connection::nat_traversal::{CandidateSource, CandidateState, NatTraversalRole},
33 VarInt,
34};
35
36#[cfg(feature = "production-ready")]
37use crate::{
38 quinn_high_level::{Endpoint as QuinnEndpoint, Connection as QuinnConnection},
39 EndpointConfig,
40 ServerConfig,
41 ClientConfig,
42 ConnectionError,
43 TransportConfig,
44 crypto::rustls::QuicServerConfig,
45 crypto::rustls::QuicClientConfig,
46};
47
48
49#[cfg(feature = "production-ready")]
50use crate::config::validation::{ConfigValidator, ValidationResult};
51
52#[cfg(feature = "production-ready")]
53use crate::crypto::certificate_manager::{CertificateManager, CertificateConfig};
54
55pub struct NatTraversalEndpoint {
57 #[cfg(feature = "production-ready")]
59 quinn_endpoint: Option<QuinnEndpoint>,
60 #[cfg(not(feature = "production-ready"))]
62 internal_endpoint: Endpoint,
63 config: NatTraversalConfig,
65 bootstrap_nodes: Arc<std::sync::RwLock<Vec<BootstrapNode>>>,
67 active_sessions: Arc<std::sync::RwLock<HashMap<PeerId, NatTraversalSession>>>,
69 discovery_manager: Arc<std::sync::Mutex<CandidateDiscoveryManager>>,
71 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
73 #[cfg(feature = "production-ready")]
75 shutdown: Arc<AtomicBool>,
76 #[cfg(feature = "production-ready")]
78 event_tx: Option<mpsc::UnboundedSender<NatTraversalEvent>>,
79 #[cfg(feature = "production-ready")]
81 connections: Arc<std::sync::RwLock<HashMap<PeerId, QuinnConnection>>>,
82 local_peer_id: PeerId,
84}
85
86#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
88pub struct NatTraversalConfig {
89 pub role: EndpointRole,
91 pub bootstrap_nodes: Vec<SocketAddr>,
93 pub max_candidates: usize,
95 pub coordination_timeout: Duration,
97 pub enable_symmetric_nat: bool,
99 pub enable_relay_fallback: bool,
101 pub max_concurrent_attempts: usize,
103}
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
107pub enum EndpointRole {
108 Client,
110 Server { can_coordinate: bool },
112 Bootstrap,
114}
115
116impl EndpointRole {
117 pub fn name(&self) -> &'static str {
119 match self {
120 EndpointRole::Client => "client",
121 EndpointRole::Server { .. } => "server",
122 EndpointRole::Bootstrap => "bootstrap",
123 }
124 }
125}
126
127#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
129pub struct PeerId(pub [u8; 32]);
130
131#[derive(Debug, Clone)]
133pub struct BootstrapNode {
134 pub address: SocketAddr,
136 pub last_seen: std::time::Instant,
138 pub can_coordinate: bool,
140 pub rtt: Option<Duration>,
142 pub coordination_count: u32,
144}
145
146impl BootstrapNode {
147 pub fn new(address: SocketAddr) -> Self {
149 Self {
150 address,
151 last_seen: std::time::Instant::now(),
152 can_coordinate: true,
153 rtt: None,
154 coordination_count: 0,
155 }
156 }
157}
158
159#[derive(Debug, Clone)]
161pub struct CandidatePair {
162 pub local_candidate: CandidateAddress,
164 pub remote_candidate: CandidateAddress,
166 pub priority: u64,
168 pub state: CandidatePairState,
170}
171
172#[derive(Debug, Clone, Copy, PartialEq, Eq)]
174pub enum CandidatePairState {
175 Waiting,
177 InProgress,
179 Succeeded,
181 Failed,
183 Cancelled,
185}
186
187#[derive(Debug)]
189struct NatTraversalSession {
190 peer_id: PeerId,
192 coordinator: SocketAddr,
194 attempt: u32,
196 started_at: std::time::Instant,
198 phase: TraversalPhase,
200 candidates: Vec<CandidateAddress>,
202}
203
204#[derive(Debug, Clone, Copy, PartialEq, Eq)]
206enum TraversalPhase {
207 Discovery,
209 Coordination,
211 Synchronization,
213 Punching,
215 Validation,
217 Connected,
219 Failed,
221}
222
223#[derive(Debug, Clone)]
225pub struct CandidateAddress {
226 pub address: SocketAddr,
228 pub priority: u32,
230 pub source: CandidateSource,
232 pub state: CandidateState,
234}
235
236
237#[derive(Debug, Clone)]
239pub enum NatTraversalEvent {
240 CandidateDiscovered {
242 peer_id: PeerId,
243 candidate: CandidateAddress,
244 },
245 CoordinationRequested {
247 peer_id: PeerId,
248 coordinator: SocketAddr,
249 },
250 CoordinationSynchronized {
252 peer_id: PeerId,
253 round_id: VarInt,
254 },
255 HolePunchingStarted {
257 peer_id: PeerId,
258 targets: Vec<SocketAddr>,
259 },
260 PathValidated {
262 peer_id: PeerId,
263 address: SocketAddr,
264 rtt: Duration,
265 },
266 CandidateValidated {
268 peer_id: PeerId,
269 candidate_address: SocketAddr,
270 },
271 TraversalSucceeded {
273 peer_id: PeerId,
274 final_address: SocketAddr,
275 total_time: Duration,
276 },
277 ConnectionEstablished {
279 peer_id: PeerId,
280 remote_address: SocketAddr,
282 },
283 TraversalFailed {
285 peer_id: PeerId,
286 error: NatTraversalError,
287 fallback_available: bool,
288 },
289 ConnectionLost {
291 peer_id: PeerId,
292 reason: String,
293 },
294}
295
296#[derive(Debug, Clone)]
298pub enum NatTraversalError {
299 NoBootstrapNodes,
301 NoCandidatesFound,
303 CandidateDiscoveryFailed(String),
305 CoordinationFailed(String),
307 HolePunchingFailed,
309 ValidationTimeout,
311 NetworkError(String),
313 ConfigError(String),
315 ProtocolError(String),
317 Timeout,
319 ConnectionFailed(String),
321 TraversalFailed(String),
323 PeerNotConnected,
325}
326
327impl Default for NatTraversalConfig {
328 fn default() -> Self {
329 Self {
330 role: EndpointRole::Client,
331 bootstrap_nodes: Vec::new(),
332 max_candidates: 8,
333 coordination_timeout: Duration::from_secs(10),
334 enable_symmetric_nat: true,
335 enable_relay_fallback: true,
336 max_concurrent_attempts: 3,
337 }
338 }
339}
340
341#[cfg(feature = "production-ready")]
342impl ConfigValidator for NatTraversalConfig {
343 fn validate(&self) -> ValidationResult<()> {
344 use crate::config::validation::*;
345
346 match self.role {
348 EndpointRole::Client => {
349 if self.bootstrap_nodes.is_empty() {
350 return Err(ConfigValidationError::InvalidRole(
351 "Client endpoints require at least one bootstrap node".to_string()
352 ));
353 }
354 }
355 EndpointRole::Server { can_coordinate } => {
356 if can_coordinate && self.bootstrap_nodes.is_empty() {
357 return Err(ConfigValidationError::InvalidRole(
358 "Server endpoints with coordination capability require bootstrap nodes".to_string()
359 ));
360 }
361 }
362 EndpointRole::Bootstrap => {
363 }
365 }
366
367 if !self.bootstrap_nodes.is_empty() {
369 validate_bootstrap_nodes(&self.bootstrap_nodes)?;
370 }
371
372 validate_range(
374 self.max_candidates,
375 1,
376 256,
377 "max_candidates"
378 )?;
379
380 validate_duration(
382 self.coordination_timeout,
383 Duration::from_millis(100),
384 Duration::from_secs(300),
385 "coordination_timeout"
386 )?;
387
388 validate_range(
390 self.max_concurrent_attempts,
391 1,
392 16,
393 "max_concurrent_attempts"
394 )?;
395
396 if self.max_concurrent_attempts > self.max_candidates {
398 return Err(ConfigValidationError::IncompatibleConfiguration(
399 "max_concurrent_attempts cannot exceed max_candidates".to_string()
400 ));
401 }
402
403 if self.role == EndpointRole::Bootstrap && self.enable_relay_fallback {
404 return Err(ConfigValidationError::IncompatibleConfiguration(
405 "Bootstrap nodes should not enable relay fallback".to_string()
406 ));
407 }
408
409 Ok(())
410 }
411}
412
413impl NatTraversalEndpoint {
414 pub async fn new(
416 config: NatTraversalConfig,
417 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
418 ) -> Result<Self, NatTraversalError> {
419 #[cfg(feature = "production-ready")]
420 {
421 Self::new_impl(config, event_callback).await
422 }
423 #[cfg(not(feature = "production-ready"))]
424 {
425 Ok(Self::new_fallback(config, event_callback)?)
427 }
428 }
429
430 #[cfg(feature = "production-ready")]
432 async fn new_impl(
433 config: NatTraversalConfig,
434 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
435 ) -> Result<Self, NatTraversalError> {
436 Self::new_common(config, event_callback).await
437 }
438
439 #[cfg(not(feature = "production-ready"))]
441 fn new_fallback(
442 config: NatTraversalConfig,
443 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
444 ) -> Result<Self, NatTraversalError> {
445 Self::new_common_sync(config, event_callback)
446 }
447
448 #[cfg(feature = "production-ready")]
450 async fn new_common(
451 config: NatTraversalConfig,
452 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
453 ) -> Result<Self, NatTraversalError> {
454 Self::new_shared_logic(config, event_callback).await
456 }
457
458 #[cfg(not(feature = "production-ready"))]
460 fn new_common_sync(
461 config: NatTraversalConfig,
462 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
463 ) -> Result<Self, NatTraversalError> {
464 Self::new_shared_logic_sync(config, event_callback)
466 }
467
468 #[cfg(feature = "production-ready")]
470 async fn new_shared_logic(
471 config: NatTraversalConfig,
472 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
473 ) -> Result<Self, NatTraversalError> {
474 #[cfg(feature = "production-ready")]
476 {
477 config.validate()
478 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
479 }
480
481 #[cfg(not(feature = "production-ready"))]
483 {
484 if config.bootstrap_nodes.is_empty() && config.role != EndpointRole::Bootstrap {
485 return Err(NatTraversalError::ConfigError(
486 "At least one bootstrap node required for non-bootstrap endpoints".to_string(),
487 ));
488 }
489 }
490
491 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
493 config
494 .bootstrap_nodes
495 .iter()
496 .map(|&address| BootstrapNode {
497 address,
498 last_seen: std::time::Instant::now(),
499 can_coordinate: true, rtt: None,
501 coordination_count: 0,
502 })
503 .collect(),
504 ));
505
506 let discovery_config = DiscoveryConfig {
508 total_timeout: config.coordination_timeout,
509 max_candidates: config.max_candidates,
510 enable_symmetric_prediction: config.enable_symmetric_nat,
511 ..DiscoveryConfig::default()
512 };
513
514 let nat_traversal_role = match config.role {
515 EndpointRole::Client => NatTraversalRole::Client,
516 EndpointRole::Server { can_coordinate } => NatTraversalRole::Server { can_relay: can_coordinate },
517 EndpointRole::Bootstrap => NatTraversalRole::Bootstrap,
518 };
519
520 let discovery_manager = Arc::new(std::sync::Mutex::new(
521 CandidateDiscoveryManager::new(discovery_config, nat_traversal_role)
522 ));
523
524 let (quinn_endpoint, event_tx) = Self::create_quinn_endpoint(&config, nat_traversal_role).await?;
527
528 Ok(Self {
529 quinn_endpoint: Some(quinn_endpoint),
530 config,
531 bootstrap_nodes,
532 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
533 discovery_manager,
534 event_callback,
535 shutdown: Arc::new(AtomicBool::new(false)),
536 event_tx: Some(event_tx),
537 connections: Arc::new(std::sync::RwLock::new(HashMap::new())),
538 local_peer_id: Self::generate_local_peer_id(),
539 })
540 }
541
542 #[cfg(not(feature = "production-ready"))]
544 fn new_shared_logic_sync(
545 config: NatTraversalConfig,
546 event_callback: Option<Box<dyn Fn(NatTraversalEvent) + Send + Sync>>,
547 ) -> Result<Self, NatTraversalError> {
548 config.validate()
550 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
551
552 let bootstrap_nodes = Arc::new(std::sync::RwLock::new(
554 config
555 .bootstrap_nodes
556 .iter()
557 .map(|&address| BootstrapNode {
558 address,
559 last_seen: std::time::Instant::now(),
560 can_coordinate: true,
561 rtt: None,
562 coordination_count: 0,
563 })
564 .collect(),
565 ));
566
567 let discovery_config = DiscoveryConfig {
569 total_timeout: config.coordination_timeout,
570 max_candidates: config.max_candidates,
571 enable_symmetric_prediction: config.enable_symmetric_nat,
572 ..DiscoveryConfig::default()
573 };
574
575 let nat_traversal_role = match config.role {
576 EndpointRole::Client => NatTraversalRole::Client,
577 EndpointRole::Server { can_coordinate } => NatTraversalRole::Server { can_relay: can_coordinate },
578 EndpointRole::Bootstrap => NatTraversalRole::Bootstrap,
579 };
580
581 let discovery_manager = Arc::new(std::sync::Mutex::new(
582 CandidateDiscoveryManager::new(discovery_config, nat_traversal_role)
583 ));
584
585 let internal_endpoint = Self::create_fallback_endpoint(&config, nat_traversal_role)?;
587
588 Ok(Self {
589 internal_endpoint,
590 config,
591 bootstrap_nodes,
592 active_sessions: Arc::new(std::sync::RwLock::new(HashMap::new())),
593 discovery_manager,
594 event_callback,
595 local_peer_id: Self::generate_local_peer_id(),
596 })
597 }
598
599 pub fn initiate_nat_traversal(
601 &self,
602 peer_id: PeerId,
603 coordinator: SocketAddr,
604 ) -> Result<(), NatTraversalError> {
605 info!("Starting NAT traversal to peer {:?} via coordinator {}", peer_id, coordinator);
606
607 let session = NatTraversalSession {
609 peer_id,
610 coordinator,
611 attempt: 1,
612 started_at: std::time::Instant::now(),
613 phase: TraversalPhase::Discovery,
614 candidates: Vec::new(),
615 };
616
617 {
619 let mut sessions = self.active_sessions.write()
620 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
621 sessions.insert(peer_id, session);
622 }
623
624 let bootstrap_nodes_vec = {
626 let bootstrap_nodes = self.bootstrap_nodes.read()
627 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
628 bootstrap_nodes.clone()
629 };
630
631 {
632 let mut discovery = self.discovery_manager.lock()
633 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
634
635 discovery.start_discovery(peer_id, bootstrap_nodes_vec)
636 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
637 }
638
639 if let Some(ref callback) = self.event_callback {
641 callback(NatTraversalEvent::CoordinationRequested {
642 peer_id,
643 coordinator,
644 });
645 }
646
647 Ok(())
649 }
650
651 pub fn get_statistics(&self) -> Result<NatTraversalStatistics, NatTraversalError> {
653 let sessions = self.active_sessions.read()
654 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
655 let bootstrap_nodes = self.bootstrap_nodes.read()
656 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
657
658 Ok(NatTraversalStatistics {
659 active_sessions: sessions.len(),
660 total_bootstrap_nodes: bootstrap_nodes.len(),
661 successful_coordinations: bootstrap_nodes.iter().map(|b| b.coordination_count).sum(),
662 average_coordination_time: Duration::from_millis(500), })
664 }
665
666 pub fn add_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
668 let mut bootstrap_nodes = self.bootstrap_nodes.write()
669 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
670
671 if !bootstrap_nodes.iter().any(|b| b.address == address) {
673 bootstrap_nodes.push(BootstrapNode {
674 address,
675 last_seen: std::time::Instant::now(),
676 can_coordinate: true,
677 rtt: None,
678 coordination_count: 0,
679 });
680 info!("Added bootstrap node: {}", address);
681 }
682 Ok(())
683 }
684
685 pub fn remove_bootstrap_node(&self, address: SocketAddr) -> Result<(), NatTraversalError> {
687 let mut bootstrap_nodes = self.bootstrap_nodes.write()
688 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
689 bootstrap_nodes.retain(|b| b.address != address);
690 info!("Removed bootstrap node: {}", address);
691 Ok(())
692 }
693
694 #[cfg(feature = "production-ready")]
698 async fn create_quinn_endpoint(
699 config: &NatTraversalConfig,
700 _nat_role: NatTraversalRole,
701 ) -> Result<(QuinnEndpoint, mpsc::UnboundedSender<NatTraversalEvent>), NatTraversalError> {
702 use std::sync::Arc;
703
704 let server_config = match config.role {
706 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
707 let cert_config = CertificateConfig {
709 common_name: format!("ant-quic-{}", config.role.name()),
710 subject_alt_names: vec![
711 "localhost".to_string(),
712 "ant-quic-node".to_string(),
713 ],
714 self_signed: true, ..CertificateConfig::default()
716 };
717
718 let cert_manager = CertificateManager::new(cert_config)
719 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate manager creation failed: {}", e)))?;
720
721 let cert_bundle = cert_manager.generate_certificate()
722 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate generation failed: {}", e)))?;
723
724 let rustls_config = cert_manager.create_server_config(&cert_bundle)
725 .map_err(|e| NatTraversalError::ConfigError(format!("Server config creation failed: {}", e)))?;
726
727 let server_crypto = QuicServerConfig::try_from(rustls_config.as_ref().clone())
728 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
729
730 let mut server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
731
732 let mut transport_config = TransportConfig::default();
734 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
735 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
736
737 let nat_config = crate::transport_parameters::NatTraversalConfig {
739 role: match config.role {
740 EndpointRole::Bootstrap => crate::transport_parameters::NatTraversalRole::Bootstrap,
741 EndpointRole::Server { can_coordinate } => crate::transport_parameters::NatTraversalRole::Server { can_relay: can_coordinate },
742 EndpointRole::Client => crate::transport_parameters::NatTraversalRole::Client,
743 },
744 max_candidates: VarInt::from_u32(config.max_candidates as u32),
745 coordination_timeout: VarInt::from_u64(config.coordination_timeout.as_millis() as u64).unwrap(),
746 max_concurrent_attempts: VarInt::from_u32(config.max_concurrent_attempts as u32),
747 peer_id: None, };
749 transport_config.nat_traversal_config(Some(nat_config));
750
751 server_config.transport_config(Arc::new(transport_config));
752
753 Some(server_config)
754 }
755 _ => None,
756 };
757
758 let client_config = {
760 let cert_config = CertificateConfig {
761 common_name: format!("ant-quic-{}", config.role.name()),
762 subject_alt_names: vec![
763 "localhost".to_string(),
764 "ant-quic-node".to_string(),
765 ],
766 self_signed: true,
767 ..CertificateConfig::default()
768 };
769
770 let cert_manager = CertificateManager::new(cert_config)
771 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate manager creation failed: {}", e)))?;
772
773 let _cert_bundle = cert_manager.generate_certificate()
774 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate generation failed: {}", e)))?;
775
776 let rustls_config = cert_manager.create_client_config()
777 .map_err(|e| NatTraversalError::ConfigError(format!("Client config creation failed: {}", e)))?;
778
779 let client_crypto = QuicClientConfig::try_from(rustls_config.as_ref().clone())
780 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
781
782 let mut client_config = ClientConfig::new(Arc::new(client_crypto));
783
784 let mut transport_config = TransportConfig::default();
786 transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
787 transport_config.max_idle_timeout(Some(crate::VarInt::from_u32(30000).into()));
788
789 let nat_config = crate::transport_parameters::NatTraversalConfig {
791 role: match config.role {
792 EndpointRole::Bootstrap => crate::transport_parameters::NatTraversalRole::Bootstrap,
793 EndpointRole::Server { can_coordinate } => crate::transport_parameters::NatTraversalRole::Server { can_relay: can_coordinate },
794 EndpointRole::Client => crate::transport_parameters::NatTraversalRole::Client,
795 },
796 max_candidates: VarInt::from_u32(config.max_candidates as u32),
797 coordination_timeout: VarInt::from_u64(config.coordination_timeout.as_millis() as u64).unwrap(),
798 max_concurrent_attempts: VarInt::from_u32(config.max_concurrent_attempts as u32),
799 peer_id: None, };
801 transport_config.nat_traversal_config(Some(nat_config));
802
803 client_config.transport_config(Arc::new(transport_config));
804
805 client_config
806 };
807
808 let socket = UdpSocket::bind("0.0.0.0:0").await
810 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to bind UDP socket: {}", e)))?;
811
812 let std_socket = socket.into_std()
814 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to convert socket: {}", e)))?;
815
816 let mut endpoint = QuinnEndpoint::new(
818 EndpointConfig::default(),
819 server_config,
820 std_socket,
821 Arc::new(TokioRuntime),
822 ).map_err(|e| NatTraversalError::ConfigError(format!("Failed to create Quinn endpoint: {}", e)))?;
823
824 endpoint.set_default_client_config(client_config);
826
827 let (event_tx, _event_rx) = mpsc::unbounded_channel();
829
830 Ok((endpoint, event_tx))
831 }
832
833 #[cfg(not(feature = "production-ready"))]
835 fn create_fallback_endpoint(
836 config: &NatTraversalConfig,
837 nat_role: NatTraversalRole,
838 ) -> Result<Endpoint, NatTraversalError> {
839 use crate::{
840 EndpointConfig, TransportConfig,
841 transport_parameters::NatTraversalConfig as TPNatConfig,
842 transport_parameters::NatTraversalRole as TPRole,
843 };
844
845 #[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
846 use crate::crypto::rustls::QuicServerConfig;
847
848 let mut transport_config = TransportConfig::default();
850
851 let tp_role = match nat_role {
853 NatTraversalRole::Client => TPRole::Client,
854 NatTraversalRole::Server { can_relay } => TPRole::Server { can_relay },
855 NatTraversalRole::Bootstrap => TPRole::Bootstrap,
856 };
857
858 transport_config.nat_traversal_config = Some(TPNatConfig {
860 role: tp_role,
861 max_candidates: VarInt::from_u32(config.max_candidates as u32),
862 coordination_timeout: VarInt::from_u32(config.coordination_timeout.as_millis() as u32),
863 max_concurrent_attempts: VarInt::from_u32(config.max_concurrent_attempts as u32),
864 peer_id: None, });
866
867 let endpoint_config = Arc::new(EndpointConfig::default());
869
870 let server_config = match config.role {
872 EndpointRole::Bootstrap | EndpointRole::Server { .. } => {
873 #[cfg(feature = "production-ready")]
874 {
875 let cert_config = CertificateConfig {
877 common_name: format!("ant-quic-{}", config.role.name()),
878 subject_alt_names: vec![
879 "localhost".to_string(),
880 "ant-quic-node".to_string(),
881 ],
882 self_signed: true, ..CertificateConfig::default()
884 };
885
886 let cert_manager = CertificateManager::new(cert_config)
887 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate manager creation failed: {}", e)))?;
888
889 let _cert_bundle = cert_manager.generate_certificate()
890 .map_err(|e| NatTraversalError::ConfigError(format!("Certificate generation failed: {}", e)))?;
891
892 #[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
893 {
894 let rustls_config = cert_manager.create_server_config(&cert_bundle)
895 .map_err(|e| NatTraversalError::ConfigError(format!("Server config creation failed: {}", e)))?;
896
897 let server_config = QuicServerConfig::try_from(rustls_config.as_ref().clone())
898 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
899
900 Some(Arc::new(crate::ServerConfig::with_crypto(Arc::new(server_config))))
901 }
902 #[cfg(not(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring")))]
903 {
904 None
905 }
906 }
907 #[cfg(not(feature = "production-ready"))]
908 {
909 let cert = rustls::pki_types::CertificateDer::from(vec![0; 32]);
911 let key = rustls::pki_types::PrivateKeyDer::try_from(vec![0; 32]).ok();
912
913 if let Some(key) = key {
914 #[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))]
915 {
916 let server_config = QuicServerConfig::try_from(
917 rustls::ServerConfig::builder()
918 .with_no_client_auth()
919 .with_single_cert(vec![cert], key)
920 .map_err(|e| NatTraversalError::ConfigError(e.to_string()))?
921 ).map_err(|e| NatTraversalError::ConfigError(e.to_string()))?;
922
923 Some(Arc::new(crate::ServerConfig::with_crypto(Arc::new(server_config))))
924 }
925 #[cfg(not(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring")))]
926 {
927 None
928 }
929 } else {
930 None
931 }
932 }
933 }
934 _ => None,
935 };
936
937 let endpoint = Endpoint::new(
939 endpoint_config,
940 server_config,
941 true, None, );
944
945 Ok(endpoint)
949 }
950
951 #[cfg(feature = "production-ready")]
953 pub async fn start_listening(&self, bind_addr: SocketAddr) -> Result<(), NatTraversalError> {
954 let endpoint = self.quinn_endpoint.as_ref()
955 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
956
957 let _socket = UdpSocket::bind(bind_addr).await
959 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to bind to {}: {}", bind_addr, e)))?;
960
961 info!("Started listening on {}", bind_addr);
962
963 let endpoint_clone = endpoint.clone();
965 let shutdown_clone = self.shutdown.clone();
966 let event_tx = self.event_tx.as_ref().unwrap().clone();
967
968 tokio::spawn(async move {
969 Self::accept_connections(endpoint_clone, shutdown_clone, event_tx).await;
970 });
971
972 Ok(())
973 }
974
975 #[cfg(feature = "production-ready")]
977 async fn accept_connections(
978 endpoint: QuinnEndpoint,
979 shutdown: Arc<AtomicBool>,
980 event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
981 ) {
982 while !shutdown.load(Ordering::Relaxed) {
983 match endpoint.accept().await {
984 Some(connecting) => {
985 let event_tx = event_tx.clone();
986 tokio::spawn(async move {
987 match connecting.await {
988 Ok(connection) => {
989 info!("Accepted connection from {}", connection.remote_address());
990
991 let peer_id = Self::generate_peer_id_from_address(connection.remote_address());
993
994 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
995 peer_id,
996 remote_address: connection.remote_address(),
997 });
998
999 Self::handle_connection(connection, event_tx).await;
1001 }
1002 Err(e) => {
1003 debug!("Connection failed: {}", e);
1004 }
1005 }
1006 });
1007 }
1008 None => {
1009 break;
1011 }
1012 }
1013 }
1014 }
1015
1016 #[cfg(feature = "production-ready")]
1018 async fn handle_connection(
1019 _connection: QuinnConnection,
1020 _event_tx: mpsc::UnboundedSender<NatTraversalEvent>,
1021 ) {
1022 }
1071
1072 #[cfg(feature = "production-ready")]
1074 async fn handle_bi_stream(
1075 _send: crate::quinn_high_level::SendStream,
1076 _recv: crate::quinn_high_level::RecvStream,
1077 ) {
1078 }
1107
1108 #[cfg(feature = "production-ready")]
1110 async fn handle_uni_stream(mut recv: crate::quinn_high_level::RecvStream) {
1111 let mut buffer = vec![0u8; 1024];
1112
1113 loop {
1114 match recv.read(&mut buffer).await {
1115 Ok(Some(size)) => {
1116 debug!("Received {} bytes on unidirectional stream", size);
1117 }
1119 Ok(None) => {
1120 debug!("Unidirectional stream closed by peer");
1121 break;
1122 }
1123 Err(e) => {
1124 debug!("Error reading from unidirectional stream: {}", e);
1125 break;
1126 }
1127 }
1128 }
1129 }
1130
1131 #[cfg(feature = "production-ready")]
1133 pub async fn connect_to_peer(
1134 &self,
1135 peer_id: PeerId,
1136 server_name: &str,
1137 remote_addr: SocketAddr,
1138 ) -> Result<QuinnConnection, NatTraversalError> {
1139 let endpoint = self.quinn_endpoint.as_ref()
1140 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1141
1142 info!("Connecting to peer {:?} at {}", peer_id, remote_addr);
1143
1144 let connecting = endpoint.connect(remote_addr, server_name)
1146 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {}", e)))?;
1147
1148 let connection = timeout(Duration::from_secs(10), connecting)
1149 .await
1150 .map_err(|_| NatTraversalError::Timeout)?
1151 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {}", e)))?;
1152
1153 info!("Successfully connected to peer {:?} at {}", peer_id, remote_addr);
1154
1155 if let Some(ref event_tx) = self.event_tx {
1157 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1158 peer_id,
1159 remote_address: remote_addr,
1160 });
1161 }
1162
1163 Ok(connection)
1164 }
1165
1166 #[cfg(feature = "production-ready")]
1168 pub async fn accept_connection(&self) -> Result<(PeerId, QuinnConnection), NatTraversalError> {
1169 let endpoint = self.quinn_endpoint.as_ref()
1170 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1171
1172 let incoming = endpoint.accept().await
1174 .ok_or_else(|| NatTraversalError::NetworkError("Endpoint closed".to_string()))?;
1175
1176 let remote_addr = incoming.remote_address();
1177 info!("Accepting connection from {}", remote_addr);
1178
1179 let connection = incoming.await
1181 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Failed to accept connection: {}", e)))?;
1182
1183 let peer_id = self.extract_peer_id_from_connection(&connection).await
1185 .unwrap_or_else(|| Self::generate_peer_id_from_address(remote_addr));
1186
1187 {
1189 let mut connections = self.connections.write()
1190 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1191 connections.insert(peer_id, connection.clone());
1192 }
1193
1194 info!("Connection accepted from peer {:?} at {}", peer_id, remote_addr);
1195
1196 if let Some(ref event_tx) = self.event_tx {
1198 let _ = event_tx.send(NatTraversalEvent::ConnectionEstablished {
1199 peer_id,
1200 remote_address: remote_addr,
1201 });
1202 }
1203
1204 Ok((peer_id, connection))
1205 }
1206
1207 pub fn local_peer_id(&self) -> PeerId {
1209 self.local_peer_id
1210 }
1211
1212 #[cfg(feature = "production-ready")]
1214 pub fn get_connection(&self, peer_id: &PeerId) -> Result<Option<QuinnConnection>, NatTraversalError> {
1215 let connections = self.connections.read()
1216 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1217 Ok(connections.get(peer_id).cloned())
1218 }
1219
1220 #[cfg(not(feature = "production-ready"))]
1222 pub async fn accept_connection(&self) -> Result<(PeerId, crate::quinn_high_level::Connection), NatTraversalError> {
1223 Err(NatTraversalError::ConfigError("accept_connection requires production-ready feature".to_string()))
1224 }
1225
1226 #[cfg(not(feature = "production-ready"))]
1228 pub fn get_connection(&self, _peer_id: &PeerId) -> Result<Option<crate::quinn_high_level::Connection>, NatTraversalError> {
1229 Err(NatTraversalError::ConfigError("get_connection requires production-ready feature".to_string()))
1230 }
1231
1232 #[cfg(feature = "production-ready")]
1234 pub fn remove_connection(&self, peer_id: &PeerId) -> Result<Option<QuinnConnection>, NatTraversalError> {
1235 let mut connections = self.connections.write()
1236 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1237 Ok(connections.remove(peer_id))
1238 }
1239
1240 #[cfg(feature = "production-ready")]
1242 pub fn list_connections(&self) -> Result<Vec<(PeerId, SocketAddr)>, NatTraversalError> {
1243 let connections = self.connections.read()
1244 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1245 let mut result = Vec::new();
1246 for (peer_id, connection) in connections.iter() {
1247 result.push((*peer_id, connection.remote_address()));
1248 }
1249 Ok(result)
1250 }
1251
1252 #[cfg(feature = "production-ready")]
1254 pub async fn handle_connection_data(
1255 &self,
1256 peer_id: PeerId,
1257 connection: &QuinnConnection,
1258 ) -> Result<(), NatTraversalError> {
1259 info!("Handling connection data from peer {:?}", peer_id);
1260
1261 let connection_clone = connection.clone();
1263 let peer_id_clone = peer_id;
1264 tokio::spawn(async move {
1265 loop {
1266 match connection_clone.accept_bi().await {
1267 Ok((send, recv)) => {
1268 debug!("Accepted bidirectional stream from peer {:?}", peer_id_clone);
1269 tokio::spawn(Self::handle_bi_stream(send, recv));
1270 }
1271 Err(ConnectionError::ApplicationClosed(_)) => {
1272 debug!("Connection closed by peer {:?}", peer_id_clone);
1273 break;
1274 }
1275 Err(e) => {
1276 debug!("Error accepting bidirectional stream from peer {:?}: {}", peer_id_clone, e);
1277 break;
1278 }
1279 }
1280 }
1281 });
1282
1283 let connection_clone = connection.clone();
1285 let peer_id_clone = peer_id;
1286 tokio::spawn(async move {
1287 loop {
1288 match connection_clone.accept_uni().await {
1289 Ok(recv) => {
1290 debug!("Accepted unidirectional stream from peer {:?}", peer_id_clone);
1291 tokio::spawn(Self::handle_uni_stream(recv));
1292 }
1293 Err(ConnectionError::ApplicationClosed(_)) => {
1294 debug!("Connection closed by peer {:?}", peer_id_clone);
1295 break;
1296 }
1297 Err(e) => {
1298 debug!("Error accepting unidirectional stream from peer {:?}: {}", peer_id_clone, e);
1299 break;
1300 }
1301 }
1302 }
1303 });
1304
1305 Ok(())
1306 }
1307
1308 fn generate_local_peer_id() -> PeerId {
1310 use std::time::SystemTime;
1311 use std::collections::hash_map::DefaultHasher;
1312 use std::hash::{Hash, Hasher};
1313
1314 let mut hasher = DefaultHasher::new();
1315 SystemTime::now().hash(&mut hasher);
1316 std::process::id().hash(&mut hasher);
1317
1318 let hash = hasher.finish();
1319 let mut peer_id = [0u8; 32];
1320 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
1321
1322 for i in 8..32 {
1324 peer_id[i] = rand::random();
1325 }
1326
1327 PeerId(peer_id)
1328 }
1329
1330 fn generate_peer_id_from_address(addr: SocketAddr) -> PeerId {
1336 use std::collections::hash_map::DefaultHasher;
1337 use std::hash::{Hash, Hasher};
1338
1339 let mut hasher = DefaultHasher::new();
1340 addr.hash(&mut hasher);
1341
1342 let hash = hasher.finish();
1343 let mut peer_id = [0u8; 32];
1344 peer_id[0..8].copy_from_slice(&hash.to_be_bytes());
1345
1346 for i in 8..32 {
1349 peer_id[i] = rand::random();
1350 }
1351
1352 warn!("Generated temporary peer ID from address {}. This ID is not persistent!", addr);
1353 PeerId(peer_id)
1354 }
1355
1356 #[cfg(feature = "production-ready")]
1358 async fn extract_peer_id_from_connection(&self, connection: &QuinnConnection) -> Option<PeerId> {
1359 if let Some(identity) = connection.peer_identity() {
1361 if let Some(public_key_bytes) = identity.downcast_ref::<[u8; 32]>() {
1363 match crate::derive_peer_id_from_key_bytes(public_key_bytes) {
1365 Ok(peer_id) => {
1366 debug!("Derived peer ID from Ed25519 public key");
1367 return Some(peer_id);
1368 }
1369 Err(e) => {
1370 warn!("Failed to derive peer ID from public key: {}", e);
1371 }
1372 }
1373 }
1374 }
1376
1377 None
1378 }
1379
1380 #[cfg(feature = "production-ready")]
1382 pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
1383 self.shutdown.store(true, Ordering::Relaxed);
1385
1386 {
1388 let mut connections = self.connections.write()
1389 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
1390 for (peer_id, connection) in connections.drain() {
1391 info!("Closing connection to peer {:?}", peer_id);
1392 connection.close(crate::VarInt::from_u32(0), b"Shutdown");
1393 }
1394 }
1395
1396 if let Some(ref endpoint) = self.quinn_endpoint {
1398 endpoint.wait_idle().await;
1399 }
1400
1401 info!("NAT traversal endpoint shutdown completed");
1402 Ok(())
1403 }
1404
1405 #[cfg(feature = "production-ready")]
1407 pub async fn discover_candidates(&self, peer_id: PeerId) -> Result<Vec<CandidateAddress>, NatTraversalError> {
1408 debug!("Discovering address candidates for peer {:?}", peer_id);
1409
1410 let mut candidates = Vec::new();
1411
1412 let bootstrap_nodes = {
1414 let nodes = self.bootstrap_nodes.read()
1415 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1416 nodes.clone()
1417 };
1418
1419 {
1421 let mut discovery = self.discovery_manager.lock()
1422 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1423
1424 discovery.start_discovery(peer_id, bootstrap_nodes)
1425 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
1426 }
1427
1428 let timeout_duration = self.config.coordination_timeout;
1430 let start_time = std::time::Instant::now();
1431
1432 while start_time.elapsed() < timeout_duration {
1433 let discovery_events = {
1434 let mut discovery = self.discovery_manager.lock()
1435 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1436 discovery.poll(std::time::Instant::now())
1437 };
1438
1439 for event in discovery_events {
1440 match event {
1441 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1442 candidates.push(candidate.clone());
1443
1444 self.send_candidate_advertisement(peer_id, &candidate).await
1446 .unwrap_or_else(|e| debug!("Failed to send candidate advertisement: {}", e));
1447 }
1448 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
1449 candidates.push(candidate.clone());
1450
1451 self.send_candidate_advertisement(peer_id, &candidate).await
1453 .unwrap_or_else(|e| debug!("Failed to send candidate advertisement: {}", e));
1454 }
1455 DiscoveryEvent::PredictedCandidateGenerated { candidate, .. } => {
1456 candidates.push(candidate.clone());
1457
1458 self.send_candidate_advertisement(peer_id, &candidate).await
1460 .unwrap_or_else(|e| debug!("Failed to send candidate advertisement: {}", e));
1461 }
1462 DiscoveryEvent::DiscoveryCompleted { .. } => {
1463 return Ok(candidates);
1465 }
1466 DiscoveryEvent::DiscoveryFailed { error, partial_results } => {
1467 candidates.extend(partial_results);
1469 if candidates.is_empty() {
1470 return Err(NatTraversalError::CandidateDiscoveryFailed(error.to_string()));
1471 }
1472 return Ok(candidates);
1473 }
1474 _ => {}
1475 }
1476 }
1477
1478 sleep(Duration::from_millis(10)).await;
1480 }
1481
1482 if candidates.is_empty() {
1483 Err(NatTraversalError::NoCandidatesFound)
1484 } else {
1485 Ok(candidates)
1486 }
1487 }
1488
1489 #[cfg(not(feature = "production-ready"))]
1491 fn discover_candidates(&self, peer_id: PeerId) -> Result<Vec<CandidateAddress>, NatTraversalError> {
1492 debug!("Discovering address candidates for peer {:?}", peer_id);
1493
1494 let mut candidates = Vec::new();
1495
1496 let bootstrap_nodes = {
1498 let nodes = self.bootstrap_nodes.read()
1499 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
1500 nodes.clone()
1501 };
1502
1503 {
1505 let mut discovery = self.discovery_manager.lock()
1506 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1507
1508 discovery.start_discovery(peer_id, bootstrap_nodes)
1509 .map_err(|e| NatTraversalError::CandidateDiscoveryFailed(e.to_string()))?;
1510 }
1511
1512 let timeout_duration = self.config.coordination_timeout;
1514 let start_time = std::time::Instant::now();
1515
1516 while start_time.elapsed() < timeout_duration {
1517 let discovery_events = {
1518 let mut discovery = self.discovery_manager.lock()
1519 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1520 discovery.poll(std::time::Instant::now())
1521 };
1522
1523 for event in discovery_events {
1524 match event {
1525 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
1526 candidates.push(candidate.clone());
1527 debug!("Discovered local candidate: {}", candidate.address);
1528 }
1529 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. } => {
1530 candidates.push(candidate.clone());
1531 debug!("Discovered server-reflexive candidate: {}", candidate.address);
1532 }
1533 DiscoveryEvent::PredictedCandidateGenerated { candidate, .. } => {
1534 candidates.push(candidate.clone());
1535 debug!("Generated predicted candidate: {}", candidate.address);
1536 }
1537 DiscoveryEvent::DiscoveryCompleted { .. } => {
1538 info!("Candidate discovery completed for peer {:?}, found {} candidates",
1539 peer_id, candidates.len());
1540 return Ok(candidates);
1541 }
1542 DiscoveryEvent::DiscoveryFailed { error, partial_results } => {
1543 warn!("Candidate discovery failed for peer {:?}: {}", peer_id, error);
1544 candidates.extend(partial_results);
1545 if candidates.is_empty() {
1546 return Err(NatTraversalError::CandidateDiscoveryFailed(error.to_string()));
1547 }
1548 return Ok(candidates);
1549 }
1550 _ => {}
1551 }
1552 }
1553
1554 std::thread::sleep(Duration::from_millis(10));
1556 }
1557
1558 if candidates.is_empty() {
1559 Err(NatTraversalError::NoCandidatesFound)
1560 } else {
1561 info!("Candidate discovery timed out for peer {:?}, returning {} partial candidates",
1562 peer_id, candidates.len());
1563 Ok(candidates)
1564 }
1565 }
1566
1567 #[cfg(feature = "production-ready")]
1568 async fn coordinate_with_bootstrap_async(
1569 &self,
1570 peer_id: PeerId,
1571 coordinator: SocketAddr,
1572 ) -> Result<(), NatTraversalError> {
1573 debug!("Coordinating with bootstrap {} for peer {:?}", coordinator, peer_id);
1574
1575 let endpoint = self.quinn_endpoint.as_ref()
1576 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1577
1578 let server_name = "bootstrap-coordinator";
1580 let connecting = endpoint.connect(coordinator, server_name)
1581 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Failed to connect to coordinator: {}", e)))?;
1582
1583 let connection = tokio::time::timeout(Duration::from_secs(5), connecting)
1584 .await
1585 .map_err(|_| NatTraversalError::Timeout)?
1586 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Coordinator connection failed: {}", e)))?;
1587
1588 let mut send_stream = connection.open_uni()
1590 .await
1591 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to open stream: {}", e)))?;
1592
1593 let coordination_msg = self.create_punch_me_now_frame(peer_id)?;
1595
1596 send_stream.write_all(&coordination_msg)
1597 .await
1598 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to send coordination: {}", e)))?;
1599
1600 send_stream.finish();
1601 info!("Coordination request sent to bootstrap {} for peer {:?}", coordinator, peer_id);
1604
1605 Ok(())
1609 }
1610
1611 #[cfg(not(feature = "production-ready"))]
1613 fn coordinate_with_bootstrap(
1614 &self,
1615 peer_id: PeerId,
1616 coordinator: SocketAddr,
1617 ) -> Result<(), NatTraversalError> {
1618 debug!("Coordinating with bootstrap {} for peer {:?}", coordinator, peer_id);
1619
1620 #[cfg(feature = "production-ready")]
1621 {
1622 let endpoint = self.quinn_endpoint.as_ref()
1623 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1624
1625 let server_name = format!("bootstrap-{}", coordinator.ip());
1627 let connection = endpoint.connect(coordinator, &server_name)
1628 .map_err(|e| NatTraversalError::CoordinationFailed(format!("Failed to connect to bootstrap: {}", e)))?;
1629
1630 let connection = match timeout(Duration::from_secs(5), connection).await {
1632 Ok(Ok(conn)) => conn,
1633 Ok(Err(e)) => return Err(NatTraversalError::CoordinationFailed(format!("Connection failed: {}", e))),
1634 Err(_) => return Err(NatTraversalError::CoordinationFailed("Connection timeout".to_string())),
1635 };
1636
1637 let (mut send_stream, _recv_stream) = connection.open_bi().await
1639 .map_err(|e| NatTraversalError::CoordinationFailed(format!("Failed to open stream: {}", e)))?;
1640
1641 let coordination_request = format!("COORDINATE_NAT_TRAVERSAL peer_id={:?}", peer_id);
1643 send_stream.write_all(coordination_request.as_bytes()).await
1644 .map_err(|e| NatTraversalError::CoordinationFailed(format!("Failed to send request: {}", e)))?;
1645
1646 send_stream.finish();
1647 info!("Coordination request sent to bootstrap {} for peer {:?}", coordinator, peer_id);
1650 }
1651
1652 #[cfg(not(feature = "production-ready"))]
1653 {
1654 info!("Coordinating with bootstrap {} for peer {:?} (fallback mode)", coordinator, peer_id);
1656
1657 debug!("Validating bootstrap connectivity to {}", coordinator);
1659
1660 if coordinator.port() == 0 {
1662 return Err(NatTraversalError::CoordinationFailed("Invalid bootstrap address".to_string()));
1663 }
1664
1665 info!("Bootstrap coordination completed for peer {:?}", peer_id);
1666 }
1667
1668 Ok(())
1669 }
1670
1671 fn create_punch_me_now_frame(&self, peer_id: PeerId) -> Result<Vec<u8>, NatTraversalError> {
1673 let mut frame = Vec::new();
1681
1682 frame.push(0x41);
1684
1685 frame.extend_from_slice(&peer_id.0);
1687
1688 let timestamp = std::time::SystemTime::now()
1690 .duration_since(std::time::UNIX_EPOCH)
1691 .unwrap_or_default()
1692 .as_millis() as u64;
1693 frame.extend_from_slice(×tamp.to_be_bytes());
1694
1695 let mut token = [0u8; 16];
1697 for byte in &mut token {
1698 *byte = rand::random();
1699 }
1700 frame.extend_from_slice(&token);
1701
1702 Ok(frame)
1703 }
1704
1705 fn attempt_hole_punching(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
1706 debug!("Attempting hole punching for peer {:?}", peer_id);
1707
1708 let candidate_pairs = self.get_candidate_pairs_for_peer(peer_id)?;
1710
1711 if candidate_pairs.is_empty() {
1712 return Err(NatTraversalError::NoCandidatesFound);
1713 }
1714
1715 info!("Generated {} candidate pairs for hole punching with peer {:?}",
1716 candidate_pairs.len(), peer_id);
1717
1718 #[cfg(feature = "production-ready")]
1720 {
1721 self.attempt_quinn_hole_punching(peer_id, candidate_pairs)
1722 }
1723
1724 #[cfg(not(feature = "production-ready"))]
1725 {
1726 self.simulate_hole_punching(peer_id, candidate_pairs)
1727 }
1728 }
1729
1730 fn get_candidate_pairs_for_peer(&self, peer_id: PeerId) -> Result<Vec<CandidatePair>, NatTraversalError> {
1732 let discovery_candidates = {
1734 let discovery = self.discovery_manager.lock()
1735 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1736
1737 discovery.get_candidates_for_peer(peer_id)
1738 };
1739
1740 if discovery_candidates.is_empty() {
1741 return Err(NatTraversalError::NoCandidatesFound);
1742 }
1743
1744 let mut candidate_pairs = Vec::new();
1746 let local_candidates = discovery_candidates.iter()
1747 .filter(|c| matches!(c.source, CandidateSource::Local))
1748 .collect::<Vec<_>>();
1749 let remote_candidates = discovery_candidates.iter()
1750 .filter(|c| !matches!(c.source, CandidateSource::Local))
1751 .collect::<Vec<_>>();
1752
1753 for local in &local_candidates {
1755 for remote in &remote_candidates {
1756 let pair_priority = self.calculate_candidate_pair_priority(local, remote);
1757 candidate_pairs.push(CandidatePair {
1758 local_candidate: (*local).clone(),
1759 remote_candidate: (*remote).clone(),
1760 priority: pair_priority,
1761 state: CandidatePairState::Waiting,
1762 });
1763 }
1764 }
1765
1766 candidate_pairs.sort_by(|a, b| b.priority.cmp(&a.priority));
1768
1769 candidate_pairs.truncate(8);
1771
1772 Ok(candidate_pairs)
1773 }
1774
1775 fn calculate_candidate_pair_priority(&self, local: &CandidateAddress, remote: &CandidateAddress) -> u64 {
1777 let local_type_preference = match local.source {
1781 CandidateSource::Local => 126,
1782 CandidateSource::Observed { .. } => 100,
1783 CandidateSource::Predicted => 75,
1784 CandidateSource::Peer => 50,
1785 };
1786
1787 let remote_type_preference = match remote.source {
1788 CandidateSource::Local => 126,
1789 CandidateSource::Observed { .. } => 100,
1790 CandidateSource::Predicted => 75,
1791 CandidateSource::Peer => 50,
1792 };
1793
1794 let local_priority = (local_type_preference as u64) << 8 | local.priority as u64;
1796 let remote_priority = (remote_type_preference as u64) << 8 | remote.priority as u64;
1797
1798 let min_priority = local_priority.min(remote_priority);
1799 let max_priority = local_priority.max(remote_priority);
1800
1801 (min_priority << 32) | (max_priority << 1) | if local_priority > remote_priority { 1 } else { 0 }
1802 }
1803
1804 #[cfg(feature = "production-ready")]
1806 fn attempt_quinn_hole_punching(&self, peer_id: PeerId, candidate_pairs: Vec<CandidatePair>) -> Result<(), NatTraversalError> {
1807
1808 let _endpoint = self.quinn_endpoint.as_ref()
1809 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1810
1811 for pair in candidate_pairs {
1812 debug!("Attempting hole punch with candidate pair: {} -> {}",
1813 pair.local_candidate.address, pair.remote_candidate.address);
1814
1815 let mut challenge_data = [0u8; 8];
1817 for byte in &mut challenge_data {
1818 *byte = rand::random();
1819 }
1820
1821 let local_socket = std::net::UdpSocket::bind(pair.local_candidate.address)
1823 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to bind to local candidate: {}", e)))?;
1824
1825 let path_challenge_packet = self.create_path_challenge_packet(challenge_data)?;
1827
1828 match local_socket.send_to(&path_challenge_packet, pair.remote_candidate.address) {
1830 Ok(bytes_sent) => {
1831 debug!("Sent {} bytes for hole punch from {} to {}",
1832 bytes_sent, pair.local_candidate.address, pair.remote_candidate.address);
1833
1834 local_socket.set_read_timeout(Some(Duration::from_millis(100)))
1836 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to set timeout: {}", e)))?;
1837
1838 let mut response_buffer = [0u8; 1024];
1840 match local_socket.recv_from(&mut response_buffer) {
1841 Ok((_bytes_received, response_addr)) => {
1842 if response_addr == pair.remote_candidate.address {
1843 info!("Hole punch succeeded for peer {:?}: {} <-> {}",
1844 peer_id, pair.local_candidate.address, pair.remote_candidate.address);
1845
1846 self.store_successful_candidate_pair(peer_id, pair)?;
1848 return Ok(());
1849 } else {
1850 debug!("Received response from unexpected address: {}", response_addr);
1851 }
1852 }
1853 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut => {
1854 debug!("No response received for hole punch attempt");
1855 }
1856 Err(e) => {
1857 debug!("Error receiving hole punch response: {}", e);
1858 }
1859 }
1860 }
1861 Err(e) => {
1862 debug!("Failed to send hole punch packet: {}", e);
1863 }
1864 }
1865 }
1866
1867 Err(NatTraversalError::HolePunchingFailed)
1869 }
1870
1871 fn create_path_challenge_packet(&self, challenge_data: [u8; 8]) -> Result<Vec<u8>, NatTraversalError> {
1873 let mut packet = Vec::new();
1876
1877 packet.push(0x40); packet.extend_from_slice(&[0, 0, 0, 1]); packet.push(0x1a); packet.extend_from_slice(&challenge_data); Ok(packet)
1886 }
1887
1888 fn store_successful_candidate_pair(&self, peer_id: PeerId, pair: CandidatePair) -> Result<(), NatTraversalError> {
1890 debug!("Storing successful candidate pair for peer {:?}: {} <-> {}",
1891 peer_id, pair.local_candidate.address, pair.remote_candidate.address);
1892
1893 if let Some(ref callback) = self.event_callback {
1898 callback(NatTraversalEvent::PathValidated {
1899 peer_id,
1900 address: pair.remote_candidate.address,
1901 rtt: Duration::from_millis(50), });
1903
1904 callback(NatTraversalEvent::TraversalSucceeded {
1905 peer_id,
1906 final_address: pair.remote_candidate.address,
1907 total_time: Duration::from_secs(1), });
1909 }
1910
1911 Ok(())
1912 }
1913
1914
1915
1916 #[cfg(not(feature = "production-ready"))]
1918 fn simulate_hole_punching(&self, peer_id: PeerId, candidate_pairs: Vec<CandidatePair>) -> Result<(), NatTraversalError> {
1919 debug!("Attempting hole punching for peer {:?} with {} candidate pairs",
1920 peer_id, candidate_pairs.len());
1921
1922 if let Some(best_pair) = candidate_pairs.first() {
1924 info!("Attempting hole punch for peer {:?} using {}",
1925 peer_id, best_pair.remote_candidate.address);
1926
1927 match self.attempt_connection_to_candidate(peer_id, &best_pair.remote_candidate) {
1930 Ok(_) => {
1931 info!("Hole punch succeeded for peer {:?} using {}",
1932 peer_id, best_pair.remote_candidate.address);
1933 return Ok(());
1934 }
1935 Err(e) => {
1936 debug!("Hole punch failed for peer {:?}: {}", peer_id, e);
1937 }
1938 }
1939 }
1940
1941 Err(NatTraversalError::HolePunchingFailed)
1942 }
1943
1944 fn attempt_connection_to_candidate(
1946 &self,
1947 peer_id: PeerId,
1948 candidate: &CandidateAddress,
1949 ) -> Result<(), NatTraversalError> {
1950 #[cfg(feature = "production-ready")]
1951 {
1952 let _endpoint = self.quinn_endpoint.as_ref()
1953 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
1954
1955 let _server_name = format!("peer-{:x}", peer_id.0[0] as u32);
1957
1958 debug!("Attempting Quinn connection to candidate {} for peer {:?}",
1960 candidate.address, peer_id);
1961
1962 info!("Connection attempt initiated to {} for peer {:?}",
1965 candidate.address, peer_id);
1966
1967 Ok(())
1968 }
1969 #[cfg(not(feature = "production-ready"))]
1970 {
1971 debug!("Checking connectivity to candidate {} for peer {:?}",
1973 candidate.address, peer_id);
1974
1975 if candidate.address.port() > 0 {
1978 info!("Connectivity check passed for candidate {} (peer {:?})",
1979 candidate.address, peer_id);
1980 Ok(())
1981 } else {
1982 Err(NatTraversalError::NetworkError("Invalid candidate address".to_string()))
1983 }
1984 }
1985 }
1986
1987 pub fn poll(&self, now: std::time::Instant) -> Result<Vec<NatTraversalEvent>, NatTraversalError> {
1989 let mut events = Vec::new();
1990
1991 {
1993 let mut discovery = self.discovery_manager.lock()
1994 .map_err(|_| NatTraversalError::ProtocolError("Discovery manager lock poisoned".to_string()))?;
1995
1996 let discovery_events = discovery.poll(now);
1997
1998 for discovery_event in discovery_events {
2000 if let Some(nat_event) = self.convert_discovery_event(discovery_event) {
2001 events.push(nat_event.clone());
2002
2003 if let Some(ref callback) = self.event_callback {
2005 callback(nat_event);
2006 }
2007 }
2008 }
2009 }
2010
2011 let mut sessions = self.active_sessions.write()
2013 .map_err(|_| NatTraversalError::ProtocolError("Lock poisoned".to_string()))?;
2014
2015 for (_peer_id, session) in sessions.iter_mut() {
2016 let _elapsed = now.duration_since(session.started_at);
2021 }
2022
2023 Ok(events)
2024 }
2025
2026 fn convert_discovery_event(&self, discovery_event: DiscoveryEvent) -> Option<NatTraversalEvent> {
2028 let current_peer_id = self.get_current_discovery_peer_id();
2030
2031 match discovery_event {
2032 DiscoveryEvent::LocalCandidateDiscovered { candidate } => {
2033 Some(NatTraversalEvent::CandidateDiscovered {
2034 peer_id: current_peer_id,
2035 candidate,
2036 })
2037 },
2038 DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, bootstrap_node: _ } => {
2039 Some(NatTraversalEvent::CandidateDiscovered {
2040 peer_id: current_peer_id,
2041 candidate,
2042 })
2043 },
2044 DiscoveryEvent::PredictedCandidateGenerated { candidate, confidence: _ } => {
2045 Some(NatTraversalEvent::CandidateDiscovered {
2046 peer_id: current_peer_id,
2047 candidate,
2048 })
2049 },
2050 DiscoveryEvent::DiscoveryCompleted { candidate_count: _, total_duration: _, success_rate: _ } => {
2051 None },
2054 DiscoveryEvent::DiscoveryFailed { error, partial_results } => {
2055 Some(NatTraversalEvent::TraversalFailed {
2056 peer_id: current_peer_id,
2057 error: NatTraversalError::CandidateDiscoveryFailed(error.to_string()),
2058 fallback_available: !partial_results.is_empty(),
2059 })
2060 },
2061 _ => None, }
2063 }
2064
2065 fn get_current_discovery_peer_id(&self) -> PeerId {
2067 if let Ok(sessions) = self.active_sessions.read() {
2069 if let Some((peer_id, _session)) = sessions.iter()
2070 .filter(|(_, s)| matches!(s.phase, TraversalPhase::Discovery))
2071 .next() {
2072 return *peer_id;
2073 }
2074
2075 if let Some((peer_id, _)) = sessions.iter().next() {
2077 return *peer_id;
2078 }
2079 }
2080
2081 self.local_peer_id
2083 }
2084
2085 #[cfg(feature = "production-ready")]
2087 pub(crate) async fn handle_endpoint_event(&self, event: crate::shared::EndpointEventInner) -> Result<(), NatTraversalError> {
2088 match event {
2089 crate::shared::EndpointEventInner::NatCandidateValidated { address, challenge } => {
2090 info!("NAT candidate validation succeeded for {} with challenge {:016x}", address, challenge);
2091
2092 let mut sessions = self.active_sessions.write()
2094 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
2095
2096 for (peer_id, session) in sessions.iter_mut() {
2098 if session.candidates.iter().any(|c| c.address == address) {
2099 session.phase = TraversalPhase::Connected;
2101
2102 if let Some(ref callback) = self.event_callback {
2104 callback(NatTraversalEvent::CandidateValidated {
2105 peer_id: *peer_id,
2106 candidate_address: address,
2107 });
2108 }
2109
2110 return self.establish_connection_to_validated_candidate(*peer_id, address).await;
2112 }
2113 }
2114
2115 debug!("Validated candidate {} not found in active sessions", address);
2116 Ok(())
2117 }
2118
2119 crate::shared::EndpointEventInner::RelayPunchMeNow(target_peer_id, punch_frame) => {
2120 info!("Relaying PUNCH_ME_NOW to peer {:?}", target_peer_id);
2121
2122 let target_peer = PeerId(target_peer_id);
2124
2125 let connections = self.connections.read()
2127 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2128
2129 if let Some(connection) = connections.get(&target_peer) {
2130 let mut send_stream = connection.open_uni().await
2132 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to open stream: {}", e)))?;
2133
2134 let mut frame_data = Vec::new();
2136 punch_frame.encode(&mut frame_data);
2137
2138 send_stream.write_all(&frame_data).await
2139 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to send frame: {}", e)))?;
2140
2141 send_stream.finish();
2142
2143 debug!("Successfully relayed PUNCH_ME_NOW frame to peer {:?}", target_peer);
2144 Ok(())
2145 } else {
2146 warn!("No connection found for target peer {:?}", target_peer);
2147 Err(NatTraversalError::PeerNotConnected)
2148 }
2149 }
2150
2151 crate::shared::EndpointEventInner::SendAddressFrame(add_address_frame) => {
2152 info!("Sending AddAddress frame for address {}", add_address_frame.address);
2153
2154 let connections = self.connections.read()
2156 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2157
2158 for (peer_id, connection) in connections.iter() {
2159 let mut send_stream = connection.open_uni().await
2161 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to open stream: {}", e)))?;
2162
2163 let mut frame_data = Vec::new();
2165 add_address_frame.encode(&mut frame_data);
2166
2167 send_stream.write_all(&frame_data).await
2168 .map_err(|e| NatTraversalError::NetworkError(format!("Failed to send frame: {}", e)))?;
2169
2170 send_stream.finish();
2171
2172 debug!("Sent AddAddress frame to peer {:?}", peer_id);
2173 }
2174
2175 Ok(())
2176 }
2177
2178 _ => {
2179 debug!("Ignoring non-NAT traversal endpoint event: {:?}", event);
2181 Ok(())
2182 }
2183 }
2184 }
2185
2186 #[cfg(feature = "production-ready")]
2188 async fn establish_connection_to_validated_candidate(
2189 &self,
2190 peer_id: PeerId,
2191 candidate_address: SocketAddr,
2192 ) -> Result<(), NatTraversalError> {
2193 info!("Establishing connection to validated candidate {} for peer {:?}", candidate_address, peer_id);
2194
2195 let endpoint = self.quinn_endpoint.as_ref()
2196 .ok_or_else(|| NatTraversalError::ConfigError("Quinn endpoint not initialized".to_string()))?;
2197
2198 let connecting = endpoint.connect(candidate_address, "nat-traversal-peer")
2200 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Failed to initiate connection: {}", e)))?;
2201
2202 let connection = timeout(Duration::from_secs(10), connecting)
2203 .await
2204 .map_err(|_| NatTraversalError::Timeout)?
2205 .map_err(|e| NatTraversalError::ConnectionFailed(format!("Connection failed: {}", e)))?;
2206
2207 {
2209 let mut connections = self.connections.write()
2210 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2211 connections.insert(peer_id, connection.clone());
2212 }
2213
2214 {
2216 let mut sessions = self.active_sessions.write()
2217 .map_err(|_| NatTraversalError::ProtocolError("Sessions lock poisoned".to_string()))?;
2218 if let Some(session) = sessions.get_mut(&peer_id) {
2219 session.phase = TraversalPhase::Connected;
2220 }
2221 }
2222
2223 if let Some(ref callback) = self.event_callback {
2225 callback(NatTraversalEvent::ConnectionEstablished {
2226 peer_id,
2227 remote_address: candidate_address,
2228 });
2229 }
2230
2231 info!("Successfully established connection to peer {:?} at {}", peer_id, candidate_address);
2232 Ok(())
2233 }
2234
2235 #[cfg(feature = "production-ready")]
2241 async fn send_candidate_advertisement(
2242 &self,
2243 peer_id: PeerId,
2244 candidate: &CandidateAddress,
2245 ) -> Result<(), NatTraversalError> {
2246 debug!("Sending candidate advertisement to peer {:?}: {}", peer_id, candidate.address);
2247
2248 let connections = self.connections.read()
2250 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2251
2252 if let Some(_connection) = connections.get(&peer_id) {
2253 debug!("Found connection to peer {:?}, sending ADD_ADDRESS frame", peer_id);
2255
2256 drop(connections); let connections = self.connections.write()
2262 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2263
2264 if let Some(connection) = connections.get(&peer_id) {
2265 let mut frame_data = Vec::new();
2268 frame_data.push(0x40); let sequence = candidate.priority as u64; frame_data.extend_from_slice(&sequence.to_be_bytes());
2273
2274 match candidate.address {
2276 SocketAddr::V4(addr) => {
2277 frame_data.push(4); frame_data.extend_from_slice(&addr.ip().octets());
2279 frame_data.extend_from_slice(&addr.port().to_be_bytes());
2280 }
2281 SocketAddr::V6(addr) => {
2282 frame_data.push(6); frame_data.extend_from_slice(&addr.ip().octets());
2284 frame_data.extend_from_slice(&addr.port().to_be_bytes());
2285 }
2286 }
2287
2288 frame_data.extend_from_slice(&candidate.priority.to_be_bytes());
2290
2291 match connection.send_datagram(frame_data.into()) {
2293 Ok(()) => {
2294 info!("Sent ADD_ADDRESS frame to peer {:?}: addr={}, priority={}",
2295 peer_id, candidate.address, candidate.priority);
2296 Ok(())
2297 }
2298 Err(e) => {
2299 warn!("Failed to send ADD_ADDRESS frame to peer {:?}: {}", peer_id, e);
2300 Err(NatTraversalError::ProtocolError(format!("Failed to send ADD_ADDRESS frame: {}", e)))
2301 }
2302 }
2303 } else {
2304 debug!("Connection to peer {:?} disappeared during frame sending", peer_id);
2306 Ok(())
2307 }
2308 } else {
2309 debug!("No connection found for peer {:?} - candidate will be sent when connection is established", peer_id);
2311 Ok(())
2312 }
2313 }
2314
2315 #[cfg(feature = "production-ready")]
2320 async fn send_punch_coordination(
2321 &self,
2322 peer_id: PeerId,
2323 target_sequence: u64,
2324 local_address: SocketAddr,
2325 round: u32,
2326 ) -> Result<(), NatTraversalError> {
2327 debug!("Sending punch coordination to peer {:?}: seq={}, addr={}, round={}",
2328 peer_id, target_sequence, local_address, round);
2329
2330 let connections = self.connections.read()
2331 .map_err(|_| NatTraversalError::ProtocolError("Connections lock poisoned".to_string()))?;
2332
2333 if let Some(connection) = connections.get(&peer_id) {
2334 let mut frame_data = Vec::new();
2337 frame_data.push(0x41); frame_data.extend_from_slice(&round.to_be_bytes());
2341
2342 frame_data.extend_from_slice(&target_sequence.to_be_bytes());
2344
2345 match local_address {
2347 SocketAddr::V4(addr) => {
2348 frame_data.push(4); frame_data.extend_from_slice(&addr.ip().octets());
2350 frame_data.extend_from_slice(&addr.port().to_be_bytes());
2351 }
2352 SocketAddr::V6(addr) => {
2353 frame_data.push(6); frame_data.extend_from_slice(&addr.ip().octets());
2355 frame_data.extend_from_slice(&addr.port().to_be_bytes());
2356 }
2357 }
2358
2359 match connection.send_datagram(frame_data.into()) {
2361 Ok(()) => {
2362 info!("Sent PUNCH_ME_NOW frame to peer {:?}: target_seq={}, local_addr={}, round={}",
2363 peer_id, target_sequence, local_address, round);
2364 Ok(())
2365 }
2366 Err(e) => {
2367 warn!("Failed to send PUNCH_ME_NOW frame to peer {:?}: {}", peer_id, e);
2368 Err(NatTraversalError::ProtocolError(format!("Failed to send PUNCH_ME_NOW frame: {}", e)))
2369 }
2370 }
2371 } else {
2372 Err(NatTraversalError::PeerNotConnected)
2373 }
2374 }
2375}
2376
2377impl fmt::Debug for NatTraversalEndpoint {
2378 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2379 f.debug_struct("NatTraversalEndpoint")
2380 .field("config", &self.config)
2381 .field("bootstrap_nodes", &"<RwLock>")
2382 .field("active_sessions", &"<RwLock>")
2383 .field("event_callback", &self.event_callback.is_some())
2384 .finish()
2385 }
2386}
2387
2388#[derive(Debug, Clone)]
2390pub struct NatTraversalStatistics {
2391 pub active_sessions: usize,
2393 pub total_bootstrap_nodes: usize,
2395 pub successful_coordinations: u32,
2397 pub average_coordination_time: Duration,
2399}
2400
2401impl fmt::Display for NatTraversalError {
2402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2403 match self {
2404 Self::NoBootstrapNodes => write!(f, "no bootstrap nodes available"),
2405 Self::NoCandidatesFound => write!(f, "no address candidates found"),
2406 Self::CandidateDiscoveryFailed(msg) => write!(f, "candidate discovery failed: {}", msg),
2407 Self::CoordinationFailed(msg) => write!(f, "coordination failed: {}", msg),
2408 Self::HolePunchingFailed => write!(f, "hole punching failed"),
2409 Self::ValidationTimeout => write!(f, "validation timeout"),
2410 Self::NetworkError(msg) => write!(f, "network error: {}", msg),
2411 Self::ConfigError(msg) => write!(f, "configuration error: {}", msg),
2412 Self::ProtocolError(msg) => write!(f, "protocol error: {}", msg),
2413 Self::Timeout => write!(f, "operation timed out"),
2414 Self::ConnectionFailed(msg) => write!(f, "connection failed: {}", msg),
2415 Self::TraversalFailed(msg) => write!(f, "traversal failed: {}", msg),
2416 Self::PeerNotConnected => write!(f, "peer not connected"),
2417 }
2418 }
2419}
2420
2421impl std::error::Error for NatTraversalError {}
2422
2423impl fmt::Display for PeerId {
2424 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2425 for byte in &self.0[..8] {
2427 write!(f, "{:02x}", byte)?;
2428 }
2429 Ok(())
2430 }
2431}
2432
2433impl From<[u8; 32]> for PeerId {
2434 fn from(bytes: [u8; 32]) -> Self {
2435 Self(bytes)
2436 }
2437}
2438
2439#[derive(Debug)]
2442struct SkipServerVerification;
2443
2444impl SkipServerVerification {
2445 #[allow(dead_code)]
2446 fn new() -> Arc<Self> {
2447 Arc::new(Self)
2448 }
2449}
2450
2451impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
2452 fn verify_server_cert(
2453 &self,
2454 _end_entity: &rustls::pki_types::CertificateDer<'_>,
2455 _intermediates: &[rustls::pki_types::CertificateDer<'_>],
2456 _server_name: &rustls::pki_types::ServerName<'_>,
2457 _ocsp_response: &[u8],
2458 _now: rustls::pki_types::UnixTime,
2459 ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
2460 Ok(rustls::client::danger::ServerCertVerified::assertion())
2461 }
2462
2463 fn verify_tls12_signature(
2464 &self,
2465 _message: &[u8],
2466 _cert: &rustls::pki_types::CertificateDer<'_>,
2467 _dss: &rustls::DigitallySignedStruct,
2468 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
2469 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
2470 }
2471
2472 fn verify_tls13_signature(
2473 &self,
2474 _message: &[u8],
2475 _cert: &rustls::pki_types::CertificateDer<'_>,
2476 _dss: &rustls::DigitallySignedStruct,
2477 ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
2478 Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
2479 }
2480
2481 fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
2482 vec![
2483 rustls::SignatureScheme::RSA_PKCS1_SHA256,
2484 rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
2485 rustls::SignatureScheme::ED25519,
2486 ]
2487 }
2488}
2489
2490struct DefaultTokenStore;
2492
2493impl crate::TokenStore for DefaultTokenStore {
2494 fn insert(&self, _server_name: &str, _token: bytes::Bytes) {
2495 }
2497
2498 fn take(&self, _server_name: &str) -> Option<bytes::Bytes> {
2499 None
2500 }
2501}
2502
2503#[cfg(test)]
2504mod tests {
2505 use super::*;
2506
2507 #[test]
2508 fn test_nat_traversal_config_default() {
2509 let config = NatTraversalConfig::default();
2510 assert_eq!(config.role, EndpointRole::Client);
2511 assert_eq!(config.max_candidates, 8);
2512 assert!(config.enable_symmetric_nat);
2513 assert!(config.enable_relay_fallback);
2514 }
2515
2516 #[test]
2517 fn test_peer_id_display() {
2518 let peer_id = PeerId([0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77]);
2519 assert_eq!(format!("{}", peer_id), "0123456789abcdef");
2520 }
2521
2522 #[test]
2523 fn test_bootstrap_node_management() {
2524 let _config = NatTraversalConfig::default();
2525 }
2528}