Skip to main content

http2_proto/connection/
stream.rs

1//! HTTP/2 stream state tracking.
2
3pub use crate::frame::StreamId;
4
/// Lifecycle state of a single HTTP/2 stream, following RFC 7540 Section 5.1.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamState {
    /// The stream has not been used yet.
    Idle,
    /// The stream is reserved for server push; unused when acting as a client.
    ReservedRemote,
    /// Both directions are open: data can be sent and received.
    Open,
    /// We have sent END_STREAM; only the receiving direction remains.
    HalfClosedLocal,
    /// The peer has sent END_STREAM; only the sending direction remains.
    HalfClosedRemote,
    /// The stream is fully closed.
    Closed,
}
21
22/// An HTTP/2 stream.
23#[derive(Debug)]
24pub struct Stream {
25    /// Stream identifier.
26    id: StreamId,
27    /// Current state.
28    state: StreamState,
29    /// Send-side flow control window.
30    send_window: i32,
31    /// Receive-side flow control window.
32    recv_window: i32,
33    /// Bytes received (for window updates).
34    bytes_received: u32,
35}
36
37impl Stream {
38    /// Create a new stream.
39    pub fn new(id: StreamId, initial_window_size: u32) -> Self {
40        Self {
41            id,
42            state: StreamState::Open,
43            send_window: initial_window_size as i32,
44            recv_window: initial_window_size as i32,
45            bytes_received: 0,
46        }
47    }
48
49    /// Get the stream ID.
50    pub fn id(&self) -> StreamId {
51        self.id
52    }
53
54    /// Get the stream state.
55    pub fn state(&self) -> StreamState {
56        self.state
57    }
58
59    /// Check if the stream is open for sending.
60    pub fn can_send(&self) -> bool {
61        matches!(
62            self.state,
63            StreamState::Open | StreamState::HalfClosedRemote
64        )
65    }
66
67    /// Check if the stream is open for receiving.
68    pub fn can_recv(&self) -> bool {
69        matches!(self.state, StreamState::Open | StreamState::HalfClosedLocal)
70    }
71
72    /// Get the send window size.
73    pub fn send_window(&self) -> i32 {
74        self.send_window
75    }
76
77    /// Get the receive window size.
78    pub fn recv_window(&self) -> i32 {
79        self.recv_window
80    }
81
82    /// Record that we sent data.
83    pub fn send_data(&mut self, size: u32) {
84        self.send_window -= size as i32;
85    }
86
87    /// Record that we sent END_STREAM.
88    pub fn send_end_stream(&mut self) {
89        self.state = match self.state {
90            StreamState::Open => StreamState::HalfClosedLocal,
91            StreamState::HalfClosedRemote => StreamState::Closed,
92            other => other,
93        };
94    }
95
96    /// Record that we received data.
97    pub fn recv_data(&mut self, size: u32) {
98        self.recv_window -= size as i32;
99        self.bytes_received += size;
100    }
101
102    /// Record that we received END_STREAM.
103    pub fn recv_end_stream(&mut self) {
104        self.state = match self.state {
105            StreamState::Open => StreamState::HalfClosedRemote,
106            StreamState::HalfClosedLocal => StreamState::Closed,
107            other => other,
108        };
109    }
110
111    /// Increase the send window (from WINDOW_UPDATE).
112    pub fn increase_send_window(&mut self, increment: u32) {
113        self.send_window += increment as i32;
114    }
115
116    /// Adjust the send window (from SETTINGS change).
117    pub fn adjust_send_window(&mut self, delta: i32) {
118        self.send_window += delta;
119    }
120
121    /// Mark the stream as reset.
122    pub fn reset(&mut self) {
123        self.state = StreamState::Closed;
124    }
125
126    /// Get bytes received since last window update.
127    pub fn bytes_received(&self) -> u32 {
128        self.bytes_received
129    }
130
131    /// Reset bytes received counter.
132    pub fn reset_bytes_received(&mut self) {
133        self.bytes_received = 0;
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Initial flow-control window given to every stream built in these tests.
    const WINDOW: u32 = 65535;

    /// Build a stream with the shared test window.
    fn open_stream(id: StreamId) -> Stream {
        Stream::new(id, WINDOW)
    }

    /// A freshly built stream is open, has full windows and no received bytes.
    #[test]
    fn test_stream_new() {
        let fresh = open_stream(StreamId::new(5));
        assert_eq!(fresh.id().value(), 5);
        assert_eq!(fresh.state(), StreamState::Open);
        assert_eq!((fresh.send_window(), fresh.recv_window()), (65535, 65535));
        assert_eq!(fresh.bytes_received(), 0);
    }

    /// Closing our side first, then the peer's side.
    #[test]
    fn test_stream_lifecycle() {
        let mut s = open_stream(StreamId::new(1));

        assert_eq!(s.state(), StreamState::Open);
        assert_eq!((s.can_send(), s.can_recv()), (true, true));

        // We send END_STREAM: only receiving remains possible.
        s.send_end_stream();
        assert_eq!(s.state(), StreamState::HalfClosedLocal);
        assert_eq!((s.can_send(), s.can_recv()), (false, true));

        // The peer sends END_STREAM: nothing remains possible.
        s.recv_end_stream();
        assert_eq!(s.state(), StreamState::Closed);
        assert_eq!((s.can_send(), s.can_recv()), (false, false));
    }

    /// Closing the peer's side first, then our side.
    #[test]
    fn test_stream_lifecycle_recv_first() {
        let mut s = open_stream(StreamId::new(1));

        // The peer sends END_STREAM first: only sending remains possible.
        s.recv_end_stream();
        assert_eq!(s.state(), StreamState::HalfClosedRemote);
        assert_eq!((s.can_send(), s.can_recv()), (true, false));

        // Then we send END_STREAM: nothing remains possible.
        s.send_end_stream();
        assert_eq!(s.state(), StreamState::Closed);
        assert_eq!((s.can_send(), s.can_recv()), (false, false));
    }

    /// Sending END_STREAM on a closed stream leaves it closed.
    #[test]
    fn test_stream_send_end_stream_already_closed() {
        let mut s = open_stream(StreamId::new(1));
        s.reset();
        assert_eq!(s.state(), StreamState::Closed);

        s.send_end_stream();
        assert_eq!(s.state(), StreamState::Closed);
    }

    /// Receiving END_STREAM on a closed stream leaves it closed.
    #[test]
    fn test_stream_recv_end_stream_already_closed() {
        let mut s = open_stream(StreamId::new(1));
        s.reset();
        assert_eq!(s.state(), StreamState::Closed);

        s.recv_end_stream();
        assert_eq!(s.state(), StreamState::Closed);
    }

    /// Sending shrinks the send window; a window update grows it again.
    #[test]
    fn test_flow_control() {
        let mut s = open_stream(StreamId::new(1));
        assert_eq!(s.send_window(), 65535);

        s.send_data(1000);
        assert_eq!(s.send_window(), 64535);

        s.increase_send_window(500);
        assert_eq!(s.send_window(), 65035);
    }

    /// Receiving shrinks the receive window and accumulates the byte counter.
    #[test]
    fn test_recv_flow_control() {
        let mut s = open_stream(StreamId::new(1));
        assert_eq!((s.recv_window(), s.bytes_received()), (65535, 0));

        s.recv_data(1000);
        assert_eq!((s.recv_window(), s.bytes_received()), (64535, 1000));

        s.recv_data(500);
        assert_eq!((s.recv_window(), s.bytes_received()), (64035, 1500));
    }

    /// The received-bytes counter can be cleared.
    #[test]
    fn test_reset_bytes_received() {
        let mut s = open_stream(StreamId::new(1));

        s.recv_data(1000);
        assert_eq!(s.bytes_received(), 1000);

        s.reset_bytes_received();
        assert_eq!(s.bytes_received(), 0);
    }

    /// The send window accepts both positive and negative adjustments.
    #[test]
    fn test_adjust_send_window() {
        let mut s = open_stream(StreamId::new(1));

        s.adjust_send_window(1000);
        assert_eq!(s.send_window(), 66535);

        s.adjust_send_window(-2000);
        assert_eq!(s.send_window(), 64535);
    }

    /// Resetting an open stream closes both directions at once.
    #[test]
    fn test_stream_reset() {
        let mut s = open_stream(StreamId::new(1));
        assert_eq!(s.state(), StreamState::Open);

        s.reset();
        assert_eq!(s.state(), StreamState::Closed);
        assert_eq!((s.can_send(), s.can_recv()), (false, false));
    }

    /// Every state renders to a non-empty `Debug` string.
    #[test]
    fn test_stream_state_debug() {
        let every_state = [
            StreamState::Idle,
            StreamState::ReservedRemote,
            StreamState::Open,
            StreamState::HalfClosedLocal,
            StreamState::HalfClosedRemote,
            StreamState::Closed,
        ];

        for rendered in every_state.iter().map(|state| format!("{:?}", state)) {
            assert!(!rendered.is_empty());
        }
    }

    /// States compare equal to themselves and unequal to other states.
    #[test]
    fn test_stream_state_eq() {
        assert_eq!(StreamState::Open, StreamState::Open);
        assert_ne!(StreamState::Open, StreamState::Closed);
    }

    /// The `Debug` rendering of a stream names the type.
    #[test]
    fn test_stream_debug() {
        let rendered = format!("{:?}", open_stream(StreamId::new(1)));
        assert!(rendered.contains("Stream"));
    }

    /// Sending is allowed in Open and HalfClosedRemote, not in HalfClosedLocal.
    #[test]
    fn test_can_send_states() {
        let mut peer_closed = open_stream(StreamId::new(1));

        // Open
        assert!(peer_closed.can_send());

        // HalfClosedRemote
        peer_closed.recv_end_stream();
        assert!(peer_closed.can_send());

        // HalfClosedLocal
        let mut locally_closed = open_stream(StreamId::new(3));
        locally_closed.send_end_stream();
        assert!(!locally_closed.can_send());
    }

    /// Receiving is allowed in Open and HalfClosedLocal, not in HalfClosedRemote.
    #[test]
    fn test_can_recv_states() {
        let mut locally_closed = open_stream(StreamId::new(1));

        // Open
        assert!(locally_closed.can_recv());

        // HalfClosedLocal
        locally_closed.send_end_stream();
        assert!(locally_closed.can_recv());

        // HalfClosedRemote
        let mut peer_closed = open_stream(StreamId::new(3));
        peer_closed.recv_end_stream();
        assert!(!peer_closed.can_recv());
    }
}