Skip to main content

mongodb/client/
options.rs

1#[cfg(test)]
2mod test;
3
4mod bulk_write;
5mod parse;
6mod resolver_config;
7
8use std::{
9    cmp::Ordering,
10    collections::{HashMap, HashSet},
11    convert::TryFrom,
12    fmt::{self, Display, Formatter, Write},
13    hash::{Hash, Hasher},
14    net::{Ipv4Addr, Ipv6Addr, SocketAddr},
15    path::PathBuf,
16    str::FromStr,
17    time::Duration,
18};
19
20use derive_where::derive_where;
21use macro_magic::export_tokens;
22use serde::{de::Unexpected, Deserialize, Deserializer, Serialize, Serializer};
23use serde_with::skip_serializing_none;
24use std::sync::LazyLock;
25use strsim::jaro_winkler;
26use typed_builder::TypedBuilder;
27
28#[cfg(any(
29    feature = "zstd-compression",
30    feature = "zlib-compression",
31    feature = "snappy-compression"
32))]
33use crate::options::Compressor;
34#[cfg(test)]
35use crate::srv::LookupHosts;
36use crate::{
37    bson::{doc, Bson, Document, Timestamp, UuidRepresentation},
38    client::auth::{AuthMechanism, Credential},
39    concern::{Acknowledgment, ReadConcern, WriteConcern},
40    error::{Error, ErrorKind, Result},
41    event::EventHandler,
42    options::ReadConcernLevel,
43    sdam::{verify_max_staleness, DEFAULT_HEARTBEAT_FREQUENCY, MIN_HEARTBEAT_FREQUENCY},
44    selection_criteria::{ReadPreference, SelectionCriteria, TagSet},
45    serde_util::{self, write_concern_is_empty},
46    srv::{OriginalSrvInfo, SrvResolver},
47};
48
49pub use bulk_write::*;
50#[cfg(feature = "dns-resolver")]
51pub use resolver_config::ResolverConfig;
52#[cfg(not(feature = "dns-resolver"))]
53pub(crate) use resolver_config::ResolverConfig;
54
/// Port assumed for a TCP [`ServerAddress`] that does not name one explicitly.
pub(crate) const DEFAULT_PORT: u16 = 27017;

// Lowercase URI option names that are given their own constants so they can be referenced by
// name; all but `TLS_ALLOW_INVALID_HOSTNAMES` also appear in `URI_OPTIONS` below.
const TLS_INSECURE: &str = "tlsinsecure";
const TLS_ALLOW_INVALID_CERTIFICATES: &str = "tlsallowinvalidcertificates";
#[cfg(feature = "openssl-tls")]
const TLS_ALLOW_INVALID_HOSTNAMES: &str = "tlsallowinvalidhostnames";
const PROXY_HOST: &str = "proxyhost";
const PROXY_PORT: &str = "proxyport";
const PROXY_USERNAME: &str = "proxyusername";
const PROXY_PASSWORD: &str = "proxypassword";

/// Names of the URI options listed by this module.
///
/// NOTE(review): every entry is lowercase except `uuidRepresentation`. The code that consumes
/// this list is not visible here — confirm whether lookups lowercase the user-supplied key
/// first, in which case that entry's casing is probably unintentional.
const URI_OPTIONS: &[&str] = &[
    "appname",
    "authmechanism",
    "authsource",
    "authmechanismproperties",
    "compressors",
    "connecttimeoutms",
    "directconnection",
    "heartbeatfrequencyms",
    "journal",
    "localthresholdms",
    "maxidletimems",
    "maxstalenessseconds",
    "maxpoolsize",
    "minpoolsize",
    "maxconnecting",
    PROXY_HOST,
    PROXY_PORT,
    PROXY_USERNAME,
    PROXY_PASSWORD,
    "readconcernlevel",
    "readpreference",
    "readpreferencetags",
    "replicaset",
    "retrywrites",
    "retryreads",
    "servermonitoringmode",
    "serverselectiontimeoutms",
    "sockettimeoutms",
    "tls",
    "ssl",
    TLS_INSECURE,
    TLS_ALLOW_INVALID_CERTIFICATES,
    "tlscafile",
    "tlscertificatekeyfile",
    "uuidRepresentation",
    "w",
    "waitqueuetimeoutms",
    "wtimeoutms",
    "zlibcompressionlevel",
    "srvservicename",
];
107
/// The reserved characters listed in [Section 2.2 of RFC-3986](https://tools.ietf.org/html/rfc3986#section-2.2).
/// A username or password containing any of them has to appear in its URL-encoded form inside
/// a connection string.
static USERINFO_RESERVED_CHARACTERS: LazyLock<HashSet<&'static char>> = LazyLock::new(|| {
    const RESERVED: &[char] = &[':', '/', '?', '#', '[', ']', '@'];
    RESERVED.iter().collect()
});
113
/// Set of characters treated as illegal in a database name: `/`, `\`, space, `"` and `$`.
static ILLEGAL_DATABASE_CHARACTERS: LazyLock<HashSet<&'static char>> = LazyLock::new(|| {
    const ILLEGAL: &[char] = &['/', '\\', ' ', '"', '$'];
    ILLEGAL.iter().collect()
});
116
/// An enum representing the address of a MongoDB server.
///
/// Equality and hashing are implemented by hand in this file: for the `Tcp` variant an unset
/// port compares and hashes the same as port 27017.
#[derive(Clone, Debug, Eq, Serialize)]
#[non_exhaustive]
pub enum ServerAddress {
    /// A TCP/IP host and port combination.
    Tcp {
        /// The hostname or IP address where the MongoDB server can be found.
        host: String,

        /// The TCP port that the MongoDB server is listening on.
        ///
        /// The default is 27017.
        port: Option<u16>,
    },
    /// A Unix Domain Socket path.
    ///
    /// Only available when compiling for a Unix target.
    #[cfg(unix)]
    Unix {
        /// The path to the Unix Domain Socket.
        path: PathBuf,
    },
}
138
139impl From<SocketAddr> for ServerAddress {
140    fn from(item: SocketAddr) -> Self {
141        ServerAddress::Tcp {
142            host: item.ip().to_string(),
143            port: Some(item.port()),
144        }
145    }
146}
147
148impl<'de> Deserialize<'de> for ServerAddress {
149    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
150    where
151        D: Deserializer<'de>,
152    {
153        #[derive(Deserialize)]
154        #[serde(untagged)]
155        enum ServerAddressHelper {
156            String(String),
157            Object { host: String, port: Option<u16> },
158        }
159
160        let helper = ServerAddressHelper::deserialize(deserializer)?;
161        match helper {
162            ServerAddressHelper::String(string) => {
163                Self::parse(string).map_err(serde::de::Error::custom)
164            }
165            ServerAddressHelper::Object { host, port } => {
166                #[cfg(unix)]
167                if host.ends_with("sock") {
168                    return Ok(Self::Unix {
169                        path: PathBuf::from(host),
170                    });
171                }
172
173                Ok(Self::Tcp { host, port })
174            }
175        }
176    }
177}
178
179impl Default for ServerAddress {
180    fn default() -> Self {
181        Self::Tcp {
182            host: "localhost".into(),
183            port: None,
184        }
185    }
186}
187
188impl PartialEq for ServerAddress {
189    fn eq(&self, other: &Self) -> bool {
190        match (self, other) {
191            (
192                Self::Tcp { host, port },
193                Self::Tcp {
194                    host: other_host,
195                    port: other_port,
196                },
197            ) => host == other_host && port.unwrap_or(27017) == other_port.unwrap_or(27017),
198            #[cfg(unix)]
199            (Self::Unix { path }, Self::Unix { path: other_path }) => path == other_path,
200            #[cfg(unix)]
201            _ => false,
202        }
203    }
204}
205
206impl Hash for ServerAddress {
207    fn hash<H>(&self, state: &mut H)
208    where
209        H: Hasher,
210    {
211        match self {
212            Self::Tcp { host, port } => {
213                host.hash(state);
214                port.unwrap_or(27017).hash(state);
215            }
216            #[cfg(unix)]
217            Self::Unix { path } => path.hash(state),
218        }
219    }
220}
221
222impl FromStr for ServerAddress {
223    type Err = Error;
224    fn from_str(address: &str) -> Result<Self> {
225        ServerAddress::parse(address)
226    }
227}
228
229impl ServerAddress {
230    /// Parses an address string into a [`ServerAddress`].
231    pub fn parse(address: impl AsRef<str>) -> Result<Self> {
232        let address = address.as_ref();
233
234        if address.ends_with(".sock") {
235            #[cfg(unix)]
236            {
237                let address = percent_decode(address, "unix domain sockets must be URL-encoded")?;
238                return Ok(Self::Unix {
239                    path: PathBuf::from(address),
240                });
241            }
242            #[cfg(not(unix))]
243            return Err(ErrorKind::InvalidArgument {
244                message: "unix domain sockets are not supported on this platform".to_string(),
245            }
246            .into());
247        }
248
249        let (hostname, port) = if let Some(ip_literal) = address.strip_prefix("[") {
250            let Some((hostname, port)) = ip_literal.split_once("]") else {
251                return Err(ErrorKind::InvalidArgument {
252                    message: format!(
253                        "invalid server address {address}: missing closing ']' in IP literal \
254                         hostname"
255                    ),
256                }
257                .into());
258            };
259
260            if let Err(parse_error) = Ipv6Addr::from_str(hostname) {
261                return Err(ErrorKind::InvalidArgument {
262                    message: format!("invalid server address {address}: {parse_error}"),
263                }
264                .into());
265            }
266
267            let port = if port.is_empty() {
268                None
269            } else if let Some(port) = port.strip_prefix(":") {
270                Some(port)
271            } else {
272                return Err(ErrorKind::InvalidArgument {
273                    message: format!(
274                        "invalid server address {address}: the hostname can only be followed by a \
275                         port prefixed with ':', got {port}"
276                    ),
277                }
278                .into());
279            };
280
281            (hostname, port)
282        } else {
283            match address.split_once(":") {
284                Some((hostname, port)) => (hostname, Some(port)),
285                None => (address, None),
286            }
287        };
288
289        if hostname.is_empty() {
290            return Err(ErrorKind::InvalidArgument {
291                message: format!("invalid server address {address}: the hostname cannot be empty"),
292            }
293            .into());
294        }
295
296        let normalized_hostname = if let Ok(v4) = hostname.parse::<Ipv4Addr>() {
297            v4.to_string()
298        } else if let Ok(v6) = hostname.parse::<Ipv6Addr>() {
299            v6.to_string()
300        } else {
301            hostname.to_lowercase()
302        };
303
304        let port = if let Some(port) = port {
305            match u16::from_str(port) {
306                Ok(0) | Err(_) => {
307                    return Err(ErrorKind::InvalidArgument {
308                        message: format!(
309                            "invalid server address {address}: the port must be an integer \
310                             between 1 and 65535, got {port}"
311                        ),
312                    }
313                    .into())
314                }
315                Ok(port) => Some(port),
316            }
317        } else {
318            None
319        };
320
321        Ok(Self::Tcp {
322            host: normalized_hostname,
323            port,
324        })
325    }
326
327    #[cfg(feature = "dns-resolver")]
328    pub(crate) fn host(&self) -> std::borrow::Cow<'_, str> {
329        match self {
330            Self::Tcp { host, .. } => std::borrow::Cow::Borrowed(host.as_str()),
331            #[cfg(unix)]
332            Self::Unix { path } => path.to_string_lossy(),
333        }
334    }
335}
336
337impl fmt::Display for ServerAddress {
338    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
339        match self {
340            Self::Tcp { host, port } => {
341                write!(fmt, "{}:{}", host, port.unwrap_or(DEFAULT_PORT))
342            }
343            #[cfg(unix)]
344            Self::Unix { path } => write!(fmt, "{}", path.display()),
345        }
346    }
347}
348
/// Specifies the server API version to declare.
///
/// The textual form of a version, used by serialization, [`FromStr`] and [`Display`] in this
/// file, is its number: `V1` is written as `"1"`.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[non_exhaustive]
pub enum ServerApiVersion {
    /// Use API version 1.
    #[serde(rename = "1")]
    V1,
}
357
358impl FromStr for ServerApiVersion {
359    type Err = Error;
360
361    fn from_str(str: &str) -> Result<Self> {
362        match str {
363            "1" => Ok(Self::V1),
364            _ => Err(ErrorKind::InvalidArgument {
365                message: format!("invalid server api version string: {str}"),
366            }
367            .into()),
368        }
369    }
370}
371
372impl Display for ServerApiVersion {
373    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
374        match self {
375            Self::V1 => write!(f, "1"),
376        }
377    }
378}
379
380impl<'de> Deserialize<'de> for ServerApiVersion {
381    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
382    where
383        D: Deserializer<'de>,
384    {
385        let s = String::deserialize(deserializer)?;
386
387        ServerApiVersion::from_str(&s).map_err(|_| {
388            serde::de::Error::invalid_value(Unexpected::Str(&s), &"a valid version number")
389        })
390    }
391}
392
/// Options used to declare a stable server API.  For more information, see the [Stable API](
/// https://www.mongodb.com/docs/v5.0/reference/stable-api/) manual page.
///
/// `version` is the only field the builder requires; the optional fields are left out of the
/// serialized form when unset.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct ServerApi {
    /// The declared API version.
    ///
    /// Serialized under the key `apiVersion`.
    #[serde(rename = "apiVersion")]
    #[builder(!default)]
    pub version: ServerApiVersion,

    /// Whether the MongoDB server should reject all commands that are not part of the
    /// declared API version. This includes command options and aggregation pipeline stages.
    ///
    /// Serialized under the key `apiStrict`.
    #[serde(rename = "apiStrict")]
    pub strict: Option<bool>,

    /// Whether the MongoDB server should return command failures when functionality that is
    /// deprecated from the declared API version is used.
    /// Note that at the time of this writing, no deprecations in version 1 exist.
    ///
    /// Serialized under the key `apiDeprecationErrors`.
    #[serde(rename = "apiDeprecationErrors")]
    pub deprecation_errors: Option<bool>,
}
416
/// Configuration for connecting to a SOCKS5 proxy.
///
/// Only `host` is required by the builder.
#[cfg(feature = "socks5-proxy")]
#[derive(Clone, Debug, Deserialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct Socks5Proxy {
    /// The hostname or IP address on which the proxy is listening.
    #[builder(!default)]
    pub host: String,

    /// The port on which the proxy is listening. Defaults to 1080 if unset.
    pub port: Option<u16>,

    /// A username/password pair to authenticate to the proxy.
    ///
    /// The first element is the username and the second is the password.
    pub authentication: Option<(String, String)>,
}
433
/// Dummy struct for internal use.
///
/// Stands in for the real `Socks5Proxy` type when the `socks5-proxy` feature is disabled, so
/// the name exists in both configurations.
#[cfg(not(feature = "socks5-proxy"))]
#[derive(Clone, Debug)]
pub(crate) struct Socks5Proxy;
438
#[cfg(feature = "socks5-proxy")]
impl Socks5Proxy {
    /// Serializes an optional proxy configuration as the four flat keys `proxyHost`,
    /// `proxyPort`, `proxyUsername` and `proxyPassword`, or as "none" when no proxy is set.
    fn serialize<S>(
        proxy: &Option<Socks5Proxy>,
        serializer: S,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        /// Borrowed, flattened view of a proxy configuration.
        #[derive(Serialize)]
        #[serde(rename_all = "camelCase")]
        struct Helper<'a> {
            proxy_host: &'a String,
            proxy_port: Option<u16>,
            proxy_username: Option<&'a String>,
            proxy_password: Option<&'a String>,
        }

