Skip to main content

hashtree_network/
peer_selector.rs

1//! Adaptive peer selection based on Freenet patterns
2//!
3//! Implements sophisticated peer selection that favors reliable, fast peers:
4//! - Per-peer performance tracking (RTT, success rate)
5//! - RFC 2988-style smoothed RTT calculation
6//! - Exponential backoff for failing/slow peers
7//! - Fairness constraints to prevent overloading any single peer
8//! - Weighted selection combining multiple signals
9
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::{Duration, Instant};
13
// Fairness constants (from Freenet's PeerManager).

/// Share of the total selection rate above which a peer is skipped (>30% of the time).
const SELECTION_PERCENTAGE_WARNING: f64 = 0.30;
/// Minimum number of peers before the fairness filter is applied (>= 5 peers).
const SELECTION_MIN_PEERS: usize = 5;

// Backoff constants (from Freenet).

/// Backoff applied at the first backoff level: 1 second.
const INITIAL_BACKOFF_MS: u64 = 1000;
/// Factor applied per additional backoff level (exponential backoff).
const BACKOFF_MULTIPLIER: u64 = 2;
/// Upper bound on any single backoff: 8 minutes.
const MAX_BACKOFF_MS: u64 = 480_000;

// RTO constants (RFC 2988).

/// Minimum retransmission timeout, in milliseconds.
const MIN_RTO_MS: u64 = 50;
/// Maximum retransmission timeout, in milliseconds (60 seconds).
const MAX_RTO_MS: u64 = 60_000;
/// Retransmission timeout used before any RTT measurement exists, in milliseconds.
const INITIAL_RTO_MS: u64 = 1000;

