ant_quic/
candidate_discovery.rs

1//! Candidate Discovery System for QUIC NAT Traversal
2//!
3//! This module implements sophisticated address candidate discovery including:
4//! - Local network interface enumeration (platform-specific)
5//! - Server reflexive address discovery via bootstrap nodes
6//! - Symmetric NAT port prediction algorithms
7//! - Bootstrap node health management and consensus
8
9use std::{
10    collections::{HashMap, VecDeque},
11    net::{IpAddr, SocketAddr},
12    sync::Arc,
13    time::{Duration, Instant},
14};
15
16use tracing::{debug, error, info, warn};
17
18use crate::Connection;
19
20use crate::{
21    connection::nat_traversal::{CandidateSource, CandidateState},
22    nat_traversal_api::{BootstrapNode, CandidateAddress, PeerId},
23};
24
25// Platform-specific implementations
26#[cfg(all(target_os = "windows", feature = "network-discovery"))]
27pub mod windows;
28
29#[cfg(all(target_os = "windows", feature = "network-discovery"))]
30pub use windows::WindowsInterfaceDiscovery;
31
32#[cfg(all(target_os = "linux", feature = "network-discovery"))]
33pub mod linux;
34
35#[cfg(all(target_os = "linux", feature = "network-discovery"))]
36pub use linux::LinuxInterfaceDiscovery;
37
38#[cfg(all(target_os = "macos", feature = "network-discovery"))]
39pub(crate) mod macos;
40
41#[cfg(all(target_os = "macos", feature = "network-discovery"))]
42pub(crate) use macos::MacOSInterfaceDiscovery;
43
44/// Convert discovery source type to NAT traversal source type
45fn convert_to_nat_source(discovery_source: DiscoverySourceType) -> CandidateSource {
46    match discovery_source {
47        DiscoverySourceType::Local => CandidateSource::Local,
48        DiscoverySourceType::ServerReflexive => CandidateSource::Observed { by_node: None },
49        DiscoverySourceType::Predicted => CandidateSource::Predicted,
50    }
51}
52
/// How a candidate was obtained, as tracked while discovery is running
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DiscoverySourceType {
    /// Address taken from a local network interface
    Local,
    /// Server reflexive address
    ServerReflexive,
    /// Address produced by prediction
    Predicted,
}
60
61/// Internal candidate type used during discovery
62#[derive(Debug, Clone)]
63pub(crate) struct DiscoveryCandidate {
64    pub address: SocketAddr,
65    pub priority: u32,
66    pub source: DiscoverySourceType,
67    pub state: CandidateState,
68}
69
70impl DiscoveryCandidate {
71    /// Convert to external CandidateAddress
72    pub(crate) fn to_candidate_address(&self) -> CandidateAddress {
73        CandidateAddress {
74            address: self.address,
75            priority: self.priority,
76            source: convert_to_nat_source(self.source),
77            state: self.state,
78        }
79    }
80}
81
82/// Per-peer discovery session containing all state for a single peer's discovery
83#[derive(Debug)]
84
85pub struct DiscoverySession {
86    /// Peer ID for this discovery session
87    peer_id: PeerId,
88    /// Unique session identifier
89    session_id: u64,
90    /// Current discovery phase
91    current_phase: DiscoveryPhase,
92    /// Session start time
93    started_at: Instant,
94    /// Discovered candidates for this peer
95    discovered_candidates: Vec<DiscoveryCandidate>,
96    /// Discovery statistics
97    statistics: DiscoveryStatistics,
98    /// Port allocation history
99    allocation_history: VecDeque<PortAllocationEvent>,
100    /// Server reflexive discovery state
101    server_reflexive_discovery: ServerReflexiveDiscovery,
102}
103
104/// Main candidate discovery manager coordinating all discovery phases
105pub struct CandidateDiscoveryManager {
106    /// Configuration for discovery behavior
107    config: DiscoveryConfig,
108    /// Platform-specific interface discovery (shared)
109    interface_discovery: Arc<std::sync::Mutex<Box<dyn NetworkInterfaceDiscovery + Send>>>,
110    /// Symmetric NAT prediction engine (shared)
111    symmetric_predictor: Arc<std::sync::Mutex<SymmetricNatPredictor>>,
112    /// Bootstrap node health manager (shared)
113    bootstrap_manager: Arc<BootstrapNodeManager>,
114    /// Discovery result cache (shared)
115    cache: DiscoveryCache,
116    /// Active discovery sessions per peer
117    active_sessions: HashMap<PeerId, DiscoverySession>,
118    /// Cached local interface results (shared across all sessions)
119    cached_local_candidates: Option<(Instant, Vec<ValidatedCandidate>)>,
120    /// Cache duration for local candidates
121    local_cache_duration: Duration,
122    /// Pending path validations
123    pending_validations: HashMap<CandidateId, PendingValidation>,
124}
125
/// Settings that control candidate discovery
#[derive(Clone, Debug)]
pub struct DiscoveryConfig {
    /// Upper bound on the duration of the whole discovery process
    pub total_timeout: Duration,
    /// Upper bound on the duration of local interface scanning
    pub local_scan_timeout: Duration,
    /// Timeout applied to each individual bootstrap query
    pub bootstrap_query_timeout: Duration,
    /// How many times a query to one bootstrap node may be retried
    pub max_query_retries: u32,
    /// Upper bound on the number of candidates to discover
    pub max_candidates: usize,
    /// Whether symmetric NAT prediction is enabled
    pub enable_symmetric_prediction: bool,
    /// Minimum number of bootstrap nodes required for consensus
    pub min_bootstrap_consensus: usize,
    /// Time-to-live of cached local interface results
    pub interface_cache_ttl: Duration,
    /// Time-to-live of cached server reflexive addresses
    pub server_reflexive_cache_ttl: Duration,
    /// Address the local endpoint is actually bound to, when known
    pub bound_address: Option<SocketAddr>,
}
150
151/// Current phase of the discovery process
152#[derive(Debug, Clone, PartialEq)]
153pub enum DiscoveryPhase {
154    /// Initial state, ready to begin discovery
155    Idle,
156    /// Scanning local network interfaces
157    LocalInterfaceScanning { started_at: Instant },
158    /// Querying bootstrap nodes for server reflexive addresses
159    ServerReflexiveQuerying {
160        started_at: Instant,
161        active_queries: HashMap<BootstrapNodeId, QueryState>,
162        responses_received: Vec<ServerReflexiveResponse>,
163    },
164    /// Analyzing NAT behavior and predicting symmetric ports
165    SymmetricNatPrediction {
166        started_at: Instant,
167        prediction_attempts: u32,
168        pattern_analysis: PatternAnalysisState,
169    },
170    /// Validating discovered candidates
171    CandidateValidation {
172        started_at: Instant,
173        validation_results: HashMap<CandidateId, ValidationResult>,
174    },
175    /// Discovery completed successfully
176    Completed {
177        final_candidates: Vec<ValidatedCandidate>,
178        completion_time: Instant,
179    },
180    /// Discovery failed with error details
181    Failed {
182        /// The discovery error that occurred
183        error: DiscoveryError,
184        /// When the failure occurred
185        failed_at: Instant,
186        /// Available fallback strategies
187        fallback_options: Vec<FallbackStrategy>,
188    },
189}
190
191/// Events generated during candidate discovery
192#[derive(Debug, Clone)]
193pub enum DiscoveryEvent {
194    /// Discovery process started
195    DiscoveryStarted {
196        peer_id: PeerId,
197        bootstrap_count: usize,
198    },
199    /// Local interface scanning started
200    LocalScanningStarted,
201    /// Local candidate discovered
202    LocalCandidateDiscovered { candidate: CandidateAddress },
203    /// Local interface scanning completed
204    LocalScanningCompleted {
205        candidate_count: usize,
206        duration: Duration,
207    },
208    /// Server reflexive discovery started
209    ServerReflexiveDiscoveryStarted { bootstrap_count: usize },
210    /// Server reflexive address discovered
211    ServerReflexiveCandidateDiscovered {
212        candidate: CandidateAddress,
213        bootstrap_node: SocketAddr,
214    },
215    /// Bootstrap node query failed
216    BootstrapQueryFailed {
217        /// The bootstrap node that failed
218        bootstrap_node: SocketAddr,
219        /// The error message
220        error: String,
221    },
222    /// Symmetric NAT prediction started
223    SymmetricPredictionStarted { base_address: SocketAddr },
224    /// Predicted candidate generated
225    PredictedCandidateGenerated {
226        candidate: CandidateAddress,
227        confidence: f64,
228    },
229    /// Port allocation pattern detected
230    PortAllocationDetected {
231        port: u16,
232        source_address: SocketAddr,
233        bootstrap_node: BootstrapNodeId,
234        timestamp: Instant,
235    },
236    /// Discovery completed successfully
237    DiscoveryCompleted {
238        candidate_count: usize,
239        total_duration: Duration,
240        success_rate: f64,
241    },
242    /// Discovery failed
243    DiscoveryFailed {
244        /// The discovery error that occurred
245        error: DiscoveryError,
246        /// Any partial results before failure
247        partial_results: Vec<CandidateAddress>,
248    },
249    /// Path validation requested for a candidate
250    PathValidationRequested {
251        candidate_id: CandidateId,
252        candidate_address: SocketAddr,
253        challenge_token: u64,
254    },
255    /// Path validation response received
256    PathValidationResponse {
257        candidate_id: CandidateId,
258        candidate_address: SocketAddr,
259        challenge_token: u64,
260        rtt: Duration,
261    },
262}
263
/// Identifier of a bootstrap node (newtype over `u64`, usable as a hash key)
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct BootstrapNodeId(pub u64);
267
/// Bookkeeping for a path validation that has not finished yet
struct PendingValidation {
    /// Address under validation
    candidate_address: SocketAddr,
    /// Challenge token that was sent
    challenge_token: u64,
    /// Instant at which validation started
    started_at: Instant,
    /// How many attempts have been made
    attempts: u32,
}
279
/// Progress of a query to a single bootstrap node
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum QueryState {
    /// The query is still in progress
    Pending {
        /// When the query was sent
        sent_at: Instant,
        /// Number of attempts so far
        attempts: u32,
    },
    /// The query finished successfully
    Completed,
    /// The query failed after all retries
    Failed,
}
290
291/// Response from server reflexive discovery
292#[derive(Debug, Clone, PartialEq)]
293pub struct ServerReflexiveResponse {
294    pub bootstrap_node: BootstrapNodeId,
295    pub observed_address: SocketAddr,
296    pub response_time: Duration,
297    pub timestamp: Instant,
298}
299
300/// State for symmetric NAT pattern analysis
301#[derive(Debug, Clone, PartialEq)]
302pub struct PatternAnalysisState {
303    pub allocation_history: VecDeque<PortAllocationEvent>,
304    pub detected_pattern: Option<PortAllocationPattern>,
305    pub confidence_level: f64,
306    pub prediction_accuracy: f64,
307}
308
/// One port allocation observation, used as input for pattern analysis
#[derive(Clone, Debug, PartialEq)]
pub struct PortAllocationEvent {
    /// Port number of the allocation
    pub port: u16,
    /// Time attached to the event
    pub timestamp: Instant,
    /// Source address of the allocation
    pub source_address: SocketAddr,
}
316
317/// Detected port allocation pattern
318#[derive(Debug, Clone, PartialEq)]
319pub struct PortAllocationPattern {
320    pub pattern_type: AllocationPatternType,
321    pub base_port: u16,
322    pub stride: u16,
323    pub pool_boundaries: Option<(u16, u16)>,
324    pub confidence: f64,
325}
326
/// Kinds of port allocation pattern
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AllocationPatternType {
    /// Ports are handed out one after another (port + 1, port + 2, ...)
    Sequential,
    /// Ports advance by a constant step (port + N, port + 2N, ...)
    FixedStride,
    /// Ports are picked at random within a range
    Random,
    /// Ports are drawn from a pool
    PoolBased,
    /// Allocation depends on time
    TimeBased,
    /// The pattern is unknown or unpredictable
    Unknown,
}
343
344/// Analysis of port allocation patterns for symmetric NAT prediction
345#[derive(Debug, Clone)]
346pub struct PortPatternAnalysis {
347    /// The detected pattern
348    pub pattern: PortAllocationPattern,
349    /// The increment between consecutive allocations
350    pub increment: Option<i32>,
351    /// Base port for calculations
352    pub base_port: u16,
353}
354
/// Identifier of a candidate (newtype over `u64`, usable as a hash key)
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct CandidateId(pub u64);
358
/// Outcome of validating a candidate
#[derive(Clone, Debug, PartialEq)]
pub enum ValidationResult {
    /// The candidate is valid
    Valid {
        /// Round-trip time recorded for the validation
        rtt: Duration,
    },
    /// The candidate is invalid
    Invalid {
        /// Why the candidate is invalid
        reason: String,
    },
    /// Validation timed out
    Timeout,
    /// Validation is pending
    Pending,
}
367
368/// Validated candidate with metadata
369#[derive(Debug, Clone, PartialEq)]
370pub struct ValidatedCandidate {
371    pub id: CandidateId,
372    pub address: SocketAddr,
373    pub source: DiscoverySourceType,
374    pub priority: u32,
375    pub rtt: Option<Duration>,
376    pub reliability_score: f64,
377}
378
379impl ValidatedCandidate {
380    /// Convert to CandidateAddress with proper NAT traversal source type
381    pub fn to_candidate_address(&self) -> CandidateAddress {
382        CandidateAddress {
383            address: self.address,
384            priority: self.priority,
385            source: convert_to_nat_source(self.source),
386            state: CandidateState::Valid,
387        }
388    }
389}
390
391/// Discovery session state tracking
392#[derive(Debug)]
393
394pub(crate) struct DiscoverySessionState {
395    pub peer_id: PeerId,
396    pub session_id: u64,
397    pub started_at: Instant,
398    pub discovered_candidates: Vec<DiscoveryCandidate>,
399    pub statistics: DiscoveryStatistics,
400    pub allocation_history: VecDeque<PortAllocationEvent>,
401}
402
/// Counters and timings describing how discovery performed
#[derive(Clone, Debug, Default)]
pub struct DiscoveryStatistics {
    /// Number of local candidates found
    pub local_candidates_found: u32,
    /// Number of server reflexive candidates found
    pub server_reflexive_candidates_found: u32,
    /// Number of predicted candidates generated
    pub predicted_candidates_generated: u32,
    /// Number of bootstrap queries sent
    pub bootstrap_queries_sent: u32,
    /// Number of bootstrap queries that succeeded
    pub bootstrap_queries_successful: u32,
    /// Total time discovery took, when recorded
    pub total_discovery_time: Option<Duration>,
    /// Average bootstrap round-trip time, when recorded
    pub average_bootstrap_rtt: Option<Duration>,
    /// Number of addresses rejected as invalid
    pub invalid_addresses_rejected: u32,
}
415
/// Ways in which discovery can fail
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DiscoveryError {
    /// No local interfaces were found
    NoLocalInterfaces,
    /// Every bootstrap node query failed
    AllBootstrapsFailed,
    /// The discovery timeout was exceeded
    DiscoveryTimeout,
    /// Fewer candidates were discovered than required
    InsufficientCandidates {
        /// Number of candidates found
        found: usize,
        /// Number of candidates required
        required: usize,
    },
    /// Platform-specific network error, with a message
    NetworkError(String),
    /// Configuration error, with a message
    ConfigurationError(String),
    /// Internal system error, with a message
    InternalError(String),
}
434
/// Strategies to fall back on when discovery fails
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FallbackStrategy {
    /// Reuse cached results from a previous discovery
    UseCachedResults,
    /// Try again with relaxed parameters
    RetryWithRelaxedParams,
    /// Use a minimal candidate set
    UseMinimalCandidates,
    /// Enable relay-based fallback
    EnableRelayFallback,
}
447
448impl Default for DiscoveryConfig {
449    fn default() -> Self {
450        Self {
451            total_timeout: Duration::from_secs(30),
452            local_scan_timeout: Duration::from_secs(2),
453            bootstrap_query_timeout: Duration::from_secs(5),
454            max_query_retries: 3,
455            max_candidates: 8,
456            enable_symmetric_prediction: true,
457            min_bootstrap_consensus: 2,
458            interface_cache_ttl: Duration::from_secs(60),
459            server_reflexive_cache_ttl: Duration::from_secs(300),
460            bound_address: None,
461        }
462    }
463}
464
465impl DiscoverySession {
466    /// Create a new discovery session for a peer
467    fn new(peer_id: PeerId, config: &DiscoveryConfig) -> Self {
468        Self {
469            peer_id,
470            session_id: rand::random(),
471            current_phase: DiscoveryPhase::Idle,
472            started_at: Instant::now(),
473            discovered_candidates: Vec::new(),
474            statistics: DiscoveryStatistics::default(),
475            allocation_history: VecDeque::new(),
476            server_reflexive_discovery: ServerReflexiveDiscovery::new(config),
477        }
478    }
479}
480
481impl CandidateDiscoveryManager {
482    /// Create a new candidate discovery manager
483    pub fn new(config: DiscoveryConfig) -> Self {
484        let interface_discovery =
485            Arc::new(std::sync::Mutex::new(create_platform_interface_discovery()));
486        let symmetric_predictor =
487            Arc::new(std::sync::Mutex::new(SymmetricNatPredictor::new(&config)));
488        let bootstrap_manager = Arc::new(BootstrapNodeManager::new(&config));
489        let cache = DiscoveryCache::new(&config);
490        let local_cache_duration = config.interface_cache_ttl;
491
492        Self {
493            config,
494            interface_discovery,
495            symmetric_predictor,
496            bootstrap_manager,
497            cache,
498            active_sessions: HashMap::new(),
499            cached_local_candidates: None,
500            local_cache_duration,
501            pending_validations: HashMap::new(),
502        }
503    }
504
505    /// Set the actual bound address of the local endpoint
506    pub fn set_bound_address(&mut self, address: SocketAddr) {
507        self.config.bound_address = Some(address);
508        // Clear cached local candidates to force refresh with new bound address
509        self.cached_local_candidates = None;
510    }
511
512    /// Discover local network interface candidates synchronously
513    pub fn discover_local_candidates(&mut self) -> Result<Vec<ValidatedCandidate>, DiscoveryError> {
514        // Start interface scan
515        self.interface_discovery
516            .lock()
517            .unwrap()
518            .start_scan()
519            .map_err(|e| {
520                DiscoveryError::NetworkError(format!("Failed to start interface scan: {e}"))
521            })?;
522
523        // Poll until scan completes (this should be quick for local interfaces)
524        let start = Instant::now();
525        let timeout = Duration::from_secs(2);
526
527        loop {
528            if start.elapsed() > timeout {
529                return Err(DiscoveryError::DiscoveryTimeout);
530            }
531
532            if let Some(interfaces) = self
533                .interface_discovery
534                .lock()
535                .unwrap()
536                .check_scan_complete()
537            {
538                // Convert interfaces to candidates
539                let mut candidates = Vec::new();
540
541                for interface in interfaces {
542                    for addr in interface.addresses {
543                        candidates.push(ValidatedCandidate {
544                            id: CandidateId(rand::random()),
545                            address: addr,
546                            source: DiscoverySourceType::Local,
547                            priority: 50000, // High priority for local interfaces
548                            rtt: None,
549                            reliability_score: 1.0,
550                        });
551                    }
552                }
553
554                if candidates.is_empty() {
555                    return Err(DiscoveryError::NoLocalInterfaces);
556                }
557
558                return Ok(candidates);
559            }
560
561            // Small sleep to avoid busy waiting
562            std::thread::sleep(Duration::from_millis(10));
563        }
564    }
565
566    /// Start candidate discovery for a specific peer
567    pub fn start_discovery(
568        &mut self,
569        peer_id: PeerId,
570        _bootstrap_nodes: Vec<BootstrapNode>,
571    ) -> Result<(), DiscoveryError> {
572        // Check if session already exists for this peer
573        if self.active_sessions.contains_key(&peer_id) {
574            return Err(DiscoveryError::InternalError(format!(
575                "Discovery already in progress for peer {peer_id:?}"
576            )));
577        }
578
579        info!("Starting candidate discovery for peer {:?}", peer_id);
580
581        // Create new session
582        let mut session = DiscoverySession::new(peer_id, &self.config);
583
584        // Update bootstrap node manager (shared resource)
585        // Note: BootstrapNodeManager is immutable through Arc, updates would need internal mutability
586
587        // Start with local interface scanning
588        session.current_phase = DiscoveryPhase::LocalInterfaceScanning {
589            started_at: Instant::now(),
590        };
591
592        // Add session to active sessions
593        self.active_sessions.insert(peer_id, session);
594
595        Ok(())
596    }
597
598    /// Poll for discovery progress and state updates across all active sessions
599    pub fn poll(&mut self, now: Instant) -> Vec<DiscoveryEvent> {
600        let mut all_events = Vec::new();
601        let mut completed_sessions = Vec::new();
602
603        // Since we need to poll sessions with self methods, we'll do it in phases
604        // First, check for local interface scanning completions
605        let mut local_scan_events = Vec::new();
606        for (peer_id, session) in &mut self.active_sessions {
607            if let DiscoveryPhase::LocalInterfaceScanning { started_at } = &session.current_phase {
608                // Handle timeouts
609                if started_at.elapsed() > self.config.local_scan_timeout {
610                    local_scan_events.push((
611                        *peer_id,
612                        DiscoveryEvent::LocalScanningCompleted {
613                            candidate_count: 0,
614                            duration: started_at.elapsed(),
615                        },
616                    ));
617                }
618            }
619        }
620
621        // Process local scan events
622        for (peer_id, event) in local_scan_events {
623            all_events.push(event);
624            if let Some(session) = self.active_sessions.get_mut(&peer_id) {
625                // Move to next phase
626                session.current_phase = DiscoveryPhase::Completed {
627                    final_candidates: session
628                        .discovered_candidates
629                        .iter()
630                        .map(|dc| ValidatedCandidate {
631                            id: CandidateId(0),
632                            address: dc.address,
633                            source: dc.source,
634                            priority: dc.priority,
635                            rtt: None,
636                            reliability_score: 1.0,
637                        })
638                        .collect(),
639                    completion_time: now,
640                };
641
642                all_events.push(DiscoveryEvent::DiscoveryCompleted {
643                    candidate_count: session.discovered_candidates.len(),
644                    total_duration: now.duration_since(session.started_at),
645                    success_rate: 1.0,
646                });
647
648                completed_sessions.push(peer_id);
649            }
650        }
651
652        // Remove completed sessions
653        for peer_id in completed_sessions {
654            self.active_sessions.remove(&peer_id);
655            debug!("Removed completed discovery session for peer {:?}", peer_id);
656        }
657
658        all_events
659    }
660
661    /// Get current discovery status
662    pub fn get_status(&self) -> DiscoveryStatus {
663        // Return a default status since we now manage multiple sessions
664        DiscoveryStatus {
665            phase: DiscoveryPhase::Idle,
666            discovered_candidates: Vec::new(),
667            statistics: DiscoveryStatistics::default(),
668            elapsed_time: Duration::from_secs(0),
669        }
670    }
671
672    /// Check if discovery is complete
673    pub fn is_complete(&self) -> bool {
674        // All sessions must be complete
675        self.active_sessions.values().all(|session| {
676            matches!(
677                session.current_phase,
678                DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. }
679            )
680        })
681    }
682
683    /// Get final discovery results
684    pub fn get_results(&self) -> Option<DiscoveryResults> {
685        // Return results from all completed sessions
686        if self.active_sessions.is_empty() {
687            return None;
688        }
689
690        // Aggregate results from all sessions
691        let mut all_candidates = Vec::new();
692        let mut latest_completion = Instant::now();
693        let mut combined_stats = DiscoveryStatistics::default();
694
695        for session in self.active_sessions.values() {
696            match &session.current_phase {
697                DiscoveryPhase::Completed {
698                    final_candidates,
699                    completion_time,
700                } => {
701                    // Add candidates from this session
702                    all_candidates.extend(final_candidates.clone());
703                    latest_completion = *completion_time;
704                    // Combine statistics
705                    combined_stats.local_candidates_found +=
706                        session.statistics.local_candidates_found;
707                    combined_stats.server_reflexive_candidates_found +=
708                        session.statistics.server_reflexive_candidates_found;
709                    combined_stats.predicted_candidates_generated +=
710                        session.statistics.predicted_candidates_generated;
711                    combined_stats.bootstrap_queries_sent +=
712                        session.statistics.bootstrap_queries_sent;
713                    combined_stats.bootstrap_queries_successful +=
714                        session.statistics.bootstrap_queries_successful;
715                }
716                DiscoveryPhase::Failed { .. } => {
717                    // Include any partial results from failed sessions
718                    // Convert DiscoveryCandidate to ValidatedCandidate
719                    let validated: Vec<ValidatedCandidate> = session
720                        .discovered_candidates
721                        .iter()
722                        .enumerate()
723                        .map(|(idx, dc)| ValidatedCandidate {
724                            id: CandidateId(idx as u64),
725                            address: dc.address,
726                            source: dc.source,
727                            priority: dc.priority,
728                            rtt: None,
729                            reliability_score: 0.5, // Default score for failed sessions
730                        })
731                        .collect();
732                    all_candidates.extend(validated);
733                }
734                _ => {}
735            }
736        }
737
738        if all_candidates.is_empty() {
739            None
740        } else {
741            Some(DiscoveryResults {
742                candidates: all_candidates,
743                completion_time: latest_completion,
744                statistics: combined_stats,
745            })
746        }
747    }
748
749    /// Get all discovered candidates for a specific peer
750    pub fn get_candidates_for_peer(&self, peer_id: PeerId) -> Vec<CandidateAddress> {
751        // Look up the specific session for this peer
752        if let Some(session) = self.active_sessions.get(&peer_id) {
753            // Return all discovered candidates converted to CandidateAddress
754            session
755                .discovered_candidates
756                .iter()
757                .map(|c| c.to_candidate_address())
758                .collect()
759        } else {
760            // No active session for this peer
761            debug!("No active discovery session found for peer {:?}", peer_id);
762            Vec::new()
763        }
764    }
765
766    // Private implementation methods
767
768    fn poll_session_local_scanning(
769        &mut self,
770        session: &mut DiscoverySession,
771        started_at: Instant,
772        now: Instant,
773        events: &mut Vec<DiscoveryEvent>,
774    ) {
775        // Check if we have cached local candidates
776        if let Some((cache_time, ref cached_candidates)) = self.cached_local_candidates {
777            if cache_time.elapsed() < self.local_cache_duration {
778                // Use cached candidates
779                debug!(
780                    "Using cached local candidates for peer {:?}",
781                    session.peer_id
782                );
783                self.process_cached_local_candidates(
784                    session,
785                    cached_candidates.clone(),
786                    events,
787                    now,
788                );
789                return;
790            }
791        }
792
793        // Start the scan if not already started
794        // We check if the scan is at the very beginning (within first 10ms) to avoid repeated start_scan calls
795        if started_at.elapsed().as_millis() < 10 {
796            let scan_result = self.interface_discovery.lock().unwrap().start_scan();
797            match scan_result {
798                Ok(()) => {
799                    debug!(
800                        "Started local interface scan for peer {:?}",
801                        session.peer_id
802                    );
803                    events.push(DiscoveryEvent::LocalScanningStarted);
804                }
805                Err(e) => {
806                    error!("Failed to start interface scan: {}", e);
807                    self.handle_session_local_scan_timeout(session, events, now);
808                    return;
809                }
810            }
811        }
812
813        // Check for timeout
814        if started_at.elapsed() > self.config.local_scan_timeout {
815            warn!(
816                "Local interface scanning timeout for peer {:?}",
817                session.peer_id
818            );
819            self.handle_session_local_scan_timeout(session, events, now);
820            return;
821        }
822
823        // Check if scanning is complete
824        let scan_complete_result = self
825            .interface_discovery
826            .lock()
827            .unwrap()
828            .check_scan_complete();
829        if let Some(interfaces) = scan_complete_result {
830            self.process_session_local_interfaces(session, interfaces, events, now);
831        }
832    }
833
834    fn process_session_local_interfaces(
835        &mut self,
836        session: &mut DiscoverySession,
837        interfaces: Vec<NetworkInterface>,
838        events: &mut Vec<DiscoveryEvent>,
839        now: Instant,
840    ) {
841        debug!(
842            "Processing {} network interfaces for peer {:?}",
843            interfaces.len(),
844            session.peer_id
845        );
846
847        let mut validated_candidates = Vec::new();
848
849        // First, add the bound address if available
850        if let Some(bound_addr) = self.config.bound_address {
851            if self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback() {
852                let candidate = DiscoveryCandidate {
853                    address: bound_addr,
854                    priority: 60000, // High priority for the actual bound address
855                    source: DiscoverySourceType::Local,
856                    state: CandidateState::New,
857                };
858
859                session.discovered_candidates.push(candidate.clone());
860                session.statistics.local_candidates_found += 1;
861
862                // Create validated candidate for caching
863                validated_candidates.push(ValidatedCandidate {
864                    id: CandidateId(rand::random()),
865                    address: bound_addr,
866                    source: DiscoverySourceType::Local,
867                    priority: candidate.priority,
868                    rtt: None,
869                    reliability_score: 1.0,
870                });
871
872                events.push(DiscoveryEvent::LocalCandidateDiscovered {
873                    candidate: candidate.to_candidate_address(),
874                });
875
876                debug!(
877                    "Added bound address {} as local candidate for peer {:?}",
878                    bound_addr, session.peer_id
879                );
880            }
881        }
882
883        // Then process discovered interfaces
884        for interface in &interfaces {
885            for address in &interface.addresses {
886                // Skip if this is the same as the bound address
887                if Some(*address) == self.config.bound_address {
888                    continue;
889                }
890
891                if self.is_valid_local_address(address) {
892                    let candidate = DiscoveryCandidate {
893                        address: *address,
894                        priority: self.calculate_local_priority(address, interface),
895                        source: DiscoverySourceType::Local,
896                        state: CandidateState::New,
897                    };
898
899                    session.discovered_candidates.push(candidate.clone());
900                    session.statistics.local_candidates_found += 1;
901
902                    // Create validated candidate for caching
903                    validated_candidates.push(ValidatedCandidate {
904                        id: CandidateId(rand::random()),
905                        address: *address,
906                        source: DiscoverySourceType::Local,
907                        priority: candidate.priority,
908                        rtt: None,
909                        reliability_score: 1.0,
910                    });
911
912                    events.push(DiscoveryEvent::LocalCandidateDiscovered {
913                        candidate: candidate.to_candidate_address(),
914                    });
915                }
916            }
917        }
918
919        // Cache the local candidates for other sessions
920        self.cached_local_candidates = Some((now, validated_candidates));
921
922        events.push(DiscoveryEvent::LocalScanningCompleted {
923            candidate_count: session.statistics.local_candidates_found as usize,
924            duration: now.duration_since(session.started_at),
925        });
926
927        // Transition to server reflexive discovery
928        self.start_session_server_reflexive_discovery(session, events, now);
929    }
930
931    fn process_cached_local_candidates(
932        &mut self,
933        session: &mut DiscoverySession,
934        mut cached_candidates: Vec<ValidatedCandidate>,
935        events: &mut Vec<DiscoveryEvent>,
936        now: Instant,
937    ) {
938        // If we have a bound address, ensure it's included in the candidates
939        if let Some(bound_addr) = self.config.bound_address {
940            let has_bound_addr = cached_candidates.iter().any(|c| c.address == bound_addr);
941            if !has_bound_addr
942                && (self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback())
943            {
944                cached_candidates.insert(
945                    0,
946                    ValidatedCandidate {
947                        id: CandidateId(rand::random()),
948                        address: bound_addr,
949                        source: DiscoverySourceType::Local,
950                        priority: 60000, // High priority for the actual bound address
951                        rtt: None,
952                        reliability_score: 1.0,
953                    },
954                );
955            }
956        }
957
958        debug!(
959            "Using {} cached local candidates for peer {:?}",
960            cached_candidates.len(),
961            session.peer_id
962        );
963
964        for validated in cached_candidates {
965            let candidate = DiscoveryCandidate {
966                address: validated.address,
967                priority: validated.priority,
968                source: validated.source,
969                state: CandidateState::New,
970            };
971
972            session.discovered_candidates.push(candidate.clone());
973            session.statistics.local_candidates_found += 1;
974
975            events.push(DiscoveryEvent::LocalCandidateDiscovered {
976                candidate: candidate.to_candidate_address(),
977            });
978        }
979
980        events.push(DiscoveryEvent::LocalScanningCompleted {
981            candidate_count: session.statistics.local_candidates_found as usize,
982            duration: now.duration_since(session.started_at),
983        });
984
985        // Transition to server reflexive discovery
986        self.start_session_server_reflexive_discovery(session, events, now);
987    }
988
989    fn start_session_server_reflexive_discovery(
990        &mut self,
991        session: &mut DiscoverySession,
992        events: &mut Vec<DiscoveryEvent>,
993        now: Instant,
994    ) {
995        // Check if we already have QUIC-discovered addresses (server reflexive)
996        let has_quic_discovered = session
997            .discovered_candidates
998            .iter()
999            .any(|c| c.source == DiscoverySourceType::ServerReflexive);
1000
1001        if has_quic_discovered {
1002            info!(
1003                "Skipping server reflexive discovery for peer {:?}, using QUIC-discovered addresses",
1004                session.peer_id
1005            );
1006            // Complete discovery with existing candidates
1007            self.complete_session_discovery_with_local_candidates(session, events, now);
1008            return;
1009        }
1010
1011        let bootstrap_node_ids = self.bootstrap_manager.get_active_bootstrap_nodes();
1012
1013        if bootstrap_node_ids.is_empty() {
1014            info!(
1015                "No bootstrap nodes available for server reflexive discovery for peer {:?}, completing with local candidates only",
1016                session.peer_id
1017            );
1018            // For bootstrap nodes or nodes without bootstrap servers, complete discovery with local candidates
1019            self.complete_session_discovery_with_local_candidates(session, events, now);
1020            return;
1021        }
1022
1023        // Get bootstrap node addresses for real QUIC communication
1024        let bootstrap_nodes_with_addresses: Vec<(BootstrapNodeId, SocketAddr)> = bootstrap_node_ids
1025            .iter()
1026            .filter_map(|&node_id| {
1027                self.bootstrap_manager
1028                    .get_bootstrap_address(node_id)
1029                    .map(|addr| (node_id, addr))
1030            })
1031            .collect();
1032
1033        if bootstrap_nodes_with_addresses.is_empty() {
1034            warn!("No bootstrap node addresses available for server reflexive discovery");
1035            // Complete discovery with just local candidates
1036            self.complete_session_discovery_with_local_candidates(session, events, now);
1037            return;
1038        }
1039
1040        // Use the enhanced method that includes addresses for real QUIC communication
1041        let active_queries = session
1042            .server_reflexive_discovery
1043            .start_queries_with_addresses(&bootstrap_nodes_with_addresses, now);
1044
1045        events.push(DiscoveryEvent::ServerReflexiveDiscoveryStarted {
1046            bootstrap_count: bootstrap_nodes_with_addresses.len(),
1047        });
1048
1049        session.current_phase = DiscoveryPhase::ServerReflexiveQuerying {
1050            started_at: now,
1051            active_queries,
1052            responses_received: Vec::new(),
1053        };
1054    }
1055
1056    fn process_server_reflexive_response_for_session(
1057        &mut self,
1058        session: &mut DiscoverySession,
1059        response: &ServerReflexiveResponse,
1060        events: &mut Vec<DiscoveryEvent>,
1061    ) {
1062        debug!("Received server reflexive response: {:?}", response);
1063
1064        // Validate the server reflexive address
1065        if !self.is_valid_server_reflexive_address(&response.observed_address) {
1066            warn!(
1067                "Ignoring invalid server reflexive address {} from bootstrap node",
1068                response.observed_address
1069            );
1070            session.statistics.invalid_addresses_rejected += 1;
1071            return;
1072        }
1073
1074        // Additional validation: check if response time is reasonable
1075        if response.response_time > Duration::from_secs(10) {
1076            warn!(
1077                "Ignoring server reflexive response with excessive delay: {:?}",
1078                response.response_time
1079            );
1080            return;
1081        }
1082
1083        // Record port allocation event for pattern analysis
1084        let allocation_event = PortAllocationEvent {
1085            port: response.observed_address.port(),
1086            timestamp: response.timestamp,
1087            source_address: response.observed_address,
1088        };
1089
1090        // Add to allocation history for pattern analysis
1091        if let DiscoveryPhase::ServerReflexiveQuerying { .. } = &mut session.current_phase {
1092            // We'll need to track allocation history in session state
1093            // For now, update session state to track this information
1094            session
1095                .allocation_history
1096                .push_back(allocation_event.clone());
1097
1098            // Keep only recent allocations (last 20) to avoid unbounded growth
1099            if session.allocation_history.len() > 20 {
1100                session.allocation_history.pop_front();
1101            }
1102        }
1103
1104        let candidate = DiscoveryCandidate {
1105            address: response.observed_address,
1106            priority: self.calculate_server_reflexive_priority(response),
1107            source: DiscoverySourceType::ServerReflexive,
1108            state: CandidateState::New,
1109        };
1110
1111        session.discovered_candidates.push(candidate.clone());
1112        session.statistics.server_reflexive_candidates_found += 1;
1113
1114        events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1115            candidate: candidate.to_candidate_address(),
1116            bootstrap_node: self
1117                .bootstrap_manager
1118                .get_bootstrap_address(response.bootstrap_node)
1119                .unwrap_or_else(|| "unknown".parse().unwrap()),
1120        });
1121
1122        events.push(DiscoveryEvent::PortAllocationDetected {
1123            port: allocation_event.port,
1124            source_address: allocation_event.source_address,
1125            bootstrap_node: response.bootstrap_node,
1126            timestamp: allocation_event.timestamp,
1127        });
1128    }
1129
1130    fn start_session_symmetric_prediction(
1131        &mut self,
1132        session: &mut DiscoverySession,
1133        responses: &[ServerReflexiveResponse],
1134        events: &mut Vec<DiscoveryEvent>,
1135        now: Instant,
1136    ) {
1137        if !self.config.enable_symmetric_prediction || responses.is_empty() {
1138            // Skip symmetric prediction and complete with discovered candidates
1139            self.complete_session_discovery_with_local_candidates(session, events, now);
1140            return;
1141        }
1142
1143        // Use consensus address as base for prediction
1144        let base_address = self.calculate_consensus_address(responses);
1145
1146        events.push(DiscoveryEvent::SymmetricPredictionStarted { base_address });
1147
1148        // Analyze allocation patterns from collected history
1149        let detected_pattern = self
1150            .symmetric_predictor
1151            .lock()
1152            .unwrap()
1153            .analyze_allocation_patterns(&session.allocation_history);
1154
1155        let confidence_level = detected_pattern
1156            .as_ref()
1157            .map(|p| p.confidence)
1158            .unwrap_or(0.0);
1159
1160        // Calculate prediction accuracy based on pattern consistency
1161        let prediction_accuracy = if let Some(ref pattern) = detected_pattern {
1162            self.calculate_prediction_accuracy(pattern, &session.allocation_history)
1163        } else {
1164            0.3 // Default accuracy for heuristic predictions
1165        };
1166
1167        debug!(
1168            "Symmetric NAT pattern analysis: detected_pattern={:?}, confidence={:.2}, accuracy={:.2}",
1169            detected_pattern, confidence_level, prediction_accuracy
1170        );
1171
1172        session.current_phase = DiscoveryPhase::SymmetricNatPrediction {
1173            started_at: now,
1174            prediction_attempts: 0,
1175            pattern_analysis: PatternAnalysisState {
1176                allocation_history: session.allocation_history.clone(),
1177                detected_pattern,
1178                confidence_level,
1179                prediction_accuracy,
1180            },
1181        };
1182    }
1183
1184    fn start_session_candidate_validation(
1185        &mut self,
1186        session: &mut DiscoverySession,
1187        _events: &mut Vec<DiscoveryEvent>,
1188        now: Instant,
1189    ) {
1190        debug!(
1191            "Starting candidate validation for {} candidates",
1192            session.discovered_candidates.len()
1193        );
1194
1195        session.current_phase = DiscoveryPhase::CandidateValidation {
1196            started_at: now,
1197            validation_results: HashMap::new(),
1198        };
1199    }
1200
1201    /// Start real QUIC PATH_CHALLENGE/PATH_RESPONSE validation for a candidate
1202    fn start_path_validation(
1203        &mut self,
1204        candidate_id: CandidateId,
1205        candidate_address: SocketAddr,
1206        now: Instant,
1207        events: &mut Vec<DiscoveryEvent>,
1208    ) {
1209        debug!(
1210            "Starting QUIC path validation for candidate {} at {}",
1211            candidate_id.0, candidate_address
1212        );
1213
1214        // Generate a random challenge token
1215        let challenge_token: u64 = rand::random();
1216
1217        // Store the validation state
1218        self.pending_validations.insert(
1219            candidate_id,
1220            PendingValidation {
1221                candidate_address,
1222                challenge_token,
1223                started_at: now,
1224                attempts: 1,
1225            },
1226        );
1227
1228        // Add event to trigger PATH_CHALLENGE sending
1229        events.push(DiscoveryEvent::PathValidationRequested {
1230            candidate_id,
1231            candidate_address,
1232            challenge_token,
1233        });
1234
1235        debug!(
1236            "PATH_CHALLENGE {:08x} requested for candidate {} at {}",
1237            challenge_token, candidate_id.0, candidate_address
1238        );
1239    }
1240
1241    /// Handle PATH_RESPONSE received for a candidate
1242    pub fn handle_path_response(
1243        &mut self,
1244        candidate_address: SocketAddr,
1245        challenge_token: u64,
1246        now: Instant,
1247    ) -> Option<DiscoveryEvent> {
1248        // Find the matching pending validation
1249        let candidate_id = self
1250            .pending_validations
1251            .iter()
1252            .find(|(_, validation)| {
1253                validation.candidate_address == candidate_address
1254                    && validation.challenge_token == challenge_token
1255            })
1256            .map(|(id, _)| *id)?;
1257
1258        // Remove from pending and calculate RTT
1259        let validation = self.pending_validations.remove(&candidate_id)?;
1260        let rtt = now.duration_since(validation.started_at);
1261
1262        debug!(
1263            "PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1264            candidate_id.0, candidate_address, rtt
1265        );
1266
1267        // Update the candidate in the appropriate session
1268        for session in self.active_sessions.values_mut() {
1269            if let Some(candidate) = session
1270                .discovered_candidates
1271                .iter_mut()
1272                .find(|c| c.address == candidate_address)
1273            {
1274                candidate.state = CandidateState::Valid;
1275                // Store RTT information if needed in the future
1276                break;
1277            }
1278        }
1279
1280        Some(DiscoveryEvent::PathValidationResponse {
1281            candidate_id,
1282            candidate_address,
1283            challenge_token,
1284            rtt,
1285        })
1286    }
1287
1288    /// Simulate path validation for development/testing
1289    fn simulate_path_validation(
1290        &mut self,
1291        candidate_id: CandidateId,
1292        candidate_address: SocketAddr,
1293        _now: Instant,
1294    ) {
1295        // Simulate different validation outcomes based on address characteristics
1296        let is_local = candidate_address.ip().is_loopback()
1297            || (candidate_address.ip().is_ipv4()
1298                && candidate_address.ip().to_string().starts_with("192.168."))
1299            || (candidate_address.ip().is_ipv4()
1300                && candidate_address.ip().to_string().starts_with("10."))
1301            || (candidate_address.ip().is_ipv4()
1302                && candidate_address.ip().to_string().starts_with("172."));
1303
1304        let is_server_reflexive = !is_local && !candidate_address.ip().is_unspecified();
1305
1306        // Store validation result for later retrieval
1307        // In a real implementation, this would be stored in a validation state tracker
1308        debug!(
1309            "Simulated path validation for candidate {} at {} - local: {}, server_reflexive: {}",
1310            candidate_id.0, candidate_address, is_local, is_server_reflexive
1311        );
1312    }
1313
1314    /// Simulate validation result based on address characteristics
1315    fn simulate_validation_result(&self, address: &SocketAddr) -> ValidationResult {
1316        let is_local = address.ip().is_loopback()
1317            || (address.ip().is_ipv4() && address.ip().to_string().starts_with("192.168."))
1318            || (address.ip().is_ipv4() && address.ip().to_string().starts_with("10."))
1319            || (address.ip().is_ipv4() && address.ip().to_string().starts_with("172."));
1320
1321        if is_local {
1322            // Local addresses typically validate quickly
1323            ValidationResult::Valid {
1324                rtt: Duration::from_millis(1),
1325            }
1326        } else if address.ip().is_unspecified() {
1327            // Unspecified addresses are invalid
1328            ValidationResult::Invalid {
1329                reason: "Unspecified address".to_string(),
1330            }
1331        } else {
1332            // Server reflexive addresses have higher RTT
1333            ValidationResult::Valid {
1334                rtt: Duration::from_millis(50 + (address.port() % 100) as u64),
1335            }
1336        }
1337    }
1338
1339    /// Calculate reliability score for a validated candidate
1340    fn calculate_reliability_score(&self, candidate: &DiscoveryCandidate, rtt: Duration) -> f64 {
1341        let mut score: f64 = 0.5; // Base score
1342
1343        // Adjust based on source type
1344        match candidate.source {
1345            DiscoverySourceType::Local => score += 0.3, // Local addresses are more reliable
1346            DiscoverySourceType::ServerReflexive => score += 0.2, // Server reflexive are good
1347            DiscoverySourceType::Predicted => score += 0.1, // Predicted are less certain
1348        }
1349
1350        // Adjust based on RTT (lower RTT = higher reliability)
1351        let rtt_ms = rtt.as_millis() as f64;
1352        if rtt_ms < 10.0 {
1353            score += 0.2;
1354        } else if rtt_ms < 50.0 {
1355            score += 0.1;
1356        } else if rtt_ms > 200.0 {
1357            score -= 0.1;
1358        }
1359
1360        // Adjust based on address type
1361        if candidate.address.ip().is_ipv6() {
1362            score += 0.05; // Slight preference for IPv6
1363        }
1364
1365        // Ensure score is in valid range [0.0, 1.0]
1366        score.max(0.0).min(1.0)
1367    }
1368
1369    // Helper methods
1370
1371    fn handle_session_timeout(
1372        &mut self,
1373        session: &mut DiscoverySession,
1374        events: &mut Vec<DiscoveryEvent>,
1375        now: Instant,
1376    ) {
1377        let error = DiscoveryError::DiscoveryTimeout;
1378        let partial_results = session
1379            .discovered_candidates
1380            .iter()
1381            .map(|c| c.to_candidate_address())
1382            .collect();
1383
1384        warn!(
1385            "Discovery failed for peer {:?}: discovery process timed out (found {} partial candidates)",
1386            session.peer_id,
1387            session.discovered_candidates.len()
1388        );
1389        events.push(DiscoveryEvent::DiscoveryFailed {
1390            error: error.clone(),
1391            partial_results,
1392        });
1393
1394        session.current_phase = DiscoveryPhase::Failed {
1395            error,
1396            failed_at: now,
1397            fallback_options: vec![FallbackStrategy::UseCachedResults],
1398        };
1399    }
1400
1401    fn handle_session_local_scan_timeout(
1402        &mut self,
1403        session: &mut DiscoverySession,
1404        events: &mut Vec<DiscoveryEvent>,
1405        now: Instant,
1406    ) {
1407        warn!(
1408            "Local interface scan timeout for peer {:?}, proceeding with available candidates",
1409            session.peer_id
1410        );
1411
1412        events.push(DiscoveryEvent::LocalScanningCompleted {
1413            candidate_count: session.statistics.local_candidates_found as usize,
1414            duration: now.duration_since(session.started_at),
1415        });
1416
1417        self.start_session_server_reflexive_discovery(session, events, now);
1418    }
1419
1420    fn poll_session_server_reflexive(
1421        &mut self,
1422        session: &mut DiscoverySession,
1423        _started_at: Instant,
1424        _active_queries: &HashMap<BootstrapNodeId, QueryState>,
1425        _responses_received: &[(BootstrapNodeId, ServerReflexiveResponse)],
1426        now: Instant,
1427        events: &mut Vec<DiscoveryEvent>,
1428    ) {
1429        // Check if we already have QUIC-discovered addresses
1430        let has_quic_discovered = session
1431            .discovered_candidates
1432            .iter()
1433            .any(|c| c.source == DiscoverySourceType::ServerReflexive);
1434
1435        if has_quic_discovered {
1436            // Complete discovery immediately with QUIC-discovered addresses
1437            self.complete_session_discovery_with_local_candidates(session, events, now);
1438            return;
1439        }
1440
1441        // TODO: Implement server reflexive polling for session
1442        // For now, transition to completion
1443        self.complete_session_discovery_with_local_candidates(session, events, now);
1444    }
1445
1446    fn poll_session_symmetric_prediction(
1447        &mut self,
1448        session: &mut DiscoverySession,
1449        _started_at: Instant,
1450        _prediction_attempts: u32,
1451        _pattern_analysis: &PatternAnalysisState,
1452        now: Instant,
1453        events: &mut Vec<DiscoveryEvent>,
1454    ) {
1455        // TODO: Implement symmetric NAT prediction for session
1456        // For now, skip to completion
1457        self.complete_session_discovery_with_local_candidates(session, events, now);
1458    }
1459
1460    fn poll_session_candidate_validation(
1461        &mut self,
1462        session: &mut DiscoverySession,
1463        _started_at: Instant,
1464        _validation_results: &HashMap<CandidateId, ValidationResult>,
1465        now: Instant,
1466        events: &mut Vec<DiscoveryEvent>,
1467    ) {
1468        // TODO: Implement candidate validation for session
1469        // For now, complete discovery
1470        self.complete_session_discovery_with_local_candidates(session, events, now);
1471    }
1472
1473    fn complete_session_discovery_with_local_candidates(
1474        &mut self,
1475        session: &mut DiscoverySession,
1476        events: &mut Vec<DiscoveryEvent>,
1477        now: Instant,
1478    ) {
1479        // Calculate statistics
1480        let duration = now.duration_since(session.started_at);
1481        session.statistics.total_discovery_time = Some(duration);
1482
1483        let success_rate = if session.statistics.local_candidates_found > 0 {
1484            1.0
1485        } else {
1486            0.0
1487        };
1488
1489        // Convert discovered candidates to ValidatedCandidate format
1490        let validated_candidates: Vec<ValidatedCandidate> = session
1491            .discovered_candidates
1492            .iter()
1493            .map(|dc| ValidatedCandidate {
1494                id: CandidateId(rand::random()),
1495                address: dc.address,
1496                source: dc.source,
1497                priority: dc.priority,
1498                rtt: None,
1499                reliability_score: 1.0,
1500            })
1501            .collect();
1502
1503        events.push(DiscoveryEvent::DiscoveryCompleted {
1504            candidate_count: validated_candidates.len(),
1505            total_duration: duration,
1506            success_rate,
1507        });
1508
1509        session.current_phase = DiscoveryPhase::Completed {
1510            final_candidates: validated_candidates,
1511            completion_time: now,
1512        };
1513
1514        info!(
1515            "Discovery completed with {} local candidates for peer {:?}",
1516            session.discovered_candidates.len(),
1517            session.peer_id
1518        );
1519    }
1520
1521    fn is_valid_local_address(&self, address: &SocketAddr) -> bool {
1522        // Use the enhanced validation from CandidateAddress
1523        use crate::nat_traversal_api::CandidateAddress;
1524
1525        if let Err(e) = CandidateAddress::validate_address(address) {
1526            debug!("Address {} failed validation: {}", address, e);
1527            return false;
1528        }
1529
1530        match address.ip() {
1531            IpAddr::V4(ipv4) => {
1532                // For testing, allow loopback addresses
1533                #[cfg(test)]
1534                if ipv4.is_loopback() {
1535                    return true;
1536                }
1537                // For local addresses, we want actual interface addresses
1538                // Allow private addresses (RFC1918)
1539                !ipv4.is_loopback()
1540                    && !ipv4.is_unspecified()
1541                    && !ipv4.is_broadcast()
1542                    && !ipv4.is_multicast()
1543                    && !ipv4.is_documentation()
1544            }
1545            IpAddr::V6(ipv6) => {
1546                // For testing, allow loopback addresses
1547                #[cfg(test)]
1548                if ipv6.is_loopback() {
1549                    return true;
1550                }
1551                // For IPv6, accept most addresses except special ones
1552                let segments = ipv6.segments();
1553                let is_documentation = segments[0] == 0x2001 && segments[1] == 0x0db8;
1554
1555                !ipv6.is_loopback()
1556                    && !ipv6.is_unspecified()
1557                    && !ipv6.is_multicast()
1558                    && !is_documentation
1559            }
1560        }
1561    }
1562
1563    fn is_valid_server_reflexive_address(&self, address: &SocketAddr) -> bool {
1564        use crate::nat_traversal_api::CandidateAddress;
1565
1566        // First, use the standard validation
1567        if let Err(e) = CandidateAddress::validate_address(address) {
1568            debug!(
1569                "Server reflexive address {} failed validation: {}",
1570                address, e
1571            );
1572            return false;
1573        }
1574
1575        // Additional checks for server reflexive addresses
1576        match address.ip() {
1577            IpAddr::V4(ipv4) => {
1578                // Server reflexive addresses should be public
1579                // They should not be private (RFC1918) addresses
1580                !ipv4.is_private()
1581                    && !ipv4.is_loopback()
1582                    && !ipv4.is_link_local()
1583                    && !ipv4.is_documentation()
1584                    && !ipv4.is_unspecified()
1585                    && !ipv4.is_broadcast()
1586                    && !ipv4.is_multicast()
1587            }
1588            IpAddr::V6(ipv6) => {
1589                // For IPv6, we expect global unicast addresses
1590                let segments = ipv6.segments();
1591                let is_global_unicast = (segments[0] & 0xE000) == 0x2000;
1592                let is_link_local = (segments[0] & 0xffc0) == 0xfe80;
1593                let is_unique_local = (segments[0] & 0xfe00) == 0xfc00;
1594
1595                // Server reflexive should be global unicast
1596                is_global_unicast
1597                    && !ipv6.is_loopback()
1598                    && !ipv6.is_unspecified()
1599                    && !ipv6.is_multicast()
1600                    && !is_link_local
1601                    && !is_unique_local
1602            }
1603        }
1604    }
1605
1606    fn calculate_local_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
1607        let mut priority = 100; // Base priority
1608
1609        match address.ip() {
1610            IpAddr::V4(ipv4) => {
1611                if ipv4.is_private() {
1612                    priority += 50; // Prefer private addresses for local networks
1613                }
1614            }
1615            IpAddr::V6(ipv6) => {
1616                // IPv6 priority based on address type
1617                // Global unicast: 2000::/3 (not link-local, not unique local)
1618                if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
1619                    let segments = ipv6.segments();
1620                    if segments[0] & 0xE000 == 0x2000 {
1621                        // Global unicast IPv6 (2000::/3)
1622                        priority += 60;
1623                    } else if segments[0] & 0xFFC0 == 0xFE80 {
1624                        // Link-local IPv6 (fe80::/10)
1625                        priority += 20;
1626                    } else if segments[0] & 0xFE00 == 0xFC00 {
1627                        // Unique local IPv6 (fc00::/7)
1628                        priority += 40;
1629                    } else {
1630                        // Other IPv6 addresses
1631                        priority += 30;
1632                    }
1633                }
1634
1635                // Prefer IPv6 for better NAT traversal potential
1636                priority += 10; // Small boost for IPv6 overall
1637            }
1638        }
1639
1640        if interface.is_wireless {
1641            priority -= 10; // Slight penalty for wireless
1642        }
1643
1644        priority
1645    }
1646
1647    fn calculate_server_reflexive_priority(&self, response: &ServerReflexiveResponse) -> u32 {
1648        let mut priority = 200; // Base priority for server reflexive
1649
1650        // Adjust based on response time
1651        if response.response_time < Duration::from_millis(50) {
1652            priority += 20;
1653        } else if response.response_time > Duration::from_millis(200) {
1654            priority -= 10;
1655        }
1656
1657        // Adjust based on response timestamp (more recent is better)
1658        let age_bonus = if response.timestamp.elapsed().as_secs() < 60 {
1659            20
1660        } else {
1661            0
1662        };
1663        priority += age_bonus;
1664
1665        priority
1666    }
1667
1668    fn should_transition_to_prediction(
1669        &self,
1670        responses: &[ServerReflexiveResponse],
1671        _now: Instant,
1672    ) -> bool {
1673        responses.len() >= self.config.min_bootstrap_consensus.max(1)
1674    }
1675
1676    fn calculate_consensus_address(&self, responses: &[ServerReflexiveResponse]) -> SocketAddr {
1677        // Simple majority consensus - in practice, would use more sophisticated algorithm
1678        let mut address_counts: HashMap<SocketAddr, usize> = HashMap::new();
1679
1680        for response in responses {
1681            *address_counts.entry(response.observed_address).or_insert(0) += 1;
1682        }
1683
1684        address_counts
1685            .into_iter()
1686            .max_by_key(|(_, count)| *count)
1687            .map(|(addr, _)| addr)
1688            .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
1689    }
1690
1691    /// Calculate the accuracy of predictions based on pattern consistency
1692    fn calculate_prediction_accuracy(
1693        &self,
1694        pattern: &PortAllocationPattern,
1695        history: &VecDeque<PortAllocationEvent>,
1696    ) -> f64 {
1697        if history.len() < 3 {
1698            return 0.3; // Low accuracy for insufficient data
1699        }
1700
1701        // Calculate how well the pattern explains the observed allocations
1702        let recent_ports: Vec<u16> = history
1703            .iter()
1704            .rev()
1705            .take(10)
1706            .map(|event| event.port)
1707            .collect();
1708
1709        let mut correct_predictions = 0;
1710        let total_predictions = recent_ports.len().saturating_sub(1);
1711
1712        if total_predictions == 0 {
1713            return 0.3;
1714        }
1715
1716        match pattern.pattern_type {
1717            AllocationPatternType::Sequential => {
1718                // Check how many consecutive pairs follow sequential pattern
1719                for i in 1..recent_ports.len() {
1720                    if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == 1 {
1721                        correct_predictions += 1;
1722                    }
1723                }
1724            }
1725            AllocationPatternType::FixedStride => {
1726                // Check how many consecutive pairs follow the stride pattern
1727                for i in 1..recent_ports.len() {
1728                    if recent_ports[i - 1].wrapping_sub(recent_ports[i]) == pattern.stride {
1729                        correct_predictions += 1;
1730                    }
1731                }
1732            }
1733            AllocationPatternType::PoolBased => {
1734                // Check how many ports fall within the detected pool
1735                if let Some((min_port, max_port)) = pattern.pool_boundaries {
1736                    for port in &recent_ports {
1737                        if *port >= min_port && *port <= max_port {
1738                            correct_predictions += 1;
1739                        }
1740                    }
1741                }
1742            }
1743            AllocationPatternType::Random | AllocationPatternType::Unknown => {
1744                // For random patterns, use statistical variance
1745                if recent_ports.len() >= 3 {
1746                    let mean = recent_ports.iter().map(|&p| p as f64).sum::<f64>()
1747                        / recent_ports.len() as f64;
1748                    let variance = recent_ports
1749                        .iter()
1750                        .map(|&p| (p as f64 - mean).powi(2))
1751                        .sum::<f64>()
1752                        / recent_ports.len() as f64;
1753
1754                    // Higher variance suggests more randomness, lower accuracy
1755                    let normalized_variance = (variance / 10000.0).min(1.0); // Normalize to [0, 1]
1756                    return 0.2 + (1.0 - normalized_variance) * 0.3; // Range [0.2, 0.5]
1757                }
1758            }
1759            AllocationPatternType::TimeBased => {
1760                // For time-based patterns, check timing consistency
1761                if history.len() >= 2 {
1762                    let time_diffs: Vec<Duration> = history
1763                        .iter()
1764                        .collect::<Vec<_>>()
1765                        .windows(2)
1766                        .map(|w| w[1].timestamp.duration_since(w[0].timestamp))
1767                        .collect();
1768
1769                    if !time_diffs.is_empty() {
1770                        let avg_diff =
1771                            time_diffs.iter().sum::<Duration>() / time_diffs.len() as u32;
1772                        let variance = time_diffs
1773                            .iter()
1774                            .map(|d| d.as_millis().abs_diff(avg_diff.as_millis()) as f64)
1775                            .sum::<f64>()
1776                            / time_diffs.len() as f64;
1777
1778                        // Lower timing variance suggests more consistent time-based allocation
1779                        let normalized_variance = (variance / 1000.0).min(1.0); // Normalize
1780                        return 0.3 + (1.0 - normalized_variance) * 0.4; // Range [0.3, 0.7]
1781                    }
1782                }
1783            }
1784        }
1785
1786        // Calculate accuracy based on prediction success rate
1787        let accuracy = if total_predictions > 0 {
1788            correct_predictions as f64 / total_predictions as f64
1789        } else {
1790            0.3
1791        };
1792
1793        // Apply confidence factor from pattern detection
1794        let confidence_adjusted_accuracy = accuracy * pattern.confidence;
1795
1796        // Ensure accuracy is within reasonable bounds [0.2, 0.9]
1797        confidence_adjusted_accuracy.max(0.2).min(0.9)
1798    }
1799
1800    /// Accept a QUIC-discovered address (from OBSERVED_ADDRESS frames)
1801    /// This replaces the need for STUN-based server reflexive discovery
1802    pub fn accept_quic_discovered_address(
1803        &mut self,
1804        peer_id: PeerId,
1805        discovered_address: SocketAddr,
1806    ) -> Result<(), DiscoveryError> {
1807        // Calculate priority for the discovered address first to avoid borrow issues
1808        let priority = self.calculate_quic_discovered_priority(&discovered_address);
1809
1810        // Get the active session for this peer
1811        let session = self.active_sessions.get_mut(&peer_id).ok_or_else(|| {
1812            DiscoveryError::InternalError(format!(
1813                "No active discovery session for peer {peer_id:?}"
1814            ))
1815        })?;
1816
1817        // Check if address already exists
1818        let already_exists = session
1819            .discovered_candidates
1820            .iter()
1821            .any(|c| c.address == discovered_address);
1822
1823        if already_exists {
1824            debug!(
1825                "QUIC-discovered address {} already in candidates",
1826                discovered_address
1827            );
1828            return Ok(());
1829        }
1830
1831        info!("Accepting QUIC-discovered address: {}", discovered_address);
1832
1833        // Create candidate from QUIC-discovered address
1834        let candidate = DiscoveryCandidate {
1835            address: discovered_address,
1836            priority,
1837            source: DiscoverySourceType::ServerReflexive,
1838            state: CandidateState::New,
1839        };
1840
1841        // Add to discovered candidates
1842        session.discovered_candidates.push(candidate);
1843        session.statistics.server_reflexive_candidates_found += 1;
1844
1845        Ok(())
1846    }
1847
1848    /// Calculate priority for QUIC-discovered addresses
1849    fn calculate_quic_discovered_priority(&self, address: &SocketAddr) -> u32 {
1850        // QUIC-discovered addresses get higher priority than STUN-discovered ones
1851        // because they come from actual QUIC connections and are more reliable
1852        let mut priority = 255; // Base priority for QUIC-discovered addresses
1853
1854        match address.ip() {
1855            IpAddr::V4(ipv4) => {
1856                if ipv4.is_private() {
1857                    priority -= 10; // Slight penalty for private addresses
1858                } else if ipv4.is_loopback() {
1859                    priority -= 20; // More penalty for loopback
1860                }
1861                // Public IPv4 keeps base priority of 255
1862            }
1863            IpAddr::V6(ipv6) => {
1864                // Prefer IPv6 for better NAT traversal potential
1865                priority += 10; // Boost for IPv6 (265 base)
1866
1867                if ipv6.is_loopback() {
1868                    priority -= 30; // Significant penalty for loopback
1869                } else if ipv6.is_multicast() {
1870                    priority -= 40; // Even more penalty for multicast
1871                } else if ipv6.is_unspecified() {
1872                    priority -= 50; // Unspecified should not be used
1873                } else {
1874                    // Check for specific IPv6 types
1875                    let segments = ipv6.segments();
1876                    if segments[0] & 0xFFC0 == 0xFE80 {
1877                        // Link-local IPv6 (fe80::/10)
1878                        priority -= 30; // Significant penalty
1879                    } else if segments[0] & 0xFE00 == 0xFC00 {
1880                        // Unique local IPv6 (fc00::/7)
1881                        priority -= 10; // Slight penalty, similar to private IPv4
1882                    }
1883                    // Global unicast IPv6 (2000::/3) keeps the boost
1884                }
1885            }
1886        }
1887
1888        priority
1889    }
1890
1891    /// Poll discovery progress and get pending events
1892    pub fn poll_discovery_progress(&mut self, peer_id: PeerId) -> Vec<DiscoveryEvent> {
1893        let mut events = Vec::new();
1894
1895        if let Some(session) = self.active_sessions.get_mut(&peer_id) {
1896            // Check if we have new candidates to report
1897            for candidate in &session.discovered_candidates {
1898                if matches!(candidate.state, CandidateState::New) {
1899                    events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
1900                        candidate: candidate.to_candidate_address(),
1901                        bootstrap_node: "0.0.0.0:0".parse().unwrap(), // Placeholder for QUIC-discovered
1902                    });
1903                }
1904            }
1905
1906            // Mark all new candidates as reported
1907            for candidate in &mut session.discovered_candidates {
1908                if matches!(candidate.state, CandidateState::New) {
1909                    candidate.state = CandidateState::Validating;
1910                }
1911            }
1912        }
1913
1914        events
1915    }
1916
1917    /// Get the current discovery status for a peer
1918    pub fn get_discovery_status(&self, peer_id: PeerId) -> Option<DiscoveryStatus> {
1919        self.active_sessions.get(&peer_id).map(|session| {
1920            let discovered_candidates = session
1921                .discovered_candidates
1922                .iter()
1923                .map(|c| c.to_candidate_address())
1924                .collect();
1925
1926            DiscoveryStatus {
1927                phase: session.current_phase.clone(),
1928                discovered_candidates,
1929                statistics: session.statistics.clone(),
1930                elapsed_time: session.started_at.elapsed(),
1931            }
1932        })
1933    }
1934}
1935
/// Current status of candidate discovery
///
/// A point-in-time snapshot of one peer's active discovery session, as
/// produced by `get_discovery_status`.
#[derive(Debug, Clone)]
pub struct DiscoveryStatus {
    /// Phase the session is currently in.
    pub phase: DiscoveryPhase,
    /// All candidates recorded for the session so far, converted to `CandidateAddress`.
    pub discovered_candidates: Vec<CandidateAddress>,
    /// Copy of the session's statistics at the time of the snapshot.
    pub statistics: DiscoveryStatistics,
    /// Time elapsed since the session started.
    pub elapsed_time: Duration,
}
1944
/// Final results of candidate discovery
#[derive(Debug, Clone)]
pub struct DiscoveryResults {
    /// Candidates included in the final result set.
    pub candidates: Vec<ValidatedCandidate>,
    /// NOTE(review): presumably the instant at which discovery finished; confirm against the producer.
    pub completion_time: Instant,
    /// Statistics accompanying the results.
    pub statistics: DiscoveryStatistics,
}
1952
1953// Placeholder implementations for components to be implemented
1954
/// Platform-specific network interface discovery
///
/// Scanning is a two-step protocol: `start_scan` initiates it and
/// `check_scan_complete` is called afterwards to collect the outcome.
pub trait NetworkInterfaceDiscovery {
    /// Begin enumerating network interfaces.
    ///
    /// On failure the `Err` value carries a textual description.
    fn start_scan(&mut self) -> Result<(), String>;

