1use std::time::Duration;
2
3use bson::{rawdoc, RawDocumentBuf};
4use serde::{Deserialize, Serialize};
5use tokio::sync::broadcast;
6
7use crate::{
8 bson::{doc, oid::ObjectId, DateTime, Document, Timestamp},
9 client::{
10 options::{ServerAddress, ServerApi},
11 ClusterTime,
12 },
13 cmap::{Command, Connection},
14 error::Result,
15 sdam::{ServerType, TopologyVersion},
16 selection_criteria::TagSet,
17};
18
19pub(crate) const LEGACY_HELLO_COMMAND_NAME: &str = "isMaster";
23pub(crate) const LEGACY_HELLO_COMMAND_NAME_LOWERCASE: &str = "ismaster";
24
25#[derive(Debug, Clone, Copy)]
26pub(crate) struct AwaitableHelloOptions {
27 pub(crate) topology_version: TopologyVersion,
28 pub(crate) max_await_time: Duration,
29}
30
31pub(crate) fn hello_command(
38 server_api: Option<&ServerApi>,
39 load_balanced: Option<bool>,
40 hello_ok: Option<bool>,
41 awaitable_options: Option<AwaitableHelloOptions>,
42) -> Command {
43 let (mut body, command_name) = if server_api.is_some()
44 || matches!(load_balanced, Some(true))
45 || matches!(hello_ok, Some(true))
46 {
47 (rawdoc! { "hello": 1 }, "hello")
48 } else {
49 let mut body = rawdoc! { LEGACY_HELLO_COMMAND_NAME: 1 };
50 if hello_ok.is_none() {
51 body.append("helloOk", true);
52 }
53 (body, LEGACY_HELLO_COMMAND_NAME)
54 };
55
56 if let Some(opts) = awaitable_options {
57 body.append("topologyVersion", opts.topology_version);
58 body.append(
59 "maxAwaitTimeMS",
60 opts.max_await_time
61 .as_millis()
62 .try_into()
63 .unwrap_or(i64::MAX),
64 );
65 }
66
67 let mut command = Command::new(command_name, "admin", body);
68 if let Some(server_api) = server_api {
69 command.set_server_api(server_api);
70 }
71 command.exhaust_allowed = awaitable_options.is_some();
72 command
73}
74
75pub(crate) async fn run_hello(
77 conn: &mut Connection,
78 command: Command,
79 mut cancellation_receiver: Option<broadcast::Receiver<()>>,
80) -> Result<HelloReply> {
81 let response_result = match cancellation_receiver {
82 Some(ref mut cancellation_receiver) => {
83 conn.send_message_with_cancellation(command, cancellation_receiver)
84 .await
85 }
86 None => conn.send_message(command).await,
87 };
88 response_result.and_then(|raw_response| raw_response.into_hello_reply())
89}
90
91#[derive(Debug, Clone, Serialize)]
92pub(crate) struct HelloReply {
93 pub(crate) server_address: ServerAddress,
94 pub(crate) command_response: HelloCommandResponse,
95 pub(crate) raw_command_response: RawDocumentBuf,
96 pub(crate) cluster_time: Option<ClusterTime>,
97}
98
99#[derive(Debug, Clone, Default, Deserialize, Serialize)]
103#[serde(rename_all = "camelCase")]
104pub(crate) struct HelloCommandResponse {
105 pub is_writable_primary: Option<bool>,
108
109 #[serde(rename = "ismaster")]
110 pub is_master: Option<bool>,
112
113 pub hello_ok: Option<bool>,
116
117 pub hosts: Option<Vec<String>>,
119
120 pub passives: Option<Vec<String>>,
122
123 pub arbiters: Option<Vec<String>>,
125
126 pub msg: Option<String>,
128
129 pub me: Option<String>,
131
132 #[serde(rename = "compression")]
133 pub compressors: Option<Vec<String>>,
135
136 pub set_version: Option<i32>,
138
139 pub set_name: Option<String>,
141
142 pub hidden: Option<bool>,
144
145 pub secondary: Option<bool>,
147
148 pub arbiter_only: Option<bool>,
150
151 #[serde(rename = "isreplicaset")]
152 pub is_replica_set: Option<bool>,
154
155 pub logical_session_timeout_minutes: Option<i64>,
157
158 pub last_write: Option<LastWrite>,
160
161 pub min_wire_version: Option<i32>,
163
164 pub max_wire_version: Option<i32>,
166
167 pub tags: Option<TagSet>,
169
170 pub election_id: Option<ObjectId>,
172
173 pub primary: Option<String>,
175
176 pub sasl_supported_mechs: Option<Vec<String>>,
178
179 pub speculative_authenticate: Option<Document>,
181
182 pub max_bson_object_size: i64,
184
185 pub max_write_batch_size: Option<i64>,
187
188 pub service_id: Option<ObjectId>,
190
191 pub topology_version: Option<TopologyVersion>,
193
194 pub max_message_size_bytes: i32,
196
197 pub connection_id: Option<i64>,
200}
201
202impl HelloCommandResponse {
203 pub(crate) fn server_type(&self) -> ServerType {
204 if self.msg.as_deref() == Some("isdbgrid") {
205 ServerType::Mongos
206 } else if self.set_name.is_some() {
207 if self.hidden == Some(true) {
208 ServerType::RsOther
209 } else if self.is_writable_primary == Some(true) || self.is_master == Some(true) {
210 ServerType::RsPrimary
211 } else if self.secondary == Some(true) {
212 ServerType::RsSecondary
213 } else if self.arbiter_only == Some(true) {
214 ServerType::RsArbiter
215 } else {
216 ServerType::RsOther
217 }
218 } else if self.is_replica_set == Some(true) {
219 ServerType::RsGhost
220 } else {
221 ServerType::Standalone
222 }
223 }
224}
225
226#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
227#[serde(rename_all = "camelCase")]
228pub(crate) struct LastWrite {
229 pub last_write_date: DateTime,
230}
231
232#[derive(Debug, Clone, PartialEq, Deserialize)]
233pub(crate) struct OpTime {
234 ts: Timestamp,
235 t: i32,
236}