async_redis_lock/
lib.rs

1//! A simple and easy-to-use asynchronous redis distributed read-write lock implementation based on tokio and redis-rs.
2//!
3//! ## Features:
4//!
//! 1. Automatic extension: Once a lock is acquired, its lifetime is automatically extended in the background until the lock is released.
//! 2. Passive release: If the process exits abnormally, the lock is automatically released once its lifetime is exhausted.
//! 3. Drop support: A lock can be released implicitly by dropping it, or explicitly by calling the `release` method.
8//!
9//! ## Examples:
10//! ```rust
11//!
12//! use async_redis_lock::{Locker, options::Options};
13//! use std::time::Duration;
14//! use tokio::time::sleep;
15//!
16//! #[tokio::main]
17//! async fn main() -> anyhow::Result<()> {
18//!     // Create locker from the redis url.
19//!     let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0").await?;
20//!
21//!     // Lock key.
22//!     let lock_key = "lock_key";
23//!
24//!     // Acquire a lock.
25//!     let lock = locker.acquire(lock_key).await?;
26//!
27//!     // Do something.
28//!     sleep(Duration::from_secs(5)).await;
29//!
30//!     // Release the lock explicitly.
31//!     lock.release()?;
32//!
33//!     // Acquire another lock in a scope.
34//!     {
//!         // The type of `_lock` implements the `Drop` trait, so the lock is released automatically when `_lock` goes out of scope.
36//!         // Note: Do not ignore the returned value of the acquire method, otherwise the lock will be released immediately.
37//!         let _lock = locker.acquire(lock_key).await?;
38//!     }
39//!
40//!     // Acquire lock with custom options.
41//!     let opts = Options::new()
//!         // Set the lock lifetime, i.e. how long it takes for the lock to be passively released after the process exits abnormally.
43//!         .lifetime(Duration::from_secs(5))
//!         // Set the retry interval used when lock acquisition fails, e.g. because another lock already exists.
45//!         .retry_interval(Duration::from_secs(1))
//!         // Set the retry timeout; if the value is `None`, retry until the lock is acquired.
47//!         .retry_timeout(Some(Duration::from_secs(3)))
//!         // Set the interval at which the lock lifetime is automatically extended.
//!         // This value should always be less than the lifetime,
//!         // otherwise the lock may be passively released before it is extended.
51//!         .extend_interval(Duration::from_secs(3));
//!     let _lock = locker.acquire_with_options(&opts, lock_key).await?;
53//!
54//!     Ok(())
55//! }
56//! ```
57//!
58pub mod error;
59pub mod execs;
60pub mod options;
61
62use crate::error::Error;
63use crate::error::Error::IdNotFound;
64use crate::execs::*;
65use crate::options::Options;
66
67use anyhow::Result;
68use redis::aio::{ConnectionManager, ConnectionManagerConfig};
69use tokio::sync::oneshot;
70use tokio::time::sleep;
71use tokio::{select, spawn};
72
73#[derive(Clone)]
74pub struct Locker {
75    client: redis::Client,
76    conn_manager: ConnectionManager,
77}
78
79impl Locker {
80    pub async fn from_redis_url(url: &str) -> Result<Self> {
81        let client = redis::Client::open(url)?;
82        let cfg = ConnectionManagerConfig::default().set_max_delay(2000);
83        let async_conn_manager = ConnectionManager::new_with_config(client.clone(), cfg).await?;
84        Ok(Self {
85            client,
86            conn_manager: async_conn_manager,
87        })
88    }
89
90    pub async fn acquire(&mut self, lock_key: &str) -> Result<Lock> {
91        self.acquire_with_options(&Options::default(), lock_key)
92            .await
93    }
94
95    pub async fn acquire_with_options(&mut self, opts: &Options, lock_key: &str) -> Result<Lock> {
96        let lock_id = lock(
97            &mut self.conn_manager,
98            lock_key,
99            opts.lifetime,
100            opts.retry_interval,
101            opts.retry_timeout,
102        )
103        .await?;
104
105        let mut conn = self.conn_manager.clone();
106        let opts = opts.clone();
107        let lock_key_c1 = lock_key.to_owned();
108        let lock_id_c1 = lock_id.clone();
109        let (stop_tx, mut stop_rx) = oneshot::channel();
110
111        spawn(async move {
112            loop {
113                select! {
114                    _ = &mut stop_rx => break,
115                    _ = sleep(opts.extend_interval) => {
116                        if let Err(e) = extend(
117                            &mut conn,
118                            &lock_key_c1,
119                            &lock_id_c1,
120                            opts.lifetime,
121                        )
122                        .await
123                        {
124                            if let Some(e) = e.downcast_ref::<Error>() {
125                                if matches!(e, IdNotFound) {
126                                    break;
127                                }
128                            }
129                        }
130                    },
131                }
132            }
133        });
134
135        let cli = self.client.clone();
136        let lock_key_c2 = lock_key.to_owned();
137        let lock_id_c2 = lock_id.clone();
138
139        Ok(Lock {
140            release_fn: Some(Box::new(move || -> Result<()> {
141                let _ = stop_tx.send(());
142                let mut conn = cli.get_connection()?;
143                unlock_sync(&mut conn, &lock_key_c2, &lock_id_c2)
144            })),
145        })
146    }
147}
148
149pub struct Lock {
150    pub release_fn: Option<Box<dyn FnOnce() -> Result<()> + Send + 'static>>,
151}
152
153impl Lock {
154    pub fn release(mut self) -> Result<()> {
155        self.call_release()
156    }
157
158    fn call_release(&mut self) -> Result<()> {
159        match self.release_fn.take() {
160            Some(release_fn) => release_fn(),
161            None => Ok(()),
162        }
163    }
164}
165
166impl Drop for Lock {
167    fn drop(&mut self) {
168        let _ = self.call_release();
169    }
170}
171
// These tests talk to a live Redis server at `REDIS_URL`.
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    /// Redis endpoint shared by every test in this module.
    const REDIS_URL: &str = "redis://127.0.0.1:6379/0";

    /// Connects a fresh [`Locker`] to `REDIS_URL`, panicking on failure.
    async fn new_locker() -> Locker {
        Locker::from_redis_url(REDIS_URL).await.unwrap()
    }

    /// Asserts that an acquisition attempt failed with `Error::Timeout`.
    ///
    /// Panics with `on_acquired` if the lock was acquired after all, and with
    /// `on_wrong_error` if the failure is anything other than `Error::Timeout`.
    fn assert_timed_out(result: anyhow::Result<Lock>, on_acquired: &str, on_wrong_error: &str) {
        match result {
            Ok(_) => panic!("{}", on_acquired),
            Err(e) => assert_eq!(
                e.downcast_ref::<Error>(),
                Some(&Error::Timeout),
                "{}",
                on_wrong_error
            ),
        }
    }

    #[tokio::test]
    async fn test_lock_exclusive() {
        let mut locker = new_locker().await;
        let lock_key = "test:test_lock_exclusive_key";

        let held = locker.acquire(lock_key).await;
        assert!(held.is_ok(), "Should acquire a lock");

        assert_timed_out(
            locker.acquire(lock_key).await,
            "Should get an error when acquiring another lock",
            "Should get a timed out error when acquiring another lock",
        );

        assert!(held.unwrap().release().is_ok(), "Should release a lock");

        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should acquire a lock after another lock is released"
        );
    }

    #[tokio::test]
    async fn test_lock_drop() {
        let mut locker = new_locker().await;
        let lock_key = "test:test_lock_drop_key";

        {
            // `held` stays alive until the end of this scope, where it is dropped.
            let held = locker.acquire(lock_key).await;
            assert!(held.is_ok(), "Should acquire a lock in a scope");

            assert_timed_out(
                locker.acquire(lock_key).await,
                "Should get an error when acquiring another lock in a scope",
                "Should get an timed out error when acquiring another lock in a scope",
            );
        }

        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should acquire a lock out of the prev scope"
        );
    }

    #[tokio::test]
    async fn test_lock_passive_release() {
        let mut locker = new_locker().await;
        let lock_key = "test:test_lock_passive_release_key";

        // extend_interval > lifetime: the first extension is scheduled only after
        // the lifetime has already elapsed.
        let opts = Options::new()
            .lifetime(Duration::from_secs(2))
            .extend_interval(Duration::from_secs(3));
        let held = locker.acquire_with_options(&opts, lock_key).await;
        assert!(
            held.is_ok(),
            "Should acquire a lock with customized lifetime and extend_interval, extend_interval greater than lifetime"
        );

        sleep(Duration::from_secs(3)).await;
        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should passively release a lock when the lifetime is reached"
        );
    }

    #[tokio::test]
    async fn test_lock_extend() {
        let mut locker = new_locker().await;
        let lock_key = "test:test_lock_extend_key";

        // extend_interval < lifetime: the lock should still be held after sleeping
        // longer than one lifetime.
        let opts = Options::new()
            .lifetime(Duration::from_secs(3))
            .extend_interval(Duration::from_secs(2));
        let held = locker.acquire_with_options(&opts, lock_key).await;
        assert!(
            held.is_ok(),
            "Should acquire a lock with customized lifetime and extend_interval, extend_interval smaller than lifetime."
        );

        sleep(Duration::from_secs(5)).await;
        assert_timed_out(
            locker.acquire(lock_key).await,
            "Should expand lock lifetime automatically.",
            "Should expand lock lifetime automatically.",
        );
    }
}