async_redis_lock/
execs.rs

1use crate::error::Error::{IdNotFound, Timeout};
2use redis::ConnectionLike as SyncConnectionLike;
3use redis::Script;
4use redis::aio::ConnectionLike as AsyncConnectionLike;
5use std::time::SystemTime;
6use tokio::time::{Duration, sleep};
7
/// Script reply that `lock` and `extend` treat as success.
const OK_RET: i32 = 1;
9
/// Lua script that takes the lock.
///
/// Returns 0 without writing anything when `KEYS[1]` already exists. Otherwise it
/// sets `KEYS[1]` to `ARGV[1]` with a `PX` expiry of `ARGV[2]` and returns 1.
const LOCK: &str = r#"if redis.call("EXISTS", KEYS[1]) > 0  then
    return 0
end
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return 1
"#;
16
/// Lua script that refreshes the lock's expiry.
///
/// Returns 0 when the value stored at `KEYS[1]` differs from `ARGV[1]`. Otherwise
/// it returns the reply of `PEXPIRE KEYS[1] ARGV[2]`.
const EXTEND: &str = r#"
if redis.call("GET", KEYS[1]) ~= ARGV[1] then
    return 0
end
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
"#;
23
/// Lua script that releases the lock.
///
/// Returns 0 when the value stored at `KEYS[1]` differs from `ARGV[1]`. Otherwise
/// it returns the reply of `DEL KEYS[1]`.
const UNLOCK: &str = r#"
if redis.call("GET", KEYS[1]) ~= ARGV[1] then
    return 0
end
return redis.call("DEL", KEYS[1])
"#;
30
31pub async fn lock<T: AsyncConnectionLike>(
32    conn: &mut T,
33    lock_key: &str,
34    duration: Duration,
35    retry_interval: Duration,
36    retry_timeout: Option<Duration>,
37) -> anyhow::Result<String> {
38    let lock_id = uuid::Uuid::new_v4().to_string();
39    let script = Script::new(LOCK);
40    let duration = duration.as_millis() as u64;
41    let deadline = retry_timeout.map(|timeout| SystemTime::now() + timeout);
42
43    loop {
44        if let OK_RET = script
45            .key(lock_key)
46            .arg(&lock_id)
47            .arg(duration)
48            .invoke_async(conn)
49            .await?
50        {
51            break Ok(lock_id);
52        }
53
54        if let Some(deadline) = deadline
55            && SystemTime::now() >= deadline
56        {
57            break Err(Timeout.into());
58        }
59
60        sleep(retry_interval).await;
61    }
62}
63
64pub async fn extend<T: AsyncConnectionLike>(
65    conn: &mut T,
66    lock_key: &str,
67    lock_id: &str,
68    duration: Duration,
69) -> anyhow::Result<()> {
70    let script = Script::new(EXTEND);
71    let duration = duration.as_millis() as u64;
72    match script
73        .key(lock_key)
74        .arg(lock_id)
75        .arg(duration)
76        .invoke_async(conn)
77        .await?
78    {
79        OK_RET => Ok(()),
80        _ => Err(IdNotFound.into()),
81    }
82}
83
84pub async fn unlock<T: AsyncConnectionLike>(
85    conn: &mut T,
86    lock_key: &str,
87    lock_id: &str,
88) -> anyhow::Result<()> {
89    let script = Script::new(EXTEND);
90    match script.key(lock_key).arg(lock_id).invoke_async(conn).await? {
91        1 => Ok(()),
92        _ => Err(IdNotFound.into()),
93    }
94}
95
96pub fn unlock_sync<T: SyncConnectionLike>(
97    conn: &mut T,
98    lock_key: &str,
99    lock_id: &str,
100) -> anyhow::Result<()> {
101    let script = Script::new(UNLOCK);
102    match script.key(lock_key).arg(lock_id).invoke(conn)? {
103        1 => Ok(()),
104        _ => Err(IdNotFound.into()),
105    }
106}