use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Share of the summed selection rate above which the fairness filter skips a peer.
const SELECTION_PERCENTAGE_WARNING: f64 = 0.30;
/// Minimum number of peers required before the fairness filter is applied at all.
const SELECTION_MIN_PEERS: usize = 5;

/// Backoff applied after the first consecutive failure, in milliseconds.
const INITIAL_BACKOFF_MS: u64 = 1000;
/// Factor by which the backoff grows with each further backoff level.
const BACKOFF_MULTIPLIER: u64 = 2;
/// Upper bound for a single backoff period, in milliseconds.
const MAX_BACKOFF_MS: u64 = 480_000;

/// Lower clamp for the retransmission timeout (RTO).
const MIN_RTO_MS: u64 = 50;
/// Upper clamp for the retransmission timeout (RTO).
const MAX_RTO_MS: u64 = 60_000;
/// RTO assigned to a peer before any round-trip sample exists.
const INITIAL_RTO_MS: u64 = 1000;

/// Format version written to, and required from, a `PeerMetadataSnapshot`.
pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
/// Serializable per-principal record of a peer's long-lived statistics.
///
/// Carries the counters and latency estimates that `from_stats` copies out of a
/// `PeerStats`. Backoff state and timestamps are not part of this record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct PersistedPeerMetadata {
/// Key under which the record is stored; callers in this file derive it with `peer_principal`.
pub principal: String,
/// Number of requests sent to the peer.
pub requests_sent: u64,
/// Number of successful responses recorded.
pub successes: u64,
/// Number of timeouts recorded.
pub timeouts: u64,
/// Number of non-timeout failures recorded.
pub failures: u64,
/// Smoothed round-trip time estimate (milliseconds, per the field name).
pub srtt_ms: f64,
/// Round-trip time variation estimate (milliseconds, per the field name).
pub rttvar_ms: f64,
/// Retransmission timeout derived from the two estimates above.
pub rto_ms: u64,
/// Total bytes received from the peer.
pub bytes_received: u64,
/// Total bytes sent to the peer.
pub bytes_sent: u64,
/// Running total passed to `record_cashu_payment` (presumably satoshis, per the `_sat` suffix).
pub cashu_paid_sat: u64,
/// Running total passed to `record_cashu_receipt` (presumably satoshis, per the `_sat` suffix).
pub cashu_received_sat: u64,
/// Number of non-zero receipts recorded.
pub cashu_payment_receipts: u64,
/// Number of payment defaults recorded.
pub cashu_payment_defaults: u64,
}
impl PersistedPeerMetadata {
    /// Builds the persistable record for `principal` from live `stats`.
    ///
    /// Latency estimates go through `sanitize_latency` and the timeout through `clamp_rto`,
    /// so the stored values are always finite, non-negative and within the RTO bounds.
    fn from_stats(principal: String, stats: &PeerStats) -> Self {
        let srtt_ms = sanitize_latency(stats.srtt_ms);
        let rttvar_ms = sanitize_latency(stats.rttvar_ms);
        let rto_ms = clamp_rto(stats.rto_ms);
        Self {
            principal,
            srtt_ms,
            rttvar_ms,
            rto_ms,
            requests_sent: stats.requests_sent,
            successes: stats.successes,
            timeouts: stats.timeouts,
            failures: stats.failures,
            bytes_received: stats.bytes_received,
            bytes_sent: stats.bytes_sent,
            cashu_paid_sat: stats.cashu_paid_sat,
            cashu_received_sat: stats.cashu_received_sat,
            cashu_payment_receipts: stats.cashu_payment_receipts,
            cashu_payment_defaults: stats.cashu_payment_defaults,
        }
    }

