async_redis_lock/
execs.rs1use crate::error::Error::{IdNotFound, Timeout};
2use redis::Script;
3use redis::aio::ConnectionLike as AsyncConnectionLike;
4use redis::{ConnectionLike as SyncConnectionLike, ToRedisArgs};
5use std::time::SystemTime;
6use tokio::time::{Duration, sleep};
7
8const OK_RET: i32 = 1;
9
10const LOCK: &str = r#"
11for i = 1, #KEYS do
12 if redis.call("EXISTS", KEYS[i]) > 0 then
13 return 0
14 end
15end
16for i = 1, #KEYS do
17 redis.call("SET", KEYS[i], ARGV[1], "PX", ARGV[2])
18end
19return 1
20"#;
21
22const EXTEND: &str = r#"
23for i = 1, #KEYS do
24 if redis.call("GET", KEYS[i]) ~= ARGV[1] then
25 return 0
26 end
27end
28for i = 1, #KEYS do
29 redis.call("PEXPIRE", KEYS[i], ARGV[2])
30end
31return 1
32"#;
33
34const UNLOCK: &str = r#"
35for i = 1, #KEYS do
36 if redis.call("GET", KEYS[i]) ~= ARGV[1] then
37 return 0
38 end
39end
40for i = 1, #KEYS do
41 redis.call("DEL", KEYS[i])
42end
43return 1
44"#;
45
46pub async fn lock<T: AsyncConnectionLike, K: ToRedisArgs>(
47 conn: &mut T,
48 lock_keys: K,
49 duration: Duration,
50 retry_interval: Duration,
51 retry_timeout: Option<Duration>,
52) -> anyhow::Result<String> {
53 let lock_id = uuid::Uuid::new_v4().to_string();
54 let duration = duration.as_millis() as u64;
55 let deadline = retry_timeout.map(|timeout| SystemTime::now() + timeout);
56 let script = Script::new(LOCK);
57 let mut invoke = script.prepare_invoke();
58 lock_keys.to_redis_args().iter().for_each(|key| {
59 invoke.key(key);
60 ()
61 });
62 invoke.arg(&lock_id);
63 invoke.arg(&duration);
64 loop {
65 if let OK_RET = invoke.invoke_async(conn).await? {
66 break Ok(lock_id);
67 }
68 if deadline.is_some_and(|deadline| SystemTime::now() > deadline) {
69 break Err(Timeout.into());
70 }
71 sleep(retry_interval).await;
72 }
73}
74
75pub async fn extend<T: AsyncConnectionLike, K: ToRedisArgs>(
76 conn: &mut T,
77 lock_keys: K,
78 lock_id: &str,
79 duration: Duration,
80) -> anyhow::Result<()> {
81 let duration = duration.as_millis() as u64;
82 let script = Script::new(EXTEND);
83 let mut invoke = script.prepare_invoke();
84 lock_keys.to_redis_args().iter().for_each(|key| {
85 invoke.key(key);
86 ()
87 });
88 invoke.arg(lock_id);
89 invoke.arg(duration);
90 match invoke.invoke_async(conn).await? {
91 OK_RET => Ok(()),
92 _ => Err(IdNotFound.into()),
93 }
94}
95
96pub async fn unlock<T: AsyncConnectionLike, K: ToRedisArgs>(
97 conn: &mut T,
98 lock_keys: K,
99 lock_id: &str,
100) -> anyhow::Result<()> {
101 let script = Script::new(UNLOCK);
102 let mut invoke = script.prepare_invoke();
103 lock_keys.to_redis_args().iter().for_each(|key| {
104 invoke.key(key);
105 ()
106 });
107 invoke.arg(lock_id);
108 match invoke.invoke_async(conn).await? {
109 1 => Ok(()),
110 _ => Err(IdNotFound.into()),
111 }
112}
113
114pub fn unlock_sync<T: SyncConnectionLike, K: ToRedisArgs>(
115 conn: &mut T,
116 lock_keys: K,
117 lock_id: &str,
118) -> anyhow::Result<()> {
119 let script = Script::new(UNLOCK);
120 let mut invoke = script.prepare_invoke();
121 lock_keys.to_redis_args().iter().for_each(|key| {
122 invoke.key(key);
123 ()
124 });
125 invoke.arg(lock_id);
126 match invoke.invoke(conn)? {
127 1 => Ok(()),
128 _ => Err(IdNotFound.into()),
129 }
130}