// ant_quic/connection/nat_traversal.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
4    time::Duration,
5};
6
7use crate::shared::ConnectionId;
8use tracing::{debug, info, trace, warn};
9
10use crate::{Instant, VarInt};
11
12/// NAT traversal state for a QUIC connection
13///
14/// This manages address candidate discovery, validation, and coordination
15/// for establishing direct P2P connections through NATs.
16#[derive(Debug)]
17pub(super) struct NatTraversalState {
18    /// Our role in NAT traversal (from transport parameters)
19    pub(super) role: NatTraversalRole,
20    /// Candidate addresses we've advertised to the peer
21    pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
22    /// Candidate addresses received from the peer
23    pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
24    /// Generated candidate pairs for connectivity testing
25    pub(super) candidate_pairs: Vec<CandidatePair>,
26    /// Index for fast pair lookup by remote address (maintained during generation)
27    pub(super) pair_index: HashMap<SocketAddr, usize>,
28    /// Currently active path validation attempts
29    pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
30    /// Coordination state for simultaneous hole punching
31    pub(super) coordination: Option<CoordinationState>,
32    /// Sequence number for address advertisements
33    pub(super) next_sequence: VarInt,
34    /// Maximum candidates we're willing to handle
35    pub(super) max_candidates: u32,
36    /// Timeout for coordination rounds
37    pub(super) coordination_timeout: Duration,
38    /// Statistics for this NAT traversal session
39    pub(super) stats: NatTraversalStats,
40    /// Security validation state
41    pub(super) security_state: SecurityValidationState,
42    /// Network condition monitoring for adaptive timeouts
43    pub(super) network_monitor: NetworkConditionMonitor,
44    /// Resource management and cleanup coordinator
45    pub(super) resource_manager: ResourceCleanupCoordinator,
46    /// Bootstrap coordinator (only for Bootstrap role)
47    pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
48}
/// Role in NAT traversal coordination
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NatTraversalRole {
    /// Client endpoint (initiates connections, on-demand)
    Client,
    /// Server endpoint (accepts connections, always reachable)
    Server {
        /// Whether this server can relay traffic for other peers
        can_relay: bool,
    },
    /// Bootstrap/relay endpoint (publicly reachable, coordinates traversal)
    Bootstrap,
}
62/// Address candidate with metadata
63#[derive(Debug, Clone)]
64pub(super) struct AddressCandidate {
65    /// The socket address
66    pub(super) address: SocketAddr,
67    /// Priority for ICE-like selection (higher = better)
68    pub(super) priority: u32,
69    /// How this candidate was discovered
70    pub(super) source: CandidateSource,
71    /// When this candidate was first learned
72    pub(super) discovered_at: Instant,
73    /// Current state of this candidate
74    pub(super) state: CandidateState,
75    /// Number of validation attempts for this candidate
76    pub(super) attempt_count: u32,
77    /// Last validation attempt time
78    pub(super) last_attempt: Option<Instant>,
79}
80/// How an address candidate was discovered
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum CandidateSource {
83    /// Local network interface
84    Local,
85    /// Observed by a bootstrap node
86    Observed { by_node: Option<VarInt> },
87    /// Received from peer via AddAddress frame
88    Peer,
89    /// Generated prediction for symmetric NAT
90    Predicted,
91}
/// Current state of a candidate address
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateState {
    /// Newly discovered, not yet tested
    New,
    /// Currently being validated
    Validating,
    /// Successfully validated and usable
    Valid,
    /// Validation failed
    Failed,
    /// Removed by peer or expired
    Removed,
}
106/// State of an individual path validation attempt
107#[derive(Debug)]
108pub(super) struct PathValidationState {
109    /// Challenge value sent
110    pub(super) challenge: u64,
111    /// When the challenge was sent
112    pub(super) sent_at: Instant,
113    /// Number of retransmissions
114    pub(super) retry_count: u32,
115    /// Maximum retries allowed
116    pub(super) max_retries: u32,
117    /// Associated with a coordination round (if any)
118    pub(super) coordination_round: Option<VarInt>,
119    /// Adaptive timeout state
120    pub(super) timeout_state: AdaptiveTimeoutState,
121    /// Last retry attempt time
122    pub(super) last_retry_at: Option<Instant>,
123}
124/// Coordination state for simultaneous hole punching
125#[derive(Debug)]
126pub(super) struct CoordinationState {
127    /// Current coordination round number
128    pub(super) round: VarInt,
129    /// Addresses we're punching to in this round
130    pub(super) punch_targets: Vec<PunchTarget>,
131    /// When this round started (coordination phase)
132    pub(super) round_start: Instant,
133    /// When hole punching should begin (synchronized time)
134    pub(super) punch_start: Instant,
135    /// Duration of this coordination round
136    pub(super) round_duration: Duration,
137    /// Current state of this coordination round
138    pub(super) state: CoordinationPhase,
139    /// Whether we've sent our PUNCH_ME_NOW to coordinator
140    pub(super) punch_request_sent: bool,
141    /// Whether we've received peer's PUNCH_ME_NOW via coordinator
142    pub(super) peer_punch_received: bool,
143    /// Retry count for this round
144    pub(super) retry_count: u32,
145    /// Maximum retries before giving up
146    pub(super) max_retries: u32,
147    /// Adaptive timeout state for coordination
148    pub(super) timeout_state: AdaptiveTimeoutState,
149    /// Last retry attempt time
150    pub(super) last_retry_at: Option<Instant>,
151}
/// Phases of the coordination protocol
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CoordinationPhase {
    /// Waiting to start coordination
    Idle,
    /// Sending PUNCH_ME_NOW to coordinator
    Requesting,
    /// Waiting for peer's PUNCH_ME_NOW via coordinator
    Coordinating,
    /// Grace period before synchronized hole punching
    Preparing,
    /// Actively sending PATH_CHALLENGE packets
    Punching,
    /// Waiting for PATH_RESPONSE validation
    Validating,
    /// This round completed successfully
    Succeeded,
    /// This round failed, may retry
    Failed,
}
172/// Target for hole punching in a coordination round
173#[derive(Debug, Clone)]
174pub(super) struct PunchTarget {
175    /// Remote address to punch to
176    pub(super) remote_addr: SocketAddr,
177    /// Sequence number of the remote candidate
178    pub(super) remote_sequence: VarInt,
179    /// Challenge value for validation
180    pub(super) challenge: u64,
181}
182/// Actions to take when handling NAT traversal timeouts
183#[derive(Debug, Clone, PartialEq, Eq)]
184pub(super) enum TimeoutAction {
185    /// Retry candidate discovery
186    RetryDiscovery,
187    /// Retry coordination with bootstrap node
188    RetryCoordination,
189    /// Start path validation for discovered candidates
190    StartValidation,
191    /// NAT traversal completed successfully
192    Complete,
193    /// NAT traversal failed
194    Failed,
195}
196
197/// Candidate pair for ICE-like connectivity testing
198#[derive(Debug, Clone)]
199pub(super) struct CandidatePair {
200    /// Sequence of remote candidate  
201    pub(super) remote_sequence: VarInt,
202    /// Our local address for this pair
203    pub(super) local_addr: SocketAddr,
204    /// Remote address we're testing connectivity to
205    pub(super) remote_addr: SocketAddr,
206    /// Combined priority for pair ordering (higher = better)
207    pub(super) priority: u64,
208    /// Current state of this pair
209    pub(super) state: PairState,
210    /// Type classification for this pair
211    pub(super) pair_type: PairType,
212    /// When this pair was created
213    pub(super) created_at: Instant,
214    /// When validation was last attempted
215    pub(super) last_check: Option<Instant>,
216}
217/// State of a candidate pair during validation
218#[derive(Debug, Clone, Copy, PartialEq, Eq)]
219pub(super) enum PairState {
220    /// Waiting to be tested
221    Waiting,
222    /// Validation succeeded - this pair works
223    Succeeded,
224    /// Validation failed
225    Failed,
226    /// Temporarily frozen (waiting for other pairs)
227    Frozen,
228}
229/// Type classification for candidate pairs (based on ICE)
230#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
231pub(super) enum PairType {
232    /// Both candidates are on local network
233    HostToHost,
234    /// Local is host, remote is server reflexive (through NAT)
235    HostToServerReflexive,
236    /// Local is server reflexive, remote is host
237    ServerReflexiveToHost,
238    /// Both are server reflexive (both behind NAT)
239    ServerReflexiveToServerReflexive,
240    /// One side is peer reflexive (learned from peer)
241    PeerReflexive,
242}
243/// Type of address candidate (following ICE terminology)
244#[derive(Debug, Clone, Copy, PartialEq, Eq)]
245pub(super) enum CandidateType {
246    /// Host candidate - directly reachable local interface
247    Host,
248    /// Server reflexive - public address observed by bootstrap node
249    ServerReflexive,
250    /// Peer reflexive - address learned from incoming packets
251    PeerReflexive,
252}
253/// Calculate ICE-like priority for an address candidate
254/// Based on RFC 8445 Section 5.1.2.1
255fn calculate_candidate_priority(
256    candidate_type: CandidateType,
257    local_preference: u16,
258    component_id: u8,
259) -> u32 {
260    let type_preference = match candidate_type {
261        CandidateType::Host => 126,
262        CandidateType::PeerReflexive => 110,
263        CandidateType::ServerReflexive => 100,
264    };
265    // ICE priority formula: (2^24 * type_pref) + (2^8 * local_pref) + component_id
266    (1u32 << 24) * type_preference + (1u32 << 8) * local_preference as u32 + component_id as u32
267}
268
/// Calculate combined priority for a candidate pair
///
/// Implements the RFC 8445 Section 6.1.2.3 formula
/// `2^32 * MIN(G, D) + 2 * MAX(G, D) + (G > D ? 1 : 0)`, with `G` taken from
/// `local_priority` and `D` from `remote_priority`. The RFC defines `G` as the
/// controlling agent's priority. Here `G` is always the local side, so the two
/// endpoints may disagree on the final tie-break bit.
///
/// RFC-conformant candidate priorities are below 2^31, and for those the result
/// always fits in a `u64`. For arbitrary `u32` inputs (e.g. if a priority originates
/// from the peer), the sum can exceed `u64::MAX` when both values are close to
/// `u32::MAX`. In that case the result saturates at `u64::MAX` instead of panicking
/// (debug builds) or wrapping to a tiny priority (release builds).
fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
    let g = u64::from(local_priority);
    let d = u64::from(remote_priority);
    // MIN(G, D) < 2^32, so this shift is lossless; 2 * MAX(G, D) < 2^33 also fits.
    let min_term = g.min(d) << 32;
    let max_term = 2 * g.max(d);
    let tie_breaker = u64::from(g > d);
    min_term.saturating_add(max_term).saturating_add(tie_breaker)
}

