Skip to main content

oaat_core/
codec.rs

1use crate::error::OaatError;
2use crate::message::Message;
3use bytes::{Buf, BytesMut};
4
/// Largest payload length, in bytes, that a frame's length prefix may announce:
/// 16 MiB (16 * 1024 * 1024). `FrameCodec::decode_next` rejects anything larger.
const MAX_FRAME_SIZE: usize = 16 << 20;
6
7pub struct FrameCodec {
8    buf: BytesMut,
9}
10
11impl FrameCodec {
12    pub fn new() -> Self {
13        Self {
14            buf: BytesMut::with_capacity(8192),
15        }
16    }
17
18    pub fn feed(&mut self, data: &[u8]) {
19        self.buf.extend_from_slice(data);
20    }
21
22    pub fn decode_next(&mut self) -> Result<Option<Message>, OaatError> {
23        if self.buf.len() < 4 {
24            return Ok(None);
25        }
26
27        let len = u32::from_be_bytes(self.buf[..4].try_into().unwrap()) as usize;
28
29        if len > MAX_FRAME_SIZE {
30            return Err(OaatError::MessageTooLarge(len));
31        }
32
33        if self.buf.len() < 4 + len {
34            return Ok(None);
35        }
36
37        self.buf.advance(4);
38        let json_bytes = self.buf.split_to(len);
39        let msg = Message::decode_json(&json_bytes)?;
40        Ok(Some(msg))
41    }
42
43    pub fn encode(msg: &Message) -> Vec<u8> {
44        msg.encode_framed()
45    }
46}
47
48impl Default for FrameCodec {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Hello;

    /// Builds a `Hello` message in which only the controller id varies.
    fn hello(controller_id: &str) -> Message {
        Message::Hello(Hello {
            protocol_version: 1,
            controller_id: controller_id.into(),
            controller_name: "Test".into(),
            clock_port: 9742,
            features: vec![],
        })
    }

    /// Asserts that `msg` is a `Hello` carrying the expected controller id.
    fn assert_hello_id(msg: Message, expected: &str) {
        match msg {
            Message::Hello(h) => assert_eq!(h.controller_id, expected),
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn codec_feed_and_decode() {
        let frame = FrameCodec::encode(&hello("ctrl-1"));
        let mut codec = FrameCodec::new();

        // Deliver the frame in two halves to exercise buffering of partial input.
        let (first_half, second_half) = frame.split_at(frame.len() / 2);

        codec.feed(first_half);
        assert!(codec.decode_next().unwrap().is_none());

        codec.feed(second_half);
        let decoded = codec.decode_next().unwrap().unwrap();
        assert_hello_id(decoded, "ctrl-1");
    }

    #[test]
    fn codec_multiple_messages() {
        let mut codec = FrameCodec::new();

        // Queue five back-to-back frames before decoding any of them.
        for i in 0..5 {
            let frame = FrameCodec::encode(&hello(&format!("ctrl-{i}")));
            codec.feed(&frame);
        }

        // They must come back out in the order they were fed.
        for i in 0..5 {
            let decoded = codec.decode_next().unwrap().unwrap();
            assert_hello_id(decoded, &format!("ctrl-{i}"));
        }

        // Once drained, the codec reports that no further frame is available.
        assert!(codec.decode_next().unwrap().is_none());
    }
}