    /// Overwrites `stats` with this record.
    ///
    /// Persisted counters and (re-sanitized) latency values replace the live ones, while
    /// all transient state — backoff level and deadline, RTO backoff streak, and the
    /// success/failure timestamps — is reset. `peer_id` and `connected_at` are untouched.
    fn apply_to_stats(&self, stats: &mut PeerStats) {
        // Transient state never survives a restore.
        stats.consecutive_rto_backoffs = 0;
        stats.backoff_level = 0;
        stats.backed_off_until = None;
        stats.last_success = None;
        stats.last_failure = None;

        // Request outcome counters.
        stats.requests_sent = self.requests_sent;
        stats.successes = self.successes;
        stats.timeouts = self.timeouts;
        stats.failures = self.failures;

        // Latency estimator, validated again because the record may come from disk.
        stats.srtt_ms = sanitize_latency(self.srtt_ms);
        stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
        stats.rto_ms = clamp_rto(self.rto_ms);

        // Traffic and payment accounting.
        stats.bytes_received = self.bytes_received;
        stats.bytes_sent = self.bytes_sent;
        stats.cashu_paid_sat = self.cashu_paid_sat;
        stats.cashu_received_sat = self.cashu_received_sat;
        stats.cashu_payment_receipts = self.cashu_payment_receipts;
        stats.cashu_payment_defaults = self.cashu_payment_defaults;
    }
}
/// Versioned, serializable collection of `PersistedPeerMetadata` records.
///
/// Produced by `PeerSelector::export_peer_metadata_snapshot` and consumed by
/// `PeerSelector::import_peer_metadata_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PeerMetadataSnapshot {
/// Snapshot format version; import ignores snapshots whose version differs from
/// `PEER_METADATA_SNAPSHOT_VERSION`.
pub version: u32,
/// One record per principal (export sorts them by principal).
pub peers: Vec<PersistedPeerMetadata>,
}
impl Default for PeerMetadataSnapshot {
    /// Returns an empty snapshot stamped with the current format version.
    fn default() -> Self {
        let version = PEER_METADATA_SNAPSHOT_VERSION;
        let peers = Vec::new();
        Self { version, peers }
    }
}
/// Returns `value` when it is a finite, non-negative number and `0.0` otherwise.
///
/// NaN, infinities and negative values all collapse to `0.0`, which the rest of the
/// file treats as "no latency sample yet".
fn sanitize_latency(value: f64) -> f64 {
    let usable = value.is_finite() && value >= 0.0;
    if !usable {
        return 0.0;
    }
    value
}
/// Normalizes a retransmission timeout.
///
/// Zero is taken to mean "unset" and maps to `INITIAL_RTO_MS`; every other value is
/// clamped into `MIN_RTO_MS..=MAX_RTO_MS`.
fn clamp_rto(rto_ms: u64) -> u64 {
    match rto_ms {
        0 => INITIAL_RTO_MS,
        configured => configured.clamp(MIN_RTO_MS, MAX_RTO_MS),
    }
}
/// Extracts the principal portion of a peer id.
///
/// The principal is everything before the first `:`; an id without a `:` is returned
/// unchanged.
pub fn peer_principal(peer_id: &str) -> &str {
    match peer_id.split_once(':') {
        Some((principal, _rest)) => principal,
        None => peer_id,
    }
}
/// Live, in-memory statistics for a single connected peer.
///
/// Tracks request outcomes, a smoothed RTT estimator with its retransmission timeout,
/// failure backoff state, byte counters and cashu payment accounting.
#[derive(Debug, Clone)]
pub struct PeerStats {
/// Full peer id as passed to `PeerStats::new`.
pub peer_id: String,
/// Instant at which this record was created; basis for `selection_rate`.
pub connected_at: Instant,
/// Number of requests sent (incremented by `record_request`).
pub requests_sent: u64,
/// Number of successful responses (incremented by `record_success`).
pub successes: u64,
/// Number of timeouts (incremented by `record_timeout`).
pub timeouts: u64,
/// Number of non-timeout failures (incremented by `record_failure`).
pub failures: u64,
/// Smoothed round-trip time; `0.0` means no sample has been recorded yet.
pub srtt_ms: f64,
/// Round-trip time variation estimate.
pub rttvar_ms: f64,
/// Current retransmission timeout, kept within `MIN_RTO_MS..=MAX_RTO_MS` by the recorders.
pub rto_ms: u64,
/// Number of timeout-driven RTO doublings since the last success (capped at 5).
pub consecutive_rto_backoffs: u32,
/// Number of backoffs applied since the last success.
pub backoff_level: u32,
/// Instant until which the peer is considered backed off, if any.
pub backed_off_until: Option<Instant>,
/// Instant of the most recent success, if any.
pub last_success: Option<Instant>,
/// Instant of the most recent timeout, failure or payment default, if any.
pub last_failure: Option<Instant>,
/// Total bytes received from the peer.
pub bytes_received: u64,
/// Total bytes sent to the peer.
pub bytes_sent: u64,
/// Running total passed to `record_cashu_payment` (presumably satoshis, per the `_sat` suffix).
pub cashu_paid_sat: u64,
/// Running total passed to `record_cashu_receipt` (presumably satoshis, per the `_sat` suffix).
pub cashu_received_sat: u64,
/// Number of non-zero receipts recorded.
pub cashu_payment_receipts: u64,
/// Number of payment defaults recorded.
pub cashu_payment_defaults: u64,
}
impl PeerStats {
    /// Creates statistics for a newly seen peer.
    ///
    /// All counters start at zero, the latency estimator is unset (`0.0`), the
    /// retransmission timeout starts at `INITIAL_RTO_MS`, and `connected_at` is "now".
    pub fn new(peer_id: impl Into<String>) -> Self {
        Self {
            peer_id: peer_id.into(),
            connected_at: Instant::now(),
            requests_sent: 0,
            successes: 0,
            timeouts: 0,
            failures: 0,
            srtt_ms: 0.0,
            rttvar_ms: 0.0,
            rto_ms: INITIAL_RTO_MS,
            consecutive_rto_backoffs: 0,
            backoff_level: 0,
            backed_off_until: None,
            last_success: None,
            last_failure: None,
            bytes_received: 0,
            bytes_sent: 0,
            cashu_paid_sat: 0,
            cashu_received_sat: 0,
            cashu_payment_receipts: 0,
            cashu_payment_defaults: 0,
        }
    }

    /// Fraction of sent requests that succeeded.
    ///
    /// Returns the neutral value `0.5` while no request has been sent.
    pub fn success_rate(&self) -> f64 {
        if self.requests_sent == 0 {
            return 0.5;
        }
        self.successes as f64 / self.requests_sent as f64
    }

    /// Requests per second since `connected_at`.
    ///
    /// Returns `0.0` during the first ten seconds so that very young connections do not
    /// produce inflated rates.
    pub fn selection_rate(&self) -> f64 {
        let elapsed = self.connected_at.elapsed();
        if elapsed.as_secs() < 10 {
            return 0.0;
        }
        self.requests_sent as f64 / elapsed.as_secs_f64()
    }

    /// Returns `true` while the backoff deadline lies in the future.
    pub fn is_backed_off(&self) -> bool {
        self.backed_off_until
            .map_or(false, |until| Instant::now() < until)
    }

    /// Time left until the backoff deadline, or `Duration::ZERO` when not backed off.
    pub fn backoff_remaining(&self) -> Duration {
        self.backed_off_until.map_or(Duration::ZERO, |until| {
            until.saturating_duration_since(Instant::now())
        })
    }

