mod server_selection;
#[cfg(test)]
mod test;
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use bson::oid::ObjectId;
use serde::Deserialize;
use crate::{
cmap::Command,
error::{ErrorKind, Result},
options::{ClientOptions, StreamAddress},
sdam::description::server::{ServerDescription, ServerType},
selection_criteria::{ReadPreference, SelectionCriteria},
};
/// Fallback heartbeat interval (10 seconds), used by `heartbeat_frequency` when no
/// `heartbeat_freq` was configured.
const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
/// The kind of deployment a `TopologyDescription` currently believes it is talking to.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize)]
pub(crate) enum TopologyType {
    /// A single server that is used directly.
    Single,
    /// A replica set in which no tracked server is currently a primary.
    ReplicaSetNoPrimary,
    /// A replica set in which at least one tracked server is a primary.
    ReplicaSetWithPrimary,
    /// A sharded deployment (entered when a mongos is discovered).
    Sharded,
    /// The deployment type has not been determined yet; this is the `Default` value.
    Unknown,
}
impl Default for TopologyType {
fn default() -> Self {
TopologyType::Unknown
}
}
/// A snapshot of what is currently known about the deployment: its overall type plus a
/// description of every server being tracked.
#[derive(Debug, Clone)]
pub(crate) struct TopologyDescription {
    /// Whether exactly one host was supplied when the description was created.
    single_seed: bool,
    /// The current classification of the deployment.
    topology_type: TopologyType,
    /// The replica set name, taken from the client options or learned from a member.
    set_name: Option<String>,
    /// The highest replica set version reported by a primary so far.
    max_set_version: Option<i32>,
    /// The election ID most recently accepted from a primary.
    max_election_id: Option<ObjectId>,
    /// Incompatibility message reported by a tracked server, if any.
    compatibility_error: Option<String>,
    /// NOTE(review): initialised to `None` by `new` and not written anywhere in this
    /// part of the file; presumably maintained elsewhere -- confirm.
    logical_session_timeout_minutes: Option<u32>,
    /// Copied from `ClientOptions::local_threshold`.
    local_threshold: Option<Duration>,
    /// Copied from `ClientOptions::heartbeat_freq`; `None` means the default applies.
    heartbeat_freq: Option<Duration>,
    /// Every tracked server, keyed by its address.
    servers: HashMap<StreamAddress, ServerDescription>,
}
impl TopologyDescription {
/// Builds the initial description of the deployment from the client options.
///
/// Every configured host starts out with a fresh `ServerDescription` built from its
/// address alone. The initial topology type is:
///
/// * `Single` when `direct_connection` is `Some(true)`, whether or not a replica set
///   name was also supplied;
/// * `ReplicaSetNoPrimary` when only a replica set name was supplied;
/// * `Unknown` otherwise.
///
/// # Errors
///
/// Returns an argument error when the selection criteria carry a max staleness that is
/// positive but below 90 seconds.
pub(crate) fn new(options: ClientOptions) -> Result<Self> {
    verify_max_staleness(
        options
            .selection_criteria
            .as_ref()
            .and_then(|criteria| criteria.max_staleness()),
    )?;

    // A direct connection must win over the replica set name: a client that asked to
    // talk to exactly one server must not start discovering the rest of the set.
    let topology_type = if options.direct_connection == Some(true) {
        TopologyType::Single
    } else if options.repl_set_name.is_some() {
        TopologyType::ReplicaSetNoPrimary
    } else {
        TopologyType::Unknown
    };

    let servers: HashMap<_, _> = options
        .hosts
        .into_iter()
        .map(|address| {
            let description = ServerDescription::new(address.clone(), None);
            (address, description)
        })
        .collect();

    Ok(Self {
        single_seed: servers.len() == 1,
        topology_type,
        set_name: options.repl_set_name,
        max_set_version: None,
        max_election_id: None,
        compatibility_error: None,
        logical_session_timeout_minutes: None,
        local_threshold: options.local_threshold,
        heartbeat_freq: options.heartbeat_freq,
        servers,
    })
}
/// Iterates over the addresses of all servers currently tracked by this description.
/// The order is that of the underlying `HashMap` and is therefore unspecified.
pub(crate) fn server_addresses(&self) -> impl Iterator<Item = &StreamAddress> {
    self.servers.keys()
}
/// Looks up the description stored for `address`, or `None` when that server is not
/// tracked.
pub(crate) fn get_server_description(
    &self,
    address: &StreamAddress,
) -> Option<&ServerDescription> {
    self.servers.get(address)
}
/// Sets `command.read_pref` according to the topology type and the type of the server
/// the command is about to be sent to.
///
/// * A mongos in a `Sharded` or `Single` topology gets the mongos-specific handling.
/// * A standalone in a `Single` topology gets no read preference.
/// * Any other server in a `Single` topology gets the caller's read preference, except
///   that `Primary` (or no read preference at all) is replaced by `PrimaryPreferred`.
/// * Every other combination leaves the command untouched.
pub(crate) fn update_command_with_read_pref(
    &self,
    server_type: ServerType,
    command: &mut Command,
    criteria: Option<&SelectionCriteria>,
) {
    let via_mongos = server_type == ServerType::Mongos;

    match self.topology_type {
        TopologyType::Sharded | TopologyType::Single if via_mongos => {
            self.update_command_read_pref_for_mongos(command, criteria);
        }
        // Mongos was handled above, so this arm covers the remaining non-standalone
        // servers reached through a direct connection.
        TopologyType::Single if server_type != ServerType::Standalone => {
            let read_pref = match criteria.and_then(SelectionCriteria::as_read_pref) {
                None | Some(ReadPreference::Primary) => ReadPreference::PrimaryPreferred {
                    max_staleness: None,
                    tag_sets: None,
                },
                Some(other) => other.clone(),
            };
            command.read_pref = Some(read_pref);
        }
        _ => {}
    }
}
/// Copies the caller's read preference onto a command destined for a mongos.
///
/// `Secondary`, `PrimaryPreferred` and `Nearest` are always forwarded.
/// `SecondaryPreferred` is forwarded only when it carries a max staleness or tag sets.
/// Nothing is set for any other read preference, or when `criteria` is absent or is
/// not a read preference.
fn update_command_read_pref_for_mongos(
    &self,
    command: &mut Command,
    criteria: Option<&SelectionCriteria>,
) {
    let read_pref = match criteria {
        Some(SelectionCriteria::ReadPreference(read_pref)) => read_pref,
        _ => return,
    };

    let forward = match read_pref {
        ReadPreference::Secondary { .. }
        | ReadPreference::PrimaryPreferred { .. }
        | ReadPreference::Nearest { .. } => true,
        ReadPreference::SecondaryPreferred {
            max_staleness,
            tag_sets,
        } => max_staleness.is_some() || tag_sets.is_some(),
        _ => false,
    };

    if forward {
        command.read_pref = Some(read_pref.clone());
    }
}
/// The configured heartbeat interval, or `DEFAULT_HEARTBEAT_FREQUENCY` when none was
/// configured.
fn heartbeat_frequency(&self) -> Duration {
    match self.heartbeat_freq {
        Some(configured) => configured,
        None => DEFAULT_HEARTBEAT_FREQUENCY,
    }
}
/// Recomputes the stored compatibility error from the servers currently tracked.
///
/// The field is set to the message of the first server that reports one, and is
/// cleared when no server does. Clearing matters: otherwise an error recorded for a
/// server that has since been removed or become compatible would stay in place.
fn check_compatibility(&mut self) {
    self.compatibility_error = self
        .servers
        .values()
        .find_map(|server| server.compatibility_error_message());
}
/// The compatibility error message recorded by the last call to `check_compatibility`,
/// if any.
pub(crate) fn compatibility_error(&self) -> Option<&String> {
    self.compatibility_error.as_ref()
}
/// Blends the round trip time on `server_description` with the one already stored for
/// the same address.
///
/// When both a stored and a new value exist, the new value is replaced by
/// `new / 5 + old * 4 / 5`. If either is missing, `server_description` is left as is.
fn update_round_trip_time(&self, server_description: &mut ServerDescription) {
    let previous_rtt = self
        .servers
        .get(&server_description.address)
        .and_then(|known| known.average_round_trip_time);

    if let (Some(old_rtt), Some(new_rtt)) =
        (previous_rtt, server_description.average_round_trip_time)
    {
        let weighted = (new_rtt / 5) + (old_rtt * 4 / 5);
        server_description.average_round_trip_time = Some(weighted);
    }
}
/// Folds a freshly obtained server description into the topology.
///
/// Descriptions for servers that are not tracked are ignored. Otherwise the round trip
/// time is averaged with the stored one, the stored description is replaced, and the
/// handler for the current topology type is run. On success the compatibility error is
/// refreshed.
///
/// # Errors
///
/// Propagates any error from the topology-type-specific handler; in that case the
/// compatibility check is skipped.
pub(crate) fn update(&mut self, mut server_description: ServerDescription) -> Result<()> {
    let address = server_description.address.clone();

    // Updates from servers that are not part of the topology are dropped.
    if !self.servers.contains_key(&address) {
        return Ok(());
    }

    self.update_round_trip_time(&mut server_description);
    self.servers.insert(address, server_description.clone());

    let outcome = match self.topology_type {
        TopologyType::Single => Ok(()),
        TopologyType::Unknown => self.update_unknown_topology(server_description),
        TopologyType::Sharded => {
            self.update_sharded_topology(server_description);
            Ok(())
        }
        TopologyType::ReplicaSetNoPrimary => {
            self.update_replica_set_no_primary_topology(server_description)
        }
        TopologyType::ReplicaSetWithPrimary => {
            self.update_replica_set_with_primary_topology(server_description)
        }
    };
    outcome?;

    self.check_compatibility();
    Ok(())
}
/// Handles a server update while the topology type is still `Unknown`.
///
/// * `Unknown` / `RSGhost`: nothing changes.
/// * `Standalone`: delegated to `update_unknown_with_standalone_server`.
/// * `Mongos`: the topology becomes `Sharded`.
/// * `RSPrimary`: the topology becomes `ReplicaSetWithPrimary`, then the primary's
///   view of the set is applied.
/// * `RSSecondary` / `RSArbiter` / `RSOther`: the topology becomes
///   `ReplicaSetNoPrimary`, then the member's view of the set is applied.
///
/// # Errors
///
/// Propagates errors from the replica set update helpers.
fn update_unknown_topology(&mut self, server_description: ServerDescription) -> Result<()> {
    match server_description.server_type {
        ServerType::Unknown | ServerType::RSGhost => {}
        ServerType::Standalone => {
            self.update_unknown_with_standalone_server(server_description)
        }
        ServerType::Mongos => self.topology_type = TopologyType::Sharded,
        ServerType::RSPrimary => {
            // Set the type before delegating so it is correct even if the helper
            // returns early with an error.
            self.topology_type = TopologyType::ReplicaSetWithPrimary;
            self.update_rs_from_primary_server(server_description)?;
        }
        ServerType::RSSecondary | ServerType::RSArbiter | ServerType::RSOther => {
            // `update_rs_without_primary_server` never touches the topology type, so
            // the transition out of `Unknown` has to happen here.
            self.topology_type = TopologyType::ReplicaSetNoPrimary;
            self.update_rs_without_primary_server(server_description)?;
        }
    }

    Ok(())
}
/// Handles a server update while the topology type is `Sharded`: any server that is
/// neither `Unknown` nor a `Mongos` is dropped from the topology.
fn update_sharded_topology(&mut self, server_description: ServerDescription) {
    let server_type = &server_description.server_type;
    let keep = *server_type == ServerType::Unknown || *server_type == ServerType::Mongos;

    if !keep {
        self.servers.remove(&server_description.address);
    }
}
/// Handles a server update while the topology type is `ReplicaSetNoPrimary`.
///
/// Primaries and other replica set members are passed to the matching helper,
/// standalones and mongoses are dropped from the topology, and `Unknown` / `RSGhost`
/// servers change nothing.
///
/// # Errors
///
/// Returns whatever error the replica set helper produced.
fn update_replica_set_no_primary_topology(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    match server_description.server_type {
        ServerType::RSPrimary => self.update_rs_from_primary_server(server_description),
        ServerType::RSSecondary | ServerType::RSArbiter | ServerType::RSOther => {
            self.update_rs_without_primary_server(server_description)
        }
        ServerType::Standalone | ServerType::Mongos => {
            self.servers.remove(&server_description.address);
            Ok(())
        }
        ServerType::Unknown | ServerType::RSGhost => Ok(()),
    }
}
/// Handles a server update while the topology type is `ReplicaSetWithPrimary`.
///
/// Primaries and other replica set members are passed to the matching helper.
/// Standalones and mongoses are dropped from the topology. For those, and for
/// `Unknown` / `RSGhost` servers, the primary state is then re-evaluated.
///
/// # Errors
///
/// Returns whatever error the replica set helper produced.
fn update_replica_set_with_primary_topology(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    match server_description.server_type {
        ServerType::RSPrimary => {
            return self.update_rs_from_primary_server(server_description);
        }
        ServerType::RSSecondary | ServerType::RSArbiter | ServerType::RSOther => {
            return self.update_rs_with_primary_from_member(server_description);
        }
        ServerType::Standalone | ServerType::Mongos => {
            self.servers.remove(&server_description.address);
        }
        ServerType::Unknown | ServerType::RSGhost => {}
    }

    // Reached only by the arms that did not delegate to a helper.
    self.record_primary_state();
    Ok(())
}
/// Handles a standalone server seen while the topology type is `Unknown`.
///
/// With a single seed the topology becomes `Single`; otherwise the standalone is
/// dropped from the topology.
fn update_unknown_with_standalone_server(&mut self, server_description: ServerDescription) {
    if !self.single_seed {
        self.servers.remove(&server_description.address);
        return;
    }

    self.topology_type = TopologyType::Single;
}
/// Applies an update from a non-primary replica set member when no primary is known.
///
/// The set name is adopted if none is recorded yet. A member reporting a different set
/// name is dropped and nothing else happens. Otherwise the hosts it knows about are
/// added to the topology, and the member itself is dropped if `invalid_me` reports
/// `true`.
///
/// # Errors
///
/// Propagates errors from reading the server description or parsing a host address.
fn update_rs_without_primary_server(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    let reported_set_name = server_description.set_name()?;

    if self.set_name.is_none() {
        self.set_name = reported_set_name;
    } else if self.set_name != reported_set_name {
        self.servers.remove(&server_description.address);
        return Ok(());
    }

    let hosts = server_description.known_hosts()?;
    self.add_new_servers(hosts)?;

    let me_mismatch = server_description.invalid_me()?;
    if me_mismatch {
        self.servers.remove(&server_description.address);
    }

    Ok(())
}
/// Applies an update from a non-primary replica set member while the topology type is
/// `ReplicaSetWithPrimary`.
///
/// The member is dropped from the topology when it reports a different set name or
/// when `invalid_me` reports `true`. In every case the primary state is re-evaluated
/// afterwards. That covers a former primary that now reports as a secondary: its
/// stored description has already been replaced, so the topology may no longer contain
/// a primary and must fall back to `ReplicaSetNoPrimary`.
///
/// # Errors
///
/// Propagates errors from reading the server description.
fn update_rs_with_primary_from_member(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    // `||` short-circuits, so `invalid_me` is only consulted when the set name matches.
    if self.set_name != server_description.set_name()? || server_description.invalid_me()? {
        self.servers.remove(&server_description.address);
    }

    self.record_primary_state();
    Ok(())
}
/// Applies an update from a server that reports itself as a replica set primary.
///
/// Steps, in order:
/// 1. Adopt the set name if none is recorded; drop the server if it reports a
///    different one.
/// 2. If the server reports both a set version and an election ID that are older than
///    the recorded maxima, reset its stored description and stop.
/// 3. Record the new election ID and, if larger, the new set version.
/// 4. Reset every other server currently stored as `RSPrimary`.
/// 5. Add the hosts the primary knows about and remove those it does not.
/// 6. Re-evaluate whether the topology has a primary.
///
/// # Errors
///
/// Propagates errors from reading the server description or parsing a host address.
fn update_rs_from_primary_server(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    if self.set_name.is_none() {
        self.set_name = server_description.set_name()?;
    } else if self.set_name != server_description.set_name()? {
        // A primary of a different replica set does not belong in this topology.
        self.servers.remove(&server_description.address);
        self.record_primary_state();
        return Ok(());
    }
    // The staleness check only applies when the server reports BOTH a set version and
    // an election ID, and the topology has already recorded both maxima.
    if let Some(server_set_version) = server_description.set_version()? {
        if let Some(server_election_id) = server_description.election_id()? {
            if let Some(topology_max_set_version) = self.max_set_version {
                if let Some(ref topology_max_election_id) = self.max_election_id {
                    if topology_max_set_version > server_set_version
                        || (topology_max_set_version == server_set_version
                            && *topology_max_election_id > server_election_id)
                    {
                        // Stale primary: replace its stored description with a fresh
                        // one built from the address alone.
                        self.servers.insert(
                            server_description.address.clone(),
                            ServerDescription::new(server_description.address, None),
                        );
                        self.record_primary_state();
                        return Ok(());
                    }
                }
            }
            self.max_election_id = Some(server_election_id);
        }
    }
    // Raise the recorded max set version; a missing maximum counts as "smaller".
    if let Some(server_set_version) = server_description.set_version()? {
        if self
            .max_set_version
            .as_ref()
            .map(|topology_max_set_version| server_set_version > *topology_max_set_version)
            .unwrap_or(true)
        {
            self.max_set_version = Some(server_set_version);
        }
    }
    // Snapshot of the tracked addresses taken before any hosts are added below.
    let addresses: Vec<_> = self.servers.keys().cloned().collect();
    // Any other server still stored as a primary is reset, since this server is the
    // one that was just updated.
    for address in addresses.clone() {
        if address == server_description.address {
            continue;
        }
        // The `unwrap` cannot fail: `address` was just read from the map's keys and
        // nothing has been removed since.
        if let ServerType::RSPrimary = self.servers.get(&address).unwrap().server_type {
            self.servers
                .insert(address.clone(), ServerDescription::new(address, None));
        }
    }
    self.add_new_servers(server_description.known_hosts()?)?;
    let known_hosts: HashSet<_> = server_description.known_hosts()?.collect();
    // Drop previously tracked servers that the primary does not list. Hosts are
    // compared through the address's string form.
    for address in addresses {
        if !known_hosts.contains(&address.to_string()) {
            self.servers.remove(&address);
        }
    }
    self.record_primary_state();
    Ok(())
}
/// Sets the topology type to `ReplicaSetWithPrimary` when at least one tracked server
/// is an `RSPrimary`, and to `ReplicaSetNoPrimary` otherwise.
fn record_primary_state(&mut self) {
    let has_primary = self
        .servers
        .values()
        .any(|server| server.server_type == ServerType::RSPrimary);

    self.topology_type = if has_primary {
        TopologyType::ReplicaSetWithPrimary
    } else {
        TopologyType::ReplicaSetNoPrimary
    };
}
/// Starts tracking every host in `servers` that is not tracked yet, giving each a
/// fresh `ServerDescription` built from its address alone. Hosts that are already
/// tracked keep their current description.
///
/// # Errors
///
/// Stops at, and returns, the first address parse error. Hosts processed before the
/// failure remain added.
fn add_new_servers<'a>(&'a mut self, servers: impl Iterator<Item = &'a String>) -> Result<()> {
    for host in servers {
        let address = StreamAddress::parse(host.as_str())?;

        if self.servers.contains_key(&address) {
            continue;
        }

        let placeholder = ServerDescription::new(address.clone(), None);
        self.servers.insert(address, placeholder);
    }

    Ok(())
}
}
/// Rejects a max staleness that is strictly between 0 and 90 seconds.
///
/// `None`, exactly zero, and anything of 90 seconds or more are accepted.
///
/// # Errors
///
/// Returns `ErrorKind::ArgumentError` for a value in the rejected range.
fn verify_max_staleness(max_staleness: Option<Duration>) -> Result<()> {
    let zero = Duration::from_secs(0);
    let minimum = Duration::from_secs(90);

    match max_staleness {
        Some(staleness) if staleness > zero && staleness < minimum => {
            Err(ErrorKind::ArgumentError {
                message: "max staleness cannot be both positive and below 90 seconds".into(),
            }
            .into())
        }
        _ => Ok(()),
    }
}