mirrord_protocol/
codec.rs

1use actix_codec::{Decoder, Encoder};
2use bincode::{error::DecodeError, Decode, Encode};
3use bytes::{Buf, BufMut, BytesMut};
4use std::io;
5use std::net::IpAddr;
6
7pub type ConnectionID = u16;
8pub type Port = u16;
9
10#[derive(Encode, Decode, Debug, PartialEq, Clone)]
11pub struct NewTCPConnection {
12    pub connection_id: ConnectionID,
13    pub address: IpAddr,
14    pub port: Port,
15}
16
17#[derive(Encode, Decode, Debug, PartialEq, Clone)]
18pub struct TCPData {
19    pub connection_id: ConnectionID,
20    pub data: Vec<u8>,
21}
22
23#[derive(Encode, Decode, Debug, PartialEq, Clone)]
24pub struct TCPClose {
25    pub connection_id: ConnectionID,
26}
27
28#[derive(Encode, Decode, Debug, PartialEq, Clone)]
29pub struct LogMessage {
30    pub message: String,
31}
32
33#[derive(Encode, Decode, Debug, PartialEq, Clone)]
34pub enum ClientMessage {
35    PortSubscribe(Vec<u16>),
36    Close,
37    ConnectionUnsubscribe(ConnectionID),
38}
39
40#[derive(Encode, Decode, Debug, PartialEq, Clone)]
41pub enum DaemonMessage {
42    Close,
43    NewTCPConnection(NewTCPConnection),
44    TCPData(TCPData),
45    TCPClose(TCPClose),
46    LogMessage(LogMessage),
47}
48
49pub struct ClientCodec {
50    config: bincode::config::Configuration,
51}
52
53impl ClientCodec {
54    pub fn new() -> Self {
55        ClientCodec {
56            config: bincode::config::standard(),
57        }
58    }
59}
60
61impl Default for ClientCodec {
62    fn default() -> Self {
63        ClientCodec::new()
64    }
65}
66
67impl Decoder for ClientCodec {
68    type Item = DaemonMessage;
69    type Error = io::Error;
70
71    fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
72        match bincode::decode_from_slice(&src[..], self.config) {
73            Ok((message, read)) => {
74                src.advance(read);
75                Ok(Some(message))
76            }
77            Err(DecodeError::UnexpectedEnd) => Ok(None),
78            Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
79        }
80    }
81}
82
83impl Encoder<ClientMessage> for ClientCodec {
84    type Error = io::Error;
85
86    fn encode(&mut self, msg: ClientMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
87        let encoded = match bincode::encode_to_vec(msg, self.config) {
88            Ok(encoded) => encoded,
89            Err(err) => {
90                return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
91            }
92        };
93        dst.reserve(encoded.len());
94        dst.put(&encoded[..]);
95
96        Ok(())
97    }
98}
99
100pub struct DaemonCodec {
101    config: bincode::config::Configuration,
102}
103
104impl DaemonCodec {
105    pub fn new() -> Self {
106        DaemonCodec {
107            config: bincode::config::standard(),
108        }
109    }
110}
111
112impl Default for DaemonCodec {
113    fn default() -> Self {
114        DaemonCodec::new()
115    }
116}
117
118impl Decoder for DaemonCodec {
119    type Item = ClientMessage;
120    type Error = io::Error;
121
122    fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
123        match bincode::decode_from_slice(&src[..], self.config) {
124            Ok((message, read)) => {
125                src.advance(read);
126                Ok(Some(message))
127            }
128            Err(DecodeError::UnexpectedEnd) => Ok(None),
129            Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
130        }
131    }
132}
133
134impl Encoder<DaemonMessage> for DaemonCodec {
135    type Error = io::Error;
136
137    fn encode(&mut self, msg: DaemonMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
138        let encoded = match bincode::encode_to_vec(msg, self.config) {
139            Ok(encoded) => encoded,
140            Err(err) => {
141                return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
142            }
143        };
144        dst.reserve(encoded.len());
145        dst.put(&encoded[..]);
146
147        Ok(())
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154    use bytes::BytesMut;
155
156    #[test]
157    fn sanity_client_encode_decode() {
158        let mut client_codec = ClientCodec::new();
159        let mut daemon_codec = DaemonCodec::new();
160        let mut buf = BytesMut::new();
161
162        let msg = ClientMessage::PortSubscribe(vec![1, 2, 3]);
163
164        client_codec.encode(msg.clone(), &mut buf).unwrap();
165
166        let decoded = daemon_codec.decode(&mut buf).unwrap().unwrap();
167
168        assert_eq!(decoded, msg);
169        assert!(buf.is_empty());
170    }
171
172    #[test]
173    fn sanity_daemon_encode_decode() {
174        let mut client_codec = ClientCodec::new();
175        let mut daemon_codec = DaemonCodec::new();
176        let mut buf = BytesMut::new();
177
178        let msg = DaemonMessage::TCPData(TCPData {
179            connection_id: 1,
180            data: vec![1, 2, 3],
181        });
182
183        daemon_codec.encode(msg.clone(), &mut buf).unwrap();
184
185        let decoded = client_codec.decode(&mut buf).unwrap().unwrap();
186
187        assert_eq!(decoded, msg);
188        assert!(buf.is_empty());
189    }
190
191    #[test]
192    fn decode_client_invalid_data() {
193        let mut codec = ClientCodec::new();
194        let mut buf = BytesMut::new();
195        buf.put_u8(254);
196
197        let res = codec.decode(&mut buf);
198        match res {
199            Ok(_) => panic!("Should have failed"),
200            Err(err) => assert_eq!(err.kind(), io::ErrorKind::Other),
201        }
202    }
203
204    #[test]
205    fn decode_client_partial_data() {
206        let mut codec = ClientCodec::new();
207        let mut buf = BytesMut::new();
208        buf.put_u8(1);
209
210        assert!(codec.decode(&mut buf).unwrap().is_none());
211    }
212
213    #[test]
214    fn decode_daemon_invalid_data() {
215        let mut codec = DaemonCodec::new();
216        let mut buf = BytesMut::new();
217        buf.put_u8(254);
218
219        let res = codec.decode(&mut buf);
220        match res {
221            Ok(_) => panic!("Should have failed"),
222            Err(err) => assert_eq!(err.kind(), io::ErrorKind::Other),
223        }
224    }
225
226    #[test]
227    fn decode_daemon_partial_data() {
228        let mut codec = DaemonCodec::new();
229        let mut buf = BytesMut::new();
230        buf.put_u8(0);
231
232        assert!(codec.decode(&mut buf).unwrap().is_none());
233    }
234}