1pub mod error;
117pub mod execs;
118pub mod options;
119
120use crate::error::Error;
121use crate::error::Error::IdNotFound;
122use crate::execs::*;
123use crate::options::Options;
124
125use anyhow::Result;
126use redis::aio::{ConnectionManager, ConnectionManagerConfig};
127use tokio::sync::oneshot;
128use tokio::time::sleep;
129use tokio::{select, spawn};
130
131#[derive(Clone)]
132pub struct Locker {
133 client: redis::Client,
134 conn_manager: ConnectionManager,
135}
136
137impl Locker {
138 pub async fn from_redis_url(url: &str) -> Result<Self> {
139 let client = redis::Client::open(url)?;
140 let cfg = ConnectionManagerConfig::default().set_max_delay(2000);
141 let async_conn_manager = ConnectionManager::new_with_config(client.clone(), cfg).await?;
142 Ok(Self {
143 client,
144 conn_manager: async_conn_manager,
145 })
146 }
147
148 pub async fn acquire(&mut self, lock_key: &str) -> Result<Lock> {
149 self.acquire_with_options(&Options::default(), lock_key)
150 .await
151 }
152
153 pub async fn acquire_with_options(&mut self, opts: &Options, lock_key: &str) -> Result<Lock> {
154 let lock_id = lock(
155 &mut self.conn_manager,
156 lock_key,
157 opts.ttl,
158 opts.retry,
159 opts.timeout,
160 )
161 .await?;
162
163 let mut conn = self.conn_manager.clone();
164 let opts = opts.clone();
165 let lock_key_c1 = lock_key.to_owned();
166 let lock_id_c1 = lock_id.clone();
167 let (stop_tx, mut stop_rx) = oneshot::channel();
168
169 spawn(async move {
170 loop {
171 select! {
172 _ = &mut stop_rx => break,
173 _ = sleep(opts.extend) => {
174 if let Err(e) = extend(
175 &mut conn,
176 &lock_key_c1,
177 &lock_id_c1,
178 opts.ttl,
179 )
180 .await
181 {
182 if let Some(e) = e.downcast_ref::<Error>() && matches!(e, IdNotFound) {
183 break;
184 }
185 }
186 },
187 }
188 }
189 });
190
191 let cli = self.client.clone();
192 let lock_key_c2 = lock_key.to_owned();
193 let lock_id_c2 = lock_id.clone();
194
195 Ok(Lock {
196 release_fn: Some(Box::new(move || -> Result<()> {
197 let _ = stop_tx.send(());
198 let mut conn = cli.get_connection()?;
199 unlock_sync(&mut conn, &lock_key_c2, &lock_id_c2)
200 })),
201 })
202 }
203}
204
205pub struct Lock {
206 pub release_fn: Option<Box<dyn FnOnce() -> Result<()> + Send + 'static>>,
207}
208
209impl Lock {
210 pub fn release(mut self) -> Result<()> {
211 self.call_release()
212 }
213
214 fn call_release(&mut self) -> Result<()> {
215 match self.release_fn.take() {
216 Some(release_fn) => release_fn(),
217 None => Ok(()),
218 }
219 }
220}
221
222impl Drop for Lock {
223 fn drop(&mut self) {
224 let _ = self.call_release();
225 }
226}
227
228#[cfg(test)]
229mod test {
230 use super::*;
231 use std::time::Duration;
232
233 #[tokio::test]
234 async fn test_lock_exclusive() {
235 let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
236 .await
237 .unwrap();
238 let lock_key = String::from("test:test_lock_exclusive_key");
239
240 let r = locker.acquire(&lock_key).await;
241 assert!(r.is_ok(), "Should acquire a lock");
242
243 match locker.acquire(&lock_key).await.err() {
244 None => assert!(false, "Should get an error when acquiring another lock"),
245 Some(e) => {
246 assert_eq!(
247 e.downcast_ref::<Error>().unwrap(),
248 &Error::Timeout,
249 "Should get a timed out error when acquiring another lock"
250 )
251 }
252 }
253
254 assert!(r.unwrap().release().is_ok(), "Should release a lock");
255
256 assert!(
257 locker.acquire(&lock_key).await.is_ok(),
258 "Should acquire a lock after another lock is released"
259 );
260 }
261
262 #[tokio::test]
263 async fn test_lock_drop() {
264 let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
265 .await
266 .unwrap();
267 let lock_key = "test:test_lock_drop_key";
268
269 {
270 let r = locker.acquire(&lock_key).await;
271 assert!(r.is_ok(), "Should acquire a lock in a scope");
272
273 match locker.acquire(&lock_key).await.err() {
274 None => assert!(
275 false,
276 "Should get an error when acquiring another lock in a scope"
277 ),
278 Some(e) => {
279 assert_eq!(
280 e.downcast_ref::<Error>().unwrap(),
281 &Error::Timeout,
282 "Should get an timed out error when acquiring another lock in a scope"
283 );
284 }
285 }
286 }
287
288 assert!(
289 locker.acquire(&lock_key).await.is_ok(),
290 "Should acquire a lock out of the prev scope"
291 );
292 }
293
294 #[tokio::test]
295 async fn test_lock_passive_release() {
296 let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
297 .await
298 .unwrap();
299 let lock_key = "test:test_lock_passive_release_key";
300
301 let opts = Options::new()
302 .ttl(Duration::from_secs(2))
303 .extend(Duration::from_secs(3));
304 let r = locker.acquire_with_options(&opts, &lock_key).await;
305 assert!(
306 r.is_ok(),
307 "Should acquire a lock with customized lifetime and extend_interval, extend_interval greater than lifetime"
308 );
309
310 sleep(Duration::from_secs(3)).await;
311 assert!(
312 locker.acquire(&lock_key).await.is_ok(),
313 "Should passively release a lock when the lifetime is reached"
314 );
315 }
316
317 #[tokio::test]
318 async fn test_lock_extend() {
319 let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
320 .await
321 .unwrap();
322 let lock_key = "test:test_lock_extend_key";
323 let opts = Options::new()
324 .ttl(Duration::from_secs(3))
325 .extend(Duration::from_secs(1));
326 let r = locker.acquire_with_options(&opts, &lock_key).await;
327 assert!(
328 r.is_ok(),
329 "Should acquire a lock with customized lifetime and extend_interval, extend_interval smaller than lifetime"
330 );
331
332 sleep(Duration::from_secs(5)).await;
333 match locker.acquire(&lock_key).await.err() {
334 None => assert!(false, "Should extend lock lifetime automatically"),
335 Some(e) => {
336 assert_eq!(e.downcast_ref::<Error>().unwrap(), &Error::Timeout)
337 }
338 }
339 }
340}