pub(super) mod server;
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Condvar, Mutex, RwLock},
time::Duration,
};
use derivative::Derivative;
use self::server::Server;
use super::TopologyDescription;
use crate::{
cmap::{Command, Connection},
error::{Error, Result},
options::{ClientOptions, StreamAddress},
sdam::{
description::server::{ServerDescription, ServerType},
monitor::monitor_server,
},
selection_criteria::SelectionCriteria,
};
/// A cloneable handle to a shared condition variable and the mutex it waits on.
///
/// Clones share the same underlying `Condvar` and `Mutex`, so a `notify` issued through one
/// handle wakes threads blocked in `wait_timeout` on any other clone.
#[derive(Clone)]
pub(crate) struct TopologyUpdateCondvar {
    condvar: Arc<Condvar>,
    mutex: Arc<Mutex<()>>,
}

impl TopologyUpdateCondvar {
    /// Creates a fresh handle with its own condition variable and mutex.
    pub(crate) fn new() -> Self {
        let condvar = Arc::new(Condvar::new());
        let mutex = Arc::new(Mutex::new(()));
        Self { condvar, mutex }
    }

    /// Blocks the calling thread until a notification arrives or `duration` elapses.
    ///
    /// No predicate is checked, so the call may also return early on a spurious wakeup; the
    /// caller learns nothing about why it was woken.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn wait_timeout(&self, duration: Duration) {
        let guard = self.mutex.lock().unwrap();
        // The returned guard and timeout flag are intentionally discarded.
        let _ = self.condvar.wait_timeout(guard, duration).unwrap();
    }

    /// Wakes every thread currently blocked in `wait_timeout` on this handle or a clone of it.
    fn notify(&self) {
        self.condvar.notify_all();
    }
}
/// The client's view of a deployment: its description plus one `Server` handle per address.
///
/// `Debug` is derived through `derivative` so that the `condvar` field can be left out of the
/// formatted output.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub(crate) struct Topology {
/// Description of the topology; modified through `update_state`.
pub(crate) description: TopologyDescription,
/// Shared server handles keyed by address; entries are added by `add_new_server` and pruned
/// by `update_state`.
pub(crate) servers: HashMap<StreamAddress, Arc<Server>>,
/// Condition variable signalled by `notify`; skipped when formatting with `Debug`.
#[derivative(Debug = "ignore")]
condvar: TopologyUpdateCondvar,
/// Client options as stored by `new`, i.e. after the host list has been drained out of them.
options: ClientOptions,
}
impl Topology {
    /// Creates a topology from `options` and registers every configured host.
    ///
    /// The `TopologyDescription` is built from a copy of `options`; the host list is then drained
    /// out of `options` before they are stored, so the retained `ClientOptions` no longer carry
    /// the hosts. Each drained host is registered through `add_new_server` while the write lock
    /// on the new topology is held.
    ///
    /// # Errors
    ///
    /// Fails if the `TopologyDescription` cannot be built or if registering any host fails.
    ///
    /// # Panics
    ///
    /// Panics if the freshly created lock is poisoned.
    pub(crate) fn new(
        condvar: TopologyUpdateCondvar,
        mut options: ClientOptions,
    ) -> Result<Arc<RwLock<Self>>> {
        let description = TopologyDescription::new(options.clone())?;
        let hosts: Vec<_> = options.hosts.drain(..).collect();

        let topology = Arc::new(RwLock::new(Topology {
            description,
            servers: Default::default(),
            condvar,
            options,
        }));

        // Scope the write guard so it is released before `topology` is moved out.
        {
            let mut topology_lock = topology.write().unwrap();
            for address in hosts {
                topology_lock.add_new_server(address, &topology)?;
            }
        }

        Ok(topology)
    }

    /// Wakes every thread waiting on this topology's condition variable.
    pub(crate) fn notify(&self) {
        self.condvar.notify()
    }

    /// Delegates to `TopologyDescription::update_command_with_read_pref`, supplying the server
    /// type currently recorded for `server_address`.
    ///
    /// If the description has no entry for `server_address`, `ServerType::Unknown` is used.
    pub(crate) fn update_command_with_read_pref(
        &self,
        server_address: &StreamAddress,
        command: &mut Command,
        criteria: Option<&SelectionCriteria>,
    ) {
        let server_type = self
            .description
            .get_server_description(server_address)
            .map(|desc| desc.server_type)
            .unwrap_or(ServerType::Unknown);

        self.description
            .update_command_with_read_pref(server_type, command, criteria)
    }

    /// Calls `request_topology_check` on every server currently tracked.
    pub(crate) fn request_topology_check(&self) {
        for server in self.servers.values() {
            server.request_topology_check();
        }
    }

