quic_reverse_control/
framing.rs

1// Copyright 2024-2026 Farlight Networks, LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Length-prefixed framing for control messages.
16//!
17//! All control messages are transmitted as length-prefixed frames:
18//!
19//! ```text
20//! ┌──────────────────┬─────────────────────────────────┐
21//! │  Length (4 bytes)│         Payload (N bytes)       │
22//! │    big-endian    │      codec-encoded message      │
23//! └──────────────────┴─────────────────────────────────┘
24//! ```
25//!
//! Payloads are limited to 64 KiB (65,536 bytes), not counting the 4-byte
//! length prefix, to prevent memory exhaustion attacks.
27
28use crate::ControlError;
29use bytes::{Buf, BufMut, BytesMut};
30
/// Upper bound on a frame's payload length, in bytes (64 KiB).
///
/// The limit applies to the payload only; the length prefix is not counted.
/// Anything larger is rejected so a peer cannot force unbounded buffering.
pub const MAX_FRAME_SIZE: usize = 64 * 1024;
35
/// Width of the length prefix in bytes (the prefix is a big-endian `u32`).
const LENGTH_PREFIX_SIZE: usize = std::mem::size_of::<u32>();
38
39/// Reads length-prefixed frames from a byte buffer.
40///
41/// This struct maintains internal state for incremental parsing,
42/// allowing frames to be read from partial data as it arrives.
43#[derive(Debug, Default)]
44pub struct FrameReader {
45    /// Buffer for accumulating incoming data.
46    buffer: BytesMut,
47}
48
49impl FrameReader {
50    /// Creates a new frame reader.
51    #[must_use]
52    pub fn new() -> Self {
53        Self {
54            buffer: BytesMut::new(),
55        }
56    }
57
58    /// Creates a new frame reader with the specified initial capacity.
59    #[must_use]
60    pub fn with_capacity(capacity: usize) -> Self {
61        Self {
62            buffer: BytesMut::with_capacity(capacity),
63        }
64    }
65
66    /// Appends data to the internal buffer.
67    pub fn extend(&mut self, data: &[u8]) {
68        self.buffer.extend_from_slice(data);
69    }
70
71    /// Attempts to read a complete frame from the buffer.
72    ///
73    /// Returns `Ok(Some(data))` if a complete frame is available,
74    /// `Ok(None)` if more data is needed, or an error if the frame
75    /// is invalid.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the frame size exceeds `MAX_FRAME_SIZE`.
80    pub fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ControlError> {
81        // Need at least the length prefix
82        if self.buffer.len() < LENGTH_PREFIX_SIZE {
83            return Ok(None);
84        }
85
86        // Peek at the length without consuming
87        let length = u32::from_be_bytes([
88            self.buffer[0],
89            self.buffer[1],
90            self.buffer[2],
91            self.buffer[3],
92        ]) as usize;
93
94        // Validate frame size
95        if length > MAX_FRAME_SIZE {
96            return Err(ControlError::FrameTooLarge { size: length });
97        }
98
99        // Check if we have the complete frame
100        let total_length = LENGTH_PREFIX_SIZE + length;
101        if self.buffer.len() < total_length {
102            return Ok(None);
103        }
104
105        // Consume the length prefix
106        self.buffer.advance(LENGTH_PREFIX_SIZE);
107
108        // Extract the payload
109        let payload = self.buffer.split_to(length).to_vec();
110
111        Ok(Some(payload))
112    }
113
114    /// Returns the number of bytes currently buffered.
115    #[must_use]
116    pub fn buffered_len(&self) -> usize {
117        self.buffer.len()
118    }
119
120    /// Returns true if the buffer is empty.
121    #[must_use]
122    pub fn is_empty(&self) -> bool {
123        self.buffer.is_empty()
124    }
125
126    /// Clears the internal buffer.
127    pub fn clear(&mut self) {
128        self.buffer.clear();
129    }
130}
131
132/// Writes length-prefixed frames to a byte buffer.
133#[derive(Debug, Default)]
134pub struct FrameWriter {
135    /// Buffer for constructing outgoing frames.
136    buffer: BytesMut,
137}
138
139impl FrameWriter {
140    /// Creates a new frame writer.
141    #[must_use]
142    pub fn new() -> Self {
143        Self {
144            buffer: BytesMut::new(),
145        }
146    }
147
148    /// Creates a new frame writer with the specified initial capacity.
149    #[must_use]
150    pub fn with_capacity(capacity: usize) -> Self {
151        Self {
152            buffer: BytesMut::with_capacity(capacity),
153        }
154    }
155
156    /// Writes a frame with the given payload.
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the payload exceeds `MAX_FRAME_SIZE`.
161    pub fn write_frame(&mut self, payload: &[u8]) -> Result<(), ControlError> {
162        if payload.len() > MAX_FRAME_SIZE {
163            return Err(ControlError::FrameTooLarge {
164                size: payload.len(),
165            });
166        }
167
168        // Reserve space for length prefix + payload
169        self.buffer.reserve(LENGTH_PREFIX_SIZE + payload.len());
170
171        // Write length prefix (big-endian u32)
172        #[allow(clippy::cast_possible_truncation)]
173        self.buffer.put_u32(payload.len() as u32);
174
175        // Write payload
176        self.buffer.extend_from_slice(payload);
177
178        Ok(())
179    }
180
181    /// Takes the accumulated bytes from the buffer.
182    ///
183    /// This clears the internal buffer and returns the data.
184    pub fn take_bytes(&mut self) -> Vec<u8> {
185        self.buffer.split().to_vec()
186    }
187
188    /// Returns a reference to the accumulated bytes.
189    #[must_use]
190    pub fn as_bytes(&self) -> &[u8] {
191        &self.buffer
192    }
193
194    /// Returns the number of bytes currently buffered.
195    #[must_use]
196    pub fn len(&self) -> usize {
197        self.buffer.len()
198    }
199
200    /// Returns true if the buffer is empty.
201    #[must_use]
202    pub fn is_empty(&self) -> bool {
203        self.buffer.is_empty()
204    }
205
206    /// Clears the internal buffer.
207    pub fn clear(&mut self) {
208        self.buffer.clear();
209    }
210}
211
212/// Encodes a payload into a length-prefixed frame.
213///
214/// This is a convenience function for one-shot encoding.
215///
216/// # Errors
217///
218/// Returns an error if the payload exceeds `MAX_FRAME_SIZE`.
219pub fn encode_frame(payload: &[u8]) -> Result<Vec<u8>, ControlError> {
220    let mut writer = FrameWriter::with_capacity(LENGTH_PREFIX_SIZE + payload.len());
221    writer.write_frame(payload)?;
222    Ok(writer.take_bytes())
223}
224
225/// Decodes a length-prefixed frame, returning the payload.
226///
227/// This is a convenience function for one-shot decoding when the complete
228/// frame is available.
229///
230/// # Errors
231///
232/// Returns an error if the frame is incomplete or exceeds `MAX_FRAME_SIZE`.
233pub fn decode_frame(data: &[u8]) -> Result<Vec<u8>, ControlError> {
234    if data.len() < LENGTH_PREFIX_SIZE {
235        return Err(ControlError::UnexpectedEof {
236            expected: LENGTH_PREFIX_SIZE,
237            actual: data.len(),
238        });
239    }
240
241    let length = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
242
243    if length > MAX_FRAME_SIZE {
244        return Err(ControlError::FrameTooLarge { size: length });
245    }
246
247    let total_length = LENGTH_PREFIX_SIZE + length;
248    if data.len() < total_length {
249        return Err(ControlError::UnexpectedEof {
250            expected: total_length,
251            actual: data.len(),
252        });
253    }
254
255    Ok(data[LENGTH_PREFIX_SIZE..total_length].to_vec())
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `payload` into a frame, panicking if encoding is rejected.
    fn framed(payload: &[u8]) -> Vec<u8> {
        encode_frame(payload).expect("encode should succeed")
    }

    /// One-shot encode followed by one-shot decode returns the input.
    #[test]
    fn frame_round_trip() {
        let message = b"hello, world!";
        let wire = framed(message);
        let recovered = decode_frame(&wire).expect("decode should succeed");
        assert_eq!(recovered, message);
    }

    /// A reader fed one whole frame yields it and ends up empty.
    #[test]
    fn frame_reader_complete() {
        let message = b"test payload";
        let mut reader = FrameReader::new();
        reader.extend(&framed(message));

        let frame = reader.read_frame().expect("read should succeed");
        assert_eq!(frame, Some(message.to_vec()));
        assert!(reader.is_empty());
    }

    /// Feeding a frame one byte at a time yields nothing until the last byte.
    #[test]
    fn frame_reader_incremental() {
        let message = b"incremental test";
        let wire = framed(message);
        let final_index = wire.len() - 1;
        let mut reader = FrameReader::new();

        for (i, byte) in wire.iter().copied().enumerate() {
            reader.extend(&[byte]);

            // Every byte except the last must leave the frame incomplete.
            if i < final_index {
                let pending = reader.read_frame().expect("read should succeed");
                assert!(pending.is_none(), "expected None at byte {i}");
            }
        }

        // With the last byte in, the frame must come out whole.
        let frame = reader.read_frame().expect("read should succeed");
        assert_eq!(frame, Some(message.to_vec()));
    }

    /// Two back-to-back frames are returned in order.
    #[test]
    fn frame_reader_multiple_frames() {
        let first = b"first";
        let second = b"second";
        let wire_first = framed(first);
        let wire_second = framed(second);

        // Both frames are buffered before any read happens.
        let mut reader = FrameReader::new();
        reader.extend(&wire_first);
        reader.extend(&wire_second);

        let got_first = reader.read_frame().expect("read should succeed");
        assert_eq!(got_first, Some(first.to_vec()));

        let got_second = reader.read_frame().expect("read should succeed");
        assert_eq!(got_second, Some(second.to_vec()));

        assert!(reader.is_empty());
    }

    /// Encoding a payload one byte over the limit is rejected.
    #[test]
    fn frame_too_large_on_encode() {
        let oversized = vec![0u8; MAX_FRAME_SIZE + 1];
        let outcome = encode_frame(&oversized);
        assert!(matches!(outcome, Err(ControlError::FrameTooLarge { .. })));
    }

    /// A reader rejects a header that declares an oversized payload.
    #[test]
    fn frame_too_large_on_decode() {
        // Hand-build a frame whose prefix is one past the limit.
        let declared = (MAX_FRAME_SIZE + 1) as u32;
        let mut wire = declared.to_be_bytes().to_vec();
        wire.extend_from_slice(&[0u8; 100]); // Some payload

        let mut reader = FrameReader::new();
        reader.extend(&wire);
        let outcome = reader.read_frame();
        assert!(matches!(outcome, Err(ControlError::FrameTooLarge { .. })));
    }

    /// Fewer bytes than the length prefix is an EOF error.
    #[test]
    fn decode_incomplete_length() {
        let outcome = decode_frame(&[0, 0, 0]); // Only 3 bytes
        assert!(matches!(outcome, Err(ControlError::UnexpectedEof { .. })));
    }

    /// A payload shorter than the prefix declares is an EOF error.
    #[test]
    fn decode_incomplete_payload() {
        let mut wire = 10u32.to_be_bytes().to_vec(); // Says 10 bytes
        wire.extend_from_slice(&[1, 2, 3, 4, 5]); // Only 5 bytes

        let outcome = decode_frame(&wire);
        assert!(matches!(outcome, Err(ControlError::UnexpectedEof { .. })));
    }

    /// An empty payload encodes to just the prefix and decodes to empty.
    #[test]
    fn empty_frame() {
        let wire = framed(&[]);
        assert_eq!(wire.len(), 4); // Just the length prefix

        let recovered = decode_frame(&wire).expect("decode should succeed");
        assert!(recovered.is_empty());
    }

    /// Frames written through one writer are read back in the same order.
    #[test]
    fn frame_writer_multiple_frames() {
        let messages = [b"one", b"two"];

        let mut writer = FrameWriter::new();
        for message in messages {
            writer.write_frame(message).expect("write should succeed");
        }
        let wire = writer.take_bytes();

        // Parse both frames back
        let mut reader = FrameReader::new();
        reader.extend(&wire);
        for message in messages {
            assert_eq!(
                reader.read_frame().expect("read should succeed"),
                Some(message.to_vec())
            );
        }
    }

    // Property-based tests
    mod proptest_tests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #![proptest_config(ProptestConfig::with_cases(1000))]

            #[test]
            fn frame_round_trip_arbitrary(payload in prop::collection::vec(any::<u8>(), 0..1024)) {
                let wire = encode_frame(&payload).expect("encode should succeed");
                let decoded = decode_frame(&wire).expect("decode should succeed");
                prop_assert_eq!(payload, decoded);
            }

            #[test]
            fn frame_reader_handles_arbitrary_payload(payload in prop::collection::vec(any::<u8>(), 0..1024)) {
                let wire = encode_frame(&payload).expect("encode should succeed");
                let mut reader = FrameReader::new();
                reader.extend(&wire);
                let frame = reader.read_frame().expect("read should succeed");
                prop_assert_eq!(frame, Some(payload));
            }

            #[test]
            fn frame_writer_produces_decodable_output(payloads in prop::collection::vec(prop::collection::vec(any::<u8>(), 0..256), 1..10)) {
                let mut writer = FrameWriter::new();
                for payload in &payloads {
                    writer.write_frame(payload).expect("write should succeed");
                }
                let wire = writer.take_bytes();

                let mut reader = FrameReader::new();
                reader.extend(&wire);

                for expected in payloads {
                    let actual = reader.read_frame().expect("read should succeed");
                    prop_assert_eq!(actual, Some(expected));
                }
            }

            #[test]
            fn incremental_feed_matches_bulk(payload in prop::collection::vec(any::<u8>(), 1..512)) {
                let wire = encode_frame(&payload).expect("encode should succeed");

                // Whole frame delivered in one call.
                let mut bulk_reader = FrameReader::new();
                bulk_reader.extend(&wire);
                let bulk_result = bulk_reader.read_frame().expect("read should succeed");

                // Same frame delivered one byte per call.
                let mut inc_reader = FrameReader::new();
                for byte in wire.iter().copied() {
                    inc_reader.extend(&[byte]);
                }
                let inc_result = inc_reader.read_frame().expect("read should succeed");

                prop_assert_eq!(bulk_result, inc_result);
            }
        }
    }
}