// async_redis_lock/execs.rs

use crate::error::Error::{IdNotFound, Timeout};
use redis::Script;
use redis::aio::ConnectionLike as AsyncConnectionLike;
use redis::{ConnectionLike as SyncConnectionLike, ToRedisArgs};
use std::time::SystemTime;
use tokio::time::{Duration, Instant, sleep};
6use tokio::time::{Duration, sleep};
7
/// Integer reply that every Lua script in this module returns on success;
/// the scripts return `0` when their precondition check fails.
const OK_RET: i32 = 1;
9
/// Lua script that takes a lock over every key in `KEYS`, all or nothing.
///
/// * `ARGV[1]` - value written to each key (the lock id).
/// * `ARGV[2]` - expiry passed to `SET ... PX`.
///
/// Returns `0` without writing anything if any key already exists; otherwise
/// sets every key and returns `1`.
const LOCK: &str = r#"
for i = 1, #KEYS do
    if redis.call("EXISTS", KEYS[i]) > 0  then
        return 0
    end
end
for i = 1, #KEYS do
    redis.call("SET", KEYS[i], ARGV[1], "PX", ARGV[2])
end
return 1
"#;
21
/// Lua script that refreshes the expiry of every key in `KEYS`, all or nothing.
///
/// * `ARGV[1]` - lock id each key must currently hold.
/// * `ARGV[2]` - new expiry passed to `PEXPIRE`.
///
/// Returns `0` without touching any key if some key's value differs from
/// `ARGV[1]` (a missing key also differs); otherwise applies `PEXPIRE` to
/// every key and returns `1`.
const EXTEND: &str = r#"
for i = 1, #KEYS do
    if redis.call("GET", KEYS[i]) ~= ARGV[1] then
        return 0
    end
end
for i = 1, #KEYS do
    redis.call("PEXPIRE", KEYS[i], ARGV[2])
end
return 1
"#;
33
/// Lua script that deletes every key in `KEYS`, all or nothing.
///
/// * `ARGV[1]` - lock id each key must currently hold.
///
/// Returns `0` without deleting anything if some key's value differs from
/// `ARGV[1]` (a missing key also differs); otherwise deletes every key and
/// returns `1`.
const UNLOCK: &str = r#"
for i = 1, #KEYS do
    if redis.call("GET", KEYS[i]) ~= ARGV[1] then
        return 0
    end
end
for i = 1, #KEYS do
    redis.call("DEL", KEYS[i])
end
return 1
"#;
45
46pub async fn lock<T: AsyncConnectionLike, K: ToRedisArgs>(
47    conn: &mut T,
48    lock_keys: K,
49    duration: Duration,
50    retry_interval: Duration,
51    retry_timeout: Option<Duration>,
52) -> anyhow::Result<String> {
53    let lock_id = uuid::Uuid::new_v4().to_string();
54    let duration = duration.as_millis() as u64;
55    let deadline = retry_timeout.map(|timeout| SystemTime::now() + timeout);
56    let script = Script::new(LOCK);
57    let mut invoke = script.prepare_invoke();
58    lock_keys.to_redis_args().iter().for_each(|key| {
59        invoke.key(key);
60        ()
61    });
62    invoke.arg(&lock_id);
63    invoke.arg(&duration);
64    loop {
65        if let OK_RET = invoke.invoke_async(conn).await? {
66            break Ok(lock_id);
67        }
68        if deadline.is_some_and(|deadline| SystemTime::now() > deadline) {
69            break Err(Timeout.into());
70        }
71        sleep(retry_interval).await;
72    }
73}
74
75pub async fn extend<T: AsyncConnectionLike, K: ToRedisArgs>(
76    conn: &mut T,
77    lock_keys: K,
78    lock_id: &str,
79    duration: Duration,
80) -> anyhow::Result<()> {
81    let duration = duration.as_millis() as u64;
82    let script = Script::new(EXTEND);
83    let mut invoke = script.prepare_invoke();
84    lock_keys.to_redis_args().iter().for_each(|key| {
85        invoke.key(key);
86        ()
87    });
88    invoke.arg(lock_id);
89    invoke.arg(duration);
90    match invoke.invoke_async(conn).await? {
91        OK_RET => Ok(()),
92        _ => Err(IdNotFound.into()),
93    }
94}
95
96pub async fn unlock<T: AsyncConnectionLike, K: ToRedisArgs>(
97    conn: &mut T,
98    lock_keys: K,
99    lock_id: &str,
100) -> anyhow::Result<()> {
101    let script = Script::new(UNLOCK);
102    let mut invoke = script.prepare_invoke();
103    lock_keys.to_redis_args().iter().for_each(|key| {
104        invoke.key(key);
105        ()
106    });
107    invoke.arg(lock_id);
108    match invoke.invoke_async(conn).await? {
109        1 => Ok(()),
110        _ => Err(IdNotFound.into()),
111    }
112}
113
114pub fn unlock_sync<T: SyncConnectionLike, K: ToRedisArgs>(
115    conn: &mut T,
116    lock_keys: K,
117    lock_id: &str,
118) -> anyhow::Result<()> {
119    let script = Script::new(UNLOCK);
120    let mut invoke = script.prepare_invoke();
121    lock_keys.to_redis_args().iter().for_each(|key| {
122        invoke.key(key);
123        ()
124    });
125    invoke.arg(lock_id);
126    match invoke.invoke(conn)? {
127        1 => Ok(()),
128        _ => Err(IdNotFound.into()),
129    }
130}