mongodb/sdam/description/
server.rs1use std::time::Duration;
2
3use crate::bson::{bson, rawdoc, Bson, RawBson};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 bson::{oid::ObjectId, DateTime},
8 client::ClusterTime,
9 error::{Error, ErrorKind, Result},
10 hello::{HelloCommandResponse, HelloReply},
11 options::ServerAddress,
12 selection_criteria::TagSet,
13 serde_util,
14};
15
/// MongoDB server version named in the "server too old" message produced by
/// `ServerDescription::compatibility_error_message`.
const DRIVER_MIN_DB_VERSION: &str = "4.2";
/// Lowest wire version accepted: a server whose reported max wire version is below this is
/// reported as incompatible.
const DRIVER_MIN_WIRE_VERSION: i32 = 8;
/// Highest wire version accepted: a server whose reported min wire version is above this is
/// reported as incompatible.
const DRIVER_MAX_WIRE_VERSION: i32 = 25;
19
/// The kind of server a description refers to.
///
/// The replica set variants are (de)serialized with an upper-case `RS` prefix (for example
/// `RSPrimary`); the remaining variants use their Rust names. The `Display` implementation
/// prints the same names.
#[derive(Debug, Deserialize, Clone, Copy, Eq, PartialEq, Serialize, Default)]
#[non_exhaustive]
pub enum ServerType {
    /// A standalone server.
    Standalone,

    /// A `mongos` instance.
    Mongos,

    /// A replica set primary. Serialized as `RSPrimary`.
    #[serde(rename = "RSPrimary")]
    RsPrimary,

    /// A replica set secondary. Serialized as `RSSecondary`.
    #[serde(rename = "RSSecondary")]
    RsSecondary,

    /// A replica set arbiter. Serialized as `RSArbiter`.
    #[serde(rename = "RSArbiter")]
    RsArbiter,

    /// A replica set member of some other kind. Serialized as `RSOther`.
    #[serde(rename = "RSOther")]
    RsOther,

    /// A replica set "ghost" member. Serialized as `RSGhost`.
    #[serde(rename = "RSGhost")]
    RsGhost,

    /// A load balancer.
    LoadBalancer,

