use std::{sync::Weak, time::Duration};
use time::PreciseTime;
use super::{
description::server::{ServerDescription, ServerType},
state::{server::Server, Topology, WeakTopology},
};
use crate::{
bson::doc,
cmap::{Command, Connection},
error::Result,
is_master::IsMasterReply,
options::StreamAddress,
RUNTIME,
};
/// Heartbeat frequency the monitor falls back to when the client options do not set
/// `heartbeat_freq`.
pub(super) const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
/// Delay the monitor unconditionally waits after each server check, unless the client options
/// override it through `heartbeat_freq_test`.
pub(crate) const MIN_HEARTBEAT_FREQUENCY: Duration = Duration::from_millis(500);
/// State of a task, scheduled on `RUNTIME`, that repeatedly checks one server with `isMaster`
/// and hands the resulting server description to the topology.
pub(super) struct Monitor {
/// Address of the server being checked.
address: StreamAddress,
/// Dedicated monitoring connection; `None` until first needed and reset to `None` after a
/// network error so the next check reconnects.
connection: Option<Connection>,
/// Weak handle to the monitored server; the monitor loop stops once it can no longer be
/// upgraded.
server: Weak<Server>,
/// Server type recorded from the most recent check (starts as `ServerType::Unknown`).
server_type: ServerType,
/// Weak handle to the owning topology, also used to read the client options.
topology: WeakTopology,
}
impl Monitor {
pub(super) fn start(address: StreamAddress, server: Weak<Server>, topology: WeakTopology) {
let mut monitor = Self {
address,
connection: None,
server,
server_type: ServerType::Unknown,
topology,
};
RUNTIME.execute(async move {
monitor.execute().await;
});
}
async fn execute(&mut self) {
let heartbeat_frequency = self
.topology
.client_options()
.heartbeat_freq
.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY);
while self.topology.is_alive() {
if self.server.upgrade().is_none() {
break;
}
let topology = match self.topology.upgrade() {
Some(topology) => topology,
None => break,
};
if self.check_if_topology_changed(&topology).await {
topology.notify_topology_changed();
}
let min_frequency = self
.topology
.client_options()
.heartbeat_freq_test
.unwrap_or(MIN_HEARTBEAT_FREQUENCY);
RUNTIME.delay_for(min_frequency).await;
topology
.wait_for_topology_check_request(heartbeat_frequency - min_frequency)
.await;
}
}
async fn check_if_topology_changed(&mut self, topology: &Topology) -> bool {
let server_description = self.check_server().await;
self.server_type = server_description.server_type;
topology.update(server_description).await
}
async fn check_server(&mut self) -> ServerDescription {
let address = self.address.clone();
match self.perform_is_master().await {
Ok(reply) => ServerDescription::new(address, Some(Ok(reply))),
Err(e) => {
self.clear_connection_pool().await;
if self.server_type == ServerType::Unknown {
return ServerDescription::new(address, Some(Err(e)));
}
ServerDescription::new(address, Some(self.perform_is_master().await))
}
}
}
async fn perform_is_master(&mut self) -> Result<IsMasterReply> {
let connection = self.resolve_connection().await?;
let result = is_master(connection).await;
if result
.as_ref()
.err()
.map(|e| e.kind.is_network_error())
.unwrap_or(false)
{
self.connection.take();
}
result
}
async fn resolve_connection(&mut self) -> Result<&mut Connection> {
if let Some(ref mut connection) = self.connection {
return Ok(connection);
}
let connection = Connection::new_monitoring(
self.address.clone(),
self.topology.client_options().connect_timeout,
self.topology.client_options().tls_options(),
)
.await?;
Ok(self.connection.get_or_insert(connection))
}
async fn clear_connection_pool(&self) {
if let Some(server) = self.server.upgrade() {
server.clear_connection_pool().await;
}
}
}
/// Sends an `isMaster` command against the `admin` database over `connection` and builds an
/// `IsMasterReply` from the response.
///
/// The round-trip time is measured around the `send_command` call only. If the measured
/// interval cannot be represented as a `std::time::Duration`, `round_trip_time` is `None`.
///
/// # Errors
///
/// Returns the error from sending the command or from extracting the response body.
async fn is_master(connection: &mut Connection) -> Result<IsMasterReply> {
    let command = Command::new_read(
        "isMaster".into(),
        "admin".into(),
        None,
        doc! { "isMaster": 1 },
    );

    let start_time = PreciseTime::now();
    let command_response = connection.send_command(command, None).await?;
    let end_time = PreciseTime::now();

    let is_master_response = command_response.body()?;

    Ok(IsMasterReply {
        command_response: is_master_response,
        // `to_std` fails for intervals that do not fit a `std::time::Duration` (e.g. negative
        // ones). The round-trip time is optional, so report it as missing rather than
        // panicking.
        round_trip_time: start_time.to(end_time).to_std().ok(),
        cluster_time: command_response.cluster_time().cloned(),
    })
}