use crate::{
http::request::Body,
params::{SourceFilter, VersionType},
Error,
};
use bytes::{BufMut, Bytes, BytesMut};
use serde::{
ser::{SerializeMap, Serializer},
Deserialize, Serialize,
};
/// The kind of action requested by a bulk header line.
///
/// Each variant is (de)serialized as its lowercase name: `index`, `create`,
/// `update` or `delete`.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
enum BulkAction {
    Index,
    Create,
    Update,
    Delete,
}
/// Metadata carried inside a bulk action header.
///
/// Fields that are `None` are left out of the serialized object. The remaining
/// fields are written in declaration order, so the order below fixes the key
/// order of every header line and must not be rearranged casually.
#[serde_with::skip_serializing_none]
#[derive(Serialize, Default, Clone)]
struct BulkMetadata {
    /// Written as `_index`; set by the builders' `index` methods.
    _index: Option<String>,
    /// Written as `_id`; set by `id` or by the delete/update constructors.
    _id: Option<String>,
    /// Written as `pipeline`; set by the create/index builders.
    pipeline: Option<String>,
    /// Written as `if_seq_no`.
    if_seq_no: Option<i64>,
    /// Written as `if_primary_term`.
    if_primary_term: Option<i64>,
    /// Written as `routing`.
    routing: Option<String>,
    /// Written as `retry_on_conflict`; only the update builder sets it.
    retry_on_conflict: Option<i32>,
    /// Written as `_source`; only the update builder sets it.
    _source: Option<SourceFilter>,
    /// Written as `version`.
    version: Option<i64>,
    /// Written as `version_type`.
    version_type: Option<VersionType>,
}
/// The header line of a bulk operation: which action to run and the metadata
/// that accompanies it.
#[derive(Clone)]
struct BulkHeader {
    /// Action name used as the single key of the serialized header object.
    action: BulkAction,
    /// Value serialized under the action key.
    metadata: BulkMetadata,
}
impl Serialize for BulkHeader {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(Some(1))?;
let action = match self.action {
BulkAction::Create => "create",
BulkAction::Delete => "delete",
BulkAction::Index => "index",
BulkAction::Update => "update",
};
map.serialize_entry(action, &self.metadata)?;
map.end()
}
}
/// A single bulk operation: an action header plus an optional source document
/// of type `B`.
///
/// Values are obtained by converting one of the per-action builders
/// (`BulkCreateOperation`, `BulkIndexOperation`, `BulkDeleteOperation`,
/// `BulkUpdateOperation`) with `From`/`Into`.
#[derive(Clone)]
pub struct BulkOperation<B> {
    /// Action and metadata written as the first line.
    header: BulkHeader,
    /// Document written as the second line; `None` writes no second line.
    source: Option<B>,
}
impl<B: Serialize> BulkOperation<B> {
    /// Starts building a `create` operation that carries `source`.
    pub fn create(source: B) -> BulkCreateOperation<B> {
        BulkCreateOperation::<B>::new(source)
    }

    /// Starts building an `index` operation that carries `source`.
    pub fn index(source: B) -> BulkIndexOperation<B> {
        BulkIndexOperation::<B>::new(source)
    }

    /// Starts building a `delete` operation for the document with `id`.
    ///
    /// No source document is attached, so `B` usually has to be named
    /// explicitly (e.g. `BulkOperation::<()>::delete("1")`) unless it can be
    /// inferred from context.
    pub fn delete<S: Into<String>>(id: S) -> BulkDeleteOperation<B> {
        BulkDeleteOperation::<B>::new(id)
    }

