//! irontide-session 0.165.0
//!
//! BitTorrent session management: peers, torrents, and piece selection
//! Documentation
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;

/// EWMA smoothing factor for RTT tracking (M106).
///
/// Presumably supplied as the `alpha` argument of `ewma_update` (confirm at
/// the call sites). With that formula, a value of 0.3 means each new sample
/// contributes 30% of the updated average and the previous average
/// contributes the remaining 70%.
pub(crate) const RTT_EWMA_ALPHA: f64 = 0.3;

/// Advance an exponentially weighted moving average by a single observation.
///
/// * `current` — the average before this observation.
/// * `sample` — the newly observed value.
/// * `alpha` — weight given to `sample`; the previous average is weighted by
///   `1.0 - alpha`, so larger values make the average follow recent samples
///   more closely.
///
/// Returns the blended average. `alpha` is used as given; no range check or
/// clamping is performed here.
pub(crate) fn ewma_update(current: f64, sample: f64, alpha: f64) -> f64 {
    // Contribution of the new observation.
    let fresh = alpha * sample;
    // Contribution carried over from the existing average.
    let retained = (1.0 - alpha) * current;
    fresh + retained
}

use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

use irontide_storage::Bitfield;
use irontide_wire::ExtHandshake;

use crate::pipeline::PeerPipelineState;
use crate::types::PeerCommand;

/// O(1) lookup container for outstanding block requests to a peer.
///
/// Keyed on `(piece_index, block_offset)`, value is `block_length`.
/// Replaces `Vec<(u32, u32, u32)>` which required O(n) linear scan.
///
/// Because the storage is a hash map, each `(piece_index, block_offset)` key
/// holds at most one length (inserting the same key again overwrites the
/// stored length) and iteration order is unspecified.
#[derive(Debug, Clone)]
pub(crate) struct PendingRequests {
    /// Maps `(piece_index, block_offset)` to the requested `block_length`.
    inner: FxHashMap<(u32, u32), u32>,
}

#[allow(dead_code)]
impl PendingRequests {
    /// Creates an empty request table with capacity reserved for at least
    /// 32 entries.
    pub fn new() -> Self {
        let inner = FxHashMap::with_capacity_and_hasher(32, Default::default());
        Self { inner }
    }

    /// Records a request for the block at `(index, begin)` with the given
    /// `length`. If that block is already recorded, its stored length is
    /// replaced.
    pub fn insert(&mut self, index: u32, begin: u32, length: u32) {
        let key = (index, begin);
        self.inner.insert(key, length);
    }

    /// Forgets the request for the block at `(index, begin)`.
    ///
    /// Returns the length that was stored for it, or `None` if no such
    /// request was recorded.
    pub fn remove(&mut self, index: u32, begin: u32) -> Option<u32> {
        let key = (index, begin);
        self.inner.remove(&key)
    }

    /// Reports whether a request for the block at `(index, begin)` is recorded.
    pub fn contains(&self, index: u32, begin: u32) -> bool {
        let key = (index, begin);
        self.inner.contains_key(&key)
    }

    /// Number of recorded requests.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// `true` when no requests are recorded.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Drops every recorded request.
    pub fn clear(&mut self) {
        self.inner.clear();
    }

    /// Iterates over the recorded requests as `(index, begin, length)`
    /// triples. The order is whatever the underlying hash map yields and
    /// should not be relied upon.
    pub fn iter(&self) -> impl Iterator<Item = (u32, u32, u32)> + '_ {
        self.inner
            .iter()
            .map(|(key, length)| (key.0, key.1, *length))
    }
}

/// Origin of a peer address.
///
/// Records which discovery mechanism produced a peer. The type derives
/// `Serialize`/`Deserialize`; the tests in this file show the JSON form is
/// the bare variant name (e.g. `"Tracker"`, `"I2p"`), so renaming a variant
/// changes the serialized representation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum PeerSource {
    /// Returned by a tracker announce.
    Tracker,
    /// Discovered via the DHT.
    Dht,
    /// Received via Peer Exchange (BEP 11).
    Pex,
    /// Found via Local Service Discovery (BEP 14).
    Lsd,
    /// Connected to us (incoming connection).
    Incoming,
    /// Loaded from saved resume data.
    ResumeData,
    /// Discovered via I2P SAM bridge.
    I2p,
}

