use futures::{Async, Poll};
use hyper::{self, Body, Request, Response, StatusCode, Uri};
use hyper::client::HttpConnector;
use hyper::rt::{Future, Stream};
use request;
use serde::Deserialize;
use serde_json;
use std::error::Error as StdError;
use std::fmt;
/// Handle for talking to the server: a target `Uri` plus the `hyper` client
/// used to send requests to it.
#[derive(Clone, Debug)]
pub struct Client {
/// URI cloned into every `request::*` builder call.
uri: Uri,
/// Underlying `hyper` client using a plain `HttpConnector`.
client: hyper::Client<HttpConnector, Body>,
}
/// Errors produced while sending a request or decoding its response.
#[derive(Debug)]
pub enum Error {
/// Failure reported by `hyper` while sending the request or reading the body.
Hyper(hyper::Error),
/// Failure reported by `serde_json` while decoding the response body.
SerdeJson(serde_json::Error),
/// Message decoded from the body of a `500 Internal Server Error` response.
Server(String),
}
/// Key as raw bytes.
pub type Key = Vec<u8>;
/// Value as raw bytes.
pub type Value = Vec<u8>;
/// Pair of byte vectors; presumably `(key, value)` — confirm against the server.
pub type Entry = (Vec<u8>, Vec<u8>);
/// Stream adapter that accumulates the bytes of a response `Body` and yields
/// them as parsed `serde_json::Value`s.
#[derive(Debug)]
pub struct BodyToJsonChunks {
/// Response body being read chunk by chunk.
body: Body,
/// Bytes received so far that have not yet been turned into a JSON value.
buffer: Vec<u8>,
}
impl Client {
    /// Creates a client that targets `uri`, backed by a newly built plain-HTTP
    /// `hyper` client.
    pub fn new(uri: Uri) -> Self {
        Client {
            uri,
            client: hyper::Client::builder().build_http(),
        }
    }

    /// Sends the request built by `request::get` for `key` and resolves to the
    /// deserialized response (`None` presumably meaning "no such key").
    pub fn get(&self, key: Key) -> impl Future<Item = Option<Value>, Error = Error> {
        request_concat_and_deserialize(self, request::get(self.uri.clone(), key))
    }

    /// Sends the request built by `request::del` for `key` and resolves to the
    /// deserialized response (presumably the removed value, if any).
    pub fn del(&self, key: Key) -> impl Future<Item = Option<Value>, Error = Error> {
        request_concat_and_deserialize(self, request::del(self.uri.clone(), key))
    }

    /// Sends the request built by `request::set` for `key` and `value`.
    pub fn set(&self, key: Key, value: Value) -> impl Future<Item = (), Error = Error> {
        request_concat_and_deserialize(self, request::set(self.uri.clone(), key, value))
    }

    /// Sends the request built by `request::cas` for `key`, `old` and `new`.
    ///
    /// The future's item is the deserialized `Result`; presumably `Err` carries
    /// the value currently stored when the swap is refused — confirm against
    /// the server.
    pub fn cas(
        &self,
        key: Key,
        old: Option<Value>,
        new: Option<Value>,
    ) -> impl Future<Item = Result<(), Option<Value>>, Error = Error> {
        request_concat_and_deserialize(self, request::cas(self.uri.clone(), key, old, new))
    }

    /// Sends the request built by `request::merge` for `key` and `value`.
    pub fn merge(&self, key: Key, value: Value) -> impl Future<Item = (), Error = Error> {
        request_concat_and_deserialize(self, request::merge(self.uri.clone(), key, value))
    }

    /// Sends the request built by `request::flush`.
    pub fn flush(&self) -> impl Future<Item = (), Error = Error> {
        request_concat_and_deserialize(self, request::flush(self.uri.clone()))
    }

    /// Sends the request built by `request::iter` and streams every entry
    /// deserialized from the response body.
    pub fn iter(&self) -> impl Stream<Item = Entry, Error = Error> {
        request_stream_and_deserialize(self, request::iter(self.uri.clone()))
    }

    /// Sends the request built by `request::scan` for `key` and streams every
    /// entry deserialized from the response body.
    pub fn scan(&self, key: Key) -> impl Stream<Item = Entry, Error = Error> {
        request_stream_and_deserialize(self, request::scan(self.uri.clone(), key))
    }

