irontide-session 1.0.1

BitTorrent session management: peers, torrents, and piece selection
Documentation
//! `PeerConnection` — per-peer task coordinator.
//!
//! M129: Two-future-plus-writer architecture (matches rqbit's proven model).
//! The requester and reader run as inline futures in a single `select!` (same
//! tokio task, same worker thread — zero cross-thread hops on the hot path).
//! Only the writer is `tokio::spawn()`'d as a separate task, fed by an mpsc
//! channel.

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;

use tokio::io::AsyncWrite;
use tokio::sync::{Notify, mpsc};
use tracing::debug;

use crate::peer_codec::{PeerReader, PeerWriter};
use crate::peer_shared::{DispatchCommand, OutgoingMessage, PeerShared};
use crate::peer_tasks::{reader_loop, requester_loop, writer_loop};
use crate::piece_reservation::PieceOrderMap;
use crate::torrent_peer_handler::PeerMessageHandler;
use crate::types::{PeerCommand, PeerEvent};
use crate::vectored_io::AsyncReadVectored;

/// Capacity of the bounded `mpsc` channel that carries `DispatchCommand`s
/// from the reader loop (sender side) to the requester loop (receiver side).
///
/// Used in `PeerConnection::run` when the dispatch channel is created.
const DISPATCH_CHANNEL_CAPACITY: usize = 16;

/// Per-peer connection coordinator.
///
/// Runs the requester and reader as inline futures in a single `select!`
/// (same tokio task), and spawns only the writer as a separate task.
/// This avoids the cross-thread context switches that `tokio::spawn()` for
/// all three would cause (~2 hops per 16 KiB block × 93K blocks = 186K
/// unnecessary context switches).
///
/// `R` is the transport read half wrapped by `PeerReader`; `W` is the write
/// half wrapped by `PeerWriter`. Every field is consumed by `run`, which
/// takes `self` by value.
pub(crate) struct PeerConnection<R, W> {
    /// Peer message handler; handed to the reader loop. Also the source of
    /// `addr` (read via `handler.addr()` in `new`).
    handler: PeerMessageHandler,
    /// Read-side codec; moved into the reader loop.
    reader: PeerReader<R>,
    /// Write-side codec; moved into the spawned writer task.
    writer: PeerWriter<W>,
    /// Incoming `PeerCommand`s for this peer; moved into the reader loop.
    cmd_rx: mpsc::Receiver<PeerCommand>,
    /// Outgoing `PeerEvent`s; cloned for the requester loop and moved into
    /// the reader loop.
    event_tx: mpsc::Sender<PeerEvent>,
    /// Broadcast receiver of `u32` values; moved into the reader loop.
    /// Presumably piece indices for Have announcements — confirm at the
    /// sender.
    have_broadcast_rx: tokio::sync::broadcast::Receiver<u32>,
    /// v0.173.4 (prong 1): watch receiver for the latest `Arc<PieceOrderMap>`.
    /// Threaded through to the reader loop only; the requester and writer
    /// loops are not given this receiver.
    ///
    /// NOTE(review): this comment previously described the payload as
    /// `Arc<AvailabilitySnapshot>` published by `TorrentActor.watch_tx`; the
    /// field type is `Arc<PieceOrderMap>`. Confirm the publisher.
    order_map_rx: tokio::sync::watch::Receiver<Arc<PieceOrderMap>>,
    /// Peer address, captured from `handler.addr()` in `new`. Passed to
    /// `PeerShared` and used as the `addr` field on log lines.
    addr: SocketAddr,
    /// Shared in-flight counter — created at the spawn site and shared with
    /// `PeerState` so diagnostics are available immediately.
    in_flight: Arc<AtomicU32>,
    /// M149: Dynamic pipeline depth target — shared with `PeerState` for
    /// `update_peer_rates()` writes and with `PeerShared` for requester reads.
    target_depth: Arc<AtomicU32>,
    /// M133: Seconds without any wire message before disconnecting (0 = disabled).
    /// Passed to the reader loop.
    read_timeout_secs: u64,
    /// M133: Seconds before a stalled outgoing write disconnects (0 = disabled).
    /// Passed to the writer loop.
    write_timeout_secs: u64,
    /// M137: Seconds without Piece data before disconnecting (0 = disabled).
    /// Passed to the reader loop.
    data_contribution_timeout_secs: u64,
    /// Per-torrent download rate limit bucket (shared across all peers).
    /// Passed to the requester loop; `None` when no bucket is supplied.
    download_bucket: Option<Arc<parking_lot::Mutex<crate::rate_limiter::TokenBucket>>>,
    /// M182: cross-actor wake for the reader's `event_tx` backpressure
    /// queue. Stored on `PeerShared` once `run` constructs it. The same
    /// `Arc` is held by `PeerState.event_drain_notify` so `TorrentActor`
    /// can ping it after consuming events for this peer.
    event_drain_notify: Arc<Notify>,
    /// Sim-perf: session-level counters for high-water + drain stats.
    /// Stored on `PeerShared` so `reader_loop` can update them.
    counters: Arc<crate::stats::SessionCounters>,
    /// M182: backpressure cap for the dispatch backlog; forwarded to
    /// `PeerShared::new_with_loop_config`.
    ///
    /// NOTE(review): the previous comment also mentioned a v0.186.1
    /// fatal-overflow toggle, but no such field exists on this struct.
    dispatch_backlog_cap: usize,
    /// M182: backpressure cap for the event backlog; forwarded to
    /// `PeerShared::new_with_loop_config`.
    event_backlog_cap: usize,
    /// M187 A/B: use actor-centralised dispatch (true) or per-peer CAS (false).
    /// Passed to the requester loop.
    use_actor_dispatch: bool,
}

