1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
//! Distributed locks backed by [`Cache`](crate::cache::Cache).
//!
//! "Only one worker at a time runs this task." Pair with the
//! [`crate::scheduler`] so a multi-replica deploy doesn't run a daily
//! cron N times, or wrap a long-running job whose effect should be
//! exactly-once.
//!
//! ## Mechanism
//!
//! Acquire is a Cache `set` of `lock:<name>` to a per-acquire token,
//! gated on the existing default `incr`-based check. The lock auto-
//! expires after `ttl`, so a process that crashes while holding the
//! lock doesn't deadlock the system — at worst, the next acquirer
//! waits `ttl` seconds.
//!
//! Release is conditional on the token: a process that lost its lock
//! (because TTL expired and someone else acquired) does NOT
//! accidentally release the new holder's lock.
//!
//! ## Quick start
//!
//! ```ignore
//! use rustango::distributed_lock::DistributedLock;
//! use std::time::Duration;
//! use std::sync::Arc;
//!
//! let lock = DistributedLock::new(redis_cache);
//!
//! // Try once, give up if another replica has it:
//! if let Some(guard) = lock.try_acquire("daily_report", Duration::from_secs(60)).await {
//! run_daily_report().await;
//! guard.release().await;
//! }
//!
//! // Or use the closure form, which releases the lock for you after
//! // the body completes:
//! lock.with_lock("daily_report", Duration::from_secs(60), || async {
//! run_daily_report().await;
//! }).await;
//! ```
//!
//! ## Caveats
//!
//! - The non-atomic default `Cache::incr` can race under heavy
//! contention with two acquirers in the same millisecond. RedisCache
//! uses native `INCRBY` so it's safe across replicas.
//! - This is "best-effort exactly-once" — fine for cron-style work,
//! not a substitute for a transaction when correctness matters.
//! - TTL must be longer than the worst-case execution time of the
//! protected work, OR the work must be idempotent. A too-short TTL
//! means another replica could grab the lock mid-execution.
use std::sync::Arc;
use std::time::Duration;
use crate::cache::BoxedCache;
const KEY_PREFIX: &str = "lock";
/// Lock factory. Cheap to clone.
///
/// All clones share the same cache backend, so locks acquired through
/// one clone are visible to every other clone (and, with a networked
/// cache such as Redis, to every replica).
#[derive(Clone)]
pub struct DistributedLock {
    // Shared backend holding the counter key (`lock:<name>`) and the
    // token key (`lock:<name>:token`) for each named lock.
    cache: BoxedCache,
}
impl DistributedLock {
    /// Build a lock factory on top of `cache`.
    #[must_use]
    pub fn new(cache: BoxedCache) -> Self {
        Self { cache }
    }

    /// Try to acquire `name` for `ttl`. Returns:
    /// - `Some(LockGuard)` when we got it. Call `release()` when done
    ///   (drop without release leaves the lock to expire on TTL —
    ///   safe but slightly wasteful of contention slots).
    /// - `None` when someone else holds it.
    pub async fn try_acquire(&self, name: &str, ttl: Duration) -> Option<LockGuard> {
        let key = format!("{KEY_PREFIX}:{name}");
        // `incr(key, 1, ttl)` yields 1 ONLY when the counter was
        // previously absent (or 0). RedisCache makes the INCRBY+EXPIRE
        // NX sequence atomic; the in-memory default is racy but fine
        // for tests.
        let count = self.cache.incr(&key, 1, Some(ttl)).await.ok()?;
        if count != 1 {
            // Somebody else already bumped the counter — lock is held.
            return None;
        }
        // We won the gate. Write a per-acquire token so release can
        // verify it is still our lock before clearing anything.
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |d| d.as_nanos());
        let token = format!("{}-{}", std::process::id(), nanos);
        let token_key = format!("{key}:token");
        // Counter is the gate; token is the receipt. Release consults
        // both. A failed set here merely degrades release to TTL expiry.
        let _ = self.cache.set(&token_key, &token, Some(ttl)).await;
        Some(LockGuard {
            cache: self.cache.clone(),
            key,
            token_key,
            token: Arc::new(token),
            released: Arc::new(std::sync::atomic::AtomicBool::new(false)),
        })
    }

    /// Acquire-or-skip helper: runs `body` only if we got the lock.
    /// Returns `Some(R)` when body ran, `None` when another holder
    /// blocked us. The lock is released after body finishes (or on
    /// panic, via the guard's Drop — which is best-effort since we
    /// can't call async fns from Drop).
    pub async fn with_lock<F, Fut, R>(
        &self,
        name: &str,
        ttl: Duration,
        body: F,
    ) -> Option<R>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = R>,
    {
        let guard = self.try_acquire(name, ttl).await?;
        let output = body().await;
        guard.release().await;
        Some(output)
    }
}
/// Holds a lock until released or until the TTL expires.
pub struct LockGuard {
    // Same backend the factory used — needed to clear the keys.
    cache: BoxedCache,
    // Counter key (`lock:<name>`) that gates acquisition.
    key: String,
    // Companion key (`lock:<name>:token`) holding our receipt.
    token_key: String,
    // Per-acquire token; release only clears the keys while this
    // still matches what's stored under `token_key`.
    token: Arc<String>,
    // Latch making release idempotent: first caller wins, later
    // calls are no-ops.
    released: Arc<std::sync::atomic::AtomicBool>,
}
impl LockGuard {
    /// Release the lock if we still hold it. Safe to call multiple
    /// times; the second call is a no-op.
    pub async fn release(self) {
        self.release_inner().await;
    }

