mouscache/
redis_cache.rs

1use std::net;
2use std::mem::discriminant;
3use std::collections::hash_map::HashMap;
4use crate::Result;
5use crate::CacheError;
6use crate::Cacheable;
7use crate::CacheAccess;
8use crate::CacheFunc;
9use crate::redis;
10use redis::Commands;
11use dns_lookup::lookup_host;
12use crate::FromValue;
13
14use r2d2::Pool;
15use std::str::FromStr;
16
17const DB_CONNECTION_TIMEOUT_MS: i64 = 5000;
18
19mod r2d2_test {
20    use crate::redis;
21    use redis::{cmd, RedisError};
22    use r2d2;
23    use std::error;
24    use std::fmt;
25
26    /// A unified enum of errors returned by redis::Client
27    #[derive(Debug)]
28    pub enum Error {
29        /// A redis::RedisError
30        RedisCacheError(RedisError),
31        Other(String),
32    }
33
34    impl fmt::Display for Error {
35        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
36            match self {
37                Error::RedisCacheError(e) => write!(f, "Redis cache error: {}", e),
38                Error::Other(desc) => write!(f, "Other: {}", desc),
39            }
40        }
41    }
42
43    impl From<redis::RedisError> for Error {
44        fn from(e: RedisError) -> Self {
45            Error::RedisCacheError(e)
46        }
47    }
48
49    impl error::Error for Error {}
50
51    #[derive(Debug)]
52    pub struct RedisConnectionManager {
53        connection_info: redis::ConnectionInfo,
54        password: Option<String>,
55        db: Option<u16>,
56    }
57
58    impl RedisConnectionManager {
59        pub fn new<T: redis::IntoConnectionInfo>(host: T, password: Option<&str>, db: Option<u16>)
60                                                 -> Result<RedisConnectionManager, redis::RedisError> {
61            Ok(RedisConnectionManager {
62                connection_info: host.into_connection_info()?,
63                password: password.map(|s| { s.to_string() }),
64                db,
65            })
66        }
67    }
68
69    impl r2d2::ManageConnection for RedisConnectionManager {
70        type Connection = redis::Connection;
71        type Error = Error;
72
73        fn connect(&self) -> Result<redis::Connection, Error> {
74            let client = redis::Client::open(self.connection_info.clone())?;
75            let conn = client.get_connection()?;
76
77            if let Some(ref p) = self.password {
78                if !cmd("AUTH").arg(p).query::<bool>(&conn)? {
79                    return Err(Error::Other("Redis authentication failed: Bad password".to_string()));
80                }
81            }
82
83            if let Some(db) = self.db {
84                if !cmd("SELECT").arg(db).query::<bool>(&conn)? {
85                    return Err(Error::Other(format!("Redis server refused to switch database: Bad index ({:?})", db)));
86                }
87            }
88
89            Ok(conn)
90        }
91
92        fn is_valid(&self, conn: &mut redis::Connection) -> Result<(), Error> {
93            Ok(redis::cmd("PING").query(conn)?)
94        }
95
96        fn has_broken(&self, _conn: &mut redis::Connection) -> bool {
97            false
98        }
99    }
100}
101
102
103#[allow(dead_code)]
104pub struct RedisCache {
105    connection_pool: Pool<r2d2_test::RedisConnectionManager>,
106}
107
108impl Clone for RedisCache {
109    fn clone(&self) -> Self {
110        RedisCache {
111            connection_pool: self.connection_pool.clone()
112        }
113    }
114}
115
116impl RedisCache {
117    pub fn new(host: &str, password: Option<&str>, db: Option<u16>) -> Result<RedisCache> {
118        let host_vec: Vec<&str> = host.split(":").collect();
119
120        let ips: Vec<net::IpAddr> = match lookup_host(host_vec[0]) {
121            Ok(hosts) => hosts,
122            Err(e) => return Err(CacheError::Other(e.to_string())),
123        };
124
125        if let Some((_, ip_v4)) = ips.iter()
126            .enumerate()
127            .find(|&(_index, ip)| {
128                discriminant(ip) == discriminant(&net::IpAddr::V4(net::Ipv4Addr::new(0, 0, 0, 0)))
129            }) {
130            let ip_host = if host_vec.len() > 1 {
131                format!("{}:{}", ip_v4.to_string(), host_vec[1])
132            } else {
133                ip_v4.to_string()
134            };
135
136            let url = format!("redis://{}", ip_host);
137
138            let manager = match r2d2_test::RedisConnectionManager::new(url.as_str(), password, db) {
139                Ok(m) => m,
140                Err(e) => return Err(CacheError::Other(e.to_string())),
141            };
142
143            let connection_pool = match Pool::builder()
144                .max_size(15)
145                .connection_timeout(::std::time::Duration::from_millis(DB_CONNECTION_TIMEOUT_MS as u64))
146                .build(manager) {
147                Ok(cp) => cp,
148                Err(e) => return Err(CacheError::Other(e.to_string())),
149            };
150
151            return Ok(RedisCache {
152                connection_pool,
153            });
154        }
155
156        Err(CacheError::Other(format!("Could'n find any valid IP for host {} ", host)))
157    }
158}
159
160impl CacheAccess for RedisCache {
161    fn insert<K: ToString, O: Cacheable + Clone + 'static>(&self, key: K, obj: O) -> Result<()> {
162        let exp = obj.expires_after();
163        self.insert_with(key, obj, exp)
164    }
165
166    fn insert_with<K: ToString, O: Cacheable + Clone + 'static>(&self, key: K, obj: O, expires_after: Option<usize>) -> Result<()> {
167        let connection = match self.connection_pool.get() {
168            Ok(con) => con,
169            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
170        };
171
172        let redis_key = redis_key_create::<K, O>(key);
173        let data = obj.to_redis_obj();
174        if let Some(ttl) = expires_after {
175            redis_hash_set_multiple_with_expire(&connection, redis_key, &data, ttl)
176        } else {
177            redis_hash_set_multiple(&connection, redis_key, &data)
178        }
179    }
180
181    fn get<K: ToString, O: Cacheable + 'static>(&self, key: K) -> Result<Option<O>> {
182        let connection = match self.connection_pool.get() {
183            Ok(con) => con,
184            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
185        };
186
187        let redis_key = redis_key_create::<K, O>(key);
188        if let Ok(val) = redis_hash_get_all(&connection, redis_key) {
189            if let Ok(c) = O::from_redis_obj(val) {
190                Ok(Some(c))
191            } else {
192                Ok(None)
193            }
194        } else {
195            Ok(None)
196        }
197    }
198
199    fn contains_key<K: ToString, O: Cacheable + Clone + 'static>(&self, key: K) -> Result<bool> {
200        let connection = match self.connection_pool.get() {
201            Ok(con) => con,
202            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
203        };
204
205        let redis_key = redis_key_create::<K, O>(key);
206
207        redis_key_exists(&connection, redis_key)
208    }
209
210    fn remove<K: ToString, O: Cacheable>(&self, key: K) -> Result<()> {
211        let connection = match self.connection_pool.get() {
212            Ok(con) => con,
213            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
214        };
215
216        let redis_key = redis_key_create::<K, O>(key);
217        redis_delete(&connection, redis_key)
218    }
219}
220
221fn redis_key_create<K: ToString, O: Cacheable>(key: K) -> String {
222    format!("{}:{}", O::model_name(), key.to_string())
223}
224
225fn redis_hash_set_multiple_with_expire<F: redis::ToRedisArgs, V: redis::ToRedisArgs>(con: &redis::Connection, key: String, v: &[(F, V)], ttl_sec: usize) -> Result<()> {
226    redis_hash_set_multiple(con, key.clone(), v)?;
227    con.expire(key, ttl_sec).map(|_: ::redis::Value| ()).map_err( |e| e.into())
228}
229
230fn redis_hash_set_multiple<F: redis::ToRedisArgs, V: redis::ToRedisArgs>(con: &redis::Connection, key: String, v: &[(F, V)]) -> Result<()> {
231    con.hset_multiple::<String, F, V, ()>(key, v).map_err( |e| e.into())
232}
233
234fn redis_hash_get_all(con: &redis::Connection, key: String) -> Result<HashMap<String, String>> {
235    con.hgetall::<String, HashMap<String, String>>(key).map_err( |e| e.into())
236}
237
238fn redis_delete(con: &redis::Connection, key: String) -> Result<()> {
239    con.del::<String, ()>(key).map_err( |e| e.into())
240}
241
242fn redis_key_exists(con: &redis::Connection, key: String) -> Result<bool> {
243    con.exists::<String, bool>(key).map_err( |e| e.into())
244}
245
246impl CacheFunc for RedisCache {
247    fn hash_delete(&self, key: &str, fields: &[&str]) -> Result<bool> {
248        let connection = match self.connection_pool.get() {
249            Ok(con) => con,
250            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
251        };
252
253        connection.hdel(key, fields).map_err(|e| e.into())
254    }
255
256    fn hash_exists(&self, key: &str, field: &str) -> Result<bool> {
257        let connection = match self.connection_pool.get() {
258            Ok(con) => con,
259            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
260        };
261        connection.hexists(key, field).map_err(|e| e.into())
262    }
263
264    fn hash_get<T: FromStr>(&self, key: &str, field: &str) -> Result<Option<T>> {
265        let connection = match self.connection_pool.get() {
266            Ok(con) => con,
267            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
268        };
269
270        let redis_val: ::redis::Value = connection.hget(key, field)?;
271        if let ::redis::Value::Nil = redis_val {
272            return Ok(None);
273        }
274
275        let val = String::from_redis_value(&redis_val)?;
276        return T::from_str(val.as_ref()).map(|t| Some(t)).map_err(|_| CacheError::Other("An error occured while parsing a redis value".to_string()))
277    }
278
279    fn hash_get_all<T: Cacheable + Clone + 'static>(&self, key: &str) -> Result<Option<T>> {
280        let connection = match self.connection_pool.get() {
281            Ok(con) => con,
282            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
283        };
284        let map: HashMap<String, String> = connection.hgetall(key)?;
285        T::from_redis_obj(map).map(|t| Some(t))
286    }
287
288    fn hash_keys(&self, key: &str) -> Result<Vec<String>> {
289        let connection = match self.connection_pool.get() {
290            Ok(con) => con,
291            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
292        };
293        connection.hkeys(key).map_err(|e| e.into())
294    }
295
296    fn hash_len(&self, key: &str) -> Result<usize> {
297        let connection = match self.connection_pool.get() {
298            Ok(con) => con,
299            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
300        };
301        connection.hlen(key).map_err(|e| e.into())
302    }
303
304    fn hash_multiple_get(&self, _key: &str, _fields: &[&str]) -> Result<Vec<Option<String>>> {
305        unimplemented!()
306    }
307
308    fn hash_multiple_set<V: ToString>(&self, key: &str, fv_pairs: &[(&str, V)]) -> Result<bool> {
309        let connection = match self.connection_pool.get() {
310            Ok(con) => con,
311            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
312        };
313        let intermediate_vec = fv_pairs.iter().map(|&(ref s, ref v)| (s.to_string(), v.to_string())).collect::<Vec<(String, String)>>();
314        connection.hset_multiple(key, &intermediate_vec).map_err(|e| e.into())
315    }
316
317    fn hash_set<V: ToString>(&self, key: &str, field: &str, value: V) -> Result<bool> {
318        let connection = match self.connection_pool.get() {
319            Ok(con) => con,
320            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
321        };
322        connection.hset(key, field, value.to_string()).map_err(|e| e.into())
323    }
324
325    fn hash_set_all<T: Cacheable + Clone + 'static>(&self, key: &str, cacheable: T) -> Result<bool> {
326        let connection = match self.connection_pool.get() {
327            Ok(con) => con,
328            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
329        };
330        let fv_pairs = cacheable.to_redis_obj();
331        connection.hset_multiple(key, &fv_pairs).map_err(|e| e.into())
332    }
333
334    fn hash_set_if_not_exists<V: ToString>(&self, key: &str, field: &str, value: V) -> Result<bool> {
335        let connection = match self.connection_pool.get() {
336            Ok(con) => con,
337            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
338        };
339        connection.hset_nx(key, field, value.to_string()).map_err(|e| e.into())
340    }
341
342    fn hash_values(&self, key: &str) -> Result<Vec<String>> {
343        let connection = match self.connection_pool.get() {
344            Ok(con) => con,
345            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
346        };
347        connection.hvals(key).map_err(|e| e.into())
348    }
349
350    fn set_add<V: ToString>(&self, key: &str, members: &[V]) -> Result<bool> {
351        let connection = match self.connection_pool.get() {
352            Ok(con) => con,
353            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
354        };
355        let string_members = members.iter().map(|m| m.to_string()).collect::<Vec<String>>();
356        connection.sadd(key, string_members).map_err(|e| e.into())
357    }
358
359    fn set_card(&self, key: &str) -> Result<u64> {
360        let connection = match self.connection_pool.get() {
361            Ok(con) => con,
362            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
363        };
364        connection.scard(key).map_err(|e| e.into())
365    }
366
367    fn set_diff(&self, keys: &[&str]) -> Result<Vec<String>> {
368        let connection = match self.connection_pool.get() {
369            Ok(con) => con,
370            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
371        };
372        connection.sdiff(keys).map_err(|e| e.into())
373    }
374
375    fn set_diffstore(&self, diff_name: &str, keys: &[&str]) -> Result<u64> {
376        let connection = match self.connection_pool.get() {
377            Ok(con) => con,
378            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
379        };
380        ::redis::cmd("SDIFFSTORE").arg(diff_name).arg(keys).query(&*connection).map_err(|e| e.into())
381    }
382
383    fn set_inter(&self, keys: &[&str]) -> Result<Vec<String>> {
384        let connection = match self.connection_pool.get() {
385            Ok(con) => con,
386            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
387        };
388        connection.sinter(keys).map_err(|e| e.into())
389    }
390
391    fn set_interstore(&self, inter_name: &str, keys: &[&str]) -> Result<u64> {
392        let connection = match self.connection_pool.get() {
393            Ok(con) => con,
394            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
395        };
396        ::redis::cmd("SINTERSTORE").arg(inter_name).arg(keys).query(&*connection).map_err(|e| e.into())
397    }
398
399    fn set_ismember<V: ToString>(&self, key: &str, member: V) -> Result<bool> {
400        let connection = match self.connection_pool.get() {
401            Ok(con) => con,
402            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
403        };
404        connection.sismember(key, member.to_string()).map_err(|e|e.into())
405    }
406
407    fn set_members(&self, key: &str) -> Result<Vec<String>> {
408        let connection = match self.connection_pool.get() {
409            Ok(con) => con,
410            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
411        };
412        connection.smembers(key).map_err(|e|e.into())
413    }
414
415    fn set_move<V: ToString>(&self, key1: &str, key2: &str, member: V) -> Result<bool> {
416        let connection = match self.connection_pool.get() {
417            Ok(con) => con,
418            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
419        };
420        connection.smove(key1, key2, member.to_string()).map_err(|e|e.into())
421    }
422
423    fn set_rem<V: ToString>(&self, key: &str, member: V) -> Result<bool> {
424        let connection = match self.connection_pool.get() {
425            Ok(con) => con,
426            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
427        };
428        connection.srem(key, member.to_string()).map_err(|e|e.into())
429    }
430
431    fn set_union(&self, keys: &[&str]) -> Result<Vec<String>> {
432        let connection = match self.connection_pool.get() {
433            Ok(con) => con,
434            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
435        };
436        connection.sunion(keys).map_err(|e| e.into())
437    }
438
439    fn set_unionstore(&self, union_name: &str, keys: &[&str]) -> Result<u64> {
440        let connection = match self.connection_pool.get() {
441            Ok(con) => con,
442            Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
443        };
444        ::redis::cmd("SUNIONSTORE").arg(union_name).arg(keys).query(&*connection).map_err(|e| e.into())
445    }
446}