1#[cfg(test)]
2mod test;
3
4mod bulk_write;
5mod parse;
6mod resolver_config;
7
8use std::{
9 cmp::Ordering,
10 collections::{HashMap, HashSet},
11 convert::TryFrom,
12 fmt::{self, Display, Formatter, Write},
13 hash::{Hash, Hasher},
14 net::{Ipv4Addr, Ipv6Addr, SocketAddr},
15 path::PathBuf,
16 str::FromStr,
17 time::Duration,
18};
19
20use derive_where::derive_where;
21use macro_magic::export_tokens;
22use serde::{de::Unexpected, Deserialize, Deserializer, Serialize, Serializer};
23use serde_with::skip_serializing_none;
24use std::sync::LazyLock;
25use strsim::jaro_winkler;
26use typed_builder::TypedBuilder;
27
28#[cfg(any(
29 feature = "zstd-compression",
30 feature = "zlib-compression",
31 feature = "snappy-compression"
32))]
33use crate::options::Compressor;
34#[cfg(test)]
35use crate::srv::LookupHosts;
36use crate::{
37 bson::{doc, Bson, Document, Timestamp, UuidRepresentation},
38 client::auth::{AuthMechanism, Credential},
39 concern::{Acknowledgment, ReadConcern, WriteConcern},
40 error::{Error, ErrorKind, Result},
41 event::EventHandler,
42 options::ReadConcernLevel,
43 sdam::{verify_max_staleness, DEFAULT_HEARTBEAT_FREQUENCY, MIN_HEARTBEAT_FREQUENCY},
44 selection_criteria::{ReadPreference, SelectionCriteria, TagSet},
45 serde_util::{self, write_concern_is_empty},
46 srv::{OriginalSrvInfo, SrvResolver},
47};
48
49pub use bulk_write::*;
50#[cfg(feature = "dns-resolver")]
51pub use resolver_config::ResolverConfig;
52#[cfg(not(feature = "dns-resolver"))]
53pub(crate) use resolver_config::ResolverConfig;
54
/// Port assumed for a TCP address that does not specify one.
pub(crate) const DEFAULT_PORT: u16 = 27017;

// Individual URI option names that are referenced by name; like every entry of `URI_OPTIONS`,
// they are spelled entirely in lowercase.
const TLS_INSECURE: &str = "tlsinsecure";
const TLS_ALLOW_INVALID_CERTIFICATES: &str = "tlsallowinvalidcertificates";
#[cfg(feature = "openssl-tls")]
const TLS_ALLOW_INVALID_HOSTNAMES: &str = "tlsallowinvalidhostnames";
const PROXY_HOST: &str = "proxyhost";
const PROXY_PORT: &str = "proxyport";
const PROXY_USERNAME: &str = "proxyusername";
const PROXY_PASSWORD: &str = "proxypassword";

/// Known connection string option names.
///
/// Every entry is lowercase. The `uuidrepresentation` entry used to be spelled with a capital
/// `R`, which made it the only mixed-case name in the list.
// NOTE(review): presumably compared against lowercased option keys (e.g. for "did you mean"
// suggestions) — confirm at the use site.
const URI_OPTIONS: &[&str] = &[
    "appname",
    "authmechanism",
    "authsource",
    "authmechanismproperties",
    "compressors",
    "connecttimeoutms",
    "directconnection",
    "heartbeatfrequencyms",
    "journal",
    "localthresholdms",
    "maxidletimems",
    "maxstalenessseconds",
    "maxpoolsize",
    "minpoolsize",
    "maxconnecting",
    PROXY_HOST,
    PROXY_PORT,
    PROXY_USERNAME,
    PROXY_PASSWORD,
    "readconcernlevel",
    "readpreference",
    "readpreferencetags",
    "replicaset",
    "retrywrites",
    "retryreads",
    "servermonitoringmode",
    "serverselectiontimeoutms",
    "sockettimeoutms",
    "tls",
    "ssl",
    TLS_INSECURE,
    TLS_ALLOW_INVALID_CERTIFICATES,
    "tlscafile",
    "tlscertificatekeyfile",
    "uuidrepresentation",
    "w",
    "waitqueuetimeoutms",
    "wtimeoutms",
    "zlibcompressionlevel",
    "srvservicename",
];
107
/// Characters that must not appear unescaped in userinfo (e.g. a username or password);
/// `validate_and_parse_userinfo` rejects any input that contains one of them.
static USERINFO_RESERVED_CHARACTERS: LazyLock<HashSet<&'static char>> =
    LazyLock::new(|| [':', '/', '?', '#', '[', ']', '@'].iter().collect());
113
/// Set of characters considered illegal in a database name.
// NOTE(review): the code that consults this set is not visible in this part of the file.
static ILLEGAL_DATABASE_CHARACTERS: LazyLock<HashSet<&'static char>> =
    LazyLock::new(|| ['/', '\\', ' ', '"', '$'].iter().collect());
