1use crate::{
32 http::request::Body,
33 params::{SourceFilter, VersionType},
34 Error,
35};
36use bytes::{BufMut, Bytes, BytesMut};
37use serde::{
38 ser::{SerializeMap, Serializer},
39 Deserialize, Serialize,
40};
41
42#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
44enum BulkAction {
45 #[serde(rename = "index")]
47 Index,
48 #[serde(rename = "create")]
50 Create,
51 #[serde(rename = "update")]
53 Update,
54 #[serde(rename = "delete")]
56 Delete,
57}
58
59#[serde_with::skip_serializing_none]
63#[derive(Serialize, Default)]
64struct BulkMetadata {
65 _index: Option<String>,
66 _id: Option<String>,
69 pipeline: Option<String>,
70 if_seq_no: Option<i64>,
71 if_primary_term: Option<i64>,
72 routing: Option<String>,
73 retry_on_conflict: Option<i32>,
74 _source: Option<SourceFilter>,
75 version: Option<i64>,
76 version_type: Option<VersionType>,
77}
78
79struct BulkHeader {
84 action: BulkAction,
85 metadata: BulkMetadata,
86}
87
88impl Serialize for BulkHeader {
89 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
90 where
91 S: Serializer,
92 {
93 let mut map = serializer.serialize_map(Some(1))?;
94 let action = match self.action {
95 BulkAction::Create => "create",
96 BulkAction::Delete => "delete",
97 BulkAction::Index => "index",
98 BulkAction::Update => "update",
99 };
100 map.serialize_entry(action, &self.metadata)?;
101 map.end()
102 }
103}
104
105pub struct BulkOperation<B> {
168 header: BulkHeader,
169 source: Option<B>,
170}
171
172impl<B> BulkOperation<B>
173where
174 B: Serialize,
175{
176 pub fn create<S>(id: S, source: B) -> BulkCreateOperation<B>
178 where
179 S: Into<String>,
180 {
181 BulkCreateOperation::new(id, source)
182 }
183
184 pub fn index(source: B) -> BulkIndexOperation<B> {
186 BulkIndexOperation::new(source)
187 }
188
189 pub fn delete<S>(id: S) -> BulkDeleteOperation<B>
191 where
192 S: Into<String>,
193 {
194 BulkDeleteOperation::new(id)
195 }
196
197 pub fn update<S>(id: S, source: B) -> BulkUpdateOperation<B>
199 where
200 S: Into<String>,
201 {
202 BulkUpdateOperation::new(id, source)
203 }
204}
205
206impl<B> Body for BulkOperation<B>
207where
208 B: Serialize,
209{
210 fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
211 let writer = bytes.writer();
212 serde_json::to_writer(writer, &self.header)?;
213 bytes.put_u8(b'\n');
214
215 if let Some(source) = &self.source {
216 let writer = bytes.writer();
217 serde_json::to_writer(writer, source)?;
218 bytes.put_u8(b'\n');
219 }
220
221 Ok(())
222 }
223}
224
225pub struct BulkCreateOperation<B> {
227 operation: BulkOperation<B>,
228}
229
230impl<B> BulkCreateOperation<B> {
231 pub fn new<S>(id: S, source: B) -> Self
233 where
234 S: Into<String>,
235 {
236 Self {
237 operation: BulkOperation {
238 header: BulkHeader {
239 action: BulkAction::Create,
240 metadata: BulkMetadata {
241 _id: Some(id.into()),
242 ..Default::default()
243 },
244 },
245 source: Some(source),
246 },
247 }
248 }
249
250 pub fn index<S>(mut self, index: S) -> Self
257 where
258 S: Into<String>,
259 {
260 self.operation.header.metadata._index = Some(index.into());
261 self
262 }
263
264 pub fn pipeline<S>(mut self, pipeline: S) -> Self
266 where
267 S: Into<String>,
268 {
269 self.operation.header.metadata.pipeline = Some(pipeline.into());
270 self
271 }
272
273 pub fn routing<S>(mut self, routing: S) -> Self
275 where
276 S: Into<String>,
277 {
278 self.operation.header.metadata.routing = Some(routing.into());
279 self
280 }
281}
282
283impl<B> From<BulkCreateOperation<B>> for BulkOperation<B> {
284 fn from(b: BulkCreateOperation<B>) -> Self {
285 b.operation
286 }
287}
288
289pub struct BulkIndexOperation<B> {
291 operation: BulkOperation<B>,
292}
293
294impl<B> BulkIndexOperation<B> {
295 pub fn new(source: B) -> Self {
297 Self {
298 operation: BulkOperation {
299 header: BulkHeader {
300 action: BulkAction::Index,
301 metadata: BulkMetadata {
302 ..Default::default()
303 },
304 },
305 source: Some(source),
306 },
307 }
308 }
309
310 pub fn id<S>(mut self, id: S) -> Self
315 where
316 S: Into<String>,
317 {
318 self.operation.header.metadata._id = Some(id.into());
319 self
320 }
321
322 pub fn index<S>(mut self, index: S) -> Self
329 where
330 S: Into<String>,
331 {
332 self.operation.header.metadata._index = Some(index.into());
333 self
334 }
335
336 pub fn pipeline<S>(mut self, pipeline: S) -> Self
338 where
339 S: Into<String>,
340 {
341 self.operation.header.metadata.pipeline = Some(pipeline.into());
342 self
343 }
344
345 pub fn routing<S>(mut self, routing: S) -> Self
347 where
348 S: Into<String>,
349 {
350 self.operation.header.metadata.routing = Some(routing.into());
351 self
352 }
353
354 pub fn if_seq_no(mut self, seq_no: i64) -> Self {
356 self.operation.header.metadata.if_seq_no = Some(seq_no);
357 self
358 }
359
360 pub fn if_primary_term(mut self, primary_term: i64) -> Self {
363 self.operation.header.metadata.if_primary_term = Some(primary_term);
364 self
365 }
366
367 pub fn version(mut self, version: i64) -> Self {
369 self.operation.header.metadata.version = Some(version);
370 self
371 }
372
373 pub fn version_type(mut self, version_type: VersionType) -> Self {
375 self.operation.header.metadata.version_type = Some(version_type);
376 self
377 }
378}
379
380impl<B> From<BulkIndexOperation<B>> for BulkOperation<B> {
381 fn from(b: BulkIndexOperation<B>) -> Self {
382 b.operation
383 }
384}
385
386pub struct BulkDeleteOperation<B> {
392 operation: BulkOperation<B>,
393}
394
395impl<B> BulkDeleteOperation<B> {
396 pub fn new<S>(id: S) -> Self
398 where
399 S: Into<String>,
400 {
401 Self {
402 operation: BulkOperation {
403 header: BulkHeader {
404 action: BulkAction::Delete,
405 metadata: BulkMetadata {
406 _id: Some(id.into()),
407 ..Default::default()
408 },
409 },
410 source: Option::<B>::None,
411 },
412 }
413 }
414
415 pub fn index<S>(mut self, index: S) -> Self
422 where
423 S: Into<String>,
424 {
425 self.operation.header.metadata._index = Some(index.into());
426 self
427 }
428
429 pub fn routing<S>(mut self, routing: S) -> Self
431 where
432 S: Into<String>,
433 {
434 self.operation.header.metadata.routing = Some(routing.into());
435 self
436 }
437
438 pub fn if_seq_no(mut self, seq_no: i64) -> Self {
440 self.operation.header.metadata.if_seq_no = Some(seq_no);
441 self
442 }
443
444 pub fn if_primary_term(mut self, primary_term: i64) -> Self {
447 self.operation.header.metadata.if_primary_term = Some(primary_term);
448 self
449 }
450
451 pub fn version(mut self, version: i64) -> Self {
453 self.operation.header.metadata.version = Some(version);
454 self
455 }
456
457 pub fn version_type(mut self, version_type: VersionType) -> Self {
459 self.operation.header.metadata.version_type = Some(version_type);
460 self
461 }
462}
463
464impl<B> From<BulkDeleteOperation<B>> for BulkOperation<B> {
465 fn from(b: BulkDeleteOperation<B>) -> Self {
466 b.operation
467 }
468}
469
470pub struct BulkUpdateOperation<B> {
472 operation: BulkOperation<B>,
473}
474
475impl<B> BulkUpdateOperation<B>
476where
477 B: serde::Serialize,
478{
479 pub fn new<S>(id: S, source: B) -> Self
481 where
482 S: Into<String>,
483 {
484 Self {
485 operation: BulkOperation {
486 header: BulkHeader {
487 action: BulkAction::Update,
488 metadata: BulkMetadata {
489 _id: Some(id.into()),
490 ..Default::default()
491 },
492 },
493 source: Some(source),
494 },
495 }
496 }
497
498 pub fn index<S>(mut self, index: S) -> Self
505 where
506 S: Into<String>,
507 {
508 self.operation.header.metadata._index = Some(index.into());
509 self
510 }
511
512 pub fn routing<S>(mut self, routing: S) -> Self
514 where
515 S: Into<String>,
516 {
517 self.operation.header.metadata.routing = Some(routing.into());
518 self
519 }
520
521 pub fn if_seq_no(mut self, seq_no: i64) -> Self {
523 self.operation.header.metadata.if_seq_no = Some(seq_no);
524 self
525 }
526
527 pub fn if_primary_term(mut self, primary_term: i64) -> Self {
530 self.operation.header.metadata.if_primary_term = Some(primary_term);
531 self
532 }
533
534 pub fn version(mut self, version: i64) -> Self {
536 self.operation.header.metadata.version = Some(version);
537 self
538 }
539
540 pub fn version_type(mut self, version_type: VersionType) -> Self {
542 self.operation.header.metadata.version_type = Some(version_type);
543 self
544 }
545
546 pub fn retry_on_conflict(mut self, retry_on_conflict: i32) -> Self {
548 self.operation.header.metadata.retry_on_conflict = Some(retry_on_conflict);
549 self
550 }
551
552 pub fn source<S>(mut self, source: S) -> Self
556 where
557 S: Into<SourceFilter>,
558 {
559 self.operation.header.metadata._source = Some(source.into());
560 self
561 }
562}
563
564impl<B> From<BulkUpdateOperation<B>> for BulkOperation<B> {
565 fn from(b: BulkUpdateOperation<B>) -> Self {
566 b.operation
567 }
568}
569
570pub struct BulkOperations {
630 buf: BytesMut,
631}
632
633impl BulkOperations {
634 pub fn new() -> Self {
636 Self {
637 buf: BytesMut::new(),
638 }
639 }
640
641 pub fn with_bytes(buf: BytesMut) -> Self {
644 Self { buf }
645 }
646
647 pub fn push<O, B>(&mut self, op: O) -> Result<(), Error>
651 where
652 O: Into<BulkOperation<B>>,
653 B: Serialize,
654 {
655 op.into().write(&mut self.buf)
656 }
657}
658
659impl Default for BulkOperations {
660 fn default() -> Self {
661 Self::new()
662 }
663}
664
665impl Body for BulkOperations {
666 fn bytes(&self) -> Option<Bytes> {
667 Some(self.buf.clone().freeze())
668 }
669
670 fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
671 self.buf.write(bytes)
672 }
673}
674
675#[cfg(test)]
676mod tests {
677 use crate::{
678 http::request::{Body, NdBody},
679 params::VersionType,
680 BulkOperation, BulkOperations,
681 };
682 use bytes::{BufMut, BytesMut};
683 use serde::Serialize;
684 use serde_json::{json, Value};
685 use std::{cmp::Ordering, str};
686
687 pub fn compare(a: &[u8], b: &[u8]) -> Ordering {
688 a.iter()
689 .zip(b)
690 .map(|(x, y)| x.cmp(y))
691 .find(|&ord| ord != Ordering::Equal)
692 .unwrap_or(a.len().cmp(&b.len()))
693 }
694
695 #[test]
696 fn serialize_bulk_operations_with_same_type_writes_to_bytes() -> Result<(), failure::Error> {
697 let mut bytes = BytesMut::new();
698 let mut ops: Vec<BulkOperation<Value>> = Vec::with_capacity(4);
699
700 ops.push(
701 BulkOperation::index(json!({ "foo": "index" }))
702 .id("1")
703 .pipeline("pipeline")
704 .routing("routing")
705 .if_seq_no(1)
706 .if_primary_term(2)
707 .version(3)
708 .version_type(VersionType::Internal)
709 .into(),
710 );
711 ops.push(
712 BulkOperation::create("2", json!({ "bar": "create" }))
713 .pipeline("pipeline")
714 .routing("routing")
715 .index("create_index")
716 .into(),
717 );
718 ops.push(
719 BulkOperation::update("3", json!({ "baz": "update_1" }))
720 .source(false)
721 .into(),
722 );
723 ops.push(
724 BulkOperation::update("4", json!({ "baz": "update_2" }))
725 .source("baz")
726 .into(),
727 );
728 ops.push(
729 BulkOperation::update("5", json!({ "baz": "update_3" }))
730 .source(vec!["baz"])
731 .into(),
732 );
733 ops.push(
734 BulkOperation::update("6", json!({ "baz": "update_4" }))
735 .source((vec!["baz"], vec!["bar"]))
736 .into(),
737 );
738 ops.push(BulkOperation::delete("7").into());
739
740 let body = NdBody(ops);
741 let _ = body.write(&mut bytes)?;
742
743 let mut expected = BytesMut::new();
744 expected.put_slice(b"{\"index\":{\"_id\":\"1\",\"pipeline\":\"pipeline\",\"if_seq_no\":1,\"if_primary_term\":2,\"routing\":\"routing\",\"version\":3,\"version_type\":\"internal\"}}\n");
745 expected.put_slice(b"{\"foo\":\"index\"}\n");
746 expected.put_slice(b"{\"create\":{\"_index\":\"create_index\",\"_id\":\"2\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n");
747 expected.put_slice(b"{\"bar\":\"create\"}\n");
748 expected.put_slice(b"{\"update\":{\"_id\":\"3\",\"_source\":false}}\n");
749 expected.put_slice(b"{\"baz\":\"update_1\"}\n");
750 expected.put_slice(b"{\"update\":{\"_id\":\"4\",\"_source\":\"baz\"}}\n");
751 expected.put_slice(b"{\"baz\":\"update_2\"}\n");
752 expected.put_slice(b"{\"update\":{\"_id\":\"5\",\"_source\":[\"baz\"]}}\n");
753 expected.put_slice(b"{\"baz\":\"update_3\"}\n");
754 expected.put_slice(b"{\"update\":{\"_id\":\"6\",\"_source\":{\"includes\":[\"baz\"],\"excludes\":[\"bar\"]}}}\n");
755 expected.put_slice(b"{\"baz\":\"update_4\"}\n");
756 expected.put_slice(b"{\"delete\":{\"_id\":\"7\"}}\n");
757
758 assert_eq!(
759 compare(&expected[..], &bytes[..]),
760 Ordering::Equal,
761 "expected {} but found {}",
762 str::from_utf8(&expected[..]).unwrap(),
763 str::from_utf8(&bytes[..]).unwrap()
764 );
765 Ok(())
766 }
767
768 #[test]
769 fn serialize_bulk_operations_with_different_types_writes_to_bytes() -> Result<(), failure::Error>
770 {
771 #[derive(Serialize)]
772 struct IndexDoc<'a> {
773 foo: &'a str,
774 }
775 #[derive(Serialize)]
776 struct CreateDoc<'a> {
777 bar: &'a str,
778 }
779 #[derive(Serialize)]
780 struct UpdateDoc<'a> {
781 baz: &'a str,
782 }
783
784 let mut bytes = BytesMut::new();
785 let mut ops = BulkOperations::new();
786
787 ops.push(
788 BulkOperation::index(IndexDoc { foo: "index" })
789 .id("1")
790 .pipeline("pipeline")
791 .index("index_doc")
792 .routing("routing"),
793 )?;
794 ops.push(BulkOperation::create("2", CreateDoc { bar: "create" }))?;
795 ops.push(BulkOperation::update("3", UpdateDoc { baz: "update" }))?;
796 ops.push(BulkOperation::<()>::delete("4"))?;
797
798 let body = NdBody(vec![ops]);
799 let _ = body.write(&mut bytes)?;
800
801 let mut expected = BytesMut::new();
802 expected.put_slice(b"{\"index\":{\"_index\":\"index_doc\",\"_id\":\"1\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n");
803 expected.put_slice(b"{\"foo\":\"index\"}\n");
804 expected.put_slice(b"{\"create\":{\"_id\":\"2\"}}\n");
805 expected.put_slice(b"{\"bar\":\"create\"}\n");
806 expected.put_slice(b"{\"update\":{\"_id\":\"3\"}}\n");
807 expected.put_slice(b"{\"baz\":\"update\"}\n");
808 expected.put_slice(b"{\"delete\":{\"_id\":\"4\"}}\n");
809
810 assert_eq!(
811 compare(&expected[..], &bytes[..]),
812 Ordering::Equal,
813 "expected {} but found {}",
814 str::from_utf8(&expected[..]).unwrap(),
815 str::from_utf8(&bytes[..]).unwrap()
816 );
817 Ok(())
818 }
819}