278/// Determine candidate type from source information
279fn classify_candidate_type(source: CandidateSource) -> CandidateType {
280    match source {
281        CandidateSource::Local => CandidateType::Host,
282        CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
283        CandidateSource::Peer => CandidateType::PeerReflexive,
284        CandidateSource::Predicted => CandidateType::ServerReflexive, // Symmetric NAT prediction
285    }
286}
287/// Determine pair type from individual candidate types
288fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
289    match (local_type, remote_type) {
290        (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
291        (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
292        (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
293        (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => {
294            PairType::ServerReflexiveToServerReflexive
295        }
296        (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => {
297            PairType::PeerReflexive
298        }
299    }
300}
301/// Check if two candidates are compatible for pairing
302fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
303    // Must be same address family (IPv4 with IPv4, IPv6 with IPv6)
304    match (local.address, remote.address) {
305        (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
306        (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
307        _ => false, // No IPv4/IPv6 mixing for now
308    }
309}
310/// Statistics for NAT traversal attempts
311#[derive(Debug, Default, Clone)]
312pub(crate) struct NatTraversalStats {
313    /// Total candidates received from peer
314    pub(super) remote_candidates_received: u32,
315    /// Total candidates we've advertised
316    pub(super) local_candidates_sent: u32,
317    /// Successful validations
318    pub(super) validations_succeeded: u32,
319    /// Failed validations
320    pub(super) validations_failed: u32,
321    /// Coordination rounds attempted
322    pub(super) coordination_rounds: u32,
323    /// Successful coordinations
324    pub(super) successful_coordinations: u32,
325    /// Failed coordinations
326    pub(super) failed_coordinations: u32,
327    /// Timed out coordinations
328    pub(super) timed_out_coordinations: u32,
329    /// Coordination failures due to poor network conditions
330    pub(super) coordination_failures: u32,
331    /// Successful direct connections established
332    pub(super) direct_connections: u32,
333    /// Security validation rejections
334    pub(super) security_rejections: u32,
335    /// Rate limiting violations
336    pub(super) rate_limit_violations: u32,
337    /// Invalid address rejections
338    pub(super) invalid_address_rejections: u32,
339    /// Suspicious coordination attempts
340    pub(super) suspicious_coordination_attempts: u32,
341}
342/// Security validation state for rate limiting and attack detection
343#[derive(Debug)]
344pub(super) struct SecurityValidationState {
345    /// Rate limiting: track candidate additions per time window
346    candidate_rate_tracker: VecDeque<Instant>,
347    /// Maximum candidates per time window
348    max_candidates_per_window: u32,
349    /// Rate limiting time window
350    rate_window: Duration,
351    /// Coordination request tracking for suspicious patterns
352    coordination_requests: VecDeque<CoordinationRequest>,
353    /// Maximum coordination requests per time window
354    max_coordination_per_window: u32,
355    /// Address validation cache to avoid repeated validation
356    address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
357    /// Cache timeout for address validation
358    validation_cache_timeout: Duration,
359}
/// Coordination request tracking for security analysis
#[derive(Debug, Clone)]
struct CoordinationRequest {
    /// When the request was made
    timestamp: Instant,
}
/// Result of address validation
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressValidationResult {
    /// Address is valid and safe
    Valid,
    /// Address is invalid (malformed, reserved, etc.)
    Invalid,
    /// Address is suspicious (potential attack)
    Suspicious,
}
376/// Adaptive timeout state for network condition awareness
377#[derive(Debug, Clone)]
378pub(super) struct AdaptiveTimeoutState {
379    /// Current timeout value
380    current_timeout: Duration,
381    /// Minimum allowed timeout
382    min_timeout: Duration,
383    /// Maximum allowed timeout
384    max_timeout: Duration,
385    /// Base timeout for exponential backoff
386    base_timeout: Duration,
387    /// Current backoff multiplier
388    backoff_multiplier: f64,
389    /// Maximum backoff multiplier
390    max_backoff_multiplier: f64,
391    /// Jitter factor for randomization
392    jitter_factor: f64,
393    /// Smoothed round-trip time estimation
394    srtt: Option<Duration>,
395    /// Round-trip time variance
396    rttvar: Option<Duration>,
397    /// Last successful round-trip time
398    last_rtt: Option<Duration>,
399    /// Number of consecutive timeouts
400    consecutive_timeouts: u32,
401    /// Number of successful responses
402    successful_responses: u32,
403}
404/// Network condition monitoring for adaptive behavior
405#[derive(Debug)]
406pub(super) struct NetworkConditionMonitor {
407    /// Recent round-trip time measurements
408    rtt_samples: VecDeque<Duration>,
409    /// Maximum samples to keep
410    max_samples: usize,
411    /// Packet loss rate estimation
412    packet_loss_rate: f64,
413    /// Congestion window estimate
414    congestion_window: u32,
415    /// Network quality score (0.0 = poor, 1.0 = excellent)
416    quality_score: f64,
417    /// Last quality update time
418    last_quality_update: Instant,
419    /// Quality measurement interval
420    quality_update_interval: Duration,
421    /// Timeout statistics
422    timeout_stats: TimeoutStatistics,
423}
/// Statistics for timeout behavior
///
/// `Default` yields all-zero counters, a zero average, and no last update.
#[derive(Debug, Default)]
struct TimeoutStatistics {
    /// Total timeout events
    total_timeouts: u64,
    /// Total successful responses
    total_responses: u64,
    /// Average response time
    avg_response_time: Duration,
    /// Timeout rate (0.0 = no timeouts, 1.0 = all timeouts)
    timeout_rate: f64,
    /// Last update time
    last_update: Option<Instant>,
}
438impl SecurityValidationState {
439    /// Create new security validation state with default settings
440    fn new() -> Self {
441        Self {
442            candidate_rate_tracker: VecDeque::new(),
443            max_candidates_per_window: 20, // Max 20 candidates per 60 seconds
444            rate_window: Duration::from_secs(60),
445            coordination_requests: VecDeque::new(),
446            max_coordination_per_window: 5, // Max 5 coordination requests per 60 seconds
447            address_validation_cache: HashMap::new(),
448            validation_cache_timeout: Duration::from_secs(300), // 5 minute cache
449        }
450    }
451    /// Create new security validation state with custom rate limits
452    fn new_with_limits(
453        max_candidates_per_window: u32,
454        max_coordination_per_window: u32,
455        rate_window: Duration,
456    ) -> Self {
457        Self {
458            candidate_rate_tracker: VecDeque::new(),
459            max_candidates_per_window,
460            rate_window,
461            coordination_requests: VecDeque::new(),
462            max_coordination_per_window,
463            address_validation_cache: HashMap::new(),
464            validation_cache_timeout: Duration::from_secs(300),
465        }
466    }
467    /// Enhanced rate limiting with adaptive thresholds
468    ///
469    /// This implements adaptive rate limiting that adjusts based on network conditions
470    /// and detected attack patterns to prevent flooding while maintaining usability.
471    fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
472        // Clean up old entries first
473        self.cleanup_rate_tracker(now);
474        self.cleanup_coordination_tracker(now);
475        // Calculate current request rate
476        let _current_candidate_rate =
477            self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
478        let _current_coordination_rate =
479            self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
480
481        // Adaptive threshold based on peer behavior
482        let peer_reputation = self.calculate_peer_reputation(peer_id);
483        let adaptive_candidate_limit =
484            (self.max_candidates_per_window as f64 * peer_reputation) as u32;
485        let adaptive_coordination_limit =
486            (self.max_coordination_per_window as f64 * peer_reputation) as u32;
487
488        // Check if either limit is exceeded
489        if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
490            debug!(
491                "Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
492                hex::encode(&peer_id[..8]),
493                self.candidate_rate_tracker.len(),
494                adaptive_candidate_limit
495            );
496            return true;
497        }
498
499        if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
500            debug!(
501                "Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
502                hex::encode(&peer_id[..8]),
503                self.coordination_requests.len(),
504                adaptive_coordination_limit
505            );
506            return true;
507        }
508
509        false
510    }
511
512    /// Calculate peer reputation score (0.0 = bad, 1.0 = good)
513    ///
514    /// This implements a simple reputation system to adjust rate limits
515    /// based on peer behavior patterns.
516    fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
517        // Simplified reputation calculation
518        // In production, this would track:
519        // - Historical success rates
520        // - Suspicious behavior patterns
521        // - Coordination completion rates
522        // - Address validation failures
523        // For now, return a default good reputation
524        // This can be enhanced with persistent peer reputation storage
525        1.0
526    }
527
528    /// Implement amplification attack mitigation
529    ///
530    /// This prevents the bootstrap node from being used as an amplifier
531    /// in DDoS attacks by limiting server-initiated validation packets.
532    fn validate_amplification_limits(
533        &mut self,
534        source_addr: SocketAddr,
535        target_addr: SocketAddr,
536        now: Instant,
537    ) -> Result<(), NatTraversalError> {
538        // Check if we're being asked to send too many packets to the same target
539        let amplification_key = (source_addr, target_addr);
540        // Simple amplification protection: limit packets per source-target pair
541        // In production, this would be more sophisticated with:
542        // - Bandwidth tracking
543        // - Packet size ratios
544        // - Geographic analysis
545        // - Temporal pattern analysis
546
547        // For now, implement basic per-pair rate limiting
548        if self.is_amplification_suspicious(amplification_key, now) {
549            warn!(
550                "Potential amplification attack detected: {} -> {}",
551                source_addr, target_addr
552            );
553            return Err(NatTraversalError::SuspiciousCoordination);
554        }
555
556        Ok(())
557    }
558
559    /// Check for suspicious amplification patterns
560    fn is_amplification_suspicious(
561        &self,
562        _amplification_key: (SocketAddr, SocketAddr),
563        _now: Instant,
564    ) -> bool {
565        // Simplified amplification detection
566        // In production, this would track:
567        // - Request/response ratios
568        // - Bandwidth amplification factors
569        // - Temporal clustering of requests
570        // - Geographic distribution analysis
571        // For now, return false (no amplification detected)
572        // This can be enhanced with persistent amplification tracking
573        false
574    }
575
576    /// Generate cryptographically secure random values for coordination rounds
577    ///
578    /// This ensures that coordination rounds use secure random values to prevent
579    /// prediction attacks and ensure proper synchronization security.
580    fn generate_secure_coordination_round(&self) -> VarInt {
581        // Use cryptographically secure random number generation
582        let secure_random: u64 = rand::random();
583        // Ensure the value is within reasonable bounds for VarInt
584        let bounded_random = secure_random % 1000000; // Limit to reasonable range
585
586        VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
587    }
588
589    /// Enhanced address validation with security checks
590    ///
591    /// This performs comprehensive address validation including:
592    /// - Basic format validation
593    /// - Security threat detection
594    /// - Amplification attack prevention
595    /// - Suspicious pattern recognition
596    fn enhanced_address_validation(
597        &mut self,
598        addr: SocketAddr,
599        source_addr: SocketAddr,
600        now: Instant,
601    ) -> Result<AddressValidationResult, NatTraversalError> {
602        // First, perform basic address validation
603        let basic_result = self.validate_address(addr, now);
604        match basic_result {
605            AddressValidationResult::Invalid => {
606                return Err(NatTraversalError::InvalidAddress);
607            }
608            AddressValidationResult::Suspicious => {
609                return Err(NatTraversalError::SuspiciousCoordination);
610            }
611            AddressValidationResult::Valid => {
612                // Continue with enhanced validation
613            }
614        }
615
616        // Check for amplification attack patterns
617        self.validate_amplification_limits(source_addr, addr, now)?;
618
619        // Additional security checks
620        if self.is_address_in_suspicious_range(addr) {
621            warn!("Address in suspicious range detected: {}", addr);
622            return Err(NatTraversalError::SuspiciousCoordination);
623        }
624
625        if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
626            warn!(
627                "Suspicious coordination pattern detected: {} -> {}",
628                source_addr, addr
629            );
630            return Err(NatTraversalError::SuspiciousCoordination);
631        }
632
633        Ok(AddressValidationResult::Valid)
634    }
635
636    /// Check if address is in a suspicious range
637    fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
638        match addr.ip() {
639            IpAddr::V4(ipv4) => {
640                // Check for addresses commonly used in attacks
641                let octets = ipv4.octets();
642                // Reject certain reserved ranges that shouldn't be used for P2P
643                if octets[0] == 0 || octets[0] == 127 {
644                    return true;
645                }
646
647                // Check for test networks (RFC 5737)
648                if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
649                    return true;
650                }
651                if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
652                    return true;
653                }
654                if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
655                    return true;
656                }
657
658                false
659            }
660            IpAddr::V6(ipv6) => {
661                // Check for suspicious IPv6 ranges
662                if ipv6.is_loopback() || ipv6.is_unspecified() {
663                    return true;
664                }
665
666                // Check for documentation ranges (RFC 3849)
667                let segments = ipv6.segments();
668                if segments[0] == 0x2001 && segments[1] == 0x0db8 {
669                    return true;
670                }
671
672                false
673            }
674        }
675    }
676
677    /// Check for suspicious coordination patterns
678    fn is_coordination_pattern_suspicious(
679        &self,
680        _source_addr: SocketAddr,
681        _target_addr: SocketAddr,
682        _now: Instant,
683    ) -> bool {
684        // Simplified pattern detection
685        // In production, this would analyze:
686        // - Temporal patterns (too frequent requests)
687        // - Geographic patterns (unusual source/target combinations)
688        // - Behavioral patterns (consistent with known attack signatures)
689        // - Network topology patterns (suspicious routing)
690        // For now, return false (no suspicious patterns detected)
691        // This can be enhanced with machine learning-based pattern detection
692        false
693    }
694
695    /// Check if candidate rate limit is exceeded
696    fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
697        // Clean up old entries
698        self.cleanup_rate_tracker(now);
699        // Check if we've exceeded the rate limit
700        if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
701            return true;
702        }
703
704        // Record this attempt
705        self.candidate_rate_tracker.push_back(now);
706        false
707    }
708
709    /// Check if coordination rate limit is exceeded
710    fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
711        // Clean up old entries
712        self.cleanup_coordination_tracker(now);
713        // Check if we've exceeded the rate limit
714        if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
715            return true;
716        }
717
718        // Record this attempt
719        let request = CoordinationRequest { timestamp: now };
720        self.coordination_requests.push_back(request);
721        false
722    }
723
724    /// Clean up old rate tracking entries
725    fn cleanup_rate_tracker(&mut self, now: Instant) {
726        let cutoff = now - self.rate_window;
727        while let Some(&front_time) = self.candidate_rate_tracker.front() {
728            if front_time < cutoff {
729                self.candidate_rate_tracker.pop_front();
730            } else {
731                break;
732            }
733        }
734    }
735    /// Clean up old coordination tracking entries
736    fn cleanup_coordination_tracker(&mut self, now: Instant) {
737        let cutoff = now - self.rate_window;
738        while let Some(front_request) = self.coordination_requests.front() {
739            if front_request.timestamp < cutoff {
740                self.coordination_requests.pop_front();
741            } else {
742                break;
743            }
744        }
745    }
746    /// Validate an address for security concerns
747    fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
748        // Check cache first
749        if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
750            return cached_result;
751        }
752        let result = self.perform_address_validation(addr);
753
754        // Cache the result
755        self.address_validation_cache.insert(addr, result);
756
757        // Clean up old cache entries periodically
758        if self.address_validation_cache.len() > 1000 {
759            self.cleanup_address_cache(now);
760        }
761
762        result
763    }
764
765    /// Perform actual address validation
766    fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
767        match addr.ip() {
768            IpAddr::V4(ipv4) => {
769                // Check for invalid IPv4 addresses
770                if ipv4.is_unspecified() || ipv4.is_broadcast() {
771                    return AddressValidationResult::Invalid;
772                }
773                // Check for suspicious addresses
774                if ipv4.is_multicast() || ipv4.is_documentation() {
775                    return AddressValidationResult::Suspicious;
776                }
777
778                // Check for reserved ranges that shouldn't be used for P2P
779                if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
780                    return AddressValidationResult::Invalid;
781                }
782
783                // Check for common attack patterns
784                if self.is_suspicious_ipv4(ipv4) {
785                    return AddressValidationResult::Suspicious;
786                }
787            }
788            IpAddr::V6(ipv6) => {
789                // Check for invalid IPv6 addresses
790                if ipv6.is_unspecified() || ipv6.is_multicast() {
791                    return AddressValidationResult::Invalid;
792                }
793
794                // Check for suspicious IPv6 addresses
795                if self.is_suspicious_ipv6(ipv6) {
796                    return AddressValidationResult::Suspicious;
797                }
798            }
799        }
800
801        // Check port range
802        if addr.port() == 0 || addr.port() < 1024 {
803            return AddressValidationResult::Suspicious;
804        }
805
806        AddressValidationResult::Valid
807    }
808
809    /// Check for suspicious IPv4 patterns
810    fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
811        let octets = ipv4.octets();
812        // Check for patterns that might indicate scanning or attacks
813        // Sequential or patterned addresses
814        if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
815            return true;
816        }
817
818        // Check for addresses in ranges commonly used for attacks
819        // This is a simplified check - production would have more sophisticated patterns
820        false
821    }
822
823    /// Check for suspicious IPv6 patterns
824    fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
825        let segments = ipv6.segments();
826        // Check for obvious patterns
827        if segments.iter().all(|&s| s == segments[0]) {
828            return true;
829        }
830
831        false
832    }
833
834    /// Clean up old address validation cache entries
835    fn cleanup_address_cache(&mut self, _now: Instant) {
836        // Simple cleanup - remove random entries to keep size bounded
837        // In production, this would use LRU or timestamp-based cleanup
838        if self.address_validation_cache.len() > 500 {
839            let keys_to_remove: Vec<_> = self
840                .address_validation_cache
841                .keys()
842                .take(self.address_validation_cache.len() / 2)
843                .copied()
844                .collect();
845            for key in keys_to_remove {
846                self.address_validation_cache.remove(&key);
847            }
848        }
849    }
850
851    /// Comprehensive path validation for PUNCH_ME_NOW frames
852    ///
853    /// This performs security-critical validation to prevent various attacks:
854    /// - Address spoofing prevention
855    /// - Reflection attack mitigation
856    /// - Coordination request validation
857    /// - Rate limiting enforcement
858    fn validate_punch_me_now_frame(
859        &mut self,
860        frame: &crate::frame::PunchMeNow,
861        source_addr: SocketAddr,
862        peer_id: [u8; 32],
863        now: Instant,
864    ) -> Result<(), NatTraversalError> {
865        // 1. Rate limiting validation
866        if self.is_coordination_rate_limited(now) {
867            debug!(
868                "PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}",
869                hex::encode(&peer_id[..8])
870            );
871            return Err(NatTraversalError::RateLimitExceeded);
872        }
873        // 2. Address validation - validate the address claimed in the frame
874        let addr_validation = self.validate_address(frame.address, now);
875        match addr_validation {
876            AddressValidationResult::Invalid => {
877                debug!(
878                    "PUNCH_ME_NOW frame rejected: invalid address {:?} from peer {:?}",
879                    frame.address,
880                    hex::encode(&peer_id[..8])
881                );
882                return Err(NatTraversalError::InvalidAddress);
883            }
884            AddressValidationResult::Suspicious => {
885                debug!(
886                    "PUNCH_ME_NOW frame rejected: suspicious address {:?} from peer {:?}",
887                    frame.address,
888                    hex::encode(&peer_id[..8])
889                );
890                return Err(NatTraversalError::SuspiciousCoordination);
891            }
892            AddressValidationResult::Valid => {
893                // Continue validation
894            }
895        }
896
897        // 3. Source address consistency validation
898        // The frame's address should reasonably relate to the actual source
899        if !self.validate_address_consistency(frame.address, source_addr) {
900            debug!(
901                "PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
902                frame.address, source_addr
903            );
904            return Err(NatTraversalError::SuspiciousCoordination);
905        }
906
907        // 4. Coordination parameters validation
908        if !self.validate_coordination_parameters(frame) {
909            debug!(
910                "PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
911                hex::encode(&peer_id[..8])
912            );
913            return Err(NatTraversalError::SuspiciousCoordination);
914        }
915
916        // 5. Target peer validation (if present)
917        if let Some(target_peer_id) = frame.target_peer_id {
918            if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
919                debug!(
920                    "PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
921                    hex::encode(&peer_id[..8]),
922                    hex::encode(&target_peer_id[..8])
923                );
924                return Err(NatTraversalError::SuspiciousCoordination);
925            }
926        }
927
928        // 6. Resource limits validation
929        if !self.validate_resource_limits(frame) {
930            debug!(
931                "PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
932                hex::encode(&peer_id[..8])
933            );
934            return Err(NatTraversalError::ResourceLimitExceeded);
935        }
936
937        debug!(
938            "PUNCH_ME_NOW frame validation passed for peer {:?}",
939            hex::encode(&peer_id[..8])
940        );
941        Ok(())
942    }
943
944    /// Validate address consistency between claimed and observed addresses
945    ///
946    /// This prevents address spoofing by ensuring the claimed local address
947    /// is reasonably consistent with the observed source address.
948    fn validate_address_consistency(
949        &self,
950        claimed_addr: SocketAddr,
951        observed_addr: SocketAddr,
952    ) -> bool {
953        // For P2P NAT traversal, the port will typically be different due to NAT,
954        // but the IP should be consistent unless there's multi-homing or proxying
955        // Check if IPs are in the same family
956        match (claimed_addr.ip(), observed_addr.ip()) {
957            (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
958                // For IPv4, allow same IP or addresses in same private range
959                if claimed_ip == observed_ip {
960                    return true;
961                }
962
963                // Allow within same private network (simplified check)
964                if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
965                    return true;
966                }
967
968                // Allow certain NAT scenarios where external IP differs
969                // This is a simplified check - production would be more sophisticated
970                !claimed_ip.is_private() && !observed_ip.is_private()
971            }
972            (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
973                // For IPv6, be more lenient due to complex addressing
974                claimed_ip == observed_ip || self.are_in_same_prefix_v6(claimed_ip, observed_ip)
975            }
976            _ => {
977                // Mismatched IP families - suspicious
978                false
979            }
980        }
981    }
982
983    /// Check if two IPv4 addresses are in the same private network
984    fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
985        // Check common private ranges
986        let ip1_octets = ip1.octets();
987        let ip2_octets = ip2.octets();
988        // 10.0.0.0/8
989        if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
990            return true;
991        }
992
993        // 172.16.0.0/12
994        if ip1_octets[0] == 172
995            && ip2_octets[0] == 172
996            && (16..=31).contains(&ip1_octets[1])
997            && (16..=31).contains(&ip2_octets[1])
998        {
999            return true;
1000        }
1001
1002        // 192.168.0.0/16
1003        if ip1_octets[0] == 192
1004            && ip1_octets[1] == 168
1005            && ip2_octets[0] == 192
1006            && ip2_octets[1] == 168
1007        {
1008            return true;
1009        }
1010
1011        false
1012    }
1013
1014    /// Check if two IPv6 addresses are in the same prefix
1015    fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1016        // Simplified IPv6 prefix check - compare first 64 bits
1017        let segments1 = ip1.segments();
1018        let segments2 = ip2.segments();
1019        segments1[0] == segments2[0]
1020            && segments1[1] == segments2[1]
1021            && segments1[2] == segments2[2]
1022            && segments1[3] == segments2[3]
1023    }
1024
1025    /// Validate coordination parameters for security
1026    fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1027        // Check round number is reasonable (not too large to prevent overflow attacks)
1028        if frame.round.into_inner() > 1000000 {
1029            return false;
1030        }
1031        // Check target sequence is reasonable
1032        if frame.paired_with_sequence_number.into_inner() > 10000 {
1033            return false;
1034        }
1035
1036        // Validate address is not obviously invalid
1037        match frame.address.ip() {
1038            IpAddr::V4(ipv4) => {
1039                // Reject obviously invalid addresses
1040                !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1041            }
1042            IpAddr::V6(ipv6) => {
1043                // Reject obviously invalid addresses
1044                !ipv6.is_unspecified() && !ipv6.is_multicast()
1045            }
1046        }
1047    }
1048
1049    /// Validate target peer request for potential abuse
1050    fn validate_target_peer_request(
1051        &self,
1052        requesting_peer: [u8; 32],
1053        target_peer: [u8; 32],
1054        _frame: &crate::frame::PunchMeNow,
1055    ) -> bool {
1056        // Prevent self-coordination (peer requesting coordination with itself)
1057        if requesting_peer == target_peer {
1058            return false;
1059        }
1060        // Additional validation could include:
1061        // - Check if target peer is known/registered
1062        // - Validate target peer hasn't opted out of coordination
1063        // - Check for suspicious patterns in target peer selection
1064
1065        true
1066    }
1067
1068    /// Validate resource limits for the coordination request
1069    fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1070        // Check current load and resource usage
1071        // This is a simplified check - production would monitor:
1072        // - Active coordination sessions
1073        // - Memory usage
1074        // - Network bandwidth
1075        // - CPU utilization
1076        // For now, just check if we have too many active coordination requests
1077        self.coordination_requests.len() < self.max_coordination_per_window as usize
1078    }
1079}
1080
1081impl AdaptiveTimeoutState {
1082    /// Create new adaptive timeout state with default values
1083    pub(crate) fn new() -> Self {
1084        let base_timeout = Duration::from_millis(1000); // 1 second base
1085        Self {
1086            current_timeout: base_timeout,
1087            min_timeout: Duration::from_millis(100),
1088            max_timeout: Duration::from_secs(30),
1089            base_timeout,
1090            backoff_multiplier: 1.0,
1091            max_backoff_multiplier: 8.0,
1092            jitter_factor: 0.1, // 10% jitter
1093            srtt: None,
1094            rttvar: None,
1095            last_rtt: None,
1096            consecutive_timeouts: 0,
1097            successful_responses: 0,
1098        }
1099    }
1100    /// Update timeout based on successful response
1101    fn update_success(&mut self, rtt: Duration) {
1102        self.last_rtt = Some(rtt);
1103        self.successful_responses += 1;
1104        self.consecutive_timeouts = 0;
1105        // Update smoothed RTT using TCP algorithm
1106        match self.srtt {
1107            None => {
1108                self.srtt = Some(rtt);
1109                self.rttvar = Some(rtt / 2);
1110            }
1111            Some(srtt) => {
1112                let rttvar = self.rttvar.unwrap_or(rtt / 2);
1113                let abs_diff = rtt.abs_diff(srtt);
1114
1115                self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1116                self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1117            }
1118        }
1119
1120        // Reduce backoff multiplier on success
1121        self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1122
1123        // Update current timeout
1124        self.calculate_current_timeout();
1125    }
1126
1127    /// Update timeout based on timeout event
1128    fn update_timeout(&mut self) {
1129        self.consecutive_timeouts += 1;
1130        // Exponential backoff with bounds
1131        self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1132
1133        // Update current timeout
1134        self.calculate_current_timeout();
1135    }
1136
1137    /// Calculate current timeout based on conditions
1138    fn calculate_current_timeout(&mut self) {
1139        let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1140            // Use TCP-style RTO calculation: RTO = SRTT + 4 * RTTVAR
1141            srtt + rttvar * 4
1142        } else {
1143            self.base_timeout
1144        };
1145        // Apply backoff multiplier
1146        let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1147
1148        // Apply jitter to prevent thundering herd
1149        let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1150        let timeout = timeout.mul_f64(jitter);
1151
1152        // Bound the timeout
1153        self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1154    }
1155
1156    /// Get current timeout value
1157    fn get_timeout(&self) -> Duration {
1158        self.current_timeout
1159    }
1160    /// Check if retry should be attempted
1161    fn should_retry(&self, max_retries: u32) -> bool {
1162        self.consecutive_timeouts < max_retries
1163    }
1164    /// Get retry delay with exponential backoff
1165    fn get_retry_delay(&self) -> Duration {
1166        let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1167        delay.clamp(self.min_timeout, self.max_timeout)
1168    }
1169}
/// Resource management limits and cleanup configuration
///
/// Consumed by `ResourceCleanupCoordinator`:
/// - the four collection `max_*` limits feed `check_resource_limits` and
///   `calculate_memory_pressure`;
/// - the timeouts decide what the cleanup passes treat as expired.
#[derive(Debug)]
pub(super) struct ResourceManagementConfig {
    /// Maximum number of active validations
    max_active_validations: usize,
    /// Maximum number of local candidates
    max_local_candidates: usize,
    /// Maximum number of remote candidates
    max_remote_candidates: usize,
    /// Maximum number of candidate pairs
    max_candidate_pairs: usize,
    /// Maximum coordination rounds to keep in history
    // NOTE(review): not read by `ResourceCleanupCoordinator`; verify it is used elsewhere.
    max_coordination_history: usize,
    /// Minimum interval between regular cleanup passes (checked by `should_cleanup`)
    cleanup_interval: Duration,
    /// Age (since `discovered_at` / `created_at`) after which candidates and candidate
    /// pairs are considered stale; aggressive cleanup uses half of it
    candidate_timeout: Duration,
    /// Age (since `sent_at`) after which a path validation is considered expired;
    /// aggressive cleanup uses half of it
    validation_timeout: Duration,
    /// Age (since `round_start`) after which the current coordination round is discarded
    coordination_timeout: Duration,
    /// `should_cleanup` requests a cleanup when the memory-pressure ratio computed by
    /// `calculate_memory_pressure` exceeds this value
    memory_pressure_threshold: f64,
    /// Aggressive cleanup mode threshold
    // NOTE(review): not read by `ResourceCleanupCoordinator`; presumably the caller compares it
    // against memory pressure before invoking `aggressive_cleanup` — verify.
    aggressive_cleanup_threshold: f64,
}
/// Resource usage statistics and monitoring
///
/// All "usage" figures are counts of collection entries, not bytes.
#[derive(Debug, Default)]
pub(super) struct ResourceStats {
    /// Current number of active validations (as of the last `update_stats` call)
    active_validations: usize,
    /// Current number of local candidates (as of the last `update_stats` call)
    local_candidates: usize,
    /// Current number of remote candidates (as of the last `update_stats` call)
    remote_candidates: usize,
    /// Current number of candidate pairs (as of the last `update_stats` call)
    candidate_pairs: usize,
    /// Highest sum of the four counts above seen by `update_stats` (an entry count, not bytes)
    peak_memory_usage: usize,
    /// Number of cleanup operations performed
    cleanup_operations: u64,
    /// Number of resources cleaned up by `cleanup_expired_resources`
    resources_cleaned: u64,
    /// Number of resource allocation failures
    // NOTE(review): not incremented by `ResourceCleanupCoordinator`; verify it is updated elsewhere.
    allocation_failures: u64,
    /// Last cleanup time
    // NOTE(review): the coordinator tracks its own `last_cleanup`; this copy is not written
    // by `ResourceCleanupCoordinator`.
    last_cleanup: Option<Instant>,
    /// Last value computed by `calculate_memory_pressure`: total tracked entries divided by
    /// the sum of the `max_*` limits. 0.0 means empty and 1.0 means exactly at the limits.
    /// Values above 1.0 are possible.
    memory_pressure: f64,
}
/// Resource cleanup coordinator
///
/// Holds the limits (`config`), usage counters (`stats`) and scheduling state used to
/// decide when and how aggressively NAT traversal bookkeeping is pruned. The collections
/// being pruned are not owned here; the cleanup methods receive them by `&mut` reference.
#[derive(Debug)]
pub(super) struct ResourceCleanupCoordinator {
    /// Configuration for resource limits
    config: ResourceManagementConfig,
    /// Resource usage statistics
    stats: ResourceStats,
    /// Time of the last cleanup pass; `None` until the first one, which `should_cleanup`
    /// always requests
    last_cleanup: Option<Instant>,
    /// Number of passes run via `cleanup_expired_resources` or `perform_cleanup`
    cleanup_counter: u64,
    /// Set by `request_shutdown`; once set, `should_cleanup` always returns `true`
    shutdown_requested: bool,
}
1234impl ResourceManagementConfig {
1235    /// Create new resource management configuration with production-ready defaults
1236    fn new() -> Self {
1237        Self {
1238            max_active_validations: 100,
1239            max_local_candidates: 50,
1240            max_remote_candidates: 100,
1241            max_candidate_pairs: 200,
1242            max_coordination_history: 10,
1243            cleanup_interval: Duration::from_secs(30),
1244            candidate_timeout: Duration::from_secs(300), // 5 minutes
1245            validation_timeout: Duration::from_secs(30),
1246            coordination_timeout: Duration::from_secs(60),
1247            memory_pressure_threshold: 0.75,
1248            aggressive_cleanup_threshold: 0.90,
1249        }
1250    }
1251    /// Create configuration optimized for low-memory environments
1252    #[cfg(feature = "low_memory")]
1253    fn low_memory() -> Self {
1254        Self {
1255            max_active_validations: 25,
1256            max_local_candidates: 10,
1257            max_remote_candidates: 25,
1258            max_candidate_pairs: 50,
1259            max_coordination_history: 3,
1260            cleanup_interval: Duration::from_secs(15),
1261            candidate_timeout: Duration::from_secs(180), // 3 minutes
1262            validation_timeout: Duration::from_secs(20),
1263            coordination_timeout: Duration::from_secs(30),
1264            memory_pressure_threshold: 0.60,
1265            aggressive_cleanup_threshold: 0.80,
1266        }
1267    }
1268}
1269impl ResourceCleanupCoordinator {
1270    /// Create new resource cleanup coordinator
1271    fn new() -> Self {
1272        Self {
1273            config: ResourceManagementConfig::new(),
1274            stats: ResourceStats::default(),
1275            last_cleanup: None,
1276            cleanup_counter: 0,
1277            shutdown_requested: false,
1278        }
1279    }
1280    /// Create coordinator optimized for low-memory environments
1281    #[cfg(feature = "low_memory")]
1282    fn low_memory() -> Self {
1283        Self {
1284            config: ResourceManagementConfig::low_memory(),
1285            stats: ResourceStats::default(),
1286            last_cleanup: None,
1287            cleanup_counter: 0,
1288            shutdown_requested: false,
1289        }
1290    }
1291    /// Check if resource limits are exceeded
1292    fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1293        state.active_validations.len() > self.config.max_active_validations
1294            || state.local_candidates.len() > self.config.max_local_candidates
1295            || state.remote_candidates.len() > self.config.max_remote_candidates
1296            || state.candidate_pairs.len() > self.config.max_candidate_pairs
1297    }
1298    /// Calculate current memory pressure level
1299    fn calculate_memory_pressure(
1300        &mut self,
1301        active_validations_len: usize,
1302        local_candidates_len: usize,
1303        remote_candidates_len: usize,
1304        candidate_pairs_len: usize,
1305    ) -> f64 {
1306        let total_limit = self.config.max_active_validations
1307            + self.config.max_local_candidates
1308            + self.config.max_remote_candidates
1309            + self.config.max_candidate_pairs;
1310        let current_usage = active_validations_len
1311            + local_candidates_len
1312            + remote_candidates_len
1313            + candidate_pairs_len;
1314
1315        let pressure = current_usage as f64 / total_limit as f64;
1316        self.stats.memory_pressure = pressure;
1317        pressure
1318    }
1319
1320    /// Determine if cleanup is needed
1321    fn should_cleanup(&self, now: Instant) -> bool {
1322        if self.shutdown_requested {
1323            return true;
1324        }
1325        // Check if it's time for regular cleanup
1326        if let Some(last_cleanup) = self.last_cleanup {
1327            if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1328                return true;
1329            }
1330        } else {
1331            return true; // First cleanup
1332        }
1333
1334        // Check memory pressure
1335        if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1336            return true;
1337        }
1338
1339        false
1340    }
1341
1342    /// Perform cleanup of expired resources
1343    fn cleanup_expired_resources(
1344        &mut self,
1345        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1346        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1347        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1348        candidate_pairs: &mut Vec<CandidatePair>,
1349        coordination: &mut Option<CoordinationState>,
1350        now: Instant,
1351    ) -> u64 {
1352        let mut cleaned = 0;
1353        // Clean up expired path validations
1354        cleaned += self.cleanup_expired_validations(active_validations, now);
1355
1356        // Clean up stale candidates
1357        cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1358
1359        // Clean up failed candidate pairs
1360        cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1361
1362        // Clean up old coordination state
1363        cleaned += self.cleanup_old_coordination(coordination, now);
1364
1365        // Update statistics
1366        self.stats.cleanup_operations += 1;
1367        self.stats.resources_cleaned += cleaned;
1368        self.last_cleanup = Some(now);
1369        self.cleanup_counter += 1;
1370
1371        debug!("Cleaned up {} expired resources", cleaned);
1372        cleaned
1373    }
1374
1375    /// Clean up expired path validations
1376    fn cleanup_expired_validations(
1377        &mut self,
1378        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1379        now: Instant,
1380    ) -> u64 {
1381        let mut cleaned = 0;
1382        let validation_timeout = self.config.validation_timeout;
1383        active_validations.retain(|_addr, validation| {
1384            let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1385            if is_expired {
1386                cleaned += 1;
1387                trace!("Cleaned up expired validation for {:?}", _addr);
1388            }
1389            !is_expired
1390        });
1391
1392        cleaned
1393    }
1394
1395    /// Clean up stale candidates
1396    fn cleanup_stale_candidates(
1397        &mut self,
1398        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1399        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1400        now: Instant,
1401    ) -> u64 {
1402        let mut cleaned = 0;
1403        let candidate_timeout = self.config.candidate_timeout;
1404        // Clean up local candidates
1405        local_candidates.retain(|_seq, candidate| {
1406            let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1407                || candidate.state == CandidateState::Failed
1408                || candidate.state == CandidateState::Removed;
1409            if is_stale {
1410                cleaned += 1;
1411                trace!("Cleaned up stale local candidate {:?}", candidate.address);
1412            }
1413            !is_stale
1414        });
1415
1416        // Clean up remote candidates
1417        remote_candidates.retain(|_seq, candidate| {
1418            let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1419                || candidate.state == CandidateState::Failed
1420                || candidate.state == CandidateState::Removed;
1421            if is_stale {
1422                cleaned += 1;
1423                trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1424            }
1425            !is_stale
1426        });
1427
1428        cleaned
1429    }
1430
1431    /// Clean up failed candidate pairs
1432    fn cleanup_failed_pairs(
1433        &mut self,
1434        candidate_pairs: &mut Vec<CandidatePair>,
1435        now: Instant,
1436    ) -> u64 {
1437        let mut cleaned = 0;
1438        let pair_timeout = self.config.candidate_timeout;
1439        candidate_pairs.retain(|pair| {
1440            let is_stale = now.duration_since(pair.created_at) > pair_timeout
1441                || pair.state == PairState::Failed;
1442            if is_stale {
1443                cleaned += 1;
1444                trace!(
1445                    "Cleaned up failed candidate pair {:?} -> {:?}",
1446                    pair.local_addr, pair.remote_addr
1447                );
1448            }
1449            !is_stale
1450        });
1451
1452        cleaned
1453    }
1454
1455    /// Clean up old coordination state
1456    fn cleanup_old_coordination(
1457        &mut self,
1458        coordination: &mut Option<CoordinationState>,
1459        now: Instant,
1460    ) -> u64 {
1461        let mut cleaned = 0;
1462        if let Some(coord) = coordination {
1463            let is_expired =
1464                now.duration_since(coord.round_start) > self.config.coordination_timeout;
1465            let is_failed = coord.state == CoordinationPhase::Failed;
1466
1467            if is_expired || is_failed {
1468                let round = coord.round;
1469                *coordination = None;
1470                cleaned += 1;
1471                trace!("Cleaned up old coordination state for round {}", round);
1472            }
1473        }
1474
1475        cleaned
1476    }
1477
1478    /// Perform aggressive cleanup when under memory pressure
1479    fn aggressive_cleanup(
1480        &mut self,
1481        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1482        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1483        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1484        candidate_pairs: &mut Vec<CandidatePair>,
1485        now: Instant,
1486    ) -> u64 {
1487        let mut cleaned = 0;
1488        // More aggressive timeout for candidates
1489        let aggressive_timeout = self.config.candidate_timeout / 2;
1490
1491        // Clean up older candidates first
1492        local_candidates.retain(|_seq, candidate| {
1493            let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1494                && candidate.state != CandidateState::Failed;
1495            if !keep {
1496                cleaned += 1;
1497            }
1498            keep
1499        });
1500
1501        remote_candidates.retain(|_seq, candidate| {
1502            let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1503                && candidate.state != CandidateState::Failed;
1504            if !keep {
1505                cleaned += 1;
1506            }
1507            keep
1508        });
1509
1510        // Clean up waiting candidate pairs
1511        candidate_pairs.retain(|pair| {
1512            let keep = pair.state != PairState::Waiting
1513                || now.duration_since(pair.created_at) <= aggressive_timeout;
1514            if !keep {
1515                cleaned += 1;
1516            }
1517            keep
1518        });
1519
1520        // Clean up old validations more aggressively
1521        active_validations.retain(|_addr, validation| {
1522            let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1523            if !keep {
1524                cleaned += 1;
1525            }
1526            keep
1527        });
1528
1529        warn!(
1530            "Aggressive cleanup removed {} resources due to memory pressure",
1531            cleaned
1532        );
1533        cleaned
1534    }
1535
1536    /// Request graceful shutdown and cleanup
1537    fn request_shutdown(&mut self) {
1538        self.shutdown_requested = true;
1539        debug!("Resource cleanup coordinator shutdown requested");
1540    }
1541    /// Perform final cleanup during shutdown
1542    fn shutdown_cleanup(
1543        &mut self,
1544        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1545        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1546        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1547        candidate_pairs: &mut Vec<CandidatePair>,
1548        coordination: &mut Option<CoordinationState>,
1549    ) -> u64 {
1550        let mut cleaned = 0;
1551        // Clear all resources
1552        cleaned += active_validations.len() as u64;
1553        active_validations.clear();
1554
1555        cleaned += local_candidates.len() as u64;
1556        local_candidates.clear();
1557
1558        cleaned += remote_candidates.len() as u64;
1559        remote_candidates.clear();
1560
1561        cleaned += candidate_pairs.len() as u64;
1562        candidate_pairs.clear();
1563
1564        if coordination.is_some() {
1565            *coordination = None;
1566            cleaned += 1;
1567        }
1568
1569        info!("Shutdown cleanup removed {} resources", cleaned);
1570        cleaned
1571    }
1572
1573    /// Get current resource usage statistics
1574    fn get_resource_stats(&self) -> &ResourceStats {
1575        &self.stats
1576    }
1577    /// Update resource usage statistics
1578    fn update_stats(
1579        &mut self,
1580        active_validations_len: usize,
1581        local_candidates_len: usize,
1582        remote_candidates_len: usize,
1583        candidate_pairs_len: usize,
1584    ) {
1585        self.stats.active_validations = active_validations_len;
1586        self.stats.local_candidates = local_candidates_len;
1587        self.stats.remote_candidates = remote_candidates_len;
1588        self.stats.candidate_pairs = candidate_pairs_len;
1589        // Update peak memory usage
1590        let current_usage = self.stats.active_validations
1591            + self.stats.local_candidates
1592            + self.stats.remote_candidates
1593            + self.stats.candidate_pairs;
1594
1595        if current_usage > self.stats.peak_memory_usage {
1596            self.stats.peak_memory_usage = current_usage;
1597        }
1598    }
1599
1600    /// Perform resource cleanup based on current state
1601    pub(super) fn perform_cleanup(&mut self, now: Instant) {
1602        self.last_cleanup = Some(now);
1603        self.cleanup_counter += 1;
1604        // Update cleanup statistics
1605        self.stats.cleanup_operations += 1;
1606
1607        debug!("Performed resource cleanup #{}", self.cleanup_counter);
1608    }
1609}
1610
1611impl NetworkConditionMonitor {
1612    /// Create new network condition monitor
1613    fn new() -> Self {
1614        Self {
1615            rtt_samples: VecDeque::new(),
1616            max_samples: 20,
1617            packet_loss_rate: 0.0,
1618            congestion_window: 10,
1619            quality_score: 0.8, // Start with good quality assumption
1620            last_quality_update: Instant::now(),
1621            quality_update_interval: Duration::from_secs(10),
1622            timeout_stats: TimeoutStatistics::default(),
1623        }
1624    }
1625    /// Record a successful response time
1626    fn record_success(&mut self, rtt: Duration, now: Instant) {
1627        // Add RTT sample
1628        self.rtt_samples.push_back(rtt);
1629        if self.rtt_samples.len() > self.max_samples {
1630            self.rtt_samples.pop_front();
1631        }
1632        // Update timeout statistics
1633        self.timeout_stats.total_responses += 1;
1634        self.update_timeout_stats(now);
1635
1636        // Update quality score
1637        self.update_quality_score(now);
1638    }
1639
1640    /// Record a timeout event
1641    fn record_timeout(&mut self, now: Instant) {
1642        self.timeout_stats.total_timeouts += 1;
1643        self.update_timeout_stats(now);
1644        // Update quality score
1645        self.update_quality_score(now);
1646    }
1647
1648    /// Update timeout statistics
1649    fn update_timeout_stats(&mut self, now: Instant) {
1650        let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1651        if total_attempts > 0 {
1652            self.timeout_stats.timeout_rate =
1653                self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1654        }
1655
1656        // Calculate average response time
1657        if !self.rtt_samples.is_empty() {
1658            let total_rtt: Duration = self.rtt_samples.iter().sum();
1659            self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1660        }
1661
1662        self.timeout_stats.last_update = Some(now);
1663    }
1664
1665    /// Update network quality score
1666    fn update_quality_score(&mut self, now: Instant) {
1667        if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1668            return;
1669        }
1670        // Quality factors
1671        let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1672        let rtt_factor = self.calculate_rtt_factor();
1673        let consistency_factor = self.calculate_consistency_factor();
1674
1675        // Weighted quality score
1676        let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1677
1678        // Smooth the quality score
1679        self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1680        self.last_quality_update = now;
1681    }
1682
1683    /// Calculate RTT factor for quality score
1684    fn calculate_rtt_factor(&self) -> f64 {
1685        if self.rtt_samples.is_empty() {
1686            return 0.5; // Neutral score
1687        }
1688        let avg_rtt = self.timeout_stats.avg_response_time;
1689
1690        // Good RTT: < 50ms = 1.0, Poor RTT: > 1000ms = 0.0
1691        let rtt_ms = avg_rtt.as_millis() as f64;
1692        let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1693        factor.clamp(0.0, 1.0)
1694    }
1695
1696    /// Calculate consistency factor for quality score
1697    fn calculate_consistency_factor(&self) -> f64 {
1698        if self.rtt_samples.len() < 3 {
1699            return 0.5; // Neutral score
1700        }
1701        // Calculate RTT variance
1702        let mean_rtt = self.timeout_stats.avg_response_time;
1703        let variance: f64 = self
1704            .rtt_samples
1705            .iter()
1706            .map(|rtt| {
1707                let diff = (*rtt).abs_diff(mean_rtt);
1708                diff.as_millis() as f64
1709            })
1710            .map(|diff| diff * diff)
1711            .sum::<f64>()
1712            / self.rtt_samples.len() as f64;
1713
1714        let std_dev = variance.sqrt();
1715
1716        // Low variance = high consistency
1717        let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1718        consistency.clamp(0.0, 1.0)
1719    }
1720
1721    /// Get current network quality score
1722    fn get_quality_score(&self) -> f64 {
1723        self.quality_score
1724    }
1725    /// Get estimated RTT based on recent samples
1726    fn get_estimated_rtt(&self) -> Option<Duration> {
1727        if self.rtt_samples.is_empty() {
1728            return None;
1729        }
1730        Some(self.timeout_stats.avg_response_time)
1731    }
1732
1733    /// Check if network conditions are suitable for coordination
1734    fn is_suitable_for_coordination(&self) -> bool {
1735        // Require reasonable quality for coordination attempts
1736        self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1737    }
1738    /// Get estimated packet loss rate
1739    fn get_packet_loss_rate(&self) -> f64 {
1740        self.packet_loss_rate
1741    }
1742
1743    /// Get recommended timeout multiplier based on conditions
1744    fn get_timeout_multiplier(&self) -> f64 {
1745        let base_multiplier = 1.0;
1746
1747        // Adjust based on quality score
1748        let quality_multiplier = if self.quality_score < 0.3 {
1749            2.0 // Poor quality, increase timeouts
1750        } else if self.quality_score > 0.8 {
1751            0.8 // Good quality, reduce timeouts
1752        } else {
1753            1.0 // Neutral
1754        };
1755
1756        // Adjust based on packet loss
1757        let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1758
1759        base_multiplier * quality_multiplier * loss_multiplier
1760    }
1761
1762    /// Clean up old samples and statistics
1763    fn cleanup(&mut self, now: Instant) {
1764        // Remove old RTT samples (keep only recent ones)
1765        let _cutoff_time = now - Duration::from_secs(60);
1766
1767        // Reset statistics if they're too old
1768        if let Some(last_update) = self.timeout_stats.last_update {
1769            if now.duration_since(last_update) > Duration::from_secs(300) {
1770                self.timeout_stats = TimeoutStatistics::default();
1771            }
1772        }
1773    }
1774}
1775
1776impl NatTraversalState {
1777    /// Create new NAT traversal state with given role and configuration
1778    pub(super) fn new(
1779        role: NatTraversalRole,
1780        max_candidates: u32,
1781        coordination_timeout: Duration,
1782    ) -> Self {
1783        let bootstrap_coordinator = if matches!(role, NatTraversalRole::Bootstrap) {
1784            Some(BootstrapCoordinator::new(BootstrapConfig::default()))
1785        } else {
1786            None
1787        };
1788        Self {
1789            role,
1790            local_candidates: HashMap::new(),
1791            remote_candidates: HashMap::new(),
1792            candidate_pairs: Vec::new(),
1793            pair_index: HashMap::new(),
1794            active_validations: HashMap::new(),
1795            coordination: None,
1796            next_sequence: VarInt::from_u32(1),
1797            max_candidates,
1798            coordination_timeout,
1799            stats: NatTraversalStats::default(),
1800            security_state: SecurityValidationState::new(),
1801            network_monitor: NetworkConditionMonitor::new(),
1802            resource_manager: ResourceCleanupCoordinator::new(),
1803            bootstrap_coordinator,
1804        }
1805    }
1806
1807    /// Add a remote candidate from AddAddress frame with security validation
1808    pub(super) fn add_remote_candidate(
1809        &mut self,
1810        sequence: VarInt,
1811        address: SocketAddr,
1812        priority: VarInt,
1813        now: Instant,
1814    ) -> Result<(), NatTraversalError> {
1815        // Resource management: Check if we should reject new resources
1816        if self.should_reject_new_resources(now) {
1817            debug!(
1818                "Rejecting new candidate due to resource limits: {}",
1819                address
1820            );
1821            return Err(NatTraversalError::ResourceLimitExceeded);
1822        }
1823        // Security validation: Check rate limiting
1824        if self.security_state.is_candidate_rate_limited(now) {
1825            self.stats.rate_limit_violations += 1;
1826            debug!("Rate limit exceeded for candidate addition: {}", address);
1827            return Err(NatTraversalError::RateLimitExceeded);
1828        }
1829
1830        // Security validation: Validate address format and safety
1831        match self.security_state.validate_address(address, now) {
1832            AddressValidationResult::Invalid => {
1833                self.stats.invalid_address_rejections += 1;
1834                self.stats.security_rejections += 1;
1835                debug!("Invalid address rejected: {}", address);
1836                return Err(NatTraversalError::InvalidAddress);
1837            }
1838            AddressValidationResult::Suspicious => {
1839                self.stats.security_rejections += 1;
1840                debug!("Suspicious address rejected: {}", address);
1841                return Err(NatTraversalError::SecurityValidationFailed);
1842            }
1843            AddressValidationResult::Valid => {
1844                // Continue with normal processing
1845            }
1846        }
1847
1848        // Check candidate count limit
1849        if self.remote_candidates.len() >= self.max_candidates as usize {
1850            return Err(NatTraversalError::TooManyCandidates);
1851        }
1852
1853        // Check for duplicate addresses (different sequence, same address)
1854        if self
1855            .remote_candidates
1856            .values()
1857            .any(|c| c.address == address && c.state != CandidateState::Removed)
1858        {
1859            return Err(NatTraversalError::DuplicateAddress);
1860        }
1861
1862        let candidate = AddressCandidate {
1863            address,
1864            priority: priority.into_inner() as u32,
1865            source: CandidateSource::Peer,
1866            discovered_at: now,
1867            state: CandidateState::New,
1868            attempt_count: 0,
1869            last_attempt: None,
1870        };
1871
1872        self.remote_candidates.insert(sequence, candidate);
1873        self.stats.remote_candidates_received += 1;
1874
1875        trace!(
1876            "Added remote candidate: {} with priority {}",
1877            address, priority
1878        );
1879        Ok(())
1880    }
1881
1882    /// Remove a candidate by sequence number
1883    pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
1884        if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
1885            candidate.state = CandidateState::Removed;
1886            // Cancel any active validation for this address
1887            self.active_validations.remove(&candidate.address);
1888            true
1889        } else {
1890            false
1891        }
1892    }
1893
1894    /// Add a local candidate that we've discovered
1895    pub(super) fn add_local_candidate(
1896        &mut self,
1897        address: SocketAddr,
1898        source: CandidateSource,
1899        now: Instant,
1900    ) -> VarInt {
1901        let sequence = self.next_sequence;
1902        self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
1903            .expect("sequence number overflow");
1904        // Calculate priority for this candidate
1905        let candidate_type = classify_candidate_type(source);
1906        let local_preference = self.calculate_local_preference(address);
1907        let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
1908
1909        let candidate = AddressCandidate {
1910            address,
1911            priority,
1912            source,
1913            discovered_at: now,
1914            state: CandidateState::New,
1915            attempt_count: 0,
1916            last_attempt: None,
1917        };
1918
1919        self.local_candidates.insert(sequence, candidate);
1920        self.stats.local_candidates_sent += 1;
1921
1922        // Regenerate pairs when we add a new local candidate
1923        self.generate_candidate_pairs(now);
1924
1925        sequence
1926    }
1927
1928    /// Calculate local preference for address prioritization
1929    fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
1930        match addr {
1931            SocketAddr::V4(v4) => {
1932                if v4.ip().is_loopback() {
1933                    0 // Lowest priority
1934                } else if v4.ip().is_private() {
1935                    65000 // High priority for local network
1936                } else {
1937                    32000 // Medium priority for public addresses
1938                }
1939            }
1940            SocketAddr::V6(v6) => {
1941                if v6.ip().is_loopback() {
1942                    0
1943                } else if v6.ip().segments()[0] == 0xfe80 {
1944                    // Link-local IPv6 check
1945                    30000 // Link-local gets medium-low priority
1946                } else {
1947                    50000 // IPv6 generally gets good priority
1948                }
1949            }
1950        }
1951    }
1952    /// Generate all possible candidate pairs from local and remote candidates
1953    pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
1954        self.candidate_pairs.clear();
1955        self.pair_index.clear();
1956        // Pre-allocate capacity to avoid reallocations
1957        let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
1958        self.candidate_pairs.reserve(estimated_capacity);
1959        self.pair_index.reserve(estimated_capacity);
1960
1961        // Cache compatibility checks to avoid repeated work
1962        let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
1963
1964        for local_candidate in self.local_candidates.values() {
1965            // Skip removed candidates early
1966            if local_candidate.state == CandidateState::Removed {
1967                continue;
1968            }
1969
1970            // Pre-classify local candidate type once
1971            let local_type = classify_candidate_type(local_candidate.source);
1972
1973            for (remote_seq, remote_candidate) in &self.remote_candidates {
1974                // Skip removed candidates
1975                if remote_candidate.state == CandidateState::Removed {
1976                    continue;
1977                }
1978
1979                // Check compatibility with caching
1980                let cache_key = (local_candidate.address, remote_candidate.address);
1981                let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
1982                    are_candidates_compatible(local_candidate, remote_candidate)
1983                });
1984
1985                if !compatible {
1986                    continue;
1987                }
1988
1989                // Calculate combined priority
1990                let pair_priority =
1991                    calculate_pair_priority(local_candidate.priority, remote_candidate.priority);
1992
1993                // Classify pair type (local already classified)
1994                let remote_type = classify_candidate_type(remote_candidate.source);
1995                let pair_type = classify_pair_type(local_type, remote_type);
1996
1997                let pair = CandidatePair {
1998                    remote_sequence: *remote_seq,
1999                    local_addr: local_candidate.address,
2000                    remote_addr: remote_candidate.address,
2001                    priority: pair_priority,
2002                    state: PairState::Waiting,
2003                    pair_type,
2004                    created_at: now,
2005                    last_check: None,
2006                };
2007
2008                // Store index for O(1) lookup
2009                let index = self.candidate_pairs.len();
2010                self.pair_index.insert(remote_candidate.address, index);
2011                self.candidate_pairs.push(pair);
2012            }
2013        }
2014
2015        // Sort pairs by priority (highest first) - use unstable sort for better performance
2016        self.candidate_pairs
2017            .sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2018
2019        // Rebuild index after sorting since indices changed
2020        self.pair_index.clear();
2021        for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2022            self.pair_index.insert(pair.remote_addr, idx);
2023        }
2024
2025        trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2026    }
2027
2028    /// Get the highest priority pairs ready for validation
2029    pub(super) fn get_next_validation_pairs(
2030        &mut self,
2031        max_concurrent: usize,
2032    ) -> Vec<&mut CandidatePair> {
2033        // Since pairs are sorted by priority (highest first), we can stop early
2034        // once we find enough waiting pairs or reach lower priority pairs
2035        let mut result = Vec::with_capacity(max_concurrent);
2036        for pair in self.candidate_pairs.iter_mut() {
2037            if pair.state == PairState::Waiting {
2038                result.push(pair);
2039                if result.len() >= max_concurrent {
2040                    break;
2041                }
2042            }
2043        }
2044
2045        result
2046    }
2047
2048    /// Find a candidate pair by remote address
2049    pub(super) fn find_pair_by_remote_addr(
2050        &mut self,
2051        addr: SocketAddr,
2052    ) -> Option<&mut CandidatePair> {
2053        // Use index for O(1) lookup instead of O(n) linear search
2054        if let Some(&index) = self.pair_index.get(&addr) {
2055            self.candidate_pairs.get_mut(index)
2056        } else {
2057            None
2058        }
2059    }
2060    /// Mark a pair as succeeded and handle promotion
2061    pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2062        // Find the pair and get its type and priority
2063        let (succeeded_type, succeeded_priority) = {
2064            if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2065                pair.state = PairState::Succeeded;
2066                (pair.pair_type, pair.priority)
2067            } else {
2068                return false;
2069            }
2070        };
2071        // Freeze lower priority pairs of the same type to avoid unnecessary testing
2072        for other_pair in &mut self.candidate_pairs {
2073            if other_pair.pair_type == succeeded_type
2074                && other_pair.priority < succeeded_priority
2075                && other_pair.state == PairState::Waiting
2076            {
2077                other_pair.state = PairState::Frozen;
2078            }
2079        }
2080
2081        true
2082    }
2083
2084    /// Get the best succeeded pair for each address family
2085    pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2086        let mut best_ipv4: Option<&CandidatePair> = None;
2087        let mut best_ipv6: Option<&CandidatePair> = None;
2088        for pair in &self.candidate_pairs {
2089            if pair.state != PairState::Succeeded {
2090                continue;
2091            }
2092
2093            match pair.remote_addr {
2094                SocketAddr::V4(_) => {
2095                    if best_ipv4.is_none_or(|best| pair.priority > best.priority) {
2096                        best_ipv4 = Some(pair);
2097                    }
2098                }
2099                SocketAddr::V6(_) => {
2100                    if best_ipv6.is_none_or(|best| pair.priority > best.priority) {
2101                        best_ipv6 = Some(pair);
2102                    }
2103                }
2104            }
2105        }
2106
2107        let mut result = Vec::new();
2108        if let Some(pair) = best_ipv4 {
2109            result.push(pair);
2110        }
2111        if let Some(pair) = best_ipv6 {
2112            result.push(pair);
2113        }
2114        result
2115    }
2116
2117    /// Get candidates ready for validation, sorted by priority
2118    pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2119        let mut candidates: Vec<_> = self
2120            .remote_candidates
2121            .iter()
2122            .filter(|(_, c)| c.state == CandidateState::New)
2123            .map(|(k, v)| (*k, v))
2124            .collect();
2125        // Sort by priority (higher priority first)
2126        candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2127        candidates
2128    }
2129
2130    /// Start validation for a candidate address with security checks
2131    pub(super) fn start_validation(
2132        &mut self,
2133        sequence: VarInt,
2134        challenge: u64,
2135        now: Instant,
2136    ) -> Result<(), NatTraversalError> {
2137        let candidate = self
2138            .remote_candidates
2139            .get_mut(&sequence)
2140            .ok_or(NatTraversalError::UnknownCandidate)?;
2141        if candidate.state != CandidateState::New {
2142            return Err(NatTraversalError::InvalidCandidateState);
2143        }
2144
2145        // Security validation: Check for validation abuse patterns
2146        if Self::is_validation_suspicious(candidate, now) {
2147            self.stats.security_rejections += 1;
2148            debug!(
2149                "Suspicious validation attempt rejected for address {}",
2150                candidate.address
2151            );
2152            return Err(NatTraversalError::SecurityValidationFailed);
2153        }
2154
2155        // Security validation: Limit concurrent validations
2156        if self.active_validations.len() >= 10 {
2157            debug!(
2158                "Too many concurrent validations, rejecting new validation for {}",
2159                candidate.address
2160            );
2161            return Err(NatTraversalError::SecurityValidationFailed);
2162        }
2163
2164        // Update candidate state
2165        candidate.state = CandidateState::Validating;
2166        candidate.attempt_count += 1;
2167        candidate.last_attempt = Some(now);
2168
2169        // Track validation state
2170        let validation = PathValidationState {
2171            challenge,
2172            sent_at: now,
2173            retry_count: 0,
2174            max_retries: 3, // TODO: Make configurable
2175            coordination_round: self.coordination.as_ref().map(|c| c.round),
2176            timeout_state: AdaptiveTimeoutState::new(),
2177            last_retry_at: None,
2178        };
2179
2180        self.active_validations
2181            .insert(candidate.address, validation);
2182        trace!(
2183            "Started validation for candidate {} with challenge {}",
2184            candidate.address, challenge
2185        );
2186        Ok(())
2187    }
2188
2189    /// Check if a validation request shows suspicious patterns
2190    fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2191        // Check for excessive retry attempts
2192        if candidate.attempt_count > 10 {
2193            return true;
2194        }
2195        // Check for rapid retry patterns
2196        if let Some(last_attempt) = candidate.last_attempt {
2197            let time_since_last = now.duration_since(last_attempt);
2198            if time_since_last < Duration::from_millis(100) {
2199                return true; // Too frequent attempts
2200            }
2201        }
2202
2203        // Check if this candidate was recently failed
2204        if candidate.state == CandidateState::Failed {
2205            let time_since_discovery = now.duration_since(candidate.discovered_at);
2206            if time_since_discovery < Duration::from_secs(60) {
2207                return true; // Recently failed, shouldn't retry so soon
2208            }
2209        }
2210
2211        false
2212    }
2213
2214    /// Handle successful validation response
2215    pub(super) fn handle_validation_success(
2216        &mut self,
2217        remote_addr: SocketAddr,
2218        challenge: u64,
2219        now: Instant,
2220    ) -> Result<VarInt, NatTraversalError> {
2221        // Find the candidate with this address
2222        let sequence = self
2223            .remote_candidates
2224            .iter()
2225            .find(|(_, c)| c.address == remote_addr)
2226            .map(|(seq, _)| *seq)
2227            .ok_or(NatTraversalError::UnknownCandidate)?;
2228        // Verify challenge matches and update timeout state
2229        let validation = self
2230            .active_validations
2231            .get_mut(&remote_addr)
2232            .ok_or(NatTraversalError::NoActiveValidation)?;
2233
2234        if validation.challenge != challenge {
2235            return Err(NatTraversalError::ChallengeMismatch);
2236        }
2237
2238        // Calculate RTT and update adaptive timeout
2239        let rtt = now.duration_since(validation.sent_at);
2240        validation.timeout_state.update_success(rtt);
2241
2242        // Update network monitor
2243        self.network_monitor.record_success(rtt, now);
2244
2245        // Update candidate state
2246        let candidate = self
2247            .remote_candidates
2248            .get_mut(&sequence)
2249            .ok_or(NatTraversalError::UnknownCandidate)?;
2250
2251        candidate.state = CandidateState::Valid;
2252        self.active_validations.remove(&remote_addr);
2253        self.stats.validations_succeeded += 1;
2254
2255        trace!(
2256            "Validation successful for {} with RTT {:?}",
2257            remote_addr, rtt
2258        );
2259        Ok(sequence)
2260    }
2261
2262    /// Start a new coordination round for simultaneous hole punching with security validation
2263    pub(super) fn start_coordination_round(
2264        &mut self,
2265        targets: Vec<PunchTarget>,
2266        now: Instant,
2267    ) -> Result<VarInt, NatTraversalError> {
2268        // Security validation: Check rate limiting for coordination requests
2269        if self.security_state.is_coordination_rate_limited(now) {
2270            self.stats.rate_limit_violations += 1;
2271            debug!(
2272                "Rate limit exceeded for coordination request with {} targets",
2273                targets.len()
2274            );
2275            return Err(NatTraversalError::RateLimitExceeded);
2276        }
2277        // Security validation: Check for suspicious coordination patterns
2278        if self.is_coordination_suspicious(&targets, now) {
2279            self.stats.suspicious_coordination_attempts += 1;
2280            self.stats.security_rejections += 1;
2281            debug!(
2282                "Suspicious coordination request rejected with {} targets",
2283                targets.len()
2284            );
2285            return Err(NatTraversalError::SuspiciousCoordination);
2286        }
2287
2288        // Security validation: Validate all target addresses
2289        for target in &targets {
2290            match self
2291                .security_state
2292                .validate_address(target.remote_addr, now)
2293            {
2294                AddressValidationResult::Invalid => {
2295                    self.stats.invalid_address_rejections += 1;
2296                    self.stats.security_rejections += 1;
2297                    debug!(
2298                        "Invalid target address in coordination: {}",
2299                        target.remote_addr
2300                    );
2301                    return Err(NatTraversalError::InvalidAddress);
2302                }
2303                AddressValidationResult::Suspicious => {
2304                    self.stats.security_rejections += 1;
2305                    debug!(
2306                        "Suspicious target address in coordination: {}",
2307                        target.remote_addr
2308                    );
2309                    return Err(NatTraversalError::SecurityValidationFailed);
2310                }
2311                AddressValidationResult::Valid => {
2312                    // Continue with normal processing
2313                }
2314            }
2315        }
2316
2317        let round = self.next_sequence;
2318        self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2319            .expect("sequence number overflow");
2320
2321        // Calculate synchronized punch time (grace period for coordination)
2322        let coordination_grace = Duration::from_millis(500); // 500ms for coordination
2323        let punch_start = now + coordination_grace;
2324
2325        self.coordination = Some(CoordinationState {
2326            round,
2327            punch_targets: targets,
2328            round_start: now,
2329            punch_start,
2330            round_duration: self.coordination_timeout,
2331            state: CoordinationPhase::Requesting,
2332            punch_request_sent: false,
2333            peer_punch_received: false,
2334            retry_count: 0,
2335            max_retries: 3,
2336            timeout_state: AdaptiveTimeoutState::new(),
2337            last_retry_at: None,
2338        });
2339
2340        self.stats.coordination_rounds += 1;
2341        trace!(
2342            "Started coordination round {} with {} targets",
2343            round,
2344            self.coordination.as_ref().unwrap().punch_targets.len()
2345        );
2346        Ok(round)
2347    }
2348
2349    /// Check if a coordination request shows suspicious patterns
2350    fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2351        // Check for excessive number of targets
2352        if targets.len() > 20 {
2353            return true;
2354        }
2355        // Check for duplicate targets
2356        let mut seen_addresses = std::collections::HashSet::new();
2357        for target in targets {
2358            if !seen_addresses.insert(target.remote_addr) {
2359                return true; // Duplicate target
2360            }
2361        }
2362
2363        // Check for patterns that might indicate scanning
2364        if targets.len() > 5 {
2365            // Check if all targets are in sequential IP ranges (potential scan)
2366            let mut ipv4_addresses: Vec<_> = targets
2367                .iter()
2368                .filter_map(|t| match t.remote_addr.ip() {
2369                    IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2370                    _ => None,
2371                })
2372                .collect();
2373
2374            if ipv4_addresses.len() >= 3 {
2375                ipv4_addresses.sort();
2376                let mut sequential_count = 1;
2377                for i in 1..ipv4_addresses.len() {
2378                    if ipv4_addresses[i] == ipv4_addresses[i - 1] + 1 {
2379                        sequential_count += 1;
2380                        if sequential_count >= 3 {
2381                            return true; // Sequential IPs detected
2382                        }
2383                    } else {
2384                        sequential_count = 1;
2385                    }
2386                }
2387            }
2388        }
2389
2390        false
2391    }
2392
2393    /// Get the current coordination phase
2394    pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2395        self.coordination.as_ref().map(|c| c.state)
2396    }
2397    /// Check if we need to send PUNCH_ME_NOW frame
2398    pub(super) fn should_send_punch_request(&self) -> bool {
2399        if let Some(coord) = &self.coordination {
2400            coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2401        } else {
2402            false
2403        }
2404    }
2405    /// Mark that we've sent our PUNCH_ME_NOW request
2406    pub(super) fn mark_punch_request_sent(&mut self) {
2407        if let Some(coord) = &mut self.coordination {
2408            coord.punch_request_sent = true;
2409            coord.state = CoordinationPhase::Coordinating;
2410            trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2411        }
2412    }
2413    /// Handle receiving peer's PUNCH_ME_NOW (via coordinator) with security validation
2414    pub(super) fn handle_peer_punch_request(
2415        &mut self,
2416        peer_round: VarInt,
2417        now: Instant,
2418    ) -> Result<bool, NatTraversalError> {
2419        // Security validation: Check if this is a valid coordination request
2420        if self.is_peer_coordination_suspicious(peer_round, now) {
2421            self.stats.suspicious_coordination_attempts += 1;
2422            self.stats.security_rejections += 1;
2423            debug!(
2424                "Suspicious peer coordination request rejected for round {}",
2425                peer_round
2426            );
2427            return Err(NatTraversalError::SuspiciousCoordination);
2428        }
2429        if let Some(coord) = &mut self.coordination {
2430            if coord.round == peer_round {
2431                match coord.state {
2432                    CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2433                        coord.peer_punch_received = true;
2434                        coord.state = CoordinationPhase::Preparing;
2435
2436                        // Calculate adaptive grace period based on network conditions
2437                        let network_rtt = self
2438                            .network_monitor
2439                            .get_estimated_rtt()
2440                            .unwrap_or(Duration::from_millis(100));
2441                        let quality_score = self.network_monitor.get_quality_score();
2442
2443                        // Scale grace period: good networks get shorter delays
2444                        let base_grace = Duration::from_millis(150);
2445                        let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2446                        let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2447
2448                        let adaptive_grace = Duration::from_millis(
2449                            (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64,
2450                        );
2451
2452                        coord.punch_start = now + adaptive_grace;
2453
2454                        trace!(
2455                            "Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2456                            adaptive_grace, network_rtt, quality_score
2457                        );
2458                        Ok(true)
2459                    }
2460                    CoordinationPhase::Preparing => {
2461                        // Already in preparation phase, just acknowledge
2462                        trace!("Peer coordination confirmed during preparation");
2463                        Ok(true)
2464                    }
2465                    _ => {
2466                        debug!(
2467                            "Received coordination in unexpected phase: {:?}",
2468                            coord.state
2469                        );
2470                        Ok(false)
2471                    }
2472                }
2473            } else {
2474                debug!(
2475                    "Received coordination for wrong round: {} vs {}",
2476                    peer_round, coord.round
2477                );
2478                Ok(false)
2479            }
2480        } else {
2481            debug!("Received peer coordination but no active round");
2482            Ok(false)
2483        }
2484    }
2485
2486    /// Check if a peer coordination request is suspicious
2487    fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2488        // Check for round number anomalies
2489        if peer_round.into_inner() == 0 {
2490            return true; // Invalid round number
2491        }
2492        // Check if round is too far in the future or past
2493        if let Some(coord) = &self.coordination {
2494            let our_round = coord.round.into_inner();
2495            let peer_round_num = peer_round.into_inner();
2496
2497            // Allow some variance but reject extreme differences
2498            if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2499                return true;
2500            }
2501        }
2502
2503        false
2504    }
2505
2506    /// Check if it's time to start hole punching
2507    pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2508        if let Some(coord) = &self.coordination {
2509            match coord.state {
2510                CoordinationPhase::Preparing => now >= coord.punch_start,
2511                CoordinationPhase::Coordinating => {
2512                    // Check if we have peer confirmation and grace period elapsed
2513                    coord.peer_punch_received && now >= coord.punch_start
2514                }
2515                _ => false,
2516            }
2517        } else {
2518            false
2519        }
2520    }
2521    /// Start the synchronized hole punching phase
2522    pub(super) fn start_punching_phase(&mut self, now: Instant) {
2523        if let Some(coord) = &mut self.coordination {
2524            coord.state = CoordinationPhase::Punching;
2525            // Calculate precise timing for coordinated transmission
2526            let network_rtt = self
2527                .network_monitor
2528                .get_estimated_rtt()
2529                .unwrap_or(Duration::from_millis(100));
2530
2531            // Add small random jitter to avoid thundering herd
2532            let jitter_ms: u64 = rand::random::<u64>() % 11;
2533            let jitter = Duration::from_millis(jitter_ms);
2534            let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2535
2536            // Update punch start time with precise calculation
2537            coord.punch_start = transmission_time.max(now);
2538
2539            trace!(
2540                "Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2541                coord.punch_start, network_rtt, jitter
2542            );
2543        }
2544    }
2545
2546    /// Get punch targets for the current round
2547    pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2548        self.coordination
2549            .as_ref()
2550            .map(|c| c.punch_targets.as_slice())
2551    }
2552    /// Mark coordination as validating (PATH_CHALLENGE sent)
2553    pub(super) fn mark_coordination_validating(&mut self) {
2554        if let Some(coord) = &mut self.coordination {
2555            if coord.state == CoordinationPhase::Punching {
2556                coord.state = CoordinationPhase::Validating;
2557                trace!("Coordination moved to validation phase");
2558            }
2559        }
2560    }
2561    /// Handle successful path validation during coordination
2562    pub(super) fn handle_coordination_success(
2563        &mut self,
2564        remote_addr: SocketAddr,
2565        now: Instant,
2566    ) -> bool {
2567        if let Some(coord) = &mut self.coordination {
2568            // Check if this address was one of our punch targets
2569            let was_target = coord
2570                .punch_targets
2571                .iter()
2572                .any(|target| target.remote_addr == remote_addr);
2573            if was_target && coord.state == CoordinationPhase::Validating {
2574                // Calculate RTT and update adaptive timeout
2575                let rtt = now.duration_since(coord.round_start);
2576                coord.timeout_state.update_success(rtt);
2577                self.network_monitor.record_success(rtt, now);
2578
2579                coord.state = CoordinationPhase::Succeeded;
2580                self.stats.direct_connections += 1;
2581                trace!(
2582                    "Coordination succeeded via {} with RTT {:?}",
2583                    remote_addr, rtt
2584                );
2585                true
2586            } else {
2587                false
2588            }
2589        } else {
2590            false
2591        }
2592    }
2593
2594    /// Handle coordination failure and determine if we should retry
2595    pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2596        if let Some(coord) = &mut self.coordination {
2597            coord.retry_count += 1;
2598            coord.timeout_state.update_timeout();
2599            self.network_monitor.record_timeout(now);
2600            // Check network conditions before retrying
2601            if coord.timeout_state.should_retry(coord.max_retries)
2602                && self.network_monitor.is_suitable_for_coordination()
2603            {
2604                // Retry with adaptive timeout
2605                coord.state = CoordinationPhase::Requesting;
2606                coord.punch_request_sent = false;
2607                coord.peer_punch_received = false;
2608                coord.round_start = now;
2609                coord.last_retry_at = Some(now);
2610
2611                // Use adaptive timeout for retry delay
2612                let retry_delay = coord.timeout_state.get_retry_delay();
2613
2614                // Factor in network quality for retry timing
2615                let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2616                let adjusted_delay = Duration::from_millis(
2617                    (retry_delay.as_millis() as f64 * quality_multiplier) as u64,
2618                );
2619
2620                coord.punch_start = now + adjusted_delay;
2621
2622                trace!(
2623                    "Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2624                    coord.round,
2625                    coord.retry_count + 1,
2626                    adjusted_delay,
2627                    self.network_monitor.get_quality_score()
2628                );
2629                true
2630            } else {
2631                coord.state = CoordinationPhase::Failed;
2632                self.stats.coordination_failures += 1;
2633
2634                if !self.network_monitor.is_suitable_for_coordination() {
2635                    trace!(
2636                        "Coordination failed due to poor network conditions (quality: {:.2})",
2637                        self.network_monitor.get_quality_score()
2638                    );
2639                } else {
2640                    trace!("Coordination failed after {} attempts", coord.retry_count);
2641                }
2642                false
2643            }
2644        } else {
2645            false
2646        }
2647    }
2648
2649    /// Check if the current coordination round has timed out
2650    pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2651        if let Some(coord) = &mut self.coordination {
2652            let timeout = coord.timeout_state.get_timeout();
2653            let elapsed = now.duration_since(coord.round_start);
2654            if elapsed > timeout {
2655                trace!(
2656                    "Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2657                    coord.round, elapsed, timeout
2658                );
2659                self.handle_coordination_failure(now);
2660                true
2661            } else {
2662                false
2663            }
2664        } else {
2665            false
2666        }
2667    }
2668
2669    /// Check for validation timeouts and handle retries
2670    pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2671        let mut expired_validations = Vec::new();
2672        let mut retry_validations = Vec::new();
2673
2674        for (addr, validation) in &mut self.active_validations {
2675            let timeout = validation.timeout_state.get_timeout();
2676            let elapsed = now.duration_since(validation.sent_at);
2677
2678            if elapsed >= timeout {
2679                if validation
2680                    .timeout_state
2681                    .should_retry(validation.max_retries)
2682                {
2683                    // Schedule retry
2684                    retry_validations.push(*addr);
2685                } else {
2686                    // Mark as expired
2687                    expired_validations.push(*addr);
2688                }
2689            }
2690        }
2691
2692        // Handle retries
2693        for addr in retry_validations {
2694            if let Some(validation) = self.active_validations.get_mut(&addr) {
2695                validation.retry_count += 1;
2696                validation.sent_at = now;
2697                validation.last_retry_at = Some(now);
2698                validation.timeout_state.update_timeout();
2699
2700                trace!(
2701                    "Retrying validation for {} (attempt {})",
2702                    addr,
2703                    validation.retry_count + 1
2704                );
2705            }
2706        }
2707
2708        // Remove expired validations
2709        for addr in &expired_validations {
2710            self.active_validations.remove(addr);
2711            self.network_monitor.record_timeout(now);
2712            trace!("Validation expired for {}", addr);
2713        }
2714
2715        expired_validations
2716    }
2717
2718    /// Schedule validation retries for active validations that need retry
2719    pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2720        let mut retry_addresses = Vec::new();
2721
2722        // Get all active validations that need retry
2723        for (addr, validation) in &mut self.active_validations {
2724            let elapsed = now.duration_since(validation.sent_at);
2725            let timeout = validation.timeout_state.get_timeout();
2726
2727            if elapsed > timeout
2728                && validation
2729                    .timeout_state
2730                    .should_retry(validation.max_retries)
2731            {
2732                // Update retry state
2733                validation.retry_count += 1;
2734                validation.last_retry_at = Some(now);
2735                validation.sent_at = now; // Reset sent time for new attempt
2736                validation.timeout_state.update_timeout();
2737
2738                retry_addresses.push(*addr);
2739                trace!(
2740                    "Scheduled retry {} for validation to {}",
2741                    validation.retry_count, addr
2742                );
2743            }
2744        }
2745
2746        retry_addresses
2747    }
2748
2749    /// Update network conditions and cleanup
2750    pub(super) fn update_network_conditions(&mut self, now: Instant) {
2751        self.network_monitor.cleanup(now);
2752
2753        // Update timeout multiplier based on network conditions
2754        let multiplier = self.network_monitor.get_timeout_multiplier();
2755
2756        // Apply network-aware timeout adjustments to active validations
2757        for validation in self.active_validations.values_mut() {
2758            if multiplier > 1.5 {
2759                // Poor network conditions - be more patient
2760                validation.timeout_state.backoff_multiplier =
2761                    (validation.timeout_state.backoff_multiplier * 1.2)
2762                        .min(validation.timeout_state.max_backoff_multiplier);
2763            } else if multiplier < 0.8 {
2764                // Good network conditions - be more aggressive
2765                validation.timeout_state.backoff_multiplier =
2766                    (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2767            }
2768        }
2769    }
2770
2771    /// Check if coordination should be retried now
2772    pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2773        if let Some(coord) = &self.coordination {
2774            if coord.retry_count > 0 {
2775                if let Some(last_retry) = coord.last_retry_at {
2776                    let retry_delay = coord.timeout_state.get_retry_delay();
2777                    return now.duration_since(last_retry) >= retry_delay;
2778                }
2779            }
2780        }
2781        false
2782    }
2783
2784    /// Perform resource management and cleanup
2785    pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2786        // Update resource usage statistics
2787        self.resource_manager.update_stats(
2788            self.active_validations.len(),
2789            self.local_candidates.len(),
2790            self.remote_candidates.len(),
2791            self.candidate_pairs.len(),
2792        );
2793
2794        // Calculate current memory pressure
2795        let memory_pressure = self.resource_manager.calculate_memory_pressure(
2796            self.active_validations.len(),
2797            self.local_candidates.len(),
2798            self.remote_candidates.len(),
2799            self.candidate_pairs.len(),
2800        );
2801
2802        // Perform cleanup if needed
2803        let mut cleaned = 0;
2804
2805        if self.resource_manager.should_cleanup(now) {
2806            cleaned += self.resource_manager.cleanup_expired_resources(
2807                &mut self.active_validations,
2808                &mut self.local_candidates,
2809                &mut self.remote_candidates,
2810                &mut self.candidate_pairs,
2811                &mut self.coordination,
2812                now,
2813            );
2814
2815            // If memory pressure is high, perform aggressive cleanup
2816            if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2817                cleaned += self.resource_manager.aggressive_cleanup(
2818                    &mut self.active_validations,
2819                    &mut self.local_candidates,
2820                    &mut self.remote_candidates,
2821                    &mut self.candidate_pairs,
2822                    now,
2823                );
2824            }
2825        }
2826
2827        cleaned
2828    }
2829
2830    /// Check if we should reject new resources due to limits
2831    pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
2832        // Update stats and check limits
2833        self.resource_manager.update_stats(
2834            self.active_validations.len(),
2835            self.local_candidates.len(),
2836            self.remote_candidates.len(),
2837            self.candidate_pairs.len(),
2838        );
2839        let memory_pressure = self.resource_manager.calculate_memory_pressure(
2840            self.active_validations.len(),
2841            self.local_candidates.len(),
2842            self.remote_candidates.len(),
2843            self.candidate_pairs.len(),
2844        );
2845        // Reject if memory pressure is too high
2846        if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
2847            self.resource_manager.stats.allocation_failures += 1;
2848            return true;
2849        }
2850
2851        // Reject if hard limits are exceeded
2852        if self.resource_manager.check_resource_limits(self) {
2853            self.resource_manager.stats.allocation_failures += 1;
2854            return true;
2855        }
2856
2857        false
2858    }
2859
2860    /// Get the next timeout instant for NAT traversal operations
2861    pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
2862        let mut next_timeout = None;
2863        // Check coordination timeout
2864        if let Some(coord) = &self.coordination {
2865            match coord.state {
2866                CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2867                    let timeout_at = coord.round_start + self.coordination_timeout;
2868                    next_timeout =
2869                        Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2870                }
2871                CoordinationPhase::Preparing => {
2872                    // Punch start time is when we should start punching
2873                    next_timeout = Some(
2874                        next_timeout
2875                            .map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)),
2876                    );
2877                }
2878                CoordinationPhase::Punching | CoordinationPhase::Validating => {
2879                    // Check for coordination round timeout
2880                    let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2881                    next_timeout =
2882                        Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2883                }
2884                _ => {}
2885            }
2886        }
2887
2888        // Check validation timeouts
2889        for validation in self.active_validations.values() {
2890            let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2891            next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2892        }
2893
2894        // Check resource cleanup interval
2895        if self.resource_manager.should_cleanup(now) {
2896            // Schedule cleanup soon
2897            let cleanup_at = now + Duration::from_secs(1);
2898            next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
2899        }
2900
2901        next_timeout
2902    }
2903
2904    /// Handle timeout events and return actions to take
2905    pub(super) fn handle_timeout(
2906        &mut self,
2907        now: Instant,
2908    ) -> Result<Vec<TimeoutAction>, NatTraversalError> {
2909        let mut actions = Vec::new();
2910        // Handle coordination timeouts
2911        if let Some(coord) = &mut self.coordination {
2912            match coord.state {
2913                CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2914                    let timeout_at = coord.round_start + self.coordination_timeout;
2915                    if now >= timeout_at {
2916                        coord.retry_count += 1;
2917                        if coord.retry_count >= coord.max_retries {
2918                            debug!("Coordination failed after {} retries", coord.retry_count);
2919                            coord.state = CoordinationPhase::Failed;
2920                            actions.push(TimeoutAction::Failed);
2921                        } else {
2922                            debug!(
2923                                "Coordination timeout, retrying ({}/{})",
2924                                coord.retry_count, coord.max_retries
2925                            );
2926                            coord.state = CoordinationPhase::Requesting;
2927                            coord.round_start = now;
2928                            actions.push(TimeoutAction::RetryCoordination);
2929                        }
2930                    }
2931                }
2932                CoordinationPhase::Preparing => {
2933                    // Check if it's time to start punching
2934                    if now >= coord.punch_start {
2935                        debug!("Starting coordinated hole punching");
2936                        coord.state = CoordinationPhase::Punching;
2937                        actions.push(TimeoutAction::StartValidation);
2938                    }
2939                }
2940                CoordinationPhase::Punching | CoordinationPhase::Validating => {
2941                    let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2942                    if now >= timeout_at {
2943                        coord.retry_count += 1;
2944                        if coord.retry_count >= coord.max_retries {
2945                            debug!("Validation failed after {} retries", coord.retry_count);
2946                            coord.state = CoordinationPhase::Failed;
2947                            actions.push(TimeoutAction::Failed);
2948                        } else {
2949                            debug!(
2950                                "Validation timeout, retrying ({}/{})",
2951                                coord.retry_count, coord.max_retries
2952                            );
2953                            coord.state = CoordinationPhase::Punching;
2954                            actions.push(TimeoutAction::StartValidation);
2955                        }
2956                    }
2957                }
2958                CoordinationPhase::Succeeded => {
2959                    actions.push(TimeoutAction::Complete);
2960                }
2961                CoordinationPhase::Failed => {
2962                    actions.push(TimeoutAction::Failed);
2963                }
2964                _ => {}
2965            }
2966        }
2967
2968        // Handle validation timeouts
2969        let mut expired_validations = Vec::new();
2970        for (addr, validation) in &mut self.active_validations {
2971            let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2972            if now >= timeout_at {
2973                validation.retry_count += 1;
2974                if validation.retry_count >= validation.max_retries {
2975                    debug!("Path validation failed for {}: max retries exceeded", addr);
2976                    expired_validations.push(*addr);
2977                } else {
2978                    debug!(
2979                        "Path validation timeout for {}, retrying ({}/{})",
2980                        addr, validation.retry_count, validation.max_retries
2981                    );
2982                    validation.sent_at = now;
2983                    validation.last_retry_at = Some(now);
2984                    actions.push(TimeoutAction::StartValidation);
2985                }
2986            }
2987        }
2988
2989        // Remove expired validations
2990        for addr in expired_validations {
2991            self.active_validations.remove(&addr);
2992        }
2993
2994        // Handle resource cleanup
2995        if self.resource_manager.should_cleanup(now) {
2996            self.resource_manager.perform_cleanup(now);
2997        }
2998
2999        // Update network condition monitoring
3000        self.network_monitor.update_quality_score(now);
3001
3002        // If no coordination is active and we have candidates, try to start discovery
3003        if self.coordination.is_none()
3004            && !self.local_candidates.is_empty()
3005            && !self.remote_candidates.is_empty()
3006        {
3007            actions.push(TimeoutAction::RetryDiscovery);
3008        }
3009
3010        Ok(actions)
3011    }
3012
3013    /// Handle address observation for bootstrap nodes
3014    ///
3015    /// This method is called when a peer connects to this bootstrap node,
3016    /// allowing the bootstrap to observe the peer's public address.
3017    pub(super) fn handle_address_observation(
3018        &mut self,
3019        peer_id: [u8; 32],
3020        observed_address: SocketAddr,
3021        connection_id: crate::shared::ConnectionId,
3022        peer_role: NatTraversalRole,
3023        now: Instant,
3024    ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
3025        if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3026            let connection_context = ConnectionContext {
3027                connection_id,
3028                original_destination: observed_address, // For now, use same as observed
3029                peer_role,
3030                transport_params: None,
3031            };
3032
3033            // Observe the peer's address
3034            bootstrap_coordinator.observe_peer_address(
3035                peer_id,
3036                observed_address,
3037                connection_context,
3038                now,
3039            )?;
3040
3041            // Generate ADD_ADDRESS frame to inform peer of their observed address
3042            let sequence = self.next_sequence;
3043            self.next_sequence =
3044                VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3045
3046            let priority = VarInt::from_u32(100); // Server-reflexive priority
3047            let add_address_frame =
3048                bootstrap_coordinator.generate_add_address_frame(peer_id, sequence, priority);
3049
3050            Ok(add_address_frame)
3051        } else {
3052            // Not a bootstrap node
3053            Ok(None)
3054        }
3055    }
3056
3057    /// Handle PUNCH_ME_NOW frame for bootstrap coordination
3058    ///
3059    /// This processes coordination requests from peers and facilitates
3060    /// hole punching between them.
3061    pub(super) fn handle_punch_me_now_frame(
3062        &mut self,
3063        from_peer: [u8; 32],
3064        source_addr: SocketAddr,
3065        frame: &crate::frame::PunchMeNow,
3066        now: Instant,
3067    ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3068        if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3069            bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3070        } else {
3071            // Not a bootstrap node - this frame should not be processed here
3072            Ok(None)
3073        }
3074    }
3075    /// Perform bootstrap cleanup operations
3076    ///
3077    /// Get observed address for a peer
3078    pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3079        self.bootstrap_coordinator
3080            .as_ref()
3081            .and_then(|coord| coord.get_peer_record(peer_id))
3082            .map(|record| record.observed_address)
3083    }
3084
3085    /// Start candidate discovery process
3086    pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3087        debug!("Starting candidate discovery for NAT traversal");
3088        // Initialize discovery state if needed
3089        if self.local_candidates.is_empty() {
3090            // Add local interface candidates
3091            // This would be populated by the candidate discovery manager
3092            debug!("Local candidates will be populated by discovery manager");
3093        }
3094
3095        Ok(())
3096    }
3097
3098    /// Queue an ADD_ADDRESS frame for transmission
3099    pub(super) fn queue_add_address_frame(
3100        &mut self,
3101        sequence: VarInt,
3102        address: SocketAddr,
3103        priority: u32,
3104    ) -> Result<(), NatTraversalError> {
3105        debug!(
3106            "Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3107            sequence, address, priority
3108        );
3109
3110        // Add to local candidates if not already present
3111        let candidate = AddressCandidate {
3112            address,
3113            priority,
3114            source: CandidateSource::Local,
3115            discovered_at: Instant::now(),
3116            state: CandidateState::New,
3117            attempt_count: 0,
3118            last_attempt: None,
3119        };
3120
3121        // Check if candidate already exists
3122        if !self.local_candidates.values().any(|c| c.address == address) {
3123            self.local_candidates.insert(sequence, candidate);
3124        }
3125
3126        Ok(())
3127    }
3128}
3129
/// Errors that can occur during NAT traversal.
///
/// The `Display` implementation renders each variant as a short lowercase phrase.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NatTraversalError {
    /// More candidates were received than the configured limit allows.
    TooManyCandidates,
    /// The same address was announced under a different sequence number.
    DuplicateAddress,
    /// No candidate exists for the referenced sequence number.
    UnknownCandidate,
    /// The candidate is not in a state that permits the requested operation.
    InvalidCandidateState,
    /// No path validation is in progress for the address.
    NoActiveValidation,
    /// A path challenge response did not match the challenge sent.
    ChallengeMismatch,
    /// No coordination round is currently active.
    NoActiveCoordination,
    /// A security check rejected the request.
    SecurityValidationFailed,
    /// A rate limit was exceeded.
    RateLimitExceeded,
    /// The address has an invalid format.
    InvalidAddress,
    /// A coordination request looked suspicious.
    SuspiciousCoordination,
    /// A resource limit was exceeded.
    ResourceLimitExceeded,
}

