msgpack_rpc/
codec.rs

1use crate::errors::DecodeError;
2use crate::message::Message;
3use bytes::BytesMut;
4use std::io;
5use tokio_util::codec::{Decoder, Encoder};
6
/// Codec for MessagePack-RPC [`Message`] frames.
///
/// The type is a field-less unit struct: it holds no state of its own and
/// exists to carry the `Decoder` and `Encoder<Message>` implementations
/// defined in this file.
#[derive(Debug)]
pub(crate) struct Codec;
9
10impl Decoder for Codec {
11    type Item = Message;
12    type Error = io::Error;
13
14    fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> {
15        let res: Result<Option<Self::Item>, Self::Error>;
16        let position = {
17            let mut buf = io::Cursor::new(&src);
18            loop {
19                match Message::decode(&mut buf) {
20                    Ok(message) => {
21                        res = Ok(Some(message));
22                        break;
23                    }
24                    Err(err) => match err {
25                        DecodeError::Truncated(_) => return Ok(None),
26                        DecodeError::DepthLimitExceeded => return Ok(None), // Todo, consider replacing with `continue`
27                        DecodeError::Invalid => continue,
28                        DecodeError::UnknownIo(io_err) => {
29                            res = Err(io_err);
30                            break;
31                        }
32                    },
33                }
34            }
35            buf.position() as usize
36        };
37        let _ = src.split_to(position);
38        res
39    }
40}
41
42impl Encoder<Message> for Codec {
43    type Error = io::Error;
44
45    fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> io::Result<()> {
46        let bytes = msg.pack()?;
47        buf.extend_from_slice(&bytes);
48        Ok(())
49    }
50}
51
/// Exercises `Codec::decode` on complete, back-to-back, truncated and
/// garbage-prefixed input, checking both the decoded value and what is left
/// in the buffer afterwards.
#[test]
fn decode() {
    use crate::message::{Message, Request};

    /// Decodes `input` with a fresh codec, asserts that exactly `rest`
    /// remains in the buffer, and hands back the decoder's result.
    fn try_decode(input: &[u8], rest: &[u8]) -> io::Result<Option<Message>> {
        let mut buf = BytesMut::from(input);
        let result = Codec.decode(&mut buf);
        assert_eq!(rest, &buf);
        result
    }

    let msg = Message::Request(Request {
        id: 1234,
        method: "dummy".to_string(),
        params: Vec::new(),
    });
    let packed = msg.pack().unwrap();

    // Exactly one message: it is returned and the buffer ends up empty.
    let single = try_decode(&packed, b"").unwrap();
    assert_eq!(single, Some(msg.clone()));

    // Two messages back to back: the first is returned, the second stays.
    let doubled = [&packed[..], &packed[..]].concat();
    let first = try_decode(&doubled, &packed).unwrap();
    assert_eq!(first, Some(msg.clone()));

    // A message missing its last byte: nothing is returned, nothing is consumed.
    let truncated = &packed[..packed.len() - 1];
    let pending = try_decode(truncated, truncated).unwrap();
    assert_eq!(pending, None);

    // Invalid leading bytes: they get eaten, and the next message gets read.
    let noisy = [&[0u8, 1, 2][..], &packed[..]].concat();
    let recovered = try_decode(&noisy, b"").unwrap();
    assert_eq!(recovered, Some(msg));
}