// mongodb 3.6.0
//
// The official MongoDB driver for Rust
// Documentation
use futures_util::FutureExt;

use crate::{
    bson_compat::{cstr, CStr},
    client::Retry,
    cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
    concern::WriteConcern,
    cursor::common::CursorSpecification,
    error::{Error, Result},
    operation::{run_command::RunCommand, Operation, SERVER_4_4_0_WIRE_VERSION},
    options::{ClientOptions, RunCursorCommandOptions, SelectionCriteria},
    BoxFuture,
};

use super::ExecutionContext;

/// Operation that pairs a [`RunCommand`] with optional [`RunCursorCommandOptions`].
///
/// Its `Operation` implementation forwards most methods to the wrapped command and
/// produces a [`CursorSpecification`] when handling the response.
#[derive(Debug, Clone)]
pub(crate) struct RunCursorCommand<'conn> {
    // The wrapped command; command building, concerns, retry behaviour and
    // targeting are all delegated to it.
    run_command: RunCommand<'conn>,
    // Optional settings; `batch_size`, `max_time` and `comment` are read from
    // here when the response is handled.
    options: Option<RunCursorCommandOptions>,
}

impl<'conn> RunCursorCommand<'conn> {
    pub(crate) fn new(
        run_command: RunCommand<'conn>,
        options: Option<RunCursorCommandOptions>,
    ) -> Result<Self> {
        Ok(Self {
            run_command,
            options,
        })
    }
}

impl Operation for RunCursorCommand<'_> {
    type O = CursorSpecification;

    const NAME: &'static CStr = cstr!("run_cursor_command");

    const ZERO_COPY: bool = true;

    fn build(&mut self, description: &StreamDescription) -> Result<Command> {
        self.run_command.build(description)
    }

    fn extract_at_cluster_time(
        &self,
        response: &crate::bson::RawDocument,
    ) -> Result<Option<crate::bson::Timestamp>> {
        self.run_command.extract_at_cluster_time(response)
    }

    fn handle_error(&self, error: Error) -> Result<Self::O> {
        Err(error)
    }

    fn selection_criteria(&self) -> super::Feature<&SelectionCriteria> {
        self.run_command.selection_criteria()
    }

    fn read_concern(&self) -> super::Feature<&crate::options::ReadConcern> {
        self.run_command.read_concern()
    }

    fn write_concern(&self) -> super::Feature<&WriteConcern> {
        self.run_command.write_concern()
    }

    fn supports_sessions(&self) -> bool {
        self.run_command.supports_sessions()
    }

    fn retryability(&self, options: &ClientOptions) -> crate::operation::Retryability {
        self.run_command.retryability(options)
    }

    fn is_backpressure_retryable(&self, options: &ClientOptions) -> bool {
        self.run_command.is_backpressure_retryable(options)
    }

    fn update_for_retry(&mut self, retry: Option<&Retry>) {
        self.run_command.update_for_retry(retry)
    }

    fn override_criteria(&self) -> super::OverrideCriteriaFn {
        self.run_command.override_criteria()
    }

    fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
        self.run_command.pinned_connection()
    }

    fn name(&self) -> &CStr {
        self.run_command.name()
    }

    fn target(&self) -> super::OperationTarget {
        self.run_command.target()
    }

    fn handle_response<'a>(
        &'a self,
        response: std::borrow::Cow<'a, RawCommandResponse>,
        context: ExecutionContext<'a>,
    ) -> BoxFuture<'a, Result<Self::O>> {
        async move {
            let description = context.connection.stream_description()?;

            // The comment should only be propagated to getMore calls on 4.4+.
            let comment = if description.max_wire_version.unwrap_or(0) < SERVER_4_4_0_WIRE_VERSION {
                None
            } else {
                self.options.as_ref().and_then(|opts| opts.comment.clone())
            };

            CursorSpecification::new(
                response.into_owned(),
                description.server_address.clone(),
                self.options.as_ref().and_then(|opts| opts.batch_size),
                self.options.as_ref().and_then(|opts| opts.max_time),
                comment,
            )
        }
        .boxed()
    }

    #[cfg(feature = "opentelemetry")]
    type Otel = crate::otel::Witness<Self>;
}

// OpenTelemetry metadata for the operation; only compiled with the
// `opentelemetry` feature enabled.
#[cfg(feature = "opentelemetry")]
impl crate::otel::OtelInfo for RunCursorCommand<'_> {
    /// Returns the log name reported by the wrapped run command.
    fn log_name(&self) -> &str {
        self.run_command.otel().log_name()
    }

    /// Returns the cursor id reported by the wrapped run command.
    fn cursor_id(&self) -> Option<i64> {
        self.run_command.otel().cursor_id()
    }

    /// Returns the id of the produced cursor specification; always `Some`.
    fn output_cursor_id(output: &<Self as Operation>::O) -> Option<i64> {
        Some(output.id())
    }
}