asynchronous_codec/codec/
json.rs

1use std::marker::PhantomData;
2
3use crate::{Decoder, Encoder};
4use bytes::{Buf, BufMut, BytesMut};
5
6use serde::{Deserialize, Serialize};
7
/// JSON codec backed by `serde_json`.
///
/// `Enc` names the type handed to the encoder and `Dec` names the type the
/// decoder yields. Both appear only as [`PhantomData`] markers, so a codec
/// value stores no data of its own.
///
/// # Examples
///
/// ```
/// # use futures::{executor, SinkExt, TryStreamExt};
/// # use futures::io::Cursor;
/// use serde::{Serialize, Deserialize};
/// use asynchronous_codec::{JsonCodec, Framed};
///
/// #[derive(Serialize, Deserialize)]
/// struct Something {
///     pub data: u16,
/// }
///
/// async move {
///     # let mut buf = vec![];
///     # let stream = Cursor::new(&mut buf);
///     // let stream = ...
///     let codec = JsonCodec::<Something, Something>::new();
///     let mut framed = Framed::new(stream, codec);
///
///     while let Some(s) = framed.try_next().await.unwrap() {
///         println!("{:?}", s.data);
///     }
/// };
/// ```
#[derive(Debug, PartialEq)]
pub struct JsonCodec<Enc, Dec> {
    /// Marker tying the codec to the type it encodes.
    enc: PhantomData<Enc>,
    /// Marker tying the codec to the type it decodes.
    dec: PhantomData<Dec>,
}
38
39/// JSON Codec error enumeration
40#[derive(Debug)]
41pub enum JsonCodecError {
42    /// IO error
43    Io(std::io::Error),
44    /// JSON error
45    Json(serde_json::Error),
46}
47
48impl std::fmt::Display for JsonCodecError {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            JsonCodecError::Io(e) => write!(f, "I/O error: {}", e),
52            JsonCodecError::Json(e) => write!(f, "JSON error: {}", e),
53        }
54    }
55}
56
57impl std::error::Error for JsonCodecError {
58    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
59        match self {
60            JsonCodecError::Io(ref e) => Some(e),
61            JsonCodecError::Json(ref e) => Some(e),
62        }
63    }
64}
65
66impl From<std::io::Error> for JsonCodecError {
67    fn from(e: std::io::Error) -> JsonCodecError {
68        JsonCodecError::Io(e)
69    }
70}
71
72impl From<serde_json::Error> for JsonCodecError {
73    fn from(e: serde_json::Error) -> JsonCodecError {
74        JsonCodecError::Json(e)
75    }
76}
77
78impl<Enc, Dec> JsonCodec<Enc, Dec>
79where
80    for<'de> Dec: Deserialize<'de> + 'static,
81    for<'de> Enc: Serialize + 'static,
82{
83    /// Creates a new `JsonCodec` with the associated types
84    pub fn new() -> JsonCodec<Enc, Dec> {
85        JsonCodec {
86            enc: PhantomData,
87            dec: PhantomData,
88        }
89    }
90}
91
92impl<Enc, Dec> Clone for JsonCodec<Enc, Dec>
93where
94    for<'de> Dec: Deserialize<'de> + 'static,
95    for<'de> Enc: Serialize + 'static,
96{
97    /// Clone creates a new instance of the `JsonCodec`
98    fn clone(&self) -> JsonCodec<Enc, Dec> {
99        JsonCodec::new()
100    }
101}
102
103/// Decoder impl parses json objects from bytes
104impl<Enc, Dec> Decoder for JsonCodec<Enc, Dec>
105where
106    for<'de> Dec: Deserialize<'de> + 'static,
107    for<'de> Enc: Serialize + 'static,
108{
109    type Item = Dec;
110    type Error = JsonCodecError;
111
112    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
113        // Build streaming JSON iterator over data
114        let de = serde_json::Deserializer::from_slice(&buf);
115        let mut iter = de.into_iter::<Dec>();
116
117        // Attempt to fetch an item and generate response
118        let item = match iter.next() {
119            Some(Ok(item)) => item,
120            Some(Err(ref e)) if e.is_eof() => return Ok(None),
121            Some(Err(e)) => return Err(e.into()),
122            None => return Ok(None),
123        };
124
125        // Update offset from iterator
126        let offset = iter.byte_offset();
127
128        // Advance buffer
129        buf.advance(offset);
130
131        Ok(Some(item))
132    }
133}
134
135/// Encoder impl encodes object streams to bytes
136impl<Enc, Dec> Encoder for JsonCodec<Enc, Dec>
137where
138    for<'de> Dec: Deserialize<'de> + 'static,
139    for<'de> Enc: Serialize + 'static,
140{
141    type Item<'a> = Enc;
142    type Error = JsonCodecError;
143
144    fn encode(&mut self, data: Self::Item<'_>, buf: &mut BytesMut) -> Result<(), Self::Error> {
145        // Encode json
146        let j = serde_json::to_string(&data)?;
147
148        // Write to buffer
149        buf.reserve(j.len());
150        buf.put_slice(&j.as_bytes());
151
152        Ok(())
153    }
154}
155
156impl<Enc, Dec> Default for JsonCodec<Enc, Dec>
157where
158    for<'de> Dec: Deserialize<'de> + 'static,
159    for<'de> Enc: Serialize + 'static,
160{
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
#[cfg(test)]
mod test {
    use bytes::BytesMut;
    use serde::{Deserialize, Serialize};

    use super::JsonCodec;
    use crate::{Decoder, Encoder};

    /// Small serde-enabled payload used by every test below.
    #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
    struct TestStruct {
        pub name: String,
        pub data: u16,
    }

    /// Encoding then decoding yields the original value and drains the buffer.
    #[test]
    fn json_codec_encode_decode() {
        let mut codec = JsonCodec::<TestStruct, TestStruct>::new();
        let mut buff = BytesMut::new();

        let item1 = TestStruct {
            name: "Test name".to_owned(),
            data: 16,
        };
        codec.encode(item1.clone(), &mut buff).unwrap();

        let item2 = codec.decode(&mut buff).unwrap().unwrap();
        assert_eq!(item1, item2);

        // Nothing is left to decode.
        assert_eq!(codec.decode(&mut buff).unwrap(), None);

        assert_eq!(buff.len(), 0);
    }

    /// A truncated prefix decodes to `None`; the complete buffer decodes fully.
    #[test]
    fn json_codec_partial_decode() {
        let mut codec = JsonCodec::<TestStruct, TestStruct>::new();
        let mut buff = BytesMut::new();

        let item1 = TestStruct {
            name: "Test name".to_owned(),
            data: 34,
        };
        codec.encode(item1, &mut buff).unwrap();

        let mut start = buff.clone().split_to(4);
        assert_eq!(codec.decode(&mut start).unwrap(), None);

        codec.decode(&mut buff).unwrap().unwrap();

        assert_eq!(buff.len(), 0);
    }

    /// A truncated buffer is left untouched and decodes once the rest arrives.
    #[test]
    fn json_codec_eof_reached() {
        let mut codec = JsonCodec::<TestStruct, TestStruct>::new();
        let mut buff = BytesMut::new();

        let item1 = TestStruct {
            name: "Test name".to_owned(),
            data: 34,
        };
        codec.encode(item1.clone(), &mut buff).unwrap();

        // Split the buffer into two.
        let mut buff_start = buff.clone().split_to(4);
        let buff_end = buff.clone().split_off(4);

        // Attempt to decode the first half of the buffer. This should return `Ok(None)` and not
        // advance the buffer.
        assert_eq!(codec.decode(&mut buff_start).unwrap(), None);
        assert_eq!(buff_start.len(), 4);

        // Combine the buffer back together.
        buff_start.extend(buff_end.iter());

        // The recombined buffer (not the untouched original) should now
        // decode successfully and be fully consumed.
        let item2 = codec.decode(&mut buff_start).unwrap().unwrap();
        assert_eq!(item1, item2);
        assert_eq!(buff_start.len(), 0);
    }

    /// Malformed input yields an error and leaves the buffer as it was.
    #[test]
    fn json_codec_decode_error() {
        let mut codec = JsonCodec::<TestStruct, TestStruct>::new();
        let mut buff = BytesMut::new();

        let item1 = TestStruct {
            name: "Test name".to_owned(),
            data: 34,
        };
        codec.encode(item1.clone(), &mut buff).unwrap();

        // Split the end off the buffer.
        let mut buff_end = buff.clone().split_off(4);
        let buff_end_length = buff_end.len();

        // Attempting to decode should return an error.
        assert!(codec.decode(&mut buff_end).is_err());
        assert_eq!(buff_end.len(), buff_end_length);
    }
}