async_redis_lock/
lib.rs

1//! [![Crates.io](https://img.shields.io/crates/v/async-redis-lock)](https://crates.io/crates/async-redis-lock)
2//! [![docs](https://img.shields.io/crates/v/async-redis-lock?color=yellow&label=docs)](https://docs.rs/async-redis-lock)
3//!
4//! A simple and easy-to-use asynchronous redis distributed lock implementation based on tokio and redis-rs.
5//!
6//! ## Key Features
7//!
8//! - ✨ **Auto Extension** - Automatically extends lock lifetime in background until released
9//! - 🔒 **Passive Release** - Lock automatically releases when lifetime expires after process crash
10//! - 🎯 **Drop Support** - Supports both implicit release via drop and explicit release via method call
11//! - 🔗 **Multi-key Locking** - Ability to lock multiple keys simultaneously ensuring atomic operations across them
12//!
13//! ## Quick Start
14//!
15//! ### Installation
16//!
17//! ```toml
18//! [dependencies]
19//! async-redis-lock = "0.2.1"
20//! ```
21//!
22//! ### Basic Usage
23//!
24//! ```rust
25//! use async_redis_lock::Locker;
26//!
27//! #[tokio::main]
28//! async fn main() -> anyhow::Result<()> {
29//!     // Create locker
30//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
31//!
32//!     // Acquire lock
33//!     let lock = locker.acquire(&"lock_key").await?;
34//!
35//!     // At this point:
36//!     // 1. Lock is held
37//!     // 2. Background task automatically extends lock TTL
38//!     // 3. Safe to perform critical operations
39//!     // ...
40//!
41//!     // Release lock explicitly
42//!     // Alternative: drop(lock) for implicit release
43//!     lock.release()?;
44//!
45//!     Ok(())
46//! }
47//! ```
48//!
49//! ### Automatic Release
50//!
51//! ```rust
52//! use async_redis_lock::Locker;
53//!
54//! #[tokio::main]
55//! async fn main() -> anyhow::Result<()> {
56//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
57//!
58//!     // Use block scope to control lock lifetime
59//!     {
60//!         // Acquire lock and store in _lock variable
61//!         // The _ prefix indicates we only care about its Drop behavior
62//!         let _lock = locker.acquire(&"lock_key").await?;
63//!         // Perform operations that require locking
64//!         // ...
65//!         // Lock will be automatically released when block ends
66//!         // Thanks to Rust's Drop trait implementation
67//!     }
68//!
69//!     Ok(())
70//! }
71//! ```
72//!
73//! ### Advanced Configuration
74//!
75//! ```rust
76//! use async_redis_lock::Locker;
77//! use async_redis_lock::options::Options;
78//! use std::time::Duration;
79//!
80//! #[tokio::main]
81//! async fn main() -> anyhow::Result<()> {
82//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
83//!
84//!     // Build custom lock options
85//!     let opts = Options::new()
86//!         // Set interval between acquisition attempts
87//!         // Default: 100ms
88//!         .retry(Duration::from_millis(100))
89//!         // Set maximum time to attempt acquisition
90//!         // Default: Some(1s)
91//!         // Note: `None` means retry indefinitely
92//!         .timeout(Some(Duration::from_secs(1)))
93//!         // Set lock time-to-live before auto-release
94//!         // Default: 3s
95//!         .ttl(Duration::from_secs(3))
96//!         // Set lock auto-extend interval
97//!         // Default: 1s
98//!         // Recommend: ttl/3
99//!         .extend(Duration::from_secs(1));
100//!
101//!     // Acquire lock with the custom options
102//!     let _lock = locker.acquire_with_options(&opts, &"lock_key").await?;
103//!
104//!     // Perform operations that require locking
105//!     // ...
106//!
107//!     Ok(())
108//! }
109//! ```
110//! ## Important Notes
111//!
112//! 1. Don't ignore the return value of the `acquire` method, or the lock will be released immediately
113//! 2. If the extension interval is too large, the lock may expire (passive release) before the next extension attempt, causing the extension to fail; the recommended interval is ttl/3
114//! 3. `Lock` implements the `Drop` trait and is automatically released when it goes out of scope
115//!
116//!
117pub mod error;
118pub mod execs;
119pub mod options;
120
121use crate::error::Error;
122use crate::error::Error::IdNotFound;
123use crate::execs::*;
124use crate::options::Options;
125
126use anyhow::Result;
127use redis::ToRedisArgs;
128use redis::aio::{ConnectionManager, ConnectionManagerConfig};
129use tokio::sync::oneshot;
130use tokio::time::sleep;
131use tokio::{select, spawn};
132
/// Entry point for acquiring Redis-backed locks.
///
/// Holds both a plain Redis client and an async connection manager built from
/// that client. `Clone` is derived, so a clone copies both handles.
#[derive(Clone)]
pub struct Locker {
    /// Redis client. It is cloned into every lock's release closure, which
    /// calls `get_connection()` on it to unlock the keys.
    client: redis::Client,
    /// Async connection manager used to acquire locks; a clone of it is moved
    /// into the background task that extends a held lock.
    conn_manager: ConnectionManager,
}
138
139impl Locker {
140    pub async fn from_redis_url(url: &str) -> Result<Self> {
141        let client = redis::Client::open(url)?;
142        let cfg = ConnectionManagerConfig::default().set_max_delay(2000);
143        let async_conn_manager = ConnectionManager::new_with_config(client.clone(), cfg).await?;
144        Ok(Self {
145            client,
146            conn_manager: async_conn_manager,
147        })
148    }
149
150    /// Acquires a lock for the given key(s).
151    ///
152    /// # Arguments
153    /// * `lock_keys` - A single key or a collection of keys to be locked.
154    pub async fn acquire<K>(&mut self, lock_keys: &K) -> Result<Lock>
155    where
156        K: ToRedisArgs + Clone + Send + 'static,
157    {
158        self.acquire_with_options(&Options::default(), lock_keys)
159            .await
160    }
161
162    /// Acquires a lock for the given key(s) with custom options.
163    ///
164    /// # Arguments
165    /// * `opts` - Options for lock acquisition and management.
166    /// * `lock_keys` - A single key or a collection of keys to be locked.
167    pub async fn acquire_with_options<K>(&mut self, opts: &Options, lock_keys: &K) -> Result<Lock>
168    where
169        K: ToRedisArgs + Clone + Send + 'static,
170    {
171        let lock_id = lock(
172            &mut self.conn_manager,
173            lock_keys,
174            opts.ttl,
175            opts.retry,
176            opts.timeout,
177        )
178        .await?;
179
180        let mut conn = self.conn_manager.clone();
181        let opts = opts.clone();
182        let lock_keys_copy = lock_keys.clone();
183        let lock_id_copy = lock_id.clone();
184        let (stop_tx, mut stop_rx) = oneshot::channel();
185
186        spawn(async move {
187            loop {
188                let lock_keys = lock_keys_copy.clone();
189                select! {
190                    _ = &mut stop_rx => break,
191                    _ = sleep(opts.extend) => {
192                        if let Err(e) = extend(
193                            &mut conn,
194                            lock_keys,
195                            &lock_id_copy,
196                            opts.ttl,
197                        )
198                        .await
199                        {
200                            if let Some(IdNotFound) = e.downcast_ref::<Error>() {
201                                break;
202                            }
203                        }
204                    },
205                }
206            }
207        });
208
209        let cli = self.client.clone();
210        let lock_keys_copy = lock_keys.clone();
211        Ok(Lock {
212            release_fn: Some(Box::new(move || -> Result<()> {
213                let _ = stop_tx.send(());
214                let mut conn = cli.get_connection()?;
215                unlock_sync(&mut conn, lock_keys_copy, &lock_id)
216            })),
217        })
218    }
219}
220
221pub struct Lock {
222    release_fn: Option<Box<dyn FnOnce() -> Result<()> + Send + 'static>>,
223}
224
225impl Lock {
226    pub fn release(mut self) -> Result<()> {
227        self.call_release()
228    }
229
230    fn call_release(&mut self) -> Result<()> {
231        match self.release_fn.take() {
232            Some(release_fn) => release_fn(),
233            None => Ok(()),
234        }
235    }
236}
237
238impl Drop for Lock {
239    fn drop(&mut self) {
240        let _ = self.call_release();
241    }
242}
243
// Integration tests; every test connects to a Redis server at
// redis://127.0.0.1:6379/0.
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    /// A second acquisition of held keys must time out, and must succeed once
    /// the first lock is explicitly released.
    #[tokio::test]
    async fn test_lock_exclusive() {
        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
            .await
            .unwrap();

        let lock_keys = vec![
            "test:test_lock_exclusive_key_1",
            "test:test_lock_exclusive_key_2",
        ];

        let r = locker.acquire(&lock_keys).await;
        assert!(r.is_ok(), "Should acquire a lock");

        match locker.acquire(&lock_keys).await.err() {
            // `panic!` instead of a constant `assert!(false, ..)`; same message.
            None => panic!("Should get an error when acquiring another lock"),
            Some(e) => {
                assert_eq!(
                    e.downcast_ref::<Error>().unwrap(),
                    &Error::Timeout,
                    "Should get a timed out error when acquiring another lock"
                )
            }
        }

        assert!(r.unwrap().release().is_ok(), "Should release a lock");

        assert!(
            locker.acquire(&lock_keys).await.is_ok(),
            "Should acquire a lock after another lock is released"
        );
    }

    /// A lock held inside a scope must be released when the scope ends.
    #[tokio::test]
    async fn test_lock_drop() {
        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
            .await
            .unwrap();

        let lock_keys = vec!["test:test_lock_drop_key_1", "test:test_lock_drop_key_2"];

        {
            let r = locker.acquire(&lock_keys).await;
            assert!(r.is_ok(), "Should acquire a lock in a scope");

            match locker.acquire(&lock_keys).await.err() {
                None => panic!("Should get an error when acquiring another lock in a scope"),
                Some(e) => {
                    assert_eq!(
                        e.downcast_ref::<Error>().unwrap(),
                        &Error::Timeout,
                        "Should get an timed out error when acquiring another lock in a scope"
                    );
                }
            }
        }

        assert!(
            locker.acquire(&lock_keys).await.is_ok(),
            "Should acquire a lock out of the prev scope"
        );
    }

    /// With an extend interval (3s) longer than the TTL (2s), the lock must
    /// become acquirable again even though the first guard is still alive.
    #[tokio::test]
    async fn test_lock_passive_release() {
        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
            .await
            .unwrap();

        let lock_keys = vec![
            "test:test_lock_passive_release_key_1",
            "test:test_lock_passive_release_key_2",
        ];

        let opts = Options::new()
            .ttl(Duration::from_secs(2))
            .extend(Duration::from_secs(3));
        let r = locker.acquire_with_options(&opts, &lock_keys).await;
        assert!(
            r.is_ok(),
            "Should acquire a lock with customized lifetime and extend_interval, extend_interval greater than lifetime"
        );

        sleep(Duration::from_secs(10)).await;
        assert!(
            locker.acquire(&lock_keys).await.is_ok(),
            "Should passively release a lock when the lifetime is reached"
        );
    }

    /// With an extend interval (1s) shorter than the TTL (3s), the lock must
    /// still be held after waiting longer than the TTL.
    #[tokio::test]
    async fn test_lock_extend() {
        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
            .await
            .unwrap();
        let lock_keys = vec!["test:test_lock_extend_key_1", "test:test_lock_extend_key_2"];

        let opts = Options::new()
            .ttl(Duration::from_secs(3))
            .extend(Duration::from_secs(1));
        let r = locker.acquire_with_options(&opts, &lock_keys).await;
        assert!(
            r.is_ok(),
            "Should acquire a lock with customized lifetime and extend_interval, extend_interval smaller than lifetime"
        );

        sleep(Duration::from_secs(5)).await;
        match locker.acquire(&lock_keys).await.err() {
            None => panic!("Should extend lock lifetime automatically"),
            Some(e) => {
                assert_eq!(e.downcast_ref::<Error>().unwrap(), &Error::Timeout)
            }
        }
    }
}
366}