        let Some(proxy) = proxy else {
            return serializer.serialize_none();
        };

        // Split the (username, password) pair into its two separately named keys.
        let (proxy_username, proxy_password) = match &proxy.authentication {
            Some((username, password)) => (Some(username), Some(password)),
            None => (None, None),
        };

        Helper {
            proxy_host: &proxy.host,
            proxy_port: proxy.port,
            proxy_username,
            proxy_password,
        }
        .serialize(serializer)
    }
}
470
/// Contains the options that can be used to create a new [`Client`](../struct.Client.html).
///
/// Every field has a builder default; fields marked `#[serde(skip)]` are never read during
/// deserialization.
#[derive(Clone, Deserialize, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[derive_where(Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ClientOptions {
    /// The initial list of seeds that the Client should connect to.
    ///
    /// Note that by default, the driver will autodiscover other nodes in the cluster. To connect
    /// directly to a single server (rather than autodiscovering the rest of the cluster), set the
    /// `direct_connection` field to `true`.
    // The builder default spells out port 27017 while the serde default (`default_hosts`) leaves
    // the port unset; `ServerAddress` equality treats an unset port as 27017, so the two
    // defaults compare equal.
    #[builder(default_code = "vec![ServerAddress::Tcp {
        host: \"localhost\".to_string(),
        port: Some(27017),
    }]")]
    #[serde(default = "default_hosts")]
    pub hosts: Vec<ServerAddress>,

    /// The application name that the Client will send to the server as part of the handshake. This
    /// can be used in combination with the server logs to determine which Client is connected to a
    /// server.
    pub app_name: Option<String>,

    /// The allowed compressors to use to compress messages sent to and decompress messages
    /// received from the server. This list should be specified in priority order, as the
    /// compressor used for messages will be the first compressor in this list that is also
    /// supported by the server selected for operations.
    #[cfg(any(
        feature = "zstd-compression",
        feature = "zlib-compression",
        feature = "snappy-compression"
    ))]
    #[serde(skip)]
    pub compressors: Option<Vec<Compressor>>,

    /// The handler that should process all Connection Monitoring and Pooling events.
    #[derive_where(skip)]
    #[builder(setter(strip_option))]
    #[serde(skip)]
    pub cmap_event_handler: Option<EventHandler<crate::event::cmap::CmapEvent>>,

    /// The handler that should process all command-related events.
    ///
    /// Note that monitoring command events may incur a performance penalty.
    #[derive_where(skip)]
    #[builder(setter(strip_option))]
    #[serde(skip)]
    pub command_event_handler: Option<EventHandler<crate::event::command::CommandEvent>>,

    /// The connect timeout passed to each underlying TcpStream when attempting to connect to the
    /// server.
    ///
    /// The default value is 10 seconds.
    pub connect_timeout: Option<Duration>,

    /// The credential to use for authenticating connections made by this client.
    pub credential: Option<Credential>,

    /// Specifies whether the Client should directly connect to a single host rather than
    /// autodiscover all servers in the cluster.
    ///
    /// The default value is false.
    pub direct_connection: Option<bool>,

    /// Extra information to append to the driver version in the metadata of the handshake with the
    /// server. This should be used by libraries wrapping the driver, e.g. ODMs.
    pub driver_info: Option<DriverInfo>,

    /// The amount of time each monitoring thread should wait between performing server checks.
    ///
    /// The default value is 10 seconds.
    pub heartbeat_freq: Option<Duration>,

    /// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
    ///
    /// This field has no builder setter.
    #[builder(setter(skip))]
    #[serde(rename = "loadbalanced")]
    pub load_balanced: Option<bool>,

    /// When running a read operation with a ReadPreference that allows selecting secondaries,
    /// `local_threshold` is used to determine how much longer the average round trip time between
    /// the driver and server is allowed compared to the least round trip time of all the suitable
    /// servers. For example, if the average round trip times of the suitable servers are 5 ms, 10
    /// ms, and 15 ms, and the local threshold is 8 ms, then the first two servers are within the
    /// latency window and could be chosen for the operation, but the last one is not.
    ///
    /// A value of zero indicates that there is no latency window, so only the server with the
    /// lowest average round trip time is eligible.
    ///
    /// The default value is 15 ms.
    pub local_threshold: Option<Duration>,

    /// The amount of time that a connection can remain idle in a connection pool before being
    /// closed. A value of zero indicates that connections should not be closed due to being idle.
    ///
    /// By default, connections will not be closed due to being idle.
    pub max_idle_time: Option<Duration>,

    /// The maximum amount of connections that the Client should allow to be created in a
    /// connection pool for a given server. If an operation is attempted on a server while
    /// `max_pool_size` connections are checked out, the operation will block until an in-progress
    /// operation finishes and its connection is checked back into the pool.
    ///
    /// The default value is 10.
    pub max_pool_size: Option<u32>,

    /// The minimum number of connections that should be available in a server's connection pool at
    /// a given time. If fewer than `min_pool_size` connections are in the pool, connections will
    /// be added to the pool in the background until `min_pool_size` is reached.
    ///
    /// The default value is 0.
    pub min_pool_size: Option<u32>,

    /// The maximum number of new connections that can be created concurrently.
    ///
    /// If specified, this value must be greater than 0. The default is 2.
    pub max_connecting: Option<u32>,

    /// Specifies the default read concern for operations performed on the Client. See the
    /// ReadConcern type documentation for more details.
    pub read_concern: Option<ReadConcern>,

    /// The name of the replica set that the Client should connect to.
    pub repl_set_name: Option<String>,

    /// Whether or not the client should retry a read operation if the operation fails.
    ///
    /// The default value is true.
    pub retry_reads: Option<bool>,

    /// Whether or not the client should retry a write operation if the operation fails.
    ///
    /// The default value is true.
    pub retry_writes: Option<bool>,

    /// Configures which server monitoring protocol to use.
    ///
    /// The default is [`Auto`](ServerMonitoringMode::Auto).
    pub server_monitoring_mode: Option<ServerMonitoringMode>,

    /// The handler that should process all Server Discovery and Monitoring events.
    #[derive_where(skip)]
    #[builder(setter(strip_option))]
    #[serde(skip)]
    pub sdam_event_handler: Option<EventHandler<crate::event::sdam::SdamEvent>>,

    /// The default selection criteria for operations performed on the Client. See the
    /// SelectionCriteria type documentation for more details.
    pub selection_criteria: Option<SelectionCriteria>,

    /// The declared API version for this client.
    /// The declared API version is applied to all commands run through the client, including those
    /// sent through any handle derived from the client.
    ///
    /// Specifying stable API options in the command document passed to `run_command` AND
    /// declaring an API version on the client is not supported and is considered undefined
    /// behaviour. To run any command with a different API version or without declaring one, create
    /// a separate client that declares the appropriate API version.
    ///
    /// For more information, see the [Stable API](
    /// https://www.mongodb.com/docs/v5.0/reference/stable-api/) manual page.
    pub server_api: Option<ServerApi>,

    /// The amount of time the Client should attempt to select a server for an operation before
    /// timing out.
    ///
    /// The default value is 30 seconds.
    pub server_selection_timeout: Option<Duration>,

    /// Default database for this client.
    ///
    /// By default, no default database is specified.
    pub default_database: Option<String>,

    /// Overrides the default "mongodb" service name for SRV lookup in both discovery and polling.
    pub srv_service_name: Option<String>,

    /// The TLS configuration for the Client to use in its connections with the server.
    ///
    /// By default, TLS is disabled.
    pub tls: Option<Tls>,

    /// The maximum number of bytes that the driver should include in a tracing event
    /// or log message's extended JSON string representation of a BSON document, e.g. a
    /// command or reply from the server.
    /// If truncation of a document at the exact specified length would occur in the middle
    /// of a Unicode codepoint, the document will be truncated at the closest larger length
    /// which falls on a boundary between codepoints.
    /// Note that in cases where truncation occurs the output will not be valid JSON.
    ///
    /// The default value is 1000.
    #[cfg(feature = "tracing-unstable")]
    pub tracing_max_document_length_bytes: Option<usize>,

    /// Specifies the default write concern for operations performed on the Client. See the
    /// WriteConcern type documentation for more details.
    pub write_concern: Option<WriteConcern>,

    /// Limit on the number of mongos connections that may be created for sharded topologies.
    pub srv_max_hosts: Option<u32>,

    /// Configuration for opentelemetry.
    #[cfg(feature = "opentelemetry")]
    pub tracing: Option<crate::otel::OpentelemetryOptions>,

    /// Configuration for connecting to a SOCKS5 proxy.
    #[cfg(feature = "socks5-proxy")]
    pub socks5_proxy: Option<Socks5Proxy>,

    /// Information from the SRV URI that generated these client options, if applicable.
    #[builder(setter(skip))]
    #[serde(skip)]
    #[derive_where(skip(Debug))]
    pub(crate) original_srv_info: Option<OriginalSrvInfo>,

    /// Test-only field; presumably the URI string these options were parsed from — TODO
    /// confirm against the code that populates it, which is outside this view.
    #[cfg(test)]
    #[builder(setter(skip))]
    #[derive_where(skip(Debug))]
    pub(crate) original_uri: Option<String>,

    /// Configuration of the DNS resolver used for SRV and TXT lookups.
    /// By default, the host system's resolver configuration will be used.
    ///
    /// On Windows, there is a known performance issue in [hickory_resolver] with using the default
    /// system configuration, so a custom configuration is recommended.
    #[builder(setter(skip))]
    #[serde(skip)]
    #[derive_where(skip(Debug))]
    #[cfg(feature = "dns-resolver")]
    pub(crate) resolver_config: Option<ResolverConfig>,

    /// Control test behavior of the client.
    #[cfg(test)]
    #[builder(setter(skip))]
    #[serde(skip)]
    #[derive_where(skip)]
    pub(crate) test_options: Option<TestOptions>,
}
709
/// Test-only knobs, stored in `ClientOptions::test_options`, that alter client behavior.
///
/// All fields default to "off" (`None` / `false`).
#[cfg(test)]
#[derive(Debug, Clone, Default)]
pub(crate) struct TestOptions {
    /// Override MIN_HEARTBEAT_FREQUENCY.
    pub(crate) min_heartbeat_freq: Option<Duration>,

    /// Disable server and SRV-polling monitor threads.
    pub(crate) disable_monitoring_threads: bool,

    /// Mock response for `SrvPollingMonitor::lookup_hosts`.
    pub(crate) mock_lookup_hosts: Option<Result<LookupHosts>>,

    /// Async-capable command event listener.
    pub(crate) async_event_listener: Option<TestEventSender>,

    /// Callback to receive hello commands.
    pub(crate) hello_cb: Option<EventHandler<crate::cmap::Command>>,
}
728
/// Sending half of a tokio mpsc channel that carries acknowledged command events; this is the
/// type of `TestOptions::async_event_listener`.
pub(crate) type TestEventSender = tokio::sync::mpsc::Sender<
    crate::runtime::AcknowledgedMessage<crate::event::command::CommandEvent>,
>;
732
733fn default_hosts() -> Vec<ServerAddress> {
734    vec![ServerAddress::default()]
735}
736
737impl Default for ClientOptions {
738    fn default() -> Self {
739        Self::builder().build()
740    }
741}
742
#[cfg(test)]
impl Serialize for ClientOptions {
    /// Test-only serialization of the options as a flat set of lowercase, URI-style keys.
    ///
    /// Fails with a custom serialization error if `srv_max_hosts` does not fit in an `i32`.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        /// Borrowed view of the options whose field names are the keys to emit.
        #[derive(Serialize)]
        struct ClientOptionsHelper<'a> {
            appname: &'a Option<String>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            connecttimeoutms: &'a Option<Duration>,

            #[serde(flatten, serialize_with = "Credential::serialize")]
            credential: &'a Option<Credential>,

            directconnection: &'a Option<bool>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            heartbeatfrequencyms: &'a Option<Duration>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            localthresholdms: &'a Option<Duration>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            maxidletimems: &'a Option<Duration>,

            maxpoolsize: &'a Option<u32>,

            minpoolsize: &'a Option<u32>,

            maxconnecting: &'a Option<u32>,

            #[serde(flatten, serialize_with = "ReadConcern::serialize")]
            readconcern: &'a Option<ReadConcern>,

            replicaset: &'a Option<String>,

            retryreads: &'a Option<bool>,

            retrywrites: &'a Option<bool>,

            servermonitoringmode: Option<String>,

