net/adapter/net/subprotocol/stream_window.rs
1//! Stream window subprotocol — receiver → sender credit grants.
2//!
3//! Ships over `SUBPROTOCOL_STREAM_WINDOW` on existing encrypted
4//! sessions. Each grant carries the receiver's **absolute**
5//! cumulative bytes-consumed count on the named stream; the sender
6//! reconciles its credit state from that authoritative value.
7//!
8//! ## Why absolute, not additive
9//!
10//! An additive grant (`credit_bytes` added to the sender's
11//! remaining credit) permanently strands credit when either a data
12//! packet OR a grant is dropped on the wire. An absolute
13//! `total_consumed` grant is self-healing: every arriving grant
14//! carries the receiver's full accounting, so any single lost
15//! grant is reconciled by the next one. Lost data packets leave
16//! `tx_bytes_sent - total_consumed` elevated until recovery
17//! (retransmit for `Reliable`, stream reset for `FireAndForget`),
18//! but the sender's credit view converges exactly when the
19//! receiver's view does.
20//!
21//! Wire layout: 16 bytes per message (`u64 stream_id LE` +
22//! `u64 total_consumed LE`).
23
24use bytes::{Buf, BufMut};
25
26/// Subprotocol ID for stream-window credit grants.
27pub const SUBPROTOCOL_STREAM_WINDOW: u16 = 0x0B00;
28
29/// Fixed wire size in bytes.
30pub const STREAM_WINDOW_SIZE: usize = 16;
31
32/// Receiver → sender credit grant. Authoritative: `total_consumed`
33/// is the receiver's cumulative bytes-consumed count on the named
34/// stream since it was opened. The sender uses this to recompute
35/// `tx_credit_remaining = tx_window - (tx_bytes_sent - total_consumed)`,
36/// making the mechanism self-healing against lost grants.
37///
38/// # Consumer-side validation
39///
40/// The codec accepts any `total_consumed: u64`. Pre-fix the doc-
41/// comment's "self-healing" framing implied no further validation
42/// was needed, but the formula
43/// `tx_credit_remaining = tx_window - (tx_bytes_sent - total_consumed)`
44/// underflows if a malformed or hostile peer sends
45/// `total_consumed > tx_bytes_sent`. **The consumer MUST clamp
46/// `total_consumed` to its local `tx_bytes_sent` watermark before
47/// applying.** `StreamState::apply_authoritative_grant`
48/// (`adapter/net/session.rs:1153-1154`) does this today; any
49/// future consumer of this codec must do the same. The codec
50/// layer cannot do the clamp itself because it doesn't know the
51/// sender's local state.
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub struct StreamWindow {
54 /// Stream the grant applies to.
55 pub stream_id: u64,
56 /// Receiver's cumulative consumed-byte count on this stream.
57 ///
58 /// Consumers MUST clamp this to the local `tx_bytes_sent`
59 /// watermark before deriving credit.
60 pub total_consumed: u64,
61}
62
63/// Errors produced by the codec.
64#[derive(Debug, thiserror::Error)]
65pub enum StreamWindowCodecError {
66 /// Buffer shorter than the fixed wire size.
67 #[error("truncated stream-window message: {0} bytes (need 16)")]
68 Truncated(usize),
69 /// Buffer longer than the fixed wire size. Rejects garbage
70 /// trailers rather than silently ignoring them.
71 #[error("oversize stream-window message: {0} bytes (need 16)")]
72 Oversize(usize),
73}
74
75impl StreamWindow {
76 /// Encode to a fixed 16-byte buffer.
77 #[inline]
78 pub fn encode(&self) -> [u8; STREAM_WINDOW_SIZE] {
79 let mut buf = [0u8; STREAM_WINDOW_SIZE];
80 (&mut buf[..8]).put_u64_le(self.stream_id);
81 (&mut buf[8..]).put_u64_le(self.total_consumed);
82 buf
83 }
84
85 /// Decode a 16-byte message. Returns an error on truncated or
86 /// oversize input.
87 pub fn decode(data: &[u8]) -> Result<Self, StreamWindowCodecError> {
88 match data.len() {
89 n if n < STREAM_WINDOW_SIZE => Err(StreamWindowCodecError::Truncated(n)),
90 n if n > STREAM_WINDOW_SIZE => Err(StreamWindowCodecError::Oversize(n)),
91 _ => {
92 let mut cur = std::io::Cursor::new(data);
93 let stream_id = cur.get_u64_le();
94 let total_consumed = cur.get_u64_le();
95 Ok(Self {
96 stream_id,
97 total_consumed,
98 })
99 }
100 }
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107
108 #[test]
109 fn test_round_trip() {
110 let msg = StreamWindow {
111 stream_id: 0xDEAD_BEEF_CAFE_F00D,
112 total_consumed: 0x0102_0304_0506_0708,
113 };
114 let bytes = msg.encode();
115 assert_eq!(bytes.len(), STREAM_WINDOW_SIZE);
116 let parsed = StreamWindow::decode(&bytes).unwrap();
117 assert_eq!(parsed, msg);
118 }
119
120 #[test]
121 fn test_decode_truncated_rejected() {
122 let err = StreamWindow::decode(&[0u8; 15]).unwrap_err();
123 assert!(matches!(err, StreamWindowCodecError::Truncated(15)));
124 }
125
126 #[test]
127 fn test_decode_oversize_rejected() {
128 let err = StreamWindow::decode(&[0u8; 17]).unwrap_err();
129 assert!(matches!(err, StreamWindowCodecError::Oversize(17)));
130 }
131
132 #[test]
133 fn test_decode_empty_rejected() {
134 let err = StreamWindow::decode(&[]).unwrap_err();
135 assert!(matches!(err, StreamWindowCodecError::Truncated(0)));
136 }
137
138 #[test]
139 fn test_endianness_is_little_endian() {
140 // Explicit LE check — stream_id=1 must produce `01 00 ... 00`.
141 let msg = StreamWindow {
142 stream_id: 1,
143 total_consumed: 1,
144 };
145 let bytes = msg.encode();
146 assert_eq!(bytes[0], 0x01);
147 assert_eq!(bytes[1], 0x00);
148 assert_eq!(bytes[8], 0x01);
149 assert_eq!(bytes[9], 0x00);
150 }
151}