116
/// The address of a server, either a TCP host/port pair or (on Unix) a domain socket path.
///
/// Equality and hashing treat a missing TCP port as port 27017.
#[derive(Clone, Debug, Eq, Serialize)]
#[non_exhaustive]
pub enum ServerAddress {
    /// A TCP/IP host and port combination.
    Tcp {
        /// The hostname or IP address; `ServerAddress::parse` stores it lowercased, or in
        /// canonical form for IP addresses.
        host: String,

        /// The TCP port. `None` is displayed and compared as port 27017.
        port: Option<u16>,
    },
    /// A Unix domain socket path.
    #[cfg(unix)]
    Unix {
        /// The filesystem path of the socket.
        path: PathBuf,
    },
}
138
139impl From<SocketAddr> for ServerAddress {
140 fn from(item: SocketAddr) -> Self {
141 ServerAddress::Tcp {
142 host: item.ip().to_string(),
143 port: Some(item.port()),
144 }
145 }
146}
147
148impl<'de> Deserialize<'de> for ServerAddress {
149 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
150 where
151 D: Deserializer<'de>,
152 {
153 #[derive(Deserialize)]
154 #[serde(untagged)]
155 enum ServerAddressHelper {
156 String(String),
157 Object { host: String, port: Option<u16> },
158 }
159
160 let helper = ServerAddressHelper::deserialize(deserializer)?;
161 match helper {
162 ServerAddressHelper::String(string) => {
163 Self::parse(string).map_err(serde::de::Error::custom)
164 }
165 ServerAddressHelper::Object { host, port } => {
166 #[cfg(unix)]
167 if host.ends_with("sock") {
168 return Ok(Self::Unix {
169 path: PathBuf::from(host),
170 });
171 }
172
173 Ok(Self::Tcp { host, port })
174 }
175 }
176 }
177}
178
179impl Default for ServerAddress {
180 fn default() -> Self {
181 Self::Tcp {
182 host: "localhost".into(),
183 port: None,
184 }
185 }
186}
187
188impl PartialEq for ServerAddress {
189 fn eq(&self, other: &Self) -> bool {
190 match (self, other) {
191 (
192 Self::Tcp { host, port },
193 Self::Tcp {
194 host: other_host,
195 port: other_port,
196 },
197 ) => host == other_host && port.unwrap_or(27017) == other_port.unwrap_or(27017),
198 #[cfg(unix)]
199 (Self::Unix { path }, Self::Unix { path: other_path }) => path == other_path,
200 #[cfg(unix)]
201 _ => false,
202 }
203 }
204}
205
206impl Hash for ServerAddress {
207 fn hash<H>(&self, state: &mut H)
208 where
209 H: Hasher,
210 {
211 match self {
212 Self::Tcp { host, port } => {
213 host.hash(state);
214 port.unwrap_or(27017).hash(state);
215 }
216 #[cfg(unix)]
217 Self::Unix { path } => path.hash(state),
218 }
219 }
220}
221
222impl FromStr for ServerAddress {
223 type Err = Error;
224 fn from_str(address: &str) -> Result<Self> {
225 ServerAddress::parse(address)
226 }
227}
228
229impl ServerAddress {
230 pub fn parse(address: impl AsRef<str>) -> Result<Self> {
232 let address = address.as_ref();
233
234 if address.ends_with(".sock") {
235 #[cfg(unix)]
236 {
237 let address = percent_decode(address, "unix domain sockets must be URL-encoded")?;
238 return Ok(Self::Unix {
239 path: PathBuf::from(address),
240 });
241 }
242 #[cfg(not(unix))]
243 return Err(ErrorKind::InvalidArgument {
244 message: "unix domain sockets are not supported on this platform".to_string(),
245 }
246 .into());
247 }
248
249 let (hostname, port) = if let Some(ip_literal) = address.strip_prefix("[") {
250 let Some((hostname, port)) = ip_literal.split_once("]") else {
251 return Err(ErrorKind::InvalidArgument {
252 message: format!(
253 "invalid server address {address}: missing closing ']' in IP literal \
254 hostname"
255 ),
256 }
257 .into());
258 };
259
260 if let Err(parse_error) = Ipv6Addr::from_str(hostname) {
261 return Err(ErrorKind::InvalidArgument {
262 message: format!("invalid server address {address}: {parse_error}"),
263 }
264 .into());
265 }
266
267 let port = if port.is_empty() {
268 None
269 } else if let Some(port) = port.strip_prefix(":") {
270 Some(port)
271 } else {
272 return Err(ErrorKind::InvalidArgument {
273 message: format!(
274 "invalid server address {address}: the hostname can only be followed by a \
275 port prefixed with ':', got {port}"
276 ),
277 }
278 .into());
279 };
280
281 (hostname, port)
282 } else {
283 match address.split_once(":") {
284 Some((hostname, port)) => (hostname, Some(port)),
285 None => (address, None),
286 }
287 };
288
289 if hostname.is_empty() {
290 return Err(ErrorKind::InvalidArgument {
291 message: format!("invalid server address {address}: the hostname cannot be empty"),
292 }
293 .into());
294 }
295
296 let normalized_hostname = if let Ok(v4) = hostname.parse::<Ipv4Addr>() {
297 v4.to_string()
298 } else if let Ok(v6) = hostname.parse::<Ipv6Addr>() {
299 v6.to_string()
300 } else {
301 hostname.to_lowercase()
302 };
303
304 let port = if let Some(port) = port {
305 match u16::from_str(port) {
306 Ok(0) | Err(_) => {
307 return Err(ErrorKind::InvalidArgument {
308 message: format!(
309 "invalid server address {address}: the port must be an integer \
310 between 1 and 65535, got {port}"
311 ),
312 }
313 .into())
314 }
315 Ok(port) => Some(port),
316 }
317 } else {
318 None
319 };
320
321 Ok(Self::Tcp {
322 host: normalized_hostname,
323 port,
324 })
325 }
326
327 #[cfg(feature = "dns-resolver")]
328 pub(crate) fn host(&self) -> std::borrow::Cow<'_, str> {
329 match self {
330 Self::Tcp { host, .. } => std::borrow::Cow::Borrowed(host.as_str()),
331 #[cfg(unix)]
332 Self::Unix { path } => path.to_string_lossy(),
333 }
334 }
335}
336
337impl fmt::Display for ServerAddress {
338 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
339 match self {
340 Self::Tcp { host, port } => {
341 write!(fmt, "{}:{}", host, port.unwrap_or(DEFAULT_PORT))
342 }
343 #[cfg(unix)]
344 Self::Unix { path } => write!(fmt, "{}", path.display()),
345 }
346 }
347}
348
/// A server API version. Its string form (used by `FromStr`, `Display` and serde) is the bare
/// version number.
#[derive(Clone, Debug, PartialEq, Serialize)]
#[non_exhaustive]
pub enum ServerApiVersion {
    /// Version 1, written as `"1"`.
    #[serde(rename = "1")]
    V1,
}
357
358impl FromStr for ServerApiVersion {
359 type Err = Error;
360
361 fn from_str(str: &str) -> Result<Self> {
362 match str {
363 "1" => Ok(Self::V1),
364 _ => Err(ErrorKind::InvalidArgument {
365 message: format!("invalid server api version string: {str}"),
366 }
367 .into()),
368 }
369 }
370}
371
372impl Display for ServerApiVersion {
373 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
374 match self {
375 Self::V1 => write!(f, "1"),
376 }
377 }
378}
379
380impl<'de> Deserialize<'de> for ServerApiVersion {
381 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
382 where
383 D: Deserializer<'de>,
384 {
385 let s = String::deserialize(deserializer)?;
386
387 ServerApiVersion::from_str(&s).map_err(|_| {
388 serde::de::Error::invalid_value(Unexpected::Str(&s), &"a valid version number")
389 })
390 }
391}
392
/// Server API options: a required version plus two optional flags. `None` fields are omitted
/// when serializing.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct ServerApi {
    /// The API version; (de)serialized as `apiVersion` and mandatory in the builder.
    #[serde(rename = "apiVersion")]
    #[builder(!default)]
    pub version: ServerApiVersion,

    /// (De)serialized as `apiStrict`.
    // NOTE(review): presumably asks the server to reject commands outside the declared API
    // version — confirm against the server documentation.
    #[serde(rename = "apiStrict")]
    pub strict: Option<bool>,

    /// (De)serialized as `apiDeprecationErrors`.
    // NOTE(review): presumably asks the server to reject deprecated commands — confirm.
    #[serde(rename = "apiDeprecationErrors")]
    pub deprecation_errors: Option<bool>,
}
416
/// Settings for connecting through a SOCKS5 proxy (only with the `socks5-proxy` feature).
#[cfg(feature = "socks5-proxy")]
#[derive(Clone, Debug, Deserialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct Socks5Proxy {
    /// The proxy host; mandatory in the builder.
    #[builder(!default)]
    pub host: String,

    /// The proxy port, if specified.
    pub port: Option<u16>,

    /// Optional `(username, password)` pair. `ClientOptions::validate` rejects a pair in which
    /// either element is empty.
    pub authentication: Option<(String, String)>,
}
433
/// Placeholder type that keeps the `Socks5Proxy` name defined when the `socks5-proxy` feature
/// is disabled.
#[cfg(not(feature = "socks5-proxy"))]
#[derive(Clone, Debug)]
pub(crate) struct Socks5Proxy;
438
#[cfg(feature = "socks5-proxy")]
impl Socks5Proxy {
    /// `serialize_with` helper: writes the proxy as the flat `proxyHost` / `proxyPort` /
    /// `proxyUsername` / `proxyPassword` fields, or `none` when no proxy is configured.
    fn serialize<S>(
        proxy: &Option<Socks5Proxy>,
        serializer: S,
    ) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        #[derive(Serialize)]
        #[serde(rename_all = "camelCase")]
        struct Helper<'a> {
            proxy_host: &'a String,
            proxy_port: Option<u16>,
            proxy_username: Option<&'a String>,
            proxy_password: Option<&'a String>,
        }

        let Some(proxy) = proxy.as_ref() else {
            return serializer.serialize_none();
        };

        // The credentials are stored as a pair but serialized as two separate fields.
        let (proxy_username, proxy_password) = match proxy.authentication.as_ref() {
            Some((username, password)) => (Some(username), Some(password)),
            None => (None, None),
        };

        Helper {
            proxy_host: &proxy.host,
            proxy_port: proxy.port,
            proxy_username,
            proxy_password,
        }
        .serialize(serializer)
    }
}
470
/// The options used to configure a client.
///
/// Every option is optional apart from `hosts`. Values can be set through the generated builder
/// and checked with `validate`. `Debug` and `PartialEq` skip the fields marked
/// `#[derive_where(skip)]`.
#[derive(Clone, Deserialize, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[derive_where(Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ClientOptions {
    /// The server addresses to connect to. The builder default is a single TCP address,
    /// `localhost` on port 27017; deserialization falls back to `default_hosts()`.
    #[builder(default_code = "vec![ServerAddress::Tcp {
        host: \"localhost\".to_string(),
        port: Some(27017),
    }]")]
    #[serde(default = "default_hosts")]
    pub hosts: Vec<ServerAddress>,

    /// The application name (the `appname` option).
    pub app_name: Option<String>,

    /// The compressors to use; each one is checked by `validate`. Only available when at least
    /// one compression feature is enabled, and never deserialized.
    #[cfg(any(
        feature = "zstd-compression",
        feature = "zlib-compression",
        feature = "snappy-compression"
    ))]
    #[serde(skip)]
    pub compressors: Option<Vec<Compressor>>,

    /// Handler for connection-pool (CMAP) events. Ignored by `Debug`, `PartialEq` and serde.
    #[derive_where(skip)]
    #[builder(setter(strip_option))]
    #[serde(skip)]
    pub cmap_event_handler: Option<EventHandler<crate::event::cmap::CmapEvent>>,

    /// Handler for command events. Ignored by `Debug`, `PartialEq` and serde.
    #[derive_where(skip)]
    #[builder(setter(strip_option))]
    #[serde(skip)]
    pub command_event_handler: Option<EventHandler<crate::event::command::CommandEvent>>,

    /// The connect timeout (the `connecttimeoutms` option).
    pub connect_timeout: Option<Duration>,

    /// The credential to authenticate with.
    pub credential: Option<Credential>,

    /// The `directconnection` option. When `Some(true)`, `validate` rejects more than one host
    /// and rejects the combination with `load_balanced`.
    pub direct_connection: Option<bool>,

    /// Additional driver metadata; see `DriverInfo`.
    pub driver_info: Option<DriverInfo>,

    /// The heartbeat frequency (the `heartbeatfrequencyms` option). `validate` requires it to be
    /// at least `min_heartbeat_frequency()`.
    pub heartbeat_freq: Option<Duration>,

    /// The `loadbalanced` option; it has no builder setter. When true, `validate` rejects
    /// multiple hosts, a replica set name, and `direct_connection == Some(true)`.
    #[builder(setter(skip))]
    #[serde(rename = "loadbalanced")]
    pub load_balanced: Option<bool>,

    /// The local threshold (the `localthresholdms` option).
    pub local_threshold: Option<Duration>,

    /// The maximum connection idle time (the `maxidletimems` option).
    pub max_idle_time: Option<Duration>,

    /// The maximum pool size (the `maxpoolsize` option). `validate` rejects 0.
    pub max_pool_size: Option<u32>,

    /// The minimum pool size (the `minpoolsize` option).
    pub min_pool_size: Option<u32>,

    /// The `maxconnecting` option. `validate` rejects 0.
    pub max_connecting: Option<u32>,

    /// The read concern.
    pub read_concern: Option<ReadConcern>,

    /// The replica set name (the `replicaset` option). Incompatible with `load_balanced`.
    pub repl_set_name: Option<String>,

    /// The `retryreads` option.
    pub retry_reads: Option<bool>,

    /// The `retrywrites` option.
    pub retry_writes: Option<bool>,

    /// The `servermonitoringmode` option.
    pub server_monitoring_mode: Option<ServerMonitoringMode>,

    /// Handler for SDAM events. Ignored by `Debug`, `PartialEq` and serde.
    #[derive_where(skip)]
    #[builder(setter(strip_option))]
    #[serde(skip)]
    pub sdam_event_handler: Option<EventHandler<crate::event::sdam::SdamEvent>>,

    /// The selection criteria. If it is a read preference with a max staleness, `validate`
    /// checks that value against the heartbeat frequency.
    pub selection_criteria: Option<SelectionCriteria>,

    /// The declared server API; see `ServerApi`.
    pub server_api: Option<ServerApi>,

    /// The server selection timeout (the `serverselectiontimeoutms` option).
    pub server_selection_timeout: Option<Duration>,

    /// The default database name.
    pub default_database: Option<String>,

    /// The `srvservicename` option.
    pub srv_service_name: Option<String>,

    /// The TLS configuration; see `Tls`.
    pub tls: Option<Tls>,

    /// Maximum document length, in bytes, for tracing output (only with the `tracing-unstable`
    /// feature).
    #[cfg(feature = "tracing-unstable")]
    pub tracing_max_document_length_bytes: Option<usize>,

    /// The write concern; checked by `validate`.
    pub write_concern: Option<WriteConcern>,

    /// The `srvmaxhosts` option.
    pub srv_max_hosts: Option<u32>,

    /// OpenTelemetry options (only with the `opentelemetry` feature).
    #[cfg(feature = "opentelemetry")]
    pub tracing: Option<crate::otel::OpentelemetryOptions>,

    /// SOCKS5 proxy settings (only with the `socks5-proxy` feature). When set, `validate`
    /// requires every host to be a TCP address and any credentials to be non-empty.
    #[cfg(feature = "socks5-proxy")]
    pub socks5_proxy: Option<Socks5Proxy>,

    /// Information about the SRV record these options originated from, if any.
    // NOTE(review): populated outside the visible part of this file (apart from the test-only
    // `new_srv`).
    #[builder(setter(skip))]
    #[serde(skip)]
    #[derive_where(skip(Debug))]
    pub(crate) original_srv_info: Option<OriginalSrvInfo>,

    /// The URI these options were parsed from (tests only).
    #[cfg(test)]
    #[builder(setter(skip))]
    #[derive_where(skip(Debug))]
    pub(crate) original_uri: Option<String>,

    /// DNS resolver configuration (only with the `dns-resolver` feature); exposed through
    /// `resolver_config()`.
    #[builder(setter(skip))]
    #[serde(skip)]
    #[derive_where(skip(Debug))]
    #[cfg(feature = "dns-resolver")]
    pub(crate) resolver_config: Option<ResolverConfig>,

    /// Extra knobs used only by the test suite; see `TestOptions`.
    #[cfg(test)]
    #[builder(setter(skip))]
    #[serde(skip)]
    #[derive_where(skip)]
    pub(crate) test_options: Option<TestOptions>,
}
709
/// Test-only settings attached to `ClientOptions`.
#[cfg(test)]
#[derive(Debug, Clone, Default)]
pub(crate) struct TestOptions {
    /// Overrides the lower bound returned by `ClientOptions::min_heartbeat_frequency`.
    pub(crate) min_heartbeat_freq: Option<Duration>,

    // NOTE(review): presumably stops monitoring threads from being started — confirm at the
    // use site.
    pub(crate) disable_monitoring_threads: bool,

    // NOTE(review): presumably a canned SRV lookup result used in place of real DNS — confirm.
    pub(crate) mock_lookup_hosts: Option<Result<LookupHosts>>,

    /// Channel sender for acknowledged command events; see `TestEventSender`.
    pub(crate) async_event_listener: Option<TestEventSender>,

    // NOTE(review): presumably invoked with each hello command — confirm at the use site.
    pub(crate) hello_cb: Option<EventHandler<crate::cmap::Command>>,
}
728
/// Sender half of a tokio mpsc channel carrying acknowledged command events.
pub(crate) type TestEventSender = tokio::sync::mpsc::Sender<
    crate::runtime::AcknowledgedMessage<crate::event::command::CommandEvent>,
>;
732
733fn default_hosts() -> Vec<ServerAddress> {
734 vec![ServerAddress::default()]
735}
736
737impl Default for ClientOptions {
738 fn default() -> Self {
739 Self::builder().build()
740 }
741}
742
/// Test-only serialization of `ClientOptions` into a flat map keyed by lowercase option names.
#[cfg(test)]
impl Serialize for ClientOptions {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Borrowing mirror of the options. Field names are the serialized keys, durations are
        // written as integer milliseconds, and compound options are flattened into this map.
        #[derive(Serialize)]
        struct ClientOptionsHelper<'a> {
            appname: &'a Option<String>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            connecttimeoutms: &'a Option<Duration>,

            #[serde(flatten, serialize_with = "Credential::serialize")]
            credential: &'a Option<Credential>,

            directconnection: &'a Option<bool>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            heartbeatfrequencyms: &'a Option<Duration>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            localthresholdms: &'a Option<Duration>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            maxidletimems: &'a Option<Duration>,

            maxpoolsize: &'a Option<u32>,

            minpoolsize: &'a Option<u32>,

            maxconnecting: &'a Option<u32>,

            #[serde(flatten, serialize_with = "ReadConcern::serialize")]
            readconcern: &'a Option<ReadConcern>,

            replicaset: &'a Option<String>,

            retryreads: &'a Option<bool>,

            retrywrites: &'a Option<bool>,

            servermonitoringmode: Option<String>,

