use std::mem;
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde_json::Value;
use reqwest::unstable::async::{Body, Client, ClientBuilder, Decoder, RequestBuilder, Response};
use futures::{Future, Poll, Stream};
use tokio_core::reactor::Handle;
use private;
use super::req::HttpRequest;
use super::res::parsing::{IsOk, Parse};
use super::{build_method, build_url, Error, RequestParams};
/// Build a `Client` with default settings, paired with default `RequestParams`.
///
/// The client is created by `ClientBuilder::new().build(handle)`. Any build
/// failure is converted into this module's `Error`.
pub fn default(handle: &Handle) -> Result<(Client, RequestParams), Error> {
    match ClientBuilder::new().build(handle) {
        Ok(client) => Ok((client, RequestParams::default())),
        Err(err) => Err(err.into()),
    }
}
/// A newtype around the async `Body`, used as the conversion target for
/// request payloads.
pub struct AsyncBody(Body);

impl AsyncBody {
    /// Unwrap this value, returning the `Body` it holds.
    pub fn into_inner(self) -> Body {
        let AsyncBody(inner) = self;
        inner
    }
}
impl From<Body> for AsyncBody {
    /// Wrap an existing `Body` as-is.
    fn from(body: Body) -> Self {
        AsyncBody(body)
    }
}
impl From<Vec<u8>> for AsyncBody {
fn from(body: Vec<u8>) -> AsyncBody {
AsyncBody(body.into())
}
}
impl From<String> for AsyncBody {
fn from(body: String) -> AsyncBody {
AsyncBody(body.into())
}
}
impl From<Value> for AsyncBody {
fn from(body: Value) -> AsyncBody {
AsyncBody(body.to_string().into())
}
}
impl From<&'static [u8]> for AsyncBody {
fn from(body: &'static [u8]) -> AsyncBody {
AsyncBody(Bytes::from(body).into())
}
}
impl From<&'static str> for AsyncBody {
fn from(body: &'static str) -> AsyncBody {
AsyncBody(Bytes::from(body).into())
}
}
/// A client that can send a request asynchronously.
///
/// Implementors must also implement the `private::Sealed` supertrait.
pub trait AsyncElasticClient: private::Sealed {
/// Send `req` using the given `params`.
///
/// `req` is anything convertible into an `HttpRequest` whose body type is
/// convertible into an `AsyncBody`. The returned `Pending` is a future that
/// resolves to the `Response` or an `Error`.
fn elastic_req<I, B>(&self, params: &RequestParams, req: I) -> Pending
where
I: Into<HttpRequest<'static, B>>,
B: Into<AsyncBody>;
}
pub struct Pending {
inner: Box<Future<Item = Response, Error = Error>>,
}
impl Pending {
fn new<F>(fut: F) -> Self
where
F: Future<Item = Response, Error = Error> + 'static,
{
Pending {
inner: Box::new(fut),
}
}
}
impl Future for Pending {
type Item = Response;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
/// Build a `RequestBuilder` for `req` without sending it.
///
/// The request is converted into an `HttpRequest`. Its URL is combined with
/// `params` by `build_url`, and its method is mapped by `build_method`. The
/// headers from `params.get_headers()` are applied, and the body (if any) is
/// converted through `AsyncBody` into the client's `Body` type.
pub fn build_req<I, B>(client: &Client, params: &RequestParams, req: I) -> RequestBuilder
where
    I: Into<HttpRequest<'static, B>>,
    B: Into<AsyncBody>,
{
    let req = req.into();

    let url = build_url(&req.url, &params);
    let method = build_method(req.method);
    let body = req.body;

    let mut req = client.request(method, &url);

    req.headers(params.get_headers());
    // Requests without a payload leave the builder's body unset.
    if let Some(body) = body {
        req.body(body.into().into_inner());
    }

    req
}
impl AsyncElasticClient for Client {
    /// Build the request with `build_req`, send it, and wrap the resulting
    /// future in a `Pending`. Send errors are converted into `Error`.
    fn elastic_req<I, B>(&self, params: &RequestParams, req: I) -> Pending
    where
        I: Into<HttpRequest<'static, B>>,
        B: Into<AsyncBody>,
    {
        let mut builder = build_req(self, params, req);
        let response = builder.send().map_err(Into::into);
        Pending::new(response)
    }
}
// Satisfies the `private::Sealed` supertrait required by `AsyncElasticClient`.
impl private::Sealed for Client {}
/// Converts a raw `Response` into a future of a `TResponse`.
///
/// Implementors must also implement the `private::Sealed` supertrait.
pub trait AsyncFromResponse<TResponse>: private::Sealed {
/// Consume `self` and `response`, returning a `FromResponse` future that
/// resolves to a `TResponse` or an `Error`.
fn from_response(self, response: Response) -> FromResponse<TResponse>;
}
pub struct FromResponse<TResponse> {
inner: Box<Future<Item = TResponse, Error = Error>>,
}
impl<TResponse> FromResponse<TResponse> {
fn new<F>(fut: F) -> Self
where
F: Future<Item = TResponse, Error = Error> + 'static,
{
FromResponse {
inner: Box::new(fut),
}
}
}
impl<TResponse> Future for FromResponse<TResponse> {
type Item = TResponse;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
// Satisfies the `private::Sealed` supertrait required by `AsyncFromResponse`.
impl<TResponse> private::Sealed for Parse<TResponse> {}
impl<TResponse: IsOk + DeserializeOwned + 'static> AsyncFromResponse<TResponse> for Parse<TResponse> {
    /// Buffer the whole response body, then parse it with `from_slice`.
    ///
    /// The status code is captured up front as a `u16` and passed to
    /// `from_slice` with the buffered bytes. Errors from reading the body and
    /// from parsing are both converted into `Error`.
    fn from_response(self, mut response: Response) -> FromResponse<TResponse> {
        let status: u16 = response.status().into();

        // Move the body stream out of the response, leaving an empty decoder
        // in its place.
        let decoder = mem::replace(response.body_mut(), Decoder::empty());
        let buffered = decoder.concat2().map_err(Into::into);

        let parsed = buffered.and_then(move |chunk| {
            self.from_slice(status, chunk.as_ref()).map_err(Into::into)
        });

        FromResponse::new(parsed)
    }
}
#[cfg(test)]
mod tests {
use reqwest::Method;
use reqwest::unstable::async::{Client, RequestBuilder};
use reqwest::header::ContentType;
use tokio_core::reactor::Core;
use super::*;
use req::*;
fn params() -> RequestParams {
RequestParams::new("eshost:9200/path")
.url_param("pretty", true)
.url_param("q", "*")
}
fn expected_req(cli: &Client, method: Method, url: &str, body: Option<Vec<u8>>) -> RequestBuilder {
let mut req = cli.request(method, url);
{
req.header(ContentType::json());
if let Some(body) = body {
req.body(body);
}
}
req
}
fn assert_req(expected: RequestBuilder, actual: RequestBuilder) {
assert_eq!(format!("{:?}", expected), format!("{:?}", actual));
}
fn core() -> Core {
Core::new().unwrap()
}
#[test]
fn head_req() {
let cli = Client::new(&core().handle());
let req = build_req(&cli, ¶ms(), PingHeadRequest::new());
let url = "eshost:9200/path/?pretty=true&q=*";
let expected = expected_req(&cli, Method::Head, url, None);
assert_req(expected, req);
}
#[test]
fn get_req() {
let cli = Client::new(&core().handle());
let req = build_req(&cli, ¶ms(), SimpleSearchRequest::new());
let url = "eshost:9200/path/_search?pretty=true&q=*";
let expected = expected_req(&cli, Method::Get, url, None);
assert_req(expected, req);
}
#[test]
fn post_req() {
let cli = Client::new(&core().handle());
let req = build_req(
&cli,
¶ms(),
PercolateRequest::for_index_ty("idx", "ty", vec![]),
);
let url = "eshost:9200/path/idx/ty/_percolate?pretty=true&q=*";
let expected = expected_req(&cli, Method::Post, url, Some(vec![]));
assert_req(expected, req);
}
#[test]
fn put_req() {
let cli = Client::new(&core().handle());
let req = build_req(
&cli,
¶ms(),
IndicesCreateRequest::for_index("idx", vec![]),
);
let url = "eshost:9200/path/idx?pretty=true&q=*";
let expected = expected_req(&cli, Method::Put, url, Some(vec![]));
assert_req(expected, req);
}
#[test]
fn delete_req() {
let cli = Client::new(&core().handle());
let req = build_req(&cli, ¶ms(), IndicesDeleteRequest::for_index("idx"));
let url = "eshost:9200/path/idx?pretty=true&q=*";
let expected = expected_req(&cli, Method::Delete, url, None);
assert_req(expected, req);
}
#[test]
fn owned_string_into_body() {
AsyncBody::from(String::new());
}
#[test]
fn borrowed_string_into_body() {
AsyncBody::from("abc");
}
#[test]
fn owned_vec_into_body() {
AsyncBody::from(Vec::new());
}
#[test]
fn borrowed_vec_into_body() {
static BODY: &'static [u8] = &[0, 1, 2];
AsyncBody::from(BODY);
}
#[test]
fn empty_body_into_body() {
AsyncBody::from(empty_body());
}
#[test]
fn json_value_into_body() {
AsyncBody::from(json!({}));
}
}