1pub use crate::frame::StreamId;
4
/// Lifecycle state tracked for a single [`Stream`].
///
/// NOTE(review): the variant names look like the HTTP/2 stream states
/// (RFC 9113 §5.1) — confirm against the protocol layer. The per-variant notes
/// below describe only what the code in this file does with each state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Never assigned by any method in this file; both [`Stream::can_send`]
    /// and [`Stream::can_recv`] report `false` for it.
    Idle,
    /// Never assigned by any method in this file; both [`Stream::can_send`]
    /// and [`Stream::can_recv`] report `false` for it.
    ReservedRemote,
    /// State given to every stream by [`Stream::new`]; sending and receiving
    /// are both permitted.
    Open,
    /// Entered from `Open` by [`Stream::send_end_stream`]; receiving is still
    /// permitted, sending is not.
    HalfClosedLocal,
    /// Entered from `Open` by [`Stream::recv_end_stream`]; sending is still
    /// permitted, receiving is not.
    HalfClosedRemote,
    /// Entered once both directions have ended, or directly through
    /// [`Stream::reset`]; neither sending nor receiving is permitted.
    Closed,
}
21
/// Per-stream bookkeeping: identifier, lifecycle state and flow-control
/// counters.
#[derive(Debug)]
pub struct Stream {
    /// Identifier handed to [`Stream::new`]; no method in this file changes it.
    id: StreamId,
    /// Current lifecycle state; starts as [`StreamState::Open`].
    state: StreamState,
    /// Signed send-side window. Reduced by `send_data`, changed by
    /// `increase_send_window` and `adjust_send_window`; it is signed, so it
    /// can become negative.
    send_window: i32,
    /// Signed receive-side window. Reduced by `recv_data`; no method in this
    /// file replenishes it.
    recv_window: i32,
    /// Sum of the sizes passed to `recv_data` since construction or the last
    /// `reset_bytes_received` call.
    bytes_received: u32,
}
36
37impl Stream {
38 pub fn new(id: StreamId, initial_window_size: u32) -> Self {
40 Self {
41 id,
42 state: StreamState::Open,
43 send_window: initial_window_size as i32,
44 recv_window: initial_window_size as i32,
45 bytes_received: 0,
46 }
47 }
48
49 pub fn id(&self) -> StreamId {
51 self.id
52 }
53
54 pub fn state(&self) -> StreamState {
56 self.state
57 }
58
59 pub fn can_send(&self) -> bool {
61 matches!(
62 self.state,
63 StreamState::Open | StreamState::HalfClosedRemote
64 )
65 }
66
67 pub fn can_recv(&self) -> bool {
69 matches!(self.state, StreamState::Open | StreamState::HalfClosedLocal)
70 }
71
72 pub fn send_window(&self) -> i32 {
74 self.send_window
75 }
76
77 pub fn recv_window(&self) -> i32 {
79 self.recv_window
80 }
81
82 pub fn send_data(&mut self, size: u32) {
84 self.send_window -= size as i32;
85 }
86
87 pub fn send_end_stream(&mut self) {
89 self.state = match self.state {
90 StreamState::Open => StreamState::HalfClosedLocal,
91 StreamState::HalfClosedRemote => StreamState::Closed,
92 other => other,
93 };
94 }
95
96 pub fn recv_data(&mut self, size: u32) {
98 self.recv_window -= size as i32;
99 self.bytes_received += size;
100 }
101
102 pub fn recv_end_stream(&mut self) {
104 self.state = match self.state {
105 StreamState::Open => StreamState::HalfClosedRemote,
106 StreamState::HalfClosedLocal => StreamState::Closed,
107 other => other,
108 };
109 }
110
111 pub fn increase_send_window(&mut self, increment: u32) {
113 self.send_window += increment as i32;
114 }
115
116 pub fn adjust_send_window(&mut self, delta: i32) {
118 self.send_window += delta;
119 }
120
121 pub fn reset(&mut self) {
123 self.state = StreamState::Closed;
124 }
125
126 pub fn bytes_received(&self) -> u32 {
128 self.bytes_received
129 }
130
131 pub fn reset_bytes_received(&mut self) {
133 self.bytes_received = 0;
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Initial window size shared by every test in this module.
    const WINDOW: u32 = 65535;

    /// Builds a fresh stream with id 1 and the shared initial window.
    fn stream_one() -> Stream {
        Stream::new(StreamId::new(1), WINDOW)
    }

    /// Snapshot of `(can_send, can_recv)` so both directions can be checked
    /// in a single assertion.
    fn directions(stream: &Stream) -> (bool, bool) {
        (stream.can_send(), stream.can_recv())
    }

    /// A new stream reports its id, starts open and has full windows.
    #[test]
    fn test_stream_new() {
        let stream = Stream::new(StreamId::new(5), WINDOW);

        assert_eq!(stream.id().value(), 5);
        assert_eq!(stream.state(), StreamState::Open);
        assert_eq!((stream.send_window(), stream.recv_window()), (65535, 65535));
        assert_eq!(stream.bytes_received(), 0);
    }

    /// Local end first, then remote end: Open -> HalfClosedLocal -> Closed.
    #[test]
    fn test_stream_lifecycle() {
        let mut stream = stream_one();
        assert_eq!(stream.state(), StreamState::Open);
        assert_eq!(directions(&stream), (true, true));

        // Ending the local side leaves only the receive direction usable.
        stream.send_end_stream();
        assert_eq!(stream.state(), StreamState::HalfClosedLocal);
        assert_eq!(directions(&stream), (false, true));

        // Ending the remote side as well closes the stream.
        stream.recv_end_stream();
        assert_eq!(stream.state(), StreamState::Closed);
        assert_eq!(directions(&stream), (false, false));
    }

    /// Remote end first, then local end: Open -> HalfClosedRemote -> Closed.
    #[test]
    fn test_stream_lifecycle_recv_first() {
        let mut stream = stream_one();

        // Ending the remote side leaves only the send direction usable.
        stream.recv_end_stream();
        assert_eq!(stream.state(), StreamState::HalfClosedRemote);
        assert_eq!(directions(&stream), (true, false));

        // Ending the local side as well closes the stream.
        stream.send_end_stream();
        assert_eq!(stream.state(), StreamState::Closed);
        assert_eq!(directions(&stream), (false, false));
    }

    /// `send_end_stream` on a closed stream leaves it closed.
    #[test]
    fn test_stream_send_end_stream_already_closed() {
        let mut stream = stream_one();
        stream.reset();
        assert_eq!(stream.state(), StreamState::Closed);

        stream.send_end_stream();
        assert_eq!(stream.state(), StreamState::Closed);
    }

    /// `recv_end_stream` on a closed stream leaves it closed.
    #[test]
    fn test_stream_recv_end_stream_already_closed() {
        let mut stream = stream_one();
        stream.reset();
        assert_eq!(stream.state(), StreamState::Closed);

        stream.recv_end_stream();
        assert_eq!(stream.state(), StreamState::Closed);
    }

    /// Sending shrinks the send window; a window increase grows it again.
    #[test]
    fn test_flow_control() {
        let mut stream = stream_one();
        assert_eq!(stream.send_window(), 65535);

        stream.send_data(1000);
        assert_eq!(stream.send_window(), 64535);

        stream.increase_send_window(500);
        assert_eq!(stream.send_window(), 65035);
    }

    /// Receiving shrinks the receive window and accumulates the byte counter.
    #[test]
    fn test_recv_flow_control() {
        let mut stream = stream_one();
        assert_eq!((stream.recv_window(), stream.bytes_received()), (65535, 0));

        stream.recv_data(1000);
        assert_eq!(
            (stream.recv_window(), stream.bytes_received()),
            (64535, 1000)
        );

        stream.recv_data(500);
        assert_eq!(
            (stream.recv_window(), stream.bytes_received()),
            (64035, 1500)
        );
    }

    /// The byte counter can be cleared after data has been received.
    #[test]
    fn test_reset_bytes_received() {
        let mut stream = stream_one();

        stream.recv_data(1000);
        assert_eq!(stream.bytes_received(), 1000);

        stream.reset_bytes_received();
        assert_eq!(stream.bytes_received(), 0);
    }

    /// Signed adjustments move the send window in both directions.
    #[test]
    fn test_adjust_send_window() {
        let mut stream = stream_one();

        stream.adjust_send_window(1000);
        assert_eq!(stream.send_window(), 66535);

        stream.adjust_send_window(-2000);
        assert_eq!(stream.send_window(), 64535);
    }

    /// Resetting an open stream closes both directions at once.
    #[test]
    fn test_stream_reset() {
        let mut stream = stream_one();
        assert_eq!(stream.state(), StreamState::Open);

        stream.reset();
        assert_eq!(stream.state(), StreamState::Closed);
        assert_eq!(directions(&stream), (false, false));
    }

    /// Every state variant renders to a non-empty `Debug` string.
    #[test]
    fn test_stream_state_debug() {
        let every_state = [
            StreamState::Idle,
            StreamState::ReservedRemote,
            StreamState::Open,
            StreamState::HalfClosedLocal,
            StreamState::HalfClosedRemote,
            StreamState::Closed,
        ];

        for rendered in every_state.iter().map(|state| format!("{:?}", state)) {
            assert!(!rendered.is_empty());
        }
    }

    /// Equality distinguishes different states and matches identical ones.
    #[test]
    fn test_stream_state_eq() {
        assert_eq!(StreamState::Open, StreamState::Open);
        assert_ne!(StreamState::Open, StreamState::Closed);
    }

    /// The `Debug` output of a stream mentions the type name.
    #[test]
    fn test_stream_debug() {
        let rendered = format!("{:?}", stream_one());
        assert!(rendered.contains("Stream"));
    }

    /// Sending stays possible until the local side ends the stream.
    #[test]
    fn test_can_send_states() {
        let mut remote_ended = stream_one();
        assert!(remote_ended.can_send());

        // The remote side ending does not affect the send direction.
        remote_ended.recv_end_stream();
        assert!(remote_ended.can_send());

        // The local side ending does.
        let mut local_ended = Stream::new(StreamId::new(3), WINDOW);
        local_ended.send_end_stream();
        assert!(!local_ended.can_send());
    }

    /// Receiving stays possible until the remote side ends the stream.
    #[test]
    fn test_can_recv_states() {
        let mut local_ended = stream_one();
        assert!(local_ended.can_recv());

        // The local side ending does not affect the receive direction.
        local_ended.send_end_stream();
        assert!(local_ended.can_recv());

        // The remote side ending does.
        let mut remote_ended = Stream::new(StreamId::new(3), WINDOW);
        remote_ended.recv_end_stream();
        assert!(!remote_ended.can_recv());
    }
}