aditjind_crate/root/
bulk.rs

1/*
2 * Licensed to Elasticsearch B.V. under one or more contributor
3 * license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright
5 * ownership. Elasticsearch B.V. licenses this file to you under
6 * the Apache License, Version 2.0 (the "License"); you may
7 * not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *	http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied.  See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20/*
21 * SPDX-License-Identifier: Apache-2.0
22 *
23 * The OpenSearch Contributors require contributions made to
24 * this file be licensed under the Apache-2.0 license or a
25 * compatible open source license.
26 *
27 * Modifications Copyright OpenSearch Contributors. See
28 * GitHub history for details.
29 */
30
31use crate::{
32    http::request::Body,
33    params::{SourceFilter, VersionType},
34    Error,
35};
36use bytes::{BufMut, Bytes, BytesMut};
37use serde::{
38    ser::{SerializeMap, Serializer},
39    Deserialize, Serialize,
40};
41
42/// Bulk operation action
43#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
44enum BulkAction {
45    /// Index a document
46    #[serde(rename = "index")]
47    Index,
48    /// Create a new document
49    #[serde(rename = "create")]
50    Create,
51    /// Update an existing document
52    #[serde(rename = "update")]
53    Update,
54    /// Delete an existing document
55    #[serde(rename = "delete")]
56    Delete,
57}
58
59/// Bulk operation metadata
60///
61/// the specific bulk action metadata such as the id of the source document, index, etc.
62#[serde_with::skip_serializing_none]
63#[derive(Serialize, Default)]
64struct BulkMetadata {
65    _index: Option<String>,
66    // TODO: intentionally omit type for now, as it's going away.
67    //_type: Option<String>,
68    _id: Option<String>,
69    pipeline: Option<String>,
70    if_seq_no: Option<i64>,
71    if_primary_term: Option<i64>,
72    routing: Option<String>,
73    retry_on_conflict: Option<i32>,
74    _source: Option<SourceFilter>,
75    version: Option<i64>,
76    version_type: Option<VersionType>,
77}
78
79/// Bulk operation header
80///
81/// The header contains the bulk action and the specific action metadata
82/// such as the id of the source document, index, etc.
83struct BulkHeader {
84    action: BulkAction,
85    metadata: BulkMetadata,
86}
87
88impl Serialize for BulkHeader {
89    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
90    where
91        S: Serializer,
92    {
93        let mut map = serializer.serialize_map(Some(1))?;
94        let action = match self.action {
95            BulkAction::Create => "create",
96            BulkAction::Delete => "delete",
97            BulkAction::Index => "index",
98            BulkAction::Update => "update",
99        };
100        map.serialize_entry(action, &self.metadata)?;
101        map.end()
102    }
103}
104
105/// A bulk operation consists of a header that indicates the bulk action and the related metadata
106/// for the action, and an optional source document.
107///
108/// A collection of bulk operations can be sent to the [Bulk API](struct.Bulk.html) in the body of the API call.
109///
110/// For serializing a collection of bulk operations that model the source document of each bulk operation
111/// using different structs, take a look at [BulkOperations].
112///
113/// # Example
114///
115/// Using [serde_json]'s `json!` macro to constuct [serde_json::Value] from JSON literals, for
116/// the source document of each bulk operation
117///
118/// ```rust,no_run
119/// # use opensearch::{
120/// #     BulkOperation,
121/// #     BulkParts,
122/// #     Error, OpenSearch,
123/// # };
124/// # use url::Url;
125/// # use serde_json::{json, Value};
126/// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
127/// # let client = OpenSearch::default();
128/// let mut ops: Vec<BulkOperation<Value>> = Vec::with_capacity(4);
129/// ops.push(BulkOperation::index(json!({
130///         "user": "kimchy",
131///         "post_date": "2009-11-15T00:00:00Z",
132///         "message": "Trying out Elasticsearch, so far so good?"
133///     }))
134///     .id("1")
135///     .pipeline("process_tweet")
136///     .into()
137/// );
138/// ops.push(BulkOperation::create("2", json!({
139///         "user": "forloop",
140///         "post_date": "2020-01-08T00:00:00Z",
141///         "message": "Indexing with the rust client, yeah!"
142///     }))
143///     .pipeline("process_tweet")
144///     .into()
145/// );
146/// ops.push(BulkOperation::update("3", json!({
147///         "doc": {
148///             "message": "Tweets are _meant_ to be immutable!"
149///         },
150///         "doc_as_upsert": true
151///     }))
152///     .into()
153/// );
154/// ops.push(BulkOperation::delete("4")
155///     .index("old_tweets")
156///     .into()
157/// );
158///
159/// let bulk_response = client.bulk(BulkParts::Index("tweets"))
160///     .body(ops)
161///     .send()
162///     .await?;
163///
164/// # Ok(())
165/// # }
166/// ```
167pub struct BulkOperation<B> {
168    header: BulkHeader,
169    source: Option<B>,
170}
171
172impl<B> BulkOperation<B>
173where
174    B: Serialize,
175{
176    /// Creates a new instance of a [bulk create operation](BulkCreateOperation)
177    pub fn create<S>(id: S, source: B) -> BulkCreateOperation<B>
178    where
179        S: Into<String>,
180    {
181        BulkCreateOperation::new(id, source)
182    }
183
184    /// Creates a new instance of a [bulk index operation](BulkIndexOperation)
185    pub fn index(source: B) -> BulkIndexOperation<B> {
186        BulkIndexOperation::new(source)
187    }
188
189    /// Creates a new instance of a [bulk delete operation](BulkDeleteOperation)
190    pub fn delete<S>(id: S) -> BulkDeleteOperation<B>
191    where
192        S: Into<String>,
193    {
194        BulkDeleteOperation::new(id)
195    }
196
197    /// Creates a new instance of a [bulk update operation](BulkUpdateOperation)
198    pub fn update<S>(id: S, source: B) -> BulkUpdateOperation<B>
199    where
200        S: Into<String>,
201    {
202        BulkUpdateOperation::new(id, source)
203    }
204}
205
206impl<B> Body for BulkOperation<B>
207where
208    B: Serialize,
209{
210    fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
211        let writer = bytes.writer();
212        serde_json::to_writer(writer, &self.header)?;
213        bytes.put_u8(b'\n');
214
215        if let Some(source) = &self.source {
216            let writer = bytes.writer();
217            serde_json::to_writer(writer, source)?;
218            bytes.put_u8(b'\n');
219        }
220
221        Ok(())
222    }
223}
224
225/// Bulk create operation
226pub struct BulkCreateOperation<B> {
227    operation: BulkOperation<B>,
228}
229
230impl<B> BulkCreateOperation<B> {
231    /// Creates a new instance of [BulkCreateOperation]
232    pub fn new<S>(id: S, source: B) -> Self
233    where
234        S: Into<String>,
235    {
236        Self {
237            operation: BulkOperation {
238                header: BulkHeader {
239                    action: BulkAction::Create,
240                    metadata: BulkMetadata {
241                        _id: Some(id.into()),
242                        ..Default::default()
243                    },
244                },
245                source: Some(source),
246            },
247        }
248    }
249
250    /// Specify the name of the index to perform the bulk update operation against.
251    ///
252    /// Each bulk operation can specify an index to operate against. If all bulk operations
253    /// in one Bulk API call will operate against the same index, specify
254    /// the index on [Bulk](struct.Bulk.html) using [BulkParts::Index](enum.BulkParts.html),
255    /// and omit specifying the index on each bulk operation.
256    pub fn index<S>(mut self, index: S) -> Self
257    where
258        S: Into<String>,
259    {
260        self.operation.header.metadata._index = Some(index.into());
261        self
262    }
263
264    /// The ID of the pipeline to use to preprocess incoming documents
265    pub fn pipeline<S>(mut self, pipeline: S) -> Self
266    where
267        S: Into<String>,
268    {
269        self.operation.header.metadata.pipeline = Some(pipeline.into());
270        self
271    }
272
273    /// Target the specified primary shard
274    pub fn routing<S>(mut self, routing: S) -> Self
275    where
276        S: Into<String>,
277    {
278        self.operation.header.metadata.routing = Some(routing.into());
279        self
280    }
281}
282
283impl<B> From<BulkCreateOperation<B>> for BulkOperation<B> {
284    fn from(b: BulkCreateOperation<B>) -> Self {
285        b.operation
286    }
287}
288
289/// Bulk index operation
290pub struct BulkIndexOperation<B> {
291    operation: BulkOperation<B>,
292}
293
294impl<B> BulkIndexOperation<B> {
295    /// Creates a new instance of [BulkIndexOperation]
296    pub fn new(source: B) -> Self {
297        Self {
298            operation: BulkOperation {
299                header: BulkHeader {
300                    action: BulkAction::Index,
301                    metadata: BulkMetadata {
302                        ..Default::default()
303                    },
304                },
305                source: Some(source),
306            },
307        }
308    }
309
310    /// Specify the id for the document
311    ///
312    /// If an id is not specified, Elasticsearch will generate an id for the document
313    /// which will be returned in the response.
314    pub fn id<S>(mut self, id: S) -> Self
315    where
316        S: Into<String>,
317    {
318        self.operation.header.metadata._id = Some(id.into());
319        self
320    }
321
322    /// Specify the name of the index to perform the bulk update operation against.
323    ///
324    /// Each bulk operation can specify an index to operate against. If all bulk operations
325    /// in one Bulk API call will operate against the same index, specify
326    /// the index on [Bulk](struct.Bulk.html) using [BulkParts::Index](enum.BulkParts.html),
327    /// and omit specifying the index on each bulk operation.
328    pub fn index<S>(mut self, index: S) -> Self
329    where
330        S: Into<String>,
331    {
332        self.operation.header.metadata._index = Some(index.into());
333        self
334    }
335
336    /// The ID of the pipeline to use to preprocess incoming documents
337    pub fn pipeline<S>(mut self, pipeline: S) -> Self
338    where
339        S: Into<String>,
340    {
341        self.operation.header.metadata.pipeline = Some(pipeline.into());
342        self
343    }
344
345    /// Target the specified primary shard
346    pub fn routing<S>(mut self, routing: S) -> Self
347    where
348        S: Into<String>,
349    {
350        self.operation.header.metadata.routing = Some(routing.into());
351        self
352    }
353
354    /// specify a sequence number to use for optimistic concurrency control
355    pub fn if_seq_no(mut self, seq_no: i64) -> Self {
356        self.operation.header.metadata.if_seq_no = Some(seq_no);
357        self
358    }
359
360    // TODO? Should seq_no and primary_term be set together with one function call?
361    /// specify a primary term to use for optimistic concurrency control
362    pub fn if_primary_term(mut self, primary_term: i64) -> Self {
363        self.operation.header.metadata.if_primary_term = Some(primary_term);
364        self
365    }
366
367    /// specify a version number to use for optimistic concurrency control
368    pub fn version(mut self, version: i64) -> Self {
369        self.operation.header.metadata.version = Some(version);
370        self
371    }
372
373    /// The type of versioning used when a version is specified
374    pub fn version_type(mut self, version_type: VersionType) -> Self {
375        self.operation.header.metadata.version_type = Some(version_type);
376        self
377    }
378}
379
380impl<B> From<BulkIndexOperation<B>> for BulkOperation<B> {
381    fn from(b: BulkIndexOperation<B>) -> Self {
382        b.operation
383    }
384}
385
386/// Bulk delete operation
387///
388/// The bulk delete operation is generic over `B` to allow delete operations to be specified
389/// in a collection of operations over `B`, even though the source of any delete operation will
390/// always be `None`
391pub struct BulkDeleteOperation<B> {
392    operation: BulkOperation<B>,
393}
394
395impl<B> BulkDeleteOperation<B> {
396    /// Creates a new instance of [BulkDeleteOperation]
397    pub fn new<S>(id: S) -> Self
398    where
399        S: Into<String>,
400    {
401        Self {
402            operation: BulkOperation {
403                header: BulkHeader {
404                    action: BulkAction::Delete,
405                    metadata: BulkMetadata {
406                        _id: Some(id.into()),
407                        ..Default::default()
408                    },
409                },
410                source: Option::<B>::None,
411            },
412        }
413    }
414
415    /// Specify the name of the index to perform the bulk update operation against.
416    ///
417    /// Each bulk operation can specify an index to operate against. If all bulk operations
418    /// in one Bulk API call will operate against the same index, specify
419    /// the index on [Bulk](struct.Bulk.html) using [BulkParts::Index](enum.BulkParts.html),
420    /// and omit specifying the index on each bulk operation.
421    pub fn index<S>(mut self, index: S) -> Self
422    where
423        S: Into<String>,
424    {
425        self.operation.header.metadata._index = Some(index.into());
426        self
427    }
428
429    /// Target the specified primary shard
430    pub fn routing<S>(mut self, routing: S) -> Self
431    where
432        S: Into<String>,
433    {
434        self.operation.header.metadata.routing = Some(routing.into());
435        self
436    }
437
438    /// Specify a sequence number to use for optimistic concurrency control
439    pub fn if_seq_no(mut self, seq_no: i64) -> Self {
440        self.operation.header.metadata.if_seq_no = Some(seq_no);
441        self
442    }
443
444    // TODO? Should seq_no and primary_term be set together with one function call?
445    /// Specify a primary term to use for optimistic concurrency control
446    pub fn if_primary_term(mut self, primary_term: i64) -> Self {
447        self.operation.header.metadata.if_primary_term = Some(primary_term);
448        self
449    }
450
451    /// Specify a version number to use for optimistic concurrency control
452    pub fn version(mut self, version: i64) -> Self {
453        self.operation.header.metadata.version = Some(version);
454        self
455    }
456
457    /// The type of versioning used when a version is specified
458    pub fn version_type(mut self, version_type: VersionType) -> Self {
459        self.operation.header.metadata.version_type = Some(version_type);
460        self
461    }
462}
463
464impl<B> From<BulkDeleteOperation<B>> for BulkOperation<B> {
465    fn from(b: BulkDeleteOperation<B>) -> Self {
466        b.operation
467    }
468}
469
470/// Bulk update operation
471pub struct BulkUpdateOperation<B> {
472    operation: BulkOperation<B>,
473}
474
475impl<B> BulkUpdateOperation<B>
476where
477    B: serde::Serialize,
478{
479    /// Creates a new instance of [BulkUpdateOperation]
480    pub fn new<S>(id: S, source: B) -> Self
481    where
482        S: Into<String>,
483    {
484        Self {
485            operation: BulkOperation {
486                header: BulkHeader {
487                    action: BulkAction::Update,
488                    metadata: BulkMetadata {
489                        _id: Some(id.into()),
490                        ..Default::default()
491                    },
492                },
493                source: Some(source),
494            },
495        }
496    }
497
498    /// specify the name of the index to perform the bulk update operation against.
499    ///
500    /// Each bulk operation can specify an index to operate against. If all bulk operations
501    /// in one Bulk API call will operate against the same index, specify
502    /// the index on [Bulk](struct.Bulk.html) using [BulkParts::Index](enum.BulkParts.html),
503    /// and omit specifying the index on each bulk operation.
504    pub fn index<S>(mut self, index: S) -> Self
505    where
506        S: Into<String>,
507    {
508        self.operation.header.metadata._index = Some(index.into());
509        self
510    }
511
512    /// Target the specified primary shard
513    pub fn routing<S>(mut self, routing: S) -> Self
514    where
515        S: Into<String>,
516    {
517        self.operation.header.metadata.routing = Some(routing.into());
518        self
519    }
520
521    /// specify a sequence number to use for optimistic concurrency control
522    pub fn if_seq_no(mut self, seq_no: i64) -> Self {
523        self.operation.header.metadata.if_seq_no = Some(seq_no);
524        self
525    }
526
527    // TODO? Should seq_no and primary_term be set together with one function call?
528    /// specify a primary term to use for optimistic concurrency control
529    pub fn if_primary_term(mut self, primary_term: i64) -> Self {
530        self.operation.header.metadata.if_primary_term = Some(primary_term);
531        self
532    }
533
534    /// specify a version number to use for optimistic concurrency control
535    pub fn version(mut self, version: i64) -> Self {
536        self.operation.header.metadata.version = Some(version);
537        self
538    }
539
540    /// The type of versioning used when a version is specified
541    pub fn version_type(mut self, version_type: VersionType) -> Self {
542        self.operation.header.metadata.version_type = Some(version_type);
543        self
544    }
545
546    /// specify how many times an update should be retried in the case of a version conflict
547    pub fn retry_on_conflict(mut self, retry_on_conflict: i32) -> Self {
548        self.operation.header.metadata.retry_on_conflict = Some(retry_on_conflict);
549        self
550    }
551
552    /// specify how the `_source` field is returned for the update operation.
553    ///
554    /// This can also be specified as part of the update action source payload instead.
555    pub fn source<S>(mut self, source: S) -> Self
556    where
557        S: Into<SourceFilter>,
558    {
559        self.operation.header.metadata._source = Some(source.into());
560        self
561    }
562}
563
564impl<B> From<BulkUpdateOperation<B>> for BulkOperation<B> {
565    fn from(b: BulkUpdateOperation<B>) -> Self {
566        b.operation
567    }
568}
569
570/// A collection of bulk operations.
571///
572/// A collection of bulk operations can perform operations against multiple different indices,
573/// specifying a different source document for each. When modelling source documents with
574/// different structs, it becomes difficult to construct a collection of bulk operations with such
575/// a setup. [BulkOperations] alleviates this difficulty by serializing bulk operations ahead of
576/// time of the bulk API call, into an internal byte buffer, using the buffered bytes as the body of
577/// the bulk API call.
578///
579/// # Example
580///
581/// Using [BulkOperations] to construct a collection of bulk operations that use different
582/// structs to model source documents
583///
584/// ```rust,no_run
585/// # use opensearch::{
586/// #     BulkOperation,
587/// #     BulkOperations,
588/// #     BulkParts,
589/// #     Error, OpenSearch,
590/// # };
591/// # use serde::Serialize;
592/// # use serde_json::{json, Value};
593/// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
594/// # let client = OpenSearch::default();
595/// #[derive(Serialize)]
596/// struct IndexDoc<'a> {
597///     foo: &'a str,
598/// }
599///
600/// #[derive(Serialize)]
601/// struct CreateDoc<'a> {
602///     bar: &'a str,
603/// }
604///
605/// #[derive(Serialize)]
606/// struct UpdateDoc<'a> {
607///     baz: &'a str,
608/// }
609///
610/// let mut ops = BulkOperations::new();
611/// ops.push(BulkOperation::index(IndexDoc { foo: "index" })
612///     .id("1")
613///     .pipeline("pipeline")
614///     .index("index_doc")
615///     .routing("routing")
616/// )?;
617/// ops.push(BulkOperation::create("2", CreateDoc { bar: "create" }))?;
618/// ops.push(BulkOperation::update("3", UpdateDoc { baz: "update" }))?;
619/// ops.push(BulkOperation::<()>::delete("4"))?;
620///
621/// let bulk_response = client.bulk(BulkParts::Index("tweets"))
622///     .body(vec![ops])
623///     .send()
624///     .await?;
625///
626/// # Ok(())
627/// # }
628/// ```
629pub struct BulkOperations {
630    buf: BytesMut,
631}
632
633impl BulkOperations {
634    /// Initializes a new instance of [BulkOperations]
635    pub fn new() -> Self {
636        Self {
637            buf: BytesMut::new(),
638        }
639    }
640
641    /// Initializes a new instance of [BulkOperations], using the passed
642    /// [bytes::BytesMut] as the buffer to write operations to
643    pub fn with_bytes(buf: BytesMut) -> Self {
644        Self { buf }
645    }
646
647    /// Pushes a bulk operation into the collection of bulk operations.
648    ///
649    /// The operation is serialized and written to the underlying byte buffer.
650    pub fn push<O, B>(&mut self, op: O) -> Result<(), Error>
651    where
652        O: Into<BulkOperation<B>>,
653        B: Serialize,
654    {
655        op.into().write(&mut self.buf)
656    }
657}
658
659impl Default for BulkOperations {
660    fn default() -> Self {
661        Self::new()
662    }
663}
664
665impl Body for BulkOperations {
666    fn bytes(&self) -> Option<Bytes> {
667        Some(self.buf.clone().freeze())
668    }
669
670    fn write(&self, bytes: &mut BytesMut) -> Result<(), Error> {
671        self.buf.write(bytes)
672    }
673}
674
#[cfg(test)]
mod tests {
    use crate::{
        http::request::{Body, NdBody},
        params::VersionType,
        BulkOperation, BulkOperations,
    };
    use bytes::{BufMut, BytesMut};
    use serde::Serialize;
    use serde_json::{json, Value};
    use std::{cmp::Ordering, str};

