Skip to main content

hashtree_network/
peer_selector.rs

1//! Adaptive peer selection based on Freenet patterns
2//!
3//! Implements sophisticated peer selection that favors reliable, fast peers:
4//! - Per-peer performance tracking (RTT, success rate)
5//! - RFC 2988-style smoothed RTT calculation
6//! - Exponential backoff for failing/slow peers
7//! - Fairness constraints to prevent overloading any single peer
8//! - Weighted selection combining multiple signals
9
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::{Duration, Instant};
13
/// Fairness (from Freenet's PeerManager): a peer is skipped when its share of
/// the summed selection rate exceeds this fraction (30%).
const SELECTION_PERCENTAGE_WARNING: f64 = 0.30; // Skip if selected >30% of time
/// Fairness is only applied once at least this many peers are tracked.
const SELECTION_MIN_PEERS: usize = 5; // Enable fairness if >=5 peers

/// Backoff (from Freenet): duration of the first backoff, in milliseconds.
const INITIAL_BACKOFF_MS: u64 = 1000; // 1 second initial backoff
/// Factor applied to the backoff duration for each additional backoff level.
const BACKOFF_MULTIPLIER: u64 = 2; // Exponential backoff
/// Upper bound for a single backoff period, in milliseconds.
const MAX_BACKOFF_MS: u64 = 480_000; // 8 minutes max backoff

/// RTO (RFC 2988): lower clamp for the retransmission timeout, in milliseconds.
const MIN_RTO_MS: u64 = 50; // Minimum retransmission timeout
/// Upper clamp for the retransmission timeout, in milliseconds.
const MAX_RTO_MS: u64 = 60_000; // Maximum RTO (60 seconds)
/// RTO used before any RTT sample exists (and when a stored RTO is zero).
const INITIAL_RTO_MS: u64 = 1000; // Initial RTO before any measurements

