rustango/
distributed_lock.rs1use std::sync::Arc;
53use std::time::Duration;
54
55use crate::cache::BoxedCache;
56
57const KEY_PREFIX: &str = "lock";
58
59#[derive(Clone)]
61pub struct DistributedLock {
62 cache: BoxedCache,
63}
64
65impl DistributedLock {
66 #[must_use]
67 pub fn new(cache: BoxedCache) -> Self {
68 Self { cache }
69 }
70
71 pub async fn try_acquire(&self, name: &str, ttl: Duration) -> Option<LockGuard> {
77 let key = format!("{KEY_PREFIX}:{name}");
78 let n = self.cache.incr(&key, 1, Some(ttl)).await.ok()?;
83 if n == 1 {
84 let token = format!(
86 "{}-{}",
87 std::process::id(),
88 std::time::SystemTime::now()
89 .duration_since(std::time::UNIX_EPOCH)
90 .map_or(0, |d| d.as_nanos())
91 );
92 let token_key = format!("{key}:token");
95 let _ = self.cache.set(&token_key, &token, Some(ttl)).await;
96 Some(LockGuard {
97 cache: self.cache.clone(),
98 key,
99 token_key,
100 token: Arc::new(token),
101 released: Arc::new(std::sync::atomic::AtomicBool::new(false)),
102 })
103 } else {
104 None
105 }
106 }
107
108 pub async fn with_lock<F, Fut, R>(&self, name: &str, ttl: Duration, body: F) -> Option<R>
114 where
115 F: FnOnce() -> Fut,
116 Fut: std::future::Future<Output = R>,
117 {
118 let guard = self.try_acquire(name, ttl).await?;
119 let result = body().await;
120 guard.release().await;
121 Some(result)
122 }
123}
124
125pub struct LockGuard {
127 cache: BoxedCache,
128 key: String,
129 token_key: String,
130 token: Arc<String>,
131 released: Arc<std::sync::atomic::AtomicBool>,
132}
133
134impl LockGuard {
135 pub async fn release(self) {
138 self.release_inner().await;
139 }
140
141 async fn release_inner(&self) {
142 if self
143 .released
144 .swap(true, std::sync::atomic::Ordering::SeqCst)
145 {
146 return;
147 }
148 let stored = self.cache.get(&self.token_key).await.ok().flatten();
152 if stored.as_deref() != Some(self.token.as_str()) {
153 return;
154 }
155 let _ = self.cache.delete(&self.key).await;
156 let _ = self.cache.delete(&self.token_key).await;
157 }
158}
159
160impl Drop for LockGuard {
161 fn drop(&mut self) {
162 if !self.released.load(std::sync::atomic::Ordering::SeqCst) {
166 tracing::debug!(
167 key = %self.key,
168 "DistributedLock guard dropped without release(); waiting for TTL"
169 );
170 }
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177 use crate::cache::InMemoryCache;
178 use std::sync::Arc as StdArc;
179
180 fn lock() -> DistributedLock {
181 let cache: BoxedCache = StdArc::new(InMemoryCache::new());
182 DistributedLock::new(cache)
183 }
184
185 #[tokio::test]
186 async fn first_acquirer_succeeds() {
187 let l = lock();
188 let g = l.try_acquire("job", Duration::from_secs(5)).await;
189 assert!(g.is_some());
190 }
191
192 #[tokio::test]
193 async fn second_acquirer_blocked() {
194 let l = lock();
195 let g1 = l.try_acquire("job", Duration::from_secs(5)).await;
196 assert!(g1.is_some());
197 let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
198 assert!(g2.is_none(), "second acquirer should be blocked");
199 }
200
201 #[tokio::test]
202 async fn release_lets_next_acquirer_in() {
203 let l = lock();
204 let g1 = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
205 g1.release().await;
206 let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
207 assert!(g2.is_some(), "after release the lock is free");
208 }
209
210 #[tokio::test]
211 async fn different_names_dont_collide() {
212 let l = lock();
213 let a = l.try_acquire("a", Duration::from_secs(5)).await;
214 let b = l.try_acquire("b", Duration::from_secs(5)).await;
215 assert!(a.is_some());
216 assert!(b.is_some());
217 }
218
219 #[tokio::test]
220 async fn with_lock_runs_body_and_releases() {
221 let l = lock();
222 let result = l
223 .with_lock("job", Duration::from_secs(5), || async { 42 })
224 .await;
225 assert_eq!(result, Some(42));
226 let g = l.try_acquire("job", Duration::from_secs(5)).await;
228 assert!(g.is_some());
229 }
230
231 #[tokio::test]
232 async fn with_lock_returns_none_when_blocked() {
233 let l = lock();
234 let _hold = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
235 let result = l
236 .with_lock("job", Duration::from_secs(5), || async { 42 })
237 .await;
238 assert_eq!(result, None);
239 }
240
241 #[tokio::test]
242 async fn release_is_idempotent_at_least_once() {
243 let l = lock();
247 let g = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
248 g.release_inner().await;
249 }
251
252 #[tokio::test]
253 async fn ttl_expiry_frees_lock() {
254 let l = lock();
255 let g = l.try_acquire("job", Duration::from_millis(50)).await;
256 assert!(g.is_some());
257 std::mem::forget(g);
259 tokio::time::sleep(Duration::from_millis(120)).await;
260 let g2 = l.try_acquire("job", Duration::from_millis(50)).await;
262 assert!(g2.is_some(), "TTL expiry should free the lock");
263 }
264
265 #[tokio::test]
266 async fn release_after_ttl_does_not_clobber_new_holder() {
267 let l = lock();
268 let g1 = l
269 .try_acquire("job", Duration::from_millis(30))
270 .await
271 .unwrap();
272 tokio::time::sleep(Duration::from_millis(80)).await;
274 let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
276 assert!(g2.is_some(), "new acquirer can claim after TTL");
277 g1.release().await;
279 let g3 = l.try_acquire("job", Duration::from_secs(5)).await;
281 assert!(
282 g3.is_none(),
283 "g2 still holds the lock — late g1.release() must not clear it"
284 );
285 drop(g2);
286 }
287
288 #[tokio::test]
289 async fn with_lock_releases_even_when_body_returns_unit() {
290 let l = lock();
291 let r: Option<()> = l
292 .with_lock("job", Duration::from_secs(5), || async {})
293 .await;
294 assert!(r.is_some());
295 let g = l.try_acquire("job", Duration::from_secs(5)).await;
296 assert!(g.is_some());
297 }
298}