1use std::collections::HashSet;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU32;
5
6pub(crate) const RTT_EWMA_ALPHA: f64 = 0.3;
8
9pub(crate) fn ewma_update(current: f64, sample: f64, alpha: f64) -> f64 {
15 alpha * sample + (1.0 - alpha) * current
16}
17
18use rustc_hash::{FxBuildHasher, FxHashMap};
19use serde::{Deserialize, Serialize};
20use tokio::sync::mpsc;
21
22use irontide_storage::Bitfield;
23use irontide_wire::ExtHandshake;
24
25use crate::pipeline::PeerPipelineState;
26use crate::types::PeerCommand;
27
28#[derive(Debug, Clone)]
33pub(crate) struct PendingRequests {
34 inner: FxHashMap<(u32, u32), u32>,
35}
36
37#[allow(dead_code)]
38impl PendingRequests {
39 pub fn new() -> Self {
40 Self {
41 inner: FxHashMap::with_capacity_and_hasher(32, FxBuildHasher),
42 }
43 }
44
45 pub fn insert(&mut self, index: u32, begin: u32, length: u32) {
46 self.inner.insert((index, begin), length);
47 }
48
49 pub fn remove(&mut self, index: u32, begin: u32) -> Option<u32> {
50 self.inner.remove(&(index, begin))
51 }
52
53 pub fn contains(&self, index: u32, begin: u32) -> bool {
54 self.inner.contains_key(&(index, begin))
55 }
56
57 pub fn len(&self) -> usize {
58 self.inner.len()
59 }
60
61 pub fn is_empty(&self) -> bool {
62 self.inner.is_empty()
63 }
64
65 pub fn clear(&mut self) {
66 self.inner.clear();
67 }
68
69 pub fn iter(&self) -> impl Iterator<Item = (u32, u32, u32)> + '_ {
70 self.inner
71 .iter()
72 .map(|(&(index, begin), &length)| (index, begin, length))
73 }
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
78pub enum PeerSource {
79 Tracker,
81 Dht,
83 Pex,
85 Lsd,
87 Incoming,
89 ResumeData,
91 I2p,
93 Api,
95}
96
97#[allow(dead_code)] pub(crate) struct PeerState {
100 pub addr: SocketAddr,
101 pub peer_choking: bool,
103 pub peer_interested: bool,
105 pub am_choking: bool,
107 pub am_interested: bool,
109 pub bitfield: Bitfield,
111 pub download_rate: u64,
113 pub upload_rate: u64,
115 pub download_bytes_window: u64,
117 pub upload_bytes_window: u64,
119 pub pending_requests: PendingRequests,
121 pub incoming_requests: Vec<(u32, u32, u32)>,
123 pub ext_handshake: Option<ExtHandshake>,
125 pub supports_fast: bool,
127 pub allowed_fast: HashSet<u32>,
129 pub upload_only: bool,
131 pub super_seed_assigned: Option<u32>,
133 pub cmd_tx: mpsc::Sender<PeerCommand>,
135 pub pipeline: PeerPipelineState,
137 pub snubbed: bool,
139 pub last_unchoked_at: Option<std::time::Instant>,
142 pub last_data_received: Option<std::time::Instant>,
144 pub connected_at: std::time::Instant,
146 pub suggested_pieces: HashSet<u32>,
148 pub source: PeerSource,
150 pub supports_holepunch: bool,
152 pub appears_nated: bool,
154 pub is_encrypted: bool,
156 pub transport: Option<crate::rate_limiter::PeerTransport>,
158 pub blocks_completed: u64,
160 pub blocks_timed_out: u64,
162 pub avg_rtt: Option<f64>,
164 pub in_flight: Arc<AtomicU32>,
167 pub target_depth: Arc<AtomicU32>,
171 pub choked_since: Option<std::time::Instant>,
174 pub live_since: Option<std::time::Instant>,
177 pub download_bytes_total: u64,
183 pub event_drain_notify: Arc<tokio::sync::Notify>,
189 pub unchoke_duration_total: std::time::Duration,
195 pub am_unchoke_started_at: Option<std::time::Instant>,
200}
201
202#[allow(dead_code)]
203impl PeerState {
204 pub fn new(
205 addr: SocketAddr,
206 bitfield_len: u32,
207 cmd_tx: mpsc::Sender<PeerCommand>,
208 source: PeerSource,
209 in_flight: Arc<AtomicU32>,
210 target_depth: Arc<AtomicU32>,
211 event_drain_notify: Arc<tokio::sync::Notify>,
212 ) -> Self {
213 Self {
214 addr,
215 peer_choking: true,
216 peer_interested: false,
217 am_choking: false, am_interested: false,
219 bitfield: Bitfield::new(bitfield_len),
220 download_rate: 0,
221 upload_rate: 0,
222 download_bytes_window: 0,
223 upload_bytes_window: 0,
224 pending_requests: PendingRequests::new(),
225 incoming_requests: Vec::with_capacity(32),
226 ext_handshake: None,
227 supports_fast: false,
228 allowed_fast: HashSet::new(),
229 upload_only: false,
230 super_seed_assigned: None,
231 cmd_tx,
232 pipeline: PeerPipelineState::new(),
233 snubbed: false,
234 last_unchoked_at: None,
235 last_data_received: None,
236 connected_at: std::time::Instant::now(),
237 suggested_pieces: HashSet::new(),
238 source,
239 supports_holepunch: false,
240 appears_nated: false,
241 is_encrypted: false,
242 transport: None,
243 blocks_completed: 0,
244 blocks_timed_out: 0,
245 avg_rtt: None,
246 in_flight,
247 target_depth,
248 choked_since: Some(std::time::Instant::now()),
250 live_since: None,
251 download_bytes_total: 0,
252 event_drain_notify,
253 unchoke_duration_total: std::time::Duration::ZERO,
254 am_unchoke_started_at: Some(std::time::Instant::now()),
259 }
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266
267 #[test]
268 fn peer_source_serialization() {
269 let source = PeerSource::Tracker;
270 let json = serde_json::to_string(&source).unwrap();
271 assert_eq!(json, "\"Tracker\"");
272 let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
273 assert_eq!(roundtrip, PeerSource::Tracker);
274 }
275
276 #[test]
277 fn peer_source_all_variants() {
278 let variants = [
279 PeerSource::Tracker,
280 PeerSource::Dht,
281 PeerSource::Pex,
282 PeerSource::Lsd,
283 PeerSource::Incoming,
284 PeerSource::ResumeData,
285 PeerSource::I2p,
286 PeerSource::Api,
287 ];
288 for source in variants {
289 let json = serde_json::to_string(&source).unwrap();
290 let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
291 assert_eq!(roundtrip, source);
292 }
293 }
294
295 #[test]
296 fn peer_state_has_connected_at() {
297 let (tx, _rx) = tokio::sync::mpsc::channel(1);
298 let peer = PeerState::new(
299 "127.0.0.1:6881".parse().unwrap(),
300 100,
301 tx,
302 PeerSource::Tracker,
303 Arc::new(AtomicU32::new(0)),
304 Arc::new(AtomicU32::new(128)),
305 Arc::new(tokio::sync::Notify::new()),
306 );
307 assert!(peer.connected_at.elapsed().as_secs() < 1);
308 }
309
310 #[test]
311 fn pending_requests_insert_remove() {
312 let mut pr = PendingRequests::new();
313 assert!(pr.is_empty());
314 assert_eq!(pr.len(), 0);
315
316 pr.insert(5, 0, 16384);
318 pr.insert(5, 16384, 16384);
319 pr.insert(10, 0, 16384);
320 assert_eq!(pr.len(), 3);
321 assert!(pr.contains(5, 0));
322 assert!(pr.contains(10, 0));
323 assert!(!pr.contains(99, 0));
324
325 assert_eq!(pr.remove(5, 0), Some(16384));
327 assert_eq!(pr.len(), 2);
328 assert!(!pr.contains(5, 0));
329
330 assert_eq!(pr.remove(99, 0), None);
332
333 pr.insert(5, 16384, 8192);
335 assert_eq!(pr.len(), 2); assert_eq!(pr.remove(5, 16384), Some(8192)); pr.insert(1, 0, 16384);
340 pr.clear();
341 assert!(pr.is_empty());
342
343 pr.insert(3, 0, 16384);
345 pr.insert(3, 16384, 16384);
346 let mut items: Vec<_> = pr.iter().collect();
347 items.sort_unstable();
348 assert_eq!(items, vec![(3, 0, 16384), (3, 16384, 16384)]);
349 }
350
351 #[test]
352 fn peer_source_i2p_serialization() {
353 let source = PeerSource::I2p;
354 let json = serde_json::to_string(&source).unwrap();
355 assert_eq!(json, "\"I2p\"");
356 let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
357 assert_eq!(roundtrip, PeerSource::I2p);
358 }
359
360 #[test]
363 fn in_flight_zero_at_construction() {
364 let (tx, _rx) = tokio::sync::mpsc::channel(1);
365 let peer = PeerState::new(
366 "127.0.0.1:6881".parse().unwrap(),
367 100,
368 tx,
369 PeerSource::Tracker,
370 Arc::new(AtomicU32::new(0)),
371 Arc::new(AtomicU32::new(128)),
372 Arc::new(tokio::sync::Notify::new()),
373 );
374 assert_eq!(
375 peer.in_flight.load(std::sync::atomic::Ordering::Relaxed),
376 0,
377 "in_flight should be zero at construction"
378 );
379 }
380
381 fn make_peer_state(addr_str: &str) -> PeerState {
384 let (tx, _rx) = tokio::sync::mpsc::channel(1);
385 PeerState::new(
386 addr_str.parse().unwrap(),
387 100,
388 tx,
389 PeerSource::Tracker,
390 Arc::new(AtomicU32::new(0)),
391 Arc::new(AtomicU32::new(128)),
392 Arc::new(tokio::sync::Notify::new()),
393 )
394 }
395
396 #[test]
397 fn unchoke_duration_starts_zero_with_active_window() {
398 let peer = make_peer_state("127.0.0.1:6881");
399 assert_eq!(peer.unchoke_duration_total, std::time::Duration::ZERO);
400 assert!(
401 peer.am_unchoke_started_at.is_some(),
402 "M107 starts peers unchoked, so the unchoke window opens at construction"
403 );
404 }
405
406 #[test]
407 fn unchoke_choke_unchoke_choke_accumulates() {
408 let mut peer = make_peer_state("127.0.0.1:6881");
410 let t0 = std::time::Instant::now();
412 peer.am_unchoke_started_at = Some(t0);
413 peer.unchoke_duration_total = std::time::Duration::ZERO;
414
415 std::thread::sleep(std::time::Duration::from_millis(50));
417 if let Some(start) = peer.am_unchoke_started_at.take() {
418 peer.unchoke_duration_total += start.elapsed();
419 }
420 assert!(peer.am_unchoke_started_at.is_none());
421 let after_first_choke = peer.unchoke_duration_total;
422 assert!(after_first_choke >= std::time::Duration::from_millis(40));
423
424 peer.am_unchoke_started_at = Some(std::time::Instant::now());
426 std::thread::sleep(std::time::Duration::from_millis(50));
427 if let Some(start) = peer.am_unchoke_started_at.take() {
428 peer.unchoke_duration_total += start.elapsed();
429 }
430 assert!(
431 peer.unchoke_duration_total > after_first_choke,
432 "second window must extend the accumulator"
433 );
434 assert!(
435 peer.unchoke_duration_total >= std::time::Duration::from_millis(80),
436 "two ~50 ms windows must add to ≥80 ms — got {:?}",
437 peer.unchoke_duration_total
438 );
439 }
440
441 #[test]
442 fn pure_choked_peer_has_zero_total() {
443 let mut peer = make_peer_state("127.0.0.1:6881");
444 peer.am_unchoke_started_at = None;
447 std::thread::sleep(std::time::Duration::from_millis(10));
448 assert_eq!(peer.unchoke_duration_total, std::time::Duration::ZERO);
450 }
451
452 #[test]
453 fn build_peer_info_reads_in_flight() {
454 let counter = Arc::new(AtomicU32::new(42));
456 let (tx, _rx) = tokio::sync::mpsc::channel(1);
457 let peer = PeerState::new(
458 "127.0.0.1:6881".parse().unwrap(),
459 100,
460 tx,
461 PeerSource::Tracker,
462 Arc::clone(&counter),
463 Arc::new(AtomicU32::new(128)),
464 Arc::new(tokio::sync::Notify::new()),
465 );
466
467 let num_pending = peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
469
470 assert_eq!(
471 num_pending, 42,
472 "num_pending_requests should read from in_flight atomic"
473 );
474
475 counter.store(99, std::sync::atomic::Ordering::Relaxed);
477 let num_pending_updated =
478 peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
479 assert_eq!(
480 num_pending_updated, 99,
481 "PeerState should see updates via shared Arc"
482 );
483 }
484}