irontide-session 1.0.1

BitTorrent session management: peers, torrents, and piece selection
Documentation
//! M182: per-channel backpressure queue used by `reader_loop`.
//!
//! The pre-M182 `reader_loop` had three `tokio::select!` arms that
//! `.await`ed on bounded `mpsc::Sender::send()`. When the downstream
//! (`dispatch_tx` to the per-peer requester, `event_tx` to the
//! `TorrentActor`) was slow, the entire `select!` parked — including
//! ARM 1 (the wire reader) — and TCP back-pressure built up against
//! the kernel buffer. See
//! `docs/investigations/2026-05-02-m182-stall-audit/INVESTIGATION.md`
//! §1 for the full audit.
//!
//! The fix: every reader-side bounded send goes through a
//! [`BackpressureQueue<T>`]. The queue tries `try_send` first; on
//! `Full`, it spills into a small local `VecDeque`. The downstream
//! consumer pings a shared [`tokio::sync::Notify`] after each
//! `recv()`, and the reader's outer `select!` re-runs
//! [`BackpressureQueue::try_drain_one`] when notified. If the queue
//! exceeds its cap, the caller treats it as a stall and disconnects
//! the peer.

use std::collections::VecDeque;

use tokio::sync::mpsc;

/// M182 dispatch-channel backlog cap.
///
/// `dispatch_tx` carries `DispatchCommand::{Start, Stop, Snapshot, …}`
/// from `reader_loop` to `requester_loop`. The channel itself is bounded
/// (constructed in `peer_connection.rs`); this cap is the *additional*
/// slack the reader keeps in its local backlog queue, so the wire reader
/// (ARM 1) is never parked by a slow requester.
///
/// Sizing: M173.3 throttles snapshot publishes to ≥ 250 ms, so 8 slots
/// give roughly 8 × 250 ms = 2 s of stall tolerance before disconnect.
/// Commands (Start/Stop/UpdateNumPieces) are sparse, so they fit
/// comfortably alongside.
pub const DISPATCH_BACKLOG_CAP: usize = 8;

/// M182 event-channel backlog cap.
///
/// `event_tx` carries `PeerEvent::*` from the per-peer reader to the
/// `TorrentActor`. The hot path is `PieceBlocksBatch`, which fires per
/// 16 KiB block — about 6,000 events/sec at 100 MB/s. 32 slots therefore
/// give roughly 32 / 6,000 s ≈ 5 ms of stall tolerance before disconnect.
/// Past that, the `TorrentActor` is genuinely too slow, and we'd rather
/// drop this peer than park its wire reader and let TCP back-pressure
/// build up against the kernel buffer.
pub const EVENT_BACKLOG_CAP: usize = 32;

/// Failure outcome of a [`BackpressureQueue`] operation.
///
/// Field-less, so it is `Copy` and can be compared with `==` by callers
/// and tests instead of only via `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BackpressureError {
    /// Backlog cap exceeded — caller should disconnect the peer
    /// (M137 timeout-on-backpressure semantic).
    QueueFull,
    /// Downstream channel is closed — caller should treat as fatal
    /// shutdown (`crate::Error::Shutdown`).
    Closed,
}

/// Per-channel backpressure helper for the reader loop.
///
/// Pattern:
///
/// ```ignore
/// // From the reader_loop:
/// queue.enqueue_or_send(&dispatch_tx, cmd)?;
///
/// // From the requester_loop, after every recv:
/// shared.dispatch_drain_notify.notify_one();
///
/// // From the reader_loop's outer select! arm:
/// () = shared.dispatch_drain_notify.notified() => {
///     // `Notify` stores at most one permit, so a single wakeup may stand
///     // for several consumer `recv()`s. Keep draining until the channel
///     // is full again or the backlog is empty — draining only one item
///     // per wakeup can leave items queued while the channel has room.
///     while queue.try_drain_one(&dispatch_tx)? {}
/// }
/// ```
///
/// Owned by the reader task — all mutation goes through `&mut self`, so
/// no internal synchronization is needed. The drain `Notify` (not stored
/// here) is shared with the consumer.
pub(crate) struct BackpressureQueue<T> {
    /// Items that did not fit in the channel, oldest at the front.
    backlog: VecDeque<T>,
    /// Maximum backlog depth before a new item is reported as a stall.
    cap: usize,
}

impl<T> BackpressureQueue<T> {
    /// Construct with a fixed cap. `cap == 0` would disable
    /// queueing entirely (every `Full` → `QueueFull` immediately); use
    /// the named constants above.
    #[must_use]
    pub fn new(cap: usize) -> Self {
        Self {
            backlog: VecDeque::new(),
            cap,
        }
    }

