1use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::{Duration, Instant};
13
/// Share of the total selection rate above which a peer is skipped by the fairness filter.
const SELECTION_PERCENTAGE_WARNING: f64 = 0.30;
/// Minimum number of peers required before the fairness filter is applied.
const SELECTION_MIN_PEERS: usize = 5;
/// Backoff applied after the first consecutive failure, in milliseconds.
const INITIAL_BACKOFF_MS: u64 = 1_000;
/// Factor by which the backoff grows with every further consecutive failure.
const BACKOFF_MULTIPLIER: u64 = 2;
/// Upper bound for a single backoff period, in milliseconds.
const MAX_BACKOFF_MS: u64 = 480_000;
/// Lower bound for the retransmission timeout, in milliseconds.
const MIN_RTO_MS: u64 = 50;
/// Upper bound for the retransmission timeout, in milliseconds.
const MAX_RTO_MS: u64 = 60_000;
/// Retransmission timeout used before any round-trip sample exists, in milliseconds.
const INITIAL_RTO_MS: u64 = 1_000;
/// Format version written to, and required from, a [`PeerMetadataSnapshot`].
pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
30
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
36pub struct PersistedPeerMetadata {
37 pub principal: String,
39 pub requests_sent: u64,
40 pub successes: u64,
41 pub timeouts: u64,
42 pub failures: u64,
43 pub srtt_ms: f64,
44 pub rttvar_ms: f64,
45 pub rto_ms: u64,
46 pub bytes_received: u64,
47 pub bytes_sent: u64,
48 pub cashu_paid_sat: u64,
49 pub cashu_received_sat: u64,
50 pub cashu_payment_receipts: u64,
51 pub cashu_payment_defaults: u64,
52}
53
54impl PersistedPeerMetadata {
55 fn from_stats(principal: String, stats: &PeerStats) -> Self {
56 Self {
57 principal,
58 requests_sent: stats.requests_sent,
59 successes: stats.successes,
60 timeouts: stats.timeouts,
61 failures: stats.failures,
62 srtt_ms: sanitize_latency(stats.srtt_ms),
63 rttvar_ms: sanitize_latency(stats.rttvar_ms),
64 rto_ms: clamp_rto(stats.rto_ms),
65 bytes_received: stats.bytes_received,
66 bytes_sent: stats.bytes_sent,
67 cashu_paid_sat: stats.cashu_paid_sat,
68 cashu_received_sat: stats.cashu_received_sat,
69 cashu_payment_receipts: stats.cashu_payment_receipts,
70 cashu_payment_defaults: stats.cashu_payment_defaults,
71 }
72 }
73
74 fn apply_to_stats(&self, stats: &mut PeerStats) {
75 stats.requests_sent = self.requests_sent;
76 stats.successes = self.successes;
77 stats.timeouts = self.timeouts;
78 stats.failures = self.failures;
79 stats.srtt_ms = sanitize_latency(self.srtt_ms);
80 stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
81 stats.rto_ms = clamp_rto(self.rto_ms);
82 stats.bytes_received = self.bytes_received;
83 stats.bytes_sent = self.bytes_sent;
84 stats.cashu_paid_sat = self.cashu_paid_sat;
85 stats.cashu_received_sat = self.cashu_received_sat;
86 stats.cashu_payment_receipts = self.cashu_payment_receipts;
87 stats.cashu_payment_defaults = self.cashu_payment_defaults;
88
89 stats.backoff_level = 0;
91 stats.backed_off_until = None;
92 stats.last_success = None;
93 stats.last_failure = None;
94 stats.consecutive_rto_backoffs = 0;
95 }
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
100pub struct PeerMetadataSnapshot {
101 pub version: u32,
102 pub peers: Vec<PersistedPeerMetadata>,
103}
104
105impl Default for PeerMetadataSnapshot {
106 fn default() -> Self {
107 Self {
108 version: PEER_METADATA_SNAPSHOT_VERSION,
109 peers: Vec::new(),
110 }
111 }
112}
113
/// Returns `value` when it is a finite, non-negative latency, otherwise `0.0`.
///
/// NaN, infinities and negative numbers all collapse to `0.0`.
fn sanitize_latency(value: f64) -> f64 {
    // NaN fails the `>=` comparison, infinities fail `is_finite`.
    let usable = value >= 0.0 && value.is_finite();
    if !usable {
        return 0.0;
    }
    value
}
121
122fn clamp_rto(rto_ms: u64) -> u64 {
123 if rto_ms == 0 {
124 INITIAL_RTO_MS
125 } else {
126 rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
127 }
128}
129
/// Returns the part of `peer_id` before the first `':'`, or the whole id
/// when it contains no colon.
pub fn peer_principal(peer_id: &str) -> &str {
    match peer_id.find(':') {
        Some(colon) => &peer_id[..colon],
        None => peer_id,
    }
}
140
141#[derive(Debug, Clone)]
143pub struct PeerStats {
144 pub peer_id: String,
146 pub connected_at: Instant,
148 pub requests_sent: u64,
150 pub successes: u64,
152 pub timeouts: u64,
154 pub failures: u64,
156 pub srtt_ms: f64,
158 pub rttvar_ms: f64,
160 pub rto_ms: u64,
162 pub consecutive_rto_backoffs: u32,
164 pub backoff_level: u32,
166 pub backed_off_until: Option<Instant>,
168 pub last_success: Option<Instant>,
170 pub last_failure: Option<Instant>,
172 pub bytes_received: u64,
174 pub bytes_sent: u64,
176 pub cashu_paid_sat: u64,
178 pub cashu_received_sat: u64,
180 pub cashu_payment_receipts: u64,
182 pub cashu_payment_defaults: u64,
184}
185
186impl PeerStats {
187 pub fn new(peer_id: impl Into<String>) -> Self {
189 Self {
190 peer_id: peer_id.into(),
191 connected_at: Instant::now(),
192 requests_sent: 0,
193 successes: 0,
194 timeouts: 0,
195 failures: 0,
196 srtt_ms: 0.0,
197 rttvar_ms: 0.0,
198 rto_ms: INITIAL_RTO_MS,
199 consecutive_rto_backoffs: 0,
200 backoff_level: 0,
201 backed_off_until: None,
202 last_success: None,
203 last_failure: None,
204 bytes_received: 0,
205 bytes_sent: 0,
206 cashu_paid_sat: 0,
207 cashu_received_sat: 0,
208 cashu_payment_receipts: 0,
209 cashu_payment_defaults: 0,
210 }
211 }
212
213 pub fn success_rate(&self) -> f64 {
215 if self.requests_sent == 0 {
216 return 0.5; }
218 self.successes as f64 / self.requests_sent as f64
219 }
220
221 pub fn selection_rate(&self) -> f64 {
223 let elapsed = self.connected_at.elapsed();
224 if elapsed.as_secs() < 10 {
225 return 0.0; }
227 self.requests_sent as f64 / elapsed.as_secs_f64()
228 }
229
230 pub fn is_backed_off(&self) -> bool {
232 if let Some(until) = self.backed_off_until {
233 Instant::now() < until
234 } else {
235 false
236 }
237 }
238
239 pub fn backoff_remaining(&self) -> Duration {
241 if let Some(until) = self.backed_off_until {
242 let now = Instant::now();
243 if now < until {
244 return until - now;
245 }
246 }
247 Duration::ZERO
248 }
249
250 pub fn record_request(&mut self, bytes: u64) {
252 self.requests_sent += 1;
253 self.bytes_sent += bytes;
254 }
255
256 pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
259 self.successes += 1;
260 self.bytes_received += bytes;
261 self.last_success = Some(Instant::now());
262 self.consecutive_rto_backoffs = 0;
263
264 self.backed_off_until = None;
266 self.backoff_level = 0;
267
268 let rtt = rtt_ms as f64;
270 if self.srtt_ms == 0.0 {
271 self.srtt_ms = rtt;
273 self.rttvar_ms = rtt / 2.0;
274 } else {
275 self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
280 self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
281 }
282
283 let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
285 self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
286 }
287
288 pub fn record_timeout(&mut self) {
290 self.timeouts += 1;
291 self.last_failure = Some(Instant::now());
292
293 self.apply_backoff();
295
296 if self.consecutive_rto_backoffs < 5 {
298 self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
299 self.consecutive_rto_backoffs += 1;
300 }
301 }
302
303 pub fn record_failure(&mut self) {
305 self.failures += 1;
306 self.last_failure = Some(Instant::now());
307 self.apply_backoff();
308 }
309
310 pub fn record_cashu_payment(&mut self, amount_sat: u64) {
312 if amount_sat == 0 {
313 return;
314 }
315 self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
316 }
317
318 pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
320 if amount_sat == 0 {
321 return;
322 }
323 self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
324 self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
325 }
326
327 pub fn record_cashu_payment_default(&mut self) {
329 self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
330 self.last_failure = Some(Instant::now());
331 self.apply_backoff();
332 }
333
334 fn apply_backoff(&mut self) {
336 self.backoff_level += 1;
337 let backoff_ms = (INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER.pow(self.backoff_level - 1))
338 .min(MAX_BACKOFF_MS);
339 self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
340 }
341
342 pub fn score(&self) -> f64 {
345 let success_score = self.success_rate();
347
348 let rtt_score = if self.srtt_ms <= 0.0 {
351 0.5 } else {
353 (500.0 / (self.srtt_ms + 50.0)).min(1.0)
354 };
355
356 let recency_bonus = if let Some(last) = self.last_success {
358 let secs_ago = last.elapsed().as_secs_f64();
359 if secs_ago < 60.0 {
360 0.1 } else {
362 0.0
363 }
364 } else {
365 0.0
366 };
367
368 0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
371 }
372
373 pub fn utility_score(&self, total_requests: u64) -> f64 {
381 let good = self.successes as f64 + 1.0;
382 let bad = (self.failures + self.timeouts) as f64 + 1.0;
383 let ratio = good / bad;
384 let ratio_score = ratio / (1.0 + ratio);
385
386 let latency_score = if self.srtt_ms <= 0.0 {
387 0.5
388 } else {
389 (300.0 / (self.srtt_ms + 50.0)).min(1.0)
390 };
391
392 let efficiency_score = if self.bytes_sent == 0 {
393 0.5
394 } else {
395 (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
396 };
397
398 let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
399
400 let uncertainty =
401 (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
402 let exploration_bonus = 0.20 * uncertainty;
403
404 exploitation + exploration_bonus
405 }
406
407 pub fn tit_for_tat_score(&self, total_requests: u64) -> f64 {
413 let reliability = (self.successes as f64 + 1.0) / (self.requests_sent as f64 + 2.0);
415
416 let reciprocity_raw = if self.bytes_sent == 0 {
419 1.0
420 } else {
421 self.bytes_received as f64 / self.bytes_sent as f64
422 };
423 let reciprocity_ratio = reciprocity_raw / (1.0 + reciprocity_raw);
424 let reciprocity_confidence = self.successes as f64 / (self.successes as f64 + 4.0);
425 let reciprocity =
426 (1.0 - reciprocity_confidence) * 0.5 + reciprocity_confidence * reciprocity_ratio;
427
428 let rtt_score = if self.srtt_ms <= 0.0 {
429 0.5
430 } else {
431 (400.0 / (self.srtt_ms + 50.0)).min(1.0)
432 };
433
434 let timeout_rate = if self.requests_sent == 0 {
435 0.0
436 } else {
437 self.timeouts as f64 / self.requests_sent as f64
438 };
439 let failure_rate = if self.requests_sent == 0 {
440 0.0
441 } else {
442 self.failures as f64 / self.requests_sent as f64
443 };
444 let retaliation_penalty =
445 (0.60 * timeout_rate + 0.45 * failure_rate + 0.10 * self.backoff_level as f64)
446 .min(0.95);
447
448 let cooperative = 0.65 * reliability + 0.25 * reciprocity + 0.10 * rtt_score;
449 let exploration = 0.03
450 * (((total_requests as f64) + 2.0).ln() / ((self.requests_sent as f64) + 2.0)).sqrt();
451
452 (cooperative + exploration - retaliation_penalty).max(0.0)
453 }
454
455 pub fn cashu_priority_boost(&self) -> f64 {
457 if self.cashu_paid_sat == 0 {
458 return 0.0;
459 }
460 let paid = self.cashu_paid_sat as f64;
461 paid / (paid + 32.0)
462 }
463
464 pub fn payment_reliability_multiplier(&self) -> f64 {
467 if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
468 return 1.0;
469 }
470 (self.cashu_payment_receipts as f64 + 1.0)
471 / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
472 }
473
474 pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
475 threshold > 0 && self.cashu_payment_defaults >= threshold
476 }
477}
478
/// Ordering policy applied by `PeerSelector::select_peers`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SelectionStrategy {
    /// Highest `PeerStats::score` first (blended with payment priority).
    #[default]
    Weighted,
    /// Rotates the candidate list by a counter that advances on every call.
    RoundRobin,
    /// Applies no ordering; candidates come back in the order of the
    /// internal stats map.
    Random,
    /// Lowest smoothed RTT first; ties are broken by score, then by id.
    LowestLatency,
    /// Highest `PeerStats::success_rate` first; ties are broken by id.
    HighestSuccessRate,
    /// Highest `PeerStats::tit_for_tat_score` first (blended with payment priority).
    TitForTat,
    /// Highest `PeerStats::utility_score` first (blended with payment priority).
    UtilityUcb,
}
498
499#[derive(Debug, Default)]
507pub struct PeerSelector {
508 stats: HashMap<String, PeerStats>,
510 persisted_metadata: HashMap<String, PersistedPeerMetadata>,
512 strategy: SelectionStrategy,
514 fairness_enabled: bool,
516 round_robin_idx: usize,
518 cashu_payment_weight: f64,
520}
521
522impl PeerSelector {
523 pub fn new() -> Self {
525 Self {
526 stats: HashMap::new(),
527 persisted_metadata: HashMap::new(),
528 strategy: SelectionStrategy::Weighted,
529 fairness_enabled: true,
530 round_robin_idx: 0,
531 cashu_payment_weight: 0.0,
532 }
533 }
534
535 pub fn with_strategy(strategy: SelectionStrategy) -> Self {
537 Self {
538 stats: HashMap::new(),
539 persisted_metadata: HashMap::new(),
540 strategy,
541 fairness_enabled: true,
542 round_robin_idx: 0,
543 cashu_payment_weight: 0.0,
544 }
545 }
546
547 pub fn set_fairness(&mut self, enabled: bool) {
549 self.fairness_enabled = enabled;
550 }
551
552 pub fn set_cashu_payment_weight(&mut self, weight: f64) {
555 self.cashu_payment_weight = weight.clamp(0.0, 1.0);
556 }
557
558 pub fn add_peer(&mut self, peer_id: impl Into<String>) {
560 let peer_id = peer_id.into();
561 if self.stats.contains_key(&peer_id) {
562 return;
563 }
564
565 let mut stats = PeerStats::new(peer_id.clone());
566 if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
567 saved.apply_to_stats(&mut stats);
568 }
569 self.stats.insert(peer_id, stats);
570 }
571
572 pub fn remove_peer(&mut self, peer_id: &str) {
574 if let Some(stats) = self.stats.remove(peer_id) {
575 let principal = peer_principal(&stats.peer_id).to_string();
576 self.persisted_metadata.insert(
577 principal.clone(),
578 PersistedPeerMetadata::from_stats(principal, &stats),
579 );
580 }
581 }
582
583 pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
585 self.stats.get(peer_id)
586 }
587
588 pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
590 self.stats.get_mut(peer_id)
591 }
592
593 pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
595 self.stats.values()
596 }
597
598 pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
600 if let Some(stats) = self.stats.get_mut(peer_id) {
601 stats.record_request(bytes);
602 }
603 }
604
605 pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
607 if let Some(stats) = self.stats.get_mut(peer_id) {
608 stats.record_success(rtt_ms, bytes);
609 }
610 }
611
612 pub fn record_timeout(&mut self, peer_id: &str) {
614 if let Some(stats) = self.stats.get_mut(peer_id) {
615 stats.record_timeout();
616 }
617 }
618
619 pub fn record_failure(&mut self, peer_id: &str) {
621 if let Some(stats) = self.stats.get_mut(peer_id) {
622 stats.record_failure();
623 }
624 }
625
626 pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
628 if amount_sat == 0 {
629 return;
630 }
631 let entry = self
632 .stats
633 .entry(peer_id.to_string())
634 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
635 entry.record_cashu_payment(amount_sat);
636 }
637
638 pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
640 if amount_sat == 0 {
641 return;
642 }
643 let entry = self
644 .stats
645 .entry(peer_id.to_string())
646 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
647 entry.record_cashu_receipt(amount_sat);
648 }
649
650 pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
652 let entry = self
653 .stats
654 .entry(peer_id.to_string())
655 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
656 entry.record_cashu_payment_default();
657 }
658
659 pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
660 self.stats
661 .get(peer_id)
662 .map(|stats| stats.exceeds_payment_default_threshold(threshold))
663 .unwrap_or(false)
664 }
665
666 fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
667 let reliable_base = base_score * stats.payment_reliability_multiplier();
668 if self.cashu_payment_weight <= 0.0 {
669 return reliable_base;
670 }
671 let payment_score = stats.cashu_priority_boost();
672 (1.0 - self.cashu_payment_weight) * reliable_base
673 + self.cashu_payment_weight * payment_score
674 }
675
676 fn available_peers(&self) -> Vec<String> {
678 self.stats
679 .iter()
680 .filter(|(_, s)| !s.is_backed_off())
681 .map(|(id, _)| id.clone())
682 .collect()
683 }
684
685 #[cfg(test)]
687 fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
688 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
689 self.should_skip_for_fairness_with_total(peer_id, total_rate)
690 }
691
692 fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
693 if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
694 return false;
695 }
696
697 if let Some(stats) = self.stats.get(peer_id) {
699 let peer_rate = stats.selection_rate();
700 let proportion = peer_rate / total_rate;
701 return proportion > SELECTION_PERCENTAGE_WARNING;
702 }
703
704 false
705 }
706
707 pub fn select_peers(&mut self) -> Vec<String> {
712 let available = self.available_peers();
713 if available.is_empty() {
714 let mut backed_off: Vec<_> = self
717 .stats
718 .iter()
719 .filter(|(_, s)| s.is_backed_off())
720 .map(|(id, s)| (id.clone(), s.backoff_remaining()))
721 .collect();
722 backed_off.sort_by_key(|(_, remaining)| *remaining);
723 return backed_off.into_iter().map(|(id, _)| id).collect();
724 }
725
726 let candidates: Vec<String> =
728 if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
729 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
730 available
731 .into_iter()
732 .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
733 .collect()
734 } else {
735 available
736 };
737
738 let candidates = if candidates.is_empty() {
740 self.available_peers()
741 } else {
742 candidates
743 };
744
745 let mut sorted: Vec<_> = candidates
747 .into_iter()
748 .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
749 .collect();
750
751 match self.strategy {
752 SelectionStrategy::Weighted => {
753 sorted.sort_by(|(id_a, a), (id_b, b)| {
755 let score_a = self.blend_with_payment_priority(a, a.score());
756 let score_b = self.blend_with_payment_priority(b, b.score());
757 let score_cmp = score_b
758 .partial_cmp(&score_a)
759 .unwrap_or(std::cmp::Ordering::Equal);
760 if score_cmp == std::cmp::Ordering::Equal {
761 id_a.cmp(id_b) } else {
763 score_cmp
764 }
765 });
766 }
767 SelectionStrategy::LowestLatency => {
768 sorted.sort_by(|(id_a, a), (id_b, b)| {
770 let rtt_cmp = a
771 .srtt_ms
772 .partial_cmp(&b.srtt_ms)
773 .unwrap_or(std::cmp::Ordering::Equal);
774 if rtt_cmp == std::cmp::Ordering::Equal {
775 let score_cmp = b
776 .score()
777 .partial_cmp(&a.score())
778 .unwrap_or(std::cmp::Ordering::Equal);
779 if score_cmp == std::cmp::Ordering::Equal {
780 id_a.cmp(id_b)
781 } else {
782 score_cmp
783 }
784 } else {
785 rtt_cmp
786 }
787 });
788 }
789 SelectionStrategy::HighestSuccessRate => {
790 sorted.sort_by(|(id_a, a), (id_b, b)| {
792 let rate_cmp = b
793 .success_rate()
794 .partial_cmp(&a.success_rate())
795 .unwrap_or(std::cmp::Ordering::Equal);
796 if rate_cmp == std::cmp::Ordering::Equal {
797 id_a.cmp(id_b)
798 } else {
799 rate_cmp
800 }
801 });
802 }
803 SelectionStrategy::TitForTat => {
804 let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
805 sorted.sort_by(|(id_a, a), (id_b, b)| {
806 let score_a =
807 self.blend_with_payment_priority(a, a.tit_for_tat_score(total_requests));
808 let score_b =
809 self.blend_with_payment_priority(b, b.tit_for_tat_score(total_requests));
810 let score_cmp = score_b
811 .partial_cmp(&score_a)
812 .unwrap_or(std::cmp::Ordering::Equal);
813 if score_cmp == std::cmp::Ordering::Equal {
814 id_a.cmp(id_b)
815 } else {
816 score_cmp
817 }
818 });
819 }
820 SelectionStrategy::UtilityUcb => {
821 let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
822 sorted.sort_by(|(id_a, a), (id_b, b)| {
823 let score_a =
824 self.blend_with_payment_priority(a, a.utility_score(total_requests));
825 let score_b =
826 self.blend_with_payment_priority(b, b.utility_score(total_requests));
827 let score_cmp = score_b
828 .partial_cmp(&score_a)
829 .unwrap_or(std::cmp::Ordering::Equal);
830 if score_cmp == std::cmp::Ordering::Equal {
831 id_a.cmp(id_b)
832 } else {
833 score_cmp
834 }
835 });
836 }
837 SelectionStrategy::RoundRobin => {
838 if !sorted.is_empty() {
840 let idx = self.round_robin_idx % sorted.len();
841 sorted.rotate_left(idx);
842 self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
843 }
844 }
845 SelectionStrategy::Random => {
846 }
849 }
850
851 sorted.into_iter().map(|(id, _)| id).collect()
852 }
853
854 pub fn select_best(&mut self) -> Option<String> {
856 self.select_peers().into_iter().next()
857 }
858
859 pub fn select_top(&mut self, n: usize) -> Vec<String> {
861 self.select_peers().into_iter().take(n).collect()
862 }
863
864 pub fn summary(&self) -> SelectorSummary {
866 let count = self.stats.len();
867 if count == 0 {
868 return SelectorSummary::default();
869 }
870
871 let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
872 let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
873 let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
874 let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
875
876 let avg_rtt = {
877 let rtts: Vec<f64> = self
878 .stats
879 .values()
880 .filter(|s| s.srtt_ms > 0.0)
881 .map(|s| s.srtt_ms)
882 .collect();
883 if rtts.is_empty() {
884 0.0
885 } else {
886 rtts.iter().sum::<f64>() / rtts.len() as f64
887 }
888 };
889
890 SelectorSummary {
891 peer_count: count,
892 total_requests,
893 total_successes,
894 total_timeouts,
895 backed_off_count: backed_off,
896 avg_rtt_ms: avg_rtt,
897 overall_success_rate: if total_requests > 0 {
898 total_successes as f64 / total_requests as f64
899 } else {
900 0.0
901 },
902 }
903 }
904
905 pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
907 let mut by_principal = self.persisted_metadata.clone();
908 for stats in self.stats.values() {
909 let principal = peer_principal(&stats.peer_id).to_string();
910 by_principal.insert(
911 principal.clone(),
912 PersistedPeerMetadata::from_stats(principal, stats),
913 );
914 }
915
916 let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
917 peers.sort_by(|a, b| a.principal.cmp(&b.principal));
918
919 PeerMetadataSnapshot {
920 version: PEER_METADATA_SNAPSHOT_VERSION,
921 peers,
922 }
923 }
924
925 pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
927 if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
928 return;
929 }
930
931 self.persisted_metadata.clear();
932 for peer in &snapshot.peers {
933 self.persisted_metadata
934 .insert(peer.principal.clone(), peer.clone());
935 }
936
937 for stats in self.stats.values_mut() {
938 if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
939 saved.apply_to_stats(stats);
940 }
941 }
942 }
943}
944
/// Aggregated view over all peers, as produced by `PeerSelector::summary`.
#[derive(Clone, Debug, Default)]
pub struct SelectorSummary {
    /// Number of live peers.
    pub peer_count: usize,
    /// Sum of requests sent to all peers.
    pub total_requests: u64,
    /// Sum of successful responses from all peers.
    pub total_successes: u64,
    /// Sum of timeouts across all peers.
    pub total_timeouts: u64,
    /// Number of peers that are currently backed off.
    pub backed_off_count: usize,
    /// Mean smoothed RTT in milliseconds over peers that have one.
    pub avg_rtt_ms: f64,
    /// `total_successes / total_requests`, or `0.0` without requests.
    pub overall_success_rate: f64,
}
956
957#[cfg(test)]
958mod tests {
959 use super::*;
960 use std::thread::sleep;
961
962 #[test]
963 fn test_peer_stats_success_rate() {
964 let mut stats = PeerStats::new("peer1");
965 assert_eq!(stats.success_rate(), 0.5); stats.record_request(40);
968 stats.record_success(50, 1024);
969 assert_eq!(stats.success_rate(), 1.0);
970
971 stats.record_request(40);
972 stats.record_timeout();
973 assert_eq!(stats.success_rate(), 0.5);
974 }
975
976 #[test]
977 fn test_peer_stats_rtt_calculation() {
978 let mut stats = PeerStats::new("peer1");
979
980 stats.record_request(40);
982 stats.record_success(100, 1024);
983 assert_eq!(stats.srtt_ms, 100.0);
984 assert_eq!(stats.rttvar_ms, 50.0); stats.record_request(40);
988 stats.record_success(80, 1024);
989 assert!((stats.srtt_ms - 97.5).abs() < 0.1);
991 }
992
993 #[test]
994 fn test_peer_stats_backoff() {
995 let mut stats = PeerStats::new("peer1");
996 assert!(!stats.is_backed_off());
997
998 stats.record_timeout();
999 assert!(stats.is_backed_off());
1000 assert!(stats.backoff_remaining() > Duration::ZERO);
1001 }
1002
1003 #[test]
1004 fn test_peer_stats_backoff_clears_on_success() {
1005 let mut stats = PeerStats::new("peer1");
1006 stats.record_timeout();
1007 assert!(stats.is_backed_off());
1008
1009 stats.record_success(50, 1024);
1010 assert!(!stats.is_backed_off());
1011 assert_eq!(stats.backoff_level, 0);
1012 }
1013
1014 #[test]
1015 fn test_peer_selector_add_remove() {
1016 let mut selector = PeerSelector::new();
1017 selector.add_peer("peer1");
1018 selector.add_peer("peer2");
1019 assert!(selector.get_stats("peer1").is_some());
1020 assert!(selector.get_stats("peer2").is_some());
1021
1022 selector.remove_peer("peer1");
1023 assert!(selector.get_stats("peer1").is_none());
1024 assert!(selector.get_stats("peer2").is_some());
1025 }
1026
1027 #[test]
1028 fn test_peer_selector_weighted_selection() {
1029 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1030 selector.add_peer("peer1");
1031 selector.add_peer("peer2");
1032 selector.add_peer("peer3");
1033
1034 selector.record_request("peer1", 40);
1036 selector.record_success("peer1", 20, 1024);
1037 selector.record_request("peer1", 40);
1038 selector.record_success("peer1", 25, 1024);
1039
1040 selector.record_request("peer2", 40);
1042 selector.record_success("peer2", 100, 1024);
1043 selector.record_request("peer2", 40);
1044 selector.record_timeout("peer2");
1045
1046 selector.record_request("peer3", 40);
1048 selector.record_timeout("peer3");
1049 selector.record_request("peer3", 40);
1050 selector.record_timeout("peer3");
1051
1052 let peers = selector.select_peers();
1054 assert_eq!(peers[0], "peer1");
1056 }
1057
1058 #[test]
1059 fn test_peer_selector_backed_off_peers() {
1060 let mut selector = PeerSelector::new();
1061 selector.add_peer("peer1");
1062 selector.add_peer("peer2");
1063
1064 selector.record_timeout("peer1");
1066 assert!(selector.get_stats("peer1").unwrap().is_backed_off());
1067
1068 let peers = selector.select_peers();
1070 assert_eq!(peers.len(), 1);
1071 assert_eq!(peers[0], "peer2");
1072 }
1073
1074 #[test]
1075 fn test_peer_selector_all_backed_off_fallback() {
1076 let mut selector = PeerSelector::new();
1077 selector.add_peer("peer1");
1078 selector.add_peer("peer2");
1079
1080 selector.record_timeout("peer1");
1082 selector.record_timeout("peer2");
1083
1084 let peers = selector.select_peers();
1086 assert_eq!(peers.len(), 2);
1087 }
1088
1089 #[test]
1090 fn test_peer_selector_fairness() {
1091 let mut selector = PeerSelector::new();
1092 selector.set_fairness(true);
1093
1094 for i in 1..=6 {
1096 selector.add_peer(format!("peer{}", i));
1097 }
1098
1099 sleep(Duration::from_millis(15));
1101
1102 for _ in 0..100 {
1103 selector.record_request("peer1", 40);
1104 selector.record_success("peer1", 10, 100);
1105 }
1106
1107 for i in 2..=6 {
1109 selector.record_request(&format!("peer{}", i), 40);
1110 selector.record_success(&format!("peer{}", i), 10, 100);
1111 }
1112
1113 let skipped = selector.should_skip_for_fairness("peer1");
1115 let _ = skipped; }
1117
1118 #[test]
1119 fn test_peer_selector_summary() {
1120 let mut selector = PeerSelector::new();
1121 selector.add_peer("peer1");
1122 selector.add_peer("peer2");
1123
1124 selector.record_request("peer1", 40);
1125 selector.record_success("peer1", 50, 1024);
1126 selector.record_request("peer2", 40);
1127 selector.record_timeout("peer2");
1128
1129 let summary = selector.summary();
1130 assert_eq!(summary.peer_count, 2);
1131 assert_eq!(summary.total_requests, 2);
1132 assert_eq!(summary.total_successes, 1);
1133 assert_eq!(summary.total_timeouts, 1);
1134 assert_eq!(summary.backed_off_count, 1);
1135 assert_eq!(summary.overall_success_rate, 0.5);
1136 }
1137
1138 #[test]
1139 fn test_peer_stats_score() {
1140 let mut stats = PeerStats::new("peer1");
1141
1142 let initial_score = stats.score();
1144 assert!(initial_score > 0.3 && initial_score < 0.7);
1145
1146 for _ in 0..10 {
1148 stats.record_request(40);
1149 stats.record_success(20, 1024);
1150 }
1151 let good_score = stats.score();
1152 assert!(good_score > 0.8);
1153
1154 let mut bad_stats = PeerStats::new("peer2");
1156 for _ in 0..10 {
1157 bad_stats.record_request(40);
1158 bad_stats.record_timeout();
1159 }
1160 let bad_score = bad_stats.score();
1161 assert!(bad_score < 0.3);
1162
1163 assert!(good_score > bad_score);
1164 }
1165
1166 #[test]
1167 fn test_peer_stats_utility_score_prefers_good_over_bad() {
1168 let mut good = PeerStats::new("good");
1169 good.requests_sent = 120;
1170 good.successes = 96;
1171 good.failures = 8;
1172 good.timeouts = 4;
1173 good.srtt_ms = 30.0;
1174 good.bytes_sent = 120 * 40;
1175 good.bytes_received = 96 * 1024;
1176
1177 let mut bad = PeerStats::new("bad");
1178 bad.requests_sent = 120;
1179 bad.successes = 40;
1180 bad.failures = 50;
1181 bad.timeouts = 30;
1182 bad.srtt_ms = 220.0;
1183 bad.bytes_sent = 120 * 40;
1184 bad.bytes_received = 40 * 1024;
1185
1186 let total_requests = good.requests_sent + bad.requests_sent;
1187 assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
1188 }
1189
1190 #[test]
1191 fn test_peer_stats_tit_for_tat_score_prefers_reciprocal_peer() {
1192 let mut reciprocal = PeerStats::new("reciprocal");
1193 reciprocal.requests_sent = 100;
1194 reciprocal.successes = 90;
1195 reciprocal.failures = 5;
1196 reciprocal.timeouts = 5;
1197 reciprocal.srtt_ms = 40.0;
1198 reciprocal.bytes_sent = 100 * 40;
1199 reciprocal.bytes_received = 90 * 1024;
1200
1201 let mut leecher = PeerStats::new("leecher");
1202 leecher.requests_sent = 100;
1203 leecher.successes = 40;
1204 leecher.failures = 30;
1205 leecher.timeouts = 30;
1206 leecher.srtt_ms = 120.0;
1207 leecher.bytes_sent = 100 * 40;
1208 leecher.bytes_received = 10 * 1024;
1209
1210 let total_requests = reciprocal.requests_sent + leecher.requests_sent;
1211 assert!(
1212 reciprocal.tit_for_tat_score(total_requests)
1213 > leecher.tit_for_tat_score(total_requests)
1214 );
1215 }
1216
1217 #[test]
1218 fn test_utility_ucb_strategy_explores_less_sampled_peer() {
1219 let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
1220 selector.add_peer("stable");
1221 selector.add_peer("new");
1222
1223 {
1224 let stable = selector.get_stats_mut("stable").unwrap();
1225 stable.requests_sent = 500;
1226 stable.successes = 450;
1227 stable.failures = 35;
1228 stable.timeouts = 15;
1229 stable.srtt_ms = 35.0;
1230 stable.bytes_sent = 500 * 40;
1231 stable.bytes_received = 450 * 1024;
1232 }
1233 {
1234 let new_peer = selector.get_stats_mut("new").unwrap();
1235 new_peer.requests_sent = 2;
1236 new_peer.successes = 2;
1237 new_peer.failures = 0;
1238 new_peer.timeouts = 0;
1239 new_peer.srtt_ms = 70.0;
1240 new_peer.bytes_sent = 2 * 40;
1241 new_peer.bytes_received = 2 * 1024;
1242 }
1243
1244 let peers = selector.select_peers();
1245 assert_eq!(peers[0], "new");
1246 }
1247
1248 #[test]
1249 fn test_tit_for_tat_strategy_prioritizes_reciprocity() {
1250 let mut selector = PeerSelector::with_strategy(SelectionStrategy::TitForTat);
1251 selector.add_peer("reciprocal");
1252 selector.add_peer("leecher");
1253
1254 {
1255 let reciprocal = selector.get_stats_mut("reciprocal").unwrap();
1256 reciprocal.requests_sent = 120;
1257 reciprocal.successes = 102;
1258 reciprocal.failures = 8;
1259 reciprocal.timeouts = 10;
1260 reciprocal.srtt_ms = 45.0;
1261 reciprocal.bytes_sent = 120 * 40;
1262 reciprocal.bytes_received = 102 * 1024;
1263 }
1264 {
1265 let leecher = selector.get_stats_mut("leecher").unwrap();
1266 leecher.requests_sent = 120;
1267 leecher.successes = 70;
1268 leecher.failures = 20;
1269 leecher.timeouts = 30;
1270 leecher.srtt_ms = 35.0;
1271 leecher.bytes_sent = 120 * 40;
1272 leecher.bytes_received = 8 * 1024;
1273 }
1274
1275 let peers = selector.select_peers();
1276 assert_eq!(peers[0], "reciprocal");
1277 }
1278
1279 #[test]
1280 fn test_lowest_latency_strategy() {
1281 let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
1282 selector.add_peer("peer1");
1283 selector.add_peer("peer2");
1284 selector.add_peer("peer3");
1285
1286 selector.record_request("peer1", 40);
1288 selector.record_success("peer1", 100, 1024);
1289
1290 selector.record_request("peer2", 40);
1292 selector.record_success("peer2", 20, 1024);
1293
1294 selector.record_request("peer3", 40);
1296 selector.record_success("peer3", 50, 1024);
1297
1298 let peers = selector.select_peers();
1299 assert_eq!(peers[0], "peer2");
1301 }
1302
1303 fn build_cashu_priority_fixture() -> PeerSelector {
1304 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1305 selector.add_peer("reliable");
1306 selector.add_peer("paid");
1307
1308 {
1309 let reliable = selector.get_stats_mut("reliable").expect("reliable");
1310 reliable.requests_sent = 80;
1311 reliable.successes = 75;
1312 reliable.failures = 2;
1313 reliable.timeouts = 3;
1314 reliable.srtt_ms = 40.0;
1315 reliable.bytes_sent = 80 * 40;
1316 reliable.bytes_received = 75 * 1024;
1317 }
1318 {
1319 let paid = selector.get_stats_mut("paid").expect("paid");
1320 paid.requests_sent = 80;
1321 paid.successes = 36;
1322 paid.failures = 24;
1323 paid.timeouts = 20;
1324 paid.srtt_ms = 700.0;
1325 paid.bytes_sent = 80 * 40;
1326 paid.bytes_received = 36 * 512;
1327 }
1328
1329 selector
1330 }
1331
/// With the Cashu payment weight set to 0.0, recording a payment for "paid"
/// must not displace "reliable" from the first position.
#[test]
fn test_cashu_payment_weight_zero_keeps_reputation_order() {
    let mut fixture = build_cashu_priority_fixture();
    fixture.set_cashu_payment_weight(0.0);
    fixture.record_cashu_payment("paid", 5_000);

    let ranked = fixture.select_peers();
    assert_eq!(ranked[0], "reliable");
}
1341
/// With the Cashu payment weight set to 0.8, recording a payment for "paid"
/// must lift it to the first position despite its weaker request stats.
#[test]
fn test_cashu_payment_weight_prioritizes_paid_peer() {
    let mut fixture = build_cashu_priority_fixture();
    fixture.set_cashu_payment_weight(0.8);
    fixture.record_cashu_payment("paid", 5_000);

    let ranked = fixture.select_peers();
    assert_eq!(ranked[0], "paid");
}
1351
/// Two peers with identical request stats: after one of them records a Cashu
/// payment default, the other must rank first and the defaulting peer must not
/// appear in the selection at all.
#[test]
fn test_cashu_payment_default_downranks_peer() {
    let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
    let contenders = ["honest", "delinquent"];

    for peer_id in contenders {
        selector.add_peer(peer_id);
    }

    // Give both peers exactly the same history so only the default differs.
    for peer_id in contenders {
        let stats = selector.get_stats_mut(peer_id).expect("stats");
        stats.requests_sent = 40;
        stats.bytes_sent = 40 * 40;
        stats.successes = 34;
        stats.bytes_received = 34 * 1024;
        stats.failures = 3;
        stats.timeouts = 3;
        stats.srtt_ms = 60.0;
    }

    selector.record_cashu_payment_default("delinquent");

    let ranked = selector.select_peers();
    assert_eq!(ranked[0], "honest");
    assert!(ranked.iter().all(|peer| peer != "delinquent"));
}
1375
/// After a single recorded payment default, the peer is reported as blocked
/// for a threshold of 1 but not for a threshold of 2.
#[test]
fn test_payment_default_threshold_blocks_peer() {
    let peer_id = "peer-a";
    let mut selector = PeerSelector::new();
    selector.record_cashu_payment_default(peer_id);

    assert!(selector.is_peer_blocked_for_payment_defaults(peer_id, 1));
    assert!(!selector.is_peer_blocked_for_payment_defaults(peer_id, 2));
}
1383
/// `peer_principal` yields the part before ":" for a session-qualified id and
/// returns an id without a session suffix unchanged.
#[test]
fn test_peer_principal_prefers_stable_identity_prefix() {
    // (input peer id, expected principal)
    let cases = [
        ("npub1abc:session-1", "npub1abc"),
        ("npub1abc", "npub1abc"),
    ];
    for (peer_id, expected) in cases {
        assert_eq!(peer_principal(peer_id), expected);
    }
}
1389
/// Stats recorded under one session id are exported under the bare principal
/// and are visible again after import when the same principal is added with a
/// different session id.
#[test]
fn test_metadata_snapshot_restores_across_session_ids() {
    let first_session = "npub1stable:session-a";
    let second_session = "npub1stable:session-b";

    // Record one request/success plus one of each Cashu event on the first session.
    let mut source = PeerSelector::new();
    source.add_peer(first_session);
    source.record_request(first_session, 64);
    source.record_success(first_session, 32, 1024);
    source.record_cashu_payment(first_session, 77);
    source.record_cashu_receipt(first_session, 33);
    source.record_cashu_payment_default(first_session);

    let snapshot = source.export_peer_metadata_snapshot();
    assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
    assert_eq!(snapshot.peers.len(), 1);
    assert_eq!(snapshot.peers[0].principal, "npub1stable");

    // Import first, then add the peer under a new session id.
    let mut target = PeerSelector::new();
    target.import_peer_metadata_snapshot(&snapshot);
    target.add_peer(second_session);

    let recovered = target.get_stats(second_session).expect("restored stats");
    assert_eq!(recovered.requests_sent, 1);
    assert_eq!(recovered.successes, 1);
    assert_eq!(recovered.cashu_paid_sat, 77);
    assert_eq!(recovered.cashu_received_sat, 33);
    assert_eq!(recovered.cashu_payment_receipts, 1);
    assert_eq!(recovered.cashu_payment_defaults, 1);
}
1418}