1use std::fs::File;
2use std::io::{self, Read};
3use std::thread::sleep;
4use std::time::{Duration, Instant};
5
6use rand::{thread_rng, Rng};
7use redis::Value::Okay;
8use redis::{Client, IntoConnectionInfo, RedisError, RedisResult, Value};
9
10const DEFAULT_RETRY_COUNT: u32 = 3;
11const DEFAULT_RETRY_DELAY: u32 = 200;
12const CLOCK_DRIFT_FACTOR: f32 = 0.01;
13const UNLOCK_SCRIPT: &str = r"if redis.call('get',KEYS[1]) == ARGV[1] then
14 return redis.call('del',KEYS[1])
15 else
16 return 0
17 end";
18
19#[derive(Debug, Clone)]
24pub struct RedLock {
25 pub servers: Vec<Client>,
27 quorum: u32,
28 retry_count: u32,
29 retry_delay: u32,
30}
31
32pub struct Lock<'a> {
33 pub resource: Vec<u8>,
35 pub val: Vec<u8>,
37 pub validity_time: usize,
40 pub lock_manager: &'a RedLock,
42}
43
44pub struct RedLockGuard<'a> {
45 pub lock: Lock<'a>,
46}
47
48impl Drop for RedLockGuard<'_> {
49 fn drop(&mut self) {
50 self.lock.lock_manager.unlock(&self.lock);
51 }
52}
53
54impl RedLock {
55 pub fn new<T: AsRef<str> + IntoConnectionInfo>(uris: Vec<T>) -> RedLock {
60 let servers: Vec<Client> = uris
61 .into_iter()
62 .map(|uri| Client::open(uri).unwrap())
63 .collect();
64
65 Self::with_clients(servers)
66 }
67
68 pub fn with_client(server: Client) -> RedLock {
69 Self::with_clients(vec![server])
70 }
71
72 pub fn with_clients(servers: Vec<Client>) -> RedLock {
75 let quorum = (servers.len() as u32) / 2 + 1;
76
77 RedLock {
78 servers,
79 quorum,
80 retry_count: DEFAULT_RETRY_COUNT,
81 retry_delay: DEFAULT_RETRY_DELAY,
82 }
83 }
84
85 pub fn get_unique_lock_id(&self) -> io::Result<Vec<u8>> {
87 let file = File::open("/dev/urandom")?;
88 let mut buf = Vec::with_capacity(20);
89 match file.take(20).read_to_end(&mut buf) {
90 Ok(20) => Ok(buf),
91 Ok(_containers) => Err(io::Error::new(
92 io::ErrorKind::Other,
93 "Can't read enough random bytes",
94 )),
95 Err(e) => Err(e),
96 }
97 }
98
99 pub fn set_retry(&mut self, count: u32, delay: u32) {
104 self.retry_count = count;
105 self.retry_delay = delay;
106 }
107
108 fn lock_instance(
109 &self,
110 client: &redis::Client,
111 resource: &[u8],
112 val: &[u8],
113 ttl: usize,
114 ) -> Result<bool, RedisError> {
115 client
116 .get_connection()
117 .and_then(|mut conn| {
118 redis::cmd("SET")
119 .arg(resource)
120 .arg(val)
121 .arg("nx")
122 .arg("px")
123 .arg(ttl)
124 .query::<Value>(&mut conn)
125 })
126 .map(|result| matches!(result, Okay))
127 }
128
129 pub fn lock(&self, resource: &[u8], ttl: usize) -> Result<Option<Lock>, RedisError> {
137 let val = self.get_unique_lock_id().unwrap();
138
139 let mut rng = thread_rng();
140
141 for _ in 0..self.retry_count {
142 let mut n = 0;
143 let start_time = Instant::now();
144 for client in &self.servers {
145 if self.lock_instance(client, resource, &val, ttl)? {
146 n += 1;
147 }
148 }
149
150 let drift = (ttl as f32 * CLOCK_DRIFT_FACTOR) as usize + 2;
151 let elapsed = start_time.elapsed();
152 let validity_time = ttl
153 - drift
154 - elapsed.as_secs() as usize * 1000
155 - elapsed.subsec_nanos() as usize / 1_000_000;
156
157 if n >= self.quorum && validity_time > 0 {
158 return Ok(Some(Lock {
159 lock_manager: self,
160 resource: resource.to_vec(),
161 val,
162 validity_time,
163 }));
164 } else {
165 for client in &self.servers {
166 self.unlock_instance(client, resource, &val);
167 }
168 }
169
170 let n = rng.gen_range(0..self.retry_delay);
171 sleep(Duration::from_millis(u64::from(n)));
172 }
173 Ok(None)
174 }
175
176 #[cfg(feature = "async")]
183 pub async fn acquire_async(
184 &self,
185 resource: &[u8],
186 ttl: usize,
187 ) -> Result<RedLockGuard<'_>, RedisError> {
188 let lock;
189 loop {
190 match self.lock(resource, ttl)? {
191 Some(l) => {
192 lock = l;
193 break;
194 }
195 None => tokio::task::yield_now().await,
196 }
197 }
198 Ok(RedLockGuard { lock })
199 }
200
201 pub fn acquire(&self, resource: &[u8], ttl: usize) -> Result<RedLockGuard<'_>, RedisError> {
202 let lock;
203 loop {
204 if let Some(l) = self.lock(resource, ttl)? {
205 lock = l;
206 break;
207 }
208 }
209 Ok(RedLockGuard { lock })
210 }
211
212 fn unlock_instance(&self, client: &redis::Client, resource: &[u8], val: &[u8]) -> bool {
213 let mut con = match client.get_connection() {
214 Err(_containers) => return false,
215 Ok(val) => val,
216 };
217 let script = redis::Script::new(UNLOCK_SCRIPT);
218 let result: RedisResult<i32> = script.key(resource).arg(val).invoke(&mut con);
219 match result {
220 Ok(val) => val == 1,
221 Err(_containers) => false,
222 }
223 }
224
225 pub fn unlock(&self, lock: &Lock) {
230 for client in &self.servers {
231 self.unlock_instance(client, &lock.resource, &lock.val);
232 }
233 }
234}
235
236#[cfg(test)]
237mod tests {
238 use anyhow::Result;
239 use testcontainers::clients::Cli;
240 use testcontainers::images::generic::GenericImage;
241 use testcontainers::Container;
242
243 use super::*;
244
245 fn init(docker: &Cli) -> (Option<Vec<Container<GenericImage>>>, Vec<String>) {
246 match std::env::var("ADDRESSES") {
247 Ok(addresses) => (None, addresses.split(',').map(String::from).collect()),
248 _ => {
249 let (containers, addresses) = start_container(docker);
250 (Some(containers), addresses)
251 }
252 }
253 }
254
255 fn start_container(docker: &Cli) -> (Vec<Container<GenericImage>>, Vec<String>) {
256 (0..3)
257 .map(|_| {
258 let container =
259 docker.run(GenericImage::new("redis", "7-alpine").with_exposed_port(6379));
260 let address = format!("redis://localhost:{}", container.get_host_port_ipv4(6379));
261 (container, address)
262 })
263 .unzip()
264 }
265
266 #[test]
267 fn test_redlock_get_unique_id() -> Result<()> {
268 let rl = RedLock::new(Vec::<String>::new());
269 assert_eq!(rl.get_unique_lock_id()?.len(), 20);
270 Ok(())
271 }
272
273 #[test]
274 fn test_redlock_get_unique_id_uniqueness() -> Result<()> {
275 let rl = RedLock::new(Vec::<String>::new());
276
277 let id1 = rl.get_unique_lock_id()?;
278 let id2 = rl.get_unique_lock_id()?;
279
280 assert_eq!(20, id1.len());
281 assert_eq!(20, id2.len());
282 assert_ne!(id1, id2);
283 Ok(())
284 }
285
286 #[test]
287 fn test_redlock_valid_instance() {
288 let docker = Cli::default();
289 let (_containers, addresses) = init(&docker);
290 let rl = RedLock::new(addresses);
291 assert_eq!(3, rl.servers.len());
292 assert_eq!(2, rl.quorum);
293 }
294
295 #[test]
296 fn test_redlock_direct_unlock_fails() -> Result<()> {
297 let docker = Cli::default();
298 let (_containers, addresses) = init(&docker);
299 let rl = RedLock::new(addresses);
300 let key = rl.get_unique_lock_id()?;
301
302 let val = rl.get_unique_lock_id()?;
303 assert!(!rl.unlock_instance(&rl.servers[0], &key, &val));
304 Ok(())
305 }
306
307 #[test]
308 fn test_redlock_direct_unlock_succeeds() -> Result<()> {
309 let docker = Cli::default();
310 let (_containers, addresses) = init(&docker);
311 let rl = RedLock::new(addresses);
312 let key = rl.get_unique_lock_id()?;
313
314 let val = rl.get_unique_lock_id()?;
315 let mut con = rl.servers[0].get_connection()?;
316 redis::cmd("SET").arg(&key).arg(&val).execute(&mut con);
317
318 assert!(rl.unlock_instance(&rl.servers[0], &key, &val));
319 Ok(())
320 }
321
322 #[test]
323 fn test_redlock_direct_lock_succeeds() -> Result<()> {
324 let docker = Cli::default();
325 let (_containerscontainers, addresses) = init(&docker);
326 let rl = RedLock::new(addresses);
327 let key = rl.get_unique_lock_id()?;
328
329 let val = rl.get_unique_lock_id()?;
330 let mut con = rl.servers[0].get_connection()?;
331
332 redis::cmd("DEL").arg(&key).execute(&mut con);
333 assert!(rl.lock_instance(&rl.servers[0], &key, &val, 1000)?);
334 Ok(())
335 }
336
337 #[test]
338 fn test_redlock_unlock() -> Result<()> {
339 let docker = Cli::default();
340 let (_containers, addresses) = init(&docker);
341 let rl = RedLock::new(addresses);
342 let key = rl.get_unique_lock_id()?;
343
344 let val = rl.get_unique_lock_id()?;
345 let mut con = rl.servers[0].get_connection()?;
346 let _: () = redis::cmd("SET")
347 .arg(&key)
348 .arg(&val)
349 .query(&mut con)
350 .unwrap();
351
352 let lock = Lock {
353 lock_manager: &rl,
354 resource: key,
355 val,
356 validity_time: 0,
357 };
358 rl.unlock(&lock);
359 Ok(())
360 }
361
362 #[test]
363 fn test_redlock_lock() -> Result<()> {
364 let docker = Cli::default();
365 let (_containers, addresses) = init(&docker);
366 let rl = RedLock::new(addresses);
367
368 let key = rl.get_unique_lock_id()?;
369 match rl.lock(&key, 1000)? {
370 Some(lock) => {
371 assert_eq!(key, lock.resource);
372 assert_eq!(20, lock.val.len());
373 assert!(lock.validity_time > 900);
374 assert!(
375 lock.validity_time > 900,
376 "validity time: {}",
377 lock.validity_time
378 );
379 }
380 None => panic!("Lock failed"),
381 }
382 Ok(())
383 }
384
385 #[test]
386 fn test_redlock_lock_unlock() -> Result<()> {
387 let docker = Cli::default();
388 let (_containers, addresses) = init(&docker);
389 let rl = RedLock::new(addresses.to_owned());
390 let rl2 = RedLock::new(addresses);
391
392 let key = rl.get_unique_lock_id()?;
393
394 let lock = rl.lock(&key, 1000)?.unwrap();
395 assert!(
396 lock.validity_time > 900,
397 "validity time: {}",
398 lock.validity_time
399 );
400
401 if let Some(_containersl) = rl2.lock(&key, 1000)? {
402 panic!("Lock acquired, even though it should be locked")
403 }
404
405 rl.unlock(&lock);
406
407 match rl2.lock(&key, 1000)? {
408 Some(l) => assert!(l.validity_time > 900),
409 None => panic!("Lock couldn't be acquired"),
410 }
411 Ok(())
412 }
413
414 #[test]
415 fn test_redlock_lock_unlock_raii() -> Result<()> {
416 let docker = Cli::default();
417 let (_containers, addresses) = init(&docker);
418 let rl = RedLock::new(addresses.to_owned());
419 let rl2 = RedLock::new(addresses);
420
421 let key = rl.get_unique_lock_id()?;
422 {
423 let lock_guard = rl.acquire(&key, 1000)?;
424 let lock = &lock_guard.lock;
425 assert!(
426 lock.validity_time > 900,
427 "validity time: {}",
428 lock.validity_time
429 );
430
431 if let Some(_containersl) = rl2.lock(&key, 1000)? {
432 panic!("Lock acquired, even though it should be locked")
433 }
434 }
435
436 match rl2.lock(&key, 1000)? {
437 Some(l) => assert!(l.validity_time > 900),
438 None => panic!("Lock couldn't be acquired"),
439 }
440 Ok(())
441 }
442
443 #[test]
444 fn test_redlock_lock_error() -> Result<()> {
445 let rl = RedLock::new(vec!["redis://nonexistent"]);
446 let key = rl.get_unique_lock_id()?;
447 match rl.lock(&key, 1000) {
448 Ok(_containers) => panic!("Expected error"),
449 Err(e) => assert!(e.is_io_error()),
450 }
451 Ok(())
452 }
453}