    /// Send `item` via `tx`, spilling to the local backlog if `tx` is
    /// full.
    ///
    /// Order is preserved: if the backlog is non-empty we always push
    /// to the back (to avoid reordering against earlier-enqueued items
    /// that are still draining).
    ///
    /// # Errors
    ///
    /// - [`BackpressureError::QueueFull`] when the local backlog is at
    ///   `cap` and `tx` is still full — caller should disconnect.
    /// - [`BackpressureError::Closed`] when `tx` is closed — caller
    ///   should treat as fatal shutdown.
    pub fn enqueue_or_send(
        &mut self,
        tx: &mpsc::Sender<T>,
        item: T,
    ) -> Result<(), BackpressureError> {
        if !self.backlog.is_empty() {
            // Don't try_send while items are queued — would reorder.
            if self.backlog.len() >= self.cap {
                return Err(BackpressureError::QueueFull);
            }
            self.backlog.push_back(item);
            return Ok(());
        }
        match tx.try_send(item) {
            Ok(()) => Ok(()),
            Err(mpsc::error::TrySendError::Full(item)) => {
                if self.backlog.len() >= self.cap {
                    return Err(BackpressureError::QueueFull);
                }
                self.backlog.push_back(item);
                Ok(())
            }
            Err(mpsc::error::TrySendError::Closed(_)) => Err(BackpressureError::Closed),
        }
    }

    /// Try to drain one queued item into `tx`. Returns whether anything
    /// was sent.
    ///
    /// Used from the reader's outer `select!` arm gated on the drain
    /// `Notify`. If `tx` is still full, the item stays at the front of
    /// the backlog (no reordering).
    ///
    /// # Errors
    ///
    /// - [`BackpressureError::Closed`] when `tx` is closed.
    pub fn try_drain_one(&mut self, tx: &mpsc::Sender<T>) -> Result<bool, BackpressureError> {
        let Some(front) = self.backlog.pop_front() else {
            return Ok(false);
        };
        match tx.try_send(front) {
            Ok(()) => Ok(true),
            Err(mpsc::error::TrySendError::Full(item)) => {
                self.backlog.push_front(item);
                Ok(false)
            }
            Err(mpsc::error::TrySendError::Closed(_)) => Err(BackpressureError::Closed),
        }
    }

    /// Whether anything is currently queued (drives the
    /// `tick_notify`-arm gating in `reader_loop`).
    #[must_use]
    pub fn has_pending(&self) -> bool {
        !self.backlog.is_empty()
    }

