use rand::{thread_rng, Rng};
use redis::Value::Okay;
use redis::{RedisResult, Value};
use std::fs::File;
use std::io::{self, Read};
use std::thread::sleep;
use std::time::{Duration, Instant};
const DEFAULT_RETRY_COUNT: u32 = 3;
const DEFAULT_RETRY_DELAY: u32 = 200;
const CLOCK_DRIFT_FACTOR: f32 = 0.01;
const UNLOCK_SCRIPT: &str = r"if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end";
pub struct RedLock {
pub servers: Vec<redis::Client>,
quorum: u32,
retry_count: u32,
retry_delay: u32,
}
pub struct Lock<'a> {
pub resource: Vec<u8>,
pub val: Vec<u8>,
pub validity_time: usize,
pub lock_manager: &'a RedLock,
}
impl RedLock {
pub fn new(uris: Vec<&str>) -> RedLock {
let quorum = (uris.len() as u32) / 2 + 1;
let mut servers = Vec::with_capacity(uris.len());
for &uri in uris.iter() {
servers.push(redis::Client::open(uri).unwrap())
}
RedLock {
servers,
quorum,
retry_count: DEFAULT_RETRY_COUNT,
retry_delay: DEFAULT_RETRY_DELAY,
}
}
pub fn get_unique_lock_id(&self) -> io::Result<Vec<u8>> {
let file = File::open("/dev/urandom")?;
let mut buf = Vec::with_capacity(20);
match file.take(20).read_to_end(&mut buf) {
Ok(20) => Ok(buf),
Ok(_) => Err(io::Error::new(
io::ErrorKind::Other,
"Can't read enough random bytes",
)),
Err(e) => Err(e),
}
}
pub fn set_retry(&mut self, count: u32, delay: u32) {
self.retry_count = count;
self.retry_delay = delay;
}
fn lock_instance(
&self,
client: &redis::Client,
resource: &[u8],
val: &[u8],
ttl: usize,
) -> bool {
let mut con = match client.get_connection() {
Err(_) => return false,
Ok(val) => val,
};
let result: RedisResult<Value> = redis::cmd("SET")
.arg(resource)
.arg(val)
.arg("nx")
.arg("px")
.arg(ttl)
.query(&mut con);
match result {
Ok(Okay) => true,
Ok(_) | Err(_) => false,
}
}
pub fn lock(&self, resource: &[u8], ttl: usize) -> Option<Lock> {
let val = self.get_unique_lock_id().unwrap();
let mut rng = thread_rng();
for _ in 0..self.retry_count {
let mut n = 0;
let start_time = Instant::now();
for client in &self.servers {
if self.lock_instance(client, resource, &val, ttl) {
n += 1;
}
}
let drift = (ttl as f32 * CLOCK_DRIFT_FACTOR) as usize + 2;
let elapsed = start_time.elapsed();
let validity_time = ttl
- drift
- elapsed.as_secs() as usize * 1000
- elapsed.subsec_nanos() as usize / 1_000_000;
if n >= self.quorum && validity_time > 0 {
return Some(Lock {
lock_manager: self,
resource: resource.to_vec(),
val,
validity_time,
});
} else {
for client in &self.servers {
self.unlock_instance(client, resource, &val);
}
}
let n = rng.gen_range(0..self.retry_delay);
sleep(Duration::from_millis(u64::from(n)));
}
None
}
fn unlock_instance(&self, client: &redis::Client, resource: &[u8], val: &[u8]) -> bool {
let mut con = match client.get_connection() {
Err(_) => return false,
Ok(val) => val,
};
let script = redis::Script::new(UNLOCK_SCRIPT);
let result: RedisResult<i32> = script.key(resource).arg(val).invoke(&mut con);
match result {
Ok(val) => val == 1,
Err(_) => false,
}
}
pub fn unlock(&self, lock: &Lock) {
for client in &self.servers {
self.unlock_instance(client, &lock.resource, &lock.val);
}
}
}
#[test]
fn test_redlock_get_unique_id() {
let rl = RedLock::new(vec![]);
match rl.get_unique_lock_id() {
Ok(id) => {
assert_eq!(20, id.len());
}
err => panic!("Error thrown: {:?}", err),
}
}
#[test]
fn test_redlock_get_unique_id_uniqueness() {
let rl = RedLock::new(vec![]);
let id1 = rl.get_unique_lock_id().unwrap();
let id2 = rl.get_unique_lock_id().unwrap();
assert_eq!(20, id1.len());
assert_eq!(20, id2.len());
assert!(id1 != id2);
}
#[test]
fn test_redlock_valid_instance() {
let rl = RedLock::new(vec![
"redis://127.0.0.1:6380/",
"redis://127.0.0.1:6381/",
"redis://127.0.0.1:6382/",
]);
assert_eq!(3, rl.servers.len());
assert_eq!(2, rl.quorum);
}
#[test]
fn test_redlock_direct_unlock_fails() {
let rl = RedLock::new(vec![
"redis://127.0.0.1:6380/",
"redis://127.0.0.1:6381/",
"redis://127.0.0.1:6382/",
]);
let key = rl.get_unique_lock_id().unwrap();
let val = rl.get_unique_lock_id().unwrap();
assert_eq!(false, rl.unlock_instance(&rl.servers[0], &key, &val))
}
#[test]
fn test_redlock_direct_unlock_succeeds() {
let rl = RedLock::new(vec![
"redis://127.0.0.1:6380/",
"redis://127.0.0.1:6381/",
"redis://127.0.0.1:6382/",
]);
let key = rl.get_unique_lock_id().unwrap();
let val = rl.get_unique_lock_id().unwrap();
let mut con = rl.servers[0].get_connection().unwrap();
redis::cmd("SET").arg(&*key).arg(&*val).execute(&mut con);
assert_eq!(true, rl.unlock_instance(&rl.servers[0], &key, &val))
}
#[test]
fn test_redlock_direct_lock_succeeds() {
let rl = RedLock::new(vec![
"redis://127.0.0.1:6380/",
"redis://127.0.0.1:6381/",
"redis://127.0.0.1:6382/",
]);
let key = rl.get_unique_lock_id().unwrap();
let val = rl.get_unique_lock_id().unwrap();
let mut con = rl.servers[0].get_connection().unwrap();
redis::cmd("DEL").arg(&*key).execute(&mut con);
assert_eq!(true, rl.lock_instance(&rl.servers[0], &*key, &*val, 1000))
}
#[test]
fn test_redlock_unlock() {
let rl = RedLock::new(vec![
"redis://127.0.0.1:6380/",
"redis://127.0.0.1:6381/",
"redis://127.0.0.1:6382/",
]);
let key = rl.get_unique_lock_id().unwrap();
let val = rl.get_unique_lock_id().unwrap();
let mut con = rl.servers[0].get_connection().unwrap();
let _: () = redis::cmd("SET")
.arg(&*key)
.arg(&*val)
.query(&mut con)
.unwrap();
let lock = Lock {
lock_manager: &rl,
resource: key,
val,
validity_time: 0,
};
assert_eq!((), rl.unlock(&lock))
}
#[test]
fn test_redlock_lock() {
let rl = RedLock::new(vec![
"redis://127.0.0.1:6380/",
"redis://127.0.0.1:6381/",
"redis://127.0.0.1:6382/",
]);
let key = rl.get_unique_lock_id().unwrap();
match rl.lock(&key, 1000) {
Some(lock) => {
assert_eq!(key, lock.resource);
assert_eq!(20, lock.val.len());
assert!(lock.validity_time > 900);
}
None => panic!("Lock failed"),
}
}
#[test]
fn test_redlock_lock_unlock() {
let rl = RedLock::new(vec![
"redis://127.0.0.1:6380/",
"redis://127.0.0.1:6381/",
"redis://127.0.0.1:6382/",
]);
let rl2 = RedLock::new(vec![
"redis://127.0.0.1:6380/",
"redis://127.0.0.1:6381/",
"redis://127.0.0.1:6382/",
]);
let key = rl.get_unique_lock_id().unwrap();
let lock = rl.lock(&key, 1000).unwrap();
assert!(lock.validity_time > 900);
match rl2.lock(&key, 1000) {
Some(_l) => panic!("Lock acquired, even though it should be locked"),
None => (),
}
rl.unlock(&lock);
match rl2.lock(&key, 1000) {
Some(l) => assert!(l.validity_time > 900),
None => panic!("Lock couldn't be acquired"),
}
}