use futures_util::FutureExt;
use crate::{
bson_compat::{cstr, CStr},
client::Retry,
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
concern::WriteConcern,
cursor::common::CursorSpecification,
error::{Error, Result},
operation::{run_command::RunCommand, Operation, SERVER_4_4_0_WIRE_VERSION},
options::{ClientOptions, RunCursorCommandOptions, SelectionCriteria},
BoxFuture,
};
use super::ExecutionContext;
/// Operation that wraps a [`RunCommand`] and produces a [`CursorSpecification`]
/// from the command's response instead of the raw reply.
#[derive(Debug, Clone)]
pub(crate) struct RunCursorCommand<'conn> {
/// The wrapped command; the `Operation` methods other than response/error
/// handling forward to it.
run_command: RunCommand<'conn>,
/// Optional cursor settings. `batch_size`, `max_time` and `comment` are read
/// from here when the cursor specification is built.
options: Option<RunCursorCommandOptions>,
}
impl<'conn> RunCursorCommand<'conn> {
    /// Pairs an already-constructed `run_command` with the optional cursor
    /// `options`.
    ///
    /// No validation is performed here, so this currently always returns `Ok`;
    /// the `Result` return type is part of the existing signature.
    pub(crate) fn new(
        run_command: RunCommand<'conn>,
        options: Option<RunCursorCommandOptions>,
    ) -> Result<Self> {
        let operation = Self {
            run_command,
            options,
        };
        Ok(operation)
    }
}
/// `Operation` implementation for [`RunCursorCommand`].
///
/// Everything except error handling and response handling is forwarded to the
/// wrapped `run_command`; the response is converted into a
/// [`CursorSpecification`].
impl Operation for RunCursorCommand<'_> {
    /// Successful output: the specification of the cursor described by the reply.
    type O = CursorSpecification;

    const NAME: &'static CStr = cstr!("run_cursor_command");

    const ZERO_COPY: bool = true;

    /// Builds the command by delegating to the wrapped `run_command`.
    fn build(&mut self, description: &StreamDescription) -> Result<Command> {
        self.run_command.build(description)
    }

    /// Delegates cluster-time extraction to the wrapped `run_command`.
    fn extract_at_cluster_time(
        &self,
        response: &crate::bson::RawDocument,
    ) -> Result<Option<crate::bson::Timestamp>> {
        self.run_command.extract_at_cluster_time(response)
    }

    /// Passes the error through unchanged; no error is turned into a cursor.
    fn handle_error(&self, error: Error) -> Result<Self::O> {
        Err(error)
    }

    /// Selection criteria of the wrapped `run_command`.
    fn selection_criteria(&self) -> super::Feature<&SelectionCriteria> {
        self.run_command.selection_criteria()
    }

    /// Read concern of the wrapped `run_command`.
    fn read_concern(&self) -> super::Feature<&crate::options::ReadConcern> {
        self.run_command.read_concern()
    }

    /// Write concern of the wrapped `run_command`.
    fn write_concern(&self) -> super::Feature<&WriteConcern> {
        self.run_command.write_concern()
    }

    /// Session support as reported by the wrapped `run_command`.
    fn supports_sessions(&self) -> bool {
        self.run_command.supports_sessions()
    }

    /// Retryability as reported by the wrapped `run_command`.
    fn retryability(&self, options: &ClientOptions) -> crate::operation::Retryability {
        self.run_command.retryability(options)
    }

    /// Backpressure retryability as reported by the wrapped `run_command`.
    fn is_backpressure_retryable(&self, options: &ClientOptions) -> bool {
        self.run_command.is_backpressure_retryable(options)
    }

    /// Forwards the retry update to the wrapped `run_command`.
    fn update_for_retry(&mut self, retry: Option<&Retry>) {
        self.run_command.update_for_retry(retry)
    }

    /// Criteria override of the wrapped `run_command`.
    fn override_criteria(&self) -> super::OverrideCriteriaFn {
        self.run_command.override_criteria()
    }

    /// Pinned connection of the wrapped `run_command`, if it has one.
    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
        self.run_command.pinned_connection()
    }

    /// Name reported by the wrapped `run_command` (not the `NAME` constant).
    fn name(&self) -> &CStr {
        self.run_command.name()
    }

    /// Target of the wrapped `run_command`.
    fn target(&self) -> super::OperationTarget {
        self.run_command.target()
    }

    /// Turns the server response into a [`CursorSpecification`].
    ///
    /// The specification is built from the owned response, the address taken
    /// from the connection's stream description, and the `batch_size`,
    /// `max_time` and `comment` options (each `None` when no options were
    /// supplied).
    ///
    /// # Errors
    ///
    /// Returns an error if the stream description cannot be obtained from the
    /// connection, or if `CursorSpecification::new` fails.
    fn handle_response<'a>(
        &'a self,
        response: std::borrow::Cow<'a, RawCommandResponse>,
        context: ExecutionContext<'a>,
    ) -> BoxFuture<'a, Result<Self::O>> {
        async move {
            let description = context.connection.stream_description()?;
            let cursor_options = self.options.as_ref();

            // The comment is only forwarded when the connection reports at
            // least the 4.4.0 wire version; an unknown version counts as 0.
            let wire_version = description.max_wire_version.unwrap_or(0);
            let comment = if wire_version >= SERVER_4_4_0_WIRE_VERSION {
                cursor_options.and_then(|opts| opts.comment.clone())
            } else {
                None
            };

            CursorSpecification::new(
                response.into_owned(),
                description.server_address.clone(),
                cursor_options.and_then(|opts| opts.batch_size),
                cursor_options.and_then(|opts| opts.max_time),
                comment,
            )
        }
        .boxed()
    }

    #[cfg(feature = "opentelemetry")]
    type Otel = crate::otel::Witness<Self>;
}
/// OpenTelemetry metadata for [`RunCursorCommand`]; the per-operation values
/// come from the wrapped `run_command`.
#[cfg(feature = "opentelemetry")]
impl crate::otel::OtelInfo for RunCursorCommand<'_> {
    /// Log name reported by the wrapped `run_command`.
    fn log_name(&self) -> &str {
        let inner = self.run_command.otel();
        inner.log_name()
    }

    /// Cursor id reported by the wrapped `run_command`, if any.
    fn cursor_id(&self) -> Option<i64> {
        let inner = self.run_command.otel();
        inner.cursor_id()
    }

    /// Id of the cursor described by the operation's output; always `Some`.
    fn output_cursor_id(output: &<Self as Operation>::O) -> Option<i64> {
        let id = output.id();
        Some(id)
    }
}