#[cfg(test)]
mod test;
use bson::{Bson, Timestamp};
use super::Operation;
use crate::{
bson::Document,
client::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS},
cmap::{Command, RawCommandResponse, StreamDescription},
error::{ErrorKind, Result},
options::WriteConcern,
selection_criteria::SelectionCriteria,
};
#[derive(Debug)]
pub(crate) struct RunCommand {
db: String,
command: Document,
selection_criteria: Option<SelectionCriteria>,
write_concern: Option<WriteConcern>,
}
impl RunCommand {
pub(crate) fn new(
db: String,
command: Document,
selection_criteria: Option<SelectionCriteria>,
) -> Result<Self> {
let write_concern = command
.get("writeConcern")
.map(|doc| bson::from_bson::<WriteConcern>(doc.clone()))
.transpose()?;
Ok(Self {
db,
command,
selection_criteria,
write_concern,
})
}
fn command_name(&self) -> Option<&str> {
self.command.keys().next().map(String::as_str)
}
}
impl Operation for RunCommand {
type O = Document;
type Command = Document;
type Response = Response;
const NAME: &'static str = "$genericRunCommand";
fn build(&mut self, _description: &StreamDescription) -> Result<Command> {
let command_name = self
.command_name()
.ok_or_else(|| ErrorKind::InvalidArgument {
message: "an empty document cannot be passed to a run_command operation".into(),
})?;
Ok(Command::new(
command_name.to_string(),
self.db.clone(),
self.command.clone(),
))
}
fn handle_response(
&self,
response: Document,
_description: &StreamDescription,
) -> Result<Self::O> {
Ok(response)
}
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.selection_criteria.as_ref()
}
fn write_concern(&self) -> Option<&WriteConcern> {
self.write_concern.as_ref()
}
fn supports_sessions(&self) -> bool {
self.command_name()
.map(|command_name| {
!SESSIONS_UNSUPPORTED_COMMANDS.contains(command_name.to_lowercase().as_str())
})
.unwrap_or(false)
}
}
#[derive(Debug)]
pub(crate) struct Response {
doc: Document,
cluster_time: Option<ClusterTime>,
recovery_token: Option<Document>,
}
impl super::Response for Response {
type Body = Document;
fn deserialize_response(raw: &RawCommandResponse) -> Result<Self> {
let doc: Document = raw.body()?;
let cluster_time = doc
.get_document("$clusterTime")
.ok()
.and_then(|doc| bson::from_document(doc.clone()).ok());
let recovery_token = doc.get_document("recoveryToken").ok().cloned();
Ok(Self {
doc,
cluster_time,
recovery_token,
})
}
fn ok(&self) -> Option<&Bson> {
self.doc.get("ok")
}
fn cluster_time(&self) -> Option<&ClusterTime> {
self.cluster_time.as_ref()
}
fn at_cluster_time(&self) -> Option<Timestamp> {
self.doc
.get_timestamp("atClusterTime")
.or_else(|_| {
self.doc
.get_document("cursor")
.and_then(|subdoc| subdoc.get_timestamp("atClusterTime"))
})
.ok()
}
fn recovery_token(&self) -> Option<&Document> {
self.recovery_token.as_ref()
}
fn into_body(self) -> Self::Body {
self.doc
}
}