Skip to main content

hashtree_network/
peer_selector.rs

1//! Adaptive peer selection based on Freenet patterns
2//!
3//! Implements sophisticated peer selection that favors reliable, fast peers:
4//! - Per-peer performance tracking (RTT, success rate)
5//! - RFC 2988-style smoothed RTT calculation
6//! - Exponential backoff for failing/slow peers
7//! - Fairness constraints to prevent overloading any single peer
8//! - Weighted selection combining multiple signals
9
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::Duration;
13use tokio::time::Instant;
14
// Fairness limits (after Freenet's PeerManager).

/// Maximum share (30%) of the total selection rate a single peer may take
/// before it is skipped by the fairness filter.
const SELECTION_PERCENTAGE_WARNING: f64 = 0.3;
/// Minimum number of peers before fairness constraints are applied.
const SELECTION_MIN_PEERS: usize = 5;

// Exponential backoff for failing peers (after Freenet).

/// First backoff interval: one second.
const INITIAL_BACKOFF_MS: u64 = 1_000;
/// Factor applied to the interval for every further backoff level.
const BACKOFF_MULTIPLIER: u64 = 2;
/// Backoff ceiling: eight minutes.
const MAX_BACKOFF_MS: u64 = 8 * 60 * 1_000;

// Retransmission timeout bounds (RFC 2988).

/// Lower bound on the retransmission timeout.
const MIN_RTO_MS: u64 = 50;
/// Upper bound on the retransmission timeout: sixty seconds.
const MAX_RTO_MS: u64 = 60 * 1_000;
/// Retransmission timeout used before any RTT sample exists.
const INITIAL_RTO_MS: u64 = 1_000;

