// mongodb/client/options/bulk_write.rs

1use std::borrow::Borrow;
2
3use macro_magic::export_tokens;
4use serde::{Deserialize, Serialize};
5use serde_with::skip_serializing_none;
6use typed_builder::TypedBuilder;
7
8use crate::{
9    bson::{rawdoc, Array, Bson, Document, RawDocumentBuf},
10    bson_compat::{cstr, serialize_to_raw_document_buf},
11    bson_util::{
12        extend_raw_document_buf,
13        get_or_prepend_id_field,
14        replacement_document_check,
15        update_document_check,
16    },
17    error::{Error, Result},
18    options::{UpdateModifications, WriteConcern},
19    serde_util::{serialize_bool_or_true, write_concern_is_empty},
20    Collection,
21    Namespace,
22};
23
24/// The supported options for [`bulk_write`](crate::Client::bulk_write).
25#[skip_serializing_none]
26#[derive(Clone, Debug, Default, Deserialize, Serialize)]
27#[serde(rename_all = "camelCase")]
28#[non_exhaustive]
29#[export_tokens]
30pub struct BulkWriteOptions {
31    /// Whether the operations should be performed in the order in which they were specified. If
32    /// true, no more writes will be performed if a single write fails. If false, writes will
33    /// continue to be attempted if a single write fails.
34    ///
35    /// Defaults to true.
36    #[serialize_always]
37    #[serde(serialize_with = "serialize_bool_or_true")]
38    pub ordered: Option<bool>,
39
40    /// Whether document-level validation should be bypassed.
41    ///
42    /// Defaults to false.
43    pub bypass_document_validation: Option<bool>,
44
45    /// An arbitrary comment to help trace the operation through the database profiler, currentOp
46    /// and logs.
47    pub comment: Option<Bson>,
48
49    /// A map of parameter names and values to apply to all operations within the bulk write.
50    /// Values must be constant or closed expressions that do not reference document fields.
51    /// Parameters can then be accessed as variables in an aggregate expression context (e.g.
52    /// "$$var").
53    #[serde(rename = "let")]
54    pub let_vars: Option<Document>,
55
56    /// The write concern to use for this operation.
57    #[serde(skip_serializing_if = "write_concern_is_empty")]
58    pub write_concern: Option<WriteConcern>,
59}
60
61/// A single write to be performed within a [`bulk_write`](crate::Client::bulk_write) operation.
62#[skip_serializing_none]
63#[derive(Clone, Debug, Serialize)]
64#[serde(untagged)]
65#[non_exhaustive]
66#[allow(missing_docs)]
67pub enum WriteModel {
68    InsertOne(InsertOneModel),
69    UpdateOne(UpdateOneModel),
70    UpdateMany(UpdateManyModel),
71    ReplaceOne(ReplaceOneModel),
72    DeleteOne(DeleteOneModel),
73    DeleteMany(DeleteManyModel),
74}
75
76/// Inserts a single document.
77#[skip_serializing_none]
78#[derive(Clone, Debug, Serialize, TypedBuilder)]
79#[cfg_attr(test, derive(Deserialize))]
80#[serde(rename_all = "camelCase")]
81#[builder(field_defaults(default, setter(into)))]
82#[non_exhaustive]
83pub struct InsertOneModel {
84    /// The namespace on which the insert should be performed.
85    #[serde(skip_serializing)]
86    #[builder(!default)]
87    pub namespace: Namespace,
88
89    /// The document to insert.
90    #[builder(!default)]
91    pub document: Document,
92}
93
94impl From<InsertOneModel> for WriteModel {
95    fn from(model: InsertOneModel) -> Self {
96        Self::InsertOne(model)
97    }
98}
99
100/// Updates a single document.
101#[skip_serializing_none]
102#[derive(Clone, Debug, Serialize, TypedBuilder)]
103#[cfg_attr(test, derive(Deserialize))]
104#[serde(rename_all = "camelCase")]
105#[builder(field_defaults(default, setter(into)))]
106#[non_exhaustive]
107pub struct UpdateOneModel {
108    /// The namespace on which the update should be performed.
109    #[serde(skip_serializing)]
110    #[builder(!default)]
111    pub namespace: Namespace,
112
113    /// The filter to use. The first document matching this filter will be updated.
114    #[builder(!default)]
115    pub filter: Document,
116
117    /// The update to perform.
118    #[serde(rename(serialize = "updateMods"))]
119    #[builder(!default)]
120    pub update: UpdateModifications,
121
122    /// A set of filters specifying to which array elements an update should apply.
123    pub array_filters: Option<Array>,
124
125    /// The collation to use.
126    pub collation: Option<Document>,
127
128    /// The index to use. Specify either the index name as a string or the index key pattern. If
129    /// specified, then the query system will only consider plans using the hinted index.
130    pub hint: Option<Bson>,
131
132    /// Whether a new document should be created if no document matches the filter.
133    ///
134    /// Defaults to false.
135    pub upsert: Option<bool>,
136
137    /// Specify which document the operation updates if the query matches multiple
138    /// documents. The first document matched by the sort order will be updated.
139    pub sort: Option<Document>,
140}
141
142impl From<UpdateOneModel> for WriteModel {
143    fn from(model: UpdateOneModel) -> Self {
144        Self::UpdateOne(model)
145    }
146}
147
148/// Updates multiple documents.
149#[skip_serializing_none]
150#[derive(Clone, Debug, Serialize, TypedBuilder)]
151#[cfg_attr(test, derive(Deserialize))]
152#[serde(rename_all = "camelCase")]
153#[builder(field_defaults(default, setter(into)))]
154#[non_exhaustive]
155pub struct UpdateManyModel {
156    /// The namespace on which the update should be performed.
157    #[serde(skip_serializing)]
158    #[builder(!default)]
159    pub namespace: Namespace,
160
161    /// The filter to use. All documents matching this filter will be updated.
162    #[builder(!default)]
163    pub filter: Document,
164
165    /// The update to perform.
166    #[serde(rename(serialize = "updateMods"))]
167    #[builder(!default)]
168    pub update: UpdateModifications,
169
170    /// A set of filters specifying to which array elements an update should apply.
171    pub array_filters: Option<Array>,
172
173    /// The collation to use.
174    pub collation: Option<Document>,
175
176    /// The index to use. Specify either the index name as a string or the index key pattern. If
177    /// specified, then the query system will only consider plans using the hinted index.
178    pub hint: Option<Bson>,
179
180    /// Whether a new document should be created if no document matches the filter.
181    ///
182    /// Defaults to false.
183    pub upsert: Option<bool>,
184}
185
186impl From<UpdateManyModel> for WriteModel {
187    fn from(model: UpdateManyModel) -> Self {
188        Self::UpdateMany(model)
189    }
190}
191
192/// Replaces a single document.
193#[skip_serializing_none]
194#[derive(Clone, Debug, Serialize, TypedBuilder)]
195#[cfg_attr(test, derive(Deserialize))]
196#[serde(rename_all = "camelCase")]
197#[builder(field_defaults(default, setter(into)))]
198#[non_exhaustive]
199pub struct ReplaceOneModel {
200    /// The namespace on which the replace should be performed.
201    #[serde(skip_serializing)]
202    #[builder(!default)]
203    pub namespace: Namespace,
204
205    /// The filter to use.
206    #[builder(!default)]
207    pub filter: Document,
208
209    /// The replacement document.
210    #[serde(rename(serialize = "updateMods"))]
211    #[builder(!default)]
212    pub replacement: Document,
213
214    /// The collation to use.
215    pub collation: Option<Document>,
216
217    /// The index to use. Specify either the index name as a string or the index key pattern. If
218    /// specified, then the query system will only consider plans using the hinted index.
219    pub hint: Option<Bson>,
220
221    /// Whether a new document should be created if no document matches the filter.
222    ///
223    /// Defaults to false.
224    pub upsert: Option<bool>,
225
226    /// Specify which document the operation replaces if the query matches multiple
227    /// documents. The first document matched by the sort order will be replaced.
228    pub sort: Option<Document>,
229}
230
231impl From<ReplaceOneModel> for WriteModel {
232    fn from(model: ReplaceOneModel) -> Self {
233        Self::ReplaceOne(model)
234    }
235}
236
237/// Deletes a single document.
238#[skip_serializing_none]
239#[derive(Clone, Debug, Serialize, TypedBuilder)]
240#[cfg_attr(test, derive(Deserialize))]
241#[serde(rename_all = "camelCase")]
242#[builder(field_defaults(default, setter(into)))]
243#[non_exhaustive]
244pub struct DeleteOneModel {
245    /// The namespace on which the delete should be performed.
246    #[serde(skip_serializing)]
247    #[builder(!default)]
248    pub namespace: Namespace,
249
250    /// The filter to use. The first document matching this filter will be deleted.
251    #[builder(!default)]
252    pub filter: Document,
253
254    /// The collation to use.
255    pub collation: Option<Document>,
256
257    /// The index to use. Specify either the index name as a string or the index key pattern. If
258    /// specified, then the query system will only consider plans using the hinted index.
259    pub hint: Option<Bson>,
260}
261
262impl From<DeleteOneModel> for WriteModel {
263    fn from(model: DeleteOneModel) -> Self {
264        Self::DeleteOne(model)
265    }
266}
267
268/// Deletes multiple documents.
269#[skip_serializing_none]
270#[derive(Clone, Debug, Serialize, TypedBuilder)]
271#[cfg_attr(test, derive(Deserialize))]
272#[serde(rename_all = "camelCase")]
273#[builder(field_defaults(default, setter(into)))]
274#[non_exhaustive]
275pub struct DeleteManyModel {
276    /// The namespace on which the delete should be performed.
277    #[serde(skip_serializing)]
278    #[builder(!default)]
279    pub namespace: Namespace,
280
281    /// The filter to use. All documents matching this filter will be deleted.
282    pub filter: Document,
283
284    /// The collation to use.
285    pub collation: Option<Document>,
286
287    /// The index to use. Specify either the index name as a string or the index key pattern. If
288    /// specified, then the query system will only consider plans using the hinted index.
289    pub hint: Option<Bson>,
290}
291
292impl From<DeleteManyModel> for WriteModel {
293    fn from(model: DeleteManyModel) -> Self {
294        Self::DeleteMany(model)
295    }
296}
297
298impl<T> Collection<T>
299where
300    T: Send + Sync + Serialize,
301{
302    /// Constructs an [`InsertOneModel`] with this collection's namespace by serializing the
303    /// provided value into a [`Document`]. Returns an error if serialization fails.
304    ///
305    /// Note that the returned value must be provided to [`bulk_write`](crate::Client::bulk_write)
306    /// for the insert to be performed.
307    pub fn insert_one_model(&self, document: impl Borrow<T>) -> Result<InsertOneModel> {
308        let document = crate::bson_compat::serialize_to_document(document.borrow())?;
309        Ok(InsertOneModel::builder()
310            .namespace(self.namespace())
311            .document(document)
312            .build())
313    }
314
315    /// Constructs a [`ReplaceOneModel`] with this collection's namespace by serializing the
316    /// provided value into a [`Document`]. Returns an error if serialization fails.
317    ///
318    /// Note that the returned value must be provided to [`bulk_write`](crate::Client::bulk_write)
319    /// for the replace to be performed.
320    pub fn replace_one_model(
321        &self,
322        filter: Document,
323        replacement: impl Borrow<T>,
324    ) -> Result<ReplaceOneModel> {
325        let replacement = crate::bson_compat::serialize_to_document(replacement.borrow())?;
326        Ok(ReplaceOneModel::builder()
327            .namespace(self.namespace())
328            .filter(filter)
329            .replacement(replacement)
330            .build())
331    }
332}
333
/// The kind of operation a [`WriteModel`] performs, as reported by
/// `WriteModel::operation_type`.
pub(crate) enum OperationType {
    /// The write inserts a document.
    Insert,
    /// The write updates or replaces documents.
    Update,
    /// The write deletes documents.
    Delete,
}
339
340impl WriteModel {
341    pub(crate) fn namespace(&self) -> &Namespace {
342        match self {
343            Self::InsertOne(model) => &model.namespace,
344            Self::UpdateOne(model) => &model.namespace,
345            Self::UpdateMany(model) => &model.namespace,
346            Self::ReplaceOne(model) => &model.namespace,
347            Self::DeleteOne(model) => &model.namespace,
348            Self::DeleteMany(model) => &model.namespace,
349        }
350    }
351
352    pub(crate) fn operation_type(&self) -> OperationType {
353        match self {
354            Self::InsertOne(_) => OperationType::Insert,
355            Self::UpdateOne(_) | Self::UpdateMany(_) | Self::ReplaceOne(_) => OperationType::Update,
356            Self::DeleteOne(_) | Self::DeleteMany(_) => OperationType::Delete,
357        }
358    }
359
360    /// Whether this operation should apply to all documents that match the filter. Returns None if
361    /// the operation does not use a filter.
362    pub(crate) fn multi(&self) -> Option<bool> {
363        match self {
364            Self::UpdateMany(_) | Self::DeleteMany(_) => Some(true),
365            Self::UpdateOne(_) | Self::ReplaceOne(_) | Self::DeleteOne(_) => Some(false),
366            Self::InsertOne(_) => None,
367        }
368    }
369
370    pub(crate) fn operation_name(&self) -> &'static crate::bson_compat::CStr {
371        use crate::bson_compat::cstr;
372        match self.operation_type() {
373            OperationType::Insert => cstr!("insert"),
374            OperationType::Update => cstr!("update"),
375            OperationType::Delete => cstr!("delete"),
376        }
377    }
378
379    /// Constructs the ops document for this write model given the nsInfo array index.
380    pub(crate) fn get_ops_document(
381        &self,
382        ns_info_index: usize,
383    ) -> Result<(RawDocumentBuf, Option<Bson>)> {
384        // The maximum number of namespaces allowed in a bulkWrite command is much lower than
385        // i32::MAX, so this should never fail.
386        let index = i32::try_from(ns_info_index)
387            .map_err(|_| Error::internal("nsInfo index exceeds i32::MAX"))?;
388        let mut ops_document = rawdoc! { self.operation_name(): index };
389
390        if let Self::UpdateOne(UpdateOneModel { update, .. })
391        | Self::UpdateMany(UpdateManyModel { update, .. }) = self
392        {
393            if let UpdateModifications::Document(update_document) = update {
394                update_document_check(update_document)?;
395            }
396        } else if let Self::ReplaceOne(ReplaceOneModel { replacement, .. }) = self {
397            replacement_document_check(replacement)?;
398        }
399
400        if let Some(multi) = self.multi() {
401            ops_document.append(cstr!("multi"), multi);
402        }
403
404        if let Self::InsertOne(model) = self {
405            let mut insert_document = RawDocumentBuf::try_from(&model.document)?;
406            let inserted_id = get_or_prepend_id_field(&mut insert_document)?;
407            ops_document.append(cstr!("document"), insert_document);
408            Ok((ops_document, Some(inserted_id)))
409        } else {
410            let model = serialize_to_raw_document_buf(&self)?;
411            extend_raw_document_buf(&mut ops_document, model)?;
412            Ok((ops_document, None))
413        }
414    }
415}