ipfrs_network/
peer_selector.rs

1//! Intelligent peer selection combining geographic proximity and connection quality
2//!
3//! This module provides a smart peer selector that combines multiple factors:
4//! - Geographic proximity (via geo_routing)
5//! - Connection quality prediction (via quality_predictor)
6//! - Network topology optimization
7//!
8//! ## Features
9//!
10//! - **Multi-factor scoring**: Combines distance, latency, bandwidth, reliability
11//! - **Configurable weights**: Adjust importance of each factor
12//! - **Smart caching**: Cache selection decisions to reduce overhead
13//! - **Adaptive scoring**: Learn from connection outcomes
14//!
15//! ## Example
16//!
17//! ```rust
18//! use ipfrs_network::peer_selector::{PeerSelector, PeerSelectorConfig, SelectionCriteria};
19//! use ipfrs_network::geo_routing::GeoLocation;
20//! use libp2p::PeerId;
21//!
22//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
23//! let config = PeerSelectorConfig::balanced();
24//! let mut selector = PeerSelector::new(config);
25//!
26//! // Add peer with location and quality metrics
27//! let peer = PeerId::random();
28//! selector.add_peer_location(peer, GeoLocation::new(40.7128, -74.0060));
29//!
30//! // Select best peers based on criteria
31//! let my_location = GeoLocation::new(37.7749, -122.4194);
32//! let criteria = SelectionCriteria {
33//!     reference_location: Some(my_location),
34//!     min_quality_score: 0.5,
35//!     max_distance_km: Some(5000.0),
36//!     max_results: 10,
37//! };
38//!
39//! let selected = selector.select_peers(&criteria);
40//! println!("Selected {} peers", selected.len());
41//! # Ok(())
42//! # }
43//! ```
44
45use crate::geo_routing::{GeoLocation, GeoRouter, GeoRouterConfig};
46use crate::quality_predictor::{QualityPredictor, QualityPredictorConfig};
47use dashmap::DashMap;
48use libp2p::PeerId;
49use parking_lot::RwLock;
50use serde::{Deserialize, Serialize};
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53
54/// Configuration for peer selector
55#[derive(Debug, Clone)]
56pub struct PeerSelectorConfig {
57    /// Weight for geographic distance (0.0 - 1.0)
58    pub distance_weight: f64,
59    /// Weight for connection quality (0.0 - 1.0)
60    pub quality_weight: f64,
61    /// Weight for latency (0.0 - 1.0)
62    pub latency_weight: f64,
63    /// Weight for bandwidth (0.0 - 1.0)
64    pub bandwidth_weight: f64,
65    /// Enable selection caching
66    pub enable_caching: bool,
67    /// Cache TTL in seconds
68    pub cache_ttl_secs: u64,
69    /// Maximum cache size
70    pub max_cache_entries: usize,
71}
72
73impl Default for PeerSelectorConfig {
74    fn default() -> Self {
75        Self {
76            distance_weight: 0.3,
77            quality_weight: 0.3,
78            latency_weight: 0.2,
79            bandwidth_weight: 0.2,
80            enable_caching: true,
81            cache_ttl_secs: 300, // 5 minutes
82            max_cache_entries: 1000,
83        }
84    }
85}
86
87impl PeerSelectorConfig {
88    /// Configuration optimized for low latency
89    pub fn low_latency() -> Self {
90        Self {
91            distance_weight: 0.4,
92            quality_weight: 0.1,
93            latency_weight: 0.4,
94            bandwidth_weight: 0.1,
95            enable_caching: true,
96            cache_ttl_secs: 180,
97            max_cache_entries: 500,
98        }
99    }
100
101    /// Configuration optimized for high bandwidth
102    pub fn high_bandwidth() -> Self {
103        Self {
104            distance_weight: 0.1,
105            quality_weight: 0.2,
106            latency_weight: 0.1,
107            bandwidth_weight: 0.6,
108            enable_caching: true,
109            cache_ttl_secs: 300,
110            max_cache_entries: 1000,
111        }
112    }
113
114    /// Balanced configuration
115    pub fn balanced() -> Self {
116        Self::default()
117    }
118
119    /// Configuration for mobile/constrained devices
120    pub fn mobile() -> Self {
121        Self {
122            distance_weight: 0.5, // Prefer nearby peers
123            quality_weight: 0.3,
124            latency_weight: 0.1,
125            bandwidth_weight: 0.1,
126            enable_caching: true,
127            cache_ttl_secs: 600, // Cache longer
128            max_cache_entries: 200,
129        }
130    }
131}
132
133/// Criteria for peer selection
134#[derive(Debug, Clone)]
135pub struct SelectionCriteria {
136    /// Reference location for distance calculation
137    pub reference_location: Option<GeoLocation>,
138    /// Minimum quality score (0.0 - 1.0)
139    pub min_quality_score: f64,
140    /// Maximum distance in kilometers
141    pub max_distance_km: Option<f64>,
142    /// Maximum number of results
143    pub max_results: usize,
144}
145
146impl Default for SelectionCriteria {
147    fn default() -> Self {
148        Self {
149            reference_location: None,
150            min_quality_score: 0.0,
151            max_distance_km: None,
152            max_results: 10,
153        }
154    }
155}
156
157/// Selected peer with score details
158#[derive(Debug, Clone)]
159pub struct SelectedPeer {
160    /// Peer ID
161    pub peer_id: PeerId,
162    /// Overall score (0.0 - 1.0, higher is better)
163    pub score: f64,
164    /// Distance score component
165    pub distance_score: f64,
166    /// Quality score component
167    pub quality_score: f64,
168    /// Latency score component
169    pub latency_score: f64,
170    /// Bandwidth score component
171    pub bandwidth_score: f64,
172    /// Geographic location (if available)
173    pub location: Option<GeoLocation>,
174    /// Distance in kilometers (if location available)
175    pub distance_km: Option<f64>,
176}
177
178/// Cached selection result
179#[derive(Debug, Clone)]
180struct CachedSelection {
181    /// Selected peers
182    peers: Vec<SelectedPeer>,
183    /// Timestamp when cached
184    cached_at: Instant,
185}
186
187/// Statistics for peer selector
188#[derive(Debug, Clone, Default, Serialize, Deserialize)]
189pub struct PeerSelectorStats {
190    /// Total selections performed
191    pub total_selections: u64,
192    /// Cache hits
193    pub cache_hits: u64,
194    /// Cache misses
195    pub cache_misses: u64,
196    /// Total peers evaluated
197    pub total_peers_evaluated: u64,
198    /// Average selection time in microseconds
199    pub avg_selection_time_us: f64,
200}
201
202impl PeerSelectorStats {
203    /// Calculate cache hit rate
204    pub fn cache_hit_rate(&self) -> f64 {
205        let total = self.cache_hits + self.cache_misses;
206        if total > 0 {
207            self.cache_hits as f64 / total as f64
208        } else {
209            0.0
210        }
211    }
212}
213
214/// Intelligent peer selector
215pub struct PeerSelector {
216    /// Configuration
217    config: PeerSelectorConfig,
218    /// Geographic router
219    geo_router: Arc<GeoRouter>,
220    /// Quality predictor
221    quality_predictor: Arc<QualityPredictor>,
222    /// Selection cache
223    cache: Arc<DashMap<String, CachedSelection>>,
224    /// Statistics
225    stats: Arc<RwLock<PeerSelectorStats>>,
226}
227
228impl PeerSelector {
229    /// Create a new peer selector with default configuration
230    pub fn new(config: PeerSelectorConfig) -> Self {
231        let geo_config = GeoRouterConfig::default();
232        let quality_config = QualityPredictorConfig::default();
233
234        Self {
235            config,
236            geo_router: Arc::new(GeoRouter::new(geo_config)),
237            quality_predictor: Arc::new(
238                QualityPredictor::new(quality_config).expect("Default config should be valid"),
239            ),
240            cache: Arc::new(DashMap::new()),
241            stats: Arc::new(RwLock::new(PeerSelectorStats::default())),
242        }
243    }
244
245    /// Create with custom geo and quality configurations
246    pub fn with_configs(
247        config: PeerSelectorConfig,
248        geo_config: GeoRouterConfig,
249        quality_config: QualityPredictorConfig,
250    ) -> Self {
251        Self {
252            config,
253            geo_router: Arc::new(GeoRouter::new(geo_config)),
254            quality_predictor: Arc::new(
255                QualityPredictor::new(quality_config).expect("Config should be valid"),
256            ),
257            cache: Arc::new(DashMap::new()),
258            stats: Arc::new(RwLock::new(PeerSelectorStats::default())),
259        }
260    }
261
262    /// Add or update peer location
263    pub fn add_peer_location(&self, peer_id: PeerId, location: GeoLocation) {
264        self.geo_router.update_peer_location(peer_id, location);
265        self.invalidate_cache();
266    }
267
268    /// Remove peer
269    pub fn remove_peer(&self, peer_id: &PeerId) {
270        self.geo_router.remove_peer(peer_id);
271        self.quality_predictor.remove_peer(peer_id);
272        self.invalidate_cache();
273    }
274
275    /// Update peer quality metrics
276    pub fn update_peer_quality(
277        &self,
278        peer_id: PeerId,
279        latency_ms: f64,
280        bandwidth_mbps: f64,
281        success: bool,
282    ) {
283        self.quality_predictor
284            .record_latency(peer_id, latency_ms as u64);
285        // Convert Mbps to bytes per second
286        let bytes_per_sec = (bandwidth_mbps * 1_000_000.0 / 8.0) as u64;
287        self.quality_predictor
288            .record_bandwidth(peer_id, bytes_per_sec);
289        if !success {
290            self.quality_predictor.record_failure(peer_id);
291        }
292        self.invalidate_cache();
293    }
294
295    /// Select best peers based on criteria
296    pub fn select_peers(&self, criteria: &SelectionCriteria) -> Vec<SelectedPeer> {
297        let start = Instant::now();
298
299        // Check cache if enabled
300        if self.config.enable_caching {
301            let cache_key = self.make_cache_key(criteria);
302            if let Some(cached) = self.cache.get(&cache_key) {
303                let age = start.duration_since(cached.cached_at);
304                if age.as_secs() < self.config.cache_ttl_secs {
305                    let mut stats = self.stats.write();
306                    stats.total_selections += 1;
307                    stats.cache_hits += 1;
308                    return cached.peers.clone();
309                }
310            }
311        }
312
313        // Perform selection
314        let selected = self.select_peers_impl(criteria);
315
316        // Update statistics
317        let elapsed = start.elapsed();
318        let mut stats = self.stats.write();
319        stats.total_selections += 1;
320        stats.cache_misses += 1;
321        stats.total_peers_evaluated += selected.len() as u64;
322        let new_avg = if stats.total_selections > 1 {
323            (stats.avg_selection_time_us * (stats.total_selections - 1) as f64
324                + elapsed.as_micros() as f64)
325                / stats.total_selections as f64
326        } else {
327            elapsed.as_micros() as f64
328        };
329        stats.avg_selection_time_us = new_avg;
330        drop(stats);
331
332        // Cache result
333        if self.config.enable_caching {
334            let cache_key = self.make_cache_key(criteria);
335            self.cache.insert(
336                cache_key,
337                CachedSelection {
338                    peers: selected.clone(),
339                    cached_at: start,
340                },
341            );
342
343            // Enforce cache size limit
344            if self.cache.len() > self.config.max_cache_entries {
345                self.evict_old_cache_entries();
346            }
347        }
348
349        selected
350    }
351
352    /// Internal implementation of peer selection
353    fn select_peers_impl(&self, criteria: &SelectionCriteria) -> Vec<SelectedPeer> {
354        // Get all peers with locations
355        let mut scored_peers = Vec::new();
356
357        // Get geo-ranked peers if location provided
358        let geo_peers = if let Some(ref_location) = &criteria.reference_location {
359            self.geo_router.rank_peers_by_proximity(ref_location)
360        } else {
361            vec![]
362        };
363
364        for geo_peer in geo_peers {
365            // Check distance constraint
366            if let Some(max_dist) = criteria.max_distance_km {
367                if let Some(dist) = geo_peer.distance_km {
368                    if dist > max_dist {
369                        continue;
370                    }
371                }
372            }
373
374            // Calculate component scores
375            let distance_score = self.calculate_distance_score(&geo_peer.distance_km);
376            let quality_prediction = self.quality_predictor.predict_quality(&geo_peer.peer_id);
377
378            let quality_score = quality_prediction
379                .as_ref()
380                .map(|p| p.overall_score)
381                .unwrap_or(0.5);
382            let latency_score = quality_prediction
383                .as_ref()
384                .map(|p| p.latency_score)
385                .unwrap_or(0.5);
386            let bandwidth_score = quality_prediction
387                .as_ref()
388                .map(|p| p.bandwidth_score)
389                .unwrap_or(0.5);
390
391            // Check minimum quality
392            if quality_score < criteria.min_quality_score {
393                continue;
394            }
395
396            // Calculate overall score
397            let overall_score = self.calculate_overall_score(
398                distance_score,
399                quality_score,
400                latency_score,
401                bandwidth_score,
402            );
403
404            scored_peers.push(SelectedPeer {
405                peer_id: geo_peer.peer_id,
406                score: overall_score,
407                distance_score,
408                quality_score,
409                latency_score,
410                bandwidth_score,
411                location: Some(geo_peer.location),
412                distance_km: geo_peer.distance_km,
413            });
414        }
415
416        // Sort by score (descending)
417        scored_peers.sort_by(|a, b| {
418            b.score
419                .partial_cmp(&a.score)
420                .unwrap_or(std::cmp::Ordering::Equal)
421        });
422
423        // Return top N results
424        scored_peers.truncate(criteria.max_results);
425        scored_peers
426    }
427
428    /// Calculate distance score (0.0 - 1.0, higher is better/closer)
429    fn calculate_distance_score(&self, distance_km: &Option<f64>) -> f64 {
430        match distance_km {
431            Some(dist) => {
432                // Use exponential decay: score = e^(-dist/1000)
433                // At 0 km: score = 1.0
434                // At 1000 km: score ≈ 0.368
435                // At 5000 km: score ≈ 0.007
436                (-dist / 1000.0).exp()
437            }
438            None => 0.5, // Neutral score if no location
439        }
440    }
441
442    /// Calculate overall score from components
443    fn calculate_overall_score(
444        &self,
445        distance: f64,
446        quality: f64,
447        latency: f64,
448        bandwidth: f64,
449    ) -> f64 {
450        distance * self.config.distance_weight
451            + quality * self.config.quality_weight
452            + latency * self.config.latency_weight
453            + bandwidth * self.config.bandwidth_weight
454    }
455
456    /// Generate cache key from criteria
457    fn make_cache_key(&self, criteria: &SelectionCriteria) -> String {
458        format!(
459            "loc:{:?}_qual:{}_dist:{:?}_max:{}",
460            criteria.reference_location,
461            criteria.min_quality_score,
462            criteria.max_distance_km,
463            criteria.max_results
464        )
465    }
466
467    /// Invalidate entire cache
468    fn invalidate_cache(&self) {
469        self.cache.clear();
470    }
471
472    /// Evict old cache entries to maintain size limit
473    fn evict_old_cache_entries(&self) {
474        let now = Instant::now();
475        let ttl = Duration::from_secs(self.config.cache_ttl_secs);
476
477        self.cache
478            .retain(|_, entry| now.duration_since(entry.cached_at) < ttl);
479
480        // If still over limit, remove random entries
481        while self.cache.len() > self.config.max_cache_entries {
482            if let Some(entry) = self.cache.iter().next() {
483                let key = entry.key().clone();
484                drop(entry);
485                self.cache.remove(&key);
486            } else {
487                break;
488            }
489        }
490    }
491
492    /// Get statistics
493    pub fn stats(&self) -> PeerSelectorStats {
494        self.stats.read().clone()
495    }
496
497    /// Clear all caches and reset
498    #[allow(dead_code)]
499    pub fn reset(&self) {
500        self.cache.clear();
501        let mut stats = self.stats.write();
502        *stats = PeerSelectorStats::default();
503    }
504}
505
506#[cfg(test)]
507mod tests {
508    use super::*;
509
510    #[test]
511    fn test_peer_selector_config() {
512        let config = PeerSelectorConfig::default();
513        assert_eq!(config.distance_weight, 0.3);
514        assert_eq!(config.quality_weight, 0.3);
515    }
516
517    #[test]
518    fn test_config_presets() {
519        let low_latency = PeerSelectorConfig::low_latency();
520        assert!(low_latency.latency_weight > 0.3);
521
522        let high_bandwidth = PeerSelectorConfig::high_bandwidth();
523        assert!(high_bandwidth.bandwidth_weight > 0.5);
524
525        let mobile = PeerSelectorConfig::mobile();
526        assert!(mobile.distance_weight >= 0.5);
527    }
528
529    #[test]
530    fn test_peer_selector_creation() {
531        let config = PeerSelectorConfig::default();
532        let selector = PeerSelector::new(config);
533        let stats = selector.stats();
534        assert_eq!(stats.total_selections, 0);
535    }
536
537    #[test]
538    fn test_add_peer_location() {
539        let config = PeerSelectorConfig::default();
540        let selector = PeerSelector::new(config);
541
542        let peer = PeerId::random();
543        let location = GeoLocation::new(40.7128, -74.0060);
544        selector.add_peer_location(peer, location);
545
546        // Verify peer was added (indirectly through geo_router)
547        let loc = selector.geo_router.get_peer_location(&peer);
548        assert!(loc.is_some());
549    }
550
551    #[test]
552    fn test_remove_peer() {
553        let config = PeerSelectorConfig::default();
554        let selector = PeerSelector::new(config);
555
556        let peer = PeerId::random();
557        let location = GeoLocation::new(40.7128, -74.0060);
558        selector.add_peer_location(peer, location);
559        selector.remove_peer(&peer);
560
561        let loc = selector.geo_router.get_peer_location(&peer);
562        assert!(loc.is_none());
563    }
564
565    #[test]
566    fn test_selection_criteria() {
567        let criteria = SelectionCriteria {
568            reference_location: Some(GeoLocation::new(37.7749, -122.4194)),
569            min_quality_score: 0.5,
570            max_distance_km: Some(1000.0),
571            max_results: 5,
572        };
573
574        assert!(criteria.reference_location.is_some());
575        assert_eq!(criteria.max_results, 5);
576    }
577
578    #[test]
579    fn test_select_peers_empty() {
580        let config = PeerSelectorConfig::default();
581        let selector = PeerSelector::new(config);
582
583        let criteria = SelectionCriteria::default();
584        let selected = selector.select_peers(&criteria);
585        assert_eq!(selected.len(), 0);
586    }
587
588    #[test]
589    fn test_select_peers_with_location() {
590        let config = PeerSelectorConfig::default();
591        let selector = PeerSelector::new(config);
592
593        // Add peers
594        let peer1 = PeerId::random();
595        let peer2 = PeerId::random();
596        selector.add_peer_location(peer1, GeoLocation::new(40.7128, -74.0060)); // NY
597        selector.add_peer_location(peer2, GeoLocation::new(34.0522, -118.2437)); // LA
598
599        // Select from SF
600        let criteria = SelectionCriteria {
601            reference_location: Some(GeoLocation::new(37.7749, -122.4194)),
602            min_quality_score: 0.0,
603            max_distance_km: None,
604            max_results: 10,
605        };
606
607        let selected = selector.select_peers(&criteria);
608        assert!(!selected.is_empty());
609        // LA should be closer to SF than NY
610        if selected.len() == 2 {
611            assert_eq!(selected[0].peer_id, peer2);
612        }
613    }
614
615    #[test]
616    fn test_distance_score_calculation() {
617        let config = PeerSelectorConfig::default();
618        let selector = PeerSelector::new(config);
619
620        let score_close = selector.calculate_distance_score(&Some(100.0));
621        let score_far = selector.calculate_distance_score(&Some(5000.0));
622        let score_none = selector.calculate_distance_score(&None);
623
624        assert!(score_close > score_far);
625        assert_eq!(score_none, 0.5);
626    }
627
628    #[test]
629    fn test_overall_score_calculation() {
630        let config = PeerSelectorConfig::default();
631        let selector = PeerSelector::new(config);
632
633        let score = selector.calculate_overall_score(1.0, 1.0, 1.0, 1.0);
634        assert!(score > 0.0 && score <= 1.0);
635    }
636
637    #[test]
638    fn test_cache_functionality() {
639        let config = PeerSelectorConfig {
640            enable_caching: true,
641            ..Default::default()
642        };
643        let selector = PeerSelector::new(config);
644
645        let peer = PeerId::random();
646        selector.add_peer_location(peer, GeoLocation::new(40.7128, -74.0060));
647
648        let criteria = SelectionCriteria {
649            reference_location: Some(GeoLocation::new(37.7749, -122.4194)),
650            min_quality_score: 0.0,
651            max_distance_km: None,
652            max_results: 10,
653        };
654
655        // First call - cache miss
656        selector.select_peers(&criteria);
657        let stats1 = selector.stats();
658        assert_eq!(stats1.cache_misses, 1);
659
660        // Second call - cache hit
661        selector.select_peers(&criteria);
662        let stats2 = selector.stats();
663        assert_eq!(stats2.cache_hits, 1);
664    }
665
666    #[test]
667    fn test_stats_cache_hit_rate() {
668        let stats = PeerSelectorStats {
669            cache_hits: 7,
670            cache_misses: 3,
671            ..Default::default()
672        };
673        assert!((stats.cache_hit_rate() - 0.7).abs() < 0.01);
674    }
675
676    #[test]
677    fn test_max_distance_filtering() {
678        let config = PeerSelectorConfig::default();
679        let selector = PeerSelector::new(config);
680
681        // Add peers at different distances from SF
682        let peer_la = PeerId::random();
683        let peer_london = PeerId::random();
684        selector.add_peer_location(peer_la, GeoLocation::new(34.0522, -118.2437)); // ~550 km
685        selector.add_peer_location(peer_london, GeoLocation::new(51.5074, -0.1278)); // ~8600 km
686
687        let criteria = SelectionCriteria {
688            reference_location: Some(GeoLocation::new(37.7749, -122.4194)), // SF
689            min_quality_score: 0.0,
690            max_distance_km: Some(1000.0),
691            max_results: 10,
692        };
693
694        let selected = selector.select_peers(&criteria);
695        assert_eq!(selected.len(), 1); // Only LA should be selected
696        assert_eq!(selected[0].peer_id, peer_la);
697    }
698
699    #[test]
700    fn test_min_quality_filtering() {
701        let config = PeerSelectorConfig::default();
702        let selector = PeerSelector::new(config);
703
704        let peer = PeerId::random();
705        selector.add_peer_location(peer, GeoLocation::new(40.7128, -74.0060));
706
707        // Record poor quality
708        selector.update_peer_quality(peer, 1000.0, 0.1, false);
709
710        let criteria = SelectionCriteria {
711            reference_location: Some(GeoLocation::new(37.7749, -122.4194)),
712            min_quality_score: 0.9, // Very high threshold
713            max_distance_km: None,
714            max_results: 10,
715        };
716
717        let selected = selector.select_peers(&criteria);
718        // Peer might be filtered out due to quality
719        assert!(selected.is_empty() || selected[0].quality_score >= criteria.min_quality_score);
720    }
721}