/// Per-peer state tracked by the torrent actor.
///
/// One value exists per connected peer and is created through
/// `PeerState::new`. The "initially" notes on the fields below describe the
/// values that constructor assigns.
#[allow(dead_code)] // consumed by torrent module (not yet implemented)
pub(crate) struct PeerState {
    /// Socket address of the peer.
    pub addr: SocketAddr,
    /// Peer is choking us (initially `true`).
    pub peer_choking: bool,
    /// Peer is interested in us (initially `false`).
    pub peer_interested: bool,
    /// We are choking the peer. Initially `false`: per M107 an unconditional
    /// Unchoke is sent on connect, so the constructor does not start choked.
    pub am_choking: bool,
    /// We are interested in the peer (initially `false`).
    pub am_interested: bool,
    /// What pieces the peer has.
    pub bitfield: Bitfield,
    /// Download rate in bytes/sec (for choker).
    pub download_rate: u64,
    /// Upload rate in bytes/sec.
    pub upload_rate: u64,
    /// Bytes downloaded from this peer in current rate window.
    pub download_bytes_window: u64,
    /// Bytes uploaded to this peer in current rate window.
    pub upload_bytes_window: u64,
    /// Outstanding requests to this peer (index, begin, length).
    pub pending_requests: PendingRequests,
    /// Requests from this peer to us (index, begin, length).
    pub incoming_requests: Vec<(u32, u32, u32)>,
    /// Peer's extension handshake, if received.
    pub ext_handshake: Option<ExtHandshake>,
    /// Whether the peer supports BEP 6 Fast Extension.
    pub supports_fast: bool,
    /// Set of piece indices the peer is allowed to request while choked.
    pub allowed_fast: HashSet<u32>,
    /// BEP 21: peer declared upload-only status.
    pub upload_only: bool,
    /// BEP 16: piece index we revealed to this peer in super-seed mode.
    pub super_seed_assigned: Option<u32>,
    /// Channel to send commands to this peer's task.
    pub cmd_tx: mpsc::Sender<PeerCommand>,
    /// Per-peer dynamic request queue sizing (M28).
    pub pipeline: PeerPipelineState,
    /// Whether this peer is snubbed (no data for snub_timeout_secs).
    pub snubbed: bool,
    /// When this peer last unchoked us. Used for time-windowed rotation protection:
    /// peers unchoked within 30s are protected from choke rotation.
    pub last_unchoked_at: Option<std::time::Instant>,
    /// Last time we received data from this peer (`None` until data arrives).
    pub last_data_received: Option<std::time::Instant>,
    /// When this peer connection was established. Set to the current instant
    /// when the `PeerState` is constructed.
    pub connected_at: std::time::Instant,
    /// BEP 6: pieces suggested by this peer.
    pub suggested_pieces: HashSet<u32>,
    /// How this peer was discovered.
    pub source: PeerSource,
    /// BEP 55: peer advertised `ut_holepunch` support in their extension handshake.
    pub supports_holepunch: bool,
    /// Whether this peer appears to be NATed (no incoming connections observed).
    pub appears_nated: bool,
    /// Transport protocol used for this peer connection (`None` until set).
    pub transport: Option<crate::rate_limiter::PeerTransport>,
    /// Number of successfully received blocks.
    pub blocks_completed: u64,
    /// Number of blocks that timed out.
    pub blocks_timed_out: u64,
    /// Exponentially weighted moving average of RTT in seconds
    /// (`None` until a first value is recorded).
    pub avg_rtt: Option<f64>,
    /// Shared atomic in-flight request counter from `PeerShared`.
    /// Created at the spawn site and shared with the peer task — always available.
    pub in_flight: Arc<AtomicU32>,
    /// M149: Dynamic per-peer pipeline depth target. Updated every 10s in
    /// `update_peer_rates()` based on download throughput. The requester task
    /// checks this before acquiring a semaphore permit.
    pub target_depth: Arc<AtomicU32>,
    /// M147: When the remote peer started choking us. Set on `PeerChoking { choking: true }`,
    /// cleared on `PeerChoking { choking: false }`. Used for eviction scoring.
    /// Initially `Some(now)` because a new peer starts with `peer_choking: true`.
    pub choked_since: Option<std::time::Instant>,
    /// M147: When this peer transitioned to Live (completed BT handshake).
    /// Used for 10-second grace period protection during eviction.
    /// Initially `None`.
    pub live_since: Option<std::time::Instant>,
    /// M149: Cumulative bytes downloaded from this peer over the entire connection
    /// lifetime. Unlike `download_bytes_window` (reset every rate-calculation
    /// interval), this counter is monotonically increasing and never reset.
    /// Used by Pass 0 eviction: a peer with `download_bytes_total == 0` has
    /// never sent us any data.
    pub download_bytes_total: u64,
}

