1pub mod error;
59pub mod execs;
60pub mod options;
61
62use crate::error::Error;
63use crate::error::Error::IdNotFound;
64use crate::execs::*;
65use crate::options::Options;
66
67use anyhow::Result;
68use redis::aio::{ConnectionManager, ConnectionManagerConfig};
69use tokio::sync::oneshot;
70use tokio::time::sleep;
71use tokio::{select, spawn};
72
73#[derive(Clone)]
74pub struct Locker {
75 client: redis::Client,
76 conn_manager: ConnectionManager,
77}
78
79impl Locker {
80 pub async fn from_redis_url(url: &str) -> Result<Self> {
81 let client = redis::Client::open(url)?;
82 let cfg = ConnectionManagerConfig::default().set_max_delay(2000);
83 let async_conn_manager = ConnectionManager::new_with_config(client.clone(), cfg).await?;
84 Ok(Self {
85 client,
86 conn_manager: async_conn_manager,
87 })
88 }
89
90 pub async fn acquire(&mut self, lock_key: &str) -> Result<Lock> {
91 self.acquire_with_options(&Options::default(), lock_key)
92 .await
93 }
94
95 pub async fn acquire_with_options(&mut self, opts: &Options, lock_key: &str) -> Result<Lock> {
96 let lock_id = lock(
97 &mut self.conn_manager,
98 lock_key,
99 opts.lifetime,
100 opts.retry_interval,
101 opts.retry_timeout,
102 )
103 .await?;
104
105 let mut conn = self.conn_manager.clone();
106 let opts = opts.clone();
107 let lock_key_c1 = lock_key.to_owned();
108 let lock_id_c1 = lock_id.clone();
109 let (stop_tx, mut stop_rx) = oneshot::channel();
110
111 spawn(async move {
112 loop {
113 select! {
114 _ = &mut stop_rx => break,
115 _ = sleep(opts.extend_interval) => {
116 if let Err(e) = extend(
117 &mut conn,
118 &lock_key_c1,
119 &lock_id_c1,
120 opts.lifetime,
121 )
122 .await
123 {
124 if let Some(e) = e.downcast_ref::<Error>() {
125 if matches!(e, IdNotFound) {
126 break;
127 }
128 }
129 }
130 },
131 }
132 }
133 });
134
135 let cli = self.client.clone();
136 let lock_key_c2 = lock_key.to_owned();
137 let lock_id_c2 = lock_id.clone();
138
139 Ok(Lock {
140 release_fn: Some(Box::new(move || -> Result<()> {
141 let _ = stop_tx.send(());
142 let mut conn = cli.get_connection()?;
143 unlock_sync(&mut conn, &lock_key_c2, &lock_id_c2)
144 })),
145 })
146 }
147}
148
149pub struct Lock {
150 pub release_fn: Option<Box<dyn FnOnce() -> Result<()> + Send + 'static>>,
151}
152
153impl Lock {
154 pub fn release(mut self) -> Result<()> {
155 self.call_release()
156 }
157
158 fn call_release(&mut self) -> Result<()> {
159 match self.release_fn.take() {
160 Some(release_fn) => release_fn(),
161 None => Ok(()),
162 }
163 }
164}
165
166impl Drop for Lock {
167 fn drop(&mut self) {
168 let _ = self.call_release();
169 }
170}
171
// These tests talk to a real Redis server at `redis://127.0.0.1:6379/0`.
#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;

    /// A held lock makes a second acquire fail with `Error::Timeout`, and the
    /// key can be acquired again after an explicit `release`.
    #[tokio::test]
    async fn test_lock_exclusive() {
        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
            .await
            .unwrap();
        let lock_key = "test:test_lock_exclusive_key";

        let r = locker.acquire(lock_key).await;
        assert!(r.is_ok(), "Should acquire a lock");

        match locker.acquire(lock_key).await.err() {
            None => panic!("Should get an error when acquiring another lock"),
            Some(e) => {
                assert_eq!(
                    e.downcast_ref::<Error>().unwrap(),
                    &Error::Timeout,
                    "Should get a timed out error when acquiring another lock"
                )
            }
        }

        assert!(r.unwrap().release().is_ok(), "Should release a lock");

        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should acquire a lock after another lock is released"
        );
    }

    /// Dropping the guard at the end of a scope releases the lock.
    #[tokio::test]
    async fn test_lock_drop() {
        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
            .await
            .unwrap();
        let lock_key = "test:test_lock_drop_key";

        {
            // `r` stays alive until the end of this scope, keeping the lock held.
            let r = locker.acquire(lock_key).await;
            assert!(r.is_ok(), "Should acquire a lock in a scope");

            match locker.acquire(lock_key).await.err() {
                None => panic!("Should get an error when acquiring another lock in a scope"),
                Some(e) => {
                    assert_eq!(
                        e.downcast_ref::<Error>().unwrap(),
                        &Error::Timeout,
                        "Should get an timed out error when acquiring another lock in a scope"
                    );
                }
            }
        }

        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should acquire a lock out of the prev scope"
        );
    }

    /// With `extend_interval` (3s) longer than `lifetime` (2s), the key can be
    /// acquired again after sleeping 3s even though the first guard is still alive.
    #[tokio::test]
    async fn test_lock_passive_release() {
        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
            .await
            .unwrap();
        let lock_key = "test:test_lock_passive_release_key";

        let opts = Options::new()
            .lifetime(Duration::from_secs(2))
            .extend_interval(Duration::from_secs(3));
        let r = locker.acquire_with_options(&opts, lock_key).await;
        assert!(
            r.is_ok(),
            "Should acquire a lock with customized lifetime and extend_interval, extend_interval greater than lifetime"
        );

        sleep(Duration::from_secs(3)).await;
        assert!(
            locker.acquire(lock_key).await.is_ok(),
            "Should passively release a lock when the lifetime is reached"
        );
    }

    /// With `extend_interval` (2s) shorter than `lifetime` (3s), a second
    /// acquire still times out after sleeping 5s.
    #[tokio::test]
    async fn test_lock_extend() {
        let mut locker = Locker::from_redis_url("redis://127.0.0.1:6379/0")
            .await
            .unwrap();
        let lock_key = "test:test_lock_extend_key";
        let opts = Options::new()
            .lifetime(Duration::from_secs(3))
            .extend_interval(Duration::from_secs(2));
        let r = locker.acquire_with_options(&opts, lock_key).await;
        assert!(
            r.is_ok(),
            "Should acquire a lock with customized lifetime and extend_interval, extend_interval smaller than lifetime."
        );

        sleep(Duration::from_secs(5)).await;
        match locker.acquire(lock_key).await.err() {
            None => panic!("Should expand lock lifetime automatically."),
            Some(e) => {
                assert_eq!(e.downcast_ref::<Error>().unwrap(), &Error::Timeout)
            }
        }
    }
}