/// Current schema version for persisted peer metadata snapshots.
pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
31
32/// Persisted metadata for a logical peer principal (pubkey/npub identity).
33///
34/// This omits process-local runtime fields (`Instant`, active backoff timers) so
35/// metadata can survive restarts and session UUID churn.
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
37pub struct PersistedPeerMetadata {
38    /// Stable principal identity (usually pubkey/npub).
39    pub principal: String,
40    pub requests_sent: u64,
41    pub successes: u64,
42    pub timeouts: u64,
43    pub failures: u64,
44    pub srtt_ms: f64,
45    pub rttvar_ms: f64,
46    pub rto_ms: u64,
47    pub bytes_received: u64,
48    pub bytes_sent: u64,
49    pub cashu_paid_sat: u64,
50    pub cashu_received_sat: u64,
51    pub cashu_payment_receipts: u64,
52    pub cashu_payment_defaults: u64,
53}
54
55impl PersistedPeerMetadata {
56    fn from_stats(principal: String, stats: &PeerStats) -> Self {
57        Self {
58            principal,
59            requests_sent: stats.requests_sent,
60            successes: stats.successes,
61            timeouts: stats.timeouts,
62            failures: stats.failures,
63            srtt_ms: sanitize_latency(stats.srtt_ms),
64            rttvar_ms: sanitize_latency(stats.rttvar_ms),
65            rto_ms: clamp_rto(stats.rto_ms),
66            bytes_received: stats.bytes_received,
67            bytes_sent: stats.bytes_sent,
68            cashu_paid_sat: stats.cashu_paid_sat,
69            cashu_received_sat: stats.cashu_received_sat,
70            cashu_payment_receipts: stats.cashu_payment_receipts,
71            cashu_payment_defaults: stats.cashu_payment_defaults,
72        }
73    }
74
75    fn apply_to_stats(&self, stats: &mut PeerStats) {
76        stats.requests_sent = self.requests_sent;
77        stats.successes = self.successes;
78        stats.timeouts = self.timeouts;
79        stats.failures = self.failures;
80        stats.srtt_ms = sanitize_latency(self.srtt_ms);
81        stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
82        stats.rto_ms = clamp_rto(self.rto_ms);
83        stats.bytes_received = self.bytes_received;
84        stats.bytes_sent = self.bytes_sent;
85        stats.cashu_paid_sat = self.cashu_paid_sat;
86        stats.cashu_received_sat = self.cashu_received_sat;
87        stats.cashu_payment_receipts = self.cashu_payment_receipts;
88        stats.cashu_payment_defaults = self.cashu_payment_defaults;
89
90        // Runtime-only state is intentionally reset on restore.
91        stats.backoff_level = 0;
92        stats.backed_off_until = None;
93        stats.last_success = None;
94        stats.last_failure = None;
95        stats.consecutive_rto_backoffs = 0;
96    }
97}
98
99/// Snapshot of metadata for all known principals.
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub struct PeerMetadataSnapshot {
102    pub version: u32,
103    pub peers: Vec<PersistedPeerMetadata>,
104}
105
106impl Default for PeerMetadataSnapshot {
107    fn default() -> Self {
108        Self {
109            version: PEER_METADATA_SNAPSHOT_VERSION,
110            peers: Vec::new(),
111        }
112    }
113}
114
/// Return `value` unchanged when it is a usable latency (finite and not below
/// zero); map NaN, infinities and negative values to `0.0`.
fn sanitize_latency(value: f64) -> f64 {
    match value {
        usable if usable.is_finite() && usable >= 0.0 => usable,
        _ => 0.0,
    }
}
122
123fn compute_backoff_ms(level: u32) -> u64 {
124    if level == 0 {
125        return 0;
126    }
127
128    let mut backoff_ms = INITIAL_BACKOFF_MS;
129    for _ in 1..level {
130        backoff_ms = backoff_ms.saturating_mul(BACKOFF_MULTIPLIER);
131        if backoff_ms >= MAX_BACKOFF_MS {
132            return MAX_BACKOFF_MS;
133        }
134    }
135
136    backoff_ms.min(MAX_BACKOFF_MS)
137}
138
139fn clamp_rto(rto_ms: u64) -> u64 {
140    if rto_ms == 0 {
141        INITIAL_RTO_MS
142    } else {
143        rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
144    }
145}
146
/// Extract the stable principal identity for a peer.
///
/// Peer IDs are the stable endpoint identifier now, so the principal is the
/// peer id itself (no session suffix is stripped). Selector state uses this
/// mapping when keying persisted metadata and snapshot entries.
pub fn peer_principal(peer_id: &str) -> &str {
    peer_id
}
154
/// Per-peer performance statistics.
///
/// Combines durable counters (round-tripped through `PersistedPeerMetadata`)
/// with process-local timing state (`Instant`s and backoff deadlines).
#[derive(Debug, Clone)]
pub struct PeerStats {
    /// Peer identifier; `PeerSelector` stores each entry under this same id.
    pub peer_id: String,
    /// Instant this stats entry was created (`PeerStats::new`); `selection_rate` measures from here.
    pub connected_at: Instant,
    /// Total requests sent to this peer (`record_request`).
    pub requests_sent: u64,
    /// Total successful responses received (`record_success`).
    pub successes: u64,
    /// Total timeouts (`record_timeout`).
    pub timeouts: u64,
    /// Total failures (bad data, disconnects, etc.) recorded via `record_failure`.
    pub failures: u64,
    /// Smoothed round-trip time in ms (RFC 2988 SRTT); `0.0` means no sample yet.
    pub srtt_ms: f64,
    /// RTT variance in ms (RFC 2988 RTTVAR).
    pub rttvar_ms: f64,
    /// Retransmission timeout in ms: derived from SRTT and RTTVAR on success,
    /// doubled on timeout, never above `MAX_RTO_MS`.
    pub rto_ms: u64,
    /// RTO doublings since the last success; doubling stops once this reaches 5.
    pub consecutive_rto_backoffs: u32,
    /// Exponential backoff level: incremented on timeout, failure or payment
    /// default; reset to 0 on success.
    pub backoff_level: u32,
    /// When backoff expires (None if not backed off)
    pub backed_off_until: Option<Instant>,
    /// Last successful response timestamp
    pub last_success: Option<Instant>,
    /// Last failure timestamp (timeouts, failures and payment defaults all set it)
    pub last_failure: Option<Instant>,
    /// Total bytes received from this peer
    pub bytes_received: u64,
    /// Total bytes sent to this peer
    pub bytes_sent: u64,
    /// Total sats paid to this peer through an external payment channel.
    pub cashu_paid_sat: u64,
    /// Total sats this peer paid us after successful delivery.
    pub cashu_received_sat: u64,
    /// Number of successful post-delivery payments received from this peer.
    pub cashu_payment_receipts: u64,
    /// Number of times this peer failed to pay after successful delivery.
    pub cashu_payment_defaults: u64,
}
199
200impl PeerStats {
201    /// Create new peer stats
202    pub fn new(peer_id: impl Into<String>) -> Self {
203        Self {
204            peer_id: peer_id.into(),
205            connected_at: Instant::now(),
206            requests_sent: 0,
207            successes: 0,
208            timeouts: 0,
209            failures: 0,
210            srtt_ms: 0.0,
211            rttvar_ms: 0.0,
212            rto_ms: INITIAL_RTO_MS,
213            consecutive_rto_backoffs: 0,
214            backoff_level: 0,
215            backed_off_until: None,
216            last_success: None,
217            last_failure: None,
218            bytes_received: 0,
219            bytes_sent: 0,
220            cashu_paid_sat: 0,
221            cashu_received_sat: 0,
222            cashu_payment_receipts: 0,
223            cashu_payment_defaults: 0,
224        }
225    }
226
227    /// Get success rate (0.0 to 1.0)
228    pub fn success_rate(&self) -> f64 {
229        if self.requests_sent == 0 {
230            return 0.5; // Neutral for new peers
231        }
232        self.successes as f64 / self.requests_sent as f64
233    }
234
235    /// Get selection rate (selections per second since connected)
236    pub fn selection_rate(&self) -> f64 {
237        let elapsed = self.connected_at.elapsed();
238        if elapsed.as_secs() < 10 {
239            return 0.0; // Avoid bias from short uptime (Freenet pattern)
240        }
241        self.requests_sent as f64 / elapsed.as_secs_f64()
242    }
243
244    /// Check if peer is currently backed off
245    pub fn is_backed_off(&self) -> bool {
246        if let Some(until) = self.backed_off_until {
247            Instant::now() < until
248        } else {
249            false
250        }
251    }
252
253    /// Get remaining backoff time
254    pub fn backoff_remaining(&self) -> Duration {
255        if let Some(until) = self.backed_off_until {
256            let now = Instant::now();
257            if now < until {
258                return until - now;
259            }
260        }
261        Duration::ZERO
262    }
263
264    /// Record a request being sent
265    pub fn record_request(&mut self, bytes: u64) {
266        self.requests_sent += 1;
267        self.bytes_sent += bytes;
268    }
269
270    /// Record a successful response with RTT
271    /// Uses RFC 2988 algorithm for smoothed RTT calculation
272    pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
273        self.successes += 1;
274        self.bytes_received += bytes;
275        self.last_success = Some(Instant::now());
276        self.consecutive_rto_backoffs = 0;
277
278        // Clear backoff on success
279        self.backed_off_until = None;
280        self.backoff_level = 0;
281
282        // RFC 2988 RTT update
283        let rtt = rtt_ms as f64;
284        if self.srtt_ms == 0.0 {
285            // First measurement
286            self.srtt_ms = rtt;
287            self.rttvar_ms = rtt / 2.0;
288        } else {
289            // Subsequent measurements
290            // RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R'|
291            // SRTT = (1 - alpha) * SRTT + alpha * R'
292            // where alpha = 1/8 = 0.125 and beta = 1/4 = 0.25
293            self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
294            self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
295        }
296
297        // RTO = SRTT + max(G, K*RTTVAR) where G=20ms granularity, K=4
298        let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
299        self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
300    }
301
302    /// Record a timeout
303    pub fn record_timeout(&mut self) {
304        self.timeouts += 1;
305        self.last_failure = Some(Instant::now());
306
307        // Apply backoff
308        self.apply_backoff();
309
310        // RFC 2988: Double RTO on timeout (up to max)
311        if self.consecutive_rto_backoffs < 5 {
312            self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
313            self.consecutive_rto_backoffs += 1;
314        }
315    }
316
317    /// Record a failure (bad data, disconnect, etc.)
318    pub fn record_failure(&mut self) {
319        self.failures += 1;
320        self.last_failure = Some(Instant::now());
321        self.apply_backoff();
322    }
323
324    /// Record an out-of-band payment to this peer (e.g. Cashu channel transfer).
325    pub fn record_cashu_payment(&mut self, amount_sat: u64) {
326        if amount_sat == 0 {
327            return;
328        }
329        self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
330    }
331
332    /// Record a settled payment received from this peer after we served data.
333    pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
334        if amount_sat == 0 {
335            return;
336        }
337        self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
338        self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
339    }
340
341    /// Record that this peer failed to pay after successful delivery.
342    pub fn record_cashu_payment_default(&mut self) {
343        self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
344        self.last_failure = Some(Instant::now());
345        self.apply_backoff();
346    }
347
348    /// Apply exponential backoff
349    fn apply_backoff(&mut self) {
350        self.backoff_level += 1;
351        let backoff_ms = compute_backoff_ms(self.backoff_level);
352        self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
353    }
354
355    /// Calculate peer score for selection (higher is better)
356    /// Combines success rate, RTT, and recent performance
357    pub fn score(&self) -> f64 {
358        // Base score from success rate (0-1)
359        let success_score = self.success_rate();
360
361        // RTT score: prefer faster peers (inverse of normalized RTT)
362        // Scale: 0-50ms = 1.0, 500ms+ = 0.1
363        let rtt_score = if self.srtt_ms <= 0.0 {
364            0.5 // Unknown RTT, neutral
365        } else {
366            (500.0 / (self.srtt_ms + 50.0)).min(1.0)
367        };
368
369        // Recency bonus: slight boost for recently successful peers
370        let recency_bonus = if let Some(last) = self.last_success {
371            let secs_ago = last.elapsed().as_secs_f64();
372            if secs_ago < 60.0 {
373                0.1 // Recent success
374            } else {
375                0.0
376            }
377        } else {
378            0.0
379        };
380
381        // Combine scores (weighted)
382        // Success rate is most important (60%), RTT next (30%), recency last (10%)
383        0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
384    }
385
386    /// Utility-centric score with exploration bonus (UCB-style).
387    ///
388    /// Balances:
389    /// - good/bad outcome ratio (successes vs failures+timeouts),
390    /// - latency efficiency,
391    /// - bytes efficiency (received vs sent),
392    /// - uncertainty bonus for less-tested peers.
393    pub fn utility_score(&self, total_requests: u64) -> f64 {
394        let good = self.successes as f64 + 1.0;
395        let bad = (self.failures + self.timeouts) as f64 + 1.0;
396        let ratio = good / bad;
397        let ratio_score = ratio / (1.0 + ratio);
398
399        let latency_score = if self.srtt_ms <= 0.0 {
400            0.5
401        } else {
402            (300.0 / (self.srtt_ms + 50.0)).min(1.0)
403        };
404
405        let efficiency_score = if self.bytes_sent == 0 {
406            0.5
407        } else {
408            (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
409        };
410
411        let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
412
413        let uncertainty =
414            (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
415        let exploration_bonus = 0.20 * uncertainty;
416
417        exploitation + exploration_bonus
418    }
419
420    /// Normalize paid amount to a bounded priority score in [0, 1).
421    pub fn cashu_priority_boost(&self) -> f64 {
422        if self.cashu_paid_sat == 0 {
423            return 0.0;
424        }
425        let paid = self.cashu_paid_sat as f64;
426        paid / (paid + 32.0)
427    }
428
429    /// Cooperative peers that actually pay us should not be penalized; repeated
430    /// defaults quickly reduce their desirability.
431    pub fn payment_reliability_multiplier(&self) -> f64 {
432        if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
433            return 1.0;
434        }
435        (self.cashu_payment_receipts as f64 + 1.0)
436            / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
437    }
438
439    pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
440        threshold > 0 && self.cashu_payment_defaults >= threshold
441    }
442}
443
/// Peer selection strategy used by `PeerSelector` to order candidate peers.
///
/// `Weighted` is the default variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SelectionStrategy {
    /// Select by score (success rate + RTT) - recommended
    #[default]
    Weighted,
    /// Round-robin (ignores performance); rotates the candidate order on each selection
    RoundRobin,
    /// Random selection
    Random,
    /// Lowest smoothed RTT first
    LowestLatency,
    /// Highest success rate first
    HighestSuccessRate,
    /// Utility + exploration (good/bad ratio + RTT/efficiency + UCB bonus)
    UtilityUcb,
}
461
462/// Adaptive peer selector
463///
464/// Tracks peer performance and selects peers intelligently:
465/// - Prefers high success rate peers
466/// - Prefers low latency peers
467/// - Backs off failing peers exponentially
468/// - Ensures fairness (no peer gets >30% of traffic with 5+ peers)
469#[derive(Debug, Default)]
470pub struct PeerSelector {
471    /// Per-peer statistics
472    stats: HashMap<String, PeerStats>,
473    /// Persisted peer metadata indexed by stable principal identity (pubkey/npub).
474    persisted_metadata: HashMap<String, PersistedPeerMetadata>,
475    /// Selection strategy
476    strategy: SelectionStrategy,
477    /// Enable fairness constraints (Freenet FOAF mitigation)
478    fairness_enabled: bool,
479    /// Round-robin index for RoundRobin strategy
480    round_robin_idx: usize,
481    /// Blending weight for payment priority. 0.0 keeps pure reputation routing.
482    cashu_payment_weight: f64,
483}
484
485impl PeerSelector {
486    /// Create a new peer selector with default weighted strategy
487    pub fn new() -> Self {
488        Self {
489            stats: HashMap::new(),
490            persisted_metadata: HashMap::new(),
491            strategy: SelectionStrategy::Weighted,
492            fairness_enabled: true,
493            round_robin_idx: 0,
494            cashu_payment_weight: 0.0,
495        }
496    }
497
498    /// Create with specific strategy
499    pub fn with_strategy(strategy: SelectionStrategy) -> Self {
500        Self {
501            stats: HashMap::new(),
502            persisted_metadata: HashMap::new(),
503            strategy,
504            fairness_enabled: true,
505            round_robin_idx: 0,
506            cashu_payment_weight: 0.0,
507        }
508    }
509
510    /// Enable/disable fairness constraints
511    pub fn set_fairness(&mut self, enabled: bool) {
512        self.fairness_enabled = enabled;
513    }
514
515    /// Configure payment-priority influence when ranking peers.
516    /// `0.0` disables payment influence and preserves reputation-only behavior.
517    pub fn set_cashu_payment_weight(&mut self, weight: f64) {
518        self.cashu_payment_weight = weight.clamp(0.0, 1.0);
519    }
520
521    /// Add a peer to track
522    pub fn add_peer(&mut self, peer_id: impl Into<String>) {
523        let peer_id = peer_id.into();
524        if self.stats.contains_key(&peer_id) {
525            return;
526        }
527
528        let mut stats = PeerStats::new(peer_id.clone());
529        if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
530            saved.apply_to_stats(&mut stats);
531        }
532        self.stats.insert(peer_id, stats);
533    }
534
535    /// Remove a peer
536    pub fn remove_peer(&mut self, peer_id: &str) {
537        if let Some(stats) = self.stats.remove(peer_id) {
538            let principal = peer_principal(&stats.peer_id).to_string();
539            self.persisted_metadata.insert(
540                principal.clone(),
541                PersistedPeerMetadata::from_stats(principal, &stats),
542            );
543        }
544    }
545
546    /// Get peer stats (immutable)
547    pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
548        self.stats.get(peer_id)
549    }
550
551    /// Get peer stats (mutable)
552    pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
553        self.stats.get_mut(peer_id)
554    }
555
556    /// Get all peer stats
557    pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
558        self.stats.values()
559    }
560
561    /// Whether this peer is currently backed off due to recent failures/timeouts.
562    pub fn is_peer_backed_off(&self, peer_id: &str) -> bool {
563        self.stats
564            .get(peer_id)
565            .is_some_and(PeerStats::is_backed_off)
566    }
567
568    /// Record a request being sent to a peer
569    pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
570        if let Some(stats) = self.stats.get_mut(peer_id) {
571            stats.record_request(bytes);
572        }
573    }
574
575    /// Record a successful response
576    pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
577        if let Some(stats) = self.stats.get_mut(peer_id) {
578            stats.record_success(rtt_ms, bytes);
579        }
580    }
581
582    /// Record a timeout
583    pub fn record_timeout(&mut self, peer_id: &str) {
584        if let Some(stats) = self.stats.get_mut(peer_id) {
585            stats.record_timeout();
586        }
587    }
588
589    /// Record a failure
590    pub fn record_failure(&mut self, peer_id: &str) {
591        if let Some(stats) = self.stats.get_mut(peer_id) {
592            stats.record_failure();
593        }
594    }
595
596    /// Record payment channel credit for a peer.
597    pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
598        if amount_sat == 0 {
599            return;
600        }
601        let entry = self
602            .stats
603            .entry(peer_id.to_string())
604            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
605        entry.record_cashu_payment(amount_sat);
606    }
607
608    /// Record a settled post-delivery payment received from a peer.
609    pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
610        if amount_sat == 0 {
611            return;
612        }
613        let entry = self
614            .stats
615            .entry(peer_id.to_string())
616            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
617        entry.record_cashu_receipt(amount_sat);
618    }
619
620    /// Record that a peer failed to settle after we delivered successfully.
621    pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
622        let entry = self
623            .stats
624            .entry(peer_id.to_string())
625            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
626        entry.record_cashu_payment_default();
627    }
628
629    pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
630        self.stats
631            .get(peer_id)
632            .map(|stats| stats.exceeds_payment_default_threshold(threshold))
633            .unwrap_or(false)
634    }
635
636    fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
637        let reliable_base = base_score * stats.payment_reliability_multiplier();
638        if self.cashu_payment_weight <= 0.0 {
639            return reliable_base;
640        }
641        let payment_score = stats.cashu_priority_boost();
642        (1.0 - self.cashu_payment_weight) * reliable_base
643            + self.cashu_payment_weight * payment_score
644    }
645
646    /// Get available (non-backed-off) peers
647    fn available_peers(&self) -> Vec<String> {
648        self.stats
649            .iter()
650            .filter(|(_, s)| !s.is_backed_off())
651            .map(|(id, _)| id.clone())
652            .collect()
653    }
654
655    /// Check fairness: should this peer be skipped due to over-selection?
656    #[cfg(test)]
657    fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
658        let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
659        self.should_skip_for_fairness_with_total(peer_id, total_rate)
660    }
661
662    fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
663        if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
664            return false;
665        }
666
667        // Check if this peer is selected too often
668        if let Some(stats) = self.stats.get(peer_id) {
669            let peer_rate = stats.selection_rate();
670            let proportion = peer_rate / total_rate;
671            return proportion > SELECTION_PERCENTAGE_WARNING;
672        }
673
674        false
675    }
676
677    /// Select peers ordered by preference
678    ///
679    /// Returns all available peers sorted by preference (best first).
680    /// Respects backoff states and fairness constraints.
681    pub fn select_peers(&mut self) -> Vec<String> {
682        let available = self.available_peers();
683        if available.is_empty() {
684            // If all peers are backed off, return backed off peers anyway
685            // sorted by when their backoff expires (soonest first)
686            let mut backed_off: Vec<_> = self
687                .stats
688                .iter()
689                .filter(|(_, s)| s.is_backed_off())
690                .map(|(id, s)| (id.clone(), s.backoff_remaining()))
691                .collect();
692            backed_off.sort_by_key(|(_, remaining)| *remaining);
693            return backed_off.into_iter().map(|(id, _)| id).collect();
694        }
695
696        // Apply fairness filter
697        let candidates: Vec<String> =
698            if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
699                let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
700                available
701                    .into_iter()
702                    .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
703                    .collect()
704            } else {
705                available
706            };
707
708        // If all peers were filtered out for fairness, use all available
709        let candidates = if candidates.is_empty() {
710            self.available_peers()
711        } else {
712            candidates
713        };
714
715        // Sort by strategy
716        let mut sorted: Vec<_> = candidates
717            .into_iter()
718            .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
719            .collect();
720
721        match self.strategy {
722            SelectionStrategy::Weighted => {
723                // Sort by score (highest first), then by peer_id for determinism
724                sorted.sort_by(|(id_a, a), (id_b, b)| {
725                    let score_a = self.blend_with_payment_priority(a, a.score());
726                    let score_b = self.blend_with_payment_priority(b, b.score());
727                    let score_cmp = score_b
728                        .partial_cmp(&score_a)
729                        .unwrap_or(std::cmp::Ordering::Equal);
730                    if score_cmp == std::cmp::Ordering::Equal {
731                        id_a.cmp(id_b) // Alphabetical for determinism
732                    } else {
733                        score_cmp
734                    }
735                });
736            }
737            SelectionStrategy::LowestLatency => {
738                // Sort by SRTT (lowest first), use score and peer_id as tiebreakers
739                sorted.sort_by(|(id_a, a), (id_b, b)| {
740                    let rtt_cmp = a
741                        .srtt_ms
742                        .partial_cmp(&b.srtt_ms)
743                        .unwrap_or(std::cmp::Ordering::Equal);
744                    if rtt_cmp == std::cmp::Ordering::Equal {
745                        let score_cmp = b
746                            .score()
747                            .partial_cmp(&a.score())
748                            .unwrap_or(std::cmp::Ordering::Equal);
749                        if score_cmp == std::cmp::Ordering::Equal {
750                            id_a.cmp(id_b)
751                        } else {
752                            score_cmp
753                        }
754                    } else {
755                        rtt_cmp
756                    }
757                });
758            }
759            SelectionStrategy::HighestSuccessRate => {
760                // Sort by success rate (highest first), peer_id as tiebreaker
761                sorted.sort_by(|(id_a, a), (id_b, b)| {
762                    let rate_cmp = b
763                        .success_rate()
764                        .partial_cmp(&a.success_rate())
765                        .unwrap_or(std::cmp::Ordering::Equal);
766                    if rate_cmp == std::cmp::Ordering::Equal {
767                        id_a.cmp(id_b)
768                    } else {
769                        rate_cmp
770                    }
771                });
772            }
773            SelectionStrategy::UtilityUcb => {
774                let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
775                sorted.sort_by(|(id_a, a), (id_b, b)| {
776                    let score_a =
777                        self.blend_with_payment_priority(a, a.utility_score(total_requests));
778                    let score_b =
779                        self.blend_with_payment_priority(b, b.utility_score(total_requests));
780                    let score_cmp = score_b
781                        .partial_cmp(&score_a)
782                        .unwrap_or(std::cmp::Ordering::Equal);
783                    if score_cmp == std::cmp::Ordering::Equal {
784                        id_a.cmp(id_b)
785                    } else {
786                        score_cmp
787                    }
788                });
789            }
790            SelectionStrategy::RoundRobin => {
791                // Rotate the list based on round-robin index
792                if !sorted.is_empty() {
793                    let idx = self.round_robin_idx % sorted.len();
794                    sorted.rotate_left(idx);
795                    self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
796                }
797            }
798            SelectionStrategy::Random => {
799                // Shuffle using simple deterministic approach for reproducibility
800                // In production, use proper random shuffle
801            }
802        }
803
804        sorted.into_iter().map(|(id, _)| id).collect()
805    }
806
807    /// Select single best peer
808    pub fn select_best(&mut self) -> Option<String> {
809        self.select_peers().into_iter().next()
810    }
811
812    /// Select top N peers
813    pub fn select_top(&mut self, n: usize) -> Vec<String> {
814        self.select_peers().into_iter().take(n).collect()
815    }
816
817    /// Get summary statistics across all peers
818    pub fn summary(&self) -> SelectorSummary {
819        let count = self.stats.len();
820        if count == 0 {
821            return SelectorSummary::default();
822        }
823
824        let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
825        let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
826        let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
827        let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
828
829        let avg_rtt = {
830            let rtts: Vec<f64> = self
831                .stats
832                .values()
833                .filter(|s| s.srtt_ms > 0.0)
834                .map(|s| s.srtt_ms)
835                .collect();
836            if rtts.is_empty() {
837                0.0
838            } else {
839                rtts.iter().sum::<f64>() / rtts.len() as f64
840            }
841        };
842
843        SelectorSummary {
844            peer_count: count,
845            total_requests,
846            total_successes,
847            total_timeouts,
848            backed_off_count: backed_off,
849            avg_rtt_ms: avg_rtt,
850            overall_success_rate: if total_requests > 0 {
851                total_successes as f64 / total_requests as f64
852            } else {
853                0.0
854            },
855        }
856    }
857
858    /// Export persisted peer metadata keyed by stable principal identity.
859    pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
860        let mut by_principal = self.persisted_metadata.clone();
861        for stats in self.stats.values() {
862            let principal = peer_principal(&stats.peer_id).to_string();
863            by_principal.insert(
864                principal.clone(),
865                PersistedPeerMetadata::from_stats(principal, stats),
866            );
867        }
868
869        let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
870        peers.sort_by(|a, b| a.principal.cmp(&b.principal));
871
872        PeerMetadataSnapshot {
873            version: PEER_METADATA_SNAPSHOT_VERSION,
874            peers,
875        }
876    }
877
878    /// Import persisted metadata and apply it to currently tracked peers.
879    pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
880        if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
881            return;
882        }
883
884        self.persisted_metadata.clear();
885        for peer in &snapshot.peers {
886            self.persisted_metadata
887                .insert(peer.principal.clone(), peer.clone());
888        }
889
890        for stats in self.stats.values_mut() {
891            if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
892                saved.apply_to_stats(stats);
893            }
894        }
895    }
896}
897
/// Summary statistics for the selector.
///
/// Produced by `PeerSelector::summary`. All fields are zero when no peers are
/// tracked. Derives `PartialEq`, like `PersistedPeerMetadata`, so summaries can
/// be compared directly.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct SelectorSummary {
    /// Number of tracked peers.
    pub peer_count: usize,
    /// Sum of `requests_sent` over all tracked peers.
    pub total_requests: u64,
    /// Sum of `successes` over all tracked peers.
    pub total_successes: u64,
    /// Sum of `timeouts` over all tracked peers.
    pub total_timeouts: u64,
    /// Number of tracked peers currently in backoff.
    pub backed_off_count: usize,
    /// Mean smoothed RTT in milliseconds over peers whose SRTT is positive;
    /// `0.0` when no peer has one.
    pub avg_rtt_ms: f64,
    /// `total_successes / total_requests`, or `0.0` when no requests were sent.
    pub overall_success_rate: f64,
}
909
#[cfg(test)]
mod tests {
    //! Unit tests for `PeerStats` bookkeeping, `PeerSelector` strategies,
    //! Cashu payment weighting, and metadata snapshot round-trips.

