1use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::{Duration, Instant};
13
/// Share of the summed selection rate above which a peer is skipped for fairness.
const SELECTION_PERCENTAGE_WARNING: f64 = 0.30;
/// Minimum number of peers required before fairness filtering is applied.
const SELECTION_MIN_PEERS: usize = 5;
/// Backoff delay in milliseconds used at backoff level 1.
const INITIAL_BACKOFF_MS: u64 = 1000;
/// Factor by which the backoff delay grows for every additional backoff level.
const BACKOFF_MULTIPLIER: u64 = 2;
/// Upper bound in milliseconds for the computed backoff delay.
const MAX_BACKOFF_MS: u64 = 480_000;
/// Lower bound in milliseconds for a peer's RTO.
const MIN_RTO_MS: u64 = 50;
/// Upper bound in milliseconds for a peer's RTO.
const MAX_RTO_MS: u64 = 60_000;
/// RTO in milliseconds assigned to new peers and substituted for a stored RTO of zero.
const INITIAL_RTO_MS: u64 = 1000;
/// Snapshot format version written on export and required on import.
pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
30
/// Serializable subset of [`PeerStats`] that is kept across peer removal and
/// snapshot export/import.
///
/// Time-based state (backoff deadlines, last success/failure instants) is not
/// part of this record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct PersistedPeerMetadata {
    /// Key under which the record is stored (see `peer_principal`).
    pub principal: String,
    /// Number of requests sent to the peer.
    pub requests_sent: u64,
    /// Number of successful responses.
    pub successes: u64,
    /// Number of timed-out requests.
    pub timeouts: u64,
    /// Number of failed requests.
    pub failures: u64,
    /// Smoothed round-trip time in milliseconds (0.0 when no sample exists).
    pub srtt_ms: f64,
    /// Round-trip time variance estimate in milliseconds.
    pub rttvar_ms: f64,
    /// Request timeout (RTO) in milliseconds.
    pub rto_ms: u64,
    /// Total bytes received from the peer.
    pub bytes_received: u64,
    /// Total bytes sent to the peer.
    pub bytes_sent: u64,
    /// Accumulated amount recorded via `record_cashu_payment` (presumably satoshis).
    pub cashu_paid_sat: u64,
    /// Accumulated amount recorded via `record_cashu_receipt` (presumably satoshis).
    pub cashu_received_sat: u64,
    /// Number of recorded receipts.
    pub cashu_payment_receipts: u64,
    /// Number of recorded payment defaults.
    pub cashu_payment_defaults: u64,
}
53
54impl PersistedPeerMetadata {
55 fn from_stats(principal: String, stats: &PeerStats) -> Self {
56 Self {
57 principal,
58 requests_sent: stats.requests_sent,
59 successes: stats.successes,
60 timeouts: stats.timeouts,
61 failures: stats.failures,
62 srtt_ms: sanitize_latency(stats.srtt_ms),
63 rttvar_ms: sanitize_latency(stats.rttvar_ms),
64 rto_ms: clamp_rto(stats.rto_ms),
65 bytes_received: stats.bytes_received,
66 bytes_sent: stats.bytes_sent,
67 cashu_paid_sat: stats.cashu_paid_sat,
68 cashu_received_sat: stats.cashu_received_sat,
69 cashu_payment_receipts: stats.cashu_payment_receipts,
70 cashu_payment_defaults: stats.cashu_payment_defaults,
71 }
72 }
73
74 fn apply_to_stats(&self, stats: &mut PeerStats) {
75 stats.requests_sent = self.requests_sent;
76 stats.successes = self.successes;
77 stats.timeouts = self.timeouts;
78 stats.failures = self.failures;
79 stats.srtt_ms = sanitize_latency(self.srtt_ms);
80 stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
81 stats.rto_ms = clamp_rto(self.rto_ms);
82 stats.bytes_received = self.bytes_received;
83 stats.bytes_sent = self.bytes_sent;
84 stats.cashu_paid_sat = self.cashu_paid_sat;
85 stats.cashu_received_sat = self.cashu_received_sat;
86 stats.cashu_payment_receipts = self.cashu_payment_receipts;
87 stats.cashu_payment_defaults = self.cashu_payment_defaults;
88
89 stats.backoff_level = 0;
91 stats.backed_off_until = None;
92 stats.last_success = None;
93 stats.last_failure = None;
94 stats.consecutive_rto_backoffs = 0;
95 }
96}
97
/// Versioned, serializable collection of per-peer metadata records.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PeerMetadataSnapshot {
    /// Format version; import ignores snapshots whose version differs from
    /// `PEER_METADATA_SNAPSHOT_VERSION`.
    pub version: u32,
    /// One record per principal.
    pub peers: Vec<PersistedPeerMetadata>,
}
104
105impl Default for PeerMetadataSnapshot {
106 fn default() -> Self {
107 Self {
108 version: PEER_METADATA_SNAPSHOT_VERSION,
109 peers: Vec::new(),
110 }
111 }
112}
113
/// Returns `value` when it is a finite, non-negative latency; otherwise `0.0`.
///
/// NaN, infinities and negative numbers all collapse to `0.0`.
fn sanitize_latency(value: f64) -> f64 {
    let usable = value.is_finite() && value >= 0.0;
    if !usable {
        return 0.0;
    }
    value
}
121
122fn compute_backoff_ms(level: u32) -> u64 {
123 if level == 0 {
124 return 0;
125 }
126
127 let mut backoff_ms = INITIAL_BACKOFF_MS;
128 for _ in 1..level {
129 backoff_ms = backoff_ms.saturating_mul(BACKOFF_MULTIPLIER);
130 if backoff_ms >= MAX_BACKOFF_MS {
131 return MAX_BACKOFF_MS;
132 }
133 }
134
135 backoff_ms.min(MAX_BACKOFF_MS)
136}
137
138fn clamp_rto(rto_ms: u64) -> u64 {
139 if rto_ms == 0 {
140 INITIAL_RTO_MS
141 } else {
142 rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
143 }
144}
145
/// Returns the principal under which a peer's metadata is persisted.
///
/// Currently this is the peer id itself, returned unchanged.
pub fn peer_principal(peer_id: &str) -> &str {
    peer_id
}
153
/// Live, in-memory statistics tracked for a single peer.
#[derive(Debug, Clone)]
pub struct PeerStats {
    /// Identifier of the peer these statistics belong to.
    pub peer_id: String,
    /// Instant at which this record was created; basis for `selection_rate`.
    pub connected_at: Instant,
    /// Number of requests sent to the peer.
    pub requests_sent: u64,
    /// Number of successful responses.
    pub successes: u64,
    /// Number of timed-out requests.
    pub timeouts: u64,
    /// Number of failed requests.
    pub failures: u64,
    /// Smoothed round-trip time in milliseconds; 0.0 until the first sample.
    pub srtt_ms: f64,
    /// Round-trip time variance estimate in milliseconds.
    pub rttvar_ms: f64,
    /// Current request timeout (RTO) in milliseconds.
    pub rto_ms: u64,
    /// Number of RTO doublings since the last success (capped at 5 by `record_timeout`).
    pub consecutive_rto_backoffs: u32,
    /// Current backoff level; 0 means not backed off.
    pub backoff_level: u32,
    /// Instant until which the peer is considered backed off, if any.
    pub backed_off_until: Option<Instant>,
    /// Instant of the most recent success, if any.
    pub last_success: Option<Instant>,
    /// Instant of the most recent timeout, failure or payment default, if any.
    pub last_failure: Option<Instant>,
    /// Total bytes received from the peer.
    pub bytes_received: u64,
    /// Total bytes sent to the peer.
    pub bytes_sent: u64,
    /// Accumulated amount recorded via `record_cashu_payment` (presumably satoshis).
    pub cashu_paid_sat: u64,
    /// Accumulated amount recorded via `record_cashu_receipt` (presumably satoshis).
    pub cashu_received_sat: u64,
    /// Number of recorded receipts.
    pub cashu_payment_receipts: u64,
    /// Number of recorded payment defaults.
    pub cashu_payment_defaults: u64,
}
198
199impl PeerStats {
200 pub fn new(peer_id: impl Into<String>) -> Self {
202 Self {
203 peer_id: peer_id.into(),
204 connected_at: Instant::now(),
205 requests_sent: 0,
206 successes: 0,
207 timeouts: 0,
208 failures: 0,
209 srtt_ms: 0.0,
210 rttvar_ms: 0.0,
211 rto_ms: INITIAL_RTO_MS,
212 consecutive_rto_backoffs: 0,
213 backoff_level: 0,
214 backed_off_until: None,
215 last_success: None,
216 last_failure: None,
217 bytes_received: 0,
218 bytes_sent: 0,
219 cashu_paid_sat: 0,
220 cashu_received_sat: 0,
221 cashu_payment_receipts: 0,
222 cashu_payment_defaults: 0,
223 }
224 }
225
226 pub fn success_rate(&self) -> f64 {
228 if self.requests_sent == 0 {
229 return 0.5; }
231 self.successes as f64 / self.requests_sent as f64
232 }
233
234 pub fn selection_rate(&self) -> f64 {
236 let elapsed = self.connected_at.elapsed();
237 if elapsed.as_secs() < 10 {
238 return 0.0; }
240 self.requests_sent as f64 / elapsed.as_secs_f64()
241 }
242
243 pub fn is_backed_off(&self) -> bool {
245 if let Some(until) = self.backed_off_until {
246 Instant::now() < until
247 } else {
248 false
249 }
250 }
251
252 pub fn backoff_remaining(&self) -> Duration {
254 if let Some(until) = self.backed_off_until {
255 let now = Instant::now();
256 if now < until {
257 return until - now;
258 }
259 }
260 Duration::ZERO
261 }
262
263 pub fn record_request(&mut self, bytes: u64) {
265 self.requests_sent += 1;
266 self.bytes_sent += bytes;
267 }
268
269 pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
272 self.successes += 1;
273 self.bytes_received += bytes;
274 self.last_success = Some(Instant::now());
275 self.consecutive_rto_backoffs = 0;
276
277 self.backed_off_until = None;
279 self.backoff_level = 0;
280
281 let rtt = rtt_ms as f64;
283 if self.srtt_ms == 0.0 {
284 self.srtt_ms = rtt;
286 self.rttvar_ms = rtt / 2.0;
287 } else {
288 self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
293 self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
294 }
295
296 let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
298 self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
299 }
300
301 pub fn record_timeout(&mut self) {
303 self.timeouts += 1;
304 self.last_failure = Some(Instant::now());
305
306 self.apply_backoff();
308
309 if self.consecutive_rto_backoffs < 5 {
311 self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
312 self.consecutive_rto_backoffs += 1;
313 }
314 }
315
316 pub fn record_failure(&mut self) {
318 self.failures += 1;
319 self.last_failure = Some(Instant::now());
320 self.apply_backoff();
321 }
322
323 pub fn record_cashu_payment(&mut self, amount_sat: u64) {
325 if amount_sat == 0 {
326 return;
327 }
328 self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
329 }
330
331 pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
333 if amount_sat == 0 {
334 return;
335 }
336 self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
337 self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
338 }
339
340 pub fn record_cashu_payment_default(&mut self) {
342 self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
343 self.last_failure = Some(Instant::now());
344 self.apply_backoff();
345 }
346
347 fn apply_backoff(&mut self) {
349 self.backoff_level += 1;
350 let backoff_ms = compute_backoff_ms(self.backoff_level);
351 self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
352 }
353
354 pub fn score(&self) -> f64 {
357 let success_score = self.success_rate();
359
360 let rtt_score = if self.srtt_ms <= 0.0 {
363 0.5 } else {
365 (500.0 / (self.srtt_ms + 50.0)).min(1.0)
366 };
367
368 let recency_bonus = if let Some(last) = self.last_success {
370 let secs_ago = last.elapsed().as_secs_f64();
371 if secs_ago < 60.0 {
372 0.1 } else {
374 0.0
375 }
376 } else {
377 0.0
378 };
379
380 0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
383 }
384
385 pub fn utility_score(&self, total_requests: u64) -> f64 {
393 let good = self.successes as f64 + 1.0;
394 let bad = (self.failures + self.timeouts) as f64 + 1.0;
395 let ratio = good / bad;
396 let ratio_score = ratio / (1.0 + ratio);
397
398 let latency_score = if self.srtt_ms <= 0.0 {
399 0.5
400 } else {
401 (300.0 / (self.srtt_ms + 50.0)).min(1.0)
402 };
403
404 let efficiency_score = if self.bytes_sent == 0 {
405 0.5
406 } else {
407 (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
408 };
409
410 let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
411
412 let uncertainty =
413 (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
414 let exploration_bonus = 0.20 * uncertainty;
415
416 exploitation + exploration_bonus
417 }
418
419 pub fn tit_for_tat_score(&self, total_requests: u64) -> f64 {
425 let reliability = (self.successes as f64 + 1.0) / (self.requests_sent as f64 + 2.0);
427
428 let reciprocity_raw = if self.bytes_sent == 0 {
431 1.0
432 } else {
433 self.bytes_received as f64 / self.bytes_sent as f64
434 };
435 let reciprocity_ratio = reciprocity_raw / (1.0 + reciprocity_raw);
436 let reciprocity_confidence = self.successes as f64 / (self.successes as f64 + 4.0);
437 let reciprocity =
438 (1.0 - reciprocity_confidence) * 0.5 + reciprocity_confidence * reciprocity_ratio;
439
440 let rtt_score = if self.srtt_ms <= 0.0 {
441 0.5
442 } else {
443 (400.0 / (self.srtt_ms + 50.0)).min(1.0)
444 };
445
446 let timeout_rate = if self.requests_sent == 0 {
447 0.0
448 } else {
449 self.timeouts as f64 / self.requests_sent as f64
450 };
451 let failure_rate = if self.requests_sent == 0 {
452 0.0
453 } else {
454 self.failures as f64 / self.requests_sent as f64
455 };
456 let retaliation_penalty =
457 (0.60 * timeout_rate + 0.45 * failure_rate + 0.10 * self.backoff_level as f64)
458 .min(0.95);
459
460 let cooperative = 0.65 * reliability + 0.25 * reciprocity + 0.10 * rtt_score;
461 let exploration = 0.03
462 * (((total_requests as f64) + 2.0).ln() / ((self.requests_sent as f64) + 2.0)).sqrt();
463
464 (cooperative + exploration - retaliation_penalty).max(0.0)
465 }
466
467 pub fn cashu_priority_boost(&self) -> f64 {
469 if self.cashu_paid_sat == 0 {
470 return 0.0;
471 }
472 let paid = self.cashu_paid_sat as f64;
473 paid / (paid + 32.0)
474 }
475
476 pub fn payment_reliability_multiplier(&self) -> f64 {
479 if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
480 return 1.0;
481 }
482 (self.cashu_payment_receipts as f64 + 1.0)
483 / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
484 }
485
486 pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
487 threshold > 0 && self.cashu_payment_defaults >= threshold
488 }
489}
490
/// Ordering policy applied by `PeerSelector::select_peers`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SelectionStrategy {
    /// Order by `PeerStats::score`, blended with payment priority (default).
    #[default]
    Weighted,
    /// Rotate the candidate list by an index that advances on every selection.
    RoundRobin,
    /// Apply no ordering; candidates are returned in the order they were
    /// collected from the peer map (no shuffle is performed).
    Random,
    /// Order by ascending smoothed RTT, breaking ties by score.
    LowestLatency,
    /// Order by descending success rate.
    HighestSuccessRate,
    /// Order by `PeerStats::tit_for_tat_score`, blended with payment priority.
    TitForTat,
    /// Order by `PeerStats::utility_score`, blended with payment priority.
    UtilityUcb,
}
510
/// Tracks per-peer statistics and orders peers for request dispatch.
///
/// NOTE(review): the derived `Default` yields `fairness_enabled = false`, whereas
/// `PeerSelector::new` and `with_strategy` enable fairness — confirm whether that
/// difference is intended.
#[derive(Debug, Default)]
pub struct PeerSelector {
    /// Live statistics keyed by peer id.
    stats: HashMap<String, PeerStats>,
    /// Metadata retained for removed or imported peers, keyed by principal.
    persisted_metadata: HashMap<String, PersistedPeerMetadata>,
    /// Ordering policy used by `select_peers`.
    strategy: SelectionStrategy,
    /// Whether over-selected peers are filtered out before ordering.
    fairness_enabled: bool,
    /// Rotation offset used by the round-robin strategy.
    round_robin_idx: usize,
    /// Weight in `[0, 1]` given to payment priority when blending scores.
    cashu_payment_weight: f64,
}
533
534impl PeerSelector {
535 pub fn new() -> Self {
537 Self {
538 stats: HashMap::new(),
539 persisted_metadata: HashMap::new(),
540 strategy: SelectionStrategy::Weighted,
541 fairness_enabled: true,
542 round_robin_idx: 0,
543 cashu_payment_weight: 0.0,
544 }
545 }
546
547 pub fn with_strategy(strategy: SelectionStrategy) -> Self {
549 Self {
550 stats: HashMap::new(),
551 persisted_metadata: HashMap::new(),
552 strategy,
553 fairness_enabled: true,
554 round_robin_idx: 0,
555 cashu_payment_weight: 0.0,
556 }
557 }
558
559 pub fn set_fairness(&mut self, enabled: bool) {
561 self.fairness_enabled = enabled;
562 }
563
564 pub fn set_cashu_payment_weight(&mut self, weight: f64) {
567 self.cashu_payment_weight = weight.clamp(0.0, 1.0);
568 }
569
570 pub fn add_peer(&mut self, peer_id: impl Into<String>) {
572 let peer_id = peer_id.into();
573 if self.stats.contains_key(&peer_id) {
574 return;
575 }
576
577 let mut stats = PeerStats::new(peer_id.clone());
578 if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
579 saved.apply_to_stats(&mut stats);
580 }
581 self.stats.insert(peer_id, stats);
582 }
583
584 pub fn remove_peer(&mut self, peer_id: &str) {
586 if let Some(stats) = self.stats.remove(peer_id) {
587 let principal = peer_principal(&stats.peer_id).to_string();
588 self.persisted_metadata.insert(
589 principal.clone(),
590 PersistedPeerMetadata::from_stats(principal, &stats),
591 );
592 }
593 }
594
595 pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
597 self.stats.get(peer_id)
598 }
599
600 pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
602 self.stats.get_mut(peer_id)
603 }
604
605 pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
607 self.stats.values()
608 }
609
610 pub fn is_peer_backed_off(&self, peer_id: &str) -> bool {
612 self.stats
613 .get(peer_id)
614 .is_some_and(PeerStats::is_backed_off)
615 }
616
617 pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
619 if let Some(stats) = self.stats.get_mut(peer_id) {
620 stats.record_request(bytes);
621 }
622 }
623
624 pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
626 if let Some(stats) = self.stats.get_mut(peer_id) {
627 stats.record_success(rtt_ms, bytes);
628 }
629 }
630
631 pub fn record_timeout(&mut self, peer_id: &str) {
633 if let Some(stats) = self.stats.get_mut(peer_id) {
634 stats.record_timeout();
635 }
636 }
637
638 pub fn record_failure(&mut self, peer_id: &str) {
640 if let Some(stats) = self.stats.get_mut(peer_id) {
641 stats.record_failure();
642 }
643 }
644
645 pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
647 if amount_sat == 0 {
648 return;
649 }
650 let entry = self
651 .stats
652 .entry(peer_id.to_string())
653 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
654 entry.record_cashu_payment(amount_sat);
655 }
656
657 pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
659 if amount_sat == 0 {
660 return;
661 }
662 let entry = self
663 .stats
664 .entry(peer_id.to_string())
665 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
666 entry.record_cashu_receipt(amount_sat);
667 }
668
669 pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
671 let entry = self
672 .stats
673 .entry(peer_id.to_string())
674 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
675 entry.record_cashu_payment_default();
676 }
677
678 pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
679 self.stats
680 .get(peer_id)
681 .map(|stats| stats.exceeds_payment_default_threshold(threshold))
682 .unwrap_or(false)
683 }
684
685 fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
686 let reliable_base = base_score * stats.payment_reliability_multiplier();
687 if self.cashu_payment_weight <= 0.0 {
688 return reliable_base;
689 }
690 let payment_score = stats.cashu_priority_boost();
691 (1.0 - self.cashu_payment_weight) * reliable_base
692 + self.cashu_payment_weight * payment_score
693 }
694
695 fn available_peers(&self) -> Vec<String> {
697 self.stats
698 .iter()
699 .filter(|(_, s)| !s.is_backed_off())
700 .map(|(id, _)| id.clone())
701 .collect()
702 }
703
704 #[cfg(test)]
706 fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
707 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
708 self.should_skip_for_fairness_with_total(peer_id, total_rate)
709 }
710
711 fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
712 if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
713 return false;
714 }
715
716 if let Some(stats) = self.stats.get(peer_id) {
718 let peer_rate = stats.selection_rate();
719 let proportion = peer_rate / total_rate;
720 return proportion > SELECTION_PERCENTAGE_WARNING;
721 }
722
723 false
724 }
725
726 pub fn select_peers(&mut self) -> Vec<String> {
731 let available = self.available_peers();
732 if available.is_empty() {
733 let mut backed_off: Vec<_> = self
736 .stats
737 .iter()
738 .filter(|(_, s)| s.is_backed_off())
739 .map(|(id, s)| (id.clone(), s.backoff_remaining()))
740 .collect();
741 backed_off.sort_by_key(|(_, remaining)| *remaining);
742 return backed_off.into_iter().map(|(id, _)| id).collect();
743 }
744
745 let candidates: Vec<String> =
747 if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
748 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
749 available
750 .into_iter()
751 .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
752 .collect()
753 } else {
754 available
755 };
756
757 let candidates = if candidates.is_empty() {
759 self.available_peers()
760 } else {
761 candidates
762 };
763
764 let mut sorted: Vec<_> = candidates
766 .into_iter()
767 .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
768 .collect();
769
770 match self.strategy {
771 SelectionStrategy::Weighted => {
772 sorted.sort_by(|(id_a, a), (id_b, b)| {
774 let score_a = self.blend_with_payment_priority(a, a.score());
775 let score_b = self.blend_with_payment_priority(b, b.score());
776 let score_cmp = score_b
777 .partial_cmp(&score_a)
778 .unwrap_or(std::cmp::Ordering::Equal);
779 if score_cmp == std::cmp::Ordering::Equal {
780 id_a.cmp(id_b) } else {
782 score_cmp
783 }
784 });
785 }
786 SelectionStrategy::LowestLatency => {
787 sorted.sort_by(|(id_a, a), (id_b, b)| {
789 let rtt_cmp = a
790 .srtt_ms
791 .partial_cmp(&b.srtt_ms)
792 .unwrap_or(std::cmp::Ordering::Equal);
793 if rtt_cmp == std::cmp::Ordering::Equal {
794 let score_cmp = b
795 .score()
796 .partial_cmp(&a.score())
797 .unwrap_or(std::cmp::Ordering::Equal);
798 if score_cmp == std::cmp::Ordering::Equal {
799 id_a.cmp(id_b)
800 } else {
801 score_cmp
802 }
803 } else {
804 rtt_cmp
805 }
806 });
807 }
808 SelectionStrategy::HighestSuccessRate => {
809 sorted.sort_by(|(id_a, a), (id_b, b)| {
811 let rate_cmp = b
812 .success_rate()
813 .partial_cmp(&a.success_rate())
814 .unwrap_or(std::cmp::Ordering::Equal);
815 if rate_cmp == std::cmp::Ordering::Equal {
816 id_a.cmp(id_b)
817 } else {
818 rate_cmp
819 }
820 });
821 }
822 SelectionStrategy::TitForTat => {
823 let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
824 sorted.sort_by(|(id_a, a), (id_b, b)| {
825 let score_a =
826 self.blend_with_payment_priority(a, a.tit_for_tat_score(total_requests));
827 let score_b =
828 self.blend_with_payment_priority(b, b.tit_for_tat_score(total_requests));
829 let score_cmp = score_b
830 .partial_cmp(&score_a)
831 .unwrap_or(std::cmp::Ordering::Equal);
832 if score_cmp == std::cmp::Ordering::Equal {
833 id_a.cmp(id_b)
834 } else {
835 score_cmp
836 }
837 });
838 }
839 SelectionStrategy::UtilityUcb => {
840 let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
841 sorted.sort_by(|(id_a, a), (id_b, b)| {
842 let score_a =
843 self.blend_with_payment_priority(a, a.utility_score(total_requests));
844 let score_b =
845 self.blend_with_payment_priority(b, b.utility_score(total_requests));
846 let score_cmp = score_b
847 .partial_cmp(&score_a)
848 .unwrap_or(std::cmp::Ordering::Equal);
849 if score_cmp == std::cmp::Ordering::Equal {
850 id_a.cmp(id_b)
851 } else {
852 score_cmp
853 }
854 });
855 }
856 SelectionStrategy::RoundRobin => {
857 if !sorted.is_empty() {
859 let idx = self.round_robin_idx % sorted.len();
860 sorted.rotate_left(idx);
861 self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
862 }
863 }
864 SelectionStrategy::Random => {
865 }
868 }
869
870 sorted.into_iter().map(|(id, _)| id).collect()
871 }
872
873 pub fn select_best(&mut self) -> Option<String> {
875 self.select_peers().into_iter().next()
876 }
877
878 pub fn select_top(&mut self, n: usize) -> Vec<String> {
880 self.select_peers().into_iter().take(n).collect()
881 }
882
883 pub fn summary(&self) -> SelectorSummary {
885 let count = self.stats.len();
886 if count == 0 {
887 return SelectorSummary::default();
888 }
889
890 let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
891 let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
892 let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
893 let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
894
895 let avg_rtt = {
896 let rtts: Vec<f64> = self
897 .stats
898 .values()
899 .filter(|s| s.srtt_ms > 0.0)
900 .map(|s| s.srtt_ms)
901 .collect();
902 if rtts.is_empty() {
903 0.0
904 } else {
905 rtts.iter().sum::<f64>() / rtts.len() as f64
906 }
907 };
908
909 SelectorSummary {
910 peer_count: count,
911 total_requests,
912 total_successes,
913 total_timeouts,
914 backed_off_count: backed_off,
915 avg_rtt_ms: avg_rtt,
916 overall_success_rate: if total_requests > 0 {
917 total_successes as f64 / total_requests as f64
918 } else {
919 0.0
920 },
921 }
922 }
923
924 pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
926 let mut by_principal = self.persisted_metadata.clone();
927 for stats in self.stats.values() {
928 let principal = peer_principal(&stats.peer_id).to_string();
929 by_principal.insert(
930 principal.clone(),
931 PersistedPeerMetadata::from_stats(principal, stats),
932 );
933 }
934
935 let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
936 peers.sort_by(|a, b| a.principal.cmp(&b.principal));
937
938 PeerMetadataSnapshot {
939 version: PEER_METADATA_SNAPSHOT_VERSION,
940 peers,
941 }
942 }
943
944 pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
946 if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
947 return;
948 }
949
950 self.persisted_metadata.clear();
951 for peer in &snapshot.peers {
952 self.persisted_metadata
953 .insert(peer.principal.clone(), peer.clone());
954 }
955
956 for stats in self.stats.values_mut() {
957 if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
958 saved.apply_to_stats(stats);
959 }
960 }
961 }
962}
963
/// Aggregate view over all peers tracked by a `PeerSelector`.
#[derive(Debug, Clone, Default)]
pub struct SelectorSummary {
    /// Number of tracked peers.
    pub peer_count: usize,
    /// Sum of requests sent across all peers.
    pub total_requests: u64,
    /// Sum of successes across all peers.
    pub total_successes: u64,
    /// Sum of timeouts across all peers.
    pub total_timeouts: u64,
    /// Number of peers currently backed off.
    pub backed_off_count: usize,
    /// Mean smoothed RTT in milliseconds over peers with an RTT sample; 0.0 if none.
    pub avg_rtt_ms: f64,
    /// `total_successes / total_requests`, or 0.0 when no request was sent.
    pub overall_success_rate: f64,
}
975
976#[cfg(test)]
977mod tests {
978 use super::*;
979 use std::thread::sleep;
980
    /// Success rate is neutral without data and tracks successes / requests afterwards.
    #[test]
    fn test_peer_stats_success_rate() {
        let mut stats = PeerStats::new("peer1");
        // No requests yet: neutral 0.5.
        assert_eq!(stats.success_rate(), 0.5);
        stats.record_request(40);
        stats.record_success(50, 1024);
        assert_eq!(stats.success_rate(), 1.0);

        // One success out of two requests.
        stats.record_request(40);
        stats.record_timeout();
        assert_eq!(stats.success_rate(), 0.5);
    }
994
    /// The first RTT sample seeds SRTT/RTTVAR; later samples are smoothed in.
    #[test]
    fn test_peer_stats_rtt_calculation() {
        let mut stats = PeerStats::new("peer1");

        stats.record_request(40);
        stats.record_success(100, 1024);
        // First sample: SRTT = RTT, RTTVAR = RTT / 2.
        assert_eq!(stats.srtt_ms, 100.0);
        assert_eq!(stats.rttvar_ms, 50.0);
        stats.record_request(40);
        stats.record_success(80, 1024);
        // 0.875 * 100 + 0.125 * 80 = 97.5
        assert!((stats.srtt_ms - 97.5).abs() < 0.1);
    }
1011
    /// A timeout puts the peer into backoff with a non-zero remaining time.
    #[test]
    fn test_peer_stats_backoff() {
        let mut stats = PeerStats::new("peer1");
        assert!(!stats.is_backed_off());

        stats.record_timeout();
        assert!(stats.is_backed_off());
        assert!(stats.backoff_remaining() > Duration::ZERO);
    }
1021
    /// A success clears the backoff deadline and resets the backoff level.
    #[test]
    fn test_peer_stats_backoff_clears_on_success() {
        let mut stats = PeerStats::new("peer1");
        stats.record_timeout();
        assert!(stats.is_backed_off());

        stats.record_success(50, 1024);
        assert!(!stats.is_backed_off());
        assert_eq!(stats.backoff_level, 0);
    }
1032
    /// Many consecutive failures cap the backoff at `MAX_BACKOFF_MS` without overflowing.
    #[test]
    fn test_peer_stats_backoff_saturates_without_overflow() {
        let mut stats = PeerStats::new("peer1");

        // Far more failures than needed to reach the cap.
        for _ in 0..128 {
            stats.record_failure();
        }

        assert_eq!(compute_backoff_ms(stats.backoff_level), MAX_BACKOFF_MS);
        assert!(stats.is_backed_off());
    }
1044
    /// Adding and removing peers affects only the targeted peer.
    #[test]
    fn test_peer_selector_add_remove() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        assert!(selector.get_stats("peer1").is_some());
        assert!(selector.get_stats("peer2").is_some());

        selector.remove_peer("peer1");
        assert!(selector.get_stats("peer1").is_none());
        assert!(selector.get_stats("peer2").is_some());
    }
