qm_redis/
lock.rs

1use redis::AsyncCommands;
2use redis::RedisError;
3use redis::SetOptions;
4use redis::Value as RedisValue;
5use tokio::time::{sleep, Duration};
6use uuid::Uuid;
7
8pub mod error {
9    #[derive(thiserror::Error, Clone, Debug, PartialEq)]
10    pub enum CanNotGetLockReason {
11        #[error("lock is busy")]
12        LockIsBussy,
13        #[error("lock is still busy with count: {retry_count} and delay {retry_delay}")]
14        LockIsStillBusy { retry_count: u32, retry_delay: u32 },
15    }
16}
17
18#[derive(thiserror::Error, Clone, Debug, PartialEq)]
19pub enum Error {
20    #[error("pool error {0}")]
21    PoolError(String),
22    #[error("redis error {0}")]
23    RedisError(String),
24    #[error("{0}")]
25    CanNotGetLock(error::CanNotGetLockReason),
26}
27
28impl From<RedisError> for Error {
29    fn from(value: RedisError) -> Self {
30        Self::RedisError(value.to_string())
31    }
32}
33
34impl From<deadpool_redis::PoolError> for Error {
35    fn from(value: deadpool_redis::PoolError) -> Self {
36        Self::PoolError(value.to_string())
37    }
38}
39
/// Lua script that deletes `KEYS[1]` only when its current value equals
/// `ARGV[1]`.
///
/// Returns the result of `del` when the values match and `0` otherwise.
const UNLOCK_SCRIPT: &str = r#"
  if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
  else
    return 0
  end
"#;
47
/// Handle describing a lock obtained through `try_lock` or `lock`.
#[derive(Debug)]
pub struct Lock {
    /// The value written under the lock key when the lock was taken.
    ///
    /// `unlock` compares this against the stored value before deleting the key.
    pub id: String,
}
52
53pub async fn try_lock<C: AsyncCommands, T: AsRef<str>>(
54    db: &mut C,
55    key: T,
56    ttl: usize,
57) -> Result<Lock, Error> {
58    let id = Uuid::new_v4().to_string();
59    let options = SetOptions::default()
60        .with_expiration(redis::SetExpiry::PX(ttl as u64))
61        .conditional_set(redis::ExistenceCheck::NX);
62    let result = db.set_options(key.as_ref(), &id, options).await?;
63    // let result = redis::Script::new(LOCK_SCRIPT)
64    //     .arg(key.as_ref())
65    //     .arg(&id)
66    //     .arg(ttl)
67    //     .invoke_async(db)
68    //     .await?;
69
70    match result {
71        RedisValue::Okay => Ok(Lock { id }),
72        _ => Err(Error::CanNotGetLock(
73            error::CanNotGetLockReason::LockIsBussy,
74        )),
75    }
76}
77
78pub async fn lock<C: AsyncCommands, T>(
79    db: &mut C,
80    key: T,
81    ttl: usize,
82    retry_count: u32,
83    retry_delay: u32,
84) -> Result<Lock, Error>
85where
86    T: AsRef<str>,
87{
88    for _ in 0..retry_count {
89        let lock_result = try_lock(db, key.as_ref(), ttl).await;
90        match lock_result {
91            Ok(lock) => return Ok(lock),
92            Err(Error::RedisError(error)) => return Err(Error::RedisError(error)),
93            Err(Error::PoolError(error)) => return Err(Error::PoolError(error)),
94            Err(Error::CanNotGetLock(_)) => {
95                sleep(Duration::from_millis(u64::from(retry_delay))).await;
96                continue;
97            }
98        };
99    }
100
101    Err(Error::CanNotGetLock(
102        error::CanNotGetLockReason::LockIsStillBusy {
103            retry_count,
104            retry_delay,
105        },
106    ))
107}
108
109pub async fn unlock<C: AsyncCommands, K, V>(db: &mut C, key: K, lock_id: V) -> Result<i64, Error>
110where
111    K: AsRef<str>,
112    V: AsRef<str>,
113{
114    let result: RedisValue = redis::Script::new(UNLOCK_SCRIPT)
115        .key(key.as_ref())
116        .arg(lock_id.as_ref())
117        .invoke_async(db)
118        .await?;
119
120    match result {
121        RedisValue::Int(remove_count) => Ok(remove_count),
122        _ => Ok(0),
123    }
124}