1use std::{
10 collections::{HashMap, VecDeque},
11 net::{IpAddr, SocketAddr},
12 time::{Duration, Instant},
13};
14
15use tracing::{debug, info, warn};
16
17#[cfg(feature = "production-ready")]
18use crate::Connection;
19
20use crate::{
21 connection::nat_traversal::{CandidateSource, CandidateState, NatTraversalRole},
22 nat_traversal_api::{BootstrapNode, CandidateAddress, PeerId},
23};
24
25#[cfg(target_os = "windows")]
27pub mod windows;
28
29#[cfg(target_os = "windows")]
30pub use windows::WindowsInterfaceDiscovery;
31
32#[cfg(target_os = "linux")]
33pub mod linux;
34
35#[cfg(target_os = "linux")]
36pub use linux::LinuxInterfaceDiscovery;
37
38#[cfg(target_os = "macos")]
39pub(crate) mod macos;
40
41#[cfg(target_os = "macos")]
42pub(crate) use macos::MacOSInterfaceDiscovery;
43
44fn convert_to_nat_source(discovery_source: DiscoverySourceType) -> CandidateSource {
46 match discovery_source {
47 DiscoverySourceType::Local => CandidateSource::Local,
48 DiscoverySourceType::ServerReflexive => CandidateSource::Observed { by_node: None },
49 DiscoverySourceType::Predicted => CandidateSource::Predicted,
50 }
51}
52
/// How a candidate address was obtained during discovery.
///
/// Converted to the NAT traversal layer's `CandidateSource` by
/// `convert_to_nat_source`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DiscoverySourceType {
    /// Address read from a local network interface.
    Local,
    /// Address reported back by a bootstrap node (our address as observed
    /// from outside).
    ServerReflexive,
    /// Address produced by the symmetric NAT predictor.
    Predicted,
}
60
/// A candidate collected during a discovery session, before validation.
#[derive(Debug, Clone)]
pub(crate) struct DiscoveryCandidate {
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// Priority assigned when the candidate was discovered.
    pub priority: u32,
    /// How the candidate was obtained.
    pub source: DiscoverySourceType,
    /// NAT traversal state carried over into `CandidateAddress`.
    pub state: CandidateState,
}
69
70impl DiscoveryCandidate {
71 pub(crate) fn to_candidate_address(&self) -> CandidateAddress {
73 CandidateAddress {
74 address: self.address,
75 priority: self.priority,
76 source: convert_to_nat_source(self.source),
77 state: self.state,
78 }
79 }
80}
81
/// Drives candidate discovery for a single peer as a polled state machine.
///
/// A session is started with `start_discovery` and advanced by calling
/// `poll`; the current state is held in `current_phase`.
pub struct CandidateDiscoveryManager {
    /// Phase the state machine is currently in.
    current_phase: DiscoveryPhase,
    /// Timeouts and limits applied to the session.
    config: DiscoveryConfig,
    /// Platform-specific enumerator of local network interfaces.
    interface_discovery: Box<dyn NetworkInterfaceDiscovery + Send>,
    /// Issues and polls address queries against bootstrap nodes.
    server_reflexive_discovery: ServerReflexiveDiscovery,
    /// Analyzes port allocation history and generates predicted candidates.
    symmetric_predictor: SymmetricNatPredictor,
    /// Tracks the bootstrap nodes supplied to `start_discovery`.
    bootstrap_manager: BootstrapNodeManager,
    /// Discovery cache built from the config.
    /// NOTE(review): not read in the visible portion of this file — confirm usage.
    cache: DiscoveryCache,
    /// Per-session data: peer, start time, candidates, statistics.
    session_state: DiscoverySessionState,
}
101
/// Tunables for a discovery session. See the `Default` impl for defaults.
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
    /// Upper bound on a whole session; `poll` fails the session with
    /// `DiscoveryError::DiscoveryTimeout` once it is exceeded.
    pub total_timeout: Duration,
    /// How long local interface scanning may run before discovery moves on
    /// with whatever was found.
    pub local_scan_timeout: Duration,
    /// Per-query bootstrap timeout. The server-reflexive phase as a whole
    /// is abandoned after twice this value.
    pub bootstrap_query_timeout: Duration,
    /// Maximum retries per bootstrap query.
    /// NOTE(review): not consumed in the visible portion of this file.
    pub max_query_retries: u32,
    /// Cap on the number of candidates; used to bound how many predicted
    /// candidates are requested from the predictor.
    pub max_candidates: usize,
    /// When `false`, the symmetric NAT prediction phase is skipped.
    pub enable_symmetric_prediction: bool,
    /// Number of bootstrap responses required before leaving the
    /// server-reflexive phase (treated as at least 1).
    pub min_bootstrap_consensus: usize,
    /// Cache lifetime for interface scan results.
    /// NOTE(review): presumably consumed by `DiscoveryCache` — confirm.
    pub interface_cache_ttl: Duration,
    /// Cache lifetime for server-reflexive results.
    /// NOTE(review): presumably consumed by `DiscoveryCache` — confirm.
    pub server_reflexive_cache_ttl: Duration,
}
124
/// States of the discovery state machine held by `CandidateDiscoveryManager`.
#[derive(Debug, Clone, PartialEq)]
pub enum DiscoveryPhase {
    /// No session is running.
    Idle,
    /// Waiting for the local interface scan to finish.
    LocalInterfaceScanning {
        /// When this phase was entered.
        started_at: Instant,
    },
    /// Waiting for bootstrap nodes to report our observed address.
    ServerReflexiveQuerying {
        /// When this phase was entered.
        started_at: Instant,
        /// State of each outstanding query, keyed by bootstrap node.
        active_queries: HashMap<BootstrapNodeId, QueryState>,
        /// Responses collected so far in this phase.
        responses_received: Vec<ServerReflexiveResponse>,
    },
    /// Generating predicted candidates from observed port allocations.
    SymmetricNatPrediction {
        /// When this phase was entered.
        started_at: Instant,
        /// Number of prediction rounds attempted (starts at 0).
        prediction_attempts: u32,
        /// Snapshot of the pattern analysis the predictions are based on.
        pattern_analysis: PatternAnalysisState,
    },
    /// Validating the discovered candidates.
    CandidateValidation {
        /// When this phase was entered.
        started_at: Instant,
        /// Result per candidate; the id is the candidate's index in the
        /// session's `discovered_candidates`.
        validation_results: HashMap<CandidateId, ValidationResult>,
    },
    /// Terminal state: discovery finished.
    Completed {
        /// Candidates that passed validation.
        final_candidates: Vec<ValidatedCandidate>,
        /// When discovery completed.
        completion_time: Instant,
    },
    /// Terminal state: discovery was aborted.
    Failed {
        /// Why discovery failed.
        error: DiscoveryError,
        /// When the failure was recorded.
        failed_at: Instant,
        /// Suggested recovery strategies for the caller.
        fallback_options: Vec<FallbackStrategy>,
    },
}
163
/// Events returned from `CandidateDiscoveryManager::poll` to report progress.
#[derive(Debug, Clone)]
pub enum DiscoveryEvent {
    /// A discovery session was started.
    DiscoveryStarted {
        /// Peer the session is for.
        peer_id: PeerId,
        /// Number of bootstrap nodes supplied.
        bootstrap_count: usize,
    },
    /// Local interface scanning began.
    LocalScanningStarted,
    /// A usable local address was found.
    LocalCandidateDiscovered {
        /// The discovered candidate.
        candidate: CandidateAddress,
    },
    /// Local interface scanning finished (or timed out).
    LocalScanningCompleted {
        /// Number of local candidates found in this session.
        candidate_count: usize,
        /// Time since the session started.
        duration: Duration,
    },
    /// Queries to bootstrap nodes were issued.
    ServerReflexiveDiscoveryStarted {
        /// Number of bootstrap nodes queried.
        bootstrap_count: usize,
    },
    /// A bootstrap node reported our externally observed address.
    ServerReflexiveCandidateDiscovered {
        /// The discovered candidate.
        candidate: CandidateAddress,
        /// Address of the reporting bootstrap node.
        bootstrap_node: SocketAddr,
    },
    /// A query to a bootstrap node failed.
    BootstrapQueryFailed {
        /// Address of the bootstrap node.
        bootstrap_node: SocketAddr,
        /// Human-readable failure description.
        error: String,
    },
    /// Symmetric NAT prediction began.
    SymmetricPredictionStarted {
        /// Consensus external address the prediction is based on.
        base_address: SocketAddr,
    },
    /// The predictor produced a candidate.
    PredictedCandidateGenerated {
        /// The predicted candidate.
        candidate: CandidateAddress,
        /// Confidence level of the underlying pattern analysis.
        confidence: f64,
    },
    /// An external port allocation was observed via a bootstrap response.
    PortAllocationDetected {
        /// Observed external port.
        port: u16,
        /// Full observed external address.
        source_address: SocketAddr,
        /// Bootstrap node that made the observation.
        bootstrap_node: BootstrapNodeId,
        /// Timestamp of the response.
        timestamp: Instant,
    },
    /// Discovery finished.
    DiscoveryCompleted {
        /// Number of candidates that passed validation.
        candidate_count: usize,
        /// Time from session start to completion.
        total_duration: Duration,
        /// Successful bootstrap queries divided by queries sent
        /// (1.0 when no queries were sent).
        success_rate: f64,
    },
    /// Discovery was aborted.
    DiscoveryFailed {
        /// Why discovery failed.
        error: DiscoveryError,
        /// Candidates gathered before the failure.
        partial_results: Vec<CandidateAddress>,
    },
}
225
/// Opaque numeric identifier for a bootstrap node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BootstrapNodeId(pub u64);
229
/// Lifecycle of a single bootstrap query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryState {
    /// The query is outstanding.
    Pending {
        /// When the query was sent.
        sent_at: Instant,
        /// Attempt counter for this query.
        attempts: u32,
    },
    /// The query finished.
    Completed,
    /// The query failed.
    Failed,
}
243
/// A bootstrap node's report of the address it observed for us.
#[derive(Debug, Clone, PartialEq)]
pub struct ServerReflexiveResponse {
    /// Node that produced the response.
    pub bootstrap_node: BootstrapNodeId,
    /// Our address as seen by that node.
    pub observed_address: SocketAddr,
    /// Response time of the query; used when computing candidate priority.
    pub response_time: Duration,
    /// Timestamp associated with the response.
    pub timestamp: Instant,
}
252
/// Snapshot of port allocation analysis used by the prediction phase.
#[derive(Debug, Clone, PartialEq)]
pub struct PatternAnalysisState {
    /// Observed port allocations the analysis was run on.
    pub allocation_history: VecDeque<PortAllocationEvent>,
    /// Pattern found by the predictor, if any.
    pub detected_pattern: Option<PortAllocationPattern>,
    /// Confidence of the detected pattern (0.0 when none was detected).
    pub confidence_level: f64,
    /// Estimated accuracy of predictions made from the pattern.
    pub prediction_accuracy: f64,
}
261
/// One observed external port allocation.
#[derive(Debug, Clone, PartialEq)]
pub struct PortAllocationEvent {
    /// Observed external port.
    pub port: u16,
    /// Timestamp of the observation.
    pub timestamp: Instant,
    /// Full observed external address.
    pub source_address: SocketAddr,
}
269
/// Description of how the NAT appears to allocate external ports.
#[derive(Debug, Clone, PartialEq)]
pub struct PortAllocationPattern {
    /// Kind of allocation behaviour detected.
    pub pattern_type: AllocationPatternType,
    /// Port the pattern is anchored at.
    pub base_port: u16,
    /// Step between consecutive allocations.
    /// NOTE(review): presumably only meaningful for stride-like patterns — confirm.
    pub stride: u16,
    /// Optional pair of ports bounding the allocation pool.
    /// NOTE(review): presumably (low, high) — confirm ordering and inclusivity.
    pub pool_boundaries: Option<(u16, u16)>,
    /// Confidence in the detected pattern.
    pub confidence: f64,
}
279
/// Categories of NAT port allocation behaviour.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AllocationPatternType {
    /// Consecutive allocations differ by one port.
    Sequential,
    /// Consecutive allocations differ by a constant stride.
    FixedStride,
    /// No predictable relation between allocations.
    Random,
    /// Ports are drawn from a bounded pool.
    PoolBased,
    /// Allocation appears to depend on time.
    TimeBased,
    /// The pattern could not be classified.
    Unknown,
}
296
/// Numeric identifier for a candidate.
///
/// During validation the id is the candidate's index in the session's
/// `discovered_candidates`; `discover_local_candidates` uses random ids.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CandidateId(pub u64);
300
/// Outcome of validating a single candidate.
#[derive(Debug, Clone, PartialEq)]
pub enum ValidationResult {
    /// The candidate is usable; `rtt` is the measured round-trip time.
    Valid { rtt: Duration },
    /// The candidate was rejected for the given reason.
    Invalid { reason: String },
    /// Validation did not finish in time.
    Timeout,
    /// Validation has started but has not produced a result yet.
    Pending,
}
309
/// A candidate that passed validation.
#[derive(Debug, Clone, PartialEq)]
pub struct ValidatedCandidate {
    /// Identifier of the candidate.
    pub id: CandidateId,
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// How the candidate was obtained.
    pub source: DiscoverySourceType,
    /// Priority assigned at discovery time.
    pub priority: u32,
    /// Round-trip time measured during validation, if any.
    pub rtt: Option<Duration>,
    /// Reliability score; the manager produces values within `0.0..=1.0`.
    pub reliability_score: f64,
}
320
321impl ValidatedCandidate {
322 pub fn to_candidate_address(&self) -> CandidateAddress {
324 CandidateAddress {
325 address: self.address,
326 priority: self.priority,
327 source: convert_to_nat_source(self.source),
328 state: CandidateState::Valid,
329 }
330 }
331}
332
/// Mutable per-session data owned by `CandidateDiscoveryManager`.
#[derive(Debug)]
pub(crate) struct DiscoverySessionState {
    /// Peer the current session is for.
    pub peer_id: PeerId,
    /// Random identifier assigned when the session starts.
    pub session_id: u64,
    /// When the session started; the total timeout is measured from here.
    pub started_at: Instant,
    /// Candidates gathered so far, in discovery order. Validation uses the
    /// index in this vector as the `CandidateId`.
    pub discovered_candidates: Vec<DiscoveryCandidate>,
    /// Counters collected during the session.
    pub statistics: DiscoveryStatistics,
    /// Recent observed port allocations (capped at 20 entries by the
    /// response handler).
    pub allocation_history: VecDeque<PortAllocationEvent>,
}
343
/// Counters and timings for a discovery session.
#[derive(Debug, Default, Clone)]
pub struct DiscoveryStatistics {
    /// Number of local interface candidates recorded.
    pub local_candidates_found: u32,
    /// Number of server-reflexive candidates recorded.
    pub server_reflexive_candidates_found: u32,
    /// Number of predicted candidates recorded.
    pub predicted_candidates_generated: u32,
    /// Bootstrap queries sent; denominator of the reported success rate.
    pub bootstrap_queries_sent: u32,
    /// Bootstrap queries that succeeded; numerator of the success rate.
    pub bootstrap_queries_successful: u32,
    /// Total session duration, set when discovery completes.
    pub total_discovery_time: Option<Duration>,
    /// Average bootstrap round-trip time.
    /// NOTE(review): not written in the visible portion of this file.
    pub average_bootstrap_rtt: Option<Duration>,
}
355
/// Reasons candidate discovery can fail.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryError {
    /// The interface scan produced no addresses.
    NoLocalInterfaces,
    /// No bootstrap node (or bootstrap address) was available to query.
    AllBootstrapsFailed,
    /// A discovery timeout elapsed.
    DiscoveryTimeout,
    /// Fewer candidates were found than required.
    InsufficientCandidates { found: usize, required: usize },
    /// A network-level failure, with a description.
    NetworkError(String),
    /// The configuration is invalid, with a description.
    ConfigurationError(String),
    /// An internal invariant was violated (e.g. discovery already running).
    InternalError(String),
}
374
/// Recovery options attached to `DiscoveryPhase::Failed`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FallbackStrategy {
    /// Fall back to previously cached discovery results.
    UseCachedResults,
    /// Retry discovery with relaxed parameters.
    RetryWithRelaxedParams,
    /// Continue with a minimal set of candidates.
    UseMinimalCandidates,
    /// Fall back to relaying.
    EnableRelayFallback,
}
387
388impl Default for DiscoveryConfig {
389 fn default() -> Self {
390 Self {
391 total_timeout: Duration::from_secs(30),
392 local_scan_timeout: Duration::from_secs(2),
393 bootstrap_query_timeout: Duration::from_secs(5),
394 max_query_retries: 3,
395 max_candidates: 8,
396 enable_symmetric_prediction: true,
397 min_bootstrap_consensus: 2,
398 interface_cache_ttl: Duration::from_secs(60),
399 server_reflexive_cache_ttl: Duration::from_secs(300),
400 }
401 }
402}
403
404impl CandidateDiscoveryManager {
405 pub fn new(config: DiscoveryConfig) -> Self {
407 let interface_discovery = create_platform_interface_discovery();
408 let server_reflexive_discovery = ServerReflexiveDiscovery::new(&config);
409 let symmetric_predictor = SymmetricNatPredictor::new(&config);
410 let bootstrap_manager = BootstrapNodeManager::new(&config);
411 let cache = DiscoveryCache::new(&config);
412
413 Self {
414 current_phase: DiscoveryPhase::Idle,
415 config,
416 interface_discovery,
417 server_reflexive_discovery,
418 symmetric_predictor,
419 bootstrap_manager,
420 cache,
421 session_state: DiscoverySessionState {
422 peer_id: PeerId([0; 32]), session_id: 0,
424 started_at: Instant::now(),
425 discovered_candidates: Vec::new(),
426 statistics: DiscoveryStatistics::default(),
427 allocation_history: VecDeque::new(),
428 },
429 }
430 }
431
432 pub fn discover_local_candidates(&mut self) -> Result<Vec<ValidatedCandidate>, DiscoveryError> {
434 self.interface_discovery.start_scan().map_err(|e| {
436 DiscoveryError::NetworkError(format!("Failed to start interface scan: {}", e))
437 })?;
438
439 let start = Instant::now();
441 let timeout = Duration::from_secs(2);
442
443 loop {
444 if start.elapsed() > timeout {
445 return Err(DiscoveryError::DiscoveryTimeout);
446 }
447
448 if let Some(interfaces) = self.interface_discovery.check_scan_complete() {
449 let mut candidates = Vec::new();
451
452 for interface in interfaces {
453 for addr in interface.addresses {
454 candidates.push(ValidatedCandidate {
455 id: CandidateId(rand::random()),
456 address: addr,
457 source: DiscoverySourceType::Local,
458 priority: 50000, rtt: None,
460 reliability_score: 1.0,
461 });
462 }
463 }
464
465 if candidates.is_empty() {
466 return Err(DiscoveryError::NoLocalInterfaces);
467 }
468
469 return Ok(candidates);
470 }
471
472 std::thread::sleep(Duration::from_millis(10));
474 }
475 }
476
477 pub fn start_discovery(&mut self, peer_id: PeerId, bootstrap_nodes: Vec<BootstrapNode>) -> Result<(), DiscoveryError> {
479 if !matches!(self.current_phase, DiscoveryPhase::Idle | DiscoveryPhase::Failed { .. } | DiscoveryPhase::Completed { .. }) {
480 return Err(DiscoveryError::InternalError("Discovery already in progress".to_string()));
481 }
482
483 info!("Starting candidate discovery for peer {:?}", peer_id);
484
485 self.session_state.peer_id = peer_id;
487 self.session_state.session_id = rand::random();
488 self.session_state.started_at = Instant::now();
489 self.session_state.discovered_candidates.clear();
490 self.session_state.statistics = DiscoveryStatistics::default();
491
492 self.bootstrap_manager.update_bootstrap_nodes(bootstrap_nodes);
494
495 self.current_phase = DiscoveryPhase::LocalInterfaceScanning {
497 started_at: Instant::now(),
498 };
499
500 Ok(())
501 }
502
503 pub fn poll(&mut self, now: Instant) -> Vec<DiscoveryEvent> {
505 let mut events = Vec::new();
506
507 if self.session_state.started_at.elapsed() > self.config.total_timeout {
509 self.handle_discovery_timeout(&mut events, now);
510 return events;
511 }
512
513 match &self.current_phase.clone() {
514 DiscoveryPhase::Idle => {
515 },
517
518 DiscoveryPhase::LocalInterfaceScanning { started_at } => {
519 self.poll_local_interface_scanning(*started_at, now, &mut events);
520 },
521
522 DiscoveryPhase::ServerReflexiveQuerying { started_at, active_queries, responses_received } => {
523 self.poll_server_reflexive_discovery(*started_at, active_queries, responses_received, now, &mut events);
524 },
525
526 DiscoveryPhase::SymmetricNatPrediction { started_at, prediction_attempts, pattern_analysis } => {
527 self.poll_symmetric_prediction(*started_at, *prediction_attempts, pattern_analysis, now, &mut events);
528 },
529
530 DiscoveryPhase::CandidateValidation { started_at, validation_results } => {
531 self.poll_candidate_validation(*started_at, validation_results, now, &mut events);
532 },
533
534 DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. } => {
535 },
537 }
538
539 events
540 }
541
542 pub fn get_status(&self) -> DiscoveryStatus {
544 DiscoveryStatus {
545 phase: self.current_phase.clone(),
546 discovered_candidates: self.session_state.discovered_candidates.iter()
547 .map(|c| c.to_candidate_address())
548 .collect(),
549 statistics: self.session_state.statistics.clone(),
550 elapsed_time: self.session_state.started_at.elapsed(),
551 }
552 }
553
554 pub fn is_complete(&self) -> bool {
556 matches!(self.current_phase, DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. })
557 }
558
559 pub fn get_results(&self) -> Option<DiscoveryResults> {
561 match &self.current_phase {
562 DiscoveryPhase::Completed { final_candidates, completion_time } => {
563 Some(DiscoveryResults {
564 candidates: final_candidates.clone(),
565 completion_time: *completion_time,
566 statistics: self.session_state.statistics.clone(),
567 })
568 },
569 DiscoveryPhase::Failed { .. } => {
570 Some(DiscoveryResults {
571 candidates: Vec::new(),
572 completion_time: Instant::now(),
573 statistics: self.session_state.statistics.clone(),
574 })
575 },
576 _ => None,
577 }
578 }
579
580 pub fn get_candidates_for_peer(&self, peer_id: PeerId) -> Vec<CandidateAddress> {
582 if self.session_state.peer_id == peer_id {
584 self.session_state.discovered_candidates.iter()
586 .map(|c| c.to_candidate_address())
587 .collect()
588 } else {
589 debug!("No candidates found for peer {:?} (current session is for {:?})",
592 peer_id, self.session_state.peer_id);
593 Vec::new()
594 }
595 }
596
597 fn poll_local_interface_scanning(&mut self, started_at: Instant, now: Instant, events: &mut Vec<DiscoveryEvent>) {
600 if started_at.elapsed() > self.config.local_scan_timeout {
602 warn!("Local interface scanning timeout");
603 self.handle_local_scan_timeout(events, now);
604 return;
605 }
606
607 if let Some(interfaces) = self.interface_discovery.check_scan_complete() {
609 self.process_local_interfaces(interfaces, events, now);
610 }
611 }
612
613 fn process_local_interfaces(&mut self, interfaces: Vec<NetworkInterface>, events: &mut Vec<DiscoveryEvent>, now: Instant) {
614 debug!("Processing {} network interfaces", interfaces.len());
615
616 for interface in interfaces {
617 for address in &interface.addresses {
618 if self.is_valid_local_address(&address) {
619 let candidate = DiscoveryCandidate {
620 address: *address,
621 priority: self.calculate_local_priority(address, &interface),
622 source: DiscoverySourceType::Local,
623 state: CandidateState::New,
624 };
625
626 self.session_state.discovered_candidates.push(candidate.clone());
627 self.session_state.statistics.local_candidates_found += 1;
628
629 events.push(DiscoveryEvent::LocalCandidateDiscovered {
630 candidate: candidate.to_candidate_address()
631 });
632 }
633 }
634 }
635
636 events.push(DiscoveryEvent::LocalScanningCompleted {
637 candidate_count: self.session_state.statistics.local_candidates_found as usize,
638 duration: now.duration_since(self.session_state.started_at),
639 });
640
641 self.start_server_reflexive_discovery(events, now);
643 }
644
645 fn start_server_reflexive_discovery(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
646 let bootstrap_node_ids = self.bootstrap_manager.get_active_bootstrap_nodes();
647
648 if bootstrap_node_ids.is_empty() {
649 warn!("No bootstrap nodes available for server reflexive discovery");
650 self.handle_no_bootstrap_nodes(events, now);
651 return;
652 }
653
654 let bootstrap_nodes_with_addresses: Vec<(BootstrapNodeId, SocketAddr)> = bootstrap_node_ids
656 .iter()
657 .filter_map(|&node_id| {
658 self.bootstrap_manager.get_bootstrap_address(node_id)
659 .map(|addr| (node_id, addr))
660 })
661 .collect();
662
663 if bootstrap_nodes_with_addresses.is_empty() {
664 warn!("No bootstrap node addresses available for server reflexive discovery");
665 self.handle_no_bootstrap_nodes(events, now);
666 return;
667 }
668
669 let active_queries = self.server_reflexive_discovery.start_queries_with_addresses(&bootstrap_nodes_with_addresses, now);
671
672 events.push(DiscoveryEvent::ServerReflexiveDiscoveryStarted {
673 bootstrap_count: bootstrap_nodes_with_addresses.len(),
674 });
675
676 self.current_phase = DiscoveryPhase::ServerReflexiveQuerying {
677 started_at: now,
678 active_queries,
679 responses_received: Vec::new(),
680 };
681 }
682
683 fn poll_server_reflexive_discovery(
684 &mut self,
685 started_at: Instant,
686 active_queries: &HashMap<BootstrapNodeId, QueryState>,
687 responses_received: &Vec<ServerReflexiveResponse>,
688 now: Instant,
689 events: &mut Vec<DiscoveryEvent>
690 ) {
691 let new_responses = self.server_reflexive_discovery.poll_queries(active_queries, now);
693
694 let mut updated_responses = responses_received.clone();
695 for response in new_responses {
696 self.process_server_reflexive_response(&response, events);
697 updated_responses.push(response);
698 }
699
700 if self.should_transition_to_prediction(&updated_responses, now) {
702 self.start_symmetric_prediction(&updated_responses, events, now);
703 } else if started_at.elapsed() > self.config.bootstrap_query_timeout * 2 {
704 if updated_responses.len() >= self.config.min_bootstrap_consensus {
706 self.start_symmetric_prediction(&updated_responses, events, now);
707 } else {
708 self.handle_insufficient_bootstrap_responses(events, now);
709 }
710 } else {
711 self.current_phase = DiscoveryPhase::ServerReflexiveQuerying {
713 started_at,
714 active_queries: active_queries.clone(),
715 responses_received: updated_responses,
716 };
717 }
718 }
719
720 fn process_server_reflexive_response(&mut self, response: &ServerReflexiveResponse, events: &mut Vec<DiscoveryEvent>) {
721 debug!("Received server reflexive response: {:?}", response);
722
723 let allocation_event = PortAllocationEvent {
725 port: response.observed_address.port(),
726 timestamp: response.timestamp,
727 source_address: response.observed_address,
728 };
729
730 if let DiscoveryPhase::ServerReflexiveQuerying { .. } = &mut self.current_phase {
732 self.session_state.allocation_history.push_back(allocation_event.clone());
735
736 if self.session_state.allocation_history.len() > 20 {
738 self.session_state.allocation_history.pop_front();
739 }
740 }
741
742 let candidate = DiscoveryCandidate {
743 address: response.observed_address,
744 priority: self.calculate_server_reflexive_priority(response),
745 source: DiscoverySourceType::ServerReflexive,
746 state: CandidateState::New,
747 };
748
749 self.session_state.discovered_candidates.push(candidate.clone());
750 self.session_state.statistics.server_reflexive_candidates_found += 1;
751
752 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
753 candidate: candidate.to_candidate_address(),
754 bootstrap_node: self.bootstrap_manager.get_bootstrap_address(response.bootstrap_node).unwrap_or_else(|| "unknown".parse().unwrap()),
755 });
756
757 events.push(DiscoveryEvent::PortAllocationDetected {
758 port: allocation_event.port,
759 source_address: allocation_event.source_address,
760 bootstrap_node: response.bootstrap_node,
761 timestamp: allocation_event.timestamp,
762 });
763 }
764
765 fn start_symmetric_prediction(&mut self, responses: &[ServerReflexiveResponse], events: &mut Vec<DiscoveryEvent>, now: Instant) {
766 if !self.config.enable_symmetric_prediction || responses.is_empty() {
767 self.start_candidate_validation(events, now);
768 return;
769 }
770
771 let base_address = self.calculate_consensus_address(responses);
773
774 events.push(DiscoveryEvent::SymmetricPredictionStarted { base_address });
775
776 let detected_pattern = self.symmetric_predictor.analyze_allocation_patterns(&self.session_state.allocation_history);
778
779 let confidence_level = detected_pattern.as_ref().map(|p| p.confidence).unwrap_or(0.0);
780
781 let prediction_accuracy = if let Some(ref pattern) = detected_pattern {
783 self.calculate_prediction_accuracy(pattern, &self.session_state.allocation_history)
784 } else {
785 0.3 };
787
788 debug!("Symmetric NAT pattern analysis: detected_pattern={:?}, confidence={:.2}, accuracy={:.2}",
789 detected_pattern, confidence_level, prediction_accuracy);
790
791 self.current_phase = DiscoveryPhase::SymmetricNatPrediction {
792 started_at: now,
793 prediction_attempts: 0,
794 pattern_analysis: PatternAnalysisState {
795 allocation_history: self.session_state.allocation_history.clone(),
796 detected_pattern,
797 confidence_level,
798 prediction_accuracy,
799 },
800 };
801 }
802
803 fn poll_symmetric_prediction(
804 &mut self,
805 _started_at: Instant,
806 _prediction_attempts: u32,
807 pattern_analysis: &PatternAnalysisState,
808 now: Instant,
809 events: &mut Vec<DiscoveryEvent>
810 ) {
811 let predicted_candidates = self.symmetric_predictor.generate_predictions(pattern_analysis, self.config.max_candidates - self.session_state.discovered_candidates.len());
813
814 for candidate in predicted_candidates {
815 self.session_state.discovered_candidates.push(candidate.clone());
816 self.session_state.statistics.predicted_candidates_generated += 1;
817
818 events.push(DiscoveryEvent::PredictedCandidateGenerated {
819 candidate: candidate.to_candidate_address(),
820 confidence: pattern_analysis.confidence_level,
821 });
822 }
823
824 self.start_candidate_validation(events, now);
826 }
827
828 fn start_candidate_validation(&mut self, _events: &mut Vec<DiscoveryEvent>, now: Instant) {
829 debug!("Starting candidate validation for {} candidates", self.session_state.discovered_candidates.len());
830
831 self.current_phase = DiscoveryPhase::CandidateValidation {
832 started_at: now,
833 validation_results: HashMap::new(),
834 };
835 }
836
837 fn poll_candidate_validation(
838 &mut self,
839 started_at: Instant,
840 validation_results: &HashMap<CandidateId, ValidationResult>,
841 now: Instant,
842 events: &mut Vec<DiscoveryEvent>
843 ) {
844 if started_at.elapsed() > Duration::from_secs(10) {
846 self.complete_validation_with_results(validation_results, events, now);
848 return;
849 }
850
851 let mut updated_results = validation_results.clone();
853 let mut validation_started = false;
854
855 let candidates_to_validate: Vec<(CandidateId, SocketAddr)> = self.session_state
857 .discovered_candidates
858 .iter()
859 .enumerate()
860 .filter_map(|(i, candidate)| {
861 let candidate_id = CandidateId(i as u64);
862 if !updated_results.contains_key(&candidate_id) {
863 Some((candidate_id, candidate.address))
864 } else {
865 None
866 }
867 })
868 .collect();
869
870 for (candidate_id, address) in candidates_to_validate {
872 updated_results.insert(candidate_id, ValidationResult::Pending);
873 validation_started = true;
874
875 debug!("Starting validation for candidate {}: {}", candidate_id.0, address);
876
877 #[cfg(feature = "production-ready")]
878 {
879 self.start_path_validation(candidate_id, address, now);
881 }
882
883 #[cfg(not(feature = "production-ready"))]
884 {
885 self.simulate_path_validation(candidate_id, address, now);
887 }
888 }
889
890 let completed_validations = self.check_validation_completions(&updated_results, now);
892 for (candidate_id, result) in completed_validations {
893 updated_results.insert(candidate_id, result);
894 }
895
896 let all_complete = updated_results.values().all(|result| {
898 !matches!(result, ValidationResult::Pending)
899 });
900
901 if all_complete || validation_started {
902 self.current_phase = DiscoveryPhase::CandidateValidation {
904 started_at,
905 validation_results: updated_results.clone(),
906 };
907 }
908
909 if all_complete {
910 self.complete_validation_with_results(&updated_results, events, now);
911 }
912 }
913
/// Starts path validation for a candidate (`production-ready` builds only).
///
/// Currently logs the request and delegates to `simulate_path_validation`.
#[cfg(feature = "production-ready")]
fn start_path_validation(&mut self, candidate_id: CandidateId, candidate_address: SocketAddr, _now: Instant) {
    let CandidateId(raw_id) = candidate_id;
    debug!("Starting QUIC path validation for candidate {} at {}", raw_id, candidate_address);

    self.simulate_path_validation(candidate_id, candidate_address, _now);
}
929
930 fn simulate_path_validation(&mut self, candidate_id: CandidateId, candidate_address: SocketAddr, _now: Instant) {
932 let is_local = candidate_address.ip().is_loopback() ||
934 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("192.168.")) ||
935 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("10.")) ||
936 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("172."));
937
938 let is_server_reflexive = !is_local && !candidate_address.ip().is_unspecified();
939
940 debug!("Simulated path validation for candidate {} at {} - local: {}, server_reflexive: {}",
943 candidate_id.0, candidate_address, is_local, is_server_reflexive);
944 }
945
946 fn check_validation_completions(&self, current_results: &HashMap<CandidateId, ValidationResult>, _now: Instant) -> Vec<(CandidateId, ValidationResult)> {
948 let mut completions = Vec::new();
949
950 for (candidate_id, result) in current_results {
951 if matches!(result, ValidationResult::Pending) {
952 if let Some(candidate) = self.session_state.discovered_candidates.get(candidate_id.0 as usize) {
954 let validation_result = self.simulate_validation_result(&candidate.address);
955 completions.push((*candidate_id, validation_result));
956 }
957 }
958 }
959
960 completions
961 }
962
963 fn simulate_validation_result(&self, address: &SocketAddr) -> ValidationResult {
965 let is_local = address.ip().is_loopback() ||
966 (address.ip().is_ipv4() && address.ip().to_string().starts_with("192.168.")) ||
967 (address.ip().is_ipv4() && address.ip().to_string().starts_with("10.")) ||
968 (address.ip().is_ipv4() && address.ip().to_string().starts_with("172."));
969
970 if is_local {
971 ValidationResult::Valid { rtt: Duration::from_millis(1) }
973 } else if address.ip().is_unspecified() {
974 ValidationResult::Invalid { reason: "Unspecified address".to_string() }
976 } else {
977 ValidationResult::Valid { rtt: Duration::from_millis(50 + (address.port() % 100) as u64) }
979 }
980 }
981
982 fn complete_validation_with_results(
984 &mut self,
985 validation_results: &HashMap<CandidateId, ValidationResult>,
986 events: &mut Vec<DiscoveryEvent>,
987 now: Instant
988 ) {
989 let validated_candidates: Vec<ValidatedCandidate> = self.session_state.discovered_candidates
990 .iter()
991 .enumerate()
992 .filter_map(|(i, candidate)| {
993 let candidate_id = CandidateId(i as u64);
994 validation_results.get(&candidate_id).and_then(|result| {
995 match result {
996 ValidationResult::Valid { rtt } => {
997 Some(ValidatedCandidate {
998 id: candidate_id,
999 address: candidate.address,
1000 source: candidate.source,
1001 priority: candidate.priority,
1002 rtt: Some(*rtt),
1003 reliability_score: self.calculate_reliability_score(candidate, *rtt),
1004 })
1005 }
1006 ValidationResult::Invalid { reason } => {
1007 debug!("Candidate {} at {} failed validation: {}",
1008 candidate_id.0, candidate.address, reason);
1009 None
1010 }
1011 ValidationResult::Timeout => {
1012 debug!("Candidate {} at {} validation timed out",
1013 candidate_id.0, candidate.address);
1014 None
1015 }
1016 ValidationResult::Pending => {
1017 debug!("Candidate {} at {} validation still pending, treating as timeout",
1019 candidate_id.0, candidate.address);
1020 None
1021 }
1022 }
1023 })
1024 })
1025 .collect();
1026
1027 debug!("Validation completed: {} valid candidates out of {} total",
1028 validated_candidates.len(), self.session_state.discovered_candidates.len());
1029
1030 self.complete_discovery(validated_candidates, events, now);
1031 }
1032
1033 fn calculate_reliability_score(&self, candidate: &DiscoveryCandidate, rtt: Duration) -> f64 {
1035 let mut score: f64 = 0.5; match candidate.source {
1039 DiscoverySourceType::Local => score += 0.3, DiscoverySourceType::ServerReflexive => score += 0.2, DiscoverySourceType::Predicted => score += 0.1, }
1043
1044 let rtt_ms = rtt.as_millis() as f64;
1046 if rtt_ms < 10.0 {
1047 score += 0.2;
1048 } else if rtt_ms < 50.0 {
1049 score += 0.1;
1050 } else if rtt_ms > 200.0 {
1051 score -= 0.1;
1052 }
1053
1054 if candidate.address.ip().is_ipv6() {
1056 score += 0.05; }
1058
1059 score.max(0.0).min(1.0)
1061 }
1062
1063 fn complete_discovery(&mut self, candidates: Vec<ValidatedCandidate>, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1064 let total_duration = now.duration_since(self.session_state.started_at);
1065 self.session_state.statistics.total_discovery_time = Some(total_duration);
1066
1067 let success_rate = if self.session_state.statistics.bootstrap_queries_sent > 0 {
1068 self.session_state.statistics.bootstrap_queries_successful as f64 / self.session_state.statistics.bootstrap_queries_sent as f64
1069 } else {
1070 1.0
1071 };
1072
1073 events.push(DiscoveryEvent::DiscoveryCompleted {
1074 candidate_count: candidates.len(),
1075 total_duration,
1076 success_rate,
1077 });
1078
1079 self.current_phase = DiscoveryPhase::Completed {
1080 final_candidates: candidates,
1081 completion_time: now,
1082 };
1083
1084 info!("Candidate discovery completed successfully in {:?}", total_duration);
1085 }
1086
1087 fn handle_discovery_timeout(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1090 let error = DiscoveryError::DiscoveryTimeout;
1091 events.push(DiscoveryEvent::DiscoveryFailed {
1092 error: error.clone(),
1093 partial_results: self.session_state.discovered_candidates.iter()
1094 .map(|c| c.to_candidate_address())
1095 .collect(),
1096 });
1097
1098 self.current_phase = DiscoveryPhase::Failed {
1099 error,
1100 failed_at: now,
1101 fallback_options: vec![FallbackStrategy::UseCachedResults, FallbackStrategy::UseMinimalCandidates],
1102 };
1103 }
1104
1105 fn handle_local_scan_timeout(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1106 warn!("Local interface scan timeout, proceeding with available candidates");
1107
1108 events.push(DiscoveryEvent::LocalScanningCompleted {
1109 candidate_count: self.session_state.statistics.local_candidates_found as usize,
1110 duration: now.duration_since(self.session_state.started_at),
1111 });
1112
1113 self.start_server_reflexive_discovery(events, now);
1114 }
1115
1116 fn handle_no_bootstrap_nodes(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1117 let error = DiscoveryError::AllBootstrapsFailed;
1118 events.push(DiscoveryEvent::DiscoveryFailed {
1119 error: error.clone(),
1120 partial_results: self.session_state.discovered_candidates.iter()
1121 .map(|c| c.to_candidate_address())
1122 .collect(),
1123 });
1124
1125 self.current_phase = DiscoveryPhase::Failed {
1126 error,
1127 failed_at: now,
1128 fallback_options: vec![FallbackStrategy::UseMinimalCandidates],
1129 };
1130 }
1131
1132 fn handle_insufficient_bootstrap_responses(&mut self, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1133 warn!("Insufficient bootstrap responses, proceeding with available data");
1134 self.start_candidate_validation(events, now);
1135 }
1136
1137 fn is_valid_local_address(&self, address: &SocketAddr) -> bool {
1138 match address.ip() {
1139 IpAddr::V4(ipv4) => !ipv4.is_loopback() && !ipv4.is_unspecified(),
1140 IpAddr::V6(ipv6) => !ipv6.is_loopback() && !ipv6.is_unspecified(),
1141 }
1142 }
1143
1144 fn calculate_local_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
1145 let mut priority = 100; match address.ip() {
1148 IpAddr::V4(ipv4) => {
1149 if ipv4.is_private() {
1150 priority += 50; }
1152 },
1153 IpAddr::V6(ipv6) => {
1154 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
1157 let segments = ipv6.segments();
1158 if segments[0] & 0xE000 == 0x2000 {
1159 priority += 60;
1161 } else if segments[0] & 0xFFC0 == 0xFE80 {
1162 priority += 20;
1164 } else if segments[0] & 0xFE00 == 0xFC00 {
1165 priority += 40;
1167 } else {
1168 priority += 30;
1170 }
1171 }
1172
1173 priority += 10; },
1176 }
1177
1178 if interface.is_wireless {
1179 priority -= 10; }
1181
1182 priority
1183 }
1184
1185 fn calculate_server_reflexive_priority(&self, response: &ServerReflexiveResponse) -> u32 {
1186 let mut priority = 200; if response.response_time < Duration::from_millis(50) {
1190 priority += 20;
1191 } else if response.response_time > Duration::from_millis(200) {
1192 priority -= 10;
1193 }
1194
1195 let age_bonus = if response.timestamp.elapsed().as_secs() < 60 { 20 } else { 0 };
1197 priority += age_bonus;
1198
1199 priority
1200 }
1201
1202 fn should_transition_to_prediction(&self, responses: &[ServerReflexiveResponse], _now: Instant) -> bool {
1203 responses.len() >= self.config.min_bootstrap_consensus.max(1)
1204 }
1205
1206 fn calculate_consensus_address(&self, responses: &[ServerReflexiveResponse]) -> SocketAddr {
1207 let mut address_counts: HashMap<SocketAddr, usize> = HashMap::new();
1209
1210 for response in responses {
1211 *address_counts.entry(response.observed_address).or_insert(0) += 1;
1212 }
1213
1214 address_counts
1215 .into_iter()
1216 .max_by_key(|(_, count)| *count)
1217 .map(|(addr, _)| addr)
1218 .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
1219 }
1220
1221 fn calculate_prediction_accuracy(&self, pattern: &PortAllocationPattern, history: &VecDeque<PortAllocationEvent>) -> f64 {
1223 if history.len() < 3 {
1224 return 0.3; }
1226
1227 let recent_ports: Vec<u16> = history.iter()
1229 .rev()
1230 .take(10)
1231 .map(|event| event.port)
1232 .collect();
1233
1234 let mut correct_predictions = 0;
1235 let total_predictions = recent_ports.len().saturating_sub(1);
1236
1237 if total_predictions == 0 {
1238 return 0.3;
1239 }
1240
1241 match pattern.pattern_type {
1242 AllocationPatternType::Sequential => {
1243 for i in 1..recent_ports.len() {
1245 if recent_ports[i-1].wrapping_sub(recent_ports[i]) == 1 {
1246 correct_predictions += 1;
1247 }
1248 }
1249 }
1250 AllocationPatternType::FixedStride => {
1251 for i in 1..recent_ports.len() {
1253 if recent_ports[i-1].wrapping_sub(recent_ports[i]) == pattern.stride {
1254 correct_predictions += 1;
1255 }
1256 }
1257 }
1258 AllocationPatternType::PoolBased => {
1259 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1261 for port in &recent_ports {
1262 if *port >= min_port && *port <= max_port {
1263 correct_predictions += 1;
1264 }
1265 }
1266 }
1267 }
1268 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1269 if recent_ports.len() >= 3 {
1271 let mean = recent_ports.iter().map(|&p| p as f64).sum::<f64>() / recent_ports.len() as f64;
1272 let variance = recent_ports.iter()
1273 .map(|&p| (p as f64 - mean).powi(2))
1274 .sum::<f64>() / recent_ports.len() as f64;
1275
1276 let normalized_variance = (variance / 10000.0).min(1.0); return 0.2 + (1.0 - normalized_variance) * 0.3; }
1280 }
1281 AllocationPatternType::TimeBased => {
1282 if history.len() >= 2 {
1284 let time_diffs: Vec<Duration> = history.iter()
1285 .collect::<Vec<_>>()
1286 .windows(2)
1287 .map(|w| w[1].timestamp.duration_since(w[0].timestamp))
1288 .collect();
1289
1290 if !time_diffs.is_empty() {
1291 let avg_diff = time_diffs.iter().sum::<Duration>() / time_diffs.len() as u32;
1292 let variance = time_diffs.iter()
1293 .map(|d| d.as_millis().abs_diff(avg_diff.as_millis()) as f64)
1294 .sum::<f64>() / time_diffs.len() as f64;
1295
1296 let normalized_variance = (variance / 1000.0).min(1.0); return 0.3 + (1.0 - normalized_variance) * 0.4; }
1300 }
1301 }
1302 }
1303
1304 let accuracy = if total_predictions > 0 {
1306 correct_predictions as f64 / total_predictions as f64
1307 } else {
1308 0.3
1309 };
1310
1311 let confidence_adjusted_accuracy = accuracy * pattern.confidence;
1313
1314 confidence_adjusted_accuracy.max(0.2).min(0.9)
1316 }
1317}
1318
/// Point-in-time view of a discovery session: phase, candidates,
/// statistics and elapsed time.
1319#[derive(Debug, Clone)]
1321pub struct DiscoveryStatus {
/// Phase reported for the session.
1322 pub phase: DiscoveryPhase,
/// Candidates reported so far.
1323 pub discovered_candidates: Vec<CandidateAddress>,
/// Statistics reported for the session.
1324 pub statistics: DiscoveryStatistics,
/// Time elapsed; presumably measured from the start of the session — confirm at the construction site.
1325 pub elapsed_time: Duration,
1326}
1327
/// Outcome of a discovery run: validated candidates, completion instant
/// and statistics.
1328#[derive(Debug, Clone)]
1330pub struct DiscoveryResults {
/// Validated candidates produced by the run.
1331 pub candidates: Vec<ValidatedCandidate>,
/// Instant recorded as the completion time.
1332 pub completion_time: Instant,
/// Statistics gathered during the run.
1333 pub statistics: DiscoveryStatistics,
1334}
1335
/// Two-step interface for scanning network interfaces: start a scan, then
/// check for its result.
///
/// The manager stores an implementation as
/// `Box<dyn NetworkInterfaceDiscovery + Send>`.
1336pub trait NetworkInterfaceDiscovery {
/// Starts a scan. Failures are reported as a `String` message.
1340 fn start_scan(&mut self) -> Result<(), String>;
/// Returns the discovered interfaces when available.
/// NOTE(review): `None` presumably means the scan is still running — confirm with the implementations.
1341 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>>;
1342}
1343
/// Description of one network interface as reported by a
/// `NetworkInterfaceDiscovery` implementation.
1344#[derive(Debug, Clone, PartialEq)]
1346pub struct NetworkInterface {
/// Interface name.
1347 pub name: String,
/// Socket addresses associated with the interface.
1348 pub addresses: Vec<SocketAddr>,
/// Whether the interface is reported as up.
1349 pub is_up: bool,
/// Whether the interface is wireless; wireless interfaces get a lower local priority.
1350 pub is_wireless: bool,
/// MTU, when known.
1351 pub mtu: Option<u16>,
1352}
1353
/// Bookkeeping for one connection to a bootstrap node.
/// Only compiled with the `production-ready` feature.
1354#[cfg(feature = "production-ready")]
1356#[derive(Debug)]
1357struct BootstrapConnection {
/// The underlying connection.
1358 connection: crate::Connection,
/// Address associated with this connection (presumably the bootstrap node's — confirm).
1360 address: SocketAddr,
/// Instant recorded for connection establishment.
1362 established_at: Instant,
/// Request identifier associated with this connection.
1364 request_id: u64,
1366}
1367
/// Request for an address observation.
/// NOTE(review): the fields mirror what `create_discovery_request` serializes
/// (id, timestamp, capability word) — confirm this struct is actually used.
1368#[derive(Debug, Clone)]
1370struct AddressObservationRequest {
/// Request identifier.
1371 request_id: u64,
/// Timestamp as an integer; unit not visible here (presumably ms since the Unix epoch — confirm).
1373 timestamp: u64,
/// Capability word; bit meanings are not visible in this file section.
1375 capabilities: u32,
1377}
1378
/// Tracks server-reflexive address queries sent to bootstrap nodes and the
/// responses waiting to be collected by `poll_queries`.
1379#[derive(Debug)]
1381pub(crate) struct ServerReflexiveDiscovery {
/// Private copy of the discovery configuration (timeouts, retry limit).
1382 config: DiscoveryConfig,
/// State of each query, keyed by bootstrap node.
1383 active_queries: HashMap<BootstrapNodeId, QueryState>,
/// Responses queued until the next `poll_queries` call drains them.
1385 responses: VecDeque<ServerReflexiveResponse>,
/// Deadline of each outstanding query, keyed by bootstrap node.
1387 query_timeouts: HashMap<BootstrapNodeId, Instant>,
1389 #[cfg(feature = "production-ready")]
/// Connections per bootstrap node; cleared whenever a new query round starts.
1391 active_connections: HashMap<BootstrapNodeId, BootstrapConnection>,
1392 #[cfg(feature = "production-ready")]
/// Tokio runtime handle captured in `new`, or `None` when none was current.
1394 runtime_handle: Option<tokio::runtime::Handle>,
1395}
1396
1397impl ServerReflexiveDiscovery {
1398 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
1399 Self {
1400 config: config.clone(),
1401 active_queries: HashMap::new(),
1402 responses: VecDeque::new(),
1403 query_timeouts: HashMap::new(),
1404 #[cfg(feature = "production-ready")]
1405 active_connections: HashMap::new(),
1406 #[cfg(feature = "production-ready")]
1407 runtime_handle: tokio::runtime::Handle::try_current().ok(),
1408 }
1409 }
1410
1411 pub(crate) fn start_queries(&mut self, bootstrap_nodes: &[BootstrapNodeId], now: Instant) -> HashMap<BootstrapNodeId, QueryState> {
1412 debug!("Starting server reflexive queries to {} bootstrap nodes", bootstrap_nodes.len());
1413
1414 self.active_queries.clear();
1415 self.query_timeouts.clear();
1416
1417 #[cfg(feature = "production-ready")]
1418 self.active_connections.clear();
1419
1420 for &node_id in bootstrap_nodes {
1421 let query_state = QueryState::Pending {
1422 sent_at: now,
1423 attempts: 1,
1424 };
1425
1426 self.active_queries.insert(node_id, query_state);
1427 self.query_timeouts.insert(node_id, now + self.config.bootstrap_query_timeout);
1428
1429 debug!("Starting server reflexive query to bootstrap node {:?}", node_id);
1430
1431 #[cfg(feature = "production-ready")]
1432 {
1433 if let Some(runtime) = &self.runtime_handle {
1435 self.start_quinn_query(node_id, runtime.clone(), now);
1436 } else {
1437 warn!("No async runtime available, falling back to simulation for node {:?}", node_id);
1438 self.simulate_bootstrap_response(node_id, now);
1439 }
1440 }
1441
1442 #[cfg(not(feature = "production-ready"))]
1443 {
1444 self.simulate_bootstrap_response(node_id, now);
1446 }
1447 }
1448
1449 self.active_queries.clone()
1450 }
1451
1452 pub(crate) fn start_queries_with_addresses(
1454 &mut self,
1455 bootstrap_nodes: &[(BootstrapNodeId, SocketAddr)],
1456 now: Instant
1457 ) -> HashMap<BootstrapNodeId, QueryState> {
1458 debug!("Starting server reflexive queries to {} bootstrap nodes with addresses", bootstrap_nodes.len());
1459
1460 self.active_queries.clear();
1461 self.query_timeouts.clear();
1462
1463 #[cfg(feature = "production-ready")]
1464 self.active_connections.clear();
1465
1466 for &(node_id, bootstrap_address) in bootstrap_nodes {
1467 let query_state = QueryState::Pending {
1468 sent_at: now,
1469 attempts: 1,
1470 };
1471
1472 self.active_queries.insert(node_id, query_state);
1473 self.query_timeouts.insert(node_id, now + self.config.bootstrap_query_timeout);
1474
1475 debug!("Starting server reflexive query to bootstrap node {:?} at {}", node_id, bootstrap_address);
1476
1477 #[cfg(feature = "production-ready")]
1478 {
1479 if let Some(_runtime) = &self.runtime_handle {
1481 self.start_quinn_query_with_address(node_id, bootstrap_address, now);
1482 } else {
1483 warn!("No async runtime available, falling back to simulation for node {:?}", node_id);
1484 self.simulate_bootstrap_response(node_id, now);
1485 }
1486 }
1487
1488 #[cfg(not(feature = "production-ready"))]
1489 {
1490 self.simulate_bootstrap_response(node_id, now);
1492 }
1493 }
1494
1495 self.active_queries.clone()
1496 }
1497
/// Starts a query to `node_id` when only the node id is known.
///
/// No connection is opened here: a random request id is generated for the
/// log line and the response is then simulated. `_runtime` is unused.
#[cfg(feature = "production-ready")]
fn start_quinn_query(
    &mut self,
    node_id: BootstrapNodeId,
    _runtime: tokio::runtime::Handle,
    now: Instant,
) {
    let request_id: u64 = rand::random();

    debug!("Starting Quinn connection to bootstrap node {:?} with request ID {}", node_id, request_id);

    self.simulate_bootstrap_response(node_id, now);
}
1521
/// Starts an asynchronous query to the bootstrap node at `bootstrap_address`.
///
/// With a runtime handle, a task is spawned that awaits
/// `perform_bootstrap_query` and, on success, sends a
/// `ServerReflexiveResponse` into a channel created in this function.
/// Without a runtime handle the response is simulated instead.
///
/// NOTE(review): the receiving half of that channel (`_response_rx`) is
/// never stored or read; it goes out of scope when the `if` block ends, so
/// a response sent by the task does not reach `self.responses` and the
/// query state is not updated from here. Wiring this up needs a place to
/// keep the receiver (e.g. a struct field) — confirm intended design.
1522 #[cfg(feature = "production-ready")]
1524 pub(crate) fn start_quinn_query_with_address(
1525 &mut self,
1526 node_id: BootstrapNodeId,
1527 bootstrap_address: SocketAddr,
1528 now: Instant
1529 ) {
1530
// Random identifier handed to the query for this request.
1531 let request_id = rand::random::<u64>();
1532
1533 info!("Establishing Quinn connection to bootstrap node {:?} at {}", node_id, bootstrap_address);
1534
1535 if let Some(runtime) = &self.runtime_handle {
// Copied out so the spawned task does not borrow `self`.
1537 let timeout = self.config.bootstrap_query_timeout;
1538
// Receiver is bound but never polled (see NOTE above).
1539 let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
1541
1542 runtime.spawn(async move {
1547 match Self::perform_bootstrap_query(bootstrap_address, request_id, timeout).await {
1548 Ok(observed_address) => {
1549 let response = ServerReflexiveResponse {
1550 bootstrap_node: node_id,
1551 observed_address,
// Measured from the `now` passed in by the caller.
1552 response_time: now.elapsed(),
1553 timestamp: Instant::now(),
1554 };
1555
// A send error is deliberately ignored here.
1556 let _ = response_tx.send(response);
1558
1559 info!("Successfully received observed address {} from bootstrap node {:?}",
1560 observed_address, node_id);
1561 }
1562 Err(e) => {
1563 warn!("Failed to query bootstrap node {:?} at {}: {}", node_id, bootstrap_address, e);
1564 }
1565 }
1566 });
1567 } else {
1568 warn!("No async runtime available for Quinn query to {:?}", node_id);
1569 self.simulate_bootstrap_response(node_id, now);
1570 }
1571 }
1572
/// Placeholder for the real bootstrap query.
///
/// Currently ignores all arguments and always returns an error stating that
/// the query is not implemented for the low-level API.
1573 #[cfg(feature = "production-ready")]
1578 async fn perform_bootstrap_query(
1579 _bootstrap_address: SocketAddr,
1580 _request_id: u64,
1581 _timeout: Duration,
1582 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
1583 Err("Bootstrap query not implemented for low-level API".into())
1585
1586 }
1646
/// Serializes a discovery request as 20 big-endian bytes:
/// the 8-byte `request_id`, the low 64 bits of the current Unix time in
/// milliseconds, and a 4-byte word with the value `1`.
///
/// A system clock set before the Unix epoch yields a zero timestamp.
#[cfg(feature = "production-ready")]
fn create_discovery_request(request_id: u64) -> Vec<u8> {
    let millis_since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Keeping the low 64 bits is the same as taking bytes 8..16 of the
    // big-endian `u128` encoding.
    let timestamp_bytes = (millis_since_epoch as u64).to_be_bytes();
    let trailing_word: u32 = 1;

    let mut request = Vec::new();
    request.extend_from_slice(&request_id.to_be_bytes());
    request.extend_from_slice(&timestamp_bytes);
    request.extend_from_slice(&trailing_word.to_be_bytes());

    debug!("Created discovery request: {} bytes, request_id: {}", request.len(), request_id);
    request
}
1666
/// Placeholder for waiting on an address frame from a bootstrap node.
///
/// Currently ignores all arguments and always returns an error stating that
/// the operation is not implemented for the low-level API.
1667 #[cfg(feature = "production-ready")]
1669 async fn wait_for_add_address_frame(
1670 _connection: &Connection,
1671 _expected_request_id: u64,
1672 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
1673 Err("wait_for_add_address_frame not implemented for low-level API".into())
1676
1677 }
1713
/// Creates a new unbounded channel and returns only its sending half.
///
/// NOTE(review): the receiving half is discarded when this function
/// returns, so nothing can ever read what is sent — confirm whether the
/// receiver was meant to be stored.
#[cfg(feature = "production-ready")]
fn create_response_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ServerReflexiveResponse> {
    tokio::sync::mpsc::unbounded_channel().0
}
1723
1724 pub(crate) fn poll_queries(&mut self, _active_queries: &HashMap<BootstrapNodeId, QueryState>, now: Instant) -> Vec<ServerReflexiveResponse> {
1725 let mut responses = Vec::new();
1726
1727 while let Some(response) = self.responses.pop_front() {
1729 responses.push(response);
1730 }
1731
1732 let mut timed_out_nodes = Vec::new();
1734 for (&node_id, &timeout) in &self.query_timeouts {
1735 if now >= timeout {
1736 timed_out_nodes.push(node_id);
1737 }
1738 }
1739
1740 for node_id in timed_out_nodes {
1742 self.query_timeouts.remove(&node_id);
1743
1744 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
1745 match query_state {
1746 QueryState::Pending { attempts, .. } if *attempts < self.config.max_query_retries => {
1747 *attempts += 1;
1749 let new_timeout = now + self.config.bootstrap_query_timeout;
1750 self.query_timeouts.insert(node_id, new_timeout);
1751
1752 debug!("Retrying server reflexive query to bootstrap node {:?} (attempt {})", node_id, attempts);
1753
1754 self.simulate_bootstrap_response(node_id, now);
1756 }
1757 _ => {
1758 self.active_queries.insert(node_id, QueryState::Failed);
1760 warn!("Server reflexive query to bootstrap node {:?} failed after retries", node_id);
1761 }
1762 }
1763 }
1764 }
1765
1766 responses
1767 }
1768
1769 fn simulate_bootstrap_response(&mut self, node_id: BootstrapNodeId, now: Instant) {
1772 let simulated_external_addr = match node_id.0 % 3 {
1774 0 => "203.0.113.1:45678".parse().unwrap(),
1775 1 => "198.51.100.2:45679".parse().unwrap(),
1776 _ => "192.0.2.3:45680".parse().unwrap(),
1777 };
1778
1779 let response = ServerReflexiveResponse {
1780 bootstrap_node: node_id,
1781 observed_address: simulated_external_addr,
1782 response_time: Duration::from_millis(50 + node_id.0 * 10),
1783 timestamp: now,
1784 };
1785
1786 self.responses.push_back(response);
1787
1788 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
1790 *query_state = QueryState::Completed;
1791 }
1792
1793 debug!("Received simulated server reflexive response from bootstrap node {:?}: {}",
1794 node_id, simulated_external_addr);
1795 }
1796}
1797
/// Generates predicted port candidates from observed port-allocation
/// history (pattern detection plus heuristics).
1798#[derive(Debug)]
1800pub(crate) struct SymmetricNatPredictor {
/// Private copy of the discovery configuration.
1801 config: DiscoveryConfig,
1802}
1803
1804impl SymmetricNatPredictor {
1805 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
1806 Self {
1807 config: config.clone(),
1808 }
1809 }
1810
1811 pub(crate) fn generate_predictions(&mut self, pattern_analysis: &PatternAnalysisState, max_count: usize) -> Vec<DiscoveryCandidate> {
1816 let mut predictions = Vec::new();
1817
1818 if pattern_analysis.allocation_history.is_empty() || max_count == 0 {
1819 return predictions;
1820 }
1821
1822 let recent_events: Vec<_> = pattern_analysis.allocation_history
1824 .iter()
1825 .rev()
1826 .take(5) .collect();
1828
1829 if recent_events.len() < 2 {
1830 return predictions;
1831 }
1832
1833 match &pattern_analysis.detected_pattern {
1834 Some(pattern) => {
1835 predictions.extend(self.generate_pattern_based_predictions(pattern, max_count));
1836 }
1837 None => {
1838 predictions.extend(self.generate_heuristic_predictions(&recent_events, max_count));
1839 }
1840 }
1841
1842 predictions.truncate(max_count);
1844 predictions
1845 }
1846
1847 fn generate_pattern_based_predictions(&self, pattern: &PortAllocationPattern, max_count: usize) -> Vec<DiscoveryCandidate> {
1849 let mut predictions = Vec::new();
1850
1851 match pattern.pattern_type {
1852 AllocationPatternType::Sequential => {
1853 for i in 1..=max_count as u16 {
1855 let predicted_port = pattern.base_port.wrapping_add(i);
1856 if self.is_valid_port(predicted_port) {
1857 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence));
1858 }
1859 }
1860 }
1861 AllocationPatternType::FixedStride => {
1862 for i in 1..=max_count as u16 {
1864 let predicted_port = pattern.base_port.wrapping_add(pattern.stride * i);
1865 if self.is_valid_port(predicted_port) {
1866 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence));
1867 }
1868 }
1869 }
1870 AllocationPatternType::PoolBased => {
1871 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1873 let pool_size = max_port - min_port + 1;
1874 let step = (pool_size / max_count as u16).max(1);
1875
1876 for i in 0..max_count as u16 {
1877 let predicted_port = min_port + (i * step);
1878 if predicted_port <= max_port && self.is_valid_port(predicted_port) {
1879 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence * 0.8));
1880 }
1881 }
1882 }
1883 }
1884 AllocationPatternType::TimeBased => {
1885 for i in 1..=max_count as u16 {
1888 let predicted_port = pattern.base_port.wrapping_add(i);
1889 if self.is_valid_port(predicted_port) {
1890 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence * 0.6));
1891 }
1892 }
1893 }
1894 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1895 predictions.extend(self.generate_statistical_predictions(pattern.base_port, max_count));
1897 }
1898 }
1899
1900 predictions
1901 }
1902
1903 fn generate_heuristic_predictions(&self, recent_events: &[&PortAllocationEvent], max_count: usize) -> Vec<DiscoveryCandidate> {
1905 let mut predictions = Vec::new();
1906
1907 if let Some(latest_event) = recent_events.first() {
1908 let base_port = latest_event.port;
1909
1910 for i in 1..=(max_count / 3) as u16 {
1914 let predicted_port = base_port.wrapping_add(i);
1915 if self.is_valid_port(predicted_port) {
1916 predictions.push(self.create_predicted_candidate(predicted_port, 0.7));
1917 }
1918 }
1919
1920 if base_port % 2 == 0 {
1922 let predicted_port = base_port + 1;
1923 if self.is_valid_port(predicted_port) {
1924 predictions.push(self.create_predicted_candidate(predicted_port, 0.6));
1925 }
1926 }
1927
1928 for stride in [2, 4, 8, 16] {
1930 if predictions.len() >= max_count {
1931 break;
1932 }
1933 let predicted_port = base_port.wrapping_add(stride);
1934 if self.is_valid_port(predicted_port) {
1935 predictions.push(self.create_predicted_candidate(predicted_port, 0.5));
1936 }
1937 }
1938
1939 if recent_events.len() >= 2 {
1941 let stride = recent_events[0].port.wrapping_sub(recent_events[1].port);
1942 if stride > 0 && stride <= 100 { for i in 1..=3 {
1944 if predictions.len() >= max_count {
1945 break;
1946 }
1947 let predicted_port = base_port.wrapping_add(stride * i);
1948 if self.is_valid_port(predicted_port) {
1949 predictions.push(self.create_predicted_candidate(predicted_port, 0.4));
1950 }
1951 }
1952 }
1953 }
1954 }
1955
1956 predictions.truncate(max_count);
1957 predictions
1958 }
1959
1960 fn generate_statistical_predictions(&self, base_port: u16, max_count: usize) -> Vec<DiscoveryCandidate> {
1962 let mut predictions = Vec::new();
1963
1964 let common_ranges = [
1966 (1024, 5000), (5000, 10000), (10000, 20000), (32768, 65535), ];
1971
1972 let current_range = common_ranges.iter()
1974 .find(|(min, max)| base_port >= *min && base_port <= *max)
1975 .copied()
1976 .unwrap_or((1024, 65535));
1977
1978 let range_size = current_range.1 - current_range.0;
1980 let step = (range_size / max_count as u16).max(1);
1981
1982 for i in 0..max_count {
1983 let offset = (i as u16 * step) % range_size;
1984 let predicted_port = current_range.0 + offset;
1985
1986 if self.is_valid_port(predicted_port) && predicted_port != base_port {
1987 predictions.push(self.create_predicted_candidate(predicted_port, 0.3));
1988 }
1989 }
1990
1991 predictions
1992 }
1993
1994 fn is_valid_port(&self, port: u16) -> bool {
1996 port >= 1024 && port <= 65535 && port != 0
1998 }
1999
2000 fn create_predicted_candidate(&self, port: u16, confidence: f64) -> DiscoveryCandidate {
2002 let base_priority = 50; let priority = (base_priority as f64 * confidence) as u32;
2006
2007 DiscoveryCandidate {
2008 address: SocketAddr::new(
2009 "0.0.0.0".parse().unwrap(), port
2011 ),
2012 priority,
2013 source: DiscoverySourceType::Predicted,
2014 state: CandidateState::New,
2015 }
2016 }
2017
2018 pub(crate) fn analyze_allocation_patterns(&self, history: &VecDeque<PortAllocationEvent>) -> Option<PortAllocationPattern> {
2020 if history.len() < 3 {
2021 return None;
2022 }
2023
2024 let recent_ports: Vec<u16> = history.iter()
2025 .rev()
2026 .take(10)
2027 .map(|event| event.port)
2028 .collect();
2029
2030 if let Some(pattern) = self.detect_sequential_pattern(&recent_ports) {
2032 return Some(pattern);
2033 }
2034
2035 if let Some(pattern) = self.detect_stride_pattern(&recent_ports) {
2037 return Some(pattern);
2038 }
2039
2040 if let Some(pattern) = self.detect_pool_pattern(&recent_ports) {
2042 return Some(pattern);
2043 }
2044
2045 if let Some(pattern) = self.detect_time_based_pattern(history) {
2047 return Some(pattern);
2048 }
2049
2050 None
2051 }
2052
2053 fn detect_sequential_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2055 if ports.len() < 3 {
2056 return None;
2057 }
2058
2059 let mut sequential_count = 0;
2060 let mut total_comparisons = 0;
2061
2062 for i in 1..ports.len() {
2063 total_comparisons += 1;
2064 let diff = ports[i-1].wrapping_sub(ports[i]);
2065 if diff == 1 {
2066 sequential_count += 1;
2067 }
2068 }
2069
2070 let sequential_ratio = sequential_count as f64 / total_comparisons as f64;
2071
2072 if sequential_ratio >= 0.6 { let confidence = (sequential_ratio * 0.9).min(0.9); Some(PortAllocationPattern {
2076 pattern_type: AllocationPatternType::Sequential,
2077 base_port: ports[0],
2078 stride: 1,
2079 pool_boundaries: None,
2080 confidence,
2081 })
2082 } else {
2083 None
2084 }
2085 }
2086
2087 fn detect_stride_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2089 if ports.len() < 4 {
2090 return None;
2091 }
2092
2093 let mut diffs = Vec::new();
2095 for i in 1..ports.len() {
2096 let diff = ports[i-1].wrapping_sub(ports[i]);
2097 if diff > 0 && diff <= 1000 { diffs.push(diff);
2099 }
2100 }
2101
2102 if diffs.len() < 2 {
2103 return None;
2104 }
2105
2106 let mut diff_counts = std::collections::HashMap::new();
2108 for &diff in &diffs {
2109 *diff_counts.entry(diff).or_insert(0) += 1;
2110 }
2111
2112 let (most_common_diff, count) = diff_counts.iter()
2113 .max_by_key(|(_, &count)| count)
2114 .map(|(&diff, &count)| (diff, count))?;
2115
2116 let consistency_ratio = count as f64 / diffs.len() as f64;
2117
2118 if consistency_ratio >= 0.5 && most_common_diff > 1 { let confidence = (consistency_ratio * 0.8).min(0.8); Some(PortAllocationPattern {
2122 pattern_type: AllocationPatternType::FixedStride,
2123 base_port: ports[0],
2124 stride: most_common_diff,
2125 pool_boundaries: None,
2126 confidence,
2127 })
2128 } else {
2129 None
2130 }
2131 }
2132
2133 fn detect_pool_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2135 if ports.len() < 5 {
2136 return None;
2137 }
2138
2139 let min_port = *ports.iter().min()?;
2140 let max_port = *ports.iter().max()?;
2141 let range = max_port - min_port;
2142
2143 if range > 0 && range <= 10000 { let expected_step = range / (ports.len() as u16 - 1);
2147 let mut uniform_score = 0.0;
2148
2149 let mut sorted_ports = ports.to_vec();
2150 sorted_ports.sort_unstable();
2151
2152 for i in 1..sorted_ports.len() {
2153 let actual_step = sorted_ports[i] - sorted_ports[i-1];
2154 let step_diff = (actual_step as i32 - expected_step as i32).abs() as f64;
2155 let normalized_diff = step_diff / expected_step as f64;
2156 uniform_score += 1.0 - normalized_diff.min(1.0);
2157 }
2158
2159 uniform_score /= (sorted_ports.len() - 1) as f64;
2160
2161 if uniform_score >= 0.4 { let confidence = (uniform_score * 0.7).min(0.7); Some(PortAllocationPattern {
2165 pattern_type: AllocationPatternType::PoolBased,
2166 base_port: min_port,
2167 stride: expected_step,
2168 pool_boundaries: Some((min_port, max_port)),
2169 confidence,
2170 })
2171 } else {
2172 None
2173 }
2174 } else {
2175 None
2176 }
2177 }
2178
2179 fn detect_time_based_pattern(&self, history: &VecDeque<PortAllocationEvent>) -> Option<PortAllocationPattern> {
2181 if history.len() < 4 {
2182 return None;
2183 }
2184
2185 let mut time_intervals = Vec::new();
2187 let events: Vec<_> = history.iter().collect();
2188
2189 for i in 1..events.len() {
2190 let interval = events[i-1].timestamp.duration_since(events[i].timestamp);
2191 time_intervals.push(interval);
2192 }
2193
2194 if time_intervals.is_empty() {
2195 return None;
2196 }
2197
2198 let avg_interval = time_intervals.iter().sum::<std::time::Duration>() / time_intervals.len() as u32;
2200
2201 let mut consistency_score = 0.0;
2202 for interval in &time_intervals {
2203 let diff = if *interval > avg_interval {
2204 *interval - avg_interval
2205 } else {
2206 avg_interval - *interval
2207 };
2208
2209 let normalized_diff = diff.as_millis() as f64 / avg_interval.as_millis() as f64;
2210 consistency_score += 1.0 - normalized_diff.min(1.0);
2211 }
2212
2213 consistency_score /= time_intervals.len() as f64;
2214
2215 if consistency_score >= 0.6 && avg_interval.as_millis() > 100 && avg_interval.as_millis() < 10000 {
2216 let confidence = (consistency_score * 0.6).min(0.6); Some(PortAllocationPattern {
2219 pattern_type: AllocationPatternType::TimeBased,
2220 base_port: events[0].port,
2221 stride: 1, pool_boundaries: None,
2223 confidence,
2224 })
2225 } else {
2226 None
2227 }
2228 }
2229
2230 pub(crate) fn generate_confidence_scored_predictions(
2232 &mut self,
2233 base_address: SocketAddr,
2234 pattern_analysis: &PatternAnalysisState,
2235 max_count: usize
2236 ) -> Vec<(DiscoveryCandidate, f64)> {
2237 let mut scored_predictions = Vec::new();
2238
2239 let predictions = self.generate_predictions(pattern_analysis, max_count);
2241
2242 for mut prediction in predictions {
2243 prediction.address = SocketAddr::new(base_address.ip(), prediction.address.port());
2245
2246 let confidence = self.calculate_prediction_confidence(&prediction, pattern_analysis, base_address);
2248
2249 scored_predictions.push((prediction, confidence));
2250 }
2251
2252 scored_predictions.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2254
2255 scored_predictions
2256 }
2257
2258 fn calculate_prediction_confidence(
2260 &self,
2261 prediction: &DiscoveryCandidate,
2262 pattern_analysis: &PatternAnalysisState,
2263 base_address: SocketAddr
2264 ) -> f64 {
2265 let mut confidence = 0.5; if let Some(ref pattern) = pattern_analysis.detected_pattern {
2269 confidence += pattern.confidence * 0.3;
2270 }
2271
2272 confidence += pattern_analysis.prediction_accuracy * 0.2;
2274
2275 let port_distance = (prediction.address.port() as i32 - base_address.port() as i32).abs();
2277 let proximity_score = if port_distance <= 10 {
2278 0.2
2279 } else if port_distance <= 100 {
2280 0.1
2281 } else {
2282 0.0
2283 };
2284 confidence += proximity_score;
2285
2286 let port_range_score = match prediction.address.port() {
2288 1024..=4999 => 0.1, 5000..=9999 => 0.15, 10000..=20000 => 0.1, 32768..=65535 => 0.05, _ => 0.0,
2293 };
2294 confidence += port_range_score;
2295
2296 confidence.max(0.0).min(1.0)
2298 }
2299
2300 pub(crate) fn update_pattern_analysis(
2302 &self,
2303 pattern_analysis: &mut PatternAnalysisState,
2304 new_event: PortAllocationEvent
2305 ) {
2306 pattern_analysis.allocation_history.push_back(new_event);
2308
2309 if pattern_analysis.allocation_history.len() > 20 {
2311 pattern_analysis.allocation_history.pop_front();
2312 }
2313
2314 pattern_analysis.detected_pattern = self.analyze_allocation_patterns(&pattern_analysis.allocation_history);
2316
2317 if let Some(ref pattern) = pattern_analysis.detected_pattern {
2319 pattern_analysis.confidence_level = pattern.confidence;
2320 } else {
2321 pattern_analysis.confidence_level *= 0.9; }
2323
2324 pattern_analysis.prediction_accuracy *= 0.95;
2328 }
2329}
2330
/// Registry of bootstrap nodes together with their health statistics.
2331#[derive(Debug)]
2333pub(crate) struct BootstrapNodeManager {
/// Private copy of the discovery configuration.
2334 config: DiscoveryConfig,
/// Known bootstrap nodes, keyed by id.
2335 bootstrap_nodes: HashMap<BootstrapNodeId, BootstrapNodeInfo>,
/// Health statistics per node; entries are kept when a node is re-registered.
2336 health_stats: HashMap<BootstrapNodeId, BootstrapHealthStats>,
/// Aggregate performance data.
2337 performance_tracker: BootstrapPerformanceTracker,
/// Time of the last completed health check, if any.
2338 last_health_check: Option<Instant>,
/// Minimum time between two health checks (30 s by default).
2339 health_check_interval: Duration,
/// Success rate below which a node is classified as `Degraded` (0.3 by default).
2340 failover_threshold: f64,
/// Discovery sources configured for this manager.
2341 discovery_sources: Vec<BootstrapDiscoverySource>,
2342}
2343
/// Everything the manager tracks about a single bootstrap node.
2344#[derive(Debug, Clone)]
2346pub(crate) struct BootstrapNodeInfo {
/// Socket address of the node.
2347 pub address: SocketAddr,
/// `last_seen` value copied from the node description on registration.
2349 pub last_seen: Instant,
/// `can_coordinate` flag copied from the node description.
2351 pub can_coordinate: bool,
/// Current health classification; `Unknown` on registration.
2353 pub health_status: BootstrapHealthStatus,
/// Capabilities recorded for the node.
2355 pub capabilities: BootstrapCapabilities,
/// Selection priority; higher values sort first among active nodes.
2357 pub priority: u32,
/// How the node became known.
2359 pub discovery_source: BootstrapDiscoverySource,
2361}
2362
/// Health classification of a bootstrap node.
/// `Healthy` and `Unknown` nodes are the ones returned as active.
2363#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2365pub(crate) enum BootstrapHealthStatus {
/// Node is considered fully usable.
2366 Healthy,
/// Assigned when the success rate is below the failover threshold.
2368 Degraded,
/// Assigned after three or more consecutive failures.
2370 Unhealthy,
/// Initial status on registration.
2372 Unknown,
2374}
2375
/// Feature set recorded for a bootstrap node.
2376#[derive(Debug, Clone, Default)]
2378pub(crate) struct BootstrapCapabilities {
/// Set from the node's `can_coordinate` flag on registration.
2379 pub supports_nat_traversal: bool,
/// Set when the node's address is an IPv6 address.
2381 pub supports_ipv6: bool,
/// Set to `true` on registration.
2383 pub supports_quic_extensions: bool,
/// Set to 100 on registration.
2385 pub max_concurrent_coordinations: u32,
/// QUIC versions recorded for the node (`[1]` on registration).
2387 pub supported_quic_versions: Vec<u32>,
2389}
2390
/// Per-node counters and timing data used for health classification.
2391#[derive(Debug, Clone, Default)]
2393pub(crate) struct BootstrapHealthStats {
/// Total connection attempts; denominator of the success rate.
2394 pub connection_attempts: u32,
/// Successful connections; numerator of the success rate.
2396 pub successful_connections: u32,
/// Failed connections.
2398 pub failed_connections: u32,
/// Mean of `recent_rtts`, refreshed by the health check when samples exist.
2400 pub average_rtt: Option<Duration>,
/// RTT samples that `average_rtt` is computed from.
2402 pub recent_rtts: VecDeque<Duration>,
/// Time of the last health check for this node, if any.
2404 pub last_health_check: Option<Instant>,
/// Consecutive failures; three or more mark the node `Unhealthy`.
2406 pub consecutive_failures: u32,
/// Coordination requests counter.
2408 pub coordination_requests: u32,
/// Successful coordinations counter.
2410 pub successful_coordinations: u32,
2412}
2413
/// Aggregate performance data across all bootstrap nodes.
/// NOTE(review): the code that fills these fields is outside this section;
/// the field notes below follow the names and types only.
2414#[derive(Debug, Default)]
2416pub(crate) struct BootstrapPerformanceTracker {
/// Overall success rate (presumably in `0.0..=1.0` — confirm).
2417 pub overall_success_rate: f64,
/// Average response time.
2419 pub average_response_time: Duration,
/// Ids of the nodes listed as best performers.
2421 pub best_performers: Vec<BootstrapNodeId>,
/// Ids of the nodes listed as failover nodes.
2423 pub failover_nodes: Vec<BootstrapNodeId>,
/// Recorded performance snapshots.
2425 pub performance_history: VecDeque<PerformanceSnapshot>,
2427}
2428
/// One entry of the performance history.
2429#[derive(Debug, Clone)]
2431pub(crate) struct PerformanceSnapshot {
/// When the snapshot was taken.
2432 pub timestamp: Instant,
/// Number of active nodes recorded in the snapshot.
2433 pub active_nodes: u32,
/// Success rate recorded in the snapshot.
2434 pub success_rate: f64,
/// Average RTT recorded in the snapshot.
2435 pub average_rtt: Duration,
2436}
2437
/// How a bootstrap node became known.
/// A new manager enables `Static`, `DNS` and `UserProvided`.
2438#[derive(Debug, Clone, PartialEq, Eq)]
2440pub(crate) enum BootstrapDiscoverySource {
/// Static source.
2441 Static,
/// DNS source.
2443 DNS,
/// DHT source.
2445 DHT,
/// Multicast source.
2447 Multicast,
/// Assigned to every node registered through `update_bootstrap_nodes`.
2449 UserProvided,
2451}
2452
2453impl BootstrapNodeManager {
2454 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2455 Self {
2456 config: config.clone(),
2457 bootstrap_nodes: HashMap::new(),
2458 health_stats: HashMap::new(),
2459 performance_tracker: BootstrapPerformanceTracker::default(),
2460 last_health_check: None,
2461 health_check_interval: Duration::from_secs(30),
2462 failover_threshold: 0.3, discovery_sources: vec![
2464 BootstrapDiscoverySource::Static,
2465 BootstrapDiscoverySource::DNS,
2466 BootstrapDiscoverySource::UserProvided,
2467 ],
2468 }
2469 }
2470
2471 pub(crate) fn update_bootstrap_nodes(&mut self, nodes: Vec<BootstrapNode>) {
2473 let now = Instant::now();
2474
2475 for (i, node) in nodes.into_iter().enumerate() {
2477 let node_id = BootstrapNodeId(i as u64);
2478
2479 let node_info = BootstrapNodeInfo {
2480 address: node.address,
2481 last_seen: node.last_seen,
2482 can_coordinate: node.can_coordinate,
2483 health_status: BootstrapHealthStatus::Unknown,
2484 capabilities: BootstrapCapabilities {
2485 supports_nat_traversal: node.can_coordinate,
2486 supports_ipv6: node.address.is_ipv6(),
2487 supports_quic_extensions: true, max_concurrent_coordinations: 100, supported_quic_versions: vec![1], },
2491 priority: self.calculate_initial_priority(&node),
2492 discovery_source: BootstrapDiscoverySource::UserProvided,
2493 };
2494
2495 self.bootstrap_nodes.insert(node_id, node_info);
2496
2497 if !self.health_stats.contains_key(&node_id) {
2499 self.health_stats.insert(node_id, BootstrapHealthStats::default());
2500 }
2501 }
2502
2503 info!("Updated {} bootstrap nodes", self.bootstrap_nodes.len());
2504 self.schedule_health_check(now);
2505 }
2506
2507 pub(crate) fn get_active_bootstrap_nodes(&self) -> Vec<BootstrapNodeId> {
2509 let mut active_nodes: Vec<_> = self.bootstrap_nodes
2510 .iter()
2511 .filter(|(_, node)| {
2512 matches!(node.health_status, BootstrapHealthStatus::Healthy | BootstrapHealthStatus::Unknown)
2513 })
2514 .map(|(&id, node)| (id, node))
2515 .collect();
2516
2517 active_nodes.sort_by(|a, b| {
2519 let health_cmp = self.compare_health_status(a.1.health_status, b.1.health_status);
2521 if health_cmp != std::cmp::Ordering::Equal {
2522 return health_cmp;
2523 }
2524
2525 b.1.priority.cmp(&a.1.priority)
2527 });
2528
2529 active_nodes.into_iter().map(|(id, _)| id).collect()
2530 }
2531
2532 pub(crate) fn get_bootstrap_address(&self, id: BootstrapNodeId) -> Option<SocketAddr> {
2534 self.bootstrap_nodes.get(&id).map(|node| node.address)
2535 }
2536
2537 pub(crate) fn perform_health_check(&mut self, now: Instant) {
2539 if let Some(last_check) = self.last_health_check {
2540 if now.duration_since(last_check) < self.health_check_interval {
2541 return; }
2543 }
2544
2545 debug!("Performing health check on {} bootstrap nodes", self.bootstrap_nodes.len());
2546
2547 let node_ids: Vec<BootstrapNodeId> = self.bootstrap_nodes.keys().copied().collect();
2549
2550 for node_id in node_ids {
2551 self.check_node_health(node_id, now);
2552 }
2553
2554 self.update_performance_metrics(now);
2555 self.last_health_check = Some(now);
2556 }
2557
2558 fn check_node_health(&mut self, node_id: BootstrapNodeId, now: Instant) {
2560 let node_info_opt = self.bootstrap_nodes.get(&node_id).cloned();
2562 if node_info_opt.is_none() {
2563 return; }
2565 let node_info_for_priority = node_info_opt.unwrap();
2566 let current_health_status = node_info_for_priority.health_status;
2567
2568 let (_success_rate, new_health_status, _average_rtt) = {
2570 let stats = self.health_stats.get_mut(&node_id).unwrap();
2571
2572 let success_rate = if stats.connection_attempts > 0 {
2574 stats.successful_connections as f64 / stats.connection_attempts as f64
2575 } else {
2576 1.0 };
2578
2579 if !stats.recent_rtts.is_empty() {
2581 let total_rtt: Duration = stats.recent_rtts.iter().sum();
2582 stats.average_rtt = Some(total_rtt / stats.recent_rtts.len() as u32);
2583 }
2584
2585 let new_health_status = if stats.consecutive_failures >= 3 {
2587 BootstrapHealthStatus::Unhealthy
2588 } else if success_rate < self.failover_threshold {
2589 BootstrapHealthStatus::Degraded
2590 } else if success_rate >= 0.8 && stats.consecutive_failures == 0 {
2591 BootstrapHealthStatus::Healthy
2592 } else {
2593 current_health_status };
2595
2596 stats.last_health_check = Some(now);
2597
2598 (success_rate, new_health_status, stats.average_rtt)
2599 };
2600
2601 let stats_snapshot = self.health_stats.get(&node_id).unwrap();
2603 let new_priority = self.calculate_dynamic_priority(&node_info_for_priority, stats_snapshot);
2604
2605 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2607 if new_health_status != node_info.health_status {
2608 info!("Bootstrap node {:?} health status changed: {:?} -> {:?}",
2609 node_id, node_info.health_status, new_health_status);
2610 node_info.health_status = new_health_status;
2611 }
2612
2613 node_info.priority = new_priority;
2614 }
2615 }
2616
2617 pub(crate) fn record_connection_attempt(&mut self, node_id: BootstrapNodeId, success: bool, rtt: Option<Duration>) {
2619 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2620 stats.connection_attempts += 1;
2621
2622 if success {
2623 stats.successful_connections += 1;
2624 stats.consecutive_failures = 0;
2625
2626 if let Some(rtt) = rtt {
2627 stats.recent_rtts.push_back(rtt);
2628 if stats.recent_rtts.len() > 10 {
2629 stats.recent_rtts.pop_front();
2630 }
2631 }
2632 } else {
2633 stats.failed_connections += 1;
2634 stats.consecutive_failures += 1;
2635 }
2636 }
2637
2638 if success {
2640 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2641 node_info.last_seen = Instant::now();
2642 }
2643 }
2644 }
2645
2646 pub(crate) fn record_coordination_result(&mut self, node_id: BootstrapNodeId, success: bool) {
2648 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2649 stats.coordination_requests += 1;
2650 if success {
2651 stats.successful_coordinations += 1;
2652 }
2653 }
2654 }
2655
2656 pub(crate) fn get_best_performers(&self, count: usize) -> Vec<BootstrapNodeId> {
2658 let mut nodes_with_scores: Vec<_> = self.bootstrap_nodes
2659 .iter()
2660 .filter_map(|(&id, node)| {
2661 if matches!(node.health_status, BootstrapHealthStatus::Healthy) {
2662 let score = self.calculate_performance_score(id, node);
2663 Some((id, score))
2664 } else {
2665 None
2666 }
2667 })
2668 .collect();
2669
2670 nodes_with_scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2671
2672 nodes_with_scores
2673 .into_iter()
2674 .take(count)
2675 .map(|(id, _)| id)
2676 .collect()
2677 }
2678
2679 pub(crate) fn discover_new_nodes(&mut self) -> Result<Vec<BootstrapNodeInfo>, String> {
2681 let mut discovered_nodes = Vec::new();
2682
2683 if let Ok(dns_nodes) = self.discover_via_dns() {
2685 discovered_nodes.extend(dns_nodes);
2686 }
2687
2688 if let Ok(multicast_nodes) = self.discover_via_multicast() {
2690 discovered_nodes.extend(multicast_nodes);
2691 }
2692
2693 for node in &discovered_nodes {
2695 let node_id = BootstrapNodeId(rand::random());
2696 self.bootstrap_nodes.insert(node_id, node.clone());
2697 self.health_stats.insert(node_id, BootstrapHealthStats::default());
2698 }
2699
2700 if !discovered_nodes.is_empty() {
2701 info!("Discovered {} new bootstrap nodes", discovered_nodes.len());
2702 }
2703
2704 Ok(discovered_nodes)
2705 }
2706
2707 fn discover_via_dns(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
2709 debug!("DNS-based bootstrap discovery not yet implemented");
2712 Ok(Vec::new())
2713 }
2714
2715 fn discover_via_multicast(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
2717 debug!("Multicast-based bootstrap discovery not yet implemented");
2720 Ok(Vec::new())
2721 }
2722
2723 fn calculate_initial_priority(&self, node: &BootstrapNode) -> u32 {
2725 let mut priority = 100; if node.can_coordinate {
2728 priority += 50;
2729 }
2730
2731 if let Some(rtt) = node.rtt {
2732 if rtt < Duration::from_millis(50) {
2733 priority += 30;
2734 } else if rtt < Duration::from_millis(100) {
2735 priority += 20;
2736 } else if rtt < Duration::from_millis(200) {
2737 priority += 10;
2738 }
2739 }
2740
2741 if node.address.is_ipv6() {
2743 priority += 10;
2744 }
2745
2746 priority
2747 }
2748
2749 fn calculate_dynamic_priority(&self, node_info: &BootstrapNodeInfo, stats: &BootstrapHealthStats) -> u32 {
2751 let mut priority = node_info.priority;
2752
2753 let success_rate = if stats.connection_attempts > 0 {
2755 stats.successful_connections as f64 / stats.connection_attempts as f64
2756 } else {
2757 1.0
2758 };
2759
2760 priority = (priority as f64 * success_rate) as u32;
2761
2762 if let Some(avg_rtt) = stats.average_rtt {
2764 if avg_rtt < Duration::from_millis(50) {
2765 priority += 20;
2766 } else if avg_rtt > Duration::from_millis(500) {
2767 priority = priority.saturating_sub(20);
2768 }
2769 }
2770
2771 priority = priority.saturating_sub(stats.consecutive_failures * 10);
2773
2774 priority.max(1) }
2776
2777 fn calculate_performance_score(&self, node_id: BootstrapNodeId, _node_info: &BootstrapNodeInfo) -> f64 {
2779 let stats = self.health_stats.get(&node_id).unwrap();
2780
2781 let mut score = 0.0;
2782
2783 let success_rate = if stats.connection_attempts > 0 {
2785 stats.successful_connections as f64 / stats.connection_attempts as f64
2786 } else {
2787 1.0
2788 };
2789 score += success_rate * 0.4;
2790
2791 if let Some(avg_rtt) = stats.average_rtt {
2793 let rtt_score = (1000.0 - avg_rtt.as_millis() as f64).max(0.0) / 1000.0;
2794 score += rtt_score * 0.3;
2795 } else {
2796 score += 0.3; }
2798
2799 let coord_success_rate = if stats.coordination_requests > 0 {
2801 stats.successful_coordinations as f64 / stats.coordination_requests as f64
2802 } else {
2803 1.0
2804 };
2805 score += coord_success_rate * 0.2;
2806
2807 let stability_score = if stats.consecutive_failures == 0 {
2809 1.0
2810 } else {
2811 1.0 / (stats.consecutive_failures as f64 + 1.0)
2812 };
2813 score += stability_score * 0.1;
2814
2815 score
2816 }
2817
2818 fn compare_health_status(&self, a: BootstrapHealthStatus, b: BootstrapHealthStatus) -> std::cmp::Ordering {
2820 use std::cmp::Ordering;
2821
2822 match (a, b) {
2823 (BootstrapHealthStatus::Healthy, BootstrapHealthStatus::Healthy) => Ordering::Equal,
2824 (BootstrapHealthStatus::Healthy, _) => Ordering::Less, (_, BootstrapHealthStatus::Healthy) => Ordering::Greater,
2826 (BootstrapHealthStatus::Unknown, BootstrapHealthStatus::Unknown) => Ordering::Equal,
2827 (BootstrapHealthStatus::Unknown, _) => Ordering::Less, (_, BootstrapHealthStatus::Unknown) => Ordering::Greater,
2829 (BootstrapHealthStatus::Degraded, BootstrapHealthStatus::Degraded) => Ordering::Equal,
2830 (BootstrapHealthStatus::Degraded, _) => Ordering::Less, (_, BootstrapHealthStatus::Degraded) => Ordering::Greater,
2832 (BootstrapHealthStatus::Unhealthy, BootstrapHealthStatus::Unhealthy) => Ordering::Equal,
2833 }
2834 }
2835
2836 fn update_performance_metrics(&mut self, now: Instant) {
2838 let mut total_attempts = 0;
2839 let mut total_successes = 0;
2840 let mut total_rtt = Duration::ZERO;
2841 let mut rtt_count = 0;
2842
2843 for stats in self.health_stats.values() {
2844 total_attempts += stats.connection_attempts;
2845 total_successes += stats.successful_connections;
2846
2847 if let Some(avg_rtt) = stats.average_rtt {
2848 total_rtt += avg_rtt;
2849 rtt_count += 1;
2850 }
2851 }
2852
2853 self.performance_tracker.overall_success_rate = if total_attempts > 0 {
2854 total_successes as f64 / total_attempts as f64
2855 } else {
2856 1.0
2857 };
2858
2859 self.performance_tracker.average_response_time = if rtt_count > 0 {
2860 total_rtt / rtt_count
2861 } else {
2862 Duration::from_millis(100) };
2864
2865 self.performance_tracker.best_performers = self.get_best_performers(5);
2867
2868 let snapshot = PerformanceSnapshot {
2870 timestamp: now,
2871 active_nodes: self.get_active_bootstrap_nodes().len() as u32,
2872 success_rate: self.performance_tracker.overall_success_rate,
2873 average_rtt: self.performance_tracker.average_response_time,
2874 };
2875
2876 self.performance_tracker.performance_history.push_back(snapshot);
2877 if self.performance_tracker.performance_history.len() > 100 {
2878 self.performance_tracker.performance_history.pop_front();
2879 }
2880 }
2881
    /// Hook called by `update_bootstrap_nodes` after the node set changes.
    ///
    /// Currently a no-op: the body is empty and `_now` is unused.
    fn schedule_health_check(&mut self, _now: Instant) {
    }
