Skip to main content

irontide_engine/
peer_state.rs

1use std::collections::HashSet;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU32;
5
6/// EWMA smoothing factor for RTT tracking (M106).
7pub(crate) const RTT_EWMA_ALPHA: f64 = 0.3;
8
9/// Compute one step of an exponential weighted moving average update.
10///
11/// `current` is the existing average, `sample` is the new observation, and
12/// `alpha` controls responsiveness (higher = more weight to recent samples).
13/// Returns the updated average.
14pub(crate) fn ewma_update(current: f64, sample: f64, alpha: f64) -> f64 {
15    alpha * sample + (1.0 - alpha) * current
16}
17
18use rustc_hash::{FxBuildHasher, FxHashMap};
19use tokio::sync::mpsc;
20
21use irontide_storage::Bitfield;
22use irontide_wire::ExtHandshake;
23
24use crate::pipeline::PeerPipelineState;
25use crate::types::PeerCommand;
26
27/// O(1) lookup container for outstanding block requests to a peer.
28///
29/// Keyed on `(piece_index, block_offset)`, value is `block_length`.
30/// Replaces `Vec<(u32, u32, u32)>` which required O(n) linear scan.
31#[derive(Debug, Clone)]
32pub(crate) struct PendingRequests {
33    inner: FxHashMap<(u32, u32), u32>,
34}
35
36#[allow(dead_code)]
37impl PendingRequests {
38    pub fn new() -> Self {
39        Self {
40            inner: FxHashMap::with_capacity_and_hasher(32, FxBuildHasher),
41        }
42    }
43
44    pub fn insert(&mut self, index: u32, begin: u32, length: u32) {
45        self.inner.insert((index, begin), length);
46    }
47
48    pub fn remove(&mut self, index: u32, begin: u32) -> Option<u32> {
49        self.inner.remove(&(index, begin))
50    }
51
52    pub fn contains(&self, index: u32, begin: u32) -> bool {
53        self.inner.contains_key(&(index, begin))
54    }
55
56    pub fn len(&self) -> usize {
57        self.inner.len()
58    }
59
60    pub fn is_empty(&self) -> bool {
61        self.inner.is_empty()
62    }
63
64    pub fn clear(&mut self) {
65        self.inner.clear();
66    }
67
68    pub fn iter(&self) -> impl Iterator<Item = (u32, u32, u32)> + '_ {
69        self.inner
70            .iter()
71            .map(|(&(index, begin), &length)| (index, begin, length))
72    }
73}
74
75/// Origin of a peer address — relocated to `irontide-session-types` at M244a;
76/// re-exported here so `crate::peer_state::PeerSource` resolves unchanged.
77pub use irontide_session_types::PeerSource;
78
79/// Per-peer state tracked by the torrent actor.
80#[allow(dead_code)] // consumed by torrent module (not yet implemented)
81pub(crate) struct PeerState {
82    pub addr: SocketAddr,
83    /// Peer is choking us (default: true).
84    pub peer_choking: bool,
85    /// Peer is interested in us (default: false).
86    pub peer_interested: bool,
87    /// We are choking the peer (default: true).
88    pub am_choking: bool,
89    /// We are interested in the peer (default: false).
90    pub am_interested: bool,
91    /// What pieces the peer has.
92    pub bitfield: Bitfield,
93    /// Download rate in bytes/sec (for choker).
94    pub download_rate: u64,
95    /// Upload rate in bytes/sec.
96    pub upload_rate: u64,
97    /// Bytes downloaded from this peer in current rate window.
98    pub download_bytes_window: u64,
99    /// Bytes uploaded to this peer in current rate window.
100    pub upload_bytes_window: u64,
101    /// Outstanding requests to this peer (index, begin, length).
102    pub pending_requests: PendingRequests,
103    /// Requests from this peer to us (index, begin, length).
104    pub incoming_requests: Vec<(u32, u32, u32)>,
105    /// Peer's extension handshake, if received.
106    pub ext_handshake: Option<ExtHandshake>,
107    /// Whether the peer supports BEP 6 Fast Extension.
108    pub supports_fast: bool,
109    /// Set of piece indices the peer is allowed to request while choked.
110    pub allowed_fast: HashSet<u32>,
111    /// BEP 21: peer declared upload-only status.
112    pub upload_only: bool,
113    /// BEP 16: piece index we revealed to this peer in super-seed mode.
114    pub super_seed_assigned: Option<u32>,
115    /// Channel to send commands to this peer's task.
116    pub cmd_tx: mpsc::Sender<PeerCommand>,
117    /// Per-peer dynamic request queue sizing (M28).
118    pub pipeline: PeerPipelineState,
119    /// Whether this peer is snubbed (no data for `snub_timeout_secs`).
120    pub snubbed: bool,
121    /// When this peer last unchoked us. Used for time-windowed rotation protection:
122    /// peers unchoked within 30s are protected from choke rotation.
123    pub last_unchoked_at: Option<std::time::Instant>,
124    /// Last time we received data from this peer.
125    pub last_data_received: Option<std::time::Instant>,
126    /// When this peer connection was established.
127    pub connected_at: std::time::Instant,
128    /// BEP 6: pieces suggested by this peer.
129    pub suggested_pieces: HashSet<u32>,
130    /// How this peer was discovered.
131    pub source: PeerSource,
132    /// BEP 55: peer advertised `ut_holepunch` support in their extension handshake.
133    pub supports_holepunch: bool,
134    /// Whether this peer appears to be `NATed` (no incoming connections observed).
135    pub appears_nated: bool,
136    /// M174: Whether the connection is encrypted via MSE/PE (RC4).
137    pub is_encrypted: bool,
138    /// Transport protocol used for this peer connection.
139    pub transport: Option<crate::rate_limiter::PeerTransport>,
140    /// Number of successfully received blocks.
141    pub blocks_completed: u64,
142    /// Number of blocks that timed out.
143    pub blocks_timed_out: u64,
144    /// Exponentially weighted moving average of RTT in seconds.
145    pub avg_rtt: Option<f64>,
146    /// Shared atomic in-flight request counter from `PeerShared`.
147    /// Created at the spawn site and shared with the peer task — always available.
148    pub in_flight: Arc<AtomicU32>,
149    /// M149: Dynamic per-peer pipeline depth target. Updated every 10s in
150    /// `update_peer_rates()` based on download throughput. The requester task
151    /// checks this before acquiring a semaphore permit.
152    pub target_depth: Arc<AtomicU32>,
153    /// M147: When the remote peer started choking us. Set on `PeerChoking { choking: true }`,
154    /// cleared on `PeerChoking { choking: false }`. Used for eviction scoring.
155    pub choked_since: Option<std::time::Instant>,
156    /// M147: When this peer transitioned to Live (completed BT handshake).
157    /// Used for 10-second grace period protection during eviction.
158    pub live_since: Option<std::time::Instant>,
159    /// M149: Cumulative bytes downloaded from this peer over the entire connection
160    /// lifetime. Unlike `download_bytes_window` (reset every rate-calculation
161    /// interval), this counter is monotonically increasing and never reset.
162    /// Used by Pass 0 eviction: a peer with `download_bytes_total == 0` has
163    /// never sent us any data.
164    pub download_bytes_total: u64,
165    /// M182: cross-actor wake for the reader's `event_tx`
166    /// `BackpressureQueue`. `TorrentActor`'s `event_rx` drain loop pings
167    /// this after every consumed event for this peer — the reader's
168    /// outer `select!` waits on it via `Notified` to retry stalled
169    /// sends. Same `Arc` as `PeerShared.event_drain_notify`.
170    pub event_drain_notify: Arc<tokio::sync::Notify>,
171    /// Cumulative time we had this peer unchoked (we = our session sending
172    /// `Unchoke` to them). Updated at choke→unchoke / unchoke→choke
173    /// transitions in `run_choker`. Flushed into `TorrentActor.unchoke_durations`
174    /// on peer disconnect so the per-(SocketAddr × torrent) total survives
175    /// reconnects.
176    pub unchoke_duration_total: std::time::Duration,
177    /// `Some(t)` when we are currently NOT choking this peer (we sent
178    /// `Unchoke`); `None` while we are choking. The transition sites in
179    /// `run_choker` swap this in/out of `Some` and accumulate the delta
180    /// into `unchoke_duration_total` on every unchoke→choke flip.
181    pub am_unchoke_started_at: Option<std::time::Instant>,
182}
183
184#[allow(dead_code)]
185impl PeerState {
186    pub fn new(
187        addr: SocketAddr,
188        bitfield_len: u32,
189        cmd_tx: mpsc::Sender<PeerCommand>,
190        source: PeerSource,
191        in_flight: Arc<AtomicU32>,
192        target_depth: Arc<AtomicU32>,
193        event_drain_notify: Arc<tokio::sync::Notify>,
194    ) -> Self {
195        Self {
196            addr,
197            peer_choking: true,
198            peer_interested: false,
199            am_choking: false, // M107: unconditional Unchoke is sent on connect
200            am_interested: false,
201            bitfield: Bitfield::new(bitfield_len),
202            download_rate: 0,
203            upload_rate: 0,
204            download_bytes_window: 0,
205            upload_bytes_window: 0,
206            pending_requests: PendingRequests::new(),
207            incoming_requests: Vec::with_capacity(32),
208            ext_handshake: None,
209            supports_fast: false,
210            allowed_fast: HashSet::new(),
211            upload_only: false,
212            super_seed_assigned: None,
213            cmd_tx,
214            pipeline: PeerPipelineState::new(),
215            snubbed: false,
216            last_unchoked_at: None,
217            last_data_received: None,
218            connected_at: std::time::Instant::now(),
219            suggested_pieces: HashSet::new(),
220            source,
221            supports_holepunch: false,
222            appears_nated: false,
223            is_encrypted: false,
224            transport: None,
225            blocks_completed: 0,
226            blocks_timed_out: 0,
227            avg_rtt: None,
228            in_flight,
229            target_depth,
230            // M147: Peer starts choked (peer_choking: true), so choked_since reflects that.
231            choked_since: Some(std::time::Instant::now()),
232            live_since: None,
233            download_bytes_total: 0,
234            event_drain_notify,
235            unchoke_duration_total: std::time::Duration::ZERO,
236            // M107: peers start with `am_choking: false` (unconditional
237            // unchoke on connect), so the unchoke window starts at
238            // construction time. Choker transitions in `run_choker` will
239            // toggle this on each subsequent flip.
240            am_unchoke_started_at: Some(std::time::Instant::now()),
241        }
242    }
243}
244
245#[cfg(test)]
246mod tests {
247    use super::*;
248
249    #[test]
250    fn peer_source_serialization() {
251        let source = PeerSource::Tracker;
252        let json = serde_json::to_string(&source).unwrap();
253        assert_eq!(json, "\"Tracker\"");
254        let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
255        assert_eq!(roundtrip, PeerSource::Tracker);
256    }
257
258    #[test]
259    fn peer_source_all_variants() {
260        let variants = [
261            PeerSource::Tracker,
262            PeerSource::Dht,
263            PeerSource::Pex,
264            PeerSource::Lsd,
265            PeerSource::Incoming,
266            PeerSource::ResumeData,
267            PeerSource::I2p,
268            PeerSource::Api,
269        ];
270        for source in variants {
271            let json = serde_json::to_string(&source).unwrap();
272            let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
273            assert_eq!(roundtrip, source);
274        }
275    }
276
277    #[test]
278    fn peer_state_has_connected_at() {
279        let (tx, _rx) = tokio::sync::mpsc::channel(1);
280        let peer = PeerState::new(
281            "127.0.0.1:6881".parse().unwrap(),
282            100,
283            tx,
284            PeerSource::Tracker,
285            Arc::new(AtomicU32::new(0)),
286            Arc::new(AtomicU32::new(128)),
287            Arc::new(tokio::sync::Notify::new()),
288        );
289        assert!(peer.connected_at.elapsed().as_secs() < 1);
290    }
291
292    #[test]
293    fn pending_requests_insert_remove() {
294        let mut pr = PendingRequests::new();
295        assert!(pr.is_empty());
296        assert_eq!(pr.len(), 0);
297
298        // Insert
299        pr.insert(5, 0, 16384);
300        pr.insert(5, 16384, 16384);
301        pr.insert(10, 0, 16384);
302        assert_eq!(pr.len(), 3);
303        assert!(pr.contains(5, 0));
304        assert!(pr.contains(10, 0));
305        assert!(!pr.contains(99, 0));
306
307        // Remove existing
308        assert_eq!(pr.remove(5, 0), Some(16384));
309        assert_eq!(pr.len(), 2);
310        assert!(!pr.contains(5, 0));
311
312        // Remove non-existent
313        assert_eq!(pr.remove(99, 0), None);
314
315        // Duplicate insert overwrites
316        pr.insert(5, 16384, 8192);
317        assert_eq!(pr.len(), 2); // same key, count unchanged
318        assert_eq!(pr.remove(5, 16384), Some(8192)); // new value
319
320        // Clear
321        pr.insert(1, 0, 16384);
322        pr.clear();
323        assert!(pr.is_empty());
324
325        // Iter
326        pr.insert(3, 0, 16384);
327        pr.insert(3, 16384, 16384);
328        let mut items: Vec<_> = pr.iter().collect();
329        items.sort_unstable();
330        assert_eq!(items, vec![(3, 0, 16384), (3, 16384, 16384)]);
331    }
332
333    #[test]
334    fn peer_source_i2p_serialization() {
335        let source = PeerSource::I2p;
336        let json = serde_json::to_string(&source).unwrap();
337        assert_eq!(json, "\"I2p\"");
338        let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
339        assert_eq!(roundtrip, PeerSource::I2p);
340    }
341
342    // ── M132: in_flight counter always available at construction ──
343
344    #[test]
345    fn in_flight_zero_at_construction() {
346        let (tx, _rx) = tokio::sync::mpsc::channel(1);
347        let peer = PeerState::new(
348            "127.0.0.1:6881".parse().unwrap(),
349            100,
350            tx,
351            PeerSource::Tracker,
352            Arc::new(AtomicU32::new(0)),
353            Arc::new(AtomicU32::new(128)),
354            Arc::new(tokio::sync::Notify::new()),
355        );
356        assert_eq!(
357            peer.in_flight.load(std::sync::atomic::Ordering::Relaxed),
358            0,
359            "in_flight should be zero at construction"
360        );
361    }
362
363    // ── Per-peer unchoke duration accumulator ─────────────────────────
364
365    fn make_peer_state(addr_str: &str) -> PeerState {
366        let (tx, _rx) = tokio::sync::mpsc::channel(1);
367        PeerState::new(
368            addr_str.parse().unwrap(),
369            100,
370            tx,
371            PeerSource::Tracker,
372            Arc::new(AtomicU32::new(0)),
373            Arc::new(AtomicU32::new(128)),
374            Arc::new(tokio::sync::Notify::new()),
375        )
376    }
377
378    #[test]
379    fn unchoke_duration_starts_zero_with_active_window() {
380        let peer = make_peer_state("127.0.0.1:6881");
381        assert_eq!(peer.unchoke_duration_total, std::time::Duration::ZERO);
382        assert!(
383            peer.am_unchoke_started_at.is_some(),
384            "M107 starts peers unchoked, so the unchoke window opens at construction"
385        );
386    }
387
388    #[test]
389    fn unchoke_choke_unchoke_choke_accumulates() {
390        // Simulate the run_choker transitions directly on the accumulator.
391        let mut peer = make_peer_state("127.0.0.1:6881");
392        // Force a known starting point so we're not measuring construction time.
393        let t0 = std::time::Instant::now();
394        peer.am_unchoke_started_at = Some(t0);
395        peer.unchoke_duration_total = std::time::Duration::ZERO;
396
397        // First choke after ~50 ms unchoked.
398        std::thread::sleep(std::time::Duration::from_millis(50));
399        if let Some(start) = peer.am_unchoke_started_at.take() {
400            peer.unchoke_duration_total += start.elapsed();
401        }
402        assert!(peer.am_unchoke_started_at.is_none());
403        let after_first_choke = peer.unchoke_duration_total;
404        assert!(after_first_choke >= std::time::Duration::from_millis(40));
405
406        // Re-unchoke and run for ~50 ms more.
407        peer.am_unchoke_started_at = Some(std::time::Instant::now());
408        std::thread::sleep(std::time::Duration::from_millis(50));
409        if let Some(start) = peer.am_unchoke_started_at.take() {
410            peer.unchoke_duration_total += start.elapsed();
411        }
412        assert!(
413            peer.unchoke_duration_total > after_first_choke,
414            "second window must extend the accumulator"
415        );
416        assert!(
417            peer.unchoke_duration_total >= std::time::Duration::from_millis(80),
418            "two ~50 ms windows must add to ≥80 ms — got {:?}",
419            peer.unchoke_duration_total
420        );
421    }
422
423    #[test]
424    fn pure_choked_peer_has_zero_total() {
425        let mut peer = make_peer_state("127.0.0.1:6881");
426        // Cancel the construction-time auto-open so we model "never unchoked"
427        // (peer connected, choker chose never to flip am_choking false).
428        peer.am_unchoke_started_at = None;
429        std::thread::sleep(std::time::Duration::from_millis(10));
430        // No transition happens; total stays zero.
431        assert_eq!(peer.unchoke_duration_total, std::time::Duration::ZERO);
432    }
433
434    #[test]
435    fn build_peer_info_reads_in_flight() {
436        // Verify that in_flight is read directly from the shared atomic counter.
437        let counter = Arc::new(AtomicU32::new(42));
438        let (tx, _rx) = tokio::sync::mpsc::channel(1);
439        let peer = PeerState::new(
440            "127.0.0.1:6881".parse().unwrap(),
441            100,
442            tx,
443            PeerSource::Tracker,
444            Arc::clone(&counter),
445            Arc::new(AtomicU32::new(128)),
446            Arc::new(tokio::sync::Notify::new()),
447        );
448
449        // Simulate the build_peer_info logic — direct load, no Option.
450        let num_pending = peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
451
452        assert_eq!(
453            num_pending, 42,
454            "num_pending_requests should read from in_flight atomic"
455        );
456
457        // Mutate via the external Arc clone — PeerState sees the update.
458        counter.store(99, std::sync::atomic::Ordering::Relaxed);
459        let num_pending_updated =
460            peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
461        assert_eq!(
462            num_pending_updated, 99,
463            "PeerState should see updates via shared Arc"
464        );
465    }
466}