1use std::{
10 collections::{HashMap, VecDeque},
11 net::{IpAddr, SocketAddr},
12 sync::Arc,
13 time::{Duration, Instant},
14};
15
16use tracing::{debug, error, info, warn};
17
18use crate::Connection;
19
20use crate::{
21 connection::nat_traversal::{CandidateSource, CandidateState},
22 nat_traversal_api::{BootstrapNode, CandidateAddress, PeerId},
23};
24
25#[cfg(target_os = "windows")]
27pub mod windows;
28
29#[cfg(target_os = "windows")]
30pub use windows::WindowsInterfaceDiscovery;
31
32#[cfg(target_os = "linux")]
33pub mod linux;
34
35#[cfg(target_os = "linux")]
36pub use linux::LinuxInterfaceDiscovery;
37
38#[cfg(target_os = "macos")]
39pub(crate) mod macos;
40
41#[cfg(target_os = "macos")]
42pub(crate) use macos::MacOSInterfaceDiscovery;
43
44fn convert_to_nat_source(discovery_source: DiscoverySourceType) -> CandidateSource {
46 match discovery_source {
47 DiscoverySourceType::Local => CandidateSource::Local,
48 DiscoverySourceType::ServerReflexive => CandidateSource::Observed { by_node: None },
49 DiscoverySourceType::Predicted => CandidateSource::Predicted,
50 }
51}
52
/// How a candidate address was obtained during discovery.
///
/// Converted to `CandidateSource` by `convert_to_nat_source`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DiscoverySourceType {
    /// Address taken from a local interface scan or from the configured bound address.
    Local,
    /// Address reported back by a bootstrap node (maps to `CandidateSource::Observed`).
    ServerReflexive,
    /// Address generated by prediction (maps to `CandidateSource::Predicted`).
    Predicted,
}
60
/// A candidate address collected by a discovery session before it is turned
/// into a `ValidatedCandidate` or a `CandidateAddress`.
#[derive(Debug, Clone)]
pub(crate) struct DiscoveryCandidate {
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// Priority value copied verbatim into the resulting `CandidateAddress`.
    pub priority: u32,
    /// How this candidate was obtained.
    pub source: DiscoverySourceType,
    /// Validation state; candidates are created as `CandidateState::New` and
    /// set to `CandidateState::Valid` by `handle_path_response`.
    pub state: CandidateState,
}
69
70impl DiscoveryCandidate {
71 pub(crate) fn to_candidate_address(&self) -> CandidateAddress {
73 CandidateAddress {
74 address: self.address,
75 priority: self.priority,
76 source: convert_to_nat_source(self.source),
77 state: self.state,
78 }
79 }
80}
81
/// Per-peer discovery state tracked by `CandidateDiscoveryManager`.
///
/// Created by `DiscoverySession::new`.
#[derive(Debug)]
pub struct DiscoverySession {
    /// Peer this session was started for (also the key in `active_sessions`).
    peer_id: PeerId,
    /// Random identifier assigned when the session is created.
    session_id: u64,
    /// Current position in the discovery state machine; starts as `Idle`.
    current_phase: DiscoveryPhase,
    /// Creation time of the session; used as the base for reported durations.
    started_at: Instant,
    /// Candidates collected so far, in discovery order.
    discovered_candidates: Vec<DiscoveryCandidate>,
    /// Counters updated as candidates are found.
    statistics: DiscoveryStatistics,
    /// Port allocation events recorded from server-reflexive responses.
    allocation_history: VecDeque<PortAllocationEvent>,
    /// Helper used to start bootstrap queries for this session.
    server_reflexive_discovery: ServerReflexiveDiscovery,
}
102
/// Owns the discovery configuration, shared discovery helpers and all
/// per-peer discovery sessions.
pub struct CandidateDiscoveryManager {
    /// Active configuration; `bound_address` may be changed via `set_bound_address`.
    config: DiscoveryConfig,
    /// Platform interface scanner, shared behind a mutex.
    interface_discovery: Arc<std::sync::Mutex<Box<dyn NetworkInterfaceDiscovery + Send>>>,
    /// Port allocation pattern analyser, shared behind a mutex.
    symmetric_predictor: Arc<std::sync::Mutex<SymmetricNatPredictor>>,
    /// Source of bootstrap node ids and their addresses.
    bootstrap_manager: Arc<BootstrapNodeManager>,
    /// Discovery cache built from the configuration.
    cache: DiscoveryCache,
    /// Sessions currently tracked, keyed by peer.
    active_sessions: HashMap<PeerId, DiscoverySession>,
    /// Result of the most recent local interface scan and the time it was stored.
    cached_local_candidates: Option<(Instant, Vec<ValidatedCandidate>)>,
    /// Maximum age of `cached_local_candidates`; initialised from
    /// `config.interface_cache_ttl`.
    local_cache_duration: Duration,
    /// Path validations awaiting a response, keyed by candidate id.
    pending_validations: HashMap<CandidateId, PendingValidation>,
}
124
/// Tunable parameters for candidate discovery.
///
/// The defaults quoted below are the values set by `Default::default()`.
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
    /// Overall discovery time budget (default 30 s).
    pub total_timeout: Duration,
    /// Time allowed for the local interface scan (default 2 s).
    pub local_scan_timeout: Duration,
    /// Time allowed for a bootstrap query (default 5 s).
    pub bootstrap_query_timeout: Duration,
    /// Maximum number of query retries (default 3).
    pub max_query_retries: u32,
    /// Maximum number of candidates (default 8).
    pub max_candidates: usize,
    /// Whether symmetric NAT prediction may run (default `true`).
    pub enable_symmetric_prediction: bool,
    /// Minimum bootstrap consensus count (default 2).
    pub min_bootstrap_consensus: usize,
    /// Lifetime of cached local interface results (default 60 s).
    pub interface_cache_ttl: Duration,
    /// Lifetime of cached server-reflexive results (default 300 s).
    pub server_reflexive_cache_ttl: Duration,
    /// Address the endpoint is bound to, if known (default `None`); when set it
    /// is offered as a local candidate.
    pub bound_address: Option<SocketAddr>,
}
149
/// State machine phases of a discovery session.
#[derive(Debug, Clone, PartialEq)]
pub enum DiscoveryPhase {
    /// Initial phase of a newly created session.
    Idle,
    /// Local network interfaces are being scanned.
    LocalInterfaceScanning {
        /// When this phase was entered.
        started_at: Instant,
    },
    /// Bootstrap nodes are being queried for the observed address.
    ServerReflexiveQuerying {
        /// When this phase was entered.
        started_at: Instant,
        /// State of each outstanding query, keyed by bootstrap node.
        active_queries: HashMap<BootstrapNodeId, QueryState>,
        /// Responses collected so far.
        responses_received: Vec<ServerReflexiveResponse>,
    },
    /// Port allocation patterns are being analysed for prediction.
    SymmetricNatPrediction {
        /// When this phase was entered.
        started_at: Instant,
        /// Number of prediction attempts made; starts at 0.
        prediction_attempts: u32,
        /// Snapshot of the pattern analysis taken on entry.
        pattern_analysis: PatternAnalysisState,
    },
    /// Discovered candidates are being validated.
    CandidateValidation {
        /// When this phase was entered.
        started_at: Instant,
        /// Validation outcome per candidate.
        validation_results: HashMap<CandidateId, ValidationResult>,
    },
    /// Discovery finished.
    Completed {
        /// Candidates produced by the session.
        final_candidates: Vec<ValidatedCandidate>,
        /// When the session completed.
        completion_time: Instant,
    },
    /// Discovery failed.
    Failed {
        /// Reason for the failure.
        error: DiscoveryError,
        /// When the failure was recorded.
        failed_at: Instant,
        /// Suggested recovery strategies.
        fallback_options: Vec<FallbackStrategy>,
    },
}
188
/// Events reported to the caller while discovery progresses.
#[derive(Debug, Clone)]
pub enum DiscoveryEvent {
    /// Discovery began for a peer.
    DiscoveryStarted {
        /// Peer being discovered.
        peer_id: PeerId,
        /// Number of bootstrap nodes involved.
        bootstrap_count: usize,
    },
    /// A local interface scan was started.
    LocalScanningStarted,
    /// A local candidate was found.
    LocalCandidateDiscovered {
        /// The discovered candidate.
        candidate: CandidateAddress,
    },
    /// Local scanning ended (by completion or timeout).
    LocalScanningCompleted {
        /// Number of local candidates reported.
        candidate_count: usize,
        /// Reported duration of the scanning phase.
        duration: Duration,
    },
    /// Server-reflexive queries were started.
    ServerReflexiveDiscoveryStarted {
        /// Number of bootstrap nodes queried.
        bootstrap_count: usize,
    },
    /// A bootstrap node reported an observed address.
    ServerReflexiveCandidateDiscovered {
        /// The observed candidate.
        candidate: CandidateAddress,
        /// Address of the bootstrap node that reported it.
        bootstrap_node: SocketAddr,
    },
    /// A bootstrap query failed.
    BootstrapQueryFailed {
        /// Address of the bootstrap node.
        bootstrap_node: SocketAddr,
        /// Human-readable failure description.
        error: String,
    },
    /// Symmetric NAT prediction began.
    SymmetricPredictionStarted {
        /// Consensus address used as the prediction base.
        base_address: SocketAddr,
    },
    /// A predicted candidate was produced.
    PredictedCandidateGenerated {
        /// The predicted candidate.
        candidate: CandidateAddress,
        /// Confidence attached to the prediction.
        confidence: f64,
    },
    /// A port allocation was observed via a server-reflexive response.
    PortAllocationDetected {
        /// Observed external port.
        port: u16,
        /// Observed external address.
        source_address: SocketAddr,
        /// Bootstrap node that made the observation.
        bootstrap_node: BootstrapNodeId,
        /// Timestamp carried by the response.
        timestamp: Instant,
    },
    /// Discovery finished for a session.
    DiscoveryCompleted {
        /// Number of candidates produced.
        candidate_count: usize,
        /// Time from session start to completion.
        total_duration: Duration,
        /// Success indicator reported by the completing code path.
        success_rate: f64,
    },
    /// Discovery failed for a session.
    DiscoveryFailed {
        /// Reason for the failure.
        error: DiscoveryError,
        /// Candidates collected before the failure.
        partial_results: Vec<CandidateAddress>,
    },
    /// A path validation challenge should be sent for a candidate.
    PathValidationRequested {
        /// Candidate being validated.
        candidate_id: CandidateId,
        /// Address to validate.
        candidate_address: SocketAddr,
        /// Random token identifying the challenge.
        challenge_token: u64,
    },
    /// A matching path validation response was received.
    PathValidationResponse {
        /// Candidate that was validated.
        candidate_id: CandidateId,
        /// Address that responded.
        candidate_address: SocketAddr,
        /// Token echoed by the response.
        challenge_token: u64,
        /// Time between the request being recorded and the response.
        rtt: Duration,
    },
}
263
/// Opaque numeric identifier of a bootstrap node; usable as a hash map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BootstrapNodeId(pub u64);
267
/// Bookkeeping for a path validation that has been requested but not yet answered.
struct PendingValidation {
    /// Address the challenge was issued for.
    candidate_address: SocketAddr,
    /// Token a response must echo to be accepted.
    challenge_token: u64,
    /// When the validation was recorded; used to compute the RTT.
    started_at: Instant,
    /// Attempt counter; set to 1 when the validation is first recorded.
    attempts: u32,
}
279
/// Progress of a single bootstrap query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryState {
    /// Query sent, no outcome yet.
    Pending {
        /// When the query was sent.
        sent_at: Instant,
        /// Number of attempts made.
        attempts: u32,
    },
    /// Query finished successfully.
    Completed,
    /// Query failed.
    Failed,
}
293
/// Answer from a bootstrap node describing the address it observed.
#[derive(Debug, Clone, PartialEq)]
pub struct ServerReflexiveResponse {
    /// Node that produced the response.
    pub bootstrap_node: BootstrapNodeId,
    /// Address (and port) the node observed.
    pub observed_address: SocketAddr,
    /// Response time recorded for the query.
    pub response_time: Duration,
    /// Timestamp attached to the response.
    pub timestamp: Instant,
}
302
/// Snapshot of port allocation pattern analysis for a session.
#[derive(Debug, Clone, PartialEq)]
pub struct PatternAnalysisState {
    /// Allocation events the analysis was based on.
    pub allocation_history: VecDeque<PortAllocationEvent>,
    /// Pattern found by the analysis, if any.
    pub detected_pattern: Option<PortAllocationPattern>,
    /// Confidence of the detected pattern (0.0 when none was detected).
    pub confidence_level: f64,
    /// Estimated accuracy of predictions based on the pattern.
    pub prediction_accuracy: f64,
}
311
/// One observed external port allocation.
#[derive(Debug, Clone, PartialEq)]
pub struct PortAllocationEvent {
    /// Observed port.
    pub port: u16,
    /// When the observation was made.
    pub timestamp: Instant,
    /// Full observed address the port belongs to.
    pub source_address: SocketAddr,
}
319
/// Description of a detected port allocation pattern.
#[derive(Debug, Clone, PartialEq)]
pub struct PortAllocationPattern {
    /// Classification of the pattern.
    pub pattern_type: AllocationPatternType,
    /// Base port of the pattern.
    pub base_port: u16,
    /// Port stride of the pattern.
    // NOTE(review): presumably the step between consecutive allocations — confirm against the predictor.
    pub stride: u16,
    /// Optional pair of pool boundary ports.
    // NOTE(review): presumably (low, high) — confirm ordering and inclusivity.
    pub pool_boundaries: Option<(u16, u16)>,
    /// Confidence assigned to the detection.
    pub confidence: f64,
}
329
/// Classification of how external ports appear to be allocated.
// NOTE(review): the variant descriptions below are inferred from the names — confirm against the predictor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AllocationPatternType {
    /// Ports appear to be handed out consecutively.
    Sequential,
    /// Ports appear to advance by a constant stride.
    FixedStride,
    /// No regularity; ports appear random.
    Random,
    /// Ports appear to be drawn from a bounded pool.
    PoolBased,
    /// Allocation appears to depend on time.
    TimeBased,
    /// Pattern could not be classified.
    Unknown,
}
346
/// Result of analysing a port allocation history.
#[derive(Debug, Clone)]
pub struct PortPatternAnalysis {
    /// The detected pattern.
    pub pattern: PortAllocationPattern,
    /// Signed port increment, when one was determined.
    pub increment: Option<i32>,
    /// Base port of the analysis.
    pub base_port: u16,
}
357
/// Opaque numeric identifier of a candidate; usable as a hash map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CandidateId(pub u64);
361
/// Outcome of validating a single candidate.
#[derive(Debug, Clone, PartialEq)]
pub enum ValidationResult {
    /// Candidate is reachable; `rtt` is the round-trip time attached to the result.
    Valid { rtt: Duration },
    /// Candidate was rejected; `reason` explains why.
    Invalid { reason: String },
    /// Validation timed out.
    Timeout,
    /// Validation has not finished yet.
    Pending,
}
370
/// A candidate in the form returned to callers of the discovery manager.
#[derive(Debug, Clone, PartialEq)]
pub struct ValidatedCandidate {
    /// Identifier assigned when the value is built.
    pub id: CandidateId,
    /// Socket address of the candidate.
    pub address: SocketAddr,
    /// How the candidate was obtained.
    pub source: DiscoverySourceType,
    /// Priority copied into the resulting `CandidateAddress`.
    pub priority: u32,
    /// Round-trip time, when one is known.
    pub rtt: Option<Duration>,
    /// Reliability score attached to the candidate.
    // NOTE(review): visible code only assigns 1.0 or 0.5 — confirm the intended range.
    pub reliability_score: f64,
}
381
382impl ValidatedCandidate {
383 pub fn to_candidate_address(&self) -> CandidateAddress {
385 CandidateAddress {
386 address: self.address,
387 priority: self.priority,
388 source: convert_to_nat_source(self.source),
389 state: CandidateState::Valid,
390 }
391 }
392}
393
/// Crate-visible bundle of session data; its fields mirror those of
/// `DiscoverySession` except for the phase and the server-reflexive helper.
#[derive(Debug)]
pub(crate) struct DiscoverySessionState {
    /// Peer the session belongs to.
    pub peer_id: PeerId,
    /// Identifier of the session.
    pub session_id: u64,
    /// When the session started.
    pub started_at: Instant,
    /// Candidates collected so far.
    pub discovered_candidates: Vec<DiscoveryCandidate>,
    /// Counters for the session.
    pub statistics: DiscoveryStatistics,
    /// Recorded port allocation events.
    pub allocation_history: VecDeque<PortAllocationEvent>,
}
404
/// Counters describing the work done by a discovery session.
///
/// All counters start at zero and both optional durations at `None`
/// (derived `Default`).
#[derive(Debug, Default, Clone)]
pub struct DiscoveryStatistics {
    /// Number of local candidates recorded.
    pub local_candidates_found: u32,
    /// Number of server-reflexive candidates recorded.
    pub server_reflexive_candidates_found: u32,
    /// Number of predicted candidates generated.
    pub predicted_candidates_generated: u32,
    /// Number of bootstrap queries sent.
    pub bootstrap_queries_sent: u32,
    /// Number of bootstrap queries that succeeded.
    pub bootstrap_queries_successful: u32,
    /// Total time the discovery took, set when the session completes.
    pub total_discovery_time: Option<Duration>,
    /// Average bootstrap round-trip time, when known.
    pub average_bootstrap_rtt: Option<Duration>,
}
416
/// Errors reported by candidate discovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryError {
    /// The interface scan produced no addresses.
    NoLocalInterfaces,
    /// Every bootstrap node failed.
    AllBootstrapsFailed,
    /// Discovery did not finish within its time limit.
    DiscoveryTimeout,
    /// Fewer candidates were found than required.
    InsufficientCandidates { found: usize, required: usize },
    /// A network-level operation failed; the string carries the details.
    NetworkError(String),
    /// The configuration is invalid; the string carries the details.
    ConfigurationError(String),
    /// An internal invariant was violated (e.g. discovery already running for a peer).
    InternalError(String),
}
435
/// Recovery options attached to a failed discovery phase.
// NOTE(review): the variant descriptions below are inferred from the names — confirm where each is acted on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FallbackStrategy {
    /// Fall back to previously cached results (the only option set by `handle_session_timeout`).
    UseCachedResults,
    /// Retry discovery with relaxed parameters.
    RetryWithRelaxedParams,
    /// Proceed with a minimal candidate set.
    UseMinimalCandidates,
    /// Fall back to relaying.
    EnableRelayFallback,
}
448
449impl Default for DiscoveryConfig {
450 fn default() -> Self {
451 Self {
452 total_timeout: Duration::from_secs(30),
453 local_scan_timeout: Duration::from_secs(2),
454 bootstrap_query_timeout: Duration::from_secs(5),
455 max_query_retries: 3,
456 max_candidates: 8,
457 enable_symmetric_prediction: true,
458 min_bootstrap_consensus: 2,
459 interface_cache_ttl: Duration::from_secs(60),
460 server_reflexive_cache_ttl: Duration::from_secs(300),
461 bound_address: None,
462 }
463 }
464}
465
466impl DiscoverySession {
467 fn new(peer_id: PeerId, config: &DiscoveryConfig) -> Self {
469 Self {
470 peer_id,
471 session_id: rand::random(),
472 current_phase: DiscoveryPhase::Idle,
473 started_at: Instant::now(),
474 discovered_candidates: Vec::new(),
475 statistics: DiscoveryStatistics::default(),
476 allocation_history: VecDeque::new(),
477 server_reflexive_discovery: ServerReflexiveDiscovery::new(config),
478 }
479 }
480}
481
482impl CandidateDiscoveryManager {
483 pub fn new(config: DiscoveryConfig) -> Self {
485 let interface_discovery = Arc::new(std::sync::Mutex::new(
486 create_platform_interface_discovery()
487 ));
488 let symmetric_predictor = Arc::new(std::sync::Mutex::new(
489 SymmetricNatPredictor::new(&config)
490 ));
491 let bootstrap_manager = Arc::new(BootstrapNodeManager::new(&config));
492 let cache = DiscoveryCache::new(&config);
493 let local_cache_duration = config.interface_cache_ttl;
494
495 Self {
496 config,
497 interface_discovery,
498 symmetric_predictor,
499 bootstrap_manager,
500 cache,
501 active_sessions: HashMap::new(),
502 cached_local_candidates: None,
503 local_cache_duration,
504 pending_validations: HashMap::new(),
505 }
506 }
507
508 pub fn set_bound_address(&mut self, address: SocketAddr) {
510 self.config.bound_address = Some(address);
511 self.cached_local_candidates = None;
513 }
514
515 pub fn discover_local_candidates(&mut self) -> Result<Vec<ValidatedCandidate>, DiscoveryError> {
517 self.interface_discovery.lock().unwrap().start_scan().map_err(|e| {
519 DiscoveryError::NetworkError(format!("Failed to start interface scan: {}", e))
520 })?;
521
522 let start = Instant::now();
524 let timeout = Duration::from_secs(2);
525
526 loop {
527 if start.elapsed() > timeout {
528 return Err(DiscoveryError::DiscoveryTimeout);
529 }
530
531 if let Some(interfaces) = self.interface_discovery.lock().unwrap().check_scan_complete() {
532 let mut candidates = Vec::new();
534
535 for interface in interfaces {
536 for addr in interface.addresses {
537 candidates.push(ValidatedCandidate {
538 id: CandidateId(rand::random()),
539 address: addr,
540 source: DiscoverySourceType::Local,
541 priority: 50000, rtt: None,
543 reliability_score: 1.0,
544 });
545 }
546 }
547
548 if candidates.is_empty() {
549 return Err(DiscoveryError::NoLocalInterfaces);
550 }
551
552 return Ok(candidates);
553 }
554
555 std::thread::sleep(Duration::from_millis(10));
557 }
558 }
559
560 pub fn start_discovery(&mut self, peer_id: PeerId, _bootstrap_nodes: Vec<BootstrapNode>) -> Result<(), DiscoveryError> {
562 if self.active_sessions.contains_key(&peer_id) {
564 return Err(DiscoveryError::InternalError(
565 format!("Discovery already in progress for peer {:?}", peer_id)
566 ));
567 }
568
569 info!("Starting candidate discovery for peer {:?}", peer_id);
570
571 let mut session = DiscoverySession::new(peer_id, &self.config);
573
574 session.current_phase = DiscoveryPhase::LocalInterfaceScanning {
579 started_at: Instant::now(),
580 };
581
582 self.active_sessions.insert(peer_id, session);
584
585 Ok(())
586 }
587
588 pub fn poll(&mut self, now: Instant) -> Vec<DiscoveryEvent> {
590 let mut all_events = Vec::new();
591 let mut completed_sessions = Vec::new();
592
593 let mut local_scan_events = Vec::new();
596 for (peer_id, session) in &mut self.active_sessions {
597 match &session.current_phase {
598 DiscoveryPhase::LocalInterfaceScanning { started_at } => {
599 if started_at.elapsed() > self.config.local_scan_timeout {
601 local_scan_events.push((*peer_id, DiscoveryEvent::LocalScanningCompleted {
602 candidate_count: 0,
603 duration: started_at.elapsed(),
604 }));
605 }
606 }
607 _ => {}
608 }
609 }
610
611 for (peer_id, event) in local_scan_events {
613 all_events.push(event);
614 if let Some(session) = self.active_sessions.get_mut(&peer_id) {
615 session.current_phase = DiscoveryPhase::Completed {
617 final_candidates: session.discovered_candidates.iter()
618 .map(|dc| ValidatedCandidate {
619 id: CandidateId(0),
620 address: dc.address,
621 source: dc.source,
622 priority: dc.priority,
623 rtt: None,
624 reliability_score: 1.0,
625 })
626 .collect(),
627 completion_time: now,
628 };
629
630 all_events.push(DiscoveryEvent::DiscoveryCompleted {
631 candidate_count: session.discovered_candidates.len(),
632 total_duration: now.duration_since(session.started_at),
633 success_rate: 1.0,
634 });
635
636 completed_sessions.push(peer_id);
637 }
638 }
639
640 for peer_id in completed_sessions {
642 self.active_sessions.remove(&peer_id);
643 debug!("Removed completed discovery session for peer {:?}", peer_id);
644 }
645
646 all_events
647 }
648
649
650 pub fn get_status(&self) -> DiscoveryStatus {
652 DiscoveryStatus {
654 phase: DiscoveryPhase::Idle,
655 discovered_candidates: Vec::new(),
656 statistics: DiscoveryStatistics::default(),
657 elapsed_time: Duration::from_secs(0),
658 }
659 }
660
661 pub fn is_complete(&self) -> bool {
663 self.active_sessions.values().all(|session| {
665 matches!(session.current_phase, DiscoveryPhase::Completed { .. } | DiscoveryPhase::Failed { .. })
666 })
667 }
668
669 pub fn get_results(&self) -> Option<DiscoveryResults> {
671 if self.active_sessions.is_empty() {
673 return None;
674 }
675
676 let mut all_candidates = Vec::new();
678 let mut latest_completion = Instant::now();
679 let mut combined_stats = DiscoveryStatistics::default();
680
681 for session in self.active_sessions.values() {
682 match &session.current_phase {
683 DiscoveryPhase::Completed { final_candidates, completion_time } => {
684 all_candidates.extend(final_candidates.clone());
686 latest_completion = *completion_time;
687 combined_stats.local_candidates_found += session.statistics.local_candidates_found;
689 combined_stats.server_reflexive_candidates_found += session.statistics.server_reflexive_candidates_found;
690 combined_stats.predicted_candidates_generated += session.statistics.predicted_candidates_generated;
691 combined_stats.bootstrap_queries_sent += session.statistics.bootstrap_queries_sent;
692 combined_stats.bootstrap_queries_successful += session.statistics.bootstrap_queries_successful;
693 },
694 DiscoveryPhase::Failed { .. } => {
695 let validated: Vec<ValidatedCandidate> = session.discovered_candidates.iter()
698 .enumerate()
699 .map(|(idx, dc)| ValidatedCandidate {
700 id: CandidateId(idx as u64),
701 address: dc.address,
702 source: dc.source,
703 priority: dc.priority,
704 rtt: None,
705 reliability_score: 0.5, })
707 .collect();
708 all_candidates.extend(validated);
709 },
710 _ => {}
711 }
712 }
713
714 if all_candidates.is_empty() {
715 None
716 } else {
717 Some(DiscoveryResults {
718 candidates: all_candidates,
719 completion_time: latest_completion,
720 statistics: combined_stats,
721 })
722 }
723 }
724
725 pub fn get_candidates_for_peer(&self, peer_id: PeerId) -> Vec<CandidateAddress> {
727 if let Some(session) = self.active_sessions.get(&peer_id) {
729 session.discovered_candidates.iter()
731 .map(|c| c.to_candidate_address())
732 .collect()
733 } else {
734 debug!("No active discovery session found for peer {:?}", peer_id);
736 Vec::new()
737 }
738 }
739
740 fn poll_session_local_scanning(&mut self, session: &mut DiscoverySession, started_at: Instant, now: Instant, events: &mut Vec<DiscoveryEvent>) {
743 if let Some((cache_time, ref cached_candidates)) = self.cached_local_candidates {
745 if cache_time.elapsed() < self.local_cache_duration {
746 debug!("Using cached local candidates for peer {:?}", session.peer_id);
748 self.process_cached_local_candidates(session, cached_candidates.clone(), events, now);
749 return;
750 }
751 }
752
753 if started_at.elapsed().as_millis() < 10 {
756 let scan_result = self.interface_discovery.lock().unwrap().start_scan();
757 match scan_result {
758 Ok(()) => {
759 debug!("Started local interface scan for peer {:?}", session.peer_id);
760 events.push(DiscoveryEvent::LocalScanningStarted);
761 }
762 Err(e) => {
763 error!("Failed to start interface scan: {}", e);
764 self.handle_session_local_scan_timeout(session, events, now);
765 return;
766 }
767 }
768 }
769
770 if started_at.elapsed() > self.config.local_scan_timeout {
772 warn!("Local interface scanning timeout for peer {:?}", session.peer_id);
773 self.handle_session_local_scan_timeout(session, events, now);
774 return;
775 }
776
777 let scan_complete_result = self.interface_discovery.lock().unwrap().check_scan_complete();
779 if let Some(interfaces) = scan_complete_result {
780 self.process_session_local_interfaces(session, interfaces, events, now);
781 }
782 }
783
784 fn process_session_local_interfaces(&mut self, session: &mut DiscoverySession, interfaces: Vec<NetworkInterface>, events: &mut Vec<DiscoveryEvent>, now: Instant) {
785 debug!("Processing {} network interfaces for peer {:?}", interfaces.len(), session.peer_id);
786
787 let mut validated_candidates = Vec::new();
788
789 if let Some(bound_addr) = self.config.bound_address {
791 if self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback() {
792 let candidate = DiscoveryCandidate {
793 address: bound_addr,
794 priority: 60000, source: DiscoverySourceType::Local,
796 state: CandidateState::New,
797 };
798
799 session.discovered_candidates.push(candidate.clone());
800 session.statistics.local_candidates_found += 1;
801
802 validated_candidates.push(ValidatedCandidate {
804 id: CandidateId(rand::random()),
805 address: bound_addr,
806 source: DiscoverySourceType::Local,
807 priority: candidate.priority,
808 rtt: None,
809 reliability_score: 1.0,
810 });
811
812 events.push(DiscoveryEvent::LocalCandidateDiscovered {
813 candidate: candidate.to_candidate_address()
814 });
815
816 debug!("Added bound address {} as local candidate for peer {:?}", bound_addr, session.peer_id);
817 }
818 }
819
820 for interface in &interfaces {
822 for address in &interface.addresses {
823 if Some(*address) == self.config.bound_address {
825 continue;
826 }
827
828 if self.is_valid_local_address(&address) {
829 let candidate = DiscoveryCandidate {
830 address: *address,
831 priority: self.calculate_local_priority(address, &interface),
832 source: DiscoverySourceType::Local,
833 state: CandidateState::New,
834 };
835
836 session.discovered_candidates.push(candidate.clone());
837 session.statistics.local_candidates_found += 1;
838
839 validated_candidates.push(ValidatedCandidate {
841 id: CandidateId(rand::random()),
842 address: *address,
843 source: DiscoverySourceType::Local,
844 priority: candidate.priority,
845 rtt: None,
846 reliability_score: 1.0,
847 });
848
849 events.push(DiscoveryEvent::LocalCandidateDiscovered {
850 candidate: candidate.to_candidate_address()
851 });
852 }
853 }
854 }
855
856 self.cached_local_candidates = Some((now, validated_candidates));
858
859 events.push(DiscoveryEvent::LocalScanningCompleted {
860 candidate_count: session.statistics.local_candidates_found as usize,
861 duration: now.duration_since(session.started_at),
862 });
863
864 self.start_session_server_reflexive_discovery(session, events, now);
866 }
867
868 fn process_cached_local_candidates(&mut self, session: &mut DiscoverySession, mut cached_candidates: Vec<ValidatedCandidate>, events: &mut Vec<DiscoveryEvent>, now: Instant) {
869 if let Some(bound_addr) = self.config.bound_address {
871 let has_bound_addr = cached_candidates.iter().any(|c| c.address == bound_addr);
872 if !has_bound_addr && (self.is_valid_local_address(&bound_addr) || bound_addr.ip().is_loopback()) {
873 cached_candidates.insert(0, ValidatedCandidate {
874 id: CandidateId(rand::random()),
875 address: bound_addr,
876 source: DiscoverySourceType::Local,
877 priority: 60000, rtt: None,
879 reliability_score: 1.0,
880 });
881 }
882 }
883
884 debug!("Using {} cached local candidates for peer {:?}", cached_candidates.len(), session.peer_id);
885
886 for validated in cached_candidates {
887 let candidate = DiscoveryCandidate {
888 address: validated.address,
889 priority: validated.priority,
890 source: validated.source.clone(),
891 state: CandidateState::New,
892 };
893
894 session.discovered_candidates.push(candidate.clone());
895 session.statistics.local_candidates_found += 1;
896
897 events.push(DiscoveryEvent::LocalCandidateDiscovered {
898 candidate: candidate.to_candidate_address()
899 });
900 }
901
902 events.push(DiscoveryEvent::LocalScanningCompleted {
903 candidate_count: session.statistics.local_candidates_found as usize,
904 duration: now.duration_since(session.started_at),
905 });
906
907 self.start_session_server_reflexive_discovery(session, events, now);
909 }
910
911 fn start_session_server_reflexive_discovery(&mut self, session: &mut DiscoverySession, events: &mut Vec<DiscoveryEvent>, now: Instant) {
912 let bootstrap_node_ids = self.bootstrap_manager.get_active_bootstrap_nodes();
913
914 if bootstrap_node_ids.is_empty() {
915 info!("No bootstrap nodes available for server reflexive discovery for peer {:?}, completing with local candidates only", session.peer_id);
916 self.complete_session_discovery_with_local_candidates(session, events, now);
918 return;
919 }
920
921 let bootstrap_nodes_with_addresses: Vec<(BootstrapNodeId, SocketAddr)> = bootstrap_node_ids
923 .iter()
924 .filter_map(|&node_id| {
925 self.bootstrap_manager.get_bootstrap_address(node_id)
926 .map(|addr| (node_id, addr))
927 })
928 .collect();
929
930 if bootstrap_nodes_with_addresses.is_empty() {
931 warn!("No bootstrap node addresses available for server reflexive discovery");
932 self.complete_session_discovery_with_local_candidates(session, events, now);
934 return;
935 }
936
937 let active_queries = session.server_reflexive_discovery.start_queries_with_addresses(&bootstrap_nodes_with_addresses, now);
939
940 events.push(DiscoveryEvent::ServerReflexiveDiscoveryStarted {
941 bootstrap_count: bootstrap_nodes_with_addresses.len(),
942 });
943
944 session.current_phase = DiscoveryPhase::ServerReflexiveQuerying {
945 started_at: now,
946 active_queries,
947 responses_received: Vec::new(),
948 };
949 }
950
951
952
953 fn process_server_reflexive_response_for_session(&mut self, session: &mut DiscoverySession, response: &ServerReflexiveResponse, events: &mut Vec<DiscoveryEvent>) {
954 debug!("Received server reflexive response: {:?}", response);
955
956 let allocation_event = PortAllocationEvent {
958 port: response.observed_address.port(),
959 timestamp: response.timestamp,
960 source_address: response.observed_address,
961 };
962
963 if let DiscoveryPhase::ServerReflexiveQuerying { .. } = &mut session.current_phase {
965 session.allocation_history.push_back(allocation_event.clone());
968
969 if session.allocation_history.len() > 20 {
971 session.allocation_history.pop_front();
972 }
973 }
974
975 let candidate = DiscoveryCandidate {
976 address: response.observed_address,
977 priority: self.calculate_server_reflexive_priority(response),
978 source: DiscoverySourceType::ServerReflexive,
979 state: CandidateState::New,
980 };
981
982 session.discovered_candidates.push(candidate.clone());
983 session.statistics.server_reflexive_candidates_found += 1;
984
985 events.push(DiscoveryEvent::ServerReflexiveCandidateDiscovered {
986 candidate: candidate.to_candidate_address(),
987 bootstrap_node: self.bootstrap_manager.get_bootstrap_address(response.bootstrap_node).unwrap_or_else(|| "unknown".parse().unwrap()),
988 });
989
990 events.push(DiscoveryEvent::PortAllocationDetected {
991 port: allocation_event.port,
992 source_address: allocation_event.source_address,
993 bootstrap_node: response.bootstrap_node,
994 timestamp: allocation_event.timestamp,
995 });
996 }
997
998
999 fn start_session_symmetric_prediction(&mut self, session: &mut DiscoverySession, responses: &[ServerReflexiveResponse], events: &mut Vec<DiscoveryEvent>, now: Instant) {
1000 if !self.config.enable_symmetric_prediction || responses.is_empty() {
1001 self.complete_session_discovery_with_local_candidates(session, events, now);
1003 return;
1004 }
1005
1006 let base_address = self.calculate_consensus_address(responses);
1008
1009 events.push(DiscoveryEvent::SymmetricPredictionStarted { base_address });
1010
1011 let detected_pattern = self.symmetric_predictor.lock().unwrap().analyze_allocation_patterns(&session.allocation_history);
1013
1014 let confidence_level = detected_pattern.as_ref().map(|p| p.confidence).unwrap_or(0.0);
1015
1016 let prediction_accuracy = if let Some(ref pattern) = detected_pattern {
1018 self.calculate_prediction_accuracy(pattern, &session.allocation_history)
1019 } else {
1020 0.3 };
1022
1023 debug!("Symmetric NAT pattern analysis: detected_pattern={:?}, confidence={:.2}, accuracy={:.2}",
1024 detected_pattern, confidence_level, prediction_accuracy);
1025
1026 session.current_phase = DiscoveryPhase::SymmetricNatPrediction {
1027 started_at: now,
1028 prediction_attempts: 0,
1029 pattern_analysis: PatternAnalysisState {
1030 allocation_history: session.allocation_history.clone(),
1031 detected_pattern,
1032 confidence_level,
1033 prediction_accuracy,
1034 },
1035 };
1036 }
1037
1038
1039
1040 fn start_session_candidate_validation(&mut self, session: &mut DiscoverySession, _events: &mut Vec<DiscoveryEvent>, now: Instant) {
1041 debug!("Starting candidate validation for {} candidates", session.discovered_candidates.len());
1042
1043 session.current_phase = DiscoveryPhase::CandidateValidation {
1044 started_at: now,
1045 validation_results: HashMap::new(),
1046 };
1047 }
1048
1049
1050 fn start_path_validation(&mut self, candidate_id: CandidateId, candidate_address: SocketAddr, now: Instant, events: &mut Vec<DiscoveryEvent>) {
1052 debug!("Starting QUIC path validation for candidate {} at {}", candidate_id.0, candidate_address);
1053
1054 let challenge_token: u64 = rand::random();
1056
1057 self.pending_validations.insert(candidate_id, PendingValidation {
1059 candidate_address,
1060 challenge_token,
1061 started_at: now,
1062 attempts: 1,
1063 });
1064
1065 events.push(DiscoveryEvent::PathValidationRequested {
1067 candidate_id,
1068 candidate_address,
1069 challenge_token,
1070 });
1071
1072 debug!("PATH_CHALLENGE {:08x} requested for candidate {} at {}",
1073 challenge_token, candidate_id.0, candidate_address);
1074 }
1075
1076 pub fn handle_path_response(&mut self, candidate_address: SocketAddr, challenge_token: u64, now: Instant) -> Option<DiscoveryEvent> {
1078 let candidate_id = self.pending_validations.iter()
1080 .find(|(_, validation)| {
1081 validation.candidate_address == candidate_address &&
1082 validation.challenge_token == challenge_token
1083 })
1084 .map(|(id, _)| *id)?;
1085
1086 let validation = self.pending_validations.remove(&candidate_id)?;
1088 let rtt = now.duration_since(validation.started_at);
1089
1090 debug!("PATH_RESPONSE received for candidate {} at {} with RTT {:?}",
1091 candidate_id.0, candidate_address, rtt);
1092
1093 for session in self.active_sessions.values_mut() {
1095 if let Some(candidate) = session.discovered_candidates.iter_mut()
1096 .find(|c| c.address == candidate_address)
1097 {
1098 candidate.state = CandidateState::Valid;
1099 break;
1101 }
1102 }
1103
1104 Some(DiscoveryEvent::PathValidationResponse {
1105 candidate_id,
1106 candidate_address,
1107 challenge_token,
1108 rtt,
1109 })
1110 }
1111
1112 fn simulate_path_validation(&mut self, candidate_id: CandidateId, candidate_address: SocketAddr, _now: Instant) {
1114 let is_local = candidate_address.ip().is_loopback() ||
1116 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("192.168.")) ||
1117 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("10.")) ||
1118 (candidate_address.ip().is_ipv4() && candidate_address.ip().to_string().starts_with("172."));
1119
1120 let is_server_reflexive = !is_local && !candidate_address.ip().is_unspecified();
1121
1122 debug!("Simulated path validation for candidate {} at {} - local: {}, server_reflexive: {}",
1125 candidate_id.0, candidate_address, is_local, is_server_reflexive);
1126 }
1127
1128
1129 fn simulate_validation_result(&self, address: &SocketAddr) -> ValidationResult {
1131 let is_local = address.ip().is_loopback() ||
1132 (address.ip().is_ipv4() && address.ip().to_string().starts_with("192.168.")) ||
1133 (address.ip().is_ipv4() && address.ip().to_string().starts_with("10.")) ||
1134 (address.ip().is_ipv4() && address.ip().to_string().starts_with("172."));
1135
1136 if is_local {
1137 ValidationResult::Valid { rtt: Duration::from_millis(1) }
1139 } else if address.ip().is_unspecified() {
1140 ValidationResult::Invalid { reason: "Unspecified address".to_string() }
1142 } else {
1143 ValidationResult::Valid { rtt: Duration::from_millis(50 + (address.port() % 100) as u64) }
1145 }
1146 }
1147
1148
1149 fn calculate_reliability_score(&self, candidate: &DiscoveryCandidate, rtt: Duration) -> f64 {
1151 let mut score: f64 = 0.5; match candidate.source {
1155 DiscoverySourceType::Local => score += 0.3, DiscoverySourceType::ServerReflexive => score += 0.2, DiscoverySourceType::Predicted => score += 0.1, }
1159
1160 let rtt_ms = rtt.as_millis() as f64;
1162 if rtt_ms < 10.0 {
1163 score += 0.2;
1164 } else if rtt_ms < 50.0 {
1165 score += 0.1;
1166 } else if rtt_ms > 200.0 {
1167 score -= 0.1;
1168 }
1169
1170 if candidate.address.ip().is_ipv6() {
1172 score += 0.05; }
1174
1175 score.max(0.0).min(1.0)
1177 }
1178
1179
1180 fn handle_session_timeout(&mut self, session: &mut DiscoverySession, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1184 let error = DiscoveryError::DiscoveryTimeout;
1185 let partial_results = session.discovered_candidates.iter()
1186 .map(|c| c.to_candidate_address())
1187 .collect();
1188
1189 warn!("Discovery failed for peer {:?}: discovery process timed out (found {} partial candidates)",
1190 session.peer_id, session.discovered_candidates.len());
1191 events.push(DiscoveryEvent::DiscoveryFailed {
1192 error: error.clone(),
1193 partial_results,
1194 });
1195
1196 session.current_phase = DiscoveryPhase::Failed {
1197 error,
1198 failed_at: now,
1199 fallback_options: vec![FallbackStrategy::UseCachedResults],
1200 };
1201 }
1202
1203 fn handle_session_local_scan_timeout(&mut self, session: &mut DiscoverySession, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1204 warn!("Local interface scan timeout for peer {:?}, proceeding with available candidates", session.peer_id);
1205
1206 events.push(DiscoveryEvent::LocalScanningCompleted {
1207 candidate_count: session.statistics.local_candidates_found as usize,
1208 duration: now.duration_since(session.started_at),
1209 });
1210
1211 self.start_session_server_reflexive_discovery(session, events, now);
1212 }
1213
    /// Poll handler for the server-reflexive querying phase.
    ///
    /// The phase-specific arguments (`_started_at`, `_active_queries`,
    /// `_responses_received`) are not used: the session is completed
    /// immediately with the candidates it already holds.
    // NOTE(review): `_responses_received` takes `(BootstrapNodeId, ServerReflexiveResponse)` pairs,
    // while `DiscoveryPhase::ServerReflexiveQuerying::responses_received` is
    // `Vec<ServerReflexiveResponse>` — confirm which shape callers are expected to pass.
    fn poll_session_server_reflexive(&mut self, session: &mut DiscoverySession, _started_at: Instant, _active_queries: &HashMap<BootstrapNodeId, QueryState>, _responses_received: &[(BootstrapNodeId, ServerReflexiveResponse)], now: Instant, events: &mut Vec<DiscoveryEvent>) {
        self.complete_session_discovery_with_local_candidates(session, events, now);
    }