1057
    /// The weighted strategy puts the fast, reliable peer first.
    #[test]
    fn test_peer_selector_weighted_selection() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.add_peer("peer3");

        // peer1: two fast successes.
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 20, 1024);
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 25, 1024);

        // peer2: one slower success followed by a timeout (ends up backed off).
        selector.record_request("peer2", 40);
        selector.record_success("peer2", 100, 1024);
        selector.record_request("peer2", 40);
        selector.record_timeout("peer2");

        // peer3: only timeouts (backed off).
        selector.record_request("peer3", 40);
        selector.record_timeout("peer3");
        selector.record_request("peer3", 40);
        selector.record_timeout("peer3");

        let peers = selector.select_peers();
        assert_eq!(peers[0], "peer1");
    }
1088
    /// Backed-off peers are excluded while another peer is available.
    #[test]
    fn test_peer_selector_backed_off_peers() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        // The timeout puts peer1 into backoff.
        selector.record_timeout("peer1");
        assert!(selector.get_stats("peer1").unwrap().is_backed_off());

        let peers = selector.select_peers();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0], "peer2");
    }
1104
    /// When every peer is backed off, selection still returns all of them.
    #[test]
    fn test_peer_selector_all_backed_off_fallback() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        // Put both peers into backoff.
        selector.record_timeout("peer1");
        selector.record_timeout("peer2");

        let peers = selector.select_peers();
        assert_eq!(peers.len(), 2);
    }
