irontide-session 1.0.1

BitTorrent session management: peers, torrents, and piece selection
Documentation
//! Zero-lock shared state and message types for the three per-peer tasks.

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::time::Instant;

use bytes::Bytes;
use parking_lot::Mutex;
use tokio::sync::{Notify, Semaphore};

use irontide_core::Lengths;
use irontide_storage::Bitfield;
use irontide_wire::Message;

use crate::piece_reservation::{
    AtomicPieceStates, AvailabilitySnapshot, BlockMaps, PieceWriteGuards, StealCandidates,
};

/// Initial pipeline depth — matches `INITIAL_QUEUE_DEPTH` in `torrent_peer_handler`.
///
/// Permits are **not** granted at construction; they are added when the peer
/// unchokes us (via `semaphore.add_permits(INITIAL_QUEUE_DEPTH)`).
pub(crate) const INITIAL_QUEUE_DEPTH: usize = 128;

/// Zero-lock shared state between the three per-peer tasks.
///
/// Owned by `Arc<PeerShared>` — one instance per peer connection, shared among
/// the reader, writer, and requester tasks.
pub(crate) struct PeerShared {
    /// Pipeline depth gating — `INITIAL_QUEUE_DEPTH` permits.
    /// Requester acquires, reader returns via `add_permits(1)`.
    pub semaphore: Arc<Semaphore>,
    /// Set by reader on Choke/Unchoke messages. Read by requester.
    pub peer_choking: AtomicBool,
    /// Reader notifies requester when unchoked.
    pub unchoke_notify: Notify,
    /// Peer socket address (for logging and events).
    pub addr: SocketAddr,
    /// Number of in-flight block requests (incremented by requester on send,
    /// decremented by reader on Piece/RejectRequest, reset to 0 on Choke).
    /// Shared with `TorrentActor` via `PeerState` for accurate diagnostics.
    pub in_flight: Arc<AtomicU32>,
    #[allow(
        dead_code,
        reason = "shared Arc read via PeerState for debug/state API"
    )]
    pub target_depth: Arc<AtomicU32>,
    /// M182: pinged by `requester_loop` after every `dispatch_rx.recv()`.
    /// `reader_loop` waits on this in a select arm and re-runs
    /// `BackpressureQueue::try_drain_one(&dispatch_tx)` so a `dispatch_tx`
    /// burst never parks ARM 1 (the wire reader). Single-waiter,
    /// `notify_one`. See M182 audit §4.
    ///
    /// Internal to the per-peer task set (reader + requester) — kept as
    /// a plain `Notify` because both endpoints are reached through the
    /// same `Arc<PeerShared>`.
    pub dispatch_drain_notify: Notify,
    /// M182: pinged by `TorrentActor` after every `event_rx.recv()` for
    /// this peer. Same role for `event_tx` as `dispatch_drain_notify`
    /// for `dispatch_tx`, but cross-actor — `TorrentActor` holds a
    /// clone of this `Arc` on `PeerState.event_drain_notify` (the same
    /// `Arc` is also stored here so the reader can wait on it).
    pub event_drain_notify: Arc<Notify>,
    /// Sim-perf engine surface: shared session counters used by
    /// `reader_loop` to track per-peer high-water and drain stats. In
    /// production cloned from the `SessionActor`'s counters; in test
    /// code created on demand via `SessionCounters::new()` (cheap; not
    /// asserted on).
    pub counters: Arc<crate::stats::SessionCounters>,
    /// M182 dispatch-channel reader-side spill cap (default
    /// [`crate::peer_backpressure::DISPATCH_BACKLOG_CAP`]). The
    /// would-have-caught harness lowers this to reproduce the M182
    /// backlog-too-small regression class.
    pub dispatch_backlog_cap: usize,
    /// M182 event-channel reader-side spill cap (default
    /// [`crate::peer_backpressure::EVENT_BACKLOG_CAP`]).
    pub event_backlog_cap: usize,
    /// When the remote peer last unchoked us. Set in `on_unchoke`, cleared
    /// in `on_choke`. Used to measure unchoke duration and first-block
    /// latency for the `target_depth` feedback loop investigation.
    pub remote_unchoked_at: Mutex<Option<Instant>>,
    /// Set `true` on remote unchoke, cleared on the first Piece message
    /// received afterward. Gates the first-block latency measurement so
    /// it fires exactly once per unchoke cycle.
    pub first_block_pending: AtomicBool,
}