1219
1220 fn poll_session_symmetric_prediction(&mut self, session: &mut DiscoverySession, _started_at: Instant, _prediction_attempts: u32, _pattern_analysis: &PatternAnalysisState, now: Instant, events: &mut Vec<DiscoveryEvent>) {
1221 self.complete_session_discovery_with_local_candidates(session, events, now);
1224 }
1225
1226 fn poll_session_candidate_validation(&mut self, session: &mut DiscoverySession, _started_at: Instant, _validation_results: &HashMap<CandidateId, ValidationResult>, now: Instant, events: &mut Vec<DiscoveryEvent>) {
1227 self.complete_session_discovery_with_local_candidates(session, events, now);
1230 }
1231
1232
1233
1234 fn complete_session_discovery_with_local_candidates(&mut self, session: &mut DiscoverySession, events: &mut Vec<DiscoveryEvent>, now: Instant) {
1235 let duration = now.duration_since(session.started_at);
1237 session.statistics.total_discovery_time = Some(duration);
1238
1239 let success_rate = if session.statistics.local_candidates_found > 0 {
1240 1.0
1241 } else {
1242 0.0
1243 };
1244
1245 let validated_candidates: Vec<ValidatedCandidate> = session.discovered_candidates
1247 .iter()
1248 .map(|dc| ValidatedCandidate {
1249 id: CandidateId(rand::random()),
1250 address: dc.address,
1251 source: dc.source.clone(),
1252 priority: dc.priority,
1253 rtt: None,
1254 reliability_score: 1.0,
1255 })
1256 .collect();
1257
1258 events.push(DiscoveryEvent::DiscoveryCompleted {
1259 candidate_count: validated_candidates.len(),
1260 total_duration: duration,
1261 success_rate,
1262 });
1263
1264 session.current_phase = DiscoveryPhase::Completed {
1265 final_candidates: validated_candidates,
1266 completion_time: now,
1267 };
1268
1269 info!("Discovery completed with {} local candidates for peer {:?}", session.discovered_candidates.len(), session.peer_id);
1270 }
1271
1272
1273 fn is_valid_local_address(&self, address: &SocketAddr) -> bool {
1274 match address.ip() {
1275 IpAddr::V4(ipv4) => {
1276 #[cfg(test)]
1278 if ipv4.is_loopback() {
1279 return true;
1280 }
1281 !ipv4.is_loopback() && !ipv4.is_unspecified()
1282 },
1283 IpAddr::V6(ipv6) => {
1284 #[cfg(test)]
1286 if ipv6.is_loopback() {
1287 return true;
1288 }
1289 !ipv6.is_loopback() && !ipv6.is_unspecified()
1290 },
1291 }
1292 }
1293
1294 fn calculate_local_priority(&self, address: &SocketAddr, interface: &NetworkInterface) -> u32 {
1295 let mut priority = 100; match address.ip() {
1298 IpAddr::V4(ipv4) => {
1299 if ipv4.is_private() {
1300 priority += 50; }
1302 },
1303 IpAddr::V6(ipv6) => {
1304 if !ipv6.is_loopback() && !ipv6.is_multicast() && !ipv6.is_unspecified() {
1307 let segments = ipv6.segments();
1308 if segments[0] & 0xE000 == 0x2000 {
1309 priority += 60;
1311 } else if segments[0] & 0xFFC0 == 0xFE80 {
1312 priority += 20;
1314 } else if segments[0] & 0xFE00 == 0xFC00 {
1315 priority += 40;
1317 } else {
1318 priority += 30;
1320 }
1321 }
1322
1323 priority += 10; },
1326 }
1327
1328 if interface.is_wireless {
1329 priority -= 10; }
1331
1332 priority
1333 }
1334
1335 fn calculate_server_reflexive_priority(&self, response: &ServerReflexiveResponse) -> u32 {
1336 let mut priority = 200; if response.response_time < Duration::from_millis(50) {
1340 priority += 20;
1341 } else if response.response_time > Duration::from_millis(200) {
1342 priority -= 10;
1343 }
1344
1345 let age_bonus = if response.timestamp.elapsed().as_secs() < 60 { 20 } else { 0 };
1347 priority += age_bonus;
1348
1349 priority
1350 }
1351
1352 fn should_transition_to_prediction(&self, responses: &[ServerReflexiveResponse], _now: Instant) -> bool {
1353 responses.len() >= self.config.min_bootstrap_consensus.max(1)
1354 }
1355
1356 fn calculate_consensus_address(&self, responses: &[ServerReflexiveResponse]) -> SocketAddr {
1357 let mut address_counts: HashMap<SocketAddr, usize> = HashMap::new();
1359
1360 for response in responses {
1361 *address_counts.entry(response.observed_address).or_insert(0) += 1;
1362 }
1363
1364 address_counts
1365 .into_iter()
1366 .max_by_key(|(_, count)| *count)
1367 .map(|(addr, _)| addr)
1368 .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
1369 }
1370
1371 fn calculate_prediction_accuracy(&self, pattern: &PortAllocationPattern, history: &VecDeque<PortAllocationEvent>) -> f64 {
1373 if history.len() < 3 {
1374 return 0.3; }
1376
1377 let recent_ports: Vec<u16> = history.iter()
1379 .rev()
1380 .take(10)
1381 .map(|event| event.port)
1382 .collect();
1383
1384 let mut correct_predictions = 0;
1385 let total_predictions = recent_ports.len().saturating_sub(1);
1386
1387 if total_predictions == 0 {
1388 return 0.3;
1389 }
1390
1391 match pattern.pattern_type {
1392 AllocationPatternType::Sequential => {
1393 for i in 1..recent_ports.len() {
1395 if recent_ports[i-1].wrapping_sub(recent_ports[i]) == 1 {
1396 correct_predictions += 1;
1397 }
1398 }
1399 }
1400 AllocationPatternType::FixedStride => {
1401 for i in 1..recent_ports.len() {
1403 if recent_ports[i-1].wrapping_sub(recent_ports[i]) == pattern.stride {
1404 correct_predictions += 1;
1405 }
1406 }
1407 }
1408 AllocationPatternType::PoolBased => {
1409 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1411 for port in &recent_ports {
1412 if *port >= min_port && *port <= max_port {
1413 correct_predictions += 1;
1414 }
1415 }
1416 }
1417 }
1418 AllocationPatternType::Random | AllocationPatternType::Unknown => {
1419 if recent_ports.len() >= 3 {
1421 let mean = recent_ports.iter().map(|&p| p as f64).sum::<f64>() / recent_ports.len() as f64;
1422 let variance = recent_ports.iter()
1423 .map(|&p| (p as f64 - mean).powi(2))
1424 .sum::<f64>() / recent_ports.len() as f64;
1425
1426 let normalized_variance = (variance / 10000.0).min(1.0); return 0.2 + (1.0 - normalized_variance) * 0.3; }
1430 }
1431 AllocationPatternType::TimeBased => {
1432 if history.len() >= 2 {
1434 let time_diffs: Vec<Duration> = history.iter()
1435 .collect::<Vec<_>>()
1436 .windows(2)
1437 .map(|w| w[1].timestamp.duration_since(w[0].timestamp))
1438 .collect();
1439
1440 if !time_diffs.is_empty() {
1441 let avg_diff = time_diffs.iter().sum::<Duration>() / time_diffs.len() as u32;
1442 let variance = time_diffs.iter()
1443 .map(|d| d.as_millis().abs_diff(avg_diff.as_millis()) as f64)
1444 .sum::<f64>() / time_diffs.len() as f64;
1445
1446 let normalized_variance = (variance / 1000.0).min(1.0); return 0.3 + (1.0 - normalized_variance) * 0.4; }
1450 }
1451 }
1452 }
1453
1454 let accuracy = if total_predictions > 0 {
1456 correct_predictions as f64 / total_predictions as f64
1457 } else {
1458 0.3
1459 };
1460
1461 let confidence_adjusted_accuracy = accuracy * pattern.confidence;
1463
1464 confidence_adjusted_accuracy.max(0.2).min(0.9)
1466 }
1467}
1468
/// Status report for a discovery run: the phase it is in, the candidates and
/// statistics gathered, and the time elapsed.
#[derive(Debug, Clone)]
pub struct DiscoveryStatus {
    /// Phase the discovery state machine is in.
    pub phase: DiscoveryPhase,
    /// Candidate addresses discovered.
    pub discovered_candidates: Vec<CandidateAddress>,
    /// Discovery statistics.
    pub statistics: DiscoveryStatistics,
    /// Elapsed time (presumably since discovery started; confirm at the producer).
    pub elapsed_time: Duration,
}
1477
/// Results of a discovery run: validated candidates, completion time and
/// statistics.
#[derive(Debug, Clone)]
pub struct DiscoveryResults {
    /// Validated candidates.
    pub candidates: Vec<ValidatedCandidate>,
    /// Completion timestamp.
    pub completion_time: Instant,
    /// Discovery statistics.
    pub statistics: DiscoveryStatistics,
}
1485
/// Interface for enumerating the host's network interfaces.
///
/// The two-call shape suggests a start-then-poll protocol; the exact contract
/// is defined by the implementations, which are not visible here.
pub trait NetworkInterfaceDiscovery {
    /// Starts an interface scan. On failure the error is a message string.
    fn start_scan(&mut self) -> Result<(), String>;
    /// Returns the discovered interfaces, or `None` (presumably while the scan
    /// is still running; confirm against the implementations).
    fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>>;
}
1493
/// Description of one network interface as reported by a
/// `NetworkInterfaceDiscovery` implementation.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkInterface {
    /// Interface name.
    pub name: String,
    /// Addresses assigned to the interface.
    pub addresses: Vec<SocketAddr>,
    /// Whether the interface is up.
    pub is_up: bool,
    /// Whether the interface is wireless; wireless interfaces receive a lower
    /// local candidate priority.
    pub is_wireless: bool,
    /// MTU, if known.
    pub mtu: Option<u16>,
}
1503
/// Bookkeeping for a connection to a bootstrap node.
#[derive(Debug)]
struct BootstrapConnection {
    /// The underlying connection.
    connection: crate::Connection,
    /// Address of the bootstrap node.
    address: SocketAddr,
    /// Timestamp at which the connection was established.
    established_at: Instant,
    /// Request identifier associated with this connection.
    request_id: u64,
}
1516
/// Request asking a bootstrap node to report the address it observes.
///
/// NOTE(review): the fields mirror the three values serialized by
/// `create_discovery_request` (id, millisecond timestamp, capability flags),
/// but nothing visible here constructs this struct; confirm before relying
/// on that correspondence.
#[derive(Debug, Clone)]
struct AddressObservationRequest {
    /// Request identifier.
    request_id: u64,
    /// Timestamp (unit not shown here; presumably milliseconds since the Unix epoch).
    timestamp: u64,
    /// Capability flags.
    capabilities: u32,
}
1527
/// State for discovering server-reflexive addresses via bootstrap nodes.
#[derive(Debug)]
pub(crate) struct ServerReflexiveDiscovery {
    /// Copy of the discovery configuration (timeouts, retry limits).
    config: DiscoveryConfig,
    /// State of each query, keyed by bootstrap node.
    active_queries: HashMap<BootstrapNodeId, QueryState>,
    /// Responses queued until the next `poll_queries` call drains them.
    responses: VecDeque<ServerReflexiveResponse>,
    /// Deadline for each query, keyed by bootstrap node.
    query_timeouts: HashMap<BootstrapNodeId, Instant>,
    /// Connections keyed by bootstrap node; cleared whenever queries restart.
    active_connections: HashMap<BootstrapNodeId, BootstrapConnection>,
    /// Tokio runtime handle captured at construction; `None` if none was current.
    runtime_handle: Option<tokio::runtime::Handle>,
}
1543
1544impl ServerReflexiveDiscovery {
1545 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
1546 Self {
1547 config: config.clone(),
1548 active_queries: HashMap::new(),
1549 responses: VecDeque::new(),
1550 query_timeouts: HashMap::new(),
1551 active_connections: HashMap::new(),
1552 runtime_handle: tokio::runtime::Handle::try_current().ok(),
1553 }
1554 }
1555
1556 pub(crate) fn start_queries(&mut self, bootstrap_nodes: &[BootstrapNodeId], now: Instant) -> HashMap<BootstrapNodeId, QueryState> {
1557 debug!("Starting server reflexive queries to {} bootstrap nodes", bootstrap_nodes.len());
1558
1559 self.active_queries.clear();
1560 self.query_timeouts.clear();
1561
1562 self.active_connections.clear();
1563
1564 for &node_id in bootstrap_nodes {
1565 let query_state = QueryState::Pending {
1566 sent_at: now,
1567 attempts: 1,
1568 };
1569
1570 self.active_queries.insert(node_id, query_state);
1571 self.query_timeouts.insert(node_id, now + self.config.bootstrap_query_timeout);
1572
1573 debug!("Starting server reflexive query to bootstrap node {:?}", node_id);
1574
1575 if let Some(runtime) = &self.runtime_handle {
1577 self.start_quinn_query(node_id, runtime.clone(), now);
1578 } else {
1579 warn!("No async runtime available, falling back to simulation for node {:?}", node_id);
1580 self.simulate_bootstrap_response(node_id, now);
1581 }
1582 }
1583
1584 self.active_queries.clone()
1585 }
1586
1587 pub(crate) fn start_queries_with_addresses(
1589 &mut self,
1590 bootstrap_nodes: &[(BootstrapNodeId, SocketAddr)],
1591 now: Instant
1592 ) -> HashMap<BootstrapNodeId, QueryState> {
1593 debug!("Starting server reflexive queries to {} bootstrap nodes with addresses", bootstrap_nodes.len());
1594
1595 self.active_queries.clear();
1596 self.query_timeouts.clear();
1597
1598 self.active_connections.clear();
1599
1600 for &(node_id, bootstrap_address) in bootstrap_nodes {
1601 let query_state = QueryState::Pending {
1602 sent_at: now,
1603 attempts: 1,
1604 };
1605
1606 self.active_queries.insert(node_id, query_state);
1607 self.query_timeouts.insert(node_id, now + self.config.bootstrap_query_timeout);
1608
1609 debug!("Starting server reflexive query to bootstrap node {:?} at {}", node_id, bootstrap_address);
1610
1611 if let Some(_runtime) = &self.runtime_handle {
1613 self.start_quinn_query_with_address(node_id, bootstrap_address, now);
1614 } else {
1615 warn!("No async runtime available, falling back to simulation for node {:?}", node_id);
1616 self.simulate_bootstrap_response(node_id, now);
1617 }
1618 }
1619
1620 self.active_queries.clone()
1621 }
1622
1623 fn start_quinn_query(&mut self, node_id: BootstrapNodeId, _runtime: tokio::runtime::Handle, now: Instant) {
1625 let request_id = rand::random::<u64>();
1631
1632 debug!("Starting Quinn connection to bootstrap node {:?} with request ID {}", node_id, request_id);
1633
1634 self.simulate_bootstrap_response(node_id, now);
1644 }
1645
1646 pub(crate) fn start_quinn_query_with_address(
1648 &mut self,
1649 node_id: BootstrapNodeId,
1650 bootstrap_address: SocketAddr,
1651 now: Instant
1652 ) {
1653
1654 let request_id = rand::random::<u64>();
1655
1656 info!("Establishing Quinn connection to bootstrap node {:?} at {}", node_id, bootstrap_address);
1657
1658 if let Some(runtime) = &self.runtime_handle {
1660 let timeout = self.config.bootstrap_query_timeout;
1661
1662 let (response_tx, _response_rx) = tokio::sync::mpsc::unbounded_channel();
1664
1665 runtime.spawn(async move {
1670 match Self::perform_bootstrap_query(bootstrap_address, request_id, timeout).await {
1671 Ok(observed_address) => {
1672 let response = ServerReflexiveResponse {
1673 bootstrap_node: node_id,
1674 observed_address,
1675 response_time: now.elapsed(),
1676 timestamp: Instant::now(),
1677 };
1678
1679 let _ = response_tx.send(response);
1681
1682 info!("Successfully received observed address {} from bootstrap node {:?}",
1683 observed_address, node_id);
1684 }
1685 Err(e) => {
1686 warn!("Failed to query bootstrap node {:?} at {}: {}", node_id, bootstrap_address, e);
1687 }
1688 }
1689 });
1690 } else {
1691 warn!("No async runtime available for Quinn query to {:?}", node_id);
1692 self.simulate_bootstrap_response(node_id, now);
1693 }
1694 }
1695
/// Placeholder for the real bootstrap query.
///
/// # Errors
///
/// Always fails with a "not implemented" message; all arguments are ignored.
async fn perform_bootstrap_query(
    _bootstrap_address: SocketAddr,
    _request_id: u64,
    _timeout: Duration,
) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
    let reason = "Bootstrap query not implemented for low-level API";
    Err(Box::from(reason))
}
1770
1771 fn create_discovery_request(request_id: u64) -> Vec<u8> {
1773 let mut request = Vec::new();
1774
1775 request.extend_from_slice(&request_id.to_be_bytes());
1780 request.extend_from_slice(&std::time::SystemTime::now()
1781 .duration_since(std::time::UNIX_EPOCH)
1782 .unwrap_or_default()
1783 .as_millis().to_be_bytes()[8..16]); request.extend_from_slice(&1u32.to_be_bytes()); debug!("Created discovery request: {} bytes, request_id: {}", request.len(), request_id);
1787 request
1788 }
1789
1790 async fn wait_for_add_address_frame(
1792 _connection: &Connection,
1793 _expected_request_id: u64,
1794 ) -> Result<SocketAddr, Box<dyn std::error::Error + Send + Sync>> {
1795 Err("wait_for_add_address_frame not implemented for low-level API".into())
1798
1799 }
1835
1836 fn create_response_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ServerReflexiveResponse> {
1838 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
1841 tx
1843 }
1844
1845 pub(crate) fn poll_queries(&mut self, _active_queries: &HashMap<BootstrapNodeId, QueryState>, now: Instant) -> Vec<ServerReflexiveResponse> {
1846 let mut responses = Vec::new();
1847
1848 while let Some(response) = self.responses.pop_front() {
1850 responses.push(response);
1851 }
1852
1853 let mut timed_out_nodes = Vec::new();
1855 for (&node_id, &timeout) in &self.query_timeouts {
1856 if now >= timeout {
1857 timed_out_nodes.push(node_id);
1858 }
1859 }
1860
1861 for node_id in timed_out_nodes {
1863 self.query_timeouts.remove(&node_id);
1864
1865 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
1866 match query_state {
1867 QueryState::Pending { attempts, .. } if *attempts < self.config.max_query_retries => {
1868 *attempts += 1;
1870 let new_timeout = now + self.config.bootstrap_query_timeout;
1871 self.query_timeouts.insert(node_id, new_timeout);
1872
1873 debug!("Retrying server reflexive query to bootstrap node {:?} (attempt {})", node_id, attempts);
1874
1875 self.simulate_bootstrap_response(node_id, now);
1877 }
1878 _ => {
1879 self.active_queries.insert(node_id, QueryState::Failed);
1881 warn!("Server reflexive query to bootstrap node {:?} failed after retries", node_id);
1882 }
1883 }
1884 }
1885 }
1886
1887 responses
1888 }
1889
1890 fn simulate_bootstrap_response(&mut self, node_id: BootstrapNodeId, now: Instant) {
1893 let simulated_external_addr = match node_id.0 % 3 {
1895 0 => "203.0.113.1:45678".parse().unwrap(),
1896 1 => "198.51.100.2:45679".parse().unwrap(),
1897 _ => "192.0.2.3:45680".parse().unwrap(),
1898 };
1899
1900 let response = ServerReflexiveResponse {
1901 bootstrap_node: node_id,
1902 observed_address: simulated_external_addr,
1903 response_time: Duration::from_millis(50 + node_id.0 * 10),
1904 timestamp: now,
1905 };
1906
1907 self.responses.push_back(response);
1908
1909 if let Some(query_state) = self.active_queries.get_mut(&node_id) {
1911 *query_state = QueryState::Completed;
1912 }
1913
1914 debug!("Received simulated server reflexive response from bootstrap node {:?}: {}",
1915 node_id, simulated_external_addr);
1916 }
1917}
1918
/// Predicts likely external ports from observed port-allocation history.
#[derive(Debug)]
pub(crate) struct SymmetricNatPredictor {
    /// Copy of the discovery configuration.
    config: DiscoveryConfig,
}
1924
1925impl SymmetricNatPredictor {
1926 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
1927 Self {
1928 config: config.clone(),
1929 }
1930 }
1931
1932 pub(crate) fn generate_predictions(&mut self, pattern_analysis: &PatternAnalysisState, max_count: usize) -> Vec<DiscoveryCandidate> {
1937 let mut predictions = Vec::new();
1938
1939 if pattern_analysis.allocation_history.is_empty() || max_count == 0 {
1940 return predictions;
1941 }
1942
1943 let recent_events: Vec<_> = pattern_analysis.allocation_history
1945 .iter()
1946 .rev()
1947 .take(5) .collect();
1949
1950 if recent_events.len() < 2 {
1951 return predictions;
1952 }
1953
1954 match &pattern_analysis.detected_pattern {
1955 Some(pattern) => {
1956 predictions.extend(self.generate_pattern_based_predictions(pattern, max_count));
1957 }
1958 None => {
1959 predictions.extend(self.generate_heuristic_predictions(&recent_events, max_count));
1960 }
1961 }
1962
1963 predictions.truncate(max_count);
1965 predictions
1966 }
1967
1968 fn generate_pattern_based_predictions(&self, pattern: &PortAllocationPattern, max_count: usize) -> Vec<DiscoveryCandidate> {
1970 let mut predictions = Vec::new();
1971
1972 match pattern.pattern_type {
1973 AllocationPatternType::Sequential => {
1974 for i in 1..=max_count as u16 {
1976 let predicted_port = pattern.base_port.wrapping_add(i);
1977 if self.is_valid_port(predicted_port) {
1978 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence));
1979 }
1980 }
1981 }
1982 AllocationPatternType::FixedStride => {
1983 for i in 1..=max_count as u16 {
1985 let predicted_port = pattern.base_port.wrapping_add(pattern.stride * i);
1986 if self.is_valid_port(predicted_port) {
1987 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence));
1988 }
1989 }
1990 }
1991 AllocationPatternType::PoolBased => {
1992 if let Some((min_port, max_port)) = pattern.pool_boundaries {
1994 let pool_size = max_port - min_port + 1;
1995 let step = (pool_size / max_count as u16).max(1);
1996
1997 for i in 0..max_count as u16 {
1998 let predicted_port = min_port + (i * step);
1999 if predicted_port <= max_port && self.is_valid_port(predicted_port) {
2000 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence * 0.8));
2001 }
2002 }
2003 }
2004 }
2005 AllocationPatternType::TimeBased => {
2006 for i in 1..=max_count as u16 {
2009 let predicted_port = pattern.base_port.wrapping_add(i);
2010 if self.is_valid_port(predicted_port) {
2011 predictions.push(self.create_predicted_candidate(predicted_port, pattern.confidence * 0.6));
2012 }
2013 }
2014 }
2015 AllocationPatternType::Random | AllocationPatternType::Unknown => {
2016 predictions.extend(self.generate_statistical_predictions(pattern.base_port, max_count));
2018 }
2019 }
2020
2021 predictions
2022 }
2023
2024 fn generate_heuristic_predictions(&self, recent_events: &[&PortAllocationEvent], max_count: usize) -> Vec<DiscoveryCandidate> {
2026 let mut predictions = Vec::new();
2027
2028 if let Some(latest_event) = recent_events.first() {
2029 let base_port = latest_event.port;
2030
2031 for i in 1..=(max_count / 3) as u16 {
2035 let predicted_port = base_port.wrapping_add(i);
2036 if self.is_valid_port(predicted_port) {
2037 predictions.push(self.create_predicted_candidate(predicted_port, 0.7));
2038 }
2039 }
2040
2041 if base_port % 2 == 0 {
2043 let predicted_port = base_port + 1;
2044 if self.is_valid_port(predicted_port) {
2045 predictions.push(self.create_predicted_candidate(predicted_port, 0.6));
2046 }
2047 }
2048
2049 for stride in [2, 4, 8, 16] {
2051 if predictions.len() >= max_count {
2052 break;
2053 }
2054 let predicted_port = base_port.wrapping_add(stride);
2055 if self.is_valid_port(predicted_port) {
2056 predictions.push(self.create_predicted_candidate(predicted_port, 0.5));
2057 }
2058 }
2059
2060 if recent_events.len() >= 2 {
2062 let stride = recent_events[0].port.wrapping_sub(recent_events[1].port);
2063 if stride > 0 && stride <= 100 { for i in 1..=3 {
2065 if predictions.len() >= max_count {
2066 break;
2067 }
2068 let predicted_port = base_port.wrapping_add(stride * i);
2069 if self.is_valid_port(predicted_port) {
2070 predictions.push(self.create_predicted_candidate(predicted_port, 0.4));
2071 }
2072 }
2073 }
2074 }
2075 }
2076
2077 predictions.truncate(max_count);
2078 predictions
2079 }
2080
2081 fn generate_statistical_predictions(&self, base_port: u16, max_count: usize) -> Vec<DiscoveryCandidate> {
2083 let mut predictions = Vec::new();
2084
2085 let common_ranges = [
2087 (1024, 5000), (5000, 10000), (10000, 20000), (32768, 65535), ];
2092
2093 let current_range = common_ranges.iter()
2095 .find(|(min, max)| base_port >= *min && base_port <= *max)
2096 .copied()
2097 .unwrap_or((1024, 65535));
2098
2099 let range_size = current_range.1 - current_range.0;
2101 let step = (range_size / max_count as u16).max(1);
2102
2103 for i in 0..max_count {
2104 let offset = (i as u16 * step) % range_size;
2105 let predicted_port = current_range.0 + offset;
2106
2107 if self.is_valid_port(predicted_port) && predicted_port != base_port {
2108 predictions.push(self.create_predicted_candidate(predicted_port, 0.3));
2109 }
2110 }
2111
2112 predictions
2113 }
2114
2115 fn is_valid_port(&self, port: u16) -> bool {
2117 port >= 1024 && port <= 65535 && port != 0
2119 }
2120
2121 fn create_predicted_candidate(&self, port: u16, confidence: f64) -> DiscoveryCandidate {
2123 let base_priority = 50; let priority = (base_priority as f64 * confidence) as u32;
2127
2128 DiscoveryCandidate {
2129 address: SocketAddr::new(
2130 "0.0.0.0".parse().unwrap(), port
2132 ),
2133 priority,
2134 source: DiscoverySourceType::Predicted,
2135 state: CandidateState::New,
2136 }
2137 }
2138
2139 pub(crate) fn analyze_allocation_patterns(&self, history: &VecDeque<PortAllocationEvent>) -> Option<PortAllocationPattern> {
2141 if history.len() < 3 {
2142 return None;
2143 }
2144
2145 let recent_ports: Vec<u16> = history.iter()
2146 .rev()
2147 .take(10)
2148 .map(|event| event.port)
2149 .collect();
2150
2151 if let Some(pattern) = self.detect_sequential_pattern(&recent_ports) {
2153 return Some(pattern);
2154 }
2155
2156 if let Some(pattern) = self.detect_stride_pattern(&recent_ports) {
2158 return Some(pattern);
2159 }
2160
2161 if let Some(pattern) = self.detect_pool_pattern(&recent_ports) {
2163 return Some(pattern);
2164 }
2165
2166 if let Some(pattern) = self.detect_time_based_pattern(history) {
2168 return Some(pattern);
2169 }
2170
2171 None
2172 }
2173
2174 fn detect_sequential_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2176 if ports.len() < 3 {
2177 return None;
2178 }
2179
2180 let mut sequential_count = 0;
2181 let mut total_comparisons = 0;
2182
2183 for i in 1..ports.len() {
2184 total_comparisons += 1;
2185 let diff = ports[i-1].wrapping_sub(ports[i]);
2186 if diff == 1 {
2187 sequential_count += 1;
2188 }
2189 }
2190
2191 let sequential_ratio = sequential_count as f64 / total_comparisons as f64;
2192
2193 if sequential_ratio >= 0.6 { let confidence = (sequential_ratio * 0.9).min(0.9); Some(PortAllocationPattern {
2197 pattern_type: AllocationPatternType::Sequential,
2198 base_port: ports[0],
2199 stride: 1,
2200 pool_boundaries: None,
2201 confidence,
2202 })
2203 } else {
2204 None
2205 }
2206 }
2207
2208 fn detect_stride_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2210 if ports.len() < 4 {
2211 return None;
2212 }
2213
2214 let mut diffs = Vec::new();
2216 for i in 1..ports.len() {
2217 let diff = ports[i-1].wrapping_sub(ports[i]);
2218 if diff > 0 && diff <= 1000 { diffs.push(diff);
2220 }
2221 }
2222
2223 if diffs.len() < 2 {
2224 return None;
2225 }
2226
2227 let mut diff_counts = std::collections::HashMap::new();
2229 for &diff in &diffs {
2230 *diff_counts.entry(diff).or_insert(0) += 1;
2231 }
2232
2233 let (most_common_diff, count) = diff_counts.iter()
2234 .max_by_key(|(_, &count)| count)
2235 .map(|(&diff, &count)| (diff, count))?;
2236
2237 let consistency_ratio = count as f64 / diffs.len() as f64;
2238
2239 if consistency_ratio >= 0.5 && most_common_diff > 1 { let confidence = (consistency_ratio * 0.8).min(0.8); Some(PortAllocationPattern {
2243 pattern_type: AllocationPatternType::FixedStride,
2244 base_port: ports[0],
2245 stride: most_common_diff,
2246 pool_boundaries: None,
2247 confidence,
2248 })
2249 } else {
2250 None
2251 }
2252 }
2253
2254 fn detect_pool_pattern(&self, ports: &[u16]) -> Option<PortAllocationPattern> {
2256 if ports.len() < 5 {
2257 return None;
2258 }
2259
2260 let min_port = *ports.iter().min()?;
2261 let max_port = *ports.iter().max()?;
2262 let range = max_port - min_port;
2263
2264 if range > 0 && range <= 10000 { let expected_step = range / (ports.len() as u16 - 1);
2268 let mut uniform_score = 0.0;
2269
2270 let mut sorted_ports = ports.to_vec();
2271 sorted_ports.sort_unstable();
2272
2273 for i in 1..sorted_ports.len() {
2274 let actual_step = sorted_ports[i] - sorted_ports[i-1];
2275 let step_diff = (actual_step as i32 - expected_step as i32).abs() as f64;
2276 let normalized_diff = step_diff / expected_step as f64;
2277 uniform_score += 1.0 - normalized_diff.min(1.0);
2278 }
2279
2280 uniform_score /= (sorted_ports.len() - 1) as f64;
2281
2282 if uniform_score >= 0.4 { let confidence = (uniform_score * 0.7).min(0.7); Some(PortAllocationPattern {
2286 pattern_type: AllocationPatternType::PoolBased,
2287 base_port: min_port,
2288 stride: expected_step,
2289 pool_boundaries: Some((min_port, max_port)),
2290 confidence,
2291 })
2292 } else {
2293 None
2294 }
2295 } else {
2296 None
2297 }
2298 }
2299
2300 fn detect_time_based_pattern(&self, history: &VecDeque<PortAllocationEvent>) -> Option<PortAllocationPattern> {
2302 if history.len() < 4 {
2303 return None;
2304 }
2305
2306 let mut time_intervals = Vec::new();
2308 let events: Vec<_> = history.iter().collect();
2309
2310 for i in 1..events.len() {
2311 let interval = events[i-1].timestamp.duration_since(events[i].timestamp);
2312 time_intervals.push(interval);
2313 }
2314
2315 if time_intervals.is_empty() {
2316 return None;
2317 }
2318
2319 let avg_interval = time_intervals.iter().sum::<std::time::Duration>() / time_intervals.len() as u32;
2321
2322 let mut consistency_score = 0.0;
2323 for interval in &time_intervals {
2324 let diff = if *interval > avg_interval {
2325 *interval - avg_interval
2326 } else {
2327 avg_interval - *interval
2328 };
2329
2330 let normalized_diff = diff.as_millis() as f64 / avg_interval.as_millis() as f64;
2331 consistency_score += 1.0 - normalized_diff.min(1.0);
2332 }
2333
2334 consistency_score /= time_intervals.len() as f64;
2335
2336 if consistency_score >= 0.6 && avg_interval.as_millis() > 100 && avg_interval.as_millis() < 10000 {
2337 let confidence = (consistency_score * 0.6).min(0.6); Some(PortAllocationPattern {
2340 pattern_type: AllocationPatternType::TimeBased,
2341 base_port: events[0].port,
2342 stride: 1, pool_boundaries: None,
2344 confidence,
2345 })
2346 } else {
2347 None
2348 }
2349 }
2350
2351 pub(crate) fn generate_confidence_scored_predictions(
2353 &mut self,
2354 base_address: SocketAddr,
2355 pattern_analysis: &PatternAnalysisState,
2356 max_count: usize
2357 ) -> Vec<(DiscoveryCandidate, f64)> {
2358 let mut scored_predictions = Vec::new();
2359
2360 let predictions = self.generate_predictions(pattern_analysis, max_count);
2362
2363 for mut prediction in predictions {
2364 prediction.address = SocketAddr::new(base_address.ip(), prediction.address.port());
2366
2367 let confidence = self.calculate_prediction_confidence(&prediction, pattern_analysis, base_address);
2369
2370 scored_predictions.push((prediction, confidence));
2371 }
2372
2373 scored_predictions.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2375
2376 scored_predictions
2377 }
2378
2379 fn calculate_prediction_confidence(
2381 &self,
2382 prediction: &DiscoveryCandidate,
2383 pattern_analysis: &PatternAnalysisState,
2384 base_address: SocketAddr
2385 ) -> f64 {
2386 let mut confidence = 0.5; if let Some(ref pattern) = pattern_analysis.detected_pattern {
2390 confidence += pattern.confidence * 0.3;
2391 }
2392
2393 confidence += pattern_analysis.prediction_accuracy * 0.2;
2395
2396 let port_distance = (prediction.address.port() as i32 - base_address.port() as i32).abs();
2398 let proximity_score = if port_distance <= 10 {
2399 0.2
2400 } else if port_distance <= 100 {
2401 0.1
2402 } else {
2403 0.0
2404 };
2405 confidence += proximity_score;
2406
2407 let port_range_score = match prediction.address.port() {
2409 1024..=4999 => 0.1, 5000..=9999 => 0.15, 10000..=20000 => 0.1, 32768..=65535 => 0.05, _ => 0.0,
2414 };
2415 confidence += port_range_score;
2416
2417 confidence.max(0.0).min(1.0)
2419 }
2420
2421 pub(crate) fn update_pattern_analysis(
2423 &self,
2424 pattern_analysis: &mut PatternAnalysisState,
2425 new_event: PortAllocationEvent
2426 ) {
2427 pattern_analysis.allocation_history.push_back(new_event);
2429
2430 if pattern_analysis.allocation_history.len() > 20 {
2432 pattern_analysis.allocation_history.pop_front();
2433 }
2434
2435 pattern_analysis.detected_pattern = self.analyze_allocation_patterns(&pattern_analysis.allocation_history);
2437
2438 if let Some(ref pattern) = pattern_analysis.detected_pattern {
2440 pattern_analysis.confidence_level = pattern.confidence;
2441 } else {
2442 pattern_analysis.confidence_level *= 0.9; }
2444
2445 pattern_analysis.prediction_accuracy *= 0.95;
2449 }
2450}
2451
/// Tracks the known bootstrap nodes together with their health and
/// performance data.
#[derive(Debug)]
pub(crate) struct BootstrapNodeManager {
    /// Copy of the discovery configuration.
    config: DiscoveryConfig,
    /// Known bootstrap nodes keyed by id.
    bootstrap_nodes: HashMap<BootstrapNodeId, BootstrapNodeInfo>,
    /// Health statistics per node.
    health_stats: HashMap<BootstrapNodeId, BootstrapHealthStats>,
    /// Aggregate performance data.
    performance_tracker: BootstrapPerformanceTracker,
    /// Time of the last health check, if any.
    last_health_check: Option<Instant>,
    /// Interval between health checks (30 s by default).
    health_check_interval: Duration,
    /// Failover threshold (0.3 by default; exact use not visible here).
    failover_threshold: f64,
    /// Discovery sources configured for this manager.
    discovery_sources: Vec<BootstrapDiscoverySource>,
}
2464
/// Information held about a single bootstrap node.
#[derive(Debug, Clone)]
pub(crate) struct BootstrapNodeInfo {
    /// Network address of the node.
    pub address: SocketAddr,
    /// Last-seen timestamp.
    pub last_seen: Instant,
    /// Whether the node can coordinate.
    pub can_coordinate: bool,
    /// Health status of the node.
    pub health_status: BootstrapHealthStatus,
    /// Capabilities of the node.
    pub capabilities: BootstrapCapabilities,
    /// Priority of the node.
    pub priority: u32,
    /// Discovery source this node is attributed to.
    pub discovery_source: BootstrapDiscoverySource,
}
2483
/// Health classification of a bootstrap node.
///
/// The thresholds that separate the states are not defined in this part of
/// the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BootstrapHealthStatus {
    /// Node is healthy.
    Healthy,
    /// Node is degraded.
    Degraded,
    /// Node is unhealthy.
    Unhealthy,
    /// Health status is unknown; newly registered nodes start in this state.
    Unknown,
}
2496
/// Capabilities recorded for a bootstrap node.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapCapabilities {
    /// Whether the node supports NAT traversal.
    pub supports_nat_traversal: bool,
    /// Whether the node supports IPv6.
    pub supports_ipv6: bool,
    /// Whether the node supports QUIC extensions.
    pub supports_quic_extensions: bool,
    /// Maximum number of concurrent coordinations.
    pub max_concurrent_coordinations: u32,
    /// Supported QUIC versions.
    pub supported_quic_versions: Vec<u32>,
}
2511
/// Health statistics for one bootstrap node. All counters default to zero.
#[derive(Debug, Clone, Default)]
pub(crate) struct BootstrapHealthStats {
    /// Number of connection attempts.
    pub connection_attempts: u32,
    /// Number of successful connections.
    pub successful_connections: u32,
    /// Number of failed connections.
    pub failed_connections: u32,
    /// Average round-trip time, if any.
    pub average_rtt: Option<Duration>,
    /// Recent round-trip times (retention policy not visible here).
    pub recent_rtts: VecDeque<Duration>,
    /// Time of the last health check, if any.
    pub last_health_check: Option<Instant>,
    /// Number of consecutive failures.
    pub consecutive_failures: u32,
    /// Number of coordination requests.
    pub coordination_requests: u32,
    /// Number of successful coordinations.
    pub successful_coordinations: u32,
}
2534
/// Performance data aggregated across bootstrap nodes.
#[derive(Debug, Default)]
pub(crate) struct BootstrapPerformanceTracker {
    /// Overall success rate (presumably a fraction in 0.0..=1.0; confirm at the writer).
    pub overall_success_rate: f64,
    /// Average response time.
    pub average_response_time: Duration,
    /// Node ids recorded as best performers.
    pub best_performers: Vec<BootstrapNodeId>,
    /// Node ids recorded as failover nodes.
    pub failover_nodes: Vec<BootstrapNodeId>,
    /// Performance snapshots.
    pub performance_history: VecDeque<PerformanceSnapshot>,
}
2549
/// One timestamped sample of bootstrap performance.
#[derive(Debug, Clone)]
pub(crate) struct PerformanceSnapshot {
    /// Timestamp of the sample.
    pub timestamp: Instant,
    /// Number of active nodes.
    pub active_nodes: u32,
    /// Success rate.
    pub success_rate: f64,
    /// Average round-trip time.
    pub average_rtt: Duration,
}
2558
/// Kinds of source a bootstrap node can be attributed to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum BootstrapDiscoverySource {
    /// Static source.
    Static,
    /// DNS source.
    DNS,
    /// DHT source.
    DHT,
    /// Multicast source.
    Multicast,
    /// User-provided source.
    UserProvided,
}
2573
2574impl BootstrapNodeManager {
2575 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
2576 Self {
2577 config: config.clone(),
2578 bootstrap_nodes: HashMap::new(),
2579 health_stats: HashMap::new(),
2580 performance_tracker: BootstrapPerformanceTracker::default(),
2581 last_health_check: None,
2582 health_check_interval: Duration::from_secs(30),
2583 failover_threshold: 0.3, discovery_sources: vec![
2585 BootstrapDiscoverySource::Static,
2586 BootstrapDiscoverySource::DNS,
2587 BootstrapDiscoverySource::UserProvided,
2588 ],
2589 }
2590 }
2591
2592 pub(crate) fn update_bootstrap_nodes(&mut self, nodes: Vec<BootstrapNode>) {
2594 let now = Instant::now();
2595
2596 for (i, node) in nodes.into_iter().enumerate() {
2598 let node_id = BootstrapNodeId(i as u64);
2599
2600 let node_info = BootstrapNodeInfo {
2601 address: node.address,
2602 last_seen: node.last_seen,
2603 can_coordinate: node.can_coordinate,
2604 health_status: BootstrapHealthStatus::Unknown,
2605 capabilities: BootstrapCapabilities {
2606 supports_nat_traversal: node.can_coordinate,
2607 supports_ipv6: node.address.is_ipv6(),
2608 supports_quic_extensions: true, max_concurrent_coordinations: 100, supported_quic_versions: vec![1], },
2612 priority: self.calculate_initial_priority(&node),
2613 discovery_source: BootstrapDiscoverySource::UserProvided,
2614 };
2615
2616 self.bootstrap_nodes.insert(node_id, node_info);
2617
2618 if !self.health_stats.contains_key(&node_id) {
2620 self.health_stats.insert(node_id, BootstrapHealthStats::default());
2621 }
2622 }
2623
2624 info!("Updated {} bootstrap nodes", self.bootstrap_nodes.len());
2625 self.schedule_health_check(now);
2626 }
2627
2628 pub(crate) fn get_active_bootstrap_nodes(&self) -> Vec<BootstrapNodeId> {
2630 let mut active_nodes: Vec<_> = self.bootstrap_nodes
2631 .iter()
2632 .filter(|(_, node)| {
2633 matches!(node.health_status, BootstrapHealthStatus::Healthy | BootstrapHealthStatus::Unknown)
2634 })
2635 .map(|(&id, node)| (id, node))
2636 .collect();
2637
2638 active_nodes.sort_by(|a, b| {
2640 let health_cmp = self.compare_health_status(a.1.health_status, b.1.health_status);
2642 if health_cmp != std::cmp::Ordering::Equal {
2643 return health_cmp;
2644 }
2645
2646 b.1.priority.cmp(&a.1.priority)
2648 });
2649
2650 active_nodes.into_iter().map(|(id, _)| id).collect()
2651 }
2652
2653 pub(crate) fn get_bootstrap_address(&self, id: BootstrapNodeId) -> Option<SocketAddr> {
2655 self.bootstrap_nodes.get(&id).map(|node| node.address)
2656 }
2657
2658 pub(crate) fn perform_health_check(&mut self, now: Instant) {
2660 if let Some(last_check) = self.last_health_check {
2661 if now.duration_since(last_check) < self.health_check_interval {
2662 return; }
2664 }
2665
2666 debug!("Performing health check on {} bootstrap nodes", self.bootstrap_nodes.len());
2667
2668 let node_ids: Vec<BootstrapNodeId> = self.bootstrap_nodes.keys().copied().collect();
2670
2671 for node_id in node_ids {
2672 self.check_node_health(node_id, now);
2673 }
2674
2675 self.update_performance_metrics(now);
2676 self.last_health_check = Some(now);
2677 }
2678
2679 fn check_node_health(&mut self, node_id: BootstrapNodeId, now: Instant) {
2681 let node_info_opt = self.bootstrap_nodes.get(&node_id).cloned();
2683 if node_info_opt.is_none() {
2684 return; }
2686 let node_info_for_priority = node_info_opt.unwrap();
2687 let current_health_status = node_info_for_priority.health_status;
2688
2689 let (_success_rate, new_health_status, _average_rtt) = {
2691 let stats = self.health_stats.get_mut(&node_id).unwrap();
2692
2693 let success_rate = if stats.connection_attempts > 0 {
2695 stats.successful_connections as f64 / stats.connection_attempts as f64
2696 } else {
2697 1.0 };
2699
2700 if !stats.recent_rtts.is_empty() {
2702 let total_rtt: Duration = stats.recent_rtts.iter().sum();
2703 stats.average_rtt = Some(total_rtt / stats.recent_rtts.len() as u32);
2704 }
2705
2706 let new_health_status = if stats.consecutive_failures >= 3 {
2708 BootstrapHealthStatus::Unhealthy
2709 } else if success_rate < self.failover_threshold {
2710 BootstrapHealthStatus::Degraded
2711 } else if success_rate >= 0.8 && stats.consecutive_failures == 0 {
2712 BootstrapHealthStatus::Healthy
2713 } else {
2714 current_health_status };
2716
2717 stats.last_health_check = Some(now);
2718
2719 (success_rate, new_health_status, stats.average_rtt)
2720 };
2721
2722 let stats_snapshot = self.health_stats.get(&node_id).unwrap();
2724 let new_priority = self.calculate_dynamic_priority(&node_info_for_priority, stats_snapshot);
2725
2726 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2728 if new_health_status != node_info.health_status {
2729 info!("Bootstrap node {:?} health status changed: {:?} -> {:?}",
2730 node_id, node_info.health_status, new_health_status);
2731 node_info.health_status = new_health_status;
2732 }
2733
2734 node_info.priority = new_priority;
2735 }
2736 }
2737
2738 pub(crate) fn record_connection_attempt(&mut self, node_id: BootstrapNodeId, success: bool, rtt: Option<Duration>) {
2740 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2741 stats.connection_attempts += 1;
2742
2743 if success {
2744 stats.successful_connections += 1;
2745 stats.consecutive_failures = 0;
2746
2747 if let Some(rtt) = rtt {
2748 stats.recent_rtts.push_back(rtt);
2749 if stats.recent_rtts.len() > 10 {
2750 stats.recent_rtts.pop_front();
2751 }
2752 }
2753 } else {
2754 stats.failed_connections += 1;
2755 stats.consecutive_failures += 1;
2756 }
2757 }
2758
2759 if success {
2761 if let Some(node_info) = self.bootstrap_nodes.get_mut(&node_id) {
2762 node_info.last_seen = Instant::now();
2763 }
2764 }
2765 }
2766
2767 pub(crate) fn record_coordination_result(&mut self, node_id: BootstrapNodeId, success: bool) {
2769 if let Some(stats) = self.health_stats.get_mut(&node_id) {
2770 stats.coordination_requests += 1;
2771 if success {
2772 stats.successful_coordinations += 1;
2773 }
2774 }
2775 }
2776
2777 pub(crate) fn get_best_performers(&self, count: usize) -> Vec<BootstrapNodeId> {
2779 let mut nodes_with_scores: Vec<_> = self.bootstrap_nodes
2780 .iter()
2781 .filter_map(|(&id, node)| {
2782 if matches!(node.health_status, BootstrapHealthStatus::Healthy) {
2783 let score = self.calculate_performance_score(id, node);
2784 Some((id, score))
2785 } else {
2786 None
2787 }
2788 })
2789 .collect();
2790
2791 nodes_with_scores.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
2792
2793 nodes_with_scores
2794 .into_iter()
2795 .take(count)
2796 .map(|(id, _)| id)
2797 .collect()
2798 }
2799
2800 pub(crate) fn discover_new_nodes(&mut self) -> Result<Vec<BootstrapNodeInfo>, String> {
2802 let mut discovered_nodes = Vec::new();
2803
2804 if let Ok(dns_nodes) = self.discover_via_dns() {
2806 discovered_nodes.extend(dns_nodes);
2807 }
2808
2809 if let Ok(multicast_nodes) = self.discover_via_multicast() {
2811 discovered_nodes.extend(multicast_nodes);
2812 }
2813
2814 for node in &discovered_nodes {
2816 let node_id = BootstrapNodeId(rand::random());
2817 self.bootstrap_nodes.insert(node_id, node.clone());
2818 self.health_stats.insert(node_id, BootstrapHealthStats::default());
2819 }
2820
2821 if !discovered_nodes.is_empty() {
2822 info!("Discovered {} new bootstrap nodes", discovered_nodes.len());
2823 }
2824
2825 Ok(discovered_nodes)
2826 }
2827
2828 fn discover_via_dns(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
2830 debug!("DNS-based bootstrap discovery not yet implemented");
2833 Ok(Vec::new())
2834 }
2835
2836 fn discover_via_multicast(&self) -> Result<Vec<BootstrapNodeInfo>, String> {
2838 debug!("Multicast-based bootstrap discovery not yet implemented");
2841 Ok(Vec::new())
2842 }
2843
2844 fn calculate_initial_priority(&self, node: &BootstrapNode) -> u32 {
2846 let mut priority = 100; if node.can_coordinate {
2849 priority += 50;
2850 }
2851
2852 if let Some(rtt) = node.rtt {
2853 if rtt < Duration::from_millis(50) {
2854 priority += 30;
2855 } else if rtt < Duration::from_millis(100) {
2856 priority += 20;
2857 } else if rtt < Duration::from_millis(200) {
2858 priority += 10;
2859 }
2860 }
2861
2862 if node.address.is_ipv6() {
2864 priority += 10;
2865 }
2866
2867 priority
2868 }
2869
2870 fn calculate_dynamic_priority(&self, node_info: &BootstrapNodeInfo, stats: &BootstrapHealthStats) -> u32 {
2872 let mut priority = node_info.priority;
2873
2874 let success_rate = if stats.connection_attempts > 0 {
2876 stats.successful_connections as f64 / stats.connection_attempts as f64
2877 } else {
2878 1.0
2879 };
2880
2881 priority = (priority as f64 * success_rate) as u32;
2882
2883 if let Some(avg_rtt) = stats.average_rtt {
2885 if avg_rtt < Duration::from_millis(50) {
2886 priority += 20;
2887 } else if avg_rtt > Duration::from_millis(500) {
2888 priority = priority.saturating_sub(20);
2889 }
2890 }
2891
2892 priority = priority.saturating_sub(stats.consecutive_failures * 10);
2894
2895 priority.max(1) }
2897
2898 fn calculate_performance_score(&self, node_id: BootstrapNodeId, _node_info: &BootstrapNodeInfo) -> f64 {
2900 let stats = self.health_stats.get(&node_id).unwrap();
2901
2902 let mut score = 0.0;
2903
2904 let success_rate = if stats.connection_attempts > 0 {
2906 stats.successful_connections as f64 / stats.connection_attempts as f64
2907 } else {
2908 1.0
2909 };
2910 score += success_rate * 0.4;
2911
2912 if let Some(avg_rtt) = stats.average_rtt {
2914 let rtt_score = (1000.0 - avg_rtt.as_millis() as f64).max(0.0) / 1000.0;
2915 score += rtt_score * 0.3;
2916 } else {
2917 score += 0.3; }
2919
2920 let coord_success_rate = if stats.coordination_requests > 0 {
2922 stats.successful_coordinations as f64 / stats.coordination_requests as f64
2923 } else {
2924 1.0
2925 };
2926 score += coord_success_rate * 0.2;
2927
2928 let stability_score = if stats.consecutive_failures == 0 {
2930 1.0
2931 } else {
2932 1.0 / (stats.consecutive_failures as f64 + 1.0)
2933 };
2934 score += stability_score * 0.1;
2935
2936 score
2937 }
2938
2939 fn compare_health_status(&self, a: BootstrapHealthStatus, b: BootstrapHealthStatus) -> std::cmp::Ordering {
2941 use std::cmp::Ordering;
2942
2943 match (a, b) {
2944 (BootstrapHealthStatus::Healthy, BootstrapHealthStatus::Healthy) => Ordering::Equal,
2945 (BootstrapHealthStatus::Healthy, _) => Ordering::Less, (_, BootstrapHealthStatus::Healthy) => Ordering::Greater,
2947 (BootstrapHealthStatus::Unknown, BootstrapHealthStatus::Unknown) => Ordering::Equal,
2948 (BootstrapHealthStatus::Unknown, _) => Ordering::Less, (_, BootstrapHealthStatus::Unknown) => Ordering::Greater,
2950 (BootstrapHealthStatus::Degraded, BootstrapHealthStatus::Degraded) => Ordering::Equal,
2951 (BootstrapHealthStatus::Degraded, _) => Ordering::Less, (_, BootstrapHealthStatus::Degraded) => Ordering::Greater,
2953 (BootstrapHealthStatus::Unhealthy, BootstrapHealthStatus::Unhealthy) => Ordering::Equal,
2954 }
2955 }
2956
2957 fn update_performance_metrics(&mut self, now: Instant) {
2959 let mut total_attempts = 0;
2960 let mut total_successes = 0;
2961 let mut total_rtt = Duration::ZERO;
2962 let mut rtt_count = 0;
2963
2964 for stats in self.health_stats.values() {
2965 total_attempts += stats.connection_attempts;
2966 total_successes += stats.successful_connections;
2967
2968 if let Some(avg_rtt) = stats.average_rtt {
2969 total_rtt += avg_rtt;
2970 rtt_count += 1;
2971 }
2972 }
2973
2974 self.performance_tracker.overall_success_rate = if total_attempts > 0 {
2975 total_successes as f64 / total_attempts as f64
2976 } else {
2977 1.0
2978 };
2979
2980 self.performance_tracker.average_response_time = if rtt_count > 0 {
2981 total_rtt / rtt_count
2982 } else {
2983 Duration::from_millis(100) };
2985
2986 self.performance_tracker.best_performers = self.get_best_performers(5);
2988
2989 let snapshot = PerformanceSnapshot {
2991 timestamp: now,
2992 active_nodes: self.get_active_bootstrap_nodes().len() as u32,
2993 success_rate: self.performance_tracker.overall_success_rate,
2994 average_rtt: self.performance_tracker.average_response_time,
2995 };
2996
2997 self.performance_tracker.performance_history.push_back(snapshot);
2998 if self.performance_tracker.performance_history.len() > 100 {
2999 self.performance_tracker.performance_history.pop_front();
3000 }
3001 }
3002
/// Hook called after the node list has been updated.
///
/// The body is empty: nothing is scheduled and `_now` is unused. Health checks
/// only run when `perform_health_check` is called.
fn schedule_health_check(&mut self, _now: Instant) {
}
3008
3009 pub(crate) fn get_performance_stats(&self) -> &BootstrapPerformanceTracker {
3011 &self.performance_tracker
3012 }
3013
3014 pub(crate) fn get_node_health_stats(&self, node_id: BootstrapNodeId) -> Option<&BootstrapHealthStats> {
3016 self.health_stats.get(&node_id)
3017 }
3018}
3019
/// Cache component of the discovery subsystem.
///
/// As defined here it only holds a copy of the discovery configuration.
#[derive(Debug)]
pub(crate) struct DiscoveryCache {
    /// Configuration the cache was created with.
    config: DiscoveryConfig,
}
3025
3026impl DiscoveryCache {
3027 pub(crate) fn new(config: &DiscoveryConfig) -> Self {
3028 Self {
3029 config: config.clone(),
3030 }
3031 }
3032}
3033
3034pub(crate) fn create_platform_interface_discovery() -> Box<dyn NetworkInterfaceDiscovery + Send> {
3036 #[cfg(target_os = "windows")]
3037 return Box::new(WindowsInterfaceDiscovery::new());
3038
3039 #[cfg(target_os = "linux")]
3040 return Box::new(LinuxInterfaceDiscovery::new());
3041
3042 #[cfg(target_os = "macos")]
3043 return Box::new(MacOSInterfaceDiscovery::new());
3044
3045 #[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
3046 return Box::new(GenericInterfaceDiscovery::new());
3047}
3048
/// Fallback interface discovery for targets without a dedicated backend.
pub(crate) struct GenericInterfaceDiscovery {
    /// True between `start_scan` and the next `check_scan_complete` call.
    scan_complete: bool,
}