1119
1120 #[test]
1121 fn test_peer_selector_fairness() {
1122 let mut selector = PeerSelector::new();
1123 selector.set_fairness(true);
1124
1125 for i in 1..=6 {
1127 selector.add_peer(format!("peer{}", i));
1128 }
1129
1130 sleep(Duration::from_millis(15));
1132
1133 for _ in 0..100 {
1134 selector.record_request("peer1", 40);
1135 selector.record_success("peer1", 10, 100);
1136 }
1137
1138 for i in 2..=6 {
1140 selector.record_request(&format!("peer{}", i), 40);
1141 selector.record_success(&format!("peer{}", i), 10, 100);
1142 }
1143
1144 let skipped = selector.should_skip_for_fairness("peer1");
1146 let _ = skipped; }
1148
    /// The summary aggregates counters across all tracked peers.
    #[test]
    fn test_peer_selector_summary() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        // One success on peer1, one timeout on peer2.
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 50, 1024);
        selector.record_request("peer2", 40);
        selector.record_timeout("peer2");

        let summary = selector.summary();
        assert_eq!(summary.peer_count, 2);
        assert_eq!(summary.total_requests, 2);
        assert_eq!(summary.total_successes, 1);
        assert_eq!(summary.total_timeouts, 1);
        assert_eq!(summary.backed_off_count, 1);
        assert_eq!(summary.overall_success_rate, 0.5);
    }
