use std::error::Error as StdError;
use uuid::Uuid;
use futures::{Future, Poll};
use futures_cpupool::CpuPool;
use tokio_core::reactor::Handle;
use elastic_reqwest::{AsyncBody, AsyncElasticClient};
use reqwest::Error as ReqwestError;
use reqwest::unstable::async::{Client as AsyncHttpClient, ClientBuilder as AsyncHttpClientBuilder};
use error::{self, Error};
use client::requests::HttpRequest;
use client::responses::{async_response, AsyncResponseBuilder};
use client::{private, Client, RequestParams, Sender};
pub type AsyncClient = Client<AsyncSender>;
#[derive(Clone)]
pub struct AsyncSender {
pub(in client) http: AsyncHttpClient,
pub(in client) serde_pool: Option<CpuPool>,
}
impl private::Sealed for AsyncSender {}
impl Sender for AsyncSender {
type Body = AsyncBody;
type Response = Pending;
fn send<TRequest, TBody>(&self, req: TRequest, params: &RequestParams) -> Self::Response
where
TRequest: Into<HttpRequest<'static, TBody>>,
TBody: Into<Self::Body>,
{
let serde_pool = self.serde_pool.clone();
let correlation_id = Uuid::new_v4();
let req = req.into();
info!(
"Elasticsearch Request: correlation_id: '{}', path: '{}'",
correlation_id,
req.url.as_ref()
);
let req_future = self.http
.elastic_req(params, req)
.map_err(move |e| {
error!(
"Elasticsearch Response: correlation_id: '{}', error: '{}'",
correlation_id,
e
);
error::request(e)
})
.map(move |res| {
info!(
"Elasticsearch Response: correlation_id: '{}', status: '{}'",
correlation_id,
res.status()
);
async_response(res, serde_pool)
});
Pending::new(req_future)
}
}
pub struct Pending {
inner: Box<Future<Item = AsyncResponseBuilder, Error = Error>>,
}
impl Pending {
fn new<F>(fut: F) -> Self
where
F: Future<Item = AsyncResponseBuilder, Error = Error> + 'static,
{
Pending {
inner: Box::new(fut),
}
}
}
impl Future for Pending {
type Item = AsyncResponseBuilder;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
pub struct AsyncClientBuilder {
serde_pool: Option<CpuPool>,
params: RequestParams,
}
pub trait IntoAsyncHttpClient {
type Error: StdError + Send + 'static;
fn into_async_http_client(self) -> Result<AsyncHttpClient, Self::Error>;
}
impl IntoAsyncHttpClient for AsyncHttpClient {
type Error = Error;
fn into_async_http_client(self) -> Result<AsyncHttpClient, Self::Error> {
Ok(self)
}
}
impl<'a> IntoAsyncHttpClient for &'a Handle {
type Error = ReqwestError;
fn into_async_http_client(self) -> Result<AsyncHttpClient, Self::Error> {
AsyncHttpClientBuilder::new().build(self)
}
}
impl Default for AsyncClientBuilder {
fn default() -> Self {
AsyncClientBuilder::new()
}
}
impl AsyncClientBuilder {
pub fn new() -> Self {
AsyncClientBuilder {
serde_pool: None,
params: RequestParams::default(),
}
}
pub fn from_params(params: RequestParams) -> Self {
AsyncClientBuilder {
serde_pool: None,
params: params,
}
}
pub fn base_url<I>(mut self, base_url: I) -> Self
where
I: Into<String>,
{
self.params = self.params.base_url(base_url);
self
}
pub fn params<F>(mut self, builder: F) -> Self
where
F: Fn(RequestParams) -> RequestParams,
{
self.params = builder(self.params);
self
}
pub fn serde_pool<P>(mut self, serde_pool: P) -> Self
where
P: Into<Option<CpuPool>>,
{
self.serde_pool = serde_pool.into();
self
}
pub fn build<TIntoHttp>(self, client: TIntoHttp) -> Result<AsyncClient, Error>
where
TIntoHttp: IntoAsyncHttpClient,
{
let http = client.into_async_http_client().map_err(error::build)?;
Ok(AsyncClient {
sender: AsyncSender {
http: http,
serde_pool: self.serde_pool,
},
params: self.params,
})
}
}