1pub mod error;
117pub mod execs;
118pub mod options;
119
120use crate::error::Error;
121use crate::error::Error::IdNotFound;
122use crate::execs::*;
123use crate::options::Options;
124use std::time::SystemTime;
125
126use anyhow::Result;
127use redis::aio::{ConnectionManager, ConnectionManagerConfig};
128use tokio::sync::oneshot;
129use tokio::time::sleep;
130use tokio::{select, spawn};
131
/// Factory for Redis-backed [`Lock`]s.
///
/// Built with [`Locker::from_redis_url`]; locks are taken with
/// [`Locker::acquire`] or [`Locker::acquire_with_options`].
#[derive(Clone)]
pub struct Locker {
    /// Client kept so that a lock's release closure can open its own
    /// connection (via `get_connection`) when the lock is released.
    client: redis::Client,
    /// Connection manager passed to `lock` when acquiring; a clone of it is
    /// handed to the background task that calls `extend`.
    conn_manager: ConnectionManager,
}
137
138impl Locker {
139 pub async fn from_redis_url(url: &str) -> Result<Self> {
140 let client = redis::Client::open(url)?;
141 let cfg = ConnectionManagerConfig::default().set_max_delay(2000);
142 let async_conn_manager = ConnectionManager::new_with_config(client.clone(), cfg).await?;
143 Ok(Self {
144 client,
145 conn_manager: async_conn_manager,
146 })
147 }
148
149 pub async fn acquire(&mut self, lock_key: &str) -> Result<Lock> {
150 self.acquire_with_options(&Options::default(), lock_key)
151 .await
152 }
153
154 pub async fn acquire_with_options(&mut self, opts: &Options, lock_key: &str) -> Result<Lock> {
155 let lock_id = lock(
156 &mut self.conn_manager,
157 lock_key,
158 opts.ttl,
159 opts.retry,
160 opts.timeout,
161 )
162 .await?;
163
164 println!(
165 "lock: {}",
166 SystemTime::now()
167 .duration_since(SystemTime::UNIX_EPOCH)
168 .unwrap()
169 .as_secs()
170 );
171
172 let mut conn = self.conn_manager.clone();
173 let opts = opts.clone();
174 let lock_key_c1 = lock_key.to_owned();
175 let lock_id_c1 = lock_id.clone();
176 let (stop_tx, mut stop_rx) = oneshot::channel();
177
178 spawn(async move {
179 loop {
180 select! {
181 _ = &mut stop_rx => break,
182 _ = sleep(opts.extend) => {
183 println!("extend: {}", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
184 if let Err(e) = extend(
185 &mut conn,
186 &lock_key_c1,
187 &lock_id_c1,
188 opts.ttl,
189 )
190 .await
191 {
192 if let Some(e) = e.downcast_ref::<Error>() && matches!(e, IdNotFound) {
193 println!("extend failed: {}", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
194 break;
195 }
196 } else {
197 println!("extend ok: {}", SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs());
198 }
199 },
200 }
201 }
202 println!(
203 "extend exit: {}",
204 SystemTime::now()
205 .duration_since(SystemTime::UNIX_EPOCH)
206 .unwrap()
207 .as_secs()
208 );
209 });
210
211 let cli = self.client.clone();
212 let lock_key_c2 = lock_key.to_owned();
213 let lock_id_c2 = lock_id.clone();
214
215 Ok(Lock {
216 release_fn: Some(Box::new(move || -> Result<()> {
217 let _ = stop_tx.send(());
218 let mut conn = cli.get_connection()?;
219 unlock_sync(&mut conn, &lock_key_c2, &lock_id_c2)
220 })),
221 })
222 }
223}
224
/// Handle for a held lock.
///
/// The lock is released either explicitly through [`Lock::release`] or when
/// the handle is dropped.
pub struct Lock {
    /// Closure that performs the release. It is taken out (leaving `None`)
    /// the first time the lock is released, so it runs at most once.
    pub release_fn: Option<Box<dyn FnOnce() -> Result<()> + Send + 'static>>,
}
228
229impl Lock {
230 pub fn release(mut self) -> Result<()> {
231 self.call_release()
232 }
233
234 fn call_release(&mut self) -> Result<()> {
235 match self.release_fn.take() {
236 Some(release_fn) => release_fn(),
237 None => Ok(()),
238 }
239 }
240}
241
242impl Drop for Lock {
243 fn drop(&mut self) {
244 let _ = self.call_release();
245 }
246}
247
// Integration tests: each one talks to a Redis server on 127.0.0.1:6379,
// database 0.
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    const REDIS_URL: &str = "redis://127.0.0.1:6379/0";

    /// Connects a fresh `Locker` to the test Redis server.
    async fn connect() -> Locker {
        Locker::from_redis_url(REDIS_URL)
            .await
            .expect("Should connect to the test Redis server")
    }

    /// Asserts that `attempt` failed with `Error::Timeout`, panicking with
    /// `context` otherwise.
    fn assert_timed_out(attempt: Result<Lock>, context: &str) {
        let Err(err) = attempt else {
            panic!("{context}");
        };
        // Comparing the `Option` keeps the failure readable even when the
        // error is not one of this crate's `Error` values.
        assert_eq!(
            err.downcast_ref::<Error>(),
            Some(&Error::Timeout),
            "{context}"
        );
    }

    #[tokio::test]
    async fn test_lock_exclusive() {
        let mut locker = connect().await;
        let lock_key = "test:test_lock_exclusive_key";

        let held = locker
            .acquire(lock_key)
            .await
            .expect("Should acquire a lock");

        assert_timed_out(
            locker.acquire(lock_key).await,
            "Should get a timed out error when acquiring another lock",
        );

        assert!(held.release().is_ok(), "Should release a lock");

        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should acquire a lock after another lock is released"
        );
    }

    #[tokio::test]
    async fn test_lock_drop() {
        let mut locker = connect().await;
        let lock_key = "test:test_lock_drop_key";

        {
            // Held until the end of this scope; dropping it releases the lock.
            let _held = locker
                .acquire(lock_key)
                .await
                .expect("Should acquire a lock in a scope");

            assert_timed_out(
                locker.acquire(lock_key).await,
                "Should get a timed out error when acquiring another lock in a scope",
            );
        }

        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should acquire a lock out of the prev scope"
        );
    }

    #[tokio::test]
    async fn test_lock_passive_release() {
        let mut locker = connect().await;
        let lock_key = "test:test_lock_passive_release_key";

        // The extend interval is longer than the ttl, so the key expires
        // before the first extension is attempted.
        let opts = Options::new()
            .ttl(Duration::from_secs(2))
            .extend(Duration::from_secs(3));
        let _held = locker
            .acquire_with_options(&opts, lock_key)
            .await
            .expect(
                "Should acquire a lock with customized lifetime and extend_interval, extend_interval greater than lifetime",
            );

        sleep(Duration::from_secs(3)).await;
        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should passively release a lock when the lifetime is reached"
        );
    }

    #[tokio::test]
    async fn test_lock_extend() {
        let mut locker = connect().await;
        let lock_key = "test:test_lock_extend_key";

        // The extend interval is shorter than the ttl, so the lock should
        // outlive its original ttl while the handle is held.
        let opts = Options::new()
            .ttl(Duration::from_secs(3))
            .extend(Duration::from_secs(1));
        let _held = locker
            .acquire_with_options(&opts, lock_key)
            .await
            .expect(
                "Should acquire a lock with customized lifetime and extend_interval, extend_interval smaller than lifetime",
            );

        sleep(Duration::from_secs(8)).await;
        assert_timed_out(
            locker.acquire(lock_key).await,
            "Should extend lock lifetime automatically",
        );
    }
}