redis_lock/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! Rusty distributed locking backed by Redis.
4//!
5//! ## Locking a single resource
6//!
7//! ```no_run
8//! # use redis::AsyncCommands;
9//! # use tokio::sync::Mutex;
10//! # use std::sync::Arc;
11//! # #[allow(dependency_on_unit_never_type_fallback)]
12//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
13//! # tokio::runtime::Runtime::new()?.block_on(async {
14//! # let client: redis::Client = todo!();
15//! let connection = Arc::new(Mutex::new(
16//!     client.get_multiplexed_async_connection().await?
17//! ));
18//! // Execute a function with the lock.
19//! redis_lock::lock_across(
20//!     &[connection],
21//!     "account1",
22//!     async move { /* .. */ },
23//!     redis_lock::LockAcrossOptions::default()
24//! ).await?;
25//! # Ok(())
26//! # })
27//! # }
28//! ```
29//!
30//! ## Locking multiple resources
31//!
32//! ```no_run
33//! # use redis::AsyncCommands;
34//! # #[allow(dependency_on_unit_never_type_fallback)]
35//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
36//! # tokio::runtime::Runtime::new()?.block_on(async {
37//! # let client = todo!();
38//! // Setup.
39//! redis_lock::setup(&client).await?;
40//! // Get lock.
41//! let mut lock = redis_lock::MultiResourceLock::new(client.clone())?;
42//! let resources = vec![String::from("account1"), String::from("account2")];
43//! // Execute a function with the lock.
44//! lock.map_default(&resources, async move { /* .. */ }).await?;
45//! # Ok(())
46//! # })
47//! # }
48//! ```
49//!
50//! ## Vs [rslock](https://github.com/hexcowboy/rslock)
51//!
52//! I would recommend this library over [rslock](https://github.com/hexcowboy/rslock) when:
53//! - your application is focussed on `async`.
54//! - your application does operations that require exclusive access to multiple resources.
55//!
56//! ## Similar work
57//!
58//! - <https://github.com/hexcowboy/rslock>
59
60use displaydoc::Display;
61use redis::Client;
62use std::error::Error;
63use std::future::Future;
64use std::time::Duration;
65use thiserror::Error;
66use uuid::Uuid;
67
68/// Re-export of [`redis::RedisError`].
69pub type RedisError = redis::RedisError;
70/// Mimic of [`redis::RedisResult`].
71pub type RedisResult<T> = Result<T, RedisError>;
72
73/// Synchronous implementation of the lock.
74#[cfg(feature = "sync")]
75#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
76pub mod sync;
77
78mod single;
79pub use single::*;
80
81/// A distributed mutual exclusion lock backed by Redis.
82///
83/// Supports exclusion based on multiple resources and partial overlaps.
84///
85/// This is much less efficient than [`lock_across`]. Ideally you should architect your
86/// application so you never need [`MultiResourceLock`].
87///
88/// E.g. a lock on resources `["a", "b"]` will block a lock on `["a"]` or `["b", "c"]`.
89pub struct MultiResourceLock {
90    /// The Redis client.
91    client: Client,
92}
93
94impl std::fmt::Debug for MultiResourceLock {
95    #[inline]
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct("MultiResourceLock")
98            .field("conn", &"..")
99            .finish()
100    }
101}
102
103/// Initializes a Redis instance with the Lua library functions required for locking.
104///
105/// This only needs to be done once per Redis instance, although re-doing it should be fine.
106///
107/// # Errors
108///
109/// - When [`Client::get_connection`] errors.
110/// - When the Lua library functions cannot be loaded into Redis.
111#[inline]
112pub async fn setup(client: &Client) -> Result<(), Box<dyn Error>> {
113    // Connect to Redis
114    let mut con = client.get_multiplexed_async_connection().await?;
115
116    // Define your Lua library
117    let lua_library = include_str!("functions.lua");
118
119    // Load the Lua library into Redis
120    redis::cmd("FUNCTION")
121        .arg("LOAD")
122        .arg("REPLACE")
123        .arg(lua_library)
124        .exec_async(&mut con)
125        .await?;
126
127    Ok(())
128}
129
130/// Default expiration duration for the lock.
131pub const DEFAULT_EXPIRATION: Duration = Duration::from_secs(3600);
132/// Default timeout duration for acquiring the lock.
133pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
134/// Default sleep duration between attempts to acquire the lock.
135pub const DEFAULT_SLEEP: Duration = Duration::from_secs(1);
136
137impl MultiResourceLock {
138    /// Create a new instance of the lock.
139    ///
140    /// # Errors
141    ///
142    /// When [`Client::get_connection`] errors.
143    #[inline]
144    pub fn new(client: Client) -> RedisResult<Self> {
145        Ok(MultiResourceLock { client })
146    }
147
148    /// Calls [`MultiResourceLock::acquire`] with [`DEFAULT_EXPIRATION`], [`DEFAULT_TIMEOUT`] and [`DEFAULT_SLEEP`].
149    ///
150    /// # Errors
151    ///
152    /// When [`MultiResourceLock::acquire`] errors.
153    #[inline]
154    pub async fn acquire_default(&mut self, resources: &[String]) -> RedisResult<Option<String>> {
155        self.acquire(
156            resources,
157            DEFAULT_EXPIRATION,
158            DEFAULT_TIMEOUT,
159            DEFAULT_SLEEP,
160        )
161        .await
162    }
163
164    /// Attempts to acquire the lock blocking until the lock can be acquired.
165    ///
166    /// Blocks up to `timeout` duration making attempts every `sleep` duration.
167    ///
168    /// Returns `None` when it times out.
169    ///
170    /// # Errors
171    ///
172    /// When [`MultiResourceLock::try_acquire`] errors.
173    #[inline]
174    pub async fn acquire(
175        &mut self,
176        resources: &[String],
177        expiration: Duration,
178        timeout: Duration,
179        sleep: Duration,
180    ) -> RedisResult<Option<String>> {
181        let now = std::time::Instant::now();
182        loop {
183            if now.elapsed() > timeout {
184                return Ok(None);
185            }
186            match self.try_acquire(resources, expiration).await? {
187                Some(res) => break Ok(Some(res)),
188                None => tokio::time::sleep(sleep).await,
189            }
190        }
191    }
192
193    /// Calls [`MultiResourceLock::try_acquire`] with [`DEFAULT_EXPIRATION`].
194    ///
195    /// # Errors
196    ///
197    /// When [`MultiResourceLock::try_acquire`] errors.
198    #[inline]
199    pub async fn try_acquire_default(
200        &mut self,
201        resources: &[String],
202    ) -> RedisResult<Option<String>> {
203        self.try_acquire(resources, DEFAULT_EXPIRATION).await
204    }
205
206    /// Attempts to acquire the lock returning immediately if it cannot be immediately acquired.
207    ///
208    /// # Errors
209    ///
210    /// - When the `acquire_lock` function is missing from the Redis instance.
211    #[inline]
212    pub async fn try_acquire(
213        &mut self,
214        resources: &[String],
215        expiration: Duration,
216    ) -> RedisResult<Option<String>> {
217        let mut connection = self.client.get_multiplexed_async_connection().await?;
218        let lock_id = Uuid::new_v4().to_string();
219        let mut args = vec![lock_id.clone(), expiration.as_millis().to_string()];
220        args.extend(resources.iter().cloned());
221
222        let result: Option<String> = redis::cmd("FCALL")
223            .arg("acquire_lock")
224            .arg(0i32)
225            .arg(&args)
226            .query_async(&mut connection)
227            .await?;
228
229        Ok(result)
230    }
231
232    /// Releases a held lock.
233    ///
234    /// # Errors
235    ///
236    /// - When the `release_lock` function is missing from the Redis instance.
237    /// - When `lock_id` does not refer to a held lock.
238    #[inline]
239    pub async fn release(&mut self, lock_id: &str) -> RedisResult<usize> {
240        let mut connection = self.client.get_multiplexed_async_connection().await?;
241        let result: usize = redis::cmd("FCALL")
242            .arg("release_lock")
243            .arg(0i32)
244            .arg(lock_id)
245            .query_async(&mut connection)
246            .await?;
247
248        Ok(result)
249    }
250
251    // TODO Catch panics in `f`.
252    /// Since we cannot safely drop a guard in an async context, we need to provide a way to release the lock in case of an error.
253    ///
254    /// This is the suggested approach, it is less ergonomic but it is safe.
255    #[inline]
256    pub async fn map<F>(
257        &mut self,
258        resources: &[String],
259        expiration: Duration,
260        timeout: Duration,
261        sleep: Duration,
262        f: F,
263    ) -> Result<F::Output, MapError>
264    where
265        F: Future + Send + 'static,
266        F::Output: Send + 'static,
267    {
268        let lock_id = self
269            .acquire(resources, expiration, timeout, sleep)
270            .await
271            .map_err(MapError::Acquire)?
272            .ok_or(MapError::Timeout)?;
273        let result = f.await;
274        self.release(&lock_id).await.map_err(MapError::Release)?;
275        Ok(result)
276    }
277
278    /// Calls [`MultiResourceLock::map`] with [`DEFAULT_EXPIRATION`], [`DEFAULT_TIMEOUT`] and [`DEFAULT_SLEEP`].
279    #[inline]
280    pub async fn map_default<F>(
281        &mut self,
282        resources: &[String],
283        f: F,
284    ) -> Result<F::Output, MapError>
285    where
286        F: Future + Send + 'static,
287        F::Output: Send + 'static,
288    {
289        self.map(
290            resources,
291            DEFAULT_EXPIRATION,
292            DEFAULT_TIMEOUT,
293            DEFAULT_SLEEP,
294            f,
295        )
296        .await
297    }
298}
299
300/// Error for [`MultiResourceLock::map`].
301#[derive(Debug, Display, Error)]
302pub enum MapError {
303    /// Timed out attempting to acquire the lock.
304    Timeout,
305    /// Failed to acquire lock: {0}
306    Acquire(RedisError),
307    /// Failed to release lock: {0}
308    Release(RedisError),
309}