pub(crate) mod server_selection;
#[cfg(test)]
mod test;
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use serde::Deserialize;
use crate::{
bson::oid::ObjectId,
client::ClusterTime,
cmap::Command,
error::{ErrorKind, Result},
options::{ClientOptions, StreamAddress},
sdam::description::server::{ServerDescription, ServerType},
selection_criteria::{ReadPreference, SelectionCriteria},
};
/// Fallback returned by `TopologyDescription::heartbeat_frequency` when no `heartbeat_freq` was
/// supplied through the client options: 10 seconds.
const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
/// The kind of deployment a `TopologyDescription` currently believes it is describing.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize)]
pub(crate) enum TopologyType {
/// Selected at construction when `direct_connection` is `Some(true)`, or adopted when an
/// `Unknown` topology built from exactly one seed sees a standalone server. `update` performs
/// no type-specific processing for this type.
Single,
/// A replica set in which no server in the description has type `RSPrimary` (see
/// `record_primary_state`). Also the initial type when a replica set name is configured.
ReplicaSetNoPrimary,
/// A replica set in which some server in the description has type `RSPrimary` (see
/// `record_primary_state`).
ReplicaSetWithPrimary,
/// Adopted when an `Unknown` topology sees a `Mongos` server.
Sharded,
/// Nothing conclusive is known yet; this is the `Default` value.
Unknown,
}
impl Default for TopologyType {
fn default() -> Self {
TopologyType::Unknown
}
}
/// A snapshot of what is known about the deployment: its type, its member servers and a few
/// aggregate values derived from the servers' most recent descriptions.
#[derive(Debug, Clone)]
pub(crate) struct TopologyDescription {
/// `true` when the description was built by `new` from exactly one host. Consulted when an
/// `Unknown` topology sees a standalone server.
single_seed: bool,
/// The current topology type; refined by `update` as server descriptions arrive.
topology_type: TopologyType,
/// Replica set name, taken from the client options or, if absent there, adopted from the first
/// replica set member processed.
set_name: Option<String>,
/// Highest set version reported so far by a primary.
max_set_version: Option<i32>,
/// Election id most recently accepted from a primary; used together with `max_set_version` to
/// reject stale primaries.
max_election_id: Option<ObjectId>,
/// The compatibility error message of some server in the topology, if any server reports one.
compatibility_error: Option<String>,
/// Whether sessions are supported by the deployment, as far as is currently known.
session_support_status: SessionSupportStatus,
/// The greatest cluster time passed to `advance_cluster_time` so far.
cluster_time: Option<ClusterTime>,
/// Copied from the client options; not read by the methods visible in this file.
/// NOTE(review): presumably consumed by the `server_selection` submodule — confirm.
local_threshold: Option<Duration>,
/// Heartbeat frequency from the client options; `heartbeat_frequency` falls back to
/// `DEFAULT_HEARTBEAT_FREQUENCY` when this is `None`.
heartbeat_freq: Option<Duration>,
/// The servers currently considered part of the topology, keyed by address.
servers: HashMap<StreamAddress, ServerDescription>,
}
/// Equality considers only the compatibility error, the server map, the topology type and the
/// cluster time; every other field is ignored.
impl PartialEq for TopologyDescription {
    fn eq(&self, other: &Self) -> bool {
        // Tuples compare element by element, left to right, stopping at the first mismatch.
        let ours = (
            &self.compatibility_error,
            &self.servers,
            self.topology_type,
            &self.cluster_time,
        );
        let theirs = (
            &other.compatibility_error,
            &other.servers,
            other.topology_type,
            &other.cluster_time,
        );
        ours == theirs
    }
}
impl TopologyDescription {
/// Test-only constructor: an `Unknown` topology containing one freshly constructed server
/// description per host, with every optional field left unset and `single_seed` set to `false`.
#[cfg(test)]
pub(crate) fn new_from_hosts(hosts: Vec<StreamAddress>) -> Self {
    let mut servers = HashMap::with_capacity(hosts.len());
    for host in hosts {
        servers.insert(host.clone(), ServerDescription::new(host, None));
    }

    Self {
        servers,
        single_seed: false,
        topology_type: TopologyType::Unknown,
        set_name: None,
        max_set_version: None,
        max_election_id: None,
        compatibility_error: None,
        session_support_status: SessionSupportStatus::default(),
        cluster_time: None,
        local_threshold: None,
        heartbeat_freq: None,
    }
}
/// Builds the initial topology description from the client options.
///
/// The initial type is `Single` for a direct connection, `ReplicaSetNoPrimary` when a replica
/// set name was given, and `Unknown` otherwise. Every configured host starts out with a freshly
/// constructed server description.
///
/// # Errors
///
/// Returns the error produced by `verify_max_staleness` when the selection criteria carry a max
/// staleness that is positive but shorter than 90 seconds.
pub(crate) fn new(options: ClientOptions) -> Result<Self> {
    let max_staleness = options
        .selection_criteria
        .as_ref()
        .and_then(|criteria| criteria.max_staleness());
    verify_max_staleness(max_staleness)?;

    let topology_type = match options.direct_connection {
        Some(true) => TopologyType::Single,
        _ if options.repl_set_name.is_some() => TopologyType::ReplicaSetNoPrimary,
        _ => TopologyType::Unknown,
    };

    let mut servers = HashMap::new();
    for address in options.hosts {
        let description = ServerDescription::new(address.clone(), None);
        servers.insert(address, description);
    }
    // Counted after de-duplication by the map, not from the raw host list.
    let single_seed = servers.len() == 1;

    Ok(Self {
        single_seed,
        topology_type,
        set_name: options.repl_set_name,
        max_set_version: None,
        max_election_id: None,
        compatibility_error: None,
        session_support_status: SessionSupportStatus::Undetermined,
        cluster_time: None,
        local_threshold: options.local_threshold,
        heartbeat_freq: options.heartbeat_freq,
        servers,
    })
}
/// Returns the topology type currently recorded in this description.
pub(crate) fn topology_type(&self) -> TopologyType {
self.topology_type
}
/// Iterates over the addresses of all servers in the topology. The servers are stored in a
/// `HashMap`, so the iteration order is unspecified.
pub(crate) fn server_addresses(&self) -> impl Iterator<Item = &StreamAddress> {
self.servers.keys()
}
/// Returns the cluster time currently stored in the description, or `None` if none has been
/// recorded yet.
pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
self.cluster_time.as_ref()
}
/// Looks up the description stored for `address`; returns `None` when the address is not part
/// of the topology.
pub(crate) fn get_server_description(
&self,
address: &StreamAddress,
) -> Option<&ServerDescription> {
self.servers.get(address)
}
/// Sets the read preference on `command` according to the topology type, the type of the
/// server the command is going to, and the caller's selection criteria.
///
/// * A mongos (in a `Sharded` or `Single` topology) is handled by
///   `update_command_read_pref_for_mongos`.
/// * A standalone in a `Single` topology gets no read preference.
/// * Any other server in a `Single` topology gets the requested read preference, with
///   `Primary` or an absent preference replaced by `PrimaryPreferred`.
/// * In every other case the requested read preference is used; predicate criteria map to
///   `PrimaryPreferred` and absent criteria to `Primary`.
pub(crate) fn update_command_with_read_pref(
    &self,
    server_type: ServerType,
    command: &mut Command,
    criteria: Option<&SelectionCriteria>,
) {
    let read_pref = match (self.topology_type, server_type) {
        (TopologyType::Sharded, ServerType::Mongos)
        | (TopologyType::Single, ServerType::Mongos) => {
            return self.update_command_read_pref_for_mongos(command, criteria);
        }
        (TopologyType::Single, ServerType::Standalone) => return,
        (TopologyType::Single, _) => {
            match criteria.and_then(SelectionCriteria::as_read_pref) {
                Some(ReadPreference::Primary) | None => ReadPreference::PrimaryPreferred {
                    options: Default::default(),
                },
                Some(requested) => requested.clone(),
            }
        }
        _ => match criteria {
            Some(SelectionCriteria::ReadPreference(requested)) => requested.clone(),
            Some(SelectionCriteria::Predicate(_)) => ReadPreference::PrimaryPreferred {
                options: Default::default(),
            },
            None => ReadPreference::Primary,
        },
    };

    command.set_read_preference(read_pref);
}
/// Sets the read preference on a command bound for a mongos.
///
/// `Secondary`, `PrimaryPreferred` and `Nearest` are always forwarded. `SecondaryPreferred` is
/// forwarded only when it carries a max staleness or tag sets. Anything else — including
/// predicate criteria and no criteria at all — leaves the command untouched.
fn update_command_read_pref_for_mongos(
    &self,
    command: &mut Command,
    criteria: Option<&SelectionCriteria>,
) {
    let read_pref = match criteria {
        Some(SelectionCriteria::ReadPreference(read_pref)) => read_pref,
        _ => return,
    };

    let should_forward = match read_pref {
        ReadPreference::Secondary { .. }
        | ReadPreference::PrimaryPreferred { .. }
        | ReadPreference::Nearest { .. } => true,
        ReadPreference::SecondaryPreferred { options } => {
            options.max_staleness.is_some() || options.tag_sets.is_some()
        }
        _ => false,
    };

    if should_forward {
        command.set_read_preference(read_pref.clone());
    }
}
/// Returns the configured heartbeat frequency, or `DEFAULT_HEARTBEAT_FREQUENCY` when none was
/// configured.
fn heartbeat_frequency(&self) -> Duration {
    match self.heartbeat_freq {
        Some(configured) => configured,
        None => DEFAULT_HEARTBEAT_FREQUENCY,
    }
}
/// Recomputes `compatibility_error`: it becomes the message of the first server (in map
/// iteration order) that reports a compatibility error, or `None` when no server does.
fn check_compatibility(&mut self) {
    self.compatibility_error = self
        .servers
        .values()
        .find_map(|server| server.compatibility_error_message());
}
/// Returns the compatibility error message recorded by the most recent `check_compatibility`
/// run, if any.
pub(crate) fn compatibility_error(&self) -> Option<&String> {
self.compatibility_error.as_ref()
}
/// Replaces the round trip time on `server_description` with a weighted average of the new
/// measurement (weight 1/5) and the value previously stored for the same address (weight 4/5).
///
/// The description is left untouched when there is no previously stored value or the new
/// description carries no measurement.
fn update_round_trip_time(&self, server_description: &mut ServerDescription) {
    let previous_rtt = self
        .servers
        .get(&server_description.address)
        .and_then(|known| known.average_round_trip_time);

    if let (Some(old_rtt), Some(new_rtt)) =
        (previous_rtt, server_description.average_round_trip_time)
    {
        server_description.average_round_trip_time = Some((new_rtt / 5) + (old_rtt * 4 / 5));
    }
}
/// Folds the logical session timeout reported by `server_description` into the topology-wide
/// session support status.
///
/// * Servers that are not data-bearing are ignored.
/// * A standalone marks sessions as unsupported, remembering whatever timeout it reported.
/// * A server reporting no timeout marks sessions as unsupported.
/// * A server reporting a timeout marks sessions as supported, using the smaller of the
///   reported and the current timeout. If the status was previously `Unsupported`, it only
///   becomes `Supported` when every data-bearing server in the topology reports a timeout.
fn update_session_support_status(&mut self, server_description: &ServerDescription) {
    if !server_description.server_type.is_data_bearing() {
        return;
    }

    let reported_timeout = server_description.logical_session_timeout().ok().flatten();

    if server_description.server_type == ServerType::Standalone {
        self.session_support_status = SessionSupportStatus::Unsupported {
            logical_session_timeout: reported_timeout,
        };
        return;
    }

    let timeout = match reported_timeout {
        Some(timeout) => timeout,
        None => {
            if server_description.server_type.is_data_bearing()
                || self.topology_type == TopologyType::Single
            {
                self.session_support_status = SessionSupportStatus::Unsupported {
                    logical_session_timeout: None,
                };
            }
            return;
        }
    };

    self.session_support_status = match self.session_support_status {
        SessionSupportStatus::Supported {
            logical_session_timeout: current,
        } => SessionSupportStatus::Supported {
            logical_session_timeout: std::cmp::min(timeout, current),
        },
        SessionSupportStatus::Undetermined => SessionSupportStatus::Supported {
            logical_session_timeout: timeout,
        },
        SessionSupportStatus::Unsupported { .. } => {
            // `None` orders below every `Some`, so a single data-bearing server without a
            // timeout makes the minimum `None` and keeps sessions unsupported.
            let minimum = self
                .servers
                .values()
                .filter(|server| server.server_type.is_data_bearing())
                .map(|server| server.logical_session_timeout().ok().flatten())
                .min()
                .flatten();

            match minimum {
                Some(lowest) => SessionSupportStatus::Supported {
                    logical_session_timeout: lowest,
                },
                None => SessionSupportStatus::Unsupported {
                    logical_session_timeout: None,
                },
            }
        }
    };
}
/// Stores a clone of `cluster_time` unless the cluster time already stored compares greater
/// than or equal to it. When nothing is stored yet, `cluster_time` is always stored.
pub(crate) fn advance_cluster_time(&mut self, cluster_time: &ClusterTime) {
    let already_current = self.cluster_time.as_ref() >= Some(cluster_time);
    if !already_current {
        self.cluster_time = Some(cluster_time.clone());
    }
}
pub(crate) fn diff(&self, other: &TopologyDescription) -> Option<TopologyDescriptionDiff> {
if self == other {
return None;
}
let addresses: HashSet<&StreamAddress> = self.server_addresses().collect();
let other_addresses: HashSet<&StreamAddress> = other.server_addresses().collect();
Some(TopologyDescriptionDiff {
new_addresses: other_addresses
.difference(&addresses)
.cloned()
.cloned()
.collect(),
})
}
/// Makes the set of server addresses equal to `hosts`: servers not in `hosts` are dropped,
/// servers already present keep their description, and missing ones are added with a freshly
/// constructed description.
pub(crate) fn sync_hosts(&mut self, hosts: &HashSet<StreamAddress>) {
    self.servers.retain(|address, _| hosts.contains(address));
    self.add_new_servers_from_addresses(hosts.iter());
}
/// Returns a copy of the topology-wide session support status.
pub(crate) fn session_support_status(&self) -> SessionSupportStatus {
self.session_support_status
}
/// Applies a new server description to the topology.
///
/// Descriptions for addresses that are not part of the topology are ignored. Otherwise the
/// description (with its round trip time averaged against the stored one) replaces the stored
/// entry, the session support status and cluster time are refreshed, the type-specific update
/// runs, and finally the compatibility error is recomputed.
///
/// # Errors
///
/// Propagates any error from the type-specific update. In that case the compatibility check
/// is not run.
pub(crate) fn update(&mut self, mut server_description: ServerDescription) -> Result<()> {
    let is_member = self.servers.contains_key(&server_description.address);
    if !is_member {
        return Ok(());
    }

    self.update_round_trip_time(&mut server_description);

    let address = server_description.address.clone();
    self.servers.insert(address, server_description.clone());

    self.update_session_support_status(&server_description);

    // A missing or unreadable cluster time is simply skipped.
    if let Ok(Some(cluster_time)) = server_description.cluster_time() {
        self.advance_cluster_time(&cluster_time);
    }

    let outcome = match self.topology_type {
        TopologyType::Single => Ok(()),
        TopologyType::Unknown => self.update_unknown_topology(server_description),
        TopologyType::Sharded => {
            self.update_sharded_topology(server_description);
            Ok(())
        }
        TopologyType::ReplicaSetNoPrimary => {
            self.update_replica_set_no_primary_topology(server_description)
        }
        TopologyType::ReplicaSetWithPrimary => {
            self.update_replica_set_with_primary_topology(server_description)
        }
    };
    outcome?;

    self.check_compatibility();
    Ok(())
}
/// Handles a server description while the topology type is `Unknown`.
///
/// `Unknown` and `RSGhost` servers change nothing. A standalone is handled by
/// `update_unknown_with_standalone_server`, a mongos turns the topology into `Sharded`, and
/// any other replica set member switches the topology to the matching replica set type before
/// the corresponding replica set update runs.
fn update_unknown_topology(&mut self, server_description: ServerDescription) -> Result<()> {
    match server_description.server_type {
        ServerType::Unknown | ServerType::RSGhost => Ok(()),
        ServerType::Standalone => {
            self.update_unknown_with_standalone_server(server_description);
            Ok(())
        }
        ServerType::Mongos => {
            self.topology_type = TopologyType::Sharded;
            Ok(())
        }
        ServerType::RSPrimary => {
            self.topology_type = TopologyType::ReplicaSetWithPrimary;
            self.update_rs_from_primary_server(server_description)
        }
        ServerType::RSSecondary | ServerType::RSArbiter | ServerType::RSOther => {
            self.topology_type = TopologyType::ReplicaSetNoPrimary;
            self.update_rs_without_primary_server(server_description)
        }
    }
}
/// Handles a server description while the topology type is `Sharded`: a server that is neither
/// `Unknown` nor `Mongos` is removed from the topology.
fn update_sharded_topology(&mut self, server_description: ServerDescription) {
    let belongs = server_description.server_type == ServerType::Unknown
        || server_description.server_type == ServerType::Mongos;

    if !belongs {
        self.servers.remove(&server_description.address);
    }
}
/// Handles a server description while the topology type is `ReplicaSetNoPrimary`.
///
/// `Unknown` and `RSGhost` servers change nothing; standalones and mongoses are removed; a
/// primary switches the topology to `ReplicaSetWithPrimary` and is processed as such; every
/// other member is processed by `update_rs_without_primary_server`.
fn update_replica_set_no_primary_topology(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    match server_description.server_type {
        ServerType::Unknown | ServerType::RSGhost => Ok(()),
        ServerType::Standalone | ServerType::Mongos => {
            self.servers.remove(&server_description.address);
            Ok(())
        }
        ServerType::RSPrimary => {
            self.topology_type = TopologyType::ReplicaSetWithPrimary;
            self.update_rs_from_primary_server(server_description)
        }
        ServerType::RSSecondary | ServerType::RSArbiter | ServerType::RSOther => {
            self.update_rs_without_primary_server(server_description)
        }
    }
}
/// Handles a server description while the topology type is `ReplicaSetWithPrimary`.
///
/// Primaries and other replica set members are delegated to their dedicated handlers. For the
/// remaining server types — standalones and mongoses are removed first — the topology type is
/// re-derived from whether a primary is still present.
fn update_replica_set_with_primary_topology(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    match server_description.server_type {
        ServerType::RSPrimary => {
            return self.update_rs_from_primary_server(server_description);
        }
        ServerType::RSSecondary | ServerType::RSArbiter | ServerType::RSOther => {
            return self.update_rs_with_primary_from_member(server_description);
        }
        ServerType::Standalone | ServerType::Mongos => {
            self.servers.remove(&server_description.address);
        }
        ServerType::Unknown | ServerType::RSGhost => {}
    }

    self.record_primary_state();
    Ok(())
}
/// Handles a standalone seen while the topology type is `Unknown`: with a single seed the
/// topology becomes `Single`; otherwise the standalone is removed from the topology.
fn update_unknown_with_standalone_server(&mut self, server_description: ServerDescription) {
    if !self.single_seed {
        self.servers.remove(&server_description.address);
        return;
    }

    self.topology_type = TopologyType::Single;
}
/// Processes a non-primary replica set member while the topology has no primary.
///
/// The topology adopts the member's set name if it has none yet; a member reporting a
/// different set name is removed and nothing else happens. Otherwise the hosts the member
/// knows about are added to the topology, and the member itself is removed if
/// `invalid_me` reports `true` for it.
///
/// # Errors
///
/// Propagates errors from reading the member's set name, known hosts or `invalid_me`, and from
/// parsing the host addresses.
fn update_rs_without_primary_server(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    let reported_set_name = server_description.set_name()?;

    if self.set_name.is_none() {
        self.set_name = reported_set_name;
    } else if self.set_name != reported_set_name {
        self.servers.remove(&server_description.address);
        return Ok(());
    }

    let known_hosts = server_description.known_hosts()?;
    self.add_new_servers(known_hosts)?;

    if server_description.invalid_me()? {
        self.servers.remove(&server_description.address);
    }

    Ok(())
}
/// Processes a non-primary replica set member while the topology has a primary.
///
/// A member whose set name differs from the topology's, or for which `invalid_me` reports
/// `true`, is removed, after which the topology type is re-derived from whether a primary is
/// still present. Any other member changes nothing.
///
/// # Errors
///
/// Propagates errors from reading the member's set name or `invalid_me`.
fn update_rs_with_primary_from_member(
    &mut self,
    server_description: ServerDescription,
) -> Result<()> {
    // `invalid_me` is consulted only when the set names match, thanks to short-circuiting.
    let must_remove = self.set_name != server_description.set_name()?
        || server_description.invalid_me()?;

    if must_remove {
        self.servers.remove(&server_description.address);
        self.record_primary_state();
    }

    Ok(())
}
/// Processes a description from a server claiming to be the replica set primary.
///
/// In order, this:
/// 1. adopts the server's set name if the topology has none, or removes the server when the
///    names differ;
/// 2. rejects the server as a stale primary when its (set version, election id) pair is older
///    than the maximum recorded so far, replacing its entry with a freshly constructed
///    description;
/// 3. records the new maximum election id and set version;
/// 4. replaces every *other* server currently marked `RSPrimary` with a freshly constructed
///    description;
/// 5. adds the hosts the primary knows about and removes previously known servers that the
///    primary does not list;
/// 6. re-derives the topology type from whether a primary is present.
///
/// # Errors
///
/// Propagates errors from reading the set name, set version, election id or known hosts of the
/// description, and from parsing the host addresses.
fn update_rs_from_primary_server(
&mut self,
server_description: ServerDescription,
) -> Result<()> {
if self.set_name.is_none() {
self.set_name = server_description.set_name()?;
} else if self.set_name != server_description.set_name()? {
// The server belongs to a different replica set: drop it and stop.
self.servers.remove(&server_description.address);
self.record_primary_state();
return Ok(());
}
// The staleness check only applies when the server reports both a set version and an
// election id AND the topology has already recorded a maximum for both.
if let Some(server_set_version) = server_description.set_version()? {
if let Some(server_election_id) = server_description.election_id()? {
if let Some(topology_max_set_version) = self.max_set_version {
if let Some(ref topology_max_election_id) = self.max_election_id {
if topology_max_set_version > server_set_version
|| (topology_max_set_version == server_set_version
&& *topology_max_election_id > server_election_id)
{
// Stale primary: overwrite the entry that `update` just stored with a
// freshly constructed description for the same address.
self.servers.insert(
server_description.address.clone(),
ServerDescription::new(server_description.address, None),
);
self.record_primary_state();
return Ok(());
}
}
}
// Not stale: remember this election id as the newest one seen.
self.max_election_id = Some(server_election_id);
}
}
// Raise the recorded maximum set version if the server's is higher, or if none is recorded.
if let Some(server_set_version) = server_description.set_version()? {
if self
.max_set_version
.as_ref()
.map(|topology_max_set_version| server_set_version > *topology_max_set_version)
.unwrap_or(true)
{
self.max_set_version = Some(server_set_version);
}
}
// Snapshot of the addresses known *before* the primary's host list is merged in; both loops
// below operate on this snapshot.
let addresses: Vec<_> = self.servers.keys().cloned().collect();
// Any other server still marked as primary is replaced with a freshly constructed
// description.
for address in addresses.clone() {
if address == server_description.address {
continue;
}
// The `unwrap` cannot fail: `address` was taken from the map's keys just above and nothing
// has been removed since.
if let ServerType::RSPrimary = self.servers.get(&address).unwrap().server_type {
self.servers
.insert(address.clone(), ServerDescription::new(address, None));
}
}
self.add_new_servers(server_description.known_hosts()?)?;
let known_hosts: HashSet<_> = server_description.known_hosts()?.collect();
// Remove previously known servers that the primary does not list. Membership is tested on
// the address's string form, so it relies on `to_string()` matching the host strings.
for address in addresses {
if !known_hosts.contains(&address.to_string()) {
self.servers.remove(&address);
}
}
self.record_primary_state();
Ok(())
}
/// Sets the topology type to `ReplicaSetWithPrimary` if any server in the topology has type
/// `RSPrimary`, and to `ReplicaSetNoPrimary` otherwise.
fn record_primary_state(&mut self) {
    let has_primary = self
        .servers
        .values()
        .any(|server| server.server_type == ServerType::RSPrimary);

    self.topology_type = match has_primary {
        true => TopologyType::ReplicaSetWithPrimary,
        false => TopologyType::ReplicaSetNoPrimary,
    };
}
/// Parses each host string into a `StreamAddress` and adds the ones not yet in the topology.
///
/// # Errors
///
/// Returns the first parse error encountered; in that case no server is added.
fn add_new_servers<'a>(&mut self, servers: impl Iterator<Item = &'a String>) -> Result<()> {
    let mut parsed = Vec::new();
    for host in servers {
        parsed.push(StreamAddress::parse(host)?);
    }

    self.add_new_servers_from_addresses(parsed.iter());
    Ok(())
}
/// Adds a freshly constructed description for every address that is not yet in the topology;
/// entries for addresses already present are left untouched.
fn add_new_servers_from_addresses<'a>(
    &mut self,
    servers: impl Iterator<Item = &'a StreamAddress>,
) {
    for address in servers {
        self.servers
            .entry(address.clone())
            .or_insert_with(|| ServerDescription::new(address.clone(), None));
    }
}
}
/// Whether the deployment supports sessions, as far as the topology description can tell.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) enum SessionSupportStatus {
    /// Nothing is known yet. This is the `Default` value.
    Undetermined,

    /// Sessions are not supported. Carries a logical session timeout if one was reported
    /// nonetheless.
    Unsupported {
        logical_session_timeout: Option<Duration>,
    },

    /// Sessions are supported, with the given logical session timeout.
    Supported { logical_session_timeout: Duration },
}