    /// Return the interfaces found by the scan, if available.
    ///
    /// NOTE(review): `None` presumably means the scan has not finished yet;
    /// confirm against the platform implementations.
    fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>>;
}
1960
/// Network interface information
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkInterface {
    /// Name of the interface.
    pub name: String,
    /// Socket addresses associated with the interface.
    pub addresses: Vec<SocketAddr>,
    /// Whether the interface is up.
    pub is_up: bool,
    /// Whether the interface is wireless; wireless interfaces receive a small
    /// penalty in `calculate_local_priority`.
    pub is_wireless: bool,
    /// MTU of the interface, when known.
    pub mtu: Option<u16>,
}
1970
/// Active connection state to a bootstrap node (production builds)
///
/// Instances are kept in `ServerReflexiveDiscovery::active_connections`,
/// keyed by bootstrap node id.
#[derive(Debug)]

struct BootstrapConnection {
    /// Quinn connection to the bootstrap node
    connection: crate::Connection,
    /// Address of the bootstrap node
    address: SocketAddr,
    /// When this connection was established
    established_at: Instant,
    /// Request ID for correlation with responses
    request_id: u64,
}
1984
/// Discovery request message sent to bootstrap nodes
///
/// The three fields have the same names and widths as the byte layout
/// written by `ServerReflexiveDiscovery::create_discovery_request`
/// (8-byte request id, 8-byte timestamp, 4-byte capabilities).
#[derive(Debug, Clone)]

