use std::time::Duration;
use bson::RawDocumentBuf;
use serde::{Deserialize, Serialize};
use crate::{
bson::{doc, oid::ObjectId, DateTime, Document, Timestamp},
client::{
options::{ServerAddress, ServerApi},
ClusterTime,
},
cmap::{Command, Connection},
error::Result,
sdam::{ServerType, TopologyVersion},
selection_criteria::TagSet,
};
/// Name of the legacy hello command. `hello_command` uses it as both the command name and the
/// first key of the command body whenever it does not select `hello`.
pub(crate) const LEGACY_HELLO_COMMAND_NAME: &str = "isMaster";
/// All-lowercase spelling of the legacy hello command name.
///
/// NOTE(review): not referenced in this part of the file; presumably used elsewhere to match
/// command names regardless of casing — confirm against callers.
pub(crate) const LEGACY_HELLO_COMMAND_NAME_LOWERCASE: &str = "ismaster";
/// Extra parameters that `hello_command` adds to the command body when they are supplied.
///
/// Supplying them also makes `hello_command` set `exhaust_allowed` on the resulting `Command`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct AwaitableHelloOptions {
/// Inserted into the command body under the `topologyVersion` key.
pub(crate) topology_version: TopologyVersion,
/// Inserted under the `maxAwaitTimeMS` key as whole milliseconds; a value that does not fit in
/// an `i64` is sent as `i64::MAX`.
pub(crate) max_await_time: Duration,
}
pub(crate) fn hello_command(
server_api: Option<&ServerApi>,
load_balanced: Option<bool>,
hello_ok: Option<bool>,
awaitable_options: Option<AwaitableHelloOptions>,
) -> Command {
let (mut command, command_name) = if server_api.is_some()
|| matches!(load_balanced, Some(true))
|| matches!(hello_ok, Some(true))
{
(doc! { "hello": 1 }, "hello")
} else {
let mut cmd = doc! { LEGACY_HELLO_COMMAND_NAME: 1 };
if hello_ok.is_none() {
cmd.insert("helloOk", true);
}
(cmd, LEGACY_HELLO_COMMAND_NAME)
};
if let Some(opts) = awaitable_options {
command.insert("topologyVersion", opts.topology_version);
command.insert(
"maxAwaitTimeMS",
opts.max_await_time
.as_millis()
.try_into()
.unwrap_or(i64::MAX),
);
}
let mut command = Command::new(command_name, "admin", command);
if let Some(server_api) = server_api {
command.set_server_api(server_api);
}
command.exhaust_allowed = awaitable_options.is_some();
command
}
/// Send `command` over `conn` and convert the raw response into a `HelloReply`.
///
/// # Errors
///
/// Returns the error from `send_command` if sending fails, or the error from
/// `into_hello_reply` if the response cannot be converted.
pub(crate) async fn run_hello(conn: &mut Connection, command: Command) -> Result<HelloReply> {
    let raw_response = conn.send_command(command, None).await?;
    raw_response.into_hello_reply()
}
/// Outcome of a hello exchange, as returned by `run_hello`.
///
/// Values are produced by the raw response's `into_hello_reply()`, which is defined outside this
/// section; the field notes below are inferred from names and types — confirm against it.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct HelloReply {
/// Presumably the address of the server that produced this reply.
pub(crate) server_address: ServerAddress,
/// The reply parsed into typed fields.
pub(crate) command_response: HelloCommandResponse,
/// Presumably the unparsed reply document that `command_response` was derived from.
pub(crate) raw_command_response: RawDocumentBuf,
/// Presumably the cluster time reported with the reply, if one was present.
pub(crate) cluster_time: Option<ClusterTime>,
}
/// Typed view of a server's reply to a `hello` / legacy hello command.
///
/// Keys are camelCase on the wire unless a field overrides that with `rename`. When
/// deserializing, only `max_bson_object_size` and `max_message_size_bytes` are mandatory; every
/// other field is an `Option` (and `connection_id` additionally has a serde default).
///
/// NOTE(review): field meanings marked "presumably" are inferred from the field names and the
/// server's `hello` reply format, not from code in this file — confirm against the MongoDB docs.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct HelloCommandResponse {
/// `server_type` reports `RsPrimary` when this is `Some(true)` on a non-hidden member that has
/// a `set_name`.
pub is_writable_primary: Option<bool>,
/// Read from the `ismaster` key; `server_type` treats it exactly like `is_writable_primary`.
#[serde(rename = "ismaster")]
pub is_master: Option<bool>,
/// Presumably whether the server accepts `hello` in place of the legacy command.
pub hello_ok: Option<bool>,
/// Presumably the addresses of the replica set's hosts.
pub hosts: Option<Vec<String>>,
/// Presumably the addresses of the replica set's passive members.
pub passives: Option<Vec<String>>,
/// Presumably the addresses of the replica set's arbiters.
pub arbiters: Option<Vec<String>>,
/// Free-form message; `server_type` reports `Mongos` when it equals `"isdbgrid"`.
pub msg: Option<String>,
/// Presumably the responding server's own address as it knows it.
pub me: Option<String>,
/// Read from the `compression` key.
#[serde(rename = "compression")]
pub compressors: Option<Vec<String>>,
/// Presumably the replica set configuration version.
pub set_version: Option<i32>,
/// Replica set name; its presence is what makes `server_type` pick one of the `Rs*` member
/// types rather than `RsGhost` / `Standalone`.
pub set_name: Option<String>,
/// When `Some(true)` (and `set_name` is present) `server_type` reports `RsOther`, taking
/// precedence over the primary/secondary/arbiter flags.
pub hidden: Option<bool>,
/// When `Some(true)`, `server_type` reports `RsSecondary` (checked after hidden and primary).
pub secondary: Option<bool>,
/// When `Some(true)`, `server_type` reports `RsArbiter` (checked after secondary).
pub arbiter_only: Option<bool>,
/// Read from the `isreplicaset` key; with no `set_name`, `Some(true)` makes `server_type`
/// report `RsGhost`.
#[serde(rename = "isreplicaset")]
pub is_replica_set: Option<bool>,
/// Presumably the logical session timeout, in minutes.
pub logical_session_timeout_minutes: Option<i64>,
/// Contents of the `lastWrite` sub-document.
pub last_write: Option<LastWrite>,
/// Presumably the lowest wire protocol version the server supports.
pub min_wire_version: Option<i32>,
/// Presumably the highest wire protocol version the server supports.
pub max_wire_version: Option<i32>,
/// Presumably the tags configured for this replica set member.
pub tags: Option<TagSet>,
/// Presumably the identifier of the election that produced the current primary.
pub election_id: Option<ObjectId>,
/// Presumably the address of the replica set's current primary.
pub primary: Option<String>,
/// Presumably the SASL mechanisms available for the user named in the handshake.
pub sasl_supported_mechs: Option<Vec<String>>,
/// Presumably the server's reply to a speculative authentication attempt.
pub speculative_authenticate: Option<Document>,
/// Mandatory when deserializing (not an `Option`, no serde default). Presumably a byte count.
pub max_bson_object_size: i64,
/// Presumably the maximum number of writes allowed in one batch.
pub max_write_batch_size: Option<i64>,
/// Presumably identifies the backing service when connected through a load balancer.
pub service_id: Option<ObjectId>,
/// Topology version reported by the server.
pub topology_version: Option<TopologyVersion>,
/// Mandatory when deserializing (not an `Option`, no serde default).
pub max_message_size_bytes: i32,
/// Parsed by `deserialize_connection_id`, which accepts an int32, int64 or double; `None` when
/// the key is absent (serde `default`).
#[serde(deserialize_with = "deserialize_connection_id", default)]
pub connection_id: Option<i64>,
}
/// Deserialize a connection id that may arrive as an int32, an int64 or a double, normalizing it
/// to `Some(i64)`.
///
/// A double is converted with an `as` cast, so any fractional part is dropped.
///
/// # Errors
///
/// Returns the deserializer's error when the value is none of the three accepted numeric types.
// The `f64 -> i64` cast below is intentional, hence the lint exemption.
#[allow(clippy::cast_possible_truncation)]
fn deserialize_connection_id<'de, D>(de: D) -> std::result::Result<Option<i64>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    // Untagged: serde tries each variant in declaration order until one matches.
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum Helper {
        Int32(i32),
        Int64(i64),
        Double(f64),
    }

    let id = match Helper::deserialize(de)? {
        Helper::Int32(narrow) => i64::from(narrow),
        Helper::Int64(wide) => wide,
        Helper::Double(float) => float as i64,
    };
    Ok(Some(id))
}
impl HelloCommandResponse {
    /// Classify the responding server from the fields of this reply.
    ///
    /// Rules are applied in order and the first match wins:
    /// 1. `msg` equals `"isdbgrid"` → `Mongos`.
    /// 2. No `set_name`: `RsGhost` if `is_replica_set` is `Some(true)`, otherwise `Standalone`.
    /// 3. With a `set_name`: `hidden` → `RsOther`; `is_writable_primary` or `is_master` →
    ///    `RsPrimary`; `secondary` → `RsSecondary`; `arbiter_only` → `RsArbiter`; none of these →
    ///    `RsOther`.
    ///
    /// A flag counts as set only when it is `Some(true)`.
    pub(crate) fn server_type(&self) -> ServerType {
        let is_set = |flag: Option<bool>| matches!(flag, Some(true));

        if matches!(self.msg.as_deref(), Some("isdbgrid")) {
            return ServerType::Mongos;
        }

        if self.set_name.is_none() {
            return if is_set(self.is_replica_set) {
                ServerType::RsGhost
            } else {
                ServerType::Standalone
            };
        }

        // Replica set member: hidden is checked first, so it wins over the role flags.
        if is_set(self.hidden) {
            return ServerType::RsOther;
        }
        if is_set(self.is_writable_primary) || is_set(self.is_master) {
            return ServerType::RsPrimary;
        }
        if is_set(self.secondary) {
            return ServerType::RsSecondary;
        }
        if is_set(self.arbiter_only) {
            return ServerType::RsArbiter;
        }
        ServerType::RsOther
    }
}
/// Typed form of the value held in `HelloCommandResponse::last_write`.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct LastWrite {
/// Read from the `lastWriteDate` key. Presumably the time of the server's most recent write —
/// confirm against the MongoDB `hello` documentation.
pub last_write_date: DateTime,
}
/// A BSON timestamp paired with a 32-bit integer, deserialized from the keys `ts` and `t`.
///
/// NOTE(review): not referenced anywhere in this section; presumably a replication optime
/// (timestamp plus term) — confirm against its users.
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub(crate) struct OpTime {
ts: Timestamp,
t: i32,
}