1168
    /// The score is mid-range without data, high for a good peer and low for a bad one.
    #[test]
    fn test_peer_stats_score() {
        let mut stats = PeerStats::new("peer1");

        // No data: 0.6 * 0.5 + 0.3 * 0.5 + 0.1 = 0.55.
        let initial_score = stats.score();
        assert!(initial_score > 0.3 && initial_score < 0.7);

        // Ten fast successes.
        for _ in 0..10 {
            stats.record_request(40);
            stats.record_success(20, 1024);
        }
        let good_score = stats.score();
        assert!(good_score > 0.8);

        // Ten timeouts on a separate peer.
        let mut bad_stats = PeerStats::new("peer2");
        for _ in 0..10 {
            bad_stats.record_request(40);
            bad_stats.record_timeout();
        }
        let bad_score = bad_stats.score();
        assert!(bad_score < 0.3);

        assert!(good_score > bad_score);
    }
1196
    /// With equal request counts, the more reliable and faster peer gets the
    /// higher utility score.
    #[test]
    fn test_peer_stats_utility_score_prefers_good_over_bad() {
        let mut good = PeerStats::new("good");
        good.requests_sent = 120;
        good.successes = 96;
        good.failures = 8;
        good.timeouts = 4;
        good.srtt_ms = 30.0;
        good.bytes_sent = 120 * 40;
        good.bytes_received = 96 * 1024;

        let mut bad = PeerStats::new("bad");
        bad.requests_sent = 120;
        bad.successes = 40;
        bad.failures = 50;
        bad.timeouts = 30;
        bad.srtt_ms = 220.0;
        bad.bytes_sent = 120 * 40;
        bad.bytes_received = 40 * 1024;

        let total_requests = good.requests_sent + bad.requests_sent;
        assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
    }