struct AddressObservationRequest {
    /// Unique request ID for correlation
    request_id: u64,
    /// Timestamp when request was sent
    // NOTE(review): presumably milliseconds since the Unix epoch, as in
    // `create_discovery_request`; confirm.
    timestamp: u64,
    /// Client capabilities for NAT traversal
    capabilities: u32,
}
1996
/// Server reflexive address discovery coordinator
///
/// Tracks one query per bootstrap node, buffers the responses until they are
/// drained by `poll_queries`, and retries or fails queries whose deadline has
/// passed.
#[derive(Debug)]
pub(crate) struct ServerReflexiveDiscovery {
    /// Discovery settings (a private copy taken in `new`); supplies the query
    /// timeout and the retry limit.
    config: DiscoveryConfig,
    /// Active queries to bootstrap nodes
    active_queries: HashMap<BootstrapNodeId, QueryState>,
    /// Received responses from bootstrap nodes
    responses: VecDeque<ServerReflexiveResponse>,
    /// Query timeout tracker
    /// (maps each node to the instant at which its current attempt expires)
    query_timeouts: HashMap<BootstrapNodeId, Instant>,
    /// Active Quinn connections to bootstrap nodes (production builds)
    active_connections: HashMap<BootstrapNodeId, BootstrapConnection>,
    /// Runtime handle for async operations (production builds)
    /// `None` when no Tokio runtime was current at construction time.
    runtime_handle: Option<tokio::runtime::Handle>,
}
2012
2013impl ServerReflexiveDiscovery {
2014    pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2015        Self {
2016            config: config.clone(),
2017            active_queries: HashMap::new(),
2018            responses: VecDeque::new(),
2019            query_timeouts: HashMap::new(),
2020            active_connections: HashMap::new(),
2021            runtime_handle: tokio::runtime::Handle::try_current().ok(),
2022        }
2023    }
2024
2025    pub(crate) fn start_queries(
2026        &mut self,
2027        bootstrap_nodes: &[BootstrapNodeId],
2028        now: Instant,
2029    ) -> HashMap<BootstrapNodeId, QueryState> {
2030        debug!(
2031            "Starting server reflexive queries to {} bootstrap nodes",
2032            bootstrap_nodes.len()
2033        );
2034
2035        self.active_queries.clear();
2036        self.query_timeouts.clear();
2037
2038        self.active_connections.clear();
2039
2040        for &node_id in bootstrap_nodes {
2041            let query_state = QueryState::Pending {
2042                sent_at: now,
2043                attempts: 1,
2044            };
2045
2046            self.active_queries.insert(node_id, query_state);
2047            self.query_timeouts
2048                .insert(node_id, now + self.config.bootstrap_query_timeout);
2049
2050            debug!(
2051                "Starting server reflexive query to bootstrap node {:?}",
2052                node_id
2053            );
2054
2055            // Try to establish real Quinn connection in production
2056            if let Some(runtime) = &self.runtime_handle {
2057                self.start_quinn_query(node_id, runtime.clone(), now);
2058            } else {
2059                warn!(
2060                    "No async runtime available, falling back to simulation for node {:?}",
2061                    node_id
2062                );
2063                self.simulate_bootstrap_response(node_id, now);
2064            }
2065        }
2066
2067        self.active_queries.clone()
2068    }
2069
2070    /// Start queries with bootstrap node addresses (enhanced version)
2071    pub(crate) fn start_queries_with_addresses(
2072        &mut self,
2073        bootstrap_nodes: &[(BootstrapNodeId, SocketAddr)],
2074        now: Instant,
2075    ) -> HashMap<BootstrapNodeId, QueryState> {
2076        debug!(
2077            "Starting server reflexive queries to {} bootstrap nodes with addresses",
2078            bootstrap_nodes.len()
2079        );
2080
2081        self.active_queries.clear();
2082        self.query_timeouts.clear();
2083
2084        self.active_connections.clear();
2085
2086        for &(node_id, bootstrap_address) in bootstrap_nodes {
2087            let query_state = QueryState::Pending {
2088                sent_at: now,
2089                attempts: 1,
2090            };
2091
2092            self.active_queries.insert(node_id, query_state);
2093            self.query_timeouts
2094                .insert(node_id, now + self.config.bootstrap_query_timeout);
2095
2096            debug!(
2097                "Starting server reflexive query to bootstrap node {:?} at {}",
2098                node_id, bootstrap_address
2099            );
2100
2101            // Try to establish real Quinn connection in production
2102            if let Some(_runtime) = &self.runtime_handle {
2103                self.start_quinn_query_with_address(node_id, bootstrap_address, now);
2104            } else {
2105                warn!(
2106                    "No async runtime available, falling back to simulation for node {:?}",
2107                    node_id
2108                );
2109                self.simulate_bootstrap_response(node_id, now);
2110            }
2111        }
2112
2113        self.active_queries.clone()
2114    }
2115
2116    /// Start a real Quinn-based query to a bootstrap node (production builds)
2117    fn start_quinn_query(
2118        &mut self,
2119        node_id: BootstrapNodeId,
2120        _runtime: tokio::runtime::Handle,
2121        now: Instant,
2122    ) {
2123        // For now, we need the bootstrap node address. This will be provided by
2124        // the BootstrapNodeManager in the calling code. For this implementation,
2125        // we'll need to modify the interface to pass addresses.
2126
2127        // Generate a unique request ID
2128        let request_id = rand::random::<u64>();
2129
2130        debug!(
2131            "Starting Quinn connection to bootstrap node {:?} with request ID {}",
2132            node_id, request_id
2133        );
2134
2135        // In a complete implementation, this would:
2136        // 1. Create Quinn endpoint if not exists
2137        // 2. Connect to bootstrap node address
2138        // 3. Send AddressObservationRequest message
2139        // 4. Wait for ADD_ADDRESS frame response
2140        // 5. Parse response and create ServerReflexiveResponse
2141
2142        // For now, simulate success to maintain compatibility
2143        // TODO: Replace with real Quinn connection establishment
2144        self.simulate_bootstrap_response(node_id, now);
2145    }
2146
2147    /// Start a real Quinn-based query with full bootstrap node information
2148    pub(crate) fn start_quinn_query_with_address(
2149        &mut self,
2150        node_id: BootstrapNodeId,
2151        bootstrap_address: SocketAddr,
2152        now: Instant,
2153    ) {
2154        let request_id = rand::random::<u64>();
2155
2156        info!(
2157            "Establishing Quinn connection to bootstrap node {:?} at {}",
2158            node_id, bootstrap_address
2159        );
2160
2161        // We need to spawn this as a task since Quinn operations are async
2162        if let Some(runtime) = &self.runtime_handle {
2163            let timeout = self.config.bootstrap_query_timeout;
2164
2165            // Create a channel for receiving responses
2166            let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
2167
2168            // Store the receiver for polling
2169            // Note: In a complete implementation, we'd store this receiver and poll it
2170            // For now, we'll handle the response directly in the spawned task
2171
2172            runtime.spawn(async move {
2173                match Self::perform_bootstrap_query(bootstrap_address, request_id, timeout).await {
2174                    Ok(observed_address) => {
2175                        let response = ServerReflexiveResponse {
2176                            bootstrap_node: node_id,
2177                            observed_address,
2178                            response_time: now.elapsed(),
2179                            timestamp: Instant::now(),
2180                        };
2181
2182                        // Send response back to main thread
2183                        let _ = response_tx.send(response);
2184
2185                        info!(
2186                            "Successfully received observed address {} from bootstrap node {:?}",
2187                            observed_address, node_id
2188                        );
2189                    }
2190                    Err(e) => {
2191                        warn!(
2192                            "Failed to query bootstrap node {:?} at {}: {}",
2193                            node_id, bootstrap_address, e
2194                        );
2195                    }
2196                }
2197            });
2198        } else {
2199            warn!(
2200                "No async runtime available for Quinn query to {:?}",
2201                node_id
2202            );
2203            self.simulate_bootstrap_response(node_id, now);
2204        }
2205    }
2206
    /// Perform the actual Quinn-based bootstrap query (async)
    ///
    /// Currently a stub: all three arguments are ignored and the future always
    /// resolves to an error with the message
    /// "Bootstrap query not implemented for low-level API".
    ///
    /// # Errors
    ///
    /// Always returns `Err` until the query is implemented.
    // NOTE: This function was written for Quinn's high-level API which we don't have
    // since ant-quic IS a fork of Quinn, not something that uses Quinn.
    // This needs to be rewritten to work with our low-level protocol implementation.
    async fn perform_bootstrap_query(
        _bootstrap_address: SocketAddr,
        _request_id: u64,
        _timeout: Duration,
    ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
        // In production, this would connect to the bootstrap node and obtain the
        // address it observed for us. Until that is implemented against the
        // low-level API, report an error rather than a made-up address.
        Err("Bootstrap query not implemented for low-level API".into())

        /* Original implementation that used high-level Quinn API:
        use tokio::time::timeout as tokio_timeout;
        use crate::frame::{AddAddress, Frame};
        use crate::VarInt;

        // Create a Quinn client configuration with NAT traversal transport parameters
        let mut transport_config = crate::TransportConfig::default();

        // Enable NAT traversal transport parameter
        // This signals to the bootstrap node that we support NAT traversal
        let mut transport_params = std::collections::HashMap::new();
        transport_params.insert(0x3d7e9f0bca12fea6u64, vec![0x01]); // nat_traversal = 1 (client)

        let client_config = ClientConfig::with_platform_verifier();

        // Create Quinn endpoint with a random local port
        let local_addr = if bootstrap_address.is_ipv6() {
            "[::]:0"
        } else {
            "0.0.0.0:0"
        };

        let mut endpoint = Endpoint::client(local_addr.parse()?)?;
        endpoint.set_default_client_config(client_config);

        // Establish connection with timeout
        let connection = tokio_timeout(timeout, async {
            let connecting = endpoint.connect(bootstrap_address, "nat-traversal")
                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
            connecting.await
                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
        }).await??;

        info!("Established QUIC connection to bootstrap node at {}", bootstrap_address);

        // Send address observation request using a unidirectional stream
        let discovery_request = Self::create_discovery_request(request_id);
        let mut send_stream = connection.open_uni().await?;
        send_stream.write_all(&discovery_request).await?;
        send_stream.finish().await?;

        debug!("Sent address observation request to bootstrap node");

        // Wait for ADD_ADDRESS frame response via QUIC extension frames
        let observed_address = tokio_timeout(timeout / 2, async {
            Self::wait_for_add_address_frame(&connection, request_id).await
        }).await??;

        info!("Received observed address {} from bootstrap node {}", observed_address, bootstrap_address);

        // Clean up connection gracefully
        connection.close(0u32.into(), b"discovery complete");
        endpoint.close(0u32.into(), b"discovery complete");

        // Wait a bit for graceful shutdown
        tokio::time::sleep(Duration::from_millis(100)).await;

        Ok(observed_address)
        */
    }
