klauthed_data/locks/
mod.rs1#[cfg(feature = "redis")]
36pub mod redis;
37
38#[cfg(feature = "mongodb")]
39pub mod mongo;
40
41use async_trait::async_trait;
42use klauthed_core::id::Id;
43use klauthed_core::time::Duration;
44use klauthed_core::time::{Clock, SystemClock, Timestamp};
45use std::collections::HashMap;
46use std::sync::{Arc, Mutex};
47
48use crate::error::DataError;
49
50pub struct LockTokenTag;
52
53pub type LockToken = Id<LockTokenTag>;
56
57type LockTable = Mutex<HashMap<String, (LockToken, Timestamp)>>;
59
60#[async_trait]
62pub trait LockManager: Send + Sync {
63 async fn acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockGuard>, DataError>;
71}
72
73enum LockBackend {
75 InMemory(Arc<LockTable>),
77 #[cfg(feature = "redis")]
79 Redis(self::redis::RedisLockManager),
80 #[cfg(feature = "mongodb")]
82 Mongo(self::mongo::MongoLockManager),
83}
84
85pub struct LockGuard {
97 key: String,
98 token: LockToken,
99 backend: LockBackend,
100 released: bool,
101}
102
103impl LockGuard {
104 fn in_memory(key: String, token: LockToken, table: Arc<LockTable>) -> Self {
106 Self { key, token, backend: LockBackend::InMemory(table), released: false }
107 }
108
109 #[cfg(feature = "redis")]
111 pub(crate) fn redis(
112 key: String,
113 token: LockToken,
114 manager: self::redis::RedisLockManager,
115 ) -> Self {
116 Self { key, token, backend: LockBackend::Redis(manager), released: false }
117 }
118
119 #[cfg(feature = "mongodb")]
121 pub(crate) fn mongo(
122 key: String,
123 token: LockToken,
124 manager: self::mongo::MongoLockManager,
125 ) -> Self {
126 Self { key, token, backend: LockBackend::Mongo(manager), released: false }
127 }
128
129 pub fn key(&self) -> &str {
131 &self.key
132 }
133
134 pub fn token(&self) -> LockToken {
136 self.token
137 }
138
139 pub async fn release(mut self) -> Result<(), DataError> {
146 if self.released {
147 return Ok(());
148 }
149 self.released = true;
150 match &self.backend {
151 LockBackend::InMemory(table) => {
152 Self::release_in_memory(table, &self.key, self.token);
153 Ok(())
154 }
155 #[cfg(feature = "redis")]
156 LockBackend::Redis(manager) => {
157 manager.release_token(&self.key, self.token).await?;
158 Ok(())
159 }
160 #[cfg(feature = "mongodb")]
161 LockBackend::Mongo(manager) => {
162 manager.release_token(&self.key, self.token).await?;
163 Ok(())
164 }
165 }
166 }
167
168 fn release_in_memory(table: &LockTable, key: &str, token: LockToken) {
170 let mut guard = table.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
171 if let Some((holder, _)) = guard.get(key)
173 && *holder == token
174 {
175 guard.remove(key);
176 }
177 }
178}
179
180impl Drop for LockGuard {
181 fn drop(&mut self) {
182 if self.released {
183 return;
184 }
185 self.released = true;
186 match &self.backend {
187 LockBackend::InMemory(table) => {
188 Self::release_in_memory(table, &self.key, self.token);
189 }
190 #[cfg(feature = "redis")]
194 LockBackend::Redis(_) => {
195 tracing::debug!(
196 key = %self.key,
197 "redis lock guard dropped without explicit release; relying on TTL expiry"
198 );
199 }
200 #[cfg(feature = "mongodb")]
201 LockBackend::Mongo(_) => {
202 tracing::debug!(
203 key = %self.key,
204 "mongodb lock guard dropped without explicit release; relying on TTL expiry"
205 );
206 }
207 }
208 }
209}
210
211pub struct InMemoryLockManager {
216 table: Arc<LockTable>,
217 clock: Arc<dyn Clock>,
218}
219
220impl InMemoryLockManager {
221 pub fn new(clock: Arc<dyn Clock>) -> Self {
223 Self { table: Arc::new(Mutex::new(HashMap::new())), clock }
224 }
225}
226
227impl Default for InMemoryLockManager {
228 fn default() -> Self {
230 Self::new(Arc::new(SystemClock))
231 }
232}
233
234#[async_trait]
235impl LockManager for InMemoryLockManager {
236 async fn acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockGuard>, DataError> {
237 let now = self.clock.now();
238 let expires_at = now
239 .checked_add(ttl)
240 .ok_or_else(|| DataError::LockHeld(format!("invalid TTL for lock '{key}'")))?;
241
242 let mut guard = self.table.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
243
244 let live_holder = guard.get(key).is_some_and(|(_, holder_expiry)| now < *holder_expiry);
246 if live_holder {
247 return Ok(None);
248 }
249
250 let token = LockToken::new();
251 guard.insert(key.to_owned(), (token, expires_at));
252 drop(guard);
253
254 Ok(Some(LockGuard::in_memory(key.to_owned(), token, Arc::clone(&self.table))))
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use klauthed_core::time::FixedClock;

    /// Builds an in-memory manager driven by the given test clock.
    fn manager_with(clock: Arc<FixedClock>) -> InMemoryLockManager {
        InMemoryLockManager::new(clock)
    }

    #[tokio::test]
    async fn second_acquire_is_blocked_while_held() {
        let locks = manager_with(Arc::new(FixedClock::at_unix_millis(0)));

        let holder =
            locks.acquire("k", Duration::seconds(30)).await.unwrap().expect("first acquire wins");
        assert_eq!(holder.key(), "k");

        // `holder` is still alive, so a second attempt must come back empty.
        let contender = locks.acquire("k", Duration::seconds(30)).await.unwrap();
        assert!(contender.is_none());
    }

    #[tokio::test]
    async fn lock_releases_on_drop() {
        let locks = manager_with(Arc::new(FixedClock::at_unix_millis(0)));

        let held = locks.acquire("k", Duration::seconds(30)).await.unwrap().unwrap();
        assert!(locks.acquire("k", Duration::seconds(30)).await.unwrap().is_none());

        // Dropping the guard must free the key without any clock movement.
        drop(held);
        assert!(locks.acquire("k", Duration::seconds(30)).await.unwrap().is_some());
    }

    #[tokio::test]
    async fn explicit_release_frees_the_lock() {
        let locks = manager_with(Arc::new(FixedClock::at_unix_millis(0)));

        let held = locks.acquire("k", Duration::seconds(30)).await.unwrap().unwrap();
        held.release().await.unwrap();

        assert!(locks.acquire("k", Duration::seconds(30)).await.unwrap().is_some());
    }

    #[tokio::test]
    async fn lock_expires_after_ttl() {
        let clock = Arc::new(FixedClock::at_unix_millis(0));
        let locks = manager_with(Arc::clone(&clock));

        // Leak the guard so that only the TTL can free the key.
        let leaked = locks.acquire("k", Duration::seconds(10)).await.unwrap().unwrap();
        std::mem::forget(leaked);

        // 5s elapsed: still inside the 10s TTL.
        clock.advance(Duration::seconds(5));
        assert!(locks.acquire("k", Duration::seconds(10)).await.unwrap().is_none());

        // 11s elapsed in total: the TTL has passed.
        clock.advance(Duration::seconds(6));
        assert!(locks.acquire("k", Duration::seconds(10)).await.unwrap().is_some());
    }

    #[tokio::test]
    async fn stale_guard_release_does_not_steal_new_holder() {
        let clock = Arc::new(FixedClock::at_unix_millis(0));
        let locks = manager_with(Arc::clone(&clock));

        let stale = locks.acquire("k", Duration::seconds(10)).await.unwrap().unwrap();

        // Let the first lock expire, then hand the key to a new holder.
        clock.advance(Duration::seconds(11));
        let fresh = locks.acquire("k", Duration::seconds(10)).await.unwrap().unwrap();

        // The stale guard's token no longer matches, so the key stays taken.
        drop(stale);
        assert!(locks.acquire("k", Duration::seconds(10)).await.unwrap().is_none());

        drop(fresh);
        assert!(locks.acquire("k", Duration::seconds(10)).await.unwrap().is_some());
    }
}