tokio_serde_bincode/
lib.rs

1//! `Stream` and `Sink` adaptors for serializing and deserializing values using
2//! Bincode.
3//!
//! This crate provides adaptors for going from a stream or sink of buffers
//! ([`Bytes`]) to a stream or sink of values by performing Bincode encoding or
//! decoding. It is expected that each yielded buffer contains a single
//! serialized Bincode value. The specific strategy by which the byte stream is
//! framed into such buffers is left up to the user. One option is to use
//! [`length_delimited`] from [tokio-io].
10//!
11//! [`Bytes`]: https://docs.rs/bytes/0.4/bytes/struct.Bytes.html
12//! [`length_delimited`]: http://alexcrichton.com/tokio-io/tokio_io/codec/length_delimited/index.html
13//! [tokio-io]: http://github.com/alexcrichton/tokio-io
14//! [examples]: https://github.com/carllerche/tokio-serde-json/tree/master/examples
15
16extern crate bincode;
17extern crate bytes;
18extern crate futures;
19extern crate serde;
20extern crate tokio_serde;
21
22use bincode::Error;
23use bytes::{Bytes, BytesMut};
24use futures::{Poll, Sink, StartSend, Stream};
25use serde::{Deserialize, Serialize};
26use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer};
27
28use std::marker::PhantomData;
29
30/// Adapts a stream of Bincode encoded buffers to a stream of values by
31/// deserializing them.
32///
33/// `ReadBincode` implements `Stream` by polling the inner buffer stream and
34/// deserializing the buffer as Bincode. It expects that each yielded buffer
35/// represents a single Bincode value and does not contain any extra trailing
36/// bytes.
37pub struct ReadBincode<T, U> {
38    inner: FramedRead<T, U, Bincode<U>>,
39}
40
41/// Adapts a buffer sink to a value sink by serializing the values as Bincode.
42///
43/// `WriteBincode` implements `Sink` by serializing the submitted values to a
44/// buffer. The buffer is then sent to the inner stream, which is responsible
45/// for handling framing on the wire.
46pub struct WriteBincode<T: Sink, U> {
47    inner: FramedWrite<T, U, Bincode<U>>,
48}
49
/// Codec marker handed to `FramedRead` / `FramedWrite`.
///
/// It carries no data of its own; the `PhantomData` field only ties the codec
/// to the value type `T` that it serializes or deserializes.
struct Bincode<T> {
    ghost: PhantomData<T>,
}
53
54impl<T, U> ReadBincode<T, U>
55where
56    T: Stream,
57    T::Error: From<Error>,
58    U: for<'de> Deserialize<'de>,
59    BytesMut: From<T::Item>,
60{
61    /// Creates a new `ReadBincode` with the given buffer stream.
62    pub fn new(inner: T) -> ReadBincode<T, U> {
63        let json = Bincode { ghost: PhantomData };
64        ReadBincode {
65            inner: FramedRead::new(inner, json),
66        }
67    }
68}
69
70impl<T, U> ReadBincode<T, U> {
71    /// Returns a reference to the underlying stream wrapped by `ReadBincode`.
72    ///
73    /// Note that care should be taken to not tamper with the underlying stream
74    /// of data coming in as it may corrupt the stream of frames otherwise
75    /// being worked with.
76    pub fn get_ref(&self) -> &T {
77        self.inner.get_ref()
78    }
79
80    /// Returns a mutable reference to the underlying stream wrapped by
81    /// `ReadBincode`.
82    ///
83    /// Note that care should be taken to not tamper with the underlying stream
84    /// of data coming in as it may corrupt the stream of frames otherwise
85    /// being worked with.
86    pub fn get_mut(&mut self) -> &mut T {
87        self.inner.get_mut()
88    }
89
90    /// Consumes the `ReadBincode`, returning its underlying stream.
91    ///
92    /// Note that care should be taken to not tamper with the underlying stream
93    /// of data coming in as it may corrupt the stream of frames otherwise being
94    /// worked with.
95    pub fn into_inner(self) -> T {
96        self.inner.into_inner()
97    }
98}
99
100impl<T, U> Stream for ReadBincode<T, U>
101where
102    T: Stream,
103    T::Error: From<Error>,
104    U: for<'de> Deserialize<'de>,
105    BytesMut: From<T::Item>,
106{
107    type Item = U;
108    type Error = <T as Stream>::Error;
109
110    fn poll(&mut self) -> Poll<Option<U>, Self::Error> {
111        self.inner.poll()
112    }
113}
114
115impl<T, U> Sink for ReadBincode<T, U>
116where
117    T: Sink,
118{
119    type SinkItem = T::SinkItem;
120    type SinkError = T::SinkError;
121
122    fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
123        self.get_mut().start_send(item)
124    }
125
126    fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
127        self.get_mut().poll_complete()
128    }
129
130    fn close(&mut self) -> Poll<(), T::SinkError> {
131        self.get_mut().close()
132    }
133}
134
135impl<T, U> WriteBincode<T, U>
136where
137    T: Sink<SinkItem = Bytes>,
138    T::SinkError: From<Error>,
139    U: Serialize,
140{
141    /// Creates a new `WriteBincode` with the given buffer sink.
142    pub fn new(inner: T) -> WriteBincode<T, U> {
143        let json = Bincode { ghost: PhantomData };
144        WriteBincode {
145            inner: FramedWrite::new(inner, json),
146        }
147    }
148}
149
150impl<T: Sink, U> WriteBincode<T, U> {
151    /// Returns a reference to the underlying sink wrapped by `WriteBincode`.
152    ///
153    /// Note that care should be taken to not tamper with the underlying sink as
154    /// it may corrupt the sequence of frames otherwise being worked with.
155    pub fn get_ref(&self) -> &T {
156        self.inner.get_ref()
157    }
158
159    /// Returns a mutable reference to the underlying sink wrapped by
160    /// `WriteBincode`.
161    ///
162    /// Note that care should be taken to not tamper with the underlying sink as
163    /// it may corrupt the sequence of frames otherwise being worked with.
164    pub fn get_mut(&mut self) -> &mut T {
165        self.inner.get_mut()
166    }
167
168    /// Consumes the `WriteBincode`, returning its underlying sink.
169    ///
170    /// Note that care should be taken to not tamper with the underlying sink as
171    /// it may corrupt the sequence of frames otherwise being worked with.
172    pub fn into_inner(self) -> T {
173        self.inner.into_inner()
174    }
175}
176
177impl<T, U> Sink for WriteBincode<T, U>
178where
179    T: Sink<SinkItem = Bytes>,
180    T::SinkError: From<Error>,
181    U: Serialize,
182{
183    type SinkItem = U;
184    type SinkError = <T as Sink>::SinkError;
185
186    fn start_send(&mut self, item: U) -> StartSend<U, Self::SinkError> {
187        self.inner.start_send(item)
188    }
189
190    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
191        self.inner.poll_complete()
192    }
193
194    fn close(&mut self) -> Poll<(), Self::SinkError> {
195        self.inner.poll_complete()
196    }
197}
198
199impl<T, U> Stream for WriteBincode<T, U>
200where
201    T: Stream + Sink,
202{
203    type Item = T::Item;
204    type Error = T::Error;
205
206    fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
207        self.get_mut().poll()
208    }
209}
210
211impl<T> Deserializer<T> for Bincode<T>
212where
213    T: for<'de> Deserialize<'de>,
214{
215    type Error = Error;
216
217    fn deserialize(&mut self, src: &BytesMut) -> Result<T, Error> {
218        bincode::deserialize(src)
219    }
220}
221
222impl<T: Serialize> Serializer<T> for Bincode<T> {
223    type Error = Error;
224
225    fn serialize(&mut self, item: &T) -> Result<Bytes, Self::Error> {
226        bincode::serialize(item).map(Into::into)
227    }
228}