impl GenericInterfaceDiscovery {
    /// Creates a discovery instance with no scan result pending.
    pub(crate) fn new() -> Self {
        let scan_complete = false;
        Self { scan_complete }
    }
}
3067
3068impl NetworkInterfaceDiscovery for GenericInterfaceDiscovery {
3069 fn start_scan(&mut self) -> Result<(), String> {
3070 self.scan_complete = true;
3072 Ok(())
3073 }
3074
3075 fn check_scan_complete(&mut self) -> Option<Vec<NetworkInterface>> {
3076 if self.scan_complete {
3077 self.scan_complete = false;
3078 Some(vec![
3079 NetworkInterface {
3080 name: "generic".to_string(),
3081 addresses: vec!["127.0.0.1:0".parse().unwrap()],
3082 is_up: true,
3083 is_wireless: false,
3084 mtu: Some(1500),
3085 }
3086 ])
3087 } else {
3088 None
3089 }
3090 }
3091}
3092
3093impl std::fmt::Display for DiscoveryError {
3094 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3095 match self {
3096 Self::NoLocalInterfaces => write!(f, "no local network interfaces found"),
3097 Self::AllBootstrapsFailed => write!(f, "all bootstrap node queries failed"),
3098 Self::DiscoveryTimeout => write!(f, "discovery process timed out"),
3099 Self::InsufficientCandidates { found, required } => write!(f, "insufficient candidates found: {} < {}", found, required),
3100 Self::NetworkError(msg) => write!(f, "network error: {}", msg),
3101 Self::ConfigurationError(msg) => write!(f, "configuration error: {}", msg),
3102 Self::InternalError(msg) => write!(f, "internal error: {}", msg),
3103 }
3104 }
3105}
3106
// Marks `DiscoveryError` as a standard error type; the empty body means only
// the trait's default methods are used.
impl std::error::Error for DiscoveryError {}
3108
/// Address helpers exposed for use by tests.
pub mod test_utils {
    use super::*;

