irontide-session 1.0.1

BitTorrent session management: peers, torrents, and piece selection
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;

/// EWMA smoothing factor for RTT tracking (M106).
pub(crate) const RTT_EWMA_ALPHA: f64 = 0.3;

/// Compute one step of an exponential weighted moving average update.
///
/// `current` is the existing average, `sample` is the new observation, and
/// `alpha` controls responsiveness (higher = more weight to recent samples).
/// Returns the updated average.
pub(crate) fn ewma_update(current: f64, sample: f64, alpha: f64) -> f64 {
    alpha * sample + (1.0 - alpha) * current
}

use rustc_hash::{FxBuildHasher, FxHashMap};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

use irontide_storage::Bitfield;
use irontide_wire::ExtHandshake;

use crate::pipeline::PeerPipelineState;
use crate::types::PeerCommand;

/// O(1) lookup container for outstanding block requests to a peer.
///
/// Keyed on `(piece_index, block_offset)`, value is `block_length`.
/// Replaces `Vec<(u32, u32, u32)>` which required O(n) linear scan.
#[derive(Debug, Clone)]
pub(crate) struct PendingRequests {
    inner: FxHashMap<(u32, u32), u32>,
}

#[allow(dead_code)]
impl PendingRequests {
    pub fn new() -> Self {
        Self {
            inner: FxHashMap::with_capacity_and_hasher(32, FxBuildHasher),
        }
    }

    pub fn insert(&mut self, index: u32, begin: u32, length: u32) {
        self.inner.insert((index, begin), length);
    }

    pub fn remove(&mut self, index: u32, begin: u32) -> Option<u32> {
        self.inner.remove(&(index, begin))
    }

    pub fn contains(&self, index: u32, begin: u32) -> bool {
        self.inner.contains_key(&(index, begin))
    }

    pub fn len(&self) -> usize {
        self.inner.len()
    }

    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    pub fn clear(&mut self) {
        self.inner.clear();
    }

    pub fn iter(&self) -> impl Iterator<Item = (u32, u32, u32)> + '_ {
        self.inner
            .iter()
            .map(|(&(index, begin), &length)| (index, begin, length))
    }
}

/// Origin of a peer address.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum PeerSource {
    /// Returned by a tracker announce.
    Tracker,
    /// Discovered via the DHT.
    Dht,
    /// Received via Peer Exchange (BEP 11).
    Pex,
    /// Found via Local Service Discovery (BEP 14).
    Lsd,
    /// Connected to us (incoming connection).
    Incoming,
    /// Loaded from saved resume data.
    ResumeData,
    /// Discovered via I2P SAM bridge.
    I2p,
    /// Injected via the HTTP API.
    Api,
}

