tokio_serde_bincode/
lib.rs1extern crate bincode;
17extern crate bytes;
18extern crate futures;
19extern crate serde;
20extern crate tokio_serde;
21
22use bincode::Error;
23use bytes::{Bytes, BytesMut};
24use futures::{Poll, Sink, StartSend, Stream};
25use serde::{Deserialize, Serialize};
26use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer};
27
28use std::marker::PhantomData;
29
30pub struct ReadBincode<T, U> {
38 inner: FramedRead<T, U, Bincode<U>>,
39}
40
41pub struct WriteBincode<T: Sink, U> {
47 inner: FramedWrite<T, U, Bincode<U>>,
48}
49
/// Zero-sized codec marker used as the (de)serializer plugged into the
/// framing adapters.
///
/// It stores no data; `T` only records which value type the codec handles.
struct Bincode<T> {
    /// Ties the codec to `T` without storing a `T`.
    ghost: PhantomData<T>,
}
53
54impl<T, U> ReadBincode<T, U>
55where
56 T: Stream,
57 T::Error: From<Error>,
58 U: for<'de> Deserialize<'de>,
59 BytesMut: From<T::Item>,
60{
61 pub fn new(inner: T) -> ReadBincode<T, U> {
63 let json = Bincode { ghost: PhantomData };
64 ReadBincode {
65 inner: FramedRead::new(inner, json),
66 }
67 }
68}
69
70impl<T, U> ReadBincode<T, U> {
71 pub fn get_ref(&self) -> &T {
77 self.inner.get_ref()
78 }
79
80 pub fn get_mut(&mut self) -> &mut T {
87 self.inner.get_mut()
88 }
89
90 pub fn into_inner(self) -> T {
96 self.inner.into_inner()
97 }
98}
99
100impl<T, U> Stream for ReadBincode<T, U>
101where
102 T: Stream,
103 T::Error: From<Error>,
104 U: for<'de> Deserialize<'de>,
105 BytesMut: From<T::Item>,
106{
107 type Item = U;
108 type Error = <T as Stream>::Error;
109
110 fn poll(&mut self) -> Poll<Option<U>, Self::Error> {
111 self.inner.poll()
112 }
113}
114
115impl<T, U> Sink for ReadBincode<T, U>
116where
117 T: Sink,
118{
119 type SinkItem = T::SinkItem;
120 type SinkError = T::SinkError;
121
122 fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
123 self.get_mut().start_send(item)
124 }
125
126 fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
127 self.get_mut().poll_complete()
128 }
129
130 fn close(&mut self) -> Poll<(), T::SinkError> {
131 self.get_mut().close()
132 }
133}
134
135impl<T, U> WriteBincode<T, U>
136where
137 T: Sink<SinkItem = Bytes>,
138 T::SinkError: From<Error>,
139 U: Serialize,
140{
141 pub fn new(inner: T) -> WriteBincode<T, U> {
143 let json = Bincode { ghost: PhantomData };
144 WriteBincode {
145 inner: FramedWrite::new(inner, json),
146 }
147 }
148}
149
150impl<T: Sink, U> WriteBincode<T, U> {
151 pub fn get_ref(&self) -> &T {
156 self.inner.get_ref()
157 }
158
159 pub fn get_mut(&mut self) -> &mut T {
165 self.inner.get_mut()
166 }
167
168 pub fn into_inner(self) -> T {
173 self.inner.into_inner()
174 }
175}
176
177impl<T, U> Sink for WriteBincode<T, U>
178where
179 T: Sink<SinkItem = Bytes>,
180 T::SinkError: From<Error>,
181 U: Serialize,
182{
183 type SinkItem = U;
184 type SinkError = <T as Sink>::SinkError;
185
186 fn start_send(&mut self, item: U) -> StartSend<U, Self::SinkError> {
187 self.inner.start_send(item)
188 }
189
190 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
191 self.inner.poll_complete()
192 }
193
194 fn close(&mut self) -> Poll<(), Self::SinkError> {
195 self.inner.poll_complete()
196 }
197}
198
199impl<T, U> Stream for WriteBincode<T, U>
200where
201 T: Stream + Sink,
202{
203 type Item = T::Item;
204 type Error = T::Error;
205
206 fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
207 self.get_mut().poll()
208 }
209}
210
211impl<T> Deserializer<T> for Bincode<T>
212where
213 T: for<'de> Deserialize<'de>,
214{
215 type Error = Error;
216
217 fn deserialize(&mut self, src: &BytesMut) -> Result<T, Error> {
218 bincode::deserialize(src)
219 }
220}
221
222impl<T: Serialize> Serializer<T> for Bincode<T> {
223 type Error = Error;
224
225 fn serialize(&mut self, item: &T) -> Result<Bytes, Self::Error> {
226 bincode::serialize(item).map(Into::into)
227 }
228}