irontide-engine 1.2.1

IronTide engine runtime: the per-torrent actor, peer I/O loops, and the non-leaf engine infrastructure (disk I/O, tracker management, alerts, streaming, extensions, SSL) — the renamed irontide-torrent-actor
Documentation
//! Zero-lock shared state and message types for the three per-peer tasks.

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::time::Instant;

use parking_lot::Mutex;
use tokio::sync::{Notify, Semaphore};

use irontide_core::Lengths;
use irontide_storage::Bitfield;

use crate::piece_reservation::{
    AtomicPieceStates, AvailabilitySnapshot, BlockMaps, PieceWriteGuards, StealCandidates,
};

/// Initial pipeline depth, in semaphore permits — matches
/// `INITIAL_QUEUE_DEPTH` in `torrent_peer_handler`.
///
/// Permits are **not** granted at construction; they are added when the peer
/// unchokes us (via `semaphore.add_permits(INITIAL_QUEUE_DEPTH)`).
///
/// NOTE(review): one permit appears to correspond to one outstanding block
/// request (one "pipeline slot") — confirm against the requester loop.
pub(crate) const INITIAL_QUEUE_DEPTH: usize = 128;

/// M257c: decide how many permits to hand back when a block/reject frees
/// one pipeline slot.
///
/// * `granted` — the peer's depth AS IF the in-hand permit had already been
///   returned (`in_flight + available + 1`).
/// * `target` — the allocator's quota for this peer.
///
/// Returns `0` to absorb the permit (depth shrinks by one), `1` to hold the
/// current depth, or `2` to grow by one toward the target.
#[inline]
pub(crate) fn refill_grant(granted: u32, target: u32) -> u32 {
    // Over quota: swallow the permit so the depth decays toward the target.
    if granted > target {
        return 0;
    }
    // Under quota: return the permit plus one extra.
    if granted < target {
        return 2;
    }
    // Exactly on quota: plain one-for-one return.
    1
}

/// M257c: budget-aware return of one pipeline slot, used at the reader's
/// return sites in place of a bare `add_permits(1)`.
///
/// Single-writer: only the reader task calls this.
///
/// The granted depth is recomputed from the live atomics on every call
/// (`in_flight + available_permits + 1`), so the scheme is self-correcting
/// across choke cycles: `on_unchoke`'s drain+refill resets reality each
/// cycle, and the bounded transient overshoot from dangling pre-choke
/// returns is absorbed back to target at block-return rate (see the M257c
/// plan, invariant 8).
///
/// Depending on [`refill_grant`], the call either adds 1 or 2 permits to the
/// semaphore, or adds none and bumps the
/// `BUDGET_PERMITS_ABSORBED_TOTAL` diagnostic counter instead.
pub(crate) fn return_permit_budgeted(shared: &PeerShared) {
    use std::sync::atomic::Ordering::Relaxed;

    // A permit count that does not fit in u32 is clamped; the saturating
    // adds below then keep the sum pinned at u32::MAX.
    let idle_permits = u32::try_from(shared.semaphore.available_permits()).unwrap_or(u32::MAX);
    let outstanding = shared.in_flight.load(Relaxed);
    let depth_if_returned = outstanding
        .saturating_add(idle_permits)
        .saturating_add(1);
    let quota = shared.target_depth.load(Relaxed);

    match refill_grant(depth_if_returned, quota) {
        0 => {
            // Over quota: the slot is swallowed, only the diagnostic moves.
            shared
                .counters
                .inc_diag(crate::stats::BUDGET_PERMITS_ABSORBED_TOTAL, 1);
        }
        permits => {
            shared.semaphore.add_permits(permits as usize);
        }
    }
}