2887
    /// Returns a reference to the aggregate performance tracker.
    ///
    /// The tracker is refreshed by `update_performance_metrics`.
    pub(crate) fn get_performance_stats(&self) -> &BootstrapPerformanceTracker {
        &self.performance_tracker
    }
2892
    /// Returns the health statistics recorded for `node_id`, or `None` when the id has
    /// no statistics entry.
    pub(crate) fn get_node_health_stats(&self, node_id: BootstrapNodeId) -> Option<&BootstrapHealthStats> {
        self.health_stats.get(&node_id)
    }
2897}
2898
2899#[derive(Debug)]
2901pub(crate) struct DiscoveryCache {
2902 config: DiscoveryConfig,
2903}
2904
2905impl DiscoveryCache {
2906 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2907 Self {
2908 config: config.clone(),
2909 }
2910 }
2911}
2912
2913pub(crate) fn create_platform_interface_discovery() -> Box<dyn NetworkInterfaceDiscovery + Send> {
2915 #[cfg(target_os = "windows")]
2916 return Box::new(WindowsInterfaceDiscovery::new());
2917
2918 #[cfg(target_os = "linux")]
2919 return Box::new(LinuxInterfaceDiscovery::new());
2920
2921 #[cfg(target_os = "macos")]
2922 return Box::new(MacOSInterfaceDiscovery::new());
2923
2924 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
2925 return Box::new(GenericInterfaceDiscovery::new());
2926}
2927
/// Fallback interface discovery used on targets without a dedicated implementation.
pub(crate) struct GenericInterfaceDiscovery {
    /// Set when a scan has been started and its result has not been collected yet.
    scan_complete: bool,
}

