1use std::collections::HashSet;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU32;
5
6pub(crate) const RTT_EWMA_ALPHA: f64 = 0.3;
8
9pub(crate) fn ewma_update(current: f64, sample: f64, alpha: f64) -> f64 {
15 alpha * sample + (1.0 - alpha) * current
16}
17
18use rustc_hash::FxHashMap;
19use serde::{Deserialize, Serialize};
20use tokio::sync::mpsc;
21
22use irontide_storage::Bitfield;
23use irontide_wire::ExtHandshake;
24
25use crate::pipeline::PeerPipelineState;
26use crate::types::PeerCommand;
27
28#[derive(Debug, Clone)]
33pub(crate) struct PendingRequests {
34 inner: FxHashMap<(u32, u32), u32>,
35}
36
37#[allow(dead_code)]
38impl PendingRequests {
39 pub fn new() -> Self {
40 Self {
41 inner: FxHashMap::with_capacity_and_hasher(32, Default::default()),
42 }
43 }
44
45 pub fn insert(&mut self, index: u32, begin: u32, length: u32) {
46 self.inner.insert((index, begin), length);
47 }
48
49 pub fn remove(&mut self, index: u32, begin: u32) -> Option<u32> {
50 self.inner.remove(&(index, begin))
51 }
52
53 pub fn contains(&self, index: u32, begin: u32) -> bool {
54 self.inner.contains_key(&(index, begin))
55 }
56
57 pub fn len(&self) -> usize {
58 self.inner.len()
59 }
60
61 pub fn is_empty(&self) -> bool {
62 self.inner.is_empty()
63 }
64
65 pub fn clear(&mut self) {
66 self.inner.clear();
67 }
68
69 pub fn iter(&self) -> impl Iterator<Item = (u32, u32, u32)> + '_ {
70 self.inner
71 .iter()
72 .map(|(&(index, begin), &length)| (index, begin, length))
73 }
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
78pub enum PeerSource {
79 Tracker,
81 Dht,
83 Pex,
85 Lsd,
87 Incoming,
89 ResumeData,
91 I2p,
93}
94
95#[allow(dead_code)] pub(crate) struct PeerState {
98 pub addr: SocketAddr,
99 pub peer_choking: bool,
101 pub peer_interested: bool,
103 pub am_choking: bool,
105 pub am_interested: bool,
107 pub bitfield: Bitfield,
109 pub download_rate: u64,
111 pub upload_rate: u64,
113 pub download_bytes_window: u64,
115 pub upload_bytes_window: u64,
117 pub pending_requests: PendingRequests,
119 pub incoming_requests: Vec<(u32, u32, u32)>,
121 pub ext_handshake: Option<ExtHandshake>,
123 pub supports_fast: bool,
125 pub allowed_fast: HashSet<u32>,
127 pub upload_only: bool,
129 pub super_seed_assigned: Option<u32>,
131 pub cmd_tx: mpsc::Sender<PeerCommand>,
133 pub pipeline: PeerPipelineState,
135 pub snubbed: bool,
137 pub last_unchoked_at: Option<std::time::Instant>,
140 pub last_data_received: Option<std::time::Instant>,
142 pub connected_at: std::time::Instant,
144 pub suggested_pieces: HashSet<u32>,
146 pub source: PeerSource,
148 pub supports_holepunch: bool,
150 pub appears_nated: bool,
152 pub transport: Option<crate::rate_limiter::PeerTransport>,
154 pub blocks_completed: u64,
156 pub blocks_timed_out: u64,
158 pub avg_rtt: Option<f64>,
160 pub in_flight: Arc<AtomicU32>,
163 pub target_depth: Arc<AtomicU32>,
167 pub choked_since: Option<std::time::Instant>,
170 pub live_since: Option<std::time::Instant>,
173 pub download_bytes_total: u64,
179}
180
181#[allow(dead_code)]
182impl PeerState {
183 pub fn new(
184 addr: SocketAddr,
185 bitfield_len: u32,
186 cmd_tx: mpsc::Sender<PeerCommand>,
187 source: PeerSource,
188 in_flight: Arc<AtomicU32>,
189 target_depth: Arc<AtomicU32>,
190 ) -> Self {
191 Self {
192 addr,
193 peer_choking: true,
194 peer_interested: false,
195 am_choking: false, am_interested: false,
197 bitfield: Bitfield::new(bitfield_len),
198 download_rate: 0,
199 upload_rate: 0,
200 download_bytes_window: 0,
201 upload_bytes_window: 0,
202 pending_requests: PendingRequests::new(),
203 incoming_requests: Vec::with_capacity(32),
204 ext_handshake: None,
205 supports_fast: false,
206 allowed_fast: HashSet::new(),
207 upload_only: false,
208 super_seed_assigned: None,
209 cmd_tx,
210 pipeline: PeerPipelineState::new(),
211 snubbed: false,
212 last_unchoked_at: None,
213 last_data_received: None,
214 connected_at: std::time::Instant::now(),
215 suggested_pieces: HashSet::new(),
216 source,
217 supports_holepunch: false,
218 appears_nated: false,
219 transport: None,
220 blocks_completed: 0,
221 blocks_timed_out: 0,
222 avg_rtt: None,
223 in_flight,
224 target_depth,
225 choked_since: Some(std::time::Instant::now()),
227 live_since: None,
228 download_bytes_total: 0,
229 }
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236
237 #[test]
238 fn peer_source_serialization() {
239 let source = PeerSource::Tracker;
240 let json = serde_json::to_string(&source).unwrap();
241 assert_eq!(json, "\"Tracker\"");
242 let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
243 assert_eq!(roundtrip, PeerSource::Tracker);
244 }
245
246 #[test]
247 fn peer_source_all_variants() {
248 let variants = [
249 PeerSource::Tracker,
250 PeerSource::Dht,
251 PeerSource::Pex,
252 PeerSource::Lsd,
253 PeerSource::Incoming,
254 PeerSource::ResumeData,
255 PeerSource::I2p,
256 ];
257 for source in variants {
258 let json = serde_json::to_string(&source).unwrap();
259 let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
260 assert_eq!(roundtrip, source);
261 }
262 }
263
264 #[test]
265 fn peer_state_has_connected_at() {
266 let (tx, _rx) = tokio::sync::mpsc::channel(1);
267 let peer = PeerState::new(
268 "127.0.0.1:6881".parse().unwrap(),
269 100,
270 tx,
271 PeerSource::Tracker,
272 Arc::new(AtomicU32::new(0)),
273 Arc::new(AtomicU32::new(128)),
274 );
275 assert!(peer.connected_at.elapsed().as_secs() < 1);
276 }
277
278 #[test]
279 fn pending_requests_insert_remove() {
280 let mut pr = PendingRequests::new();
281 assert!(pr.is_empty());
282 assert_eq!(pr.len(), 0);
283
284 pr.insert(5, 0, 16384);
286 pr.insert(5, 16384, 16384);
287 pr.insert(10, 0, 16384);
288 assert_eq!(pr.len(), 3);
289 assert!(pr.contains(5, 0));
290 assert!(pr.contains(10, 0));
291 assert!(!pr.contains(99, 0));
292
293 assert_eq!(pr.remove(5, 0), Some(16384));
295 assert_eq!(pr.len(), 2);
296 assert!(!pr.contains(5, 0));
297
298 assert_eq!(pr.remove(99, 0), None);
300
301 pr.insert(5, 16384, 8192);
303 assert_eq!(pr.len(), 2); assert_eq!(pr.remove(5, 16384), Some(8192)); pr.insert(1, 0, 16384);
308 pr.clear();
309 assert!(pr.is_empty());
310
311 pr.insert(3, 0, 16384);
313 pr.insert(3, 16384, 16384);
314 let mut items: Vec<_> = pr.iter().collect();
315 items.sort();
316 assert_eq!(items, vec![(3, 0, 16384), (3, 16384, 16384)]);
317 }
318
319 #[test]
320 fn peer_source_i2p_serialization() {
321 let source = PeerSource::I2p;
322 let json = serde_json::to_string(&source).unwrap();
323 assert_eq!(json, "\"I2p\"");
324 let roundtrip: PeerSource = serde_json::from_str(&json).unwrap();
325 assert_eq!(roundtrip, PeerSource::I2p);
326 }
327
328 #[test]
331 fn in_flight_zero_at_construction() {
332 let (tx, _rx) = tokio::sync::mpsc::channel(1);
333 let peer = PeerState::new(
334 "127.0.0.1:6881".parse().unwrap(),
335 100,
336 tx,
337 PeerSource::Tracker,
338 Arc::new(AtomicU32::new(0)),
339 Arc::new(AtomicU32::new(128)),
340 );
341 assert_eq!(
342 peer.in_flight.load(std::sync::atomic::Ordering::Relaxed),
343 0,
344 "in_flight should be zero at construction"
345 );
346 }
347
348 #[test]
349 fn build_peer_info_reads_in_flight() {
350 let counter = Arc::new(AtomicU32::new(42));
352 let (tx, _rx) = tokio::sync::mpsc::channel(1);
353 let peer = PeerState::new(
354 "127.0.0.1:6881".parse().unwrap(),
355 100,
356 tx,
357 PeerSource::Tracker,
358 Arc::clone(&counter),
359 Arc::new(AtomicU32::new(128)),
360 );
361
362 let num_pending = peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
364
365 assert_eq!(
366 num_pending, 42,
367 "num_pending_requests should read from in_flight atomic"
368 );
369
370 counter.store(99, std::sync::atomic::Ordering::Relaxed);
372 let num_pending_updated =
373 peer.in_flight.load(std::sync::atomic::Ordering::Relaxed) as usize;
374 assert_eq!(
375 num_pending_updated, 99,
376 "PeerState should see updates via shared Arc"
377 );
378 }
379}