mongodb/
operation.rs

1mod abort_transaction;
2pub(crate) mod aggregate;
3pub(crate) mod bulk_write;
4mod commit_transaction;
5pub(crate) mod count;
6pub(crate) mod count_documents;
7pub(crate) mod create;
8mod create_indexes;
9mod delete;
10mod distinct;
11pub(crate) mod drop_collection;
12pub(crate) mod drop_database;
13mod drop_indexes;
14mod find;
15pub(crate) mod find_and_modify;
16mod get_more;
17mod insert;
18pub(crate) mod list_collections;
19pub(crate) mod list_databases;
20mod list_indexes;
21#[cfg(feature = "in-use-encryption")]
22mod raw_output;
23pub(crate) mod run_command;
24pub(crate) mod run_cursor_command;
25mod search_index;
26mod update;
27
28use std::{collections::VecDeque, fmt::Debug, ops::Deref};
29
30use bson::{RawBsonRef, RawDocument, RawDocumentBuf, Timestamp};
31use futures_util::FutureExt;
32use serde::{de::DeserializeOwned, Deserialize, Serialize};
33
34use crate::{
35    bson::{self, Bson, Document},
36    bson_compat::CStr,
37    bson_util::{self, extend_raw_document_buf},
38    client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
39    cmap::{
40        conn::{pooled::PooledConnection, PinnedConnectionHandle},
41        Command,
42        RawCommandResponse,
43        StreamDescription,
44    },
45    error::{
46        CommandError,
47        Error,
48        ErrorKind,
49        IndexedWriteError,
50        InsertManyError,
51        Result,
52        WriteConcernError,
53        WriteFailure,
54    },
55    options::{ClientOptions, WriteConcern},
56    selection_criteria::SelectionCriteria,
57    BoxFuture,
58    ClientSession,
59    Namespace,
60};
61
62pub(crate) use abort_transaction::AbortTransaction;
63pub(crate) use commit_transaction::CommitTransaction;
64pub(crate) use create_indexes::CreateIndexes;
65pub(crate) use delete::Delete;
66pub(crate) use distinct::Distinct;
67pub(crate) use drop_indexes::DropIndexes;
68pub(crate) use find::Find;
69pub(crate) use find_and_modify::FindAndModify;
70pub(crate) use get_more::GetMore;
71pub(crate) use insert::Insert;
72pub(crate) use list_indexes::ListIndexes;
73#[cfg(feature = "in-use-encryption")]
74pub(crate) use raw_output::RawOutput;
75pub(crate) use search_index::{CreateSearchIndexes, DropSearchIndex, UpdateSearchIndex};
76pub(crate) use update::{Update, UpdateOrReplace};
77
/// Wire version associated with MongoDB server 4.4.0.
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
/// Wire version associated with MongoDB server 5.0.0.
const SERVER_5_0_0_WIRE_VERSION: i32 = 13;
/// Wire version associated with MongoDB server 8.0.0.
const SERVER_8_0_0_WIRE_VERSION: i32 = 25;
/// The maximum number of bytes that may be included in a write payload when auto-encryption is
/// enabled.
const MAX_ENCRYPTED_WRITE_SIZE: usize = 2_097_152;
/// The amount of message overhead (OP_MSG bytes and command-agnostic fields) to account for when
/// building a multi-write operation using document sequences.
const OP_MSG_OVERHEAD_BYTES: usize = 1_000;
87
88/// Context about the execution of the operation.
89pub(crate) struct ExecutionContext<'a> {
90    pub(crate) connection: &'a mut PooledConnection,
91    pub(crate) session: Option<&'a mut ClientSession>,
92    pub(crate) effective_criteria: SelectionCriteria,
93}
94
/// The level of retryability an operation supports.
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum Retryability {
    /// Retried according to the write-retry rules.
    Write,
    /// Retried according to the read-retry rules.
    Read,
    /// Never retried.
    None,
}
101
102impl Retryability {
103    /// Returns this level of retryability in tandem with the client options.
104    pub(crate) fn with_options(&self, options: &ClientOptions) -> Self {
105        match self {
106            Self::Write if options.retry_writes != Some(false) => Self::Write,
107            Self::Read if options.retry_reads != Some(false) => Self::Read,
108            _ => Self::None,
109        }
110    }
111
112    /// Whether this level of retryability can retry the given error.
113    pub(crate) fn can_retry_error(&self, error: &Error) -> bool {
114        match self {
115            Self::Write => error.is_write_retryable(),
116            Self::Read => error.is_read_retryable(),
117            Self::None => false,
118        }
119    }
120}
121
122/// A trait modeling the behavior of a server side operation.
123///
124/// No methods in this trait should have default behaviors to ensure that wrapper operations
125/// replicate all behavior.  Default behavior is provided by the `OperationDefault` trait.
126pub(crate) trait Operation {
127    /// The output type of this operation.
128    type O;
129
130    /// The name of the server side command associated with this operation.
131    const NAME: &'static CStr;
132
133    /// Returns the command that should be sent to the server as part of this operation.
134    /// The operation may store some additional state that is required for handling the response.
135    fn build(&mut self, description: &StreamDescription) -> Result<Command>;
136
137    /// Parse the response for the atClusterTime field.
138    /// Depending on the operation, this may be found in different locations.
139    fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result<Option<Timestamp>>;
140
141    /// Interprets the server response to the command.
142    fn handle_response<'a>(
143        &'a self,
144        response: RawCommandResponse,
145        context: ExecutionContext<'a>,
146    ) -> BoxFuture<'a, Result<Self::O>>;
147
148    /// Interpret an error encountered while sending the built command to the server, potentially
149    /// recovering.
150    fn handle_error(&self, error: Error) -> Result<Self::O>;
151
152    /// Criteria to use for selecting the server that this operation will be executed on.
153    fn selection_criteria(&self) -> Option<&SelectionCriteria>;
154
155    /// Whether or not this operation will request acknowledgment from the server.
156    fn is_acknowledged(&self) -> bool;
157
158    /// The write concern to use for this operation, if any.
159    fn write_concern(&self) -> Option<&WriteConcern>;
160
161    /// Returns whether or not this command supports the `readConcern` field.
162    fn supports_read_concern(&self, description: &StreamDescription) -> bool;
163
164    /// Whether this operation supports sessions or not.
165    fn supports_sessions(&self) -> bool;
166
167    /// The level of retryability the operation supports.
168    fn retryability(&self) -> Retryability;
169
170    /// Updates this operation as needed for a retry.
171    fn update_for_retry(&mut self);
172
173    /// Returns a function handle to potentially override selection criteria based on server
174    /// topology.
175    fn override_criteria(&self) -> OverrideCriteriaFn;
176
177    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;
178
179    /// The name of the server side command associated with this operation.
180    fn name(&self) -> &CStr;
181}
182
183pub(crate) type OverrideCriteriaFn =
184    fn(&SelectionCriteria, &crate::sdam::TopologyDescription) -> Option<SelectionCriteria>;
185
186// A mirror of the `Operation` trait, with default behavior where appropriate.  Should only be
187// implemented by operation types that do not delegate to other operations.
188pub(crate) trait OperationWithDefaults: Send + Sync {
189    /// The output type of this operation.
190    type O;
191
192    /// The name of the server side command associated with this operation.
193    const NAME: &'static CStr;
194
195    /// Returns the command that should be sent to the server as part of this operation.
196    /// The operation may store some additional state that is required for handling the response.
197    fn build(&mut self, description: &StreamDescription) -> Result<Command>;
198
199    /// Parse the response for the atClusterTime field.
200    /// Depending on the operation, this may be found in different locations.
201    fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result<Option<Timestamp>> {
202        Ok(None)
203    }
204
205    /// Interprets the server response to the command.
206    fn handle_response<'a>(
207        &'a self,
208        _response: RawCommandResponse,
209        _context: ExecutionContext<'a>,
210    ) -> Result<Self::O> {
211        Err(ErrorKind::Internal {
212            message: format!("operation handling not implemented for {}", Self::NAME),
213        }
214        .into())
215    }
216
217    /// Interprets the server response to the command. This method should only be implemented when
218    /// async code is required to handle the response.
219    fn handle_response_async<'a>(
220        &'a self,
221        response: RawCommandResponse,
222        context: ExecutionContext<'a>,
223    ) -> BoxFuture<'a, Result<Self::O>> {
224        async move { self.handle_response(response, context) }.boxed()
225    }
226
227    /// Interpret an error encountered while sending the built command to the server, potentially
228    /// recovering.
229    fn handle_error(&self, error: Error) -> Result<Self::O> {
230        Err(error)
231    }
232
233    /// Criteria to use for selecting the server that this operation will be executed on.
234    fn selection_criteria(&self) -> Option<&SelectionCriteria> {
235        None
236    }
237
238    /// Whether or not this operation will request acknowledgment from the server.
239    fn is_acknowledged(&self) -> bool {
240        self.write_concern()
241            .map(WriteConcern::is_acknowledged)
242            .unwrap_or(true)
243    }
244
245    /// The write concern to use for this operation, if any.
246    fn write_concern(&self) -> Option<&WriteConcern> {
247        None
248    }
249
250    /// Returns whether or not this command supports the `readConcern` field.
251    fn supports_read_concern(&self, _description: &StreamDescription) -> bool {
252        false
253    }
254
255    /// Whether this operation supports sessions or not.
256    fn supports_sessions(&self) -> bool {
257        true
258    }
259
260    /// The level of retryability the operation supports.
261    fn retryability(&self) -> Retryability {
262        Retryability::None
263    }
264
265    /// Updates this operation as needed for a retry.
266    fn update_for_retry(&mut self) {}
267
268    /// Returns a function handle to potentially override selection criteria based on server
269    /// topology.
270    fn override_criteria(&self) -> OverrideCriteriaFn {
271        |_, _| None
272    }
273
274    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
275        None
276    }
277
278    /// The name of the server side command associated with this operation.
279    fn name(&self) -> &CStr {
280        Self::NAME
281    }
282}
283
284impl<T: OperationWithDefaults> Operation for T
285where
286    T: Send + Sync,
287{
288    type O = T::O;
289    const NAME: &'static CStr = T::NAME;
290    fn build(&mut self, description: &StreamDescription) -> Result<Command> {
291        self.build(description)
292    }
293    fn extract_at_cluster_time(&self, response: &RawDocument) -> Result<Option<Timestamp>> {
294        self.extract_at_cluster_time(response)
295    }
296    fn handle_response<'a>(
297        &'a self,
298        response: RawCommandResponse,
299        context: ExecutionContext<'a>,
300    ) -> BoxFuture<'a, Result<Self::O>> {
301        self.handle_response_async(response, context)
302    }
303    fn handle_error(&self, error: Error) -> Result<Self::O> {
304        self.handle_error(error)
305    }
306    fn selection_criteria(&self) -> Option<&SelectionCriteria> {
307        self.selection_criteria()
308    }
309    fn is_acknowledged(&self) -> bool {
310        self.is_acknowledged()
311    }
312    fn write_concern(&self) -> Option<&WriteConcern> {
313        self.write_concern()
314    }
315    fn supports_read_concern(&self, description: &StreamDescription) -> bool {
316        self.supports_read_concern(description)
317    }
318    fn supports_sessions(&self) -> bool {
319        self.supports_sessions()
320    }
321    fn retryability(&self) -> Retryability {
322        self.retryability()
323    }
324    fn update_for_retry(&mut self) {
325        self.update_for_retry()
326    }
327    fn override_criteria(&self) -> OverrideCriteriaFn {
328        self.override_criteria()
329    }
330    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
331        self.pinned_connection()
332    }
333    fn name(&self) -> &CStr {
334        self.name()
335    }
336}
337
338fn should_redact_body(body: &RawDocumentBuf) -> bool {
339    if let Some(Ok((command_name, _))) = body.into_iter().next() {
340        HELLO_COMMAND_NAMES.contains(command_name.to_lowercase().as_str())
341            && body.get("speculativeAuthenticate").ok().flatten().is_some()
342    } else {
343        false
344    }
345}
346
347impl Command {
348    pub(crate) fn should_redact(&self) -> bool {
349        let name = self.name.to_lowercase();
350        REDACTED_COMMANDS.contains(name.as_str()) || should_redact_body(&self.body)
351    }
352
353    #[cfg(any(
354        feature = "zstd-compression",
355        feature = "zlib-compression",
356        feature = "snappy-compression"
357    ))]
358    pub(crate) fn should_compress(&self) -> bool {
359        let name = self.name.to_lowercase();
360        !REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str())
361    }
362}
363
364/// A response to a command with a body shaped deserialized to a `T`.
365#[derive(Deserialize, Debug)]
366#[serde(rename_all = "camelCase")]
367pub(crate) struct CommandResponse<T> {
368    pub(crate) ok: Bson,
369
370    #[serde(rename = "$clusterTime")]
371    pub(crate) cluster_time: Option<ClusterTime>,
372
373    #[serde(flatten)]
374    pub(crate) body: T,
375}
376
377impl<T: DeserializeOwned> CommandResponse<T> {
378    /// Whether the command succeeeded or not (i.e. if this response is ok: 1).
379    pub(crate) fn is_success(&self) -> bool {
380        bson_util::get_int(&self.ok) == Some(1)
381    }
382
383    pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
384        self.cluster_time.as_ref()
385    }
386}
387
388/// A response body useful for deserializing command errors.
389#[derive(Deserialize, Debug)]
390pub(crate) struct CommandErrorBody {
391    #[serde(rename = "errorLabels")]
392    pub(crate) error_labels: Option<Vec<String>>,
393
394    #[serde(flatten)]
395    pub(crate) command_error: CommandError,
396}
397
398impl From<CommandErrorBody> for Error {
399    fn from(command_error_response: CommandErrorBody) -> Error {
400        Error::new(
401            ErrorKind::Command(command_error_response.command_error),
402            command_error_response.error_labels,
403        )
404    }
405}
406
407/// Appends a serializable struct to the input document. The serializable struct MUST serialize to a
408/// Document; otherwise, an error will be thrown.
409pub(crate) fn append_options<T: Serialize + Debug>(
410    doc: &mut Document,
411    options: Option<&T>,
412) -> Result<()> {
413    if let Some(options) = options {
414        let options_doc = crate::bson_compat::serialize_to_document(options)?;
415        doc.extend(options_doc);
416    }
417    Ok(())
418}
419
420pub(crate) fn append_options_to_raw_document<T: Serialize>(
421    doc: &mut RawDocumentBuf,
422    options: Option<&T>,
423) -> Result<()> {
424    if let Some(options) = options {
425        let options_raw_doc = crate::bson_compat::serialize_to_raw_document_buf(options)?;
426        extend_raw_document_buf(doc, options_raw_doc)?;
427    }
428    Ok(())
429}
430
431#[derive(Deserialize, Debug)]
432pub(crate) struct SingleWriteBody {
433    n: u64,
434}
435
436/// Body of a write response that could possibly have a write concern error but not write errors.
437#[derive(Debug, Deserialize, Default, Clone)]
438pub(crate) struct WriteConcernOnlyBody {
439    #[serde(rename = "writeConcernError")]
440    write_concern_error: Option<WriteConcernError>,
441
442    #[serde(rename = "errorLabels")]
443    labels: Option<Vec<String>>,
444}
445
446impl WriteConcernOnlyBody {
447    fn validate(&self) -> Result<()> {
448        match self.write_concern_error {
449            Some(ref wc_error) => Err(Error::new(
450                ErrorKind::Write(WriteFailure::WriteConcernError(wc_error.clone())),
451                self.labels.clone(),
452            )),
453            None => Ok(()),
454        }
455    }
456}
457
458#[derive(Debug)]
459pub(crate) struct WriteResponseBody<T = SingleWriteBody> {
460    body: T,
461    write_errors: Option<Vec<IndexedWriteError>>,
462    write_concern_error: Option<WriteConcernError>,
463    labels: Option<Vec<String>>,
464}
465
466impl<'de, T: Deserialize<'de>> Deserialize<'de> for WriteResponseBody<T> {
467    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
468    where
469        D: serde::Deserializer<'de>,
470    {
471        use crate::bson_compat::Utf8Lossy;
472        #[derive(Deserialize)]
473        struct Helper<T> {
474            #[serde(flatten)]
475            body: T,
476            #[serde(rename = "writeErrors")]
477            write_errors: Option<Utf8Lossy<Vec<IndexedWriteError>>>,
478            #[serde(rename = "writeConcernError")]
479            write_concern_error: Option<Utf8Lossy<WriteConcernError>>,
480            #[serde(rename = "errorLabels")]
481            labels: Option<Vec<String>>,
482        }
483        let helper = Helper::deserialize(deserializer)?;
484        Ok(Self {
485            body: helper.body,
486            write_errors: helper.write_errors.map(|l| l.0),
487            write_concern_error: helper.write_concern_error.map(|l| l.0),
488            labels: helper.labels,
489        })
490    }
491}
492
493impl<T> WriteResponseBody<T> {
494    fn validate(&self) -> Result<()> {
495        if self.write_errors.is_none() && self.write_concern_error.is_none() {
496            return Ok(());
497        };
498
499        let failure = InsertManyError {
500            write_errors: self.write_errors.clone(),
501            write_concern_error: self.write_concern_error.clone(),
502            inserted_ids: Default::default(),
503        };
504
505        Err(Error::new(
506            ErrorKind::InsertMany(failure),
507            self.labels.clone(),
508        ))
509    }
510}
511
512impl<T> Deref for WriteResponseBody<T> {
513    type Target = T;
514
515    fn deref(&self) -> &Self::Target {
516        &self.body
517    }
518}
519
520#[derive(Debug, Deserialize)]
521pub(crate) struct CursorBody {
522    cursor: CursorInfo,
523}
524
525impl CursorBody {
526    fn extract_at_cluster_time(response: &RawDocument) -> Result<Option<Timestamp>> {
527        Ok(response
528            .get("cursor")?
529            .and_then(RawBsonRef::as_document)
530            .map(|d| d.get("atClusterTime"))
531            .transpose()?
532            .flatten()
533            .and_then(RawBsonRef::as_timestamp))
534    }
535}
536
537#[derive(Debug, Deserialize, Clone)]
538#[serde(rename_all = "camelCase")]
539pub(crate) struct CursorInfo {
540    pub(crate) id: i64,
541
542    pub(crate) ns: Namespace,
543
544    pub(crate) first_batch: VecDeque<RawDocumentBuf>,
545
546    pub(crate) post_batch_resume_token: Option<RawDocumentBuf>,
547}
548
/// Type used to deserialize just the first result from a cursor, if any.
///
/// Wraps `None` when the cursor's first batch is empty.
#[derive(Clone, Debug)]
pub(crate) struct SingleCursorResult<T>(Option<T>);
552
553impl<'de, T> Deserialize<'de> for SingleCursorResult<T>
554where
555    T: Deserialize<'de>,
556{
557    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
558    where
559        D: serde::Deserializer<'de>,
560    {
561        #[derive(Deserialize)]
562        struct FullCursorBody<T> {
563            cursor: InteriorBody<T>,
564        }
565
566        #[derive(Deserialize)]
567        struct InteriorBody<T> {
568            #[serde(rename = "firstBatch")]
569            first_batch: Vec<T>,
570        }
571
572        let mut full_body = FullCursorBody::deserialize(deserializer)?;
573        Ok(SingleCursorResult(full_body.cursor.first_batch.pop()))
574    }
575}