    use super::*;
    use std::thread::sleep;

    #[test]
    fn test_peer_stats_success_rate() {
        let mut stats = PeerStats::new("peer1");
        assert_eq!(stats.success_rate(), 0.5); // Neutral for new peer

        stats.record_request(40);
        stats.record_success(50, 1024);
        assert_eq!(stats.success_rate(), 1.0);

        stats.record_request(40);
        stats.record_timeout();
        assert_eq!(stats.success_rate(), 0.5);
    }

    #[test]
    fn test_peer_stats_rtt_calculation() {
        let mut stats = PeerStats::new("peer1");

        // First RTT measurement
        stats.record_request(40);
        stats.record_success(100, 1024);
        assert_eq!(stats.srtt_ms, 100.0);
        assert_eq!(stats.rttvar_ms, 50.0); // RTT/2

        // Second measurement
        stats.record_request(40);
        stats.record_success(80, 1024);
        // SRTT = 0.875 * 100 + 0.125 * 80 = 87.5 + 10 = 97.5
        assert!((stats.srtt_ms - 97.5).abs() < 0.1);
    }

    #[test]
    fn test_peer_stats_backoff() {
        let mut stats = PeerStats::new("peer1");
        assert!(!stats.is_backed_off());

        stats.record_timeout();
        assert!(stats.is_backed_off());
        assert!(stats.backoff_remaining() > Duration::ZERO);
    }