/// Current schema version for persisted peer metadata snapshots.
///
/// This is the value `PeerMetadataSnapshot::default()` writes into `version`.
pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
30
/// Persisted metadata for a logical peer principal (pubkey/npub identity).
///
/// This omits process-local runtime fields (`Instant`, active backoff timers) so
/// metadata can survive restarts and session UUID churn.
///
/// Every field except `principal` mirrors the `PeerStats` field of the same name.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct PersistedPeerMetadata {
    /// Stable principal identity (usually pubkey/npub).
    pub principal: String,
    /// Total requests sent to this peer.
    pub requests_sent: u64,
    /// Total successful responses received.
    pub successes: u64,
    /// Total timeouts.
    pub timeouts: u64,
    /// Total failures (bad data, disconnects, etc.).
    pub failures: u64,
    /// Smoothed round-trip time in milliseconds; sanitized on save and restore.
    pub srtt_ms: f64,
    /// RTT variance in milliseconds; sanitized on save and restore.
    pub rttvar_ms: f64,
    /// Retransmission timeout in milliseconds; clamped on save and restore.
    pub rto_ms: u64,
    /// Total bytes received from this peer.
    pub bytes_received: u64,
    /// Total bytes sent to this peer.
    pub bytes_sent: u64,
    /// Total sats paid to this peer.
    pub cashu_paid_sat: u64,
    /// Total sats this peer paid us.
    pub cashu_received_sat: u64,
    /// Number of payments received from this peer.
    pub cashu_payment_receipts: u64,
    /// Number of times this peer failed to pay after delivery.
    pub cashu_payment_defaults: u64,
}
53
54impl PersistedPeerMetadata {
55    fn from_stats(principal: String, stats: &PeerStats) -> Self {
56        Self {
57            principal,
58            requests_sent: stats.requests_sent,
59            successes: stats.successes,
60            timeouts: stats.timeouts,
61            failures: stats.failures,
62            srtt_ms: sanitize_latency(stats.srtt_ms),
63            rttvar_ms: sanitize_latency(stats.rttvar_ms),
64            rto_ms: clamp_rto(stats.rto_ms),
65            bytes_received: stats.bytes_received,
66            bytes_sent: stats.bytes_sent,
67            cashu_paid_sat: stats.cashu_paid_sat,
68            cashu_received_sat: stats.cashu_received_sat,
69            cashu_payment_receipts: stats.cashu_payment_receipts,
70            cashu_payment_defaults: stats.cashu_payment_defaults,
71        }
72    }
73
74    fn apply_to_stats(&self, stats: &mut PeerStats) {
75        stats.requests_sent = self.requests_sent;
76        stats.successes = self.successes;
77        stats.timeouts = self.timeouts;
78        stats.failures = self.failures;
79        stats.srtt_ms = sanitize_latency(self.srtt_ms);
80        stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
81        stats.rto_ms = clamp_rto(self.rto_ms);
82        stats.bytes_received = self.bytes_received;
83        stats.bytes_sent = self.bytes_sent;
84        stats.cashu_paid_sat = self.cashu_paid_sat;
85        stats.cashu_received_sat = self.cashu_received_sat;
86        stats.cashu_payment_receipts = self.cashu_payment_receipts;
87        stats.cashu_payment_defaults = self.cashu_payment_defaults;
88
89        // Runtime-only state is intentionally reset on restore.
90        stats.backoff_level = 0;
91        stats.backed_off_until = None;
92        stats.last_success = None;
93        stats.last_failure = None;
94        stats.consecutive_rto_backoffs = 0;
95    }
96}
97
/// Snapshot of metadata for all known principals.
///
/// Serializable container: a schema `version` plus one
/// `PersistedPeerMetadata` record per principal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PeerMetadataSnapshot {
    /// Schema version of this snapshot; `Default` sets it to
    /// `PEER_METADATA_SNAPSHOT_VERSION`.
    pub version: u32,
    /// Persisted records, one entry per principal.
    pub peers: Vec<PersistedPeerMetadata>,
}
104
105impl Default for PeerMetadataSnapshot {
106    fn default() -> Self {
107        Self {
108            version: PEER_METADATA_SNAPSHOT_VERSION,
109            peers: Vec::new(),
110        }
111    }
112}
113
/// Normalize a latency value before it is stored in or restored from
/// persisted metadata.
///
/// Finite, non-negative values pass through unchanged; NaN, infinities and
/// negative values collapse to `0.0`.
fn sanitize_latency(value: f64) -> f64 {
    let unusable = !value.is_finite() || value < 0.0;
    if unusable {
        return 0.0;
    }
    value
}
121
122fn clamp_rto(rto_ms: u64) -> u64 {
123    if rto_ms == 0 {
124        INITIAL_RTO_MS
125    } else {
126        rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
127    }
128}
129
/// Extract the stable principal identity for a peer.
///
/// Peer IDs are the stable endpoint identifier now, so selector state keys map
/// directly to the peer id instead of stripping any session suffix.
///
/// Currently the identity function: the returned slice is `peer_id` itself.
/// `PeerSelector` uses the result as the key into its persisted metadata map.
pub fn peer_principal(peer_id: &str) -> &str {
    peer_id
}
137
/// Per-peer performance statistics
///
/// Combines durable counters (requests, outcomes, bytes, payments), RTT
/// estimators, and process-local runtime state (`Instant`-based timestamps
/// and the active backoff).
#[derive(Debug, Clone)]
pub struct PeerStats {
    /// Peer identifier
    pub peer_id: String,
    /// When this peer was connected (set to "now" by `PeerStats::new`)
    pub connected_at: Instant,
    /// Total requests sent to this peer
    pub requests_sent: u64,
    /// Total successful responses received
    pub successes: u64,
    /// Total timeouts
    pub timeouts: u64,
    /// Total failures (bad data, disconnects, etc.)
    pub failures: u64,
    /// Smoothed round-trip time (RFC 2988 SRTT), in milliseconds.
    /// `0.0` means no RTT sample has been recorded yet.
    pub srtt_ms: f64,
    /// RTT variance (RFC 2988 RTTVAR), in milliseconds
    pub rttvar_ms: f64,
    /// Retransmission timeout (computed from SRTT and RTTVAR), in milliseconds
    pub rto_ms: u64,
    /// Consecutive RTO backoffs (for capping); reset on success
    pub consecutive_rto_backoffs: u32,
    /// Current backoff level (how many times we've backed off); reset on success
    pub backoff_level: u32,
    /// When backoff expires (None if not backed off)
    pub backed_off_until: Option<Instant>,
    /// Last successful response timestamp
    pub last_success: Option<Instant>,
    /// Last failure timestamp (timeouts, failures and payment defaults)
    pub last_failure: Option<Instant>,
    /// Total bytes received from this peer
    pub bytes_received: u64,
    /// Total bytes sent to this peer
    pub bytes_sent: u64,
    /// Total sats paid to this peer through an external payment channel.
    pub cashu_paid_sat: u64,
    /// Total sats this peer paid us after successful delivery.
    pub cashu_received_sat: u64,
    /// Number of successful post-delivery payments received from this peer.
    pub cashu_payment_receipts: u64,
    /// Number of times this peer failed to pay after successful delivery.
    pub cashu_payment_defaults: u64,
}
182
183impl PeerStats {
184    /// Create new peer stats
185    pub fn new(peer_id: impl Into<String>) -> Self {
186        Self {
187            peer_id: peer_id.into(),
188            connected_at: Instant::now(),
189            requests_sent: 0,
190            successes: 0,
191            timeouts: 0,
192            failures: 0,
193            srtt_ms: 0.0,
194            rttvar_ms: 0.0,
195            rto_ms: INITIAL_RTO_MS,
196            consecutive_rto_backoffs: 0,
197            backoff_level: 0,
198            backed_off_until: None,
199            last_success: None,
200            last_failure: None,
201            bytes_received: 0,
202            bytes_sent: 0,
203            cashu_paid_sat: 0,
204            cashu_received_sat: 0,
205            cashu_payment_receipts: 0,
206            cashu_payment_defaults: 0,
207        }
208    }
209
210    /// Get success rate (0.0 to 1.0)
211    pub fn success_rate(&self) -> f64 {
212        if self.requests_sent == 0 {
213            return 0.5; // Neutral for new peers
214        }
215        self.successes as f64 / self.requests_sent as f64
216    }
217
218    /// Get selection rate (selections per second since connected)
219    pub fn selection_rate(&self) -> f64 {
220        let elapsed = self.connected_at.elapsed();
221        if elapsed.as_secs() < 10 {
222            return 0.0; // Avoid bias from short uptime (Freenet pattern)
223        }
224        self.requests_sent as f64 / elapsed.as_secs_f64()
225    }
226
227    /// Check if peer is currently backed off
228    pub fn is_backed_off(&self) -> bool {
229        if let Some(until) = self.backed_off_until {
230            Instant::now() < until
231        } else {
232            false
233        }
234    }
235
236    /// Get remaining backoff time
237    pub fn backoff_remaining(&self) -> Duration {
238        if let Some(until) = self.backed_off_until {
239            let now = Instant::now();
240            if now < until {
241                return until - now;
242            }
243        }
244        Duration::ZERO
245    }
246
247    /// Record a request being sent
248    pub fn record_request(&mut self, bytes: u64) {
249        self.requests_sent += 1;
250        self.bytes_sent += bytes;
251    }
252
253    /// Record a successful response with RTT
254    /// Uses RFC 2988 algorithm for smoothed RTT calculation
255    pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
256        self.successes += 1;
257        self.bytes_received += bytes;
258        self.last_success = Some(Instant::now());
259        self.consecutive_rto_backoffs = 0;
260
261        // Clear backoff on success
262        self.backed_off_until = None;
263        self.backoff_level = 0;
264
265        // RFC 2988 RTT update
266        let rtt = rtt_ms as f64;
267        if self.srtt_ms == 0.0 {
268            // First measurement
269            self.srtt_ms = rtt;
270            self.rttvar_ms = rtt / 2.0;
271        } else {
272            // Subsequent measurements
273            // RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R'|
274            // SRTT = (1 - alpha) * SRTT + alpha * R'
275            // where alpha = 1/8 = 0.125 and beta = 1/4 = 0.25
276            self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
277            self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
278        }
279
280        // RTO = SRTT + max(G, K*RTTVAR) where G=20ms granularity, K=4
281        let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
282        self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
283    }
284
285    /// Record a timeout
286    pub fn record_timeout(&mut self) {
287        self.timeouts += 1;
288        self.last_failure = Some(Instant::now());
289
290        // Apply backoff
291        self.apply_backoff();
292
293        // RFC 2988: Double RTO on timeout (up to max)
294        if self.consecutive_rto_backoffs < 5 {
295            self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
296            self.consecutive_rto_backoffs += 1;
297        }
298    }
299
300    /// Record a failure (bad data, disconnect, etc.)
301    pub fn record_failure(&mut self) {
302        self.failures += 1;
303        self.last_failure = Some(Instant::now());
304        self.apply_backoff();
305    }
306
307    /// Record an out-of-band payment to this peer (e.g. Cashu channel transfer).
308    pub fn record_cashu_payment(&mut self, amount_sat: u64) {
309        if amount_sat == 0 {
310            return;
311        }
312        self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
313    }
314
315    /// Record a settled payment received from this peer after we served data.
316    pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
317        if amount_sat == 0 {
318            return;
319        }
320        self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
321        self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
322    }
323
324    /// Record that this peer failed to pay after successful delivery.
325    pub fn record_cashu_payment_default(&mut self) {
326        self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
327        self.last_failure = Some(Instant::now());
328        self.apply_backoff();
329    }
330
331    /// Apply exponential backoff
332    fn apply_backoff(&mut self) {
333        self.backoff_level += 1;
334        let backoff_ms = (INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER.pow(self.backoff_level - 1))
335            .min(MAX_BACKOFF_MS);
336        self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
337    }
338
339    /// Calculate peer score for selection (higher is better)
340    /// Combines success rate, RTT, and recent performance
341    pub fn score(&self) -> f64 {
342        // Base score from success rate (0-1)
343        let success_score = self.success_rate();
344
345        // RTT score: prefer faster peers (inverse of normalized RTT)
346        // Scale: 0-50ms = 1.0, 500ms+ = 0.1
347        let rtt_score = if self.srtt_ms <= 0.0 {
348            0.5 // Unknown RTT, neutral
349        } else {
350            (500.0 / (self.srtt_ms + 50.0)).min(1.0)
351        };
352
353        // Recency bonus: slight boost for recently successful peers
354        let recency_bonus = if let Some(last) = self.last_success {
355            let secs_ago = last.elapsed().as_secs_f64();
356            if secs_ago < 60.0 {
357                0.1 // Recent success
358            } else {
359                0.0
360            }
361        } else {
362            0.0
363        };
364
365        // Combine scores (weighted)
366        // Success rate is most important (60%), RTT next (30%), recency last (10%)
367        0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
368    }
369
370    /// Utility-centric score with exploration bonus (UCB-style).
371    ///
372    /// Balances:
373    /// - good/bad outcome ratio (successes vs failures+timeouts),
374    /// - latency efficiency,
375    /// - bytes efficiency (received vs sent),
376    /// - uncertainty bonus for less-tested peers.
377    pub fn utility_score(&self, total_requests: u64) -> f64 {
378        let good = self.successes as f64 + 1.0;
379        let bad = (self.failures + self.timeouts) as f64 + 1.0;
380        let ratio = good / bad;
381        let ratio_score = ratio / (1.0 + ratio);
382
383        let latency_score = if self.srtt_ms <= 0.0 {
384            0.5
385        } else {
386            (300.0 / (self.srtt_ms + 50.0)).min(1.0)
387        };
388
389        let efficiency_score = if self.bytes_sent == 0 {
390            0.5
391        } else {
392            (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
393        };
394
395        let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
396
397        let uncertainty =
398            (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
399        let exploration_bonus = 0.20 * uncertainty;
400
401        exploitation + exploration_bonus
402    }
403
404    /// Tit-for-tat style score:
405    /// - reward peers that answer our requests with useful bytes (reciprocity)
406    /// - reward reliable responders
407    /// - penalize timeout/failure-heavy peers (retaliation)
408    /// - keep a small exploration term for under-sampled peers
409    pub fn tit_for_tat_score(&self, total_requests: u64) -> f64 {
410        // Beta prior keeps cold-start behavior neutral while converging quickly.
411        let reliability = (self.successes as f64 + 1.0) / (self.requests_sent as f64 + 2.0);
412
413        // Reciprocity should not be dominated by one lucky large payload. We
414        // gate byte-ratio impact with success confidence.
415        let reciprocity_raw = if self.bytes_sent == 0 {
416            1.0
417        } else {
418            self.bytes_received as f64 / self.bytes_sent as f64
419        };
420        let reciprocity_ratio = reciprocity_raw / (1.0 + reciprocity_raw);
421        let reciprocity_confidence = self.successes as f64 / (self.successes as f64 + 4.0);
422        let reciprocity =
423            (1.0 - reciprocity_confidence) * 0.5 + reciprocity_confidence * reciprocity_ratio;
424
425        let rtt_score = if self.srtt_ms <= 0.0 {
426            0.5
427        } else {
428            (400.0 / (self.srtt_ms + 50.0)).min(1.0)
429        };
430
431        let timeout_rate = if self.requests_sent == 0 {
432            0.0
433        } else {
434            self.timeouts as f64 / self.requests_sent as f64
435        };
436        let failure_rate = if self.requests_sent == 0 {
437            0.0
438        } else {
439            self.failures as f64 / self.requests_sent as f64
440        };
441        let retaliation_penalty =
442            (0.60 * timeout_rate + 0.45 * failure_rate + 0.10 * self.backoff_level as f64)
443                .min(0.95);
444
445        let cooperative = 0.65 * reliability + 0.25 * reciprocity + 0.10 * rtt_score;
446        let exploration = 0.03
447            * (((total_requests as f64) + 2.0).ln() / ((self.requests_sent as f64) + 2.0)).sqrt();
448
449        (cooperative + exploration - retaliation_penalty).max(0.0)
450    }
451
452    /// Normalize paid amount to a bounded priority score in [0, 1).
453    pub fn cashu_priority_boost(&self) -> f64 {
454        if self.cashu_paid_sat == 0 {
455            return 0.0;
456        }
457        let paid = self.cashu_paid_sat as f64;
458        paid / (paid + 32.0)
459    }
460
461    /// Cooperative peers that actually pay us should not be penalized; repeated
462    /// defaults quickly reduce their desirability.
463    pub fn payment_reliability_multiplier(&self) -> f64 {
464        if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
465            return 1.0;
466        }
467        (self.cashu_payment_receipts as f64 + 1.0)
468            / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
469    }
470
471    pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
472        threshold > 0 && self.cashu_payment_defaults >= threshold
473    }
474}
475
/// Peer selection strategy
///
/// Determines how `PeerSelector::select_peers` orders its candidates.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub enum SelectionStrategy {
    /// Select by score (success rate + RTT) - recommended
    #[default]
    Weighted,
    /// Round-robin (ignores performance)
    RoundRobin,
    /// Random selection.
    /// NOTE: `select_peers` currently applies no shuffle for this variant; the
    /// candidates are returned in the order they were collected.
    Random,
    /// Lowest RTT first (score, then peer id, break ties)
    LowestLatency,
    /// Highest success rate first (peer id breaks ties)
    HighestSuccessRate,
    /// Tit-for-tat style utility (reciprocity + reliability + retaliation + exploration)
    TitForTat,
    /// Utility + exploration (good/bad ratio + RTT/efficiency + UCB bonus)
    UtilityUcb,
}
495
496/// Adaptive peer selector
497///
498/// Tracks peer performance and selects peers intelligently:
499/// - Prefers high success rate peers
500/// - Prefers low latency peers
501/// - Backs off failing peers exponentially
502/// - Ensures fairness (no peer gets >30% of traffic with 5+ peers)
503#[derive(Debug, Default)]
504pub struct PeerSelector {
505    /// Per-peer statistics
506    stats: HashMap<String, PeerStats>,
507    /// Persisted peer metadata indexed by stable principal identity (pubkey/npub).
508    persisted_metadata: HashMap<String, PersistedPeerMetadata>,
509    /// Selection strategy
510    strategy: SelectionStrategy,
511    /// Enable fairness constraints (Freenet FOAF mitigation)
512    fairness_enabled: bool,
513    /// Round-robin index for RoundRobin strategy
514    round_robin_idx: usize,
515    /// Blending weight for payment priority. 0.0 keeps pure reputation routing.
516    cashu_payment_weight: f64,
517}
518
519impl PeerSelector {
520    /// Create a new peer selector with default weighted strategy
521    pub fn new() -> Self {
522        Self {
523            stats: HashMap::new(),
524            persisted_metadata: HashMap::new(),
525            strategy: SelectionStrategy::Weighted,
526            fairness_enabled: true,
527            round_robin_idx: 0,
528            cashu_payment_weight: 0.0,
529        }
530    }
531
532    /// Create with specific strategy
533    pub fn with_strategy(strategy: SelectionStrategy) -> Self {
534        Self {
535            stats: HashMap::new(),
536            persisted_metadata: HashMap::new(),
537            strategy,
538            fairness_enabled: true,
539            round_robin_idx: 0,
540            cashu_payment_weight: 0.0,
541        }
542    }
543
    /// Enable/disable fairness constraints
    ///
    /// When disabled, `select_peers` never drops a peer for being selected
    /// too often.
    pub fn set_fairness(&mut self, enabled: bool) {
        self.fairness_enabled = enabled;
    }
548
549    /// Configure payment-priority influence when ranking peers.
550    /// `0.0` disables payment influence and preserves reputation-only behavior.
551    pub fn set_cashu_payment_weight(&mut self, weight: f64) {
552        self.cashu_payment_weight = weight.clamp(0.0, 1.0);
553    }
554
555    /// Add a peer to track
556    pub fn add_peer(&mut self, peer_id: impl Into<String>) {
557        let peer_id = peer_id.into();
558        if self.stats.contains_key(&peer_id) {
559            return;
560        }
561
562        let mut stats = PeerStats::new(peer_id.clone());
563        if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
564            saved.apply_to_stats(&mut stats);
565        }
566        self.stats.insert(peer_id, stats);
567    }
568
569    /// Remove a peer
570    pub fn remove_peer(&mut self, peer_id: &str) {
571        if let Some(stats) = self.stats.remove(peer_id) {
572            let principal = peer_principal(&stats.peer_id).to_string();
573            self.persisted_metadata.insert(
574                principal.clone(),
575                PersistedPeerMetadata::from_stats(principal, &stats),
576            );
577        }
578    }
579
    /// Get peer stats (immutable)
    ///
    /// Returns `None` when `peer_id` is not currently tracked.
    pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
        self.stats.get(peer_id)
    }
