1mod abort_transaction;
2pub(crate) mod aggregate;
3pub(crate) mod bulk_write;
4mod commit_transaction;
5pub(crate) mod count;
6pub(crate) mod count_documents;
7pub(crate) mod create;
8mod create_indexes;
9mod delete;
10mod distinct;
11pub(crate) mod drop_collection;
12pub(crate) mod drop_database;
13mod drop_indexes;
14mod find;
15pub(crate) mod find_and_modify;
16mod get_more;
17mod insert;
18pub(crate) mod list_collections;
19pub(crate) mod list_databases;
20mod list_indexes;
21#[cfg(feature = "in-use-encryption")]
22mod raw_output;
23pub(crate) mod run_command;
24pub(crate) mod run_cursor_command;
25mod search_index;
26mod update;
27
28use std::{collections::VecDeque, fmt::Debug, ops::Deref};
29
30use bson::{RawBsonRef, RawDocument, RawDocumentBuf, Timestamp};
31use futures_util::FutureExt;
32use serde::{de::DeserializeOwned, Deserialize, Serialize};
33
34use crate::{
35 bson::{self, Bson, Document},
36 bson_util::{self, extend_raw_document_buf},
37 client::{ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
38 cmap::{
39 conn::{pooled::PooledConnection, PinnedConnectionHandle},
40 Command,
41 RawCommandResponse,
42 StreamDescription,
43 },
44 error::{
45 CommandError,
46 Error,
47 ErrorKind,
48 IndexedWriteError,
49 InsertManyError,
50 Result,
51 WriteConcernError,
52 WriteFailure,
53 },
54 options::WriteConcern,
55 selection_criteria::SelectionCriteria,
56 BoxFuture,
57 ClientSession,
58 Namespace,
59};
60
61pub(crate) use abort_transaction::AbortTransaction;
62pub(crate) use commit_transaction::CommitTransaction;
63pub(crate) use create_indexes::CreateIndexes;
64pub(crate) use delete::Delete;
65pub(crate) use distinct::Distinct;
66pub(crate) use drop_indexes::DropIndexes;
67pub(crate) use find::Find;
68pub(crate) use find_and_modify::FindAndModify;
69pub(crate) use get_more::GetMore;
70pub(crate) use insert::Insert;
71pub(crate) use list_indexes::ListIndexes;
72#[cfg(feature = "in-use-encryption")]
73pub(crate) use raw_output::RawOutput;
74pub(crate) use search_index::{CreateSearchIndexes, DropSearchIndex, UpdateSearchIndex};
75pub(crate) use update::{Update, UpdateOrReplace};
76
// Wire protocol version numbers; going by the names, these are the versions
// reported by MongoDB 4.2.0, 4.4.0 and 8.0.0 respectively.
// NOTE(review): confirm against the server's wire-version table.
const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
const SERVER_4_4_0_WIRE_VERSION: i32 = 9;
const SERVER_8_0_0_WIRE_VERSION: i32 = 25;
// 2_097_152 == 2 * 1024 * 1024. Presumably a byte limit applied to write
// payloads when encryption is in use -- TODO confirm at the use sites.
const MAX_ENCRYPTED_WRITE_SIZE: usize = 2_097_152;
// Presumably the number of bytes reserved for OP_MSG framing overhead when
// sizing a message -- TODO confirm at the use sites.
const OP_MSG_OVERHEAD_BYTES: usize = 1_000;
86
/// Borrowed state handed to [`Operation::handle_response`] alongside the raw
/// server reply.
pub(crate) struct ExecutionContext<'a> {
    /// Mutable borrow of the pooled connection.
    pub(crate) connection: &'a mut PooledConnection,
    /// Mutable borrow of the client session, when there is one.
    pub(crate) session: Option<&'a mut ClientSession>,
}
92
/// The kind of retry support an operation reports through
/// [`Operation::retryability`].
#[derive(Debug, PartialEq, Clone, Copy)]
pub(crate) enum Retryability {
    /// Presumably: the operation may be retried under retryable-write rules.
    Write,
    /// Presumably: the operation may be retried under retryable-read rules.
    Read,
    /// The value returned by the default `OperationWithDefaults::retryability`;
    /// presumably means the operation is not retried.
    None,
}
99
/// Interface describing a single command-based operation.
///
/// None of the items here has a default; every implementor must spell out each
/// one. Types that want defaults implement `OperationWithDefaults` instead and
/// receive this trait through the blanket impl in this file.
pub(crate) trait Operation {
    /// The value produced when the operation succeeds.
    type O;

    /// The operation's name as a compile-time constant.
    const NAME: &'static str;