    /// The server's type is not known. This is the default value, and the string
    /// `PossiblePrimary` also deserializes to this variant.
    #[serde(alias = "PossiblePrimary")]
    #[default]
    Unknown,
}
59
60impl std::fmt::Display for ServerType {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 match self {
63 ServerType::Standalone => write!(f, "Standalone"),
64 ServerType::Mongos => write!(f, "Mongos"),
65 ServerType::RsPrimary => write!(f, "RSPrimary"),
66 ServerType::RsSecondary => write!(f, "RSSecondary"),
67 ServerType::RsArbiter => write!(f, "RSArbiter"),
68 ServerType::RsOther => write!(f, "RSOther"),
69 ServerType::RsGhost => write!(f, "RSGhost"),
70 ServerType::LoadBalancer => write!(f, "LoadBalancer"),
71 ServerType::Unknown => write!(f, "Unknown"),
72 }
73 }
74}
75
76impl ServerType {
77 pub(crate) fn can_auth(self) -> bool {
78 !matches!(self, ServerType::RsArbiter)
79 }
80
81 pub(crate) fn is_data_bearing(self) -> bool {
82 matches!(
83 self,
84 ServerType::Standalone
85 | ServerType::RsPrimary
86 | ServerType::RsSecondary
87 | ServerType::Mongos
88 | ServerType::LoadBalancer
89 )
90 }
91
92 pub(crate) fn is_available(self) -> bool {
93 !matches!(self, ServerType::Unknown)
94 }
95}
96
/// A topology version: a process id paired with a counter.
///
/// (De)serialized with camelCase keys, i.e. `processId` and `counter`.
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub(crate) struct TopologyVersion {
    /// Process identifier; two versions with different ids are never compared by counter
    /// (see `is_more_recent_than`).
    pub(crate) process_id: ObjectId,
    /// Counter compared between two versions that share the same `process_id`.
    pub(crate) counter: i64,
}
105
106impl TopologyVersion {
107 pub(crate) fn is_more_recent_than(&self, existing_tv: TopologyVersion) -> bool {
108 self.process_id != existing_tv.process_id || self.counter > existing_tv.counter
109 }
110}
111
112impl From<TopologyVersion> for Bson {
113 fn from(tv: TopologyVersion) -> Self {
114 bson!({
115 "processId": tv.process_id,
116 "counter": tv.counter
117 })
118 }
119}
120
121impl From<TopologyVersion> for RawBson {
122 fn from(tv: TopologyVersion) -> Self {
123 RawBson::Document(rawdoc! {
124 "processId": tv.process_id,
125 "counter": tv.counter
126 })
127 }
128}
129
// Only compiled when the `bson-3` feature is enabled.
// NOTE(review): presumably this lets a `TopologyVersion` be used directly as a value in the
// bson 3.x raw-document macros by binding it by value — confirm against the bson crate docs.
#[cfg(feature = "bson-3")]
impl crate::bson::raw::BindRawBsonRef for TopologyVersion {
    type Target = crate::bson::raw::BindValue<Self>;
}
134
/// A description of a single server: its address, type, timing information, and the most
/// recent hello reply or error recorded for it.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct ServerDescription {
    /// The server's address. `ServerDescription::new` lower-cases the host of TCP addresses.
    pub(crate) address: ServerAddress,

    /// The server's type; `ServerType::Unknown` until a hello reply has been recorded.
    pub(crate) server_type: ServerType,

    /// When this description was last built from a hello reply or an error, if ever.
    pub(crate) last_update_time: Option<DateTime>,

    /// The average round trip time supplied with the hello reply, if any.
    pub(crate) average_round_trip_time: Option<Duration>,

    /// The outcome of the most recent check:
    /// - `Ok(None)`: no reply recorded yet (the state produced by `new`),
    /// - `Ok(Some(reply))`: the hello reply stored by `new_from_hello_reply`,
    /// - `Err(error)`: the error stored by `new_from_error`.
    ///
    /// Serialized through `serde_util::serialize_result_error_as_string`.
    #[serde(serialize_with = "serde_util::serialize_result_error_as_string")]
    pub(crate) reply: Result<Option<HelloReply>>,
}
164
165fn hello_command_eq(a: &HelloCommandResponse, b: &HelloCommandResponse) -> bool {
168 a.server_type() == b.server_type()
169 && a.min_wire_version == b.min_wire_version
170 && a.max_wire_version == b.max_wire_version
171 && a.me == b.me
172 && a.hosts == b.hosts
173 && a.passives == b.passives
174 && a.arbiters == b.arbiters
175 && a.tags == b.tags
176 && a.set_name == b.set_name
177 && a.set_version == b.set_version
178 && a.election_id == b.election_id
179 && a.primary == b.primary
180 && a.logical_session_timeout_minutes == b.logical_session_timeout_minutes
181 && a.topology_version == b.topology_version
182}
183
184impl PartialEq for ServerDescription {
185 fn eq(&self, other: &Self) -> bool {
186 if self.address != other.address || self.server_type != other.server_type {
187 return false;
188 }
189
190 match (self.reply.as_ref(), other.reply.as_ref()) {
191 (Ok(self_reply), Ok(other_reply)) => {
192 let self_response = self_reply.as_ref().map(|r| &r.command_response);
193 let other_response = other_reply.as_ref().map(|r| &r.command_response);
194
195 match (self_response, other_response) {
196 (Some(a), Some(b)) => hello_command_eq(a, b),
197 (None, None) => true,
198 _ => false,
199 }
200 }
201 (Err(self_err), Err(other_err)) => {
202 match (self_err.kind.as_ref(), other_err.kind.as_ref()) {
203 (
204 ErrorKind::Command(self_command_err),
205 ErrorKind::Command(other_command_err),
206 ) => self_command_err.code == other_command_err.code,
207 _ => self_err.to_string() == other_err.to_string(),
208 }
209 }
210 _ => false,
211 }
212 }
213}
214
215impl ServerDescription {
216 pub(crate) fn new(address: &ServerAddress) -> Self {
217 Self {
218 address: match address {
219 ServerAddress::Tcp { host, port } => ServerAddress::Tcp {
220 host: host.to_lowercase(),
221 port: *port,
222 },
223 #[cfg(unix)]
224 ServerAddress::Unix { path } => ServerAddress::Unix { path: path.clone() },
225 },
226 server_type: Default::default(),
227 last_update_time: None,
228 reply: Ok(None),
229 average_round_trip_time: None,
230 }
231 }
232
233 pub(crate) fn new_from_hello_reply(
234 address: ServerAddress,
235 mut reply: HelloReply,
236 average_rtt: Duration,
237 ) -> Self {
238 let mut description = Self::new(&address);
239 description.average_round_trip_time = Some(average_rtt);
240 description.last_update_time = Some(DateTime::now());
241
242 description.server_type = reply.command_response.server_type();
244
245 if let Some(ref mut hosts) = reply.command_response.hosts {
247 let normalized_hostnames = hosts
248 .drain(..)
249 .map(|hostname| hostname.to_lowercase())
250 .collect();
251
252 *hosts = normalized_hostnames;
253 }
254
255 if let Some(ref mut passives) = reply.command_response.passives {
256 let normalized_hostnames = passives
257 .drain(..)
258 .map(|hostname| hostname.to_lowercase())
259 .collect();
260
261 *passives = normalized_hostnames;
262 }
263
264 if let Some(ref mut arbiters) = reply.command_response.arbiters {
265 let normalized_hostnames = arbiters
266 .drain(..)
267 .map(|hostname| hostname.to_lowercase())
268 .collect();
269
270 *arbiters = normalized_hostnames;
271 }
272
273 if let Some(ref mut me) = reply.command_response.me {
274 *me = me.to_lowercase();
275 }
276
277 description.reply = Ok(Some(reply));
278
279 description
280 }
281
282 pub(crate) fn new_from_error(address: ServerAddress, error: Error) -> Self {
283 let mut description = Self::new(&address);
284 description.last_update_time = Some(DateTime::now());
285 description.average_round_trip_time = None;
286 description.reply = Err(error);
287 description
288 }
289
290 pub(crate) fn is_available(&self) -> bool {
292 self.server_type.is_available()
293 }
294
295 pub(crate) fn compatibility_error_message(&self) -> Option<String> {
296 if let Ok(Some(ref reply)) = self.reply {
297 let hello_min_wire_version = reply.command_response.min_wire_version.unwrap_or(0);
298
299 if hello_min_wire_version > DRIVER_MAX_WIRE_VERSION {
300 return Some(format!(
301 "Server at {} requires wire version {}, but this version of the MongoDB Rust \
302 driver only supports up to {}",
303 self.address, hello_min_wire_version, DRIVER_MAX_WIRE_VERSION,
304 ));
305 }
306
307 let hello_max_wire_version = reply.command_response.max_wire_version.unwrap_or(0);
308
309 if hello_max_wire_version < DRIVER_MIN_WIRE_VERSION {
310 return Some(format!(
311 "Server at {} reports wire version {}, but this version of the MongoDB Rust \
312 driver requires at least {} (MongoDB {}).",
313 self.address,
314 hello_max_wire_version,
315 DRIVER_MIN_WIRE_VERSION,
316 DRIVER_MIN_DB_VERSION
317 ));
318 }
319 }
320
321 None
322 }
323
324 pub(crate) fn set_name(&self) -> Result<Option<String>> {
325 let set_name = self
326 .reply
327 .as_ref()
328 .map_err(Clone::clone)?
329 .as_ref()
330 .and_then(|reply| reply.command_response.set_name.clone());
331 Ok(set_name)
332 }
333
334 pub(crate) fn known_hosts(&self) -> Result<Vec<ServerAddress>> {
335 let known_hosts = self
336 .reply
337 .as_ref()
338 .map_err(Clone::clone)?
339 .as_ref()
340 .map(|reply| {
341 let hosts = reply.command_response.hosts.as_ref();
342 let passives = reply.command_response.passives.as_ref();
343 let arbiters = reply.command_response.arbiters.as_ref();
344
345 hosts
346 .into_iter()
347 .flatten()
348 .chain(passives.into_iter().flatten())
349 .chain(arbiters.into_iter().flatten())
350 });
351
352 known_hosts
353 .into_iter()
354 .flatten()
355 .map(ServerAddress::parse)
356 .collect()
357 }
358
359 pub(crate) fn invalid_me(&self) -> Result<bool> {
360 if let Some(ref reply) = self.reply.as_ref().map_err(Clone::clone)? {
361 if let Some(ref me) = reply.command_response.me {
362 return Ok(self.address != ServerAddress::parse(me)?);
363 }
364 }
365
366 Ok(false)
367 }
368
369 pub(crate) fn set_version(&self) -> Result<Option<i32>> {
370 let me = self
371 .reply
372 .as_ref()
373 .map_err(Clone::clone)?
374 .as_ref()
375 .and_then(|reply| reply.command_response.set_version);
376 Ok(me)
377 }
378
379 pub(crate) fn election_id(&self) -> Result<Option<ObjectId>> {
380 let me = self
381 .reply
382 .as_ref()
383 .map_err(Clone::clone)?
384 .as_ref()
385 .and_then(|reply| reply.command_response.election_id);
386 Ok(me)
387 }
388
389 #[cfg(test)]
390 pub(crate) fn min_wire_version(&self) -> Result<Option<i32>> {
391 let me = self
392 .reply
393 .as_ref()
394 .map_err(Clone::clone)?
395 .as_ref()
396 .and_then(|reply| reply.command_response.min_wire_version);
397 Ok(me)
398 }
399
400 pub(crate) fn max_wire_version(&self) -> Result<Option<i32>> {
401 let me = self
402 .reply
403 .as_ref()
404 .map_err(Clone::clone)?
405 .as_ref()
406 .and_then(|reply| reply.command_response.max_wire_version);
407 Ok(me)
408 }
409
410 pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>> {
411 match self.reply {
412 Ok(None) => Ok(None),
413 Ok(Some(ref reply)) => Ok(reply
414 .command_response
415 .last_write
416 .as_ref()
417 .map(|write| write.last_write_date)),
418 Err(ref e) => Err(e.clone()),
419 }
420 }
421
422 pub(crate) fn logical_session_timeout(&self) -> Result<Option<Duration>> {
423 match self.reply {
424 Ok(None) => Ok(None),
425 Ok(Some(ref reply)) => Ok(reply
426 .command_response
427 .logical_session_timeout_minutes
428 .map(|timeout| Duration::from_secs(timeout * 60))),
429 Err(ref e) => Err(e.clone()),
430 }
431 }
432
433 pub(crate) fn cluster_time(&self) -> Result<Option<ClusterTime>> {
434 match self.reply {
435 Ok(None) => Ok(None),
436 Ok(Some(ref reply)) => Ok(reply.cluster_time.clone()),
437 Err(ref e) => Err(e.clone()),
438 }
439 }
440
441 pub(crate) fn topology_version(&self) -> Option<TopologyVersion> {
442 match self.reply {
443 Ok(None) => None,
444 Ok(Some(ref reply)) => reply.command_response.topology_version,
445 Err(ref e) => e.topology_version(),
446 }
447 }
448
449 pub(crate) fn matches_tag_set(&self, tag_set: &TagSet) -> bool {
450 if tag_set.is_empty() {
452 return true;
453 }
454
455 let reply = match self.reply.as_ref() {
456 Ok(Some(ref reply)) => reply,
457 _ => return false,
458 };
459
460 let server_tags = match reply.command_response.tags {
461 Some(ref tags) => tags,
462 None => return false,
463 };
464
465 tag_set
466 .iter()
467 .all(|(key, val)| server_tags.get(key) == Some(val))
468 }
469}