use std::marker::PhantomData;
use serde_json;
use futures::{Future, IntoFuture, Poll};
use futures_cpupool::CpuPool;
use serde::Serialize;
use error::{self, Error, Result};
use client::{AsyncSender, Client, Sender, SyncSender};
use client::requests::RequestBuilder;
use client::requests::params::{Index, Type};
use client::requests::endpoints::IndicesPutMappingRequest;
use client::requests::raw::RawRequestInner;
use client::responses::CommandResponse;
use types::document::DocumentType;
/// A request builder specialised for put-mapping requests.
///
/// This is a `RequestBuilder` whose inner state is a `PutMappingRequestInner<TDocument>`.
/// `TSender` selects the sender (`SyncSender` or `AsyncSender`), which in turn
/// determines which `send` method is available. `TDocument` is the document type
/// whose `index_mapping()` is serialised as the request body.
pub type PutMappingRequestBuilder<TSender, TDocument> = RequestBuilder<TSender, PutMappingRequestInner<TDocument>>;
/// The state carried by a `PutMappingRequestBuilder` until it is sent.
///
/// It is converted into an `IndicesPutMappingRequest` by `into_sync_request`
/// or `into_async_request`.
#[doc(hidden)]
pub struct PutMappingRequestInner<TDocument> {
// The index the mapping request is addressed to.
index: Index<'static>,
// The document type in the request path. Initialised from `TDocument::name()`
// by `document_put_mapping` and replaceable through the builder's `ty` method.
ty: Type<'static>,
// Ties the request to `TDocument` without storing a value of that type.
_marker: PhantomData<TDocument>,
}
impl<TSender> Client<TSender>
where
TSender: Sender,
{
pub fn document_put_mapping<TDocument>(&self, index: Index<'static>) -> PutMappingRequestBuilder<TSender, TDocument>
where
TDocument: Serialize + DocumentType,
{
let ty = TDocument::name().into();
RequestBuilder::new(
self.clone(),
None,
PutMappingRequestInner {
index: index,
ty: ty,
_marker: PhantomData,
},
)
}
}
impl<TDocument> PutMappingRequestInner<TDocument>
where
TDocument: DocumentType,
{
fn into_sync_request(self) -> Result<IndicesPutMappingRequest<'static, Vec<u8>>> {
let body = serde_json::to_vec(&TDocument::index_mapping()).map_err(error::request)?;
Ok(IndicesPutMappingRequest::for_index_ty(
self.index,
self.ty,
body,
))
}
}
impl<TDocument> PutMappingRequestInner<TDocument>
where
TDocument: DocumentType + Send + 'static,
{
fn into_async_request(self, ser_pool: Option<CpuPool>) -> Box<Future<Item = IndicesPutMappingRequest<'static, Vec<u8>>, Error = Error>> {
if let Some(ser_pool) = ser_pool {
let request_future = ser_pool.spawn_fn(|| self.into_sync_request());
Box::new(request_future)
} else {
Box::new(self.into_sync_request().into_future())
}
}
}
impl<TSender, TDocument> PutMappingRequestBuilder<TSender, TDocument>
where
    TSender: Sender,
{
    /// Replace the type used for the request.
    ///
    /// This overwrites the value that `document_put_mapping` derived from
    /// `TDocument::name()`. Consumes and returns the builder so calls can be chained.
    pub fn ty<I>(mut self, ty: I) -> Self
    where
        I: Into<Type<'static>>,
    {
        let replacement: Type<'static> = ty.into();
        self.inner.ty = replacement;

        self
    }
}
impl<TDocument> PutMappingRequestBuilder<SyncSender, TDocument>
where
    TDocument: DocumentType,
{
    /// Send the put-mapping request with the synchronous sender.
    ///
    /// The mapping is serialised first; the resulting raw request is then sent
    /// with this builder's client and parameters, and the response is read as a
    /// `CommandResponse`.
    ///
    /// # Errors
    ///
    /// Returns an error if building the request, sending it, or converting the
    /// response fails.
    pub fn send(self) -> Result<CommandResponse> {
        // Serialise before touching the client so a bad mapping fails early.
        let req = self.inner.into_sync_request()?;

        let raw_builder = RequestBuilder::new(self.client, self.params, RawRequestInner::new(req));
        let response_builder = raw_builder.send()?;

        response_builder.into_response()
    }
}
impl<TDocument> PutMappingRequestBuilder<AsyncSender, TDocument>
where
    TDocument: DocumentType + Send + 'static,
{
    /// Send the put-mapping request with the asynchronous sender.
    ///
    /// The request is built via `into_async_request`, using a clone of the
    /// sender's `serde_pool`. Once the request is ready it is sent as a raw
    /// request with this builder's client and parameters, and the response is
    /// converted with `into_response`. The whole chain is returned as a `Pending`.
    pub fn send(self) -> Pending {
        // Grab the pool handle before the client is moved into the closure below.
        let ser_pool = self.client.sender.serde_pool.clone();
        let client = self.client;
        let params = self.params;

        let req_future = self.inner.into_async_request(ser_pool);

        let res_future = req_future.and_then(move |req| {
            let raw_builder = RequestBuilder::new(client, params, RawRequestInner::new(req));

            raw_builder.send().and_then(|res| res.into_response())
        });

        Pending::new(res_future)
    }
}
/// The future returned by the asynchronous `send`.
///
/// Resolves to a `CommandResponse` on success or an `Error` on failure.
pub struct Pending {
// The boxed future that `poll` delegates to.
inner: Box<Future<Item = CommandResponse, Error = Error>>,
}
impl Pending {
fn new<F>(fut: F) -> Self
where
F: Future<Item = CommandResponse, Error = Error> + 'static,
{
Pending {
inner: Box::new(fut),
}
}
}
impl Future for Pending {
    type Item = CommandResponse;
    type Error = Error;

    /// Poll the wrapped future and return its result unchanged.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let wrapped = &mut self.inner;
        wrapped.poll()
    }
}
#[cfg(test)]
mod tests {
    use serde_json::{self, Value};
    use prelude::*;

    /// With no overrides, the url uses the type name of the document type and
    /// the body is the document type's index mapping.
    #[test]
    fn default_request() {
        let client = SyncClientBuilder::new().build().unwrap();

        let builder = client.document_put_mapping::<Value>(index("test-idx"));
        let req = builder.inner.into_sync_request().unwrap();

        let actual_body: Value = serde_json::from_slice(&req.body).unwrap();
        let expected_body = json!({ "properties": {} });

        assert_eq!("/test-idx/_mappings/value", req.url.as_ref());
        assert_eq!(expected_body.to_string(), actual_body.to_string());
    }

    /// Calling `ty` on the builder replaces the type segment of the url.
    #[test]
    fn specify_ty() {
        let client = SyncClientBuilder::new().build().unwrap();

        let builder = client
            .document_put_mapping::<Value>(index("test-idx"))
            .ty("new-ty");
        let req = builder.inner.into_sync_request().unwrap();

        assert_eq!("/test-idx/_mappings/new-ty", req.url.as_ref());
    }
}