1use std::net;
2use std::mem::discriminant;
3use std::collections::hash_map::HashMap;
4use crate::Result;
5use crate::CacheError;
6use crate::Cacheable;
7use crate::CacheAccess;
8use crate::CacheFunc;
9use crate::redis;
10use redis::Commands;
11use dns_lookup::lookup_host;
12use crate::FromValue;
13
14use r2d2::Pool;
15use std::str::FromStr;
16
17const DB_CONNECTION_TIMEOUT_MS: i64 = 5000;
18
19mod r2d2_test {
20 use crate::redis;
21 use redis::{cmd, RedisError};
22 use r2d2;
23 use std::error;
24 use std::fmt;
25
26 #[derive(Debug)]
28 pub enum Error {
29 RedisCacheError(RedisError),
31 Other(String),
32 }
33
34 impl fmt::Display for Error {
35 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
36 match self {
37 Error::RedisCacheError(e) => write!(f, "Redis cache error: {}", e),
38 Error::Other(desc) => write!(f, "Other: {}", desc),
39 }
40 }
41 }
42
43 impl From<redis::RedisError> for Error {
44 fn from(e: RedisError) -> Self {
45 Error::RedisCacheError(e)
46 }
47 }
48
49 impl error::Error for Error {}
50
51 #[derive(Debug)]
52 pub struct RedisConnectionManager {
53 connection_info: redis::ConnectionInfo,
54 password: Option<String>,
55 db: Option<u16>,
56 }
57
58 impl RedisConnectionManager {
59 pub fn new<T: redis::IntoConnectionInfo>(host: T, password: Option<&str>, db: Option<u16>)
60 -> Result<RedisConnectionManager, redis::RedisError> {
61 Ok(RedisConnectionManager {
62 connection_info: host.into_connection_info()?,
63 password: password.map(|s| { s.to_string() }),
64 db,
65 })
66 }
67 }
68
69 impl r2d2::ManageConnection for RedisConnectionManager {
70 type Connection = redis::Connection;
71 type Error = Error;
72
73 fn connect(&self) -> Result<redis::Connection, Error> {
74 let client = redis::Client::open(self.connection_info.clone())?;
75 let conn = client.get_connection()?;
76
77 if let Some(ref p) = self.password {
78 if !cmd("AUTH").arg(p).query::<bool>(&conn)? {
79 return Err(Error::Other("Redis authentication failed: Bad password".to_string()));
80 }
81 }
82
83 if let Some(db) = self.db {
84 if !cmd("SELECT").arg(db).query::<bool>(&conn)? {
85 return Err(Error::Other(format!("Redis server refused to switch database: Bad index ({:?})", db)));
86 }
87 }
88
89 Ok(conn)
90 }
91
92 fn is_valid(&self, conn: &mut redis::Connection) -> Result<(), Error> {
93 Ok(redis::cmd("PING").query(conn)?)
94 }
95
96 fn has_broken(&self, _conn: &mut redis::Connection) -> bool {
97 false
98 }
99 }
100}
101
102
103#[allow(dead_code)]
104pub struct RedisCache {
105 connection_pool: Pool<r2d2_test::RedisConnectionManager>,
106}
107
108impl Clone for RedisCache {
109 fn clone(&self) -> Self {
110 RedisCache {
111 connection_pool: self.connection_pool.clone()
112 }
113 }
114}
115
116impl RedisCache {
117 pub fn new(host: &str, password: Option<&str>, db: Option<u16>) -> Result<RedisCache> {
118 let host_vec: Vec<&str> = host.split(":").collect();
119
120 let ips: Vec<net::IpAddr> = match lookup_host(host_vec[0]) {
121 Ok(hosts) => hosts,
122 Err(e) => return Err(CacheError::Other(e.to_string())),
123 };
124
125 if let Some((_, ip_v4)) = ips.iter()
126 .enumerate()
127 .find(|&(_index, ip)| {
128 discriminant(ip) == discriminant(&net::IpAddr::V4(net::Ipv4Addr::new(0, 0, 0, 0)))
129 }) {
130 let ip_host = if host_vec.len() > 1 {
131 format!("{}:{}", ip_v4.to_string(), host_vec[1])
132 } else {
133 ip_v4.to_string()
134 };
135
136 let url = format!("redis://{}", ip_host);
137
138 let manager = match r2d2_test::RedisConnectionManager::new(url.as_str(), password, db) {
139 Ok(m) => m,
140 Err(e) => return Err(CacheError::Other(e.to_string())),
141 };
142
143 let connection_pool = match Pool::builder()
144 .max_size(15)
145 .connection_timeout(::std::time::Duration::from_millis(DB_CONNECTION_TIMEOUT_MS as u64))
146 .build(manager) {
147 Ok(cp) => cp,
148 Err(e) => return Err(CacheError::Other(e.to_string())),
149 };
150
151 return Ok(RedisCache {
152 connection_pool,
153 });
154 }
155
156 Err(CacheError::Other(format!("Could'n find any valid IP for host {} ", host)))
157 }
158}
159
160impl CacheAccess for RedisCache {
161 fn insert<K: ToString, O: Cacheable + Clone + 'static>(&self, key: K, obj: O) -> Result<()> {
162 let exp = obj.expires_after();
163 self.insert_with(key, obj, exp)
164 }
165
166 fn insert_with<K: ToString, O: Cacheable + Clone + 'static>(&self, key: K, obj: O, expires_after: Option<usize>) -> Result<()> {
167 let connection = match self.connection_pool.get() {
168 Ok(con) => con,
169 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
170 };
171
172 let redis_key = redis_key_create::<K, O>(key);
173 let data = obj.to_redis_obj();
174 if let Some(ttl) = expires_after {
175 redis_hash_set_multiple_with_expire(&connection, redis_key, &data, ttl)
176 } else {
177 redis_hash_set_multiple(&connection, redis_key, &data)
178 }
179 }
180
181 fn get<K: ToString, O: Cacheable + 'static>(&self, key: K) -> Result<Option<O>> {
182 let connection = match self.connection_pool.get() {
183 Ok(con) => con,
184 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
185 };
186
187 let redis_key = redis_key_create::<K, O>(key);
188 if let Ok(val) = redis_hash_get_all(&connection, redis_key) {
189 if let Ok(c) = O::from_redis_obj(val) {
190 Ok(Some(c))
191 } else {
192 Ok(None)
193 }
194 } else {
195 Ok(None)
196 }
197 }
198
199 fn contains_key<K: ToString, O: Cacheable + Clone + 'static>(&self, key: K) -> Result<bool> {
200 let connection = match self.connection_pool.get() {
201 Ok(con) => con,
202 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
203 };
204
205 let redis_key = redis_key_create::<K, O>(key);
206
207 redis_key_exists(&connection, redis_key)
208 }
209
210 fn remove<K: ToString, O: Cacheable>(&self, key: K) -> Result<()> {
211 let connection = match self.connection_pool.get() {
212 Ok(con) => con,
213 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
214 };
215
216 let redis_key = redis_key_create::<K, O>(key);
217 redis_delete(&connection, redis_key)
218 }
219}
220
221fn redis_key_create<K: ToString, O: Cacheable>(key: K) -> String {
222 format!("{}:{}", O::model_name(), key.to_string())
223}
224
225fn redis_hash_set_multiple_with_expire<F: redis::ToRedisArgs, V: redis::ToRedisArgs>(con: &redis::Connection, key: String, v: &[(F, V)], ttl_sec: usize) -> Result<()> {
226 redis_hash_set_multiple(con, key.clone(), v)?;
227 con.expire(key, ttl_sec).map(|_: ::redis::Value| ()).map_err( |e| e.into())
228}
229
230fn redis_hash_set_multiple<F: redis::ToRedisArgs, V: redis::ToRedisArgs>(con: &redis::Connection, key: String, v: &[(F, V)]) -> Result<()> {
231 con.hset_multiple::<String, F, V, ()>(key, v).map_err( |e| e.into())
232}
233
234fn redis_hash_get_all(con: &redis::Connection, key: String) -> Result<HashMap<String, String>> {
235 con.hgetall::<String, HashMap<String, String>>(key).map_err( |e| e.into())
236}
237
238fn redis_delete(con: &redis::Connection, key: String) -> Result<()> {
239 con.del::<String, ()>(key).map_err( |e| e.into())
240}
241
242fn redis_key_exists(con: &redis::Connection, key: String) -> Result<bool> {
243 con.exists::<String, bool>(key).map_err( |e| e.into())
244}
245
246impl CacheFunc for RedisCache {
247 fn hash_delete(&self, key: &str, fields: &[&str]) -> Result<bool> {
248 let connection = match self.connection_pool.get() {
249 Ok(con) => con,
250 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
251 };
252
253 connection.hdel(key, fields).map_err(|e| e.into())
254 }
255
256 fn hash_exists(&self, key: &str, field: &str) -> Result<bool> {
257 let connection = match self.connection_pool.get() {
258 Ok(con) => con,
259 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
260 };
261 connection.hexists(key, field).map_err(|e| e.into())
262 }
263
264 fn hash_get<T: FromStr>(&self, key: &str, field: &str) -> Result<Option<T>> {
265 let connection = match self.connection_pool.get() {
266 Ok(con) => con,
267 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
268 };
269
270 let redis_val: ::redis::Value = connection.hget(key, field)?;
271 if let ::redis::Value::Nil = redis_val {
272 return Ok(None);
273 }
274
275 let val = String::from_redis_value(&redis_val)?;
276 return T::from_str(val.as_ref()).map(|t| Some(t)).map_err(|_| CacheError::Other("An error occured while parsing a redis value".to_string()))
277 }
278
279 fn hash_get_all<T: Cacheable + Clone + 'static>(&self, key: &str) -> Result<Option<T>> {
280 let connection = match self.connection_pool.get() {
281 Ok(con) => con,
282 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
283 };
284 let map: HashMap<String, String> = connection.hgetall(key)?;
285 T::from_redis_obj(map).map(|t| Some(t))
286 }
287
288 fn hash_keys(&self, key: &str) -> Result<Vec<String>> {
289 let connection = match self.connection_pool.get() {
290 Ok(con) => con,
291 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
292 };
293 connection.hkeys(key).map_err(|e| e.into())
294 }
295
296 fn hash_len(&self, key: &str) -> Result<usize> {
297 let connection = match self.connection_pool.get() {
298 Ok(con) => con,
299 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
300 };
301 connection.hlen(key).map_err(|e| e.into())
302 }
303
304 fn hash_multiple_get(&self, _key: &str, _fields: &[&str]) -> Result<Vec<Option<String>>> {
305 unimplemented!()
306 }
307
308 fn hash_multiple_set<V: ToString>(&self, key: &str, fv_pairs: &[(&str, V)]) -> Result<bool> {
309 let connection = match self.connection_pool.get() {
310 Ok(con) => con,
311 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
312 };
313 let intermediate_vec = fv_pairs.iter().map(|&(ref s, ref v)| (s.to_string(), v.to_string())).collect::<Vec<(String, String)>>();
314 connection.hset_multiple(key, &intermediate_vec).map_err(|e| e.into())
315 }
316
317 fn hash_set<V: ToString>(&self, key: &str, field: &str, value: V) -> Result<bool> {
318 let connection = match self.connection_pool.get() {
319 Ok(con) => con,
320 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
321 };
322 connection.hset(key, field, value.to_string()).map_err(|e| e.into())
323 }
324
325 fn hash_set_all<T: Cacheable + Clone + 'static>(&self, key: &str, cacheable: T) -> Result<bool> {
326 let connection = match self.connection_pool.get() {
327 Ok(con) => con,
328 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
329 };
330 let fv_pairs = cacheable.to_redis_obj();
331 connection.hset_multiple(key, &fv_pairs).map_err(|e| e.into())
332 }
333
334 fn hash_set_if_not_exists<V: ToString>(&self, key: &str, field: &str, value: V) -> Result<bool> {
335 let connection = match self.connection_pool.get() {
336 Ok(con) => con,
337 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
338 };
339 connection.hset_nx(key, field, value.to_string()).map_err(|e| e.into())
340 }
341
342 fn hash_values(&self, key: &str) -> Result<Vec<String>> {
343 let connection = match self.connection_pool.get() {
344 Ok(con) => con,
345 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
346 };
347 connection.hvals(key).map_err(|e| e.into())
348 }
349
350 fn set_add<V: ToString>(&self, key: &str, members: &[V]) -> Result<bool> {
351 let connection = match self.connection_pool.get() {
352 Ok(con) => con,
353 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
354 };
355 let string_members = members.iter().map(|m| m.to_string()).collect::<Vec<String>>();
356 connection.sadd(key, string_members).map_err(|e| e.into())
357 }
358
359 fn set_card(&self, key: &str) -> Result<u64> {
360 let connection = match self.connection_pool.get() {
361 Ok(con) => con,
362 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
363 };
364 connection.scard(key).map_err(|e| e.into())
365 }
366
367 fn set_diff(&self, keys: &[&str]) -> Result<Vec<String>> {
368 let connection = match self.connection_pool.get() {
369 Ok(con) => con,
370 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
371 };
372 connection.sdiff(keys).map_err(|e| e.into())
373 }
374
375 fn set_diffstore(&self, diff_name: &str, keys: &[&str]) -> Result<u64> {
376 let connection = match self.connection_pool.get() {
377 Ok(con) => con,
378 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
379 };
380 ::redis::cmd("SDIFFSTORE").arg(diff_name).arg(keys).query(&*connection).map_err(|e| e.into())
381 }
382
383 fn set_inter(&self, keys: &[&str]) -> Result<Vec<String>> {
384 let connection = match self.connection_pool.get() {
385 Ok(con) => con,
386 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
387 };
388 connection.sinter(keys).map_err(|e| e.into())
389 }
390
391 fn set_interstore(&self, inter_name: &str, keys: &[&str]) -> Result<u64> {
392 let connection = match self.connection_pool.get() {
393 Ok(con) => con,
394 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
395 };
396 ::redis::cmd("SINTERSTORE").arg(inter_name).arg(keys).query(&*connection).map_err(|e| e.into())
397 }
398
399 fn set_ismember<V: ToString>(&self, key: &str, member: V) -> Result<bool> {
400 let connection = match self.connection_pool.get() {
401 Ok(con) => con,
402 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
403 };
404 connection.sismember(key, member.to_string()).map_err(|e|e.into())
405 }
406
407 fn set_members(&self, key: &str) -> Result<Vec<String>> {
408 let connection = match self.connection_pool.get() {
409 Ok(con) => con,
410 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
411 };
412 connection.smembers(key).map_err(|e|e.into())
413 }
414
415 fn set_move<V: ToString>(&self, key1: &str, key2: &str, member: V) -> Result<bool> {
416 let connection = match self.connection_pool.get() {
417 Ok(con) => con,
418 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
419 };
420 connection.smove(key1, key2, member.to_string()).map_err(|e|e.into())
421 }
422
423 fn set_rem<V: ToString>(&self, key: &str, member: V) -> Result<bool> {
424 let connection = match self.connection_pool.get() {
425 Ok(con) => con,
426 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
427 };
428 connection.srem(key, member.to_string()).map_err(|e|e.into())
429 }
430
431 fn set_union(&self, keys: &[&str]) -> Result<Vec<String>> {
432 let connection = match self.connection_pool.get() {
433 Ok(con) => con,
434 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
435 };
436 connection.sunion(keys).map_err(|e| e.into())
437 }
438
439 fn set_unionstore(&self, union_name: &str, keys: &[&str]) -> Result<u64> {
440 let connection = match self.connection_pool.get() {
441 Ok(con) => con,
442 Err(e) => return Err(CacheError::ConnectionError(e.to_string())),
443 };
444 ::redis::cmd("SUNIONSTORE").arg(union_name).arg(keys).query(&*connection).map_err(|e| e.into())
445 }
446}