/// Zero-lock shared state between the three per-peer tasks.
///
/// Owned by `Arc<PeerShared>` — one instance per peer connection, shared among
/// the reader, writer, and requester tasks.
///
/// Most fields are atomics or `Notify` handles. The two exceptions are
/// `remote_unchoked_at` and `request_sent_times`, which are guarded by a
/// `Mutex`.
pub(crate) struct PeerShared {
    /// Pipeline depth gating. Constructed with zero permits; per the
    /// `INITIAL_QUEUE_DEPTH` docs, permits are added when the peer unchokes
    /// us. Requester acquires; the reader returns slots through
    /// `return_permit_budgeted`, which may add 0, 1, or 2 permits per
    /// returned slot.
    pub semaphore: Arc<Semaphore>,
    /// Set by reader on Choke/Unchoke messages. Read by requester.
    pub peer_choking: AtomicBool,
    /// Reader notifies requester when unchoked.
    pub unchoke_notify: Notify,
    /// Peer socket address (for logging and events).
    pub addr: SocketAddr,
    /// Number of in-flight block requests (incremented by requester on send,
    /// decremented by reader on Piece/RejectRequest, reset to 0 on Choke).
    /// Shared with `TorrentActor` via `PeerState` for accurate diagnostics.
    pub in_flight: Arc<AtomicU32>,
    /// Pipeline depth quota for this peer (the `target` fed to
    /// `refill_grant`). Loaded by `return_permit_budgeted` on every slot
    /// return to decide whether to absorb, hold, or grow.
    ///
    /// NOTE(review): the `dead_code` allowance below predates that read and
    /// may no longer be needed — confirm before removing.
    #[allow(
        dead_code,
        reason = "shared Arc read via PeerState for debug/state API"
    )]
    pub target_depth: Arc<AtomicU32>,
    /// M182: pinged by `requester_loop` after every `dispatch_rx.recv()`.
    /// `reader_loop` waits on this in a select arm and re-runs
    /// `BackpressureQueue::try_drain_one(&dispatch_tx)` so a `dispatch_tx`
    /// burst never parks ARM 1 (the wire reader). Single-waiter,
    /// `notify_one`. See M182 audit §4.
    ///
    /// Internal to the per-peer task set (reader + requester) — kept as
    /// a plain `Notify` because both endpoints are reached through the
    /// same `Arc<PeerShared>`.
    pub dispatch_drain_notify: Notify,
    /// M182: pinged by `TorrentActor` after every `event_rx.recv()` for
    /// this peer. Same role for `event_tx` as `dispatch_drain_notify`
    /// for `dispatch_tx`, but cross-actor — `TorrentActor` holds a
    /// clone of this `Arc` on `PeerState.event_drain_notify` (the same
    /// `Arc` is also stored here so the reader can wait on it).
    pub event_drain_notify: Arc<Notify>,
    /// Sim-perf engine surface: shared session counters used by
    /// `reader_loop` to track per-peer high-water and drain stats. In
    /// production cloned from the `SessionActor`'s counters; in test
    /// code created on demand via `SessionCounters::new()` (cheap; not
    /// asserted on).
    pub counters: Arc<crate::stats::SessionCounters>,
    /// M182 dispatch-channel reader-side spill cap (default
    /// [`crate::peer_backpressure::DISPATCH_BACKLOG_CAP`]). The
    /// would-have-caught harness lowers this to reproduce the M182
    /// backlog-too-small regression class.
    pub dispatch_backlog_cap: usize,
    /// M182 event-channel reader-side spill cap (default
    /// [`crate::peer_backpressure::EVENT_BACKLOG_CAP`]).
    pub event_backlog_cap: usize,
    /// When the remote peer last unchoked us. Set in `on_unchoke`, cleared
    /// in `on_choke`. Used to measure unchoke duration and first-block
    /// latency for the `target_depth` feedback loop investigation.
    /// Starts as `None`.
    pub remote_unchoked_at: Mutex<Option<Instant>>,
    /// Set `true` on remote unchoke, cleared on the first Piece message
    /// received afterward. Gates the first-block latency measurement so
    /// it fires exactly once per unchoke cycle. Starts `false`.
    pub first_block_pending: AtomicBool,
    /// M257f: `(piece, begin)` → send time for RTT measurement. The
    /// requester inserts on a successfully queued Request — but ONLY
    /// when fewer than [`RTT_PROBE_MAX_INFLIGHT`] blocks were already
    /// in flight at send time (probe-gating); the reader removes on
    /// Piece receipt and ships the RTT to the actor on the block batch
    /// (feeding the `avg_rtt` EWMA the BDP depth caps read). The
    /// pre-M104 peer task did exactly this within one task; post-M243
    /// the requester and reader are separate tasks, hence the mutex
    /// (two uncontended touches per probe block). Entries for blocks
    /// that never arrive (endgame cancels) are bounded by the probe
    /// gate and dropped with the task; a re-request overwrites its key.
    pub request_sent_times: Mutex<std::collections::HashMap<(u32, u32), Instant>>,
}