2281
2282    /// Create a discovery request message
2283    fn create_discovery_request(request_id: u64) -> Vec<u8> {
2284        let mut request = Vec::new();
2285
2286        // Simple message format:
2287        // 8 bytes: request_id
2288        // 8 bytes: timestamp
2289        // 4 bytes: capabilities
2290        request.extend_from_slice(&request_id.to_be_bytes());
2291        request.extend_from_slice(
2292            &std::time::SystemTime::now()
2293                .duration_since(std::time::UNIX_EPOCH)
2294                .unwrap_or_default()
2295                .as_millis()
2296                .to_be_bytes()[8..16],
2297        ); // Take lower 8 bytes
2298        request.extend_from_slice(&1u32.to_be_bytes()); // Capabilities = 1 (basic NAT traversal)
2299
2300        debug!(
2301            "Created discovery request: {} bytes, request_id: {}",
2302            request.len(),
2303            request_id
2304        );
2305        request
2306    }
2307
    /// Wait for ADD_ADDRESS frame from bootstrap node
    ///
    /// Currently a stub: both arguments are ignored and the future always
    /// resolves to an error with the message
    /// "wait_for_add_address_frame not implemented for low-level API".
    ///
    /// # Errors
    ///
    /// Always returns `Err` until the function is implemented.
    async fn wait_for_add_address_frame(
        _connection: &Connection,
        _expected_request_id: u64,
    ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
        // TODO: This function needs to be rewritten to work with low-level Quinn API
        // The high-level accept_uni() and read_to_end() methods are not available
        Err("wait_for_add_address_frame not implemented for low-level API".into())

        /* Original code that uses high-level API:
        use crate::frame::{Frame, AddAddress};
        use bytes::Bytes;

        // Accept incoming unidirectional stream from bootstrap node
        let mut recv_stream = connection.accept_uni().await?;

        // Read the frame data (with reasonable size limit)
        let frame_data = recv_stream.read_to_end(1024).await?;

        if frame_data.is_empty() {
            return Err("Empty frame data received".into());
        }

        debug!("Received {} bytes of frame data from bootstrap node", frame_data.len());

        // Parse QUIC frames using our frame parser
        let frame_bytes = Bytes::from(frame_data);
        // Parse frame data directly without FrameIter
        // For now, simulate frame parsing

        // Look for ADD_ADDRESS frame
        // For now, simulate successful frame parsing
        if !frame_data.is_empty() {
            // Simulate parsing an ADD_ADDRESS frame
            let simulated_address = "192.168.1.100:8080".parse().unwrap_or_else(|_| {
                SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 100)), 8080)
            });
            debug!("Simulated ADD_ADDRESS frame parsing: address={}", simulated_address);
            return Ok(simulated_address);
        }

        // If we get here, no valid frame was found
        Err("No valid ADD_ADDRESS frame found".into())
        */
    }