    /// Builds the [`Command`] for this operation from the given stream
    /// description. Takes `&mut self`, so implementors may record state here.
    fn build(&mut self, description: &StreamDescription) -> Result<Command>;

    /// Extracts a timestamp from the raw response document, if one is present.
    /// Judging by the name this is the `atClusterTime` value; where it lives is
    /// up to the implementor.
    fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result<Option<Timestamp>>;

    /// Turns the raw command response into the operation's output. The
    /// returned future may borrow `self` and the execution context for `'a`.
    fn handle_response<'a>(
        &'a self,
        response: RawCommandResponse,
        context: ExecutionContext<'a>,
    ) -> BoxFuture<'a, Result<Self::O>>;

    /// Gives the operation a chance to map an error into a result; returning
    /// `Ok` turns the error into a success.
    fn handle_error(&self, error: Error) -> Result<Self::O>;

    /// The selection criteria associated with this operation, if any.
    fn selection_criteria(&self) -> Option<&SelectionCriteria>;

    /// Whether the operation is acknowledged.
    fn is_acknowledged(&self) -> bool;

    /// The write concern associated with this operation, if any.
    fn write_concern(&self) -> Option<&WriteConcern>;

    /// Whether the operation supports a read concern on a connection with the
    /// given description.
    fn supports_read_concern(&self, description: &StreamDescription) -> bool;

    /// Whether the operation supports sessions.
    fn supports_sessions(&self) -> bool;

    /// The retry category of this operation.
    fn retryability(&self) -> Retryability;

    /// Hook for mutating the operation before a retry (presumably invoked by
    /// the executor between attempts -- confirm at the call site).
    fn update_for_retry(&mut self);

    /// The pinned connection handle associated with this operation, if any.
    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>;

    /// The operation's name as a runtime string.
    fn name(&self) -> &str;
}
155
156pub(crate) trait OperationWithDefaults: Send + Sync {
159 type O;
161
162 const NAME: &'static str;
164
165 fn build(&mut self, description: &StreamDescription) -> Result<Command>;
168
169 fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result<Option<Timestamp>> {
172 Ok(None)
173 }
174
175 fn handle_response<'a>(
177 &'a self,
178 _response: RawCommandResponse,
179 _context: ExecutionContext<'a>,
180 ) -> Result<Self::O> {
181 Err(ErrorKind::Internal {
182 message: format!("operation handling not implemented for {}", Self::NAME),
183 }
184 .into())
185 }
186
187 fn handle_response_async<'a>(
190 &'a self,
191 response: RawCommandResponse,
192 context: ExecutionContext<'a>,
193 ) -> BoxFuture<'a, Result<Self::O>> {
194 async move { self.handle_response(response, context) }.boxed()
195 }
196
197 fn handle_error(&self, error: Error) -> Result<Self::O> {
200 Err(error)
201 }
202
203 fn selection_criteria(&self) -> Option<&SelectionCriteria> {
205 None
206 }
207
208 fn is_acknowledged(&self) -> bool {
210 self.write_concern()
211 .map(WriteConcern::is_acknowledged)
212 .unwrap_or(true)
213 }
214
215 fn write_concern(&self) -> Option<&WriteConcern> {
217 None
218 }
219
220 fn supports_read_concern(&self, _description: &StreamDescription) -> bool {
222 false
223 }
224
225 fn supports_sessions(&self) -> bool {
227 true
228 }
229
230 fn retryability(&self) -> Retryability {
232 Retryability::None
233 }
234
235 fn update_for_retry(&mut self) {}
237
238 fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
239 None
240 }
241
242 fn name(&self) -> &str {
243 Self::NAME
244 }
245}
246
247impl<T: OperationWithDefaults> Operation for T
248where
249 T: Send + Sync,
250{
251 type O = T::O;
252 const NAME: &'static str = T::NAME;
253 fn build(&mut self, description: &StreamDescription) -> Result<Command> {
254 self.build(description)
255 }
256 fn extract_at_cluster_time(&self, response: &RawDocument) -> Result<Option<Timestamp>> {
257 self.extract_at_cluster_time(response)
258 }
259 fn handle_response<'a>(
260 &'a self,
261 response: RawCommandResponse,
262 context: ExecutionContext<'a>,
263 ) -> BoxFuture<'a, Result<Self::O>> {
264 self.handle_response_async(response, context)
265 }
266 fn handle_error(&self, error: Error) -> Result<Self::O> {
267 self.handle_error(error)
268 }
269 fn selection_criteria(&self) -> Option<&SelectionCriteria> {
270 self.selection_criteria()
271 }
272 fn is_acknowledged(&self) -> bool {
273 self.is_acknowledged()
274 }
275 fn write_concern(&self) -> Option<&WriteConcern> {
276 self.write_concern()
277 }
278 fn supports_read_concern(&self, description: &StreamDescription) -> bool {
279 self.supports_read_concern(description)
280 }
281 fn supports_sessions(&self) -> bool {
282 self.supports_sessions()
283 }
284 fn retryability(&self) -> Retryability {
285 self.retryability()
286 }
287 fn update_for_retry(&mut self) {
288 self.update_for_retry()
289 }
290 fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
291 self.pinned_connection()
292 }
293 fn name(&self) -> &str {
294 self.name()
295 }
296}
297
298fn should_redact_body(body: &RawDocumentBuf) -> bool {
299 if let Some(Ok((command_name, _))) = body.into_iter().next() {
300 HELLO_COMMAND_NAMES.contains(command_name.to_lowercase().as_str())
301 && body.get("speculativeAuthenticate").ok().flatten().is_some()
302 } else {
303 false
304 }
305}
306
307impl Command {
308 pub(crate) fn should_redact(&self) -> bool {
309 let name = self.name.to_lowercase();
310 REDACTED_COMMANDS.contains(name.as_str()) || should_redact_body(&self.body)
311 }
312
313 #[cfg(any(
314 feature = "zstd-compression",
315 feature = "zlib-compression",
316 feature = "snappy-compression"
317 ))]
318 pub(crate) fn should_compress(&self) -> bool {
319 let name = self.name.to_lowercase();
320 !REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str())
321 }
322}
323
/// Deserialized form of a command reply: the fields handled here plus an
/// operation-specific body of type `T`.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub(crate) struct CommandResponse<T> {
    /// The reply's `ok` field, kept as a generic [`Bson`] value.
    pub(crate) ok: Bson,

    /// The reply's `$clusterTime` field, when present.
    #[serde(rename = "$clusterTime")]
    pub(crate) cluster_time: Option<ClusterTime>,

    /// Operation-specific payload, flattened in from the same document as the
    /// fields above.
    #[serde(flatten)]
    pub(crate) body: T,
}
336
337impl<T: DeserializeOwned> CommandResponse<T> {
338 pub(crate) fn is_success(&self) -> bool {
340 bson_util::get_int(&self.ok) == Some(1)
341 }
342
343 pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
344 self.cluster_time.as_ref()
345 }
346}
347
/// Deserialized form of a command reply that describes an error.
#[derive(Deserialize, Debug)]
pub(crate) struct CommandErrorBody {
    /// The reply's `errorLabels` field, when present.
    #[serde(rename = "errorLabels")]
    pub(crate) error_labels: Option<Vec<String>>,