/// Current schema version for persisted peer metadata snapshots.
pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
30
31/// Persisted metadata for a logical peer principal (pubkey/npub identity).
32///
33/// This omits process-local runtime fields (`Instant`, active backoff timers) so
34/// metadata can survive restarts and session UUID churn.
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
36pub struct PersistedPeerMetadata {
37    /// Stable principal identity (usually pubkey/npub).
38    pub principal: String,
39    pub requests_sent: u64,
40    pub successes: u64,
41    pub timeouts: u64,
42    pub failures: u64,
43    pub srtt_ms: f64,
44    pub rttvar_ms: f64,
45    pub rto_ms: u64,
46    pub bytes_received: u64,
47    pub bytes_sent: u64,
48    pub cashu_paid_sat: u64,
49    pub cashu_received_sat: u64,
50    pub cashu_payment_receipts: u64,
51    pub cashu_payment_defaults: u64,
52}
53
54impl PersistedPeerMetadata {
55    fn from_stats(principal: String, stats: &PeerStats) -> Self {
56        Self {
57            principal,
58            requests_sent: stats.requests_sent,
59            successes: stats.successes,
60            timeouts: stats.timeouts,
61            failures: stats.failures,
62            srtt_ms: sanitize_latency(stats.srtt_ms),
63            rttvar_ms: sanitize_latency(stats.rttvar_ms),
64            rto_ms: clamp_rto(stats.rto_ms),
65            bytes_received: stats.bytes_received,
66            bytes_sent: stats.bytes_sent,
67            cashu_paid_sat: stats.cashu_paid_sat,
68            cashu_received_sat: stats.cashu_received_sat,
69            cashu_payment_receipts: stats.cashu_payment_receipts,
70            cashu_payment_defaults: stats.cashu_payment_defaults,
71        }
72    }
73
74    fn apply_to_stats(&self, stats: &mut PeerStats) {
75        stats.requests_sent = self.requests_sent;
76        stats.successes = self.successes;
77        stats.timeouts = self.timeouts;
78        stats.failures = self.failures;
79        stats.srtt_ms = sanitize_latency(self.srtt_ms);
80        stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
81        stats.rto_ms = clamp_rto(self.rto_ms);
82        stats.bytes_received = self.bytes_received;
83        stats.bytes_sent = self.bytes_sent;
84        stats.cashu_paid_sat = self.cashu_paid_sat;
85        stats.cashu_received_sat = self.cashu_received_sat;
86        stats.cashu_payment_receipts = self.cashu_payment_receipts;
87        stats.cashu_payment_defaults = self.cashu_payment_defaults;
88
89        // Runtime-only state is intentionally reset on restore.
90        stats.backoff_level = 0;
91        stats.backed_off_until = None;
92        stats.last_success = None;
93        stats.last_failure = None;
94        stats.consecutive_rto_backoffs = 0;
95    }
96}
97
98/// Snapshot of metadata for all known principals.
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
100pub struct PeerMetadataSnapshot {
101    pub version: u32,
102    pub peers: Vec<PersistedPeerMetadata>,
103}
104
105impl Default for PeerMetadataSnapshot {
106    fn default() -> Self {
107        Self {
108            version: PEER_METADATA_SNAPSHOT_VERSION,
109            peers: Vec::new(),
110        }
111    }
112}
113
/// Return `value` when it is a finite, non-negative latency; otherwise `0.0`.
///
/// NaN, infinities and negative values all collapse to `0.0`.
fn sanitize_latency(value: f64) -> f64 {
    if !value.is_finite() || value < 0.0 {
        return 0.0;
    }
    value
}
121
122fn compute_backoff_ms(level: u32) -> u64 {
123    if level == 0 {
124        return 0;
125    }
126
127    let mut backoff_ms = INITIAL_BACKOFF_MS;
128    for _ in 1..level {
129        backoff_ms = backoff_ms.saturating_mul(BACKOFF_MULTIPLIER);
130        if backoff_ms >= MAX_BACKOFF_MS {
131            return MAX_BACKOFF_MS;
132        }
133    }
134
135    backoff_ms.min(MAX_BACKOFF_MS)
136}
137
138fn clamp_rto(rto_ms: u64) -> u64 {
139    if rto_ms == 0 {
140        INITIAL_RTO_MS
141    } else {
142        rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
143    }
144}
145
/// Extract the stable principal identity for a peer.
///
/// Peer IDs are the stable endpoint identifier now, so selector state keys map
/// directly to the peer id instead of stripping any session suffix. The
/// function therefore returns its argument unchanged; it exists so callers
/// have a single place that defines how a peer id maps to a principal.
pub fn peer_principal(peer_id: &str) -> &str {
    peer_id
}
153
154/// Per-peer performance statistics
155#[derive(Debug, Clone)]
156pub struct PeerStats {
157    /// Peer identifier
158    pub peer_id: String,
159    /// When this peer was connected
160    pub connected_at: Instant,
161    /// Total requests sent to this peer
162    pub requests_sent: u64,
163    /// Total successful responses received
164    pub successes: u64,
165    /// Total timeouts
166    pub timeouts: u64,
167    /// Total failures (bad data, disconnects, etc.)
168    pub failures: u64,
169    /// Smoothed round-trip time (RFC 2988 SRTT)
170    pub srtt_ms: f64,
171    /// RTT variance (RFC 2988 RTTVAR)
172    pub rttvar_ms: f64,
173    /// Retransmission timeout (computed from SRTT and RTTVAR)
174    pub rto_ms: u64,
175    /// Consecutive RTO backoffs (for capping)
176    pub consecutive_rto_backoffs: u32,
177    /// Current backoff level (how many times we've backed off)
178    pub backoff_level: u32,
179    /// When backoff expires (None if not backed off)
180    pub backed_off_until: Option<Instant>,
181    /// Last successful response timestamp
182    pub last_success: Option<Instant>,
183    /// Last failure timestamp
184    pub last_failure: Option<Instant>,
185    /// Total bytes received from this peer
186    pub bytes_received: u64,
187    /// Total bytes sent to this peer
188    pub bytes_sent: u64,
189    /// Total sats paid to this peer through an external payment channel.
190    pub cashu_paid_sat: u64,
191    /// Total sats this peer paid us after successful delivery.
192    pub cashu_received_sat: u64,
193    /// Number of successful post-delivery payments received from this peer.
194    pub cashu_payment_receipts: u64,
195    /// Number of times this peer failed to pay after successful delivery.
196    pub cashu_payment_defaults: u64,
197}
198
199impl PeerStats {
200    /// Create new peer stats
201    pub fn new(peer_id: impl Into<String>) -> Self {
202        Self {
203            peer_id: peer_id.into(),
204            connected_at: Instant::now(),
205            requests_sent: 0,
206            successes: 0,
207            timeouts: 0,
208            failures: 0,
209            srtt_ms: 0.0,
210            rttvar_ms: 0.0,
211            rto_ms: INITIAL_RTO_MS,
212            consecutive_rto_backoffs: 0,
213            backoff_level: 0,
214            backed_off_until: None,
215            last_success: None,
216            last_failure: None,
217            bytes_received: 0,
218            bytes_sent: 0,
219            cashu_paid_sat: 0,
220            cashu_received_sat: 0,
221            cashu_payment_receipts: 0,
222            cashu_payment_defaults: 0,
223        }
224    }
225
226    /// Get success rate (0.0 to 1.0)
227    pub fn success_rate(&self) -> f64 {
228        if self.requests_sent == 0 {
229            return 0.5; // Neutral for new peers
230        }
231        self.successes as f64 / self.requests_sent as f64
232    }
233
234    /// Get selection rate (selections per second since connected)
235    pub fn selection_rate(&self) -> f64 {
236        let elapsed = self.connected_at.elapsed();
237        if elapsed.as_secs() < 10 {
238            return 0.0; // Avoid bias from short uptime (Freenet pattern)
239        }
240        self.requests_sent as f64 / elapsed.as_secs_f64()
241    }
242
243    /// Check if peer is currently backed off
244    pub fn is_backed_off(&self) -> bool {
245        if let Some(until) = self.backed_off_until {
246            Instant::now() < until
247        } else {
248            false
249        }
250    }
251
252    /// Get remaining backoff time
253    pub fn backoff_remaining(&self) -> Duration {
254        if let Some(until) = self.backed_off_until {
255            let now = Instant::now();
256            if now < until {
257                return until - now;
258            }
259        }
260        Duration::ZERO
261    }
262
263    /// Record a request being sent
264    pub fn record_request(&mut self, bytes: u64) {
265        self.requests_sent += 1;
266        self.bytes_sent += bytes;
267    }
268
269    /// Record a successful response with RTT
270    /// Uses RFC 2988 algorithm for smoothed RTT calculation
271    pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
272        self.successes += 1;
273        self.bytes_received += bytes;
274        self.last_success = Some(Instant::now());
275        self.consecutive_rto_backoffs = 0;
276
277        // Clear backoff on success
278        self.backed_off_until = None;
279        self.backoff_level = 0;
280
281        // RFC 2988 RTT update
282        let rtt = rtt_ms as f64;
283        if self.srtt_ms == 0.0 {
284            // First measurement
285            self.srtt_ms = rtt;
286            self.rttvar_ms = rtt / 2.0;
287        } else {
288            // Subsequent measurements
289            // RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R'|
290            // SRTT = (1 - alpha) * SRTT + alpha * R'
291            // where alpha = 1/8 = 0.125 and beta = 1/4 = 0.25
292            self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
293            self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
294        }
295
296        // RTO = SRTT + max(G, K*RTTVAR) where G=20ms granularity, K=4
297        let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
298        self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
299    }
300
301    /// Record a timeout
302    pub fn record_timeout(&mut self) {
303        self.timeouts += 1;
304        self.last_failure = Some(Instant::now());
305
306        // Apply backoff
307        self.apply_backoff();
308
309        // RFC 2988: Double RTO on timeout (up to max)
310        if self.consecutive_rto_backoffs < 5 {
311            self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
312            self.consecutive_rto_backoffs += 1;
313        }
314    }
315
316    /// Record a failure (bad data, disconnect, etc.)
317    pub fn record_failure(&mut self) {
318        self.failures += 1;
319        self.last_failure = Some(Instant::now());
320        self.apply_backoff();
321    }
322
323    /// Record an out-of-band payment to this peer (e.g. Cashu channel transfer).
324    pub fn record_cashu_payment(&mut self, amount_sat: u64) {
325        if amount_sat == 0 {
326            return;
327        }
328        self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
329    }
330
331    /// Record a settled payment received from this peer after we served data.
332    pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
333        if amount_sat == 0 {
334            return;
335        }
336        self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
337        self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
338    }
339
340    /// Record that this peer failed to pay after successful delivery.
341    pub fn record_cashu_payment_default(&mut self) {
342        self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
343        self.last_failure = Some(Instant::now());
344        self.apply_backoff();
345    }
346
347    /// Apply exponential backoff
348    fn apply_backoff(&mut self) {
349        self.backoff_level += 1;
350        let backoff_ms = compute_backoff_ms(self.backoff_level);
351        self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
352    }
353
354    /// Calculate peer score for selection (higher is better)
355    /// Combines success rate, RTT, and recent performance
356    pub fn score(&self) -> f64 {
357        // Base score from success rate (0-1)
358        let success_score = self.success_rate();
359
360        // RTT score: prefer faster peers (inverse of normalized RTT)
361        // Scale: 0-50ms = 1.0, 500ms+ = 0.1
362        let rtt_score = if self.srtt_ms <= 0.0 {
363            0.5 // Unknown RTT, neutral
364        } else {
365            (500.0 / (self.srtt_ms + 50.0)).min(1.0)
366        };
367
368        // Recency bonus: slight boost for recently successful peers
369        let recency_bonus = if let Some(last) = self.last_success {
370            let secs_ago = last.elapsed().as_secs_f64();
371            if secs_ago < 60.0 {
372                0.1 // Recent success
373            } else {
374                0.0
375            }
376        } else {
377            0.0
378        };
379
380        // Combine scores (weighted)
381        // Success rate is most important (60%), RTT next (30%), recency last (10%)
382        0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
383    }
384
385    /// Utility-centric score with exploration bonus (UCB-style).
386    ///
387    /// Balances:
388    /// - good/bad outcome ratio (successes vs failures+timeouts),
389    /// - latency efficiency,
390    /// - bytes efficiency (received vs sent),
391    /// - uncertainty bonus for less-tested peers.
392    pub fn utility_score(&self, total_requests: u64) -> f64 {
393        let good = self.successes as f64 + 1.0;
394        let bad = (self.failures + self.timeouts) as f64 + 1.0;
395        let ratio = good / bad;
396        let ratio_score = ratio / (1.0 + ratio);
397
398        let latency_score = if self.srtt_ms <= 0.0 {
399            0.5
400        } else {
401            (300.0 / (self.srtt_ms + 50.0)).min(1.0)
402        };
403
404        let efficiency_score = if self.bytes_sent == 0 {
405            0.5
406        } else {
407            (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
408        };
409
410        let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
411
412        let uncertainty =
413            (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
414        let exploration_bonus = 0.20 * uncertainty;
415
416        exploitation + exploration_bonus
417    }
418
419    /// Tit-for-tat style score:
420    /// - reward peers that answer our requests with useful bytes (reciprocity)
421    /// - reward reliable responders
422    /// - penalize timeout/failure-heavy peers (retaliation)
423    /// - keep a small exploration term for under-sampled peers
424    pub fn tit_for_tat_score(&self, total_requests: u64) -> f64 {
425        // Beta prior keeps cold-start behavior neutral while converging quickly.
426        let reliability = (self.successes as f64 + 1.0) / (self.requests_sent as f64 + 2.0);
427
428        // Reciprocity should not be dominated by one lucky large payload. We
429        // gate byte-ratio impact with success confidence.
430        let reciprocity_raw = if self.bytes_sent == 0 {
431            1.0
432        } else {
433            self.bytes_received as f64 / self.bytes_sent as f64
434        };
435        let reciprocity_ratio = reciprocity_raw / (1.0 + reciprocity_raw);
436        let reciprocity_confidence = self.successes as f64 / (self.successes as f64 + 4.0);
437        let reciprocity =
438            (1.0 - reciprocity_confidence) * 0.5 + reciprocity_confidence * reciprocity_ratio;
439
440        let rtt_score = if self.srtt_ms <= 0.0 {
441            0.5
442        } else {
443            (400.0 / (self.srtt_ms + 50.0)).min(1.0)
444        };
445
446        let timeout_rate = if self.requests_sent == 0 {
447            0.0
448        } else {
449            self.timeouts as f64 / self.requests_sent as f64
450        };
451        let failure_rate = if self.requests_sent == 0 {
452            0.0
453        } else {
454            self.failures as f64 / self.requests_sent as f64
455        };
456        let retaliation_penalty =
457            (0.60 * timeout_rate + 0.45 * failure_rate + 0.10 * self.backoff_level as f64)
458                .min(0.95);
459
460        let cooperative = 0.65 * reliability + 0.25 * reciprocity + 0.10 * rtt_score;
461        let exploration = 0.03
462            * (((total_requests as f64) + 2.0).ln() / ((self.requests_sent as f64) + 2.0)).sqrt();
463
464        (cooperative + exploration - retaliation_penalty).max(0.0)
465    }
466
467    /// Normalize paid amount to a bounded priority score in [0, 1).
468    pub fn cashu_priority_boost(&self) -> f64 {
469        if self.cashu_paid_sat == 0 {
470            return 0.0;
471        }
472        let paid = self.cashu_paid_sat as f64;
473        paid / (paid + 32.0)
474    }
475
476    /// Cooperative peers that actually pay us should not be penalized; repeated
477    /// defaults quickly reduce their desirability.
478    pub fn payment_reliability_multiplier(&self) -> f64 {
479        if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
480            return 1.0;
481        }
482        (self.cashu_payment_receipts as f64 + 1.0)
483            / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
484    }
485
486    pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
487        threshold > 0 && self.cashu_payment_defaults >= threshold
488    }
489}
490
/// Peer selection strategy.
///
/// Determines how `PeerSelector::select_peers` orders the candidate peers.
/// The default is [`SelectionStrategy::Weighted`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SelectionStrategy {
    /// Select by score (success rate + RTT) - recommended
    #[default]
    Weighted,
    /// Round-robin (ignores performance)
    RoundRobin,
    /// Random selection
    Random,
    /// Lowest RTT first
    LowestLatency,
    /// Highest success rate first
    HighestSuccessRate,
    /// Tit-for-tat style utility (reciprocity + reliability + retaliation + exploration)
    TitForTat,
    /// Utility + exploration (good/bad ratio + RTT/efficiency + UCB bonus)
    UtilityUcb,
}
510
511/// Adaptive peer selector
512///
513/// Tracks peer performance and selects peers intelligently:
514/// - Prefers high success rate peers
515/// - Prefers low latency peers
516/// - Backs off failing peers exponentially
517/// - Ensures fairness (no peer gets >30% of traffic with 5+ peers)
518#[derive(Debug, Default)]
519pub struct PeerSelector {
520    /// Per-peer statistics
521    stats: HashMap<String, PeerStats>,
522    /// Persisted peer metadata indexed by stable principal identity (pubkey/npub).
523    persisted_metadata: HashMap<String, PersistedPeerMetadata>,
524    /// Selection strategy
525    strategy: SelectionStrategy,
526    /// Enable fairness constraints (Freenet FOAF mitigation)
527    fairness_enabled: bool,
528    /// Round-robin index for RoundRobin strategy
529    round_robin_idx: usize,
530    /// Blending weight for payment priority. 0.0 keeps pure reputation routing.
531    cashu_payment_weight: f64,
532}
533
534impl PeerSelector {
535    /// Create a new peer selector with default weighted strategy
536    pub fn new() -> Self {
537        Self {
538            stats: HashMap::new(),
539            persisted_metadata: HashMap::new(),
540            strategy: SelectionStrategy::Weighted,
541            fairness_enabled: true,
542            round_robin_idx: 0,
543            cashu_payment_weight: 0.0,
544        }
545    }
546
547    /// Create with specific strategy
548    pub fn with_strategy(strategy: SelectionStrategy) -> Self {
549        Self {
550            stats: HashMap::new(),
551            persisted_metadata: HashMap::new(),
552            strategy,
553            fairness_enabled: true,
554            round_robin_idx: 0,
555            cashu_payment_weight: 0.0,
556        }
557    }
558
    /// Enable/disable fairness constraints.
    ///
    /// When disabled, `should_skip_for_fairness_with_total` never skips a peer.
    pub fn set_fairness(&mut self, enabled: bool) {
        self.fairness_enabled = enabled;
    }
563
564    /// Configure payment-priority influence when ranking peers.
565    /// `0.0` disables payment influence and preserves reputation-only behavior.
566    pub fn set_cashu_payment_weight(&mut self, weight: f64) {
567        self.cashu_payment_weight = weight.clamp(0.0, 1.0);
568    }
569
570    /// Add a peer to track
571    pub fn add_peer(&mut self, peer_id: impl Into<String>) {
572        let peer_id = peer_id.into();
573        if self.stats.contains_key(&peer_id) {
574            return;
575        }
576
577        let mut stats = PeerStats::new(peer_id.clone());
578        if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
579            saved.apply_to_stats(&mut stats);
580        }
581        self.stats.insert(peer_id, stats);
582    }
583
584    /// Remove a peer
585    pub fn remove_peer(&mut self, peer_id: &str) {
586        if let Some(stats) = self.stats.remove(peer_id) {
587            let principal = peer_principal(&stats.peer_id).to_string();
588            self.persisted_metadata.insert(
589                principal.clone(),
590                PersistedPeerMetadata::from_stats(principal, &stats),
591            );
592        }
593    }
594
    /// Get peer stats (immutable).
    ///
    /// Returns `None` if the peer is not tracked.
    pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
        self.stats.get(peer_id)
    }
