1use crate::geo_routing::{GeoLocation, GeoRouter, GeoRouterConfig};
46use crate::quality_predictor::{QualityPredictor, QualityPredictorConfig};
47use dashmap::DashMap;
48use libp2p::PeerId;
49use parking_lot::RwLock;
50use serde::{Deserialize, Serialize};
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53
54#[derive(Debug, Clone)]
56pub struct PeerSelectorConfig {
57 pub distance_weight: f64,
59 pub quality_weight: f64,
61 pub latency_weight: f64,
63 pub bandwidth_weight: f64,
65 pub enable_caching: bool,
67 pub cache_ttl_secs: u64,
69 pub max_cache_entries: usize,
71}
72
73impl Default for PeerSelectorConfig {
74 fn default() -> Self {
75 Self {
76 distance_weight: 0.3,
77 quality_weight: 0.3,
78 latency_weight: 0.2,
79 bandwidth_weight: 0.2,
80 enable_caching: true,
81 cache_ttl_secs: 300, max_cache_entries: 1000,
83 }
84 }
85}
86
87impl PeerSelectorConfig {
88 pub fn low_latency() -> Self {
90 Self {
91 distance_weight: 0.4,
92 quality_weight: 0.1,
93 latency_weight: 0.4,
94 bandwidth_weight: 0.1,
95 enable_caching: true,
96 cache_ttl_secs: 180,
97 max_cache_entries: 500,
98 }
99 }
100
101 pub fn high_bandwidth() -> Self {
103 Self {
104 distance_weight: 0.1,
105 quality_weight: 0.2,
106 latency_weight: 0.1,
107 bandwidth_weight: 0.6,
108 enable_caching: true,
109 cache_ttl_secs: 300,
110 max_cache_entries: 1000,
111 }
112 }
113
114 pub fn balanced() -> Self {
116 Self::default()
117 }
118
119 pub fn mobile() -> Self {
121 Self {
122 distance_weight: 0.5, quality_weight: 0.3,
124 latency_weight: 0.1,
125 bandwidth_weight: 0.1,
126 enable_caching: true,
127 cache_ttl_secs: 600, max_cache_entries: 200,
129 }
130 }
131}
132
133#[derive(Debug, Clone)]
135pub struct SelectionCriteria {
136 pub reference_location: Option<GeoLocation>,
138 pub min_quality_score: f64,
140 pub max_distance_km: Option<f64>,
142 pub max_results: usize,
144}
145
146impl Default for SelectionCriteria {
147 fn default() -> Self {
148 Self {
149 reference_location: None,
150 min_quality_score: 0.0,
151 max_distance_km: None,
152 max_results: 10,
153 }
154 }
155}
156
157#[derive(Debug, Clone)]
159pub struct SelectedPeer {
160 pub peer_id: PeerId,
162 pub score: f64,
164 pub distance_score: f64,
166 pub quality_score: f64,
168 pub latency_score: f64,
170 pub bandwidth_score: f64,
172 pub location: Option<GeoLocation>,
174 pub distance_km: Option<f64>,
176}
177
178#[derive(Debug, Clone)]
180struct CachedSelection {
181 peers: Vec<SelectedPeer>,
183 cached_at: Instant,
185}
186
187#[derive(Debug, Clone, Default, Serialize, Deserialize)]
189pub struct PeerSelectorStats {
190 pub total_selections: u64,
192 pub cache_hits: u64,
194 pub cache_misses: u64,
196 pub total_peers_evaluated: u64,
198 pub avg_selection_time_us: f64,
200}
201
202impl PeerSelectorStats {
203 pub fn cache_hit_rate(&self) -> f64 {
205 let total = self.cache_hits + self.cache_misses;
206 if total > 0 {
207 self.cache_hits as f64 / total as f64
208 } else {
209 0.0
210 }
211 }
212}
213
214pub struct PeerSelector {
216 config: PeerSelectorConfig,
218 geo_router: Arc<GeoRouter>,
220 quality_predictor: Arc<QualityPredictor>,
222 cache: Arc<DashMap<String, CachedSelection>>,
224 stats: Arc<RwLock<PeerSelectorStats>>,
226}
227
228impl PeerSelector {
229 pub fn new(config: PeerSelectorConfig) -> Self {
231 let geo_config = GeoRouterConfig::default();
232 let quality_config = QualityPredictorConfig::default();
233
234 Self {
235 config,
236 geo_router: Arc::new(GeoRouter::new(geo_config)),
237 quality_predictor: Arc::new(
238 QualityPredictor::new(quality_config).expect("Default config should be valid"),
239 ),
240 cache: Arc::new(DashMap::new()),
241 stats: Arc::new(RwLock::new(PeerSelectorStats::default())),
242 }
243 }
244
245 pub fn with_configs(
247 config: PeerSelectorConfig,
248 geo_config: GeoRouterConfig,
249 quality_config: QualityPredictorConfig,
250 ) -> Self {
251 Self {
252 config,
253 geo_router: Arc::new(GeoRouter::new(geo_config)),
254 quality_predictor: Arc::new(
255 QualityPredictor::new(quality_config).expect("Config should be valid"),
256 ),
257 cache: Arc::new(DashMap::new()),
258 stats: Arc::new(RwLock::new(PeerSelectorStats::default())),
259 }
260 }
261
262 pub fn add_peer_location(&self, peer_id: PeerId, location: GeoLocation) {
264 self.geo_router.update_peer_location(peer_id, location);
265 self.invalidate_cache();
266 }
267
268 pub fn remove_peer(&self, peer_id: &PeerId) {
270 self.geo_router.remove_peer(peer_id);
271 self.quality_predictor.remove_peer(peer_id);
272 self.invalidate_cache();
273 }
274
275 pub fn update_peer_quality(
277 &self,
278 peer_id: PeerId,
279 latency_ms: f64,
280 bandwidth_mbps: f64,
281 success: bool,
282 ) {
283 self.quality_predictor
284 .record_latency(peer_id, latency_ms as u64);
285 let bytes_per_sec = (bandwidth_mbps * 1_000_000.0 / 8.0) as u64;
287 self.quality_predictor
288 .record_bandwidth(peer_id, bytes_per_sec);
289 if !success {
290 self.quality_predictor.record_failure(peer_id);
291 }
292 self.invalidate_cache();
293 }
294
295 pub fn select_peers(&self, criteria: &SelectionCriteria) -> Vec<SelectedPeer> {
297 let start = Instant::now();
298
299 if self.config.enable_caching {
301 let cache_key = self.make_cache_key(criteria);
302 if let Some(cached) = self.cache.get(&cache_key) {
303 let age = start.duration_since(cached.cached_at);
304 if age.as_secs() < self.config.cache_ttl_secs {
305 let mut stats = self.stats.write();
306 stats.total_selections += 1;
307 stats.cache_hits += 1;
308 return cached.peers.clone();
309 }
310 }
311 }
312
313 let selected = self.select_peers_impl(criteria);
315
316 let elapsed = start.elapsed();
318 let mut stats = self.stats.write();
319 stats.total_selections += 1;
320 stats.cache_misses += 1;
321 stats.total_peers_evaluated += selected.len() as u64;
322 let new_avg = if stats.total_selections > 1 {
323 (stats.avg_selection_time_us * (stats.total_selections - 1) as f64
324 + elapsed.as_micros() as f64)
325 / stats.total_selections as f64
326 } else {
327 elapsed.as_micros() as f64
328 };
329 stats.avg_selection_time_us = new_avg;
330 drop(stats);
331
332 if self.config.enable_caching {
334 let cache_key = self.make_cache_key(criteria);
335 self.cache.insert(
336 cache_key,
337 CachedSelection {
338 peers: selected.clone(),
339 cached_at: start,
340 },
341 );
342
343 if self.cache.len() > self.config.max_cache_entries {
345 self.evict_old_cache_entries();
346 }
347 }
348
349 selected
350 }
351
352 fn select_peers_impl(&self, criteria: &SelectionCriteria) -> Vec<SelectedPeer> {
354 let mut scored_peers = Vec::new();
356
357 let geo_peers = if let Some(ref_location) = &criteria.reference_location {
359 self.geo_router.rank_peers_by_proximity(ref_location)
360 } else {
361 vec![]
362 };
363
364 for geo_peer in geo_peers {
365 if let Some(max_dist) = criteria.max_distance_km {
367 if let Some(dist) = geo_peer.distance_km {
368 if dist > max_dist {
369 continue;
370 }
371 }
372 }
373
374 let distance_score = self.calculate_distance_score(&geo_peer.distance_km);
376 let quality_prediction = self.quality_predictor.predict_quality(&geo_peer.peer_id);
377
378 let quality_score = quality_prediction
379 .as_ref()
380 .map(|p| p.overall_score)
381 .unwrap_or(0.5);
382 let latency_score = quality_prediction
383 .as_ref()
384 .map(|p| p.latency_score)
385 .unwrap_or(0.5);
386 let bandwidth_score = quality_prediction
387 .as_ref()
388 .map(|p| p.bandwidth_score)
389 .unwrap_or(0.5);
390
391 if quality_score < criteria.min_quality_score {
393 continue;
394 }
395
396 let overall_score = self.calculate_overall_score(
398 distance_score,
399 quality_score,
400 latency_score,
401 bandwidth_score,
402 );
403
404 scored_peers.push(SelectedPeer {
405 peer_id: geo_peer.peer_id,
406 score: overall_score,
407 distance_score,
408 quality_score,
409 latency_score,
410 bandwidth_score,
411 location: Some(geo_peer.location),
412 distance_km: geo_peer.distance_km,
413 });
414 }
415
416 scored_peers.sort_by(|a, b| {
418 b.score
419 .partial_cmp(&a.score)
420 .unwrap_or(std::cmp::Ordering::Equal)
421 });
422
423 scored_peers.truncate(criteria.max_results);
425 scored_peers
426 }
427
428 fn calculate_distance_score(&self, distance_km: &Option<f64>) -> f64 {
430 match distance_km {
431 Some(dist) => {
432 (-dist / 1000.0).exp()
437 }
438 None => 0.5, }
440 }
441
442 fn calculate_overall_score(
444 &self,
445 distance: f64,
446 quality: f64,
447 latency: f64,
448 bandwidth: f64,
449 ) -> f64 {
450 distance * self.config.distance_weight
451 + quality * self.config.quality_weight
452 + latency * self.config.latency_weight
453 + bandwidth * self.config.bandwidth_weight
454 }
455
456 fn make_cache_key(&self, criteria: &SelectionCriteria) -> String {
458 format!(
459 "loc:{:?}_qual:{}_dist:{:?}_max:{}",
460 criteria.reference_location,
461 criteria.min_quality_score,
462 criteria.max_distance_km,
463 criteria.max_results
464 )
465 }
466
467 fn invalidate_cache(&self) {
469 self.cache.clear();
470 }
471
472 fn evict_old_cache_entries(&self) {
474 let now = Instant::now();
475 let ttl = Duration::from_secs(self.config.cache_ttl_secs);
476
477 self.cache
478 .retain(|_, entry| now.duration_since(entry.cached_at) < ttl);
479
480 while self.cache.len() > self.config.max_cache_entries {
482 if let Some(entry) = self.cache.iter().next() {
483 let key = entry.key().clone();
484 drop(entry);
485 self.cache.remove(&key);
486 } else {
487 break;
488 }
489 }
490 }
491
492 pub fn stats(&self) -> PeerSelectorStats {
494 self.stats.read().clone()
495 }
496
497 #[allow(dead_code)]
499 pub fn reset(&self) {
500 self.cache.clear();
501 let mut stats = self.stats.write();
502 *stats = PeerSelectorStats::default();
503 }
504}
505
506#[cfg(test)]
507mod tests {
508 use super::*;
509
510 #[test]
511 fn test_peer_selector_config() {
512 let config = PeerSelectorConfig::default();
513 assert_eq!(config.distance_weight, 0.3);
514 assert_eq!(config.quality_weight, 0.3);
515 }
516
517 #[test]
518 fn test_config_presets() {
519 let low_latency = PeerSelectorConfig::low_latency();
520 assert!(low_latency.latency_weight > 0.3);
521
522 let high_bandwidth = PeerSelectorConfig::high_bandwidth();
523 assert!(high_bandwidth.bandwidth_weight > 0.5);
524
525 let mobile = PeerSelectorConfig::mobile();
526 assert!(mobile.distance_weight >= 0.5);
527 }
528
529 #[test]
530 fn test_peer_selector_creation() {
531 let config = PeerSelectorConfig::default();
532 let selector = PeerSelector::new(config);
533 let stats = selector.stats();
534 assert_eq!(stats.total_selections, 0);
535 }
536
537 #[test]
538 fn test_add_peer_location() {
539 let config = PeerSelectorConfig::default();
540 let selector = PeerSelector::new(config);
541
542 let peer = PeerId::random();
543 let location = GeoLocation::new(40.7128, -74.0060);
544 selector.add_peer_location(peer, location);
545
546 let loc = selector.geo_router.get_peer_location(&peer);
548 assert!(loc.is_some());
549 }
550
551 #[test]
552 fn test_remove_peer() {
553 let config = PeerSelectorConfig::default();
554 let selector = PeerSelector::new(config);
555
556 let peer = PeerId::random();
557 let location = GeoLocation::new(40.7128, -74.0060);
558 selector.add_peer_location(peer, location);
559 selector.remove_peer(&peer);
560
561 let loc = selector.geo_router.get_peer_location(&peer);
562 assert!(loc.is_none());
563 }
564
565 #[test]
566 fn test_selection_criteria() {
567 let criteria = SelectionCriteria {
568 reference_location: Some(GeoLocation::new(37.7749, -122.4194)),
569 min_quality_score: 0.5,
570 max_distance_km: Some(1000.0),
571 max_results: 5,
572 };
573
574 assert!(criteria.reference_location.is_some());
575 assert_eq!(criteria.max_results, 5);
576 }
577
578 #[test]
579 fn test_select_peers_empty() {
580 let config = PeerSelectorConfig::default();
581 let selector = PeerSelector::new(config);
582
583 let criteria = SelectionCriteria::default();
584 let selected = selector.select_peers(&criteria);
585 assert_eq!(selected.len(), 0);
586 }
587
588 #[test]
589 fn test_select_peers_with_location() {
590 let config = PeerSelectorConfig::default();
591 let selector = PeerSelector::new(config);
592
593 let peer1 = PeerId::random();
595 let peer2 = PeerId::random();
596 selector.add_peer_location(peer1, GeoLocation::new(40.7128, -74.0060)); selector.add_peer_location(peer2, GeoLocation::new(34.0522, -118.2437)); let criteria = SelectionCriteria {
601 reference_location: Some(GeoLocation::new(37.7749, -122.4194)),
602 min_quality_score: 0.0,
603 max_distance_km: None,
604 max_results: 10,
605 };
606
607 let selected = selector.select_peers(&criteria);
608 assert!(!selected.is_empty());
609 if selected.len() == 2 {
611 assert_eq!(selected[0].peer_id, peer2);
612 }
613 }
614
615 #[test]
616 fn test_distance_score_calculation() {
617 let config = PeerSelectorConfig::default();
618 let selector = PeerSelector::new(config);
619
620 let score_close = selector.calculate_distance_score(&Some(100.0));
621 let score_far = selector.calculate_distance_score(&Some(5000.0));
622 let score_none = selector.calculate_distance_score(&None);
623
624 assert!(score_close > score_far);
625 assert_eq!(score_none, 0.5);
626 }
627
628 #[test]
629 fn test_overall_score_calculation() {
630 let config = PeerSelectorConfig::default();
631 let selector = PeerSelector::new(config);
632
633 let score = selector.calculate_overall_score(1.0, 1.0, 1.0, 1.0);
634 assert!(score > 0.0 && score <= 1.0);
635 }
636
637 #[test]
638 fn test_cache_functionality() {
639 let config = PeerSelectorConfig {
640 enable_caching: true,
641 ..Default::default()
642 };
643 let selector = PeerSelector::new(config);
644
645 let peer = PeerId::random();
646 selector.add_peer_location(peer, GeoLocation::new(40.7128, -74.0060));
647
648 let criteria = SelectionCriteria {
649 reference_location: Some(GeoLocation::new(37.7749, -122.4194)),
650 min_quality_score: 0.0,
651 max_distance_km: None,
652 max_results: 10,
653 };
654
655 selector.select_peers(&criteria);
657 let stats1 = selector.stats();
658 assert_eq!(stats1.cache_misses, 1);
659
660 selector.select_peers(&criteria);
662 let stats2 = selector.stats();
663 assert_eq!(stats2.cache_hits, 1);
664 }
665
666 #[test]
667 fn test_stats_cache_hit_rate() {
668 let stats = PeerSelectorStats {
669 cache_hits: 7,
670 cache_misses: 3,
671 ..Default::default()
672 };
673 assert!((stats.cache_hit_rate() - 0.7).abs() < 0.01);
674 }
675
676 #[test]
677 fn test_max_distance_filtering() {
678 let config = PeerSelectorConfig::default();
679 let selector = PeerSelector::new(config);
680
681 let peer_la = PeerId::random();
683 let peer_london = PeerId::random();
684 selector.add_peer_location(peer_la, GeoLocation::new(34.0522, -118.2437)); selector.add_peer_location(peer_london, GeoLocation::new(51.5074, -0.1278)); let criteria = SelectionCriteria {
688 reference_location: Some(GeoLocation::new(37.7749, -122.4194)), min_quality_score: 0.0,
690 max_distance_km: Some(1000.0),
691 max_results: 10,
692 };
693
694 let selected = selector.select_peers(&criteria);
695 assert_eq!(selected.len(), 1); assert_eq!(selected[0].peer_id, peer_la);
697 }
698
699 #[test]
700 fn test_min_quality_filtering() {
701 let config = PeerSelectorConfig::default();
702 let selector = PeerSelector::new(config);
703
704 let peer = PeerId::random();
705 selector.add_peer_location(peer, GeoLocation::new(40.7128, -74.0060));
706
707 selector.update_peer_quality(peer, 1000.0, 0.1, false);
709
710 let criteria = SelectionCriteria {
711 reference_location: Some(GeoLocation::new(37.7749, -122.4194)),
712 min_quality_score: 0.9, max_distance_km: None,
714 max_results: 10,
715 };
716
717 let selected = selector.select_peers(&criteria);
718 assert!(selected.is_empty() || selected[0].quality_score >= criteria.min_quality_score);
720 }
721}