/// Per-peer state tracked by the torrent actor.
#[allow(dead_code)] // consumed by torrent module (not yet implemented)
pub(crate) struct PeerState {
    pub addr: SocketAddr,
    /// Peer is choking us (default: true).
    pub peer_choking: bool,
    /// Peer is interested in us (default: false).
    pub peer_interested: bool,
    /// We are choking the peer (default: true).
    pub am_choking: bool,
    /// We are interested in the peer (default: false).
    pub am_interested: bool,
    /// What pieces the peer has.
    pub bitfield: Bitfield,
    /// Download rate in bytes/sec (for choker).
    pub download_rate: u64,
    /// Upload rate in bytes/sec.
    pub upload_rate: u64,
    /// Bytes downloaded from this peer in current rate window.
    pub download_bytes_window: u64,
    /// Bytes uploaded to this peer in current rate window.
    pub upload_bytes_window: u64,
    /// Outstanding requests to this peer (index, begin, length).
    pub pending_requests: PendingRequests,
    /// Requests from this peer to us (index, begin, length).
    pub incoming_requests: Vec<(u32, u32, u32)>,
    /// Peer's extension handshake, if received.
    pub ext_handshake: Option<ExtHandshake>,
    /// Whether the peer supports BEP 6 Fast Extension.
    pub supports_fast: bool,
    /// Set of piece indices the peer is allowed to request while choked.
    pub allowed_fast: HashSet<u32>,
    /// BEP 21: peer declared upload-only status.
    pub upload_only: bool,
    /// BEP 16: piece index we revealed to this peer in super-seed mode.
    pub super_seed_assigned: Option<u32>,
    /// Channel to send commands to this peer's task.
    pub cmd_tx: mpsc::Sender<PeerCommand>,
    /// Per-peer dynamic request queue sizing (M28).
    pub pipeline: PeerPipelineState,
    /// Whether this peer is snubbed (no data for `snub_timeout_secs`).
    pub snubbed: bool,
    /// When this peer last unchoked us. Used for time-windowed rotation protection:
    /// peers unchoked within 30s are protected from choke rotation.
    pub last_unchoked_at: Option<std::time::Instant>,
    /// Last time we received data from this peer.
    pub last_data_received: Option<std::time::Instant>,
    /// When this peer connection was established.
    pub connected_at: std::time::Instant,
    /// BEP 6: pieces suggested by this peer.
    pub suggested_pieces: HashSet<u32>,
    /// How this peer was discovered.
    pub source: PeerSource,
    /// BEP 55: peer advertised `ut_holepunch` support in their extension handshake.
    pub supports_holepunch: bool,
    /// Whether this peer appears to be `NATed` (no incoming connections observed).
    pub appears_nated: bool,
    /// M174: Whether the connection is encrypted via MSE/PE (RC4).
    pub is_encrypted: bool,
    /// Transport protocol used for this peer connection.
    pub transport: Option<crate::rate_limiter::PeerTransport>,
    /// Number of successfully received blocks.
    pub blocks_completed: u64,
    /// Number of blocks that timed out.
    pub blocks_timed_out: u64,
    /// Exponentially weighted moving average of RTT in seconds.
    pub avg_rtt: Option<f64>,
    /// Shared atomic in-flight request counter from `PeerShared`.
    /// Created at the spawn site and shared with the peer task — always available.
    pub in_flight: Arc<AtomicU32>,
    /// M149: Dynamic per-peer pipeline depth target. Updated every 10s in
    /// `update_peer_rates()` based on download throughput. The requester task
    /// checks this before acquiring a semaphore permit.
    pub target_depth: Arc<AtomicU32>,
    /// M147: When the remote peer started choking us. Set on `PeerChoking { choking: true }`,
    /// cleared on `PeerChoking { choking: false }`. Used for eviction scoring.
    pub choked_since: Option<std::time::Instant>,
    /// M147: When this peer transitioned to Live (completed BT handshake).
    /// Used for 10-second grace period protection during eviction.
    pub live_since: Option<std::time::Instant>,
    /// M149: Cumulative bytes downloaded from this peer over the entire connection
    /// lifetime. Unlike `download_bytes_window` (reset every rate-calculation
    /// interval), this counter is monotonically increasing and never reset.
    /// Used by Pass 0 eviction: a peer with `download_bytes_total == 0` has
    /// never sent us any data.
    pub download_bytes_total: u64,
    /// M182: cross-actor wake for the reader's `event_tx`
    /// `BackpressureQueue`. `TorrentActor`'s `event_rx` drain loop pings
    /// this after every consumed event for this peer — the reader's
    /// outer `select!` waits on it via `Notified` to retry stalled
    /// sends. Same `Arc` as `PeerShared.event_drain_notify`.
    pub event_drain_notify: Arc<tokio::sync::Notify>,
    /// Cumulative time we had this peer unchoked (we = our session sending
    /// `Unchoke` to them). Updated at choke→unchoke / unchoke→choke
    /// transitions in `run_choker`. Flushed into `TorrentActor.unchoke_durations`
    /// on peer disconnect so the per-(SocketAddr × torrent) total survives
    /// reconnects.
    pub unchoke_duration_total: std::time::Duration,
    /// `Some(t)` when we are currently NOT choking this peer (we sent
    /// `Unchoke`); `None` while we are choking. The transition sites in
    /// `run_choker` swap this in/out of `Some` and accumulate the delta
    /// into `unchoke_duration_total` on every unchoke→choke flip.
    pub am_unchoke_started_at: Option<std::time::Instant>,
}