    /// Current backlog depth (test/diagnostic only).
    #[must_use]
    #[allow(dead_code)]
    pub fn len(&self) -> usize {
        self.backlog.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn empty_queue_sends_directly() {
        let (tx, mut rx) = mpsc::channel::<u32>(4);
        let mut q = BackpressureQueue::new(8);
        q.enqueue_or_send(&tx, 42).expect("send");
        assert!(!q.has_pending());
        assert_eq!(rx.recv().await, Some(42));
    }

    #[tokio::test]
    async fn full_channel_spills_to_backlog() {
        let (tx, mut rx) = mpsc::channel::<u32>(2);
        let mut q = BackpressureQueue::new(8);
        // Fill the channel.
        q.enqueue_or_send(&tx, 1).expect("send 1");
        q.enqueue_or_send(&tx, 2).expect("send 2");
        // Channel is full, these spill.
        q.enqueue_or_send(&tx, 3).expect("spill 3");
        q.enqueue_or_send(&tx, 4).expect("spill 4");
        assert_eq!(q.len(), 2);

        // Drain the channel — items 1 and 2 should arrive in order.
        assert_eq!(rx.recv().await, Some(1));
        assert_eq!(rx.recv().await, Some(2));

        // Now drain the backlog.
        assert!(q.try_drain_one(&tx).expect("drain 3"));
        assert!(q.try_drain_one(&tx).expect("drain 4"));
        assert!(!q.has_pending());
        assert_eq!(rx.recv().await, Some(3));
        assert_eq!(rx.recv().await, Some(4));
    }

    #[tokio::test]
    async fn cap_exceeded_returns_queue_full() {
        let (tx, _rx) = mpsc::channel::<u32>(1);
        let mut q = BackpressureQueue::new(2);
        // Fill channel + cap.
        q.enqueue_or_send(&tx, 1).expect("send 1");
        q.enqueue_or_send(&tx, 2).expect("spill 2");
        q.enqueue_or_send(&tx, 3).expect("spill 3");
        // Queue is at cap (2), channel is full → next must error.
        let err = q.enqueue_or_send(&tx, 4).expect_err("at cap, must error");
        assert!(matches!(err, BackpressureError::QueueFull));
        // The rejected item is dropped; the existing backlog is untouched.
        assert_eq!(q.len(), 2);
    }

    #[tokio::test]
    async fn closed_channel_returns_closed_error() {
        let (tx, rx) = mpsc::channel::<u32>(1);
        drop(rx);
        let mut q = BackpressureQueue::new(8);
        let err = q.enqueue_or_send(&tx, 42).expect_err("closed channel");
        assert!(matches!(err, BackpressureError::Closed));
        // A closed channel must not leave the item parked in the backlog.
        assert!(!q.has_pending());
    }

    #[tokio::test]
    async fn order_preserved_with_backlog() {
        // Even when the channel has free space, if the backlog is
        // non-empty we must keep pushing to the back to preserve order.
        let (tx, mut rx) = mpsc::channel::<u32>(4);
        let mut q = BackpressureQueue::new(8);
        // Force backlog by filling channel temporarily.
        q.enqueue_or_send(&tx, 1).expect("send 1");
        q.enqueue_or_send(&tx, 2).expect("send 2");
        q.enqueue_or_send(&tx, 3).expect("send 3");
        q.enqueue_or_send(&tx, 4).expect("send 4");
        q.enqueue_or_send(&tx, 5).expect("spill 5");

        // Drain the receiver — 1..=4 arrive directly from the channel.
        for expected in 1..=4 {
            assert_eq!(rx.recv().await, Some(expected));
        }

        // Now even though the channel is empty, item 6 must spill
        // because 5 is still in the backlog (order matters).
        q.enqueue_or_send(&tx, 6).expect("spill 6");
        assert_eq!(q.len(), 2);

        // Drain the backlog.
        assert!(q.try_drain_one(&tx).expect("drain 5"));
        assert!(q.try_drain_one(&tx).expect("drain 6"));
        assert_eq!(rx.recv().await, Some(5));
        assert_eq!(rx.recv().await, Some(6));
    }

    #[tokio::test]
    async fn drain_when_channel_still_full_returns_false() {
        let (tx, mut rx) = mpsc::channel::<u32>(1);
        let mut q = BackpressureQueue::new(8);
        q.enqueue_or_send(&tx, 1).expect("send 1");
        q.enqueue_or_send(&tx, 2).expect("spill 2");
        // Channel still at cap, drain fails (false), item stays queued.
        assert!(!q.try_drain_one(&tx).expect("drain attempt"));
        assert_eq!(q.len(), 1);

        // Once the consumer makes room, the retained item drains intact.
        assert_eq!(rx.recv().await, Some(1));
        assert!(q.try_drain_one(&tx).expect("drain after room"));
        assert!(!q.has_pending());
        assert_eq!(rx.recv().await, Some(2));
    }

    #[tokio::test]
    async fn cap_zero_immediately_overflows_on_full_channel() {
        // Degenerate but valid: cap=0 means "spill is not allowed —
        // any Full from the underlying channel is a fatal stall."
        // This isn't used in production (we use the named consts) but
        // the helper must behave sensibly for it.
        let (tx, _rx) = mpsc::channel::<u32>(1);
        let mut q = BackpressureQueue::new(0);
        q.enqueue_or_send(&tx, 1).expect("first send fits in channel");
        let err = q.enqueue_or_send(&tx, 2).expect_err("cap=0 must overflow");
        assert!(matches!(err, BackpressureError::QueueFull));
        assert!(!q.has_pending());
    }

    #[tokio::test]
    async fn drain_on_empty_backlog_is_noop() {
        let (tx, _rx) = mpsc::channel::<u32>(4);
        let mut q = BackpressureQueue::<u32>::new(8);
        // No-op: nothing to drain.
        assert!(!q.try_drain_one(&tx).expect("drain on empty backlog"));
        assert!(!q.has_pending());
    }

    #[tokio::test]
    async fn drain_propagates_closed_channel() {
        let (tx, rx) = mpsc::channel::<u32>(1);
        let mut q = BackpressureQueue::new(8);
        // Force backlog: fill channel + spill one.
        q.enqueue_or_send(&tx, 1).expect("send 1");
        q.enqueue_or_send(&tx, 2).expect("spill 2");
        // Close the receiver — drain should propagate Closed.
        drop(rx);
        let err = q.try_drain_one(&tx).expect_err("closed channel");
        assert!(matches!(err, BackpressureError::Closed));
    }

    #[tokio::test]
    async fn has_pending_tracks_backlog_state() {
        let (tx, mut rx) = mpsc::channel::<u32>(1);
        let mut q = BackpressureQueue::new(8);
        assert!(!q.has_pending());
        q.enqueue_or_send(&tx, 1).expect("send 1");
        assert!(!q.has_pending(), "in-channel send doesn't add to backlog");
        q.enqueue_or_send(&tx, 2).expect("spill 2");
        assert!(q.has_pending(), "spilled item is pending");
        // Drain channel.
        assert_eq!(rx.recv().await, Some(1));
        // Drain backlog.
        assert!(q.try_drain_one(&tx).expect("drain"));
        assert!(!q.has_pending(), "backlog drained");
        assert_eq!(rx.recv().await, Some(2));
    }
}