1pub mod error;
107pub mod execs;
108pub mod options;
109
110use crate::error::Error;
111use crate::error::Error::IdNotFound;
112use crate::execs::*;
113use crate::options::Options;
114
115use anyhow::Result;
116use redis::aio::{ConnectionManager, ConnectionManagerConfig};
117use tokio::sync::oneshot;
118use tokio::time::sleep;
119use tokio::{select, spawn};
120
121#[derive(Clone)]
122pub struct Locker {
123 client: redis::Client,
124 conn_manager: ConnectionManager,
125}
126
127impl Locker {
128 pub async fn from_redis_url(url: &str) -> Result<Self> {
129 let client = redis::Client::open(url)?;
130 let cfg = ConnectionManagerConfig::default().set_max_delay(2000);
131 let async_conn_manager = ConnectionManager::new_with_config(client.clone(), cfg).await?;
132 Ok(Self {
133 client,
134 conn_manager: async_conn_manager,
135 })
136 }
137
138 pub async fn acquire(&mut self, lock_key: &str) -> Result<Lock> {
139 self.acquire_with_options(&Options::default(), lock_key)
140 .await
141 }
142
143 pub async fn acquire_with_options(&mut self, opts: &Options, lock_key: &str) -> Result<Lock> {
144 let lock_id = lock(
145 &mut self.conn_manager,
146 lock_key,
147 opts.lifetime,
148 opts.retry_interval,
149 opts.retry_timeout,
150 )
151 .await?;
152
153 let mut conn = self.conn_manager.clone();
154 let opts = opts.clone();
155 let lock_key_c1 = lock_key.to_owned();
156 let lock_id_c1 = lock_id.clone();
157 let (stop_tx, mut stop_rx) = oneshot::channel();
158
159 spawn(async move {
160 loop {
161 select! {
162 _ = &mut stop_rx => break,
163 _ = sleep(opts.extend_interval) => {
164 if let Err(e) = extend(
165 &mut conn,
166 &lock_key_c1,
167 &lock_id_c1,
168 opts.lifetime,
169 )
170 .await
171 {
172 if let Some(e) = e.downcast_ref::<Error>() {
173 if matches!(e, IdNotFound) {
174 break;
175 }
176 }
177 }
178 },
179 }
180 }
181 });
182
183 let cli = self.client.clone();
184 let lock_key_c2 = lock_key.to_owned();
185 let lock_id_c2 = lock_id.clone();
186
187 Ok(Lock {
188 release_fn: Some(Box::new(move || -> Result<()> {
189 let _ = stop_tx.send(());
190 let mut conn = cli.get_connection()?;
191 unlock_sync(&mut conn, &lock_key_c2, &lock_id_c2)
192 })),
193 })
194 }
195}
196
197pub struct Lock {
198 pub release_fn: Option<Box<dyn FnOnce() -> Result<()> + Send + 'static>>,
199}
200
201impl Lock {
202 pub fn release(mut self) -> Result<()> {
203 self.call_release()
204 }
205
206 fn call_release(&mut self) -> Result<()> {
207 match self.release_fn.take() {
208 Some(release_fn) => release_fn(),
209 None => Ok(()),
210 }
211 }
212}
213
214impl Drop for Lock {
215 fn drop(&mut self) {
216 let _ = self.call_release();
217 }
218}
219
220#[cfg(test)]
221mod test {
222 use super::*;
223 use std::time::Duration;
224
225 #[tokio::test]
226 async fn test_lock_exclusive() {
227 let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
228 .await
229 .unwrap();
230 let lock_key = String::from("test:test_lock_exclusive_key");
231
232 let r = locker.acquire(&lock_key).await;
233 assert!(r.is_ok(), "Should acquire a lock");
234
235 match locker.acquire(&lock_key).await.err() {
236 None => assert!(false, "Should get an error when acquiring another lock"),
237 Some(e) => {
238 assert_eq!(
239 e.downcast_ref::<Error>().unwrap(),
240 &Error::Timeout,
241 "Should get a timed out error when acquiring another lock"
242 )
243 }
244 }
245
246 assert!(r.unwrap().release().is_ok(), "Should release a lock");
247
248 assert!(
249 locker.acquire(&lock_key).await.is_ok(),
250 "Should acquire a lock after another lock is released"
251 );
252 }
253
254 #[tokio::test]
255 async fn test_lock_drop() {
256 let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
257 .await
258 .unwrap();
259 let lock_key = "test:test_lock_drop_key";
260
261 {
262 let r = locker.acquire(&lock_key).await;
263 assert!(r.is_ok(), "Should acquire a lock in a scope");
264
265 match locker.acquire(&lock_key).await.err() {
266 None => assert!(
267 false,
268 "Should get an error when acquiring another lock in a scope"
269 ),
270 Some(e) => {
271 assert_eq!(
272 e.downcast_ref::<Error>().unwrap(),
273 &Error::Timeout,
274 "Should get an timed out error when acquiring another lock in a scope"
275 );
276 }
277 }
278 }
279
280 assert!(
281 locker.acquire(&lock_key).await.is_ok(),
282 "Should acquire a lock out of the prev scope"
283 );
284 }
285
286 #[tokio::test]
287 async fn test_lock_passive_release() {
288 let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
289 .await
290 .unwrap();
291 let lock_key = "test:test_lock_passive_release_key";
292
293 let opts = Options::new()
294 .lifetime(Duration::from_secs(2))
295 .extend_interval(Duration::from_secs(3));
296 let r = locker.acquire_with_options(&opts, &lock_key).await;
297 assert!(
298 r.is_ok(),
299 "Should acquire a lock with customized lifetime and extend_interval, extend_interval greater than lifetime"
300 );
301
302 sleep(Duration::from_secs(3)).await;
303 assert!(
304 locker.acquire(&lock_key).await.is_ok(),
305 "Should passively release a lock when the lifetime is reached"
306 );
307 }
308
309 #[tokio::test]
310 async fn test_lock_extend() {
311 let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
312 .await
313 .unwrap();
314 let lock_key = "test:test_lock_extend_key";
315 let opts = Options::new()
316 .lifetime(Duration::from_secs(3))
317 .extend_interval(Duration::from_secs(2));
318 let r = locker.acquire_with_options(&opts, &lock_key).await;
319 assert!(
320 r.is_ok(),
321 "Should acquire a lock with customized lifetime and extend_interval, extend_interval smaller than lifetime."
322 );
323
324 sleep(Duration::from_secs(5)).await;
325 match locker.acquire(&lock_key).await.err() {
326 None => assert!(false, "Should expand lock lifetime automatically."),
327 Some(e) => {
328 assert_eq!(e.downcast_ref::<Error>().unwrap(), &Error::Timeout)
329 }
330 }
331 }
332}