    /// Computes a priority for `address`.
    ///
    /// Starts at 100. A private IPv4 address adds 50. An IPv6 address always
    /// adds 10, plus a scope bonus unless it is loopback, multicast or
    /// unspecified: 60 for `2000::/3`, 20 for `fe80::/10`, 40 for `fc00::/7`
    /// and 30 for anything else.
    pub fn calculate_address_priority(address: &IpAddr) -> u32 {
        const BASE: u32 = 100;

        let bonus = match address {
            IpAddr::V4(v4) if v4.is_private() => 50,
            IpAddr::V4(_) => 0,
            IpAddr::V6(v6) => {
                let special = v6.is_loopback() || v6.is_multicast() || v6.is_unspecified();
                let scope_bonus = if special {
                    0
                } else {
                    // Classify by the leading 16 bits of the address.
                    match v6.segments()[0] {
                        head if head & 0xE000 == 0x2000 => 60,
                        head if head & 0xFFC0 == 0xFE80 => 20,
                        head if head & 0xFE00 == 0xFC00 => 40,
                        _ => 30,
                    }
                };
                scope_bonus + 10
            }
        };

        BASE + bonus
    }

    /// Returns `true` unless `address` is a loopback or unspecified address.
    pub fn is_valid_address(address: &IpAddr) -> bool {
        let (loopback, unspecified) = match address {
            IpAddr::V4(v4) => (v4.is_loopback(), v4.is_unspecified()),
            IpAddr::V6(v6) => (v6.is_loopback(), v6.is_unspecified()),
        };
        !(loopback || unspecified)
    }
}