// qm_redis/lock.rs

1use redis::AsyncCommands;
2use redis::RedisError;
3use redis::Value as RedisValue;
4use tokio::time::{sleep, Duration};
5use uuid::Uuid;
6
7pub mod error {
8    #[derive(thiserror::Error, Clone, Debug, PartialEq)]
9    pub enum CanNotGetLockReason {
10        #[error("lock is busy")]
11        LockIsBussy,
12        #[error("lock is still busy with count: {retry_count} and delay {retry_delay}")]
13        LockIsStillBusy { retry_count: u32, retry_delay: u32 },
14    }
15}
16
17#[derive(thiserror::Error, Clone, Debug, PartialEq)]
18pub enum Error {
19    #[error("pool error {0}")]
20    PoolError(String),
21    #[error("redis error {0}")]
22    RedisError(String),
23    #[error("{0}")]
24    CanNotGetLock(error::CanNotGetLockReason),
25}
26
27impl From<RedisError> for Error {
28    fn from(value: RedisError) -> Self {
29        Self::RedisError(value.to_string())
30    }
31}
32
33impl From<deadpool_redis::PoolError> for Error {
34    fn from(value: deadpool_redis::PoolError) -> Self {
35        Self::PoolError(value.to_string())
36    }
37}
38
39const LOCK_SCRIPT: &str = "return redis.call('set', ARGV[1], ARGV[2], 'px', ARGV[3], 'nx')";
40const UNLOCK_SCRIPT: &str = r#"
41  if redis.call("get", KEYS[1]) == ARGV[1] then
42    return redis.call("del", KEYS[1])
43  else
44    return 0
45  end
46"#;
47
48#[derive(Debug)]
49pub struct Lock {
50    pub id: String,
51}
52
53pub async fn try_lock<C: AsyncCommands, T: AsRef<str>>(
54    db: &mut C,
55    key: T,
56    ttl: usize,
57) -> Result<Lock, Error> {
58    let id = Uuid::new_v4().to_string();
59    let result = redis::Script::new(LOCK_SCRIPT)
60        .arg(key.as_ref())
61        .arg(&id)
62        .arg(ttl)
63        .invoke_async(db)
64        .await?;
65
66    match result {
67        RedisValue::Okay => Ok(Lock { id }),
68        _ => Err(Error::CanNotGetLock(
69            error::CanNotGetLockReason::LockIsBussy,
70        )),
71    }
72}
73
74pub async fn lock<C: AsyncCommands, T>(
75    db: &mut C,
76    key: T,
77    ttl: usize,
78    retry_count: u32,
79    retry_delay: u32,
80) -> Result<Lock, Error>
81where
82    T: AsRef<str>,
83{
84    for _ in 0..retry_count {
85        let lock_result = try_lock(db, key.as_ref(), ttl).await;
86        match lock_result {
87            Ok(lock) => return Ok(lock),
88            Err(Error::RedisError(error)) => return Err(Error::RedisError(error)),
89            Err(Error::PoolError(error)) => return Err(Error::PoolError(error)),
90            Err(Error::CanNotGetLock(_)) => {
91                sleep(Duration::from_millis(u64::from(retry_delay))).await;
92                continue;
93            }
94        };
95    }
96
97    Err(Error::CanNotGetLock(
98        error::CanNotGetLockReason::LockIsStillBusy {
99            retry_count,
100            retry_delay,
101        },
102    ))
103}
104
105pub async fn unlock<C: AsyncCommands, K, V>(db: &mut C, key: K, lock_id: V) -> Result<i64, Error>
106where
107    K: AsRef<str>,
108    V: AsRef<str>,
109{
110    let result: RedisValue = redis::Script::new(UNLOCK_SCRIPT)
111        .key(key.as_ref())
112        .arg(lock_id.as_ref())
113        .invoke_async(db)
114        .await?;
115
116    match result {
117        RedisValue::Int(remove_count) => Ok(remove_count),
118        _ => Ok(0),
119    }
120}