Skip to main content

irontide_session/
peer_state.rs

1use std::collections::HashSet;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU32;
5
6/// EWMA smoothing factor for RTT tracking (M106).
7pub(crate) const RTT_EWMA_ALPHA: f64 = 0.3;
8
9/// Compute one step of an exponential weighted moving average update.
10///
11/// `current` is the existing average, `sample` is the new observation, and
12/// `alpha` controls responsiveness (higher = more weight to recent samples).
13/// Returns the updated average.
14pub(crate) fn ewma_update(current: f64, sample: f64, alpha: f64) -> f64 {
15    alpha * sample + (1.0 - alpha) * current
16}
17
18use rustc_hash::FxHashMap;
19use serde::{Deserialize, Serialize};
20use tokio::sync::mpsc;
21
22use irontide_storage::Bitfield;
23use irontide_wire::ExtHandshake;
24
25use crate::pipeline::PeerPipelineState;
26use crate::types::PeerCommand;
27
28/// O(1) lookup container for outstanding block requests to a peer.
29///
30/// Keyed on `(piece_index, block_offset)`, value is `block_length`.
31/// Replaces `Vec<(u32, u32, u32)>` which required O(n) linear scan.
32#[derive(Debug, Clone)]
33pub(crate) struct PendingRequests {
34    inner: FxHashMap<(u32, u32), u32>,
35}
36
37#[allow(dead_code)]
38impl PendingRequests {
39    pub fn new() -> Self {
40        Self {
41            inner: FxHashMap::with_capacity_and_hasher(32, Default::default()),
42        }
43    }
44
45    pub fn insert(&mut self, index: u32, begin: u32, length: u32) {
46        self.inner.insert((index, begin), length);
47    }
48
49    pub fn remove(&mut self, index: u32, begin: u32) -> Option<u32> {
50        self.inner.remove(&(index, begin))
51    }
52
53    pub fn contains(&self, index: u32, begin: u32) -> bool {
54        self.inner.contains_key(&(index, begin))
55    }
56
57    pub fn len(&self) -> usize {
58        self.inner.len()
59    }
60
61    pub fn is_empty(&self) -> bool {
62        self.inner.is_empty()
63    }
64
65    pub fn clear(&mut self) {
66        self.inner.clear();
67    }
68
69    pub fn iter(&self) -> impl Iterator<Item = (u32, u32, u32)> + '_ {
70        self.inner
71            .iter()
72            .map(|(&(index, begin), &length)| (index, begin, length))
73    }
74}
75
76/// Origin of a peer address.
77#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
78pub enum PeerSource {
79    /// Returned by a tracker announce.
80    Tracker,
81    /// Discovered via the DHT.
82    Dht,
83    /// Received via Peer Exchange (BEP 11).
84    Pex,
85    /// Found via Local Service Discovery (BEP 14).
86    Lsd,
87    /// Connected to us (incoming connection).
88    Incoming,
89    /// Loaded from saved resume data.
90    ResumeData,
91    /// Discovered via I2P SAM bridge.
92    I2p,
93}
94
95/// Per-peer state tracked by the torrent actor.
96#[allow(dead_code)] // consumed by torrent module (not yet implemented)
97pub(crate) struct PeerState {
98    pub addr: SocketAddr,
99    /// Peer is choking us (default: true).
100    pub peer_choking: bool,
101    /// Peer is interested in us (default: false).
102    pub peer_interested: bool,
103    /// We are choking the peer (default: true).
104    pub am_choking: bool,
105    /// We are interested in the peer (default: false).
106    pub am_interested: bool,
107    /// What pieces the peer has.
108    pub bitfield: Bitfield,
109    /// Download rate in bytes/sec (for choker).
110    pub download_rate: u64,
111    /// Upload rate in bytes/sec.
112    pub upload_rate: u64,
113    /// Bytes downloaded from this peer in current rate window.
114    pub download_bytes_window: u64,
115    /// Bytes uploaded to this peer in current rate window.
116    pub upload_bytes_window: u64,
117    /// Outstanding requests to this peer (index, begin, length).
118    pub pending_requests: PendingRequests,
119    /// Requests from this peer to us (index, begin, length).
120    pub incoming_requests: Vec<(u32, u32, u32)>,
121    /// Peer's extension handshake, if received.
122    pub ext_handshake: Option<ExtHandshake>,
123    /// Whether the peer supports BEP 6 Fast Extension.
124    pub supports_fast: bool,
125    /// Set of piece indices the peer is allowed to request while choked.
126    pub allowed_fast: HashSet<u32>,
127    /// BEP 21: peer declared upload-only status.
128    pub upload_only: bool,
129    /// BEP 16: piece index we revealed to this peer in super-seed mode.
130    pub super_seed_assigned: Option<u32>,
131    /// Channel to send commands to this peer's task.
132    pub cmd_tx: mpsc::Sender<PeerCommand>,
133    /// Per-peer dynamic request queue sizing (M28).
134    pub pipeline: PeerPipelineState,
135    /// Whether this peer is snubbed (no data for snub_timeout_secs).
136    pub snubbed: bool,
137    /// When this peer last unchoked us. Used for time-windowed rotation protection:
138    /// peers unchoked within 30s are protected from choke rotation.
139    pub last_unchoked_at: Option<std::time::Instant>,
140    /// Last time we received data from this peer.
141    pub last_data_received: Option<std::time::Instant>,
142    /// When this peer connection was established.
143    pub connected_at: std::time::Instant,
144    /// BEP 6: pieces suggested by this peer.
145    pub suggested_pieces: HashSet<u32>,
146    /// How this peer was discovered.
147    pub source: PeerSource,
148    /// BEP 55: peer advertised `ut_holepunch` support in their extension handshake.
149    pub supports_holepunch: bool,
150    /// Whether this peer appears to be NATed (no incoming connections observed).
151    pub appears_nated: bool,
152    /// Transport protocol used for this peer connection.
153    pub transport: Option<crate::rate_limiter::PeerTransport>,
154    /// Number of successfully received blocks.
155    pub blocks_completed: u64,
156    /// Number of blocks that timed out.
157    pub blocks_timed_out: u64,
158    /// Exponentially weighted moving average of RTT in seconds.
159    pub avg_rtt: Option<f64>,
160    /// Shared atomic in-flight request counter from `PeerShared`.
161    /// Created at the spawn site and shared with the peer task — always available.
162    pub in_flight: Arc<AtomicU32>,
163    /// M149: Dynamic per-peer pipeline depth target. Updated every 10s in
164    /// `update_peer_rates()` based on download throughput. The requester task
165    /// checks this before acquiring a semaphore permit.
166    pub target_depth: Arc<AtomicU32>,
167    /// M147: When the remote peer started choking us. Set on `PeerChoking { choking: true }`,
168    /// cleared on `PeerChoking { choking: false }`. Used for eviction scoring.
169    pub choked_since: Option<std::time::Instant>,
170    /// M147: When this peer transitioned to Live (completed BT handshake).
171    /// Used for 10-second grace period protection during eviction.
172    pub live_since: Option<std::time::Instant>,
173    /// M149: Cumulative bytes downloaded from this peer over the entire connection
174    /// lifetime. Unlike `download_bytes_window` (reset every rate-calculation
175    /// interval), this counter is monotonically increasing and never reset.
176    /// Used by Pass 0 eviction: a peer with `download_bytes_total == 0` has
177    /// never sent us any data.
178    pub download_bytes_total: u64,
179}
180
181#[allow(dead_code)]
182impl PeerState {
183    pub fn new(
184        addr: SocketAddr,
185        bitfield_len: u32,
186        cmd_tx: mpsc::Sender<PeerCommand>,
187        source: PeerSource,
188        in_flight: Arc<AtomicU32>,
189        target_depth: Arc<AtomicU32>,
190    ) -> Self {
191        Self {
192            addr,
193            peer_choking: true,
194            peer_interested: false,
195            am_choking: false, // M107: unconditional Unchoke is sent on connect
196            am_interested: false,
197            bitfield: Bitfield::new(bitfield_len),
198            download_rate: 0,
199            upload_rate: 0,
200            download_bytes_window: 0,
201            upload_bytes_window: 0,
202            pending_requests: PendingRequests::new(),
203            incoming_requests: Vec::with_capacity(32),
204            ext_handshake: None,
205            supports_fast: false,
206            allowed_fast: HashSet::new(),
207            upload_only: false,
208            super_seed_assigned: None,
209            cmd_tx,
210            pipeline: PeerPipelineState::new(),
211            snubbed: false,
212            last_unchoked_at: None,
213            last_data_received: None,
214            connected_at: std::time::Instant::now(),
215            suggested_pieces: HashSet::new(),
216            source,
217            supports_holepunch: false,
218            appears_nated: false,
219            transport: None,
220            blocks_completed: 0,
221            blocks_timed_out: 0,
222            avg_rtt: None,
223            in_flight,
224            target_depth,
225            // M147: Peer starts choked (peer_choking: true), so choked_since reflects that.
226            choked_since: Some(std::time::Instant::now()),
227            live_since: None,
228            download_bytes_total: 0,
229        }
230    }
231}
232
233#[cfg(test)]
234mod tests {
235    use super::*;
236
237    #[test]
238    fn peer_source_serialization() {
239        let source = PeerSource::Tracker;
240        let json = serde_json::to_string(&source).unwrap();
241        assert_eq!(json, "\"Tracker\"");
242        let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
243        assert_eq!(roundtrip, PeerSource::Tracker);
244    }
245
246    #[test]
247    fn peer_source_all_variants() {
248        let variants = [
249            PeerSource::Tracker,
250            PeerSource::Dht,
251            PeerSource::Pex,
252            PeerSource::Lsd,
253            PeerSource::Incoming,
254            PeerSource::ResumeData,
255            PeerSource::I2p,
256        ];
257        for source in variants {
258            let json = serde_json::to_string(&source).unwrap();
259            let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
260            assert_eq!(roundtrip, source);
261        }
262    }
263
264    #[test]
265    fn peer_state_has_connected_at() {
266        let (tx, _rx) = tokio::sync::mpsc::channel(1);
267        let peer = PeerState::new(
268            "127.0.0.1:6881".parse().unwrap(),
269            100,
270            tx,
271            PeerSource::Tracker,
272            Arc::new(AtomicU32::new(0)),
273            Arc::new(AtomicU32::new(128)),
274        );
275        assert!(peer.connected_at.elapsed().as_secs() < 1);
276    }
277
278    #[test]
279    fn pending_requests_insert_remove() {
280        let mut pr = PendingRequests::new();
281        assert!(pr.is_empty());
282        assert_eq!(pr.len(), 0);
283
284        // Insert
285        pr.insert(5, 0, 16384);
286        pr.insert(5, 16384, 16384);
287        pr.insert(10, 0, 16384);
288        assert_eq!(pr.len(), 3);
289        assert!(pr.contains(5, 0));
290        assert!(pr.contains(10, 0));
291        assert!(!pr.contains(99, 0));
292
293        // Remove existing
294        assert_eq!(pr.remove(5, 0), Some(16384));
295        assert_eq!(pr.len(), 2);
296        assert!(!pr.contains(5, 0));
297
298        // Remove non-existent
299        assert_eq!(pr.remove(99, 0), None);
300
301        // Duplicate insert overwrites
302        pr.insert(5, 16384, 8192);
303        assert_eq!(pr.len(), 2); // same key, count unchanged
304        assert_eq!(pr.remove(5, 16384), Some(8192)); // new value
305
306        // Clear
307        pr.insert(1, 0, 16384);
308        pr.clear();
309        assert!(pr.is_empty());
310
311        // Iter
312        pr.insert(3, 0, 16384);
313        pr.insert(3, 16384, 16384);
314        let mut items: Vec<_> = pr.iter().collect();
315        items.sort();
316        assert_eq!(items, vec![(3, 0, 16384), (3, 16384, 16384)]);
317    }
318
319    #[test]
320    fn peer_source_i2p_serialization() {
321        let source = PeerSource::I2p;
322        let json = serde_json::to_string(&source).unwrap();
323        assert_eq!(json, "\"I2p\"");
324        let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
325        assert_eq!(roundtrip, PeerSource::I2p);
326    }
327
328    // ── M132: in_flight counter always available at construction ──
329
330    #[test]
331    fn in_flight_zero_at_construction() {
332        let (tx, _rx) = tokio::sync::mpsc::channel(1);
333        let peer = PeerState::new(
334            "127.0.0.1:6881".parse().unwrap(),
335            100,
336            tx,
337            PeerSource::Tracker,
338            Arc::new(AtomicU32::new(0)),
339            Arc::new(AtomicU32::new(128)),
340        );
341        assert_eq!(
342            peer.in_flight.load(std::sync::atomic::Ordering::Relaxed),
343            0,
344            "in_flight should be zero at construction"
345        );
346    }
347
348    #[test]
349    fn build_peer_info_reads_in_flight() {
350        // Verify that in_flight is read directly from the shared atomic counter.
351        let counter = Arc::new(AtomicU32::new(42));
352        let (tx, _rx) = tokio::sync::mpsc::channel(1);
353        let peer = PeerState::new(
354            "127.0.0.1:6881".parse().unwrap(),
355            100,
356            tx,
357            PeerSource::Tracker,
358            Arc::clone(&counter),
359            Arc::new(AtomicU32::new(128)),
360        );
361
362        // Simulate the build_peer_info logic — direct load, no Option.
363        let num_pending = peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
364
365        assert_eq!(
366            num_pending, 42,
367            "num_pending_requests should read from in_flight atomic"
368        );
369
370        // Mutate via the external Arc clone — PeerState sees the update.
371        counter.store(99, std::sync::atomic::Ordering::Relaxed);
372        let num_pending_updated =
373            peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
374        assert_eq!(
375            num_pending_updated, 99,
376            "PeerState should see updates via shared Arc"
377        );
378    }
379}