1use redis::AsyncCommands;
2use redis::RedisError;
3use redis::SetOptions;
4use redis::Value as RedisValue;
5use tokio::time::{sleep, Duration};
6use uuid::Uuid;
7
/// Detailed reasons reported when a lock cannot be obtained.
pub mod error {
    /// Why an acquisition did not produce a lock.
    #[derive(thiserror::Error, Clone, Debug, PartialEq)]
    pub enum CanNotGetLockReason {
        /// A single acquisition attempt found the key already set.
        // NOTE: the variant name is misspelled ("Bussy"); it is public API, so it is kept as is.
        #[error("lock is busy")]
        LockIsBussy,
        /// Every attempt made by `lock` failed; carries the `retry_count` and `retry_delay`
        /// arguments that `lock` was called with.
        #[error("lock is still busy with count: {retry_count} and delay {retry_delay}")]
        LockIsStillBusy { retry_count: u32, retry_delay: u32 },
    }
}
17
/// Errors returned by the locking functions in this file.
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
pub enum Error {
    /// A `deadpool_redis::PoolError`, kept as its display text.
    #[error("pool error {0}")]
    PoolError(String),
    /// A `redis::RedisError`, kept as its display text.
    #[error("redis error {0}")]
    RedisError(String),
    /// The lock could not be obtained; the payload says why.
    #[error("{0}")]
    CanNotGetLock(error::CanNotGetLockReason),
}
27
28impl From<RedisError> for Error {
29 fn from(value: RedisError) -> Self {
30 Self::RedisError(value.to_string())
31 }
32}
33
34impl From<deadpool_redis::PoolError> for Error {
35 fn from(value: deadpool_redis::PoolError) -> Self {
36 Self::PoolError(value.to_string())
37 }
38}
39
/// Lua script executed by `unlock`.
///
/// Deletes `KEYS[1]` only when its current value equals `ARGV[1]` and returns the result of
/// `del`; otherwise it leaves the key untouched and returns 0.
const UNLOCK_SCRIPT: &str = r#"
 if redis.call("get", KEYS[1]) == ARGV[1] then
 return redis.call("del", KEYS[1])
 else
 return 0
 end
"#;
47
/// Handle returned when a lock has been acquired.
#[derive(Debug)]
pub struct Lock {
    /// Random UUID string that `try_lock` stored as the value of the lock key.
    /// `unlock` compares its `lock_id` argument against that stored value before deleting.
    pub id: String,
}
52
53pub async fn try_lock<C: AsyncCommands, T: AsRef<str>>(
54 db: &mut C,
55 key: T,
56 ttl: usize,
57) -> Result<Lock, Error> {
58 let id = Uuid::new_v4().to_string();
59 let options = SetOptions::default()
60 .with_expiration(redis::SetExpiry::PX(ttl as u64))
61 .conditional_set(redis::ExistenceCheck::NX);
62 let result = db.set_options(key.as_ref(), &id, options).await?;
63 match result {
71 RedisValue::Okay => Ok(Lock { id }),
72 _ => Err(Error::CanNotGetLock(
73 error::CanNotGetLockReason::LockIsBussy,
74 )),
75 }
76}
77
78pub async fn lock<C: AsyncCommands, T>(
79 db: &mut C,
80 key: T,
81 ttl: usize,
82 retry_count: u32,
83 retry_delay: u32,
84) -> Result<Lock, Error>
85where
86 T: AsRef<str>,
87{
88 for _ in 0..retry_count {
89 let lock_result = try_lock(db, key.as_ref(), ttl).await;
90 match lock_result {
91 Ok(lock) => return Ok(lock),
92 Err(Error::RedisError(error)) => return Err(Error::RedisError(error)),
93 Err(Error::PoolError(error)) => return Err(Error::PoolError(error)),
94 Err(Error::CanNotGetLock(_)) => {
95 sleep(Duration::from_millis(u64::from(retry_delay))).await;
96 continue;
97 }
98 };
99 }
100
101 Err(Error::CanNotGetLock(
102 error::CanNotGetLockReason::LockIsStillBusy {
103 retry_count,
104 retry_delay,
105 },
106 ))
107}
108
109pub async fn unlock<C: AsyncCommands, K, V>(db: &mut C, key: K, lock_id: V) -> Result<i64, Error>
110where
111 K: AsRef<str>,
112 V: AsRef<str>,
113{
114 let result: RedisValue = redis::Script::new(UNLOCK_SCRIPT)
115 .key(key.as_ref())
116 .arg(lock_id.as_ref())
117 .invoke_async(db)
118 .await?;
119
120 match result {
121 RedisValue::Int(remove_count) => Ok(remove_count),
122 _ => Ok(0),
123 }
124}