fluffici_redis/
cluster_client.rs1use crate::connection::{ConnectionAddr, ConnectionInfo, IntoConnectionInfo};
2use crate::types::{ErrorKind, RedisError, RedisResult};
3use crate::{cluster, cluster::TlsMode};
4
5#[cfg(feature = "cluster-async")]
6use crate::cluster_async;
7
8const DEFAULT_RETRIES: u32 = 16;
9
10#[derive(Default)]
14struct BuilderParams {
15 password: Option<String>,
16 username: Option<String>,
17 read_from_replicas: bool,
18 tls: Option<TlsMode>,
19 retries: Option<u32>,
20}
21
22#[derive(Default, Clone)]
24pub(crate) struct ClusterParams {
25 pub(crate) password: Option<String>,
26 pub(crate) username: Option<String>,
27 pub(crate) read_from_replicas: bool,
28 pub(crate) tls: Option<TlsMode>,
32 pub(crate) retries: u32,
33}
34
35impl From<BuilderParams> for ClusterParams {
36 fn from(value: BuilderParams) -> Self {
37 Self {
38 password: value.password,
39 username: value.username,
40 read_from_replicas: value.read_from_replicas,
41 tls: value.tls,
42 retries: value.retries.unwrap_or(DEFAULT_RETRIES),
43 }
44 }
45}
46
47pub struct ClusterClientBuilder {
49 initial_nodes: RedisResult<Vec<ConnectionInfo>>,
50 builder_params: BuilderParams,
51}
52
53impl ClusterClientBuilder {
54 pub fn new<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> ClusterClientBuilder {
58 ClusterClientBuilder {
59 initial_nodes: initial_nodes
60 .into_iter()
61 .map(|x| x.into_connection_info())
62 .collect(),
63 builder_params: Default::default(),
64 }
65 }
66
67 pub fn build(self) -> RedisResult<ClusterClient> {
77 let initial_nodes = self.initial_nodes?;
78
79 let first_node = match initial_nodes.first() {
80 Some(node) => node,
81 None => {
82 return Err(RedisError::from((
83 ErrorKind::InvalidClientConfig,
84 "Initial nodes can't be empty.",
85 )))
86 }
87 };
88
89 let mut cluster_params: ClusterParams = self.builder_params.into();
90 let password = if cluster_params.password.is_none() {
91 cluster_params.password = first_node.redis.password.clone();
92 &cluster_params.password
93 } else {
94 &None
95 };
96 let username = if cluster_params.username.is_none() {
97 cluster_params.username = first_node.redis.username.clone();
98 &cluster_params.username
99 } else {
100 &None
101 };
102 if cluster_params.tls.is_none() {
103 cluster_params.tls = match first_node.addr {
104 ConnectionAddr::TcpTls {
105 host: _,
106 port: _,
107 insecure,
108 } => Some(match insecure {
109 false => TlsMode::Secure,
110 true => TlsMode::Insecure,
111 }),
112 _ => None,
113 };
114 }
115
116 let mut nodes = Vec::with_capacity(initial_nodes.len());
117 for node in initial_nodes {
118 if let ConnectionAddr::Unix(_) = node.addr {
119 return Err(RedisError::from((ErrorKind::InvalidClientConfig,
120 "This library cannot use unix socket because Redis's cluster command returns only cluster's IP and port.")));
121 }
122
123 if password.is_some() && node.redis.password != *password {
124 return Err(RedisError::from((
125 ErrorKind::InvalidClientConfig,
126 "Cannot use different password among initial nodes.",
127 )));
128 }
129
130 if username.is_some() && node.redis.username != *username {
131 return Err(RedisError::from((
132 ErrorKind::InvalidClientConfig,
133 "Cannot use different username among initial nodes.",
134 )));
135 }
136
137 nodes.push(node);
138 }
139
140 Ok(ClusterClient {
141 initial_nodes: nodes,
142 cluster_params,
143 })
144 }
145
146 pub fn password(mut self, password: String) -> ClusterClientBuilder {
148 self.builder_params.password = Some(password);
149 self
150 }
151
152 pub fn username(mut self, username: String) -> ClusterClientBuilder {
154 self.builder_params.username = Some(username);
155 self
156 }
157
158 pub fn retries(mut self, retries: u32) -> ClusterClientBuilder {
160 self.builder_params.retries = Some(retries);
161 self
162 }
163
164 #[cfg(any(feature = "tls-native-tls", feature = "tls-rustls"))]
168 pub fn tls(mut self, tls: TlsMode) -> ClusterClientBuilder {
169 self.builder_params.tls = Some(tls);
170 self
171 }
172
173 pub fn read_from_replicas(mut self) -> ClusterClientBuilder {
178 self.builder_params.read_from_replicas = true;
179 self
180 }
181
182 #[deprecated(since = "0.22.0", note = "Use build()")]
184 pub fn open(self) -> RedisResult<ClusterClient> {
185 self.build()
186 }
187
188 #[deprecated(since = "0.22.0", note = "Use read_from_replicas()")]
190 pub fn readonly(mut self, read_from_replicas: bool) -> ClusterClientBuilder {
191 self.builder_params.read_from_replicas = read_from_replicas;
192 self
193 }
194}
195
196#[derive(Clone)]
198pub struct ClusterClient {
199 initial_nodes: Vec<ConnectionInfo>,
200 cluster_params: ClusterParams,
201}
202
203impl ClusterClient {
204 pub fn new<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<ClusterClient> {
214 Self::builder(initial_nodes).build()
215 }
216
217 pub fn builder<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> ClusterClientBuilder {
219 ClusterClientBuilder::new(initial_nodes)
220 }
221
222 pub fn get_connection(&self) -> RedisResult<cluster::ClusterConnection> {
229 cluster::ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
230 }
231
232 #[cfg(feature = "cluster-async")]
239 pub async fn get_async_connection(&self) -> RedisResult<cluster_async::ClusterConnection> {
240 cluster_async::ClusterConnection::new(&self.initial_nodes, self.cluster_params.clone())
241 .await
242 }
243
244 #[doc(hidden)]
245 pub fn get_generic_connection<C>(&self) -> RedisResult<cluster::ClusterConnection<C>>
246 where
247 C: crate::ConnectionLike + crate::cluster::Connect + Send,
248 {
249 cluster::ClusterConnection::new(self.cluster_params.clone(), self.initial_nodes.clone())
250 }
251
252 #[doc(hidden)]
253 #[cfg(feature = "cluster-async")]
254 pub async fn get_async_generic_connection<C>(
255 &self,
256 ) -> RedisResult<cluster_async::ClusterConnection<C>>
257 where
258 C: crate::aio::ConnectionLike
259 + cluster_async::Connect
260 + Clone
261 + Send
262 + Sync
263 + Unpin
264 + 'static,
265 {
266 cluster_async::ClusterConnection::new(&self.initial_nodes, self.cluster_params.clone())
267 .await
268 }
269
270 #[deprecated(since = "0.22.0", note = "Use new()")]
272 pub fn open<T: IntoConnectionInfo>(initial_nodes: Vec<T>) -> RedisResult<ClusterClient> {
273 Self::new(initial_nodes)
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use super::{ClusterClient, ClusterClientBuilder, ConnectionInfo, IntoConnectionInfo};
280
281 fn get_connection_data() -> Vec<ConnectionInfo> {
282 vec![
283 "redis://127.0.0.1:6379".into_connection_info().unwrap(),
284 "redis://127.0.0.1:6378".into_connection_info().unwrap(),
285 "redis://127.0.0.1:6377".into_connection_info().unwrap(),
286 ]
287 }
288
289 fn get_connection_data_with_password() -> Vec<ConnectionInfo> {
290 vec![
291 "redis://:password@127.0.0.1:6379"
292 .into_connection_info()
293 .unwrap(),
294 "redis://:password@127.0.0.1:6378"
295 .into_connection_info()
296 .unwrap(),
297 "redis://:password@127.0.0.1:6377"
298 .into_connection_info()
299 .unwrap(),
300 ]
301 }
302
303 fn get_connection_data_with_username_and_password() -> Vec<ConnectionInfo> {
304 vec![
305 "redis://user1:password@127.0.0.1:6379"
306 .into_connection_info()
307 .unwrap(),
308 "redis://user1:password@127.0.0.1:6378"
309 .into_connection_info()
310 .unwrap(),
311 "redis://user1:password@127.0.0.1:6377"
312 .into_connection_info()
313 .unwrap(),
314 ]
315 }
316
317 #[test]
318 fn give_no_password() {
319 let client = ClusterClient::new(get_connection_data()).unwrap();
320 assert_eq!(client.cluster_params.password, None);
321 }
322
323 #[test]
324 fn give_password_by_initial_nodes() {
325 let client = ClusterClient::new(get_connection_data_with_password()).unwrap();
326 assert_eq!(client.cluster_params.password, Some("password".to_string()));
327 }
328
329 #[test]
330 fn give_username_and_password_by_initial_nodes() {
331 let client = ClusterClient::new(get_connection_data_with_username_and_password()).unwrap();
332 assert_eq!(client.cluster_params.password, Some("password".to_string()));
333 assert_eq!(client.cluster_params.username, Some("user1".to_string()));
334 }
335
336 #[test]
337 fn give_different_password_by_initial_nodes() {
338 let result = ClusterClient::new(vec![
339 "redis://:password1@127.0.0.1:6379",
340 "redis://:password2@127.0.0.1:6378",
341 "redis://:password3@127.0.0.1:6377",
342 ]);
343 assert!(result.is_err());
344 }
345
346 #[test]
347 fn give_different_username_by_initial_nodes() {
348 let result = ClusterClient::new(vec![
349 "redis://user1:password@127.0.0.1:6379",
350 "redis://user2:password@127.0.0.1:6378",
351 "redis://user1:password@127.0.0.1:6377",
352 ]);
353 assert!(result.is_err());
354 }
355
356 #[test]
357 fn give_username_password_by_method() {
358 let client = ClusterClientBuilder::new(get_connection_data_with_password())
359 .password("pass".to_string())
360 .username("user1".to_string())
361 .build()
362 .unwrap();
363 assert_eq!(client.cluster_params.password, Some("pass".to_string()));
364 assert_eq!(client.cluster_params.username, Some("user1".to_string()));
365 }
366
367 #[test]
368 fn give_empty_initial_nodes() {
369 let client = ClusterClient::new(Vec::<String>::new());
370 assert!(client.is_err())
371 }
372}