Skip to main content

klauthed_data/locks/
redis.rs

1//! Redis-backed [`LockManager`].
2//!
3//! [`RedisLockManager`] implements the standard single-instance Redis lock:
4//!
5//! * **acquire** — `SET key token NX PX <ttl_ms>`. `NX` makes the write succeed
6//!   only when the key is free; `PX` bounds the hold so a crashed owner cannot
7//!   wedge the lock forever. The random `token` is the fencing token returned in
8//!   the [`LockGuard`].
9//! * **release** — a Lua compare-and-delete that removes the key only if its
10//!   value still equals our token, so a guard that outlived its TTL (and was
11//!   re-acquired by someone else) cannot delete the new owner's lock.
12//!
13//! This is the classic single-node algorithm; it is *not* Redlock and offers no
14//! guarantees across a failed-over Redis. For most service coordination needs
15//! (leader election, throttling a cron) it is sufficient.
16//!
17//! Tests that need a live Redis are marked `#[ignore]`; run them with a server
18//! at `REDIS_URL` via `cargo test -p klauthed-data --features redis -- --ignored`.
19
20use async_trait::async_trait;
21use klauthed_core::time::Duration;
22use redis::aio::ConnectionManager;
23use redis::{ExistenceCheck, SetExpiry, SetOptions};
24
25use crate::error::DataError;
26use crate::locks::{LockGuard, LockManager, LockToken};
27
28/// Lua script: delete `KEYS[1]` only if its value equals `ARGV[1]`.
29/// Returns `1` if it deleted, `0` otherwise.
30const RELEASE_SCRIPT: &str = "\
31if redis.call('GET', KEYS[1]) == ARGV[1] then
32    return redis.call('DEL', KEYS[1])
33else
34    return 0
35end";
36
37/// A [`LockManager`] that grants TTL-bounded locks via Redis `SET … NX PX`.
38///
39/// Clone-cheap: holds a cloneable [`ConnectionManager`].
40#[derive(Clone)]
41pub struct RedisLockManager {
42    conn: ConnectionManager,
43}
44
45impl RedisLockManager {
46    /// Wrap a managed Redis connection (see `cache::connect_redis`).
47    pub fn new(conn: ConnectionManager) -> Self {
48        Self { conn }
49    }
50
51    /// Release a key by fencing `token`, honoring the compare-and-delete so only
52    /// the still-current holder is freed. Returns `true` if the lock was held by
53    /// `token` and is now released, `false` if it had already expired or been
54    /// taken over.
55    ///
56    /// Exposed so a process can release a lock by token without holding the
57    /// [`LockGuard`] (e.g. after recovering the token from elsewhere). The guard
58    /// returned by [`acquire`](LockManager::acquire) releases through this on
59    /// [`release`](LockGuard::release) / drop.
60    pub async fn release_token(&self, key: &str, token: LockToken) -> Result<bool, DataError> {
61        let mut conn = self.conn.clone();
62        let deleted: i64 = redis::Script::new(RELEASE_SCRIPT)
63            .key(key)
64            .arg(token.to_string())
65            .invoke_async(&mut conn)
66            .await?;
67        Ok(deleted == 1)
68    }
69}
70
71#[async_trait]
72impl LockManager for RedisLockManager {
73    async fn acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockGuard>, DataError> {
74        let ttl_ms: u64 = ttl.whole_milliseconds().try_into().map_err(|_| {
75            DataError::LockHeld(format!("invalid (non-positive) TTL for lock '{key}'"))
76        })?;
77        if ttl_ms == 0 {
78            return Err(DataError::LockHeld(format!("invalid (zero) TTL for lock '{key}'")));
79        }
80
81        let token = LockToken::new();
82        let options = SetOptions::default()
83            .conditional_set(ExistenceCheck::NX)
84            .with_expiration(SetExpiry::PX(ttl_ms));
85
86        let mut conn = self.conn.clone();
87        // `SET … NX` returns the value on success and nil (None) when the key
88        // already exists, so a `None` means we lost the race.
89        let outcome: Option<String> = redis::cmd("SET")
90            .arg(key)
91            .arg(token.to_string())
92            .arg(&options)
93            .query_async(&mut conn)
94            .await?;
95
96        match outcome {
97            Some(_) => Ok(Some(LockGuard::redis(key.to_owned(), token, self.clone()))),
98            None => Ok(None),
99        }
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106
107    /// Build a manager against a live Redis from `REDIS_URL` (default
108    /// `redis://127.0.0.1/`). Used only by ignored tests.
109    async fn live_manager() -> RedisLockManager {
110        let url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_owned());
111        let client = redis::Client::open(url).expect("open redis client");
112        let conn = ConnectionManager::new(client).await.expect("connect redis");
113        RedisLockManager::new(conn)
114    }
115
116    #[tokio::test]
117    #[ignore = "requires a live Redis at REDIS_URL"]
118    async fn acquire_blocks_until_released() {
119        let locks = live_manager().await;
120        let key = format!("klauthed:test:lock:{}", LockToken::new());
121
122        let guard =
123            locks.acquire(&key, Duration::seconds(30)).await.unwrap().expect("first acquire wins");
124
125        // Second acquire while held returns None.
126        assert!(locks.acquire(&key, Duration::seconds(30)).await.unwrap().is_none());
127
128        guard.release().await.unwrap();
129
130        // Now it is free again.
131        assert!(locks.acquire(&key, Duration::seconds(30)).await.unwrap().is_some());
132    }
133
134    #[tokio::test]
135    #[ignore = "requires a live Redis at REDIS_URL"]
136    async fn stale_token_release_does_not_steal() {
137        let locks = live_manager().await;
138        let key = format!("klauthed:test:lock:{}", LockToken::new());
139
140        let stale = locks.acquire(&key, Duration::milliseconds(50)).await.unwrap().unwrap();
141        let stale_token = stale.token();
142        // Let the TTL lapse so a new holder can take the key.
143        tokio::time::sleep(std::time::Duration::from_millis(120)).await;
144
145        let _fresh = locks
146            .acquire(&key, Duration::seconds(30))
147            .await
148            .unwrap()
149            .expect("fresh acquire after expiry");
150
151        // Releasing the stale token must NOT free the fresh holder's lock.
152        let freed = locks.release_token(&key, stale_token).await.unwrap();
153        assert!(!freed);
154        std::mem::forget(stale);
155    }
156}