584
    /// Get peer stats (mutable)
    ///
    /// Returns `None` when `peer_id` is not currently tracked.
    pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
        self.stats.get_mut(peer_id)
    }
589
    /// Get all peer stats
    ///
    /// Iterates the tracked peers in the map's (unspecified) iteration order.
    pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
        self.stats.values()
    }
594
595    /// Record a request being sent to a peer
596    pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
597        if let Some(stats) = self.stats.get_mut(peer_id) {
598            stats.record_request(bytes);
599        }
600    }
601
602    /// Record a successful response
603    pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
604        if let Some(stats) = self.stats.get_mut(peer_id) {
605            stats.record_success(rtt_ms, bytes);
606        }
607    }
608
609    /// Record a timeout
610    pub fn record_timeout(&mut self, peer_id: &str) {
611        if let Some(stats) = self.stats.get_mut(peer_id) {
612            stats.record_timeout();
613        }
614    }
615
616    /// Record a failure
617    pub fn record_failure(&mut self, peer_id: &str) {
618        if let Some(stats) = self.stats.get_mut(peer_id) {
619            stats.record_failure();
620        }
621    }
622
623    /// Record payment channel credit for a peer.
624    pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
625        if amount_sat == 0 {
626            return;
627        }
628        let entry = self
629            .stats
630            .entry(peer_id.to_string())
631            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
632        entry.record_cashu_payment(amount_sat);
633    }
634
635    /// Record a settled post-delivery payment received from a peer.
636    pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
637        if amount_sat == 0 {
638            return;
639        }
640        let entry = self
641            .stats
642            .entry(peer_id.to_string())
643            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
644        entry.record_cashu_receipt(amount_sat);
645    }
646
647    /// Record that a peer failed to settle after we delivered successfully.
648    pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
649        let entry = self
650            .stats
651            .entry(peer_id.to_string())
652            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
653        entry.record_cashu_payment_default();
654    }
655
656    pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
657        self.stats
658            .get(peer_id)
659            .map(|stats| stats.exceeds_payment_default_threshold(threshold))
660            .unwrap_or(false)
661    }
662
663    fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
664        let reliable_base = base_score * stats.payment_reliability_multiplier();
665        if self.cashu_payment_weight <= 0.0 {
666            return reliable_base;
667        }
668        let payment_score = stats.cashu_priority_boost();
669        (1.0 - self.cashu_payment_weight) * reliable_base
670            + self.cashu_payment_weight * payment_score
671    }
672
673    /// Get available (non-backed-off) peers
674    fn available_peers(&self) -> Vec<String> {
675        self.stats
676            .iter()
677            .filter(|(_, s)| !s.is_backed_off())
678            .map(|(id, _)| id.clone())
679            .collect()
680    }
681
682    /// Check fairness: should this peer be skipped due to over-selection?
683    #[cfg(test)]
684    fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
685        let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
686        self.should_skip_for_fairness_with_total(peer_id, total_rate)
687    }
688
689    fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
690        if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
691            return false;
692        }
693
694        // Check if this peer is selected too often
695        if let Some(stats) = self.stats.get(peer_id) {
696            let peer_rate = stats.selection_rate();
697            let proportion = peer_rate / total_rate;
698            return proportion > SELECTION_PERCENTAGE_WARNING;
699        }
700
701        false
702    }
703
704    /// Select peers ordered by preference
705    ///
706    /// Returns all available peers sorted by preference (best first).
707    /// Respects backoff states and fairness constraints.
708    pub fn select_peers(&mut self) -> Vec<String> {
709        let available = self.available_peers();
710        if available.is_empty() {
711            // If all peers are backed off, return backed off peers anyway
712            // sorted by when their backoff expires (soonest first)
713            let mut backed_off: Vec<_> = self
714                .stats
715                .iter()
716                .filter(|(_, s)| s.is_backed_off())
717                .map(|(id, s)| (id.clone(), s.backoff_remaining()))
718                .collect();
719            backed_off.sort_by_key(|(_, remaining)| *remaining);
720            return backed_off.into_iter().map(|(id, _)| id).collect();
721        }
722
723        // Apply fairness filter
724        let candidates: Vec<String> =
725            if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
726                let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
727                available
728                    .into_iter()
729                    .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
730                    .collect()
731            } else {
732                available
733            };
734
735        // If all peers were filtered out for fairness, use all available
736        let candidates = if candidates.is_empty() {
737            self.available_peers()
738        } else {
739            candidates
740        };
741
742        // Sort by strategy
743        let mut sorted: Vec<_> = candidates
744            .into_iter()
745            .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
746            .collect();
747
748        match self.strategy {
749            SelectionStrategy::Weighted => {
750                // Sort by score (highest first), then by peer_id for determinism
751                sorted.sort_by(|(id_a, a), (id_b, b)| {
752                    let score_a = self.blend_with_payment_priority(a, a.score());
753                    let score_b = self.blend_with_payment_priority(b, b.score());
754                    let score_cmp = score_b
755                        .partial_cmp(&score_a)
756                        .unwrap_or(std::cmp::Ordering::Equal);
757                    if score_cmp == std::cmp::Ordering::Equal {
758                        id_a.cmp(id_b) // Alphabetical for determinism
759                    } else {
760                        score_cmp
761                    }
762                });
763            }
764            SelectionStrategy::LowestLatency => {
765                // Sort by SRTT (lowest first), use score and peer_id as tiebreakers
766                sorted.sort_by(|(id_a, a), (id_b, b)| {
767                    let rtt_cmp = a
768                        .srtt_ms
769                        .partial_cmp(&b.srtt_ms)
770                        .unwrap_or(std::cmp::Ordering::Equal);
771                    if rtt_cmp == std::cmp::Ordering::Equal {
772                        let score_cmp = b
773                            .score()
774                            .partial_cmp(&a.score())
775                            .unwrap_or(std::cmp::Ordering::Equal);
776                        if score_cmp == std::cmp::Ordering::Equal {
777                            id_a.cmp(id_b)
778                        } else {
779                            score_cmp
780                        }
781                    } else {
782                        rtt_cmp
783                    }
784                });
785            }
786            SelectionStrategy::HighestSuccessRate => {
787                // Sort by success rate (highest first), peer_id as tiebreaker
788                sorted.sort_by(|(id_a, a), (id_b, b)| {
789                    let rate_cmp = b
790                        .success_rate()
791                        .partial_cmp(&a.success_rate())
792                        .unwrap_or(std::cmp::Ordering::Equal);
793                    if rate_cmp == std::cmp::Ordering::Equal {
794                        id_a.cmp(id_b)
795                    } else {
796                        rate_cmp
797                    }
798                });
799            }
800            SelectionStrategy::TitForTat => {
801                let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
802                sorted.sort_by(|(id_a, a), (id_b, b)| {
803                    let score_a =
804                        self.blend_with_payment_priority(a, a.tit_for_tat_score(total_requests));
805                    let score_b =
806                        self.blend_with_payment_priority(b, b.tit_for_tat_score(total_requests));
807                    let score_cmp = score_b
808                        .partial_cmp(&score_a)
809                        .unwrap_or(std::cmp::Ordering::Equal);
810                    if score_cmp == std::cmp::Ordering::Equal {
811                        id_a.cmp(id_b)
812                    } else {
813                        score_cmp
814                    }
815                });
816            }
817            SelectionStrategy::UtilityUcb => {
818                let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
819                sorted.sort_by(|(id_a, a), (id_b, b)| {
820                    let score_a =
821                        self.blend_with_payment_priority(a, a.utility_score(total_requests));
822                    let score_b =
823                        self.blend_with_payment_priority(b, b.utility_score(total_requests));
824                    let score_cmp = score_b
825                        .partial_cmp(&score_a)
826                        .unwrap_or(std::cmp::Ordering::Equal);
827                    if score_cmp == std::cmp::Ordering::Equal {
828                        id_a.cmp(id_b)
829                    } else {
830                        score_cmp
831                    }
832                });
833            }
834            SelectionStrategy::RoundRobin => {
835                // Rotate the list based on round-robin index
836                if !sorted.is_empty() {
837                    let idx = self.round_robin_idx % sorted.len();
838                    sorted.rotate_left(idx);
839                    self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
840                }
841            }
842            SelectionStrategy::Random => {
843                // Shuffle using simple deterministic approach for reproducibility
844                // In production, use proper random shuffle
845            }
846        }
847
848        sorted.into_iter().map(|(id, _)| id).collect()
849    }
850
851    /// Select single best peer
852    pub fn select_best(&mut self) -> Option<String> {
853        self.select_peers().into_iter().next()
854    }
855
856    /// Select top N peers
857    pub fn select_top(&mut self, n: usize) -> Vec<String> {
858        self.select_peers().into_iter().take(n).collect()
859    }
860
861    /// Get summary statistics across all peers
862    pub fn summary(&self) -> SelectorSummary {
863        let count = self.stats.len();
864        if count == 0 {
865            return SelectorSummary::default();
866        }
867
868        let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
869        let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
870        let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
871        let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
872
873        let avg_rtt = {
874            let rtts: Vec<f64> = self
875                .stats
876                .values()
877                .filter(|s| s.srtt_ms > 0.0)
878                .map(|s| s.srtt_ms)
879                .collect();
880            if rtts.is_empty() {
881                0.0
882            } else {
883                rtts.iter().sum::<f64>() / rtts.len() as f64
884            }
885        };
886
887        SelectorSummary {
888            peer_count: count,
889            total_requests,
890            total_successes,
891            total_timeouts,
892            backed_off_count: backed_off,
893            avg_rtt_ms: avg_rtt,
894            overall_success_rate: if total_requests > 0 {
895                total_successes as f64 / total_requests as f64
896            } else {
897                0.0
898            },
899        }
900    }
901
902    /// Export persisted peer metadata keyed by stable principal identity.
903    pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
904        let mut by_principal = self.persisted_metadata.clone();
905        for stats in self.stats.values() {
906            let principal = peer_principal(&stats.peer_id).to_string();
907            by_principal.insert(
908                principal.clone(),
909                PersistedPeerMetadata::from_stats(principal, stats),
910            );
911        }
912
913        let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
914        peers.sort_by(|a, b| a.principal.cmp(&b.principal));
915
916        PeerMetadataSnapshot {
917            version: PEER_METADATA_SNAPSHOT_VERSION,
918            peers,
919        }
920    }
921
922    /// Import persisted metadata and apply it to currently tracked peers.
923    pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
924        if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
925            return;
926        }
927
928        self.persisted_metadata.clear();
929        for peer in &snapshot.peers {
930            self.persisted_metadata
931                .insert(peer.principal.clone(), peer.clone());
932        }
933
934        for stats in self.stats.values_mut() {
935            if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
936                saved.apply_to_stats(stats);
937            }
938        }
939    }
940}
941
/// Aggregate statistics reported by the selector's `summary` method.
///
/// The `Default` value has every counter at zero and both floating-point
/// fields at `0.0`.
#[derive(Debug, Clone, Default)]
pub struct SelectorSummary {
    /// Number of peers covered by the summary.
    pub peer_count: usize,
    /// Sum of the per-peer request counters.
    pub total_requests: u64,
    /// Sum of the per-peer success counters.
    pub total_successes: u64,
    /// Sum of the per-peer timeout counters.
    pub total_timeouts: u64,
    /// Number of peers that were backed off when the summary was taken.
    pub backed_off_count: usize,
    /// Mean smoothed RTT in milliseconds (`0.0` when nothing was measured).
    pub avg_rtt_ms: f64,
    /// `total_successes / total_requests`, or `0.0` without any requests.
    pub overall_success_rate: f64,
}
953
954#[cfg(test)]
955mod tests {
956    use super::*;
957    use std::thread::sleep;
958
959    #[test]
960    fn test_peer_stats_success_rate() {
961        let mut stats = PeerStats::new("peer1");
962        assert_eq!(stats.success_rate(), 0.5); // Neutral for new peer
963
964        stats.record_request(40);
965        stats.record_success(50, 1024);
966        assert_eq!(stats.success_rate(), 1.0);
967
968        stats.record_request(40);
969        stats.record_timeout();
970        assert_eq!(stats.success_rate(), 0.5);
971    }
972
973    #[test]
974    fn test_peer_stats_rtt_calculation() {
975        let mut stats = PeerStats::new("peer1");
976
977        // First RTT measurement
978        stats.record_request(40);
979        stats.record_success(100, 1024);
980        assert_eq!(stats.srtt_ms, 100.0);
981        assert_eq!(stats.rttvar_ms, 50.0); // RTT/2
982
983        // Second measurement
984        stats.record_request(40);
985        stats.record_success(80, 1024);
986        // SRTT = 0.875 * 100 + 0.125 * 80 = 87.5 + 10 = 97.5
987        assert!((stats.srtt_ms - 97.5).abs() < 0.1);
988    }
989
990    #[test]
991    fn test_peer_stats_backoff() {
992        let mut stats = PeerStats::new("peer1");
993        assert!(!stats.is_backed_off());
994
995        stats.record_timeout();
996        assert!(stats.is_backed_off());
997        assert!(stats.backoff_remaining() > Duration::ZERO);
998    }
999
1000    #[test]
1001    fn test_peer_stats_backoff_clears_on_success() {
1002        let mut stats = PeerStats::new("peer1");
1003        stats.record_timeout();
1004        assert!(stats.is_backed_off());
1005
1006        stats.record_success(50, 1024);
1007        assert!(!stats.is_backed_off());
1008        assert_eq!(stats.backoff_level, 0);
1009    }
1010
1011    #[test]
1012    fn test_peer_selector_add_remove() {
1013        let mut selector = PeerSelector::new();
1014        selector.add_peer("peer1");
1015        selector.add_peer("peer2");
1016        assert!(selector.get_stats("peer1").is_some());
1017        assert!(selector.get_stats("peer2").is_some());
1018
1019        selector.remove_peer("peer1");
1020        assert!(selector.get_stats("peer1").is_none());
1021        assert!(selector.get_stats("peer2").is_some());
1022    }
1023
1024    #[test]
1025    fn test_peer_selector_weighted_selection() {
1026        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1027        selector.add_peer("peer1");
1028        selector.add_peer("peer2");
1029        selector.add_peer("peer3");
1030
1031        // Peer 1: good (high success, low RTT)
1032        selector.record_request("peer1", 40);
1033        selector.record_success("peer1", 20, 1024);
1034        selector.record_request("peer1", 40);
1035        selector.record_success("peer1", 25, 1024);
1036
1037        // Peer 2: medium
1038        selector.record_request("peer2", 40);
1039        selector.record_success("peer2", 100, 1024);
1040        selector.record_request("peer2", 40);
1041        selector.record_timeout("peer2");
1042
1043        // Peer 3: bad (timeouts)
1044        selector.record_request("peer3", 40);
1045        selector.record_timeout("peer3");
1046        selector.record_request("peer3", 40);
1047        selector.record_timeout("peer3");
1048
1049        // Peer 3 should be backed off
1050        let peers = selector.select_peers();
1051        // Peer 1 should be first (best score)
1052        assert_eq!(peers[0], "peer1");
1053    }
1054
1055    #[test]
1056    fn test_peer_selector_backed_off_peers() {
1057        let mut selector = PeerSelector::new();
1058        selector.add_peer("peer1");
1059        selector.add_peer("peer2");
1060
1061        // Back off peer 1
1062        selector.record_timeout("peer1");
1063        assert!(selector.get_stats("peer1").unwrap().is_backed_off());
1064
1065        // Peer 2 should be available
1066        let peers = selector.select_peers();
1067        assert_eq!(peers.len(), 1);
1068        assert_eq!(peers[0], "peer2");
1069    }
1070
1071    #[test]
1072    fn test_peer_selector_all_backed_off_fallback() {
1073        let mut selector = PeerSelector::new();
1074        selector.add_peer("peer1");
1075        selector.add_peer("peer2");
1076
1077        // Back off both peers
1078        selector.record_timeout("peer1");
1079        selector.record_timeout("peer2");
1080
1081        // Should still return peers (sorted by backoff remaining)
1082        let peers = selector.select_peers();
1083        assert_eq!(peers.len(), 2);
1084    }
1085
1086    #[test]
1087    fn test_peer_selector_fairness() {
1088        let mut selector = PeerSelector::new();
1089        selector.set_fairness(true);
1090
1091        // Add 5+ peers to enable fairness
1092        for i in 1..=6 {
1093            selector.add_peer(format!("peer{}", i));
1094        }
1095
1096        // Simulate peer 1 being selected way too often
1097        sleep(Duration::from_millis(15));
1098
1099        for _ in 0..100 {
1100            selector.record_request("peer1", 40);
1101            selector.record_success("peer1", 10, 100);
1102        }
1103
1104        // Other peers get very few requests
1105        for i in 2..=6 {
1106            selector.record_request(&format!("peer{}", i), 40);
1107            selector.record_success(&format!("peer{}", i), 10, 100);
1108        }
1109
1110        // Peer 1 should be skipped due to fairness (>30% selection rate)
1111        let skipped = selector.should_skip_for_fairness("peer1");
1112        let _ = skipped; // May or may not trigger depending on timing
1113    }
1114
1115    #[test]
1116    fn test_peer_selector_summary() {
1117        let mut selector = PeerSelector::new();
1118        selector.add_peer("peer1");
1119        selector.add_peer("peer2");
1120
1121        selector.record_request("peer1", 40);
1122        selector.record_success("peer1", 50, 1024);
1123        selector.record_request("peer2", 40);
1124        selector.record_timeout("peer2");
1125
1126        let summary = selector.summary();
1127        assert_eq!(summary.peer_count, 2);
1128        assert_eq!(summary.total_requests, 2);
1129        assert_eq!(summary.total_successes, 1);
1130        assert_eq!(summary.total_timeouts, 1);
1131        assert_eq!(summary.backed_off_count, 1);
1132        assert_eq!(summary.overall_success_rate, 0.5);
1133    }
1134
1135    #[test]
1136    fn test_peer_stats_score() {
1137        let mut stats = PeerStats::new("peer1");
1138
1139        // New peer has neutral score
1140        let initial_score = stats.score();
1141        assert!(initial_score > 0.3 && initial_score < 0.7);
1142
1143        // Good peer: high success rate + low RTT
1144        for _ in 0..10 {
1145            stats.record_request(40);
1146            stats.record_success(20, 1024);
1147        }
1148        let good_score = stats.score();
1149        assert!(good_score > 0.8);
1150
1151        // Bad peer: high timeout rate
1152        let mut bad_stats = PeerStats::new("peer2");
1153        for _ in 0..10 {
1154            bad_stats.record_request(40);
1155            bad_stats.record_timeout();
1156        }
1157        let bad_score = bad_stats.score();
1158        assert!(bad_score < 0.3);
1159
1160        assert!(good_score > bad_score);
1161    }
1162
1163    #[test]
1164    fn test_peer_stats_utility_score_prefers_good_over_bad() {
1165        let mut good = PeerStats::new("good");
1166        good.requests_sent = 120;
1167        good.successes = 96;
1168        good.failures = 8;
1169        good.timeouts = 4;
1170        good.srtt_ms = 30.0;
1171        good.bytes_sent = 120 * 40;
1172        good.bytes_received = 96 * 1024;
1173
1174        let mut bad = PeerStats::new("bad");
1175        bad.requests_sent = 120;
1176        bad.successes = 40;
1177        bad.failures = 50;
1178        bad.timeouts = 30;
1179        bad.srtt_ms = 220.0;
1180        bad.bytes_sent = 120 * 40;
1181        bad.bytes_received = 40 * 1024;
1182
1183        let total_requests = good.requests_sent + bad.requests_sent;
1184        assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
1185    }
1186
1187    #[test]
1188    fn test_peer_stats_tit_for_tat_score_prefers_reciprocal_peer() {
1189        let mut reciprocal = PeerStats::new("reciprocal");
1190        reciprocal.requests_sent = 100;
1191        reciprocal.successes = 90;
1192        reciprocal.failures = 5;
1193        reciprocal.timeouts = 5;
1194        reciprocal.srtt_ms = 40.0;
1195        reciprocal.bytes_sent = 100 * 40;
1196        reciprocal.bytes_received = 90 * 1024;
1197
1198        let mut leecher = PeerStats::new("leecher");
1199        leecher.requests_sent = 100;
1200        leecher.successes = 40;
1201        leecher.failures = 30;
1202        leecher.timeouts = 30;
1203        leecher.srtt_ms = 120.0;
1204        leecher.bytes_sent = 100 * 40;
1205        leecher.bytes_received = 10 * 1024;
1206
1207        let total_requests = reciprocal.requests_sent + leecher.requests_sent;
1208        assert!(
1209            reciprocal.tit_for_tat_score(total_requests)
1210                > leecher.tit_for_tat_score(total_requests)
1211        );
1212    }
1213
1214    #[test]
1215    fn test_utility_ucb_strategy_explores_less_sampled_peer() {
1216        let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
1217        selector.add_peer("stable");
1218        selector.add_peer("new");
1219
1220        {
1221            let stable = selector.get_stats_mut("stable").unwrap();
1222            stable.requests_sent = 500;
1223            stable.successes = 450;
1224            stable.failures = 35;
1225            stable.timeouts = 15;
1226            stable.srtt_ms = 35.0;
1227            stable.bytes_sent = 500 * 40;
1228            stable.bytes_received = 450 * 1024;
1229        }
1230        {
1231            let new_peer = selector.get_stats_mut("new").unwrap();
1232            new_peer.requests_sent = 2;
1233            new_peer.successes = 2;
1234            new_peer.failures = 0;
1235            new_peer.timeouts = 0;
1236            new_peer.srtt_ms = 70.0;
1237            new_peer.bytes_sent = 2 * 40;
1238            new_peer.bytes_received = 2 * 1024;
1239        }
1240
1241        let peers = selector.select_peers();
1242        assert_eq!(peers[0], "new");
1243    }
1244
1245    #[test]
1246    fn test_tit_for_tat_strategy_prioritizes_reciprocity() {
1247        let mut selector = PeerSelector::with_strategy(SelectionStrategy::TitForTat);
1248        selector.add_peer("reciprocal");
1249        selector.add_peer("leecher");
1250
1251        {
1252            let reciprocal = selector.get_stats_mut("reciprocal").unwrap();
1253            reciprocal.requests_sent = 120;
1254            reciprocal.successes = 102;
1255            reciprocal.failures = 8;
1256            reciprocal.timeouts = 10;
1257            reciprocal.srtt_ms = 45.0;
1258            reciprocal.bytes_sent = 120 * 40;
1259            reciprocal.bytes_received = 102 * 1024;
1260        }
1261        {
1262            let leecher = selector.get_stats_mut("leecher").unwrap();
1263            leecher.requests_sent = 120;
1264            leecher.successes = 70;
1265            leecher.failures = 20;
1266            leecher.timeouts = 30;
1267            leecher.srtt_ms = 35.0;
1268            leecher.bytes_sent = 120 * 40;
1269            leecher.bytes_received = 8 * 1024;
1270        }
1271
1272        let peers = selector.select_peers();
1273        assert_eq!(peers[0], "reciprocal");
1274    }
1275
1276    #[test]
1277    fn test_lowest_latency_strategy() {
1278        let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
1279        selector.add_peer("peer1");
1280        selector.add_peer("peer2");
1281        selector.add_peer("peer3");
1282
1283        // Peer 1: 100ms RTT
1284        selector.record_request("peer1", 40);
1285        selector.record_success("peer1", 100, 1024);
1286
1287        // Peer 2: 20ms RTT (fastest)
1288        selector.record_request("peer2", 40);
1289        selector.record_success("peer2", 20, 1024);
1290
1291        // Peer 3: 50ms RTT
1292        selector.record_request("peer3", 40);
1293        selector.record_success("peer3", 50, 1024);
1294
1295        let peers = selector.select_peers();
1296        // Peer 2 should be first (lowest RTT)
1297        assert_eq!(peers[0], "peer2");
1298    }
1299
1300    fn build_cashu_priority_fixture() -> PeerSelector {
1301        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1302        selector.add_peer("reliable");
1303        selector.add_peer("paid");
1304
1305        {
1306            let reliable = selector.get_stats_mut("reliable").expect("reliable");
1307            reliable.requests_sent = 80;
1308            reliable.successes = 75;
1309            reliable.failures = 2;
1310            reliable.timeouts = 3;
1311            reliable.srtt_ms = 40.0;
1312            reliable.bytes_sent = 80 * 40;
1313            reliable.bytes_received = 75 * 1024;
1314        }
1315        {
1316            let paid = selector.get_stats_mut("paid").expect("paid");
1317            paid.requests_sent = 80;
1318            paid.successes = 36;
1319            paid.failures = 24;
1320            paid.timeouts = 20;
1321            paid.srtt_ms = 700.0;
1322            paid.bytes_sent = 80 * 40;
1323            paid.bytes_received = 36 * 512;
1324        }
1325
1326        selector
1327    }
1328
1329    #[test]
1330    fn test_cashu_payment_weight_zero_keeps_reputation_order() {
1331        let mut selector = build_cashu_priority_fixture();
1332        selector.set_cashu_payment_weight(0.0);
1333        selector.record_cashu_payment("paid", 5_000);
1334
1335        let peers = selector.select_peers();
1336        assert_eq!(peers[0], "reliable");
1337    }
1338
1339    #[test]
1340    fn test_cashu_payment_weight_prioritizes_paid_peer() {
1341        let mut selector = build_cashu_priority_fixture();
1342        selector.set_cashu_payment_weight(0.8);
1343        selector.record_cashu_payment("paid", 5_000);
1344
1345        let peers = selector.select_peers();
1346        assert_eq!(peers[0], "paid");
1347    }
1348
1349    #[test]
1350    fn test_cashu_payment_default_downranks_peer() {
1351        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
1352        selector.add_peer("honest");
1353        selector.add_peer("delinquent");
1354
1355        for peer_id in ["honest", "delinquent"] {
1356            let stats = selector.get_stats_mut(peer_id).expect("stats");
1357            stats.requests_sent = 40;
1358            stats.successes = 34;
1359            stats.failures = 3;
1360            stats.timeouts = 3;
1361            stats.srtt_ms = 60.0;
1362            stats.bytes_sent = 40 * 40;
1363            stats.bytes_received = 34 * 1024;
1364        }
1365
1366        selector.record_cashu_payment_default("delinquent");
1367
1368        let peers = selector.select_peers();
1369        assert_eq!(peers[0], "honest");
1370        assert!(!peers.iter().any(|peer| peer == "delinquent"));
1371    }
1372
1373    #[test]
1374    fn test_payment_default_threshold_blocks_peer() {
1375        let mut selector = PeerSelector::new();
1376        selector.record_cashu_payment_default("peer-a");
1377        assert!(selector.is_peer_blocked_for_payment_defaults("peer-a", 1));
1378        assert!(!selector.is_peer_blocked_for_payment_defaults("peer-a", 2));
1379    }
1380
1381    #[test]
1382    fn test_peer_principal_matches_peer_id() {
1383        assert_eq!(peer_principal("npub1abc"), "npub1abc");
1384        assert_eq!(peer_principal("peer-hex-01"), "peer-hex-01");
1385    }
1386
1387    #[test]
1388    fn test_metadata_snapshot_restores_for_same_peer_id() {
1389        let mut selector = PeerSelector::new();
1390        selector.add_peer("npub1stable");
1391        selector.record_request("npub1stable", 64);
1392        selector.record_success("npub1stable", 32, 1024);
1393        selector.record_cashu_payment("npub1stable", 77);
1394        selector.record_cashu_receipt("npub1stable", 33);
1395        selector.record_cashu_payment_default("npub1stable");
1396
1397        let snapshot = selector.export_peer_metadata_snapshot();
1398        assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
1399        assert_eq!(snapshot.peers.len(), 1);
1400        assert_eq!(snapshot.peers[0].principal, "npub1stable");
1401
1402        let mut restored = PeerSelector::new();
1403        restored.import_peer_metadata_snapshot(&snapshot);
1404        restored.add_peer("npub1stable");
1405        let stats = restored.get_stats("npub1stable").expect("restored stats");
1406        assert_eq!(stats.requests_sent, 1);
1407        assert_eq!(stats.successes, 1);
1408        assert_eq!(stats.cashu_paid_sat, 77);
1409        assert_eq!(stats.cashu_received_sat, 33);
1410        assert_eq!(stats.cashu_payment_receipts, 1);
1411        assert_eq!(stats.cashu_payment_defaults, 1);
1412    }
1413}