1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//! Zero-lock shared state and message types for the three per-peer tasks.
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32};
use bytes::Bytes;
use tokio::sync::{Notify, Semaphore};
use irontide_core::Lengths;
use irontide_storage::Bitfield;
use irontide_wire::Message;
use crate::piece_reservation::{
AtomicPieceStates, AvailabilitySnapshot, BlockMaps, PieceWriteGuards, StealCandidates,
};
/// Initial pipeline depth — matches `INITIAL_QUEUE_DEPTH` in `torrent_peer_handler`.
///
/// This is the size of the permit grant made on unchoke, not the semaphore's
/// starting count: permits are **not** granted at construction; they are added
/// when the peer unchokes us (via `semaphore.add_permits(INITIAL_QUEUE_DEPTH)`).
pub(crate) const INITIAL_QUEUE_DEPTH: usize = 128;
/// Zero-lock shared state between the three per-peer tasks.
///
/// Owned by `Arc<PeerShared>` — one instance per peer connection, shared among
/// the reader, writer, and requester tasks.
///
/// Every field is an atomic, a Tokio synchronization primitive, or a plain
/// `SocketAddr`, so the struct itself carries no mutex.
pub(crate) struct PeerShared {
/// Pipeline depth gating. Created with **zero** permits by `PeerShared::new`;
/// `INITIAL_QUEUE_DEPTH` permits are added once the peer unchokes us.
/// Requester acquires, reader returns via `add_permits(1)`.
pub semaphore: Arc<Semaphore>,
/// Set by reader on Choke/Unchoke messages. Read by requester.
/// Initialised to `true` by `PeerShared::new`.
pub peer_choking: AtomicBool,
/// Reader notifies requester when unchoked.
pub unchoke_notify: Notify,
/// Peer socket address (for logging and events).
pub addr: SocketAddr,
/// Number of in-flight block requests (incremented by requester on send,
/// decremented by reader on Piece/RejectRequest, reset to 0 on Choke).
/// Shared with `TorrentActor` via `PeerState` for accurate diagnostics,
/// which is why the counter is passed in to `PeerShared::new` rather than
/// created there.
pub in_flight: Arc<AtomicU32>,
/// M149: Dynamic pipeline depth target — requester gates on
/// `in_flight < target_depth` before acquiring semaphore.
/// Supplied by the caller of `PeerShared::new`.
pub target_depth: Arc<AtomicU32>,
/// M149: Reader fires this when `in_flight` decrements (Piece/Reject/Choke),
/// waking the requester's depth gate immediately instead of waiting for
/// the 1s safety-net tick or TorrentActor event processing.
pub depth_notify: Notify,
}
impl PeerShared {
    /// Build the shared state for a single peer connection.
    ///
    /// `addr`, `in_flight` and `target_depth` are stored unchanged; the two
    /// counters are handed in by the caller so they can be shared outside
    /// this struct.
    ///
    /// The remaining fields start in their "still choked" configuration:
    /// the semaphore holds **zero** permits (permits are added when the
    /// remote peer sends an Unchoke message), `peer_choking` is `true`
    /// (BitTorrent specification default), and both notifiers are fresh.
    pub fn new(addr: SocketAddr, in_flight: Arc<AtomicU32>, target_depth: Arc<AtomicU32>) -> Self {
        // No request may be pipelined until the peer unchokes us.
        let semaphore = Arc::new(Semaphore::new(0));
        let peer_choking = AtomicBool::new(true);

        // Wake-up channels for the requester task.
        let unchoke_notify = Notify::new();
        let depth_notify = Notify::new();

        Self {
            semaphore,
            peer_choking,
            unchoke_notify,
            addr,
            in_flight,
            target_depth,
            depth_notify,
        }
    }
}
/// Messages sent through the writer channel from requester and reader.
///
/// Frequently-used messages have dedicated variants; anything else is carried
/// as a full wire message in [`OutgoingMessage::Wire`].
/// NOTE(review): presumably the dedicated variants exist so the hot path does
/// not have to build a `Message<Bytes>` — confirm against the writer task.
pub(crate) enum OutgoingMessage {
// -- From requester (hot path) --
/// Request a block from the peer.
Request {
/// Piece index.
index: u32,
/// Byte offset within the piece.
begin: u32,
/// Block length in bytes.
length: u32,
},
// -- From reader/handler --
/// Announce that we have a piece.
Have {
/// Piece index.
index: u32,
},
/// Cancel a previously-sent request. Carries the same
/// (`index`, `begin`, `length`) triple as [`OutgoingMessage::Request`].
Cancel {
/// Piece index.
index: u32,
/// Byte offset within the piece.
begin: u32,
/// Block length in bytes.
length: u32,
},
/// Keep-alive (zero-length message).
Keepalive,
/// Express interest in the peer's pieces.
Interested,
/// Withdraw interest.
NotInterested,
/// Unchoke the remote peer.
Unchoke,
/// Choke the remote peer.
Choke,
// -- Extension and upload messages (cold path, heap-allocated) --
/// Arbitrary wire message (extensions, upload pieces, etc.).
Wire(Message<Bytes>),
}
/// Commands from reader to requester for dispatch state management.
///
/// `Start` hands over everything needed to begin requesting; the remaining
/// variants update or end that state.
pub(crate) enum DispatchCommand {
/// Transition to requesting state — provides all dispatch resources.
Start {
/// Atomic piece states for CAS-based reservation.
atomic_states: Arc<AtomicPieceStates>,
/// Rarest-first availability snapshot.
snapshot: Arc<AvailabilitySnapshot>,
/// Per-block request/received tracking (if block stealing enabled).
block_maps: Option<Arc<BlockMaps>>,
/// Shared steal candidate queue (if block stealing enabled).
steal_candidates: Option<Arc<StealCandidates>>,
/// Per-piece write guards (prevents steal/write races).
/// NOTE(review): presumably `None` under the same condition as
/// `block_maps` / `steal_candidates` (block stealing disabled) — confirm.
piece_write_guards: Option<Arc<PieceWriteGuards>>,
/// Piece arithmetic (piece sizes, offsets, chunk size).
lengths: Lengths,
/// Copy of the peer's bitfield.
peer_bitfield: Bitfield,
/// Notification channel for new piece availability.
piece_notify: Arc<Notify>,
},
/// Updated rarest-first snapshot from `TorrentActor`.
Snapshot(Arc<AvailabilitySnapshot>),
/// Peer sent Have — update local bitfield copy.
/// NOTE(review): the `u32` is presumably the announced piece index — confirm.
PeerHave(u32),
/// Peer sent Bitfield — replace local copy.
PeerBitfield(Bitfield),
/// Stop requesting — handled by requester_loop, constructed in tests
/// and future production use (graceful shutdown).
// Not yet constructed in production code, hence the lint suppression.
#[allow(dead_code)]
Stop,
}