    /// Lexicographically compares two byte slices: the first differing byte decides,
    /// otherwise the shorter slice orders first.
    pub fn compare(a: &[u8], b: &[u8]) -> Ordering {
        a.cmp(b)
    }

    /// Concatenates the given lines into a single buffer.
    fn expected_bytes(lines: &[&[u8]]) -> BytesMut {
        let mut buffer = BytesMut::new();
        for line in lines.iter() {
            buffer.put_slice(line);
        }
        buffer
    }

    /// Asserts that `actual` is byte-for-byte equal to `expected`, printing both
    /// as UTF-8 text on failure.
    fn assert_same_bytes(expected: &[u8], actual: &[u8]) {
        assert_eq!(
            compare(expected, actual),
            Ordering::Equal,
            "expected {} but found {}",
            str::from_utf8(expected).unwrap(),
            str::from_utf8(actual).unwrap()
        );
    }

    #[test]
    fn serialize_bulk_operations_with_same_type_writes_to_bytes() -> Result<(), failure::Error> {
        let ops: Vec<BulkOperation<Value>> = vec![
            BulkOperation::index(json!({ "foo": "index" }))
                .id("1")
                .pipeline("pipeline")
                .routing("routing")
                .if_seq_no(1)
                .if_primary_term(2)
                .version(3)
                .version_type(VersionType::Internal)
                .into(),
            BulkOperation::create("2", json!({ "bar": "create" }))
                .pipeline("pipeline")
                .routing("routing")
                .index("create_index")
                .into(),
            BulkOperation::update("3", json!({ "baz": "update_1" }))
                .source(false)
                .into(),
            BulkOperation::update("4", json!({ "baz": "update_2" }))
                .source("baz")
                .into(),
            BulkOperation::update("5", json!({ "baz": "update_3" }))
                .source(vec!["baz"])
                .into(),
            BulkOperation::update("6", json!({ "baz": "update_4" }))
                .source((vec!["baz"], vec!["bar"]))
                .into(),
            BulkOperation::delete("7").into(),
        ];

        let mut bytes = BytesMut::new();
        NdBody(ops).write(&mut bytes)?;

        let expected = expected_bytes(&[
            b"{\"index\":{\"_id\":\"1\",\"pipeline\":\"pipeline\",\"if_seq_no\":1,\"if_primary_term\":2,\"routing\":\"routing\",\"version\":3,\"version_type\":\"internal\"}}\n",
            b"{\"foo\":\"index\"}\n",
            b"{\"create\":{\"_index\":\"create_index\",\"_id\":\"2\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n",
            b"{\"bar\":\"create\"}\n",
            b"{\"update\":{\"_id\":\"3\",\"_source\":false}}\n",
            b"{\"baz\":\"update_1\"}\n",
            b"{\"update\":{\"_id\":\"4\",\"_source\":\"baz\"}}\n",
            b"{\"baz\":\"update_2\"}\n",
            b"{\"update\":{\"_id\":\"5\",\"_source\":[\"baz\"]}}\n",
            b"{\"baz\":\"update_3\"}\n",
            b"{\"update\":{\"_id\":\"6\",\"_source\":{\"includes\":[\"baz\"],\"excludes\":[\"bar\"]}}}\n",
            b"{\"baz\":\"update_4\"}\n",
            b"{\"delete\":{\"_id\":\"7\"}}\n",
        ]);

        assert_same_bytes(&expected[..], &bytes[..]);
        Ok(())
    }