            #[serde(
                flatten,
                serialize_with = "SelectionCriteria::serialize_for_client_options"
            )]
            selectioncriteria: &'a Option<SelectionCriteria>,

            #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
            serverselectiontimeoutms: &'a Option<Duration>,

            #[serde(flatten, serialize_with = "Tls::serialize")]
            tls: &'a Option<Tls>,

            #[serde(flatten, serialize_with = "WriteConcern::serialize")]
            writeconcern: &'a Option<WriteConcern>,

            zlibcompressionlevel: &'a Option<i32>,

            loadbalanced: &'a Option<bool>,

            srvmaxhosts: Option<i32>,

            srvservicename: &'a Option<String>,

            #[cfg(feature = "socks5-proxy")]
            #[serde(flatten, serialize_with = "Socks5Proxy::serialize")]
            socks5proxy: &'a Option<Socks5Proxy>,
        }

        let client_options = ClientOptionsHelper {
            appname: &self.app_name,
            connecttimeoutms: &self.connect_timeout,
            credential: &self.credential,
            directconnection: &self.direct_connection,
            heartbeatfrequencyms: &self.heartbeat_freq,
            localthresholdms: &self.local_threshold,
            maxidletimems: &self.max_idle_time,
            maxpoolsize: &self.max_pool_size,
            minpoolsize: &self.min_pool_size,
            maxconnecting: &self.max_connecting,
            readconcern: &self.read_concern,
            replicaset: &self.repl_set_name,
            retryreads: &self.retry_reads,
            retrywrites: &self.retry_writes,
            // The mode is written as the lowercased `Debug` rendering of the enum value.
            servermonitoringmode: self
                .server_monitoring_mode
                .as_ref()
                .map(|m| format!("{m:?}").to_lowercase()),
            selectioncriteria: &self.selection_criteria,
            serverselectiontimeoutms: &self.server_selection_timeout,
            tls: &self.tls,
            writeconcern: &self.write_concern,
            loadbalanced: &self.load_balanced,
            // Always serialized as `None`; no value from `self` is consulted here.
            zlibcompressionlevel: &None,
            // Stored as `u32` but emitted as `i32`; a value that does not fit is reported as a
            // serialization error.
            srvmaxhosts: self
                .srv_max_hosts
                .map(|v| v.try_into())
                .transpose()
                .map_err(serde::ser::Error::custom)?,
            srvservicename: &self.srv_service_name,
            #[cfg(feature = "socks5-proxy")]
            socks5proxy: &self.socks5_proxy,
        };

        client_options.serialize(serializer)
    }
}
853
854fn serialize_uuid_rep_option<S>(
856 value: &Option<UuidRepresentation>,
857 serializer: S,
858) -> std::result::Result<S::Ok, S::Error>
859where
860 S: Serializer,
861{
862 #[non_exhaustive]
863 #[derive(Serialize)]
864 #[serde(remote = "UuidRepresentation")]
865 enum UuidRepresentationForSerialize {
866 Standard,
867 CSharpLegacy,
868 JavaLegacy,
869 PythonLegacy,
870 }
871 match value {
872 Some(rep) => UuidRepresentationForSerialize::serialize(rep, serializer),
873 None => serializer.serialize_none(),
874 }
875}
876
/// The contents of a connection string: where to connect (`host_info`) plus the options it
/// carries. Serialized with camelCase keys, omitting `None` fields.
#[skip_serializing_none]
#[derive(Clone, Debug, Default, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionString {
    /// The hosts to connect to, given either directly or as a DNS record; see `HostInfo`.
    pub host_info: HostInfo,

    /// The application name.
    pub app_name: Option<String>,

    /// The TLS configuration; see `Tls`.
    #[serde(serialize_with = "Tls::serialize")]
    pub tls: Option<Tls>,

    /// The heartbeat frequency; serialized as integer milliseconds.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub heartbeat_frequency: Option<Duration>,

    /// The local threshold; serialized as integer milliseconds.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub local_threshold: Option<Duration>,

    /// The read concern.
    #[serde(serialize_with = "ReadConcern::serialize")]
    pub read_concern: Option<ReadConcern>,

    /// The replica set name.
    pub replica_set: Option<String>,

    /// The write concern.
    #[serde(serialize_with = "WriteConcern::serialize")]
    pub write_concern: Option<WriteConcern>,

    /// The server selection timeout; serialized as integer milliseconds.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub server_selection_timeout: Option<Duration>,

    /// The maximum pool size.
    pub max_pool_size: Option<u32>,

    /// The minimum pool size.
    pub min_pool_size: Option<u32>,

    /// The `maxConnecting` option.
    pub max_connecting: Option<u32>,

    /// The maximum connection idle time; serialized as integer milliseconds.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub max_idle_time: Option<Duration>,

    /// The compressors (only when at least one compression feature is enabled).
    #[cfg(any(
        feature = "zstd-compression",
        feature = "zlib-compression",
        feature = "snappy-compression"
    ))]
    pub compressors: Option<Vec<Compressor>>,

    /// The connect timeout; serialized as integer milliseconds.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    pub connect_timeout: Option<Duration>,

    /// The `retryReads` option.
    pub retry_reads: Option<bool>,

    /// The `retryWrites` option.
    pub retry_writes: Option<bool>,

    /// The server monitoring mode.
    pub server_monitoring_mode: Option<ServerMonitoringMode>,

    /// The `directConnection` option.
    pub direct_connection: Option<bool>,

    /// The credential.
    #[serde(serialize_with = "Credential::serialize")]
    pub credential: Option<Credential>,

    /// The default database name.
    pub default_database: Option<String>,

    /// The `loadBalanced` option.
    pub load_balanced: Option<bool>,

    /// The socket timeout; serialized as integer milliseconds. Deprecated because the driver
    /// does not support `socketTimeoutMS`.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    #[deprecated(
        since = "3.5.0",
        note = "the Rust driver does not support socketTimeoutMS"
    )]
    pub socket_timeout: Option<Duration>,

    /// The read preference.
    pub read_preference: Option<ReadPreference>,

    /// The UUID representation; serialized by `serialize_uuid_rep_option`.
    #[serde(serialize_with = "serialize_uuid_rep_option")]
    pub uuid_representation: Option<UuidRepresentation>,

    /// The `srvMaxHosts` option.
    pub srv_max_hosts: Option<u32>,

    /// The `srvServiceName` option.
    pub srv_service_name: Option<String>,

    /// SOCKS5 proxy settings (only with the `socks5-proxy` feature).
    #[cfg(feature = "socks5-proxy")]
    #[serde(serialize_with = "Socks5Proxy::serialize")]
    pub socks5_proxy: Option<Socks5Proxy>,

    // The two fields below are private: they are stored and serialized but not exposed to users.
    #[serde(serialize_with = "serde_util::serialize_duration_option_as_int_millis")]
    wait_queue_timeout: Option<Duration>,
    tls_insecure: Option<bool>,

    // The URI this value was parsed from (tests only); never serialized.
    #[cfg(test)]
    #[serde(skip_serializing)]
    original_uri: String,
}
1060
/// Option values that have no field of their own on `ConnectionString`.
// NOTE(review): presumably these are gathered while parsing and merged into composite options
// (read preference, credential, compressors, proxy) afterwards — the parsing code is not
// visible in this part of the file.
#[derive(Debug, Default)]
struct ConnectionStringParts {
    read_preference_tags: Option<Vec<TagSet>>,
    max_staleness: Option<Duration>,
    auth_mechanism: Option<AuthMechanism>,
    auth_mechanism_properties: Option<Document>,
    zlib_compression: Option<i32>,
    auth_source: Option<String>,
    #[cfg(feature = "socks5-proxy")]
    proxy_host: Option<String>,
    #[cfg(feature = "socks5-proxy")]
    proxy_port: Option<u16>,
    #[cfg(feature = "socks5-proxy")]
    proxy_username: Option<String>,
    #[cfg(feature = "socks5-proxy")]
    proxy_password: Option<String>,
}
1079
/// How the hosts of a connection string are specified.
#[derive(Debug, PartialEq, Clone, Serialize)]
#[non_exhaustive]
pub enum HostInfo {
    /// An explicit list of server addresses.
    HostIdentifiers(Vec<ServerAddress>),
    /// A DNS hostname that `HostInfo::resolve` looks up through an `SrvResolver`.
    DnsRecord(String),
}
1089
1090impl Default for HostInfo {
1091 fn default() -> Self {
1092 Self::HostIdentifiers(vec![])
1093 }
1094}
1095
1096impl HostInfo {
1097 async fn resolve(
1098 self,
1099 resolver_config: Option<ResolverConfig>,
1100 srv_service_name: Option<String>,
1101 ) -> Result<ResolvedHostInfo> {
1102 Ok(match self {
1103 Self::HostIdentifiers(hosts) => ResolvedHostInfo::HostIdentifiers(hosts),
1104 Self::DnsRecord(hostname) => {
1105 let mut resolver =
1106 SrvResolver::new(resolver_config.clone(), srv_service_name).await?;
1107 let config = resolver.resolve_client_options(&hostname).await?;
1108 ResolvedHostInfo::DnsRecord { hostname, config }
1109 }
1110 })
1111 }
1112}
1113
/// The result of `HostInfo::resolve`.
enum ResolvedHostInfo {
    /// Hosts that were given explicitly and needed no lookup.
    HostIdentifiers(Vec<ServerAddress>),
    /// The outcome of resolving a DNS record.
    DnsRecord {
        /// The hostname that was looked up.
        hostname: String,
        /// The configuration returned by the resolver for that hostname.
        config: crate::srv::ResolvedConfig,
    },
}
1121
/// Whether TLS is enabled and, if so, with which options.
#[derive(Clone, Debug, Deserialize, PartialEq)]
pub enum Tls {
    /// TLS is enabled with the given options.
    Enabled(TlsOptions),

    /// TLS is disabled.
    Disabled,
}
1132
1133impl From<TlsOptions> for Tls {
1134 fn from(options: TlsOptions) -> Self {
1135 Self::Enabled(options)
1136 }
1137}
1138
1139impl From<TlsOptions> for Option<Tls> {
1140 fn from(options: TlsOptions) -> Self {
1141 Some(Tls::Enabled(options))
1142 }
1143}
1144
1145impl Tls {
1146 pub(crate) fn serialize<S>(
1147 tls: &Option<Tls>,
1148 serializer: S,
1149 ) -> std::result::Result<S::Ok, S::Error>
1150 where
1151 S: serde::Serializer,
1152 {
1153 match tls {
1154 Some(Tls::Enabled(tls_options)) => TlsOptions::serialize(tls_options, serializer),
1155 _ => serializer.serialize_none(),
1156 }
1157 }
1158}
1159
/// Options used when TLS is enabled.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct TlsOptions {
    /// The `tlsallowinvalidcertificates` option.
    pub allow_invalid_certificates: Option<bool>,

    /// Path to the CA file (the `tlscafile` option).
    pub ca_file_path: Option<PathBuf>,

    /// Path to the certificate key file (the `tlscertificatekeyfile` option).
    pub cert_key_file_path: Option<PathBuf>,

    /// Whether invalid hostnames are allowed (only with the `openssl-tls` feature).
    #[cfg(feature = "openssl-tls")]
    pub allow_invalid_hostnames: Option<bool>,

    /// Password for the certificate key file, as raw bytes (only with the `cert-key-password`
    /// feature).
    #[cfg(feature = "cert-key-password")]
    pub tls_certificate_key_file_password: Option<Vec<u8>>,
}
1194
1195impl TlsOptions {
1196 pub(crate) fn serialize<S>(
1197 tls_options: &TlsOptions,
1198 serializer: S,
1199 ) -> std::result::Result<S::Ok, S::Error>
1200 where
1201 S: serde::Serializer,
1202 {
1203 #[derive(Serialize)]
1204 struct TlsOptionsHelper<'a> {
1205 tls: bool,
1206 tlscafile: Option<&'a str>,
1207 tlscertificatekeyfile: Option<&'a str>,
1208 tlsallowinvalidcertificates: Option<bool>,
1209 #[cfg(feature = "cert-key-password")]
1210 tlscertificatekeyfilepassword: Option<&'a str>,
1211 }
1212
1213 let state = TlsOptionsHelper {
1214 tls: true,
1215 tlscafile: tls_options
1216 .ca_file_path
1217 .as_ref()
1218 .map(|s| s.to_str().unwrap()),
1219 tlscertificatekeyfile: tls_options
1220 .cert_key_file_path
1221 .as_ref()
1222 .map(|s| s.to_str().unwrap()),
1223 tlsallowinvalidcertificates: tls_options.allow_invalid_certificates,
1224 #[cfg(feature = "cert-key-password")]
1225 tlscertificatekeyfilepassword: tls_options
1226 .tls_certificate_key_file_password
1227 .as_deref()
1228 .map(|b| std::str::from_utf8(b).unwrap()),
1229 };
1230 state.serialize(serializer)
1231 }
1232}
1233
/// Extra information describing a driver (for example, a library wrapping this one).
///
/// Equality and hashing treat a missing `version` or `platform` as the empty string.
#[derive(Clone, Debug, Deserialize, TypedBuilder, Eq)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct DriverInfo {
    /// The name; mandatory in the builder.
    #[builder(!default)]
    pub name: String,

    /// The version, if any.
    pub version: Option<String>,

    /// The platform description, if any.
    pub platform: Option<String>,
}
1250
1251impl DriverInfo {
1252 pub(crate) fn spec_version(&self) -> &str {
1253 self.version.as_deref().unwrap_or("")
1254 }
1255
1256 pub(crate) fn spec_platform(&self) -> &str {
1257 self.platform.as_deref().unwrap_or("")
1258 }
1259}
1260
1261impl PartialEq for DriverInfo {
1262 fn eq(&self, other: &Self) -> bool {
1263 self.name == other.name
1264 && self.spec_version() == other.spec_version()
1265 && self.spec_platform() == other.spec_platform()
1266 }
1267}
1268
1269impl Hash for DriverInfo {
1270 fn hash<H: Hasher>(&self, state: &mut H) {
1271 self.name.hash(state);
1272 self.spec_version().hash(state);
1273 self.spec_platform().hash(state);
1274 }
1275}
1276
impl ClientOptions {
    /// Test helper: default options marked as having originated from an SRV record.
    #[cfg(test)]
    pub(crate) fn new_srv() -> Self {
        Self {
            original_srv_info: Some(OriginalSrvInfo {
                hostname: "localhost.test.test.build.10gen.cc".into(),
                min_ttl: Duration::from_secs(60),
            }),
            ..Default::default()
        }
    }