    /// Starts building an `update` operation for the document with `id`,
    /// carrying `source` as the update body.
    pub fn update<S: Into<String>>(id: S, source: B) -> BulkUpdateOperation<B> {
        BulkUpdateOperation::<B>::new(id, source)
    }
}
impl<B> Body for BulkOperation<B>
where
B: Serialize,
{
fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
let writer = bytes.writer();
serde_json::to_writer(writer, &self.header)?;
bytes.put_u8(b'\n');
if let Some(source) = &self.source {
let writer = bytes.writer();
serde_json::to_writer(writer, source)?;
bytes.put_u8(b'\n');
}
Ok(())
}
}
/// Builder for a bulk `create` operation.
///
/// Convert it into a [`BulkOperation`] with `From`/`Into` once configured.
pub struct BulkCreateOperation<B> {
    /// The operation being assembled.
    operation: BulkOperation<B>,
}
impl<B> BulkCreateOperation<B> {
pub fn new(source: B) -> Self {
Self {
operation: BulkOperation {
header: BulkHeader {
action: BulkAction::Create,
metadata: BulkMetadata::default(),
},
source: Some(source),
},
}
}
pub fn id<S>(mut self, id: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata._id = Some(id.into());
self
}
pub fn index<S>(mut self, index: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata._index = Some(index.into());
self
}
pub fn pipeline<S>(mut self, pipeline: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata.pipeline = Some(pipeline.into());
self
}
pub fn routing<S>(mut self, routing: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata.routing = Some(routing.into());
self
}
}
impl<B> From<BulkCreateOperation<B>> for BulkOperation<B> {
fn from(b: BulkCreateOperation<B>) -> Self {
b.operation
}
}
/// Builder for a bulk `index` operation.
///
/// Convert it into a [`BulkOperation`] with `From`/`Into` once configured.
pub struct BulkIndexOperation<B> {
    /// The operation being assembled.
    operation: BulkOperation<B>,
}
impl<B> BulkIndexOperation<B> {
pub fn new(source: B) -> Self {
Self {
operation: BulkOperation {
header: BulkHeader {
action: BulkAction::Index,
metadata: BulkMetadata {
..Default::default()
},
},
source: Some(source),
},
}
}
pub fn id<S>(mut self, id: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata._id = Some(id.into());
self
}
pub fn index<S>(mut self, index: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata._index = Some(index.into());
self
}
pub fn pipeline<S>(mut self, pipeline: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata.pipeline = Some(pipeline.into());
self
}
pub fn routing<S>(mut self, routing: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata.routing = Some(routing.into());
self
}
pub fn if_seq_no(mut self, seq_no: i64) -> Self {
self.operation.header.metadata.if_seq_no = Some(seq_no);
self
}
pub fn if_primary_term(mut self, primary_term: i64) -> Self {
self.operation.header.metadata.if_primary_term = Some(primary_term);
self
}
pub fn version(mut self, version: i64) -> Self {
self.operation.header.metadata.version = Some(version);
self
}
pub fn version_type(mut self, version_type: VersionType) -> Self {
self.operation.header.metadata.version_type = Some(version_type);
self
}
}
impl<B> From<BulkIndexOperation<B>> for BulkOperation<B> {
fn from(b: BulkIndexOperation<B>) -> Self {
b.operation
}
}
/// Builder for a bulk `delete` operation.
///
/// A delete carries no source document; `B` only exists so the result can be
/// converted into a [`BulkOperation<B>`] alongside other operations.
pub struct BulkDeleteOperation<B> {
    /// The operation being assembled.
    operation: BulkOperation<B>,
}
impl<B> BulkDeleteOperation<B> {
pub fn new<S>(id: S) -> Self
where
S: Into<String>,
{
Self {
operation: BulkOperation {
header: BulkHeader {
action: BulkAction::Delete,
metadata: BulkMetadata {
_id: Some(id.into()),
..Default::default()
},
},
source: Option::<B>::None,
},
}
}
pub fn index<S>(mut self, index: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata._index = Some(index.into());
self
}
pub fn routing<S>(mut self, routing: S) -> Self
where
S: Into<String>,
{
self.operation.header.metadata.routing = Some(routing.into());
self
}
pub fn if_seq_no(mut self, seq_no: i64) -> Self {
self.operation.header.metadata.if_seq_no = Some(seq_no);
self
}
pub fn if_primary_term(mut self, primary_term: i64) -> Self {
self.operation.header.metadata.if_primary_term = Some(primary_term);
self
}
pub fn version(mut self, version: i64) -> Self {
self.operation.header.metadata.version = Some(version);
self
}
pub fn version_type(mut self, version_type: VersionType) -> Self {
self.operation.header.metadata.version_type = Some(version_type);
self
}
}
impl<B> From<BulkDeleteOperation<B>> for BulkOperation<B> {
fn from(b: BulkDeleteOperation<B>) -> Self {
b.operation
}
}
/// Builder for a bulk `update` operation.
///
/// Convert it into a [`BulkOperation`] with `From`/`Into` once configured.
pub struct BulkUpdateOperation<B> {
    /// The operation being assembled.
    operation: BulkOperation<B>,
}
// No `Serialize` bound is placed on `B` here: none of these methods serialize
// anything, and the sibling builders (create/index/delete) as well as this
// type's own `From` conversion are unbounded too. Serialization is only
// required where the operation is actually written out.
impl<B> BulkUpdateOperation<B> {
    /// Creates an `update` operation for the document with `id`, carrying
    /// `source` as the update body.
    ///
    /// Only `_id` is set initially; use the builder methods for the rest.
    pub fn new<S>(id: S, source: B) -> Self
    where
        S: Into<String>,
    {
        Self {
            operation: BulkOperation {
                header: BulkHeader {
                    action: BulkAction::Update,
                    metadata: BulkMetadata {
                        _id: Some(id.into()),
                        ..Default::default()
                    },
                },
                source: Some(source),
            },
        }
    }

