use std::{
sync::Arc,
time::{Duration, Instant},
};
use serde::{Deserialize, Serialize};
use crate::{
bson::{doc, oid::ObjectId, DateTime, Document, Timestamp},
client::{
options::{ServerAddress, ServerApi},
ClusterTime,
},
cmap::{Command, Connection},
error::Result,
event::sdam::{
SdamEventHandler,
ServerHeartbeatFailedEvent,
ServerHeartbeatStartedEvent,
ServerHeartbeatSucceededEvent,
},
sdam::{ServerType, Topology},
selection_criteria::TagSet,
};
/// Builds the handshake/heartbeat command that is run against the `admin` database.
///
/// The command is named `hello` when a [`ServerApi`] is supplied and `isMaster` otherwise.
/// The body is the single-key document `{ <name>: 1 }`. When `api` is `Some`, it is also
/// attached to the command via `set_server_api`.
pub(crate) fn is_master_command(api: Option<&ServerApi>) -> Command {
    let command_name = match api {
        Some(_) => "hello",
        None => "isMaster",
    };
    let body = doc! { command_name: 1 };

    let mut command = Command::new(command_name.into(), "admin".into(), body);
    if let Some(server_api) = api {
        command.set_server_api(server_api);
    }
    command
}
/// Sends `command` over `conn`, parses the reply, and reports the exchange as heartbeat events.
///
/// A heartbeat-started event is offered before the command is sent. Afterwards exactly one of
/// a heartbeat-succeeded or heartbeat-failed event is offered, depending on whether the command
/// round trip *and* the conversion of the raw response into an [`IsMasterReply`] both succeeded.
/// Events are routed through `emit_event`, so they are only delivered when both `topology` and
/// `handler` are present.
///
/// The round-trip time is measured around the `send_command` call only and is used both for the
/// parsed reply and for the `duration` of the emitted event.
///
/// # Errors
///
/// Returns the error from sending the command or from converting the raw response.
pub(crate) async fn run_is_master(
    conn: &mut Connection,
    command: Command,
    topology: Option<&Topology>,
    handler: &Option<Arc<dyn SdamEventHandler>>,
) -> Result<IsMasterReply> {
    emit_event(topology, handler, |sink| {
        sink.handle_server_heartbeat_started_event(ServerHeartbeatStartedEvent {
            server_address: conn.address.clone(),
        });
    });

    let started_at = Instant::now();
    let outcome = conn.send_command(command, None).await;
    let round_trip_time = started_at.elapsed();

    // Keep the raw response alongside the parsed reply: the success event publishes the raw
    // document, while the caller receives the parsed form.
    let parsed = outcome.and_then(|raw| {
        let reply = raw.to_is_master_response(round_trip_time)?;
        Ok((raw, reply))
    });

    let (raw, parsed_reply) = match parsed {
        Ok(pair) => pair,
        Err(failure) => {
            emit_event(topology, handler, |sink| {
                sink.handle_server_heartbeat_failed_event(ServerHeartbeatFailedEvent {
                    duration: round_trip_time,
                    failure: failure.clone(),
                    server_address: conn.address.clone(),
                });
            });
            return Err(failure);
        }
    };

    emit_event(topology, handler, |sink| {
        // If the body cannot be decoded as a document, publish the decoding error text instead.
        let mut reply = raw
            .body::<Document>()
            .unwrap_or_else(|e| doc! { "deserialization error": e.to_string() });
        // The speculative authentication payload is stripped before the reply is published.
        reply.remove("speculativeAuthenticate");
        sink.handle_server_heartbeat_succeeded_event(ServerHeartbeatSucceededEvent {
            duration: round_trip_time,
            reply,
            server_address: conn.address.clone(),
        });
    });

    Ok(parsed_reply)
}
/// Invokes `emit` with the SDAM event handler, but only when both a handler and a topology
/// are present; otherwise `emit` is dropped without being called.
///
/// The topology value itself is never inspected; only its presence gates emission.
fn emit_event<F>(topology: Option<&Topology>, handler: &Option<Arc<dyn SdamEventHandler>>, emit: F)
where
    F: FnOnce(&Arc<dyn SdamEventHandler>),
{
    if topology.is_none() {
        return;
    }
    if let Some(sink) = handler {
        emit(sink);
    }
}
/// A parsed handshake/heartbeat reply together with metadata about the exchange.
///
/// `run_is_master` returns this type, obtained from the raw response's
/// `to_is_master_response` conversion.
#[derive(Debug, Clone)]
pub(crate) struct IsMasterReply {
/// Address associated with this reply (presumably the server that answered; confirm against
/// `to_is_master_response`, which is not visible here).
pub server_address: ServerAddress,
/// The deserialized fields of the command response.
pub command_response: IsMasterCommandResponse,
/// Round-trip time for the command. `run_is_master` passes the duration it measured around
/// `send_command` into the conversion that builds this struct.
pub round_trip_time: Duration,
/// Cluster time accompanying the reply, if any (presumably taken from the response; the
/// construction site is not visible here).
pub cluster_time: Option<ClusterTime>,
}
/// Serde model of the fields read from a `hello` / `isMaster` command response.
///
/// Keys are the camelCase form of the field name unless a field carries its own
/// `#[serde(rename = ...)]`. Every field is optional except `max_bson_object_size` and
/// `max_write_batch_size`, which have no serde default and therefore must be present for
/// deserialization to succeed.
///
/// Equality is implemented by hand (see the `PartialEq` impl) and deliberately compares only
/// a subset of these fields.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct IsMasterCommandResponse {
/// Key `isWritablePrimary`. `Some(true)` makes `server_type` report a replica set primary
/// when `set_name` is present and the member is not hidden.
pub is_writable_primary: Option<bool>,
/// Key `ismaster` (all lowercase). Treated by `server_type` exactly like
/// `is_writable_primary`.
#[serde(rename = "ismaster")]
pub is_master: Option<bool>,
/// Key `hosts`. Presumably the addresses of the replica set's regular members.
pub hosts: Option<Vec<String>>,
/// Key `passives`. Presumably the addresses of the replica set's passive members.
pub passives: Option<Vec<String>>,
/// Key `arbiters`. Presumably the addresses of the replica set's arbiters.
pub arbiters: Option<Vec<String>>,
/// Key `msg`. The value `"isdbgrid"` makes `server_type` report a mongos.
pub msg: Option<String>,
/// Key `me`. Presumably the address the server reports for itself.
pub me: Option<String>,
/// Key `compression` (note the rename). A list of compressor names.
#[serde(rename = "compression")]
pub compressors: Option<Vec<String>>,
/// Key `setVersion`.
pub set_version: Option<i32>,
/// Key `setName`. Its presence is what makes `server_type` classify the server as a
/// replica set member (primary, secondary, arbiter or other).
pub set_name: Option<String>,
/// Key `hidden`. `Some(true)` makes `server_type` report `RsOther` for a set member,
/// taking precedence over the primary/secondary/arbiter flags.
pub hidden: Option<bool>,
/// Key `secondary`. `Some(true)` makes `server_type` report a replica set secondary.
pub secondary: Option<bool>,
/// Key `arbiterOnly`. `Some(true)` makes `server_type` report a replica set arbiter.
pub arbiter_only: Option<bool>,
/// Key `isreplicaset` (all lowercase). When `set_name` is absent, `Some(true)` makes
/// `server_type` report `RsGhost` instead of `Standalone`.
#[serde(rename = "isreplicaset")]
pub is_replica_set: Option<bool>,
/// Key `logicalSessionTimeoutMinutes`.
pub logical_session_timeout_minutes: Option<i64>,
/// Key `lastWrite`.
pub last_write: Option<LastWrite>,
/// Key `minWireVersion`.
pub min_wire_version: Option<i32>,
/// Key `maxWireVersion`.
pub max_wire_version: Option<i32>,
/// Key `tags`.
pub tags: Option<TagSet>,
/// Key `electionId`.
pub election_id: Option<ObjectId>,
/// Key `primary`. Presumably the address this server believes to be the primary.
pub primary: Option<String>,
/// Key `saslSupportedMechs`.
pub sasl_supported_mechs: Option<Vec<String>>,
/// Key `speculativeAuthenticate`. `run_is_master` strips this key from the reply document
/// it publishes in heartbeat-succeeded events.
pub speculative_authenticate: Option<Document>,
/// Key `maxBsonObjectSize`. Required: there is no serde default for this field.
pub max_bson_object_size: i64,
/// Key `maxWriteBatchSize`. Required: there is no serde default for this field.
pub max_write_batch_size: i64,
/// Key `serviceId`.
pub service_id: Option<ObjectId>,
/// Key `topologyVersion`, kept as an uninterpreted document.
pub topology_version: Option<Document>,
}
/// Equality over a chosen subset of the response.
///
/// Two responses are equal when they yield the same [`ServerType`] and agree on wire version
/// range, `me`, `hosts`, `passives`, `arbiters`, `tags`, `set_name`, `set_version`,
/// `election_id`, `primary`, `logical_session_timeout_minutes`, `max_bson_object_size`,
/// `max_write_batch_size` and `service_id`. All other fields (for example `last_write`,
/// `topology_version` and `compressors`) are ignored.
impl PartialEq for IsMasterCommandResponse {
    fn eq(&self, other: &Self) -> bool {
        // Derived server classification plus the supported wire version range.
        let same_kind = (
            self.server_type(),
            self.min_wire_version,
            self.max_wire_version,
        ) == (
            other.server_type(),
            other.min_wire_version,
            other.max_wire_version,
        );

        // Self-reported address, member lists and tags.
        let same_members = (
            &self.me,
            &self.hosts,
            &self.passives,
            &self.arbiters,
            &self.tags,
        ) == (
            &other.me,
            &other.hosts,
            &other.passives,
            &other.arbiters,
            &other.tags,
        );

        // Replica set identity and election state.
        let same_set_state = (
            &self.set_name,
            &self.set_version,
            &self.election_id,
            &self.primary,
        ) == (
            &other.set_name,
            &other.set_version,
            &other.election_id,
            &other.primary,
        );

        // Session timeout, size limits and service id.
        let same_limits = (
            &self.logical_session_timeout_minutes,
            &self.max_bson_object_size,
            &self.max_write_batch_size,
            &self.service_id,
        ) == (
            &other.logical_session_timeout_minutes,
            &other.max_bson_object_size,
            &other.max_write_batch_size,
            &other.service_id,
        );

        same_kind && same_members && same_set_state && same_limits
    }
}
impl IsMasterCommandResponse {
    /// Classifies the responding server from the flags in this response.
    ///
    /// Rules are applied in this order:
    /// 1. `msg == "isdbgrid"` → `Mongos`.
    /// 2. No `set_name`: `RsGhost` if `is_replica_set` is `Some(true)`, else `Standalone`.
    /// 3. With a `set_name`: `hidden` → `RsOther`; `is_writable_primary` or `is_master` →
    ///    `RsPrimary`; `secondary` → `RsSecondary`; `arbiter_only` → `RsArbiter`;
    ///    anything else → `RsOther`.
    ///
    /// A flag only counts when it is exactly `Some(true)`.
    pub(crate) fn server_type(&self) -> ServerType {
        if matches!(self.msg.as_deref(), Some("isdbgrid")) {
            return ServerType::Mongos;
        }

        if self.set_name.is_none() {
            return match self.is_replica_set {
                Some(true) => ServerType::RsGhost,
                _ => ServerType::Standalone,
            };
        }

        let is_set = |flag: Option<bool>| flag == Some(true);

        // Hidden members are checked first, so they never report as primary or secondary.
        if is_set(self.hidden) {
            return ServerType::RsOther;
        }
        if is_set(self.is_writable_primary) || is_set(self.is_master) {
            return ServerType::RsPrimary;
        }
        if is_set(self.secondary) {
            return ServerType::RsSecondary;
        }
        if is_set(self.arbiter_only) {
            return ServerType::RsArbiter;
        }
        ServerType::RsOther
    }
}
/// Serde model of the `lastWrite` sub-document of an `IsMasterCommandResponse`.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct LastWrite {
/// Key `lastWriteDate`. Required whenever the `lastWrite` document itself is present.
pub last_write_date: DateTime,
}
/// Deserialize-only pair of a BSON timestamp and a 32-bit integer, read from keys `ts` and `t`.
///
/// NOTE(review): presumably a replication optime (operation timestamp plus election term).
/// Nothing in the visible code references this type, so confirm its use before relying on it.
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub(crate) struct OpTime {
// Key `ts`.
ts: Timestamp,
// Key `t`.
t: i32,
}