use serde::de::DeserializeOwned;
use super::wire::Message;
use crate::{
bson::{Bson, Document},
bson_util,
client::{options::ServerApi, ClientSession, ClusterTime},
error::{CommandError, ErrorKind, Result},
options::StreamAddress,
selection_criteria::ReadPreference,
};
/// A named command together with the database it targets and its BSON body.
///
/// The setter methods on this type add further fields (such as `lsid`,
/// `$clusterTime`, `txnNumber`, the API version fields and `$readPreference`)
/// to `body`.
#[derive(Debug, Clone)]
pub(crate) struct Command {
/// Name of the command; compared against `"getMore"` when applying server API options.
pub(crate) name: String,
/// Name of the database the command targets.
pub(crate) target_db: String,
/// The command document; mutated in place by the `set_*` methods.
pub(crate) body: Document,
}
impl Command {
pub(crate) fn new(name: String, target_db: String, body: Document) -> Self {
Self {
name,
target_db,
body,
}
}
pub(crate) fn set_session(&mut self, session: &ClientSession) {
self.body.insert("lsid", session.id());
}
pub(crate) fn set_cluster_time(&mut self, cluster_time: &ClusterTime) {
if let Ok(doc) = bson::to_bson(cluster_time) {
self.body.insert("$clusterTime", doc);
}
}
pub(crate) fn set_txn_number(&mut self, txn_number: u64) {
self.body.insert("txnNumber", txn_number);
}
pub(crate) fn set_server_api(&mut self, server_api: &ServerApi) {
if matches!(self.name.as_str(), "getMore") {
return;
}
self.body
.insert("apiVersion", format!("{}", server_api.version));
if let Some(strict) = server_api.strict {
self.body.insert("apiStrict", strict);
}
if let Some(deprecation_errors) = server_api.deprecation_errors {
self.body.insert("apiDeprecationErrors", deprecation_errors);
}
}
pub(crate) fn set_read_preference(&mut self, read_preference: ReadPreference) {
self.body
.insert("$readPreference", read_preference.into_document());
}
}
/// A response document paired with the address it was attributed to and the
/// cluster time extracted from it, if any.
#[derive(Debug, Clone)]
pub(crate) struct CommandResponse {
/// Address supplied when the response was constructed; exposed via `source_address`.
source: StreamAddress,
/// The response document itself.
pub(crate) raw_response: Document,
/// The `$clusterTime` field of the response when it was present and could be
/// deserialized in `new`; `None` otherwise.
cluster_time: Option<ClusterTime>,
}
impl CommandResponse {
#[cfg(test)]
pub(crate) fn with_document_and_address(source: StreamAddress, doc: Document) -> Self {
Self {
source,
raw_response: doc,
cluster_time: None,
}
}
#[cfg(test)]
pub(crate) fn with_document(doc: Document) -> Self {
Self::with_document_and_address(
StreamAddress {
hostname: "localhost".to_string(),
port: None,
},
doc,
)
}
pub(crate) fn new(source: StreamAddress, message: Message) -> Result<Self> {
let raw_response = message.single_document_response()?;
let cluster_time = raw_response
.get("$clusterTime")
.and_then(|subdoc| bson::from_bson(subdoc.clone()).ok());
Ok(Self {
source,
raw_response,
cluster_time,
})
}
pub(crate) fn is_success(&self) -> bool {
match self.raw_response.get("ok") {
Some(ref b) => bson_util::get_int(b) == Some(1),
_ => false,
}
}
pub(crate) fn validate(&self) -> Result<()> {
if !self.is_success() {
let command_error: CommandError =
bson::from_bson(Bson::Document(self.raw_response.clone())).map_err(|_| {
ErrorKind::ResponseError {
message: "invalid server response".to_string(),
}
})?;
Err(ErrorKind::CommandError(command_error).into())
} else {
Ok(())
}
}
pub(crate) fn body<T: DeserializeOwned>(&self) -> Result<T> {
match bson::from_bson(Bson::Document(self.raw_response.clone())) {
Ok(body) => Ok(body),
Err(e) => Err(ErrorKind::ResponseError {
message: format!("{}", e),
}
.into()),
}
}
pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
self.cluster_time.as_ref()
}
pub(crate) fn source_address(&self) -> &StreamAddress {
&self.source
}
}