ant_quic/connection/
nat_traversal.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
4    time::Duration,
5};
6
7use crate::shared::ConnectionId;
8use tracing::{debug, info, trace, warn};
9
10use crate::{Instant, VarInt};
11
/// NAT traversal state for a QUIC connection
///
/// This manages address candidate discovery, validation, and coordination
/// for establishing direct P2P connections through NATs.
///
/// Every field is `pub(super)`, so the parent module reads and mutates this
/// state directly rather than through accessors.
#[derive(Debug)]
pub(super) struct NatTraversalState {
    /// Our role in NAT traversal (from transport parameters)
    pub(super) role: NatTraversalRole,
    /// Candidate addresses we've advertised to the peer
    /// (keyed by `VarInt`; presumably the advertisement sequence number — TODO confirm)
    pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
    /// Candidate addresses received from the peer
    /// (keyed by `VarInt`; presumably the peer's sequence number — TODO confirm)
    pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
    /// Generated candidate pairs for connectivity testing
    pub(super) candidate_pairs: Vec<CandidatePair>,
    /// Index for fast pair lookup by remote address (maintained during generation)
    /// (the `usize` value is presumably a position in `candidate_pairs` — TODO confirm)
    pub(super) pair_index: HashMap<SocketAddr, usize>,
    /// Currently active path validation attempts, keyed by socket address
    pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
    /// Coordination state for simultaneous hole punching (`None` when no round is tracked)
    pub(super) coordination: Option<CoordinationState>,
    /// Sequence number for address advertisements
    pub(super) next_sequence: VarInt,
    /// Maximum candidates we're willing to handle
    pub(super) max_candidates: u32,
    /// Timeout for coordination rounds
    pub(super) coordination_timeout: Duration,
    /// Statistics for this NAT traversal session
    pub(super) stats: NatTraversalStats,
    /// Security validation state (rate limiting and address checks)
    pub(super) security_state: SecurityValidationState,
    /// Network condition monitoring for adaptive timeouts
    pub(super) network_monitor: NetworkConditionMonitor,
    /// Resource management and cleanup coordinator
    pub(super) resource_manager: ResourceCleanupCoordinator,
    /// Bootstrap coordinator (only for Bootstrap role)
    pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
    /// Multi-destination packet transmission manager
    #[allow(dead_code)] // Part of multi-path transmission infrastructure
    pub(super) multi_dest_transmitter: MultiDestinationTransmitter,
}
52
/// Role in NAT traversal coordination
///
/// A small `Copy` value; `NatTraversalState::role` stores one of these.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NatTraversalRole {
    /// Client endpoint (initiates connections, on-demand)
    Client,
    /// Server endpoint (accepts connections, always reachable)
    ///
    /// `can_relay` presumably records whether this server may relay for
    /// other peers — TODO confirm against the transport-parameter handling.
    Server { can_relay: bool },
    /// Bootstrap/relay endpoint (publicly reachable, coordinates traversal)
    Bootstrap,
}
63
/// Address candidate with metadata
///
/// Values of this type are stored in `NatTraversalState::local_candidates`
/// and `NatTraversalState::remote_candidates`.
#[derive(Debug, Clone)]
pub(super) struct AddressCandidate {
    /// The socket address
    pub(super) address: SocketAddr,
    /// Priority for ICE-like selection (higher = better)
    pub(super) priority: u32,
    /// How this candidate was discovered
    pub(super) source: CandidateSource,
    /// When this candidate was first learned
    pub(super) discovered_at: Instant,
    /// Current state of this candidate
    pub(super) state: CandidateState,
    /// Number of validation attempts for this candidate
    pub(super) attempt_count: u32,
    /// Last validation attempt time (`None` until an attempt has been recorded)
    pub(super) last_attempt: Option<Instant>,
}
82
/// How an address candidate was discovered
///
/// `classify_candidate_type` maps each source onto a `CandidateType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateSource {
    /// Local network interface
    Local,
    /// Observed by a bootstrap node
    ///
    /// `by_node` optionally identifies the observer (the meaning of the
    /// `VarInt` is not visible here — TODO confirm).
    Observed { by_node: Option<VarInt> },
    /// Received from peer via AddAddress frame
    Peer,
    /// Generated prediction for symmetric NAT
    Predicted,
}
95
/// Current state of a candidate address
///
/// Stored in `AddressCandidate::state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateState {
    /// Newly discovered, not yet tested
    New,
    /// Currently being validated
    Validating,
    /// Successfully validated and usable
    Valid,
    /// Validation failed
    Failed,
    /// Removed by peer or expired
    Removed,
}
110
/// State of an individual path validation attempt
///
/// `NatTraversalState::active_validations` holds one of these per socket address.
#[derive(Debug)]
pub(super) struct PathValidationState {
    /// Challenge value sent
    pub(super) challenge: u64,
    /// When the challenge was sent
    pub(super) sent_at: Instant,
    /// Number of retransmissions
    pub(super) retry_count: u32,
    /// Maximum retries allowed
    pub(super) max_retries: u32,
    /// Associated with a coordination round (if any)
    #[allow(dead_code)] // Used for coordination tracking
    pub(super) coordination_round: Option<VarInt>,
    /// Adaptive timeout state
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Last retry attempt time (`None` until a retry has been recorded)
    pub(super) last_retry_at: Option<Instant>,
}
130
/// Coordination state for simultaneous hole punching
///
/// Held in `NatTraversalState::coordination`; describes a single round.
#[derive(Debug)]
pub(super) struct CoordinationState {
    /// Current coordination round number
    pub(super) round: VarInt,
    /// Addresses we're punching to in this round
    pub(super) punch_targets: Vec<PunchTarget>,
    /// When this round started (coordination phase)
    pub(super) round_start: Instant,
    /// When hole punching should begin (synchronized time)
    pub(super) punch_start: Instant,
    /// Duration of this coordination round
    #[allow(dead_code)] // Used for timing coordination rounds
    pub(super) round_duration: Duration,
    /// Current state of this coordination round
    pub(super) state: CoordinationPhase,
    /// Whether we've sent our PUNCH_ME_NOW to coordinator
    pub(super) punch_request_sent: bool,
    /// Whether we've received peer's PUNCH_ME_NOW via coordinator
    pub(super) peer_punch_received: bool,
    /// Retry count for this round
    pub(super) retry_count: u32,
    /// Maximum retries before giving up
    pub(super) max_retries: u32,
    /// Adaptive timeout state for coordination
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Last retry attempt time (`None` until a retry has been recorded)
    pub(super) last_retry_at: Option<Instant>,
}
160
/// Phases of the coordination protocol
///
/// Stored in `CoordinationState::state`. The variants are listed in the
/// order the comments below describe; the transitions themselves are not
/// defined in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)] // All variants needed for complete state machine
pub(crate) enum CoordinationPhase {
    /// Waiting to start coordination
    Idle,
    /// Sending PUNCH_ME_NOW to coordinator
    Requesting,
    /// Waiting for peer's PUNCH_ME_NOW via coordinator
    Coordinating,
    /// Grace period before synchronized hole punching
    Preparing,
    /// Actively sending PATH_CHALLENGE packets
    Punching,
    /// Waiting for PATH_RESPONSE validation
    Validating,
    /// This round completed successfully
    Succeeded,
    /// This round failed, may retry
    Failed,
}
182
/// Target for hole punching in a coordination round
///
/// Collected in `CoordinationState::punch_targets`.
#[derive(Debug, Clone)]
pub(super) struct PunchTarget {
    /// Remote address to punch to
    pub(super) remote_addr: SocketAddr,
    /// Sequence number of the remote candidate
    // NOTE(review): the reason for this `allow` is not recorded here — add one or remove it.
    #[allow(dead_code)]
    pub(super) remote_sequence: VarInt,
    /// Challenge value for validation
    pub(super) challenge: u64,
}
194
/// Actions to take when handling NAT traversal timeouts
///
/// A plain, field-less enum; comparable with `==` via the derived `PartialEq`/`Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum TimeoutAction {
    /// Retry candidate discovery
    RetryDiscovery,
    /// Retry coordination with bootstrap node
    RetryCoordination,
    /// Start path validation for discovered candidates
    StartValidation,
    /// NAT traversal completed successfully
    Complete,
    /// NAT traversal failed
    Failed,
}
209
/// Target for multi-destination hole punching transmission
///
/// The fields are declared `pub`, but the struct itself is `pub(super)`, so
/// they are reachable only where the struct is.
#[derive(Debug, Clone)]
#[allow(dead_code)] // Fields tracked for debugging and future multi-path optimization
pub(super) struct MultiDestPunchTarget {
    /// Destination address to send packets to
    pub destination: SocketAddr,
    /// Local address to send from
    pub local_addr: SocketAddr,
    /// Type of candidate pair
    pub pair_type: PairType,
    /// Priority of this target
    pub priority: u32,
    /// When this target was created
    pub created_at: Instant,
}
225
/// Candidate pair for ICE-like connectivity testing
///
/// Stored in `NatTraversalState::candidate_pairs`.
#[derive(Debug, Clone)]
pub(super) struct CandidatePair {
    /// Sequence of remote candidate
    pub(super) remote_sequence: VarInt,
    /// Our local address for this pair
    pub(super) local_addr: SocketAddr,
    /// Remote address we're testing connectivity to
    pub(super) remote_addr: SocketAddr,
    /// Combined priority for pair ordering (higher = better)
    /// (presumably the value returned by `calculate_pair_priority` — TODO confirm)
    pub(super) priority: u64,
    /// Current state of this pair
    pub(super) state: PairState,
    /// Type classification for this pair
    pub(super) pair_type: PairType,
    /// When this pair was created
    pub(super) created_at: Instant,
    /// When validation was last attempted
    #[allow(dead_code)] // Used for retry timing
    pub(super) last_check: Option<Instant>,
}
247
/// State of a candidate pair during validation
///
/// Stored in `CandidatePair::state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum PairState {
    /// Waiting to be tested
    Waiting,
    /// Validation succeeded - this pair works
    Succeeded,
    /// Validation failed
    #[allow(dead_code)] // Will be used when implementing pair retry logic
    Failed,
    /// Temporarily frozen (waiting for other pairs)
    Frozen,
}
261
/// Type classification for candidate pairs (based on ICE)
///
/// Produced by `classify_pair_type` from the local and remote
/// `CandidateType`. Derives `Hash`, so it can be used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum PairType {
    /// Both candidates are on local network
    HostToHost,
    /// Local is host, remote is server reflexive (through NAT)
    HostToServerReflexive,
    /// Local is server reflexive, remote is host
    ServerReflexiveToHost,
    /// Both are server reflexive (both behind NAT)
    ServerReflexiveToServerReflexive,
    /// One side is peer reflexive (learned from peer)
    PeerReflexive,
}
276
/// Type of address candidate (following ICE terminology)
///
/// Returned by `classify_candidate_type` and consumed by
/// `calculate_candidate_priority` and `classify_pair_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CandidateType {
    /// Host candidate - directly reachable local interface
    Host,
    /// Server reflexive - public address observed by bootstrap node
    ServerReflexive,
    /// Peer reflexive - address learned from incoming packets
    PeerReflexive,
}
287
288/// Calculate ICE-like priority for an address candidate
289/// Based on RFC 8445 Section 5.1.2.1
290fn calculate_candidate_priority(
291    candidate_type: CandidateType,
292    local_preference: u16,
293    component_id: u8,
294) -> u32 {
295    let type_preference = match candidate_type {
296        CandidateType::Host => 126,
297        CandidateType::PeerReflexive => 110,
298        CandidateType::ServerReflexive => 100,
299    };
300
301    // ICE priority formula: (2^24 * type_pref) + (2^8 * local_pref) + component_id
302    (1u32 << 24) * type_preference + (1u32 << 8) * local_preference as u32 + component_id as u32
303}
304
/// Calculate combined priority for a candidate pair
/// Based on RFC 8445 Section 6.1.2.3
///
/// Formula: `2^32 * MIN(G, D) + 2 * MAX(G, D) + (G > D ? 1 : 0)`, where `G`
/// is `local_priority` and `D` is `remote_priority`.
///
/// The formula only fits in 64 bits when the inputs stay well below
/// `u32::MAX`. For inputs near `u32::MAX` the plain sum exceeds `u64::MAX`,
/// which would panic in debug builds and wrap in release builds. The
/// additions therefore saturate at `u64::MAX`; every input that did not
/// overflow before yields exactly the same value as the plain formula.
fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
    let g = u64::from(local_priority);
    let d = u64::from(remote_priority);

    let lower = g.min(d);
    let higher = g.max(d);
    let tie_breaker = u64::from(g > d);

    // `lower << 32` and `2 * higher` each fit in a u64 on their own
    // (lower, higher <= u32::MAX); only their sum can exceed u64::MAX.
    (lower << 32)
        .saturating_add(2 * higher)
        .saturating_add(tie_breaker)
}
314
315/// Determine candidate type from source information
316fn classify_candidate_type(source: CandidateSource) -> CandidateType {
317    match source {
318        CandidateSource::Local => CandidateType::Host,
319        CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
320        CandidateSource::Peer => CandidateType::PeerReflexive,
321        CandidateSource::Predicted => CandidateType::ServerReflexive, // Symmetric NAT prediction
322    }
323}
324
325/// Determine pair type from individual candidate types
326fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
327    match (local_type, remote_type) {
328        (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
329        (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
330        (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
331        (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => {
332            PairType::ServerReflexiveToServerReflexive
333        }
334        (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => {
335            PairType::PeerReflexive
336        }
337    }
338}
339
340/// Check if two candidates are compatible for pairing
341fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
342    // Must be same address family (IPv4 with IPv4, IPv6 with IPv6)
343    match (local.address, remote.address) {
344        (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
345        (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
346        _ => false, // No IPv4/IPv6 mixing for now
347    }
348}
349
/// Statistics for NAT traversal attempts
///
/// Every counter is a `u32`; the derived `Default` starts them all at zero.
/// Stored in `NatTraversalState::stats`.
#[derive(Debug, Default, Clone)]
pub(crate) struct NatTraversalStats {
    /// Total candidates received from peer
    pub(super) remote_candidates_received: u32,
    /// Total candidates we've advertised
    pub(super) local_candidates_sent: u32,
    /// Successful validations
    pub(super) validations_succeeded: u32,
    /// Failed validations
    #[allow(dead_code)] // Tracked for statistics
    pub(super) validations_failed: u32,
    /// Coordination rounds attempted
    pub(super) coordination_rounds: u32,
    /// Successful coordinations
    #[allow(dead_code)] // Tracked for success rate calculation
    pub(super) successful_coordinations: u32,
    /// Failed coordinations
    #[allow(dead_code)] // Tracked for failure analysis
    pub(super) failed_coordinations: u32,
    /// Timed out coordinations
    #[allow(dead_code)] // Tracked for timeout optimization
    pub(super) timed_out_coordinations: u32,
    /// Coordination failures due to poor network conditions
    pub(super) coordination_failures: u32,
    /// Successful direct connections established
    pub(super) direct_connections: u32,
    /// Security validation rejections
    pub(super) security_rejections: u32,
    /// Rate limiting violations
    pub(super) rate_limit_violations: u32,
    /// Invalid address rejections
    pub(super) invalid_address_rejections: u32,
    /// Suspicious coordination attempts
    pub(super) suspicious_coordination_attempts: u32,
}
386
/// Security validation state for rate limiting and attack detection
///
/// `new()` configures 20 candidate additions and 5 coordination requests per
/// 60-second window, with a 300-second cache timeout. Both rate trackers
/// share the single `rate_window`.
#[derive(Debug)]
pub(super) struct SecurityValidationState {
    /// Rate limiting: track candidate additions per time window
    /// (timestamps are appended at the back and evicted from the front)
    candidate_rate_tracker: VecDeque<Instant>,
    /// Maximum candidates per time window
    max_candidates_per_window: u32,
    /// Rate limiting time window
    rate_window: Duration,
    /// Coordination request tracking for suspicious patterns
    coordination_requests: VecDeque<CoordinationRequest>,
    /// Maximum coordination requests per time window
    max_coordination_per_window: u32,
    /// Address validation cache to avoid repeated validation
    // NOTE(review): entries carry no timestamp, and `validate_address` returns
    // a cached verdict without consulting `validation_cache_timeout`.
    address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
    /// Cache timeout for address validation
    #[allow(dead_code)] // Used for cache expiry
    validation_cache_timeout: Duration,
}
406
/// Coordination request tracking for security analysis
///
/// Queued in `SecurityValidationState::coordination_requests`; the timestamp
/// is what `cleanup_coordination_tracker` compares against the rate window.
#[derive(Debug, Clone)]
struct CoordinationRequest {
    /// When the request was made
    timestamp: Instant,
}
413
/// Result of address validation
///
/// Returned by `validate_address` and cached per `SocketAddr`.
/// `enhanced_address_validation` maps `Invalid` to
/// `NatTraversalError::InvalidAddress` and `Suspicious` to
/// `NatTraversalError::SuspiciousCoordination`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressValidationResult {
    /// Address is valid and safe
    Valid,
    /// Address is invalid (malformed, reserved, etc.)
    Invalid,
    /// Address is suspicious (potential attack)
    Suspicious,
}
424
/// Adaptive timeout state for network condition awareness
///
/// Embedded as `timeout_state` in both `PathValidationState` and
/// `CoordinationState`. All fields are private to this module; the update
/// rules are not part of this section of the file.
#[derive(Debug, Clone)]
pub(super) struct AdaptiveTimeoutState {
    /// Current timeout value
    current_timeout: Duration,
    /// Minimum allowed timeout
    min_timeout: Duration,
    /// Maximum allowed timeout
    max_timeout: Duration,
    /// Base timeout for exponential backoff
    base_timeout: Duration,
    /// Current backoff multiplier
    backoff_multiplier: f64,
    /// Maximum backoff multiplier
    max_backoff_multiplier: f64,
    /// Jitter factor for randomization
    jitter_factor: f64,
    /// Smoothed round-trip time estimation (`None` until a sample exists)
    srtt: Option<Duration>,
    /// Round-trip time variance (`None` until a sample exists)
    rttvar: Option<Duration>,
    /// Last successful round-trip time
    last_rtt: Option<Duration>,
    /// Number of consecutive timeouts
    consecutive_timeouts: u32,
    /// Number of successful responses
    successful_responses: u32,
}
453
/// Network condition monitoring for adaptive behavior
///
/// Stored in `NatTraversalState::network_monitor`.
#[derive(Debug)]
pub(super) struct NetworkConditionMonitor {
    /// Recent round-trip time measurements
    rtt_samples: VecDeque<Duration>,
    /// Maximum samples to keep
    max_samples: usize,
    /// Packet loss rate estimation
    /// (presumably a fraction in 0.0..=1.0 — TODO confirm)
    packet_loss_rate: f64,
    /// Congestion window estimate
    #[allow(dead_code)] // Used for adaptive timeout calculations
    congestion_window: u32,
    /// Network quality score (0.0 = poor, 1.0 = excellent)
    quality_score: f64,
    /// Last quality update time
    last_quality_update: Instant,
    /// Quality measurement interval
    quality_update_interval: Duration,
    /// Timeout statistics
    timeout_stats: TimeoutStatistics,
}
475
/// Statistics for timeout behavior
///
/// Embedded in `NetworkConditionMonitor::timeout_stats`. The derived
/// `Default` yields zero counters, a zero `Duration`, a `0.0` rate and no
/// last-update time.
#[derive(Debug, Default)]
struct TimeoutStatistics {
    /// Total timeout events
    total_timeouts: u64,
    /// Total successful responses
    total_responses: u64,
    /// Average response time
    avg_response_time: Duration,
    /// Timeout rate (0.0 = no timeouts, 1.0 = all timeouts)
    timeout_rate: f64,
    /// Last update time (`None` until the first update)
    last_update: Option<Instant>,
}
490
491impl SecurityValidationState {
492    /// Create new security validation state with default settings
493    fn new() -> Self {
494        Self {
495            candidate_rate_tracker: VecDeque::new(),
496            max_candidates_per_window: 20, // Max 20 candidates per 60 seconds
497            rate_window: Duration::from_secs(60),
498            coordination_requests: VecDeque::new(),
499            max_coordination_per_window: 5, // Max 5 coordination requests per 60 seconds
500            address_validation_cache: HashMap::new(),
501            validation_cache_timeout: Duration::from_secs(300), // 5 minute cache
502        }
503    }
504
505    /// Create new security validation state with custom rate limits
506    #[allow(dead_code)] // Will be used for custom security configurations
507    fn new_with_limits(
508        max_candidates_per_window: u32,
509        max_coordination_per_window: u32,
510        rate_window: Duration,
511    ) -> Self {
512        Self {
513            candidate_rate_tracker: VecDeque::new(),
514            max_candidates_per_window,
515            rate_window,
516            coordination_requests: VecDeque::new(),
517            max_coordination_per_window,
518            address_validation_cache: HashMap::new(),
519            validation_cache_timeout: Duration::from_secs(300),
520        }
521    }
522
523    /// Enhanced rate limiting with adaptive thresholds
524    ///
525    /// This implements adaptive rate limiting that adjusts based on network conditions
526    /// and detected attack patterns to prevent flooding while maintaining usability.
527    fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
528        // Clean up old entries first
529        self.cleanup_rate_tracker(now);
530        self.cleanup_coordination_tracker(now);
531
532        // Calculate current request rate
533        let _current_candidate_rate =
534            self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
535        let _current_coordination_rate =
536            self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
537
538        // Adaptive threshold based on peer behavior
539        let peer_reputation = self.calculate_peer_reputation(peer_id);
540        let adaptive_candidate_limit =
541            (self.max_candidates_per_window as f64 * peer_reputation) as u32;
542        let adaptive_coordination_limit =
543            (self.max_coordination_per_window as f64 * peer_reputation) as u32;
544
545        // Check if either limit is exceeded
546        if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
547            debug!(
548                "Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
549                hex::encode(&peer_id[..8]),
550                self.candidate_rate_tracker.len(),
551                adaptive_candidate_limit
552            );
553            return true;
554        }
555
556        if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
557            debug!(
558                "Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
559                hex::encode(&peer_id[..8]),
560                self.coordination_requests.len(),
561                adaptive_coordination_limit
562            );
563            return true;
564        }
565
566        false
567    }
568
569    /// Calculate peer reputation score (0.0 = bad, 1.0 = good)
570    ///
571    /// This implements a simple reputation system to adjust rate limits
572    /// based on peer behavior patterns.
573    fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
574        // Simplified reputation calculation
575        // In production, this would track:
576        // - Historical success rates
577        // - Suspicious behavior patterns
578        // - Coordination completion rates
579        // - Address validation failures
580
581        // For now, return a default good reputation
582        // This can be enhanced with persistent peer reputation storage
583        1.0
584    }
585
586    /// Implement amplification attack mitigation
587    ///
588    /// This prevents the bootstrap node from being used as an amplifier
589    /// in DDoS attacks by limiting server-initiated validation packets.
590    fn validate_amplification_limits(
591        &mut self,
592        source_addr: SocketAddr,
593        target_addr: SocketAddr,
594        now: Instant,
595    ) -> Result<(), NatTraversalError> {
596        // Check if we're being asked to send too many packets to the same target
597        let amplification_key = (source_addr, target_addr);
598
599        // Simple amplification protection: limit packets per source-target pair
600        // In production, this would be more sophisticated with:
601        // - Bandwidth tracking
602        // - Packet size ratios
603        // - Geographic analysis
604        // - Temporal pattern analysis
605
606        // For now, implement basic per-pair rate limiting
607        if self.is_amplification_suspicious(amplification_key, now) {
608            warn!(
609                "Potential amplification attack detected: {} -> {}",
610                source_addr, target_addr
611            );
612            return Err(NatTraversalError::SuspiciousCoordination);
613        }
614
615        Ok(())
616    }
617
618    /// Check for suspicious amplification patterns
619    fn is_amplification_suspicious(
620        &self,
621        _amplification_key: (SocketAddr, SocketAddr),
622        _now: Instant,
623    ) -> bool {
624        // Simplified amplification detection
625        // In production, this would track:
626        // - Request/response ratios
627        // - Bandwidth amplification factors
628        // - Temporal clustering of requests
629        // - Geographic distribution analysis
630
631        // For now, return false (no amplification detected)
632        // This can be enhanced with persistent amplification tracking
633        false
634    }
635
636    /// Generate cryptographically secure random values for coordination rounds
637    ///
638    /// This ensures that coordination rounds use secure random values to prevent
639    /// prediction attacks and ensure proper synchronization security.
640    #[allow(dead_code)] // Used by BootstrapCoordinator
641    fn generate_secure_coordination_round(&self) -> VarInt {
642        // Use cryptographically secure random number generation
643        let secure_random: u64 = rand::random();
644
645        // Ensure the value is within reasonable bounds for VarInt
646        let bounded_random = secure_random % 1000000; // Limit to reasonable range
647
648        VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
649    }
650
651    /// Enhanced address validation with security checks
652    ///
653    /// This performs comprehensive address validation including:
654    /// - Basic format validation
655    /// - Security threat detection
656    /// - Amplification attack prevention
657    /// - Suspicious pattern recognition
658    fn enhanced_address_validation(
659        &mut self,
660        addr: SocketAddr,
661        source_addr: SocketAddr,
662        now: Instant,
663    ) -> Result<AddressValidationResult, NatTraversalError> {
664        // First, perform basic address validation
665        let basic_result = self.validate_address(addr, now);
666
667        match basic_result {
668            AddressValidationResult::Invalid => {
669                return Err(NatTraversalError::InvalidAddress);
670            }
671            AddressValidationResult::Suspicious => {
672                return Err(NatTraversalError::SuspiciousCoordination);
673            }
674            AddressValidationResult::Valid => {
675                // Continue with enhanced validation
676            }
677        }
678
679        // Check for amplification attack patterns
680        self.validate_amplification_limits(source_addr, addr, now)?;
681
682        // Additional security checks
683        if self.is_address_in_suspicious_range(addr) {
684            warn!("Address in suspicious range detected: {}", addr);
685            return Err(NatTraversalError::SuspiciousCoordination);
686        }
687
688        if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
689            warn!(
690                "Suspicious coordination pattern detected: {} -> {}",
691                source_addr, addr
692            );
693            return Err(NatTraversalError::SuspiciousCoordination);
694        }
695
696        Ok(AddressValidationResult::Valid)
697    }
698
699    /// Check if address is in a suspicious range
700    fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
701        match addr.ip() {
702            IpAddr::V4(ipv4) => {
703                // Check for addresses commonly used in attacks
704                let octets = ipv4.octets();
705
706                // Reject certain reserved ranges that shouldn't be used for P2P
707                if octets[0] == 0 || octets[0] == 127 {
708                    return true;
709                }
710
711                // Check for test networks (RFC 5737)
712                if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
713                    return true;
714                }
715                if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
716                    return true;
717                }
718                if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
719                    return true;
720                }
721
722                false
723            }
724            IpAddr::V6(ipv6) => {
725                // Check for suspicious IPv6 ranges
726                if ipv6.is_loopback() || ipv6.is_unspecified() {
727                    return true;
728                }
729
730                // Check for documentation ranges (RFC 3849)
731                let segments = ipv6.segments();
732                if segments[0] == 0x2001 && segments[1] == 0x0db8 {
733                    return true;
734                }
735
736                false
737            }
738        }
739    }
740
741    /// Check for suspicious coordination patterns
742    fn is_coordination_pattern_suspicious(
743        &self,
744        _source_addr: SocketAddr,
745        _target_addr: SocketAddr,
746        _now: Instant,
747    ) -> bool {
748        // Simplified pattern detection
749        // In production, this would analyze:
750        // - Temporal patterns (too frequent requests)
751        // - Geographic patterns (unusual source/target combinations)
752        // - Behavioral patterns (consistent with known attack signatures)
753        // - Network topology patterns (suspicious routing)
754
755        // For now, return false (no suspicious patterns detected)
756        // This can be enhanced with machine learning-based pattern detection
757        false
758    }
759
760    /// Check if candidate rate limit is exceeded
761    fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
762        // Clean up old entries
763        self.cleanup_rate_tracker(now);
764
765        // Check if we've exceeded the rate limit
766        if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
767            return true;
768        }
769
770        // Record this attempt
771        self.candidate_rate_tracker.push_back(now);
772        false
773    }
774
775    /// Check if coordination rate limit is exceeded
776    fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
777        // Clean up old entries
778        self.cleanup_coordination_tracker(now);
779
780        // Check if we've exceeded the rate limit
781        if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
782            return true;
783        }
784
785        // Record this attempt
786        let request = CoordinationRequest { timestamp: now };
787        self.coordination_requests.push_back(request);
788        false
789    }
790
791    /// Clean up old rate tracking entries
792    fn cleanup_rate_tracker(&mut self, now: Instant) {
793        let cutoff = now - self.rate_window;
794        while let Some(&front_time) = self.candidate_rate_tracker.front() {
795            if front_time < cutoff {
796                self.candidate_rate_tracker.pop_front();
797            } else {
798                break;
799            }
800        }
801    }
802
803    /// Clean up old coordination tracking entries
804    fn cleanup_coordination_tracker(&mut self, now: Instant) {
805        let cutoff = now - self.rate_window;
806        while let Some(front_request) = self.coordination_requests.front() {
807            if front_request.timestamp < cutoff {
808                self.coordination_requests.pop_front();
809            } else {
810                break;
811            }
812        }
813    }
814
815    /// Validate an address for security concerns
816    fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
817        // Check cache first
818        if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
819            return cached_result;
820        }
821
822        let result = self.perform_address_validation(addr);
823
824        // Cache the result
825        self.address_validation_cache.insert(addr, result);
826
827        // Clean up old cache entries periodically
828        if self.address_validation_cache.len() > 1000 {
829            self.cleanup_address_cache(now);
830        }
831
832        result
833    }
834
835    /// Perform actual address validation
836    fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
837        match addr.ip() {
838            IpAddr::V4(ipv4) => {
839                // Check for invalid IPv4 addresses
840                if ipv4.is_unspecified() || ipv4.is_broadcast() {
841                    return AddressValidationResult::Invalid;
842                }
843
844                // Check for suspicious addresses
845                if ipv4.is_multicast() || ipv4.is_documentation() {
846                    return AddressValidationResult::Suspicious;
847                }
848
849                // Check for reserved ranges that shouldn't be used for P2P
850                if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
851                    return AddressValidationResult::Invalid;
852                }
853
854                // Check for common attack patterns
855                if self.is_suspicious_ipv4(ipv4) {
856                    return AddressValidationResult::Suspicious;
857                }
858            }
859            IpAddr::V6(ipv6) => {
860                // Check for invalid IPv6 addresses
861                if ipv6.is_unspecified() || ipv6.is_multicast() {
862                    return AddressValidationResult::Invalid;
863                }
864
865                // Check for suspicious IPv6 addresses
866                if self.is_suspicious_ipv6(ipv6) {
867                    return AddressValidationResult::Suspicious;
868                }
869            }
870        }
871
872        // Check port range
873        if addr.port() == 0 || addr.port() < 1024 {
874            return AddressValidationResult::Suspicious;
875        }
876
877        AddressValidationResult::Valid
878    }
879
880    /// Check for suspicious IPv4 patterns
881    fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
882        let octets = ipv4.octets();
883
884        // Check for patterns that might indicate scanning or attacks
885        // Sequential or patterned addresses
886        if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
887            return true;
888        }
889
890        // Check for addresses in ranges commonly used for attacks
891        // This is a simplified check - production would have more sophisticated patterns
892        false
893    }
894
895    /// Check for suspicious IPv6 patterns
896    fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
897        let segments = ipv6.segments();
898
899        // Check for obvious patterns
900        if segments.iter().all(|&s| s == segments[0]) {
901            return true;
902        }
903
904        false
905    }
906
907    /// Clean up old address validation cache entries
908    fn cleanup_address_cache(&mut self, _now: Instant) {
909        // Simple cleanup - remove random entries to keep size bounded
910        // In production, this would use LRU or timestamp-based cleanup
911        if self.address_validation_cache.len() > 500 {
912            let keys_to_remove: Vec<_> = self
913                .address_validation_cache
914                .keys()
915                .take(self.address_validation_cache.len() / 2)
916                .copied()
917                .collect();
918
919            for key in keys_to_remove {
920                self.address_validation_cache.remove(&key);
921            }
922        }
923    }
924
925    /// Comprehensive path validation for PUNCH_ME_NOW frames
926    ///
927    /// This performs security-critical validation to prevent various attacks:
928    /// - Address spoofing prevention
929    /// - Reflection attack mitigation
930    /// - Coordination request validation
931    /// - Rate limiting enforcement
932    fn validate_punch_me_now_frame(
933        &mut self,
934        frame: &crate::frame::PunchMeNow,
935        source_addr: SocketAddr,
936        peer_id: [u8; 32],
937        now: Instant,
938    ) -> Result<(), NatTraversalError> {
939        // 1. Rate limiting validation
940        if self.is_coordination_rate_limited(now) {
941            debug!(
942                "PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}",
943                hex::encode(&peer_id[..8])
944            );
945            return Err(NatTraversalError::RateLimitExceeded);
946        }
947
948        // 2. Address validation - validate the local_address claimed in the frame
949        let addr_validation = self.validate_address(frame.local_address, now);
950        match addr_validation {
951            AddressValidationResult::Invalid => {
952                debug!(
953                    "PUNCH_ME_NOW frame rejected: invalid local_address {:?} from peer {:?}",
954                    frame.local_address,
955                    hex::encode(&peer_id[..8])
956                );
957                return Err(NatTraversalError::InvalidAddress);
958            }
959            AddressValidationResult::Suspicious => {
960                debug!(
961                    "PUNCH_ME_NOW frame rejected: suspicious local_address {:?} from peer {:?}",
962                    frame.local_address,
963                    hex::encode(&peer_id[..8])
964                );
965                return Err(NatTraversalError::SuspiciousCoordination);
966            }
967            AddressValidationResult::Valid => {
968                // Continue validation
969            }
970        }
971
972        // 3. Source address consistency validation
973        // The frame's local_address should reasonably relate to the actual source
974        if !self.validate_address_consistency(frame.local_address, source_addr) {
975            debug!(
976                "PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
977                frame.local_address, source_addr
978            );
979            return Err(NatTraversalError::SuspiciousCoordination);
980        }
981
982        // 4. Coordination parameters validation
983        if !self.validate_coordination_parameters(frame) {
984            debug!(
985                "PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
986                hex::encode(&peer_id[..8])
987            );
988            return Err(NatTraversalError::SuspiciousCoordination);
989        }
990
991        // 5. Target peer validation (if present)
992        if let Some(target_peer_id) = frame.target_peer_id {
993            if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
994                debug!(
995                    "PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
996                    hex::encode(&peer_id[..8]),
997                    hex::encode(&target_peer_id[..8])
998                );
999                return Err(NatTraversalError::SuspiciousCoordination);
1000            }
1001        }
1002
1003        // 6. Resource limits validation
1004        if !self.validate_resource_limits(frame) {
1005            debug!(
1006                "PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
1007                hex::encode(&peer_id[..8])
1008            );
1009            return Err(NatTraversalError::ResourceLimitExceeded);
1010        }
1011
1012        debug!(
1013            "PUNCH_ME_NOW frame validation passed for peer {:?}",
1014            hex::encode(&peer_id[..8])
1015        );
1016        Ok(())
1017    }
1018
1019    /// Validate address consistency between claimed and observed addresses
1020    ///
1021    /// This prevents address spoofing by ensuring the claimed local address
1022    /// is reasonably consistent with the observed source address.
1023    fn validate_address_consistency(
1024        &self,
1025        claimed_addr: SocketAddr,
1026        observed_addr: SocketAddr,
1027    ) -> bool {
1028        // For P2P NAT traversal, the port will typically be different due to NAT,
1029        // but the IP should be consistent unless there's multi-homing or proxying
1030
1031        // Check if IPs are in the same family
1032        match (claimed_addr.ip(), observed_addr.ip()) {
1033            (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
1034                // For IPv4, allow same IP or addresses in same private range
1035                if claimed_ip == observed_ip {
1036                    return true;
1037                }
1038
1039                // Allow within same private network (simplified check)
1040                if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
1041                    return true;
1042                }
1043
1044                // Allow certain NAT scenarios where external IP differs
1045                // This is a simplified check - production would be more sophisticated
1046                !claimed_ip.is_private() && !observed_ip.is_private()
1047            }
1048            (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
1049                // For IPv6, be more lenient due to complex addressing
1050                claimed_ip == observed_ip || self.are_in_same_prefix_v6(claimed_ip, observed_ip)
1051            }
1052            _ => {
1053                // Mismatched IP families - suspicious
1054                false
1055            }
1056        }
1057    }
1058
1059    /// Check if two IPv4 addresses are in the same private network
1060    fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
1061        // Check common private ranges
1062        let ip1_octets = ip1.octets();
1063        let ip2_octets = ip2.octets();
1064
1065        // 10.0.0.0/8
1066        if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
1067            return true;
1068        }
1069
1070        // 172.16.0.0/12
1071        if ip1_octets[0] == 172
1072            && ip2_octets[0] == 172
1073            && (16..=31).contains(&ip1_octets[1])
1074            && (16..=31).contains(&ip2_octets[1])
1075        {
1076            return true;
1077        }
1078
1079        // 192.168.0.0/16
1080        if ip1_octets[0] == 192
1081            && ip1_octets[1] == 168
1082            && ip2_octets[0] == 192
1083            && ip2_octets[1] == 168
1084        {
1085            return true;
1086        }
1087
1088        false
1089    }
1090
1091    /// Check if two IPv6 addresses are in the same prefix
1092    fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1093        // Simplified IPv6 prefix check - compare first 64 bits
1094        let segments1 = ip1.segments();
1095        let segments2 = ip2.segments();
1096
1097        segments1[0] == segments2[0]
1098            && segments1[1] == segments2[1]
1099            && segments1[2] == segments2[2]
1100            && segments1[3] == segments2[3]
1101    }
1102
1103    /// Validate coordination parameters for security
1104    fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1105        // Check round number is reasonable (not too large to prevent overflow attacks)
1106        if frame.round.into_inner() > 1000000 {
1107            return false;
1108        }
1109
1110        // Check target sequence is reasonable
1111        if frame.target_sequence.into_inner() > 10000 {
1112            return false;
1113        }
1114
1115        // Validate address is not obviously invalid
1116        match frame.local_address.ip() {
1117            IpAddr::V4(ipv4) => {
1118                // Reject obviously invalid addresses
1119                !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1120            }
1121            IpAddr::V6(ipv6) => {
1122                // Reject obviously invalid addresses
1123                !ipv6.is_unspecified() && !ipv6.is_multicast()
1124            }
1125        }
1126    }
1127
1128    /// Validate target peer request for potential abuse
1129    fn validate_target_peer_request(
1130        &self,
1131        requesting_peer: [u8; 32],
1132        target_peer: [u8; 32],
1133        _frame: &crate::frame::PunchMeNow,
1134    ) -> bool {
1135        // Prevent self-coordination (peer requesting coordination with itself)
1136        if requesting_peer == target_peer {
1137            return false;
1138        }
1139
1140        // Additional validation could include:
1141        // - Check if target peer is known/registered
1142        // - Validate target peer hasn't opted out of coordination
1143        // - Check for suspicious patterns in target peer selection
1144
1145        true
1146    }
1147
1148    /// Validate resource limits for the coordination request
1149    fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
1150        // Check current load and resource usage
1151        // This is a simplified check - production would monitor:
1152        // - Active coordination sessions
1153        // - Memory usage
1154        // - Network bandwidth
1155        // - CPU utilization
1156
1157        // For now, just check if we have too many active coordination requests
1158        self.coordination_requests.len() < self.max_coordination_per_window as usize
1159    }
1160}
1161
1162impl AdaptiveTimeoutState {
1163    /// Create new adaptive timeout state with default values
1164    pub(crate) fn new() -> Self {
1165        let base_timeout = Duration::from_millis(1000); // 1 second base
1166        Self {
1167            current_timeout: base_timeout,
1168            min_timeout: Duration::from_millis(100),
1169            max_timeout: Duration::from_secs(30),
1170            base_timeout,
1171            backoff_multiplier: 1.0,
1172            max_backoff_multiplier: 8.0,
1173            jitter_factor: 0.1, // 10% jitter
1174            srtt: None,
1175            rttvar: None,
1176            last_rtt: None,
1177            consecutive_timeouts: 0,
1178            successful_responses: 0,
1179        }
1180    }
1181
1182    /// Update timeout based on successful response
1183    fn update_success(&mut self, rtt: Duration) {
1184        self.last_rtt = Some(rtt);
1185        self.successful_responses += 1;
1186        self.consecutive_timeouts = 0;
1187
1188        // Update smoothed RTT using TCP algorithm
1189        match self.srtt {
1190            None => {
1191                self.srtt = Some(rtt);
1192                self.rttvar = Some(rtt / 2);
1193            }
1194            Some(srtt) => {
1195                let rttvar = self.rttvar.unwrap_or(rtt / 2);
1196                let abs_diff = if rtt > srtt { rtt - srtt } else { srtt - rtt };
1197
1198                self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1199                self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1200            }
1201        }
1202
1203        // Reduce backoff multiplier on success
1204        self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1205
1206        // Update current timeout
1207        self.calculate_current_timeout();
1208    }
1209
1210    /// Update timeout based on timeout event
1211    fn update_timeout(&mut self) {
1212        self.consecutive_timeouts += 1;
1213
1214        // Exponential backoff with bounds
1215        self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1216
1217        // Update current timeout
1218        self.calculate_current_timeout();
1219    }
1220
1221    /// Calculate current timeout based on conditions
1222    fn calculate_current_timeout(&mut self) {
1223        let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1224            // Use TCP-style RTO calculation: RTO = SRTT + 4 * RTTVAR
1225            srtt + rttvar * 4
1226        } else {
1227            self.base_timeout
1228        };
1229
1230        // Apply backoff multiplier
1231        let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1232
1233        // Apply jitter to prevent thundering herd
1234        let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1235        let timeout = timeout.mul_f64(jitter);
1236
1237        // Bound the timeout
1238        self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1239    }
1240
1241    /// Get current timeout value
1242    fn get_timeout(&self) -> Duration {
1243        self.current_timeout
1244    }
1245
1246    /// Check if retry should be attempted
1247    fn should_retry(&self, max_retries: u32) -> bool {
1248        self.consecutive_timeouts < max_retries
1249    }
1250
1251    /// Get retry delay with exponential backoff
1252    fn get_retry_delay(&self) -> Duration {
1253        let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1254        delay.clamp(self.min_timeout, self.max_timeout)
1255    }
1256}
1257
1258/// Resource management limits and cleanup configuration
1259#[derive(Debug)]
1260pub(super) struct ResourceManagementConfig {
1261    /// Maximum number of active validations
1262    max_active_validations: usize,
1263    /// Maximum number of local candidates
1264    max_local_candidates: usize,
1265    /// Maximum number of remote candidates
1266    max_remote_candidates: usize,
1267    /// Maximum number of candidate pairs
1268    max_candidate_pairs: usize,
1269    /// Maximum coordination rounds to keep in history
1270    #[allow(dead_code)] // Used for memory management
1271    max_coordination_history: usize,
1272    /// Cleanup interval for expired resources
1273    cleanup_interval: Duration,
1274    /// Timeout for stale candidates
1275    candidate_timeout: Duration,
1276    /// Timeout for path validations
1277    validation_timeout: Duration,
1278    /// Timeout for coordination rounds
1279    coordination_timeout: Duration,
1280    /// Memory pressure threshold (0.0 = no pressure, 1.0 = maximum pressure)
1281    memory_pressure_threshold: f64,
1282    /// Aggressive cleanup mode threshold
1283    aggressive_cleanup_threshold: f64,
1284}
1285
1286/// Resource usage statistics and monitoring
1287#[derive(Debug, Default)]
1288pub(super) struct ResourceStats {
1289    /// Current number of active validations
1290    active_validations: usize,
1291    /// Current number of local candidates
1292    local_candidates: usize,
1293    /// Current number of remote candidates
1294    remote_candidates: usize,
1295    /// Current number of candidate pairs
1296    candidate_pairs: usize,
1297    /// Peak memory usage
1298    peak_memory_usage: usize,
1299    /// Number of cleanup operations performed
1300    cleanup_operations: u64,
1301    /// Number of resources cleaned up
1302    resources_cleaned: u64,
1303    /// Number of resource allocation failures
1304    allocation_failures: u64,
1305    /// Last cleanup time
1306    #[allow(dead_code)] // Used for cleanup scheduling
1307    last_cleanup: Option<Instant>,
1308    /// Memory pressure level (0.0 = no pressure, 1.0 = maximum pressure)
1309    memory_pressure: f64,
1310}
1311
1312/// Resource cleanup coordinator
1313#[derive(Debug)]
1314pub(super) struct ResourceCleanupCoordinator {
1315    /// Configuration for resource limits
1316    config: ResourceManagementConfig,
1317    /// Resource usage statistics
1318    stats: ResourceStats,
1319    /// Last cleanup time
1320    last_cleanup: Option<Instant>,
1321    /// Cleanup operation counter
1322    cleanup_counter: u64,
1323    /// Shutdown flag
1324    shutdown_requested: bool,
1325}
1326
1327impl ResourceManagementConfig {
1328    /// Create new resource management configuration with production-ready defaults
1329    fn new() -> Self {
1330        Self {
1331            max_active_validations: 100,
1332            max_local_candidates: 50,
1333            max_remote_candidates: 100,
1334            max_candidate_pairs: 200,
1335            max_coordination_history: 10,
1336            cleanup_interval: Duration::from_secs(30),
1337            candidate_timeout: Duration::from_secs(300), // 5 minutes
1338            validation_timeout: Duration::from_secs(30),
1339            coordination_timeout: Duration::from_secs(60),
1340            memory_pressure_threshold: 0.75,
1341            aggressive_cleanup_threshold: 0.90,
1342        }
1343    }
1344
1345    /// Create configuration optimized for low-memory environments
1346    #[allow(dead_code)] // Used when system is under memory pressure
1347    fn low_memory() -> Self {
1348        Self {
1349            max_active_validations: 25,
1350            max_local_candidates: 10,
1351            max_remote_candidates: 25,
1352            max_candidate_pairs: 50,
1353            max_coordination_history: 3,
1354            cleanup_interval: Duration::from_secs(15),
1355            candidate_timeout: Duration::from_secs(180), // 3 minutes
1356            validation_timeout: Duration::from_secs(20),
1357            coordination_timeout: Duration::from_secs(30),
1358            memory_pressure_threshold: 0.60,
1359            aggressive_cleanup_threshold: 0.80,
1360        }
1361    }
1362}
1363
1364impl ResourceCleanupCoordinator {
1365    /// Create new resource cleanup coordinator
1366    fn new() -> Self {
1367        Self {
1368            config: ResourceManagementConfig::new(),
1369            stats: ResourceStats::default(),
1370            last_cleanup: None,
1371            cleanup_counter: 0,
1372            shutdown_requested: false,
1373        }
1374    }
1375
1376    /// Create coordinator optimized for low-memory environments
1377    #[allow(dead_code)] // Used in memory-constrained environments
1378    fn low_memory() -> Self {
1379        Self {
1380            config: ResourceManagementConfig::low_memory(),
1381            stats: ResourceStats::default(),
1382            last_cleanup: None,
1383            cleanup_counter: 0,
1384            shutdown_requested: false,
1385        }
1386    }
1387
1388    /// Check if resource limits are exceeded
1389    fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1390        state.active_validations.len() > self.config.max_active_validations
1391            || state.local_candidates.len() > self.config.max_local_candidates
1392            || state.remote_candidates.len() > self.config.max_remote_candidates
1393            || state.candidate_pairs.len() > self.config.max_candidate_pairs
1394    }
1395
1396    /// Calculate current memory pressure level
1397    fn calculate_memory_pressure(
1398        &mut self,
1399        active_validations_len: usize,
1400        local_candidates_len: usize,
1401        remote_candidates_len: usize,
1402        candidate_pairs_len: usize,
1403    ) -> f64 {
1404        let total_limit = self.config.max_active_validations
1405            + self.config.max_local_candidates
1406            + self.config.max_remote_candidates
1407            + self.config.max_candidate_pairs;
1408
1409        let current_usage = active_validations_len
1410            + local_candidates_len
1411            + remote_candidates_len
1412            + candidate_pairs_len;
1413
1414        let pressure = current_usage as f64 / total_limit as f64;
1415        self.stats.memory_pressure = pressure;
1416        pressure
1417    }
1418
1419    /// Determine if cleanup is needed
1420    fn should_cleanup(&self, now: Instant) -> bool {
1421        if self.shutdown_requested {
1422            return true;
1423        }
1424
1425        // Check if it's time for regular cleanup
1426        if let Some(last_cleanup) = self.last_cleanup {
1427            if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1428                return true;
1429            }
1430        } else {
1431            return true; // First cleanup
1432        }
1433
1434        // Check memory pressure
1435        if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1436            return true;
1437        }
1438
1439        false
1440    }
1441
1442    /// Perform cleanup of expired resources
1443    #[allow(dead_code)] // Will be used when implementing automatic resource cleanup
1444    fn cleanup_expired_resources(
1445        &mut self,
1446        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1447        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1448        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1449        candidate_pairs: &mut Vec<CandidatePair>,
1450        coordination: &mut Option<CoordinationState>,
1451        now: Instant,
1452    ) -> u64 {
1453        let mut cleaned = 0;
1454
1455        // Clean up expired path validations
1456        cleaned += self.cleanup_expired_validations(active_validations, now);
1457
1458        // Clean up stale candidates
1459        cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1460
1461        // Clean up failed candidate pairs
1462        cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1463
1464        // Clean up old coordination state
1465        cleaned += self.cleanup_old_coordination(coordination, now);
1466
1467        // Update statistics
1468        self.stats.cleanup_operations += 1;
1469        self.stats.resources_cleaned += cleaned;
1470        self.last_cleanup = Some(now);
1471        self.cleanup_counter += 1;
1472
1473        debug!("Cleaned up {} expired resources", cleaned);
1474        cleaned
1475    }
1476
1477    /// Clean up expired path validations
1478    #[allow(dead_code)] // Called from cleanup_expired_resources
1479    fn cleanup_expired_validations(
1480        &mut self,
1481        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1482        now: Instant,
1483    ) -> u64 {
1484        let mut cleaned = 0;
1485        let validation_timeout = self.config.validation_timeout;
1486
1487        active_validations.retain(|_addr, validation| {
1488            let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1489            if is_expired {
1490                cleaned += 1;
1491                trace!("Cleaned up expired validation for {:?}", _addr);
1492            }
1493            !is_expired
1494        });
1495
1496        cleaned
1497    }
1498
1499    /// Clean up stale candidates
1500    #[allow(dead_code)] // Called from cleanup_expired_resources
1501    fn cleanup_stale_candidates(
1502        &mut self,
1503        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1504        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1505        now: Instant,
1506    ) -> u64 {
1507        let mut cleaned = 0;
1508        let candidate_timeout = self.config.candidate_timeout;
1509
1510        // Clean up local candidates
1511        local_candidates.retain(|_seq, candidate| {
1512            let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1513                || candidate.state == CandidateState::Failed
1514                || candidate.state == CandidateState::Removed;
1515            if is_stale {
1516                cleaned += 1;
1517                trace!("Cleaned up stale local candidate {:?}", candidate.address);
1518            }
1519            !is_stale
1520        });
1521
1522        // Clean up remote candidates
1523        remote_candidates.retain(|_seq, candidate| {
1524            let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1525                || candidate.state == CandidateState::Failed
1526                || candidate.state == CandidateState::Removed;
1527            if is_stale {
1528                cleaned += 1;
1529                trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1530            }
1531            !is_stale
1532        });
1533
1534        cleaned
1535    }
1536
1537    /// Clean up failed candidate pairs
1538    #[allow(dead_code)] // Called from cleanup_expired_resources
1539    fn cleanup_failed_pairs(
1540        &mut self,
1541        candidate_pairs: &mut Vec<CandidatePair>,
1542        now: Instant,
1543    ) -> u64 {
1544        let mut cleaned = 0;
1545        let pair_timeout = self.config.candidate_timeout;
1546
1547        candidate_pairs.retain(|pair| {
1548            let is_stale = now.duration_since(pair.created_at) > pair_timeout
1549                || pair.state == PairState::Failed;
1550            if is_stale {
1551                cleaned += 1;
1552                trace!(
1553                    "Cleaned up failed candidate pair {:?} -> {:?}",
1554                    pair.local_addr, pair.remote_addr
1555                );
1556            }
1557            !is_stale
1558        });
1559
1560        cleaned
1561    }
1562
1563    /// Clean up old coordination state
1564    #[allow(dead_code)] // Called from cleanup_expired_resources
1565    fn cleanup_old_coordination(
1566        &mut self,
1567        coordination: &mut Option<CoordinationState>,
1568        now: Instant,
1569    ) -> u64 {
1570        let mut cleaned = 0;
1571
1572        if let Some(coord) = coordination {
1573            let is_expired =
1574                now.duration_since(coord.round_start) > self.config.coordination_timeout;
1575            let is_failed = coord.state == CoordinationPhase::Failed;
1576
1577            if is_expired || is_failed {
1578                let round = coord.round;
1579                *coordination = None;
1580                cleaned += 1;
1581                trace!("Cleaned up old coordination state for round {}", round);
1582            }
1583        }
1584
1585        cleaned
1586    }
1587
1588    /// Perform aggressive cleanup when under memory pressure
1589    #[allow(dead_code)] // Will be used when implementing memory pressure response
1590    fn aggressive_cleanup(
1591        &mut self,
1592        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1593        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1594        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1595        candidate_pairs: &mut Vec<CandidatePair>,
1596        now: Instant,
1597    ) -> u64 {
1598        let mut cleaned = 0;
1599
1600        // More aggressive timeout for candidates
1601        let aggressive_timeout = self.config.candidate_timeout / 2;
1602
1603        // Clean up older candidates first
1604        local_candidates.retain(|_seq, candidate| {
1605            let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1606                && candidate.state != CandidateState::Failed;
1607            if !keep {
1608                cleaned += 1;
1609            }
1610            keep
1611        });
1612
1613        remote_candidates.retain(|_seq, candidate| {
1614            let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1615                && candidate.state != CandidateState::Failed;
1616            if !keep {
1617                cleaned += 1;
1618            }
1619            keep
1620        });
1621
1622        // Clean up waiting candidate pairs
1623        candidate_pairs.retain(|pair| {
1624            let keep = pair.state != PairState::Waiting
1625                || now.duration_since(pair.created_at) <= aggressive_timeout;
1626            if !keep {
1627                cleaned += 1;
1628            }
1629            keep
1630        });
1631
1632        // Clean up old validations more aggressively
1633        active_validations.retain(|_addr, validation| {
1634            let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1635            if !keep {
1636                cleaned += 1;
1637            }
1638            keep
1639        });
1640
1641        warn!(
1642            "Aggressive cleanup removed {} resources due to memory pressure",
1643            cleaned
1644        );
1645        cleaned
1646    }
1647
1648    /// Request graceful shutdown and cleanup
1649    #[allow(dead_code)] // Used for clean shutdown procedures
1650    fn request_shutdown(&mut self) {
1651        self.shutdown_requested = true;
1652        debug!("Resource cleanup coordinator shutdown requested");
1653    }
1654
1655    /// Perform final cleanup during shutdown
1656    #[allow(dead_code)] // Called during graceful shutdown
1657    fn shutdown_cleanup(
1658        &mut self,
1659        active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1660        local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1661        remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1662        candidate_pairs: &mut Vec<CandidatePair>,
1663        coordination: &mut Option<CoordinationState>,
1664    ) -> u64 {
1665        let mut cleaned = 0;
1666
1667        // Clear all resources
1668        cleaned += active_validations.len() as u64;
1669        active_validations.clear();
1670
1671        cleaned += local_candidates.len() as u64;
1672        local_candidates.clear();
1673
1674        cleaned += remote_candidates.len() as u64;
1675        remote_candidates.clear();
1676
1677        cleaned += candidate_pairs.len() as u64;
1678        candidate_pairs.clear();
1679
1680        if coordination.is_some() {
1681            *coordination = None;
1682            cleaned += 1;
1683        }
1684
1685        info!("Shutdown cleanup removed {} resources", cleaned);
1686        cleaned
1687    }
1688
1689    /// Get current resource usage statistics
1690    #[allow(dead_code)] // Used for monitoring and debugging
1691    fn get_resource_stats(&self) -> &ResourceStats {
1692        &self.stats
1693    }
1694
1695    /// Update resource usage statistics
1696    fn update_stats(
1697        &mut self,
1698        active_validations_len: usize,
1699        local_candidates_len: usize,
1700        remote_candidates_len: usize,
1701        candidate_pairs_len: usize,
1702    ) {
1703        self.stats.active_validations = active_validations_len;
1704        self.stats.local_candidates = local_candidates_len;
1705        self.stats.remote_candidates = remote_candidates_len;
1706        self.stats.candidate_pairs = candidate_pairs_len;
1707
1708        // Update peak memory usage
1709        let current_usage = self.stats.active_validations
1710            + self.stats.local_candidates
1711            + self.stats.remote_candidates
1712            + self.stats.candidate_pairs;
1713
1714        if current_usage > self.stats.peak_memory_usage {
1715            self.stats.peak_memory_usage = current_usage;
1716        }
1717    }
1718
1719    /// Perform resource cleanup based on current state
1720    pub(super) fn perform_cleanup(&mut self, now: Instant) {
1721        self.last_cleanup = Some(now);
1722        self.cleanup_counter += 1;
1723
1724        // Update cleanup statistics
1725        self.stats.cleanup_operations += 1;
1726
1727        debug!("Performed resource cleanup #{}", self.cleanup_counter);
1728    }
1729}
1730
1731impl NetworkConditionMonitor {
1732    /// Create new network condition monitor
1733    fn new() -> Self {
1734        Self {
1735            rtt_samples: VecDeque::new(),
1736            max_samples: 20,
1737            packet_loss_rate: 0.0,
1738            congestion_window: 10,
1739            quality_score: 0.8, // Start with good quality assumption
1740            last_quality_update: Instant::now(),
1741            quality_update_interval: Duration::from_secs(10),
1742            timeout_stats: TimeoutStatistics::default(),
1743        }
1744    }
1745
1746    /// Record a successful response time
1747    fn record_success(&mut self, rtt: Duration, now: Instant) {
1748        // Add RTT sample
1749        self.rtt_samples.push_back(rtt);
1750        if self.rtt_samples.len() > self.max_samples {
1751            self.rtt_samples.pop_front();
1752        }
1753
1754        // Update timeout statistics
1755        self.timeout_stats.total_responses += 1;
1756        self.update_timeout_stats(now);
1757
1758        // Update quality score
1759        self.update_quality_score(now);
1760    }
1761
1762    /// Record a timeout event
1763    fn record_timeout(&mut self, now: Instant) {
1764        self.timeout_stats.total_timeouts += 1;
1765        self.update_timeout_stats(now);
1766
1767        // Update quality score
1768        self.update_quality_score(now);
1769    }
1770
1771    /// Update timeout statistics
1772    fn update_timeout_stats(&mut self, now: Instant) {
1773        let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1774
1775        if total_attempts > 0 {
1776            self.timeout_stats.timeout_rate =
1777                self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1778        }
1779
1780        // Calculate average response time
1781        if !self.rtt_samples.is_empty() {
1782            let total_rtt: Duration = self.rtt_samples.iter().sum();
1783            self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1784        }
1785
1786        self.timeout_stats.last_update = Some(now);
1787    }
1788
1789    /// Update network quality score
1790    fn update_quality_score(&mut self, now: Instant) {
1791        if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1792            return;
1793        }
1794
1795        // Quality factors
1796        let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1797        let rtt_factor = self.calculate_rtt_factor();
1798        let consistency_factor = self.calculate_consistency_factor();
1799
1800        // Weighted quality score
1801        let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1802
1803        // Smooth the quality score
1804        self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1805        self.last_quality_update = now;
1806    }
1807
1808    /// Calculate RTT factor for quality score
1809    fn calculate_rtt_factor(&self) -> f64 {
1810        if self.rtt_samples.is_empty() {
1811            return 0.5; // Neutral score
1812        }
1813
1814        let avg_rtt = self.timeout_stats.avg_response_time;
1815
1816        // Good RTT: < 50ms = 1.0, Poor RTT: > 1000ms = 0.0
1817        let rtt_ms = avg_rtt.as_millis() as f64;
1818        let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1819        factor.clamp(0.0, 1.0)
1820    }
1821
1822    /// Calculate consistency factor for quality score
1823    fn calculate_consistency_factor(&self) -> f64 {
1824        if self.rtt_samples.len() < 3 {
1825            return 0.5; // Neutral score
1826        }
1827
1828        // Calculate RTT variance
1829        let mean_rtt = self.timeout_stats.avg_response_time;
1830        let variance: f64 = self
1831            .rtt_samples
1832            .iter()
1833            .map(|rtt| {
1834                let diff = if *rtt > mean_rtt {
1835                    *rtt - mean_rtt
1836                } else {
1837                    mean_rtt - *rtt
1838                };
1839                diff.as_millis() as f64
1840            })
1841            .map(|diff| diff * diff)
1842            .sum::<f64>()
1843            / self.rtt_samples.len() as f64;
1844
1845        let std_dev = variance.sqrt();
1846
1847        // Low variance = high consistency
1848        let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1849        consistency.clamp(0.0, 1.0)
1850    }
1851
1852    /// Get current network quality score
1853    fn get_quality_score(&self) -> f64 {
1854        self.quality_score
1855    }
1856
1857    /// Get estimated RTT based on recent samples
1858    fn get_estimated_rtt(&self) -> Option<Duration> {
1859        if self.rtt_samples.is_empty() {
1860            return None;
1861        }
1862
1863        Some(self.timeout_stats.avg_response_time)
1864    }
1865
1866    /// Check if network conditions are suitable for coordination
1867    fn is_suitable_for_coordination(&self) -> bool {
1868        // Require reasonable quality for coordination attempts
1869        self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1870    }
1871
1872    /// Get estimated packet loss rate
1873    #[allow(dead_code)] // Used for adaptive timeout calculations
1874    fn get_packet_loss_rate(&self) -> f64 {
1875        self.packet_loss_rate
1876    }
1877
1878    /// Get recommended timeout multiplier based on conditions
1879    #[allow(dead_code)] // Will be used for dynamic timeout adjustments
1880    fn get_timeout_multiplier(&self) -> f64 {
1881        let base_multiplier = 1.0;
1882
1883        // Adjust based on quality score
1884        let quality_multiplier = if self.quality_score < 0.3 {
1885            2.0 // Poor quality, increase timeouts
1886        } else if self.quality_score > 0.8 {
1887            0.8 // Good quality, reduce timeouts
1888        } else {
1889            1.0 // Neutral
1890        };
1891
1892        // Adjust based on packet loss
1893        let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1894
1895        base_multiplier * quality_multiplier * loss_multiplier
1896    }
1897
1898    /// Clean up old samples and statistics
1899    #[allow(dead_code)] // Will be used in periodic maintenance
1900    fn cleanup(&mut self, now: Instant) {
1901        // Remove old RTT samples (keep only recent ones)
1902        let _cutoff_time = now - Duration::from_secs(60);
1903
1904        // Reset statistics if they're too old
1905        if let Some(last_update) = self.timeout_stats.last_update {
1906            if now.duration_since(last_update) > Duration::from_secs(300) {
1907                self.timeout_stats = TimeoutStatistics::default();
1908            }
1909        }
1910    }
1911}
1912
1913impl NatTraversalState {
1914    /// Create new NAT traversal state with given role and configuration
1915    pub(super) fn new(
1916        role: NatTraversalRole,
1917        max_candidates: u32,
1918        coordination_timeout: Duration,
1919    ) -> Self {
1920        let bootstrap_coordinator = if matches!(role, NatTraversalRole::Bootstrap) {
1921            Some(BootstrapCoordinator::new(BootstrapConfig::default()))
1922        } else {
1923            None
1924        };
1925
1926        Self {
1927            role,
1928            local_candidates: HashMap::new(),
1929            remote_candidates: HashMap::new(),
1930            candidate_pairs: Vec::new(),
1931            pair_index: HashMap::new(),
1932            active_validations: HashMap::new(),
1933            coordination: None,
1934            next_sequence: VarInt::from_u32(1),
1935            max_candidates,
1936            coordination_timeout,
1937            stats: NatTraversalStats::default(),
1938            security_state: SecurityValidationState::new(),
1939            network_monitor: NetworkConditionMonitor::new(),
1940            resource_manager: ResourceCleanupCoordinator::new(),
1941            bootstrap_coordinator,
1942            multi_dest_transmitter: MultiDestinationTransmitter::new(),
1943        }
1944    }
1945
1946    /// Add a remote candidate from AddAddress frame with security validation
1947    pub(super) fn add_remote_candidate(
1948        &mut self,
1949        sequence: VarInt,
1950        address: SocketAddr,
1951        priority: VarInt,
1952        now: Instant,
1953    ) -> Result<(), NatTraversalError> {
1954        // Resource management: Check if we should reject new resources
1955        if self.should_reject_new_resources(now) {
1956            debug!(
1957                "Rejecting new candidate due to resource limits: {}",
1958                address
1959            );
1960            return Err(NatTraversalError::ResourceLimitExceeded);
1961        }
1962
1963        // Security validation: Check rate limiting
1964        if self.security_state.is_candidate_rate_limited(now) {
1965            self.stats.rate_limit_violations += 1;
1966            debug!("Rate limit exceeded for candidate addition: {}", address);
1967            return Err(NatTraversalError::RateLimitExceeded);
1968        }
1969
1970        // Security validation: Validate address format and safety
1971        match self.security_state.validate_address(address, now) {
1972            AddressValidationResult::Invalid => {
1973                self.stats.invalid_address_rejections += 1;
1974                self.stats.security_rejections += 1;
1975                debug!("Invalid address rejected: {}", address);
1976                return Err(NatTraversalError::InvalidAddress);
1977            }
1978            AddressValidationResult::Suspicious => {
1979                self.stats.security_rejections += 1;
1980                debug!("Suspicious address rejected: {}", address);
1981                return Err(NatTraversalError::SecurityValidationFailed);
1982            }
1983            AddressValidationResult::Valid => {
1984                // Continue with normal processing
1985            }
1986        }
1987
1988        // Check candidate count limit
1989        if self.remote_candidates.len() >= self.max_candidates as usize {
1990            return Err(NatTraversalError::TooManyCandidates);
1991        }
1992
1993        // Check for duplicate addresses (different sequence, same address)
1994        if self
1995            .remote_candidates
1996            .values()
1997            .any(|c| c.address == address && c.state != CandidateState::Removed)
1998        {
1999            return Err(NatTraversalError::DuplicateAddress);
2000        }
2001
2002        let candidate = AddressCandidate {
2003            address,
2004            priority: priority.into_inner() as u32,
2005            source: CandidateSource::Peer,
2006            discovered_at: now,
2007            state: CandidateState::New,
2008            attempt_count: 0,
2009            last_attempt: None,
2010        };
2011
2012        self.remote_candidates.insert(sequence, candidate);
2013        self.stats.remote_candidates_received += 1;
2014
2015        trace!(
2016            "Added remote candidate: {} with priority {}",
2017            address, priority
2018        );
2019        Ok(())
2020    }
2021
2022    /// Remove a candidate by sequence number
2023    pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
2024        if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
2025            candidate.state = CandidateState::Removed;
2026
2027            // Cancel any active validation for this address
2028            self.active_validations.remove(&candidate.address);
2029            true
2030        } else {
2031            false
2032        }
2033    }
2034
2035    /// Add a local candidate that we've discovered
2036    #[allow(dead_code)] // Reserved for future local candidate management
2037    pub(super) fn add_local_candidate(
2038        &mut self,
2039        address: SocketAddr,
2040        source: CandidateSource,
2041        now: Instant,
2042    ) -> VarInt {
2043        let sequence = self.next_sequence;
2044        self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2045            .expect("sequence number overflow");
2046
2047        // Calculate priority for this candidate
2048        let candidate_type = classify_candidate_type(source);
2049        let local_preference = self.calculate_local_preference(address);
2050        let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
2051
2052        let candidate = AddressCandidate {
2053            address,
2054            priority,
2055            source,
2056            discovered_at: now,
2057            state: CandidateState::New,
2058            attempt_count: 0,
2059            last_attempt: None,
2060        };
2061
2062        self.local_candidates.insert(sequence, candidate);
2063        self.stats.local_candidates_sent += 1;
2064
2065        // Regenerate pairs when we add a new local candidate
2066        self.generate_candidate_pairs(now);
2067
2068        sequence
2069    }
2070
2071    /// Calculate local preference for address prioritization
2072    #[allow(dead_code)] // Reserved for ICE-like priority calculation
2073    fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
2074        match addr {
2075            SocketAddr::V4(v4) => {
2076                if v4.ip().is_loopback() {
2077                    0 // Lowest priority
2078                } else if v4.ip().is_private() {
2079                    65000 // High priority for local network
2080                } else {
2081                    32000 // Medium priority for public addresses
2082                }
2083            }
2084            SocketAddr::V6(v6) => {
2085                if v6.ip().is_loopback() {
2086                    0
2087                } else if v6.ip().is_unicast_link_local() {
2088                    30000 // Link-local gets medium-low priority
2089                } else {
2090                    50000 // IPv6 generally gets good priority
2091                }
2092            }
2093        }
2094    }
2095
2096    /// Generate all possible candidate pairs from local and remote candidates
2097    pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
2098        self.candidate_pairs.clear();
2099        self.pair_index.clear();
2100
2101        // Pre-allocate capacity to avoid reallocations
2102        let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
2103        self.candidate_pairs.reserve(estimated_capacity);
2104        self.pair_index.reserve(estimated_capacity);
2105
2106        // Cache compatibility checks to avoid repeated work
2107        let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
2108
2109        for (_local_seq, local_candidate) in &self.local_candidates {
2110            // Skip removed candidates early
2111            if local_candidate.state == CandidateState::Removed {
2112                continue;
2113            }
2114
2115            // Pre-classify local candidate type once
2116            let local_type = classify_candidate_type(local_candidate.source);
2117
2118            for (remote_seq, remote_candidate) in &self.remote_candidates {
2119                // Skip removed candidates
2120                if remote_candidate.state == CandidateState::Removed {
2121                    continue;
2122                }
2123
2124                // Check compatibility with caching
2125                let cache_key = (local_candidate.address, remote_candidate.address);
2126                let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
2127                    are_candidates_compatible(local_candidate, remote_candidate)
2128                });
2129
2130                if !compatible {
2131                    continue;
2132                }
2133
2134                // Calculate combined priority
2135                let pair_priority =
2136                    calculate_pair_priority(local_candidate.priority, remote_candidate.priority);
2137
2138                // Classify pair type (local already classified)
2139                let remote_type = classify_candidate_type(remote_candidate.source);
2140                let pair_type = classify_pair_type(local_type, remote_type);
2141
2142                let pair = CandidatePair {
2143                    remote_sequence: *remote_seq,
2144                    local_addr: local_candidate.address,
2145                    remote_addr: remote_candidate.address,
2146                    priority: pair_priority,
2147                    state: PairState::Waiting,
2148                    pair_type,
2149                    created_at: now,
2150                    last_check: None,
2151                };
2152
2153                // Store index for O(1) lookup
2154                let index = self.candidate_pairs.len();
2155                self.pair_index.insert(remote_candidate.address, index);
2156                self.candidate_pairs.push(pair);
2157            }
2158        }
2159
2160        // Sort pairs by priority (highest first) - use unstable sort for better performance
2161        self.candidate_pairs
2162            .sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2163
2164        // Rebuild index after sorting since indices changed
2165        self.pair_index.clear();
2166        for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2167            self.pair_index.insert(pair.remote_addr, idx);
2168        }
2169
2170        trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2171    }
2172
2173    /// Get the highest priority pairs ready for validation
2174    pub(super) fn get_next_validation_pairs(
2175        &mut self,
2176        max_concurrent: usize,
2177    ) -> Vec<&mut CandidatePair> {
2178        // Since pairs are sorted by priority (highest first), we can stop early
2179        // once we find enough waiting pairs or reach lower priority pairs
2180        let mut result = Vec::with_capacity(max_concurrent);
2181
2182        for pair in self.candidate_pairs.iter_mut() {
2183            if pair.state == PairState::Waiting {
2184                result.push(pair);
2185                if result.len() >= max_concurrent {
2186                    break;
2187                }
2188            }
2189        }
2190
2191        result
2192    }
2193
2194    /// Find a candidate pair by remote address
2195    pub(super) fn find_pair_by_remote_addr(
2196        &mut self,
2197        addr: SocketAddr,
2198    ) -> Option<&mut CandidatePair> {
2199        // Use index for O(1) lookup instead of O(n) linear search
2200        if let Some(&index) = self.pair_index.get(&addr) {
2201            self.candidate_pairs.get_mut(index)
2202        } else {
2203            None
2204        }
2205    }
2206
2207    /// Mark a pair as succeeded and handle promotion
2208    pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2209        // Find the pair and get its type and priority
2210        let (succeeded_type, succeeded_priority) = {
2211            if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2212                pair.state = PairState::Succeeded;
2213                (pair.pair_type, pair.priority)
2214            } else {
2215                return false;
2216            }
2217        };
2218
2219        // Freeze lower priority pairs of the same type to avoid unnecessary testing
2220        for other_pair in &mut self.candidate_pairs {
2221            if other_pair.pair_type == succeeded_type
2222                && other_pair.priority < succeeded_priority
2223                && other_pair.state == PairState::Waiting
2224            {
2225                other_pair.state = PairState::Frozen;
2226            }
2227        }
2228
2229        true
2230    }
2231
2232    /// Get the best succeeded pair for each address family
2233    pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2234        let mut best_ipv4: Option<&CandidatePair> = None;
2235        let mut best_ipv6: Option<&CandidatePair> = None;
2236
2237        for pair in &self.candidate_pairs {
2238            if pair.state != PairState::Succeeded {
2239                continue;
2240            }
2241
2242            match pair.remote_addr {
2243                SocketAddr::V4(_) => {
2244                    if best_ipv4.map_or(true, |best| pair.priority > best.priority) {
2245                        best_ipv4 = Some(pair);
2246                    }
2247                }
2248                SocketAddr::V6(_) => {
2249                    if best_ipv6.map_or(true, |best| pair.priority > best.priority) {
2250                        best_ipv6 = Some(pair);
2251                    }
2252                }
2253            }
2254        }
2255
2256        let mut result = Vec::new();
2257        if let Some(pair) = best_ipv4 {
2258            result.push(pair);
2259        }
2260        if let Some(pair) = best_ipv6 {
2261            result.push(pair);
2262        }
2263        result
2264    }
2265
2266    /// Get candidates ready for validation, sorted by priority
2267    pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2268        let mut candidates: Vec<_> = self
2269            .remote_candidates
2270            .iter()
2271            .filter(|(_, c)| c.state == CandidateState::New)
2272            .map(|(k, v)| (*k, v))
2273            .collect();
2274
2275        // Sort by priority (higher priority first)
2276        candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2277        candidates
2278    }
2279
2280    /// Start validation for a candidate address with security checks
2281    pub(super) fn start_validation(
2282        &mut self,
2283        sequence: VarInt,
2284        challenge: u64,
2285        now: Instant,
2286    ) -> Result<(), NatTraversalError> {
2287        let candidate = self
2288            .remote_candidates
2289            .get_mut(&sequence)
2290            .ok_or(NatTraversalError::UnknownCandidate)?;
2291
2292        if candidate.state != CandidateState::New {
2293            return Err(NatTraversalError::InvalidCandidateState);
2294        }
2295
2296        // Security validation: Check for validation abuse patterns
2297        if Self::is_validation_suspicious(candidate, now) {
2298            self.stats.security_rejections += 1;
2299            debug!(
2300                "Suspicious validation attempt rejected for address {}",
2301                candidate.address
2302            );
2303            return Err(NatTraversalError::SecurityValidationFailed);
2304        }
2305
2306        // Security validation: Limit concurrent validations
2307        if self.active_validations.len() >= 10 {
2308            debug!(
2309                "Too many concurrent validations, rejecting new validation for {}",
2310                candidate.address
2311            );
2312            return Err(NatTraversalError::SecurityValidationFailed);
2313        }
2314
2315        // Update candidate state
2316        candidate.state = CandidateState::Validating;
2317        candidate.attempt_count += 1;
2318        candidate.last_attempt = Some(now);
2319
2320        // Track validation state
2321        let validation = PathValidationState {
2322            challenge,
2323            sent_at: now,
2324            retry_count: 0,
2325            max_retries: 3, // TODO: Make configurable
2326            coordination_round: self.coordination.as_ref().map(|c| c.round),
2327            timeout_state: AdaptiveTimeoutState::new(),
2328            last_retry_at: None,
2329        };
2330
2331        self.active_validations
2332            .insert(candidate.address, validation);
2333        trace!(
2334            "Started validation for candidate {} with challenge {}",
2335            candidate.address, challenge
2336        );
2337        Ok(())
2338    }
2339
2340    /// Check if a validation request shows suspicious patterns
2341    fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2342        // Check for excessive retry attempts
2343        if candidate.attempt_count > 10 {
2344            return true;
2345        }
2346
2347        // Check for rapid retry patterns
2348        if let Some(last_attempt) = candidate.last_attempt {
2349            let time_since_last = now.duration_since(last_attempt);
2350            if time_since_last < Duration::from_millis(100) {
2351                return true; // Too frequent attempts
2352            }
2353        }
2354
2355        // Check if this candidate was recently failed
2356        if candidate.state == CandidateState::Failed {
2357            let time_since_discovery = now.duration_since(candidate.discovered_at);
2358            if time_since_discovery < Duration::from_secs(60) {
2359                return true; // Recently failed, shouldn't retry so soon
2360            }
2361        }
2362
2363        false
2364    }
2365
2366    /// Handle successful validation response
2367    pub(super) fn handle_validation_success(
2368        &mut self,
2369        remote_addr: SocketAddr,
2370        challenge: u64,
2371        now: Instant,
2372    ) -> Result<VarInt, NatTraversalError> {
2373        // Find the candidate with this address
2374        let sequence = self
2375            .remote_candidates
2376            .iter()
2377            .find(|(_, c)| c.address == remote_addr)
2378            .map(|(seq, _)| *seq)
2379            .ok_or(NatTraversalError::UnknownCandidate)?;
2380
2381        // Verify challenge matches and update timeout state
2382        let validation = self
2383            .active_validations
2384            .get_mut(&remote_addr)
2385            .ok_or(NatTraversalError::NoActiveValidation)?;
2386
2387        if validation.challenge != challenge {
2388            return Err(NatTraversalError::ChallengeMismatch);
2389        }
2390
2391        // Calculate RTT and update adaptive timeout
2392        let rtt = now.duration_since(validation.sent_at);
2393        validation.timeout_state.update_success(rtt);
2394
2395        // Update network monitor
2396        self.network_monitor.record_success(rtt, now);
2397
2398        // Update candidate state
2399        let candidate = self
2400            .remote_candidates
2401            .get_mut(&sequence)
2402            .ok_or(NatTraversalError::UnknownCandidate)?;
2403
2404        candidate.state = CandidateState::Valid;
2405        self.active_validations.remove(&remote_addr);
2406        self.stats.validations_succeeded += 1;
2407
2408        trace!(
2409            "Validation successful for {} with RTT {:?}",
2410            remote_addr, rtt
2411        );
2412        Ok(sequence)
2413    }
2414
2415    /// Start a new coordination round for simultaneous hole punching with security validation
2416    pub(super) fn start_coordination_round(
2417        &mut self,
2418        targets: Vec<PunchTarget>,
2419        now: Instant,
2420    ) -> Result<VarInt, NatTraversalError> {
2421        // Security validation: Check rate limiting for coordination requests
2422        if self.security_state.is_coordination_rate_limited(now) {
2423            self.stats.rate_limit_violations += 1;
2424            debug!(
2425                "Rate limit exceeded for coordination request with {} targets",
2426                targets.len()
2427            );
2428            return Err(NatTraversalError::RateLimitExceeded);
2429        }
2430
2431        // Security validation: Check for suspicious coordination patterns
2432        if self.is_coordination_suspicious(&targets, now) {
2433            self.stats.suspicious_coordination_attempts += 1;
2434            self.stats.security_rejections += 1;
2435            debug!(
2436                "Suspicious coordination request rejected with {} targets",
2437                targets.len()
2438            );
2439            return Err(NatTraversalError::SuspiciousCoordination);
2440        }
2441
2442        // Security validation: Validate all target addresses
2443        for target in &targets {
2444            match self
2445                .security_state
2446                .validate_address(target.remote_addr, now)
2447            {
2448                AddressValidationResult::Invalid => {
2449                    self.stats.invalid_address_rejections += 1;
2450                    self.stats.security_rejections += 1;
2451                    debug!(
2452                        "Invalid target address in coordination: {}",
2453                        target.remote_addr
2454                    );
2455                    return Err(NatTraversalError::InvalidAddress);
2456                }
2457                AddressValidationResult::Suspicious => {
2458                    self.stats.security_rejections += 1;
2459                    debug!(
2460                        "Suspicious target address in coordination: {}",
2461                        target.remote_addr
2462                    );
2463                    return Err(NatTraversalError::SecurityValidationFailed);
2464                }
2465                AddressValidationResult::Valid => {
2466                    // Continue with normal processing
2467                }
2468            }
2469        }
2470
2471        let round = self.next_sequence;
2472        self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2473            .expect("sequence number overflow");
2474
2475        // Calculate synchronized punch time (grace period for coordination)
2476        let coordination_grace = Duration::from_millis(500); // 500ms for coordination
2477        let punch_start = now + coordination_grace;
2478
2479        self.coordination = Some(CoordinationState {
2480            round,
2481            punch_targets: targets,
2482            round_start: now,
2483            punch_start,
2484            round_duration: self.coordination_timeout,
2485            state: CoordinationPhase::Requesting,
2486            punch_request_sent: false,
2487            peer_punch_received: false,
2488            retry_count: 0,
2489            max_retries: 3,
2490            timeout_state: AdaptiveTimeoutState::new(),
2491            last_retry_at: None,
2492        });
2493
2494        self.stats.coordination_rounds += 1;
2495        trace!(
2496            "Started coordination round {} with {} targets",
2497            round,
2498            self.coordination.as_ref().unwrap().punch_targets.len()
2499        );
2500        Ok(round)
2501    }
2502
2503    /// Check if a coordination request shows suspicious patterns
2504    fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2505        // Check for excessive number of targets
2506        if targets.len() > 20 {
2507            return true;
2508        }
2509
2510        // Check for duplicate targets
2511        let mut seen_addresses = std::collections::HashSet::new();
2512        for target in targets {
2513            if !seen_addresses.insert(target.remote_addr) {
2514                return true; // Duplicate target
2515            }
2516        }
2517
2518        // Check for patterns that might indicate scanning
2519        if targets.len() > 5 {
2520            // Check if all targets are in sequential IP ranges (potential scan)
2521            let mut ipv4_addresses: Vec<_> = targets
2522                .iter()
2523                .filter_map(|t| match t.remote_addr.ip() {
2524                    IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2525                    _ => None,
2526                })
2527                .collect();
2528
2529            if ipv4_addresses.len() >= 3 {
2530                ipv4_addresses.sort();
2531                let mut sequential_count = 1;
2532                for i in 1..ipv4_addresses.len() {
2533                    if ipv4_addresses[i] == ipv4_addresses[i - 1] + 1 {
2534                        sequential_count += 1;
2535                        if sequential_count >= 3 {
2536                            return true; // Sequential IPs detected
2537                        }
2538                    } else {
2539                        sequential_count = 1;
2540                    }
2541                }
2542            }
2543        }
2544
2545        false
2546    }
2547
2548    /// Get the current coordination phase
2549    pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2550        self.coordination.as_ref().map(|c| c.state)
2551    }
2552
2553    /// Check if we need to send PUNCH_ME_NOW frame
2554    pub(super) fn should_send_punch_request(&self) -> bool {
2555        if let Some(coord) = &self.coordination {
2556            coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2557        } else {
2558            false
2559        }
2560    }
2561
2562    /// Mark that we've sent our PUNCH_ME_NOW request
2563    pub(super) fn mark_punch_request_sent(&mut self) {
2564        if let Some(coord) = &mut self.coordination {
2565            coord.punch_request_sent = true;
2566            coord.state = CoordinationPhase::Coordinating;
2567            trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2568        }
2569    }
2570
2571    /// Handle receiving peer's PUNCH_ME_NOW (via coordinator) with security validation
2572    pub(super) fn handle_peer_punch_request(
2573        &mut self,
2574        peer_round: VarInt,
2575        now: Instant,
2576    ) -> Result<bool, NatTraversalError> {
2577        // Security validation: Check if this is a valid coordination request
2578        if self.is_peer_coordination_suspicious(peer_round, now) {
2579            self.stats.suspicious_coordination_attempts += 1;
2580            self.stats.security_rejections += 1;
2581            debug!(
2582                "Suspicious peer coordination request rejected for round {}",
2583                peer_round
2584            );
2585            return Err(NatTraversalError::SuspiciousCoordination);
2586        }
2587
2588        if let Some(coord) = &mut self.coordination {
2589            if coord.round == peer_round {
2590                match coord.state {
2591                    CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2592                        coord.peer_punch_received = true;
2593                        coord.state = CoordinationPhase::Preparing;
2594
2595                        // Calculate adaptive grace period based on network conditions
2596                        let network_rtt = self
2597                            .network_monitor
2598                            .get_estimated_rtt()
2599                            .unwrap_or(Duration::from_millis(100));
2600                        let quality_score = self.network_monitor.get_quality_score();
2601
2602                        // Scale grace period: good networks get shorter delays
2603                        let base_grace = Duration::from_millis(150);
2604                        let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2605                        let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2606
2607                        let adaptive_grace = Duration::from_millis(
2608                            (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64,
2609                        );
2610
2611                        coord.punch_start = now + adaptive_grace;
2612
2613                        trace!(
2614                            "Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2615                            adaptive_grace, network_rtt, quality_score
2616                        );
2617                        Ok(true)
2618                    }
2619                    CoordinationPhase::Preparing => {
2620                        // Already in preparation phase, just acknowledge
2621                        trace!("Peer coordination confirmed during preparation");
2622                        Ok(true)
2623                    }
2624                    _ => {
2625                        debug!(
2626                            "Received coordination in unexpected phase: {:?}",
2627                            coord.state
2628                        );
2629                        Ok(false)
2630                    }
2631                }
2632            } else {
2633                debug!(
2634                    "Received coordination for wrong round: {} vs {}",
2635                    peer_round, coord.round
2636                );
2637                Ok(false)
2638            }
2639        } else {
2640            debug!("Received peer coordination but no active round");
2641            Ok(false)
2642        }
2643    }
2644
2645    /// Check if a peer coordination request is suspicious
2646    fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2647        // Check for round number anomalies
2648        if peer_round.into_inner() == 0 {
2649            return true; // Invalid round number
2650        }
2651
2652        // Check if round is too far in the future or past
2653        if let Some(coord) = &self.coordination {
2654            let our_round = coord.round.into_inner();
2655            let peer_round_num = peer_round.into_inner();
2656
2657            // Allow some variance but reject extreme differences
2658            if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2659                return true;
2660            }
2661        }
2662
2663        false
2664    }
2665
2666    /// Check if it's time to start hole punching
2667    pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2668        if let Some(coord) = &self.coordination {
2669            match coord.state {
2670                CoordinationPhase::Preparing => now >= coord.punch_start,
2671                CoordinationPhase::Coordinating => {
2672                    // Check if we have peer confirmation and grace period elapsed
2673                    coord.peer_punch_received && now >= coord.punch_start
2674                }
2675                _ => false,
2676            }
2677        } else {
2678            false
2679        }
2680    }
2681
2682    /// Start the synchronized hole punching phase
2683    pub(super) fn start_punching_phase(&mut self, now: Instant) {
2684        if let Some(coord) = &mut self.coordination {
2685            coord.state = CoordinationPhase::Punching;
2686
2687            // Calculate precise timing for coordinated transmission
2688            let network_rtt = self
2689                .network_monitor
2690                .get_estimated_rtt()
2691                .unwrap_or(Duration::from_millis(100));
2692
2693            // Add small random jitter to avoid thundering herd
2694            let jitter_ms: u64 = rand::random::<u64>() % 11;
2695            let jitter = Duration::from_millis(jitter_ms);
2696            let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2697
2698            // Update punch start time with precise calculation
2699            coord.punch_start = transmission_time.max(now);
2700
2701            trace!(
2702                "Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2703                coord.punch_start, network_rtt, jitter
2704            );
2705        }
2706    }
2707
2708    /// Get punch targets for the current round
2709    pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2710        self.coordination
2711            .as_ref()
2712            .map(|c| c.punch_targets.as_slice())
2713    }
2714
2715    /// Mark coordination as validating (PATH_CHALLENGE sent)
2716    pub(super) fn mark_coordination_validating(&mut self) {
2717        if let Some(coord) = &mut self.coordination {
2718            if coord.state == CoordinationPhase::Punching {
2719                coord.state = CoordinationPhase::Validating;
2720                trace!("Coordination moved to validation phase");
2721            }
2722        }
2723    }
2724
2725    /// Handle successful path validation during coordination
2726    pub(super) fn handle_coordination_success(
2727        &mut self,
2728        remote_addr: SocketAddr,
2729        now: Instant,
2730    ) -> bool {
2731        if let Some(coord) = &mut self.coordination {
2732            // Check if this address was one of our punch targets
2733            let was_target = coord
2734                .punch_targets
2735                .iter()
2736                .any(|target| target.remote_addr == remote_addr);
2737
2738            if was_target && coord.state == CoordinationPhase::Validating {
2739                // Calculate RTT and update adaptive timeout
2740                let rtt = now.duration_since(coord.round_start);
2741                coord.timeout_state.update_success(rtt);
2742                self.network_monitor.record_success(rtt, now);
2743
2744                coord.state = CoordinationPhase::Succeeded;
2745                self.stats.direct_connections += 1;
2746                trace!(
2747                    "Coordination succeeded via {} with RTT {:?}",
2748                    remote_addr, rtt
2749                );
2750                true
2751            } else {
2752                false
2753            }
2754        } else {
2755            false
2756        }
2757    }
2758
2759    /// Handle coordination failure and determine if we should retry
2760    pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2761        if let Some(coord) = &mut self.coordination {
2762            coord.retry_count += 1;
2763            coord.timeout_state.update_timeout();
2764            self.network_monitor.record_timeout(now);
2765
2766            // Check network conditions before retrying
2767            if coord.timeout_state.should_retry(coord.max_retries)
2768                && self.network_monitor.is_suitable_for_coordination()
2769            {
2770                // Retry with adaptive timeout
2771                coord.state = CoordinationPhase::Requesting;
2772                coord.punch_request_sent = false;
2773                coord.peer_punch_received = false;
2774                coord.round_start = now;
2775                coord.last_retry_at = Some(now);
2776
2777                // Use adaptive timeout for retry delay
2778                let retry_delay = coord.timeout_state.get_retry_delay();
2779
2780                // Factor in network quality for retry timing
2781                let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2782                let adjusted_delay = Duration::from_millis(
2783                    (retry_delay.as_millis() as f64 * quality_multiplier) as u64,
2784                );
2785
2786                coord.punch_start = now + adjusted_delay;
2787
2788                trace!(
2789                    "Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2790                    coord.round,
2791                    coord.retry_count + 1,
2792                    adjusted_delay,
2793                    self.network_monitor.get_quality_score()
2794                );
2795                true
2796            } else {
2797                coord.state = CoordinationPhase::Failed;
2798                self.stats.coordination_failures += 1;
2799
2800                if !self.network_monitor.is_suitable_for_coordination() {
2801                    trace!(
2802                        "Coordination failed due to poor network conditions (quality: {:.2})",
2803                        self.network_monitor.get_quality_score()
2804                    );
2805                } else {
2806                    trace!("Coordination failed after {} attempts", coord.retry_count);
2807                }
2808                false
2809            }
2810        } else {
2811            false
2812        }
2813    }
2814
2815    /// Check if the current coordination round has timed out
2816    pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2817        if let Some(coord) = &mut self.coordination {
2818            let timeout = coord.timeout_state.get_timeout();
2819            let elapsed = now.duration_since(coord.round_start);
2820
2821            if elapsed > timeout {
2822                trace!(
2823                    "Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2824                    coord.round, elapsed, timeout
2825                );
2826                self.handle_coordination_failure(now);
2827                true
2828            } else {
2829                false
2830            }
2831        } else {
2832            false
2833        }
2834    }
2835
2836    /// Check for validation timeouts and handle retries
2837    #[allow(dead_code)] // Reserved for validation timeout handling
2838    pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2839        let mut expired_validations = Vec::new();
2840        let mut retry_validations = Vec::new();
2841
2842        for (addr, validation) in &mut self.active_validations {
2843            let timeout = validation.timeout_state.get_timeout();
2844            let elapsed = now.duration_since(validation.sent_at);
2845
2846            if elapsed >= timeout {
2847                if validation
2848                    .timeout_state
2849                    .should_retry(validation.max_retries)
2850                {
2851                    // Schedule retry
2852                    retry_validations.push(*addr);
2853                } else {
2854                    // Mark as expired
2855                    expired_validations.push(*addr);
2856                }
2857            }
2858        }
2859
2860        // Handle retries
2861        for addr in retry_validations {
2862            if let Some(validation) = self.active_validations.get_mut(&addr) {
2863                validation.retry_count += 1;
2864                validation.sent_at = now;
2865                validation.last_retry_at = Some(now);
2866                validation.timeout_state.update_timeout();
2867
2868                trace!(
2869                    "Retrying validation for {} (attempt {})",
2870                    addr,
2871                    validation.retry_count + 1
2872                );
2873            }
2874        }
2875
2876        // Remove expired validations
2877        for addr in &expired_validations {
2878            self.active_validations.remove(addr);
2879            self.network_monitor.record_timeout(now);
2880            trace!("Validation expired for {}", addr);
2881        }
2882
2883        expired_validations
2884    }
2885
2886    /// Schedule validation retries for active validations that need retry
2887    #[allow(dead_code)] // Reserved for retry scheduling logic
2888    pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2889        let mut retry_addresses = Vec::new();
2890
2891        // Get all active validations that need retry
2892        for (addr, validation) in &mut self.active_validations {
2893            let elapsed = now.duration_since(validation.sent_at);
2894            let timeout = validation.timeout_state.get_timeout();
2895
2896            if elapsed > timeout
2897                && validation
2898                    .timeout_state
2899                    .should_retry(validation.max_retries)
2900            {
2901                // Update retry state
2902                validation.retry_count += 1;
2903                validation.last_retry_at = Some(now);
2904                validation.sent_at = now; // Reset sent time for new attempt
2905                validation.timeout_state.update_timeout();
2906
2907                retry_addresses.push(*addr);
2908                trace!(
2909                    "Scheduled retry {} for validation to {}",
2910                    validation.retry_count, addr
2911                );
2912            }
2913        }
2914
2915        retry_addresses
2916    }
2917
2918    /// Update network conditions and cleanup
2919    #[allow(dead_code)] // Reserved for network condition monitoring
2920    pub(super) fn update_network_conditions(&mut self, now: Instant) {
2921        self.network_monitor.cleanup(now);
2922
2923        // Update timeout multiplier based on network conditions
2924        let multiplier = self.network_monitor.get_timeout_multiplier();
2925
2926        // Apply network-aware timeout adjustments to active validations
2927        for validation in self.active_validations.values_mut() {
2928            if multiplier > 1.5 {
2929                // Poor network conditions - be more patient
2930                validation.timeout_state.backoff_multiplier =
2931                    (validation.timeout_state.backoff_multiplier * 1.2)
2932                        .min(validation.timeout_state.max_backoff_multiplier);
2933            } else if multiplier < 0.8 {
2934                // Good network conditions - be more aggressive
2935                validation.timeout_state.backoff_multiplier =
2936                    (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2937            }
2938        }
2939    }
2940
2941    /// Check if coordination should be retried now
2942    #[allow(dead_code)] // Reserved for coordination retry logic
2943    pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2944        if let Some(coord) = &self.coordination {
2945            if coord.retry_count > 0 {
2946                if let Some(last_retry) = coord.last_retry_at {
2947                    let retry_delay = coord.timeout_state.get_retry_delay();
2948                    return now.duration_since(last_retry) >= retry_delay;
2949                }
2950            }
2951        }
2952        false
2953    }
2954
2955    /// Perform resource management and cleanup
2956    #[allow(dead_code)] // Reserved for resource management optimization
2957    pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2958        // Update resource usage statistics
2959        self.resource_manager.update_stats(
2960            self.active_validations.len(),
2961            self.local_candidates.len(),
2962            self.remote_candidates.len(),
2963            self.candidate_pairs.len(),
2964        );
2965
2966        // Calculate current memory pressure
2967        let memory_pressure = self.resource_manager.calculate_memory_pressure(
2968            self.active_validations.len(),
2969            self.local_candidates.len(),
2970            self.remote_candidates.len(),
2971            self.candidate_pairs.len(),
2972        );
2973
2974        // Perform cleanup if needed
2975        let mut cleaned = 0;
2976
2977        if self.resource_manager.should_cleanup(now) {
2978            cleaned += self.resource_manager.cleanup_expired_resources(
2979                &mut self.active_validations,
2980                &mut self.local_candidates,
2981                &mut self.remote_candidates,
2982                &mut self.candidate_pairs,
2983                &mut self.coordination,
2984                now,
2985            );
2986
2987            // If memory pressure is high, perform aggressive cleanup
2988            if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2989                cleaned += self.resource_manager.aggressive_cleanup(
2990                    &mut self.active_validations,
2991                    &mut self.local_candidates,
2992                    &mut self.remote_candidates,
2993                    &mut self.candidate_pairs,
2994                    now,
2995                );
2996            }
2997        }
2998
2999        cleaned
3000    }
3001
3002    /// Check if we should reject new resources due to limits
3003    pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
3004        // Update stats and check limits
3005        self.resource_manager.update_stats(
3006            self.active_validations.len(),
3007            self.local_candidates.len(),
3008            self.remote_candidates.len(),
3009            self.candidate_pairs.len(),
3010        );
3011        let memory_pressure = self.resource_manager.calculate_memory_pressure(
3012            self.active_validations.len(),
3013            self.local_candidates.len(),
3014            self.remote_candidates.len(),
3015            self.candidate_pairs.len(),
3016        );
3017
3018        // Reject if memory pressure is too high
3019        if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
3020            self.resource_manager.stats.allocation_failures += 1;
3021            return true;
3022        }
3023
3024        // Reject if hard limits are exceeded
3025        if self.resource_manager.check_resource_limits(self) {
3026            self.resource_manager.stats.allocation_failures += 1;
3027            return true;
3028        }
3029
3030        false
3031    }
3032
3033    /// Get the next timeout instant for NAT traversal operations
3034    pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
3035        let mut next_timeout = None;
3036
3037        // Check coordination timeout
3038        if let Some(coord) = &self.coordination {
3039            match coord.state {
3040                CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
3041                    let timeout_at = coord.round_start + self.coordination_timeout;
3042                    next_timeout =
3043                        Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
3044                }
3045                CoordinationPhase::Preparing => {
3046                    // Punch start time is when we should start punching
3047                    next_timeout = Some(
3048                        next_timeout
3049                            .map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)),
3050                    );
3051                }
3052                CoordinationPhase::Punching | CoordinationPhase::Validating => {
3053                    // Check for coordination round timeout
3054                    let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
3055                    next_timeout =
3056                        Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
3057                }
3058                _ => {}
3059            }
3060        }
3061
3062        // Check validation timeouts
3063        for (_, validation) in &self.active_validations {
3064            let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
3065            next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
3066        }
3067
3068        // Check resource cleanup interval
3069        if self.resource_manager.should_cleanup(now) {
3070            // Schedule cleanup soon
3071            let cleanup_at = now + Duration::from_secs(1);
3072            next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
3073        }
3074
3075        next_timeout
3076    }
3077
3078    /// Handle timeout events and return actions to take
3079    pub(super) fn handle_timeout(
3080        &mut self,
3081        now: Instant,
3082    ) -> Result<Vec<TimeoutAction>, NatTraversalError> {
3083        let mut actions = Vec::new();
3084
3085        // Handle coordination timeouts
3086        if let Some(coord) = &mut self.coordination {
3087            match coord.state {
3088                CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
3089                    let timeout_at = coord.round_start + self.coordination_timeout;
3090                    if now >= timeout_at {
3091                        coord.retry_count += 1;
3092                        if coord.retry_count >= coord.max_retries {
3093                            debug!("Coordination failed after {} retries", coord.retry_count);
3094                            coord.state = CoordinationPhase::Failed;
3095                            actions.push(TimeoutAction::Failed);
3096                        } else {
3097                            debug!(
3098                                "Coordination timeout, retrying ({}/{})",
3099                                coord.retry_count, coord.max_retries
3100                            );
3101                            coord.state = CoordinationPhase::Requesting;
3102                            coord.round_start = now;
3103                            actions.push(TimeoutAction::RetryCoordination);
3104                        }
3105                    }
3106                }
3107                CoordinationPhase::Preparing => {
3108                    // Check if it's time to start punching
3109                    if now >= coord.punch_start {
3110                        debug!("Starting coordinated hole punching");
3111                        coord.state = CoordinationPhase::Punching;
3112                        actions.push(TimeoutAction::StartValidation);
3113                    }
3114                }
3115                CoordinationPhase::Punching | CoordinationPhase::Validating => {
3116                    let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
3117                    if now >= timeout_at {
3118                        coord.retry_count += 1;
3119                        if coord.retry_count >= coord.max_retries {
3120                            debug!("Validation failed after {} retries", coord.retry_count);
3121                            coord.state = CoordinationPhase::Failed;
3122                            actions.push(TimeoutAction::Failed);
3123                        } else {
3124                            debug!(
3125                                "Validation timeout, retrying ({}/{})",
3126                                coord.retry_count, coord.max_retries
3127                            );
3128                            coord.state = CoordinationPhase::Punching;
3129                            actions.push(TimeoutAction::StartValidation);
3130                        }
3131                    }
3132                }
3133                CoordinationPhase::Succeeded => {
3134                    actions.push(TimeoutAction::Complete);
3135                }
3136                CoordinationPhase::Failed => {
3137                    actions.push(TimeoutAction::Failed);
3138                }
3139                _ => {}
3140            }
3141        }
3142
3143        // Handle validation timeouts
3144        let mut expired_validations = Vec::new();
3145        for (addr, validation) in &mut self.active_validations {
3146            let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
3147            if now >= timeout_at {
3148                validation.retry_count += 1;
3149                if validation.retry_count >= validation.max_retries {
3150                    debug!("Path validation failed for {}: max retries exceeded", addr);
3151                    expired_validations.push(*addr);
3152                } else {
3153                    debug!(
3154                        "Path validation timeout for {}, retrying ({}/{})",
3155                        addr, validation.retry_count, validation.max_retries
3156                    );
3157                    validation.sent_at = now;
3158                    validation.last_retry_at = Some(now);
3159                    actions.push(TimeoutAction::StartValidation);
3160                }
3161            }
3162        }
3163
3164        // Remove expired validations
3165        for addr in expired_validations {
3166            self.active_validations.remove(&addr);
3167        }
3168
3169        // Handle resource cleanup
3170        if self.resource_manager.should_cleanup(now) {
3171            self.resource_manager.perform_cleanup(now);
3172        }
3173
3174        // Update network condition monitoring
3175        self.network_monitor.update_quality_score(now);
3176
3177        // If no coordination is active and we have candidates, try to start discovery
3178        if self.coordination.is_none()
3179            && !self.local_candidates.is_empty()
3180            && !self.remote_candidates.is_empty()
3181        {
3182            actions.push(TimeoutAction::RetryDiscovery);
3183        }
3184
3185        Ok(actions)
3186    }
3187
3188    /// Handle address observation for bootstrap nodes
3189    ///
3190    /// This method is called when a peer connects to this bootstrap node,
3191    /// allowing the bootstrap to observe the peer's public address.
3192    #[allow(dead_code)] // Reserved for address observation handling
3193    pub(super) fn handle_address_observation(
3194        &mut self,
3195        peer_id: [u8; 32],
3196        observed_address: SocketAddr,
3197        connection_id: crate::shared::ConnectionId,
3198        peer_role: NatTraversalRole,
3199        now: Instant,
3200    ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
3201        if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3202            let connection_context = ConnectionContext {
3203                connection_id,
3204                original_destination: observed_address, // For now, use same as observed
3205                peer_role,
3206                transport_params: None,
3207            };
3208
3209            // Observe the peer's address
3210            bootstrap_coordinator.observe_peer_address(
3211                peer_id,
3212                observed_address,
3213                connection_context,
3214                now,
3215            )?;
3216
3217            // Generate ADD_ADDRESS frame to inform peer of their observed address
3218            let sequence = self.next_sequence;
3219            self.next_sequence =
3220                VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3221
3222            let priority = VarInt::from_u32(100); // Server-reflexive priority
3223            let add_address_frame =
3224                bootstrap_coordinator.generate_add_address_frame(peer_id, sequence, priority);
3225
3226            Ok(add_address_frame)
3227        } else {
3228            // Not a bootstrap node
3229            Ok(None)
3230        }
3231    }
3232
3233    /// Handle PUNCH_ME_NOW frame for bootstrap coordination
3234    ///
3235    /// This processes coordination requests from peers and facilitates
3236    /// hole punching between them.
3237    pub(super) fn handle_punch_me_now_frame(
3238        &mut self,
3239        from_peer: [u8; 32],
3240        source_addr: SocketAddr,
3241        frame: &crate::frame::PunchMeNow,
3242        now: Instant,
3243    ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3244        if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3245            bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3246        } else {
3247            // Not a bootstrap node - this frame should not be processed here
3248            Ok(None)
3249        }
3250    }
3251
3252    /// Perform bootstrap cleanup operations
3253    ///
3254    /// Get observed address for a peer
3255    #[allow(dead_code)] // Used for address reflexive candidate discovery
3256    pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3257        self.bootstrap_coordinator
3258            .as_ref()
3259            .and_then(|coord| coord.get_peer_record(peer_id))
3260            .map(|record| record.observed_address)
3261    }
3262
3263    /// Start candidate discovery process
3264    pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3265        debug!("Starting candidate discovery for NAT traversal");
3266
3267        // Initialize discovery state if needed
3268        if self.local_candidates.is_empty() {
3269            // Add local interface candidates
3270            // This would be populated by the candidate discovery manager
3271            debug!("Local candidates will be populated by discovery manager");
3272        }
3273
3274        Ok(())
3275    }
3276
3277    /// Queue an ADD_ADDRESS frame for transmission
3278    #[allow(dead_code)] // Reserved for ADD_ADDRESS frame queueing
3279    pub(super) fn queue_add_address_frame(
3280        &mut self,
3281        sequence: VarInt,
3282        address: SocketAddr,
3283        priority: u32,
3284    ) -> Result<(), NatTraversalError> {
3285        debug!(
3286            "Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3287            sequence, address, priority
3288        );
3289
3290        // Add to local candidates if not already present
3291        let candidate = AddressCandidate {
3292            address,
3293            priority,
3294            source: CandidateSource::Local,
3295            discovered_at: Instant::now(),
3296            state: CandidateState::New,
3297            attempt_count: 0,
3298            last_attempt: None,
3299        };
3300
3301        // Check if candidate already exists
3302        if !self.local_candidates.values().any(|c| c.address == address) {
3303            self.local_candidates.insert(sequence, candidate);
3304        }
3305
3306        Ok(())
3307    }
3308}
3309
/// Errors that can occur during NAT traversal
///
/// The `Display` implementation renders each variant as a short lowercase
/// phrase; the type also implements `std::error::Error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)] // NoActiveCoordination variant reserved for bootstrap coordination errors
pub(crate) enum NatTraversalError {
    /// Too many candidates received
    TooManyCandidates,
    /// Duplicate address for different sequence
    DuplicateAddress,
    /// Unknown candidate sequence
    UnknownCandidate,
    /// Candidate in wrong state for operation
    InvalidCandidateState,
    /// No active validation for address
    NoActiveValidation,
    /// Challenge value mismatch
    ChallengeMismatch,
    /// Coordination round not active
    NoActiveCoordination,
    /// Security validation failed
    SecurityValidationFailed,
    /// Rate limit exceeded
    RateLimitExceeded,
    /// Invalid address format
    InvalidAddress,
    /// Suspicious coordination request
    SuspiciousCoordination,
    /// Resource limit exceeded
    ResourceLimitExceeded,
}

impl std::fmt::Display for NatTraversalError {
    /// Write the fixed human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::TooManyCandidates => "too many candidates",
            Self::DuplicateAddress => "duplicate address",
            Self::UnknownCandidate => "unknown candidate",
            Self::InvalidCandidateState => "invalid candidate state",
            Self::NoActiveValidation => "no active validation",
            Self::ChallengeMismatch => "challenge mismatch",
            Self::NoActiveCoordination => "no active coordination",
            Self::SecurityValidationFailed => "security validation failed",
            Self::RateLimitExceeded => "rate limit exceeded",
            Self::InvalidAddress => "invalid address",
            Self::SuspiciousCoordination => "suspicious coordination request",
            Self::ResourceLimitExceeded => "resource limit exceeded",
        };
        f.write_str(message)
    }
}

impl std::error::Error for NatTraversalError {}
3360
/// Security statistics for monitoring and debugging
///
/// A plain data snapshot: rejection counters plus current sizes and rates.
#[derive(Debug, Clone)]
#[allow(dead_code)] // Security statistics reserved for monitoring and reporting
pub(crate) struct SecurityStats {
    /// Total number of security rejections
    pub total_security_rejections: u32,
    /// Number of rate limiting violations
    pub rate_limit_violations: u32,
    /// Number of rejections caused by invalid addresses
    pub invalid_address_rejections: u32,
    /// Number of suspicious coordination attempts
    pub suspicious_coordination_attempts: u32,
    /// Number of active validations
    pub active_validations: usize,
    /// Number of cached address validations
    pub cached_address_validations: usize,
    /// Current candidate addition rate
    pub current_candidate_rate: usize,
    /// Current coordination request rate
    pub current_coordination_rate: usize,
}
3382
3383/// Bootstrap coordinator state machine for NAT traversal coordination
3384///
3385/// This manages the bootstrap node's role in observing client addresses,
3386/// coordinating hole punching, and relaying coordination messages.
3387#[derive(Debug)]
3388pub(crate) struct BootstrapCoordinator {
3389    /// Active peer registry with observed addresses
3390    peer_registry: HashMap<PeerId, PeerObservationRecord>,
3391    /// Active coordination sessions between peers
3392    coordination_sessions: HashMap<CoordinationSessionId, CoordinationSession>,
3393    /// Pending coordination requests awaiting peer participation
3394    #[allow(dead_code)] // Used for coordination queue management
3395    pending_coordination: VecDeque<PendingCoordinationRequest>,
3396    /// Address observation cache for quick lookups
3397    #[allow(dead_code)] // Will be used for address correlation and verification
3398    address_observations: HashMap<SocketAddr, AddressObservation>,
3399    /// Security validator for coordination requests
3400    security_validator: SecurityValidationState,
3401    /// Statistics for bootstrap operations
3402    stats: BootstrapStats,
3403    /// Configuration for bootstrap behavior (stub)
3404    _config: BootstrapConfig,
3405    /// Last cleanup time (stub)
3406    _last_cleanup: Option<Instant>,
3407}
3408
/// Unique identifier for coordination sessions (a plain `u64`)
type CoordinationSessionId = u64;
3411
/// Peer identifier for bootstrap coordination (32 raw bytes)
type PeerId = [u8; 32];
3414
3415/// Record of observed peer information
3416#[derive(Debug, Clone)]
3417pub(crate) struct PeerObservationRecord {
3418    /// The peer's unique identifier
3419    #[allow(dead_code)] // Used for peer tracking
3420    peer_id: PeerId,
3421    /// Last observed public address
3422    observed_address: SocketAddr,
3423    /// When this observation was made
3424    #[allow(dead_code)] // Used for observation aging
3425    observed_at: Instant,
3426    /// Connection context for this observation
3427    #[allow(dead_code)] // Used for coordination context
3428    connection_context: ConnectionContext,
3429    /// Whether this peer can participate in coordination
3430    #[allow(dead_code)] // Used for coordination eligibility
3431    can_coordinate: bool,
3432    /// Number of successful coordinations
3433    #[allow(dead_code)] // Used for coordination statistics
3434    coordination_count: u32,
3435    /// Average coordination success rate
3436    #[allow(dead_code)] // Used for peer quality assessment
3437    success_rate: f64,
3438}
3439
3440/// Connection context for address observations
3441#[derive(Debug, Clone)]
3442pub(crate) struct ConnectionContext {
3443    /// Connection ID for this observation
3444    #[allow(dead_code)] // Used for connection correlation
3445    connection_id: ConnectionId,
3446    /// Original destination address (what peer thought it was connecting to)
3447    #[allow(dead_code)] // Used for NAT analysis
3448    original_destination: SocketAddr,
3449    /// NAT traversal role of the connecting peer
3450    #[allow(dead_code)] // Used for role-based decisions
3451    peer_role: NatTraversalRole,
3452    /// Transport parameters received from peer
3453    #[allow(dead_code)] // Used for capability negotiation
3454    transport_params: Option<NatTraversalTransportParams>,
3455}
3456
/// Transport parameters for NAT traversal
///
/// Capabilities a peer announced for NAT traversal.
#[derive(Debug, Clone)]
struct NatTraversalTransportParams {
    /// Maximum number of candidates this peer can handle
    #[allow(dead_code)] // Used for capability negotiation
    max_candidates: u32,
    /// Coordination timeout for this peer
    #[allow(dead_code)] // Used for timing adjustments
    coordination_timeout: Duration,
    /// Whether this peer supports advanced features
    #[allow(dead_code)] // Used for feature detection
    supports_advanced_features: bool,
}
3470
3471/// Address observation with validation
3472#[derive(Debug, Clone)]
3473struct AddressObservation {
3474    /// The observed address
3475    #[allow(dead_code)] // Used for address tracking
3476    address: SocketAddr,
3477    /// When this address was first observed
3478    #[allow(dead_code)] // Used for observation aging
3479    first_observed: Instant,
3480    /// How many times this address has been observed
3481    #[allow(dead_code)] // Will be used for address reliability scoring
3482    observation_count: u32,
3483    /// Validation state for this address
3484    #[allow(dead_code)] // Used for address validation tracking
3485    validation_state: AddressValidationResult,
3486    /// Associated peer IDs for this address
3487    #[allow(dead_code)] // Will be used for peer-address correlation
3488    associated_peers: Vec<PeerId>,
3489}
3490
3491/// Active coordination session between two peers
3492#[derive(Debug, Clone)]
3493#[allow(dead_code)] // Fields tracked for coordination state management
3494pub(crate) struct CoordinationSession {
3495    /// Unique session identifier
3496    session_id: CoordinationSessionId,
3497    /// First peer in coordination
3498    peer_a: PeerId,
3499    /// Second peer in coordination
3500    peer_b: PeerId,
3501    /// Current coordination round
3502    current_round: VarInt,
3503    /// When this session started
3504    started_at: Instant,
3505    /// Current phase of coordination
3506    phase: CoordinationPhase,
3507    /// Target addresses for hole punching
3508    target_addresses: Vec<(SocketAddr, VarInt)>, // (address, sequence)
3509    /// Synchronization state
3510    sync_state: SynchronizationState,
3511    /// Session statistics
3512    stats: CoordinationSessionStats,
3513}
3514
/// Synchronization state for coordinated hole punching
///
/// One readiness flag per session participant.
#[derive(Debug, Clone)]
struct SynchronizationState {
    /// Whether peer A has confirmed
    peer_a_ready: bool,
    /// Whether peer B has confirmed
    peer_b_ready: bool,
}
3523
/// Statistics for a coordination session
///
/// `Default` starts the counter at zero.
#[derive(Debug, Clone, Default)]
struct CoordinationSessionStats {
    /// Number of successful coordinations
    successful_coordinations: u32,
}
3530
/// Pending coordination request awaiting peer participation (stub implementation)
///
/// Carries no data yet; the unit field is only a placeholder.
#[derive(Debug, Clone)]
struct PendingCoordinationRequest {
    _unused: (),
}
3536
/// Configuration for bootstrap coordinator behavior (stub implementation)
///
/// Carries no settings yet; the unit field is only a placeholder.
#[derive(Debug, Clone)]
pub(crate) struct BootstrapConfig {
    _unused: (),
}
3542
/// Statistics for bootstrap operations
///
/// `Default` starts every counter at zero.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapStats {
    /// Total number of address observations made
    #[allow(dead_code)] // Will be used for bootstrap performance metrics
    total_observations: u64,
    /// Total number of coordination sessions facilitated
    total_coordinations: u64,
    /// Number of successful coordinations
    successful_coordinations: u64,
    /// Number of active peers
    #[allow(dead_code)] // Will be used for bootstrap load monitoring
    active_peers: usize,
    /// Number of active coordination sessions
    active_sessions: usize,
    /// Number of security rejections
    security_rejections: u64,
}
3561
3562/// Events generated by the coordination session state machine
3563#[derive(Debug, Clone)]
3564#[allow(dead_code)] // Will be used for coordination event handling and logging
3565pub(crate) enum CoordinationSessionEvent {
3566    /// Session phase changed
3567    PhaseChanged {
3568        session_id: CoordinationSessionId,
3569        old_phase: CoordinationPhase,
3570        new_phase: CoordinationPhase,
3571    },
3572    /// Session failed with reason
3573    SessionFailed {
3574        session_id: CoordinationSessionId,
3575        peer_a: PeerId,
3576        peer_b: PeerId,
3577        reason: String,
3578    },
3579    /// Start hole punching for session
3580    StartHolePunching {
3581        session_id: CoordinationSessionId,
3582        peer_a: PeerId,
3583        peer_b: PeerId,
3584        target_addresses: Vec<(SocketAddr, VarInt)>,
3585    },
3586    /// Session ready for cleanup
3587    ReadyForCleanup { session_id: CoordinationSessionId },
3588}
3589
/// Events that trigger session state advancement
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)] // Will be used in coordination state machine implementation
enum SessionAdvancementEvent {
    /// Both peers are ready for coordination
    BothPeersReady,
    /// The coordination phase completed
    CoordinationComplete,
    /// The preparation phase completed
    PreparationComplete,
    /// The hole punching phase completed
    PunchingComplete,
    /// Validation timed out
    ValidationTimeout,
    /// The session timed out
    Timeout,
    /// The session is ready for cleanup
    ReadyForCleanup,
}
3609
/// Recovery actions for coordination errors
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)] // Will be used for coordination error recovery logic
pub(crate) enum CoordinationRecoveryAction {
    /// Nothing needs to be done
    NoAction,
    /// Retry with exponential backoff
    RetryWithBackoff,
    /// Mark the session as failed
    MarkAsFailed,
    /// Clean up the session
    Cleanup,
}
3623
3624impl BootstrapCoordinator {
3625    /// Create a new bootstrap coordinator
3626    pub(crate) fn new(config: BootstrapConfig) -> Self {
3627        Self {
3628            peer_registry: HashMap::new(),
3629            coordination_sessions: HashMap::new(),
3630            pending_coordination: VecDeque::new(),
3631            address_observations: HashMap::new(),
3632            security_validator: SecurityValidationState::new(),
3633            stats: BootstrapStats::default(),
3634            _config: config,
3635            _last_cleanup: None,
3636        }
3637    }
3638
3639    /// Observe a peer's address from an incoming connection
3640    ///
3641    /// This is called when a peer connects to this bootstrap node,
3642    /// allowing us to observe their public address.
3643    #[allow(dead_code)] // Reserved for peer address observation
3644    pub(crate) fn observe_peer_address(
3645        &mut self,
3646        peer_id: PeerId,
3647        observed_address: SocketAddr,
3648        connection_context: ConnectionContext,
3649        now: Instant,
3650    ) -> Result<(), NatTraversalError> {
3651        // Security validation
3652        match self
3653            .security_validator
3654            .validate_address(observed_address, now)
3655        {
3656            AddressValidationResult::Valid => {}
3657            AddressValidationResult::Invalid => {
3658                self.stats.security_rejections += 1;
3659                return Err(NatTraversalError::InvalidAddress);
3660            }
3661            AddressValidationResult::Suspicious => {
3662                self.stats.security_rejections += 1;
3663                return Err(NatTraversalError::SecurityValidationFailed);
3664            }
3665        }
3666
3667        // Rate limiting check
3668        if self.security_validator.is_candidate_rate_limited(now) {
3669            self.stats.security_rejections += 1;
3670            return Err(NatTraversalError::RateLimitExceeded);
3671        }
3672
3673        // Update address observation
3674        let observation = self
3675            .address_observations
3676            .entry(observed_address)
3677            .or_insert_with(|| AddressObservation {
3678                address: observed_address,
3679                first_observed: now,
3680                observation_count: 0,
3681                validation_state: AddressValidationResult::Valid,
3682                associated_peers: Vec::new(),
3683            });
3684
3685        observation.observation_count += 1;
3686        if !observation.associated_peers.contains(&peer_id) {
3687            observation.associated_peers.push(peer_id);
3688        }
3689
3690        // Update or create peer record
3691        let peer_record = PeerObservationRecord {
3692            peer_id,
3693            observed_address,
3694            observed_at: now,
3695            connection_context,
3696            can_coordinate: true, // Assume true initially
3697            coordination_count: 0,
3698            success_rate: 1.0,
3699        };
3700
3701        self.peer_registry.insert(peer_id, peer_record);
3702        self.stats.total_observations += 1;
3703        self.stats.active_peers = self.peer_registry.len();
3704
3705        debug!(
3706            "Observed peer {:?} at address {} (total observations: {})",
3707            peer_id, observed_address, self.stats.total_observations
3708        );
3709
3710        Ok(())
3711    }
3712
3713    /// Generate ADD_ADDRESS frame for a peer based on observation
3714    ///
3715    /// This creates an ADD_ADDRESS frame to inform a peer of their
3716    /// observed public address.
3717    #[allow(dead_code)] // Reserved for ADD_ADDRESS frame generation
3718    pub(crate) fn generate_add_address_frame(
3719        &self,
3720        peer_id: PeerId,
3721        sequence: VarInt,
3722        priority: VarInt,
3723    ) -> Option<crate::frame::AddAddress> {
3724        if let Some(peer_record) = self.peer_registry.get(&peer_id) {
3725            Some(crate::frame::AddAddress {
3726                sequence,
3727                address: peer_record.observed_address,
3728                priority,
3729            })
3730        } else {
3731            None
3732        }
3733    }
3734
3735    /// Process a PUNCH_ME_NOW frame from a peer
3736    ///
3737    /// This handles coordination requests from peers wanting to establish
3738    /// direct connections through NAT traversal.
3739    pub(crate) fn process_punch_me_now_frame(
3740        &mut self,
3741        from_peer: PeerId,
3742        source_addr: SocketAddr,
3743        frame: &crate::frame::PunchMeNow,
3744        now: Instant,
3745    ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3746        // Enhanced security validation with adaptive rate limiting
3747        if self
3748            .security_validator
3749            .is_adaptive_rate_limited(from_peer, now)
3750        {
3751            self.stats.security_rejections += 1;
3752            debug!(
3753                "PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3754                hex::encode(&from_peer[..8])
3755            );
3756            return Err(NatTraversalError::RateLimitExceeded);
3757        }
3758
3759        // Enhanced address validation with amplification protection
3760        self.security_validator
3761            .enhanced_address_validation(frame.local_address, source_addr, now)
3762            .map_err(|e| {
3763                self.stats.security_rejections += 1;
3764                debug!(
3765                    "PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3766                    hex::encode(&from_peer[..8]),
3767                    e
3768                );
3769                e
3770            })?;
3771
3772        // Comprehensive security validation
3773        self.security_validator
3774            .validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3775            .map_err(|e| {
3776                self.stats.security_rejections += 1;
3777                debug!(
3778                    "PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3779                    hex::encode(&from_peer[..8]),
3780                    e
3781                );
3782                e
3783            })?;
3784
3785        // Check if we have a target peer for this coordination
3786        if let Some(target_peer_id) = frame.target_peer_id {
3787            // This is a coordination request that should be relayed
3788            if let Some(target_peer) = self.peer_registry.get(&target_peer_id) {
3789                // Create coordination session if it doesn't exist
3790                let session_id = self.generate_session_id();
3791
3792                if !self.coordination_sessions.contains_key(&session_id) {
3793                    // Calculate optimal coordination timing based on network conditions
3794                    let _network_rtt = self
3795                        .estimate_peer_rtt(&from_peer)
3796                        .unwrap_or(Duration::from_millis(100));
3797
3798                    let session = CoordinationSession {
3799                        session_id,
3800                        peer_a: from_peer,
3801                        peer_b: target_peer_id,
3802                        current_round: frame.round,
3803                        started_at: now,
3804                        phase: CoordinationPhase::Requesting,
3805                        target_addresses: vec![(frame.local_address, frame.target_sequence)],
3806                        sync_state: SynchronizationState {
3807                            peer_a_ready: true, // Requesting peer is ready
3808                            peer_b_ready: false,
3809                        },
3810                        stats: CoordinationSessionStats::default(),
3811                    };
3812
3813                    self.coordination_sessions.insert(session_id, session);
3814                    self.stats.total_coordinations += 1;
3815                    self.stats.active_sessions = self.coordination_sessions.len();
3816                }
3817
3818                // Generate coordination frame to send to target peer
3819                let coordination_frame = crate::frame::PunchMeNow {
3820                    round: frame.round,
3821                    target_sequence: frame.target_sequence,
3822                    local_address: target_peer.observed_address,
3823                    target_peer_id: Some(from_peer),
3824                };
3825
3826                info!(
3827                    "Coordinating hole punch between {:?} and {:?} (round: {})",
3828                    from_peer, target_peer_id, frame.round
3829                );
3830
3831                Ok(Some(coordination_frame))
3832            } else {
3833                // Target peer not found
3834                warn!(
3835                    "Target peer {:?} not found for coordination from {:?}",
3836                    target_peer_id, from_peer
3837                );
3838                Ok(None)
3839            }
3840        } else {
3841            // This is a response to coordination - update session state
3842            let session_id = if let Some(session) =
3843                self.find_coordination_session_by_peer(from_peer, frame.round)
3844            {
3845                session.sync_state.peer_b_ready = true;
3846
3847                // If both peers are ready, coordination is complete
3848                if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3849                    session.phase = CoordinationPhase::Punching;
3850                    session.stats.successful_coordinations += 1;
3851                    Some(session.session_id)
3852                } else {
3853                    None
3854                }
3855            } else {
3856                None
3857            };
3858
3859            // Update stats after releasing the mutable borrow
3860            if let Some(session_id) = session_id {
3861                self.stats.successful_coordinations += 1;
3862                info!(
3863                    "Coordination complete for session {} (round: {})",
3864                    session_id, frame.round
3865                );
3866            }
3867
3868            Ok(None)
3869        }
3870    }
3871
3872    /// Find coordination session by peer and round
3873    fn find_coordination_session_by_peer(
3874        &mut self,
3875        peer_id: PeerId,
3876        round: VarInt,
3877    ) -> Option<&mut CoordinationSession> {
3878        self.coordination_sessions.values_mut().find(|session| {
3879            (session.peer_a == peer_id || session.peer_b == peer_id)
3880                && session.current_round == round
3881        })
3882    }
3883
3884    /// Generate unique session ID
3885    fn generate_session_id(&self) -> CoordinationSessionId {
3886        rand::random()
3887    }
3888
3889    /// Generate secure coordination round using cryptographically secure random values
3890    #[allow(dead_code)] // Reserved for secure round generation
3891    pub(crate) fn generate_secure_coordination_round(&self) -> VarInt {
3892        self.security_validator.generate_secure_coordination_round()
3893    }
3894
3895    /// Perform comprehensive security validation for coordination requests
3896    #[allow(dead_code)] // Reserved for security validation
3897    pub(crate) fn validate_coordination_security(
3898        &mut self,
3899        peer_id: PeerId,
3900        source_addr: SocketAddr,
3901        target_addr: SocketAddr,
3902        now: Instant,
3903    ) -> Result<(), NatTraversalError> {
3904        // Check adaptive rate limiting
3905        if self
3906            .security_validator
3907            .is_adaptive_rate_limited(peer_id, now)
3908        {
3909            self.stats.security_rejections += 1;
3910            return Err(NatTraversalError::RateLimitExceeded);
3911        }
3912
3913        // Perform enhanced address validation
3914        self.security_validator
3915            .enhanced_address_validation(target_addr, source_addr, now)?;
3916
3917        // Check amplification limits
3918        self.security_validator
3919            .validate_amplification_limits(source_addr, target_addr, now)?;
3920
3921        Ok(())
3922    }
3923
3924    /// Clean up expired sessions and perform maintenance
3925    #[allow(dead_code)] // Reserved for session cleanup
3926    pub(crate) fn cleanup_expired_sessions(&mut self, now: Instant) {
3927        let session_timeout = Duration::from_secs(300); // 5 minutes
3928
3929        // Collect expired session IDs
3930        let expired_sessions: Vec<CoordinationSessionId> = self
3931            .coordination_sessions
3932            .iter()
3933            .filter(|(_, session)| now.duration_since(session.started_at) > session_timeout)
3934            .map(|(&session_id, _)| session_id)
3935            .collect();
3936
3937        // Remove expired sessions
3938        for session_id in expired_sessions {
3939            if let Some(session) = self.coordination_sessions.remove(&session_id) {
3940                debug!(
3941                    "Cleaned up expired coordination session {} between {:?} and {:?}",
3942                    session_id,
3943                    hex::encode(&session.peer_a[..8]),
3944                    hex::encode(&session.peer_b[..8])
3945                );
3946            }
3947        }
3948
3949        // Update active session count
3950        self.stats.active_sessions = self.coordination_sessions.len();
3951
3952        // Clean up old peer observations
3953        let observation_timeout = Duration::from_secs(3600); // 1 hour
3954        self.peer_registry
3955            .retain(|_, record| now.duration_since(record.observed_at) <= observation_timeout);
3956
3957        // Update active peer count
3958        self.stats.active_peers = self.peer_registry.len();
3959
3960        // Clean up address observations
3961        self.address_observations.retain(|_, observation| {
3962            now.duration_since(observation.first_observed) <= observation_timeout
3963        });
3964    }
3965
    /// Get bootstrap statistics
    ///
    /// Returns a shared reference to the coordinator's `BootstrapStats`; nothing
    /// is copied or recomputed by this call.
    #[allow(dead_code)] // Reserved for statistics retrieval
    pub(crate) fn get_stats(&self) -> &BootstrapStats {
        &self.stats
    }
3971
3972    /// Update peer coordination statistics
3973    #[allow(dead_code)] // Reserved for stats updates
3974    pub(crate) fn update_peer_coordination_stats(&mut self, peer_id: PeerId, success: bool) {
3975        if let Some(peer_record) = self.peer_registry.get_mut(&peer_id) {
3976            peer_record.coordination_count += 1;
3977
3978            if success {
3979                // Update success rate using exponential moving average
3980                let alpha = 0.1; // Learning rate
3981                peer_record.success_rate = peer_record.success_rate * (1.0 - alpha) + alpha;
3982            } else {
3983                // Decrease success rate
3984                let alpha = 0.1;
3985                peer_record.success_rate = peer_record.success_rate * (1.0 - alpha);
3986            }
3987
3988            // Disable coordination for peers with very low success rates
3989            if peer_record.success_rate < 0.1 && peer_record.coordination_count > 10 {
3990                peer_record.can_coordinate = false;
3991                warn!(
3992                    "Disabled coordination for peer {:?} due to low success rate: {:.2}",
3993                    hex::encode(&peer_id[..8]),
3994                    peer_record.success_rate
3995                );
3996            }
3997        }
3998    }
3999
4000    /// Poll session state machine and advance coordination sessions
4001    ///
4002    /// This method implements the core session state machine polling logic
4003    /// with timeout handling, retry mechanisms, and error recovery.
4004    #[allow(dead_code)] // Reserved for state machine polling
4005    pub(crate) fn poll_session_state_machine(
4006        &mut self,
4007        now: Instant,
4008    ) -> Vec<CoordinationSessionEvent> {
4009        let mut events = Vec::new();
4010        let mut sessions_to_update = Vec::new();
4011
4012        // Collect sessions that need state machine advancement
4013        for (&session_id, session) in &self.coordination_sessions {
4014            if let Some(event) = self.should_advance_session(session, now) {
4015                sessions_to_update.push((session_id, event));
4016            }
4017        }
4018
4019        // Process session updates
4020        for (session_id, event) in sessions_to_update {
4021            let session_events =
4022                if let Some(session) = self.coordination_sessions.get_mut(&session_id) {
4023                    let peer_a = session.peer_a;
4024                    let peer_b = session.peer_b;
4025
4026                    match Self::advance_session_state_static(session, event, now) {
4027                        Ok(session_events) => session_events,
4028                        Err(e) => {
4029                            warn!("Failed to advance session {} state: {:?}", session_id, e);
4030                            // Mark session as failed
4031                            session.phase = CoordinationPhase::Failed;
4032                            vec![CoordinationSessionEvent::SessionFailed {
4033                                session_id,
4034                                peer_a,
4035                                peer_b,
4036                                reason: format!("State advancement error: {:?}", e),
4037                            }]
4038                        }
4039                    }
4040                } else {
4041                    Vec::new()
4042                };
4043
4044            events.extend(session_events);
4045        }
4046
4047        // Clean up completed or failed sessions
4048        self.cleanup_completed_sessions(now);
4049
4050        events
4051    }
4052
4053    /// Check if a session should advance its state
4054    #[allow(dead_code)] // Reserved for session advancement logic
4055    fn should_advance_session(
4056        &self,
4057        session: &CoordinationSession,
4058        now: Instant,
4059    ) -> Option<SessionAdvancementEvent> {
4060        let session_age = now.duration_since(session.started_at);
4061
4062        match session.phase {
4063            CoordinationPhase::Requesting => {
4064                // Check if we've been waiting too long for peer responses
4065                if session_age > Duration::from_secs(10) {
4066                    Some(SessionAdvancementEvent::Timeout)
4067                } else if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4068                    Some(SessionAdvancementEvent::BothPeersReady)
4069                } else {
4070                    None
4071                }
4072            }
4073            CoordinationPhase::Coordinating => {
4074                // Move to preparing phase after brief coordination period
4075                if session_age > Duration::from_millis(500) {
4076                    Some(SessionAdvancementEvent::CoordinationComplete)
4077                } else {
4078                    None
4079                }
4080            }
4081            CoordinationPhase::Preparing => {
4082                // Move to punching phase after preparation period
4083                if session_age > Duration::from_secs(1) {
4084                    Some(SessionAdvancementEvent::PreparationComplete)
4085                } else {
4086                    None
4087                }
4088            }
4089            CoordinationPhase::Punching => {
4090                // Move to validation phase after punching period
4091                if session_age > Duration::from_secs(2) {
4092                    Some(SessionAdvancementEvent::PunchingComplete)
4093                } else {
4094                    None
4095                }
4096            }
4097            CoordinationPhase::Validating => {
4098                // Check for validation timeout
4099                if session_age > Duration::from_secs(10) {
4100                    Some(SessionAdvancementEvent::ValidationTimeout)
4101                } else {
4102                    None
4103                }
4104            }
4105            CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
4106                // Terminal states - check for cleanup
4107                if session_age > Duration::from_secs(60) {
4108                    Some(SessionAdvancementEvent::ReadyForCleanup)
4109                } else {
4110                    None
4111                }
4112            }
4113            CoordinationPhase::Idle => {
4114                // Should not happen in active sessions
4115                Some(SessionAdvancementEvent::Timeout)
4116            }
4117        }
4118    }
4119
4120    /// Advance session state based on event (static version to avoid borrowing issues)
4121    #[allow(dead_code)] // Reserved for static state advancement
4122    fn advance_session_state_static(
4123        session: &mut CoordinationSession,
4124        event: SessionAdvancementEvent,
4125        _now: Instant,
4126    ) -> Result<Vec<CoordinationSessionEvent>, NatTraversalError> {
4127        let mut events = Vec::new();
4128        let previous_phase = session.phase;
4129
4130        match (session.phase, event) {
4131            (CoordinationPhase::Requesting, SessionAdvancementEvent::BothPeersReady) => {
4132                session.phase = CoordinationPhase::Coordinating;
4133                debug!(
4134                    "Session {} advanced from Requesting to Coordinating",
4135                    session.session_id
4136                );
4137                events.push(CoordinationSessionEvent::PhaseChanged {
4138                    session_id: session.session_id,
4139                    old_phase: previous_phase,
4140                    new_phase: session.phase,
4141                });
4142            }
4143            (CoordinationPhase::Requesting, SessionAdvancementEvent::Timeout) => {
4144                session.phase = CoordinationPhase::Failed;
4145                warn!(
4146                    "Session {} timed out in Requesting phase",
4147                    session.session_id
4148                );
4149                events.push(CoordinationSessionEvent::SessionFailed {
4150                    session_id: session.session_id,
4151                    peer_a: session.peer_a,
4152                    peer_b: session.peer_b,
4153                    reason: "Timeout waiting for peer responses".to_string(),
4154                });
4155            }
4156            (CoordinationPhase::Coordinating, SessionAdvancementEvent::CoordinationComplete) => {
4157                session.phase = CoordinationPhase::Preparing;
4158                debug!(
4159                    "Session {} advanced from Coordinating to Preparing",
4160                    session.session_id
4161                );
4162                events.push(CoordinationSessionEvent::PhaseChanged {
4163                    session_id: session.session_id,
4164                    old_phase: previous_phase,
4165                    new_phase: session.phase,
4166                });
4167            }
4168            (CoordinationPhase::Preparing, SessionAdvancementEvent::PreparationComplete) => {
4169                session.phase = CoordinationPhase::Punching;
4170                debug!(
4171                    "Session {} advanced from Preparing to Punching",
4172                    session.session_id
4173                );
4174                events.push(CoordinationSessionEvent::PhaseChanged {
4175                    session_id: session.session_id,
4176                    old_phase: previous_phase,
4177                    new_phase: session.phase,
4178                });
4179                events.push(CoordinationSessionEvent::StartHolePunching {
4180                    session_id: session.session_id,
4181                    peer_a: session.peer_a,
4182                    peer_b: session.peer_b,
4183                    target_addresses: session.target_addresses.clone(),
4184                });
4185            }
4186            (CoordinationPhase::Punching, SessionAdvancementEvent::PunchingComplete) => {
4187                session.phase = CoordinationPhase::Validating;
4188                debug!(
4189                    "Session {} advanced from Punching to Validating",
4190                    session.session_id
4191                );
4192                events.push(CoordinationSessionEvent::PhaseChanged {
4193                    session_id: session.session_id,
4194                    old_phase: previous_phase,
4195                    new_phase: session.phase,
4196                });
4197            }
4198            (CoordinationPhase::Validating, SessionAdvancementEvent::ValidationTimeout) => {
4199                session.phase = CoordinationPhase::Failed;
4200                warn!("Session {} validation timed out", session.session_id);
4201                events.push(CoordinationSessionEvent::SessionFailed {
4202                    session_id: session.session_id,
4203                    peer_a: session.peer_a,
4204                    peer_b: session.peer_b,
4205                    reason: "Validation timeout".to_string(),
4206                });
4207            }
4208            (phase, SessionAdvancementEvent::ReadyForCleanup) => {
4209                debug!(
4210                    "Session {} ready for cleanup in phase {:?}",
4211                    session.session_id, phase
4212                );
4213                events.push(CoordinationSessionEvent::ReadyForCleanup {
4214                    session_id: session.session_id,
4215                });
4216            }
4217            _ => {
4218                // Invalid state transition - log warning but don't fail
4219                warn!(
4220                    "Invalid state transition for session {}: {:?} -> {:?}",
4221                    session.session_id, session.phase, event
4222                );
4223            }
4224        }
4225
4226        Ok(events)
4227    }
4228
4229    /// Clean up completed or failed sessions
4230    #[allow(dead_code)] // Reserved for completed session cleanup
4231    fn cleanup_completed_sessions(&mut self, now: Instant) {
4232        let cleanup_timeout = Duration::from_secs(300); // 5 minutes
4233
4234        let sessions_to_remove: Vec<CoordinationSessionId> = self
4235            .coordination_sessions
4236            .iter()
4237            .filter(|(_, session)| {
4238                matches!(
4239                    session.phase,
4240                    CoordinationPhase::Succeeded | CoordinationPhase::Failed
4241                ) && now.duration_since(session.started_at) > cleanup_timeout
4242            })
4243            .map(|(&session_id, _)| session_id)
4244            .collect();
4245
4246        for session_id in sessions_to_remove {
4247            if let Some(session) = self.coordination_sessions.remove(&session_id) {
4248                debug!(
4249                    "Cleaned up completed session {} in phase {:?}",
4250                    session_id, session.phase
4251                );
4252            }
4253        }
4254
4255        self.stats.active_sessions = self.coordination_sessions.len();
4256    }
4257
4258    /// Implement retry mechanism with exponential backoff
4259    ///
4260    /// This method handles retry logic for failed coordination attempts
4261    /// with exponential backoff to avoid overwhelming the network.
4262    #[allow(dead_code)] // Reserved for retry handling
4263    pub(crate) fn retry_failed_coordination(
4264        &mut self,
4265        session_id: CoordinationSessionId,
4266        now: Instant,
4267    ) -> Result<bool, NatTraversalError> {
4268        let session = self
4269            .coordination_sessions
4270            .get_mut(&session_id)
4271            .ok_or(NatTraversalError::NoActiveCoordination)?;
4272
4273        // Check if session is in a retryable state
4274        if !matches!(session.phase, CoordinationPhase::Failed) {
4275            return Ok(false);
4276        }
4277
4278        // Calculate retry delay with exponential backoff
4279        let base_delay = Duration::from_secs(1);
4280        let max_delay = Duration::from_secs(60);
4281        let retry_count = session.stats.successful_coordinations; // Reuse this field for retry count
4282
4283        let delay = std::cmp::min(
4284            base_delay * 2_u32.pow(retry_count.min(10)), // Cap at 2^10 to prevent overflow
4285            max_delay,
4286        );
4287
4288        // Add jitter to prevent thundering herd
4289        let _jitter_factor = 0.1;
4290        let jitter =
4291            Duration::from_millis((rand::random::<u64>() % 100) * delay.as_millis() as u64 / 1000);
4292        let total_delay = delay + jitter;
4293
4294        // Check if enough time has passed for retry
4295        if now.duration_since(session.started_at) < total_delay {
4296            return Ok(false);
4297        }
4298
4299        // Check retry limits
4300        const MAX_RETRIES: u32 = 5;
4301        if retry_count >= MAX_RETRIES {
4302            warn!(
4303                "Session {} exceeded maximum retry attempts ({})",
4304                session_id, MAX_RETRIES
4305            );
4306            return Ok(false);
4307        }
4308
4309        // Reset session for retry
4310        session.phase = CoordinationPhase::Requesting;
4311        session.started_at = now;
4312        session.sync_state.peer_a_ready = false;
4313        session.sync_state.peer_b_ready = false;
4314        session.stats.successful_coordinations += 1; // Increment retry count
4315
4316        info!(
4317            "Retrying coordination session {} (attempt {})",
4318            session_id,
4319            retry_count + 1
4320        );
4321        Ok(true)
4322    }
4323
4324    /// Handle coordination errors with appropriate recovery strategies
4325    #[allow(dead_code)] // Reserved for error handling
4326    pub(crate) fn handle_coordination_error(
4327        &mut self,
4328        session_id: CoordinationSessionId,
4329        error: NatTraversalError,
4330        _now: Instant,
4331    ) -> CoordinationRecoveryAction {
4332        let session = match self.coordination_sessions.get_mut(&session_id) {
4333            Some(session) => session,
4334            None => return CoordinationRecoveryAction::NoAction,
4335        };
4336
4337        match error {
4338            NatTraversalError::RateLimitExceeded => {
4339                // Temporary error - retry with backoff
4340                warn!("Rate limit exceeded for session {}, will retry", session_id);
4341                CoordinationRecoveryAction::RetryWithBackoff
4342            }
4343            NatTraversalError::SecurityValidationFailed
4344            | NatTraversalError::SuspiciousCoordination => {
4345                // Security error - mark session as failed and don't retry
4346                session.phase = CoordinationPhase::Failed;
4347                warn!(
4348                    "Security validation failed for session {}, marking as failed",
4349                    session_id
4350                );
4351                CoordinationRecoveryAction::MarkAsFailed
4352            }
4353            NatTraversalError::InvalidAddress => {
4354                // Address error - might be temporary, allow limited retries
4355                warn!("Invalid address in session {}, allowing retry", session_id);
4356                CoordinationRecoveryAction::RetryWithBackoff
4357            }
4358            NatTraversalError::NoActiveCoordination => {
4359                // Session state error - clean up
4360                warn!(
4361                    "No active coordination for session {}, cleaning up",
4362                    session_id
4363                );
4364                CoordinationRecoveryAction::Cleanup
4365            }
4366            _ => {
4367                // Other errors - generic retry with backoff
4368                warn!(
4369                    "Coordination error for session {}: {:?}, will retry",
4370                    session_id, error
4371                );
4372                CoordinationRecoveryAction::RetryWithBackoff
4373            }
4374        }
4375    }
4376
4377    /// Estimate RTT to a specific peer based on observations
4378    fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
4379        // Simple estimation based on peer record
4380        // In a real implementation, this would use historical RTT data
4381        if let Some(_peer_record) = self.peer_registry.get(peer_id) {
4382            // Return a reasonable default based on peer observation patterns
4383            Some(Duration::from_millis(100))
4384        } else {
4385            None
4386        }
4387    }
4388
4389    /// Coordinate hole punching between two peers
4390    ///
4391    /// This method implements the core coordination logic for establishing
4392    /// direct P2P connections through NAT traversal.
4393    #[allow(dead_code)] // Reserved for hole punching coordination
4394    pub(crate) fn coordinate_hole_punching(
4395        &mut self,
4396        peer_a: PeerId,
4397        peer_b: PeerId,
4398        round: VarInt,
4399        now: Instant,
4400    ) -> Result<CoordinationSessionId, NatTraversalError> {
4401        // Validate that both peers are known and can coordinate
4402        let peer_a_record = self
4403            .peer_registry
4404            .get(&peer_a)
4405            .ok_or(NatTraversalError::UnknownCandidate)?;
4406        let peer_b_record = self
4407            .peer_registry
4408            .get(&peer_b)
4409            .ok_or(NatTraversalError::UnknownCandidate)?;
4410
4411        if !peer_a_record.can_coordinate || !peer_b_record.can_coordinate {
4412            return Err(NatTraversalError::InvalidCandidateState);
4413        }
4414
4415        // Generate unique session ID
4416        let session_id = self.generate_session_id();
4417
4418        // Create coordination session
4419        let session = CoordinationSession {
4420            session_id,
4421            peer_a,
4422            peer_b,
4423            current_round: round,
4424            started_at: now,
4425            phase: CoordinationPhase::Requesting,
4426            target_addresses: vec![
4427                (peer_a_record.observed_address, VarInt::from_u32(0)),
4428                (peer_b_record.observed_address, VarInt::from_u32(1)),
4429            ],
4430            sync_state: SynchronizationState {
4431                peer_a_ready: false,
4432                peer_b_ready: false,
4433            },
4434            stats: CoordinationSessionStats::default(),
4435        };
4436
4437        self.coordination_sessions.insert(session_id, session);
4438        self.stats.total_coordinations += 1;
4439        self.stats.active_sessions = self.coordination_sessions.len();
4440
4441        info!(
4442            "Started coordination session {} between peers {:?} and {:?} (round: {})",
4443            session_id,
4444            hex::encode(&peer_a[..8]),
4445            hex::encode(&peer_b[..8]),
4446            round
4447        );
4448
4449        Ok(session_id)
4450    }
4451
4452    /// Relay coordination frame between peers
4453    ///
4454    /// This method handles the relay of coordination messages between peers
4455    /// to facilitate synchronized hole punching.
4456    #[allow(dead_code)] // Reserved for frame relaying
4457    pub(crate) fn relay_coordination_frame(
4458        &mut self,
4459        session_id: CoordinationSessionId,
4460        from_peer: PeerId,
4461        frame: &crate::frame::PunchMeNow,
4462        _now: Instant,
4463    ) -> Result<Option<(PeerId, crate::frame::PunchMeNow)>, NatTraversalError> {
4464        let session = self
4465            .coordination_sessions
4466            .get_mut(&session_id)
4467            .ok_or(NatTraversalError::NoActiveCoordination)?;
4468
4469        // Validate that the sender is part of this session
4470        if session.peer_a != from_peer && session.peer_b != from_peer {
4471            return Err(NatTraversalError::SuspiciousCoordination);
4472        }
4473
4474        // Determine target peer
4475        let target_peer = if session.peer_a == from_peer {
4476            session.peer_b
4477        } else {
4478            session.peer_a
4479        };
4480
4481        // Get target peer's observed address
4482        let target_record = self
4483            .peer_registry
4484            .get(&target_peer)
4485            .ok_or(NatTraversalError::UnknownCandidate)?;
4486
4487        // Update session state based on frame
4488        if session.peer_a == from_peer {
4489            session.sync_state.peer_a_ready = true;
4490        } else {
4491            session.sync_state.peer_b_ready = true;
4492        }
4493
4494        // Create relay frame with target peer's information
4495        let relay_frame = crate::frame::PunchMeNow {
4496            round: frame.round,
4497            target_sequence: frame.target_sequence,
4498            local_address: target_record.observed_address,
4499            target_peer_id: Some(from_peer),
4500        };
4501
4502        // Check if coordination is complete
4503        if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4504            session.phase = CoordinationPhase::Coordinating;
4505            info!(
4506                "Coordination phase complete for session {} - both peers ready",
4507                session_id
4508            );
4509        }
4510
4511        debug!(
4512            "Relaying coordination frame from {:?} to {:?} in session {}",
4513            hex::encode(&from_peer[..8]),
4514            hex::encode(&target_peer[..8]),
4515            session_id
4516        );
4517
4518        Ok(Some((target_peer, relay_frame)))
4519    }
4520
4521    /// Implement round-based synchronization protocol
4522    ///
4523    /// This method manages the timing and synchronization of hole punching rounds
4524    /// to maximize the chances of successful NAT traversal.
4525    #[allow(dead_code)] // Reserved for round advancement
4526    pub(crate) fn advance_coordination_round(
4527        &mut self,
4528        session_id: CoordinationSessionId,
4529        now: Instant,
4530    ) -> Result<CoordinationPhase, NatTraversalError> {
4531        let session = self
4532            .coordination_sessions
4533            .get_mut(&session_id)
4534            .ok_or(NatTraversalError::NoActiveCoordination)?;
4535
4536        let previous_phase = session.phase;
4537
4538        // Advance the state machine based on current phase and timing
4539        match session.phase {
4540            CoordinationPhase::Requesting => {
4541                // Wait for both peers to send PUNCH_ME_NOW frames
4542                if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4543                    session.phase = CoordinationPhase::Coordinating;
4544                    debug!("Session {} advanced to Coordinating phase", session_id);
4545                }
4546            }
4547            CoordinationPhase::Coordinating => {
4548                // Calculate synchronized punch time
4549                let coordination_delay = Duration::from_millis(200); // Grace period
4550                let punch_time = now + coordination_delay;
4551
4552                session.phase = CoordinationPhase::Preparing;
4553                debug!(
4554                    "Session {} advanced to Preparing phase, punch time: {:?}",
4555                    session_id, punch_time
4556                );
4557            }
4558            CoordinationPhase::Preparing => {
4559                // Transition to active hole punching
4560                session.phase = CoordinationPhase::Punching;
4561                debug!("Session {} advanced to Punching phase", session_id);
4562            }
4563            CoordinationPhase::Punching => {
4564                // Wait for validation results
4565                session.phase = CoordinationPhase::Validating;
4566                debug!("Session {} advanced to Validating phase", session_id);
4567            }
4568            CoordinationPhase::Validating => {
4569                // Check for timeout or success
4570                let validation_timeout = Duration::from_secs(5);
4571                if now.duration_since(session.started_at) > validation_timeout {
4572                    session.phase = CoordinationPhase::Failed;
4573                    debug!("Session {} timed out in validation", session_id);
4574                }
4575            }
4576            CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
4577                // Terminal states - no further advancement
4578            }
4579            CoordinationPhase::Idle => {
4580                // Should not happen in active sessions
4581                session.phase = CoordinationPhase::Requesting;
4582            }
4583        }
4584
4585        // Update statistics if phase changed
4586        if session.phase != previous_phase {
4587            match session.phase {
4588                CoordinationPhase::Succeeded => {
4589                    session.stats.successful_coordinations += 1;
4590                    self.stats.successful_coordinations += 1;
4591                }
4592                CoordinationPhase::Failed => {
4593                    // Update failure statistics
4594                }
4595                _ => {}
4596            }
4597        }
4598
4599        Ok(session.phase)
4600    }
4601
    /// Get coordination session by ID
    ///
    /// Returns `None` when no session is stored under `session_id`.
    #[allow(dead_code)] // Reserved for session retrieval
    pub(crate) fn get_coordination_session(
        &self,
        session_id: CoordinationSessionId,
    ) -> Option<&CoordinationSession> {
        self.coordination_sessions.get(&session_id)
    }
4610
    /// Get mutable coordination session by ID
    ///
    /// Returns `None` when no session is stored under `session_id`. Changes made
    /// through the returned reference bypass the coordinator's own bookkeeping
    /// (this accessor does not touch `stats`).
    #[allow(dead_code)] // Reserved for mutable session access
    pub(crate) fn get_coordination_session_mut(
        &mut self,
        session_id: CoordinationSessionId,
    ) -> Option<&mut CoordinationSession> {
        self.coordination_sessions.get_mut(&session_id)
    }
4619
4620    /// Mark coordination session as successful
4621    #[allow(dead_code)] // Reserved for success tracking
4622    pub(crate) fn mark_coordination_success(
4623        &mut self,
4624        session_id: CoordinationSessionId,
4625        _now: Instant,
4626    ) -> Result<(), NatTraversalError> {
4627        let session = self
4628            .coordination_sessions
4629            .get_mut(&session_id)
4630            .ok_or(NatTraversalError::NoActiveCoordination)?;
4631
4632        session.phase = CoordinationPhase::Succeeded;
4633        session.stats.successful_coordinations += 1;
4634        self.stats.successful_coordinations += 1;
4635
4636        // Update peer success rates
4637        if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4638            peer_a_record.coordination_count += 1;
4639            peer_a_record.success_rate =
4640                (peer_a_record.success_rate * (peer_a_record.coordination_count - 1) as f64 + 1.0)
4641                    / peer_a_record.coordination_count as f64;
4642        }
4643
4644        if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4645            peer_b_record.coordination_count += 1;
4646            peer_b_record.success_rate =
4647                (peer_b_record.success_rate * (peer_b_record.coordination_count - 1) as f64 + 1.0)
4648                    / peer_b_record.coordination_count as f64;
4649        }
4650
4651        info!("Coordination session {} marked as successful", session_id);
4652        Ok(())
4653    }
4654
4655    /// Mark coordination session as failed
4656    #[allow(dead_code)] // Reserved for failure tracking
4657    pub(crate) fn mark_coordination_failure(
4658        &mut self,
4659        session_id: CoordinationSessionId,
4660        reason: &str,
4661        _now: Instant,
4662    ) -> Result<(), NatTraversalError> {
4663        let session = self
4664            .coordination_sessions
4665            .get_mut(&session_id)
4666            .ok_or(NatTraversalError::NoActiveCoordination)?;
4667
4668        session.phase = CoordinationPhase::Failed;
4669
4670        // Update peer success rates
4671        if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4672            peer_a_record.coordination_count += 1;
4673            peer_a_record.success_rate = (peer_a_record.success_rate
4674                * (peer_a_record.coordination_count - 1) as f64)
4675                / peer_a_record.coordination_count as f64;
4676        }
4677
4678        if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4679            peer_b_record.coordination_count += 1;
4680            peer_b_record.success_rate = (peer_b_record.success_rate
4681                * (peer_b_record.coordination_count - 1) as f64)
4682                / peer_b_record.coordination_count as f64;
4683        }
4684
4685        warn!("Coordination session {} failed: {}", session_id, reason);
4686        Ok(())
4687    }
4688
    /// Get peer observation record
    ///
    /// Returns `None` when `peer_id` has no entry in the peer registry.
    pub(crate) fn get_peer_record(&self, peer_id: PeerId) -> Option<&PeerObservationRecord> {
        self.peer_registry.get(&peer_id)
    }
4693}
4694
/// Default bootstrap configuration.
///
/// The only field initialised here is the `_unused: ()` placeholder, so the
/// default carries no settings.
impl Default for BootstrapConfig {
    fn default() -> Self {
        Self { _unused: () }
    }
}
4700
/// Multi-destination packet transmission manager for NAT traversal
///
/// This component handles simultaneous packet transmission to multiple candidate
/// addresses during hole punching attempts, maximizing the chances of successful
/// NAT traversal by sending packets to all viable destinations concurrently.
///
/// NOTE(review): the rate limiter, target selector and performance monitor
/// fields are stub types with no state of their own, and the whole struct is
/// marked `dead_code` — presumably scaffolding for functionality that is not
/// wired in yet; confirm before relying on it.
#[derive(Debug)]
#[allow(dead_code)] // Part of multi-path transmission infrastructure
pub(super) struct MultiDestinationTransmitter {
    /// Current transmission targets
    active_targets: Vec<MultiDestPunchTarget>,
    /// Transmission statistics
    stats: MultiDestTransmissionStats,
    /// Maximum number of concurrent targets
    max_targets: usize,
    /// Transmission rate limiting
    rate_limiter: TransmissionRateLimiter,
    /// Adaptive target selection
    target_selector: AdaptiveTargetSelector,
    /// Performance monitoring
    performance_monitor: TransmissionPerformanceMonitor,
}
4722
4723/// Statistics for multi-destination transmission (stub implementation)
4724#[derive(Debug, Default, Clone)]
4725pub(super) struct MultiDestTransmissionStats {
4726    _unused: (),
4727}
4728
/// Rate limiter for transmission bursts.
///
/// Stub implementation: it holds only a unit placeholder field for now.
#[derive(Debug)]
struct TransmissionRateLimiter {
    _unused: (),
}
4734
/// Adaptive target selection based on network conditions.
///
/// Stub implementation: it holds only a unit placeholder field for now.
#[derive(Debug)]
struct AdaptiveTargetSelector {
    _unused: (),
}
4740
/// Performance monitoring for transmission efficiency.
///
/// Stub implementation: it holds only a unit placeholder field for now.
#[derive(Debug)]
struct TransmissionPerformanceMonitor {
    _unused: (),
}
4746
4747impl MultiDestinationTransmitter {
4748    /// Create a new multi-destination transmitter
4749    pub(super) fn new() -> Self {
4750        Self {
4751            active_targets: Vec::new(),
4752            stats: MultiDestTransmissionStats::default(),
4753            max_targets: 8, // Maximum concurrent targets
4754            rate_limiter: TransmissionRateLimiter::new(100, 50), // 100 pps, burst 50
4755            target_selector: AdaptiveTargetSelector::new(),
4756            performance_monitor: TransmissionPerformanceMonitor::new(),
4757        }
4758    }
4759}
4760
4761impl TransmissionRateLimiter {
4762    fn new(_max_pps: u64, _burst_size: u64) -> Self {
4763        Self { _unused: () }
4764    }
4765}
4766
4767impl AdaptiveTargetSelector {
4768    fn new() -> Self {
4769        Self { _unused: () }
4770    }
4771}
4772
4773impl TransmissionPerformanceMonitor {
4774    fn new() -> Self {
4775        Self { _unused: () }
4776    }
4777}
4778
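// TODO: Fix the imports in `nat_traversal_tests.rs` and re-enable the external
// test module declared below. It is also named `tests`, the same as the inline
// test module that follows, so one of the two must be renamed before both can
// be compiled together.
// #[cfg(test)]
// #[path = "nat_traversal_tests.rs"]
// mod tests;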
4783
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `NatTraversalState` for `role` with small, fixed test limits.
    fn create_test_state(role: NatTraversalRole) -> NatTraversalState {
        let max_candidates = 10;
        let coordination_timeout = Duration::from_secs(30);
        NatTraversalState::new(role, max_candidates, coordination_timeout)
    }

    /// Candidate source used for QUIC-discovered addresses throughout these tests.
    fn observed() -> CandidateSource {
        CandidateSource::Observed { by_node: None }
    }

    #[test]
    fn test_add_quic_discovered_address() {
        // A QUIC-discovered address is stored as a local candidate with the
        // `Observed` source.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5678));
        let seq = state.add_local_candidate(discovered_addr, observed(), now);

        // Exactly one candidate exists and it mirrors what was added.
        assert_eq!(state.local_candidates.len(), 1);
        let candidate = &state.local_candidates[&seq];
        assert_eq!(candidate.address, discovered_addr);
        assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
        assert_eq!(candidate.state, CandidateState::New);

        // The candidate was assigned a non-zero priority.
        assert!(candidate.priority > 0);
    }

    #[test]
    fn test_add_multiple_quic_discovered_addresses() {
        // Several QUIC-discovered addresses (IPv4 and IPv6) can be added side by side.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let addrs = [
            SocketAddr::from(([1, 2, 3, 4], 5678)),
            SocketAddr::from(([5, 6, 7, 8], 9012)),
            SocketAddr::from(([2001, 0xdb8, 0, 0, 0, 0, 0, 1], 443)),
        ];

        let sequences: Vec<_> = addrs
            .iter()
            .map(|addr| state.add_local_candidate(*addr, observed(), now))
            .collect();

        // Every address produced its own candidate.
        assert_eq!(state.local_candidates.len(), 3);

        // Each returned sequence number maps back to the address it was issued for.
        for (seq, addr) in sequences.iter().zip(&addrs) {
            let candidate = &state.local_candidates[seq];
            assert_eq!(candidate.address, *addr);
            assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
        }
    }

    #[test]
    fn test_quic_discovered_addresses_in_local_candidates() {
        // A QUIC-discovered address shows up in the local candidate map.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        let seq = state.add_local_candidate(addr, observed(), now);

        // The candidate is retrievable under the returned sequence number.
        assert!(state.local_candidates.contains_key(&seq));
        let candidate = &state.local_candidates[&seq];
        assert_eq!(candidate.address, addr);

        // Its source is recorded as `Observed`.
        assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
    }

    #[test]
    fn test_quic_discovered_addresses_included_in_hole_punching() {
        // A QUIC-discovered local address is paired with a remote candidate.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let local_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        state.add_local_candidate(local_addr, observed(), now);

        // Remote candidate uses a public address rather than a documentation range.
        let remote_addr = SocketAddr::from(([1, 2, 3, 4], 6000));
        let priority = VarInt::from_u32(100);
        state
            .add_remote_candidate(VarInt::from_u32(1), remote_addr, priority, now)
            .expect("add remote candidate should succeed");

        state.generate_candidate_pairs(now);

        // One local and one remote candidate yield exactly one pair.
        assert_eq!(state.candidate_pairs.len(), 1);
        let pair = &state.candidate_pairs[0];
        assert_eq!(pair.local_addr, local_addr);
        assert_eq!(pair.remote_addr, remote_addr);
    }

    #[test]
    fn test_prioritize_quic_discovered_over_predicted() {
        // A QUIC-discovered address must not rank below a predicted one.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let predicted_addr = SocketAddr::from(([1, 2, 3, 4], 5000));
        let predicted_seq =
            state.add_local_candidate(predicted_addr, CandidateSource::Predicted, now);

        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5001));
        let discovered_seq = state.add_local_candidate(discovered_addr, observed(), now);

        let predicted_priority = state.local_candidates[&predicted_seq].priority;
        let discovered_priority = state.local_candidates[&discovered_seq].priority;

        // The observed candidate's priority is at least as high as the predicted one's.
        assert!(discovered_priority >= predicted_priority);
    }

    #[test]
    fn test_integration_with_nat_traversal_flow() {
        // End-to-end: local + discovered candidates paired against two remotes.
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        // One local-interface candidate and one QUIC-discovered candidate.
        let local_addr = SocketAddr::from(([192, 168, 1, 2], 5000));
        state.add_local_candidate(local_addr, CandidateSource::Local, now);

        let discovered_addr = SocketAddr::from(([44, 55, 66, 77], 5000));
        state.add_local_candidate(discovered_addr, observed(), now);

        // Two remote candidates on public addresses.
        let remote1 = SocketAddr::from(([93, 184, 215, 123], 6000));
        let remote2 = SocketAddr::from(([172, 217, 16, 34], 7000));
        let priority = VarInt::from_u32(100);
        state
            .add_remote_candidate(VarInt::from_u32(1), remote1, priority, now)
            .expect("add remote candidate should succeed");
        state
            .add_remote_candidate(VarInt::from_u32(2), remote2, priority, now)
            .expect("add remote candidate should succeed");

        state.generate_candidate_pairs(now);

        // 2 local x 2 remote = 4 pairs.
        assert_eq!(state.candidate_pairs.len(), 4);

        // The discovered address participates in one pair per remote.
        let discovered_pair_count = state
            .candidate_pairs
            .iter()
            .filter(|pair| pair.local_addr == discovered_addr)
            .count();
        assert_eq!(discovered_pair_count, 2);
    }
}