1use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::Duration;
13use tokio::time::Instant;
14
15const SELECTION_PERCENTAGE_WARNING: f64 = 0.30; const SELECTION_MIN_PEERS: usize = 5; const INITIAL_BACKOFF_MS: u64 = 1000; const BACKOFF_MULTIPLIER: u64 = 2; const MAX_BACKOFF_MS: u64 = 480_000; const MIN_RTO_MS: u64 = 50; const MAX_RTO_MS: u64 = 60_000; const INITIAL_RTO_MS: u64 = 1000; pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
37pub struct PersistedPeerMetadata {
38 pub principal: String,
40 pub requests_sent: u64,
41 pub successes: u64,
42 pub timeouts: u64,
43 pub failures: u64,
44 pub srtt_ms: f64,
45 pub rttvar_ms: f64,
46 pub rto_ms: u64,
47 pub bytes_received: u64,
48 pub bytes_sent: u64,
49 pub cashu_paid_sat: u64,
50 pub cashu_received_sat: u64,
51 pub cashu_payment_receipts: u64,
52 pub cashu_payment_defaults: u64,
53}
54
55impl PersistedPeerMetadata {
56 fn from_stats(principal: String, stats: &PeerStats) -> Self {
57 Self {
58 principal,
59 requests_sent: stats.requests_sent,
60 successes: stats.successes,
61 timeouts: stats.timeouts,
62 failures: stats.failures,
63 srtt_ms: sanitize_latency(stats.srtt_ms),
64 rttvar_ms: sanitize_latency(stats.rttvar_ms),
65 rto_ms: clamp_rto(stats.rto_ms),
66 bytes_received: stats.bytes_received,
67 bytes_sent: stats.bytes_sent,
68 cashu_paid_sat: stats.cashu_paid_sat,
69 cashu_received_sat: stats.cashu_received_sat,
70 cashu_payment_receipts: stats.cashu_payment_receipts,
71 cashu_payment_defaults: stats.cashu_payment_defaults,
72 }
73 }
74
75 fn apply_to_stats(&self, stats: &mut PeerStats) {
76 stats.requests_sent = self.requests_sent;
77 stats.successes = self.successes;
78 stats.timeouts = self.timeouts;
79 stats.failures = self.failures;
80 stats.srtt_ms = sanitize_latency(self.srtt_ms);
81 stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
82 stats.rto_ms = clamp_rto(self.rto_ms);
83 stats.bytes_received = self.bytes_received;
84 stats.bytes_sent = self.bytes_sent;
85 stats.cashu_paid_sat = self.cashu_paid_sat;
86 stats.cashu_received_sat = self.cashu_received_sat;
87 stats.cashu_payment_receipts = self.cashu_payment_receipts;
88 stats.cashu_payment_defaults = self.cashu_payment_defaults;
89
90 stats.backoff_level = 0;
92 stats.backed_off_until = None;
93 stats.last_success = None;
94 stats.last_failure = None;
95 stats.consecutive_rto_backoffs = 0;
96 }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub struct PeerMetadataSnapshot {
102 pub version: u32,
103 pub peers: Vec<PersistedPeerMetadata>,
104}
105
106impl Default for PeerMetadataSnapshot {
107 fn default() -> Self {
108 Self {
109 version: PEER_METADATA_SNAPSHOT_VERSION,
110 peers: Vec::new(),
111 }
112 }
113}
114
115fn sanitize_latency(value: f64) -> f64 {
116 if value.is_finite() && value >= 0.0 {
117 value
118 } else {
119 0.0
120 }
121}
122
123fn compute_backoff_ms(level: u32) -> u64 {
124 if level == 0 {
125 return 0;
126 }
127
128 let mut backoff_ms = INITIAL_BACKOFF_MS;
129 for _ in 1..level {
130 backoff_ms = backoff_ms.saturating_mul(BACKOFF_MULTIPLIER);
131 if backoff_ms >= MAX_BACKOFF_MS {
132 return MAX_BACKOFF_MS;
133 }
134 }
135
136 backoff_ms.min(MAX_BACKOFF_MS)
137}
138
139fn clamp_rto(rto_ms: u64) -> u64 {
140 if rto_ms == 0 {
141 INITIAL_RTO_MS
142 } else {
143 rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
144 }
145}
146
147pub fn peer_principal(peer_id: &str) -> &str {
152 peer_id
153}
154
155#[derive(Debug, Clone)]
157pub struct PeerStats {
158 pub peer_id: String,
160 pub connected_at: Instant,
162 pub requests_sent: u64,
164 pub successes: u64,
166 pub timeouts: u64,
168 pub failures: u64,
170 pub srtt_ms: f64,
172 pub rttvar_ms: f64,
174 pub rto_ms: u64,
176 pub consecutive_rto_backoffs: u32,
178 pub backoff_level: u32,
180 pub backed_off_until: Option<Instant>,
182 pub last_success: Option<Instant>,
184 pub last_failure: Option<Instant>,
186 pub bytes_received: u64,
188 pub bytes_sent: u64,
190 pub cashu_paid_sat: u64,
192 pub cashu_received_sat: u64,
194 pub cashu_payment_receipts: u64,
196 pub cashu_payment_defaults: u64,
198}
199
200impl PeerStats {
201 pub fn new(peer_id: impl Into<String>) -> Self {
203 Self {
204 peer_id: peer_id.into(),
205 connected_at: Instant::now(),
206 requests_sent: 0,
207 successes: 0,
208 timeouts: 0,
209 failures: 0,
210 srtt_ms: 0.0,
211 rttvar_ms: 0.0,
212 rto_ms: INITIAL_RTO_MS,
213 consecutive_rto_backoffs: 0,
214 backoff_level: 0,
215 backed_off_until: None,
216 last_success: None,
217 last_failure: None,
218 bytes_received: 0,
219 bytes_sent: 0,
220 cashu_paid_sat: 0,
221 cashu_received_sat: 0,
222 cashu_payment_receipts: 0,
223 cashu_payment_defaults: 0,
224 }
225 }
226
227 pub fn success_rate(&self) -> f64 {
229 if self.requests_sent == 0 {
230 return 0.5; }
232 self.successes as f64 / self.requests_sent as f64
233 }
234
235 pub fn selection_rate(&self) -> f64 {
237 let elapsed = self.connected_at.elapsed();
238 if elapsed.as_secs() < 10 {
239 return 0.0; }
241 self.requests_sent as f64 / elapsed.as_secs_f64()
242 }
243
244 pub fn is_backed_off(&self) -> bool {
246 if let Some(until) = self.backed_off_until {
247 Instant::now() < until
248 } else {
249 false
250 }
251 }
252
253 pub fn backoff_remaining(&self) -> Duration {
255 if let Some(until) = self.backed_off_until {
256 let now = Instant::now();
257 if now < until {
258 return until - now;
259 }
260 }
261 Duration::ZERO
262 }
263
264 pub fn record_request(&mut self, bytes: u64) {
266 self.requests_sent += 1;
267 self.bytes_sent += bytes;
268 }
269
270 pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
273 self.successes += 1;
274 self.bytes_received += bytes;
275 self.last_success = Some(Instant::now());
276 self.consecutive_rto_backoffs = 0;
277
278 self.backed_off_until = None;
280 self.backoff_level = 0;
281
282 let rtt = rtt_ms as f64;
284 if self.srtt_ms == 0.0 {
285 self.srtt_ms = rtt;
287 self.rttvar_ms = rtt / 2.0;
288 } else {
289 self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
294 self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
295 }
296
297 let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
299 self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
300 }
301
302 pub fn record_timeout(&mut self) {
304 self.timeouts += 1;
305 self.last_failure = Some(Instant::now());
306
307 self.apply_backoff();
309
310 if self.consecutive_rto_backoffs < 5 {
312 self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
313 self.consecutive_rto_backoffs += 1;
314 }
315 }
316
317 pub fn record_failure(&mut self) {
319 self.failures += 1;
320 self.last_failure = Some(Instant::now());
321 self.apply_backoff();
322 }
323
324 pub fn record_cashu_payment(&mut self, amount_sat: u64) {
326 if amount_sat == 0 {
327 return;
328 }
329 self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
330 }
331
332 pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
334 if amount_sat == 0 {
335 return;
336 }
337 self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
338 self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
339 }
340
341 pub fn record_cashu_payment_default(&mut self) {
343 self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
344 self.last_failure = Some(Instant::now());
345 self.apply_backoff();
346 }
347
348 fn apply_backoff(&mut self) {
350 self.backoff_level += 1;
351 let backoff_ms = compute_backoff_ms(self.backoff_level);
352 self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
353 }
354
355 pub fn score(&self) -> f64 {
358 let success_score = self.success_rate();
360
361 let rtt_score = if self.srtt_ms <= 0.0 {
364 0.5 } else {
366 (500.0 / (self.srtt_ms + 50.0)).min(1.0)
367 };
368
369 let recency_bonus = if let Some(last) = self.last_success {
371 let secs_ago = last.elapsed().as_secs_f64();
372 if secs_ago < 60.0 {
373 0.1 } else {
375 0.0
376 }
377 } else {
378 0.0
379 };
380
381 0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
384 }
385
386 pub fn utility_score(&self, total_requests: u64) -> f64 {
394 let good = self.successes as f64 + 1.0;
395 let bad = (self.failures + self.timeouts) as f64 + 1.0;
396 let ratio = good / bad;
397 let ratio_score = ratio / (1.0 + ratio);
398
399 let latency_score = if self.srtt_ms <= 0.0 {
400 0.5
401 } else {
402 (300.0 / (self.srtt_ms + 50.0)).min(1.0)
403 };
404
405 let efficiency_score = if self.bytes_sent == 0 {
406 0.5
407 } else {
408 (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
409 };
410
411 let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
412
413 let uncertainty =
414 (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
415 let exploration_bonus = 0.20 * uncertainty;
416
417 exploitation + exploration_bonus
418 }
419
420 pub fn cashu_priority_boost(&self) -> f64 {
422 if self.cashu_paid_sat == 0 {
423 return 0.0;
424 }
425 let paid = self.cashu_paid_sat as f64;
426 paid / (paid + 32.0)
427 }
428
429 pub fn payment_reliability_multiplier(&self) -> f64 {
432 if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
433 return 1.0;
434 }
435 (self.cashu_payment_receipts as f64 + 1.0)
436 / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
437 }
438
439 pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
440 threshold > 0 && self.cashu_payment_defaults >= threshold
441 }
442}
443
444#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
446pub enum SelectionStrategy {
447 #[default]
449 Weighted,
450 RoundRobin,
452 Random,
454 LowestLatency,
456 HighestSuccessRate,
458 UtilityUcb,
460}
461
462#[derive(Debug, Default)]
470pub struct PeerSelector {
471 stats: HashMap<String, PeerStats>,
473 persisted_metadata: HashMap<String, PersistedPeerMetadata>,
475 strategy: SelectionStrategy,
477 fairness_enabled: bool,
479 round_robin_idx: usize,
481 cashu_payment_weight: f64,
483}
484
485impl PeerSelector {
486 pub fn new() -> Self {
488 Self {
489 stats: HashMap::new(),
490 persisted_metadata: HashMap::new(),
491 strategy: SelectionStrategy::Weighted,
492 fairness_enabled: true,
493 round_robin_idx: 0,
494 cashu_payment_weight: 0.0,
495 }
496 }
497
498 pub fn with_strategy(strategy: SelectionStrategy) -> Self {
500 Self {
501 stats: HashMap::new(),
502 persisted_metadata: HashMap::new(),
503 strategy,
504 fairness_enabled: true,
505 round_robin_idx: 0,
506 cashu_payment_weight: 0.0,
507 }
508 }
509
510 pub fn set_fairness(&mut self, enabled: bool) {
512 self.fairness_enabled = enabled;
513 }
514
515 pub fn set_cashu_payment_weight(&mut self, weight: f64) {
518 self.cashu_payment_weight = weight.clamp(0.0, 1.0);
519 }
520
521 pub fn add_peer(&mut self, peer_id: impl Into<String>) {
523 let peer_id = peer_id.into();
524 if self.stats.contains_key(&peer_id) {
525 return;
526 }
527
528 let mut stats = PeerStats::new(peer_id.clone());
529 if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
530 saved.apply_to_stats(&mut stats);
531 }
532 self.stats.insert(peer_id, stats);
533 }
534
535 pub fn remove_peer(&mut self, peer_id: &str) {
537 if let Some(stats) = self.stats.remove(peer_id) {
538 let principal = peer_principal(&stats.peer_id).to_string();
539 self.persisted_metadata.insert(
540 principal.clone(),
541 PersistedPeerMetadata::from_stats(principal, &stats),
542 );
543 }
544 }
545
546 pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
548 self.stats.get(peer_id)
549 }
550
551 pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
553 self.stats.get_mut(peer_id)
554 }
555
556 pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
558 self.stats.values()
559 }
560
561 pub fn is_peer_backed_off(&self, peer_id: &str) -> bool {
563 self.stats
564 .get(peer_id)
565 .is_some_and(PeerStats::is_backed_off)
566 }
567
568 pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
570 if let Some(stats) = self.stats.get_mut(peer_id) {
571 stats.record_request(bytes);
572 }
573 }
574
575 pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
577 if let Some(stats) = self.stats.get_mut(peer_id) {
578 stats.record_success(rtt_ms, bytes);
579 }
580 }
581
582 pub fn record_timeout(&mut self, peer_id: &str) {
584 if let Some(stats) = self.stats.get_mut(peer_id) {
585 stats.record_timeout();
586 }
587 }
588
589 pub fn record_failure(&mut self, peer_id: &str) {
591 if let Some(stats) = self.stats.get_mut(peer_id) {
592 stats.record_failure();
593 }
594 }
595
596 pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
598 if amount_sat == 0 {
599 return;
600 }
601 let entry = self
602 .stats
603 .entry(peer_id.to_string())
604 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
605 entry.record_cashu_payment(amount_sat);
606 }
607
608 pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
610 if amount_sat == 0 {
611 return;
612 }
613 let entry = self
614 .stats
615 .entry(peer_id.to_string())
616 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
617 entry.record_cashu_receipt(amount_sat);
618 }
619
620 pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
622 let entry = self
623 .stats
624 .entry(peer_id.to_string())
625 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
626 entry.record_cashu_payment_default();
627 }
628
629 pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
630 self.stats
631 .get(peer_id)
632 .map(|stats| stats.exceeds_payment_default_threshold(threshold))
633 .unwrap_or(false)
634 }
635
636 fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
637 let reliable_base = base_score * stats.payment_reliability_multiplier();
638 if self.cashu_payment_weight <= 0.0 {
639 return reliable_base;
640 }
641 let payment_score = stats.cashu_priority_boost();
642 (1.0 - self.cashu_payment_weight) * reliable_base
643 + self.cashu_payment_weight * payment_score
644 }
645
646 fn available_peers(&self) -> Vec<String> {
648 self.stats
649 .iter()
650 .filter(|(_, s)| !s.is_backed_off())
651 .map(|(id, _)| id.clone())
652 .collect()
653 }
654
655 #[cfg(test)]
657 fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
658 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
659 self.should_skip_for_fairness_with_total(peer_id, total_rate)
660 }
661
662 fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
663 if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
664 return false;
665 }
666
667 if let Some(stats) = self.stats.get(peer_id) {
669 let peer_rate = stats.selection_rate();
670 let proportion = peer_rate / total_rate;
671 return proportion > SELECTION_PERCENTAGE_WARNING;
672 }
673
674 false
675 }
676
677 pub fn select_peers(&mut self) -> Vec<String> {
682 let available = self.available_peers();
683 if available.is_empty() {
684 let mut backed_off: Vec<_> = self
687 .stats
688 .iter()
689 .filter(|(_, s)| s.is_backed_off())
690 .map(|(id, s)| (id.clone(), s.backoff_remaining()))
691 .collect();
692 backed_off.sort_by_key(|(_, remaining)| *remaining);
693 return backed_off.into_iter().map(|(id, _)| id).collect();
694 }
695
696 let candidates: Vec<String> =
698 if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
699 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
700 available
701 .into_iter()
702 .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
703 .collect()
704 } else {
705 available
706 };
707
708 let candidates = if candidates.is_empty() {
710 self.available_peers()
711 } else {
712 candidates
713 };
714
715 let mut sorted: Vec<_> = candidates
717 .into_iter()
718 .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
719 .collect();
720
721 match self.strategy {
722 SelectionStrategy::Weighted => {
723 sorted.sort_by(|(id_a, a), (id_b, b)| {
725 let score_a = self.blend_with_payment_priority(a, a.score());
726 let score_b = self.blend_with_payment_priority(b, b.score());
727 let score_cmp = score_b
728 .partial_cmp(&score_a)
729 .unwrap_or(std::cmp::Ordering::Equal);
730 if score_cmp == std::cmp::Ordering::Equal {
731 id_a.cmp(id_b) } else {
733 score_cmp
734 }
735 });
736 }
737 SelectionStrategy::LowestLatency => {
738 sorted.sort_by(|(id_a, a), (id_b, b)| {
740 let rtt_cmp = a
741 .srtt_ms
742 .partial_cmp(&b.srtt_ms)
743 .unwrap_or(std::cmp::Ordering::Equal);
744 if rtt_cmp == std::cmp::Ordering::Equal {
745 let score_cmp = b
746 .score()
747 .partial_cmp(&a.score())
748 .unwrap_or(std::cmp::Ordering::Equal);
749 if score_cmp == std::cmp::Ordering::Equal {
750 id_a.cmp(id_b)
751 } else {
752 score_cmp
753 }
754 } else {
755 rtt_cmp
756 }
757 });
758 }
759 SelectionStrategy::HighestSuccessRate => {
760 sorted.sort_by(|(id_a, a), (id_b, b)| {
762 let rate_cmp = b
763 .success_rate()
764 .partial_cmp(&a.success_rate())
765 .unwrap_or(std::cmp::Ordering::Equal);
766 if rate_cmp == std::cmp::Ordering::Equal {
767 id_a.cmp(id_b)
768 } else {
769 rate_cmp
770 }
771 });
772 }
773 SelectionStrategy::UtilityUcb => {
774 let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
775 sorted.sort_by(|(id_a, a), (id_b, b)| {
776 let score_a =
777 self.blend_with_payment_priority(a, a.utility_score(total_requests));
778 let score_b =
779 self.blend_with_payment_priority(b, b.utility_score(total_requests));
780 let score_cmp = score_b
781 .partial_cmp(&score_a)
782 .unwrap_or(std::cmp::Ordering::Equal);
783 if score_cmp == std::cmp::Ordering::Equal {
784 id_a.cmp(id_b)
785 } else {
786 score_cmp
787 }
788 });
789 }
790 SelectionStrategy::RoundRobin => {
791 if !sorted.is_empty() {
793 let idx = self.round_robin_idx % sorted.len();
794 sorted.rotate_left(idx);
795 self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
796 }
797 }
798 SelectionStrategy::Random => {
799 }
802 }
803
804 sorted.into_iter().map(|(id, _)| id).collect()
805 }
806
807 pub fn select_best(&mut self) -> Option<String> {
809 self.select_peers().into_iter().next()
810 }
811
812 pub fn select_top(&mut self, n: usize) -> Vec<String> {
814 self.select_peers().into_iter().take(n).collect()
815 }
816
817 pub fn summary(&self) -> SelectorSummary {
819 let count = self.stats.len();
820 if count == 0 {
821 return SelectorSummary::default();
822 }
823
824 let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
825 let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
826 let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
827 let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
828
829 let avg_rtt = {
830 let rtts: Vec<f64> = self
831 .stats
832 .values()
833 .filter(|s| s.srtt_ms > 0.0)
834 .map(|s| s.srtt_ms)
835 .collect();
836 if rtts.is_empty() {
837 0.0
838 } else {
839 rtts.iter().sum::<f64>() / rtts.len() as f64
840 }
841 };
842
843 SelectorSummary {
844 peer_count: count,
845 total_requests,
846 total_successes,
847 total_timeouts,
848 backed_off_count: backed_off,
849 avg_rtt_ms: avg_rtt,
850 overall_success_rate: if total_requests > 0 {
851 total_successes as f64 / total_requests as f64
852 } else {
853 0.0
854 },
855 }
856 }
857
858 pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
860 let mut by_principal = self.persisted_metadata.clone();
861 for stats in self.stats.values() {
862 let principal = peer_principal(&stats.peer_id).to_string();
863 by_principal.insert(
864 principal.clone(),
865 PersistedPeerMetadata::from_stats(principal, stats),
866 );
867 }
868
869 let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
870 peers.sort_by(|a, b| a.principal.cmp(&b.principal));
871
872 PeerMetadataSnapshot {
873 version: PEER_METADATA_SNAPSHOT_VERSION,
874 peers,
875 }
876 }
877
878 pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
880 if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
881 return;
882 }
883
884 self.persisted_metadata.clear();
885 for peer in &snapshot.peers {
886 self.persisted_metadata
887 .insert(peer.principal.clone(), peer.clone());
888 }
889
890 for stats in self.stats.values_mut() {
891 if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
892 saved.apply_to_stats(stats);
893 }
894 }
895 }
896}
897
898#[derive(Debug, Clone, Default)]
900pub struct SelectorSummary {
901 pub peer_count: usize,
902 pub total_requests: u64,
903 pub total_successes: u64,
904 pub total_timeouts: u64,
905 pub backed_off_count: usize,
906 pub avg_rtt_ms: f64,
907 pub overall_success_rate: f64,
908}
909
910#[cfg(test)]
911mod tests {
912 use super::*;
913 use std::thread::sleep;
914
915 #[test]
916 fn test_peer_stats_success_rate() {
917 let mut stats = PeerStats::new("peer1");
918 assert_eq!(stats.success_rate(), 0.5); stats.record_request(40);
921 stats.record_success(50, 1024);
922 assert_eq!(stats.success_rate(), 1.0);
923
924 stats.record_request(40);
925 stats.record_timeout();
926 assert_eq!(stats.success_rate(), 0.5);
927 }
928
929 #[test]
930 fn test_peer_stats_rtt_calculation() {
931 let mut stats = PeerStats::new("peer1");
932
933 stats.record_request(40);
935 stats.record_success(100, 1024);
936 assert_eq!(stats.srtt_ms, 100.0);
937 assert_eq!(stats.rttvar_ms, 50.0); stats.record_request(40);
941 stats.record_success(80, 1024);
942 assert!((stats.srtt_ms - 97.5).abs() < 0.1);
944 }
945
946 #[test]
947 fn test_peer_stats_backoff() {
948 let mut stats = PeerStats::new("peer1");
949 assert!(!stats.is_backed_off());
950
951 stats.record_timeout();
952 assert!(stats.is_backed_off());
953 assert!(stats.backoff_remaining() > Duration::ZERO);
954 }
955
956 #[test]
957 fn test_peer_stats_backoff_clears_on_success() {
958 let mut stats = PeerStats::new("peer1");
959 stats.record_timeout();
960 assert!(stats.is_backed_off());
961
962 stats.record_success(50, 1024);
963 assert!(!stats.is_backed_off());
964 assert_eq!(stats.backoff_level, 0);
965 }
966
967 #[test]
968 fn test_peer_stats_backoff_saturates_without_overflow() {
969 let mut stats = PeerStats::new("peer1");
970
971 for _ in 0..128 {
972 stats.record_failure();
973 }
974
975 assert_eq!(compute_backoff_ms(stats.backoff_level), MAX_BACKOFF_MS);
976 assert!(stats.is_backed_off());
977 }
978
979 #[test]
980 fn test_peer_selector_add_remove() {
981 let mut selector = PeerSelector::new();
982 selector.add_peer("peer1");
983 selector.add_peer("peer2");
984 assert!(selector.get_stats("peer1").is_some());
985 assert!(selector.get_stats("peer2").is_some());
986
987 selector.remove_peer("peer1");
988 assert!(selector.get_stats("peer1").is_none());
989 assert!(selector.get_stats("peer2").is_some());
990 }
991
992 #[test]
993 fn test_peer_selector_weighted_selection() {
994 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
995 selector.add_peer("peer1");
996 selector.add_peer("peer2");
997 selector.add_peer("peer3");
998
999 selector.record_request("peer1", 40);
1001 selector.record_success("peer1", 20, 1024);
1002 selector.record_request("peer1", 40);
1003 selector.record_success("peer1", 25, 1024);
1004
1005 selector.record_request("peer2", 40);
1007 selector.record_success("peer2", 100, 1024);
1008 selector.record_request("peer2", 40);
1009 selector.record_timeout("peer2");
1010
1011 selector.record_request("peer3", 40);
1013 selector.record_timeout("peer3");
1014 selector.record_request("peer3", 40);
1015 selector.record_timeout("peer3");
1016
1017 let peers = selector.select_peers();
1019 assert_eq!(peers[0], "peer1");
1021 }
1022
1023 #[test]
1024 fn test_peer_selector_backed_off_peers() {
1025 let mut selector = PeerSelector::new();
1026 selector.add_peer("peer1");
1027 selector.add_peer("peer2");
1028
1029 selector.record_timeout("peer1");
1031 assert!(selector.get_stats("peer1").unwrap().is_backed_off());
1032
1033 let peers = selector.select_peers();
1035 assert_eq!(peers.len(), 1);
1036 assert_eq!(peers[0], "peer2");
1037 }
1038
1039 #[test]
1040 fn test_peer_selector_all_backed_off_fallback() {
1041 let mut selector = PeerSelector::new();
1042 selector.add_peer("peer1");
1043 selector.add_peer("peer2");
1044
1045 selector.record_timeout("peer1");
1047 selector.record_timeout("peer2");
1048
1049 let peers = selector.select_peers();
1051 assert_eq!(peers.len(), 2);
1052 }
1053
1054 #[test]
1055 fn test_peer_selector_fairness() {
1056 let mut selector = PeerSelector::new();
1057 selector.set_fairness(true);
1058
1059 for i in 1..=6 {
1061 selector.add_peer(format!("peer{}", i));
1062 }
1063
1064 sleep(Duration::from_millis(15));
1066
1067 for _ in 0..100 {
1068 selector.record_request("peer1", 40);
1069 selector.record_success("peer1", 10, 100);
1070 }
1071
1072 for i in 2..=6 {
1074 selector.record_request(&format!("peer{}", i), 40);
1075 selector.record_success(&format!("peer{}", i), 10, 100);
1076 }
1077
1078 let skipped = selector.should_skip_for_fairness("peer1");
1080 let _ = skipped; }
1082
1083 #[test]
1084 fn test_peer_selector_summary() {
1085 let mut selector = PeerSelector::new();
1086 selector.add_peer("peer1");
1087 selector.add_peer("peer2");
1088
1089 selector.record_request("peer1", 40);
1090 selector.record_success("peer1", 50, 1024);
1091 selector.record_request("peer2", 40);
1092 selector.record_timeout("peer2");
1093
1094 let summary = selector.summary();
1095 assert_eq!(summary.peer_count, 2);
1096 assert_eq!(summary.total_requests, 2);
1097 assert_eq!(summary.total_successes, 1);
1098 assert_eq!(summary.total_timeouts, 1);
1099 assert_eq!(summary.backed_off_count, 1);
1100 assert_eq!(summary.overall_success_rate, 0.5);
1101 }
1102
1103 #[test]
1104 fn test_peer_stats_score() {
1105 let mut stats = PeerStats::new("peer1");
1106
1107 let initial_score = stats.score();
1109 assert!(initial_score > 0.3 && initial_score < 0.7);
1110
1111 for _ in 0..10 {
1113 stats.record_request(40);
1114 stats.record_success(20, 1024);
1115 }
1116 let good_score = stats.score();
1117 assert!(good_score > 0.8);
1118
1119 let mut bad_stats = PeerStats::new("peer2");
1121 for _ in 0..10 {
1122 bad_stats.record_request(40);
1123 bad_stats.record_timeout();
1124 }
1125 let bad_score = bad_stats.score();
1126 assert!(bad_score < 0.3);
1127
1128 assert!(good_score > bad_score);
1129 }
1130
1131 #[test]
1132 fn test_peer_stats_utility_score_prefers_good_over_bad() {
1133 let mut good = PeerStats::new("good");
1134 good.requests_sent = 120;
1135 good.successes = 96;
1136 good.failures = 8;
1137 good.timeouts = 4;
1138 good.srtt_ms = 30.0;
1139 good.bytes_sent = 120 * 40;
1140 good.bytes_received = 96 * 1024;
1141
1142 let mut bad = PeerStats::new("bad");
1143 bad.requests_sent = 120;
1144 bad.successes = 40;
1145 bad.failures = 50;
1146 bad.timeouts = 30;
1147 bad.srtt_ms = 220.0;
1148 bad.bytes_sent = 120 * 40;
1149 bad.bytes_received = 40 * 1024;
1150
1151 let total_requests = good.requests_sent + bad.requests_sent;
1152 assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
1153 }
1154
1155 #[test]
1156 fn test_utility_ucb_strategy_explores_less_sampled_peer() {
1157 let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
1158 selector.add_peer("stable");
1159 selector.add_peer("new");
1160
1161 {
1162 let stable = selector.get_stats_mut("stable").unwrap();
1163 stable.requests_sent = 500;
1164 stable.successes = 450;
1165 stable.failures = 35;
1166 stable.timeouts = 15;
1167 stable.srtt_ms = 35.0;
1168 stable.bytes_sent = 500 * 40;
1169 stable.bytes_received = 450 * 1024;
1170 }
1171 {
1172 let new_peer = selector.get_stats_mut("new").unwrap();
1173 new_peer.requests_sent = 2;
1174 new_peer.successes = 2;
1175 new_peer.failures = 0;
1176 new_peer.timeouts = 0;
1177 new_peer.srtt_ms = 70.0;
1178 new_peer.bytes_sent = 2 * 40;
1179 new_peer.bytes_received = 2 * 1024;
1180 }
1181
1182 let peers = selector.select_peers();
1183 assert_eq!(peers[0], "new");
1184 }
1185
1186 #[test]
1187 fn test_lowest_latency_strategy() {
1188 let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
1189 selector.add_peer("peer1");
1190 selector.add_peer("peer2");
1191 selector.add_peer("peer3");
1192
1193 selector.record_request("peer1", 40);
1195 selector.record_success("peer1", 100, 1024);
1196
1197 selector.record_request("peer2", 40);
1199 selector.record_success("peer2", 20, 1024);
1200
1201 selector.record_request("peer3", 40);
1203 selector.record_success("peer3", 50, 1024);
1204
1205 let peers = selector.select_peers();
1206 assert_eq!(peers[0], "peer2");
1208 }
1209
1210 fn build_cashu_priority_fixture() -> PeerSelector {
1211 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1212 selector.add_peer("reliable");
1213 selector.add_peer("paid");
1214
1215 {
1216 let reliable = selector.get_stats_mut("reliable").expect("reliable");
1217 reliable.requests_sent = 80;
1218 reliable.successes = 75;
1219 reliable.failures = 2;
1220 reliable.timeouts = 3;
1221 reliable.srtt_ms = 40.0;
1222 reliable.bytes_sent = 80 * 40;
1223 reliable.bytes_received = 75 * 1024;
1224 }
1225 {
1226 let paid = selector.get_stats_mut("paid").expect("paid");
1227 paid.requests_sent = 80;
1228 paid.successes = 36;
1229 paid.failures = 24;
1230 paid.timeouts = 20;
1231 paid.srtt_ms = 700.0;
1232 paid.bytes_sent = 80 * 40;
1233 paid.bytes_received = 36 * 512;
1234 }
1235
1236 selector
1237 }
1238
1239 #[test]
1240 fn test_cashu_payment_weight_zero_keeps_reputation_order() {
1241 let mut selector = build_cashu_priority_fixture();
1242 selector.set_cashu_payment_weight(0.0);
1243 selector.record_cashu_payment("paid", 5_000);
1244
1245 let peers = selector.select_peers();
1246 assert_eq!(peers[0], "reliable");
1247 }
1248
1249 #[test]
1250 fn test_cashu_payment_weight_prioritizes_paid_peer() {
1251 let mut selector = build_cashu_priority_fixture();
1252 selector.set_cashu_payment_weight(0.8);
1253 selector.record_cashu_payment("paid", 5_000);
1254
1255 let peers = selector.select_peers();
1256 assert_eq!(peers[0], "paid");
1257 }
1258
1259 #[test]
1260 fn test_cashu_payment_default_downranks_peer() {
1261 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1262 selector.add_peer("honest");
1263 selector.add_peer("delinquent");
1264
1265 for peer_id in ["honest", "delinquent"] {
1266 let stats = selector.get_stats_mut(peer_id).expect("stats");
1267 stats.requests_sent = 40;
1268 stats.successes = 34;
1269 stats.failures = 3;
1270 stats.timeouts = 3;
1271 stats.srtt_ms = 60.0;
1272 stats.bytes_sent = 40 * 40;
1273 stats.bytes_received = 34 * 1024;
1274 }
1275
1276 selector.record_cashu_payment_default("delinquent");
1277
1278 let peers = selector.select_peers();
1279 assert_eq!(peers[0], "honest");
1280 assert!(!peers.iter().any(|peer| peer == "delinquent"));
1281 }
1282
1283 #[test]
1284 fn test_payment_default_threshold_blocks_peer() {
1285 let mut selector = PeerSelector::new();
1286 selector.record_cashu_payment_default("peer-a");
1287 assert!(selector.is_peer_blocked_for_payment_defaults("peer-a", 1));
1288 assert!(!selector.is_peer_blocked_for_payment_defaults("peer-a", 2));
1289 }
1290
1291 #[test]
1292 fn test_peer_principal_matches_peer_id() {
1293 assert_eq!(peer_principal("npub1abc"), "npub1abc");
1294 assert_eq!(peer_principal("peer-hex-01"), "peer-hex-01");
1295 }
1296
1297 #[test]
1298 fn test_metadata_snapshot_restores_for_same_peer_id() {
1299 let mut selector = PeerSelector::new();
1300 selector.add_peer("npub1stable");
1301 selector.record_request("npub1stable", 64);
1302 selector.record_success("npub1stable", 32, 1024);
1303 selector.record_cashu_payment("npub1stable", 77);
1304 selector.record_cashu_receipt("npub1stable", 33);
1305 selector.record_cashu_payment_default("npub1stable");
1306
1307 let snapshot = selector.export_peer_metadata_snapshot();
1308 assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
1309 assert_eq!(snapshot.peers.len(), 1);
1310 assert_eq!(snapshot.peers[0].principal, "npub1stable");
1311
1312 let mut restored = PeerSelector::new();
1313 restored.import_peer_metadata_snapshot(&snapshot);
1314 restored.add_peer("npub1stable");
1315 let stats = restored.get_stats("npub1stable").expect("restored stats");
1316 assert_eq!(stats.requests_sent, 1);
1317 assert_eq!(stats.successes, 1);
1318 assert_eq!(stats.cashu_paid_sat, 77);
1319 assert_eq!(stats.cashu_received_sat, 33);
1320 assert_eq!(stats.cashu_payment_receipts, 1);
1321 assert_eq!(stats.cashu_payment_defaults, 1);
1322 }
1323}