1use std::collections::HashSet;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU32;
5
/// Smoothing factor for round-trip-time exponentially weighted moving averages.
///
/// Shaped to be passed as the `alpha` argument of [`ewma_update`]: with `0.3`
/// each new sample contributes 30% and the running average keeps 70%.
/// NOTE(review): the call site is outside this file — confirm this is where
/// the constant is consumed.
pub(crate) const RTT_EWMA_ALPHA: f64 = 0.3;
8
/// Folds one new `sample` into an exponentially weighted moving average.
///
/// Returns `alpha * sample + (1.0 - alpha) * current`: `alpha` is the weight
/// given to the new sample and `1.0 - alpha` the weight kept by the running
/// value. `alpha` is not validated or clamped.
pub(crate) fn ewma_update(current: f64, sample: f64, alpha: f64) -> f64 {
    let fresh_part = alpha * sample;
    let retained_part = (1.0 - alpha) * current;
    fresh_part + retained_part
}
17
18use rustc_hash::{FxBuildHasher, FxHashMap};
19use tokio::sync::mpsc;
20
21use irontide_storage::Bitfield;
22use irontide_wire::ExtHandshake;
23
24use crate::pipeline::PeerPipelineState;
25use crate::types::PeerCommand;
26
27#[derive(Debug, Clone)]
32pub(crate) struct PendingRequests {
33 inner: FxHashMap<(u32, u32), u32>,
34}
35
36#[allow(dead_code)]
37impl PendingRequests {
38 pub fn new() -> Self {
39 Self {
40 inner: FxHashMap::with_capacity_and_hasher(32, FxBuildHasher),
41 }
42 }
43
44 pub fn insert(&mut self, index: u32, begin: u32, length: u32) {
45 self.inner.insert((index, begin), length);
46 }
47
48 pub fn remove(&mut self, index: u32, begin: u32) -> Option<u32> {
49 self.inner.remove(&(index, begin))
50 }
51
52 pub fn contains(&self, index: u32, begin: u32) -> bool {
53 self.inner.contains_key(&(index, begin))
54 }
55
56 pub fn len(&self) -> usize {
57 self.inner.len()
58 }
59
60 pub fn is_empty(&self) -> bool {
61 self.inner.is_empty()
62 }
63
64 pub fn clear(&mut self) {
65 self.inner.clear();
66 }
67
68 pub fn iter(&self) -> impl Iterator<Item = (u32, u32, u32)> + '_ {
69 self.inner
70 .iter()
71 .map(|(&(index, begin), &length)| (index, begin, length))
72 }
73}
74
75pub use irontide_session_types::PeerSource;
78
79#[allow(dead_code)] pub(crate) struct PeerState {
82 pub addr: SocketAddr,
83 pub peer_choking: bool,
85 pub peer_interested: bool,
87 pub am_choking: bool,
89 pub am_interested: bool,
91 pub bitfield: Bitfield,
93 pub download_rate: u64,
95 pub upload_rate: u64,
97 pub download_bytes_window: u64,
99 pub upload_bytes_window: u64,
101 pub pending_requests: PendingRequests,
103 pub incoming_requests: Vec<(u32, u32, u32)>,
105 pub ext_handshake: Option<ExtHandshake>,
107 pub supports_fast: bool,
109 pub allowed_fast: HashSet<u32>,
111 pub upload_only: bool,
113 pub super_seed_assigned: Option<u32>,
115 pub cmd_tx: mpsc::Sender<PeerCommand>,
117 pub pipeline: PeerPipelineState,
119 pub snubbed: bool,
121 pub last_unchoked_at: Option<std::time::Instant>,
124 pub last_data_received: Option<std::time::Instant>,
126 pub connected_at: std::time::Instant,
128 pub suggested_pieces: HashSet<u32>,
130 pub source: PeerSource,
132 pub supports_holepunch: bool,
134 pub appears_nated: bool,
136 pub is_encrypted: bool,
138 pub transport: Option<crate::rate_limiter::PeerTransport>,
140 pub blocks_completed: u64,
142 pub blocks_timed_out: u64,
144 pub avg_rtt: Option<f64>,
146 pub in_flight: Arc<AtomicU32>,
149 pub target_depth: Arc<AtomicU32>,
153 pub choked_since: Option<std::time::Instant>,
156 pub live_since: Option<std::time::Instant>,
159 pub download_bytes_total: u64,
165 pub event_drain_notify: Arc<tokio::sync::Notify>,
171 pub unchoke_duration_total: std::time::Duration,
177 pub am_unchoke_started_at: Option<std::time::Instant>,
182}
183
184#[allow(dead_code)]
185impl PeerState {
186 pub fn new(
187 addr: SocketAddr,
188 bitfield_len: u32,
189 cmd_tx: mpsc::Sender<PeerCommand>,
190 source: PeerSource,
191 in_flight: Arc<AtomicU32>,
192 target_depth: Arc<AtomicU32>,
193 event_drain_notify: Arc<tokio::sync::Notify>,
194 ) -> Self {
195 Self {
196 addr,
197 peer_choking: true,
198 peer_interested: false,
199 am_choking: false, am_interested: false,
201 bitfield: Bitfield::new(bitfield_len),
202 download_rate: 0,
203 upload_rate: 0,
204 download_bytes_window: 0,
205 upload_bytes_window: 0,
206 pending_requests: PendingRequests::new(),
207 incoming_requests: Vec::with_capacity(32),
208 ext_handshake: None,
209 supports_fast: false,
210 allowed_fast: HashSet::new(),
211 upload_only: false,
212 super_seed_assigned: None,
213 cmd_tx,
214 pipeline: PeerPipelineState::new(),
215 snubbed: false,
216 last_unchoked_at: None,
217 last_data_received: None,
218 connected_at: std::time::Instant::now(),
219 suggested_pieces: HashSet::new(),
220 source,
221 supports_holepunch: false,
222 appears_nated: false,
223 is_encrypted: false,
224 transport: None,
225 blocks_completed: 0,
226 blocks_timed_out: 0,
227 avg_rtt: None,
228 in_flight,
229 target_depth,
230 choked_since: Some(std::time::Instant::now()),
232 live_since: None,
233 download_bytes_total: 0,
234 event_drain_notify,
235 unchoke_duration_total: std::time::Duration::ZERO,
236 am_unchoke_started_at: Some(std::time::Instant::now()),
241 }
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::Ordering;
    use std::time::{Duration, Instant};

    /// Address used for every peer built by these tests.
    const TEST_ADDR: &str = "127.0.0.1:6881";

    /// Builds a tracker-sourced `PeerState` around a caller-supplied
    /// in-flight counter, so a test can keep its own handle to that counter.
    fn peer_with_in_flight(addr_str: &str, in_flight: Arc<AtomicU32>) -> PeerState {
        let (tx, _rx) = tokio::sync::mpsc::channel(1);
        PeerState::new(
            addr_str.parse().unwrap(),
            100,
            tx,
            PeerSource::Tracker,
            in_flight,
            Arc::new(AtomicU32::new(128)),
            Arc::new(tokio::sync::Notify::new()),
        )
    }

    /// Builds a tracker-sourced `PeerState` with a fresh in-flight counter at zero.
    fn make_peer_state(addr_str: &str) -> PeerState {
        peer_with_in_flight(addr_str, Arc::new(AtomicU32::new(0)))
    }

    /// Closes the open unchoke window, if any, adding its length to the total.
    fn close_unchoke_window(peer: &mut PeerState) {
        if let Some(opened_at) = peer.am_unchoke_started_at.take() {
            peer.unchoke_duration_total += opened_at.elapsed();
        }
    }

    #[test]
    fn peer_source_serialization() {
        let original = PeerSource::Tracker;
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, "\"Tracker\"");

        let decoded = serde_json::from_str::<PeerSource>(&encoded).unwrap();
        assert_eq!(decoded, PeerSource::Tracker);
    }

    #[test]
    fn peer_source_all_variants() {
        let variants = [
            PeerSource::Tracker,
            PeerSource::Dht,
            PeerSource::Pex,
            PeerSource::Lsd,
            PeerSource::Incoming,
            PeerSource::ResumeData,
            PeerSource::I2p,
            PeerSource::Api,
        ];

        // Every variant must survive a JSON round trip unchanged.
        for source in &variants {
            let encoded = serde_json::to_string(source).unwrap();
            let decoded = serde_json::from_str::<PeerSource>(&encoded).unwrap();
            assert_eq!(&decoded, source);
        }
    }

    #[test]
    fn peer_state_has_connected_at() {
        let peer = make_peer_state(TEST_ADDR);
        assert!(peer.connected_at.elapsed() < Duration::from_secs(1));
    }

    #[test]
    fn pending_requests_insert_remove() {
        const BLOCK_LEN: u32 = 16384;

        let mut pending = PendingRequests::new();
        assert!(pending.is_empty());
        assert_eq!(pending.len(), 0);

        // Three distinct keys.
        pending.insert(5, 0, BLOCK_LEN);
        pending.insert(5, BLOCK_LEN, BLOCK_LEN);
        pending.insert(10, 0, BLOCK_LEN);
        assert_eq!(pending.len(), 3);
        assert!(pending.contains(5, 0));
        assert!(pending.contains(10, 0));
        assert!(!pending.contains(99, 0));

        // Removing a present key yields its length.
        assert_eq!(pending.remove(5, 0), Some(BLOCK_LEN));
        assert_eq!(pending.len(), 2);
        assert!(!pending.contains(5, 0));

        // Removing an absent key yields nothing.
        assert_eq!(pending.remove(99, 0), None);

        // Re-inserting an existing key overwrites the stored length.
        pending.insert(5, BLOCK_LEN, 8192);
        assert_eq!(pending.len(), 2);
        assert_eq!(pending.remove(5, BLOCK_LEN), Some(8192));

        pending.insert(1, 0, BLOCK_LEN);
        pending.clear();
        assert!(pending.is_empty());

        // Iteration order is unspecified, so sort before comparing.
        pending.insert(3, 0, BLOCK_LEN);
        pending.insert(3, BLOCK_LEN, BLOCK_LEN);
        let mut items: Vec<_> = pending.iter().collect();
        items.sort_unstable();
        assert_eq!(items, vec![(3, 0, BLOCK_LEN), (3, BLOCK_LEN, BLOCK_LEN)]);
    }

    #[test]
    fn peer_source_i2p_serialization() {
        let original = PeerSource::I2p;
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(encoded, "\"I2p\"");

        let decoded = serde_json::from_str::<PeerSource>(&encoded).unwrap();
        assert_eq!(decoded, PeerSource::I2p);
    }

    #[test]
    fn in_flight_zero_at_construction() {
        let peer = make_peer_state(TEST_ADDR);
        let observed = peer.in_flight.load(Ordering::Relaxed);
        assert_eq!(observed, 0, "in_flight should be zero at construction");
    }

    #[test]
    fn unchoke_duration_starts_zero_with_active_window() {
        let peer = make_peer_state(TEST_ADDR);
        assert_eq!(peer.unchoke_duration_total, Duration::ZERO);
        assert!(
            peer.am_unchoke_started_at.is_some(),
            "M107 starts peers unchoked, so the unchoke window opens at construction"
        );
    }

    #[test]
    fn unchoke_choke_unchoke_choke_accumulates() {
        let window = Duration::from_millis(50);
        let mut peer = make_peer_state(TEST_ADDR);

        // First unchoke window: opened explicitly so the measurement starts here.
        let t0 = Instant::now();
        peer.am_unchoke_started_at = Some(t0);
        peer.unchoke_duration_total = Duration::ZERO;

        std::thread::sleep(window);
        close_unchoke_window(&mut peer);
        assert!(peer.am_unchoke_started_at.is_none());
        let after_first_choke = peer.unchoke_duration_total;
        assert!(after_first_choke >= Duration::from_millis(40));

        // Second unchoke window.
        peer.am_unchoke_started_at = Some(Instant::now());
        std::thread::sleep(window);
        close_unchoke_window(&mut peer);
        assert!(
            peer.unchoke_duration_total > after_first_choke,
            "second window must extend the accumulator"
        );
        assert!(
            peer.unchoke_duration_total >= Duration::from_millis(80),
            "two ~50 ms windows must add to ≥80 ms — got {:?}",
            peer.unchoke_duration_total
        );
    }

    #[test]
    fn pure_choked_peer_has_zero_total() {
        let mut peer = make_peer_state(TEST_ADDR);
        // No open window: elapsed wall-clock time must not show up in the total.
        peer.am_unchoke_started_at = None;
        std::thread::sleep(Duration::from_millis(10));
        assert_eq!(peer.unchoke_duration_total, Duration::ZERO);
    }

    #[test]
    fn build_peer_info_reads_in_flight() {
        let counter = Arc::new(AtomicU32::new(42));
        let peer = peer_with_in_flight(TEST_ADDR, Arc::clone(&counter));

        let num_pending = peer.in_flight.load(Ordering::Relaxed) as usize;
        assert_eq!(
            num_pending, 42,
            "num_pending_requests should read from in_flight atomic"
        );

        // A store through the test's own handle must be visible via the peer.
        counter.store(99, Ordering::Relaxed);
        let num_pending_updated = peer.in_flight.load(Ordering::Relaxed) as usize;
        assert_eq!(
            num_pending_updated, 99,
            "PeerState should see updates via shared Arc"
        );
    }
}