impl PeerShared {
    /// Create new shared state for a peer connection.
    ///
    /// The semaphore starts with **zero** permits — permits are added when
    /// the remote peer sends an Unchoke message.  `peer_choking` starts
    /// `true` (`BitTorrent` specification default).
    ///
    /// `event_drain_notify` is created at the peer spawn site so the
    /// `TorrentActor` can hold a clone alongside the
    /// `Arc<PeerShared>` on `PeerState`. Both refer to the same `Notify`.
    ///
    /// This shorthand uses fresh per-peer [`SessionCounters`] —
    /// production paths should prefer [`Self::new_with_counters`] so the
    /// sim-perf gauges aggregate across every `reader_loop` in the session.
    #[allow(
        dead_code,
        reason = "test helper used in `peer_tasks::tests` and `torrent_peer_handler::tests`"
    )]
    pub fn new(
        addr: SocketAddr,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
        event_drain_notify: Arc<Notify>,
    ) -> Self {
        Self::new_with_counters(
            addr,
            in_flight,
            target_depth,
            event_drain_notify,
            Arc::new(crate::stats::SessionCounters::new()),
        )
    }

    /// Like [`Self::new`] but uses the caller-supplied counters Arc.
    /// Production call sites pass the `SessionActor`'s counters so the
    /// sim-perf surface (high-water gauges + drain counters) reflects
    /// real session state.
    pub fn new_with_counters(
        addr: SocketAddr,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
        event_drain_notify: Arc<Notify>,
        counters: Arc<crate::stats::SessionCounters>,
    ) -> Self {
        Self::new_with_loop_config(
            addr,
            in_flight,
            target_depth,
            event_drain_notify,
            counters,
            crate::peer_backpressure::DISPATCH_BACKLOG_CAP,
            crate::peer_backpressure::EVENT_BACKLOG_CAP,
        )
    }

    /// Full constructor — also accepts the M182 spill caps and
    /// harness's would-have-caught regression demos to reproduce
    /// historical bugs (M182 cap=2, v0.186.1 fatal-overflow).
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_loop_config(
        addr: SocketAddr,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
        event_drain_notify: Arc<Notify>,
        counters: Arc<crate::stats::SessionCounters>,
        dispatch_backlog_cap: usize,
        event_backlog_cap: usize,
    ) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(0)),
            peer_choking: AtomicBool::new(true),
            unchoke_notify: Notify::new(),
            addr,
            in_flight,
            target_depth,
            dispatch_drain_notify: Notify::new(),
            event_drain_notify,
            counters,
            dispatch_backlog_cap,
            event_backlog_cap,
            remote_unchoked_at: Mutex::new(None),
            first_block_pending: AtomicBool::new(false),
        }
    }
}

/// Messages sent through the writer channel from requester and reader.
pub(crate) enum OutgoingMessage {
    // -- From requester (hot path) --
    /// Request a block from the peer.
    Request {
        /// Piece index.
        index: u32,
        /// Byte offset within the piece.
        begin: u32,
        /// Block length in bytes.
        length: u32,
    },

    // -- From reader/handler --
    /// Announce that we have a piece.
    Have {
        /// Piece index.
        index: u32,
    },
    /// Cancel a previously-sent request.
    Cancel {
        /// Piece index.
        index: u32,
        /// Byte offset within the piece.
        begin: u32,
        /// Block length in bytes.
        length: u32,
    },
    /// Keep-alive (zero-length message).
    Keepalive,
    /// Express interest in the peer's pieces.
    Interested,
    /// Withdraw interest.
    NotInterested,
    /// Unchoke the remote peer.
    Unchoke,
    /// Choke the remote peer.
    Choke,

    // -- Extension and upload messages (cold path, heap-allocated) --
    /// Arbitrary wire message (extensions, upload pieces, etc.).
    Wire(Message<Bytes>),
}

/// Commands from reader to requester for dispatch state management.
#[allow(dead_code)]
pub(crate) enum DispatchCommand {
    /// Transition to requesting state — provides dispatch resources.
    Start {
        // -- New M187 fields (always present) --
        /// Piece arithmetic (piece sizes, offsets, chunk size).
        lengths: Lengths,
        /// Copy of the peer's bitfield (kept for Have/Bitfield updates).
        peer_bitfield: Bitfield,
        /// Notification channel for new piece availability.
        piece_notify: Arc<Notify>,
        // -- Old CAS fields (only populated when use_actor_dispatch=false) --
        /// Shared atomic piece states for CAS reservation.
        atomic_states: Option<Arc<AtomicPieceStates>>,
        /// Initial availability snapshot for CAS dispatch.
        snapshot: Option<Arc<AvailabilitySnapshot>>,
        /// Block-level request/receipt tracking for steal visibility.
        block_maps: Option<Arc<BlockMaps>>,
        /// Queue of pieces available for block-level stealing.
        steal_candidates: Option<Arc<StealCandidates>>,
        /// Per-piece write guards for steal/write race prevention.
        piece_write_guards: Option<Arc<PieceWriteGuards>>,
    },
    /// Peer sent Have — update local bitfield copy.
    PeerHave(u32),
    /// Peer sent Bitfield — replace local copy.
    PeerBitfield(Bitfield),
    /// Updated availability snapshot (CAS path only).
    Snapshot(Arc<AvailabilitySnapshot>),
    /// Stop requesting — handled by `requester_loop`, constructed in tests
    /// and future production use (graceful shutdown).
    Stop,
}