fluffici_redis/
cluster_client.rs

1use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo};
2use crate::types::{ErrorKind, RedisError, RedisResult};
3use crate::{cluster, cluster::TlsMode};
4
5#[cfg(feature = "cluster-async")]
6use crate::cluster_async;
7
/// Retry count applied when the builder was not given an explicit `retries` value.
const DEFAULT_RETRIES: u32 = 16;
9
/// Parameters collected while configuring a builder.
///
/// They are kept apart from [`ClusterParams`] so that a builder field may have a
/// different type than its final counterpart (for example `retries`, which is
/// optional here and a plain `u32` in [`ClusterParams`]).
#[derive(Default)]
struct BuilderParams {
    /// Password explicitly requested through the builder, if any.
    password: Option<String>,
    /// Username explicitly requested through the builder, if any.
    username: Option<String>,
    /// Whether reading from replicas was requested.
    read_from_replicas: bool,
    /// TLS mode explicitly requested through the builder, if any.
    tls: Option<TlsMode>,
    /// Retry count explicitly requested through the builder; `None` means
    /// "fall back to `DEFAULT_RETRIES`" when converting to [`ClusterParams`].
    retries: Option<u32>,
}
21
/// Redis cluster specific parameters, in their final (resolved) form.
#[derive(Default, Clone)]
pub(crate) struct ClusterParams {
    /// Password used for the cluster connections, if any.
    pub(crate) password: Option<String>,
    /// Username used for the cluster connections, if any.
    pub(crate) username: Option<String>,
    /// Whether read queries may be served by replica nodes.
    pub(crate) read_from_replicas: bool,
    /// TLS behavior of connections.
    ///
    /// When `Some(TlsMode)`, connections use TLS and whether the certificate is
    /// verified depends on the [`TlsMode`].
    /// When `None`, connections do not use TLS.
    pub(crate) tls: Option<TlsMode>,
    /// Number of retries.
    pub(crate) retries: u32,
}
34
35impl From<BuilderParams> for ClusterParams {
36    fn from(value: BuilderParams) -> Self {
37        Self {
38            password: value.password,
39            username: value.username,
40            read_from_replicas: value.read_from_replicas,
41            tls: value.tls,
42            retries: value.retries.unwrap_or(DEFAULT_RETRIES),
43        }
44    }
45}
46
/// Used to configure and build a [`ClusterClient`].
pub struct ClusterClientBuilder {
    /// Outcome of converting the nodes handed to `new()`; a conversion failure is
    /// stored here and only surfaces when `build()` is called.
    initial_nodes: RedisResult<Vec<ConnectionInfo>>,
    /// Options set through the builder methods so far.
    builder_params: BuilderParams,
}
52
53impl ClusterClientBuilder {
54    /// Creates a new `ClusterClientBuilder` with the provided initial_nodes.
55    ///
56    /// This is the same as `ClusterClient::builder(initial_nodes)`.
57    pub fn new<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> ClusterClientBuilder {
58        ClusterClientBuilder {
59            initial_nodes: initial_nodes
60                .into_iter()
61                .map(|x| x.into_connection_info())
62                .collect(),
63            builder_params: Default::default(),
64        }
65    }
66
67    /// Creates a new [`ClusterClient`] from the parameters.
68    ///
69    /// This does not create connections to the Redis Cluster, but only performs some basic checks
70    /// on the initial nodes' URLs and passwords/usernames.
71    ///
72    /// # Errors
73    ///
74    /// Upon failure to parse initial nodes or if the initial nodes have different passwords or
75    /// usernames, an error is returned.
76    pub fn build(self) -> RedisResult<ClusterClient> {
77        let initial_nodes = self.initial_nodes?;
78
79        let first_node = match initial_nodes.first() {
80            Some(node) => node,
81            None => {
82                return Err(RedisError::from((
83                    ErrorKind::InvalidClientConfig,
84                    "Initial nodes can't be empty.",
85                )))
86            }
87        };
88
89        let mut cluster_params: ClusterParams = self.builder_params.into();
90        let password = if cluster_params.password.is_none() {
91            cluster_params.password = first_node.redis.password.clone();
92            &cluster_params.password
93        } else {
94            &None
95        };
96        let username = if cluster_params.username.is_none() {
97            cluster_params.username = first_node.redis.username.clone();
98            &cluster_params.username
99        } else {
100            &None
101        };
102        if cluster_params.tls.is_none() {
103            cluster_params.tls = match first_node.addr {
104                ConnectionAddr::TcpTls {
105                    host: _,
106                    port: _,
107                    insecure,
108                } => Some(match insecure {
109                    false => TlsMode::Secure,
110                    true => TlsMode::Insecure,
111                }),
112                _ => None,
113            };
114        }
115
116        let mut nodes = Vec::with_capacity(initial_nodes.len());
117        for node in initial_nodes {
118            if let ConnectionAddr::Unix(_) = node.addr {
119                return Err(RedisError::from((ErrorKind::InvalidClientConfig,
120                                             "This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
121            }
122
123            if password.is_some() && node.redis.password != *password {
124                return Err(RedisError::from((
125                    ErrorKind::InvalidClientConfig,
126                    "Cannot use different password among initial nodes.",
127                )));
128            }
129
130            if username.is_some() && node.redis.username != *username {
131                return Err(RedisError::from((
132                    ErrorKind::InvalidClientConfig,
133                    "Cannot use different username among initial nodes.",
134                )));
135            }
136
137            nodes.push(node);
138        }
139
140        Ok(ClusterClient {
141            initial_nodes: nodes,
142            cluster_params,
143        })
144    }
145
146    /// Sets password for the new ClusterClient.
147    pub fn password(mut self, password: String) -> ClusterClientBuilder {
148        self.builder_params.password = Some(password);
149        self
150    }
151
152    /// Sets username for the new ClusterClient.
153    pub fn username(mut self, username: String) -> ClusterClientBuilder {
154        self.builder_params.username = Some(username);
155        self
156    }
157
158    /// Sets number of retries for the new ClusterClient.
159    pub fn retries(mut self, retries: u32) -> ClusterClientBuilder {
160        self.builder_params.retries = Some(retries);
161        self
162    }
163
164    /// Sets TLS mode for the new ClusterClient.
165    ///
166    /// It is extracted from the first node of initial_nodes if not set.
167    #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
168    pub fn tls(mut self, tls: TlsMode) -> ClusterClientBuilder {
169        self.builder_params.tls = Some(tls);
170        self
171    }
172
173    /// Enables reading from replicas for all new connections (default is disabled).
174    ///
175    /// If enabled, then read queries will go to the replica nodes & write queries will go to the
176    /// primary nodes. If there are no replica nodes, then all queries will go to the primary nodes.
177    pub fn read_from_replicas(mut self) -> ClusterClientBuilder {
178        self.builder_params.read_from_replicas = true;
179        self
180    }
181
182    /// Use `build()`.
183    #[deprecated(since = "0.22.0", note = "Use build()")]
184    pub fn open(self) -> RedisResult<ClusterClient> {
185        self.build()
186    }
187
188    /// Use `read_from_replicas()`.
189    #[deprecated(since = "0.22.0", note = "Use read_from_replicas()")]
190    pub fn readonly(mut self, read_from_replicas: bool) -> ClusterClientBuilder {
191        self.builder_params.read_from_replicas = read_from_replicas;
192        self
193    }
194}
195
/// This is a Redis Cluster client.
#[derive(Clone)]
pub struct ClusterClient {
    /// Nodes handed over by the builder; cloned or borrowed for each new connection.
    initial_nodes: Vec<ConnectionInfo>,
    /// Resolved cluster parameters; cloned for each new connection.
    cluster_params: ClusterParams,
}
202
203impl ClusterClient {
204    /// Creates a `ClusterClient` with the default parameters.
205    ///
206    /// This does not create connections to the Redis Cluster, but only performs some basic checks
207    /// on the initial nodes' URLs and passwords/usernames.
208    ///
209    /// # Errors
210    ///
211    /// Upon failure to parse initial nodes or if the initial nodes have different passwords or
212    /// usernames, an error is returned.
213    pub fn new<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<ClusterClient> {
214        Self::builder(initial_nodes).build()
215    }
216
217    /// Creates a [`ClusterClientBuilder`] with the provided initial_nodes.
218    pub fn builder<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> ClusterClientBuilder {
219        ClusterClientBuilder::new(initial_nodes)
220    }
221
222    /// Creates new connections to Redis Cluster nodes and returns a
223    /// [`cluster::ClusterConnection`].
224    ///
225    /// # Errors
226    ///
227    /// An error is returned if there is a failure while creating connections or slots.
228    pub fn get_connection(&self) -> RedisResult<cluster::ClusterConnection> {
229        cluster::ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
230    }
231
232    /// Creates new connections to Redis Cluster nodes and returns a
233    /// [`cluster_async::ClusterConnection`].
234    ///
235    /// # Errors
236    ///
237    /// An error is returned if there is a failure while creating connections or slots.
238    #[cfg(feature = "cluster-async")]
239    pub async fn get_async_connection(&self) -> RedisResult<cluster_async::ClusterConnection> {
240        cluster_async::ClusterConnection::new(&self.initial_nodes, self.cluster_params.clone())
241            .await
242    }
243
244    #[doc(hidden)]
245    pub fn get_generic_connection<C>(&self) -> RedisResult<cluster::ClusterConnection<C>>
246    where
247        C: crate::ConnectionLike + crate::cluster::Connect + Send,
248    {
249        cluster::ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
250    }
251
252    #[doc(hidden)]
253    #[cfg(feature = "cluster-async")]
254    pub async fn get_async_generic_connection<C>(
255        &self,
256    ) -> RedisResult<cluster_async::ClusterConnection<C>>
257    where
258        C: crate::aio::ConnectionLike
259            + cluster_async::Connect
260            + Clone
261            + Send
262            + Sync
263            + Unpin
264            + 'static,
265    {
266        cluster_async::ClusterConnection::new(&self.initial_nodes, self.cluster_params.clone())
267            .await
268    }
269
270    /// Use `new()`.
271    #[deprecated(since = "0.22.0", note = "Use new()")]
272    pub fn open<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<ClusterClient> {
273        Self::new(initial_nodes)
274    }
275}
276
#[cfg(test)]
mod tests {
    use super::{ClusterClient, ClusterClientBuilder, ConnectionInfo, IntoConnectionInfo};

    /// Converts each URL into a `ConnectionInfo`, panicking if one does not parse.
    fn parse_nodes(urls: &[&str]) -> Vec<ConnectionInfo> {
        urls.iter()
            .map(|&url| url.into_connection_info().unwrap())
            .collect()
    }

    /// Three nodes without any credentials.
    fn get_connection_data() -> Vec<ConnectionInfo> {
        parse_nodes(&[
            "redis://127.0.0.1:6379",
            "redis://127.0.0.1:6378",
            "redis://127.0.0.1:6377",
        ])
    }

    /// Three nodes sharing the same password and no username.
    fn get_connection_data_with_password() -> Vec<ConnectionInfo> {
        parse_nodes(&[
            "redis://:password@127.0.0.1:6379",
            "redis://:password@127.0.0.1:6378",
            "redis://:password@127.0.0.1:6377",
        ])
    }

    /// Three nodes sharing the same username and password.
    fn get_connection_data_with_username_and_password() -> Vec<ConnectionInfo> {
        parse_nodes(&[
            "redis://user1:password@127.0.0.1:6379",
            "redis://user1:password@127.0.0.1:6378",
            "redis://user1:password@127.0.0.1:6377",
        ])
    }

    #[test]
    fn give_no_password() {
        let nodes = get_connection_data();
        let client = ClusterClient::new(nodes).unwrap();
        assert!(client.cluster_params.password.is_none());
    }

    #[test]
    fn give_password_by_initial_nodes() {
        let nodes = get_connection_data_with_password();
        let client = ClusterClient::new(nodes).unwrap();
        assert_eq!(client.cluster_params.password.as_deref(), Some("password"));
    }

    #[test]
    fn give_username_and_password_by_initial_nodes() {
        let nodes = get_connection_data_with_username_and_password();
        let client = ClusterClient::new(nodes).unwrap();
        assert_eq!(client.cluster_params.password.as_deref(), Some("password"));
        assert_eq!(client.cluster_params.username.as_deref(), Some("user1"));
    }

    #[test]
    fn give_different_password_by_initial_nodes() {
        let urls = vec![
            "redis://:password1@127.0.0.1:6379",
            "redis://:password2@127.0.0.1:6378",
            "redis://:password3@127.0.0.1:6377",
        ];
        assert!(ClusterClient::new(urls).is_err());
    }

    #[test]
    fn give_different_username_by_initial_nodes() {
        let urls = vec![
            "redis://user1:password@127.0.0.1:6379",
            "redis://user2:password@127.0.0.1:6378",
            "redis://user1:password@127.0.0.1:6377",
        ];
        assert!(ClusterClient::new(urls).is_err());
    }

    #[test]
    fn give_username_password_by_method() {
        // Values set on the builder win over the password embedded in the node URLs.
        let builder = ClusterClientBuilder::new(get_connection_data_with_password())
            .password("pass".to_string())
            .username("user1".to_string());
        let client = builder.build().unwrap();
        assert_eq!(client.cluster_params.password.as_deref(), Some("pass"));
        assert_eq!(client.cluster_params.username.as_deref(), Some("user1"));
    }

    #[test]
    fn give_empty_initial_nodes() {
        let no_nodes: Vec<String> = Vec::new();
        assert!(ClusterClient::new(no_nodes).is_err());
    }
}