use cache::{
Cache,
CacheRead,
CacheWrite,
Storage,
};
use errors::*;
use futures_cpupool::CpuPool;
use redis::{
cmd,
Client,
Commands,
Connection,
InfoDict,
};
use std::collections::HashMap;
use std::io::Cursor;
use std::time::{
Duration,
Instant,
};
#[derive(Clone)]
pub struct RedisCache {
url: String,
client: Client,
pool: CpuPool,
}
impl RedisCache {
pub fn new(url: &str, pool: &CpuPool) -> Result<RedisCache> {
Ok(RedisCache {
url: url.to_owned(),
client: Client::open(url)?,
pool: pool.clone(),
})
}
fn connect(&self) -> Result<Connection> {
self.client.get_connection()
.map_err(|e| e.into())
.and_then(|c| {
c.set_read_timeout(Some(Duration::from_millis(10_000)))?;
c.set_write_timeout(Some(Duration::from_millis(10_000)))?;
Ok(c)
})
}
}
impl Storage for RedisCache {
fn get(&self, key: &str) -> SFuture<Cache> {
let key = key.to_owned();
let me = self.clone();
Box::new(self.pool.spawn_fn(move || {
let c = me.connect()?;
let d = c.get::<&str, Vec<u8>>(&key)?;
if d.is_empty() {
Ok(Cache::Miss)
} else {
CacheRead::from(Cursor::new(d))
.map(Cache::Hit)
}
}))
}
fn put(&self, key: &str, entry: CacheWrite) -> SFuture<Duration> {
let key = key.to_owned();
let me = self.clone();
Box::new(self.pool.spawn_fn(move || {
let start = Instant::now();
let c = me.connect()?;
let d = entry.finish()?;
c.set::<&str, Vec<u8>, ()>(&key, d)?;
Ok(start.elapsed())
}))
}
fn location(&self) -> String {
format!("Redis: {}", self.url)
}
fn current_size(&self) -> Option<usize> {
self.connect().ok()
.and_then(|c| cmd("INFO").query(&c).ok())
.and_then(|i: InfoDict| i.get("used_memory"))
}
fn max_size(&self) -> Option<usize> {
self.connect().ok()
.and_then(|c| cmd("CONFIG").arg("GET").arg("maxmemory").query(&c).ok())
.and_then(|h: HashMap<String, usize>| h.get("maxmemory").map(|s| *s))
.and_then(|s| {
if s != 0 {
Some(s)
} else {
None
}
})
}
}