Skip to main content

rustango/
distributed_lock.rs

1//! Distributed locks backed by [`Cache`](crate::cache::Cache).
2//!
3//! "Only one worker at a time runs this task." Pair with the
4//! [`crate::scheduler`] so a multi-replica deploy doesn't run a daily
5//! cron N times, or wrap a long-running job whose effect should be
6//! exactly-once.
7//!
8//! ## Mechanism
9//!
//! Acquire is a Cache `incr` on the `lock:<name>` counter: the caller
//! that sees the counter reach 1 wins, and then `set`s a per-acquire
//! token under `lock:<name>:token`. Both keys are written with `ttl`,
//! so a process that crashes while holding the lock doesn't deadlock
//! the system — at worst, the next acquirer waits out the `ttl`.
15//!
16//! Release is conditional on the token: a process that lost its lock
17//! (because TTL expired and someone else acquired) does NOT
18//! accidentally release the new holder's lock.
19//!
20//! ## Quick start
21//!
22//! ```ignore
23//! use rustango::distributed_lock::DistributedLock;
24//! use std::time::Duration;
25//! use std::sync::Arc;
26//!
27//! let lock = DistributedLock::new(redis_cache);
28//!
29//! // Try once, give up if another replica has it:
30//! if let Some(guard) = lock.try_acquire("daily_report", Duration::from_secs(60)).await {
31//!     run_daily_report().await;
32//!     guard.release().await;
33//! }
34//!
//! // Or use the closure form, which releases once the body completes:
36//! lock.with_lock("daily_report", Duration::from_secs(60), || async {
37//!     run_daily_report().await;
38//! }).await;
39//! ```
40//!
41//! ## Caveats
42//!
43//! - The non-atomic default `Cache::incr` can race under heavy
44//!   contention with two acquirers in the same millisecond. RedisCache
45//!   uses native `INCRBY` so it's safe across replicas.
46//! - This is "best-effort exactly-once" — fine for cron-style work,
47//!   not a substitute for a transaction when correctness matters.
48//! - TTL must be longer than the worst-case execution time of the
49//!   protected work, OR the work must be idempotent. A too-short TTL
50//!   means another replica could grab the lock mid-execution.
51
52use std::sync::Arc;
53use std::time::Duration;
54
55use crate::cache::BoxedCache;
56
57const KEY_PREFIX: &str = "lock";
58
/// Lock factory. Cheap to clone.
///
/// Holds the cache handle that every lock operation goes through; each
/// [`LockGuard`] handed out by `try_acquire` carries its own clone of
/// that handle.
#[derive(Clone)]
pub struct DistributedLock {
    /// Cache that stores the lock counter and the holder token.
    cache: BoxedCache,
}
64
65impl DistributedLock {
66    #[must_use]
67    pub fn new(cache: BoxedCache) -> Self {
68        Self { cache }
69    }
70
71    /// Try to acquire `name` for `ttl`. Returns:
72    /// - `Some(LockGuard)` when we got it. Call `release()` when done
73    ///   (drop without release leaves the lock to expire on TTL —
74    ///   safe but slightly wasteful of contention slots).
75    /// - `None` when someone else holds it.
76    pub async fn try_acquire(&self, name: &str, ttl: Duration) -> Option<LockGuard> {
77        let key = format!("{KEY_PREFIX}:{name}");
78        // `incr(key, 1, ttl)` returns 1 ONLY when the counter was
79        // previously absent (or 0). On RedisCache the INCRBY+EXPIRE NX
80        // sequence is atomic; on the in-memory default impl it's racy
81        // but acceptable for tests.
82        let n = self.cache.incr(&key, 1, Some(ttl)).await.ok()?;
83        if n == 1 {
84            // We got the lock. Stash a token so release knows it's us.
85            let token = format!(
86                "{}-{}",
87                std::process::id(),
88                std::time::SystemTime::now()
89                    .duration_since(std::time::UNIX_EPOCH)
90                    .map_or(0, |d| d.as_nanos())
91            );
92            // Keep the counter, but ALSO write a token — release reads
93            // both. (The counter is the gate; the token is the receipt.)
94            let token_key = format!("{key}:token");
95            let _ = self.cache.set(&token_key, &token, Some(ttl)).await;
96            Some(LockGuard {
97                cache: self.cache.clone(),
98                key,
99                token_key,
100                token: Arc::new(token),
101                released: Arc::new(std::sync::atomic::AtomicBool::new(false)),
102            })
103        } else {
104            None
105        }
106    }
107
108    /// Acquire-or-skip helper: runs `body` only if we got the lock.
109    /// Returns `Some(R)` when body ran, `None` when another holder
110    /// blocked us. The lock is released after body finishes (or on
111    /// panic, via the guard's Drop — which is best-effort since we
112    /// can't call async fns from Drop).
113    pub async fn with_lock<F, Fut, R>(&self, name: &str, ttl: Duration, body: F) -> Option<R>
114    where
115        F: FnOnce() -> Fut,
116        Fut: std::future::Future<Output = R>,
117    {
118        let guard = self.try_acquire(name, ttl).await?;
119        let result = body().await;
120        guard.release().await;
121        Some(result)
122    }
123}
124
/// Holds a lock until released or until the TTL expires.
///
/// Dropping the guard without calling `release()` does not clear the
/// lock; it stays held until the TTL runs out.
pub struct LockGuard {
    /// Cache handle cloned from the factory that issued this guard.
    cache: BoxedCache,
    /// Counter key (`lock:<name>`) that gates acquisition.
    key: String,
    /// Key (`lock:<name>:token`) under which the holder's token is stored.
    token_key: String,
    /// Token written at acquire time; release only proceeds while the
    /// cache still holds this exact value.
    token: Arc<String>,
    /// Flipped to `true` by the first release attempt so that any later
    /// attempt returns immediately.
    released: Arc<std::sync::atomic::AtomicBool>,
}
133
134impl LockGuard {
135    /// Release the lock if we still hold it. Safe to call multiple
136    /// times; the second call is a no-op.
137    pub async fn release(self) {
138        self.release_inner().await;
139    }
140
141    async fn release_inner(&self) {
142        if self
143            .released
144            .swap(true, std::sync::atomic::Ordering::SeqCst)
145        {
146            return;
147        }
148        // Token check — if our token is still the one stored, we still
149        // hold the lock, so we can clear it. Otherwise someone else
150        // acquired after our TTL expired and we mustn't touch theirs.
151        let stored = self.cache.get(&self.token_key).await.ok().flatten();
152        if stored.as_deref() != Some(self.token.as_str()) {
153            return;
154        }
155        let _ = self.cache.delete(&self.key).await;
156        let _ = self.cache.delete(&self.token_key).await;
157    }
158}
159
160impl Drop for LockGuard {
161    fn drop(&mut self) {
162        // Best-effort: if the holder forgot to call release(), the
163        // TTL will eventually free the lock. We can't do an async
164        // delete from Drop, but a sync warning is informative.
165        if !self.released.load(std::sync::atomic::Ordering::SeqCst) {
166            tracing::debug!(
167                key = %self.key,
168                "DistributedLock guard dropped without release(); waiting for TTL"
169            );
170        }
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::InMemoryCache;
    use std::sync::Arc as StdArc;

    /// Fresh lock factory backed by its own in-memory cache, so tests
    /// never share lock state.
    fn lock() -> DistributedLock {
        let cache: BoxedCache = StdArc::new(InMemoryCache::new());
        DistributedLock::new(cache)
    }

    #[tokio::test]
    async fn first_acquirer_succeeds() {
        let l = lock();
        let g = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g.is_some());
    }

    #[tokio::test]
    async fn second_acquirer_blocked() {
        let l = lock();
        let g1 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g1.is_some());
        let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g2.is_none(), "second acquirer should be blocked");
    }

    #[tokio::test]
    async fn release_lets_next_acquirer_in() {
        let l = lock();
        let g1 = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
        g1.release().await;
        let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g2.is_some(), "after release the lock is free");
    }

    #[tokio::test]
    async fn different_names_dont_collide() {
        let l = lock();
        let a = l.try_acquire("a", Duration::from_secs(5)).await;
        let b = l.try_acquire("b", Duration::from_secs(5)).await;
        assert!(a.is_some());
        assert!(b.is_some());
    }

    #[tokio::test]
    async fn with_lock_runs_body_and_releases() {
        let l = lock();
        let result = l
            .with_lock("job", Duration::from_secs(5), || async { 42 })
            .await;
        assert_eq!(result, Some(42));
        // Lock should be released — next acquire works.
        let g = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g.is_some());
    }

    #[tokio::test]
    async fn with_lock_returns_none_when_blocked() {
        let l = lock();
        let _hold = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
        let result = l
            .with_lock("job", Duration::from_secs(5), || async { 42 })
            .await;
        assert_eq!(result, None);
    }

    #[tokio::test]
    async fn release_is_idempotent_at_least_once() {
        // `release()` consumes the guard, so the double call is made
        // through the inner method, which only borrows it.
        let l = lock();
        let g = l.try_acquire("job", Duration::from_secs(5)).await.unwrap();
        g.release_inner().await;
        // The first call freed the lock: a new holder can take it.
        let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g2.is_some(), "first release frees the lock");
        // The second call on the old guard must be a no-op and leave
        // the new holder's lock alone.
        g.release_inner().await;
        let g3 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(
            g3.is_none(),
            "second release_inner() must not clear the new holder's lock"
        );
    }

    #[tokio::test]
    async fn ttl_expiry_frees_lock() {
        let l = lock();
        let g = l.try_acquire("job", Duration::from_millis(50)).await;
        assert!(g.is_some());
        // Forget the guard so we can't release explicitly; wait past TTL.
        std::mem::forget(g);
        tokio::time::sleep(Duration::from_millis(120)).await;
        // After expiry the lock is reacquireable.
        let g2 = l.try_acquire("job", Duration::from_millis(50)).await;
        assert!(g2.is_some(), "TTL expiry should free the lock");
    }

    #[tokio::test]
    async fn release_after_ttl_does_not_clobber_new_holder() {
        let l = lock();
        let g1 = l
            .try_acquire("job", Duration::from_millis(30))
            .await
            .unwrap();
        // Wait for TTL to expire.
        tokio::time::sleep(Duration::from_millis(80)).await;
        // Someone else acquires.
        let g2 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g2.is_some(), "new acquirer can claim after TTL");
        // Now g1 belatedly releases — this MUST NOT clobber g2.
        g1.release().await;
        // g2 must still hold the lock.
        let g3 = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(
            g3.is_none(),
            "g2 still holds the lock — late g1.release() must not clear it"
        );
        drop(g2);
    }

    #[tokio::test]
    async fn with_lock_releases_even_when_body_returns_unit() {
        let l = lock();
        let r: Option<()> = l
            .with_lock("job", Duration::from_secs(5), || async {})
            .await;
        assert!(r.is_some());
        let g = l.try_acquire("job", Duration::from_secs(5)).await;
        assert!(g.is_some());
    }
}