1use crate::authenticator::Authenticator;
26use crate::capella_ca::CAPELLA_CERT;
27use crate::error;
28use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
29use std::net::SocketAddr;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33
34#[cfg(feature = "native-tls")]
35use tokio_native_tls::native_tls::Identity;
36
37#[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))]
38use {
39 couchbase_core::insecure_certverfier::InsecureCertVerifier,
40 tokio_rustls::rustls::crypto::aws_lc_rs::default_provider,
41 tokio_rustls::rustls::pki_types::pem::{PemObject, SectionKind},
42 tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer},
43 tokio_rustls::rustls::RootCertStore,
44 webpki_roots::TLS_SERVER_ROOTS,
45};
46
47#[derive(Clone)]
66#[non_exhaustive]
67pub struct ClusterOptions {
68 pub authenticator: Authenticator,
70 pub compression_mode: Option<CompressionMode>,
72 pub tls_options: Option<TlsOptions>,
74 pub tcp_keep_alive_time: Option<Duration>,
76 pub poller_options: PollerOptions,
78 pub http_options: HttpOptions,
80 pub kv_options: KvOptions,
82 pub dns_options: Option<DnsOptions>,
84 pub orphan_reporter_options: OrphanReporterOptions,
86 pub default_retry_strategy: Option<Arc<dyn crate::retry::RetryStrategy>>,
89}
90
91impl Debug for ClusterOptions {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct("ClusterOptions")
94 .field("authenticator", &self.authenticator)
95 .field("compression_mode", &self.compression_mode)
96 .field("has_tls_options", &self.tls_options.is_some())
97 .field("tcp_keep_alive_time", &self.tcp_keep_alive_time)
98 .field("poller_options", &self.poller_options)
99 .field("http_options", &self.http_options)
100 .field("kv_options", &self.kv_options)
101 .field("orphan_reporter_options", &self.orphan_reporter_options)
102 .finish()
103 }
104}
105
106impl ClusterOptions {
107 pub fn new(authenticator: Authenticator) -> Self {
109 Self {
110 authenticator,
111 compression_mode: None,
112 tls_options: None,
113 tcp_keep_alive_time: None,
114 poller_options: PollerOptions::new(),
115 http_options: HttpOptions::new(),
116 kv_options: KvOptions::new(),
117 dns_options: None,
118 orphan_reporter_options: OrphanReporterOptions::new(),
119 default_retry_strategy: None,
120 }
121 }
122
123 pub fn compression_mode(mut self, compression_mode: CompressionMode) -> Self {
125 self.compression_mode = Some(compression_mode);
126 self
127 }
128
129 pub fn tls_options(mut self, tls_options: TlsOptions) -> Self {
131 self.tls_options = Some(tls_options);
132 self
133 }
134
135 pub fn tcp_keep_alive_time(mut self, val: Duration) -> Self {
137 self.tcp_keep_alive_time = Some(val);
138 self
139 }
140
141 pub fn poller_options(mut self, poller_options: PollerOptions) -> Self {
143 self.poller_options = poller_options;
144 self
145 }
146
147 pub fn http_options(mut self, http_options: HttpOptions) -> Self {
149 self.http_options = http_options;
150 self
151 }
152
153 pub fn kv_options(mut self, kv_options: KvOptions) -> Self {
155 self.kv_options = kv_options;
156 self
157 }
158
159 pub fn dns_options(mut self, dns_options: DnsOptions) -> Self {
161 self.dns_options = Some(dns_options);
162 self
163 }
164
165 pub fn orphan_reporter_options(
167 mut self,
168 orphan_reporter_options: OrphanReporterOptions,
169 ) -> Self {
170 self.orphan_reporter_options = orphan_reporter_options;
171 self
172 }
173
174 pub fn default_retry_strategy(
178 mut self,
179 retry_strategy: Arc<dyn crate::retry::RetryStrategy>,
180 ) -> Self {
181 self.default_retry_strategy = Some(retry_strategy);
182 self
183 }
184}
185
/// Compression setting.
///
/// Converts one-to-one into `couchbase_core::options::agent::CompressionMode`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub enum CompressionMode {
    /// Compression is on, with the thresholds below.
    Enabled {
        /// Minimum size threshold (presumably in bytes — confirm against
        /// `couchbase_core`).
        min_size: usize,
        /// Minimum ratio threshold (exact semantics are defined by
        /// `couchbase_core` — confirm there).
        min_ratio: f64,
    },
    /// Compression is off.
    Disabled,
}
207
208impl From<CompressionMode> for couchbase_core::options::agent::CompressionMode {
209 fn from(mode: CompressionMode) -> Self {
210 match mode {
211 CompressionMode::Enabled {
212 min_size,
213 min_ratio,
214 } => couchbase_core::options::agent::CompressionMode::Enabled {
215 min_size,
216 min_ratio,
217 },
218 CompressionMode::Disabled => couchbase_core::options::agent::CompressionMode::Disabled,
219 }
220 }
221}
222
223impl Display for CompressionMode {
224 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
225 match self {
226 CompressionMode::Enabled {
227 min_size,
228 min_ratio,
229 } => {
230 write!(f, "{{ min_size: {min_size}, min_ratio: {min_ratio} }}")
231 }
232 CompressionMode::Disabled => write!(f, "disabled"),
233 }
234 }
235}
236
/// Options for the config poller.
///
/// Converted into `couchbase_core::options::agent::ConfigPollerConfig`.
#[non_exhaustive]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PollerOptions {
    /// Poll interval; when `None`, the core config keeps its own default.
    pub config_poll_interval: Option<Duration>,
}
247
248impl PollerOptions {
249 pub fn new() -> Self {
251 Self::default()
252 }
253
254 pub fn config_poll_interval(mut self, interval: Duration) -> Self {
256 self.config_poll_interval = Some(interval);
257 self
258 }
259}
260
261impl From<PollerOptions> for couchbase_core::options::agent::ConfigPollerConfig {
262 fn from(opts: PollerOptions) -> Self {
263 let mut core_opts = couchbase_core::options::agent::ConfigPollerConfig::default();
264 if let Some(interval) = opts.config_poll_interval {
265 core_opts = core_opts.poll_interval(interval);
266 }
267
268 core_opts
269 }
270}
271
272impl Display for PollerOptions {
273 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
274 write!(
275 f,
276 "{{ config_poll_interval: {:?} }}",
277 self.config_poll_interval
278 )
279 }
280}
281
/// Options for HTTP connections.
///
/// Converted into `couchbase_core::options::agent::HttpConfig`; unset fields
/// leave the core defaults in place.
#[non_exhaustive]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct HttpOptions {
    /// Maximum idle connections per host; `None` until set.
    pub max_idle_connections_per_host: Option<usize>,
    /// Idle connection timeout; `None` until set.
    pub idle_connection_timeout: Option<Duration>,
}
291
292impl HttpOptions {
293 pub fn new() -> Self {
295 Self::default()
296 }
297
298 pub fn max_idle_connections_per_host(mut self, max: usize) -> Self {
300 self.max_idle_connections_per_host = Some(max);
301 self
302 }
303
304 pub fn idle_connection_timeout(mut self, timeout: Duration) -> Self {
306 self.idle_connection_timeout = Some(timeout);
307 self
308 }
309}
310
311impl From<HttpOptions> for couchbase_core::options::agent::HttpConfig {
312 fn from(opts: HttpOptions) -> Self {
313 let mut core_opts = couchbase_core::options::agent::HttpConfig::default();
314 if let Some(max) = opts.max_idle_connections_per_host {
315 core_opts = core_opts.max_idle_connections_per_host(max);
316 }
317
318 if let Some(timeout) = opts.idle_connection_timeout {
319 core_opts = core_opts.idle_connection_timeout(timeout);
320 }
321
322 core_opts
323 }
324}
325
326impl Display for HttpOptions {
327 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
328 write!(
329 f,
330 "{{ max_idle_connections_per_host: {:?}, idle_connection_timeout: {:?} }}",
331 self.max_idle_connections_per_host, self.idle_connection_timeout
332 )
333 }
334}
335
/// Options for key-value connections.
///
/// Converted into `couchbase_core::options::agent::KvConfig`; unset fields
/// leave the core defaults in place.
#[non_exhaustive]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct KvOptions {
    /// Whether mutation tokens are enabled; `None` until set.
    pub enable_mutation_tokens: Option<bool>,
    /// Whether server durations are enabled; `None` until set.
    pub enable_server_durations: Option<bool>,
    /// Number of connections; `None` until set.
    pub num_connections: Option<usize>,
    /// Connect timeout; `None` until set.
    pub connect_timeout: Option<Duration>,
    /// Connect throttle timeout; `None` until set.
    pub connect_throttle_timeout: Option<Duration>,
}
352
353impl KvOptions {
354 pub fn new() -> Self {
356 Self::default()
357 }
358
359 pub fn enable_mutation_tokens(mut self, enable: bool) -> Self {
364 self.enable_mutation_tokens = Some(enable);
365 self
366 }
367
368 pub fn enable_server_durations(mut self, enable: bool) -> Self {
370 self.enable_server_durations = Some(enable);
371 self
372 }
373
374 pub fn num_connections(mut self, num: usize) -> Self {
376 self.num_connections = Some(num);
377 self
378 }
379
380 pub fn connect_timeout(mut self, timeout: Duration) -> Self {
382 self.connect_timeout = Some(timeout);
383 self
384 }
385
386 pub fn connect_throttle_timeout(mut self, timeout: Duration) -> Self {
388 self.connect_throttle_timeout = Some(timeout);
389 self
390 }
391}
392
393impl From<KvOptions> for couchbase_core::options::agent::KvConfig {
394 fn from(opts: KvOptions) -> Self {
395 let mut core_opts =
396 couchbase_core::options::agent::KvConfig::default().enable_error_map(true);
397 if let Some(enable) = opts.enable_mutation_tokens {
398 core_opts = core_opts.enable_mutation_tokens(enable);
399 }
400
401 if let Some(enable) = opts.enable_server_durations {
402 core_opts = core_opts.enable_server_durations(enable);
403 }
404
405 if let Some(num) = opts.num_connections {
406 core_opts = core_opts.num_connections(num);
407 }
408
409 if let Some(timeout) = opts.connect_timeout {
410 core_opts = core_opts.connect_timeout(timeout);
411 }
412
413 if let Some(timeout) = opts.connect_throttle_timeout {
414 core_opts = core_opts.connect_throttle_timeout(timeout);
415 }
416
417 core_opts
418 }
419}
420
421impl Display for KvOptions {
422 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
423 write!(
424 f,
425 "{{ enable_mutation_tokens: {:?}, enable_server_durations: {:?}, num_connections: {:?}, connect_timeout: {:?}, connect_throttle_timeout: {:?} }}",
426 self.enable_mutation_tokens,
427 self.enable_server_durations,
428 self.num_connections,
429 self.connect_timeout,
430 self.connect_throttle_timeout
431 )
432 }
433}
434
/// TLS options.
///
/// The available fields depend on the enabled TLS backend feature:
/// `native-tls` takes precedence, and the rustls variants apply only when
/// `rustls-tls` is enabled and `native-tls` is not.
#[non_exhaustive]
#[derive(Default, Clone)]
pub struct TlsOptions {
    /// When `Some(true)`, server certificate validation is bypassed when the
    /// TLS configuration is built. Dangerous; `None` until set.
    pub danger_accept_invalid_certs: Option<bool>,

