1use std::io::{Error, ErrorKind, Result as IoResult};
17
18use tokio_io::codec::{Decoder, Encoder};
19use bytes::{BufMut, BytesMut};
20use serde_json::de::Deserializer;
21use serde_json::ser::to_vec;
22use serde_json::error::Error as SerdeError;
23
24use message::{decoded_to_parsed, from_slice, from_str, Message, Parsed};
25
26fn err_map(e: SerdeError) -> Error {
28 Error::new(ErrorKind::Other, e)
29}
30
/// Access to a byte offset that a codec keeps between decode calls.
///
/// `decode_codec` reads and updates the offset through this trait so it can
/// remember how much of the buffer it has already searched for a newline.
trait PositionCache {
    /// Returns a mutable reference to the stored offset, allowing it to be
    /// both inspected and overwritten in place.
    fn position(&mut self) -> &mut usize;
}
35
36fn encode_codec(msg: &Message, buf: &mut BytesMut) -> IoResult<()> {
38 let encoded = to_vec(&msg).map_err(err_map)?;
39 buf.reserve(encoded.len() + 1);
41 buf.put_slice(&encoded);
42 buf.put(b'\n');
43 Ok(())
44}
45
46fn decode_codec<Cache, Convert>(
47 cache: &mut Cache, buf: &mut BytesMut, convert: Convert
48) -> IoResult<Option<Parsed>>
49where
50 Cache: PositionCache,
51 Convert: FnOnce(&[u8]) -> Parsed,
52{
53 let start_pos = cache.position();
55 if let Some(i) = buf[*start_pos..].iter().position(|&b| b == b'\n') {
56 let end_pos = *start_pos + i;
57 let line = buf.split_to(end_pos);
58 buf.split_to(1);
59 *start_pos = 0;
61 Ok(Some(convert(&line)))
62 } else {
63 *start_pos = buf.len();
65 Ok(None)
66 }
67}
68
/// Codec state for newline-separated messages.
///
/// The wrapped `usize` is the scan offset exposed through `PositionCache`.
/// The `Decoder` implementation in this file hands each line to `from_slice`
/// as raw bytes.
#[derive(Debug, Default)]
pub struct Line(usize);

impl Line {
    /// Creates a codec whose cached scan offset is zero.
    pub fn new() -> Self {
        // The derived `Default` yields `Line(0)`.
        Default::default()
    }
}
85
86impl PositionCache for Line {
87 fn position(&mut self) -> &mut usize {
88 &mut self.0
89 }
90}
91
92impl Encoder for Line {
93 type Item = Message;
94 type Error = Error;
95 fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> IoResult<()> {
96 encode_codec(&msg, buf)
97 }
98}
99
100impl Decoder for Line {
101 type Item = Parsed;
102 type Error = Error;
103 fn decode(&mut self, src: &mut BytesMut) -> IoResult<Option<Parsed>> {
104 decode_codec(self, src, from_slice)
105 }
106}
107
/// Codec state for newline-separated messages that may contain invalid UTF-8.
///
/// The wrapped `usize` is the scan offset exposed through `PositionCache`.
/// The `Decoder` implementation in this file converts each line with
/// `String::from_utf8_lossy` before parsing it.
#[derive(Debug, Default)]
pub struct DirtyLine(usize);

