qudag_network/
nat_traversal.rs

1//! NAT traversal and firewall penetration module for QuDAG network.
2//!
3//! This module implements comprehensive NAT traversal capabilities including:
4//! - STUN/TURN protocol support for NAT detection and relay
5//! - UPnP and NAT-PMP for automatic port mapping
6//! - Hole punching techniques for direct peer connections
7//! - AutoNAT protocol from libp2p for NAT detection
8//! - Relay functionality for unreachable peers
9//! - IPv6 support with fallback to IPv4
10//! - Connection upgrade paths from relay to direct connections
11
12use crate::connection::ConnectionManager;
13use crate::types::{ConnectionStatus, NetworkError, PeerId};
14use dashmap::DashMap;
15use libp2p::core::Multiaddr;
16use parking_lot::RwLock;
17use rand::{thread_rng, Rng};
18use serde::{Deserialize, Serialize};
19use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
20use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use thiserror::Error;
24use tokio::net::UdpSocket;
25use tokio::sync::{mpsc, Mutex, Semaphore};
26use tokio::time::{interval, sleep, timeout};
27use tracing::{debug, error, info, warn};
28
29/// STUN transaction ID type (12 bytes as per RFC 5389)
30type TransactionId = [u8; 12];
31
32/// STUN message structure
33#[derive(Debug, Clone)]
34pub struct Message {
35    /// Message type
36    pub msg_type: MessageType,
37    /// Transaction ID
38    pub transaction_id: TransactionId,
39    /// Attributes
40    pub attributes: Vec<Attribute>,
41}
42
43/// STUN message types
44#[derive(Debug, Clone, Copy, PartialEq)]
45pub enum MessageType {
46    /// Binding request
47    BindingRequest,
48    /// Binding response
49    BindingResponse,
50    /// Binding error response
51    BindingErrorResponse,
52    /// Allocate request (TURN)
53    AllocateRequest,
54    /// Allocate response (TURN)
55    AllocateResponse,
56}
57
58/// STUN attributes
59#[derive(Debug, Clone)]
60pub enum Attribute {
61    /// Mapped address
62    MappedAddress(SocketAddr),
63    /// XOR mapped address
64    XorMappedAddress(SocketAddr),
65    /// Changed address
66    ChangedAddress(SocketAddr),
67    /// Username
68    Username(String),
69    /// Message integrity
70    MessageIntegrity(Vec<u8>),
71    /// Error code
72    ErrorCode(u16, String),
73    /// Unknown attributes
74    UnknownAttributes(Vec<u16>),
75    /// Realm
76    Realm(String),
77    /// Nonce
78    Nonce(Vec<u8>),
79}
80
81// NatTraversalStats will be defined later with atomic fields
82
83/// Errors that can occur during NAT traversal operations
84#[derive(Debug, Error)]
85pub enum NatTraversalError {
86    /// STUN operation failed
87    #[error("STUN error: {0}")]
88    StunError(String),
89
90    /// TURN operation failed
91    #[error("TURN error: {0}")]
92    TurnError(String),
93
94    /// UPnP operation failed
95    #[error("UPnP error: {0}")]
96    UpnpError(String),
97
98    /// NAT-PMP operation failed
99    #[error("NAT-PMP error: {0}")]
100    NatPmpError(String),
101
102    /// Hole punching failed
103    #[error("Hole punching failed: {0}")]
104    HolePunchError(String),
105
106    /// Relay error
107    #[error("Relay error: {0}")]
108    RelayError(String),
109
110    /// NAT detection failed
111    #[error("NAT detection failed: {0}")]
112    DetectionError(String),
113
114    /// Connection upgrade failed
115    #[error("Connection upgrade failed: {0}")]
116    UpgradeError(String),
117
118    /// Network error
119    #[error("Network error: {0}")]
120    NetworkError(#[from] NetworkError),
121
122    /// IO error
123    #[error("IO error: {0}")]
124    IoError(#[from] std::io::Error),
125
126    /// Timeout error
127    #[error("Operation timed out")]
128    Timeout,
129
130    /// Connection error
131    #[error("Connection error: {0}")]
132    ConnectionError(NetworkError),
133}
134
135/// NAT types detected by the system
136#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
137pub enum NatType {
138    /// No NAT - direct internet connection
139    None,
140    /// Full cone NAT (one-to-one NAT)
141    FullCone,
142    /// Restricted cone NAT
143    RestrictedCone,
144    /// Port restricted cone NAT
145    PortRestrictedCone,
146    /// Symmetric NAT (hardest to traverse)
147    Symmetric,
148    /// Unknown NAT type
149    Unknown,
150}
151
152/// NAT detection result
153#[derive(Debug, Clone)]
154pub struct NatInfo {
155    /// Detected NAT type
156    pub nat_type: NatType,
157    /// Public IP address
158    pub public_ip: Option<IpAddr>,
159    /// Public port (if applicable)
160    pub public_port: Option<u16>,
161    /// Local IP address
162    pub local_ip: IpAddr,
163    /// Local port
164    pub local_port: u16,
165    /// Supports hairpinning
166    pub hairpinning: bool,
167    /// Detection timestamp
168    pub detected_at: Instant,
169    /// Confidence score (0.0 to 1.0)
170    pub confidence: f64,
171}
172
173/// STUN server configuration
174#[derive(Debug, Clone)]
175pub struct StunServer {
176    /// Server address
177    pub address: SocketAddr,
178    /// Server priority (lower is better)
179    pub priority: u32,
180    /// Is this server responsive
181    pub is_active: bool,
182    /// Last successful response time
183    pub last_success: Option<Instant>,
184    /// Average response time
185    pub avg_response_ms: u64,
186}
187
188impl StunServer {
189    /// Create a new STUN server configuration
190    pub fn new(address: SocketAddr, priority: u32) -> Self {
191        Self {
192            address,
193            priority,
194            is_active: true,
195            last_success: None,
196            avg_response_ms: 0,
197        }
198    }
199}
200
201/// TURN server configuration with credentials
202#[derive(Debug, Clone)]
203pub struct TurnServer {
204    /// Server address
205    pub address: SocketAddr,
206    /// Username for authentication
207    pub username: String,
208    /// Password for authentication
209    pub password: String,
210    /// Realm (optional)
211    pub realm: Option<String>,
212    /// Server priority
213    pub priority: u32,
214    /// Is this server active
215    pub is_active: bool,
216    /// Allocated relay address
217    pub relay_address: Option<SocketAddr>,
218}
219
220/// Configuration for NAT traversal
221#[derive(Debug, Clone)]
222pub struct NatTraversalConfig {
223    /// Enable STUN
224    pub enable_stun: bool,
225    /// Enable TURN
226    pub enable_turn: bool,
227    /// Enable UPnP
228    pub enable_upnp: bool,
229    /// Enable NAT-PMP
230    pub enable_nat_pmp: bool,
231    /// Enable hole punching
232    pub enable_hole_punching: bool,
233    /// Enable relay fallback
234    pub enable_relay: bool,
235    /// Enable IPv6
236    pub enable_ipv6: bool,
237    /// STUN servers
238    pub stun_servers: Vec<StunServer>,
239    /// TURN servers
240    pub turn_servers: Vec<TurnServer>,
241    /// Maximum relay connections
242    pub max_relay_connections: usize,
243    /// Hole punch timeout
244    pub hole_punch_timeout: Duration,
245    /// NAT detection interval
246    pub detection_interval: Duration,
247    /// Connection upgrade interval
248    pub upgrade_interval: Duration,
249    /// Port mapping lifetime (for UPnP/NAT-PMP)
250    pub port_mapping_lifetime: Duration,
251}
252
253impl Default for NatTraversalConfig {
254    fn default() -> Self {
255        Self {
256            enable_stun: true,
257            enable_turn: true,
258            enable_upnp: true,
259            enable_nat_pmp: true,
260            enable_hole_punching: true,
261            enable_relay: true,
262            enable_ipv6: true,
263            stun_servers: vec![
264                StunServer::new("stun1.l.google.com:19302".parse().unwrap(), 1),
265                StunServer::new("stun2.l.google.com:19302".parse().unwrap(), 2),
266                StunServer::new("stun3.l.google.com:19302".parse().unwrap(), 3),
267                StunServer::new("stun4.l.google.com:19302".parse().unwrap(), 4),
268            ],
269            turn_servers: vec![],
270            max_relay_connections: 50,
271            hole_punch_timeout: Duration::from_secs(30),
272            detection_interval: Duration::from_secs(300), // 5 minutes
273            upgrade_interval: Duration::from_secs(60),    // 1 minute
274            port_mapping_lifetime: Duration::from_secs(3600), // 1 hour
275        }
276    }
277}
278
279/// Main NAT traversal manager
280pub struct NatTraversalManager {
281    /// Configuration
282    config: NatTraversalConfig,
283    /// Current NAT information
284    nat_info: Arc<RwLock<Option<NatInfo>>>,
285    /// Connection manager reference
286    connection_manager: Arc<ConnectionManager>,
287    /// STUN client
288    stun_client: Arc<StunClient>,
289    /// TURN client
290    turn_client: Arc<TurnClient>,
291    /// UPnP manager
292    upnp_manager: Arc<UpnpManager>,
293    /// NAT-PMP client
294    nat_pmp_client: Arc<NatPmpClient>,
295    /// Hole punch coordinator
296    hole_punch_coordinator: Arc<HolePunchCoordinator>,
297    /// Relay manager
298    relay_manager: Arc<RelayManager>,
299    /// Connection upgrade manager
300    upgrade_manager: Arc<ConnectionUpgradeManager>,
301    /// Active port mappings
302    port_mappings: Arc<DashMap<u16, PortMapping>>,
303    /// NAT detection task handle
304    detection_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
305    /// Statistics
306    stats: Arc<NatTraversalStats>,
307}
308
309/// Port mapping information
310#[derive(Debug, Clone)]
311pub struct PortMapping {
312    /// Local port
313    pub local_port: u16,
314    /// External port
315    pub external_port: u16,
316    /// Protocol (TCP/UDP)
317    pub protocol: PortMappingProtocol,
318    /// Mapping method (UPnP/NAT-PMP)
319    pub method: PortMappingMethod,
320    /// Creation timestamp
321    pub created_at: Instant,
322    /// Expiration time
323    pub expires_at: Instant,
324}
325
326/// Port mapping protocol
327#[derive(Debug, Clone, Copy, PartialEq, Eq)]
328pub enum PortMappingProtocol {
329    /// TCP protocol
330    TCP,
331    /// UDP protocol
332    UDP,
333}
334
335/// Port mapping method
336#[derive(Debug, Clone, Copy, PartialEq, Eq)]
337pub enum PortMappingMethod {
338    /// UPnP mapping
339    Upnp,
340    /// NAT-PMP mapping
341    NatPmp,
342    /// Manual mapping
343    Manual,
344}
345
346/// NAT traversal statistics
347#[derive(Debug)]
348pub struct NatTraversalStats {
349    /// Total traversal attempts
350    pub total_attempts: AtomicU64,
351    /// Successful traversals
352    pub successful_traversals: AtomicU64,
353    /// Failed traversals
354    pub failed_traversals: AtomicU64,
355    /// Successful STUN queries
356    pub stun_success: AtomicU64,
357    /// Failed STUN queries
358    pub stun_failures: AtomicU64,
359    /// Successful hole punches
360    pub hole_punch_success: AtomicU64,
361    /// Failed hole punches
362    pub hole_punch_failures: AtomicU64,
363    /// Active relay connections
364    pub relay_connections: AtomicU32,
365    /// Upgraded connections
366    pub upgraded_connections: AtomicU64,
367    /// Port mappings created
368    pub port_mappings_created: AtomicU64,
369    /// Port mappings failed
370    pub port_mappings_failed: AtomicU64,
371    /// Average traversal time (in milliseconds)
372    pub avg_traversal_time_ms: AtomicU64,
373}
374
375impl Default for NatTraversalStats {
376    fn default() -> Self {
377        Self {
378            total_attempts: AtomicU64::new(0),
379            successful_traversals: AtomicU64::new(0),
380            failed_traversals: AtomicU64::new(0),
381            stun_success: AtomicU64::new(0),
382            stun_failures: AtomicU64::new(0),
383            hole_punch_success: AtomicU64::new(0),
384            hole_punch_failures: AtomicU64::new(0),
385            relay_connections: AtomicU32::new(0),
386            upgraded_connections: AtomicU64::new(0),
387            port_mappings_created: AtomicU64::new(0),
388            port_mappings_failed: AtomicU64::new(0),
389            avg_traversal_time_ms: AtomicU64::new(0),
390        }
391    }
392}
393
394/// STUN client for NAT detection and address discovery
395pub struct StunClient {
396    /// STUN servers
397    servers: Arc<RwLock<Vec<StunServer>>>,
398    /// UDP socket for STUN
399    socket: Arc<Mutex<Option<UdpSocket>>>,
400    /// Transaction tracking
401    #[allow(dead_code)]
402    transactions: Arc<DashMap<TransactionId, StunTransaction>>,
403}
404
405/// STUN transaction information
406#[derive(Debug)]
407#[allow(dead_code)]
408struct StunTransaction {
409    /// Server address
410    server: SocketAddr,
411    /// Request timestamp
412    sent_at: Instant,
413    /// Response callback
414    callback: Arc<Mutex<Option<mpsc::Sender<Result<Message, NatTraversalError>>>>>,
415}
416
417impl StunClient {
418    /// Create a new STUN client
419    pub fn new(servers: Vec<StunServer>) -> Self {
420        Self {
421            servers: Arc::new(RwLock::new(servers)),
422            socket: Arc::new(Mutex::new(None)),
423            transactions: Arc::new(DashMap::new()),
424        }
425    }
426
427    /// Detect NAT type and public address
428    pub async fn detect_nat(&self) -> Result<NatInfo, NatTraversalError> {
429        // Bind local socket
430        let local_addr = if false {
431            // TODO: Add ipv6 feature flag
432            SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
433        } else {
434            SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
435        };
436
437        let socket = UdpSocket::bind(local_addr).await?;
438        let local_addr = socket.local_addr()?;
439
440        // Store socket
441        *self.socket.lock().await = Some(socket);
442
443        // Test with multiple STUN servers
444        let mut results = Vec::new();
445        let servers = self.servers.read().clone();
446
447        for server in servers.iter().filter(|s| s.is_active) {
448            match self.query_stun_server(&server.address).await {
449                Ok(mapped_addr) => {
450                    results.push((server.clone(), mapped_addr));
451                    if results.len() >= 3 {
452                        break; // We have enough results
453                    }
454                }
455                Err(e) => {
456                    warn!("STUN query to {} failed: {}", server.address, e);
457                }
458            }
459        }
460
461        if results.is_empty() {
462            return Err(NatTraversalError::DetectionError(
463                "No STUN servers responded".to_string(),
464            ));
465        }
466
467        // Analyze results to determine NAT type
468        let nat_type = self.analyze_nat_type(&results, local_addr).await?;
469        let (public_ip, public_port) = if let Some((_, addr)) = results.first() {
470            (Some(addr.ip()), Some(addr.port()))
471        } else {
472            (None, None)
473        };
474
475        Ok(NatInfo {
476            nat_type,
477            public_ip,
478            public_port,
479            local_ip: local_addr.ip(),
480            local_port: local_addr.port(),
481            hairpinning: false, // TODO: Test hairpinning
482            detected_at: Instant::now(),
483            confidence: self.calculate_confidence(&results),
484        })
485    }
486
487    /// Query a single STUN server
488    async fn query_stun_server(
489        &self,
490        server: &SocketAddr,
491    ) -> Result<SocketAddr, NatTraversalError> {
492        // Get the socket from the mutex guard
493        let socket_guard = self.socket.lock().await;
494        let socket = socket_guard
495            .as_ref()
496            .ok_or_else(|| NatTraversalError::StunError("Socket not initialized".to_string()))?;
497
498        // Simple STUN-like request - send a UDP packet and expect echo with public address
499        let request_data = b"STUN_REQUEST";
500
501        // Send request
502        socket
503            .send_to(request_data, server)
504            .await
505            .map_err(|e| NatTraversalError::StunError(e.to_string()))?;
506
507        // Wait for response
508        let mut response_buf = vec![0u8; 1024];
509        let (_len, from) = timeout(Duration::from_secs(5), socket.recv_from(&mut response_buf))
510            .await
511            .map_err(|_| NatTraversalError::Timeout)??;
512
513        if from != *server {
514            return Err(NatTraversalError::StunError(
515                "Response from wrong server".to_string(),
516            ));
517        }
518
519        // For simplicity, assume the response contains the public address
520        // In a real implementation, this would parse STUN protocol messages
521        let local_addr = socket.local_addr()?;
522
523        // Mock response - in real implementation this would be parsed from STUN response
524        Ok(SocketAddr::new(server.ip(), local_addr.port()))
525    }
526
527    /// Analyze NAT type based on STUN results
528    async fn analyze_nat_type(
529        &self,
530        results: &[(StunServer, SocketAddr)],
531        local_addr: SocketAddr,
532    ) -> Result<NatType, NatTraversalError> {
533        // If public IP matches local IP, no NAT
534        if let Some((_, public_addr)) = results.first() {
535            if public_addr.ip() == local_addr.ip() {
536                return Ok(NatType::None);
537            }
538        }
539
540        // Check if all results have the same public IP/port
541        let all_same = results.windows(2).all(|w| w[0].1 == w[1].1);
542
543        if all_same {
544            // Could be Full Cone or Restricted Cone
545            // Need additional tests to distinguish
546            Ok(NatType::RestrictedCone)
547        } else {
548            // Different mappings for different servers = Symmetric NAT
549            Ok(NatType::Symmetric)
550        }
551    }
552
553    /// Calculate confidence score based on results
554    fn calculate_confidence(&self, results: &[(StunServer, SocketAddr)]) -> f64 {
555        let base_confidence = results.len() as f64 / 3.0; // More servers = higher confidence
556        base_confidence.min(1.0)
557    }
558}
559
560/// TURN client for relay allocation
561pub struct TurnClient {
562    /// TURN servers
563    servers: Arc<RwLock<Vec<TurnServer>>>,
564    /// Active allocations
565    allocations: Arc<DashMap<SocketAddr, TurnAllocation>>,
566    /// Allocation semaphore
567    allocation_limit: Arc<Semaphore>,
568}
569
570/// TURN allocation information
571#[derive(Debug, Clone)]
572pub struct TurnAllocation {
573    /// Server address
574    pub server: SocketAddr,
575    /// Allocated relay address
576    pub relay_address: SocketAddr,
577    /// Allocation lifetime
578    pub lifetime: Duration,
579    /// Created timestamp
580    pub created_at: Instant,
581    /// Refresh handle
582    pub refresh_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
583}
584
585impl TurnClient {
586    /// Create a new TURN client
587    pub fn new(servers: Vec<TurnServer>, max_allocations: usize) -> Self {
588        Self {
589            servers: Arc::new(RwLock::new(servers)),
590            allocations: Arc::new(DashMap::new()),
591            allocation_limit: Arc::new(Semaphore::new(max_allocations)),
592        }
593    }
594
595    /// Allocate a relay address
596    pub async fn allocate_relay(&self) -> Result<TurnAllocation, NatTraversalError> {
597        // Acquire allocation permit
598        let _permit =
599            self.allocation_limit.acquire().await.map_err(|_| {
600                NatTraversalError::TurnError("Allocation limit reached".to_string())
601            })?;
602
603        // Try each TURN server
604        let servers = self.servers.read().clone();
605        for server in servers.iter().filter(|s| s.is_active) {
606            match self.allocate_from_server(server).await {
607                Ok(allocation) => {
608                    self.allocations.insert(server.address, allocation.clone());
609                    return Ok(allocation);
610                }
611                Err(e) => {
612                    warn!("TURN allocation from {} failed: {}", server.address, e);
613                }
614            }
615        }
616
617        Err(NatTraversalError::TurnError(
618            "No TURN servers available".to_string(),
619        ))
620    }
621
622    /// Allocate from a specific TURN server
623    async fn allocate_from_server(
624        &self,
625        server: &TurnServer,
626    ) -> Result<TurnAllocation, NatTraversalError> {
627        // TODO: Implement actual TURN allocation protocol
628        // For now, return a mock allocation
629        Ok(TurnAllocation {
630            server: server.address,
631            relay_address: server.address, // In real implementation, this would be different
632            lifetime: Duration::from_secs(600),
633            created_at: Instant::now(),
634            refresh_handle: Arc::new(Mutex::new(None)),
635        })
636    }
637}
638
639/// Simple UPnP gateway representation
640#[derive(Debug, Clone)]
641pub struct SimpleGateway {
642    /// Gateway address
643    pub address: SocketAddr,
644    /// Friendly name
645    pub name: String,
646}
647
648/// UPnP manager for automatic port mapping
649pub struct UpnpManager {
650    /// Gateway device
651    gateway: Arc<Mutex<Option<SimpleGateway>>>,
652    /// Active mappings
653    mappings: Arc<DashMap<u16, UpnpMapping>>,
654    /// Mapping refresh interval
655    #[allow(dead_code)]
656    refresh_interval: Duration,
657}
658
659/// UPnP mapping information
660#[derive(Debug, Clone)]
661pub struct UpnpMapping {
662    /// Local port
663    pub local_port: u16,
664    /// External port  
665    pub external_port: u16,
666    /// Protocol
667    pub protocol: PortMappingProtocol,
668    /// Description
669    pub description: String,
670    /// Lease duration
671    pub lease_duration: Duration,
672    /// Created timestamp
673    pub created_at: Instant,
674}
675
676impl UpnpManager {
677    /// Create a new UPnP manager
678    pub fn new(refresh_interval: Duration) -> Self {
679        Self {
680            gateway: Arc::new(Mutex::new(None)),
681            mappings: Arc::new(DashMap::new()),
682            refresh_interval,
683        }
684    }
685
686    /// Discover UPnP gateway
687    pub async fn discover_gateway(&self) -> Result<(), NatTraversalError> {
688        // Simple UPnP discovery simulation
689        // In a real implementation, this would use SSDP multicast discovery
690        let potential_gateways = vec!["192.168.1.1:1900", "192.168.0.1:1900", "10.0.0.1:1900"];
691
692        for gateway_addr in potential_gateways {
693            if let Ok(addr) = gateway_addr.parse::<SocketAddr>() {
694                // Test if gateway responds
695                if let Ok(socket) = UdpSocket::bind("0.0.0.0:0").await {
696                    if socket.send_to(b"M-SEARCH", addr).await.is_ok() {
697                        info!("Discovered UPnP gateway at: {}", addr);
698                        let gateway = SimpleGateway {
699                            address: addr,
700                            name: "UPnP Gateway".to_string(),
701                        };
702                        *self.gateway.lock().await = Some(gateway);
703                        return Ok(());
704                    }
705                }
706            }
707        }
708
709        Err(NatTraversalError::UpnpError(
710            "No UPnP gateway found".to_string(),
711        ))
712    }
713
714    /// Create port mapping
715    pub async fn create_mapping(
716        &self,
717        local_port: u16,
718        external_port: u16,
719        protocol: PortMappingProtocol,
720        description: &str,
721        lease_duration: Duration,
722    ) -> Result<UpnpMapping, NatTraversalError> {
723        // Simulate UPnP port mapping
724        // In a real implementation, this would send UPnP control messages
725        info!(
726            "Creating UPnP port mapping: {}:{} -> {} ({})",
727            local_port, external_port, protocol as u8, description
728        );
729
730        let mapping = UpnpMapping {
731            local_port,
732            external_port,
733            protocol,
734            description: description.to_string(),
735            lease_duration,
736            created_at: Instant::now(),
737        };
738
739        self.mappings.insert(local_port, mapping.clone());
740        Ok(mapping)
741    }
742
743    /// Get local IP address
744    #[allow(dead_code)]
745    async fn get_local_ip(&self) -> Result<IpAddr, NatTraversalError> {
746        // Try to get local IP by connecting to a public address
747        let socket = UdpSocket::bind("0.0.0.0:0").await?;
748        socket.connect("8.8.8.8:80").await?;
749        let local_addr = socket.local_addr()?;
750        Ok(local_addr.ip())
751    }
752}
753
754/// NAT-PMP client for port mapping
755pub struct NatPmpClient {
756    /// Gateway address
757    gateway: Arc<Mutex<Option<IpAddr>>>,
758    /// Active mappings
759    mappings: Arc<DashMap<u16, NatPmpMapping>>,
760}
761
762/// NAT-PMP mapping information
763#[derive(Debug, Clone)]
764pub struct NatPmpMapping {
765    /// Local port
766    pub local_port: u16,
767    /// External port
768    pub external_port: u16,
769    /// Protocol (TCP/UDP)
770    pub is_tcp: bool,
771    /// Lifetime
772    pub lifetime: Duration,
773    /// Created timestamp
774    pub created_at: Instant,
775}
776
777impl NatPmpClient {
778    /// Create a new NAT-PMP client
779    pub fn new() -> Self {
780        Self {
781            gateway: Arc::new(Mutex::new(None)),
782            mappings: Arc::new(DashMap::new()),
783        }
784    }
785
786    /// Discover NAT-PMP gateway
787    pub async fn discover_gateway(&self) -> Result<(), NatTraversalError> {
788        // TODO: Implement gateway discovery
789        // For now, try common gateway addresses
790        let common_gateways = vec!["192.168.1.1", "192.168.0.1", "10.0.0.1"];
791
792        for gateway_str in common_gateways {
793            if let Ok(gateway) = gateway_str.parse::<IpAddr>() {
794                // Test if gateway responds to NAT-PMP
795                if self.test_gateway(&gateway).await {
796                    *self.gateway.lock().await = Some(gateway);
797                    info!("Discovered NAT-PMP gateway: {}", gateway);
798                    return Ok(());
799                }
800            }
801        }
802
803        Err(NatTraversalError::NatPmpError(
804            "No NAT-PMP gateway found".to_string(),
805        ))
806    }
807
808    /// Test if an address responds to NAT-PMP
809    async fn test_gateway(&self, _gateway: &IpAddr) -> bool {
810        // TODO: Implement actual NAT-PMP protocol test
811        // For now, return false
812        false
813    }
814
815    /// Create port mapping
816    pub async fn create_mapping(
817        &self,
818        local_port: u16,
819        external_port: u16,
820        is_tcp: bool,
821        lifetime: Duration,
822    ) -> Result<NatPmpMapping, NatTraversalError> {
823        let gateway = self.gateway.lock().await;
824        let _gateway_addr = gateway
825            .as_ref()
826            .ok_or_else(|| NatTraversalError::NatPmpError("No gateway discovered".to_string()))?;
827
828        // TODO: Implement actual NAT-PMP mapping protocol
829
830        let mapping = NatPmpMapping {
831            local_port,
832            external_port,
833            is_tcp,
834            lifetime,
835            created_at: Instant::now(),
836        };
837
838        self.mappings.insert(local_port, mapping.clone());
839        Ok(mapping)
840    }
841}
842
843/// Hole punch coordinator for establishing direct connections
844pub struct HolePunchCoordinator {
845    /// Active hole punch attempts
846    attempts: Arc<DashMap<PeerId, HolePunchAttempt>>,
847    /// Success callback handlers
848    success_handlers: Arc<DashMap<PeerId, mpsc::Sender<SocketAddr>>>,
849    /// Hole punch timeout
850    timeout: Duration,
851}
852
853/// Hole punch attempt information
854#[derive(Debug)]
855pub struct HolePunchAttempt {
856    /// Target peer
857    pub peer_id: PeerId,
858    /// Local candidate addresses
859    pub local_candidates: Vec<SocketAddr>,
860    /// Remote candidate addresses
861    pub remote_candidates: Vec<SocketAddr>,
862    /// Started timestamp
863    pub started_at: Instant,
864    /// Current phase
865    pub phase: HolePunchPhase,
866    /// Success flag
867    pub succeeded: Arc<AtomicBool>,
868}
869
870/// Hole punch phases
871#[derive(Debug, Clone, Copy, PartialEq, Eq)]
872pub enum HolePunchPhase {
873    /// Gathering candidates
874    GatheringCandidates,
875    /// Exchanging candidates
876    ExchangingCandidates,
877    /// Probing connections
878    Probing,
879    /// Connection established
880    Connected,
881    /// Failed
882    Failed,
883}
884
885impl HolePunchCoordinator {
886    /// Create a new hole punch coordinator
887    pub fn new(timeout: Duration) -> Self {
888        Self {
889            attempts: Arc::new(DashMap::new()),
890            success_handlers: Arc::new(DashMap::new()),
891            timeout,
892        }
893    }
894
895    /// Start hole punching to a peer
896    pub async fn start_hole_punch(
897        &self,
898        peer_id: PeerId,
899        local_candidates: Vec<SocketAddr>,
900        remote_candidates: Vec<SocketAddr>,
901    ) -> Result<SocketAddr, NatTraversalError> {
902        info!("Starting hole punch to peer {:?}", peer_id);
903
904        let attempt = HolePunchAttempt {
905            peer_id,
906            local_candidates: local_candidates.clone(),
907            remote_candidates: remote_candidates.clone(),
908            started_at: Instant::now(),
909            phase: HolePunchPhase::Probing,
910            succeeded: Arc::new(AtomicBool::new(false)),
911        };
912
913        self.attempts.insert(peer_id, attempt);
914
915        // Create result channel
916        let (tx, mut rx) = mpsc::channel(1);
917        self.success_handlers.insert(peer_id, tx);
918
919        // Start probing all candidate pairs
920        let _probe_tasks: Vec<_> = local_candidates
921            .iter()
922            .flat_map(|local| {
923                remote_candidates
924                    .iter()
925                    .map(move |remote| self.probe_candidate_pair(*local, *remote, peer_id))
926            })
927            .collect();
928
929        // Wait for success or timeout
930        tokio::select! {
931            result = rx.recv() => {
932                match result {
933                    Some(addr) => {
934                        self.mark_success(peer_id);
935                        Ok(addr)
936                    }
937                    None => Err(NatTraversalError::HolePunchError("Channel closed".to_string()))
938                }
939            }
940            _ = sleep(self.timeout) => {
941                self.mark_failure(peer_id);
942                Err(NatTraversalError::HolePunchError("Timeout".to_string()))
943            }
944        }
945    }
946
947    /// Probe a candidate pair
948    async fn probe_candidate_pair(
949        &self,
950        local: SocketAddr,
951        remote: SocketAddr,
952        peer_id: PeerId,
953    ) -> Result<(), NatTraversalError> {
954        debug!("Probing candidate pair: {} -> {}", local, remote);
955
956        let socket = UdpSocket::bind(local).await?;
957
958        // Send probe packets
959        for i in 0..5 {
960            let probe_data = format!("HOLE_PUNCH_PROBE_{}", i).into_bytes();
961            socket.send_to(&probe_data, remote).await?;
962
963            // Wait for response with short timeout
964            let mut buf = vec![0u8; 1024];
965            match timeout(Duration::from_millis(200), socket.recv_from(&mut buf)).await {
966                Ok(Ok((len, from))) => {
967                    if from == remote && len > 0 {
968                        // Success! Notify coordinator
969                        if let Some(handler) = self.success_handlers.get(&peer_id) {
970                            let _ = handler.send(local).await;
971                        }
972                        return Ok(());
973                    }
974                }
975                _ => continue, // Timeout or error, try next probe
976            }
977
978            sleep(Duration::from_millis(100)).await;
979        }
980
981        Err(NatTraversalError::HolePunchError(
982            "No response from remote".to_string(),
983        ))
984    }
985
986    /// Mark hole punch as successful
987    fn mark_success(&self, peer_id: PeerId) {
988        if let Some(mut attempt) = self.attempts.get_mut(&peer_id) {
989            attempt.phase = HolePunchPhase::Connected;
990            attempt.succeeded.store(true, Ordering::Relaxed);
991        }
992    }
993
994    /// Mark hole punch as failed
995    fn mark_failure(&self, peer_id: PeerId) {
996        if let Some(mut attempt) = self.attempts.get_mut(&peer_id) {
997            attempt.phase = HolePunchPhase::Failed;
998        }
999    }
1000}
1001
1002/// Relay manager for unreachable peers
1003pub struct RelayManager {
1004    /// Available relay servers
1005    relay_servers: Arc<RwLock<Vec<RelayServer>>>,
1006    /// Active relay connections
1007    relay_connections: Arc<DashMap<PeerId, RelayConnection>>,
1008    /// Connection limit
1009    connection_limit: Arc<Semaphore>,
1010    /// Relay statistics
1011    stats: Arc<RelayStats>,
1012}
1013
1014/// Relay server information
1015#[derive(Debug, Clone)]
1016pub struct RelayServer {
1017    /// Server ID
1018    pub id: PeerId,
1019    /// Server address
1020    pub address: Multiaddr,
1021    /// Server capacity
1022    pub capacity: u32,
1023    /// Current load
1024    pub load: Arc<AtomicU32>,
1025    /// Is available
1026    pub is_available: bool,
1027    /// Last health check
1028    pub last_health_check: Option<Instant>,
1029}
1030
1031/// Relay connection information
1032#[derive(Debug, Clone)]
1033pub struct RelayConnection {
1034    /// Relay server
1035    pub relay_server: PeerId,
1036    /// Target peer
1037    pub target_peer: PeerId,
1038    /// Connection ID
1039    pub connection_id: u64,
1040    /// Established timestamp
1041    pub established_at: Instant,
1042    /// Bytes relayed
1043    pub bytes_relayed: Arc<AtomicU64>,
1044    /// Is active
1045    pub is_active: Arc<AtomicBool>,
1046}
1047
1048/// Relay statistics
1049#[derive(Debug)]
1050pub struct RelayStats {
1051    /// Total relay connections
1052    pub total_connections: AtomicU64,
1053    /// Active relay connections
1054    pub active_connections: AtomicU32,
1055    /// Total bytes relayed
1056    pub bytes_relayed: AtomicU64,
1057    /// Failed relay attempts
1058    pub failed_attempts: AtomicU64,
1059}
1060
1061impl RelayManager {
1062    /// Create a new relay manager
1063    pub fn new(max_connections: usize) -> Self {
1064        Self {
1065            relay_servers: Arc::new(RwLock::new(Vec::new())),
1066            relay_connections: Arc::new(DashMap::new()),
1067            connection_limit: Arc::new(Semaphore::new(max_connections)),
1068            stats: Arc::new(RelayStats {
1069                total_connections: AtomicU64::new(0),
1070                active_connections: AtomicU32::new(0),
1071                bytes_relayed: AtomicU64::new(0),
1072                failed_attempts: AtomicU64::new(0),
1073            }),
1074        }
1075    }
1076
1077    /// Add a relay server
1078    pub async fn add_relay_server(&self, server: RelayServer) {
1079        self.relay_servers.write().push(server);
1080    }
1081
1082    /// Establish relay connection to a peer
1083    pub async fn establish_relay(
1084        &self,
1085        target_peer: PeerId,
1086    ) -> Result<RelayConnection, NatTraversalError> {
1087        // Acquire connection permit
1088        let _permit =
1089            self.connection_limit.acquire().await.map_err(|_| {
1090                NatTraversalError::RelayError("Connection limit reached".to_string())
1091            })?;
1092
1093        // Find best relay server
1094        let relay_server = self.select_relay_server().await?;
1095
1096        // Create relay connection
1097        let connection = RelayConnection {
1098            relay_server: relay_server.id,
1099            target_peer,
1100            connection_id: thread_rng().gen(),
1101            established_at: Instant::now(),
1102            bytes_relayed: Arc::new(AtomicU64::new(0)),
1103            is_active: Arc::new(AtomicBool::new(true)),
1104        };
1105
1106        // Update stats
1107        self.stats.total_connections.fetch_add(1, Ordering::Relaxed);
1108        self.stats
1109            .active_connections
1110            .fetch_add(1, Ordering::Relaxed);
1111        relay_server.load.fetch_add(1, Ordering::Relaxed);
1112
1113        self.relay_connections
1114            .insert(target_peer, connection.clone());
1115
1116        info!(
1117            "Established relay connection to {:?} via {:?}",
1118            target_peer, relay_server.id
1119        );
1120        Ok(connection)
1121    }
1122
1123    /// Select best relay server based on load
1124    async fn select_relay_server(&self) -> Result<RelayServer, NatTraversalError> {
1125        let servers = self.relay_servers.read();
1126
1127        servers
1128            .iter()
1129            .filter(|s| s.is_available)
1130            .min_by_key(|s| s.load.load(Ordering::Relaxed))
1131            .cloned()
1132            .ok_or_else(|| NatTraversalError::RelayError("No relay servers available".to_string()))
1133    }
1134
1135    /// Close relay connection
1136    pub async fn close_relay(&self, peer_id: &PeerId) {
1137        if let Some((_, connection)) = self.relay_connections.remove(peer_id) {
1138            connection.is_active.store(false, Ordering::Relaxed);
1139            self.stats
1140                .active_connections
1141                .fetch_sub(1, Ordering::Relaxed);
1142
1143            // Update relay server load
1144            let servers = self.relay_servers.read();
1145            if let Some(server) = servers.iter().find(|s| s.id == connection.relay_server) {
1146                server.load.fetch_sub(1, Ordering::Relaxed);
1147            }
1148        }
1149    }
1150}
1151
1152/// Connection upgrade manager for upgrading relay connections to direct
1153pub struct ConnectionUpgradeManager {
1154    /// Upgrade attempts
1155    upgrade_attempts: Arc<DashMap<PeerId, UpgradeAttempt>>,
1156    /// Upgrade interval
1157    upgrade_interval: Duration,
1158    /// NAT traversal manager reference
1159    nat_manager: Option<Arc<NatTraversalManager>>,
1160}
1161
1162/// Connection upgrade attempt
1163#[derive(Debug)]
1164pub struct UpgradeAttempt {
1165    /// Target peer
1166    pub peer_id: PeerId,
1167    /// Current connection type
1168    pub current_type: ConnectionType,
1169    /// Attempt count
1170    pub attempt_count: u32,
1171    /// Last attempt timestamp
1172    pub last_attempt: Instant,
1173    /// Success flag
1174    pub succeeded: bool,
1175}
1176
1177/// Connection type
1178#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1179pub enum ConnectionType {
1180    /// Direct connection
1181    Direct,
1182    /// Relay connection
1183    Relay,
1184    /// TURN relay
1185    Turn,
1186}
1187
1188impl ConnectionUpgradeManager {
1189    /// Create a new connection upgrade manager
1190    pub fn new(upgrade_interval: Duration) -> Self {
1191        Self {
1192            upgrade_attempts: Arc::new(DashMap::new()),
1193            upgrade_interval,
1194            nat_manager: None,
1195        }
1196    }
1197
1198    /// Set NAT manager reference
1199    pub fn set_nat_manager(&mut self, nat_manager: Arc<NatTraversalManager>) {
1200        self.nat_manager = Some(nat_manager);
1201    }
1202
1203    /// Try to upgrade a connection
1204    pub async fn try_upgrade(
1205        &self,
1206        peer_id: PeerId,
1207        current_type: ConnectionType,
1208    ) -> Result<ConnectionType, NatTraversalError> {
1209        if current_type == ConnectionType::Direct {
1210            return Ok(ConnectionType::Direct); // Already direct
1211        }
1212
1213        let mut attempt = self
1214            .upgrade_attempts
1215            .entry(peer_id)
1216            .or_insert(UpgradeAttempt {
1217                peer_id,
1218                current_type,
1219                attempt_count: 0,
1220                last_attempt: Instant::now(),
1221                succeeded: false,
1222            });
1223
1224        // Check if we should attempt upgrade
1225        if attempt.last_attempt.elapsed() < self.upgrade_interval {
1226            return Err(NatTraversalError::UpgradeError(
1227                "Too soon to retry".to_string(),
1228            ));
1229        }
1230
1231        attempt.attempt_count += 1;
1232        attempt.last_attempt = Instant::now();
1233
1234        // Try hole punching if we have NAT manager
1235        if let Some(nat_manager) = &self.nat_manager {
1236            match nat_manager.establish_direct_connection(peer_id).await {
1237                Ok(_) => {
1238                    attempt.succeeded = true;
1239                    info!(
1240                        "Successfully upgraded connection to {:?} from {:?} to Direct",
1241                        peer_id, current_type
1242                    );
1243                    Ok(ConnectionType::Direct)
1244                }
1245                Err(e) => {
1246                    warn!("Failed to upgrade connection to {:?}: {}", peer_id, e);
1247                    Err(e)
1248                }
1249            }
1250        } else {
1251            Err(NatTraversalError::UpgradeError(
1252                "NAT manager not available".to_string(),
1253            ))
1254        }
1255    }
1256}
1257
1258impl NatTraversalManager {
1259    /// Create a new NAT traversal manager
1260    pub fn new(config: NatTraversalConfig, connection_manager: Arc<ConnectionManager>) -> Self {
1261        let stats = Arc::new(NatTraversalStats::default());
1262
1263        Self {
1264            config: config.clone(),
1265            nat_info: Arc::new(RwLock::new(None)),
1266            connection_manager,
1267            stun_client: Arc::new(StunClient::new(config.stun_servers.clone())),
1268            turn_client: Arc::new(TurnClient::new(
1269                config.turn_servers.clone(),
1270                config.max_relay_connections,
1271            )),
1272            upnp_manager: Arc::new(UpnpManager::new(config.port_mapping_lifetime)),
1273            nat_pmp_client: Arc::new(NatPmpClient::new()),
1274            hole_punch_coordinator: Arc::new(HolePunchCoordinator::new(config.hole_punch_timeout)),
1275            relay_manager: Arc::new(RelayManager::new(config.max_relay_connections)),
1276            upgrade_manager: Arc::new(ConnectionUpgradeManager::new(config.upgrade_interval)),
1277            port_mappings: Arc::new(DashMap::new()),
1278            detection_handle: Arc::new(Mutex::new(None)),
1279            stats,
1280        }
1281    }
1282
1283    /// Initialize NAT traversal
1284    pub async fn initialize(&self) -> Result<(), NatTraversalError> {
1285        info!("Initializing NAT traversal manager");
1286
1287        // Start NAT detection
1288        if self.config.enable_stun {
1289            self.start_nat_detection().await?;
1290        }
1291
1292        // Discover gateways
1293        if self.config.enable_upnp {
1294            if let Err(e) = self.upnp_manager.discover_gateway().await {
1295                warn!("UPnP gateway discovery failed: {}", e);
1296            }
1297        }
1298
1299        if self.config.enable_nat_pmp {
1300            if let Err(e) = self.nat_pmp_client.discover_gateway().await {
1301                warn!("NAT-PMP gateway discovery failed: {}", e);
1302            }
1303        }
1304
1305        // Start periodic tasks
1306        self.start_periodic_tasks().await;
1307
1308        Ok(())
1309    }
1310
1311    /// Start NAT detection
1312    async fn start_nat_detection(&self) -> Result<(), NatTraversalError> {
1313        match self.stun_client.detect_nat().await {
1314            Ok(nat_info) => {
1315                info!("NAT detected: {:?}", nat_info.nat_type);
1316                *self.nat_info.write() = Some(nat_info);
1317                self.stats.stun_success.fetch_add(1, Ordering::Relaxed);
1318                Ok(())
1319            }
1320            Err(e) => {
1321                error!("NAT detection failed: {}", e);
1322                self.stats.stun_failures.fetch_add(1, Ordering::Relaxed);
1323                Err(e)
1324            }
1325        }
1326    }
1327
1328    /// Start periodic maintenance tasks
1329    async fn start_periodic_tasks(&self) {
1330        let nat_info = Arc::clone(&self.nat_info);
1331        let stun_client = Arc::clone(&self.stun_client);
1332        let stats = Arc::clone(&self.stats);
1333        let detection_interval = self.config.detection_interval;
1334
1335        // NAT detection refresh task
1336        let detection_task = tokio::spawn(async move {
1337            let mut interval = interval(detection_interval);
1338            loop {
1339                interval.tick().await;
1340
1341                match stun_client.detect_nat().await {
1342                    Ok(new_info) => {
1343                        *nat_info.write() = Some(new_info);
1344                        stats.stun_success.fetch_add(1, Ordering::Relaxed);
1345                    }
1346                    Err(e) => {
1347                        warn!("Periodic NAT detection failed: {}", e);
1348                        stats.stun_failures.fetch_add(1, Ordering::Relaxed);
1349                    }
1350                }
1351            }
1352        });
1353
1354        *self.detection_handle.lock().await = Some(detection_task);
1355    }
1356
1357    /// Get current NAT information
1358    pub fn get_nat_info(&self) -> Option<NatInfo> {
1359        self.nat_info.read().clone()
1360    }
1361
1362    /// Create port mapping
1363    pub async fn create_port_mapping(
1364        &self,
1365        local_port: u16,
1366        external_port: u16,
1367        protocol: PortMappingProtocol,
1368    ) -> Result<PortMapping, NatTraversalError> {
1369        // Try UPnP first
1370        if self.config.enable_upnp {
1371            match self
1372                .upnp_manager
1373                .create_mapping(
1374                    local_port,
1375                    external_port,
1376                    protocol,
1377                    "QuDAG P2P",
1378                    self.config.port_mapping_lifetime,
1379                )
1380                .await
1381            {
1382                Ok(mapping) => {
1383                    let port_mapping = PortMapping {
1384                        local_port,
1385                        external_port: mapping.external_port,
1386                        protocol,
1387                        method: PortMappingMethod::Upnp,
1388                        created_at: Instant::now(),
1389                        expires_at: Instant::now() + mapping.lease_duration,
1390                    };
1391
1392                    self.port_mappings.insert(local_port, port_mapping.clone());
1393                    self.stats
1394                        .port_mappings_created
1395                        .fetch_add(1, Ordering::Relaxed);
1396                    return Ok(port_mapping);
1397                }
1398                Err(e) => {
1399                    warn!("UPnP port mapping failed: {}", e);
1400                }
1401            }
1402        }
1403
1404        // Try NAT-PMP
1405        if self.config.enable_nat_pmp {
1406            let is_tcp = matches!(protocol, PortMappingProtocol::TCP);
1407            match self
1408                .nat_pmp_client
1409                .create_mapping(
1410                    local_port,
1411                    external_port,
1412                    is_tcp,
1413                    self.config.port_mapping_lifetime,
1414                )
1415                .await
1416            {
1417                Ok(mapping) => {
1418                    let port_mapping = PortMapping {
1419                        local_port,
1420                        external_port: mapping.external_port,
1421                        protocol,
1422                        method: PortMappingMethod::NatPmp,
1423                        created_at: Instant::now(),
1424                        expires_at: Instant::now() + mapping.lifetime,
1425                    };
1426
1427                    self.port_mappings.insert(local_port, port_mapping.clone());
1428                    self.stats
1429                        .port_mappings_created
1430                        .fetch_add(1, Ordering::Relaxed);
1431                    return Ok(port_mapping);
1432                }
1433                Err(e) => {
1434                    warn!("NAT-PMP port mapping failed: {}", e);
1435                }
1436            }
1437        }
1438
1439        self.stats
1440            .port_mappings_failed
1441            .fetch_add(1, Ordering::Relaxed);
1442        Err(NatTraversalError::UpnpError(
1443            "All port mapping methods failed".to_string(),
1444        ))
1445    }
1446
1447    /// Establish connection to a peer with NAT traversal
1448    pub async fn connect_peer(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
1449        // Try direct connection first
1450        match self.connection_manager.connect(peer_id).await {
1451            Ok(()) => return Ok(()),
1452            Err(e) => {
1453                debug!("Direct connection failed: {}, trying NAT traversal", e);
1454            }
1455        }
1456
1457        // Try hole punching if enabled
1458        if self.config.enable_hole_punching {
1459            match self.try_hole_punch(peer_id).await {
1460                Ok(()) => return Ok(()),
1461                Err(e) => {
1462                    debug!("Hole punching failed: {}", e);
1463                    self.stats
1464                        .hole_punch_failures
1465                        .fetch_add(1, Ordering::Relaxed);
1466                }
1467            }
1468        }
1469
1470        // Fall back to relay if enabled
1471        if self.config.enable_relay {
1472            match self.establish_relay_connection(peer_id).await {
1473                Ok(()) => return Ok(()),
1474                Err(e) => {
1475                    error!("Relay connection failed: {}", e);
1476                }
1477            }
1478        }
1479
1480        Err(NatTraversalError::ConnectionError(
1481            NetworkError::ConnectionError("All connection methods failed".to_string()),
1482        ))
1483    }
1484
1485    /// Try hole punching to establish direct connection
1486    async fn try_hole_punch(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
1487        // Get local candidates
1488        let local_candidates = self.gather_local_candidates().await?;
1489
1490        // Exchange candidates with peer (through signaling)
1491        let remote_candidates = self.exchange_candidates(peer_id, &local_candidates).await?;
1492
1493        // Start hole punching
1494        match self
1495            .hole_punch_coordinator
1496            .start_hole_punch(peer_id, local_candidates, remote_candidates)
1497            .await
1498        {
1499            Ok(addr) => {
1500                info!("Hole punch successful, connected via {}", addr);
1501                self.stats
1502                    .hole_punch_success
1503                    .fetch_add(1, Ordering::Relaxed);
1504
1505                // Update connection in connection manager
1506                self.connection_manager
1507                    .update_status(peer_id, ConnectionStatus::Connected);
1508                Ok(())
1509            }
1510            Err(e) => Err(e),
1511        }
1512    }
1513
1514    /// Gather local candidate addresses
1515    async fn gather_local_candidates(&self) -> Result<Vec<SocketAddr>, NatTraversalError> {
1516        let mut candidates = Vec::new();
1517
1518        // Add public address from NAT info
1519        if let Some(nat_info) = self.nat_info.read().as_ref() {
1520            if let (Some(ip), Some(port)) = (nat_info.public_ip, nat_info.public_port) {
1521                candidates.push(SocketAddr::new(ip, port));
1522            }
1523        }
1524
1525        // Add local addresses
1526        // TODO: Enumerate local network interfaces
1527
1528        // Add mapped ports
1529        for mapping in self.port_mappings.iter() {
1530            if let Some(public_ip) = self.get_public_ip() {
1531                candidates.push(SocketAddr::new(public_ip, mapping.external_port));
1532            }
1533        }
1534
1535        Ok(candidates)
1536    }
1537
1538    /// Exchange candidates with peer (placeholder - needs signaling)
1539    async fn exchange_candidates(
1540        &self,
1541        _peer_id: PeerId,
1542        _local_candidates: &[SocketAddr],
1543    ) -> Result<Vec<SocketAddr>, NatTraversalError> {
1544        // TODO: Implement actual candidate exchange through signaling
1545        // For now, return empty list
1546        Ok(Vec::new())
1547    }
1548
1549    /// Establish relay connection
1550    async fn establish_relay_connection(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
1551        // Try TURN relay first
1552        if self.config.enable_turn {
1553            match self.turn_client.allocate_relay().await {
1554                Ok(allocation) => {
1555                    info!("TURN relay allocated: {}", allocation.relay_address);
1556                    // TODO: Use TURN relay for connection
1557                    return Ok(());
1558                }
1559                Err(e) => {
1560                    warn!("TURN allocation failed: {}", e);
1561                }
1562            }
1563        }
1564
1565        // Use custom relay
1566        match self.relay_manager.establish_relay(peer_id).await {
1567            Ok(connection) => {
1568                info!(
1569                    "Relay connection established via {:?}",
1570                    connection.relay_server
1571                );
1572                self.stats.relay_connections.fetch_add(1, Ordering::Relaxed);
1573
1574                // Update connection status
1575                self.connection_manager
1576                    .update_status(peer_id, ConnectionStatus::Connected);
1577
1578                // Schedule upgrade attempt
1579                self.schedule_connection_upgrade(peer_id, ConnectionType::Relay);
1580
1581                Ok(())
1582            }
1583            Err(e) => Err(e),
1584        }
1585    }
1586
1587    /// Schedule connection upgrade attempt
1588    fn schedule_connection_upgrade(&self, peer_id: PeerId, current_type: ConnectionType) {
1589        let upgrade_manager = Arc::clone(&self.upgrade_manager);
1590        let stats = Arc::clone(&self.stats);
1591
1592        tokio::spawn(async move {
1593            // Wait before attempting upgrade
1594            sleep(Duration::from_secs(30)).await;
1595
1596            match upgrade_manager.try_upgrade(peer_id, current_type).await {
1597                Ok(ConnectionType::Direct) => {
1598                    stats.upgraded_connections.fetch_add(1, Ordering::Relaxed);
1599                    stats.relay_connections.fetch_sub(1, Ordering::Relaxed);
1600                }
1601                Ok(_) => {}
1602                Err(e) => {
1603                    debug!("Connection upgrade failed: {}", e);
1604                }
1605            }
1606        });
1607    }
1608
1609    /// Establish direct connection (for connection upgrade)
1610    async fn establish_direct_connection(&self, peer_id: PeerId) -> Result<(), NatTraversalError> {
1611        // Try hole punching
1612        self.try_hole_punch(peer_id).await
1613    }
1614
1615    /// Get public IP address
1616    fn get_public_ip(&self) -> Option<IpAddr> {
1617        self.nat_info.read().as_ref()?.public_ip
1618    }
1619
1620    /// Get NAT traversal statistics
1621    pub fn get_stats(&self) -> NatTraversalStats {
1622        NatTraversalStats {
1623            total_attempts: AtomicU64::new(self.stats.total_attempts.load(Ordering::Relaxed)),
1624            successful_traversals: AtomicU64::new(
1625                self.stats.successful_traversals.load(Ordering::Relaxed),
1626            ),
1627            failed_traversals: AtomicU64::new(self.stats.failed_traversals.load(Ordering::Relaxed)),
1628            stun_success: AtomicU64::new(self.stats.stun_success.load(Ordering::Relaxed)),
1629            stun_failures: AtomicU64::new(self.stats.stun_failures.load(Ordering::Relaxed)),
1630            hole_punch_success: AtomicU64::new(
1631                self.stats.hole_punch_success.load(Ordering::Relaxed),
1632            ),
1633            hole_punch_failures: AtomicU64::new(
1634                self.stats.hole_punch_failures.load(Ordering::Relaxed),
1635            ),
1636            relay_connections: AtomicU32::new(self.stats.relay_connections.load(Ordering::Relaxed)),
1637            upgraded_connections: AtomicU64::new(
1638                self.stats.upgraded_connections.load(Ordering::Relaxed),
1639            ),
1640            port_mappings_created: AtomicU64::new(
1641                self.stats.port_mappings_created.load(Ordering::Relaxed),
1642            ),
1643            port_mappings_failed: AtomicU64::new(
1644                self.stats.port_mappings_failed.load(Ordering::Relaxed),
1645            ),
1646            avg_traversal_time_ms: AtomicU64::new(
1647                self.stats.avg_traversal_time_ms.load(Ordering::Relaxed),
1648            ),
1649        }
1650    }
1651
1652    /// Shutdown NAT traversal manager
1653    pub async fn shutdown(&self) -> Result<(), NatTraversalError> {
1654        info!("Shutting down NAT traversal manager");
1655
1656        // Cancel detection task
1657        if let Some(handle) = self.detection_handle.lock().await.take() {
1658            handle.abort();
1659        }
1660
1661        // Close all relay connections
1662        let relay_peers: Vec<_> = self
1663            .relay_manager
1664            .relay_connections
1665            .iter()
1666            .map(|entry| *entry.key())
1667            .collect();
1668
1669        for peer_id in relay_peers {
1670            self.relay_manager.close_relay(&peer_id).await;
1671        }
1672
1673        // Remove port mappings
1674        // TODO: Implement port mapping cleanup
1675
1676        Ok(())
1677    }
1678}
1679
1680#[cfg(test)]
1681mod tests {
1682    use super::*;
1683
1684    #[tokio::test]
1685    async fn test_nat_detection() {
1686        let servers = vec![StunServer::new("8.8.8.8:3478".parse().unwrap(), 1)];
1687
1688        let client = StunClient::new(servers);
1689
1690        // This test will fail without real STUN servers
1691        // It's here to show the structure
1692        match client.detect_nat().await {
1693            Ok(nat_info) => {
1694                println!("NAT type: {:?}", nat_info.nat_type);
1695                println!("Public IP: {:?}", nat_info.public_ip);
1696            }
1697            Err(e) => {
1698                println!("NAT detection failed: {}", e);
1699            }
1700        }
1701    }
1702
1703    #[test]
1704    fn test_nat_type_properties() {
1705        assert_eq!(NatType::None, NatType::None);
1706        assert_ne!(NatType::FullCone, NatType::Symmetric);
1707    }
1708}