1use crate::{
32 http::request::Body,
33 params::{SourceFilter, VersionType},
34 Error,
35};
36use bytes::{BufMut, Bytes, BytesMut};
37use serde::{
38 ser::{SerializeMap, Serializer},
39 Deserialize, Serialize,
40};
41
42#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
44enum BulkAction {
45 #[serde(rename = "index")]
47 Index,
48 #[serde(rename = "create")]
50 Create,
51 #[serde(rename = "update")]
53 Update,
54 #[serde(rename = "delete")]
56 Delete,
57}
58
59#[serde_with::skip_serializing_none]
63#[derive(Serialize, Default)]
64struct BulkMetadata {
65 _index: Option<String>,
66 _id: Option<String>,
67 pipeline: Option<String>,
68 if_seq_no: Option<i64>,
69 if_primary_term: Option<i64>,
70 routing: Option<String>,
71 retry_on_conflict: Option<i32>,
72 _source: Option<SourceFilter>,
73 version: Option<i64>,
74 version_type: Option<VersionType>,
75}
76
77struct BulkHeader {
82 action: BulkAction,
83 metadata: BulkMetadata,
84}
85
86impl Serialize for BulkHeader {
87 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
88 where
89 S: Serializer,
90 {
91 let mut map = serializer.serialize_map(Some(1))?;
92 let action = match self.action {
93 BulkAction::Create => "create",
94 BulkAction::Delete => "delete",
95 BulkAction::Index => "index",
96 BulkAction::Update => "update",
97 };
98 map.serialize_entry(action, &self.metadata)?;
99 map.end()
100 }
101}
102
103pub struct BulkOperation<B> {
166 header: BulkHeader,
167 source: Option<B>,
168}
169
170impl<B> BulkOperation<B>
171where
172 B: Serialize,
173{
174 pub fn create<S>(id: S, source: B) -> BulkCreateOperation<B>
176 where
177 S: Into<String>,
178 {
179 BulkCreateOperation::new(id, source)
180 }
181
182 pub fn create_without_id(source: B) -> BulkCreateOperation<B> {
184 BulkCreateOperation::new_without_id(source)
185 }
186
187 pub fn index(source: B) -> BulkIndexOperation<B> {
189 BulkIndexOperation::new(source)
190 }
191
192 pub fn delete<S>(id: S) -> BulkDeleteOperation<B>
194 where
195 S: Into<String>,
196 {
197 BulkDeleteOperation::new(id)
198 }
199
200 pub fn update<S>(id: S, source: B) -> BulkUpdateOperation<B>
202 where
203 S: Into<String>,
204 {
205 BulkUpdateOperation::new(id, source)
206 }
207}
208
209impl<B> Body for BulkOperation<B>
210where
211 B: Serialize,
212{
213 fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
214 let writer = bytes.writer();
215 serde_json::to_writer(writer, &self.header)?;
216 bytes.put_u8(b'\n');
217
218 if let Some(source) = &self.source {
219 let writer = bytes.writer();
220 serde_json::to_writer(writer, source)?;
221 bytes.put_u8(b'\n');
222 }
223
224 Ok(())
225 }
226}
227
228pub struct BulkCreateOperation<B> {
230 operation: BulkOperation<B>,
231}
232
233impl<B> BulkCreateOperation<B> {
234 pub fn new<S>(id: S, source: B) -> Self
236 where
237 S: Into<String>,
238 {
239 Self {
240 operation: BulkOperation {
241 header: BulkHeader {
242 action: BulkAction::Create,
243 metadata: BulkMetadata {
244 _id: Some(id.into()),
245 ..Default::default()
246 },
247 },
248 source: Some(source),
249 },
250 }
251 }
252
253 pub fn new_without_id(source: B) -> Self {
255 Self {
256 operation: BulkOperation {
257 header: BulkHeader {
258 action: BulkAction::Create,
259 metadata: BulkMetadata {
260 _id: None,
261 ..Default::default()
262 },
263 },
264 source: Some(source),
265 },
266 }
267 }
268
269 pub fn index<S>(mut self, index: S) -> Self
276 where
277 S: Into<String>,
278 {
279 self.operation.header.metadata._index = Some(index.into());
280 self
281 }
282
283 pub fn pipeline<S>(mut self, pipeline: S) -> Self
285 where
286 S: Into<String>,
287 {
288 self.operation.header.metadata.pipeline = Some(pipeline.into());
289 self
290 }
291
292 pub fn routing<S>(mut self, routing: S) -> Self
294 where
295 S: Into<String>,
296 {
297 self.operation.header.metadata.routing = Some(routing.into());
298 self
299 }
300}
301
302impl<B> From<BulkCreateOperation<B>> for BulkOperation<B> {
303 fn from(b: BulkCreateOperation<B>) -> Self {
304 b.operation
305 }
306}
307
308pub struct BulkIndexOperation<B> {
310 operation: BulkOperation<B>,
311}
312
313impl<B> BulkIndexOperation<B> {
314 pub fn new(source: B) -> Self {
316 Self {
317 operation: BulkOperation {
318 header: BulkHeader {
319 action: BulkAction::Index,
320 metadata: BulkMetadata {
321 ..Default::default()
322 },
323 },
324 source: Some(source),
325 },
326 }
327 }
328
329 pub fn id<S>(mut self, id: S) -> Self
334 where
335 S: Into<String>,
336 {
337 self.operation.header.metadata._id = Some(id.into());
338 self
339 }
340
341 pub fn index<S>(mut self, index: S) -> Self
348 where
349 S: Into<String>,
350 {
351 self.operation.header.metadata._index = Some(index.into());
352 self
353 }
354
355 pub fn pipeline<S>(mut self, pipeline: S) -> Self
357 where
358 S: Into<String>,
359 {
360 self.operation.header.metadata.pipeline = Some(pipeline.into());
361 self
362 }
363
364 pub fn routing<S>(mut self, routing: S) -> Self
366 where
367 S: Into<String>,
368 {
369 self.operation.header.metadata.routing = Some(routing.into());
370 self
371 }
372
373 pub fn if_seq_no(mut self, seq_no: i64) -> Self {
375 self.operation.header.metadata.if_seq_no = Some(seq_no);
376 self
377 }
378
379 pub fn if_primary_term(mut self, primary_term: i64) -> Self {
382 self.operation.header.metadata.if_primary_term = Some(primary_term);
383 self
384 }
385
386 pub fn version(mut self, version: i64) -> Self {
388 self.operation.header.metadata.version = Some(version);
389 self
390 }
391
392 pub fn version_type(mut self, version_type: VersionType) -> Self {
394 self.operation.header.metadata.version_type = Some(version_type);
395 self
396 }
397}
398
399impl<B> From<BulkIndexOperation<B>> for BulkOperation<B> {
400 fn from(b: BulkIndexOperation<B>) -> Self {
401 b.operation
402 }
403}
404
405pub struct BulkDeleteOperation<B> {
411 operation: BulkOperation<B>,
412}
413
414impl<B> BulkDeleteOperation<B> {
415 pub fn new<S>(id: S) -> Self
417 where
418 S: Into<String>,
419 {
420 Self {
421 operation: BulkOperation {
422 header: BulkHeader {
423 action: BulkAction::Delete,
424 metadata: BulkMetadata {
425 _id: Some(id.into()),
426 ..Default::default()
427 },
428 },
429 source: Option::<B>::None,
430 },
431 }
432 }
433
434 pub fn index<S>(mut self, index: S) -> Self
441 where
442 S: Into<String>,
443 {
444 self.operation.header.metadata._index = Some(index.into());
445 self
446 }
447
448 pub fn routing<S>(mut self, routing: S) -> Self
450 where
451 S: Into<String>,
452 {
453 self.operation.header.metadata.routing = Some(routing.into());
454 self
455 }
456
457 pub fn if_seq_no(mut self, seq_no: i64) -> Self {
459 self.operation.header.metadata.if_seq_no = Some(seq_no);
460 self
461 }
462
463 pub fn if_primary_term(mut self, primary_term: i64) -> Self {
466 self.operation.header.metadata.if_primary_term = Some(primary_term);
467 self
468 }
469
470 pub fn version(mut self, version: i64) -> Self {
472 self.operation.header.metadata.version = Some(version);
473 self
474 }
475
476 pub fn version_type(mut self, version_type: VersionType) -> Self {
478 self.operation.header.metadata.version_type = Some(version_type);
479 self
480 }
481}
482
483impl<B> From<BulkDeleteOperation<B>> for BulkOperation<B> {
484 fn from(b: BulkDeleteOperation<B>) -> Self {
485 b.operation
486 }
487}
488
489pub struct BulkUpdateOperation<B> {
491 operation: BulkOperation<B>,
492}
493
494impl<B> BulkUpdateOperation<B>
495where
496 B: serde::Serialize,
497{
498 pub fn new<S>(id: S, source: B) -> Self
500 where
501 S: Into<String>,
502 {
503 Self {
504 operation: BulkOperation {
505 header: BulkHeader {
506 action: BulkAction::Update,
507 metadata: BulkMetadata {
508 _id: Some(id.into()),
509 ..Default::default()
510 },
511 },
512 source: Some(source),
513 },
514 }
515 }
516
517 pub fn index<S>(mut self, index: S) -> Self
524 where
525 S: Into<String>,
526 {
527 self.operation.header.metadata._index = Some(index.into());
528 self
529 }
530
531 pub fn routing<S>(mut self, routing: S) -> Self
533 where
534 S: Into<String>,
535 {
536 self.operation.header.metadata.routing = Some(routing.into());
537 self
538 }
539
540 pub fn if_seq_no(mut self, seq_no: i64) -> Self {
542 self.operation.header.metadata.if_seq_no = Some(seq_no);
543 self
544 }
545
546 pub fn if_primary_term(mut self, primary_term: i64) -> Self {
549 self.operation.header.metadata.if_primary_term = Some(primary_term);
550 self
551 }
552
553 pub fn version(mut self, version: i64) -> Self {
555 self.operation.header.metadata.version = Some(version);
556 self
557 }
558
559 pub fn version_type(mut self, version_type: VersionType) -> Self {
561 self.operation.header.metadata.version_type = Some(version_type);
562 self
563 }
564
565 pub fn retry_on_conflict(mut self, retry_on_conflict: i32) -> Self {
567 self.operation.header.metadata.retry_on_conflict = Some(retry_on_conflict);
568 self
569 }
570
571 pub fn source<S>(mut self, source: S) -> Self
575 where
576 S: Into<SourceFilter>,
577 {
578 self.operation.header.metadata._source = Some(source.into());
579 self
580 }
581}
582
583impl<B> From<BulkUpdateOperation<B>> for BulkOperation<B> {
584 fn from(b: BulkUpdateOperation<B>) -> Self {
585 b.operation
586 }
587}
588
589pub struct BulkOperations {
649 buf: BytesMut,
650}
651
652impl BulkOperations {
653 pub fn new() -> Self {
655 Self {
656 buf: BytesMut::new(),
657 }
658 }
659
660 pub fn with_bytes(buf: BytesMut) -> Self {
663 Self { buf }
664 }
665
666 pub fn push<O, B>(&mut self, op: O) -> Result<(), Error>
670 where
671 O: Into<BulkOperation<B>>,
672 B: Serialize,
673 {
674 op.into().write(&mut self.buf)
675 }
676}
677
678impl Default for BulkOperations {
679 fn default() -> Self {
680 Self::new()
681 }
682}
683
684impl Body for BulkOperations {
685 fn bytes(&self) -> Option<Bytes> {
686 Some(self.buf.clone().freeze())
687 }
688
689 fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
690 self.buf.write(bytes)
691 }
692}
693
694#[cfg(test)]
695mod tests {
696 use crate::{
697 http::request::{Body, NdBody},
698 params::VersionType,
699 BulkOperation, BulkOperations,
700 };
701 use bytes::{BufMut, BytesMut};
702 use serde::Serialize;
703 use serde_json::{json, Value};
704 use std::{cmp::Ordering, str};
705
706 pub fn compare(a: &[u8], b: &[u8]) -> Ordering {
707 a.iter()
708 .zip(b)
709 .map(|(x, y)| x.cmp(y))
710 .find(|&ord| ord != Ordering::Equal)
711 .unwrap_or_else(|| a.len().cmp(&b.len()))
712 }
713
714 #[test]
715 fn serialize_bulk_operations_with_same_type_writes_to_bytes() -> anyhow::Result<()> {
716 let mut bytes = BytesMut::new();
717 let mut ops: Vec<BulkOperation<Value>> = Vec::with_capacity(4);
718
719 ops.push(
720 BulkOperation::index(json!({ "foo": "index" }))
721 .id("1")
722 .pipeline("pipeline")
723 .routing("routing")
724 .if_seq_no(1)
725 .if_primary_term(2)
726 .version(3)
727 .version_type(VersionType::Internal)
728 .into(),
729 );
730 ops.push(
731 BulkOperation::create("2", json!({ "bar": "create" }))
732 .pipeline("pipeline")
733 .routing("routing")
734 .index("create_index")
735 .into(),
736 );
737 ops.push(
738 BulkOperation::update("3", json!({ "baz": "update_1" }))
739 .source(false)
740 .into(),
741 );
742 ops.push(
743 BulkOperation::update("4", json!({ "baz": "update_2" }))
744 .source("baz")
745 .into(),
746 );
747 ops.push(
748 BulkOperation::update("5", json!({ "baz": "update_3" }))
749 .source(vec!["baz"])
750 .into(),
751 );
752 ops.push(
753 BulkOperation::update("6", json!({ "baz": "update_4" }))
754 .source((vec!["baz"], vec!["bar"]))
755 .into(),
756 );
757 ops.push(BulkOperation::delete("7").into());
758
759 let body = NdBody(ops);
760 body.write(&mut bytes)?;
761
762 let mut expected = BytesMut::new();
763 expected.put_slice(b"{\"index\":{\"_id\":\"1\",\"pipeline\":\"pipeline\",\"if_seq_no\":1,\"if_primary_term\":2,\"routing\":\"routing\",\"version\":3,\"version_type\":\"internal\"}}\n");
764 expected.put_slice(b"{\"foo\":\"index\"}\n");
765 expected.put_slice(b"{\"create\":{\"_index\":\"create_index\",\"_id\":\"2\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n");
766 expected.put_slice(b"{\"bar\":\"create\"}\n");
767 expected.put_slice(b"{\"update\":{\"_id\":\"3\",\"_source\":false}}\n");
768 expected.put_slice(b"{\"baz\":\"update_1\"}\n");
769 expected.put_slice(b"{\"update\":{\"_id\":\"4\",\"_source\":\"baz\"}}\n");
770 expected.put_slice(b"{\"baz\":\"update_2\"}\n");
771 expected.put_slice(b"{\"update\":{\"_id\":\"5\",\"_source\":[\"baz\"]}}\n");
772 expected.put_slice(b"{\"baz\":\"update_3\"}\n");
773 expected.put_slice(b"{\"update\":{\"_id\":\"6\",\"_source\":{\"includes\":[\"baz\"],\"excludes\":[\"bar\"]}}}\n");
774 expected.put_slice(b"{\"baz\":\"update_4\"}\n");
775 expected.put_slice(b"{\"delete\":{\"_id\":\"7\"}}\n");
776
777 assert_eq!(
778 compare(&expected[..], &bytes[..]),
779 Ordering::Equal,
780 "expected {} but found {}",
781 str::from_utf8(&expected[..]).unwrap(),
782 str::from_utf8(&bytes[..]).unwrap()
783 );
784 Ok(())
785 }
786
787 #[test]
788 fn serialize_bulk_operations_with_different_types_writes_to_bytes() -> anyhow::Result<()> {
789 #[derive(Serialize)]
790 struct IndexDoc<'a> {
791 foo: &'a str,
792 }
793 #[derive(Serialize)]
794 struct CreateDoc<'a> {
795 bar: &'a str,
796 }
797 #[derive(Serialize)]
798 struct UpdateDoc<'a> {
799 baz: &'a str,
800 }
801
802 let mut bytes = BytesMut::new();
803 let mut ops = BulkOperations::new();
804
805 ops.push(
806 BulkOperation::index(IndexDoc { foo: "index" })
807 .id("1")
808 .pipeline("pipeline")
809 .index("index_doc")
810 .routing("routing"),
811 )?;
812 ops.push(BulkOperation::create("2", CreateDoc { bar: "create" }))?;
813 ops.push(BulkOperation::create_without_id(CreateDoc {
814 bar: "create",
815 }))?;
816 ops.push(BulkOperation::update("3", UpdateDoc { baz: "update" }))?;
817 ops.push(BulkOperation::<()>::delete("4"))?;
818
819 let body = NdBody(vec![ops]);
820 body.write(&mut bytes)?;
821
822 let mut expected = BytesMut::new();
823 expected.put_slice(b"{\"index\":{\"_index\":\"index_doc\",\"_id\":\"1\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n");
824 expected.put_slice(b"{\"foo\":\"index\"}\n");
825 expected.put_slice(b"{\"create\":{\"_id\":\"2\"}}\n");
826 expected.put_slice(b"{\"bar\":\"create\"}\n");
827 expected.put_slice(b"{\"create\":{}}\n");
828 expected.put_slice(b"{\"bar\":\"create\"}\n");
829 expected.put_slice(b"{\"update\":{\"_id\":\"3\"}}\n");
830 expected.put_slice(b"{\"baz\":\"update\"}\n");
831 expected.put_slice(b"{\"delete\":{\"_id\":\"4\"}}\n");
832
833 assert_eq!(
834 compare(&expected[..], &bytes[..]),
835 Ordering::Equal,
836 "expected {} but found {}",
837 str::from_utf8(&expected[..]).unwrap(),
838 str::from_utf8(&bytes[..]).unwrap()
839 );
840 Ok(())
841 }
842}