2353
2354    /// Create a response channel for async communication (placeholder)
2355    fn create_response_channel(
2356        &self,
2357    ) -> tokio::sync::mpsc::UnboundedSender<ServerReflexiveResponse> {
2358        // In a complete implementation, this would create a channel
2359        // that feeds responses back to the main discovery manager
2360        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
2361        // TODO: Store receiver and poll it in poll_queries()
2362        tx
2363    }
2364
2365    pub(crate) fn poll_queries(
2366        &mut self,
2367        _active_queries: &HashMap<BootstrapNodeId, QueryState>,
2368        now: Instant,
2369    ) -> Vec<ServerReflexiveResponse> {
2370        let mut responses = Vec::new();
2371
2372        // Drain any received responses
2373        while let Some(response) = self.responses.pop_front() {
2374            responses.push(response);
2375        }
2376
2377        // Check for timeouts
2378        let mut timed_out_nodes = Vec::new();
2379        for (&node_id, &timeout) in &self.query_timeouts {
2380            if now >= timeout {
2381                timed_out_nodes.push(node_id);
2382            }
2383        }
2384
2385        // Handle timeouts by retrying or marking as failed
2386        for node_id in timed_out_nodes {
2387            self.query_timeouts.remove(&node_id);
2388
2389            if let Some(query_state) = self.active_queries.get_mut(&node_id) {
2390                match query_state {
2391                    QueryState::Pending { attempts, .. }
2392                        if *attempts < self.config.max_query_retries =>
2393                    {
2394                        // Retry the query
2395                        *attempts += 1;
2396                        let new_timeout = now + self.config.bootstrap_query_timeout;
2397                        self.query_timeouts.insert(node_id, new_timeout);
2398
2399                        debug!(
2400                            "Retrying server reflexive query to bootstrap node {:?} (attempt {})",
2401                            node_id, attempts
2402                        );
2403
2404                        // Send retry (in real implementation)
2405                        self.simulate_bootstrap_response(node_id, now);
2406                    }
2407                    _ => {
2408                        // Mark as failed
2409                        self.active_queries.insert(node_id, QueryState::Failed);
2410                        warn!(
2411                            "Server reflexive query to bootstrap node {:?} failed after retries",
2412                            node_id
2413                        );
2414                    }
2415                }
2416            }
2417        }
2418
2419        responses
2420    }
2421
2422    /// Simulate a bootstrap node response (temporary implementation)
2423    /// In production, this would be triggered by actual QUIC message reception
2424    fn simulate_bootstrap_response(&mut self, node_id: BootstrapNodeId, now: Instant) {
2425        // Simulate network delay
2426        let simulated_external_addr = match node_id.0 % 3 {
2427            0 => "203.0.113.1:45678".parse().unwrap(),
2428            1 => "198.51.100.2:45679".parse().unwrap(),
2429            _ => "192.0.2.3:45680".parse().unwrap(),
2430        };
2431
2432        let response = ServerReflexiveResponse {
2433            bootstrap_node: node_id,
2434            observed_address: simulated_external_addr,
2435            response_time: Duration::from_millis(50 + node_id.0 * 10),
2436            timestamp: now,
2437        };
2438
2439        self.responses.push_back(response);
2440
2441        // Mark query as completed
2442        if let Some(query_state) = self.active_queries.get_mut(&node_id) {
2443            *query_state = QueryState::Completed;
2444        }
2445
2446        debug!(
2447            "Received simulated server reflexive response from bootstrap node {:?}: {}",
2448            node_id, simulated_external_addr
2449        );
2450    }
2451}
2452
2453/// Symmetric NAT port prediction engine
2454#[derive(Debug)]
2455pub(crate) struct SymmetricNatPredictor {
2456    config: DiscoveryConfig,
2457}
2458
2459impl SymmetricNatPredictor {
2460    pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2461        Self {
2462            config: config.clone(),
2463        }
2464    }
2465
2466    /// Generate predicted candidate addresses for symmetric NAT traversal
2467    ///
2468    /// Uses observed port allocation patterns to predict likely external ports
2469    /// that symmetric NATs will assign for new connections
2470    pub(crate) fn generate_predictions(
2471        &mut self,
2472        pattern_analysis: &PatternAnalysisState,
2473        max_count: usize,
2474    ) -> Vec<DiscoveryCandidate> {
2475        let mut predictions = Vec::new();
2476
2477        if pattern_analysis.allocation_history.is_empty() || max_count == 0 {
2478            return predictions;
2479        }
2480
2481        // Use most recent allocations for base prediction
2482        let recent_events: Vec<_> = pattern_analysis
2483            .allocation_history
2484            .iter()
2485            .rev()
2486            .take(5) // Analyze last 5 allocations for pattern detection
2487            .collect();
2488
2489        if recent_events.len() < 2 {
2490            return predictions;
2491        }
2492
2493        match &pattern_analysis.detected_pattern {
2494            Some(pattern) => {
2495                predictions.extend(self.generate_pattern_based_predictions(pattern, max_count));
2496            }
2497            None => {
2498                predictions.extend(self.generate_heuristic_predictions(&recent_events, max_count));
2499            }
2500        }
2501
2502        // Ensure predictions don't exceed the maximum count
2503        predictions.truncate(max_count);
2504        predictions
2505    }
2506
2507    /// Generate predictions based on detected allocation pattern
2508    fn generate_pattern_based_predictions(
2509        &self,
2510        pattern: &PortAllocationPattern,
2511        max_count: usize,
2512    ) -> Vec<DiscoveryCandidate> {
2513        let mut predictions = Vec::new();
2514
2515        match pattern.pattern_type {
2516            AllocationPatternType::Sequential => {
2517                // Predict next sequential ports
2518                for i in 1..=max_count as u16 {
2519                    let predicted_port = pattern.base_port.wrapping_add(i);
2520                    if self.is_valid_port(predicted_port) {
2521                        predictions.push(
2522                            self.create_predicted_candidate(predicted_port, pattern.confidence),
2523                        );
2524                    }
2525                }
2526            }
2527            AllocationPatternType::FixedStride => {
2528                // Predict based on fixed stride pattern
2529                for i in 1..=max_count as u16 {
2530                    let predicted_port = pattern.base_port.wrapping_add(pattern.stride * i);
2531                    if self.is_valid_port(predicted_port) {
2532                        predictions.push(
2533                            self.create_predicted_candidate(predicted_port, pattern.confidence),
2534                        );
2535                    }
2536                }
2537            }
2538            AllocationPatternType::PoolBased => {
2539                // Generate predictions within detected pool boundaries
2540                if let Some((min_port, max_port)) = pattern.pool_boundaries {
2541                    let pool_size = max_port - min_port + 1;
2542                    let step = (pool_size / max_count as u16).max(1);
2543
2544                    for i in 0..max_count as u16 {
2545                        let predicted_port = min_port + (i * step);
2546                        if predicted_port <= max_port && self.is_valid_port(predicted_port) {
2547                            predictions.push(self.create_predicted_candidate(
2548                                predicted_port,
2549                                pattern.confidence * 0.8,
2550                            ));
2551                        }
2552                    }
2553                }
2554            }
2555            AllocationPatternType::TimeBased => {
2556                // Predict based on time-based allocation patterns
2557                // Use a conservative approach with sequential prediction
2558                for i in 1..=max_count as u16 {
2559                    let predicted_port = pattern.base_port.wrapping_add(i);
2560                    if self.is_valid_port(predicted_port) {
2561                        predictions.push(
2562                            self.create_predicted_candidate(
2563                                predicted_port,
2564                                pattern.confidence * 0.6,
2565                            ),
2566                        );
2567                    }
2568                }
2569            }
2570            AllocationPatternType::Random | AllocationPatternType::Unknown => {
2571                // For random/unknown patterns, use statistical approach
2572                predictions
2573                    .extend(self.generate_statistical_predictions(pattern.base_port, max_count));
2574            }
2575        }
2576
2577        predictions
2578    }
2579
2580    /// Generate predictions using heuristics when no clear pattern is detected
2581    fn generate_heuristic_predictions(
2582        &self,
2583        recent_events: &[&PortAllocationEvent],
2584        max_count: usize,
2585    ) -> Vec<DiscoveryCandidate> {
2586        let mut predictions = Vec::new();
2587
2588        if let Some(latest_event) = recent_events.first() {
2589            let base_port = latest_event.port;
2590
2591            // Try multiple common NAT behaviors
2592
2593            // 1. Sequential allocation (most common for symmetric NATs)
2594            for i in 1..=(max_count / 3) as u16 {
2595                let predicted_port = base_port.wrapping_add(i);
2596                if self.is_valid_port(predicted_port) {
2597                    predictions.push(self.create_predicted_candidate(predicted_port, 0.7));
2598                }
2599            }
2600
2601            // 2. Even/odd port pairs (common in some NAT implementations)
2602            if base_port % 2 == 0 {
2603                let predicted_port = base_port + 1;
2604                if self.is_valid_port(predicted_port) {
2605                    predictions.push(self.create_predicted_candidate(predicted_port, 0.6));
2606                }
2607            }
2608
2609            // 3. Common stride patterns (2, 4, 8, 16)
2610            for stride in [2, 4, 8, 16] {
2611                if predictions.len() >= max_count {
2612                    break;
2613                }
2614                let predicted_port = base_port.wrapping_add(stride);
2615                if self.is_valid_port(predicted_port) {
2616                    predictions.push(self.create_predicted_candidate(predicted_port, 0.5));
2617                }
2618            }
2619
2620            // 4. Try to detect stride from recent allocations
2621            if recent_events.len() >= 2 {
2622                let stride = recent_events[0].port.wrapping_sub(recent_events[1].port);
2623                if stride > 0 && stride <= 100 {
2624                    // Reasonable stride range
2625                    for i in 1..=3 {
2626                        if predictions.len() >= max_count {
2627                            break;
2628                        }
2629                        let predicted_port = base_port.wrapping_add(stride * i);
2630                        if self.is_valid_port(predicted_port) {
2631                            predictions.push(self.create_predicted_candidate(predicted_port, 0.4));
2632                        }
2633                    }
2634                }
2635            }
2636        }
2637
2638        predictions.truncate(max_count);
2639        predictions
2640    }
2641
2642    /// Generate statistical predictions for random/unknown patterns
2643    fn generate_statistical_predictions(
2644        &self,
2645        base_port: u16,
2646        max_count: usize,
2647    ) -> Vec<DiscoveryCandidate> {
2648        let mut predictions = Vec::new();
2649
2650        // Common port ranges used by NATs
2651        let common_ranges = [
2652            (1024, 5000),   // User ports
2653            (5000, 10000),  // Common NAT range
2654            (10000, 20000), // Extended range
2655            (32768, 65535), // Dynamic/private ports
2656        ];
2657
2658        // Find which range the base port is in
2659        let current_range = common_ranges
2660            .iter()
2661            .find(|(min, max)| base_port >= *min && base_port <= *max)
2662            .copied()
2663            .unwrap_or((1024, 65535));
2664
2665        // Generate predictions within the detected range
2666        let range_size = current_range.1 - current_range.0;
2667        let step = (range_size / max_count as u16).max(1);
2668
2669        for i in 0..max_count {
2670            let offset = (i as u16 * step) % range_size;
2671            let predicted_port = current_range.0 + offset;
2672
2673            if self.is_valid_port(predicted_port) && predicted_port != base_port {
2674                predictions.push(self.create_predicted_candidate(predicted_port, 0.3));
2675            }
2676        }
2677
2678        predictions
2679    }
2680
2681    /// Check if a port number is valid for prediction
2682    fn is_valid_port(&self, port: u16) -> bool {
2683        // Avoid well-known ports and ensure it's in usable range
2684        // Port 0 is invalid, ports 1-1023 are privileged
2685        // Some common ports to avoid for NAT predictions:
2686        const COMMON_PORTS_TO_AVOID: &[u16] = &[
2687            21,    // FTP
2688            22,    // SSH
2689            23,    // Telnet
2690            25,    // SMTP
2691            53,    // DNS
2692            80,    // HTTP
2693            110,   // POP3
2694            143,   // IMAP
2695            443,   // HTTPS
2696            445,   // SMB
2697            3389,  // RDP
2698            5432,  // PostgreSQL
2699            3306,  // MySQL
2700            6379,  // Redis
2701            27017, // MongoDB
2702        ];
2703
2704        port != 0 && port >= 1024 && !COMMON_PORTS_TO_AVOID.contains(&port)
2705    }
2706
2707    /// Create a predicted candidate with appropriate priority
2708    fn create_predicted_candidate(&self, port: u16, confidence: f64) -> DiscoveryCandidate {
2709        // Calculate priority based on confidence level
2710        // Higher confidence gets higher priority
2711        let base_priority = 50; // Base priority for predicted candidates
2712        let priority = (base_priority as f64 * confidence) as u32;
2713
2714        DiscoveryCandidate {
2715            address: SocketAddr::new(
2716                "0.0.0.0".parse().unwrap(), // Placeholder IP, will be filled by caller
2717                port,
2718            ),
2719            priority,
2720            source: DiscoverySourceType::Predicted,
2721            state: CandidateState::New,
2722        }
2723    }
2724
2725    /// Analyze port allocation history to detect patterns
2726    pub(crate) fn analyze_allocation_patterns(
2727        &self,
2728        history: &VecDeque<PortAllocationEvent>,
2729    ) -> Option<PortAllocationPattern> {
2730        if history.len() < 3 {
2731            return None;
2732        }
2733
2734        let recent_ports: Vec<u16> = history
2735            .iter()
2736            .rev()
2737            .take(10)
2738            .map(|event| event.port)
2739            .collect();
2740
2741        // Try to detect sequential pattern
2742        if let Some(pattern) = self.detect_sequential_pattern(&recent_ports) {
2743            return Some(pattern);
2744        }
2745
2746        // Try to detect fixed stride pattern
2747        if let Some(pattern) = self.detect_stride_pattern(&recent_ports) {
2748            return Some(pattern);
2749        }
2750
2751        // Try to detect pool-based allocation
2752        if let Some(pattern) = self.detect_pool_pattern(&recent_ports) {
2753            return Some(pattern);
2754        }
2755
2756        // Try to detect time-based allocation
2757        if let Some(pattern) = self.detect_time_based_pattern(history) {
2758            return Some(pattern);
2759        }
2760
2761        None
2762    }
2763
2764    /// Detect sequential port allocation pattern
2765    fn detect_sequential_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2766        if ports.len() < 3 {
2767            return None;
2768        }
2769
2770        let mut sequential_count = 0;
2771        let mut total_comparisons = 0;
2772
2773        for i in 1..ports.len() {
2774            total_comparisons += 1;
2775            let diff = ports[i - 1].wrapping_sub(ports[i]);
2776            if diff == 1 {
2777                sequential_count += 1;
2778            }
2779        }
2780
2781        let sequential_ratio = sequential_count as f64 / total_comparisons as f64;
2782
2783        if sequential_ratio >= 0.6 {
2784            // At least 60% sequential
2785            let confidence = (sequential_ratio * 0.9).min(0.9); // Cap at 90%
2786
2787            Some(PortAllocationPattern {
2788                pattern_type: AllocationPatternType::Sequential,
2789                base_port: ports[0],
2790                stride: 1,
2791                pool_boundaries: None,
2792                confidence,
2793            })
2794        } else {
2795            None
2796        }
2797    }
2798
2799    /// Detect fixed stride allocation pattern
2800    fn detect_stride_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2801        if ports.len() < 4 {
2802            return None;
2803        }
2804
2805        // Calculate differences between consecutive ports
2806        let mut diffs = Vec::new();
2807        for i in 1..ports.len() {
2808            let diff = ports[i - 1].wrapping_sub(ports[i]);
2809            if diff > 0 && diff <= 1000 {
2810                // Reasonable stride range
2811                diffs.push(diff);
2812            }
2813        }
2814
2815        if diffs.len() < 2 {
2816            return None;
2817        }
2818
2819        // Find the most common difference
2820        let mut diff_counts = std::collections::HashMap::new();
2821        for &diff in &diffs {
2822            *diff_counts.entry(diff).or_insert(0) += 1;
2823        }
2824
2825        let (most_common_diff, count) = diff_counts
2826            .iter()
2827            .max_by_key(|&(_, &count)| count)
2828            .map(|(&diff, &count)| (diff, count))?;
2829
2830        let consistency_ratio = count as f64 / diffs.len() as f64;
2831
2832        if consistency_ratio >= 0.5 && most_common_diff > 1 {
2833            // At least 50% consistent, not sequential
2834            let confidence = (consistency_ratio * 0.8).min(0.8); // Cap at 80%
2835
2836            Some(PortAllocationPattern {
2837                pattern_type: AllocationPatternType::FixedStride,
2838                base_port: ports[0],
2839                stride: most_common_diff,
2840                pool_boundaries: None,
2841                confidence,
2842            })
2843        } else {
2844            None
2845        }
2846    }
2847
2848    /// Detect pool-based allocation pattern
2849    fn detect_pool_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2850        if ports.len() < 5 {
2851            return None;
2852        }
2853
2854        let min_port = *ports.iter().min()?;
2855        let max_port = *ports.iter().max()?;
2856        let range = max_port - min_port;
2857
2858        // Check if ports are distributed within a reasonable range
2859        if range > 0 && range <= 10000 {
2860            // Reasonable pool size
2861            // Check distribution uniformity
2862            let expected_step = range / (ports.len() as u16 - 1);
2863            let mut uniform_score = 0.0;
2864
2865            let mut sorted_ports = ports.to_vec();
2866            sorted_ports.sort_unstable();
2867
2868            for i in 1..sorted_ports.len() {
2869                let actual_step = sorted_ports[i] - sorted_ports[i - 1];
2870                let step_diff = (actual_step as i32 - expected_step as i32).abs() as f64;
2871                let normalized_diff = step_diff / expected_step as f64;
2872                uniform_score += 1.0 - normalized_diff.min(1.0);
2873            }
2874
2875            uniform_score /= (sorted_ports.len() - 1) as f64;
2876
2877            if uniform_score >= 0.4 {
2878                // Reasonably uniform distribution
2879                let confidence = (uniform_score * 0.7).min(0.7); // Cap at 70%
2880
2881                Some(PortAllocationPattern {
2882                    pattern_type: AllocationPatternType::PoolBased,
2883                    base_port: min_port,
2884                    stride: expected_step,
2885                    pool_boundaries: Some((min_port, max_port)),
2886                    confidence,
2887                })
2888            } else {
2889                None
2890            }
2891        } else {
2892            None
2893        }
2894    }
2895
2896    /// Detect time-based allocation pattern
2897    fn detect_time_based_pattern(
2898        &self,
2899        history: &VecDeque<PortAllocationEvent>,
2900    ) -> Option<PortAllocationPattern> {
2901        if history.len() < 4 {
2902            return None;
2903        }
2904
2905        // Calculate time intervals between allocations
2906        let mut time_intervals = Vec::new();
2907        let events: Vec<_> = history.iter().collect();
2908
2909        for i in 1..events.len() {
2910            let interval = events[i - 1].timestamp.duration_since(events[i].timestamp);
2911            time_intervals.push(interval);
2912        }
2913
2914        if time_intervals.is_empty() {
2915            return None;
2916        }
2917
2918        // Check for consistent timing patterns
2919        let avg_interval =
2920            time_intervals.iter().sum::<std::time::Duration>() / time_intervals.len() as u32;
2921
2922        let mut consistency_score = 0.0;
2923        for interval in &time_intervals {
2924            let diff = (*interval).abs_diff(avg_interval);
2925
2926            let normalized_diff = diff.as_millis() as f64 / avg_interval.as_millis() as f64;
2927            consistency_score += 1.0 - normalized_diff.min(1.0);
2928        }
2929
2930        consistency_score /= time_intervals.len() as f64;
2931
2932        if consistency_score >= 0.6
2933            && avg_interval.as_millis() > 100
2934            && avg_interval.as_millis() < 10000
2935        {
2936            let confidence = (consistency_score * 0.6).min(0.6); // Cap at 60%
2937
2938            Some(PortAllocationPattern {
2939                pattern_type: AllocationPatternType::TimeBased,
2940                base_port: events[0].port,
2941                stride: 1, // Default stride for time-based
2942                pool_boundaries: None,
2943                confidence,
2944            })
2945        } else {
2946            None
2947        }
2948    }
2949
2950    /// Generate confidence-scored predictions for a given base address
2951    pub(crate) fn generate_confidence_scored_predictions(
2952        &mut self,
2953        base_address: SocketAddr,
2954        pattern_analysis: &PatternAnalysisState,
2955        max_count: usize,
2956    ) -> Vec<(DiscoveryCandidate, f64)> {
2957        let mut scored_predictions = Vec::new();
2958
2959        // Generate base predictions
2960        let predictions = self.generate_predictions(pattern_analysis, max_count);
2961
2962        for mut prediction in predictions {
2963            // Update the IP address from the placeholder
2964            prediction.address = SocketAddr::new(base_address.ip(), prediction.address.port());
2965
2966            // Calculate confidence score based on multiple factors
2967            let confidence =
2968                self.calculate_prediction_confidence(&prediction, pattern_analysis, base_address);
2969
2970            scored_predictions.push((prediction, confidence));
2971        }
2972
2973        // Sort by confidence (highest first)
2974        scored_predictions
2975            .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2976
2977        scored_predictions
2978    }
2979
2980    /// Calculate confidence score for a prediction
2981    fn calculate_prediction_confidence(
2982        &self,
2983        prediction: &DiscoveryCandidate,
2984        pattern_analysis: &PatternAnalysisState,
2985        base_address: SocketAddr,
2986    ) -> f64 {
2987        let mut confidence = 0.5; // Base confidence
2988
2989        // Factor in pattern analysis confidence
2990        if let Some(ref pattern) = pattern_analysis.detected_pattern {
2991            confidence += pattern.confidence * 0.3;
2992        }
2993
2994        // Factor in prediction accuracy from pattern analysis
2995        confidence += pattern_analysis.prediction_accuracy * 0.2;
2996
2997        // Factor in port proximity to base address
2998        let port_distance = (prediction.address.port() as i32 - base_address.port() as i32).abs();
2999        let proximity_score = if port_distance <= 10 {
3000            0.2
3001        } else if port_distance <= 100 {
3002            0.1
3003        } else {
3004            0.0
3005        };
3006        confidence += proximity_score;
3007
3008        // Factor in port range (prefer common NAT ranges)
3009        let port_range_score = match prediction.address.port() {
3010            1024..=4999 => 0.1,    // User ports
3011            5000..=9999 => 0.15,   // Common NAT range
3012            10000..=20000 => 0.1,  // Extended range
3013            32768..=65535 => 0.05, // Dynamic ports
3014            _ => 0.0,
3015        };
3016        confidence += port_range_score;
3017
3018        // Ensure confidence is within valid range [0.0, 1.0]
3019        confidence.max(0.0).min(1.0)
3020    }
3021
3022    /// Update pattern analysis with new allocation event
3023    pub(crate) fn update_pattern_analysis(
3024        &self,
3025        pattern_analysis: &mut PatternAnalysisState,
3026        new_event: PortAllocationEvent,
3027    ) {
3028        // Add new event to history
3029        pattern_analysis.allocation_history.push_back(new_event);
3030
3031        // Keep history size manageable
3032        if pattern_analysis.allocation_history.len() > 20 {
3033            pattern_analysis.allocation_history.pop_front();
3034        }
3035
3036        // Re-analyze patterns with updated history
3037        pattern_analysis.detected_pattern =
3038            self.analyze_allocation_patterns(&pattern_analysis.allocation_history);
3039
3040        // Update confidence level
3041        if let Some(ref pattern) = pattern_analysis.detected_pattern {
3042            pattern_analysis.confidence_level = pattern.confidence;
3043        } else {
3044            pattern_analysis.confidence_level *= 0.9; // Decay confidence if no pattern
3045        }
3046
3047        // Update prediction accuracy based on recent success
3048        // This would be updated based on actual validation results
3049        // For now, maintain current accuracy with slight decay
3050        pattern_analysis.prediction_accuracy *= 0.95;
3051    }
3052}
3053
3054/// Bootstrap node health manager with comprehensive monitoring and failover
3055#[derive(Debug)]
3056pub(crate) struct BootstrapNodeManager {
3057    config: DiscoveryConfig,
3058    bootstrap_nodes: HashMap<BootstrapNodeId, BootstrapNodeInfo>,
3059    health_stats: HashMap<BootstrapNodeId, BootstrapHealthStats>,
3060    performance_tracker: BootstrapPerformanceTracker,
3061    last_health_check: Option<Instant>,
3062    health_check_interval: Duration,
3063    failover_threshold: f64,
3064    discovery_sources: Vec<BootstrapDiscoverySource>,
3065}
3066
3067/// Enhanced bootstrap node information with health tracking
3068#[derive(Debug, Clone)]
3069pub(crate) struct BootstrapNodeInfo {
3070    /// Network address of the bootstrap node
3071    pub address: SocketAddr,
3072    /// Last successful contact time
3073    pub last_seen: Instant,
3074    /// Whether this node can coordinate NAT traversal
3075    pub can_coordinate: bool,
3076    /// Current health status
3077    pub health_status: BootstrapHealthStatus,
3078    /// Node capabilities
3079    pub capabilities: BootstrapCapabilities,
3080    /// Priority for selection (higher = preferred)
3081    pub priority: u32,
3082    /// Source where this node was discovered
3083    pub discovery_source: BootstrapDiscoverySource,
3084}
3085
/// Health classification of a bootstrap node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BootstrapHealthStatus {
    /// The node responds and works as expected.
    Healthy,
    /// The node has problems but can still be used.
    Degraded,
    /// The node does not respond or keeps failing.
    Unhealthy,
    /// The node has not been tested yet.
    Unknown,
}
3098
/// Feature set advertised by (or assumed for) a bootstrap node.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapCapabilities {
    /// Whether NAT traversal coordination is supported.
    pub supports_nat_traversal: bool,
    /// Whether IPv6 is supported.
    pub supports_ipv6: bool,
    /// Whether QUIC extension frames are supported.
    pub supports_quic_extensions: bool,
    /// Upper bound on simultaneous coordinations.
    pub max_concurrent_coordinations: u32,
    /// QUIC versions the node supports.
    pub supported_quic_versions: Vec<u32>,
}
3113
/// Connection and coordination counters kept for a single bootstrap node.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapHealthStats {
    /// How many connections were attempted in total.
    pub connection_attempts: u32,
    /// How many connection attempts succeeded.
    pub successful_connections: u32,
    /// How many connection attempts failed.
    pub failed_connections: u32,
    /// Mean round-trip time, once one is available.
    pub average_rtt: Option<Duration>,
    /// Most recent round-trip time samples.
    pub recent_rtts: VecDeque<Duration>,
    /// Time of the last health check, if one has run.
    pub last_health_check: Option<Instant>,
    /// Number of failures in a row.
    pub consecutive_failures: u32,
    /// How many coordination requests were handled in total.
    pub coordination_requests: u32,
    /// How many coordinations succeeded.
    pub successful_coordinations: u32,
}
3136
3137/// Performance tracker for bootstrap nodes
3138#[derive(Debug, Default)]
3139pub(crate) struct BootstrapPerformanceTracker {
3140    /// Overall success rate across all nodes
3141    pub overall_success_rate: f64,
3142    /// Average response time across all nodes
3143    pub average_response_time: Duration,
3144    /// Best performing nodes (by ID)
3145    pub best_performers: Vec<BootstrapNodeId>,
3146    /// Nodes currently in failover state
3147    pub failover_nodes: Vec<BootstrapNodeId>,
3148    /// Performance history
3149    pub performance_history: VecDeque<PerformanceSnapshot>,
3150}
3151
/// Performance metrics captured at a single point in time.
#[derive(Debug, Clone)]
pub(crate) struct PerformanceSnapshot {
    /// Point in time the snapshot refers to.
    pub timestamp: Instant,
    /// Active node count recorded in the snapshot.
    pub active_nodes: u32,
    /// Success rate recorded in the snapshot.
    pub success_rate: f64,
    /// Average round-trip time recorded in the snapshot.
    pub average_rtt: Duration,
}
3160
/// Ways in which a bootstrap node can become known.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum BootstrapDiscoverySource {
    /// Statically configured.
    Static,
    /// Found through DNS.
    DNS,
    /// Found through the DHT or peer exchange.
    DHT,
    /// Found through multicast.
    Multicast,
    /// Supplied by the user's configuration.
    UserProvided,
}
3175
3176impl BootstrapNodeManager {
3177    pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3178        Self {
3179            config: config.clone(),
3180            bootstrap_nodes: HashMap::new(),
3181            health_stats: HashMap::new(),
3182            performance_tracker: BootstrapPerformanceTracker::default(),
3183            last_health_check: None,
3184            health_check_interval: Duration::from_secs(30),
3185            failover_threshold: 0.3, // 30% success rate threshold
3186            discovery_sources: vec![
3187                BootstrapDiscoverySource::Static,
3188                BootstrapDiscoverySource::DNS,
3189                BootstrapDiscoverySource::UserProvided,
3190            ],
3191        }
3192    }
3193
3194    /// Update bootstrap nodes with enhanced information
3195    pub(crate) fn update_bootstrap_nodes(&mut self, nodes: Vec<BootstrapNode>) {
3196        let now = Instant::now();
3197
3198        // Convert BootstrapNode to BootstrapNodeInfo
3199        for (i, node) in nodes.into_iter().enumerate() {
3200            let node_id = BootstrapNodeId(i as u64);
3201
3202            let node_info = BootstrapNodeInfo {
3203                address: node.address,
3204                last_seen: node.last_seen,
3205                can_coordinate: node.can_coordinate,
3206                health_status: BootstrapHealthStatus::Unknown,
3207                capabilities: BootstrapCapabilities {
3208                    supports_nat_traversal: node.can_coordinate,
3209                    supports_ipv6: node.address.is_ipv6(),
3210                    supports_quic_extensions: true, // Assume support
3211                    max_concurrent_coordinations: 100, // Default
3212                    supported_quic_versions: vec![1], // QUIC v1
3213                },
3214                priority: self.calculate_initial_priority(&node),
3215                discovery_source: BootstrapDiscoverySource::UserProvided,
3216            };
3217
3218            self.bootstrap_nodes.insert(node_id, node_info);
3219
3220            // Initialize health stats if not exists
3221            self.health_stats.entry(node_id).or_default();
3222        }
3223
3224        info!("Updated {} bootstrap nodes", self.bootstrap_nodes.len());
3225        self.schedule_health_check(now);
3226    }
3227
3228    /// Get active bootstrap nodes sorted by health and performance
3229    pub(crate) fn get_active_bootstrap_nodes(&self) -> Vec<BootstrapNodeId> {
3230        let mut active_nodes: Vec<_> = self
3231            .bootstrap_nodes
3232            .iter()
3233            .filter(|(_, node)| {
3234                matches!(
3235                    node.health_status,
3236                    BootstrapHealthStatus::Healthy | BootstrapHealthStatus::Unknown
3237                )
3238            })
3239            .map(|(&id, node)| (id, node))
3240            .collect();
3241
3242        // Sort by priority and health
3243        active_nodes.sort_by(|a, b| {
3244            // First by health status
3245            let health_cmp = self.compare_health_status(a.1.health_status, b.1.health_status);
3246            if health_cmp != std::cmp::Ordering::Equal {
3247                return health_cmp;
3248            }
3249
3250            // Then by priority
3251            b.1.priority.cmp(&a.1.priority)
3252        });
3253
3254        active_nodes.into_iter().map(|(id, _)| id).collect()
3255    }
3256
3257    /// Get bootstrap node address
3258    pub(crate) fn get_bootstrap_address(&self, id: BootstrapNodeId) -> Option<SocketAddr> {
3259        self.bootstrap_nodes.get(&id).map(|node| node.address)
3260    }
3261
3262    /// Perform health check on all bootstrap nodes
3263    pub(crate) fn perform_health_check(&mut self, now: Instant) {
3264        if let Some(last_check) = self.last_health_check {
3265            if now.duration_since(last_check) < self.health_check_interval {
3266                return; // Too soon for another health check
3267            }
3268        }
3269
3270        debug!(
3271            "Performing health check on {} bootstrap nodes",
3272            self.bootstrap_nodes.len()
3273        );
3274
3275        // Collect node IDs to check to avoid borrowing issues
3276        let node_ids: Vec<BootstrapNodeId> = self.bootstrap_nodes.keys().copied().collect();
3277
3278        for node_id in node_ids {
3279            self.check_node_health(node_id, now);
3280        }
3281
3282        self.update_performance_metrics(now);
3283        self.last_health_check = Some(now);
3284    }
3285
3286    /// Check health of a specific bootstrap node
3287    fn check_node_health(&mut self, node_id: BootstrapNodeId, now: Instant) {
3288        // Get current health status and node info before mutable operations
3289        let node_info_opt = self.bootstrap_nodes.get(&node_id).cloned();
3290        if node_info_opt.is_none() {
3291            return; // Node not found
3292        }
3293        let node_info_for_priority = node_info_opt.unwrap();
3294        let current_health_status = node_info_for_priority.health_status;
3295
3296        // Calculate metrics from stats
3297        let (_success_rate, new_health_status, _average_rtt) = {
3298            let stats = self.health_stats.get_mut(&node_id).unwrap();
3299
3300            // Calculate success rate
3301            let success_rate = if stats.connection_attempts > 0 {
3302                stats.successful_connections as f64 / stats.connection_attempts as f64
3303            } else {
3304                1.0 // No attempts yet, assume healthy
3305            };
3306
3307            // Calculate average RTT
3308            if !stats.recent_rtts.is_empty() {
3309                let total_rtt: Duration = stats.recent_rtts.iter().sum();
3310                stats.average_rtt = Some(total_rtt / stats.recent_rtts.len() as u32);
3311            }
3312
3313            // Determine health status
3314            let new_health_status = if stats.consecutive_failures >= 3 {
3315                BootstrapHealthStatus::Unhealthy
3316            } else if success_rate < self.failover_threshold {
3317                BootstrapHealthStatus::Degraded
3318            } else if success_rate >= 0.8 && stats.consecutive_failures == 0 {
3319                BootstrapHealthStatus::Healthy
3320            } else {
3321                current_health_status // Keep current status
3322            };
3323
3324            stats.last_health_check = Some(now);
3325
3326            (success_rate, new_health_status, stats.average_rtt)
3327        };
3328
3329        // Calculate new priority using stats snapshot
3330        let stats_snapshot = self.health_stats.get(&node_id).unwrap();
3331        let new_priority = self.calculate_dynamic_priority(&node_info_for_priority, stats_snapshot);
3332
3333        // Now update the node info
3334        if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
3335            if new_health_status != node_info.health_status {
3336                info!(
3337                    "Bootstrap node {:?} health status changed: {:?} -> {:?}",
3338                    node_id, node_info.health_status, new_health_status
3339                );
3340                node_info.health_status = new_health_status;
3341            }
3342
3343            node_info.priority = new_priority;
3344        }
3345    }
3346
3347    /// Record connection attempt result
3348    pub(crate) fn record_connection_attempt(
3349        &mut self,
3350        node_id: BootstrapNodeId,
3351        success: bool,
3352        rtt: Option<Duration>,
3353    ) {
3354        if let Some(stats) = self.health_stats.get_mut(&node_id) {
3355            stats.connection_attempts += 1;
3356
3357            if success {
3358                stats.successful_connections += 1;
3359                stats.consecutive_failures = 0;
3360
3361                if let Some(rtt) = rtt {
3362                    stats.recent_rtts.push_back(rtt);
3363                    if stats.recent_rtts.len() > 10 {
3364                        stats.recent_rtts.pop_front();
3365                    }
3366                }
3367            } else {
3368                stats.failed_connections += 1;
3369                stats.consecutive_failures += 1;
3370            }
3371        }
3372
3373        // Update node's last seen time if successful
3374        if success {
3375            if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
3376                node_info.last_seen = Instant::now();
3377            }
3378        }
3379    }
3380
3381    /// Record coordination request result
3382    pub(crate) fn record_coordination_result(&mut self, node_id: BootstrapNodeId, success: bool) {
3383        if let Some(stats) = self.health_stats.get_mut(&node_id) {
3384            stats.coordination_requests += 1;
3385            if success {
3386                stats.successful_coordinations += 1;
3387            }
3388        }
3389    }
3390
3391    /// Get best performing bootstrap nodes
3392    pub(crate) fn get_best_performers(&self, count: usize) -> Vec<BootstrapNodeId> {
3393        let mut nodes_with_scores: Vec<_> = self
3394            .bootstrap_nodes
3395            .iter()
3396            .filter_map(|(&id, node)| {
3397                if matches!(node.health_status, BootstrapHealthStatus::Healthy) {
3398                    let score = self.calculate_performance_score(id, node);
3399                    Some((id, score))
3400                } else {
3401                    None
3402                }
3403            })
3404            .collect();
3405
3406        nodes_with_scores
3407            .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
3408
3409        nodes_with_scores
3410            .into_iter()
3411            .take(count)
3412            .map(|(id, _)| id)
3413            .collect()
3414    }
3415
3416    /// Discover new bootstrap nodes dynamically
3417    pub(crate) fn discover_new_nodes(&mut self) -> Result<Vec<BootstrapNodeInfo>, String> {
3418        let mut discovered_nodes = Vec::new();
3419
3420        // Try DNS discovery
3421        if let Ok(dns_nodes) = self.discover_via_dns() {
3422            discovered_nodes.extend(dns_nodes);
3423        }
3424
3425        // Try multicast discovery (for local networks)
3426        if let Ok(multicast_nodes) = self.discover_via_multicast() {
3427            discovered_nodes.extend(multicast_nodes);
3428        }
3429
3430        // Add discovered nodes to our registry
3431        for node in &discovered_nodes {
3432            let node_id = BootstrapNodeId(rand::random());
3433            self.bootstrap_nodes.insert(node_id, node.clone());
3434            self.health_stats
3435                .insert(node_id, BootstrapHealthStats::default());
3436        }
3437
3438        if !discovered_nodes.is_empty() {
3439            info!("Discovered {} new bootstrap nodes", discovered_nodes.len());
3440        }
3441
3442        Ok(discovered_nodes)
3443    }
3444
    /// Discover bootstrap nodes via DNS.
    ///
    /// Placeholder: logs that DNS discovery is not implemented and always
    /// returns `Ok` with an empty list.
    fn discover_via_dns(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
        // DNS-based discovery is not implemented yet; report no nodes.
        debug!("DNS-based bootstrap discovery not yet implemented");
        Ok(Vec::new())
    }