    /// Sets the `_index` metadata field.
    pub fn index<S>(mut self, index: S) -> Self
    where
        S: Into<String>,
    {
        self.operation.header.metadata._index = Some(index.into());
        self
    }

    /// Sets the `routing` metadata field.
    pub fn routing<S>(mut self, routing: S) -> Self
    where
        S: Into<String>,
    {
        self.operation.header.metadata.routing = Some(routing.into());
        self
    }

    /// Sets the `if_seq_no` metadata field.
    pub fn if_seq_no(mut self, seq_no: i64) -> Self {
        self.operation.header.metadata.if_seq_no = Some(seq_no);
        self
    }

    /// Sets the `if_primary_term` metadata field.
    pub fn if_primary_term(mut self, primary_term: i64) -> Self {
        self.operation.header.metadata.if_primary_term = Some(primary_term);
        self
    }

    /// Sets the `version` metadata field.
    pub fn version(mut self, version: i64) -> Self {
        self.operation.header.metadata.version = Some(version);
        self
    }

    /// Sets the `version_type` metadata field.
    pub fn version_type(mut self, version_type: VersionType) -> Self {
        self.operation.header.metadata.version_type = Some(version_type);
        self
    }

    /// Sets the `retry_on_conflict` metadata field.
    pub fn retry_on_conflict(mut self, retry_on_conflict: i32) -> Self {
        self.operation.header.metadata.retry_on_conflict = Some(retry_on_conflict);
        self
    }

    /// Sets the `_source` metadata field from anything convertible into a
    /// `SourceFilter`.
    pub fn source<S>(mut self, source: S) -> Self
    where
        S: Into<SourceFilter>,
    {
        self.operation.header.metadata._source = Some(source.into());
        self
    }
}
impl<B> From<BulkUpdateOperation<B>> for BulkOperation<B> {
fn from(b: BulkUpdateOperation<B>) -> Self {
b.operation
}
}
/// A collection of bulk operations that may use different source document
/// types.
///
/// Each pushed operation is serialized immediately into one shared byte
/// buffer, so only the bytes are stored rather than the typed operations.
pub struct BulkOperations {
    /// Newline-delimited JSON written by every operation pushed so far.
    buf: BytesMut,
}
impl BulkOperations {
    /// Creates an empty collection backed by a fresh buffer.
    pub fn new() -> Self {
        Self {
            buf: BytesMut::new(),
        }
    }