    /// The [`CommandError`] itself, flattened in from the same document.
    #[serde(flatten)]
    pub(crate) command_error: CommandError,
}
357
358impl From<CommandErrorBody> for Error {
359 fn from(command_error_response: CommandErrorBody) -> Error {
360 Error::new(
361 ErrorKind::Command(command_error_response.command_error),
362 command_error_response.error_labels,
363 )
364 }
365}
366
367pub(crate) fn append_options<T: Serialize + Debug>(
370 doc: &mut Document,
371 options: Option<&T>,
372) -> Result<()> {
373 if let Some(options) = options {
374 let options_doc = bson::to_document(options)?;
375 doc.extend(options_doc);
376 }
377 Ok(())
378}
379
380pub(crate) fn append_options_to_raw_document<T: Serialize>(
381 doc: &mut RawDocumentBuf,
382 options: Option<&T>,
383) -> Result<()> {
384 if let Some(options) = options {
385 let options_raw_doc = bson::to_raw_document_buf(options)?;
386 extend_raw_document_buf(doc, options_raw_doc)?;
387 }
388 Ok(())
389}
390
/// Body of a write reply that carries only a count.
#[derive(Deserialize, Debug)]
pub(crate) struct SingleWriteBody {
    /// The reply's `n` field (presumably the number of documents the write
    /// affected -- confirm against the server reply format).
    n: u64,
}
395
/// Body of a reply from which only the write concern error and the error
/// labels are read.
#[derive(Debug, Deserialize, Default, Clone)]
pub(crate) struct WriteConcernOnlyBody {
    /// The reply's `writeConcernError` field, when present.
    #[serde(rename = "writeConcernError")]
    write_concern_error: Option<WriteConcernError>,

