use std::marker::PhantomData;
use futures::{Future, IntoFuture, Poll};
use futures_cpupool::CpuPool;
use serde_json::{self, Map, Value};
use serde::ser::{Serialize, Serializer};
use error::{self, Error};
use client::{AsyncSender, Client, Sender, SyncSender};
use client::requests::RequestBuilder;
use client::requests::params::{Id, Index, Type};
use client::requests::endpoints::UpdateRequest;
use client::requests::raw::RawRequestInner;
use client::responses::UpdateResponse;
use types::document::DocumentType;
pub type UpdateRequestBuilder<TSender, TBody> = RequestBuilder<TSender, UpdateRequestInner<TBody>>;
#[doc(hidden)]
pub struct UpdateRequestInner<TBody> {
index: Index<'static>,
ty: Type<'static>,
id: Id<'static>,
body: TBody,
_marker: PhantomData<TBody>,
}
impl<TSender> Client<TSender>
where
TSender: Sender,
{
pub fn document_update<TDocument>(&self, index: Index<'static>, id: Id<'static>) -> UpdateRequestBuilder<TSender, Doc<TDocument>>
where
TDocument: DocumentType,
{
let ty = TDocument::name().into();
RequestBuilder::new(
self.clone(),
None,
UpdateRequestInner {
index: index,
ty: ty,
id: id,
body: Doc::empty(),
_marker: PhantomData,
},
)
}
}
impl<TBody> UpdateRequestInner<TBody>
where
TBody: Serialize,
{
fn into_sync_request(self) -> Result<UpdateRequest<'static, Vec<u8>>, Error> {
let body = serde_json::to_vec(&self.body).map_err(error::request)?;
Ok(UpdateRequest::for_index_ty_id(
self.index,
self.ty,
self.id,
body,
))
}
}
impl<TBody> UpdateRequestInner<TBody>
where
TBody: Serialize + Send + 'static,
{
fn into_async_request(self, ser_pool: Option<CpuPool>) -> Box<Future<Item = UpdateRequest<'static, Vec<u8>>, Error = Error>> {
if let Some(ser_pool) = ser_pool {
let request_future = ser_pool.spawn_fn(|| self.into_sync_request());
Box::new(request_future)
} else {
Box::new(self.into_sync_request().into_future())
}
}
}
impl<TSender, TBody> UpdateRequestBuilder<TSender, TBody>
where
TSender: Sender,
{
pub fn ty<I>(mut self, ty: I) -> Self
where
I: Into<Type<'static>>,
{
self.inner.ty = ty.into();
self
}
pub fn doc<TDocument>(self, doc: TDocument) -> UpdateRequestBuilder<TSender, Doc<TDocument>>
where
TDocument: Serialize + DocumentType,
{
RequestBuilder::new(
self.client,
self.params,
UpdateRequestInner {
body: Doc::value(doc),
index: self.inner.index,
ty: self.inner.ty,
id: self.inner.id,
_marker: PhantomData,
},
)
}
pub fn script<TScript, TParams>(self, builder: TScript) -> UpdateRequestBuilder<TSender, Script<TParams>>
where
TScript: Into<ScriptBuilder<TParams>>,
{
RequestBuilder::new(
self.client,
self.params,
UpdateRequestInner {
body: builder.into().build(),
index: self.inner.index,
ty: self.inner.ty,
id: self.inner.id,
_marker: PhantomData,
},
)
}
pub fn script_fluent<TScript, TBuilder, TParams>(self, source: TScript, builder: TBuilder) -> UpdateRequestBuilder<TSender, Script<TParams>>
where
TScript: ToString,
TBuilder: Fn(ScriptBuilder<DefaultParams>) -> ScriptBuilder<TParams>,
{
let builder = builder(ScriptBuilder::new(source));
self.script(builder)
}
}
impl<TBody> UpdateRequestBuilder<SyncSender, TBody>
where
TBody: Serialize,
{
pub fn send(self) -> Result<UpdateResponse, Error> {
let req = self.inner.into_sync_request()?;
RequestBuilder::new(self.client, self.params, RawRequestInner::new(req))
.send()?
.into_response()
}
}
impl<TBody> UpdateRequestBuilder<AsyncSender, TBody>
where
TBody: Serialize + Send + 'static,
{
pub fn send(self) -> Pending {
let (client, params) = (self.client, self.params);
let ser_pool = client.sender.serde_pool.clone();
let req_future = self.inner.into_async_request(ser_pool);
let res_future = req_future.and_then(move |req| {
RequestBuilder::new(client, params, RawRequestInner::new(req))
.send()
.and_then(|res| res.into_response())
});
Pending::new(res_future)
}
}
pub struct Pending {
inner: Box<Future<Item = UpdateResponse, Error = Error>>,
}
impl Pending {
fn new<F>(fut: F) -> Self
where
F: Future<Item = UpdateResponse, Error = Error> + 'static,
{
Pending {
inner: Box::new(fut),
}
}
}
impl Future for Pending {
type Item = UpdateResponse;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
#[derive(Serialize)]
pub struct Doc<TDocument> {
doc: DocInner<TDocument>,
}
impl<TDocument> Doc<TDocument> {
fn empty() -> Self {
Doc {
doc: DocInner { inner: None },
}
}
fn value(doc: TDocument) -> Self {
Doc {
doc: DocInner { inner: Some(doc) },
}
}
}
struct DocInner<TDocument> {
inner: Option<TDocument>,
}
impl<TDocument> Serialize for DocInner<TDocument>
where
TDocument: Serialize,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self.inner {
Some(ref doc) => doc.serialize(serializer),
None => Value::Object(Map::new()).serialize(serializer),
}
}
}
type DefaultParams = Map<String, Value>;
#[derive(Serialize)]
pub struct Script<TParams> {
script: ScriptInner<TParams>,
}
#[derive(Serialize)]
struct ScriptInner<TParams> {
#[serde(rename = "inline")] source: String,
#[serde(skip_serializing_if = "Option::is_none")] lang: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] params: Option<TParams>,
}
pub struct ScriptBuilder<TParams> {
source: String,
lang: Option<String>,
params: Option<TParams>,
}
impl ScriptBuilder<DefaultParams> {
pub fn new<TScript>(source: TScript) -> Self
where
TScript: ToString,
{
ScriptBuilder {
source: source.to_string(),
params: None,
lang: None,
}
}
pub fn param<TKey, TValue>(mut self, key: TKey, value: TValue) -> Self
where
TKey: ToString,
TValue: Into<Value>,
{
let mut params = self.params.unwrap_or_else(DefaultParams::new);
params.insert(key.to_string(), value.into());
self.params = Some(params);
self
}
}
impl<TParams> ScriptBuilder<TParams> {
pub fn lang<TLang>(mut self, lang: Option<TLang>) -> Self
where
TLang: ToString,
{
self.lang = lang.map(|lang| lang.to_string());
self
}
pub fn params<TNewParams>(self, params: TNewParams) -> ScriptBuilder<TNewParams> {
ScriptBuilder {
source: self.source,
lang: self.lang,
params: Some(params),
}
}
fn build(self) -> Script<TParams> {
Script {
script: ScriptInner {
source: self.source,
params: self.params,
lang: self.lang,
},
}
}
}
impl From<String> for ScriptBuilder<DefaultParams> {
fn from(source: String) -> Self {
ScriptBuilder::new(source)
}
}
impl<'a> From<&'a str> for ScriptBuilder<DefaultParams> {
fn from(source: &'a str) -> Self {
ScriptBuilder::new(source)
}
}
#[cfg(test)]
mod tests {
use serde_json::{self, Value};
use super::ScriptBuilder;
use prelude::*;
#[test]
fn default_request() {
let client = SyncClientBuilder::new().build().unwrap();
let req = client
.document_update::<Value>(index("test-idx"), id("1"))
.inner
.into_sync_request()
.unwrap();
assert_eq!("/test-idx/value/1/_update", req.url.as_ref());
let expected_body = json!({
"doc": {}
});
let actual_body: Value = serde_json::from_slice(&req.body).unwrap();
assert_eq!(expected_body.to_string(), actual_body.to_string());
}
#[test]
fn specify_ty() {
let client = SyncClientBuilder::new().build().unwrap();
let req = client
.document_update::<Value>(index("test-idx"), id("1"))
.ty("new-ty")
.inner
.into_sync_request()
.unwrap();
assert_eq!("/test-idx/new-ty/1/_update", req.url.as_ref());
}
#[test]
fn specify_doc() {
let client = SyncClientBuilder::new().build().unwrap();
let doc = json!({
"a": "string",
"b": 123
});
let expected_body = json!({ "doc": doc });
let req = client
.document_update::<Value>(index("test-idx"), id("1"))
.doc(doc)
.inner
.into_sync_request()
.unwrap();
let actual_body: Value = serde_json::from_slice(&req.body).unwrap();
assert_eq!(expected_body.to_string(), actual_body.to_string());
}
#[test]
fn specify_inline_script() {
let client = SyncClientBuilder::new().build().unwrap();
let req = client
.document_update::<Value>(index("test-idx"), id("1"))
.script("ctx._source.a = params.str")
.inner
.into_sync_request()
.unwrap();
let expected_body = json!({
"script": {
"inline": "ctx._source.a = params.str"
}
});
let actual_body: Value = serde_json::from_slice(&req.body).unwrap();
assert_eq!(expected_body.to_string(), actual_body.to_string());
}
#[test]
fn specify_script_value() {
let client = SyncClientBuilder::new().build().unwrap();
let req = client
.document_update::<Value>(index("test-idx"), id("1"))
.script(ScriptBuilder::new("ctx._source.a = params.str"))
.inner
.into_sync_request()
.unwrap();
let expected_body = json!({
"script": {
"inline": "ctx._source.a = params.str"
}
});
let actual_body: Value = serde_json::from_slice(&req.body).unwrap();
assert_eq!(expected_body.to_string(), actual_body.to_string());
}
#[test]
fn specify_script_fluent() {
let client = SyncClientBuilder::new().build().unwrap();
let req = client
.document_update::<Value>(index("test-idx"), id("1"))
.script_fluent("ctx._source.a = params.str", |script| {
script
.lang(Some("painless"))
.param("str", "some value")
.param("other", "some other value")
})
.inner
.into_sync_request()
.unwrap();
let expected_body = json!({
"script": {
"inline": "ctx._source.a = params.str",
"lang": "painless",
"params": {
"str": "some value",
"other": "some other value"
}
}
});
let actual_body: Value = serde_json::from_slice(&req.body).unwrap();
assert_eq!(expected_body.to_string(), actual_body.to_string());
}
#[test]
fn specify_typed_script() {
#[derive(Serialize)]
struct MyParams {
a: &'static str,
b: i32,
}
let client = SyncClientBuilder::new().build().unwrap();
let req = client
.document_update::<Value>(index("test-idx"), id("1"))
.script_fluent("ctx._source.a = params.str", |script| {
script.params(MyParams {
a: "some value",
b: 42,
})
})
.inner
.into_sync_request()
.unwrap();
let expected_body = json!({
"script": {
"inline": "ctx._source.a = params.str",
"params": {
"a": "some value",
"b": 42
}
}
});
let actual_body: Value = serde_json::from_slice(&req.body).unwrap();
assert_eq!(expected_body.to_string(), actual_body.to_string());
}
}