redsync/
redsync.rs

1use crate::builder::RedsyncBuilder;
2use crate::errors::{MultiError, RedsyncError};
3use crate::instance::Instance;
4
5use std::ops::{Add, Sub};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use rand::distributions::Alphanumeric;
10use rand::{thread_rng, Rng};
11
/// `Lock` holds the metadata of an acquired lock.
///
/// Values of this type are produced by `Redsync::lock` and `Redsync::extend`
/// and are passed back to `Redsync::extend` / `Redsync::unlock`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Lock {
    /// Name of the resource the lock was requested for.
    pub resource: String,
    /// Random value generated when the lock was first acquired; it is reused
    /// unchanged when the lock is extended.
    pub value: String,
    /// Time to live that was requested for the lock.
    pub ttl: Duration,
    /// Point in time after which the lock is treated as expired: the start of
    /// the successful attempt plus `ttl`, minus the clock-drift allowance.
    pub expiry: Instant,
}
19
/// `Redsync` is a distributed lock manager that implements the Redlock algorithm.
///
/// Every lock, extend, and unlock request is sent to each instance in
/// `cluster`; an operation succeeds once at least `quorum` instances agree.
pub struct Redsync<I: Instance> {
    /// Instances that every acquire, extend, and release call is sent to.
    pub(crate) cluster: Vec<I>,
    /// Minimum number of instances that must succeed for an operation to count.
    pub(crate) quorum: u32,
    /// Number of attempts made before a lock or extend request gives up.
    pub(crate) retry_count: u32,
    /// Base pause between two consecutive attempts.
    pub(crate) retry_delay: Duration,
    /// Upper bound, in milliseconds, of the random amount added to or removed
    /// from `retry_delay` before each pause.
    pub(crate) retry_jitter: f64,
    /// Fraction of the TTL (in milliseconds) reserved as clock-drift allowance
    /// when computing a lock's expiry.
    pub(crate) drift_factor: f64,
}
29
/// Selects which instance operation `Redsync::call` performs on each attempt
/// and which "retries exceeded" error it reports on failure.
enum Call {
    /// Acquire a new lock (`Instance::acquire`).
    Lock,
    /// Extend an existing lock (`Instance::extend`).
    Extend,
}
34
35impl<I: Instance> Redsync<I> {
36    pub fn new(cluster: Vec<I>) -> Self {
37        RedsyncBuilder::new(cluster).build()
38    }
39
40    pub fn lock(&self, resource: &str, ttl: Duration) -> Result<Lock, RedsyncError> {
41        let value = self.get_unique_lock_id();
42        self.call(Call::Lock, resource, &value, ttl)
43    }
44
45    pub fn extend(&self, lock: &Lock, ttl: Duration) -> Result<Lock, RedsyncError> {
46        self.call(Call::Extend, &lock.resource, &lock.value, ttl)
47    }
48
49    fn call(
50        &self,
51        call: Call,
52        resource: &str,
53        value: &str,
54        ttl: Duration,
55    ) -> Result<Lock, RedsyncError> {
56        let drift = Duration::from_millis((ttl.as_millis() as f64 * self.drift_factor) as u64 + 2);
57
58        let mut errors = MultiError::new();
59
60        for attempt in 1..=self.retry_count {
61            let mut votes = 0;
62            let start = Instant::now();
63
64            let lock = Lock {
65                resource: String::from(resource),
66                value: String::from(value),
67                ttl,
68                expiry: start + ttl - drift,
69            };
70
71            for instance in &self.cluster {
72                let result = match call {
73                    Call::Lock => instance.acquire(&lock),
74                    Call::Extend => instance.extend(&lock),
75                };
76
77                match result {
78                    Ok(()) => votes += 1,
79                    Err(e) => errors.push(e),
80                }
81            }
82
83            if votes >= self.quorum && lock.expiry > Instant::now() {
84                return Ok(lock);
85            }
86
87            let _ = self.unlock(&lock);
88            if attempt < self.retry_count {
89                errors.reset();
90                thread::sleep(self.get_retry_delay());
91            }
92        }
93
94        match call {
95            Call::Lock => Err(RedsyncError::LockRetriesExceeded(errors)),
96            Call::Extend => Err(RedsyncError::ExtendRetriesExceeded(errors)),
97        }
98    }
99
100    pub fn unlock(&self, lock: &Lock) -> Result<(), RedsyncError> {
101        let mut n = 0;
102        let mut errors = MultiError::new();
103
104        for instance in &self.cluster {
105            match instance.release(lock) {
106                Ok(()) => n += 1,
107                Err(e) => errors.push(e),
108            };
109        }
110
111        if n < self.quorum {
112            return Err(RedsyncError::UnlockFailed(errors));
113        }
114
115        Ok(())
116    }
117
118    fn get_unique_lock_id(&self) -> String {
119        thread_rng()
120            .sample_iter(&Alphanumeric)
121            .take(20)
122            .map(char::from)
123            .collect()
124    }
125
126    fn get_retry_delay(&self) -> Duration {
127        let jitter = thread_rng().gen_range(-1.0..1.0) * self.retry_jitter;
128        if jitter > 0.0 {
129            self.retry_delay.add(Duration::from_millis(jitter as u64))
130        } else {
131            self.retry_delay.sub(Duration::from_millis(-jitter as u64))
132        }
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use std::matches;

    /// Test double for `Instance`: an operation succeeds only when the field
    /// of the same name is `1`.
    struct FakeInstance {
        acquire: i32,
        extend: i32,
        release: i32,
    }

    impl FakeInstance {
        pub fn new(acquire: i32, extend: i32, release: i32) -> Self {
            Self {
                acquire,
                extend,
                release,
            }
        }
    }

    impl Instance for FakeInstance {
        fn acquire(&self, _lock: &Lock) -> Result<(), RedsyncError> {
            if self.acquire == 1 {
                Ok(())
            } else {
                Err(RedsyncError::ResourceLocked)
            }
        }

        fn extend(&self, _lock: &Lock) -> Result<(), RedsyncError> {
            if self.extend == 1 {
                Ok(())
            } else {
                Err(RedsyncError::InvalidLease)
            }
        }

        fn release(&self, _lock: &Lock) -> Result<(), RedsyncError> {
            if self.release == 1 {
                Ok(())
            } else {
                Err(RedsyncError::InvalidLease)
            }
        }
    }

    /// Builds a manager with one `FakeInstance` per
    /// `(acquire, extend, release)` triple, in the given order.
    fn manager(specs: &[(i32, i32, i32)]) -> Redsync<FakeInstance> {
        let cluster: Vec<FakeInstance> = specs
            .iter()
            .map(|&(acquire, extend, release)| FakeInstance::new(acquire, extend, release))
            .collect();
        Redsync::new(cluster)
    }

    // Two of three instances grant the lock, which is enough to succeed.
    #[test]
    fn lock() {
        let ttl = Duration::from_secs(1);
        let dlm = manager(&[(1, 1, 1), (1, 1, 1), (0, 1, 1)]);

        let outcome = dlm.lock("test", ttl);
        assert!(outcome.is_ok());

        let lock = outcome.unwrap();
        assert_eq!(lock.resource, "test");
        assert!(!lock.value.is_empty());
        assert_eq!(lock.ttl, ttl);
    }

    // Only one of three instances grants the lock, so every attempt fails.
    #[test]
    fn lock_error() {
        let dlm = manager(&[(0, 1, 1), (0, 1, 1), (1, 1, 1)]);

        let outcome = dlm.lock("test", Duration::from_secs(1));
        assert!(matches!(
            outcome,
            Err(RedsyncError::LockRetriesExceeded { .. })
        ));
    }

    // Two of three instances accept the extension.
    #[test]
    fn extend() -> Result<(), RedsyncError> {
        let dlm = manager(&[(1, 1, 1), (1, 1, 1), (1, 0, 1)]);
        let original = dlm.lock("test", Duration::from_secs(1))?;

        let new_ttl = Duration::from_secs(2);
        let outcome = dlm.extend(&original, new_ttl);
        assert!(outcome.is_ok());

        let extended = outcome.unwrap();
        assert_eq!(extended.resource, "test");
        assert!(!extended.value.is_empty());
        assert_eq!(extended.ttl, new_ttl);

        Ok(())
    }

    // Only one of three instances accepts the extension.
    #[test]
    fn extend_error() -> Result<(), RedsyncError> {
        let dlm = manager(&[(1, 0, 1), (1, 0, 1), (1, 1, 1)]);
        let original = dlm.lock("test", Duration::from_secs(1))?;

        let outcome = dlm.extend(&original, Duration::from_secs(2));
        assert!(matches!(
            outcome,
            Err(RedsyncError::ExtendRetriesExceeded { .. })
        ));

        Ok(())
    }

    // Two of three instances release the lock.
    #[test]
    fn unlock() -> Result<(), RedsyncError> {
        let dlm = manager(&[(1, 1, 1), (1, 1, 1), (1, 1, 0)]);
        let held = dlm.lock("test", Duration::from_secs(1))?;

        let outcome = dlm.unlock(&held);
        assert!(outcome.is_ok());

        Ok(())
    }

    // Only one of three instances releases the lock.
    #[test]
    fn unlock_error() -> Result<(), RedsyncError> {
        let dlm = manager(&[(1, 1, 0), (1, 1, 0), (1, 1, 1)]);
        let held = dlm.lock("test", Duration::from_secs(1))?;

        let outcome = dlm.unlock(&held);
        assert!(matches!(outcome, Err(RedsyncError::UnlockFailed { .. })));

        Ok(())
    }

    #[test]
    fn get_unique_lock_id() {
        let dlm = manager(&[(1, 1, 1)]);

        let id = dlm.get_unique_lock_id();
        assert_eq!(id.len(), 20);
        assert!(id.is_ascii());
    }

    #[test]
    fn get_retry_delay() {
        let dlm = manager(&[(1, 1, 1)]);

        let delay = dlm.get_retry_delay();
        let lower = Duration::from_millis(100);
        let upper = Duration::from_millis(300);
        assert!(
            lower < delay && delay < upper,
            "expected retry delay to be between {:?} and {:?}, but got {:?}",
            lower,
            upper,
            delay,
        );
    }
}