async_redis_lock/
lib.rs

1//! [![Crates.io](https://img.shields.io/crates/v/async-redis-lock)](https://crates.io/crates/redis-lock)
2//! [![docs](https://img.shields.io/crates/v/async-redis-lock?color=yellow&label=docs)](https://docs.rs/redis-lock)
3//!
4//! A simple and easy-to-use asynchronous redis distributed lock implementation based on tokio and redis-rs.
5//!
6//! ## Key Features
7//!
8//! - ✨ **Auto Extension** - Automatically extends lock lifetime in background until released
9//! - 🔒 **Passive Release** - Lock automatically releases when lifetime expires after process crash
10//! - 🎯 **Drop Support** - Supports both implicit release via drop and explicit release via method call
11//!
12//! ## Quick Start
13//!
14//! ### Installation
15//!
16//! ```toml
17//! [dependencies]
18//! async-redis-lock = "0.0.1"
19//! ```
20//!
21//! ### Basic Usage
22//!
23//! ```rust
24//! use async_redis_lock::Locker;
25//!
26//! #[tokio::main]
27//! async fn main() -> anyhow::Result<()> {
28//!     // Create locker
29//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
30//!
31//!     // Acquire lock
32//!     let lock = locker.acquire("lock_key").await?;
33//!
34//!     // At this point:
35//!     // 1. Lock is held
36//!     // 2. Background task automatically extends lock TTL
37//!     // 3. Safe to perform critical operations
38//!     // ...
39//!
40//!     // Release lock explicitly
41//!     // Alternative: drop(lock) for implicit release
42//!     lock.release()?;
43//!
44//!     Ok(())
45//! }
46//! ```
47//!
48//! ### Automatic Release
49//!
50//! ```rust
51//! use async_redis_lock::Locker;
52//!
53//! #[tokio::main]
54//! async fn main() -> anyhow::Result<()> {
55//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
56//!
57//!     // Use block scope to control lock lifetime
58//!     {
59//!         // Acquire lock and store in _lock variable
60//!         // The _ prefix indicates we only care about its Drop behavior
61//!         let _lock = locker.acquire("lock_key").await?;
62//!         // Perform operations that require locking
63//!         // ...
64//!         // Lock will be automatically released when block ends
65//!         // Thanks to Rust's Drop trait implementation
66//!     }
67//!
68//!     Ok(())
69//! }
70//! ```
71//!
72//! ### Advanced Configuration
73//!
74//! ```rust
75//! use async_redis_lock::Locker;
76//! use async_redis_lock::options::Options;
77//! use std::time::Duration;
78//!
79//! #[tokio::main]
80//! async fn main() -> anyhow::Result<()> {
81//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
82//!
83//!     // Build a custom lock options
84//!     let opts = Options::new()
85//!         // Set interval between acquisition attempts
86//!         // Default: 100ms
87//!         .retry(Duration::from_millis(100))
88//!         // Set maximum time to attempt acquisition
89//!         // Default: Some(1s)
90//!         // Note: none means retry indefinitely
91//!         .timeout(Some(Duration::from_secs(1)))
92//!         // Set lock time-to-live before auto-release
93//!         // Default: 3s
94//!         .ttl(Duration::from_secs(3))
95//!         // Set lock auto-extend interval
96//!         // Default: 1s
97//!         // Recommend: lifetime/3
98//!         .extend(Duration::from_secs(1));
99//!
100//!     // Acquire lock with the custom options
101//!     let _lock = locker.acquire_with_options(&opts, "lock_key").await?;
102//!
103//!     // Perform operations that require locking
104//!     // ...
105//!
106//!     Ok(())
107//! }
108//! ```
109//! ## Important Notes
110//!
111//! 1. Don't ignore the return value of acquire method, or the lock will release immediately
112//! 2. If the extend_interval is too large, the lock extension may fail because the lock has been passively released (by expiration) before the extension attempt
113//! 3. Lock implements Drop trait and will auto-release when out of scope
114//!
115//!
116pub mod error;
117pub mod execs;
118pub mod options;
119
120use crate::error::Error;
121use crate::error::Error::IdNotFound;
122use crate::execs::*;
123use crate::options::Options;
124use std::time::SystemTime;
125
126use anyhow::Result;
127use redis::aio::{ConnectionManager, ConnectionManagerConfig};
128use tokio::sync::oneshot;
129use tokio::time::sleep;
130use tokio::{select, spawn};
131
132#[derive(Clone)]
133pub struct Locker {
134    client: redis::Client,
135    conn_manager: ConnectionManager,
136}
137
138impl Locker {
139    pub async fn from_redis_url(url: &str) -> Result<Self> {
140        let client = redis::Client::open(url)?;
141        let cfg = ConnectionManagerConfig::default().set_max_delay(2000);
142        let async_conn_manager = ConnectionManager::new_with_config(client.clone(), cfg).await?;
143        Ok(Self {
144            client,
145            conn_manager: async_conn_manager,
146        })
147    }
148
149    pub async fn acquire(&mut self, lock_key: &str) -> Result<Lock> {
150        self.acquire_with_options(&Options::default(), lock_key)
151            .await
152    }
153
154    pub async fn acquire_with_options(&mut self, opts: &Options, lock_key: &str) -> Result<Lock> {
155        let lock_id = lock(
156            &mut self.conn_manager,
157            lock_key,
158            opts.ttl,
159            opts.retry,
160            opts.timeout,
161        )
162        .await?;
163
164        println!(
165            "lock: {}",
166            SystemTime::now()
167                .duration_since(SystemTime::UNIX_EPOCH)
168                .unwrap()
169                .as_secs()
170        );
171
172        let mut conn = self.conn_manager.clone();
173        let opts = opts.clone();
174        let lock_key_c1 = lock_key.to_owned();
175        let lock_id_c1 = lock_id.clone();
176        let (stop_tx, mut stop_rx) = oneshot::channel();
177
178        spawn(async move {
179            loop {
180                select! {
181                    _ = &mut stop_rx => break,
182                    _ = sleep(opts.extend) => {
183                        println!("extend: {}", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
184                        if let Err(e) = extend(
185                            &mut conn,
186                            &lock_key_c1,
187                            &lock_id_c1,
188                            opts.ttl,
189                        )
190                        .await
191                        {
192                            if let Some(e) = e.downcast_ref::<Error>() && matches!(e, IdNotFound) {
193                                println!("extend failed: {}", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
194                                break;
195                            }
196                        } else {
197                            println!("extend ok: {}", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
198                        }
199                    },
200                }
201            }
202            println!(
203                "extend exit: {}",
204                SystemTime::now()
205                    .duration_since(SystemTime::UNIX_EPOCH)
206                    .unwrap()
207                    .as_secs()
208            );
209        });
210
211        let cli = self.client.clone();
212        let lock_key_c2 = lock_key.to_owned();
213        let lock_id_c2 = lock_id.clone();
214
215        Ok(Lock {
216            release_fn: Some(Box::new(move || -> Result<()> {
217                let _ = stop_tx.send(());
218                let mut conn = cli.get_connection()?;
219                unlock_sync(&mut conn, &lock_key_c2, &lock_id_c2)
220            })),
221        })
222    }
223}
224
225pub struct Lock {
226    pub release_fn: Option<Box<dyn FnOnce() -> Result<()> + Send + 'static>>,
227}
228
229impl Lock {
230    pub fn release(mut self) -> Result<()> {
231        self.call_release()
232    }
233
234    fn call_release(&mut self) -> Result<()> {
235        match self.release_fn.take() {
236            Some(release_fn) => release_fn(),
237            None => Ok(()),
238        }
239    }
240}
241
242impl Drop for Lock {
243    fn drop(&mut self) {
244        let _ = self.call_release();
245    }
246}
247
248#[cfg(test)]
249mod test {
250    use super::*;
251    use std::time::Duration;
252
253    #[tokio::test]
254    async fn test_lock_exclusive() {
255        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
256            .await
257            .unwrap();
258        let lock_key = String::from("test:test_lock_exclusive_key");
259
260        let r = locker.acquire(&lock_key).await;
261        assert!(r.is_ok(), "Should acquire a lock");
262
263        match locker.acquire(&lock_key).await.err() {
264            None => assert!(false, "Should get an error when acquiring another lock"),
265            Some(e) => {
266                assert_eq!(
267                    e.downcast_ref::<Error>().unwrap(),
268                    &Error::Timeout,
269                    "Should get a timed out error when acquiring another lock"
270                )
271            }
272        }
273
274        assert!(r.unwrap().release().is_ok(), "Should release a lock");
275
276        assert!(
277            locker.acquire(&lock_key).await.is_ok(),
278            "Should acquire a lock after another lock is released"
279        );
280    }
281
282    #[tokio::test]
283    async fn test_lock_drop() {
284        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
285            .await
286            .unwrap();
287        let lock_key = "test:test_lock_drop_key";
288
289        {
290            let r = locker.acquire(&lock_key).await;
291            assert!(r.is_ok(), "Should acquire a lock in a scope");
292
293            match locker.acquire(&lock_key).await.err() {
294                None => assert!(
295                    false,
296                    "Should get an error when acquiring another lock in a scope"
297                ),
298                Some(e) => {
299                    assert_eq!(
300                        e.downcast_ref::<Error>().unwrap(),
301                        &Error::Timeout,
302                        "Should get an timed out error when acquiring another lock in a scope"
303                    );
304                }
305            }
306        }
307
308        assert!(
309            locker.acquire(&lock_key).await.is_ok(),
310            "Should acquire a lock out of the prev scope"
311        );
312    }
313
314    #[tokio::test]
315    async fn test_lock_passive_release() {
316        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
317            .await
318            .unwrap();
319        let lock_key = "test:test_lock_passive_release_key";
320
321        let opts = Options::new()
322            .ttl(Duration::from_secs(2))
323            .extend(Duration::from_secs(3));
324        let r = locker.acquire_with_options(&opts, &lock_key).await;
325        assert!(
326            r.is_ok(),
327            "Should acquire a lock with customized lifetime and extend_interval, extend_interval greater than lifetime"
328        );
329
330        sleep(Duration::from_secs(3)).await;
331        assert!(
332            locker.acquire(&lock_key).await.is_ok(),
333            "Should passively release a lock when the lifetime is reached"
334        );
335    }
336
337    #[tokio::test]
338    async fn test_lock_extend() {
339        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
340            .await
341            .unwrap();
342        let lock_key = "test:test_lock_extend_key";
343        let opts = Options::new()
344            .ttl(Duration::from_secs(3))
345            .extend(Duration::from_secs(1));
346        let r = locker.acquire_with_options(&opts, &lock_key).await;
347        assert!(
348            r.is_ok(),
349            "Should acquire a lock with customized lifetime and extend_interval, extend_interval smaller than lifetime"
350        );
351
352        sleep(Duration::from_secs(8)).await;
353        match locker.acquire(&lock_key).await.err() {
354            None => assert!(false, "Should extend lock lifetime automatically"),
355            Some(e) => {
356                assert_eq!(e.downcast_ref::<Error>().unwrap(), &Error::Timeout)
357            }
358        }
359    }
360}