irontide-session 0.165.0

BitTorrent session management: peers, torrents, and piece selection
Documentation
//! Zero-lock shared state and message types for the three per-peer tasks.

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32};

use bytes::Bytes;
use tokio::sync::{Notify, Semaphore};

use irontide_core::Lengths;
use irontide_storage::Bitfield;
use irontide_wire::Message;

use crate::piece_reservation::{
    AtomicPieceStates, AvailabilitySnapshot, BlockMaps, PieceWriteGuards, StealCandidates,
};

/// Initial pipeline depth — matches `INITIAL_QUEUE_DEPTH` in `torrent_peer_handler`.
///
/// Permits are **not** granted at construction; they are added when the peer
/// unchokes us (via `semaphore.add_permits(INITIAL_QUEUE_DEPTH)`).
pub(crate) const INITIAL_QUEUE_DEPTH: usize = 128;

/// Zero-lock shared state between the three per-peer tasks.
///
/// Owned by `Arc<PeerShared>` — one instance per peer connection, shared among
/// the reader, writer, and requester tasks.
pub(crate) struct PeerShared {
    /// Pipeline depth gating — `INITIAL_QUEUE_DEPTH` permits.
    /// Requester acquires, reader returns via `add_permits(1)`.
    pub semaphore: Arc<Semaphore>,
    /// Set by reader on Choke/Unchoke messages. Read by requester.
    pub peer_choking: AtomicBool,
    /// Reader notifies requester when unchoked.
    pub unchoke_notify: Notify,
    /// Peer socket address (for logging and events).
    pub addr: SocketAddr,
    /// Number of in-flight block requests (incremented by requester on send,
    /// decremented by reader on Piece/RejectRequest, reset to 0 on Choke).
    /// Shared with `TorrentActor` via `PeerState` for accurate diagnostics.
    pub in_flight: Arc<AtomicU32>,
    /// M149: Dynamic pipeline depth target — requester gates on
    /// `in_flight < target_depth` before acquiring semaphore.
    pub target_depth: Arc<AtomicU32>,
    /// M149: Reader fires this when `in_flight` decrements (Piece/Reject/Choke),
    /// waking the requester's depth gate immediately instead of waiting for
    /// the 1s safety-net tick or TorrentActor event processing.
    pub depth_notify: Notify,
}

impl PeerShared {
    /// Create new shared state for a peer connection.
    ///
    /// The semaphore starts with **zero** permits — permits are added when
    /// the remote peer sends an Unchoke message.  `peer_choking` starts
    /// `true` (BitTorrent specification default).
    pub fn new(addr: SocketAddr, in_flight: Arc<AtomicU32>, target_depth: Arc<AtomicU32>) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(0)),
            peer_choking: AtomicBool::new(true),
            unchoke_notify: Notify::new(),
            addr,
            in_flight,
            target_depth,
            depth_notify: Notify::new(),
        }
    }
}

/// Messages sent through the writer channel from requester and reader.
pub(crate) enum OutgoingMessage {
    // -- From requester (hot path) --
    /// Request a block from the peer.
    Request {
        /// Piece index.
        index: u32,
        /// Byte offset within the piece.
        begin: u32,
        /// Block length in bytes.
        length: u32,
    },

    // -- From reader/handler --
    /// Announce that we have a piece.
    Have {
        /// Piece index.
        index: u32,
    },
    /// Cancel a previously-sent request.
    Cancel {
        /// Piece index.
        index: u32,
        /// Byte offset within the piece.
        begin: u32,
        /// Block length in bytes.
        length: u32,
    },
    /// Keep-alive (zero-length message).
    Keepalive,
    /// Express interest in the peer's pieces.
    Interested,
    /// Withdraw interest.
    NotInterested,
    /// Unchoke the remote peer.
    Unchoke,
    /// Choke the remote peer.
    Choke,

    // -- Extension and upload messages (cold path, heap-allocated) --
    /// Arbitrary wire message (extensions, upload pieces, etc.).
    Wire(Message<Bytes>),
}

/// Commands from reader to requester for dispatch state management.
pub(crate) enum DispatchCommand {
    /// Transition to requesting state — provides all dispatch resources.
    Start {
        /// Atomic piece states for CAS-based reservation.
        atomic_states: Arc<AtomicPieceStates>,
        /// Rarest-first availability snapshot.
        snapshot: Arc<AvailabilitySnapshot>,
        /// Per-block request/received tracking (if block stealing enabled).
        block_maps: Option<Arc<BlockMaps>>,
        /// Shared steal candidate queue (if block stealing enabled).
        steal_candidates: Option<Arc<StealCandidates>>,
        /// Per-piece write guards (prevents steal/write races).
        piece_write_guards: Option<Arc<PieceWriteGuards>>,
        /// Piece arithmetic (piece sizes, offsets, chunk size).
        lengths: Lengths,
        /// Copy of the peer's bitfield.
        peer_bitfield: Bitfield,
        /// Notification channel for new piece availability.
        piece_notify: Arc<Notify>,
    },
    /// Updated rarest-first snapshot from `TorrentActor`.
    Snapshot(Arc<AvailabilitySnapshot>),
    /// Peer sent Have — update local bitfield copy.
    PeerHave(u32),
    /// Peer sent Bitfield — replace local copy.
    PeerBitfield(Bitfield),
    /// Stop requesting — handled by requester_loop, constructed in tests
    /// and future production use (graceful shutdown).
    #[allow(dead_code)]
    Stop,
}