mongodb/client/options/
bulk_write.rs

1use std::borrow::Borrow;
2
3use macro_magic::export_tokens;
4use serde::{Deserialize, Serialize};
5use serde_with::skip_serializing_none;
6use typed_builder::TypedBuilder;
7
8use crate::{
9    bson::{rawdoc, Array, Bson, Document, RawDocumentBuf},
10    bson_util::{get_or_prepend_id_field, replacement_document_check, update_document_check},
11    error::Result,
12    options::{UpdateModifications, WriteConcern},
13    serde_util::serialize_bool_or_true,
14    Collection,
15    Namespace,
16};
17
18/// The supported options for [`bulk_write`](crate::Client::bulk_write).
19#[skip_serializing_none]
20#[derive(Clone, Debug, Default, Deserialize, Serialize)]
21#[serde(rename_all = "camelCase")]
22#[non_exhaustive]
23#[export_tokens]
24pub struct BulkWriteOptions {
25    /// Whether the operations should be performed in the order in which they were specified. If
26    /// true, no more writes will be performed if a single write fails. If false, writes will
27    /// continue to be attempted if a single write fails.
28    ///
29    /// Defaults to true.
30    #[serialize_always]
31    #[serde(serialize_with = "serialize_bool_or_true")]
32    pub ordered: Option<bool>,
33
34    /// Whether document-level validation should be bypassed.
35    ///
36    /// Defaults to false.
37    pub bypass_document_validation: Option<bool>,
38
39    /// An arbitrary comment to help trace the operation through the database profiler, currentOp
40    /// and logs.
41    pub comment: Option<Bson>,
42
43    /// A map of parameter names and values to apply to all operations within the bulk write.
44    /// Values must be constant or closed expressions that do not reference document fields.
45    /// Parameters can then be accessed as variables in an aggregate expression context (e.g.
46    /// "$$var").
47    #[serde(rename = "let")]
48    pub let_vars: Option<Document>,
49
50    /// The write concern to use for this operation.
51    pub write_concern: Option<WriteConcern>,
52}
53
54/// A single write to be performed within a [`bulk_write`](crate::Client::bulk_write) operation.
55#[skip_serializing_none]
56#[derive(Clone, Debug, Serialize)]
57#[serde(untagged)]
58#[non_exhaustive]
59#[allow(missing_docs)]
60pub enum WriteModel {
61    InsertOne(InsertOneModel),
62    UpdateOne(UpdateOneModel),
63    UpdateMany(UpdateManyModel),
64    ReplaceOne(ReplaceOneModel),
65    DeleteOne(DeleteOneModel),
66    DeleteMany(DeleteManyModel),
67}
68
69/// Inserts a single document.
70#[skip_serializing_none]
71#[derive(Clone, Debug, Serialize, TypedBuilder)]
72#[cfg_attr(test, derive(Deserialize))]
73#[serde(rename_all = "camelCase")]
74#[builder(field_defaults(default, setter(into)))]
75#[non_exhaustive]
76pub struct InsertOneModel {
77    /// The namespace on which the insert should be performed.
78    #[serde(skip_serializing)]
79    #[builder(!default)]
80    pub namespace: Namespace,
81
82    /// The document to insert.
83    #[builder(!default)]
84    pub document: Document,
85}
86
87impl From<InsertOneModel> for WriteModel {
88    fn from(model: InsertOneModel) -> Self {
89        Self::InsertOne(model)
90    }
91}
92
93/// Updates a single document.
94#[skip_serializing_none]
95#[derive(Clone, Debug, Serialize, TypedBuilder)]
96#[cfg_attr(test, derive(Deserialize))]
97#[serde(rename_all = "camelCase")]
98#[builder(field_defaults(default, setter(into)))]
99#[non_exhaustive]
100pub struct UpdateOneModel {
101    /// The namespace on which the update should be performed.
102    #[serde(skip_serializing)]
103    #[builder(!default)]
104    pub namespace: Namespace,
105
106    /// The filter to use. The first document matching this filter will be updated.
107    #[builder(!default)]
108    pub filter: Document,
109
110    /// The update to perform.
111    #[serde(rename(serialize = "updateMods"))]
112    #[builder(!default)]
113    pub update: UpdateModifications,
114
115    /// A set of filters specifying to which array elements an update should apply.
116    pub array_filters: Option<Array>,
117
118    /// The collation to use.
119    pub collation: Option<Document>,
120
121    /// The index to use. Specify either the index name as a string or the index key pattern. If
122    /// specified, then the query system will only consider plans using the hinted index.
123    pub hint: Option<Bson>,
124
125    /// Whether a new document should be created if no document matches the filter.
126    ///
127    /// Defaults to false.
128    pub upsert: Option<bool>,
129
130    /// Specify which document the operation updates if the query matches multiple
131    /// documents. The first document matched by the sort order will be updated.
132    pub sort: Option<Document>,
133}
134
135impl From<UpdateOneModel> for WriteModel {
136    fn from(model: UpdateOneModel) -> Self {
137        Self::UpdateOne(model)
138    }
139}
140
141/// Updates multiple documents.
142#[skip_serializing_none]
143#[derive(Clone, Debug, Serialize, TypedBuilder)]
144#[cfg_attr(test, derive(Deserialize))]
145#[serde(rename_all = "camelCase")]
146#[builder(field_defaults(default, setter(into)))]
147#[non_exhaustive]
148pub struct UpdateManyModel {
149    /// The namespace on which the update should be performed.
150    #[serde(skip_serializing)]
151    #[builder(!default)]
152    pub namespace: Namespace,
153
154    /// The filter to use. All documents matching this filter will be updated.
155    #[builder(!default)]
156    pub filter: Document,
157
158    /// The update to perform.
159    #[serde(rename(serialize = "updateMods"))]
160    #[builder(!default)]
161    pub update: UpdateModifications,
162
163    /// A set of filters specifying to which array elements an update should apply.
164    pub array_filters: Option<Array>,
165
166    /// The collation to use.
167    pub collation: Option<Document>,
168
169    /// The index to use. Specify either the index name as a string or the index key pattern. If
170    /// specified, then the query system will only consider plans using the hinted index.
171    pub hint: Option<Bson>,
172
173    /// Whether a new document should be created if no document matches the filter.
174    ///
175    /// Defaults to false.
176    pub upsert: Option<bool>,
177}
178
179impl From<UpdateManyModel> for WriteModel {
180    fn from(model: UpdateManyModel) -> Self {
181        Self::UpdateMany(model)
182    }
183}
184
185/// Replaces a single document.
186#[skip_serializing_none]
187#[derive(Clone, Debug, Serialize, TypedBuilder)]
188#[cfg_attr(test, derive(Deserialize))]
189#[serde(rename_all = "camelCase")]
190#[builder(field_defaults(default, setter(into)))]
191#[non_exhaustive]
192pub struct ReplaceOneModel {
193    /// The namespace on which the replace should be performed.
194    #[serde(skip_serializing)]
195    #[builder(!default)]
196    pub namespace: Namespace,
197
198    /// The filter to use.
199    #[builder(!default)]
200    pub filter: Document,
201
202    /// The replacement document.
203    #[serde(rename(serialize = "updateMods"))]
204    #[builder(!default)]
205    pub replacement: Document,
206
207    /// The collation to use.
208    pub collation: Option<Document>,
209
210    /// The index to use. Specify either the index name as a string or the index key pattern. If
211    /// specified, then the query system will only consider plans using the hinted index.
212    pub hint: Option<Bson>,
213
214    /// Whether a new document should be created if no document matches the filter.
215    ///
216    /// Defaults to false.
217    pub upsert: Option<bool>,
218
219    /// Specify which document the operation replaces if the query matches multiple
220    /// documents. The first document matched by the sort order will be replaced.
221    pub sort: Option<Document>,
222}
223
224impl From<ReplaceOneModel> for WriteModel {
225    fn from(model: ReplaceOneModel) -> Self {
226        Self::ReplaceOne(model)
227    }
228}
229
230/// Deletes a single document.
231#[skip_serializing_none]
232#[derive(Clone, Debug, Serialize, TypedBuilder)]
233#[cfg_attr(test, derive(Deserialize))]
234#[serde(rename_all = "camelCase")]
235#[builder(field_defaults(default, setter(into)))]
236#[non_exhaustive]
237pub struct DeleteOneModel {
238    /// The namespace on which the delete should be performed.
239    #[serde(skip_serializing)]
240    #[builder(!default)]
241    pub namespace: Namespace,
242
243    /// The filter to use. The first document matching this filter will be deleted.
244    #[builder(!default)]
245    pub filter: Document,
246
247    /// The collation to use.
248    pub collation: Option<Document>,
249
250    /// The index to use. Specify either the index name as a string or the index key pattern. If
251    /// specified, then the query system will only consider plans using the hinted index.
252    pub hint: Option<Bson>,
253}
254
255impl From<DeleteOneModel> for WriteModel {
256    fn from(model: DeleteOneModel) -> Self {
257        Self::DeleteOne(model)
258    }
259}
260
261/// Deletes multiple documents.
262#[skip_serializing_none]
263#[derive(Clone, Debug, Serialize, TypedBuilder)]
264#[cfg_attr(test, derive(Deserialize))]
265#[serde(rename_all = "camelCase")]
266#[builder(field_defaults(default, setter(into)))]
267#[non_exhaustive]
268pub struct DeleteManyModel {
269    /// The namespace on which the delete should be performed.
270    #[serde(skip_serializing)]
271    #[builder(!default)]
272    pub namespace: Namespace,
273
274    /// The filter to use. All documents matching this filter will be deleted.
275    pub filter: Document,
276
277    /// The collation to use.
278    pub collation: Option<Document>,
279
280    /// The index to use. Specify either the index name as a string or the index key pattern. If
281    /// specified, then the query system will only consider plans using the hinted index.
282    pub hint: Option<Bson>,
283}
284
285impl From<DeleteManyModel> for WriteModel {
286    fn from(model: DeleteManyModel) -> Self {
287        Self::DeleteMany(model)
288    }
289}
290
291impl<T> Collection<T>
292where
293    T: Send + Sync + Serialize,
294{
295    /// Constructs an [`InsertOneModel`] with this collection's namespace by serializing the
296    /// provided value into a [`Document`]. Returns an error if serialization fails.
297    ///
298    /// Note that the returned value must be provided to [`bulk_write`](crate::Client::bulk_write)
299    /// for the insert to be performed.
300    pub fn insert_one_model(&self, document: impl Borrow<T>) -> Result<InsertOneModel> {
301        let document = bson::to_document(document.borrow())?;
302        Ok(InsertOneModel::builder()
303            .namespace(self.namespace())
304            .document(document)
305            .build())
306    }
307
308    /// Constructs a [`ReplaceOneModel`] with this collection's namespace by serializing the
309    /// provided value into a [`Document`]. Returns an error if serialization fails.
310    ///
311    /// Note that the returned value must be provided to [`bulk_write`](crate::Client::bulk_write)
312    /// for the replace to be performed.
313    pub fn replace_one_model(
314        &self,
315        filter: Document,
316        replacement: impl Borrow<T>,
317    ) -> Result<ReplaceOneModel> {
318        let replacement = bson::to_document(replacement.borrow())?;
319        Ok(ReplaceOneModel::builder()
320            .namespace(self.namespace())
321            .filter(filter)
322            .replacement(replacement)
323            .build())
324    }
325}
326
/// The broad category of a [`WriteModel`]; several model variants share one category.
pub(crate) enum OperationType {
    /// An insert operation.
    Insert,
    /// An update operation (also used for replacements).
    Update,
    /// A delete operation.
    Delete,
}
332
333impl WriteModel {
334    pub(crate) fn namespace(&self) -> &Namespace {
335        match self {
336            Self::InsertOne(model) => &model.namespace,
337            Self::UpdateOne(model) => &model.namespace,
338            Self::UpdateMany(model) => &model.namespace,
339            Self::ReplaceOne(model) => &model.namespace,
340            Self::DeleteOne(model) => &model.namespace,
341            Self::DeleteMany(model) => &model.namespace,
342        }
343    }
344
345    pub(crate) fn operation_type(&self) -> OperationType {
346        match self {
347            Self::InsertOne(_) => OperationType::Insert,
348            Self::UpdateOne(_) | Self::UpdateMany(_) | Self::ReplaceOne(_) => OperationType::Update,
349            Self::DeleteOne(_) | Self::DeleteMany(_) => OperationType::Delete,
350        }
351    }
352
353    /// Whether this operation should apply to all documents that match the filter. Returns None if
354    /// the operation does not use a filter.
355    pub(crate) fn multi(&self) -> Option<bool> {
356        match self {
357            Self::UpdateMany(_) | Self::DeleteMany(_) => Some(true),
358            Self::UpdateOne(_) | Self::ReplaceOne(_) | Self::DeleteOne(_) => Some(false),
359            Self::InsertOne(_) => None,
360        }
361    }
362
363    pub(crate) fn operation_name(&self) -> &'static str {
364        match self.operation_type() {
365            OperationType::Insert => "insert",
366            OperationType::Update => "update",
367            OperationType::Delete => "delete",
368        }
369    }
370
371    /// Returns the operation-specific fields that should be included in this model's entry in the
372    /// ops array. Also returns an inserted ID if this is an insert operation.
373    pub(crate) fn get_ops_document_contents(&self) -> Result<(RawDocumentBuf, Option<Bson>)> {
374        if let Self::UpdateOne(UpdateOneModel { update, .. })
375        | Self::UpdateMany(UpdateManyModel { update, .. }) = self
376        {
377            if let UpdateModifications::Document(update_document) = update {
378                update_document_check(update_document)?;
379            }
380        } else if let Self::ReplaceOne(ReplaceOneModel { replacement, .. }) = self {
381            replacement_document_check(replacement)?;
382        }
383
384        let (mut model_document, inserted_id) = match self {
385            Self::InsertOne(model) => {
386                let mut insert_document = RawDocumentBuf::from_document(&model.document)?;
387                let inserted_id = get_or_prepend_id_field(&mut insert_document)?;
388                (rawdoc! { "document": insert_document }, Some(inserted_id))
389            }
390            _ => {
391                let model_document = bson::to_raw_document_buf(&self)?;
392                (model_document, None)
393            }
394        };
395
396        if let Some(multi) = self.multi() {
397            model_document.append("multi", multi);
398        }
399
400        Ok((model_document, inserted_id))
401    }
402}