redis/
cluster_client.rs

1#[cfg(feature = "cluster-async")]
2use crate::aio::AsyncPushSender;
3#[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
4use crate::caching::{CacheConfig, CacheManager};
5use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo};
6#[cfg(feature = "cluster-async")]
7use crate::io::{tcp::TcpSettings, AsyncDNSResolver};
8use crate::types::{ErrorKind, ProtocolVersion, RedisError, RedisResult};
9use crate::{cluster, cluster::TlsMode};
10use rand::Rng;
11#[cfg(feature = "cluster-async")]
12use std::sync::Arc;
13use std::time::Duration;
14
15use crate::connection::TlsConnParams;
16
17#[cfg(feature = "cluster-async")]
18use crate::cluster_async;
19
20#[cfg(feature = "tls-rustls")]
21use crate::tls::{retrieve_tls_certificates, TlsCertificates};
22
23/// Parameters specific to builder, so that
24/// builder parameters may have different types
25/// than final ClusterParams
26#[derive(Default)]
27struct BuilderParams {
28    password: Option<String>,
29    username: Option<String>,
30    read_from_replicas: bool,
31    tls: Option<TlsMode>,
32    #[cfg(feature = "tls-rustls")]
33    certs: Option<TlsCertificates>,
34    #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
35    danger_accept_invalid_hostnames: bool,
36    retries_configuration: RetryParams,
37    connection_timeout: Option<Duration>,
38    response_timeout: Option<Duration>,
39    protocol: Option<ProtocolVersion>,
40    #[cfg(feature = "cluster-async")]
41    async_push_sender: Option<Arc<dyn AsyncPushSender>>,
42    #[cfg(feature = "cluster-async")]
43    pub(crate) tcp_settings: TcpSettings,
44    #[cfg(feature = "cluster-async")]
45    async_dns_resolver: Option<Arc<dyn AsyncDNSResolver>>,
46    #[cfg(feature = "cache-aio")]
47    cache_config: Option<CacheConfig>,
48}
49
50#[derive(Clone)]
51pub(crate) struct RetryParams {
52    pub(crate) number_of_retries: u32,
53    max_wait_time: u64,
54    min_wait_time: u64,
55    exponent_base: u64,
56    factor: u64,
57}
58
59impl Default for RetryParams {
60    fn default() -> Self {
61        const DEFAULT_RETRIES: u32 = 16;
62        const DEFAULT_MAX_RETRY_WAIT_TIME: u64 = 655360;
63        const DEFAULT_MIN_RETRY_WAIT_TIME: u64 = 1280;
64        const DEFAULT_EXPONENT_BASE: u64 = 2;
65        const DEFAULT_FACTOR: u64 = 10;
66        Self {
67            number_of_retries: DEFAULT_RETRIES,
68            max_wait_time: DEFAULT_MAX_RETRY_WAIT_TIME,
69            min_wait_time: DEFAULT_MIN_RETRY_WAIT_TIME,
70            exponent_base: DEFAULT_EXPONENT_BASE,
71            factor: DEFAULT_FACTOR,
72        }
73    }
74}
75
76impl RetryParams {
77    pub(crate) fn wait_time_for_retry(&self, retry: u32) -> Duration {
78        let base_wait = self.exponent_base.pow(retry) * self.factor;
79        let clamped_wait = base_wait
80            .min(self.max_wait_time)
81            .max(self.min_wait_time + 1);
82        let jittered_wait = rand::rng().random_range(self.min_wait_time..clamped_wait);
83        Duration::from_millis(jittered_wait)
84    }
85}
86
87/// Redis cluster specific parameters.
88#[derive(Default, Clone)]
89pub(crate) struct ClusterParams {
90    pub(crate) password: Option<String>,
91    pub(crate) username: Option<String>,
92    pub(crate) read_from_replicas: bool,
93    /// tls indicates tls behavior of connections.
94    /// When Some(TlsMode), connections use tls and verify certification depends on TlsMode.
95    /// When None, connections do not use tls.
96    pub(crate) tls: Option<TlsMode>,
97    pub(crate) retry_params: RetryParams,
98    pub(crate) tls_params: Option<TlsConnParams>,
99    pub(crate) connection_timeout: Duration,
100    pub(crate) response_timeout: Option<Duration>,
101    pub(crate) protocol: Option<ProtocolVersion>,
102    #[cfg(feature = "cluster-async")]
103    pub(crate) async_push_sender: Option<Arc<dyn AsyncPushSender>>,
104    #[cfg(feature = "cluster-async")]
105    pub(crate) tcp_settings: TcpSettings,
106    #[cfg(feature = "cluster-async")]
107    pub(crate) async_dns_resolver: Option<Arc<dyn AsyncDNSResolver>>,
108    #[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
109    pub(crate) cache_manager: Option<CacheManager>,
110}
111
112impl ClusterParams {
113    fn from(value: BuilderParams) -> RedisResult<Self> {
114        #[cfg(not(feature = "tls-rustls"))]
115        let tls_params: Option<TlsConnParams> = None;
116
117        #[cfg(feature = "tls-rustls")]
118        let tls_params = {
119            let retrieved_tls_params = value.certs.as_ref().map(retrieve_tls_certificates);
120
121            retrieved_tls_params.transpose()?
122        };
123
124        #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
125        let tls_params = if value.danger_accept_invalid_hostnames {
126            let mut tls_params = tls_params.unwrap_or(TlsConnParams {
127                #[cfg(feature = "tls-rustls")]
128                client_tls_params: None,
129                #[cfg(feature = "tls-rustls")]
130                root_cert_store: None,
131                danger_accept_invalid_hostnames: false,
132            });
133            tls_params.danger_accept_invalid_hostnames = true;
134            Some(tls_params)
135        } else {
136            tls_params
137        };
138
139        #[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
140        let cache_manager = value
141            .cache_config
142            .as_ref()
143            .map(|cache_config| CacheManager::new(*cache_config));
144
145        Ok(Self {
146            password: value.password,
147            username: value.username,
148            read_from_replicas: value.read_from_replicas,
149            tls: value.tls,
150            retry_params: value.retries_configuration,
151            tls_params,
152            connection_timeout: value.connection_timeout.unwrap_or(Duration::from_secs(1)),
153            response_timeout: value.response_timeout,
154            protocol: value.protocol,
155            #[cfg(feature = "cluster-async")]
156            async_push_sender: value.async_push_sender,
157            #[cfg(feature = "cluster-async")]
158            tcp_settings: value.tcp_settings,
159            #[cfg(feature = "cluster-async")]
160            async_dns_resolver: value.async_dns_resolver,
161            #[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
162            cache_manager,
163        })
164    }
165
166    fn with_config(mut self, config: cluster::ClusterConfig) -> Self {
167        if let Some(connection_timeout) = config.connection_timeout {
168            self.connection_timeout = connection_timeout;
169        }
170        self.response_timeout = config.response_timeout;
171
172        #[cfg(feature = "cluster-async")]
173        if let Some(async_push_sender) = config.async_push_sender {
174            self.async_push_sender = Some(async_push_sender);
175        }
176
177        #[cfg(feature = "cluster-async")]
178        if let Some(async_dns_resolver) = config.async_dns_resolver {
179            self.async_dns_resolver = Some(async_dns_resolver);
180        }
181
182        self
183    }
184}
185
186/// Used to configure and build a [`ClusterClient`].
187pub struct ClusterClientBuilder {
188    initial_nodes: RedisResult<Vec<ConnectionInfo>>,
189    builder_params: BuilderParams,
190}
191
192impl ClusterClientBuilder {
193    /// Creates a new `ClusterClientBuilder` with the provided initial_nodes.
194    ///
195    /// This is the same as `ClusterClient::builder(initial_nodes)`.
196    pub fn new<T: IntoConnectionInfo>(
197        initial_nodes: impl IntoIterator<Item = T>,
198    ) -> ClusterClientBuilder {
199        ClusterClientBuilder {
200            initial_nodes: initial_nodes
201                .into_iter()
202                .map(|x| x.into_connection_info())
203                .collect(),
204            builder_params: Default::default(),
205        }
206    }
207
208    /// Creates a new [`ClusterClient`] from the parameters.
209    ///
210    /// This does not create connections to the Redis Cluster, but only performs some basic checks
211    /// on the initial nodes' URLs and passwords/usernames.
212    ///
213    /// When the `tls-rustls` feature is enabled and TLS credentials are provided, they are set for
214    /// each cluster connection.
215    ///
216    /// # Errors
217    ///
218    /// Upon failure to parse initial nodes or if the initial nodes have different passwords or
219    /// usernames, an error is returned.
220    pub fn build(self) -> RedisResult<ClusterClient> {
221        let initial_nodes = self.initial_nodes?;
222
223        let first_node = match initial_nodes.first() {
224            Some(node) => node,
225            None => {
226                return Err(RedisError::from((
227                    ErrorKind::InvalidClientConfig,
228                    "Initial nodes can't be empty.",
229                )))
230            }
231        };
232
233        let mut cluster_params = ClusterParams::from(self.builder_params)?;
234        let password = if cluster_params.password.is_none() {
235            cluster_params
236                .password
237                .clone_from(&first_node.redis.password);
238            &cluster_params.password
239        } else {
240            &None
241        };
242        let username = if cluster_params.username.is_none() {
243            cluster_params
244                .username
245                .clone_from(&first_node.redis.username);
246            &cluster_params.username
247        } else {
248            &None
249        };
250        let tls = if cluster_params.tls.is_none() {
251            cluster_params.tls = first_node.addr.tls_mode();
252            cluster_params.tls
253        } else {
254            None
255        };
256        let protocol = if cluster_params.protocol.is_none() {
257            cluster_params.protocol = Some(first_node.redis.protocol);
258            cluster_params.protocol
259        } else {
260            None
261        };
262
263        // Verify that the initial nodes match the cluster client's configuration.
264        for node in &initial_nodes {
265            if let ConnectionAddr::Unix(_) = node.addr {
266                return Err(RedisError::from((ErrorKind::InvalidClientConfig,
267                                             "This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
268            }
269
270            if password.is_some() && node.redis.password != *password {
271                return Err(RedisError::from((
272                    ErrorKind::InvalidClientConfig,
273                    "Cannot use different password among initial nodes.",
274                )));
275            }
276
277            if username.is_some() && node.redis.username != *username {
278                return Err(RedisError::from((
279                    ErrorKind::InvalidClientConfig,
280                    "Cannot use different username among initial nodes.",
281                )));
282            }
283            if protocol.is_some() && Some(node.redis.protocol) != protocol {
284                return Err(RedisError::from((
285                    ErrorKind::InvalidClientConfig,
286                    "Cannot use different protocol among initial nodes.",
287                )));
288            }
289
290            if tls.is_some() && node.addr.tls_mode() != tls {
291                return Err(RedisError::from((
292                    ErrorKind::InvalidClientConfig,
293                    "Cannot use different TLS modes among initial nodes.",
294                )));
295            }
296        }
297
298        Ok(ClusterClient {
299            initial_nodes,
300            cluster_params,
301        })
302    }
303
304    /// Sets password for the new ClusterClient.
305    pub fn password(mut self, password: String) -> ClusterClientBuilder {
306        self.builder_params.password = Some(password);
307        self
308    }
309
310    /// Sets username for the new ClusterClient.
311    pub fn username(mut self, username: String) -> ClusterClientBuilder {
312        self.builder_params.username = Some(username);
313        self
314    }
315
316    /// Sets number of retries for the new ClusterClient.
317    pub fn retries(mut self, retries: u32) -> ClusterClientBuilder {
318        self.builder_params.retries_configuration.number_of_retries = retries;
319        self
320    }
321
322    /// Sets maximal wait time in millisceonds between retries for the new ClusterClient.
323    pub fn max_retry_wait(mut self, max_wait: u64) -> ClusterClientBuilder {
324        self.builder_params.retries_configuration.max_wait_time = max_wait;
325        self
326    }
327
328    /// Sets minimal wait time in millisceonds between retries for the new ClusterClient.
329    pub fn min_retry_wait(mut self, min_wait: u64) -> ClusterClientBuilder {
330        self.builder_params.retries_configuration.min_wait_time = min_wait;
331        self
332    }
333
334    /// Sets the factor and exponent base for the retry wait time.
335    /// The formula for the wait is rand(min_wait_retry .. min(max_retry_wait , factor * exponent_base ^ retry))ms.
336    pub fn retry_wait_formula(mut self, factor: u64, exponent_base: u64) -> ClusterClientBuilder {
337        self.builder_params.retries_configuration.factor = factor;
338        self.builder_params.retries_configuration.exponent_base = exponent_base;
339        self
340    }
341
342    /// Sets TLS mode for the new ClusterClient.
343    ///
344    /// It is extracted from the first node of initial_nodes if not set.
345    #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
346    pub fn tls(mut self, tls: TlsMode) -> ClusterClientBuilder {
347        self.builder_params.tls = Some(tls);
348        self
349    }
350
351    /// Configure hostname verification when connecting with TLS.
352    ///
353    /// If `insecure` is true, this **disables** hostname verification, while
354    /// leaving other aspects of certificate checking enabled. This mode is
355    /// similar to what `redis-cli` does: TLS connections do check certificates,
356    /// but hostname errors are ignored.
357    ///
358    /// # Warning
359    ///
360    /// You should think very carefully before you use this method. If hostname
361    /// verification is not used, any valid certificate for any site will be
362    /// trusted for use from any other. This introduces a significant
363    /// vulnerability to man-in-the-middle attacks.
364    #[cfg(any(feature = "tls-rustls-insecure", feature = "tls-native-tls"))]
365    pub fn danger_accept_invalid_hostnames(mut self, insecure: bool) -> ClusterClientBuilder {
366        self.builder_params.danger_accept_invalid_hostnames = insecure;
367        self
368    }
369
370    /// Sets raw TLS certificates for the new ClusterClient.
371    ///
372    /// When set, enforces the connection must be TLS secured.
373    ///
374    /// All certificates must be provided as byte streams loaded from PEM files their consistency is
375    /// checked during `build()` call.
376    ///
377    /// - `certificates` - `TlsCertificates` structure containing:
378    ///     - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
379    ///         - `client_cert` - client's byte stream containing client certificate in PEM format
380    ///         - `client_key` - client's byte stream containing private key in PEM format
381    ///
382    ///     - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
383    ///
384    /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
385    /// If `root_cert` is not provided, then system root certificates are used instead.
386    #[cfg(feature = "tls-rustls")]
387    pub fn certs(mut self, certificates: TlsCertificates) -> ClusterClientBuilder {
388        if self.builder_params.tls.is_none() {
389            self.builder_params.tls = Some(TlsMode::Secure);
390        }
391
392        self.builder_params.certs = Some(certificates);
393        self
394    }
395
396    /// Enables reading from replicas for all new connections (default is disabled).
397    ///
398    /// If enabled, then read queries will go to the replica nodes & write queries will go to the
399    /// primary nodes. If there are no replica nodes, then all queries will go to the primary nodes.
400    pub fn read_from_replicas(mut self) -> ClusterClientBuilder {
401        self.builder_params.read_from_replicas = true;
402        self
403    }
404
405    /// Enables timing out on slow connection time.
406    ///
407    /// If enabled, the cluster will only wait the given time on each connection attempt to each node.
408    pub fn connection_timeout(mut self, connection_timeout: Duration) -> ClusterClientBuilder {
409        self.builder_params.connection_timeout = Some(connection_timeout);
410        self
411    }
412
413    /// Enables timing out on slow responses.
414    ///
415    /// If enabled, the cluster will only wait the given time to each response from each node.
416    pub fn response_timeout(mut self, response_timeout: Duration) -> ClusterClientBuilder {
417        self.builder_params.response_timeout = Some(response_timeout);
418        self
419    }
420
421    /// Sets the protocol with which the client should communicate with the server.
422    pub fn use_protocol(mut self, protocol: ProtocolVersion) -> ClusterClientBuilder {
423        self.builder_params.protocol = Some(protocol);
424        self
425    }
426
427    /// Use `build()`.
428    #[deprecated(since = "0.22.0", note = "Use build()")]
429    pub fn open(self) -> RedisResult<ClusterClient> {
430        self.build()
431    }
432
433    /// Use `read_from_replicas()`.
434    #[deprecated(since = "0.22.0", note = "Use read_from_replicas()")]
435    pub fn readonly(mut self, read_from_replicas: bool) -> ClusterClientBuilder {
436        self.builder_params.read_from_replicas = read_from_replicas;
437        self
438    }
439
440    #[cfg(feature = "cluster-async")]
441    /// Sets sender sender for push values.
442    ///
443    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
444    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::protocol] field.
445    ///
446    /// # Examples
447    ///
448    /// ```rust
449    /// # use redis::cluster::ClusterClientBuilder;
450    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
451    /// let config = ClusterClientBuilder::new(vec!["redis://127.0.0.1:6379/"])
452    ///     .use_protocol(redis::ProtocolVersion::RESP3)
453    ///     .push_sender(tx);
454    /// ```
455    ///
456    /// ```rust
457    /// # use std::sync::{Mutex, Arc};
458    /// # use redis::cluster::ClusterClientBuilder;
459    /// let messages = Arc::new(Mutex::new(Vec::new()));
460    /// let config = ClusterClientBuilder::new(vec!["redis://127.0.0.1:6379/"])
461    ///     .use_protocol(redis::ProtocolVersion::RESP3)
462    ///     .push_sender(move |msg|{
463    ///         let Ok(mut messages) = messages.lock() else {
464    ///             return Err(redis::aio::SendError);
465    ///         };
466    ///         messages.push(msg);
467    ///         Ok(())
468    ///     });
469    /// ```
470    pub fn push_sender(mut self, push_sender: impl AsyncPushSender) -> ClusterClientBuilder {
471        self.builder_params.async_push_sender = Some(Arc::new(push_sender));
472        self
473    }
474
475    /// Set the behavior of the underlying TCP connection.
476    #[cfg(feature = "cluster-async")]
477    pub fn tcp_settings(mut self, tcp_settings: TcpSettings) -> ClusterClientBuilder {
478        self.builder_params.tcp_settings = tcp_settings;
479        self
480    }
481
482    /// Set asynchronous DNS resolver for the underlying TCP connection.
483    ///
484    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
485    #[cfg(feature = "cluster-async")]
486    pub fn async_dns_resolver(mut self, resolver: impl AsyncDNSResolver) -> ClusterClientBuilder {
487        self.builder_params.async_dns_resolver = Some(Arc::new(resolver));
488        self
489    }
490
491    /// Sets cache config for [`crate::cluster_async::ClusterConnection`], check CacheConfig for more details.
492    #[cfg(all(feature = "cache-aio", feature = "cluster-async"))]
493    pub fn cache_config(mut self, cache_config: CacheConfig) -> Self {
494        self.builder_params.cache_config = Some(cache_config);
495        self
496    }
497}
498
499/// A Redis Cluster client, used to create connections.
500#[derive(Clone)]
501pub struct ClusterClient {
502    initial_nodes: Vec<ConnectionInfo>,
503    cluster_params: ClusterParams,
504}
505
506impl ClusterClient {
507    /// Creates a `ClusterClient` with the default parameters.
508    ///
509    /// This does not create connections to the Redis Cluster, but only performs some basic checks
510    /// on the initial nodes' URLs and passwords/usernames.
511    ///
512    /// # Errors
513    ///
514    /// Upon failure to parse initial nodes or if the initial nodes have different passwords or
515    /// usernames, an error is returned.
516    pub fn new<T: IntoConnectionInfo>(
517        initial_nodes: impl IntoIterator<Item = T>,
518    ) -> RedisResult<ClusterClient> {
519        Self::builder(initial_nodes).build()
520    }
521
522    /// Creates a [`ClusterClientBuilder`] with the provided initial_nodes.
523    pub fn builder<T: IntoConnectionInfo>(
524        initial_nodes: impl IntoIterator<Item = T>,
525    ) -> ClusterClientBuilder {
526        ClusterClientBuilder::new(initial_nodes)
527    }
528
529    /// Creates new connections to Redis Cluster nodes and returns a
530    /// [`cluster::ClusterConnection`].
531    ///
532    /// # Errors
533    ///
534    /// An error is returned if there is a failure while creating connections or slots.
535    pub fn get_connection(&self) -> RedisResult<cluster::ClusterConnection> {
536        cluster::ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
537    }
538
539    /// Creates new connections to Redis Cluster nodes with a custom config and returns a
540    /// [`cluster_async::ClusterConnection`].
541    ///
542    /// # Errors
543    ///
544    /// An error is returned if there is a failure while creating connections or slots.
545    pub fn get_connection_with_config(
546        &self,
547        config: cluster::ClusterConfig,
548    ) -> RedisResult<cluster::ClusterConnection> {
549        cluster::ClusterConnection::new(
550            self.cluster_params.clone().with_config(config),
551            self.initial_nodes.clone(),
552        )
553    }
554
555    /// Creates new connections to Redis Cluster nodes and returns a
556    /// [`cluster_async::ClusterConnection`].
557    ///
558    /// # Errors
559    ///
560    /// An error is returned if there is a failure while creating connections or slots.
561    #[cfg(feature = "cluster-async")]
562    pub async fn get_async_connection(&self) -> RedisResult<cluster_async::ClusterConnection> {
563        cluster_async::ClusterConnection::new(&self.initial_nodes, self.cluster_params.clone())
564            .await
565    }
566
567    /// Creates new connections to Redis Cluster nodes with a custom config and returns a
568    /// [`cluster_async::ClusterConnection`].
569    ///
570    /// # Errors
571    ///
572    /// An error is returned if there is a failure while creating connections or slots.
573    #[cfg(feature = "cluster-async")]
574    pub async fn get_async_connection_with_config(
575        &self,
576        config: cluster::ClusterConfig,
577    ) -> RedisResult<cluster_async::ClusterConnection> {
578        cluster_async::ClusterConnection::new(
579            &self.initial_nodes,
580            self.cluster_params.clone().with_config(config),
581        )
582        .await
583    }
584
585    #[doc(hidden)]
586    pub fn get_generic_connection<C>(&self) -> RedisResult<cluster::ClusterConnection<C>>
587    where
588        C: crate::ConnectionLike + crate::cluster::Connect + Send,
589    {
590        cluster::ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
591    }
592
593    #[doc(hidden)]
594    #[cfg(feature = "cluster-async")]
595    pub async fn get_async_generic_connection<C>(
596        &self,
597    ) -> RedisResult<cluster_async::ClusterConnection<C>>
598    where
599        C: crate::aio::ConnectionLike
600            + cluster_async::Connect
601            + Clone
602            + Send
603            + Sync
604            + Unpin
605            + 'static,
606    {
607        cluster_async::ClusterConnection::new(&self.initial_nodes, self.cluster_params.clone())
608            .await
609    }
610
611    /// Use `new()`.
612    #[deprecated(since = "0.22.0", note = "Use new()")]
613    pub fn open<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<ClusterClient> {
614        Self::new(initial_nodes)
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::{ClusterClient, ClusterClientBuilder, ConnectionInfo, IntoConnectionInfo};
621
622    fn get_connection_data() -> Vec<ConnectionInfo> {
623        vec![
624            "redis://127.0.0.1:6379".into_connection_info().unwrap(),
625            "redis://127.0.0.1:6378".into_connection_info().unwrap(),
626            "redis://127.0.0.1:6377".into_connection_info().unwrap(),
627        ]
628    }
629
630    fn get_connection_data_with_password() -> Vec<ConnectionInfo> {
631        vec![
632            "redis://:password@127.0.0.1:6379"
633                .into_connection_info()
634                .unwrap(),
635            "redis://:password@127.0.0.1:6378"
636                .into_connection_info()
637                .unwrap(),
638            "redis://:password@127.0.0.1:6377"
639                .into_connection_info()
640                .unwrap(),
641        ]
642    }
643
644    fn get_connection_data_with_username_and_password() -> Vec<ConnectionInfo> {
645        vec![
646            "redis://user1:password@127.0.0.1:6379"
647                .into_connection_info()
648                .unwrap(),
649            "redis://user1:password@127.0.0.1:6378"
650                .into_connection_info()
651                .unwrap(),
652            "redis://user1:password@127.0.0.1:6377"
653                .into_connection_info()
654                .unwrap(),
655        ]
656    }
657
658    #[test]
659    fn give_no_password() {
660        let client = ClusterClient::new(get_connection_data()).unwrap();
661        assert_eq!(client.cluster_params.password, None);
662    }
663
664    #[test]
665    fn give_password_by_initial_nodes() {
666        let client = ClusterClient::new(get_connection_data_with_password()).unwrap();
667        assert_eq!(client.cluster_params.password, Some("password".to_string()));
668    }
669
670    #[test]
671    fn give_username_and_password_by_initial_nodes() {
672        let client = ClusterClient::new(get_connection_data_with_username_and_password()).unwrap();
673        assert_eq!(client.cluster_params.password, Some("password".to_string()));
674        assert_eq!(client.cluster_params.username, Some("user1".to_string()));
675    }
676
677    #[test]
678    fn fail_if_received_different_password_between_initial_nodes() {
679        let result = ClusterClient::new(vec![
680            "redis://:password1@127.0.0.1:6379",
681            "redis://:password2@127.0.0.1:6378",
682            "redis://:password3@127.0.0.1:6377",
683        ]);
684        assert!(result.is_err());
685    }
686
687    #[test]
688    fn fail_if_received_different_username_between_initial_nodes() {
689        let result = ClusterClient::new(vec![
690            "redis://user1:password@127.0.0.1:6379",
691            "redis://user2:password@127.0.0.1:6378",
692            "redis://user1:password@127.0.0.1:6377",
693        ]);
694        assert!(result.is_err());
695    }
696
697    #[test]
698    fn fail_if_received_different_protocol_between_initial_nodes() {
699        let result = ClusterClient::new(vec![
700            "redis://127.0.0.1:6379/?protocol=3",
701            "redis://127.0.0.1:6378",
702            "redis://127.0.0.1:6377",
703        ]);
704        assert!(result.is_err());
705    }
706
707    #[test]
708    fn fail_if_received_different_tls_between_initial_nodes() {
709        let result = ClusterClient::new(vec![
710            "rediss://127.0.0.1:6379/",
711            "redis://127.0.0.1:6378",
712            "redis://127.0.0.1:6377",
713        ]);
714        assert!(result.is_err());
715    }
716
717    #[test]
718    fn give_username_password_by_method() {
719        let client = ClusterClientBuilder::new(get_connection_data_with_username_and_password())
720            .password("password".to_string())
721            .username("user1".to_string())
722            .build()
723            .unwrap();
724        assert_eq!(client.cluster_params.password, Some("password".to_string()));
725        assert_eq!(client.cluster_params.username, Some("user1".to_string()));
726    }
727
728    #[test]
729    fn give_empty_initial_nodes() {
730        let client = ClusterClient::new(Vec::<String>::new());
731        assert!(client.is_err())
732    }
733}