3452
    /// Discover bootstrap nodes via multicast on the local network.
    ///
    /// Placeholder: logs that multicast discovery is not implemented and
    /// always returns `Ok` with an empty list.
    fn discover_via_multicast(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
        // Multicast-based discovery is not implemented yet; report no nodes.
        debug!("Multicast-based bootstrap discovery not yet implemented");
        Ok(Vec::new())
    }
3460
3461    /// Calculate initial priority for a bootstrap node
3462    fn calculate_initial_priority(&self, node: &BootstrapNode) -> u32 {
3463        let mut priority = 100; // Base priority
3464
3465        if node.can_coordinate {
3466            priority += 50;
3467        }
3468
3469        if let Some(rtt) = node.rtt {
3470            if rtt < Duration::from_millis(50) {
3471                priority += 30;
3472            } else if rtt < Duration::from_millis(100) {
3473                priority += 20;
3474            } else if rtt < Duration::from_millis(200) {
3475                priority += 10;
3476            }
3477        }
3478
3479        // Prefer IPv6 for better NAT traversal potential
3480        if node.address.is_ipv6() {
3481            priority += 10;
3482        }
3483
3484        priority
3485    }
3486
3487    /// Calculate dynamic priority based on performance
3488    fn calculate_dynamic_priority(
3489        &self,
3490        node_info: &BootstrapNodeInfo,
3491        stats: &BootstrapHealthStats,
3492    ) -> u32 {
3493        let mut priority = node_info.priority;
3494
3495        // Adjust based on success rate
3496        let success_rate = if stats.connection_attempts > 0 {
3497            stats.successful_connections as f64 / stats.connection_attempts as f64
3498        } else {
3499            1.0
3500        };
3501
3502        priority = (priority as f64 * success_rate) as u32;
3503
3504        // Adjust based on RTT
3505        if let Some(avg_rtt) = stats.average_rtt {
3506            if avg_rtt < Duration::from_millis(50) {
3507                priority += 20;
3508            } else if avg_rtt > Duration::from_millis(500) {
3509                priority = priority.saturating_sub(20);
3510            }
3511        }
3512
3513        // Penalize consecutive failures
3514        priority = priority.saturating_sub(stats.consecutive_failures * 10);
3515
3516        priority.max(1) // Ensure minimum priority
3517    }
3518
3519    /// Calculate performance score for ranking
3520    fn calculate_performance_score(
3521        &self,
3522        node_id: BootstrapNodeId,
3523        _node_info: &BootstrapNodeInfo,
3524    ) -> f64 {
3525        let stats = self.health_stats.get(&node_id).unwrap();
3526
3527        let mut score = 0.0;
3528
3529        // Success rate component (40% of score)
3530        let success_rate = if stats.connection_attempts > 0 {
3531            stats.successful_connections as f64 / stats.connection_attempts as f64
3532        } else {
3533            1.0
3534        };
3535        score += success_rate * 0.4;
3536
3537        // RTT component (30% of score)
3538        if let Some(avg_rtt) = stats.average_rtt {
3539            let rtt_score = (1000.0 - avg_rtt.as_millis() as f64).max(0.0) / 1000.0;
3540            score += rtt_score * 0.3;
3541        } else {
3542            score += 0.3; // No RTT data, assume good
3543        }
3544
3545        // Coordination success rate (20% of score)
3546        let coord_success_rate = if stats.coordination_requests > 0 {
3547            stats.successful_coordinations as f64 / stats.coordination_requests as f64
3548        } else {
3549            1.0
3550        };
3551        score += coord_success_rate * 0.2;
3552
3553        // Stability component (10% of score)
3554        let stability_score = if stats.consecutive_failures == 0 {
3555            1.0
3556        } else {
3557            1.0 / (stats.consecutive_failures as f64 + 1.0)
3558        };
3559        score += stability_score * 0.1;
3560
3561        score
3562    }
3563
3564    /// Compare health status for sorting
3565    fn compare_health_status(
3566        &self,
3567        a: BootstrapHealthStatus,
3568        b: BootstrapHealthStatus,
3569    ) -> std::cmp::Ordering {
3570        use std::cmp::Ordering;
3571
3572        match (a, b) {
3573            (BootstrapHealthStatus::Healthy, BootstrapHealthStatus::Healthy) => Ordering::Equal,
3574            (BootstrapHealthStatus::Healthy, _) => Ordering::Less, // Healthy comes first
3575            (_, BootstrapHealthStatus::Healthy) => Ordering::Greater,
3576            (BootstrapHealthStatus::Unknown, BootstrapHealthStatus::Unknown) => Ordering::Equal,
3577            (BootstrapHealthStatus::Unknown, _) => Ordering::Less, // Unknown comes before degraded/unhealthy
3578            (_, BootstrapHealthStatus::Unknown) => Ordering::Greater,
3579            (BootstrapHealthStatus::Degraded, BootstrapHealthStatus::Degraded) => Ordering::Equal,
3580            (BootstrapHealthStatus::Degraded, _) => Ordering::Less, // Degraded comes before unhealthy
3581            (_, BootstrapHealthStatus::Degraded) => Ordering::Greater,
3582            (BootstrapHealthStatus::Unhealthy, BootstrapHealthStatus::Unhealthy) => Ordering::Equal,
3583        }
3584    }
3585
3586    /// Update overall performance metrics
3587    fn update_performance_metrics(&mut self, now: Instant) {
3588        let mut total_attempts = 0;
3589        let mut total_successes = 0;
3590        let mut total_rtt = Duration::ZERO;
3591        let mut rtt_count = 0;
3592
3593        for stats in self.health_stats.values() {
3594            total_attempts += stats.connection_attempts;
3595            total_successes += stats.successful_connections;
3596
3597            if let Some(avg_rtt) = stats.average_rtt {
3598                total_rtt += avg_rtt;
3599                rtt_count += 1;
3600            }
3601        }
3602
3603        self.performance_tracker.overall_success_rate = if total_attempts > 0 {
3604            total_successes as f64 / total_attempts as f64
3605        } else {
3606            1.0
3607        };
3608
3609        self.performance_tracker.average_response_time = if rtt_count > 0 {
3610            total_rtt / rtt_count
3611        } else {
3612            Duration::from_millis(100) // Default
3613        };
3614
3615        // Update best performers
3616        self.performance_tracker.best_performers = self.get_best_performers(5);
3617
3618        // Record performance snapshot
3619        let snapshot = PerformanceSnapshot {
3620            timestamp: now,
3621            active_nodes: self.get_active_bootstrap_nodes().len() as u32,
3622            success_rate: self.performance_tracker.overall_success_rate,
3623            average_rtt: self.performance_tracker.average_response_time,
3624        };
3625
3626        self.performance_tracker
3627            .performance_history
3628            .push_back(snapshot);
3629        if self.performance_tracker.performance_history.len() > 100 {
3630            self.performance_tracker.performance_history.pop_front();
3631        }
3632    }
3633
    /// Schedule the next health check.
    ///
    /// Currently a no-op: `_now` is ignored and nothing is scheduled. Health
    /// checks only run when `perform_health_check` is called.
    fn schedule_health_check(&mut self, _now: Instant) {
        // In a complete implementation, this would schedule an async task.
        // For now, health checks are performed on-demand via polling.
    }