    /// Records that a request of `bytes` bytes was sent to the peer.
    pub fn record_request(&mut self, bytes: u64) {
        // Saturate instead of overflowing, like the cashu counters do.
        self.requests_sent = self.requests_sent.saturating_add(1);
        self.bytes_sent = self.bytes_sent.saturating_add(bytes);
    }

    /// Records a successful response of `bytes` bytes that took `rtt_ms` milliseconds.
    ///
    /// Clears all backoff state and feeds the sample into the smoothed RTT estimator
    /// (gains 1/8 for SRTT and 1/4 for RTTVAR), then recomputes the RTO as
    /// `srtt + max(20, 4 * rttvar)` clamped to `MIN_RTO_MS..=MAX_RTO_MS`.
    pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
        self.successes = self.successes.saturating_add(1);
        self.bytes_received = self.bytes_received.saturating_add(bytes);
        self.last_success = Some(Instant::now());
        self.consecutive_rto_backoffs = 0;
        self.backed_off_until = None;
        self.backoff_level = 0;

        let rtt = rtt_ms as f64;
        if self.srtt_ms == 0.0 {
            // First sample: seed the estimator directly.
            self.srtt_ms = rtt;
            self.rttvar_ms = rtt / 2.0;
        } else {
            // RTTVAR must be updated with the previous SRTT, so it goes first.
            self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
            self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
        }
        let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
        self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
    }

    /// Records a timed-out request.
    ///
    /// Applies a backoff step and, for at most five consecutive timeouts, doubles the
    /// RTO (capped at `MAX_RTO_MS`).
    pub fn record_timeout(&mut self) {
        self.timeouts = self.timeouts.saturating_add(1);
        self.last_failure = Some(Instant::now());
        self.apply_backoff();
        if self.consecutive_rto_backoffs < 5 {
            // `rto_ms` is a public field, so guard the doubling against overflow.
            self.rto_ms = self.rto_ms.saturating_mul(2).min(MAX_RTO_MS);
            self.consecutive_rto_backoffs += 1;
        }
    }

    /// Records a failed (non-timeout) request and applies a backoff step.
    pub fn record_failure(&mut self) {
        self.failures = self.failures.saturating_add(1);
        self.last_failure = Some(Instant::now());
        self.apply_backoff();
    }

    /// Adds `amount_sat` to the paid total; a zero amount is ignored.
    pub fn record_cashu_payment(&mut self, amount_sat: u64) {
        if amount_sat == 0 {
            return;
        }
        self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
    }

    /// Adds `amount_sat` to the received total and counts one receipt; a zero amount
    /// is ignored.
    pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
        if amount_sat == 0 {
            return;
        }
        self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
        self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
    }

    /// Counts one payment default, marks it as a failure time and applies a backoff step.
    pub fn record_cashu_payment_default(&mut self) {
        self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
        self.last_failure = Some(Instant::now());
        self.apply_backoff();
    }

    /// Raises the backoff level by one and sets the backoff deadline.
    ///
    /// The delay is `INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER^(level - 1)`, capped at
    /// `MAX_BACKOFF_MS`. The arithmetic is checked: once the exponential no longer fits
    /// into a `u64` the cap is used, instead of panicking (debug) or wrapping around to
    /// a bogus short delay (release) after a long failure streak.
    fn apply_backoff(&mut self) {
        self.backoff_level = self.backoff_level.saturating_add(1);
        let exponent = self.backoff_level - 1;
        let backoff_ms = BACKOFF_MULTIPLIER
            .checked_pow(exponent)
            .and_then(|factor| INITIAL_BACKOFF_MS.checked_mul(factor))
            .map_or(MAX_BACKOFF_MS, |ms| ms.min(MAX_BACKOFF_MS));
        self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
    }

    /// General-purpose quality score used by the weighted strategy.
    ///
    /// `0.6 * success_rate + 0.3 * rtt_score + 0.1 * (1 + recency_bonus)`, where the RTT
    /// score is `min(1, 500 / (srtt + 50))` (or `0.5` without samples) and the recency
    /// bonus is `0.1` when the last success is less than 60 seconds old.
    pub fn score(&self) -> f64 {
        let success_score = self.success_rate();
        let rtt_score = if self.srtt_ms <= 0.0 {
            0.5
        } else {
            (500.0 / (self.srtt_ms + 50.0)).min(1.0)
        };
        let recency_bonus = match self.last_success {
            Some(last) if last.elapsed().as_secs_f64() < 60.0 => 0.1,
            _ => 0.0,
        };
        0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
    }

    /// Utility score with an upper-confidence-bound style exploration bonus.
    ///
    /// The exploitation part mixes a smoothed good/bad outcome ratio (55%), a latency
    /// score (25%) and a received/sent byte ratio capped at 1 (20%). The exploration
    /// bonus is `0.20 * sqrt(ln(total_requests + 1) / (requests_sent + 1))`, so peers
    /// with few requests relative to `total_requests` score higher.
    pub fn utility_score(&self, total_requests: u64) -> f64 {
        // Add-one smoothing keeps the ratio defined for fresh peers.
        let good = self.successes as f64 + 1.0;
        let bad = (self.failures + self.timeouts) as f64 + 1.0;
        let ratio = good / bad;
        let ratio_score = ratio / (1.0 + ratio);
        let latency_score = if self.srtt_ms <= 0.0 {
            0.5
        } else {
            (300.0 / (self.srtt_ms + 50.0)).min(1.0)
        };
        let efficiency_score = if self.bytes_sent == 0 {
            0.5
        } else {
            (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
        };
        let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
        let uncertainty =
            (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
        let exploration_bonus = 0.20 * uncertainty;
        exploitation + exploration_bonus
    }

    /// Cooperation-oriented score used by the tit-for-tat strategy.
    ///
    /// Combines smoothed reliability (65%), byte reciprocity weighted by how many
    /// successes back it (25%) and an RTT score (10%), adds a small exploration term,
    /// and subtracts a penalty built from the timeout rate, the failure rate and the
    /// current backoff level (penalty capped at 0.95). The result is never negative.
    pub fn tit_for_tat_score(&self, total_requests: u64) -> f64 {
        let reliability = (self.successes as f64 + 1.0) / (self.requests_sent as f64 + 2.0);
        let reciprocity_raw = if self.bytes_sent == 0 {
            1.0
        } else {
            self.bytes_received as f64 / self.bytes_sent as f64
        };
        let reciprocity_ratio = reciprocity_raw / (1.0 + reciprocity_raw);
        // With few successes the reciprocity estimate is pulled towards the neutral 0.5.
        let reciprocity_confidence = self.successes as f64 / (self.successes as f64 + 4.0);
        let reciprocity =
            (1.0 - reciprocity_confidence) * 0.5 + reciprocity_confidence * reciprocity_ratio;
        let rtt_score = if self.srtt_ms <= 0.0 {
            0.5
        } else {
            (400.0 / (self.srtt_ms + 50.0)).min(1.0)
        };
        let timeout_rate = if self.requests_sent == 0 {
            0.0
        } else {
            self.timeouts as f64 / self.requests_sent as f64
        };
        let failure_rate = if self.requests_sent == 0 {
            0.0
        } else {
            self.failures as f64 / self.requests_sent as f64
        };
        let retaliation_penalty =
            (0.60 * timeout_rate + 0.45 * failure_rate + 0.10 * self.backoff_level as f64)
                .min(0.95);
        let cooperative = 0.65 * reliability + 0.25 * reciprocity + 0.10 * rtt_score;
        let exploration = 0.03
            * (((total_requests as f64) + 2.0).ln() / ((self.requests_sent as f64) + 2.0)).sqrt();
        (cooperative + exploration - retaliation_penalty).max(0.0)
    }

    /// Payment-based priority in `[0, 1)`: `paid / (paid + 32)`, or `0.0` when nothing
    /// has been paid.
    pub fn cashu_priority_boost(&self) -> f64 {
        if self.cashu_paid_sat == 0 {
            return 0.0;
        }
        let paid = self.cashu_paid_sat as f64;
        paid / (paid + 32.0)
    }

    /// Multiplier in `(0, 1]` reflecting payment behaviour.
    ///
    /// `(receipts + 1) / (receipts + defaults + 1)`; exactly `1.0` when neither receipts
    /// nor defaults have been recorded.
    pub fn payment_reliability_multiplier(&self) -> f64 {
        if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
            return 1.0;
        }
        (self.cashu_payment_receipts as f64 + 1.0)
            / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
    }

    /// Returns `true` when `threshold` is non-zero and the number of recorded payment
    /// defaults has reached it. A zero threshold disables the check.
    pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
        threshold > 0 && self.cashu_payment_defaults >= threshold
    }
}
/// Ordering policy applied by `PeerSelector::select_peers` to the candidate peers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SelectionStrategy {
/// Highest `PeerStats::score` first, blended with payment priority (the default).
#[default]
Weighted,
/// Rotates the candidate list by an index that advances on every selection.
RoundRobin,
/// Applies no ordering: candidates are returned in the order they were collected from
/// the internal map. NOTE(review): no explicit shuffling is performed.
Random,
/// Lowest `srtt_ms` first; ties fall back to the higher `score`, then to the peer id.
LowestLatency,
/// Highest `PeerStats::success_rate` first; ties fall back to the peer id.
HighestSuccessRate,
/// Highest `PeerStats::tit_for_tat_score` first, blended with payment priority.
TitForTat,
/// Highest `PeerStats::utility_score` first, blended with payment priority.
UtilityUcb,
}
#[derive(Debug, Default)]
pub struct PeerSelector {
stats: HashMap<String, PeerStats>,
persisted_metadata: HashMap<String, PersistedPeerMetadata>,
strategy: SelectionStrategy,
fairness_enabled: bool,
round_robin_idx: usize,
cashu_payment_weight: f64,
}
impl PeerSelector {
pub fn new() -> Self {
Self {
stats: HashMap::new(),
persisted_metadata: HashMap::new(),
strategy: SelectionStrategy::Weighted,
fairness_enabled: true,
round_robin_idx: 0,
cashu_payment_weight: 0.0,
}
}
pub fn with_strategy(strategy: SelectionStrategy) -> Self {
Self {
stats: HashMap::new(),
persisted_metadata: HashMap::new(),
strategy,
fairness_enabled: true,
round_robin_idx: 0,
cashu_payment_weight: 0.0,
}
}
pub fn set_fairness(&mut self, enabled: bool) {
self.fairness_enabled = enabled;
}
pub fn set_cashu_payment_weight(&mut self, weight: f64) {
self.cashu_payment_weight = weight.clamp(0.0, 1.0);
}
pub fn add_peer(&mut self, peer_id: impl Into<String>) {
let peer_id = peer_id.into();
if self.stats.contains_key(&peer_id) {
return;
}
let mut stats = PeerStats::new(peer_id.clone());
if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
saved.apply_to_stats(&mut stats);
}
self.stats.insert(peer_id, stats);
}
pub fn remove_peer(&mut self, peer_id: &str) {
if let Some(stats) = self.stats.remove(peer_id) {
let principal = peer_principal(&stats.peer_id).to_string();
self.persisted_metadata.insert(
principal.clone(),
PersistedPeerMetadata::from_stats(principal, &stats),
);
}
}
pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
self.stats.get(peer_id)
}
pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
self.stats.get_mut(peer_id)
}
pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
self.stats.values()
}
pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
if let Some(stats) = self.stats.get_mut(peer_id) {
stats.record_request(bytes);
}
}
pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
if let Some(stats) = self.stats.get_mut(peer_id) {
stats.record_success(rtt_ms, bytes);
}
}
pub fn record_timeout(&mut self, peer_id: &str) {
if let Some(stats) = self.stats.get_mut(peer_id) {
stats.record_timeout();
}
}
pub fn record_failure(&mut self, peer_id: &str) {
if let Some(stats) = self.stats.get_mut(peer_id) {
stats.record_failure();
}
}
pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
if amount_sat == 0 {
return;
}
let entry = self
.stats
.entry(peer_id.to_string())
.or_insert_with(|| PeerStats::new(peer_id.to_string()));
entry.record_cashu_payment(amount_sat);
}
pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
if amount_sat == 0 {
return;
}
let entry = self
.stats
.entry(peer_id.to_string())
.or_insert_with(|| PeerStats::new(peer_id.to_string()));
entry.record_cashu_receipt(amount_sat);
}
pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
let entry = self
.stats
.entry(peer_id.to_string())
.or_insert_with(|| PeerStats::new(peer_id.to_string()));
entry.record_cashu_payment_default();
}
pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
self.stats
.get(peer_id)
.map(|stats| stats.exceeds_payment_default_threshold(threshold))
.unwrap_or(false)
}
fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
let reliable_base = base_score * stats.payment_reliability_multiplier();
if self.cashu_payment_weight <= 0.0 {
return reliable_base;
}
let payment_score = stats.cashu_priority_boost();
(1.0 - self.cashu_payment_weight) * reliable_base
+ self.cashu_payment_weight * payment_score
}
fn available_peers(&self) -> Vec<String> {
self.stats
.iter()
.filter(|(_, s)| !s.is_backed_off())
.map(|(id, _)| id.clone())
.collect()
}
#[cfg(test)]
fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
self.should_skip_for_fairness_with_total(peer_id, total_rate)
}
fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
return false;
}
if let Some(stats) = self.stats.get(peer_id) {
let peer_rate = stats.selection_rate();
let proportion = peer_rate / total_rate;
return proportion > SELECTION_PERCENTAGE_WARNING;
}
false
}
pub fn select_peers(&mut self) -> Vec<String> {
let available = self.available_peers();
if available.is_empty() {
let mut backed_off: Vec<_> = self
.stats
.iter()
.filter(|(_, s)| s.is_backed_off())
.map(|(id, s)| (id.clone(), s.backoff_remaining()))
.collect();
backed_off.sort_by_key(|(_, remaining)| *remaining);
return backed_off.into_iter().map(|(id, _)| id).collect();
}
let candidates: Vec<String> =
if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
available
.into_iter()
.filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
.collect()
} else {
available
};
let candidates = if candidates.is_empty() {
self.available_peers()
} else {
candidates
};
let mut sorted: Vec<_> = candidates
.into_iter()
.filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
.collect();
match self.strategy {
SelectionStrategy::Weighted => {
sorted.sort_by(|(id_a, a), (id_b, b)| {
let score_a = self.blend_with_payment_priority(a, a.score());
let score_b = self.blend_with_payment_priority(b, b.score());
let score_cmp = score_b
.partial_cmp(&score_a)
.unwrap_or(std::cmp::Ordering::Equal);
if score_cmp == std::cmp::Ordering::Equal {
id_a.cmp(id_b) } else {
score_cmp
}
});
}
SelectionStrategy::LowestLatency => {
sorted.sort_by(|(id_a, a), (id_b, b)| {
let rtt_cmp = a
.srtt_ms
.partial_cmp(&b.srtt_ms)
.unwrap_or(std::cmp::Ordering::Equal);
if rtt_cmp == std::cmp::Ordering::Equal {
let score_cmp = b
.score()
.partial_cmp(&a.score())
.unwrap_or(std::cmp::Ordering::Equal);
if score_cmp == std::cmp::Ordering::Equal {
id_a.cmp(id_b)
} else {
score_cmp
}
} else {
rtt_cmp
}
});
}
SelectionStrategy::HighestSuccessRate => {
sorted.sort_by(|(id_a, a), (id_b, b)| {
let rate_cmp = b
.success_rate()
.partial_cmp(&a.success_rate())
.unwrap_or(std::cmp::Ordering::Equal);
if rate_cmp == std::cmp::Ordering::Equal {
id_a.cmp(id_b)
} else {
rate_cmp
}
});
}
SelectionStrategy::TitForTat => {
let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
sorted.sort_by(|(id_a, a), (id_b, b)| {
let score_a =
self.blend_with_payment_priority(a, a.tit_for_tat_score(total_requests));
let score_b =
self.blend_with_payment_priority(b, b.tit_for_tat_score(total_requests));
let score_cmp = score_b
.partial_cmp(&score_a)
.unwrap_or(std::cmp::Ordering::Equal);
if score_cmp == std::cmp::Ordering::Equal {
id_a.cmp(id_b)
} else {
score_cmp
}
});
}
SelectionStrategy::UtilityUcb => {
let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
sorted.sort_by(|(id_a, a), (id_b, b)| {
let score_a =
self.blend_with_payment_priority(a, a.utility_score(total_requests));
let score_b =
self.blend_with_payment_priority(b, b.utility_score(total_requests));
let score_cmp = score_b
.partial_cmp(&score_a)
.unwrap_or(std::cmp::Ordering::Equal);
if score_cmp == std::cmp::Ordering::Equal {
id_a.cmp(id_b)
} else {
score_cmp
}
});
}
SelectionStrategy::RoundRobin => {
if !sorted.is_empty() {
let idx = self.round_robin_idx % sorted.len();
sorted.rotate_left(idx);
self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
}
}
SelectionStrategy::Random => {
}
}
sorted.into_iter().map(|(id, _)| id).collect()
}
pub fn select_best(&mut self) -> Option<String> {
self.select_peers().into_iter().next()
}
pub fn select_top(&mut self, n: usize) -> Vec<String> {
self.select_peers().into_iter().take(n).collect()
}
pub fn summary(&self) -> SelectorSummary {
let count = self.stats.len();
if count == 0 {
return SelectorSummary::default();
}
let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
let avg_rtt = {
let rtts: Vec<f64> = self
.stats
.values()
.filter(|s| s.srtt_ms > 0.0)
.map(|s| s.srtt_ms)
.collect();
if rtts.is_empty() {
0.0
} else {
rtts.iter().sum::<f64>() / rtts.len() as f64
}
};
SelectorSummary {
peer_count: count,
total_requests,
total_successes,
total_timeouts,
backed_off_count: backed_off,
avg_rtt_ms: avg_rtt,
overall_success_rate: if total_requests > 0 {
total_successes as f64 / total_requests as f64
} else {
0.0
},
}
}
pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
let mut by_principal = self.persisted_metadata.clone();
for stats in self.stats.values() {
let principal = peer_principal(&stats.peer_id).to_string();
by_principal.insert(
principal.clone(),
PersistedPeerMetadata::from_stats(principal, stats),
);
}
let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
peers.sort_by(|a, b| a.principal.cmp(&b.principal));
PeerMetadataSnapshot {
version: PEER_METADATA_SNAPSHOT_VERSION,
peers,
}
}
pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
return;
}
self.persisted_metadata.clear();
for peer in &snapshot.peers {
self.persisted_metadata
.insert(peer.principal.clone(), peer.clone());
}
for stats in self.stats.values_mut() {
if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
saved.apply_to_stats(stats);
}
}
}
}
/// Aggregate view over all peers tracked by a `PeerSelector`, as built by
/// `PeerSelector::summary`.
#[derive(Debug, Clone, Default)]
pub struct SelectorSummary {
/// Number of tracked peers.
pub peer_count: usize,
/// Sum of `requests_sent` over all peers.
pub total_requests: u64,
/// Sum of `successes` over all peers.
pub total_successes: u64,
/// Sum of `timeouts` over all peers.
pub total_timeouts: u64,
/// Number of peers that were backed off when the summary was taken.
pub backed_off_count: usize,
/// Mean `srtt_ms` over peers with a positive estimate; `0.0` if there are none.
pub avg_rtt_ms: f64,
/// `total_successes / total_requests`, or `0.0` when no request was sent.
pub overall_success_rate: f64,
}
#[cfg(test)]
mod tests {
    //! Unit tests for peer statistics, scoring, selection strategies, payment
    //! accounting and metadata snapshots.
    use super::*;