599
    /// Get peer stats (mutable).
    ///
    /// Returns `None` if the peer is not tracked.
    pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
        self.stats.get_mut(peer_id)
    }
604
    /// Get all peer stats.
    ///
    /// Iterates over the values of the internal hash map, so the order is unspecified.
    pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
        self.stats.values()
    }
609
610    /// Whether this peer is currently backed off due to recent failures/timeouts.
611    pub fn is_peer_backed_off(&self, peer_id: &str) -> bool {
612        self.stats
613            .get(peer_id)
614            .is_some_and(PeerStats::is_backed_off)
615    }
616
617    /// Record a request being sent to a peer
618    pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
619        if let Some(stats) = self.stats.get_mut(peer_id) {
620            stats.record_request(bytes);
621        }
622    }
623
624    /// Record a successful response
625    pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
626        if let Some(stats) = self.stats.get_mut(peer_id) {
627            stats.record_success(rtt_ms, bytes);
628        }
629    }
630
631    /// Record a timeout
632    pub fn record_timeout(&mut self, peer_id: &str) {
633        if let Some(stats) = self.stats.get_mut(peer_id) {
634            stats.record_timeout();
635        }
636    }
637
638    /// Record a failure
639    pub fn record_failure(&mut self, peer_id: &str) {
640        if let Some(stats) = self.stats.get_mut(peer_id) {
641            stats.record_failure();
642        }
643    }
644
645    /// Record payment channel credit for a peer.
646    pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
647        if amount_sat == 0 {
648            return;
649        }
650        let entry = self
651            .stats
652            .entry(peer_id.to_string())
653            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
654        entry.record_cashu_payment(amount_sat);
655    }
656
657    /// Record a settled post-delivery payment received from a peer.
658    pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
659        if amount_sat == 0 {
660            return;
661        }
662        let entry = self
663            .stats
664            .entry(peer_id.to_string())
665            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
666        entry.record_cashu_receipt(amount_sat);
667    }
668
669    /// Record that a peer failed to settle after we delivered successfully.
670    pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
671        let entry = self
672            .stats
673            .entry(peer_id.to_string())
674            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
675        entry.record_cashu_payment_default();
676    }
677
678    pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
679        self.stats
680            .get(peer_id)
681            .map(|stats| stats.exceeds_payment_default_threshold(threshold))
682            .unwrap_or(false)
683    }
684
685    fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
686        let reliable_base = base_score * stats.payment_reliability_multiplier();
687        if self.cashu_payment_weight <= 0.0 {
688            return reliable_base;
689        }
690        let payment_score = stats.cashu_priority_boost();
691        (1.0 - self.cashu_payment_weight) * reliable_base
692            + self.cashu_payment_weight * payment_score
693    }
694
695    /// Get available (non-backed-off) peers
696    fn available_peers(&self) -> Vec<String> {
697        self.stats
698            .iter()
699            .filter(|(_, s)| !s.is_backed_off())
700            .map(|(id, _)| id.clone())
701            .collect()
702    }
703
704    /// Check fairness: should this peer be skipped due to over-selection?
705    #[cfg(test)]
706    fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
707        let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
708        self.should_skip_for_fairness_with_total(peer_id, total_rate)
709    }
710
711    fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
712        if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
713            return false;
714        }
715
716        // Check if this peer is selected too often
717        if let Some(stats) = self.stats.get(peer_id) {
718            let peer_rate = stats.selection_rate();
719            let proportion = peer_rate / total_rate;
720            return proportion > SELECTION_PERCENTAGE_WARNING;
721        }
722
723        false
724    }
725
726    /// Select peers ordered by preference
727    ///
728    /// Returns all available peers sorted by preference (best first).
729    /// Respects backoff states and fairness constraints.
730    pub fn select_peers(&mut self) -> Vec<String> {
731        let available = self.available_peers();
732        if available.is_empty() {
733            // If all peers are backed off, return backed off peers anyway
734            // sorted by when their backoff expires (soonest first)
735            let mut backed_off: Vec<_> = self
736                .stats
737                .iter()
738                .filter(|(_, s)| s.is_backed_off())
739                .map(|(id, s)| (id.clone(), s.backoff_remaining()))
740                .collect();
741            backed_off.sort_by_key(|(_, remaining)| *remaining);
742            return backed_off.into_iter().map(|(id, _)| id).collect();
743        }
744
745        // Apply fairness filter
746        let candidates: Vec<String> =
747            if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
748                let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
749                available
750                    .into_iter()
751                    .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
752                    .collect()
753            } else {
754                available
755            };
756
757        // If all peers were filtered out for fairness, use all available
758        let candidates = if candidates.is_empty() {
759            self.available_peers()
760        } else {
761            candidates
762        };
763
764        // Sort by strategy
765        let mut sorted: Vec<_> = candidates
766            .into_iter()
767            .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
768            .collect();
769
770        match self.strategy {
771            SelectionStrategy::Weighted => {
772                // Sort by score (highest first), then by peer_id for determinism
773                sorted.sort_by(|(id_a, a), (id_b, b)| {
774                    let score_a = self.blend_with_payment_priority(a, a.score());
775                    let score_b = self.blend_with_payment_priority(b, b.score());
776                    let score_cmp = score_b
777                        .partial_cmp(&score_a)
778                        .unwrap_or(std::cmp::Ordering::Equal);
779                    if score_cmp == std::cmp::Ordering::Equal {
780                        id_a.cmp(id_b) // Alphabetical for determinism
781                    } else {
782                        score_cmp
783                    }
784                });
785            }
786            SelectionStrategy::LowestLatency => {
787                // Sort by SRTT (lowest first), use score and peer_id as tiebreakers
788                sorted.sort_by(|(id_a, a), (id_b, b)| {
789                    let rtt_cmp = a
790                        .srtt_ms
791                        .partial_cmp(&b.srtt_ms)
792                        .unwrap_or(std::cmp::Ordering::Equal);
793                    if rtt_cmp == std::cmp::Ordering::Equal {
794                        let score_cmp = b
795                            .score()
796                            .partial_cmp(&a.score())
797                            .unwrap_or(std::cmp::Ordering::Equal);
798                        if score_cmp == std::cmp::Ordering::Equal {
799                            id_a.cmp(id_b)
800                        } else {
801                            score_cmp
802                        }
803                    } else {
804                        rtt_cmp
805                    }
806                });
807            }
808            SelectionStrategy::HighestSuccessRate => {
809                // Sort by success rate (highest first), peer_id as tiebreaker
810                sorted.sort_by(|(id_a, a), (id_b, b)| {
811                    let rate_cmp = b
812                        .success_rate()
813                        .partial_cmp(&a.success_rate())
814                        .unwrap_or(std::cmp::Ordering::Equal);
815                    if rate_cmp == std::cmp::Ordering::Equal {
816                        id_a.cmp(id_b)
817                    } else {
818                        rate_cmp
819                    }
820                });
821            }
822            SelectionStrategy::TitForTat => {
823                let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
824                sorted.sort_by(|(id_a, a), (id_b, b)| {
825                    let score_a =
826                        self.blend_with_payment_priority(a, a.tit_for_tat_score(total_requests));
827                    let score_b =
828                        self.blend_with_payment_priority(b, b.tit_for_tat_score(total_requests));
829                    let score_cmp = score_b
830                        .partial_cmp(&score_a)
831                        .unwrap_or(std::cmp::Ordering::Equal);
832                    if score_cmp == std::cmp::Ordering::Equal {
833                        id_a.cmp(id_b)
834                    } else {
835                        score_cmp
836                    }
837                });
838            }
839            SelectionStrategy::UtilityUcb => {
840                let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
841                sorted.sort_by(|(id_a, a), (id_b, b)| {
842                    let score_a =
843                        self.blend_with_payment_priority(a, a.utility_score(total_requests));
844                    let score_b =
845                        self.blend_with_payment_priority(b, b.utility_score(total_requests));
846                    let score_cmp = score_b
847                        .partial_cmp(&score_a)
848                        .unwrap_or(std::cmp::Ordering::Equal);
849                    if score_cmp == std::cmp::Ordering::Equal {
850                        id_a.cmp(id_b)
851                    } else {
852                        score_cmp
853                    }
854                });
855            }
856            SelectionStrategy::RoundRobin => {
857                // Rotate the list based on round-robin index
858                if !sorted.is_empty() {
859                    let idx = self.round_robin_idx % sorted.len();
860                    sorted.rotate_left(idx);
861                    self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
862                }
863            }
864            SelectionStrategy::Random => {
865                // Shuffle using simple deterministic approach for reproducibility
866                // In production, use proper random shuffle
867            }
868        }
869
870        sorted.into_iter().map(|(id, _)| id).collect()
871    }
872
873    /// Select single best peer
874    pub fn select_best(&mut self) -> Option<String> {
875        self.select_peers().into_iter().next()
876    }
877
878    /// Select top N peers
879    pub fn select_top(&mut self, n: usize) -> Vec<String> {
880        self.select_peers().into_iter().take(n).collect()
881    }
882
883    /// Get summary statistics across all peers
884    pub fn summary(&self) -> SelectorSummary {
885        let count = self.stats.len();
886        if count == 0 {
887            return SelectorSummary::default();
888        }
889
890        let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
891        let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
892        let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
893        let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
894
895        let avg_rtt = {
896            let rtts: Vec<f64> = self
897                .stats
898                .values()
899                .filter(|s| s.srtt_ms > 0.0)
900                .map(|s| s.srtt_ms)
901                .collect();
902            if rtts.is_empty() {
903                0.0
904            } else {
905                rtts.iter().sum::<f64>() / rtts.len() as f64
906            }
907        };
908
909        SelectorSummary {
910            peer_count: count,
911            total_requests,
912            total_successes,
913            total_timeouts,
914            backed_off_count: backed_off,
915            avg_rtt_ms: avg_rtt,
916            overall_success_rate: if total_requests > 0 {
917                total_successes as f64 / total_requests as f64
918            } else {
919                0.0
920            },
921        }
922    }
923
924    /// Export persisted peer metadata keyed by stable principal identity.
925    pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
926        let mut by_principal = self.persisted_metadata.clone();
927        for stats in self.stats.values() {
928            let principal = peer_principal(&stats.peer_id).to_string();
929            by_principal.insert(
930                principal.clone(),
931                PersistedPeerMetadata::from_stats(principal, stats),
932            );
933        }
934
935        let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
936        peers.sort_by(|a, b| a.principal.cmp(&b.principal));
937
938        PeerMetadataSnapshot {
939            version: PEER_METADATA_SNAPSHOT_VERSION,
940            peers,
941        }
942    }
943
944    /// Import persisted metadata and apply it to currently tracked peers.
945    pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
946        if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
947            return;
948        }
949
950        self.persisted_metadata.clear();
951        for peer in &snapshot.peers {
952            self.persisted_metadata
953                .insert(peer.principal.clone(), peer.clone());
954        }
955
956        for stats in self.stats.values_mut() {
957            if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
958                saved.apply_to_stats(stats);
959            }
960        }
961    }
962}
963
/// Aggregate statistics across all peers tracked by the selector.
///
/// The default value has every field set to zero.
#[derive(Clone, Debug, Default)]
pub struct SelectorSummary {
    /// Number of tracked peers.
    pub peer_count: usize,
    /// Sum of requests sent across all peers.
    pub total_requests: u64,
    /// Sum of successful responses across all peers.
    pub total_successes: u64,
    /// Sum of timeouts across all peers.
    pub total_timeouts: u64,
    /// Number of peers that are currently backed off.
    pub backed_off_count: usize,
    /// Mean smoothed RTT in milliseconds over peers with a positive RTT;
    /// `0.0` when no peer has one.
    pub avg_rtt_ms: f64,
    /// `total_successes / total_requests`, or `0.0` when nothing was requested.
    pub overall_success_rate: f64,
}
975
976#[cfg(test)]
977mod tests {
978    use super::*;
979    use std::thread::sleep;
980
981    #[test]
982    fn test_peer_stats_success_rate() {
983        let mut stats = PeerStats::new("peer1");
984        assert_eq!(stats.success_rate(), 0.5); // Neutral for new peer
985
986        stats.record_request(40);
987        stats.record_success(50, 1024);
988        assert_eq!(stats.success_rate(), 1.0);
989
990        stats.record_request(40);
991        stats.record_timeout();
992        assert_eq!(stats.success_rate(), 0.5);
993    }
994
995    #[test]
996    fn test_peer_stats_rtt_calculation() {
997        let mut stats = PeerStats::new("peer1");
998
999        // First RTT measurement
1000        stats.record_request(40);
1001        stats.record_success(100, 1024);
1002        assert_eq!(stats.srtt_ms, 100.0);
1003        assert_eq!(stats.rttvar_ms, 50.0); // RTT/2
1004
1005        // Second measurement
1006        stats.record_request(40);
1007        stats.record_success(80, 1024);
1008        // SRTT = 0.875 * 100 + 0.125 * 80 = 87.5 + 10 = 97.5
1009        assert!((stats.srtt_ms - 97.5).abs() < 0.1);
1010    }
1011
1012    #[test]
1013    fn test_peer_stats_backoff() {
1014        let mut stats = PeerStats::new("peer1");
1015        assert!(!stats.is_backed_off());
1016
1017        stats.record_timeout();
1018        assert!(stats.is_backed_off());
1019        assert!(stats.backoff_remaining() > Duration::ZERO);
1020    }
1021
1022    #[test]
1023    fn test_peer_stats_backoff_clears_on_success() {
1024        let mut stats = PeerStats::new("peer1");
1025        stats.record_timeout();
1026        assert!(stats.is_backed_off());
1027
1028        stats.record_success(50, 1024);
1029        assert!(!stats.is_backed_off());
1030        assert_eq!(stats.backoff_level, 0);
1031    }
1032
1033    #[test]
1034    fn test_peer_stats_backoff_saturates_without_overflow() {
1035        let mut stats = PeerStats::new("peer1");
1036
1037        for _ in 0..128 {
1038            stats.record_failure();
1039        }
1040
1041        assert_eq!(compute_backoff_ms(stats.backoff_level), MAX_BACKOFF_MS);
1042        assert!(stats.is_backed_off());
1043    }
1044
1045    #[test]
1046    fn test_peer_selector_add_remove() {
1047        let mut selector = PeerSelector::new();
1048        selector.add_peer("peer1");
1049        selector.add_peer("peer2");
1050        assert!(selector.get_stats("peer1").is_some());
1051        assert!(selector.get_stats("peer2").is_some());
1052
1053        selector.remove_peer("peer1");
1054        assert!(selector.get_stats("peer1").is_none());
1055        assert!(selector.get_stats("peer2").is_some());
1056    }
1057
1058    #[test]
1059    fn test_peer_selector_weighted_selection() {
1060        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1061        selector.add_peer("peer1");
1062        selector.add_peer("peer2");
1063        selector.add_peer("peer3");
1064
1065        // Peer 1: good (high success, low RTT)
1066        selector.record_request("peer1", 40);
1067        selector.record_success("peer1", 20, 1024);
1068        selector.record_request("peer1", 40);
1069        selector.record_success("peer1", 25, 1024);
1070
1071        // Peer 2: medium
1072        selector.record_request("peer2", 40);
1073        selector.record_success("peer2", 100, 1024);
1074        selector.record_request("peer2", 40);
1075        selector.record_timeout("peer2");
1076
1077        // Peer 3: bad (timeouts)
1078        selector.record_request("peer3", 40);
1079        selector.record_timeout("peer3");
1080        selector.record_request("peer3", 40);
1081        selector.record_timeout("peer3");
1082
1083        // Peer 3 should be backed off
1084        let peers = selector.select_peers();
1085        // Peer 1 should be first (best score)
1086        assert_eq!(peers[0], "peer1");
1087    }
1088
1089    #[test]
1090    fn test_peer_selector_backed_off_peers() {
1091        let mut selector = PeerSelector::new();
1092        selector.add_peer("peer1");
1093        selector.add_peer("peer2");
1094
1095        // Back off peer 1
1096        selector.record_timeout("peer1");
1097        assert!(selector.get_stats("peer1").unwrap().is_backed_off());
1098
1099        // Peer 2 should be available
1100        let peers = selector.select_peers();
1101        assert_eq!(peers.len(), 1);
1102        assert_eq!(peers[0], "peer2");
1103    }
1104
1105    #[test]
1106    fn test_peer_selector_all_backed_off_fallback() {
1107        let mut selector = PeerSelector::new();
1108        selector.add_peer("peer1");
1109        selector.add_peer("peer2");
1110
1111        // Back off both peers
1112        selector.record_timeout("peer1");
1113        selector.record_timeout("peer2");
1114
1115        // Should still return peers (sorted by backoff remaining)
1116        let peers = selector.select_peers();
1117        assert_eq!(peers.len(), 2);
1118    }
1119
1120    #[test]
1121    fn test_peer_selector_fairness() {
1122        let mut selector = PeerSelector::new();
1123        selector.set_fairness(true);
1124
1125        // Add 5+ peers to enable fairness
1126        for i in 1..=6 {
1127            selector.add_peer(format!("peer{}", i));
1128        }
1129
1130        // Simulate peer 1 being selected way too often
1131        sleep(Duration::from_millis(15));
1132
1133        for _ in 0..100 {
1134            selector.record_request("peer1", 40);
1135            selector.record_success("peer1", 10, 100);
1136        }
1137
1138        // Other peers get very few requests
1139        for i in 2..=6 {
1140            selector.record_request(&format!("peer{}", i), 40);
1141            selector.record_success(&format!("peer{}", i), 10, 100);
1142        }
1143
1144        // Peer 1 should be skipped due to fairness (>30% selection rate)
1145        let skipped = selector.should_skip_for_fairness("peer1");
1146        let _ = skipped; // May or may not trigger depending on timing
1147    }
1148
1149    #[test]
1150    fn test_peer_selector_summary() {
1151        let mut selector = PeerSelector::new();
1152        selector.add_peer("peer1");
1153        selector.add_peer("peer2");
1154
1155        selector.record_request("peer1", 40);
1156        selector.record_success("peer1", 50, 1024);
1157        selector.record_request("peer2", 40);
1158        selector.record_timeout("peer2");
1159
1160        let summary = selector.summary();
1161        assert_eq!(summary.peer_count, 2);
1162        assert_eq!(summary.total_requests, 2);
1163        assert_eq!(summary.total_successes, 1);
1164        assert_eq!(summary.total_timeouts, 1);
1165        assert_eq!(summary.backed_off_count, 1);
1166        assert_eq!(summary.overall_success_rate, 0.5);
1167    }
1168
1169    #[test]
1170    fn test_peer_stats_score() {
1171        let mut stats = PeerStats::new("peer1");
1172
1173        // New peer has neutral score
1174        let initial_score = stats.score();
1175        assert!(initial_score > 0.3 && initial_score < 0.7);
1176
1177        // Good peer: high success rate + low RTT
1178        for _ in 0..10 {
1179            stats.record_request(40);
1180            stats.record_success(20, 1024);
1181        }
1182        let good_score = stats.score();
1183        assert!(good_score > 0.8);
1184
1185        // Bad peer: high timeout rate
1186        let mut bad_stats = PeerStats::new("peer2");
1187        for _ in 0..10 {
1188            bad_stats.record_request(40);
1189            bad_stats.record_timeout();
1190        }
1191        let bad_score = bad_stats.score();
1192        assert!(bad_score < 0.3);
1193
1194        assert!(good_score > bad_score);
1195    }
1196
1197    #[test]
1198    fn test_peer_stats_utility_score_prefers_good_over_bad() {
1199        let mut good = PeerStats::new("good");
1200        good.requests_sent = 120;
1201        good.successes = 96;
1202        good.failures = 8;
1203        good.timeouts = 4;
1204        good.srtt_ms = 30.0;
1205        good.bytes_sent = 120 * 40;
1206        good.bytes_received = 96 * 1024;
1207
1208        let mut bad = PeerStats::new("bad");
1209        bad.requests_sent = 120;
1210        bad.successes = 40;
1211        bad.failures = 50;
1212        bad.timeouts = 30;
1213        bad.srtt_ms = 220.0;
1214        bad.bytes_sent = 120 * 40;
1215        bad.bytes_received = 40 * 1024;
1216
1217        let total_requests = good.requests_sent + bad.requests_sent;
1218        assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
1219    }
1220
1221    #[test]
1222    fn test_peer_stats_tit_for_tat_score_prefers_reciprocal_peer() {
1223        let mut reciprocal = PeerStats::new("reciprocal");
1224        reciprocal.requests_sent = 100;
1225        reciprocal.successes = 90;
1226        reciprocal.failures = 5;
1227        reciprocal.timeouts = 5;
1228        reciprocal.srtt_ms = 40.0;
1229        reciprocal.bytes_sent = 100 * 40;
1230        reciprocal.bytes_received = 90 * 1024;
1231
1232        let mut leecher = PeerStats::new("leecher");
1233        leecher.requests_sent = 100;
1234        leecher.successes = 40;
1235        leecher.failures = 30;
1236        leecher.timeouts = 30;
1237        leecher.srtt_ms = 120.0;
1238        leecher.bytes_sent = 100 * 40;
1239        leecher.bytes_received = 10 * 1024;
1240
1241        let total_requests = reciprocal.requests_sent + leecher.requests_sent;
1242        assert!(
1243            reciprocal.tit_for_tat_score(total_requests)
1244                > leecher.tit_for_tat_score(total_requests)
1245        );
1246    }
1247
1248    #[test]
1249    fn test_utility_ucb_strategy_explores_less_sampled_peer() {
1250        let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
1251        selector.add_peer("stable");
1252        selector.add_peer("new");
1253
1254        {
1255            let stable = selector.get_stats_mut("stable").unwrap();
1256            stable.requests_sent = 500;
1257            stable.successes = 450;
1258            stable.failures = 35;
1259            stable.timeouts = 15;
1260            stable.srtt_ms = 35.0;
1261            stable.bytes_sent = 500 * 40;
1262            stable.bytes_received = 450 * 1024;
1263        }
1264        {
1265            let new_peer = selector.get_stats_mut("new").unwrap();
1266            new_peer.requests_sent = 2;
1267            new_peer.successes = 2;
1268            new_peer.failures = 0;
1269            new_peer.timeouts = 0;
1270            new_peer.srtt_ms = 70.0;
1271            new_peer.bytes_sent = 2 * 40;
1272            new_peer.bytes_received = 2 * 1024;
1273        }
1274
1275        let peers = selector.select_peers();
1276        assert_eq!(peers[0], "new");
1277    }
1278
1279    #[test]
1280    fn test_tit_for_tat_strategy_prioritizes_reciprocity() {
1281        let mut selector = PeerSelector::with_strategy(SelectionStrategy::TitForTat);
1282        selector.add_peer("reciprocal");
1283        selector.add_peer("leecher");
1284
1285        {
1286            let reciprocal = selector.get_stats_mut("reciprocal").unwrap();
1287            reciprocal.requests_sent = 120;
1288            reciprocal.successes = 102;
1289            reciprocal.failures = 8;
1290            reciprocal.timeouts = 10;
1291            reciprocal.srtt_ms = 45.0;
1292            reciprocal.bytes_sent = 120 * 40;
1293            reciprocal.bytes_received = 102 * 1024;
1294        }
1295        {
1296            let leecher = selector.get_stats_mut("leecher").unwrap();
1297            leecher.requests_sent = 120;
1298            leecher.successes = 70;
1299            leecher.failures = 20;
1300            leecher.timeouts = 30;
1301            leecher.srtt_ms = 35.0;
1302            leecher.bytes_sent = 120 * 40;
1303            leecher.bytes_received = 8 * 1024;
1304        }
1305
1306        let peers = selector.select_peers();
1307        assert_eq!(peers[0], "reciprocal");
1308    }
1309
1310    #[test]
1311    fn test_lowest_latency_strategy() {
1312        let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
1313        selector.add_peer("peer1");
1314        selector.add_peer("peer2");
1315        selector.add_peer("peer3");
1316
1317        // Peer 1: 100ms RTT
1318        selector.record_request("peer1", 40);
1319        selector.record_success("peer1", 100, 1024);
1320
1321        // Peer 2: 20ms RTT (fastest)
1322        selector.record_request("peer2", 40);
1323        selector.record_success("peer2", 20, 1024);
1324
1325        // Peer 3: 50ms RTT
1326        selector.record_request("peer3", 40);
1327        selector.record_success("peer3", 50, 1024);
1328
1329        let peers = selector.select_peers();
1330        // Peer 2 should be first (lowest RTT)
1331        assert_eq!(peers[0], "peer2");
1332    }
1333
1334    fn build_cashu_priority_fixture() -> PeerSelector {
1335        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1336        selector.add_peer("reliable");
1337        selector.add_peer("paid");
1338
1339        {
1340            let reliable = selector.get_stats_mut("reliable").expect("reliable");
1341            reliable.requests_sent = 80;
1342            reliable.successes = 75;
1343            reliable.failures = 2;
1344            reliable.timeouts = 3;
1345            reliable.srtt_ms = 40.0;
1346            reliable.bytes_sent = 80 * 40;
1347            reliable.bytes_received = 75 * 1024;
1348        }
1349        {
1350            let paid = selector.get_stats_mut("paid").expect("paid");
1351            paid.requests_sent = 80;
1352            paid.successes = 36;
1353            paid.failures = 24;
1354            paid.timeouts = 20;
1355            paid.srtt_ms = 700.0;
1356            paid.bytes_sent = 80 * 40;
1357            paid.bytes_received = 36 * 512;
1358        }
1359
1360        selector
1361    }
1362
1363    #[test]
1364    fn test_cashu_payment_weight_zero_keeps_reputation_order() {
1365        let mut selector = build_cashu_priority_fixture();
1366        selector.set_cashu_payment_weight(0.0);
1367        selector.record_cashu_payment("paid", 5_000);
1368
1369        let peers = selector.select_peers();
1370        assert_eq!(peers[0], "reliable");
1371    }
1372
1373    #[test]
1374    fn test_cashu_payment_weight_prioritizes_paid_peer() {
1375        let mut selector = build_cashu_priority_fixture();
1376        selector.set_cashu_payment_weight(0.8);
1377        selector.record_cashu_payment("paid", 5_000);
1378
1379        let peers = selector.select_peers();
1380        assert_eq!(peers[0], "paid");
1381    }
1382
1383    #[test]
1384    fn test_cashu_payment_default_downranks_peer() {
1385        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1386        selector.add_peer("honest");
1387        selector.add_peer("delinquent");
1388
1389        for peer_id in ["honest", "delinquent"] {
1390            let stats = selector.get_stats_mut(peer_id).expect("stats");
1391            stats.requests_sent = 40;
1392            stats.successes = 34;
1393            stats.failures = 3;
1394            stats.timeouts = 3;
1395            stats.srtt_ms = 60.0;
1396            stats.bytes_sent = 40 * 40;
1397            stats.bytes_received = 34 * 1024;
1398        }
1399
1400        selector.record_cashu_payment_default("delinquent");
1401
1402        let peers = selector.select_peers();
1403        assert_eq!(peers[0], "honest");
1404        assert!(!peers.iter().any(|peer| peer == "delinquent"));
1405    }
1406
1407    #[test]
1408    fn test_payment_default_threshold_blocks_peer() {
1409        let mut selector = PeerSelector::new();
1410        selector.record_cashu_payment_default("peer-a");
1411        assert!(selector.is_peer_blocked_for_payment_defaults("peer-a", 1));
1412        assert!(!selector.is_peer_blocked_for_payment_defaults("peer-a", 2));
1413    }
1414
1415    #[test]
1416    fn test_peer_principal_matches_peer_id() {
1417        assert_eq!(peer_principal("npub1abc"), "npub1abc");
1418        assert_eq!(peer_principal("peer-hex-01"), "peer-hex-01");
1419    }
1420
1421    #[test]
1422    fn test_metadata_snapshot_restores_for_same_peer_id() {
1423        let mut selector = PeerSelector::new();
1424        selector.add_peer("npub1stable");
1425        selector.record_request("npub1stable", 64);
1426        selector.record_success("npub1stable", 32, 1024);
1427        selector.record_cashu_payment("npub1stable", 77);
1428        selector.record_cashu_receipt("npub1stable", 33);
1429        selector.record_cashu_payment_default("npub1stable");
1430
1431        let snapshot = selector.export_peer_metadata_snapshot();
1432        assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
1433        assert_eq!(snapshot.peers.len(), 1);
1434        assert_eq!(snapshot.peers[0].principal, "npub1stable");
1435
1436        let mut restored = PeerSelector::new();
1437        restored.import_peer_metadata_snapshot(&snapshot);
1438        restored.add_peer("npub1stable");
1439        let stats = restored.get_stats("npub1stable").expect("restored stats");
1440        assert_eq!(stats.requests_sent, 1);
1441        assert_eq!(stats.successes, 1);
1442        assert_eq!(stats.cashu_paid_sat, 77);
1443        assert_eq!(stats.cashu_received_sat, 33);
1444        assert_eq!(stats.cashu_payment_receipts, 1);
1445        assert_eq!(stats.cashu_payment_defaults, 1);
1446    }
1447}