Skip to main content

mongodb/sdam/description/
server.rs

1use std::time::Duration;
2
3use crate::bson::{bson, rawdoc, Bson, RawBson};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    bson::{oid::ObjectId, DateTime},
8    client::ClusterTime,
9    error::{Error, ErrorKind, Result},
10    hello::{HelloCommandResponse, HelloReply},
11    options::ServerAddress,
12    selection_criteria::TagSet,
13    serde_util,
14};
15
/// MongoDB server release quoted in the "server too old" message produced by
/// `ServerDescription::compatibility_error_message`, next to `DRIVER_MIN_WIRE_VERSION`.
const DRIVER_MIN_DB_VERSION: &str = "4.2";
/// Lowest wire version the driver accepts: a server whose reported `maxWireVersion` is below this
/// value is described as incompatible.
const DRIVER_MIN_WIRE_VERSION: i32 = 8;
/// Highest wire version the driver accepts: a server whose reported `minWireVersion` is above this
/// value is described as incompatible.
const DRIVER_MAX_WIRE_VERSION: i32 = 25;
19
/// Enum representing the possible types of servers that the driver can connect to.
///
/// The serialized names of the replica set variants use an upper-case `RS` prefix (see the
/// `serde(rename)` attributes); the `Display` implementation prints the same names. The default
/// value is [`ServerType::Unknown`].
#[derive(Debug, Deserialize, Clone, Copy, Eq, PartialEq, Serialize, Default)]
#[non_exhaustive]
pub enum ServerType {
    /// A single, non-replica set mongod.
    Standalone,

    /// A router used in sharded deployments.
    Mongos,

    /// The primary node in a replica set.
    #[serde(rename = "RSPrimary")]
    RsPrimary,

    /// A secondary node in a replica set.
    #[serde(rename = "RSSecondary")]
    RsSecondary,

    /// A non-data bearing node in a replica set which can participate in elections.
    #[serde(rename = "RSArbiter")]
    RsArbiter,

    /// Hidden, starting up, or recovering nodes in a replica set.
    #[serde(rename = "RSOther")]
    RsOther,

    /// A member of an uninitialized replica set or a member that has been removed from the replica
    /// set config.
    #[serde(rename = "RSGhost")]
    RsGhost,

    /// A load-balancing proxy between the driver and the MongoDB deployment.
    LoadBalancer,

