redis_lock/sync.rs
1use super::{DEFAULT_EXPIRATION, DEFAULT_SLEEP, DEFAULT_TIMEOUT};
2use redis::{Client, Connection, RedisResult};
3use std::error::Error;
4use std::time::Duration;
5use uuid::Uuid;
6
7/// A distributed mutual exclusion lock backed by Redis.
8///
9/// Supports exclusion based on multiple resources and partial overlaps.
10///
11/// E.g. a lock on resources `["a", "b"]` will block a lock on `["a"]` or `["b", "c"]`.
12pub struct MultiResourceLock {
13 /// The Redis connection.
14 conn: Connection,
15}
16
17impl std::fmt::Debug for MultiResourceLock {
18 #[inline]
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 f.debug_struct("MultiResourceLock")
21 .field("conn", &"..")
22 .finish()
23 }
24}
25
26/// Initializes a Redis instance with the Lua library functions required for locking.
27///
28/// This only needs to be done once per Redis instance, although re-doing it should be fine.
29///
30/// # Errors
31///
32/// - When [`Client::get_connection`] errors.
33/// - When the Lua library functions cannot be loaded into Redis.
34#[inline]
35pub fn setup(client: &Client) -> Result<(), Box<dyn Error>> {
36 // Connect to Redis
37 let mut con = client.get_connection()?;
38
39 // Define your Lua library
40 let lua_library = include_str!("functions.lua");
41
42 // Load the Lua library into Redis
43 let _result: String = redis::cmd("FUNCTION")
44 .arg("LOAD")
45 .arg("REPLACE")
46 .arg(lua_library)
47 .query(&mut con)?;
48
49 Ok(())
50}
51
52impl MultiResourceLock {
53 /// Create a new instance of the lock.
54 ///
55 /// # Errors
56 ///
57 /// When [`Client::get_connection`] errors.
58 #[inline]
59 pub fn new(client: &Client) -> RedisResult<Self> {
60 let conn = client.get_connection()?;
61 Ok(MultiResourceLock { conn })
62 }
63
64 /// Calls [`MultiResourceLock::acquire`] with [`DEFAULT_EXPIRATION`], [`DEFAULT_TIMEOUT`] and [`DEFAULT_SLEEP`].
65 ///
66 /// # Errors
67 ///
68 /// When [`MultiResourceLock::acquire`] errors.
69 #[inline]
70 pub fn acquire_default(&mut self, resources: &[String]) -> RedisResult<Option<String>> {
71 self.acquire(
72 resources,
73 DEFAULT_EXPIRATION,
74 DEFAULT_TIMEOUT,
75 DEFAULT_SLEEP,
76 )
77 }
78
79 /// Attempts to acquire the lock blocking until the lock can be acquired.
80 ///
81 /// Blocks up to `timeout` duration making attempts every `sleep` duration.
82 ///
83 /// Returns `None` when it times out.
84 ///
85 /// # Errors
86 ///
87 /// When [`MultiResourceLock::try_acquire`] errors.
88 #[inline]
89 pub fn acquire(
90 &mut self,
91 resources: &[String],
92 expiration: Duration,
93 timeout: Duration,
94 sleep: Duration,
95 ) -> RedisResult<Option<String>> {
96 let now = std::time::Instant::now();
97 loop {
98 if now.elapsed() > timeout {
99 return Ok(None);
100 }
101 match self.try_acquire(resources, expiration)? {
102 Some(res) => break Ok(Some(res)),
103 None => std::thread::sleep(sleep),
104 }
105 }
106 }
107
108 /// Calls [`MultiResourceLock::try_acquire`] with [`DEFAULT_EXPIRATION`].
109 ///
110 /// # Errors
111 ///
112 /// When [`MultiResourceLock::try_acquire`] errors.
113 #[inline]
114 pub fn try_acquire_default(&mut self, resources: &[String]) -> RedisResult<Option<String>> {
115 self.try_acquire(resources, DEFAULT_EXPIRATION)
116 }
117
118 /// Attempts to acquire the lock returning immediately if it cannot be immediately acquired.
119 ///
120 /// # Errors
121 ///
122 /// - When the `acquire_lock` function is missing from the Redis instance.
123 #[inline]
124 pub fn try_acquire(
125 &mut self,
126 resources: &[String],
127 expiration: Duration,
128 ) -> RedisResult<Option<String>> {
129 let lock_id = Uuid::new_v4().to_string();
130 let mut args = vec![lock_id.clone(), expiration.as_millis().to_string()];
131 args.extend(resources.iter().cloned());
132
133 let result: Option<String> = redis::cmd("FCALL")
134 .arg("acquire_lock")
135 .arg(0i32)
136 .arg(&args)
137 .query(&mut self.conn)?;
138
139 Ok(result)
140 }
141
142 /// Releases a held lock.
143 ///
144 /// # Errors
145 ///
146 /// - When the `release_lock` function is missing from the Redis instance.
147 /// - When `lock_id` does not refer to a held lock.
148 #[inline]
149 pub fn release(&mut self, lock_id: &str) -> RedisResult<usize> {
150 let result: usize = redis::cmd("FCALL")
151 .arg("release_lock")
152 .arg(0i32)
153 .arg(lock_id)
154 .query(&mut self.conn)?;
155
156 Ok(result)
157 }
158
159 /// Calls [`MultiResourceLock::try_lock`] with [`DEFAULT_EXPIRATION`].
160 ///
161 /// # Errors
162 ///
163 /// When [`MultiResourceLock::try_lock`] errors.
164 #[inline]
165 pub fn try_lock_default(
166 &mut self,
167 resources: &[String],
168 ) -> RedisResult<Option<MultiResourceGuard>> {
169 self.try_lock(resources, DEFAULT_EXPIRATION)
170 }
171
172 /// Attempts to acquire the lock returning immediately if it cannot be immediately acquired.
173 ///
174 /// Wraps the result in a guard that releases the lock when dropped.
175 ///
176 /// # Errors
177 ///
178 /// When [`MultiResourceLock::try_acquire`] errors.
179 #[inline]
180 pub fn try_lock(
181 &mut self,
182 resources: &[String],
183 expiration: Duration,
184 ) -> RedisResult<Option<MultiResourceGuard<'_>>> {
185 self.try_acquire(resources, expiration).map(|result| {
186 result.map(|lock_id| MultiResourceGuard {
187 lock: self,
188 lock_id,
189 })
190 })
191 }
192
193 /// Calls [`MultiResourceLock::lock`] with [`DEFAULT_EXPIRATION`], [`DEFAULT_TIMEOUT`] and [`DEFAULT_SLEEP`].
194 ///
195 /// # Errors
196 ///
197 /// When [`MultiResourceLock::lock`] errors.
198 #[inline]
199 pub fn lock_default(
200 &mut self,
201 resources: &[String],
202 ) -> RedisResult<Option<MultiResourceGuard<'_>>> {
203 self.lock(
204 resources,
205 DEFAULT_EXPIRATION,
206 DEFAULT_TIMEOUT,
207 DEFAULT_SLEEP,
208 )
209 }
210
211 /// Attempts to acquire the lock blocking until the lock can be acquired.
212 ///
213 /// Blocks up to `timeout` duration making attempts every `sleep` duration.
214 ///
215 /// Returns `None` when it times out.
216 ///
217 /// Wraps the result in a guard that releases the lock when dropped.
218 ///
219 /// # Errors
220 ///
221 /// When [`MultiResourceLock::acquire`] errors.
222 #[inline]
223 pub fn lock(
224 &mut self,
225 resources: &[String],
226 expiration: Duration,
227 timeout: Duration,
228 sleep: Duration,
229 ) -> RedisResult<Option<MultiResourceGuard<'_>>> {
230 self.acquire(resources, expiration, timeout, sleep)
231 .map(|result| {
232 result.map(|lock_id| MultiResourceGuard {
233 lock: self,
234 lock_id,
235 })
236 })
237 }
238}
239
240/// A guard that releases the lock when it is dropped.
241#[derive(Debug)]
242pub struct MultiResourceGuard<'a> {
243 /// The lock instance.
244 lock: &'a mut MultiResourceLock,
245 /// The lock identifier.
246 lock_id: String,
247}
248
249#[expect(
250 clippy::unwrap_used,
251 reason = "You can't propagate errors in a `Drop` implementation."
252)]
253impl Drop for MultiResourceGuard<'_> {
254 #[inline]
255 fn drop(&mut self) {
256 self.lock.release(&self.lock_id).unwrap();
257 }
258}