    /// Creates a collection that appends to an existing buffer, e.g. one that
    /// was allocated with a capacity up front.
    pub fn with_bytes(buf: BytesMut) -> Self {
        Self { buf }
    }

    /// Serializes `op` and appends it to the collection.
    ///
    /// # Errors
    ///
    /// Returns the error produced while writing the operation. On failure the
    /// buffer is restored to the length it had before the call, so a failed
    /// push never leaves a half-written line that would corrupt later pushes.
    pub fn push<O, B>(&mut self, op: O) -> Result<(), Error>
    where
        O: Into<BulkOperation<B>>,
        B: Serialize,
    {
        // Remember where this operation starts so it can be rolled back.
        let checkpoint = self.buf.len();
        let outcome = op.into().write(&mut self.buf);
        if outcome.is_err() {
            // The header line (and part of the source) may already have been
            // written; discard it so the buffer stays valid NDJSON.
            self.buf.truncate(checkpoint);
        }
        outcome
    }
}
impl Default for BulkOperations {
fn default() -> Self {
Self::new()
}
}
impl Body for BulkOperations {
    /// Returns a frozen copy of everything written so far.
    ///
    /// The internal buffer is cloned, so the collection itself stays usable.
    fn bytes(&self) -> Option<Bytes> {
        let snapshot = self.buf.clone();
        Some(snapshot.freeze())
    }

    /// Writes the accumulated operations into `bytes` by delegating to the
    /// buffer's own `Body` implementation.
    fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
        let accumulated = &self.buf;
        accumulated.write(bytes)
    }
}
#[cfg(test)]
mod tests {
use crate::{
http::request::{Body, NdBody},
params::VersionType,
BulkOperation, BulkOperations,
};
use bytes::{BufMut, BytesMut};
use serde::Serialize;
use serde_json::{json, Value};
use std::{cmp::Ordering, str};
/// Orders two byte slices lexicographically.
///
/// The first differing byte decides the result; when one slice is a prefix of
/// the other, the shorter slice sorts first.
pub fn compare(a: &[u8], b: &[u8]) -> Ordering {
    // Slices implement `Ord` with exactly this lexicographic rule.
    a.cmp(b)
}
/// Operations that share one source type can be collected in a `Vec` and
/// written as newline-delimited JSON through `NdBody`.
#[test]
fn serialize_bulk_operations_with_same_type_writes_to_bytes() -> anyhow::Result<()> {
    let ops: Vec<BulkOperation<Value>> = vec![
        BulkOperation::index(json!({ "foo": "index" }))
            .id("1")
            .pipeline("pipeline")
            .routing("routing")
            .if_seq_no(1)
            .if_primary_term(2)
            .version(3)
            .version_type(VersionType::Internal)
            .into(),
        BulkOperation::create(json!({ "bar": "create" }))
            .id("2")
            .pipeline("pipeline")
            .routing("routing")
            .index("create_index")
            .into(),
        // The four updates exercise each supported `_source` filter shape.
        BulkOperation::update("3", json!({ "baz": "update_1" }))
            .source(false)
            .into(),
        BulkOperation::update("4", json!({ "baz": "update_2" }))
            .source("baz")
            .into(),
        BulkOperation::update("5", json!({ "baz": "update_3" }))
            .source(vec!["baz"])
            .into(),
        BulkOperation::update("6", json!({ "baz": "update_4" }))
            .source((vec!["baz"], vec!["bar"]))
            .into(),
        BulkOperation::delete("7").into(),
    ];

    let mut actual = BytesMut::new();
    NdBody::new(ops).write(&mut actual)?;

    // One entry per expected output line, header first and then the document.
    let expected_lines: [&[u8]; 13] = [
        b"{\"index\":{\"_id\":\"1\",\"pipeline\":\"pipeline\",\"if_seq_no\":1,\"if_primary_term\":2,\"routing\":\"routing\",\"version\":3,\"version_type\":\"internal\"}}\n",
        b"{\"foo\":\"index\"}\n",
        b"{\"create\":{\"_index\":\"create_index\",\"_id\":\"2\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n",
        b"{\"bar\":\"create\"}\n",
        b"{\"update\":{\"_id\":\"3\",\"_source\":false}}\n",
        b"{\"baz\":\"update_1\"}\n",
        b"{\"update\":{\"_id\":\"4\",\"_source\":\"baz\"}}\n",
        b"{\"baz\":\"update_2\"}\n",
        b"{\"update\":{\"_id\":\"5\",\"_source\":[\"baz\"]}}\n",
        b"{\"baz\":\"update_3\"}\n",
        b"{\"update\":{\"_id\":\"6\",\"_source\":{\"includes\":[\"baz\"],\"excludes\":[\"bar\"]}}}\n",
        b"{\"baz\":\"update_4\"}\n",
        b"{\"delete\":{\"_id\":\"7\"}}\n",
    ];
    let mut expected = BytesMut::new();
    for line in expected_lines.iter().copied() {
        expected.put_slice(line);
    }

    assert_eq!(
        compare(&expected[..], &actual[..]),
        Ordering::Equal,
        "expected {} but found {}",
        str::from_utf8(&expected[..]).unwrap(),
        str::from_utf8(&actual[..]).unwrap()
    );
    Ok(())
}
/// Operations with different source types can be mixed by pushing them into a
/// `BulkOperations` collection, which serializes each one as it is added.
#[test]
fn serialize_bulk_operations_with_different_types_writes_to_bytes() -> anyhow::Result<()> {
    #[derive(Serialize)]
    struct IndexDoc<'a> {
        foo: &'a str,
    }