#[allow(dead_code)]
impl PeerState {
    pub fn new(
        addr: SocketAddr,
        bitfield_len: u32,
        cmd_tx: mpsc::Sender<PeerCommand>,
        source: PeerSource,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
        event_drain_notify: Arc<tokio::sync::Notify>,
    ) -> Self {
        Self {
            addr,
            peer_choking: true,
            peer_interested: false,
            am_choking: false, // M107: unconditional Unchoke is sent on connect
            am_interested: false,
            bitfield: Bitfield::new(bitfield_len),
            download_rate: 0,
            upload_rate: 0,
            download_bytes_window: 0,
            upload_bytes_window: 0,
            pending_requests: PendingRequests::new(),
            incoming_requests: Vec::with_capacity(32),
            ext_handshake: None,
            supports_fast: false,
            allowed_fast: HashSet::new(),
            upload_only: false,
            super_seed_assigned: None,
            cmd_tx,
            pipeline: PeerPipelineState::new(),
            snubbed: false,
            last_unchoked_at: None,
            last_data_received: None,
            connected_at: std::time::Instant::now(),
            suggested_pieces: HashSet::new(),
            source,
            supports_holepunch: false,
            appears_nated: false,
            is_encrypted: false,
            transport: None,
            blocks_completed: 0,
            blocks_timed_out: 0,
            avg_rtt: None,
            in_flight,
            target_depth,
            // M147: Peer starts choked (peer_choking: true), so choked_since reflects that.
            choked_since: Some(std::time::Instant::now()),
            live_since: None,
            download_bytes_total: 0,
            event_drain_notify,
            unchoke_duration_total: std::time::Duration::ZERO,
            // M107: peers start with `am_choking: false` (unconditional
            // unchoke on connect), so the unchoke window starts at
            // construction time. Choker transitions in `run_choker` will
            // toggle this on each subsequent flip.
            am_unchoke_started_at: Some(std::time::Instant::now()),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn peer_source_serialization() {
        let source = PeerSource::Tracker;
        let json = serde_json::to_string(&source).unwrap();
        assert_eq!(json, "\"Tracker\"");
        let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip, PeerSource::Tracker);
    }

    #[test]
    fn peer_source_all_variants() {
        let variants = [
            PeerSource::Tracker,
            PeerSource::Dht,
            PeerSource::Pex,
            PeerSource::Lsd,
            PeerSource::Incoming,
            PeerSource::ResumeData,
            PeerSource::I2p,
            PeerSource::Api,
        ];
        for source in variants {
            let json = serde_json::to_string(&source).unwrap();
            let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
            assert_eq!(roundtrip, source);
        }
    }

    #[test]
    fn peer_state_has_connected_at() {
        let (tx, _rx) = tokio::sync::mpsc::channel(1);
        let peer = PeerState::new(
            "127.0.0.1:6881".parse().unwrap(),
            100,
            tx,
            PeerSource::Tracker,
            Arc::new(AtomicU32::new(0)),
            Arc::new(AtomicU32::new(128)),
            Arc::new(tokio::sync::Notify::new()),
        );
        assert!(peer.connected_at.elapsed().as_secs() < 1);
    }

    #[test]
    fn pending_requests_insert_remove() {
        let mut pr = PendingRequests::new();
        assert!(pr.is_empty());
        assert_eq!(pr.len(), 0);

        // Insert
        pr.insert(5, 0, 16384);
        pr.insert(5, 16384, 16384);
        pr.insert(10, 0, 16384);
        assert_eq!(pr.len(), 3);
        assert!(pr.contains(5, 0));
        assert!(pr.contains(10, 0));
        assert!(!pr.contains(99, 0));

        // Remove existing
        assert_eq!(pr.remove(5, 0), Some(16384));
        assert_eq!(pr.len(), 2);
        assert!(!pr.contains(5, 0));

        // Remove non-existent
        assert_eq!(pr.remove(99, 0), None);

        // Duplicate insert overwrites
        pr.insert(5, 16384, 8192);
        assert_eq!(pr.len(), 2); // same key, count unchanged
        assert_eq!(pr.remove(5, 16384), Some(8192)); // new value

        // Clear
        pr.insert(1, 0, 16384);
        pr.clear();
        assert!(pr.is_empty());

        // Iter
        pr.insert(3, 0, 16384);
        pr.insert(3, 16384, 16384);
        let mut items: Vec<_> = pr.iter().collect();
        items.sort_unstable();
        assert_eq!(items, vec![(3, 0, 16384), (3, 16384, 16384)]);
    }

    #[test]
    fn peer_source_i2p_serialization() {
        let source = PeerSource::I2p;
        let json = serde_json::to_string(&source).unwrap();
        assert_eq!(json, "\"I2p\"");
        let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip, PeerSource::I2p);
    }

    // ── M132: in_flight counter always available at construction ──

