tokio_jsonrpc/
codec.rs

1// Copyright 2017 tokio-jsonrpc Developers
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
//! The codecs to encode and decode messages from a stream of bytes.
//!
//! You can choose either the line-separated codec ([Line](struct.Line.html)) or the
//! boundary-separated one ([Boundary](struct.Boundary.html)). The first one requires the messages
//! to be separated by newlines and to contain no newlines in their own representation. On the
//! other hand, it can recover from a syntax error in a message, so you can respond with an error
//! instead of terminating the connection. A variant of the line codec,
//! [DirtyLine](struct.DirtyLine.html), additionally tolerates input that is not valid UTF-8.
15
16use std::io::{Error, ErrorKind, Result as IoResult};
17
18use tokio_io::codec::{Decoder, Encoder};
19use bytes::{BufMut, BytesMut};
20use serde_json::de::Deserializer;
21use serde_json::ser::to_vec;
22use serde_json::error::Error as SerdeError;
23
24use message::{decoded_to_parsed, from_slice, from_str, Message, Parsed};
25
26/// A helper to wrap the error
27fn err_map(e: SerdeError) -> Error {
28    Error::new(ErrorKind::Other, e)
29}
30
/// Access to the remembered scan offset of a newline-delimited codec.
///
/// Both [`Line`] and [`DirtyLine`] keep track of how many buffered bytes were already searched
/// for a newline, so the shared decoding routine can resume where the previous call stopped.
trait PositionCache {
    /// Returns a mutable handle to the stored scan offset.
    fn position(&mut self) -> &mut usize;
}
35
36/// An encoding function reused by [`Line`], [`DirtyLine`] and [`Boundary`]
37fn encode_codec(msg: &Message, buf: &mut BytesMut) -> IoResult<()> {
38    let encoded = to_vec(&msg).map_err(err_map)?;
39    // As discovered the hard way, we must not overwrite buf, but append to it.
40    buf.reserve(encoded.len() + 1);
41    buf.put_slice(&encoded);
42    buf.put(b'\n');
43    Ok(())
44}
45
46fn decode_codec<Cache, Convert>(
47    cache: &mut Cache, buf: &mut BytesMut, convert: Convert
48) -> IoResult<Option<Parsed>>
49where
50    Cache: PositionCache,
51    Convert: FnOnce(&[u8]) -> Parsed,
52{
53    // Where did we stop scanning before? Scan only the new part
54    let start_pos = cache.position();
55    if let Some(i) = buf[*start_pos..].iter().position(|&b| b == b'\n') {
56        let end_pos = *start_pos + i;
57        let line = buf.split_to(end_pos);
58        buf.split_to(1);
59        // We'll start from the beginning next time.
60        *start_pos = 0;
61        Ok(Some(convert(&line)))
62    } else {
63        // Mark where we ended scanning.
64        *start_pos = buf.len();
65        Ok(None)
66    }
67}
68
/// A codec working with JSONRPC 2.0 messages, one message per line.
///
/// This produces or encodes [Message](../message/enum.Message.html). It separates the records by
/// newlines, so it can recover from syntax errors: a malformed line is reported and consumed, and
/// decoding continues with the next line.
///
/// Note that each produced item is a `Result` (`Parsed`), so protocol-level errors are reported
/// as items instead of terminating the stream.
#[derive(Debug, Default)]
// The field is the offset up to which the buffered input was already searched for a newline.
pub struct Line(usize);
78
79impl Line {
80    /// A constructor
81    pub fn new() -> Self {
82        Line(0)
83    }
84}
85
86impl PositionCache for Line {
87    fn position(&mut self) -> &mut usize {
88        &mut self.0
89    }
90}
91
92impl Encoder for Line {
93    type Item = Message;
94    type Error = Error;
95    fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> IoResult<()> {
96        encode_codec(&msg, buf)
97    }
98}
99
100impl Decoder for Line {
101    type Item = Parsed;
102    type Error = Error;
103    fn decode(&mut self, src: &mut BytesMut) -> IoResult<Option<Parsed>> {
104        decode_codec(self, src, from_slice)
105    }
106}
107
/// A JSONRPC 2.0 codec that tolerates input which is not valid UTF-8.
///
/// Framing is the same as in the [Line](struct.Line.html) codec: one message per line. Before
/// parsing, every line is run through `String::from_utf8_lossy`, so each invalid byte sequence
/// is replaced by the Unicode replacement character (U+FFFD) instead of making the whole message
/// unparsable. Such input is arguably broken, but it does occur in the wild and sometimes coping
/// with it is the only option left.
///
/// The plain `Line` codec reports the same input as a syntax error. Both codecs encode messages
/// identically.
#[derive(Debug, Default)]
pub struct DirtyLine(usize);
119
120impl DirtyLine {
121    /// A constructor
122    pub fn new() -> Self {
123        DirtyLine(0)
124    }
125}
126
127impl PositionCache for DirtyLine {
128    fn position(&mut self) -> &mut usize {
129        &mut self.0
130    }
131}
132
133impl Decoder for DirtyLine {
134    type Item = Parsed;
135    type Error = Error;
136    fn decode(&mut self, src: &mut BytesMut) -> IoResult<Option<Parsed>> {
137        decode_codec(self, src, |bytes| {
138            from_str(String::from_utf8_lossy(bytes).as_ref())
139        })
140    }
141}
142
143impl Encoder for DirtyLine {
144    type Item = Message;
145    type Error = Error;
146    fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> IoResult<()> {
147        encode_codec(&msg, buf)
148    }
149}
150
/// A codec working with JSONRPC 2.0 messages, delimited by the JSON values themselves.
///
/// This produces or encodes [Message](../message/enum.Message.html). It finds messages by the
/// boundaries of the JSON values, so it accepts both newline-separated and directly concatenated
/// messages. When encoding, it emits a newline-separated stream, which is the more generic
/// format.
#[derive(Debug, Default)]
pub struct Boundary;
157
158impl Encoder for Boundary {
159    type Item = Message;
160    type Error = Error;
161    fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> IoResult<()> {
162        encode_codec(&msg, buf)
163    }
164}
165
166impl Decoder for Boundary {
167    type Item = Parsed;
168    type Error = Error;
169    fn decode(&mut self, src: &mut BytesMut) -> IoResult<Option<Parsed>> {
170        let (decoded, pos) = {
171            let mut deserializer = Deserializer::from_slice(src).into_iter();
172            let decoded = deserializer.next().and_then(|result| match result {
173                Err(ref e) if e.is_eof() => None,
174                other => Some(decoded_to_parsed(other)),
175            });
176            (decoded, deserializer.byte_offset())
177        };
178
179        // It did read some data from the input. Find out how many and cut them off.
180        src.split_to(pos);
181        Ok(decoded)
182    }
183}
184
/// Unit tests of the `Line`, `DirtyLine` and `Boundary` codecs.
#[cfg(test)]
mod tests {
    use super::*;
    use message::Broken;

