1use actix_codec::{Decoder, Encoder};
2use bincode::{error::DecodeError, Decode, Encode};
3use bytes::{Buf, BufMut, BytesMut};
4use std::io;
5use std::net::IpAddr;
6
/// Identifier type used by the `connection_id` fields of the message structs
/// and by `ClientMessage::ConnectionUnsubscribe`.
pub type ConnectionID = u16;
/// Port number type used by `NewTCPConnection::port`.
pub type Port = u16;
9
/// Payload of `DaemonMessage::NewTCPConnection`.
#[derive(Encode, Decode, Debug, PartialEq, Clone)]
pub struct NewTCPConnection {
    /// Identifier assigned to the connection this message announces.
    pub connection_id: ConnectionID,
    /// IP address associated with the connection.
    // NOTE(review): this file does not show whether this is the peer or the
    // local address — confirm against the producer of this message.
    pub address: IpAddr,
    /// Port associated with the connection (same caveat as `address`).
    pub port: Port,
}
16
/// Payload of `DaemonMessage::TCPData`: a chunk of bytes tagged with a connection id.
#[derive(Encode, Decode, Debug, PartialEq, Clone)]
pub struct TCPData {
    /// Identifier of the connection the bytes belong to.
    pub connection_id: ConnectionID,
    /// Raw bytes carried by this message.
    pub data: Vec<u8>,
}
22
/// Payload of `DaemonMessage::TCPClose`.
#[derive(Encode, Decode, Debug, PartialEq, Clone)]
pub struct TCPClose {
    /// Identifier of the connection this message refers to.
    pub connection_id: ConnectionID,
}
27
/// Payload of `DaemonMessage::LogMessage`.
#[derive(Encode, Decode, Debug, PartialEq, Clone)]
pub struct LogMessage {
    /// Text of the log entry.
    pub message: String,
}
32
/// Messages encoded by `ClientCodec` and decoded by `DaemonCodec`.
#[derive(Encode, Decode, Debug, PartialEq, Clone)]
pub enum ClientMessage {
    /// Carries a list of `u16` values.
    // NOTE(review): presumably the ports the client wants to subscribe to —
    // confirm against the daemon-side handler.
    PortSubscribe(Vec<u16>),
    /// Payload-free variant.
    // NOTE(review): presumably asks the daemon to end the session — confirm.
    Close,
    /// Carries the id of a single connection.
    // NOTE(review): presumably stops delivery for that connection — confirm.
    ConnectionUnsubscribe(ConnectionID),
}
39
/// Messages encoded by `DaemonCodec` and decoded by `ClientCodec`.
#[derive(Encode, Decode, Debug, PartialEq, Clone)]
pub enum DaemonMessage {
    /// Payload-free variant.
    // NOTE(review): presumably announces that the daemon is closing — confirm.
    Close,
    /// Wraps a [`NewTCPConnection`] payload.
    NewTCPConnection(NewTCPConnection),
    /// Wraps a [`TCPData`] payload.
    TCPData(TCPData),
    /// Wraps a [`TCPClose`] payload.
    TCPClose(TCPClose),
    /// Wraps a [`LogMessage`] payload.
    LogMessage(LogMessage),
}
48
/// Codec that encodes [`ClientMessage`] values and decodes [`DaemonMessage`]
/// values using bincode.
pub struct ClientCodec {
    /// bincode configuration passed to every encode/decode call.
    config: bincode::config::Configuration,
}
52
53impl ClientCodec {
54 pub fn new() -> Self {
55 ClientCodec {
56 config: bincode::config::standard(),
57 }
58 }
59}
60
61impl Default for ClientCodec {
62 fn default() -> Self {
63 ClientCodec::new()
64 }
65}
66
67impl Decoder for ClientCodec {
68 type Item = DaemonMessage;
69 type Error = io::Error;
70
71 fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
72 match bincode::decode_from_slice(&src[..], self.config) {
73 Ok((message, read)) => {
74 src.advance(read);
75 Ok(Some(message))
76 }
77 Err(DecodeError::UnexpectedEnd) => Ok(None),
78 Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
79 }
80 }
81}
82
83impl Encoder<ClientMessage> for ClientCodec {
84 type Error = io::Error;
85
86 fn encode(&mut self, msg: ClientMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
87 let encoded = match bincode::encode_to_vec(msg, self.config) {
88 Ok(encoded) => encoded,
89 Err(err) => {
90 return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
91 }
92 };
93 dst.reserve(encoded.len());
94 dst.put(&encoded[..]);
95
96 Ok(())
97 }
98}
99
/// Codec that encodes [`DaemonMessage`] values and decodes [`ClientMessage`]
/// values using bincode.
pub struct DaemonCodec {
    /// bincode configuration passed to every encode/decode call.
    config: bincode::config::Configuration,
}
103
104impl DaemonCodec {
105 pub fn new() -> Self {
106 DaemonCodec {
107 config: bincode::config::standard(),
108 }
109 }
110}
111
112impl Default for DaemonCodec {
113 fn default() -> Self {
114 DaemonCodec::new()
115 }
116}
117
118impl Decoder for DaemonCodec {
119 type Item = ClientMessage;
120 type Error = io::Error;
121
122 fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
123 match bincode::decode_from_slice(&src[..], self.config) {
124 Ok((message, read)) => {
125 src.advance(read);
126 Ok(Some(message))
127 }
128 Err(DecodeError::UnexpectedEnd) => Ok(None),
129 Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
130 }
131 }
132}
133
134impl Encoder<DaemonMessage> for DaemonCodec {
135 type Error = io::Error;
136
137 fn encode(&mut self, msg: DaemonMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
138 let encoded = match bincode::encode_to_vec(msg, self.config) {
139 Ok(encoded) => encoded,
140 Err(err) => {
141 return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
142 }
143 };
144 dst.reserve(encoded.len());
145 dst.put(&encoded[..]);
146
147 Ok(())
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;

    /// A message written by `ClientCodec` is read back unchanged by
    /// `DaemonCodec`, and the buffer is fully consumed.
    #[test]
    fn sanity_client_encode_decode() {
        let original = ClientMessage::PortSubscribe(vec![1, 2, 3]);
        let mut wire = BytesMut::new();
        let mut sender = ClientCodec::new();
        let mut receiver = DaemonCodec::new();

        sender.encode(original.clone(), &mut wire).unwrap();
        let round_tripped = receiver.decode(&mut wire).unwrap().unwrap();

        assert_eq!(round_tripped, original);
        assert!(wire.is_empty());
    }

    /// A message written by `DaemonCodec` is read back unchanged by
    /// `ClientCodec`, and the buffer is fully consumed.
    #[test]
    fn sanity_daemon_encode_decode() {
        let original = DaemonMessage::TCPData(TCPData {
            connection_id: 1,
            data: vec![1, 2, 3],
        });
        let mut wire = BytesMut::new();
        let mut sender = DaemonCodec::new();
        let mut receiver = ClientCodec::new();

        sender.encode(original.clone(), &mut wire).unwrap();
        let round_tripped = receiver.decode(&mut wire).unwrap().unwrap();

        assert_eq!(round_tripped, original);
        assert!(wire.is_empty());
    }

    /// A lone byte `254` must be rejected by `ClientCodec` with an
    /// `io::ErrorKind::Other` error (not treated as a partial message).
    #[test]
    fn decode_client_invalid_data() {
        let mut wire = BytesMut::new();
        wire.put_u8(254);
        let mut codec = ClientCodec::new();

        if let Err(err) = codec.decode(&mut wire) {
            assert_eq!(err.kind(), io::ErrorKind::Other);
        } else {
            panic!("Should have failed");
        }
    }

    /// A lone byte `1` is reported by `ClientCodec` as "no message yet".
    // Presumably the byte selects a variant whose payload is still missing.
    #[test]
    fn decode_client_partial_data() {
        let mut wire = BytesMut::new();
        wire.put_u8(1);
        let mut codec = ClientCodec::new();

        let outcome = codec.decode(&mut wire).unwrap();
        assert!(outcome.is_none());
    }

    /// A lone byte `254` must be rejected by `DaemonCodec` with an
    /// `io::ErrorKind::Other` error (not treated as a partial message).
    #[test]
    fn decode_daemon_invalid_data() {
        let mut wire = BytesMut::new();
        wire.put_u8(254);
        let mut codec = DaemonCodec::new();

        if let Err(err) = codec.decode(&mut wire) {
            assert_eq!(err.kind(), io::ErrorKind::Other);
        } else {
            panic!("Should have failed");
        }
    }

    /// A lone byte `0` is reported by `DaemonCodec` as "no message yet".
    // Presumably the byte selects a variant whose payload is still missing.
    #[test]
    fn decode_daemon_partial_data() {
        let mut wire = BytesMut::new();
        wire.put_u8(0);
        let mut codec = DaemonCodec::new();

        let outcome = codec.decode(&mut wire).unwrap();
        assert!(outcome.is_none());
    }
}