impl Default for SessionSupportStatus {
    fn default() -> Self {
        SessionSupportStatus::Undetermined
    }
}

impl SessionSupportStatus {
    /// Returns the logical session timeout carried by this status, if any.
    #[cfg(test)]
    fn logical_session_timeout(&self) -> Option<Duration> {
        match *self {
            SessionSupportStatus::Supported {
                logical_session_timeout,
            } => Some(logical_session_timeout),
            SessionSupportStatus::Unsupported {
                logical_session_timeout,
            } => logical_session_timeout,
            SessionSupportStatus::Undetermined => None,
        }
    }
}
/// The result of `TopologyDescription::diff` for two descriptions that are not equal.
#[derive(Debug)]
pub(crate) struct TopologyDescriptionDiff {
/// Addresses present in the description passed to `diff` but absent from the one `diff` was
/// called on.
pub(crate) new_addresses: HashSet<StreamAddress>,
}
/// Validates a max staleness value.
///
/// `None`, zero, and anything of 90 seconds or more are accepted.
///
/// # Errors
///
/// Returns an `ArgumentError` when the value is greater than zero but below 90 seconds.
fn verify_max_staleness(max_staleness: Option<Duration>) -> Result<()> {
    let lower_bound = Duration::from_secs(0);
    let upper_bound = Duration::from_secs(90);

    match max_staleness {
        Some(staleness) if staleness > lower_bound && staleness < upper_bound => {
            Err(ErrorKind::ArgumentError {
                message: "max staleness cannot be both positive and below 90 seconds".into(),
            }
            .into())
        }
        _ => Ok(()),
    }
}