/// Probe gate for RTT sampling, expressed as a count of in-flight blocks.
///
/// Only requests sent with fewer than this many blocks already in
/// flight get RTT-timed. A request queued behind a deep pipeline
/// measures sojourn (queue drain + wire), not propagation — by
/// Little's law a saturated pipe's sojourn satisfies
/// `rate × sojourn ≈ in-flight` exactly, which turns the BDP formula
/// `rate × rtt / 16 KiB` into the tautology `cap ≈ current depth`
/// (M257f evidence run 2: every profile self-limited at whatever depth
/// the swarm dynamics happened to visit). Low-queue sends bound the
/// self-queue bias to `(gate − 1) × 16 KiB / rate` — exact on
/// high-BDP links, ≤ a few × propagation on very low-BDP ones, and
/// always biased UPWARD (a larger cap is the safe direction for
/// throughput). Samples recur whenever the pipe drains: connection
/// ramp, post-choke refill, endgame tails.
///
/// See `PeerShared::request_sent_times` for where the gated send times
/// are recorded and consumed.
pub(crate) const RTT_PROBE_MAX_INFLIGHT: u32 = 8;

impl PeerShared {
    /// Build shared state for one peer connection using default spill caps
    /// and a private counters instance.
    ///
    /// The semaphore starts with **zero** permits — permits are added when
    /// the remote peer sends an Unchoke message. `peer_choking` starts
    /// `true` (`BitTorrent` specification default).
    ///
    /// `event_drain_notify` is created at the peer spawn site so the
    /// `TorrentActor` can hold a clone alongside the
    /// `Arc<PeerShared>` on `PeerState`. Both refer to the same `Notify`.
    ///
    /// This shorthand allocates fresh per-peer
    /// [`crate::stats::SessionCounters`] — production paths should prefer
    /// [`Self::new_with_counters`] so the sim-perf gauges aggregate across
    /// every `reader_loop` in the session.
    #[allow(
        dead_code,
        reason = "test helper used in `peer_tasks::tests` and `torrent_peer_handler::tests`"
    )]
    pub fn new(
        addr: SocketAddr,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
        event_drain_notify: Arc<Notify>,
    ) -> Self {
        // Nothing aggregates these counters; they exist only to satisfy the
        // full constructor.
        let private_counters = Arc::new(crate::stats::SessionCounters::new());
        Self::new_with_counters(
            addr,
            in_flight,
            target_depth,
            event_drain_notify,
            private_counters,
        )
    }

    /// Same as [`Self::new`], except the counters `Arc` is supplied by the
    /// caller. Production call sites pass the `SessionActor`'s counters so
    /// the sim-perf surface (high-water gauges + drain counters) reflects
    /// real session state. The M182 spill caps take their defaults.
    pub fn new_with_counters(
        addr: SocketAddr,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
        event_drain_notify: Arc<Notify>,
        counters: Arc<crate::stats::SessionCounters>,
    ) -> Self {
        let default_dispatch_cap = crate::peer_backpressure::DISPATCH_BACKLOG_CAP;
        let default_event_cap = crate::peer_backpressure::EVENT_BACKLOG_CAP;
        Self::new_with_loop_config(
            addr,
            in_flight,
            target_depth,
            event_drain_notify,
            counters,
            default_dispatch_cap,
            default_event_cap,
        )
    }

    /// Full constructor — additionally accepts the M182 spill caps, so the
    /// harness's would-have-caught regression demos can lower them to
    /// reproduce historical bugs (M182 cap=2, v0.186.1 fatal-overflow).
    ///
    /// Every field not passed in starts in its "choked, nothing requested"
    /// position: zero permits, `peer_choking = true`, no unchoke timestamp,
    /// no pending first-block measurement, and an empty RTT probe map.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_loop_config(
        addr: SocketAddr,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
        event_drain_notify: Arc<Notify>,
        counters: Arc<crate::stats::SessionCounters>,
        dispatch_backlog_cap: usize,
        event_backlog_cap: usize,
    ) -> Self {
        // Choke-cycle state: no permits until the first Unchoke arrives.
        let semaphore = Arc::new(Semaphore::new(0));
        let peer_choking = AtomicBool::new(true);
        let first_block_pending = AtomicBool::new(false);
        let remote_unchoked_at = Mutex::new(None);

        Self {
            // Caller-supplied identity and shared handles.
            addr,
            in_flight,
            target_depth,
            event_drain_notify,
            counters,
            dispatch_backlog_cap,
            event_backlog_cap,
            // Freshly created per-connection state.
            semaphore,
            peer_choking,
            unchoke_notify: Notify::new(),
            dispatch_drain_notify: Notify::new(),
            remote_unchoked_at,
            first_block_pending,
            request_sent_times: Mutex::new(std::collections::HashMap::new()),
        }
    }
}