    /// Shared release path (also reachable from tests). Idempotent via
    /// the `released` latch; conditional on the stored token so a
    /// stale holder never clears a new holder's lock.
    async fn release_inner(&self) {
        // First caller flips the latch and proceeds; everyone else
        // returns immediately.
        if self
            .released
            .swap(true, std::sync::atomic::Ordering::SeqCst)
        {
            return;
        }
        // Token check — if our token is still the one stored, we still
        // hold the lock, so we can clear it. Otherwise someone else
        // acquired after our TTL expired and we mustn't touch theirs.
        let stored = self
            .cache
            .get(&self.token_key)
            .await
            .ok()
            .flatten();
        if stored.as_deref() != Some(self.token.as_str()) {
            return;
        }
        // Delete the token BEFORE the counter. While the counter still
        // exists no new acquirer can win the incr gate, so there is no
        // window in which a fresh holder's token could be written and
        // then clobbered by our second delete. (The previous order —
        // counter first — let a concurrent acquirer grab the lock and
        // write its token just before we deleted `token_key`, orphaning
        // the new holder until TTL expiry.)
        let _ = self.cache.delete(&self.token_key).await;
        let _ = self.cache.delete(&self.key).await;
    }
}
impl Drop for LockGuard {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;
        // Best-effort: async deletes are impossible from Drop, and the
        // TTL will free the lock anyway. If release() already ran there
        // is nothing to say; otherwise leave a breadcrumb in the logs.
        if self.released.load(Ordering::SeqCst) {
            return;
        }
        tracing::debug!(
            key = %self.key,
            "DistributedLock guard dropped without release(); waiting for TTL"
        );
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::InMemoryCache;
    use std::sync::Arc as StdArc;

    /// Fresh factory over an isolated in-memory cache per test.
    fn lock() -> DistributedLock {
        let cache: BoxedCache = StdArc::new(InMemoryCache::new());
        DistributedLock::new(cache)
    }

    #[tokio::test]
    async fn first_acquirer_succeeds() {
        let l = lock();
        let g = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g.is_some());
    }

    #[tokio::test]
    async fn second_acquirer_blocked() {
        let l = lock();
        let g1 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g1.is_some());
        let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g2.is_none(), "second acquirer should be blocked");
    }

    #[tokio::test]
    async fn release_lets_next_acquirer_in() {
        let l = lock();
        let g1 = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
        g1.release().await;
        let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g2.is_some(), "after release the lock is free");
    }

    #[tokio::test]
    async fn different_names_dont_collide() {
        let l = lock();
        let a = l.try_acquire("a", Duration::from_secs(5)).await;
        let b = l.try_acquire("b", Duration::from_secs(5)).await;
        assert!(a.is_some());
        assert!(b.is_some());
    }

    #[tokio::test]
    async fn with_lock_runs_body_and_releases() {
        let l = lock();
        let result = l
            .with_lock("job", Duration::from_secs(5), || async { 42 })
            .await;
        assert_eq!(result, Some(42));
        // Lock should be released — next acquire works.
        let g = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g.is_some());
    }

    #[tokio::test]
    async fn with_lock_returns_none_when_blocked() {
        let l = lock();
        let _hold = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
        let result = l
            .with_lock("job", Duration::from_secs(5), || async { 42 })
            .await;
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn release_is_idempotent_at_least_once() {
        // The public release call consumes the guard, so double-release
        // goes through the inner method — verify calling it twice is
        // safe and the second call is a no-op.
        let l = lock();
        let g = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
        g.release_inner().await;
        // Second call — must be a no-op (latch already set).
        g.release_inner().await;
        // And the lock really is free for the next acquirer.
        let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g2.is_some(), "lock should be free after (double) release");
    }

    #[tokio::test]
    async fn ttl_expiry_frees_lock() {
        let l = lock();
        let g = l.try_acquire("job", Duration::from_millis(50)).await;
        assert!(g.is_some());
        // Forget the guard so we can't release explicitly; wait past TTL.
        std::mem::forget(g);
        tokio::time::sleep(Duration::from_millis(120)).await;
        // After expiry the lock is reacquireable.
        let g2 = l.try_acquire("job", Duration::from_millis(50)).await;
        assert!(g2.is_some(), "TTL expiry should free the lock");
    }

    #[tokio::test]
    async fn release_after_ttl_does_not_clobber_new_holder() {
        let l = lock();
        let g1 = l.try_acquire("job", Duration::from_millis(30)).await.unwrap();
        // Wait for TTL to expire.
        tokio::time::sleep(Duration::from_millis(80)).await;
        // Someone else acquires.
        let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g2.is_some(), "new acquirer can claim after TTL");
        // Now g1 belatedly releases — this MUST NOT clobber g2.
        g1.release().await;
        // g2 must still hold the lock.
        let g3 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(
            g3.is_none(),
            "g2 still holds the lock — late g1.release() must not clear it"
        );
        drop(g2);
    }

    #[tokio::test]
    async fn with_lock_releases_even_when_body_returns_unit() {
        let l = lock();
        let r: Option<()> = l
            .with_lock("job", Duration::from_secs(5), || async {})
            .await;
        assert!(r.is_some());
        let g = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g.is_some());
    }
}