message_sink/
frame.rs

1use std::fmt::Display;
2
/// A single message payload that can be framed for transmission.
///
/// The wrapped bytes are the message body only. The 4-byte little-endian length header is
/// added when the frame is serialized and stripped again when a frame is parsed from a buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Frame(Vec<u8>);
5
/// Errors reported while converting between a [`Frame`] and its framed byte representation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParseError {
    /// The buffer does not yet contain a complete frame (length header plus payload).
    NotReady,
    /// A length value could not be converted between `u32` and `usize`, so the frame cannot
    /// be represented.
    Corrupt,
}
11
12impl Display for ParseError {
13    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
14        match self {
15            Self::NotReady => write!(f, "Not ready"),
16            Self::Corrupt => write!(f, "Corrupt"),
17        }
18    }
19}
20
21impl Frame {
22    pub fn new(message: Vec<u8>) -> Self {
23        Self(message)
24    }
25    pub fn into_message(self) -> Vec<u8> {
26        self.0
27    }
28    /// Attempt to parse a message from a buffer, removing the bytes read if we successfully
29    /// parse the message (or if the buffer is corrupt)
30    /// Note: We do not implement TryFrom for this because that trait takes ownership of the
31    /// vector. We want to re-use the same vector across multiple invocations of this
32    /// function.
33    pub fn try_from(buffer: &mut Vec<u8>) -> std::result::Result<Frame, ParseError> {
34        if buffer.len() < 4 {
35            return Err(ParseError::NotReady);
36        }
37        let mut header: [u8; 4] = Default::default();
38        header[0] = buffer[0];
39        header[1] = buffer[1];
40        header[2] = buffer[2];
41        header[3] = buffer[3];
42        let size: usize = u32::from_le_bytes(header)
43            .try_into()
44            .map_err(|_| ParseError::Corrupt)?;
45        if size + 4 > buffer.len() {
46            return Err(ParseError::NotReady);
47        }
48        buffer.drain(0..4);
49        let mut message = Vec::new();
50        message.extend(buffer.drain(0..size));
51        Ok(Frame(message))
52    }
53}
54
55/// Serialize a Frame into a framed vector of bytes
56impl TryInto<Vec<u8>> for Frame {
57    type Error = ParseError;
58    fn try_into(self) -> std::result::Result<Vec<u8>, Self::Error> {
59        let size: u32 = self.0.len().try_into().map_err(|_| ParseError::Corrupt)?;
60        let header = size.to_le_bytes();
61        let mut result = Vec::new();
62        result.extend(header);
63        result.extend(self.0);
64        Ok(result)
65    }
66}
67
#[cfg(test)]
mod frame_test {
    use rand::RngCore;

    use super::*;

    /// Produce `len` random bytes to use as a message payload.
    fn random(len: usize) -> Vec<u8> {
        let mut payload = vec![0; len];
        rand::thread_rng().fill_bytes(&mut payload);
        payload
    }

    /// Serialize a copy of `message` into its framed byte representation.
    fn framed(message: &[u8]) -> Vec<u8> {
        let bytes: Vec<u8> = Frame::new(message.to_vec()).try_into().unwrap();
        bytes
    }

    /// Panic unless `outcome` is `Err(ParseError::NotReady)`.
    fn assert_not_ready(outcome: std::result::Result<Frame, ParseError>) {
        match outcome {
            Err(ParseError::NotReady) => {}
            Err(e) => panic!("unexpected error: {}", e),
            Ok(_) => panic!("unexpected success"),
        }
    }

    /// A framed message parses back to the original payload and empties the buffer.
    #[test]
    fn parse() {
        let message = random(128);
        let mut buffer = framed(&message);
        assert_eq!(buffer.len(), message.len() + 4, "message wrapped in frame");
        let parsed_frame = Frame::try_from(&mut buffer).unwrap();
        assert_eq!(buffer.len(), 0, "consumed buffer");
        assert_eq!(message, parsed_frame.into_message());
    }

    /// A truncated frame reports `NotReady` and leaves the buffer untouched.
    #[test]
    fn not_ready() {
        let mut buffer = framed(&random(128));
        buffer.truncate(128);
        assert_not_ready(Frame::try_from(&mut buffer));
        assert_eq!(buffer.len(), 128);
    }

    /// Several frames concatenated in one buffer are parsed back in order.
    #[test]
    fn parse_multiple() {
        let messages = [random(128), random(128), random(128)];
        let mut buffer: Vec<u8> = messages
            .iter()
            .flat_map(|message| framed(message.as_slice()))
            .collect();
        let mut parsed = 0;
        while let Ok(frame) = Frame::try_from(&mut buffer) {
            assert_eq!(messages[parsed], frame.into_message());
            parsed += 1;
        }
        assert_eq!(parsed, 3);
        assert_eq!(buffer.len(), 0);
    }

    /// Bytes following a complete frame stay in the buffer and are not yet a frame themselves.
    #[test]
    fn parse_with_extra() {
        let mut buffer = framed(&random(128));
        buffer.extend(random(3));
        match Frame::try_from(&mut buffer) {
            Ok(_) => {}
            Err(e) => panic!("unexpected error: {}", e),
        }
        assert_not_ready(Frame::try_from(&mut buffer));
        assert_eq!(buffer.len(), 3);
    }
}