/// Commands from reader to requester for dispatch state management.
///
/// NOTE(review): the blanket `dead_code` allowance presumably covers the
/// variants/fields that are not constructed on every code path (`Stop` is
/// documented as test/future-only, and the CAS fields of `Start` are only
/// populated when `use_actor_dispatch=false`) — confirm, and consider
/// attaching a `reason = "…"` as done elsewhere in this file.
#[allow(dead_code)]
pub(crate) enum DispatchCommand {
    /// Transition to requesting state — provides dispatch resources.
    Start {
        // -- New M187 fields (always present) --
        /// Piece arithmetic (piece sizes, offsets, chunk size).
        lengths: Lengths,
        /// Copy of the peer's bitfield (kept for Have/Bitfield updates).
        peer_bitfield: Bitfield,
        /// Notification channel for new piece availability.
        piece_notify: Arc<Notify>,
        // -- Old CAS fields (only populated when use_actor_dispatch=false) --
        /// Shared atomic piece states for CAS reservation.
        atomic_states: Option<Arc<AtomicPieceStates>>,
        /// Initial availability snapshot for CAS dispatch.
        snapshot: Option<Arc<AvailabilitySnapshot>>,
        /// Block-level request/receipt tracking for steal visibility.
        block_maps: Option<Arc<BlockMaps>>,
        /// Queue of pieces available for block-level stealing.
        steal_candidates: Option<Arc<StealCandidates>>,
        /// Per-piece write guards for steal/write race prevention.
        piece_write_guards: Option<Arc<PieceWriteGuards>>,
    },
    /// Peer sent Have — update local bitfield copy. The payload is the
    /// piece index carried by the Have message (assumed — TODO confirm).
    PeerHave(u32),
    /// Peer sent Bitfield — replace local copy with the carried bitfield.
    PeerBitfield(Bitfield),
    /// Updated availability snapshot (CAS path only).
    Snapshot(Arc<AvailabilitySnapshot>),
    /// Stop requesting — handled by `requester_loop`, constructed in tests
    /// and future production use (graceful shutdown).
    Stop,
}