1220
    /// The tit-for-tat score favours the peer that answers reliably over the one
    /// that mostly fails or times out.
    #[test]
    fn test_peer_stats_tit_for_tat_score_prefers_reciprocal_peer() {
        let mut reciprocal = PeerStats::new("reciprocal");
        reciprocal.requests_sent = 100;
        reciprocal.successes = 90;
        reciprocal.failures = 5;
        reciprocal.timeouts = 5;
        reciprocal.srtt_ms = 40.0;
        reciprocal.bytes_sent = 100 * 40;
        reciprocal.bytes_received = 90 * 1024;

        let mut leecher = PeerStats::new("leecher");
        leecher.requests_sent = 100;
        leecher.successes = 40;
        leecher.failures = 30;
        leecher.timeouts = 30;
        leecher.srtt_ms = 120.0;
        leecher.bytes_sent = 100 * 40;
        leecher.bytes_received = 10 * 1024;

        let total_requests = reciprocal.requests_sent + leecher.requests_sent;
        assert!(
            reciprocal.tit_for_tat_score(total_requests)
                > leecher.tit_for_tat_score(total_requests)
        );
    }
1247
    /// The UtilityUcb strategy is expected to rank a barely sampled peer ahead of
    /// a heavily sampled, solid one.
    #[test]
    fn test_utility_ucb_strategy_explores_less_sampled_peer() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
        selector.add_peer("stable");
        selector.add_peer("new");

        {
            // Well-known peer with a long, mostly successful history.
            let stable = selector.get_stats_mut("stable").unwrap();
            stable.requests_sent = 500;
            stable.successes = 450;
            stable.failures = 35;
            stable.timeouts = 15;
            stable.srtt_ms = 35.0;
            stable.bytes_sent = 500 * 40;
            stable.bytes_received = 450 * 1024;
        }
        {
            // Newcomer with only two (successful) requests.
            let new_peer = selector.get_stats_mut("new").unwrap();
            new_peer.requests_sent = 2;
            new_peer.successes = 2;
            new_peer.failures = 0;
            new_peer.timeouts = 0;
            new_peer.srtt_ms = 70.0;
            new_peer.bytes_sent = 2 * 40;
            new_peer.bytes_received = 2 * 1024;
        }

        let peers = selector.select_peers();
        assert_eq!(peers[0], "new");
    }
