opensearch/root/
bulk.rs

1/*
2 * Licensed to Elasticsearch B.V. under one or more contributor
3 * license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright
5 * ownership. Elasticsearch B.V. licenses this file to you under
6 * the Apache License, Version 2.0 (the "License"); you may
7 * not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *	http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20/*
21 * SPDX-License-Identifier: Apache-2.0
22 *
23 * The OpenSearch Contributors require contributions made to
24 * this file be licensed under the Apache-2.0 license or a
25 * compatible open source license.
26 *
27 * Modifications Copyright OpenSearch Contributors. See
28 * GitHub history for details.
29 */
30
31use crate::{
32    http::request::Body,
33    params::{SourceFilter, VersionType},
34    Error,
35};
36use bytes::{BufMut, Bytes, BytesMut};
37use serde::{
38    ser::{SerializeMap, Serializer},
39    Deserialize, Serialize,
40};
41
42/// Bulk operation action
43#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
44enum BulkAction {
45    /// Index a document
46    #[serde(rename = "index")]
47    Index,
48    /// Create a new document
49    #[serde(rename = "create")]
50    Create,
51    /// Update an existing document
52    #[serde(rename = "update")]
53    Update,
54    /// Delete an existing document
55    #[serde(rename = "delete")]
56    Delete,
57}
58
59/// Bulk operation metadata
60///
61/// the specific bulk action metadata such as the id of the source document, index, etc.
62#[serde_with::skip_serializing_none]
63#[derive(Serialize, Default)]
64struct BulkMetadata {
65    _index: Option<String>,
66    _id: Option<String>,
67    pipeline: Option<String>,
68    if_seq_no: Option<i64>,
69    if_primary_term: Option<i64>,
70    routing: Option<String>,
71    retry_on_conflict: Option<i32>,
72    _source: Option<SourceFilter>,
73    version: Option<i64>,
74    version_type: Option<VersionType>,
75}
76
77/// Bulk operation header
78///
79/// The header contains the bulk action and the specific action metadata
80/// such as the id of the source document, index, etc.
81struct BulkHeader {
82    action: BulkAction,
83    metadata: BulkMetadata,
84}
85
86impl Serialize for BulkHeader {
87    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
88    where
89        S: Serializer,
90    {
91        let mut map = serializer.serialize_map(Some(1))?;
92        let action = match self.action {
93            BulkAction::Create => "create",
94            BulkAction::Delete => "delete",
95            BulkAction::Index => "index",
96            BulkAction::Update => "update",
97        };
98        map.serialize_entry(action, &self.metadata)?;
99        map.end()
100    }
101}
102
103/// A bulk operation consists of a header that indicates the bulk action and the related metadata
104/// for the action, and an optional source document.
105///
106/// A collection of bulk operations can be sent to the [Bulk API](struct.Bulk.html) in the body of the API call.
107///
108/// For serializing a collection of bulk operations that model the source document of each bulk operation
109/// using different structs, take a look at [BulkOperations].
110///
111/// # Example
112///
113/// Using [serde_json]'s `json!` macro to constuct [serde_json::Value] from JSON literals, for
114/// the source document of each bulk operation
115///
116/// ```rust,no_run
117/// # use opensearch::{
118/// #     BulkOperation,
119/// #     BulkParts,
120/// #     Error, OpenSearch,
121/// # };
122/// # use url::Url;
123/// # use serde_json::{json, Value};
124/// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
125/// # let client = OpenSearch::default();
126/// let mut ops: Vec<BulkOperation<Value>> = Vec::with_capacity(4);
127/// ops.push(BulkOperation::index(json!({
128///         "user": "kimchy",
129///         "post_date": "2009-11-15T00:00:00Z",
130///         "message": "Trying out OpenSearch, so far so good?"
131///     }))
132///     .id("1")
133///     .pipeline("process_tweet")
134///     .into()
135/// );
136/// ops.push(BulkOperation::create("2", json!({
137///         "user": "forloop",
138///         "post_date": "2020-01-08T00:00:00Z",
139///         "message": "Indexing with the rust client, yeah!"
140///     }))
141///     .pipeline("process_tweet")
142///     .into()
143/// );
144/// ops.push(BulkOperation::update("3", json!({
145///         "doc": {
146///             "message": "Tweets are _meant_ to be immutable!"
147///         },
148///         "doc_as_upsert": true
149///     }))
150///     .into()
151/// );
152/// ops.push(BulkOperation::delete("4")
153///     .index("old_tweets")
154///     .into()
155/// );
156///
157/// let bulk_response = client.bulk(BulkParts::Index("tweets"))
158///     .body(ops)
159///     .send()
160///     .await?;
161///
162/// # Ok(())
163/// # }
164/// ```
165pub struct BulkOperation<B> {
166    header: BulkHeader,
167    source: Option<B>,
168}
169
170impl<B> BulkOperation<B>
171where
172    B: Serialize,
173{
174    /// Creates a new instance of a [bulk create operation](BulkCreateOperation)
175    pub fn create<S>(id: S, source: B) -> BulkCreateOperation<B>
176    where
177        S: Into<String>,
178    {
179        BulkCreateOperation::new(id, source)
180    }
181
182    /// Creates a new instance of [bulk create operation](BulkCreateOperation) without an explicit id
183    pub fn create_without_id(source: B) -> BulkCreateOperation<B> {
184        BulkCreateOperation::new_without_id(source)
185    }
186
187    /// Creates a new instance of a [bulk index operation](BulkIndexOperation)
188    pub fn index(source: B) -> BulkIndexOperation<B> {
189        BulkIndexOperation::new(source)
190    }
191
192    /// Creates a new instance of a [bulk delete operation](BulkDeleteOperation)
193    pub fn delete<S>(id: S) -> BulkDeleteOperation<B>
194    where
195        S: Into<String>,
196    {
197        BulkDeleteOperation::new(id)
198    }
199
200    /// Creates a new instance of a [bulk update operation](BulkUpdateOperation)
201    pub fn update<S>(id: S, source: B) -> BulkUpdateOperation<B>
202    where
203        S: Into<String>,
204    {
205        BulkUpdateOperation::new(id, source)
206    }
207}
208
209impl<B> Body for BulkOperation<B>
210where
211    B: Serialize,
212{
213    fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
214        let writer = bytes.writer();
215        serde_json::to_writer(writer, &self.header)?;
216        bytes.put_u8(b'\n');
217
218        if let Some(source) = &self.source {
219            let writer = bytes.writer();
220            serde_json::to_writer(writer, source)?;
221            bytes.put_u8(b'\n');
222        }
223
224        Ok(())
225    }
226}
227
228/// Bulk create operation
229pub struct BulkCreateOperation<B> {
230    operation: BulkOperation<B>,
231}
232
233impl<B> BulkCreateOperation<B> {
234    /// Creates a new instance of [BulkCreateOperation]
235    pub fn new<S>(id: S, source: B) -> Self
236    where
237        S: Into<String>,
238    {
239        Self {
240            operation: BulkOperation {
241                header: BulkHeader {
242                    action: BulkAction::Create,
243                    metadata: BulkMetadata {
244                        _id: Some(id.into()),
245                        ..Default::default()
246                    },
247                },
248                source: Some(source),
249            },
250        }
251    }
252
253    /// Creates a new instance of [BulkCreateOperation] without an explicit id
254    pub fn new_without_id(source: B) -> Self {
255        Self {
256            operation: BulkOperation {
257                header: BulkHeader {
258                    action: BulkAction::Create,
259                    metadata: BulkMetadata {
260                        _id: None,
261                        ..Default::default()
262                    },
263                },
264                source: Some(source),
265            },
266        }
267    }
268
269    /// Specify the name of the index to perform the bulk update operation against.
270    ///
271    /// Each bulk operation can specify an index to operate against. If all bulk operations
272    /// in one Bulk API call will operate against the same index, specify
273    /// the index on [Bulk](struct.Bulk.html) using [BulkParts::Index](enum.BulkParts.html),
274    /// and omit specifying the index on each bulk operation.
275    pub fn index<S>(mut self, index: S) -> Self
276    where
277        S: Into<String>,
278    {
279        self.operation.header.metadata._index = Some(index.into());
280        self
281    }
282
283    /// The ID of the pipeline to use to preprocess incoming documents
284    pub fn pipeline<S>(mut self, pipeline: S) -> Self
285    where
286        S: Into<String>,
287    {
288        self.operation.header.metadata.pipeline = Some(pipeline.into());
289        self
290    }
291
292    /// Target the specified primary shard
293    pub fn routing<S>(mut self, routing: S) -> Self
294    where
295        S: Into<String>,
296    {
297        self.operation.header.metadata.routing = Some(routing.into());
298        self
299    }
300}
301
302impl<B> From<BulkCreateOperation<B>> for BulkOperation<B> {
303    fn from(b: BulkCreateOperation<B>) -> Self {
304        b.operation
305    }
306}
307
308/// Bulk index operation
309pub struct BulkIndexOperation<B> {
310    operation: BulkOperation<B>,
311}
312
313impl<B> BulkIndexOperation<B> {
314    /// Creates a new instance of [BulkIndexOperation]
315    pub fn new(source: B) -> Self {
316        Self {
317            operation: BulkOperation {
318                header: BulkHeader {
319                    action: BulkAction::Index,
320                    metadata: BulkMetadata {
321                        ..Default::default()
322                    },
323                },
324                source: Some(source),
325            },
326        }
327    }
328
329    /// Specify the id for the document
330    ///
331    /// If an id is not specified, OpenSearch will generate an id for the document
332    /// which will be returned in the response.
333    pub fn id<S>(mut self, id: S) -> Self
334    where
335        S: Into<String>,
336    {
337        self.operation.header.metadata._id = Some(id.into());
338        self
339    }
340
341    /// Specify the name of the index to perform the bulk update operation against.
342    ///
343    /// Each bulk operation can specify an index to operate against. If all bulk operations
344    /// in one Bulk API call will operate against the same index, specify
345    /// the index on [Bulk](struct.Bulk.html) using [BulkParts::Index](enum.BulkParts.html),
346    /// and omit specifying the index on each bulk operation.
347    pub fn index<S>(mut self, index: S) -> Self
348    where
349        S: Into<String>,
350    {
351        self.operation.header.metadata._index = Some(index.into());
352        self
353    }
354
355    /// The ID of the pipeline to use to preprocess incoming documents
356    pub fn pipeline<S>(mut self, pipeline: S) -> Self
357    where
358        S: Into<String>,
359    {
360        self.operation.header.metadata.pipeline = Some(pipeline.into());
361        self
362    }
363
364    /// Target the specified primary shard
365    pub fn routing<S>(mut self, routing: S) -> Self
366    where
367        S: Into<String>,
368    {
369        self.operation.header.metadata.routing = Some(routing.into());
370        self
371    }
372
373    /// specify a sequence number to use for optimistic concurrency control
374    pub fn if_seq_no(mut self, seq_no: i64) -> Self {
375        self.operation.header.metadata.if_seq_no = Some(seq_no);
376        self
377    }
378
379    // TODO? Should seq_no and primary_term be set together with one function call?
380    /// specify a primary term to use for optimistic concurrency control
381    pub fn if_primary_term(mut self, primary_term: i64) -> Self {
382        self.operation.header.metadata.if_primary_term = Some(primary_term);
383        self
384    }
385
386    /// specify a version number to use for optimistic concurrency control
387    pub fn version(mut self, version: i64) -> Self {
388        self.operation.header.metadata.version = Some(version);
389        self
390    }
391
392    /// The type of versioning used when a version is specified
393    pub fn version_type(mut self, version_type: VersionType) -> Self {
394        self.operation.header.metadata.version_type = Some(version_type);
395        self
396    }
397}
398
399impl<B> From<BulkIndexOperation<B>> for BulkOperation<B> {
400    fn from(b: BulkIndexOperation<B>) -> Self {
401        b.operation
402    }
403}
404
405/// Bulk delete operation
406///
407/// The bulk delete operation is generic over `B` to allow delete operations to be specified
408/// in a collection of operations over `B`, even though the source of any delete operation will
409/// always be `None`
410pub struct BulkDeleteOperation<B> {
411    operation: BulkOperation<B>,
412}
413
414impl<B> BulkDeleteOperation<B> {
415    /// Creates a new instance of [BulkDeleteOperation]
416    pub fn new<S>(id: S) -> Self
417    where
418        S: Into<String>,
419    {
420        Self {
421            operation: BulkOperation {
422                header: BulkHeader {
423                    action: BulkAction::Delete,
424                    metadata: BulkMetadata {
425                        _id: Some(id.into()),
426                        ..Default::default()
427                    },
428                },
429                source: Option::<B>::None,
430            },
431        }
432    }
433
434    /// Specify the name of the index to perform the bulk update operation against.
435    ///
436    /// Each bulk operation can specify an index to operate against. If all bulk operations
437    /// in one Bulk API call will operate against the same index, specify
438    /// the index on [Bulk](struct.Bulk.html) using [BulkParts::Index](enum.BulkParts.html),
439    /// and omit specifying the index on each bulk operation.
440    pub fn index<S>(mut self, index: S) -> Self
441    where
442        S: Into<String>,
443    {
444        self.operation.header.metadata._index = Some(index.into());
445        self
446    }
447
448    /// Target the specified primary shard
449    pub fn routing<S>(mut self, routing: S) -> Self
450    where
451        S: Into<String>,
452    {
453        self.operation.header.metadata.routing = Some(routing.into());
454        self
455    }
456
457    /// Specify a sequence number to use for optimistic concurrency control
458    pub fn if_seq_no(mut self, seq_no: i64) -> Self {
459        self.operation.header.metadata.if_seq_no = Some(seq_no);
460        self
461    }
462
463    // TODO? Should seq_no and primary_term be set together with one function call?
464    /// Specify a primary term to use for optimistic concurrency control
465    pub fn if_primary_term(mut self, primary_term: i64) -> Self {
466        self.operation.header.metadata.if_primary_term = Some(primary_term);
467        self
468    }
469
470    /// Specify a version number to use for optimistic concurrency control
471    pub fn version(mut self, version: i64) -> Self {
472        self.operation.header.metadata.version = Some(version);
473        self
474    }
475
476    /// The type of versioning used when a version is specified
477    pub fn version_type(mut self, version_type: VersionType) -> Self {
478        self.operation.header.metadata.version_type = Some(version_type);
479        self
480    }
481}
482
483impl<B> From<BulkDeleteOperation<B>> for BulkOperation<B> {
484    fn from(b: BulkDeleteOperation<B>) -> Self {
485        b.operation
486    }
487}
488
489/// Bulk update operation
490pub struct BulkUpdateOperation<B> {
491    operation: BulkOperation<B>,
492}
493
494impl<B> BulkUpdateOperation<B>
495where
496    B: serde::Serialize,
497{
498    /// Creates a new instance of [BulkUpdateOperation]
499    pub fn new<S>(id: S, source: B) -> Self
500    where
501        S: Into<String>,
502    {
503        Self {
504            operation: BulkOperation {
505                header: BulkHeader {
506                    action: BulkAction::Update,
507                    metadata: BulkMetadata {
508                        _id: Some(id.into()),
509                        ..Default::default()
510                    },
511                },
512                source: Some(source),
513            },
514        }
515    }
516
517    /// specify the name of the index to perform the bulk update operation against.
518    ///
519    /// Each bulk operation can specify an index to operate against. If all bulk operations
520    /// in one Bulk API call will operate against the same index, specify
521    /// the index on [Bulk](struct.Bulk.html) using [BulkParts::Index](enum.BulkParts.html),
522    /// and omit specifying the index on each bulk operation.
523    pub fn index<S>(mut self, index: S) -> Self
524    where
525        S: Into<String>,
526    {
527        self.operation.header.metadata._index = Some(index.into());
528        self
529    }
530
531    /// Target the specified primary shard
532    pub fn routing<S>(mut self, routing: S) -> Self
533    where
534        S: Into<String>,
535    {
536        self.operation.header.metadata.routing = Some(routing.into());
537        self
538    }
539
540    /// specify a sequence number to use for optimistic concurrency control
541    pub fn if_seq_no(mut self, seq_no: i64) -> Self {
542        self.operation.header.metadata.if_seq_no = Some(seq_no);
543        self
544    }
545
546    // TODO? Should seq_no and primary_term be set together with one function call?
547    /// specify a primary term to use for optimistic concurrency control
548    pub fn if_primary_term(mut self, primary_term: i64) -> Self {
549        self.operation.header.metadata.if_primary_term = Some(primary_term);
550        self
551    }
552
553    /// specify a version number to use for optimistic concurrency control
554    pub fn version(mut self, version: i64) -> Self {
555        self.operation.header.metadata.version = Some(version);
556        self
557    }
558
559    /// The type of versioning used when a version is specified
560    pub fn version_type(mut self, version_type: VersionType) -> Self {
561        self.operation.header.metadata.version_type = Some(version_type);
562        self
563    }
564
565    /// specify how many times an update should be retried in the case of a version conflict
566    pub fn retry_on_conflict(mut self, retry_on_conflict: i32) -> Self {
567        self.operation.header.metadata.retry_on_conflict = Some(retry_on_conflict);
568        self
569    }
570
571    /// specify how the `_source` field is returned for the update operation.
572    ///
573    /// This can also be specified as part of the update action source payload instead.
574    pub fn source<S>(mut self, source: S) -> Self
575    where
576        S: Into<SourceFilter>,
577    {
578        self.operation.header.metadata._source = Some(source.into());
579        self
580    }
581}
582
583impl<B> From<BulkUpdateOperation<B>> for BulkOperation<B> {
584    fn from(b: BulkUpdateOperation<B>) -> Self {
585        b.operation
586    }
587}
588
589/// A collection of bulk operations.
590///
591/// A collection of bulk operations can perform operations against multiple different indices,
592/// specifying a different source document for each. When modelling source documents with
593/// different structs, it becomes difficult to construct a collection of bulk operations with such
594/// a setup. [BulkOperations] alleviates this difficulty by serializing bulk operations ahead of
595/// time of the bulk API call, into an internal byte buffer, using the buffered bytes as the body of
596/// the bulk API call.
597///
598/// # Example
599///
600/// Using [BulkOperations] to construct a collection of bulk operations that use different
601/// structs to model source documents
602///
603/// ```rust,no_run
604/// # use opensearch::{
605/// #     BulkOperation,
606/// #     BulkOperations,
607/// #     BulkParts,
608/// #     Error, OpenSearch,
609/// # };
610/// # use serde::Serialize;
611/// # use serde_json::{json, Value};
612/// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
613/// # let client = OpenSearch::default();
614/// #[derive(Serialize)]
615/// struct IndexDoc<'a> {
616///     foo: &'a str,
617/// }
618///
619/// #[derive(Serialize)]
620/// struct CreateDoc<'a> {
621///     bar: &'a str,
622/// }
623///
624/// #[derive(Serialize)]
625/// struct UpdateDoc<'a> {
626///     baz: &'a str,
627/// }
628///
629/// let mut ops = BulkOperations::new();
630/// ops.push(BulkOperation::index(IndexDoc { foo: "index" })
631///     .id("1")
632///     .pipeline("pipeline")
633///     .index("index_doc")
634///     .routing("routing")
635/// )?;
636/// ops.push(BulkOperation::create("2", CreateDoc { bar: "create" }))?;
637/// ops.push(BulkOperation::update("3", UpdateDoc { baz: "update" }))?;
638/// ops.push(BulkOperation::<()>::delete("4"))?;
639///
640/// let bulk_response = client.bulk(BulkParts::Index("tweets"))
641///     .body(vec![ops])
642///     .send()
643///     .await?;
644///
645/// # Ok(())
646/// # }
647/// ```
648pub struct BulkOperations {
649    buf: BytesMut,
650}
651
652impl BulkOperations {
653    /// Initializes a new instance of [BulkOperations]
654    pub fn new() -> Self {
655        Self {
656            buf: BytesMut::new(),
657        }
658    }
659
660    /// Initializes a new instance of [BulkOperations], using the passed
661    /// [bytes::BytesMut] as the buffer to write operations to
662    pub fn with_bytes(buf: BytesMut) -> Self {
663        Self { buf }
664    }
665
666    /// Pushes a bulk operation into the collection of bulk operations.
667    ///
668    /// The operation is serialized and written to the underlying byte buffer.
669    pub fn push<O, B>(&mut self, op: O) -> Result<(), Error>
670    where
671        O: Into<BulkOperation<B>>,
672        B: Serialize,
673    {
674        op.into().write(&mut self.buf)
675    }
676}
677
678impl Default for BulkOperations {
679    fn default() -> Self {
680        Self::new()
681    }
682}
683
684impl Body for BulkOperations {
685    fn bytes(&self) -> Option<Bytes> {
686        Some(self.buf.clone().freeze())
687    }
688
689    fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
690        self.buf.write(bytes)
691    }
692}
693
694#[cfg(test)]
695mod tests {
696    use crate::{
697        http::request::{Body, NdBody},
698        params::VersionType,
699        BulkOperation, BulkOperations,
700    };
701    use bytes::{BufMut, BytesMut};
702    use serde::Serialize;
703    use serde_json::{json, Value};
704    use std::{cmp::Ordering, str};
705
706    pub fn compare(a: &[u8], b: &[u8]) -> Ordering {
707        a.iter()
708            .zip(b)
709            .map(|(x, y)| x.cmp(y))
710            .find(|&ord| ord != Ordering::Equal)
711            .unwrap_or_else(|| a.len().cmp(&b.len()))
712    }
713
714    #[test]
715    fn serialize_bulk_operations_with_same_type_writes_to_bytes() -> anyhow::Result<()> {
716        let mut bytes = BytesMut::new();
717        let mut ops: Vec<BulkOperation<Value>> = Vec::with_capacity(4);
718
719        ops.push(
720            BulkOperation::index(json!({ "foo": "index" }))
721                .id("1")
722                .pipeline("pipeline")
723                .routing("routing")
724                .if_seq_no(1)
725                .if_primary_term(2)
726                .version(3)
727                .version_type(VersionType::Internal)
728                .into(),
729        );
730        ops.push(
731            BulkOperation::create("2", json!({ "bar": "create" }))
732                .pipeline("pipeline")
733                .routing("routing")
734                .index("create_index")
735                .into(),
736        );
737        ops.push(
738            BulkOperation::update("3", json!({ "baz": "update_1" }))
739                .source(false)
740                .into(),
741        );
742        ops.push(
743            BulkOperation::update("4", json!({ "baz": "update_2" }))
744                .source("baz")
745                .into(),
746        );
747        ops.push(
748            BulkOperation::update("5", json!({ "baz": "update_3" }))
749                .source(vec!["baz"])
750                .into(),
751        );
752        ops.push(
753            BulkOperation::update("6", json!({ "baz": "update_4" }))
754                .source((vec!["baz"], vec!["bar"]))
755                .into(),
756        );
757        ops.push(BulkOperation::delete("7").into());
758
759        let body = NdBody(ops);
760        body.write(&mut bytes)?;
761
762        let mut expected = BytesMut::new();
763        expected.put_slice(b"{\"index\":{\"_id\":\"1\",\"pipeline\":\"pipeline\",\"if_seq_no\":1,\"if_primary_term\":2,\"routing\":\"routing\",\"version\":3,\"version_type\":\"internal\"}}\n");
764        expected.put_slice(b"{\"foo\":\"index\"}\n");
765        expected.put_slice(b"{\"create\":{\"_index\":\"create_index\",\"_id\":\"2\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n");
766        expected.put_slice(b"{\"bar\":\"create\"}\n");
767        expected.put_slice(b"{\"update\":{\"_id\":\"3\",\"_source\":false}}\n");
768        expected.put_slice(b"{\"baz\":\"update_1\"}\n");
769        expected.put_slice(b"{\"update\":{\"_id\":\"4\",\"_source\":\"baz\"}}\n");
770        expected.put_slice(b"{\"baz\":\"update_2\"}\n");
771        expected.put_slice(b"{\"update\":{\"_id\":\"5\",\"_source\":[\"baz\"]}}\n");
772        expected.put_slice(b"{\"baz\":\"update_3\"}\n");
773        expected.put_slice(b"{\"update\":{\"_id\":\"6\",\"_source\":{\"includes\":[\"baz\"],\"excludes\":[\"bar\"]}}}\n");
774        expected.put_slice(b"{\"baz\":\"update_4\"}\n");
775        expected.put_slice(b"{\"delete\":{\"_id\":\"7\"}}\n");
776
777        assert_eq!(
778            compare(&expected[..], &bytes[..]),
779            Ordering::Equal,
780            "expected {} but found {}",
781            str::from_utf8(&expected[..]).unwrap(),
782            str::from_utf8(&bytes[..]).unwrap()
783        );
784        Ok(())
785    }
786
787    #[test]
788    fn serialize_bulk_operations_with_different_types_writes_to_bytes() -> anyhow::Result<()> {
789        #[derive(Serialize)]
790        struct IndexDoc<'a> {
791            foo: &'a str,
792        }
793        #[derive(Serialize)]
794        struct CreateDoc<'a> {
795            bar: &'a str,
796        }
797        #[derive(Serialize)]
798        struct UpdateDoc<'a> {
799            baz: &'a str,
800        }
801
802        let mut bytes = BytesMut::new();
803        let mut ops = BulkOperations::new();
804
805        ops.push(
806            BulkOperation::index(IndexDoc { foo: "index" })
807                .id("1")
808                .pipeline("pipeline")
809                .index("index_doc")
810                .routing("routing"),
811        )?;
812        ops.push(BulkOperation::create("2", CreateDoc { bar: "create" }))?;
813        ops.push(BulkOperation::create_without_id(CreateDoc {
814            bar: "create",
815        }))?;
816        ops.push(BulkOperation::update("3", UpdateDoc { baz: "update" }))?;
817        ops.push(BulkOperation::<()>::delete("4"))?;
818
819        let body = NdBody(vec![ops]);
820        body.write(&mut bytes)?;
821
822        let mut expected = BytesMut::new();
823        expected.put_slice(b"{\"index\":{\"_index\":\"index_doc\",\"_id\":\"1\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n");
824        expected.put_slice(b"{\"foo\":\"index\"}\n");
825        expected.put_slice(b"{\"create\":{\"_id\":\"2\"}}\n");
826        expected.put_slice(b"{\"bar\":\"create\"}\n");
827        expected.put_slice(b"{\"create\":{}}\n");
828        expected.put_slice(b"{\"bar\":\"create\"}\n");
829        expected.put_slice(b"{\"update\":{\"_id\":\"3\"}}\n");
830        expected.put_slice(b"{\"baz\":\"update\"}\n");
831        expected.put_slice(b"{\"delete\":{\"_id\":\"4\"}}\n");
832
833        assert_eq!(
834            compare(&expected[..], &bytes[..]),
835            Ordering::Equal,
836            "expected {} but found {}",
837            str::from_utf8(&expected[..]).unwrap(),
838            str::from_utf8(&bytes[..]).unwrap()
839        );
840        Ok(())
841    }
842}