1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
//! `PeerConnection` — per-peer task coordinator.
//!
//! M129: Two-future-plus-writer architecture (matches rqbit's proven model).
//! The requester and reader run as inline futures in a single `select!` (same
//! tokio task, same worker thread — zero cross-thread hops on the hot path).
//! Only the writer is `tokio::spawn()`'d as a separate task, fed by an mpsc
//! channel.
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use tokio::io::AsyncWrite;
use tokio::sync::{Notify, mpsc};
use tracing::debug;
use crate::peer_codec::{PeerReader, PeerWriter};
use crate::peer_shared::{DispatchCommand, OutgoingMessage, PeerShared};
use crate::peer_tasks::{reader_loop, requester_loop, writer_loop};
use crate::piece_reservation::PieceOrderMap;
use crate::torrent_peer_handler::PeerMessageHandler;
use crate::types::{PeerCommand, PeerEvent};
use crate::vectored_io::AsyncReadVectored;
/// Dispatch channel capacity — reader to requester command forwarding.
const DISPATCH_CHANNEL_CAPACITY: usize = 16;
/// Per-peer connection coordinator.
///
/// Runs the requester and reader as inline futures in a single `select!`
/// (same tokio task), and spawns only the writer as a separate task.
/// This avoids the cross-thread context switches that `tokio::spawn()` for
/// all three would cause (~2 hops per 16 KiB block × 93K blocks = 186K
/// unnecessary context switches).
pub(crate) struct PeerConnection<R, W> {
handler: PeerMessageHandler,
reader: PeerReader<R>,
writer: PeerWriter<W>,
cmd_rx: mpsc::Receiver<PeerCommand>,
event_tx: mpsc::Sender<PeerEvent>,
have_broadcast_rx: tokio::sync::broadcast::Receiver<u32>,
/// v0.173.4 (prong 1): receiver for the latest `Arc<AvailabilitySnapshot>`
/// published by `TorrentActor.watch_tx`. Threaded through to the reader
/// task; the writer/requester see snapshots only via dispatch commands.
order_map_rx: tokio::sync::watch::Receiver<Arc<PieceOrderMap>>,
addr: SocketAddr,
/// Shared in-flight counter — created at the spawn site and shared with
/// `PeerState` so diagnostics are available immediately.
in_flight: Arc<AtomicU32>,
/// M149: Dynamic pipeline depth target — shared with `PeerState` for
/// `update_peer_rates()` writes and with `PeerShared` for requester reads.
target_depth: Arc<AtomicU32>,
/// M133: Seconds without any wire message before disconnecting (0 = disabled).
read_timeout_secs: u64,
/// M133: Seconds before a stalled outgoing write disconnects (0 = disabled).
write_timeout_secs: u64,
/// M137: Seconds without Piece data before disconnecting (0 = disabled).
data_contribution_timeout_secs: u64,
/// Per-torrent download rate limit bucket (shared across all peers).
download_bucket: Option<Arc<parking_lot::Mutex<crate::rate_limiter::TokenBucket>>>,
// M182: cross-actor wake for the reader's `event_tx` backpressure
// queue. Stored on `PeerShared` once `run` constructs it. The same
// `Arc` is held by `PeerState.event_drain_notify` so `TorrentActor`
// can ping it after consuming events for this peer.
event_drain_notify: Arc<Notify>,
// Sim-perf: session-level counters for high-water + drain stats.
// Stored on `PeerShared` so `reader_loop` can update them.
counters: Arc<crate::stats::SessionCounters>,
/// M182 backpressure caps + v0.186.1 fatal-overflow toggle.
dispatch_backlog_cap: usize,
event_backlog_cap: usize,
/// M187 A/B: use actor-centralised dispatch (true) or per-peer CAS (false).
use_actor_dispatch: bool,
}
impl<R, W> PeerConnection<R, W>
where
R: AsyncReadVectored + Send,
W: AsyncWrite + Unpin + Send + 'static,
{
/// Construct a new connection.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
handler: PeerMessageHandler,
reader: PeerReader<R>,
writer: PeerWriter<W>,
cmd_rx: mpsc::Receiver<PeerCommand>,
event_tx: mpsc::Sender<PeerEvent>,
have_broadcast_rx: tokio::sync::broadcast::Receiver<u32>,
order_map_rx: tokio::sync::watch::Receiver<Arc<PieceOrderMap>>,
in_flight: Arc<AtomicU32>,
target_depth: Arc<AtomicU32>,
read_timeout_secs: u64,
write_timeout_secs: u64,
data_contribution_timeout_secs: u64,
download_bucket: Option<Arc<parking_lot::Mutex<crate::rate_limiter::TokenBucket>>>,
event_drain_notify: Arc<Notify>,
counters: Arc<crate::stats::SessionCounters>,
dispatch_backlog_cap: usize,
event_backlog_cap: usize,
use_actor_dispatch: bool,
) -> Self {
let addr = handler.addr();
Self {
handler,
reader,
writer,
cmd_rx,
event_tx,
have_broadcast_rx,
order_map_rx,
addr,
in_flight,
target_depth,
read_timeout_secs,
write_timeout_secs,
data_contribution_timeout_secs,
download_bucket,
event_drain_notify,
counters,
dispatch_backlog_cap,
event_backlog_cap,
use_actor_dispatch,
}
}
/// Run the per-peer pipeline until any component exits.
///
/// The requester and reader run as inline futures in a single `select!`
/// on the current tokio task. Only the writer is spawned separately.
/// When any future completes, the writer task is aborted.
///
/// The reader calls `on_disconnect()` on its normal exit paths. If the
/// requester exits first (channel error), the reader will also exit
/// shortly via channel closure cascade.
pub(crate) async fn run(self) -> crate::Result<()> {
let shared = Arc::new(PeerShared::new_with_loop_config(
self.addr,
self.in_flight,
self.target_depth,
self.event_drain_notify,
self.counters,
self.dispatch_backlog_cap,
self.event_backlog_cap,
));
// Build inter-task channels.
// Unbounded: the semaphore gates requester sends (128 max), Have
// broadcasts are bounded by piece count, response messages are TCP-rate-
// limited, and command-channel messages are bounded by the cmd_tx channel.
let (writer_tx, writer_rx) = mpsc::unbounded_channel::<OutgoingMessage>();
let (dispatch_tx, dispatch_rx) =
mpsc::channel::<DispatchCommand>(DISPATCH_CHANNEL_CAPACITY);
// Spawn ONLY the writer as a separate task (it owns the TCP write half).
let wrt = tokio::spawn(writer_loop(self.writer, writer_rx, self.write_timeout_secs));
let wrt_abort = wrt.abort_handle();
// Run requester + reader as inline futures in select! (same task, same thread).
// This is the rqbit-proven model: no cross-thread hops on the hot path.
let result = tokio::select! {
r = requester_loop(
Arc::clone(&shared),
dispatch_rx,
writer_tx.clone(),
self.download_bucket,
self.event_tx.clone(),
self.use_actor_dispatch,
) => {
debug!(addr = %self.addr, "requester exited");
Ok(r)
}
r = reader_loop(
Arc::clone(&shared),
self.reader,
self.handler,
self.event_tx,
writer_tx,
self.cmd_rx,
dispatch_tx,
self.have_broadcast_rx,
self.order_map_rx,
self.read_timeout_secs,
self.data_contribution_timeout_secs,
) => {
debug!(addr = %self.addr, "reader exited");
Ok(r)
}
r = wrt => {
debug!(addr = %self.addr, "writer exited");
r // JoinHandle<Result<()>>
}
};
// Abort the writer task if it's still running.
wrt_abort.abort();
// Unwrap the result.
match result {
Ok(inner) => inner,
Err(join_err) if join_err.is_cancelled() => Ok(()),
Err(join_err) => {
// Writer task panicked — propagate.
std::panic::resume_unwind(join_err.into_panic());
}
}
}
}