1use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::Duration;
13use tokio::time::Instant;
14
15const SELECTION_PERCENTAGE_WARNING: f64 = 0.30; const SELECTION_MIN_PEERS: usize = 5; const INITIAL_BACKOFF_MS: u64 = 1000; const BACKOFF_MULTIPLIER: u64 = 2; const MAX_BACKOFF_MS: u64 = 480_000; const MIN_RTO_MS: u64 = 50; const MAX_RTO_MS: u64 = 60_000; const INITIAL_RTO_MS: u64 = 1000; pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
37pub struct PersistedPeerMetadata {
38 pub principal: String,
40 pub requests_sent: u64,
41 pub successes: u64,
42 #[serde(default)]
43 pub misses: u64,
44 pub timeouts: u64,
45 pub failures: u64,
46 pub srtt_ms: f64,
47 pub rttvar_ms: f64,
48 pub rto_ms: u64,
49 pub bytes_received: u64,
50 pub bytes_sent: u64,
51 pub cashu_paid_sat: u64,
52 pub cashu_received_sat: u64,
53 pub cashu_payment_receipts: u64,
54 pub cashu_payment_defaults: u64,
55}
56
57impl PersistedPeerMetadata {
58 fn from_stats(principal: String, stats: &PeerStats) -> Self {
59 Self {
60 principal,
61 requests_sent: stats.requests_sent,
62 successes: stats.successes,
63 misses: stats.misses,
64 timeouts: stats.timeouts,
65 failures: stats.failures,
66 srtt_ms: sanitize_latency(stats.srtt_ms),
67 rttvar_ms: sanitize_latency(stats.rttvar_ms),
68 rto_ms: clamp_rto(stats.rto_ms),
69 bytes_received: stats.bytes_received,
70 bytes_sent: stats.bytes_sent,
71 cashu_paid_sat: stats.cashu_paid_sat,
72 cashu_received_sat: stats.cashu_received_sat,
73 cashu_payment_receipts: stats.cashu_payment_receipts,
74 cashu_payment_defaults: stats.cashu_payment_defaults,
75 }
76 }
77
78 fn apply_to_stats(&self, stats: &mut PeerStats) {
79 stats.requests_sent = self.requests_sent;
80 stats.successes = self.successes;
81 stats.misses = self.misses;
82 stats.timeouts = self.timeouts;
83 stats.failures = self.failures;
84 stats.srtt_ms = sanitize_latency(self.srtt_ms);
85 stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
86 stats.rto_ms = clamp_rto(self.rto_ms);
87 stats.bytes_received = self.bytes_received;
88 stats.bytes_sent = self.bytes_sent;
89 stats.cashu_paid_sat = self.cashu_paid_sat;
90 stats.cashu_received_sat = self.cashu_received_sat;
91 stats.cashu_payment_receipts = self.cashu_payment_receipts;
92 stats.cashu_payment_defaults = self.cashu_payment_defaults;
93
94 stats.backoff_level = 0;
96 stats.backed_off_until = None;
97 stats.last_success = None;
98 stats.last_failure = None;
99 stats.consecutive_rto_backoffs = 0;
100 }
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
105pub struct PeerMetadataSnapshot {
106 pub version: u32,
107 pub peers: Vec<PersistedPeerMetadata>,
108}
109
110impl Default for PeerMetadataSnapshot {
111 fn default() -> Self {
112 Self {
113 version: PEER_METADATA_SNAPSHOT_VERSION,
114 peers: Vec::new(),
115 }
116 }
117}
118
119fn sanitize_latency(value: f64) -> f64 {
120 if value.is_finite() && value >= 0.0 {
121 value
122 } else {
123 0.0
124 }
125}
126
127fn compute_backoff_ms(level: u32) -> u64 {
128 if level == 0 {
129 return 0;
130 }
131
132 let mut backoff_ms = INITIAL_BACKOFF_MS;
133 for _ in 1..level {
134 backoff_ms = backoff_ms.saturating_mul(BACKOFF_MULTIPLIER);
135 if backoff_ms >= MAX_BACKOFF_MS {
136 return MAX_BACKOFF_MS;
137 }
138 }
139
140 backoff_ms.min(MAX_BACKOFF_MS)
141}
142
143fn clamp_rto(rto_ms: u64) -> u64 {
144 if rto_ms == 0 {
145 INITIAL_RTO_MS
146 } else {
147 rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
148 }
149}
150
151pub fn peer_principal(peer_id: &str) -> &str {
156 peer_id
157}
158
159#[derive(Debug, Clone)]
161pub struct PeerStats {
162 pub peer_id: String,
164 pub connected_at: Instant,
166 pub requests_sent: u64,
168 pub successes: u64,
170 pub misses: u64,
172 pub timeouts: u64,
174 pub failures: u64,
176 pub srtt_ms: f64,
178 pub rttvar_ms: f64,
180 pub rto_ms: u64,
182 pub consecutive_rto_backoffs: u32,
184 pub backoff_level: u32,
186 pub backed_off_until: Option<Instant>,
188 pub last_success: Option<Instant>,
190 pub last_failure: Option<Instant>,
192 pub bytes_received: u64,
194 pub bytes_sent: u64,
196 pub cashu_paid_sat: u64,
198 pub cashu_received_sat: u64,
200 pub cashu_payment_receipts: u64,
202 pub cashu_payment_defaults: u64,
204}
205
206impl PeerStats {
207 pub fn new(peer_id: impl Into<String>) -> Self {
209 Self {
210 peer_id: peer_id.into(),
211 connected_at: Instant::now(),
212 requests_sent: 0,
213 successes: 0,
214 misses: 0,
215 timeouts: 0,
216 failures: 0,
217 srtt_ms: 0.0,
218 rttvar_ms: 0.0,
219 rto_ms: INITIAL_RTO_MS,
220 consecutive_rto_backoffs: 0,
221 backoff_level: 0,
222 backed_off_until: None,
223 last_success: None,
224 last_failure: None,
225 bytes_received: 0,
226 bytes_sent: 0,
227 cashu_paid_sat: 0,
228 cashu_received_sat: 0,
229 cashu_payment_receipts: 0,
230 cashu_payment_defaults: 0,
231 }
232 }
233
234 pub fn success_rate(&self) -> f64 {
236 let attempts_with_health_outcome = self.requests_sent.saturating_sub(self.misses);
237 if attempts_with_health_outcome == 0 {
238 return 0.5; }
240 self.successes as f64 / attempts_with_health_outcome as f64
241 }
242
243 pub fn selection_rate(&self) -> f64 {
245 let elapsed = self.connected_at.elapsed();
246 if elapsed.as_secs() < 10 {
247 return 0.0; }
249 self.requests_sent as f64 / elapsed.as_secs_f64()
250 }
251
252 pub fn is_backed_off(&self) -> bool {
254 if let Some(until) = self.backed_off_until {
255 Instant::now() < until
256 } else {
257 false
258 }
259 }
260
261 pub fn backoff_remaining(&self) -> Duration {
263 if let Some(until) = self.backed_off_until {
264 let now = Instant::now();
265 if now < until {
266 return until - now;
267 }
268 }
269 Duration::ZERO
270 }
271
272 pub fn record_request(&mut self, bytes: u64) {
274 self.requests_sent += 1;
275 self.bytes_sent += bytes;
276 }
277
278 pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
281 self.successes += 1;
282 self.bytes_received += bytes;
283 self.last_success = Some(Instant::now());
284 self.consecutive_rto_backoffs = 0;
285
286 self.backed_off_until = None;
288 self.backoff_level = 0;
289
290 let rtt = rtt_ms as f64;
292 if self.srtt_ms == 0.0 {
293 self.srtt_ms = rtt;
295 self.rttvar_ms = rtt / 2.0;
296 } else {
297 self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
302 self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
303 }
304
305 let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
307 self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
308 }
309
310 pub fn record_miss(&mut self) {
312 self.misses += 1;
313 }
314
315 pub fn record_timeout(&mut self) {
317 self.timeouts += 1;
318 self.last_failure = Some(Instant::now());
319
320 self.apply_backoff();
322
323 if self.consecutive_rto_backoffs < 5 {
325 self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
326 self.consecutive_rto_backoffs += 1;
327 }
328 }
329
330 pub fn record_failure(&mut self) {
332 self.failures += 1;
333 self.last_failure = Some(Instant::now());
334 self.apply_backoff();
335 }
336
337 pub fn record_cashu_payment(&mut self, amount_sat: u64) {
339 if amount_sat == 0 {
340 return;
341 }
342 self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
343 }
344
345 pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
347 if amount_sat == 0 {
348 return;
349 }
350 self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
351 self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
352 }
353
354 pub fn record_cashu_payment_default(&mut self) {
356 self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
357 self.last_failure = Some(Instant::now());
358 self.apply_backoff();
359 }
360
361 fn apply_backoff(&mut self) {
363 self.backoff_level += 1;
364 let backoff_ms = compute_backoff_ms(self.backoff_level);
365 self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
366 }
367
368 pub fn score(&self) -> f64 {
371 let success_score = self.success_rate();
373
374 let rtt_score = if self.srtt_ms <= 0.0 {
377 0.5 } else {
379 (500.0 / (self.srtt_ms + 50.0)).min(1.0)
380 };
381
382 let recency_bonus = if let Some(last) = self.last_success {
384 let secs_ago = last.elapsed().as_secs_f64();
385 if secs_ago < 60.0 {
386 0.1 } else {
388 0.0
389 }
390 } else {
391 0.0
392 };
393
394 0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
397 }
398
399 pub fn utility_score(&self, total_requests: u64) -> f64 {
407 let good = self.successes as f64 + 1.0;
408 let bad = (self.failures + self.timeouts) as f64 + 1.0;
409 let ratio = good / bad;
410 let ratio_score = ratio / (1.0 + ratio);
411
412 let latency_score = if self.srtt_ms <= 0.0 {
413 0.5
414 } else {
415 (300.0 / (self.srtt_ms + 50.0)).min(1.0)
416 };
417
418 let efficiency_score = if self.bytes_sent == 0 {
419 0.5
420 } else {
421 (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
422 };
423
424 let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
425
426 let uncertainty =
427 (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
428 let exploration_bonus = 0.20 * uncertainty;
429
430 exploitation + exploration_bonus
431 }
432
433 pub fn cashu_priority_boost(&self) -> f64 {
435 if self.cashu_paid_sat == 0 {
436 return 0.0;
437 }
438 let paid = self.cashu_paid_sat as f64;
439 paid / (paid + 32.0)
440 }
441
442 pub fn payment_reliability_multiplier(&self) -> f64 {
445 if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
446 return 1.0;
447 }
448 (self.cashu_payment_receipts as f64 + 1.0)
449 / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
450 }
451
452 pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
453 threshold > 0 && self.cashu_payment_defaults >= threshold
454 }
455}
456
457#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
459pub enum SelectionStrategy {
460 #[default]
462 Weighted,
463 RoundRobin,
465 Random,
467 LowestLatency,
469 HighestSuccessRate,
471 UtilityUcb,
473}
474
475#[derive(Debug, Default)]
483pub struct PeerSelector {
484 stats: HashMap<String, PeerStats>,
486 persisted_metadata: HashMap<String, PersistedPeerMetadata>,
488 strategy: SelectionStrategy,
490 fairness_enabled: bool,
492 round_robin_idx: usize,
494 cashu_payment_weight: f64,
496}
497
498impl PeerSelector {
499 pub fn new() -> Self {
501 Self {
502 stats: HashMap::new(),
503 persisted_metadata: HashMap::new(),
504 strategy: SelectionStrategy::Weighted,
505 fairness_enabled: true,
506 round_robin_idx: 0,
507 cashu_payment_weight: 0.0,
508 }
509 }
510
511 pub fn with_strategy(strategy: SelectionStrategy) -> Self {
513 Self {
514 stats: HashMap::new(),
515 persisted_metadata: HashMap::new(),
516 strategy,
517 fairness_enabled: true,
518 round_robin_idx: 0,
519 cashu_payment_weight: 0.0,
520 }
521 }
522
523 pub fn set_fairness(&mut self, enabled: bool) {
525 self.fairness_enabled = enabled;
526 }
527
528 pub fn set_cashu_payment_weight(&mut self, weight: f64) {
531 self.cashu_payment_weight = weight.clamp(0.0, 1.0);
532 }
533
534 pub fn add_peer(&mut self, peer_id: impl Into<String>) {
536 let peer_id = peer_id.into();
537 if self.stats.contains_key(&peer_id) {
538 return;
539 }
540
541 let mut stats = PeerStats::new(peer_id.clone());
542 if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
543 saved.apply_to_stats(&mut stats);
544 }
545 self.stats.insert(peer_id, stats);
546 }
547
548 pub fn remove_peer(&mut self, peer_id: &str) {
550 if let Some(stats) = self.stats.remove(peer_id) {
551 let principal = peer_principal(&stats.peer_id).to_string();
552 self.persisted_metadata.insert(
553 principal.clone(),
554 PersistedPeerMetadata::from_stats(principal, &stats),
555 );
556 }
557 }
558
559 pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
561 self.stats.get(peer_id)
562 }
563
564 pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
566 self.stats.get_mut(peer_id)
567 }
568
569 pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
571 self.stats.values()
572 }
573
574 pub fn is_peer_backed_off(&self, peer_id: &str) -> bool {
576 self.stats
577 .get(peer_id)
578 .is_some_and(PeerStats::is_backed_off)
579 }
580
581 pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
583 if let Some(stats) = self.stats.get_mut(peer_id) {
584 stats.record_request(bytes);
585 }
586 }
587
588 pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
590 if let Some(stats) = self.stats.get_mut(peer_id) {
591 stats.record_success(rtt_ms, bytes);
592 }
593 }
594
595 pub fn record_miss(&mut self, peer_id: &str) {
597 if let Some(stats) = self.stats.get_mut(peer_id) {
598 stats.record_miss();
599 }
600 }
601
602 pub fn record_timeout(&mut self, peer_id: &str) {
604 if let Some(stats) = self.stats.get_mut(peer_id) {
605 stats.record_timeout();
606 }
607 }
608
609 pub fn record_failure(&mut self, peer_id: &str) {
611 if let Some(stats) = self.stats.get_mut(peer_id) {
612 stats.record_failure();
613 }
614 }
615
616 pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
618 if amount_sat == 0 {
619 return;
620 }
621 let entry = self
622 .stats
623 .entry(peer_id.to_string())
624 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
625 entry.record_cashu_payment(amount_sat);
626 }
627
628 pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
630 if amount_sat == 0 {
631 return;
632 }
633 let entry = self
634 .stats
635 .entry(peer_id.to_string())
636 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
637 entry.record_cashu_receipt(amount_sat);
638 }
639
640 pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
642 let entry = self
643 .stats
644 .entry(peer_id.to_string())
645 .or_insert_with(|| PeerStats::new(peer_id.to_string()));
646 entry.record_cashu_payment_default();
647 }
648
649 pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
650 self.stats
651 .get(peer_id)
652 .map(|stats| stats.exceeds_payment_default_threshold(threshold))
653 .unwrap_or(false)
654 }
655
656 fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
657 let reliable_base = base_score * stats.payment_reliability_multiplier();
658 if self.cashu_payment_weight <= 0.0 {
659 return reliable_base;
660 }
661 let payment_score = stats.cashu_priority_boost();
662 (1.0 - self.cashu_payment_weight) * reliable_base
663 + self.cashu_payment_weight * payment_score
664 }
665
666 fn available_peers(&self) -> Vec<String> {
668 self.stats
669 .iter()
670 .filter(|(_, s)| !s.is_backed_off())
671 .map(|(id, _)| id.clone())
672 .collect()
673 }
674
675 #[cfg(test)]
677 fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
678 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
679 self.should_skip_for_fairness_with_total(peer_id, total_rate)
680 }
681
682 fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
683 if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
684 return false;
685 }
686
687 if let Some(stats) = self.stats.get(peer_id) {
689 let peer_rate = stats.selection_rate();
690 let proportion = peer_rate / total_rate;
691 return proportion > SELECTION_PERCENTAGE_WARNING;
692 }
693
694 false
695 }
696
697 pub fn select_peers(&mut self) -> Vec<String> {
702 let available = self.available_peers();
703 if available.is_empty() {
704 let mut backed_off: Vec<_> = self
707 .stats
708 .iter()
709 .filter(|(_, s)| s.is_backed_off())
710 .map(|(id, s)| (id.clone(), s.backoff_remaining()))
711 .collect();
712 backed_off.sort_by_key(|(_, remaining)| *remaining);
713 return backed_off.into_iter().map(|(id, _)| id).collect();
714 }
715
716 let candidates: Vec<String> =
718 if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
719 let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
720 available
721 .into_iter()
722 .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
723 .collect()
724 } else {
725 available
726 };
727
728 let candidates = if candidates.is_empty() {
730 self.available_peers()
731 } else {
732 candidates
733 };
734
735 let mut sorted: Vec<_> = candidates
737 .into_iter()
738 .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
739 .collect();
740
741 match self.strategy {
742 SelectionStrategy::Weighted => {
743 sorted.sort_by(|(id_a, a), (id_b, b)| {
745 let score_a = self.blend_with_payment_priority(a, a.score());
746 let score_b = self.blend_with_payment_priority(b, b.score());
747 let score_cmp = score_b
748 .partial_cmp(&score_a)
749 .unwrap_or(std::cmp::Ordering::Equal);
750 if score_cmp == std::cmp::Ordering::Equal {
751 id_a.cmp(id_b) } else {
753 score_cmp
754 }
755 });
756 }
757 SelectionStrategy::LowestLatency => {
758 sorted.sort_by(|(id_a, a), (id_b, b)| {
760 let rtt_cmp = a
761 .srtt_ms
762 .partial_cmp(&b.srtt_ms)
763 .unwrap_or(std::cmp::Ordering::Equal);
764 if rtt_cmp == std::cmp::Ordering::Equal {
765 let score_cmp = b
766 .score()
767 .partial_cmp(&a.score())
768 .unwrap_or(std::cmp::Ordering::Equal);
769 if score_cmp == std::cmp::Ordering::Equal {
770 id_a.cmp(id_b)
771 } else {
772 score_cmp
773 }
774 } else {
775 rtt_cmp
776 }
777 });
778 }
779 SelectionStrategy::HighestSuccessRate => {
780 sorted.sort_by(|(id_a, a), (id_b, b)| {
782 let rate_cmp = b
783 .success_rate()
784 .partial_cmp(&a.success_rate())
785 .unwrap_or(std::cmp::Ordering::Equal);
786 if rate_cmp == std::cmp::Ordering::Equal {
787 id_a.cmp(id_b)
788 } else {
789 rate_cmp
790 }
791 });
792 }
793 SelectionStrategy::UtilityUcb => {
794 let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
795 sorted.sort_by(|(id_a, a), (id_b, b)| {
796 let score_a =
797 self.blend_with_payment_priority(a, a.utility_score(total_requests));
798 let score_b =
799 self.blend_with_payment_priority(b, b.utility_score(total_requests));
800 let score_cmp = score_b
801 .partial_cmp(&score_a)
802 .unwrap_or(std::cmp::Ordering::Equal);
803 if score_cmp == std::cmp::Ordering::Equal {
804 id_a.cmp(id_b)
805 } else {
806 score_cmp
807 }
808 });
809 }
810 SelectionStrategy::RoundRobin => {
811 if !sorted.is_empty() {
813 let idx = self.round_robin_idx % sorted.len();
814 sorted.rotate_left(idx);
815 self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
816 }
817 }
818 SelectionStrategy::Random => {
819 }
822 }
823
824 sorted.into_iter().map(|(id, _)| id).collect()
825 }
826
827 pub fn select_best(&mut self) -> Option<String> {
829 self.select_peers().into_iter().next()
830 }
831
832 pub fn select_top(&mut self, n: usize) -> Vec<String> {
834 self.select_peers().into_iter().take(n).collect()
835 }
836
837 pub fn summary(&self) -> SelectorSummary {
839 let count = self.stats.len();
840 if count == 0 {
841 return SelectorSummary::default();
842 }
843
844 let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
845 let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
846 let total_misses: u64 = self.stats.values().map(|s| s.misses).sum();
847 let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
848 let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
849 let total_health_attempts = total_requests.saturating_sub(total_misses);
850
851 let avg_rtt = {
852 let rtts: Vec<f64> = self
853 .stats
854 .values()
855 .filter(|s| s.srtt_ms > 0.0)
856 .map(|s| s.srtt_ms)
857 .collect();
858 if rtts.is_empty() {
859 0.0
860 } else {
861 rtts.iter().sum::<f64>() / rtts.len() as f64
862 }
863 };
864
865 SelectorSummary {
866 peer_count: count,
867 total_requests,
868 total_successes,
869 total_timeouts,
870 backed_off_count: backed_off,
871 avg_rtt_ms: avg_rtt,
872 overall_success_rate: if total_health_attempts > 0 {
873 total_successes as f64 / total_health_attempts as f64
874 } else {
875 0.0
876 },
877 }
878 }
879
880 pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
882 let mut by_principal = self.persisted_metadata.clone();
883 for stats in self.stats.values() {
884 let principal = peer_principal(&stats.peer_id).to_string();
885 by_principal.insert(
886 principal.clone(),
887 PersistedPeerMetadata::from_stats(principal, stats),
888 );
889 }
890
891 let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
892 peers.sort_by(|a, b| a.principal.cmp(&b.principal));
893
894 PeerMetadataSnapshot {
895 version: PEER_METADATA_SNAPSHOT_VERSION,
896 peers,
897 }
898 }
899
900 pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
902 if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
903 return;
904 }
905
906 self.persisted_metadata.clear();
907 for peer in &snapshot.peers {
908 self.persisted_metadata
909 .insert(peer.principal.clone(), peer.clone());
910 }
911
912 for stats in self.stats.values_mut() {
913 if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
914 saved.apply_to_stats(stats);
915 }
916 }
917 }
918}
919
920#[derive(Debug, Clone, Default)]
922pub struct SelectorSummary {
923 pub peer_count: usize,
924 pub total_requests: u64,
925 pub total_successes: u64,
926 pub total_timeouts: u64,
927 pub backed_off_count: usize,
928 pub avg_rtt_ms: f64,
929 pub overall_success_rate: f64,
930}
931
932#[cfg(test)]
933mod tests {
934 use super::*;
935 use std::thread::sleep;
936
937 #[test]
938 fn test_peer_stats_success_rate() {
939 let mut stats = PeerStats::new("peer1");
940 assert_eq!(stats.success_rate(), 0.5); stats.record_request(40);
943 stats.record_success(50, 1024);
944 assert_eq!(stats.success_rate(), 1.0);
945
946 stats.record_request(40);
947 stats.record_timeout();
948 assert_eq!(stats.success_rate(), 0.5);
949 }
950
951 #[test]
952 fn test_peer_stats_miss_does_not_lower_health_success_rate() {
953 let mut stats = PeerStats::new("peer1");
954
955 stats.record_request(40);
956 stats.record_miss();
957 assert_eq!(stats.success_rate(), 0.5);
958
959 stats.record_request(40);
960 stats.record_success(50, 1024);
961 assert_eq!(stats.success_rate(), 1.0);
962 }
963
964 #[test]
965 fn test_peer_stats_rtt_calculation() {
966 let mut stats = PeerStats::new("peer1");
967
968 stats.record_request(40);
970 stats.record_success(100, 1024);
971 assert_eq!(stats.srtt_ms, 100.0);
972 assert_eq!(stats.rttvar_ms, 50.0); stats.record_request(40);
976 stats.record_success(80, 1024);
977 assert!((stats.srtt_ms - 97.5).abs() < 0.1);
979 }
980
981 #[test]
982 fn test_peer_stats_backoff() {
983 let mut stats = PeerStats::new("peer1");
984 assert!(!stats.is_backed_off());
985
986 stats.record_timeout();
987 assert!(stats.is_backed_off());
988 assert!(stats.backoff_remaining() > Duration::ZERO);
989 }
990
991 #[test]
992 fn test_peer_stats_backoff_clears_on_success() {
993 let mut stats = PeerStats::new("peer1");
994 stats.record_timeout();
995 assert!(stats.is_backed_off());
996
997 stats.record_success(50, 1024);
998 assert!(!stats.is_backed_off());
999 assert_eq!(stats.backoff_level, 0);
1000 }
1001
1002 #[test]
1003 fn test_peer_stats_backoff_saturates_without_overflow() {
1004 let mut stats = PeerStats::new("peer1");
1005
1006 for _ in 0..128 {
1007 stats.record_failure();
1008 }
1009
1010 assert_eq!(compute_backoff_ms(stats.backoff_level), MAX_BACKOFF_MS);
1011 assert!(stats.is_backed_off());
1012 }
1013
1014 #[test]
1015 fn test_peer_selector_add_remove() {
1016 let mut selector = PeerSelector::new();
1017 selector.add_peer("peer1");
1018 selector.add_peer("peer2");
1019 assert!(selector.get_stats("peer1").is_some());
1020 assert!(selector.get_stats("peer2").is_some());
1021
1022 selector.remove_peer("peer1");
1023 assert!(selector.get_stats("peer1").is_none());
1024 assert!(selector.get_stats("peer2").is_some());
1025 }
1026
1027 #[test]
1028 fn test_peer_selector_weighted_selection() {
1029 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1030 selector.add_peer("peer1");
1031 selector.add_peer("peer2");
1032 selector.add_peer("peer3");
1033
1034 selector.record_request("peer1", 40);
1036 selector.record_success("peer1", 20, 1024);
1037 selector.record_request("peer1", 40);
1038 selector.record_success("peer1", 25, 1024);
1039
1040 selector.record_request("peer2", 40);
1042 selector.record_success("peer2", 100, 1024);
1043 selector.record_request("peer2", 40);
1044 selector.record_timeout("peer2");
1045
1046 selector.record_request("peer3", 40);
1048 selector.record_timeout("peer3");
1049 selector.record_request("peer3", 40);
1050 selector.record_timeout("peer3");
1051
1052 let peers = selector.select_peers();
1054 assert_eq!(peers[0], "peer1");
1056 }
1057
1058 #[test]
1059 fn test_peer_selector_backed_off_peers() {
1060 let mut selector = PeerSelector::new();
1061 selector.add_peer("peer1");
1062 selector.add_peer("peer2");
1063
1064 selector.record_timeout("peer1");
1066 assert!(selector.get_stats("peer1").unwrap().is_backed_off());
1067
1068 let peers = selector.select_peers();
1070 assert_eq!(peers.len(), 1);
1071 assert_eq!(peers[0], "peer2");
1072 }
1073
1074 #[test]
1075 fn test_peer_selector_all_backed_off_fallback() {
1076 let mut selector = PeerSelector::new();
1077 selector.add_peer("peer1");
1078 selector.add_peer("peer2");
1079
1080 selector.record_timeout("peer1");
1082 selector.record_timeout("peer2");
1083
1084 let peers = selector.select_peers();
1086 assert_eq!(peers.len(), 2);
1087 }
1088
1089 #[test]
1090 fn test_peer_selector_fairness() {
1091 let mut selector = PeerSelector::new();
1092 selector.set_fairness(true);
1093
1094 for i in 1..=6 {
1096 selector.add_peer(format!("peer{}", i));
1097 }
1098
1099 sleep(Duration::from_millis(15));
1101
1102 for _ in 0..100 {
1103 selector.record_request("peer1", 40);
1104 selector.record_success("peer1", 10, 100);
1105 }
1106
1107 for i in 2..=6 {
1109 selector.record_request(&format!("peer{}", i), 40);
1110 selector.record_success(&format!("peer{}", i), 10, 100);
1111 }
1112
1113 let skipped = selector.should_skip_for_fairness("peer1");
1115 let _ = skipped; }
1117
1118 #[test]
1119 fn test_peer_selector_summary() {
1120 let mut selector = PeerSelector::new();
1121 selector.add_peer("peer1");
1122 selector.add_peer("peer2");
1123
1124 selector.record_request("peer1", 40);
1125 selector.record_success("peer1", 50, 1024);
1126 selector.record_request("peer2", 40);
1127 selector.record_timeout("peer2");
1128
1129 let summary = selector.summary();
1130 assert_eq!(summary.peer_count, 2);
1131 assert_eq!(summary.total_requests, 2);
1132 assert_eq!(summary.total_successes, 1);
1133 assert_eq!(summary.total_timeouts, 1);
1134 assert_eq!(summary.backed_off_count, 1);
1135 assert_eq!(summary.overall_success_rate, 0.5);
1136 }
1137
1138 #[test]
1139 fn test_peer_stats_score() {
1140 let mut stats = PeerStats::new("peer1");
1141
1142 let initial_score = stats.score();
1144 assert!(initial_score > 0.3 && initial_score < 0.7);
1145
1146 for _ in 0..10 {
1148 stats.record_request(40);
1149 stats.record_success(20, 1024);
1150 }
1151 let good_score = stats.score();
1152 assert!(good_score > 0.8);
1153
1154 let mut bad_stats = PeerStats::new("peer2");
1156 for _ in 0..10 {
1157 bad_stats.record_request(40);
1158 bad_stats.record_timeout();
1159 }
1160 let bad_score = bad_stats.score();
1161 assert!(bad_score < 0.3);
1162
1163 assert!(good_score > bad_score);
1164 }
1165
1166 #[test]
1167 fn test_peer_stats_utility_score_prefers_good_over_bad() {
1168 let mut good = PeerStats::new("good");
1169 good.requests_sent = 120;
1170 good.successes = 96;
1171 good.failures = 8;
1172 good.timeouts = 4;
1173 good.srtt_ms = 30.0;
1174 good.bytes_sent = 120 * 40;
1175 good.bytes_received = 96 * 1024;
1176
1177 let mut bad = PeerStats::new("bad");
1178 bad.requests_sent = 120;
1179 bad.successes = 40;
1180 bad.failures = 50;
1181 bad.timeouts = 30;
1182 bad.srtt_ms = 220.0;
1183 bad.bytes_sent = 120 * 40;
1184 bad.bytes_received = 40 * 1024;
1185
1186 let total_requests = good.requests_sent + bad.requests_sent;
1187 assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
1188 }
1189
1190 #[test]
1191 fn test_utility_ucb_strategy_explores_less_sampled_peer() {
1192 let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
1193 selector.add_peer("stable");
1194 selector.add_peer("new");
1195
1196 {
1197 let stable = selector.get_stats_mut("stable").unwrap();
1198 stable.requests_sent = 500;
1199 stable.successes = 450;
1200 stable.failures = 35;
1201 stable.timeouts = 15;
1202 stable.srtt_ms = 35.0;
1203 stable.bytes_sent = 500 * 40;
1204 stable.bytes_received = 450 * 1024;
1205 }
1206 {
1207 let new_peer = selector.get_stats_mut("new").unwrap();
1208 new_peer.requests_sent = 2;
1209 new_peer.successes = 2;
1210 new_peer.failures = 0;
1211 new_peer.timeouts = 0;
1212 new_peer.srtt_ms = 70.0;
1213 new_peer.bytes_sent = 2 * 40;
1214 new_peer.bytes_received = 2 * 1024;
1215 }
1216
1217 let peers = selector.select_peers();
1218 assert_eq!(peers[0], "new");
1219 }
1220
1221 #[test]
1222 fn test_lowest_latency_strategy() {
1223 let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
1224 selector.add_peer("peer1");
1225 selector.add_peer("peer2");
1226 selector.add_peer("peer3");
1227
1228 selector.record_request("peer1", 40);
1230 selector.record_success("peer1", 100, 1024);
1231
1232 selector.record_request("peer2", 40);
1234 selector.record_success("peer2", 20, 1024);
1235
1236 selector.record_request("peer3", 40);
1238 selector.record_success("peer3", 50, 1024);
1239
1240 let peers = selector.select_peers();
1241 assert_eq!(peers[0], "peer2");
1243 }
1244
1245 fn build_cashu_priority_fixture() -> PeerSelector {
1246 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1247 selector.add_peer("reliable");
1248 selector.add_peer("paid");
1249
1250 {
1251 let reliable = selector.get_stats_mut("reliable").expect("reliable");
1252 reliable.requests_sent = 80;
1253 reliable.successes = 75;
1254 reliable.failures = 2;
1255 reliable.timeouts = 3;
1256 reliable.srtt_ms = 40.0;
1257 reliable.bytes_sent = 80 * 40;
1258 reliable.bytes_received = 75 * 1024;
1259 }
1260 {
1261 let paid = selector.get_stats_mut("paid").expect("paid");
1262 paid.requests_sent = 80;
1263 paid.successes = 36;
1264 paid.failures = 24;
1265 paid.timeouts = 20;
1266 paid.srtt_ms = 700.0;
1267 paid.bytes_sent = 80 * 40;
1268 paid.bytes_received = 36 * 512;
1269 }
1270
1271 selector
1272 }
1273
1274 #[test]
1275 fn test_cashu_payment_weight_zero_keeps_reputation_order() {
1276 let mut selector = build_cashu_priority_fixture();
1277 selector.set_cashu_payment_weight(0.0);
1278 selector.record_cashu_payment("paid", 5_000);
1279
1280 let peers = selector.select_peers();
1281 assert_eq!(peers[0], "reliable");
1282 }
1283
1284 #[test]
1285 fn test_cashu_payment_weight_prioritizes_paid_peer() {
1286 let mut selector = build_cashu_priority_fixture();
1287 selector.set_cashu_payment_weight(0.8);
1288 selector.record_cashu_payment("paid", 5_000);
1289
1290 let peers = selector.select_peers();
1291 assert_eq!(peers[0], "paid");
1292 }
1293
1294 #[test]
1295 fn test_cashu_payment_default_downranks_peer() {
1296 let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1297 selector.add_peer("honest");
1298 selector.add_peer("delinquent");
1299
1300 for peer_id in ["honest", "delinquent"] {
1301 let stats = selector.get_stats_mut(peer_id).expect("stats");
1302 stats.requests_sent = 40;
1303 stats.successes = 34;
1304 stats.failures = 3;
1305 stats.timeouts = 3;
1306 stats.srtt_ms = 60.0;
1307 stats.bytes_sent = 40 * 40;
1308 stats.bytes_received = 34 * 1024;
1309 }
1310
1311 selector.record_cashu_payment_default("delinquent");
1312
1313 let peers = selector.select_peers();
1314 assert_eq!(peers[0], "honest");
1315 assert!(!peers.iter().any(|peer| peer == "delinquent"));
1316 }
1317
1318 #[test]
1319 fn test_payment_default_threshold_blocks_peer() {
1320 let mut selector = PeerSelector::new();
1321 selector.record_cashu_payment_default("peer-a");
1322 assert!(selector.is_peer_blocked_for_payment_defaults("peer-a", 1));
1323 assert!(!selector.is_peer_blocked_for_payment_defaults("peer-a", 2));
1324 }
1325
1326 #[test]
1327 fn test_peer_principal_matches_peer_id() {
1328 assert_eq!(peer_principal("npub1abc"), "npub1abc");
1329 assert_eq!(peer_principal("peer-hex-01"), "peer-hex-01");
1330 }
1331
1332 #[test]
1333 fn test_metadata_snapshot_restores_for_same_peer_id() {
1334 let mut selector = PeerSelector::new();
1335 selector.add_peer("npub1stable");
1336 selector.record_request("npub1stable", 64);
1337 selector.record_success("npub1stable", 32, 1024);
1338 selector.record_cashu_payment("npub1stable", 77);
1339 selector.record_cashu_receipt("npub1stable", 33);
1340 selector.record_cashu_payment_default("npub1stable");
1341
1342 let snapshot = selector.export_peer_metadata_snapshot();
1343 assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
1344 assert_eq!(snapshot.peers.len(), 1);
1345 assert_eq!(snapshot.peers[0].principal, "npub1stable");
1346
1347 let mut restored = PeerSelector::new();
1348 restored.import_peer_metadata_snapshot(&snapshot);
1349 restored.add_peer("npub1stable");
1350 let stats = restored.get_stats("npub1stable").expect("restored stats");
1351 assert_eq!(stats.requests_sent, 1);
1352 assert_eq!(stats.successes, 1);
1353 assert_eq!(stats.cashu_paid_sat, 77);
1354 assert_eq!(stats.cashu_received_sat, 33);
1355 assert_eq!(stats.cashu_payment_receipts, 1);
1356 assert_eq!(stats.cashu_payment_defaults, 1);
1357 }
1358}