impl DirtyLine {
    /// Creates a codec whose cached scan offset is zero.
    pub fn new() -> Self {
        // The derived `Default` yields `DirtyLine(0)`.
        Default::default()
    }
}
126
127impl PositionCache for DirtyLine {
128 fn position(&mut self) -> &mut usize {
129 &mut self.0
130 }
131}
132
133impl Decoder for DirtyLine {
134 type Item = Parsed;
135 type Error = Error;
136 fn decode(&mut self, src: &mut BytesMut) -> IoResult<Option<Parsed>> {
137 decode_codec(self, src, |bytes| {
138 from_str(String::from_utf8_lossy(bytes).as_ref())
139 })
140 }
141}
142
143impl Encoder for DirtyLine {
144 type Item = Message;
145 type Error = Error;
146 fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> IoResult<()> {
147 encode_codec(&msg, buf)
148 }
149}
150
/// Stateless codec that splits incoming data on JSON value boundaries.
///
/// The `Decoder` implementation in this file parses one JSON value from the
/// front of the buffer; the `Encoder` writes newline-terminated JSON.
///
/// Derives `Debug` and `Default` like the other public codecs in this file
/// (`Line`, `DirtyLine`).
#[derive(Debug, Default)]
pub struct Boundary;
157
158impl Encoder for Boundary {
159 type Item = Message;
160 type Error = Error;
161 fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> IoResult<()> {
162 encode_codec(&msg, buf)
163 }
164}
165
166impl Decoder for Boundary {
167 type Item = Parsed;
168 type Error = Error;
169 fn decode(&mut self, src: &mut BytesMut) -> IoResult<Option<Parsed>> {
170 let (decoded, pos) = {
171 let mut deserializer = Deserializer::from_slice(src).into_iter();
172 let decoded = deserializer.next().and_then(|result| match result {
173 Err(ref e) if e.is_eof() => None,
174 other => Some(decoded_to_parsed(other)),
175 });
176 (decoded, deserializer.byte_offset())
177 };
178
179 src.split_to(pos);
181 Ok(decoded)
182 }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use message::Broken;

    /// Both line codecs must emit the same newline-terminated JSON.
    #[test]
    fn encode() {
        let expected = BytesMut::from(&b"{\"jsonrpc\":\"2.0\",\"method\":\"notif\"}\n"[..]);
        let msg = Message::notification("notif".to_owned(), None);
        let mut output = BytesMut::with_capacity(10);

        let mut codec = Line::new();
        codec.encode(msg.clone(), &mut output).unwrap();
        assert_eq!(expected, output);

        // Reuse the same buffer for the second codec.
        output.clear();
        let mut dirty_codec = DirtyLine::new();
        dirty_codec.encode(msg, &mut output).unwrap();
        assert_eq!(expected, output);
    }

    /// Builds a fresh buffer holding a copy of `input`.
    fn get_buf(input: &[u8]) -> BytesMut {
        BytesMut::from(input)
    }

    /// Decoding of complete, multiple, partial and malformed lines.
    #[test]
    fn decode() {
        /// Decodes `input` once with each line codec, checks that `rest` is
        /// what remains in the buffer and that both codecs agree, then returns
        /// the `Line` result.
        fn one(input: &[u8], rest: &[u8]) -> IoResult<Option<Parsed>> {
            let mut codec = Line::new();
            let mut buf = get_buf(input);
            let result = codec.decode(&mut buf);
            assert_eq!(rest, &buf);

            // The dirty codec gets its own copy of the input.
            let mut dirty_codec = DirtyLine::new();
            let mut buf = get_buf(input);
            let dirty = dirty_codec.decode(&mut buf);
            assert_eq!(rest, &buf);

            assert_eq!(result.as_ref().unwrap(), dirty.as_ref().unwrap());
            result
        }

        let notif = Message::notification("notif".to_owned(), None);
        let msgstring = Vec::from(&b"{\"jsonrpc\":\"2.0\",\"method\":\"notif\"}\n"[..]);

        // A single complete message leaves nothing behind.
        assert_eq!(one(&msgstring, b"").unwrap(), Some(Ok(notif.clone())));

        // Two messages: the first is decoded, the second stays in the buffer.
        let mut twomsgs = msgstring.clone();
        twomsgs.extend_from_slice(&msgstring);
        assert_eq!(one(&twomsgs, &msgstring).unwrap(), Some(Ok(notif.clone())));

        // A message followed by an unterminated one: the partial data is kept.
        let incomplete = Vec::from(&br#"{"jsonrpc": "2.0", "method":""#[..]);
        let mut oneandhalf = msgstring.clone();
        oneandhalf.extend_from_slice(&incomplete);
        assert_eq!(
            one(&oneandhalf, &incomplete).unwrap(),
            Some(Ok(notif.clone()))
        );

        // Only partial data: nothing is decoded and nothing is consumed.
        assert_eq!(one(&incomplete, &incomplete).unwrap(), None);

        // A terminated but malformed line is consumed and reported as broken.
        match one(b"{]\n", b"") {
            Ok(Some(Err(Broken::SyntaxError(_)))) => (),
            other => panic!("Something unexpected: {:?}", other),
        };
    }

    /// Invalid UTF-8 is a syntax error for `Line` but is replaced by `DirtyLine`.
    #[test]
    fn decode_nonunicode() {
        let broken_input = b"{\"jsonrpc\":\"2.0\",\"method\":\"Hello \xF0\x90\x80World\"}\n";

        let mut codec = Line::new();
        let mut buf = get_buf(broken_input);
        let strict = codec.decode(&mut buf).unwrap();
        match strict {
            Some(Err(Broken::SyntaxError(_))) => (),
            other => panic!("Something unexpected: {:?}", other),
        };

        // The lossy codec substitutes the replacement character instead.
        let mut buf = get_buf(broken_input);
        let mut dirty = DirtyLine::new();
        let lossy = dirty.decode(&mut buf).unwrap();
        assert_eq!(
            lossy,
            Some(Ok(Message::notification("Hello �World".to_owned(), None)))
        );
    }

    /// An incomplete JSON value yields nothing and leaves the buffer untouched.
    #[test]
    fn decode_boundary_short() {
        let mut buf = get_buf(b"{\"jsonrpc\":\"");
        let decoded = Boundary.decode(&mut buf).unwrap();
        assert!(decoded.is_none());
        assert_eq!(&buf, &b"{\"jsonrpc\":\""[..]);
    }

    /// Leading whitespace is skipped, one value is decoded and the start of
    /// the following value stays in the buffer.
    #[test]
    fn decode_boundary_prefix() {
        let mut buf = get_buf(b"\n\n {\"jsonrpc\":\"2.0\",\"method\":\"notif\"}{\"");
        let decoded = Boundary.decode(&mut buf).unwrap().unwrap();
        assert_eq!(decoded, Ok(Message::notification("notif".to_owned(), None)));
        assert_eq!(&buf, &b"{\""[..]);
    }
}