redis_lock/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! Rusty distributed locking backed by Redis.
4//!
5//! ## Locking a single resource
6//!
7//! ```no_run
8//! # use redis::AsyncCommands;
9//! # use tokio::sync::Mutex;
10//! # use std::sync::Arc;
11//! # #[allow(dependency_on_unit_never_type_fallback)]
12//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
13//! # tokio::runtime::Runtime::new()?.block_on(async {
14//! # let client: redis::Client = todo!();
15//! let connection = Arc::new(Mutex::new(
16//! client.get_multiplexed_async_connection().await?
17//! ));
18//! // Execute a function with the lock.
19//! redis_lock::lock_across(
20//! &[connection],
21//! "account1",
22//! async move { /* .. */ },
23//! redis_lock::LockAcrossOptions::default()
24//! ).await?;
25//! # Ok(())
26//! # })
27//! # }
28//! ```
29//!
30//! ## Locking multiple resources
31//!
32//! ```no_run
33//! # use redis::AsyncCommands;
34//! # #[allow(dependency_on_unit_never_type_fallback)]
35//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
36//! # tokio::runtime::Runtime::new()?.block_on(async {
37//! # let client = todo!();
38//! // Setup.
39//! redis_lock::setup(&client).await?;
40//! // Get lock.
41//! let mut lock = redis_lock::MultiResourceLock::new(client.clone())?;
42//! let resources = vec![String::from("account1"), String::from("account2")];
43//! // Execute a function with the lock.
44//! lock.map_default(&resources, async move { /* .. */ }).await?;
45//! # Ok(())
46//! # })
47//! # }
48//! ```
49//!
50//! ## Vs [rslock](https://github.com/hexcowboy/rslock)
51//!
52//! I would recommend this library over [rslock](https://github.com/hexcowboy/rslock) when:
//! - your application is focused on `async`.
54//! - your application does operations that require exclusive access to multiple resources.
55//!
56//! ## Similar work
57//!
58//! - <https://github.com/hexcowboy/rslock>
59
60use displaydoc::Display;
61use redis::Client;
62use std::error::Error;
63use std::future::Future;
64use std::time::Duration;
65use thiserror::Error;
66use uuid::Uuid;
67
68/// Re-export of [`redis::RedisError`].
69pub type RedisError = redis::RedisError;
70/// Mimic of [`redis::RedisResult`].
71pub type RedisResult<T> = Result<T, RedisError>;
72
73/// Synchronous implementation of the lock.
74#[cfg(feature = "sync")]
75#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
76pub mod sync;
77
78mod single;
79pub use single::*;
80
/// A distributed mutual exclusion lock backed by Redis.
///
/// Supports exclusion based on multiple resources and partial overlaps.
///
/// This is much less efficient than [`lock_across`]. Ideally you should architect your
/// application so you never need [`MultiResourceLock`].
///
/// E.g. a lock on resources `["a", "b"]` will block a lock on `["a"]` or `["b", "c"]`.
///
/// Acquiring and releasing each open a connection from the stored client and invoke a
/// Redis function by name (`acquire_lock` / `release_lock`), so those functions must
/// already be present on the Redis instance (see [`setup`]).
pub struct MultiResourceLock {
    /// The Redis client from which a connection is obtained for each operation.
    client: Client,
}
93
94impl std::fmt::Debug for MultiResourceLock {
95 #[inline]
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct("MultiResourceLock")
98 .field("conn", &"..")
99 .finish()
100 }
101}
102
103/// Initializes a Redis instance with the Lua library functions required for locking.
104///
105/// This only needs to be done once per Redis instance, although re-doing it should be fine.
106///
107/// # Errors
108///
109/// - When [`Client::get_connection`] errors.
110/// - When the Lua library functions cannot be loaded into Redis.
111#[inline]
112pub async fn setup(client: &Client) -> Result<(), Box<dyn Error>> {
113 // Connect to Redis
114 let mut con = client.get_multiplexed_async_connection().await?;
115
116 // Define your Lua library
117 let lua_library = include_str!("functions.lua");
118
119 // Load the Lua library into Redis
120 redis::cmd("FUNCTION")
121 .arg("LOAD")
122 .arg("REPLACE")
123 .arg(lua_library)
124 .exec_async(&mut con)
125 .await?;
126
127 Ok(())
128}
129
/// Default expiration duration for the lock (one hour).
pub const DEFAULT_EXPIRATION: Duration = Duration::from_secs(60 * 60);
/// Default timeout duration for acquiring the lock (one minute).
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
/// Default sleep duration between attempts to acquire the lock (one second).
pub const DEFAULT_SLEEP: Duration = Duration::from_millis(1_000);
136
137impl MultiResourceLock {
138 /// Create a new instance of the lock.
139 ///
140 /// # Errors
141 ///
142 /// When [`Client::get_connection`] errors.
143 #[inline]
144 pub fn new(client: Client) -> RedisResult<Self> {
145 Ok(MultiResourceLock { client })
146 }
147
148 /// Calls [`MultiResourceLock::acquire`] with [`DEFAULT_EXPIRATION`], [`DEFAULT_TIMEOUT`] and [`DEFAULT_SLEEP`].
149 ///
150 /// # Errors
151 ///
152 /// When [`MultiResourceLock::acquire`] errors.
153 #[inline]
154 pub async fn acquire_default(&mut self, resources: &[String]) -> RedisResult<Option<String>> {
155 self.acquire(
156 resources,
157 DEFAULT_EXPIRATION,
158 DEFAULT_TIMEOUT,
159 DEFAULT_SLEEP,
160 )
161 .await
162 }
163
164 /// Attempts to acquire the lock blocking until the lock can be acquired.
165 ///
166 /// Blocks up to `timeout` duration making attempts every `sleep` duration.
167 ///
168 /// Returns `None` when it times out.
169 ///
170 /// # Errors
171 ///
172 /// When [`MultiResourceLock::try_acquire`] errors.
173 #[inline]
174 pub async fn acquire(
175 &mut self,
176 resources: &[String],
177 expiration: Duration,
178 timeout: Duration,
179 sleep: Duration,
180 ) -> RedisResult<Option<String>> {
181 let now = std::time::Instant::now();
182 loop {
183 if now.elapsed() > timeout {
184 return Ok(None);
185 }
186 match self.try_acquire(resources, expiration).await? {
187 Some(res) => break Ok(Some(res)),
188 None => tokio::time::sleep(sleep).await,
189 }
190 }
191 }
192
193 /// Calls [`MultiResourceLock::try_acquire`] with [`DEFAULT_EXPIRATION`].
194 ///
195 /// # Errors
196 ///
197 /// When [`MultiResourceLock::try_acquire`] errors.
198 #[inline]
199 pub async fn try_acquire_default(
200 &mut self,
201 resources: &[String],
202 ) -> RedisResult<Option<String>> {
203 self.try_acquire(resources, DEFAULT_EXPIRATION).await
204 }
205
206 /// Attempts to acquire the lock returning immediately if it cannot be immediately acquired.
207 ///
208 /// # Errors
209 ///
210 /// - When the `acquire_lock` function is missing from the Redis instance.
211 #[inline]
212 pub async fn try_acquire(
213 &mut self,
214 resources: &[String],
215 expiration: Duration,
216 ) -> RedisResult<Option<String>> {
217 let mut connection = self.client.get_multiplexed_async_connection().await?;
218 let lock_id = Uuid::new_v4().to_string();
219 let mut args = vec![lock_id.clone(), expiration.as_millis().to_string()];
220 args.extend(resources.iter().cloned());
221
222 let result: Option<String> = redis::cmd("FCALL")
223 .arg("acquire_lock")
224 .arg(0i32)
225 .arg(&args)
226 .query_async(&mut connection)
227 .await?;
228
229 Ok(result)
230 }
231
232 /// Releases a held lock.
233 ///
234 /// # Errors
235 ///
236 /// - When the `release_lock` function is missing from the Redis instance.
237 /// - When `lock_id` does not refer to a held lock.
238 #[inline]
239 pub async fn release(&mut self, lock_id: &str) -> RedisResult<usize> {
240 let mut connection = self.client.get_multiplexed_async_connection().await?;
241 let result: usize = redis::cmd("FCALL")
242 .arg("release_lock")
243 .arg(0i32)
244 .arg(lock_id)
245 .query_async(&mut connection)
246 .await?;
247
248 Ok(result)
249 }
250
251 // TODO Catch panics in `f`.
252 /// Since we cannot safely drop a guard in an async context, we need to provide a way to release the lock in case of an error.
253 ///
254 /// This is the suggested approach, it is less ergonomic but it is safe.
255 #[inline]
256 pub async fn map<F>(
257 &mut self,
258 resources: &[String],
259 expiration: Duration,
260 timeout: Duration,
261 sleep: Duration,
262 f: F,
263 ) -> Result<F::Output, MapError>
264 where
265 F: Future + Send + 'static,
266 F::Output: Send + 'static,
267 {
268 let lock_id = self
269 .acquire(resources, expiration, timeout, sleep)
270 .await
271 .map_err(MapError::Acquire)?
272 .ok_or(MapError::Timeout)?;
273 let result = f.await;
274 self.release(&lock_id).await.map_err(MapError::Release)?;
275 Ok(result)
276 }
277
278 /// Calls [`MultiResourceLock::map`] with [`DEFAULT_EXPIRATION`], [`DEFAULT_TIMEOUT`] and [`DEFAULT_SLEEP`].
279 #[inline]
280 pub async fn map_default<F>(
281 &mut self,
282 resources: &[String],
283 f: F,
284 ) -> Result<F::Output, MapError>
285 where
286 F: Future + Send + 'static,
287 F::Output: Send + 'static,
288 {
289 self.map(
290 resources,
291 DEFAULT_EXPIRATION,
292 DEFAULT_TIMEOUT,
293 DEFAULT_SLEEP,
294 f,
295 )
296 .await
297 }
298}
299
/// Error for [`MultiResourceLock::map`].
///
/// Each variant identifies the step of `map` that failed: waiting for the lock,
/// acquiring it, or releasing it after the supplied future finished.
// NOTE: `Display` is derived by `displaydoc`, which uses each variant's doc comment
// as its message (`{0}` interpolates the wrapped error). Editing a variant's doc
// comment therefore changes the text users see.
#[derive(Debug, Display, Error)]
pub enum MapError {
    /// Timed out attempting to acquire the lock.
    Timeout,
    /// Failed to acquire lock: {0}
    Acquire(RedisError),
    /// Failed to release lock: {0}
    Release(RedisError),
}