    /// Registers a server for `address` and hands a new connection for it to `monitor_server`.
    ///
    /// Does nothing if `address` is already tracked. `wrapped_topology` is the shared handle to
    /// this topology; the new `Server` receives a weak reference to it.
    ///
    /// # Errors
    ///
    /// Returns the error from `Connection::new` if the monitoring connection cannot be created.
    /// In that case the server is NOT added to `servers`, so a later call for the same address
    /// tries again.
    fn add_new_server(
        &mut self,
        address: StreamAddress,
        wrapped_topology: &Arc<RwLock<Topology>>,
    ) -> Result<()> {
        if self.servers.contains_key(&address) {
            return Ok(());
        }

        let options = self.options.clone();

        let server = Arc::new(Server::new(
            Arc::downgrade(wrapped_topology),
            address.clone(),
            &options,
        ));

        // Create the monitoring connection BEFORE registering the server. Inserting first would
        // leave an unmonitored entry behind whenever the connection fails, and the
        // `contains_key` early return above would then stop any later call from ever starting a
        // monitor for that address.
        let conn = Connection::new(
            0,
            server.address.clone(),
            0,
            options.connect_timeout,
            options.tls_options(),
            options.cmap_event_handler.clone(),
        )?;

        self.servers.insert(address, Arc::clone(&server));

        monitor_server(conn, Arc::downgrade(&server), options.heartbeat_freq);

        Ok(())
    }

    /// Applies `server` to the topology description and reconciles `servers` with the result.
    ///
    /// After the description is updated, a server is registered for every address the
    /// description lists that is not yet tracked, and tracked servers whose address is no longer
    /// listed are dropped from the map.
    ///
    /// # Errors
    ///
    /// Propagates a failure from the description update or from registering a server. On error
    /// the remaining steps, including the pruning of stale servers, are skipped.
    pub(crate) fn update_state(
        &mut self,
        server: ServerDescription,
        wrapped_topology: &Arc<RwLock<Topology>>,
    ) -> Result<()> {
        self.description.update(server)?;

        let addresses: HashSet<_> = self.description.server_addresses().cloned().collect();

        for address in &addresses {
            self.add_new_server(address.clone(), wrapped_topology)?;
        }

        self.servers
            .retain(|address, _| addresses.contains(address));

        Ok(())
    }
}
/// Handles an error for `address` that, going by this function's name, was raised before the
/// connection handshake.
///
/// Only errors for which `Error::is_network_error` returns true have any effect: the server at
/// `address` is passed to `mark_server_as_unknown` together with the error. Every other error is
/// dropped without touching the topology.
pub(crate) fn handle_pre_handshake_error(
    error: Error,
    address: StreamAddress,
    topology: Arc<RwLock<Topology>>,
) {
    if !error.is_network_error() {
        return;
    }

    mark_server_as_unknown(error, address, topology);
}
/// Handles an error observed on `conn` (after the handshake, going by this function's name) and
/// updates the topology and the server's connection pool accordingly.
///
/// * Non-timeout network error: the server is passed to `mark_server_as_unknown` and its
///   connection pool is cleared.
/// * "Recovering" or "not master" error: the server is passed to `mark_server_as_unknown` and a
///   topology check is requested. The pool is additionally cleared when the connection's max wire
///   version is below 8 or the error reports that the server is shutting down. A missing stream
///   description or wire version is treated as wire version 0.
/// * Any other error: nothing happens.
pub(crate) fn handle_post_handshake_error(
    error: Error,
    conn: Connection,
    server: Arc<Server>,
    topology: Arc<RwLock<Topology>>,
) {
    if error.is_non_timeout_network_error() {
        mark_server_as_unknown(error, server.address.clone(), topology);
        server.clear_connection_pool();
        return;
    }

    if !error.is_recovering() && !error.is_not_master() {
        return;
    }

    // The error is inspected again below, so a copy is handed to the topology update.
    mark_server_as_unknown(error.clone(), server.address.clone(), topology);
    server.request_topology_check();

    let wire_version = match conn.stream_description() {
        Ok(stream_description) => stream_description.max_wire_version.unwrap_or(0),
        Err(_) => 0,
    };

    if wire_version < 8 || error.is_shutting_down() {
        server.clear_connection_pool();
    }
}
fn mark_server_as_unknown(error: Error, address: StreamAddress, topology: Arc<RwLock<Topology>>) {
let description = ServerDescription::new(address, Some(Err(error)));
update_topology(topology, description);
}
pub(crate) fn update_topology(
topology: Arc<RwLock<Topology>>,
server_description: ServerDescription,
) {
let mut topology_clone = topology.read().unwrap().clone();
let _ = topology_clone.update_state(server_description, &topology);
let mut topology_lock = topology.write().unwrap();
topology_lock.description = topology_clone.description;
topology_lock.servers = topology_clone.servers;
topology_lock.notify();
}