    /// A server that the driver hasn't yet communicated with or can't connect to.
    // The alias means the string "PossiblePrimary" also deserializes to this variant; there is no
    // dedicated variant for it.
    #[serde(alias = "PossiblePrimary")]
    #[default]
    Unknown,
}
59
60impl std::fmt::Display for ServerType {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        match self {
63            ServerType::Standalone => write!(f, "Standalone"),
64            ServerType::Mongos => write!(f, "Mongos"),
65            ServerType::RsPrimary => write!(f, "RSPrimary"),
66            ServerType::RsSecondary => write!(f, "RSSecondary"),
67            ServerType::RsArbiter => write!(f, "RSArbiter"),
68            ServerType::RsOther => write!(f, "RSOther"),
69            ServerType::RsGhost => write!(f, "RSGhost"),
70            ServerType::LoadBalancer => write!(f, "LoadBalancer"),
71            ServerType::Unknown => write!(f, "Unknown"),
72        }
73    }
74}
75
76impl ServerType {
77    pub(crate) fn can_auth(self) -> bool {
78        !matches!(self, ServerType::RsArbiter)
79    }
80
81    pub(crate) fn is_data_bearing(self) -> bool {
82        matches!(
83            self,
84            ServerType::Standalone
85                | ServerType::RsPrimary
86                | ServerType::RsSecondary
87                | ServerType::Mongos
88                | ServerType::LoadBalancer
89        )
90    }
91
92    pub(crate) fn is_available(self) -> bool {
93        !matches!(self, ServerType::Unknown)
94    }
95}
96
/// Struct modeling the `topologyVersion` field included in the server's hello and legacy hello
/// responses.
///
/// Fields are (de)serialized with camelCase names, i.e. `processId` and `counter`.
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub(crate) struct TopologyVersion {
    /// Identifier reported alongside the counter; two versions with different process ids are
    /// never ordered by their counters (see `is_more_recent_than`).
    pub(crate) process_id: ObjectId,
    /// Counter used to order versions that share the same `process_id`.
    pub(crate) counter: i64,
}
105
106impl TopologyVersion {
107    pub(crate) fn is_more_recent_than(&self, existing_tv: TopologyVersion) -> bool {
108        self.process_id != existing_tv.process_id || self.counter > existing_tv.counter
109    }
110}
111
112impl From<TopologyVersion> for Bson {
113    fn from(tv: TopologyVersion) -> Self {
114        bson!({
115            "processId": tv.process_id,
116            "counter": tv.counter
117        })
118    }
119}
120
121impl From<TopologyVersion> for RawBson {
122    fn from(tv: TopologyVersion) -> Self {
123        RawBson::Document(rawdoc! {
124            "processId": tv.process_id,
125            "counter": tv.counter
126        })
127    }
128}
129
// Only compiled when the `bson-3` feature is enabled.
// NOTE(review): presumably this lets a `TopologyVersion` be supplied by value where the raw BSON
// macros bind their arguments; confirm against the bson 3.x `BindRawBsonRef` documentation.
#[cfg(feature = "bson-3")]
impl crate::bson::raw::BindRawBsonRef for TopologyVersion {
    type Target = crate::bson::raw::BindValue<Self>;
}
134
/// A description of the most up-to-date information known about a server.
///
/// Equality is implemented by hand (see the `PartialEq` impl) rather than derived, so not every
/// field takes part in comparisons.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct ServerDescription {
    /// The address of this server. The constructors in this file store TCP hostnames in
    /// lowercase.
    pub(crate) address: ServerAddress,

    /// The type of this server.
    pub(crate) server_type: ServerType,

    /// The last time this server was updated. The constructors in this file leave it as `None`
    /// until the description is built from a hello reply or from an error.
    pub(crate) last_update_time: Option<DateTime>,

    /// The average duration of this server's hello calls. The constructors in this file only
    /// populate it when the description is built from a hello reply.
    pub(crate) average_round_trip_time: Option<Duration>,

    // The SDAM spec indicates that a ServerDescription needs to contain an error message if an
    // error occurred when trying to send a hello for the server's heartbeat. Additionally,
    // we need to be able to create a server description that doesn't contain either a hello
    // reply or an error, since there's a gap between when a server is newly added to the topology
    // and when the first heartbeat occurs.
    //
    // In order to represent all these states, we store a Result directly in the ServerDescription,
    // which either contains the aforementioned error message or an Option<HelloReply>. This
    // allows us to ensure that only valid states are possible (e.g. preventing that both an error
    // and a reply are present) while still making it easy to define helper methods on
    // ServerDescription for information we need from the hello reply by propagating with `?`.
    //
    // When serialized, an error is written out as a string (see the `serialize_with` helper).
    #[serde(serialize_with = "serde_util::serialize_result_error_as_string")]
    pub(crate) reply: Result<Option<HelloReply>>,
}
164
165// Server description equality has a specific notion of what fields in a hello command response
166// should be compared (https://specifications.readthedocs.io/en/latest/server-discovery-and-monitoring/server-discovery-and-monitoring/#server-description-equality).
167fn hello_command_eq(a: &HelloCommandResponse, b: &HelloCommandResponse) -> bool {
168    a.server_type() == b.server_type()
169        && a.min_wire_version == b.min_wire_version
170        && a.max_wire_version == b.max_wire_version
171        && a.me == b.me
172        && a.hosts == b.hosts
173        && a.passives == b.passives
174        && a.arbiters == b.arbiters
175        && a.tags == b.tags
176        && a.set_name == b.set_name
177        && a.set_version == b.set_version
178        && a.election_id == b.election_id
179        && a.primary == b.primary
180        && a.logical_session_timeout_minutes == b.logical_session_timeout_minutes
181        && a.topology_version == b.topology_version
182}
183
184impl PartialEq for ServerDescription {
185    fn eq(&self, other: &Self) -> bool {
186        if self.address != other.address || self.server_type != other.server_type {
187            return false;
188        }
189
190        match (self.reply.as_ref(), other.reply.as_ref()) {
191            (Ok(self_reply), Ok(other_reply)) => {
192                let self_response = self_reply.as_ref().map(|r| &r.command_response);
193                let other_response = other_reply.as_ref().map(|r| &r.command_response);
194
195                match (self_response, other_response) {
196                    (Some(a), Some(b)) => hello_command_eq(a, b),
197                    (None, None) => true,
198                    _ => false,
199                }
200            }
201            (Err(self_err), Err(other_err)) => {
202                match (self_err.kind.as_ref(), other_err.kind.as_ref()) {
203                    (
204                        ErrorKind::Command(self_command_err),
205                        ErrorKind::Command(other_command_err),
206                    ) => self_command_err.code == other_command_err.code,
207                    _ => self_err.to_string() == other_err.to_string(),
208                }
209            }
210            _ => false,
211        }
212    }
213}
214
215impl ServerDescription {
216    pub(crate) fn new(address: &ServerAddress) -> Self {
217        Self {
218            address: match address {
219                ServerAddress::Tcp { host, port } => ServerAddress::Tcp {
220                    host: host.to_lowercase(),
221                    port: *port,
222                },
223                #[cfg(unix)]
224                ServerAddress::Unix { path } => ServerAddress::Unix { path: path.clone() },
225            },
226            server_type: Default::default(),
227            last_update_time: None,
228            reply: Ok(None),
229            average_round_trip_time: None,
230        }
231    }
232
233    pub(crate) fn new_from_hello_reply(
234        address: ServerAddress,
235        mut reply: HelloReply,
236        average_rtt: Duration,
237    ) -> Self {
238        let mut description = Self::new(&address);
239        description.average_round_trip_time = Some(average_rtt);
240        description.last_update_time = Some(DateTime::now());
241
242        // Infer the server type from the hello response.
243        description.server_type = reply.command_response.server_type();
244
245        // Normalize all instances of hostnames to lowercase.
246        if let Some(ref mut hosts) = reply.command_response.hosts {
247            let normalized_hostnames = hosts
248                .drain(..)
249                .map(|hostname| hostname.to_lowercase())
250                .collect();
251
252            *hosts = normalized_hostnames;
253        }
254
255        if let Some(ref mut passives) = reply.command_response.passives {
256            let normalized_hostnames = passives
257                .drain(..)
258                .map(|hostname| hostname.to_lowercase())
259                .collect();
260
261            *passives = normalized_hostnames;
262        }
263
264        if let Some(ref mut arbiters) = reply.command_response.arbiters {
265            let normalized_hostnames = arbiters
266                .drain(..)
267                .map(|hostname| hostname.to_lowercase())
268                .collect();
269
270            *arbiters = normalized_hostnames;
271        }
272
273        if let Some(ref mut me) = reply.command_response.me {
274            *me = me.to_lowercase();
275        }
276
277        description.reply = Ok(Some(reply));
278
279        description
280    }
281
282    pub(crate) fn new_from_error(address: ServerAddress, error: Error) -> Self {
283        let mut description = Self::new(&address);
284        description.last_update_time = Some(DateTime::now());
285        description.average_round_trip_time = None;
286        description.reply = Err(error);
287        description
288    }
289
290    /// Whether this server is "available" as per the definition in the server selection spec.
291    pub(crate) fn is_available(&self) -> bool {
292        self.server_type.is_available()
293    }
294
295    pub(crate) fn compatibility_error_message(&self) -> Option<String> {
296        if let Ok(Some(ref reply)) = self.reply {
297            let hello_min_wire_version = reply.command_response.min_wire_version.unwrap_or(0);
298
299            if hello_min_wire_version > DRIVER_MAX_WIRE_VERSION {
300                return Some(format!(
301                    "Server at {} requires wire version {}, but this version of the MongoDB Rust \
302                     driver only supports up to {}",
303                    self.address, hello_min_wire_version, DRIVER_MAX_WIRE_VERSION,
304                ));
305            }
306
307            let hello_max_wire_version = reply.command_response.max_wire_version.unwrap_or(0);
308
309            if hello_max_wire_version < DRIVER_MIN_WIRE_VERSION {
310                return Some(format!(
311                    "Server at {} reports wire version {}, but this version of the MongoDB Rust \
312                     driver requires at least {} (MongoDB {}).",
313                    self.address,
314                    hello_max_wire_version,
315                    DRIVER_MIN_WIRE_VERSION,
316                    DRIVER_MIN_DB_VERSION
317                ));
318            }
319        }
320
321        None
322    }
323
324    pub(crate) fn set_name(&self) -> Result<Option<String>> {
325        let set_name = self
326            .reply
327            .as_ref()
328            .map_err(Clone::clone)?
329            .as_ref()
330            .and_then(|reply| reply.command_response.set_name.clone());
331        Ok(set_name)
332    }
333
334    pub(crate) fn known_hosts(&self) -> Result<Vec<ServerAddress>> {
335        let known_hosts = self
336            .reply
337            .as_ref()
338            .map_err(Clone::clone)?
339            .as_ref()
340            .map(|reply| {
341                let hosts = reply.command_response.hosts.as_ref();
342                let passives = reply.command_response.passives.as_ref();
343                let arbiters = reply.command_response.arbiters.as_ref();
344
345                hosts
346                    .into_iter()
347                    .flatten()
348                    .chain(passives.into_iter().flatten())
349                    .chain(arbiters.into_iter().flatten())
350            });
351
352        known_hosts
353            .into_iter()
354            .flatten()
355            .map(ServerAddress::parse)
356            .collect()
357    }
358
359    pub(crate) fn invalid_me(&self) -> Result<bool> {
360        if let Some(ref reply) = self.reply.as_ref().map_err(Clone::clone)? {
361            if let Some(ref me) = reply.command_response.me {
362                return Ok(self.address != ServerAddress::parse(me)?);
363            }
364        }
365
366        Ok(false)
367    }
368
369    pub(crate) fn set_version(&self) -> Result<Option<i32>> {
370        let me = self
371            .reply
372            .as_ref()
373            .map_err(Clone::clone)?
374            .as_ref()
375            .and_then(|reply| reply.command_response.set_version);
376        Ok(me)
377    }
378
379    pub(crate) fn election_id(&self) -> Result<Option<ObjectId>> {
380        let me = self
381            .reply
382            .as_ref()
383            .map_err(Clone::clone)?
384            .as_ref()
385            .and_then(|reply| reply.command_response.election_id);
386        Ok(me)
387    }
388
389    #[cfg(test)]
390    pub(crate) fn min_wire_version(&self) -> Result<Option<i32>> {
391        let me = self
392            .reply
393            .as_ref()
394            .map_err(Clone::clone)?
395            .as_ref()
396            .and_then(|reply| reply.command_response.min_wire_version);
397        Ok(me)
398    }
399
400    pub(crate) fn max_wire_version(&self) -> Result<Option<i32>> {
401        let me = self
402            .reply
403            .as_ref()
404            .map_err(Clone::clone)?
405            .as_ref()
406            .and_then(|reply| reply.command_response.max_wire_version);
407        Ok(me)
408    }
409
410    pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>> {
411        match self.reply {
412            Ok(None) => Ok(None),
413            Ok(Some(ref reply)) => Ok(reply
414                .command_response
415                .last_write
416                .as_ref()
417                .map(|write| write.last_write_date)),
418            Err(ref e) => Err(e.clone()),
419        }
420    }
421
422    pub(crate) fn logical_session_timeout(&self) -> Result<Option<Duration>> {
423        match self.reply {
424            Ok(None) => Ok(None),
425            Ok(Some(ref reply)) => Ok(reply
426                .command_response
427                .logical_session_timeout_minutes
428                .map(|timeout| Duration::from_secs(timeout * 60))),
429            Err(ref e) => Err(e.clone()),
430        }
431    }
432
433    pub(crate) fn cluster_time(&self) -> Result<Option<ClusterTime>> {
434        match self.reply {
435            Ok(None) => Ok(None),
436            Ok(Some(ref reply)) => Ok(reply.cluster_time.clone()),
437            Err(ref e) => Err(e.clone()),
438        }
439    }
440
441    pub(crate) fn topology_version(&self) -> Option<TopologyVersion> {
442        match self.reply {
443            Ok(None) => None,
444            Ok(Some(ref reply)) => reply.command_response.topology_version,
445            Err(ref e) => e.topology_version(),
446        }
447    }
448
449    pub(crate) fn matches_tag_set(&self, tag_set: &TagSet) -> bool {
450        // An empty tag set matches any server.
451        if tag_set.is_empty() {
452            return true;
453        }
454
455        let reply = match self.reply.as_ref() {
456            Ok(Some(ref reply)) => reply,
457            _ => return false,
458        };
459
460        let server_tags = match reply.command_response.tags {
461            Some(ref tags) => tags,
462            None => return false,
463        };
464
465        tag_set
466            .iter()
467            .all(|(key, val)| server_tags.get(key) == Some(val))
468    }
469}