    #[test]
    fn in_flight_zero_at_construction() {
        let (tx, _rx) = tokio::sync::mpsc::channel(1);
        let peer = PeerState::new(
            "127.0.0.1:6881".parse().unwrap(),
            100,
            tx,
            PeerSource::Tracker,
            Arc::new(AtomicU32::new(0)),
            Arc::new(AtomicU32::new(128)),
            Arc::new(tokio::sync::Notify::new()),
        );
        assert_eq!(
            peer.in_flight.load(std::sync::atomic::Ordering::Relaxed),
            0,
            "in_flight should be zero at construction"
        );
    }

    // ── Per-peer unchoke duration accumulator ─────────────────────────

    fn make_peer_state(addr_str: &str) -> PeerState {
        let (tx, _rx) = tokio::sync::mpsc::channel(1);
        PeerState::new(
            addr_str.parse().unwrap(),
            100,
            tx,
            PeerSource::Tracker,
            Arc::new(AtomicU32::new(0)),
            Arc::new(AtomicU32::new(128)),
            Arc::new(tokio::sync::Notify::new()),
        )
    }

    #[test]
    fn unchoke_duration_starts_zero_with_active_window() {
        let peer = make_peer_state("127.0.0.1:6881");
        assert_eq!(peer.unchoke_duration_total, std::time::Duration::ZERO);
        assert!(
            peer.am_unchoke_started_at.is_some(),
            "M107 starts peers unchoked, so the unchoke window opens at construction"
        );
    }

    #[test]
    fn unchoke_choke_unchoke_choke_accumulates() {
        // Simulate the run_choker transitions directly on the accumulator.
        let mut peer = make_peer_state("127.0.0.1:6881");
        // Force a known starting point so we're not measuring construction time.
        let t0 = std::time::Instant::now();
        peer.am_unchoke_started_at = Some(t0);
        peer.unchoke_duration_total = std::time::Duration::ZERO;

        // First choke after ~50 ms unchoked.
        std::thread::sleep(std::time::Duration::from_millis(50));
        if let Some(start) = peer.am_unchoke_started_at.take() {
            peer.unchoke_duration_total += start.elapsed();
        }
        assert!(peer.am_unchoke_started_at.is_none());
        let after_first_choke = peer.unchoke_duration_total;
        assert!(after_first_choke >= std::time::Duration::from_millis(40));

        // Re-unchoke and run for ~50 ms more.
        peer.am_unchoke_started_at = Some(std::time::Instant::now());
        std::thread::sleep(std::time::Duration::from_millis(50));
        if let Some(start) = peer.am_unchoke_started_at.take() {
            peer.unchoke_duration_total += start.elapsed();
        }
        assert!(
            peer.unchoke_duration_total > after_first_choke,
            "second window must extend the accumulator"
        );
        assert!(
            peer.unchoke_duration_total >= std::time::Duration::from_millis(80),
            "two ~50 ms windows must add to ≥80 ms — got {:?}",
            peer.unchoke_duration_total
        );
    }

    #[test]
    fn pure_choked_peer_has_zero_total() {
        let mut peer = make_peer_state("127.0.0.1:6881");
        // Cancel the construction-time auto-open so we model "never unchoked"
        // (peer connected, choker chose never to flip am_choking false).
        peer.am_unchoke_started_at = None;
        std::thread::sleep(std::time::Duration::from_millis(10));
        // No transition happens; total stays zero.
        assert_eq!(peer.unchoke_duration_total, std::time::Duration::ZERO);
    }

    #[test]
    fn build_peer_info_reads_in_flight() {
        // Verify that in_flight is read directly from the shared atomic counter.
        let counter = Arc::new(AtomicU32::new(42));
        let (tx, _rx) = tokio::sync::mpsc::channel(1);
        let peer = PeerState::new(
            "127.0.0.1:6881".parse().unwrap(),
            100,
            tx,
            PeerSource::Tracker,
            Arc::clone(&counter),
            Arc::new(AtomicU32::new(128)),
            Arc::new(tokio::sync::Notify::new()),
        );

        // Simulate the build_peer_info logic — direct load, no Option.
        let num_pending = peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;

        assert_eq!(
            num_pending, 42,
            "num_pending_requests should read from in_flight atomic"
        );

        // Mutate via the external Arc clone — PeerState sees the update.
        counter.store(99, std::sync::atomic::Ordering::Relaxed);
        let num_pending_updated =
            peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
        assert_eq!(
            num_pending_updated, 99,
            "PeerState should see updates via shared Arc"
        );
    }
}