1use crate::builder::RedsyncBuilder;
2use crate::errors::{MultiError, RedsyncError};
3use crate::instance::Instance;
4
5use std::ops::{Add, Sub};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use rand::distributions::Alphanumeric;
10use rand::{thread_rng, Rng};
11
12pub struct Lock {
14 pub resource: String,
15 pub value: String,
16 pub ttl: Duration,
17 pub expiry: Instant,
18}
19
20pub struct Redsync<I: Instance> {
22 pub(crate) cluster: Vec<I>,
23 pub(crate) quorum: u32,
24 pub(crate) retry_count: u32,
25 pub(crate) retry_delay: Duration,
26 pub(crate) retry_jitter: f64,
27 pub(crate) drift_factor: f64,
28}
29
30enum Call {
31 Lock,
32 Extend,
33}
34
35impl<I: Instance> Redsync<I> {
36 pub fn new(cluster: Vec<I>) -> Self {
37 RedsyncBuilder::new(cluster).build()
38 }
39
40 pub fn lock(&self, resource: &str, ttl: Duration) -> Result<Lock, RedsyncError> {
41 let value = self.get_unique_lock_id();
42 self.call(Call::Lock, resource, &value, ttl)
43 }
44
45 pub fn extend(&self, lock: &Lock, ttl: Duration) -> Result<Lock, RedsyncError> {
46 self.call(Call::Extend, &lock.resource, &lock.value, ttl)
47 }
48
49 fn call(
50 &self,
51 call: Call,
52 resource: &str,
53 value: &str,
54 ttl: Duration,
55 ) -> Result<Lock, RedsyncError> {
56 let drift = Duration::from_millis((ttl.as_millis() as f64 * self.drift_factor) as u64 + 2);
57
58 let mut errors = MultiError::new();
59
60 for attempt in 1..=self.retry_count {
61 let mut votes = 0;
62 let start = Instant::now();
63
64 let lock = Lock {
65 resource: String::from(resource),
66 value: String::from(value),
67 ttl,
68 expiry: start + ttl - drift,
69 };
70
71 for instance in &self.cluster {
72 let result = match call {
73 Call::Lock => instance.acquire(&lock),
74 Call::Extend => instance.extend(&lock),
75 };
76
77 match result {
78 Ok(()) => votes += 1,
79 Err(e) => errors.push(e),
80 }
81 }
82
83 if votes >= self.quorum && lock.expiry > Instant::now() {
84 return Ok(lock);
85 }
86
87 let _ = self.unlock(&lock);
88 if attempt < self.retry_count {
89 errors.reset();
90 thread::sleep(self.get_retry_delay());
91 }
92 }
93
94 match call {
95 Call::Lock => Err(RedsyncError::LockRetriesExceeded(errors)),
96 Call::Extend => Err(RedsyncError::ExtendRetriesExceeded(errors)),
97 }
98 }
99
100 pub fn unlock(&self, lock: &Lock) -> Result<(), RedsyncError> {
101 let mut n = 0;
102 let mut errors = MultiError::new();
103
104 for instance in &self.cluster {
105 match instance.release(lock) {
106 Ok(()) => n += 1,
107 Err(e) => errors.push(e),
108 };
109 }
110
111 if n < self.quorum {
112 return Err(RedsyncError::UnlockFailed(errors));
113 }
114
115 Ok(())
116 }
117
118 fn get_unique_lock_id(&self) -> String {
119 thread_rng()
120 .sample_iter(&Alphanumeric)
121 .take(20)
122 .map(char::from)
123 .collect()
124 }
125
126 fn get_retry_delay(&self) -> Duration {
127 let jitter = thread_rng().gen_range(-1.0..1.0) * self.retry_jitter;
128 if jitter > 0.0 {
129 self.retry_delay.add(Duration::from_millis(jitter as u64))
130 } else {
131 self.retry_delay.sub(Duration::from_millis(-jitter as u64))
132 }
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use super::*;
139 use std::matches;
140
141 struct FakeInstance {
142 acquire: i32,
143 extend: i32,
144 release: i32,
145 }
146
147 impl FakeInstance {
148 pub fn new(acquire: i32, extend: i32, release: i32) -> Self {
149 Self {
150 acquire,
151 extend,
152 release,
153 }
154 }
155 }
156
157 impl Instance for FakeInstance {
158 fn acquire(&self, _lock: &Lock) -> Result<(), RedsyncError> {
159 match self.acquire {
160 1 => Ok(()),
161 _ => Err(RedsyncError::ResourceLocked),
162 }
163 }
164
165 fn extend(&self, _lock: &Lock) -> Result<(), RedsyncError> {
166 match self.extend {
167 1 => Ok(()),
168 _ => Err(RedsyncError::InvalidLease),
169 }
170 }
171
172 fn release(&self, _lock: &Lock) -> Result<(), RedsyncError> {
173 match self.release {
174 1 => Ok(()),
175 _ => Err(RedsyncError::InvalidLease),
176 }
177 }
178 }
179
180 #[test]
181 fn lock() {
182 let dlm = Redsync::new(vec![
183 FakeInstance::new(1, 1, 1),
184 FakeInstance::new(1, 1, 1),
185 FakeInstance::new(0, 1, 1),
186 ]);
187
188 let attempt = dlm.lock("test", Duration::from_secs(1));
189 assert!(attempt.is_ok());
190
191 let lock = attempt.unwrap();
192 assert_eq!(lock.resource, "test");
193 assert!(lock.value.len() > 0);
194 assert_eq!(lock.ttl, Duration::from_secs(1));
195 }
196
197 #[test]
198 fn lock_error() {
199 let dlm = Redsync::new(vec![
200 FakeInstance::new(0, 1, 1),
201 FakeInstance::new(0, 1, 1),
202 FakeInstance::new(1, 1, 1),
203 ]);
204
205 let attempt = dlm.lock("test", Duration::from_secs(1));
206 assert!(matches!(
207 attempt,
208 Err(RedsyncError::LockRetriesExceeded { .. })
209 ));
210 }
211
212 #[test]
213 fn extend() -> Result<(), RedsyncError> {
214 let dlm = Redsync::new(vec![
215 FakeInstance::new(1, 1, 1),
216 FakeInstance::new(1, 1, 1),
217 FakeInstance::new(1, 0, 1),
218 ]);
219 let lock = dlm.lock("test", Duration::from_secs(1))?;
220
221 let attempt = dlm.extend(&lock, Duration::from_secs(2));
222 assert!(attempt.is_ok());
223
224 let lock = attempt.unwrap();
225 assert_eq!(lock.resource, "test");
226 assert!(lock.value.len() > 0);
227 assert_eq!(lock.ttl, Duration::from_secs(2));
228
229 Ok(())
230 }
231
232 #[test]
233 fn extend_error() -> Result<(), RedsyncError> {
234 let dlm = Redsync::new(vec![
235 FakeInstance::new(1, 0, 1),
236 FakeInstance::new(1, 0, 1),
237 FakeInstance::new(1, 1, 1),
238 ]);
239 let lock = dlm.lock("test", Duration::from_secs(1))?;
240
241 let attempt = dlm.extend(&lock, Duration::from_secs(2));
242 assert!(matches!(
243 attempt,
244 Err(RedsyncError::ExtendRetriesExceeded { .. })
245 ));
246
247 Ok(())
248 }
249
250 #[test]
251 fn unlock() -> Result<(), RedsyncError> {
252 let dlm = Redsync::new(vec![
253 FakeInstance::new(1, 1, 1),
254 FakeInstance::new(1, 1, 1),
255 FakeInstance::new(1, 1, 0),
256 ]);
257 let lock = dlm.lock("test", Duration::from_secs(1))?;
258
259 let attempt = dlm.unlock(&lock);
260 assert!(attempt.is_ok());
261
262 Ok(())
263 }
264
265 #[test]
266 fn unlock_error() -> Result<(), RedsyncError> {
267 let dlm = Redsync::new(vec![
268 FakeInstance::new(1, 1, 0),
269 FakeInstance::new(1, 1, 0),
270 FakeInstance::new(1, 1, 1),
271 ]);
272 let lock = dlm.lock("test", Duration::from_secs(1))?;
273
274 let attempt = dlm.unlock(&lock);
275 assert!(matches!(attempt, Err(RedsyncError::UnlockFailed { .. })));
276
277 Ok(())
278 }
279
280 #[test]
281 fn get_unique_lock_id() {
282 let cluster = vec![FakeInstance::new(1, 1, 1)];
283 let dlm = Redsync::new(cluster);
284
285 let value = dlm.get_unique_lock_id();
286 assert_eq!(value.len(), 20);
287 assert!(value.is_ascii());
288 }
289
290 #[test]
291 fn get_retry_delay() {
292 let cluster = vec![FakeInstance::new(1, 1, 1)];
293 let dlm = Redsync::new(cluster);
294
295 let retry_delay = dlm.get_retry_delay();
296 let (min, max) = (Duration::from_millis(100), Duration::from_millis(300));
297 assert!(
298 min < retry_delay && retry_delay < max,
299 "expected retry delay to be between {:?} and {:?}, but got {:?}",
300 min,
301 max,
302 retry_delay,
303 );
304 }
305}