            #[serde(
                flatten,
                serialize_with = "SelectionCriteria::serialize_for_client_options"
            )]
            selectioncriteria: &'a Option<SelectionCriteria>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            serverselectiontimeoutms: &'a Option<Duration>,

            #[serde(flatten, serialize_with = "Tls::serialize")]
            tls: &'a Option<Tls>,

            #[serde(flatten, serialize_with = "WriteConcern::serialize")]
            writeconcern: &'a Option<WriteConcern>,

            zlibcompressionlevel: &'a Option<i32>,

            loadbalanced: &'a Option<bool>,

            srvmaxhosts: Option<i32>,

            srvservicename: &'a Option<String>,

            #[cfg(feature = "socks5-proxy")]
            #[serde(flatten, serialize_with = "Socks5Proxy::serialize")]
            socks5proxy: &'a Option<Socks5Proxy>,
        }

        // The monitoring mode is emitted as the lowercased `Debug` name of the variant.
        let servermonitoringmode = self
            .server_monitoring_mode
            .as_ref()
            .map(|mode| format!("{mode:?}").to_lowercase());

        // The helper stores the host limit as `i32`; reject values that do not fit.
        let srvmaxhosts = match self.srv_max_hosts {
            Some(max_hosts) => {
                Some(i32::try_from(max_hosts).map_err(serde::ser::Error::custom)?)
            }
            None => None,
        };

        ClientOptionsHelper {
            appname: &self.app_name,
            connecttimeoutms: &self.connect_timeout,
            credential: &self.credential,
            directconnection: &self.direct_connection,
            heartbeatfrequencyms: &self.heartbeat_freq,
            localthresholdms: &self.local_threshold,
            maxidletimems: &self.max_idle_time,
            maxpoolsize: &self.max_pool_size,
            minpoolsize: &self.min_pool_size,
            maxconnecting: &self.max_connecting,
            readconcern: &self.read_concern,
            replicaset: &self.repl_set_name,
            retryreads: &self.retry_reads,
            retrywrites: &self.retry_writes,
            servermonitoringmode,
            selectioncriteria: &self.selection_criteria,
            serverselectiontimeoutms: &self.server_selection_timeout,
            tls: &self.tls,
            writeconcern: &self.write_concern,
            // The compression level is always emitted as unset here.
            zlibcompressionlevel: &None,
            loadbalanced: &self.load_balanced,
            srvmaxhosts,
            srvservicename: &self.srv_service_name,
            #[cfg(feature = "socks5-proxy")]
            socks5proxy: &self.socks5_proxy,
        }
        .serialize(serializer)
    }
}
853
854// For ConnectionString serialization
855fn serialize_uuid_rep_option<S>(
856    value: &Option<UuidRepresentation>,
857    serializer: S,
858) -> std::result::Result<S::Ok, S::Error>
859where
860    S: Serializer,
861{
862    #[non_exhaustive]
863    #[derive(Serialize)]
864    #[serde(remote = "UuidRepresentation")]
865    enum UuidRepresentationForSerialize {
866        Standard,
867        CSharpLegacy,
868        JavaLegacy,
869        PythonLegacy,
870    }
871    match value {
872        Some(rep) => UuidRepresentationForSerialize::serialize(rep, serializer),
873        None => serializer.serialize_none(),
874    }
875}
876
/// Contains the options that can be set via a MongoDB connection string.
///
/// The format of a MongoDB connection string is described [here](https://www.mongodb.com/docs/manual/reference/connection-string/#connection-string-formats).
///
/// When serialized, fields that are `None` are skipped and field names are rendered in
/// camelCase.
#[skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionString {
    /// The initial list of seeds that the Client should connect to, or a DNS name used for SRV
    /// lookup of the initial seed list.
    ///
    /// Note that by default, the driver will autodiscover other nodes in the cluster. To connect
    /// directly to a single server (rather than autodiscovering the rest of the cluster), set the
    /// `direct_connection` field to `true`.
    pub host_info: HostInfo,

    /// The application name that the Client will send to the server as part of the handshake. This
    /// can be used in combination with the server logs to determine which Client is connected to a
    /// server.
    pub app_name: Option<String>,

    /// The TLS configuration for the Client to use in its connections with the server.
    ///
    /// By default, TLS is disabled.
    #[serde(serialize_with = "Tls::serialize")]
    pub tls: Option<Tls>,

    /// The amount of time each monitoring thread should wait between performing server checks.
    ///
    /// The default value is 10 seconds.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub heartbeat_frequency: Option<Duration>,

    /// When running a read operation with a ReadPreference that allows selecting secondaries,
    /// `local_threshold` is used to determine how much longer the average round trip time between
    /// the driver and server is allowed compared to the least round trip time of all the suitable
    /// servers. For example, if the average round trip times of the suitable servers are 5 ms, 10
    /// ms, and 15 ms, and the local threshold is 8 ms, then the first two servers are within the
    /// latency window and could be chosen for the operation, but the last one is not.
    ///
    /// A value of zero indicates that there is no latency window, so only the server with the
    /// lowest average round trip time is eligible.
    ///
    /// The default value is 15 ms.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub local_threshold: Option<Duration>,

    /// Specifies the default read concern for operations performed on the Client. See the
    /// ReadConcern type documentation for more details.
    #[serde(serialize_with = "ReadConcern::serialize")]
    pub read_concern: Option<ReadConcern>,

    /// The name of the replica set that the Client should connect to.
    pub replica_set: Option<String>,

    /// Specifies the default write concern for operations performed on the Client. See the
    /// WriteConcern type documentation for more details.
    #[serde(serialize_with = "WriteConcern::serialize")]
    pub write_concern: Option<WriteConcern>,

    /// The amount of time the Client should attempt to select a server for an operation before
    /// timing out.
    ///
    /// The default value is 30 seconds.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub server_selection_timeout: Option<Duration>,

    /// The maximum amount of connections that the Client should allow to be created in a
    /// connection pool for a given server. If an operation is attempted on a server while
    /// `max_pool_size` connections are checked out, the operation will block until an in-progress
    /// operation finishes and its connection is checked back into the pool.
    ///
    /// The default value is 10.
    pub max_pool_size: Option<u32>,

    /// The minimum number of connections that should be available in a server's connection pool at
    /// a given time. If fewer than `min_pool_size` connections are in the pool, connections will
    /// be added to the pool in the background until `min_pool_size` is reached.
    ///
    /// The default value is 0.
    pub min_pool_size: Option<u32>,

    /// The maximum number of new connections that can be created concurrently.
    ///
    /// If specified, this value must be greater than 0. The default is 2.
    pub max_connecting: Option<u32>,

    /// The amount of time that a connection can remain idle in a connection pool before being
    /// closed. A value of zero indicates that connections should not be closed due to being idle.
    ///
    /// By default, connections will not be closed due to being idle.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub max_idle_time: Option<Duration>,

    #[cfg(any(
        feature = "zstd-compression",
        feature = "zlib-compression",
        feature = "snappy-compression"
    ))]
    /// The compressors that the Client is willing to use in the order they are specified
    /// in the configuration.  The Client sends this list of compressors to the server.
    /// The server responds with the intersection of its supported list of compressors.
    /// The order of compressors indicates preference of compressors.
    ///
    /// This field only exists when at least one compression feature is enabled.
    pub compressors: Option<Vec<Compressor>>,

    /// The connect timeout passed to each underlying TcpStream when attempting to connect to the
    /// server.
    ///
    /// The default value is 10 seconds.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub connect_timeout: Option<Duration>,

    /// Whether or not the client should retry a read operation if the operation fails.
    ///
    /// The default value is true.
    pub retry_reads: Option<bool>,

    /// Whether or not the client should retry a write operation if the operation fails.
    ///
    /// The default value is true.
    pub retry_writes: Option<bool>,

    /// Configures which server monitoring protocol to use.
    ///
    /// The default is [`Auto`](ServerMonitoringMode::Auto).
    pub server_monitoring_mode: Option<ServerMonitoringMode>,

    /// Specifies whether the Client should directly connect to a single host rather than
    /// autodiscover all servers in the cluster.
    ///
    /// The default value is false.
    pub direct_connection: Option<bool>,

    /// The credential to use for authenticating connections made by this client.
    #[serde(serialize_with = "Credential::serialize")]
    pub credential: Option<Credential>,

    /// Default database for this client.
    ///
    /// By default, no default database is specified.
    pub default_database: Option<String>,

    /// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
    pub load_balanced: Option<bool>,

    /// Amount of time spent attempting to send or receive on a socket before timing out; note that
    /// this only applies to application operations, not server discovery and monitoring.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    #[deprecated(
        since = "3.5.0",
        note = "the Rust driver does not support socketTimeoutMS"
    )]
    pub socket_timeout: Option<Duration>,

    /// Default read preference for the client.
    pub read_preference: Option<ReadPreference>,

    /// The [`UuidRepresentation`] to use when decoding [`Binary`](crate::bson::Binary) values with
    /// the [`UuidOld`](crate::bson::spec::BinarySubtype::UuidOld) subtype. This is not used by
    /// the driver; client code can use this when deserializing relevant values with
    /// [`Binary::to_uuid_with_representation`](crate::bson::binary::Binary::to_uuid_with_representation).
    #[serde(serialize_with = "serialize_uuid_rep_option")]
    pub uuid_representation: Option<UuidRepresentation>,

    /// Limit on the number of mongos connections that may be created for sharded topologies.
    pub srv_max_hosts: Option<u32>,

    /// Overrides the default "mongodb" service name for SRV lookup in both discovery and polling.
    pub srv_service_name: Option<String>,

    /// Configuration for connecting to a SOCKS5 proxy.
    #[cfg(feature = "socks5-proxy")]
    #[serde(serialize_with = "Socks5Proxy::serialize")]
    pub socks5_proxy: Option<Socks5Proxy>,

    // Private field; serialized as an integer number of milliseconds like the other durations.
    // NOTE(review): presumably holds the `waitQueueTimeoutMS` URI option — confirm in the parser.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    wait_queue_timeout: Option<Duration>,
    // Private field.
    // NOTE(review): presumably holds the `tlsInsecure` URI option — confirm in the parser.
    tls_insecure: Option<bool>,

    // Test-only copy of the URI text this value was parsed from; never serialized.
    #[cfg(test)]
    #[serde(skip_serializing)]
    original_uri: String,
}
1060
/// Elements from the connection string that are not top-level fields in `ConnectionString`.
///
/// `ConnectionString::parse` obtains this value from `parse_options`, or falls back to the
/// default value (every field `None`) when the URI has no options section.
#[derive(Debug, Default)]
struct ConnectionStringParts {
    read_preference_tags: Option<Vec<TagSet>>,
    max_staleness: Option<Duration>,
    auth_mechanism: Option<AuthMechanism>,
    auth_mechanism_properties: Option<Document>,
    zlib_compression: Option<i32>,
    auth_source: Option<String>,
    // The proxy fields below only exist when the `socks5-proxy` feature is enabled.
    #[cfg(feature = "socks5-proxy")]
    proxy_host: Option<String>,
    #[cfg(feature = "socks5-proxy")]
    proxy_port: Option<u16>,
    #[cfg(feature = "socks5-proxy")]
    proxy_username: Option<String>,
    #[cfg(feature = "socks5-proxy")]
    proxy_password: Option<String>,
}
1079
/// Specification for mongodb server connections.
///
/// When parsing a connection string, a `mongodb://` URI produces
/// [`HostIdentifiers`](HostInfo::HostIdentifiers), while a `mongodb+srv://` URI with exactly one
/// port-less host produces [`DnsRecord`](HostInfo::DnsRecord).
#[derive(Debug, PartialEq, Clone, Serialize)]
#[non_exhaustive]
pub enum HostInfo {
    /// A set of addresses.
    HostIdentifiers(Vec<ServerAddress>),
    /// A DNS record for SRV lookup.
    DnsRecord(String),
}
1089
1090impl Default for HostInfo {
1091    fn default() -> Self {
1092        Self::HostIdentifiers(vec![])
1093    }
1094}
1095
1096impl HostInfo {
1097    async fn resolve(
1098        self,
1099        resolver_config: Option<ResolverConfig>,
1100        srv_service_name: Option<String>,
1101    ) -> Result<ResolvedHostInfo> {
1102        Ok(match self {
1103            Self::HostIdentifiers(hosts) => ResolvedHostInfo::HostIdentifiers(hosts),
1104            Self::DnsRecord(hostname) => {
1105                let mut resolver =
1106                    SrvResolver::new(resolver_config.clone(), srv_service_name).await?;
1107                let config = resolver.resolve_client_options(&hostname).await?;
1108                ResolvedHostInfo::DnsRecord { hostname, config }
1109            }
1110        })
1111    }
1112}
1113
/// The outcome of `HostInfo::resolve`.
enum ResolvedHostInfo {
    /// Explicit addresses, carried over unchanged from `HostInfo::HostIdentifiers`.
    HostIdentifiers(Vec<ServerAddress>),
    /// The result of resolving a `HostInfo::DnsRecord`.
    DnsRecord {
        /// The hostname that was looked up.
        hostname: String,
        /// The configuration returned by `SrvResolver::resolve_client_options` for `hostname`.
        config: crate::srv::ResolvedConfig,
    },
}
1121
/// Specifies whether TLS configuration should be used with the operations that the
/// [`Client`](../struct.Client.html) performs.
///
/// A [`TlsOptions`] value converts into `Tls::Enabled` through the `From` implementations.
#[derive(Clone, Debug, Deserialize, PartialEq)]
pub enum Tls {
    /// Enable TLS with the specified options.
    Enabled(TlsOptions),