    #[test]
    fn test_peer_stats_backoff_clears_on_success() {
        let mut stats = PeerStats::new("peer1");
        stats.record_timeout();
        assert!(stats.is_backed_off());

        stats.record_success(50, 1024);
        assert!(!stats.is_backed_off());
        assert_eq!(stats.backoff_level, 0);
    }

    #[test]
    fn test_peer_stats_backoff_saturates_without_overflow() {
        let mut stats = PeerStats::new("peer1");

        for _ in 0..128 {
            stats.record_failure();
        }

        assert_eq!(compute_backoff_ms(stats.backoff_level), MAX_BACKOFF_MS);
        assert!(stats.is_backed_off());
    }

    #[test]
    fn test_peer_selector_add_remove() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        assert!(selector.get_stats("peer1").is_some());
        assert!(selector.get_stats("peer2").is_some());

        selector.remove_peer("peer1");
        assert!(selector.get_stats("peer1").is_none());
        assert!(selector.get_stats("peer2").is_some());
    }

    #[test]
    fn test_peer_selector_weighted_selection() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.add_peer("peer3");

        // Peer 1: good (high success, low RTT)
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 20, 1024);
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 25, 1024);

        // Peer 2: medium
        selector.record_request("peer2", 40);
        selector.record_success("peer2", 100, 1024);
        selector.record_request("peer2", 40);
        selector.record_timeout("peer2");

        // Peer 3: bad (timeouts)
        selector.record_request("peer3", 40);
        selector.record_timeout("peer3");
        selector.record_request("peer3", 40);
        selector.record_timeout("peer3");

        // Peer 3 should be backed off after its latest timeout
        assert!(selector.get_stats("peer3").unwrap().is_backed_off());

        let peers = selector.select_peers();
        // Peer 1 should be first (best score)
        assert_eq!(peers[0], "peer1");
    }

    #[test]
    fn test_peer_selector_backed_off_peers() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        // Back off peer 1
        selector.record_timeout("peer1");
        assert!(selector.get_stats("peer1").unwrap().is_backed_off());

        // Peer 2 should be available
        let peers = selector.select_peers();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0], "peer2");
    }

    #[test]
    fn test_peer_selector_all_backed_off_fallback() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        // Back off both peers
        selector.record_timeout("peer1");
        selector.record_timeout("peer2");

        // Should still return peers (sorted by backoff remaining)
        let peers = selector.select_peers();
        assert_eq!(peers.len(), 2);
    }

    #[test]
    fn test_peer_selector_fairness() {
        let mut selector = PeerSelector::new();
        selector.set_fairness(true);

        // Add 5+ peers to enable fairness
        for i in 1..=6 {
            selector.add_peer(format!("peer{}", i));
        }

        // Simulate peer 1 being selected way too often
        sleep(Duration::from_millis(15));

        for _ in 0..100 {
            selector.record_request("peer1", 40);
            selector.record_success("peer1", 10, 100);
        }

        // Other peers get very few requests
        for i in 2..=6 {
            selector.record_request(&format!("peer{}", i), 40);
            selector.record_success(&format!("peer{}", i), 10, 100);
        }

        // Peer 1 should be skipped due to fairness (>30% selection rate)
        let skipped = selector.should_skip_for_fairness("peer1");
        // NOTE(review): this test currently asserts nothing. The result is
        // timing-dependent per the original author, so it only exercises the
        // code path. Pin the expected value once the fairness window semantics
        // are deterministic.
        let _ = skipped; // May or may not trigger depending on timing
    }

    #[test]
    fn test_peer_selector_summary() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        selector.record_request("peer1", 40);
        selector.record_success("peer1", 50, 1024);
        selector.record_request("peer2", 40);
        selector.record_timeout("peer2");

        let summary = selector.summary();
        assert_eq!(summary.peer_count, 2);
        assert_eq!(summary.total_requests, 2);
        assert_eq!(summary.total_successes, 1);
        assert_eq!(summary.total_timeouts, 1);
        assert_eq!(summary.backed_off_count, 1);
        assert_eq!(summary.overall_success_rate, 0.5);
    }

    #[test]
    fn test_peer_stats_score() {
        let mut stats = PeerStats::new("peer1");

        // New peer has neutral score
        let initial_score = stats.score();
        assert!(initial_score > 0.3 && initial_score < 0.7);

        // Good peer: high success rate + low RTT
        for _ in 0..10 {
            stats.record_request(40);
            stats.record_success(20, 1024);
        }
        let good_score = stats.score();
        assert!(good_score > 0.8);

        // Bad peer: high timeout rate
        let mut bad_stats = PeerStats::new("peer2");
        for _ in 0..10 {
            bad_stats.record_request(40);
            bad_stats.record_timeout();
        }
        let bad_score = bad_stats.score();
        assert!(bad_score < 0.3);

        assert!(good_score > bad_score);
    }

    #[test]
    fn test_peer_stats_utility_score_prefers_good_over_bad() {
        let mut good = PeerStats::new("good");
        good.requests_sent = 120;
        good.successes = 96;
        good.failures = 8;
        good.timeouts = 4;
        good.srtt_ms = 30.0;
        good.bytes_sent = 120 * 40;
        good.bytes_received = 96 * 1024;

        let mut bad = PeerStats::new("bad");
        bad.requests_sent = 120;
        bad.successes = 40;
        bad.failures = 50;
        bad.timeouts = 30;
        bad.srtt_ms = 220.0;
        bad.bytes_sent = 120 * 40;
        bad.bytes_received = 40 * 1024;

        let total_requests = good.requests_sent + bad.requests_sent;
        assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
    }

    #[test]
    fn test_utility_ucb_strategy_explores_less_sampled_peer() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
        selector.add_peer("stable");
        selector.add_peer("new");

        {
            let stable = selector.get_stats_mut("stable").unwrap();
            stable.requests_sent = 500;
            stable.successes = 450;
            stable.failures = 35;
            stable.timeouts = 15;
            stable.srtt_ms = 35.0;
            stable.bytes_sent = 500 * 40;
            stable.bytes_received = 450 * 1024;
        }
        {
            let new_peer = selector.get_stats_mut("new").unwrap();
            new_peer.requests_sent = 2;
            new_peer.successes = 2;
            new_peer.failures = 0;
            new_peer.timeouts = 0;
            new_peer.srtt_ms = 70.0;
            new_peer.bytes_sent = 2 * 40;
            new_peer.bytes_received = 2 * 1024;
        }

        let peers = selector.select_peers();
        assert_eq!(peers[0], "new");
    }

    #[test]
    fn test_lowest_latency_strategy() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.add_peer("peer3");

        // Peer 1: 100ms RTT
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 100, 1024);

        // Peer 2: 20ms RTT (fastest)
        selector.record_request("peer2", 40);
        selector.record_success("peer2", 20, 1024);

        // Peer 3: 50ms RTT
        selector.record_request("peer3", 40);
        selector.record_success("peer3", 50, 1024);

        let peers = selector.select_peers();
        // Peer 2 should be first (lowest RTT)
        assert_eq!(peers[0], "peer2");
    }

    fn build_cashu_priority_fixture() -> PeerSelector {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("reliable");
        selector.add_peer("paid");

        {
            let reliable = selector.get_stats_mut("reliable").expect("reliable");
            reliable.requests_sent = 80;
            reliable.successes = 75;
            reliable.failures = 2;
            reliable.timeouts = 3;
            reliable.srtt_ms = 40.0;
            reliable.bytes_sent = 80 * 40;
            reliable.bytes_received = 75 * 1024;
        }
        {
            let paid = selector.get_stats_mut("paid").expect("paid");
            paid.requests_sent = 80;
            paid.successes = 36;
            paid.failures = 24;
            paid.timeouts = 20;
            paid.srtt_ms = 700.0;
            paid.bytes_sent = 80 * 40;
            paid.bytes_received = 36 * 512;
        }

        selector
    }

    #[test]
    fn test_cashu_payment_weight_zero_keeps_reputation_order() {
        let mut selector = build_cashu_priority_fixture();
        selector.set_cashu_payment_weight(0.0);
        selector.record_cashu_payment("paid", 5_000);

        let peers = selector.select_peers();
        assert_eq!(peers[0], "reliable");
    }

    #[test]
    fn test_cashu_payment_weight_prioritizes_paid_peer() {
        let mut selector = build_cashu_priority_fixture();
        selector.set_cashu_payment_weight(0.8);
        selector.record_cashu_payment("paid", 5_000);

        let peers = selector.select_peers();
        assert_eq!(peers[0], "paid");
    }

    #[test]
    fn test_cashu_payment_default_downranks_peer() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("honest");
        selector.add_peer("delinquent");

        for peer_id in ["honest", "delinquent"] {
            let stats = selector.get_stats_mut(peer_id).expect("stats");
            stats.requests_sent = 40;
            stats.successes = 34;
            stats.failures = 3;
            stats.timeouts = 3;
            stats.srtt_ms = 60.0;
            stats.bytes_sent = 40 * 40;
            stats.bytes_received = 34 * 1024;
        }

        selector.record_cashu_payment_default("delinquent");

        let peers = selector.select_peers();
        assert_eq!(peers[0], "honest");
        assert!(!peers.iter().any(|peer| peer == "delinquent"));
    }

    #[test]
    fn test_payment_default_threshold_blocks_peer() {
        let mut selector = PeerSelector::new();
        selector.record_cashu_payment_default("peer-a");
        assert!(selector.is_peer_blocked_for_payment_defaults("peer-a", 1));
        assert!(!selector.is_peer_blocked_for_payment_defaults("peer-a", 2));
    }

    #[test]
    fn test_peer_principal_matches_peer_id() {
        assert_eq!(peer_principal("npub1abc"), "npub1abc");
        assert_eq!(peer_principal("peer-hex-01"), "peer-hex-01");
    }

    #[test]
    fn test_metadata_snapshot_restores_for_same_peer_id() {
        let mut selector = PeerSelector::new();
        selector.add_peer("npub1stable");
        selector.record_request("npub1stable", 64);
        selector.record_success("npub1stable", 32, 1024);
        selector.record_cashu_payment("npub1stable", 77);
        selector.record_cashu_receipt("npub1stable", 33);
        selector.record_cashu_payment_default("npub1stable");

        let snapshot = selector.export_peer_metadata_snapshot();
        assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
        assert_eq!(snapshot.peers.len(), 1);
        assert_eq!(snapshot.peers[0].principal, "npub1stable");

        let mut restored = PeerSelector::new();
        restored.import_peer_metadata_snapshot(&snapshot);
        restored.add_peer("npub1stable");
        let stats = restored.get_stats("npub1stable").expect("restored stats");
        assert_eq!(stats.requests_sent, 1);
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.cashu_paid_sat, 77);
        assert_eq!(stats.cashu_received_sat, 33);
        assert_eq!(stats.cashu_payment_receipts, 1);
        assert_eq!(stats.cashu_payment_defaults, 1);
    }
}