    #[test]
    fn test_peer_stats_success_rate() {
        let mut stats = PeerStats::new("peer1");
        // Neutral rate before any request.
        assert_eq!(stats.success_rate(), 0.5);
        stats.record_request(40);
        stats.record_success(50, 1024);
        assert_eq!(stats.success_rate(), 1.0);
        stats.record_request(40);
        stats.record_timeout();
        assert_eq!(stats.success_rate(), 0.5);
    }

    #[test]
    fn test_peer_stats_rtt_calculation() {
        let mut stats = PeerStats::new("peer1");
        stats.record_request(40);
        stats.record_success(100, 1024);
        // First sample seeds the estimator.
        assert_eq!(stats.srtt_ms, 100.0);
        assert_eq!(stats.rttvar_ms, 50.0);
        stats.record_request(40);
        stats.record_success(80, 1024);
        // 0.875 * 100 + 0.125 * 80 = 97.5
        assert!((stats.srtt_ms - 97.5).abs() < 0.1);
    }

    #[test]
    fn test_peer_stats_backoff() {
        let mut stats = PeerStats::new("peer1");
        assert!(!stats.is_backed_off());
        stats.record_timeout();
        assert!(stats.is_backed_off());
        assert!(stats.backoff_remaining() > Duration::ZERO);
    }

