tokio_bincodec/
lib.rs

1#![doc = include_str!("../README.md")]
2#![deny(missing_docs, missing_debug_implementations)]
3
4use bincode::{DefaultOptions, Options};
5use bytes::{Buf, BufMut, BytesMut};
6use serde::{Serialize, de::DeserializeOwned};
7use std::fmt;
8use std::io::{self, Read};
9use std::marker::PhantomData;
10use tokio_util::codec::{Decoder, Encoder};
11
12/// Create a bincode based codec
13#[inline]
14pub fn bincodec<T: DeserializeOwned>() -> BinCodec<T, DefaultOptions>
15{
16    BinCodec::<T, DefaultOptions>::with_config(bincode::options())
17}
18
/// Bincode based codec for use with `tokio_util::codec`.
///
/// `T` is the message type carried by the codec and `O` is the bincode
/// options value used for every serialization and deserialization call.
pub struct BinCodec<T, O> {
    // bincode options applied to each encode/decode call.
    options: O,
    // Ties the codec to its message type without storing a `T`.
    _pd: PhantomData<T>,
}
24
25impl<T: DeserializeOwned, O: Options + Copy> BinCodec<T, O>
26{
27    /// Provides a bincode based codec from the bincode config
28    #[inline]
29    pub fn with_config(config: O) -> Self {
30        BinCodec {
31            options: config,
32            _pd: PhantomData,
33        }
34    }
35}
36
37impl<T: DeserializeOwned, O: Options + Copy> Decoder for BinCodec<T, O>
38{
39    type Item = T;
40    type Error = bincode::Error;
41
42    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
43        if !buf.is_empty() {
44            let mut reader = Reader::new(&buf[..]);
45            let message = self.options.deserialize_from(&mut reader)?;
46            buf.advance(reader.amount());
47            Ok(Some(message))
48        } else {
49            Ok(None)
50        }
51    }
52}
53
54impl<T: Serialize, O: Options + Copy> Encoder<T> for BinCodec<T, O> {
55    type Error = bincode::Error;
56
57    fn encode(&mut self, item: T, buf: &mut BytesMut) -> Result<(), Self::Error> {
58        let size = self.options.serialized_size(&item)?;
59        buf.reserve(size as usize);
60        let message = self.options.serialize(&item)?;
61        buf.put(&message[..]);
62        Ok(())
63    }
64}
65
66impl<T, O> fmt::Debug for BinCodec<T, O> {
67    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
68        f.debug_struct("BinCodec").finish()
69    }
70}
71
/// Byte-slice reader that keeps a running count of the bytes it has handed out.
#[derive(Debug)]
struct Reader<'buf> {
    /// Bytes that have not been read yet.
    buf: &'buf [u8],
    /// Total number of bytes returned by `read` so far.
    amount: usize,
}

impl<'buf> Reader<'buf> {
    /// Wraps `buf` with the byte counter starting at zero.
    pub fn new(buf: &'buf [u8]) -> Self {
        Self { amount: 0, buf }
    }

    /// Number of bytes read through this reader so far.
    pub fn amount(&self) -> usize {
        self.amount
    }
}

impl<'buf, 'a> Read for &'a mut Reader<'buf> {
    /// Copies bytes from the front of the wrapped slice into `dest` and adds
    /// the number copied to the running total.
    fn read(&mut self, dest: &mut [u8]) -> io::Result<usize> {
        let copied = Read::read(&mut self.buf, dest)?;
        self.amount += copied;
        Ok(copied)
    }
}
95
#[cfg(test)]
mod test {
    use crate::bincodec;
    use futures::sink::SinkExt;
    use serde_derive::{Deserialize, Serialize};
    use tokio::net::{TcpListener, TcpStream};
    use tokio_stream::StreamExt;
    use tokio_util::codec::Framed;

    /// Message type with two differently shaped payloads.
    #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
    enum Mock {
        One(u8),
        Two(f32),
    }

    /// Sends messages to an in-process TCP echo server and checks that each
    /// one comes back unchanged.
    #[tokio::test]
    async fn this_should_run() {
        // Port 0 lets the OS pick a free port, so the test cannot collide with
        // another process or a parallel test run on a fixed port.
        let echo = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = echo.local_addr().unwrap();
        tokio::spawn(async move {
            match echo.accept().await {
                Ok((socket, peer)) => {
                    println!("new client: {:?}", peer);
                    let mut f = Framed::new(socket, bincodec::<Mock>());
                    // Echo every decoded message back until the stream ends
                    // or a decode error occurs.
                    while let Some(Ok(p)) = f.next().await {
                        f.send(p).await.unwrap()
                    }
                }
                Err(e) => println!("couldn't get client: {:?}", e),
            }
        });

        let client = TcpStream::connect(&addr).await.unwrap();
        let mut client = Framed::new(client, bincodec::<Mock>());
        client.send(Mock::One(1)).await.unwrap();

        let got = match client.next().await.unwrap() {
            Ok(x) => x,
            Err(e) => panic!("{e}"),
        };

        assert_eq!(got, Mock::One(1));

        client.send(Mock::Two(2.0)).await.unwrap();

        let got = match client.next().await.unwrap() {
            Ok(x) => x,
            Err(e) => panic!("{e}"),
        };

        assert_eq!(got, Mock::Two(2.0));
    }
}