Skip to main content

hashtree_webrtc/
peer_selector.rs

1//! Adaptive peer selection based on Freenet patterns
2//!
3//! Implements sophisticated peer selection that favors reliable, fast peers:
4//! - Per-peer performance tracking (RTT, success rate)
5//! - RFC 2988-style smoothed RTT calculation
6//! - Exponential backoff for failing/slow peers
7//! - Fairness constraints to prevent overloading any single peer
8//! - Weighted selection combining multiple signals
9
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::time::{Duration, Instant};
13
// Fairness thresholds (values follow Freenet's PeerManager).

/// A peer is skipped when its share of the summed selection rate exceeds this
/// fraction (30%).
const SELECTION_PERCENTAGE_WARNING: f64 = 0.30;
/// Fairness filtering only applies once at least this many peers are tracked.
const SELECTION_MIN_PEERS: usize = 5;

// Failure backoff (values follow Freenet).

/// Backoff window after the first failure: 1 second.
const INITIAL_BACKOFF_MS: u64 = 1_000;
/// Factor by which the window grows for every additional backoff level.
const BACKOFF_MULTIPLIER: u64 = 2;
/// Upper bound for the backoff window: 8 minutes.
const MAX_BACKOFF_MS: u64 = 8 * 60 * 1_000;

// Retransmission timeout bounds (RFC 2988).

/// Smallest retransmission timeout ever used.
const MIN_RTO_MS: u64 = 50;
/// Largest retransmission timeout ever used: 60 seconds.
const MAX_RTO_MS: u64 = 60 * 1_000;
/// Retransmission timeout assumed before any RTT has been measured.
const INITIAL_RTO_MS: u64 = 1_000;