impl std::fmt::Display for NatTraversalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::TooManyCandidates => "too many candidates",
            Self::DuplicateAddress => "duplicate address",
            Self::UnknownCandidate => "unknown candidate",
            Self::InvalidCandidateState => "invalid candidate state",
            Self::NoActiveValidation => "no active validation",
            Self::ChallengeMismatch => "challenge mismatch",
            Self::NoActiveCoordination => "no active coordination",
            Self::SecurityValidationFailed => "security validation failed",
            Self::RateLimitExceeded => "rate limit exceeded",
            Self::InvalidAddress => "invalid address",
            Self::SuspiciousCoordination => "suspicious coordination request",
            Self::ResourceLimitExceeded => "resource limit exceeded",
        };
        // `write_str` (like the bare `write!` it replaces) ignores width/fill flags.
        f.write_str(description)
    }
}

impl std::error::Error for NatTraversalError {}
3178
/// Security counters and gauges, exposed for monitoring and debugging.
///
/// The `u32` fields count rejections by category; the `usize` fields describe
/// current sizes and rates.
/// NOTE(review): the producer of this struct is not visible here — presumably it is
/// filled in from `SecurityValidationState`; confirm.
#[derive(Debug, Clone)]
pub(crate) struct SecurityStats {
    /// Total number of requests rejected for any security reason.
    pub total_security_rejections: u32,
    /// Number of rejections caused by rate limiting.
    pub rate_limit_violations: u32,
    /// Number of rejections caused by invalid addresses.
    pub invalid_address_rejections: u32,
    /// Number of coordination attempts flagged as suspicious.
    pub suspicious_coordination_attempts: u32,
    /// Number of validations currently in progress.
    pub active_validations: usize,
    /// Number of address validation results currently cached.
    pub cached_address_validations: usize,
    /// Current rate of candidate additions.
    pub current_candidate_rate: usize,
    /// Current rate of coordination requests.
    pub current_coordination_rate: usize,
}
/// Bootstrap-node state for NAT traversal coordination.
///
/// A bootstrap node records the public address it observes for each connecting peer,
/// relays PUNCH_ME_NOW coordination requests between peers, and drives each resulting
/// coordination session through its phases.
#[derive(Debug)]
pub(crate) struct BootstrapCoordinator {
    /// One record per peer, holding its last observed public address.
    peer_registry: HashMap<PeerId, PeerObservationRecord>,
    /// Coordination sessions between pairs of peers, keyed by a random session ID.
    coordination_sessions: HashMap<CoordinationSessionId, CoordinationSession>,
    /// Per-address observation data (count and associated peers).
    address_observations: HashMap<SocketAddr, AddressObservation>,
    /// Address validation and rate limiting for incoming observations and requests.
    security_validator: SecurityValidationState,
    /// Running counters for bootstrap operations.
    stats: BootstrapStats,
    /// Configuration for bootstrap behavior (stub; stored but not read by the code shown here).
    _config: BootstrapConfig,
    /// Last cleanup time (stub; initialised to `None` and not read by the code shown here).
    _last_cleanup: Option<Instant>,
}
/// Identifier of a coordination session (generated randomly).
type CoordinationSessionId = u64;
/// 32-byte peer identifier used by bootstrap coordination.
type PeerId = [u8; 32];
/// What the bootstrap node knows about one peer: its observed address and its
/// coordination track record.
#[derive(Debug, Clone)]
pub(crate) struct PeerObservationRecord {
    /// The peer's unique identifier (also the registry key).
    peer_id: PeerId,
    /// Most recently observed public address of the peer.
    observed_address: SocketAddr,
    /// When `observed_address` was observed; records older than one hour are
    /// removed by `cleanup_expired_sessions`.
    observed_at: Instant,
    /// Connection context of the observation.
    connection_context: ConnectionContext,
    /// Whether this peer may participate in coordination. Starts `true`; cleared by
    /// `update_peer_coordination_stats` when the success rate stays very low.
    can_coordinate: bool,
    /// Number of coordination outcomes recorded for this peer, successful or not.
    coordination_count: u32,
    /// Exponential moving average of coordination success (1.0 = always
    /// successful). Starts at 1.0.
    success_rate: f64,
}

