Skip to main content

mongodb/action/
insert_many.rs

1use std::{borrow::Borrow, collections::HashSet, ops::Deref};
2
3use crate::bson::{Bson, RawDocumentBuf};
4use serde::Serialize;
5
6use crate::{
7    coll::options::InsertManyOptions,
8    error::{Error, ErrorKind, IndexedWriteError, InsertManyError, Result},
9    operation::Insert as Op,
10    options::WriteConcern,
11    results::InsertManyResult,
12    ClientSession,
13    Collection,
14};
15
16use super::{action_impl, deeplink, export_doc, option_setters, options_doc, CollRef};
17
18impl<T: Serialize + Send + Sync> Collection<T> {
19    /// Inserts the data in `docs` into the collection.
20    ///
21    /// Note that this method accepts both owned and borrowed values, so the input documents
22    /// do not need to be cloned in order to be passed in.
23    ///
24    /// This operation will retry once upon failure if the connection and encountered error support
25    /// retryability. See the documentation
26    /// [here](https://www.mongodb.com/docs/manual/core/retryable-writes/) for more information on
27    /// retryable writes.
28    ///
29    /// `await` will return d[`Result<InsertManyResult>`].
30    #[deeplink]
31    #[options_doc(insert_many)]
32    pub fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> InsertMany<'_> {
33        InsertMany {
34            coll: CollRef::new(self),
35            docs: docs
36                .into_iter()
37                .map(|v| {
38                    crate::bson_compat::serialize_to_raw_document_buf(v.borrow())
39                        .map_err(Into::into)
40                })
41                .collect(),
42            options: None,
43            session: None,
44        }
45    }
46}
47
#[cfg(feature = "sync")]
impl<T: Serialize + Send + Sync> crate::sync::Collection<T> {
    /// Builds an action that inserts every item yielded by `docs` into the collection.
    ///
    /// Items may be owned or borrowed (anything that is `Borrow<T>`), so callers do not have to
    /// clone their documents in order to pass them in.
    ///
    /// This operation will retry once upon failure if the connection and encountered error support
    /// retryability. See the documentation
    /// [here](https://www.mongodb.com/docs/manual/core/retryable-writes/) for more information on
    /// retryable writes.
    ///
    /// [`run`](InsertMany::run) will return d[`Result<InsertManyResult>`].
    #[deeplink]
    #[options_doc(insert_many, "run")]
    pub fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> InsertMany<'_> {
        // The sync collection simply forwards to the async collection it wraps.
        let inner = &self.async_collection;
        inner.insert_many(docs)
    }
}
67
/// Inserts documents into a collection.  Construct with [`Collection::insert_many`].
///
/// Constructing this value only serializes the documents; the insert itself is carried out by
/// the action's `execute` implementation.
#[must_use]
pub struct InsertMany<'a> {
    /// The collection the documents are inserted into; also used to reach the client.
    coll: CollRef<'a>,
    /// The documents serialized by `insert_many`, or the error from the first document that
    /// failed to serialize. The error is returned when the action is executed.
    docs: Result<Vec<RawDocumentBuf>>,
    /// Options for the insert; `None` until configured.
    options: Option<InsertManyOptions>,
    /// The session to run the operation under, if one was supplied via `session`.
    session: Option<&'a mut ClientSession>,
}
76
77#[option_setters(crate::coll::options::InsertManyOptions)]
78#[export_doc(insert_many)]
79impl<'a> InsertMany<'a> {
80    /// Use the provided session when running the operation.
81    pub fn session(mut self, value: impl Into<&'a mut ClientSession>) -> Self {
82        self.session = Some(value.into());
83        self
84    }
85}
86
87#[action_impl]
88impl<'a> Action for InsertMany<'a> {
89    type Future = InsertManyFuture;
90
91    async fn execute(mut self) -> Result<InsertManyResult> {
92        resolve_write_concern_with_session!(self.coll, self.options, self.session.as_ref());
93
94        let ds = self.docs?;
95        if ds.is_empty() {
96            return Err(ErrorKind::InvalidArgument {
97                message: "No documents provided to insert_many".to_string(),
98            }
99            .into());
100        }
101        let ordered = self
102            .options
103            .as_ref()
104            .and_then(|o| o.ordered)
105            .unwrap_or(true);
106        let encrypted = self.coll.client().should_auto_encrypt().await;
107
108        let mut cumulative_failure: Option<InsertManyError> = None;
109        let mut error_labels: HashSet<String> = Default::default();
110        let mut cumulative_result: Option<InsertManyResult> = None;
111
112        let mut n_attempted = 0;
113
114        while n_attempted < ds.len() {
115            let docs: Vec<_> = ds.iter().skip(n_attempted).map(Deref::deref).collect();
116            let insert = Op::new(self.coll.namespace(), docs, self.options.clone(), encrypted);
117
118            match self
119                .coll
120                .client()
121                .execute_operation(insert, self.session.as_deref_mut())
122                .await
123            {
124                Ok(result) => {
125                    let current_batch_size = result.inserted_ids.len();
126
127                    let cumulative_result = cumulative_result.get_or_insert_with(Default::default);
128                    for (index, id) in result.inserted_ids {
129                        cumulative_result
130                            .inserted_ids
131                            .insert(index + n_attempted, id);
132                    }
133
134                    n_attempted += current_batch_size;
135                }
136                Err(e) => {
137                    let labels = e.labels().clone();
138                    match *e.kind {
139                        ErrorKind::InsertMany(bw) => {
140                            // for ordered inserts this size will be incorrect, but knowing the
141                            // batch size isn't needed for ordered
142                            // failures since we return immediately from
143                            // them anyways.
144                            let current_batch_size = bw.inserted_ids.len()
145                                + bw.write_errors.as_ref().map(|we| we.len()).unwrap_or(0);
146
147                            let failure_ref =
148                                cumulative_failure.get_or_insert_with(InsertManyError::new);
149                            if let Some(write_errors) = bw.write_errors {
150                                for err in write_errors {
151                                    let index = n_attempted + err.index;
152
153                                    failure_ref
154                                        .write_errors
155                                        .get_or_insert_with(Default::default)
156                                        .push(IndexedWriteError { index, ..err });
157                                }
158                            }
159
160                            if let Some(wc_error) = bw.write_concern_error {
161                                failure_ref.write_concern_error = Some(wc_error);
162                            }
163
164                            error_labels.extend(labels);
165
166                            if ordered {
167                                // this will always be true since we invoked get_or_insert_with
168                                // above.
169                                if let Some(failure) = cumulative_failure {
170                                    return Err(Error::new(
171                                        ErrorKind::InsertMany(failure),
172                                        Some(error_labels),
173                                    ));
174                                }
175                            }
176                            n_attempted += current_batch_size;
177                        }
178                        _ => return Err(e),
179                    }
180                }
181            }
182        }
183
184        match cumulative_failure {
185            Some(failure) => Err(Error::new(
186                ErrorKind::InsertMany(failure),
187                Some(error_labels),
188            )),
189            None => Ok(cumulative_result.unwrap_or_default()),
190        }
191    }
192}