use std::mem;
use futures::{Future, Poll, Stream};
use futures_cpupool::CpuPool;
use serde::de::DeserializeOwned;
use reqwest::unstable::async::{Decoder, Response as RawResponse};
use error::{self, Error};
use http::AsyncChunk;
use super::parse::{parse, IsOk};
/// Builder for turning a raw asynchronous HTTP response into either a raw
/// chunk stream (`into_raw`) or a deserialized value (`into_response`).
pub struct AsyncResponseBuilder {
/// The underlying response this builder wraps.
inner: RawResponse,
/// Optional CPU pool; when set, `into_response` runs deserialization on it.
de_pool: Option<CpuPool>,
}
/// Wrap a raw asynchronous response in an `AsyncResponseBuilder`.
///
/// `de_pool` is stored on the builder as-is; when it is `Some`, the builder's
/// `into_response` runs deserialization on that pool.
pub(crate) fn async_response(res: RawResponse, de_pool: Option<CpuPool>) -> AsyncResponseBuilder {
    AsyncResponseBuilder { inner: res, de_pool }
}
impl AsyncResponseBuilder {
pub fn status(&self) -> u16 {
self.inner.status().into()
}
pub fn into_raw(self) -> AsyncHttpResponse {
AsyncHttpResponse(self.inner)
}
pub fn into_response<T>(mut self) -> IntoResponse<T>
where
T: IsOk + DeserializeOwned + Send + 'static,
{
let status = self.status();
let body = mem::replace(self.inner.body_mut(), Decoder::empty());
let de_fn = move |body: AsyncChunk| {
parse()
.from_slice(status, body.as_ref())
.map_err(move |e| error::response(status, e))
};
let body_future = body.concat2().map_err(move |e| error::response(status, e));
if let Some(de_pool) = self.de_pool {
IntoResponse::new(body_future.and_then(move |body| de_pool.spawn_fn(move || de_fn(body))))
} else {
IntoResponse::new(body_future.and_then(de_fn))
}
}
}
/// A boxed future that yields a value of type `T` or an `Error`.
///
/// Created by `AsyncResponseBuilder::into_response`.
pub struct IntoResponse<T> {
/// The type-erased future that `poll` delegates to.
inner: Box<Future<Item = T, Error = Error>>,
}
impl<T> IntoResponse<T> {
fn new<F>(fut: F) -> Self
where
F: Future<Item = T, Error = Error> + 'static,
{
IntoResponse {
inner: Box::new(fut),
}
}
}
impl<T> Future for IntoResponse<T>
where
T: IsOk + DeserializeOwned + Send + 'static,
{
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
/// A raw asynchronous HTTP response whose body is read as a `Stream` of
/// `AsyncChunk`s.
pub struct AsyncHttpResponse(RawResponse);
impl Stream for AsyncHttpResponse {
type Item = AsyncChunk;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.body_mut().poll().map_err(|e| {
let status = self.status();
error::response(status, e)
})
}
}
impl AsyncHttpResponse {
    /// Get the HTTP status code of the wrapped response as a `u16`.
    pub fn status(&self) -> u16 {
        Into::<u16>::into(self.0.status())
    }
}