    #[test]
    fn serialize_bulk_operations_with_different_types_writes_to_bytes() -> Result<(), failure::Error>
    {
        #[derive(Serialize)]
        struct IndexDoc<'a> {
            foo: &'a str,
        }
        #[derive(Serialize)]
        struct CreateDoc<'a> {
            bar: &'a str,
        }
        #[derive(Serialize)]
        struct UpdateDoc<'a> {
            baz: &'a str,
        }

        let mut ops = BulkOperations::new();

        ops.push(
            BulkOperation::index(IndexDoc { foo: "index" })
                .id("1")
                .pipeline("pipeline")
                .index("index_doc")
                .routing("routing"),
        )?;
        ops.push(BulkOperation::create("2", CreateDoc { bar: "create" }))?;
        ops.push(BulkOperation::update("3", UpdateDoc { baz: "update" }))?;
        ops.push(BulkOperation::<()>::delete("4"))?;

        let mut bytes = BytesMut::new();
        NdBody(vec![ops]).write(&mut bytes)?;

        let expected = expected_bytes(&[
            b"{\"index\":{\"_index\":\"index_doc\",\"_id\":\"1\",\"pipeline\":\"pipeline\",\"routing\":\"routing\"}}\n",
            b"{\"foo\":\"index\"}\n",
            b"{\"create\":{\"_id\":\"2\"}}\n",
            b"{\"bar\":\"create\"}\n",
            b"{\"update\":{\"_id\":\"3\"}}\n",
            b"{\"baz\":\"update\"}\n",
            b"{\"delete\":{\"_id\":\"4\"}}\n",
        ]);

        assert_same_bytes(&expected[..], &bytes[..]);
        Ok(())
    }
}