    #[derive(Serialize)]
    struct CreateDoc<'a> {
        bar: &'a str,
    }

    #[derive(Serialize)]
    struct UpdateDoc<'a> {
        baz: &'a str,
    }

    let mut operations = BulkOperations::new();
    operations.push(
        BulkOperation::index(IndexDoc { foo: "index" })
            .id("1")
            .pipeline("pipeline")
            .index("index_doc")
            .routing("routing"),
    )?;
    operations.push(BulkOperation::create(CreateDoc { bar: "create" }).id("2"))?;
    operations.push(BulkOperation::update("3", UpdateDoc { baz: "update" }))?;
    // A delete has no document, so the source type must be named explicitly.
    operations.push(BulkOperation::<()>::delete("4"))?;

    let mut actual = BytesMut::new();
    NdBody::new(vec![operations]).write(&mut actual)?;

    // One entry per expected output line, header first and then the document.
    let expected_lines: [&[u8]; 7] = [
        b"{\"index\":{\"_index\":\"index_doc\",\"_id\":\"1\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n",
        b"{\"foo\":\"index\"}\n",
        b"{\"create\":{\"_id\":\"2\"}}\n",
        b"{\"bar\":\"create\"}\n",
        b"{\"update\":{\"_id\":\"3\"}}\n",
        b"{\"baz\":\"update\"}\n",
        b"{\"delete\":{\"_id\":\"4\"}}\n",
    ];
    let mut expected = BytesMut::new();
    for line in expected_lines.iter().copied() {
        expected.put_slice(line);
    }

    assert_eq!(
        compare(&expected[..], &actual[..]),
        Ordering::Equal,
        "expected {} but found {}",
        str::from_utf8(&expected[..]).unwrap(),
        str::from_utf8(&actual[..]).unwrap()
    );
    Ok(())
}
}