Skip to main content

irontide_session/
peer_state.rs

1use std::collections::HashSet;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU32;
5
/// Smoothing factor used when folding a new RTT sample into the running
/// average (M106).
pub(crate) const RTT_EWMA_ALPHA: f64 = 0.3;

/// Advance an exponentially weighted moving average by a single sample.
///
/// The result is `alpha * sample + (1.0 - alpha) * current`:
///
/// * `current` — the average before this sample,
/// * `sample`  — the newly observed value,
/// * `alpha`   — weight given to `sample`; the closer to `1.0`, the faster
///   the average follows recent observations.
///
/// Returns the new average. `alpha` is used as given (no clamping).
pub(crate) fn ewma_update(current: f64, sample: f64, alpha: f64) -> f64 {
    // Same operations, same order as the closed-form expression above, so
    // the floating-point result is bit-for-bit what the formula yields.
    let fresh_part = alpha * sample;
    let retained_part = (1.0 - alpha) * current;
    fresh_part + retained_part
}
17
18use rustc_hash::{FxBuildHasher, FxHashMap};
19use serde::{Deserialize, Serialize};
20use tokio::sync::mpsc;
21
22use irontide_storage::Bitfield;
23use irontide_wire::ExtHandshake;
24
25use crate::pipeline::PeerPipelineState;
26use crate::types::PeerCommand;
27
28/// O(1) lookup container for outstanding block requests to a peer.
29///
30/// Keyed on `(piece_index, block_offset)`, value is `block_length`.
31/// Replaces `Vec<(u32, u32, u32)>` which required O(n) linear scan.
32#[derive(Debug, Clone)]
33pub(crate) struct PendingRequests {
34    inner: FxHashMap<(u32, u32), u32>,
35}
36
37#[allow(dead_code)]
38impl PendingRequests {
39    pub fn new() -> Self {
40        Self {
41            inner: FxHashMap::with_capacity_and_hasher(32, FxBuildHasher),
42        }
43    }
44
45    pub fn insert(&mut self, index: u32, begin: u32, length: u32) {
46        self.inner.insert((index, begin), length);
47    }
48
49    pub fn remove(&mut self, index: u32, begin: u32) -> Option<u32> {
50        self.inner.remove(&(index, begin))
51    }
52
53    pub fn contains(&self, index: u32, begin: u32) -> bool {
54        self.inner.contains_key(&(index, begin))
55    }
56
57    pub fn len(&self) -> usize {
58        self.inner.len()
59    }
60
61    pub fn is_empty(&self) -> bool {
62        self.inner.is_empty()
63    }
64
65    pub fn clear(&mut self) {
66        self.inner.clear();
67    }
68
69    pub fn iter(&self) -> impl Iterator<Item = (u32, u32, u32)> + '_ {
70        self.inner
71            .iter()
72            .map(|(&(index, begin), &length)| (index, begin, length))
73    }
74}
75
76/// Origin of a peer address.
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
78pub enum PeerSource {
79    /// Returned by a tracker announce.
80    Tracker,
81    /// Discovered via the DHT.
82    Dht,
83    /// Received via Peer Exchange (BEP 11).
84    Pex,
85    /// Found via Local Service Discovery (BEP 14).
86    Lsd,
87    /// Connected to us (incoming connection).
88    Incoming,
89    /// Loaded from saved resume data.
90    ResumeData,
91    /// Discovered via I2P SAM bridge.
92    I2p,
93    /// Injected via the HTTP API.
94    Api,
95}
96
97/// Per-peer state tracked by the torrent actor.
98#[allow(dead_code)] // consumed by torrent module (not yet implemented)
99pub(crate) struct PeerState {
100    pub addr: SocketAddr,
101    /// Peer is choking us (default: true).
102    pub peer_choking: bool,
103    /// Peer is interested in us (default: false).
104    pub peer_interested: bool,
105    /// We are choking the peer (default: true).
106    pub am_choking: bool,
107    /// We are interested in the peer (default: false).
108    pub am_interested: bool,
109    /// What pieces the peer has.
110    pub bitfield: Bitfield,
111    /// Download rate in bytes/sec (for choker).
112    pub download_rate: u64,
113    /// Upload rate in bytes/sec.
114    pub upload_rate: u64,
115    /// Bytes downloaded from this peer in current rate window.
116    pub download_bytes_window: u64,
117    /// Bytes uploaded to this peer in current rate window.
118    pub upload_bytes_window: u64,
119    /// Outstanding requests to this peer (index, begin, length).
120    pub pending_requests: PendingRequests,
121    /// Requests from this peer to us (index, begin, length).
122    pub incoming_requests: Vec<(u32, u32, u32)>,
123    /// Peer's extension handshake, if received.
124    pub ext_handshake: Option<ExtHandshake>,
125    /// Whether the peer supports BEP 6 Fast Extension.
126    pub supports_fast: bool,
127    /// Set of piece indices the peer is allowed to request while choked.
128    pub allowed_fast: HashSet<u32>,
129    /// BEP 21: peer declared upload-only status.
130    pub upload_only: bool,
131    /// BEP 16: piece index we revealed to this peer in super-seed mode.
132    pub super_seed_assigned: Option<u32>,
133    /// Channel to send commands to this peer's task.
134    pub cmd_tx: mpsc::Sender<PeerCommand>,
135    /// Per-peer dynamic request queue sizing (M28).
136    pub pipeline: PeerPipelineState,
137    /// Whether this peer is snubbed (no data for `snub_timeout_secs`).
138    pub snubbed: bool,
139    /// When this peer last unchoked us. Used for time-windowed rotation protection:
140    /// peers unchoked within 30s are protected from choke rotation.
141    pub last_unchoked_at: Option<std::time::Instant>,
142    /// Last time we received data from this peer.
143    pub last_data_received: Option<std::time::Instant>,
144    /// When this peer connection was established.
145    pub connected_at: std::time::Instant,
146    /// BEP 6: pieces suggested by this peer.
147    pub suggested_pieces: HashSet<u32>,
148    /// How this peer was discovered.
149    pub source: PeerSource,
150    /// BEP 55: peer advertised `ut_holepunch` support in their extension handshake.
151    pub supports_holepunch: bool,
152    /// Whether this peer appears to be `NATed` (no incoming connections observed).
153    pub appears_nated: bool,
154    /// M174: Whether the connection is encrypted via MSE/PE (RC4).
155    pub is_encrypted: bool,
156    /// Transport protocol used for this peer connection.
157    pub transport: Option<crate::rate_limiter::PeerTransport>,
158    /// Number of successfully received blocks.
159    pub blocks_completed: u64,
160    /// Number of blocks that timed out.
161    pub blocks_timed_out: u64,
162    /// Exponentially weighted moving average of RTT in seconds.
163    pub avg_rtt: Option<f64>,
164    /// Shared atomic in-flight request counter from `PeerShared`.
165    /// Created at the spawn site and shared with the peer task — always available.
166    pub in_flight: Arc<AtomicU32>,
167    /// M149: Dynamic per-peer pipeline depth target. Updated every 10s in
168    /// `update_peer_rates()` based on download throughput. The requester task
169    /// checks this before acquiring a semaphore permit.
170    pub target_depth: Arc<AtomicU32>,
171    /// M147: When the remote peer started choking us. Set on `PeerChoking { choking: true }`,
172    /// cleared on `PeerChoking { choking: false }`. Used for eviction scoring.
173    pub choked_since: Option<std::time::Instant>,
174    /// M147: When this peer transitioned to Live (completed BT handshake).
175    /// Used for 10-second grace period protection during eviction.
176    pub live_since: Option<std::time::Instant>,
177    /// M149: Cumulative bytes downloaded from this peer over the entire connection
178    /// lifetime. Unlike `download_bytes_window` (reset every rate-calculation
179    /// interval), this counter is monotonically increasing and never reset.
180    /// Used by Pass 0 eviction: a peer with `download_bytes_total == 0` has
181    /// never sent us any data.
182    pub download_bytes_total: u64,
183    /// M182: cross-actor wake for the reader's `event_tx`
184    /// `BackpressureQueue`. `TorrentActor`'s `event_rx` drain loop pings
185    /// this after every consumed event for this peer — the reader's
186    /// outer `select!` waits on it via `Notified` to retry stalled
187    /// sends. Same `Arc` as `PeerShared.event_drain_notify`.
188    pub event_drain_notify: Arc<tokio::sync::Notify>,
189    /// Cumulative time we had this peer unchoked (we = our session sending
190    /// `Unchoke` to them). Updated at choke→unchoke / unchoke→choke
191    /// transitions in `run_choker`. Flushed into `TorrentActor.unchoke_durations`
192    /// on peer disconnect so the per-(SocketAddr × torrent) total survives
193    /// reconnects.
194    pub unchoke_duration_total: std::time::Duration,
195    /// `Some(t)` when we are currently NOT choking this peer (we sent
196    /// `Unchoke`); `None` while we are choking. The transition sites in
197    /// `run_choker` swap this in/out of `Some` and accumulate the delta
198    /// into `unchoke_duration_total` on every unchoke→choke flip.
199    pub am_unchoke_started_at: Option<std::time::Instant>,
200}
201
202#[allow(dead_code)]
203impl PeerState {
204    pub fn new(
205        addr: SocketAddr,
206        bitfield_len: u32,
207        cmd_tx: mpsc::Sender<PeerCommand>,
208        source: PeerSource,
209        in_flight: Arc<AtomicU32>,
210        target_depth: Arc<AtomicU32>,
211        event_drain_notify: Arc<tokio::sync::Notify>,
212    ) -> Self {
213        Self {
214            addr,
215            peer_choking: true,
216            peer_interested: false,
217            am_choking: false, // M107: unconditional Unchoke is sent on connect
218            am_interested: false,
219            bitfield: Bitfield::new(bitfield_len),
220            download_rate: 0,
221            upload_rate: 0,
222            download_bytes_window: 0,
223            upload_bytes_window: 0,
224            pending_requests: PendingRequests::new(),
225            incoming_requests: Vec::with_capacity(32),
226            ext_handshake: None,
227            supports_fast: false,
228            allowed_fast: HashSet::new(),
229            upload_only: false,
230            super_seed_assigned: None,
231            cmd_tx,
232            pipeline: PeerPipelineState::new(),
233            snubbed: false,
234            last_unchoked_at: None,
235            last_data_received: None,
236            connected_at: std::time::Instant::now(),
237            suggested_pieces: HashSet::new(),
238            source,
239            supports_holepunch: false,
240            appears_nated: false,
241            is_encrypted: false,
242            transport: None,
243            blocks_completed: 0,
244            blocks_timed_out: 0,
245            avg_rtt: None,
246            in_flight,
247            target_depth,
248            // M147: Peer starts choked (peer_choking: true), so choked_since reflects that.
249            choked_since: Some(std::time::Instant::now()),
250            live_since: None,
251            download_bytes_total: 0,
252            event_drain_notify,
253            unchoke_duration_total: std::time::Duration::ZERO,
254            // M107: peers start with `am_choking: false` (unconditional
255            // unchoke on connect), so the unchoke window starts at
256            // construction time. Choker transitions in `run_choker` will
257            // toggle this on each subsequent flip.
258            am_unchoke_started_at: Some(std::time::Instant::now()),
259        }
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;
    use std::time::{Duration, Instant};

    // ── Shared fixtures ───────────────────────────────────────────────

    /// Build a `PeerState` for `addr_str` that uses the caller-supplied
    /// `in_flight` counter; every other argument is a fixed test default
    /// (bitfield length 100, `PeerSource::Tracker`, target depth 128).
    fn make_peer_state_with_in_flight(addr_str: &str, in_flight: Arc<AtomicU32>) -> PeerState {
        let (tx, _rx) = tokio::sync::mpsc::channel(1);
        PeerState::new(
            addr_str.parse().unwrap(),
            100,
            tx,
            PeerSource::Tracker,
            in_flight,
            Arc::new(AtomicU32::new(128)),
            Arc::new(tokio::sync::Notify::new()),
        )
    }

    /// Build a `PeerState` for `addr_str` with a fresh zeroed `in_flight`
    /// counter.
    fn make_peer_state(addr_str: &str) -> PeerState {
        make_peer_state_with_in_flight(addr_str, Arc::new(AtomicU32::new(0)))
    }

    /// Replays the unchoke→choke bookkeeping on the accumulator: close the
    /// open unchoke window, if any, and add its length to the running total.
    fn close_unchoke_window(peer: &mut PeerState) {
        if let Some(start) = peer.am_unchoke_started_at.take() {
            peer.unchoke_duration_total += start.elapsed();
        }
    }

    // ── PeerSource serialization ──────────────────────────────────────

    #[test]
    fn peer_source_serialization() {
        let source = PeerSource::Tracker;
        let json = serde_json::to_string(&source).unwrap();
        assert_eq!(json, "\"Tracker\"");
        let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip, PeerSource::Tracker);
    }

    #[test]
    fn peer_source_all_variants() {
        let variants = [
            PeerSource::Tracker,
            PeerSource::Dht,
            PeerSource::Pex,
            PeerSource::Lsd,
            PeerSource::Incoming,
            PeerSource::ResumeData,
            PeerSource::I2p,
            PeerSource::Api,
        ];
        for source in variants {
            let json = serde_json::to_string(&source).unwrap();
            let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
            assert_eq!(roundtrip, source);
        }
    }

    #[test]
    fn peer_source_i2p_serialization() {
        let source = PeerSource::I2p;
        let json = serde_json::to_string(&source).unwrap();
        assert_eq!(json, "\"I2p\"");
        let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip, PeerSource::I2p);
    }

    // ── PeerState construction ────────────────────────────────────────

    #[test]
    fn peer_state_has_connected_at() {
        let peer = make_peer_state("127.0.0.1:6881");
        assert!(peer.connected_at.elapsed().as_secs() < 1);
    }

    // ── PendingRequests ───────────────────────────────────────────────

    #[test]
    fn pending_requests_insert_remove() {
        let mut pr = PendingRequests::new();
        assert!(pr.is_empty());
        assert_eq!(pr.len(), 0);

        // Insert
        pr.insert(5, 0, 16384);
        pr.insert(5, 16384, 16384);
        pr.insert(10, 0, 16384);
        assert_eq!(pr.len(), 3);
        assert!(pr.contains(5, 0));
        assert!(pr.contains(10, 0));
        assert!(!pr.contains(99, 0));

        // Remove existing
        assert_eq!(pr.remove(5, 0), Some(16384));
        assert_eq!(pr.len(), 2);
        assert!(!pr.contains(5, 0));

        // Remove non-existent
        assert_eq!(pr.remove(99, 0), None);

        // Duplicate insert overwrites
        pr.insert(5, 16384, 8192);
        assert_eq!(pr.len(), 2); // same key, count unchanged
        assert_eq!(pr.remove(5, 16384), Some(8192)); // new value

        // Clear
        pr.insert(1, 0, 16384);
        pr.clear();
        assert!(pr.is_empty());

        // Iter
        pr.insert(3, 0, 16384);
        pr.insert(3, 16384, 16384);
        let mut items: Vec<_> = pr.iter().collect();
        items.sort_unstable();
        assert_eq!(items, vec![(3, 0, 16384), (3, 16384, 16384)]);
    }

    // ── M132: in_flight counter always available at construction ──

    #[test]
    fn in_flight_zero_at_construction() {
        let peer = make_peer_state("127.0.0.1:6881");
        assert_eq!(
            peer.in_flight.load(Ordering::Relaxed),
            0,
            "in_flight should be zero at construction"
        );
    }

    #[test]
    fn build_peer_info_reads_in_flight() {
        // Verify that in_flight is read directly from the shared atomic counter.
        let counter = Arc::new(AtomicU32::new(42));
        let peer = make_peer_state_with_in_flight("127.0.0.1:6881", Arc::clone(&counter));

        // Simulate the build_peer_info logic — direct load, no Option.
        let num_pending = peer.in_flight.load(Ordering::Relaxed) as usize;
        assert_eq!(
            num_pending, 42,
            "num_pending_requests should read from in_flight atomic"
        );

        // Mutate via the external Arc clone — PeerState sees the update.
        counter.store(99, Ordering::Relaxed);
        let num_pending_updated = peer.in_flight.load(Ordering::Relaxed) as usize;
        assert_eq!(
            num_pending_updated, 99,
            "PeerState should see updates via shared Arc"
        );
    }

    // ── Per-peer unchoke duration accumulator ─────────────────────────

    #[test]
    fn unchoke_duration_starts_zero_with_active_window() {
        let peer = make_peer_state("127.0.0.1:6881");
        assert_eq!(peer.unchoke_duration_total, Duration::ZERO);
        assert!(
            peer.am_unchoke_started_at.is_some(),
            "M107 starts peers unchoked, so the unchoke window opens at construction"
        );
    }

    #[test]
    fn unchoke_choke_unchoke_choke_accumulates() {
        // Simulate the run_choker transitions directly on the accumulator.
        let mut peer = make_peer_state("127.0.0.1:6881");
        // Force a known starting point so we're not measuring construction time.
        peer.am_unchoke_started_at = Some(Instant::now());
        peer.unchoke_duration_total = Duration::ZERO;

        // First choke after ~50 ms unchoked.
        std::thread::sleep(Duration::from_millis(50));
        close_unchoke_window(&mut peer);
        assert!(peer.am_unchoke_started_at.is_none());
        let after_first_choke = peer.unchoke_duration_total;
        assert!(after_first_choke >= Duration::from_millis(40));

        // Re-unchoke and run for ~50 ms more.
        peer.am_unchoke_started_at = Some(Instant::now());
        std::thread::sleep(Duration::from_millis(50));
        close_unchoke_window(&mut peer);
        assert!(
            peer.unchoke_duration_total > after_first_choke,
            "second window must extend the accumulator"
        );
        assert!(
            peer.unchoke_duration_total >= Duration::from_millis(80),
            "two ~50 ms windows must add to ≥80 ms — got {:?}",
            peer.unchoke_duration_total
        );
    }

    #[test]
    fn pure_choked_peer_has_zero_total() {
        let mut peer = make_peer_state("127.0.0.1:6881");
        // Cancel the construction-time auto-open so we model "never unchoked"
        // (peer connected, choker chose never to flip am_choking false).
        peer.am_unchoke_started_at = None;
        std::thread::sleep(Duration::from_millis(10));
        // No transition happens; total stays zero.
        assert_eq!(peer.unchoke_duration_total, Duration::ZERO);
    }
}