r2d2_redis_cluster/
lib.rs

1//! Redis cluster support for the `r2d2` connection pool.
2//!
3//! # Example
4//! ```rust,no_run
5//! extern crate r2d2_redis_cluster;
6//!
7//! use std::thread;
8//!
9//! use r2d2_redis_cluster::{r2d2::Pool, Commands, RedisClusterConnectionManager};
10//!
11//! fn main() {
12//!     let redis_uri = vec!["redis://127.0.0.1:6379", "redis://127.0.0.1:6378", "redis://127.0.0.1:6377"];
13//!     let manager = RedisClusterConnectionManager::new(redis_uri).unwrap();
14//!     let pool = Pool::builder()
15//!         .build(manager)
16//!         .unwrap();
17//!
18//!     let mut handles = Vec::new();
19//!
20//!     for _ in 0..10 {
21//!         let pool = pool.clone();
22//!         handles.push(thread::spawn(move || {
23//!             let connection = pool.get().unwrap();
24//!             let _: u64 = connection.incr("test", 1).unwrap();
25//!         }));
26//!     }
27//!
28//!     for h in handles {
29//!         h.join().unwrap();
30//!     }
31//!
32//!     let mut connection = pool.get().unwrap();
33//!     let res: u64 = connection.get("test").unwrap();
34//!
35//!     assert_eq!(res, 10);
36//! }
37//! ```
38pub extern crate r2d2;
39pub extern crate redis_cluster_rs;
40
41use r2d2::ManageConnection;
42use redis_cluster_rs::{
43    redis::{ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError},
44    Builder, Connection,
45};
46use std::time::Duration;
47
48pub use redis_cluster_rs::redis::{Commands, ConnectionLike, RedisResult};
49
50/// An `r2d2::ConnectionManager` for `redis_cluster_rs::Client`.
51#[derive(Debug)]
52pub struct RedisClusterConnectionManager {
53    nodes: Vec<ConnectionInfo>,
54    readonly: bool,
55    password: Option<String>,
56    read_timeout: Option<Duration>,
57    write_timeout: Option<Duration>,
58}
59
60impl RedisClusterConnectionManager {
61    /// Create new `RedisClusterConnectionManager`.
62    pub fn new<T: IntoConnectionInfo>(
63        input_nodes: Vec<T>,
64    ) -> RedisResult<RedisClusterConnectionManager> {
65        let mut nodes = Vec::with_capacity(input_nodes.len());
66
67        for node in input_nodes {
68            nodes.push(node.into_connection_info()?)
69        }
70
71        Ok(RedisClusterConnectionManager {
72            nodes,
73            readonly: false,
74            password: None,
75            read_timeout: None,
76            write_timeout: None,
77        })
78    }
79
80    /// Create new `RedisClusterConnectionManager` with authentication.
81    #[deprecated(note = "Please use new and password function")]
82    pub fn new_with_auth<T: IntoConnectionInfo>(
83        input_nodes: Vec<T>,
84        password: String,
85    ) -> RedisResult<RedisClusterConnectionManager> {
86        let mut result = Self::new(input_nodes)?;
87        result.set_password(password);
88        Ok(result)
89    }
90
91    /// Set read only mode for new Connection.
92    pub fn set_readonly(&mut self, readonly: bool) {
93        self.readonly = readonly;
94    }
95
96    /// Set password for new Connection.
97    pub fn set_password(&mut self, password: String) {
98        self.password = Some(password);
99    }
100
101    /// Set the read timeout for the connection.
102    pub fn set_read_timeout(&mut self, timeout: Option<Duration>) {
103        self.read_timeout = timeout;
104    }
105
106    /// Set the write timeout for the connection.
107    pub fn set_write_timeout(&mut self, timeout: Option<Duration>) {
108        self.write_timeout = timeout;
109    }
110}
111
112impl ManageConnection for RedisClusterConnectionManager {
113    type Connection = Connection;
114    type Error = RedisError;
115
116    fn connect(&self) -> Result<Self::Connection, Self::Error> {
117        let builder = Builder::new(self.nodes.clone())
118            .readonly(self.readonly)
119            .read_timeout(self.read_timeout)
120            .write_timeout(self.write_timeout);
121
122        let client = if let Some(password) = self.password.clone() {
123            builder.password(password).open()?
124        } else {
125            builder.open()?
126        };
127        client.get_connection()
128    }
129
130    fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
131        if conn.check_connection() {
132            Ok(())
133        } else {
134            Err(RedisError::from((
135                ErrorKind::IoError,
136                "Connection check error.",
137            )))
138        }
139    }
140
141    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
142        false
143    }
144}