/// Current schema version for persisted peer metadata snapshots.
pub const PEER_METADATA_SNAPSHOT_VERSION: u32 = 1;
30
31/// Persisted metadata for a logical peer principal (pubkey/npub identity).
32///
33/// This omits process-local runtime fields (`Instant`, active backoff timers) so
34/// metadata can survive restarts and session UUID churn.
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
36pub struct PersistedPeerMetadata {
37    /// Stable principal identity (usually pubkey/npub).
38    pub principal: String,
39    pub requests_sent: u64,
40    pub successes: u64,
41    pub timeouts: u64,
42    pub failures: u64,
43    pub srtt_ms: f64,
44    pub rttvar_ms: f64,
45    pub rto_ms: u64,
46    pub bytes_received: u64,
47    pub bytes_sent: u64,
48    pub cashu_paid_sat: u64,
49    pub cashu_received_sat: u64,
50    pub cashu_payment_receipts: u64,
51    pub cashu_payment_defaults: u64,
52}
53
54impl PersistedPeerMetadata {
55    fn from_stats(principal: String, stats: &PeerStats) -> Self {
56        Self {
57            principal,
58            requests_sent: stats.requests_sent,
59            successes: stats.successes,
60            timeouts: stats.timeouts,
61            failures: stats.failures,
62            srtt_ms: sanitize_latency(stats.srtt_ms),
63            rttvar_ms: sanitize_latency(stats.rttvar_ms),
64            rto_ms: clamp_rto(stats.rto_ms),
65            bytes_received: stats.bytes_received,
66            bytes_sent: stats.bytes_sent,
67            cashu_paid_sat: stats.cashu_paid_sat,
68            cashu_received_sat: stats.cashu_received_sat,
69            cashu_payment_receipts: stats.cashu_payment_receipts,
70            cashu_payment_defaults: stats.cashu_payment_defaults,
71        }
72    }
73
74    fn apply_to_stats(&self, stats: &mut PeerStats) {
75        stats.requests_sent = self.requests_sent;
76        stats.successes = self.successes;
77        stats.timeouts = self.timeouts;
78        stats.failures = self.failures;
79        stats.srtt_ms = sanitize_latency(self.srtt_ms);
80        stats.rttvar_ms = sanitize_latency(self.rttvar_ms);
81        stats.rto_ms = clamp_rto(self.rto_ms);
82        stats.bytes_received = self.bytes_received;
83        stats.bytes_sent = self.bytes_sent;
84        stats.cashu_paid_sat = self.cashu_paid_sat;
85        stats.cashu_received_sat = self.cashu_received_sat;
86        stats.cashu_payment_receipts = self.cashu_payment_receipts;
87        stats.cashu_payment_defaults = self.cashu_payment_defaults;
88
89        // Runtime-only state is intentionally reset on restore.
90        stats.backoff_level = 0;
91        stats.backed_off_until = None;
92        stats.last_success = None;
93        stats.last_failure = None;
94        stats.consecutive_rto_backoffs = 0;
95    }
96}
97
98/// Snapshot of metadata for all known principals.
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
100pub struct PeerMetadataSnapshot {
101    pub version: u32,
102    pub peers: Vec<PersistedPeerMetadata>,
103}
104
105impl Default for PeerMetadataSnapshot {
106    fn default() -> Self {
107        Self {
108            version: PEER_METADATA_SNAPSHOT_VERSION,
109            peers: Vec::new(),
110        }
111    }
112}
113
/// Normalise a latency value: NaN, infinite and negative inputs become `0.0`;
/// every other value is returned unchanged.
fn sanitize_latency(value: f64) -> f64 {
    let usable = value.is_finite() && value >= 0.0;
    if !usable {
        return 0.0;
    }
    value
}
121
122fn clamp_rto(rto_ms: u64) -> u64 {
123    if rto_ms == 0 {
124        INITIAL_RTO_MS
125    } else {
126        rto_ms.clamp(MIN_RTO_MS, MAX_RTO_MS)
127    }
128}
129
/// Return the stable principal part of a transient peer id.
///
/// Peer ids generally look like `"<principal>:<session>"`. Everything before
/// the first `:` is the principal; an id without `:` is its own principal.
pub fn peer_principal(peer_id: &str) -> &str {
    match peer_id.find(':') {
        Some(separator) => &peer_id[..separator],
        None => peer_id,
    }
}
140
141/// Per-peer performance statistics
142#[derive(Debug, Clone)]
143pub struct PeerStats {
144    /// Peer identifier
145    pub peer_id: String,
146    /// When this peer was connected
147    pub connected_at: Instant,
148    /// Total requests sent to this peer
149    pub requests_sent: u64,
150    /// Total successful responses received
151    pub successes: u64,
152    /// Total timeouts
153    pub timeouts: u64,
154    /// Total failures (bad data, disconnects, etc.)
155    pub failures: u64,
156    /// Smoothed round-trip time (RFC 2988 SRTT)
157    pub srtt_ms: f64,
158    /// RTT variance (RFC 2988 RTTVAR)
159    pub rttvar_ms: f64,
160    /// Retransmission timeout (computed from SRTT and RTTVAR)
161    pub rto_ms: u64,
162    /// Consecutive RTO backoffs (for capping)
163    pub consecutive_rto_backoffs: u32,
164    /// Current backoff level (how many times we've backed off)
165    pub backoff_level: u32,
166    /// When backoff expires (None if not backed off)
167    pub backed_off_until: Option<Instant>,
168    /// Last successful response timestamp
169    pub last_success: Option<Instant>,
170    /// Last failure timestamp
171    pub last_failure: Option<Instant>,
172    /// Total bytes received from this peer
173    pub bytes_received: u64,
174    /// Total bytes sent to this peer
175    pub bytes_sent: u64,
176    /// Total sats paid to this peer through an external payment channel.
177    pub cashu_paid_sat: u64,
178    /// Total sats this peer paid us after successful delivery.
179    pub cashu_received_sat: u64,
180    /// Number of successful post-delivery payments received from this peer.
181    pub cashu_payment_receipts: u64,
182    /// Number of times this peer failed to pay after successful delivery.
183    pub cashu_payment_defaults: u64,
184}
185
186impl PeerStats {
187    /// Create new peer stats
188    pub fn new(peer_id: impl Into<String>) -> Self {
189        Self {
190            peer_id: peer_id.into(),
191            connected_at: Instant::now(),
192            requests_sent: 0,
193            successes: 0,
194            timeouts: 0,
195            failures: 0,
196            srtt_ms: 0.0,
197            rttvar_ms: 0.0,
198            rto_ms: INITIAL_RTO_MS,
199            consecutive_rto_backoffs: 0,
200            backoff_level: 0,
201            backed_off_until: None,
202            last_success: None,
203            last_failure: None,
204            bytes_received: 0,
205            bytes_sent: 0,
206            cashu_paid_sat: 0,
207            cashu_received_sat: 0,
208            cashu_payment_receipts: 0,
209            cashu_payment_defaults: 0,
210        }
211    }
212
213    /// Get success rate (0.0 to 1.0)
214    pub fn success_rate(&self) -> f64 {
215        if self.requests_sent == 0 {
216            return 0.5; // Neutral for new peers
217        }
218        self.successes as f64 / self.requests_sent as f64
219    }
220
221    /// Get selection rate (selections per second since connected)
222    pub fn selection_rate(&self) -> f64 {
223        let elapsed = self.connected_at.elapsed();
224        if elapsed.as_secs() < 10 {
225            return 0.0; // Avoid bias from short uptime (Freenet pattern)
226        }
227        self.requests_sent as f64 / elapsed.as_secs_f64()
228    }
229
230    /// Check if peer is currently backed off
231    pub fn is_backed_off(&self) -> bool {
232        if let Some(until) = self.backed_off_until {
233            Instant::now() < until
234        } else {
235            false
236        }
237    }
238
239    /// Get remaining backoff time
240    pub fn backoff_remaining(&self) -> Duration {
241        if let Some(until) = self.backed_off_until {
242            let now = Instant::now();
243            if now < until {
244                return until - now;
245            }
246        }
247        Duration::ZERO
248    }
249
250    /// Record a request being sent
251    pub fn record_request(&mut self, bytes: u64) {
252        self.requests_sent += 1;
253        self.bytes_sent += bytes;
254    }
255
256    /// Record a successful response with RTT
257    /// Uses RFC 2988 algorithm for smoothed RTT calculation
258    pub fn record_success(&mut self, rtt_ms: u64, bytes: u64) {
259        self.successes += 1;
260        self.bytes_received += bytes;
261        self.last_success = Some(Instant::now());
262        self.consecutive_rto_backoffs = 0;
263
264        // Clear backoff on success
265        self.backed_off_until = None;
266        self.backoff_level = 0;
267
268        // RFC 2988 RTT update
269        let rtt = rtt_ms as f64;
270        if self.srtt_ms == 0.0 {
271            // First measurement
272            self.srtt_ms = rtt;
273            self.rttvar_ms = rtt / 2.0;
274        } else {
275            // Subsequent measurements
276            // RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R'|
277            // SRTT = (1 - alpha) * SRTT + alpha * R'
278            // where alpha = 1/8 = 0.125 and beta = 1/4 = 0.25
279            self.rttvar_ms = 0.75 * self.rttvar_ms + 0.25 * (self.srtt_ms - rtt).abs();
280            self.srtt_ms = 0.875 * self.srtt_ms + 0.125 * rtt;
281        }
282
283        // RTO = SRTT + max(G, K*RTTVAR) where G=20ms granularity, K=4
284        let rto = self.srtt_ms + (20.0_f64).max(4.0 * self.rttvar_ms);
285        self.rto_ms = (rto as u64).clamp(MIN_RTO_MS, MAX_RTO_MS);
286    }
287
288    /// Record a timeout
289    pub fn record_timeout(&mut self) {
290        self.timeouts += 1;
291        self.last_failure = Some(Instant::now());
292
293        // Apply backoff
294        self.apply_backoff();
295
296        // RFC 2988: Double RTO on timeout (up to max)
297        if self.consecutive_rto_backoffs < 5 {
298            self.rto_ms = (self.rto_ms * 2).min(MAX_RTO_MS);
299            self.consecutive_rto_backoffs += 1;
300        }
301    }
302
303    /// Record a failure (bad data, disconnect, etc.)
304    pub fn record_failure(&mut self) {
305        self.failures += 1;
306        self.last_failure = Some(Instant::now());
307        self.apply_backoff();
308    }
309
310    /// Record an out-of-band payment to this peer (e.g. Cashu channel transfer).
311    pub fn record_cashu_payment(&mut self, amount_sat: u64) {
312        if amount_sat == 0 {
313            return;
314        }
315        self.cashu_paid_sat = self.cashu_paid_sat.saturating_add(amount_sat);
316    }
317
318    /// Record a settled payment received from this peer after we served data.
319    pub fn record_cashu_receipt(&mut self, amount_sat: u64) {
320        if amount_sat == 0 {
321            return;
322        }
323        self.cashu_received_sat = self.cashu_received_sat.saturating_add(amount_sat);
324        self.cashu_payment_receipts = self.cashu_payment_receipts.saturating_add(1);
325    }
326
327    /// Record that this peer failed to pay after successful delivery.
328    pub fn record_cashu_payment_default(&mut self) {
329        self.cashu_payment_defaults = self.cashu_payment_defaults.saturating_add(1);
330        self.last_failure = Some(Instant::now());
331        self.apply_backoff();
332    }
333
334    /// Apply exponential backoff
335    fn apply_backoff(&mut self) {
336        self.backoff_level += 1;
337        let backoff_ms = (INITIAL_BACKOFF_MS * BACKOFF_MULTIPLIER.pow(self.backoff_level - 1))
338            .min(MAX_BACKOFF_MS);
339        self.backed_off_until = Some(Instant::now() + Duration::from_millis(backoff_ms));
340    }
341
342    /// Calculate peer score for selection (higher is better)
343    /// Combines success rate, RTT, and recent performance
344    pub fn score(&self) -> f64 {
345        // Base score from success rate (0-1)
346        let success_score = self.success_rate();
347
348        // RTT score: prefer faster peers (inverse of normalized RTT)
349        // Scale: 0-50ms = 1.0, 500ms+ = 0.1
350        let rtt_score = if self.srtt_ms <= 0.0 {
351            0.5 // Unknown RTT, neutral
352        } else {
353            (500.0 / (self.srtt_ms + 50.0)).min(1.0)
354        };
355
356        // Recency bonus: slight boost for recently successful peers
357        let recency_bonus = if let Some(last) = self.last_success {
358            let secs_ago = last.elapsed().as_secs_f64();
359            if secs_ago < 60.0 {
360                0.1 // Recent success
361            } else {
362                0.0
363            }
364        } else {
365            0.0
366        };
367
368        // Combine scores (weighted)
369        // Success rate is most important (60%), RTT next (30%), recency last (10%)
370        0.6 * success_score + 0.3 * rtt_score + 0.1 * (1.0 + recency_bonus)
371    }
372
373    /// Utility-centric score with exploration bonus (UCB-style).
374    ///
375    /// Balances:
376    /// - good/bad outcome ratio (successes vs failures+timeouts),
377    /// - latency efficiency,
378    /// - bytes efficiency (received vs sent),
379    /// - uncertainty bonus for less-tested peers.
380    pub fn utility_score(&self, total_requests: u64) -> f64 {
381        let good = self.successes as f64 + 1.0;
382        let bad = (self.failures + self.timeouts) as f64 + 1.0;
383        let ratio = good / bad;
384        let ratio_score = ratio / (1.0 + ratio);
385
386        let latency_score = if self.srtt_ms <= 0.0 {
387            0.5
388        } else {
389            (300.0 / (self.srtt_ms + 50.0)).min(1.0)
390        };
391
392        let efficiency_score = if self.bytes_sent == 0 {
393            0.5
394        } else {
395            (self.bytes_received as f64 / self.bytes_sent as f64).min(1.0)
396        };
397
398        let exploitation = 0.55 * ratio_score + 0.25 * latency_score + 0.20 * efficiency_score;
399
400        let uncertainty =
401            (((total_requests as f64) + 1.0).ln() / ((self.requests_sent as f64) + 1.0)).sqrt();
402        let exploration_bonus = 0.20 * uncertainty;
403
404        exploitation + exploration_bonus
405    }
406
407    /// Tit-for-tat style score:
408    /// - reward peers that answer our requests with useful bytes (reciprocity)
409    /// - reward reliable responders
410    /// - penalize timeout/failure-heavy peers (retaliation)
411    /// - keep a small exploration term for under-sampled peers
412    pub fn tit_for_tat_score(&self, total_requests: u64) -> f64 {
413        // Beta prior keeps cold-start behavior neutral while converging quickly.
414        let reliability = (self.successes as f64 + 1.0) / (self.requests_sent as f64 + 2.0);
415
416        // Reciprocity should not be dominated by one lucky large payload. We
417        // gate byte-ratio impact with success confidence.
418        let reciprocity_raw = if self.bytes_sent == 0 {
419            1.0
420        } else {
421            self.bytes_received as f64 / self.bytes_sent as f64
422        };
423        let reciprocity_ratio = reciprocity_raw / (1.0 + reciprocity_raw);
424        let reciprocity_confidence = self.successes as f64 / (self.successes as f64 + 4.0);
425        let reciprocity =
426            (1.0 - reciprocity_confidence) * 0.5 + reciprocity_confidence * reciprocity_ratio;
427
428        let rtt_score = if self.srtt_ms <= 0.0 {
429            0.5
430        } else {
431            (400.0 / (self.srtt_ms + 50.0)).min(1.0)
432        };
433
434        let timeout_rate = if self.requests_sent == 0 {
435            0.0
436        } else {
437            self.timeouts as f64 / self.requests_sent as f64
438        };
439        let failure_rate = if self.requests_sent == 0 {
440            0.0
441        } else {
442            self.failures as f64 / self.requests_sent as f64
443        };
444        let retaliation_penalty =
445            (0.60 * timeout_rate + 0.45 * failure_rate + 0.10 * self.backoff_level as f64)
446                .min(0.95);
447
448        let cooperative = 0.65 * reliability + 0.25 * reciprocity + 0.10 * rtt_score;
449        let exploration = 0.03
450            * (((total_requests as f64) + 2.0).ln() / ((self.requests_sent as f64) + 2.0)).sqrt();
451
452        (cooperative + exploration - retaliation_penalty).max(0.0)
453    }
454
455    /// Normalize paid amount to a bounded priority score in [0, 1).
456    pub fn cashu_priority_boost(&self) -> f64 {
457        if self.cashu_paid_sat == 0 {
458            return 0.0;
459        }
460        let paid = self.cashu_paid_sat as f64;
461        paid / (paid + 32.0)
462    }
463
464    /// Cooperative peers that actually pay us should not be penalized; repeated
465    /// defaults quickly reduce their desirability.
466    pub fn payment_reliability_multiplier(&self) -> f64 {
467        if self.cashu_payment_receipts == 0 && self.cashu_payment_defaults == 0 {
468            return 1.0;
469        }
470        (self.cashu_payment_receipts as f64 + 1.0)
471            / (self.cashu_payment_receipts as f64 + self.cashu_payment_defaults as f64 + 1.0)
472    }
473
474    pub fn exceeds_payment_default_threshold(&self, threshold: u64) -> bool {
475        threshold > 0 && self.cashu_payment_defaults >= threshold
476    }
477}
478
/// How `PeerSelector` orders its candidate peers.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SelectionStrategy {
    /// Rank by combined score (success rate + RTT). Recommended; this is the default.
    #[default]
    Weighted,
    /// Rotate through peers, ignoring performance.
    RoundRobin,
    /// Random selection.
    Random,
    /// Lowest smoothed RTT first.
    LowestLatency,
    /// Highest success rate first.
    HighestSuccessRate,
    /// Tit-for-tat style utility (reciprocity + reliability + retaliation + exploration).
    TitForTat,
    /// Utility + exploration (good/bad ratio + RTT/efficiency + UCB bonus).
    UtilityUcb,
}
498
499/// Adaptive peer selector
500///
501/// Tracks peer performance and selects peers intelligently:
502/// - Prefers high success rate peers
503/// - Prefers low latency peers
504/// - Backs off failing peers exponentially
505/// - Ensures fairness (no peer gets >30% of traffic with 5+ peers)
506#[derive(Debug, Default)]
507pub struct PeerSelector {
508    /// Per-peer statistics
509    stats: HashMap<String, PeerStats>,
510    /// Persisted peer metadata indexed by stable principal identity (pubkey/npub).
511    persisted_metadata: HashMap<String, PersistedPeerMetadata>,
512    /// Selection strategy
513    strategy: SelectionStrategy,
514    /// Enable fairness constraints (Freenet FOAF mitigation)
515    fairness_enabled: bool,
516    /// Round-robin index for RoundRobin strategy
517    round_robin_idx: usize,
518    /// Blending weight for payment priority. 0.0 keeps pure reputation routing.
519    cashu_payment_weight: f64,
520}
521
522impl PeerSelector {
523    /// Create a new peer selector with default weighted strategy
524    pub fn new() -> Self {
525        Self {
526            stats: HashMap::new(),
527            persisted_metadata: HashMap::new(),
528            strategy: SelectionStrategy::Weighted,
529            fairness_enabled: true,
530            round_robin_idx: 0,
531            cashu_payment_weight: 0.0,
532        }
533    }
534
535    /// Create with specific strategy
536    pub fn with_strategy(strategy: SelectionStrategy) -> Self {
537        Self {
538            stats: HashMap::new(),
539            persisted_metadata: HashMap::new(),
540            strategy,
541            fairness_enabled: true,
542            round_robin_idx: 0,
543            cashu_payment_weight: 0.0,
544        }
545    }
546
547    /// Enable/disable fairness constraints
548    pub fn set_fairness(&mut self, enabled: bool) {
549        self.fairness_enabled = enabled;
550    }
551
552    /// Configure payment-priority influence when ranking peers.
553    /// `0.0` disables payment influence and preserves reputation-only behavior.
554    pub fn set_cashu_payment_weight(&mut self, weight: f64) {
555        self.cashu_payment_weight = weight.clamp(0.0, 1.0);
556    }
557
558    /// Add a peer to track
559    pub fn add_peer(&mut self, peer_id: impl Into<String>) {
560        let peer_id = peer_id.into();
561        if self.stats.contains_key(&peer_id) {
562            return;
563        }
564
565        let mut stats = PeerStats::new(peer_id.clone());
566        if let Some(saved) = self.persisted_metadata.get(peer_principal(&peer_id)) {
567            saved.apply_to_stats(&mut stats);
568        }
569        self.stats.insert(peer_id, stats);
570    }
571
572    /// Remove a peer
573    pub fn remove_peer(&mut self, peer_id: &str) {
574        if let Some(stats) = self.stats.remove(peer_id) {
575            let principal = peer_principal(&stats.peer_id).to_string();
576            self.persisted_metadata.insert(
577                principal.clone(),
578                PersistedPeerMetadata::from_stats(principal, &stats),
579            );
580        }
581    }
582
583    /// Get peer stats (immutable)
584    pub fn get_stats(&self, peer_id: &str) -> Option<&PeerStats> {
585        self.stats.get(peer_id)
586    }
587
588    /// Get peer stats (mutable)
589    pub fn get_stats_mut(&mut self, peer_id: &str) -> Option<&mut PeerStats> {
590        self.stats.get_mut(peer_id)
591    }
592
593    /// Get all peer stats
594    pub fn all_stats(&self) -> impl Iterator<Item = &PeerStats> {
595        self.stats.values()
596    }
597
598    /// Record a request being sent to a peer
599    pub fn record_request(&mut self, peer_id: &str, bytes: u64) {
600        if let Some(stats) = self.stats.get_mut(peer_id) {
601            stats.record_request(bytes);
602        }
603    }
604
605    /// Record a successful response
606    pub fn record_success(&mut self, peer_id: &str, rtt_ms: u64, bytes: u64) {
607        if let Some(stats) = self.stats.get_mut(peer_id) {
608            stats.record_success(rtt_ms, bytes);
609        }
610    }
611
612    /// Record a timeout
613    pub fn record_timeout(&mut self, peer_id: &str) {
614        if let Some(stats) = self.stats.get_mut(peer_id) {
615            stats.record_timeout();
616        }
617    }
618
619    /// Record a failure
620    pub fn record_failure(&mut self, peer_id: &str) {
621        if let Some(stats) = self.stats.get_mut(peer_id) {
622            stats.record_failure();
623        }
624    }
625
626    /// Record payment channel credit for a peer.
627    pub fn record_cashu_payment(&mut self, peer_id: &str, amount_sat: u64) {
628        if amount_sat == 0 {
629            return;
630        }
631        let entry = self
632            .stats
633            .entry(peer_id.to_string())
634            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
635        entry.record_cashu_payment(amount_sat);
636    }
637
638    /// Record a settled post-delivery payment received from a peer.
639    pub fn record_cashu_receipt(&mut self, peer_id: &str, amount_sat: u64) {
640        if amount_sat == 0 {
641            return;
642        }
643        let entry = self
644            .stats
645            .entry(peer_id.to_string())
646            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
647        entry.record_cashu_receipt(amount_sat);
648    }
649
650    /// Record that a peer failed to settle after we delivered successfully.
651    pub fn record_cashu_payment_default(&mut self, peer_id: &str) {
652        let entry = self
653            .stats
654            .entry(peer_id.to_string())
655            .or_insert_with(|| PeerStats::new(peer_id.to_string()));
656        entry.record_cashu_payment_default();
657    }
658
659    pub fn is_peer_blocked_for_payment_defaults(&self, peer_id: &str, threshold: u64) -> bool {
660        self.stats
661            .get(peer_id)
662            .map(|stats| stats.exceeds_payment_default_threshold(threshold))
663            .unwrap_or(false)
664    }
665
666    fn blend_with_payment_priority(&self, stats: &PeerStats, base_score: f64) -> f64 {
667        let reliable_base = base_score * stats.payment_reliability_multiplier();
668        if self.cashu_payment_weight <= 0.0 {
669            return reliable_base;
670        }
671        let payment_score = stats.cashu_priority_boost();
672        (1.0 - self.cashu_payment_weight) * reliable_base
673            + self.cashu_payment_weight * payment_score
674    }
675
676    /// Get available (non-backed-off) peers
677    fn available_peers(&self) -> Vec<String> {
678        self.stats
679            .iter()
680            .filter(|(_, s)| !s.is_backed_off())
681            .map(|(id, _)| id.clone())
682            .collect()
683    }
684
685    /// Check fairness: should this peer be skipped due to over-selection?
686    #[cfg(test)]
687    fn should_skip_for_fairness(&self, peer_id: &str) -> bool {
688        let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
689        self.should_skip_for_fairness_with_total(peer_id, total_rate)
690    }
691
692    fn should_skip_for_fairness_with_total(&self, peer_id: &str, total_rate: f64) -> bool {
693        if !self.fairness_enabled || self.stats.len() < SELECTION_MIN_PEERS || total_rate <= 0.0 {
694            return false;
695        }
696
697        // Check if this peer is selected too often
698        if let Some(stats) = self.stats.get(peer_id) {
699            let peer_rate = stats.selection_rate();
700            let proportion = peer_rate / total_rate;
701            return proportion > SELECTION_PERCENTAGE_WARNING;
702        }
703
704        false
705    }
706
707    /// Select peers ordered by preference
708    ///
709    /// Returns all available peers sorted by preference (best first).
710    /// Respects backoff states and fairness constraints.
711    pub fn select_peers(&mut self) -> Vec<String> {
712        let available = self.available_peers();
713        if available.is_empty() {
714            // If all peers are backed off, return backed off peers anyway
715            // sorted by when their backoff expires (soonest first)
716            let mut backed_off: Vec<_> = self
717                .stats
718                .iter()
719                .filter(|(_, s)| s.is_backed_off())
720                .map(|(id, s)| (id.clone(), s.backoff_remaining()))
721                .collect();
722            backed_off.sort_by_key(|(_, remaining)| *remaining);
723            return backed_off.into_iter().map(|(id, _)| id).collect();
724        }
725
726        // Apply fairness filter
727        let candidates: Vec<String> =
728            if self.fairness_enabled && available.len() >= SELECTION_MIN_PEERS {
729                let total_rate: f64 = self.stats.values().map(|s| s.selection_rate()).sum();
730                available
731                    .into_iter()
732                    .filter(|id| !self.should_skip_for_fairness_with_total(id, total_rate))
733                    .collect()
734            } else {
735                available
736            };
737
738        // If all peers were filtered out for fairness, use all available
739        let candidates = if candidates.is_empty() {
740            self.available_peers()
741        } else {
742            candidates
743        };
744
745        // Sort by strategy
746        let mut sorted: Vec<_> = candidates
747            .into_iter()
748            .filter_map(|id| self.stats.get(&id).map(|s| (id, s.clone())))
749            .collect();
750
751        match self.strategy {
752            SelectionStrategy::Weighted => {
753                // Sort by score (highest first), then by peer_id for determinism
754                sorted.sort_by(|(id_a, a), (id_b, b)| {
755                    let score_a = self.blend_with_payment_priority(a, a.score());
756                    let score_b = self.blend_with_payment_priority(b, b.score());
757                    let score_cmp = score_b
758                        .partial_cmp(&score_a)
759                        .unwrap_or(std::cmp::Ordering::Equal);
760                    if score_cmp == std::cmp::Ordering::Equal {
761                        id_a.cmp(id_b) // Alphabetical for determinism
762                    } else {
763                        score_cmp
764                    }
765                });
766            }
767            SelectionStrategy::LowestLatency => {
768                // Sort by SRTT (lowest first), use score and peer_id as tiebreakers
769                sorted.sort_by(|(id_a, a), (id_b, b)| {
770                    let rtt_cmp = a
771                        .srtt_ms
772                        .partial_cmp(&b.srtt_ms)
773                        .unwrap_or(std::cmp::Ordering::Equal);
774                    if rtt_cmp == std::cmp::Ordering::Equal {
775                        let score_cmp = b
776                            .score()
777                            .partial_cmp(&a.score())
778                            .unwrap_or(std::cmp::Ordering::Equal);
779                        if score_cmp == std::cmp::Ordering::Equal {
780                            id_a.cmp(id_b)
781                        } else {
782                            score_cmp
783                        }
784                    } else {
785                        rtt_cmp
786                    }
787                });
788            }
789            SelectionStrategy::HighestSuccessRate => {
790                // Sort by success rate (highest first), peer_id as tiebreaker
791                sorted.sort_by(|(id_a, a), (id_b, b)| {
792                    let rate_cmp = b
793                        .success_rate()
794                        .partial_cmp(&a.success_rate())
795                        .unwrap_or(std::cmp::Ordering::Equal);
796                    if rate_cmp == std::cmp::Ordering::Equal {
797                        id_a.cmp(id_b)
798                    } else {
799                        rate_cmp
800                    }
801                });
802            }
803            SelectionStrategy::TitForTat => {
804                let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
805                sorted.sort_by(|(id_a, a), (id_b, b)| {
806                    let score_a =
807                        self.blend_with_payment_priority(a, a.tit_for_tat_score(total_requests));
808                    let score_b =
809                        self.blend_with_payment_priority(b, b.tit_for_tat_score(total_requests));
810                    let score_cmp = score_b
811                        .partial_cmp(&score_a)
812                        .unwrap_or(std::cmp::Ordering::Equal);
813                    if score_cmp == std::cmp::Ordering::Equal {
814                        id_a.cmp(id_b)
815                    } else {
816                        score_cmp
817                    }
818                });
819            }
820            SelectionStrategy::UtilityUcb => {
821                let total_requests: u64 = sorted.iter().map(|(_, s)| s.requests_sent).sum();
822                sorted.sort_by(|(id_a, a), (id_b, b)| {
823                    let score_a =
824                        self.blend_with_payment_priority(a, a.utility_score(total_requests));
825                    let score_b =
826                        self.blend_with_payment_priority(b, b.utility_score(total_requests));
827                    let score_cmp = score_b
828                        .partial_cmp(&score_a)
829                        .unwrap_or(std::cmp::Ordering::Equal);
830                    if score_cmp == std::cmp::Ordering::Equal {
831                        id_a.cmp(id_b)
832                    } else {
833                        score_cmp
834                    }
835                });
836            }
837            SelectionStrategy::RoundRobin => {
838                // Rotate the list based on round-robin index
839                if !sorted.is_empty() {
840                    let idx = self.round_robin_idx % sorted.len();
841                    sorted.rotate_left(idx);
842                    self.round_robin_idx = (self.round_robin_idx + 1) % sorted.len();
843                }
844            }
845            SelectionStrategy::Random => {
846                // Shuffle using simple deterministic approach for reproducibility
847                // In production, use proper random shuffle
848            }
849        }
850
851        sorted.into_iter().map(|(id, _)| id).collect()
852    }
853
854    /// Select single best peer
855    pub fn select_best(&mut self) -> Option<String> {
856        self.select_peers().into_iter().next()
857    }
858
859    /// Select top N peers
860    pub fn select_top(&mut self, n: usize) -> Vec<String> {
861        self.select_peers().into_iter().take(n).collect()
862    }
863
864    /// Get summary statistics across all peers
865    pub fn summary(&self) -> SelectorSummary {
866        let count = self.stats.len();
867        if count == 0 {
868            return SelectorSummary::default();
869        }
870
871        let total_requests: u64 = self.stats.values().map(|s| s.requests_sent).sum();
872        let total_successes: u64 = self.stats.values().map(|s| s.successes).sum();
873        let total_timeouts: u64 = self.stats.values().map(|s| s.timeouts).sum();
874        let backed_off = self.stats.values().filter(|s| s.is_backed_off()).count();
875
876        let avg_rtt = {
877            let rtts: Vec<f64> = self
878                .stats
879                .values()
880                .filter(|s| s.srtt_ms > 0.0)
881                .map(|s| s.srtt_ms)
882                .collect();
883            if rtts.is_empty() {
884                0.0
885            } else {
886                rtts.iter().sum::<f64>() / rtts.len() as f64
887            }
888        };
889
890        SelectorSummary {
891            peer_count: count,
892            total_requests,
893            total_successes,
894            total_timeouts,
895            backed_off_count: backed_off,
896            avg_rtt_ms: avg_rtt,
897            overall_success_rate: if total_requests > 0 {
898                total_successes as f64 / total_requests as f64
899            } else {
900                0.0
901            },
902        }
903    }
904
905    /// Export persisted peer metadata keyed by stable principal identity.
906    pub fn export_peer_metadata_snapshot(&self) -> PeerMetadataSnapshot {
907        let mut by_principal = self.persisted_metadata.clone();
908        for stats in self.stats.values() {
909            let principal = peer_principal(&stats.peer_id).to_string();
910            by_principal.insert(
911                principal.clone(),
912                PersistedPeerMetadata::from_stats(principal, stats),
913            );
914        }
915
916        let mut peers: Vec<PersistedPeerMetadata> = by_principal.into_values().collect();
917        peers.sort_by(|a, b| a.principal.cmp(&b.principal));
918
919        PeerMetadataSnapshot {
920            version: PEER_METADATA_SNAPSHOT_VERSION,
921            peers,
922        }
923    }
924
925    /// Import persisted metadata and apply it to currently tracked peers.
926    pub fn import_peer_metadata_snapshot(&mut self, snapshot: &PeerMetadataSnapshot) {
927        if snapshot.version != PEER_METADATA_SNAPSHOT_VERSION {
928            return;
929        }
930
931        self.persisted_metadata.clear();
932        for peer in &snapshot.peers {
933            self.persisted_metadata
934                .insert(peer.principal.clone(), peer.clone());
935        }
936
937        for stats in self.stats.values_mut() {
938            if let Some(saved) = self.persisted_metadata.get(peer_principal(&stats.peer_id)) {
939                saved.apply_to_stats(stats);
940            }
941        }
942    }
943}
944
/// Summary statistics for the selector
///
/// Produced by `PeerSelector::summary`; the default value (all zeros) is what
/// that method returns when no peers are tracked.
#[derive(Debug, Clone, Default)]
pub struct SelectorSummary {
    /// Number of peers currently tracked.
    pub peer_count: usize,
    /// Sum of `requests_sent` over all tracked peers.
    pub total_requests: u64,
    /// Sum of `successes` over all tracked peers.
    pub total_successes: u64,
    /// Sum of `timeouts` over all tracked peers.
    pub total_timeouts: u64,
    /// Number of tracked peers for which `is_backed_off()` was true.
    pub backed_off_count: usize,
    /// Mean `srtt_ms` over peers with a positive `srtt_ms`; `0.0` if none.
    pub avg_rtt_ms: f64,
    /// `total_successes / total_requests`, or `0.0` when there were no requests.
    pub overall_success_rate: f64,
}
956
// Unit tests for `PeerStats` bookkeeping and `PeerSelector` ranking/persistence.
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    // A fresh peer reports 0.5; one success out of one request reports 1.0;
    // adding a timed-out request brings the rate back to 0.5.
    #[test]
    fn test_peer_stats_success_rate() {
        let mut stats = PeerStats::new("peer1");
        assert_eq!(stats.success_rate(), 0.5); // Neutral for new peer

        stats.record_request(40);
        stats.record_success(50, 1024);
        assert_eq!(stats.success_rate(), 1.0);

        stats.record_request(40);
        stats.record_timeout();
        assert_eq!(stats.success_rate(), 0.5);
    }

    // The first sample seeds SRTT/RTTVAR directly; the second is smoothed in.
    #[test]
    fn test_peer_stats_rtt_calculation() {
        let mut stats = PeerStats::new("peer1");

        // First RTT measurement
        stats.record_request(40);
        stats.record_success(100, 1024);
        assert_eq!(stats.srtt_ms, 100.0);
        assert_eq!(stats.rttvar_ms, 50.0); // RTT/2

        // Second measurement
        stats.record_request(40);
        stats.record_success(80, 1024);
        // SRTT = 0.875 * 100 + 0.125 * 80 = 87.5 + 10 = 97.5
        assert!((stats.srtt_ms - 97.5).abs() < 0.1);
    }

    // A single timeout puts a previously clean peer into backoff.
    #[test]
    fn test_peer_stats_backoff() {
        let mut stats = PeerStats::new("peer1");
        assert!(!stats.is_backed_off());

        stats.record_timeout();
        assert!(stats.is_backed_off());
        assert!(stats.backoff_remaining() > Duration::ZERO);
    }

    // A success after a timeout clears the backoff and resets its level to 0.
    #[test]
    fn test_peer_stats_backoff_clears_on_success() {
        let mut stats = PeerStats::new("peer1");
        stats.record_timeout();
        assert!(stats.is_backed_off());

        stats.record_success(50, 1024);
        assert!(!stats.is_backed_off());
        assert_eq!(stats.backoff_level, 0);
    }

    // Added peers are visible through `get_stats`; removing one leaves the other.
    #[test]
    fn test_peer_selector_add_remove() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        assert!(selector.get_stats("peer1").is_some());
        assert!(selector.get_stats("peer2").is_some());

        selector.remove_peer("peer1");
        assert!(selector.get_stats("peer1").is_none());
        assert!(selector.get_stats("peer2").is_some());
    }

    // With the `Weighted` strategy the peer with only fast successes ranks first.
    #[test]
    fn test_peer_selector_weighted_selection() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.add_peer("peer3");

        // Peer 1: good (high success, low RTT)
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 20, 1024);
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 25, 1024);

        // Peer 2: medium
        selector.record_request("peer2", 40);
        selector.record_success("peer2", 100, 1024);
        selector.record_request("peer2", 40);
        selector.record_timeout("peer2");

        // Peer 3: bad (timeouts)
        selector.record_request("peer3", 40);
        selector.record_timeout("peer3");
        selector.record_request("peer3", 40);
        selector.record_timeout("peer3");

        // NOTE(review): peer 3 is expected to be backed off here, but that is
        // not asserted; only the first entry of the ranking is checked.
        let peers = selector.select_peers();
        // Peer 1 should be first (best score)
        assert_eq!(peers[0], "peer1");
    }

    // A backed-off peer is left out of the selection while another is available.
    #[test]
    fn test_peer_selector_backed_off_peers() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        // Back off peer 1
        selector.record_timeout("peer1");
        assert!(selector.get_stats("peer1").unwrap().is_backed_off());

        // Peer 2 should be available
        let peers = selector.select_peers();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0], "peer2");
    }

    // When every peer is backed off, selection still returns all of them.
    #[test]
    fn test_peer_selector_all_backed_off_fallback() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        // Back off both peers
        selector.record_timeout("peer1");
        selector.record_timeout("peer2");

        // Should still return peers; the order (presumably by backoff
        // remaining) is not asserted, only the count.
        let peers = selector.select_peers();
        assert_eq!(peers.len(), 2);
    }

    // Smoke test for the fairness check.
    // NOTE(review): this test contains no assertion, so it can only fail by
    // panicking; the result of `should_skip_for_fairness` is discarded.
    #[test]
    fn test_peer_selector_fairness() {
        let mut selector = PeerSelector::new();
        selector.set_fairness(true);

        // Add 5+ peers to enable fairness
        for i in 1..=6 {
            selector.add_peer(format!("peer{}", i));
        }

        // Let some wall-clock time pass before recording traffic
        // (presumably so time-based selection rates are non-degenerate).
        sleep(Duration::from_millis(15));

        // Simulate peer 1 being selected way too often
        for _ in 0..100 {
            selector.record_request("peer1", 40);
            selector.record_success("peer1", 10, 100);
        }

        // Other peers get very few requests
        for i in 2..=6 {
            selector.record_request(&format!("peer{}", i), 40);
            selector.record_success(&format!("peer{}", i), 10, 100);
        }

        // Peer 1 is expected to be skipped due to fairness (>30% selection rate)
        let skipped = selector.should_skip_for_fairness("peer1");
        let _ = skipped; // May or may not trigger depending on timing
    }

    // One success on peer1 and one timeout on peer2 are reflected in the totals.
    #[test]
    fn test_peer_selector_summary() {
        let mut selector = PeerSelector::new();
        selector.add_peer("peer1");
        selector.add_peer("peer2");

        selector.record_request("peer1", 40);
        selector.record_success("peer1", 50, 1024);
        selector.record_request("peer2", 40);
        selector.record_timeout("peer2");

        let summary = selector.summary();
        assert_eq!(summary.peer_count, 2);
        assert_eq!(summary.total_requests, 2);
        assert_eq!(summary.total_successes, 1);
        assert_eq!(summary.total_timeouts, 1);
        assert_eq!(summary.backed_off_count, 1);
        assert_eq!(summary.overall_success_rate, 0.5);
    }

    // Score bands: new peer in (0.3, 0.7), all-success peer > 0.8,
    // all-timeout peer < 0.3.
    #[test]
    fn test_peer_stats_score() {
        let mut stats = PeerStats::new("peer1");

        // New peer has neutral score
        let initial_score = stats.score();
        assert!(initial_score > 0.3 && initial_score < 0.7);

        // Good peer: high success rate + low RTT
        for _ in 0..10 {
            stats.record_request(40);
            stats.record_success(20, 1024);
        }
        let good_score = stats.score();
        assert!(good_score > 0.8);

        // Bad peer: high timeout rate
        let mut bad_stats = PeerStats::new("peer2");
        for _ in 0..10 {
            bad_stats.record_request(40);
            bad_stats.record_timeout();
        }
        let bad_score = bad_stats.score();
        assert!(bad_score < 0.3);

        assert!(good_score > bad_score);
    }

    // With equal request counts, the more successful and faster peer gets the
    // higher utility score.
    #[test]
    fn test_peer_stats_utility_score_prefers_good_over_bad() {
        let mut good = PeerStats::new("good");
        good.requests_sent = 120;
        good.successes = 96;
        good.failures = 8;
        good.timeouts = 4;
        good.srtt_ms = 30.0;
        good.bytes_sent = 120 * 40;
        good.bytes_received = 96 * 1024;

        let mut bad = PeerStats::new("bad");
        bad.requests_sent = 120;
        bad.successes = 40;
        bad.failures = 50;
        bad.timeouts = 30;
        bad.srtt_ms = 220.0;
        bad.bytes_sent = 120 * 40;
        bad.bytes_received = 40 * 1024;

        let total_requests = good.requests_sent + bad.requests_sent;
        assert!(good.utility_score(total_requests) > bad.utility_score(total_requests));
    }

    // With equal bytes sent, the peer that returned far more bytes (and more
    // successes) gets the higher tit-for-tat score.
    #[test]
    fn test_peer_stats_tit_for_tat_score_prefers_reciprocal_peer() {
        let mut reciprocal = PeerStats::new("reciprocal");
        reciprocal.requests_sent = 100;
        reciprocal.successes = 90;
        reciprocal.failures = 5;
        reciprocal.timeouts = 5;
        reciprocal.srtt_ms = 40.0;
        reciprocal.bytes_sent = 100 * 40;
        reciprocal.bytes_received = 90 * 1024;

        let mut leecher = PeerStats::new("leecher");
        leecher.requests_sent = 100;
        leecher.successes = 40;
        leecher.failures = 30;
        leecher.timeouts = 30;
        leecher.srtt_ms = 120.0;
        leecher.bytes_sent = 100 * 40;
        leecher.bytes_received = 10 * 1024;

        let total_requests = reciprocal.requests_sent + leecher.requests_sent;
        assert!(
            reciprocal.tit_for_tat_score(total_requests)
                > leecher.tit_for_tat_score(total_requests)
        );
    }

    // Under `UtilityUcb`, a peer with only 2 recorded requests is ranked ahead
    // of a well-sampled peer with 500.
    #[test]
    fn test_utility_ucb_strategy_explores_less_sampled_peer() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::UtilityUcb);
        selector.add_peer("stable");
        selector.add_peer("new");

        {
            let stable = selector.get_stats_mut("stable").unwrap();
            stable.requests_sent = 500;
            stable.successes = 450;
            stable.failures = 35;
            stable.timeouts = 15;
            stable.srtt_ms = 35.0;
            stable.bytes_sent = 500 * 40;
            stable.bytes_received = 450 * 1024;
        }
        {
            let new_peer = selector.get_stats_mut("new").unwrap();
            new_peer.requests_sent = 2;
            new_peer.successes = 2;
            new_peer.failures = 0;
            new_peer.timeouts = 0;
            new_peer.srtt_ms = 70.0;
            new_peer.bytes_sent = 2 * 40;
            new_peer.bytes_received = 2 * 1024;
        }

        let peers = selector.select_peers();
        assert_eq!(peers[0], "new");
    }

    // Under `TitForTat`, the reciprocal peer ranks first even though the
    // leecher has the lower RTT.
    #[test]
    fn test_tit_for_tat_strategy_prioritizes_reciprocity() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::TitForTat);
        selector.add_peer("reciprocal");
        selector.add_peer("leecher");

        {
            let reciprocal = selector.get_stats_mut("reciprocal").unwrap();
            reciprocal.requests_sent = 120;
            reciprocal.successes = 102;
            reciprocal.failures = 8;
            reciprocal.timeouts = 10;
            reciprocal.srtt_ms = 45.0;
            reciprocal.bytes_sent = 120 * 40;
            reciprocal.bytes_received = 102 * 1024;
        }
        {
            let leecher = selector.get_stats_mut("leecher").unwrap();
            leecher.requests_sent = 120;
            leecher.successes = 70;
            leecher.failures = 20;
            leecher.timeouts = 30;
            leecher.srtt_ms = 35.0;
            leecher.bytes_sent = 120 * 40;
            leecher.bytes_received = 8 * 1024;
        }

        let peers = selector.select_peers();
        assert_eq!(peers[0], "reciprocal");
    }

    // Under `LowestLatency`, the peer with the smallest measured RTT ranks first.
    #[test]
    fn test_lowest_latency_strategy() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::LowestLatency);
        selector.add_peer("peer1");
        selector.add_peer("peer2");
        selector.add_peer("peer3");

        // Peer 1: 100ms RTT
        selector.record_request("peer1", 40);
        selector.record_success("peer1", 100, 1024);

        // Peer 2: 20ms RTT (fastest)
        selector.record_request("peer2", 40);
        selector.record_success("peer2", 20, 1024);

        // Peer 3: 50ms RTT
        selector.record_request("peer3", 40);
        selector.record_success("peer3", 50, 1024);

        let peers = selector.select_peers();
        // Peer 2 should be first (lowest RTT)
        assert_eq!(peers[0], "peer2");
    }

    // Shared fixture: a `Weighted` selector with a fast, mostly successful
    // "reliable" peer and a slow, frequently failing "paid" peer. No payments
    // are recorded here; the calling tests add them.
    fn build_cashu_priority_fixture() -> PeerSelector {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("reliable");
        selector.add_peer("paid");

        {
            let reliable = selector.get_stats_mut("reliable").expect("reliable");
            reliable.requests_sent = 80;
            reliable.successes = 75;
            reliable.failures = 2;
            reliable.timeouts = 3;
            reliable.srtt_ms = 40.0;
            reliable.bytes_sent = 80 * 40;
            reliable.bytes_received = 75 * 1024;
        }
        {
            let paid = selector.get_stats_mut("paid").expect("paid");
            paid.requests_sent = 80;
            paid.successes = 36;
            paid.failures = 24;
            paid.timeouts = 20;
            paid.srtt_ms = 700.0;
            paid.bytes_sent = 80 * 40;
            paid.bytes_received = 36 * 512;
        }

        selector
    }

    // With a payment weight of 0.0, a recorded payment does not change the
    // ranking: the reliable peer stays first.
    #[test]
    fn test_cashu_payment_weight_zero_keeps_reputation_order() {
        let mut selector = build_cashu_priority_fixture();
        selector.set_cashu_payment_weight(0.0);
        selector.record_cashu_payment("paid", 5_000);

        let peers = selector.select_peers();
        assert_eq!(peers[0], "reliable");
    }

    // With a payment weight of 0.8, the same payment moves the paid peer to
    // the front despite its worse stats.
    #[test]
    fn test_cashu_payment_weight_prioritizes_paid_peer() {
        let mut selector = build_cashu_priority_fixture();
        selector.set_cashu_payment_weight(0.8);
        selector.record_cashu_payment("paid", 5_000);

        let peers = selector.select_peers();
        assert_eq!(peers[0], "paid");
    }

    // Two peers with identical stats: after one payment default, the
    // defaulting peer is absent from the selection altogether.
    #[test]
    fn test_cashu_payment_default_downranks_peer() {
        let mut selector = PeerSelector::with_strategy(SelectionStrategy::Weighted);
        selector.add_peer("honest");
        selector.add_peer("delinquent");

        for peer_id in ["honest", "delinquent"] {
            let stats = selector.get_stats_mut(peer_id).expect("stats");
            stats.requests_sent = 40;
            stats.successes = 34;
            stats.failures = 3;
            stats.timeouts = 3;
            stats.srtt_ms = 60.0;
            stats.bytes_sent = 40 * 40;
            stats.bytes_received = 34 * 1024;
        }

        selector.record_cashu_payment_default("delinquent");

        let peers = selector.select_peers();
        assert_eq!(peers[0], "honest");
        assert!(!peers.iter().any(|peer| peer == "delinquent"));
    }

    // One recorded default blocks the peer at threshold 1 but not at threshold 2.
    // Note that "peer-a" is never registered via `add_peer` in this test.
    #[test]
    fn test_payment_default_threshold_blocks_peer() {
        let mut selector = PeerSelector::new();
        selector.record_cashu_payment_default("peer-a");
        assert!(selector.is_peer_blocked_for_payment_defaults("peer-a", 1));
        assert!(!selector.is_peer_blocked_for_payment_defaults("peer-a", 2));
    }

    // `peer_principal` keeps the part before the first ':' and returns ids
    // without a ':' unchanged.
    #[test]
    fn test_peer_principal_prefers_stable_identity_prefix() {
        assert_eq!(peer_principal("npub1abc:session-1"), "npub1abc");
        assert_eq!(peer_principal("npub1abc"), "npub1abc");
    }

    // Stats recorded under one session id are exported under the principal and
    // re-applied to a different session id of the same principal after import.
    #[test]
    fn test_metadata_snapshot_restores_across_session_ids() {
        let mut selector = PeerSelector::new();
        selector.add_peer("npub1stable:session-a");
        selector.record_request("npub1stable:session-a", 64);
        selector.record_success("npub1stable:session-a", 32, 1024);
        selector.record_cashu_payment("npub1stable:session-a", 77);
        selector.record_cashu_receipt("npub1stable:session-a", 33);
        selector.record_cashu_payment_default("npub1stable:session-a");

        let snapshot = selector.export_peer_metadata_snapshot();
        assert_eq!(snapshot.version, PEER_METADATA_SNAPSHOT_VERSION);
        assert_eq!(snapshot.peers.len(), 1);
        assert_eq!(snapshot.peers[0].principal, "npub1stable");

        // Import first, then add the new session: the peer added afterwards
        // must pick up the restored counters.
        let mut restored = PeerSelector::new();
        restored.import_peer_metadata_snapshot(&snapshot);
        restored.add_peer("npub1stable:session-b");
        let stats = restored
            .get_stats("npub1stable:session-b")
            .expect("restored stats");
        assert_eq!(stats.requests_sent, 1);
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.cashu_paid_sat, 77);
        assert_eq!(stats.cashu_received_sat, 33);
        assert_eq!(stats.cashu_payment_receipts, 1);
        assert_eq!(stats.cashu_payment_defaults, 1);
    }
}