use std::{
sync::{Arc, Weak},
time::Duration,
};
use bson::{bson, doc};
use lazy_static::lazy_static;
use time::PreciseTime;
use super::{
description::server::{ServerDescription, ServerType},
state::server::Server,
};
use crate::{
cmap::{Command, Connection},
error::Result,
is_master::IsMasterReply,
sdam::update_topology,
};
const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
lazy_static! {
pub(crate) static ref MIN_HEARTBEAT_FREQUENCY: time::Duration = time::Duration::milliseconds(500);
}
pub(super) fn monitor_server(
mut conn: Connection,
server: Weak<Server>,
heartbeat_frequency: Option<Duration>,
) {
std::thread::spawn(move || {
let mut server_type = ServerType::Unknown;
let heartbeat_frequency = heartbeat_frequency.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY);
loop {
server_type = match monitor_server_check(&mut conn, server_type, &server) {
Some(server_type) => server_type,
None => return,
};
let last_check = PreciseTime::now();
let timed_out = match server.upgrade() {
Some(server) => server.wait_timeout(heartbeat_frequency),
None => return,
};
if !timed_out {
let duration_since_last_check = last_check.to(PreciseTime::now());
if duration_since_last_check < *MIN_HEARTBEAT_FREQUENCY {
let remaining_time = *MIN_HEARTBEAT_FREQUENCY - duration_since_last_check;
if let Ok(remaining_time) = remaining_time.to_std() {
std::thread::sleep(remaining_time);
}
}
}
}
});
}
fn monitor_server_check(
conn: &mut Connection,
mut server_type: ServerType,
server: &Weak<Server>,
) -> Option<ServerType> {
let server = match server.upgrade() {
Some(server) => server,
None => return None,
};
let topology = match server.topology() {
Some(topology) => topology,
None => return None,
};
let server_description = check_server(conn, server_type, &server);
server_type = server_description.server_type;
update_topology(topology, server_description);
Some(server_type)
}
fn check_server(
conn: &mut Connection,
server_type: ServerType,
server: &Arc<Server>,
) -> ServerDescription {
let address = conn.address().clone();
match is_master(conn) {
Ok(reply) => return ServerDescription::new(address, Some(Ok(reply))),
Err(e) => {
server.clear_connection_pool();
if server_type == ServerType::Unknown {
return ServerDescription::new(address, Some(Err(e)));
}
}
}
ServerDescription::new(address, Some(is_master(conn)))
}
fn is_master(conn: &mut Connection) -> Result<IsMasterReply> {
let command = Command::new_read(
"isMaster".into(),
"admin".into(),
None,
doc! { "isMaster": 1 },
);
let start_time = PreciseTime::now();
let command_response = conn.send_command(command, None)?;
let end_time = PreciseTime::now();
let command_response = command_response.body()?;
Ok(IsMasterReply {
command_response,
round_trip_time: Some(start_time.to(end_time).to_std().unwrap()),
})
}