Skip to main content

zerodds_http2/
flow.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Flow-Control — RFC 9113 §5.2 + §6.9.
5//!
6//! Spec §5.2: pro Stream UND Connection gibt es ein Flow-Window.
7//! Sender darf nicht mehr als `min(stream_window, conn_window)` an
8//! `DATA`-Bytes senden, bevor `WINDOW_UPDATE` empfangen wurde.
9
10use crate::error::Http2Error;
11
/// Initial window size per spec §6.5.2 (`SETTINGS_INITIAL_WINDOW_SIZE`).
pub const INITIAL_WINDOW_SIZE: i64 = 65_535;
14
/// Flow-control window kept as a signed `i64` (spec §6.9: it can
/// transiently become negative when settings shrink the window).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FlowControl {
    // Current window size in bytes; may be negative.
    window: i64,
    // Upper bound that `apply_window_update` refuses to exceed.
    max: i64,
}
22
23impl Default for FlowControl {
24    fn default() -> Self {
25        Self {
26            window: INITIAL_WINDOW_SIZE,
27            max: 0x7fff_ffff,
28        }
29    }
30}
31
32impl FlowControl {
33    /// Konstruktor mit Initial-Window-Size.
34    #[must_use]
35    pub fn new(initial: i64) -> Self {
36        Self {
37            window: initial,
38            max: 0x7fff_ffff,
39        }
40    }
41
42    /// Aktuelles Window.
43    #[must_use]
44    pub fn window(&self) -> i64 {
45        self.window
46    }
47
48    /// Konsumiert `n` Bytes vom Window. Spec §6.9.1.
49    ///
50    /// # Errors
51    /// `FlowControlExceeded` wenn das Window negativ wuerde.
52    pub fn consume(&mut self, n: u32) -> Result<(), Http2Error> {
53        let n = i64::from(n);
54        if n > self.window {
55            return Err(Http2Error::FlowControlExceeded);
56        }
57        self.window -= n;
58        Ok(())
59    }
60
61    /// Wendet einen `WINDOW_UPDATE` an. Spec §6.9.1.
62    ///
63    /// # Errors
64    /// `Protocol(FlowControlError)` wenn das Window
65    /// 2^31-1 ueberschreitet.
66    pub fn apply_window_update(&mut self, increment: u32) -> Result<(), Http2Error> {
67        use crate::error::ErrorCode;
68        if increment == 0 {
69            // Spec §6.9: increment of 0 must be treated as PROTOCOL_ERROR
70            // for streams (or connection-level for stream id 0).
71            return Err(Http2Error::Protocol(ErrorCode::ProtocolError));
72        }
73        let new_window = self.window.saturating_add(i64::from(increment));
74        if new_window > self.max {
75            return Err(Http2Error::Protocol(ErrorCode::FlowControlError));
76        }
77        self.window = new_window;
78        Ok(())
79    }
80
81    /// Anwendung einer neuen `INITIAL_WINDOW_SIZE` aus SETTINGS-Update
82    /// (Spec §6.9.2): das Window wird um die Differenz angepasst.
83    pub fn apply_initial_window_size_change(&mut self, old: i64, new: i64) {
84        let delta = new - old;
85        self.window += delta;
86    }
87}
88
/// Encodes a WINDOW_UPDATE frame payload (spec §6.9): four big-endian
/// bytes holding the 31-bit increment, with the top (reserved) bit
/// always cleared.
///
/// Bit 31 of `increment` is dropped silently rather than reported.
#[must_use]
pub fn encode_window_update(increment: u32) -> [u8; 4] {
    // Only the low 31 bits carry the increment; the top bit is the R-bit.
    const INCREMENT_MASK: u32 = 0x7fff_ffff;
    (increment & INCREMENT_MASK).to_be_bytes()
}
101
102/// Decoded ein WINDOW_UPDATE-Frame-Payload (4 Bytes).
103///
104/// # Errors
105/// `Protocol(FrameSizeError)` wenn das Payload nicht 4 Bytes ist.
106pub fn decode_window_update(payload: &[u8]) -> Result<u32, Http2Error> {
107    use crate::error::ErrorCode;
108    if payload.len() != 4 {
109        return Err(Http2Error::Protocol(ErrorCode::FrameSizeError));
110    }
111    let v = (u32::from(payload[0]) << 24)
112        | (u32::from(payload[1]) << 16)
113        | (u32::from(payload[2]) << 8)
114        | u32::from(payload[3]);
115    Ok(v & 0x7fff_ffff)
116}
117
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// A default-constructed window starts at the initial window size.
    #[test]
    fn default_window_is_initial() {
        assert_eq!(FlowControl::default().window(), INITIAL_WINDOW_SIZE);
    }

    /// Consuming bytes lowers the window by exactly that amount.
    #[test]
    fn consume_reduces_window() {
        let mut flow = FlowControl::new(1000);
        flow.consume(400).unwrap();
        let remaining = flow.window();
        assert_eq!(remaining, 600);
    }

    /// Asking for more than is available fails and changes nothing.
    #[test]
    fn consume_more_than_window_rejected() {
        let mut flow = FlowControl::new(100);
        let outcome = flow.consume(101);
        assert_eq!(outcome, Err(Http2Error::FlowControlExceeded));
        assert_eq!(flow.window(), 100, "window unchanged on error");
    }

    /// A WINDOW_UPDATE adds its increment to the window.
    #[test]
    fn window_update_raises_window() {
        let mut flow = FlowControl::new(0);
        flow.apply_window_update(500).unwrap();
        let grown = flow.window();
        assert_eq!(grown, 500);
    }

    /// A zero increment is refused.
    #[test]
    fn window_update_zero_rejected() {
        let mut flow = FlowControl::default();
        let outcome = flow.apply_window_update(0);
        assert!(outcome.is_err());
    }

    /// An increment that pushes the window past 2^31-1 is refused.
    #[test]
    fn window_update_overflow_rejected() {
        let mut flow = FlowControl::new(0x7fff_fff0);
        let outcome = flow.apply_window_update(0x10000);
        assert!(outcome.is_err());
    }

    /// A SETTINGS change shifts the window by `new - old`.
    #[test]
    fn initial_window_size_change_adjusts_window() {
        let mut flow = FlowControl::new(1000);
        flow.apply_initial_window_size_change(65_535, 131_070);
        let shifted = flow.window();
        assert_eq!(shifted, 1000 + 65_535);
    }

    /// Encoding followed by decoding returns the original increment.
    #[test]
    fn round_trip_window_update_codec() {
        let encoded = encode_window_update(0x12_34_56);
        let decoded = decode_window_update(&encoded).unwrap();
        assert_eq!(decoded, 0x12_34_56);
    }

    /// The reserved top bit is ignored when decoding.
    #[test]
    fn r_bit_stripped_on_decode() {
        let with_r_bit = [0x80, 0x00, 0x00, 0x01]; // R-bit set
        let decoded = decode_window_update(&with_r_bit).unwrap();
        assert_eq!(decoded, 1);
    }

    /// Payloads that are not exactly four bytes long are refused.
    #[test]
    fn wrong_payload_size_rejected() {
        let too_short = decode_window_update(&[0; 3]);
        let too_long = decode_window_update(&[0; 5]);
        assert!(too_short.is_err());
        assert!(too_long.is_err());
    }
}