1use std::{
10 collections::{HashMap, VecDeque},
11 net::{IpAddr, SocketAddr},
12 time::{Duration, Instant},
13};
14
15use tracing::{debug, info, warn};
16
17#[cfg(feature = "production-ready")]
18use crate::Connection;
19
20use crate::{
21 connection::nat_traversal::{CandidateSource, CandidateState, NatTraversalRole},
22 nat_traversal_api::{BootstrapNode, CandidateAddress, PeerId},
23};
24
25#[cfg(target_os = "windows")]
27pub mod windows;
28
29#[cfg(target_os = "windows")]
30pub use windows::WindowsInterfaceDiscovery;
31
32#[cfg(target_os = "linux")]
33pub mod linux;
34
35#[cfg(target_os = "linux")]
36pub use linux::LinuxInterfaceDiscovery;
37
38#[cfg(target_os = "macos")]
39pub(crate) mod macos;
40
41#[cfg(target_os = "macos")]
42pub(crate) use macos::MacOSInterfaceDiscovery;
43
44fn convert_to_nat_source(discovery_source: DiscoverySourceType) -> CandidateSource {
46 match discovery_source {
47 DiscoverySourceType::Local => CandidateSource::Local,
48 DiscoverySourceType::ServerReflexive => CandidateSource::Observed { by_node: None },
49 DiscoverySourceType::Predicted => CandidateSource::Predicted,
50 }
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55pub enum DiscoverySourceType {
56 Local,
57 ServerReflexive,
58 Predicted,
59}
60
61#[derive(Debug, Clone)]
63pub(crate) struct DiscoveryCandidate {
64 pub address: SocketAddr,
65 pub priority: u32,
66 pub source: DiscoverySourceType,
67 pub state: CandidateState,
68}
69
70impl DiscoveryCandidate {
71 pub(crate) fn to_candidate_address(&self) -> CandidateAddress {
73 CandidateAddress {
74 address: self.address,
75 priority: self.priority,
76 source: convert_to_nat_source(self.source),
77 state: self.state,
78 }
79 }
80}
81
82pub struct CandidateDiscoveryManager {
84 current_phase: DiscoveryPhase,
86 config: DiscoveryConfig,
88 interface_discovery: Box<dyn NetworkInterfaceDiscovery + Send>,
90 server_reflexive_discovery: ServerReflexiveDiscovery,
92 symmetric_predictor: SymmetricNatPredictor,
94 bootstrap_manager: BootstrapNodeManager,
96 cache: DiscoveryCache,
98 session_state: DiscoverySessionState,
100}
101
102#[derive(Debug, Clone)]
104pub struct DiscoveryConfig {
105 pub total_timeout: Duration,
107 pub local_scan_timeout: Duration,
109 pub bootstrap_query_timeout: Duration,
111 pub max_query_retries: u32,
113 pub max_candidates: usize,
115 pub enable_symmetric_prediction: bool,
117 pub min_bootstrap_consensus: usize,
119 pub interface_cache_ttl: Duration,
121 pub server_reflexive_cache_ttl: Duration,
123}
124
125#[derive(Debug, Clone, PartialEq)]
127pub enum DiscoveryPhase {
128 Idle,
130 LocalInterfaceScanning {
132 started_at: Instant,
133 },
134 ServerReflexiveQuerying {
136 started_at: Instant,
137 active_queries: HashMap<BootstrapNodeId, QueryState>,
138 responses_received: Vec<ServerReflexiveResponse>,
139 },
140 SymmetricNatPrediction {
142 started_at: Instant,
143 prediction_attempts: u32,
144 pattern_analysis: PatternAnalysisState,
145 },
146 CandidateValidation {
148 started_at: Instant,
149 validation_results: HashMap<CandidateId, ValidationResult>,
150 },
151 Completed {
153 final_candidates: Vec<ValidatedCandidate>,
154 completion_time: Instant,
155 },
156 Failed {
158 error: DiscoveryError,
159 failed_at: Instant,
160 fallback_options: Vec<FallbackStrategy>,
161 },
162}
163
164#[derive(Debug, Clone)]
166pub enum DiscoveryEvent {
167 DiscoveryStarted {
169 peer_id: PeerId,
170 bootstrap_count: usize,
171 },
172 LocalScanningStarted,
174 LocalCandidateDiscovered {
176 candidate: CandidateAddress,
177 },
178 LocalScanningCompleted {
180 candidate_count: usize,
181 duration: Duration,
182 },
183 ServerReflexiveDiscoveryStarted {
185 bootstrap_count: usize,
186 },
187 ServerReflexiveCandidateDiscovered {
189 candidate: CandidateAddress,
190 bootstrap_node: SocketAddr,
191 },
192 BootstrapQueryFailed {
194 bootstrap_node: SocketAddr,
195 error: String,
196 },
197 SymmetricPredictionStarted {
199 base_address: SocketAddr,
200 },
201 PredictedCandidateGenerated {
203 candidate: CandidateAddress,
204 confidence: f64,
205 },
206 PortAllocationDetected {
208 port: u16,
209 source_address: SocketAddr,
210 bootstrap_node: BootstrapNodeId,
211 timestamp: Instant,
212 },
213 DiscoveryCompleted {
215 candidate_count: usize,
216 total_duration: Duration,
217 success_rate: f64,
218 },
219 DiscoveryFailed {
221 error: DiscoveryError,
222 partial_results: Vec<CandidateAddress>,
223 },
224}
225
226#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
228pub struct BootstrapNodeId(pub u64);
229
230#[derive(Debug, Clone, PartialEq, Eq)]
232pub enum QueryState {
233 Pending {
235 sent_at: Instant,
236 attempts: u32,
237 },
238 Completed,
240 Failed,
242}
243
244#[derive(Debug, Clone, PartialEq)]
246pub struct ServerReflexiveResponse {
247 pub bootstrap_node: BootstrapNodeId,
248 pub observed_address: SocketAddr,
249 pub response_time: Duration,
250 pub timestamp: Instant,
251}
252
253#[derive(Debug, Clone, PartialEq)]
255pub struct PatternAnalysisState {
256 pub allocation_history: VecDeque<PortAllocationEvent>,
257 pub detected_pattern: Option<PortAllocationPattern>,
258 pub confidence_level: f64,
259 pub prediction_accuracy: f64,
260}
261
262#[derive(Debug, Clone, PartialEq)]
264pub struct PortAllocationEvent {
265 pub port: u16,
266 pub timestamp: Instant,
267 pub source_address: SocketAddr,
268}
269
270#[derive(Debug, Clone, PartialEq)]
272pub struct PortAllocationPattern {
273 pub pattern_type: AllocationPatternType,
274 pub base_port: u16,
275 pub stride: u16,
276 pub pool_boundaries: Option<(u16, u16)>,
277 pub confidence: f64,
278}
279
280#[derive(Debug, Clone, PartialEq, Eq)]
282pub enum AllocationPatternType {
283 Sequential,
285 FixedStride,
287 Random,
289 PoolBased,
291 TimeBased,
293 Unknown,
295}
296
297#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
299pub struct CandidateId(pub u64);
300
301#[derive(Debug, Clone, PartialEq)]
303pub enum ValidationResult {
304 Valid { rtt: Duration },
305 Invalid { reason: String },
306 Timeout,
307 Pending,
308}
309
310#[derive(Debug, Clone, PartialEq)]
312pub struct ValidatedCandidate {
313 pub id: CandidateId,
314 pub address: SocketAddr,
315 pub source: DiscoverySourceType,
316 pub priority: u32,
317 pub rtt: Option<Duration>,
318 pub reliability_score: f64,
319}
320
321impl ValidatedCandidate {
322 pub fn to_candidate_address(&self) -> CandidateAddress {
324 CandidateAddress {
325 address: self.address,
326 priority: self.priority,
327 source: convert_to_nat_source(self.source),
328 state: CandidateState::Valid,
329 }
330 }
331}
332
333#[derive(Debug)]
335pub(crate) struct DiscoverySessionState {
336 pub peer_id: PeerId,
337 pub session_id: u64,
338 pub started_at: Instant,
339 pub discovered_candidates: Vec<DiscoveryCandidate>,
340 pub statistics: DiscoveryStatistics,
341 pub allocation_history: VecDeque<PortAllocationEvent>,
342}
343
344#[derive(Debug, Default, Clone)]
346pub struct DiscoveryStatistics {
347 pub local_candidates_found: u32,
348 pub server_reflexive_candidates_found: u32,
349 pub predicted_candidates_generated: u32,
350 pub bootstrap_queries_sent: u32,
351 pub bootstrap_queries_successful: u32,
352 pub total_discovery_time: Option<Duration>,
353 pub average_bootstrap_rtt: Option<Duration>,
354}
355
356#[derive(Debug, Clone, PartialEq, Eq)]
358pub enum DiscoveryError {
359 NoLocalInterfaces,
361 AllBootstrapsFailed,
363 DiscoveryTimeout,
365 InsufficientCandidates { found: usize, required: usize },
367 NetworkError(String),
369 ConfigurationError(String),
371 InternalError(String),
373}
374
375#[derive(Debug, Clone, PartialEq, Eq)]
377pub enum FallbackStrategy {
378 UseCachedResults,
380 RetryWithRelaxedParams,
382 UseMinimalCandidates,
384 EnableRelayFallback,
386}
387
388impl Default for DiscoveryConfig {
389 fn default() -> Self {
390 Self {
391 total_timeout: Duration::from_secs(30),
392 local_scan_timeout: Duration::from_secs(2),
393 bootstrap_query_timeout: Duration::from_secs(5),
394 max_query_retries: 3,
395 max_candidates: 8,
396 enable_symmetric_prediction: true,
397 min_bootstrap_consensus: 2,
398 interface_cache_ttl: Duration::from_secs(60),
399 server_reflexive_cache_ttl: Duration::from_secs(300),
400 }
401 }
402}
403
404impl CandidateDiscoveryManager {
405 pub fn new(config: DiscoveryConfig, _role: NatTraversalRole) -> Self {
407 let interface_discovery = create_platform_interface_discovery();
408 let server_reflexive_discovery = ServerReflexiveDiscovery::new(&config);
409 let symmetric_predictor = SymmetricNatPredictor::new(&config);
410 let bootstrap_manager = BootstrapNodeManager::new(&config);
411 let cache = DiscoveryCache::new(&config);
412
413 Self {
414 current_phase: DiscoveryPhase::Idle,
415 config,
416 interface_discovery,
417 server_reflexive_discovery,
418 symmetric_predictor,
419 bootstrap_manager,
420 cache,
421 session_state: DiscoverySessionState {
422 peer_id: PeerId([0; 32]), session_id: 0,
424 started_at: Instant::now(),
425 discovered_candidates: Vec::new(),
426 statistics: DiscoveryStatistics::default(),
427 allocation_history: VecDeque::new(),
428 },
429 }
430 }
431
432 pub fn start_discovery(&mut self, peer_id: PeerId, bootstrap_nodes: Vec<BootstrapNode>) -> Result<(), DiscoveryError> {
434 if !matches!(self.current_phase, DiscoveryPhase::Idle | DiscoveryPhase::Failed { .. } | DiscoveryPhase::Completed { .. }) {
435 return Err(DiscoveryError::InternalError("Discovery already in progress".to_string()));
436 }
437
438 info!("Starting candidate discovery for peer {:?}", peer_id);
439
440 self.session_state.peer_id = peer_id;
442 self.session_state.session_id = rand::random();
443 self.session_state.started_at = Instant::now();
444 self.session_state.discovered_candidates.clear();
445 self.session_state.statistics = DiscoveryStatistics::default();
446
447 self.bootstrap_manager.update_bootstrap_nodes(bootstrap_nodes);
449
450 self.current_phase = DiscoveryPhase::LocalInterfaceScanning {
452 started_at: Instant::now(),
453 };
454
455 Ok(())
456 }
457
458 pub fn poll(&mut self, now: Instant) -> Vec<DiscoveryEvent> {
460 let mut events = Vec::new();
461
462 if self.session_state.started_at.elapsed() > self.config.total_timeout {
464 self.handle_discovery_timeout(&mut events, now);
465 return events;
466 }
467
468 match &self.current_phase.clone() {
469 DiscoveryPhase::Idle => {
470 },
472
473 DiscoveryPhase::LocalInterfaceScanning { started_at } => {
474 self.poll_local_interface_scanning(*started_at, now, &mut events);
475 },
476
477 DiscoveryPhase::ServerReflexiveQuerying { started_at, active_queries, responses_received } => {
478 self.poll_server_reflexive_discovery(*started_at, active_queries, responses_received, now, &mut events);
479 },
480
481 DiscoveryPhase::SymmetricNatPrediction { started_at, prediction_attempts, pattern_analysis } => {
482 self.poll_symmetric_prediction(*started_at, *prediction_attempts, pattern_analysis, now, &mut events);
483 },
484
485 DiscoveryPhase::CandidateValidation { started_at, validation_results } => {
486 self.poll_candidate_validation(*started_at, validation_results, now, &mut events);
487 },
488
489 DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. } => {
490 },
492 }
493
494 events
495 }
496
497 pub fn get_status(&self) -> DiscoveryStatus {
499 DiscoveryStatus {
500 phase: self.current_phase.clone(),
501 discovered_candidates: self.session_state.discovered_candidates.iter()
502 .map(|c| c.to_candidate_address())
503 .collect(),
504 statistics: self.session_state.statistics.clone(),
505 elapsed_time: self.session_state.started_at.elapsed(),
506 }
507 }
508
509 pub fn is_complete(&self) -> bool {
511 matches!(self.current_phase, DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. })
512 }
513
514 pub fn get_results(&self) -> Option<DiscoveryResults> {
516 match &self.current_phase {
517 DiscoveryPhase::Completed { final_candidates, completion_time } => {
518 Some(DiscoveryResults {
519 candidates: final_candidates.clone(),
520 completion_time: *completion_time,
521 statistics: self.session_state.statistics.clone(),
522 })
523 },
524 DiscoveryPhase::Failed { .. } => {
525 Some(DiscoveryResults {
526 candidates: Vec::new(),
527 completion_time: Instant::now(),
528 statistics: self.session_state.statistics.clone(),
529 })
530 },
531 _ => None,
532 }
533 }
534
535 pub fn get_candidates_for_peer(&self, peer_id: PeerId) -> Vec<CandidateAddress> {
537 if self.session_state.peer_id == peer_id {
539 self.session_state.discovered_candidates.iter()
541 .map(|c| c.to_candidate_address())
542 .collect()
543 } else {
544 debug!("No candidates found for peer {:?} (current session is for {:?})",
547 peer_id, self.session_state.peer_id);
548 Vec::new()
549 }
550 }
551
552 fn poll_local_interface_scanning(&mut self, started_at: Instant, now: Instant, events: &mut Vec<DiscoveryEvent>) {
555 if started_at.elapsed() > self.config.local_scan_timeout {
557 warn!("Local interface scanning timeout");
558 self.handle_local_scan_timeout(events, now);
559 return;
560 }
561
562 if let Some(interfaces) = self.interface_discovery.check_scan_complete() {
564 self.process_local_interfaces(interfaces, events, now);
565 }
566 }
567
568 fn process_local_interfaces(&mut self, interfaces: Vec<NetworkInterface>, events: &mut Vec<DiscoveryEvent>, now: Instant) {
569 debug!("Processing {} network interfaces", interfaces.len());
570
571 for interface in interfaces {
572 for address in &interface.addresses {
573 if self.is_valid_local_address(&address) {
574 let candidate = DiscoveryCandidate {
575 address: *address,
576 priority: self.calculate_local_priority(address, &interface),
577 source: DiscoverySourceType::Local,
578 state: CandidateState::New,
579 };
580
581 self.session_state.discovered_candidates.push(candidate.clone());
582 self.session_state.statistics.local_candidates_found += 1;
583
584 events.push(DiscoveryEvent::LocalCandidateDiscovered {
585 candidate: candidate.to_candidate_address()
586 });
587 }
588 }
589 }
590
591 events.push(DiscoveryEvent::LocalScanningCompleted {
592 candidate_count: self.session_state.statistics.local_candidates_found as usize,
593 duration: now.duration_since(self.session_state.started_at),
594 });
595
596 self.start_server_reflexive_discovery(events, now);
598 }
599
600 fn start_server_reflexive_discovery(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
601 let bootstrap_node_ids = self.bootstrap_manager.get_active_bootstrap_nodes();
602
603 if bootstrap_node_ids.is_empty() {
604 warn!("No bootstrap nodes available for server reflexive discovery");
605 self.handle_no_bootstrap_nodes(events, now);
606 return;
607 }
608
609 let bootstrap_nodes_with_addresses: Vec<(BootstrapNodeId, SocketAddr)> = bootstrap_node_ids
611 .iter()
612 .filter_map(|&node_id| {
613 self.bootstrap_manager.get_bootstrap_address(node_id)
614 .map(|addr| (node_id, addr))
615 })
616 .collect();
617
618 if bootstrap_nodes_with_addresses.is_empty() {
619 warn!("No bootstrap node addresses available for server reflexive discovery");
620 self.handle_no_bootstrap_nodes(events, now);
621 return;
622 }
623
624 let active_queries = self.server_reflexive_discovery.start_queries_with_addresses(&bootstrap_nodes_with_addresses, now);
626
627 events.push(DiscoveryEvent::ServerReflexiveDiscoveryStarted {
628 bootstrap_count: bootstrap_nodes_with_addresses.len(),
629 });
630
631 self.current_phase = DiscoveryPhase::ServerReflexiveQuerying {
632 started_at: now,
633 active_queries,
634 responses_received: Vec::new(),
635 };
636 }
637
638 fn poll_server_reflexive_discovery(
639 &mut self,
640 started_at: Instant,
641 active_queries: &HashMap<BootstrapNodeId, QueryState>,
642 responses_received: &Vec<ServerReflexiveResponse>,
643 now: Instant,
644 events: &mut Vec<DiscoveryEvent>
645 ) {
646 let new_responses = self.server_reflexive_discovery.poll_queries(active_queries, now);
648
649 let mut updated_responses = responses_received.clone();
650 for response in new_responses {
651 self.process_server_reflexive_response(&response, events);
652 updated_responses.push(response);
653 }
654
655 if self.should_transition_to_prediction(&updated_responses, now) {
657 self.start_symmetric_prediction(&updated_responses, events, now);
658 } else if started_at.elapsed() > self.config.bootstrap_query_timeout * 2 {
659 if updated_responses.len() >= self.config.min_bootstrap_consensus {
661 self.start_symmetric_prediction(&updated_responses, events, now);
662 } else {
663 self.handle_insufficient_bootstrap_responses(events, now);
664 }
665 } else {
666 self.current_phase = DiscoveryPhase::ServerReflexiveQuerying {
668 started_at,
669 active_queries: active_queries.clone(),
670 responses_received: updated_responses,
671 };
672 }
673 }
674
675 fn process_server_reflexive_response(&mut self, response: &ServerReflexiveResponse, events: &mut Vec<DiscoveryEvent>) {
676 debug!("Received server reflexive response: {:?}", response);
677
678 let allocation_event = PortAllocationEvent {
680 port: response.observed_address.port(),
681 timestamp: response.timestamp,
682 source_address: response.observed_address,
683 };
684
685 if let DiscoveryPhase::ServerReflexiveQuerying { .. } = &mut self.current_phase {
687 self.session_state.allocation_history.push_back(allocation_event.clone());
690
691 if self.session_state.allocation_history.len() > 20 {
693 self.session_state.allocation_history.pop_front();
694 }
695 }
696
697 let candidate = DiscoveryCandidate {
698 address: response.observed_address,
699 priority: self.calculate_server_reflexive_priority(response),
700 source: DiscoverySourceType::ServerReflexive,
701 state: CandidateState::New,
702 };
703
704 self.session_state.discovered_candidates.push(candidate.clone());
705 self.session_state.statistics.server_reflexive_candidates_found += 1;
706
707 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
708 candidate: candidate.to_candidate_address(),
709 bootstrap_node: self.bootstrap_manager.get_bootstrap_address(response.bootstrap_node).unwrap_or_else(|| "unknown".parse().unwrap()),
710 });
711
712 events.push(DiscoveryEvent::PortAllocationDetected {
713 port: allocation_event.port,
714 source_address: allocation_event.source_address,
715 bootstrap_node: response.bootstrap_node,
716 timestamp: allocation_event.timestamp,
717 });
718 }
719
720 fn start_symmetric_prediction(&mut self, responses: &[ServerReflexiveResponse], events: &mut Vec<DiscoveryEvent>, now: Instant) {
721 if !self.config.enable_symmetric_prediction || responses.is_empty() {
722 self.start_candidate_validation(events, now);
723 return;
724 }
725
726 let base_address = self.calculate_consensus_address(responses);
728
729 events.push(DiscoveryEvent::SymmetricPredictionStarted { base_address });
730
731 let detected_pattern = self.symmetric_predictor.analyze_allocation_patterns(&self.session_state.allocation_history);
733
734 let confidence_level = detected_pattern.as_ref().map(|p| p.confidence).unwrap_or(0.0);
735
736 let prediction_accuracy = if let Some(ref pattern) = detected_pattern {
738 self.calculate_prediction_accuracy(pattern, &self.session_state.allocation_history)
739 } else {
740 0.3 };
742
743 debug!("Symmetric NAT pattern analysis: detected_pattern={:?}, confidence={:.2}, accuracy={:.2}",
744 detected_pattern, confidence_level, prediction_accuracy);
745
746 self.current_phase = DiscoveryPhase::SymmetricNatPrediction {
747 started_at: now,
748 prediction_attempts: 0,
749 pattern_analysis: PatternAnalysisState {
750 allocation_history: self.session_state.allocation_history.clone(),
751 detected_pattern,
752 confidence_level,
753 prediction_accuracy,
754 },
755 };
756 }
757
758 fn poll_symmetric_prediction(
759 &mut self,
760 _started_at: Instant,
761 _prediction_attempts: u32,
762 pattern_analysis: &PatternAnalysisState,
763 now: Instant,
764 events: &mut Vec<DiscoveryEvent>
765 ) {
766 let predicted_candidates = self.symmetric_predictor.generate_predictions(pattern_analysis, self.config.max_candidates - self.session_state.discovered_candidates.len());
768
769 for candidate in predicted_candidates {
770 self.session_state.discovered_candidates.push(candidate.clone());
771 self.session_state.statistics.predicted_candidates_generated += 1;
772
773 events.push(DiscoveryEvent::PredictedCandidateGenerated {
774 candidate: candidate.to_candidate_address(),
775 confidence: pattern_analysis.confidence_level,
776 });
777 }
778
779 self.start_candidate_validation(events, now);
781 }
782
783 fn start_candidate_validation(&mut self, _events: &mut Vec<DiscoveryEvent>, now: Instant) {
784 debug!("Starting candidate validation for {} candidates", self.session_state.discovered_candidates.len());
785
786 self.current_phase = DiscoveryPhase::CandidateValidation {
787 started_at: now,
788 validation_results: HashMap::new(),
789 };
790 }
791
792 fn poll_candidate_validation(
793 &mut self,
794 started_at: Instant,
795 validation_results: &HashMap<CandidateId, ValidationResult>,
796 now: Instant,
797 events: &mut Vec<DiscoveryEvent>
798 ) {
799 if started_at.elapsed() > Duration::from_secs(10) {
801 self.complete_validation_with_results(validation_results, events, now);
803 return;
804 }
805
806 let mut updated_results = validation_results.clone();
808 let mut validation_started = false;
809
810 let candidates_to_validate: Vec<(CandidateId, SocketAddr)> = self.session_state
812 .discovered_candidates
813 .iter()
814 .enumerate()
815 .filter_map(|(i, candidate)| {
816 let candidate_id = CandidateId(i as u64);
817 if !updated_results.contains_key(&candidate_id) {
818 Some((candidate_id, candidate.address))
819 } else {
820 None
821 }
822 })
823 .collect();
824
825 for (candidate_id, address) in candidates_to_validate {
827 updated_results.insert(candidate_id, ValidationResult::Pending);
828 validation_started = true;
829
830 debug!("Starting validation for candidate {}: {}", candidate_id.0, address);
831
832 #[cfg(feature = "production-ready")]
833 {
834 self.start_path_validation(candidate_id, address, now);
836 }
837
838 #[cfg(not(feature = "production-ready"))]
839 {
840 self.simulate_path_validation(candidate_id, address, now);
842 }
843 }
844
845 let completed_validations = self.check_validation_completions(&updated_results, now);
847 for (candidate_id, result) in completed_validations {
848 updated_results.insert(candidate_id, result);
849 }
850
851 let all_complete = updated_results.values().all(|result| {
853 !matches!(result, ValidationResult::Pending)
854 });
855
856 if all_complete || validation_started {
857 self.current_phase = DiscoveryPhase::CandidateValidation {
859 started_at,
860 validation_results: updated_results.clone(),
861 };
862 }
863
864 if all_complete {
865 self.complete_validation_with_results(&updated_results, events, now);
866 }
867 }
868
869 #[cfg(feature = "production-ready")]
871 fn start_path_validation(&mut self, candidate_id: CandidateId, candidate_address: SocketAddr, _now: Instant) {
872 debug!("Starting QUIC path validation for candidate {} at {}", candidate_id.0, candidate_address);
873
874 self.simulate_path_validation(candidate_id, candidate_address, _now);
883 }
884
885 fn simulate_path_validation(&mut self, candidate_id: CandidateId, candidate_address: SocketAddr, _now: Instant) {
887 let is_local = candidate_address.ip().is_loopback() ||
889 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("192.168.")) ||
890 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("10.")) ||
891 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("172."));
892
893 let is_server_reflexive = !is_local && !candidate_address.ip().is_unspecified();
894
895 debug!("Simulated path validation for candidate {} at {} - local: {}, server_reflexive: {}",
898 candidate_id.0, candidate_address, is_local, is_server_reflexive);
899 }
900
901 fn check_validation_completions(&self, current_results: &HashMap<CandidateId, ValidationResult>, _now: Instant) -> Vec<(CandidateId, ValidationResult)> {
903 let mut completions = Vec::new();
904
905 for (candidate_id, result) in current_results {
906 if matches!(result, ValidationResult::Pending) {
907 if let Some(candidate) = self.session_state.discovered_candidates.get(candidate_id.0 as usize) {
909 let validation_result = self.simulate_validation_result(&candidate.address);
910 completions.push((*candidate_id, validation_result));
911 }
912 }
913 }
914
915 completions
916 }
917
918 fn simulate_validation_result(&self, address: &SocketAddr) -> ValidationResult {
920 let is_local = address.ip().is_loopback() ||
921 (address.ip().is_ipv4() && address.ip().to_string().starts_with("192.168.")) ||
922 (address.ip().is_ipv4() && address.ip().to_string().starts_with("10.")) ||
923 (address.ip().is_ipv4() && address.ip().to_string().starts_with("172."));
924
925 if is_local {
926 ValidationResult::Valid { rtt: Duration::from_millis(1) }
928 } else if address.ip().is_unspecified() {
929 ValidationResult::Invalid { reason: "Unspecified address".to_string() }
931 } else {
932 ValidationResult::Valid { rtt: Duration::from_millis(50 + (address.port() % 100) as u64) }
934 }
935 }
936
937 fn complete_validation_with_results(
939 &mut self,
940 validation_results: &HashMap<CandidateId, ValidationResult>,
941 events: &mut Vec<DiscoveryEvent>,
942 now: Instant
943 ) {
944 let validated_candidates: Vec<ValidatedCandidate> = self.session_state.discovered_candidates
945 .iter()
946 .enumerate()
947 .filter_map(|(i, candidate)| {
948 let candidate_id = CandidateId(i as u64);
949 validation_results.get(&candidate_id).and_then(|result| {
950 match result {
951 ValidationResult::Valid { rtt } => {
952 Some(ValidatedCandidate {
953 id: candidate_id,
954 address: candidate.address,
955 source: candidate.source,
956 priority: candidate.priority,
957 rtt: Some(*rtt),
958 reliability_score: self.calculate_reliability_score(candidate, *rtt),
959 })
960 }
961 ValidationResult::Invalid { reason } => {
962 debug!("Candidate {} at {} failed validation: {}",
963 candidate_id.0, candidate.address, reason);
964 None
965 }
966 ValidationResult::Timeout => {
967 debug!("Candidate {} at {} validation timed out",
968 candidate_id.0, candidate.address);
969 None
970 }
971 ValidationResult::Pending => {
972 debug!("Candidate {} at {} validation still pending, treating as timeout",
974 candidate_id.0, candidate.address);
975 None
976 }
977 }
978 })
979 })
980 .collect();
981
982 debug!("Validation completed: {} valid candidates out of {} total",
983 validated_candidates.len(), self.session_state.discovered_candidates.len());
984
985 self.complete_discovery(validated_candidates, events, now);
986 }
987
988 fn calculate_reliability_score(&self, candidate: &DiscoveryCandidate, rtt: Duration) -> f64 {
990 let mut score: f64 = 0.5; match candidate.source {
994 DiscoverySourceType::Local => score += 0.3, DiscoverySourceType::ServerReflexive => score += 0.2, DiscoverySourceType::Predicted => score += 0.1, }
998
999 let rtt_ms = rtt.as_millis() as f64;
1001 if rtt_ms < 10.0 {
1002 score += 0.2;
1003 } else if rtt_ms < 50.0 {
1004 score += 0.1;
1005 } else if rtt_ms > 200.0 {
1006 score -= 0.1;
1007 }
1008
1009 if candidate.address.ip().is_ipv6() {
1011 score += 0.05; }
1013
1014 score.max(0.0).min(1.0)
1016 }
1017
1018 fn complete_discovery(&mut self, candidates: Vec<ValidatedCandidate>, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1019 let total_duration = now.duration_since(self.session_state.started_at);
1020 self.session_state.statistics.total_discovery_time = Some(total_duration);
1021
1022 let success_rate = if self.session_state.statistics.bootstrap_queries_sent > 0 {
1023 self.session_state.statistics.bootstrap_queries_successful as f64 / self.session_state.statistics.bootstrap_queries_sent as f64
1024 } else {
1025 1.0
1026 };
1027
1028 events.push(DiscoveryEvent::DiscoveryCompleted {
1029 candidate_count: candidates.len(),
1030 total_duration,
1031 success_rate,
1032 });
1033
1034 self.current_phase = DiscoveryPhase::Completed {
1035 final_candidates: candidates,
1036 completion_time: now,
1037 };
1038
1039 info!("Candidate discovery completed successfully in {:?}", total_duration);
1040 }
1041
1042 fn handle_discovery_timeout(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1045 let error = DiscoveryError::DiscoveryTimeout;
1046 events.push(DiscoveryEvent::DiscoveryFailed {
1047 error: error.clone(),
1048 partial_results: self.session_state.discovered_candidates.iter()
1049 .map(|c| c.to_candidate_address())
1050 .collect(),
1051 });
1052
1053 self.current_phase = DiscoveryPhase::Failed {
1054 error,
1055 failed_at: now,
1056 fallback_options: vec![FallbackStrategy::UseCachedResults, FallbackStrategy::UseMinimalCandidates],
1057 };
1058 }
1059
1060 fn handle_local_scan_timeout(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1061 warn!("Local interface scan timeout, proceeding with available candidates");
1062
1063 events.push(DiscoveryEvent::LocalScanningCompleted {
1064 candidate_count: self.session_state.statistics.local_candidates_found as usize,
1065 duration: now.duration_since(self.session_state.started_at),
1066 });
1067
1068 self.start_server_reflexive_discovery(events, now);
1069 }
1070
1071 fn handle_no_bootstrap_nodes(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1072 let error = DiscoveryError::AllBootstrapsFailed;
1073 events.push(DiscoveryEvent::DiscoveryFailed {
1074 error: error.clone(),
1075 partial_results: self.session_state.discovered_candidates.iter()
1076 .map(|c| c.to_candidate_address())
1077 .collect(),
1078 });
1079
1080 self.current_phase = DiscoveryPhase::Failed {
1081 error,
1082 failed_at: now,
1083 fallback_options: vec![FallbackStrategy::UseMinimalCandidates],
1084 };
1085 }
1086
1087 fn handle_insufficient_bootstrap_responses(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1088 warn!("Insufficient bootstrap responses, proceeding with available data");
1089 self.start_candidate_validation(events, now);
1090 }
1091
1092 fn is_valid_local_address(&self, address: &SocketAddr) -> bool {
1093 match address.ip() {
1094 IpAddr::V4(ipv4) => !ipv4.is_loopback() && !ipv4.is_unspecified(),
1095 IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
1096 }
1097 }
1098
1099 fn calculate_local_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
1100 let mut priority = 100; match address.ip() {
1103 IpAddr::V4(ipv4) => {
1104 if ipv4.is_private() {
1105 priority += 50; }
1107 },
1108 IpAddr::V6(ipv6) => {
1109 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
1112 let segments = ipv6.segments();
1113 if segments[0] & 0xE000 == 0x2000 {
1114 priority += 60;
1116 } else if segments[0] & 0xFFC0 == 0xFE80 {
1117 priority += 20;
1119 } else if segments[0] & 0xFE00 == 0xFC00 {
1120 priority += 40;
1122 } else {
1123 priority += 30;
1125 }
1126 }
1127
1128 priority += 10; },
1131 }
1132
1133 if interface.is_wireless {
1134 priority -= 10; }
1136
1137 priority
1138 }
1139
1140 fn calculate_server_reflexive_priority(&self, response: &ServerReflexiveResponse) -> u32 {
1141 let mut priority = 200; if response.response_time < Duration::from_millis(50) {
1145 priority += 20;
1146 } else if response.response_time > Duration::from_millis(200) {
1147 priority -= 10;
1148 }
1149
1150 let age_bonus = if response.timestamp.elapsed().as_secs() < 60 { 20 } else { 0 };
1152 priority += age_bonus;
1153
1154 priority
1155 }
1156
1157 fn should_transition_to_prediction(&self, responses: &[ServerReflexiveResponse], _now: Instant) -> bool {
1158 responses.len() >= self.config.min_bootstrap_consensus.max(1)
1159 }
1160
1161 fn calculate_consensus_address(&self, responses: &[ServerReflexiveResponse]) -> SocketAddr {
1162 let mut address_counts: HashMap<SocketAddr, usize> = HashMap::new();
1164
1165 for response in responses {
1166 *address_counts.entry(response.observed_address).or_insert(0) += 1;
1167 }
1168
1169 address_counts
1170 .into_iter()
1171 .max_by_key(|(_, count)| *count)
1172 .map(|(addr, _)| addr)
1173 .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
1174 }
1175
1176 fn calculate_prediction_accuracy(&self, pattern: &PortAllocationPattern, history: &VecDeque<PortAllocationEvent>) -> f64 {
1178 if history.len() < 3 {
1179 return 0.3; }
1181
1182 let recent_ports: Vec<u16> = history.iter()
1184 .rev()
1185 .take(10)
1186 .map(|event| event.port)
1187 .collect();
1188
1189 let mut correct_predictions = 0;
1190 let total_predictions = recent_ports.len().saturating_sub(1);
1191
1192 if total_predictions == 0 {
1193 return 0.3;
1194 }
1195
1196 match pattern.pattern_type {
1197 AllocationPatternType::Sequential => {
1198 for i in 1..recent_ports.len() {
1200 if recent_ports[i-1].wrapping_sub(recent_ports[i]) == 1 {
1201 correct_predictions += 1;
1202 }
1203 }
1204 }
1205 AllocationPatternType::FixedStride => {
1206 for i in 1..recent_ports.len() {
1208 if recent_ports[i-1].wrapping_sub(recent_ports[i]) == pattern.stride {
1209 correct_predictions += 1;
1210 }
1211 }
1212 }
1213 AllocationPatternType::PoolBased => {
1214 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1216 for port in &recent_ports {
1217 if *port >= min_port && *port <= max_port {
1218 correct_predictions += 1;
1219 }
1220 }
1221 }
1222 }
1223 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1224 if recent_ports.len() >= 3 {
1226 let mean = recent_ports.iter().map(|&p| p as f64).sum::<f64>() / recent_ports.len() as f64;
1227 let variance = recent_ports.iter()
1228 .map(|&p| (p as f64 - mean).powi(2))
1229 .sum::<f64>() / recent_ports.len() as f64;
1230
1231 let normalized_variance = (variance / 10000.0).min(1.0); return 0.2 + (1.0 - normalized_variance) * 0.3; }
1235 }
1236 AllocationPatternType::TimeBased => {
1237 if history.len() >= 2 {
1239 let time_diffs: Vec<Duration> = history.iter()
1240 .collect::<Vec<_>>()
1241 .windows(2)
1242 .map(|w| w[1].timestamp.duration_since(w[0].timestamp))
1243 .collect();
1244
1245 if !time_diffs.is_empty() {
1246 let avg_diff = time_diffs.iter().sum::<Duration>() / time_diffs.len() as u32;
1247 let variance = time_diffs.iter()
1248 .map(|d| d.as_millis().abs_diff(avg_diff.as_millis()) as f64)
1249 .sum::<f64>() / time_diffs.len() as f64;
1250
1251 let normalized_variance = (variance / 1000.0).min(1.0); return 0.3 + (1.0 - normalized_variance) * 0.4; }
1255 }
1256 }
1257 }
1258
1259 let accuracy = if total_predictions > 0 {
1261 correct_predictions as f64 / total_predictions as f64
1262 } else {
1263 0.3
1264 };
1265
1266 let confidence_adjusted_accuracy = accuracy * pattern.confidence;
1268
1269 confidence_adjusted_accuracy.max(0.2).min(0.9)
1271 }
1272}
1273
1274#[derive(Debug, Clone)]
1276pub struct DiscoveryStatus {
1277 pub phase: DiscoveryPhase,
1278 pub discovered_candidates: Vec<CandidateAddress>,
1279 pub statistics: DiscoveryStatistics,
1280 pub elapsed_time: Duration,
1281}
1282
1283#[derive(Debug, Clone)]
1285pub struct DiscoveryResults {
1286 pub candidates: Vec<ValidatedCandidate>,
1287 pub completion_time: Instant,
1288 pub statistics: DiscoveryStatistics,
1289}
1290
1291pub trait NetworkInterfaceDiscovery {
1295 fn start_scan(&mut self) -> Result<(), String>;
1296 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>>;
1297}
1298
1299#[derive(Debug, Clone, PartialEq)]
1301pub struct NetworkInterface {
1302 pub name: String,
1303 pub addresses: Vec<SocketAddr>,
1304 pub is_up: bool,
1305 pub is_wireless: bool,
1306 pub mtu: Option<u16>,
1307}
1308
1309#[cfg(feature = "production-ready")]
1311#[derive(Debug)]
1312struct BootstrapConnection {
1313 connection: crate::Connection,
1315 address: SocketAddr,
1317 established_at: Instant,
1319 request_id: u64,
1321}
1322
1323#[derive(Debug, Clone)]
1325struct AddressObservationRequest {
1326 request_id: u64,
1328 timestamp: u64,
1330 capabilities: u32,
1332}
1333
1334#[derive(Debug)]
1336pub(crate) struct ServerReflexiveDiscovery {
1337 config: DiscoveryConfig,
1338 active_queries: HashMap<BootstrapNodeId, QueryState>,
1340 responses: VecDeque<ServerReflexiveResponse>,
1342 query_timeouts: HashMap<BootstrapNodeId, Instant>,
1344 #[cfg(feature = "production-ready")]
1346 active_connections: HashMap<BootstrapNodeId, BootstrapConnection>,
1347 #[cfg(feature = "production-ready")]
1349 runtime_handle: Option<tokio::runtime::Handle>,
1350}
1351
1352impl ServerReflexiveDiscovery {
1353 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
1354 Self {
1355 config: config.clone(),
1356 active_queries: HashMap::new(),
1357 responses: VecDeque::new(),
1358 query_timeouts: HashMap::new(),
1359 #[cfg(feature = "production-ready")]
1360 active_connections: HashMap::new(),
1361 #[cfg(feature = "production-ready")]
1362 runtime_handle: tokio::runtime::Handle::try_current().ok(),
1363 }
1364 }
1365
1366 pub(crate) fn start_queries(&mut self, bootstrap_nodes: &[BootstrapNodeId], now: Instant) -> HashMap<BootstrapNodeId, QueryState> {
1367 debug!("Starting server reflexive queries to {} bootstrap nodes", bootstrap_nodes.len());
1368
1369 self.active_queries.clear();
1370 self.query_timeouts.clear();
1371
1372 #[cfg(feature = "production-ready")]
1373 self.active_connections.clear();
1374
1375 for &node_id in bootstrap_nodes {
1376 let query_state = QueryState::Pending {
1377 sent_at: now,
1378 attempts: 1,
1379 };
1380
1381 self.active_queries.insert(node_id, query_state);
1382 self.query_timeouts.insert(node_id, now + self.config.bootstrap_query_timeout);
1383
1384 debug!("Starting server reflexive query to bootstrap node {:?}", node_id);
1385
1386 #[cfg(feature = "production-ready")]
1387 {
1388 if let Some(runtime) = &self.runtime_handle {
1390 self.start_quinn_query(node_id, runtime.clone(), now);
1391 } else {
1392 warn!("No async runtime available, falling back to simulation for node {:?}", node_id);
1393 self.simulate_bootstrap_response(node_id, now);
1394 }
1395 }
1396
1397 #[cfg(not(feature = "production-ready"))]
1398 {
1399 self.simulate_bootstrap_response(node_id, now);
1401 }
1402 }
1403
1404 self.active_queries.clone()
1405 }
1406
1407 pub(crate) fn start_queries_with_addresses(
1409 &mut self,
1410 bootstrap_nodes: &[(BootstrapNodeId, SocketAddr)],
1411 now: Instant
1412 ) -> HashMap<BootstrapNodeId, QueryState> {
1413 debug!("Starting server reflexive queries to {} bootstrap nodes with addresses", bootstrap_nodes.len());
1414
1415 self.active_queries.clear();
1416 self.query_timeouts.clear();
1417
1418 #[cfg(feature = "production-ready")]
1419 self.active_connections.clear();
1420
1421 for &(node_id, bootstrap_address) in bootstrap_nodes {
1422 let query_state = QueryState::Pending {
1423 sent_at: now,
1424 attempts: 1,
1425 };
1426
1427 self.active_queries.insert(node_id, query_state);
1428 self.query_timeouts.insert(node_id, now + self.config.bootstrap_query_timeout);
1429
1430 debug!("Starting server reflexive query to bootstrap node {:?} at {}", node_id, bootstrap_address);
1431
1432 #[cfg(feature = "production-ready")]
1433 {
1434 if let Some(_runtime) = &self.runtime_handle {
1436 self.start_quinn_query_with_address(node_id, bootstrap_address, now);
1437 } else {
1438 warn!("No async runtime available, falling back to simulation for node {:?}", node_id);
1439 self.simulate_bootstrap_response(node_id, now);
1440 }
1441 }
1442
1443 #[cfg(not(feature = "production-ready"))]
1444 {
1445 self.simulate_bootstrap_response(node_id, now);
1447 }
1448 }
1449
1450 self.active_queries.clone()
1451 }
1452
1453 #[cfg(feature = "production-ready")]
1455 fn start_quinn_query(&mut self, node_id: BootstrapNodeId, _runtime: tokio::runtime::Handle, now: Instant) {
1456 let request_id = rand::random::<u64>();
1462
1463 debug!("Starting Quinn connection to bootstrap node {:?} with request ID {}", node_id, request_id);
1464
1465 self.simulate_bootstrap_response(node_id, now);
1475 }
1476
1477 #[cfg(feature = "production-ready")]
1479 pub(crate) fn start_quinn_query_with_address(
1480 &mut self,
1481 node_id: BootstrapNodeId,
1482 bootstrap_address: SocketAddr,
1483 now: Instant
1484 ) {
1485
1486 let request_id = rand::random::<u64>();
1487
1488 info!("Establishing Quinn connection to bootstrap node {:?} at {}", node_id, bootstrap_address);
1489
1490 if let Some(runtime) = &self.runtime_handle {
1492 let timeout = self.config.bootstrap_query_timeout;
1493
1494 let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
1496
1497 runtime.spawn(async move {
1502 match Self::perform_bootstrap_query(bootstrap_address, request_id, timeout).await {
1503 Ok(observed_address) => {
1504 let response = ServerReflexiveResponse {
1505 bootstrap_node: node_id,
1506 observed_address,
1507 response_time: now.elapsed(),
1508 timestamp: Instant::now(),
1509 };
1510
1511 let _ = response_tx.send(response);
1513
1514 info!("Successfully received observed address {} from bootstrap node {:?}",
1515 observed_address, node_id);
1516 }
1517 Err(e) => {
1518 warn!("Failed to query bootstrap node {:?} at {}: {}", node_id, bootstrap_address, e);
1519 }
1520 }
1521 });
1522 } else {
1523 warn!("No async runtime available for Quinn query to {:?}", node_id);
1524 self.simulate_bootstrap_response(node_id, now);
1525 }
1526 }
1527
1528 #[cfg(feature = "production-ready")]
1533 async fn perform_bootstrap_query(
1534 _bootstrap_address: SocketAddr,
1535 _request_id: u64,
1536 _timeout: Duration,
1537 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
1538 Err("Bootstrap query not implemented for low-level API".into())
1540
1541 }
1601
1602 #[cfg(feature = "production-ready")]
1604 fn create_discovery_request(request_id: u64) -> Vec<u8> {
1605 let mut request = Vec::new();
1606
1607 request.extend_from_slice(&request_id.to_be_bytes());
1612 request.extend_from_slice(&std::time::SystemTime::now()
1613 .duration_since(std::time::UNIX_EPOCH)
1614 .unwrap_or_default()
1615 .as_millis().to_be_bytes()[8..16]); request.extend_from_slice(&1u32.to_be_bytes()); debug!("Created discovery request: {} bytes, request_id: {}", request.len(), request_id);
1619 request
1620 }
1621
1622 #[cfg(feature = "production-ready")]
1624 async fn wait_for_add_address_frame(
1625 _connection: &Connection,
1626 _expected_request_id: u64,
1627 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
1628 Err("wait_for_add_address_frame not implemented for low-level API".into())
1631
1632 }
1668
1669 #[cfg(feature = "production-ready")]
1671 fn create_response_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ServerReflexiveResponse> {
1672 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
1675 tx
1677 }
1678
1679 pub(crate) fn poll_queries(&mut self, _active_queries: &HashMap<BootstrapNodeId, QueryState>, now: Instant) -> Vec<ServerReflexiveResponse> {
1680 let mut responses = Vec::new();
1681
1682 while let Some(response) = self.responses.pop_front() {
1684 responses.push(response);
1685 }
1686
1687 let mut timed_out_nodes = Vec::new();
1689 for (&node_id, &timeout) in &self.query_timeouts {
1690 if now >= timeout {
1691 timed_out_nodes.push(node_id);
1692 }
1693 }
1694
1695 for node_id in timed_out_nodes {
1697 self.query_timeouts.remove(&node_id);
1698
1699 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
1700 match query_state {
1701 QueryState::Pending { attempts, .. } if *attempts < self.config.max_query_retries => {
1702 *attempts += 1;
1704 let new_timeout = now + self.config.bootstrap_query_timeout;
1705 self.query_timeouts.insert(node_id, new_timeout);
1706
1707 debug!("Retrying server reflexive query to bootstrap node {:?} (attempt {})", node_id, attempts);
1708
1709 self.simulate_bootstrap_response(node_id, now);
1711 }
1712 _ => {
1713 self.active_queries.insert(node_id, QueryState::Failed);
1715 warn!("Server reflexive query to bootstrap node {:?} failed after retries", node_id);
1716 }
1717 }
1718 }
1719 }
1720
1721 responses
1722 }
1723
1724 fn simulate_bootstrap_response(&mut self, node_id: BootstrapNodeId, now: Instant) {
1727 let simulated_external_addr = match node_id.0 % 3 {
1729 0 => "203.0.113.1:45678".parse().unwrap(),
1730 1 => "198.51.100.2:45679".parse().unwrap(),
1731 _ => "192.0.2.3:45680".parse().unwrap(),
1732 };
1733
1734 let response = ServerReflexiveResponse {
1735 bootstrap_node: node_id,
1736 observed_address: simulated_external_addr,
1737 response_time: Duration::from_millis(50 + node_id.0 * 10),
1738 timestamp: now,
1739 };
1740
1741 self.responses.push_back(response);
1742
1743 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
1745 *query_state = QueryState::Completed;
1746 }
1747
1748 debug!("Received simulated server reflexive response from bootstrap node {:?}: {}",
1749 node_id, simulated_external_addr);
1750 }
1751}
1752
1753#[derive(Debug)]
1755pub(crate) struct SymmetricNatPredictor {
1756 config: DiscoveryConfig,
1757}
1758
1759impl SymmetricNatPredictor {
1760 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
1761 Self {
1762 config: config.clone(),
1763 }
1764 }
1765
1766 pub(crate) fn generate_predictions(&mut self, pattern_analysis: &PatternAnalysisState, max_count: usize) -> Vec<DiscoveryCandidate> {
1771 let mut predictions = Vec::new();
1772
1773 if pattern_analysis.allocation_history.is_empty() || max_count == 0 {
1774 return predictions;
1775 }
1776
1777 let recent_events: Vec<_> = pattern_analysis.allocation_history
1779 .iter()
1780 .rev()
1781 .take(5) .collect();
1783
1784 if recent_events.len() < 2 {
1785 return predictions;
1786 }
1787
1788 match &pattern_analysis.detected_pattern {
1789 Some(pattern) => {
1790 predictions.extend(self.generate_pattern_based_predictions(pattern, max_count));
1791 }
1792 None => {
1793 predictions.extend(self.generate_heuristic_predictions(&recent_events, max_count));
1794 }
1795 }
1796
1797 predictions.truncate(max_count);
1799 predictions
1800 }
1801
1802 fn generate_pattern_based_predictions(&self, pattern: &PortAllocationPattern, max_count: usize) -> Vec<DiscoveryCandidate> {
1804 let mut predictions = Vec::new();
1805
1806 match pattern.pattern_type {
1807 AllocationPatternType::Sequential => {
1808 for i in 1..=max_count as u16 {
1810 let predicted_port = pattern.base_port.wrapping_add(i);
1811 if self.is_valid_port(predicted_port) {
1812 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence));
1813 }
1814 }
1815 }
1816 AllocationPatternType::FixedStride => {
1817 for i in 1..=max_count as u16 {
1819 let predicted_port = pattern.base_port.wrapping_add(pattern.stride * i);
1820 if self.is_valid_port(predicted_port) {
1821 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence));
1822 }
1823 }
1824 }
1825 AllocationPatternType::PoolBased => {
1826 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1828 let pool_size = max_port - min_port + 1;
1829 let step = (pool_size / max_count as u16).max(1);
1830
1831 for i in 0..max_count as u16 {
1832 let predicted_port = min_port + (i * step);
1833 if predicted_port <= max_port && self.is_valid_port(predicted_port) {
1834 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence * 0.8));
1835 }
1836 }
1837 }
1838 }
1839 AllocationPatternType::TimeBased => {
1840 for i in 1..=max_count as u16 {
1843 let predicted_port = pattern.base_port.wrapping_add(i);
1844 if self.is_valid_port(predicted_port) {
1845 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence * 0.6));
1846 }
1847 }
1848 }
1849 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1850 predictions.extend(self.generate_statistical_predictions(pattern.base_port, max_count));
1852 }
1853 }
1854
1855 predictions
1856 }
1857
1858 fn generate_heuristic_predictions(&self, recent_events: &[&PortAllocationEvent], max_count: usize) -> Vec<DiscoveryCandidate> {
1860 let mut predictions = Vec::new();
1861
1862 if let Some(latest_event) = recent_events.first() {
1863 let base_port = latest_event.port;
1864
1865 for i in 1..=(max_count / 3) as u16 {
1869 let predicted_port = base_port.wrapping_add(i);
1870 if self.is_valid_port(predicted_port) {
1871 predictions.push(self.create_predicted_candidate(predicted_port, 0.7));
1872 }
1873 }
1874
1875 if base_port % 2 == 0 {
1877 let predicted_port = base_port + 1;
1878 if self.is_valid_port(predicted_port) {
1879 predictions.push(self.create_predicted_candidate(predicted_port, 0.6));
1880 }
1881 }
1882
1883 for stride in [2, 4, 8, 16] {
1885 if predictions.len() >= max_count {
1886 break;
1887 }
1888 let predicted_port = base_port.wrapping_add(stride);
1889 if self.is_valid_port(predicted_port) {
1890 predictions.push(self.create_predicted_candidate(predicted_port, 0.5));
1891 }
1892 }
1893
1894 if recent_events.len() >= 2 {
1896 let stride = recent_events[0].port.wrapping_sub(recent_events[1].port);
1897 if stride > 0 && stride <= 100 { for i in 1..=3 {
1899 if predictions.len() >= max_count {
1900 break;
1901 }
1902 let predicted_port = base_port.wrapping_add(stride * i);
1903 if self.is_valid_port(predicted_port) {
1904 predictions.push(self.create_predicted_candidate(predicted_port, 0.4));
1905 }
1906 }
1907 }
1908 }
1909 }
1910
1911 predictions.truncate(max_count);
1912 predictions
1913 }
1914
1915 fn generate_statistical_predictions(&self, base_port: u16, max_count: usize) -> Vec<DiscoveryCandidate> {
1917 let mut predictions = Vec::new();
1918
1919 let common_ranges = [
1921 (1024, 5000), (5000, 10000), (10000, 20000), (32768, 65535), ];
1926
1927 let current_range = common_ranges.iter()
1929 .find(|(min, max)| base_port >= *min && base_port <= *max)
1930 .copied()
1931 .unwrap_or((1024, 65535));
1932
1933 let range_size = current_range.1 - current_range.0;
1935 let step = (range_size / max_count as u16).max(1);
1936
1937 for i in 0..max_count {
1938 let offset = (i as u16 * step) % range_size;
1939 let predicted_port = current_range.0 + offset;
1940
1941 if self.is_valid_port(predicted_port) && predicted_port != base_port {
1942 predictions.push(self.create_predicted_candidate(predicted_port, 0.3));
1943 }
1944 }
1945
1946 predictions
1947 }
1948
1949 fn is_valid_port(&self, port: u16) -> bool {
1951 port >= 1024 && port <= 65535 && port != 0
1953 }
1954
1955 fn create_predicted_candidate(&self, port: u16, confidence: f64) -> DiscoveryCandidate {
1957 let base_priority = 50; let priority = (base_priority as f64 * confidence) as u32;
1961
1962 DiscoveryCandidate {
1963 address: SocketAddr::new(
1964 "0.0.0.0".parse().unwrap(), port
1966 ),
1967 priority,
1968 source: DiscoverySourceType::Predicted,
1969 state: CandidateState::New,
1970 }
1971 }
1972
1973 pub(crate) fn analyze_allocation_patterns(&self, history: &VecDeque<PortAllocationEvent>) -> Option<PortAllocationPattern> {
1975 if history.len() < 3 {
1976 return None;
1977 }
1978
1979 let recent_ports: Vec<u16> = history.iter()
1980 .rev()
1981 .take(10)
1982 .map(|event| event.port)
1983 .collect();
1984
1985 if let Some(pattern) = self.detect_sequential_pattern(&recent_ports) {
1987 return Some(pattern);
1988 }
1989
1990 if let Some(pattern) = self.detect_stride_pattern(&recent_ports) {
1992 return Some(pattern);
1993 }
1994
1995 if let Some(pattern) = self.detect_pool_pattern(&recent_ports) {
1997 return Some(pattern);
1998 }
1999
2000 if let Some(pattern) = self.detect_time_based_pattern(history) {
2002 return Some(pattern);
2003 }
2004
2005 None
2006 }
2007
2008 fn detect_sequential_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2010 if ports.len() < 3 {
2011 return None;
2012 }
2013
2014 let mut sequential_count = 0;
2015 let mut total_comparisons = 0;
2016
2017 for i in 1..ports.len() {
2018 total_comparisons += 1;
2019 let diff = ports[i-1].wrapping_sub(ports[i]);
2020 if diff == 1 {
2021 sequential_count += 1;
2022 }
2023 }
2024
2025 let sequential_ratio = sequential_count as f64 / total_comparisons as f64;
2026
2027 if sequential_ratio >= 0.6 { let confidence = (sequential_ratio * 0.9).min(0.9); Some(PortAllocationPattern {
2031 pattern_type: AllocationPatternType::Sequential,
2032 base_port: ports[0],
2033 stride: 1,
2034 pool_boundaries: None,
2035 confidence,
2036 })
2037 } else {
2038 None
2039 }
2040 }
2041
2042 fn detect_stride_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2044 if ports.len() < 4 {
2045 return None;
2046 }
2047
2048 let mut diffs = Vec::new();
2050 for i in 1..ports.len() {
2051 let diff = ports[i-1].wrapping_sub(ports[i]);
2052 if diff > 0 && diff <= 1000 { diffs.push(diff);
2054 }
2055 }
2056
2057 if diffs.len() < 2 {
2058 return None;
2059 }
2060
2061 let mut diff_counts = std::collections::HashMap::new();
2063 for &diff in &diffs {
2064 *diff_counts.entry(diff).or_insert(0) += 1;
2065 }
2066
2067 let (most_common_diff, count) = diff_counts.iter()
2068 .max_by_key(|(_, &count)| count)
2069 .map(|(&diff, &count)| (diff, count))?;
2070
2071 let consistency_ratio = count as f64 / diffs.len() as f64;
2072
2073 if consistency_ratio >= 0.5 && most_common_diff > 1 { let confidence = (consistency_ratio * 0.8).min(0.8); Some(PortAllocationPattern {
2077 pattern_type: AllocationPatternType::FixedStride,
2078 base_port: ports[0],
2079 stride: most_common_diff,
2080 pool_boundaries: None,
2081 confidence,
2082 })
2083 } else {
2084 None
2085 }
2086 }
2087
2088 fn detect_pool_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2090 if ports.len() < 5 {
2091 return None;
2092 }
2093
2094 let min_port = *ports.iter().min()?;
2095 let max_port = *ports.iter().max()?;
2096 let range = max_port - min_port;
2097
2098 if range > 0 && range <= 10000 { let expected_step = range / (ports.len() as u16 - 1);
2102 let mut uniform_score = 0.0;
2103
2104 let mut sorted_ports = ports.to_vec();
2105 sorted_ports.sort_unstable();
2106
2107 for i in 1..sorted_ports.len() {
2108 let actual_step = sorted_ports[i] - sorted_ports[i-1];
2109 let step_diff = (actual_step as i32 - expected_step as i32).abs() as f64;
2110 let normalized_diff = step_diff / expected_step as f64;
2111 uniform_score += 1.0 - normalized_diff.min(1.0);
2112 }
2113
2114 uniform_score /= (sorted_ports.len() - 1) as f64;
2115
2116 if uniform_score >= 0.4 { let confidence = (uniform_score * 0.7).min(0.7); Some(PortAllocationPattern {
2120 pattern_type: AllocationPatternType::PoolBased,
2121 base_port: min_port,
2122 stride: expected_step,
2123 pool_boundaries: Some((min_port, max_port)),
2124 confidence,
2125 })
2126 } else {
2127 None
2128 }
2129 } else {
2130 None
2131 }
2132 }
2133
2134 fn detect_time_based_pattern(&self, history: &VecDeque<PortAllocationEvent>) -> Option<PortAllocationPattern> {
2136 if history.len() < 4 {
2137 return None;
2138 }
2139
2140 let mut time_intervals = Vec::new();
2142 let events: Vec<_> = history.iter().collect();
2143
2144 for i in 1..events.len() {
2145 let interval = events[i-1].timestamp.duration_since(events[i].timestamp);
2146 time_intervals.push(interval);
2147 }
2148
2149 if time_intervals.is_empty() {
2150 return None;
2151 }
2152
2153 let avg_interval = time_intervals.iter().sum::<std::time::Duration>() / time_intervals.len() as u32;
2155
2156 let mut consistency_score = 0.0;
2157 for interval in &time_intervals {
2158 let diff = if *interval > avg_interval {
2159 *interval - avg_interval
2160 } else {
2161 avg_interval - *interval
2162 };
2163
2164 let normalized_diff = diff.as_millis() as f64 / avg_interval.as_millis() as f64;
2165 consistency_score += 1.0 - normalized_diff.min(1.0);
2166 }
2167
2168 consistency_score /= time_intervals.len() as f64;
2169
2170 if consistency_score >= 0.6 && avg_interval.as_millis() > 100 && avg_interval.as_millis() < 10000 {
2171 let confidence = (consistency_score * 0.6).min(0.6); Some(PortAllocationPattern {
2174 pattern_type: AllocationPatternType::TimeBased,
2175 base_port: events[0].port,
2176 stride: 1, pool_boundaries: None,
2178 confidence,
2179 })
2180 } else {
2181 None
2182 }
2183 }
2184
2185 pub(crate) fn generate_confidence_scored_predictions(
2187 &mut self,
2188 base_address: SocketAddr,
2189 pattern_analysis: &PatternAnalysisState,
2190 max_count: usize
2191 ) -> Vec<(DiscoveryCandidate, f64)> {
2192 let mut scored_predictions = Vec::new();
2193
2194 let predictions = self.generate_predictions(pattern_analysis, max_count);
2196
2197 for mut prediction in predictions {
2198 prediction.address = SocketAddr::new(base_address.ip(), prediction.address.port());
2200
2201 let confidence = self.calculate_prediction_confidence(&prediction, pattern_analysis, base_address);
2203
2204 scored_predictions.push((prediction, confidence));
2205 }
2206
2207 scored_predictions.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2209
2210 scored_predictions
2211 }
2212
2213 fn calculate_prediction_confidence(
2215 &self,
2216 prediction: &DiscoveryCandidate,
2217 pattern_analysis: &PatternAnalysisState,
2218 base_address: SocketAddr
2219 ) -> f64 {
2220 let mut confidence = 0.5; if let Some(ref pattern) = pattern_analysis.detected_pattern {
2224 confidence += pattern.confidence * 0.3;
2225 }
2226
2227 confidence += pattern_analysis.prediction_accuracy * 0.2;
2229
2230 let port_distance = (prediction.address.port() as i32 - base_address.port() as i32).abs();
2232 let proximity_score = if port_distance <= 10 {
2233 0.2
2234 } else if port_distance <= 100 {
2235 0.1
2236 } else {
2237 0.0
2238 };
2239 confidence += proximity_score;
2240
2241 let port_range_score = match prediction.address.port() {
2243 1024..=4999 => 0.1, 5000..=9999 => 0.15, 10000..=20000 => 0.1, 32768..=65535 => 0.05, _ => 0.0,
2248 };
2249 confidence += port_range_score;
2250
2251 confidence.max(0.0).min(1.0)
2253 }
2254
2255 pub(crate) fn update_pattern_analysis(
2257 &self,
2258 pattern_analysis: &mut PatternAnalysisState,
2259 new_event: PortAllocationEvent
2260 ) {
2261 pattern_analysis.allocation_history.push_back(new_event);
2263
2264 if pattern_analysis.allocation_history.len() > 20 {
2266 pattern_analysis.allocation_history.pop_front();
2267 }
2268
2269 pattern_analysis.detected_pattern = self.analyze_allocation_patterns(&pattern_analysis.allocation_history);
2271
2272 if let Some(ref pattern) = pattern_analysis.detected_pattern {
2274 pattern_analysis.confidence_level = pattern.confidence;
2275 } else {
2276 pattern_analysis.confidence_level *= 0.9; }
2278
2279 pattern_analysis.prediction_accuracy *= 0.95;
2283 }
2284}
2285
2286#[derive(Debug)]
2288pub(crate) struct BootstrapNodeManager {
2289 config: DiscoveryConfig,
2290 bootstrap_nodes: HashMap<BootstrapNodeId, BootstrapNodeInfo>,
2291 health_stats: HashMap<BootstrapNodeId, BootstrapHealthStats>,
2292 performance_tracker: BootstrapPerformanceTracker,
2293 last_health_check: Option<Instant>,
2294 health_check_interval: Duration,
2295 failover_threshold: f64,
2296 discovery_sources: Vec<BootstrapDiscoverySource>,
2297}
2298
2299#[derive(Debug, Clone)]
2301pub(crate) struct BootstrapNodeInfo {
2302 pub address: SocketAddr,
2304 pub last_seen: Instant,
2306 pub can_coordinate: bool,
2308 pub health_status: BootstrapHealthStatus,
2310 pub capabilities: BootstrapCapabilities,
2312 pub priority: u32,
2314 pub discovery_source: BootstrapDiscoverySource,
2316}
2317
2318#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2320pub(crate) enum BootstrapHealthStatus {
2321 Healthy,
2323 Degraded,
2325 Unhealthy,
2327 Unknown,
2329}
2330
2331#[derive(Debug, Clone, Default)]
2333pub(crate) struct BootstrapCapabilities {
2334 pub supports_nat_traversal: bool,
2336 pub supports_ipv6: bool,
2338 pub supports_quic_extensions: bool,
2340 pub max_concurrent_coordinations: u32,
2342 pub supported_quic_versions: Vec<u32>,
2344}
2345
2346#[derive(Debug, Clone, Default)]
2348pub(crate) struct BootstrapHealthStats {
2349 pub connection_attempts: u32,
2351 pub successful_connections: u32,
2353 pub failed_connections: u32,
2355 pub average_rtt: Option<Duration>,
2357 pub recent_rtts: VecDeque<Duration>,
2359 pub last_health_check: Option<Instant>,
2361 pub consecutive_failures: u32,
2363 pub coordination_requests: u32,
2365 pub successful_coordinations: u32,
2367}
2368
2369#[derive(Debug, Default)]
2371pub(crate) struct BootstrapPerformanceTracker {
2372 pub overall_success_rate: f64,
2374 pub average_response_time: Duration,
2376 pub best_performers: Vec<BootstrapNodeId>,
2378 pub failover_nodes: Vec<BootstrapNodeId>,
2380 pub performance_history: VecDeque<PerformanceSnapshot>,
2382}
2383
2384#[derive(Debug, Clone)]
2386pub(crate) struct PerformanceSnapshot {
2387 pub timestamp: Instant,
2388 pub active_nodes: u32,
2389 pub success_rate: f64,
2390 pub average_rtt: Duration,
2391}
2392
2393#[derive(Debug, Clone, PartialEq, Eq)]
2395pub(crate) enum BootstrapDiscoverySource {
2396 Static,
2398 DNS,
2400 DHT,
2402 Multicast,
2404 UserProvided,
2406}
2407
2408impl BootstrapNodeManager {
2409 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2410 Self {
2411 config: config.clone(),
2412 bootstrap_nodes: HashMap::new(),
2413 health_stats: HashMap::new(),
2414 performance_tracker: BootstrapPerformanceTracker::default(),
2415 last_health_check: None,
2416 health_check_interval: Duration::from_secs(30),
2417 failover_threshold: 0.3, discovery_sources: vec![
2419 BootstrapDiscoverySource::Static,
2420 BootstrapDiscoverySource::DNS,
2421 BootstrapDiscoverySource::UserProvided,
2422 ],
2423 }
2424 }
2425
2426 pub(crate) fn update_bootstrap_nodes(&mut self, nodes: Vec<BootstrapNode>) {
2428 let now = Instant::now();
2429
2430 for (i, node) in nodes.into_iter().enumerate() {
2432 let node_id = BootstrapNodeId(i as u64);
2433
2434 let node_info = BootstrapNodeInfo {
2435 address: node.address,
2436 last_seen: node.last_seen,
2437 can_coordinate: node.can_coordinate,
2438 health_status: BootstrapHealthStatus::Unknown,
2439 capabilities: BootstrapCapabilities {
2440 supports_nat_traversal: node.can_coordinate,
2441 supports_ipv6: node.address.is_ipv6(),
2442 supports_quic_extensions: true, max_concurrent_coordinations: 100, supported_quic_versions: vec![1], },
2446 priority: self.calculate_initial_priority(&node),
2447 discovery_source: BootstrapDiscoverySource::UserProvided,
2448 };
2449
2450 self.bootstrap_nodes.insert(node_id, node_info);
2451
2452 if !self.health_stats.contains_key(&node_id) {
2454 self.health_stats.insert(node_id, BootstrapHealthStats::default());
2455 }
2456 }
2457
2458 info!("Updated {} bootstrap nodes", self.bootstrap_nodes.len());
2459 self.schedule_health_check(now);
2460 }
2461
2462 pub(crate) fn get_active_bootstrap_nodes(&self) -> Vec<BootstrapNodeId> {
2464 let mut active_nodes: Vec<_> = self.bootstrap_nodes
2465 .iter()
2466 .filter(|(_, node)| {
2467 matches!(node.health_status, BootstrapHealthStatus::Healthy | BootstrapHealthStatus::Unknown)
2468 })
2469 .map(|(&id, node)| (id, node))
2470 .collect();
2471
2472 active_nodes.sort_by(|a, b| {
2474 let health_cmp = self.compare_health_status(a.1.health_status, b.1.health_status);
2476 if health_cmp != std::cmp::Ordering::Equal {
2477 return health_cmp;
2478 }
2479
2480 b.1.priority.cmp(&a.1.priority)
2482 });
2483
2484 active_nodes.into_iter().map(|(id, _)| id).collect()
2485 }
2486
2487 pub(crate) fn get_bootstrap_address(&self, id: BootstrapNodeId) -> Option<SocketAddr> {
2489 self.bootstrap_nodes.get(&id).map(|node| node.address)
2490 }
2491
2492 pub(crate) fn perform_health_check(&mut self, now: Instant) {
2494 if let Some(last_check) = self.last_health_check {
2495 if now.duration_since(last_check) < self.health_check_interval {
2496 return; }
2498 }
2499
2500 debug!("Performing health check on {} bootstrap nodes", self.bootstrap_nodes.len());
2501
2502 let node_ids: Vec<BootstrapNodeId> = self.bootstrap_nodes.keys().copied().collect();
2504
2505 for node_id in node_ids {
2506 self.check_node_health(node_id, now);
2507 }
2508
2509 self.update_performance_metrics(now);
2510 self.last_health_check = Some(now);
2511 }
2512
2513 fn check_node_health(&mut self, node_id: BootstrapNodeId, now: Instant) {
2515 let node_info_opt = self.bootstrap_nodes.get(&node_id).cloned();
2517 if node_info_opt.is_none() {
2518 return; }
2520 let node_info_for_priority = node_info_opt.unwrap();
2521 let current_health_status = node_info_for_priority.health_status;
2522
2523 let (_success_rate, new_health_status, _average_rtt) = {
2525 let stats = self.health_stats.get_mut(&node_id).unwrap();
2526
2527 let success_rate = if stats.connection_attempts > 0 {
2529 stats.successful_connections as f64 / stats.connection_attempts as f64
2530 } else {
2531 1.0 };
2533
2534 if !stats.recent_rtts.is_empty() {
2536 let total_rtt: Duration = stats.recent_rtts.iter().sum();
2537 stats.average_rtt = Some(total_rtt / stats.recent_rtts.len() as u32);
2538 }
2539
2540 let new_health_status = if stats.consecutive_failures >= 3 {
2542 BootstrapHealthStatus::Unhealthy
2543 } else if success_rate < self.failover_threshold {
2544 BootstrapHealthStatus::Degraded
2545 } else if success_rate >= 0.8 && stats.consecutive_failures == 0 {
2546 BootstrapHealthStatus::Healthy
2547 } else {
2548 current_health_status };
2550
2551 stats.last_health_check = Some(now);
2552
2553 (success_rate, new_health_status, stats.average_rtt)
2554 };
2555
2556 let stats_snapshot = self.health_stats.get(&node_id).unwrap();
2558 let new_priority = self.calculate_dynamic_priority(&node_info_for_priority, stats_snapshot);
2559
2560 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2562 if new_health_status != node_info.health_status {
2563 info!("Bootstrap node {:?} health status changed: {:?} -> {:?}",
2564 node_id, node_info.health_status, new_health_status);
2565 node_info.health_status = new_health_status;
2566 }
2567
2568 node_info.priority = new_priority;
2569 }
2570 }
2571
2572 pub(crate) fn record_connection_attempt(&mut self, node_id: BootstrapNodeId, success: bool, rtt: Option<Duration>) {
2574 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2575 stats.connection_attempts += 1;
2576
2577 if success {
2578 stats.successful_connections += 1;
2579 stats.consecutive_failures = 0;
2580
2581 if let Some(rtt) = rtt {
2582 stats.recent_rtts.push_back(rtt);
2583 if stats.recent_rtts.len() > 10 {
2584 stats.recent_rtts.pop_front();
2585 }
2586 }
2587 } else {
2588 stats.failed_connections += 1;
2589 stats.consecutive_failures += 1;
2590 }
2591 }
2592
2593 if success {
2595 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2596 node_info.last_seen = Instant::now();
2597 }
2598 }
2599 }
2600
2601 pub(crate) fn record_coordination_result(&mut self, node_id: BootstrapNodeId, success: bool) {
2603 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2604 stats.coordination_requests += 1;
2605 if success {
2606 stats.successful_coordinations += 1;
2607 }
2608 }
2609 }
2610
2611 pub(crate) fn get_best_performers(&self, count: usize) -> Vec<BootstrapNodeId> {
2613 let mut nodes_with_scores: Vec<_> = self.bootstrap_nodes
2614 .iter()
2615 .filter_map(|(&id, node)| {
2616 if matches!(node.health_status, BootstrapHealthStatus::Healthy) {
2617 let score = self.calculate_performance_score(id, node);
2618 Some((id, score))
2619 } else {
2620 None
2621 }
2622 })
2623 .collect();
2624
2625 nodes_with_scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2626
2627 nodes_with_scores
2628 .into_iter()
2629 .take(count)
2630 .map(|(id, _)| id)
2631 .collect()
2632 }
2633
2634 pub(crate) fn discover_new_nodes(&mut self) -> Result<Vec<BootstrapNodeInfo>, String> {
2636 let mut discovered_nodes = Vec::new();
2637
2638 if let Ok(dns_nodes) = self.discover_via_dns() {
2640 discovered_nodes.extend(dns_nodes);
2641 }
2642
2643 if let Ok(multicast_nodes) = self.discover_via_multicast() {
2645 discovered_nodes.extend(multicast_nodes);
2646 }
2647
2648 for node in &discovered_nodes {
2650 let node_id = BootstrapNodeId(rand::random());
2651 self.bootstrap_nodes.insert(node_id, node.clone());
2652 self.health_stats.insert(node_id, BootstrapHealthStats::default());
2653 }
2654
2655 if !discovered_nodes.is_empty() {
2656 info!("Discovered {} new bootstrap nodes", discovered_nodes.len());
2657 }
2658
2659 Ok(discovered_nodes)
2660 }
2661
2662 fn discover_via_dns(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
2664 debug!("DNS-based bootstrap discovery not yet implemented");
2667 Ok(Vec::new())
2668 }
2669
2670 fn discover_via_multicast(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
2672 debug!("Multicast-based bootstrap discovery not yet implemented");
2675 Ok(Vec::new())
2676 }
2677
2678 fn calculate_initial_priority(&self, node: &BootstrapNode) -> u32 {
2680 let mut priority = 100; if node.can_coordinate {
2683 priority += 50;
2684 }
2685
2686 if let Some(rtt) = node.rtt {
2687 if rtt < Duration::from_millis(50) {
2688 priority += 30;
2689 } else if rtt < Duration::from_millis(100) {
2690 priority += 20;
2691 } else if rtt < Duration::from_millis(200) {
2692 priority += 10;
2693 }
2694 }
2695
2696 if node.address.is_ipv6() {
2698 priority += 10;
2699 }
2700
2701 priority
2702 }
2703
2704 fn calculate_dynamic_priority(&self, node_info: &BootstrapNodeInfo, stats: &BootstrapHealthStats) -> u32 {
2706 let mut priority = node_info.priority;
2707
2708 let success_rate = if stats.connection_attempts > 0 {
2710 stats.successful_connections as f64 / stats.connection_attempts as f64
2711 } else {
2712 1.0
2713 };
2714
2715 priority = (priority as f64 * success_rate) as u32;
2716
2717 if let Some(avg_rtt) = stats.average_rtt {
2719 if avg_rtt < Duration::from_millis(50) {
2720 priority += 20;
2721 } else if avg_rtt > Duration::from_millis(500) {
2722 priority = priority.saturating_sub(20);
2723 }
2724 }
2725
2726 priority = priority.saturating_sub(stats.consecutive_failures * 10);
2728
2729 priority.max(1) }
2731
2732 fn calculate_performance_score(&self, node_id: BootstrapNodeId, _node_info: &BootstrapNodeInfo) -> f64 {
2734 let stats = self.health_stats.get(&node_id).unwrap();
2735
2736 let mut score = 0.0;
2737
2738 let success_rate = if stats.connection_attempts > 0 {
2740 stats.successful_connections as f64 / stats.connection_attempts as f64
2741 } else {
2742 1.0
2743 };
2744 score += success_rate * 0.4;
2745
2746 if let Some(avg_rtt) = stats.average_rtt {
2748 let rtt_score = (1000.0 - avg_rtt.as_millis() as f64).max(0.0) / 1000.0;
2749 score += rtt_score * 0.3;
2750 } else {
2751 score += 0.3; }
2753
2754 let coord_success_rate = if stats.coordination_requests > 0 {
2756 stats.successful_coordinations as f64 / stats.coordination_requests as f64
2757 } else {
2758 1.0
2759 };
2760 score += coord_success_rate * 0.2;
2761
2762 let stability_score = if stats.consecutive_failures == 0 {
2764 1.0
2765 } else {
2766 1.0 / (stats.consecutive_failures as f64 + 1.0)
2767 };
2768 score += stability_score * 0.1;
2769
2770 score
2771 }
2772
2773 fn compare_health_status(&self, a: BootstrapHealthStatus, b: BootstrapHealthStatus) -> std::cmp::Ordering {
2775 use std::cmp::Ordering;
2776
2777 match (a, b) {
2778 (BootstrapHealthStatus::Healthy, BootstrapHealthStatus::Healthy) => Ordering::Equal,
2779 (BootstrapHealthStatus::Healthy, _) => Ordering::Less, (_, BootstrapHealthStatus::Healthy) => Ordering::Greater,
2781 (BootstrapHealthStatus::Unknown, BootstrapHealthStatus::Unknown) => Ordering::Equal,
2782 (BootstrapHealthStatus::Unknown, _) => Ordering::Less, (_, BootstrapHealthStatus::Unknown) => Ordering::Greater,
2784 (BootstrapHealthStatus::Degraded, BootstrapHealthStatus::Degraded) => Ordering::Equal,
2785 (BootstrapHealthStatus::Degraded, _) => Ordering::Less, (_, BootstrapHealthStatus::Degraded) => Ordering::Greater,
2787 (BootstrapHealthStatus::Unhealthy, BootstrapHealthStatus::Unhealthy) => Ordering::Equal,
2788 }
2789 }
2790
2791 fn update_performance_metrics(&mut self, now: Instant) {
2793 let mut total_attempts = 0;
2794 let mut total_successes = 0;
2795 let mut total_rtt = Duration::ZERO;
2796 let mut rtt_count = 0;
2797
2798 for stats in self.health_stats.values() {
2799 total_attempts += stats.connection_attempts;
2800 total_successes += stats.successful_connections;
2801
2802 if let Some(avg_rtt) = stats.average_rtt {
2803 total_rtt += avg_rtt;
2804 rtt_count += 1;
2805 }
2806 }
2807
2808 self.performance_tracker.overall_success_rate = if total_attempts > 0 {
2809 total_successes as f64 / total_attempts as f64
2810 } else {
2811 1.0
2812 };
2813
2814 self.performance_tracker.average_response_time = if rtt_count > 0 {
2815 total_rtt / rtt_count
2816 } else {
2817 Duration::from_millis(100) };
2819
2820 self.performance_tracker.best_performers = self.get_best_performers(5);
2822
2823 let snapshot = PerformanceSnapshot {
2825 timestamp: now,
2826 active_nodes: self.get_active_bootstrap_nodes().len() as u32,
2827 success_rate: self.performance_tracker.overall_success_rate,
2828 average_rtt: self.performance_tracker.average_response_time,
2829 };
2830
2831 self.performance_tracker.performance_history.push_back(snapshot);
2832 if self.performance_tracker.performance_history.len() > 100 {
2833 self.performance_tracker.performance_history.pop_front();
2834 }
2835 }
2836
2837 fn schedule_health_check(&mut self, _now: Instant) {
2839 }
2842
2843 pub(crate) fn get_performance_stats(&self) -> &BootstrapPerformanceTracker {
2845 &self.performance_tracker
2846 }
2847
2848 pub(crate) fn get_node_health_stats(&self, node_id: BootstrapNodeId) -> Option<&BootstrapHealthStats> {
2850 self.health_stats.get(&node_id)
2851 }
2852}
2853
2854#[derive(Debug)]
2856pub(crate) struct DiscoveryCache {
2857 config: DiscoveryConfig,
2858}
2859
2860impl DiscoveryCache {
2861 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2862 Self {
2863 config: config.clone(),
2864 }
2865 }
2866}
2867
2868pub(crate) fn create_platform_interface_discovery() -> Box<dyn NetworkInterfaceDiscovery + Send> {
2870 #[cfg(target_os = "windows")]
2871 return Box::new(WindowsInterfaceDiscovery::new());
2872
2873 #[cfg(target_os = "linux")]
2874 return Box::new(LinuxInterfaceDiscovery::new());
2875
2876 #[cfg(target_os = "macos")]
2877 return Box::new(MacOSInterfaceDiscovery::new());
2878
2879 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
2880 return Box::new(GenericInterfaceDiscovery::new());
2881}
2882
2883pub(crate) struct GenericInterfaceDiscovery {
2893 scan_complete: bool,
2894}
2895
2896impl GenericInterfaceDiscovery {
2897 pub(crate) fn new() -> Self {
2898 Self { scan_complete: false }
2899 }
2900}
2901
2902impl NetworkInterfaceDiscovery for GenericInterfaceDiscovery {
2903 fn start_scan(&mut self) -> Result<(), String> {
2904 self.scan_complete = true;
2906 Ok(())
2907 }
2908
2909 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>> {
2910 if self.scan_complete {
2911 self.scan_complete = false;
2912 Some(vec![
2913 NetworkInterface {
2914 name: "generic".to_string(),
2915 addresses: vec!["127.0.0.1:0".parse().unwrap()],
2916 is_up: true,
2917 is_wireless: false,
2918 mtu: Some(1500),
2919 }
2920 ])
2921 } else {
2922 None
2923 }
2924 }
2925}
2926
2927impl std::fmt::Display for DiscoveryError {
2928 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2929 match self {
2930 Self::NoLocalInterfaces => write!(f, "no local network interfaces found"),
2931 Self::AllBootstrapsFailed => write!(f, "all bootstrap node queries failed"),
2932 Self::DiscoveryTimeout => write!(f, "discovery process timed out"),
2933 Self::InsufficientCandidates { found, required } => write!(f, "insufficient candidates found: {} < {}", found, required),
2934 Self::NetworkError(msg) => write!(f, "network error: {}", msg),
2935 Self::ConfigurationError(msg) => write!(f, "configuration error: {}", msg),
2936 Self::InternalError(msg) => write!(f, "internal error: {}", msg),
2937 }
2938 }
2939}
2940
2941impl std::error::Error for DiscoveryError {}
2942
2943pub mod test_utils {
2945 use super::*;
2946
2947 pub fn calculate_address_priority(address: &IpAddr) -> u32 {
2949 let mut priority = 100; match address {
2951 IpAddr::V4(ipv4) => {
2952 if ipv4.is_private() {
2953 priority += 50; }
2955 },
2956 IpAddr::V6(ipv6) => {
2957 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
2960 let segments = ipv6.segments();
2961 if segments[0] & 0xE000 == 0x2000 {
2962 priority += 60;
2964 } else if segments[0] & 0xFFC0 == 0xFE80 {
2965 priority += 20;
2967 } else if segments[0] & 0xFE00 == 0xFC00 {
2968 priority += 40;
2970 } else {
2971 priority += 30;
2973 }
2974 }
2975
2976 priority += 10; },
2979 }
2980 priority
2981 }
2982
2983 pub fn is_valid_address(address: &IpAddr) -> bool {
2985 match address {
2986 IpAddr::V4(ipv4) => !ipv4.is_loopback() && !ipv4.is_unspecified(),
2987 IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
2988 }
2989 }
2990}