impl<R, W> PeerConnection<R, W>
where
    R: AsyncReadVectored + Send,
    W: AsyncWrite + Unpin + Send + 'static,
{
    /// Bundle every per-peer resource into a connection that is ready to
    /// [`run`](Self::run).
    ///
    /// The peer address is not a parameter: it is read from `handler.addr()`.
    /// Every other argument is stored unchanged in the field of the same name.
    // One parameter per stored field; a builder would add nothing here.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        handler: PeerMessageHandler,
        reader: PeerReader<R>,
        writer: PeerWriter<W>,
        cmd_rx: mpsc::Receiver<PeerCommand>,
        event_tx: mpsc::Sender<PeerEvent>,
        have_broadcast_rx: tokio::sync::broadcast::Receiver<u32>,
        order_map_rx: tokio::sync::watch::Receiver<Arc<PieceOrderMap>>,
        in_flight: Arc<AtomicU32>,
        target_depth: Arc<AtomicU32>,
        read_timeout_secs: u64,
        write_timeout_secs: u64,
        data_contribution_timeout_secs: u64,
        download_bucket: Option<Arc<parking_lot::Mutex<crate::rate_limiter::TokenBucket>>>,
        event_drain_notify: Arc<Notify>,
        counters: Arc<crate::stats::SessionCounters>,
        dispatch_backlog_cap: usize,
        event_backlog_cap: usize,
        use_actor_dispatch: bool,
    ) -> Self {
        Self {
            // Struct-literal fields are evaluated in written order, so the
            // address is read here before `handler` is moved on the next line.
            addr: handler.addr(),
            handler,
            reader,
            writer,
            cmd_rx,
            event_tx,
            have_broadcast_rx,
            order_map_rx,
            in_flight,
            target_depth,
            read_timeout_secs,
            write_timeout_secs,
            data_contribution_timeout_secs,
            download_bucket,
            event_drain_notify,
            counters,
            dispatch_backlog_cap,
            event_backlog_cap,
            use_actor_dispatch,
        }
    }

    /// Drive the per-peer pipeline until the first of its three parts finishes.
    ///
    /// The writer loop is the only part that is `tokio::spawn`ed. The
    /// requester and reader loops are polled inline, on the current task, by
    /// one `select!`. As soon as any branch completes, `select!` drops the
    /// other two futures and the writer task is aborted.
    ///
    /// Returns the value produced by whichever loop finished first. If the
    /// writer task's join result is a cancellation, returns `Ok(())`.
    ///
    /// NOTE(review): the earlier doc said the reader "will also exit shortly
    /// via channel closure cascade" when the requester exits first. With the
    /// inline `select!` the reader future is dropped instead, so any cleanup
    /// the reader performs on its own exit paths (the earlier doc mentions
    /// `on_disconnect()`) does not run in that case — confirm this is intended.
    ///
    /// # Panics
    ///
    /// Re-raises the writer task's panic if that task panicked.
    pub(crate) async fn run(self) -> crate::Result<()> {
        // Copy the address out once for the log lines below.
        let addr = self.addr;

        let shared = Arc::new(PeerShared::new_with_loop_config(
            addr,
            self.in_flight,
            self.target_depth,
            self.event_drain_notify,
            self.counters,
            self.dispatch_backlog_cap,
            self.event_backlog_cap,
        ));

        // Inter-task channels.
        // The writer queue is unbounded on purpose: requester sends are gated
        // by the semaphore (128 max), Have broadcasts are bounded by piece
        // count, response messages are TCP-rate-limited, and command-channel
        // messages are bounded by the cmd_tx channel.
        let (writer_tx, writer_rx) = mpsc::unbounded_channel::<OutgoingMessage>();
        let (dispatch_tx, dispatch_rx) =
            mpsc::channel::<DispatchCommand>(DISPATCH_CHANNEL_CAPACITY);

        // The writer owns the TCP write half and is the only spawned task.
        // Take the abort handle now: the join handle itself is moved into
        // the `select!` below.
        let writer_task =
            tokio::spawn(writer_loop(self.writer, writer_rx, self.write_timeout_secs));
        let writer_abort = writer_task.abort_handle();

        // Requester and reader stay on this task (same thread), so the hot
        // path has no cross-thread hops. This is the rqbit-proven model.
        let requester = requester_loop(
            Arc::clone(&shared),
            dispatch_rx,
            writer_tx.clone(),
            self.download_bucket,
            self.event_tx.clone(),
            self.use_actor_dispatch,
        );
        let reader = reader_loop(
            Arc::clone(&shared),
            self.reader,
            self.handler,
            self.event_tx,
            writer_tx,
            self.cmd_rx,
            dispatch_tx,
            self.have_broadcast_rx,
            self.order_map_rx,
            self.read_timeout_secs,
            self.data_contribution_timeout_secs,
        );

        let outcome = tokio::select! {
            finished = requester => {
                debug!(addr = %addr, "requester exited");
                Ok(finished)
            }
            finished = reader => {
                debug!(addr = %addr, "reader exited");
                Ok(finished)
            }
            // Awaiting the join handle yields `Result<crate::Result<()>, JoinError>`.
            joined = writer_task => {
                debug!(addr = %addr, "writer exited");
                joined
            }
        };

        // No-op when the writer branch won; otherwise stops the writer task.
        writer_abort.abort();

        let join_err = match outcome {
            Ok(loop_result) => return loop_result,
            Err(err) => err,
        };
        if join_err.is_cancelled() {
            return Ok(());
        }
        // Not cancelled, so the writer task panicked — propagate the payload.
        std::panic::resume_unwind(join_err.into_panic())
    }
}