1use futures::{Async, Poll};
2use hyper::{self, Body, Request, Response, StatusCode, Uri};
3use hyper::client::HttpConnector;
4use hyper::rt::{Future, Stream};
5use request;
6use serde::Deserialize;
7use serde_json;
8use std::error::Error as StdError;
9use std::fmt;
10
11#[derive(Clone, Debug)]
13pub struct Client {
14 uri: Uri,
15 client: hyper::Client<HttpConnector, Body>,
16}
17
18#[derive(Debug)]
20pub enum Error {
21 Hyper(hyper::Error),
22 SerdeJson(serde_json::Error),
23 Server(String),
24}
25
/// Raw bytes identifying an entry.
pub type Key = Vec<u8>;
/// Raw bytes stored under a key.
pub type Value = Vec<u8>;
/// A key together with its value.
pub type Entry = (Key, Value);
29
30#[derive(Debug)]
35pub struct BodyToJsonChunks {
36 body: Body,
37 buffer: Vec<u8>,
38}
39
40impl Client {
41 pub fn new(uri: Uri) -> Self {
47 let client = hyper::Client::builder().build_http();
48 Client { uri, client }
49 }
50
51 pub fn get(&self, key: Key) -> impl Future<Item = Option<Value>, Error = Error> {
55 let request = request::get(self.uri.clone(), key);
56 request_concat_and_deserialize(self, request)
57 }
58
59 pub fn del(&self, key: Key) -> impl Future<Item = Option<Value>, Error = Error> {
64 let request = request::del(self.uri.clone(), key);
65 request_concat_and_deserialize(self, request)
66 }
67
68 pub fn set(&self, key: Key, value: Value) -> impl Future<Item = (), Error = Error> {
72 let request = request::set(self.uri.clone(), key, value);
73 request_concat_and_deserialize(self, request)
74 }
75
76 pub fn cas(
86 &self,
87 key: Key,
88 old: Option<Value>,
89 new: Option<Value>,
90 ) -> impl Future<Item = Result<(), Option<Value>>, Error = Error> {
91 let request = request::cas(self.uri.clone(), key, old, new);
92 request_concat_and_deserialize(self, request)
93 }
94
95 pub fn merge(&self, key: Key, value: Value) -> impl Future<Item = (), Error = Error> {
99 let request = request::merge(self.uri.clone(), key, value);
100 request_concat_and_deserialize(self, request)
101 }
102
103 pub fn flush(&self) -> impl Future<Item = (), Error = Error> {
107 let request = request::flush(self.uri.clone());
108 request_concat_and_deserialize(self, request)
109 }
110
111 pub fn iter(&self) -> impl Stream<Item = Entry, Error = Error> {
115 let request = request::iter(self.uri.clone());
116 request_stream_and_deserialize(self, request)
117 }
118
119 pub fn scan(&self, key: Key) -> impl Stream<Item = Entry, Error = Error> {
123 let request = request::scan(self.uri.clone(), key);
124 request_stream_and_deserialize(self, request)
125 }
126
127 pub fn scan_range(&self, start: Key, end: Key) -> impl Stream<Item = Entry, Error = Error> {
131 let request = request::scan_range(self.uri.clone(), start, end);
132 request_stream_and_deserialize(self, request)
133 }
134
135 pub fn max(&self) -> impl Future<Item = Option<Entry>, Error = Error> {
141 let request = request::max(self.uri.clone());
142 request_concat_and_deserialize(self, request)
143 }
144
145 pub fn pred(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
150 let request = request::pred(self.uri.clone(), key);
151 request_concat_and_deserialize(self, request)
152 }
153
154 pub fn pred_incl(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
159 let request = request::pred_incl(self.uri.clone(), key);
160 request_concat_and_deserialize(self, request)
161 }
162
163 pub fn succ(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
168 let request = request::succ(self.uri.clone(), key);
169 request_concat_and_deserialize(self, request)
170 }
171
172 pub fn succ_incl(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
177 let request = request::succ_incl(self.uri.clone(), key);
178 request_concat_and_deserialize(self, request)
179 }
180}
181
182impl Stream for BodyToJsonChunks {
183 type Item = serde_json::Value;
184 type Error = Error;
185 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
186 loop {
187 match self.body.poll() {
188 Err(err) => return Err(err.into()),
189 Ok(Async::NotReady) => return Ok(Async::NotReady),
190 Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
191 Ok(Async::Ready(Some(chunk))) => self.buffer.extend(chunk),
192 }
193 let v = match serde_json::from_slice::<serde_json::Value>(&self.buffer) {
194 Err(_err) => continue,
195 Ok(v) => v,
196 };
197 self.buffer.clear();
198 return Ok(Async::Ready(Some(v)));
199 }
200 }
201}
202
203impl StdError for Error {
204 fn description(&self) -> &str {
205 match *self {
206 Error::Hyper(ref err) => err.description(),
207 Error::SerdeJson(ref err) => err.description(),
208 Error::Server(ref s) => s,
209 }
210 }
211 fn cause(&self) -> Option<&StdError> {
212 match *self {
213 Error::Hyper(ref err) => Some(err),
214 Error::SerdeJson(ref err) => Some(err),
215 Error::Server(_) => None,
216 }
217 }
218}
219
220impl fmt::Display for Error {
221 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
222 write!(f, "{}", self.description())
223 }
224}
225
226impl From<hyper::Error> for Error {
227 fn from(e: hyper::Error) -> Self {
228 Error::Hyper(e)
229 }
230}
231
232impl From<serde_json::Error> for Error {
233 fn from(e: serde_json::Error) -> Self {
234 Error::SerdeJson(e)
235 }
236}
237
238impl From<Body> for BodyToJsonChunks {
239 fn from(body: Body) -> Self {
240 let buffer = vec![];
241 BodyToJsonChunks { body, buffer }
242 }
243}
244
245fn concat_and_deserialize<T>(response: Response<Body>) -> impl Future<Item = T, Error = Error>
247where
248 T: for<'de> Deserialize<'de>,
249{
250 let status = response.status();
251 BodyToJsonChunks::from(response.into_body())
252 .and_then(move |value| {
253 if status == StatusCode::INTERNAL_SERVER_ERROR {
254 let s = serde_json::from_value(value).map_err(Error::SerdeJson)?;
255 return Err(Error::Server(s));
256 }
257 serde_json::from_value::<T>(value).map_err(Error::SerdeJson)
258 })
259 .into_future()
260 .map_err(|(err, _)| err)
261 .and_then(|(opt, _stream)| opt.ok_or_else(|| unreachable!()))
262}
263
264fn stream_and_deserialize<T>(response: Response<Body>) -> impl Stream<Item = T, Error = Error>
266where
267 T: for<'de> Deserialize<'de>,
268{
269 BodyToJsonChunks::from(response.into_body())
270 .and_then(|json| serde_json::from_value(json).map_err(Error::SerdeJson))
271}
272
273fn request_concat_and_deserialize<T>(
275 client: &Client,
276 request: Request<Body>,
277) -> impl Future<Item = T, Error = Error>
278where
279 T: for<'de> Deserialize<'de>,
280{
281 client
282 .client
283 .request(request)
284 .map_err(Error::Hyper)
285 .and_then(concat_and_deserialize)
286}
287
288fn request_stream_and_deserialize<T>(
291 client: &Client,
292 request: Request<Body>,
293) -> impl Stream<Item = T, Error = Error>
294where
295 T: for<'de> Deserialize<'de>,
296{
297 client
298 .client
299 .request(request)
300 .map_err(Error::Hyper)
301 .map(stream_and_deserialize)
302 .flatten_stream()
303}