// Unit tests for the M257c permit budget: the pure `refill_grant` decision
// table, and `return_permit_budgeted` driven against a real `PeerShared`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering::Relaxed;

    /// Build a `PeerShared` with `in_flight = 0`, the given `target_depth`,
    /// a loopback address, and fresh counters (via `PeerShared::new`).
    /// The semaphore starts with zero permits.
    fn test_shared(target: u32) -> PeerShared {
        PeerShared::new(
            "127.0.0.1:6881".parse().unwrap(),
            Arc::new(AtomicU32::new(0)),
            Arc::new(AtomicU32::new(target)),
            Arc::new(Notify::new()),
        )
    }

    /// `granted == target` must hand back exactly one permit.
    #[test]
    fn m257c_refill_grant_holds_at_target() {
        // granted (post-return) == target -> return exactly the one permit
        assert_eq!(refill_grant(128, 128), 1);
        assert_eq!(refill_grant(8, 8), 1);
    }

    /// `granted > target` must hand back nothing, by any margin.
    #[test]
    fn m257c_refill_grant_shrinks_above_target() {
        // granted above target -> absorb the permit (shrink by one)
        assert_eq!(refill_grant(128, 8), 0);
        assert_eq!(refill_grant(9, 8), 0);
    }

    /// `granted < target` must hand back two permits, by any margin.
    #[test]
    fn m257c_refill_grant_grows_below_target() {
        // granted below target -> return two (grow by one)
        assert_eq!(refill_grant(8, 128), 2);
        assert_eq!(refill_grant(127, 128), 2);
    }

    /// An over-deep grant is absorbed on return; after the target is raised
    /// the same call grows the pool by two permits instead.
    #[tokio::test]
    async fn m257c_return_permit_budgeted_converges_to_target() {
        let shared = test_shared(8);
        // Simulate a legacy 12-deep grant: 10 available + 2 in flight.
        shared.semaphore.add_permits(10);
        shared.in_flight.store(2, Relaxed);
        // A block returns: granted-as-if-returned = 1 + 10 + 1 = 12 > 8 -> absorb.
        shared.in_flight.store(1, Relaxed); // reader decremented first
        return_permit_budgeted(&shared);
        assert_eq!(
            shared.semaphore.available_permits(),
            10,
            "absorbed, not returned"
        );
        // Now grow: target jumps to 20; granted = 1 + 10 + 1 = 12 < 20 -> add 2.
        shared.target_depth.store(20, Relaxed);
        return_permit_budgeted(&shared);
        assert_eq!(shared.semaphore.available_permits(), 12, "grew by one net");
    }

    /// Dangling pre-choke returns create a bounded surplus of permits, which
    /// the absorb branch reclaims once `in_flight` has drained to zero.
    #[tokio::test]
    async fn m257c_dangling_returns_overshoot_then_reconverge() {
        let shared = test_shared(8);
        // Post-unchoke steady state at target: 8 truly in flight, 0 available.
        shared.in_flight.store(8, Relaxed);
        // 3 dangling PRE-choke blocks arrive (their epoch's in_flight was
        // reset to 0 by on_choke): the reader's dec-then-return under-counts.
        for _ in 0..3 {
            let v = shared.in_flight.load(Relaxed);
            shared.in_flight.store(v.saturating_sub(1), Relaxed);
            return_permit_budgeted(&shared);
        }
        // granted measured 8 each time (deficit offsets the phantom permits):
        // 3 phantom permits circulate on top of the 8 real in-flight = 11 vs 8.
        assert_eq!(shared.semaphore.available_permits(), 3, "bounded overshoot");
        // The real 8 blocks return; the deficit drains through the saturation
        // floor and the absorb branch reclaims the surplus.
        // (First 5 returns each measure granted == 8 and add one permit;
        // the last 3 see in_flight pinned at 0, measure 9 > 8, and absorb.)
        for _ in 0..8 {
            let v = shared.in_flight.load(Relaxed);
            shared.in_flight.store(v.saturating_sub(1), Relaxed);
            return_permit_budgeted(&shared);
        }
        assert_eq!(shared.in_flight.load(Relaxed), 0);
        assert_eq!(
            shared.semaphore.available_permits(),
            8,
            "circulation reconverges to exactly target"
        );
    }
}