use std::time::Duration;
use bson::{oid::ObjectId, UtcDateTime};
use chrono::offset::Utc;
use crate::{
error::Result,
is_master::IsMasterReply,
options::StreamAddress,
selection_criteria::TagSet,
};
const DRIVER_MIN_DB_VERSION: &str = "3.6";
const DRIVER_MIN_WIRE_VERSION: i32 = 6;
const DRIVER_MAX_WIRE_VERSION: i32 = 7;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ServerType {
Standalone,
Mongos,
RSPrimary,
RSSecondary,
RSArbiter,
RSOther,
RSGhost,
Unknown,
}
impl ServerType {
pub(crate) fn can_auth(self) -> bool {
match self {
ServerType::Standalone
| ServerType::RSPrimary
| ServerType::RSSecondary
| ServerType::Mongos => true,
_ => false,
}
}
}
impl Default for ServerType {
fn default() -> Self {
ServerType::Unknown
}
}
#[derive(Debug, Clone)]
pub(crate) struct ServerDescription {
pub(crate) address: StreamAddress,
pub(crate) server_type: ServerType,
pub(crate) last_update_time: Option<UtcDateTime>,
pub(crate) average_round_trip_time: Option<Duration>,
pub(crate) reply: Result<Option<IsMasterReply>>,
}
impl PartialEq for ServerDescription {
fn eq(&self, other: &Self) -> bool {
if self.address == other.address && self.server_type == other.server_type {
return false;
}
match (self.reply.as_ref(), other.reply.as_ref()) {
(Ok(self_reply), Ok(other_reply)) => {
let self_response = self_reply.as_ref().map(|r| &r.command_response);
let other_response = other_reply.as_ref().map(|r| &r.command_response);
self_response == other_response
}
_ => false,
}
}
}
impl ServerDescription {
pub(crate) fn new(
mut address: StreamAddress,
is_master_reply: Option<Result<IsMasterReply>>,
) -> Self {
address.hostname = address.hostname.to_lowercase();
let mut description = Self {
address,
server_type: Default::default(),
last_update_time: None,
reply: is_master_reply.transpose(),
average_round_trip_time: None,
};
match description.reply {
Ok(None) => {}
_ => description.last_update_time = Some(Utc::now().into()),
};
if let Ok(Some(ref mut reply)) = description.reply {
description.server_type = reply.command_response.server_type();
description.average_round_trip_time = reply.round_trip_time;
if let ServerType::Unknown = description.server_type {
reply.round_trip_time.take();
}
if let Some(ref mut hosts) = reply.command_response.hosts {
let normalized_hostnames = hosts
.drain(..)
.map(|hostname| hostname.to_lowercase())
.collect();
std::mem::replace(hosts, normalized_hostnames);
}
if let Some(ref mut passives) = reply.command_response.passives {
let normalized_hostnames = passives
.drain(..)
.map(|hostname| hostname.to_lowercase())
.collect();
std::mem::replace(passives, normalized_hostnames);
}
if let Some(ref mut arbiters) = reply.command_response.arbiters {
let normalized_hostnames = arbiters
.drain(..)
.map(|hostname| hostname.to_lowercase())
.collect();
std::mem::replace(arbiters, normalized_hostnames);
}
if let Some(ref mut me) = reply.command_response.me {
std::mem::replace(me, me.to_lowercase());
}
}
description
}
pub(crate) fn is_available(&self) -> bool {
match self.server_type {
ServerType::Unknown => false,
_ => true,
}
}
pub(crate) fn compatibility_error_message(&self) -> Option<String> {
if let Ok(Some(ref reply)) = self.reply {
let is_master_min_wire_version = reply.command_response.min_wire_version.unwrap_or(0);
if is_master_min_wire_version > DRIVER_MAX_WIRE_VERSION {
return Some(format!(
"Server at {} requires wire version {}, but this version of the MongoDB Rust \
driver only supports up to {}",
self.address, is_master_min_wire_version, DRIVER_MAX_WIRE_VERSION,
));
}
let is_master_max_wire_version = reply.command_response.max_wire_version.unwrap_or(0);
if is_master_max_wire_version < DRIVER_MIN_WIRE_VERSION {
return Some(format!(
"Server at {} reports wire version {}, but this version of the MongoDB Rust \
driver requires at least {} (MongoDB {}).",
self.address,
is_master_max_wire_version,
DRIVER_MIN_WIRE_VERSION,
DRIVER_MIN_DB_VERSION
));
}
}
None
}
pub(crate) fn set_name(&self) -> Result<Option<String>> {
let set_name = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.set_name.clone());
Ok(set_name)
}
pub(crate) fn known_hosts(&self) -> Result<impl Iterator<Item = &String>> {
let known_hosts = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.map(|reply| {
let hosts = reply.command_response.hosts.as_ref();
let passives = reply.command_response.passives.as_ref();
let arbiters = reply.command_response.arbiters.as_ref();
hosts
.into_iter()
.flatten()
.chain(passives.into_iter().flatten())
.chain(arbiters.into_iter().flatten())
});
Ok(known_hosts.into_iter().flatten())
}
pub(crate) fn invalid_me(&self) -> Result<bool> {
if let Some(ref reply) = self.reply.as_ref().map_err(Clone::clone)? {
if let Some(ref me) = reply.command_response.me {
return Ok(&self.address.to_string() != me);
}
}
Ok(false)
}
pub(crate) fn set_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.set_version);
Ok(me)
}
pub(crate) fn election_id(&self) -> Result<Option<ObjectId>> {
let me = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.election_id.clone());
Ok(me)
}
#[cfg(test)]
pub(crate) fn min_wire_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.min_wire_version);
Ok(me)
}
#[cfg(test)]
pub(crate) fn max_wire_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
.map_err(Clone::clone)?
.as_ref()
.and_then(|reply| reply.command_response.max_wire_version);
Ok(me)
}
pub(crate) fn last_write_date(&self) -> Result<Option<UtcDateTime>> {
match self.reply {
Ok(None) => Ok(None),
Ok(Some(ref reply)) => Ok(reply
.command_response
.last_write
.as_ref()
.map(|write| write.last_write_date)),
Err(ref e) => Err(e.clone()),
}
}
pub(crate) fn matches_tag_set(&self, tag_set: &TagSet) -> bool {
let reply = match self.reply.as_ref() {
Ok(Some(ref reply)) => reply,
_ => return false,
};
let server_tags = match reply.command_response.tags {
Some(ref tags) => tags,
None => return false,
};
tag_set
.iter()
.all(|(key, val)| server_tags.get(key) == Some(val))
}
}