    /// Custom CA certificates (rustls backend). When `None`, the built-in
    /// roots are used instead; a provided-but-empty list is rejected when the
    /// TLS configuration is built.
    #[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))]
    pub ca_certificates: Option<Vec<CertificateDer<'static>>>,

    /// Custom CA certificates (native-tls backend). A provided-but-empty list
    /// is rejected when the TLS configuration is built.
    #[cfg(feature = "native-tls")]
    pub ca_certificates: Option<Vec<tokio_native_tls::native_tls::Certificate>>,

    /// When `Some(true)`, hostname verification is bypassed (native-tls
    /// backend only). Dangerous; `None` until set.
    #[cfg(feature = "native-tls")]
    pub danger_accept_invalid_hostnames: Option<bool>,
}
473
474impl TlsOptions {
475 pub fn new() -> Self {
477 Self::default()
478 }
479
480 pub fn danger_accept_invalid_certs(mut self, danger: bool) -> Self {
486 self.danger_accept_invalid_certs = Some(danger);
487 self
488 }
489
490 #[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))]
492 pub fn add_ca_certificate(mut self, cert: CertificateDer<'static>) -> Self {
493 self.ca_certificates.get_or_insert_with(Vec::new).push(cert);
494 self
495 }
496
497 #[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))]
499 pub fn add_ca_certificates<T: IntoIterator<Item = CertificateDer<'static>>>(
500 mut self,
501 certs: T,
502 ) -> Self {
503 self.ca_certificates
504 .get_or_insert_with(Vec::new)
505 .extend(certs);
506 self
507 }
508
509 #[cfg(feature = "native-tls")]
511 pub fn add_ca_certificate(mut self, cert: tokio_native_tls::native_tls::Certificate) -> Self {
512 self.ca_certificates.get_or_insert_with(Vec::new).push(cert);
513 self
514 }
515
516 #[cfg(feature = "native-tls")]
518 pub fn add_ca_certificates<
519 T: IntoIterator<Item = tokio_native_tls::native_tls::Certificate>,
520 >(
521 mut self,
522 certs: T,
523 ) -> Self {
524 self.ca_certificates
525 .get_or_insert_with(Vec::new)
526 .extend(certs);
527 self
528 }
529
530 #[cfg(feature = "native-tls")]
537 pub fn danger_accept_invalid_hostnames(mut self, danger: bool) -> Self {
538 self.danger_accept_invalid_hostnames = Some(danger);
539 self
540 }
541
542 #[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))]
543 pub(crate) fn try_into_tls_config(
544 self,
545 auth: &Authenticator,
546 ) -> Result<Arc<tokio_rustls::rustls::ClientConfig>, error::Error> {
547 let store = match self.ca_certificates {
548 Some(certs) if certs.is_empty() => {
549 return Err(error::Error::invalid_argument(
550 "ca_certificates",
551 "ca_certificates list was provided but is empty",
552 ));
553 }
554 Some(certs) => {
555 let mut store = RootCertStore::empty();
556 for cert in certs {
557 store.add(cert).map_err(|e| {
558 error::Error::other_failure(format!("failed to add cert: {e}"))
559 })?;
560 }
561 store
562 }
563 None => {
564 let mut store = RootCertStore {
565 roots: TLS_SERVER_ROOTS.to_vec(),
566 };
567
568 debug!("Adding Capella root CA to trust store");
569 let certs =
570 CertificateDer::from_pem_slice(CAPELLA_CERT.as_bytes()).map_err(|e| {
571 error::Error::other_failure(format!("failed to parse capella cert: {e}"))
572 })?;
573
574 store.add(certs).map_err(|e| {
575 error::Error::other_failure(format!(
576 "failed to add capella cert to root store: {e}"
577 ))
578 })?;
579 store
580 }
581 };
582
583 let mut builder =
584 tokio_rustls::rustls::ClientConfig::builder_with_provider(Arc::new(default_provider()))
585 .with_safe_default_protocol_versions()
586 .map_err(|e| {
587 error::Error::other_failure(format!(
588 "failed to set safe default protocol versions: {e}"
589 ))
590 })?;
591
592 let builder = if let Some(true) = self.danger_accept_invalid_certs {
593 builder
594 .dangerous()
595 .with_custom_certificate_verifier(Arc::new(InsecureCertVerifier {}))
596 } else {
597 builder.with_root_certificates(store)
598 };
599
600 let config = match auth {
601 Authenticator::CertificateAuthenticator(a) => {
602 let clone = a.clone();
603 builder
604 .with_client_auth_cert(clone.cert_chain, clone.private_key)
605 .map_err(|e| {
606 error::Error::other_failure(format!(
607 "failed to setup client auth cert: {e}"
608 ))
609 })?
610 }
611 _ => builder.with_no_client_auth(),
612 };
613
614 Ok(Arc::new(config))
615 }
616
617 #[cfg(feature = "native-tls")]
618 pub(crate) fn try_into_tls_config(
619 self,
620 auth: &Authenticator,
621 ) -> Result<tokio_native_tls::native_tls::TlsConnector, error::Error> {
622 let mut builder = tokio_native_tls::native_tls::TlsConnector::builder();
623 if let Some(true) = self.danger_accept_invalid_certs {
624 builder.danger_accept_invalid_certs(true);
625 }
626 if let Some(true) = self.danger_accept_invalid_hostnames {
627 builder.danger_accept_invalid_hostnames(true);
628 }
629
630 match self.ca_certificates {
631 Some(certs) if certs.is_empty() => {
632 return Err(error::Error::invalid_argument(
633 "ca_certificates",
634 "ca_certificates list was provided but is empty",
635 ));
636 }
637 Some(certs) => {
638 for cert in certs {
639 builder.add_root_certificate(cert);
640 }
641 }
642 None => {
643 debug!("Adding Capella root CA to trust store");
644 let capella_ca =
645 tokio_native_tls::native_tls::Certificate::from_pem(CAPELLA_CERT.as_ref())
646 .map_err(|e| {
647 error::Error::other_failure(format!("failed to add capella cert: {e}"))
648 })?;
649 builder.add_root_certificate(capella_ca);
650 }
651 }
652
653 match auth {
654 Authenticator::CertificateAuthenticator(a) => {
655 builder.identity(a.identity.clone());
656 }
657 Authenticator::PasswordAuthenticator(_) => {}
658 Authenticator::JwtAuthenticator(_) => {}
659 };
660
661 builder
662 .build()
663 .map_err(|e| error::Error::other_failure(format!("failed to build client config: {e}")))
664 }
665}
666
/// Displays only the active backend name, `rustls-tls`; no option values are
/// included.
#[cfg(all(feature = "rustls-tls", not(feature = "native-tls")))]
impl Display for TlsOptions {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("rustls-tls")
    }
}
673
/// Displays only the active backend name, `native-tls`; no option values are
/// included.
#[cfg(feature = "native-tls")]
impl Display for TlsOptions {
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        f.write_str("native-tls")
    }
}
680
/// DNS options.
///
/// Converted field-for-field into `couchbase_connstr::DnsConfig`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct DnsOptions {
    /// Socket address passed through as `DnsConfig::namespace`.
    pub namespace: SocketAddr,
    /// Timeout passed through as `DnsConfig::timeout`; `None` until set.
    pub timeout: Option<Duration>,
}
690
691impl DnsOptions {
692 pub fn new(namespace: SocketAddr) -> Self {
694 Self {
695 namespace,
696 timeout: None,
697 }
698 }
699
700 pub fn timeout(mut self, timeout: Duration) -> Self {
702 self.timeout = Some(timeout);
703 self
704 }
705}
706impl From<DnsOptions> for couchbase_connstr::DnsConfig {
707 fn from(opts: DnsOptions) -> Self {
708 couchbase_connstr::DnsConfig {
709 namespace: opts.namespace,
710 timeout: opts.timeout,
711 }
712 }
713}
714
/// Options for the orphan reporter.
///
/// `reporter_interval` and `sample_size` are forwarded to
/// `couchbase_core::options::orphan_reporter::OrphanReporterConfig`; unset
/// fields leave the core defaults in place.
#[non_exhaustive]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct OrphanReporterOptions {
    /// Whether the orphan reporter is enabled; `None` until set.
    pub enabled: Option<bool>,
    /// Reporter interval; `None` until set.
    pub reporter_interval: Option<Duration>,
    /// Sample size; `None` until set.
    pub sample_size: Option<usize>,
}
729
730impl OrphanReporterOptions {
731 pub fn new() -> Self {
733 Self::default()
734 }
735
736 pub fn enabled(mut self, enabled: bool) -> Self {
738 self.enabled = Some(enabled);
739 self
740 }
741
742 pub fn reporter_interval(mut self, reporter_interval: Duration) -> Self {
744 self.reporter_interval = Some(reporter_interval);
745 self
746 }
747
748 pub fn sample_size(mut self, sample_size: usize) -> Self {
750 self.sample_size = Some(sample_size);
751 self
752 }
753}
754
755impl Display for OrphanReporterOptions {
756 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
757 write!(
758 f,
759 "{{ enabled: {:?}, reporter_interval: {:?}, sample_size: {:?} }}",
760 self.enabled, self.reporter_interval, self.sample_size
761 )
762 }
763}
764
765impl From<OrphanReporterOptions>
766 for couchbase_core::options::orphan_reporter::OrphanReporterConfig
767{
768 fn from(opts: OrphanReporterOptions) -> Self {
769 let mut core_opts =
770 couchbase_core::options::orphan_reporter::OrphanReporterConfig::default();
771
772 if let Some(reporter_interval) = opts.reporter_interval {
773 core_opts = core_opts.reporter_interval(reporter_interval);
774 }
775
776 if let Some(sample_size) = opts.sample_size {
777 core_opts = core_opts.sample_size(sample_size);
778 }
779
780 core_opts
781 }
782}
783
784impl Display for ClusterOptions {
785 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
786 write!(
787 f,
788 "{{ authenticator: {}, compression_mode: {:?}, tls_options: {}, tcp_keep_alive_time: {:?}, poller_options: {}, http_options: {}, kv_options: {}, orphan_reporter_options: {} }}",
789 self.authenticator,
790 self.compression_mode,
791 if let Some(tls) = &self.tls_options {tls.to_string()} else {"none".to_string()},
792 self.tcp_keep_alive_time,
793 self.poller_options,
794 self.http_options,
795 self.kv_options,
796 self.orphan_reporter_options
797 )
798 }
799}