async_redis_lock/
execs.rs1use crate::error::Error::{IdNotFound, Timeout};
2use redis::ConnectionLike as SyncConnectionLike;
3use redis::Script;
4use redis::aio::ConnectionLike as AsyncConnectionLike;
5use std::time::SystemTime;
6use tokio::time::{Duration, sleep};
7
8const OK_RET: i32 = 1;
9
10const LOCK: &str = r#"if redis.call("EXISTS", KEYS[1]) > 0 then
11 return 0
12end
13redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
14return 1
15"#;
16
17const EXTEND: &str = r#"
18if redis.call("GET", KEYS[1]) ~= ARGV[1] then
19 return 0
20end
21return redis.call("PEXPIRE", KEYS[1], ARGV[2])
22"#;
23
24const UNLOCK: &str = r#"
25if redis.call("GET", KEYS[1]) ~= ARGV[1] then
26 return 0
27end
28return redis.call("DEL", KEYS[1])
29"#;
30
31pub async fn lock<T: AsyncConnectionLike>(
32 conn: &mut T,
33 lock_key: &str,
34 duration: Duration,
35 retry_interval: Duration,
36 retry_timeout: Option<Duration>,
37) -> anyhow::Result<String> {
38 let lock_id = uuid::Uuid::new_v4().to_string();
39 let script = Script::new(LOCK);
40 let duration = duration.as_millis() as u64;
41 let deadline = retry_timeout.map(|timeout| SystemTime::now() + timeout);
42
43 loop {
44 if let OK_RET = script
45 .key(lock_key)
46 .arg(&lock_id)
47 .arg(duration)
48 .invoke_async(conn)
49 .await?
50 {
51 break Ok(lock_id);
52 }
53
54 if let Some(deadline) = deadline
55 && SystemTime::now() >= deadline
56 {
57 break Err(Timeout.into());
58 }
59
60 sleep(retry_interval).await;
61 }
62}
63
64pub async fn extend<T: AsyncConnectionLike>(
65 conn: &mut T,
66 lock_key: &str,
67 lock_id: &str,
68 duration: Duration,
69) -> anyhow::Result<()> {
70 let script = Script::new(EXTEND);
71 let duration = duration.as_millis() as u64;
72 match script
73 .key(lock_key)
74 .arg(lock_id)
75 .arg(duration)
76 .invoke_async(conn)
77 .await?
78 {
79 OK_RET => Ok(()),
80 _ => Err(IdNotFound.into()),
81 }
82}
83
84pub async fn unlock<T: AsyncConnectionLike>(
85 conn: &mut T,
86 lock_key: &str,
87 lock_id: &str,
88) -> anyhow::Result<()> {
89 let script = Script::new(EXTEND);
90 match script.key(lock_key).arg(lock_id).invoke_async(conn).await? {
91 1 => Ok(()),
92 _ => Err(IdNotFound.into()),
93 }
94}
95
96pub fn unlock_sync<T: SyncConnectionLike>(
97 conn: &mut T,
98 lock_key: &str,
99 lock_id: &str,
100) -> anyhow::Result<()> {
101 let script = Script::new(UNLOCK);
102 match script.key(lock_key).arg(lock_id).invoke(conn)? {
103 1 => Ok(()),
104 _ => Err(IdNotFound.into()),
105 }
106}