use super::{PeerId, PieceId, SwarmError, SwarmPeer, SwarmResult, swarm_time_now};
use crate::types::Time;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
/// Ranks swarm peers by weighted quality metrics and keeps a per-peer history
/// of observed transfer quality.
///
/// `Default` is derived so that `PeerSelector::default()` is available next to
/// `PeerSelector::new()`; both start with an empty history and the default
/// [`PeerSelectionConfig`].
#[derive(Debug, Default)]
pub struct PeerSelector {
    /// Transfer observations recorded per peer by `update_peer_quality`.
    quality_history: HashMap<PeerId, QualityHistory>,
    /// Weights and limits applied when scoring peers and pruning history.
    selection_config: PeerSelectionConfig,
}
/// Weights and limits that control how peers are scored and how long
/// per-peer observations are kept.
///
/// Each `*_weight` is multiplied with the matching component score and the
/// products are summed into the composite score. The default weights add up
/// to 1.0.
#[derive(Debug, Clone)]
pub struct PeerSelectionConfig {
    /// Weight of the download-speed component.
    pub speed_weight: f64,
    /// Weight of the reliability (transfer success) component.
    pub reliability_weight: f64,
    /// Weight of the latency component.
    pub latency_weight: f64,
    /// Weight of the reputation component.
    pub reputation_weight: f64,
    /// Minimum acceptable peer score.
    /// NOTE(review): not read by the code visible in this file — confirm where it is enforced.
    pub min_peer_score: f64,
    /// Observations older than this are pruned from a peer's history.
    pub max_evaluation_age: Duration,
}

