1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
//! Zero-lock shared state and message types for the three per-peer tasks.
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::time::Instant;
use bytes::Bytes;
use parking_lot::Mutex;
use tokio::sync::{Notify, Semaphore};
use irontide_core::Lengths;
use irontide_storage::Bitfield;
use irontide_wire::Message;
use crate::piece_reservation::{
AtomicPieceStates, AvailabilitySnapshot, BlockMaps, PieceWriteGuards, StealCandidates,
};
/// Initial pipeline depth — matches `INITIAL_QUEUE_DEPTH` in `torrent_peer_handler`.
///
/// Permits are **not** granted at construction; they are added when the peer
/// unchokes us (via `semaphore.add_permits(INITIAL_QUEUE_DEPTH)`).
pub(crate) const INITIAL_QUEUE_DEPTH: usize = 128;
/// Zero-lock shared state between the three per-peer tasks.
///
/// Owned by `Arc<PeerShared>` — one instance per peer connection, shared among
/// the reader, writer, and requester tasks.
pub(crate) struct PeerShared {
/// Pipeline depth gating — `INITIAL_QUEUE_DEPTH` permits.
/// Requester acquires, reader returns via `add_permits(1)`.
pub semaphore: Arc<Semaphore>,
/// Set by reader on Choke/Unchoke messages. Read by requester.
pub peer_choking: AtomicBool,
/// Reader notifies requester when unchoked.
pub unchoke_notify: Notify,
/// Peer socket address (for logging and events).
pub addr: SocketAddr,
/// Number of in-flight block requests (incremented by requester on send,
/// decremented by reader on Piece/RejectRequest, reset to 0 on Choke).
/// Shared with `TorrentActor` via `PeerState` for accurate diagnostics.
pub in_flight: Arc<AtomicU32>,
#[allow(
dead_code,
reason = "shared Arc read via PeerState for debug/state API"
)]
pub target_depth: Arc<AtomicU32>,
/// M182: pinged by `requester_loop` after every `dispatch_rx.recv()`.
/// `reader_loop` waits on this in a select arm and re-runs
/// `BackpressureQueue::try_drain_one(&dispatch_tx)` so a `dispatch_tx`
/// burst never parks ARM 1 (the wire reader). Single-waiter,
/// `notify_one`. See M182 audit §4.
///
/// Internal to the per-peer task set (reader + requester) — kept as
/// a plain `Notify` because both endpoints are reached through the
/// same `Arc<PeerShared>`.
pub dispatch_drain_notify: Notify,
/// M182: pinged by `TorrentActor` after every `event_rx.recv()` for
/// this peer. Same role for `event_tx` as `dispatch_drain_notify`
/// for `dispatch_tx`, but cross-actor — `TorrentActor` holds a
/// clone of this `Arc` on `PeerState.event_drain_notify` (the same
/// `Arc` is also stored here so the reader can wait on it).
pub event_drain_notify: Arc<Notify>,
/// Sim-perf engine surface: shared session counters used by
/// `reader_loop` to track per-peer high-water and drain stats. In
/// production cloned from the `SessionActor`'s counters; in test
/// code created on demand via `SessionCounters::new()` (cheap; not
/// asserted on).
pub counters: Arc<crate::stats::SessionCounters>,
/// M182 dispatch-channel reader-side spill cap (default
/// [`crate::peer_backpressure::DISPATCH_BACKLOG_CAP`]). The
/// would-have-caught harness lowers this to reproduce the M182
/// backlog-too-small regression class.
pub dispatch_backlog_cap: usize,
/// M182 event-channel reader-side spill cap (default
/// [`crate::peer_backpressure::EVENT_BACKLOG_CAP`]).
pub event_backlog_cap: usize,
/// When the remote peer last unchoked us. Set in `on_unchoke`, cleared
/// in `on_choke`. Used to measure unchoke duration and first-block
/// latency for the `target_depth` feedback loop investigation.
pub remote_unchoked_at: Mutex<Option<Instant>>,
/// Set `true` on remote unchoke, cleared on the first Piece message
/// received afterward. Gates the first-block latency measurement so
/// it fires exactly once per unchoke cycle.
pub first_block_pending: AtomicBool,
}
impl PeerShared {
/// Create new shared state for a peer connection.
///
/// The semaphore starts with **zero** permits — permits are added when
/// the remote peer sends an Unchoke message. `peer_choking` starts
/// `true` (`BitTorrent` specification default).
///
/// `event_drain_notify` is created at the peer spawn site so the
/// `TorrentActor` can hold a clone alongside the
/// `Arc<PeerShared>` on `PeerState`. Both refer to the same `Notify`.
///
/// This shorthand uses fresh per-peer [`SessionCounters`] —
/// production paths should prefer [`Self::new_with_counters`] so the
/// sim-perf gauges aggregate across every `reader_loop` in the session.
#[allow(
dead_code,
reason = "test helper used in `peer_tasks::tests` and `torrent_peer_handler::tests`"
)]
pub fn new(
addr: SocketAddr,
in_flight: Arc<AtomicU32>,
target_depth: Arc<AtomicU32>,
event_drain_notify: Arc<Notify>,
) -> Self {
Self::new_with_counters(
addr,
in_flight,
target_depth,
event_drain_notify,
Arc::new(crate::stats::SessionCounters::new()),
)
}
/// Like [`Self::new`] but uses the caller-supplied counters Arc.
/// Production call sites pass the `SessionActor`'s counters so the
/// sim-perf surface (high-water gauges + drain counters) reflects
/// real session state.
pub fn new_with_counters(
addr: SocketAddr,
in_flight: Arc<AtomicU32>,
target_depth: Arc<AtomicU32>,
event_drain_notify: Arc<Notify>,
counters: Arc<crate::stats::SessionCounters>,
) -> Self {
Self::new_with_loop_config(
addr,
in_flight,
target_depth,
event_drain_notify,
counters,
crate::peer_backpressure::DISPATCH_BACKLOG_CAP,
crate::peer_backpressure::EVENT_BACKLOG_CAP,
)
}
/// Full constructor — also accepts the M182 spill caps and
/// harness's would-have-caught regression demos to reproduce
/// historical bugs (M182 cap=2, v0.186.1 fatal-overflow).
#[allow(clippy::too_many_arguments)]
pub fn new_with_loop_config(
addr: SocketAddr,
in_flight: Arc<AtomicU32>,
target_depth: Arc<AtomicU32>,
event_drain_notify: Arc<Notify>,
counters: Arc<crate::stats::SessionCounters>,
dispatch_backlog_cap: usize,
event_backlog_cap: usize,
) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(0)),
peer_choking: AtomicBool::new(true),
unchoke_notify: Notify::new(),
addr,
in_flight,
target_depth,
dispatch_drain_notify: Notify::new(),
event_drain_notify,
counters,
dispatch_backlog_cap,
event_backlog_cap,
remote_unchoked_at: Mutex::new(None),
first_block_pending: AtomicBool::new(false),
}
}
}
/// Messages sent through the writer channel from requester and reader.
pub(crate) enum OutgoingMessage {
// -- From requester (hot path) --
/// Request a block from the peer.
Request {
/// Piece index.
index: u32,
/// Byte offset within the piece.
begin: u32,
/// Block length in bytes.
length: u32,
},
// -- From reader/handler --
/// Announce that we have a piece.
Have {
/// Piece index.
index: u32,
},
/// Cancel a previously-sent request.
Cancel {
/// Piece index.
index: u32,
/// Byte offset within the piece.
begin: u32,
/// Block length in bytes.
length: u32,
},
/// Keep-alive (zero-length message).
Keepalive,
/// Express interest in the peer's pieces.
Interested,
/// Withdraw interest.
NotInterested,
/// Unchoke the remote peer.
Unchoke,
/// Choke the remote peer.
Choke,
// -- Extension and upload messages (cold path, heap-allocated) --
/// Arbitrary wire message (extensions, upload pieces, etc.).
Wire(Message<Bytes>),
}
/// Commands from reader to requester for dispatch state management.
#[allow(dead_code)]
pub(crate) enum DispatchCommand {
/// Transition to requesting state — provides dispatch resources.
Start {
// -- New M187 fields (always present) --
/// Piece arithmetic (piece sizes, offsets, chunk size).
lengths: Lengths,
/// Copy of the peer's bitfield (kept for Have/Bitfield updates).
peer_bitfield: Bitfield,
/// Notification channel for new piece availability.
piece_notify: Arc<Notify>,
// -- Old CAS fields (only populated when use_actor_dispatch=false) --
/// Shared atomic piece states for CAS reservation.
atomic_states: Option<Arc<AtomicPieceStates>>,
/// Initial availability snapshot for CAS dispatch.
snapshot: Option<Arc<AvailabilitySnapshot>>,
/// Block-level request/receipt tracking for steal visibility.
block_maps: Option<Arc<BlockMaps>>,
/// Queue of pieces available for block-level stealing.
steal_candidates: Option<Arc<StealCandidates>>,
/// Per-piece write guards for steal/write race prevention.
piece_write_guards: Option<Arc<PieceWriteGuards>>,
},
/// Peer sent Have — update local bitfield copy.
PeerHave(u32),
/// Peer sent Bitfield — replace local copy.
PeerBitfield(Bitfield),
/// Updated availability snapshot (CAS path only).
Snapshot(Arc<AvailabilitySnapshot>),
/// Stop requesting — handled by `requester_loop`, constructed in tests
/// and future production use (graceful shutdown).
Stop,
}