#[allow(dead_code)]
impl PeerState {
    /// Builds the bookkeeping record for a freshly connected peer.
    ///
    /// * `addr` — the peer's socket address.
    /// * `bitfield_len` — length handed to `Bitfield::new` for the peer's
    ///   piece bitfield.
    /// * `cmd_tx` — command channel to the peer's task.
    /// * `source` — how the peer was discovered.
    /// * `in_flight` / `target_depth` — shared atomic counters stored as-is.
    ///
    /// Every other field starts from a fixed default: counters and rates at
    /// zero, flags `false` except `peer_choking`, optional values `None`
    /// except `choked_since`, and `connected_at` set to the current instant.
    pub fn new(
        addr: SocketAddr,
        bitfield_len: u32,
        cmd_tx: mpsc::Sender<PeerCommand>,
        source: PeerSource,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
    ) -> Self {
        use std::time::Instant;

        // Constructed values, built up front in field-declaration order.
        let bitfield = Bitfield::new(bitfield_len);
        let pending_requests = PendingRequests::new();
        let incoming_requests = Vec::with_capacity(32);
        let allowed_fast = HashSet::new();
        let pipeline = PeerPipelineState::new();
        let connected_at = Instant::now();
        let suggested_pieces = HashSet::new();
        // M147: Peer starts choked (peer_choking: true), so choked_since reflects that.
        let choked_since = Some(Instant::now());

        Self {
            // Identity and plumbing supplied by the caller.
            addr,
            source,
            cmd_tx,
            in_flight,
            target_depth,

            // Choke / interest state.
            peer_choking: true,
            peer_interested: false,
            am_choking: false, // M107: unconditional Unchoke is sent on connect
            am_interested: false,
            choked_since,
            last_unchoked_at: None,
            snubbed: false,

            // Piece knowledge and request bookkeeping.
            bitfield,
            pending_requests,
            incoming_requests,
            allowed_fast,
            suggested_pieces,
            super_seed_assigned: None,
            pipeline,

            // Capabilities learned later from handshakes and observation.
            ext_handshake: None,
            supports_fast: false,
            upload_only: false,
            supports_holepunch: false,
            appears_nated: false,
            transport: None,

            // Transfer statistics.
            download_rate: 0,
            upload_rate: 0,
            download_bytes_window: 0,
            upload_bytes_window: 0,
            download_bytes_total: 0,
            blocks_completed: 0,
            blocks_timed_out: 0,
            avg_rtt: None,

            // Connection timeline.
            connected_at,
            last_data_received: None,
            live_since: None,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Builds a tracker-sourced `PeerState` on a loopback address with a
    /// 100-entry bitfield and a target depth of 128, using `in_flight` as the
    /// shared counter. The receiver is returned so the command channel stays
    /// open for the duration of the test.
    fn make_peer(
        in_flight: Arc<AtomicU32>,
    ) -> (PeerState, tokio::sync::mpsc::Receiver<PeerCommand>) {
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(1);
        let peer = PeerState::new(
            "127.0.0.1:6881".parse().unwrap(),
            100,
            cmd_tx,
            PeerSource::Tracker,
            in_flight,
            Arc::new(AtomicU32::new(128)),
        );
        (peer, cmd_rx)
    }

    #[test]
    fn peer_source_serialization() {
        let encoded = serde_json::to_string(&PeerSource::Tracker).unwrap();
        assert_eq!(encoded, "\"Tracker\"");

        let decoded: PeerSource = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, PeerSource::Tracker);
    }

    #[test]
    fn peer_source_all_variants() {
        // Every variant must survive a JSON round trip unchanged.
        let every_variant = [
            PeerSource::Tracker,
            PeerSource::Dht,
            PeerSource::Pex,
            PeerSource::Lsd,
            PeerSource::Incoming,
            PeerSource::ResumeData,
            PeerSource::I2p,
        ];
        for &variant in &every_variant {
            let encoded = serde_json::to_string(&variant).unwrap();
            let decoded: PeerSource = serde_json::from_str(&encoded).unwrap();
            assert_eq!(decoded, variant);
        }
    }

    #[test]
    fn peer_state_has_connected_at() {
        let (peer, _rx) = make_peer(Arc::new(AtomicU32::new(0)));
        let age_secs = peer.connected_at.elapsed().as_secs();
        assert!(age_secs < 1);
    }

    #[test]
    fn pending_requests_insert_remove() {
        let mut table = PendingRequests::new();
        assert!(table.is_empty());
        assert_eq!(table.len(), 0);

        // Insert three distinct blocks.
        table.insert(5, 0, 16384);
        table.insert(5, 16384, 16384);
        table.insert(10, 0, 16384);
        assert_eq!(table.len(), 3);
        assert!(table.contains(5, 0));
        assert!(table.contains(10, 0));
        assert!(!table.contains(99, 0));

        // Removing a recorded block yields its length.
        let removed = table.remove(5, 0);
        assert_eq!(removed, Some(16384));
        assert_eq!(table.len(), 2);
        assert!(!table.contains(5, 0));

        // Removing an unknown block yields nothing.
        let missing = table.remove(99, 0);
        assert_eq!(missing, None);

        // Re-inserting an existing key overwrites the stored length.
        table.insert(5, 16384, 8192);
        assert_eq!(table.len(), 2); // same key, count unchanged
        assert_eq!(table.remove(5, 16384), Some(8192)); // new value

        // Clearing empties the table.
        table.insert(1, 0, 16384);
        table.clear();
        assert!(table.is_empty());

        // Iteration yields every entry; sort because order is unspecified.
        table.insert(3, 0, 16384);
        table.insert(3, 16384, 16384);
        let mut seen: Vec<(u32, u32, u32)> = table.iter().collect();
        seen.sort();
        assert_eq!(seen, vec![(3, 0, 16384), (3, 16384, 16384)]);
    }

    #[test]
    fn peer_source_i2p_serialization() {
        let encoded = serde_json::to_string(&PeerSource::I2p).unwrap();
        assert_eq!(encoded, "\"I2p\"");

        let decoded: PeerSource = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, PeerSource::I2p);
    }

    // ── M132: in_flight counter always available at construction ──

    #[test]
    fn in_flight_zero_at_construction() {
        let (peer, _rx) = make_peer(Arc::new(AtomicU32::new(0)));
        let observed = peer.in_flight.load(Ordering::Relaxed);
        assert_eq!(observed, 0, "in_flight should be zero at construction");
    }

    #[test]
    fn build_peer_info_reads_in_flight() {
        // The peer must read in_flight straight from the shared atomic counter.
        let shared = Arc::new(AtomicU32::new(42));
        let (peer, _rx) = make_peer(Arc::clone(&shared));

        // Mirrors the build_peer_info logic: a direct load, no Option.
        let num_pending = peer.in_flight.load(Ordering::Relaxed) as usize;
        assert_eq!(
            num_pending, 42,
            "num_pending_requests should read from in_flight atomic"
        );

        // A store through the external Arc clone is visible via PeerState.
        shared.store(99, Ordering::Relaxed);
        let num_pending_updated = peer.in_flight.load(Ordering::Relaxed) as usize;
        assert_eq!(
            num_pending_updated, 99,
            "PeerState should see updates via shared Arc"
        );
    }
}