use crate::bson::RawDocumentBuf;
use crate::{
bson::{rawdoc, Document},
bson_compat::{cstr, CStr},
cmap::{Command, RawCommandResponse, StreamDescription},
cursor::CursorSpecification,
error::{Error, Result},
operation::{OperationWithDefaults, Retryability, SERVER_4_4_0_WIRE_VERSION},
options::{CursorType, FindOptions, SelectionCriteria},
Namespace,
};
use super::{append_options_to_raw_document, ExecutionContext};
#[derive(Debug)]
pub(crate) struct Find {
ns: Namespace,
filter: Document,
options: Option<Box<FindOptions>>,
}
impl Find {
pub(crate) fn new(ns: Namespace, filter: Document, options: Option<FindOptions>) -> Self {
Self {
ns,
filter,
options: options.map(Box::new),
}
}
}
impl OperationWithDefaults for Find {
type O = CursorSpecification;
const NAME: &'static CStr = cstr!("find");
const ZERO_COPY: bool = true;
fn build(&mut self, _description: &StreamDescription) -> Result<Command> {
let mut body = rawdoc! {
Self::NAME: self.ns.coll.clone(),
};
if let Some(ref mut options) = self.options {
if options.limit.map(|limit| limit < 0) == Some(true) {
body.append(cstr!("singleBatch"), true);
}
if let Some(ref mut batch_size) = options.batch_size {
if i32::try_from(*batch_size).is_err() {
return Err(Error::invalid_argument(
"the batch size must fit into a signed 32-bit integer",
));
}
if let Some(limit) = options.limit.and_then(|limit| u32::try_from(limit).ok()) {
if *batch_size == limit {
*batch_size += 1;
}
}
}
match options.cursor_type {
Some(CursorType::Tailable) => {
body.append(cstr!("tailable"), true);
}
Some(CursorType::TailableAwait) => {
body.append(cstr!("tailable"), true);
body.append(cstr!("awaitData"), true);
}
_ => {}
};
}
append_options_to_raw_document(&mut body, self.options.as_ref())?;
let raw_filter: RawDocumentBuf = (&self.filter).try_into()?;
body.append(cstr!("filter"), raw_filter);
Ok(Command::new_read(
Self::NAME,
&self.ns.db,
self.options.as_ref().and_then(|o| o.read_concern.clone()),
body,
))
}
fn extract_at_cluster_time(
&self,
response: &crate::bson::RawDocument,
) -> Result<Option<crate::bson::Timestamp>> {
super::cursor_get_at_cluster_time(response)
}
fn handle_response_cow<'a>(
&'a self,
response: std::borrow::Cow<'a, RawCommandResponse>,
context: ExecutionContext<'a>,
) -> Result<Self::O> {
let description = context.connection.stream_description()?;
let comment = if description.max_wire_version.unwrap_or(0) < SERVER_4_4_0_WIRE_VERSION {
None
} else {
self.options.as_ref().and_then(|opts| opts.comment.clone())
};
CursorSpecification::new(
response.into_owned(),
description.server_address.clone(),
self.options.as_ref().and_then(|opts| opts.batch_size),
self.options.as_ref().and_then(|opts| opts.max_await_time),
comment,
)
}
fn supports_read_concern(&self, _description: &StreamDescription) -> bool {
true
}
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.options
.as_ref()
.and_then(|opts| opts.selection_criteria.as_ref())
}
fn retryability(&self) -> Retryability {
Retryability::Read
}
#[cfg(feature = "opentelemetry")]
type Otel = crate::otel::Witness<Self>;
}
#[cfg(feature = "opentelemetry")]
impl crate::otel::OtelInfoDefaults for Find {
fn target(&self) -> crate::otel::OperationTarget<'_> {
(&self.ns).into()
}
fn output_cursor_id(output: &Self::O) -> Option<i64> {
Some(output.id())
}
}