    #[test]
    fn test_peer_stats_backoff_clears_on_success() {
        let mut stats = PeerStats::new("peer1");
        stats.record_timeout();
        assert!(stats.is_backed_off());
        stats.record_success(50, 1024);
        assert!(!stats.is_backed_off());
        assert_eq!(stats.backoff_level, 0);
    }

    #[test]
    fn test_peer_selector_add_remove() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        assert!(selector.get_stats("peer1").is_some());
        assert!(selector.get_stats("peer2").is_some());
        selector.remove_peer("peer1");
        assert!(selector.get_stats("peer1").is_none());
        assert!(selector.get_stats("peer2").is_some());
    }

    #[test]
    fn test_peer_selector_weighted_selection() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.add_peer("peer3");
        // peer1: fast and fully reliable.
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 20, 1024);
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 25, 1024);
        // peer2: one success, one timeout.
        selector.record_request("peer2", 40);
        selector.record_success("peer2", 100, 1024);
        selector.record_request("peer2", 40);
        selector.record_timeout("peer2");
        // peer3: only timeouts.
        selector.record_request("peer3", 40);
        selector.record_timeout("peer3");
        selector.record_request("peer3", 40);
        selector.record_timeout("peer3");
        let peers = selector.select_peers();
        assert_eq!(peers[0], "peer1");
    }

    #[test]
    fn test_peer_selector_backed_off_peers() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.record_timeout("peer1");
        assert!(selector.get_stats("peer1").unwrap().is_backed_off());
        let peers = selector.select_peers();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0], "peer2");
    }

    #[test]
    fn test_peer_selector_all_backed_off_fallback() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.record_timeout("peer1");
        selector.record_timeout("peer2");
        // With everyone backed off, all peers are still offered.
        let peers = selector.select_peers();
        assert_eq!(peers.len(), 2);
    }

    #[test]
    fn test_peer_selector_fairness() {
        let mut selector = PeerSelector::new();
        selector.set_fairness(true);
        for i in 1..=6 {
            selector.add_peer(format!("peer{}", i));
        }
        // `selection_rate()` reports 0.0 during a peer's first ten seconds, so the
        // connections are backdated instead of sleeping. If the monotonic clock cannot
        // represent an instant that far back, the check cannot be set up at all.
        let connected_at = match Instant::now().checked_sub(Duration::from_secs(20)) {
            Some(instant) => instant,
            None => return,
        };
        for i in 1..=6 {
            selector
                .get_stats_mut(&format!("peer{}", i))
                .expect("peer was added")
                .connected_at = connected_at;
        }
        for _ in 0..100 {
            selector.record_request("peer1", 40);
            selector.record_success("peer1", 10, 100);
        }
        for i in 2..=6 {
            selector.record_request(&format!("peer{}", i), 40);
            selector.record_success(&format!("peer{}", i), 10, 100);
        }
        // peer1 accounts for 100 of 105 requests, far above the 30% limit.
        assert!(selector.should_skip_for_fairness("peer1"));
        assert!(!selector.should_skip_for_fairness("peer2"));
        let peers = selector.select_peers();
        assert_eq!(peers.len(), 5);
        assert!(!peers.iter().any(|peer| peer == "peer1"));
    }

    #[test]
    fn test_peer_selector_summary() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 50, 1024);
        selector.record_request("peer2", 40);
        selector.record_timeout("peer2");
        let summary = selector.summary();
        assert_eq!(summary.peer_count, 2);
        assert_eq!(summary.total_requests, 2);
        assert_eq!(summary.total_successes, 1);
        assert_eq!(summary.total_timeouts, 1);
        assert_eq!(summary.backed_off_count, 1);
        assert_eq!(summary.overall_success_rate, 0.5);
    }

    #[test]
    fn test_peer_stats_score() {
        let mut stats = PeerStats::new("peer1");
        let initial_score = stats.score();
        assert!(initial_score > 0.3 && initial_score < 0.7);
        for _ in 0..10 {
            stats.record_request(40);
            stats.record_success(20, 1024);
        }
        let good_score = stats.score();
        assert!(good_score > 0.8);
        let mut bad_stats = PeerStats::new("peer2");
        for _ in 0..10 {
            bad_stats.record_request(40);
            bad_stats.record_timeout();
        }
        let bad_score = bad_stats.score();
        assert!(bad_score < 0.3);
        assert!(good_score > bad_score);
    }

    #[test]
    fn test_peer_stats_utility_score_prefers_good_over_bad() {
        let mut good = PeerStats::new("good");
        good.requests_sent = 120;
        good.successes = 96;
        good.failures = 8;
        good.timeouts = 4;
        good.srtt_ms = 30.0;
        good.bytes_sent = 120 * 40;
        good.bytes_received = 96 * 1024;
        let mut bad = PeerStats::new("bad");
        bad.requests_sent = 120;
        bad.successes = 40;
        bad.failures = 50;
        bad.timeouts = 30;
        bad.srtt_ms = 220.0;
        bad.bytes_sent = 120 * 40;
        bad.bytes_received = 40 * 1024;
        let total_requests = good.requests_sent + bad.requests_sent;
        assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
    }

    #[test]
    fn test_peer_stats_tit_for_tat_score_prefers_reciprocal_peer() {
        let mut reciprocal = PeerStats::new("reciprocal");
        reciprocal.requests_sent = 100;
        reciprocal.successes = 90;
        reciprocal.failures = 5;
        reciprocal.timeouts = 5;
        reciprocal.srtt_ms = 40.0;
        reciprocal.bytes_sent = 100 * 40;
        reciprocal.bytes_received = 90 * 1024;
        let mut leecher = PeerStats::new("leecher");
        leecher.requests_sent = 100;
        leecher.successes = 40;
        leecher.failures = 30;
        leecher.timeouts = 30;
        leecher.srtt_ms = 120.0;
        leecher.bytes_sent = 100 * 40;
        leecher.bytes_received = 10 * 1024;
        let total_requests = reciprocal.requests_sent + leecher.requests_sent;
        assert!(
            reciprocal.tit_for_tat_score(total_requests)
                > leecher.tit_for_tat_score(total_requests)
        );
    }

    #[test]
    fn test_utility_ucb_strategy_explores_less_sampled_peer() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
        selector.add_peer("stable");
        selector.add_peer("new");
        {
            let stable = selector.get_stats_mut("stable").unwrap();
            stable.requests_sent = 500;
            stable.successes = 450;
            stable.failures = 35;
            stable.timeouts = 15;
            stable.srtt_ms = 35.0;
            stable.bytes_sent = 500 * 40;
            stable.bytes_received = 450 * 1024;
        }
        {
            let new_peer = selector.get_stats_mut("new").unwrap();
            new_peer.requests_sent = 2;
            new_peer.successes = 2;
            new_peer.failures = 0;
            new_peer.timeouts = 0;
            new_peer.srtt_ms = 70.0;
            new_peer.bytes_sent = 2 * 40;
            new_peer.bytes_received = 2 * 1024;
        }
        let peers = selector.select_peers();
        assert_eq!(peers[0], "new");
    }

    #[test]
    fn test_tit_for_tat_strategy_prioritizes_reciprocity() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::TitForTat);
        selector.add_peer("reciprocal");
        selector.add_peer("leecher");
        {
            let reciprocal = selector.get_stats_mut("reciprocal").unwrap();
            reciprocal.requests_sent = 120;
            reciprocal.successes = 102;
            reciprocal.failures = 8;
            reciprocal.timeouts = 10;
            reciprocal.srtt_ms = 45.0;
            reciprocal.bytes_sent = 120 * 40;
            reciprocal.bytes_received = 102 * 1024;
        }
        {
            let leecher = selector.get_stats_mut("leecher").unwrap();
            leecher.requests_sent = 120;
            leecher.successes = 70;
            leecher.failures = 20;
            leecher.timeouts = 30;
            leecher.srtt_ms = 35.0;
            leecher.bytes_sent = 120 * 40;
            leecher.bytes_received = 8 * 1024;
        }
        let peers = selector.select_peers();
        assert_eq!(peers[0], "reciprocal");
    }

    #[test]
    fn test_lowest_latency_strategy() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.add_peer("peer3");
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 100, 1024);
        selector.record_request("peer2", 40);
        selector.record_success("peer2", 20, 1024);
        selector.record_request("peer3", 40);
        selector.record_success("peer3", 50, 1024);
        let peers = selector.select_peers();
        assert_eq!(peers[0], "peer2");
    }

    /// Builds a weighted selector with a clearly better "reliable" peer and a clearly
    /// worse "paid" peer, used by the payment-weight tests.
    fn build_cashu_priority_fixture() -> PeerSelector {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("reliable");
        selector.add_peer("paid");
        {
            let reliable = selector.get_stats_mut("reliable").expect("reliable");
            reliable.requests_sent = 80;
            reliable.successes = 75;
            reliable.failures = 2;
            reliable.timeouts = 3;
            reliable.srtt_ms = 40.0;
            reliable.bytes_sent = 80 * 40;
            reliable.bytes_received = 75 * 1024;
        }
        {
            let paid = selector.get_stats_mut("paid").expect("paid");
            paid.requests_sent = 80;
            paid.successes = 36;
            paid.failures = 24;
            paid.timeouts = 20;
            paid.srtt_ms = 700.0;
            paid.bytes_sent = 80 * 40;
            paid.bytes_received = 36 * 512;
        }
        selector
    }

    #[test]
    fn test_cashu_payment_weight_zero_keeps_reputation_order() {
        let mut selector = build_cashu_priority_fixture();
        selector.set_cashu_payment_weight(0.0);
        selector.record_cashu_payment("paid", 5_000);
        let peers = selector.select_peers();
        assert_eq!(peers[0], "reliable");
    }

    #[test]
    fn test_cashu_payment_weight_prioritizes_paid_peer() {
        let mut selector = build_cashu_priority_fixture();
        selector.set_cashu_payment_weight(0.8);
        selector.record_cashu_payment("paid", 5_000);
        let peers = selector.select_peers();
        assert_eq!(peers[0], "paid");
    }

    #[test]
    fn test_cashu_payment_default_downranks_peer() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("honest");
        selector.add_peer("delinquent");
        for peer_id in ["honest", "delinquent"] {
            let stats = selector.get_stats_mut(peer_id).expect("stats");
            stats.requests_sent = 40;
            stats.successes = 34;
            stats.failures = 3;
            stats.timeouts = 3;
            stats.srtt_ms = 60.0;
            stats.bytes_sent = 40 * 40;
            stats.bytes_received = 34 * 1024;
        }
        // A default backs the peer off, so it disappears from the selection entirely.
        selector.record_cashu_payment_default("delinquent");
        let peers = selector.select_peers();
        assert_eq!(peers[0], "honest");
        assert!(!peers.iter().any(|peer| peer == "delinquent"));
    }

    #[test]
    fn test_payment_default_threshold_blocks_peer() {
        let mut selector = PeerSelector::new();
        selector.record_cashu_payment_default("peer-a");
        assert!(selector.is_peer_blocked_for_payment_defaults("peer-a", 1));
        assert!(!selector.is_peer_blocked_for_payment_defaults("peer-a", 2));
    }

    #[test]
    fn test_peer_principal_prefers_stable_identity_prefix() {
        assert_eq!(peer_principal("npub1abc:session-1"), "npub1abc");
        assert_eq!(peer_principal("npub1abc"), "npub1abc");
    }

    #[test]
    fn test_metadata_snapshot_restores_across_session_ids() {
        let mut selector = PeerSelector::new();
        selector.add_peer("npub1stable:session-a");
        selector.record_request("npub1stable:session-a", 64);
        selector.record_success("npub1stable:session-a", 32, 1024);
        selector.record_cashu_payment("npub1stable:session-a", 77);
        selector.record_cashu_receipt("npub1stable:session-a", 33);
        selector.record_cashu_payment_default("npub1stable:session-a");
        let snapshot = selector.export_peer_metadata_snapshot();
        assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
        assert_eq!(snapshot.peers.len(), 1);
        assert_eq!(snapshot.peers[0].principal, "npub1stable");
        // A new session of the same principal picks up the saved history.
        let mut restored = PeerSelector::new();
        restored.import_peer_metadata_snapshot(&snapshot);
        restored.add_peer("npub1stable:session-b");
        let stats = restored
            .get_stats("npub1stable:session-b")
            .expect("restored stats");
        assert_eq!(stats.requests_sent, 1);
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.cashu_paid_sat, 77);
        assert_eq!(stats.cashu_received_sat, 33);
        assert_eq!(stats.cashu_payment_receipts, 1);
        assert_eq!(stats.cashu_payment_defaults, 1);
    }
}