3639
    /// Returns a shared reference to the aggregate performance tracker.
    ///
    /// The tracker is refreshed by `update_performance_metrics`, which runs as
    /// part of `perform_health_check`.
    pub(crate) fn get_performance_stats(&self) -> &BootstrapPerformanceTracker {
        &self.performance_tracker
    }
3644
    /// Returns the health statistics recorded for `node_id`.
    ///
    /// Yields `None` when no statistics are stored under that id.
    pub(crate) fn get_node_health_stats(
        &self,
        node_id: BootstrapNodeId,
    ) -> Option<&BootstrapHealthStats> {
        self.health_stats.get(&node_id)
    }
3652}
3653
/// Discovery result cache.
///
/// At present this struct only holds a copy of the discovery configuration;
/// it has no fields for cached results.
#[derive(Debug)]
pub(crate) struct DiscoveryCache {
    /// Discovery configuration cloned at construction time.
    config: DiscoveryConfig,
}
3659
3660impl DiscoveryCache {
3661    pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3662        Self {
3663            config: config.clone(),
3664        }
3665    }
3666}
3667
3668/// Create platform-specific network interface discovery
3669pub(crate) fn create_platform_interface_discovery() -> Box<dyn NetworkInterfaceDiscovery + Send> {
3670    #[cfg(target_os = "windows")]
3671    return Box::new(WindowsInterfaceDiscovery::new());
3672
3673    #[cfg(target_os = "linux")]
3674    return Box::new(LinuxInterfaceDiscovery::new());
3675
3676    #[cfg(all(target_os = "macos", feature = "network-discovery"))]
3677    return Box::new(MacOSInterfaceDiscovery::new());
3678
3679    #[cfg(all(target_os = "macos", not(feature = "network-discovery")))]
3680    return Box::new(GenericInterfaceDiscovery::new());
3681
3682    #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
3683    return Box::new(GenericInterfaceDiscovery::new());
3684}
3685
3686// Platform-specific implementations
3687
3688// Windows implementation is in windows.rs module
3689
3690// Linux implementation is in linux.rs module
3691
3692// macOS implementation is in macos.rs module
3693
// Generic fallback implementation
/// Interface discovery used when no platform-specific backend is selected.
pub(crate) struct GenericInterfaceDiscovery {
    /// Set by `start_scan`; cleared again once `check_scan_complete` has
    /// handed out the scan result.
    scan_complete: bool,
}
3698
3699impl GenericInterfaceDiscovery {
3700    pub(crate) fn new() -> Self {
3701        Self {
3702            scan_complete: false,
3703        }
3704    }
3705}
3706
3707impl NetworkInterfaceDiscovery for GenericInterfaceDiscovery {
3708    fn start_scan(&mut self) -> Result<(), String> {
3709        // Generic implementation using standard library
3710        self.scan_complete = true;
3711        Ok(())
3712    }
3713
3714    fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>> {
3715        if self.scan_complete {
3716            self.scan_complete = false;
3717            Some(vec![NetworkInterface {
3718                name: "generic".to_string(),
3719                addresses: vec!["127.0.0.1:0".parse().unwrap()],
3720                is_up: true,
3721                is_wireless: false,
3722                mtu: Some(1500),
3723            }])
3724        } else {
3725            None
3726        }
3727    }
3728}
3729
3730impl std::fmt::Display for DiscoveryError {
3731    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3732        match self {
3733            Self::NoLocalInterfaces => write!(f, "no local network interfaces found"),
3734            Self::AllBootstrapsFailed => write!(f, "all bootstrap node queries failed"),
3735            Self::DiscoveryTimeout => write!(f, "discovery process timed out"),
3736            Self::InsufficientCandidates { found, required } => {
3737                write!(f, "insufficient candidates found: {found} < {required}")
3738            }
3739            Self::NetworkError(msg) => write!(f, "network error: {msg}"),
3740            Self::ConfigurationError(msg) => write!(f, "configuration error: {msg}"),
3741            Self::InternalError(msg) => write!(f, "internal error: {msg}"),
3742        }
3743    }
3744}
3745
// Marker impl with all default methods: makes `DiscoveryError` usable as a
// standard error type without exposing an underlying source error.
impl std::error::Error for DiscoveryError {}
3747
/// Public utility functions for testing IPv6 and dual-stack functionality
pub mod test_utils {
    use super::*;

    /// Scores an IP address for testing purposes; higher means preferred.
    ///
    /// Starting from a base of 100:
    /// - IPv4: +50 for private ranges, nothing otherwise
    /// - IPv6: always +10, plus (unless loopback, multicast or unspecified)
    ///   +60 for global unicast (`2000::/3`), +20 for link-local
    ///   (`fe80::/10`), +40 for unique local (`fc00::/7`), +30 for anything
    ///   else
    pub fn calculate_address_priority(address: &IpAddr) -> u32 {
        const BASE_PRIORITY: u32 = 100;
        // Applied to every IPv6 address for its NAT traversal potential.
        const IPV6_BOOST: u32 = 10;

        let bonus = match address {
            IpAddr::V4(v4) if v4.is_private() => 50,
            IpAddr::V4(_) => 0,
            IpAddr::V6(v6) => {
                let special = v6.is_loopback() || v6.is_multicast() || v6.is_unspecified();
                let scope_bonus = if special {
                    0
                } else {
                    // Classify by the leading 16 bits of the address.
                    match v6.segments()[0] {
                        head if head & 0xE000 == 0x2000 => 60,
                        head if head & 0xFFC0 == 0xFE80 => 20,
                        head if head & 0xFE00 == 0xFC00 => 40,
                        _ => 30,
                    }
                };
                scope_bonus + IPV6_BOOST
            }
        };

        BASE_PRIORITY + bonus
    }

