mongodb/
operation.rs

1mod abort_transaction;
2pub(crate) mod aggregate;
3pub(crate) mod bulk_write;
4mod commit_transaction;
5pub(crate) mod count;
6pub(crate) mod count_documents;
7pub(crate) mod create;
8mod create_indexes;
9mod delete;
10mod distinct;
11pub(crate) mod drop_collection;
12pub(crate) mod drop_database;
13mod drop_indexes;
14mod find;
15pub(crate) mod find_and_modify;
16mod get_more;
17mod insert;
18pub(crate) mod list_collections;
19pub(crate) mod list_databases;
20mod list_indexes;
21#[cfg(feature = "in-use-encryption")]
22mod raw_output;
23pub(crate) mod run_command;
24pub(crate) mod run_cursor_command;
25mod search_index;
26mod update;
27
28use std::{collections::VecDeque, fmt::Debug, ops::Deref};
29
30use bson::{RawBsonRef, RawDocument, RawDocumentBuf, Timestamp};
31use futures_util::FutureExt;
32use serde::{de::DeserializeOwned, Deserialize, Serialize};
33
34use crate::{
35    bson::{self, Bson, Document},
36    bson_util::{self, extend_raw_document_buf},
37    client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
38    cmap::{
39        conn::{pooled::PooledConnection, PinnedConnectionHandle},
40        Command,
41        RawCommandResponse,
42        StreamDescription,
43    },
44    error::{
45        CommandError,
46        Error,
47        ErrorKind,
48        IndexedWriteError,
49        InsertManyError,
50        Result,
51        WriteConcernError,
52        WriteFailure,
53    },
54    options::WriteConcern,
55    selection_criteria::SelectionCriteria,
56    BoxFuture,
57    ClientSession,
58    Namespace,
59};
60
61pub(crate) use abort_transaction::AbortTransaction;
62pub(crate) use commit_transaction::CommitTransaction;
63pub(crate) use create_indexes::CreateIndexes;
64pub(crate) use delete::Delete;
65pub(crate) use distinct::Distinct;
66pub(crate) use drop_indexes::DropIndexes;
67pub(crate) use find::Find;
68pub(crate) use find_and_modify::FindAndModify;
69pub(crate) use get_more::GetMore;
70pub(crate) use insert::Insert;
71pub(crate) use list_indexes::ListIndexes;
72#[cfg(feature = "in-use-encryption")]
73pub(crate) use raw_output::RawOutput;
74pub(crate) use search_index::{CreateSearchIndexes, DropSearchIndex, UpdateSearchIndex};
75pub(crate) use update::{Update, UpdateOrReplace};
76
77const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
78const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
79const SERVER_8_0_0_WIRE_VERSION: i32 = 25;
80// The maximum number of bytes that may be included in a write payload when auto-encryption is
81// enabled.
82const MAX_ENCRYPTED_WRITE_SIZE: usize = 2_097_152;
83// The amount of message overhead (OP_MSG bytes and command-agnostic fields) to account for when
84// building a multi-write operation using document sequences.
85const OP_MSG_OVERHEAD_BYTES: usize = 1_000;
86
87/// Context about the execution of the operation.
88pub(crate) struct ExecutionContext<'a> {
89    pub(crate) connection: &'a mut PooledConnection,
90    pub(crate) session: Option<&'a mut ClientSession>,
91}
92
93#[derive(Debug, PartialEq, Clone, Copy)]
94pub(crate) enum Retryability {
95    Write,
96    Read,
97    None,
98}
99
100/// A trait modeling the behavior of a server side operation.
101///
102/// No methods in this trait should have default behaviors to ensure that wrapper operations
103/// replicate all behavior.  Default behavior is provided by the `OperationDefault` trait.
104pub(crate) trait Operation {
105    /// The output type of this operation.
106    type O;
107
108    /// The name of the server side command associated with this operation.
109    const NAME: &'static str;
110
111    /// Returns the command that should be sent to the server as part of this operation.
112    /// The operation may store some additional state that is required for handling the response.
113    fn build(&mut self, description: &StreamDescription) -> Result<Command>;
114
115    /// Parse the response for the atClusterTime field.
116    /// Depending on the operation, this may be found in different locations.
117    fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result<Option<Timestamp>>;
118
119    /// Interprets the server response to the command.
120    fn handle_response<'a>(
121        &'a self,
122        response: RawCommandResponse,
123        context: ExecutionContext<'a>,
124    ) -> BoxFuture<'a, Result<Self::O>>;
125
126    /// Interpret an error encountered while sending the built command to the server, potentially
127    /// recovering.
128    fn handle_error(&self, error: Error) -> Result<Self::O>;
129
130    /// Criteria to use for selecting the server that this operation will be executed on.
131    fn selection_criteria(&self) -> Option<&SelectionCriteria>;
132
133    /// Whether or not this operation will request acknowledgment from the server.
134    fn is_acknowledged(&self) -> bool;
135
136    /// The write concern to use for this operation, if any.
137    fn write_concern(&self) -> Option<&WriteConcern>;
138
139    /// Returns whether or not this command supports the `readConcern` field.
140    fn supports_read_concern(&self, description: &StreamDescription) -> bool;
141
142    /// Whether this operation supports sessions or not.
143    fn supports_sessions(&self) -> bool;
144
145    /// The level of retryability the operation supports.
146    fn retryability(&self) -> Retryability;
147
148    /// Updates this operation as needed for a retry.
149    fn update_for_retry(&mut self);
150
151    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;
152
153    fn name(&self) -> &str;
154}
155
156// A mirror of the `Operation` trait, with default behavior where appropriate.  Should only be
157// implemented by operation types that do not delegate to other operations.
158pub(crate) trait OperationWithDefaults: Send + Sync {
159    /// The output type of this operation.
160    type O;
161
162    /// The name of the server side command associated with this operation.
163    const NAME: &'static str;
164
165    /// Returns the command that should be sent to the server as part of this operation.
166    /// The operation may store some additional state that is required for handling the response.
167    fn build(&mut self, description: &StreamDescription) -> Result<Command>;
168
169    /// Parse the response for the atClusterTime field.
170    /// Depending on the operation, this may be found in different locations.
171    fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result<Option<Timestamp>> {
172        Ok(None)
173    }
174
175    /// Interprets the server response to the command.
176    fn handle_response<'a>(
177        &'a self,
178        _response: RawCommandResponse,
179        _context: ExecutionContext<'a>,
180    ) -> Result<Self::O> {
181        Err(ErrorKind::Internal {
182            message: format!("operation handling not implemented for {}", Self::NAME),
183        }
184        .into())
185    }
186
187    /// Interprets the server response to the command. This method should only be implemented when
188    /// async code is required to handle the response.
189    fn handle_response_async<'a>(
190        &'a self,
191        response: RawCommandResponse,
192        context: ExecutionContext<'a>,
193    ) -> BoxFuture<'a, Result<Self::O>> {
194        async move { self.handle_response(response, context) }.boxed()
195    }
196
197    /// Interpret an error encountered while sending the built command to the server, potentially
198    /// recovering.
199    fn handle_error(&self, error: Error) -> Result<Self::O> {
200        Err(error)
201    }
202
203    /// Criteria to use for selecting the server that this operation will be executed on.
204    fn selection_criteria(&self) -> Option<&SelectionCriteria> {
205        None
206    }
207
208    /// Whether or not this operation will request acknowledgment from the server.
209    fn is_acknowledged(&self) -> bool {
210        self.write_concern()
211            .map(WriteConcern::is_acknowledged)
212            .unwrap_or(true)
213    }
214
215    /// The write concern to use for this operation, if any.
216    fn write_concern(&self) -> Option<&WriteConcern> {
217        None
218    }
219
220    /// Returns whether or not this command supports the `readConcern` field.
221    fn supports_read_concern(&self, _description: &StreamDescription) -> bool {
222        false
223    }
224
225    /// Whether this operation supports sessions or not.
226    fn supports_sessions(&self) -> bool {
227        true
228    }
229
230    /// The level of retryability the operation supports.
231    fn retryability(&self) -> Retryability {
232        Retryability::None
233    }
234
235    /// Updates this operation as needed for a retry.
236    fn update_for_retry(&mut self) {}
237
238    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
239        None
240    }
241
242    fn name(&self) -> &str {
243        Self::NAME
244    }
245}
246
247impl<T: OperationWithDefaults> Operation for T
248where
249    T: Send + Sync,
250{
251    type O = T::O;
252    const NAME: &'static str = T::NAME;
253    fn build(&mut self, description: &StreamDescription) -> Result<Command> {
254        self.build(description)
255    }
256    fn extract_at_cluster_time(&self, response: &RawDocument) -> Result<Option<Timestamp>> {
257        self.extract_at_cluster_time(response)
258    }
259    fn handle_response<'a>(
260        &'a self,
261        response: RawCommandResponse,
262        context: ExecutionContext<'a>,
263    ) -> BoxFuture<'a, Result<Self::O>> {
264        self.handle_response_async(response, context)
265    }
266    fn handle_error(&self, error: Error) -> Result<Self::O> {
267        self.handle_error(error)
268    }
269    fn selection_criteria(&self) -> Option<&SelectionCriteria> {
270        self.selection_criteria()
271    }
272    fn is_acknowledged(&self) -> bool {
273        self.is_acknowledged()
274    }
275    fn write_concern(&self) -> Option<&WriteConcern> {
276        self.write_concern()
277    }
278    fn supports_read_concern(&self, description: &StreamDescription) -> bool {
279        self.supports_read_concern(description)
280    }
281    fn supports_sessions(&self) -> bool {
282        self.supports_sessions()
283    }
284    fn retryability(&self) -> Retryability {
285        self.retryability()
286    }
287    fn update_for_retry(&mut self) {
288        self.update_for_retry()
289    }
290    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
291        self.pinned_connection()
292    }
293    fn name(&self) -> &str {
294        self.name()
295    }
296}
297
298fn should_redact_body(body: &RawDocumentBuf) -> bool {
299    if let Some(Ok((command_name, _))) = body.into_iter().next() {
300        HELLO_COMMAND_NAMES.contains(command_name.to_lowercase().as_str())
301            && body.get("speculativeAuthenticate").ok().flatten().is_some()
302    } else {
303        false
304    }
305}
306
307impl Command {
308    pub(crate) fn should_redact(&self) -> bool {
309        let name = self.name.to_lowercase();
310        REDACTED_COMMANDS.contains(name.as_str()) || should_redact_body(&self.body)
311    }
312
313    #[cfg(any(
314        feature = "zstd-compression",
315        feature = "zlib-compression",
316        feature = "snappy-compression"
317    ))]
318    pub(crate) fn should_compress(&self) -> bool {
319        let name = self.name.to_lowercase();
320        !REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str())
321    }
322}
323
324/// A response to a command with a body shaped deserialized to a `T`.
325#[derive(Deserialize, Debug)]
326#[serde(rename_all = "camelCase")]
327pub(crate) struct CommandResponse<T> {
328    pub(crate) ok: Bson,
329
330    #[serde(rename = "$clusterTime")]
331    pub(crate) cluster_time: Option<ClusterTime>,
332
333    #[serde(flatten)]
334    pub(crate) body: T,
335}
336
337impl<T: DeserializeOwned> CommandResponse<T> {
338    /// Whether the command succeeeded or not (i.e. if this response is ok: 1).
339    pub(crate) fn is_success(&self) -> bool {
340        bson_util::get_int(&self.ok) == Some(1)
341    }
342
343    pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
344        self.cluster_time.as_ref()
345    }
346}
347
348/// A response body useful for deserializing command errors.
349#[derive(Deserialize, Debug)]
350pub(crate) struct CommandErrorBody {
351    #[serde(rename = "errorLabels")]
352    pub(crate) error_labels: Option<Vec<String>>,
353
354    #[serde(flatten)]
355    pub(crate) command_error: CommandError,
356}
357
358impl From<CommandErrorBody> for Error {
359    fn from(command_error_response: CommandErrorBody) -> Error {
360        Error::new(
361            ErrorKind::Command(command_error_response.command_error),
362            command_error_response.error_labels,
363        )
364    }
365}
366
367/// Appends a serializable struct to the input document. The serializable struct MUST serialize to a
368/// Document; otherwise, an error will be thrown.
369pub(crate) fn append_options<T: Serialize + Debug>(
370    doc: &mut Document,
371    options: Option<&T>,
372) -> Result<()> {
373    if let Some(options) = options {
374        let options_doc = bson::to_document(options)?;
375        doc.extend(options_doc);
376    }
377    Ok(())
378}
379
380pub(crate) fn append_options_to_raw_document<T: Serialize>(
381    doc: &mut RawDocumentBuf,
382    options: Option<&T>,
383) -> Result<()> {
384    if let Some(options) = options {
385        let options_raw_doc = bson::to_raw_document_buf(options)?;
386        extend_raw_document_buf(doc, options_raw_doc)?;
387    }
388    Ok(())
389}
390
391#[derive(Deserialize, Debug)]
392pub(crate) struct SingleWriteBody {
393    n: u64,
394}
395
396/// Body of a write response that could possibly have a write concern error but not write errors.
397#[derive(Debug, Deserialize, Default, Clone)]
398pub(crate) struct WriteConcernOnlyBody {
399    #[serde(rename = "writeConcernError")]
400    write_concern_error: Option<WriteConcernError>,
401
402    #[serde(rename = "errorLabels")]
403    labels: Option<Vec<String>>,
404}
405
406impl WriteConcernOnlyBody {
407    fn validate(&self) -> Result<()> {
408        match self.write_concern_error {
409            Some(ref wc_error) => Err(Error::new(
410                ErrorKind::Write(WriteFailure::WriteConcernError(wc_error.clone())),
411                self.labels.clone(),
412            )),
413            None => Ok(()),
414        }
415    }
416}
417
418#[derive(Deserialize, Debug)]
419pub(crate) struct WriteResponseBody<T = SingleWriteBody> {
420    #[serde(flatten)]
421    body: T,
422
423    #[serde(rename = "writeErrors")]
424    write_errors: Option<Vec<IndexedWriteError>>,
425
426    #[serde(rename = "writeConcernError")]
427    write_concern_error: Option<WriteConcernError>,
428
429    #[serde(rename = "errorLabels")]
430    labels: Option<Vec<String>>,
431}
432
433impl<T> WriteResponseBody<T> {
434    fn validate(&self) -> Result<()> {
435        if self.write_errors.is_none() && self.write_concern_error.is_none() {
436            return Ok(());
437        };
438
439        let failure = InsertManyError {
440            write_errors: self.write_errors.clone(),
441            write_concern_error: self.write_concern_error.clone(),
442            inserted_ids: Default::default(),
443        };
444
445        Err(Error::new(
446            ErrorKind::InsertMany(failure),
447            self.labels.clone(),
448        ))
449    }
450}
451
452impl<T> Deref for WriteResponseBody<T> {
453    type Target = T;
454
455    fn deref(&self) -> &Self::Target {
456        &self.body
457    }
458}
459
460#[derive(Debug, Deserialize)]
461pub(crate) struct CursorBody {
462    cursor: CursorInfo,
463}
464
465impl CursorBody {
466    fn extract_at_cluster_time(response: &RawDocument) -> Result<Option<Timestamp>> {
467        Ok(response
468            .get("cursor")?
469            .and_then(RawBsonRef::as_document)
470            .map(|d| d.get("atClusterTime"))
471            .transpose()?
472            .flatten()
473            .and_then(RawBsonRef::as_timestamp))
474    }
475}
476
477#[derive(Debug, Deserialize, Clone)]
478#[serde(rename_all = "camelCase")]
479pub(crate) struct CursorInfo {
480    pub(crate) id: i64,
481
482    pub(crate) ns: Namespace,
483
484    pub(crate) first_batch: VecDeque<RawDocumentBuf>,
485
486    pub(crate) post_batch_resume_token: Option<RawDocumentBuf>,
487}
488
489/// Type used to deserialize just the first result from a cursor, if any.
490#[derive(Debug, Clone)]
491pub(crate) struct SingleCursorResult<T>(Option<T>);
492
493impl<'de, T> Deserialize<'de> for SingleCursorResult<T>
494where
495    T: Deserialize<'de>,
496{
497    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
498    where
499        D: serde::Deserializer<'de>,
500    {
501        #[derive(Deserialize)]
502        struct FullCursorBody<T> {
503            cursor: InteriorBody<T>,
504        }
505
506        #[derive(Deserialize)]
507        struct InteriorBody<T> {
508            #[serde(rename = "firstBatch")]
509            first_batch: Vec<T>,
510        }
511
512        let mut full_body = FullCursorBody::deserialize(deserializer)?;
513        Ok(SingleCursorResult(full_body.cursor.first_batch.pop()))
514    }
515}
516
517macro_rules! remove_empty_write_concern {
518    ($opts:expr) => {
519        if let Some(ref mut options) = $opts {
520            if let Some(ref write_concern) = options.write_concern {
521                if write_concern.is_empty() {
522                    options.write_concern = None;
523                }
524            }
525        }
526    };
527}
528
529pub(crate) use remove_empty_write_concern;