impl Default for PeerSelectionConfig {
    /// Speed and reliability weigh 0.3 each, latency and reputation 0.2 each;
    /// the minimum score is 0.3 and observations expire after five minutes.
    fn default() -> Self {
        let (primary_weight, secondary_weight) = (0.3, 0.2);
        Self {
            speed_weight: primary_weight,
            reliability_weight: primary_weight,
            latency_weight: secondary_weight,
            reputation_weight: secondary_weight,
            min_peer_score: 0.3,
            max_evaluation_age: Duration::from_secs(5 * 60),
        }
    }
}
/// Per-peer record of observed transfer quality kept by `PeerSelector`.
#[derive(Debug, Clone)]
pub struct QualityHistory {
/// Individual observations; `PeerSelector` prunes entries that exceed the
/// configured maximum evaluation age.
evaluations: Vec<QualityEvaluation>,
/// Reputation derived from the observations; starts at 0.5 for a new peer and
/// is recomputed after every recorded transfer.
reputation_score: f64,
/// Number of transfers recorded as successful (never pruned).
successful_transfers: u64,
/// Number of transfers recorded as failed (never pruned).
failed_transfers: u64,
}
/// A single transfer observation stored in a peer's `QualityHistory`.
#[derive(Debug, Clone)]
pub(crate) struct QualityEvaluation {
/// When the observation was recorded (`swarm_time_now()` at insertion).
timestamp: Time,
/// Download speed reported by the caller of `update_peer_quality`
/// (presumably bytes per second — TODO confirm units).
download_speed: f64,
/// Latency reported by the caller of `update_peer_quality`.
latency: Duration,
/// 1.0 when the transfer succeeded, 0.0 when it failed.
reliability: f64,
/// Combined speed/latency/success score computed when the observation was recorded.
quality_score: f64,
}
/// Quality metrics tracked for a swarm peer; serializable through serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerQuality {
/// Overall quality score; `PeerSelector::select_peers` compares it against the
/// caller-supplied threshold.
pub overall_score: f64,
/// Observed download speed; peer scoring normalizes it against 10_000_000.0
/// (presumably bytes per second — TODO confirm units).
pub download_speed: f64,
/// Observed upload speed (presumably the same unit as `download_speed` — TODO confirm).
/// Not read by the code visible in this file.
pub upload_speed: f64,
/// Average response time; feeds the latency component of the peer score.
pub avg_response_time: Duration,
/// Reliability estimate; peer scoring falls back to it while no transfers
/// have been counted.
pub reliability: f64,
/// Count of successful transfers; with `failed_transfers` it yields the
/// measured success ratio used by peer scoring.
pub successful_transfers: u64,
/// Count of failed transfers.
pub failed_transfers: u64,
/// Count of verification failures. Not read by the code visible in this file.
pub verification_failures: u64,
/// Time of the last update; `Default` sets it to `swarm_time_now()`.
pub last_updated: Time,
}
impl Default for PeerQuality {
    /// Neutral starting metrics for a peer with no recorded transfers:
    /// a mid-range overall score, identical download and upload speeds,
    /// a 100 ms response time, and zeroed counters, stamped with the current
    /// swarm time.
    fn default() -> Self {
        let last_updated = swarm_time_now();
        let baseline_speed = 1_000_000.0;
        Self {
            overall_score: 0.5,
            download_speed: baseline_speed,
            upload_speed: baseline_speed,
            avg_response_time: Duration::from_millis(100),
            reliability: 0.8,
            successful_transfers: 0,
            failed_transfers: 0,
            verification_failures: 0,
            last_updated,
        }
    }
}
/// Reputation and contribution accounting for a swarm peer; serializable
/// through serde. Only `reputation_score` is read by the code visible in this
/// file; the meaning of the remaining fields is inferred from their names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerReputation {
/// Reputation value used as the reputation component of the peer score.
pub reputation_score: f64,
/// Bytes uploaded (NOTE(review): direction relative to the local node is not
/// visible here — confirm).
pub bytes_uploaded: u64,
/// Bytes downloaded (NOTE(review): direction relative to the local node is not
/// visible here — confirm).
pub bytes_downloaded: u64,
/// Reciprocity ratio; defaults to 1.0 (presumably upload/download balance — TODO confirm).
pub reciprocity_ratio: f64,
/// Tokens earned by the peer.
pub tokens_earned: u64,
/// Tokens spent by the peer.
pub tokens_spent: u64,
/// Time the peer joined; `Default` sets it to `swarm_time_now()`.
pub joined_at: Time,
/// Number of sessions counted for the peer.
pub session_count: u64,
/// Cooperation score; defaults to 0.8.
pub cooperation_score: f64,
}
impl Default for PeerReputation {
    /// Starting reputation for a newly seen peer: neutral reputation score,
    /// balanced reciprocity, no traffic, tokens or sessions recorded, and a
    /// join time equal to the current swarm time.
    fn default() -> Self {
        let joined_at = swarm_time_now();
        Self {
            reputation_score: 0.5,
            bytes_uploaded: 0,
            bytes_downloaded: 0,
            reciprocity_ratio: 1.0,
            tokens_earned: 0,
            tokens_spent: 0,
            joined_at,
            session_count: 0,
            cooperation_score: 0.8,
        }
    }
}
/// Measurements describing a network path; serializable through serde.
/// Not consumed by the code visible in this file, so units and ranges below
/// are assumptions to confirm against the producer of these values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathQuality {
/// Round-trip time of the path.
pub rtt: Duration,
/// Packet loss; defaults to 0.01 (presumably a fraction in `[0, 1]` — TODO confirm).
pub packet_loss: f64,
/// Path bandwidth; defaults to 10_000_000.0 (units not shown here — TODO confirm).
pub bandwidth: f64,
/// Jitter of the path.
pub jitter: Duration,
/// Number of hops on the path.
pub hop_count: u32,
/// Path stability; defaults to 0.9 (presumably in `[0, 1]` — TODO confirm).
pub stability: f64,
}
impl Default for PathQuality {
    /// Baseline path assumed before any measurement exists: 50 ms round trip,
    /// 5 ms jitter, 0.01 packet loss, 10 hops and a stability of 0.9.
    fn default() -> Self {
        let millis = Duration::from_millis;
        Self {
            rtt: millis(50),
            packet_loss: 0.01,
            bandwidth: 10_000_000.0,
            jitter: millis(5),
            hop_count: 10,
            stability: 0.9,
        }
    }
}
/// Result of `PeerSelector::calculate_peer_score`: the weighted composite
/// plus the individual components it was built from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerScore {
/// Weighted sum of the four component scores, limited to at most 1.0 and at least 0.0.
pub composite_score: f64,
/// Download speed divided by 10_000_000.0, capped at 1.0.
pub speed_score: f64,
/// Measured success ratio, or the peer's reliability estimate when no
/// transfers have been counted.
pub reliability_score: f64,
/// `1000 / (latency_ms + 100)`, capped at 1.0.
pub latency_score: f64,
/// Reputation component taken from the peer's `PeerReputation::reputation_score`.
pub reputation_score: f64,
/// Time at which the score was computed (`swarm_time_now()`).
pub calculated_at: Time,
}
impl PeerSelector {
pub fn new() -> Self {
Self {
quality_history: HashMap::new(),
selection_config: PeerSelectionConfig::default(),
}
}
pub fn with_config(config: PeerSelectionConfig) -> Self {
Self {
quality_history: HashMap::new(),
selection_config: config,
}
}
pub fn select_peers(
&self,
candidates: &[SwarmPeer],
max_peers: usize,
quality_threshold: f64,
) -> SwarmResult<Vec<SwarmPeer>> {
if candidates.is_empty() {
return Err(SwarmError::NoPeersAvailable {
details: "No candidate peers provided".to_string(),
});
}
let qualified_peers: Vec<&SwarmPeer> = candidates
.iter()
.filter(|peer| peer.quality.overall_score >= quality_threshold)
.collect();
if qualified_peers.is_empty() {
return Err(SwarmError::NoPeersAvailable {
details: format!(
"No peers meet quality threshold {} (best: {})",
quality_threshold,
candidates
.iter()
.map(|p| p.quality.overall_score)
.fold(0.0_f64, f64::max)
),
});
}
let mut scored_peers: Vec<(f64, &SwarmPeer)> = qualified_peers
.into_iter()
.map(|peer| {
let score = self.calculate_peer_score(peer);
(score.composite_score, peer)
})
.collect();
scored_peers.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
let selected = scored_peers
.into_iter()
.take(max_peers)
.map(|(_, peer)| peer.clone())
.collect();
Ok(selected)
}
pub fn select_peer_for_piece(
&self,
piece_id: &PieceId,
available_peers: &HashMap<PeerId, SwarmPeer>,
active_loads: &HashMap<PeerId, usize>,
) -> SwarmResult<PeerId> {
let mut candidates = Vec::new();
for (peer_id, peer) in available_peers {
if peer.available_pieces.contains(piece_id) {
let active_count =
active_loads.get(peer_id).copied().unwrap_or(0) + peer.pending_requests.len();
if active_count < peer.capabilities.max_concurrent_uploads {
let score = self.calculate_peer_score(peer);
let load_headroom: f64 = 1.0
- (active_count as f64 / peer.capabilities.max_concurrent_uploads as f64);
candidates.push((
score.composite_score * 0.85 + load_headroom.clamp(0.0, 1.0) * 0.15,
peer_id.clone(),
));
}
}
}
if candidates.is_empty() {
return Err(SwarmError::NoPeersAvailable {
details: format!("No peers available for piece {}", piece_id.as_u64()),
});
}
candidates.sort_by(|a, b| {
b.0.partial_cmp(&a.0)
.unwrap_or(std::cmp::Ordering::Equal)
.then_with(|| a.1.as_str().cmp(b.1.as_str()))
});
Ok(candidates[0].1.clone())
}
pub fn calculate_peer_score(&self, peer: &SwarmPeer) -> PeerScore {
let config = &self.selection_config;
let speed_score = (peer.quality.download_speed / 10_000_000.0).min(1.0);
let total_transfers = peer.quality.successful_transfers + peer.quality.failed_transfers;
let reliability_score = if total_transfers > 0 {
peer.quality.successful_transfers as f64 / total_transfers as f64
} else {
peer.quality.reliability
};
let latency_ms = peer.quality.avg_response_time.as_millis() as f64;
let latency_score = (1000.0 / (latency_ms + 100.0)).min(1.0);
let reputation_score = peer.reputation.reputation_score;
let composite_score = (speed_score * config.speed_weight)
+ (reliability_score * config.reliability_weight)
+ (latency_score * config.latency_weight)
+ (reputation_score * config.reputation_weight);
PeerScore {
composite_score: composite_score.clamp(0.0, 1.0),
speed_score,
reliability_score,
latency_score,
reputation_score,
calculated_at: swarm_time_now(),
}
}
pub fn update_peer_quality(
&mut self,
peer_id: &PeerId,
download_speed: f64,
latency: Duration,
success: bool,
) {
let evaluation = QualityEvaluation {
timestamp: swarm_time_now(),
download_speed,
latency,
reliability: if success { 1.0 } else { 0.0 },
quality_score: self.calculate_quality_score(download_speed, latency, success),
};
let history = self
.quality_history
.entry(peer_id.clone())
.or_insert_with(|| QualityHistory {
evaluations: Vec::new(),
reputation_score: 0.5,
successful_transfers: 0,
failed_transfers: 0,
});
history.evaluations.push(evaluation);
if success {
history.successful_transfers += 1;
} else {
history.failed_transfers += 1;
}
let cutoff_time = Time::from_nanos(
swarm_time_now()
.as_nanos()
.saturating_sub(self.selection_config.max_evaluation_age.as_nanos() as u64),
);
history
.evaluations
.retain(|eval| eval.timestamp > cutoff_time);
history.reputation_score = Self::calculate_reputation_score(history);
}
fn calculate_quality_score(
&self,
download_speed: f64,
latency: Duration,
success: bool,
) -> f64 {
let speed_factor = (download_speed / 1_000_000.0).min(1.0); let latency_factor = (1000.0 / (latency.as_millis() as f64 + 100.0)).min(1.0);
let success_factor = if success { 1.0 } else { 0.1 };
(speed_factor * 0.4 + latency_factor * 0.3 + success_factor * 0.3).clamp(0.0, 1.0)
}
fn calculate_reputation_score(history: &QualityHistory) -> f64 {
if history.evaluations.is_empty() {
return history.reputation_score;
}
let recent_quality: f64 = history
.evaluations
.iter()
.map(|eval| {
let speed_score = (eval.download_speed / 1_000_000.0).clamp(0.0, 1.0);
let latency_score = (1.0 / (1.0 + eval.latency.as_secs_f64())).clamp(0.0, 1.0);
eval.quality_score * 0.50
+ speed_score * 0.20
+ latency_score * 0.15
+ eval.reliability.clamp(0.0, 1.0) * 0.15
})
.sum::<f64>()
/ history.evaluations.len() as f64;
let total_transfers = history.successful_transfers + history.failed_transfers;
let success_rate = if total_transfers > 0 {
history.successful_transfers as f64 / total_transfers as f64
} else {
0.5
};
(recent_quality * 0.6 + success_rate * 0.4).clamp(0.0, 1.0)
}
pub fn get_quality_history(&self, peer_id: &PeerId) -> Option<&QualityHistory> {
self.quality_history.get(peer_id)
}
pub fn cleanup_old_data(&mut self) {
let cutoff_time = Time::from_nanos(swarm_time_now().as_nanos().saturating_sub(
(self.selection_config.max_evaluation_age.as_nanos() as u64).saturating_mul(2),
));
for history in self.quality_history.values_mut() {
history
.evaluations
.retain(|eval| eval.timestamp > cutoff_time);
}
self.quality_history
.retain(|_, history| !history.evaluations.is_empty());
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::atp::swarm::PeerCapabilities;
    use std::collections::BTreeSet;

    /// Builds a peer with the given id, overall quality score and download
    /// speed; every other attribute uses its default.
    fn create_test_peer(id: &str, quality_score: f64, download_speed: f64) -> SwarmPeer {
        let quality = PeerQuality {
            overall_score: quality_score,
            download_speed,
            ..Default::default()
        };
        SwarmPeer {
            peer_id: PeerId::new(id),
            endpoint: "127.0.0.1:8080".parse().unwrap(),
            available_pieces: BTreeSet::new(),
            quality,
            reputation: PeerReputation::default(),
            last_seen: swarm_time_now(),
            pending_requests: BTreeSet::new(),
            capabilities: PeerCapabilities::default(),
        }
    }

    /// A fresh selector has no recorded history.
    #[test]
    fn test_peer_selector_creation() {
        let selector = PeerSelector::new();
        assert!(selector.quality_history.is_empty());
    }

    /// Selecting from an empty candidate list is an error.
    #[test]
    fn test_select_peers_empty_candidates() {
        let selector = PeerSelector::new();
        let outcome = selector.select_peers(&[], 5, 0.5);
        assert!(outcome.is_err());
    }

    /// Peers below the quality threshold are excluded from the selection.
    #[test]
    fn test_select_peers_quality_filtering() {
        let selector = PeerSelector::new();
        let candidates = [
            create_test_peer("peer1", 0.8, 1000000.0),
            create_test_peer("peer2", 0.3, 500000.0),
            create_test_peer("peer3", 0.7, 2000000.0),
        ];

        let selected = selector.select_peers(&candidates, 5, 0.5).unwrap();
        let was_selected = |id: &str| selected.iter().any(|p| p.peer_id.as_str() == id);

        assert_eq!(selected.len(), 2);
        assert!(was_selected("peer1"));
        assert!(was_selected("peer3"));
        assert!(!was_selected("peer2"));
    }

    /// The selection never exceeds the requested number of peers.
    #[test]
    fn test_select_peers_max_limit() {
        let selector = PeerSelector::new();
        let candidates = [
            create_test_peer("peer1", 0.9, 1000000.0),
            create_test_peer("peer2", 0.8, 1500000.0),
            create_test_peer("peer3", 0.7, 2000000.0),
        ];

        let selected = selector.select_peers(&candidates, 2, 0.5).unwrap();
        assert_eq!(selected.len(), 2);
    }

    /// The composite score lies in `(0, 1]` and the speed and reliability
    /// components are positive.
    #[test]
    fn test_calculate_peer_score() {
        let selector = PeerSelector::new();
        let score = selector.calculate_peer_score(&create_test_peer("test", 0.8, 5000000.0));

        assert!(score.composite_score > 0.0);
        assert!(score.composite_score <= 1.0);
        assert!(score.speed_score > 0.0);
        assert!(score.reliability_score > 0.0);
    }

    /// A successful transfer creates a history with one evaluation and one
    /// counted success.
    #[test]
    fn test_update_peer_quality() {
        let peer_id = PeerId::new("test-peer");
        let mut selector = PeerSelector::new();
        selector.update_peer_quality(&peer_id, 1_000_000.0, Duration::from_millis(100), true);

        let history = selector.get_quality_history(&peer_id).unwrap();
        assert_eq!(history.evaluations.len(), 1);
        assert_eq!(history.successful_transfers, 1);
        assert_eq!(history.failed_transfers, 0);
    }

    /// Histories whose evaluations have all expired are removed by cleanup.
    #[test]
    fn test_cleanup_old_data() {
        let config = PeerSelectionConfig {
            max_evaluation_age: Duration::from_millis(100),
            ..Default::default()
        };
        let mut selector = PeerSelector::with_config(config);
        let peer_id = PeerId::new("test-peer");
        selector.update_peer_quality(&peer_id, 1_000_000.0, Duration::from_millis(100), true);

        // Wait for twice the maximum age so cleanup sees the evaluation as expired.
        std::thread::sleep(Duration::from_millis(200));
        selector.cleanup_old_data();

        assert!(selector.get_quality_history(&peer_id).is_none());
    }
}