mongodb/action/
insert_many.rs1use std::{borrow::Borrow, collections::HashSet, ops::Deref};
2
3use crate::bson::{Bson, RawDocumentBuf};
4use serde::Serialize;
5
6use crate::{
7 coll::options::InsertManyOptions,
8 error::{Error, ErrorKind, IndexedWriteError, InsertManyError, Result},
9 operation::Insert as Op,
10 options::WriteConcern,
11 results::InsertManyResult,
12 ClientSession,
13 Collection,
14};
15
16use super::{action_impl, deeplink, export_doc, option_setters, options_doc, CollRef};
17
18impl<T: Serialize + Send + Sync> Collection<T> {
19 #[deeplink]
31 #[options_doc(insert_many)]
32 pub fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> InsertMany<'_> {
33 InsertMany {
34 coll: CollRef::new(self),
35 docs: docs
36 .into_iter()
37 .map(|v| {
38 crate::bson_compat::serialize_to_raw_document_buf(v.borrow())
39 .map_err(Into::into)
40 })
41 .collect(),
42 options: None,
43 session: None,
44 }
45 }
46}
47
48#[cfg(feature = "sync")]
49impl<T: Serialize + Send + Sync> crate::sync::Collection<T> {
50 #[deeplink]
62 #[options_doc(insert_many, "run")]
63 pub fn insert_many(&self, docs: impl IntoIterator<Item = impl Borrow<T>>) -> InsertMany<'_> {
64 self.async_collection.insert_many(docs)
65 }
66}
67
68#[must_use]
70pub struct InsertMany<'a> {
71 coll: CollRef<'a>,
72 docs: Result<Vec<RawDocumentBuf>>,
73 options: Option<InsertManyOptions>,
74 session: Option<&'a mut ClientSession>,
75}
76
77#[option_setters(crate::coll::options::InsertManyOptions)]
78#[export_doc(insert_many)]
79impl<'a> InsertMany<'a> {
80 pub fn session(mut self, value: impl Into<&'a mut ClientSession>) -> Self {
82 self.session = Some(value.into());
83 self
84 }
85}
86
87#[action_impl]
88impl<'a> Action for InsertMany<'a> {
89 type Future = InsertManyFuture;
90
91 async fn execute(mut self) -> Result<InsertManyResult> {
92 resolve_write_concern_with_session!(self.coll, self.options, self.session.as_ref());
93
94 let ds = self.docs?;
95 if ds.is_empty() {
96 return Err(ErrorKind::InvalidArgument {
97 message: "No documents provided to insert_many".to_string(),
98 }
99 .into());
100 }
101 let ordered = self
102 .options
103 .as_ref()
104 .and_then(|o| o.ordered)
105 .unwrap_or(true);
106 let encrypted = self.coll.client().should_auto_encrypt().await;
107
108 let mut cumulative_failure: Option<InsertManyError> = None;
109 let mut error_labels: HashSet<String> = Default::default();
110 let mut cumulative_result: Option<InsertManyResult> = None;
111
112 let mut n_attempted = 0;
113
114 while n_attempted < ds.len() {
115 let docs: Vec<_> = ds.iter().skip(n_attempted).map(Deref::deref).collect();
116 let insert = Op::new(self.coll.namespace(), docs, self.options.clone(), encrypted);
117
118 match self
119 .coll
120 .client()
121 .execute_operation(insert, self.session.as_deref_mut())
122 .await
123 {
124 Ok(result) => {
125 let current_batch_size = result.inserted_ids.len();
126
127 let cumulative_result = cumulative_result.get_or_insert_with(Default::default);
128 for (index, id) in result.inserted_ids {
129 cumulative_result
130 .inserted_ids
131 .insert(index + n_attempted, id);
132 }
133
134 n_attempted += current_batch_size;
135 }
136 Err(e) => {
137 let labels = e.labels().clone();
138 match *e.kind {
139 ErrorKind::InsertMany(bw) => {
140 let current_batch_size = bw.inserted_ids.len()
145 + bw.write_errors.as_ref().map(|we| we.len()).unwrap_or(0);
146
147 let failure_ref =
148 cumulative_failure.get_or_insert_with(InsertManyError::new);
149 if let Some(write_errors) = bw.write_errors {
150 for err in write_errors {
151 let index = n_attempted + err.index;
152
153 failure_ref
154 .write_errors
155 .get_or_insert_with(Default::default)
156 .push(IndexedWriteError { index, ..err });
157 }
158 }
159
160 if let Some(wc_error) = bw.write_concern_error {
161 failure_ref.write_concern_error = Some(wc_error);
162 }
163
164 error_labels.extend(labels);
165
166 if ordered {
167 if let Some(failure) = cumulative_failure {
170 return Err(Error::new(
171 ErrorKind::InsertMany(failure),
172 Some(error_labels),
173 ));
174 }
175 }
176 n_attempted += current_batch_size;
177 }
178 _ => return Err(e),
179 }
180 }
181 }
182 }
183
184 match cumulative_failure {
185 Some(failure) => Err(Error::new(
186 ErrorKind::InsertMany(failure),
187 Some(error_labels),
188 )),
189 None => Ok(cumulative_result.unwrap_or_default()),
190 }
191 }
192}