    /// Returns a clone of the TLS options when TLS is enabled, and `None` otherwise.
    pub(crate) fn tls_options(&self) -> Option<TlsOptions> {
        match self.tls {
            Some(Tls::Enabled(ref opts)) => Some(opts.clone()),
            _ => None,
        }
    }

    /// Checks the options for invalid values and invalid combinations.
    ///
    /// The checks run in the order written below and the first failure is returned as an
    /// invalid-argument error, so reordering them changes which error is reported when several
    /// apply.
    pub(crate) fn validate(&self) -> Result<()> {
        // A direct connection targets exactly one server.
        if let Some(true) = self.direct_connection {
            if self.hosts.len() > 1 {
                return Err(ErrorKind::InvalidArgument {
                    message: "cannot specify multiple seeds with directConnection=true".to_string(),
                }
                .into());
            }
        }

        if let Some(ref write_concern) = self.write_concern {
            write_concern.validate()?;
        }

        // Load-balanced mode excludes multiple seeds, replica sets and direct connections.
        if self.load_balanced.unwrap_or(false) {
            if self.hosts.len() > 1 {
                return Err(ErrorKind::InvalidArgument {
                    message: "cannot specify multiple seeds with loadBalanced=true".to_string(),
                }
                .into());
            }
            if self.repl_set_name.is_some() {
                return Err(ErrorKind::InvalidArgument {
                    message: "cannot specify replicaSet with loadBalanced=true".to_string(),
                }
                .into());
            }
            if self.direct_connection == Some(true) {
                return Err(ErrorKind::InvalidArgument {
                    message: "cannot specify directConnection=true with loadBalanced=true"
                        .to_string(),
                }
                .into());
            }
        }

        #[cfg(any(
            feature = "zstd-compression",
            feature = "zlib-compression",
            feature = "snappy-compression"
        ))]
        if let Some(ref compressors) = self.compressors {
            for compressor in compressors {
                compressor.validate()?;
            }
        }

        if let Some(0) = self.max_pool_size {
            return Err(Error::invalid_argument("cannot specify maxPoolSize=0"));
        }

        if let Some(0) = self.max_connecting {
            return Err(Error::invalid_argument("cannot specify maxConnecting=0"));
        }

        // Max staleness is checked against the effective heartbeat frequency.
        if let Some(SelectionCriteria::ReadPreference(ref rp)) = self.selection_criteria {
            if let Some(max_staleness) = rp.max_staleness() {
                verify_max_staleness(
                    max_staleness,
                    self.heartbeat_freq.unwrap_or(DEFAULT_HEARTBEAT_FREQUENCY),
                )?;
            }
        }

        if let Some(heartbeat_frequency) = self.heartbeat_freq {
            if heartbeat_frequency < self.min_heartbeat_frequency() {
                return Err(ErrorKind::InvalidArgument {
                    message: format!(
                        "'heartbeat_freq' must be at least {}ms, but {}ms was given",
                        self.min_heartbeat_frequency().as_millis(),
                        heartbeat_frequency.as_millis()
                    ),
                }
                .into());
            }
        }

        // Informational only: log a notice when a hostname looks like CosmosDB or DocumentDB.
        // This never fails validation.
        #[cfg(feature = "tracing-unstable")]
        {
            // Use the SRV hostname when there is one, otherwise the TCP hosts; Unix socket
            // addresses carry no hostname and are skipped.
            let hostnames = if let Some(info) = &self.original_srv_info {
                vec![info.hostname.to_ascii_lowercase()]
            } else {
                self.hosts
                    .iter()
                    .filter_map(|addr| match addr {
                        ServerAddress::Tcp { host, .. } => Some(host.to_ascii_lowercase()),
                        #[cfg(unix)]
                        _ => None,
                    })
                    .collect()
            };
            if hostnames.iter().any(|s| s.ends_with(".cosmos.azure.com")) {
                tracing::info!("You appear to be connected to a CosmosDB cluster. For more information regarding feature compatibility and support please visit https://www.mongodb.com/supportability/cosmosdb");
            }
            if hostnames.iter().any(|s| {
                s.ends_with(".docdb.amazonaws.com") || s.ends_with(".docdb-elastic.amazonaws.com")
            }) {
                tracing::info!("You appear to be connected to a DocumentDB cluster. For more information regarding feature compatibility and support please visit https://www.mongodb.com/supportability/documentdb");
            }
        }

        #[cfg(feature = "socks5-proxy")]
        {
            if let Some(proxy) = self.socks5_proxy.as_ref() {
                // With a proxy configured, every host must be a TCP address.
                if self
                    .hosts
                    .iter()
                    .any(|address| !matches!(address, ServerAddress::Tcp { .. }))
                {
                    return Err(Error::invalid_argument(
                        "cannot specify a non-TCP address when connected to a proxy host",
                    ));
                }

                if let Some((username, password)) = proxy.authentication.as_ref() {
                    if username.is_empty() || password.is_empty() {
                        return Err(Error::invalid_argument(
                            "cannot specify an empty username or password for proxy host",
                        ));
                    }
                }
            }
        }

        Ok(())
    }

    /// Returns the test options, creating default ones first if none are set.
    #[cfg(test)]
    pub(crate) fn test_options_mut(&mut self) -> &mut TestOptions {
        self.test_options.get_or_insert_with(Default::default)
    }

    /// The smallest heartbeat frequency `validate` accepts: `MIN_HEARTBEAT_FREQUENCY`, unless a
    /// test has overridden it through `TestOptions::min_heartbeat_freq`.
    pub(crate) fn min_heartbeat_frequency(&self) -> Duration {
        #[cfg(test)]
        {
            self.test_options
                .as_ref()
                .and_then(|to| to.min_heartbeat_freq)
                .unwrap_or(MIN_HEARTBEAT_FREQUENCY)
        }

        #[cfg(not(test))]
        {
            MIN_HEARTBEAT_FREQUENCY
        }
    }

    /// The DNS resolver configuration, if any; always `None` without the `dns-resolver`
    /// feature.
    pub(crate) fn resolver_config(&self) -> Option<&ResolverConfig> {
        #[cfg(feature = "dns-resolver")]
        {
            self.resolver_config.as_ref()
        }
        #[cfg(not(feature = "dns-resolver"))]
        {
            None
        }
    }
}
1457
/// Splits `s` at the first occurrence of `delimiter`.
///
/// When the delimiter is absent, the whole string is returned as the left
/// half and the right half is `None`.
///
/// ```text
/// "abc.def"  on "." -> ("abc", Some("def"))
/// "ab.cd.ef" on "." -> ("ab", Some("cd.ef"))
/// "abcdef"   on "." -> ("abcdef", None)
/// ```
fn split_once_left<'a>(s: &'a str, delimiter: &str) -> (&'a str, Option<&'a str>) {
    s.split_once(delimiter)
        .map_or((s, None), |(head, tail)| (head, Some(tail)))
}
1471
/// Splits `s` at the last occurrence of `delimiter`.
///
/// When the delimiter is absent, the whole string is returned as the right
/// half and the left half is `None`.
///
/// ```text
/// "abc.def"  on "." -> (Some("abc"), "def")
/// "ab.cd.ef" on "." -> (Some("ab.cd"), "ef")
/// "abcdef"   on "." -> (None, "abcdef")
/// ```
fn split_once_right<'a>(s: &'a str, delimiter: &str) -> (Option<&'a str>, &'a str) {
    s.rsplit_once(delimiter)
        .map_or((None, s), |(head, tail)| (Some(head), tail))
}
1485
1486fn percent_decode(s: &str, err_message: &str) -> Result<String> {
1487 match percent_encoding::percent_decode_str(s).decode_utf8() {
1488 Ok(result) => Ok(result.to_string()),
1489 Err(_) => Err(ErrorKind::InvalidArgument {
1490 message: err_message.to_string(),
1491 }
1492 .into()),
1493 }
1494}
1495
1496fn percent_encode(s: &str) -> String {
1497 percent_encoding::utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).to_string()
1498}
1499
1500fn validate_and_parse_userinfo(s: &str, userinfo_type: &str) -> Result<String> {
1501 if s.chars().any(|c| USERINFO_RESERVED_CHARACTERS.contains(&c)) {
1502 return Err(Error::invalid_argument(format!(
1503 "{userinfo_type} must be URL encoded"
1504 )));
1505 }
1506
1507 if s.split('%')
1510 .skip(1)
1511 .any(|part| part.len() < 2 || part[0..2].chars().any(|c| !c.is_ascii_hexdigit()))
1512 {
1513 return Err(Error::invalid_argument(format!(
1514 "{userinfo_type} cannot contain unescaped %"
1515 )));
1516 }
1517
1518 percent_decode(s, &format!("{userinfo_type} must be URL encoded"))
1519}
1520
1521impl TryFrom<&str> for ConnectionString {
1522 type Error = Error;
1523
1524 fn try_from(value: &str) -> Result<Self> {
1525 Self::parse(value)
1526 }
1527}
1528
1529impl TryFrom<&String> for ConnectionString {
1530 type Error = Error;
1531
1532 fn try_from(value: &String) -> Result<Self> {
1533 Self::parse(value)
1534 }
1535}
1536
1537impl TryFrom<String> for ConnectionString {
1538 type Error = Error;
1539
1540 fn try_from(value: String) -> Result<Self> {
1541 Self::parse(value)
1542 }
1543}
1544
1545impl ConnectionString {
1546 pub fn parse(s: impl AsRef<str>) -> Result<Self> {
1549 let s = s.as_ref();
1550
1551 let Some((scheme, after_scheme)) = s.split_once("://") else {
1552 return Err(Error::invalid_argument(
1553 "connection string contains no scheme",
1554 ));
1555 };
1556
1557 let srv = match scheme {
1558 "mongodb" => false,
1559 #[cfg(feature = "dns-resolver")]
1560 "mongodb+srv" => true,
1561 #[cfg(not(feature = "dns-resolver"))]
1562 "mongodb+srv" => {
1563 return Err(Error::invalid_argument(
1564 "mongodb+srv connection strings cannot be used when the 'dns-resolver' \
1565 feature is disabled",
1566 ))
1567 }
1568 other => {
1569 return Err(Error::invalid_argument(format!(
1570 "unsupported connection string scheme: {other}"
1571 )))
1572 }
1573 };
1574
1575 let (pre_options, options) = split_once_left(after_scheme, "?");
1576 let (user_info, hosts_and_auth_db) = split_once_right(pre_options, "@");
1577
1578 let authentication_requested = user_info.is_some();
1580 let (username, password) = match user_info {
1581 Some(user_info) => {
1582 let (username, password) = split_once_left(user_info, ":");
1583 let username = if username.is_empty() {
1584 None
1585 } else {
1586 Some(validate_and_parse_userinfo(username, "username")?)
1587 };
1588 let password = match password {
1589 Some(password) => Some(validate_and_parse_userinfo(password, "password")?),
1590 None => None,
1591 };
1592 (username, password)
1593 }
1594 None => (None, None),
1595 };
1596
1597 let (hosts, auth_db) = split_once_left(hosts_and_auth_db, "/");
1598
1599 let hosts = hosts
1600 .split(",")
1601 .map(ServerAddress::parse)
1602 .collect::<Result<Vec<ServerAddress>>>()?;
1603 let host_info = if !srv {
1604 HostInfo::HostIdentifiers(hosts)
1605 } else {
1606 match &hosts[..] {
1607 [ServerAddress::Tcp { host, port: None }] => HostInfo::DnsRecord(host.clone()),
1608 [ServerAddress::Tcp {
1609 host: _,
1610 port: Some(_),
1611 }] => {
1612 return Err(Error::invalid_argument(
1613 "a port cannot be specified with 'mongodb+srv'",
1614 ));
1615 }
1616 #[cfg(unix)]
1617 [ServerAddress::Unix { .. }] => {
1618 return Err(Error::invalid_argument(
1619 "unix sockets cannot be used with 'mongodb+srv'",
1620 ));
1621 }
1622 _ => {
1623 return Err(Error::invalid_argument(
1624 "exactly one host must be specified with 'mongodb+srv'",
1625 ))
1626 }
1627 }
1628 };
1629
1630 let db = match auth_db {
1631 Some("") | None => None,
1632 Some(db) => {
1633 let decoded = percent_decode(db, "database name must be URL encoded")?;
1634 for c in decoded.chars() {
1635 if ILLEGAL_DATABASE_CHARACTERS.contains(&c) {
1636 return Err(Error::invalid_argument(format!(
1637 "illegal character in database name: {c}"
1638 )));
1639 }
1640 }
1641 Some(decoded)
1642 }
1643 };
1644
1645 let mut conn_str = ConnectionString {
1646 host_info,
1647 #[cfg(test)]
1648 original_uri: s.into(),
1649 ..Default::default()
1650 };
1651
1652 let mut parts = match options {
1653 Some(options) => conn_str.parse_options(options)?,
1654 None => ConnectionStringParts::default(),
1655 };
1656
1657 if conn_str.srv_service_name.is_some() && !srv {
1658 return Err(Error::invalid_argument(
1659 "srvServiceName cannot be specified with a non-SRV URI",
1660 ));
1661 }
1662
1663 if let Some(srv_max_hosts) = conn_str.srv_max_hosts {
1664 if !srv {
1665 return Err(Error::invalid_argument(
1666 "srvMaxHosts cannot be specified with a non-SRV URI",
1667 ));
1668 }
1669 if srv_max_hosts > 0 {
1670 if conn_str.replica_set.is_some() {
1671 return Err(Error::invalid_argument(
1672 "srvMaxHosts and replicaSet cannot both be present",
1673 ));
1674 }
1675 if conn_str.load_balanced == Some(true) {
1676 return Err(Error::invalid_argument(
1677 "srvMaxHosts and loadBalanced=true cannot both be present",
1678 ));
1679 }
1680 }
1681 }
1682
1683 if let Some(username) = username {
1684 let credential = conn_str.credential.get_or_insert_with(Default::default);
1685 credential.username = Some(username);
1686 credential.password = password;
1687 }
1688
1689 if parts.auth_source.as_deref() == Some("") {
1690 return Err(ErrorKind::InvalidArgument {
1691 message: "empty authSource provided".to_string(),
1692 }
1693 .into());
1694 }
1695
1696 match parts.auth_mechanism {
1697 Some(ref mechanism) => {
1698 let credential = conn_str.credential.get_or_insert_with(Default::default);
1699 credential.source = parts.auth_source;
1700
1701 if let Some(mut doc) = parts.auth_mechanism_properties.take() {
1702 match doc.remove("CANONICALIZE_HOST_NAME") {
1703 Some(Bson::String(s)) => {
1704 let val = match &s.to_lowercase()[..] {
1705 "true" => Bson::Boolean(true),
1706 "false" => Bson::Boolean(false),
1707 "none" | "forward" | "forwardandreverse" => Bson::String(s),
1708 _ => {
1709 return Err(ErrorKind::InvalidArgument {
1710 message: format!(
1711 "Invalid CANONICALIZE_HOST_NAME value: {s}. Valid \
1712 values are 'none', 'forward', 'forwardAndReverse', \
1713 'true', 'false'"
1714 ),
1715 }
1716 .into());
1717 }
1718 };
1719 doc.insert("CANONICALIZE_HOST_NAME", val);
1720 }
1721 Some(val) => {
1722 doc.insert("CANONICALIZE_HOST_NAME", val);
1723 }
1724 None => {}
1725 }
1726
1727 credential.mechanism_properties = Some(doc);
1728 }
1729
1730 #[cfg(feature = "gssapi-auth")]
1731 if mechanism == &AuthMechanism::Gssapi {
1732 let mut doc = if let Some(doc) = credential.mechanism_properties.take() {
1734 doc
1735 } else {
1736 Document::new()
1737 };
1738
1739 if !doc.contains_key("SERVICE_NAME") {
1740 doc.insert("SERVICE_NAME", "mongodb");
1741 }
1742
1743 credential.mechanism_properties = Some(doc);
1744 }
1745
1746 credential.mechanism = Some(mechanism.clone());
1747 mechanism.validate_credential(credential)?;
1748 }
1749 None => {
1750 if let Some(ref mut credential) = conn_str.credential {
1751 credential.source = parts.auth_source;
1752 } else if authentication_requested {
1753 return Err(ErrorKind::InvalidArgument {
1754 message: "username and mechanism both not provided, but authentication \
1755 was requested"
1756 .to_string(),
1757 }
1758 .into());
1759 }
1760 }
1761 };
1762
1763 conn_str.default_database = db;
1765
1766 if conn_str.tls.is_none() && conn_str.is_srv() {
1767 conn_str.tls = Some(Tls::Enabled(Default::default()));
1768 }
1769
1770 #[cfg(feature = "socks5-proxy")]
1771 {
1772 if let Some(host) = parts.proxy_host {
1773 let mut proxy = Socks5Proxy::builder().host(host).build();
1774 if let Some(port) = parts.proxy_port {
1775 proxy.port = Some(port);
1776 }
1777 match (parts.proxy_username, parts.proxy_password) {
1778 (Some(username), Some(password)) => {
1779 proxy.authentication = Some((username, password))
1780 }
1781 (None, None) => {}
1782 _ => {
1783 return Err(Error::invalid_argument(
1784 "proxy username and password must both be specified as nonempty \
1785 strings or unset",
1786 ));
1787 }
1788 }
1789 conn_str.socks5_proxy = Some(proxy);
1790 } else {
1791 let error = |option: &str| {
1792 Error::invalid_argument(format!(
1793 "{option} cannot be set if {PROXY_HOST} is unspecified"
1794 ))
1795 };
1796 if parts.proxy_port.is_some() {
1797 return Err(error(PROXY_PORT));
1798 }
1799 if parts.proxy_username.is_some() {
1800 return Err(error(PROXY_USERNAME));
1801 }
1802 if parts.proxy_password.is_some() {
1803 return Err(error(PROXY_PASSWORD));
1804 }
1805 }
1806 }
1807
1808 Ok(conn_str)
1809 }
1810
1811 fn to_uri_str(&self) -> String {
1813 let ConnectionString {
1814 host_info,
1815 app_name,
1816 tls,
1817 heartbeat_frequency,
1818 local_threshold,
1819 read_concern,
1820 replica_set,
1821 write_concern,
1822 server_selection_timeout,
1823 max_pool_size,
1824 min_pool_size,
1825 max_connecting,
1826 max_idle_time,
1827 #[cfg(any(
1828 feature = "zstd-compression",
1829 feature = "zlib-compression",
1830 feature = "snappy-compression"
1831 ))]
1832 compressors,
1833 connect_timeout,
1834 retry_reads,
1835 retry_writes,
1836 server_monitoring_mode: _,
1837 direct_connection,
1838 credential,
1839 default_database,
1840 load_balanced,
1841 #[allow(deprecated)]
1842 socket_timeout,
1843 read_preference,
1844 uuid_representation,
1845 srv_max_hosts,
1846 srv_service_name: _,
1847 wait_queue_timeout,
1848 tls_insecure,
1849 #[cfg(feature = "socks5-proxy")]
1850 socks5_proxy,
1851 #[cfg(test)]
1852 original_uri: _,
1853 } = self;
1854
1855 let mut res: String = String::new();
1856 let mut opts = String::new();
1857
1858 if self.is_srv() {
1859 res.push_str("mongodb+srv://");
1860 } else {
1861 res.push_str("mongodb://");
1862 }
1863
1864 if let Some(credential) = credential {
1865 if let Some(username) = &credential.username {
1866 res.push_str(&percent_encode(username));
1867 if let Some(password) = &credential.password {
1868 res.push_str(&format!(":{}", &percent_encode(password)))
1869 }
1870 res.push('@');
1871 }
1872 }
1873
1874 if self.is_srv() {
1875 if let HostInfo::DnsRecord(dns) = host_info {
1876 res.push_str(dns);
1877 }
1878 } else if let HostInfo::HostIdentifiers(hosts) = host_info {
1879 res.push_str(
1880 &hosts
1881 .iter()
1882 .map(|h| h.to_string())
1883 .collect::<Vec<_>>()
1884 .join(","),
1885 );
1886 }
1887
1888 res.push('/');
1889
1890 if let Some(authdb) = default_database {
1891 res.push_str(authdb);
1892 }
1893
1894 if let Some(replica_set) = replica_set {
1895 opts.push_str(&format!("&replicaSet={replica_set}"));
1896 }
1897
1898 if let Some(direct_connection) = direct_connection {
1899 opts.push_str(&format!("&directConnection={direct_connection}"));
1900 }
1901
1902 if let Some(tls) = tls {
1903 match tls {
1904 Tls::Enabled(options) => {
1905 opts.push_str("&tls=true");
1906
1907 if let Some(cert_key_file_path) = &options.cert_key_file_path {
1908 opts.push_str(&format!(
1909 "&tlsCertificateKeyFile={}",
1910 cert_key_file_path.to_str().unwrap()
1911 ));
1912 }
1913
1914 #[cfg(feature = "cert-key-password")]
1915 if let Some(tls_certificate_key_file_password) =
1916 &options.tls_certificate_key_file_password
1917 {
1918 opts.push_str(&format!(
1919 "&tlsCertificateKeyFilePassword={}",
1920 std::str::from_utf8(tls_certificate_key_file_password).unwrap()
1921 ));
1922 }
1923
1924 if let Some(ca_file_path) = &options.ca_file_path {
1925 opts.push_str(&format!("&tlsCAFile={}", ca_file_path.to_str().unwrap()));
1926 }
1927
1928 if let Some(allow_invalid_certificates) = options.allow_invalid_certificates {
1929 opts.push_str(&format!(
1930 "&tlsAllowInvalidCertificates={allow_invalid_certificates}"
1931 ));
1932 }
1933
1934 #[cfg(feature = "openssl-tls")]
1935 if let Some(allow_invalid_hostnames) = options.allow_invalid_hostnames {
1936 opts.push_str(&format!(
1937 "&tlsAllowInvalidHostnames={allow_invalid_hostnames}"
1938 ));
1939 }
1940
1941 if let Some(tls_insecure) = tls_insecure {
1942 opts.push_str(&format!("&tlsInsecure={tls_insecure}"));
1943 }
1944 }
1945 Tls::Disabled => {
1946 opts.push_str("&tls=false");
1947 }
1948 }
1949 }
1950
1951 if let Some(connect_timeout) = connect_timeout {
1952 opts.push_str(&format!(
1953 "&connectTimeoutMS={}",
1954 connect_timeout.as_millis()
1955 ));
1956 }
1957
1958 if let Some(socket_timeout) = socket_timeout {
1959 opts.push_str(&format!("&socketTimeoutMS={}", socket_timeout.as_millis()));
1960 }
1961
1962 #[cfg(any(
1963 feature = "zstd-compression",
1964 feature = "zlib-compression",
1965 feature = "snappy-compression"
1966 ))]
1967 if let Some(compressors) = compressors {
1968 opts.push_str(&format!(
1969 "&compressors={}",
1970 compressors
1971 .iter()
1972 .map(|c| c.name())
1973 .collect::<Vec<_>>()
1974 .join(",")
1975 ));
1976 }
1977
1978 #[cfg(feature = "zlib-compression")]
1979 if let Some(compressors) = compressors {
1980 for compressor in compressors {
1981 if let Compressor::Zlib { level: Some(level) } = compressor {
1982 opts.push_str(&format!("&zlibCompressionLevel={level}"));
1983 }
1984 }
1985 }
1986
1987 if let Some(max_pool_size) = max_pool_size {
1988 opts.push_str(&format!("&maxPoolSize={max_pool_size}"));
1989 }
1990
1991 if let Some(min_pool_size) = min_pool_size {
1992 opts.push_str(&format!("&minPoolSize={min_pool_size}"));
1993 }
1994
1995 if let Some(max_connecting) = max_connecting {
1996 opts.push_str(&format!("&maxConnecting={max_connecting}"));
1997 }
1998
1999 if let Some(max_idle_time) = max_idle_time {
2000 opts.push_str(&format!("&maxIdleTimeMS={}", max_idle_time.as_millis()));
2001 }
2002
2003 if let Some(wait_queue_timeout) = wait_queue_timeout {
2004 opts.push_str(&format!(
2005 "&waitQueueTimeoutMS={}",
2006 wait_queue_timeout.as_millis()
2007 ));
2008 }
2009
2010 if let Some(write_concern) = write_concern {
2011 if let Some(w) = &write_concern.w {
2012 match w {
2013 Acknowledgment::Nodes(i) => {
2014 opts.push_str(&format!("&w={i}"));
2015 }
2016 Acknowledgment::Majority => {
2017 opts.push_str("&w=majority");
2018 }
2019 Acknowledgment::Custom(tag) => {
2020 opts.push_str(&format!("&w={tag}"));
2021 }
2022 }
2023 }
2024
2025 if let Some(w_timeout) = write_concern.w_timeout {
2026 opts.push_str(&format!("&wtimeoutMS={}", w_timeout.as_millis()));
2027 }
2028
2029 if let Some(journal) = write_concern.journal {
2030 opts.push_str(&format!("&journal={journal}"));
2031 }
2032 }
2033
2034 if let Some(read_concern) = read_concern {
2035 opts.push_str(&format!(
2036 "&readConcernLevel={}",
2037 read_concern.level.as_str()
2038 ));
2039 }
2040
2041 if let Some(read_preference) = read_preference {
2042 opts.push_str(&format!("&readPreference={}", read_preference.mode()));
2043
2044 if let Some(max_staleness) = read_preference.max_staleness() {
2045 opts.push_str(&format!("&maxStalenessSeconds={}", max_staleness.as_secs()));
2046 }
2047
2048 if let Some(tag_sets) = read_preference.tag_sets() {
2049 let ser_tag_set = |tag_set: &HashMap<String, String>| -> String {
2050 let tags = tag_set
2051 .iter()
2052 .map(|(k, v)| format!("{k}:{v}"))
2053 .collect::<Vec<_>>()
2054 .join(",");
2055 format!("&readPreferenceTags={tags}")
2056 };
2057 opts.push_str(
2058 &tag_sets
2059 .iter()
2060 .map(ser_tag_set)
2061 .collect::<Vec<_>>()
2062 .join(""),
2063 )
2064 }
2065 }
2066
2067 if let Some(auth_source) = credential
2068 .as_ref()
2069 .and_then(|c: &Credential| c.source.as_ref())
2070 {
2071 opts.push_str(&format!("&authSource={auth_source}"));
2072 }
2073
2074 if let Some(auth_mechanism) = credential
2075 .as_ref()
2076 .and_then(|c: &Credential| c.mechanism.as_ref())
2077 {
2078 opts.push_str(&format!(
2079 "&authMechanism={}",
2080 auth_mechanism.as_str().to_uppercase()
2081 ));
2082 }
2083
2084 if let Some(auth_mechanism_properties) = credential
2085 .as_ref()
2086 .and_then(|c: &Credential| c.mechanism_properties.as_ref())
2087 {
2088 if !auth_mechanism_properties.is_empty() {
2089 opts.push_str(&format!(
2090 "&authMechanismProperties={}",
2091 &auth_mechanism_properties
2092 .iter()
2093 .map(|(k, v)| format!("{k}:{v}"))
2094 .collect::<Vec<_>>()
2095 .join(",")
2096 ))
2097 }
2098 }
2099
2100 if let Some(local_threshold) = local_threshold {
2101 opts.push_str(&format!(
2102 "&localThresholdMS={}",
2103 local_threshold.as_millis()
2104 ));
2105 }
2106
2107 if let Some(server_selection_timeout) = server_selection_timeout {
2108 opts.push_str(&format!(
2109 "&serverSelectionTimeoutMS={}",
2110 server_selection_timeout.as_millis()
2111 ));
2112 }
2113
2114 if let Some(heartbeat_frequency) = heartbeat_frequency {
2115 opts.push_str(&format!(
2116 "&heartbeatFrequencyMS={}",
2117 heartbeat_frequency.as_millis()
2118 ));
2119 }
2120
2121 if let Some(app_name) = app_name {
2122 opts.push_str(&format!("&appName={app_name}"));
2123 }
2124
2125 if let Some(retry_reads) = retry_reads {
2126 opts.push_str(&format!("&retryReads={retry_reads}"));
2127 }
2128
2129 if let Some(retry_writes) = retry_writes {
2130 opts.push_str(&format!("&retryWrites={retry_writes}"));
2131 }
2132
2133 if let Some(uuid_rep) = uuid_representation {
2134 let s = match uuid_rep {
2135 UuidRepresentation::Standard => "standard",
2136 UuidRepresentation::CSharpLegacy => "csharpLegacy",
2137 UuidRepresentation::JavaLegacy => "javaLegacy",
2138 UuidRepresentation::PythonLegacy => "pythonLegacy",
2139 _ => "",
2140 };
2141 opts.push_str(&format!("&uuidRepresentation={s}"));
2142 }
2143
2144 if let Some(load_balanced) = load_balanced {
2145 opts.push_str(&format!("&loadBalanced={load_balanced}"));
2146 }
2147
2148 if let Some(srv_max_hosts) = srv_max_hosts {
2149 opts.push_str(&format!("&srvMaxHosts={srv_max_hosts}"));
2150 }
2151
2152 #[cfg(feature = "socks5-proxy")]
2153 if let Some(proxy) = socks5_proxy {
2154 opts.push_str(&format!("&proxyHost={}", proxy.host));
2155 if let Some(port) = proxy.port {
2156 opts.push_str(&format!("&proxyPort={port}"));
2157 }
2158 if let Some((username, password)) = proxy.authentication.as_ref() {
2159 opts.push_str(&format!(
2160 "&proxyUsername={username}&proxyPassword={password}"
2161 ));
2162 }
2163 }
2164
2165 if !opts.is_empty() {
2166 opts.replace_range(0..1, "?"); res.push_str(&opts);
2168 }
2169
2170 res
2171 }
2172
2173 pub fn wait_queue_timeout(&self) -> Option<Duration> {
2176 self.wait_queue_timeout
2177 }
2178
2179 pub fn tls_insecure(&self) -> Option<bool> {
2184 self.tls_insecure
2185 }
2186
2187 fn is_srv(&self) -> bool {
2188 matches!(self.host_info, HostInfo::DnsRecord(_))
2189 }
2190
2191 fn parse_options(&mut self, options: &str) -> Result<ConnectionStringParts> {
2192 let mut parts = ConnectionStringParts::default();
2193 if options.is_empty() {
2194 return Ok(parts);
2195 }
2196
2197 let mut keys = HashSet::new();
2198
2199 for option_pair in options.split('&') {
2200 let (key, value) = match option_pair.split_once('=') {
2201 Some((key, value)) => (key.to_lowercase(), value),
2202 None => {
2203 return Err(Error::invalid_argument(format!(
2204 "connection string option is not a 'key=value' pair: {option_pair}"
2205 )))
2206 }
2207 };
2208
2209 if !keys.insert(key.clone()) && key != "readpreferencetags" {
2210 return Err(Error::invalid_argument(
2211 "repeated options are not allowed in the connection string",
2212 ));
2213 }
2214
2215 self.parse_option_pair(
2216 &mut parts,
2217 &key,
2218 percent_encoding::percent_decode(value.as_bytes())
2219 .decode_utf8_lossy()
2220 .as_ref(),
2221 )?;
2222 }
2223
2224 if keys.contains(TLS_INSECURE) {
2225 #[cfg(feature = "openssl-tls")]
2226 let disallowed = [TLS_ALLOW_INVALID_CERTIFICATES, TLS_ALLOW_INVALID_HOSTNAMES];
2227 #[cfg(not(feature = "openssl-tls"))]
2228 let disallowed = [TLS_ALLOW_INVALID_CERTIFICATES];
2229 for option in disallowed {
2230 if keys.contains(option) {
2231 return Err(Error::invalid_argument(format!(
2232 "cannot set both {TLS_INSECURE} and {option} in the connection string"
2233 )));
2234 }
2235 }
2236 }
2237
2238 if let Some(tags) = parts.read_preference_tags.take() {
2239 self.read_preference = match self.read_preference.take() {
2240 Some(read_pref) => Some(read_pref.with_tags(tags)?),
2241 None => {
2242 return Err(ErrorKind::InvalidArgument {
2243 message: "cannot set read preference tags without also setting read \
2244 preference mode"
2245 .to_string(),
2246 }
2247 .into())
2248 }
2249 };
2250 }
2251
2252 if let Some(max_staleness) = parts.max_staleness.take() {
2253 self.read_preference = match self.read_preference.take() {
2254 Some(read_pref) => Some(read_pref.with_max_staleness(max_staleness)?),
2255 None => {
2256 return Err(ErrorKind::InvalidArgument {
2257 message: "cannot set max staleness without also setting read preference \
2258 mode"
2259 .to_string(),
2260 }
2261 .into())
2262 }
2263 };
2264 }
2265
2266 if let Some(true) = self.direct_connection {
2267 if self.is_srv() {
2268 return Err(ErrorKind::InvalidArgument {
2269 message: "cannot use SRV-style URI with directConnection=true".to_string(),
2270 }
2271 .into());
2272 }
2273 }
2274
2275 #[cfg(feature = "zlib-compression")]
2276 if let Some(zlib_compression_level) = parts.zlib_compression {
2277 if let Some(compressors) = self.compressors.as_mut() {
2278 for compressor in compressors {
2279 compressor.write_zlib_level(zlib_compression_level)?;
2280 }
2281 }
2282 }
2283 #[cfg(not(feature = "zlib-compression"))]
2284 if parts.zlib_compression.is_some() {
2285 return Err(ErrorKind::InvalidArgument {
2286 message: "zlibCompressionLevel may not be specified without the zlib-compression \
2287 feature flag enabled"
2288 .into(),
2289 }
2290 .into());
2291 }
2292
2293 Ok(parts)
2294 }
2295
2296 fn parse_option_pair(
2297 &mut self,
2298 parts: &mut ConnectionStringParts,
2299 key: &str,
2300 value: &str,
2301 ) -> Result<()> {
2302 macro_rules! get_bool {
2303 ($value:expr, $option:expr) => {
2304 match $value {
2305 "true" => true,
2306 "false" => false,
2307 _ => {
2308 return Err(ErrorKind::InvalidArgument {
2309 message: format!(
2310 "connection string `{}` option must be a boolean",
2311 $option,
2312 ),
2313 }
2314 .into())
2315 }
2316 }
2317 };
2318 }
2319
2320 macro_rules! get_duration {
2321 ($value:expr, $option:expr) => {
2322 match $value.parse::<u64>() {
2323 Ok(i) => i,
2324 _ => {
2325 return Err(ErrorKind::InvalidArgument {
2326 message: format!(
2327 "connection string `{}` option must be a non-negative integer",
2328 $option
2329 ),
2330 }
2331 .into())
2332 }
2333 }
2334 };
2335 }
2336
2337 macro_rules! get_u32 {
2338 ($value:expr, $option:expr) => {
2339 match value.parse::<u32>() {
2340 Ok(u) => u,
2341 Err(_) => {
2342 return Err(ErrorKind::InvalidArgument {
2343 message: format!(
2344 "connection string `{}` argument must be a positive integer",
2345 $option,
2346 ),
2347 }
2348 .into())
2349 }
2350 }
2351 };
2352 }
2353
2354 macro_rules! get_i32 {
2355 ($value:expr, $option:expr) => {
2356 match value.parse::<i32>() {
2357 Ok(u) => u,
2358 Err(_) => {
2359 return Err(ErrorKind::InvalidArgument {
2360 message: format!(
2361 "connection string `{}` argument must be an integer",
2362 $option
2363 ),
2364 }
2365 .into())
2366 }
2367 }
2368 };
2369 }
2370
2371 match key {
2372 "appname" => {
2373 self.app_name = Some(value.into());
2374 }
2375 "authmechanism" => {
2376 parts.auth_mechanism = Some(AuthMechanism::from_str(value)?);
2377 }
2378 "authsource" => parts.auth_source = Some(value.to_string()),
2379 "authmechanismproperties" => {
2380 let mut properties = Document::new();
2381
2382 for property in value.split(",") {
2383 let Some((k, v)) = property.split_once(":") else {
2384 return Err(Error::invalid_argument(format!(
2385 "each entry in authMechanismProperties must be a colon-separated \
2386 key-value pair, got {property}"
2387 )));
2388 };
2389 if k == "ALLOWED_HOSTS" || k == "OIDC_CALLBACK" || k == "OIDC_HUMAN_CALLBACK" {
2390 return Err(Error::invalid_argument(format!(
2391 "{k} must only be specified through client options"
2392 )));
2393 }
2394 properties.insert(k, v);
2395 }
2396
2397 parts.auth_mechanism_properties = Some(properties);
2398 }
2399 #[cfg(any(
2400 feature = "zstd-compression",
2401 feature = "zlib-compression",
2402 feature = "snappy-compression"
2403 ))]
2404 "compressors" => {
2405 let mut compressors: Option<Vec<Compressor>> = None;
2406 for compressor in value.split(',') {
2407 let compressor = Compressor::from_str(compressor)?;
2408 compressors
2409 .get_or_insert_with(Default::default)
2410 .push(compressor);
2411 }
2412 self.compressors = compressors;
2413 }
2414 k @ "connecttimeoutms" => {
2415 self.connect_timeout = Some(Duration::from_millis(get_duration!(value, k)));
2416 }
2417 k @ "directconnection" => {
2418 self.direct_connection = Some(get_bool!(value, k));
2419 }
2420 k @ "heartbeatfrequencyms" => {
2421 self.heartbeat_frequency = Some(Duration::from_millis(get_duration!(value, k)));
2422 }
2423 k @ "journal" => {
2424 let write_concern = self.write_concern.get_or_insert_with(Default::default);
2425 write_concern.journal = Some(get_bool!(value, k));
2426 }
2427 k @ "loadbalanced" => {
2428 self.load_balanced = Some(get_bool!(value, k));
2429 }
2430 k @ "localthresholdms" => {
2431 self.local_threshold = Some(Duration::from_millis(get_duration!(value, k)))
2432 }
2433 k @ "maxidletimems" => {
2434 self.max_idle_time = Some(Duration::from_millis(get_duration!(value, k)));
2435 }
2436 "maxstalenessseconds" => {
2437 let max_staleness_seconds = value.parse::<i64>().map_err(|e| {
2438 Error::invalid_argument(format!("invalid maxStalenessSeconds value: {e}"))
2439 })?;
2440
2441 let max_staleness = match max_staleness_seconds.cmp(&-1) {
2442 Ordering::Less => {
2443 return Err(Error::invalid_argument(format!(
2444 "maxStalenessSeconds must be -1 or positive, instead got \
2445 {max_staleness_seconds}"
2446 )));
2447 }
2448 Ordering::Equal => {
2449 return Ok(());
2451 }
2452 Ordering::Greater => {
2453 Duration::from_secs(max_staleness_seconds.try_into().unwrap())
2455 }
2456 };
2457
2458 parts.max_staleness = Some(max_staleness);
2459 }
2460 k @ "maxpoolsize" => {
2461 self.max_pool_size = Some(get_u32!(value, k));
2462 }
2463 k @ "minpoolsize" => {
2464 self.min_pool_size = Some(get_u32!(value, k));
2465 }
2466 k @ "maxconnecting" => {
2467 self.max_connecting = Some(get_u32!(value, k));
2468 }
2469 "readconcernlevel" => {
2470 self.read_concern = Some(ReadConcernLevel::from_str(value).into());
2471 }
2472 "readpreference" => {
2473 self.read_preference = Some(match &value.to_lowercase()[..] {
2474 "primary" => ReadPreference::Primary,
2475 "secondary" => ReadPreference::Secondary {
2476 options: Default::default(),
2477 },
2478 "primarypreferred" => ReadPreference::PrimaryPreferred {
2479 options: Default::default(),
2480 },
2481 "secondarypreferred" => ReadPreference::SecondaryPreferred {
2482 options: Default::default(),
2483 },
2484 "nearest" => ReadPreference::Nearest {
2485 options: Default::default(),
2486 },
2487 other => {
2488 return Err(ErrorKind::InvalidArgument {
2489 message: format!("'{other}' is not a valid read preference"),
2490 }
2491 .into())
2492 }
2493 });
2494 }
2495 "readpreferencetags" => {
2496 let tags: Result<TagSet> = if value.is_empty() {
2497 Ok(TagSet::new())
2498 } else {
2499 value
2500 .split(',')
2501 .map(|tag| {
2502 let mut values = tag.split(':');
2503
2504 match (values.next(), values.next()) {
2505 (Some(key), Some(value)) => {
2506 Ok((key.to_string(), value.to_string()))
2507 }
2508 _ => Err(ErrorKind::InvalidArgument {
2509 message: format!(
2510 "'{value}' is not a valid read preference tag (which must \
2511 be of the form 'key:value'",
2512 ),
2513 }
2514 .into()),
2515 }
2516 })
2517 .collect()
2518 };
2519
2520 parts
2521 .read_preference_tags
2522 .get_or_insert_with(Vec::new)
2523 .push(tags?);
2524 }
2525 "replicaset" => {
2526 self.replica_set = Some(value.to_string());
2527 }
2528 k @ "retrywrites" => {
2529 self.retry_writes = Some(get_bool!(value, k));
2530 }
2531 k @ "retryreads" => {
2532 self.retry_reads = Some(get_bool!(value, k));
2533 }
2534 "servermonitoringmode" => {
2535 self.server_monitoring_mode = Some(match value.to_lowercase().as_str() {
2536 "stream" => ServerMonitoringMode::Stream,
2537 "poll" => ServerMonitoringMode::Poll,
2538 "auto" => ServerMonitoringMode::Auto,
2539 other => {
2540 return Err(Error::invalid_argument(format!(
2541 "{other:?} is not a valid server monitoring mode"
2542 )));
2543 }
2544 });
2545 }
2546 k @ "serverselectiontimeoutms" => {
2547 self.server_selection_timeout = Some(Duration::from_millis(get_duration!(value, k)))
2548 }
2549 #[allow(deprecated)]
2550 k @ "sockettimeoutms" => {
2551 self.socket_timeout = Some(Duration::from_millis(get_duration!(value, k)));
2552 }
2553 k @ "srvmaxhosts" => {
2554 self.srv_max_hosts = Some(get_u32!(value, k));
2555 }
2556 "srvservicename" => {
2557 self.srv_service_name = Some(value.to_string());
2558 }
2559 k @ "tls" | k @ "ssl" => {
2560 let tls = get_bool!(value, k);
2561
2562 match self.tls {
2563 Some(Tls::Enabled(_)) if !tls => {
2564 return Err(Error::invalid_argument(
2565 "cannot set {key}={tls} if other TLS options are set",
2566 ))
2567 }
2568 Some(Tls::Disabled) if tls => {
2569 return Err(Error::invalid_argument(
2570 "cannot set {key}={tls} if TLS is disabled",
2571 ))
2572 }
2573 None => {
2574 if tls {
2575 self.tls = Some(Tls::Enabled(Default::default()))
2576 } else {
2577 self.tls = Some(Tls::Disabled)
2578 }
2579 }
2580 _ => {}
2581 }
2582 }
2583 TLS_INSECURE => {
2584 let val = get_bool!(value, key);
2585 self.tls_insecure = Some(val);
2586
2587 match self
2588 .tls
2589 .get_or_insert_with(|| Tls::Enabled(Default::default()))
2590 {
2591 Tls::Enabled(ref mut options) => {
2592 options.allow_invalid_certificates = Some(val);
2593 #[cfg(feature = "openssl-tls")]
2594 {
2595 options.allow_invalid_hostnames = Some(val);
2596 }
2597 }
2598 Tls::Disabled => {
2599 return Err(Error::invalid_argument(format!(
2600 "cannot set {key} when TLS is disabled"
2601 )));
2602 }
2603 }
2604 }
2605 TLS_ALLOW_INVALID_CERTIFICATES => {
2606 let val = get_bool!(value, key);
2607
2608 match self
2609 .tls
2610 .get_or_insert_with(|| Tls::Enabled(Default::default()))
2611 {
2612 Tls::Enabled(ref mut options) => {
2613 options.allow_invalid_certificates = Some(val);
2614 }
2615 Tls::Disabled => {
2616 return Err(Error::invalid_argument(format!(
2617 "cannot set {key} when TLS is disabled"
2618 )))
2619 }
2620 }
2621 }
2622 "tlscafile" => match self.tls {
2623 Some(Tls::Disabled) => {
2624 return Err(ErrorKind::InvalidArgument {
2625 message: "'tlsCAFile' can't be set if tls=false".into(),
2626 }
2627 .into());
2628 }
2629 Some(Tls::Enabled(ref mut options)) => {
2630 options.ca_file_path = Some(value.into());
2631 }
2632 None => {
2633 self.tls = Some(Tls::Enabled(
2634 TlsOptions::builder()
2635 .ca_file_path(PathBuf::from(value))
2636 .build(),
2637 ))
2638 }
2639 },
2640 "tlscertificatekeyfile" => match self.tls {
2641 Some(Tls::Disabled) => {
2642 return Err(ErrorKind::InvalidArgument {
2643 message: "'tlsCertificateKeyFile' can't be set if tls=false".into(),
2644 }
2645 .into());
2646 }
2647 Some(Tls::Enabled(ref mut options)) => {
2648 options.cert_key_file_path = Some(value.into());
2649 }
2650 None => {
2651 self.tls = Some(Tls::Enabled(
2652 TlsOptions::builder()
2653 .cert_key_file_path(PathBuf::from(value))
2654 .build(),
2655 ))
2656 }
2657 },
2658 #[cfg(feature = "cert-key-password")]
2659 "tlscertificatekeyfilepassword" => match &mut self.tls {
2660 Some(Tls::Disabled) => {
2661 return Err(ErrorKind::InvalidArgument {
2662 message: "'tlsCertificateKeyFilePassword' can't be set if tls=false".into(),
2663 }
2664 .into());
2665 }
2666 Some(Tls::Enabled(options)) => {
2667 options.tls_certificate_key_file_password = Some(value.as_bytes().to_vec());
2668 }
2669 None => {
2670 self.tls = Some(Tls::Enabled(
2671 TlsOptions::builder()
2672 .tls_certificate_key_file_password(value.as_bytes().to_vec())
2673 .build(),
2674 ))
2675 }
2676 },
2677 #[cfg(not(feature = "cert-key-password"))]
2678 "tlscertificatekeyfilepassword" => {
2679 return Err(Error::invalid_argument(
2680 "the cert-key-password feature must be enabled to specify \
2681 tlsCertificateKeyFilePassword in the URI",
2682 ));
2683 }
2684 "uuidrepresentation" => match value.to_lowercase().as_str() {
2685 "csharplegacy" => self.uuid_representation = Some(UuidRepresentation::CSharpLegacy),
2686 "javalegacy" => self.uuid_representation = Some(UuidRepresentation::JavaLegacy),
2687 "pythonlegacy" => self.uuid_representation = Some(UuidRepresentation::PythonLegacy),
2688 _ => {
2689 return Err(ErrorKind::InvalidArgument {
2690 message: format!(
2691 "connection string `uuidRepresentation` option can be one of \
2692 `csharpLegacy`, `javaLegacy`, or `pythonLegacy`. Received invalid \
2693 `{value}`"
2694 ),
2695 }
2696 .into())
2697 }
2698 },
2699 "w" => {
2700 let write_concern = self.write_concern.get_or_insert_with(Default::default);
2701
2702 match value.parse::<i32>() {
2703 Ok(w) => match u32::try_from(w) {
2704 Ok(uw) => write_concern.w = Some(Acknowledgment::from(uw)),
2705 Err(_) => {
2706 return Err(ErrorKind::InvalidArgument {
2707 message: "connection string `w` option cannot be a negative \
2708 integer"
2709 .to_string(),
2710 }
2711 .into())
2712 }
2713 },
2714 Err(_) => {
2715 write_concern.w = Some(Acknowledgment::from(value.to_string()));
2716 }
2717 };
2718 }
2719 k @ "waitqueuetimeoutms" => {
2720 self.wait_queue_timeout = Some(Duration::from_millis(get_duration!(value, k)));
2721 }
2722 k @ "wtimeoutms" => {
2723 let write_concern = self.write_concern.get_or_insert_with(Default::default);
2724 write_concern.w_timeout = Some(Duration::from_millis(get_duration!(value, k)));
2725 }
2726 k @ "zlibcompressionlevel" => {
2727 let i = get_i32!(value, k);
2728 if i < -1 {
2729 return Err(ErrorKind::InvalidArgument {
2730 message: "'zlibCompressionLevel' cannot be less than -1".to_string(),
2731 }
2732 .into());
2733 }
2734
2735 if i > 9 {
2736 return Err(ErrorKind::InvalidArgument {
2737 message: "'zlibCompressionLevel' cannot be greater than 9".to_string(),
2738 }
2739 .into());
2740 }
2741
2742 parts.zlib_compression = Some(i);
2743 }
2744 #[cfg(feature = "socks5-proxy")]
2745 PROXY_HOST => parts.proxy_host = Some(value.to_string()),
2746 #[cfg(feature = "socks5-proxy")]
2747 PROXY_PORT => {
2748 let port = u16::from_str(value)
2749 .map_err(|_| Error::invalid_argument(format!("invalid proxy port: {value}")))?;
2750 parts.proxy_port = Some(port);
2751 }
2752 #[cfg(feature = "socks5-proxy")]
2753 PROXY_USERNAME if !value.is_empty() => parts.proxy_username = Some(value.to_string()),
2754 #[cfg(feature = "socks5-proxy")]
2755 PROXY_PASSWORD if !value.is_empty() => parts.proxy_password = Some(value.to_string()),
2756 #[cfg(not(feature = "socks5-proxy"))]
2757 PROXY_HOST | PROXY_PORT | PROXY_USERNAME | PROXY_PASSWORD => {
2758 return Err(Error::invalid_argument(format!(
2759 "cannot specify {key} if socks5-proxy feature is not enabled"
2760 )));
2761 }
2762
2763 other => {
2764 let (jaro_winkler, option) = URI_OPTIONS.iter().fold((0.0, ""), |acc, option| {
2765 let jaro_winkler = jaro_winkler(option, other).abs();
2766 if jaro_winkler > acc.0 {
2767 return (jaro_winkler, option);
2768 }
2769 acc
2770 });
2771 let mut message = format!("{other} is an invalid option");
2772 if jaro_winkler >= 0.84 {
2773 let _ = write!(message, ". An option with a similar name exists: {option}");
2774 }
2775 return Err(ErrorKind::InvalidArgument { message }.into());
2776 }
2777 }
2778
2779 Ok(())
2780 }
2781}
2782
2783impl FromStr for ConnectionString {
2784 type Err = Error;
2785 fn from_str(s: &str) -> Result<Self> {
2786 ConnectionString::parse(s)
2787 }
2788}
2789
2790impl<'de> Deserialize<'de> for ConnectionString {
2791 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2792 where
2793 D: Deserializer<'de>,
2794 {
2795 deserializer.deserialize_str(ConnectionStringVisitor)
2796 }
2797}
2798
2799impl Display for ConnectionString {
2800 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2801 write!(f, "{}", self.to_uri_str())
2802 }
2803}
2804
2805struct ConnectionStringVisitor;
2806
2807impl serde::de::Visitor<'_> for ConnectionStringVisitor {
2808 type Value = ConnectionString;
2809
2810 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
2811 write!(formatter, "a MongoDB connection string")
2812 }
2813
2814 fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
2815 where
2816 E: serde::de::Error,
2817 {
2818 ConnectionString::parse(v).map_err(serde::de::Error::custom)
2819 }
2820}
2821
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use pretty_assertions::assert_eq;

    use super::{ClientOptions, ServerAddress};
    use crate::{
        concern::{Acknowledgment, ReadConcernLevel, WriteConcern},
        selection_criteria::{ReadPreference, ReadPreferenceOptions},
    };

    // Builds a `HashMap<String, String>` from `key => value` pairs; with no pairs the
    // resulting map is empty.
    macro_rules! tag_set {
        ( $($k:expr => $v:expr),* ) => {
            #[allow(clippy::let_and_return)]
            {
                use std::collections::HashMap;

                #[allow(unused_mut)]
                let mut ts = HashMap::new();
                $(
                    ts.insert($k.to_string(), $v.to_string());
                )*

                ts
            }
        }
    }

    /// Expected TCP address for a host that was written without a port.
    fn host_without_port(hostname: &str) -> ServerAddress {
        ServerAddress::Tcp {
            host: hostname.to_string(),
            port: None,
        }
    }

    /// Expected TCP address for a host that was written with an explicit port.
    fn host_with_port(hostname: &str, port: u16) -> ServerAddress {
        ServerAddress::Tcp {
            host: hostname.to_string(),
            port: Some(port),
        }
    }

    #[test]
    fn test_parse_address_with_from_str() {
        let x = "localhost:27017".parse::<ServerAddress>().unwrap();
        match x {
            ServerAddress::Tcp { host, port } => {
                assert_eq!(host, "localhost");
                assert_eq!(port, Some(27017));
            }
            #[cfg(unix)]
            _ => panic!("expected ServerAddress::Tcp"),
        }

        // A bare hostname is accepted and leaves the port unset.
        let x = "localhost".parse::<ServerAddress>().unwrap();
        match x {
            ServerAddress::Tcp { host, port } => {
                assert_eq!(host, "localhost");
                assert_eq!(port, None);
            }
            #[cfg(unix)]
            _ => panic!("expected ServerAddress::Tcp"),
        }

        let x = "localhost:not a number".parse::<ServerAddress>();
        assert!(x.is_err());

        #[cfg(unix)]
        {
            let x = "/path/to/socket.sock".parse::<ServerAddress>().unwrap();
            match x {
                ServerAddress::Unix { path } => {
                    assert_eq!(path.to_str().unwrap(), "/path/to/socket.sock");
                }
                _ => panic!("expected ServerAddress::Unix"),
            }
        }
    }

    #[tokio::test]
    async fn fails_without_scheme() {
        assert!(ClientOptions::parse("localhost:27017").await.is_err());
    }

    #[tokio::test]
    async fn fails_with_invalid_scheme() {
        assert!(ClientOptions::parse("mangodb://localhost:27017")
            .await
            .is_err());
    }

    #[tokio::test]
    async fn fails_with_nothing_after_scheme() {
        assert!(ClientOptions::parse("mongodb://").await.is_err());
    }

    #[tokio::test]
    async fn fails_with_only_slash_after_scheme() {
        assert!(ClientOptions::parse("mongodb:///").await.is_err());
    }

    #[tokio::test]
    async fn fails_with_no_host() {
        assert!(ClientOptions::parse("mongodb://:27017").await.is_err());
    }

    #[tokio::test]
    async fn no_port() {
        let uri = "mongodb://localhost";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_without_port("localhost")],
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn no_port_trailing_slash() {
        let uri = "mongodb://localhost/";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_without_port("localhost")],
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_port() {
        // Explicit port, no trailing slash. The slash-terminated variant is covered by
        // `with_port_and_trailing_slash`, and the port-less one by `no_port_trailing_slash`.
        let uri = "mongodb://localhost:27017";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_port_and_trailing_slash() {
        let uri = "mongodb://localhost:27017/";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_read_concern() {
        let uri = "mongodb://localhost:27017/?readConcernLevel=foo";

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                read_concern: Some(ReadConcernLevel::Custom("foo".to_string()).into()),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_w_negative_int() {
        assert!(ClientOptions::parse("mongodb://localhost:27017/?w=-1")
            .await
            .is_err());
    }

    #[tokio::test]
    async fn with_w_non_negative_int() {
        let uri = "mongodb://localhost:27017/?w=1";
        let write_concern = WriteConcern::builder().w(Acknowledgment::from(1)).build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_w_string() {
        let uri = "mongodb://localhost:27017/?w=foo";
        let write_concern = WriteConcern::builder()
            .w(Acknowledgment::from("foo".to_string()))
            .build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_invalid_j() {
        assert!(
            ClientOptions::parse("mongodb://localhost:27017/?journal=foo")
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn with_j() {
        let uri = "mongodb://localhost:27017/?journal=true";
        let write_concern = WriteConcern::builder().journal(true).build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_wtimeout_non_int() {
        assert!(
            ClientOptions::parse("mongodb://localhost:27017/?wtimeoutMS=foo")
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn with_wtimeout_negative_int() {
        assert!(
            ClientOptions::parse("mongodb://localhost:27017/?wtimeoutMS=-1")
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn with_wtimeout() {
        let uri = "mongodb://localhost:27017/?wtimeoutMS=27";
        let write_concern = WriteConcern::builder()
            .w_timeout(Duration::from_millis(27))
            .build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_all_write_concern_options() {
        let uri = "mongodb://localhost:27017/?w=majority&journal=false&wtimeoutMS=27";
        let write_concern = WriteConcern::builder()
            .w(Acknowledgment::Majority)
            .journal(false)
            .w_timeout(Duration::from_millis(27))
            .build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![host_with_port("localhost", 27017)],
                write_concern: Some(write_concern),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }

    #[tokio::test]
    async fn with_mixed_options() {
        let uri = "mongodb://localhost,localhost:27018/?w=majority&readConcernLevel=majority&\
                   journal=false&wtimeoutMS=27&replicaSet=foo&heartbeatFrequencyMS=1000&\
                   localThresholdMS=4000&readPreference=secondaryPreferred&readpreferencetags=dc:\
                   ny,rack:1&serverselectiontimeoutms=2000&readpreferencetags=dc:ny&\
                   readpreferencetags=";
        let write_concern = WriteConcern::builder()
            .w(Acknowledgment::Majority)
            .journal(false)
            .w_timeout(Duration::from_millis(27))
            .build();

        assert_eq!(
            ClientOptions::parse(uri).await.unwrap(),
            ClientOptions {
                hosts: vec![
                    host_without_port("localhost"),
                    host_with_port("localhost", 27018),
                ],
                selection_criteria: Some(
                    ReadPreference::SecondaryPreferred {
                        options: Some(
                            ReadPreferenceOptions::builder()
                                .tag_sets(vec![
                                    tag_set! {
                                        "dc" => "ny",
                                        "rack" => "1"
                                    },
                                    tag_set! {
                                        "dc" => "ny"
                                    },
                                    tag_set! {},
                                ])
                                .build()
                        )
                    }
                    .into()
                ),
                read_concern: Some(ReadConcernLevel::Majority.into()),
                write_concern: Some(write_concern),
                repl_set_name: Some("foo".to_string()),
                heartbeat_freq: Some(Duration::from_millis(1000)),
                local_threshold: Some(Duration::from_millis(4000)),
                server_selection_timeout: Some(Duration::from_millis(2000)),
                original_uri: Some(uri.into()),
                ..Default::default()
            }
        );
    }
}
3197
3198#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
3200#[builder(field_defaults(default, setter(into)))]
3201#[serde(rename_all = "camelCase")]
3202#[non_exhaustive]
3203#[export_tokens]
3204pub struct SessionOptions {
3205 pub default_transaction_options: Option<TransactionOptions>,
3213
3214 pub causal_consistency: Option<bool>,
3219
3220 pub snapshot: Option<bool>,
3223
3224 pub snapshot_time: Option<Timestamp>,
3227}
3228
3229impl SessionOptions {
3230 pub(crate) fn validate(&self) -> Result<()> {
3231 if let (Some(causal_consistency), Some(snapshot)) = (self.causal_consistency, self.snapshot)
3232 {
3233 if causal_consistency && snapshot {
3234 return Err(Error::invalid_argument(
3235 "snapshot and causal consistency are mutually exclusive",
3236 ));
3237 }
3238 }
3239 if self.snapshot_time.is_some() && self.snapshot != Some(true) {
3240 return Err(Error::invalid_argument(
3241 "cannot set `snapshot_time` without setting `snapshot` to true",
3242 ));
3243 }
3244 Ok(())
3245 }
3246}
3247
3248#[skip_serializing_none]
3250#[derive(Debug, Default, Serialize, Deserialize, TypedBuilder, Clone)]
3251#[builder(field_defaults(default, setter(into)))]
3252#[serde(rename_all = "camelCase")]
3253#[non_exhaustive]
3254#[export_tokens]
3255pub struct TransactionOptions {
3256 #[builder(default)]
3258 #[serde(skip_serializing)]
3259 pub read_concern: Option<ReadConcern>,
3260
3261 #[builder(default)]
3263 #[serde(skip_serializing_if = "write_concern_is_empty")]
3264 pub write_concern: Option<WriteConcern>,
3265
3266 #[builder(default)]
3268 #[serde(skip_serializing, rename = "readPreference")]
3269 pub selection_criteria: Option<SelectionCriteria>,
3270
3271 #[builder(default)]
3273 #[serde(
3274 serialize_with = "serde_util::serialize_duration_option_as_int_millis",
3275 deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis",
3276 rename(serialize = "maxTimeMS", deserialize = "maxCommitTimeMS"),
3277 default
3278 )]
3279 pub max_commit_time: Option<Duration>,
3280}
3281
3282#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
3284#[non_exhaustive]
3285pub enum ServerMonitoringMode {
3286 Stream,
3289 Poll,
3291 Auto,
3294}