Skip to main content

klauthed_data/locks/
mod.rs

1//! Distributed locks.
2//!
3//! A [`LockManager`] grants mutual exclusion on a string key with a time-to-live
4//! (TTL), so a crashed holder cannot wedge the lock forever — it expires.
5//! [`acquire`](LockManager::acquire) returns `Some(guard)` to the winner and
6//! `None` when the key is already held by a live (unexpired) lock. The
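//! [`LockGuard`] releases the lock when dropped (in-memory backend; Redis and
//! MongoDB guards cannot release from `Drop` and fall back to TTL expiry), or
//! eagerly via [`release`](LockGuard::release).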
9//!
10//! Expiry is driven by an injected [`Clock`], so tests can pin and advance time
11//! with a `FixedClock` instead of sleeping.
12//!
13//! [`Clock`]: klauthed_core::time::Clock
14//!
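//! This module provides the trait, the [`LockGuard`] model, and an in-memory
//! implementation. A Redis-backed manager (`SET key token NX PX ttl`, released
//! with a compare-and-delete Lua script) lives in the `redis` submodule behind
//! the `redis` feature, and a MongoDB-backed manager lives in the `mongo`
//! submodule behind the `mongodb` feature.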
18//!
19//! ```
20//! use std::sync::Arc;
21//! use klauthed_core::time::Duration;
22//! use klauthed_core::time::SystemClock;
23//! use klauthed_data::locks::{InMemoryLockManager, LockManager};
24//!
25//! # async fn run() -> Result<(), klauthed_data::DataError> {
26//! let locks = InMemoryLockManager::new(Arc::new(SystemClock));
27//! if let Some(guard) = locks.acquire("job:nightly", Duration::seconds(30)).await? {
28//!     // critical section …
29//!     guard.release().await?;
30//! }
31//! # Ok(())
32//! # }
33//! ```
34
35#[cfg(feature = "redis")]
36pub mod redis;
37
38#[cfg(feature = "mongodb")]
39pub mod mongo;
40
41use async_trait::async_trait;
42use klauthed_core::id::Id;
43use klauthed_core::time::Duration;
44use klauthed_core::time::{Clock, SystemClock, Timestamp};
45use std::collections::HashMap;
46use std::sync::{Arc, Mutex};
47
48use crate::error::DataError;
49
/// Marker tag for a lock's fencing token.
///
/// The type carries no data; it only serves as the type parameter that
/// distinguishes [`LockToken`] from other `Id<_>` instantiations. The common
/// traits are derived so the public marker can be printed, copied, and compared
/// like any other plain value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LockTokenTag;
52
/// A unique token identifying a single acquisition of a lock. Used as a fencing
/// token so a release only frees the acquisition that created it.
///
/// A fresh token is minted for every successful acquire and stored both in the
/// lock table and in the returned [`LockGuard`]; release compares the two.
pub type LockToken = Id<LockTokenTag>;
56
/// Shared lock state: key -> (token of holder, expiry instant).
///
/// Guarded by a `std::sync::Mutex`; the in-memory manager and every guard it
/// hands out share one table through an `Arc`. An entry whose expiry instant has
/// passed is treated as free by `acquire` even though it is still present.
type LockTable = Mutex<HashMap<String, (LockToken, Timestamp)>>;
59
/// A manager that grants mutually-exclusive, TTL-bounded locks by key.
///
/// Implementors must be `Send + Sync` (supertrait bounds). The trait is
/// declared through `#[async_trait]`, so `acquire` is an `async fn`.
#[async_trait]
pub trait LockManager: Send + Sync {
    /// Try to acquire `key` for `ttl`. Returns `Some(guard)` if the lock was
    /// free (or held by an expired acquisition), or `None` if a live holder
    /// owns it.
    ///
    /// This is a single attempt: the signature offers no waiting or retrying,
    /// and contention comes back immediately as `Ok(None)`.
    ///
    /// # Errors
    /// Returns a [`DataError`] only on backend failure; contention is reported
    /// as `Ok(None)`, not an error.
    async fn acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockGuard>, DataError>;
}
72
/// Which backend a [`LockGuard`] releases against.
///
/// The Redis and MongoDB variants only exist when the matching cargo feature is
/// enabled, so every `match` on this enum carries the same `#[cfg]` gates.
enum LockBackend {
    /// In-memory table shared with an [`InMemoryLockManager`].
    InMemory(Arc<LockTable>),
    /// Redis-backed manager; release runs a compare-and-delete Lua script.
    #[cfg(feature = "redis")]
    Redis(self::redis::RedisLockManager),
    /// MongoDB-backed manager using compare-and-upsert with TTL.
    #[cfg(feature = "mongodb")]
    Mongo(self::mongo::MongoLockManager),
}
84
85/// A held lock. Dropping it releases the lock; [`release`](LockGuard::release)
86/// does so eagerly and lets the caller observe errors.
87///
88/// The guard carries a fencing [`token`](LockGuard::token): release only frees
89/// the lock if this same token still owns the key, so a guard that outlived its
90/// TTL (and was re-acquired by someone else) cannot stomp the new holder.
91///
92/// For the in-memory backend, dropping the guard releases synchronously. For the
93/// Redis backend, releasing requires an async round-trip, so it happens only via
94/// [`release`](LockGuard::release); a dropped-but-not-released Redis guard is
95/// cleaned up by the lock's TTL instead.
96pub struct LockGuard {
97    key: String,
98    token: LockToken,
99    backend: LockBackend,
100    released: bool,
101}
102
103impl LockGuard {
104    /// Construct an in-memory guard (used by [`InMemoryLockManager`]).
105    fn in_memory(key: String, token: LockToken, table: Arc<LockTable>) -> Self {
106        Self { key, token, backend: LockBackend::InMemory(table), released: false }
107    }
108
109    /// Construct a Redis-backed guard (used by `RedisLockManager`).
110    #[cfg(feature = "redis")]
111    pub(crate) fn redis(
112        key: String,
113        token: LockToken,
114        manager: self::redis::RedisLockManager,
115    ) -> Self {
116        Self { key, token, backend: LockBackend::Redis(manager), released: false }
117    }
118
119    /// Construct a MongoDB-backed guard (used by `MongoLockManager`).
120    #[cfg(feature = "mongodb")]
121    pub(crate) fn mongo(
122        key: String,
123        token: LockToken,
124        manager: self::mongo::MongoLockManager,
125    ) -> Self {
126        Self { key, token, backend: LockBackend::Mongo(manager), released: false }
127    }
128
129    /// The key this guard holds.
130    pub fn key(&self) -> &str {
131        &self.key
132    }
133
134    /// The fencing token for this acquisition.
135    pub fn token(&self) -> LockToken {
136        self.token
137    }
138
139    /// Release the lock now (idempotent). Only frees the key if this guard's
140    /// token still owns it.
141    ///
142    /// # Errors
143    /// Returns a [`DataError`] only if a backend round-trip fails (Redis/MongoDB);
144    /// the in-memory backend never errors.
145    pub async fn release(mut self) -> Result<(), DataError> {
146        if self.released {
147            return Ok(());
148        }
149        self.released = true;
150        match &self.backend {
151            LockBackend::InMemory(table) => {
152                Self::release_in_memory(table, &self.key, self.token);
153                Ok(())
154            }
155            #[cfg(feature = "redis")]
156            LockBackend::Redis(manager) => {
157                manager.release_token(&self.key, self.token).await?;
158                Ok(())
159            }
160            #[cfg(feature = "mongodb")]
161            LockBackend::Mongo(manager) => {
162                manager.release_token(&self.key, self.token).await?;
163                Ok(())
164            }
165        }
166    }
167
168    /// Synchronous compare-and-delete against the in-memory table.
169    fn release_in_memory(table: &LockTable, key: &str, token: LockToken) {
170        let mut guard = table.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
171        // Compare-and-delete: only remove if we still own the key.
172        if let Some((holder, _)) = guard.get(key)
173            && *holder == token
174        {
175            guard.remove(key);
176        }
177    }
178}
179
180impl Drop for LockGuard {
181    fn drop(&mut self) {
182        if self.released {
183            return;
184        }
185        self.released = true;
186        match &self.backend {
187            LockBackend::InMemory(table) => {
188                Self::release_in_memory(table, &self.key, self.token);
189            }
190            // Redis/MongoDB guards can't make async calls from Drop; the lock's TTL
191            // reclaims it. Callers who need prompt release should call
192            // `release().await` explicitly.
193            #[cfg(feature = "redis")]
194            LockBackend::Redis(_) => {
195                tracing::debug!(
196                    key = %self.key,
197                    "redis lock guard dropped without explicit release; relying on TTL expiry"
198                );
199            }
200            #[cfg(feature = "mongodb")]
201            LockBackend::Mongo(_) => {
202                tracing::debug!(
203                    key = %self.key,
204                    "mongodb lock guard dropped without explicit release; relying on TTL expiry"
205                );
206            }
207        }
208    }
209}
210
/// A thread-safe, in-memory [`LockManager`] for tests and single-process use.
///
/// Expiry is evaluated against an injected [`Clock`], so a `FixedClock` makes
/// TTL behavior deterministic in tests.
pub struct InMemoryLockManager {
    /// Lock table; each guard handed out holds a clone of this `Arc` so it can
    /// release against the same table.
    table: Arc<LockTable>,
    /// Source of "now" used to compute expiry instants and to judge whether an
    /// existing holder is still live.
    clock: Arc<dyn Clock>,
}
219
220impl InMemoryLockManager {
221    /// A manager driven by `clock` (use a `FixedClock` in tests).
222    pub fn new(clock: Arc<dyn Clock>) -> Self {
223        Self { table: Arc::new(Mutex::new(HashMap::new())), clock }
224    }
225}
226
227impl Default for InMemoryLockManager {
228    /// A manager backed by the real [`SystemClock`].
229    fn default() -> Self {
230        Self::new(Arc::new(SystemClock))
231    }
232}
233
234#[async_trait]
235impl LockManager for InMemoryLockManager {
236    async fn acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockGuard>, DataError> {
237        let now = self.clock.now();
238        let expires_at = now
239            .checked_add(ttl)
240            .ok_or_else(|| DataError::LockHeld(format!("invalid TTL for lock '{key}'")))?;
241
242        let mut guard = self.table.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
243
244        // A key is takeable if absent or if its current holder has expired.
245        let live_holder = guard.get(key).is_some_and(|(_, holder_expiry)| now < *holder_expiry);
246        if live_holder {
247            return Ok(None);
248        }
249
250        let token = LockToken::new();
251        guard.insert(key.to_owned(), (token, expires_at));
252        drop(guard);
253
254        Ok(Some(LockGuard::in_memory(key.to_owned(), token, Arc::clone(&self.table))))
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use klauthed_core::time::FixedClock;

    /// Manager whose notion of time is driven by `clock`.
    fn manager_with(clock: Arc<FixedClock>) -> InMemoryLockManager {
        InMemoryLockManager::new(clock)
    }

    /// A clock pinned at the Unix epoch together with a manager reading from it.
    fn epoch_fixture() -> (Arc<FixedClock>, InMemoryLockManager) {
        let clock = Arc::new(FixedClock::at_unix_millis(0));
        let locks = manager_with(Arc::clone(&clock));
        (clock, locks)
    }

    /// One acquire attempt on key `"k"`; a backend error fails the test.
    async fn try_k(locks: &InMemoryLockManager, ttl: Duration) -> Option<LockGuard> {
        locks.acquire("k", ttl).await.unwrap()
    }

    #[tokio::test]
    async fn second_acquire_is_blocked_while_held() {
        let (_clock, locks) = epoch_fixture();

        let holder = try_k(&locks, Duration::seconds(30)).await.expect("first acquire wins");
        assert_eq!(holder.key(), "k");

        // While `holder` is alive, a competing attempt must come back empty.
        assert!(try_k(&locks, Duration::seconds(30)).await.is_none());
    }

    #[tokio::test]
    async fn lock_releases_on_drop() {
        let (_clock, locks) = epoch_fixture();

        {
            let _held = try_k(&locks, Duration::seconds(30)).await.unwrap();
            assert!(try_k(&locks, Duration::seconds(30)).await.is_none());
        } // `_held` goes out of scope here, which releases the key

        assert!(try_k(&locks, Duration::seconds(30)).await.is_some());
    }

    #[tokio::test]
    async fn explicit_release_frees_the_lock() {
        let (_clock, locks) = epoch_fixture();

        let held = try_k(&locks, Duration::seconds(30)).await.unwrap();
        held.release().await.unwrap();

        assert!(try_k(&locks, Duration::seconds(30)).await.is_some());
    }

    #[tokio::test]
    async fn lock_expires_after_ttl() {
        let (clock, locks) = epoch_fixture();

        // Take the key for 10s and leak the guard: with no `Drop` running, the
        // TTL is the only thing that can free it.
        let leaked = try_k(&locks, Duration::seconds(10)).await.unwrap();
        std::mem::forget(leaked);

        // t = 5s: inside the TTL, so the key is still taken.
        clock.advance(Duration::seconds(5));
        assert!(try_k(&locks, Duration::seconds(10)).await.is_none());

        // t = 11s: beyond the TTL, so the stale entry no longer blocks.
        clock.advance(Duration::seconds(6));
        assert!(try_k(&locks, Duration::seconds(10)).await.is_some());
    }

    #[tokio::test]
    async fn stale_guard_release_does_not_steal_new_holder() {
        let (clock, locks) = epoch_fixture();

        let stale = try_k(&locks, Duration::seconds(10)).await.unwrap();
        clock.advance(Duration::seconds(11)); // the first acquisition's TTL lapses

        // With the old entry expired, a second acquisition takes over the key.
        let fresh = try_k(&locks, Duration::seconds(10)).await.unwrap();

        // The outdated guard carries a different token, so dropping it must
        // leave the new holder's entry in place.
        drop(stale);
        assert!(try_k(&locks, Duration::seconds(10)).await.is_none());

        drop(fresh);
        assert!(try_k(&locks, Duration::seconds(10)).await.is_some());
    }
}