1#![doc = include_str!("../README.md")]
2#![deny(missing_docs, missing_debug_implementations)]
3
4use bincode::{DefaultOptions, Options};
5use bytes::{Buf, BufMut, BytesMut};
6use serde::{Serialize, de::DeserializeOwned};
7use std::fmt;
8use std::io::{self, Read};
9use std::marker::PhantomData;
10use tokio_util::codec::{Decoder, Encoder};
11
12#[inline]
14pub fn bincodec<T: DeserializeOwned>() -> BinCodec<T, DefaultOptions>
15{
16 BinCodec::<T, DefaultOptions>::with_config(bincode::options())
17}
18
19pub struct BinCodec<T, O> {
21 options: O,
22 _pd: PhantomData<T>,
23}
24
25impl<T: DeserializeOwned, O: Options + Copy> BinCodec<T, O>
26{
27 #[inline]
29 pub fn with_config(config: O) -> Self {
30 BinCodec {
31 options: config,
32 _pd: PhantomData,
33 }
34 }
35}
36
37impl<T: DeserializeOwned, O: Options + Copy> Decoder for BinCodec<T, O>
38{
39 type Item = T;
40 type Error = bincode::Error;
41
42 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
43 if !buf.is_empty() {
44 let mut reader = Reader::new(&buf[..]);
45 let message = self.options.deserialize_from(&mut reader)?;
46 buf.advance(reader.amount());
47 Ok(Some(message))
48 } else {
49 Ok(None)
50 }
51 }
52}
53
54impl<T: Serialize, O: Options + Copy> Encoder<T> for BinCodec<T, O> {
55 type Error = bincode::Error;
56
57 fn encode(&mut self, item: T, buf: &mut BytesMut) -> Result<(), Self::Error> {
58 let size = self.options.serialized_size(&item)?;
59 buf.reserve(size as usize);
60 let message = self.options.serialize(&item)?;
61 buf.put(&message[..]);
62 Ok(())
63 }
64}
65
66impl<T, O> fmt::Debug for BinCodec<T, O> {
67 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
68 f.debug_struct("BinCodec").finish()
69 }
70}
71
72#[derive(Debug)]
73struct Reader<'buf> {
74 buf: &'buf [u8],
75 amount: usize,
76}
77
78impl<'buf> Reader<'buf> {
79 pub fn new(buf: &'buf [u8]) -> Self {
80 Reader { buf, amount: 0 }
81 }
82
83 pub fn amount(&self) -> usize {
84 self.amount
85 }
86}
87
88impl<'buf, 'a> Read for &'a mut Reader<'buf> {
89 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
90 let bytes_read = self.buf.read(buf)?;
91 self.amount += bytes_read;
92 Ok(bytes_read)
93 }
94}
95
96#[cfg(test)]
97mod test {
98 use std::net::SocketAddr;
99 use futures::sink::SinkExt;
100 use serde_derive::{Deserialize, Serialize};
101 use tokio::net::{TcpListener, TcpStream};
102 use tokio_stream::StreamExt;
103 use tokio_util::codec::Framed;
104 use crate::bincodec;
105
106 #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
107 enum Mock {
108 One(u8),
109 Two(f32),
110 }
111
112 #[tokio::test]
113 async fn this_should_run() {
114 let addr = SocketAddr::new("127.0.0.1".parse().unwrap(), 15151);
115 let echo = TcpListener::bind(&addr).await.unwrap();
116 tokio::spawn(async move {
117 match echo.accept().await {
118 Ok((socket, addr)) => {
119 println!("new client: {:?}", addr);
120 let mut f = Framed::new(socket, bincodec::<Mock>());
121 while let Some(Ok(p)) = f.next().await {
122 dbg!(&p);
123 f.send(p).await.unwrap()
124 }
125 }
126 Err(e) => println!("couldn't get client: {:?}", e),
127 }
128 });
129
130 let client = TcpStream::connect(&addr).await.unwrap();
131 let mut client = Framed::new(client, bincodec::<Mock>());
132 client.send(Mock::One(1)).await.unwrap();
133
134 let got = match client.next().await.unwrap() {
135 Ok(x) => x,
136 Err(e) => panic!("{e}"),
137 };
138
139 assert_eq!(got, Mock::One(1));
140
141 client.send(Mock::Two(2.0)).await.unwrap();
142
143 let got = match client.next().await.unwrap() {
144 Ok(x) => x,
145 Err(e) => panic!("{e}"),
146 };
147
148 assert_eq!(got, Mock::Two(2.0));
149 }
150}