net/adapter/net/subprotocol/stream_window.rs
1//! Stream window subprotocol — receiver → sender credit grants.
2//!
3//! Ships over `SUBPROTOCOL_STREAM_WINDOW` on existing encrypted
4//! sessions. Each grant carries the receiver's **absolute**
5//! cumulative bytes-consumed count on the named stream; the sender
6//! reconciles its credit state from that authoritative value.
7//!
8//! ## Why absolute, not additive
9//!
10//! An additive grant (`credit_bytes` added to the sender's
11//! remaining credit) permanently strands credit when either a data
12//! packet OR a grant is dropped on the wire. An absolute
13//! `total_consumed` grant is self-healing: every arriving grant
14//! carries the receiver's full accounting, so any single lost
15//! grant is reconciled by the next one. Lost data packets leave
16//! `tx_bytes_sent - total_consumed` elevated until recovery
17//! (retransmit for `Reliable`, stream reset for `FireAndForget`),
18//! but the sender's credit view converges exactly when the
19//! receiver's view does.
20//!
21//! Wire layout: 16 bytes per message (`u64 stream_id LE` +
22//! `u64 total_consumed LE`).
23
24use bytes::{Buf, BufMut};
25
26/// Subprotocol ID for stream-window credit grants.
27pub const SUBPROTOCOL_STREAM_WINDOW: u16 = 0x0B00;
28
29/// Subprotocol ID for receiver → sender retransmit NACKs
30/// ([`StreamNack`]). Sibling of the window grant: both are small
31/// receiver-driven control messages about a stream's progress.
32pub const SUBPROTOCOL_STREAM_NACK: u16 = 0x0B01;
33
34/// Subprotocol ID for a sender → receiver stream reset ([`StreamReset`]):
35/// the sender's reliable layer gave up retransmitting a gap, so the
36/// receiver should fail any pending read on this stream now rather than
37/// stall to a timeout (H-3).
38pub const SUBPROTOCOL_STREAM_RESET: u16 = 0x0B02;
39
40/// Fixed wire size of a [`StreamReset`] in bytes.
41pub const STREAM_RESET_SIZE: usize = 8;
42
43/// Fixed wire size in bytes (`stream_id` + `total_consumed` + `ack_seq`).
44pub const STREAM_WINDOW_SIZE: usize = 24;
45
46/// Fixed wire size of a [`StreamNack`] in bytes.
47pub const STREAM_NACK_SIZE: usize = 24;
48
49/// Receiver → sender credit grant. Authoritative: `total_consumed`
50/// is the receiver's cumulative bytes-consumed count on the named
51/// stream since it was opened. The sender uses this to recompute
52/// `tx_credit_remaining = tx_window - (tx_bytes_sent - total_consumed)`,
53/// making the mechanism self-healing against lost grants.
54///
55/// # Consumer-side validation
56///
57/// The codec accepts any `total_consumed: u64`. Pre-fix the doc-
58/// comment's "self-healing" framing implied no further validation
59/// was needed, but the formula
60/// `tx_credit_remaining = tx_window - (tx_bytes_sent - total_consumed)`
61/// underflows if a malformed or hostile peer sends
62/// `total_consumed > tx_bytes_sent`. **The consumer MUST clamp
63/// `total_consumed` to its local `tx_bytes_sent` watermark before
64/// applying.** `StreamState::apply_authoritative_grant`
65/// (`adapter/net/session.rs:1153-1154`) does this today; any
66/// future consumer of this codec must do the same. The codec
67/// layer cannot do the clamp itself because it doesn't know the
68/// sender's local state.
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub struct StreamWindow {
71 /// Stream the grant applies to.
72 pub stream_id: u64,
73 /// Receiver's cumulative consumed-byte count on this stream.
74 ///
75 /// Consumers MUST clamp this to the local `tx_bytes_sent`
76 /// watermark before deriving credit.
77 pub total_consumed: u64,
78 /// Receiver's cumulative reliable ack — the lowest sequence not yet
79 /// contiguously received (`next_expected`). The sender prunes its
80 /// retransmit window of everything below this (H-9). 0 for
81 /// non-reliable receive streams (nothing to prune).
82 pub ack_seq: u64,
83}
84
85/// Errors produced by the codec.
86#[derive(Debug, thiserror::Error)]
87pub enum StreamWindowCodecError {
88 /// Buffer shorter than the fixed wire size.
89 #[error("truncated stream-window message: {0} bytes (need 16)")]
90 Truncated(usize),
91 /// Buffer longer than the fixed wire size. Rejects garbage
92 /// trailers rather than silently ignoring them.
93 #[error("oversize stream-window message: {0} bytes (need 16)")]
94 Oversize(usize),
95}
96
97impl StreamWindow {
98 /// Encode to a fixed 16-byte buffer.
99 #[inline]
100 pub fn encode(&self) -> [u8; STREAM_WINDOW_SIZE] {
101 let mut buf = [0u8; STREAM_WINDOW_SIZE];
102 (&mut buf[..8]).put_u64_le(self.stream_id);
103 (&mut buf[8..16]).put_u64_le(self.total_consumed);
104 (&mut buf[16..]).put_u64_le(self.ack_seq);
105 buf
106 }
107
108 /// Decode a fixed-size message. Returns an error on truncated or
109 /// oversize input.
110 pub fn decode(data: &[u8]) -> Result<Self, StreamWindowCodecError> {
111 match data.len() {
112 n if n < STREAM_WINDOW_SIZE => Err(StreamWindowCodecError::Truncated(n)),
113 n if n > STREAM_WINDOW_SIZE => Err(StreamWindowCodecError::Oversize(n)),
114 _ => {
115 let mut cur = std::io::Cursor::new(data);
116 let stream_id = cur.get_u64_le();
117 let total_consumed = cur.get_u64_le();
118 let ack_seq = cur.get_u64_le();
119 Ok(Self {
120 stream_id,
121 total_consumed,
122 ack_seq,
123 })
124 }
125 }
126 }
127}
128
129/// Receiver → sender retransmit request. Names a stream and the gaps
130/// the receiver is missing: `next_expected` is the lowest sequence not
131/// yet received contiguously, and `missing_bitmap` bit `i` is set iff
132/// `next_expected + 1 + i` is also still missing. The sender feeds this
133/// to `ReliableStream::on_nack` to pull the matching retransmit
134/// descriptors. Carries `stream_id` itself (rides `CONTROL_STREAM_ID`
135/// like the window grant), so the sender doesn't depend on the packet
136/// header's stream field.
137#[derive(Debug, Clone, Copy, PartialEq, Eq)]
138pub struct StreamNack {
139 /// Stream the NACK applies to.
140 pub stream_id: u64,
141 /// Lowest sequence the receiver has not received contiguously.
142 pub next_expected: u64,
143 /// Bitmap of further missing sequences after `next_expected`.
144 pub missing_bitmap: u64,
145}
146
147impl StreamNack {
148 /// Encode to a fixed 24-byte buffer.
149 #[inline]
150 pub fn encode(&self) -> [u8; STREAM_NACK_SIZE] {
151 let mut buf = [0u8; STREAM_NACK_SIZE];
152 (&mut buf[..8]).put_u64_le(self.stream_id);
153 (&mut buf[8..16]).put_u64_le(self.next_expected);
154 (&mut buf[16..]).put_u64_le(self.missing_bitmap);
155 buf
156 }
157
158 /// Decode a 24-byte message. Errors on truncated / oversize input.
159 pub fn decode(data: &[u8]) -> Result<Self, StreamWindowCodecError> {
160 match data.len() {
161 n if n < STREAM_NACK_SIZE => Err(StreamWindowCodecError::Truncated(n)),
162 n if n > STREAM_NACK_SIZE => Err(StreamWindowCodecError::Oversize(n)),
163 _ => {
164 let mut cur = std::io::Cursor::new(data);
165 let stream_id = cur.get_u64_le();
166 let next_expected = cur.get_u64_le();
167 let missing_bitmap = cur.get_u64_le();
168 Ok(Self {
169 stream_id,
170 next_expected,
171 missing_bitmap,
172 })
173 }
174 }
175 }
176}
177
178/// Sender → receiver stream reset (H-3). Names a stream the sender has
179/// given up retransmitting; the receiver fails any pending read on it.
180#[derive(Debug, Clone, Copy, PartialEq, Eq)]
181pub struct StreamReset {
182 /// Stream the reset applies to.
183 pub stream_id: u64,
184}
185
186impl StreamReset {
187 /// Encode to a fixed 8-byte buffer.
188 #[inline]
189 pub fn encode(&self) -> [u8; STREAM_RESET_SIZE] {
190 self.stream_id.to_le_bytes()
191 }
192
193 /// Decode an 8-byte message. Errors on truncated / oversize input.
194 pub fn decode(data: &[u8]) -> Result<Self, StreamWindowCodecError> {
195 match data.len() {
196 n if n < STREAM_RESET_SIZE => Err(StreamWindowCodecError::Truncated(n)),
197 n if n > STREAM_RESET_SIZE => Err(StreamWindowCodecError::Oversize(n)),
198 _ => {
199 let mut cur = std::io::Cursor::new(data);
200 Ok(Self {
201 stream_id: cur.get_u64_le(),
202 })
203 }
204 }
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use super::*;
211
212 #[test]
213 fn test_round_trip() {
214 let msg = StreamWindow {
215 stream_id: 0xDEAD_BEEF_CAFE_F00D,
216 total_consumed: 0x0102_0304_0506_0708,
217 ack_seq: 0x1122_3344_5566_7788,
218 };
219 let bytes = msg.encode();
220 assert_eq!(bytes.len(), STREAM_WINDOW_SIZE);
221 let parsed = StreamWindow::decode(&bytes).unwrap();
222 assert_eq!(parsed, msg);
223 }
224
225 #[test]
226 fn test_decode_truncated_rejected() {
227 let err = StreamWindow::decode(&[0u8; STREAM_WINDOW_SIZE - 1]).unwrap_err();
228 assert!(matches!(err, StreamWindowCodecError::Truncated(_)));
229 }
230
231 #[test]
232 fn test_decode_oversize_rejected() {
233 let err = StreamWindow::decode(&[0u8; STREAM_WINDOW_SIZE + 1]).unwrap_err();
234 assert!(matches!(err, StreamWindowCodecError::Oversize(_)));
235 }
236
237 #[test]
238 fn test_decode_empty_rejected() {
239 let err = StreamWindow::decode(&[]).unwrap_err();
240 assert!(matches!(err, StreamWindowCodecError::Truncated(0)));
241 }
242
243 #[test]
244 fn test_endianness_is_little_endian() {
245 // Explicit LE check — stream_id=1 must produce `01 00 ... 00`.
246 let msg = StreamWindow {
247 stream_id: 1,
248 total_consumed: 1,
249 ack_seq: 1,
250 };
251 let bytes = msg.encode();
252 assert_eq!(bytes[0], 0x01);
253 assert_eq!(bytes[1], 0x00);
254 assert_eq!(bytes[8], 0x01);
255 assert_eq!(bytes[9], 0x00);
256 assert_eq!(bytes[16], 0x01);
257 assert_eq!(bytes[17], 0x00);
258 }
259
260 #[test]
261 fn stream_nack_round_trip() {
262 let msg = StreamNack {
263 stream_id: 0xABCD,
264 next_expected: 7,
265 missing_bitmap: 0b1010,
266 };
267 assert_eq!(StreamNack::decode(&msg.encode()).unwrap(), msg);
268 }
269
270 #[test]
271 fn stream_reset_round_trip() {
272 let msg = StreamReset {
273 stream_id: 0x2000_0000_0000_0001,
274 };
275 assert_eq!(StreamReset::decode(&msg.encode()).unwrap(), msg);
276 }
277}