redis_lock/
sync.rs

1use super::{DEFAULT_EXPIRATION, DEFAULT_SLEEP, DEFAULT_TIMEOUT};
2use redis::{Client, Connection, RedisResult};
3use std::error::Error;
4use std::time::Duration;
5use uuid::Uuid;
6
7/// A distributed mutual exclusion lock backed by Redis.
8///
9/// Supports exclusion based on multiple resources and partial overlaps.
10///
11/// E.g. a lock on resources `["a", "b"]` will block a lock on `["a"]` or `["b", "c"]`.
12pub struct MultiResourceLock {
13    /// The Redis connection.
14    conn: Connection,
15}
16
17impl std::fmt::Debug for MultiResourceLock {
18    #[inline]
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        f.debug_struct("MultiResourceLock")
21            .field("conn", &"..")
22            .finish()
23    }
24}
25
26/// Initializes a Redis instance with the Lua library functions required for locking.
27///
28/// This only needs to be done once per Redis instance, although re-doing it should be fine.
29///
30/// # Errors
31///
32/// - When [`Client::get_connection`] errors.
33/// - When the Lua library functions cannot be loaded into Redis.
34#[inline]
35pub fn setup(client: &Client) -> Result<(), Box<dyn Error>> {
36    // Connect to Redis
37    let mut con = client.get_connection()?;
38
39    // Define your Lua library
40    let lua_library = include_str!("functions.lua");
41
42    // Load the Lua library into Redis
43    let _result: String = redis::cmd("FUNCTION")
44        .arg("LOAD")
45        .arg("REPLACE")
46        .arg(lua_library)
47        .query(&mut con)?;
48
49    Ok(())
50}
51
52impl MultiResourceLock {
53    /// Create a new instance of the lock.
54    ///
55    /// # Errors
56    ///
57    /// When [`Client::get_connection`] errors.
58    #[inline]
59    pub fn new(client: &Client) -> RedisResult<Self> {
60        let conn = client.get_connection()?;
61        Ok(MultiResourceLock { conn })
62    }
63
64    /// Calls [`MultiResourceLock::acquire`] with [`DEFAULT_EXPIRATION`], [`DEFAULT_TIMEOUT`] and [`DEFAULT_SLEEP`].
65    ///
66    /// # Errors
67    ///
68    /// When [`MultiResourceLock::acquire`] errors.
69    #[inline]
70    pub fn acquire_default(&mut self, resources: &[String]) -> RedisResult<Option<String>> {
71        self.acquire(
72            resources,
73            DEFAULT_EXPIRATION,
74            DEFAULT_TIMEOUT,
75            DEFAULT_SLEEP,
76        )
77    }
78
79    /// Attempts to acquire the lock blocking until the lock can be acquired.
80    ///
81    /// Blocks up to `timeout` duration making attempts every `sleep` duration.
82    ///
83    /// Returns `None` when it times out.
84    ///
85    /// # Errors
86    ///
87    /// When [`MultiResourceLock::try_acquire`] errors.
88    #[inline]
89    pub fn acquire(
90        &mut self,
91        resources: &[String],
92        expiration: Duration,
93        timeout: Duration,
94        sleep: Duration,
95    ) -> RedisResult<Option<String>> {
96        let now = std::time::Instant::now();
97        loop {
98            if now.elapsed() > timeout {
99                return Ok(None);
100            }
101            match self.try_acquire(resources, expiration)? {
102                Some(res) => break Ok(Some(res)),
103                None => std::thread::sleep(sleep),
104            }
105        }
106    }
107
108    /// Calls [`MultiResourceLock::try_acquire`] with [`DEFAULT_EXPIRATION`].
109    ///
110    /// # Errors
111    ///
112    /// When [`MultiResourceLock::try_acquire`] errors.
113    #[inline]
114    pub fn try_acquire_default(&mut self, resources: &[String]) -> RedisResult<Option<String>> {
115        self.try_acquire(resources, DEFAULT_EXPIRATION)
116    }
117
118    /// Attempts to acquire the lock returning immediately if it cannot be immediately acquired.
119    ///
120    /// # Errors
121    ///
122    /// - When the `acquire_lock` function is missing from the Redis instance.
123    #[inline]
124    pub fn try_acquire(
125        &mut self,
126        resources: &[String],
127        expiration: Duration,
128    ) -> RedisResult<Option<String>> {
129        let lock_id = Uuid::new_v4().to_string();
130        let mut args = vec![lock_id.clone(), expiration.as_millis().to_string()];
131        args.extend(resources.iter().cloned());
132
133        let result: Option<String> = redis::cmd("FCALL")
134            .arg("acquire_lock")
135            .arg(0i32)
136            .arg(&args)
137            .query(&mut self.conn)?;
138
139        Ok(result)
140    }
141
142    /// Releases a held lock.
143    ///
144    /// # Errors
145    ///
146    /// - When the `release_lock` function is missing from the Redis instance.
147    /// - When `lock_id` does not refer to a held lock.
148    #[inline]
149    pub fn release(&mut self, lock_id: &str) -> RedisResult<usize> {
150        let result: usize = redis::cmd("FCALL")
151            .arg("release_lock")
152            .arg(0i32)
153            .arg(lock_id)
154            .query(&mut self.conn)?;
155
156        Ok(result)
157    }
158
159    /// Calls [`MultiResourceLock::try_lock`] with [`DEFAULT_EXPIRATION`].
160    ///
161    /// # Errors
162    ///
163    /// When [`MultiResourceLock::try_lock`] errors.
164    #[inline]
165    pub fn try_lock_default(
166        &mut self,
167        resources: &[String],
168    ) -> RedisResult<Option<MultiResourceGuard>> {
169        self.try_lock(resources, DEFAULT_EXPIRATION)
170    }
171
172    /// Attempts to acquire the lock returning immediately if it cannot be immediately acquired.
173    ///
174    /// Wraps the result in a guard that releases the lock when dropped.
175    ///
176    /// # Errors
177    ///
178    /// When [`MultiResourceLock::try_acquire`] errors.
179    #[inline]
180    pub fn try_lock(
181        &mut self,
182        resources: &[String],
183        expiration: Duration,
184    ) -> RedisResult<Option<MultiResourceGuard<'_>>> {
185        self.try_acquire(resources, expiration).map(|result| {
186            result.map(|lock_id| MultiResourceGuard {
187                lock: self,
188                lock_id,
189            })
190        })
191    }
192
193    /// Calls [`MultiResourceLock::lock`] with [`DEFAULT_EXPIRATION`], [`DEFAULT_TIMEOUT`] and [`DEFAULT_SLEEP`].
194    ///
195    /// # Errors
196    ///
197    /// When [`MultiResourceLock::lock`] errors.
198    #[inline]
199    pub fn lock_default(
200        &mut self,
201        resources: &[String],
202    ) -> RedisResult<Option<MultiResourceGuard<'_>>> {
203        self.lock(
204            resources,
205            DEFAULT_EXPIRATION,
206            DEFAULT_TIMEOUT,
207            DEFAULT_SLEEP,
208        )
209    }
210
211    /// Attempts to acquire the lock blocking until the lock can be acquired.
212    ///
213    /// Blocks up to `timeout` duration making attempts every `sleep` duration.
214    ///
215    /// Returns `None` when it times out.
216    ///
217    /// Wraps the result in a guard that releases the lock when dropped.
218    ///
219    /// # Errors
220    ///
221    /// When [`MultiResourceLock::acquire`] errors.
222    #[inline]
223    pub fn lock(
224        &mut self,
225        resources: &[String],
226        expiration: Duration,
227        timeout: Duration,
228        sleep: Duration,
229    ) -> RedisResult<Option<MultiResourceGuard<'_>>> {
230        self.acquire(resources, expiration, timeout, sleep)
231            .map(|result| {
232                result.map(|lock_id| MultiResourceGuard {
233                    lock: self,
234                    lock_id,
235                })
236            })
237    }
238}
239
240/// A guard that releases the lock when it is dropped.
241#[derive(Debug)]
242pub struct MultiResourceGuard<'a> {
243    /// The lock instance.
244    lock: &'a mut MultiResourceLock,
245    /// The lock identifier.
246    lock_id: String,
247}
248
249#[expect(
250    clippy::unwrap_used,
251    reason = "You can't propagate errors in a `Drop` implementation."
252)]
253impl Drop for MultiResourceGuard<'_> {
254    #[inline]
255    fn drop(&mut self) {
256        self.lock.release(&self.lock_id).unwrap();
257    }
258}