tokio_serde_cbor/
lib.rs

//! This crate integrates CBOR into Tokio.
//!
//! This crate provides a codec for framing information as CBOR encoded messages. It allows
//! encoding and decoding arbitrary [serde](https://serde.rs) ready types. It can be used by
//! plugging the codec into the connection's `framed` method to get stream and sink of the desired
//! items.
//!
//! The encoded and decoded items are independent (you may want to encode references and decode
//! owned data, or the protocol might be asymmetric). If you want just one direction, you can use
//! [`Decoder`](struct.Decoder.html) or [`Encoder`](struct.Encoder.html). If you want both, it is
//! better to use [`Codec`](struct.Codec.html).
//!
//! Note that this is useful if the CBOR itself defines the frames. If the messages are delimited
//! in some other way (e.g. length-prefix encoding) and CBOR is only the payload, you'd use a codec
//! for the other framing and use `.map` on the received stream and sink to convert the messages.
16
17use std::default::Default;
18use std::error::Error as ErrorTrait;
19use std::fmt::{Display, Formatter, Result as FmtResult};
20use std::io::{Error as IoError, Read, Result as IoResult};
21use std::marker::PhantomData;
22
23use bytes::{Buf, BufMut, BytesMut};
24use serde::{Deserialize, Serialize};
25use serde_cbor::de::{Deserializer, IoRead};
26use serde_cbor::error::Error as CborError;
27use serde_cbor::ser::{IoWrite, Serializer};
28use tokio_util::codec::{Decoder as IoDecoder, Encoder as IoEncoder};
29
30/// Errors returned by encoding and decoding.
31#[derive(Debug)]
32#[non_exhaustive]
33pub enum Error {
34    Io(IoError),
35    Cbor(CborError),
36}
37
38impl From<IoError> for Error {
39    fn from(error: IoError) -> Self {
40        Error::Io(error)
41    }
42}
43
44impl From<CborError> for Error {
45    fn from(error: CborError) -> Self {
46        Error::Cbor(error)
47    }
48}
49
50impl Display for Error {
51    fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
52        match self {
53            Error::Io(e) => e.fmt(fmt),
54            Error::Cbor(e) => e.fmt(fmt),
55        }
56    }
57}
58
59impl ErrorTrait for Error {
60    fn cause(&self) -> Option<&dyn ErrorTrait> {
61        match self {
62            Error::Io(e) => Some(e),
63            Error::Cbor(e) => Some(e),
64        }
65    }
66}
67
/// A byte-counting adapter around a borrowed reader.
///
/// Every successful read is forwarded to the wrapped `Read` and its size is added to a counter
/// owned by the caller. This is needed because the CBOR decoder offers no way to ask how far into
/// the input it got.
struct Counted<'a, R: 'a> {
    /// The reader all reads are forwarded to.
    r: &'a mut R,
    /// Running total of bytes read; borrowed so the caller can inspect it afterwards.
    pos: &'a mut usize,
}