    /// Reports whether an address is neither loopback nor unspecified.
    pub fn is_valid_address(address: &IpAddr) -> bool {
        !(address.is_loopback() || address.is_unspecified())
    }
}
3796
3797#[cfg(test)]
3798mod tests {
3799    use super::*;
3800
3801    fn create_test_manager() -> CandidateDiscoveryManager {
3802        let config = DiscoveryConfig {
3803            total_timeout: Duration::from_secs(30),
3804            local_scan_timeout: Duration::from_secs(5),
3805            bootstrap_query_timeout: Duration::from_secs(10),
3806            max_query_retries: 3,
3807            max_candidates: 50,
3808            enable_symmetric_prediction: true,
3809            min_bootstrap_consensus: 2,
3810            interface_cache_ttl: Duration::from_secs(300),
3811            server_reflexive_cache_ttl: Duration::from_secs(600),
3812            bound_address: None,
3813        };
3814        CandidateDiscoveryManager::new(config)
3815    }
3816
3817    #[test]
3818    fn test_accept_quic_discovered_addresses() {
3819        let mut manager = create_test_manager();
3820        let peer_id = PeerId([1; 32]);
3821
3822        // Create a discovery session
3823        manager.start_discovery(peer_id, vec![]).unwrap();
3824
3825        // Test accepting QUIC-discovered addresses
3826        let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3827        let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3828
3829        assert!(result.is_ok());
3830
3831        // Verify the address was added to the session
3832        if let Some(session) = manager.active_sessions.get(&peer_id) {
3833            let found = session.discovered_candidates.iter().any(|c| {
3834                c.address == discovered_addr
3835                    && matches!(c.source, DiscoverySourceType::ServerReflexive)
3836            });
3837            assert!(found, "QUIC-discovered address should be in candidates");
3838        }
3839    }
3840
3841    #[test]
3842    fn test_accept_quic_discovered_addresses_no_session() {
3843        let mut manager = create_test_manager();
3844        let peer_id = PeerId([1; 32]);
3845        let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3846
3847        // Try to add address without an active session
3848        let result = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3849
3850        assert!(result.is_err());
3851        match result {
3852            Err(DiscoveryError::InternalError(msg)) => {
3853                assert!(msg.contains("No active discovery session"));
3854            }
3855            _ => panic!("Expected InternalError for missing session"),
3856        }
3857    }
3858
3859    #[test]
3860    fn test_accept_quic_discovered_addresses_deduplication() {
3861        let mut manager = create_test_manager();
3862        let peer_id = PeerId([1; 32]);
3863
3864        // Create a discovery session
3865        manager.start_discovery(peer_id, vec![]).unwrap();
3866
3867        // Add the same address twice
3868        let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3869        let result1 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3870        let result2 = manager.accept_quic_discovered_address(peer_id, discovered_addr);
3871
3872        assert!(result1.is_ok());
3873        assert!(result2.is_ok()); // Should succeed but not duplicate
3874
3875        // Verify no duplicates
3876        if let Some(session) = manager.active_sessions.get(&peer_id) {
3877            let count = session
3878                .discovered_candidates
3879                .iter()
3880                .filter(|c| c.address == discovered_addr)
3881                .count();
3882            assert_eq!(count, 1, "Should not have duplicate addresses");
3883        }
3884    }
3885
3886    #[test]
3887    fn test_accept_quic_discovered_addresses_priority() {
3888        let mut manager = create_test_manager();
3889        let peer_id = PeerId([1; 32]);
3890
3891        // Create a discovery session
3892        manager.start_discovery(peer_id, vec![]).unwrap();
3893
3894        // Add different types of addresses
3895        let public_addr = "8.8.8.8:5000".parse().unwrap();
3896        let private_addr = "192.168.1.100:5000".parse().unwrap();
3897        let ipv6_addr = "[2001:db8::1]:5000".parse().unwrap();
3898
3899        manager
3900            .accept_quic_discovered_address(peer_id, public_addr)
3901            .unwrap();
3902        manager
3903            .accept_quic_discovered_address(peer_id, private_addr)
3904            .unwrap();
3905        manager
3906            .accept_quic_discovered_address(peer_id, ipv6_addr)
3907            .unwrap();
3908
3909        // Verify priorities are assigned correctly
3910        if let Some(session) = manager.active_sessions.get(&peer_id) {
3911            for candidate in &session.discovered_candidates {
3912                assert!(
3913                    candidate.priority > 0,
3914                    "All candidates should have non-zero priority"
3915                );
3916
3917                // Verify IPv6 gets a boost
3918                if candidate.address == ipv6_addr {
3919                    let ipv4_priority = session
3920                        .discovered_candidates
3921                        .iter()
3922                        .find(|c| c.address == public_addr)
3923                        .map(|c| c.priority)
3924                        .unwrap();
3925
3926                    // IPv6 should have higher or equal priority (due to boost)
3927                    assert!(candidate.priority >= ipv4_priority);
3928                }
3929            }
3930        }
3931    }
3932
3933    #[test]
3934    fn test_accept_quic_discovered_addresses_event_generation() {
3935        let mut manager = create_test_manager();
3936        let peer_id = PeerId([1; 32]);
3937
3938        // Create a discovery session
3939        manager.start_discovery(peer_id, vec![]).unwrap();
3940
3941        // Add address and check for events
3942        let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3943        manager
3944            .accept_quic_discovered_address(peer_id, discovered_addr)
3945            .unwrap();
3946
3947        // Poll for events
3948        let events = manager.poll_discovery_progress(peer_id);
3949
3950        // Should have a ServerReflexiveCandidateDiscovered event
3951        let has_event = events.iter().any(|e| {
3952            matches!(e,
3953                DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
3954                if candidate.address == discovered_addr
3955            )
3956        });
3957
3958        assert!(
3959            has_event,
3960            "Should generate discovery event for QUIC-discovered address"
3961        );
3962    }
3963
3964    #[test]
3965    fn test_discovery_completes_without_server_reflexive_phase() {
3966        let mut manager = create_test_manager();
3967        let peer_id = PeerId([1; 32]);
3968
3969        // Start discovery
3970        manager.start_discovery(peer_id, vec![]).unwrap();
3971
3972        // Add a QUIC-discovered address
3973        let discovered_addr = "192.168.1.100:5000".parse().unwrap();
3974        manager
3975            .accept_quic_discovered_address(peer_id, discovered_addr)
3976            .unwrap();
3977
3978        // Poll discovery to advance state
3979        let status = manager.get_discovery_status(peer_id).unwrap();
3980
3981        // Should not be in ServerReflexiveQuerying phase
3982        match &status.phase {
3983            DiscoveryPhase::ServerReflexiveQuerying { .. } => {
3984                panic!("Should not be in ServerReflexiveQuerying phase when using QUIC discovery");
3985            }
3986            _ => {} // Any other phase is fine
3987        }
3988    }
3989
3990    #[test]
3991    fn test_no_bootstrap_queries_when_using_quic_discovery() {
3992        let mut manager = create_test_manager();
3993        let peer_id = PeerId([1; 32]);
3994
3995        // Start discovery
3996        manager.start_discovery(peer_id, vec![]).unwrap();
3997
3998        // Immediately add QUIC-discovered addresses
3999        let addr1 = "192.168.1.100:5000".parse().unwrap();
4000        let addr2 = "8.8.8.8:5000".parse().unwrap();
4001        manager
4002            .accept_quic_discovered_address(peer_id, addr1)
4003            .unwrap();
4004        manager
4005            .accept_quic_discovered_address(peer_id, addr2)
4006            .unwrap();
4007
4008        // Get status to check phase
4009        let status = manager.get_discovery_status(peer_id).unwrap();
4010
4011        // Verify we have candidates from QUIC discovery
4012        assert!(status.discovered_candidates.len() >= 2);
4013
4014        // Verify no bootstrap queries were made
4015        if let Some(session) = manager.active_sessions.get(&peer_id) {
4016            // Check that we didn't record any bootstrap query statistics
4017            assert_eq!(
4018                session.statistics.bootstrap_queries_sent, 0,
4019                "Should not query bootstrap nodes when using QUIC discovery"
4020            );
4021        }
4022    }
4023
4024    #[test]
        /// Adds a single address via `accept_quic_discovered_address` and checks the
        /// stored candidate: its priority must lie strictly between 100 and 300 and
        /// its source must be `DiscoverySourceType::ServerReflexive`.
4025    fn test_priority_differences_quic_vs_placeholder() {
4026        let mut manager = create_test_manager();
4027        let peer_id = PeerId([1; 32]);
4028
4029        // Start discovery
4030        manager.start_discovery(peer_id, vec![]).unwrap();
4031
4032        // Add QUIC-discovered address
4033        let discovered_addr = "8.8.8.8:5000".parse().unwrap();
4034        manager
4035            .accept_quic_discovered_address(peer_id, discovered_addr)
4036            .unwrap();
4037
4038        // Check the priority assigned
            // NOTE(review): if no session exists for `peer_id`, this `if let` skips
            // every assertion below and the test still passes.
4039        if let Some(session) = manager.active_sessions.get(&peer_id) {
                // Look the candidate up by address; it must be present in the session.
4040            let candidate = session
4041                .discovered_candidates
4042                .iter()
4043                .find(|c| c.address == discovered_addr)
4044                .expect("Should find the discovered address");
4045
4046            // QUIC-discovered addresses should have reasonable priority:
                // strictly above 100 and strictly below 300
4047            assert!(
4048                candidate.priority > 100,
4049                "QUIC-discovered address should have good priority"
4050            );
4051            assert!(candidate.priority < 300, "Priority should be reasonable");
4052
4053            // Verify it's marked as ServerReflexive type (for compatibility)
4054            assert!(matches!(
4055                candidate.source,
4056                DiscoverySourceType::ServerReflexive
4057            ));
4058        }
4059    }
4060
4061    #[test]
        /// Table-driven check of the priority stored for addresses added through
        /// `accept_quic_discovered_address`. Each row gives an address, an inclusive
        /// `(min, max)` priority range, and a label used in failure messages.
4062    fn test_quic_discovered_address_priority_calculation() {
4063        // Test that QUIC-discovered addresses get appropriate priorities based on characteristics
4064        let mut manager = create_test_manager();
4065        let peer_id = PeerId([1; 32]);
4066
4067        // Start discovery
4068        manager.start_discovery(peer_id, vec![]).unwrap();
4069
4070        // Test different types of addresses
4071        let test_cases = vec![
4072            // (address, expected_priority_range, description)
4073            ("1.2.3.4:5678", (250, 260), "Public IPv4"),
4074            ("192.168.1.100:9000", (240, 250), "Private IPv4"),
4075            ("[2001:db8::1]:5678", (260, 280), "Global IPv6"),
4076            ("[fe80::1]:5678", (220, 240), "Link-local IPv6"),
4077            ("[fc00::1]:5678", (240, 260), "Unique local IPv6"),
4078            ("10.0.0.1:9000", (240, 250), "Private IPv4 (10.x)"),
4079            ("172.16.0.1:9000", (240, 250), "Private IPv4 (172.16.x)"),
4080        ];
4081
            // All rows go into the same session, so candidates accumulate across
            // iterations; each row is looked up again by its own address.
4082        for (addr_str, (min_priority, max_priority), description) in test_cases {
4083            let addr: SocketAddr = addr_str.parse().unwrap();
4084            manager
4085                .accept_quic_discovered_address(peer_id, addr)
4086                .unwrap();
4087
4088            let session = manager.active_sessions.get(&peer_id).unwrap();
4089            let candidate = session
4090                .discovered_candidates
4091                .iter()
4092                .find(|c| c.address == addr)
4093                .unwrap_or_else(|| panic!("No candidate found for {description}"));
4094
                // Both bounds of the expected range are inclusive.
4095            assert!(
4096                candidate.priority >= min_priority && candidate.priority <= max_priority,
4097                "{} priority {} not in range [{}, {}]",
4098                description,
4099                candidate.priority,
4100                min_priority,
4101                max_priority
4102            );
4103        }
4104    }
4105
4106    #[test]
        /// Calls `calculate_quic_discovered_priority` directly (no discovery session
        /// is started) and checks one exact value plus the relative ordering of four
        /// address kinds.
4107    fn test_quic_discovered_priority_factors() {
4108        // Test that various factors affect priority calculation
4109        let manager = create_test_manager();
4110
4111        // Test base priority calculation: exact value expected for 1.2.3.4:5678
4112        let base_priority =
4113            manager.calculate_quic_discovered_priority(&"1.2.3.4:5678".parse().unwrap());
4114        assert_eq!(
4115            base_priority, 255,
4116            "Base priority should be 255 for public IPv4"
4117        );
4118
4119        // Test IPv6 gets higher priority
4120        let ipv6_priority =
4121            manager.calculate_quic_discovered_priority(&"[2001:db8::1]:5678".parse().unwrap());
4122        assert!(
4123            ipv6_priority > base_priority,
4124            "IPv6 should have higher priority than IPv4"
4125        );
4126
4127        // Test private addresses get lower priority
4128        let private_priority =
4129            manager.calculate_quic_discovered_priority(&"192.168.1.1:5678".parse().unwrap());
4130        assert!(
4131            private_priority < base_priority,
4132            "Private addresses should have lower priority"
4133        );
4134
4135        // Test link-local gets even lower priority
            // (an IPv6 link-local address is compared against the private IPv4 result above)
4136        let link_local_priority =
4137            manager.calculate_quic_discovered_priority(&"[fe80::1]:5678".parse().unwrap());
4138        assert!(
4139            link_local_priority < private_priority,
4140            "Link-local should have lower priority than private"
4141        );
4142    }
4143
4144    #[test]
        /// Seeds the session with a hand-built `ServerReflexive` candidate, then adds
        /// an address with the same IP but a different port through
        /// `accept_quic_discovered_address`. Both candidates must be present
        /// afterwards, and the new one must not carry the seeded priority of 200.
4145    fn test_quic_discovered_addresses_override_stale_server_reflexive() {
4146        // Test that a QUIC-discovered address is added alongside an existing
            // server reflexive candidate (the assertions below require both to be kept)
4147        let mut manager = create_test_manager();
4148        let peer_id = PeerId([1; 32]);
4149
4150        // Start discovery
4151        manager.start_discovery(peer_id, vec![]).unwrap();
4152
4153        // Simulate adding an old server reflexive candidate (from placeholder STUN)
            // by pushing it straight into the session's candidate list
4154        let session = manager.active_sessions.get_mut(&peer_id).unwrap();
4155        let old_candidate = DiscoveryCandidate {
4156            address: "1.2.3.4:1234".parse().unwrap(),
4157            priority: 200,
4158            source: DiscoverySourceType::ServerReflexive,
4159            state: CandidateState::Validating,
4160        };
4161        session.discovered_candidates.push(old_candidate);
4162
4163        // Add a QUIC-discovered address for the same IP but different port
4164        let new_addr = "1.2.3.4:5678".parse().unwrap();
4165        manager
4166            .accept_quic_discovered_address(peer_id, new_addr)
4167            .unwrap();
4168
4169        // Check that we have both candidates
4170        let session = manager.active_sessions.get(&peer_id).unwrap();
4171        let candidates: Vec<_> = session
4172            .discovered_candidates
4173            .iter()
4174            .filter(|c| c.source == DiscoverySourceType::ServerReflexive)
4175            .collect();
4176
4177        assert_eq!(
4178            candidates.len(),
4179            2,
4180            "Should have both old and new candidates"
4181        );
4182
4183        // The new candidate should have a different priority than the seeded 200
4184        let new_candidate = candidates.iter().find(|c| c.address == new_addr).unwrap();
4185        assert_ne!(
4186            new_candidate.priority, 200,
4187            "New candidate should have recalculated priority"
4188        );
4189    }
4190
4191    #[test]
        /// Adds one address through `accept_quic_discovered_address` and checks that
        /// the next `poll_discovery_progress` call returns a
        /// `DiscoveryEvent::ServerReflexiveCandidateDiscovered` event for it.
4192    fn test_quic_discovered_address_generates_events() {
4193        // Test that adding a QUIC-discovered address generates appropriate events
4194        let mut manager = create_test_manager();
4195        let peer_id = PeerId([1; 32]);
4196
4197        // Start discovery
4198        manager.start_discovery(peer_id, vec![]).unwrap();
4199
4200        // Clear any startup events (the returned events are discarded)
4201        manager.poll_discovery_progress(peer_id);
4202
4203        // Add a QUIC-discovered address
4204        let discovered_addr = "8.8.8.8:5000".parse().unwrap();
4205        manager
4206            .accept_quic_discovered_address(peer_id, discovered_addr)
4207            .unwrap();
4208
4209        // Poll for events
4210        let events = manager.poll_discovery_progress(peer_id);
4211
4212        // Should have at least one event about the new candidate
4213        assert!(
4214            !events.is_empty(),
4215            "Should generate events for new QUIC-discovered address"
4216        );
4217
4218        // Check for ServerReflexiveCandidateDiscovered event whose candidate
            // carries exactly the address added above
4219        let has_new_candidate = events.iter().any(|e| {
4220            matches!(e,
4221                DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
4222                if candidate.address == discovered_addr
4223            )
4224        });
4225        assert!(
4226            has_new_candidate,
4227            "Should generate ServerReflexiveCandidateDiscovered event for the discovered address"
4228        );
4229    }
4230
4231    #[test]
        /// Adds three addresses (two IPv4, one IPv6) before polling once, then checks
        /// that the single poll result contains a
        /// `ServerReflexiveCandidateDiscovered` event for each of them.
4232    fn test_multiple_quic_discovered_addresses_generate_events() {
4233        // Test that multiple QUIC-discovered addresses each generate events
4234        let mut manager = create_test_manager();
4235        let peer_id = PeerId([1; 32]);
4236
4237        // Start discovery
4238        manager.start_discovery(peer_id, vec![]).unwrap();
4239
4240        // Clear startup events (the returned events are discarded)
4241        manager.poll_discovery_progress(peer_id);
4242
4243        // Add multiple QUIC-discovered addresses
4244        let addresses = vec![
4245            "8.8.8.8:5000".parse().unwrap(),
4246            "1.1.1.1:6000".parse().unwrap(),
4247            "[2001:db8::1]:7000".parse().unwrap(),
4248        ];
4249
4250        for addr in &addresses {
4251            manager
4252                .accept_quic_discovered_address(peer_id, *addr)
4253                .unwrap();
4254        }
4255
4256        // Poll for events (one poll after all three additions)
4257        let events = manager.poll_discovery_progress(peer_id);
4258
4259        // Should have events for all addresses
4260        for addr in &addresses {
4261            let has_event = events.iter().any(|e| {
4262                matches!(e,
4263                    DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
4264                    if candidate.address == *addr
4265                )
4266            });
4267            assert!(has_event, "Should have event for address {addr}");
4268        }
4269    }
4270
4271    #[test]
        /// Adds the same address twice with a poll in between. The second
        /// `accept_quic_discovered_address` call must still return `Ok`, and the
        /// following poll must contain no `ServerReflexiveCandidateDiscovered` event
        /// for that address.
4272    fn test_duplicate_quic_discovered_address_no_event() {
4273        // Test that duplicate addresses don't generate duplicate events
4274        let mut manager = create_test_manager();
4275        let peer_id = PeerId([1; 32]);
4276
4277        // Start discovery
4278        manager.start_discovery(peer_id, vec![]).unwrap();
4279
4280        // Add a QUIC-discovered address
4281        let discovered_addr = "8.8.8.8:5000".parse().unwrap();
4282        manager
4283            .accept_quic_discovered_address(peer_id, discovered_addr)
4284            .unwrap();
4285
4286        // Poll and clear events (the returned events are discarded)
4287        manager.poll_discovery_progress(peer_id);
4288
4289        // Try to add the same address again; the call itself must succeed
4290        manager
4291            .accept_quic_discovered_address(peer_id, discovered_addr)
4292            .unwrap();
4293
4294        // Poll for events
4295        let events = manager.poll_discovery_progress(peer_id);
4296
4297        // Should not generate any new events for duplicate
4298        let has_duplicate_event = events.iter().any(|e| {
4299            matches!(e,
4300                DiscoveryEvent::ServerReflexiveCandidateDiscovered { candidate, .. }
4301                if candidate.address == discovered_addr
4302            )
4303        });
4304
4305        assert!(
4306            !has_duplicate_event,
4307            "Should not generate event for duplicate address"
4308        );
4309    }
4310
4311    #[test]
        /// Adds two addresses without polling in between, then polls twice. The
        /// first poll must return at least two `ServerReflexiveCandidateDiscovered`
        /// events and the second poll must return none.
4312    fn test_quic_discovered_address_event_timing() {
4313        // Test that events are queued and delivered on poll
4314        let mut manager = create_test_manager();
4315        let peer_id = PeerId([1; 32]);
4316
4317        // Start discovery
4318        manager.start_discovery(peer_id, vec![]).unwrap();
4319
4320        // Clear startup events (the returned events are discarded)
4321        manager.poll_discovery_progress(peer_id);
4322
4323        // Add addresses without polling
4324        let addr1 = "8.8.8.8:5000".parse().unwrap();
4325        let addr2 = "1.1.1.1:6000".parse().unwrap();
4326
4327        manager
4328            .accept_quic_discovered_address(peer_id, addr1)
4329            .unwrap();
4330        manager
4331            .accept_quic_discovered_address(peer_id, addr2)
4332            .unwrap();
4333
4334        // Events should be queued
4335        // Now poll for events
4336        let events = manager.poll_discovery_progress(peer_id);
4337
4338        // Should get all queued events (counted by variant, regardless of address)
4339        let server_reflexive_count = events
4340            .iter()
4341            .filter(|e| matches!(e, DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. }))
4342            .count();
4343
4344        assert!(
4345            server_reflexive_count >= 2,
4346            "Should deliver all queued events on poll, got {server_reflexive_count} events"
4347        );
4348
4349        // Subsequent poll should return no new server reflexive events
4350        let events2 = manager.poll_discovery_progress(peer_id);
4351        let server_reflexive_count2 = events2
4352            .iter()
4353            .filter(|e| matches!(e, DiscoveryEvent::ServerReflexiveCandidateDiscovered { .. }))
4354            .count();
4355        assert_eq!(
4356            server_reflexive_count2, 0,
4357            "Server reflexive events should not be duplicated on subsequent polls"
4358        );
4359    }
4360
4361    #[test]
        /// Pins which socket addresses `is_valid_local_address` accepts and rejects:
        /// private, link-local, unique-local and loopback addresses are expected to
        /// pass, while unspecified, broadcast, multicast, reserved, documentation and
        /// port-0 addresses are expected to fail.
4362    fn test_is_valid_local_address() {
4363        let manager = create_test_manager();
4364
4365        // Valid IPv4 addresses (private ranges)
4366        assert!(manager.is_valid_local_address(&"192.168.1.1:8080".parse().unwrap()));
4367        assert!(manager.is_valid_local_address(&"10.0.0.1:8080".parse().unwrap()));
4368        assert!(manager.is_valid_local_address(&"172.16.0.1:8080".parse().unwrap()));
4369
4370        // Valid IPv6 addresses
4371        assert!(manager.is_valid_local_address(&"[2001:4860:4860::8888]:8080".parse().unwrap()));
4372        assert!(manager.is_valid_local_address(&"[fe80::1]:8080".parse().unwrap())); // Link-local is valid for local
4373        assert!(manager.is_valid_local_address(&"[fc00::1]:8080".parse().unwrap())); // Unique local is valid for local
4374
4375        // Invalid addresses
4376        assert!(!manager.is_valid_local_address(&"0.0.0.0:8080".parse().unwrap()));
4377        assert!(!manager.is_valid_local_address(&"255.255.255.255:8080".parse().unwrap()));
4378        assert!(!manager.is_valid_local_address(&"224.0.0.1:8080".parse().unwrap())); // Multicast
4379        assert!(!manager.is_valid_local_address(&"0.0.0.1:8080".parse().unwrap())); // Reserved
4380        assert!(!manager.is_valid_local_address(&"240.0.0.1:8080".parse().unwrap())); // Reserved
4381        assert!(!manager.is_valid_local_address(&"[::]:8080".parse().unwrap())); // Unspecified
4382        assert!(!manager.is_valid_local_address(&"[ff02::1]:8080".parse().unwrap())); // Multicast
4383        assert!(!manager.is_valid_local_address(&"[2001:db8::1]:8080".parse().unwrap())); // Documentation
4384
4385        // Port 0 should fail even on an otherwise accepted address
4386        assert!(!manager.is_valid_local_address(&"192.168.1.1:0".parse().unwrap()));
4387
4388        // Test mode allows loopback
            // NOTE(review): this guard sits inside a `#[test]` function, so the block
            // is compiled whenever the test itself is.
4389        #[cfg(test)]
4390        {
4391            assert!(manager.is_valid_local_address(&"127.0.0.1:8080".parse().unwrap()));
4392            assert!(manager.is_valid_local_address(&"[::1]:8080".parse().unwrap()));
4393        }
4394    }
4395
4396    #[test]
        /// Pins which socket addresses `is_valid_server_reflexive_address` accepts
        /// and rejects: only public IPv4 and global unicast IPv6 addresses with a
        /// non-zero port are expected to pass.
4397    fn test_is_valid_server_reflexive_address() {
4398        let manager = create_test_manager();
4399
4400        // Valid public IPv4 addresses
4401        assert!(manager.is_valid_server_reflexive_address(&"8.8.8.8:8080".parse().unwrap()));
4402        assert!(manager.is_valid_server_reflexive_address(&"1.1.1.1:53".parse().unwrap()));
4403        assert!(manager.is_valid_server_reflexive_address(&"35.235.1.100:443".parse().unwrap()));
4404
4405        // Valid global unicast IPv6
4406        assert!(
4407            manager
4408                .is_valid_server_reflexive_address(&"[2001:4860:4860::8888]:8080".parse().unwrap())
4409        );
4410        assert!(manager.is_valid_server_reflexive_address(
4411            &"[2400:cb00:2048::c629:d7a2]:443".parse().unwrap()
4412        ));
4413
4414        // Invalid - private addresses
4415        assert!(!manager.is_valid_server_reflexive_address(&"192.168.1.1:8080".parse().unwrap()));
4416        assert!(!manager.is_valid_server_reflexive_address(&"10.0.0.1:8080".parse().unwrap()));
4417        assert!(!manager.is_valid_server_reflexive_address(&"172.16.0.1:8080".parse().unwrap()));
4418
4419        // Invalid - special addresses (loopback, link-local, unspecified, broadcast, multicast)
4420        assert!(!manager.is_valid_server_reflexive_address(&"127.0.0.1:8080".parse().unwrap()));
4421        assert!(!manager.is_valid_server_reflexive_address(&"169.254.1.1:8080".parse().unwrap())); // Link-local
4422        assert!(!manager.is_valid_server_reflexive_address(&"0.0.0.0:8080".parse().unwrap()));
4423        assert!(
4424            !manager.is_valid_server_reflexive_address(&"255.255.255.255:8080".parse().unwrap())
4425        );
4426        assert!(!manager.is_valid_server_reflexive_address(&"224.0.0.1:8080".parse().unwrap()));
4427
4428        // Invalid - IPv6 non-global
4429        assert!(!manager.is_valid_server_reflexive_address(&"[::1]:8080".parse().unwrap()));
4430        assert!(!manager.is_valid_server_reflexive_address(&"[fe80::1]:8080".parse().unwrap())); // Link-local
4431        assert!(!manager.is_valid_server_reflexive_address(&"[fc00::1]:8080".parse().unwrap())); // Unique local
4432        assert!(!manager.is_valid_server_reflexive_address(&"[ff02::1]:8080".parse().unwrap())); // Multicast
4433
4434        // Port 0 should fail even on an otherwise accepted address
4435        assert!(!manager.is_valid_server_reflexive_address(&"8.8.8.8:0".parse().unwrap()));
4436    }
4437
4438    // Note: is_valid_port is a private method, so we test it indirectly through the validation flow
4439
4440    #[test]
        /// Runs `is_valid_server_reflexive_address` over two string tables: one of
        /// addresses that must be rejected and one of addresses that must be
        /// accepted. The offending address string is included in each failure message.
4441    fn test_validation_rejects_invalid_addresses() {
4442        let manager = create_test_manager();
4443
4444        // Test the validation methods directly
4445
4446        // Invalid server reflexive addresses that should be rejected
4447        let invalid_server_reflexive = vec![
4448            "0.0.0.0:8080",         // Unspecified
4449            "255.255.255.255:8080", // Broadcast
4450            "224.0.0.1:8080",       // Multicast
4451            "192.168.1.1:0",        // Port 0 (the address is also a private one)
4452            "127.0.0.1:8080",       // Loopback (invalid for server reflexive)
4453            "10.0.0.1:8080",        // Private (invalid for server reflexive)
4454            "[::]:8080",            // IPv6 unspecified
4455            "[fe80::1]:8080",       // IPv6 link-local (invalid for server reflexive)
4456        ];
4457
4458        for addr_str in invalid_server_reflexive {
4459            let addr: SocketAddr = addr_str.parse().unwrap();
4460            assert!(
4461                !manager.is_valid_server_reflexive_address(&addr),
4462                "Address {} should be invalid for server reflexive",
4463                addr_str
4464            );
4465        }
4466
4467        // Valid server reflexive addresses
4468        let valid_server_reflexive = vec![
4469            "8.8.8.8:8080",
4470            "1.1.1.1:53",
4471            "35.235.1.100:443",
4472            "[2001:4860:4860::8888]:8080",
4473        ];
4474
4475        for addr_str in valid_server_reflexive {
4476            let addr: SocketAddr = addr_str.parse().unwrap();
4477            assert!(
4478                manager.is_valid_server_reflexive_address(&addr),
4479                "Address {} should be valid for server reflexive",
4480                addr_str
4481            );
4482        }
4483    }
4484
4485    #[test]
        /// Checks that `CandidateAddress::validate_address` returns the expected
        /// `CandidateValidationError` variant for one representative address of each
        /// rejected kind. No manager is needed; the function is called directly.
4486    fn test_candidate_validation_error_types() {
4487        use crate::nat_traversal_api::{CandidateAddress, CandidateValidationError};
4488
4489        // Test specific error types
            // Port 0: the rejected port number is carried in the error variant
4490        assert!(matches!(
4491            CandidateAddress::validate_address(&"192.168.1.1:0".parse().unwrap()),
4492            Err(CandidateValidationError::InvalidPort(0))
4493        ));
4494
            // IPv4 unspecified address
4495        assert!(matches!(
4496            CandidateAddress::validate_address(&"0.0.0.0:8080".parse().unwrap()),
4497            Err(CandidateValidationError::UnspecifiedAddress)
4498        ));
4499
            // IPv4 broadcast address
4500        assert!(matches!(
4501            CandidateAddress::validate_address(&"255.255.255.255:8080".parse().unwrap()),
4502            Err(CandidateValidationError::BroadcastAddress)
4503        ));
4504
            // IPv4 multicast address
4505        assert!(matches!(
4506            CandidateAddress::validate_address(&"224.0.0.1:8080".parse().unwrap()),
4507            Err(CandidateValidationError::MulticastAddress)
4508        ));
4509
            // IPv4 address from the 240.0.0.0 range, expected to be reported as reserved
4510        assert!(matches!(
4511            CandidateAddress::validate_address(&"240.0.0.1:8080".parse().unwrap()),
4512            Err(CandidateValidationError::ReservedAddress)
4513        ));
4514
            // IPv6 address from the 2001:db8:: prefix, expected to be reported as documentation
4515        assert!(matches!(
4516            CandidateAddress::validate_address(&"[2001:db8::1]:8080".parse().unwrap()),
4517            Err(CandidateValidationError::DocumentationAddress)
4518        ));
4519
            // IPv4-mapped IPv6 address (::ffff:a.b.c.d form)
4520        assert!(matches!(
4521            CandidateAddress::validate_address(&"[::ffff:192.168.1.1]:8080".parse().unwrap()),
4522            Err(CandidateValidationError::IPv4MappedAddress)
4523        ));
4524    }
4525}