impl GenericInterfaceDiscovery {
    /// Creates a discovery instance with no scan result pending.
    pub(crate) fn new() -> Self {
        GenericInterfaceDiscovery {
            scan_complete: false,
        }
    }
}
2946
2947impl NetworkInterfaceDiscovery for GenericInterfaceDiscovery {
2948 fn start_scan(&mut self) -> Result<(), String> {
2949 self.scan_complete = true;
2951 Ok(())
2952 }
2953
2954 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>> {
2955 if self.scan_complete {
2956 self.scan_complete = false;
2957 Some(vec![
2958 NetworkInterface {
2959 name: "generic".to_string(),
2960 addresses: vec!["127.0.0.1:0".parse().unwrap()],
2961 is_up: true,
2962 is_wireless: false,
2963 mtu: Some(1500),
2964 }
2965 ])
2966 } else {
2967 None
2968 }
2969 }
2970}
2971
2972impl std::fmt::Display for DiscoveryError {
2973 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2974 match self {
2975 Self::NoLocalInterfaces => write!(f, "no local network interfaces found"),
2976 Self::AllBootstrapsFailed => write!(f, "all bootstrap node queries failed"),
2977 Self::DiscoveryTimeout => write!(f, "discovery process timed out"),
2978 Self::InsufficientCandidates { found, required } => write!(f, "insufficient candidates found: {} < {}", found, required),
2979 Self::NetworkError(msg) => write!(f, "network error: {}", msg),
2980 Self::ConfigurationError(msg) => write!(f, "configuration error: {}", msg),
2981 Self::InternalError(msg) => write!(f, "internal error: {}", msg),
2982 }
2983 }
2984}
2985
// Marks `DiscoveryError` as a standard error type; the empty body relies on the
// trait's default methods, with the message coming from the `Display` impl.
impl std::error::Error for DiscoveryError {}
2987
/// Address helpers exposed for tests.
pub mod test_utils {
    use super::*;

