Skip to main content

zerodds_http2/
flow.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Flow control — RFC 9113 §5.2 + §6.9.
5//!
6//! Spec §5.2: there is a flow window per stream AND per connection.
7//! A sender may not send more than `min(stream_window, conn_window)`
8//! of `DATA` bytes before receiving a `WINDOW_UPDATE`.
9
10use crate::error::Http2Error;
11
/// Initial flow-control window size: 65,535 (2^16 - 1), the default of
/// `SETTINGS_INITIAL_WINDOW_SIZE` per Spec §6.5.2.
///
/// Typed as `i64` so it can be used directly in the signed window
/// arithmetic of [`FlowControl`].
pub const INITIAL_WINDOW_SIZE: i64 = 65_535;
14
/// Flow-control window with signed i64 (Spec §6.9: can transiently
/// become negative when settings shrink the window).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlowControl {
    /// Currently available window. Reduced by `consume`, raised by
    /// `apply_window_update`, and shifted by
    /// `apply_initial_window_size_change`.
    window: i64,
    /// Upper bound that `apply_window_update` refuses to exceed. Both
    /// `new` and `default` set it to `0x7fff_ffff` (2^31 - 1).
    max: i64,
}
22
23impl Default for FlowControl {
24    fn default() -> Self {
25        Self {
26            window: INITIAL_WINDOW_SIZE,
27            max: 0x7fff_ffff,
28        }
29    }
30}
31
32impl FlowControl {
33    /// Constructor with initial window size.
34    #[must_use]
35    pub fn new(initial: i64) -> Self {
36        Self {
37            window: initial,
38            max: 0x7fff_ffff,
39        }
40    }
41
42    /// Current window.
43    #[must_use]
44    pub fn window(&self) -> i64 {
45        self.window
46    }
47
48    /// Consumes `n` bytes from the window. Spec §6.9.1.
49    ///
50    /// # Errors
51    /// `FlowControlExceeded` if the window would become negative.
52    pub fn consume(&mut self, n: u32) -> Result<(), Http2Error> {
53        let n = i64::from(n);
54        if n > self.window {
55            return Err(Http2Error::FlowControlExceeded);
56        }
57        self.window -= n;
58        Ok(())
59    }
60
61    /// Applies a `WINDOW_UPDATE`. Spec §6.9.1.
62    ///
63    /// # Errors
64    /// `Protocol(FlowControlError)` if the window
65    /// 2^31-1 ueberschreitet.
66    pub fn apply_window_update(&mut self, increment: u32) -> Result<(), Http2Error> {
67        use crate::error::ErrorCode;
68        if increment == 0 {
69            // Spec §6.9: increment of 0 must be treated as PROTOCOL_ERROR
70            // for streams (or connection-level for stream id 0).
71            return Err(Http2Error::Protocol(ErrorCode::ProtocolError));
72        }
73        let new_window = self.window.saturating_add(i64::from(increment));
74        if new_window > self.max {
75            return Err(Http2Error::Protocol(ErrorCode::FlowControlError));
76        }
77        self.window = new_window;
78        Ok(())
79    }
80
81    /// Apply a new `INITIAL_WINDOW_SIZE` from a SETTINGS update
82    /// (spec §6.9.2): the window is adjusted by the difference.
83    pub fn apply_initial_window_size_change(&mut self, old: i64, new: i64) {
84        let delta = new - old;
85        self.window += delta;
86    }
87}
88
/// Builds the 4-byte payload of a WINDOW_UPDATE frame (Spec §6.9).
///
/// The top bit of `increment` is cleared, so the reserved R bit is always
/// written as 0 and only the low 31 bits are emitted, most significant
/// byte first.
#[must_use]
pub fn encode_window_update(increment: u32) -> [u8; 4] {
    // Low 31 bits carry the increment; bit 31 is the reserved R bit.
    const INCREMENT_MASK: u32 = 0x7fff_ffff;
    (increment & INCREMENT_MASK).to_be_bytes()
}
101
102/// Decodes a WINDOW_UPDATE frame payload (4 bytes).
103///
104/// # Errors
105/// `Protocol(FrameSizeError)` if the payload is not 4 bytes.
106pub fn decode_window_update(payload: &[u8]) -> Result<u32, Http2Error> {
107    use crate::error::ErrorCode;
108    if payload.len() != 4 {
109        return Err(Http2Error::Protocol(ErrorCode::FrameSizeError));
110    }
111    let v = (u32::from(payload[0]) << 24)
112        | (u32::from(payload[1]) << 16)
113        | (u32::from(payload[2]) << 8)
114        | u32::from(payload[3]);
115    Ok(v & 0x7fff_ffff)
116}
117
#[cfg(test)]
// Unit tests may unwrap/expect/panic: a panic is exactly how a failing
// test is reported, so the production-code lints are relaxed here.
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::error::ErrorCode;

    #[test]
    fn default_window_is_initial() {
        let fc = FlowControl::default();
        assert_eq!(fc.window(), INITIAL_WINDOW_SIZE);
    }

    #[test]
    fn consume_reduces_window() {
        let mut fc = FlowControl::new(1000);
        fc.consume(400).unwrap();
        assert_eq!(fc.window(), 600);
    }

    #[test]
    fn consume_exact_window_leaves_zero() {
        let mut fc = FlowControl::new(100);
        fc.consume(100).unwrap();
        assert_eq!(fc.window(), 0);
    }

    #[test]
    fn consume_more_than_window_rejected() {
        let mut fc = FlowControl::new(100);
        assert_eq!(fc.consume(101), Err(Http2Error::FlowControlExceeded));
        assert_eq!(fc.window(), 100, "window unchanged on error");
    }

    #[test]
    fn window_update_raises_window() {
        let mut fc = FlowControl::new(0);
        fc.apply_window_update(500).unwrap();
        assert_eq!(fc.window(), 500);
    }

    #[test]
    fn window_update_zero_rejected() {
        let mut fc = FlowControl::default();
        // Pin the exact error code, not just "some error".
        assert_eq!(
            fc.apply_window_update(0),
            Err(Http2Error::Protocol(ErrorCode::ProtocolError))
        );
        assert_eq!(fc.window(), INITIAL_WINDOW_SIZE, "window unchanged on error");
    }

    #[test]
    fn window_update_overflow_rejected() {
        let mut fc = FlowControl::new(0x7fff_fff0);
        assert_eq!(
            fc.apply_window_update(0x10000),
            Err(Http2Error::Protocol(ErrorCode::FlowControlError))
        );
        assert_eq!(fc.window(), 0x7fff_fff0, "window unchanged on error");
    }

    #[test]
    fn window_update_to_exact_max_accepted() {
        // 2^31-1 itself is a legal window; only exceeding it is an error.
        let mut fc = FlowControl::new(0x7fff_fffe);
        fc.apply_window_update(1).unwrap();
        assert_eq!(fc.window(), 0x7fff_ffff);
        assert_eq!(
            fc.apply_window_update(1),
            Err(Http2Error::Protocol(ErrorCode::FlowControlError))
        );
    }

    #[test]
    fn initial_window_size_change_adjusts_window() {
        let mut fc = FlowControl::new(1000);
        fc.apply_initial_window_size_change(65_535, 131_070);
        assert_eq!(fc.window(), 1000 + 65_535);
    }

    #[test]
    fn initial_window_size_shrink_can_go_negative() {
        let mut fc = FlowControl::new(100);
        fc.apply_initial_window_size_change(65_535, 0);
        assert_eq!(fc.window(), 100 - 65_535);
        // No data may be consumed while the window is below zero.
        assert_eq!(fc.consume(1), Err(Http2Error::FlowControlExceeded));
    }

    #[test]
    fn round_trip_window_update_codec() {
        let bytes = encode_window_update(0x12_34_56);
        assert_eq!(bytes, [0x00, 0x12, 0x34, 0x56]);
        assert_eq!(decode_window_update(&bytes).unwrap(), 0x12_34_56);
    }

    #[test]
    fn r_bit_cleared_on_encode() {
        assert_eq!(encode_window_update(0x8000_0001), [0x00, 0x00, 0x00, 0x01]);
    }

    #[test]
    fn r_bit_stripped_on_decode() {
        let bytes = [0x80, 0x00, 0x00, 0x01]; // R-bit set
        assert_eq!(decode_window_update(&bytes).unwrap(), 1);
    }

    #[test]
    fn wrong_payload_size_rejected() {
        let frame_size = Err(Http2Error::Protocol(ErrorCode::FrameSizeError));
        assert_eq!(decode_window_update(&[]), frame_size);
        assert_eq!(decode_window_update(&[0; 3]), frame_size);
        assert_eq!(decode_window_update(&[0; 5]), frame_size);
    }
}