Skip to main content

redlock/
redlock.rs

1use std::fs::File;
2use std::io::{self, Read};
3use std::thread::sleep;
4use std::time::{Duration, Instant};
5
6use rand::{thread_rng, Rng};
7use redis::Value::Okay;
8use redis::{Client, IntoConnectionInfo, RedisError, RedisResult, Value};
9
/// Number of acquisition rounds `RedLock::lock` attempts unless overridden via `set_retry`.
const DEFAULT_RETRY_COUNT: u32 = 3;
/// Default exclusive upper bound, in milliseconds, of the random pause between rounds.
const DEFAULT_RETRY_DELAY: u32 = 200;
/// Fraction of the requested TTL that is subtracted from the validity time
/// as an allowance for clock drift.
const CLOCK_DRIFT_FACTOR: f32 = 0.01;
/// Lua script used for unlocking: deletes `KEYS[1]` only if it currently holds
/// the value `ARGV[1]`, returning the `del` result, and returns 0 otherwise.
const UNLOCK_SCRIPT: &str = r"if redis.call('get',KEYS[1]) == ARGV[1] then
                                return redis.call('del',KEYS[1])
                              else
                                return 0
                              end";
18
/// The lock manager.
///
/// Implements the necessary functionality to acquire and release locks
/// and handles the Redis connections.
#[derive(Debug, Clone)]
pub struct RedLock {
    /// List of all Redis clients
    pub servers: Vec<Client>,
    /// Minimum number of servers that must grant a lock for it to count as
    /// acquired; `with_clients` sets it to `servers.len() / 2 + 1`.
    quorum: u32,
    /// Number of acquisition rounds `lock` attempts before giving up.
    retry_count: u32,
    /// Exclusive upper bound, in milliseconds, of the random pause `lock`
    /// sleeps after an unsuccessful round.
    retry_delay: u32,
}
31
32pub struct Lock<'a> {
33    /// The resource to lock. Will be used as the key in Redis.
34    pub resource: Vec<u8>,
35    /// The value for this lock.
36    pub val: Vec<u8>,
37    /// Time the lock is still valid.
38    /// Should only be slightly smaller than the requested TTL.
39    pub validity_time: usize,
40    /// Used to limit the lifetime of a lock to its lock manager.
41    pub lock_manager: &'a RedLock,
42}
43
44pub struct RedLockGuard<'a> {
45    pub lock: Lock<'a>,
46}
47
48impl Drop for RedLockGuard<'_> {
49    fn drop(&mut self) {
50        self.lock.lock_manager.unlock(&self.lock);
51    }
52}
53
54impl RedLock {
55    /// Create a new lock manager instance, defined by the given Redis connection uris.
56    /// Quorum is defined to be N/2+1, with N being the number of given Redis instances.
57    ///
58    /// Sample URI: `"redis://127.0.0.1:6379"`
59    pub fn new<T: AsRef<str> + IntoConnectionInfo>(uris: Vec<T>) -> RedLock {
60        let servers: Vec<Client> = uris
61            .into_iter()
62            .map(|uri| Client::open(uri).unwrap())
63            .collect();
64
65        Self::with_clients(servers)
66    }
67
68    pub fn with_client(server: Client) -> RedLock {
69        Self::with_clients(vec![server])
70    }
71
72    /// Create a new lock manager instance, defined by the given Redis client instance.
73    /// Quorum is defined to be N/2+1, with N being the number of given client instances.
74    pub fn with_clients(servers: Vec<Client>) -> RedLock {
75        let quorum = (servers.len() as u32) / 2 + 1;
76
77        RedLock {
78            servers,
79            quorum,
80            retry_count: DEFAULT_RETRY_COUNT,
81            retry_delay: DEFAULT_RETRY_DELAY,
82        }
83    }
84
85    /// Get 20 random bytes from `/dev/urandom`.
86    pub fn get_unique_lock_id(&self) -> io::Result<Vec<u8>> {
87        let file = File::open("/dev/urandom")?;
88        let mut buf = Vec::with_capacity(20);
89        match file.take(20).read_to_end(&mut buf) {
90            Ok(20) => Ok(buf),
91            Ok(_containers) => Err(io::Error::new(
92                io::ErrorKind::Other,
93                "Can't read enough random bytes",
94            )),
95            Err(e) => Err(e),
96        }
97    }
98
99    /// Set retry count and retry delay.
100    ///
101    /// Retry count defaults to `3`.
102    /// Retry delay defaults to `200`.
103    pub fn set_retry(&mut self, count: u32, delay: u32) {
104        self.retry_count = count;
105        self.retry_delay = delay;
106    }
107
108    fn lock_instance(
109        &self,
110        client: &redis::Client,
111        resource: &[u8],
112        val: &[u8],
113        ttl: usize,
114    ) -> Result<bool, RedisError> {
115        client
116            .get_connection()
117            .and_then(|mut conn| {
118                redis::cmd("SET")
119                    .arg(resource)
120                    .arg(val)
121                    .arg("nx")
122                    .arg("px")
123                    .arg(ttl)
124                    .query::<Value>(&mut conn)
125            })
126            .map(|result| matches!(result, Okay))
127    }
128
129    /// Acquire the lock for the given resource and the requested TTL.
130    ///
131    /// If it succeeds, a `Lock` instance is returned,
132    /// including the value and the validity time
133    ///
134    /// `Err(RedisError)` is returned on any Redis error, `None` is returned if the lock could
135    /// not be acquired and the user should retry
136    pub fn lock(&self, resource: &[u8], ttl: usize) -> Result<Option<Lock>, RedisError> {
137        let val = self.get_unique_lock_id().unwrap();
138
139        let mut rng = thread_rng();
140
141        for _ in 0..self.retry_count {
142            let mut n = 0;
143            let start_time = Instant::now();
144            for client in &self.servers {
145                if self.lock_instance(client, resource, &val, ttl)? {
146                    n += 1;
147                }
148            }
149
150            let drift = (ttl as f32 * CLOCK_DRIFT_FACTOR) as usize + 2;
151            let elapsed = start_time.elapsed();
152            let validity_time = ttl
153                - drift
154                - elapsed.as_secs() as usize * 1000
155                - elapsed.subsec_nanos() as usize / 1_000_000;
156
157            if n >= self.quorum && validity_time > 0 {
158                return Ok(Some(Lock {
159                    lock_manager: self,
160                    resource: resource.to_vec(),
161                    val,
162                    validity_time,
163                }));
164            } else {
165                for client in &self.servers {
166                    self.unlock_instance(client, resource, &val);
167                }
168            }
169
170            let n = rng.gen_range(0..self.retry_delay);
171            sleep(Duration::from_millis(u64::from(n)));
172        }
173        Ok(None)
174    }
175
176    /// Acquire the lock for the given resource and the requested TTL. \
177    /// Will wait and yield current task (tokio runtime) until the lock \
178    /// is acquired
179    ///
180    /// Returns a `RedLockGuard` instance which is a RAII wrapper for \
181    /// the old `Lock` object
182    #[cfg(feature = "async")]
183    pub async fn acquire_async(
184        &self,
185        resource: &[u8],
186        ttl: usize,
187    ) -> Result<RedLockGuard<'_>, RedisError> {
188        let lock;
189        loop {
190            match self.lock(resource, ttl)? {
191                Some(l) => {
192                    lock = l;
193                    break;
194                }
195                None => tokio::task::yield_now().await,
196            }
197        }
198        Ok(RedLockGuard { lock })
199    }
200
201    pub fn acquire(&self, resource: &[u8], ttl: usize) -> Result<RedLockGuard<'_>, RedisError> {
202        let lock;
203        loop {
204            if let Some(l) = self.lock(resource, ttl)? {
205                lock = l;
206                break;
207            }
208        }
209        Ok(RedLockGuard { lock })
210    }
211
212    fn unlock_instance(&self, client: &redis::Client, resource: &[u8], val: &[u8]) -> bool {
213        let mut con = match client.get_connection() {
214            Err(_containers) => return false,
215            Ok(val) => val,
216        };
217        let script = redis::Script::new(UNLOCK_SCRIPT);
218        let result: RedisResult<i32> = script.key(resource).arg(val).invoke(&mut con);
219        match result {
220            Ok(val) => val == 1,
221            Err(_containers) => false,
222        }
223    }
224
225    /// Unlock the given lock.
226    ///
227    /// Unlock is best effort. It will simply try to contact all instances
228    /// and remove the key.
229    pub fn unlock(&self, lock: &Lock) {
230        for client in &self.servers {
231            self.unlock_instance(client, &lock.resource, &lock.val);
232        }
233    }
234}
235
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use testcontainers::clients::Cli;
    use testcontainers::images::generic::GenericImage;
    use testcontainers::Container;

    use super::*;

    /// Provide the Redis addresses for a test.
    ///
    /// Uses the comma-separated `ADDRESSES` environment variable when it is
    /// set; otherwise starts containers and returns them alongside their
    /// addresses so the caller can keep them alive for the test's duration.
    fn init(docker: &Cli) -> (Option<Vec<Container<GenericImage>>>, Vec<String>) {
        match std::env::var("ADDRESSES") {
            Ok(addresses) => (None, addresses.split(',').map(String::from).collect()),
            _ => {
                let (containers, addresses) = start_container(docker);
                (Some(containers), addresses)
            }
        }
    }

    /// Start three `redis:7-alpine` containers and return them with their URIs.
    fn start_container(docker: &Cli) -> (Vec<Container<GenericImage>>, Vec<String>) {
        (0..3)
            .map(|_| {
                let container =
                    docker.run(GenericImage::new("redis", "7-alpine").with_exposed_port(6379));
                let address = format!("redis://localhost:{}", container.get_host_port_ipv4(6379));
                (container, address)
            })
            .unzip()
    }

    #[test]
    fn test_redlock_get_unique_id() -> Result<()> {
        let rl = RedLock::new(Vec::<String>::new());
        assert_eq!(rl.get_unique_lock_id()?.len(), 20);
        Ok(())
    }

    #[test]
    fn test_redlock_get_unique_id_uniqueness() -> Result<()> {
        let rl = RedLock::new(Vec::<String>::new());

        let id1 = rl.get_unique_lock_id()?;
        let id2 = rl.get_unique_lock_id()?;

        assert_eq!(20, id1.len());
        assert_eq!(20, id2.len());
        assert_ne!(id1, id2);
        Ok(())
    }

    #[test]
    fn test_redlock_valid_instance() {
        let docker = Cli::default();
        let (_containers, addresses) = init(&docker);
        let rl = RedLock::new(addresses);
        assert_eq!(3, rl.servers.len());
        assert_eq!(2, rl.quorum);
    }

    #[test]
    fn test_redlock_direct_unlock_fails() -> Result<()> {
        let docker = Cli::default();
        let (_containers, addresses) = init(&docker);
        let rl = RedLock::new(addresses);
        let key = rl.get_unique_lock_id()?;

        // The key was never set, so there is nothing to delete.
        let val = rl.get_unique_lock_id()?;
        assert!(!rl.unlock_instance(&rl.servers[0], &key, &val));
        Ok(())
    }

    #[test]
    fn test_redlock_direct_unlock_succeeds() -> Result<()> {
        let docker = Cli::default();
        let (_containers, addresses) = init(&docker);
        let rl = RedLock::new(addresses);
        let key = rl.get_unique_lock_id()?;

        let val = rl.get_unique_lock_id()?;
        let mut con = rl.servers[0].get_connection()?;
        redis::cmd("SET").arg(&key).arg(&val).execute(&mut con);

        assert!(rl.unlock_instance(&rl.servers[0], &key, &val));
        Ok(())
    }

    #[test]
    fn test_redlock_direct_lock_succeeds() -> Result<()> {
        let docker = Cli::default();
        let (_containers, addresses) = init(&docker);
        let rl = RedLock::new(addresses);
        let key = rl.get_unique_lock_id()?;

        let val = rl.get_unique_lock_id()?;
        let mut con = rl.servers[0].get_connection()?;

        redis::cmd("DEL").arg(&key).execute(&mut con);
        assert!(rl.lock_instance(&rl.servers[0], &key, &val, 1000)?);
        Ok(())
    }

    #[test]
    fn test_redlock_unlock() -> Result<()> {
        let docker = Cli::default();
        let (_containers, addresses) = init(&docker);
        let rl = RedLock::new(addresses);
        let key = rl.get_unique_lock_id()?;

        let val = rl.get_unique_lock_id()?;
        let mut con = rl.servers[0].get_connection()?;
        let _: () = redis::cmd("SET")
            .arg(&key)
            .arg(&val)
            .query(&mut con)
            .unwrap();

        let lock = Lock {
            lock_manager: &rl,
            resource: key,
            val,
            validity_time: 0,
        };
        rl.unlock(&lock);

        // Unlocking must have removed the key from the server it was set on.
        let still_there: bool = redis::cmd("EXISTS")
            .arg(&lock.resource)
            .query(&mut con)?;
        assert!(!still_there, "key survived unlock");
        Ok(())
    }

    #[test]
    fn test_redlock_lock() -> Result<()> {
        let docker = Cli::default();
        let (_containers, addresses) = init(&docker);
        let rl = RedLock::new(addresses);

        let key = rl.get_unique_lock_id()?;
        match rl.lock(&key, 1000)? {
            Some(lock) => {
                assert_eq!(key, lock.resource);
                assert_eq!(20, lock.val.len());
                assert!(
                    lock.validity_time > 900,
                    "validity time: {}",
                    lock.validity_time
                );
            }
            None => panic!("Lock failed"),
        }
        Ok(())
    }

    #[test]
    fn test_redlock_lock_unlock() -> Result<()> {
        let docker = Cli::default();
        let (_containers, addresses) = init(&docker);
        let rl = RedLock::new(addresses.clone());
        let rl2 = RedLock::new(addresses);

        let key = rl.get_unique_lock_id()?;

        let lock = rl.lock(&key, 1000)?.unwrap();
        assert!(
            lock.validity_time > 900,
            "validity time: {}",
            lock.validity_time
        );

        // A second manager must not get the lock while the first holds it.
        assert!(
            rl2.lock(&key, 1000)?.is_none(),
            "Lock acquired, even though it should be locked"
        );

        rl.unlock(&lock);

        match rl2.lock(&key, 1000)? {
            Some(l) => assert!(l.validity_time > 900),
            None => panic!("Lock couldn't be acquired"),
        }
        Ok(())
    }

    #[test]
    fn test_redlock_lock_unlock_raii() -> Result<()> {
        let docker = Cli::default();
        let (_containers, addresses) = init(&docker);
        let rl = RedLock::new(addresses.clone());
        let rl2 = RedLock::new(addresses);

        let key = rl.get_unique_lock_id()?;
        {
            let lock_guard = rl.acquire(&key, 1000)?;
            let lock = &lock_guard.lock;
            assert!(
                lock.validity_time > 900,
                "validity time: {}",
                lock.validity_time
            );

            assert!(
                rl2.lock(&key, 1000)?.is_none(),
                "Lock acquired, even though it should be locked"
            );
        }

        // The guard went out of scope above, so the lock is free again.
        match rl2.lock(&key, 1000)? {
            Some(l) => assert!(l.validity_time > 900),
            None => panic!("Lock couldn't be acquired"),
        }
        Ok(())
    }

    #[test]
    fn test_redlock_lock_error() -> Result<()> {
        let rl = RedLock::new(vec!["redis://nonexistent"]);
        let key = rl.get_unique_lock_id()?;
        match rl.lock(&key, 1000) {
            Ok(_) => panic!("Expected error"),
            Err(e) => assert!(e.is_io_error()),
        }
        Ok(())
    }
}