    /// Sends the request built by `request::scan_range` for `start` and `end`
    /// and streams every entry deserialized from the response body.
    pub fn scan_range(&self, start: Key, end: Key) -> impl Stream<Item = Entry, Error = Error> {
        request_stream_and_deserialize(self, request::scan_range(self.uri.clone(), start, end))
    }

    /// Sends the request built by `request::max` and resolves to the
    /// deserialized optional entry.
    pub fn max(&self) -> impl Future<Item = Option<Entry>, Error = Error> {
        request_concat_and_deserialize(self, request::max(self.uri.clone()))
    }

    /// Sends the request built by `request::pred` for `key` and resolves to
    /// the deserialized optional entry.
    pub fn pred(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
        request_concat_and_deserialize(self, request::pred(self.uri.clone(), key))
    }

    /// Sends the request built by `request::pred_incl` for `key` and resolves
    /// to the deserialized optional entry.
    pub fn pred_incl(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
        request_concat_and_deserialize(self, request::pred_incl(self.uri.clone(), key))
    }

    /// Sends the request built by `request::succ` for `key` and resolves to
    /// the deserialized optional entry.
    pub fn succ(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
        request_concat_and_deserialize(self, request::succ(self.uri.clone(), key))
    }

    /// Sends the request built by `request::succ_incl` for `key` and resolves
    /// to the deserialized optional entry.
    pub fn succ_incl(&self, key: Key) -> impl Future<Item = Option<Entry>, Error = Error> {
        request_concat_and_deserialize(self, request::succ_incl(self.uri.clone(), key))
    }
}
impl Stream for BodyToJsonChunks {
type Item = serde_json::Value;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match self.body.poll() {
Err(err) => return Err(err.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::Ready(Some(chunk))) => self.buffer.extend(chunk),
}
let v = match serde_json::from_slice::<serde_json::Value>(&self.buffer) {
Err(_err) => continue,
Ok(v) => v,
};
self.buffer.clear();
return Ok(Async::Ready(Some(v)));
}
}
}
impl StdError for Error {
fn description(&self) -> &str {
match *self {
Error::Hyper(ref err) => err.description(),
Error::SerdeJson(ref err) => err.description(),
Error::Server(ref s) => s,
}
}
fn cause(&self) -> Option<&StdError> {
match *self {
Error::Hyper(ref err) => Some(err),
Error::SerdeJson(ref err) => Some(err),
Error::Server(_) => None,
}
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.description())
}
}
impl From<hyper::Error> for Error {
fn from(e: hyper::Error) -> Self {
Error::Hyper(e)
}
}
impl From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Self {
Error::SerdeJson(e)
}
}
/// Starts decoding `body` with an empty byte buffer.
impl From<Body> for BodyToJsonChunks {
    fn from(body: Body) -> Self {
        BodyToJsonChunks {
            body,
            buffer: Vec::new(),
        }
    }
}
fn concat_and_deserialize<T>(response: Response<Body>) -> impl Future<Item = T, Error = Error>
where
T: for<'de> Deserialize<'de>,
{
let status = response.status();
BodyToJsonChunks::from(response.into_body())
.and_then(move |value| {
if status == StatusCode::INTERNAL_SERVER_ERROR {
let s = serde_json::from_value(value).map_err(Error::SerdeJson)?;
return Err(Error::Server(s));
}
serde_json::from_value::<T>(value).map_err(Error::SerdeJson)
})
.into_future()
.map_err(|(err, _)| err)
.and_then(|(opt, _stream)| opt.ok_or_else(|| unreachable!()))
}
fn stream_and_deserialize<T>(response: Response<Body>) -> impl Stream<Item = T, Error = Error>
where
T: for<'de> Deserialize<'de>,
{
BodyToJsonChunks::from(response.into_body())
.and_then(|json| serde_json::from_value(json).map_err(Error::SerdeJson))
}
fn request_concat_and_deserialize<T>(
client: &Client,
request: Request<Body>,
) -> impl Future<Item = T, Error = Error>
where
T: for<'de> Deserialize<'de>,
{
client
.client
.request(request)
.map_err(Error::Hyper)
.and_then(concat_and_deserialize)
}
fn request_stream_and_deserialize<T>(
client: &Client,
request: Request<Body>,
) -> impl Stream<Item = T, Error = Error>
where
T: for<'de> Deserialize<'de>,
{
client
.client
.request(request)
.map_err(Error::Hyper)
.map(stream_and_deserialize)
.flatten_stream()
}