tokio_serde_json_mirror/
lib.rs

1//! `Stream` and `Sink` adaptors for serializing and deserializing values using
2//! JSON.
3//!
4//! This crate provides adaptors for going from a stream or sink of buffers
5//! ([`Bytes`]) to a stream or sink of values by performing JSON encoding or
6//! decoding. It is expected that each yielded buffer contains a single
7//! serialized JSON value. The specific strategy by which this is done is left
8//! up to the user. One option is to use using [`length_delimited`] from
9//! [tokio-io].
10//!
11//! # Examples
12//!
13//! ```ignore
14//! use futures::{Future, Sink};
15//!
16//! use tokio_core::reactor::Core;
17//! use tokio_core::net::TcpStream;
18//!
19//! // Use length delimited frames
20//! use tokio_io::codec::length_delimited;
21//! use tokio_serde_json::WriteJson;
22//!
23//! // Bind a server socket
24//! let socket = TcpStream::connect(
25//!     &"127.0.0.1:17653".parse().unwrap(),
26//!     &handle);
27//!
28//! socket.and_then(|socket| {
29//!     // Delimit frames using a length header
30//!     let length_delimited = length_delimited::FramedWrite::new(socket);
31//!
32//!     // Serialize frames with JSON
33//!     let serialized = WriteJson::new(length_delimited);
34//!
35//!     // Send the value
36//!     serialized.send(json!({
37//!       "name": "John Doe",
38//!       "age": 43,
39//!       "phones": [
40//!         "+44 1234567",
41//!         "+44 2345678"
42//!       ]
43//!     }))
44//! })
45//! ```
46//!
47//! For a full working server and client example, see the [examples] directory.
48//!
49//! [`Bytes`]: https://docs.rs/bytes/0.4/bytes/struct.Bytes.html
50//! [`length_delimited`]: https://docs.rs/tokio-io/0.1/tokio_io/codec/length_delimited/index.html
51//! [tokio-io]: https://github.com/tokio-rs/tokio-io
52//! [examples]: https://github.com/carllerche/tokio-serde-json/tree/master/examples
53
54extern crate futures;
55extern crate bytes;
56extern crate serde;
57extern crate serde_json;
58extern crate tokio_serde;
59
60use futures::{Stream, Poll, Sink, StartSend};
61use bytes::{Bytes, BytesMut, Buf, IntoBuf};
62use serde::{Serialize, Deserialize};
63use tokio_serde::{Serializer, Deserializer, FramedRead, FramedWrite};
64
65use std::marker::PhantomData;
66
67/// Adapts a stream of JSON encoded buffers to a stream of values by
68/// deserializing them.
69///
70/// `ReadJson` implements `Sink` by polling the inner buffer stream and
71/// deserializing the buffer as JSON. It expects that each yielded buffer
72/// represents a single JSON value and does not contain any extra trailing
73/// bytes.
74pub struct ReadJson<T, U> {
75    inner: FramedRead<T, U, Json<U>>,
76}
77
78/// Adapts a buffer sink to a value sink by serializing the values as JSON.
79///
80/// `WriteJson` implements `Sink` by serializing the submitted values to a
81/// buffer. The buffer is then sent to the inner stream, which is responsible
82/// for handling framing on the wire.
83pub struct WriteJson<T: Sink, U> {
84    inner: FramedWrite<T, U, Json<U>>,
85}
86
87struct Json<T> {
88    ghost: PhantomData<T>,
89}
90
91impl<T, U> ReadJson<T, U>
92    where T: Stream,
93          T::Error: From<serde_json::Error>,
94          for<'a> U: Deserialize<'a>,
95          Bytes: From<T::Item>,
96{
97    /// Creates a new `ReadJson` with the given buffer stream.
98    pub fn new(inner: T) -> ReadJson<T, U> {
99        let json = Json { ghost: PhantomData };
100        ReadJson { inner: FramedRead::new(inner, json) }
101    }
102}
103
104impl<T, U> ReadJson<T, U> {
105    /// Returns a reference to the underlying stream wrapped by `ReadJson`.
106    ///
107    /// Note that care should be taken to not tamper with the underlying stream
108    /// of data coming in as it may corrupt the stream of frames otherwise
109    /// being worked with.
110    pub fn get_ref(&self) -> &T {
111        self.inner.get_ref()
112    }
113
114    /// Returns a mutable reference to the underlying stream wrapped by
115    /// `ReadJson`.
116    ///
117    /// Note that care should be taken to not tamper with the underlying stream
118    /// of data coming in as it may corrupt the stream of frames otherwise
119    /// being worked with.
120    pub fn get_mut(&mut self) -> &mut T {
121        self.inner.get_mut()
122    }
123
124    /// Consumes the `ReadJson`, returning its underlying stream.
125    ///
126    /// Note that care should be taken to not tamper with the underlying stream
127    /// of data coming in as it may corrupt the stream of frames otherwise being
128    /// worked with.
129    pub fn into_inner(self) -> T {
130        self.inner.into_inner()
131    }
132}
133
134impl<T, U> Stream for ReadJson<T, U>
135    where T: Stream,
136          T::Error: From<serde_json::Error>,
137          for<'a> U: Deserialize<'a>,
138          Bytes: From<T::Item>,
139{
140    type Item = U;
141    type Error = T::Error;
142
143    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
144        self.inner.poll()
145    }
146}
147
148impl<T, U> Sink for ReadJson<T, U>
149    where T: Sink,
150{
151    type SinkItem = T::SinkItem;
152    type SinkError = T::SinkError;
153
154    fn start_send(&mut self, item: T::SinkItem)
155                  -> StartSend<T::SinkItem, T::SinkError> {
156        self.get_mut().start_send(item)
157    }
158
159    fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
160        self.get_mut().poll_complete()
161    }
162
163    fn close(&mut self) -> Poll<(), T::SinkError> {
164        self.get_mut().close()
165    }
166}
167
168impl<T, U> WriteJson<T, U>
169    where T: Sink<SinkItem = BytesMut>,
170          T::SinkError: From<serde_json::Error>,
171          U: Serialize,
172{
173    /// Creates a new `WriteJson` with the given buffer sink.
174    pub fn new(inner: T) -> WriteJson<T, U> {
175        let json = Json { ghost: PhantomData };
176        WriteJson { inner: FramedWrite::new(inner, json) }
177    }
178}
179
180impl<T: Sink, U> WriteJson<T, U> {
181    /// Returns a reference to the underlying sink wrapped by `WriteJson`.
182    ///
183    /// Note that care should be taken to not tamper with the underlying sink as
184    /// it may corrupt the sequence of frames otherwise being worked with.
185    pub fn get_ref(&self) -> &T {
186        self.inner.get_ref()
187    }
188
189    /// Returns a mutable reference to the underlying sink wrapped by
190    /// `WriteJson`.
191    ///
192    /// Note that care should be taken to not tamper with the underlying sink as
193    /// it may corrupt the sequence of frames otherwise being worked with.
194    pub fn get_mut(&mut self) -> &mut T {
195        self.inner.get_mut()
196    }
197
198    /// Consumes the `WriteJson`, returning its underlying sink.
199    ///
200    /// Note that care should be taken to not tamper with the underlying sink as
201    /// it may corrupt the sequence of frames otherwise being worked with.
202    pub fn into_inner(self) -> T {
203        self.inner.into_inner()
204    }
205}
206
207impl<T, U> Sink for WriteJson<T, U>
208    where T: Sink<SinkItem = BytesMut>,
209          T::SinkError: From<serde_json::Error>,
210          U: Serialize,
211{
212    type SinkItem = U;
213    type SinkError = T::SinkError;
214
215    fn start_send(&mut self, item: U) -> StartSend<U, T::SinkError> {
216        self.inner.start_send(item)
217    }
218
219    fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
220        self.inner.poll_complete()
221    }
222
223    fn close(&mut self) -> Poll<(), T::SinkError> {
224        self.inner.close()
225    }
226}
227
228impl<T, U> Stream for WriteJson<T, U>
229    where T: Stream + Sink,
230{
231    type Item = T::Item;
232    type Error = T::Error;
233
234    fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
235        self.get_mut().poll()
236    }
237}
238
239impl<T> Deserializer<T> for Json<T>
240    where for <'a> T: Deserialize<'a>,
241{
242    type Error = serde_json::Error;
243
244    fn deserialize(&mut self, src: &Bytes) -> Result<T, Self::Error> {
245        serde_json::from_reader(src.into_buf().reader())
246    }
247}
248
249impl<T: Serialize> Serializer<T> for Json<T> {
250    type Error = serde_json::Error;
251
252    fn serialize(&mut self, item: &T) -> Result<BytesMut, Self::Error> {
253        serde_json::to_vec(item).map(Into::into)
254    }
255}