    /// Computes a priority for `address`; higher values are preferred.
    ///
    /// Starts from 100. Private IPv4 addresses gain 50; other IPv4 addresses gain
    /// nothing. Every IPv6 address gains 10, plus a bonus chosen from its first
    /// 16-bit segment unless it is loopback, multicast or unspecified:
    /// - `2000::/3`: 60
    /// - `fe80::/10`: 20
    /// - `fc00::/7`: 40
    /// - anything else: 30
    pub fn calculate_address_priority(address: &IpAddr) -> u32 {
        const BASE_PRIORITY: u32 = 100;

        let bonus = match address {
            IpAddr::V4(v4) if v4.is_private() => 50,
            IpAddr::V4(_) => 0,
            IpAddr::V6(v6) => {
                let excluded = v6.is_loopback() || v6.is_multicast() || v6.is_unspecified();
                let prefix_bonus = if excluded {
                    0
                } else {
                    match v6.segments()[0] {
                        head if head & 0xE000 == 0x2000 => 60,
                        head if head & 0xFFC0 == 0xFE80 => 20,
                        head if head & 0xFE00 == 0xFC00 => 40,
                        _ => 30,
                    }
                };
                prefix_bonus + 10
            }
        };

        BASE_PRIORITY + bonus
    }

    /// Returns `true` unless `address` is a loopback or unspecified address.
    pub fn is_valid_address(address: &IpAddr) -> bool {
        !(address.is_loopback() || address.is_unspecified())
    }
}