    /// The reply's `errorLabels` field, when present.
    #[serde(rename = "errorLabels")]
    labels: Option<Vec<String>>,
}
405
406impl WriteConcernOnlyBody {
407 fn validate(&self) -> Result<()> {
408 match self.write_concern_error {
409 Some(ref wc_error) => Err(Error::new(
410 ErrorKind::Write(WriteFailure::WriteConcernError(wc_error.clone())),
411 self.labels.clone(),
412 )),
413 None => Ok(()),
414 }
415 }
416}
417
/// Body of a write reply: an operation-specific part `T` plus the error
/// fields handled here.
#[derive(Deserialize, Debug)]
pub(crate) struct WriteResponseBody<T = SingleWriteBody> {
    /// Operation-specific payload, flattened in from the same document.
    #[serde(flatten)]
    body: T,

    /// The reply's `writeErrors` field, when present.
    #[serde(rename = "writeErrors")]
    write_errors: Option<Vec<IndexedWriteError>>,

    /// The reply's `writeConcernError` field, when present.
    #[serde(rename = "writeConcernError")]
    write_concern_error: Option<WriteConcernError>,

    /// The reply's `errorLabels` field, when present.
    #[serde(rename = "errorLabels")]
    labels: Option<Vec<String>>,
}
432
433impl<T> WriteResponseBody<T> {
434 fn validate(&self) -> Result<()> {
435 if self.write_errors.is_none() && self.write_concern_error.is_none() {
436 return Ok(());
437 };
438
439 let failure = InsertManyError {
440 write_errors: self.write_errors.clone(),
441 write_concern_error: self.write_concern_error.clone(),
442 inserted_ids: Default::default(),
443 };
444
445 Err(Error::new(
446 ErrorKind::InsertMany(failure),
447 self.labels.clone(),
448 ))
449 }
450}
451
/// Lets the operation-specific body `T` be reached directly through a
/// `WriteResponseBody<T>` reference.
impl<T> Deref for WriteResponseBody<T> {
    type Target = T;

    /// Borrows the flattened body.
    fn deref(&self) -> &Self::Target {
        &self.body
    }
}
459
/// Body of a reply that holds a `cursor` sub-document.
#[derive(Debug, Deserialize)]
pub(crate) struct CursorBody {
    /// The reply's `cursor` field.
    cursor: CursorInfo,
}
464
465impl CursorBody {
466 fn extract_at_cluster_time(response: &RawDocument) -> Result<Option<Timestamp>> {
467 Ok(response
468 .get("cursor")?
469 .and_then(RawBsonRef::as_document)
470 .map(|d| d.get("atClusterTime"))
471 .transpose()?
472 .flatten()
473 .and_then(RawBsonRef::as_timestamp))
474 }
475}
476
/// Contents of a reply's `cursor` sub-document. Field names are mapped to
/// camelCase when deserializing (e.g. `first_batch` is read from `firstBatch`).
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub(crate) struct CursorInfo {
    /// The `id` field.
    pub(crate) id: i64,

    /// The `ns` field, parsed as a [`Namespace`].
    pub(crate) ns: Namespace,

    /// The `firstBatch` field: the raw documents of the first batch.
    pub(crate) first_batch: VecDeque<RawDocumentBuf>,

    /// The `postBatchResumeToken` field, when present.
    pub(crate) post_batch_resume_token: Option<RawDocumentBuf>,
}
488
/// At most one value of type `T` taken from a cursor reply's first batch; see
/// the `Deserialize` impl for how it is picked.
#[derive(Debug, Clone)]
pub(crate) struct SingleCursorResult<T>(Option<T>);
492
493impl<'de, T> Deserialize<'de> for SingleCursorResult<T>
494where
495 T: Deserialize<'de>,
496{
497 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
498 where
499 D: serde::Deserializer<'de>,
500 {
501 #[derive(Deserialize)]
502 struct FullCursorBody<T> {
503 cursor: InteriorBody<T>,
504 }
505
506 #[derive(Deserialize)]
507 struct InteriorBody<T> {
508 #[serde(rename = "firstBatch")]
509 first_batch: Vec<T>,
510 }
511
512 let mut full_body = FullCursorBody::deserialize(deserializer)?;
513 Ok(SingleCursorResult(full_body.cursor.first_batch.pop()))
514 }
515}
516
/// Clears an empty write concern from an optional options value.
///
/// `$opts` must be a mutable place holding an `Option` of something with a
/// `write_concern: Option<_>` field whose inner type has an `is_empty()`
/// method. When the options are present and their write concern is present but
/// empty, the write concern is reset to `None`; in every other case nothing
/// changes.
macro_rules! remove_empty_write_concern {
    ($opts:expr) => {
        if let Some(ref mut options) = $opts {
            if matches!(options.write_concern, Some(ref wc) if wc.is_empty()) {
                options.write_concern = None;
            }
        }
    };
}
528
529pub(crate) use remove_empty_write_concern;