    /// Both line codecs encode a notification into the same single newline-terminated record.
    #[test]
    fn encode() {
        let mut output = BytesMut::with_capacity(10);
        let mut codec = Line::new();
        let msg = Message::notification("notif".to_owned(), None);
        let encoded = BytesMut::from(&b"{\"jsonrpc\":\"2.0\",\"method\":\"notif\"}\n"[..]);
        codec.encode(msg.clone(), &mut output).unwrap();
        assert_eq!(encoded, output);
        let mut dirty_codec = DirtyLine::new();
        // Reuse the same output buffer, emptied, for the second codec.
        output.clear();
        dirty_codec.encode(msg, &mut output).unwrap();
        assert_eq!(encoded, output);
    }

    /// Builds a `BytesMut` holding a copy of `input`.
    fn get_buf(input: &[u8]) -> BytesMut {
        BytesMut::from(input)
    }

    /// Line-based decoding of complete, multiple, incomplete and malformed input.
    #[test]
    fn decode() {
        // Decodes one item from `input` with both line codecs, checks that `rest` is what stays
        // buffered afterwards and that both codecs agree, then returns the `Line` result.
        fn one(input: &[u8], rest: &[u8]) -> IoResult<Option<Parsed>> {
            let mut codec = Line::new();
            let mut buf = get_buf(input);
            let result = codec.decode(&mut buf);
            assert_eq!(rest, &buf);
            // On all the valid inputs, DirtyLine should act the same as Line
            let mut dirty_codec = DirtyLine::new();
            let mut buf = get_buf(input);
            let dirty = dirty_codec.decode(&mut buf);
            assert_eq!(rest, &buf);
            assert_eq!(result.as_ref().unwrap(), dirty.as_ref().unwrap());
            result
        }

        let notif = Message::notification("notif".to_owned(), None);
        let msgstring = Vec::from(&b"{\"jsonrpc\":\"2.0\",\"method\":\"notif\"}\n"[..]);
        // A single message, nothing is left
        assert_eq!(one(&msgstring, b"").unwrap(), Some(Ok(notif.clone())));
        // The first message is decoded, the second stays in the buffer
        let mut twomsgs = msgstring.clone();
        twomsgs.extend_from_slice(&msgstring);
        assert_eq!(one(&twomsgs, &msgstring).unwrap(), Some(Ok(notif.clone())));
        // The second message is incomplete, but stays there
        let incomplete = Vec::from(&br#"{"jsonrpc": "2.0", "method":""#[..]);
        let mut oneandhalf = msgstring.clone();
        oneandhalf.extend_from_slice(&incomplete);
        assert_eq!(
            one(&oneandhalf, &incomplete).unwrap(),
            Some(Ok(notif.clone()))
        );
        // An incomplete message - nothing gets out and everything stays
        assert_eq!(one(&incomplete, &incomplete).unwrap(), None);
        // A syntax error is reported as an error item (and the line is consumed)
        match one(b"{]\n", b"") {
            Ok(Some(Err(Broken::SyntaxError(_)))) => (),
            other => panic!("Something unexpected: {:?}", other),
        };
    }

    /// Test with invalid utf-8 in a string
    #[test]
    fn decode_nonunicode() {
        // `\xF0\x90\x80` is a truncated 4-byte UTF-8 sequence.
        let broken_input = b"{\"jsonrpc\":\"2.0\",\"method\":\"Hello \xF0\x90\x80World\"}\n";
        let mut codec = Line::new();
        let mut buf = get_buf(broken_input);
        // The ordinary line codec gives up
        let result = codec.decode(&mut buf).unwrap();
        match result {
            Some(Err(Broken::SyntaxError(_))) => (),
            other => panic!("Something unexpected: {:?}", other),
        };
        buf = get_buf(broken_input);
        // But the dirty one just keeps going on, with the bad bytes replaced by U+FFFD
        let mut dirty = DirtyLine::new();
        let result = dirty.decode(&mut buf).unwrap();
        assert_eq!(
            result,
            Some(Ok(Message::notification("Hello �World".to_owned(), None)))
        );
    }

    /// Not enough data for a whole message
    #[test]
    fn decode_boundary_short() {
        let mut buf = get_buf(b"{\"jsonrpc\":\"");
        assert!(Boundary.decode(&mut buf).unwrap().is_none());
        // Nothing may be consumed while the value is incomplete.
        assert_eq!(&buf, &b"{\"jsonrpc\":\""[..]);
    }

    /// There's more than one message, but not two.
    ///
    /// And they are not separated by whitespace (to test the boundary decoding).
    #[test]
    fn decode_boundary_prefix() {
        let mut buf = get_buf(b"\n\n {\"jsonrpc\":\"2.0\",\"method\":\"notif\"}{\"");
        assert_eq!(
            Boundary.decode(&mut buf).unwrap().unwrap(),
            Ok(Message::notification("notif".to_owned(), None))
        );
        // Leading whitespace and the first message are consumed; the partial second one stays.
        assert_eq!(&buf, &b"{\""[..]);
    }
}