1use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::{Duration, Instant};
13
14const SELECTION_PERCENTAGE_WARNING: f64 = 0.30; const SELECTION_MIN_PEERS: usize = 5; const INITIAL_BACKOFF_MS: u64 = 1000; const BACKOFF_MULTIPLIER: u64 = 2; const MAX_BACKOFF_MS: u64 = 480_000; const MIN_RTO_MS: u64 = 50; const MAX_RTO_MS: u64 = 60_000; const INITIAL_RTO_MS: u64 = 1000; pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
30
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
36pub struct PersistedPeerMetadata {
37 pub principal: String,
39 pub requests_sent: u64,
40 pub successes: u64,
41 pub timeouts: u64,
42 pub failures: u64,
43 pub srtt_ms: f64,
44 pub rttvar_ms: f64,
45 pub rto_ms: u64,
46 pub bytes_received: u64,
47 pub bytes_sent: u64,
48 pub cashu_paid_sat: u64,
49 pub cashu_received_sat: u64,
50 pub cashu_payment_receipts: u64,
51 pub cashu_payment_defaults: u64,
52}
53
54impl PersistedPeerMetadata {
55 fn from_stats(principal: String, stats: &PeerStats) -> Self {
56 Self {
57 principal,
58 requests_sent: stats.requests_sent,
59 successes: stats.successes,
60 timeouts: stats.timeouts,
61 failures: stats.failures,
62 srtt_ms: sanitize_latency(stats.srtt_ms),
63 rttvar_ms: sanitize_latency(stats.rttvar_ms),
64 rto_ms: clamp_rto(stats.rto_ms),
65 bytes_received: stats.bytes_received,
66 bytes_sent: stats.bytes_sent,
67 cashu_paid_sat: stats.cashu_paid_sat,
68 cashu_received_sat: stats.cashu_received_sat,
69 cashu_payment_receipts: stats.cashu_payment_receipts,
70 cashu_payment_defaults: stats.cashu_payment_defaults,
71 }
72 }
73
74 fn apply_to_stats(&self, stats: &mut PeerStats) {
75 stats.requests_sent = self.requests_sent;
76 stats.successes = self.successes;
77 stats.timeouts = self.timeouts;
78 stats.failures = self.failures;
79 stats.srtt_ms = sanitize_latency(self.srtt_ms);
80 stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
81 stats.rto_ms = clamp_rto(self.rto_ms);
82 stats.bytes_received = self.bytes_received;
83 stats.bytes_sent = self.bytes_sent;
84 stats.cashu_paid_sat = self.cashu_paid_sat;
85 stats.cashu_received_sat = self.cashu_received_sat;
86 stats.cashu_payment_receipts = self.cashu_payment_receipts;
87 stats.cashu_payment_defaults = self.cashu_payment_defaults;
88
89 stats.backoff_level = 0;
91 stats.backed_off_until = None;
92 stats.last_success = None;
93 stats.last_failure = None;
94 stats.consecutive_rto_backoffs = 0;
95 }
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
100pub struct PeerMetadataSnapshot {
101 pub version: u32,
102 pub peers: Vec<PersistedPeerMetadata>,
103}
104
105impl Default for PeerMetadataSnapshot {
106 fn default() -> Self {
107 Self {
108 version: PEER_METADATA_SNAPSHOT_VERSION,
109 peers: Vec::new(),
110 }
111 }
112}
113
114fn sanitize_latency(value: f64) -> f64 {
115 if value.is_finite() && value >= 0.0 {
116 value
117 } else {
118 0.0
119 }
120}
121
122fn clamp_rto(rto_ms: u64) -> u64 {
123 if rto_ms == 0 {
124 INITIAL_RTO_MS
125 } else {
126 rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
127 }
128}
129
130pub fn peer_principal(peer_id: &str) -> &str {
135 peer_id
136}
137
138#[derive(Debug, Clone)]
140pub struct PeerStats {
141 pub peer_id: String,
143 pub connected_at: Instant,
145 pub requests_sent: u64,
147 pub successes: u64,
149 pub timeouts: u64,
151 pub failures: u64,
153 pub srtt_ms: f64,
155 pub rttvar_ms: f64,
157 pub rto_ms: u64,
159 pub consecutive_rto_backoffs: u32,
161 pub backoff_level: u32,
163 pub backed_off_until: Option<Instant>,
165 pub last_success: Option<Instant>,
167 pub last_failure: Option<Instant>,
169 pub bytes_received: u64,
171 pub bytes_sent: u64,
173 pub cashu_paid_sat: u64,
175 pub cashu_received_sat: u64,
177 pub cashu_payment_receipts: u64,
179 pub cashu_payment_defaults: u64,
181}
182
183impl PeerStats {
184 pub fn new(peer_id: impl Into<String>) -> Self {
186 Self {
187 peer_id: peer_id.into(),
188 connected_at: Instant::now(),
189 requests_sent: 0,
190 successes: 0,
191 timeouts: 0,
192 failures: 0,
193 srtt_ms: 0.0,
194 rttvar_ms: 0.0,
195 rto_ms: INITIAL_RTO_MS,
196 consecutive_rto_backoffs: 0,
197 backoff_level: 0,
198 backed_off_until: None,
199 last_success: None,
200 last_failure: None,
201 bytes_received: 0,
202 bytes_sent: 0,
203 cashu_paid_sat: 0,
204 cashu_received_sat: 0,
205 cashu_payment_receipts: 0,
206 cashu_payment_defaults: 0,
207 }
208 }
209
210 pub fn success_rate(&self) -> f64 {
212 if self.requests_sent == 0 {
213 return 0.5; }
215 self.successes as f64 / self.requests_sent as f64
216 }
217
218 pub fn selection_rate(&self) -> f64 {
220 let elapsed = self.connected_at.elapsed();
221 if elapsed.as_secs() < 10 {
222 return 0.0; }
224 self.requests_sent as f64 / elapsed.as_secs_f64()
225 }
226
227 pub fn is_backed_off(&self) -> bool {
229 if let Some(until) = self.backed_off_until {
230 Instant::now() < until
231 } else {
232 false
233 }
234 }
235
236 pub fn backoff_remaining(&self) -> Duration {
238 if let Some(until) = self.backed_off_until {
239 let now = Instant::now();
240 if now < until {
241 return until - now;
242 }
243 }
244 Duration::ZERO
245 }
246
247 pub fn record_request(&mut self, bytes: u64) {
249 self.requests_sent += 1;
250 self.bytes_sent += bytes;
251 }
252
253 pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
256 self.successes += 1;
257 self.bytes_received += bytes;
258 self.last_success = Some(Instant::now());
259 self.consecutive_rto_backoffs = 0;
260
261 self.backed_off_until = None;
263 self.backoff_level = 0;
264
265 let rtt = rtt_ms as f64;
267 if self.srtt_ms == 0.0 {
268 self.srtt_ms = rtt;
270 self.rttvar_ms = rtt / 2.0;
271 } else {
272 self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
277 self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
278 }
279
280 let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
282 self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
283 }
284
285 pub fn record_timeout(&mut self) {
287 self.timeouts += 1;
288 self.last_failure = Some(Instant::now());
289
290 self.apply_backoff();
292
293 if self.consecutive_rto_backoffs < 5 {
295 self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
296 self.consecutive_rto_backoffs += 1;
297 }
298 }
299
300 pub fn record_failure(&mut self) {
302 self.failures += 1;
303 self.last_failure = Some(Instant::now());
304 self.apply_backoff();
305 }
306
307 pub fn record_cashu_payment(&mut self, amount_sat: u64) {
309 if amount_sat == 0 {
310 return;
311 }
312 self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
313 }
314
315 pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
317 if amount_sat == 0 {
318 return;
319 }
320 self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
321 self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
322 }
323
324 pub fn record_cashu_payment_default(&mut self) {
326 self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
327 self.last_failure = Some(Instant::now());
328 self.apply_backoff();
329 }
330
331 fn apply_backoff(&mut self) {
333 self.backoff_level += 1;
334 let backoff_ms = (INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER.pow(self.backoff_level - 1))
335 .min(MAX_BACKOFF_MS);
336 self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
337 }
338
339 pub fn score(&self) -> f64 {
342 let success_score = self.success_rate();
344
345 let rtt_score = if self.srtt_ms <= 0.0 {
348 0.5 } else {
350 (500.0 / (self.srtt_ms + 50.0)).min(1.0)
351 };
352
353 let recency_bonus = if let Some(last) = self.last_success {
355 let secs_ago = last.elapsed().as_secs_f64();
356 if secs_ago < 60.0 {
357 0.1 } else {
359 0.0
360 }
361 } else {
362 0.0
363 };
364
365 0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
368 }
369
370 pub fn utility_score(&self, total_requests: u64) -> f64 {
378 let good = self.successes as f64 + 1.0;
379 let bad = (self.failures + self.timeouts) as f64 + 1.0;
380 let ratio = good / bad;
381 let ratio_score = ratio / (1.0 + ratio);
382
383 let latency_score = if self.srtt_ms <= 0.0 {
384 0.5
385 } else {
386 (300.0 / (self.srtt_ms + 50.0)).min(1.0)
387 };
388
389 let efficiency_score = if self.bytes_sent == 0 {
390 0.5
391 } else {
392 (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
393 };
394
395 let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
396
397 let uncertainty =
398 (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
399 let exploration_bonus = 0.20 * uncertainty;
400
401 exploitation + exploration_bonus
402 }
403
404 pub fn tit_for_tat_score(&self, total_requests: u64) -> f64 {
410 let reliability = (self.successes as f64 + 1.0) / (self.requests_sent as f64 + 2.0);
412
413 let reciprocity_raw = if self.bytes_sent == 0 {
416 1.0
417 } else {
418 self.bytes_received as f64 / self.bytes_sent as f64
419 };
420 let reciprocity_ratio = reciprocity_raw / (1.0 + reciprocity_raw);
421 let reciprocity_confidence = self.successes as f64 / (self.successes as f64 + 4.0);
422 let reciprocity =
423 (1.0 - reciprocity_confidence) * 0.5 + reciprocity_confidence * reciprocity_ratio;
424
425 let rtt_score = if self.srtt_ms <= 0.0 {
426 0.5
427 } else {
428 (400.0 / (self.srtt_ms + 50.0)).min(1.0)
429 };
430
431 let timeout_rate = if self.requests_sent == 0 {
432 0.0
433 } else {
434 self.timeouts as f64 / self.requests_sent as f64
435 };
436 let failure_rate = if self.requests_sent == 0 {
437 0.0
438 } else {
439 self.failures as f64 / self.requests_sent as f64
440 };
441 let retaliation_penalty =
442 (0.60 * timeout_rate + 0.45 * failure_rate + 0.10 * self.backoff_level as f64)
443 .min(0.95);
444
445 let cooperative = 0.65 * reliability + 0.25 * reciprocity + 0.10 * rtt_score;
446 let exploration = 0.03
447 * (((total_requests as f64) + 2.0).ln() / ((self.requests_sent as f64) + 2.0)).sqrt();
448
449 (cooperative + exploration - retaliation_penalty).max(0.0)
450 }
451
452 pub fn cashu_priority_boost(&self) -> f64 {
454 if self.cashu_paid_sat == 0 {
455 return 0.0;
456 }
457 let paid = self.cashu_paid_sat as f64;
458 paid / (paid + 32.0)
459 }
460
461 pub fn payment_reliability_multiplier(&self) -> f64 {
464 if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
465 return 1.0;
466 }
467 (self.cashu_payment_receipts as f64 + 1.0)
468 / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
469 }
470
471 pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
472 threshold > 0 && self.cashu_payment_defaults >= threshold
473 }
474}
475
476#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
478pub enum SelectionStrategy {
479 #[default]
481 Weighted,
482 RoundRobin,
484 Random,
486 LowestLatency,
488 HighestSuccessRate,
490 TitForTat,
492 UtilityUcb,
494}
495
496#[derive(Debug, Default)]
504pub struct PeerSelector {
505 stats: HashMap<String, PeerStats>,
507 persisted_metadata: HashMap<String, PersistedPeerMetadata>,
509 strategy: SelectionStrategy,
511 fairness_enabled: bool,
513 round_robin_idx: usize,
515 cashu_payment_weight: f64,
517}
518
519impl PeerSelector {
520 pub fn new() -> Self {
522 Self {
523 stats: HashMap::new(),
524 persisted_metadata: HashMap::new(),
525 strategy: SelectionStrategy::Weighted,
526 fairness_enabled: true,
527 round_robin_idx: 0,
528 cashu_payment_weight: 0.0,
529 }
530 }
531
532 pub fn with_strategy(strategy: SelectionStrategy) -> Self {
534 Self {
535 stats: HashMap::new(),
536 persisted_metadata: HashMap::new(),
537 strategy,
538 fairness_enabled: true,
539 round_robin_idx: 0,
540 cashu_payment_weight: 0.0,
541 }
542 }
543
544 pub fn set_fairness(&mut self, enabled: bool) {
546 self.fairness_enabled = enabled;
547 }
548
549 pub fn set_cashu_payment_weight(&mut self, weight: f64) {
552 self.cashu_payment_weight = weight.clamp(0.0, 1.0);
553 }
554
555 pub fn add_peer(&mut self, peer_id: impl Into<String>) {
557 let peer_id = peer_id.into();
558 if self.stats.contains_key(&peer_id) {
559 return;
560 }
561
562 let mut stats = PeerStats::new(peer_id.clone());
563 if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
564 saved.apply_to_stats(&mut stats);
565 }
566 self.stats.insert(peer_id, stats);
567 }
568
569 pub fn remove_peer(&mut self, peer_id: &str) {
571 if let Some(stats) = self.stats.remove(peer_id) {
572 let principal = peer_principal(&stats.peer_id).to_string();
573 self.persisted_metadata.insert(
574 principal.clone(),
575 PersistedPeerMetadata::from_stats(principal, &stats),
576 );
577 }
578 }
579
580 pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
582 self.stats.get(peer_id)
583 }
584
585 pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
587 self.stats.get_mut(peer_id)
588 }
589
590 pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
592 self.stats.values()
593 }
594
595 pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
597 if let Some(stats) = self.stats.get_mut(peer_id) {
598 stats.record_request(bytes);
599 }
600 }
601
602 pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
604 if let Some(stats) = self.stats.get_mut(peer_id) {
605 stats.record_success(rtt_ms, bytes);
606 }
607 }
608
609 pub fn record_timeout(&mut self, peer_id: &str) {
611 if let Some(stats) = self.stats.get_mut(peer_id) {
612 stats.record_timeout();
613 }
614 }
615
616 pub fn record_failure(&mut self, peer_id: &str) {
618 if let Some(stats) = self.stats.get_mut(peer_id) {
619 stats.record_failure();
620 }
621 }
622
623 pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
625 if amount_sat == 0 {
626 return;
627 }
628 let entry = self
629 .stats
630 .entry(peer_id.to_string())
631 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
632 entry.record_cashu_payment(amount_sat);
633 }
634
635 pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
637 if amount_sat == 0 {
638 return;
639 }
640 let entry = self
641 .stats
642 .entry(peer_id.to_string())
643 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
644 entry.record_cashu_receipt(amount_sat);
645 }
646
647 pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
649 let entry = self
650 .stats
651 .entry(peer_id.to_string())
652 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
653 entry.record_cashu_payment_default();
654 }
655
656 pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
657 self.stats
658 .get(peer_id)
659 .map(|stats| stats.exceeds_payment_default_threshold(threshold))
660 .unwrap_or(false)
661 }
662
663 fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
664 let reliable_base = base_score * stats.payment_reliability_multiplier();
665 if self.cashu_payment_weight <= 0.0 {
666 return reliable_base;
667 }
668 let payment_score = stats.cashu_priority_boost();
669 (1.0 - self.cashu_payment_weight) * reliable_base
670 + self.cashu_payment_weight * payment_score
671 }
672
673 fn available_peers(&self) -> Vec<String> {
675 self.stats
676 .iter()
677 .filter(|(_, s)| !s.is_backed_off())
678 .map(|(id, _)| id.clone())
679 .collect()
680 }
681
682 #[cfg(test)]
684 fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
685 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
686 self.should_skip_for_fairness_with_total(peer_id, total_rate)
687 }
688
689 fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
690 if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
691 return false;
692 }
693
694 if let Some(stats) = self.stats.get(peer_id) {
696 let peer_rate = stats.selection_rate();
697 let proportion = peer_rate / total_rate;
698 return proportion > SELECTION_PERCENTAGE_WARNING;
699 }
700
701 false
702 }
703
704 pub fn select_peers(&mut self) -> Vec<String> {
709 let available = self.available_peers();
710 if available.is_empty() {
711 let mut backed_off: Vec<_> = self
714 .stats
715 .iter()
716 .filter(|(_, s)| s.is_backed_off())
717 .map(|(id, s)| (id.clone(), s.backoff_remaining()))
718 .collect();
719 backed_off.sort_by_key(|(_, remaining)| *remaining);
720 return backed_off.into_iter().map(|(id, _)| id).collect();
721 }
722
723 let candidates: Vec<String> =
725 if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
726 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
727 available
728 .into_iter()
729 .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
730 .collect()
731 } else {
732 available
733 };
734
735 let candidates = if candidates.is_empty() {
737 self.available_peers()
738 } else {
739 candidates
740 };
741
742 let mut sorted: Vec<_> = candidates
744 .into_iter()
745 .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
746 .collect();
747
748 match self.strategy {
749 SelectionStrategy::Weighted => {
750 sorted.sort_by(|(id_a, a), (id_b, b)| {
752 let score_a = self.blend_with_payment_priority(a, a.score());
753 let score_b = self.blend_with_payment_priority(b, b.score());
754 let score_cmp = score_b
755 .partial_cmp(&score_a)
756 .unwrap_or(std::cmp::Ordering::Equal);
757 if score_cmp == std::cmp::Ordering::Equal {
758 id_a.cmp(id_b) } else {
760 score_cmp
761 }
762 });
763 }
764 SelectionStrategy::LowestLatency => {
765 sorted.sort_by(|(id_a, a), (id_b, b)| {
767 let rtt_cmp = a
768 .srtt_ms
769 .partial_cmp(&b.srtt_ms)
770 .unwrap_or(std::cmp::Ordering::Equal);
771 if rtt_cmp == std::cmp::Ordering::Equal {
772 let score_cmp = b
773 .score()
774 .partial_cmp(&a.score())
775 .unwrap_or(std::cmp::Ordering::Equal);
776 if score_cmp == std::cmp::Ordering::Equal {
777 id_a.cmp(id_b)
778 } else {
779 score_cmp
780 }
781 } else {
782 rtt_cmp
783 }
784 });
785 }
786 SelectionStrategy::HighestSuccessRate => {
787 sorted.sort_by(|(id_a, a), (id_b, b)| {
789 let rate_cmp = b
790 .success_rate()
791 .partial_cmp(&a.success_rate())
792 .unwrap_or(std::cmp::Ordering::Equal);
793 if rate_cmp == std::cmp::Ordering::Equal {
794 id_a.cmp(id_b)
795 } else {
796 rate_cmp
797 }
798 });
799 }
800 SelectionStrategy::TitForTat => {
801 let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
802 sorted.sort_by(|(id_a, a), (id_b, b)| {
803 let score_a =
804 self.blend_with_payment_priority(a, a.tit_for_tat_score(total_requests));
805 let score_b =
806 self.blend_with_payment_priority(b, b.tit_for_tat_score(total_requests));
807 let score_cmp = score_b
808 .partial_cmp(&score_a)
809 .unwrap_or(std::cmp::Ordering::Equal);
810 if score_cmp == std::cmp::Ordering::Equal {
811 id_a.cmp(id_b)
812 } else {
813 score_cmp
814 }
815 });
816 }
817 SelectionStrategy::UtilityUcb => {
818 let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
819 sorted.sort_by(|(id_a, a), (id_b, b)| {
820 let score_a =
821 self.blend_with_payment_priority(a, a.utility_score(total_requests));
822 let score_b =
823 self.blend_with_payment_priority(b, b.utility_score(total_requests));
824 let score_cmp = score_b
825 .partial_cmp(&score_a)
826 .unwrap_or(std::cmp::Ordering::Equal);
827 if score_cmp == std::cmp::Ordering::Equal {
828 id_a.cmp(id_b)
829 } else {
830 score_cmp
831 }
832 });
833 }
834 SelectionStrategy::RoundRobin => {
835 if !sorted.is_empty() {
837 let idx = self.round_robin_idx % sorted.len();
838 sorted.rotate_left(idx);
839 self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
840 }
841 }
842 SelectionStrategy::Random => {
843 }
846 }
847
848 sorted.into_iter().map(|(id, _)| id).collect()
849 }
850
851 pub fn select_best(&mut self) -> Option<String> {
853 self.select_peers().into_iter().next()
854 }
855
856 pub fn select_top(&mut self, n: usize) -> Vec<String> {
858 self.select_peers().into_iter().take(n).collect()
859 }
860
861 pub fn summary(&self) -> SelectorSummary {
863 let count = self.stats.len();
864 if count == 0 {
865 return SelectorSummary::default();
866 }
867
868 let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
869 let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
870 let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
871 let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
872
873 let avg_rtt = {
874 let rtts: Vec<f64> = self
875 .stats
876 .values()
877 .filter(|s| s.srtt_ms > 0.0)
878 .map(|s| s.srtt_ms)
879 .collect();
880 if rtts.is_empty() {
881 0.0
882 } else {
883 rtts.iter().sum::<f64>() / rtts.len() as f64
884 }
885 };
886
887 SelectorSummary {
888 peer_count: count,
889 total_requests,
890 total_successes,
891 total_timeouts,
892 backed_off_count: backed_off,
893 avg_rtt_ms: avg_rtt,
894 overall_success_rate: if total_requests > 0 {
895 total_successes as f64 / total_requests as f64
896 } else {
897 0.0
898 },
899 }
900 }
901
902 pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
904 let mut by_principal = self.persisted_metadata.clone();
905 for stats in self.stats.values() {
906 let principal = peer_principal(&stats.peer_id).to_string();
907 by_principal.insert(
908 principal.clone(),
909 PersistedPeerMetadata::from_stats(principal, stats),
910 );
911 }
912
913 let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
914 peers.sort_by(|a, b| a.principal.cmp(&b.principal));
915
916 PeerMetadataSnapshot {
917 version: PEER_METADATA_SNAPSHOT_VERSION,
918 peers,
919 }
920 }
921
922 pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
924 if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
925 return;
926 }
927
928 self.persisted_metadata.clear();
929 for peer in &snapshot.peers {
930 self.persisted_metadata
931 .insert(peer.principal.clone(), peer.clone());
932 }
933
934 for stats in self.stats.values_mut() {
935 if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
936 saved.apply_to_stats(stats);
937 }
938 }
939 }
940}
941
942#[derive(Debug, Clone, Default)]
944pub struct SelectorSummary {
945 pub peer_count: usize,
946 pub total_requests: u64,
947 pub total_successes: u64,
948 pub total_timeouts: u64,
949 pub backed_off_count: usize,
950 pub avg_rtt_ms: f64,
951 pub overall_success_rate: f64,
952}
953
954#[cfg(test)]
955mod tests {
956 use super::*;
957 use std::thread::sleep;
958
959 #[test]
960 fn test_peer_stats_success_rate() {
961 let mut stats = PeerStats::new("peer1");
962 assert_eq!(stats.success_rate(), 0.5); stats.record_request(40);
965 stats.record_success(50, 1024);
966 assert_eq!(stats.success_rate(), 1.0);
967
968 stats.record_request(40);
969 stats.record_timeout();
970 assert_eq!(stats.success_rate(), 0.5);
971 }
972
973 #[test]
974 fn test_peer_stats_rtt_calculation() {
975 let mut stats = PeerStats::new("peer1");
976
977 stats.record_request(40);
979 stats.record_success(100, 1024);
980 assert_eq!(stats.srtt_ms, 100.0);
981 assert_eq!(stats.rttvar_ms, 50.0); stats.record_request(40);
985 stats.record_success(80, 1024);
986 assert!((stats.srtt_ms - 97.5).abs() < 0.1);
988 }
989
990 #[test]
991 fn test_peer_stats_backoff() {
992 let mut stats = PeerStats::new("peer1");
993 assert!(!stats.is_backed_off());
994
995 stats.record_timeout();
996 assert!(stats.is_backed_off());
997 assert!(stats.backoff_remaining() > Duration::ZERO);
998 }
999
1000 #[test]
1001 fn test_peer_stats_backoff_clears_on_success() {
1002 let mut stats = PeerStats::new("peer1");
1003 stats.record_timeout();
1004 assert!(stats.is_backed_off());
1005
1006 stats.record_success(50, 1024);
1007 assert!(!stats.is_backed_off());
1008 assert_eq!(stats.backoff_level, 0);
1009 }
1010
1011 #[test]
1012 fn test_peer_selector_add_remove() {
1013 let mut selector = PeerSelector::new();
1014 selector.add_peer("peer1");
1015 selector.add_peer("peer2");
1016 assert!(selector.get_stats("peer1").is_some());
1017 assert!(selector.get_stats("peer2").is_some());
1018
1019 selector.remove_peer("peer1");
1020 assert!(selector.get_stats("peer1").is_none());
1021 assert!(selector.get_stats("peer2").is_some());
1022 }
1023
1024 #[test]
1025 fn test_peer_selector_weighted_selection() {
1026 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1027 selector.add_peer("peer1");
1028 selector.add_peer("peer2");
1029 selector.add_peer("peer3");
1030
1031 selector.record_request("peer1", 40);
1033 selector.record_success("peer1", 20, 1024);
1034 selector.record_request("peer1", 40);
1035 selector.record_success("peer1", 25, 1024);
1036
1037 selector.record_request("peer2", 40);
1039 selector.record_success("peer2", 100, 1024);
1040 selector.record_request("peer2", 40);
1041 selector.record_timeout("peer2");
1042
1043 selector.record_request("peer3", 40);
1045 selector.record_timeout("peer3");
1046 selector.record_request("peer3", 40);
1047 selector.record_timeout("peer3");
1048
1049 let peers = selector.select_peers();
1051 assert_eq!(peers[0], "peer1");
1053 }
1054
1055 #[test]
1056 fn test_peer_selector_backed_off_peers() {
1057 let mut selector = PeerSelector::new();
1058 selector.add_peer("peer1");
1059 selector.add_peer("peer2");
1060
1061 selector.record_timeout("peer1");
1063 assert!(selector.get_stats("peer1").unwrap().is_backed_off());
1064
1065 let peers = selector.select_peers();
1067 assert_eq!(peers.len(), 1);
1068 assert_eq!(peers[0], "peer2");
1069 }
1070
1071 #[test]
1072 fn test_peer_selector_all_backed_off_fallback() {
1073 let mut selector = PeerSelector::new();
1074 selector.add_peer("peer1");
1075 selector.add_peer("peer2");
1076
1077 selector.record_timeout("peer1");
1079 selector.record_timeout("peer2");
1080
1081 let peers = selector.select_peers();
1083 assert_eq!(peers.len(), 2);
1084 }
1085
1086 #[test]
1087 fn test_peer_selector_fairness() {
1088 let mut selector = PeerSelector::new();
1089 selector.set_fairness(true);
1090
1091 for i in 1..=6 {
1093 selector.add_peer(format!("peer{}", i));
1094 }
1095
1096 sleep(Duration::from_millis(15));
1098
1099 for _ in 0..100 {
1100 selector.record_request("peer1", 40);
1101 selector.record_success("peer1", 10, 100);
1102 }
1103
1104 for i in 2..=6 {
1106 selector.record_request(&format!("peer{}", i), 40);
1107 selector.record_success(&format!("peer{}", i), 10, 100);
1108 }
1109
1110 let skipped = selector.should_skip_for_fairness("peer1");
1112 let _ = skipped; }
1114
1115 #[test]
1116 fn test_peer_selector_summary() {
1117 let mut selector = PeerSelector::new();
1118 selector.add_peer("peer1");
1119 selector.add_peer("peer2");
1120
1121 selector.record_request("peer1", 40);
1122 selector.record_success("peer1", 50, 1024);
1123 selector.record_request("peer2", 40);
1124 selector.record_timeout("peer2");
1125
1126 let summary = selector.summary();
1127 assert_eq!(summary.peer_count, 2);
1128 assert_eq!(summary.total_requests, 2);
1129 assert_eq!(summary.total_successes, 1);
1130 assert_eq!(summary.total_timeouts, 1);
1131 assert_eq!(summary.backed_off_count, 1);
1132 assert_eq!(summary.overall_success_rate, 0.5);
1133 }
1134
1135 #[test]
1136 fn test_peer_stats_score() {
1137 let mut stats = PeerStats::new("peer1");
1138
1139 let initial_score = stats.score();
1141 assert!(initial_score > 0.3 && initial_score < 0.7);
1142
1143 for _ in 0..10 {
1145 stats.record_request(40);
1146 stats.record_success(20, 1024);
1147 }
1148 let good_score = stats.score();
1149 assert!(good_score > 0.8);
1150
1151 let mut bad_stats = PeerStats::new("peer2");
1153 for _ in 0..10 {
1154 bad_stats.record_request(40);
1155 bad_stats.record_timeout();
1156 }
1157 let bad_score = bad_stats.score();
1158 assert!(bad_score < 0.3);
1159
1160 assert!(good_score > bad_score);
1161 }
1162
1163 #[test]
1164 fn test_peer_stats_utility_score_prefers_good_over_bad() {
1165 let mut good = PeerStats::new("good");
1166 good.requests_sent = 120;
1167 good.successes = 96;
1168 good.failures = 8;
1169 good.timeouts = 4;
1170 good.srtt_ms = 30.0;
1171 good.bytes_sent = 120 * 40;
1172 good.bytes_received = 96 * 1024;
1173
1174 let mut bad = PeerStats::new("bad");
1175 bad.requests_sent = 120;
1176 bad.successes = 40;
1177 bad.failures = 50;
1178 bad.timeouts = 30;
1179 bad.srtt_ms = 220.0;
1180 bad.bytes_sent = 120 * 40;
1181 bad.bytes_received = 40 * 1024;
1182
1183 let total_requests = good.requests_sent + bad.requests_sent;
1184 assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
1185 }
1186
1187 #[test]
1188 fn test_peer_stats_tit_for_tat_score_prefers_reciprocal_peer() {
1189 let mut reciprocal = PeerStats::new("reciprocal");
1190 reciprocal.requests_sent = 100;
1191 reciprocal.successes = 90;
1192 reciprocal.failures = 5;
1193 reciprocal.timeouts = 5;
1194 reciprocal.srtt_ms = 40.0;
1195 reciprocal.bytes_sent = 100 * 40;
1196 reciprocal.bytes_received = 90 * 1024;
1197
1198 let mut leecher = PeerStats::new("leecher");
1199 leecher.requests_sent = 100;
1200 leecher.successes = 40;
1201 leecher.failures = 30;
1202 leecher.timeouts = 30;
1203 leecher.srtt_ms = 120.0;
1204 leecher.bytes_sent = 100 * 40;
1205 leecher.bytes_received = 10 * 1024;
1206
1207 let total_requests = reciprocal.requests_sent + leecher.requests_sent;
1208 assert!(
1209 reciprocal.tit_for_tat_score(total_requests)
1210 > leecher.tit_for_tat_score(total_requests)
1211 );
1212 }
1213
1214 #[test]
1215 fn test_utility_ucb_strategy_explores_less_sampled_peer() {
1216 let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
1217 selector.add_peer("stable");
1218 selector.add_peer("new");
1219
1220 {
1221 let stable = selector.get_stats_mut("stable").unwrap();
1222 stable.requests_sent = 500;
1223 stable.successes = 450;
1224 stable.failures = 35;
1225 stable.timeouts = 15;
1226 stable.srtt_ms = 35.0;
1227 stable.bytes_sent = 500 * 40;
1228 stable.bytes_received = 450 * 1024;
1229 }
1230 {
1231 let new_peer = selector.get_stats_mut("new").unwrap();
1232 new_peer.requests_sent = 2;
1233 new_peer.successes = 2;
1234 new_peer.failures = 0;
1235 new_peer.timeouts = 0;
1236 new_peer.srtt_ms = 70.0;
1237 new_peer.bytes_sent = 2 * 40;
1238 new_peer.bytes_received = 2 * 1024;
1239 }
1240
1241 let peers = selector.select_peers();
1242 assert_eq!(peers[0], "new");
1243 }
1244
1245 #[test]
1246 fn test_tit_for_tat_strategy_prioritizes_reciprocity() {
1247 let mut selector = PeerSelector::with_strategy(SelectionStrategy::TitForTat);
1248 selector.add_peer("reciprocal");
1249 selector.add_peer("leecher");
1250
1251 {
1252 let reciprocal = selector.get_stats_mut("reciprocal").unwrap();
1253 reciprocal.requests_sent = 120;
1254 reciprocal.successes = 102;
1255 reciprocal.failures = 8;
1256 reciprocal.timeouts = 10;
1257 reciprocal.srtt_ms = 45.0;
1258 reciprocal.bytes_sent = 120 * 40;
1259 reciprocal.bytes_received = 102 * 1024;
1260 }
1261 {
1262 let leecher = selector.get_stats_mut("leecher").unwrap();
1263 leecher.requests_sent = 120;
1264 leecher.successes = 70;
1265 leecher.failures = 20;
1266 leecher.timeouts = 30;
1267 leecher.srtt_ms = 35.0;
1268 leecher.bytes_sent = 120 * 40;
1269 leecher.bytes_received = 8 * 1024;
1270 }
1271
1272 let peers = selector.select_peers();
1273 assert_eq!(peers[0], "reciprocal");
1274 }
1275
1276 #[test]
1277 fn test_lowest_latency_strategy() {
1278 let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
1279 selector.add_peer("peer1");
1280 selector.add_peer("peer2");
1281 selector.add_peer("peer3");
1282
1283 selector.record_request("peer1", 40);
1285 selector.record_success("peer1", 100, 1024);
1286
1287 selector.record_request("peer2", 40);
1289 selector.record_success("peer2", 20, 1024);
1290
1291 selector.record_request("peer3", 40);
1293 selector.record_success("peer3", 50, 1024);
1294
1295 let peers = selector.select_peers();
1296 assert_eq!(peers[0], "peer2");
1298 }
1299
1300 fn build_cashu_priority_fixture() -> PeerSelector {
1301 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1302 selector.add_peer("reliable");
1303 selector.add_peer("paid");
1304
1305 {
1306 let reliable = selector.get_stats_mut("reliable").expect("reliable");
1307 reliable.requests_sent = 80;
1308 reliable.successes = 75;
1309 reliable.failures = 2;
1310 reliable.timeouts = 3;
1311 reliable.srtt_ms = 40.0;
1312 reliable.bytes_sent = 80 * 40;
1313 reliable.bytes_received = 75 * 1024;
1314 }
1315 {
1316 let paid = selector.get_stats_mut("paid").expect("paid");
1317 paid.requests_sent = 80;
1318 paid.successes = 36;
1319 paid.failures = 24;
1320 paid.timeouts = 20;
1321 paid.srtt_ms = 700.0;
1322 paid.bytes_sent = 80 * 40;
1323 paid.bytes_received = 36 * 512;
1324 }
1325
1326 selector
1327 }
1328
1329 #[test]
1330 fn test_cashu_payment_weight_zero_keeps_reputation_order() {
1331 let mut selector = build_cashu_priority_fixture();
1332 selector.set_cashu_payment_weight(0.0);
1333 selector.record_cashu_payment("paid", 5_000);
1334
1335 let peers = selector.select_peers();
1336 assert_eq!(peers[0], "reliable");
1337 }
1338
1339 #[test]
1340 fn test_cashu_payment_weight_prioritizes_paid_peer() {
1341 let mut selector = build_cashu_priority_fixture();
1342 selector.set_cashu_payment_weight(0.8);
1343 selector.record_cashu_payment("paid", 5_000);
1344
1345 let peers = selector.select_peers();
1346 assert_eq!(peers[0], "paid");
1347 }
1348
1349 #[test]
1350 fn test_cashu_payment_default_downranks_peer() {
1351 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1352 selector.add_peer("honest");
1353 selector.add_peer("delinquent");
1354
1355 for peer_id in ["honest", "delinquent"] {
1356 let stats = selector.get_stats_mut(peer_id).expect("stats");
1357 stats.requests_sent = 40;
1358 stats.successes = 34;
1359 stats.failures = 3;
1360 stats.timeouts = 3;
1361 stats.srtt_ms = 60.0;
1362 stats.bytes_sent = 40 * 40;
1363 stats.bytes_received = 34 * 1024;
1364 }
1365
1366 selector.record_cashu_payment_default("delinquent");
1367
1368 let peers = selector.select_peers();
1369 assert_eq!(peers[0], "honest");
1370 assert!(!peers.iter().any(|peer| peer == "delinquent"));
1371 }
1372
1373 #[test]
1374 fn test_payment_default_threshold_blocks_peer() {
1375 let mut selector = PeerSelector::new();
1376 selector.record_cashu_payment_default("peer-a");
1377 assert!(selector.is_peer_blocked_for_payment_defaults("peer-a", 1));
1378 assert!(!selector.is_peer_blocked_for_payment_defaults("peer-a", 2));
1379 }
1380
1381 #[test]
1382 fn test_peer_principal_matches_peer_id() {
1383 assert_eq!(peer_principal("npub1abc"), "npub1abc");
1384 assert_eq!(peer_principal("peer-hex-01"), "peer-hex-01");
1385 }
1386
1387 #[test]
1388 fn test_metadata_snapshot_restores_for_same_peer_id() {
1389 let mut selector = PeerSelector::new();
1390 selector.add_peer("npub1stable");
1391 selector.record_request("npub1stable", 64);
1392 selector.record_success("npub1stable", 32, 1024);
1393 selector.record_cashu_payment("npub1stable", 77);
1394 selector.record_cashu_receipt("npub1stable", 33);
1395 selector.record_cashu_payment_default("npub1stable");
1396
1397 let snapshot = selector.export_peer_metadata_snapshot();
1398 assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
1399 assert_eq!(snapshot.peers.len(), 1);
1400 assert_eq!(snapshot.peers[0].principal, "npub1stable");
1401
1402 let mut restored = PeerSelector::new();
1403 restored.import_peer_metadata_snapshot(&snapshot);
1404 restored.add_peer("npub1stable");
1405 let stats = restored.get_stats("npub1stable").expect("restored stats");
1406 assert_eq!(stats.requests_sent, 1);
1407 assert_eq!(stats.successes, 1);
1408 assert_eq!(stats.cashu_paid_sat, 77);
1409 assert_eq!(stats.cashu_received_sat, 33);
1410 assert_eq!(stats.cashu_payment_receipts, 1);
1411 assert_eq!(stats.cashu_payment_defaults, 1);
1412 }
1413}