tokio_serde_json_mirror/lib.rs
1//! `Stream` and `Sink` adaptors for serializing and deserializing values using
2//! JSON.
3//!
4//! This crate provides adaptors for going from a stream or sink of buffers
5//! ([`Bytes`]) to a stream or sink of values by performing JSON encoding or
6//! decoding. It is expected that each yielded buffer contains a single
7//! serialized JSON value. The specific strategy by which this is done is left
8//! up to the user. One option is to use using [`length_delimited`] from
9//! [tokio-io].
10//!
11//! # Examples
12//!
13//! ```ignore
14//! use futures::{Future, Sink};
15//!
16//! use tokio_core::reactor::Core;
17//! use tokio_core::net::TcpStream;
18//!
19//! // Use length delimited frames
20//! use tokio_io::codec::length_delimited;
21//! use tokio_serde_json::WriteJson;
22//!
23//! // Bind a server socket
24//! let socket = TcpStream::connect(
25//! &"127.0.0.1:17653".parse().unwrap(),
26//! &handle);
27//!
28//! socket.and_then(|socket| {
29//! // Delimit frames using a length header
30//! let length_delimited = length_delimited::FramedWrite::new(socket);
31//!
32//! // Serialize frames with JSON
33//! let serialized = WriteJson::new(length_delimited);
34//!
35//! // Send the value
36//! serialized.send(json!({
37//! "name": "John Doe",
38//! "age": 43,
39//! "phones": [
40//! "+44 1234567",
41//! "+44 2345678"
42//! ]
43//! }))
44//! })
45//! ```
46//!
47//! For a full working server and client example, see the [examples] directory.
48//!
49//! [`Bytes`]: https://docs.rs/bytes/0.4/bytes/struct.Bytes.html
50//! [`length_delimited`]: https://docs.rs/tokio-io/0.1/tokio_io/codec/length_delimited/index.html
51//! [tokio-io]: https://github.com/tokio-rs/tokio-io
52//! [examples]: https://github.com/carllerche/tokio-serde-json/tree/master/examples
53
54extern crate futures;
55extern crate bytes;
56extern crate serde;
57extern crate serde_json;
58extern crate tokio_serde;
59
60use futures::{Stream, Poll, Sink, StartSend};
61use bytes::{Bytes, BytesMut, Buf, IntoBuf};
62use serde::{Serialize, Deserialize};
63use tokio_serde::{Serializer, Deserializer, FramedRead, FramedWrite};
64
65use std::marker::PhantomData;
66
67/// Adapts a stream of JSON encoded buffers to a stream of values by
68/// deserializing them.
69///
70/// `ReadJson` implements `Sink` by polling the inner buffer stream and
71/// deserializing the buffer as JSON. It expects that each yielded buffer
72/// represents a single JSON value and does not contain any extra trailing
73/// bytes.
74pub struct ReadJson<T, U> {
75 inner: FramedRead<T, U, Json<U>>,
76}
77
78/// Adapts a buffer sink to a value sink by serializing the values as JSON.
79///
80/// `WriteJson` implements `Sink` by serializing the submitted values to a
81/// buffer. The buffer is then sent to the inner stream, which is responsible
82/// for handling framing on the wire.
83pub struct WriteJson<T: Sink, U> {
84 inner: FramedWrite<T, U, Json<U>>,
85}
86
87struct Json<T> {
88 ghost: PhantomData<T>,
89}
90
91impl<T, U> ReadJson<T, U>
92 where T: Stream,
93 T::Error: From<serde_json::Error>,
94 for<'a> U: Deserialize<'a>,
95 Bytes: From<T::Item>,
96{
97 /// Creates a new `ReadJson` with the given buffer stream.
98 pub fn new(inner: T) -> ReadJson<T, U> {
99 let json = Json { ghost: PhantomData };
100 ReadJson { inner: FramedRead::new(inner, json) }
101 }
102}
103
104impl<T, U> ReadJson<T, U> {
105 /// Returns a reference to the underlying stream wrapped by `ReadJson`.
106 ///
107 /// Note that care should be taken to not tamper with the underlying stream
108 /// of data coming in as it may corrupt the stream of frames otherwise
109 /// being worked with.
110 pub fn get_ref(&self) -> &T {
111 self.inner.get_ref()
112 }
113
114 /// Returns a mutable reference to the underlying stream wrapped by
115 /// `ReadJson`.
116 ///
117 /// Note that care should be taken to not tamper with the underlying stream
118 /// of data coming in as it may corrupt the stream of frames otherwise
119 /// being worked with.
120 pub fn get_mut(&mut self) -> &mut T {
121 self.inner.get_mut()
122 }
123
124 /// Consumes the `ReadJson`, returning its underlying stream.
125 ///
126 /// Note that care should be taken to not tamper with the underlying stream
127 /// of data coming in as it may corrupt the stream of frames otherwise being
128 /// worked with.
129 pub fn into_inner(self) -> T {
130 self.inner.into_inner()
131 }
132}
133
134impl<T, U> Stream for ReadJson<T, U>
135 where T: Stream,
136 T::Error: From<serde_json::Error>,
137 for<'a> U: Deserialize<'a>,
138 Bytes: From<T::Item>,
139{
140 type Item = U;
141 type Error = T::Error;
142
143 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
144 self.inner.poll()
145 }
146}
147
148impl<T, U> Sink for ReadJson<T, U>
149 where T: Sink,
150{
151 type SinkItem = T::SinkItem;
152 type SinkError = T::SinkError;
153
154 fn start_send(&mut self, item: T::SinkItem)
155 -> StartSend<T::SinkItem, T::SinkError> {
156 self.get_mut().start_send(item)
157 }
158
159 fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
160 self.get_mut().poll_complete()
161 }
162
163 fn close(&mut self) -> Poll<(), T::SinkError> {
164 self.get_mut().close()
165 }
166}
167
168impl<T, U> WriteJson<T, U>
169 where T: Sink<SinkItem = BytesMut>,
170 T::SinkError: From<serde_json::Error>,
171 U: Serialize,
172{
173 /// Creates a new `WriteJson` with the given buffer sink.
174 pub fn new(inner: T) -> WriteJson<T, U> {
175 let json = Json { ghost: PhantomData };
176 WriteJson { inner: FramedWrite::new(inner, json) }
177 }
178}
179
180impl<T: Sink, U> WriteJson<T, U> {
181 /// Returns a reference to the underlying sink wrapped by `WriteJson`.
182 ///
183 /// Note that care should be taken to not tamper with the underlying sink as
184 /// it may corrupt the sequence of frames otherwise being worked with.
185 pub fn get_ref(&self) -> &T {
186 self.inner.get_ref()
187 }
188
189 /// Returns a mutable reference to the underlying sink wrapped by
190 /// `WriteJson`.
191 ///
192 /// Note that care should be taken to not tamper with the underlying sink as
193 /// it may corrupt the sequence of frames otherwise being worked with.
194 pub fn get_mut(&mut self) -> &mut T {
195 self.inner.get_mut()
196 }
197
198 /// Consumes the `WriteJson`, returning its underlying sink.
199 ///
200 /// Note that care should be taken to not tamper with the underlying sink as
201 /// it may corrupt the sequence of frames otherwise being worked with.
202 pub fn into_inner(self) -> T {
203 self.inner.into_inner()
204 }
205}
206
207impl<T, U> Sink for WriteJson<T, U>
208 where T: Sink<SinkItem = BytesMut>,
209 T::SinkError: From<serde_json::Error>,
210 U: Serialize,
211{
212 type SinkItem = U;
213 type SinkError = T::SinkError;
214
215 fn start_send(&mut self, item: U) -> StartSend<U, T::SinkError> {
216 self.inner.start_send(item)
217 }
218
219 fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
220 self.inner.poll_complete()
221 }
222
223 fn close(&mut self) -> Poll<(), T::SinkError> {
224 self.inner.close()
225 }
226}
227
228impl<T, U> Stream for WriteJson<T, U>
229 where T: Stream + Sink,
230{
231 type Item = T::Item;
232 type Error = T::Error;
233
234 fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
235 self.get_mut().poll()
236 }
237}
238
239impl<T> Deserializer<T> for Json<T>
240 where for <'a> T: Deserialize<'a>,
241{
242 type Error = serde_json::Error;
243
244 fn deserialize(&mut self, src: &Bytes) -> Result<T, Self::Error> {
245 serde_json::from_reader(src.into_buf().reader())
246 }
247}
248
249impl<T: Serialize> Serializer<T> for Json<T> {
250 type Error = serde_json::Error;
251
252 fn serialize(&mut self, item: &T) -> Result<BytesMut, Self::Error> {
253 serde_json::to_vec(item).map(Into::into)
254 }
255}