1mod abort_transaction;
2pub(crate) mod aggregate;
3pub(crate) mod bulk_write;
4mod commit_transaction;
5pub(crate) mod count;
6pub(crate) mod count_documents;
7pub(crate) mod create;
8mod create_indexes;
9mod delete;
10mod distinct;
11pub(crate) mod drop_collection;
12pub(crate) mod drop_database;
13mod drop_indexes;
14mod find;
15pub(crate) mod find_and_modify;
16mod get_more;
17mod insert;
18pub(crate) mod list_collections;
19pub(crate) mod list_databases;
20mod list_indexes;
21#[cfg(feature = "in-use-encryption")]
22mod raw_output;
23pub(crate) mod run_command;
24pub(crate) mod run_cursor_command;
25mod search_index;
26mod update;
27
28use std::{collections::VecDeque, fmt::Debug, ops::Deref};
29
30use bson::{RawBsonRef, RawDocument, RawDocumentBuf, Timestamp};
31use futures_util::FutureExt;
32use serde::{de::DeserializeOwned, Deserialize, Serialize};
33
34use crate::{
35 bson::{self, Bson, Document},
36 bson_compat::CStr,
37 bson_util::{self, extend_raw_document_buf},
38 client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
39 cmap::{
40 conn::{pooled::PooledConnection, PinnedConnectionHandle},
41 Command,
42 RawCommandResponse,
43 StreamDescription,
44 },
45 error::{
46 CommandError,
47 Error,
48 ErrorKind,
49 IndexedWriteError,
50 InsertManyError,
51 Result,
52 WriteConcernError,
53 WriteFailure,
54 },
55 options::{ClientOptions, WriteConcern},
56 selection_criteria::SelectionCriteria,
57 BoxFuture,
58 ClientSession,
59 Namespace,
60};
61
62pub(crate) use abort_transaction::AbortTransaction;
63pub(crate) use commit_transaction::CommitTransaction;
64pub(crate) use create_indexes::CreateIndexes;
65pub(crate) use delete::Delete;
66pub(crate) use distinct::Distinct;
67pub(crate) use drop_indexes::DropIndexes;
68pub(crate) use find::Find;
69pub(crate) use find_and_modify::FindAndModify;
70pub(crate) use get_more::GetMore;
71pub(crate) use insert::Insert;
72pub(crate) use list_indexes::ListIndexes;
73#[cfg(feature = "in-use-encryption")]
74pub(crate) use raw_output::RawOutput;
75pub(crate) use search_index::{CreateSearchIndexes, DropSearchIndex, UpdateSearchIndex};
76pub(crate) use update::{Update, UpdateOrReplace};
77
78const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
79const SERVER_5_0_0_WIRE_VERSION: i32 = 13;
80const SERVER_8_0_0_WIRE_VERSION: i32 = 25;
81const MAX_ENCRYPTED_WRITE_SIZE: usize = 2_097_152;
84const OP_MSG_OVERHEAD_BYTES: usize = 1_000;
87
88pub(crate) struct ExecutionContext<'a> {
90 pub(crate) connection: &'a mut PooledConnection,
91 pub(crate) session: Option<&'a mut ClientSession>,
92 pub(crate) effective_criteria: SelectionCriteria,
93}
94
95#[derive(Debug, PartialEq, Clone, Copy)]
96pub(crate) enum Retryability {
97 Write,
98 Read,
99 None,
100}
101
102impl Retryability {
103 pub(crate) fn with_options(&self, options: &ClientOptions) -> Self {
105 match self {
106 Self::Write if options.retry_writes != Some(false) => Self::Write,
107 Self::Read if options.retry_reads != Some(false) => Self::Read,
108 _ => Self::None,
109 }
110 }
111
112 pub(crate) fn can_retry_error(&self, error: &Error) -> bool {
114 match self {
115 Self::Write => error.is_write_retryable(),
116 Self::Read => error.is_read_retryable(),
117 Self::None => false,
118 }
119 }
120}
121
122pub(crate) trait Operation {
127 type O;
129
130 const NAME: &'static CStr;
132
133 fn build(&mut self, description: &StreamDescription) -> Result<Command>;
136
137 fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result<Option<Timestamp>>;
140
141 fn handle_response<'a>(
143 &'a self,
144 response: RawCommandResponse,
145 context: ExecutionContext<'a>,
146 ) -> BoxFuture<'a, Result<Self::O>>;
147
148 fn handle_error(&self, error: Error) -> Result<Self::O>;
151
152 fn selection_criteria(&self) -> Option<&SelectionCriteria>;
154
155 fn is_acknowledged(&self) -> bool;
157
158 fn write_concern(&self) -> Option<&WriteConcern>;
160
161 fn supports_read_concern(&self, description: &StreamDescription) -> bool;
163
164 fn supports_sessions(&self) -> bool;
166
167 fn retryability(&self) -> Retryability;
169
170 fn update_for_retry(&mut self);
172
173 fn override_criteria(&self) -> OverrideCriteriaFn;
176
177 fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;
178
179 fn name(&self) -> &CStr;
181}
182
183pub(crate) type OverrideCriteriaFn =
184 fn(&SelectionCriteria, &crate::sdam::TopologyDescription) -> Option<SelectionCriteria>;
185
186pub(crate) trait OperationWithDefaults: Send + Sync {
189 type O;
191
192 const NAME: &'static CStr;
194
195 fn build(&mut self, description: &StreamDescription) -> Result<Command>;
198
199 fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result<Option<Timestamp>> {
202 Ok(None)
203 }
204
205 fn handle_response<'a>(
207 &'a self,
208 _response: RawCommandResponse,
209 _context: ExecutionContext<'a>,
210 ) -> Result<Self::O> {
211 Err(ErrorKind::Internal {
212 message: format!("operation handling not implemented for {}", Self::NAME),
213 }
214 .into())
215 }
216
217 fn handle_response_async<'a>(
220 &'a self,
221 response: RawCommandResponse,
222 context: ExecutionContext<'a>,
223 ) -> BoxFuture<'a, Result<Self::O>> {
224 async move { self.handle_response(response, context) }.boxed()
225 }
226
227 fn handle_error(&self, error: Error) -> Result<Self::O> {
230 Err(error)
231 }
232
233 fn selection_criteria(&self) -> Option<&SelectionCriteria> {
235 None
236 }
237
238 fn is_acknowledged(&self) -> bool {
240 self.write_concern()
241 .map(WriteConcern::is_acknowledged)
242 .unwrap_or(true)
243 }
244
245 fn write_concern(&self) -> Option<&WriteConcern> {
247 None
248 }
249
250 fn supports_read_concern(&self, _description: &StreamDescription) -> bool {
252 false
253 }
254
255 fn supports_sessions(&self) -> bool {
257 true
258 }
259
260 fn retryability(&self) -> Retryability {
262 Retryability::None
263 }
264
265 fn update_for_retry(&mut self) {}
267
268 fn override_criteria(&self) -> OverrideCriteriaFn {
271 |_, _| None
272 }
273
274 fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
275 None
276 }
277
278 fn name(&self) -> &CStr {
280 Self::NAME
281 }
282}
283
284impl<T: OperationWithDefaults> Operation for T
285where
286 T: Send + Sync,
287{
288 type O = T::O;
289 const NAME: &'static CStr = T::NAME;
290 fn build(&mut self, description: &StreamDescription) -> Result<Command> {
291 self.build(description)
292 }
293 fn extract_at_cluster_time(&self, response: &RawDocument) -> Result<Option<Timestamp>> {
294 self.extract_at_cluster_time(response)
295 }
296 fn handle_response<'a>(
297 &'a self,
298 response: RawCommandResponse,
299 context: ExecutionContext<'a>,
300 ) -> BoxFuture<'a, Result<Self::O>> {
301 self.handle_response_async(response, context)
302 }
303 fn handle_error(&self, error: Error) -> Result<Self::O> {
304 self.handle_error(error)
305 }
306 fn selection_criteria(&self) -> Option<&SelectionCriteria> {
307 self.selection_criteria()
308 }
309 fn is_acknowledged(&self) -> bool {
310 self.is_acknowledged()
311 }
312 fn write_concern(&self) -> Option<&WriteConcern> {
313 self.write_concern()
314 }
315 fn supports_read_concern(&self, description: &StreamDescription) -> bool {
316 self.supports_read_concern(description)
317 }
318 fn supports_sessions(&self) -> bool {
319 self.supports_sessions()
320 }
321 fn retryability(&self) -> Retryability {
322 self.retryability()
323 }
324 fn update_for_retry(&mut self) {
325 self.update_for_retry()
326 }
327 fn override_criteria(&self) -> OverrideCriteriaFn {
328 self.override_criteria()
329 }
330 fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
331 self.pinned_connection()
332 }
333 fn name(&self) -> &CStr {
334 self.name()
335 }
336}
337
338fn should_redact_body(body: &RawDocumentBuf) -> bool {
339 if let Some(Ok((command_name, _))) = body.into_iter().next() {
340 HELLO_COMMAND_NAMES.contains(command_name.to_lowercase().as_str())
341 && body.get("speculativeAuthenticate").ok().flatten().is_some()
342 } else {
343 false
344 }
345}
346
347impl Command {
348 pub(crate) fn should_redact(&self) -> bool {
349 let name = self.name.to_lowercase();
350 REDACTED_COMMANDS.contains(name.as_str()) || should_redact_body(&self.body)
351 }
352
353 #[cfg(any(
354 feature = "zstd-compression",
355 feature = "zlib-compression",
356 feature = "snappy-compression"
357 ))]
358 pub(crate) fn should_compress(&self) -> bool {
359 let name = self.name.to_lowercase();
360 !REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str())
361 }
362}
363
364#[derive(Deserialize, Debug)]
366#[serde(rename_all = "camelCase")]
367pub(crate) struct CommandResponse<T> {
368 pub(crate) ok: Bson,
369
370 #[serde(rename = "$clusterTime")]
371 pub(crate) cluster_time: Option<ClusterTime>,
372
373 #[serde(flatten)]
374 pub(crate) body: T,
375}
376
377impl<T: DeserializeOwned> CommandResponse<T> {
378 pub(crate) fn is_success(&self) -> bool {
380 bson_util::get_int(&self.ok) == Some(1)
381 }
382
383 pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
384 self.cluster_time.as_ref()
385 }
386}
387
388#[derive(Deserialize, Debug)]
390pub(crate) struct CommandErrorBody {
391 #[serde(rename = "errorLabels")]
392 pub(crate) error_labels: Option<Vec<String>>,
393
394 #[serde(flatten)]
395 pub(crate) command_error: CommandError,
396}
397
398impl From<CommandErrorBody> for Error {
399 fn from(command_error_response: CommandErrorBody) -> Error {
400 Error::new(
401 ErrorKind::Command(command_error_response.command_error),
402 command_error_response.error_labels,
403 )
404 }
405}
406
407pub(crate) fn append_options<T: Serialize + Debug>(
410 doc: &mut Document,
411 options: Option<&T>,
412) -> Result<()> {
413 if let Some(options) = options {
414 let options_doc = crate::bson_compat::serialize_to_document(options)?;
415 doc.extend(options_doc);
416 }
417 Ok(())
418}
419
420pub(crate) fn append_options_to_raw_document<T: Serialize>(
421 doc: &mut RawDocumentBuf,
422 options: Option<&T>,
423) -> Result<()> {
424 if let Some(options) = options {
425 let options_raw_doc = crate::bson_compat::serialize_to_raw_document_buf(options)?;
426 extend_raw_document_buf(doc, options_raw_doc)?;
427 }
428 Ok(())
429}
430
431#[derive(Deserialize, Debug)]
432pub(crate) struct SingleWriteBody {
433 n: u64,
434}
435
436#[derive(Debug, Deserialize, Default, Clone)]
438pub(crate) struct WriteConcernOnlyBody {
439 #[serde(rename = "writeConcernError")]
440 write_concern_error: Option<WriteConcernError>,
441
442 #[serde(rename = "errorLabels")]
443 labels: Option<Vec<String>>,
444}
445
446impl WriteConcernOnlyBody {
447 fn validate(&self) -> Result<()> {
448 match self.write_concern_error {
449 Some(ref wc_error) => Err(Error::new(
450 ErrorKind::Write(WriteFailure::WriteConcernError(wc_error.clone())),
451 self.labels.clone(),
452 )),
453 None => Ok(()),
454 }
455 }
456}
457
458#[derive(Debug)]
459pub(crate) struct WriteResponseBody<T = SingleWriteBody> {
460 body: T,
461 write_errors: Option<Vec<IndexedWriteError>>,
462 write_concern_error: Option<WriteConcernError>,
463 labels: Option<Vec<String>>,
464}
465
466impl<'de, T: Deserialize<'de>> Deserialize<'de> for WriteResponseBody<T> {
467 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
468 where
469 D: serde::Deserializer<'de>,
470 {
471 use crate::bson_compat::Utf8Lossy;
472 #[derive(Deserialize)]
473 struct Helper<T> {
474 #[serde(flatten)]
475 body: T,
476 #[serde(rename = "writeErrors")]
477 write_errors: Option<Utf8Lossy<Vec<IndexedWriteError>>>,
478 #[serde(rename = "writeConcernError")]
479 write_concern_error: Option<Utf8Lossy<WriteConcernError>>,
480 #[serde(rename = "errorLabels")]
481 labels: Option<Vec<String>>,
482 }
483 let helper = Helper::deserialize(deserializer)?;
484 Ok(Self {
485 body: helper.body,
486 write_errors: helper.write_errors.map(|l| l.0),
487 write_concern_error: helper.write_concern_error.map(|l| l.0),
488 labels: helper.labels,
489 })
490 }
491}
492
493impl<T> WriteResponseBody<T> {
494 fn validate(&self) -> Result<()> {
495 if self.write_errors.is_none() && self.write_concern_error.is_none() {
496 return Ok(());
497 };
498
499 let failure = InsertManyError {
500 write_errors: self.write_errors.clone(),
501 write_concern_error: self.write_concern_error.clone(),
502 inserted_ids: Default::default(),
503 };
504
505 Err(Error::new(
506 ErrorKind::InsertMany(failure),
507 self.labels.clone(),
508 ))
509 }
510}
511
512impl<T> Deref for WriteResponseBody<T> {
513 type Target = T;
514
515 fn deref(&self) -> &Self::Target {
516 &self.body
517 }
518}
519
520#[derive(Debug, Deserialize)]
521pub(crate) struct CursorBody {
522 cursor: CursorInfo,
523}
524
525impl CursorBody {
526 fn extract_at_cluster_time(response: &RawDocument) -> Result<Option<Timestamp>> {
527 Ok(response
528 .get("cursor")?
529 .and_then(RawBsonRef::as_document)
530 .map(|d| d.get("atClusterTime"))
531 .transpose()?
532 .flatten()
533 .and_then(RawBsonRef::as_timestamp))
534 }
535}
536
537#[derive(Debug, Deserialize, Clone)]
538#[serde(rename_all = "camelCase")]
539pub(crate) struct CursorInfo {
540 pub(crate) id: i64,
541
542 pub(crate) ns: Namespace,
543
544 pub(crate) first_batch: VecDeque<RawDocumentBuf>,
545
546 pub(crate) post_batch_resume_token: Option<RawDocumentBuf>,
547}
548
549#[derive(Debug, Clone)]
551pub(crate) struct SingleCursorResult<T>(Option<T>);
552
553impl<'de, T> Deserialize<'de> for SingleCursorResult<T>
554where
555 T: Deserialize<'de>,
556{
557 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
558 where
559 D: serde::Deserializer<'de>,
560 {
561 #[derive(Deserialize)]
562 struct FullCursorBody<T> {
563 cursor: InteriorBody<T>,
564 }
565
566 #[derive(Deserialize)]
567 struct InteriorBody<T> {
568 #[serde(rename = "firstBatch")]
569 first_batch: Vec<T>,
570 }
571
572 let mut full_body = FullCursorBody::deserialize(deserializer)?;
573 Ok(SingleCursorResult(full_body.cursor.first_batch.pop()))
574 }
575}