1278
    /// The TitForTat strategy ranks the reciprocating peer ahead of a slightly
    /// faster peer that returns little data and fails more often.
    #[test]
    fn test_tit_for_tat_strategy_prioritizes_reciprocity() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::TitForTat);
        selector.add_peer("reciprocal");
        selector.add_peer("leecher");

        {
            let reciprocal = selector.get_stats_mut("reciprocal").unwrap();
            reciprocal.requests_sent = 120;
            reciprocal.successes = 102;
            reciprocal.failures = 8;
            reciprocal.timeouts = 10;
            reciprocal.srtt_ms = 45.0;
            reciprocal.bytes_sent = 120 * 40;
            reciprocal.bytes_received = 102 * 1024;
        }
        {
            // Lower latency, but far fewer bytes returned and more errors.
            let leecher = selector.get_stats_mut("leecher").unwrap();
            leecher.requests_sent = 120;
            leecher.successes = 70;
            leecher.failures = 20;
            leecher.timeouts = 30;
            leecher.srtt_ms = 35.0;
            leecher.bytes_sent = 120 * 40;
            leecher.bytes_received = 8 * 1024;
        }

        let peers = selector.select_peers();
        assert_eq!(peers[0], "reciprocal");
    }
1309
/// Under `SelectionStrategy::LowestLatency`, the peer whose single recorded
/// success carried the smallest RTT value is expected to be ranked first.
#[test]
fn test_lowest_latency_strategy() {
    // (peer id, RTT value reported with its one successful response)
    let samples = [("peer1", 100), ("peer2", 20), ("peer3", 50)];

    let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);

    // Register every peer before recording any traffic.
    for (peer_id, _) in samples {
        selector.add_peer(peer_id);
    }

    // One 40-byte request and one 1024-byte success per peer.
    for (peer_id, rtt) in samples {
        selector.record_request(peer_id, 40);
        selector.record_success(peer_id, rtt, 1024);
    }

    let ranked = selector.select_peers();
    assert_eq!(ranked[0], "peer2");
}
1333
1334 fn build_cashu_priority_fixture() -> PeerSelector {
1335 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1336 selector.add_peer("reliable");
1337 selector.add_peer("paid");
1338
1339 {
1340 let reliable = selector.get_stats_mut("reliable").expect("reliable");
1341 reliable.requests_sent = 80;
1342 reliable.successes = 75;
1343 reliable.failures = 2;
1344 reliable.timeouts = 3;
1345 reliable.srtt_ms = 40.0;
1346 reliable.bytes_sent = 80 * 40;
1347 reliable.bytes_received = 75 * 1024;
1348 }
1349 {
1350 let paid = selector.get_stats_mut("paid").expect("paid");
1351 paid.requests_sent = 80;
1352 paid.successes = 36;
1353 paid.failures = 24;
1354 paid.timeouts = 20;
1355 paid.srtt_ms = 700.0;
1356 paid.bytes_sent = 80 * 40;
1357 paid.bytes_received = 36 * 512;
1358 }
1359
1360 selector
1361 }
1362
/// With the Cashu payment weight set to `0.0`, a recorded payment to the
/// `"paid"` peer is expected to leave `"reliable"` at the top of the ranking.
#[test]
fn test_cashu_payment_weight_zero_keeps_reputation_order() {
    let mut fixture = build_cashu_priority_fixture();
    fixture.set_cashu_payment_weight(0.0);
    fixture.record_cashu_payment("paid", 5_000);

    let ranked = fixture.select_peers();
    assert_eq!(ranked[0], "reliable");
}
1372
/// With the Cashu payment weight set to `0.8`, a recorded payment to the
/// `"paid"` peer is expected to lift it to the top of the ranking.
#[test]
fn test_cashu_payment_weight_prioritizes_paid_peer() {
    let mut fixture = build_cashu_priority_fixture();
    fixture.set_cashu_payment_weight(0.8);
    fixture.record_cashu_payment("paid", 5_000);

    let ranked = fixture.select_peers();
    assert_eq!(ranked[0], "paid");
}
1382
/// Two peers start with identical statistics; after a Cashu payment default
/// is recorded for one of them, the other is expected to rank first and the
/// defaulting peer is expected to be absent from the selection.
#[test]
fn test_cashu_payment_default_downranks_peer() {
    let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
    for name in ["honest", "delinquent"] {
        selector.add_peer(name);
    }

    // Give both peers exactly the same baseline numbers.
    for name in ["honest", "delinquent"] {
        let baseline = selector.get_stats_mut(name).expect("stats");
        baseline.requests_sent = 40;
        baseline.successes = 34;
        baseline.failures = 3;
        baseline.timeouts = 3;
        baseline.srtt_ms = 60.0;
        baseline.bytes_sent = baseline.requests_sent * 40;
        baseline.bytes_received = baseline.successes * 1024;
    }

    // The only difference between the two peers is this recorded default.
    selector.record_cashu_payment_default("delinquent");

    let ranked = selector.select_peers();
    assert_eq!(ranked[0], "honest");

    let delinquent_selected = ranked.iter().any(|peer| peer == "delinquent");
    assert!(!delinquent_selected);
}
1406
/// After one recorded payment default, the peer is expected to be reported
/// as blocked for a threshold of 1 and as not blocked for a threshold of 2.
#[test]
fn test_payment_default_threshold_blocks_peer() {
    let mut selector = PeerSelector::new();
    selector.record_cashu_payment_default("peer-a");

    let blocked_at_one = selector.is_peer_blocked_for_payment_defaults("peer-a", 1);
    assert!(blocked_at_one);

    let blocked_at_two = selector.is_peer_blocked_for_payment_defaults("peer-a", 2);
    assert!(!blocked_at_two);
}
1414
/// `peer_principal` is expected to return the peer id unchanged for both
/// sample ids used here.
#[test]
fn test_peer_principal_matches_peer_id() {
    for peer_id in ["npub1abc", "peer-hex-01"] {
        assert_eq!(peer_principal(peer_id), peer_id);
    }
}
1420
/// Exports a metadata snapshot from one selector, imports it into a fresh
/// one, and checks that the counters reappear once the same peer id is added.
#[test]
fn test_metadata_snapshot_restores_for_same_peer_id() {
    let peer = "npub1stable";

    // Populate the source selector with one of each recorded event.
    let mut source = PeerSelector::new();
    source.add_peer(peer);
    source.record_request(peer, 64);
    source.record_success(peer, 32, 1024);
    source.record_cashu_payment(peer, 77);
    source.record_cashu_receipt(peer, 33);
    source.record_cashu_payment_default(peer);

    // The export should hold a single entry for the peer at the current version.
    let exported = source.export_peer_metadata_snapshot();
    assert_eq!(exported.version, PEER_METADATA_SNAPSHOT_VERSION);
    assert_eq!(exported.peers.len(), 1);
    assert_eq!(exported.peers[0].principal, peer);

    // Import first, then add the peer: the stats should come from the snapshot.
    let mut target = PeerSelector::new();
    target.import_peer_metadata_snapshot(&exported);
    target.add_peer(peer);

    let revived = target.get_stats(peer).expect("restored stats");
    assert_eq!(revived.requests_sent, 1);
    assert_eq!(revived.successes, 1);
    assert_eq!(revived.cashu_paid_sat, 77);
    assert_eq!(revived.cashu_received_sat, 33);
    assert_eq!(revived.cashu_payment_receipts, 1);
    assert_eq!(revived.cashu_payment_defaults, 1);
}
1447}