1#[cfg(feature = "cluster-async")]
2use crate::aio::AsyncPushSender;
3#[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
4use crate::caching::{CacheConfig, CacheManager};
5use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo};
6#[cfg(feature = "cluster-async")]
7use crate::io::{tcp::TcpSettings, AsyncDNSResolver};
8use crate::types::{ErrorKind, ProtocolVersion, RedisError, RedisResult};
9use crate::{cluster, cluster::TlsMode};
10use rand::Rng;
11#[cfg(feature = "cluster-async")]
12use std::sync::Arc;
13use std::time::Duration;
14
15use crate::connection::TlsConnParams;
16
17#[cfg(feature = "cluster-async")]
18use crate::cluster_async;
19
20#[cfg(feature = "tls-rustls")]
21use crate::tls::{retrieve_tls_certificates, TlsCertificates};
22
23#[derive(Default)]
27struct BuilderParams {
28 password: Option<String>,
29 username: Option<String>,
30 read_from_replicas: bool,
31 tls: Option<TlsMode>,
32 #[cfg(feature = "tls-rustls")]
33 certs: Option<TlsCertificates>,
34 #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
35 danger_accept_invalid_hostnames: bool,
36 retries_configuration: RetryParams,
37 connection_timeout: Option<Duration>,
38 response_timeout: Option<Duration>,
39 protocol: Option<ProtocolVersion>,
40 #[cfg(feature = "cluster-async")]
41 async_push_sender: Option<Arc<dyn AsyncPushSender>>,
42 #[cfg(feature = "cluster-async")]
43 pub(crate) tcp_settings: TcpSettings,
44 #[cfg(feature = "cluster-async")]
45 async_dns_resolver: Option<Arc<dyn AsyncDNSResolver>>,
46 #[cfg(feature = "cache-aio")]
47 cache_config: Option<CacheConfig>,
48}
49
/// Retry policy: how many retries are configured and the parameters from which
/// `wait_time_for_retry` derives the wait before each retry.
#[derive(Clone)]
pub(crate) struct RetryParams {
    /// Configured number of retries.
    pub(crate) number_of_retries: u32,
    /// Cap, in milliseconds, applied to the computed wait.
    max_wait_time: u64,
    /// Lower bound, in milliseconds, of the randomized wait.
    min_wait_time: u64,
    /// Base that is raised to the retry index when computing the wait.
    exponent_base: u64,
    /// Multiplier applied to `exponent_base^retry`.
    factor: u64,
}
58
59impl Default for RetryParams {
60 fn default() -> Self {
61 const DEFAULT_RETRIES: u32 = 16;
62 const DEFAULT_MAX_RETRY_WAIT_TIME: u64 = 655360;
63 const DEFAULT_MIN_RETRY_WAIT_TIME: u64 = 1280;
64 const DEFAULT_EXPONENT_BASE: u64 = 2;
65 const DEFAULT_FACTOR: u64 = 10;
66 Self {
67 number_of_retries: DEFAULT_RETRIES,
68 max_wait_time: DEFAULT_MAX_RETRY_WAIT_TIME,
69 min_wait_time: DEFAULT_MIN_RETRY_WAIT_TIME,
70 exponent_base: DEFAULT_EXPONENT_BASE,
71 factor: DEFAULT_FACTOR,
72 }
73 }
74}
75
76impl RetryParams {
77 pub(crate) fn wait_time_for_retry(&self, retry: u32) -> Duration {
78 let base_wait = self.exponent_base.pow(retry) * self.factor;
79 let clamped_wait = base_wait
80 .min(self.max_wait_time)
81 .max(self.min_wait_time + 1);
82 let jittered_wait = rand::rng().random_range(self.min_wait_time..clamped_wait);
83 Duration::from_millis(jittered_wait)
84 }
85}
86
/// Connection parameters shared by the connections a `ClusterClient` creates.
/// Built from `BuilderParams` and optionally adjusted per connection through
/// `with_config`.
#[derive(Default, Clone)]
pub(crate) struct ClusterParams {
    /// Password; taken from the first initial node when the builder did not set one.
    pub(crate) password: Option<String>,
    /// Username; taken from the first initial node when the builder did not set one.
    pub(crate) username: Option<String>,
    /// Flag copied from the builder's `read_from_replicas` setting.
    pub(crate) read_from_replicas: bool,
    /// TLS mode; taken from the first initial node's address when the builder did not set one.
    pub(crate) tls: Option<TlsMode>,
    /// Retry count and wait-time formula parameters.
    pub(crate) retry_params: RetryParams,
    /// TLS connection parameters derived from the builder's certificates and
    /// hostname-verification setting, when the corresponding features are enabled.
    pub(crate) tls_params: Option<TlsConnParams>,
    /// Connection timeout; one second unless configured otherwise.
    pub(crate) connection_timeout: Duration,
    /// Response timeout, if one was configured.
    pub(crate) response_timeout: Option<Duration>,
    /// Protocol version; taken from the first initial node when the builder did not set one.
    pub(crate) protocol: Option<ProtocolVersion>,
    /// Push sender supplied through the builder or a per-connection config.
    #[cfg(feature = "cluster-async")]
    pub(crate) async_push_sender: Option<Arc<dyn AsyncPushSender>>,
    /// TCP settings supplied through the builder.
    #[cfg(feature = "cluster-async")]
    pub(crate) tcp_settings: TcpSettings,
    /// DNS resolver supplied through the builder or a per-connection config.
    #[cfg(feature = "cluster-async")]
    pub(crate) async_dns_resolver: Option<Arc<dyn AsyncDNSResolver>>,
    /// Cache manager created from the builder's cache configuration, if any.
    #[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
    pub(crate) cache_manager: Option<CacheManager>,
}
111
112impl ClusterParams {
113 fn from(value: BuilderParams) -> RedisResult<Self> {
114 #[cfg(not(feature = "tls-rustls"))]
115 let tls_params: Option<TlsConnParams> = None;
116
117 #[cfg(feature = "tls-rustls")]
118 let tls_params = {
119 let retrieved_tls_params = value.certs.as_ref().map(retrieve_tls_certificates);
120
121 retrieved_tls_params.transpose()?
122 };
123
124 #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
125 let tls_params = if value.danger_accept_invalid_hostnames {
126 let mut tls_params = tls_params.unwrap_or(TlsConnParams {
127 #[cfg(feature = "tls-rustls")]
128 client_tls_params: None,
129 #[cfg(feature = "tls-rustls")]
130 root_cert_store: None,
131 danger_accept_invalid_hostnames: false,
132 });
133 tls_params.danger_accept_invalid_hostnames = true;
134 Some(tls_params)
135 } else {
136 tls_params
137 };
138
139 #[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
140 let cache_manager = value
141 .cache_config
142 .as_ref()
143 .map(|cache_config| CacheManager::new(*cache_config));
144
145 Ok(Self {
146 password: value.password,
147 username: value.username,
148 read_from_replicas: value.read_from_replicas,
149 tls: value.tls,
150 retry_params: value.retries_configuration,
151 tls_params,
152 connection_timeout: value.connection_timeout.unwrap_or(Duration::from_secs(1)),
153 response_timeout: value.response_timeout,
154 protocol: value.protocol,
155 #[cfg(feature = "cluster-async")]
156 async_push_sender: value.async_push_sender,
157 #[cfg(feature = "cluster-async")]
158 tcp_settings: value.tcp_settings,
159 #[cfg(feature = "cluster-async")]
160 async_dns_resolver: value.async_dns_resolver,
161 #[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
162 cache_manager,
163 })
164 }
165
166 fn with_config(mut self, config: cluster::ClusterConfig) -> Self {
167 if let Some(connection_timeout) = config.connection_timeout {
168 self.connection_timeout = connection_timeout;
169 }
170 self.response_timeout = config.response_timeout;
171
172 #[cfg(feature = "cluster-async")]
173 if let Some(async_push_sender) = config.async_push_sender {
174 self.async_push_sender = Some(async_push_sender);
175 }
176
177 #[cfg(feature = "cluster-async")]
178 if let Some(async_dns_resolver) = config.async_dns_resolver {
179 self.async_dns_resolver = Some(async_dns_resolver);
180 }
181
182 self
183 }
184}
185
/// Builder that collects the initial nodes and optional settings for a
/// `ClusterClient`; `build` validates them and produces the client.
pub struct ClusterClientBuilder {
    /// Parsed initial nodes, or the error raised while parsing them; the error
    /// is surfaced by `build`.
    initial_nodes: RedisResult<Vec<ConnectionInfo>>,
    /// Settings accumulated through the builder's setter methods.
    builder_params: BuilderParams,
}
191
192impl ClusterClientBuilder {
193 pub fn new<T: IntoConnectionInfo>(
197 initial_nodes: impl IntoIterator<Item = T>,
198 ) -> ClusterClientBuilder {
199 ClusterClientBuilder {
200 initial_nodes: initial_nodes
201 .into_iter()
202 .map(|x| x.into_connection_info())
203 .collect(),
204 builder_params: Default::default(),
205 }
206 }
207
208 pub fn build(self) -> RedisResult<ClusterClient> {
221 let initial_nodes = self.initial_nodes?;
222
223 let first_node = match initial_nodes.first() {
224 Some(node) => node,
225 None => {
226 return Err(RedisError::from((
227 ErrorKind::InvalidClientConfig,
228 "Initial nodes can't be empty.",
229 )))
230 }
231 };
232
233 let mut cluster_params = ClusterParams::from(self.builder_params)?;
234 let password = if cluster_params.password.is_none() {
235 cluster_params
236 .password
237 .clone_from(&first_node.redis.password);
238 &cluster_params.password
239 } else {
240 &None
241 };
242 let username = if cluster_params.username.is_none() {
243 cluster_params
244 .username
245 .clone_from(&first_node.redis.username);
246 &cluster_params.username
247 } else {
248 &None
249 };
250 let tls = if cluster_params.tls.is_none() {
251 cluster_params.tls = first_node.addr.tls_mode();
252 cluster_params.tls
253 } else {
254 None
255 };
256 let protocol = if cluster_params.protocol.is_none() {
257 cluster_params.protocol = Some(first_node.redis.protocol);
258 cluster_params.protocol
259 } else {
260 None
261 };
262
263 for node in &initial_nodes {
265 if let ConnectionAddr::Unix(_) = node.addr {
266 return Err(RedisError::from((ErrorKind::InvalidClientConfig,
267 "This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
268 }
269
270 if password.is_some() && node.redis.password != *password {
271 return Err(RedisError::from((
272 ErrorKind::InvalidClientConfig,
273 "Cannot use different password among initial nodes.",
274 )));
275 }
276
277 if username.is_some() && node.redis.username != *username {
278 return Err(RedisError::from((
279 ErrorKind::InvalidClientConfig,
280 "Cannot use different username among initial nodes.",
281 )));
282 }
283 if protocol.is_some() && Some(node.redis.protocol) != protocol {
284 return Err(RedisError::from((
285 ErrorKind::InvalidClientConfig,
286 "Cannot use different protocol among initial nodes.",
287 )));
288 }
289
290 if tls.is_some() && node.addr.tls_mode() != tls {
291 return Err(RedisError::from((
292 ErrorKind::InvalidClientConfig,
293 "Cannot use different TLS modes among initial nodes.",
294 )));
295 }
296 }
297
298 Ok(ClusterClient {
299 initial_nodes,
300 cluster_params,
301 })
302 }
303
304 pub fn password(mut self, password: String) -> ClusterClientBuilder {
306 self.builder_params.password = Some(password);
307 self
308 }
309
310 pub fn username(mut self, username: String) -> ClusterClientBuilder {
312 self.builder_params.username = Some(username);
313 self
314 }
315
316 pub fn retries(mut self, retries: u32) -> ClusterClientBuilder {
318 self.builder_params.retries_configuration.number_of_retries = retries;
319 self
320 }
321
322 pub fn max_retry_wait(mut self, max_wait: u64) -> ClusterClientBuilder {
324 self.builder_params.retries_configuration.max_wait_time = max_wait;
325 self
326 }
327
328 pub fn min_retry_wait(mut self, min_wait: u64) -> ClusterClientBuilder {
330 self.builder_params.retries_configuration.min_wait_time = min_wait;
331 self
332 }
333
334 pub fn retry_wait_formula(mut self, factor: u64, exponent_base: u64) -> ClusterClientBuilder {
337 self.builder_params.retries_configuration.factor = factor;
338 self.builder_params.retries_configuration.exponent_base = exponent_base;
339 self
340 }
341
342 #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
346 pub fn tls(mut self, tls: TlsMode) -> ClusterClientBuilder {
347 self.builder_params.tls = Some(tls);
348 self
349 }
350
351 #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
365 pub fn danger_accept_invalid_hostnames(mut self, insecure: bool) -> ClusterClientBuilder {
366 self.builder_params.danger_accept_invalid_hostnames = insecure;
367 self
368 }
369
370 #[cfg(feature = "tls-rustls")]
387 pub fn certs(mut self, certificates: TlsCertificates) -> ClusterClientBuilder {
388 if self.builder_params.tls.is_none() {
389 self.builder_params.tls = Some(TlsMode::Secure);
390 }
391
392 self.builder_params.certs = Some(certificates);
393 self
394 }
395
396 pub fn read_from_replicas(mut self) -> ClusterClientBuilder {
401 self.builder_params.read_from_replicas = true;
402 self
403 }
404
405 pub fn connection_timeout(mut self, connection_timeout: Duration) -> ClusterClientBuilder {
409 self.builder_params.connection_timeout = Some(connection_timeout);
410 self
411 }
412
413 pub fn response_timeout(mut self, response_timeout: Duration) -> ClusterClientBuilder {
417 self.builder_params.response_timeout = Some(response_timeout);
418 self
419 }
420
421 pub fn use_protocol(mut self, protocol: ProtocolVersion) -> ClusterClientBuilder {
423 self.builder_params.protocol = Some(protocol);
424 self
425 }
426
427 #[deprecated(since = "0.22.0", note = "Use build()")]
429 pub fn open(self) -> RedisResult<ClusterClient> {
430 self.build()
431 }
432
433 #[deprecated(since = "0.22.0", note = "Use read_from_replicas()")]
435 pub fn readonly(mut self, read_from_replicas: bool) -> ClusterClientBuilder {
436 self.builder_params.read_from_replicas = read_from_replicas;
437 self
438 }
439
440 #[cfg(feature = "cluster-async")]
441 pub fn push_sender(mut self, push_sender: impl AsyncPushSender) -> ClusterClientBuilder {
471 self.builder_params.async_push_sender = Some(Arc::new(push_sender));
472 self
473 }
474
475 #[cfg(feature = "cluster-async")]
477 pub fn tcp_settings(mut self, tcp_settings: TcpSettings) -> ClusterClientBuilder {
478 self.builder_params.tcp_settings = tcp_settings;
479 self
480 }
481
482 #[cfg(feature = "cluster-async")]
486 pub fn async_dns_resolver(mut self, resolver: impl AsyncDNSResolver) -> ClusterClientBuilder {
487 self.builder_params.async_dns_resolver = Some(Arc::new(resolver));
488 self
489 }
490
491 #[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
493 pub fn cache_config(mut self, cache_config: CacheConfig) -> Self {
494 self.builder_params.cache_config = Some(cache_config);
495 self
496 }
497}
498
/// Cluster client: holds the validated initial nodes and connection
/// parameters, and creates cluster connections from clones of them on request.
#[derive(Clone)]
pub struct ClusterClient {
    /// Nodes handed to every new cluster connection.
    initial_nodes: Vec<ConnectionInfo>,
    /// Parameters handed to every new cluster connection.
    cluster_params: ClusterParams,
}
505
506impl ClusterClient {
507 pub fn new<T: IntoConnectionInfo>(
517 initial_nodes: impl IntoIterator<Item = T>,
518 ) -> RedisResult<ClusterClient> {
519 Self::builder(initial_nodes).build()
520 }
521
522 pub fn builder<T: IntoConnectionInfo>(
524 initial_nodes: impl IntoIterator<Item = T>,
525 ) -> ClusterClientBuilder {
526 ClusterClientBuilder::new(initial_nodes)
527 }
528
529 pub fn get_connection(&self) -> RedisResult<cluster::ClusterConnection> {
536 cluster::ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
537 }
538
539 pub fn get_connection_with_config(
546 &self,
547 config: cluster::ClusterConfig,
548 ) -> RedisResult<cluster::ClusterConnection> {
549 cluster::ClusterConnection::new(
550 self.cluster_params.clone().with_config(config),
551 self.initial_nodes.clone(),
552 )
553 }
554
555 #[cfg(feature = "cluster-async")]
562 pub async fn get_async_connection(&self) -> RedisResult<cluster_async::ClusterConnection> {
563 cluster_async::ClusterConnection::new(&self.initial_nodes, self.cluster_params.clone())
564 .await
565 }
566
567 #[cfg(feature = "cluster-async")]
574 pub async fn get_async_connection_with_config(
575 &self,
576 config: cluster::ClusterConfig,
577 ) -> RedisResult<cluster_async::ClusterConnection> {
578 cluster_async::ClusterConnection::new(
579 &self.initial_nodes,
580 self.cluster_params.clone().with_config(config),
581 )
582 .await
583 }
584
585 #[doc(hidden)]
586 pub fn get_generic_connection<C>(&self) -> RedisResult<cluster::ClusterConnection<C>>
587 where
588 C: crate::ConnectionLike + crate::cluster::Connect + Send,
589 {
590 cluster::ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
591 }
592
593 #[doc(hidden)]
594 #[cfg(feature = "cluster-async")]
595 pub async fn get_async_generic_connection<C>(
596 &self,
597 ) -> RedisResult<cluster_async::ClusterConnection<C>>
598 where
599 C: crate::aio::ConnectionLike
600 + cluster_async::Connect
601 + Clone
602 + Send
603 + Sync
604 + Unpin
605 + 'static,
606 {
607 cluster_async::ClusterConnection::new(&self.initial_nodes, self.cluster_params.clone())
608 .await
609 }
610
611 #[deprecated(since = "0.22.0", note = "Use new()")]
613 pub fn open<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<ClusterClient> {
614 Self::new(initial_nodes)
615 }
616}
617
#[cfg(test)]
mod tests {
    use super::{ClusterClient, ClusterClientBuilder, ConnectionInfo, IntoConnectionInfo};

    /// Parses every URL into a `ConnectionInfo`, panicking on an invalid one.
    fn parse_nodes(urls: &[&str]) -> Vec<ConnectionInfo> {
        urls.iter()
            .map(|&url| url.into_connection_info().unwrap())
            .collect()
    }

    /// Three nodes without credentials.
    fn get_connection_data() -> Vec<ConnectionInfo> {
        parse_nodes(&[
            "redis://127.0.0.1:6379",
            "redis://127.0.0.1:6378",
            "redis://127.0.0.1:6377",
        ])
    }

    /// Three nodes sharing the same password.
    fn get_connection_data_with_password() -> Vec<ConnectionInfo> {
        parse_nodes(&[
            "redis://:password@127.0.0.1:6379",
            "redis://:password@127.0.0.1:6378",
            "redis://:password@127.0.0.1:6377",
        ])
    }

    /// Three nodes sharing the same username and password.
    fn get_connection_data_with_username_and_password() -> Vec<ConnectionInfo> {
        parse_nodes(&[
            "redis://user1:password@127.0.0.1:6379",
            "redis://user1:password@127.0.0.1:6378",
            "redis://user1:password@127.0.0.1:6377",
        ])
    }

    #[test]
    fn give_no_password() {
        let nodes = get_connection_data();
        let client = ClusterClient::new(nodes).unwrap();
        assert_eq!(client.cluster_params.password, None);
    }

    #[test]
    fn give_password_by_initial_nodes() {
        let nodes = get_connection_data_with_password();
        let client = ClusterClient::new(nodes).unwrap();
        assert_eq!(client.cluster_params.password, Some("password".to_string()));
    }

    #[test]
    fn give_username_and_password_by_initial_nodes() {
        let nodes = get_connection_data_with_username_and_password();
        let client = ClusterClient::new(nodes).unwrap();
        assert_eq!(client.cluster_params.password, Some("password".to_string()));
        assert_eq!(client.cluster_params.username, Some("user1".to_string()));
    }

    #[test]
    fn fail_if_received_different_password_between_initial_nodes() {
        let nodes = vec![
            "redis://:password1@127.0.0.1:6379",
            "redis://:password2@127.0.0.1:6378",
            "redis://:password3@127.0.0.1:6377",
        ];
        assert!(ClusterClient::new(nodes).is_err());
    }

    #[test]
    fn fail_if_received_different_username_between_initial_nodes() {
        let nodes = vec![
            "redis://user1:password@127.0.0.1:6379",
            "redis://user2:password@127.0.0.1:6378",
            "redis://user1:password@127.0.0.1:6377",
        ];
        assert!(ClusterClient::new(nodes).is_err());
    }

    #[test]
    fn fail_if_received_different_protocol_between_initial_nodes() {
        let nodes = vec![
            "redis://127.0.0.1:6379/?protocol=3",
            "redis://127.0.0.1:6378",
            "redis://127.0.0.1:6377",
        ];
        assert!(ClusterClient::new(nodes).is_err());
    }

    #[test]
    fn fail_if_received_different_tls_between_initial_nodes() {
        let nodes = vec![
            "rediss://127.0.0.1:6379/",
            "redis://127.0.0.1:6378",
            "redis://127.0.0.1:6377",
        ];
        assert!(ClusterClient::new(nodes).is_err());
    }

    #[test]
    fn give_username_password_by_method() {
        let builder = ClusterClientBuilder::new(get_connection_data_with_username_and_password())
            .password("password".to_string())
            .username("user1".to_string());
        let client = builder.build().unwrap();
        assert_eq!(client.cluster_params.password, Some("password".to_string()));
        assert_eq!(client.cluster_params.username, Some("user1".to_string()));
    }

    #[test]
    fn give_empty_initial_nodes() {
        let no_nodes = Vec::<String>::new();
        assert!(ClusterClient::new(no_nodes).is_err())
    }
}