impl<'a, R: Read> Read for Counted<'a, R> {
    /// Reads from the wrapped reader, adding the number of bytes read to the counter.
    ///
    /// Errors from the wrapped reader are returned unchanged and leave the counter untouched.
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        let size = self.r.read(buf)?;
        *self.pos += size;
        Ok(size)
    }
}
88
/// Decoder of CBOR encoded frames.
///
/// It implements `tokio_util`'s codec `Decoder` trait, so it can be plugged into `Framed` to turn
/// incoming bytes into items. Anything that is `serde`'s `Deserialize` can be decoded this way.
#[derive(Debug, Clone)]
pub struct Decoder<Item> {
    /// Only records the item type; no `Item` value is ever stored in the decoder.
    _data: PhantomData<fn() -> Item>,
}
97
98impl<'de, Item: Deserialize<'de>> Decoder<Item> {
99    /// Creates a new decoder.
100    pub fn new() -> Self {
101        Self { _data: PhantomData }
102    }
103}
104
105impl<'de, Item: Deserialize<'de>> Default for Decoder<Item> {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111impl<'de, Item: Deserialize<'de>> IoDecoder for Decoder<Item> {
112    type Item = Item;
113    type Error = Error;
114    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Item>, Error> {
115        let mut pos = 0;
116        let result = {
117            let mut slice: &[u8] = src;
118            let reader = Counted {
119                r: &mut slice,
120                pos: &mut pos,
121            };
122            let reader = IoRead::new(reader);
123            // Use the deserializer directly, instead of using `deserialize_from`. We explicitly do
124            // *not* want to check that there are no trailing bytes ‒ there may be, and they are
125            // the next frame.
126            let mut deserializer = Deserializer::new(reader);
127            Item::deserialize(&mut deserializer)
128        };
129        match result {
130            // If we read the item, we also need to consume the corresponding bytes.
131            Ok(item) => {
132                src.advance(pos);
133                Ok(Some(item))
134            }
135            // Sometimes the EOF is signalled as IO error
136            Err(ref error) if error.is_eof() => Ok(None),
137            // Any other error is simply passed through.
138            Err(e) => Err(e.into()),
139        }
140    }
141}
142
/// Controls when the encoder emits the CBOR self-describe tag.
///
/// CBOR defines a tag that marks a document as being CBOR (sometimes called the "magic"). This
/// setting says whether it should be written in front of encoded frames.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SdMode {
    /// Write the tag in front of every encoded frame.
    Always,
    /// Write the tag in front of the first encoded frame only.
    Once,
    /// Never write the tag.
    Never,
}
156
157/// CBOR based encoder.
158///
159/// This encoder can be used with `tokio_io`'s `Framed` to encode CBOR frames. Anything
160/// that is `serde`s `Serialize` can be encoded this way (at least in theory, some values return
161/// errors when attempted to serialize).
162#[derive(Clone, Debug)]
163pub struct Encoder<Item> {
164    _data: PhantomData<fn(Item)>,
165    sd: SdMode,
166    packed: bool,
167}
168
169impl<Item: Serialize> Encoder<Item> {
170    /// Creates a new encoder.
171    ///
172    /// By default, it doesn't do packed encoding (it includes struct field names) and it doesn't
173    /// prefix the frames with self-describe tag.
174    pub fn new() -> Self {
175        Self {
176            _data: PhantomData,
177            sd: SdMode::Never,
178            packed: false,
179        }
180    }
181    /// Turns the encoder into one with confifured self-describe behaviour.
182    pub fn sd(self, sd: SdMode) -> Self {
183        Self { sd, ..self }
184    }
185    /// Turns the encoder into one with configured packed encoding.
186    ///
187    /// If `packed` is true, it omits the field names from the encoded data. That makes it smaller,
188    /// but it also means the decoding end must know the exact order of fields and it can't be
189    /// something like python, which would want to get a dictionary out of it.
190    pub fn packed(self, packed: bool) -> Self {
191        Self { packed, ..self }
192    }
193}
194
195impl<Item: Serialize> Default for Encoder<Item> {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201impl<Item: Serialize> IoEncoder<Item> for Encoder<Item> {
202    type Error = Error;
203    fn encode(&mut self, item: Item, dst: &mut BytesMut) -> Result<(), Error> {
204        let mut serializer = if self.packed {
205            Serializer::new(IoWrite::new(dst.writer())).packed_format()
206        } else {
207            Serializer::new(IoWrite::new(dst.writer()))
208        };
209        if self.sd != SdMode::Never {
210            serializer.self_describe()?;
211        }
212        if self.sd == SdMode::Once {
213            self.sd = SdMode::Never;
214        }
215        item.serialize(&mut serializer).map_err(Into::into)
216    }
217}
218
219/// Cbor serializer and deserializer.
220///
221/// This is just a combined [`Decoder`](struct.Decoder.html) and [`Encoder`](struct.Encoder.html).
222#[derive(Clone, Debug)]
223pub struct Codec<Dec, Enc> {
224    dec: Decoder<Dec>,
225    enc: Encoder<Enc>,
226}
227
228impl<'de, Dec: Deserialize<'de>, Enc: Serialize> Codec<Dec, Enc> {
229    /// Creates a new codec
230    pub fn new() -> Self {
231        Self {
232            dec: Decoder::new(),
233            enc: Encoder::new(),
234        }
235    }
236    /// Turns the internal encoder into one with confifured self-describe behaviour.
237    pub fn sd(self, sd: SdMode) -> Self {
238        Self {
239            dec: self.dec,
240            enc: Encoder { sd, ..self.enc },
241        }
242    }
243    /// Turns the internal encoder into one with configured packed encoding.
244    ///
245    /// If `packed` is true, it omits the field names from the encoded data. That makes it smaller,
246    /// but it also means the decoding end must know the exact order of fields and it can't be
247    /// something like python, which would want to get a dictionary out of it.
248    pub fn packed(self, packed: bool) -> Self {
249        Self {
250            dec: self.dec,
251            enc: Encoder { packed, ..self.enc },
252        }
253    }
254}
255
256impl<'de, Dec: Deserialize<'de>, Enc: Serialize> Default for Codec<Dec, Enc> {
257    fn default() -> Self {
258        Self::new()
259    }
260}
261
262impl<'de, Dec: Deserialize<'de>, Enc: Serialize> IoDecoder for Codec<Dec, Enc> {
263    type Item = Dec;
264    type Error = Error;
265    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Dec>, Error> {
266        self.dec.decode(src)
267    }
268}
269
270impl<'de, Dec: Deserialize<'de>, Enc: Serialize> IoEncoder<Enc> for Codec<Dec, Enc> {
271    type Error = Error;
272    fn encode(&mut self, item: Enc, dst: &mut BytesMut) -> Result<(), Error> {
273        self.enc.encode(item, dst)
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use std::collections::HashMap;
280    use std::sync::Arc;
281
282    use super::*;
283
284    type TestData = HashMap<String, usize>;
285
286    /// Something to test with. It doesn't really matter what it is.
287    fn test_data() -> TestData {
288        let mut data = HashMap::new();
289        data.insert("hello".to_owned(), 42usize);
290        data.insert("world".to_owned(), 0usize);
291        data
292    }
293
294    /// Try decoding CBOR based data.
295    fn decode<Dec: IoDecoder<Item = TestData, Error = Error>>(dec: Dec) {
296        let mut decoder = dec;
297        let data = test_data();
298        let encoded = serde_cbor::to_vec(&data).unwrap();
299        let mut all = BytesMut::with_capacity(128);
300        // Put two copies and a bit into the buffer
301        all.extend(&encoded);
302        all.extend(&encoded);
303        all.extend(&encoded[..1]);
304        // We can now decode the first two copies
305        let decoded = decoder.decode(&mut all).unwrap().unwrap();
306        assert_eq!(data, decoded);
307        let decoded = decoder.decode(&mut all).unwrap().unwrap();
308        assert_eq!(data, decoded);
309        // And only 1 byte is left
310        assert_eq!(1, all.len());
311        // But the third one is not ready yet, so we get Ok(None)
312        assert!(decoder.decode(&mut all).unwrap().is_none());
313        // That single byte should still be there, yet unused
314        assert_eq!(1, all.len());
315        // We add the rest and get a third copy
316        all.extend(&encoded[1..]);
317        let decoded = decoder.decode(&mut all).unwrap().unwrap();
318        assert_eq!(data, decoded);
319        // Nothing there now
320        assert!(all.is_empty());
321        // Now we put some garbage there and see that it errors
322        all.extend(&[0, 1, 2, 3, 4]);
323        decoder.decode(&mut all).unwrap_err();
324        // All 5 bytes are still there
325        assert_eq!(5, all.len());
326    }
327
328    /// Run the decoding tests on the lone decoder.
329    #[test]
330    fn decode_only() {
331        let decoder = Decoder::new();
332        decode(decoder);
333    }
334
335    /// Run the decoding tests on the combined codec.
336    #[test]
337    fn decode_codec() {
338        let decoder: Codec<_, ()> = Codec::new();
339        decode(decoder);
340    }
341
342    /// Test encoding.
343    fn encode<Enc: IoEncoder<TestData, Error = Error>>(enc: Enc) {
344        let mut encoder = enc;
345        let data = test_data();
346        let mut buffer = BytesMut::with_capacity(0);
347        encoder.encode(data.clone(), &mut buffer).unwrap();
348        let pos1 = buffer.len();
349        let decoded = serde_cbor::from_slice::<TestData>(&buffer).unwrap();
350        assert_eq!(data, decoded);
351        // Once more, this time without the self-describe (should be smaller)
352        encoder.encode(data.clone(), &mut buffer).unwrap();
353        let pos2 = buffer.len();
354        // More data arrived
355        assert!(pos2 > pos1);
356        // But not as much as twice as many
357        assert!(pos1 * 2 > pos2);
358        // We can still decode it
359        let decoded = serde_cbor::from_slice::<TestData>(&buffer[pos1..]).unwrap();
360        assert_eq!(data, decoded);
361        // Encoding once more the size stays the same
362        encoder.encode(data, &mut buffer).unwrap();
363        let pos3 = buffer.len();
364        assert_eq!(pos2 - pos1, pos3 - pos2);
365    }
366
367    /// Test encoding by the lone encoder.
368    #[test]
369    fn encode_only() {
370        let encoder = Encoder::new().sd(SdMode::Once);
371        encode(encoder);
372    }
373
374    /// The same as `encode_only`, but with packed encoding.
375    #[test]
376    fn encode_packed() {
377        let encoder = Encoder::new().packed(true).sd(SdMode::Once);
378        encode(encoder);
379    }
380
381    /// Encoding with the combined `Codec`
382    #[test]
383    fn encode_codec() {
384        let encoder: Codec<(), _> = Codec::new().sd(SdMode::Once);
385        encode(encoder);
386    }
387
388    /// Checks that the codec can be send
389    #[test]
390    fn is_send() {
391        let codec: Codec<(), ()> = Codec::new();
392        std::thread::spawn(move || {
393            let _c = codec;
394        });
395    }
396
397    /// Checks that the codec can be send
398    #[test]
399    fn is_sync() {
400        let codec: Arc<Codec<(), ()>> = Arc::new(Codec::new());
401        std::thread::spawn(move || {
402            let _c = codec;
403        });
404    }
405}