/// Connection details captured alongside an address observation.
#[derive(Debug, Clone)]
pub(crate) struct ConnectionContext {
    /// Connection ID of the connection the observation was made on.
    connection_id: ConnectionId,
    /// Original destination address (what the peer thought it was connecting to).
    original_destination: SocketAddr,
    /// NAT traversal role of the connecting peer.
    peer_role: NatTraversalRole,
    /// NAT traversal transport parameters received from the peer, if any.
    transport_params: Option<NatTraversalTransportParams>,
}

/// NAT traversal transport parameters advertised by a peer.
#[derive(Debug, Clone)]
struct NatTraversalTransportParams {
    /// Maximum number of candidates this peer can handle.
    max_candidates: u32,
    /// Coordination timeout requested by this peer.
    coordination_timeout: Duration,
    /// Whether this peer supports advanced features.
    supports_advanced_features: bool,
}

/// Observation data for a single public address.
#[derive(Debug, Clone)]
struct AddressObservation {
    /// The observed address (also the map key).
    address: SocketAddr,
    /// When this address was first observed; entries are dropped by
    /// `cleanup_expired_sessions` one hour after this time.
    first_observed: Instant,
    /// How many accepted observations have reported this address.
    observation_count: u32,
    /// Validation result for this address (observations are only recorded after
    /// passing validation, so new entries start as `Valid`).
    validation_state: AddressValidationResult,
    /// Distinct peers that have been observed at this address.
    associated_peers: Vec<PeerId>,
}
3281
/// A hole-punching coordination session between two peers, relayed by this node.
#[derive(Debug, Clone)]
pub(crate) struct CoordinationSession {
    /// Unique session identifier (the key in `coordination_sessions`).
    session_id: CoordinationSessionId,
    /// Peer that sent the PUNCH_ME_NOW coordination request.
    peer_a: PeerId,
    /// Target peer named in that request.
    peer_b: PeerId,
    /// Coordination round from the request; sessions are looked up by peer and round.
    current_round: VarInt,
    /// When the session was created. Phase timeouts and session expiry are measured
    /// from this instant.
    started_at: Instant,
    /// Current phase of the session's state machine.
    phase: CoordinationPhase,
    /// Target addresses for hole punching, as `(address, sequence)` pairs.
    target_addresses: Vec<(SocketAddr, VarInt)>, // (address, sequence)
    /// Readiness of each side.
    sync_state: SynchronizationState,
    /// Per-session counters.
    stats: CoordinationSessionStats,
}
/// Readiness flags for the two sides of a coordination session.
#[derive(Debug, Clone)]
struct SynchronizationState {
    /// Whether peer A (the requester) is ready; set when the session is created.
    peer_a_ready: bool,
    /// Whether peer B has confirmed readiness.
    peer_b_ready: bool,
}
/// Counters kept for a single coordination session.
#[derive(Debug, Clone, Default)]
struct CoordinationSessionStats {
    /// Number of times this session completed coordination.
    successful_coordinations: u32,
}
/// Configuration for bootstrap coordinator behavior.
///
/// Stub: currently carries no settings.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapConfig {
    _unused: (),
}
/// Running counters for a bootstrap coordinator.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapStats {
    /// Number of accepted peer address observations.
    total_observations: u64,
    /// Number of coordination sessions created.
    total_coordinations: u64,
    /// Number of coordination sessions that completed with both peers ready.
    successful_coordinations: u64,
    /// Size of the peer registry as of the last update.
    active_peers: usize,
    /// Number of coordination sessions as of the last update.
    active_sessions: usize,
    /// Number of observations or coordination requests rejected by security checks.
    security_rejections: u64,
}
/// Events emitted while polling the coordination session state machine.
#[derive(Debug, Clone)]
pub(crate) enum CoordinationSessionEvent {
    /// A session moved from `old_phase` to `new_phase`.
    PhaseChanged {
        session_id: CoordinationSessionId,
        old_phase: CoordinationPhase,
        new_phase: CoordinationPhase,
    },
    /// A session failed; `reason` is a human-readable explanation.
    SessionFailed {
        session_id: CoordinationSessionId,
        peer_a: PeerId,
        peer_b: PeerId,
        reason: String,
    },
    /// Hole punching should start for the session towards `target_addresses`.
    StartHolePunching {
        session_id: CoordinationSessionId,
        peer_a: PeerId,
        peer_b: PeerId,
        target_addresses: Vec<(SocketAddr, VarInt)>,
    },
    /// The session can be cleaned up.
    ReadyForCleanup { session_id: CoordinationSessionId },
}
/// Internal triggers that advance a session to its next phase.
///
/// Produced by `should_advance_session`; all time thresholds below are measured
/// from the session's `started_at`.
#[derive(Debug, Clone, Copy)]
enum SessionAdvancementEvent {
    /// Both peers are ready while the session is `Requesting`.
    BothPeersReady,
    /// The `Coordinating` phase is over (session older than 500 ms).
    CoordinationComplete,
    /// The `Preparing` phase is over (session older than 1 s).
    PreparationComplete,
    /// The `Punching` phase is over (session older than 2 s).
    PunchingComplete,
    /// The `Validating` phase timed out (session older than 10 s).
    ValidationTimeout,
    /// The session timed out in `Requesting` (older than 10 s) or was found `Idle`.
    Timeout,
    /// A `Succeeded`/`Failed` session is older than 60 s and can be cleaned up.
    ReadyForCleanup,
}
/// Recovery actions for coordination errors.
#[derive(Debug, Clone, Copy)]
pub(crate) enum CoordinationRecoveryAction {
    /// No action needed.
    NoAction,
    /// Retry with exponential backoff.
    RetryWithBackoff,
    /// Mark the session as failed.
    MarkAsFailed,
    /// Clean up the session.
    Cleanup,
}
3396impl BootstrapCoordinator {
3397    /// Create a new bootstrap coordinator
3398    pub(crate) fn new(config: BootstrapConfig) -> Self {
3399        Self {
3400            peer_registry: HashMap::new(),
3401            coordination_sessions: HashMap::new(),
3402            address_observations: HashMap::new(),
3403            security_validator: SecurityValidationState::new(),
3404            stats: BootstrapStats::default(),
3405            _config: config,
3406            _last_cleanup: None,
3407        }
3408    }
3409    /// Observe a peer's address from an incoming connection
3410    ///
3411    /// This is called when a peer connects to this bootstrap node,
3412    /// allowing us to observe their public address.
3413    pub(crate) fn observe_peer_address(
3414        &mut self,
3415        peer_id: PeerId,
3416        observed_address: SocketAddr,
3417        connection_context: ConnectionContext,
3418        now: Instant,
3419    ) -> Result<(), NatTraversalError> {
3420        // Security validation
3421        match self
3422            .security_validator
3423            .validate_address(observed_address, now)
3424        {
3425            AddressValidationResult::Valid => {}
3426            AddressValidationResult::Invalid => {
3427                self.stats.security_rejections += 1;
3428                return Err(NatTraversalError::InvalidAddress);
3429            }
3430            AddressValidationResult::Suspicious => {
3431                self.stats.security_rejections += 1;
3432                return Err(NatTraversalError::SecurityValidationFailed);
3433            }
3434        }
3435
3436        // Rate limiting check
3437        if self.security_validator.is_candidate_rate_limited(now) {
3438            self.stats.security_rejections += 1;
3439            return Err(NatTraversalError::RateLimitExceeded);
3440        }
3441
3442        // Update address observation
3443        let observation = self
3444            .address_observations
3445            .entry(observed_address)
3446            .or_insert_with(|| AddressObservation {
3447                address: observed_address,
3448                first_observed: now,
3449                observation_count: 0,
3450                validation_state: AddressValidationResult::Valid,
3451                associated_peers: Vec::new(),
3452            });
3453
3454        observation.observation_count += 1;
3455        if !observation.associated_peers.contains(&peer_id) {
3456            observation.associated_peers.push(peer_id);
3457        }
3458
3459        // Update or create peer record
3460        let peer_record = PeerObservationRecord {
3461            peer_id,
3462            observed_address,
3463            observed_at: now,
3464            connection_context,
3465            can_coordinate: true, // Assume true initially
3466            coordination_count: 0,
3467            success_rate: 1.0,
3468        };
3469
3470        self.peer_registry.insert(peer_id, peer_record);
3471        self.stats.total_observations += 1;
3472        self.stats.active_peers = self.peer_registry.len();
3473
3474        debug!(
3475            "Observed peer {:?} at address {} (total observations: {})",
3476            peer_id, observed_address, self.stats.total_observations
3477        );
3478
3479        Ok(())
3480    }
3481
3482    /// Generate ADD_ADDRESS frame for a peer based on observation
3483    ///
3484    /// This creates an ADD_ADDRESS frame to inform a peer of their
3485    /// observed public address.
3486    pub(crate) fn generate_add_address_frame(
3487        &self,
3488        peer_id: PeerId,
3489        sequence: VarInt,
3490        priority: VarInt,
3491    ) -> Option<crate::frame::AddAddress> {
3492        self.peer_registry
3493            .get(&peer_id)
3494            .map(|peer_record| crate::frame::AddAddress {
3495                sequence,
3496                address: peer_record.observed_address,
3497                priority,
3498            })
3499    }
3500
3501    /// Process a PUNCH_ME_NOW frame from a peer
3502    ///
3503    /// This handles coordination requests from peers wanting to establish
3504    /// direct connections through NAT traversal.
3505    pub(crate) fn process_punch_me_now_frame(
3506        &mut self,
3507        from_peer: PeerId,
3508        source_addr: SocketAddr,
3509        frame: &crate::frame::PunchMeNow,
3510        now: Instant,
3511    ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3512        // Enhanced security validation with adaptive rate limiting
3513        if self
3514            .security_validator
3515            .is_adaptive_rate_limited(from_peer, now)
3516        {
3517            self.stats.security_rejections += 1;
3518            debug!(
3519                "PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3520                hex::encode(&from_peer[..8])
3521            );
3522            return Err(NatTraversalError::RateLimitExceeded);
3523        }
3524        // Enhanced address validation with amplification protection
3525        self.security_validator
3526            .enhanced_address_validation(frame.address, source_addr, now)
3527            .inspect_err(|&e| {
3528                self.stats.security_rejections += 1;
3529                debug!(
3530                    "PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3531                    hex::encode(&from_peer[..8]),
3532                    e
3533                );
3534            })?;
3535
3536        // Comprehensive security validation
3537        self.security_validator
3538            .validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3539            .inspect_err(|&e| {
3540                self.stats.security_rejections += 1;
3541                debug!(
3542                    "PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3543                    hex::encode(&from_peer[..8]),
3544                    e
3545                );
3546            })?;
3547
3548        // Check if we have a target peer for this coordination
3549        if let Some(target_peer_id) = frame.target_peer_id {
3550            // This is a coordination request that should be relayed
3551            if let Some(target_peer) = self.peer_registry.get(&target_peer_id) {
3552                // Create coordination session if it doesn't exist
3553                let session_id = self.generate_session_id();
3554
3555                if !self.coordination_sessions.contains_key(&session_id) {
3556                    // Calculate optimal coordination timing based on network conditions
3557                    let _network_rtt = self
3558                        .estimate_peer_rtt(&from_peer)
3559                        .unwrap_or(Duration::from_millis(100));
3560
3561                    let session = CoordinationSession {
3562                        session_id,
3563                        peer_a: from_peer,
3564                        peer_b: target_peer_id,
3565                        current_round: frame.round,
3566                        started_at: now,
3567                        phase: CoordinationPhase::Requesting,
3568                        target_addresses: vec![(frame.address, frame.paired_with_sequence_number)],
3569                        sync_state: SynchronizationState {
3570                            peer_a_ready: true, // Requesting peer is ready
3571                            peer_b_ready: false,
3572                        },
3573                        stats: CoordinationSessionStats::default(),
3574                    };
3575
3576                    self.coordination_sessions.insert(session_id, session);
3577                    self.stats.total_coordinations += 1;
3578                    self.stats.active_sessions = self.coordination_sessions.len();
3579                }
3580
3581                // Generate coordination frame to send to target peer
3582                let coordination_frame = crate::frame::PunchMeNow {
3583                    round: frame.round,
3584                    paired_with_sequence_number: frame.paired_with_sequence_number,
3585                    address: target_peer.observed_address,
3586                    target_peer_id: Some(from_peer),
3587                };
3588
3589                info!(
3590                    "Coordinating hole punch between {:?} and {:?} (round: {})",
3591                    from_peer, target_peer_id, frame.round
3592                );
3593
3594                Ok(Some(coordination_frame))
3595            } else {
3596                // Target peer not found
3597                warn!(
3598                    "Target peer {:?} not found for coordination from {:?}",
3599                    target_peer_id, from_peer
3600                );
3601                Ok(None)
3602            }
3603        } else {
3604            // This is a response to coordination - update session state
3605            let session_id = if let Some(session) =
3606                self.find_coordination_session_by_peer(from_peer, frame.round)
3607            {
3608                session.sync_state.peer_b_ready = true;
3609
3610                // If both peers are ready, coordination is complete
3611                if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3612                    session.phase = CoordinationPhase::Punching;
3613                    session.stats.successful_coordinations += 1;
3614                    Some(session.session_id)
3615                } else {
3616                    None
3617                }
3618            } else {
3619                None
3620            };
3621
3622            // Update stats after releasing the mutable borrow
3623            if let Some(session_id) = session_id {
3624                self.stats.successful_coordinations += 1;
3625                info!(
3626                    "Coordination complete for session {} (round: {})",
3627                    session_id, frame.round
3628                );
3629            }
3630
3631            Ok(None)
3632        }
3633    }
3634
3635    /// Find coordination session by peer and round
3636    fn find_coordination_session_by_peer(
3637        &mut self,
3638        peer_id: PeerId,
3639        round: VarInt,
3640    ) -> Option<&mut CoordinationSession> {
3641        self.coordination_sessions.values_mut().find(|session| {
3642            (session.peer_a == peer_id || session.peer_b == peer_id)
3643                && session.current_round == round
3644        })
3645    }
3646    /// Generate unique session ID
3647    fn generate_session_id(&self) -> CoordinationSessionId {
3648        rand::random()
3649    }
3650    /// Generate secure coordination round using cryptographically secure random values
3651    pub(crate) fn generate_secure_coordination_round(&self) -> VarInt {
3652        self.security_validator.generate_secure_coordination_round()
3653    }
3654
3655    /// Perform comprehensive security validation for coordination requests
3656    pub(crate) fn validate_coordination_security(
3657        &mut self,
3658        peer_id: PeerId,
3659        source_addr: SocketAddr,
3660        target_addr: SocketAddr,
3661        now: Instant,
3662    ) -> Result<(), NatTraversalError> {
3663        // Check adaptive rate limiting
3664        if self
3665            .security_validator
3666            .is_adaptive_rate_limited(peer_id, now)
3667        {
3668            self.stats.security_rejections += 1;
3669            return Err(NatTraversalError::RateLimitExceeded);
3670        }
3671
3672        // Perform enhanced address validation
3673        self.security_validator
3674            .enhanced_address_validation(target_addr, source_addr, now)?;
3675
3676        // Check amplification limits
3677        self.security_validator
3678            .validate_amplification_limits(source_addr, target_addr, now)?;
3679
3680        Ok(())
3681    }
3682
3683    /// Clean up expired sessions and perform maintenance
3684    pub(crate) fn cleanup_expired_sessions(&mut self, now: Instant) {
3685        let session_timeout = Duration::from_secs(300); // 5 minutes
3686
3687        // Collect expired session IDs
3688        let expired_sessions: Vec<CoordinationSessionId> = self
3689            .coordination_sessions
3690            .iter()
3691            .filter(|(_, session)| now.duration_since(session.started_at) > session_timeout)
3692            .map(|(&session_id, _)| session_id)
3693            .collect();
3694
3695        // Remove expired sessions
3696        for session_id in expired_sessions {
3697            if let Some(session) = self.coordination_sessions.remove(&session_id) {
3698                debug!(
3699                    "Cleaned up expired coordination session {} between {:?} and {:?}",
3700                    session_id,
3701                    hex::encode(&session.peer_a[..8]),
3702                    hex::encode(&session.peer_b[..8])
3703                );
3704            }
3705        }
3706
3707        // Update active session count
3708        self.stats.active_sessions = self.coordination_sessions.len();
3709
3710        // Clean up old peer observations
3711        let observation_timeout = Duration::from_secs(3600); // 1 hour
3712        self.peer_registry
3713            .retain(|_, record| now.duration_since(record.observed_at) <= observation_timeout);
3714
3715        // Update active peer count
3716        self.stats.active_peers = self.peer_registry.len();
3717
3718        // Clean up address observations
3719        self.address_observations.retain(|_, observation| {
3720            now.duration_since(observation.first_observed) <= observation_timeout
3721        });
3722    }
3723
3724    /// Get bootstrap statistics
3725    pub(crate) fn get_stats(&self) -> &BootstrapStats {
3726        &self.stats
3727    }
3728
3729    /// Update peer coordination statistics
3730    pub(crate) fn update_peer_coordination_stats(&mut self, peer_id: PeerId, success: bool) {
3731        if let Some(peer_record) = self.peer_registry.get_mut(&peer_id) {
3732            peer_record.coordination_count += 1;
3733
3734            if success {
3735                // Update success rate using exponential moving average
3736                let alpha = 0.1; // Learning rate
3737                peer_record.success_rate = peer_record.success_rate * (1.0 - alpha) + alpha;
3738            } else {
3739                // Decrease success rate
3740                let alpha = 0.1;
3741                peer_record.success_rate *= 1.0 - alpha;
3742            }
3743
3744            // Disable coordination for peers with very low success rates
3745            if peer_record.success_rate < 0.1 && peer_record.coordination_count > 10 {
3746                peer_record.can_coordinate = false;
3747                warn!(
3748                    "Disabled coordination for peer {:?} due to low success rate: {:.2}",
3749                    hex::encode(&peer_id[..8]),
3750                    peer_record.success_rate
3751                );
3752            }
3753        }
3754    }
3755
3756    /// Poll session state machine and advance coordination sessions
3757    ///
3758    /// This method implements the core session state machine polling logic
3759    /// with timeout handling, retry mechanisms, and error recovery.
3760    pub(crate) fn poll_session_state_machine(
3761        &mut self,
3762        now: Instant,
3763    ) -> Vec<CoordinationSessionEvent> {
3764        let mut events = Vec::new();
3765        let mut sessions_to_update = Vec::new();
3766
3767        // Collect sessions that need state machine advancement
3768        for (&session_id, session) in &self.coordination_sessions {
3769            if let Some(event) = self.should_advance_session(session, now) {
3770                sessions_to_update.push((session_id, event));
3771            }
3772        }
3773
3774        // Process session updates
3775        for (session_id, event) in sessions_to_update {
3776            let session_events =
3777                if let Some(session) = self.coordination_sessions.get_mut(&session_id) {
3778                    let peer_a = session.peer_a;
3779                    let peer_b = session.peer_b;
3780
3781                    match Self::advance_session_state_static(session, event, now) {
3782                        Ok(session_events) => session_events,
3783                        Err(e) => {
3784                            warn!("Failed to advance session {} state: {:?}", session_id, e);
3785                            // Mark session as failed
3786                            session.phase = CoordinationPhase::Failed;
3787                            vec![CoordinationSessionEvent::SessionFailed {
3788                                session_id,
3789                                peer_a,
3790                                peer_b,
3791                                reason: format!("State advancement error: {e:?}"),
3792                            }]
3793                        }
3794                    }
3795                } else {
3796                    Vec::new()
3797                };
3798
3799            events.extend(session_events);
3800        }
3801
3802        // Clean up completed or failed sessions
3803        self.cleanup_completed_sessions(now);
3804
3805        events
3806    }
3807
3808    /// Check if a session should advance its state
3809    fn should_advance_session(
3810        &self,
3811        session: &CoordinationSession,
3812        now: Instant,
3813    ) -> Option<SessionAdvancementEvent> {
3814        let session_age = now.duration_since(session.started_at);
3815
3816        match session.phase {
3817            CoordinationPhase::Requesting => {
3818                // Check if we've been waiting too long for peer responses
3819                if session_age > Duration::from_secs(10) {
3820                    Some(SessionAdvancementEvent::Timeout)
3821                } else if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3822                    Some(SessionAdvancementEvent::BothPeersReady)
3823                } else {
3824                    None
3825                }
3826            }
3827            CoordinationPhase::Coordinating => {
3828                // Move to preparing phase after brief coordination period
3829                if session_age > Duration::from_millis(500) {
3830                    Some(SessionAdvancementEvent::CoordinationComplete)
3831                } else {
3832                    None
3833                }
3834            }
3835            CoordinationPhase::Preparing => {
3836                // Move to punching phase after preparation period
3837                if session_age > Duration::from_secs(1) {
3838                    Some(SessionAdvancementEvent::PreparationComplete)
3839                } else {
3840                    None
3841                }
3842            }
3843            CoordinationPhase::Punching => {
3844                // Move to validation phase after punching period
3845                if session_age > Duration::from_secs(2) {
3846                    Some(SessionAdvancementEvent::PunchingComplete)
3847                } else {
3848                    None
3849                }
3850            }
3851            CoordinationPhase::Validating => {
3852                // Check for validation timeout
3853                if session_age > Duration::from_secs(10) {
3854                    Some(SessionAdvancementEvent::ValidationTimeout)
3855                } else {
3856                    None
3857                }
3858            }
3859            CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
3860                // Terminal states - check for cleanup
3861                if session_age > Duration::from_secs(60) {
3862                    Some(SessionAdvancementEvent::ReadyForCleanup)
3863                } else {
3864                    None
3865                }
3866            }
3867            CoordinationPhase::Idle => {
3868                // Should not happen in active sessions
3869                Some(SessionAdvancementEvent::Timeout)
3870            }
3871        }
3872    }
3873
3874    /// Advance session state based on event (static version to avoid borrowing issues)
3875    fn advance_session_state_static(
3876        session: &mut CoordinationSession,
3877        event: SessionAdvancementEvent,
3878        _now: Instant,
3879    ) -> Result<Vec<CoordinationSessionEvent>, NatTraversalError> {
3880        let mut events = Vec::new();
3881        let previous_phase = session.phase;
3882
3883        match (session.phase, event) {
3884            (CoordinationPhase::Requesting, SessionAdvancementEvent::BothPeersReady) => {
3885                session.phase = CoordinationPhase::Coordinating;
3886                debug!(
3887                    "Session {} advanced from Requesting to Coordinating",
3888                    session.session_id
3889                );
3890                events.push(CoordinationSessionEvent::PhaseChanged {
3891                    session_id: session.session_id,
3892                    old_phase: previous_phase,
3893                    new_phase: session.phase,
3894                });
3895            }
3896            (CoordinationPhase::Requesting, SessionAdvancementEvent::Timeout) => {
3897                session.phase = CoordinationPhase::Failed;
3898                warn!(
3899                    "Session {} timed out in Requesting phase",
3900                    session.session_id
3901                );
3902                events.push(CoordinationSessionEvent::SessionFailed {
3903                    session_id: session.session_id,
3904                    peer_a: session.peer_a,
3905                    peer_b: session.peer_b,
3906                    reason: "Timeout waiting for peer responses".to_string(),
3907                });
3908            }
3909            (CoordinationPhase::Coordinating, SessionAdvancementEvent::CoordinationComplete) => {
3910                session.phase = CoordinationPhase::Preparing;
3911                debug!(
3912                    "Session {} advanced from Coordinating to Preparing",
3913                    session.session_id
3914                );
3915                events.push(CoordinationSessionEvent::PhaseChanged {
3916                    session_id: session.session_id,
3917                    old_phase: previous_phase,
3918                    new_phase: session.phase,
3919                });
3920            }
3921            (CoordinationPhase::Preparing, SessionAdvancementEvent::PreparationComplete) => {
3922                session.phase = CoordinationPhase::Punching;
3923                debug!(
3924                    "Session {} advanced from Preparing to Punching",
3925                    session.session_id
3926                );
3927                events.push(CoordinationSessionEvent::PhaseChanged {
3928                    session_id: session.session_id,
3929                    old_phase: previous_phase,
3930                    new_phase: session.phase,
3931                });
3932                events.push(CoordinationSessionEvent::StartHolePunching {
3933                    session_id: session.session_id,
3934                    peer_a: session.peer_a,
3935                    peer_b: session.peer_b,
3936                    target_addresses: session.target_addresses.clone(),
3937                });
3938            }
3939            (CoordinationPhase::Punching, SessionAdvancementEvent::PunchingComplete) => {
3940                session.phase = CoordinationPhase::Validating;
3941                debug!(
3942                    "Session {} advanced from Punching to Validating",
3943                    session.session_id
3944                );
3945                events.push(CoordinationSessionEvent::PhaseChanged {
3946                    session_id: session.session_id,
3947                    old_phase: previous_phase,
3948                    new_phase: session.phase,
3949                });
3950            }
3951            (CoordinationPhase::Validating, SessionAdvancementEvent::ValidationTimeout) => {
3952                session.phase = CoordinationPhase::Failed;
3953                warn!("Session {} validation timed out", session.session_id);
3954                events.push(CoordinationSessionEvent::SessionFailed {
3955                    session_id: session.session_id,
3956                    peer_a: session.peer_a,
3957                    peer_b: session.peer_b,
3958                    reason: "Validation timeout".to_string(),
3959                });
3960            }
3961            (phase, SessionAdvancementEvent::ReadyForCleanup) => {
3962                debug!(
3963                    "Session {} ready for cleanup in phase {:?}",
3964                    session.session_id, phase
3965                );
3966                events.push(CoordinationSessionEvent::ReadyForCleanup {
3967                    session_id: session.session_id,
3968                });
3969            }
3970            _ => {
3971                // Invalid state transition - log warning but don't fail
3972                warn!(
3973                    "Invalid state transition for session {}: {:?} -> {:?}",
3974                    session.session_id, session.phase, event
3975                );
3976            }
3977        }
3978
3979        Ok(events)
3980    }
3981
3982    /// Clean up completed or failed sessions
3983    fn cleanup_completed_sessions(&mut self, now: Instant) {
3984        let cleanup_timeout = Duration::from_secs(300); // 5 minutes
3985
3986        let sessions_to_remove: Vec<CoordinationSessionId> = self
3987            .coordination_sessions
3988            .iter()
3989            .filter(|(_, session)| {
3990                matches!(
3991                    session.phase,
3992                    CoordinationPhase::Succeeded | CoordinationPhase::Failed
3993                ) && now.duration_since(session.started_at) > cleanup_timeout
3994            })
3995            .map(|(&session_id, _)| session_id)
3996            .collect();
3997
3998        for session_id in sessions_to_remove {
3999            if let Some(session) = self.coordination_sessions.remove(&session_id) {
4000                debug!(
4001                    "Cleaned up completed session {} in phase {:?}",
4002                    session_id, session.phase
4003                );
4004            }
4005        }
4006
4007        self.stats.active_sessions = self.coordination_sessions.len();
4008    }
4009
4010    /// Implement retry mechanism with exponential backoff
4011    ///
4012    /// This method handles retry logic for failed coordination attempts
4013    /// with exponential backoff to avoid overwhelming the network.
4014    pub(crate) fn retry_failed_coordination(
4015        &mut self,
4016        session_id: CoordinationSessionId,
4017        now: Instant,
4018    ) -> Result<bool, NatTraversalError> {
4019        let session = self
4020            .coordination_sessions
4021            .get_mut(&session_id)
4022            .ok_or(NatTraversalError::NoActiveCoordination)?;
4023
4024        // Check if session is in a retryable state
4025        if !matches!(session.phase, CoordinationPhase::Failed) {
4026            return Ok(false);
4027        }
4028
4029        // Calculate retry delay with exponential backoff
4030        let base_delay = Duration::from_secs(1);
4031        let max_delay = Duration::from_secs(60);
4032        let retry_count = session.stats.successful_coordinations; // Reuse this field for retry count
4033
4034        let delay = std::cmp::min(
4035            base_delay * 2_u32.pow(retry_count.min(10)), // Cap at 2^10 to prevent overflow
4036            max_delay,
4037        );
4038
4039        // Add jitter to prevent thundering herd
4040        let _jitter_factor = 0.1;
4041        let jitter =
4042            Duration::from_millis((rand::random::<u64>() % 100) * delay.as_millis() as u64 / 1000);
4043        let total_delay = delay + jitter;
4044
4045        // Check if enough time has passed for retry
4046        if now.duration_since(session.started_at) < total_delay {
4047            return Ok(false);
4048        }
4049
4050        // Check retry limits
4051        const MAX_RETRIES: u32 = 5;
4052        if retry_count >= MAX_RETRIES {
4053            warn!(
4054                "Session {} exceeded maximum retry attempts ({})",
4055                session_id, MAX_RETRIES
4056            );
4057            return Ok(false);
4058        }
4059
4060        // Reset session for retry
4061        session.phase = CoordinationPhase::Requesting;
4062        session.started_at = now;
4063        session.sync_state.peer_a_ready = false;
4064        session.sync_state.peer_b_ready = false;
4065        session.stats.successful_coordinations += 1; // Increment retry count
4066
4067        info!(
4068            "Retrying coordination session {} (attempt {})",
4069            session_id,
4070            retry_count + 1
4071        );
4072        Ok(true)
4073    }
4074
4075    /// Handle coordination errors with appropriate recovery strategies
4076    pub(crate) fn handle_coordination_error(
4077        &mut self,
4078        session_id: CoordinationSessionId,
4079        error: NatTraversalError,
4080        _now: Instant,
4081    ) -> CoordinationRecoveryAction {
4082        let session = match self.coordination_sessions.get_mut(&session_id) {
4083            Some(session) => session,
4084            None => return CoordinationRecoveryAction::NoAction,
4085        };
4086
4087        match error {
4088            NatTraversalError::RateLimitExceeded => {
4089                // Temporary error - retry with backoff
4090                warn!("Rate limit exceeded for session {}, will retry", session_id);
4091                CoordinationRecoveryAction::RetryWithBackoff
4092            }
4093            NatTraversalError::SecurityValidationFailed
4094            | NatTraversalError::SuspiciousCoordination => {
4095                // Security error - mark session as failed and don't retry
4096                session.phase = CoordinationPhase::Failed;
4097                warn!(
4098                    "Security validation failed for session {}, marking as failed",
4099                    session_id
4100                );
4101                CoordinationRecoveryAction::MarkAsFailed
4102            }
4103            NatTraversalError::InvalidAddress => {
4104                // Address error - might be temporary, allow limited retries
4105                warn!("Invalid address in session {}, allowing retry", session_id);
4106                CoordinationRecoveryAction::RetryWithBackoff
4107            }
4108            NatTraversalError::NoActiveCoordination => {
4109                // Session state error - clean up
4110                warn!(
4111                    "No active coordination for session {}, cleaning up",
4112                    session_id
4113                );
4114                CoordinationRecoveryAction::Cleanup
4115            }
4116            _ => {
4117                // Other errors - generic retry with backoff
4118                warn!(
4119                    "Coordination error for session {}: {:?}, will retry",
4120                    session_id, error
4121                );
4122                CoordinationRecoveryAction::RetryWithBackoff
4123            }
4124        }
4125    }
4126
4127    /// Estimate RTT to a specific peer based on observations
4128    fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
4129        // Simple estimation based on peer record
4130        // In a real implementation, this would use historical RTT data
4131        self.peer_registry
4132            .get(peer_id)
4133            .map(|_peer_record| Duration::from_millis(100))
4134    }
4135    /// Coordinate hole punching between two peers
4136    ///
4137    /// This method implements the core coordination logic for establishing
4138    /// direct P2P connections through NAT traversal.
4139    pub(crate) fn coordinate_hole_punching(
4140        &mut self,
4141        peer_a: PeerId,
4142        peer_b: PeerId,
4143        round: VarInt,
4144        now: Instant,
4145    ) -> Result<CoordinationSessionId, NatTraversalError> {
4146        // Validate that both peers are known and can coordinate
4147        let peer_a_record = self
4148            .peer_registry
4149            .get(&peer_a)
4150            .ok_or(NatTraversalError::UnknownCandidate)?;
4151        let peer_b_record = self
4152            .peer_registry
4153            .get(&peer_b)
4154            .ok_or(NatTraversalError::UnknownCandidate)?;
4155
4156        if !peer_a_record.can_coordinate || !peer_b_record.can_coordinate {
4157            return Err(NatTraversalError::InvalidCandidateState);
4158        }
4159
4160        // Generate unique session ID
4161        let session_id = self.generate_session_id();
4162
4163        // Create coordination session
4164        let session = CoordinationSession {
4165            session_id,
4166            peer_a,
4167            peer_b,
4168            current_round: round,
4169            started_at: now,
4170            phase: CoordinationPhase::Requesting,
4171            target_addresses: vec![
4172                (peer_a_record.observed_address, VarInt::from_u32(0)),
4173                (peer_b_record.observed_address, VarInt::from_u32(1)),
4174            ],
4175            sync_state: SynchronizationState {
4176                peer_a_ready: false,
4177                peer_b_ready: false,
4178            },
4179            stats: CoordinationSessionStats::default(),
4180        };
4181
4182        self.coordination_sessions.insert(session_id, session);
4183        self.stats.total_coordinations += 1;
4184        self.stats.active_sessions = self.coordination_sessions.len();
4185
4186        info!(
4187            "Started coordination session {} between peers {:?} and {:?} (round: {})",
4188            session_id,
4189            hex::encode(&peer_a[..8]),
4190            hex::encode(&peer_b[..8]),
4191            round
4192        );
4193
4194        Ok(session_id)
4195    }
4196
4197    /// Relay coordination frame between peers
4198    ///
4199    /// This method handles the relay of coordination messages between peers
4200    /// to facilitate synchronized hole punching.
4201    pub(crate) fn relay_coordination_frame(
4202        &mut self,
4203        session_id: CoordinationSessionId,
4204        from_peer: PeerId,
4205        frame: &crate::frame::PunchMeNow,
4206        _now: Instant,
4207    ) -> Result<Option<(PeerId, crate::frame::PunchMeNow)>, NatTraversalError> {
4208        let session = self
4209            .coordination_sessions
4210            .get_mut(&session_id)
4211            .ok_or(NatTraversalError::NoActiveCoordination)?;
4212
4213        // Validate that the sender is part of this session
4214        if session.peer_a != from_peer && session.peer_b != from_peer {
4215            return Err(NatTraversalError::SuspiciousCoordination);
4216        }
4217
4218        // Determine target peer
4219        let target_peer = if session.peer_a == from_peer {
4220            session.peer_b
4221        } else {
4222            session.peer_a
4223        };
4224
4225        // Get target peer's observed address
4226        let target_record = self
4227            .peer_registry
4228            .get(&target_peer)
4229            .ok_or(NatTraversalError::UnknownCandidate)?;
4230
4231        // Update session state based on frame
4232        if session.peer_a == from_peer {
4233            session.sync_state.peer_a_ready = true;
4234        } else {
4235            session.sync_state.peer_b_ready = true;
4236        }
4237
4238        // Create relay frame with target peer's information
4239        let relay_frame = crate::frame::PunchMeNow {
4240            round: frame.round,
4241            paired_with_sequence_number: frame.paired_with_sequence_number,
4242            address: target_record.observed_address,
4243            target_peer_id: Some(from_peer),
4244        };
4245
4246        // Check if coordination is complete
4247        if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4248            session.phase = CoordinationPhase::Coordinating;
4249            info!(
4250                "Coordination phase complete for session {} - both peers ready",
4251                session_id
4252            );
4253        }
4254
4255        debug!(
4256            "Relaying coordination frame from {:?} to {:?} in session {}",
4257            hex::encode(&from_peer[..8]),
4258            hex::encode(&target_peer[..8]),
4259            session_id
4260        );
4261
4262        Ok(Some((target_peer, relay_frame)))
4263    }
4264
4265    /// Implement round-based synchronization protocol
4266    ///
4267    /// This method manages the timing and synchronization of hole punching rounds
4268    /// to maximize the chances of successful NAT traversal.
4269    pub(crate) fn advance_coordination_round(
4270        &mut self,
4271        session_id: CoordinationSessionId,
4272        now: Instant,
4273    ) -> Result<CoordinationPhase, NatTraversalError> {
4274        let session = self
4275            .coordination_sessions
4276            .get_mut(&session_id)
4277            .ok_or(NatTraversalError::NoActiveCoordination)?;
4278
4279        let previous_phase = session.phase;
4280
4281        // Advance the state machine based on current phase and timing
4282        match session.phase {
4283            CoordinationPhase::Requesting => {
4284                // Wait for both peers to send PUNCH_ME_NOW frames
4285                if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4286                    session.phase = CoordinationPhase::Coordinating;
4287                    debug!("Session {} advanced to Coordinating phase", session_id);
4288                }
4289            }
4290            CoordinationPhase::Coordinating => {
4291                // Calculate synchronized punch time
4292                let coordination_delay = Duration::from_millis(200); // Grace period
4293                let punch_time = now + coordination_delay;
4294
4295                session.phase = CoordinationPhase::Preparing;
4296                debug!(
4297                    "Session {} advanced to Preparing phase, punch time: {:?}",
4298                    session_id, punch_time
4299                );
4300            }
4301            CoordinationPhase::Preparing => {
4302                // Transition to active hole punching
4303                session.phase = CoordinationPhase::Punching;
4304                debug!("Session {} advanced to Punching phase", session_id);
4305            }
4306            CoordinationPhase::Punching => {
4307                // Wait for validation results
4308                session.phase = CoordinationPhase::Validating;
4309                debug!("Session {} advanced to Validating phase", session_id);
4310            }
4311            CoordinationPhase::Validating => {
4312                // Check for timeout or success
4313                let validation_timeout = Duration::from_secs(5);
4314                if now.duration_since(session.started_at) > validation_timeout {
4315                    session.phase = CoordinationPhase::Failed;
4316                    debug!("Session {} timed out in validation", session_id);
4317                }
4318            }
4319            CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
4320                // Terminal states - no further advancement
4321            }
4322            CoordinationPhase::Idle => {
4323                // Should not happen in active sessions
4324                session.phase = CoordinationPhase::Requesting;
4325            }
4326        }
4327
4328        // Update statistics if phase changed
4329        if session.phase != previous_phase {
4330            match session.phase {
4331                CoordinationPhase::Succeeded => {
4332                    session.stats.successful_coordinations += 1;
4333                    self.stats.successful_coordinations += 1;
4334                }
4335                CoordinationPhase::Failed => {
4336                    // Update failure statistics
4337                }
4338                _ => {}
4339            }
4340        }
4341
4342        Ok(session.phase)
4343    }
4344
4345    /// Get coordination session by ID
4346    pub(crate) fn get_coordination_session(
4347        &self,
4348        session_id: CoordinationSessionId,
4349    ) -> Option<&CoordinationSession> {
4350        self.coordination_sessions.get(&session_id)
4351    }
4352
4353    /// Get mutable coordination session by ID
4354    pub(crate) fn get_coordination_session_mut(
4355        &mut self,
4356        session_id: CoordinationSessionId,
4357    ) -> Option<&mut CoordinationSession> {
4358        self.coordination_sessions.get_mut(&session_id)
4359    }
4360
4361    /// Mark coordination session as successful
4362    pub(crate) fn mark_coordination_success(
4363        &mut self,
4364        session_id: CoordinationSessionId,
4365        _now: Instant,
4366    ) -> Result<(), NatTraversalError> {
4367        let session = self
4368            .coordination_sessions
4369            .get_mut(&session_id)
4370            .ok_or(NatTraversalError::NoActiveCoordination)?;
4371
4372        session.phase = CoordinationPhase::Succeeded;
4373        session.stats.successful_coordinations += 1;
4374        self.stats.successful_coordinations += 1;
4375
4376        // Update peer success rates
4377        if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4378            peer_a_record.coordination_count += 1;
4379            peer_a_record.success_rate =
4380                (peer_a_record.success_rate * (peer_a_record.coordination_count - 1) as f64 + 1.0)
4381                    / peer_a_record.coordination_count as f64;
4382        }
4383
4384        if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4385            peer_b_record.coordination_count += 1;
4386            peer_b_record.success_rate =
4387                (peer_b_record.success_rate * (peer_b_record.coordination_count - 1) as f64 + 1.0)
4388                    / peer_b_record.coordination_count as f64;
4389        }
4390
4391        info!("Coordination session {} marked as successful", session_id);
4392        Ok(())
4393    }
4394
4395    /// Mark coordination session as failed
4396    pub(crate) fn mark_coordination_failure(
4397        &mut self,
4398        session_id: CoordinationSessionId,
4399        reason: &str,
4400        _now: Instant,
4401    ) -> Result<(), NatTraversalError> {
4402        let session = self
4403            .coordination_sessions
4404            .get_mut(&session_id)
4405            .ok_or(NatTraversalError::NoActiveCoordination)?;
4406
4407        session.phase = CoordinationPhase::Failed;
4408
4409        // Update peer success rates
4410        if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4411            peer_a_record.coordination_count += 1;
4412            peer_a_record.success_rate = (peer_a_record.success_rate
4413                * (peer_a_record.coordination_count - 1) as f64)
4414                / peer_a_record.coordination_count as f64;
4415        }
4416
4417        if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4418            peer_b_record.coordination_count += 1;
4419            peer_b_record.success_rate = (peer_b_record.success_rate
4420                * (peer_b_record.coordination_count - 1) as f64)
4421                / peer_b_record.coordination_count as f64;
4422        }
4423
4424        warn!("Coordination session {} failed: {}", session_id, reason);
4425        Ok(())
4426    }
4427
4428    /// Get peer observation record
4429    pub(crate) fn get_peer_record(&self, peer_id: PeerId) -> Option<&PeerObservationRecord> {
4430        self.peer_registry.get(&peer_id)
4431    }
4432}
4433
// Multi-destination packet transmission for NAT traversal (not yet implemented)
//
// A future component would send packets to several candidate addresses at the
// same time during hole-punching attempts. Probing every viable destination
// concurrently improves the odds of a successful NAT traversal.
// TODO: Implement this multi-path transmission infrastructure (e.g. a
// `MultiDestinationTransmitter`) when it is needed.
// TODO: Fix the imports in the nat_traversal_tests module, then re-enable it:
// #[cfg(test)]
// #[path = "nat_traversal_tests.rs"]
// mod tests;
4446
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a state with room for 10 candidates and a 30 s coordination timeout.
    fn create_test_state(role: NatTraversalRole) -> NatTraversalState {
        let max_candidates = 10;
        let coordination_timeout = Duration::from_secs(30);
        NatTraversalState::new(role, max_candidates, coordination_timeout)
    }

    /// Candidate source for addresses learned through QUIC address observation.
    fn observed() -> CandidateSource {
        CandidateSource::Observed { by_node: None }
    }

    #[test]
    fn test_add_quic_discovered_address() {
        // An observed address is stored as a new local candidate with a real priority.
        let mut state = create_test_state(NatTraversalRole::Client);
        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5678));

        let seq = state.add_local_candidate(discovered_addr, observed(), Instant::now());

        assert_eq!(state.local_candidates.len(), 1);
        let candidate = &state.local_candidates[&seq];
        assert_eq!(candidate.address, discovered_addr);
        assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
        assert_eq!(candidate.state, CandidateState::New);
        // Server-reflexive candidates must carry a non-zero priority.
        assert!(candidate.priority > 0);
    }

    #[test]
    fn test_add_multiple_quic_discovered_addresses() {
        // Every observed address (v4 and v6) gets its own local candidate entry.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();
        let addrs = [
            SocketAddr::from(([1, 2, 3, 4], 5678)),
            SocketAddr::from(([5, 6, 7, 8], 9012)),
            SocketAddr::from(([2001, 0xdb8, 0, 0, 0, 0, 0, 1], 443)),
        ];

        let sequences: Vec<_> = addrs
            .iter()
            .map(|&addr| state.add_local_candidate(addr, observed(), now))
            .collect();

        assert_eq!(state.local_candidates.len(), 3);
        for (seq, expected) in sequences.into_iter().zip(addrs.iter()) {
            let candidate = &state.local_candidates[&seq];
            assert_eq!(candidate.address, *expected);
            assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
        }
    }

    #[test]
    fn test_quic_discovered_addresses_in_local_candidates() {
        // Observed addresses are advertised through local_candidates.
        let mut state = create_test_state(NatTraversalRole::Client);
        let addr = SocketAddr::from(([192, 168, 1, 100], 5000));

        let seq = state.add_local_candidate(addr, observed(), Instant::now());

        assert!(state.local_candidates.contains_key(&seq));
        let candidate = &state.local_candidates[&seq];
        assert_eq!(candidate.address, addr);
        assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
    }

    #[test]
    fn test_quic_discovered_addresses_included_in_hole_punching() {
        // One observed local address and one remote candidate form exactly one pair.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();
        let local_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        // Public address on purpose; documentation ranges may be rejected.
        let remote_addr = SocketAddr::from(([1, 2, 3, 4], 6000));

        state.add_local_candidate(local_addr, observed(), now);
        state
            .add_remote_candidate(VarInt::from_u32(1), remote_addr, VarInt::from_u32(100), now)
            .expect("add remote candidate should succeed");
        state.generate_candidate_pairs(now);

        assert_eq!(state.candidate_pairs.len(), 1);
        let only_pair = &state.candidate_pairs[0];
        assert_eq!(only_pair.local_addr, local_addr);
        assert_eq!(only_pair.remote_addr, remote_addr);
    }

    #[test]
    fn test_prioritize_quic_discovered_over_predicted() {
        // Observed (server-reflexive) candidates must not rank below predicted ones.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let predicted_seq = state.add_local_candidate(
            SocketAddr::from(([1, 2, 3, 4], 5000)),
            CandidateSource::Predicted,
            now,
        );
        let discovered_seq =
            state.add_local_candidate(SocketAddr::from(([1, 2, 3, 4], 5001)), observed(), now);

        let predicted_priority = state.local_candidates[&predicted_seq].priority;
        let discovered_priority = state.local_candidates[&discovered_seq].priority;
        assert!(discovered_priority >= predicted_priority);
    }

    #[test]
    fn test_integration_with_nat_traversal_flow() {
        // Two local candidates (interface + observed) x two remotes => four pairs.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let local_addr = SocketAddr::from(([192, 168, 1, 2], 5000));
        let discovered_addr = SocketAddr::from(([44, 55, 66, 77], 5000));
        state.add_local_candidate(local_addr, CandidateSource::Local, now);
        state.add_local_candidate(discovered_addr, observed(), now);

        // Remote candidates use public addresses so validation accepts them.
        let priority = VarInt::from_u32(100);
        let remotes = [
            (1, SocketAddr::from(([93, 184, 215, 123], 6000))),
            (2, SocketAddr::from(([172, 217, 16, 34], 7000))),
        ];
        for (seq, addr) in remotes.iter() {
            state
                .add_remote_candidate(VarInt::from_u32(*seq), *addr, priority, now)
                .expect("add remote candidate should succeed");
        }

        state.generate_candidate_pairs(now);

        assert_eq!(state.candidate_pairs.len(), 4);
        let discovered_pair_count = state
            .candidate_pairs
            .iter()
            .filter(|pair| pair.local_addr == discovered_addr)
            .count();
        assert_eq!(discovered_pair_count, 2);
    }
}