    /// Disable TLS.
    Disabled,
}
1132
1133impl From<TlsOptions> for Tls {
1134    fn from(options: TlsOptions) -> Self {
1135        Self::Enabled(options)
1136    }
1137}
1138
1139impl From<TlsOptions> for Option<Tls> {
1140    fn from(options: TlsOptions) -> Self {
1141        Some(Tls::Enabled(options))
1142    }
1143}
1144
1145impl Tls {
1146    pub(crate) fn serialize<S>(
1147        tls: &Option<Tls>,
1148        serializer: S,
1149    ) -> std::result::Result<S::Ok, S::Error>
1150    where
1151        S: serde::Serializer,
1152    {
1153        match tls {
1154            Some(Tls::Enabled(tls_options)) => TlsOptions::serialize(tls_options, serializer),
1155            _ => serializer.serialize_none(),
1156        }
1157    }
1158}
1159
/// Specifies the TLS configuration that the [`Client`](../struct.Client.html) should use.
///
/// Every field defaults to `None` / unset in the generated builder.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct TlsOptions {
    /// Whether or not the [`Client`](../struct.Client.html) should return an error if the server
    /// presents an invalid certificate. This setting should _not_ be set to `true` in
    /// production; it should only be used for testing.
    ///
    /// The default value is to error when the server presents an invalid certificate.
    pub allow_invalid_certificates: Option<bool>,

    /// The path to the CA file that the [`Client`](../struct.Client.html) should use for TLS. If
    /// none is specified, then the driver will use the Mozilla root certificates from the
    /// `webpki-roots` crate.
    pub ca_file_path: Option<PathBuf>,

    /// The path to the certificate file that the [`Client`](../struct.Client.html) should present
    /// to the server to verify its identity. If none is specified, then the
    /// [`Client`](../struct.Client.html) will not attempt to verify its identity to the
    /// server.
    pub cert_key_file_path: Option<PathBuf>,

    /// Whether or not the [`Client`](../struct.Client.html) should return an error if the hostname
    /// is invalid.
    ///
    /// The default value is to error on invalid hostnames.
    ///
    /// This field only exists when the `openssl-tls` feature is enabled.
    #[cfg(feature = "openssl-tls")]
    pub allow_invalid_hostnames: Option<bool>,

    /// If set, the key in `cert_key_file_path` must be encrypted with this password.
    ///
    /// This field only exists when the `cert-key-password` feature is enabled.
    #[cfg(feature = "cert-key-password")]
    pub tls_certificate_key_file_password: Option<Vec<u8>>,
}
1194
1195impl TlsOptions {
1196    pub(crate) fn serialize<S>(
1197        tls_options: &TlsOptions,
1198        serializer: S,
1199    ) -> std::result::Result<S::Ok, S::Error>
1200    where
1201        S: serde::Serializer,
1202    {
1203        #[derive(Serialize)]
1204        struct TlsOptionsHelper<'a> {
1205            tls: bool,
1206            tlscafile: Option<&'a str>,
1207            tlscertificatekeyfile: Option<&'a str>,
1208            tlsallowinvalidcertificates: Option<bool>,
1209            #[cfg(feature = "cert-key-password")]
1210            tlscertificatekeyfilepassword: Option<&'a str>,
1211        }
1212
1213        let state = TlsOptionsHelper {
1214            tls: true,
1215            tlscafile: tls_options
1216                .ca_file_path
1217                .as_ref()
1218                .map(|s| s.to_str().unwrap()),
1219            tlscertificatekeyfile: tls_options
1220                .cert_key_file_path
1221                .as_ref()
1222                .map(|s| s.to_str().unwrap()),
1223            tlsallowinvalidcertificates: tls_options.allow_invalid_certificates,
1224            #[cfg(feature = "cert-key-password")]
1225            tlscertificatekeyfilepassword: tls_options
1226                .tls_certificate_key_file_password
1227                .as_deref()
1228                .map(|b| std::str::from_utf8(b).unwrap()),
1229        };
1230        state.serialize(serializer)
1231    }
1232}
1233
/// Extra information to append to the driver version in the metadata of the handshake with the
/// server. This should be used by libraries wrapping the driver, e.g. ODMs.
///
/// Equality and hashing are implemented by hand: an unset `version` or `platform` is treated the
/// same as an empty string.
#[derive(Clone, Debug, Deserialize, TypedBuilder, Eq)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct DriverInfo {
    /// The name of the library wrapping the driver.
    ///
    /// This is the only field without a builder default, so it must always be supplied.
    #[builder(!default)]
    pub name: String,

    /// The version of the library wrapping the driver.
    pub version: Option<String>,

    /// Optional platform information for the wrapping driver.
    pub platform: Option<String>,
}
1250
1251impl DriverInfo {
1252    pub(crate) fn spec_version(&self) -> &str {
1253        self.version.as_deref().unwrap_or("")
1254    }
1255
1256    pub(crate) fn spec_platform(&self) -> &str {
1257        self.platform.as_deref().unwrap_or("")
1258    }
1259}
1260
1261impl PartialEq for DriverInfo {
1262    fn eq(&self, other: &Self) -> bool {
1263        self.name == other.name
1264            && self.spec_version() == other.spec_version()
1265            && self.spec_platform() == other.spec_platform()
1266    }
1267}
1268
1269impl Hash for DriverInfo {
1270    fn hash<H: Hasher>(&self, state: &mut H) {
1271        self.name.hash(state);
1272        self.spec_version().hash(state);
1273        self.spec_platform().hash(state);
1274    }
1275}
1276
1277impl ClientOptions {
1278    /// Creates a new ClientOptions with the `original_srv_hostname` field set to the testing value
1279    /// used in the SRV tests.
1280    #[cfg(test)]
1281    pub(crate) fn new_srv() -> Self {
1282        Self {
1283            original_srv_info: Some(OriginalSrvInfo {
1284                hostname: "localhost.test.test.build.10gen.cc".into(),
1285                min_ttl: Duration::from_secs(60),
1286            }),
1287            ..Default::default()
1288        }
1289    }
1290
1291    pub(crate) fn tls_options(&self) -> Option<TlsOptions> {
1292        match self.tls {
1293            Some(Tls::Enabled(ref opts)) => Some(opts.clone()),
1294            _ => None,
1295        }
1296    }
1297
1298    /// Ensure the options set are valid, returning an error describing the problem if they are not.
1299    pub(crate) fn validate(&self) -> Result<()> {
1300        if let Some(true) = self.direct_connection {
1301            if self.hosts.len() > 1 {
1302                return Err(ErrorKind::InvalidArgument {
1303                    message: "cannot specify multiple seeds with directConnection=true".to_string(),
1304                }
1305                .into());
1306            }
1307        }
1308
1309        if let Some(ref write_concern) = self.write_concern {
1310            write_concern.validate()?;
1311        }
1312
1313        if self.load_balanced.unwrap_or(false) {
1314            if self.hosts.len() > 1 {
1315                return Err(ErrorKind::InvalidArgument {
1316                    message: "cannot specify multiple seeds with loadBalanced=true".to_string(),
1317                }
1318                .into());
1319            }
1320            if self.repl_set_name.is_some() {
1321                return Err(ErrorKind::InvalidArgument {
1322                    message: "cannot specify replicaSet with loadBalanced=true".to_string(),
1323                }
1324                .into());
1325            }
1326            if self.direct_connection == Some(true) {
1327                return Err(ErrorKind::InvalidArgument {
1328                    message: "cannot specify directConnection=true with loadBalanced=true"
1329                        .to_string(),
1330                }
1331                .into());
1332            }
1333        }
1334
1335        #[cfg(any(
1336            feature = "zstd-compression",
1337            feature = "zlib-compression",
1338            feature = "snappy-compression"
1339        ))]
1340        if let Some(ref compressors) = self.compressors {
1341            for compressor in compressors {
1342                compressor.validate()?;
1343            }
1344        }
1345
1346        if let Some(0) = self.max_pool_size {
1347            return Err(Error::invalid_argument("cannot specify maxPoolSize=0"));
1348        }
1349
1350        if let Some(0) = self.max_connecting {
1351            return Err(Error::invalid_argument("cannot specify maxConnecting=0"));
1352        }
1353
1354        if let Some(SelectionCriteria::ReadPreference(ref rp)) = self.selection_criteria {
1355            if let Some(max_staleness) = rp.max_staleness() {
1356                verify_max_staleness(
1357                    max_staleness,
1358                    self.heartbeat_freq.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY),
1359                )?;
1360            }
1361        }
1362
1363        if let Some(heartbeat_frequency) = self.heartbeat_freq {
1364            if heartbeat_frequency < self.min_heartbeat_frequency() {
1365                return Err(ErrorKind::InvalidArgument {
1366                    message: format!(
1367                        "'heartbeat_freq' must be at least {}ms, but {}ms was given",
1368                        self.min_heartbeat_frequency().as_millis(),
1369                        heartbeat_frequency.as_millis()
1370                    ),
1371                }
1372                .into());
1373            }
1374        }
1375
1376        #[cfg(feature = "tracing-unstable")]
1377        {
1378            let hostnames = if let Some(info) = &self.original_srv_info {
1379                vec![info.hostname.to_ascii_lowercase()]
1380            } else {
1381                self.hosts
1382                    .iter()
1383                    .filter_map(|addr| match addr {
1384                        ServerAddress::Tcp { host, .. } => Some(host.to_ascii_lowercase()),
1385                        #[cfg(unix)]
1386                        _ => None,
1387                    })
1388                    .collect()
1389            };
1390            if hostnames.iter().any(|s| s.ends_with(".cosmos.azure.com")) {
1391                tracing::info!("You appear to be connected to a CosmosDB cluster. For more information regarding feature compatibility and support please visit https://www.mongodb.com/supportability/cosmosdb");
1392            }
1393            if hostnames.iter().any(|s| {
1394                s.ends_with(".docdb.amazonaws.com") || s.ends_with(".docdb-elastic.amazonaws.com")
1395            }) {
1396                tracing::info!("You appear to be connected to a DocumentDB cluster. For more information regarding feature compatibility and support please visit https://www.mongodb.com/supportability/documentdb");
1397            }
1398        }
1399
1400        #[cfg(feature = "socks5-proxy")]
1401        {
1402            if let Some(proxy) = self.socks5_proxy.as_ref() {
1403                if self
1404                    .hosts
1405                    .iter()
1406                    .any(|address| !matches!(address, ServerAddress::Tcp { .. }))
1407                {
1408                    return Err(Error::invalid_argument(
1409                        "cannot specify a non-TCP address when connected to a proxy host",
1410                    ));
1411                }
1412
1413                if let Some((username, password)) = proxy.authentication.as_ref() {
1414                    if username.is_empty() || password.is_empty() {
1415                        return Err(Error::invalid_argument(
1416                            "cannot specify an empty username or password for proxy host",
1417                        ));
1418                    }
1419                }
1420            }
1421        }
1422
1423        Ok(())
1424    }
1425
1426    #[cfg(test)]
1427    pub(crate) fn test_options_mut(&mut self) -> &mut TestOptions {
1428        self.test_options.get_or_insert_with(Default::default)
1429    }
1430
1431    pub(crate) fn min_heartbeat_frequency(&self) -> Duration {
1432        #[cfg(test)]
1433        {
1434            self.test_options
1435                .as_ref()
1436                .and_then(|to| to.min_heartbeat_freq)
1437                .unwrap_or(MIN_HEARTBEAT_FREQUENCY)
1438        }
1439
1440        #[cfg(not(test))]
1441        {
1442            MIN_HEARTBEAT_FREQUENCY
1443        }
1444    }
1445
1446    pub(crate) fn resolver_config(&self) -> Option<&ResolverConfig> {
1447        #[cfg(feature = "dns-resolver")]
1448        {
1449            self.resolver_config.as_ref()
1450        }
1451        #[cfg(not(feature = "dns-resolver"))]
1452        {
1453            None
1454        }
1455    }
1456}
1457
/// Splits `s` at the first occurrence of `delimiter`.
///
/// The text before the delimiter is the "left" side and everything after it is the optional
/// "right" side. When the delimiter does not occur, the whole string is the left side.
///
/// e.g.
/// "abc.def" split on "." -> ("abc", Some("def"))
/// "ab.cd.ef" split on "." -> ("ab", Some("cd.ef"))
/// "abcdef" split on "." -> ("abcdef", None)
fn split_once_left<'a>(s: &'a str, delimiter: &str) -> (&'a str, Option<&'a str>) {
    s.split_once(delimiter)
        .map_or((s, None), |(left, right)| (left, Some(right)))
}
1471
/// Splits `s` at the last occurrence of `delimiter`.
///
/// Everything before the delimiter is the optional "left" side and the text after it is the
/// "right" side. When the delimiter does not occur, the whole string is the right side.
///
/// e.g.
/// "abc.def" split on "." -> (Some("abc"), "def")
/// "ab.cd.ef" split on "." -> (Some("ab.cd"), "ef")
/// "abcdef" split on "." -> (None, "abcdef")
fn split_once_right<'a>(s: &'a str, delimiter: &str) -> (Option<&'a str>, &'a str) {
    if let Some((left, right)) = s.rsplit_once(delimiter) {
        (Some(left), right)
    } else {
        (None, s)
    }
}
1485
1486fn percent_decode(s: &str, err_message: &str) -> Result<String> {
1487    match percent_encoding::percent_decode_str(s).decode_utf8() {
1488        Ok(result) => Ok(result.to_string()),
1489        Err(_) => Err(ErrorKind::InvalidArgument {
1490            message: err_message.to_string(),
1491        }
1492        .into()),
1493    }
1494}
1495
1496fn percent_encode(s: &str) -> String {
1497    percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).to_string()
1498}
1499
1500fn validate_and_parse_userinfo(s: &str, userinfo_type: &str) -> Result<String> {
1501    if s.chars().any(|c| USERINFO_RESERVED_CHARACTERS.contains(&c)) {
1502        return Err(Error::invalid_argument(format!(
1503            "{userinfo_type} must be URL encoded"
1504        )));
1505    }
1506
1507    // All instances of '%' in the username must be part of an percent-encoded substring. This means
1508    // that there must be two hexidecimal digits following any '%' in the username.
1509    if s.split('%')
1510        .skip(1)
1511        .any(|part| part.len() < 2 || part[0..2].chars().any(|c| !c.is_ascii_hexdigit()))
1512    {
1513        return Err(Error::invalid_argument(format!(
1514            "{userinfo_type} cannot contain unescaped %"
1515        )));
1516    }
1517
1518    percent_decode(s, &format!("{userinfo_type} must be URL encoded"))
1519}
1520
1521impl TryFrom<&str> for ConnectionString {
1522    type Error = Error;
1523
1524    fn try_from(value: &str) -> Result<Self> {
1525        Self::parse(value)
1526    }
1527}
1528
1529impl TryFrom<&String> for ConnectionString {
1530    type Error = Error;
1531
1532    fn try_from(value: &String) -> Result<Self> {
1533        Self::parse(value)
1534    }
1535}
1536
1537impl TryFrom<String> for ConnectionString {
1538    type Error = Error;
1539
1540    fn try_from(value: String) -> Result<Self> {
1541        Self::parse(value)
1542    }
1543}
1544
1545impl ConnectionString {
1546    /// Parses a MongoDB connection string into a [`ConnectionString`] struct. If the string is
1547    /// malformed or one of the options has an invalid value, an error will be returned.
1548    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
1549        let s = s.as_ref();
1550
1551        let Some((scheme, after_scheme)) = s.split_once("://") else {
1552            return Err(Error::invalid_argument(
1553                "connection string contains no scheme",
1554            ));
1555        };
1556
1557        let srv = match scheme {
1558            "mongodb" => false,
1559            #[cfg(feature = "dns-resolver")]
1560            "mongodb+srv" => true,
1561            #[cfg(not(feature = "dns-resolver"))]
1562            "mongodb+srv" => {
1563                return Err(Error::invalid_argument(
1564                    "mongodb+srv connection strings cannot be used when the 'dns-resolver' \
1565                     feature is disabled",
1566                ))
1567            }
1568            other => {
1569                return Err(Error::invalid_argument(format!(
1570                    "unsupported connection string scheme: {other}"
1571                )))
1572            }
1573        };
1574
1575        let (pre_options, options) = split_once_left(after_scheme, "?");
1576        let (user_info, hosts_and_auth_db) = split_once_right(pre_options, "@");
1577
1578        // if '@' is in the host section, it MUST be interpreted as a request for authentication
1579        let authentication_requested = user_info.is_some();
1580        let (username, password) = match user_info {
1581            Some(user_info) => {
1582                let (username, password) = split_once_left(user_info, ":");
1583                let username = if username.is_empty() {
1584                    None
1585                } else {
1586                    Some(validate_and_parse_userinfo(username, "username")?)
1587                };
1588                let password = match password {
1589                    Some(password) => Some(validate_and_parse_userinfo(password, "password")?),
1590                    None => None,
1591                };
1592                (username, password)
1593            }
1594            None => (None, None),
1595        };
1596
1597        let (hosts, auth_db) = split_once_left(hosts_and_auth_db, "/");
1598
1599        let hosts = hosts
1600            .split(",")
1601            .map(ServerAddress::parse)
1602            .collect::<Result<Vec<ServerAddress>>>()?;
1603        let host_info = if !srv {
1604            HostInfo::HostIdentifiers(hosts)
1605        } else {
1606            match &hosts[..] {
1607                [ServerAddress::Tcp { host, port: None }] => HostInfo::DnsRecord(host.clone()),
1608                [ServerAddress::Tcp {
1609                    host: _,
1610                    port: Some(_),
1611                }] => {
1612                    return Err(Error::invalid_argument(
1613                        "a port cannot be specified with 'mongodb+srv'",
1614                    ));
1615                }
1616                #[cfg(unix)]
1617                [ServerAddress::Unix { .. }] => {
1618                    return Err(Error::invalid_argument(
1619                        "unix sockets cannot be used with 'mongodb+srv'",
1620                    ));
1621                }
1622                _ => {
1623                    return Err(Error::invalid_argument(
1624                        "exactly one host must be specified with 'mongodb+srv'",
1625                    ))
1626                }
1627            }
1628        };
1629
1630        let db = match auth_db {
1631            Some("") | None => None,
1632            Some(db) => {
1633                let decoded = percent_decode(db, "database name must be URL encoded")?;
1634                for c in decoded.chars() {
1635                    if ILLEGAL_DATABASE_CHARACTERS.contains(&c) {
1636                        return Err(Error::invalid_argument(format!(
1637                            "illegal character in database name: {c}"
1638                        )));
1639                    }
1640                }
1641                Some(decoded)
1642            }
1643        };
1644
1645        let mut conn_str = ConnectionString {
1646            host_info,
1647            #[cfg(test)]
1648            original_uri: s.into(),
1649            ..Default::default()
1650        };
1651
1652        let mut parts = match options {
1653            Some(options) => conn_str.parse_options(options)?,
1654            None => ConnectionStringParts::default(),
1655        };
1656
1657        if conn_str.srv_service_name.is_some() && !srv {
1658            return Err(Error::invalid_argument(
1659                "srvServiceName cannot be specified with a non-SRV URI",
1660            ));
1661        }
1662
1663        if let Some(srv_max_hosts) = conn_str.srv_max_hosts {
1664            if !srv {
1665                return Err(Error::invalid_argument(
1666                    "srvMaxHosts cannot be specified with a non-SRV URI",
1667                ));
1668            }
1669            if srv_max_hosts > 0 {
1670                if conn_str.replica_set.is_some() {
1671                    return Err(Error::invalid_argument(
1672                        "srvMaxHosts and replicaSet cannot both be present",
1673                    ));
1674                }
1675                if conn_str.load_balanced == Some(true) {
1676                    return Err(Error::invalid_argument(
1677                        "srvMaxHosts and loadBalanced=true cannot both be present",
1678                    ));
1679                }
1680            }
1681        }
1682
1683        if let Some(username) = username {
1684            let credential = conn_str.credential.get_or_insert_with(Default::default);
1685            credential.username = Some(username);
1686            credential.password = password;
1687        }
1688
1689        if parts.auth_source.as_deref() == Some("") {
1690            return Err(ErrorKind::InvalidArgument {
1691                message: "empty authSource provided".to_string(),
1692            }
1693            .into());
1694        }
1695
1696        match parts.auth_mechanism {
1697            Some(ref mechanism) => {
1698                let credential = conn_str.credential.get_or_insert_with(Default::default);
1699                credential.source = parts.auth_source;
1700
1701                if let Some(mut doc) = parts.auth_mechanism_properties.take() {
1702                    match doc.remove("CANONICALIZE_HOST_NAME") {
1703                        Some(Bson::String(s)) => {
1704                            let val = match &s.to_lowercase()[..] {
1705                                "true" => Bson::Boolean(true),
1706                                "false" => Bson::Boolean(false),
1707                                "none" | "forward" | "forwardandreverse" => Bson::String(s),
1708                                _ => {
1709                                    return Err(ErrorKind::InvalidArgument {
1710                                        message: format!(
1711                                            "Invalid CANONICALIZE_HOST_NAME value: {s}. Valid \
1712                                             values are 'none', 'forward', 'forwardAndReverse', \
1713                                             'true', 'false'"
1714                                        ),
1715                                    }
1716                                    .into());
1717                                }
1718                            };
1719                            doc.insert("CANONICALIZE_HOST_NAME", val);
1720                        }
1721                        Some(val) => {
1722                            doc.insert("CANONICALIZE_HOST_NAME", val);
1723                        }
1724                        None => {}
1725                    }
1726
1727                    credential.mechanism_properties = Some(doc);
1728                }
1729
1730                #[cfg(feature = "gssapi-auth")]
1731                if mechanism == &AuthMechanism::Gssapi {
1732                    // Set mongodb as the default SERVICE_NAME if none is provided
1733                    let mut doc = if let Some(doc) = credential.mechanism_properties.take() {
1734                        doc
1735                    } else {
1736                        Document::new()
1737                    };
1738
1739                    if !doc.contains_key("SERVICE_NAME") {
1740                        doc.insert("SERVICE_NAME", "mongodb");
1741                    }
1742
1743                    credential.mechanism_properties = Some(doc);
1744                }
1745
1746                credential.mechanism = Some(mechanism.clone());
1747                mechanism.validate_credential(credential)?;
1748            }
1749            None => {
1750                if let Some(ref mut credential) = conn_str.credential {
1751                    credential.source = parts.auth_source;
1752                } else if authentication_requested {
1753                    return Err(ErrorKind::InvalidArgument {
1754                        message: "username and mechanism both not provided, but authentication \
1755                                  was requested"
1756                            .to_string(),
1757                    }
1758                    .into());
1759                }
1760            }
1761        };
1762
1763        // set default database.
1764        conn_str.default_database = db;
1765
1766        if conn_str.tls.is_none() && conn_str.is_srv() {
1767            conn_str.tls = Some(Tls::Enabled(Default::default()));
1768        }
1769
1770        #[cfg(feature = "socks5-proxy")]
1771        {
1772            if let Some(host) = parts.proxy_host {
1773                let mut proxy = Socks5Proxy::builder().host(host).build();
1774                if let Some(port) = parts.proxy_port {
1775                    proxy.port = Some(port);
1776                }
1777                match (parts.proxy_username, parts.proxy_password) {
1778                    (Some(username), Some(password)) => {
1779                        proxy.authentication = Some((username, password))
1780                    }
1781                    (None, None) => {}
1782                    _ => {
1783                        return Err(Error::invalid_argument(
1784                            "proxy username and password must both be specified as nonempty \
1785                             strings or unset",
1786                        ));
1787                    }
1788                }
1789                conn_str.socks5_proxy = Some(proxy);
1790            } else {
1791                let error = |option: &str| {
1792                    Error::invalid_argument(format!(
1793                        "{option} cannot be set if {PROXY_HOST} is unspecified"
1794                    ))
1795                };
1796                if parts.proxy_port.is_some() {
1797                    return Err(error(PROXY_PORT));
1798                }
1799                if parts.proxy_username.is_some() {
1800                    return Err(error(PROXY_USERNAME));
1801                }
1802                if parts.proxy_password.is_some() {
1803                    return Err(error(PROXY_PASSWORD));
1804                }
1805            }
1806        }
1807
1808        Ok(conn_str)
1809    }
1810
1811    /// Un-parses a [`ConnectionString`] struct back into a MongoDB connection string.
1812    fn to_uri_str(&self) -> String {
1813        let ConnectionString {
1814            host_info,
1815            app_name,
1816            tls,
1817            heartbeat_frequency,
1818            local_threshold,
1819            read_concern,
1820            replica_set,
1821            write_concern,
1822            server_selection_timeout,
1823            max_pool_size,
1824            min_pool_size,
1825            max_connecting,
1826            max_idle_time,
1827            #[cfg(any(
1828                feature = "zstd-compression",
1829                feature = "zlib-compression",
1830                feature = "snappy-compression"
1831            ))]
1832            compressors,
1833            connect_timeout,
1834            retry_reads,
1835            retry_writes,
1836            server_monitoring_mode: _,
1837            direct_connection,
1838            credential,
1839            default_database,
1840            load_balanced,
1841            #[allow(deprecated)]
1842            socket_timeout,
1843            read_preference,
1844            uuid_representation,
1845            srv_max_hosts,
1846            srv_service_name: _,
1847            wait_queue_timeout,
1848            tls_insecure,
1849            #[cfg(feature = "socks5-proxy")]
1850            socks5_proxy,
1851            #[cfg(test)]
1852                original_uri: _,
1853        } = self;
1854
1855        let mut res: String = String::new();
1856        let mut opts = String::new();
1857
1858        if self.is_srv() {
1859            res.push_str("mongodb+srv://");
1860        } else {
1861            res.push_str("mongodb://");
1862        }
1863
1864        if let Some(credential) = credential {
1865            if let Some(username) = &credential.username {
1866                res.push_str(&percent_encode(username));
1867                if let Some(password) = &credential.password {
1868                    res.push_str(&format!(":{}", &percent_encode(password)))
1869                }
1870                res.push('@');
1871            }
1872        }
1873
1874        if self.is_srv() {
1875            if let HostInfo::DnsRecord(dns) = host_info {
1876                res.push_str(dns);
1877            }
1878        } else if let HostInfo::HostIdentifiers(hosts) = host_info {
1879            res.push_str(
1880                &hosts
1881                    .iter()
1882                    .map(|h| h.to_string())
1883                    .collect::<Vec<_>>()
1884                    .join(","),
1885            );
1886        }
1887
1888        res.push('/');
1889
1890        if let Some(authdb) = default_database {
1891            res.push_str(authdb);
1892        }
1893
1894        if let Some(replica_set) = replica_set {
1895            opts.push_str(&format!("&replicaSet={replica_set}"));
1896        }
1897
1898        if let Some(direct_connection) = direct_connection {
1899            opts.push_str(&format!("&directConnection={direct_connection}"));
1900        }
1901
1902        if let Some(tls) = tls {
1903            match tls {
1904                Tls::Enabled(options) => {
1905                    opts.push_str("&tls=true");
1906
1907                    if let Some(cert_key_file_path) = &options.cert_key_file_path {
1908                        opts.push_str(&format!(
1909                            "&tlsCertificateKeyFile={}",
1910                            cert_key_file_path.to_str().unwrap()
1911                        ));
1912                    }
1913
1914                    #[cfg(feature = "cert-key-password")]
1915                    if let Some(tls_certificate_key_file_password) =
1916                        &options.tls_certificate_key_file_password
1917                    {
1918                        opts.push_str(&format!(
1919                            "&tlsCertificateKeyFilePassword={}",
1920                            std::str::from_utf8(tls_certificate_key_file_password).unwrap()
1921                        ));
1922                    }
1923
1924                    if let Some(ca_file_path) = &options.ca_file_path {
1925                        opts.push_str(&format!("&tlsCAFile={}", ca_file_path.to_str().unwrap()));
1926                    }
1927
1928                    if let Some(allow_invalid_certificates) = options.allow_invalid_certificates {
1929                        opts.push_str(&format!(
1930                            "&tlsAllowInvalidCertificates={allow_invalid_certificates}"
1931                        ));
1932                    }
1933
1934                    #[cfg(feature = "openssl-tls")]
1935                    if let Some(allow_invalid_hostnames) = options.allow_invalid_hostnames {
1936                        opts.push_str(&format!(
1937                            "&tlsAllowInvalidHostnames={allow_invalid_hostnames}"
1938                        ));
1939                    }
1940
1941                    if let Some(tls_insecure) = tls_insecure {
1942                        opts.push_str(&format!("&tlsInsecure={tls_insecure}"));
1943                    }
1944                }
1945                Tls::Disabled => {
1946                    opts.push_str("&tls=false");
1947                }
1948            }
1949        }
1950
1951        if let Some(connect_timeout) = connect_timeout {
1952            opts.push_str(&format!(
1953                "&connectTimeoutMS={}",
1954                connect_timeout.as_millis()
1955            ));
1956        }
1957
1958        if let Some(socket_timeout) = socket_timeout {
1959            opts.push_str(&format!("&socketTimeoutMS={}", socket_timeout.as_millis()));
1960        }
1961
1962        #[cfg(any(
1963            feature = "zstd-compression",
1964            feature = "zlib-compression",
1965            feature = "snappy-compression"
1966        ))]
1967        if let Some(compressors) = compressors {
1968            opts.push_str(&format!(
1969                "&compressors={}",
1970                compressors
1971                    .iter()
1972                    .map(|c| c.name())
1973                    .collect::<Vec<_>>()
1974                    .join(",")
1975            ));
1976        }
1977
1978        #[cfg(feature = "zlib-compression")]
1979        if let Some(compressors) = compressors {
1980            for compressor in compressors {
1981                if let Compressor::Zlib { level: Some(level) } = compressor {
1982                    opts.push_str(&format!("&zlibCompressionLevel={level}"));
1983                }
1984            }
1985        }
1986
1987        if let Some(max_pool_size) = max_pool_size {
1988            opts.push_str(&format!("&maxPoolSize={max_pool_size}"));
1989        }
1990
1991        if let Some(min_pool_size) = min_pool_size {
1992            opts.push_str(&format!("&minPoolSize={min_pool_size}"));
1993        }
1994
1995        if let Some(max_connecting) = max_connecting {
1996            opts.push_str(&format!("&maxConnecting={max_connecting}"));
1997        }
1998
1999        if let Some(max_idle_time) = max_idle_time {
2000            opts.push_str(&format!("&maxIdleTimeMS={}", max_idle_time.as_millis()));
2001        }
2002
2003        if let Some(wait_queue_timeout) = wait_queue_timeout {
2004            opts.push_str(&format!(
2005                "&waitQueueTimeoutMS={}",
2006                wait_queue_timeout.as_millis()
2007            ));
2008        }
2009
2010        if let Some(write_concern) = write_concern {
2011            if let Some(w) = &write_concern.w {
2012                match w {
2013                    Acknowledgment::Nodes(i) => {
2014                        opts.push_str(&format!("&w={i}"));
2015                    }
2016                    Acknowledgment::Majority => {
2017                        opts.push_str("&w=majority");
2018                    }
2019                    Acknowledgment::Custom(tag) => {
2020                        opts.push_str(&format!("&w={tag}"));
2021                    }
2022                }
2023            }
2024
2025            if let Some(w_timeout) = write_concern.w_timeout {
2026                opts.push_str(&format!("&wtimeoutMS={}", w_timeout.as_millis()));
2027            }
2028
2029            if let Some(journal) = write_concern.journal {
2030                opts.push_str(&format!("&journal={journal}"));
2031            }
2032        }
2033
2034        if let Some(read_concern) = read_concern {
2035            opts.push_str(&format!(
2036                "&readConcernLevel={}",
2037                read_concern.level.as_str()
2038            ));
2039        }
2040
2041        if let Some(read_preference) = read_preference {
2042            opts.push_str(&format!("&readPreference={}", read_preference.mode()));
2043
2044            if let Some(max_staleness) = read_preference.max_staleness() {
2045                opts.push_str(&format!("&maxStalenessSeconds={}", max_staleness.as_secs()));
2046            }
2047
2048            if let Some(tag_sets) = read_preference.tag_sets() {
2049                let ser_tag_set = |tag_set: &HashMap<String, String>| -> String {
2050                    let tags = tag_set
2051                        .iter()
2052                        .map(|(k, v)| format!("{k}:{v}"))
2053                        .collect::<Vec<_>>()
2054                        .join(",");
2055                    format!("&readPreferenceTags={tags}")
2056                };
2057                opts.push_str(
2058                    &tag_sets
2059                        .iter()
2060                        .map(ser_tag_set)
2061                        .collect::<Vec<_>>()
2062                        .join(""),
2063                )
2064            }
2065        }
2066
2067        if let Some(auth_source) = credential
2068            .as_ref()
2069            .and_then(|c: &Credential| c.source.as_ref())
2070        {
2071            opts.push_str(&format!("&authSource={auth_source}"));
2072        }
2073
2074        if let Some(auth_mechanism) = credential
2075            .as_ref()
2076            .and_then(|c: &Credential| c.mechanism.as_ref())
2077        {
2078            opts.push_str(&format!(
2079                "&authMechanism={}",
2080                auth_mechanism.as_str().to_uppercase()
2081            ));
2082        }
2083
2084        if let Some(auth_mechanism_properties) = credential
2085            .as_ref()
2086            .and_then(|c: &Credential| c.mechanism_properties.as_ref())
2087        {
2088            if !auth_mechanism_properties.is_empty() {
2089                opts.push_str(&format!(
2090                    "&authMechanismProperties={}",
2091                    &auth_mechanism_properties
2092                        .iter()
2093                        .map(|(k, v)| format!("{k}:{v}"))
2094                        .collect::<Vec<_>>()
2095                        .join(",")
2096                ))
2097            }
2098        }
2099
2100        if let Some(local_threshold) = local_threshold {
2101            opts.push_str(&format!(
2102                "&localThresholdMS={}",
2103                local_threshold.as_millis()
2104            ));
2105        }
2106
2107        if let Some(server_selection_timeout) = server_selection_timeout {
2108            opts.push_str(&format!(
2109                "&serverSelectionTimeoutMS={}",
2110                server_selection_timeout.as_millis()
2111            ));
2112        }
2113
2114        if let Some(heartbeat_frequency) = heartbeat_frequency {
2115            opts.push_str(&format!(
2116                "&heartbeatFrequencyMS={}",
2117                heartbeat_frequency.as_millis()
2118            ));
2119        }
2120
2121        if let Some(app_name) = app_name {
2122            opts.push_str(&format!("&appName={app_name}"));
2123        }
2124
2125        if let Some(retry_reads) = retry_reads {
2126            opts.push_str(&format!("&retryReads={retry_reads}"));
2127        }
2128
2129        if let Some(retry_writes) = retry_writes {
2130            opts.push_str(&format!("&retryWrites={retry_writes}"));
2131        }
2132
2133        if let Some(uuid_rep) = uuid_representation {
2134            let s = match uuid_rep {
2135                UuidRepresentation::Standard => "standard",
2136                UuidRepresentation::CSharpLegacy => "csharpLegacy",
2137                UuidRepresentation::JavaLegacy => "javaLegacy",
2138                UuidRepresentation::PythonLegacy => "pythonLegacy",
2139                _ => "",
2140            };
2141            opts.push_str(&format!("&uuidRepresentation={s}"));
2142        }
2143
2144        if let Some(load_balanced) = load_balanced {
2145            opts.push_str(&format!("&loadBalanced={load_balanced}"));
2146        }
2147
2148        if let Some(srv_max_hosts) = srv_max_hosts {
2149            opts.push_str(&format!("&srvMaxHosts={srv_max_hosts}"));
2150        }
2151
2152        #[cfg(feature = "socks5-proxy")]
2153        if let Some(proxy) = socks5_proxy {
2154            opts.push_str(&format!("&proxyHost={}", proxy.host));
2155            if let Some(port) = proxy.port {
2156                opts.push_str(&format!("&proxyPort={port}"));
2157            }
2158            if let Some((username, password)) = proxy.authentication.as_ref() {
2159                opts.push_str(&format!(
2160                    "&proxyUsername={username}&proxyPassword={password}"
2161                ));
2162            }
2163        }
2164
2165        if !opts.is_empty() {
2166            opts.replace_range(0..1, "?"); // mark start of options
2167            res.push_str(&opts);
2168        }
2169
2170        res
2171    }
2172
2173    /// Amount of time spent attempting to check out a connection from a server's connection pool
2174    /// before timing out.  Not supported by the Rust driver.
2175    pub fn wait_queue_timeout(&self) -> Option<Duration> {
2176        self.wait_queue_timeout
2177    }
2178
2179    /// Relax TLS constraints as much as possible (e.g. allowing invalid certificates or hostname
2180    /// mismatches). This option can only be set in a URI. If it is set in a URI provided to
2181    /// [`ConnectionString::parse`], [`TlsOptions::allow_invalid_certificates`] and
2182    /// [`TlsOptions::allow_invalid_hostnames`] are set to its value.
2183    pub fn tls_insecure(&self) -> Option<bool> {
2184        self.tls_insecure
2185    }
2186
2187    fn is_srv(&self) -> bool {
2188        matches!(self.host_info, HostInfo::DnsRecord(_))
2189    }
2190
2191    fn parse_options(&mut self, options: &str) -> Result<ConnectionStringParts> {
2192        let mut parts = ConnectionStringParts::default();
2193        if options.is_empty() {
2194            return Ok(parts);
2195        }
2196
2197        let mut keys = HashSet::new();
2198
2199        for option_pair in options.split('&') {
2200            let (key, value) = match option_pair.split_once('=') {
2201                Some((key, value)) => (key.to_lowercase(), value),
2202                None => {
2203                    return Err(Error::invalid_argument(format!(
2204                        "connection string option is not a 'key=value' pair: {option_pair}"
2205                    )))
2206                }
2207            };
2208
2209            if !keys.insert(key.clone()) && key != "readpreferencetags" {
2210                return Err(Error::invalid_argument(
2211                    "repeated options are not allowed in the connection string",
2212                ));
2213            }
2214
2215            self.parse_option_pair(
2216                &mut parts,
2217                &key,
2218                percent_encoding::percent_decode(value.as_bytes())
2219                    .decode_utf8_lossy()
2220                    .as_ref(),
2221            )?;
2222        }
2223
2224        if keys.contains(TLS_INSECURE) {
2225            #[cfg(feature = "openssl-tls")]
2226            let disallowed = [TLS_ALLOW_INVALID_CERTIFICATES, TLS_ALLOW_INVALID_HOSTNAMES];
2227            #[cfg(not(feature = "openssl-tls"))]
2228            let disallowed = [TLS_ALLOW_INVALID_CERTIFICATES];
2229            for option in disallowed {
2230                if keys.contains(option) {
2231                    return Err(Error::invalid_argument(format!(
2232                        "cannot set both {TLS_INSECURE} and {option} in the connection string"
2233                    )));
2234                }
2235            }
2236        }
2237
2238        if let Some(tags) = parts.read_preference_tags.take() {
2239            self.read_preference = match self.read_preference.take() {
2240                Some(read_pref) => Some(read_pref.with_tags(tags)?),
2241                None => {
2242                    return Err(ErrorKind::InvalidArgument {
2243                        message: "cannot set read preference tags without also setting read \
2244                                  preference mode"
2245                            .to_string(),
2246                    }
2247                    .into())
2248                }
2249            };
2250        }
2251
2252        if let Some(max_staleness) = parts.max_staleness.take() {
2253            self.read_preference = match self.read_preference.take() {
2254                Some(read_pref) => Some(read_pref.with_max_staleness(max_staleness)?),
2255                None => {
2256                    return Err(ErrorKind::InvalidArgument {
2257                        message: "cannot set max staleness without also setting read preference \
2258                                  mode"
2259                            .to_string(),
2260                    }
2261                    .into())
2262                }
2263            };
2264        }
2265
2266        if let Some(true) = self.direct_connection {
2267            if self.is_srv() {
2268                return Err(ErrorKind::InvalidArgument {
2269                    message: "cannot use SRV-style URI with directConnection=true".to_string(),
2270                }
2271                .into());
2272            }
2273        }
2274
2275        #[cfg(feature = "zlib-compression")]
2276        if let Some(zlib_compression_level) = parts.zlib_compression {
2277            if let Some(compressors) = self.compressors.as_mut() {
2278                for compressor in compressors {
2279                    compressor.write_zlib_level(zlib_compression_level)?;
2280                }
2281            }
2282        }
2283        #[cfg(not(feature = "zlib-compression"))]
2284        if parts.zlib_compression.is_some() {
2285            return Err(ErrorKind::InvalidArgument {
2286                message: "zlibCompressionLevel may not be specified without the zlib-compression \
2287                          feature flag enabled"
2288                    .into(),
2289            }
2290            .into());
2291        }
2292
2293        Ok(parts)
2294    }
2295
2296    fn parse_option_pair(
2297        &mut self,
2298        parts: &mut ConnectionStringParts,
2299        key: &str,
2300        value: &str,
2301    ) -> Result<()> {
2302        macro_rules! get_bool {
2303            ($value:expr, $option:expr) => {
2304                match $value {
2305                    "true" => true,
2306                    "false" => false,
2307                    _ => {
2308                        return Err(ErrorKind::InvalidArgument {
2309                            message: format!(
2310                                "connection string `{}` option must be a boolean",
2311                                $option,
2312                            ),
2313                        }
2314                        .into())
2315                    }
2316                }
2317            };
2318        }
2319
2320        macro_rules! get_duration {
2321            ($value:expr, $option:expr) => {
2322                match $value.parse::<u64>() {
2323                    Ok(i) => i,
2324                    _ => {
2325                        return Err(ErrorKind::InvalidArgument {
2326                            message: format!(
2327                                "connection string `{}` option must be a non-negative integer",
2328                                $option
2329                            ),
2330                        }
2331                        .into())
2332                    }
2333                }
2334            };
2335        }
2336
2337        macro_rules! get_u32 {
2338            ($value:expr, $option:expr) => {
2339                match value.parse::<u32>() {
2340                    Ok(u) => u,
2341                    Err(_) => {
2342                        return Err(ErrorKind::InvalidArgument {
2343                            message: format!(
2344                                "connection string `{}` argument must be a positive integer",
2345                                $option,
2346                            ),
2347                        }
2348                        .into())
2349                    }
2350                }
2351            };
2352        }
2353
2354        macro_rules! get_i32 {
2355            ($value:expr, $option:expr) => {
2356                match value.parse::<i32>() {
2357                    Ok(u) => u,
2358                    Err(_) => {
2359                        return Err(ErrorKind::InvalidArgument {
2360                            message: format!(
2361                                "connection string `{}` argument must be an integer",
2362                                $option
2363                            ),
2364                        }
2365                        .into())
2366                    }
2367                }
2368            };
2369        }
2370
2371        match key {
2372            "appname" => {
2373                self.app_name = Some(value.into());
2374            }
2375            "authmechanism" => {
2376                parts.auth_mechanism = Some(AuthMechanism::from_str(value)?);
2377            }
2378            "authsource" => parts.auth_source = Some(value.to_string()),
2379            "authmechanismproperties" => {
2380                let mut properties = Document::new();
2381
2382                for property in value.split(",") {
2383                    let Some((k, v)) = property.split_once(":") else {
2384                        return Err(Error::invalid_argument(format!(
2385                            "each entry in authMechanismProperties must be a colon-separated \
2386                             key-value pair, got {property}"
2387                        )));
2388                    };
2389                    if k == "ALLOWED_HOSTS" || k == "OIDC_CALLBACK" || k == "OIDC_HUMAN_CALLBACK" {
2390                        return Err(Error::invalid_argument(format!(
2391                            "{k} must only be specified through client options"
2392                        )));
2393                    }
2394                    properties.insert(k, v);
2395                }
2396
2397                parts.auth_mechanism_properties = Some(properties);
2398            }
2399            #[cfg(any(
2400                feature = "zstd-compression",
2401                feature = "zlib-compression",
2402                feature = "snappy-compression"
2403            ))]
2404            "compressors" => {
2405                let mut compressors: Option<Vec<Compressor>> = None;
2406                for compressor in value.split(',') {
2407                    let compressor = Compressor::from_str(compressor)?;
2408                    compressors
2409                        .get_or_insert_with(Default::default)
2410                        .push(compressor);
2411                }
2412                self.compressors = compressors;
2413            }
2414            k @ "connecttimeoutms" => {
2415                self.connect_timeout = Some(Duration::from_millis(get_duration!(value, k)));
2416            }
2417            k @ "directconnection" => {
2418                self.direct_connection = Some(get_bool!(value, k));
2419            }
2420            k @ "heartbeatfrequencyms" => {
2421                self.heartbeat_frequency = Some(Duration::from_millis(get_duration!(value, k)));
2422            }
2423            k @ "journal" => {
2424                let write_concern = self.write_concern.get_or_insert_with(Default::default);
2425                write_concern.journal = Some(get_bool!(value, k));
2426            }
2427            k @ "loadbalanced" => {
2428                self.load_balanced = Some(get_bool!(value, k));
2429            }
2430            k @ "localthresholdms" => {
2431                self.local_threshold = Some(Duration::from_millis(get_duration!(value, k)))
2432            }
2433            k @ "maxidletimems" => {
2434                self.max_idle_time = Some(Duration::from_millis(get_duration!(value, k)));
2435            }
2436            "maxstalenessseconds" => {
2437                let max_staleness_seconds = value.parse::<i64>().map_err(|e| {
2438                    Error::invalid_argument(format!("invalid maxStalenessSeconds value: {e}"))
2439                })?;
2440
2441                let max_staleness = match max_staleness_seconds.cmp(&-1) {
2442                    Ordering::Less => {
2443                        return Err(Error::invalid_argument(format!(
2444                            "maxStalenessSeconds must be -1 or positive, instead got \
2445                             {max_staleness_seconds}"
2446                        )));
2447                    }
2448                    Ordering::Equal => {
2449                        // -1 maxStaleness means no maxStaleness, which is the default
2450                        return Ok(());
2451                    }
2452                    Ordering::Greater => {
2453                        // unwrap safety: `max_staleness_seconds` will always be >= 0
2454                        Duration::from_secs(max_staleness_seconds.try_into().unwrap())
2455                    }
2456                };
2457
2458                parts.max_staleness = Some(max_staleness);
2459            }
2460            k @ "maxpoolsize" => {
2461                self.max_pool_size = Some(get_u32!(value, k));
2462            }
2463            k @ "minpoolsize" => {
2464                self.min_pool_size = Some(get_u32!(value, k));
2465            }
2466            k @ "maxconnecting" => {
2467                self.max_connecting = Some(get_u32!(value, k));
2468            }
2469            "readconcernlevel" => {
2470                self.read_concern = Some(ReadConcernLevel::from_str(value).into());
2471            }
2472            "readpreference" => {
2473                self.read_preference = Some(match &value.to_lowercase()[..] {
2474                    "primary" => ReadPreference::Primary,
2475                    "secondary" => ReadPreference::Secondary {
2476                        options: Default::default(),
2477                    },
2478                    "primarypreferred" => ReadPreference::PrimaryPreferred {
2479                        options: Default::default(),
2480                    },
2481                    "secondarypreferred" => ReadPreference::SecondaryPreferred {
2482                        options: Default::default(),
2483                    },
2484                    "nearest" => ReadPreference::Nearest {
2485                        options: Default::default(),
2486                    },
2487                    other => {
2488                        return Err(ErrorKind::InvalidArgument {
2489                            message: format!("'{other}' is not a valid read preference"),
2490                        }
2491                        .into())
2492                    }
2493                });
2494            }
2495            "readpreferencetags" => {
2496                let tags: Result<TagSet> = if value.is_empty() {
2497                    Ok(TagSet::new())
2498                } else {
2499                    value
2500                        .split(',')
2501                        .map(|tag| {
2502                            let mut values = tag.split(':');
2503
2504                            match (values.next(), values.next()) {
2505                                (Some(key), Some(value)) => {
2506                                    Ok((key.to_string(), value.to_string()))
2507                                }
2508                                _ => Err(ErrorKind::InvalidArgument {
2509                                    message: format!(
2510                                        "'{value}' is not a valid read preference tag (which must \
2511                                         be of the form 'key:value'",
2512                                    ),
2513                                }
2514                                .into()),
2515                            }
2516                        })
2517                        .collect()
2518                };
2519
2520                parts
2521                    .read_preference_tags
2522                    .get_or_insert_with(Vec::new)
2523                    .push(tags?);
2524            }
2525            "replicaset" => {
2526                self.replica_set = Some(value.to_string());
2527            }
2528            k @ "retrywrites" => {
2529                self.retry_writes = Some(get_bool!(value, k));
2530            }
2531            k @ "retryreads" => {
2532                self.retry_reads = Some(get_bool!(value, k));
2533            }
2534            "servermonitoringmode" => {
2535                self.server_monitoring_mode = Some(match value.to_lowercase().as_str() {
2536                    "stream" => ServerMonitoringMode::Stream,
2537                    "poll" => ServerMonitoringMode::Poll,
2538                    "auto" => ServerMonitoringMode::Auto,
2539                    other => {
2540                        return Err(Error::invalid_argument(format!(
2541                            "{other:?} is not a valid server monitoring mode"
2542                        )));
2543                    }
2544                });
2545            }
2546            k @ "serverselectiontimeoutms" => {
2547                self.server_selection_timeout = Some(Duration::from_millis(get_duration!(value, k)))
2548            }
2549            #[allow(deprecated)]
2550            k @ "sockettimeoutms" => {
2551                self.socket_timeout = Some(Duration::from_millis(get_duration!(value, k)));
2552            }
2553            k @ "srvmaxhosts" => {
2554                self.srv_max_hosts = Some(get_u32!(value, k));
2555            }
2556            "srvservicename" => {
2557                self.srv_service_name = Some(value.to_string());
2558            }
2559            k @ "tls" | k @ "ssl" => {
2560                let tls = get_bool!(value, k);
2561
2562                match self.tls {
2563                    Some(Tls::Enabled(_)) if !tls => {
2564                        return Err(Error::invalid_argument(
2565                            "cannot set {key}={tls} if other TLS options are set",
2566                        ))
2567                    }
2568                    Some(Tls::Disabled) if tls => {
2569                        return Err(Error::invalid_argument(
2570                            "cannot set {key}={tls} if TLS is disabled",
2571                        ))
2572                    }
2573                    None => {
2574                        if tls {
2575                            self.tls = Some(Tls::Enabled(Default::default()))
2576                        } else {
2577                            self.tls = Some(Tls::Disabled)
2578                        }
2579                    }
2580                    _ => {}
2581                }
2582            }
2583            TLS_INSECURE => {
2584                let val = get_bool!(value, key);
2585                self.tls_insecure = Some(val);
2586
2587                match self
2588                    .tls
2589                    .get_or_insert_with(|| Tls::Enabled(Default::default()))
2590                {
2591                    Tls::Enabled(ref mut options) => {
2592                        options.allow_invalid_certificates = Some(val);
2593                        #[cfg(feature = "openssl-tls")]
2594                        {
2595                            options.allow_invalid_hostnames = Some(val);
2596                        }
2597                    }
2598                    Tls::Disabled => {
2599                        return Err(Error::invalid_argument(format!(
2600                            "cannot set {key} when TLS is disabled"
2601                        )));
2602                    }
2603                }
2604            }
2605            TLS_ALLOW_INVALID_CERTIFICATES => {
2606                let val = get_bool!(value, key);
2607
2608                match self
2609                    .tls
2610                    .get_or_insert_with(|| Tls::Enabled(Default::default()))
2611                {
2612                    Tls::Enabled(ref mut options) => {
2613                        options.allow_invalid_certificates = Some(val);
2614                    }
2615                    Tls::Disabled => {
2616                        return Err(Error::invalid_argument(format!(
2617                            "cannot set {key} when TLS is disabled"
2618                        )))
2619                    }
2620                }
2621            }
2622            "tlscafile" => match self.tls {
2623                Some(Tls::Disabled) => {
2624                    return Err(ErrorKind::InvalidArgument {
2625                        message: "'tlsCAFile' can't be set if tls=false".into(),
2626                    }
2627                    .into());
2628                }
2629                Some(Tls::Enabled(ref mut options)) => {
2630                    options.ca_file_path = Some(value.into());
2631                }
2632                None => {
2633                    self.tls = Some(Tls::Enabled(
2634                        TlsOptions::builder()
2635                            .ca_file_path(PathBuf::from(value))
2636                            .build(),
2637                    ))
2638                }
2639            },
2640            "tlscertificatekeyfile" => match self.tls {
2641                Some(Tls::Disabled) => {
2642                    return Err(ErrorKind::InvalidArgument {
2643                        message: "'tlsCertificateKeyFile' can't be set if tls=false".into(),
2644                    }
2645                    .into());
2646                }
2647                Some(Tls::Enabled(ref mut options)) => {
2648                    options.cert_key_file_path = Some(value.into());
2649                }
2650                None => {
2651                    self.tls = Some(Tls::Enabled(
2652                        TlsOptions::builder()
2653                            .cert_key_file_path(PathBuf::from(value))
2654                            .build(),
2655                    ))
2656                }
2657            },
2658            #[cfg(feature = "cert-key-password")]
2659            "tlscertificatekeyfilepassword" => match &mut self.tls {
2660                Some(Tls::Disabled) => {
2661                    return Err(ErrorKind::InvalidArgument {
2662                        message: "'tlsCertificateKeyFilePassword' can't be set if tls=false".into(),
2663                    }
2664                    .into());
2665                }
2666                Some(Tls::Enabled(options)) => {
2667                    options.tls_certificate_key_file_password = Some(value.as_bytes().to_vec());
2668                }
2669                None => {
2670                    self.tls = Some(Tls::Enabled(
2671                        TlsOptions::builder()
2672                            .tls_certificate_key_file_password(value.as_bytes().to_vec())
2673                            .build(),
2674                    ))
2675                }
2676            },
2677            #[cfg(not(feature = "cert-key-password"))]
2678            "tlscertificatekeyfilepassword" => {
2679                return Err(Error::invalid_argument(
2680                    "the cert-key-password feature must be enabled to specify \
2681                     tlsCertificateKeyFilePassword in the URI",
2682                ));
2683            }
2684            "uuidrepresentation" => match value.to_lowercase().as_str() {
2685                "csharplegacy" => self.uuid_representation = Some(UuidRepresentation::CSharpLegacy),
2686                "javalegacy" => self.uuid_representation = Some(UuidRepresentation::JavaLegacy),
2687                "pythonlegacy" => self.uuid_representation = Some(UuidRepresentation::PythonLegacy),
2688                _ => {
2689                    return Err(ErrorKind::InvalidArgument {
2690                        message: format!(
2691                            "connection string `uuidRepresentation` option can be one of \
2692                             `csharpLegacy`, `javaLegacy`, or `pythonLegacy`. Received invalid \
2693                             `{value}`"
2694                        ),
2695                    }
2696                    .into())
2697                }
2698            },
2699            "w" => {
2700                let write_concern = self.write_concern.get_or_insert_with(Default::default);
2701
2702                match value.parse::<i32>() {
2703                    Ok(w) => match u32::try_from(w) {
2704                        Ok(uw) => write_concern.w = Some(Acknowledgment::from(uw)),
2705                        Err(_) => {
2706                            return Err(ErrorKind::InvalidArgument {
2707                                message: "connection string `w` option cannot be a negative \
2708                                          integer"
2709                                    .to_string(),
2710                            }
2711                            .into())
2712                        }
2713                    },
2714                    Err(_) => {
2715                        write_concern.w = Some(Acknowledgment::from(value.to_string()));
2716                    }
2717                };
2718            }
2719            k @ "waitqueuetimeoutms" => {
2720                self.wait_queue_timeout = Some(Duration::from_millis(get_duration!(value, k)));
2721            }
2722            k @ "wtimeoutms" => {
2723                let write_concern = self.write_concern.get_or_insert_with(Default::default);
2724                write_concern.w_timeout = Some(Duration::from_millis(get_duration!(value, k)));
2725            }
2726            k @ "zlibcompressionlevel" => {
2727                let i = get_i32!(value, k);
2728                if i < -1 {
2729                    return Err(ErrorKind::InvalidArgument {
2730                        message: "'zlibCompressionLevel' cannot be less than -1".to_string(),
2731                    }
2732                    .into());
2733                }
2734
2735                if i > 9 {
2736                    return Err(ErrorKind::InvalidArgument {
2737                        message: "'zlibCompressionLevel' cannot be greater than 9".to_string(),
2738                    }
2739                    .into());
2740                }
2741
2742                parts.zlib_compression = Some(i);
2743            }
2744            #[cfg(feature = "socks5-proxy")]
2745            PROXY_HOST => parts.proxy_host = Some(value.to_string()),
2746            #[cfg(feature = "socks5-proxy")]
2747            PROXY_PORT => {
2748                let port = u16::from_str(value)
2749                    .map_err(|_| Error::invalid_argument(format!("invalid proxy port: {value}")))?;
2750                parts.proxy_port = Some(port);
2751            }
2752            #[cfg(feature = "socks5-proxy")]
2753            PROXY_USERNAME if !value.is_empty() => parts.proxy_username = Some(value.to_string()),
2754            #[cfg(feature = "socks5-proxy")]
2755            PROXY_PASSWORD if !value.is_empty() => parts.proxy_password = Some(value.to_string()),
2756            #[cfg(not(feature = "socks5-proxy"))]
2757            PROXY_HOST | PROXY_PORT | PROXY_USERNAME | PROXY_PASSWORD => {
2758                return Err(Error::invalid_argument(format!(
2759                    "cannot specify {key} if socks5-proxy feature is not enabled"
2760                )));
2761            }
2762
2763            other => {
2764                let (jaro_winkler, option) = URI_OPTIONS.iter().fold((0.0, ""), |acc, option| {
2765                    let jaro_winkler = jaro_winkler(option, other).abs();
2766                    if jaro_winkler > acc.0 {
2767                        return (jaro_winkler, option);
2768                    }
2769                    acc
2770                });
2771                let mut message = format!("{other} is an invalid option");
2772                if jaro_winkler >= 0.84 {
2773                    let _ = write!(message, ". An option with a similar name exists: {option}");
2774                }
2775                return Err(ErrorKind::InvalidArgument { message }.into());
2776            }
2777        }
2778
2779        Ok(())
2780    }
2781}
2782
2783impl FromStr for ConnectionString {
2784    type Err = Error;
2785    fn from_str(s: &str) -> Result<Self> {
2786        ConnectionString::parse(s)
2787    }
2788}
2789
2790impl<'de> Deserialize<'de> for ConnectionString {
2791    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2792    where
2793        D: Deserializer<'de>,
2794    {
2795        deserializer.deserialize_str(ConnectionStringVisitor)
2796    }
2797}
2798
2799impl Display for ConnectionString {
2800    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2801        write!(f, "{}", self.to_uri_str())
2802    }
2803}
2804
2805struct ConnectionStringVisitor;
2806
2807impl serde::de::Visitor<'_> for ConnectionStringVisitor {
2808    type Value = ConnectionString;
2809
2810    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
2811        write!(formatter, "a MongoDB connection string")
2812    }
2813
2814    fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
2815    where
2816        E: serde::de::Error,
2817    {
2818        ConnectionString::parse(v).map_err(serde::de::Error::custom)
2819    }
2820}
2821
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use pretty_assertions::assert_eq;

    use super::{ClientOptions, ServerAddress};
    use crate::{
        concern::{Acknowledgment, ReadConcernLevel, WriteConcern},
        selection_criteria::{ReadPreference, ReadPreferenceOptions},
    };

    // Builds a `HashMap<String, String>` from `key => value` pairs; `tag_set! {}` produces an
    // empty map.
    macro_rules! tag_set {
        ( $($k:expr => $v:expr),* ) => {
            #[allow(clippy::let_and_return)]
            {
                use std::collections::HashMap;

                #[allow(unused_mut)]
                let mut ts = HashMap::new();
                $(
                    ts.insert($k.to_string(), $v.to_string());
                )*

                ts
            }
        }
    }

    // A TCP address for `hostname` with no explicit port.
    fn host_without_port(hostname: &str) -> ServerAddress {
        ServerAddress::Tcp {
            host: hostname.to_string(),
            port: None,
        }
    }

    // A TCP address for `hostname` with an explicit `port`.
    fn host_with_port(hostname: &str, port: u16) -> ServerAddress {
        ServerAddress::Tcp {
            host: hostname.to_string(),
            port: Some(port),
        }
    }

    #[test]
    fn test_parse_address_with_from_str() {
        let x = "localhost:27017".parse::<ServerAddress>().unwrap();
        match x {
            ServerAddress::Tcp { host, port } => {
                assert_eq!(host, "localhost");
                assert_eq!(port, Some(27017));
            }
            #[cfg(unix)]
            _ => panic!("expected ServerAddress::Tcp"),
        }

        // The port is optional: parsing succeeds and leaves `port` unset.
        let x = "localhost".parse::<ServerAddress>().unwrap();
        match x {
            ServerAddress::Tcp { host, port } => {
                assert_eq!(host, "localhost");
                assert_eq!(port, None);
            }
            #[cfg(unix)]
            _ => panic!("expected ServerAddress::Tcp"),
        }

        // A non-numeric port is rejected.
        let x = "localhost:not a number".parse::<ServerAddress>();
        assert!(x.is_err());

        #[cfg(unix)]
        {
            let x = "/path/to/socket.sock".parse::<ServerAddress>().unwrap();
            match x {
                ServerAddress::Unix { path } => {
                    assert_eq!(path.to_str().unwrap(), "/path/to/socket.sock");
                }
                _ => panic!("expected ServerAddress::Unix"),
            }
        }
    }

    #[tokio::test]
    async fn fails_without_scheme() {
        assert!(ClientOptions::parse("localhost:27017").await.is_err());
    }

    #[tokio::test]
    async fn fails_with_invalid_scheme() {
        assert!(ClientOptions::parse("mangodb://localhost:27017")
            .await
            .is_err());
    }

    #[tokio::test]
    async fn fails_with_nothing_after_scheme() {
        assert!(ClientOptions::parse("mongodb://").await.is_err());
    }

    #[tokio::test]
    async fn fails_with_only_slash_after_scheme() {
        assert!(ClientOptions::parse("mongodb:///").await.is_err());
    }

    #[tokio::test]
    async fn fails_with_no_host() {
        assert!(ClientOptions::parse("mongodb://:27017").await.is_err());
    }

    #[tokio::test]
    async fn no_port() {
        let uri = "mongodb://localhost";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_without_port("localhost")],
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn no_port_trailing_slash() {
        let uri = "mongodb://localhost/";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_without_port("localhost")],
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_port() {
        // Explicit port and no trailing slash, so the port-parsing path is actually exercised.
        let uri = "mongodb://localhost:27017";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_port_and_trailing_slash() {
        let uri = "mongodb://localhost:27017/";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    // An unrecognized read concern level is kept as `ReadConcernLevel::Custom`.
    #[tokio::test]
    async fn with_read_concern() {
        let uri = "mongodb://localhost:27017/?readConcernLevel=foo";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                read_concern: Some(ReadConcernLevel::Custom("foo".to_string()).into()),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_w_negative_int() {
        assert!(ClientOptions::parse("mongodb://localhost:27017/?w=-1")
            .await
            .is_err());
    }

    #[tokio::test]
    async fn with_w_non_negative_int() {
        let uri = "mongodb://localhost:27017/?w=1";
        let write_concern = WriteConcern::builder().w(Acknowledgment::from(1)).build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    // A non-numeric `w` value is carried through as a string acknowledgment.
    #[tokio::test]
    async fn with_w_string() {
        let uri = "mongodb://localhost:27017/?w=foo";
        let write_concern = WriteConcern::builder()
            .w(Acknowledgment::from("foo".to_string()))
            .build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_invalid_j() {
        assert!(
            ClientOptions::parse("mongodb://localhost:27017/?journal=foo")
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn with_j() {
        let uri = "mongodb://localhost:27017/?journal=true";
        let write_concern = WriteConcern::builder().journal(true).build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_wtimeout_non_int() {
        assert!(
            ClientOptions::parse("mongodb://localhost:27017/?wtimeoutMS=foo")
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn with_wtimeout_negative_int() {
        assert!(
            ClientOptions::parse("mongodb://localhost:27017/?wtimeoutMS=-1")
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn with_wtimeout() {
        let uri = "mongodb://localhost:27017/?wtimeoutMS=27";
        let write_concern = WriteConcern::builder()
            .w_timeout(Duration::from_millis(27))
            .build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_all_write_concern_options() {
        let uri = "mongodb://localhost:27017/?w=majority&journal=false&wtimeoutMS=27";
        let write_concern = WriteConcern::builder()
            .w(Acknowledgment::Majority)
            .journal(false)
            .w_timeout(Duration::from_millis(27))
            .build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    // Each `readpreferencetags` occurrence contributes one tag set, in the order given; an empty
    // value contributes an empty tag set.
    #[tokio::test]
    async fn with_mixed_options() {
        let uri = "mongodb://localhost,localhost:27018/?w=majority&readConcernLevel=majority&\
                   journal=false&wtimeoutMS=27&replicaSet=foo&heartbeatFrequencyMS=1000&\
                   localThresholdMS=4000&readPreference=secondaryPreferred&readpreferencetags=dc:\
                   ny,rack:1&serverselectiontimeoutms=2000&readpreferencetags=dc:ny&\
                   readpreferencetags=";
        let write_concern = WriteConcern::builder()
            .w(Acknowledgment::Majority)
            .journal(false)
            .w_timeout(Duration::from_millis(27))
            .build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![
                    host_without_port("localhost"),
                    host_with_port("localhost", 27018),
                ],
                selection_criteria: Some(
                    ReadPreference::SecondaryPreferred {
                        options: Some(
                            ReadPreferenceOptions::builder()
                                .tag_sets(vec![
                                    tag_set! {
                                        "dc" => "ny",
                                        "rack" => "1"
                                    },
                                    tag_set! {
                                        "dc" => "ny"
                                    },
                                    tag_set! {},
                                ])
                                .build()
                        )
                    }
                    .into()
                ),
                read_concern: Some(ReadConcernLevel::Majority.into()),
                write_concern: Some(write_concern),
                repl_set_name: Some("foo".to_string()),
                heartbeat_freq: Some(Duration::from_millis(1000)),
                local_threshold: Some(Duration::from_millis(4000)),
                server_selection_timeout: Some(Duration::from_millis(2000)),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }
}
3197
/// Contains the options that can be used to create a new [`ClientSession`](crate::ClientSession).
#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
#[export_tokens]
pub struct SessionOptions {
    /// The default options to use for transactions started on this session.
    ///
    /// If these options are not specified, they will be inherited from the
    /// [`Client`](../struct.Client.html) associated with this session. They will not
    /// be inherited from the options specified
    /// on the [`Database`](../struct.Database.html) or [`Collection`](../struct.Collection.html)
    /// associated with the operations within the transaction.
    pub default_transaction_options: Option<TransactionOptions>,

    /// If true, all operations performed in the context of this session
    /// will be [causally consistent](https://www.mongodb.com/docs/manual/core/causal-consistency-read-write-concerns/).
    ///
    /// Defaults to true if [`SessionOptions::snapshot`] is unspecified.
    ///
    /// Setting both this option and [`SessionOptions::snapshot`] to true is rejected as an
    /// invalid argument when the options are validated.
    pub causal_consistency: Option<bool>,

    /// If true, all read operations performed using this client session will share the same
    /// snapshot. Defaults to false.
    ///
    /// Setting both this option and [`SessionOptions::causal_consistency`] to true is rejected
    /// as an invalid argument when the options are validated.
    pub snapshot: Option<bool>,

    /// The snapshot time to use for a snapshot session. This option can only be set if `snapshot`
    /// is set to true; otherwise the options are rejected as an invalid argument when they are
    /// validated.
    pub snapshot_time: Option<Timestamp>,
}
3228
3229impl SessionOptions {
3230    pub(crate) fn validate(&self) -> Result<()> {
3231        if let (Some(causal_consistency), Some(snapshot)) = (self.causal_consistency, self.snapshot)
3232        {
3233            if causal_consistency && snapshot {
3234                return Err(Error::invalid_argument(
3235                    "snapshot and causal consistency are mutually exclusive",
3236                ));
3237            }
3238        }
3239        if self.snapshot_time.is_some() && self.snapshot != Some(true) {
3240            return Err(Error::invalid_argument(
3241                "cannot set `snapshot_time` without setting `snapshot` to true",
3242            ));
3243        }
3244        Ok(())
3245    }
3246}
3247
/// Contains the options that can be used for a transaction.
///
/// Every field is optional and is `None` by default, both through [`Default`] and through the
/// generated builder.
// `skip_serializing_none` omits fields that are `None` from the serialized output. The per-field
// serde attributes below further restrict what is serialized; deserialization accepts all four
// fields under their camelCase (or explicitly renamed) keys.
#[skip_serializing_none]
#[derive(Debug, Default, Serialize, Deserialize, TypedBuilder, Clone)]
#[builder(field_defaults(default, setter(into)))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
#[export_tokens]
pub struct TransactionOptions {
    /// The read concern to use for the transaction.
    // Never written when this struct is serialized; it can still be deserialized.
    // NOTE(review): presumably the read concern is attached to commands elsewhere — confirm.
    #[builder(default)]
    #[serde(skip_serializing)]
    pub read_concern: Option<ReadConcern>,

    /// The write concern to use when committing or aborting a transaction.
    // Omitted from serialized output whenever `write_concern_is_empty` returns true for it.
    #[builder(default)]
    #[serde(skip_serializing_if = "write_concern_is_empty")]
    pub write_concern: Option<WriteConcern>,

    /// The selection criteria to use for all read operations in a transaction.
    // Deserialized from the `readPreference` key; never written when serializing.
    #[builder(default)]
    #[serde(skip_serializing, rename = "readPreference")]
    pub selection_criteria: Option<SelectionCriteria>,

    /// The maximum amount of time to allow a single commitTransaction to run.
    // The key name is intentionally asymmetric: the value is read from `maxCommitTimeMS` but
    // written as `maxTimeMS`. `default` allows the key to be absent when deserializing.
    // NOTE(review): going by the helper names, the wire value is an integer number of
    // milliseconds — confirm against `serde_util`.
    #[builder(default)]
    #[serde(
        serialize_with = "serde_util::serialize_duration_option_as_int_millis",
        deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis",
        rename(serialize = "maxTimeMS", deserialize = "maxCommitTimeMS"),
        default
    )]
    pub max_commit_time: Option<Duration>,
}
3281
/// Which server monitoring protocol to use.
///
/// This enum is `#[non_exhaustive]`: code outside this crate that matches on it must include a
/// wildcard arm, since further modes may be added.
// No serde rename is applied here, so the derived (de)serialization uses the variant names
// exactly as written (`Stream`, `Poll`, `Auto`).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[non_exhaustive]
pub enum ServerMonitoringMode {
    /// The client will use the streaming protocol when the server supports it and fall back to the
    /// polling protocol otherwise.
    Stream,
    /// The client will use the polling protocol.
    Poll,
    /// The client will use the polling protocol when running on a FaaS platform and behave the
    /// same as `Stream` otherwise.
    Auto,
}