async_redis_lock/
lib.rs

1//! A simple and easy-to-use asynchronous redis distributed lock implementation based on tokio and redis-rs.
2//!
3//! ## Key Features
4//!
5//! - ✨ **Auto Extension** - Automatically extends lock lifetime in background until released
6//! - 🔒 **Passive Release** - Lock automatically releases when lifetime expires after process crash
7//! - 🎯 **Drop Support** - Supports both implicit release via drop and explicit release via method call
8//!
9//! ## Quick Start
10//!
11//! ### Installation
12//!
13//! ```toml
14//! [dependencies]
15//! async-redis-lock = "0.0.1"
16//! ```
17//!
18//! ### Basic Usage
19//!
20//! ```rust
21//! use async_redis_lock::Locker;
22//!
23//! #[tokio::main]
24//! async fn main() -> anyhow::Result<()> {
25//!     // Create locker
26//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
27//!
28//!     // Acquire lock
29//!     let lock = locker.acquire("lock_key").await?;
30//!
31//!     // At this point:
32//!     // 1. Lock is held
33//!     // 2. Background task automatically extends lock TTL
34//!     // 3. Safe to perform critical operations
35//!     // ...
36//!
37//!     // Release lock explicitly
38//!     // Alternative: drop(lock) for implicit release
39//!     lock.release()?;
40//!
41//!     Ok(())
42//! }
43//! ```
44//!
45//! ### Automatic Release
46//!
47//! ```rust
48//! use async_redis_lock::Locker;
49//!
50//! #[tokio::main]
51//! async fn main() -> anyhow::Result<()> {
52//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
53//!
54//!     // Use block scope to control lock lifetime
55//!     {
56//!         // Acquire lock and store in _lock variable
57//!         // The _ prefix indicates we only care about its Drop behavior
58//!         let _lock = locker.acquire("lock_key").await?;
59//!         // Perform operations that require locking
60//!         // ...
61//!         // Lock will be automatically released when block ends
62//!         // Thanks to Rust's Drop trait implementation
63//!     }
64//!
65//!     Ok(())
66//! }
67//! ```
68//!
69//! ### Advanced Configuration
70//!
71//! ```rust
72//! use async_redis_lock::options::Options;
73//! use std::time::Duration;
74//!
75//! #[tokio::main]
76//! async fn main() -> anyhow::Result<()> {
77//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
78//!
79//!     // Build a custom lock options
80//!     let opts = Options::new()
81//!         // Set auto-extend interval (default: 1s)
82//!         // Must be shorter than lifetime to prevent unintended lock release
83//!         .extend_interval(Duration::from_secs(3))
84//!         // Set retry interval between acquisition attempts (default: 500ms)
85//!         .retry_interval(Duration::from_secs(1))
86//!         // Set maximum time to attempt acquisition (default: 1s)
87//!         // None means retry indefinitely
88//!         .retry_timeout(Some(Duration::from_secs(3)))
89//!         // Set maximum duration before auto-release (default: 2s)
90//!         .lifetime(Duration::from_secs(5));
91//!
92//!     // Acquire lock with the custom options
93//!     let _lock = locker.acquire_with_options(&opts, "lock_key").await?;
94//!
95//!     // Perform operations that require locking
96//!     // ...
97//! }
98//! ```
99//! ## Important Notes
100//!
101//! 1. Don't ignore the return value of acquire method, or the lock will release immediately
102//! 2. extend_interval must be less than lifetime to prevent unintended release
103//! 3. Lock implements Drop trait and will auto-release when out of scope
104//!
105//!
106pub mod error;
107pub mod execs;
108pub mod options;
109
110use crate::error::Error;
111use crate::error::Error::IdNotFound;
112use crate::execs::*;
113use crate::options::Options;
114
115use anyhow::Result;
116use redis::aio::{ConnectionManager, ConnectionManagerConfig};
117use tokio::sync::oneshot;
118use tokio::time::sleep;
119use tokio::{select, spawn};
120
121#[derive(Clone)]
122pub struct Locker {
123    client: redis::Client,
124    conn_manager: ConnectionManager,
125}
126
127impl Locker {
128    pub async fn from_redis_url(url: &str) -> Result<Self> {
129        let client = redis::Client::open(url)?;
130        let cfg = ConnectionManagerConfig::default().set_max_delay(2000);
131        let async_conn_manager = ConnectionManager::new_with_config(client.clone(), cfg).await?;
132        Ok(Self {
133            client,
134            conn_manager: async_conn_manager,
135        })
136    }
137
138    pub async fn acquire(&mut self, lock_key: &str) -> Result<Lock> {
139        self.acquire_with_options(&Options::default(), lock_key)
140            .await
141    }
142
143    pub async fn acquire_with_options(&mut self, opts: &Options, lock_key: &str) -> Result<Lock> {
144        let lock_id = lock(
145            &mut self.conn_manager,
146            lock_key,
147            opts.lifetime,
148            opts.retry_interval,
149            opts.retry_timeout,
150        )
151        .await?;
152
153        let mut conn = self.conn_manager.clone();
154        let opts = opts.clone();
155        let lock_key_c1 = lock_key.to_owned();
156        let lock_id_c1 = lock_id.clone();
157        let (stop_tx, mut stop_rx) = oneshot::channel();
158
159        spawn(async move {
160            loop {
161                select! {
162                    _ = &mut stop_rx => break,
163                    _ = sleep(opts.extend_interval) => {
164                        if let Err(e) = extend(
165                            &mut conn,
166                            &lock_key_c1,
167                            &lock_id_c1,
168                            opts.lifetime,
169                        )
170                        .await
171                        {
172                            if let Some(e) = e.downcast_ref::<Error>() {
173                                if matches!(e, IdNotFound) {
174                                    break;
175                                }
176                            }
177                        }
178                    },
179                }
180            }
181        });
182
183        let cli = self.client.clone();
184        let lock_key_c2 = lock_key.to_owned();
185        let lock_id_c2 = lock_id.clone();
186
187        Ok(Lock {
188            release_fn: Some(Box::new(move || -> Result<()> {
189                let _ = stop_tx.send(());
190                let mut conn = cli.get_connection()?;
191                unlock_sync(&mut conn, &lock_key_c2, &lock_id_c2)
192            })),
193        })
194    }
195}
196
197pub struct Lock {
198    pub release_fn: Option<Box<dyn FnOnce() -> Result<()> + Send + 'static>>,
199}
200
201impl Lock {
202    pub fn release(mut self) -> Result<()> {
203        self.call_release()
204    }
205
206    fn call_release(&mut self) -> Result<()> {
207        match self.release_fn.take() {
208            Some(release_fn) => release_fn(),
209            None => Ok(()),
210        }
211    }
212}
213
214impl Drop for Lock {
215    fn drop(&mut self) {
216        let _ = self.call_release();
217    }
218}
219
220#[cfg(test)]
221mod test {
222    use super::*;
223    use std::time::Duration;
224
225    #[tokio::test]
226    async fn test_lock_exclusive() {
227        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
228            .await
229            .unwrap();
230        let lock_key = String::from("test:test_lock_exclusive_key");
231
232        let r = locker.acquire(&lock_key).await;
233        assert!(r.is_ok(), "Should acquire a lock");
234
235        match locker.acquire(&lock_key).await.err() {
236            None => assert!(false, "Should get an error when acquiring another lock"),
237            Some(e) => {
238                assert_eq!(
239                    e.downcast_ref::<Error>().unwrap(),
240                    &Error::Timeout,
241                    "Should get a timed out error when acquiring another lock"
242                )
243            }
244        }
245
246        assert!(r.unwrap().release().is_ok(), "Should release a lock");
247
248        assert!(
249            locker.acquire(&lock_key).await.is_ok(),
250            "Should acquire a lock after another lock is released"
251        );
252    }
253
254    #[tokio::test]
255    async fn test_lock_drop() {
256        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
257            .await
258            .unwrap();
259        let lock_key = "test:test_lock_drop_key";
260
261        {
262            let r = locker.acquire(&lock_key).await;
263            assert!(r.is_ok(), "Should acquire a lock in a scope");
264
265            match locker.acquire(&lock_key).await.err() {
266                None => assert!(
267                    false,
268                    "Should get an error when acquiring another lock in a scope"
269                ),
270                Some(e) => {
271                    assert_eq!(
272                        e.downcast_ref::<Error>().unwrap(),
273                        &Error::Timeout,
274                        "Should get an timed out error when acquiring another lock in a scope"
275                    );
276                }
277            }
278        }
279
280        assert!(
281            locker.acquire(&lock_key).await.is_ok(),
282            "Should acquire a lock out of the prev scope"
283        );
284    }
285
286    #[tokio::test]
287    async fn test_lock_passive_release() {
288        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
289            .await
290            .unwrap();
291        let lock_key = "test:test_lock_passive_release_key";
292
293        let opts = Options::new()
294            .lifetime(Duration::from_secs(2))
295            .extend_interval(Duration::from_secs(3));
296        let r = locker.acquire_with_options(&opts, &lock_key).await;
297        assert!(
298            r.is_ok(),
299            "Should acquire a lock with customized lifetime and extend_interval, extend_interval greater than lifetime"
300        );
301
302        sleep(Duration::from_secs(3)).await;
303        assert!(
304            locker.acquire(&lock_key).await.is_ok(),
305            "Should passively release a lock when the lifetime is reached"
306        );
307    }
308
309    #[tokio::test]
310    async fn test_lock_extend() {
311        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
312            .await
313            .unwrap();
314        let lock_key = "test:test_lock_extend_key";
315        let opts = Options::new()
316            .lifetime(Duration::from_secs(3))
317            .extend_interval(Duration::from_secs(2));
318        let r = locker.acquire_with_options(&opts, &lock_key).await;
319        assert!(
320            r.is_ok(),
321            "Should acquire a lock with customized lifetime and extend_interval, extend_interval smaller than lifetime."
322        );
323
324        sleep(Duration::from_secs(5)).await;
325        match locker.acquire(&lock_key).await.err() {
326            None => assert!(false, "Should expand lock lifetime automatically."),
327            Some(e) => {
328                assert_eq!(e.downcast_ref::<Error>().unwrap(), &Error::Timeout)
329            }
330        }
331    }
332}